Embed Config Pattern For Attestation, POW Chain, and RPC Services (#8635)

This commit is contained in:
kevlu93
2021-03-21 17:58:41 +01:00
committed by GitHub
parent 9421ac13d8
commit 4a64d4d133
14 changed files with 208 additions and 297 deletions

View File

@@ -29,13 +29,9 @@ var _ powchain.ChainStartFetcher = (*Service)(nil)
// Service spins up an client interoperability service that handles responsibilities such
// as kickstarting a genesis state for the beacon node from cli flags or a genesis.ssz file.
type Service struct {
cfg *Config
ctx context.Context
cancel context.CancelFunc
genesisTime uint64
numValidators uint64
beaconDB db.HeadAccessDatabase
depositCache *depositcache.DepositCache
genesisPath string
chainStartDeposits []*ethpb.Deposit
}
@@ -56,17 +52,13 @@ func NewService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
s := &Service{
ctx: ctx,
cancel: cancel,
genesisTime: cfg.GenesisTime,
numValidators: cfg.NumValidators,
beaconDB: cfg.BeaconDB,
depositCache: cfg.DepositCache,
genesisPath: cfg.GenesisPath,
cfg: cfg,
ctx: ctx,
cancel: cancel,
}
if s.genesisPath != "" {
data, err := ioutil.ReadFile(s.genesisPath)
if s.cfg.GenesisPath != "" {
data, err := ioutil.ReadFile(s.cfg.GenesisPath)
if err != nil {
log.Fatalf("Could not read pre-loaded state: %v", err)
}
@@ -85,7 +77,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
}
// Save genesis state in db
genesisState, _, err := interop.GenerateGenesisState(s.genesisTime, s.numValidators)
genesisState, _, err := interop.GenerateGenesisState(s.cfg.GenesisTime, s.cfg.NumValidators)
if err != nil {
log.Fatalf("Could not generate interop genesis state: %v", err)
}
@@ -93,15 +85,15 @@ func NewService(ctx context.Context, cfg *Config) *Service {
if err != nil {
log.Fatalf("Could not get state trie: %v", err)
}
if s.genesisTime == 0 {
if s.cfg.GenesisTime == 0 {
// Generated genesis time; fetch it
s.genesisTime = genesisTrie.GenesisTime()
s.cfg.GenesisTime = genesisTrie.GenesisTime()
}
gRoot, err := genesisTrie.HashTreeRoot(s.ctx)
if err != nil {
log.Fatalf("Could not hash tree root genesis state: %v", err)
}
go slotutil.CountdownToGenesis(ctx, time.Unix(int64(s.genesisTime), 0), s.numValidators, gRoot)
go slotutil.CountdownToGenesis(ctx, time.Unix(int64(s.cfg.GenesisTime), 0), s.cfg.NumValidators, gRoot)
if err := s.saveGenesisState(ctx, genesisTrie); err != nil {
log.Fatalf("Could not save interop genesis state %v", err)
@@ -170,7 +162,7 @@ func (s *Service) NonFinalizedDeposits(_ context.Context, _ *big.Int) []*ethpb.D
}
func (s *Service) saveGenesisState(ctx context.Context, genesisState iface.BeaconState) error {
if err := s.beaconDB.SaveGenesisData(ctx, genesisState); err != nil {
if err := s.cfg.BeaconDB.SaveGenesisData(ctx, genesisState); err != nil {
return err
}

View File

@@ -33,6 +33,6 @@ var (
)
func (s *Service) updateMetrics() {
aggregatedAttsCount.Set(float64(s.pool.AggregatedAttestationCount()))
unaggregatedAttsCount.Set(float64(s.pool.UnaggregatedAttestationCount()))
aggregatedAttsCount.Set(float64(s.cfg.Pool.AggregatedAttestationCount()))
unaggregatedAttsCount.Set(float64(s.cfg.Pool.UnaggregatedAttestationCount()))
}

View File

@@ -43,11 +43,11 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts")
defer span.End()
if err := s.pool.AggregateUnaggregatedAttestations(ctx); err != nil {
if err := s.cfg.Pool.AggregateUnaggregatedAttestations(ctx); err != nil {
return err
}
atts := append(s.pool.AggregatedAttestations(), s.pool.BlockAttestations()...)
atts = append(atts, s.pool.ForkchoiceAttestations()...)
atts := append(s.cfg.Pool.AggregatedAttestations(), s.cfg.Pool.BlockAttestations()...)
atts = append(atts, s.cfg.Pool.ForkchoiceAttestations()...)
attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation, len(atts))
@@ -74,8 +74,8 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error {
}
}
for _, a := range s.pool.BlockAttestations() {
if err := s.pool.DeleteBlockAttestation(a); err != nil {
for _, a := range s.cfg.Pool.BlockAttestations() {
if err := s.cfg.Pool.DeleteBlockAttestation(a); err != nil {
return err
}
}
@@ -95,7 +95,7 @@ func (s *Service) aggregateAndSaveForkChoiceAtts(atts []*ethpb.Attestation) erro
return err
}
return s.pool.SaveForkchoiceAttestations(aggregatedAtts)
return s.cfg.Pool.SaveForkchoiceAttestations(aggregatedAtts)
}
// This checks if the attestation has previously been aggregated for fork choice

View File

@@ -86,9 +86,9 @@ func TestBatchAttestations_Multiple(t *testing.T) {
Source: &ethpb.Checkpoint{Root: mockRoot[:]},
Target: &ethpb.Checkpoint{Root: mockRoot[:]}}, AggregationBits: bitfield.Bitlist{0b100011}, Signature: sig.Marshal()}, // Duplicated
}
require.NoError(t, s.pool.SaveUnaggregatedAttestations(unaggregatedAtts))
require.NoError(t, s.pool.SaveAggregatedAttestations(aggregatedAtts))
require.NoError(t, s.pool.SaveBlockAttestations(blockAtts))
require.NoError(t, s.cfg.Pool.SaveUnaggregatedAttestations(unaggregatedAtts))
require.NoError(t, s.cfg.Pool.SaveAggregatedAttestations(aggregatedAtts))
require.NoError(t, s.cfg.Pool.SaveBlockAttestations(blockAtts))
require.NoError(t, s.batchForkChoiceAtts(context.Background()))
wanted, err := attaggregation.Aggregate([]*ethpb.Attestation{aggregatedAtts[0], blockAtts[0]})
@@ -100,8 +100,8 @@ func TestBatchAttestations_Multiple(t *testing.T) {
require.NoError(t, err)
wanted = append(wanted, aggregated...)
require.NoError(t, s.pool.AggregateUnaggregatedAttestations(context.Background()))
received := s.pool.ForkchoiceAttestations()
require.NoError(t, s.cfg.Pool.AggregateUnaggregatedAttestations(context.Background()))
received := s.cfg.Pool.ForkchoiceAttestations()
sort.Slice(received, func(i, j int) bool {
return received[i].Data.Slot < received[j].Data.Slot
@@ -140,9 +140,9 @@ func TestBatchAttestations_Single(t *testing.T) {
{Data: d, AggregationBits: bitfield.Bitlist{0b100010}, Signature: sig.Marshal()},
{Data: d, AggregationBits: bitfield.Bitlist{0b110010}, Signature: sig.Marshal()}, // Duplicated
}
require.NoError(t, s.pool.SaveUnaggregatedAttestations(unaggregatedAtts))
require.NoError(t, s.pool.SaveAggregatedAttestations(aggregatedAtts))
require.NoError(t, s.pool.SaveBlockAttestations(blockAtts))
require.NoError(t, s.cfg.Pool.SaveUnaggregatedAttestations(unaggregatedAtts))
require.NoError(t, s.cfg.Pool.SaveAggregatedAttestations(aggregatedAtts))
require.NoError(t, s.cfg.Pool.SaveBlockAttestations(blockAtts))
require.NoError(t, s.batchForkChoiceAtts(context.Background()))
wanted, err := attaggregation.Aggregate(append(aggregatedAtts, unaggregatedAtts...))
@@ -151,7 +151,7 @@ func TestBatchAttestations_Single(t *testing.T) {
wanted, err = attaggregation.Aggregate(append(wanted, blockAtts...))
require.NoError(t, err)
got := s.pool.ForkchoiceAttestations()
got := s.cfg.Pool.ForkchoiceAttestations()
assert.DeepEqual(t, wanted, got)
}
@@ -176,7 +176,7 @@ func TestAggregateAndSaveForkChoiceAtts_Single(t *testing.T) {
wanted, err := attaggregation.Aggregate(atts)
require.NoError(t, err)
assert.DeepEqual(t, wanted, s.pool.ForkchoiceAttestations())
assert.DeepEqual(t, wanted, s.cfg.Pool.ForkchoiceAttestations())
}
func TestAggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) {
@@ -223,7 +223,7 @@ func TestAggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) {
wanted = append(wanted, aggregated...)
wanted = append(wanted, att3...)
received := s.pool.ForkchoiceAttestations()
received := s.cfg.Pool.ForkchoiceAttestations()
sort.Slice(received, func(i, j int) bool {
return received[i].Data.Slot < received[j].Data.Slot
})

View File

@@ -10,7 +10,7 @@ import (
// pruneAttsPool prunes attestations pool on every slot interval.
func (s *Service) pruneAttsPool() {
ticker := time.NewTicker(s.pruneInterval)
ticker := time.NewTicker(s.cfg.pruneInterval)
defer ticker.Stop()
for {
select {
@@ -26,37 +26,37 @@ func (s *Service) pruneAttsPool() {
// This prunes expired attestations from the pool.
func (s *Service) pruneExpiredAtts() {
aggregatedAtts := s.pool.AggregatedAttestations()
aggregatedAtts := s.cfg.Pool.AggregatedAttestations()
for _, att := range aggregatedAtts {
if s.expired(att.Data.Slot) {
if err := s.pool.DeleteAggregatedAttestation(att); err != nil {
if err := s.cfg.Pool.DeleteAggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not delete expired aggregated attestation")
}
expiredAggregatedAtts.Inc()
}
}
if _, err := s.pool.DeleteSeenUnaggregatedAttestations(); err != nil {
if _, err := s.cfg.Pool.DeleteSeenUnaggregatedAttestations(); err != nil {
log.WithError(err).Error("Cannot delete seen attestations")
}
unAggregatedAtts, err := s.pool.UnaggregatedAttestations()
unAggregatedAtts, err := s.cfg.Pool.UnaggregatedAttestations()
if err != nil {
log.WithError(err).Error("Could not get unaggregated attestations")
return
}
for _, att := range unAggregatedAtts {
if s.expired(att.Data.Slot) {
if err := s.pool.DeleteUnaggregatedAttestation(att); err != nil {
if err := s.cfg.Pool.DeleteUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not delete expired unaggregated attestation")
}
expiredUnaggregatedAtts.Inc()
}
}
blockAtts := s.pool.BlockAttestations()
blockAtts := s.cfg.Pool.BlockAttestations()
for _, att := range blockAtts {
if s.expired(att.Data.Slot) {
if err := s.pool.DeleteBlockAttestation(att); err != nil {
if err := s.cfg.Pool.DeleteBlockAttestation(att); err != nil {
log.WithError(err).Error("Could not delete expired block attestation")
}
}

View File

@@ -33,15 +33,15 @@ func TestPruneExpired_Ticker(t *testing.T) {
{Data: ad1, AggregationBits: bitfield.Bitlist{0b1000, 0b1}, Signature: make([]byte, 96)},
{Data: ad2, AggregationBits: bitfield.Bitlist{0b1000, 0b1}, Signature: make([]byte, 96)},
}
require.NoError(t, s.pool.SaveUnaggregatedAttestations(atts))
require.Equal(t, 2, s.pool.UnaggregatedAttestationCount(), "Unexpected number of attestations")
require.NoError(t, s.cfg.Pool.SaveUnaggregatedAttestations(atts))
require.Equal(t, 2, s.cfg.Pool.UnaggregatedAttestationCount(), "Unexpected number of attestations")
atts = []*ethpb.Attestation{
{Data: ad1, AggregationBits: bitfield.Bitlist{0b1101, 0b1}, Signature: make([]byte, 96)},
{Data: ad2, AggregationBits: bitfield.Bitlist{0b1101, 0b1}, Signature: make([]byte, 96)},
}
require.NoError(t, s.pool.SaveAggregatedAttestations(atts))
assert.Equal(t, 2, s.pool.AggregatedAttestationCount())
require.NoError(t, s.pool.SaveBlockAttestations(atts))
require.NoError(t, s.cfg.Pool.SaveAggregatedAttestations(atts))
assert.Equal(t, 2, s.cfg.Pool.AggregatedAttestationCount())
require.NoError(t, s.cfg.Pool.SaveBlockAttestations(atts))
// Rewind back one epoch worth of time.
s.genesisTime = uint64(timeutils.Now().Unix()) - uint64(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
@@ -50,24 +50,24 @@ func TestPruneExpired_Ticker(t *testing.T) {
done := make(chan struct{}, 1)
runutil.RunEvery(ctx, 500*time.Millisecond, func() {
atts, err := s.pool.UnaggregatedAttestations()
atts, err := s.cfg.Pool.UnaggregatedAttestations()
require.NoError(t, err)
for _, attestation := range atts {
if attestation.Data.Slot == 0 {
return
}
}
for _, attestation := range s.pool.AggregatedAttestations() {
for _, attestation := range s.cfg.Pool.AggregatedAttestations() {
if attestation.Data.Slot == 0 {
return
}
}
for _, attestation := range s.pool.BlockAttestations() {
for _, attestation := range s.cfg.Pool.BlockAttestations() {
if attestation.Data.Slot == 0 {
return
}
}
if s.pool.UnaggregatedAttestationCount() != 1 || s.pool.AggregatedAttestationCount() != 1 {
if s.cfg.Pool.UnaggregatedAttestationCount() != 1 || s.cfg.Pool.AggregatedAttestationCount() != 1 {
return
}
done <- struct{}{}
@@ -93,20 +93,20 @@ func TestPruneExpired_PruneExpiredAtts(t *testing.T) {
att3 := &ethpb.Attestation{Data: ad2, AggregationBits: bitfield.Bitlist{0b1101}}
att4 := &ethpb.Attestation{Data: ad2, AggregationBits: bitfield.Bitlist{0b1110}}
atts := []*ethpb.Attestation{att1, att2, att3, att4}
require.NoError(t, s.pool.SaveAggregatedAttestations(atts))
require.NoError(t, s.pool.SaveBlockAttestations(atts))
require.NoError(t, s.cfg.Pool.SaveAggregatedAttestations(atts))
require.NoError(t, s.cfg.Pool.SaveBlockAttestations(atts))
// Rewind back one epoch worth of time.
s.genesisTime = uint64(timeutils.Now().Unix()) - uint64(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
s.pruneExpiredAtts()
// All the attestations on slot 0 should be pruned.
for _, attestation := range s.pool.AggregatedAttestations() {
for _, attestation := range s.cfg.Pool.AggregatedAttestations() {
if attestation.Data.Slot == 0 {
t.Error("Should be pruned")
}
}
for _, attestation := range s.pool.BlockAttestations() {
for _, attestation := range s.cfg.Pool.BlockAttestations() {
if attestation.Data.Slot == 0 {
t.Error("Should be pruned")
}

View File

@@ -15,13 +15,12 @@ var forkChoiceProcessedRootsSize = 1 << 16
// Service of attestation pool operations.
type Service struct {
cfg *Config
ctx context.Context
cancel context.CancelFunc
pool Pool
err error
forkChoiceProcessedRoots *lru.Cache
genesisTime uint64
pruneInterval time.Duration
}
// Config options for the service.
@@ -38,19 +37,17 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
return nil, err
}
pruneInterval := cfg.pruneInterval
if pruneInterval == 0 {
if cfg.pruneInterval == 0 {
// Prune expired attestations from the pool every slot interval.
pruneInterval = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
cfg.pruneInterval = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
}
ctx, cancel := context.WithCancel(ctx)
return &Service{
cfg: cfg,
ctx: ctx,
cancel: cancel,
pool: cfg.Pool,
forkChoiceProcessedRoots: cache,
pruneInterval: pruneInterval,
}, nil
}

View File

@@ -22,7 +22,7 @@ var endpoint = "http://127.0.0.1"
func setDefaultMocks(service *Service) *Service {
service.eth1DataFetcher = &goodFetcher{}
service.httpLogger = &goodLogger{}
service.stateNotifier = &goodNotifier{}
service.cfg.StateNotifier = &goodNotifier{}
return service
}

View File

@@ -53,7 +53,7 @@ func (s *Service) Eth2GenesisPowchainInfo() (uint64, *big.Int) {
func (s *Service) ProcessETH1Block(ctx context.Context, blkNum *big.Int) error {
query := ethereum.FilterQuery{
Addresses: []common.Address{
s.depositContractAddress,
s.cfg.DepositContract,
},
FromBlock: blkNum,
ToBlock: blkNum,
@@ -154,7 +154,7 @@ func (s *Service) ProcessDepositLog(ctx context.Context, depositLog gethTypes.Lo
}
// We always store all historical deposits in the DB.
s.depositCache.InsertDeposit(ctx, deposit, depositLog.BlockNumber, index, s.depositTrie.Root())
s.cfg.DepositCache.InsertDeposit(ctx, deposit, depositLog.BlockNumber, index, s.depositTrie.Root())
validData := true
if !s.chainStartData.Chainstarted {
s.chainStartData.ChainstartDeposits = append(s.chainStartData.ChainstartDeposits, deposit)
@@ -168,7 +168,7 @@ func (s *Service) ProcessDepositLog(ctx context.Context, depositLog gethTypes.Lo
validData = false
}
} else {
s.depositCache.InsertPendingDeposit(ctx, deposit, depositLog.BlockNumber, index, s.depositTrie.Root())
s.cfg.DepositCache.InsertPendingDeposit(ctx, deposit, depositLog.BlockNumber, index, s.depositTrie.Root())
}
if validData {
log.WithFields(logrus.Fields{
@@ -227,7 +227,7 @@ func (s *Service) ProcessChainStart(genesisTime uint64, eth1BlockHash [32]byte,
log.WithFields(logrus.Fields{
"ChainStartTime": chainStartTime,
}).Info("Minimum number of validators reached for beacon-chain to start")
s.stateNotifier.StateFeed().Send(&feed.Event{
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.ChainStarted,
Data: &statefeed.ChainStartedData{
StartTime: chainStartTime,
@@ -284,7 +284,7 @@ func (s *Service) processPastLogs(ctx context.Context) error {
return err
}
batchSize := s.eth1HeaderReqLimit
batchSize := s.cfg.Eth1HeaderReqLimit
additiveFactor := uint64(float64(batchSize) * additiveFactorMultiplier)
for currentBlockNum < latestFollowHeight {
@@ -297,7 +297,7 @@ func (s *Service) processPastLogs(ctx context.Context) error {
}
query := ethereum.FilterQuery{
Addresses: []common.Address{
s.depositContractAddress,
s.cfg.DepositContract,
},
FromBlock: big.NewInt(int64(start)),
ToBlock: big.NewInt(int64(end)),
@@ -350,18 +350,18 @@ func (s *Service) processPastLogs(ctx context.Context) error {
}
currentBlockNum = end
if batchSize < s.eth1HeaderReqLimit {
if batchSize < s.cfg.Eth1HeaderReqLimit {
// update the batchSize with additive increase
batchSize += additiveFactor
if batchSize > s.eth1HeaderReqLimit {
batchSize = s.eth1HeaderReqLimit
if batchSize > s.cfg.Eth1HeaderReqLimit {
batchSize = s.cfg.Eth1HeaderReqLimit
}
}
}
s.latestEth1Data.LastRequestedBlock = currentBlockNum
c, err := s.beaconDB.FinalizedCheckpoint(ctx)
c, err := s.cfg.BeaconDB.FinalizedCheckpoint(ctx)
if err != nil {
return err
}
@@ -370,12 +370,12 @@ func (s *Service) processPastLogs(ctx context.Context) error {
if fRoot == params.BeaconConfig().ZeroHash {
return nil
}
fState, err := s.stateGen.StateByRoot(ctx, fRoot)
fState, err := s.cfg.StateGen.StateByRoot(ctx, fRoot)
if err != nil {
return err
}
if fState != nil && fState.Eth1DepositIndex() > 0 {
s.depositCache.PrunePendingDeposits(ctx, int64(fState.Eth1DepositIndex()))
s.cfg.DepositCache.PrunePendingDeposits(ctx, int64(fState.Eth1DepositIndex()))
}
return nil
}
@@ -540,7 +540,7 @@ func (s *Service) savePowchainData(ctx context.Context) error {
ChainstartData: s.chainStartData,
BeaconState: pbState, // I promise not to mutate it!
Trie: s.depositTrie.ToProto(),
DepositContainers: s.depositCache.AllDepositContainers(ctx),
DepositContainers: s.cfg.DepositCache.AllDepositContainers(ctx),
}
return s.beaconDB.SavePowchainData(ctx, eth1Data)
return s.cfg.BeaconDB.SavePowchainData(ctx, eth1Data)
}

View File

@@ -65,7 +65,7 @@ func TestProcessDepositLog_OK(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.depositContractAddress,
web3Service.cfg.DepositContract,
},
}
@@ -133,7 +133,7 @@ func TestProcessDepositLog_InsertsPendingDeposit(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.depositContractAddress,
web3Service.cfg.DepositContract,
},
}
@@ -147,7 +147,7 @@ func TestProcessDepositLog_InsertsPendingDeposit(t *testing.T) {
err = web3Service.ProcessDepositLog(context.Background(), logs[1])
require.NoError(t, err)
pendingDeposits := web3Service.depositCache.PendingDeposits(context.Background(), nil /*blockNum*/)
pendingDeposits := web3Service.cfg.DepositCache.PendingDeposits(context.Background(), nil /*blockNum*/)
require.Equal(t, 2, len(pendingDeposits), "Unexpected number of deposits")
hook.Reset()
@@ -184,7 +184,7 @@ func TestUnpackDepositLogData_OK(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.depositContractAddress,
web3Service.cfg.DepositContract,
},
}
@@ -250,7 +250,7 @@ func TestProcessETH2GenesisLog_8DuplicatePubkeys(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.depositContractAddress,
web3Service.cfg.DepositContract,
},
}
@@ -318,7 +318,7 @@ func TestProcessETH2GenesisLog(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.depositContractAddress,
web3Service.cfg.DepositContract,
},
}
@@ -328,7 +328,7 @@ func TestProcessETH2GenesisLog(t *testing.T) {
// Set up our subscriber now to listen for the chain started event.
stateChannel := make(chan *feed.Event, 1)
stateSub := web3Service.stateNotifier.StateFeed().Subscribe(stateChannel)
stateSub := web3Service.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
for _, log := range logs {
@@ -425,7 +425,7 @@ func TestProcessETH2GenesisLog_CorrectNumOfDeposits(t *testing.T) {
// Set up our subscriber now to listen for the chain started event.
stateChannel := make(chan *feed.Event, 1)
stateSub := web3Service.stateNotifier.StateFeed().Subscribe(stateChannel)
stateSub := web3Service.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
err = web3Service.processPastLogs(context.Background())
@@ -529,7 +529,7 @@ func TestProcessETH2GenesisLog_LargePeriodOfNoLogs(t *testing.T) {
// Set up our subscriber now to listen for the chain started event.
stateChannel := make(chan *feed.Event, 1)
stateSub := web3Service.stateNotifier.StateFeed().Subscribe(stateChannel)
stateSub := web3Service.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
err = web3Service.processPastLogs(context.Background())
@@ -599,7 +599,7 @@ func TestWeb3ServiceProcessDepositLog_RequestMissedDeposits(t *testing.T) {
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.depositContractAddress,
web3Service.cfg.DepositContract,
},
}

View File

@@ -131,14 +131,12 @@ type Service struct {
requestingOldLogs bool
connectedETH1 bool
isRunning bool
depositContractAddress common.Address
processingLock sync.RWMutex
cfg *Web3ServiceConfig
ctx context.Context
cancel context.CancelFunc
headTicker *time.Ticker
currHttpEndpoint string
httpEndpoints []string
stateNotifier statefeed.Notifier
httpLogger bind.ContractFilterer
eth1DataFetcher RPCDataFetcher
rpcClient RPCClient
@@ -147,13 +145,9 @@ type Service struct {
depositContractCaller *contracts.DepositContractCaller
depositTrie *trieutil.SparseMerkleTrie
chainStartData *protodb.ChainStartData
beaconDB db.HeadAccessDatabase // Circular dep if using HeadFetcher.
depositCache *depositcache.DepositCache
lastReceivedMerkleIndex int64 // Keeps track of the last received index to prevent log spam.
runError error
preGenesisState iface.BeaconState
stateGen *stategen.State
eth1HeaderReqLimit uint64
}
// Web3ServiceConfig defines a config struct for web3 service to use through its life cycle.
@@ -182,9 +176,8 @@ func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error
return nil, errors.Wrap(err, "could not setup genesis state")
}
eth1HeaderReqLimit := config.Eth1HeaderReqLimit
if eth1HeaderReqLimit == 0 {
eth1HeaderReqLimit = defaultEth1HeaderReqLimit
if config.Eth1HeaderReqLimit == 0 {
config.Eth1HeaderReqLimit = defaultEth1HeaderReqLimit
}
endpoints := dedupEndpoints(config.HTTPEndpoints)
@@ -194,9 +187,9 @@ func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error
currEndpoint = endpoints[0]
}
s := &Service{
cfg: config,
ctx: ctx,
cancel: cancel,
httpEndpoints: endpoints,
currHttpEndpoint: currEndpoint,
latestEth1Data: &protodb.LatestETH1Data{
BlockHeight: 0,
@@ -204,21 +197,15 @@ func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error
BlockHash: []byte{},
LastRequestedBlock: 0,
},
headerCache: newHeaderCache(),
depositContractAddress: config.DepositContract,
stateNotifier: config.StateNotifier,
depositTrie: depositTrie,
headerCache: newHeaderCache(),
depositTrie: depositTrie,
chainStartData: &protodb.ChainStartData{
Eth1Data: &ethpb.Eth1Data{},
ChainstartDeposits: make([]*ethpb.Deposit, 0),
},
beaconDB: config.BeaconDB,
depositCache: config.DepositCache,
lastReceivedMerkleIndex: -1,
preGenesisState: genState,
headTicker: time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerETH1Block) * time.Second),
stateGen: config.StateGen,
eth1HeaderReqLimit: eth1HeaderReqLimit,
}
eth1Data, err := config.BeaconDB.PowchainData(ctx)
@@ -250,7 +237,7 @@ func (s *Service) Start() {
if !s.chainStartData.Chainstarted && s.currHttpEndpoint == "" {
// check for genesis state before shutting down the node,
// if a genesis state exists, we can continue on.
genState, err := s.beaconDB.GenesisState(s.ctx)
genState, err := s.cfg.BeaconDB.GenesisState(s.ctx)
if err != nil {
log.Fatal(err)
}
@@ -356,7 +343,7 @@ func (s *Service) AreAllDepositsProcessed() (bool, error) {
return false, errors.Wrap(err, "could not get deposit count")
}
count := bytesutil.FromBytes8(countByte)
deposits := s.depositCache.AllDeposits(s.ctx, nil)
deposits := s.cfg.DepositCache.AllDeposits(s.ctx, nil)
if count != uint64(len(deposits)) {
return false, nil
}
@@ -379,7 +366,7 @@ func (s *Service) connectToPowChain() error {
return errors.Wrap(err, "could not dial eth1 nodes")
}
depositContractCaller, err := contracts.NewDepositContractCaller(s.depositContractAddress, httpClient)
depositContractCaller, err := contracts.NewDepositContractCaller(s.cfg.DepositContract, httpClient)
if err != nil {
return errors.Wrap(err, "could not create deposit contract caller")
}
@@ -555,27 +542,27 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*protodb.Deposit
if len(ctrs) == 0 {
return nil
}
s.depositCache.InsertDepositContainers(ctx, ctrs)
s.cfg.DepositCache.InsertDepositContainers(ctx, ctrs)
if !s.chainStartData.Chainstarted {
// do not add to pending cache
// if no genesis state exists.
validDepositsCount.Add(float64(s.preGenesisState.Eth1DepositIndex()))
return nil
}
genesisState, err := s.beaconDB.GenesisState(ctx)
genesisState, err := s.cfg.BeaconDB.GenesisState(ctx)
if err != nil {
return err
}
// Default to all deposits post-genesis deposits in
// the event we cannot find a finalized state.
currIndex := genesisState.Eth1DepositIndex()
chkPt, err := s.beaconDB.FinalizedCheckpoint(ctx)
chkPt, err := s.cfg.BeaconDB.FinalizedCheckpoint(ctx)
if err != nil {
return err
}
rt := bytesutil.ToBytes32(chkPt.Root)
if rt != [32]byte{} {
fState, err := s.stateGen.StateByRoot(ctx, rt)
fState, err := s.cfg.StateGen.StateByRoot(ctx, rt)
if err != nil {
return errors.Wrap(err, "could not get finalized state")
}
@@ -590,7 +577,7 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*protodb.Deposit
// is more than the current index in state.
if uint64(len(ctrs)) > currIndex {
for _, c := range ctrs[currIndex:] {
s.depositCache.InsertPendingDeposit(ctx, c.Deposit, c.Eth1BlockHeight, c.Index, bytesutil.ToBytes32(c.DepositRoot))
s.cfg.DepositCache.InsertPendingDeposit(ctx, c.Deposit, c.Eth1BlockHeight, c.Index, bytesutil.ToBytes32(c.DepositRoot))
}
}
return nil
@@ -857,7 +844,7 @@ func (s *Service) determineEarliestVotingBlock(ctx context.Context, followBlock
// is ready to serve we connect to it again. This method is only
// relevant if we are on our backup endpoint.
func (s *Service) checkDefaultEndpoint() {
primaryEndpoint := s.httpEndpoints[0]
primaryEndpoint := s.cfg.HTTPEndpoints[0]
// Return early if we are running on our primary
// endpoint.
if s.currHttpEndpoint == primaryEndpoint {
@@ -888,9 +875,9 @@ func (s *Service) checkDefaultEndpoint() {
func (s *Service) fallbackToNextEndpoint() {
currEndpoint := s.currHttpEndpoint
currIndex := 0
totalEndpoints := len(s.httpEndpoints)
totalEndpoints := len(s.cfg.HTTPEndpoints)
for i, endpoint := range s.httpEndpoints {
for i, endpoint := range s.cfg.HTTPEndpoints {
if endpoint == currEndpoint {
currIndex = i
break
@@ -904,7 +891,7 @@ func (s *Service) fallbackToNextEndpoint() {
if nextIndex == currIndex {
return
}
s.currHttpEndpoint = s.httpEndpoints[nextIndex]
s.currHttpEndpoint = s.cfg.HTTPEndpoints[nextIndex]
log.Infof("Falling back to alternative endpoint: %s", logutil.MaskCredentialsLogging(s.currHttpEndpoint))
}

View File

@@ -433,25 +433,25 @@ func TestInitDepositCache_OK(t *testing.T) {
beaconDB := dbutil.SetupDB(t)
s := &Service{
chainStartData: &protodb.ChainStartData{Chainstarted: false},
beaconDB: beaconDB,
preGenesisState: gs,
cfg: &Web3ServiceConfig{BeaconDB: beaconDB},
}
var err error
s.depositCache, err = depositcache.New()
s.cfg.DepositCache, err = depositcache.New()
require.NoError(t, err)
require.NoError(t, s.initDepositCaches(context.Background(), ctrs))
require.Equal(t, 0, len(s.depositCache.PendingContainers(context.Background(), nil)))
require.Equal(t, 0, len(s.cfg.DepositCache.PendingContainers(context.Background(), nil)))
blockRootA := [32]byte{'a'}
emptyState, err := testutil.NewBeaconState()
require.NoError(t, err)
require.NoError(t, s.beaconDB.SaveGenesisBlockRoot(context.Background(), blockRootA))
require.NoError(t, s.beaconDB.SaveState(context.Background(), emptyState, blockRootA))
require.NoError(t, s.cfg.BeaconDB.SaveGenesisBlockRoot(context.Background(), blockRootA))
require.NoError(t, s.cfg.BeaconDB.SaveState(context.Background(), emptyState, blockRootA))
s.chainStartData.Chainstarted = true
require.NoError(t, s.initDepositCaches(context.Background(), ctrs))
require.Equal(t, 3, len(s.depositCache.PendingContainers(context.Background(), nil)))
require.Equal(t, 3, len(s.cfg.DepositCache.PendingContainers(context.Background(), nil)))
}
func TestNewService_EarliestVotingBlock(t *testing.T) {
@@ -515,7 +515,7 @@ func TestNewService_Eth1HeaderRequLimit(t *testing.T) {
BeaconDB: beaconDB,
})
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
assert.Equal(t, defaultEth1HeaderReqLimit, s1.eth1HeaderReqLimit, "default eth1 header request limit not set")
assert.Equal(t, defaultEth1HeaderReqLimit, s1.cfg.Eth1HeaderReqLimit, "default eth1 header request limit not set")
s2, err := NewService(context.Background(), &Web3ServiceConfig{
HTTPEndpoints: []string{endpoint},
@@ -524,7 +524,7 @@ func TestNewService_Eth1HeaderRequLimit(t *testing.T) {
Eth1HeaderReqLimit: uint64(150),
})
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
assert.Equal(t, uint64(150), s2.eth1HeaderReqLimit, "unable to set eth1HeaderRequestLimit")
assert.Equal(t, uint64(150), s2.cfg.Eth1HeaderReqLimit, "unable to set eth1HeaderRequestLimit")
}
func TestServiceFallbackCorrectly(t *testing.T) {
@@ -547,7 +547,7 @@ func TestServiceFallbackCorrectly(t *testing.T) {
s1.fallbackToNextEndpoint()
assert.Equal(t, firstEndpoint, s1.currHttpEndpoint, "Unexpected http endpoint")
s1.httpEndpoints = append(s1.httpEndpoints, secondEndpoint)
s1.cfg.HTTPEndpoints = append(s1.cfg.HTTPEndpoints, secondEndpoint)
s1.fallbackToNextEndpoint()
assert.Equal(t, secondEndpoint, s1.currHttpEndpoint, "Unexpected http endpoint")
@@ -555,7 +555,7 @@ func TestServiceFallbackCorrectly(t *testing.T) {
thirdEndpoint := "C"
fourthEndpoint := "D"
s1.httpEndpoints = append(s1.httpEndpoints, thirdEndpoint, fourthEndpoint)
s1.cfg.HTTPEndpoints = append(s1.cfg.HTTPEndpoints, thirdEndpoint, fourthEndpoint)
s1.fallbackToNextEndpoint()
assert.Equal(t, thirdEndpoint, s1.currHttpEndpoint, "Unexpected http endpoint")

View File

@@ -53,50 +53,16 @@ const attestationBufferSize = 100
// Service defining an RPC server for a beacon node.
type Service struct {
ctx context.Context
cancel context.CancelFunc
beaconDB db.HeadAccessDatabase
chainInfoFetcher blockchain.ChainInfoFetcher
headFetcher blockchain.HeadFetcher
canonicalFetcher blockchain.CanonicalFetcher
forkFetcher blockchain.ForkFetcher
finalizationFetcher blockchain.FinalizationFetcher
timeFetcher blockchain.TimeFetcher
genesisFetcher blockchain.GenesisFetcher
attestationReceiver blockchain.AttestationReceiver
blockReceiver blockchain.BlockReceiver
powChainService powchain.Chain
chainStartFetcher powchain.ChainStartFetcher
mockEth1Votes bool
enableDebugRPCEndpoints bool
attestationsPool attestations.Pool
exitPool voluntaryexits.PoolManager
slashingsPool slashings.PoolManager
syncService chainSync.Checker
host string
port string
beaconMonitoringHost string
beaconMonitoringPort int
listener net.Listener
withCert string
withKey string
grpcServer *grpc.Server
canonicalStateChan chan *pbp2p.BeaconState
incomingAttestation chan *ethpb.Attestation
credentialError error
p2p p2p.Broadcaster
peersFetcher p2p.PeersProvider
peerManager p2p.PeerManager
metadataProvider p2p.MetadataProvider
depositFetcher depositcache.DepositFetcher
pendingDepositFetcher depositcache.PendingDepositsFetcher
stateNotifier statefeed.Notifier
blockNotifier blockfeed.Notifier
operationNotifier opfeed.Notifier
stateGen *stategen.State
connectedRPCClients map[net.Addr]bool
clientConnectionLock sync.Mutex
maxMsgSize int
cfg *Config
ctx context.Context
cancel context.CancelFunc
listener net.Listener
grpcServer *grpc.Server
canonicalStateChan chan *pbp2p.BeaconState
incomingAttestation chan *ethpb.Attestation
credentialError error
connectedRPCClients map[net.Addr]bool
clientConnectionLock sync.Mutex
}
// Config options for the beacon node RPC server.
@@ -143,52 +109,18 @@ type Config struct {
func NewService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
beaconDB: cfg.BeaconDB,
chainInfoFetcher: cfg.ChainInfoFetcher,
headFetcher: cfg.HeadFetcher,
forkFetcher: cfg.ForkFetcher,
finalizationFetcher: cfg.FinalizationFetcher,
canonicalFetcher: cfg.CanonicalFetcher,
timeFetcher: cfg.GenesisTimeFetcher,
genesisFetcher: cfg.GenesisFetcher,
attestationReceiver: cfg.AttestationReceiver,
blockReceiver: cfg.BlockReceiver,
p2p: cfg.Broadcaster,
peersFetcher: cfg.PeersFetcher,
peerManager: cfg.PeerManager,
metadataProvider: cfg.MetadataProvider,
powChainService: cfg.POWChainService,
chainStartFetcher: cfg.ChainStartFetcher,
mockEth1Votes: cfg.MockEth1Votes,
attestationsPool: cfg.AttestationsPool,
exitPool: cfg.ExitPool,
slashingsPool: cfg.SlashingsPool,
syncService: cfg.SyncService,
host: cfg.Host,
port: cfg.Port,
beaconMonitoringHost: cfg.BeaconMonitoringHost,
beaconMonitoringPort: cfg.BeaconMonitoringPort,
withCert: cfg.CertFlag,
withKey: cfg.KeyFlag,
depositFetcher: cfg.DepositFetcher,
pendingDepositFetcher: cfg.PendingDepositFetcher,
canonicalStateChan: make(chan *pbp2p.BeaconState, params.BeaconConfig().DefaultBufferSize),
incomingAttestation: make(chan *ethpb.Attestation, params.BeaconConfig().DefaultBufferSize),
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
operationNotifier: cfg.OperationNotifier,
stateGen: cfg.StateGen,
enableDebugRPCEndpoints: cfg.EnableDebugRPCEndpoints,
connectedRPCClients: make(map[net.Addr]bool),
maxMsgSize: cfg.MaxMsgSize,
cfg: cfg,
ctx: ctx,
cancel: cancel,
canonicalStateChan: make(chan *pbp2p.BeaconState, params.BeaconConfig().DefaultBufferSize),
incomingAttestation: make(chan *ethpb.Attestation, params.BeaconConfig().DefaultBufferSize),
connectedRPCClients: make(map[net.Addr]bool),
}
}
// Start the gRPC server.
func (s *Service) Start() {
address := fmt.Sprintf("%s:%s", s.host, s.port)
address := fmt.Sprintf("%s:%s", s.cfg.Host, s.cfg.Port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.Errorf("Could not listen to port in Start() %s: %v", address, err)
@@ -214,11 +146,11 @@ func (s *Service) Start() {
grpc_opentracing.UnaryServerInterceptor(),
s.validatorUnaryConnectionInterceptor,
)),
grpc.MaxRecvMsgSize(s.maxMsgSize),
grpc.MaxRecvMsgSize(s.cfg.MaxMsgSize),
}
grpc_prometheus.EnableHandlingTimeHistogram()
if s.withCert != "" && s.withKey != "" {
creds, err := credentials.NewServerTLSFromFile(s.withCert, s.withKey)
if s.cfg.CertFlag != "" && s.cfg.KeyFlag != "" {
creds, err := credentials.NewServerTLSFromFile(s.cfg.CertFlag, s.cfg.KeyFlag)
if err != nil {
log.WithError(err).Fatal("Could not load TLS keys")
}
@@ -232,110 +164,110 @@ func (s *Service) Start() {
validatorServer := &validator.Server{
Ctx: s.ctx,
BeaconDB: s.beaconDB,
BeaconDB: s.cfg.BeaconDB,
AttestationCache: cache.NewAttestationCache(),
AttPool: s.attestationsPool,
ExitPool: s.exitPool,
HeadFetcher: s.headFetcher,
ForkFetcher: s.forkFetcher,
FinalizationFetcher: s.finalizationFetcher,
TimeFetcher: s.timeFetcher,
AttPool: s.cfg.AttestationsPool,
ExitPool: s.cfg.ExitPool,
HeadFetcher: s.cfg.HeadFetcher,
ForkFetcher: s.cfg.ForkFetcher,
FinalizationFetcher: s.cfg.FinalizationFetcher,
TimeFetcher: s.cfg.GenesisTimeFetcher,
CanonicalStateChan: s.canonicalStateChan,
BlockFetcher: s.powChainService,
DepositFetcher: s.depositFetcher,
ChainStartFetcher: s.chainStartFetcher,
Eth1InfoFetcher: s.powChainService,
SyncChecker: s.syncService,
StateNotifier: s.stateNotifier,
BlockNotifier: s.blockNotifier,
OperationNotifier: s.operationNotifier,
P2P: s.p2p,
BlockReceiver: s.blockReceiver,
MockEth1Votes: s.mockEth1Votes,
Eth1BlockFetcher: s.powChainService,
PendingDepositsFetcher: s.pendingDepositFetcher,
SlashingsPool: s.slashingsPool,
StateGen: s.stateGen,
BlockFetcher: s.cfg.POWChainService,
DepositFetcher: s.cfg.DepositFetcher,
ChainStartFetcher: s.cfg.ChainStartFetcher,
Eth1InfoFetcher: s.cfg.POWChainService,
SyncChecker: s.cfg.SyncService,
StateNotifier: s.cfg.StateNotifier,
BlockNotifier: s.cfg.BlockNotifier,
OperationNotifier: s.cfg.OperationNotifier,
P2P: s.cfg.Broadcaster,
BlockReceiver: s.cfg.BlockReceiver,
MockEth1Votes: s.cfg.MockEth1Votes,
Eth1BlockFetcher: s.cfg.POWChainService,
PendingDepositsFetcher: s.cfg.PendingDepositFetcher,
SlashingsPool: s.cfg.SlashingsPool,
StateGen: s.cfg.StateGen,
}
nodeServer := &node.Server{
LogsStreamer: logutil.NewStreamServer(),
StreamLogsBufferSize: 1000, // Enough to handle bursts of beacon node logs for gRPC streaming.
BeaconDB: s.beaconDB,
BeaconDB: s.cfg.BeaconDB,
Server: s.grpcServer,
SyncChecker: s.syncService,
GenesisTimeFetcher: s.timeFetcher,
PeersFetcher: s.peersFetcher,
PeerManager: s.peerManager,
GenesisFetcher: s.genesisFetcher,
BeaconMonitoringHost: s.beaconMonitoringHost,
BeaconMonitoringPort: s.beaconMonitoringPort,
SyncChecker: s.cfg.SyncService,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
PeersFetcher: s.cfg.PeersFetcher,
PeerManager: s.cfg.PeerManager,
GenesisFetcher: s.cfg.GenesisFetcher,
BeaconMonitoringHost: s.cfg.BeaconMonitoringHost,
BeaconMonitoringPort: s.cfg.BeaconMonitoringPort,
}
nodeServerV1 := &nodev1.Server{
BeaconDB: s.beaconDB,
BeaconDB: s.cfg.BeaconDB,
Server: s.grpcServer,
SyncChecker: s.syncService,
GenesisTimeFetcher: s.timeFetcher,
PeersFetcher: s.peersFetcher,
PeerManager: s.peerManager,
GenesisFetcher: s.genesisFetcher,
MetadataProvider: s.metadataProvider,
HeadFetcher: s.headFetcher,
SyncChecker: s.cfg.SyncService,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
PeersFetcher: s.cfg.PeersFetcher,
PeerManager: s.cfg.PeerManager,
GenesisFetcher: s.cfg.GenesisFetcher,
MetadataProvider: s.cfg.MetadataProvider,
HeadFetcher: s.cfg.HeadFetcher,
}
beaconChainServer := &beacon.Server{
Ctx: s.ctx,
BeaconDB: s.beaconDB,
AttestationsPool: s.attestationsPool,
SlashingsPool: s.slashingsPool,
HeadFetcher: s.headFetcher,
FinalizationFetcher: s.finalizationFetcher,
CanonicalFetcher: s.canonicalFetcher,
ChainStartFetcher: s.chainStartFetcher,
DepositFetcher: s.depositFetcher,
BlockFetcher: s.powChainService,
BeaconDB: s.cfg.BeaconDB,
AttestationsPool: s.cfg.AttestationsPool,
SlashingsPool: s.cfg.SlashingsPool,
HeadFetcher: s.cfg.HeadFetcher,
FinalizationFetcher: s.cfg.FinalizationFetcher,
CanonicalFetcher: s.cfg.CanonicalFetcher,
ChainStartFetcher: s.cfg.ChainStartFetcher,
DepositFetcher: s.cfg.DepositFetcher,
BlockFetcher: s.cfg.POWChainService,
CanonicalStateChan: s.canonicalStateChan,
GenesisTimeFetcher: s.timeFetcher,
StateNotifier: s.stateNotifier,
BlockNotifier: s.blockNotifier,
AttestationNotifier: s.operationNotifier,
Broadcaster: s.p2p,
StateGen: s.stateGen,
SyncChecker: s.syncService,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
StateNotifier: s.cfg.StateNotifier,
BlockNotifier: s.cfg.BlockNotifier,
AttestationNotifier: s.cfg.OperationNotifier,
Broadcaster: s.cfg.Broadcaster,
StateGen: s.cfg.StateGen,
SyncChecker: s.cfg.SyncService,
ReceivedAttestationsBuffer: make(chan *ethpb.Attestation, attestationBufferSize),
CollectedAttestationsBuffer: make(chan []*ethpb.Attestation, attestationBufferSize),
}
beaconChainServerV1 := &beaconv1.Server{
Ctx: s.ctx,
BeaconDB: s.beaconDB,
AttestationsPool: s.attestationsPool,
SlashingsPool: s.slashingsPool,
ChainInfoFetcher: s.chainInfoFetcher,
ChainStartFetcher: s.chainStartFetcher,
DepositFetcher: s.depositFetcher,
BlockFetcher: s.powChainService,
BeaconDB: s.cfg.BeaconDB,
AttestationsPool: s.cfg.AttestationsPool,
SlashingsPool: s.cfg.SlashingsPool,
ChainInfoFetcher: s.cfg.ChainInfoFetcher,
ChainStartFetcher: s.cfg.ChainStartFetcher,
DepositFetcher: s.cfg.DepositFetcher,
BlockFetcher: s.cfg.POWChainService,
CanonicalStateChan: s.canonicalStateChan,
GenesisTimeFetcher: s.timeFetcher,
StateNotifier: s.stateNotifier,
BlockNotifier: s.blockNotifier,
AttestationNotifier: s.operationNotifier,
Broadcaster: s.p2p,
StateGenService: s.stateGen,
SyncChecker: s.syncService,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
StateNotifier: s.cfg.StateNotifier,
BlockNotifier: s.cfg.BlockNotifier,
AttestationNotifier: s.cfg.OperationNotifier,
Broadcaster: s.cfg.Broadcaster,
StateGenService: s.cfg.StateGen,
SyncChecker: s.cfg.SyncService,
}
ethpb.RegisterNodeServer(s.grpcServer, nodeServer)
ethpbv1.RegisterBeaconNodeServer(s.grpcServer, nodeServerV1)
pbrpc.RegisterHealthServer(s.grpcServer, nodeServer)
ethpb.RegisterBeaconChainServer(s.grpcServer, beaconChainServer)
ethpbv1.RegisterBeaconChainServer(s.grpcServer, beaconChainServerV1)
if s.enableDebugRPCEndpoints {
if s.cfg.EnableDebugRPCEndpoints {
log.Info("Enabled debug gRPC endpoints")
debugServer := &debug.Server{
GenesisTimeFetcher: s.timeFetcher,
BeaconDB: s.beaconDB,
StateGen: s.stateGen,
HeadFetcher: s.headFetcher,
PeerManager: s.peerManager,
PeersFetcher: s.peersFetcher,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
BeaconDB: s.cfg.BeaconDB,
StateGen: s.cfg.StateGen,
HeadFetcher: s.cfg.HeadFetcher,
PeerManager: s.cfg.PeerManager,
PeersFetcher: s.cfg.PeersFetcher,
}
pbrpc.RegisterDebugServer(s.grpcServer, debugServer)
}
@@ -365,7 +297,7 @@ func (s *Service) Stop() error {
// Status returns nil or credentialError
func (s *Service) Status() error {
if s.syncService.Syncing() {
if s.cfg.SyncService.Syncing() {
return errors.New("syncing")
}
if s.credentialError != nil {

View File

@@ -45,7 +45,10 @@ func TestLifecycle_OK(t *testing.T) {
func TestStatus_CredentialError(t *testing.T) {
credentialErr := errors.New("credentialError")
s := &Service{credentialError: credentialErr, syncService: &mockSync.Sync{IsSyncing: false}}
s := &Service{
cfg: &Config{SyncService: &mockSync.Sync{IsSyncing: false}},
credentialError: credentialErr,
}
assert.ErrorContains(t, s.credentialError.Error(), s.Status())
}