validator client: removing need to call canonical head api (#15480)

* removing need to call cononical head api

* typo

* removing unneeded tests

* fixing unit tests
This commit is contained in:
james-prysm
2025-07-10 15:25:17 -05:00
committed by GitHub
parent bc7e4f7751
commit 83943b5dd8
8 changed files with 41 additions and 107 deletions

View File

@@ -35,12 +35,12 @@ const (
// Validator interface defines the primary methods of a validator client.
type Validator interface {
Done()
GenesisTime() uint64
EventsChan() <-chan *event.Event
AccountsChangedChan() <-chan [][fieldparams.BLSPubkeyLength]byte
WaitForChainStart(ctx context.Context) error
WaitForSync(ctx context.Context) error
WaitForActivation(ctx context.Context) error
CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error)
NextSlot() <-chan primitives.Slot
SlotDeadline(slot primitives.Slot) time.Time
LogValidatorGainsAndLosses(ctx context.Context, slot primitives.Slot) error

View File

@@ -37,22 +37,23 @@ type runner struct {
// 2 - Wait for validator activation
func newRunner(ctx context.Context, v iface.Validator, monitor *healthMonitor) (*runner, error) {
// Initialize validator and get head slot
headSlot, err := initializeValidatorAndGetHeadSlot(ctx, v)
err := initialize(ctx, v)
if err != nil {
v.Done()
return nil, err
}
currentSlot := slots.CurrentSlot(v.GenesisTime()) // set in v.WaitForChainStart
// Prepare initial duties update
ss, err := slots.EpochStart(slots.ToEpoch(headSlot + 1))
ss, err := slots.EpochStart(slots.ToEpoch(currentSlot + 1))
if err != nil {
log.WithError(err).Error("Failed to get epoch start")
ss = headSlot
ss = currentSlot
}
startDeadline := v.SlotDeadline(ss + params.BeaconConfig().SlotsPerEpoch - 1)
startCtx, startCancel := context.WithDeadline(ctx, startDeadline)
if err := v.UpdateDuties(startCtx); err != nil {
// Don't return error here, just log it
handleAssignmentError(err, headSlot)
handleAssignmentError(err, currentSlot)
}
startCancel()
@@ -62,7 +63,7 @@ func newRunner(ctx context.Context, v iface.Validator, monitor *healthMonitor) (
log.Warn("Validator client started without proposer settings such as fee recipient" +
" and will continue to use settings provided in the beacon node.")
}
if err := v.PushProposerSettings(ctx, headSlot, true); err != nil {
if err := v.PushProposerSettings(ctx, currentSlot, true); err != nil {
v.Done()
return nil, errors.Wrap(err, "failed to update proposer settings")
}
@@ -172,8 +173,8 @@ func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byt
}
}
func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (primitives.Slot, error) {
ctx, span := prysmTrace.StartSpan(ctx, "validator.initializeValidatorAndGetHeadSlot")
func initialize(ctx context.Context, v iface.Validator) error {
ctx, span := prysmTrace.StartSpan(ctx, "validator.initialize")
defer span.End()
ticker := time.NewTicker(backOffPeriod)
@@ -181,16 +182,11 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
firstTime := true
var (
headSlot primitives.Slot
err error
)
for {
if !firstTime {
if ctx.Err() != nil {
log.Info("Context canceled, stopping validator")
return headSlot, errors.New("context canceled")
return errors.New("context canceled")
}
<-ticker.C
}
@@ -203,11 +199,11 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
continue
}
return 0, errors.Wrap(err, "could not determine if beacon chain started")
return errors.Wrap(err, "could not determine if beacon chain started")
}
if err := v.WaitForKeymanagerInitialization(ctx); err != nil {
return 0, errors.Wrap(err, "Wallet is not ready")
return errors.Wrap(err, "Wallet is not ready")
}
if err := v.WaitForSync(ctx); err != nil {
@@ -216,21 +212,11 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
continue
}
return 0, errors.Wrap(err, "could not determine if beacon node synced")
return errors.Wrap(err, "could not determine if beacon node synced")
}
if err := v.WaitForActivation(ctx); err != nil {
return 0, errors.Wrap(err, "could not wait for validator activation")
}
headSlot, err = v.CanonicalHeadSlot(ctx)
if isConnectionError(err) {
log.WithError(err).Warn("could not get current canonical head slot")
continue
}
if err != nil {
return 0, errors.Wrap(err, "could not get current canonical head slot")
return errors.Wrap(err, "could not wait for validator activation")
}
if err := v.CheckDoppelGanger(ctx); err != nil {
@@ -239,11 +225,12 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
continue
}
return 0, errors.Wrap(err, "could not succeed with doppelganger check")
return errors.Wrap(err, "could not succeed with doppelganger check")
}
break
}
return headSlot, nil
return nil
}
func performRoles(slotCtx context.Context, allRoles map[[48]byte][]iface.ValidatorRole, v iface.Validator, slot primitives.Slot, wg *sync.WaitGroup, span trace.Span) {

View File

@@ -86,10 +86,9 @@ func TestRetry_On_ConnectionError(t *testing.T) {
time.Sleep(time.Duration(retry*6) * backOffPeriod)
cancel()
// every call will fail retry=10 times so first one will be called 4 * retry=10.
assert.Equal(t, retry*3, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
assert.Equal(t, retry*2, v.WaitForSyncCalled, "Expected WaitForSync() to be called")
assert.Equal(t, retry, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
assert.Equal(t, retry, v.CanonicalHeadSlotCalled, "Expected CanonicalHeadSlotCalled() to be called")
assert.Equal(t, retry*2+1, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
assert.Equal(t, retry+1, v.WaitForSyncCalled, "Expected WaitForSync() to be called")
assert.Equal(t, 1, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
}
func TestCancelledContext_WaitsForActivation(t *testing.T) {
@@ -529,14 +528,10 @@ func TestRunnerPushesProposerSettings_ValidContext(t *testing.T) {
ncm := validatormock.NewMockNodeClient(ctrl)
ncm.EXPECT().SyncStatus(liveCtx, gomock.Any()).Return(&ethpb.SyncStatus{Syncing: false}, nil)
ccm := validatormock.NewMockChainClient(ctrl)
ccm.EXPECT().ChainHead(liveCtx, gomock.Any()).Return(&ethpb.ChainHead{}, nil).Do(func(_, _ any) { delay(t) })
// Setup the actual validator service.
v := &validator{
validatorClient: vcm,
nodeClient: ncm,
chainClient: ccm,
db: testing2.SetupDB(t, t.TempDir(), [][fieldparams.BLSPubkeyLength]byte{}, false),
interopKeysConfig: &local.InteropKeymanagerConfig{
NumValidatorKeys: uint64(params.BeaconConfig().SlotsPerEpoch) * 4, // 4 Attesters per slot.

View File

@@ -40,7 +40,6 @@ type FakeValidator struct {
WaitForWalletInitializationCalled bool
NextSlotCalled bool
WaitForActivationCalled int
CanonicalHeadSlotCalled int
WaitForSyncCalled int
RetryTillSuccess int
ProposeBlockArg1 uint64
@@ -131,15 +130,6 @@ func (fv *FakeValidator) SlasherReady(_ context.Context) error {
return nil
}
// CanonicalHeadSlot for mocking.
func (fv *FakeValidator) CanonicalHeadSlot(_ context.Context) (primitives.Slot, error) {
fv.CanonicalHeadSlotCalled++
if fv.RetryTillSuccess > fv.CanonicalHeadSlotCalled {
return 0, api.ErrConnectionIssue
}
return 0, nil
}
// SlotDeadline for mocking.
func (fv *FakeValidator) SlotDeadline(_ primitives.Slot) time.Time {
fv.SlotDeadlineCalled = true

View File

@@ -138,6 +138,10 @@ func (v *validator) Done() {
}
}
func (v *validator) GenesisTime() uint64 {
return v.genesisTime
}
func (v *validator) EventsChan() <-chan *eventClient.Event {
return v.eventsChannel
}
@@ -407,18 +411,6 @@ func (v *validator) checkAndLogValidatorStatus() bool {
return someAreActive
}
// CanonicalHeadSlot returns the slot of canonical block currently found in the
// beacon chain via RPC.
func (v *validator) CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error) {
ctx, span := trace.StartSpan(ctx, "validator.CanonicalHeadSlot")
defer span.End()
head, err := v.chainClient.ChainHead(ctx, &emptypb.Empty{})
if err != nil {
return 0, errors.Wrap(client.ErrConnectionIssue, err.Error())
}
return head.HeadSlot, nil
}
// NextSlot emits the next slot number at the start time of that slot.
func (v *validator) NextSlot() <-chan primitives.Slot {
return v.ticker.C()

View File

@@ -296,38 +296,6 @@ func TestWaitForChainStart_ReceiveErrorFromStream(t *testing.T) {
assert.ErrorContains(t, want, err)
}
func TestCanonicalHeadSlot_FailedRPC(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := validatormock.NewMockChainClient(ctrl)
v := validator{
chainClient: client,
genesisTime: 1,
}
client.EXPECT().ChainHead(
gomock.Any(),
gomock.Any(),
).Return(nil, errors.New("failed"))
_, err := v.CanonicalHeadSlot(t.Context())
assert.ErrorContains(t, "failed", err)
}
func TestCanonicalHeadSlot_OK(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := validatormock.NewMockChainClient(ctrl)
v := validator{
chainClient: client,
}
client.EXPECT().ChainHead(
gomock.Any(),
gomock.Any(),
).Return(&ethpb.ChainHead{HeadSlot: 0}, nil)
headSlot, err := v.CanonicalHeadSlot(t.Context())
require.NoError(t, err)
assert.Equal(t, primitives.Slot(0), headSlot, "Mismatch slots")
}
func TestWaitSync_ContextCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()