Allow dynamic key reloading when having inactive keys (imported & derived) (#8119)

* restart waiting for activation on key change

* test fixes

* wiat for activation comments

* regression test

* log fatal when validator cast fails

* derived keymanager

* review comments

* add buffer to channel

* simplify key refetch logic

* reload keys into empty wallet

* removed warning on wallet creation

* add empty line

* export AccountsKeystoreRepresentation type

* unit test for handleAccountsChanged

* test ctx cancellation

* add missing mockRemoteKeymanager interface function

* gazelle

* gzl

* fix panic inside goroutine during runner tests

* rename error message variables

* Update validator/accounts/accounts_list_test.go

* reorder imports

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
This commit is contained in:
Radosław Kapka
2021-01-22 21:21:34 +01:00
committed by GitHub
parent 229abed848
commit 8ffb95bd9d
23 changed files with 371 additions and 78 deletions

View File

@@ -75,6 +75,7 @@ go_test(
"//proto/validator/accounts/v2:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/mock:go_default_library",
"//shared/params:go_default_library",

View File

@@ -5,10 +5,9 @@ import (
"github.com/prysmaticlabs/prysm/validator/keymanager"
)
var msgKeymanagerNotSupported = "keymanager kind not supported: %s"
var (
// ErrCouldNotInitializeKeymanager informs about failed keymanager initialization
errKeymanagerNotSupported = "keymanager kind not supported: %s"
// MsgCouldNotInitializeKeymanager informs about failed keymanager initialization
ErrCouldNotInitializeKeymanager = "could not initialize keymanager"
)

View File

@@ -114,7 +114,7 @@ func BackupAccountsCli(cliCtx *cli.Context) error {
case keymanager.Remote:
return errors.New("backing up keys is not supported for a remote keymanager")
default:
return fmt.Errorf(msgKeymanagerNotSupported, w.KeymanagerKind())
return fmt.Errorf(errKeymanagerNotSupported, w.KeymanagerKind())
}
return zipKeystoresToOutputDir(keystoresToBackup, backupDir)
}

View File

@@ -131,7 +131,7 @@ func DeleteAccount(ctx context.Context, cfg *AccountsConfig) error {
return errors.Wrap(err, "could not delete accounts")
}
default:
return fmt.Errorf(msgKeymanagerNotSupported, cfg.Wallet.KeymanagerKind())
return fmt.Errorf(errKeymanagerNotSupported, cfg.Wallet.KeymanagerKind())
}
return nil
}

View File

@@ -62,7 +62,7 @@ func ListAccountsCli(cliCtx *cli.Context) error {
return errors.Wrap(err, "could not list validator accounts with remote keymanager")
}
default:
return fmt.Errorf(msgKeymanagerNotSupported, w.KeymanagerKind().String())
return fmt.Errorf(errKeymanagerNotSupported, w.KeymanagerKind().String())
}
return nil
}

View File

@@ -13,6 +13,7 @@ import (
validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/petnames"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
@@ -41,6 +42,10 @@ func (m *mockRemoteKeymanager) Sign(context.Context, *validatorpb.SignRequest) (
return nil, nil
}
func (m *mockRemoteKeymanager) SubscribeAccountChanges(_ chan [][48]byte) event.Subscription {
return nil
}
func createRandomKeystore(t testing.TB, password string) *keymanager.Keystore {
encryptor := keystorev4.New()
id, err := uuid.NewRandom()

View File

@@ -116,7 +116,7 @@ func CreateWalletWithKeymanager(ctx context.Context, cfg *CreateWalletConfig) (*
"Successfully created wallet with remote keymanager configuration",
)
default:
return nil, errors.Wrapf(err, msgKeymanagerNotSupported, w.KeymanagerKind())
return nil, errors.Wrapf(err, errKeymanagerNotSupported, w.KeymanagerKind())
}
return w, nil
}

View File

@@ -50,7 +50,7 @@ func EditWalletConfigurationCli(cliCtx *cli.Context) error {
return errors.Wrap(err, "could not write config to disk")
}
default:
return fmt.Errorf(msgKeymanagerNotSupported, w.KeymanagerKind())
return fmt.Errorf(errKeymanagerNotSupported, w.KeymanagerKind())
}
return nil
}

