From 35dc8c31fad3802f53c07578a086b912f83a52d9 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Tue, 18 May 2021 03:25:59 +0800 Subject: [PATCH] Add in Context For Req/Resp (#8895) * Add in context * use better function signatures * add in chainservice * fix rpc requests * add in test * fix tests * add in comments * Update beacon-chain/sync/rpc_chunked_response.go Co-authored-by: terence tsao * Update beacon-chain/sync/rpc_send_request.go Co-authored-by: terence tsao * rename files * fmt Co-authored-by: Raul Jordan Co-authored-by: terence tsao Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/sync/BUILD.bazel | 2 + beacon-chain/sync/context.go | 55 ++++++++++++++ beacon-chain/sync/context_test.go | 72 +++++++++++++++++++ .../sync/initial-sync/blocks_fetcher.go | 5 +- .../sync/initial-sync/blocks_fetcher_test.go | 28 ++++---- .../sync/initial-sync/initial_sync_test.go | 4 +- beacon-chain/sync/initial-sync/service.go | 3 +- .../sync/rpc_beacon_blocks_by_range_test.go | 21 +++--- .../sync/rpc_beacon_blocks_by_root.go | 2 +- .../sync/rpc_beacon_blocks_by_root_test.go | 5 +- beacon-chain/sync/rpc_chunked_response.go | 33 ++++++--- beacon-chain/sync/rpc_metadata.go | 5 ++ beacon-chain/sync/rpc_ping.go | 5 ++ beacon-chain/sync/rpc_send_request.go | 9 +-- beacon-chain/sync/rpc_send_request_test.go | 40 +++++------ beacon-chain/sync/rpc_status.go | 6 +- 16 files changed, 226 insertions(+), 69 deletions(-) create mode 100644 beacon-chain/sync/context.go create mode 100644 beacon-chain/sync/context_test.go diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 4a1cee5b9d..f1e2c9645f 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test") go_library( name = "go_default_library", srcs = [ + "context.go", "deadlines.go", "decode_pubsub.go", "doc.go", @@ -106,6 +107,7 @@ go_test( name = "go_default_test", size = "small", srcs = [ + "context_test.go", "decode_pubsub_test.go", "error_test.go", "pending_attestations_queue_test.go", diff --git a/beacon-chain/sync/context.go b/beacon-chain/sync/context.go new file mode 100644 index 0000000000..21db7ff08c --- /dev/null +++ b/beacon-chain/sync/context.go @@ -0,0 +1,55 @@ +package sync + +import ( + "errors" + + "github.com/libp2p/go-libp2p-core/network" + "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p" +) + +// writes peer's current context for the expected payload to the stream. +func writeContextToStream(stream network.Stream, chain blockchain.ChainInfoFetcher) error { + rpcCtx, err := rpcContext(stream, chain) + if err != nil { + return err + } + // Exit early if there is an empty context. + if len(rpcCtx) == 0 { + return nil + } + _, err = stream.Write(rpcCtx) + return err +} + +// reads any attached context-bytes to the payload. +func readContextFromStream(stream network.Stream, chain blockchain.ChainInfoFetcher) ([]byte, error) { + rpcCtx, err := rpcContext(stream, chain) + if err != nil { + return nil, err + } + if len(rpcCtx) == 0 { + return []byte{}, nil + } + // Read context (fork-digest) from stream + b := make([]byte, 4) + if _, err := stream.Read(b); err != nil { + return nil, err + } + return b, nil +} + +// retrieve expected context depending on rpc topic schema version. +func rpcContext(stream network.Stream, chain blockchain.ChainInfoFetcher) ([]byte, error) { + _, _, version, err := p2p.TopicDeconstructor(string(stream.Protocol())) + if err != nil { + return nil, err + } + switch version { + case p2p.SchemaVersionV1: + // Return empty context for a v1 method. + return []byte{}, nil + default: + return nil, errors.New("invalid version of %s registered for topic: %s") + } +} diff --git a/beacon-chain/sync/context_test.go b/beacon-chain/sync/context_test.go new file mode 100644 index 0000000000..c70e3d9a5e --- /dev/null +++ b/beacon-chain/sync/context_test.go @@ -0,0 +1,72 @@ +package sync + +import ( + "context" + "sync" + "testing" + "time" + + core "github.com/libp2p/go-libp2p-core" + "github.com/libp2p/go-libp2p-core/network" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/shared/testutil" + "github.com/prysmaticlabs/prysm/shared/testutil/assert" +) + +func TestContextWrite_NoWrites(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + nPeer := p2ptest.NewTestP2P(t) + p1.Connect(nPeer) + + wg := new(sync.WaitGroup) + prID := p2p.RPCPingTopicV1 + wg.Add(1) + nPeer.BHost.SetStreamHandler(core.ProtocolID(prID), func(stream network.Stream) { + wg.Done() + // no-op + }) + strm, err := p1.BHost.NewStream(context.Background(), nPeer.PeerID(), p2p.RPCPingTopicV1) + assert.NoError(t, err) + + // Nothing will be written to the stream + assert.NoError(t, writeContextToStream(strm, nil)) + if testutil.WaitTimeout(wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } +} + +func TestContextRead_NoReads(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + nPeer := p2ptest.NewTestP2P(t) + p1.Connect(nPeer) + + wg := new(sync.WaitGroup) + prID := p2p.RPCPingTopicV1 + wg.Add(1) + wantedData := []byte{'A', 'B', 'C', 'D'} + nPeer.BHost.SetStreamHandler(core.ProtocolID(prID), func(stream network.Stream) { + // No Context will be read from it + dt, err := readContextFromStream(stream, nil) + assert.NoError(t, err) + assert.Equal(t, 0, len(dt)) + + // Ensure sent over data hasn't been modified. + buf := make([]byte, len(wantedData)) + n, err := stream.Read(buf) + assert.NoError(t, err) + assert.Equal(t, len(wantedData), n) + assert.DeepEqual(t, wantedData, buf) + + wg.Done() + }) + strm, err := p1.BHost.NewStream(context.Background(), nPeer.PeerID(), p2p.RPCPingTopicV1) + assert.NoError(t, err) + + n, err := strm.Write(wantedData) + assert.NoError(t, err) + assert.Equal(t, len(wantedData), n) + if testutil.WaitTimeout(wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } +} diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 0b890afcc0..65e2b55316 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -323,8 +323,7 @@ func (f *blocksFetcher) requestBlocks( } f.rateLimiter.Add(pid.String(), int64(req.Count)) l.Unlock() - - return prysmsync.SendBeaconBlocksByRangeRequest(ctx, f.p2p, pid, req, nil) + return prysmsync.SendBeaconBlocksByRangeRequest(ctx, f.chain, f.p2p, pid, req, nil) } // requestBlocksByRoot is a wrapper for handling BeaconBlockByRootsReq requests/streams. @@ -352,7 +351,7 @@ func (f *blocksFetcher) requestBlocksByRoot( f.rateLimiter.Add(pid.String(), int64(len(*req))) l.Unlock() - return prysmsync.SendBeaconBlocksByRootRequest(ctx, f.p2p, pid, req, nil) + return prysmsync.SendBeaconBlocksByRootRequest(ctx, f.chain, f.p2p, pid, req, nil) } // waitForBandwidth blocks up until peer's bandwidth is restored. diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index a61abfcfa6..0f0fc40528 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -608,7 +608,7 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += types.Slot(req.Step) { blk := testutil.NewBeaconBlock() blk.Block.Slot = i - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) } assert.NoError(t, stream.Close()) } @@ -629,7 +629,7 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step+1); i += types.Slot(req.Step) { blk := testutil.NewBeaconBlock() blk.Block.Slot = i - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) } assert.NoError(t, stream.Close()) } @@ -650,11 +650,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) return func(stream network.Stream) { blk := testutil.NewBeaconBlock() blk.Block.Slot = 163 - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) blk = testutil.NewBeaconBlock() blk.Block.Slot = 162 - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) assert.NoError(t, stream.Close()) } }, @@ -674,11 +674,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) return func(stream network.Stream) { blk := testutil.NewBeaconBlock() blk.Block.Slot = 160 - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) blk = testutil.NewBeaconBlock() blk.Block.Slot = 160 - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) assert.NoError(t, stream.Close()) } }, @@ -704,11 +704,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) // Patch mid block, with invalid slot number. if i == req.StartSlot.Add(req.Count*req.Step/2) { blk.Block.Slot = req.StartSlot - 1 - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) break } else { blk.Block.Slot = i - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) } } } @@ -735,11 +735,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) // Patch mid block, with invalid slot number. if i == req.StartSlot.Add(req.Count*req.Step/2) { blk.Block.Slot = req.StartSlot.Add(req.Count * req.Step) - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) break } else { blk.Block.Slot = i - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) } } } @@ -760,11 +760,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) return func(stream network.Stream) { blk := testutil.NewBeaconBlock() blk.Block.Slot = 100 - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) blk = testutil.NewBeaconBlock() blk.Block.Slot = 105 - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) assert.NoError(t, stream.Close()) } }, @@ -783,11 +783,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) return func(stream network.Stream) { blk := testutil.NewBeaconBlock() blk.Block.Slot = 100 - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) blk = testutil.NewBeaconBlock() blk.Block.Slot = 103 - assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk)) assert.NoError(t, stream.Close()) } }, diff --git a/beacon-chain/sync/initial-sync/initial_sync_test.go b/beacon-chain/sync/initial-sync/initial_sync_test.go index 0c0dd5d80f..f8836c9e99 100644 --- a/beacon-chain/sync/initial-sync/initial_sync_test.go +++ b/beacon-chain/sync/initial-sync/initial_sync_test.go @@ -216,7 +216,7 @@ func connectPeer(t *testing.T, host *p2pt.TestP2P, datum *peerData, peerStatus * } for i := 0; i < len(ret); i++ { - assert.NoError(t, beaconsync.WriteChunk(stream, p.Encoding(), ret[i])) + assert.NoError(t, beaconsync.WriteChunk(stream, nil, p.Encoding(), ret[i])) } }) @@ -285,7 +285,7 @@ func connectPeerHavingBlocks( if uint64(i) >= uint64(len(blocks)) { break } - require.NoError(t, beaconsync.WriteChunk(stream, p.Encoding(), blocks[i])) + require.NoError(t, beaconsync.WriteChunk(stream, nil, p.Encoding(), blocks[i])) } }) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index de24812c42..1d5b24efca 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -29,8 +29,7 @@ var _ shared.Service = (*Service)(nil) // blockchainService defines the interface for interaction with block chain service. type blockchainService interface { blockchain.BlockReceiver - blockchain.HeadFetcher - blockchain.FinalizationFetcher + blockchain.ChainInfoFetcher } // Config to set up the initial sync service. diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index 000150b065..ac05b1eb6e 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -16,6 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" db2 "github.com/prysmaticlabs/prysm/beacon-chain/db" db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" @@ -51,7 +52,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) { // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false) var wg sync.WaitGroup @@ -113,7 +114,7 @@ func TestRPCBeaconBlocksByRange_ReturnCorrectNumberBack(t *testing.T) { // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false) var wg sync.WaitGroup @@ -177,7 +178,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) { // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false) @@ -242,7 +243,7 @@ func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) { } r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false) @@ -292,7 +293,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { req *pb.BeaconBlocksByRangeRequest, validateBlocks bool, success bool) error { var wg sync.WaitGroup wg.Add(1) - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { defer wg.Done() if !validateBlocks { @@ -330,7 +331,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { capacity := int64(flags.Get().BlockBatchLimit * 3) r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false) req := &pb.BeaconBlocksByRangeRequest{ @@ -356,7 +357,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { capacity := int64(flags.Get().BlockBatchLimit * 3) r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false) @@ -385,7 +386,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { capacity := int64(flags.Get().BlockBatchLimit * flags.Get().BlockBatchLimitBurstFactor) r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false) @@ -547,7 +548,7 @@ func TestRPCBeaconBlocksByRange_EnforceResponseInvariants(t *testing.T) { parentRoot = rt } } - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) sendRequest := func(p1, p2 *p2ptest.TestP2P, r *Service, req *pb.BeaconBlocksByRangeRequest, processBlocks func([]*ethpb.SignedBeaconBlock)) error { var wg sync.WaitGroup @@ -703,7 +704,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { })) } } - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) sendRequest := func(p1, p2 *p2ptest.TestP2P, r *Service, req *pb.BeaconBlocksByRangeRequest, processBlocks func([]*ethpb.SignedBeaconBlock)) error { var wg sync.WaitGroup diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index 73db0dad23..b7af04049e 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -17,7 +17,7 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots ctx, cancel := context.WithTimeout(ctx, respTimeout) defer cancel() - _, err := SendBeaconBlocksByRootRequest(ctx, s.cfg.P2P, id, blockRoots, func(blk *ethpb.SignedBeaconBlock) error { + _, err := SendBeaconBlocksByRootRequest(ctx, s.cfg.Chain, s.cfg.P2P, id, blockRoots, func(blk *ethpb.SignedBeaconBlock) error { blkRoot, err := blk.Block.HashTreeRoot() if err != nil { return err diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index 902adb488d..057dc23dcf 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -15,6 +15,7 @@ import ( mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" p2pTypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/shared/params" @@ -42,7 +43,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) { } r := &Service{cfg: &Config{P2P: p1, DB: d}, rateLimiter: newRateLimiter(p1)} - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false) @@ -148,7 +149,7 @@ func TestRecentBeaconBlocksRPCHandler_HandleZeroBlocks(t *testing.T) { d := db.SetupDB(t) r := &Service{cfg: &Config{P2P: p1, DB: d}, rateLimiter: newRateLimiter(p1)} - pcl := protocol.ID("/testing") + pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false) diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index 30728b7ecf..92d517b130 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -5,37 +5,41 @@ import ( libp2pcore "github.com/libp2p/go-libp2p-core" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder" ) // chunkWriter writes the given message as a chunked response to the given network // stream. -// response_chunk ::= | | | +// response_chunk ::= | | | func (s *Service) chunkWriter(stream libp2pcore.Stream, msg interface{}) error { SetStreamWriteDeadline(stream, defaultWriteDuration) - return WriteChunk(stream, s.cfg.P2P.Encoding(), msg) + return WriteChunk(stream, s.cfg.Chain, s.cfg.P2P.Encoding(), msg) } // WriteChunk object to stream. -// response_chunk ::= | | | -func WriteChunk(stream libp2pcore.Stream, encoding encoder.NetworkEncoding, msg interface{}) error { +// response_chunk ::= | | | +func WriteChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, encoding encoder.NetworkEncoding, msg interface{}) error { if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { return err } + if err := writeContextToStream(stream, chain); err != nil { + return err + } _, err := encoding.EncodeWithMaxLength(stream, msg) return err } // ReadChunkedBlock handles each response chunk that is sent by the // peer and converts it into a beacon block. -func ReadChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P, isFirstChunk bool) (*eth.SignedBeaconBlock, error) { +func ReadChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P, isFirstChunk bool) (*eth.SignedBeaconBlock, error) { // Handle deadlines differently for first chunk if isFirstChunk { - return readFirstChunkedBlock(stream, p2p) + return readFirstChunkedBlock(stream, chain, p2p) } blk := ð.SignedBeaconBlock{} - if err := readResponseChunk(stream, p2p, blk); err != nil { + if err := readResponseChunk(stream, chain, p2p, blk); err != nil { return nil, err } return blk, nil @@ -43,7 +47,7 @@ func ReadChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P, isFirstChunk bool) // readFirstChunkedBlock reads the first chunked block and applies the appropriate deadlines to // it. -func readFirstChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P) (*eth.SignedBeaconBlock, error) { +func readFirstChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P) (*eth.SignedBeaconBlock, error) { blk := ð.SignedBeaconBlock{} code, errMsg, err := ReadStatusCode(stream, p2p.Encoding()) if err != nil { @@ -52,21 +56,30 @@ func readFirstChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P) (*eth.SignedBe if code != 0 { return nil, errors.New(errMsg) } + // No-op for now with the rpc context. + _, err = readContextFromStream(stream, chain) + if err != nil { + return nil, err + } err = p2p.Encoding().DecodeWithMaxLength(stream, blk) return blk, err } // readResponseChunk reads the response from the stream and decodes it into the // provided message type. -func readResponseChunk(stream libp2pcore.Stream, p2p p2p.P2P, to interface{}) error { +func readResponseChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P, to interface{}) error { SetStreamReadDeadline(stream, respTimeout) code, errMsg, err := readStatusCodeNoDeadline(stream, p2p.Encoding()) if err != nil { return err } - if code != 0 { return errors.New(errMsg) } + // No-op for now with the rpc context. + _, err = readContextFromStream(stream, chain) + if err != nil { + return err + } return p2p.Encoding().DecodeWithMaxLength(stream, to) } diff --git a/beacon-chain/sync/rpc_metadata.go b/beacon-chain/sync/rpc_metadata.go index 02b6d468a5..f01acb8251 100644 --- a/beacon-chain/sync/rpc_metadata.go +++ b/beacon-chain/sync/rpc_metadata.go @@ -47,6 +47,11 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) return nil, errors.New(errMsg) } + // No-op for now with the rpc context. + _, err = readContextFromStream(stream, s.cfg.Chain) + if err != nil { + return nil, err + } msg := new(pb.MetaData) if err := s.cfg.P2P.Encoding().DecodeWithMaxLength(stream, msg); err != nil { return nil, err diff --git a/beacon-chain/sync/rpc_ping.go b/beacon-chain/sync/rpc_ping.go index 5d4d99ca77..67aaa505d2 100644 --- a/beacon-chain/sync/rpc_ping.go +++ b/beacon-chain/sync/rpc_ping.go @@ -95,6 +95,11 @@ func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error { s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) return errors.New(errMsg) } + // No-op for now with the rpc context. + _, err = readContextFromStream(stream, s.cfg.Chain) + if err != nil { + return err + } msg := new(types.SSZUint64) if err := s.cfg.P2P.Encoding().DecodeWithMaxLength(stream, msg); err != nil { return err diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index d499a95a85..4b66b46f30 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" types "github.com/prysmaticlabs/eth2-types" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -23,7 +24,7 @@ type BeaconBlockProcessor func(block *ethpb.SignedBeaconBlock) error // SendBeaconBlocksByRangeRequest sends BeaconBlocksByRange and returns fetched blocks, if any. func SendBeaconBlocksByRangeRequest( - ctx context.Context, p2pProvider p2p.P2P, pid peer.ID, + ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID, req *pb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor, ) ([]*ethpb.SignedBeaconBlock, error) { stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRangeTopicV1, pid) @@ -44,7 +45,7 @@ func SendBeaconBlocksByRangeRequest( var prevSlot types.Slot for i := uint64(0); ; i++ { isFirstChunk := i == 0 - blk, err := ReadChunkedBlock(stream, p2pProvider, isFirstChunk) + blk, err := ReadChunkedBlock(stream, chain, p2pProvider, isFirstChunk) if errors.Is(err, io.EOF) { break } @@ -81,7 +82,7 @@ func SendBeaconBlocksByRangeRequest( // SendBeaconBlocksByRootRequest sends BeaconBlocksByRoot and returns fetched blocks, if any. func SendBeaconBlocksByRootRequest( - ctx context.Context, p2pProvider p2p.P2P, pid peer.ID, + ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID, req *p2ptypes.BeaconBlockByRootsReq, blockProcessor BeaconBlockProcessor, ) ([]*ethpb.SignedBeaconBlock, error) { stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRootTopicV1, pid) @@ -105,7 +106,7 @@ func SendBeaconBlocksByRootRequest( break } isFirstChunk := i == 0 - blk, err := ReadChunkedBlock(stream, p2pProvider, isFirstChunk) + blk, err := ReadChunkedBlock(stream, chain, p2pProvider, isFirstChunk) if errors.Is(err, io.EOF) { break } diff --git a/beacon-chain/sync/rpc_send_request_test.go b/beacon-chain/sync/rpc_send_request_test.go index 88b1fad10c..9cc0d4bf05 100644 --- a/beacon-chain/sync/rpc_send_request_test.go +++ b/beacon-chain/sync/rpc_send_request_test.go @@ -33,7 +33,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { p1.Connect(bogusPeer) req := &pb.BeaconBlocksByRangeRequest{} - _, err := SendBeaconBlocksByRangeRequest(ctx, p1, bogusPeer.PeerID(), req, nil) + _, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, bogusPeer.PeerID(), req, nil) assert.ErrorContains(t, "protocol not supported", err) }) @@ -78,7 +78,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { if uint64(i) >= uint64(len(knownBlocks)) { break } - err = WriteChunk(stream, p2pProvider.Encoding(), knownBlocks[i]) + err = WriteChunk(stream, nil, p2pProvider.Encoding(), knownBlocks[i]) if err != nil && err.Error() != mux.ErrReset.Error() { require.NoError(t, err) } @@ -97,7 +97,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Count: 128, Step: 1, } - blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, nil) + blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil) assert.NoError(t, err) assert.Equal(t, 128, len(blocks)) }) @@ -115,7 +115,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Step: 1, } blocksFromProcessor := make([]*eth.SignedBeaconBlock, 0) - blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { + blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { blocksFromProcessor = append(blocksFromProcessor, block) return nil }) @@ -137,7 +137,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Step: 1, } errFromProcessor := errors.New("processor error") - _, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { + _, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { return errFromProcessor }) assert.ErrorContains(t, errFromProcessor.Error(), err) @@ -155,7 +155,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Count: 128, Step: 1, } - blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, nil) + blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil) assert.NoError(t, err) assert.Equal(t, 128, len(blocks)) @@ -166,7 +166,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { cfg.MaxRequestBlocks = maxRequestBlocks params.OverrideBeaconNetworkConfig(cfg) }() - blocks, err = SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { + blocks, err = SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { // Since ssz checks the boundaries, and doesn't normally allow to send requests bigger than // the max request size, we are updating max request size dynamically. Even when updated dynamically, // no more than max request size of blocks is expected on return. @@ -197,7 +197,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Count: 128, Step: 1, } - blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, nil) + blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil) assert.ErrorContains(t, expectedErr.Error(), err) assert.Equal(t, 0, len(blocks)) }) @@ -225,7 +225,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { if uint64(i) >= uint64(len(knownBlocks)) { break } - err = WriteChunk(stream, p2.Encoding(), knownBlocks[i]) + err = WriteChunk(stream, nil, p2.Encoding(), knownBlocks[i]) if err != nil && err.Error() != mux.ErrReset.Error() { require.NoError(t, err) } @@ -237,7 +237,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Count: 128, Step: 1, } - blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, nil) + blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil) assert.ErrorContains(t, ErrInvalidFetchedData.Error(), err) assert.Equal(t, 0, len(blocks)) @@ -266,7 +266,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { if uint64(i) >= uint64(len(knownBlocks)) { break } - err = WriteChunk(stream, p2.Encoding(), knownBlocks[i]) + err = WriteChunk(stream, nil, p2.Encoding(), knownBlocks[i]) if err != nil && err.Error() != mux.ErrReset.Error() { require.NoError(t, err) } @@ -278,7 +278,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) { Count: 128, Step: 10, } - blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, nil) + blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil) assert.ErrorContains(t, ErrInvalidFetchedData.Error(), err) assert.Equal(t, 0, len(blocks)) @@ -307,7 +307,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { p1.Connect(bogusPeer) req := &p2pTypes.BeaconBlockByRootsReq{} - _, err := SendBeaconBlocksByRootRequest(ctx, p1, bogusPeer.PeerID(), req, nil) + _, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, bogusPeer.PeerID(), req, nil) assert.ErrorContains(t, "protocol not supported", err) }) @@ -354,7 +354,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { p2.SetStreamHandler(pcl, knownBlocksProvider(p2, nil)) req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]} - blocks, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, nil) + blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil) assert.NoError(t, err) assert.Equal(t, 2, len(blocks)) }) @@ -368,7 +368,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { // No error from block processor. req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]} blocksFromProcessor := make([]*eth.SignedBeaconBlock, 0) - blocks, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { + blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { blocksFromProcessor = append(blocksFromProcessor, block) return nil }) @@ -386,7 +386,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { // Send error from block processor. req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]} errFromProcessor := errors.New("processor error") - _, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { + _, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { return errFromProcessor }) assert.ErrorContains(t, errFromProcessor.Error(), err) @@ -400,7 +400,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { // No cap on max roots. req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]} - blocks, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, nil) + blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil) assert.NoError(t, err) assert.Equal(t, 4, len(blocks)) @@ -411,7 +411,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { cfg.MaxRequestBlocks = maxRequestBlocks params.OverrideBeaconNetworkConfig(cfg) }() - blocks, err = SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { + blocks, err = SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error { // Since ssz checks the boundaries, and doesn't normally allow to send requests bigger than // the max request size, we are updating max request size dynamically. Even when updated dynamically, // no more than max request size of blocks is expected on return. @@ -438,7 +438,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { })) req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]} - blocks, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, nil) + blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil) assert.ErrorContains(t, expectedErr.Error(), err) assert.Equal(t, 0, len(blocks)) }) @@ -458,7 +458,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { })) req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]} - blocks, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, nil) + blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil) assert.NoError(t, err) assert.Equal(t, 3, len(blocks)) }) diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index 4cee35c194..9fadad8714 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -154,7 +154,11 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(id) return errors.New(errMsg) } - + // No-op for now with the rpc context. + _, err = readContextFromStream(stream, s.cfg.Chain) + if err != nil { + return err + } msg := &pb.Status{} if err := s.cfg.P2P.Encoding().DecodeWithMaxLength(stream, msg); err != nil { return err