Compare commits

...

13 Commits

Author SHA1 Message Date
james-prysm
1c5ccb6572 fixing comment 2024-11-21 08:59:45 -06:00
james-prysm
1b95d133fd adding comment for not clear process 2024-11-19 17:40:23 -06:00
james-prysm
495788d99b renames in block queue 2024-11-19 17:19:11 -06:00
james-prysm
64115dfdab Merge branch 'develop' into service-renames 2024-11-19 15:15:04 -06:00
james-prysm
cefd271afe feedback 2024-11-18 15:26:50 -06:00
james-prysm
38139d5bf0 Merge branch 'develop' into service-renames 2024-11-18 14:55:39 -06:00
james-prysm
f5f3c5f0a9 Update beacon-chain/sync/pending_attestations_queue.go
Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
2024-11-18 14:36:25 -06:00
james-prysm
1c56655749 small update to fix redundancy 2024-11-18 10:37:58 -06:00
james-prysm
c5139a674e fixing missed error update 2024-11-18 09:29:06 -06:00
james-prysm
99924f3aa0 Merge branch 'develop' into service-renames 2024-11-15 15:41:24 -06:00
james-prysm
14233c730c missed linting 2024-11-15 15:35:42 -06:00
james-prysm
04a62213f6 more renames 2024-11-15 15:23:26 -06:00
james-prysm
3509752b32 renaming functions and services to better reflect what they do 2024-11-15 14:49:46 -06:00
22 changed files with 250 additions and 266 deletions

View File

@@ -90,7 +90,7 @@ type config struct {
StateGen *stategen.State
SlasherAttestationsFeed *event.Feed
WeakSubjectivityCheckpt *ethpb.Checkpoint
BlockFetcher execution.POWBlockFetcher
ExecutionBlockFetcher execution.BlockFetcher
FinalizedStateAtStartUp state.BeaconState
ExecutionEngineCaller execution.EngineCaller
SyncChecker Checker

View File

@@ -400,15 +400,12 @@ func UpdateCommitteeCache(ctx context.Context, state state.ReadOnlyBeaconState,
return sortedIndices[i] < sortedIndices[j]
})
if err := committeeCache.AddCommitteeShuffledList(ctx, &cache.Committees{
return committeeCache.AddCommitteeShuffledList(ctx, &cache.Committees{
ShuffledIndices: shuffledIndices,
CommitteeCount: uint64(params.BeaconConfig().SlotsPerEpoch.Mul(count)),
Seed: seed,
SortedIndices: sortedIndices,
}); err != nil {
return err
}
return nil
})
}
// UpdateProposerIndicesInCache updates proposer indices entry of the committee cache.

View File