View File

@@ -103,8 +103,10 @@ go_test(
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//shared/timeutils:go_default_library",
"//validator/accounts/testing:go_default_library",
"//validator/db/testing:go_default_library",
"//validator/graffiti:go_default_library",
"//validator/keymanager/derived:go_default_library",
"//validator/testing:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
@@ -114,6 +116,8 @@ go_test(
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_tyler_smith_go_bip39//:go_default_library",
"@com_github_wealdtech_go_eth2_util//:go_default_library",
"@in_gopkg_d4l3k_messagediff_v1//:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
],

View File

@@ -6,6 +6,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/timeutils"
"github.com/prysmaticlabs/prysm/validator/keymanager"
)
var _ Validator = (*FakeValidator)(nil)
@@ -41,6 +42,7 @@ type FakeValidator struct {
IndexToPubkeyMap map[uint64][48]byte
PubkeyToIndexMap map[[48]byte]uint64
PubkeysToStatusesMap map[[48]byte]ethpb.ValidatorStatus
Keymanager keymanager.IKeymanager
}
type ctxKey string
@@ -65,7 +67,7 @@ func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
}
// WaitForActivation for mocking.
func (fv *FakeValidator) WaitForActivation(_ context.Context) error {
func (fv *FakeValidator) WaitForActivation(_ context.Context, _ chan struct{}) error {
fv.WaitForActivationCalled = true
return nil
}
@@ -187,5 +189,10 @@ func (fv *FakeValidator) AllValidatorsAreExited(ctx context.Context) (bool, erro
return ctx.Value(allValidatorsAreExitedCtxKey).(bool), nil
}
// GetKeymanager for mocking
func (fv *FakeValidator) GetKeymanager() keymanager.IKeymanager {
return fv.Keymanager
}
// ReceiveBlocks for mocking
func (fv *FakeValidator) ReceiveBlocks(ctx context.Context) {}

View File

@@ -10,6 +10,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/validator/keymanager"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -20,7 +21,7 @@ type Validator interface {
Done()
WaitForChainStart(ctx context.Context) error
WaitForSync(ctx context.Context) error
WaitForActivation(ctx context.Context) error
WaitForActivation(ctx context.Context, accountsChangedChan chan struct{}) error
SlasherReady(ctx context.Context) error
CanonicalHeadSlot(ctx context.Context) (uint64, error)
NextSlot() <-chan uint64
@@ -36,6 +37,7 @@ type Validator interface {
UpdateDomainDataCaches(ctx context.Context, slot uint64)
WaitForWalletInitialization(ctx context.Context) error
AllValidatorsAreExited(ctx context.Context) (bool, error)
GetKeymanager() keymanager.IKeymanager
ReceiveBlocks(ctx context.Context)
}
@@ -68,9 +70,13 @@ func run(ctx context.Context, v Validator) {
if err := v.WaitForSync(ctx); err != nil {
log.Fatalf("Could not determine if beacon node synced: %v", err)
}
if err := v.WaitForActivation(ctx); err != nil {
accountsChangedChan := make(chan struct{}, 1)
go handleAccountsChanged(ctx, v, accountsChangedChan)
if err := v.WaitForActivation(ctx, accountsChangedChan); err != nil {
log.Fatalf("Could not wait for validator activation: %v", err)
}
go v.ReceiveBlocks(ctx)
headSlot, err := v.CanonicalHeadSlot(ctx)
@@ -175,3 +181,24 @@ func handleAssignmentError(err error, slot uint64) {
log.WithField("error", err).Error("Failed to update assignments")
}
}
func handleAccountsChanged(ctx context.Context, v Validator, accountsChangedChan chan struct{}) {
validatingPubKeysChan := make(chan [][48]byte, 1)
var sub = v.GetKeymanager().SubscribeAccountChanges(validatingPubKeysChan)
defer func() {
sub.Unsubscribe()
close(validatingPubKeysChan)
}()
for {
select {
case <-validatingPubKeysChan:
accountsChangedChan <- struct{}{}
case err := <-sub.Err():
log.WithError(err).Error("accounts changed subscription failed")
return
case <-ctx.Done():
return
}
}
}

View File

@@ -6,12 +6,15 @@ import (
"testing"
"time"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
logTest "github.com/sirupsen/logrus/hooks/test"
)
var walletPassword = "OhWOWthisisatest42!$"
func cancelledContext() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
@@ -19,25 +22,25 @@ func cancelledContext() context.Context {
}
func TestCancelledContext_CleansUpValidator(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
run(cancelledContext(), v)
assert.Equal(t, true, v.DoneCalled, "Expected Done() to be called")
}
func TestCancelledContext_WaitsForChainStart(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
run(cancelledContext(), v)
assert.Equal(t, true, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
}
func TestCancelledContext_WaitsForActivation(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
run(cancelledContext(), v)
assert.Equal(t, true, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
}
func TestCancelledContext_ChecksSlasherReady(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
cfg := &featureconfig.Flags{
SlasherProtection: true,
}
@@ -48,7 +51,7 @@ func TestCancelledContext_ChecksSlasherReady(t *testing.T) {
}
func TestUpdateDuties_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())
slot := uint64(55)
@@ -68,7 +71,7 @@ func TestUpdateDuties_NextSlot(t *testing.T) {
func TestUpdateDuties_HandlesError(t *testing.T) {
hook := logTest.NewGlobal()
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())
slot := uint64(55)
@@ -87,7 +90,7 @@ func TestUpdateDuties_HandlesError(t *testing.T) {
}
func TestRoleAt_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())
slot := uint64(55)
@@ -106,7 +109,7 @@ func TestRoleAt_NextSlot(t *testing.T) {
}
func TestAttests_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())
slot := uint64(55)
@@ -126,7 +129,7 @@ func TestAttests_NextSlot(t *testing.T) {
}
func TestProposes_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())
slot := uint64(55)
@@ -146,7 +149,7 @@ func TestProposes_NextSlot(t *testing.T) {
}
func TestBothProposesAndAttests_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())
slot := uint64(55)
@@ -168,7 +171,7 @@ func TestBothProposesAndAttests_NextSlot(t *testing.T) {
}
func TestAllValidatorsAreExited_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.WithValue(context.Background(), allValidatorsAreExitedCtxKey, true))
hook := logTest.NewGlobal()
@@ -183,3 +186,55 @@ func TestAllValidatorsAreExited_NextSlot(t *testing.T) {
run(ctx, v)
assert.LogsContain(t, hook, "All validators are exited")
}
func TestHandleAccountsChanged_Ok(t *testing.T) {
ctx := context.Background()
defer ctx.Done()
km := &mockKeymanager{accountsChangedFeed: &event.Feed{}}
v := &FakeValidator{Keymanager: km}
channel := make(chan struct{})
go handleAccountsChanged(ctx, v, channel)
time.Sleep(time.Second) // Allow time for subscribing to changes.
km.SimulateAccountChanges()
time.Sleep(time.Second) // Allow time for handling subscribed changes.
select {
case _, ok := <-channel:
if !ok {
t.Error("Account changed channel is closed")
}
default:
t.Error("Accounts changed channel is empty")
}
}
func TestHandleAccountsChanged_CtxCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
km := &mockKeymanager{accountsChangedFeed: &event.Feed{}}
v := &FakeValidator{Keymanager: km}
channel := make(chan struct{}, 2)
go handleAccountsChanged(ctx, v, channel)
time.Sleep(time.Second) // Allow time for subscribing to changes.
km.SimulateAccountChanges()
time.Sleep(time.Second) // Allow time for handling subscribed changes.
cancel()
time.Sleep(time.Second) // Allow time for handling cancellation.
km.SimulateAccountChanges()
time.Sleep(time.Second) // Allow time for handling subscribed changes.
var values int
for loop := true; loop == true; {
select {
case _, ok := <-channel:
if ok {
values++
}
default:
loop = false
}
}
assert.Equal(t, 1, values, "Incorrect number of values were passed to the channel")
}

