diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index 2daf49b752..5bebd8c3ce 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -49,81 +49,68 @@ func (s *Service) processPendingBlocksQueue() { }) } -// processes the block tree inside the queue +// processPendingBlocks validates, processes, and broadcasts pending blocks. func (s *Service) processPendingBlocks(ctx context.Context) error { ctx, span := trace.StartSpan(ctx, "processPendingBlocks") defer span.End() - pids := s.cfg.p2p.Peers().Connected() + // Validate pending slots before processing. if err := s.validatePendingSlots(); err != nil { return errors.Wrap(err, "could not validate pending slots") } - ss := s.sortedPendingSlots() - var parentRoots [][32]byte - span.AddAttributes( - trace.Int64Attribute("numSlots", int64(len(ss))), - trace.Int64Attribute("numPeers", int64(len(pids))), - ) + // Sort slots for ordered processing. + sortedSlots := s.sortedPendingSlots() + + span.AddAttributes(trace.Int64Attribute("numSlots", int64(len(sortedSlots))), trace.Int64Attribute("numPeers", int64(len(s.cfg.p2p.Peers().Connected())))) randGen := rand.NewGenerator() - for _, slot := range ss { - // process the blocks during their respective slot. - // otherwise wait for the right slot to process the block. + var parentRoots [][32]byte + + // Iterate through sorted slots. + for _, slot := range sortedSlots { + // Skip processing if slot is in the future. if slot > s.cfg.clock.CurrentSlot() { continue } - ctx, span := trace.StartSpan(ctx, "processPendingBlocks.InnerLoop") - span.AddAttributes(trace.Int64Attribute("slot", int64(slot))) // lint:ignore uintcast -- This conversion is OK for tracing. + ctx, span := startInnerSpan(ctx, slot) - s.pendingQueueLock.RLock() - bs := s.pendingBlocksInCache(slot) - // Skip if there's no block in the queue. - if len(bs) == 0 { - s.pendingQueueLock.RUnlock() + // Get blocks in the pending queue for the current slot. + blocksInCache := s.getBlocksInQueue(slot) + if len(blocksInCache) == 0 { span.End() continue } - s.pendingQueueLock.RUnlock() - // Loop through the pending queue and mark the potential parent blocks as seen. - for _, b := range bs { - if b == nil || b.IsNil() || b.Block().IsNil() { - span.End() + // Process each block in the queue. + for _, b := range blocksInCache { + if err := blocks.BeaconBlockIsNil(b); err != nil { continue } - blkRoot, err := b.Block().HashTreeRoot() if err != nil { - tracing.AnnotateError(span, err) - span.End() return err } - // No need to process the same block if we are already processing it + + // Skip blocks that are already being processed. if s.cfg.chain.BlockBeingSynced(blkRoot) { - rootString := fmt.Sprintf("%#x", blkRoot) - log.WithField("BlockRoot", rootString).Info("Skipping pending block already being processed") + log.WithField("BlockRoot", fmt.Sprintf("%#x", blkRoot)).Info("Skipping pending block already being processed") continue } - inDB := s.cfg.beaconDB.HasBlock(ctx, blkRoot) - // No need to process the same block twice. - if inDB { - s.pendingQueueLock.Lock() - if err = s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil { - s.pendingQueueLock.Unlock() + // Remove and skip blocks already in the database. + if s.cfg.beaconDB.HasBlock(ctx, blkRoot) { + if err := s.removeBlockFromQueue(b, blkRoot); err != nil { return err } - s.pendingQueueLock.Unlock() - span.End() continue } - s.pendingQueueLock.RLock() - inPendingQueue := s.seenPendingBlocks[b.Block().ParentRoot()] - s.pendingQueueLock.RUnlock() + parentRoot := b.Block().ParentRoot() + inPendingQueue := s.isBlockInQueue(parentRoot) + // Check if block is bad. keepProcessing, err := s.checkIfBlockIsBad(ctx, span, slot, b, blkRoot) if err != nil { return err @@ -132,85 +119,104 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { continue } - parentRoot := b.Block().ParentRoot() - parentInDb := s.cfg.beaconDB.HasBlock(ctx, parentRoot) - hasPeer := len(pids) != 0 - - // Only request for missing parent block if it's not in beaconDB, not in pending cache - // and has peer in the peer list. - if !inPendingQueue && !parentInDb && hasPeer { - log.WithFields(logrus.Fields{ - "currentSlot": b.Block().Slot(), - "parentRoot": hex.EncodeToString(bytesutil.Trunc(parentRoot[:])), - }).Debug("Requesting parent block") - parentRoots = append(parentRoots, b.Block().ParentRoot()) - - span.End() + // Request parent block if not in the pending queue and not in the database. + isParentBlockInDB := s.cfg.beaconDB.HasBlock(ctx, parentRoot) + if !inPendingQueue && !isParentBlockInDB && s.hasPeer() { + log.WithFields(logrus.Fields{"currentSlot": b.Block().Slot(), "parentRoot": hex.EncodeToString(parentRoot[:])}).Debug("Requesting parent block") + parentRoots = append(parentRoots, parentRoot) + continue + } + if !isParentBlockInDB { continue } - if !parentInDb { - span.End() + // Process and broadcast the block. + if err := s.processAndBroadcastBlock(ctx, b, blkRoot); err != nil { + s.handleBlockProcessingError(ctx, err, b, blkRoot) continue } - err = s.validateBeaconBlock(ctx, b, blkRoot) - switch { - case errors.Is(ErrOptimisticParent, err): // Ok to continue process block with parent that is an optimistic candidate. - case err != nil: - log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not validate block") - tracing.AnnotateError(span, err) - span.End() - continue - default: - } - - if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { - if blockchain.IsInvalidBlock(err) { - r := blockchain.InvalidBlockRoot(err) - if r != [32]byte{} { - s.setBadBlock(ctx, r) // Setting head block as bad. - } else { - s.setBadBlock(ctx, blkRoot) - } - } - log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not process block") - - // In the next iteration of the queue, this block will be removed from - // the pending queue as it has been marked as a 'bad' block. - span.End() - continue - } - - s.setSeenBlockIndexSlot(b.Block().Slot(), b.Block().ProposerIndex()) - - // Broadcasting the block again once a node is able to process it. - pb, err := b.Proto() - if err != nil { - log.WithError(err).Debug("Could not get protobuf block") - } else { - if err := s.cfg.p2p.Broadcast(ctx, pb); err != nil { - log.WithError(err).Debug("Could not broadcast block") - } - } - - s.pendingQueueLock.Lock() - if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil { - s.pendingQueueLock.Unlock() + // Remove the processed block from the queue. + if err := s.removeBlockFromQueue(b, blkRoot); err != nil { return err } - s.pendingQueueLock.Unlock() + log.WithFields(logrus.Fields{"slot": slot, "blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:]))}).Debug("Processed pending block and cleared it in cache") + } + span.End() + } + return s.sendBatchRootRequest(ctx, parentRoots, randGen) +} - log.WithFields(logrus.Fields{ - "slot": slot, - "blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])), - }).Debug("Processed pending block and cleared it in cache") +// startInnerSpan starts a new tracing span for an inner loop and returns the new context and span. +func startInnerSpan(ctx context.Context, slot primitives.Slot) (context.Context, *trace.Span) { + ctx, span := trace.StartSpan(ctx, "processPendingBlocks.InnerLoop") + span.AddAttributes(trace.Int64Attribute("slot", int64(slot))) // lint:ignore uintcast -- This conversion is OK for tracing. + return ctx, span +} - span.End() +// getBlocksInQueue retrieves the blocks in the pending queue for a given slot. +func (s *Service) getBlocksInQueue(slot primitives.Slot) []interfaces.ReadOnlySignedBeaconBlock { + s.pendingQueueLock.RLock() + defer s.pendingQueueLock.RUnlock() + return s.pendingBlocksInCache(slot) +} + +// removeBlockFromQueue removes a block from the pending queue. +func (s *Service) removeBlockFromQueue(b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error { + s.pendingQueueLock.Lock() + defer s.pendingQueueLock.Unlock() + if err := s.deleteBlockFromPendingQueue(b.Block().Slot(), b, blkRoot); err != nil { + return err + } + return nil +} + +// isBlockInQueue checks if a block's parent root is in the pending queue. +func (s *Service) isBlockInQueue(parentRoot [32]byte) bool { + s.pendingQueueLock.RLock() + defer s.pendingQueueLock.RUnlock() + + return s.seenPendingBlocks[parentRoot] +} + +func (s *Service) hasPeer() bool { + return len(s.cfg.p2p.Peers().Connected()) > 0 +} + +// processAndBroadcastBlock validates, processes, and broadcasts a block. +func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error { + if err := s.validateBeaconBlock(ctx, b, blkRoot); err != nil { + if !errors.Is(ErrOptimisticParent, err) { + log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not validate block") + return err } } - return s.sendBatchRootRequest(ctx, parentRoots, randGen) + if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { + return err + } + + s.setSeenBlockIndexSlot(b.Block().Slot(), b.Block().ProposerIndex()) + + pb, err := b.Proto() + if err != nil { + log.WithError(err).Debug("Could not get protobuf block") + return err + } + if err := s.cfg.p2p.Broadcast(ctx, pb); err != nil { + log.WithError(err).Debug("Could not broadcast block") + return err + } + + return nil +} + +// handleBlockProcessingError handles errors during block processing. +func (s *Service) handleBlockProcessingError(ctx context.Context, err error, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) { + if blockchain.IsInvalidBlock(err) { + s.setBadBlock(ctx, blkRoot) + } + log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not process block") } func (s *Service) checkIfBlockIsBad(