Pending blocks queue: Better locking priority (#4709)

* Better locking priority, use correct lock on validating pending blocks
This commit is contained in:
Preston Van Loon
2020-02-01 14:47:51 -08:00
committed by GitHub
parent 2a79c572a5
commit 069ec1726b
2 changed files with 13 additions and 8 deletions

View File

@@ -53,10 +53,12 @@ func (r *Service) processPendingBlocks(ctx context.Context) error {
b := r.slotToPendingBlocks[uint64(s)]
// Skip if block does not exist.
if b == nil || b.Block == nil {
r.pendingQueueLock.RUnlock()
span.End()
continue
}
inPendingQueue := r.seenPendingBlocks[bytesutil.ToBytes32(b.Block.ParentRoot)]
r.pendingQueueLock.RUnlock()
inPendingQueue := r.seenPendingBlocks[bytesutil.ToBytes32(b.Block.ParentRoot)]
inDB := r.db.HasBlock(ctx, bytesutil.ToBytes32(b.Block.ParentRoot))
hasPeer := len(pids) != 0
@@ -98,18 +100,21 @@ func (r *Service) processPendingBlocks(ctx context.Context) error {
traceutil.AnnotateError(span, err)
}
r.pendingQueueLock.Lock()
delete(r.slotToPendingBlocks, uint64(s))
blkRoot, err := ssz.HashTreeRoot(b.Block)
if err != nil {
traceutil.AnnotateError(span, err)
span.End()
return err
}
r.pendingQueueLock.Lock()
delete(r.slotToPendingBlocks, uint64(s))
delete(r.seenPendingBlocks, blkRoot)
r.pendingQueueLock.Unlock()
log.Infof("Processed ancestor block with slot %d and cleared pending block cache", s)
span.End()
}
return nil
}
@@ -130,8 +135,8 @@ func (r *Service) sortedPendingSlots() []int {
// by their slot. If they are before the current finalized
// checkpoint, these blocks are removed from the queue.
func (r *Service) validatePendingSlots() error {
r.pendingQueueLock.RLock()
defer r.pendingQueueLock.RUnlock()
r.pendingQueueLock.Lock()
defer r.pendingQueueLock.Unlock()
oldBlockRoots := make(map[[32]byte]bool)
finalizedEpoch := r.chain.FinalizedCheckpt().Epoch
@@ -159,7 +164,6 @@ func (r *Service) validatePendingSlots() error {
delete(r.seenPendingBlocks, blkRoot)
}
}
oldBlockRoots = nil
return nil
}

View File

@@ -30,12 +30,13 @@ func (r *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots
log.WithError(err).Error("Unable to retrieve block from stream")
return err
}
r.pendingQueueLock.Lock()
r.slotToPendingBlocks[blk.Block.Slot] = blk
blkRoot, err := ssz.HashTreeRoot(blk.Block)
if err != nil {
return err
}
r.pendingQueueLock.Lock()
r.slotToPendingBlocks[blk.Block.Slot] = blk
r.seenPendingBlocks[blkRoot] = true
r.pendingQueueLock.Unlock()