Init sync refactor state initialization check + tests (#7285)

* refactor state initialization into waitForStateInitialization
* more tests
* redundant blank line
* Merge branch 'master' into init-sync-refactor-state-init
* moves comment up
* Nishant's suggestions + bit more refactoring
* Explicitly process error (Nishant's suggestion)
This commit is contained in:
Victor Farazdagi
2020-09-21 15:20:26 +03:00
committed by GitHub
parent bdf8bf7be2
commit 7545d3f2b3
4 changed files with 201 additions and 49 deletions

View File

@@ -75,47 +75,9 @@ func NewInitialSync(ctx context.Context, cfg *Config) *Service {
// Start the initial sync service.
func (s *Service) Start() {
var genesis time.Time
headState, err := s.chain.HeadState(s.ctx)
if headState == nil || err != nil {
// Wait for state to be initialized.
stateChannel := make(chan *feed.Event, 1)
stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
// We have two instances in which we call unsubscribe. The first
// instance below is to account for the fact that we exit
// the for-select loop through a return when we receive a closed
// context or error from our subscription. The only way to correctly
// close the subscription would be through a defer. The second instance we
// call unsubscribe when we have already received the state
// initialized event and are proceeding with the main synchronization
// routine.
defer stateSub.Unsubscribe()
genesisSet := false
for !genesisSet {
select {
case event := <-stateChannel:
if event.Type == statefeed.Initialized {
data, ok := event.Data.(*statefeed.InitializedData)
if !ok {
log.Error("Event feed data is not type *statefeed.InitializedData")
continue
}
log.WithField("starttime", data.StartTime).Debug("Received state initialized event")
genesis = data.StartTime
genesisSet = true
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state notifier failed")
return
}
}
stateSub.Unsubscribe()
} else {
genesis = time.Unix(int64(headState.GenesisTime()), 0)
genesis, err := s.waitForStateInitialization()
if err != nil {
return
}
if flags.Get().DisableSync {
@@ -239,3 +201,41 @@ func (s *Service) waitForMinimumPeers() {
time.Sleep(handshakePollingInterval)
}
}
// waitForStateInitialization makes sure that beacon node is ready to be accessed: it is either
// already properly configured or system waits up until state initialized event is triggered.
func (s *Service) waitForStateInitialization() (time.Time, error) {
headState, err := s.chain.HeadState(s.ctx)
if err != nil {
return time.Time{}, err
}
if headState != nil {
return time.Unix(int64(headState.GenesisTime()), 0), nil
}
// Wait for state to be initialized.
stateChannel := make(chan *feed.Event, 1)
stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
log.Info("Waiting for state to be initialized")
for {
select {
case event := <-stateChannel:
if event.Type == statefeed.Initialized {
data, ok := event.Data.(*statefeed.InitializedData)
if !ok {
log.Error("Event feed data is not type *statefeed.InitializedData")
continue
}
log.WithField("starttime", data.StartTime).Debug("Received state initialized event")
return data.StartTime, nil
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return time.Time{}, errors.New("context closed")
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state notifier failed")
return time.Time{}, err
}
}
}