Improve wait for activation (#13448)

* removing timeout on wait for activation, instead switched to an event driven approach

* fixing unit tests

* linting

* simplifying return

* adding sleep for the remaining slot to avoid cpu spikes

* removing ifstatement on log

* removing ifstatement on log

* improving switch statement

* removing the loop entirely

* fixing unit test

* fixing manu's reported issue with deletion of json file

* missed change around writefile at path

* gofmt

* fixing deepsource issue with reading file

* trying to clean file to avoid deepsource issue

* still getting error trying a different approach

* fixing stream loop

* fixing unit test

* Update validator/keymanager/local/keymanager.go

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* fixing linting

---------

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
This commit is contained in:
james-prysm
2024-01-16 11:04:54 -06:00
committed by GitHub
parent 46387a903a
commit 790a09f9b1
11 changed files with 161 additions and 160 deletions

View File

@@ -23,7 +23,7 @@ type Wallet interface {
// Read methods for important wallet and accounts-related files.
ReadFileAtPath(ctx context.Context, filePath string, fileName string) ([]byte, error)
// Write methods to persist important wallet and accounts-related files to disk.
WriteFileAtPath(ctx context.Context, pathName string, fileName string, data []byte) error
WriteFileAtPath(ctx context.Context, pathName string, fileName string, data []byte) (bool, error)
// Method for initializing a new keymanager.
InitializeKeymanager(ctx context.Context, cfg InitKeymanagerConfig) (keymanager.IKeymanager, error)
}

View File

@@ -55,19 +55,19 @@ func (w *Wallet) Password() string {
}
// WriteFileAtPath --
func (w *Wallet) WriteFileAtPath(_ context.Context, pathName, fileName string, data []byte) error {
func (w *Wallet) WriteFileAtPath(_ context.Context, pathName, fileName string, data []byte) (bool, error) {
w.lock.Lock()
defer w.lock.Unlock()
if w.HasWriteFileError {
// reset the flag to not contaminate other tests
w.HasWriteFileError = false
return errors.New("could not write keystore file for accounts")
return false, errors.New("could not write keystore file for accounts")
}
if w.Files[pathName] == nil {
w.Files[pathName] = make(map[string][]byte)
}
w.Files[pathName][fileName] = data
return nil
return true, nil
}
// ReadFileAtPath --

View File

@@ -366,26 +366,27 @@ func (w *Wallet) InitializeKeymanager(ctx context.Context, cfg iface.InitKeymana
}
// WriteFileAtPath within the wallet directory given the desired path, filename, and raw data.
func (w *Wallet) WriteFileAtPath(_ context.Context, filePath, fileName string, data []byte) error {
func (w *Wallet) WriteFileAtPath(_ context.Context, filePath, fileName string, data []byte) (bool /* exited previously */, error) {
accountPath := filepath.Join(w.accountsPath, filePath)
hasDir, err := file.HasDir(accountPath)
if err != nil {
return err
return false, err
}
if !hasDir {
if err := file.MkdirAll(accountPath); err != nil {
return errors.Wrapf(err, "could not create path: %s", accountPath)
return false, errors.Wrapf(err, "could not create path: %s", accountPath)
}
}
fullPath := filepath.Join(accountPath, fileName)
existedPreviously := file.Exists(fullPath)
if err := file.WriteFile(fullPath, data); err != nil {
return errors.Wrapf(err, "could not write %s", filePath)
return false, errors.Wrapf(err, "could not write %s", filePath)
}
log.WithFields(logrus.Fields{
"path": fullPath,
"fileName": fileName,
}).Debug("Wrote new file at path")
return nil
return existedPreviously, nil
}
// ReadFileAtPath within the wallet directory given the desired path and filename.

View File

@@ -32,7 +32,8 @@ func (acm *CLIManager) WalletCreate(ctx context.Context) (*wallet.Wallet, error)
if err != nil {
return nil, err
}
if err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts); err != nil {
_, err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts)
if err != nil {
return nil, err
}
log.WithField("--wallet-dir", acm.walletDir).Info(

View File

@@ -48,10 +48,5 @@ func (v *validator) HandleKeyReload(ctx context.Context, currentKeys [][fieldpar
valCount = int64(valCounts[0].Count)
}
anyActive = v.checkAndLogValidatorStatus(statuses, valCount)
if anyActive {
logActiveValidatorStatus(statuses)
}
return anyActive, nil
return v.checkAndLogValidatorStatus(statuses, valCount), nil
}

View File

@@ -54,14 +54,13 @@ import (
// keyFetchPeriod is the frequency that we try to refetch validating keys
// in case no keys were fetched previously.
var (
keyRefetchPeriod = 30 * time.Second
ErrBuilderValidatorRegistration = errors.New("Builder API validator registration unsuccessful")
ErrValidatorsAllExited = errors.New("All validators are exited, no more work to perform...")
)
var (
msgCouldNotFetchKeys = "could not fetch validating keys"
msgNoKeysFetched = "No validating keys fetched. Trying again"
msgNoKeysFetched = "No validating keys fetched. Waiting for keys..."
)
type validator struct {
@@ -403,6 +402,10 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti
}
case ethpb.ValidatorStatus_ACTIVE, ethpb.ValidatorStatus_EXITING:
validatorActivated = true
log.WithFields(logrus.Fields{
"publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.publicKey)),
"index": status.index,
}).Info("Validator activated")
case ethpb.ValidatorStatus_EXITED:
log.Info("Validator exited")
case ethpb.ValidatorStatus_INVALID:
@@ -416,18 +419,6 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti
return validatorActivated
}
func logActiveValidatorStatus(statuses []*validatorStatus) {
for _, s := range statuses {
if s.status.Status != ethpb.ValidatorStatus_ACTIVE {
continue
}
log.WithFields(logrus.Fields{
"publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(s.publicKey)),
"index": s.index,
}).Info("Validator activated")
}
}
// CanonicalHeadSlot returns the slot of canonical block currently found in the
// beacon chain via RPC.
func (v *validator) CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error) {

View File

@@ -388,43 +388,6 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) {
require.LogsContain(t, hook, "Validator activated")
}
func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
validatorClient := validatormock.NewMockValidatorClient(ctrl)
beaconClient := validatormock.NewMockBeaconChainClient(ctrl)
prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl)
kp := randKeypair(t)
v := validator{
validatorClient: validatorClient,
keyManager: newMockKeymanager(t, kp),
beaconClient: beaconClient,
prysmBeaconClient: prysmBeaconClient,
}
resp := generateMockStatusResponse([][]byte{kp.pub[:]})
resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE
clientStream := mock2.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)
validatorClient.EXPECT().WaitForActivation(
gomock.Any(),
gomock.Any(),
).Return(clientStream, nil)
prysmBeaconClient.EXPECT().GetValidatorCount(
gomock.Any(),
"head",
[]validatorType.Status{validatorType.Active},
).Return([]iface.ValidatorCount{}, nil).Times(2)
clientStream.EXPECT().Recv().Return(
&ethpb.ValidatorActivationResponse{},
nil,
)
clientStream.EXPECT().Recv().Return(
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background(), nil), "Could not wait for activation")
}
func TestWaitSync_ContextCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

