From 7f443e8387e0713c79d310dafd502b0799683145 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Fri, 10 Jun 2022 07:24:53 +0800 Subject: [PATCH] Add Optimistic Sync Scenario Testing (#10836) * add latest changes * fix it * add multiclient support * fix tests * Apply suggestions from code review * fix test Co-authored-by: Raul Jordan Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- testing/endtoend/BUILD.bazel | 10 +- testing/endtoend/component_handler_test.go | 2 +- testing/endtoend/components/beacon_node.go | 8 + testing/endtoend/components/eth1/helpers.go | 2 +- testing/endtoend/components/eth1/node_set.go | 8 + testing/endtoend/components/eth1/proxy.go | 32 +++- .../endtoend/components/lighthouse_beacon.go | 8 + .../components/lighthouse_validator.go | 8 + testing/endtoend/components/validator.go | 8 + testing/endtoend/deps.bzl | 11 +- testing/endtoend/endtoend_test.go | 141 +++++++++++++++--- testing/endtoend/evaluators/BUILD.bazel | 1 + .../endtoend/evaluators/execution_engine.go | 59 ++++++++ testing/endtoend/mainnet_scenario_e2e_test.go | 7 + testing/endtoend/minimal_scenario_e2e_test.go | 8 + testing/endtoend/types/types.go | 14 +- testing/middleware/engine-api-proxy/proxy.go | 92 ++++++++---- .../middleware/engine-api-proxy/proxy_test.go | 16 +- 18 files changed, 363 insertions(+), 72 deletions(-) diff --git a/testing/endtoend/BUILD.bazel b/testing/endtoend/BUILD.bazel index bd663dd4b3..5581b86fe4 100644 --- a/testing/endtoend/BUILD.bazel +++ b/testing/endtoend/BUILD.bazel @@ -19,6 +19,8 @@ common_deps = [ "//io/file:go_default_library", "//proto/eth/service:go_default_library", "//proto/eth/v1:go_default_library", + "//math:go_default_library", + "//proto/engine/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//testing/assert:go_default_library", "//testing/endtoend/components:go_default_library", @@ -64,6 +66,7 @@ go_test( "@web3signer", ], eth_network = "minimal", + flaky = True, shard_count = 2, tags = [ "e2e", @@ -72,7 +75,6 @@ go_test( "requires-network", ], deps = common_deps, - flaky = True, ) go_test( @@ -96,6 +98,7 @@ go_test( "@web3signer", ], eth_network = "mainnet", + flaky = True, shard_count = 2, tags = [ "e2e", @@ -105,7 +108,6 @@ go_test( "requires-network", ], deps = common_deps, - flaky = True, ) go_test( @@ -129,6 +131,7 @@ go_test( "@web3signer", ], eth_network = "mainnet", + flaky = True, shard_count = 2, tags = [ "exclusive", @@ -138,7 +141,6 @@ go_test( "scenario", ], deps = common_deps, - flaky = True, ) go_test( @@ -162,6 +164,7 @@ go_test( "@web3signer", ], eth_network = "minimal", + flaky = True, shard_count = 2, tags = [ "exclusive", @@ -171,5 +174,4 @@ go_test( "scenario", ], deps = common_deps, - flaky = True, ) diff --git a/testing/endtoend/component_handler_test.go b/testing/endtoend/component_handler_test.go index 0a7cf5dd13..d1cc7885fa 100644 --- a/testing/endtoend/component_handler_test.go +++ b/testing/endtoend/component_handler_test.go @@ -26,7 +26,7 @@ type componentHandler struct { web3Signer e2etypes.ComponentRunner bootnode e2etypes.ComponentRunner eth1Miner e2etypes.ComponentRunner - eth1Proxy e2etypes.ComponentRunner + eth1Proxy e2etypes.MultipleComponentRunners eth1Nodes e2etypes.MultipleComponentRunners beaconNodes e2etypes.MultipleComponentRunners validatorNodes e2etypes.MultipleComponentRunners diff --git a/testing/endtoend/components/beacon_node.go b/testing/endtoend/components/beacon_node.go index ca73333253..b8f9055027 100644 --- a/testing/endtoend/components/beacon_node.go +++ b/testing/endtoend/components/beacon_node.go @@ -137,6 +137,14 @@ func (s *BeaconNodeSet) StopAtIndex(i int) error { return s.nodes[i].Stop() } +// ComponentAtIndex returns the component at the provided index. +func (s *BeaconNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) { + if i >= len(s.nodes) { + return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes)) + } + return s.nodes[i], nil +} + // BeaconNode represents beacon node. type BeaconNode struct { e2etypes.ComponentRunner diff --git a/testing/endtoend/components/eth1/helpers.go b/testing/endtoend/components/eth1/helpers.go index e6dce3b3ed..c5e5c0fae1 100644 --- a/testing/endtoend/components/eth1/helpers.go +++ b/testing/endtoend/components/eth1/helpers.go @@ -28,7 +28,7 @@ var _ e2etypes.MultipleComponentRunners = (*NodeSet)(nil) var _ e2etypes.MultipleComponentRunners = (*ProxySet)(nil) var _ e2etypes.ComponentRunner = (*Miner)(nil) var _ e2etypes.ComponentRunner = (*Node)(nil) -var _ e2etypes.ComponentRunner = (*Proxy)(nil) +var _ e2etypes.EngineProxy = (*Proxy)(nil) // WaitForBlocks waits for a certain amount of blocks to be mined by the ETH1 chain before returning. func WaitForBlocks(web3 *ethclient.Client, keystore *keystore.Key, blocksToWait uint64) error { diff --git a/testing/endtoend/components/eth1/node_set.go b/testing/endtoend/components/eth1/node_set.go index d56ab61a94..70c27c31b1 100644 --- a/testing/endtoend/components/eth1/node_set.go +++ b/testing/endtoend/components/eth1/node_set.go @@ -110,3 +110,11 @@ func (s *NodeSet) StopAtIndex(i int) error { } return s.nodes[i].Stop() } + +// ComponentAtIndex returns the component at the provided index. +func (s *NodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) { + if i >= len(s.nodes) { + return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes)) + } + return s.nodes[i], nil +} diff --git a/testing/endtoend/components/eth1/proxy.go b/testing/endtoend/components/eth1/proxy.go index 54685e2538..1454ab9cbe 100644 --- a/testing/endtoend/components/eth1/proxy.go +++ b/testing/endtoend/components/eth1/proxy.go @@ -110,12 +110,21 @@ func (s *ProxySet) StopAtIndex(i int) error { return s.proxies[i].Stop() } +// ComponentAtIndex returns the component at the provided index. +func (s *ProxySet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) { + if i >= len(s.proxies) { + return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies)) + } + return s.proxies[i], nil +} + // Proxy represents an engine-api proxy. type Proxy struct { e2etypes.ComponentRunner - started chan struct{} - index int - cancel func() + started chan struct{} + index int + engineProxy *proxy.Proxy + cancel func() } // NewProxy creates and returns an engine-api proxy. @@ -157,6 +166,7 @@ func (node *Proxy) Start(ctx context.Context) error { // Set cancel into context. ctx, cancel := context.WithCancel(ctx) node.cancel = cancel + node.engineProxy = nProxy // Mark node as ready. close(node.started) return nProxy.Start(ctx) @@ -185,6 +195,22 @@ func (node *Proxy) Stop() error { return nil } +// AddRequestInterceptor adds in a json-rpc request interceptor. +func (node *Proxy) AddRequestInterceptor(rpcMethodName string, responseGen func() interface{}, trigger func() bool) { + node.engineProxy.AddRequestInterceptor(rpcMethodName, responseGen, trigger) +} + +// RemoveRequestInterceptor removes the request interceptor for the provided method. +func (node *Proxy) RemoveRequestInterceptor(rpcMethodName string) { + node.engineProxy.RemoveRequestInterceptor(rpcMethodName) +} + +// ReleaseBackedUpRequests releases backed up http requests which +// were previously ignored due to our interceptors. +func (node *Proxy) ReleaseBackedUpRequests(rpcMethodName string) { + node.engineProxy.ReleaseBackedUpRequests(rpcMethodName) +} + func parseJWTSecretFromFile(jwtSecretFile string) ([]byte, error) { enc, err := file.ReadFileAsBytes(jwtSecretFile) if err != nil { diff --git a/testing/endtoend/components/lighthouse_beacon.go b/testing/endtoend/components/lighthouse_beacon.go index 51b42e46dd..102781f2a5 100644 --- a/testing/endtoend/components/lighthouse_beacon.go +++ b/testing/endtoend/components/lighthouse_beacon.go @@ -127,6 +127,14 @@ func (s *LighthouseBeaconNodeSet) StopAtIndex(i int) error { return s.nodes[i].Stop() } +// ComponentAtIndex returns the component at the provided index. +func (s *LighthouseBeaconNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) { + if i >= len(s.nodes) { + return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes)) + } + return s.nodes[i], nil +} + // LighthouseBeaconNode represents a lighthouse beacon node. type LighthouseBeaconNode struct { e2etypes.ComponentRunner diff --git a/testing/endtoend/components/lighthouse_validator.go b/testing/endtoend/components/lighthouse_validator.go index b86a6932a0..d5921c7f68 100644 --- a/testing/endtoend/components/lighthouse_validator.go +++ b/testing/endtoend/components/lighthouse_validator.go @@ -131,6 +131,14 @@ func (s *LighthouseValidatorNodeSet) StopAtIndex(i int) error { return s.nodes[i].Stop() } +// ComponentAtIndex returns the component at the provided index. +func (s *LighthouseValidatorNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) { + if i >= len(s.nodes) { + return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes)) + } + return s.nodes[i], nil +} + // LighthouseValidatorNode represents a lighthouse validator node. type LighthouseValidatorNode struct { e2etypes.ComponentRunner diff --git a/testing/endtoend/components/validator.go b/testing/endtoend/components/validator.go index 6eb0d117af..c8fa0e3e71 100644 --- a/testing/endtoend/components/validator.go +++ b/testing/endtoend/components/validator.go @@ -138,6 +138,14 @@ func (s *ValidatorNodeSet) StopAtIndex(i int) error { return s.nodes[i].Stop() } +// ComponentAtIndex returns the component at the provided index. +func (s *ValidatorNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) { + if i >= len(s.nodes) { + return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes)) + } + return s.nodes[i], nil +} + // ValidatorNode represents a validator node. type ValidatorNode struct { e2etypes.ComponentRunner diff --git a/testing/endtoend/deps.bzl b/testing/endtoend/deps.bzl index 646ff879d0..9ed31ccf6e 100644 --- a/testing/endtoend/deps.bzl +++ b/testing/endtoend/deps.bzl @@ -1,6 +1,6 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") # gazelle:keep -lighthouse_version = "v2.1.4" +lighthouse_version = "v2.3.0" lighthouse_archive_name = "lighthouse-%s-x86_64-unknown-linux-gnu-portable.tar.gz" % lighthouse_version def e2e_deps(): @@ -14,12 +14,7 @@ def e2e_deps(): http_archive( name = "lighthouse", - sha256 = "236883a4827037d96636aa259eef8cf3abc54c795adc18c4c2880842e09c743c", + sha256 = "6029acd211f269bcf41f86fd72fe540703a167ff96dfd952ff95b1693e3b5495", build_file = "@prysm//testing/endtoend:lighthouse.BUILD", - # url = ("https://github.com/sigp/lighthouse/releases/download/%s/" + lighthouse_archive_name) % lighthouse_version, - # This is a compiled version of lighthouse from their `unstable` branch at this commit - # https://github.com/sigp/lighthouse/commit/99bb55472c278a1050f7679b2e018546ad3a28bf. Lighthouse does not have support - # for all the merge features as of their latest release, so this is a temporary compromise to allow multiclient test - # runs till their official release includes the required merge features in. - url = "https://prysmaticlabs.com/uploads/misc/lighthouse-99bb5547.tar.xz", + url = ("https://github.com/sigp/lighthouse/releases/download/%s/" + lighthouse_archive_name) % lighthouse_version, ) diff --git a/testing/endtoend/endtoend_test.go b/testing/endtoend/endtoend_test.go index 4e6de0f026..ad5213a694 100644 --- a/testing/endtoend/endtoend_test.go +++ b/testing/endtoend/endtoend_test.go @@ -17,6 +17,7 @@ import ( "github.com/prysmaticlabs/prysm/api/client/beacon" "github.com/prysmaticlabs/prysm/encoding/bytesutil" "github.com/prysmaticlabs/prysm/io/file" + enginev1 "github.com/prysmaticlabs/prysm/proto/engine/v1" "github.com/prysmaticlabs/prysm/proto/eth/service" v1 "github.com/prysmaticlabs/prysm/proto/eth/v1" "google.golang.org/grpc/codes" @@ -149,26 +150,10 @@ func (r *testRunner) runEvaluators(conns []*grpc.ClientConn, tickingStartTime ti secondsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot)) ticker := helpers.NewEpochTicker(tickingStartTime, secondsPerEpoch) for currentEpoch := range ticker.C() { - if config.EvalInterceptor(currentEpoch) { + if config.EvalInterceptor(currentEpoch, conns) { continue } - wg := new(sync.WaitGroup) - for _, eval := range config.Evaluators { - // Fix reference to evaluator as it will be running - // in a separate goroutine. - evaluator := eval - // Only run if the policy says so. - if !evaluator.Policy(types.Epoch(currentEpoch)) { - continue - } - wg.Add(1) - go t.Run(fmt.Sprintf(evaluator.Name, currentEpoch), func(t *testing.T) { - err := evaluator.Evaluation(conns...) - assert.NoError(t, err, "Evaluation failed for epoch %d: %v", currentEpoch, err) - wg.Done() - }) - } - wg.Wait() + r.executeProvidedEvaluators(currentEpoch, conns, config.Evaluators) if t.Failed() || currentEpoch >= config.EpochsToRun-1 { ticker.Done() @@ -558,7 +543,27 @@ func (r *testRunner) addEvent(ev func() error) { r.comHandler.group.Go(ev) } -func (r *testRunner) singleNodeOffline(epoch uint64) bool { +func (r *testRunner) executeProvidedEvaluators(currentEpoch uint64, conns []*grpc.ClientConn, evals []e2etypes.Evaluator) { + wg := new(sync.WaitGroup) + for _, eval := range evals { + // Fix reference to evaluator as it will be running + // in a separate goroutine. + evaluator := eval + // Only run if the policy says so. + if !evaluator.Policy(types.Epoch(currentEpoch)) { + continue + } + wg.Add(1) + go r.t.Run(fmt.Sprintf(evaluator.Name, currentEpoch), func(t *testing.T) { + err := evaluator.Evaluation(conns...) + assert.NoError(t, err, "Evaluation failed for epoch %d: %v", currentEpoch, err) + wg.Done() + }) + } + wg.Wait() +} + +func (r *testRunner) singleNodeOffline(epoch uint64, _ []*grpc.ClientConn) bool { switch epoch { case 9: require.NoError(r.t, r.comHandler.beaconNodes.PauseAtIndex(0)) @@ -575,7 +580,7 @@ func (r *testRunner) singleNodeOffline(epoch uint64) bool { return false } -func (r *testRunner) singleNodeOfflineMulticlient(epoch uint64) bool { +func (r *testRunner) singleNodeOfflineMulticlient(epoch uint64, _ []*grpc.ClientConn) bool { switch epoch { case 9: require.NoError(r.t, r.comHandler.beaconNodes.PauseAtIndex(0)) @@ -596,7 +601,7 @@ func (r *testRunner) singleNodeOfflineMulticlient(epoch uint64) bool { return false } -func (r *testRunner) eeOffline(epoch uint64) bool { +func (r *testRunner) eeOffline(epoch uint64, _ []*grpc.ClientConn) bool { switch epoch { case 9: require.NoError(r.t, r.comHandler.eth1Miner.Pause()) @@ -611,7 +616,7 @@ func (r *testRunner) eeOffline(epoch uint64) bool { return false } -func (r *testRunner) allValidatorsOffline(epoch uint64) bool { +func (r *testRunner) allValidatorsOffline(epoch uint64, _ []*grpc.ClientConn) bool { switch epoch { case 9: require.NoError(r.t, r.comHandler.validatorNodes.PauseAtIndex(0)) @@ -628,7 +633,95 @@ func (r *testRunner) allValidatorsOffline(epoch uint64) bool { return false } -// All Epochs are valid. -func defaultInterceptor(_ uint64) bool { +func (r *testRunner) optimisticSync(epoch uint64, conns []*grpc.ClientConn) bool { + switch epoch { + case 9: + component, err := r.comHandler.eth1Proxy.ComponentAtIndex(0) + require.NoError(r.t, err) + component.(e2etypes.EngineProxy).AddRequestInterceptor("engine_newPayloadV1", func() interface{} { + return &enginev1.PayloadStatus{ + Status: enginev1.PayloadStatus_SYNCING, + LatestValidHash: make([]byte, 32), + } + }, func() bool { + return true + }) + return true + case 10: + r.executeProvidedEvaluators(epoch, []*grpc.ClientConn{conns[0]}, []e2etypes.Evaluator{ + ev.OptimisticSyncEnabled, + }) + // Disable Interceptor + component, err := r.comHandler.eth1Proxy.ComponentAtIndex(0) + require.NoError(r.t, err) + engineProxy, ok := component.(e2etypes.EngineProxy) + require.Equal(r.t, true, ok) + engineProxy.RemoveRequestInterceptor("engine_newPayloadV1") + engineProxy.ReleaseBackedUpRequests("engine_newPayloadV1") + + return true + case 11, 12: + // Allow 2 epochs for the network to finalize again. + return true + } + return false +} + +func (r *testRunner) optimisticSyncMulticlient(epoch uint64, conns []*grpc.ClientConn) bool { + switch epoch { + case 9: + // Set it for prysm beacon node. + component, err := r.comHandler.eth1Proxy.ComponentAtIndex(0) + require.NoError(r.t, err) + component.(e2etypes.EngineProxy).AddRequestInterceptor("engine_newPayloadV1", func() interface{} { + return &enginev1.PayloadStatus{ + Status: enginev1.PayloadStatus_SYNCING, + LatestValidHash: make([]byte, 32), + } + }, func() bool { + return true + }) + // Set it for lighthouse beacon node. + component, err = r.comHandler.eth1Proxy.ComponentAtIndex(2) + require.NoError(r.t, err) + component.(e2etypes.EngineProxy).AddRequestInterceptor("engine_newPayloadV1", func() interface{} { + return &enginev1.PayloadStatus{ + Status: enginev1.PayloadStatus_SYNCING, + LatestValidHash: make([]byte, 32), + } + }, func() bool { + return true + }) + return true + case 10: + r.executeProvidedEvaluators(epoch, []*grpc.ClientConn{conns[0]}, []e2etypes.Evaluator{ + ev.OptimisticSyncEnabled, + }) + // Disable Interceptor + component, err := r.comHandler.eth1Proxy.ComponentAtIndex(0) + require.NoError(r.t, err) + engineProxy, ok := component.(e2etypes.EngineProxy) + require.Equal(r.t, true, ok) + engineProxy.RemoveRequestInterceptor("engine_newPayloadV1") + engineProxy.ReleaseBackedUpRequests("engine_newPayloadV1") + + // Remove for lighthouse too + component, err = r.comHandler.eth1Proxy.ComponentAtIndex(2) + require.NoError(r.t, err) + engineProxy, ok = component.(e2etypes.EngineProxy) + require.Equal(r.t, true, ok) + engineProxy.RemoveRequestInterceptor("engine_newPayloadV1") + engineProxy.ReleaseBackedUpRequests("engine_newPayloadV1") + + return true + case 11, 12: + // Allow 2 epochs for the network to finalize again. + return true + } + return false +} + +// All Epochs are valid. +func defaultInterceptor(_ uint64, _ []*grpc.ClientConn) bool { return false } diff --git a/testing/endtoend/evaluators/BUILD.bazel b/testing/endtoend/evaluators/BUILD.bazel index 4dabca2445..d14d61493a 100644 --- a/testing/endtoend/evaluators/BUILD.bazel +++ b/testing/endtoend/evaluators/BUILD.bazel @@ -31,6 +31,7 @@ go_library( "//consensus-types/wrapper:go_default_library", "//container/slice:go_default_library", "//encoding/bytesutil:go_default_library", + "//math:go_default_library", "//network/forks:go_default_library", "//proto/eth/service:go_default_library", "//proto/eth/v1:go_default_library", diff --git a/testing/endtoend/evaluators/execution_engine.go b/testing/endtoend/evaluators/execution_engine.go index a5aa1de4df..bae1072b48 100644 --- a/testing/endtoend/evaluators/execution_engine.go +++ b/testing/endtoend/evaluators/execution_engine.go @@ -3,14 +3,20 @@ package evaluators import ( "context" "math" + "strconv" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/config/params" + ctypes "github.com/prysmaticlabs/prysm/consensus-types/primitives" + mathutil "github.com/prysmaticlabs/prysm/math" + "github.com/prysmaticlabs/prysm/proto/eth/service" + v2 "github.com/prysmaticlabs/prysm/proto/eth/v2" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/testing/endtoend/helpers" e2e "github.com/prysmaticlabs/prysm/testing/endtoend/params" "github.com/prysmaticlabs/prysm/testing/endtoend/policies" "github.com/prysmaticlabs/prysm/testing/endtoend/types" + "github.com/prysmaticlabs/prysm/time/slots" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" ) @@ -23,6 +29,13 @@ var TransactionsPresent = types.Evaluator{ Evaluation: transactionsPresent, } +// OptimisticSyncEnabled checks that the node is in an optimistic state. +var OptimisticSyncEnabled = types.Evaluator{ + Name: "optimistic_sync_at_epoch_%d", + Policy: policies.AllEpochs, + Evaluation: optimisticSyncEnabled, +} + func transactionsPresent(conns ...*grpc.ClientConn) error { conn := conns[0] client := ethpb.NewBeaconChainClient(conn) @@ -52,3 +65,49 @@ func transactionsPresent(conns ...*grpc.ClientConn) error { } return nil } + +func optimisticSyncEnabled(conns ...*grpc.ClientConn) error { + for _, conn := range conns { + client := service.NewBeaconChainClient(conn) + head, err := client.GetBlockV2(context.Background(), &v2.BlockRequestV2{BlockId: []byte("head")}) + if err != nil { + return err + } + headSlot := uint64(0) + switch hb := head.Data.Message.(type) { + case *v2.SignedBeaconBlockContainerV2_Phase0Block: + headSlot = uint64(hb.Phase0Block.Slot) + case *v2.SignedBeaconBlockContainerV2_AltairBlock: + headSlot = uint64(hb.AltairBlock.Slot) + case *v2.SignedBeaconBlockContainerV2_BellatrixBlock: + headSlot = uint64(hb.BellatrixBlock.Slot) + default: + return errors.New("no valid block type retrieved") + } + currEpoch := slots.ToEpoch(ctypes.Slot(headSlot)) + startSlot, err := slots.EpochStart(currEpoch) + if err != nil { + return err + } + isOptimistic := false + for i := startSlot; i <= ctypes.Slot(headSlot); i++ { + castI, err := mathutil.Int(uint64(i)) + if err != nil { + return err + } + block, err := client.GetBlockV2(context.Background(), &v2.BlockRequestV2{BlockId: []byte(strconv.Itoa(castI))}) + if err != nil { + // Continue in the event of non-existent blocks. + continue + } + if !block.ExecutionOptimistic { + return errors.New("expected block to be optimistic, but it is not") + } + isOptimistic = true + } + if !isOptimistic { + return errors.New("expected block to be optimistic, but it is not") + } + } + return nil +} diff --git a/testing/endtoend/mainnet_scenario_e2e_test.go b/testing/endtoend/mainnet_scenario_e2e_test.go index cf8dbe2f45..1214fba379 100644 --- a/testing/endtoend/mainnet_scenario_e2e_test.go +++ b/testing/endtoend/mainnet_scenario_e2e_test.go @@ -14,3 +14,10 @@ func TestEndToEnd_ScenarioRun_BeaconOffline_Multiclient(t *testing.T) { runner.config.EvalInterceptor = runner.singleNodeOffline runner.scenarioRunner() } + +func TestEndToEnd_ScenarioRun_OptimisticSync_Multiclient(t *testing.T) { + runner := e2eMainnet(t, false /*usePrysmSh*/, true /*useMultiClient*/) + runner.config.Evaluators = scenarioEvalsMulti() + runner.config.EvalInterceptor = runner.optimisticSyncMulticlient + runner.scenarioRunner() +} diff --git a/testing/endtoend/minimal_scenario_e2e_test.go b/testing/endtoend/minimal_scenario_e2e_test.go index 6c305a93f2..82c4bbd016 100644 --- a/testing/endtoend/minimal_scenario_e2e_test.go +++ b/testing/endtoend/minimal_scenario_e2e_test.go @@ -26,3 +26,11 @@ func TestEndToEnd_ScenarioRun_EEOffline(t *testing.T) { runner.config.EvalInterceptor = runner.eeOffline runner.scenarioRunner() } + +func TestEndToEnd_ScenarioRun_OptimisticSync(t *testing.T) { + runner := e2eMinimal(t) + + runner.config.Evaluators = scenarioEvals() + runner.config.EvalInterceptor = runner.optimisticSync + runner.scenarioRunner() +} diff --git a/testing/endtoend/types/types.go b/testing/endtoend/types/types.go index 2edfec98e5..384740f61b 100644 --- a/testing/endtoend/types/types.go +++ b/testing/endtoend/types/types.go @@ -44,7 +44,7 @@ type E2EConfig struct { Seed int64 TracingSinkEndpoint string Evaluators []Evaluator - EvalInterceptor func(uint64) bool + EvalInterceptor func(uint64, []*grpc.ClientConn) bool BeaconFlags []string ValidatorFlags []string PeerIDs []string @@ -75,6 +75,8 @@ type ComponentRunner interface { type MultipleComponentRunners interface { ComponentRunner + // ComponentAtIndex returns the component at index + ComponentAtIndex(i int) (ComponentRunner, error) // PauseAtIndex pauses the grouped component element at the desired index. PauseAtIndex(i int) error // ResumeAtIndex resumes the grouped component element at the desired index. @@ -83,6 +85,16 @@ type MultipleComponentRunners interface { StopAtIndex(i int) error } +type EngineProxy interface { + ComponentRunner + // AddRequestInterceptor adds in a json-rpc request interceptor. + AddRequestInterceptor(rpcMethodName string, responseGen func() interface{}, trigger func() bool) + // RemoveRequestInterceptor removes the request interceptor for the provided method. + RemoveRequestInterceptor(rpcMethodName string) + // ReleaseBackedUpRequests releases backed up http requests. + ReleaseBackedUpRequests(rpcMethodName string) +} + // BeaconNodeSet defines an interface for an object that fulfills the duties // of a group of beacon nodes. type BeaconNodeSet interface { diff --git a/testing/middleware/engine-api-proxy/proxy.go b/testing/middleware/engine-api-proxy/proxy.go index 4715371546..5806a6b43f 100644 --- a/testing/middleware/engine-api-proxy/proxy.go +++ b/testing/middleware/engine-api-proxy/proxy.go @@ -34,8 +34,8 @@ type jsonRPCObject struct { } type interceptorConfig struct { - response interface{} - trigger func() bool + responseGen func() interface{} + trigger func() bool } // Proxy server that sits as a middleware between an Ethereum consensus client and an execution client, @@ -46,7 +46,7 @@ type Proxy struct { srv *http.Server lock sync.RWMutex interceptors map[string]*interceptorConfig - backedUpRequests []*http.Request + backedUpRequests map[string][]*http.Request } // New creates a proxy server forwarding requests from a consensus client to an execution client. @@ -57,7 +57,8 @@ func New(opts ...Option) (*Proxy, error) { proxyPort: defaultProxyPort, logger: logrus.New(), }, - interceptors: make(map[string]*interceptorConfig), + interceptors: make(map[string]*interceptorConfig), + backedUpRequests: map[string][]*http.Request{}, } for _, o := range opts { if err := o(p); err != nil { @@ -112,7 +113,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } // Check if we need to intercept the request with a custom response. - hasIntercepted, err := p.interceptIfNeeded(requestBytes, w) + hasIntercepted, err := p.interceptIfNeeded(requestBytes, w, r) if err != nil { p.cfg.logger.WithError(err).Error("Could not intercept request") return @@ -126,7 +127,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { // AddRequestInterceptor for a desired json-rpc method by specifying a custom response // and a function that checks if the interceptor should be triggered. -func (p *Proxy) AddRequestInterceptor(rpcMethodName string, response interface{}, trigger func() bool) { +func (p *Proxy) AddRequestInterceptor(rpcMethodName string, response func() interface{}, trigger func() bool) { p.lock.Lock() defer p.lock.Unlock() p.cfg.logger.Infof("Adding in interceptor for method %s", rpcMethodName) @@ -136,9 +137,40 @@ func (p *Proxy) AddRequestInterceptor(rpcMethodName string, response interface{} } } +// RemoveRequestInterceptor removes the request interceptor for the provided method. +func (p *Proxy) RemoveRequestInterceptor(rpcMethodName string) { + p.lock.Lock() + defer p.lock.Unlock() + p.cfg.logger.Infof("Removing interceptor for method %s", rpcMethodName) + delete(p.interceptors, rpcMethodName) +} + +// ReleaseBackedUpRequests releases backed up http requests which +// were previously ignored due to our interceptors. +func (p *Proxy) ReleaseBackedUpRequests(rpcMethodName string) { + p.lock.Lock() + defer p.lock.Unlock() + reqs := p.backedUpRequests[rpcMethodName] + for _, r := range reqs { + p.cfg.logger.Infof("Sending backed up request for method %s", rpcMethodName) + rBytes, err := parseRequestBytes(r) + if err != nil { + p.cfg.logger.Error(err) + continue + } + res, err := p.sendHttpRequest(r, rBytes) + if err != nil { + p.cfg.logger.Error(err) + continue + } + p.cfg.logger.Infof("Received response %s for backed up request for method %s", http.StatusText(res.StatusCode), rpcMethodName) + } + delete(p.backedUpRequests, rpcMethodName) +} + // Checks if there is a custom interceptor hook on the request, check if it can be // triggered, and then write the custom response to the writer. -func (p *Proxy) interceptIfNeeded(requestBytes []byte, w http.ResponseWriter) (hasIntercepted bool, err error) { +func (p *Proxy) interceptIfNeeded(requestBytes []byte, w http.ResponseWriter, r *http.Request) (hasIntercepted bool, err error) { if !isEngineAPICall(requestBytes) { return } @@ -159,12 +191,13 @@ func (p *Proxy) interceptIfNeeded(requestBytes []byte, w http.ResponseWriter) (h jResp := &jsonRPCObject{ Method: jreq.Method, ID: jreq.ID, - Result: interceptor.response, + Result: interceptor.responseGen(), } if err = json.NewEncoder(w).Encode(jResp); err != nil { return } hasIntercepted = true + p.backedUpRequests[jreq.Method] = append(p.backedUpRequests[jreq.Method], r) return } @@ -177,18 +210,39 @@ func (p *Proxy) proxyRequest(requestBytes []byte, w http.ResponseWriter, r *http jreq = &jsonRPCObject{Method: "unknown"} } p.cfg.logger.Infof("Forwarding %s request for method %s to %s", r.Method, jreq.Method, p.cfg.destinationUrl.String()) - proxyReq, err := http.NewRequest(r.Method, p.cfg.destinationUrl.String(), r.Body) + proxyRes, err := p.sendHttpRequest(r, requestBytes) if err != nil { p.cfg.logger.WithError(err).Error("Could create new request") return } + p.cfg.logger.Infof("Received response for %s request with method %s from %s", r.Method, jreq.Method, p.cfg.destinationUrl.String()) + + defer func() { + if err = proxyRes.Body.Close(); err != nil { + p.cfg.logger.WithError(err).Error("Could not do close proxy responseGen body") + } + }() + + // Pipe the proxy responseGen to the original caller. + if _, err = io.Copy(w, proxyRes.Body); err != nil { + p.cfg.logger.WithError(err).Error("Could not copy proxy request body") + return + } +} + +func (p *Proxy) sendHttpRequest(req *http.Request, requestBytes []byte) (*http.Response, error) { + proxyReq, err := http.NewRequest(req.Method, p.cfg.destinationUrl.String(), req.Body) + if err != nil { + p.cfg.logger.WithError(err).Error("Could not create new request") + return nil, err + } // Set the modified request as the proxy request body. proxyReq.Body = ioutil.NopCloser(bytes.NewBuffer(requestBytes)) // Required proxy headers for forwarding JSON-RPC requests to the execution client. - proxyReq.Header.Set("Host", r.Host) - proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) + proxyReq.Header.Set("Host", req.Host) + proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr) proxyReq.Header.Set("Content-Type", "application/json") client := &http.Client{} @@ -198,21 +252,9 @@ func (p *Proxy) proxyRequest(requestBytes []byte, w http.ResponseWriter, r *http proxyRes, err := client.Do(proxyReq) if err != nil { p.cfg.logger.WithError(err).Error("Could not forward request to destination server") - return - } - p.cfg.logger.Infof("Received response for %s request with method %s from %s", r.Method, jreq.Method, p.cfg.destinationUrl.String()) - - defer func() { - if err = proxyRes.Body.Close(); err != nil { - p.cfg.logger.WithError(err).Error("Could not do close proxy response body") - } - }() - - // Pipe the proxy response to the original caller. - if _, err = io.Copy(w, proxyRes.Body); err != nil { - p.cfg.logger.WithError(err).Error("Could not copy proxy request body") - return + return nil, err } + return proxyRes, nil } // Peek into the bytes of an HTTP request's body. diff --git a/testing/middleware/engine-api-proxy/proxy_test.go b/testing/middleware/engine-api-proxy/proxy_test.go index 90a2051f66..3bb7e526a0 100644 --- a/testing/middleware/engine-api-proxy/proxy_test.go +++ b/testing/middleware/engine-api-proxy/proxy_test.go @@ -116,7 +116,9 @@ func TestProxy_CustomInterceptors(t *testing.T) { // RPC method to intercept. proxy.AddRequestInterceptor( method, - &syncingResponse{Syncing: false}, // Custom response. + func() interface{} { + return &syncingResponse{Syncing: false} + }, // Custom response. func() bool { return true // Always intercept with a custom response. }, @@ -164,7 +166,9 @@ func TestProxy_CustomInterceptors(t *testing.T) { method := "engine_newPayloadV1" // RPC method to intercept. - wantInterceptedResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))} + wantInterceptedResponse := func() interface{} { + return &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))} + } conditional := false proxy.AddRequestInterceptor( method, @@ -199,7 +203,7 @@ func TestProxy_CustomInterceptors(t *testing.T) { proxyResult = &engineResponse{} err = rpcClient.CallContext(ctx, proxyResult, method) require.NoError(t, err) - require.DeepEqual(t, wantInterceptedResponse, proxyResult) + require.DeepEqual(t, wantInterceptedResponse(), proxyResult) }) t.Run("triggers interceptor response correctly", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -230,7 +234,9 @@ func TestProxy_CustomInterceptors(t *testing.T) { method := "engine_newPayloadV1" // RPC method to intercept. - wantInterceptedResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))} + wantInterceptedResponse := func() interface{} { + return &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))} + } proxy.AddRequestInterceptor( method, wantInterceptedResponse, @@ -249,7 +255,7 @@ func TestProxy_CustomInterceptors(t *testing.T) { // The interception should work and we should not be getting the destination // response but rather a custom response from the interceptor config. - require.DeepEqual(t, wantInterceptedResponse, proxyResult) + require.DeepEqual(t, wantInterceptedResponse(), proxyResult) }) }