diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index d69e1e8aec..2ae82047f0 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -100,10 +100,13 @@ go_test( "fsm_test.go", "initial_sync_test.go", "round_robin_test.go", + "service_test.go", ], embed = [":go_default_library"], deps = [ "//beacon-chain/blockchain/testing:go_default_library", + "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/testing:go_default_library", diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index 037c105cde..a0791ba8d1 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -8,21 +8,13 @@ import ( eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" - "github.com/prysmaticlabs/prysm/beacon-chain/flags" p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" - "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/sliceutil" "github.com/prysmaticlabs/prysm/shared/testutil" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" ) -func TestConstants(t *testing.T) { - if params.BeaconConfig().MaxPeersToSync*flags.Get().BlockBatchLimit > 1000 { - t.Fatal("rpc rejects requests over 1000 range slots") - } -} - func TestService_roundRobinSync(t *testing.T) { tests := []struct { name string diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index c5ad4ed78b..1820d9c841 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -75,47 +75,9 @@ func NewInitialSync(ctx context.Context, cfg *Config) *Service { // Start the initial sync service. func (s *Service) Start() { - var genesis time.Time - - headState, err := s.chain.HeadState(s.ctx) - if headState == nil || err != nil { - // Wait for state to be initialized. - stateChannel := make(chan *feed.Event, 1) - stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel) - // We have two instances in which we call unsubscribe. The first - // instance below is to account for the fact that we exit - // the for-select loop through a return when we receive a closed - // context or error from our subscription. The only way to correctly - // close the subscription would be through a defer. The second instance we - // call unsubscribe when we have already received the state - // initialized event and are proceeding with the main synchronization - // routine. - defer stateSub.Unsubscribe() - genesisSet := false - for !genesisSet { - select { - case event := <-stateChannel: - if event.Type == statefeed.Initialized { - data, ok := event.Data.(*statefeed.InitializedData) - if !ok { - log.Error("Event feed data is not type *statefeed.InitializedData") - continue - } - log.WithField("starttime", data.StartTime).Debug("Received state initialized event") - genesis = data.StartTime - genesisSet = true - } - case <-s.ctx.Done(): - log.Debug("Context closed, exiting goroutine") - return - case err := <-stateSub.Err(): - log.WithError(err).Error("Subscription to state notifier failed") - return - } - } - stateSub.Unsubscribe() - } else { - genesis = time.Unix(int64(headState.GenesisTime()), 0) + genesis, err := s.waitForStateInitialization() + if err != nil { + return } if flags.Get().DisableSync { @@ -239,3 +201,41 @@ func (s *Service) waitForMinimumPeers() { time.Sleep(handshakePollingInterval) } } + +// waitForStateInitialization makes sure that beacon node is ready to be accessed: it is either +// already properly configured or system waits up until state initialized event is triggered. +func (s *Service) waitForStateInitialization() (time.Time, error) { + headState, err := s.chain.HeadState(s.ctx) + if err != nil { + return time.Time{}, err + } + if headState != nil { + return time.Unix(int64(headState.GenesisTime()), 0), nil + } + + // Wait for state to be initialized. + stateChannel := make(chan *feed.Event, 1) + stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel) + defer stateSub.Unsubscribe() + log.Info("Waiting for state to be initialized") + for { + select { + case event := <-stateChannel: + if event.Type == statefeed.Initialized { + data, ok := event.Data.(*statefeed.InitializedData) + if !ok { + log.Error("Event feed data is not type *statefeed.InitializedData") + continue + } + log.WithField("starttime", data.StartTime).Debug("Received state initialized event") + return data.StartTime, nil + } + case <-s.ctx.Done(): + log.Debug("Context closed, exiting goroutine") + return time.Time{}, errors.New("context closed") + case err := <-stateSub.Err(): + log.WithError(err).Error("Subscription to state notifier failed") + return time.Time{}, err + } + } +} diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go new file mode 100644 index 0000000000..bcec9bf641 --- /dev/null +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -0,0 +1,157 @@ +package initialsync + +import ( + "context" + "sync" + "testing" + "time" + + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" + dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/flags" + p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/testutil" + "github.com/prysmaticlabs/prysm/shared/testutil/assert" + "github.com/prysmaticlabs/prysm/shared/testutil/require" + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func TestService_Constants(t *testing.T) { + if params.BeaconConfig().MaxPeersToSync*flags.Get().BlockBatchLimit > 1000 { + t.Fatal("rpc rejects requests over 1000 range slots") + } +} + +func TestService_InitStartStop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mc, p2p, db := initializeTestServices(t, []uint64{}, []*peerData{}) + s := NewInitialSync(ctx, &Config{ + P2P: p2p, + DB: db, + Chain: mc, + }) + assert.NotNil(t, s) +} + +func TestService_waitForStateInitialization(t *testing.T) { + hook := logTest.NewGlobal() + + // Setup database. + beaconDB, _ := dbtest.SetupDB(t) + genesisBlk := testutil.NewBeaconBlock() + genesisBlkRoot, err := genesisBlk.Block.HashTreeRoot() + require.NoError(t, err) + err = beaconDB.SaveBlock(context.Background(), genesisBlk) + require.NoError(t, err) + + newService := func(ctx context.Context, mc *mock.ChainService) *Service { + s := NewInitialSync(ctx, &Config{ + P2P: p2pt.NewTestP2P(t), + DB: beaconDB, + Chain: mc, + StateNotifier: mc.StateNotifier(), + }) + require.NotNil(t, s) + return s + } + + t.Run("head state exists", func(t *testing.T) { + defer hook.Reset() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mc := &mock.ChainService{ + State: testutil.NewBeaconState(), + Root: genesisBlkRoot[:], + DB: beaconDB, + FinalizedCheckPoint: ð.Checkpoint{ + Epoch: 0, + }, + } + s := newService(ctx, mc) + + expectedGenesisTime := time.Unix(25000, 0) + var receivedGenesisTime time.Time + require.NoError(t, mc.State.SetGenesisTime(uint64(expectedGenesisTime.Unix()))) + receivedGenesisTime, err = s.waitForStateInitialization() + assert.NoError(t, err) + assert.Equal(t, expectedGenesisTime, receivedGenesisTime) + assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized") + }) + + t.Run("no state and context close", func(t *testing.T) { + defer hook.Reset() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := newService(ctx, &mock.ChainService{}) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + _, err := s.waitForStateInitialization() + assert.ErrorContains(t, "context closed", err) + wg.Done() + }() + go func() { + time.AfterFunc(500*time.Millisecond, func() { + cancel() + }) + }() + + if testutil.WaitTimeout(wg, time.Second*2) { + t.Fatalf("Test should have exited by now, timed out") + } + assert.LogsContain(t, hook, "Waiting for state to be initialized") + assert.LogsContain(t, hook, "Context closed, exiting goroutine") + assert.LogsDoNotContain(t, hook, "Subscription to state notifier failed") + }) + + t.Run("no state and state init event received", func(t *testing.T) { + defer hook.Reset() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s := newService(ctx, &mock.ChainService{}) + + expectedGenesisTime := time.Unix(358544700, 0) + var receivedGenesisTime time.Time + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + receivedGenesisTime, err = s.waitForStateInitialization() + assert.NoError(t, err) + wg.Done() + }() + go func() { + time.AfterFunc(500*time.Millisecond, func() { + // Send invalid event at first. + s.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Initialized, + Data: &statefeed.BlockProcessedData{}, + }) + // Send valid event. + s.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Initialized, + Data: &statefeed.InitializedData{ + StartTime: expectedGenesisTime, + GenesisValidatorsRoot: make([]byte, 32), + }, + }) + }) + }() + + if testutil.WaitTimeout(wg, time.Second*2) { + t.Fatalf("Test should have exited by now, timed out") + } + assert.Equal(t, expectedGenesisTime, receivedGenesisTime) + assert.LogsContain(t, hook, "Event feed data is not type *statefeed.InitializedData") + assert.LogsContain(t, hook, "Waiting for state to be initialized") + assert.LogsContain(t, hook, "Received state initialized event") + assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine") + }) +}