From 43765b5cb0a63d17632e651865f84c2379bf2d4c Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Sat, 10 Oct 2020 16:50:28 +0800 Subject: [PATCH] Register Subscribers After Node Is Synced (#7468) * wait for synced * fix again * add test * fix all * fixes deepsource reported issue Co-authored-by: Victor Farazdagi --- beacon-chain/sync/initial-sync/BUILD.bazel | 1 + .../sync/initial-sync/round_robin_test.go | 6 +- beacon-chain/sync/initial-sync/service.go | 33 +++-- .../sync/initial-sync/service_test.go | 134 +++++++++++++----- beacon-chain/sync/service.go | 33 +++-- beacon-chain/sync/service_test.go | 74 ++++++++++ 6 files changed, 215 insertions(+), 66 deletions(-) diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index 6a42168423..9c4228eaea 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -118,6 +118,7 @@ go_test( "//beacon-chain/sync:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/event:go_default_library", "//shared/featureconfig:go_default_library", "//shared/hashutil:go_default_library", "//shared/params:go_default_library", diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index 1a8356ecb8..4cd54250da 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -2,7 +2,6 @@ package initialsync import ( "context" - "fmt" "testing" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -333,6 +332,7 @@ func TestService_processBlock(t *testing.T) { Epoch: 0, }, }, + StateNotifier: &mock.MockStateNotifier{}, }) ctx := context.Background() genesis := makeGenesisTime(32) @@ -392,6 +392,7 @@ func TestService_processBlockBatch(t *testing.T) { Epoch: 0, }, }, + StateNotifier: &mock.MockStateNotifier{}, }) ctx := context.Background() genesis := makeGenesisTime(32) @@ -439,8 +440,7 @@ func TestService_processBlockBatch(t *testing.T) { ctx context.Context, blocks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error { return nil }) - expectedErr := fmt.Sprintf("no good blocks in batch") - assert.ErrorContains(t, expectedErr, err) + assert.ErrorContains(t, "no good blocks in batch", err) var badBatch2 []*eth.SignedBeaconBlock for i, b := range batch2 { diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 00f0bdfb9b..a41ce8b91d 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -54,13 +54,14 @@ type Service struct { stateNotifier statefeed.Notifier counter *ratecounter.RateCounter lastProcessedSlot uint64 + genesisChan chan time.Time } // NewService configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewService(ctx context.Context, cfg *Config) *Service { ctx, cancel := context.WithCancel(ctx) - return &Service{ + s := &Service{ ctx: ctx, cancel: cancel, chain: cfg.Chain, @@ -68,13 +69,18 @@ func NewService(ctx context.Context, cfg *Config) *Service { db: cfg.DB, stateNotifier: cfg.StateNotifier, counter: ratecounter.NewRateCounter(counterSeconds * time.Second), + genesisChan: make(chan time.Time), } + go s.waitForStateInitialization() + return s } // Start the initial sync service. func (s *Service) Start() { - genesis, err := s.waitForStateInitialization() - if err != nil { + // Wait for state initialized event. + genesis := <-s.genesisChan + if genesis.IsZero() { + log.Debug("Exiting Initial Sync Service") return } if flags.Get().DisableSync { @@ -169,15 +175,7 @@ func (s *Service) waitForMinimumPeers() { // waitForStateInitialization makes sure that beacon node is ready to be accessed: it is either // already properly configured or system waits up until state initialized event is triggered. -func (s *Service) waitForStateInitialization() (time.Time, error) { - headState, err := s.chain.HeadState(s.ctx) - if err != nil { - return time.Time{}, err - } - if headState != nil { - return time.Unix(int64(headState.GenesisTime()), 0), nil - } - +func (s *Service) waitForStateInitialization() { // Wait for state to be initialized. stateChannel := make(chan *feed.Event, 1) stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel) @@ -193,14 +191,19 @@ func (s *Service) waitForStateInitialization() (time.Time, error) { continue } log.WithField("starttime", data.StartTime).Debug("Received state initialized event") - return data.StartTime, nil + s.genesisChan <- data.StartTime + return } case <-s.ctx.Done(): log.Debug("Context closed, exiting goroutine") - return time.Time{}, errors.New("context closed") + // Send a zero time in the event we are exiting. + s.genesisChan <- time.Time{} + return case err := <-stateSub.Err(): log.WithError(err).Error("Subscription to state notifier failed") - return time.Time{}, err + // Send a zero time in the event we are exiting. + s.genesisChan <- time.Time{} + return } } } diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index 3b741c20ba..b2265912f6 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -14,6 +14,7 @@ import ( dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/beacon-chain/flags" p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" "github.com/prysmaticlabs/prysm/shared/testutil/assert" @@ -32,6 +33,7 @@ func TestService_InitStartStop(t *testing.T) { tests := []struct { name string assert func() + methodRuns func(fd *event.Feed) chainService func() *mock.ChainService }{ { @@ -45,7 +47,7 @@ func TestService_InitStartStop(t *testing.T) { chainService: func() *mock.ChainService { // Set to future time (genesis time hasn't arrived yet). st := testutil.NewBeaconState() - require.NoError(t, st.SetGenesisTime(uint64(time.Unix(4113849600, 0).Unix()))) + return &mock.ChainService{ State: st, FinalizedCheckPoint: ð.Checkpoint{ @@ -53,9 +55,19 @@ func TestService_InitStartStop(t *testing.T) { }, } }, + methodRuns: func(fd *event.Feed) { + // Send valid event. + fd.Send(&feed.Event{ + Type: statefeed.Initialized, + Data: &statefeed.InitializedData{ + StartTime: time.Unix(4113849600, 0), + GenesisValidatorsRoot: make([]byte, 32), + }, + }) + }, assert: func() { assert.LogsContain(t, hook, "Genesis time has not arrived - not syncing") - assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized") + assert.LogsContain(t, hook, "Waiting for state to be initialized") }, }, { @@ -63,7 +75,6 @@ func TestService_InitStartStop(t *testing.T) { chainService: func() *mock.ChainService { // Set to nearby slot. st := testutil.NewBeaconState() - require.NoError(t, st.SetGenesisTime(uint64(time.Now().Add(-5*time.Minute).Unix()))) return &mock.ChainService{ State: st, FinalizedCheckPoint: ð.Checkpoint{ @@ -71,10 +82,20 @@ func TestService_InitStartStop(t *testing.T) { }, } }, + methodRuns: func(fd *event.Feed) { + // Send valid event. + fd.Send(&feed.Event{ + Type: statefeed.Initialized, + Data: &statefeed.InitializedData{ + StartTime: time.Now().Add(-5 * time.Minute), + GenesisValidatorsRoot: make([]byte, 32), + }, + }) + }, assert: func() { assert.LogsContain(t, hook, "Chain started within the last epoch - not syncing") assert.LogsDoNotContain(t, hook, "Genesis time has not arrived - not syncing") - assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized") + assert.LogsContain(t, hook, "Waiting for state to be initialized") }, }, { @@ -83,7 +104,6 @@ func TestService_InitStartStop(t *testing.T) { // Set to some future slot, and then make sure that current head matches it. st := testutil.NewBeaconState() futureSlot := uint64(27354) - require.NoError(t, st.SetGenesisTime(uint64(makeGenesisTime(futureSlot).Unix()))) require.NoError(t, st.SetSlot(futureSlot)) return &mock.ChainService{ State: st, @@ -92,19 +112,33 @@ func TestService_InitStartStop(t *testing.T) { }, } }, + methodRuns: func(fd *event.Feed) { + futureSlot := uint64(27354) + // Send valid event. + fd.Send(&feed.Event{ + Type: statefeed.Initialized, + Data: &statefeed.InitializedData{ + StartTime: makeGenesisTime(futureSlot), + GenesisValidatorsRoot: make([]byte, 32), + }, + }) + }, assert: func() { assert.LogsContain(t, hook, "Starting initial chain sync...") assert.LogsContain(t, hook, "Already synced to the current chain head") assert.LogsDoNotContain(t, hook, "Chain started within the last epoch - not syncing") assert.LogsDoNotContain(t, hook, "Genesis time has not arrived - not syncing") - assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized") + assert.LogsContain(t, hook, "Waiting for state to be initialized") }, }, } p := p2pt.NewTestP2P(t) connectPeers(t, p, []*peerData{}, p.Peers()) - for _, tt := range tests { + for i, tt := range tests { + if i == 0 { + continue + } t.Run(tt.name, func(t *testing.T) { defer hook.Reset() ctx, cancel := context.WithCancel(context.Background()) @@ -114,12 +148,18 @@ func TestService_InitStartStop(t *testing.T) { if tt.chainService != nil { mc = tt.chainService() } + // Initialize feed + notifier := &mock.MockStateNotifier{} s := NewService(ctx, &Config{ P2P: p, Chain: mc, - StateNotifier: mc.StateNotifier(), + StateNotifier: notifier, }) + time.Sleep(500 * time.Millisecond) assert.NotNil(t, s) + if tt.methodRuns != nil { + tt.methodRuns(notifier.StateFeed()) + } wg := &sync.WaitGroup{} wg.Add(1) @@ -127,14 +167,15 @@ func TestService_InitStartStop(t *testing.T) { s.Start() wg.Done() }() + go func() { // Allow to exit from test (on no head loop waiting for head is started). // In most tests, this is redundant, as Start() already exited. - time.AfterFunc(500*time.Millisecond, func() { + time.AfterFunc(3*time.Second, func() { cancel() }) }() - if testutil.WaitTimeout(wg, time.Second*2) { + if testutil.WaitTimeout(wg, time.Second*4) { t.Fatalf("Test should have exited by now, timed out") } tt.assert() @@ -153,28 +194,6 @@ func TestService_waitForStateInitialization(t *testing.T) { return s } - t.Run("head state exists", func(t *testing.T) { - defer hook.Reset() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - mc := &mock.ChainService{ - State: testutil.NewBeaconState(), - FinalizedCheckPoint: ð.Checkpoint{ - Epoch: 0, - }, - } - s := newService(ctx, mc) - - expectedGenesisTime := time.Unix(25000, 0) - var receivedGenesisTime time.Time - require.NoError(t, mc.State.SetGenesisTime(uint64(expectedGenesisTime.Unix()))) - receivedGenesisTime, err := s.waitForStateInitialization() - assert.NoError(t, err) - assert.Equal(t, expectedGenesisTime, receivedGenesisTime) - assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized") - }) - t.Run("no state and context close", func(t *testing.T) { defer hook.Reset() ctx, cancel := context.WithCancel(context.Background()) @@ -184,8 +203,9 @@ func TestService_waitForStateInitialization(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - _, err := s.waitForStateInitialization() - assert.ErrorContains(t, "context closed", err) + go s.waitForStateInitialization() + currTime := <-s.genesisChan + assert.Equal(t, true, currTime.IsZero()) wg.Done() }() go func() { @@ -213,9 +233,9 @@ func TestService_waitForStateInitialization(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - var err error - receivedGenesisTime, err = s.waitForStateInitialization() - assert.NoError(t, err) + go s.waitForStateInitialization() + receivedGenesisTime = <-s.genesisChan + assert.Equal(t, false, receivedGenesisTime.IsZero()) wg.Done() }() go func() { @@ -245,6 +265,46 @@ func TestService_waitForStateInitialization(t *testing.T) { assert.LogsContain(t, hook, "Received state initialized event") assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine") }) + + t.Run("no state and state init event received and service start", func(t *testing.T) { + defer hook.Reset() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s := newService(ctx, &mock.ChainService{}) + // Initialize mock feed + _ = s.stateNotifier.StateFeed() + + expectedGenesisTime := time.Now().Add(60 * time.Second) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + s.waitForStateInitialization() + wg.Done() + }() + + wg.Add(1) + go func() { + time.AfterFunc(500*time.Millisecond, func() { + // Send valid event. + s.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Initialized, + Data: &statefeed.InitializedData{ + StartTime: expectedGenesisTime, + GenesisValidatorsRoot: make([]byte, 32), + }, + }) + }) + s.Start() + wg.Done() + }() + + if testutil.WaitTimeout(wg, time.Second*3) { + t.Fatalf("Test should have exited by now, timed out") + } + assert.LogsContain(t, hook, "Waiting for state to be initialized") + assert.LogsContain(t, hook, "Received state initialized event") + assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine") + }) } func TestService_markSynced(t *testing.T) { diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index a1efb492b8..6db02d8eab 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -234,27 +234,38 @@ func (s *Service) registerHandlers() { stateChannel := make(chan *feed.Event, 1) stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel) defer stateSub.Unsubscribe() - for !s.chainStarted { + for { select { case event := <-stateChannel: - if event.Type == statefeed.Initialized { + switch event.Type { + case statefeed.Initialized: data, ok := event.Data.(*statefeed.InitializedData) if !ok { log.Error("Event feed data is not type *statefeed.InitializedData") return } - log.WithField("starttime", data.StartTime).Debug("Received state initialized event") + startTime := data.StartTime + log.WithField("starttime", startTime).Debug("Received state initialized event") - // Register respective rpc and pubsub handlers at state initialized event. + // Register respective rpc handlers at state initialized event. s.registerRPCHandlers() - s.registerSubscribers() - - if data.StartTime.After(timeutils.Now()) { - stateSub.Unsubscribe() - time.Sleep(timeutils.Until(data.StartTime)) + // Wait for chainstart in separate routine. + go func() { + if startTime.After(timeutils.Now()) { + time.Sleep(timeutils.Until(startTime)) + } + log.WithField("starttime", startTime).Debug("Chain started in sync service") + s.chainStarted = true + }() + case statefeed.Synced: + _, ok := event.Data.(*statefeed.SyncedData) + if !ok { + log.Error("Event feed data is not type *statefeed.SyncedData") + return } - log.WithField("starttime", data.StartTime).Debug("Chain started in sync service") - s.chainStarted = true + // Register respective pubsub handlers at state synced event. + s.registerSubscribers() + return } case <-s.ctx.Done(): log.Debug("Context closed, exiting goroutine") diff --git a/beacon-chain/sync/service_test.go b/beacon-chain/sync/service_test.go index a82386ed06..eeb4c37f5b 100644 --- a/beacon-chain/sync/service_test.go +++ b/beacon-chain/sync/service_test.go @@ -2,6 +2,7 @@ package sync import ( "context" + "sync" "testing" "time" @@ -74,3 +75,76 @@ func TestSyncHandlers_WaitToSync(t *testing.T) { time.Sleep(400 * time.Millisecond) require.Equal(t, true, r.chainStarted, "Did not receive chain start event.") } + +func TestSyncHandlers_WaitTillSynced(t *testing.T) { + p2p := p2ptest.NewTestP2P(t) + chainService := &mockChain.ChainService{ + Genesis: time.Now(), + ValidatorsRoot: [32]byte{'A'}, + } + r := Service{ + ctx: context.Background(), + p2p: p2p, + chain: chainService, + stateNotifier: chainService.StateNotifier(), + initialSync: &mockSync.Sync{IsSyncing: false}, + } + + topic := "/eth2/%x/beacon_block" + go r.registerHandlers() + time.Sleep(100 * time.Millisecond) + i := r.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Initialized, + Data: &statefeed.InitializedData{ + StartTime: time.Now(), + }, + }) + if i == 0 { + t.Fatal("didn't send genesis time to subscribers") + } + b := []byte("sk") + b32 := bytesutil.ToBytes32(b) + sk, err := bls.SecretKeyFromBytes(b32[:]) + require.NoError(t, err) + + msg := testutil.NewBeaconBlock() + msg.Block.ParentRoot = testutil.Random32Bytes(t) + msg.Signature = sk.Sign([]byte("data")).Marshal() + p2p.Digest, err = r.forkDigest() + r.blockNotifier = chainService.BlockNotifier() + blockChan := make(chan feed.Event, 1) + sub := r.blockNotifier.BlockFeed().Subscribe(blockChan) + + require.NoError(t, err) + p2p.ReceivePubSub(topic, msg) + + // wait for chainstart to be sent + time.Sleep(2 * time.Second) + require.Equal(t, true, r.chainStarted, "Did not receive chain start event.") + + assert.Equal(t, 0, len(blockChan), "block was received by sync service despite not being fully synced") + + i = r.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Synced, + Data: &statefeed.SyncedData{ + StartTime: time.Now(), + }, + }) + + if i == 0 { + t.Fatal("didn't send genesis time to sync event subscribers") + } + + wg := new(sync.WaitGroup) + wg.Add(1) + go func() { + // Wait for block to be received by service. + <-blockChan + wg.Done() + sub.Unsubscribe() + }() + + p2p.ReceivePubSub(topic, msg) + // wait for message to be sent + testutil.WaitTimeout(wg, 2*time.Second) +}