Refactor Pending Block Queue Logic in Sync Package (#13026)

* Clean up pending block queue

* Kasey's feedback

* Kasey's feedback on validateBeaconBlock err handling

* Clean up handleBlockProcessingError

* Clean up old comments

* James feedback

* has peer helper

* Reuse parent reoot

---------

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
terencechain
2023-10-11 07:41:24 -07:00
committed by GitHub
parent 55e4c6e1db
commit c5501f8775

View File

@@ -49,81 +49,68 @@ func (s *Service) processPendingBlocksQueue() {
})
}
// processes the block tree inside the queue
// processPendingBlocks validates, processes, and broadcasts pending blocks.
func (s *Service) processPendingBlocks(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "processPendingBlocks")
defer span.End()
pids := s.cfg.p2p.Peers().Connected()
// Validate pending slots before processing.
if err := s.validatePendingSlots(); err != nil {
return errors.Wrap(err, "could not validate pending slots")
}
ss := s.sortedPendingSlots()
var parentRoots [][32]byte
span.AddAttributes(
trace.Int64Attribute("numSlots", int64(len(ss))),
trace.Int64Attribute("numPeers", int64(len(pids))),
)
// Sort slots for ordered processing.
sortedSlots := s.sortedPendingSlots()
span.AddAttributes(trace.Int64Attribute("numSlots", int64(len(sortedSlots))), trace.Int64Attribute("numPeers", int64(len(s.cfg.p2p.Peers().Connected()))))
randGen := rand.NewGenerator()
for _, slot := range ss {
// process the blocks during their respective slot.
// otherwise wait for the right slot to process the block.
var parentRoots [][32]byte
// Iterate through sorted slots.
for _, slot := range sortedSlots {
// Skip processing if slot is in the future.
if slot > s.cfg.clock.CurrentSlot() {
continue
}
ctx, span := trace.StartSpan(ctx, "processPendingBlocks.InnerLoop")
span.AddAttributes(trace.Int64Attribute("slot", int64(slot))) // lint:ignore uintcast -- This conversion is OK for tracing.
ctx, span := startInnerSpan(ctx, slot)
s.pendingQueueLock.RLock()
bs := s.pendingBlocksInCache(slot)
// Skip if there's no block in the queue.
if len(bs) == 0 {
s.pendingQueueLock.RUnlock()
// Get blocks in the pending queue for the current slot.
blocksInCache := s.getBlocksInQueue(slot)
if len(blocksInCache) == 0 {
span.End()
continue
}
s.pendingQueueLock.RUnlock()
// Loop through the pending queue and mark the potential parent blocks as seen.
for _, b := range bs {
if b == nil || b.IsNil() || b.Block().IsNil() {
span.End()
// Process each block in the queue.
for _, b := range blocksInCache {
if err := blocks.BeaconBlockIsNil(b); err != nil {
continue
}
blkRoot, err := b.Block().HashTreeRoot()
if err != nil {
tracing.AnnotateError(span, err)
span.End()
return err
}
// No need to process the same block if we are already processing it
// Skip blocks that are already being processed.
if s.cfg.chain.BlockBeingSynced(blkRoot) {
rootString := fmt.Sprintf("%#x", blkRoot)
log.WithField("BlockRoot", rootString).Info("Skipping pending block already being processed")
log.WithField("BlockRoot", fmt.Sprintf("%#x", blkRoot)).Info("Skipping pending block already being processed")
continue
}
inDB := s.cfg.beaconDB.HasBlock(ctx, blkRoot)
// No need to process the same block twice.
if inDB {
s.pendingQueueLock.Lock()
if err = s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
s.pendingQueueLock.Unlock()
// Remove and skip blocks already in the database.
if s.cfg.beaconDB.HasBlock(ctx, blkRoot) {
if err := s.removeBlockFromQueue(b, blkRoot); err != nil {
return err
}
s.pendingQueueLock.Unlock()
span.End()
continue
}
s.pendingQueueLock.RLock()
inPendingQueue := s.seenPendingBlocks[b.Block().ParentRoot()]
s.pendingQueueLock.RUnlock()
parentRoot := b.Block().ParentRoot()
inPendingQueue := s.isBlockInQueue(parentRoot)
// Check if block is bad.
keepProcessing, err := s.checkIfBlockIsBad(ctx, span, slot, b, blkRoot)
if err != nil {
return err
@@ -132,85 +119,104 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
continue
}
parentRoot := b.Block().ParentRoot()
parentInDb := s.cfg.beaconDB.HasBlock(ctx, parentRoot)
hasPeer := len(pids) != 0
// Only request for missing parent block if it's not in beaconDB, not in pending cache
// and has peer in the peer list.
if !inPendingQueue && !parentInDb && hasPeer {
log.WithFields(logrus.Fields{
"currentSlot": b.Block().Slot(),
"parentRoot": hex.EncodeToString(bytesutil.Trunc(parentRoot[:])),
}).Debug("Requesting parent block")
parentRoots = append(parentRoots, b.Block().ParentRoot())
span.End()
// Request parent block if not in the pending queue and not in the database.
isParentBlockInDB := s.cfg.beaconDB.HasBlock(ctx, parentRoot)
if !inPendingQueue && !isParentBlockInDB && s.hasPeer() {
log.WithFields(logrus.Fields{"currentSlot": b.Block().Slot(), "parentRoot": hex.EncodeToString(parentRoot[:])}).Debug("Requesting parent block")
parentRoots = append(parentRoots, parentRoot)
continue
}
if !isParentBlockInDB {
continue
}
if !parentInDb {
span.End()
// Process and broadcast the block.
if err := s.processAndBroadcastBlock(ctx, b, blkRoot); err != nil {
s.handleBlockProcessingError(ctx, err, b, blkRoot)
continue
}
err = s.validateBeaconBlock(ctx, b, blkRoot)
switch {
case errors.Is(ErrOptimisticParent, err): // Ok to continue process block with parent that is an optimistic candidate.
case err != nil:
log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not validate block")
tracing.AnnotateError(span, err)
span.End()
continue
default:
}
if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil {
if blockchain.IsInvalidBlock(err) {
r := blockchain.InvalidBlockRoot(err)
if r != [32]byte{} {
s.setBadBlock(ctx, r) // Setting head block as bad.
} else {
s.setBadBlock(ctx, blkRoot)
}
}
log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not process block")
// In the next iteration of the queue, this block will be removed from
// the pending queue as it has been marked as a 'bad' block.
span.End()
continue
}
s.setSeenBlockIndexSlot(b.Block().Slot(), b.Block().ProposerIndex())
// Broadcasting the block again once a node is able to process it.
pb, err := b.Proto()
if err != nil {
log.WithError(err).Debug("Could not get protobuf block")
} else {
if err := s.cfg.p2p.Broadcast(ctx, pb); err != nil {
log.WithError(err).Debug("Could not broadcast block")
}
}
s.pendingQueueLock.Lock()
if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
s.pendingQueueLock.Unlock()
// Remove the processed block from the queue.
if err := s.removeBlockFromQueue(b, blkRoot); err != nil {
return err
}
s.pendingQueueLock.Unlock()
log.WithFields(logrus.Fields{"slot": slot, "blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:]))}).Debug("Processed pending block and cleared it in cache")
}
span.End()
}
return s.sendBatchRootRequest(ctx, parentRoots, randGen)
}
log.WithFields(logrus.Fields{
"slot": slot,
"blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])),
}).Debug("Processed pending block and cleared it in cache")
// startInnerSpan starts a new tracing span for an inner loop and returns the new context and span.
func startInnerSpan(ctx context.Context, slot primitives.Slot) (context.Context, *trace.Span) {
ctx, span := trace.StartSpan(ctx, "processPendingBlocks.InnerLoop")
span.AddAttributes(trace.Int64Attribute("slot", int64(slot))) // lint:ignore uintcast -- This conversion is OK for tracing.
return ctx, span
}
span.End()
// getBlocksInQueue retrieves the blocks in the pending queue for a given slot.
func (s *Service) getBlocksInQueue(slot primitives.Slot) []interfaces.ReadOnlySignedBeaconBlock {
s.pendingQueueLock.RLock()
defer s.pendingQueueLock.RUnlock()
return s.pendingBlocksInCache(slot)
}
// removeBlockFromQueue removes a block from the pending queue.
func (s *Service) removeBlockFromQueue(b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error {
s.pendingQueueLock.Lock()
defer s.pendingQueueLock.Unlock()
if err := s.deleteBlockFromPendingQueue(b.Block().Slot(), b, blkRoot); err != nil {
return err
}
return nil
}
// isBlockInQueue checks if a block's parent root is in the pending queue.
func (s *Service) isBlockInQueue(parentRoot [32]byte) bool {
s.pendingQueueLock.RLock()
defer s.pendingQueueLock.RUnlock()
return s.seenPendingBlocks[parentRoot]
}
func (s *Service) hasPeer() bool {
return len(s.cfg.p2p.Peers().Connected()) > 0
}
// processAndBroadcastBlock validates, processes, and broadcasts a block.
func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error {
if err := s.validateBeaconBlock(ctx, b, blkRoot); err != nil {
if !errors.Is(ErrOptimisticParent, err) {
log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not validate block")
return err
}
}
return s.sendBatchRootRequest(ctx, parentRoots, randGen)
if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil {
return err
}
s.setSeenBlockIndexSlot(b.Block().Slot(), b.Block().ProposerIndex())
pb, err := b.Proto()
if err != nil {
log.WithError(err).Debug("Could not get protobuf block")
return err
}
if err := s.cfg.p2p.Broadcast(ctx, pb); err != nil {
log.WithError(err).Debug("Could not broadcast block")
return err
}
return nil
}
// handleBlockProcessingError handles errors during block processing.
func (s *Service) handleBlockProcessingError(ctx context.Context, err error, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) {
if blockchain.IsInvalidBlock(err) {
s.setBadBlock(ctx, blkRoot)
}
log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not process block")
}
func (s *Service) checkIfBlockIsBad(