View File

@@ -330,7 +330,10 @@ func recheckValidatingKeysBucket(ctx context.Context, valDB db.Database, km keym
}
validatingPubKeysChan := make(chan [][48]byte, 1)
sub := importedKeymanager.SubscribeAccountChanges(validatingPubKeysChan)
defer sub.Unsubscribe()
defer func() {
sub.Unsubscribe()
close(validatingPubKeysChan)
}()
for {
select {
case keys := <-validatingPubKeysChan:

View File

@@ -491,6 +491,11 @@ func (v *validator) RolesAt(ctx context.Context, slot uint64) (map[[48]byte][]Va
return rolesAt, nil
}
// GetKeymanager returns the underlying validator's keymanager.
func (v *validator) GetKeymanager() keymanager.IKeymanager {
return v.keyManager
}
// isAggregator checks if a validator is an aggregator of a given slot, it uses the selection algorithm outlined in:
// https://github.com/ethereum/eth2.0-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#aggregation-selection
func (v *validator) isAggregator(ctx context.Context, committee []uint64, slot uint64, pubKey [48]byte) (bool, error) {

View File

@@ -47,9 +47,10 @@ func genMockKeymanger(numKeys int) *mockKeymanager {
}
type mockKeymanager struct {
lock sync.RWMutex
keysMap map[[48]byte]bls.SecretKey
fetchNoKeys bool
lock sync.RWMutex
keysMap map[[48]byte]bls.SecretKey
fetchNoKeys bool
accountsChangedFeed *event.Feed
}
func (m *mockKeymanager) FetchValidatingPublicKeys(ctx context.Context) ([][48]byte, error) {
@@ -89,6 +90,14 @@ func (m *mockKeymanager) Sign(ctx context.Context, req *validatorpb.SignRequest)
return sig, nil
}
func (m *mockKeymanager) SubscribeAccountChanges(pubKeysChan chan [][48]byte) event.Subscription {
return m.accountsChangedFeed.Subscribe(pubKeysChan)
}
func (m *mockKeymanager) SimulateAccountChanges() {
m.accountsChangedFeed.Send(make([][48]byte, 0))
}
func generateMockStatusResponse(pubkeys [][]byte) *ethpb.ValidatorActivationResponse {
multipleStatus := make([]*ethpb.ValidatorActivationResponse_Status, len(pubkeys))
for i, key := range pubkeys {
@@ -351,7 +360,7 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) {
resp,
nil,
)
require.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
require.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
require.LogsContain(t, hook, "Validator activated")
}
@@ -389,7 +398,7 @@ func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) {
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
}
func TestWaitSync_ContextCanceled(t *testing.T) {

View File

@@ -20,7 +20,7 @@ import (
// WaitForActivation checks whether the validator pubkey is in the active
// validator set. If not, this operation will block until an activation message is
// received.
func (v *validator) WaitForActivation(ctx context.Context) error {
func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan chan struct{}) error {
ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
defer span.End()
@@ -36,17 +36,14 @@ func (v *validator) WaitForActivation(ctx context.Context) error {
for {
select {
case <-ticker.C:
keys, err := v.keyManager.FetchValidatingPublicKeys(ctx)
validatingKeys, err = v.keyManager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, msgCouldNotFetchKeys)
}
if len(keys) == 0 {
if len(validatingKeys) == 0 {
log.Warn(msgNoKeysFetched)
continue
}
// after this statement we jump out of `select` and hit `break`,
// thus jumping out of `for` into the rest of the function
validatingKeys = keys
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
@@ -66,44 +63,52 @@ func (v *validator) WaitForActivation(ctx context.Context) error {
Error("Stream broken while waiting for activation. Reconnecting...")
// Reconnection attempt backoff, up to 60s.
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60)))
return v.WaitForActivation(incrementRetries(ctx))
return v.WaitForActivation(incrementRetries(ctx), accountsChangedChan)
}
for {
res, err := stream.Recv()
// If the stream is closed, we stop the loop.
if errors.Is(err, io.EOF) {
break
}
// If context is canceled we stop the loop.
if ctx.Err() == context.Canceled {
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop")
}
if err != nil {
traceutil.AnnotateError(span, err)
attempts := getStreamAttempts(ctx)
log.WithError(err).WithField("attempts", attempts).
Error("Stream broken while waiting for activation. Reconnecting...")
// Reconnection attempt backoff, up to 60s.
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60)))
return v.WaitForActivation(incrementRetries(ctx))
}
valActivated := v.checkAndLogValidatorStatus(res.Statuses)
if valActivated {
for _, statusResp := range res.Statuses {
if statusResp.Status.Status != ethpb.ValidatorStatus_ACTIVE {
continue
}
log.WithFields(logrus.Fields{
"publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(statusResp.PublicKey)),
"index": statusResp.Index,
}).Info("Validator activated")
select {
case <-accountsChangedChan:
// Accounts (keys) changed, restart the process.
return v.WaitForActivation(ctx, accountsChangedChan)
default:
res, err := stream.Recv()
// If the stream is closed, we stop the loop.
if errors.Is(err, io.EOF) {
break
}
break
}
}
v.ticker = slotutil.GetSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)
// If context is canceled we return from the function.
if ctx.Err() == context.Canceled {
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop")
}
if err != nil {
traceutil.AnnotateError(span, err)
attempts := getStreamAttempts(ctx)
log.WithError(err).WithField("attempts", attempts).
Error("Stream broken while waiting for activation. Reconnecting...")
// Reconnection attempt backoff, up to 60s.
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60)))
return v.WaitForActivation(incrementRetries(ctx), accountsChangedChan)
}
valActivated := v.checkAndLogValidatorStatus(res.Statuses)
if valActivated {
for _, statusResp := range res.Statuses {
if statusResp.Status.Status != ethpb.ValidatorStatus_ACTIVE {
continue
}
log.WithFields(logrus.Fields{
"publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(statusResp.PublicKey)),
"index": statusResp.Index,
}).Info("Validator activated")
}
} else {
continue
}
}
break
}
v.ticker = slotutil.GetSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)
return nil
}

