diff --git a/validator/accounts/accounts_backup_test.go b/validator/accounts/accounts_backup_test.go index 4c6f27201c..8e4ad54b10 100644 --- a/validator/accounts/accounts_backup_test.go +++ b/validator/accounts/accounts_backup_test.go @@ -4,7 +4,6 @@ import ( "archive/zip" "encoding/hex" "encoding/json" - constant "github.com/prysmaticlabs/prysm/validator/testing" "io/ioutil" "os" "path/filepath" @@ -21,6 +20,7 @@ import ( "github.com/prysmaticlabs/prysm/validator/accounts/wallet" "github.com/prysmaticlabs/prysm/validator/keymanager" "github.com/prysmaticlabs/prysm/validator/keymanager/derived" + constant "github.com/prysmaticlabs/prysm/validator/testing" ) func TestBackupAccounts_Noninteractive_Derived(t *testing.T) { diff --git a/validator/accounts/accounts_list_test.go b/validator/accounts/accounts_list_test.go index 0ed02ac225..f070acc2f8 100644 --- a/validator/accounts/accounts_list_test.go +++ b/validator/accounts/accounts_list_test.go @@ -3,7 +3,6 @@ package accounts import ( "context" "fmt" - constant "github.com/prysmaticlabs/prysm/validator/testing" "io/ioutil" "math" "os" @@ -28,6 +27,7 @@ import ( "github.com/prysmaticlabs/prysm/validator/keymanager/derived" "github.com/prysmaticlabs/prysm/validator/keymanager/imported" "github.com/prysmaticlabs/prysm/validator/keymanager/remote" + constant "github.com/prysmaticlabs/prysm/validator/testing" keystorev4 "github.com/wealdtech/go-eth2-wallet-encryptor-keystorev4" ) diff --git a/validator/client/mock_validator.go b/validator/client/mock_validator.go index 13a791f2cb..32364b2629 100644 --- a/validator/client/mock_validator.go +++ b/validator/client/mock_validator.go @@ -73,7 +73,7 @@ func (fv *FakeValidator) WaitForChainStart(_ context.Context) error { } // WaitForActivation for mocking. -func (fv *FakeValidator) WaitForActivation(_ context.Context, _ <-chan struct{}) error { +func (fv *FakeValidator) WaitForActivation(_ context.Context) error { fv.WaitForActivationCalled++ if fv.RetryTillSuccess >= fv.WaitForActivationCalled { return errConnectionIssue diff --git a/validator/client/runner.go b/validator/client/runner.go index 65e107fbd4..07f5f8c630 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -27,7 +27,7 @@ type Validator interface { Done() WaitForChainStart(ctx context.Context) error WaitForSync(ctx context.Context) error - WaitForActivation(ctx context.Context, accountsChangedChan <-chan struct{}) error + WaitForActivation(ctx context.Context) error SlasherReady(ctx context.Context) error CanonicalHeadSlot(ctx context.Context) (types.Slot, error) NextSlot() <-chan types.Slot @@ -75,7 +75,6 @@ func run(ctx context.Context, v Validator) { var headSlot types.Slot firstTime := true - accountsChangedChan := make(chan struct{}, 1) for { if !firstTime { if ctx.Err() != nil { @@ -102,7 +101,7 @@ func run(ctx context.Context, v Validator) { if err != nil { log.Fatalf("Could not determine if beacon node synced: %v", err) } - err = v.WaitForActivation(ctx, accountsChangedChan) + err = v.WaitForActivation(ctx) if isConnectionError(err) { log.Warnf("Could not wait for validator activation: %v", err) continue @@ -121,7 +120,6 @@ func run(ctx context.Context, v Validator) { break } - go handleAccountsChanged(ctx, v, accountsChangedChan) connectionErrorChannel := make(chan error, 1) go v.ReceiveBlocks(ctx, connectionErrorChannel) if err := v.UpdateDuties(ctx, headSlot); err != nil { @@ -233,24 +231,3 @@ func handleAssignmentError(err error, slot types.Slot) { log.WithField("error", err).Error("Failed to update assignments") } } - -func handleAccountsChanged(ctx context.Context, v Validator, accountsChangedChan chan<- struct{}) { - validatingPubKeysChan := make(chan [][48]byte, 1) - var sub = v.GetKeymanager().SubscribeAccountChanges(validatingPubKeysChan) - defer func() { - sub.Unsubscribe() - close(validatingPubKeysChan) - }() - - for { - select { - case <-validatingPubKeysChan: - accountsChangedChan <- struct{}{} - case err := <-sub.Err(): - log.WithError(err).Error("accounts changed subscription failed") - return - case <-ctx.Done(): - return - } - } -} diff --git a/validator/client/runner_test.go b/validator/client/runner_test.go index 1ee1a2dd98..fc458c093b 100644 --- a/validator/client/runner_test.go +++ b/validator/client/runner_test.go @@ -206,55 +206,3 @@ func TestAllValidatorsAreExited_NextSlot(t *testing.T) { run(ctx, v) assert.LogsContain(t, hook, "All validators are exited") } - -func TestHandleAccountsChanged_Ok(t *testing.T) { - ctx := context.Background() - defer ctx.Done() - - km := &mockKeymanager{accountsChangedFeed: &event.Feed{}} - v := &FakeValidator{Keymanager: km} - channel := make(chan struct{}) - go handleAccountsChanged(ctx, v, channel) - time.Sleep(time.Second) // Allow time for subscribing to changes. - km.SimulateAccountChanges() - time.Sleep(time.Second) // Allow time for handling subscribed changes. - - select { - case _, ok := <-channel: - if !ok { - t.Error("Account changed channel is closed") - } - default: - t.Error("Accounts changed channel is empty") - } -} - -func TestHandleAccountsChanged_CtxCancelled(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - - km := &mockKeymanager{accountsChangedFeed: &event.Feed{}} - v := &FakeValidator{Keymanager: km} - channel := make(chan struct{}, 2) - go handleAccountsChanged(ctx, v, channel) - time.Sleep(time.Second) // Allow time for subscribing to changes. - km.SimulateAccountChanges() - time.Sleep(time.Second) // Allow time for handling subscribed changes. - - cancel() - time.Sleep(time.Second) // Allow time for handling cancellation. - km.SimulateAccountChanges() - time.Sleep(time.Second) // Allow time for handling subscribed changes. - - var values int - for loop := true; loop == true; { - select { - case _, ok := <-channel: - if ok { - values++ - } - default: - loop = false - } - } - assert.Equal(t, 1, values, "Incorrect number of values were passed to the channel") -} diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 31c7b5509c..cbf9c7ac67 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -93,6 +93,9 @@ func (m *mockKeymanager) Sign(ctx context.Context, req *validatorpb.SignRequest) } func (m *mockKeymanager) SubscribeAccountChanges(pubKeysChan chan [][48]byte) event.Subscription { + if m.accountsChangedFeed == nil { + m.accountsChangedFeed = &event.Feed{} + } return m.accountsChangedFeed.Subscribe(pubKeysChan) } @@ -362,7 +365,7 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) { resp, nil, ) - require.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation") + require.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation") require.LogsContain(t, hook, "Validator activated") } @@ -400,7 +403,7 @@ func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) { resp, nil, ) - assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation") + assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation") } func TestWaitSync_ContextCanceled(t *testing.T) { diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index 8120ce14b2..f2b6d9f8ad 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -19,8 +19,28 @@ import ( // WaitForActivation checks whether the validator pubkey is in the active // validator set. If not, this operation will block until an activation message is -// received. -func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan <-chan struct{}) error { +// received. This method also monitors the keymanager for updates while waiting for an activation +// from the gRPC server. +func (v *validator) WaitForActivation(ctx context.Context) error { + // Monitor the key manager for updates. + accountsChangedChan := make(chan [][48]byte) + sub := v.GetKeymanager().SubscribeAccountChanges(accountsChangedChan) + defer func() { + sub.Unsubscribe() + close(accountsChangedChan) + }() + + return v.waitForActivation(ctx, accountsChangedChan) +} + +// waitForActivation performs the following: +// 1) While the key manager is empty, poll the key manager until some validator keys exist. +// 2) Open a server side stream for activation events against the given keys. +// 3) In another go routine, the key manager is monitored for updates and emits an update event on +// the accountsChangedChan. When an event signal is received, restart the waitForActivation routine. +// 4) If the stream is reset in error, restart the routine. +// 5) If the stream returns a response indicating one or more validators are active, exit the routine. +func (v *validator) waitForActivation(ctx context.Context, accountsChangedChan <-chan [][48]byte) error { ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation") defer span.End() @@ -63,13 +83,13 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan < Error("Stream broken while waiting for activation. Reconnecting...") // Reconnection attempt backoff, up to 60s. time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60))) - return v.WaitForActivation(incrementRetries(ctx), accountsChangedChan) + return v.waitForActivation(incrementRetries(ctx), accountsChangedChan) } for { select { case <-accountsChangedChan: // Accounts (keys) changed, restart the process. - return v.WaitForActivation(ctx, accountsChangedChan) + return v.waitForActivation(ctx, accountsChangedChan) default: res, err := stream.Recv() // If the stream is closed, we stop the loop. @@ -87,7 +107,7 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan < Error("Stream broken while waiting for activation. Reconnecting...") // Reconnection attempt backoff, up to 60s. time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60))) - return v.WaitForActivation(incrementRetries(ctx), accountsChangedChan) + return v.waitForActivation(incrementRetries(ctx), accountsChangedChan) } valActivated := v.checkAndLogValidatorStatus(res.Statuses) diff --git a/validator/client/wait_for_activation_test.go b/validator/client/wait_for_activation_test.go index 9ff20d6cfc..d6f6e55ca5 100644 --- a/validator/client/wait_for_activation_test.go +++ b/validator/client/wait_for_activation_test.go @@ -3,7 +3,6 @@ package client import ( "context" "fmt" - constant "github.com/prysmaticlabs/prysm/validator/testing" "testing" "time" @@ -16,6 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/testutil/require" walletMock "github.com/prysmaticlabs/prysm/validator/accounts/testing" "github.com/prysmaticlabs/prysm/validator/keymanager/derived" + constant "github.com/prysmaticlabs/prysm/validator/testing" logTest "github.com/sirupsen/logrus/hooks/test" "github.com/tyler-smith/go-bip39" util "github.com/wealdtech/go-eth2-util" @@ -52,7 +52,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) cancel() - assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx, make(chan struct{}))) + assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx)) } func TestWaitActivation_StreamSetupFails_AttemptsToReconnect(t *testing.T) { @@ -83,7 +83,7 @@ func TestWaitActivation_StreamSetupFails_AttemptsToReconnect(t *testing.T) { resp := generateMockStatusResponse([][]byte{pubKey[:]}) resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE clientStream.EXPECT().Recv().Return(resp, nil) - assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{}))) + assert.NoError(t, v.WaitForActivation(context.Background())) } func TestWaitForActivation_ReceiveErrorFromStream_AttemptsReconnection(t *testing.T) { @@ -118,7 +118,7 @@ func TestWaitForActivation_ReceiveErrorFromStream_AttemptsReconnection(t *testin nil, errors.New("fails"), ).Return(resp, nil) - assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{}))) + assert.NoError(t, v.WaitForActivation(context.Background())) } func TestWaitActivation_LogsActivationEpochOK(t *testing.T) { @@ -153,7 +153,7 @@ func TestWaitActivation_LogsActivationEpochOK(t *testing.T) { resp, nil, ) - assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation") + assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation") assert.LogsContain(t, hook, "Validator activated") } @@ -188,7 +188,7 @@ func TestWaitForActivation_Exiting(t *testing.T) { resp, nil, ) - assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{}))) + assert.NoError(t, v.WaitForActivation(context.Background())) } func TestWaitForActivation_RefetchKeys(t *testing.T) { @@ -230,7 +230,7 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) { resp, nil, ) - assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation") + assert.NoError(t, v.waitForActivation(context.Background(), make(chan [][48]byte)), "Could not wait for activation") assert.LogsContain(t, hook, msgNoKeysFetched) assert.LogsContain(t, hook, "Validator activated") } @@ -291,15 +291,14 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { nil, ) - channel := make(chan struct{}) go func() { // We add the active key into the keymanager and simulate a key refresh. time.Sleep(time.Second * 1) km.keysMap[activePubKey] = activePrivKey - channel <- struct{}{} + km.SimulateAccountChanges() }() - assert.NoError(t, v.WaitForActivation(context.Background(), channel)) + assert.NoError(t, v.WaitForActivation(context.Background())) assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node") assert.LogsContain(t, hook, "Validator activated") }) @@ -365,16 +364,16 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { nil, ) - channel := make(chan struct{}) + channel := make(chan [][48]byte) go func() { // We add the active key into the keymanager and simulate a key refresh. time.Sleep(time.Second * 1) err = km.RecoverAccountsFromMnemonic(ctx, constant.TestMnemonic, "", 2) require.NoError(t, err) - channel <- struct{}{} + channel <- [][48]byte{} }() - assert.NoError(t, v.WaitForActivation(context.Background(), channel)) + assert.NoError(t, v.waitForActivation(context.Background(), channel)) assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node") assert.LogsContain(t, hook, "Validator activated") }) diff --git a/validator/keymanager/derived/keymanager_test.go b/validator/keymanager/derived/keymanager_test.go index 439d86fa7e..e53f06d301 100644 --- a/validator/keymanager/derived/keymanager_test.go +++ b/validator/keymanager/derived/keymanager_test.go @@ -3,7 +3,6 @@ package derived import ( "context" "fmt" - constant "github.com/prysmaticlabs/prysm/validator/testing" "testing" validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2" @@ -12,6 +11,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" mock "github.com/prysmaticlabs/prysm/validator/accounts/testing" + constant "github.com/prysmaticlabs/prysm/validator/testing" "github.com/tyler-smith/go-bip39" util "github.com/wealdtech/go-eth2-util" )