Pending queue to handle same slot blocks (#7036)

* Pending queue: rough draft
* Removed extra RUnlock
* apply diff
* Update pending queue tests
* Merge branch 'fix-pending-blocks' of github.com:prysmaticlabs/prysm into fix-pending-blocks
* Merge branch 'master' into fix-pending-blocks
* Comments
* Merge branch 'fix-pending-blocks' of github.com:prysmaticlabs/prysm into fix-pending-blocks
This commit is contained in:
terence tsao
2020-08-17 17:01:32 -07:00
committed by GitHub
parent dbd1e8c247
commit 9caa92cae4
7 changed files with 180 additions and 130 deletions

View File

@@ -6,6 +6,7 @@ import (
"sort"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
@@ -57,100 +58,111 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))
s.pendingQueueLock.RLock()
b := s.slotToPendingBlocks[slot]
// Skip if block does not exist.
if b == nil || b.Block == nil {
bs := s.slotToPendingBlocks[slot]
// Skip if there's no block in the queue.
if len(bs) == 0 {
s.pendingQueueLock.RUnlock()
span.End()
continue
}
inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block.ParentRoot)]
s.pendingQueueLock.RUnlock()
blkRoot, err := stateutil.BlockRoot(b.Block)
if err != nil {
traceutil.AnnotateError(span, err)
span.End()
return err
}
parentIsBad := s.hasBadBlock(bytesutil.ToBytes32(b.Block.ParentRoot))
blockIsBad := s.hasBadBlock(blkRoot)
// Check if parent is a bad block.
if parentIsBad || blockIsBad {
// Set block as bad if its parent block is bad too.
if parentIsBad {
s.setBadBlock(blkRoot)
// Loop through the pending queue and mark the potential parent blocks as seen.
for _, b := range bs {
if b == nil || b.Block == nil {
span.End()
continue
}
// Remove block from queue.
s.pendingQueueLock.RLock()
inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block.ParentRoot)]
s.pendingQueueLock.RUnlock()
blkRoot, err := stateutil.BlockRoot(b.Block)
if err != nil {
traceutil.AnnotateError(span, err)
span.End()
return err
}
parentIsBad := s.hasBadBlock(bytesutil.ToBytes32(b.Block.ParentRoot))
blockIsBad := s.hasBadBlock(blkRoot)
// Check if parent is a bad block.
if parentIsBad || blockIsBad {
// Set block as bad if its parent block is bad too.
if parentIsBad {
s.setBadBlock(blkRoot)
}
// Remove block from queue.
s.pendingQueueLock.Lock()
s.deleteBlockFromPendingQueue(slot, b)
delete(s.seenPendingBlocks, blkRoot)
s.pendingQueueLock.Unlock()
span.End()
continue
}
inDB := s.db.HasBlock(ctx, bytesutil.ToBytes32(b.Block.ParentRoot))
hasPeer := len(pids) != 0
// Only request for missing parent block if it's not in DB, not in pending cache
// and has peer in the peer list.
if !inPendingQueue && !inDB && hasPeer {
log.WithFields(logrus.Fields{
"currentSlot": b.Block.Slot,
"parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)),
}).Info("Requesting parent block")
req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)}
// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
// have a head slot newer than the block slot we are requesting.
pid := pids[randGen.Int()%len(pids)]
for _, p := range pids {
cs, err := s.p2p.Peers().ChainState(p)
if err != nil {
return errors.Wrap(err, "failed to read chain state for peer")
}
if cs != nil && cs.HeadSlot >= slot {
pid = p
break
}
}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
span.End()
continue
}
if !inDB {
span.End()
continue
}
if err := s.chain.ReceiveBlock(ctx, b, blkRoot); err != nil {
log.Debugf("Could not process block from slot %d: %v", b.Block.Slot, err)
s.setBadBlock(blkRoot)
traceutil.AnnotateError(span, err)
}
// Broadcasting the block again once a node is able to process it.
if err := s.p2p.Broadcast(ctx, b); err != nil {
log.WithError(err).Debug("Failed to broadcast block")
}
s.pendingQueueLock.Lock()
delete(s.slotToPendingBlocks, slot)
s.deleteBlockFromPendingQueue(slot, b)
delete(s.seenPendingBlocks, blkRoot)
s.pendingQueueLock.Unlock()
span.End()
continue
}
inDB := s.db.HasBlock(ctx, bytesutil.ToBytes32(b.Block.ParentRoot))
hasPeer := len(pids) != 0
// Only request for missing parent block if it's not in DB, not in pending cache
// and has peer in the peer list.
if !inPendingQueue && !inDB && hasPeer {
log.WithFields(logrus.Fields{
"currentSlot": b.Block.Slot,
"parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)),
}).Info("Requesting parent block")
req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)}
"slot": slot,
"blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])),
}).Debug("Processed pending block and cleared it in cache")
// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
// have a head slot newer than the block slot we are requesting.
pid := pids[randGen.Int()%len(pids)]
for _, p := range pids {
cs, err := s.p2p.Peers().ChainState(p)
if err != nil {
return errors.Wrap(err, "failed to read chain state for peer")
}
if cs != nil && cs.HeadSlot >= slot {
pid = p
break
}
}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
span.End()
continue
}
if !inDB {
span.End()
continue
}
if err := s.chain.ReceiveBlock(ctx, b, blkRoot); err != nil {
log.Debugf("Could not process block from slot %d: %v", b.Block.Slot, err)
s.setBadBlock(blkRoot)
traceutil.AnnotateError(span, err)
}
// Broadcasting the block again once a node is able to process it.
if err := s.p2p.Broadcast(ctx, b); err != nil {
log.WithError(err).Debug("Failed to broadcast block")
}
s.pendingQueueLock.Lock()
delete(s.slotToPendingBlocks, slot)
delete(s.seenPendingBlocks, blkRoot)
s.pendingQueueLock.Unlock()
log.WithFields(logrus.Fields{
"slot": slot,
"blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])),
}).Debug("Processed pending block and cleared it in cache")
span.End()
}
return nil
@@ -179,28 +191,30 @@ func (s *Service) validatePendingSlots() error {
oldBlockRoots := make(map[[32]byte]bool)
finalizedEpoch := s.chain.FinalizedCheckpt().Epoch
for slot, b := range s.slotToPendingBlocks {
epoch := helpers.SlotToEpoch(slot)
// remove all descendant blocks of old blocks
if oldBlockRoots[bytesutil.ToBytes32(b.Block.ParentRoot)] {
root, err := stateutil.BlockRoot(b.Block)
if err != nil {
return err
for slot, blks := range s.slotToPendingBlocks {
for _, b := range blks {
epoch := helpers.SlotToEpoch(slot)
// remove all descendant blocks of old blocks
if oldBlockRoots[bytesutil.ToBytes32(b.Block.ParentRoot)] {
root, err := stateutil.BlockRoot(b.Block)
if err != nil {
return err
}
oldBlockRoots[root] = true
s.deleteBlockFromPendingQueue(slot, b)
delete(s.seenPendingBlocks, root)
continue
}
oldBlockRoots[root] = true
delete(s.slotToPendingBlocks, slot)
delete(s.seenPendingBlocks, root)
continue
}
// don't process old blocks
if finalizedEpoch > 0 && epoch <= finalizedEpoch {
blkRoot, err := stateutil.BlockRoot(b.Block)
if err != nil {
return err
// don't process old blocks
if finalizedEpoch > 0 && epoch <= finalizedEpoch {
blkRoot, err := stateutil.BlockRoot(b.Block)
if err != nil {
return err
}
oldBlockRoots[blkRoot] = true
s.deleteBlockFromPendingQueue(slot, b)
delete(s.seenPendingBlocks, blkRoot)
}
oldBlockRoots[blkRoot] = true
delete(s.slotToPendingBlocks, slot)
delete(s.seenPendingBlocks, blkRoot)
}
}
return nil
@@ -209,6 +223,38 @@ func (s *Service) validatePendingSlots() error {
func (s *Service) clearPendingSlots() {
s.pendingQueueLock.Lock()
defer s.pendingQueueLock.Unlock()
s.slotToPendingBlocks = make(map[uint64]*ethpb.SignedBeaconBlock)
s.slotToPendingBlocks = make(map[uint64][]*ethpb.SignedBeaconBlock)
s.seenPendingBlocks = make(map[[32]byte]bool)
}
// Delete block from the list from the pending queue using the slot as key.
// Note: this helper is not thread safe.
func (s *Service) deleteBlockFromPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock) {
blks, ok := s.slotToPendingBlocks[slot]
if !ok {
return
}
newBlks := make([]*ethpb.SignedBeaconBlock, 0, len(blks))
for _, blk := range blks {
if proto.Equal(blk, b) {
continue
}
newBlks = append(newBlks, blk)
}
if len(newBlks) == 0 {
delete(s.slotToPendingBlocks, slot)
return
}
s.slotToPendingBlocks[slot] = newBlks
}
// Insert block to the list in the pending queue using the slot as key.
// Note: this helper is not thread safe.
func (s *Service) insertBlockToPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock) {
_, ok := s.slotToPendingBlocks[slot]
if ok {
s.slotToPendingBlocks[slot] = append(s.slotToPendingBlocks[slot], b)
return
}
s.slotToPendingBlocks[slot] = []*ethpb.SignedBeaconBlock{b}
}