Fix remote keymanager's dynamic key reload (#8817)

* Fix remote keymanager's dynamic key reload

* wait for activation test

* runner test

* rename mock creation func

* fix compile error

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Radosław Kapka
2021-04-30 17:15:22 +02:00
committed by GitHub
parent a063952abe
commit 2fdfda2804
5 changed files with 133 additions and 38 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/validator/client/iface"
"github.com/prysmaticlabs/prysm/validator/keymanager/remote"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -135,6 +136,14 @@ func run(ctx context.Context, v iface.Validator) {
case slot := <-v.NextSlot():
span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))
remoteKm, ok := v.GetKeymanager().(remote.RemoteKeymanager)
if ok {
_, err := remoteKm.ReloadPublicKeys(ctx)
if err != nil {
log.WithError(err).Error(msgCouldNotFetchKeys)
}
}
allExited, err := v.AllValidatorsAreExited(ctx)
if err != nil {
log.WithError(err).Error("Could not check if validators are exited")

View File

@@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/validator/client/iface"
"github.com/prysmaticlabs/prysm/validator/client/testutil"
"github.com/prysmaticlabs/prysm/validator/keymanager/remote"
logTest "github.com/sirupsen/logrus/hooks/test"
)
@@ -238,3 +239,19 @@ func TestKeyReload_NoActiveKey(t *testing.T) {
assert.Equal(t, true, v.HandleKeyReloadCalled)
assert.Equal(t, 2, v.WaitForActivationCalled)
}
func TestKeyReload_RemoteKeymanager(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
km := remote.NewMock()
v := &testutil.FakeValidator{Keymanager: &km}
ticker := make(chan types.Slot)
v.NextSlotRet = ticker
go func() {
ticker <- types.Slot(55)
cancel()
}()
run(ctx, v)
assert.Equal(t, true, km.ReloadPublicKeysCalled)
}

View File