View File

@@ -2,6 +2,7 @@ package client
import (
"context"
"fmt"
"testing"
"time"
@@ -12,7 +13,11 @@ import (
"github.com/prysmaticlabs/prysm/shared/mock"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
walletMock "github.com/prysmaticlabs/prysm/validator/accounts/testing"
"github.com/prysmaticlabs/prysm/validator/keymanager/derived"
logTest "github.com/sirupsen/logrus/hooks/test"
"github.com/tyler-smith/go-bip39"
util "github.com/wealdtech/go-eth2-util"
)
func TestWaitActivation_ContextCanceled(t *testing.T) {
@@ -46,7 +51,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
cancel()
assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx))
assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx, make(chan struct{})))
}
func TestWaitActivation_StreamSetupFails_AttemptsToReconnect(t *testing.T) {
@@ -77,7 +82,7 @@ func TestWaitActivation_StreamSetupFails_AttemptsToReconnect(t *testing.T) {
resp := generateMockStatusResponse([][]byte{pubKey[:]})
resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE
clientStream.EXPECT().Recv().Return(resp, nil)
assert.NoError(t, v.WaitForActivation(context.Background()))
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})))
}
func TestWaitForActivation_ReceiveErrorFromStream_AttemptsReconnection(t *testing.T) {
@@ -112,7 +117,7 @@ func TestWaitForActivation_ReceiveErrorFromStream_AttemptsReconnection(t *testin
nil,
errors.New("fails"),
).Return(resp, nil)
assert.NoError(t, v.WaitForActivation(context.Background()))
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})))
}
func TestWaitActivation_LogsActivationEpochOK(t *testing.T) {
@@ -147,8 +152,8 @@ func TestWaitActivation_LogsActivationEpochOK(t *testing.T) {
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
require.LogsContain(t, hook, "Validator activated")
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
assert.LogsContain(t, hook, "Validator activated")
}
func TestWaitForActivation_Exiting(t *testing.T) {
@@ -182,7 +187,7 @@ func TestWaitForActivation_Exiting(t *testing.T) {
resp,
nil,
)
require.NoError(t, v.WaitForActivation(context.Background()))
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})))
}
func TestWaitForActivation_RefetchKeys(t *testing.T) {
@@ -190,7 +195,7 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) {
defer func() {
keyRefetchPeriod = originalPeriod
}()
keyRefetchPeriod = 5 * time.Second
keyRefetchPeriod = 1 * time.Second
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
@@ -224,7 +229,152 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) {
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
assert.LogsContain(t, hook, msgNoKeysFetched)
assert.LogsContain(t, hook, "Validator activated")
}
// Regression test for a scenario where you start with an inactive key and then import an active key.
func TestWaitForActivation_AccountsChanged(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
t.Run("Imported keymanager", func(t *testing.T) {
inactivePrivKey, err := bls.RandKey()
require.NoError(t, err)
inactivePubKey := [48]byte{}
copy(inactivePubKey[:], inactivePrivKey.PublicKey().Marshal())
activePrivKey, err := bls.RandKey()
require.NoError(t, err)
activePubKey := [48]byte{}
copy(activePubKey[:], activePrivKey.PublicKey().Marshal())
km := &mockKeymanager{
keysMap: map[[48]byte]bls.SecretKey{
inactivePubKey: inactivePrivKey,
},
}
client := mock.NewMockBeaconNodeValidatorClient(ctrl)
v := validator{
validatorClient: client,
keyManager: km,
genesisTime: 1,
}
inactiveResp := generateMockStatusResponse([][]byte{inactivePubKey[:]})
inactiveResp.Statuses[0].Status.Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
inactiveClientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
&ethpb.ValidatorActivationRequest{
PublicKeys: [][]byte{inactivePubKey[:]},
},
).Return(inactiveClientStream, nil)
inactiveClientStream.EXPECT().Recv().Return(
inactiveResp,
nil,
).AnyTimes()
activeResp := generateMockStatusResponse([][]byte{inactivePubKey[:], activePubKey[:]})
activeResp.Statuses[0].Status.Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
activeResp.Statuses[1].Status.Status = ethpb.ValidatorStatus_ACTIVE
activeClientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
&ethpb.ValidatorActivationRequest{
PublicKeys: [][]byte{inactivePubKey[:], activePubKey[:]},
},
).Return(activeClientStream, nil)
activeClientStream.EXPECT().Recv().Return(
activeResp,
nil,
)
channel := make(chan struct{})
go func() {
// We add the active key into the keymanager and simulate a key refresh.
time.Sleep(time.Second * 1)
km.keysMap[activePubKey] = activePrivKey
channel <- struct{}{}
}()
assert.NoError(t, v.WaitForActivation(context.Background(), channel))
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
assert.LogsContain(t, hook, "Validator activated")
})
t.Run("Derived keymanager", func(t *testing.T) {
mnemonic := "tumble turn jewel sudden social great water general cabin jacket bounce dry flip monster advance problem social half flee inform century chicken hard reason"
seed := bip39.NewSeed(mnemonic, "")
inactivePrivKey, err :=
util.PrivateKeyFromSeedAndPath(seed, fmt.Sprintf(derived.ValidatingKeyDerivationPathTemplate, 0))
require.NoError(t, err)
inactivePubKey := [48]byte{}
copy(inactivePubKey[:], inactivePrivKey.PublicKey().Marshal())
activePrivKey, err :=
util.PrivateKeyFromSeedAndPath(seed, fmt.Sprintf(derived.ValidatingKeyDerivationPathTemplate, 1))
require.NoError(t, err)
activePubKey := [48]byte{}
copy(activePubKey[:], activePrivKey.PublicKey().Marshal())
wallet := &walletMock.Wallet{
Files: make(map[string]map[string][]byte),
AccountPasswords: make(map[string]string),
WalletPassword: "secretPassw0rd$1999",
}
ctx := context.Background()
km, err := derived.NewKeymanager(ctx, &derived.SetupConfig{
Wallet: wallet,
})
require.NoError(t, err)
err = km.RecoverAccountsFromMnemonic(ctx, mnemonic, "", 1)
require.NoError(t, err)
client := mock.NewMockBeaconNodeValidatorClient(ctrl)
v := validator{
validatorClient: client,
keyManager: km,
genesisTime: 1,
}
inactiveResp := generateMockStatusResponse([][]byte{inactivePubKey[:]})
inactiveResp.Statuses[0].Status.Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
inactiveClientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
&ethpb.ValidatorActivationRequest{
PublicKeys: [][]byte{inactivePubKey[:]},
},
).Return(inactiveClientStream, nil)
inactiveClientStream.EXPECT().Recv().Return(
inactiveResp,
nil,
).AnyTimes()
activeResp := generateMockStatusResponse([][]byte{inactivePubKey[:], activePubKey[:]})
activeResp.Statuses[0].Status.Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
activeResp.Statuses[1].Status.Status = ethpb.ValidatorStatus_ACTIVE
activeClientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
&ethpb.ValidatorActivationRequest{
PublicKeys: [][]byte{inactivePubKey[:], activePubKey[:]},
},
).Return(activeClientStream, nil)
activeClientStream.EXPECT().Recv().Return(
activeResp,
nil,
)
channel := make(chan struct{})
go func() {
// We add the active key into the keymanager and simulate a key refresh.
time.Sleep(time.Second * 1)
err = km.RecoverAccountsFromMnemonic(ctx, mnemonic, "", 2)
require.NoError(t, err)
channel <- struct{}{}
}()
assert.NoError(t, v.WaitForActivation(context.Background(), channel))
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
assert.LogsContain(t, hook, "Validator activated")
})
}

