diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index e9c1f06e0e..d2acbf8b7b 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -244,7 +244,8 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco beacon.clockWaiter, forkchoice.NewROForkChoice(beacon.forkChoicer), beacon.stateGen) pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer) - beacon.BackfillOpts = append(beacon.BackfillOpts, backfill.WithVerifierWaiter(beacon.verifyInitWaiter)) + beacon.BackfillOpts = append(beacon.BackfillOpts, backfill.WithVerifierWaiter(beacon.verifyInitWaiter), + backfill.WithInitSyncWaiter(initSyncWaiter(ctx, beacon.initialSyncComplete))) bf, err := backfill.NewService(ctx, bfs, beacon.BlobStorage, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...) if err != nil { return nil, errors.Wrap(err, "error initializing backfill service") @@ -327,6 +328,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco return beacon, nil } +func initSyncWaiter(ctx context.Context, complete chan struct{}) func() error { + return func() error { + select { + case <-complete: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} func newRouter(cliCtx *cli.Context) *mux.Router { var allowedOrigins []string diff --git a/beacon-chain/sync/backfill/service.go b/beacon-chain/sync/backfill/service.go index e1436d660e..fbcd77182f 100644 --- a/beacon-chain/sync/backfill/service.go +++ b/beacon-chain/sync/backfill/service.go @@ -39,6 +39,7 @@ type Service struct { pa PeerAssigner batchImporter batchImporter blobStore *filesystem.BlobStorage + initSyncWaiter func() error } var _ runtime.Service = (*Service)(nil) @@ -93,6 +94,15 @@ func WithBatchSize(n uint64) ServiceOption { } } +// WithInitSyncWaiter sets a function on the service which will block until init-sync +// completes for the first time, or returns an error if context is canceled. +func WithInitSyncWaiter(w func() error) ServiceOption { + return func(s *Service) error { + s.initSyncWaiter = w + return nil + } +} + // InitializerWaiter is an interface that is satisfied by verification.InitializerWaiter. // Using this interface enables node init to satisfy this requirement for the backfill service // while also allowing backfill to mock it in tests. @@ -261,8 +271,15 @@ func (s *Service) Start() { log.WithError(err).Error("Unable to initialize backfill verifier.") return } - s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore) + if s.initSyncWaiter != nil { + log.Info("Backfill service waiting for initial-sync to reach head before starting.") + if err := s.initSyncWaiter(); err != nil { + log.WithError(err).Error("Error waiting for init-sync to complete.") + return + } + } + s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore) s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize)) if err = s.initBatches(); err != nil { log.WithError(err).Error("Non-recoverable error in backfill service.")