Refactor and use has_block getter (#10592)

* Refactor and use has block getter

* Update blocks_fetcher_utils.go

* Fix tests

* Fix tests

* Fix test

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
terencechain
2022-05-03 17:17:40 -07:00
committed by GitHub
parent f763b35494
commit 03ae8672b6
13 changed files with 53 additions and 28 deletions

View File

@@ -21,7 +21,7 @@ var epochsSinceFinalitySaveHotStateDB = types.Epoch(100)
type BlockReceiver interface {
ReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockBatch(ctx context.Context, blocks []interfaces.SignedBeaconBlock, blkRoots [][32]byte) error
HasInitSyncBlock(root [32]byte) bool
HasBlock(ctx context.Context, root [32]byte) bool
}
// ReceiveBlock is a function that defines the the operations (minus pubsub)
@@ -128,9 +128,9 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []interfaces.Sig
return nil
}
// HasInitSyncBlock returns true if the block of the input root exists in initial sync blocks cache.
func (s *Service) HasInitSyncBlock(root [32]byte) bool {
return s.hasInitSyncBlock(root)
// HasBlock returns true if the block of the input root exists in initial sync blocks cache or DB.
func (s *Service) HasBlock(ctx context.Context, root [32]byte) bool {
return s.hasBlockInInitSyncOrDB(ctx, root)
}
func (s *Service) handlePostBlockOperations(b interfaces.BeaconBlock) error {

View File

@@ -280,21 +280,29 @@ func TestService_ReceiveBlockBatch(t *testing.T) {
}
}
func TestService_HasInitSyncBlock(t *testing.T) {
opts := testServiceOptsNoDB()
func TestService_HasBlock(t *testing.T) {
opts := testServiceOptsWithDB(t)
opts = append(opts, WithStateNotifier(&blockchainTesting.MockStateNotifier{}))
s, err := NewService(context.Background(), opts...)
require.NoError(t, err)
r := [32]byte{'a'}
if s.HasInitSyncBlock(r) {
if s.HasBlock(context.Background(), r) {
t.Error("Should not have block")
}
wsb, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
s.saveInitSyncBlock(r, wsb)
if !s.HasInitSyncBlock(r) {
if !s.HasBlock(context.Background(), r) {
t.Error("Should have block")
}
b := util.NewBeaconBlock()
b.Block.Slot = 1
wsb, err = wrapper.WrappedSignedBeaconBlock(b)
require.NoError(t, err)
r, err = b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, s.cfg.BeaconDB.SaveBlock(context.Background(), wsb))
require.Equal(t, true, s.HasBlock(context.Background(), r))
}
func TestCheckSaveHotStateDB_Enabling(t *testing.T) {

View File

@@ -357,8 +357,14 @@ func (s *ChainService) IsCanonical(_ context.Context, r [32]byte) (bool, error)
return true, nil
}
// HasInitSyncBlock mocks the same method in the chain service.
func (s *ChainService) HasInitSyncBlock(rt [32]byte) bool {
// HasBlock mocks the same method in the chain service.
func (s *ChainService) HasBlock(ctx context.Context, rt [32]byte) bool {
if s.DB == nil {
return false
}
if s.DB.HasBlock(ctx, rt) {
return true
}
if s.InitSyncBlockRoots == nil {
return false
}

View File

@@ -234,7 +234,7 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
// Traverse blocks, and if we've got one that doesn't have parent in DB, backtrack on it.
for i, block := range blocks {
parentRoot := bytesutil.ToBytes32(block.Block().ParentRoot())
if !f.db.HasBlock(ctx, parentRoot) && !f.chain.HasInitSyncBlock(parentRoot) {
if !f.chain.HasBlock(ctx, parentRoot) {
log.WithFields(logrus.Fields{
"peer": pid,
"slot": block.Block().Slot(),
@@ -261,7 +261,7 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa
outBlocks := []interfaces.SignedBeaconBlock{b}
for i := uint64(0); i < backtrackingMaxHops; i++ {
parentRoot := bytesutil.ToBytes32(outBlocks[len(outBlocks)-1].Block().ParentRoot())
if f.db.HasBlock(ctx, parentRoot) || f.chain.HasInitSyncBlock(parentRoot) {
if f.chain.HasBlock(ctx, parentRoot) {
// Common ancestor found, forward blocks back to processor.
sort.Slice(outBlocks, func(i, j int) bool {
return outBlocks[i].Block().Slot() < outBlocks[j].Block().Slot()

View File

@@ -223,7 +223,7 @@ func TestBlocksFetcher_findFork(t *testing.T) {
for _, blk := range chain1 {
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, true, beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot))
require.Equal(t, true, beaconDB.HasBlock(ctx, blkRoot) || mc.HasBlock(ctx, blkRoot))
}
assert.Equal(t, types.Slot(250), mc.HeadSlot())
@@ -273,7 +273,7 @@ func TestBlocksFetcher_findFork(t *testing.T) {
for _, blk := range chain2[forkSlot:] {
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, false, beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot))
require.Equal(t, false, beaconDB.HasBlock(ctx, blkRoot) || mc.HasBlock(ctx, blkRoot))
}
// Search for alternative paths (add single peer having alternative path).
@@ -299,7 +299,7 @@ func TestBlocksFetcher_findFork(t *testing.T) {
require.Equal(t, blk.Block.Slot, i, "incorrect block selected for slot %d", i)
// Only save is parent block exists.
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
if beaconDB.HasBlock(ctx, parentRoot) || mc.HasInitSyncBlock(parentRoot) {
if beaconDB.HasBlock(ctx, parentRoot) || mc.HasBlock(ctx, parentRoot) {
wsb, err := wrapper.WrappedSignedBeaconBlock(blk)
require.NoError(t, err)
require.NoError(t, beaconDB.SaveBlock(ctx, wsb))
@@ -311,7 +311,7 @@ func TestBlocksFetcher_findFork(t *testing.T) {
for _, blk := range chain2 {
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, true, beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot), "slot %d", blk.Block.Slot)
require.Equal(t, true, beaconDB.HasBlock(ctx, blkRoot) || mc.HasBlock(ctx, blkRoot), "slot %d", blk.Block.Slot)
}
}

View File

@@ -1088,7 +1088,7 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
for _, blk := range chain1[1:] {
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
// Save block only if parent root is already in database or cache.
if beaconDB.HasBlock(ctx, parentRoot) || mc.HasInitSyncBlock(parentRoot) {
if beaconDB.HasBlock(ctx, parentRoot) || mc.HasBlock(ctx, parentRoot) {
wsb, err := wrapper.WrappedSignedBeaconBlock(blk)
require.NoError(t, err)
require.NoError(t, beaconDB.SaveBlock(ctx, wsb))
@@ -1274,7 +1274,7 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
for _, blk := range chain[1:84] {
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
// Save block only if parent root is already in database or cache.
if beaconDB.HasBlock(ctx, parentRoot) || mc.HasInitSyncBlock(parentRoot) {
if beaconDB.HasBlock(ctx, parentRoot) || mc.HasBlock(ctx, parentRoot) {
wsb, err := wrapper.WrappedSignedBeaconBlock(blk)
require.NoError(t, err)
require.NoError(t, beaconDB.SaveBlock(ctx, wsb))
@@ -1325,7 +1325,7 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
if err != nil {
return false
}
if blk.Block().Slot() <= finalizedSlot || (beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot)) {
if blk.Block().Slot() <= finalizedSlot || (beaconDB.HasBlock(ctx, blkRoot) || mc.HasBlock(ctx, blkRoot)) {
return true
}
return false
@@ -1344,7 +1344,7 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
}
parentRoot := bytesutil.ToBytes32(blk.Block().ParentRoot())
if !beaconDB.HasBlock(ctx, parentRoot) && !mc.HasInitSyncBlock(parentRoot) {
if !beaconDB.HasBlock(ctx, parentRoot) && !mc.HasBlock(ctx, parentRoot) {
log.Errorf("%v: %#x", errParentDoesNotExist, blk.Block().ParentRoot())
continue
}
@@ -1360,6 +1360,6 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
for _, blk := range chain[:orphanedBlock.Block.Slot+32] {
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, true, beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot), "slot %d", blk.Block.Slot)
require.Equal(t, true, beaconDB.HasBlock(ctx, blkRoot) || mc.HasBlock(ctx, blkRoot), "slot %d", blk.Block.Slot)
}
}

View File

@@ -233,7 +233,7 @@ func (s *Service) processBlock(
s.logSyncStatus(genesis, blk.Block(), blkRoot)
parentRoot := bytesutil.ToBytes32(blk.Block().ParentRoot())
if !s.cfg.DB.HasBlock(ctx, parentRoot) && !s.cfg.Chain.HasInitSyncBlock(parentRoot) {
if !s.cfg.Chain.HasBlock(ctx, parentRoot) {
return fmt.Errorf("%w: (in processBlock, slot=%d) %#x", errParentDoesNotExist, blk.Block().Slot(), blk.Block().ParentRoot())
}
return blockReceiver(ctx, blk, blkRoot)
@@ -263,7 +263,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
}
s.logBatchSyncStatus(genesis, blks, blkRoot)
parentRoot := bytesutil.ToBytes32(firstBlock.Block().ParentRoot())
if !s.cfg.DB.HasBlock(ctx, parentRoot) && !s.cfg.Chain.HasInitSyncBlock(parentRoot) {
if !s.cfg.Chain.HasBlock(ctx, parentRoot) {
return fmt.Errorf("%w: %#x (in processBatchedBlocks, slot=%d)", errParentDoesNotExist, firstBlock.Block().ParentRoot(), firstBlock.Block().Slot())
}
blockRoots := make([][32]byte, len(blks))
@@ -309,10 +309,9 @@ func (s *Service) isProcessedBlock(ctx context.Context, blk interfaces.SignedBea
if blk.Block().Slot() <= finalizedSlot {
return true
}
blockExistsInDB := s.cfg.DB.HasBlock(ctx, blkRoot) || s.cfg.Chain.HasInitSyncBlock(blkRoot)
// If block exists in our db and is before or equal to our current head
// we ignore it.
if blockExistsInDB && s.cfg.Chain.HeadSlot() >= blk.Block().Slot() {
if s.cfg.Chain.HasBlock(ctx, blkRoot) && s.cfg.Chain.HeadSlot() >= blk.Block().Slot() {
return true
}
return false

View File

@@ -316,6 +316,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
p2p: p1,
beaconDB: db,
chain: &mock.ChainService{Genesis: time.Now(),
DB: db,
State: beaconState,
FinalizedCheckPoint: &ethpb.Checkpoint{
Root: aggregateAndProof.Aggregate.Data.BeaconBlockRoot,

View File

@@ -369,6 +369,7 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) {
beaconDB: db,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: &mock.ChainService{Genesis: time.Now().Add(-oneEpoch()),
DB: db,
State: beaconState,
ValidAttestation: true,
FinalizedCheckPoint: &ethpb.Checkpoint{
@@ -470,6 +471,7 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
beaconDB: db,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: &mock.ChainService{Genesis: time.Now().Add(-oneEpoch()),
DB: db,
ValidatorsRoot: [32]byte{'A'},
State: beaconState,
ValidAttestation: true,

View File

@@ -261,6 +261,5 @@ func (s *Service) setSeenCommitteeIndicesSlot(slot types.Slot, committeeID types
func (s *Service) hasBlockAndState(ctx context.Context, blockRoot [32]byte) bool {
hasStateSummary := s.cfg.beaconDB.HasStateSummary(ctx, blockRoot)
hasState := hasStateSummary || s.cfg.beaconDB.HasState(ctx, blockRoot)
hasBlock := s.cfg.chain.HasInitSyncBlock(blockRoot) || s.cfg.beaconDB.HasBlock(ctx, blockRoot)
return hasState && hasBlock
return hasState && s.cfg.chain.HasBlock(ctx, blockRoot)
}

View File

@@ -38,6 +38,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) {
Genesis: time.Now().Add(time.Duration(-1*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second),
ValidatorsRoot: [32]byte{'A'},
ValidAttestation: true,
DB: db,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@@ -155,7 +155,7 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
}
// Handle block when the parent is unknown.
if !s.cfg.beaconDB.HasBlock(ctx, bytesutil.ToBytes32(blk.Block().ParentRoot())) && !s.cfg.chain.HasInitSyncBlock(bytesutil.ToBytes32(blk.Block().ParentRoot())) {
if !s.cfg.chain.HasBlock(ctx, bytesutil.ToBytes32(blk.Block().ParentRoot())) {
s.pendingQueueLock.Lock()
if err := s.insertBlockToPendingQueue(blk.Block().Slot(), blk, blockRoot); err != nil {
s.pendingQueueLock.Unlock()

View File

@@ -172,6 +172,7 @@ func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) {
Epoch: 0,
Root: make([]byte, 32),
},
DB: db,
}
r := &Service{
cfg: &config{
@@ -236,6 +237,7 @@ func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) {
Root: make([]byte, 32),
},
InitSyncBlockRoots: map[[32]byte]bool{bRoot: true},
DB: db,
}
r := &Service{
cfg: &config{
@@ -302,6 +304,7 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
Epoch: 0,
Root: make([]byte, 32),
},
DB: db,
}
r := &Service{
cfg: &config{
@@ -367,6 +370,7 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) {
stateGen := stategen.New(db)
offset := int64(blkSlot.Mul(params.BeaconConfig().SecondsPerSlot))
chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-offset, 0),
DB: db,
State: beaconState,
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
@@ -436,6 +440,7 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) {
stateGen := stategen.New(db)
offset := int64(blkSlot.Mul(params.BeaconConfig().SecondsPerSlot))
chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-offset, 0),
DB: db,
State: beaconState,
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
@@ -840,6 +845,7 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) {
Root: make([]byte, 32),
},
VerifyBlkDescendantErr: errors.New("not part of finalized chain"),
DB: db,
}
r := &Service{
cfg: &config{
@@ -1107,6 +1113,7 @@ func TestValidateBeaconBlockPubSub_ValidExecutionPayload(t *testing.T) {
stateGen := stategen.New(db)
chainService := &mock.ChainService{Genesis: time.Unix(presentTime-int64(params.BeaconConfig().SecondsPerSlot), 0),
DB: db,
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
Root: make([]byte, 32),
@@ -1180,6 +1187,7 @@ func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) {
stateGen := stategen.New(db)
chainService := &mock.ChainService{Genesis: time.Unix(presentTime-int64(params.BeaconConfig().SecondsPerSlot), 0),
DB: db,
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
Root: make([]byte, 32),
@@ -1341,6 +1349,7 @@ func Test_validateBeaconBlockProcessingWhenParentIsOptimistic(t *testing.T) {
require.NoError(t, err)
chainService := &mock.ChainService{Genesis: time.Unix(int64(beaconState.GenesisTime()), 0),
DB: db,
Optimistic: true,
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,