move validator run slot ticker (#15479)

* moving the ticker from chain start to right before the main loop and also after the wait for activation edge case

* fixing unit test

* adding in a unit test

* adding in comment based on potuz's feedback
This commit is contained in:
james-prysm
2025-07-11 14:39:52 -05:00
committed by GitHub
parent 83943b5dd8
commit 78f8411ad2
8 changed files with 64 additions and 7 deletions

View File

@@ -0,0 +1,3 @@
### Fixed
- Fixes edge case starting validator client with new validator keys starts the slot ticker too early resulting in replayed slots in the main runner loop. Fixes edge case of replayed slots when waiting for account acivations.

View File

@@ -362,6 +362,18 @@ func (mr *MockValidatorMockRecorder) SetProposerSettings(arg0, arg1 any) *gomock
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetProposerSettings", reflect.TypeOf((*MockValidator)(nil).SetProposerSettings), arg0, arg1)
}
// SetTicker mocks base method.
func (m *MockValidator) SetTicker() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetTicker")
}
// SetTicker indicates an expected call of SetTicker.
func (mr *MockValidatorMockRecorder) SetTicker() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTicker", reflect.TypeOf((*MockValidator)(nil).SetTicker))
}
// SignValidatorRegistrationRequest mocks base method.
func (m *MockValidator) SignValidatorRegistrationRequest(arg0 context.Context, arg1 iface.SigningFunc, arg2 *eth.ValidatorRegistrationV1) (*eth.SignedValidatorRegistrationV1, bool, error) {
m.ctrl.T.Helper()

View File

@@ -1,6 +1,7 @@
package slots
import (
"context"
"math"
"testing"
"time"
@@ -701,3 +702,27 @@ func TestToForkVersion(t *testing.T) {
require.Equal(t, version.Phase0, result)
})
}
func TestSlotTickerReplayBehaviour(t *testing.T) {
secondsPerslot := uint64(1)
st := NewSlotTicker(time.Unix(time.Now().Unix(), 0), secondsPerslot) // 1-second period
const ticks = 5
ctx, cancel := context.WithTimeout(t.Context(), 6*time.Second) // make the timeout very close
defer cancel()
time.Sleep(time.Duration(ticks) * time.Second) // simulate slow consumer by delaying tick consumption
counter := 0
prevTime := time.Now()
for counter < ticks {
select {
case <-st.C(): // simulate ticks faster than supposed iteration due to replaying old ticks
assert.Equal(t, true, time.Now().Sub(prevTime) < time.Duration(secondsPerslot)*time.Second)
counter++
prevTime = time.Now()
case <-ctx.Done(): // timed out before enough ticks arrived
t.Fatalf("expected %d ticks, got %d", ticks, counter)
}
}
require.Equal(t, ticks, counter)
}

View File

@@ -70,6 +70,7 @@ type Validator interface {
DeleteGraffiti(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte) error
Host() string
FindHealthyHost(ctx context.Context) bool
SetTicker()
}
// SigningFunc interface defines a type for the function that signs a message

View File

@@ -67,7 +67,6 @@ func newRunner(ctx context.Context, v iface.Validator, monitor *healthMonitor) (
v.Done()
return nil, errors.Wrap(err, "failed to update proposer settings")
}
return &runner{
validator: v,
healthMonitor: monitor,
@@ -86,7 +85,7 @@ func (r *runner) run(ctx context.Context) {
v := r.validator
cleanup := v.Done
defer cleanup()
v.SetTicker()
for {
select {
case <-ctx.Done():
@@ -169,6 +168,9 @@ func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byt
err := v.WaitForActivation(ctx)
if err != nil {
log.WithError(err).Warn("Could not wait for validator activation")
} else {
log.Debug("Resetting slot ticker after waiting for validator activation.")
v.SetTicker()
}
}
}

View File

@@ -333,3 +333,6 @@ func (*FakeValidator) Host() string {
func (fv *FakeValidator) FindHealthyHost(_ context.Context) bool {
return fv.CanChangeHost
}
func (fv *FakeValidator) SetTicker() {
}

View File

@@ -310,7 +310,6 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
return errors.Wrap(err, "could not save genesis validators root")
}
v.setTicker()
return nil
}
@@ -328,11 +327,25 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
)
}
v.setTicker()
return nil
}
func (v *validator) setTicker() {
func (v *validator) SetTicker() {
// If a ticker already exists, stop it before creating a new one
// to prevent resource leaks.
// note to reader:
// This function chooses to adapt to the existing slot ticker instead of changing how it works
// The slot ticker will currently start from genesis time but tick based on the current time.
// This means that sometimes we need to reset the ticker to avoid replaying old ticks on a slow consumer of the ticks.
// i.e.,
// 1. tick starts at 0
// 2. loop stops consuming on slot 10 due to accounts changed tigger with no active keys
// 3. new active keys are added in slot 20 resolving wait for activation
// 4. new tick starts ticking from slot 20 instead of slot 10
if v.ticker != nil {
v.ticker.Done()
}
// Once the ChainStart log is received, we update the genesis time of the validator client
// and begin a slot ticker used to track the current slot the beacon node is in.
v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)

View File

@@ -192,7 +192,6 @@ func TestWaitForChainStart_SetsGenesisInfo(t *testing.T) {
assert.DeepEqual(t, genesisValidatorsRoot[:], savedGenValRoot, "Unexpected saved genesis validators root")
assert.Equal(t, genesis, v.genesisTime, "Unexpected chain start time")
assert.NotNil(t, v.ticker, "Expected ticker to be set, received nil")
// Make sure there are no errors running if it is the same data.
client.EXPECT().WaitForChainStart(
@@ -236,7 +235,6 @@ func TestWaitForChainStart_SetsGenesisInfo_IncorrectSecondTry(t *testing.T) {
assert.DeepEqual(t, genesisValidatorsRoot[:], savedGenValRoot, "Unexpected saved genesis validators root")
assert.Equal(t, genesis, v.genesisTime, "Unexpected chain start time")
assert.NotNil(t, v.ticker, "Expected ticker to be set, received nil")
genesisValidatorsRoot = bytesutil.ToBytes32([]byte("badvalidators"))