Add Optimistic Sync Scenario Testing (#10836)

* add latest changes

* fix it

* add multiclient support

* fix tests

* Apply suggestions from code review

* fix test

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Nishant Das
2022-06-10 07:24:53 +08:00
committed by GitHub
parent 18fc17c903
commit 7f443e8387
18 changed files with 363 additions and 72 deletions

View File

@@ -19,6 +19,8 @@ common_deps = [
"//io/file:go_default_library",
"//proto/eth/service:go_default_library",
"//proto/eth/v1:go_default_library",
"//math:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/endtoend/components:go_default_library",
@@ -64,6 +66,7 @@ go_test(
"@web3signer",
],
eth_network = "minimal",
flaky = True,
shard_count = 2,
tags = [
"e2e",
@@ -72,7 +75,6 @@ go_test(
"requires-network",
],
deps = common_deps,
flaky = True,
)
go_test(
@@ -96,6 +98,7 @@ go_test(
"@web3signer",
],
eth_network = "mainnet",
flaky = True,
shard_count = 2,
tags = [
"e2e",
@@ -105,7 +108,6 @@ go_test(
"requires-network",
],
deps = common_deps,
flaky = True,
)
go_test(
@@ -129,6 +131,7 @@ go_test(
"@web3signer",
],
eth_network = "mainnet",
flaky = True,
shard_count = 2,
tags = [
"exclusive",
@@ -138,7 +141,6 @@ go_test(
"scenario",
],
deps = common_deps,
flaky = True,
)
go_test(
@@ -162,6 +164,7 @@ go_test(
"@web3signer",
],
eth_network = "minimal",
flaky = True,
shard_count = 2,
tags = [
"exclusive",
@@ -171,5 +174,4 @@ go_test(
"scenario",
],
deps = common_deps,
flaky = True,
)

View File

@@ -26,7 +26,7 @@ type componentHandler struct {
web3Signer e2etypes.ComponentRunner
bootnode e2etypes.ComponentRunner
eth1Miner e2etypes.ComponentRunner
eth1Proxy e2etypes.ComponentRunner
eth1Proxy e2etypes.MultipleComponentRunners
eth1Nodes e2etypes.MultipleComponentRunners
beaconNodes e2etypes.MultipleComponentRunners
validatorNodes e2etypes.MultipleComponentRunners

View File

@@ -137,6 +137,14 @@ func (s *BeaconNodeSet) StopAtIndex(i int) error {
return s.nodes[i].Stop()
}
// ComponentAtIndex returns the component at the provided index.
func (s *BeaconNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
if i >= len(s.nodes) {
return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
}
return s.nodes[i], nil
}
// BeaconNode represents beacon node.
type BeaconNode struct {
e2etypes.ComponentRunner

View File

@@ -28,7 +28,7 @@ var _ e2etypes.MultipleComponentRunners = (*NodeSet)(nil)
var _ e2etypes.MultipleComponentRunners = (*ProxySet)(nil)
var _ e2etypes.ComponentRunner = (*Miner)(nil)
var _ e2etypes.ComponentRunner = (*Node)(nil)
var _ e2etypes.ComponentRunner = (*Proxy)(nil)
var _ e2etypes.EngineProxy = (*Proxy)(nil)
// WaitForBlocks waits for a certain amount of blocks to be mined by the ETH1 chain before returning.
func WaitForBlocks(web3 *ethclient.Client, keystore *keystore.Key, blocksToWait uint64) error {

View File

@@ -110,3 +110,11 @@ func (s *NodeSet) StopAtIndex(i int) error {
}
return s.nodes[i].Stop()
}
// ComponentAtIndex returns the component at the provided index.
func (s *NodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
if i >= len(s.nodes) {
return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
}
return s.nodes[i], nil
}

View File

@@ -110,12 +110,21 @@ func (s *ProxySet) StopAtIndex(i int) error {
return s.proxies[i].Stop()
}
// ComponentAtIndex returns the component at the provided index.
func (s *ProxySet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
if i >= len(s.proxies) {
return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies))
}
return s.proxies[i], nil
}
// Proxy represents an engine-api proxy.
type Proxy struct {
e2etypes.ComponentRunner
started chan struct{}
index int
cancel func()
started chan struct{}
index int
engineProxy *proxy.Proxy
cancel func()
}
// NewProxy creates and returns an engine-api proxy.
@@ -157,6 +166,7 @@ func (node *Proxy) Start(ctx context.Context) error {
// Set cancel into context.
ctx, cancel := context.WithCancel(ctx)
node.cancel = cancel
node.engineProxy = nProxy
// Mark node as ready.
close(node.started)
return nProxy.Start(ctx)
@@ -185,6 +195,22 @@ func (node *Proxy) Stop() error {
return nil
}
// AddRequestInterceptor adds in a json-rpc request interceptor.
func (node *Proxy) AddRequestInterceptor(rpcMethodName string, responseGen func() interface{}, trigger func() bool) {
node.engineProxy.AddRequestInterceptor(rpcMethodName, responseGen, trigger)
}
// RemoveRequestInterceptor removes the request interceptor for the provided method.
func (node *Proxy) RemoveRequestInterceptor(rpcMethodName string) {
node.engineProxy.RemoveRequestInterceptor(rpcMethodName)
}
// ReleaseBackedUpRequests releases backed up http requests which
// were previously ignored due to our interceptors.
func (node *Proxy) ReleaseBackedUpRequests(rpcMethodName string) {
node.engineProxy.ReleaseBackedUpRequests(rpcMethodName)
}
func parseJWTSecretFromFile(jwtSecretFile string) ([]byte, error) {
enc, err := file.ReadFileAsBytes(jwtSecretFile)
if err != nil {

View File

@@ -127,6 +127,14 @@ func (s *LighthouseBeaconNodeSet) StopAtIndex(i int) error {
return s.nodes[i].Stop()
}
// ComponentAtIndex returns the component at the provided index.
func (s *LighthouseBeaconNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
if i >= len(s.nodes) {
return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
}
return s.nodes[i], nil
}
// LighthouseBeaconNode represents a lighthouse beacon node.
type LighthouseBeaconNode struct {
e2etypes.ComponentRunner

View File

@@ -131,6 +131,14 @@ func (s *LighthouseValidatorNodeSet) StopAtIndex(i int) error {
return s.nodes[i].Stop()
}
// ComponentAtIndex returns the component at the provided index.
func (s *LighthouseValidatorNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
if i >= len(s.nodes) {
return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
}
return s.nodes[i], nil
}
// LighthouseValidatorNode represents a lighthouse validator node.
type LighthouseValidatorNode struct {
e2etypes.ComponentRunner

View File

@@ -138,6 +138,14 @@ func (s *ValidatorNodeSet) StopAtIndex(i int) error {
return s.nodes[i].Stop()
}
// ComponentAtIndex returns the component at the provided index.
func (s *ValidatorNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
if i >= len(s.nodes) {
return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
}
return s.nodes[i], nil
}
// ValidatorNode represents a validator node.
type ValidatorNode struct {
e2etypes.ComponentRunner

View File

@@ -1,6 +1,6 @@
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") # gazelle:keep
lighthouse_version = "v2.1.4"
lighthouse_version = "v2.3.0"
lighthouse_archive_name = "lighthouse-%s-x86_64-unknown-linux-gnu-portable.tar.gz" % lighthouse_version
def e2e_deps():
@@ -14,12 +14,7 @@ def e2e_deps():
http_archive(
name = "lighthouse",
sha256 = "236883a4827037d96636aa259eef8cf3abc54c795adc18c4c2880842e09c743c",
sha256 = "6029acd211f269bcf41f86fd72fe540703a167ff96dfd952ff95b1693e3b5495",
build_file = "@prysm//testing/endtoend:lighthouse.BUILD",
# url = ("https://github.com/sigp/lighthouse/releases/download/%s/" + lighthouse_archive_name) % lighthouse_version,
# This is a compiled version of lighthouse from their `unstable` branch at this commit
# https://github.com/sigp/lighthouse/commit/99bb55472c278a1050f7679b2e018546ad3a28bf. Lighthouse does not have support
# for all the merge features as of their latest release, so this is a temporary compromise to allow multiclient test
# runs till their official release includes the required merge features in.
url = "https://prysmaticlabs.com/uploads/misc/lighthouse-99bb5547.tar.xz",
url = ("https://github.com/sigp/lighthouse/releases/download/%s/" + lighthouse_archive_name) % lighthouse_version,
)

View File

@@ -17,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/api/client/beacon"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/io/file"
enginev1 "github.com/prysmaticlabs/prysm/proto/engine/v1"
"github.com/prysmaticlabs/prysm/proto/eth/service"
v1 "github.com/prysmaticlabs/prysm/proto/eth/v1"
"google.golang.org/grpc/codes"
@@ -149,26 +150,10 @@ func (r *testRunner) runEvaluators(conns []*grpc.ClientConn, tickingStartTime ti
secondsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
ticker := helpers.NewEpochTicker(tickingStartTime, secondsPerEpoch)
for currentEpoch := range ticker.C() {
if config.EvalInterceptor(currentEpoch) {
if config.EvalInterceptor(currentEpoch, conns) {
continue
}
wg := new(sync.WaitGroup)
for _, eval := range config.Evaluators {
// Fix reference to evaluator as it will be running
// in a separate goroutine.
evaluator := eval
// Only run if the policy says so.
if !evaluator.Policy(types.Epoch(currentEpoch)) {
continue
}
wg.Add(1)
go t.Run(fmt.Sprintf(evaluator.Name, currentEpoch), func(t *testing.T) {
err := evaluator.Evaluation(conns...)
assert.NoError(t, err, "Evaluation failed for epoch %d: %v", currentEpoch, err)
wg.Done()
})
}
wg.Wait()
r.executeProvidedEvaluators(currentEpoch, conns, config.Evaluators)
if t.Failed() || currentEpoch >= config.EpochsToRun-1 {
ticker.Done()
@@ -558,7 +543,27 @@ func (r *testRunner) addEvent(ev func() error) {
r.comHandler.group.Go(ev)
}
func (r *testRunner) singleNodeOffline(epoch uint64) bool {
func (r *testRunner) executeProvidedEvaluators(currentEpoch uint64, conns []*grpc.ClientConn, evals []e2etypes.Evaluator) {
wg := new(sync.WaitGroup)
for _, eval := range evals {
// Fix reference to evaluator as it will be running
// in a separate goroutine.
evaluator := eval
// Only run if the policy says so.
if !evaluator.Policy(types.Epoch(currentEpoch)) {
continue
}
wg.Add(1)
go r.t.Run(fmt.Sprintf(evaluator.Name, currentEpoch), func(t *testing.T) {
err := evaluator.Evaluation(conns...)
assert.NoError(t, err, "Evaluation failed for epoch %d: %v", currentEpoch, err)
wg.Done()
})
}
wg.Wait()
}
func (r *testRunner) singleNodeOffline(epoch uint64, _ []*grpc.ClientConn) bool {
switch epoch {
case 9:
require.NoError(r.t, r.comHandler.beaconNodes.PauseAtIndex(0))
@@ -575,7 +580,7 @@ func (r *testRunner) singleNodeOffline(epoch uint64) bool {
return false
}
func (r *testRunner) singleNodeOfflineMulticlient(epoch uint64) bool {
func (r *testRunner) singleNodeOfflineMulticlient(epoch uint64, _ []*grpc.ClientConn) bool {
switch epoch {
case 9:
require.NoError(r.t, r.comHandler.beaconNodes.PauseAtIndex(0))
@@ -596,7 +601,7 @@ func (r *testRunner) singleNodeOfflineMulticlient(epoch uint64) bool {
return false
}
func (r *testRunner) eeOffline(epoch uint64) bool {
func (r *testRunner) eeOffline(epoch uint64, _ []*grpc.ClientConn) bool {
switch epoch {
case 9:
require.NoError(r.t, r.comHandler.eth1Miner.Pause())
@@ -611,7 +616,7 @@ func (r *testRunner) eeOffline(epoch uint64) bool {
return false
}
func (r *testRunner) allValidatorsOffline(epoch uint64) bool {
func (r *testRunner) allValidatorsOffline(epoch uint64, _ []*grpc.ClientConn) bool {
switch epoch {
case 9:
require.NoError(r.t, r.comHandler.validatorNodes.PauseAtIndex(0))
@@ -628,7 +633,95 @@ func (r *testRunner) allValidatorsOffline(epoch uint64) bool {
return false
}
// All Epochs are valid.
func defaultInterceptor(_ uint64) bool {
func (r *testRunner) optimisticSync(epoch uint64, conns []*grpc.ClientConn) bool {
switch epoch {
case 9:
component, err := r.comHandler.eth1Proxy.ComponentAtIndex(0)
require.NoError(r.t, err)
component.(e2etypes.EngineProxy).AddRequestInterceptor("engine_newPayloadV1", func() interface{} {
return &enginev1.PayloadStatus{
Status: enginev1.PayloadStatus_SYNCING,
LatestValidHash: make([]byte, 32),
}
}, func() bool {
return true
})
return true
case 10:
r.executeProvidedEvaluators(epoch, []*grpc.ClientConn{conns[0]}, []e2etypes.Evaluator{
ev.OptimisticSyncEnabled,
})
// Disable Interceptor
component, err := r.comHandler.eth1Proxy.ComponentAtIndex(0)
require.NoError(r.t, err)
engineProxy, ok := component.(e2etypes.EngineProxy)
require.Equal(r.t, true, ok)
engineProxy.RemoveRequestInterceptor("engine_newPayloadV1")
engineProxy.ReleaseBackedUpRequests("engine_newPayloadV1")
return true
case 11, 12:
// Allow 2 epochs for the network to finalize again.
return true
}
return false
}
func (r *testRunner) optimisticSyncMulticlient(epoch uint64, conns []*grpc.ClientConn) bool {
switch epoch {
case 9:
// Set it for prysm beacon node.
component, err := r.comHandler.eth1Proxy.ComponentAtIndex(0)
require.NoError(r.t, err)
component.(e2etypes.EngineProxy).AddRequestInterceptor("engine_newPayloadV1", func() interface{} {
return &enginev1.PayloadStatus{
Status: enginev1.PayloadStatus_SYNCING,
LatestValidHash: make([]byte, 32),
}
}, func() bool {
return true
})
// Set it for lighthouse beacon node.
component, err = r.comHandler.eth1Proxy.ComponentAtIndex(2)
require.NoError(r.t, err)
component.(e2etypes.EngineProxy).AddRequestInterceptor("engine_newPayloadV1", func() interface{} {
return &enginev1.PayloadStatus{
Status: enginev1.PayloadStatus_SYNCING,
LatestValidHash: make([]byte, 32),
}
}, func() bool {
return true
})
return true
case 10:
r.executeProvidedEvaluators(epoch, []*grpc.ClientConn{conns[0]}, []e2etypes.Evaluator{
ev.OptimisticSyncEnabled,
})
// Disable Interceptor
component, err := r.comHandler.eth1Proxy.ComponentAtIndex(0)
require.NoError(r.t, err)
engineProxy, ok := component.(e2etypes.EngineProxy)
require.Equal(r.t, true, ok)
engineProxy.RemoveRequestInterceptor("engine_newPayloadV1")
engineProxy.ReleaseBackedUpRequests("engine_newPayloadV1")
// Remove for lighthouse too
component, err = r.comHandler.eth1Proxy.ComponentAtIndex(2)
require.NoError(r.t, err)
engineProxy, ok = component.(e2etypes.EngineProxy)
require.Equal(r.t, true, ok)
engineProxy.RemoveRequestInterceptor("engine_newPayloadV1")
engineProxy.ReleaseBackedUpRequests("engine_newPayloadV1")
return true
case 11, 12:
// Allow 2 epochs for the network to finalize again.
return true
}
return false
}
// All Epochs are valid.
func defaultInterceptor(_ uint64, _ []*grpc.ClientConn) bool {
return false
}

View File

@@ -31,6 +31,7 @@ go_library(
"//consensus-types/wrapper:go_default_library",
"//container/slice:go_default_library",
"//encoding/bytesutil:go_default_library",
"//math:go_default_library",
"//network/forks:go_default_library",
"//proto/eth/service:go_default_library",
"//proto/eth/v1:go_default_library",

View File

@@ -3,14 +3,20 @@ package evaluators
import (
"context"
"math"
"strconv"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/config/params"
ctypes "github.com/prysmaticlabs/prysm/consensus-types/primitives"
mathutil "github.com/prysmaticlabs/prysm/math"
"github.com/prysmaticlabs/prysm/proto/eth/service"
v2 "github.com/prysmaticlabs/prysm/proto/eth/v2"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/testing/endtoend/helpers"
e2e "github.com/prysmaticlabs/prysm/testing/endtoend/params"
"github.com/prysmaticlabs/prysm/testing/endtoend/policies"
"github.com/prysmaticlabs/prysm/testing/endtoend/types"
"github.com/prysmaticlabs/prysm/time/slots"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
@@ -23,6 +29,13 @@ var TransactionsPresent = types.Evaluator{
Evaluation: transactionsPresent,
}
// OptimisticSyncEnabled checks that the node is in an optimistic state.
var OptimisticSyncEnabled = types.Evaluator{
Name: "optimistic_sync_at_epoch_%d",
Policy: policies.AllEpochs,
Evaluation: optimisticSyncEnabled,
}
func transactionsPresent(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewBeaconChainClient(conn)
@@ -52,3 +65,49 @@ func transactionsPresent(conns ...*grpc.ClientConn) error {
}
return nil
}
func optimisticSyncEnabled(conns ...*grpc.ClientConn) error {
for _, conn := range conns {
client := service.NewBeaconChainClient(conn)
head, err := client.GetBlockV2(context.Background(), &v2.BlockRequestV2{BlockId: []byte("head")})
if err != nil {
return err
}
headSlot := uint64(0)
switch hb := head.Data.Message.(type) {
case *v2.SignedBeaconBlockContainerV2_Phase0Block:
headSlot = uint64(hb.Phase0Block.Slot)
case *v2.SignedBeaconBlockContainerV2_AltairBlock:
headSlot = uint64(hb.AltairBlock.Slot)
case *v2.SignedBeaconBlockContainerV2_BellatrixBlock:
headSlot = uint64(hb.BellatrixBlock.Slot)
default:
return errors.New("no valid block type retrieved")
}
currEpoch := slots.ToEpoch(ctypes.Slot(headSlot))
startSlot, err := slots.EpochStart(currEpoch)
if err != nil {
return err
}
isOptimistic := false
for i := startSlot; i <= ctypes.Slot(headSlot); i++ {
castI, err := mathutil.Int(uint64(i))
if err != nil {
return err
}
block, err := client.GetBlockV2(context.Background(), &v2.BlockRequestV2{BlockId: []byte(strconv.Itoa(castI))})
if err != nil {
// Continue in the event of non-existent blocks.
continue
}
if !block.ExecutionOptimistic {
return errors.New("expected block to be optimistic, but it is not")
}
isOptimistic = true
}
if !isOptimistic {
return errors.New("expected block to be optimistic, but it is not")
}
}
return nil
}

View File

@@ -14,3 +14,10 @@ func TestEndToEnd_ScenarioRun_BeaconOffline_Multiclient(t *testing.T) {
runner.config.EvalInterceptor = runner.singleNodeOffline
runner.scenarioRunner()
}
func TestEndToEnd_ScenarioRun_OptimisticSync_Multiclient(t *testing.T) {
runner := e2eMainnet(t, false /*usePrysmSh*/, true /*useMultiClient*/)
runner.config.Evaluators = scenarioEvalsMulti()
runner.config.EvalInterceptor = runner.optimisticSyncMulticlient
runner.scenarioRunner()
}

View File

@@ -26,3 +26,11 @@ func TestEndToEnd_ScenarioRun_EEOffline(t *testing.T) {
runner.config.EvalInterceptor = runner.eeOffline
runner.scenarioRunner()
}
func TestEndToEnd_ScenarioRun_OptimisticSync(t *testing.T) {
runner := e2eMinimal(t)
runner.config.Evaluators = scenarioEvals()
runner.config.EvalInterceptor = runner.optimisticSync
runner.scenarioRunner()
}

View File

@@ -44,7 +44,7 @@ type E2EConfig struct {
Seed int64
TracingSinkEndpoint string
Evaluators []Evaluator
EvalInterceptor func(uint64) bool
EvalInterceptor func(uint64, []*grpc.ClientConn) bool
BeaconFlags []string
ValidatorFlags []string
PeerIDs []string
@@ -75,6 +75,8 @@ type ComponentRunner interface {
type MultipleComponentRunners interface {
ComponentRunner
// ComponentAtIndex returns the component at index
ComponentAtIndex(i int) (ComponentRunner, error)
// PauseAtIndex pauses the grouped component element at the desired index.
PauseAtIndex(i int) error
// ResumeAtIndex resumes the grouped component element at the desired index.
@@ -83,6 +85,16 @@ type MultipleComponentRunners interface {
StopAtIndex(i int) error
}
type EngineProxy interface {
ComponentRunner
// AddRequestInterceptor adds in a json-rpc request interceptor.
AddRequestInterceptor(rpcMethodName string, responseGen func() interface{}, trigger func() bool)
// RemoveRequestInterceptor removes the request interceptor for the provided method.
RemoveRequestInterceptor(rpcMethodName string)
// ReleaseBackedUpRequests releases backed up http requests.
ReleaseBackedUpRequests(rpcMethodName string)
}
// BeaconNodeSet defines an interface for an object that fulfills the duties
// of a group of beacon nodes.
type BeaconNodeSet interface {

View File

@@ -34,8 +34,8 @@ type jsonRPCObject struct {
}
type interceptorConfig struct {
response interface{}
trigger func() bool
responseGen func() interface{}
trigger func() bool
}
// Proxy server that sits as a middleware between an Ethereum consensus client and an execution client,
@@ -46,7 +46,7 @@ type Proxy struct {
srv *http.Server
lock sync.RWMutex
interceptors map[string]*interceptorConfig
backedUpRequests []*http.Request
backedUpRequests map[string][]*http.Request
}
// New creates a proxy server forwarding requests from a consensus client to an execution client.
@@ -57,7 +57,8 @@ func New(opts ...Option) (*Proxy, error) {
proxyPort: defaultProxyPort,
logger: logrus.New(),
},
interceptors: make(map[string]*interceptorConfig),
interceptors: make(map[string]*interceptorConfig),
backedUpRequests: map[string][]*http.Request{},
}
for _, o := range opts {
if err := o(p); err != nil {
@@ -112,7 +113,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
// Check if we need to intercept the request with a custom response.
hasIntercepted, err := p.interceptIfNeeded(requestBytes, w)
hasIntercepted, err := p.interceptIfNeeded(requestBytes, w, r)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not intercept request")
return
@@ -126,7 +127,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// AddRequestInterceptor for a desired json-rpc method by specifying a custom response
// and a function that checks if the interceptor should be triggered.
func (p *Proxy) AddRequestInterceptor(rpcMethodName string, response interface{}, trigger func() bool) {
func (p *Proxy) AddRequestInterceptor(rpcMethodName string, response func() interface{}, trigger func() bool) {
p.lock.Lock()
defer p.lock.Unlock()
p.cfg.logger.Infof("Adding in interceptor for method %s", rpcMethodName)
@@ -136,9 +137,40 @@ func (p *Proxy) AddRequestInterceptor(rpcMethodName string, response interface{}
}
}
// RemoveRequestInterceptor removes the request interceptor for the provided method.
func (p *Proxy) RemoveRequestInterceptor(rpcMethodName string) {
p.lock.Lock()
defer p.lock.Unlock()
p.cfg.logger.Infof("Removing interceptor for method %s", rpcMethodName)
delete(p.interceptors, rpcMethodName)
}
// ReleaseBackedUpRequests releases backed up http requests which
// were previously ignored due to our interceptors.
func (p *Proxy) ReleaseBackedUpRequests(rpcMethodName string) {
p.lock.Lock()
defer p.lock.Unlock()
reqs := p.backedUpRequests[rpcMethodName]
for _, r := range reqs {
p.cfg.logger.Infof("Sending backed up request for method %s", rpcMethodName)
rBytes, err := parseRequestBytes(r)
if err != nil {
p.cfg.logger.Error(err)
continue
}
res, err := p.sendHttpRequest(r, rBytes)
if err != nil {
p.cfg.logger.Error(err)
continue
}
p.cfg.logger.Infof("Received response %s for backed up request for method %s", http.StatusText(res.StatusCode), rpcMethodName)
}
delete(p.backedUpRequests, rpcMethodName)
}
// Checks if there is a custom interceptor hook on the request, check if it can be
// triggered, and then write the custom response to the writer.
func (p *Proxy) interceptIfNeeded(requestBytes []byte, w http.ResponseWriter) (hasIntercepted bool, err error) {
func (p *Proxy) interceptIfNeeded(requestBytes []byte, w http.ResponseWriter, r *http.Request) (hasIntercepted bool, err error) {
if !isEngineAPICall(requestBytes) {
return
}
@@ -159,12 +191,13 @@ func (p *Proxy) interceptIfNeeded(requestBytes []byte, w http.ResponseWriter) (h
jResp := &jsonRPCObject{
Method: jreq.Method,
ID: jreq.ID,
Result: interceptor.response,
Result: interceptor.responseGen(),
}
if err = json.NewEncoder(w).Encode(jResp); err != nil {
return
}
hasIntercepted = true
p.backedUpRequests[jreq.Method] = append(p.backedUpRequests[jreq.Method], r)
return
}
@@ -177,18 +210,39 @@ func (p *Proxy) proxyRequest(requestBytes []byte, w http.ResponseWriter, r *http
jreq = &jsonRPCObject{Method: "unknown"}
}
p.cfg.logger.Infof("Forwarding %s request for method %s to %s", r.Method, jreq.Method, p.cfg.destinationUrl.String())
proxyReq, err := http.NewRequest(r.Method, p.cfg.destinationUrl.String(), r.Body)
proxyRes, err := p.sendHttpRequest(r, requestBytes)
if err != nil {
p.cfg.logger.WithError(err).Error("Could create new request")
return
}
p.cfg.logger.Infof("Received response for %s request with method %s from %s", r.Method, jreq.Method, p.cfg.destinationUrl.String())
defer func() {
if err = proxyRes.Body.Close(); err != nil {
p.cfg.logger.WithError(err).Error("Could not do close proxy responseGen body")
}
}()
// Pipe the proxy responseGen to the original caller.
if _, err = io.Copy(w, proxyRes.Body); err != nil {
p.cfg.logger.WithError(err).Error("Could not copy proxy request body")
return
}
}
func (p *Proxy) sendHttpRequest(req *http.Request, requestBytes []byte) (*http.Response, error) {
proxyReq, err := http.NewRequest(req.Method, p.cfg.destinationUrl.String(), req.Body)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not create new request")
return nil, err
}
// Set the modified request as the proxy request body.
proxyReq.Body = ioutil.NopCloser(bytes.NewBuffer(requestBytes))
// Required proxy headers for forwarding JSON-RPC requests to the execution client.
proxyReq.Header.Set("Host", r.Host)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
proxyReq.Header.Set("Host", req.Host)
proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr)
proxyReq.Header.Set("Content-Type", "application/json")
client := &http.Client{}
@@ -198,21 +252,9 @@ func (p *Proxy) proxyRequest(requestBytes []byte, w http.ResponseWriter, r *http
proxyRes, err := client.Do(proxyReq)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not forward request to destination server")
return
}
p.cfg.logger.Infof("Received response for %s request with method %s from %s", r.Method, jreq.Method, p.cfg.destinationUrl.String())
defer func() {
if err = proxyRes.Body.Close(); err != nil {
p.cfg.logger.WithError(err).Error("Could not do close proxy response body")
}
}()
// Pipe the proxy response to the original caller.
if _, err = io.Copy(w, proxyRes.Body); err != nil {
p.cfg.logger.WithError(err).Error("Could not copy proxy request body")
return
return nil, err
}
return proxyRes, nil
}
// Peek into the bytes of an HTTP request's body.

View File

@@ -116,7 +116,9 @@ func TestProxy_CustomInterceptors(t *testing.T) {
// RPC method to intercept.
proxy.AddRequestInterceptor(
method,
&syncingResponse{Syncing: false}, // Custom response.
func() interface{} {
return &syncingResponse{Syncing: false}
}, // Custom response.
func() bool {
return true // Always intercept with a custom response.
},
@@ -164,7 +166,9 @@ func TestProxy_CustomInterceptors(t *testing.T) {
method := "engine_newPayloadV1"
// RPC method to intercept.
wantInterceptedResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))}
wantInterceptedResponse := func() interface{} {
return &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))}
}
conditional := false
proxy.AddRequestInterceptor(
method,
@@ -199,7 +203,7 @@ func TestProxy_CustomInterceptors(t *testing.T) {
proxyResult = &engineResponse{}
err = rpcClient.CallContext(ctx, proxyResult, method)
require.NoError(t, err)
require.DeepEqual(t, wantInterceptedResponse, proxyResult)
require.DeepEqual(t, wantInterceptedResponse(), proxyResult)
})
t.Run("triggers interceptor response correctly", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
@@ -230,7 +234,9 @@ func TestProxy_CustomInterceptors(t *testing.T) {
method := "engine_newPayloadV1"
// RPC method to intercept.
wantInterceptedResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))}
wantInterceptedResponse := func() interface{} {
return &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))}
}
proxy.AddRequestInterceptor(
method,
wantInterceptedResponse,
@@ -249,7 +255,7 @@ func TestProxy_CustomInterceptors(t *testing.T) {
// The interception should work and we should not be getting the destination
// response but rather a custom response from the interceptor config.
require.DeepEqual(t, wantInterceptedResponse, proxyResult)
require.DeepEqual(t, wantInterceptedResponse(), proxyResult)
})
}