From 7744c3ae479dd1770d1d4a0298cfed4bed6c9ef6 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Thu, 20 Aug 2020 12:50:14 +0800 Subject: [PATCH] Batch Block Roots Requesting (#7027) * checkpoint * Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into batchRootReq * test * gaz * fix test * comment * Merge refs/heads/master into multipleBranchProcessing * Merge refs/heads/master into multipleBranchProcessing * Merge refs/heads/master into multipleBranchProcessing * Merge refs/heads/master into multipleBranchProcessing * Merge refs/heads/master into multipleBranchProcessing * Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into batchRootReq * terence's review * Merge branch 'multipleBranchProcessing' of https://github.com/prysmaticlabs/geth-sharding into batchRootReq --- beacon-chain/p2p/service.go | 2 +- beacon-chain/sync/BUILD.bazel | 1 + .../sync/pending_attestations_queue.go | 36 +------- .../sync/pending_attestations_queue_test.go | 2 +- beacon-chain/sync/pending_blocks_queue.go | 69 +++++++++++----- .../sync/pending_blocks_queue_test.go | 82 +++++++++++++++++++ beacon-chain/sync/utils.go | 13 +++ 7 files changed, 151 insertions(+), 54 deletions(-) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 79647a49bf..b1d5c28e7e 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -57,7 +57,7 @@ const maxBadResponses = 5 const cacheNumCounters, cacheMaxCost, cacheBufferItems = 1000, 1000, 64 // maxDialTimeout is the timeout for a single peer dial. -const maxDialTimeout = 30 * time.Second +var maxDialTimeout = params.BeaconNetworkConfig().RespTimeout // Service for managing peer to peer (p2p) networking. type Service struct { diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 2733090daf..e0cd43f8d3 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -153,6 +153,7 @@ go_test( "//shared/bytesutil:go_default_library", "//shared/featureconfig:go_default_library", "//shared/params:go_default_library", + "//shared/rand:go_default_library", "//shared/roughtime:go_default_library", "//shared/testutil:go_default_library", "//shared/testutil/assert:go_default_library", diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index 75410f8edd..0ceb04e1af 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -3,11 +3,9 @@ package sync import ( "context" "encoding/hex" - "io" "sync" pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/shared/bls" @@ -17,7 +15,6 @@ import ( "github.com/prysmaticlabs/prysm/shared/rand" "github.com/prysmaticlabs/prysm/shared/runutil" "github.com/prysmaticlabs/prysm/shared/slotutil" - "github.com/prysmaticlabs/prysm/shared/traceutil" "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) @@ -47,8 +44,6 @@ func (s *Service) processPendingAtts(ctx context.Context) error { ctx, span := trace.StartSpan(ctx, "processPendingAtts") defer span.End() - pids := s.p2p.Peers().Connected() - // Before a node processes pending attestations queue, it verifies // the attestations in the queue are still valid. Attestations will // be deleted from the queue if invalid (ie. getting staled from falling too many slots behind). @@ -61,6 +56,7 @@ func (s *Service) processPendingAtts(ctx context.Context) error { } s.pendingAttsLock.RUnlock() + pendingRoots := [][32]byte{} randGen := rand.NewGenerator() for _, bRoot := range roots { s.pendingAttsLock.RLock() @@ -123,36 +119,10 @@ func (s *Service) processPendingAtts(ctx context.Context) error { "attCount": len(attestations), "blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])), }).Debug("Requesting block for pending attestation") - - // Start with a random peer to query, but choose the first peer in our unsorted list that claims to - // have a head slot newer or equal to the pending attestation's target boundary slot. - // If there are no peer id's available, then we should exit from this function. The function will - // be run again periodically, and there may be peers available in future runs. - if len(pids) == 0 { - log.Debug("No peer IDs available to request missing block from for pending attestation") - return nil - } - pid := pids[randGen.Int()%len(pids)] - targetSlot := helpers.SlotToEpoch(attestations[0].Message.Aggregate.Data.Target.Epoch) - for _, p := range pids { - cs, err := s.p2p.Peers().ChainState(p) - if err != nil { - return errors.Wrap(err, "could not get chain state for peer") - } - if cs != nil && cs.HeadSlot >= targetSlot { - pid = p - break - } - } - - req := [][32]byte{bRoot} - if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil && err == io.EOF { - traceutil.AnnotateError(span, err) - log.Debugf("Could not send recent block request: %v", err) - } + pendingRoots = append(pendingRoots, bRoot) } } - return nil + return s.sendBatchRootRequest(ctx, pendingRoots, randGen) } // This defines how pending attestations is saved in the map. The key is the diff --git a/beacon-chain/sync/pending_attestations_queue_test.go b/beacon-chain/sync/pending_attestations_queue_test.go index cfe998f374..c99a19e126 100644 --- a/beacon-chain/sync/pending_attestations_queue_test.go +++ b/beacon-chain/sync/pending_attestations_queue_test.go @@ -44,7 +44,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) { r := &Service{ p2p: p1, db: db, - chain: &mock.ChainService{Genesis: roughtime.Now()}, + chain: &mock.ChainService{Genesis: roughtime.Now(), FinalizedCheckPoint: ðpb.Checkpoint{}}, blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), stateSummaryCache: cache.NewStateSummaryCache(), } diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index 97901a1f37..9c1ab12586 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/rand" "github.com/prysmaticlabs/prysm/shared/runutil" "github.com/prysmaticlabs/prysm/shared/slotutil" @@ -22,6 +23,9 @@ import ( var processPendingBlocksPeriod = slotutil.DivideSlotBy(3 /* times per slot */) +const maxPeerRequest = 50 +const numOfTries = 5 + // processes pending blocks queue on every processPendingBlocksPeriod func (s *Service) processPendingBlocksQueue() { ctx := context.Background() @@ -46,6 +50,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { return errors.Wrap(err, "could not validate pending slots") } slots := s.sortedPendingSlots() + parentRoots := [][32]byte{} span.AddAttributes( trace.Int64Attribute("numSlots", int64(len(slots))), @@ -110,26 +115,8 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { "currentSlot": b.Block.Slot, "parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)), }).Info("Requesting parent block") - req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)} + parentRoots = append(parentRoots, bytesutil.ToBytes32(b.Block.ParentRoot)) - // Start with a random peer to query, but choose the first peer in our unsorted list that claims to - // have a head slot newer than the block slot we are requesting. - pid := pids[randGen.Int()%len(pids)] - for _, p := range pids { - cs, err := s.p2p.Peers().ChainState(p) - if err != nil { - return errors.Wrap(err, "failed to read chain state for peer") - } - if cs != nil && cs.HeadSlot >= slot { - pid = p - break - } - } - - if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil { - traceutil.AnnotateError(span, err) - log.Debugf("Could not send recent block request: %v", err) - } span.End() continue } @@ -162,6 +149,50 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { } } + return s.sendBatchRootRequest(ctx, parentRoots, randGen) +} + +func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, randGen *rand.Rand) error { + ctx, span := trace.StartSpan(ctx, "sendBatchRootRequest") + defer span.End() + + if len(roots) == 0 { + return nil + } + + _, bestPeers := s.p2p.Peers().BestFinalized(maxPeerRequest, s.chain.FinalizedCheckpt().Epoch) + if len(bestPeers) == 0 { + return nil + } + roots = s.dedupRoots(roots) + // Randomly choose a peer to query from our best peers. If that peer cannot return + // all the requested blocks, we randomly select another peer. + pid := bestPeers[randGen.Int()%len(bestPeers)] + for i := 0; i < numOfTries; i++ { + req := roots + if len(roots) > int(params.BeaconNetworkConfig().MaxRequestBlocks) { + req = roots[:params.BeaconNetworkConfig().MaxRequestBlocks] + } + if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil { + traceutil.AnnotateError(span, err) + log.Debugf("Could not send recent block request: %v", err) + } + newRoots := make([][32]byte, 0, len(roots)) + s.pendingQueueLock.RLock() + for _, rt := range roots { + if !s.seenPendingBlocks[rt] { + newRoots = append(newRoots, rt) + } + } + s.pendingQueueLock.RUnlock() + if len(newRoots) == 0 { + break + } + // Choosing a new peer with the leftover set of + // roots to request. + roots = newRoots + pid = bestPeers[randGen.Int()%len(bestPeers)] + } return nil } diff --git a/beacon-chain/sync/pending_blocks_queue_test.go b/beacon-chain/sync/pending_blocks_queue_test.go index 745badc1c4..8745407504 100644 --- a/beacon-chain/sync/pending_blocks_queue_test.go +++ b/beacon-chain/sync/pending_blocks_queue_test.go @@ -5,6 +5,7 @@ import ( "math" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p-core/network" @@ -17,6 +18,8 @@ import ( p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/rand" + "github.com/prysmaticlabs/prysm/shared/testutil" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" ) @@ -278,3 +281,82 @@ func TestService_sortedPendingSlots(t *testing.T) { want := []uint64{lastSlot - 5, lastSlot - 3, lastSlot - 2, lastSlot} assert.DeepEqual(t, want, r.sortedPendingSlots(), "Unexpected pending slots list") } + +func TestService_BatchRootRequest(t *testing.T) { + db, _ := dbtest.SetupDB(t) + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") + + r := &Service{ + p2p: p1, + db: db, + chain: &mock.ChainService{ + FinalizedCheckPoint: ðpb.Checkpoint{ + Epoch: 1, + }, + }, + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), + seenPendingBlocks: make(map[[32]byte]bool), + } + + err := r.initCaches() + require.NoError(t, err) + p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound) + p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected) + p1.Peers().SetChainState(p2.PeerID(), &pb.Status{FinalizedEpoch: 2}) + + b0 := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} + require.NoError(t, r.db.SaveBlock(context.Background(), b0)) + b0Root, err := stateutil.BlockRoot(b0.Block) + require.NoError(t, err) + b1 := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 1, ParentRoot: b0Root[:]}} + require.NoError(t, r.db.SaveBlock(context.Background(), b1)) + b1Root, err := stateutil.BlockRoot(b1.Block) + require.NoError(t, err) + + b2 := ðpb.BeaconBlock{Slot: 2, ParentRoot: b1Root[:]} + b2Root, err := ssz.HashTreeRoot(b2) + require.NoError(t, err) + b5 := ðpb.BeaconBlock{Slot: 5, ParentRoot: b2Root[:]} + b5Root, err := ssz.HashTreeRoot(b5) + require.NoError(t, err) + b3 := ðpb.BeaconBlock{Slot: 3, ParentRoot: b0Root[:]} + b3Root, err := ssz.HashTreeRoot(b3) + require.NoError(t, err) + b4 := ðpb.BeaconBlock{Slot: 4, ParentRoot: b3Root[:]} + b4Root, err := ssz.HashTreeRoot(b4) + require.NoError(t, err) + + // Send in duplicated roots to also test deduplicaton. + sentRoots := [][32]byte{b2Root, b2Root, b3Root, b3Root, b4Root, b5Root} + expectedRoots := [][32]byte{b2Root, b3Root, b4Root, b5Root} + + pcl := protocol.ID("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy") + var wg sync.WaitGroup + wg.Add(1) + p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + out := [][32]byte{} + assert.NoError(t, p2.Encoding().DecodeWithMaxLength(stream, &out)) + assert.DeepEqual(t, expectedRoots, out, "Did not receive expected message") + response := []*ethpb.SignedBeaconBlock{{Block: b2}, + {Block: b3}, {Block: b4}, {Block: b5}} + for _, blk := range response { + _, err := stream.Write([]byte{responseCodeSuccess}) + assert.NoError(t, err, "Failed to write to stream") + _, err = p2.Encoding().EncodeWithMaxLength(stream, blk) + assert.NoError(t, err, "Could not send response back") + } + assert.NoError(t, stream.Close()) + }) + + require.NoError(t, r.sendBatchRootRequest(context.Background(), sentRoots, rand.NewGenerator())) + + if testutil.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } + assert.Equal(t, 4, len(r.slotToPendingBlocks), "Incorrect size for slot to pending blocks cache") + assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block") +} diff --git a/beacon-chain/sync/utils.go b/beacon-chain/sync/utils.go index 4fc4cf2a36..337a497b10 100644 --- a/beacon-chain/sync/utils.go +++ b/beacon-chain/sync/utils.go @@ -49,6 +49,19 @@ func (s *Service) dedupBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][ return newBlks, newRoots, nil } +func (s *Service) dedupRoots(roots [][32]byte) [][32]byte { + newRoots := make([][32]byte, 0, len(roots)) + rootMap := make(map[[32]byte]bool, len(roots)) + for i, r := range roots { + if rootMap[r] { + continue + } + rootMap[r] = true + newRoots = append(newRoots, roots[i]) + } + return newRoots +} + // sort the provided blocks and roots in ascending order. This method assumes that the size of // block slice and root slice is equal. func (s *Service) sortBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][32]byte) ([]*ethpb.SignedBeaconBlock, [][32]byte) {