diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index f61282a744..adab59c305 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -6,6 +6,7 @@ import ( "sort" "sync" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" @@ -57,100 +58,111 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { span.AddAttributes(trace.Int64Attribute("slot", int64(slot))) s.pendingQueueLock.RLock() - b := s.slotToPendingBlocks[slot] - // Skip if block does not exist. - if b == nil || b.Block == nil { + bs := s.slotToPendingBlocks[slot] + // Skip if there's no block in the queue. + if len(bs) == 0 { s.pendingQueueLock.RUnlock() span.End() continue } - inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block.ParentRoot)] s.pendingQueueLock.RUnlock() - blkRoot, err := stateutil.BlockRoot(b.Block) - if err != nil { - traceutil.AnnotateError(span, err) - span.End() - return err - } - parentIsBad := s.hasBadBlock(bytesutil.ToBytes32(b.Block.ParentRoot)) - blockIsBad := s.hasBadBlock(blkRoot) - // Check if parent is a bad block. - if parentIsBad || blockIsBad { - // Set block as bad if its parent block is bad too. - if parentIsBad { - s.setBadBlock(blkRoot) + // Loop through the pending queue and mark the potential parent blocks as seen. + for _, b := range bs { + if b == nil || b.Block == nil { + span.End() + continue } - // Remove block from queue. + + s.pendingQueueLock.RLock() + inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block.ParentRoot)] + s.pendingQueueLock.RUnlock() + + blkRoot, err := stateutil.BlockRoot(b.Block) + if err != nil { + traceutil.AnnotateError(span, err) + span.End() + return err + } + parentIsBad := s.hasBadBlock(bytesutil.ToBytes32(b.Block.ParentRoot)) + blockIsBad := s.hasBadBlock(blkRoot) + // Check if parent is a bad block. + if parentIsBad || blockIsBad { + // Set block as bad if its parent block is bad too. + if parentIsBad { + s.setBadBlock(blkRoot) + } + // Remove block from queue. + s.pendingQueueLock.Lock() + s.deleteBlockFromPendingQueue(slot, b) + delete(s.seenPendingBlocks, blkRoot) + s.pendingQueueLock.Unlock() + span.End() + continue + } + + inDB := s.db.HasBlock(ctx, bytesutil.ToBytes32(b.Block.ParentRoot)) + hasPeer := len(pids) != 0 + + // Only request for missing parent block if it's not in DB, not in pending cache + // and has peer in the peer list. + if !inPendingQueue && !inDB && hasPeer { + log.WithFields(logrus.Fields{ + "currentSlot": b.Block.Slot, + "parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)), + }).Info("Requesting parent block") + req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)} + + // Start with a random peer to query, but choose the first peer in our unsorted list that claims to + // have a head slot newer than the block slot we are requesting. + pid := pids[randGen.Int()%len(pids)] + for _, p := range pids { + cs, err := s.p2p.Peers().ChainState(p) + if err != nil { + return errors.Wrap(err, "failed to read chain state for peer") + } + if cs != nil && cs.HeadSlot >= slot { + pid = p + break + } + } + + if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil { + traceutil.AnnotateError(span, err) + log.Debugf("Could not send recent block request: %v", err) + } + span.End() + continue + } + + if !inDB { + span.End() + continue + } + + if err := s.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { + log.Debugf("Could not process block from slot %d: %v", b.Block.Slot, err) + s.setBadBlock(blkRoot) + traceutil.AnnotateError(span, err) + } + + // Broadcasting the block again once a node is able to process it. + if err := s.p2p.Broadcast(ctx, b); err != nil { + log.WithError(err).Debug("Failed to broadcast block") + } + s.pendingQueueLock.Lock() - delete(s.slotToPendingBlocks, slot) + s.deleteBlockFromPendingQueue(slot, b) delete(s.seenPendingBlocks, blkRoot) s.pendingQueueLock.Unlock() - span.End() - continue - } - inDB := s.db.HasBlock(ctx, bytesutil.ToBytes32(b.Block.ParentRoot)) - hasPeer := len(pids) != 0 - - // Only request for missing parent block if it's not in DB, not in pending cache - // and has peer in the peer list. - if !inPendingQueue && !inDB && hasPeer { log.WithFields(logrus.Fields{ - "currentSlot": b.Block.Slot, - "parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)), - }).Info("Requesting parent block") - req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)} + "slot": slot, + "blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])), + }).Debug("Processed pending block and cleared it in cache") - // Start with a random peer to query, but choose the first peer in our unsorted list that claims to - // have a head slot newer than the block slot we are requesting. - pid := pids[randGen.Int()%len(pids)] - for _, p := range pids { - cs, err := s.p2p.Peers().ChainState(p) - if err != nil { - return errors.Wrap(err, "failed to read chain state for peer") - } - if cs != nil && cs.HeadSlot >= slot { - pid = p - break - } - } - - if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil { - traceutil.AnnotateError(span, err) - log.Debugf("Could not send recent block request: %v", err) - } span.End() - continue } - - if !inDB { - span.End() - continue - } - - if err := s.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { - log.Debugf("Could not process block from slot %d: %v", b.Block.Slot, err) - s.setBadBlock(blkRoot) - traceutil.AnnotateError(span, err) - } - - // Broadcasting the block again once a node is able to process it. - if err := s.p2p.Broadcast(ctx, b); err != nil { - log.WithError(err).Debug("Failed to broadcast block") - } - - s.pendingQueueLock.Lock() - delete(s.slotToPendingBlocks, slot) - delete(s.seenPendingBlocks, blkRoot) - s.pendingQueueLock.Unlock() - - log.WithFields(logrus.Fields{ - "slot": slot, - "blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])), - }).Debug("Processed pending block and cleared it in cache") - - span.End() } return nil @@ -179,28 +191,30 @@ func (s *Service) validatePendingSlots() error { oldBlockRoots := make(map[[32]byte]bool) finalizedEpoch := s.chain.FinalizedCheckpt().Epoch - for slot, b := range s.slotToPendingBlocks { - epoch := helpers.SlotToEpoch(slot) - // remove all descendant blocks of old blocks - if oldBlockRoots[bytesutil.ToBytes32(b.Block.ParentRoot)] { - root, err := stateutil.BlockRoot(b.Block) - if err != nil { - return err + for slot, blks := range s.slotToPendingBlocks { + for _, b := range blks { + epoch := helpers.SlotToEpoch(slot) + // remove all descendant blocks of old blocks + if oldBlockRoots[bytesutil.ToBytes32(b.Block.ParentRoot)] { + root, err := stateutil.BlockRoot(b.Block) + if err != nil { + return err + } + oldBlockRoots[root] = true + s.deleteBlockFromPendingQueue(slot, b) + delete(s.seenPendingBlocks, root) + continue } - oldBlockRoots[root] = true - delete(s.slotToPendingBlocks, slot) - delete(s.seenPendingBlocks, root) - continue - } - // don't process old blocks - if finalizedEpoch > 0 && epoch <= finalizedEpoch { - blkRoot, err := stateutil.BlockRoot(b.Block) - if err != nil { - return err + // don't process old blocks + if finalizedEpoch > 0 && epoch <= finalizedEpoch { + blkRoot, err := stateutil.BlockRoot(b.Block) + if err != nil { + return err + } + oldBlockRoots[blkRoot] = true + s.deleteBlockFromPendingQueue(slot, b) + delete(s.seenPendingBlocks, blkRoot) } - oldBlockRoots[blkRoot] = true - delete(s.slotToPendingBlocks, slot) - delete(s.seenPendingBlocks, blkRoot) } } return nil @@ -209,6 +223,38 @@ func (s *Service) validatePendingSlots() error { func (s *Service) clearPendingSlots() { s.pendingQueueLock.Lock() defer s.pendingQueueLock.Unlock() - s.slotToPendingBlocks = make(map[uint64]*ethpb.SignedBeaconBlock) + s.slotToPendingBlocks = make(map[uint64][]*ethpb.SignedBeaconBlock) s.seenPendingBlocks = make(map[[32]byte]bool) } + +// Delete block from the list from the pending queue using the slot as key. +// Note: this helper is not thread safe. +func (s *Service) deleteBlockFromPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock) { + blks, ok := s.slotToPendingBlocks[slot] + if !ok { + return + } + newBlks := make([]*ethpb.SignedBeaconBlock, 0, len(blks)) + for _, blk := range blks { + if proto.Equal(blk, b) { + continue + } + newBlks = append(newBlks, blk) + } + if len(newBlks) == 0 { + delete(s.slotToPendingBlocks, slot) + return + } + s.slotToPendingBlocks[slot] = newBlks +} + +// Insert block to the list in the pending queue using the slot as key. +// Note: this helper is not thread safe. +func (s *Service) insertBlockToPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock) { + _, ok := s.slotToPendingBlocks[slot] + if ok { + s.slotToPendingBlocks[slot] = append(s.slotToPendingBlocks[slot], b) + return + } + s.slotToPendingBlocks[slot] = []*ethpb.SignedBeaconBlock{b} +} diff --git a/beacon-chain/sync/pending_blocks_queue_test.go b/beacon-chain/sync/pending_blocks_queue_test.go index 099944af83..23ed73ab16 100644 --- a/beacon-chain/sync/pending_blocks_queue_test.go +++ b/beacon-chain/sync/pending_blocks_queue_test.go @@ -37,7 +37,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) { Epoch: 0, }, }, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), } err := r.initCaches() @@ -58,7 +58,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) { require.NoError(t, err) // Add b2 to the cache - r.slotToPendingBlocks[b2.Block.Slot] = b2 + r.insertBlockToPendingQueue(b2.Block.Slot, b2) r.seenPendingBlocks[b2Root] = true require.NoError(t, r.processPendingBlocks(context.Background())) @@ -66,11 +66,15 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) { assert.Equal(t, 1, len(r.seenPendingBlocks), "Incorrect size for seen pending block") // Add b1 to the cache - r.slotToPendingBlocks[b1.Block.Slot] = b1 + r.insertBlockToPendingQueue(b1.Block.Slot, b1) r.seenPendingBlocks[b1Root] = true require.NoError(t, r.db.SaveBlock(context.Background(), b1)) + + // Insert bad b1 in the cache to verify the good one doesn't get replaced. + r.insertBlockToPendingQueue(b1.Block.Slot, ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}}) + require.NoError(t, r.processPendingBlocks(context.Background())) - assert.Equal(t, 0, len(r.slotToPendingBlocks), "Incorrect size for slot to pending blocks cache") + assert.Equal(t, 1, len(r.slotToPendingBlocks), "Incorrect size for slot to pending blocks cache") assert.Equal(t, 0, len(r.seenPendingBlocks), "Incorrect size for seen pending block") } @@ -108,7 +112,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks2(t *testing.T) { Epoch: 0, }, }, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), } err := r.initCaches() @@ -140,9 +144,9 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks2(t *testing.T) { b4Root, err := ssz.HashTreeRoot(b4) require.NoError(t, err) - r.slotToPendingBlocks[b4.Slot] = ðpb.SignedBeaconBlock{Block: b4} + r.insertBlockToPendingQueue(b4.Slot, ðpb.SignedBeaconBlock{Block: b4}) r.seenPendingBlocks[b4Root] = true - r.slotToPendingBlocks[b5.Slot] = ðpb.SignedBeaconBlock{Block: b5} + r.insertBlockToPendingQueue(b5.Slot, ðpb.SignedBeaconBlock{Block: b5}) r.seenPendingBlocks[b5Root] = true require.NoError(t, r.processPendingBlocks(context.Background())) @@ -150,7 +154,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks2(t *testing.T) { assert.Equal(t, 2, len(r.seenPendingBlocks), "Incorrect size for seen pending block") // Add b3 to the cache - r.slotToPendingBlocks[b3.Slot] = ðpb.SignedBeaconBlock{Block: b3} + r.insertBlockToPendingQueue(b3.Slot, ðpb.SignedBeaconBlock{Block: b3}) r.seenPendingBlocks[b3Root] = true require.NoError(t, r.db.SaveBlock(context.Background(), ðpb.SignedBeaconBlock{Block: b3})) require.NoError(t, r.processPendingBlocks(context.Background())) @@ -158,7 +162,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks2(t *testing.T) { assert.Equal(t, 1, len(r.seenPendingBlocks), "Incorrect size for seen pending block") // Add b2 to the cache - r.slotToPendingBlocks[b2.Slot] = ðpb.SignedBeaconBlock{Block: b2} + r.insertBlockToPendingQueue(b2.Slot, ðpb.SignedBeaconBlock{Block: b2}) r.seenPendingBlocks[b2Root] = true require.NoError(t, r.db.SaveBlock(context.Background(), ðpb.SignedBeaconBlock{Block: b2})) @@ -182,7 +186,7 @@ func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) { Epoch: 1, }, }, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), } err := r.initCaches() @@ -214,13 +218,13 @@ func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) { b4Root, err := ssz.HashTreeRoot(b4) require.NoError(t, err) - r.slotToPendingBlocks[b2.Slot] = ðpb.SignedBeaconBlock{Block: b2} + r.insertBlockToPendingQueue(b2.Slot, ðpb.SignedBeaconBlock{Block: b2}) r.seenPendingBlocks[b2Root] = true - r.slotToPendingBlocks[b3.Slot] = ðpb.SignedBeaconBlock{Block: b3} + r.insertBlockToPendingQueue(b3.Slot, ðpb.SignedBeaconBlock{Block: b3}) r.seenPendingBlocks[b3Root] = true - r.slotToPendingBlocks[b4.Slot] = ðpb.SignedBeaconBlock{Block: b4} + r.insertBlockToPendingQueue(b4.Slot, ðpb.SignedBeaconBlock{Block: b4}) r.seenPendingBlocks[b4Root] = true - r.slotToPendingBlocks[b5.Slot] = ðpb.SignedBeaconBlock{Block: b5} + r.insertBlockToPendingQueue(b5.Slot, ðpb.SignedBeaconBlock{Block: b5}) r.seenPendingBlocks[b5Root] = true require.NoError(t, r.processPendingBlocks(context.Background())) @@ -230,14 +234,14 @@ func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) { func TestService_sortedPendingSlots(t *testing.T) { r := &Service{ - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), } var lastSlot uint64 = math.MaxUint64 - r.slotToPendingBlocks[lastSlot] = ðpb.SignedBeaconBlock{} - r.slotToPendingBlocks[lastSlot-3] = ðpb.SignedBeaconBlock{} - r.slotToPendingBlocks[lastSlot-5] = ðpb.SignedBeaconBlock{} - r.slotToPendingBlocks[lastSlot-2] = ðpb.SignedBeaconBlock{} + r.insertBlockToPendingQueue(lastSlot, ðpb.SignedBeaconBlock{}) + r.insertBlockToPendingQueue(lastSlot-3, ðpb.SignedBeaconBlock{}) + r.insertBlockToPendingQueue(lastSlot-5, ðpb.SignedBeaconBlock{}) + r.insertBlockToPendingQueue(lastSlot-2, ðpb.SignedBeaconBlock{}) want := []uint64{lastSlot - 5, lastSlot - 3, lastSlot - 2, lastSlot} assert.DeepEqual(t, want, r.sortedPendingSlots(), "Unexpected pending slots list") diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index 54e7fb26b4..645a08633e 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -48,7 +48,7 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots return err } s.pendingQueueLock.Lock() - s.slotToPendingBlocks[blk.Block.Slot] = blk + s.insertBlockToPendingQueue(blk.Block.Slot, blk) s.seenPendingBlocks[blkRoot] = true s.pendingQueueLock.Unlock() diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index f2b1deb6e4..1068fbc64a 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -104,7 +104,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) { FinalizedCheckPoint: finalizedCheckpt, Root: blockARoot[:], }, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), ctx: context.Background(), rateLimiter: newRateLimiter(p1), diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 0a265606e8..bb0a8072a8 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -82,7 +82,7 @@ type Service struct { exitPool *voluntaryexits.Pool slashingPool *slashings.Pool chain blockchainService - slotToPendingBlocks map[uint64]*ethpb.SignedBeaconBlock + slotToPendingBlocks map[uint64][]*ethpb.SignedBeaconBlock seenPendingBlocks map[[32]byte]bool blkRootToPendingAtts map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof pendingAttsLock sync.RWMutex @@ -125,7 +125,7 @@ func NewRegularSync(cfg *Config) *Service { chain: cfg.Chain, initialSync: cfg.InitialSync, attestationNotifier: cfg.AttestationNotifier, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), stateNotifier: cfg.StateNotifier, diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index 1c5df5c395..b3264ab594 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -110,7 +110,7 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms // Handle block when the parent is unknown. if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) { s.pendingQueueLock.Lock() - s.slotToPendingBlocks[blk.Block.Slot] = blk + s.insertBlockToPendingQueue(blk.Block.Slot, blk) s.seenPendingBlocks[blockRoot] = true s.pendingQueueLock.Unlock() return pubsub.ValidationIgnore diff --git a/beacon-chain/sync/validate_beacon_blocks_test.go b/beacon-chain/sync/validate_beacon_blocks_test.go index cb58c43d28..ab2cc60f2f 100644 --- a/beacon-chain/sync/validate_beacon_blocks_test.go +++ b/beacon-chain/sync/validate_beacon_blocks_test.go @@ -176,7 +176,7 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) { blockNotifier: chainService.BlockNotifier(), seenBlockCache: c, badBlockCache: c2, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), stateSummaryCache: stateSummaryCache, stateGen: stateGen, @@ -248,7 +248,7 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) { blockNotifier: chainService.BlockNotifier(), seenBlockCache: c, badBlockCache: c2, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), stateSummaryCache: stateSummaryCache, stateGen: stateGen, @@ -340,7 +340,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) { blockNotifier: chainService.BlockNotifier(), seenBlockCache: c, badBlockCache: c2, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), } @@ -455,7 +455,7 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { blockNotifier: chainService.BlockNotifier(), seenBlockCache: c, badBlockCache: c2, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), stateSummaryCache: cache.NewStateSummaryCache(), } @@ -592,7 +592,7 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) { blockNotifier: chainService.BlockNotifier(), seenBlockCache: c, badBlockCache: c2, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), stateSummaryCache: stateSummaryCache, stateGen: stateGen, @@ -665,7 +665,7 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) { blockNotifier: chainService.BlockNotifier(), seenBlockCache: c, badBlockCache: c2, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), stateSummaryCache: stateSummaryCache, stateGen: stateGen,