diff --git a/validator/client/runner.go b/validator/client/runner.go index 1b21187b7a..a8024aac29 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -13,6 +13,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/validator/client/iface" + "github.com/prysmaticlabs/prysm/validator/keymanager/remote" "go.opencensus.io/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -135,6 +136,14 @@ func run(ctx context.Context, v iface.Validator) { case slot := <-v.NextSlot(): span.AddAttributes(trace.Int64Attribute("slot", int64(slot))) + remoteKm, ok := v.GetKeymanager().(remote.RemoteKeymanager) + if ok { + _, err := remoteKm.ReloadPublicKeys(ctx) + if err != nil { + log.WithError(err).Error(msgCouldNotFetchKeys) + } + } + allExited, err := v.AllValidatorsAreExited(ctx) if err != nil { log.WithError(err).Error("Could not check if validators are exited") diff --git a/validator/client/runner_test.go b/validator/client/runner_test.go index 2f6656770b..17ee83fa25 100644 --- a/validator/client/runner_test.go +++ b/validator/client/runner_test.go @@ -13,6 +13,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/testutil/require" "github.com/prysmaticlabs/prysm/validator/client/iface" "github.com/prysmaticlabs/prysm/validator/client/testutil" + "github.com/prysmaticlabs/prysm/validator/keymanager/remote" logTest "github.com/sirupsen/logrus/hooks/test" ) @@ -238,3 +239,19 @@ func TestKeyReload_NoActiveKey(t *testing.T) { assert.Equal(t, true, v.HandleKeyReloadCalled) assert.Equal(t, 2, v.WaitForActivationCalled) } + +func TestKeyReload_RemoteKeymanager(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + km := remote.NewMock() + v := &testutil.FakeValidator{Keymanager: &km} + + ticker := make(chan types.Slot) + v.NextSlotRet = ticker + go func() { + ticker <- types.Slot(55) + + cancel() + }() + run(ctx, v) + assert.Equal(t, true, km.ReloadPublicKeysCalled) +} diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index 997ea67f54..494d02cc15 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -91,39 +91,49 @@ func (v *validator) waitForActivation(ctx context.Context, accountsChangedChan < remoteKm, ok := v.keyManager.(remote.RemoteKeymanager) if ok { - for range v.NextSlot() { - if ctx.Err() == context.Canceled { - return errors.Wrap(ctx.Err(), "context canceled, not waiting for activation anymore") - } + for { + select { + case <-accountsChangedChan: + // Accounts (keys) changed, restart the process. + return v.waitForActivation(ctx, accountsChangedChan) + case <-v.NextSlot(): + if ctx.Err() == context.Canceled { + return errors.Wrap(ctx.Err(), "context canceled, not waiting for activation anymore") + } - validatingKeys, err = remoteKm.ReloadPublicKeys(ctx) - if err != nil { - return errors.Wrap(err, msgCouldNotFetchKeys) - } - statusRequestKeys := make([][]byte, len(validatingKeys)) - for i := range validatingKeys { - statusRequestKeys[i] = validatingKeys[i][:] - } - resp, err := v.validatorClient.MultipleValidatorStatus(ctx, ðpb.MultipleValidatorStatusRequest{ - PublicKeys: statusRequestKeys, - }) - if err != nil { - return err - } - statuses := make([]*validatorStatus, len(resp.Statuses)) - for i, s := range resp.Statuses { - statuses[i] = &validatorStatus{ - publicKey: resp.PublicKeys[i], - status: s, - index: resp.Indices[i], + log.Error("Before ReloadPublicKeys") + validatingKeys, err = remoteKm.ReloadPublicKeys(ctx) + if err != nil { + return errors.Wrap(err, msgCouldNotFetchKeys) + } + log.Error("After ReloadPublicKeys") + statusRequestKeys := make([][]byte, len(validatingKeys)) + for i := range validatingKeys { + statusRequestKeys[i] = validatingKeys[i][:] + } + resp, err := v.validatorClient.MultipleValidatorStatus(ctx, ðpb.MultipleValidatorStatusRequest{ + PublicKeys: statusRequestKeys, + }) + if err != nil { + return err + } + statuses := make([]*validatorStatus, len(resp.Statuses)) + for i, s := range resp.Statuses { + statuses[i] = &validatorStatus{ + publicKey: resp.PublicKeys[i], + status: s, + index: resp.Indices[i], + } + } + + valActivated := v.checkAndLogValidatorStatus(statuses) + if valActivated { + logActiveValidatorStatus(statuses) + } else { + continue } } - - valActivated := v.checkAndLogValidatorStatus(statuses) - if valActivated { - logActiveValidatorStatus(statuses) - break - } + break } } else { for { diff --git a/validator/client/wait_for_activation_test.go b/validator/client/wait_for_activation_test.go index 5beac6ec2e..5cab2c7d54 100644 --- a/validator/client/wait_for_activation_test.go +++ b/validator/client/wait_for_activation_test.go @@ -397,9 +397,8 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) { inactiveKey := bytesutil.ToBytes48([]byte("inactive")) activeKey := bytesutil.ToBytes48([]byte("active")) - km := &remote.MockKeymanager{ - PublicKeys: [][48]byte{inactiveKey, activeKey}, - } + km := remote.NewMock() + km.PublicKeys = [][48]byte{inactiveKey, activeKey} slot := types.Slot(0) t.Run("activated", func(t *testing.T) { @@ -412,7 +411,7 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) { } v := validator{ validatorClient: client, - keyManager: km, + keyManager: &km, ticker: ticker, } go func() { @@ -447,7 +446,7 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) { } v := validator{ validatorClient: client, - keyManager: km, + keyManager: &km, ticker: ticker, } go func() { @@ -458,4 +457,52 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) { err := v.waitForActivation(ctx, nil /* accountsChangedChan */) assert.ErrorContains(t, "context canceled, not waiting for activation anymore", err) }) + t.Run("reloaded", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + hook := logTest.NewGlobal() + remoteKm := remote.NewMock() + remoteKm.PublicKeys = [][48]byte{inactiveKey} + + tickerChan := make(chan types.Slot) + ticker := &slotutilmock.MockTicker{ + Channel: tickerChan, + } + v := validator{ + validatorClient: client, + keyManager: &remoteKm, + ticker: ticker, + } + go func() { + tickerChan <- slot + time.Sleep(time.Second) + remoteKm.PublicKeys = [][48]byte{inactiveKey, activeKey} + tickerChan <- slot + // Cancel after timeout to avoid waiting on channel forever in case test goes wrong. + time.Sleep(time.Second) + cancel() + }() + + resp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactiveKey[:]}) + resp.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS + client.EXPECT().MultipleValidatorStatus( + gomock.Any(), + ðpb.MultipleValidatorStatusRequest{ + PublicKeys: [][]byte{inactiveKey[:]}, + }, + ).Return(resp, nil /* err */) + resp2 := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactiveKey[:], activeKey[:]}) + resp2.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS + resp2.Statuses[1].Status = ethpb.ValidatorStatus_ACTIVE + client.EXPECT().MultipleValidatorStatus( + gomock.Any(), + ðpb.MultipleValidatorStatusRequest{ + PublicKeys: [][]byte{inactiveKey[:], activeKey[:]}, + }, + ).Return(resp2, nil /* err */) + + err := v.waitForActivation(ctx, remoteKm.ReloadPublicKeysChan /* accountsChangedChan */) + require.NoError(t, err) + assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node") + assert.LogsContain(t, hook, "Validator activated") + }) } diff --git a/validator/keymanager/remote/mock_keymanager.go b/validator/keymanager/remote/mock_keymanager.go index f59c7ad4b0..5bdfa4b703 100644 --- a/validator/keymanager/remote/mock_keymanager.go +++ b/validator/keymanager/remote/mock_keymanager.go @@ -10,7 +10,17 @@ import ( // MockKeymanager -- type MockKeymanager struct { - PublicKeys [][48]byte + PublicKeys [][48]byte + ReloadPublicKeysChan chan [][48]byte + ReloadPublicKeysCalled bool + accountsChangedFeed *event.Feed +} + +func NewMock() MockKeymanager { + return MockKeymanager{ + accountsChangedFeed: new(event.Feed), + ReloadPublicKeysChan: make(chan [][48]byte, 1), + } } // FetchValidatingPublicKeys -- @@ -24,11 +34,13 @@ func (*MockKeymanager) Sign(context.Context, *validatorpb.SignRequest) (bls.Sign } // SubscribeAccountChanges -- -func (*MockKeymanager) SubscribeAccountChanges(chan [][48]byte) event.Subscription { - panic("implement me") +func (m *MockKeymanager) SubscribeAccountChanges(chan [][48]byte) event.Subscription { + return m.accountsChangedFeed.Subscribe(m.ReloadPublicKeysChan) } // ReloadPublicKeys -- func (m *MockKeymanager) ReloadPublicKeys(context.Context) ([][48]byte, error) { + m.ReloadPublicKeysCalled = true + m.ReloadPublicKeysChan <- m.PublicKeys return m.PublicKeys, nil }