From e34313c7520ce864306c3e5f0484ea01011fdb9e Mon Sep 17 00:00:00 2001 From: james-prysm <90280386+james-prysm@users.noreply.github.com> Date: Tue, 27 May 2025 09:58:28 -0500 Subject: [PATCH] move account changed channel into validator for code cleanup (#15298) * move account changed channel into validator for code cleanup * gaz --- .../james-prysm_waitforactivation-cleanup.md | 3 + validator/client/iface/validator.go | 5 +- validator/client/runner.go | 29 ++----- validator/client/runner_test.go | 10 +-- validator/client/service.go | 1 + validator/client/testutil/BUILD.bazel | 1 + validator/client/testutil/mock_validator.go | 78 +++++++++++-------- validator/client/validator.go | 21 ++++- validator/client/validator_test.go | 2 +- validator/client/wait_for_activation.go | 52 ++++--------- validator/client/wait_for_activation_test.go | 52 ++++++++----- 11 files changed, 133 insertions(+), 121 deletions(-) create mode 100644 changelog/james-prysm_waitforactivation-cleanup.md diff --git a/changelog/james-prysm_waitforactivation-cleanup.md b/changelog/james-prysm_waitforactivation-cleanup.md new file mode 100644 index 0000000000..dfe31b2b07 --- /dev/null +++ b/changelog/james-prysm_waitforactivation-cleanup.md @@ -0,0 +1,3 @@ +### Ignored + +- code cleanup on wait for activation and keymanagement through moving the account changed channel as a field in the validator. \ No newline at end of file diff --git a/validator/client/iface/validator.go b/validator/client/iface/validator.go index 411c1801c9..c47fde166e 100644 --- a/validator/client/iface/validator.go +++ b/validator/client/iface/validator.go @@ -36,9 +36,10 @@ const ( // Validator interface defines the primary methods of a validator client. type Validator interface { Done() + AccountsChangedChan() <-chan [][fieldparams.BLSPubkeyLength]byte WaitForChainStart(ctx context.Context) error WaitForSync(ctx context.Context) error - WaitForActivation(ctx context.Context, accountsChangedChan chan [][fieldparams.BLSPubkeyLength]byte) error + WaitForActivation(ctx context.Context) error CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error) NextSlot() <-chan primitives.Slot SlotDeadline(slot primitives.Slot) time.Time @@ -57,7 +58,7 @@ type Validator interface { Keymanager() (keymanager.IKeymanager, error) HandleKeyReload(ctx context.Context, currentKeys [][fieldparams.BLSPubkeyLength]byte) (bool, error) CheckDoppelGanger(ctx context.Context) error - PushProposerSettings(ctx context.Context, km keymanager.IKeymanager, slot primitives.Slot, forceFullPush bool) error + PushProposerSettings(ctx context.Context, slot primitives.Slot, forceFullPush bool) error SignValidatorRegistrationRequest(ctx context.Context, signer SigningFunc, newValidatorRegistration *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, bool /* isCached */, error) StartEventStream(ctx context.Context, topics []string, eventsChan chan<- *event.Event) EventStreamIsRunning() bool diff --git a/validator/client/runner.go b/validator/client/runner.go index f526775a30..1a31fbabf4 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -58,27 +58,19 @@ func run(ctx context.Context, v iface.Validator) { healthTracker := v.HealthTracker() runHealthCheckRoutine(ctx, v, eventsChan) - accountsChangedChan := make(chan [][fieldparams.BLSPubkeyLength]byte, 1) - km, err := v.Keymanager() - if err != nil { - log.WithError(err).Fatal("Could not get keymanager") - } - sub := km.SubscribeAccountChanges(accountsChangedChan) // check if proposer settings is still nil // Set properties on the beacon node like the fee recipient for validators that are being used & active. if v.ProposerSettings() == nil { log.Warn("Validator client started without proposer settings such as fee recipient" + " and will continue to use settings provided in the beacon node.") } - if err := v.PushProposerSettings(ctx, km, headSlot, true); err != nil { + if err := v.PushProposerSettings(ctx, headSlot, true); err != nil { log.WithError(err).Fatal("Failed to update proposer settings") } for { select { case <-ctx.Done(): log.Info("Context canceled, stopping validator") - sub.Unsubscribe() - close(accountsChangedChan) return // Exit if context is canceled. case slot := <-v.NextSlot(): if !healthTracker.IsHealthy(ctx) { @@ -113,7 +105,7 @@ func run(ctx context.Context, v iface.Validator) { // call push proposer settings often to account for the following edge cases: // proposer is activated at the start of epoch and tries to propose immediately // account has changed in the middle of an epoch - if err := v.PushProposerSettings(slotCtx, km, slot, false); err != nil { + if err := v.PushProposerSettings(slotCtx, slot, false); err != nil { log.WithError(err).Warn("Failed to update proposer settings") } @@ -159,13 +151,13 @@ func run(ctx context.Context, v iface.Validator) { } case e := <-eventsChan: v.ProcessEvent(ctx, e) - case currentKeys := <-accountsChangedChan: // should be less of a priority than next slot - onAccountsChanged(ctx, v, currentKeys, accountsChangedChan) + case currentKeys := <-v.AccountsChangedChan(): // should be less of a priority than next slot + onAccountsChanged(ctx, v, currentKeys) } } } -func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byte, ac chan [][fieldparams.BLSPubkeyLength]byte) { +func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byte) { ctx, span := prysmTrace.StartSpan(ctx, "validator.accountsChanged") defer span.End() @@ -175,7 +167,7 @@ func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byt } if !anyActive { log.Warn("No active keys found. Waiting for activation...") - err := v.WaitForActivation(ctx, ac) + err := v.WaitForActivation(ctx) if err != nil { log.WithError(err).Warn("Could not wait for validator activation") } @@ -231,7 +223,7 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) ( log.WithError(err).Fatal("Could not determine if beacon node synced") } - if err := v.WaitForActivation(ctx, nil /* accountsChangedChan */); err != nil { + if err := v.WaitForActivation(ctx); err != nil { log.WithError(err).Fatal("Could not wait for validator activation") } @@ -338,17 +330,12 @@ func runHealthCheckRoutine(ctx context.Context, v iface.Validator, eventsChan ch continue // Skip to the next ticker } - km, err := v.Keymanager() - if err != nil { - log.WithError(err).Error("Could not get keymanager") - return - } slot, err := v.CanonicalHeadSlot(ctx) if err != nil { log.WithError(err).Error("Could not get canonical head slot") return } - if err := v.PushProposerSettings(ctx, km, slot, true); err != nil { + if err := v.PushProposerSettings(ctx, slot, true); err != nil { log.WithError(err).Warn("Failed to update proposer settings") } } diff --git a/validator/client/runner_test.go b/validator/client/runner_test.go index 7440234525..71602096dd 100644 --- a/validator/client/runner_test.go +++ b/validator/client/runner_test.go @@ -265,10 +265,9 @@ func TestKeyReload_ActiveKey(t *testing.T) { node := health.NewMockHealthClient(ctrl) tracker := health.NewTracker(node) node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - v := &testutil.FakeValidator{Km: km, Tracker: tracker} - ac := make(chan [][fieldparams.BLSPubkeyLength]byte) + v := &testutil.FakeValidator{Km: km, Tracker: tracker, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)} current := [][fieldparams.BLSPubkeyLength]byte{testutil.ActiveKey} - onAccountsChanged(ctx, v, current, ac) + onAccountsChanged(ctx, v, current) assert.Equal(t, true, v.HandleKeyReloadCalled) // HandleKeyReloadCalled in the FakeValidator returns true if one of the keys is equal to the // ActiveKey. WaitForActivation is only called if none of the keys are active, so it shouldn't be called at all. @@ -284,10 +283,9 @@ func TestKeyReload_NoActiveKey(t *testing.T) { node := health.NewMockHealthClient(ctrl) tracker := health.NewTracker(node) node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - v := &testutil.FakeValidator{Km: km, Tracker: tracker} - ac := make(chan [][fieldparams.BLSPubkeyLength]byte) + v := &testutil.FakeValidator{Km: km, Tracker: tracker, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)} current := [][fieldparams.BLSPubkeyLength]byte{na} - onAccountsChanged(ctx, v, current, ac) + onAccountsChanged(ctx, v, current) assert.Equal(t, true, v.HandleKeyReloadCalled) // HandleKeyReloadCalled in the FakeValidator returns true if one of the keys is equal to the // ActiveKey. Since we are using a key we know is not active, it should return false, which diff --git a/validator/client/service.go b/validator/client/service.go index db31e4022b..a8c4de7b80 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -222,6 +222,7 @@ func (v *ValidatorService) Start() { enableAPI: v.enableAPI, distributed: v.distributed, disableDutiesPolling: v.disableDutiesPolling, + accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1), } v.validator = valStruct diff --git a/validator/client/testutil/BUILD.bazel b/validator/client/testutil/BUILD.bazel index a7d9119f7a..a8d0aef276 100644 --- a/validator/client/testutil/BUILD.bazel +++ b/validator/client/testutil/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//api/client/beacon/health:go_default_library", "//api/client/event:go_default_library", "//config/fieldparams:go_default_library", + "//config/params:go_default_library", "//config/proposer:go_default_library", "//consensus-types/primitives:go_default_library", "//encoding/bytesutil:go_default_library", diff --git a/validator/client/testutil/mock_validator.go b/validator/client/testutil/mock_validator.go index 7b69803535..f972292d85 100644 --- a/validator/client/testutil/mock_validator.go +++ b/validator/client/testutil/mock_validator.go @@ -10,6 +10,7 @@ import ( "github.com/OffchainLabs/prysm/v6/api/client/beacon/health" "github.com/OffchainLabs/prysm/v6/api/client/event" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" + "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/proposer" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" @@ -23,46 +24,50 @@ var _ iface.Validator = (*FakeValidator)(nil) // FakeValidator for mocking. type FakeValidator struct { - DoneCalled bool - WaitForWalletInitializationCalled bool - SlasherReadyCalled bool - NextSlotCalled bool - UpdateDutiesCalled bool - UpdateProtectionsCalled bool - RoleAtCalled bool - AttestToBlockHeadCalled bool - ProposeBlockCalled bool + IsRegularDeadline bool LogValidatorGainsAndLossesCalled bool SaveProtectionsCalled bool DeleteProtectionCalled bool SlotDeadlineCalled bool HandleKeyReloadCalled bool - WaitForChainStartCalled int - WaitForSyncCalled int - WaitForActivationCalled int - CanonicalHeadSlotCalled int - ReceiveBlocksCalled int - RetryTillSuccess int - ProposeBlockArg1 uint64 - AttestToBlockHeadArg1 uint64 - RoleAtArg1 uint64 - UpdateDutiesArg1 uint64 - NextSlotRet <-chan primitives.Slot - PublicKey string - UpdateDutiesRet error - ProposerSettingsErr error - RolesAtRet []iface.ValidatorRole - Balances map[[fieldparams.BLSPubkeyLength]byte]uint64 + WaitForWalletInitializationCalled bool + SlasherReadyCalled bool + NextSlotCalled bool + AttestToBlockHeadCalled bool + DoneCalled bool + ProposeBlockCalled bool + UpdateProtectionsCalled bool + UpdateDutiesCalled bool + RoleAtCalled bool IndexToPubkeyMap map[uint64][fieldparams.BLSPubkeyLength]byte PubkeyToIndexMap map[[fieldparams.BLSPubkeyLength]byte]uint64 PubkeysToStatusesMap map[[fieldparams.BLSPubkeyLength]byte]ethpb.ValidatorStatus - proposerSettings *proposer.Settings ProposerSettingWait time.Duration + NextSlotRet <-chan primitives.Slot + UpdateDutiesArg1 uint64 + RoleAtArg1 uint64 + AttestToBlockHeadArg1 uint64 + ProposeBlockArg1 uint64 + RetryTillSuccess int + Balances map[[fieldparams.BLSPubkeyLength]byte]uint64 + CanonicalHeadSlotCalled int + WaitForActivationCalled int + WaitForSyncCalled int + WaitForChainStartCalled int + AttSubmitted chan interface{} + BlockProposed chan interface{} + AccountsChannel chan [][fieldparams.BLSPubkeyLength]byte + EventsChannel chan *event.Event + GenesisT uint64 + ReceiveBlocksCalled int + proposerSettings *proposer.Settings + UpdateDutiesRet error + ProposerSettingsErr error Km keymanager.IKeymanager graffiti string Tracker health.Tracker - AttSubmitted chan interface{} - BlockProposed chan interface{} + PublicKey string + RolesAtRet []iface.ValidatorRole } // Done for mocking. @@ -70,6 +75,14 @@ func (fv *FakeValidator) Done() { fv.DoneCalled = true } +func (fv *FakeValidator) AccountsChangedChan() <-chan [][fieldparams.BLSPubkeyLength]byte { + return fv.AccountsChannel +} + +func (fv *FakeValidator) GenesisTime() uint64 { + return fv.GenesisT +} + // WaitForKeymanagerInitialization for mocking. func (fv *FakeValidator) WaitForKeymanagerInitialization(_ context.Context) error { fv.WaitForWalletInitializationCalled = true @@ -89,9 +102,9 @@ func (fv *FakeValidator) WaitForChainStart(_ context.Context) error { } // WaitForActivation for mocking. -func (fv *FakeValidator) WaitForActivation(_ context.Context, accountChan chan [][fieldparams.BLSPubkeyLength]byte) error { +func (fv *FakeValidator) WaitForActivation(_ context.Context) error { fv.WaitForActivationCalled++ - if accountChan == nil { + if fv.AccountsChannel == nil { return nil } if fv.RetryTillSuccess >= fv.WaitForActivationCalled { @@ -127,6 +140,9 @@ func (fv *FakeValidator) CanonicalHeadSlot(_ context.Context) (primitives.Slot, // SlotDeadline for mocking. func (fv *FakeValidator) SlotDeadline(_ primitives.Slot) time.Time { fv.SlotDeadlineCalled = true + if fv.IsRegularDeadline { + return prysmTime.Now().Add(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second) + } return prysmTime.Now() } @@ -253,7 +269,7 @@ func (*FakeValidator) HasProposerSettings() bool { } // PushProposerSettings for mocking -func (fv *FakeValidator) PushProposerSettings(ctx context.Context, _ keymanager.IKeymanager, _ primitives.Slot, _ bool) error { +func (fv *FakeValidator) PushProposerSettings(ctx context.Context, _ primitives.Slot, _ bool) error { time.Sleep(fv.ProposerSettingWait) if errors.Is(ctx.Err(), context.DeadlineExceeded) { log.Error("deadline exceeded") diff --git a/validator/client/validator.go b/validator/client/validator.go index bc52076944..9f0d536b2c 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -113,6 +113,8 @@ type validator struct { attSelectionLock sync.Mutex dutiesLock sync.RWMutex disableDutiesPolling bool + accountsChangedChannel chan [][fieldparams.BLSPubkeyLength]byte + accountChangedSub event.Subscription } type validatorStatus struct { @@ -128,7 +130,16 @@ type attSelectionKey struct { // Done cleans up the validator. func (v *validator) Done() { - v.ticker.Done() + if v.accountChangedSub != nil { + v.accountChangedSub.Unsubscribe() + } + if v.ticker != nil { + v.ticker.Done() + } +} + +func (v *validator) AccountsChangedChan() <-chan [][fieldparams.BLSPubkeyLength]byte { + return v.accountsChangedChannel } // WaitForKeymanagerInitialization checks if the validator needs to wait for keymanager initialization. @@ -170,6 +181,7 @@ func (v *validator) WaitForKeymanagerInitialization(ctx context.Context) error { return errors.New("key manager not set") } recheckKeys(ctx, v.db, v.km) + v.accountChangedSub = v.km.SubscribeAccountChanges(v.accountsChangedChannel) return nil } @@ -1079,12 +1091,13 @@ func (v *validator) SetProposerSettings(ctx context.Context, settings *proposer. } // PushProposerSettings calls the prepareBeaconProposer RPC to set the fee recipient and also the register validator API if using a custom builder. -func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKeymanager, slot primitives.Slot, forceFullPush bool) error { +func (v *validator) PushProposerSettings(ctx context.Context, slot primitives.Slot, forceFullPush bool) error { ctx, span := trace.StartSpan(ctx, "validator.PushProposerSettings") defer span.End() - if km == nil { - return errors.New("keymanager is nil when calling PrepareBeaconProposer") + km, err := v.Keymanager() + if err != nil { + return err } pubkeys, err := km.FetchValidatingPublicKeys(ctx) diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 4dfd346c34..00ef966f39 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -2050,7 +2050,7 @@ func TestValidator_PushSettings(t *testing.T) { require.Equal(t, len(tt.mockExpectedRequests), len(signedRegisterValidatorRequests)) require.Equal(t, len(signedRegisterValidatorRequests), len(v.signedValidatorRegistrations)) } - if err := v.PushProposerSettings(ctx, km, 0, false); tt.err != "" { + if err := v.PushProposerSettings(ctx, 0, false); tt.err != "" { assert.ErrorContains(t, tt.err, err) } if len(tt.logMessages) > 0 { diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index 8dbbe12d8d..eaa5f65795 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -4,7 +4,6 @@ import ( "context" "time" - fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/math" "github.com/OffchainLabs/prysm/v6/monitoring/tracing" "github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace" @@ -19,26 +18,7 @@ import ( // from the gRPC server. // // If the channel parameter is nil, WaitForActivation creates and manages its own channel. -func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan chan [][fieldparams.BLSPubkeyLength]byte) error { - // Monitor the key manager for updates. - if accountsChangedChan == nil { - accountsChangedChan = make(chan [][fieldparams.BLSPubkeyLength]byte, 1) - km, err := v.Keymanager() - if err != nil { - return err - } - // subscribe to the channel if it's the first time - sub := km.SubscribeAccountChanges(accountsChangedChan) - defer func() { - sub.Unsubscribe() - close(accountsChangedChan) - }() - } - return v.internalWaitForActivation(ctx, accountsChangedChan) -} - -// internalWaitForActivation recursively waits for at least one active validator key -func (v *validator) internalWaitForActivation(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error { +func (v *validator) WaitForActivation(ctx context.Context) error { ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation") defer span.End() @@ -51,12 +31,12 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang // Step 2: If no keys, wait for accounts change or context cancellation. if len(validatingKeys) == 0 { log.Warn(msgNoKeysFetched) - return v.waitForAccountsChange(ctx, accountsChangedChan) + return v.waitForAccountsChange(ctx) } // Step 3: update validator statuses in cache. if err := v.updateValidatorStatusCache(ctx, validatingKeys); err != nil { - return v.retryWaitForActivation(ctx, span, err, "Connection broken while waiting for activation. Reconnecting...", accountsChangedChan) + return v.retryWaitForActivation(ctx, span, err, "Connection broken while waiting for activation. Reconnecting...") } // Step 4: Check and log validator statuses. @@ -67,42 +47,42 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang case <-ctx.Done(): log.Debug("Context closed, exiting WaitForActivation") return ctx.Err() - case <-accountsChangedChan: + case <-v.accountsChangedChannel: // Accounts (keys) changed, restart the process. - return v.internalWaitForActivation(ctx, accountsChangedChan) + return v.WaitForActivation(ctx) default: - if err := v.waitForNextEpoch(ctx, v.genesisTime, accountsChangedChan); err != nil { - return v.retryWaitForActivation(ctx, span, err, "Failed to wait for next epoch. Reconnecting...", accountsChangedChan) + if err := v.waitForNextEpoch(ctx, v.genesisTime); err != nil { + return v.retryWaitForActivation(ctx, span, err, "Failed to wait for next epoch. Reconnecting...") } - return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) + return v.WaitForActivation(incrementRetries(ctx)) } } return nil } -func (v *validator) retryWaitForActivation(ctx context.Context, span octrace.Span, err error, message string, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error { +func (v *validator) retryWaitForActivation(ctx context.Context, span octrace.Span, err error, message string) error { tracing.AnnotateError(span, err) attempts := activationAttempts(ctx) log.WithError(err).WithField("attempts", attempts).Error(message) // Reconnection attempt backoff, up to 60s. time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60))) // TODO: refactor this to use the health tracker instead for reattempt - return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) + return v.WaitForActivation(incrementRetries(ctx)) } -func (v *validator) waitForAccountsChange(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error { +func (v *validator) waitForAccountsChange(ctx context.Context) error { select { case <-ctx.Done(): log.Debug("Context closed, exiting waitForAccountsChange") return ctx.Err() - case <-accountsChangedChan: + case <-v.accountsChangedChannel: // If the accounts changed, try again. - return v.internalWaitForActivation(ctx, accountsChangedChan) + return v.WaitForActivation(ctx) } } // waitForNextEpoch creates a blocking function to wait until the next epoch start given the current slot -func (v *validator) waitForNextEpoch(ctx context.Context, genesisTimeSec uint64, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error { +func (v *validator) waitForNextEpoch(ctx context.Context, genesisTimeSec uint64) error { waitTime, err := slots.SecondsUntilNextEpochStart(genesisTimeSec) if err != nil { return err @@ -112,9 +92,9 @@ func (v *validator) waitForNextEpoch(ctx context.Context, genesisTimeSec uint64, case <-ctx.Done(): log.Debug("Context closed, exiting waitForNextEpoch") return ctx.Err() - case <-accountsChangedChan: + case <-v.accountsChangedChannel: // Accounts (keys) changed, restart the process. - return v.internalWaitForActivation(ctx, accountsChangedChan) + return v.WaitForActivation(ctx) case <-time.After(time.Duration(waitTime) * time.Second): log.Debug("Done waiting for epoch start") // The ticker has ticked, indicating we've reached the next epoch diff --git a/validator/client/wait_for_activation_test.go b/validator/client/wait_for_activation_test.go index 7fcb157577..f9ddc2515f 100644 --- a/validator/client/wait_for_activation_test.go +++ b/validator/client/wait_for_activation_test.go @@ -31,10 +31,11 @@ func TestWaitActivation_Exiting_OK(t *testing.T) { prysmChainClient := validatormock.NewMockPrysmChainClient(ctrl) kp := randKeypair(t) v := validator{ - validatorClient: validatorClient, - km: newMockKeymanager(t, kp), - chainClient: chainClient, - prysmChainClient: prysmChainClient, + validatorClient: validatorClient, + km: newMockKeymanager(t, kp), + chainClient: chainClient, + prysmChainClient: prysmChainClient, + accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1), } ctx := context.Background() resp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{kp.pub[:]}) @@ -46,7 +47,7 @@ func TestWaitActivation_Exiting_OK(t *testing.T) { }, ).Return(resp, nil) - require.NoError(t, v.WaitForActivation(ctx, nil)) + require.NoError(t, v.WaitForActivation(ctx)) require.Equal(t, 1, len(v.pubkeyToStatus)) } @@ -83,19 +84,20 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) { }, ).Return(resp, nil) - accountChan := make(chan [][fieldparams.BLSPubkeyLength]byte) + accountChan := make(chan [][fieldparams.BLSPubkeyLength]byte, 1) sub := km.SubscribeAccountChanges(accountChan) defer func() { sub.Unsubscribe() close(accountChan) }() + v.accountsChangedChannel = accountChan // update the accounts from 0 to 1 after a delay go func() { time.Sleep(1 * time.Second) require.NoError(t, km.add(kp)) km.SimulateAccountChanges([][48]byte{kp.pub}) }() - assert.NoError(t, v.internalWaitForActivation(context.Background(), accountChan), "Could not wait for activation") + assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation") assert.LogsContain(t, hook, msgNoKeysFetched) assert.LogsContain(t, hook, "Validator activated") } @@ -112,13 +114,21 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { validatorClient := validatormock.NewMockValidatorClient(ctrl) chainClient := validatormock.NewMockChainClient(ctrl) prysmChainClient := validatormock.NewMockPrysmChainClient(ctrl) + ch := make(chan [][fieldparams.BLSPubkeyLength]byte, 1) v := validator{ - validatorClient: validatorClient, - km: km, - chainClient: chainClient, - prysmChainClient: prysmChainClient, - pubkeyToStatus: make(map[[48]byte]*validatorStatus), + validatorClient: validatorClient, + km: km, + chainClient: chainClient, + prysmChainClient: prysmChainClient, + pubkeyToStatus: make(map[[48]byte]*validatorStatus), + accountsChangedChannel: ch, + accountChangedSub: km.SubscribeAccountChanges(ch), } + defer func() { + close(v.accountsChangedChannel) + v.accountChangedSub.Unsubscribe() + }() + inactiveResp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactive.pub[:]}) inactiveResp.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS @@ -149,7 +159,7 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { ðpb.ChainHead{HeadEpoch: 0}, nil, ).AnyTimes() - assert.NoError(t, v.WaitForActivation(context.Background(), nil)) + assert.NoError(t, v.WaitForActivation(context.Background())) assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node") assert.LogsContain(t, hook, "Validator activated") }) @@ -199,6 +209,7 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { activeResp.Statuses[1].Status = ethpb.ValidatorStatus_ACTIVE channel := make(chan [][fieldparams.BLSPubkeyLength]byte, 1) km.SubscribeAccountChanges(channel) + v.accountsChangedChannel = channel gomock.InOrder( validatorClient.EXPECT().MultipleValidatorStatus( gomock.Any(), @@ -227,7 +238,7 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { ðpb.ChainHead{HeadEpoch: 0}, nil, ).AnyTimes() - assert.NoError(t, v.internalWaitForActivation(context.Background(), channel)) + assert.NoError(t, v.WaitForActivation(context.Background())) assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node") assert.LogsContain(t, hook, "Validator activated") }) @@ -246,11 +257,12 @@ func TestWaitForActivation_AttemptsReconnectionOnFailure(t *testing.T) { prysmChainClient := validatormock.NewMockPrysmChainClient(ctrl) kp := randKeypair(t) v := validator{ - validatorClient: validatorClient, - km: newMockKeymanager(t, kp), - chainClient: chainClient, - prysmChainClient: prysmChainClient, - pubkeyToStatus: make(map[[48]byte]*validatorStatus), + validatorClient: validatorClient, + km: newMockKeymanager(t, kp), + chainClient: chainClient, + prysmChainClient: prysmChainClient, + pubkeyToStatus: make(map[[48]byte]*validatorStatus), + accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1), } active := randKeypair(t) activeResp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{active.pub[:]}) @@ -271,5 +283,5 @@ func TestWaitForActivation_AttemptsReconnectionOnFailure(t *testing.T) { ðpb.ChainHead{HeadEpoch: 0}, nil, ).AnyTimes() - assert.NoError(t, v.WaitForActivation(context.Background(), nil)) + assert.NoError(t, v.WaitForActivation(context.Background())) }