View File

@@ -5,17 +5,14 @@ import (
"io"
"time"
validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
"github.com/pkg/errors"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/math"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
"go.opencensus.io/trace"
)
@@ -33,18 +30,18 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c
if err != nil {
return err
}
// subscribe to the channel if it's the first time
sub := km.SubscribeAccountChanges(accountsChangedChan)
defer func() {
sub.Unsubscribe()
close(accountsChangedChan)
}()
}
return v.internalWaitForActivation(ctx, accountsChangedChan)
}
// internalWaitForActivation performs the following:
// 1) While the key manager is empty, poll the key manager until some validator keys exist.
// 1) While the key manager is empty, subscribe to keymanager changes until some validator keys exist.
// 2) Open a server side stream for activation events against the given keys.
// 3) In another go routine, the key manager is monitored for updates and emits an update event on
// the accountsChangedChan. When an event signal is received, restart the internalWaitForActivation routine.
@@ -53,39 +50,26 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c
func (v *validator) internalWaitForActivation(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error {
ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
defer span.End()
validatingKeys, err := v.keyManager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, "could not fetch validating keys")
return errors.Wrap(err, msgCouldNotFetchKeys)
}
// if there are no validating keys, wait for some
if len(validatingKeys) == 0 {
log.Warn(msgNoKeysFetched)
ticker := time.NewTicker(keyRefetchPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
validatingKeys, err = v.keyManager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, msgCouldNotFetchKeys)
}
if len(validatingKeys) == 0 {
log.Warn(msgNoKeysFetched)
continue
}
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
}
break
select {
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
case <-accountsChangedChan:
// if the accounts changed try it again
return v.internalWaitForActivation(ctx, accountsChangedChan)
}
}
req := &ethpb.ValidatorActivationRequest{
stream, err := v.validatorClient.WaitForActivation(ctx, &ethpb.ValidatorActivationRequest{
PublicKeys: bytesutil.FromBytes48Array(validatingKeys),
}
stream, err := v.validatorClient.WaitForActivation(ctx, req)
})
if err != nil {
tracing.AnnotateError(span, err)
attempts := streamAttempts(ctx)
@@ -96,22 +80,17 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang
return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan)
}
if err = v.handleAccountsChanged(ctx, accountsChangedChan, &stream, span); err != nil {
return err
}
v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)
return nil
}
func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte, stream *ethpb.BeaconNodeValidator_WaitForActivationClient, span *trace.Span) error {
for {
someAreActive := false
for !someAreActive {
select {
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
case <-accountsChangedChan:
// Accounts (keys) changed, restart the process.
return v.internalWaitForActivation(ctx, accountsChangedChan)
default:
res, err := (*stream).Recv()
res, err := (stream).Recv() // retrieve from stream one loop at a time
// If the stream is closed, we stop the loop.
if errors.Is(err, io.EOF) {
break
@@ -150,15 +129,10 @@ func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedCh
valCount = int64(valCounts[0].Count)
}
valActivated := v.checkAndLogValidatorStatus(statuses, valCount)
if valActivated {
logActiveValidatorStatus(statuses)
} else {
continue
}
someAreActive = v.checkAndLogValidatorStatus(statuses, valCount)
}
break
}
return nil
}

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/prysmaticlabs/prysm/v4/config/params"
validatorType "github.com/prysmaticlabs/prysm/v4/consensus-types/validator"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
@@ -39,7 +40,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) {
beaconClient: beaconClient,
}
clientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)
ctx, cancel := context.WithCancel(context.Background())
validatorClient.EXPECT().WaitForActivation(
gomock.Any(),
&ethpb.ValidatorActivationRequest{
@@ -49,9 +50,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) {
clientStream.EXPECT().Recv().Return(
&ethpb.ValidatorActivationResponse{},
nil,
)
ctx, cancel := context.WithCancel(context.Background())
cancel()
).Do(func() { cancel() })
assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx, nil))
}
@@ -193,12 +192,11 @@ func TestWaitForActivation_Exiting(t *testing.T) {
}
func TestWaitForActivation_RefetchKeys(t *testing.T) {
originalPeriod := keyRefetchPeriod
defer func() {
keyRefetchPeriod = originalPeriod
}()
keyRefetchPeriod = 1 * time.Second
params.SetupTestConfigCleanup(t)
cfg := params.MainnetConfig().Copy()
cfg.ConfigName = "test"
cfg.SecondsPerSlot = 1
params.OverrideBeaconConfig(cfg)
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -207,8 +205,7 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) {
prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl)
kp := randKeypair(t)
km := newMockKeymanager(t, kp)
km.fetchNoKeys = true
km := newMockKeymanager(t)
v := validator{
validatorClient: validatorClient,
@@ -233,7 +230,19 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) {
clientStream.EXPECT().Recv().Return(
resp,
nil)
assert.NoError(t, v.internalWaitForActivation(context.Background(), make(chan [][fieldparams.BLSPubkeyLength]byte)), "Could not wait for activation")
accountChan := make(chan [][fieldparams.BLSPubkeyLength]byte)
sub := km.SubscribeAccountChanges(accountChan)
defer func() {
sub.Unsubscribe()
close(accountChan)
}()
// update the accounts after a delay
go func() {
time.Sleep(2 * time.Second)
require.NoError(t, km.add(kp))
km.SimulateAccountChanges([][48]byte{kp.pub})
}()
assert.NoError(t, v.internalWaitForActivation(context.Background(), accountChan), "Could not wait for activation")
assert.LogsContain(t, hook, msgNoKeysFetched)
assert.LogsContain(t, hook, "Validator activated")
}
@@ -265,7 +274,11 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
&ethpb.ValidatorActivationRequest{
PublicKeys: [][]byte{inactive.pub[:]},
},
).Return(inactiveClientStream, nil)
).DoAndReturn(func(ctx context.Context, in *ethpb.ValidatorActivationRequest) (*mock.MockBeaconNodeValidator_WaitForActivationClient, error) {
//delay a bit so that other key can be added
time.Sleep(time.Second * 2)
return inactiveClientStream, nil
})
prysmBeaconClient.EXPECT().GetValidatorCount(
gomock.Any(),
"head",
@@ -353,7 +366,11 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
&ethpb.ValidatorActivationRequest{
PublicKeys: [][]byte{inactivePubKey[:]},
},
).Return(inactiveClientStream, nil)
).DoAndReturn(func(ctx context.Context, in *ethpb.ValidatorActivationRequest) (*mock.MockBeaconNodeValidator_WaitForActivationClient, error) {
//delay a bit so that other key can be added
time.Sleep(time.Second * 2)
return inactiveClientStream, nil
})
prysmBeaconClient.EXPECT().GetValidatorCount(
gomock.Any(),
"head",
@@ -393,3 +410,40 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
assert.LogsContain(t, hook, "Validator activated")
})
}
func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
validatorClient := validatormock.NewMockValidatorClient(ctrl)
beaconClient := validatormock.NewMockBeaconChainClient(ctrl)
prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl)
kp := randKeypair(t)
v := validator{
validatorClient: validatorClient,
keyManager: newMockKeymanager(t, kp),
beaconClient: beaconClient,
prysmBeaconClient: prysmBeaconClient,
}
resp := generateMockStatusResponse([][]byte{kp.pub[:]})
resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE
clientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)
validatorClient.EXPECT().WaitForActivation(
gomock.Any(),
gomock.Any(),
).Return(clientStream, nil)
prysmBeaconClient.EXPECT().GetValidatorCount(
gomock.Any(),
"head",
[]validatorType.Status{validatorType.Active},
).Return([]iface.ValidatorCount{}, nil).Times(2)
clientStream.EXPECT().Recv().Return(
&ethpb.ValidatorActivationResponse{},
nil,
)
clientStream.EXPECT().Recv().Return(
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background(), nil), "Could not wait for activation")
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"strings"
"sync"
@@ -282,18 +283,29 @@ func (km *Keymanager) SaveStoreAndReInitialize(ctx context.Context, store *accou
if err != nil {
return err
}
if err := km.wallet.WriteFileAtPath(ctx, AccountsPath, AccountsKeystoreFileName, encodedAccounts); err != nil {
existedPreviously, err := km.wallet.WriteFileAtPath(ctx, AccountsPath, AccountsKeystoreFileName, encodedAccounts)
if err != nil {
return err
}
// Reinitialize account store and cache
// This will update the in-memory information instead of reading from the file itself for safety concerns
km.accountsStore = store
err = km.initializeKeysCachesFromKeystore()
if err != nil {
return errors.Wrap(err, "failed to initialize keys caches")
if existedPreviously {
// Reinitialize account store and cache
// This will update the in-memory information instead of reading from the file itself for safety concerns
km.accountsStore = store
err = km.initializeKeysCachesFromKeystore()
if err != nil {
return errors.Wrap(err, "failed to initialize keys caches")
}
return nil
}
return err
// manually reload the account from the keystore the first time
km.reloadAccountsFromKeystoreFile(filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName))
// listen to account changes of the new file
go km.listenForAccountChanges(ctx)
return nil
}
// CreateAccountsKeystoreRepresentation is a pure function that takes an accountStore and wallet password and returns the encrypted formatted json version for local writing.

View File

@@ -26,6 +26,7 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) {
debounceFileChangesInterval := features.Get().KeystoreImportDebounceInterval
accountsFilePath := filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName)
if !file.Exists(accountsFilePath) {
log.Warnf("Starting without accounts located in wallet at %s", accountsFilePath)
return
}
watcher, err := fsnotify.NewWatcher()
@@ -56,27 +57,7 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) {
log.Errorf("Type %T is not a valid file system event", event)
return
}
fileBytes, err := os.ReadFile(ev.Name)
if err != nil {
log.WithError(err).Errorf("Could not read file at path: %s", ev.Name)
return
}
if fileBytes == nil {
log.WithError(err).Errorf("Loaded in an empty file: %s", ev.Name)
return
}
accountsKeystore := &AccountsKeystoreRepresentation{}
if err := json.Unmarshal(fileBytes, accountsKeystore); err != nil {
log.WithError(
err,
).Errorf("Could not read valid, EIP-2335 keystore json file at path: %s", ev.Name)
return
}
if err := km.reloadAccountsFromKeystore(accountsKeystore); err != nil {
log.WithError(
err,
).Error("Could not replace the accounts store from keystore file")
}
km.reloadAccountsFromKeystoreFile(ev.Name)
})
for {
select {
@@ -92,6 +73,34 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) {
}
}
func (km *Keymanager) reloadAccountsFromKeystoreFile(accountsFilePath string) {
if km.wallet == nil {
log.Error("Could not reload accounts because wallet was undefined")
return
}
fileBytes, err := os.ReadFile(filepath.Clean(accountsFilePath))
if err != nil {
log.WithError(err).Errorf("Could not read file at path: %s", accountsFilePath)
return
}
if fileBytes == nil {
log.WithError(err).Errorf("Loaded in an empty file: %s", accountsFilePath)
return
}
accountsKeystore := &AccountsKeystoreRepresentation{}
if err := json.Unmarshal(fileBytes, accountsKeystore); err != nil {
log.WithError(
err,
).Errorf("Could not read valid, EIP-2335 keystore json file at path: %s", accountsFilePath)
return
}
if err := km.reloadAccountsFromKeystore(accountsKeystore); err != nil {
log.WithError(
err,
).Error("Could not replace the accounts store from keystore file")
}
}
// Replaces the accounts store struct in the local keymanager with
// the contents of a keystore file by decrypting it with the accounts password.
func (km *Keymanager) reloadAccountsFromKeystore(keystore *AccountsKeystoreRepresentation) error {
@@ -107,6 +116,7 @@ func (km *Keymanager) reloadAccountsFromKeystore(keystore *AccountsKeystoreRepre
if len(newAccountsStore.PublicKeys) != len(newAccountsStore.PrivateKeys) {
return errors.New("number of public and private keys in keystore do not match")
}
pubKeys := make([][fieldparams.BLSPubkeyLength]byte, len(newAccountsStore.PublicKeys))
for i := 0; i < len(newAccountsStore.PrivateKeys); i++ {
privKey, err := bls.SecretKeyFromBytes(newAccountsStore.PrivateKeys[i])