feat: add validator client option to disable skipping slots (#8631)

**Motivation**

We don't want to skip slots when slot processing takes longer than the
slot duration in a distributed setup, as such delays may be caused by
hanging HTTP requests when the DVT middleware does not reach the
signature threshold.

**Description**

Adds a new hidden flag `--clock.skipSlots` (default: `true`) to the
validator client, which allows disabling slot skipping by setting it to
`false`.

**Note:** the flag defaults to `false` when the `--distributed` flag is
set (an explicitly provided `--clock.skipSlots` value still takes
precedence), as skipping slots is not desired in a DVT cluster: the
assumptions about the beacon node change, and skipping slots does more
harm than good.

Related issue https://github.com/ChainSafe/lodestar/issues/5314
This commit is contained in:
Nico Flaig
2025-11-28 02:57:47 +01:00
committed by GitHub
parent ac3059c9b9
commit dcab62468a
5 changed files with 98 additions and 4 deletions

View File

@@ -183,6 +183,12 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr
fetch: args["externalSigner.fetch"],
fetchInterval: args["externalSigner.fetchInterval"],
},
clock: {
// We don't wanna skip slots if slot processing takes longer than slot duration
// in a distributed setup as delays might get caused by hanging HTTP requests
// due to DVT middleware not reaching the signature threshold
skipSlots: args["clock.skipSlots"] ?? !args.distributed,
},
},
metrics
);

View File