@@ -83,8 +83,8 @@ type ChainInfoFetcher interface {
ExecutionClientConnectionErr() error
}
// POWBlockFetcher defines a struct that can retrieve mainchain blocks.
type POWBlockFetcher interface {
// BlockFetcher defines a struct that can retrieve mainchain blocks.
type BlockFetcher interface {
BlockTimeByHeight(ctx context.Context, height *big.Int) (uint64, error)
BlockByTimestamp(ctx context.Context, time uint64) (*types.HeaderInfo, error)
BlockHashByHeight(ctx context.Context, height *big.Int) (common.Hash, error)
@@ -95,7 +95,7 @@ type POWBlockFetcher interface {
type Chain interface {
ChainStartFetcher
ChainInfoFetcher
POWBlockFetcher
BlockFetcher
}
// RPCClient defines the rpc methods required to interact with the eth1 node.
@@ -206,7 +206,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
}
}
eth1Data, err := s.validPowchainData(ctx)
eth1Data, err := s.validExecutionChainData(ctx)
if err != nil {
return nil, errors.Wrap(err, "unable to validate powchain data")
}
@@ -316,6 +316,7 @@ func (s *Service) updateConnectedETH1(state bool) {
s.updateBeaconNodeStats()
}
// TODO: deprecate sometime after Electra
// refers to the latest eth1 block which follows the condition: eth1_timestamp +
// SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE <= current_unix_time
func (s *Service) followedBlockHeight(ctx context.Context) (uint64, error) {
@@ -460,6 +461,7 @@ func safelyHandlePanic() {
}
}
// TODO: deprecate sometime after Electra
func (s *Service) handleETH1FollowDistance() {
defer safelyHandlePanic()
ctx := s.ctx
@@ -498,7 +500,7 @@ func (s *Service) handleETH1FollowDistance() {
}
}
func (s *Service) initPOWService() {
func (s *Service) initExecutionChainService() {
// Use a custom logger to only log errors
logCounter := 0
errorLogger := func(err error, msg string) {
@@ -584,7 +586,7 @@ func (s *Service) initPOWService() {
func (s *Service) run(done <-chan struct{}) {
s.runError = nil
s.initPOWService()
s.initExecutionChainService()
// Do not keep storing the finalized state as it is
// no longer of use.
s.removeStartupState()
@@ -810,9 +812,9 @@ func validateDepositContainers(ctrs []*ethpb.DepositContainer) bool {
return true
}
// Validates the current powchain data is saved and makes sure that any
// Validates the current execution chain data is saved and makes sure that any
// embedded genesis state is correctly accounted for.
func (s *Service) validPowchainData(ctx context.Context) (*ethpb.ETH1ChainData, error) {
func (s *Service) validExecutionChainData(ctx context.Context) (*ethpb.ETH1ChainData, error) {
genState, err := s.cfg.beaconDB.GenesisState(ctx)
if err != nil {
return nil, err
@@ -842,11 +844,11 @@ func (s *Service) validPowchainData(ctx context.Context) (*ethpb.ETH1ChainData,
BeaconState: pbState,
DepositContainers: s.cfg.depositCache.AllDepositContainers(ctx),
}
trie, ok := s.depositTrie.(*depositsnapshot.DepositTree)
depositTrie, ok := s.depositTrie.(*depositsnapshot.DepositTree)
if !ok {
return nil, errors.New("deposit trie was not EIP4881 DepositTree")
}
eth1Data.DepositSnapshot, err = trie.ToProto()
eth1Data.DepositSnapshot, err = depositTrie.ToProto()
if err != nil {
return nil, err
}

View File

@@ -40,7 +40,7 @@ import (
var _ ChainStartFetcher = (*Service)(nil)
var _ ChainInfoFetcher = (*Service)(nil)
var _ POWBlockFetcher = (*Service)(nil)
var _ BlockFetcher = (*Service)(nil)
var _ Chain = (*Service)(nil)
type goodLogger struct {
@@ -580,7 +580,7 @@ func TestService_EnsureConsistentPowchainData(t *testing.T) {
assert.NoError(t, genState.SetSlot(1000))
require.NoError(t, s1.cfg.beaconDB.SaveGenesisData(context.Background(), genState))
_, err = s1.validPowchainData(context.Background())
_, err = s1.validExecutionChainData(context.Background())
require.NoError(t, err)
eth1Data, err := s1.cfg.beaconDB.ExecutionChainData(context.Background())
@@ -611,7 +611,7 @@ func TestService_InitializeCorrectly(t *testing.T) {
assert.NoError(t, genState.SetSlot(1000))
require.NoError(t, s1.cfg.beaconDB.SaveGenesisData(context.Background(), genState))
_, err = s1.validPowchainData(context.Background())
_, err = s1.validExecutionChainData(context.Background())
require.NoError(t, err)
eth1Data, err := s1.cfg.beaconDB.ExecutionChainData(context.Background())
@@ -647,7 +647,7 @@ func TestService_EnsureValidPowchainData(t *testing.T) {
DepositContainers: []*ethpb.DepositContainer{{Index: 1}},
})
require.NoError(t, err)
_, err = s1.validPowchainData(context.Background())
_, err = s1.validExecutionChainData(context.Background())
require.NoError(t, err)
eth1Data, err := s1.cfg.beaconDB.ExecutionChainData(context.Background())

View File

@@ -314,9 +314,9 @@ func registerServices(cliCtx *cli.Context, beacon *BeaconNode, synchronizer *sta
return errors.Wrap(err, "could not register Back Fill service")
}
log.Debugln("Registering POW Chain Service")
if err := beacon.registerPOWChainService(); err != nil {
return errors.Wrap(err, "could not register POW chain service")
log.Debugln("Registering Execution Chain Service")
if err := beacon.registerExecutionChainService(); err != nil {
return errors.Wrap(err, "could not register execution chain service")
}
log.Debugln("Registering Attestation Pool Service")
@@ -723,8 +723,8 @@ func (b *BeaconNode) registerAttestationPool() error {
}
func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *startup.ClockSynchronizer, syncComplete chan struct{}) error {
var web3Service *execution.Service
if err := b.services.FetchService(&web3Service); err != nil {
var executionChainService *execution.Service
if err := b.services.FetchService(&executionChainService); err != nil {
return err
}
@@ -739,8 +739,8 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
blockchain.WithForkChoiceStore(fc),
blockchain.WithDatabase(b.db),
blockchain.WithDepositCache(b.depositCache),
blockchain.WithChainStartFetcher(web3Service),
blockchain.WithExecutionEngineCaller(web3Service),
blockchain.WithChainStartFetcher(executionChainService),
blockchain.WithExecutionEngineCaller(executionChainService),
blockchain.WithAttestationPool(b.attestationPool),
blockchain.WithExitPool(b.exitPool),
blockchain.WithSlashingPool(b.slashingsPool),
@@ -766,10 +766,11 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
return b.services.RegisterService(blockchainService)
}
func (b *BeaconNode) registerPOWChainService() error {
func (b *BeaconNode) registerExecutionChainService() error {
if b.cliCtx.Bool(testSkipPowFlag) {
return b.services.RegisterService(&execution.Service{})
}
// TODO: rename POW to execution
bs, err := execution.NewPowchainCollector(b.ctx)
if err != nil {
return err
@@ -792,17 +793,17 @@ func (b *BeaconNode) registerPOWChainService() error {
execution.WithJwtId(b.cliCtx.String(flags.JwtId.Name)),
execution.WithVerifierWaiter(b.verifyInitWaiter),
)
web3Service, err := execution.NewService(b.ctx, opts...)
executionChainService, err := execution.NewService(b.ctx, opts...)
if err != nil {
return errors.Wrap(err, "could not register proof-of-work chain web3Service")
return errors.Wrap(err, "could not register execution chain service")
}
return b.services.RegisterService(web3Service)
return b.services.RegisterService(executionChainService)
}
func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFillStore *backfill.Store) error {
var web3Service *execution.Service
if err := b.services.FetchService(&web3Service); err != nil {
var executionChainService *execution.Service
if err := b.services.FetchService(&executionChainService); err != nil {
return err
}
@@ -833,7 +834,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithStateGen(b.stateGen),
regularsync.WithSlasherAttestationsFeed(b.slasherAttestationsFeed),
regularsync.WithSlasherBlockHeadersFeed(b.slasherBlockHeadersFeed),
regularsync.WithReconstructor(web3Service),
regularsync.WithReconstructor(executionChainService),
regularsync.WithClockWaiter(b.clockWaiter),
regularsync.WithInitialSyncComplete(initialSyncComplete),
regularsync.WithStateNotifier(b),
@@ -904,8 +905,8 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
return err
}
var web3Service *execution.Service
if err := b.services.FetchService(&web3Service); err != nil {
var executionChainService *execution.Service
if err := b.services.FetchService(&executionChainService); err != nil {
return err
}
@@ -933,7 +934,7 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
chainStartFetcher = interopService
} else {
depositFetcher = b.depositCache
chainStartFetcher = web3Service
chainStartFetcher = executionChainService
}
host := b.cliCtx.String(flags.RPCHost.Name)
@@ -948,8 +949,8 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
p2pService := b.fetchP2P()
rpcService := rpc.NewService(b.ctx, &rpc.Config{
ExecutionEngineCaller: web3Service,
ExecutionReconstructor: web3Service,
ExecutionEngineCaller: executionChainService,
ExecutionReconstructor: executionChainService,
Host: host,
Port: port,
BeaconMonitoringHost: beaconMonitoringHost,
@@ -978,8 +979,8 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
SlashingsPool: b.slashingsPool,
BLSChangesPool: b.blsToExecPool,
SyncCommitteeObjectPool: b.syncCommitteePool,
ExecutionChainService: web3Service,
ExecutionChainInfoFetcher: web3Service,
ExecutionChainService: executionChainService,
ExecutionChainInfoFetcher: executionChainService,
ChainStartFetcher: chainStartFetcher,
MockEth1Votes: mockEth1DataVotes,
SyncService: syncService,

View File

@@ -33,7 +33,7 @@ type Server struct {
CanonicalFetcher blockchain.CanonicalFetcher
FinalizationFetcher blockchain.FinalizationFetcher
DepositFetcher cache.DepositFetcher
BlockFetcher execution.POWBlockFetcher
ExecutionBlockFetcher execution.BlockFetcher
GenesisTimeFetcher blockchain.TimeFetcher
StateNotifier statefeed.Notifier
BlockNotifier blockfeed.Notifier

View File

@@ -187,7 +187,7 @@ func TestProposer_PendingDeposits_Electra(t *testing.T) {
bs := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
PendingDepositsFetcher: depositCache,
BlockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},

View File

@@ -58,7 +58,7 @@ func (vs *Server) eth1DataMajorityVote(ctx context.Context, beaconState state.Be
return vs.HeadFetcher.HeadETH1Data(), nil
}
lastBlockByLatestValidTime, err := vs.Eth1BlockFetcher.BlockByTimestamp(ctx, latestValidTime)
lastBlockByLatestValidTime, err := vs.ExecutionBlockFetcher.BlockByTimestamp(ctx, latestValidTime)
if err != nil {
log.WithError(err).Error("Could not get last block by latest valid time")
return vs.randomETH1DataVote(ctx)
@@ -73,7 +73,7 @@ func (vs *Server) eth1DataMajorityVote(ctx context.Context, beaconState state.Be
}
if lastBlockDepositCount >= vs.HeadFetcher.HeadETH1Data().DepositCount {
h, err := vs.Eth1BlockFetcher.BlockHashByHeight(ctx, lastBlockByLatestValidTime.Number)
h, err := vs.ExecutionBlockFetcher.BlockHashByHeight(ctx, lastBlockByLatestValidTime.Number)
if err != nil {
log.WithError(err).Error("Could not get hash of last block by latest valid time")
return vs.randomETH1DataVote(ctx)
@@ -118,7 +118,7 @@ func (vs *Server) canonicalEth1Data(
if features.Get().DisableStakinContractCheck && eth1BlockHash == [32]byte{} {
return canonicalEth1Data, new(big.Int).SetInt64(0), nil
}
_, canonicalEth1DataHeight, err := vs.Eth1BlockFetcher.BlockExists(ctx, eth1BlockHash)
_, canonicalEth1DataHeight, err := vs.ExecutionBlockFetcher.BlockExists(ctx, eth1BlockHash)
if err != nil {
return nil, nil, errors.Wrap(err, "could not fetch eth1data height")
}

View File

@@ -224,7 +224,7 @@ func (vs *Server) getTerminalBlockHashIfExists(ctx context.Context, transitionTi
terminalBlockHash := params.BeaconConfig().TerminalBlockHash
// Terminal block hash override takes precedence over terminal total difficulty.
if params.BeaconConfig().TerminalBlockHash != params.BeaconConfig().ZeroHash {
exists, _, err := vs.Eth1BlockFetcher.BlockExists(ctx, terminalBlockHash)
exists, _, err := vs.ExecutionBlockFetcher.BlockExists(ctx, terminalBlockHash)
if err != nil {
return nil, false, err
}

View File

@@ -369,7 +369,7 @@ func TestServer_getTerminalBlockHashIfExists(t *testing.T) {
c := powtesting.New()
c.HashesByHeight[0] = tt.wantTerminalBlockHash
vs := &Server{
Eth1BlockFetcher: c,
ExecutionBlockFetcher: c,
ExecutionEngineCaller: &powtesting.EngineClient{
ExecutionBlock: tt.currentPowBlock,
BlockByHashMap: m,

View File

@@ -276,7 +276,7 @@ func TestServer_GetBeaconBlock_Bellatrix(t *testing.T) {
}
proposerServer := getProposerServer(db, beaconState, parentRoot[:])
proposerServer.Eth1BlockFetcher = c
proposerServer.ExecutionBlockFetcher = c
ed, err := blocks.NewWrappedExecutionData(payload)
require.NoError(t, err)
proposerServer.ExecutionEngineCaller = &mockExecution.EngineClient{
@@ -715,7 +715,7 @@ func getProposerServer(db db.HeadAccessDatabase, headState state.BeaconState, he
BlockReceiver: mockChainService,
ChainStartFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
Eth1BlockFetcher: &mockExecution.Chain{},
ExecutionBlockFetcher: &mockExecution.Chain{},
FinalizationFetcher: mockChainService,
ForkFetcher: mockChainService,
ForkchoiceFetcher: mockChainService,
@@ -1009,10 +1009,10 @@ func TestProposer_ComputeStateRoot_OK(t *testing.T) {
beaconState, parentRoot, privKeys := util.DeterministicGenesisStateWithGenesisBlock(t, ctx, db, 100)
proposerServer := &Server{
ChainStartFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
Eth1BlockFetcher: &mockExecution.Chain{},
StateGen: stategen.New(db, doublylinkedtree.New()),
ChainStartFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
ExecutionBlockFetcher: &mockExecution.Chain{},
StateGen: stategen.New(db, doublylinkedtree.New()),
}
req := util.NewBeaconBlock()
req.Block.ProposerIndex = 84
@@ -1079,11 +1079,11 @@ func TestProposer_PendingDeposits_Eth1DataVoteOK(t *testing.T) {
require.NoError(t, err)
bs := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
HeadFetcher: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
BlockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
HeadFetcher: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
}
// It should also return the recent deposits after their follow window.
@@ -1214,7 +1214,7 @@ func TestProposer_PendingDeposits_OutsideEth1FollowWindow(t *testing.T) {
bs := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
PendingDepositsFetcher: depositCache,
BlockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
@@ -1347,7 +1347,7 @@ func TestProposer_PendingDeposits_FollowsCorrectEth1Block(t *testing.T) {
bs := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
PendingDepositsFetcher: depositCache,
BlockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
@@ -1450,7 +1450,7 @@ func TestProposer_PendingDeposits_CantReturnBelowStateEth1DepositIndex(t *testin
bs := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
PendingDepositsFetcher: depositCache,
BlockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
@@ -1550,7 +1550,7 @@ func TestProposer_PendingDeposits_CantReturnMoreThanMax(t *testing.T) {
bs := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
PendingDepositsFetcher: depositCache,
BlockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
@@ -1650,7 +1650,7 @@ func TestProposer_PendingDeposits_CantReturnMoreThanDepositCount(t *testing.T) {
HeadFetcher: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
PendingDepositsFetcher: depositCache,
}
@@ -1761,7 +1761,7 @@ func TestProposer_DepositTrie_UtilizesCachedFinalizedDeposits(t *testing.T) {
bs := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
PendingDepositsFetcher: depositCache,
BlockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
@@ -1890,7 +1890,7 @@ func TestProposer_DepositTrie_RebuildTrie(t *testing.T) {
bs := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
PendingDepositsFetcher: depositCache,
BlockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
@@ -1998,12 +1998,11 @@ func TestProposer_Eth1Data_MajorityVote_SpansGenesis(t *testing.T) {
depositCache, err := depositsnapshot.New()
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{BlockHash: headBlockHash, DepositCount: 0}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{BlockHash: headBlockHash, DepositCount: 0}},
}
beaconState, err := state_native.InitializeFromProtoPhase0(&ethpb.BeaconState{
@@ -2061,12 +2060,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2097,12 +2095,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2133,12 +2130,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2170,12 +2166,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2207,12 +2202,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2244,12 +2238,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2274,12 +2267,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
currentEth1Data := &ethpb.Eth1Data{DepositCount: 1, BlockHash: []byte("current")}
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: currentEth1Data},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: currentEth1Data},
}
ctx := context.Background()
@@ -2309,12 +2301,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2339,12 +2330,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2371,12 +2361,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
// Set the deposit count in current eth1data to exceed the latest most recent block's deposit count.
currentEth1Data := &ethpb.Eth1Data{DepositCount: 2, BlockHash: []byte("current")}
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: currentEth1Data},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: currentEth1Data},
}
ctx := context.Background()
@@ -2407,12 +2396,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2444,12 +2432,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2475,12 +2462,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2509,12 +2495,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 1}},
}
ctx := context.Background()
@@ -2548,12 +2533,11 @@ func TestProposer_Eth1Data_MajorityVote(t *testing.T) {
require.NoError(t, err)
ps := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 0}},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mock.ChainService{ETH1Data: &ethpb.Eth1Data{DepositCount: 0}},
}
ctx := context.Background()
@@ -2755,7 +2739,7 @@ func TestProposer_Deposits_ReturnsEmptyList_IfLatestEth1DataEqGenesisEth1Block(t
HeadFetcher: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
Eth1BlockFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
PendingDepositsFetcher: depositCache,
}
@@ -3057,10 +3041,10 @@ func TestProposer_GetParentHeadState(t *testing.T) {
require.NoError(t, transition.UpdateNextSlotCache(ctx, parentRoot[:], parentState))
proposerServer := &Server{
ChainStartFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
Eth1BlockFetcher: &mockExecution.Chain{},
StateGen: stategen.New(db, doublylinkedtree.New()),
ChainStartFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
ExecutionBlockFetcher: &mockExecution.Chain{},
StateGen: stategen.New(db, doublylinkedtree.New()),
}
t.Run("successful reorg", func(tt *testing.T) {
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, parentRoot, parentRoot, headRoot)

View File

@@ -51,7 +51,6 @@ type Server struct {
GenesisFetcher blockchain.GenesisFetcher
FinalizationFetcher blockchain.FinalizationFetcher
TimeFetcher blockchain.TimeFetcher
BlockFetcher execution.POWBlockFetcher
DepositFetcher cache.DepositFetcher
ChainStartFetcher execution.ChainStartFetcher
Eth1InfoFetcher execution.ChainInfoFetcher
@@ -67,7 +66,7 @@ type Server struct {
BlockReceiver blockchain.BlockReceiver
BlobReceiver blockchain.BlobReceiver
MockEth1Votes bool
Eth1BlockFetcher execution.POWBlockFetcher
ExecutionBlockFetcher execution.BlockFetcher
PendingDepositsFetcher depositsnapshot.PendingDepositsFetcher
OperationNotifier opfeed.Notifier
StateGen stategen.StateManager

View File

@@ -74,12 +74,12 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) {
s, err := state_native.InitializeFromProtoUnsafePhase0(beaconState)
require.NoError(t, err)
vs := &Server{
Ctx: context.Background(),
ChainStartFetcher: &mockExecution.Chain{},
BlockFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: s, Root: genesisRoot[:]},
Ctx: context.Background(),
ChainStartFetcher: &mockExecution.Chain{},
ExecutionBlockFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: s, Root: genesisRoot[:]},
}
req := &ethpb.ValidatorActivationRequest{
PublicKeys: [][]byte{pubKey1, pubKey2},

View File

@@ -74,12 +74,12 @@ func TestWaitForActivation_ContextClosed(t *testing.T) {
require.NoError(t, err)
vs := &Server{
Ctx: ctx,
ChainStartFetcher: &mockExecution.Chain{},
BlockFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: genesisRoot[:]},
Ctx: ctx,
ChainStartFetcher: &mockExecution.Chain{},
ExecutionBlockFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: genesisRoot[:]},
}
req := &ethpb.ValidatorActivationRequest{
PublicKeys: [][]byte{pubKey(1)},

View File

@@ -73,11 +73,11 @@ func TestValidatorStatus_Active(t *testing.T) {
},
}
vs := &Server{
ChainStartFetcher: p,
BlockFetcher: p,
Eth1InfoFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
ChainStartFetcher: p,
ExecutionBlockFetcher: p,
Eth1InfoFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
}
req := &ethpb.ValidatorStatusRequest{
PublicKey: pubkey,

View File

@@ -53,8 +53,8 @@ func TestValidatorStatus_DepositedEth1(t *testing.T) {
stateObj, err := state_native.InitializeFromProtoUnsafePhase0(&ethpb.BeaconState{})
require.NoError(t, err)
vs := &Server{
DepositFetcher: depositCache,
BlockFetcher: p,
DepositFetcher: depositCache,
ExecutionBlockFetcher: p,
HeadFetcher: &mockChain.ChainService{
State: stateObj,
},
@@ -95,8 +95,8 @@ func TestValidatorStatus_Deposited(t *testing.T) {
stateObj, err := state_native.InitializeFromProtoUnsafePhase0(&ethpb.BeaconState{})
require.NoError(t, err)
vs := &Server{
DepositFetcher: depositCache,
BlockFetcher: p,
DepositFetcher: depositCache,
ExecutionBlockFetcher: p,
HeadFetcher: &mockChain.ChainService{
State: stateObj,
},
@@ -148,8 +148,8 @@ func TestValidatorStatus_PartiallyDeposited(t *testing.T) {
})
require.NoError(t, err)
vs := &Server{
DepositFetcher: depositCache,
BlockFetcher: p,
DepositFetcher: depositCache,
ExecutionBlockFetcher: p,
HeadFetcher: &mockChain.ChainService{
State: stateObj,
},
@@ -207,8 +207,8 @@ func TestValidatorStatus_Pending_MultipleDeposits(t *testing.T) {
require.NoError(t, err)
require.NoError(t, stateObj.SetSlot(params.BeaconConfig().SlotsPerEpoch))
vs := &Server{
DepositFetcher: depositCache,
BlockFetcher: p,
DepositFetcher: depositCache,
ExecutionBlockFetcher: p,
HeadFetcher: &mockChain.ChainService{
State: stateObj,
},
@@ -269,11 +269,11 @@ func TestValidatorStatus_Pending(t *testing.T) {
},
}
vs := &Server{
ChainStartFetcher: p,
BlockFetcher: p,
Eth1InfoFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: st, Root: genesisRoot[:]},
ChainStartFetcher: p,
ExecutionBlockFetcher: p,
Eth1InfoFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: st, Root: genesisRoot[:]},
}
req := &ethpb.ValidatorStatusRequest{
PublicKey: pubKey,
@@ -331,11 +331,11 @@ func TestValidatorStatus_Exiting(t *testing.T) {
},
}
vs := &Server{
ChainStartFetcher: p,
BlockFetcher: p,
Eth1InfoFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
ChainStartFetcher: p,
ExecutionBlockFetcher: p,
Eth1InfoFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
}
req := &ethpb.ValidatorStatusRequest{
PublicKey: pubKey,
@@ -390,11 +390,11 @@ func TestValidatorStatus_Slashing(t *testing.T) {
},
}
vs := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
DepositFetcher: depositCache,
BlockFetcher: p,
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
DepositFetcher: depositCache,
ExecutionBlockFetcher: p,
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
}
req := &ethpb.ValidatorStatusRequest{
PublicKey: pubKey,
@@ -448,11 +448,11 @@ func TestValidatorStatus_Exited(t *testing.T) {
},
}
vs := &Server{
ChainStartFetcher: p,
Eth1InfoFetcher: p,
BlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: st, Root: genesisRoot[:]},
ChainStartFetcher: p,
Eth1InfoFetcher: p,
ExecutionBlockFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: st, Root: genesisRoot[:]},
}
req := &ethpb.ValidatorStatusRequest{
PublicKey: pubKey,
@@ -534,12 +534,12 @@ func TestActivationStatus_OK(t *testing.T) {
assert.NoError(t, depositCache.InsertDeposit(context.Background(), dep, 0, 1, root))
vs := &Server{
Ctx: context.Background(),
ChainStartFetcher: &mockExecution.Chain{},
BlockFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
Ctx: context.Background(),
ChainStartFetcher: &mockExecution.Chain{},
ExecutionBlockFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
}
activeExists, response, err := vs.activationStatus(context.Background(), pubKeys)
require.NoError(t, err)
@@ -685,11 +685,11 @@ func TestValidatorStatus_CorrectActivationQueue(t *testing.T) {
},
}
vs := &Server{
ChainStartFetcher: p,
BlockFetcher: p,
Eth1InfoFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: st, Root: genesisRoot[:]},
ChainStartFetcher: p,
ExecutionBlockFetcher: p,
Eth1InfoFetcher: p,
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: st, Root: genesisRoot[:]},
}
req := &ethpb.ValidatorStatusRequest{
PublicKey: pbKey,
@@ -765,13 +765,13 @@ func TestMultipleValidatorStatus_Pubkeys(t *testing.T) {
assert.NoError(t, depositCache.InsertDeposit(context.Background(), dep, 0, 1, root))
vs := &Server{
Ctx: context.Background(),
ChainStartFetcher: &mockExecution.Chain{},
BlockFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
Ctx: context.Background(),
ChainStartFetcher: &mockExecution.Chain{},
ExecutionBlockFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
DepositFetcher: depositCache,
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
want := []*ethpb.ValidatorStatusResponse{
@@ -860,12 +860,12 @@ func TestMultipleValidatorStatus_Indices(t *testing.T) {
require.NoError(t, err, "Could not get signing root")
vs := &Server{
Ctx: context.Background(),
ChainStartFetcher: &mockExecution.Chain{},
BlockFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
Ctx: context.Background(),
ChainStartFetcher: &mockExecution.Chain{},
ExecutionBlockFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
HeadFetcher: &mockChain.ChainService{State: stateObj, Root: genesisRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
want := []*ethpb.ValidatorStatusResponse{
@@ -931,8 +931,8 @@ func TestValidatorStatus_Invalid(t *testing.T) {
stateObj, err := state_native.InitializeFromProtoUnsafePhase0(&ethpb.BeaconState{})
require.NoError(t, err)
vs := &Server{
DepositFetcher: depositCache,
BlockFetcher: p,
DepositFetcher: depositCache,
ExecutionBlockFetcher: p,
HeadFetcher: &mockChain.ChainService{
State: stateObj,
},

View File

@@ -240,7 +240,6 @@ func NewService(ctx context.Context, cfg *Config) *Service {
GenesisFetcher: s.cfg.GenesisFetcher,
FinalizationFetcher: s.cfg.FinalizationFetcher,
TimeFetcher: s.cfg.GenesisTimeFetcher,
BlockFetcher: s.cfg.ExecutionChainService,
DepositFetcher: s.cfg.DepositFetcher,
ChainStartFetcher: s.cfg.ChainStartFetcher,
Eth1InfoFetcher: s.cfg.ExecutionChainService,
@@ -253,7 +252,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
BlockReceiver: s.cfg.BlockReceiver,
BlobReceiver: s.cfg.BlobReceiver,
MockEth1Votes: s.cfg.MockEth1Votes,
Eth1BlockFetcher: s.cfg.ExecutionChainService,
ExecutionBlockFetcher: s.cfg.ExecutionChainService,
PendingDepositsFetcher: s.cfg.PendingDepositFetcher,
SlashingsPool: s.cfg.SlashingsPool,
StateGen: s.cfg.StateGen,
@@ -294,7 +293,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
CanonicalFetcher: s.cfg.CanonicalFetcher,
ChainStartFetcher: s.cfg.ChainStartFetcher,
DepositFetcher: s.cfg.DepositFetcher,
BlockFetcher: s.cfg.ExecutionChainService,
ExecutionBlockFetcher: s.cfg.ExecutionChainService,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
StateNotifier: s.cfg.StateNotifier,
BlockNotifier: s.cfg.BlockNotifier,

View File

@@ -164,8 +164,8 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
}
}
// This specifies the block batch limit the initial sync fetcher will use. In the event the user has provided
// and excessive number, this is automatically lowered.
// This specifies the block batch limit the initial sync fetcher will use.
// In the event the user has provided an excessive number, this is automatically lowered.
func maxBatchLimit() int {
currLimit := flags.Get().BlockBatchLimit
maxLimit := params.BeaconConfig().MaxRequestBlocks

View File

@@ -31,7 +31,7 @@ func (s *Service) processPendingAttsQueue() {
mutex := new(sync.Mutex)
async.RunEvery(s.ctx, processPendingAttsPeriod, func() {
mutex.Lock()
if err := s.processPendingAtts(s.ctx); err != nil {
if err := s.processPendingAttsByBlkRoot(s.ctx); err != nil {
log.WithError(err).Debugf("Could not process pending attestation: %v", err)
}
mutex.Unlock()
@@ -42,8 +42,8 @@ func (s *Service) processPendingAttsQueue() {
// 1. Clean up invalid pending attestations from the queue.
// 2. Check if pending attestations can be processed when the block has arrived.
// 3. Request block from a random peer if unable to proceed step 2.
func (s *Service) processPendingAtts(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "processPendingAtts")
func (s *Service) processPendingAttsByBlkRoot(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "processPendingAttsByBlkRoot")
defer span.End()
// Before a node processes pending attestations queue, it verifies
@@ -66,7 +66,7 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
s.pendingAttsLock.RUnlock()
// has the pending attestation's missing block arrived and the node processed block yet?
if s.cfg.beaconDB.HasBlock(ctx, bRoot) && (s.cfg.beaconDB.HasState(ctx, bRoot) || s.cfg.beaconDB.HasStateSummary(ctx, bRoot)) {
s.processAttestations(ctx, attestations)
s.processPendingAttestations(ctx, attestations)
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
"pendingAttsCount": len(attestations),
@@ -88,7 +88,7 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
}
func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.SignedAggregateAttAndProof) {
func (s *Service) processPendingAttestations(ctx context.Context, attestations []ethpb.SignedAggregateAttAndProof) {
for _, signedAtt := range attestations {
aggregate := signedAtt.AggregateAttestationAndProof().AggregateVal()
data := aggregate.GetData()
@@ -102,7 +102,7 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.
log.WithError(err).Debug("Pending aggregated attestation failed validation")
}
aggValid := pubsub.ValidationAccept == valRes
if s.validateBlockInAttestation(ctx, signedAtt) && aggValid {
if s.checkBlockOrQueueAtt(ctx, signedAtt) && aggValid {
if err := s.cfg.attPool.SaveAggregatedAttestation(aggregate); err != nil {
log.WithError(err).Debug("Could not save aggregate attestation")
continue

View File

@@ -52,7 +52,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
a := &ethpb.AggregateAttestationAndProof{Aggregate: &ethpb.Attestation{Data: &ethpb.AttestationData{Target: &ethpb.Checkpoint{Root: make([]byte, 32)}}}}
r.blkRootToPendingAtts[[32]byte{'A'}] = []ethpb.SignedAggregateAttAndProof{&ethpb.SignedAggregateAttestationAndProof{Message: a}}
require.NoError(t, r.processPendingAtts(context.Background()))
require.NoError(t, r.processPendingAttsByBlkRoot(context.Background()))
require.LogsContain(t, hook, "Requesting block by root")
}
@@ -135,7 +135,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
require.NoError(t, r.cfg.beaconDB.SaveState(context.Background(), s, root))
r.blkRootToPendingAtts[root] = []ethpb.SignedAggregateAttAndProof{&ethpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof, Signature: aggreSig}}
require.NoError(t, r.processPendingAtts(context.Background()))
require.NoError(t, r.processPendingAttsByBlkRoot(context.Background()))
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
@@ -183,7 +183,7 @@ func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) {
require.NoError(t, r.cfg.beaconDB.SaveState(context.Background(), s, r32))
r.blkRootToPendingAtts[r32] = []ethpb.SignedAggregateAttAndProof{&ethpb.SignedAggregateAttestationAndProof{Message: a, Signature: make([]byte, fieldparams.BLSSignatureLength)}}
require.NoError(t, r.processPendingAtts(context.Background()))
require.NoError(t, r.processPendingAttsByBlkRoot(context.Background()))
assert.Equal(t, false, p1.BroadcastCalled.Load(), "Broadcasted bad aggregate")
// Clear pool.
@@ -252,7 +252,7 @@ func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) {
go r.verifierRoutine()
r.blkRootToPendingAtts[r32] = []ethpb.SignedAggregateAttAndProof{&ethpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof, Signature: aggreSig}}
require.NoError(t, r.processPendingAtts(context.Background()))
require.NoError(t, r.processPendingAttsByBlkRoot(context.Background()))
assert.Equal(t, true, p1.BroadcastCalled.Load(), "Could not broadcast the good aggregate")
cancel()
@@ -340,7 +340,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
require.NoError(t, r.cfg.beaconDB.SaveState(context.Background(), s, root))
r.blkRootToPendingAtts[root] = []ethpb.SignedAggregateAttAndProof{&ethpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof, Signature: aggreSig}}
require.NoError(t, r.processPendingAtts(context.Background()))
require.NoError(t, r.processPendingAttsByBlkRoot(context.Background()))
assert.Equal(t, 1, len(r.cfg.attPool.AggregatedAttestations()), "Did not save aggregated att")
assert.DeepEqual(t, att, r.cfg.attPool.AggregatedAttestations()[0], "Incorrect saved att")

View File

@@ -59,9 +59,9 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
// Remove old blocks from our expiration cache.
s.deleteExpiredBlocksFromCache()
// Validate pending slots before processing.
if err := s.validatePendingSlots(); err != nil {
return errors.Wrap(err, "could not validate pending slots")
// remove expired pending blocks that are before the finalized checkpoint.
if err := s.removeExpiredPendingBlocksBySlot(); err != nil {
return errors.Wrap(err, "could not remove expired pending blocks before finalized checkpoint")
}
// Sort slots for ordered processing.
@@ -82,14 +82,14 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
ctx, span := startInnerSpan(ctx, slot)
// Get blocks in the pending queue for the current slot.
blocksInCache := s.getBlocksInQueue(slot)
if len(blocksInCache) == 0 {
blocksInQueue := s.getBlocksInQueue(slot)
if len(blocksInQueue) == 0 {
span.End()
continue
}
// Process each block in the queue.
for _, b := range blocksInCache {
for _, b := range blocksInQueue {
if err := blocks.BeaconBlockIsNil(b); err != nil {
continue
}
@@ -154,6 +154,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
}
span.End()
}
// request missing parent blocks
return s.sendBatchRootRequest(ctx, parentRoots, randGen)
}
@@ -355,10 +356,10 @@ func (s *Service) sortedPendingSlots() []primitives.Slot {
return ss
}
// validatePendingSlots validates the pending blocks
// removeExpiredPendingBlocksBySlot validates the pending blocks
// by their slot. If they are before the current finalized
// checkpoint, these blocks are removed from the queue.
func (s *Service) validatePendingSlots() error {
func (s *Service) removeExpiredPendingBlocksBySlot() error {
s.pendingQueueLock.Lock()
defer s.pendingQueueLock.Unlock()
oldBlockRoots := make(map[[32]byte]bool)

View File

@@ -117,7 +117,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
if seen {
return pubsub.ValidationIgnore, nil
}
if !s.validateBlockInAttestation(ctx, m) {
if !s.checkBlockOrQueueAtt(ctx, m) {
return pubsub.ValidationIgnore, nil
}
@@ -222,7 +222,8 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed
return s.validateWithBatchVerifier(ctx, "aggregate", set)
}
func (s *Service) validateBlockInAttestation(ctx context.Context, satt ethpb.SignedAggregateAttAndProof) bool {
// checkBlockOrQueueAtt validates the block root from the pending attestation, if it's not there it queues the pending attestation
func (s *Service) checkBlockOrQueueAtt(ctx context.Context, satt ethpb.SignedAggregateAttAndProof) bool {
// Verify the block being voted and the processed state is in beaconDB. The block should have passed validation if it's in the beaconDB.
blockRoot := bytesutil.ToBytes32(satt.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot)
if !s.hasBlockAndState(ctx, blockRoot) {