mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Fix validator activation monitoring with inactive keys (#8558)
* refactor / move waiting for activation updates * Commentary * Update test to follow the full code path * gofmt and goimports * manual imports fixes * Apply suggestions from code review typo fixes * Remove redundant handleAccountsChanged and chan. Thanks @nisdas * var sub = to sub := Co-authored-by: Radosław Kapka <rkapka@wp.pl>
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"archive/zip"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
constant "github.com/prysmaticlabs/prysm/validator/testing"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -21,6 +20,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/validator/accounts/wallet"
|
||||
"github.com/prysmaticlabs/prysm/validator/keymanager"
|
||||
"github.com/prysmaticlabs/prysm/validator/keymanager/derived"
|
||||
constant "github.com/prysmaticlabs/prysm/validator/testing"
|
||||
)
|
||||
|
||||
func TestBackupAccounts_Noninteractive_Derived(t *testing.T) {
|
||||
|
||||
@@ -3,7 +3,6 @@ package accounts
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
constant "github.com/prysmaticlabs/prysm/validator/testing"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
@@ -28,6 +27,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/validator/keymanager/derived"
|
||||
"github.com/prysmaticlabs/prysm/validator/keymanager/imported"
|
||||
"github.com/prysmaticlabs/prysm/validator/keymanager/remote"
|
||||
constant "github.com/prysmaticlabs/prysm/validator/testing"
|
||||
keystorev4 "github.com/wealdtech/go-eth2-wallet-encryptor-keystorev4"
|
||||
)
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
|
||||
}
|
||||
|
||||
// WaitForActivation for mocking.
|
||||
func (fv *FakeValidator) WaitForActivation(_ context.Context, _ <-chan struct{}) error {
|
||||
func (fv *FakeValidator) WaitForActivation(_ context.Context) error {
|
||||
fv.WaitForActivationCalled++
|
||||
if fv.RetryTillSuccess >= fv.WaitForActivationCalled {
|
||||
return errConnectionIssue
|
||||
|
||||
@@ -27,7 +27,7 @@ type Validator interface {
|
||||
Done()
|
||||
WaitForChainStart(ctx context.Context) error
|
||||
WaitForSync(ctx context.Context) error
|
||||
WaitForActivation(ctx context.Context, accountsChangedChan <-chan struct{}) error
|
||||
WaitForActivation(ctx context.Context) error
|
||||
SlasherReady(ctx context.Context) error
|
||||
CanonicalHeadSlot(ctx context.Context) (types.Slot, error)
|
||||
NextSlot() <-chan types.Slot
|
||||
@@ -75,7 +75,6 @@ func run(ctx context.Context, v Validator) {
|
||||
|
||||
var headSlot types.Slot
|
||||
firstTime := true
|
||||
accountsChangedChan := make(chan struct{}, 1)
|
||||
for {
|
||||
if !firstTime {
|
||||
if ctx.Err() != nil {
|
||||
@@ -102,7 +101,7 @@ func run(ctx context.Context, v Validator) {
|
||||
if err != nil {
|
||||
log.Fatalf("Could not determine if beacon node synced: %v", err)
|
||||
}
|
||||
err = v.WaitForActivation(ctx, accountsChangedChan)
|
||||
err = v.WaitForActivation(ctx)
|
||||
if isConnectionError(err) {
|
||||
log.Warnf("Could not wait for validator activation: %v", err)
|
||||
continue
|
||||
@@ -121,7 +120,6 @@ func run(ctx context.Context, v Validator) {
|
||||
break
|
||||
}
|
||||
|
||||
go handleAccountsChanged(ctx, v, accountsChangedChan)
|
||||
connectionErrorChannel := make(chan error, 1)
|
||||
go v.ReceiveBlocks(ctx, connectionErrorChannel)
|
||||
if err := v.UpdateDuties(ctx, headSlot); err != nil {
|
||||
@@ -233,24 +231,3 @@ func handleAssignmentError(err error, slot types.Slot) {
|
||||
log.WithField("error", err).Error("Failed to update assignments")
|
||||
}
|
||||
}
|
||||
|
||||
func handleAccountsChanged(ctx context.Context, v Validator, accountsChangedChan chan<- struct{}) {
|
||||
validatingPubKeysChan := make(chan [][48]byte, 1)
|
||||
var sub = v.GetKeymanager().SubscribeAccountChanges(validatingPubKeysChan)
|
||||
defer func() {
|
||||
sub.Unsubscribe()
|
||||
close(validatingPubKeysChan)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-validatingPubKeysChan:
|
||||
accountsChangedChan <- struct{}{}
|
||||
case err := <-sub.Err():
|
||||
log.WithError(err).Error("accounts changed subscription failed")
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -206,55 +206,3 @@ func TestAllValidatorsAreExited_NextSlot(t *testing.T) {
|
||||
run(ctx, v)
|
||||
assert.LogsContain(t, hook, "All validators are exited")
|
||||
}
|
||||
|
||||
func TestHandleAccountsChanged_Ok(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
defer ctx.Done()
|
||||
|
||||
km := &mockKeymanager{accountsChangedFeed: &event.Feed{}}
|
||||
v := &FakeValidator{Keymanager: km}
|
||||
channel := make(chan struct{})
|
||||
go handleAccountsChanged(ctx, v, channel)
|
||||
time.Sleep(time.Second) // Allow time for subscribing to changes.
|
||||
km.SimulateAccountChanges()
|
||||
time.Sleep(time.Second) // Allow time for handling subscribed changes.
|
||||
|
||||
select {
|
||||
case _, ok := <-channel:
|
||||
if !ok {
|
||||
t.Error("Account changed channel is closed")
|
||||
}
|
||||
default:
|
||||
t.Error("Accounts changed channel is empty")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleAccountsChanged_CtxCancelled(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
km := &mockKeymanager{accountsChangedFeed: &event.Feed{}}
|
||||
v := &FakeValidator{Keymanager: km}
|
||||
channel := make(chan struct{}, 2)
|
||||
go handleAccountsChanged(ctx, v, channel)
|
||||
time.Sleep(time.Second) // Allow time for subscribing to changes.
|
||||
km.SimulateAccountChanges()
|
||||
time.Sleep(time.Second) // Allow time for handling subscribed changes.
|
||||
|
||||
cancel()
|
||||
time.Sleep(time.Second) // Allow time for handling cancellation.
|
||||
km.SimulateAccountChanges()
|
||||
time.Sleep(time.Second) // Allow time for handling subscribed changes.
|
||||
|
||||
var values int
|
||||
for loop := true; loop == true; {
|
||||
select {
|
||||
case _, ok := <-channel:
|
||||
if ok {
|
||||
values++
|
||||
}
|
||||
default:
|
||||
loop = false
|
||||
}
|
||||
}
|
||||
assert.Equal(t, 1, values, "Incorrect number of values were passed to the channel")
|
||||
}
|
||||
|
||||
@@ -93,6 +93,9 @@ func (m *mockKeymanager) Sign(ctx context.Context, req *validatorpb.SignRequest)
|
||||
}
|
||||
|
||||
func (m *mockKeymanager) SubscribeAccountChanges(pubKeysChan chan [][48]byte) event.Subscription {
|
||||
if m.accountsChangedFeed == nil {
|
||||
m.accountsChangedFeed = &event.Feed{}
|
||||
}
|
||||
return m.accountsChangedFeed.Subscribe(pubKeysChan)
|
||||
}
|
||||
|
||||
@@ -362,7 +365,7 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) {
|
||||
resp,
|
||||
nil,
|
||||
)
|
||||
require.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
|
||||
require.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
|
||||
require.LogsContain(t, hook, "Validator activated")
|
||||
}
|
||||
|
||||
@@ -400,7 +403,7 @@ func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) {
|
||||
resp,
|
||||
nil,
|
||||
)
|
||||
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
|
||||
assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
|
||||
}
|
||||
|
||||
func TestWaitSync_ContextCanceled(t *testing.T) {
|
||||
|
||||
@@ -19,8 +19,28 @@ import (
|
||||
|
||||
// WaitForActivation checks whether the validator pubkey is in the active
|
||||
// validator set. If not, this operation will block until an activation message is
|
||||
// received.
|
||||
func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan <-chan struct{}) error {
|
||||
// received. This method also monitors the keymanager for updates while waiting for an activation
|
||||
// from the gRPC server.
|
||||
func (v *validator) WaitForActivation(ctx context.Context) error {
|
||||
// Monitor the key manager for updates.
|
||||
accountsChangedChan := make(chan [][48]byte)
|
||||
sub := v.GetKeymanager().SubscribeAccountChanges(accountsChangedChan)
|
||||
defer func() {
|
||||
sub.Unsubscribe()
|
||||
close(accountsChangedChan)
|
||||
}()
|
||||
|
||||
return v.waitForActivation(ctx, accountsChangedChan)
|
||||
}
|
||||
|
||||
// waitForActivation performs the following:
|
||||
// 1) While the key manager is empty, poll the key manager until some validator keys exist.
|
||||
// 2) Open a server side stream for activation events against the given keys.
|
||||
// 3) In another go routine, the key manager is monitored for updates and emits an update event on
|
||||
// the accountsChangedChan. When an event signal is received, restart the waitForActivation routine.
|
||||
// 4) If the stream is reset in error, restart the routine.
|
||||
// 5) If the stream returns a response indicating one or more validators are active, exit the routine.
|
||||
func (v *validator) waitForActivation(ctx context.Context, accountsChangedChan <-chan [][48]byte) error {
|
||||
ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
|
||||
defer span.End()
|
||||
|
||||
@@ -63,13 +83,13 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan <
|
||||
Error("Stream broken while waiting for activation. Reconnecting...")
|
||||
// Reconnection attempt backoff, up to 60s.
|
||||
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60)))
|
||||
return v.WaitForActivation(incrementRetries(ctx), accountsChangedChan)
|
||||
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-accountsChangedChan:
|
||||
// Accounts (keys) changed, restart the process.
|
||||
return v.WaitForActivation(ctx, accountsChangedChan)
|
||||
return v.waitForActivation(ctx, accountsChangedChan)
|
||||
default:
|
||||
res, err := stream.Recv()
|
||||
// If the stream is closed, we stop the loop.
|
||||
@@ -87,7 +107,7 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan <
|
||||
Error("Stream broken while waiting for activation. Reconnecting...")
|
||||
// Reconnection attempt backoff, up to 60s.
|
||||
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60)))
|
||||
return v.WaitForActivation(incrementRetries(ctx), accountsChangedChan)
|
||||
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan)
|
||||
}
|
||||
valActivated := v.checkAndLogValidatorStatus(res.Statuses)
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@ package client
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
constant "github.com/prysmaticlabs/prysm/validator/testing"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -16,6 +15,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
||||
walletMock "github.com/prysmaticlabs/prysm/validator/accounts/testing"
|
||||
"github.com/prysmaticlabs/prysm/validator/keymanager/derived"
|
||||
constant "github.com/prysmaticlabs/prysm/validator/testing"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
"github.com/tyler-smith/go-bip39"
|
||||
util "github.com/wealdtech/go-eth2-util"
|
||||
@@ -52,7 +52,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) {
|
||||
)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx, make(chan struct{})))
|
||||
assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx))
|
||||
}
|
||||
|
||||
func TestWaitActivation_StreamSetupFails_AttemptsToReconnect(t *testing.T) {
|
||||
@@ -83,7 +83,7 @@ func TestWaitActivation_StreamSetupFails_AttemptsToReconnect(t *testing.T) {
|
||||
resp := generateMockStatusResponse([][]byte{pubKey[:]})
|
||||
resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE
|
||||
clientStream.EXPECT().Recv().Return(resp, nil)
|
||||
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})))
|
||||
assert.NoError(t, v.WaitForActivation(context.Background()))
|
||||
}
|
||||
|
||||
func TestWaitForActivation_ReceiveErrorFromStream_AttemptsReconnection(t *testing.T) {
|
||||
@@ -118,7 +118,7 @@ func TestWaitForActivation_ReceiveErrorFromStream_AttemptsReconnection(t *testin
|
||||
nil,
|
||||
errors.New("fails"),
|
||||
).Return(resp, nil)
|
||||
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})))
|
||||
assert.NoError(t, v.WaitForActivation(context.Background()))
|
||||
}
|
||||
|
||||
func TestWaitActivation_LogsActivationEpochOK(t *testing.T) {
|
||||
@@ -153,7 +153,7 @@ func TestWaitActivation_LogsActivationEpochOK(t *testing.T) {
|
||||
resp,
|
||||
nil,
|
||||
)
|
||||
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
|
||||
assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
|
||||
assert.LogsContain(t, hook, "Validator activated")
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ func TestWaitForActivation_Exiting(t *testing.T) {
|
||||
resp,
|
||||
nil,
|
||||
)
|
||||
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})))
|
||||
assert.NoError(t, v.WaitForActivation(context.Background()))
|
||||
}
|
||||
|
||||
func TestWaitForActivation_RefetchKeys(t *testing.T) {
|
||||
@@ -230,7 +230,7 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) {
|
||||
resp,
|
||||
nil,
|
||||
)
|
||||
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
|
||||
assert.NoError(t, v.waitForActivation(context.Background(), make(chan [][48]byte)), "Could not wait for activation")
|
||||
assert.LogsContain(t, hook, msgNoKeysFetched)
|
||||
assert.LogsContain(t, hook, "Validator activated")
|
||||
}
|
||||
@@ -291,15 +291,14 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
|
||||
nil,
|
||||
)
|
||||
|
||||
channel := make(chan struct{})
|
||||
go func() {
|
||||
// We add the active key into the keymanager and simulate a key refresh.
|
||||
time.Sleep(time.Second * 1)
|
||||
km.keysMap[activePubKey] = activePrivKey
|
||||
channel <- struct{}{}
|
||||
km.SimulateAccountChanges()
|
||||
}()
|
||||
|
||||
assert.NoError(t, v.WaitForActivation(context.Background(), channel))
|
||||
assert.NoError(t, v.WaitForActivation(context.Background()))
|
||||
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
|
||||
assert.LogsContain(t, hook, "Validator activated")
|
||||
})
|
||||
@@ -365,16 +364,16 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
|
||||
nil,
|
||||
)
|
||||
|
||||
channel := make(chan struct{})
|
||||
channel := make(chan [][48]byte)
|
||||
go func() {
|
||||
// We add the active key into the keymanager and simulate a key refresh.
|
||||
time.Sleep(time.Second * 1)
|
||||
err = km.RecoverAccountsFromMnemonic(ctx, constant.TestMnemonic, "", 2)
|
||||
require.NoError(t, err)
|
||||
channel <- struct{}{}
|
||||
channel <- [][48]byte{}
|
||||
}()
|
||||
|
||||
assert.NoError(t, v.WaitForActivation(context.Background(), channel))
|
||||
assert.NoError(t, v.waitForActivation(context.Background(), channel))
|
||||
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
|
||||
assert.LogsContain(t, hook, "Validator activated")
|
||||
})
|
||||
|
||||
@@ -3,7 +3,6 @@ package derived
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
constant "github.com/prysmaticlabs/prysm/validator/testing"
|
||||
"testing"
|
||||
|
||||
validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2"
|
||||
@@ -12,6 +11,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
||||
mock "github.com/prysmaticlabs/prysm/validator/accounts/testing"
|
||||
constant "github.com/prysmaticlabs/prysm/validator/testing"
|
||||
"github.com/tyler-smith/go-bip39"
|
||||
util "github.com/wealdtech/go-eth2-util"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user