diff --git a/deps.bzl b/deps.bzl index fd114ece03..77dc983932 100644 --- a/deps.bzl +++ b/deps.bzl @@ -3614,3 +3614,9 @@ def prysm_deps(): sum = "h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=", version = "v0.7.1", ) + go_repository( + name = "com_github_go_fsnotify_fsnotify", + importpath = "github.com/go-fsnotify/fsnotify", + sum = "h1:PeVNzgTRtWGm6fVic5i21t+n5ptPGCZuMcSPVMyTWjs=", + version = "v0.0.0-20180321022601-755488143dae", + ) diff --git a/go.mod b/go.mod index bf5419be85..0b2e2b0798 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/fatih/color v1.9.0 // indirect github.com/ferranbt/fastssz v0.0.0-20200826142241-3a913c5a1313 github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 + github.com/fsnotify/fsnotify v1.4.7 github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect github.com/ghodss/yaml v1.0.0 github.com/go-yaml/yaml v2.1.0+incompatible diff --git a/shared/asyncutil/BUILD.bazel b/shared/asyncutil/BUILD.bazel new file mode 100644 index 0000000000..22d5a0c75d --- /dev/null +++ b/shared/asyncutil/BUILD.bazel @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") +load("@prysm//tools/go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["debounce.go"], + importpath = "github.com/prysmaticlabs/prysm/shared/asyncutil", + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["debounce_test.go"], + embed = [":go_default_library"], +) diff --git a/shared/asyncutil/debounce.go b/shared/asyncutil/debounce.go new file mode 100644 index 0000000000..8194d72c59 --- /dev/null +++ b/shared/asyncutil/debounce.go @@ -0,0 +1,29 @@ +package asyncutil + +import ( + "context" + "time" +) + +// Debounce events fired over a channel by a specified duration, ensuring no events +// are handled until a certain interval of time has passed. +func Debounce(ctx context.Context, interval time.Duration, eventsChan <-chan interface{}, handler func(interface{})) { + for event := range eventsChan { + loop: + for { + // If an event is received, wait the specified interval before calling the + // handler. + // If another event is received before the interval has passed, store + // it and reset the timer. + select { + // Do nothing until we can handle the events after the debounce interval. + case event = <-eventsChan: + case <-time.After(interval): + handler(event) + break loop + case <-ctx.Done(): + return + } + } + } +} diff --git a/shared/asyncutil/debounce_test.go b/shared/asyncutil/debounce_test.go new file mode 100644 index 0000000000..012a656541 --- /dev/null +++ b/shared/asyncutil/debounce_test.go @@ -0,0 +1,27 @@ +package asyncutil + +import ( + "context" + "testing" + "time" +) + +func TestDebounce(t *testing.T) { + eventsChan := make(chan interface{}, 100) + ctx, cancel := context.WithCancel(context.Background()) + interval := time.Second + timesHandled := 0 + go Debounce(ctx, interval, eventsChan, func(event interface{}) { + timesHandled++ + }) + for i := 0; i < 100; i++ { + eventsChan <- struct{}{} + } + time.Sleep(interval) + cancel() + // We should expect 100 rapid fire changes to only have caused + // 1 handler to trigger after the debouncing period. + if timesHandled != 1 { + t.Errorf("Expected 1 handler call, received %d", timesHandled) + } +} diff --git a/validator/BUILD.bazel b/validator/BUILD.bazel index aed333489c..63bc390903 100644 --- a/validator/BUILD.bazel +++ b/validator/BUILD.bazel @@ -14,7 +14,6 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/validator", visibility = ["//validator:__subpackages__"], deps = [ - "//shared/bytesutil:go_default_library", "//shared/cmd:go_default_library", "//shared/debug:go_default_library", "//shared/featureconfig:go_default_library", diff --git a/validator/accounts/v2/BUILD.bazel b/validator/accounts/v2/BUILD.bazel index a864341ed4..5e244b1f19 100644 --- a/validator/accounts/v2/BUILD.bazel +++ b/validator/accounts/v2/BUILD.bazel @@ -29,6 +29,7 @@ go_library( deps = [ "//shared/bls:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/event:go_default_library", "//shared/featureconfig:go_default_library", "//shared/fileutil:go_default_library", "//shared/params:go_default_library", diff --git a/validator/accounts/v2/cmd_accounts.go b/validator/accounts/v2/cmd_accounts.go index 1777ad4dfc..0ae2a604f6 100644 --- a/validator/accounts/v2/cmd_accounts.go +++ b/validator/accounts/v2/cmd_accounts.go @@ -83,6 +83,7 @@ this command outputs a deposit data string which is required to become a validat "list of hex string public keys", Flags: []cli.Flag{ flags.WalletDirFlag, + flags.WalletPasswordFileFlag, flags.BackupDirFlag, flags.BackupPublicKeysFlag, flags.BackupPasswordFile, diff --git a/validator/accounts/v2/wallet.go b/validator/accounts/v2/wallet.go index 5cdf650330..f1e3fe9b5a 100644 --- a/validator/accounts/v2/wallet.go +++ b/validator/accounts/v2/wallet.go @@ -11,6 +11,7 @@ import ( "github.com/gofrs/flock" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/fileutil" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/promptutil" @@ -67,12 +68,13 @@ type WalletConfig struct { // and providing secure access to eth2 secrets depending on an // associated keymanager (either direct, derived, or remote signing enabled). type Wallet struct { - walletDir string - accountsPath string - configFilePath string - walletPassword string - walletFileLock *flock.Flock - keymanagerKind v2keymanager.Kind + walletDir string + accountsPath string + configFilePath string + walletPassword string + walletFileLock *flock.Flock + keymanagerKind v2keymanager.Kind + accountsChangedFeed *event.Feed } // WalletExists check if a wallet at the specified directory diff --git a/validator/client/BUILD.bazel b/validator/client/BUILD.bazel index 8cc64f965d..7794f02730 100644 --- a/validator/client/BUILD.bazel +++ b/validator/client/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//validator/db:go_default_library", "//validator/keymanager/v1:go_default_library", "//validator/keymanager/v2:go_default_library", + "//validator/keymanager/v2/direct:go_default_library", "//validator/slashing-protection:go_default_library", "@com_github_dgraph_io_ristretto//:go_default_library", "@com_github_ferranbt_fastssz//:go_default_library", diff --git a/validator/client/service.go b/validator/client/service.go index fbb1f36f99..2264f28e5f 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -24,6 +24,7 @@ import ( "github.com/prysmaticlabs/prysm/validator/db" keymanager "github.com/prysmaticlabs/prysm/validator/keymanager/v1" v2 "github.com/prysmaticlabs/prysm/validator/keymanager/v2" + "github.com/prysmaticlabs/prysm/validator/keymanager/v2/direct" slashingprotection "github.com/prysmaticlabs/prysm/validator/slashing-protection" "github.com/sirupsen/logrus" "go.opencensus.io/plugin/ocgrpc" @@ -51,7 +52,6 @@ type ValidatorService struct { logValidatorBalances bool emitAccountMetrics bool maxCallRecvMsgSize int - validatingPubKeys [][48]byte grpcRetries uint grpcRetryDelay time.Duration grpcHeaders []string @@ -64,7 +64,6 @@ type Config struct { DataDir string CertFlag string GraffitiFlag string - ValidatingPubKeys [][48]byte KeyManager keymanager.KeyManager KeyManagerV2 v2.IKeymanager LogValidatorBalances bool @@ -91,7 +90,6 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e graffiti: []byte(cfg.GraffitiFlag), keyManager: cfg.KeyManager, keyManagerV2: cfg.KeyManagerV2, - validatingPubKeys: cfg.ValidatingPubKeys, logValidatorBalances: cfg.LogValidatorBalances, emitAccountMetrics: cfg.EmitAccountMetrics, maxCallRecvMsgSize: cfg.GrpcMaxCallRecvMsgSizeFlag, @@ -169,7 +167,26 @@ func (v *ValidatorService) Start() { protector: v.protector, voteStats: voteStats{startEpoch: ^uint64(0)}, } + var validatingKeys [][48]byte go run(v.ctx, v.validator) + if featureconfig.Get().EnableAccountsV2 { + validatingKeys, err = v.keyManagerV2.FetchValidatingPublicKeys(v.ctx) + if err != nil { + log.WithError(err).Debug("Could not fetch validating keys") + } + if err := v.db.UpdatePublicKeysBuckets(validatingKeys); err != nil { + log.WithError(err).Debug("Could not update public keys buckets") + } + go recheckValidatingKeysBucket(v.ctx, v.db, v.keyManagerV2) + } else { + validatingKeys, err = v.keyManager.FetchValidatingKeys() + if err != nil { + log.WithError(err).Debug("Could not fetch validating keys") + } + if err := v.db.UpdatePublicKeysBuckets(validatingKeys); err != nil { + log.WithError(err).Debug("Could not update public keys buckets") + } + } } // Stop the validator service. @@ -293,6 +310,30 @@ func ConstructDialOptions( return dialOpts } +// Reloads the validating keys upon receiving an event over a feed subscription +// to accounts changes in the keymanager, then updates those keys' +// buckets in bolt DB if a bucket for a key does not exist. +func recheckValidatingKeysBucket(ctx context.Context, valDB db.Database, km v2.IKeymanager) { + directKeymanager, ok := km.(*direct.Keymanager) + if !ok { + return + } + validatingPubKeysChan := make(chan [][48]byte, 1) + sub := directKeymanager.SubscribeAccountChanges(validatingPubKeysChan) + defer sub.Unsubscribe() + for { + select { + case keys := <-validatingPubKeysChan: + if err := valDB.UpdatePublicKeysBuckets(keys); err != nil { + log.WithError(err).Debug("Could not update public keys buckets") + continue + } + case <-ctx.Done(): + return + } + } +} + // ValidatorBalances returns the validator balances mapping keyed by public keys. func (v *ValidatorService) ValidatorBalances(ctx context.Context) map[[48]byte]uint64 { return v.validator.BalancesByPubkeys(ctx) diff --git a/validator/db/iface/interface.go b/validator/db/iface/interface.go index cd665810ac..5b24e7c37d 100644 --- a/validator/db/iface/interface.go +++ b/validator/db/iface/interface.go @@ -14,6 +14,7 @@ type ValidatorDB interface { io.Closer DatabasePath() string ClearDB() error + UpdatePublicKeysBuckets(publicKeys [][48]byte) error // Proposer protection related methods. ProposalHistoryForEpoch(ctx context.Context, publicKey []byte, epoch uint64) (bitfield.Bitlist, error) SaveProposalHistoryForEpoch(ctx context.Context, publicKey []byte, epoch uint64, history bitfield.Bitlist) error diff --git a/validator/db/kv/db.go b/validator/db/kv/db.go index 70344d54cd..f4f39c8914 100644 --- a/validator/db/kv/db.go +++ b/validator/db/kv/db.go @@ -83,7 +83,7 @@ func NewKVStore(dirPath string, pubKeys [][48]byte) (*Store, error) { } // Initialize the required public keys into the DB to ensure they're not empty. - if err := kv.initializeSubBuckets(pubKeys); err != nil { + if err := kv.UpdatePublicKeysBuckets(pubKeys); err != nil { return nil, err } diff --git a/validator/db/kv/proposal_history.go b/validator/db/kv/proposal_history.go index e3a20321ae..95fef33833 100644 --- a/validator/db/kv/proposal_history.go +++ b/validator/db/kv/proposal_history.go @@ -61,6 +61,19 @@ func (store *Store) SaveProposalHistoryForEpoch(ctx context.Context, pubKey []by return err } +// UpdatePublicKeysBuckets for a specified list of keys. +func (store *Store) UpdatePublicKeysBuckets(pubKeys [][48]byte) error { + return store.update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(historicProposalsBucket) + for _, pubKey := range pubKeys { + if _, err := bucket.CreateBucketIfNotExists(pubKey[:]); err != nil { + return errors.Wrap(err, "failed to create proposal history bucket") + } + } + return nil + }) +} + func pruneProposalHistory(valBucket *bolt.Bucket, newestEpoch uint64) error { c := valBucket.Cursor() for k, _ := c.First(); k != nil; k, _ = c.First() { @@ -77,15 +90,3 @@ func pruneProposalHistory(valBucket *bolt.Bucket, newestEpoch uint64) error { } return nil } - -func (store *Store) initializeSubBuckets(pubKeys [][48]byte) error { - return store.update(func(tx *bolt.Tx) error { - bucket := tx.Bucket(historicProposalsBucket) - for _, pubKey := range pubKeys { - if _, err := bucket.CreateBucketIfNotExists(pubKey[:]); err != nil { - return errors.Wrap(err, "failed to create proposal history bucket") - } - } - return nil - }) -} diff --git a/validator/keymanager/v2/direct/BUILD.bazel b/validator/keymanager/v2/direct/BUILD.bazel index 7f543c0809..bd21ecd30f 100644 --- a/validator/keymanager/v2/direct/BUILD.bazel +++ b/validator/keymanager/v2/direct/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "direct.go", "doc.go", "import.go", + "refresh.go", ], importpath = "github.com/prysmaticlabs/prysm/validator/keymanager/v2/direct", visibility = [ @@ -17,15 +18,19 @@ go_library( deps = [ "//beacon-chain/core/helpers:go_default_library", "//proto/validator/accounts/v2:go_default_library", + "//shared/asyncutil:go_default_library", "//shared/bls:go_default_library", "//shared/bytesutil:go_default_library", "//shared/depositutil:go_default_library", + "//shared/event:go_default_library", + "//shared/fileutil:go_default_library", "//shared/interop:go_default_library", "//shared/params:go_default_library", "//shared/petnames:go_default_library", "//shared/promptutil:go_default_library", "//validator/accounts/v2/iface:go_default_library", "//validator/keymanager/v2:go_default_library", + "@com_github_fsnotify_fsnotify//:go_default_library", "@com_github_google_uuid//:go_default_library", "@com_github_k0kubun_go_ansi//:go_default_library", "@com_github_logrusorgru_aurora//:go_default_library", @@ -42,12 +47,14 @@ go_test( "backup_test.go", "direct_test.go", "import_test.go", + "refresh_test.go", ], embed = [":go_default_library"], deps = [ "//proto/validator/accounts/v2:go_default_library", "//shared/bls:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/event:go_default_library", "//shared/testutil/assert:go_default_library", "//shared/testutil/require:go_default_library", "//validator/accounts/v2/testing:go_default_library", diff --git a/validator/keymanager/v2/direct/direct.go b/validator/keymanager/v2/direct/direct.go index 65602b5aac..1ba281900b 100644 --- a/validator/keymanager/v2/direct/direct.go +++ b/validator/keymanager/v2/direct/direct.go @@ -19,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/depositutil" + "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/interop" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/petnames" @@ -47,14 +48,16 @@ type KeymanagerOpts struct { // Keymanager implementation for direct keystores utilizing EIP-2335. type Keymanager struct { - wallet iface.Wallet - opts *KeymanagerOpts - keysCache map[[48]byte]bls.SecretKey - accountsStore *AccountStore - lock sync.RWMutex + wallet iface.Wallet + opts *KeymanagerOpts + keysCache map[[48]byte]bls.SecretKey + accountsStore *AccountStore + lock sync.RWMutex + accountsChangedFeed *event.Feed } -// AccountStore -- +// AccountStore defines a struct containing 1-to-1 corresponding +// private keys and public keys for eth2 validators. type AccountStore struct { PrivateKeys [][]byte `json:"private_keys"` PublicKeys [][]byte `json:"public_keys"` @@ -79,10 +82,11 @@ type SetupConfig struct { // NewKeymanager instantiates a new direct keymanager from configuration options. func NewKeymanager(ctx context.Context, cfg *SetupConfig) (*Keymanager, error) { k := &Keymanager{ - wallet: cfg.Wallet, - opts: cfg.Opts, - keysCache: make(map[[48]byte]bls.SecretKey), - accountsStore: &AccountStore{}, + wallet: cfg.Wallet, + opts: cfg.Opts, + keysCache: make(map[[48]byte]bls.SecretKey), + accountsStore: &AccountStore{}, + accountsChangedFeed: new(event.Feed), } // If the wallet has the capability of unlocking accounts using @@ -92,13 +96,18 @@ func NewKeymanager(ctx context.Context, cfg *SetupConfig) (*Keymanager, error) { if err := k.initializeSecretKeysCache(ctx); err != nil { return nil, errors.Wrap(err, "could not initialize keys cache") } + + // We begin a goroutine to listen for file changes to our + // all-accounts.keystore.json file in the wallet directory. + go k.listenForAccountChanges(ctx) return k, nil } // NewInteropKeymanager instantiates a new direct keymanager with the deterministically generated interop keys. func NewInteropKeymanager(ctx context.Context, offset uint64, numValidatorKeys uint64) (*Keymanager, error) { k := &Keymanager{ - keysCache: make(map[[48]byte]bls.SecretKey), + keysCache: make(map[[48]byte]bls.SecretKey), + accountsChangedFeed: new(event.Feed), } if numValidatorKeys == 0 { return k, nil @@ -156,6 +165,13 @@ func (opts *KeymanagerOpts) String() string { return b.String() } +// SubscribeAccountChanges creates an event subscription for a channel +// to listen for public key changes at runtime, such as when new validator accounts +// are imported into the keymanager while the validator process is running. +func (dr *Keymanager) SubscribeAccountChanges(pubKeysChan chan [][48]byte) event.Subscription { + return dr.accountsChangedFeed.Subscribe(pubKeysChan) +} + // ValidatingAccountNames for a direct keymanager. func (dr *Keymanager) ValidatingAccountNames() ([]string, error) { names := make([]string, len(dr.keysCache)) @@ -343,7 +359,7 @@ func (dr *Keymanager) initializeSecretKeysCache(ctx context.Context) error { if err != nil && strings.Contains(err.Error(), "invalid checksum") { // If the password fails for an individual account, we ask the user to input // that individual account's password until it succeeds. - enc, password, err = dr.askUntilPasswordConfirms(decryptor, keystoreFile) + enc, password, err = askUntilPasswordConfirms(decryptor, keystoreFile) if err != nil { return errors.Wrap(err, "could not confirm password via prompt") } @@ -379,7 +395,6 @@ func (dr *Keymanager) createAccountsKeystore( privateKeys [][]byte, publicKeys [][]byte, ) (*v2keymanager.Keystore, error) { - au := aurora.NewAurora(true) encryptor := keystorev4.New() id, err := uuid.NewRandom() if err != nil { @@ -410,7 +425,6 @@ func (dr *Keymanager) createAccountsKeystore( _, privKeyExists := existingPrivKeys[string(sk)] _, pubKeyExists := existingPubKeys[string(pk)] if privKeyExists || pubKeyExists { - fmt.Printf("Public key %#x already exists\n", au.BrightMagenta(bytesutil.Trunc(pk))) continue } dr.accountsStore.PublicKeys = append(dr.accountsStore.PublicKeys, pk) @@ -433,7 +447,7 @@ func (dr *Keymanager) createAccountsKeystore( }, nil } -func (dr *Keymanager) askUntilPasswordConfirms( +func askUntilPasswordConfirms( decryptor *keystorev4.Encryptor, keystore *v2keymanager.Keystore, ) ([]byte, string, error) { au := aurora.NewAurora(true) diff --git a/validator/keymanager/v2/direct/import.go b/validator/keymanager/v2/direct/import.go index 90197f9099..e131ed22c9 100644 --- a/validator/keymanager/v2/direct/import.go +++ b/validator/keymanager/v2/direct/import.go @@ -65,7 +65,7 @@ func (dr *Keymanager) attemptDecryptKeystore( if err != nil && strings.Contains(err.Error(), "invalid checksum") { // If the password fails for an individual account, we ask the user to input // that individual account's password until it succeeds. - privKeyBytes, password, err = dr.askUntilPasswordConfirms(enc, keystore) + privKeyBytes, password, err = askUntilPasswordConfirms(enc, keystore) if err != nil { return nil, nil, "", errors.Wrap(err, "could not confirm password via prompt") } diff --git a/validator/keymanager/v2/direct/refresh.go b/validator/keymanager/v2/direct/refresh.go new file mode 100644 index 0000000000..6a8e606e7a --- /dev/null +++ b/validator/keymanager/v2/direct/refresh.go @@ -0,0 +1,124 @@ +package direct + +import ( + "context" + "encoding/json" + "io/ioutil" + "path/filepath" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/shared/asyncutil" + "github.com/prysmaticlabs/prysm/shared/bls" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/fileutil" + v2keymanager "github.com/prysmaticlabs/prysm/validator/keymanager/v2" + keystorev4 "github.com/wealdtech/go-eth2-wallet-encryptor-keystorev4" +) + +var ( + debounceFileChangesInterval = time.Second +) + +// Listen for changes to the all-accounts.keystore.json file in our wallet +// to load in new keys we observe into our keymanager. This uses the fsnotify +// library to listen for file-system changes and debounces these events to +// ensure we can handle thousands of events fired in a short time-span. +func (dr *Keymanager) listenForAccountChanges(ctx context.Context) { + accountsFilePath := filepath.Join(dr.wallet.AccountsDir(), AccountsPath, accountsKeystoreFileName) + if !fileutil.FileExists(accountsFilePath) { + return + } + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.WithError(err).Error("Could not initialize file watcher") + return + } + defer func() { + if err := watcher.Close(); err != nil { + log.WithError(err).Error("Could not close file watcher") + } + }() + if err := watcher.Add(accountsFilePath); err != nil { + log.WithError(err).Errorf("Could not add file %s to file watcher", accountsFilePath) + return + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + fileChangesChan := make(chan interface{}, 100) + defer close(fileChangesChan) + + // We debounce events sent over the file changes channel by an interval + // to ensure we are not overwhelmed by a ton of events fired over the channel in + // a short span of time. + go asyncutil.Debounce(ctx, debounceFileChangesInterval, fileChangesChan, func(event interface{}) { + ev, ok := event.(fsnotify.Event) + if !ok { + log.Errorf("Type %T is not a valid file system event", event) + return + } + fileBytes, err := ioutil.ReadFile(ev.Name) + if err != nil { + log.WithError(err).Errorf("Could not read file at path: %s", ev.Name) + return + } + accountsKeystore := &v2keymanager.Keystore{} + if err := json.Unmarshal(fileBytes, accountsKeystore); err != nil { + log.WithError( + err, + ).Errorf("Could not read valid, EIP-2335 keystore json file at path: %s", ev.Name) + return + } + if err := dr.reloadAccountsFromKeystore(accountsKeystore); err != nil { + log.WithError( + err, + ).Error("Could not replace the accounts store from keystore file") + } + }) + for { + select { + case event := <-watcher.Events: + // If a file was modified, we attempt to read that file + // and parse it into our accounts store. + if event.Op&fsnotify.Write == fsnotify.Write { + fileChangesChan <- event + } + case err := <-watcher.Errors: + log.WithError(err).Errorf("Could not watch for file changes for: %s", accountsFilePath) + case <-ctx.Done(): + return + } + } +} + +// Replaces the accounts store struct in the direct keymanager with +// the contents of a keystore file by decrypting it with the accounts password. +func (dr *Keymanager) reloadAccountsFromKeystore(keystore *v2keymanager.Keystore) error { + decryptor := keystorev4.New() + encodedAccounts, err := decryptor.Decrypt(keystore.Crypto, dr.wallet.Password()) + if err != nil { + return errors.Wrapf(err, "could not decrypt keystore file with public key %s", keystore.Pubkey) + } + newAccountsStore := &AccountStore{} + if err := json.Unmarshal(encodedAccounts, newAccountsStore); err != nil { + return err + } + dr.lock.Lock() + defer dr.lock.Unlock() + dr.keysCache = make(map[[48]byte]bls.SecretKey) + dr.accountsStore = newAccountsStore + pubKeys := make([][48]byte, len(dr.accountsStore.PublicKeys)) + for i := 0; i < len(dr.accountsStore.PrivateKeys); i++ { + privKey, err := bls.SecretKeyFromBytes(dr.accountsStore.PrivateKeys[i]) + if err != nil { + return errors.Wrap(err, "could not initialize private key") + } + pubKeyBytes := privKey.PublicKey().Marshal() + pubKeys[i] = bytesutil.ToBytes48(pubKeyBytes) + dr.keysCache[bytesutil.ToBytes48(pubKeyBytes)] = privKey + } + log.Info("Reloaded validator keys into keymanager") + dr.accountsChangedFeed.Send(pubKeys) + return nil +} diff --git a/validator/keymanager/v2/direct/refresh_test.go b/validator/keymanager/v2/direct/refresh_test.go new file mode 100644 index 0000000000..4f97316e3a --- /dev/null +++ b/validator/keymanager/v2/direct/refresh_test.go @@ -0,0 +1,51 @@ +package direct + +import ( + "context" + "testing" + + "github.com/prysmaticlabs/prysm/shared/bls" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/event" + "github.com/prysmaticlabs/prysm/shared/testutil/assert" + "github.com/prysmaticlabs/prysm/shared/testutil/require" + mock "github.com/prysmaticlabs/prysm/validator/accounts/v2/testing" +) + +func TestDirectKeymanager_reloadAccountsFromKeystore(t *testing.T) { + password := "Passw03rdz293**%#2" + wallet := &mock.Wallet{ + Files: make(map[string]map[string][]byte), + AccountPasswords: make(map[string]string), + WalletPassword: password, + } + dr := &Keymanager{ + wallet: wallet, + keysCache: make(map[[48]byte]bls.SecretKey), + accountsChangedFeed: new(event.Feed), + } + + numAccounts := 20 + privKeys := make([][]byte, numAccounts) + pubKeys := make([][]byte, numAccounts) + for i := 0; i < numAccounts; i++ { + privKey := bls.RandKey() + privKeys[i] = privKey.Marshal() + pubKeys[i] = privKey.PublicKey().Marshal() + } + + accountsStore, err := dr.createAccountsKeystore(context.Background(), privKeys, pubKeys) + require.NoError(t, err) + require.NoError(t, dr.reloadAccountsFromKeystore(accountsStore)) + + // Check the key was added to the keys cache. + for _, keyBytes := range pubKeys { + _, ok := dr.keysCache[bytesutil.ToBytes48(keyBytes)] + require.Equal(t, true, ok) + } + + // Check the key was added to the global accounts store. + require.Equal(t, numAccounts, len(dr.accountsStore.PublicKeys)) + require.Equal(t, numAccounts, len(dr.accountsStore.PrivateKeys)) + assert.DeepEqual(t, dr.accountsStore.PublicKeys[0], pubKeys[0]) +} diff --git a/validator/main.go b/validator/main.go index a31241cc8d..34bb25122e 100644 --- a/validator/main.go +++ b/validator/main.go @@ -14,7 +14,6 @@ import ( joonix "github.com/joonix/log" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/cmd" "github.com/prysmaticlabs/prysm/shared/debug" "github.com/prysmaticlabs/prysm/shared/featureconfig" @@ -184,22 +183,15 @@ contract in order to activate the validator client`, Action: func(cliCtx *cli.Context) error { var err error var pubKeys [][]byte - if cliCtx.String(flags.KeyManager.Name) != "" { - pubKeysBytes48, success := node.ExtractPublicKeysFromKeymanager( - cliCtx, - nil, /* nil v1 keymanager */ - nil, /* nil v2 keymanager */ - ) - pubKeys, err = bytesutil.FromBytes48Array(pubKeysBytes48), success - } else { + if cliCtx.String(flags.KeyManager.Name) == "" { keystorePath, passphrase, err := v1.HandleEmptyKeystoreFlags(cliCtx, false /*confirmPassword*/) if err != nil { return err } pubKeys, err = v1.ExtractPublicKeysFromKeyStore(keystorePath, passphrase) - } - if err != nil { - return err + if err != nil { + return err + } } ctx, cancel := context.WithTimeout(context.Background(), connTimeout) defer cancel() diff --git a/validator/node/BUILD.bazel b/validator/node/BUILD.bazel index ffa14d221a..6aefa451d2 100644 --- a/validator/node/BUILD.bazel +++ b/validator/node/BUILD.bazel @@ -22,7 +22,6 @@ go_library( visibility = ["//validator:__subpackages__"], deps = [ "//shared:go_default_library", - "//shared/bytesutil:go_default_library", "//shared/cmd:go_default_library", "//shared/debug:go_default_library", "//shared/featureconfig:go_default_library", diff --git a/validator/node/node.go b/validator/node/node.go index 0ac31e3da6..9c3ce060e4 100644 --- a/validator/node/node.go +++ b/validator/node/node.go @@ -15,7 +15,6 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/shared" - "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/cmd" "github.com/prysmaticlabs/prysm/shared/debug" "github.com/prysmaticlabs/prysm/shared/featureconfig" @@ -120,22 +119,10 @@ func NewValidatorClient(cliCtx *cli.Context) (*ValidatorClient, error) { } } - pubKeys, err := ExtractPublicKeysFromKeymanager(cliCtx, keyManagerV1, keyManagerV2) - if err != nil { - return nil, err - } - if len(pubKeys) == 0 { - log.Error("No keys found. Please verify `--keystore-path` is the correct path") - } else { - log.WithField("validators", len(pubKeys)).Debug("Found validator keys") - for _, key := range pubKeys { - log.WithField("pubKey", fmt.Sprintf("%#x", bytesutil.Trunc(key[:]))).Info("Validating for public key") - } - } - clearFlag := cliCtx.Bool(cmd.ClearDB.Name) forceClearFlag := cliCtx.Bool(cmd.ForceClearDB.Name) dataDir := cliCtx.String(cmd.DataDirFlag.Name) + validatingPubKeys := make([][48]byte, 0) if clearFlag || forceClearFlag { if dataDir == "" { dataDir = cmd.DefaultDataDir() @@ -147,13 +134,21 @@ func NewValidatorClient(cliCtx *cli.Context) (*ValidatorClient, error) { } } - if err := clearDB(dataDir, pubKeys, forceClearFlag); err != nil { + if featureconfig.Get().EnableAccountsV2 { + validatingPubKeys, err = keyManagerV2.FetchValidatingPublicKeys(context.Background()) + } else { + validatingPubKeys, err = keyManagerV1.FetchValidatingKeys() + } + if err != nil { + return nil, err + } + if err := clearDB(dataDir, validatingPubKeys, forceClearFlag); err != nil { return nil, err } } log.WithField("databasePath", dataDir).Info("Checking DB") - valDB, err := kv.NewKVStore(dataDir, pubKeys) + valDB, err := kv.NewKVStore(dataDir, validatingPubKeys) if err != nil { return nil, errors.Wrap(err, "could not initialize db") } @@ -167,7 +162,7 @@ func NewValidatorClient(cliCtx *cli.Context) (*ValidatorClient, error) { return nil, err } } - if err := ValidatorClient.registerClientService(keyManagerV1, keyManagerV2, pubKeys); err != nil { + if err := ValidatorClient.registerClientService(keyManagerV1, keyManagerV2); err != nil { return nil, err } @@ -243,7 +238,6 @@ func (s *ValidatorClient) registerPrometheusService() error { func (s *ValidatorClient) registerClientService( keyManager v1.KeyManager, keyManagerV2 v2.IKeymanager, - validatingPubKeys [][48]byte, ) error { endpoint := s.cliCtx.String(flags.BeaconRPCProviderFlag.Name) dataDir := s.cliCtx.String(cmd.DataDirFlag.Name) @@ -268,7 +262,6 @@ func (s *ValidatorClient) registerClientService( EmitAccountMetrics: emitAccountMetrics, CertFlag: cert, GraffitiFlag: graffiti, - ValidatingPubKeys: validatingPubKeys, GrpcMaxCallRecvMsgSizeFlag: maxCallRecvMsgSize, GrpcRetriesFlag: grpcRetries, GrpcRetryDelay: grpcRetryDelay, @@ -435,17 +428,3 @@ func clearDB(dataDir string, pubkeys [][48]byte, force bool) error { return nil } - -// ExtractPublicKeysFromKeymanager extracts only the public keys from the specified key manager. -func ExtractPublicKeysFromKeymanager(cliCtx *cli.Context, keyManagerV1 v1.KeyManager, keyManagerV2 v2.IKeymanager) ([][48]byte, error) { - var pubKeys [][48]byte - var err error - if featureconfig.Get().EnableAccountsV2 { - pubKeys, err = keyManagerV2.FetchValidatingPublicKeys(context.Background()) - if err != nil { - return nil, errors.Wrap(err, "failed to obtain public keys for validation") - } - return pubKeys, nil - } - return keyManagerV1.FetchValidatingKeys() -}