mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Prevent Reprocessing of a Block From Our Pending Queue (#9904)
* fix bugs * test * raul's review
This commit is contained in:
@@ -88,16 +88,29 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
s.pendingQueueLock.RLock()
|
||||
inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block().ParentRoot())]
|
||||
s.pendingQueueLock.RUnlock()
|
||||
|
||||
blkRoot, err := b.Block().HashTreeRoot()
|
||||
if err != nil {
|
||||
tracing.AnnotateError(span, err)
|
||||
span.End()
|
||||
return err
|
||||
}
|
||||
inDB := s.cfg.beaconDB.HasBlock(ctx, blkRoot)
|
||||
// No need to process the same block twice.
|
||||
if inDB {
|
||||
s.pendingQueueLock.Lock()
|
||||
if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
|
||||
s.pendingQueueLock.Unlock()
|
||||
return err
|
||||
}
|
||||
s.pendingQueueLock.Unlock()
|
||||
span.End()
|
||||
continue
|
||||
}
|
||||
|
||||
s.pendingQueueLock.RLock()
|
||||
inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block().ParentRoot())]
|
||||
s.pendingQueueLock.RUnlock()
|
||||
|
||||
parentIsBad := s.hasBadBlock(bytesutil.ToBytes32(b.Block().ParentRoot()))
|
||||
blockIsBad := s.hasBadBlock(blkRoot)
|
||||
// Check if parent is a bad block.
|
||||
@@ -117,12 +130,12 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
inDB := s.cfg.beaconDB.HasBlock(ctx, bytesutil.ToBytes32(b.Block().ParentRoot()))
|
||||
parentInDb := s.cfg.beaconDB.HasBlock(ctx, bytesutil.ToBytes32(b.Block().ParentRoot()))
|
||||
hasPeer := len(pids) != 0
|
||||
|
||||
// Only request for missing parent block if it's not in beaconDB, not in pending cache
|
||||
// and has peer in the peer list.
|
||||
if !inPendingQueue && !inDB && hasPeer {
|
||||
if !inPendingQueue && !parentInDb && hasPeer {
|
||||
log.WithFields(logrus.Fields{
|
||||
"currentSlot": b.Block().Slot(),
|
||||
"parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block().ParentRoot())),
|
||||
@@ -133,7 +146,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
if !inDB {
|
||||
if !parentInDb {
|
||||
span.End()
|
||||
continue
|
||||
}
|
||||
@@ -167,6 +180,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
|
||||
|
||||
s.pendingQueueLock.Lock()
|
||||
if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
|
||||
s.pendingQueueLock.Unlock()
|
||||
return err
|
||||
}
|
||||
s.pendingQueueLock.Unlock()
|
||||
@@ -321,6 +335,7 @@ func (s *Service) deleteBlockFromPendingQueue(slot types.Slot, b block.SignedBea
|
||||
}
|
||||
if len(newBlks) == 0 {
|
||||
s.slotToPendingBlocks.Delete(slotToCacheKey(slot))
|
||||
delete(s.seenPendingBlocks, r)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -84,9 +84,13 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) {
|
||||
require.NoError(t, r.insertBlockToPendingQueue(b1.Block.Slot, wrapper.WrappedPhase0SignedBeaconBlock(b1), b1Root))
|
||||
require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wrapper.WrappedPhase0SignedBeaconBlock(b1)))
|
||||
|
||||
// Insert bad b1 in the cache to verify the good one doesn't get replaced.
|
||||
require.NoError(t, r.insertBlockToPendingQueue(b1.Block.Slot, wrapper.WrappedPhase0SignedBeaconBlock(util.NewBeaconBlock()), [32]byte{}))
|
||||
nBlock := util.NewBeaconBlock()
|
||||
nBlock.Block.Slot = b1.Block.Slot
|
||||
nRoot, err := nBlock.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Insert bad b1 in the cache to verify the good one doesn't get replaced.
|
||||
require.NoError(t, r.insertBlockToPendingQueue(nBlock.Block.Slot, wrapper.WrappedPhase0SignedBeaconBlock(nBlock), nRoot))
|
||||
require.NoError(t, r.processPendingBlocks(context.Background())) // Marks a block as bad
|
||||
require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run
|
||||
|
||||
@@ -140,6 +144,46 @@ func TestRegularSync_InsertDuplicateBlocks(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestRegularSyncBeaconBlockSubscriber_DoNotReprocessBlock(t *testing.T) {
|
||||
db := dbtest.SetupDB(t)
|
||||
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
r := &Service{
|
||||
cfg: &config{
|
||||
p2p: p1,
|
||||
beaconDB: db,
|
||||
chain: &mock.ChainService{
|
||||
FinalizedCheckPoint: ðpb.Checkpoint{
|
||||
Epoch: 0,
|
||||
},
|
||||
},
|
||||
stateGen: stategen.New(db),
|
||||
},
|
||||
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
|
||||
seenPendingBlocks: make(map[[32]byte]bool),
|
||||
}
|
||||
r.initCaches()
|
||||
|
||||
b0 := util.NewBeaconBlock()
|
||||
require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wrapper.WrappedPhase0SignedBeaconBlock(b0)))
|
||||
b0Root, err := b0.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
b3 := util.NewBeaconBlock()
|
||||
b3.Block.Slot = 3
|
||||
b3.Block.ParentRoot = b0Root[:]
|
||||
b3Root, err := b3.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wrapper.WrappedPhase0SignedBeaconBlock(b3)))
|
||||
|
||||
// Add b3 to the cache
|
||||
require.NoError(t, r.insertBlockToPendingQueue(b3.Block.Slot, wrapper.WrappedPhase0SignedBeaconBlock(b3), b3Root))
|
||||
|
||||
require.NoError(t, r.processPendingBlocks(context.Background()))
|
||||
assert.Equal(t, 0, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache")
|
||||
assert.Equal(t, 0, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||
}
|
||||
|
||||
// /- b1 - b2 - b5
|
||||
// b0
|
||||
// \- b3 - b4
|
||||
@@ -237,7 +281,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin
|
||||
require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run
|
||||
|
||||
assert.Equal(t, 1, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache")
|
||||
assert.Equal(t, 3, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||
assert.Equal(t, 1, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||
|
||||
// Add b2 to the cache
|
||||
require.NoError(t, r.insertBlockToPendingQueue(b2.Block.Slot, wrapper.WrappedPhase0SignedBeaconBlock(b2), b2Root))
|
||||
@@ -248,7 +292,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin
|
||||
require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run
|
||||
|
||||
assert.Equal(t, 0, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache")
|
||||
assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||
assert.Equal(t, 0, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||
}
|
||||
|
||||
func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) {
|
||||
@@ -318,7 +362,7 @@ func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) {
|
||||
|
||||
require.NoError(t, r.processPendingBlocks(context.Background()))
|
||||
assert.Equal(t, 0, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache")
|
||||
assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||
assert.Equal(t, 0, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||
}
|
||||
|
||||
func TestService_sortedPendingSlots(t *testing.T) {
|
||||
@@ -429,7 +473,7 @@ func TestService_BatchRootRequest(t *testing.T) {
|
||||
assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
|
||||
}
|
||||
|
||||
func TestService_AddPeningBlockToQueueOverMax(t *testing.T) {
|
||||
func TestService_AddPendingBlockToQueueOverMax(t *testing.T) {
|
||||
r := &Service{
|
||||
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
|
||||
seenPendingBlocks: make(map[[32]byte]bool),
|
||||
|
||||
Reference in New Issue
Block a user