diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index dfc0d622de..04cda5fc3e 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -91,6 +91,7 @@ go_library( "//shared/sszutil:go_default_library", "//shared/timeutils:go_default_library", "//shared/traceutil:go_default_library", + "//shared/version:go_default_library", "@com_github_hashicorp_golang_lru//:go_default_library", "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p_core//:go_default_library", @@ -123,6 +124,7 @@ go_test( "rate_limiter_test.go", "rpc_beacon_blocks_by_range_test.go", "rpc_beacon_blocks_by_root_test.go", + "rpc_chunked_response_test.go", "rpc_goodbye_test.go", "rpc_metadata_test.go", "rpc_ping_test.go", @@ -148,6 +150,7 @@ go_test( embed = [":go_default_library"], shard_count = 4, deps = [ + "//beacon-chain/blockchain:go_default_library", "//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/cache:go_default_library", "//beacon-chain/core/altair:go_default_library", diff --git a/beacon-chain/sync/context.go b/beacon-chain/sync/context.go index 21db7ff08c..785e9708cf 100644 --- a/beacon-chain/sync/context.go +++ b/beacon-chain/sync/context.go @@ -1,15 +1,21 @@ package sync import ( - "errors" - "github.com/libp2p/go-libp2p-core/network" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" ) +// Specifies the fixed size context length. +const forkDigestLength = 4 + // writes peer's current context for the expected payload to the stream. -func writeContextToStream(stream network.Stream, chain blockchain.ChainInfoFetcher) error { +func writeContextToStream(objCtx []byte, stream network.Stream, chain blockchain.ChainInfoFetcher) error { + // The rpc context for our v2 methods is the fork-digest of + // the relevant payload. We write the associated fork-digest(context) + // into the stream for the payload. rpcCtx, err := rpcContext(stream, chain) if err != nil { return err @@ -18,6 +24,10 @@ func writeContextToStream(stream network.Stream, chain blockchain.ChainInfoFetch if len(rpcCtx) == 0 { return nil } + // Always choose the object's context when writing to the stream. + if objCtx != nil { + rpcCtx = objCtx + } _, err = stream.Write(rpcCtx) return err } @@ -32,7 +42,7 @@ func readContextFromStream(stream network.Stream, chain blockchain.ChainInfoFetc return []byte{}, nil } // Read context (fork-digest) from stream - b := make([]byte, 4) + b := make([]byte, forkDigestLength) if _, err := stream.Read(b); err != nil { return nil, err } @@ -49,6 +59,14 @@ func rpcContext(stream network.Stream, chain blockchain.ChainInfoFetcher) ([]byt case p2p.SchemaVersionV1: // Return empty context for a v1 method. return []byte{}, nil + case p2p.SchemaVersionV2: + currFork := chain.CurrentFork() + genRoot := chain.GenesisValidatorRoot() + digest, err := helpers.ComputeForkDigest(currFork.CurrentVersion, genRoot[:]) + if err != nil { + return nil, err + } + return digest[:], nil default: return nil, errors.New("invalid version of %s registered for topic: %s") } diff --git a/beacon-chain/sync/context_test.go b/beacon-chain/sync/context_test.go index c70e3d9a5e..169e8232a0 100644 --- a/beacon-chain/sync/context_test.go +++ b/beacon-chain/sync/context_test.go @@ -30,7 +30,7 @@ func TestContextWrite_NoWrites(t *testing.T) { assert.NoError(t, err) // Nothing will be written to the stream - assert.NoError(t, writeContextToStream(strm, nil)) + assert.NoError(t, writeContextToStream(nil, strm, nil)) if testutil.WaitTimeout(wg, 1*time.Second) { t.Fatal("Did not receive stream within 1 sec") } diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 43e2b27e50..2c975e6b49 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -281,6 +281,8 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) { FinalizedCheckPoint: ð.Checkpoint{ Epoch: 0, }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, } ctx, cancel := context.WithCancel(context.Background()) @@ -414,7 +416,8 @@ func TestBlocksFetcher_handleRequest(t *testing.T) { } mc, p2p, _ := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers) - + mc.ValidatorsRoot = [32]byte{} + mc.Genesis = time.Now() t.Run("context cancellation", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ @@ -547,7 +550,7 @@ func TestBlocksFetcher_RequestBlocksRateLimitingLocks(t *testing.T) { defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1}) fetcher.rateLimiter = leakybucket.NewCollector(float64(req.Count), int64(req.Count*burstFactor), false) - + fetcher.chain = &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} hook := logTest.NewGlobal() wg := new(sync.WaitGroup) wg.Add(1) @@ -610,7 +613,8 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += types.Slot(req.Step) { blk := testutil.NewBeaconBlock() blk.Block.Slot = i - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + mchain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + assert.NoError(t, beaconsync.WriteBlockChunk(stream, mchain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) } assert.NoError(t, stream.Close()) } @@ -631,7 +635,8 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step+1); i += types.Slot(req.Step) { blk := testutil.NewBeaconBlock() blk.Block.Slot = i - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) } assert.NoError(t, stream.Close()) } @@ -652,11 +657,12 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) return func(stream network.Stream) { blk := testutil.NewBeaconBlock() blk.Block.Slot = 163 - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) blk = testutil.NewBeaconBlock() blk.Block.Slot = 162 - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) assert.NoError(t, stream.Close()) } }, @@ -676,11 +682,13 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) return func(stream network.Stream) { blk := testutil.NewBeaconBlock() blk.Block.Slot = 160 - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) blk = testutil.NewBeaconBlock() blk.Block.Slot = 160 - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) assert.NoError(t, stream.Close()) } }, @@ -703,14 +711,15 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) }() for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += types.Slot(req.Step) { blk := testutil.NewBeaconBlock() + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} // Patch mid block, with invalid slot number. if i == req.StartSlot.Add(req.Count*req.Step/2) { blk.Block.Slot = req.StartSlot - 1 - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) break } else { blk.Block.Slot = i - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) } } } @@ -734,14 +743,15 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) }() for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += types.Slot(req.Step) { blk := testutil.NewBeaconBlock() + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} // Patch mid block, with invalid slot number. if i == req.StartSlot.Add(req.Count*req.Step/2) { blk.Block.Slot = req.StartSlot.Add(req.Count * req.Step) - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) break } else { blk.Block.Slot = i - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) } } } @@ -762,11 +772,12 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) return func(stream network.Stream) { blk := testutil.NewBeaconBlock() blk.Block.Slot = 100 - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) blk = testutil.NewBeaconBlock() blk.Block.Slot = 105 - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) assert.NoError(t, stream.Close()) } }, @@ -785,11 +796,12 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) return func(stream network.Stream) { blk := testutil.NewBeaconBlock() blk.Block.Slot = 100 - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) blk = testutil.NewBeaconBlock() blk.Block.Slot = 103 - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk))) assert.NoError(t, stream.Close()) } }, @@ -805,7 +817,7 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1}) + fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1, chain: &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}}) fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 640, false) for _, tt := range tests { diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go index 68e90e748d..f0b5073c51 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go @@ -178,6 +178,8 @@ func TestBlocksFetcher_findFork(t *testing.T) { Epoch: finalizedEpoch, Root: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)), }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, } ctx, cancel := context.WithCancel(context.Background()) @@ -321,9 +323,11 @@ func TestBlocksFetcher_findForkWithPeer(t *testing.T) { st, err := testutil.NewBeaconState() require.NoError(t, err) mc := &mock.ChainService{ - State: st, - Root: genesisRoot[:], - DB: beaconDB, + State: st, + Root: genesisRoot[:], + DB: beaconDB, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, } ctx, cancel := context.WithCancel(context.Background()) @@ -438,6 +442,8 @@ func TestBlocksFetcher_findAncestor(t *testing.T) { Epoch: finalizedEpoch, Root: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)), }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, } ctx, cancel := context.WithCancel(context.Background()) diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go index 4ba30c4c85..03000bbf35 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_test.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -1052,6 +1052,8 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) { Epoch: finalizedEpoch, Root: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)), }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, } ctx, cancel := context.WithCancel(context.Background()) @@ -1251,6 +1253,8 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) { Epoch: finalizedEpoch, Root: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)), }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, } // Populate database with blocks with part of the chain, orphaned block will be added on top. diff --git a/beacon-chain/sync/initial-sync/initial_sync_test.go b/beacon-chain/sync/initial-sync/initial_sync_test.go index 64e6a1621e..e0e22a3901 100644 --- a/beacon-chain/sync/initial-sync/initial_sync_test.go +++ b/beacon-chain/sync/initial-sync/initial_sync_test.go @@ -96,6 +96,8 @@ func initializeTestServices(t *testing.T, slots []types.Slot, peers []*peerData) FinalizedCheckPoint: ð.Checkpoint{ Epoch: 0, }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, }, p, beaconDB } @@ -216,8 +218,9 @@ func connectPeer(t *testing.T, host *p2pt.TestP2P, datum *peerData, peerStatus * ret = ret[:req.Count] } + mChain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} for i := 0; i < len(ret); i++ { - assert.NoError(t, beaconsync.WriteChunk(stream, nil, p.Encoding(), ret[i])) + assert.NoError(t, beaconsync.WriteBlockChunk(stream, mChain, p.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(ret[i]))) } }) @@ -286,7 +289,8 @@ func connectPeerHavingBlocks( if uint64(i) >= uint64(len(blocks)) { break } - require.NoError(t, beaconsync.WriteChunk(stream, nil, p.Encoding(), blocks[i])) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + require.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blocks[i]))) } }) diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index 91a6c00dd8..c7e4f5961c 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -295,6 +295,8 @@ func TestService_roundRobinSync(t *testing.T) { FinalizedCheckPoint: ð.Checkpoint{ Epoch: 0, }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, } // no-op mock s := &Service{ ctx: context.Background(), @@ -338,6 +340,8 @@ func TestService_processBlock(t *testing.T) { FinalizedCheckPoint: ð.Checkpoint{ Epoch: 0, }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, }, StateNotifier: &mock.MockStateNotifier{}, }) @@ -527,6 +531,8 @@ func TestService_blockProviderScoring(t *testing.T) { Epoch: 0, Root: make([]byte, 32), }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, } // no-op mock s := &Service{ ctx: context.Background(), @@ -590,6 +596,8 @@ func TestService_syncToFinalizedEpoch(t *testing.T) { Epoch: 0, Root: make([]byte, 32), }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, } s := &Service{ ctx: context.Background(), diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index 8344b43ab2..dd90e30a08 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -58,6 +58,8 @@ func TestService_InitStartStop(t *testing.T) { FinalizedCheckPoint: ð.Checkpoint{ Epoch: 0, }, + Genesis: time.Unix(4113849600, 0), + ValidatorsRoot: [32]byte{}, } }, methodRuns: func(fd *event.Feed) { @@ -86,6 +88,8 @@ func TestService_InitStartStop(t *testing.T) { FinalizedCheckPoint: ð.Checkpoint{ Epoch: 0, }, + Genesis: time.Now().Add(-5 * time.Minute), + ValidatorsRoot: [32]byte{}, } }, methodRuns: func(fd *event.Feed) { @@ -117,6 +121,8 @@ func TestService_InitStartStop(t *testing.T) { FinalizedCheckPoint: ð.Checkpoint{ Epoch: helpers.SlotToEpoch(futureSlot), }, + Genesis: makeGenesisTime(futureSlot), + ValidatorsRoot: [32]byte{}, } }, methodRuns: func(fd *event.Feed) { @@ -150,7 +156,7 @@ func TestService_InitStartStop(t *testing.T) { defer hook.Reset() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mc := &mock.ChainService{} + mc := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} // Allow overriding with customized chain service. if tt.chainService != nil { mc = tt.chainService() @@ -211,7 +217,7 @@ func TestService_waitForStateInitialization(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - s := newService(ctx, &mock.ChainService{}) + s := newService(ctx, &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}) wg := &sync.WaitGroup{} wg.Add(1) go func() { @@ -238,7 +244,7 @@ func TestService_waitForStateInitialization(t *testing.T) { defer hook.Reset() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - s := newService(ctx, &mock.ChainService{}) + s := newService(ctx, &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}) expectedGenesisTime := time.Unix(358544700, 0) var receivedGenesisTime time.Time @@ -282,7 +288,7 @@ func TestService_waitForStateInitialization(t *testing.T) { defer hook.Reset() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - s := newService(ctx, &mock.ChainService{}) + s := newService(ctx, &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}) // Initialize mock feed _ = s.cfg.StateNotifier.StateFeed() @@ -320,7 +326,7 @@ func TestService_waitForStateInitialization(t *testing.T) { } func TestService_markSynced(t *testing.T) { - mc := &mock.ChainService{} + mc := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} ctx, cancel := context.WithCancel(context.Background()) defer cancel() s := NewService(ctx, &Config{ @@ -403,6 +409,8 @@ func TestService_Resync(t *testing.T) { FinalizedCheckPoint: ð.Checkpoint{ Epoch: helpers.SlotToEpoch(futureSlot), }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, } }, assert: func(s *Service) { diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index fd9dbda6b1..05d5091573 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -162,7 +162,7 @@ func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo if b == nil || b.IsNil() || b.Block().IsNil() { continue } - if chunkErr := s.chunkWriter(stream, b.Proto()); chunkErr != nil { + if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil { log.WithError(chunkErr).Debug("Could not send a chunked response") s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) traceutil.AnnotateError(span, chunkErr) diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index 1693879d02..70869f1f92 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -22,7 +22,6 @@ import ( "github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" - statepb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/params" @@ -635,7 +634,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { require.NoError(t, d.SaveBlock(context.Background(), wrapper.WrappedPhase0SignedBeaconBlock(blocks[j]))) j++ } - stateSummaries := make([]*statepb.StateSummary, len(blocks)) + stateSummaries := make([]*ethpb.StateSummary, len(blocks)) if finalized { if chain.CanonicalRoots == nil { @@ -644,7 +643,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { for i, b := range blocks { bRoot, err := b.Block.HashTreeRoot() require.NoError(t, err) - stateSummaries[i] = &statepb.StateSummary{ + stateSummaries[i] = ðpb.StateSummary{ Slot: b.Block.Slot, Root: bRoot[:], } @@ -685,7 +684,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { require.NoError(t, d.SaveBlock(context.Background(), wrapper.WrappedPhase0SignedBeaconBlock(blocks[j]))) j++ } - stateSummaries := make([]*statepb.StateSummary, len(blocks)) + stateSummaries := make([]*ethpb.StateSummary, len(blocks)) if finalized { if chain.CanonicalRoots == nil { chain.CanonicalRoots = map[[32]byte]bool{} @@ -693,7 +692,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { for i, b := range blocks { bRoot, err := b.Block.HashTreeRoot() require.NoError(t, err) - stateSummaries[i] = &statepb.StateSummary{ + stateSummaries[i] = ðpb.StateSummary{ Slot: b.Block.Slot, Root: bRoot[:], } diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index ba4f9ccc94..73f91b8e6b 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -71,7 +71,7 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ if blk == nil || blk.IsNil() { continue } - if err := s.chunkWriter(stream, blk.Proto()); err != nil { + if err := s.chunkBlockWriter(stream, blk); err != nil { return err } } diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index 9119dcf5e0..29e6dd768e 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -44,6 +44,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) { } r := &Service{cfg: &Config{P2P: p1, DB: d}, rateLimiter: newRateLimiter(p1)} + r.cfg.Chain = &mock.ChainService{ValidatorsRoot: [32]byte{}} pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false) @@ -104,6 +105,8 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) { State: genesisState, FinalizedCheckPoint: finalizedCheckpt, Root: blockARoot[:], + Genesis: time.Now(), + ValidatorsRoot: [32]byte{}, }, }, slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index d11ee1e787..6b5823fc51 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -1,35 +1,56 @@ package sync import ( - "errors" - libp2pcore "github.com/libp2p/go-libp2p-core" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder" - eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block" - "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/p2putils" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/version" ) -// chunkWriter writes the given message as a chunked response to the given network +// chunkBlockWriter writes the given message as a chunked response to the given network // stream. // response_chunk ::= | | | -func (s *Service) chunkWriter(stream libp2pcore.Stream, msg interface{}) error { +func (s *Service) chunkBlockWriter(stream libp2pcore.Stream, blk block.SignedBeaconBlock) error { SetStreamWriteDeadline(stream, defaultWriteDuration) - return WriteChunk(stream, s.cfg.Chain, s.cfg.P2P.Encoding(), msg) + return WriteBlockChunk(stream, s.cfg.Chain, s.cfg.P2P.Encoding(), blk) } // WriteChunk object to stream. // response_chunk ::= | | | -func WriteChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, encoding encoder.NetworkEncoding, msg interface{}) error { +func WriteBlockChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, encoding encoder.NetworkEncoding, blk block.SignedBeaconBlock) error { if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { return err } - if err := writeContextToStream(stream, chain); err != nil { + obtainedCtx := []byte{} + switch blk.Version() { + case version.Phase0: + valRoot := chain.GenesisValidatorRoot() + digest, err := p2putils.ForkDigestFromEpoch(params.BeaconConfig().GenesisEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + case version.Altair: + valRoot := chain.GenesisValidatorRoot() + digest, err := p2putils.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + } + + if err := writeContextToStream(obtainedCtx, stream, chain); err != nil { return err } - _, err := encoding.EncodeWithMaxLength(stream, msg) + _, err := encoding.EncodeWithMaxLength(stream, blk) return err } @@ -40,17 +61,13 @@ func ReadChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetche if isFirstChunk { return readFirstChunkedBlock(stream, chain, p2p) } - blk := ð.SignedBeaconBlock{} - if err := readResponseChunk(stream, chain, p2p, blk); err != nil { - return nil, err - } - return wrapper.WrappedPhase0SignedBeaconBlock(blk), nil + + return readResponseChunk(stream, chain, p2p) } // readFirstChunkedBlock reads the first chunked block and applies the appropriate deadlines to // it. func readFirstChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P) (block.SignedBeaconBlock, error) { - blk := ð.SignedBeaconBlock{} code, errMsg, err := ReadStatusCode(stream, p2p.Encoding()) if err != nil { return nil, err @@ -58,30 +75,62 @@ func readFirstChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoF if code != 0 { return nil, errors.New(errMsg) } - // No-op for now with the rpc context. - _, err = readContextFromStream(stream, chain) + rpcCtx, err := readContextFromStream(stream, chain) + if err != nil { + return nil, err + } + blk, err := extractBlockDataType(rpcCtx, chain) if err != nil { return nil, err } err = p2p.Encoding().DecodeWithMaxLength(stream, blk) - return wrapper.WrappedPhase0SignedBeaconBlock(blk), err + return blk, err } // readResponseChunk reads the response from the stream and decodes it into the // provided message type. -func readResponseChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P, to interface{}) error { +func readResponseChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P) (block.SignedBeaconBlock, error) { SetStreamReadDeadline(stream, respTimeout) code, errMsg, err := readStatusCodeNoDeadline(stream, p2p.Encoding()) if err != nil { - return err + return nil, err } if code != 0 { - return errors.New(errMsg) + return nil, errors.New(errMsg) } // No-op for now with the rpc context. - _, err = readContextFromStream(stream, chain) + rpcCtx, err := readContextFromStream(stream, chain) if err != nil { - return err + return nil, err } - return p2p.Encoding().DecodeWithMaxLength(stream, to) + blk, err := extractBlockDataType(rpcCtx, chain) + if err != nil { + return nil, err + } + err = p2p.Encoding().DecodeWithMaxLength(stream, blk) + return blk, err +} + +func extractBlockDataType(digest []byte, chain blockchain.ChainInfoFetcher) (block.SignedBeaconBlock, error) { + if len(digest) == 0 { + bFunc, ok := types.BlockMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + if !ok { + return nil, errors.New("no block type exists for the genesis fork version.") + } + return bFunc() + } + if len(digest) != forkDigestLength { + return nil, errors.Errorf("invalid digest returned, wanted a length of %d but received %d", forkDigestLength, len(digest)) + } + vRoot := chain.GenesisValidatorRoot() + for k, blkFunc := range types.BlockMap { + rDigest, err := helpers.ComputeForkDigest(k[:], vRoot[:]) + if err != nil { + return nil, err + } + if rDigest == bytesutil.ToBytes4(digest) { + return blkFunc() + } + } + return nil, errors.New("no valid digest matched") } diff --git a/beacon-chain/sync/rpc_chunked_response_test.go b/beacon-chain/sync/rpc_chunked_response_test.go new file mode 100644 index 0000000000..cde28d4844 --- /dev/null +++ b/beacon-chain/sync/rpc_chunked_response_test.go @@ -0,0 +1,96 @@ +package sync + +import ( + "reflect" + "testing" + + "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block" + "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/testutil/require" +) + +func TestExtractBlockDataType(t *testing.T) { + // Precompute digests + genDigest, err := helpers.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, params.BeaconConfig().ZeroHash[:]) + require.NoError(t, err) + altairDigest, err := helpers.ComputeForkDigest(params.BeaconConfig().AltairForkVersion, params.BeaconConfig().ZeroHash[:]) + require.NoError(t, err) + + type args struct { + digest []byte + chain blockchain.ChainInfoFetcher + } + tests := []struct { + name string + args args + want block.SignedBeaconBlock + wantErr bool + }{ + { + name: "no digest", + args: args{ + digest: []byte{}, + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + want: wrapper.WrappedPhase0SignedBeaconBlock(ðpb.SignedBeaconBlock{}), + wantErr: false, + }, + { + name: "invalid digest", + args: args{ + digest: []byte{0x00, 0x01}, + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + want: nil, + wantErr: true, + }, + { + name: "non existent digest", + args: args{ + digest: []byte{0x00, 0x01, 0x02, 0x03}, + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + want: nil, + wantErr: true, + }, + { + name: "genesis fork version", + args: args{ + digest: genDigest[:], + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + want: wrapper.WrappedPhase0SignedBeaconBlock(ðpb.SignedBeaconBlock{}), + wantErr: false, + }, + { + name: "altair fork version", + args: args{ + digest: altairDigest[:], + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + want: func() block.SignedBeaconBlock { + wsb, err := wrapper.WrappedAltairSignedBeaconBlock(ðpb.SignedBeaconBlockAltair{Block: ðpb.BeaconBlockAltair{}}) + require.NoError(t, err) + return wsb + }(), + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := extractBlockDataType(tt.args.digest, tt.args.chain) + if (err != nil) != tt.wantErr { + t.Errorf("extractBlockDataType() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("extractBlockDataType() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index 9e54cea343..efe630840a 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" types "github.com/prysmaticlabs/eth2-types" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" @@ -27,7 +28,11 @@ func SendBeaconBlocksByRangeRequest( ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID, req *pb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor, ) ([]block.SignedBeaconBlock, error) { - stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRangeTopicV1, pid) + topic, err := p2p.TopicFromMessage(p2p.BeaconBlocksByRangeMessageName, helpers.SlotToEpoch(chain.CurrentSlot())) + if err != nil { + return nil, err + } + stream, err := p2pProvider.Send(ctx, req, topic, pid) if err != nil { return nil, err } @@ -85,7 +90,11 @@ func SendBeaconBlocksByRootRequest( ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID, req *p2ptypes.BeaconBlockByRootsReq, blockProcessor BeaconBlockProcessor, ) ([]block.SignedBeaconBlock, error) { - stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRootTopicV1, pid) + topic, err := p2p.TopicFromMessage(p2p.BeaconBlocksByRootsMessageName, helpers.SlotToEpoch(chain.CurrentSlot())) + if err != nil { + return nil, err + } + stream, err := p2pProvider.Send(ctx, req, topic, pid) if err != nil { return nil, err } diff --git a/beacon-chain/sync/rpc_send_request_test.go b/beacon-chain/sync/rpc_send_request_test.go index 87e095294a..c0173eb736 100644 --- a/beacon-chain/sync/rpc_send_request_test.go +++ b/beacon-chain/sync/rpc_send_request_test.go @@ -6,10 +6,12 @@ import ( "fmt" "io" "testing" + "time" "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/network" types "github.com/prysmaticlabs/eth2-types" + mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" p2pTypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" @@ -35,7 +37,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { p1.Connect(bogusPeer) req := &pb.BeaconBlocksByRangeRequest{} - _, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, bogusPeer.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + _, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, bogusPeer.PeerID(), req, nil) assert.ErrorContains(t, "protocol not supported", err) }) @@ -80,7 +83,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { if uint64(i) >= uint64(len(knownBlocks)) { break } - err = WriteChunk(stream, nil, p2pProvider.Encoding(), knownBlocks[i]) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + err = WriteBlockChunk(stream, chain, p2pProvider.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(knownBlocks[i])) if err != nil && err.Error() != mux.ErrReset.Error() { require.NoError(t, err) } @@ -99,7 +103,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Count: 128, Step: 1, } - blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, nil) assert.NoError(t, err) assert.Equal(t, 128, len(blocks)) }) @@ -117,7 +122,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Step: 1, } blocksFromProcessor := make([]block.SignedBeaconBlock, 0) - blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { blocksFromProcessor = append(blocksFromProcessor, block) return nil }) @@ -139,7 +145,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Step: 1, } errFromProcessor := errors.New("processor error") - _, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + _, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { return errFromProcessor }) assert.ErrorContains(t, errFromProcessor.Error(), err) @@ -157,7 +164,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Count: 128, Step: 1, } - blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, nil) assert.NoError(t, err) assert.Equal(t, 128, len(blocks)) @@ -168,7 +176,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { cfg.MaxRequestBlocks = maxRequestBlocks params.OverrideBeaconNetworkConfig(cfg) }() - blocks, err = SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { + blocks, err = SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { // Since ssz checks the boundaries, and doesn't normally allow to send requests bigger than // the max request size, we are updating max request size dynamically. Even when updated dynamically, // no more than max request size of blocks is expected on return. @@ -199,7 +207,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Count: 128, Step: 1, } - blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, nil) assert.ErrorContains(t, expectedErr.Error(), err) assert.Equal(t, 0, len(blocks)) }) @@ -227,7 +236,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { if uint64(i) >= uint64(len(knownBlocks)) { break } - err = WriteChunk(stream, nil, p2.Encoding(), knownBlocks[i]) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + err = WriteBlockChunk(stream, chain, p2.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(knownBlocks[i])) if err != nil && err.Error() != mux.ErrReset.Error() { require.NoError(t, err) } @@ -239,7 +249,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Count: 128, Step: 1, } - blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, nil) assert.ErrorContains(t, ErrInvalidFetchedData.Error(), err) assert.Equal(t, 0, len(blocks)) @@ -268,7 +279,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { if uint64(i) >= uint64(len(knownBlocks)) { break } - err = WriteChunk(stream, nil, p2.Encoding(), knownBlocks[i]) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + err = WriteBlockChunk(stream, chain, p2.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(knownBlocks[i])) if err != nil && err.Error() != mux.ErrReset.Error() { require.NoError(t, err) } @@ -280,7 +292,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Count: 128, Step: 10, } - blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, nil) assert.ErrorContains(t, ErrInvalidFetchedData.Error(), err) assert.Equal(t, 0, len(blocks)) @@ -309,7 +322,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { p1.Connect(bogusPeer) req := &p2pTypes.BeaconBlockByRootsReq{} - _, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, bogusPeer.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + _, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, bogusPeer.PeerID(), req, nil) assert.ErrorContains(t, "protocol not supported", err) }) @@ -356,7 +370,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { p2.SetStreamHandler(pcl, knownBlocksProvider(p2, nil)) req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]} - blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, nil) assert.NoError(t, err) assert.Equal(t, 2, len(blocks)) }) @@ -370,7 +385,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { // No error from block processor. req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]} blocksFromProcessor := make([]block.SignedBeaconBlock, 0) - blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { blocksFromProcessor = append(blocksFromProcessor, block) return nil }) @@ -388,7 +404,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { // Send error from block processor. req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]} errFromProcessor := errors.New("processor error") - _, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + _, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { return errFromProcessor }) assert.ErrorContains(t, errFromProcessor.Error(), err) @@ -402,7 +419,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { // No cap on max roots. req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]} - blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, nil) assert.NoError(t, err) assert.Equal(t, 4, len(blocks)) @@ -413,7 +431,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { cfg.MaxRequestBlocks = maxRequestBlocks params.OverrideBeaconNetworkConfig(cfg) }() - blocks, err = SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { + blocks, err = SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error { // Since ssz checks the boundaries, and doesn't normally allow to send requests bigger than // the max request size, we are updating max request size dynamically. Even when updated dynamically, // no more than max request size of blocks is expected on return. @@ -440,7 +458,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { })) req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]} - blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, nil) assert.ErrorContains(t, expectedErr.Error(), err) assert.Equal(t, 0, len(blocks)) }) @@ -460,7 +479,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { })) req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]} - blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil) + chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + blocks, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, nil) assert.NoError(t, err) assert.Equal(t, 3, len(blocks)) })