mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-07 20:43:57 -05:00
move account changed channel into validator for code cleanup (#15298)
* move account changed channel into validator for code cleanup * gaz
This commit is contained in:
3
changelog/james-prysm_waitforactivation-cleanup.md
Normal file
3
changelog/james-prysm_waitforactivation-cleanup.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Ignored
|
||||
|
||||
- code cleanup on wait for activation and keymanagement through moving the account changed channel as a field in the validator.
|
||||
@@ -36,9 +36,10 @@ const (
|
||||
// Validator interface defines the primary methods of a validator client.
|
||||
type Validator interface {
|
||||
Done()
|
||||
AccountsChangedChan() <-chan [][fieldparams.BLSPubkeyLength]byte
|
||||
WaitForChainStart(ctx context.Context) error
|
||||
WaitForSync(ctx context.Context) error
|
||||
WaitForActivation(ctx context.Context, accountsChangedChan chan [][fieldparams.BLSPubkeyLength]byte) error
|
||||
WaitForActivation(ctx context.Context) error
|
||||
CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error)
|
||||
NextSlot() <-chan primitives.Slot
|
||||
SlotDeadline(slot primitives.Slot) time.Time
|
||||
@@ -57,7 +58,7 @@ type Validator interface {
|
||||
Keymanager() (keymanager.IKeymanager, error)
|
||||
HandleKeyReload(ctx context.Context, currentKeys [][fieldparams.BLSPubkeyLength]byte) (bool, error)
|
||||
CheckDoppelGanger(ctx context.Context) error
|
||||
PushProposerSettings(ctx context.Context, km keymanager.IKeymanager, slot primitives.Slot, forceFullPush bool) error
|
||||
PushProposerSettings(ctx context.Context, slot primitives.Slot, forceFullPush bool) error
|
||||
SignValidatorRegistrationRequest(ctx context.Context, signer SigningFunc, newValidatorRegistration *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, bool /* isCached */, error)
|
||||
StartEventStream(ctx context.Context, topics []string, eventsChan chan<- *event.Event)
|
||||
EventStreamIsRunning() bool
|
||||
|
||||
@@ -58,27 +58,19 @@ func run(ctx context.Context, v iface.Validator) {
|
||||
healthTracker := v.HealthTracker()
|
||||
runHealthCheckRoutine(ctx, v, eventsChan)
|
||||
|
||||
accountsChangedChan := make(chan [][fieldparams.BLSPubkeyLength]byte, 1)
|
||||
km, err := v.Keymanager()
|
||||
if err != nil {
|
||||
log.WithError(err).Fatal("Could not get keymanager")
|
||||
}
|
||||
sub := km.SubscribeAccountChanges(accountsChangedChan)
|
||||
// check if proposer settings is still nil
|
||||
// Set properties on the beacon node like the fee recipient for validators that are being used & active.
|
||||
if v.ProposerSettings() == nil {
|
||||
log.Warn("Validator client started without proposer settings such as fee recipient" +
|
||||
" and will continue to use settings provided in the beacon node.")
|
||||
}
|
||||
if err := v.PushProposerSettings(ctx, km, headSlot, true); err != nil {
|
||||
if err := v.PushProposerSettings(ctx, headSlot, true); err != nil {
|
||||
log.WithError(err).Fatal("Failed to update proposer settings")
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("Context canceled, stopping validator")
|
||||
sub.Unsubscribe()
|
||||
close(accountsChangedChan)
|
||||
return // Exit if context is canceled.
|
||||
case slot := <-v.NextSlot():
|
||||
if !healthTracker.IsHealthy(ctx) {
|
||||
@@ -113,7 +105,7 @@ func run(ctx context.Context, v iface.Validator) {
|
||||
// call push proposer settings often to account for the following edge cases:
|
||||
// proposer is activated at the start of epoch and tries to propose immediately
|
||||
// account has changed in the middle of an epoch
|
||||
if err := v.PushProposerSettings(slotCtx, km, slot, false); err != nil {
|
||||
if err := v.PushProposerSettings(slotCtx, slot, false); err != nil {
|
||||
log.WithError(err).Warn("Failed to update proposer settings")
|
||||
}
|
||||
|
||||
@@ -159,13 +151,13 @@ func run(ctx context.Context, v iface.Validator) {
|
||||
}
|
||||
case e := <-eventsChan:
|
||||
v.ProcessEvent(ctx, e)
|
||||
case currentKeys := <-accountsChangedChan: // should be less of a priority than next slot
|
||||
onAccountsChanged(ctx, v, currentKeys, accountsChangedChan)
|
||||
case currentKeys := <-v.AccountsChangedChan(): // should be less of a priority than next slot
|
||||
onAccountsChanged(ctx, v, currentKeys)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byte, ac chan [][fieldparams.BLSPubkeyLength]byte) {
|
||||
func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byte) {
|
||||
ctx, span := prysmTrace.StartSpan(ctx, "validator.accountsChanged")
|
||||
defer span.End()
|
||||
|
||||
@@ -175,7 +167,7 @@ func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byt
|
||||
}
|
||||
if !anyActive {
|
||||
log.Warn("No active keys found. Waiting for activation...")
|
||||
err := v.WaitForActivation(ctx, ac)
|
||||
err := v.WaitForActivation(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Could not wait for validator activation")
|
||||
}
|
||||
@@ -231,7 +223,7 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
|
||||
log.WithError(err).Fatal("Could not determine if beacon node synced")
|
||||
}
|
||||
|
||||
if err := v.WaitForActivation(ctx, nil /* accountsChangedChan */); err != nil {
|
||||
if err := v.WaitForActivation(ctx); err != nil {
|
||||
log.WithError(err).Fatal("Could not wait for validator activation")
|
||||
}
|
||||
|
||||
@@ -338,17 +330,12 @@ func runHealthCheckRoutine(ctx context.Context, v iface.Validator, eventsChan ch
|
||||
continue // Skip to the next ticker
|
||||
}
|
||||
|
||||
km, err := v.Keymanager()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get keymanager")
|
||||
return
|
||||
}
|
||||
slot, err := v.CanonicalHeadSlot(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get canonical head slot")
|
||||
return
|
||||
}
|
||||
if err := v.PushProposerSettings(ctx, km, slot, true); err != nil {
|
||||
if err := v.PushProposerSettings(ctx, slot, true); err != nil {
|
||||
log.WithError(err).Warn("Failed to update proposer settings")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -265,10 +265,9 @@ func TestKeyReload_ActiveKey(t *testing.T) {
|
||||
node := health.NewMockHealthClient(ctrl)
|
||||
tracker := health.NewTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
v := &testutil.FakeValidator{Km: km, Tracker: tracker}
|
||||
ac := make(chan [][fieldparams.BLSPubkeyLength]byte)
|
||||
v := &testutil.FakeValidator{Km: km, Tracker: tracker, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)}
|
||||
current := [][fieldparams.BLSPubkeyLength]byte{testutil.ActiveKey}
|
||||
onAccountsChanged(ctx, v, current, ac)
|
||||
onAccountsChanged(ctx, v, current)
|
||||
assert.Equal(t, true, v.HandleKeyReloadCalled)
|
||||
// HandleKeyReloadCalled in the FakeValidator returns true if one of the keys is equal to the
|
||||
// ActiveKey. WaitForActivation is only called if none of the keys are active, so it shouldn't be called at all.
|
||||
@@ -284,10 +283,9 @@ func TestKeyReload_NoActiveKey(t *testing.T) {
|
||||
node := health.NewMockHealthClient(ctrl)
|
||||
tracker := health.NewTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
v := &testutil.FakeValidator{Km: km, Tracker: tracker}
|
||||
ac := make(chan [][fieldparams.BLSPubkeyLength]byte)
|
||||
v := &testutil.FakeValidator{Km: km, Tracker: tracker, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)}
|
||||
current := [][fieldparams.BLSPubkeyLength]byte{na}
|
||||
onAccountsChanged(ctx, v, current, ac)
|
||||
onAccountsChanged(ctx, v, current)
|
||||
assert.Equal(t, true, v.HandleKeyReloadCalled)
|
||||
// HandleKeyReloadCalled in the FakeValidator returns true if one of the keys is equal to the
|
||||
// ActiveKey. Since we are using a key we know is not active, it should return false, which
|
||||
|
||||
@@ -222,6 +222,7 @@ func (v *ValidatorService) Start() {
|
||||
enableAPI: v.enableAPI,
|
||||
distributed: v.distributed,
|
||||
disableDutiesPolling: v.disableDutiesPolling,
|
||||
accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1),
|
||||
}
|
||||
|
||||
v.validator = valStruct
|
||||
|
||||
@@ -14,6 +14,7 @@ go_library(
|
||||
"//api/client/beacon/health:go_default_library",
|
||||
"//api/client/event:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//config/proposer:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/api/client/beacon/health"
|
||||
"github.com/OffchainLabs/prysm/v6/api/client/event"
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/config/proposer"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
@@ -23,46 +24,50 @@ var _ iface.Validator = (*FakeValidator)(nil)
|
||||
|
||||
// FakeValidator for mocking.
|
||||
type FakeValidator struct {
|
||||
DoneCalled bool
|
||||
WaitForWalletInitializationCalled bool
|
||||
SlasherReadyCalled bool
|
||||
NextSlotCalled bool
|
||||
UpdateDutiesCalled bool
|
||||
UpdateProtectionsCalled bool
|
||||
RoleAtCalled bool
|
||||
AttestToBlockHeadCalled bool
|
||||
ProposeBlockCalled bool
|
||||
IsRegularDeadline bool
|
||||
LogValidatorGainsAndLossesCalled bool
|
||||
SaveProtectionsCalled bool
|
||||
DeleteProtectionCalled bool
|
||||
SlotDeadlineCalled bool
|
||||
HandleKeyReloadCalled bool
|
||||
WaitForChainStartCalled int
|
||||
WaitForSyncCalled int
|
||||
WaitForActivationCalled int
|
||||
CanonicalHeadSlotCalled int
|
||||
ReceiveBlocksCalled int
|
||||
RetryTillSuccess int
|
||||
ProposeBlockArg1 uint64
|
||||
AttestToBlockHeadArg1 uint64
|
||||
RoleAtArg1 uint64
|
||||
UpdateDutiesArg1 uint64
|
||||
NextSlotRet <-chan primitives.Slot
|
||||
PublicKey string
|
||||
UpdateDutiesRet error
|
||||
ProposerSettingsErr error
|
||||
RolesAtRet []iface.ValidatorRole
|
||||
Balances map[[fieldparams.BLSPubkeyLength]byte]uint64
|
||||
WaitForWalletInitializationCalled bool
|
||||
SlasherReadyCalled bool
|
||||
NextSlotCalled bool
|
||||
AttestToBlockHeadCalled bool
|
||||
DoneCalled bool
|
||||
ProposeBlockCalled bool
|
||||
UpdateProtectionsCalled bool
|
||||
UpdateDutiesCalled bool
|
||||
RoleAtCalled bool
|
||||
IndexToPubkeyMap map[uint64][fieldparams.BLSPubkeyLength]byte
|
||||
PubkeyToIndexMap map[[fieldparams.BLSPubkeyLength]byte]uint64
|
||||
PubkeysToStatusesMap map[[fieldparams.BLSPubkeyLength]byte]ethpb.ValidatorStatus
|
||||
proposerSettings *proposer.Settings
|
||||
ProposerSettingWait time.Duration
|
||||
NextSlotRet <-chan primitives.Slot
|
||||
UpdateDutiesArg1 uint64
|
||||
RoleAtArg1 uint64
|
||||
AttestToBlockHeadArg1 uint64
|
||||
ProposeBlockArg1 uint64
|
||||
RetryTillSuccess int
|
||||
Balances map[[fieldparams.BLSPubkeyLength]byte]uint64
|
||||
CanonicalHeadSlotCalled int
|
||||
WaitForActivationCalled int
|
||||
WaitForSyncCalled int
|
||||
WaitForChainStartCalled int
|
||||
AttSubmitted chan interface{}
|
||||
BlockProposed chan interface{}
|
||||
AccountsChannel chan [][fieldparams.BLSPubkeyLength]byte
|
||||
EventsChannel chan *event.Event
|
||||
GenesisT uint64
|
||||
ReceiveBlocksCalled int
|
||||
proposerSettings *proposer.Settings
|
||||
UpdateDutiesRet error
|
||||
ProposerSettingsErr error
|
||||
Km keymanager.IKeymanager
|
||||
graffiti string
|
||||
Tracker health.Tracker
|
||||
AttSubmitted chan interface{}
|
||||
BlockProposed chan interface{}
|
||||
PublicKey string
|
||||
RolesAtRet []iface.ValidatorRole
|
||||
}
|
||||
|
||||
// Done for mocking.
|
||||
@@ -70,6 +75,14 @@ func (fv *FakeValidator) Done() {
|
||||
fv.DoneCalled = true
|
||||
}
|
||||
|
||||
func (fv *FakeValidator) AccountsChangedChan() <-chan [][fieldparams.BLSPubkeyLength]byte {
|
||||
return fv.AccountsChannel
|
||||
}
|
||||
|
||||
func (fv *FakeValidator) GenesisTime() uint64 {
|
||||
return fv.GenesisT
|
||||
}
|
||||
|
||||
// WaitForKeymanagerInitialization for mocking.
|
||||
func (fv *FakeValidator) WaitForKeymanagerInitialization(_ context.Context) error {
|
||||
fv.WaitForWalletInitializationCalled = true
|
||||
@@ -89,9 +102,9 @@ func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
|
||||
}
|
||||
|
||||
// WaitForActivation for mocking.
|
||||
func (fv *FakeValidator) WaitForActivation(_ context.Context, accountChan chan [][fieldparams.BLSPubkeyLength]byte) error {
|
||||
func (fv *FakeValidator) WaitForActivation(_ context.Context) error {
|
||||
fv.WaitForActivationCalled++
|
||||
if accountChan == nil {
|
||||
if fv.AccountsChannel == nil {
|
||||
return nil
|
||||
}
|
||||
if fv.RetryTillSuccess >= fv.WaitForActivationCalled {
|
||||
@@ -127,6 +140,9 @@ func (fv *FakeValidator) CanonicalHeadSlot(_ context.Context) (primitives.Slot,
|
||||
// SlotDeadline for mocking.
|
||||
func (fv *FakeValidator) SlotDeadline(_ primitives.Slot) time.Time {
|
||||
fv.SlotDeadlineCalled = true
|
||||
if fv.IsRegularDeadline {
|
||||
return prysmTime.Now().Add(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
|
||||
}
|
||||
return prysmTime.Now()
|
||||
}
|
||||
|
||||
@@ -253,7 +269,7 @@ func (*FakeValidator) HasProposerSettings() bool {
|
||||
}
|
||||
|
||||
// PushProposerSettings for mocking
|
||||
func (fv *FakeValidator) PushProposerSettings(ctx context.Context, _ keymanager.IKeymanager, _ primitives.Slot, _ bool) error {
|
||||
func (fv *FakeValidator) PushProposerSettings(ctx context.Context, _ primitives.Slot, _ bool) error {
|
||||
time.Sleep(fv.ProposerSettingWait)
|
||||
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
|
||||
log.Error("deadline exceeded")
|
||||
|
||||
@@ -113,6 +113,8 @@ type validator struct {
|
||||
attSelectionLock sync.Mutex
|
||||
dutiesLock sync.RWMutex
|
||||
disableDutiesPolling bool
|
||||
accountsChangedChannel chan [][fieldparams.BLSPubkeyLength]byte
|
||||
accountChangedSub event.Subscription
|
||||
}
|
||||
|
||||
type validatorStatus struct {
|
||||
@@ -128,7 +130,16 @@ type attSelectionKey struct {
|
||||
|
||||
// Done cleans up the validator.
|
||||
func (v *validator) Done() {
|
||||
v.ticker.Done()
|
||||
if v.accountChangedSub != nil {
|
||||
v.accountChangedSub.Unsubscribe()
|
||||
}
|
||||
if v.ticker != nil {
|
||||
v.ticker.Done()
|
||||
}
|
||||
}
|
||||
|
||||
func (v *validator) AccountsChangedChan() <-chan [][fieldparams.BLSPubkeyLength]byte {
|
||||
return v.accountsChangedChannel
|
||||
}
|
||||
|
||||
// WaitForKeymanagerInitialization checks if the validator needs to wait for keymanager initialization.
|
||||
@@ -170,6 +181,7 @@ func (v *validator) WaitForKeymanagerInitialization(ctx context.Context) error {
|
||||
return errors.New("key manager not set")
|
||||
}
|
||||
recheckKeys(ctx, v.db, v.km)
|
||||
v.accountChangedSub = v.km.SubscribeAccountChanges(v.accountsChangedChannel)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1079,12 +1091,13 @@ func (v *validator) SetProposerSettings(ctx context.Context, settings *proposer.
|
||||
}
|
||||
|
||||
// PushProposerSettings calls the prepareBeaconProposer RPC to set the fee recipient and also the register validator API if using a custom builder.
|
||||
func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKeymanager, slot primitives.Slot, forceFullPush bool) error {
|
||||
func (v *validator) PushProposerSettings(ctx context.Context, slot primitives.Slot, forceFullPush bool) error {
|
||||
ctx, span := trace.StartSpan(ctx, "validator.PushProposerSettings")
|
||||
defer span.End()
|
||||
|
||||
if km == nil {
|
||||
return errors.New("keymanager is nil when calling PrepareBeaconProposer")
|
||||
km, err := v.Keymanager()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pubkeys, err := km.FetchValidatingPublicKeys(ctx)
|
||||
|
||||
@@ -2050,7 +2050,7 @@ func TestValidator_PushSettings(t *testing.T) {
|
||||
require.Equal(t, len(tt.mockExpectedRequests), len(signedRegisterValidatorRequests))
|
||||
require.Equal(t, len(signedRegisterValidatorRequests), len(v.signedValidatorRegistrations))
|
||||
}
|
||||
if err := v.PushProposerSettings(ctx, km, 0, false); tt.err != "" {
|
||||
if err := v.PushProposerSettings(ctx, 0, false); tt.err != "" {
|
||||
assert.ErrorContains(t, tt.err, err)
|
||||
}
|
||||
if len(tt.logMessages) > 0 {
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v6/math"
|
||||
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
|
||||
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
|
||||
@@ -19,26 +18,7 @@ import (
|
||||
// from the gRPC server.
|
||||
//
|
||||
// If the channel parameter is nil, WaitForActivation creates and manages its own channel.
|
||||
func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan chan [][fieldparams.BLSPubkeyLength]byte) error {
|
||||
// Monitor the key manager for updates.
|
||||
if accountsChangedChan == nil {
|
||||
accountsChangedChan = make(chan [][fieldparams.BLSPubkeyLength]byte, 1)
|
||||
km, err := v.Keymanager()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// subscribe to the channel if it's the first time
|
||||
sub := km.SubscribeAccountChanges(accountsChangedChan)
|
||||
defer func() {
|
||||
sub.Unsubscribe()
|
||||
close(accountsChangedChan)
|
||||
}()
|
||||
}
|
||||
return v.internalWaitForActivation(ctx, accountsChangedChan)
|
||||
}
|
||||
|
||||
// internalWaitForActivation recursively waits for at least one active validator key
|
||||
func (v *validator) internalWaitForActivation(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error {
|
||||
func (v *validator) WaitForActivation(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
|
||||
defer span.End()
|
||||
|
||||
@@ -51,12 +31,12 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang
|
||||
// Step 2: If no keys, wait for accounts change or context cancellation.
|
||||
if len(validatingKeys) == 0 {
|
||||
log.Warn(msgNoKeysFetched)
|
||||
return v.waitForAccountsChange(ctx, accountsChangedChan)
|
||||
return v.waitForAccountsChange(ctx)
|
||||
}
|
||||
|
||||
// Step 3: update validator statuses in cache.
|
||||
if err := v.updateValidatorStatusCache(ctx, validatingKeys); err != nil {
|
||||
return v.retryWaitForActivation(ctx, span, err, "Connection broken while waiting for activation. Reconnecting...", accountsChangedChan)
|
||||
return v.retryWaitForActivation(ctx, span, err, "Connection broken while waiting for activation. Reconnecting...")
|
||||
}
|
||||
|
||||
// Step 4: Check and log validator statuses.
|
||||
@@ -67,42 +47,42 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang
|
||||
case <-ctx.Done():
|
||||
log.Debug("Context closed, exiting WaitForActivation")
|
||||
return ctx.Err()
|
||||
case <-accountsChangedChan:
|
||||
case <-v.accountsChangedChannel:
|
||||
// Accounts (keys) changed, restart the process.
|
||||
return v.internalWaitForActivation(ctx, accountsChangedChan)
|
||||
return v.WaitForActivation(ctx)
|
||||
default:
|
||||
if err := v.waitForNextEpoch(ctx, v.genesisTime, accountsChangedChan); err != nil {
|
||||
return v.retryWaitForActivation(ctx, span, err, "Failed to wait for next epoch. Reconnecting...", accountsChangedChan)
|
||||
if err := v.waitForNextEpoch(ctx, v.genesisTime); err != nil {
|
||||
return v.retryWaitForActivation(ctx, span, err, "Failed to wait for next epoch. Reconnecting...")
|
||||
}
|
||||
return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan)
|
||||
return v.WaitForActivation(incrementRetries(ctx))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *validator) retryWaitForActivation(ctx context.Context, span octrace.Span, err error, message string, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error {
|
||||
func (v *validator) retryWaitForActivation(ctx context.Context, span octrace.Span, err error, message string) error {
|
||||
tracing.AnnotateError(span, err)
|
||||
attempts := activationAttempts(ctx)
|
||||
log.WithError(err).WithField("attempts", attempts).Error(message)
|
||||
// Reconnection attempt backoff, up to 60s.
|
||||
time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60)))
|
||||
// TODO: refactor this to use the health tracker instead for reattempt
|
||||
return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan)
|
||||
return v.WaitForActivation(incrementRetries(ctx))
|
||||
}
|
||||
|
||||
func (v *validator) waitForAccountsChange(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error {
|
||||
func (v *validator) waitForAccountsChange(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Debug("Context closed, exiting waitForAccountsChange")
|
||||
return ctx.Err()
|
||||
case <-accountsChangedChan:
|
||||
case <-v.accountsChangedChannel:
|
||||
// If the accounts changed, try again.
|
||||
return v.internalWaitForActivation(ctx, accountsChangedChan)
|
||||
return v.WaitForActivation(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// waitForNextEpoch creates a blocking function to wait until the next epoch start given the current slot
|
||||
func (v *validator) waitForNextEpoch(ctx context.Context, genesisTimeSec uint64, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error {
|
||||
func (v *validator) waitForNextEpoch(ctx context.Context, genesisTimeSec uint64) error {
|
||||
waitTime, err := slots.SecondsUntilNextEpochStart(genesisTimeSec)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -112,9 +92,9 @@ func (v *validator) waitForNextEpoch(ctx context.Context, genesisTimeSec uint64,
|
||||
case <-ctx.Done():
|
||||
log.Debug("Context closed, exiting waitForNextEpoch")
|
||||
return ctx.Err()
|
||||
case <-accountsChangedChan:
|
||||
case <-v.accountsChangedChannel:
|
||||
// Accounts (keys) changed, restart the process.
|
||||
return v.internalWaitForActivation(ctx, accountsChangedChan)
|
||||
return v.WaitForActivation(ctx)
|
||||
case <-time.After(time.Duration(waitTime) * time.Second):
|
||||
log.Debug("Done waiting for epoch start")
|
||||
// The ticker has ticked, indicating we've reached the next epoch
|
||||
|
||||
@@ -31,10 +31,11 @@ func TestWaitActivation_Exiting_OK(t *testing.T) {
|
||||
prysmChainClient := validatormock.NewMockPrysmChainClient(ctrl)
|
||||
kp := randKeypair(t)
|
||||
v := validator{
|
||||
validatorClient: validatorClient,
|
||||
km: newMockKeymanager(t, kp),
|
||||
chainClient: chainClient,
|
||||
prysmChainClient: prysmChainClient,
|
||||
validatorClient: validatorClient,
|
||||
km: newMockKeymanager(t, kp),
|
||||
chainClient: chainClient,
|
||||
prysmChainClient: prysmChainClient,
|
||||
accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1),
|
||||
}
|
||||
ctx := context.Background()
|
||||
resp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{kp.pub[:]})
|
||||
@@ -46,7 +47,7 @@ func TestWaitActivation_Exiting_OK(t *testing.T) {
|
||||
},
|
||||
).Return(resp, nil)
|
||||
|
||||
require.NoError(t, v.WaitForActivation(ctx, nil))
|
||||
require.NoError(t, v.WaitForActivation(ctx))
|
||||
require.Equal(t, 1, len(v.pubkeyToStatus))
|
||||
}
|
||||
|
||||
@@ -83,19 +84,20 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) {
|
||||
},
|
||||
).Return(resp, nil)
|
||||
|
||||
accountChan := make(chan [][fieldparams.BLSPubkeyLength]byte)
|
||||
accountChan := make(chan [][fieldparams.BLSPubkeyLength]byte, 1)
|
||||
sub := km.SubscribeAccountChanges(accountChan)
|
||||
defer func() {
|
||||
sub.Unsubscribe()
|
||||
close(accountChan)
|
||||
}()
|
||||
v.accountsChangedChannel = accountChan
|
||||
// update the accounts from 0 to 1 after a delay
|
||||
go func() {
|
||||
time.Sleep(1 * time.Second)
|
||||
require.NoError(t, km.add(kp))
|
||||
km.SimulateAccountChanges([][48]byte{kp.pub})
|
||||
}()
|
||||
assert.NoError(t, v.internalWaitForActivation(context.Background(), accountChan), "Could not wait for activation")
|
||||
assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
|
||||
assert.LogsContain(t, hook, msgNoKeysFetched)
|
||||
assert.LogsContain(t, hook, "Validator activated")
|
||||
}
|
||||
@@ -112,13 +114,21 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
|
||||
validatorClient := validatormock.NewMockValidatorClient(ctrl)
|
||||
chainClient := validatormock.NewMockChainClient(ctrl)
|
||||
prysmChainClient := validatormock.NewMockPrysmChainClient(ctrl)
|
||||
ch := make(chan [][fieldparams.BLSPubkeyLength]byte, 1)
|
||||
v := validator{
|
||||
validatorClient: validatorClient,
|
||||
km: km,
|
||||
chainClient: chainClient,
|
||||
prysmChainClient: prysmChainClient,
|
||||
pubkeyToStatus: make(map[[48]byte]*validatorStatus),
|
||||
validatorClient: validatorClient,
|
||||
km: km,
|
||||
chainClient: chainClient,
|
||||
prysmChainClient: prysmChainClient,
|
||||
pubkeyToStatus: make(map[[48]byte]*validatorStatus),
|
||||
accountsChangedChannel: ch,
|
||||
accountChangedSub: km.SubscribeAccountChanges(ch),
|
||||
}
|
||||
defer func() {
|
||||
close(v.accountsChangedChannel)
|
||||
v.accountChangedSub.Unsubscribe()
|
||||
}()
|
||||
|
||||
inactiveResp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactive.pub[:]})
|
||||
inactiveResp.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
|
||||
|
||||
@@ -149,7 +159,7 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
|
||||
ðpb.ChainHead{HeadEpoch: 0},
|
||||
nil,
|
||||
).AnyTimes()
|
||||
assert.NoError(t, v.WaitForActivation(context.Background(), nil))
|
||||
assert.NoError(t, v.WaitForActivation(context.Background()))
|
||||
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
|
||||
assert.LogsContain(t, hook, "Validator activated")
|
||||
})
|
||||
@@ -199,6 +209,7 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
|
||||
activeResp.Statuses[1].Status = ethpb.ValidatorStatus_ACTIVE
|
||||
channel := make(chan [][fieldparams.BLSPubkeyLength]byte, 1)
|
||||
km.SubscribeAccountChanges(channel)
|
||||
v.accountsChangedChannel = channel
|
||||
gomock.InOrder(
|
||||
validatorClient.EXPECT().MultipleValidatorStatus(
|
||||
gomock.Any(),
|
||||
@@ -227,7 +238,7 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
|
||||
ðpb.ChainHead{HeadEpoch: 0},
|
||||
nil,
|
||||
).AnyTimes()
|
||||
assert.NoError(t, v.internalWaitForActivation(context.Background(), channel))
|
||||
assert.NoError(t, v.WaitForActivation(context.Background()))
|
||||
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
|
||||
assert.LogsContain(t, hook, "Validator activated")
|
||||
})
|
||||
@@ -246,11 +257,12 @@ func TestWaitForActivation_AttemptsReconnectionOnFailure(t *testing.T) {
|
||||
prysmChainClient := validatormock.NewMockPrysmChainClient(ctrl)
|
||||
kp := randKeypair(t)
|
||||
v := validator{
|
||||
validatorClient: validatorClient,
|
||||
km: newMockKeymanager(t, kp),
|
||||
chainClient: chainClient,
|
||||
prysmChainClient: prysmChainClient,
|
||||
pubkeyToStatus: make(map[[48]byte]*validatorStatus),
|
||||
validatorClient: validatorClient,
|
||||
km: newMockKeymanager(t, kp),
|
||||
chainClient: chainClient,
|
||||
prysmChainClient: prysmChainClient,
|
||||
pubkeyToStatus: make(map[[48]byte]*validatorStatus),
|
||||
accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1),
|
||||
}
|
||||
active := randKeypair(t)
|
||||
activeResp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{active.pub[:]})
|
||||
@@ -271,5 +283,5 @@ func TestWaitForActivation_AttemptsReconnectionOnFailure(t *testing.T) {
|
||||
ðpb.ChainHead{HeadEpoch: 0},
|
||||
nil,
|
||||
).AnyTimes()
|
||||
assert.NoError(t, v.WaitForActivation(context.Background(), nil))
|
||||
assert.NoError(t, v.WaitForActivation(context.Background()))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user