diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 953e7ab762..0a969b89e2 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -49,30 +49,29 @@ type Service struct { synced *abool.AtomicBool chainStarted *abool.AtomicBool counter *ratecounter.RateCounter - genesisChan chan time.Time } // NewService configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewService(ctx context.Context, cfg *Config) *Service { ctx, cancel := context.WithCancel(ctx) - s := &Service{ + return &Service{ cfg: cfg, ctx: ctx, cancel: cancel, synced: abool.New(), chainStarted: abool.New(), counter: ratecounter.NewRateCounter(counterSeconds * time.Second), - genesisChan: make(chan time.Time), } - go s.waitForStateInitialization() - return s } // Start the initial sync service. func (s *Service) Start() { - // Wait for state initialized event. - genesis := <-s.genesisChan + genesis, err := s.waitForStateInitialization() + if err != nil { + log.WithError(err).Fatal("Failed to wait for state initialization.") + return + } if genesis.IsZero() { log.Debug("Exiting Initial Sync Service") return @@ -180,9 +179,10 @@ func (s *Service) waitForMinimumPeers() { } } +// TODO: Return error // waitForStateInitialization makes sure that beacon node is ready to be accessed: it is either // already properly configured or system waits up until state initialized event is triggered. -func (s *Service) waitForStateInitialization() { +func (s *Service) waitForStateInitialization() (time.Time, error) { // Wait for state to be initialized. stateChannel := make(chan *feed.Event, 1) stateSub := s.cfg.StateNotifier.StateFeed().Subscribe(stateChannel) @@ -198,19 +198,14 @@ func (s *Service) waitForStateInitialization() { continue } log.WithField("starttime", data.StartTime).Debug("Received state initialized event") - s.genesisChan <- data.StartTime - return + return data.StartTime, nil } case <-s.ctx.Done(): - log.Debug("Context closed, exiting goroutine") // Send a zero time in the event we are exiting. - s.genesisChan <- time.Time{} - return + return time.Time{}, errors.New("context closed, exiting goroutine") case err := <-stateSub.Err(): - log.WithError(err).Error("Subscription to state notifier failed") // Send a zero time in the event we are exiting. - s.genesisChan <- time.Time{} - return + return time.Time{}, errors.Wrap(err, "subscription to state notifier failed") } } } diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index f9d9ba9847..d4d3151fcd 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -168,11 +168,7 @@ func TestService_InitStartStop(t *testing.T) { Chain: mc, StateNotifier: notifier, }) - time.Sleep(500 * time.Millisecond) assert.NotNil(t, s) - if tt.methodRuns != nil { - tt.methodRuns(notifier.StateFeed()) - } wg := &sync.WaitGroup{} wg.Add(1) @@ -181,6 +177,11 @@ func TestService_InitStartStop(t *testing.T) { wg.Done() }() + time.Sleep(500 * time.Millisecond) + if tt.methodRuns != nil { + tt.methodRuns(notifier.StateFeed()) + } + go func() { // Allow to exit from test (on no head loop waiting for head is started). // In most tests, this is redundant, as Start() already exited. @@ -207,7 +208,6 @@ func TestService_waitForStateInitialization(t *testing.T) { synced: abool.New(), chainStarted: abool.New(), counter: ratecounter.NewRateCounter(counterSeconds * time.Second), - genesisChan: make(chan time.Time), } return s } @@ -221,9 +221,8 @@ func TestService_waitForStateInitialization(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - go s.waitForStateInitialization() - currTime := <-s.genesisChan - assert.Equal(t, true, currTime.IsZero()) + _, err := s.waitForStateInitialization() + assert.ErrorContains(t, "context closed", err) wg.Done() }() go func() { @@ -236,8 +235,6 @@ func TestService_waitForStateInitialization(t *testing.T) { t.Fatalf("Test should have exited by now, timed out") } assert.LogsContain(t, hook, "Waiting for state to be initialized") - assert.LogsContain(t, hook, "Context closed, exiting goroutine") - assert.LogsDoNotContain(t, hook, "Subscription to state notifier failed") }) t.Run("no state and state init event received", func(t *testing.T) { @@ -251,8 +248,9 @@ func TestService_waitForStateInitialization(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - go s.waitForStateInitialization() - receivedGenesisTime = <-s.genesisChan + var err error + receivedGenesisTime, err = s.waitForStateInitialization() + require.NoError(t, err) assert.Equal(t, false, receivedGenesisTime.IsZero()) wg.Done() }() @@ -281,7 +279,6 @@ func TestService_waitForStateInitialization(t *testing.T) { assert.LogsContain(t, hook, "Event feed data is not type *statefeed.InitializedData") assert.LogsContain(t, hook, "Waiting for state to be initialized") assert.LogsContain(t, hook, "Received state initialized event") - assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine") }) t.Run("no state and state init event received and service start", func(t *testing.T) { @@ -296,7 +293,8 @@ func TestService_waitForStateInitialization(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - s.waitForStateInitialization() + _, err := s.waitForStateInitialization() + require.NoError(t, err) wg.Done() }() @@ -321,7 +319,6 @@ func TestService_waitForStateInitialization(t *testing.T) { } assert.LogsContain(t, hook, "Waiting for state to be initialized") assert.LogsContain(t, hook, "Received state initialized event") - assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine") }) }