From 115d565f49b2adc63fbf1b25c8d3901fbd1635b8 Mon Sep 17 00:00:00 2001 From: terencechain Date: Fri, 16 Jun 2023 06:47:19 -0700 Subject: [PATCH] fix: late block task wait for initial sync (#12526) * fix: late block task wait for initial sync * fix: remove wait for clock --- beacon-chain/blockchain/options.go | 7 +++++++ beacon-chain/blockchain/process_block.go | 17 +++++++++++++---- beacon-chain/blockchain/service.go | 1 + beacon-chain/node/node.go | 5 +++-- 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index 2ad8cce8ff..04998ad842 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -172,3 +172,10 @@ func WithClockSynchronizer(gs *startup.ClockSynchronizer) Option { return nil } } + +func WithSyncComplete(c chan struct{}) Option { + return func(s *Service) error { + s.syncComplete = c + return nil + } +} diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index e5de22cf2c..59d290864a 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -652,18 +652,17 @@ func (s *Service) validateMergeTransitionBlock(ctx context.Context, stateVersion // This routine checks if there is a cached proposer payload ID available for the next slot proposer. // If there is not, it will call forkchoice updated with the correct payload attribute then cache the payload ID. func (s *Service) runLateBlockTasks() { - _, err := s.clockWaiter.WaitForClock(s.ctx) - if err != nil { - log.WithError(err).Error("runLateBlockTasks encountered an error waiting for initialization") + if err := s.waitForSync(); err != nil { + log.WithError(err).Error("failed to wait for initial sync") return } + attThreshold := params.BeaconConfig().SecondsPerSlot / 3 ticker := slots.NewSlotTickerWithOffset(s.genesisTime, time.Duration(attThreshold)*time.Second, params.BeaconConfig().SecondsPerSlot) for { select { case <-ticker.C(): s.lateBlockTasks(s.ctx) - case <-s.ctx.Done(): log.Debug("Context closed, exiting routine") return @@ -720,3 +719,13 @@ func (s *Service) lateBlockTasks(ctx context.Context) { log.WithError(err).Debug("could not perform late block tasks: failed to update forkchoice with engine") } } + +// waitForSync blocks until the node is synced to the head. +func (s *Service) waitForSync() error { + select { + case <-s.syncComplete: + return nil + case <-s.ctx.Done(): + return errors.New("context closed, exiting goroutine") + } +} diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 13f4200add..019c4a0277 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -60,6 +60,7 @@ type Service struct { wsVerifier *WeakSubjectivityVerifier clockSetter startup.ClockSetter clockWaiter startup.ClockWaiter + syncComplete chan struct{} } // config options for the service. diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 1bfbba72d6..c6668e92ee 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -236,7 +236,7 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) { } log.Debugln("Registering Blockchain Service") - if err := beacon.registerBlockchainService(beacon.forkChoicer, synchronizer); err != nil { + if err := beacon.registerBlockchainService(beacon.forkChoicer, synchronizer, beacon.initialSyncComplete); err != nil { return nil, err } @@ -590,7 +590,7 @@ func (b *BeaconNode) registerAttestationPool() error { return b.services.RegisterService(s) } -func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *startup.ClockSynchronizer) error { +func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *startup.ClockSynchronizer, syncComplete chan struct{}) error { var web3Service *execution.Service if err := b.services.FetchService(&web3Service); err != nil { return err @@ -621,6 +621,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st blockchain.WithFinalizedStateAtStartUp(b.finalizedStateAtStartUp), blockchain.WithProposerIdsCache(b.proposerIdsCache), blockchain.WithClockSynchronizer(gs), + blockchain.WithSyncComplete(syncComplete), ) blockchainService, err := blockchain.NewService(b.ctx, opts...)