@@ -61,6 +61,8 @@ export type IValidatorCliArgs = AccountValidatorArgs &
"http.requestWireFormat"?: string;
"http.responseWireFormat"?: string;
"clock.skipSlots"?: boolean;
"externalSigner.url"?: string;
"externalSigner.pubkeys"?: string[];
"externalSigner.fetch"?: boolean;
@@ -331,6 +333,12 @@ export const validatorOptions: CliCommandOptions<IValidatorCliArgs> = {
group: "http",
},
"clock.skipSlots": {
hidden: true,
description: "Skip slots when tasks take more than one slot to run",
type: "boolean",
},
// External signer
"externalSigner.url": {

View File

@@ -6,6 +6,10 @@ import {ErrorAborted, Logger, isErrorAborted, sleep} from "@lodestar/utils";
type RunEveryFn = (slot: Slot, signal: AbortSignal) => Promise<void>;
/** Optional settings that tune how the clock schedules the tasks registered on it. */
export type ClockOptions = {
/**
 * Controls whether a per-slot task is awaited before the clock sleeps until the next slot.
 * Any value other than `false` (including leaving it unset) awaits the task, so a task that
 * overruns its slot causes the following slot(s) to be skipped. Set to `false` to start each
 * slot's task on schedule even if an earlier one is still running.
 */
skipSlots?: boolean;
};
export interface IClock {
readonly genesisTime: number;
readonly secondsPerSlot: number;
@@ -34,7 +38,11 @@ export class Clock implements IClock {
private readonly logger: Logger;
private readonly fns: {timeItem: TimeItem; fn: RunEveryFn}[] = [];
constructor(config: ChainForkConfig, logger: Logger, opts: {genesisTime: number}) {
constructor(
config: ChainForkConfig,
logger: Logger,
private readonly opts: {genesisTime: number} & ClockOptions
) {
this.genesisTime = opts.genesisTime;
this.secondsPerSlot = config.SLOT_DURATION_MS / 1000;
this.config = config;
@@ -98,10 +106,15 @@ export class Clock implements IClock {
let slotOrEpoch = timeItem === TimeItem.Slot ? slot : computeEpochAtSlot(slot);
while (!signal.aborted) {
// Must catch fn() to ensure `sleep()` is awaited both for resolve and reject
await fn(slotOrEpoch, signal).catch((e: Error) => {
const task = fn(slotOrEpoch, signal).catch((e: Error) => {
if (!isErrorAborted(e)) this.logger.error("Error on runEvery fn", {}, e);
});
if (timeItem !== TimeItem.Slot || this.opts.skipSlots !== false) {
// await response to only continue with next task if current task finished within slot
await task;
}
try {
await sleep(this.timeUntilNext(timeItem), signal);
// calling getCurrentSlot here may not be correct when we're close to the next slot

View File

@@ -20,7 +20,7 @@ import {SyncingStatusTracker} from "./services/syncingStatusTracker.js";
import {Signer, ValidatorProposerConfig, ValidatorStore, defaultOptions} from "./services/validatorStore.js";
import {ISlashingProtection, Interchange, InterchangeFormatVersion} from "./slashingProtection/index.js";
import {LodestarValidatorDatabaseController, ProcessShutdownCallback, PubkeyHex} from "./types.js";
import {Clock, IClock} from "./util/clock.js";
import {Clock, ClockOptions, IClock} from "./util/clock.js";
import {NotEqualParamsError, assertEqualParams, getLoggerVc} from "./util/index.js";
export type ValidatorModules = {
@@ -63,6 +63,7 @@ export type ValidatorOptions = {
broadcastValidation?: routes.beacon.BroadcastValidation;
blindedLocal?: boolean;
externalSigner?: ExternalSignerOptions;
clock?: ClockOptions;
};
// TODO: Extend the timeout, and let it be customizable
@@ -167,7 +168,7 @@ export class Validator {
const {db, config: chainConfig, logger, slashingProtection, signers, valProposerConfig} = opts;
const config = createBeaconConfig(chainConfig, genesis.genesisValidatorsRoot);
const controller = opts.abortController;
const clock = new Clock(config, logger, {genesisTime: Number(genesis.genesisTime)});
const clock = new Clock(config, logger, {genesisTime: Number(genesis.genesisTime), ...opts.clock});
const loggerVc = getLoggerVc(logger, clock);
let api: ApiClient;

View File

@@ -80,6 +80,72 @@ describe("util / Clock", () => {
expect(onEpoch).toHaveBeenNthCalledWith(2, 1, expect.any(AbortSignal));
});
// Verifies that with `skipSlots: true` a per-slot task overrunning its slot causes the next slot to be skipped.
// NOTE(review): relies on fake timers and on `config`, `logger` and `controller` from the enclosing suite — set up outside this view.
it("Should skip slots when tasks take longer than one slot to run", async () => {
// Place genesis half a slot in the past (SLOT_DURATION_MS / 2000 is half a slot expressed in seconds)
const genesisTime = Math.floor(Date.now() / 1000) - config.SLOT_DURATION_MS / 2000;
const clock = new Clock(config, logger, {genesisTime, skipSlots: true});
// Records the slot argument of every invocation, in call order
const slotsCalled: number[] = [];
const onSlot = vi.fn().mockImplementation(async (slot: number) => {
slotsCalled.push(slot);
// First task takes longer than a slot
if (slot === 0) {
await new Promise((resolve) => setTimeout(resolve, config.SLOT_DURATION_MS + 100));
}
});
clock.runEverySlot(onSlot);
clock.start(controller.signal);
// Must run once immediately
expect(onSlot).toHaveBeenCalledOnce();
expect(onSlot).toHaveBeenNthCalledWith(1, 0, expect.any(AbortSignal));
expect(slotsCalled).toEqual([0]);
// Advance time to slot 2
await vi.advanceTimersByTimeAsync(config.SLOT_DURATION_MS * 2 + 200);
// Slot 1 should be skipped and we should be on slot 2
expect(onSlot).toHaveBeenCalledTimes(2);
expect(onSlot).toHaveBeenNthCalledWith(2, 2, expect.any(AbortSignal));
expect(slotsCalled).toEqual([0, 2]);
});
// Verifies that with `skipSlots: false` every slot's task is started on schedule, even while an earlier task is still running.
// NOTE(review): relies on fake timers and on `config`, `logger` and `controller` from the enclosing suite — set up outside this view.
it("Should not skip slots when option is disabled", async () => {
// Place genesis half a slot in the past (SLOT_DURATION_MS / 2000 is half a slot expressed in seconds)
const genesisTime = Math.floor(Date.now() / 1000) - config.SLOT_DURATION_MS / 2000;
const clock = new Clock(config, logger, {genesisTime, skipSlots: false});
// Records the slot argument of every invocation, in call order
const slotsCalled: number[] = [];
const onSlot = vi.fn().mockImplementation(async (slot: number) => {
slotsCalled.push(slot);
// First task takes longer than a slot
if (slot === 0) {
await new Promise((resolve) => setTimeout(resolve, config.SLOT_DURATION_MS + 100));
}
});
clock.runEverySlot(onSlot);
clock.start(controller.signal);
// Must run once immediately
expect(onSlot).toHaveBeenCalledOnce();
expect(slotsCalled).toEqual([0]);
// Should trigger slot 1 even though slot 0 is still running
await vi.advanceTimersByTimeAsync(config.SLOT_DURATION_MS);
expect(onSlot).toHaveBeenCalledTimes(2);
expect(slotsCalled).toEqual([0, 1]);
// Should trigger slot 2
await vi.advanceTimersByTimeAsync(config.SLOT_DURATION_MS);
expect(onSlot).toHaveBeenCalledTimes(3);
expect(slotsCalled).toEqual([0, 1, 2]);
// All slots should be called without skipping
expect(onSlot).toHaveBeenNthCalledWith(1, 0, expect.any(AbortSignal));
expect(onSlot).toHaveBeenNthCalledWith(2, 1, expect.any(AbortSignal));
expect(onSlot).toHaveBeenNthCalledWith(3, 2, expect.any(AbortSignal));
});
describe("getCurrentSlot", () => {
const testConfig = {SLOT_DURATION_MS: 12 * 1000} as BeaconConfig;
const genesisTime = Math.floor(new Date("2021-01-01").getTime() / 1000);