diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 0a969b89e2..ef4c237fcd 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -49,29 +49,35 @@ type Service struct { synced *abool.AtomicBool chainStarted *abool.AtomicBool counter *ratecounter.RateCounter + genesisChan chan time.Time } // NewService configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewService(ctx context.Context, cfg *Config) *Service { ctx, cancel := context.WithCancel(ctx) - return &Service{ + s := &Service{ cfg: cfg, ctx: ctx, cancel: cancel, synced: abool.New(), chainStarted: abool.New(), counter: ratecounter.NewRateCounter(counterSeconds * time.Second), + genesisChan: make(chan time.Time), } + + // The reason why we have this goroutine in the constructor is to avoid a race condition + // between services' Start method and the initialization event. + // See https://github.com/prysmaticlabs/prysm/issues/10602 for details. + go s.waitForStateInitialization() + + return s } // Start the initial sync service. func (s *Service) Start() { - genesis, err := s.waitForStateInitialization() - if err != nil { - log.WithError(err).Fatal("Failed to wait for state initialization.") - return - } + // Wait for state initialized event. + genesis := <-s.genesisChan if genesis.IsZero() { log.Debug("Exiting Initial Sync Service") return @@ -179,10 +185,9 @@ func (s *Service) waitForMinimumPeers() { } } -// TODO: Return error // waitForStateInitialization makes sure that beacon node is ready to be accessed: it is either // already properly configured or system waits up until state initialized event is triggered. -func (s *Service) waitForStateInitialization() (time.Time, error) { +func (s *Service) waitForStateInitialization() { // Wait for state to be initialized. stateChannel := make(chan *feed.Event, 1) stateSub := s.cfg.StateNotifier.StateFeed().Subscribe(stateChannel) @@ -198,14 +203,19 @@ func (s *Service) waitForStateInitialization() (time.Time, error) { continue } log.WithField("starttime", data.StartTime).Debug("Received state initialized event") - return data.StartTime, nil + s.genesisChan <- data.StartTime + return } case <-s.ctx.Done(): + log.Debug("Context closed, exiting goroutine") // Send a zero time in the event we are exiting. - return time.Time{}, errors.New("context closed, exiting goroutine") + s.genesisChan <- time.Time{} + return case err := <-stateSub.Err(): + log.WithError(err).Error("Subscription to state notifier failed") // Send a zero time in the event we are exiting. - return time.Time{}, errors.Wrap(err, "subscription to state notifier failed") + s.genesisChan <- time.Time{} + return } } } diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index c3dcae49ac..7c605d4d16 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -168,7 +168,11 @@ func TestService_InitStartStop(t *testing.T) { Chain: mc, StateNotifier: notifier, }) + time.Sleep(500 * time.Millisecond) assert.NotNil(t, s) + if tt.methodRuns != nil { + tt.methodRuns(notifier.StateFeed()) + } wg := &sync.WaitGroup{} wg.Add(1) @@ -177,11 +181,6 @@ func TestService_InitStartStop(t *testing.T) { wg.Done() }() - time.Sleep(500 * time.Millisecond) - if tt.methodRuns != nil { - tt.methodRuns(notifier.StateFeed()) - } - go func() { // Allow to exit from test (on no head loop waiting for head is started). // In most tests, this is redundant, as Start() already exited. @@ -208,6 +207,7 @@ func TestService_waitForStateInitialization(t *testing.T) { synced: abool.New(), chainStarted: abool.New(), counter: ratecounter.NewRateCounter(counterSeconds * time.Second), + genesisChan: make(chan time.Time), } return s } @@ -221,8 +221,9 @@ func TestService_waitForStateInitialization(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - _, err := s.waitForStateInitialization() - assert.ErrorContains(t, "context closed", err) + go s.waitForStateInitialization() + currTime := <-s.genesisChan + assert.Equal(t, true, currTime.IsZero()) wg.Done() }() go func() { @@ -235,6 +236,8 @@ func TestService_waitForStateInitialization(t *testing.T) { t.Fatalf("Test should have exited by now, timed out") } assert.LogsContain(t, hook, "Waiting for state to be initialized") + assert.LogsContain(t, hook, "Context closed, exiting goroutine") + assert.LogsDoNotContain(t, hook, "Subscription to state notifier failed") }) t.Run("no state and state init event received", func(t *testing.T) { @@ -248,9 +251,8 @@ func TestService_waitForStateInitialization(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - var err error - receivedGenesisTime, err = s.waitForStateInitialization() - require.NoError(t, err) + go s.waitForStateInitialization() + receivedGenesisTime = <-s.genesisChan assert.Equal(t, false, receivedGenesisTime.IsZero()) wg.Done() }() @@ -279,6 +281,7 @@ func TestService_waitForStateInitialization(t *testing.T) { assert.LogsContain(t, hook, "Event feed data is not type *statefeed.InitializedData") assert.LogsContain(t, hook, "Waiting for state to be initialized") assert.LogsContain(t, hook, "Received state initialized event") + assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine") }) t.Run("no state and state init event received and service start", func(t *testing.T) { @@ -293,8 +296,7 @@ func TestService_waitForStateInitialization(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - _, err := s.waitForStateInitialization() - require.NoError(t, err) + s.waitForStateInitialization() wg.Done() }() @@ -319,6 +321,7 @@ func TestService_waitForStateInitialization(t *testing.T) { } assert.LogsContain(t, hook, "Waiting for state to be initialized") assert.LogsContain(t, hook, "Received state initialized event") + assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine") }) }