@@ -91,39 +91,49 @@ func (v *validator) waitForActivation(ctx context.Context, accountsChangedChan <
remoteKm, ok := v.keyManager.(remote.RemoteKeymanager)
if ok {
for range v.NextSlot() {
if ctx.Err() == context.Canceled {
return errors.Wrap(ctx.Err(), "context canceled, not waiting for activation anymore")
}
for {
select {
case <-accountsChangedChan:
// Accounts (keys) changed, restart the process.
return v.waitForActivation(ctx, accountsChangedChan)
case <-v.NextSlot():
if ctx.Err() == context.Canceled {
return errors.Wrap(ctx.Err(), "context canceled, not waiting for activation anymore")
}
validatingKeys, err = remoteKm.ReloadPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, msgCouldNotFetchKeys)
}
statusRequestKeys := make([][]byte, len(validatingKeys))
for i := range validatingKeys {
statusRequestKeys[i] = validatingKeys[i][:]
}
resp, err := v.validatorClient.MultipleValidatorStatus(ctx, &ethpb.MultipleValidatorStatusRequest{
PublicKeys: statusRequestKeys,
})
if err != nil {
return err
}
statuses := make([]*validatorStatus, len(resp.Statuses))
for i, s := range resp.Statuses {
statuses[i] = &validatorStatus{
publicKey: resp.PublicKeys[i],
status: s,
index: resp.Indices[i],
log.Error("Before ReloadPublicKeys")
validatingKeys, err = remoteKm.ReloadPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, msgCouldNotFetchKeys)
}
log.Error("After ReloadPublicKeys")
statusRequestKeys := make([][]byte, len(validatingKeys))
for i := range validatingKeys {
statusRequestKeys[i] = validatingKeys[i][:]
}
resp, err := v.validatorClient.MultipleValidatorStatus(ctx, &ethpb.MultipleValidatorStatusRequest{
PublicKeys: statusRequestKeys,
})
if err != nil {
return err
}
statuses := make([]*validatorStatus, len(resp.Statuses))
for i, s := range resp.Statuses {
statuses[i] = &validatorStatus{
publicKey: resp.PublicKeys[i],
status: s,
index: resp.Indices[i],
}
}
valActivated := v.checkAndLogValidatorStatus(statuses)
if valActivated {
logActiveValidatorStatus(statuses)
} else {
continue
}
}
valActivated := v.checkAndLogValidatorStatus(statuses)
if valActivated {
logActiveValidatorStatus(statuses)
break
}
break
}
} else {
for {

View File

@@ -397,9 +397,8 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) {
inactiveKey := bytesutil.ToBytes48([]byte("inactive"))
activeKey := bytesutil.ToBytes48([]byte("active"))
km := &remote.MockKeymanager{
PublicKeys: [][48]byte{inactiveKey, activeKey},
}
km := remote.NewMock()
km.PublicKeys = [][48]byte{inactiveKey, activeKey}
slot := types.Slot(0)
t.Run("activated", func(t *testing.T) {
@@ -412,7 +411,7 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) {
}
v := validator{
validatorClient: client,
keyManager: km,
keyManager: &km,
ticker: ticker,
}
go func() {
@@ -447,7 +446,7 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) {
}
v := validator{
validatorClient: client,
keyManager: km,
keyManager: &km,
ticker: ticker,
}
go func() {
@@ -458,4 +457,52 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) {
err := v.waitForActivation(ctx, nil /* accountsChangedChan */)
assert.ErrorContains(t, "context canceled, not waiting for activation anymore", err)
})
t.Run("reloaded", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
hook := logTest.NewGlobal()
remoteKm := remote.NewMock()
remoteKm.PublicKeys = [][48]byte{inactiveKey}
tickerChan := make(chan types.Slot)
ticker := &slotutilmock.MockTicker{
Channel: tickerChan,
}
v := validator{
validatorClient: client,
keyManager: &remoteKm,
ticker: ticker,
}
go func() {
tickerChan <- slot
time.Sleep(time.Second)
remoteKm.PublicKeys = [][48]byte{inactiveKey, activeKey}
tickerChan <- slot
// Cancel after timeout to avoid waiting on channel forever in case test goes wrong.
time.Sleep(time.Second)
cancel()
}()
resp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactiveKey[:]})
resp.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
client.EXPECT().MultipleValidatorStatus(
gomock.Any(),
&ethpb.MultipleValidatorStatusRequest{
PublicKeys: [][]byte{inactiveKey[:]},
},
).Return(resp, nil /* err */)
resp2 := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactiveKey[:], activeKey[:]})
resp2.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
resp2.Statuses[1].Status = ethpb.ValidatorStatus_ACTIVE
client.EXPECT().MultipleValidatorStatus(
gomock.Any(),
&ethpb.MultipleValidatorStatusRequest{
PublicKeys: [][]byte{inactiveKey[:], activeKey[:]},
},
).Return(resp2, nil /* err */)
err := v.waitForActivation(ctx, remoteKm.ReloadPublicKeysChan /* accountsChangedChan */)
require.NoError(t, err)
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
assert.LogsContain(t, hook, "Validator activated")
})
}

View File

@@ -10,7 +10,17 @@ import (
// MockKeymanager --
type MockKeymanager struct {
PublicKeys [][48]byte
PublicKeys [][48]byte
ReloadPublicKeysChan chan [][48]byte
ReloadPublicKeysCalled bool
accountsChangedFeed *event.Feed
}
func NewMock() MockKeymanager {
return MockKeymanager{
accountsChangedFeed: new(event.Feed),
ReloadPublicKeysChan: make(chan [][48]byte, 1),
}
}
// FetchValidatingPublicKeys --
@@ -24,11 +34,13 @@ func (*MockKeymanager) Sign(context.Context, *validatorpb.SignRequest) (bls.Sign
}
// SubscribeAccountChanges --
func (*MockKeymanager) SubscribeAccountChanges(chan [][48]byte) event.Subscription {
panic("implement me")
func (m *MockKeymanager) SubscribeAccountChanges(chan [][48]byte) event.Subscription {
return m.accountsChangedFeed.Subscribe(m.ReloadPublicKeysChan)
}
// ReloadPublicKeys --
func (m *MockKeymanager) ReloadPublicKeys(context.Context) ([][48]byte, error) {
m.ReloadPublicKeysCalled = true
m.ReloadPublicKeysChan <- m.PublicKeys
return m.PublicKeys, nil
}