View File

@@ -13,6 +13,7 @@ go_library(
deps = [
"//proto/validator/accounts/v2:go_default_library",
"//shared/bls:go_default_library",
"//shared/event:go_default_library",
],
)

View File

@@ -13,6 +13,7 @@ go_library(
deps = [
"//proto/validator/accounts/v2:go_default_library",
"//shared/bls:go_default_library",
"//shared/event:go_default_library",
"//shared/promptutil:go_default_library",
"//shared/rand:go_default_library",
"//validator/accounts/iface:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/validator/accounts/iface"
"github.com/prysmaticlabs/prysm/validator/keymanager"
"github.com/prysmaticlabs/prysm/validator/keymanager/imported"
@@ -112,3 +113,10 @@ func (km *Keymanager) FetchValidatingPrivateKeys(ctx context.Context) ([][32]byt
func (km *Keymanager) DeleteAccounts(ctx context.Context, publicKeys [][]byte) error {
return km.importedKM.DeleteAccounts(ctx, publicKeys)
}
// SubscribeAccountChanges creates an event subscription for a channel
// to listen for public key changes at runtime, such as when new validator accounts
// are imported into the keymanager while the validator process is running.
func (dr *Keymanager) SubscribeAccountChanges(pubKeysChan chan [][48]byte) event.Subscription {
return dr.importedKM.SubscribeAccountChanges(pubKeysChan)
}

View File

@@ -17,6 +17,7 @@ go_library(
"//proto/validator/accounts/v2:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_logrusorgru_aurora//:go_default_library",
"@com_github_pkg_errors//:go_default_library",

View File

@@ -16,6 +16,7 @@ import (
validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/event"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
@@ -227,3 +228,11 @@ func (km *Keymanager) Sign(ctx context.Context, req *validatorpb.SignRequest) (b
}
return bls.SignatureFromBytes(resp.Signature)
}
// SubscribeAccountChanges is currently NOT IMPLEMENTED for the remote keymanager.
// INVOKING THIS FUNCTION HAS NO EFFECT!
func (k *Keymanager) SubscribeAccountChanges(_ chan [][48]byte) event.Subscription {
return event.NewSubscription(func(i <-chan struct{}) error {
return nil
})
}

View File

@@ -6,16 +6,19 @@ import (
validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/event"
)
// IKeymanager defines a general keymanager interface for Prysm wallets.
type IKeymanager interface {
// FetchValidatingKeys fetches the list of active public keys that should be used to validate with.
// FetchValidatingPublicKeys fetches the list of active public keys that should be used to validate with.
FetchValidatingPublicKeys(ctx context.Context) ([][48]byte, error)
// FetchAllValidatingKeys fetches the list of all public keys, including disabled ones.
// FetchAllValidatingPublicKeys fetches the list of all public keys, including disabled ones.
FetchAllValidatingPublicKeys(ctx context.Context) ([][48]byte, error)
// Sign signs a message using a validator key.
Sign(context.Context, *validatorpb.SignRequest) (bls.Signature, error)
// SubscribeAccountChanges subscribes to changes made to the underlying keys.
SubscribeAccountChanges(pubKeysChan chan [][48]byte) event.Subscription
}
// Keystore json file representation as a Go struct.