From c26cf6aa830c739e009e93d6b44162407ea1fb25 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 17 Feb 2026 23:46:05 +0100 Subject: [PATCH] feat(cron): add default stagger controls for scheduled jobs --- CHANGELOG.md | 2 + docs/automation/cron-jobs.md | 32 ++- docs/automation/cron-vs-heartbeat.md | 8 +- src/cli/cron-cli.test.ts | 235 ++++++++++++++++-- src/cli/cron-cli/register.cron-add.ts | 29 ++- src/cli/cron-cli/register.cron-edit.ts | 48 ++++ src/cli/cron-cli/shared.test.ts | 50 ++++ src/cli/cron-cli/shared.ts | 12 +- src/cron/normalize.test.ts | 44 ++++ src/cron/normalize.ts | 23 +- src/cron/service.issue-regressions.test.ts | 14 +- src/cron/service.jobs.test.ts | 103 +++++++- .../service.jobs.top-of-hour-stagger.test.ts | 93 +++++++ src/cron/service.store.migration.test.ts | 65 +++++ src/cron/service/jobs.ts | 90 +++++-- src/cron/service/store.ts | 25 +- src/cron/stagger.test.ts | 36 +++ src/cron/stagger.ts | 45 ++++ src/cron/types.ts | 8 +- src/gateway/protocol/schema/cron.ts | 1 + 20 files changed, 907 insertions(+), 56 deletions(-) create mode 100644 src/cron/service.jobs.top-of-hour-stagger.test.ts create mode 100644 src/cron/stagger.test.ts create mode 100644 src/cron/stagger.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b20be3238..108861360e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ Docs: https://docs.openclaw.ai - Slack: add external-select flow for large argument menus. (#18496) Thanks @Solvely-Colin. - Discord: expose native `/exec` command options (host/security/ask/node) so Discord slash commands get autocomplete and structured inputs. Thanks @thewilloftheshadow. - Discord: allow reusable interactive components with `components.reusable=true` so buttons, selects, and forms can be used multiple times before expiring. Thanks @thewilloftheshadow. +- Cron/Gateway: separate per-job webhook delivery (`delivery.mode = "webhook"`) from announce delivery, enforce valid HTTP(S) webhook URLs, and keep a temporary legacy `notify + cron.webhook` fallback for stored jobs. (#17901) Thanks @advaitpaliwal. +- Cron/CLI: add deterministic default stagger for recurring top-of-hour cron schedules (including 6-field seconds cron), auto-migrate existing jobs to persisted `schedule.staggerMs`, and add `openclaw cron add/edit --stagger ` plus `--exact` overrides for per-job timing control. - Discord: add per-button `allowedUsers` allowlist for interactive components to restrict who can click buttons. Thanks @thewilloftheshadow. - Mattermost: add emoji reaction actions plus reaction event notifications, including an explicit boolean `remove` flag to avoid accidental removals. (#18608) Thanks @echo931. - Commands/Subagents: add `/subagents spawn` for deterministic subagent activation from chat commands. (#18218) Thanks @JoshuaLelon. diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 96fd46f99d..aae5f58fdf 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -115,11 +115,22 @@ Cron supports three schedule kinds: - `at`: one-shot timestamp via `schedule.at` (ISO 8601). - `every`: fixed interval (ms). -- `cron`: 5-field cron expression with optional IANA timezone. +- `cron`: 5-field cron expression (or 6-field with seconds) with optional IANA timezone. Cron expressions use `croner`. If a timezone is omitted, the Gateway host’s local timezone is used. +To reduce top-of-hour load spikes across many gateways, OpenClaw applies a +deterministic per-job stagger window of up to 5 minutes for recurring +top-of-hour expressions (for example `0 * * * *`, `0 */2 * * *`). Fixed-hour +expressions such as `0 7 * * *` remain exact. + +For any cron schedule, you can set an explicit stagger window with `schedule.staggerMs` +(`0` keeps exact timing). CLI shortcuts: + +- `--stagger 30s` (or `1m`, `5m`) to set an explicit stagger window. +- `--exact` to force `staggerMs = 0`. + ### Main vs isolated execution #### Main session jobs (system events) @@ -408,6 +419,19 @@ openclaw cron add \ --to "+15551234567" ``` +Recurring cron job with explicit 30-second stagger: + +```bash +openclaw cron add \ + --name "Minute watcher" \ + --cron "0 * * * * *" \ + --tz "UTC" \ + --stagger 30s \ + --session isolated \ + --message "Run minute watcher checks." \ + --announce +``` + Recurring isolated job (deliver to a Telegram topic): ```bash @@ -465,6 +489,12 @@ openclaw cron edit \ --thinking low ``` +Force an existing cron job to run exactly on schedule (no stagger): + +```bash +openclaw cron edit --exact +``` + Run history: ```bash diff --git a/docs/automation/cron-vs-heartbeat.md b/docs/automation/cron-vs-heartbeat.md index 423565d4f3..a138e721ae 100644 --- a/docs/automation/cron-vs-heartbeat.md +++ b/docs/automation/cron-vs-heartbeat.md @@ -74,7 +74,9 @@ See [Heartbeat](/gateway/heartbeat) for full configuration. ## Cron: Precise Scheduling -Cron jobs run at **exact times** and can run in isolated sessions without affecting main context. +Cron jobs run at precise times and can run in isolated sessions without affecting main context. +Recurring top-of-hour schedules are automatically spread by a deterministic +per-job offset in a 0-5 minute window. ### When to use cron @@ -87,7 +89,9 @@ Cron jobs run at **exact times** and can run in isolated sessions without affect ### Cron advantages -- **Exact timing**: 5-field cron expressions with timezone support. +- **Precise timing**: 5-field or 6-field (seconds) cron expressions with timezone support. +- **Built-in load spreading**: recurring top-of-hour schedules are staggered by up to 5 minutes by default. +- **Per-job control**: override stagger with `--stagger ` or force exact timing with `--exact`. - **Session isolation**: Runs in `cron:` without polluting main history. - **Model overrides**: Use a cheaper or more powerful model per job. - **Delivery control**: Isolated jobs default to `announce` (summary); choose `none` as needed. diff --git a/src/cli/cron-cli.test.ts b/src/cli/cron-cli.test.ts index 9677c03e9b..f86c96ad1f 100644 --- a/src/cli/cron-cli.test.ts +++ b/src/cli/cron-cli.test.ts @@ -1,14 +1,18 @@ import { Command } from "commander"; import { describe, expect, it, vi } from "vitest"; -const callGatewayFromCli = vi.fn( - async (method: string, _opts: unknown, params?: unknown, _timeoutMs?: number) => { - if (method === "cron.status") { - return { enabled: true }; - } - return { ok: true, params }; - }, -); +const defaultGatewayMock = async ( + method: string, + _opts: unknown, + params?: unknown, + _timeoutMs?: number, +) => { + if (method === "cron.status") { + return { enabled: true }; + } + return { ok: true, params }; +}; +const callGatewayFromCli = vi.fn(defaultGatewayMock); vi.mock("./gateway-rpc.js", async () => { const actual = await vi.importActual("./gateway-rpc.js"); @@ -45,8 +49,13 @@ function buildProgram() { return program; } +function resetGatewayMock() { + callGatewayFromCli.mockReset(); + callGatewayFromCli.mockImplementation(defaultGatewayMock); +} + async function runCronEditAndGetPatch(editArgs: string[]): Promise { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); await program.parseAsync(["cron", "edit", "job-1", ...editArgs], { from: "user" }); const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update"); @@ -55,7 +64,7 @@ async function runCronEditAndGetPatch(editArgs: string[]): Promise { it("trims model and thinking on cron add", { timeout: 60_000 }, async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -89,7 +98,7 @@ describe("cron cli", () => { }); it("defaults isolated cron add to announce delivery", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -116,7 +125,7 @@ describe("cron cli", () => { }); it("infers sessionTarget from payload when --session is omitted", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -130,7 +139,7 @@ describe("cron cli", () => { expect(params?.sessionTarget).toBe("main"); expect(params?.payload?.kind).toBe("systemEvent"); - callGatewayFromCli.mockClear(); + resetGatewayMock(); await program.parseAsync( ["cron", "add", "--name", "Isolated task", "--cron", "* * * * *", "--message", "hello"], @@ -144,7 +153,7 @@ describe("cron cli", () => { }); it("supports --keep-after-run on cron add", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -171,7 +180,7 @@ describe("cron cli", () => { }); it("sends agent id on cron add", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -199,7 +208,7 @@ describe("cron cli", () => { }); it("omits empty model and thinking on cron edit", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -218,7 +227,7 @@ describe("cron cli", () => { }); it("trims model and thinking on cron edit", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -247,7 +256,7 @@ describe("cron cli", () => { }); it("sets and clears agent id on cron edit", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -259,7 +268,7 @@ describe("cron cli", () => { const patch = updateCall?.[2] as { patch?: { agentId?: unknown } }; expect(patch?.patch?.agentId).toBe("ops"); - callGatewayFromCli.mockClear(); + resetGatewayMock(); await program.parseAsync(["cron", "edit", "job-2", "--clear-agent"], { from: "user", }); @@ -269,7 +278,7 @@ describe("cron cli", () => { }); it("allows model/thinking updates without --message", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -288,7 +297,7 @@ describe("cron cli", () => { }); it("updates delivery settings without requiring --message", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -313,7 +322,7 @@ describe("cron cli", () => { }); it("supports --no-deliver on cron edit", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -329,7 +338,7 @@ describe("cron cli", () => { }); it("does not include undefined delivery fields when updating message", async () => { - callGatewayFromCli.mockClear(); + resetGatewayMock(); const program = buildProgram(); @@ -404,4 +413,184 @@ describe("cron cli", () => { expect(patch?.patch?.delivery?.mode).toBe("announce"); expect(patch?.patch?.delivery?.bestEffort).toBe(false); }); + + it("sets explicit stagger for cron add", async () => { + resetGatewayMock(); + const program = buildProgram(); + + await program.parseAsync( + [ + "cron", + "add", + "--name", + "staggered", + "--cron", + "0 * * * *", + "--stagger", + "45s", + "--session", + "main", + "--system-event", + "tick", + ], + { from: "user" }, + ); + + const addCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.add"); + const params = addCall?.[2] as { schedule?: { kind?: string; staggerMs?: number } }; + expect(params?.schedule?.kind).toBe("cron"); + expect(params?.schedule?.staggerMs).toBe(45_000); + }); + + it("sets exact cron mode on add", async () => { + resetGatewayMock(); + const program = buildProgram(); + + await program.parseAsync( + [ + "cron", + "add", + "--name", + "exact", + "--cron", + "0 * * * *", + "--exact", + "--session", + "main", + "--system-event", + "tick", + ], + { from: "user" }, + ); + + const addCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.add"); + const params = addCall?.[2] as { schedule?: { kind?: string; staggerMs?: number } }; + expect(params?.schedule?.kind).toBe("cron"); + expect(params?.schedule?.staggerMs).toBe(0); + }); + + it("rejects --stagger with --exact on add", async () => { + resetGatewayMock(); + const program = buildProgram(); + + await expect( + program.parseAsync( + [ + "cron", + "add", + "--name", + "invalid", + "--cron", + "0 * * * *", + "--stagger", + "1m", + "--exact", + "--session", + "main", + "--system-event", + "tick", + ], + { from: "user" }, + ), + ).rejects.toThrow("__exit__:1"); + }); + + it("rejects --stagger when schedule is not cron", async () => { + resetGatewayMock(); + const program = buildProgram(); + + await expect( + program.parseAsync( + [ + "cron", + "add", + "--name", + "invalid", + "--every", + "10m", + "--stagger", + "30s", + "--session", + "main", + "--system-event", + "tick", + ], + { from: "user" }, + ), + ).rejects.toThrow("__exit__:1"); + }); + + it("sets explicit stagger for cron edit", async () => { + resetGatewayMock(); + const program = buildProgram(); + + await program.parseAsync(["cron", "edit", "job-1", "--cron", "0 * * * *", "--stagger", "30s"], { + from: "user", + }); + + const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update"); + const patch = updateCall?.[2] as { + patch?: { schedule?: { kind?: string; staggerMs?: number } }; + }; + expect(patch?.patch?.schedule?.kind).toBe("cron"); + expect(patch?.patch?.schedule?.staggerMs).toBe(30_000); + }); + + it("applies --exact to existing cron job without requiring --cron on edit", async () => { + resetGatewayMock(); + callGatewayFromCli.mockImplementation( + async (method: string, _opts: unknown, params?: unknown) => { + if (method === "cron.status") { + return { enabled: true }; + } + if (method === "cron.list") { + return { + jobs: [ + { + id: "job-1", + schedule: { kind: "cron", expr: "0 */2 * * *", tz: "UTC", staggerMs: 300_000 }, + }, + ], + }; + } + return { ok: true, params }; + }, + ); + const program = buildProgram(); + + await program.parseAsync(["cron", "edit", "job-1", "--exact"], { from: "user" }); + + const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update"); + const patch = updateCall?.[2] as { + patch?: { schedule?: { kind?: string; expr?: string; tz?: string; staggerMs?: number } }; + }; + expect(patch?.patch?.schedule).toEqual({ + kind: "cron", + expr: "0 */2 * * *", + tz: "UTC", + staggerMs: 0, + }); + }); + + it("rejects --exact on edit when existing job is not cron", async () => { + resetGatewayMock(); + callGatewayFromCli.mockImplementation( + async (method: string, _opts: unknown, params?: unknown) => { + if (method === "cron.status") { + return { enabled: true }; + } + if (method === "cron.list") { + return { + jobs: [{ id: "job-1", schedule: { kind: "every", everyMs: 60_000 } }], + }; + } + return { ok: true, params }; + }, + ); + const program = buildProgram(); + + await expect( + program.parseAsync(["cron", "edit", "job-1", "--exact"], { from: "user" }), + ).rejects.toThrow("__exit__:1"); + }); }); diff --git a/src/cli/cron-cli/register.cron-add.ts b/src/cli/cron-cli/register.cron-add.ts index 29df5085fb..1e489bd0a5 100644 --- a/src/cli/cron-cli/register.cron-add.ts +++ b/src/cli/cron-cli/register.cron-add.ts @@ -1,9 +1,9 @@ import type { Command } from "commander"; import type { CronJob } from "../../cron/types.js"; +import type { GatewayRpcOpts } from "../gateway-rpc.js"; import { danger } from "../../globals.js"; import { sanitizeAgentId } from "../../routing/session-key.js"; import { defaultRuntime } from "../../runtime.js"; -import type { GatewayRpcOpts } from "../gateway-rpc.js"; import { addGatewayClientOptions, callGatewayFromCli } from "../gateway-rpc.js"; import { parsePositiveIntOrUndefined } from "../program/helpers.js"; import { @@ -74,8 +74,10 @@ export function registerCronAddCommand(cron: Command) { .option("--wake ", "Wake mode (now|next-heartbeat)", "now") .option("--at ", "Run once at time (ISO) or +duration (e.g. 20m)") .option("--every ", "Run every duration (e.g. 10m, 1h)") - .option("--cron ", "Cron expression (5-field)") + .option("--cron ", "Cron expression (5-field or 6-field with seconds)") .option("--tz ", "Timezone for cron expressions (IANA)", "") + .option("--stagger ", "Cron stagger window (e.g. 30s, 5m)") + .option("--exact", "Disable cron staggering (set stagger to 0)", false) .option("--system-event ", "System event payload (main session)") .option("--message ", "Agent message payload") .option("--thinking ", "Thinking level for agent jobs (off|minimal|low|medium|high)") @@ -93,6 +95,12 @@ export function registerCronAddCommand(cron: Command) { .option("--json", "Output JSON", false) .action(async (opts: GatewayRpcOpts & Record, cmd?: Command) => { try { + const staggerRaw = typeof opts.stagger === "string" ? opts.stagger.trim() : ""; + const useExact = Boolean(opts.exact); + if (staggerRaw && useExact) { + throw new Error("Choose either --stagger or --exact, not both"); + } + const schedule = (() => { const at = typeof opts.at === "string" ? opts.at : ""; const every = typeof opts.every === "string" ? opts.every : ""; @@ -101,6 +109,9 @@ export function registerCronAddCommand(cron: Command) { if (chosen !== 1) { throw new Error("Choose exactly one schedule: --at, --every, or --cron"); } + if ((useExact || staggerRaw) && !cronExpr) { + throw new Error("--stagger/--exact are only valid with --cron"); + } if (at) { const atIso = parseAt(at); if (!atIso) { @@ -115,10 +126,24 @@ export function registerCronAddCommand(cron: Command) { } return { kind: "every" as const, everyMs }; } + const staggerMs = (() => { + if (useExact) { + return 0; + } + if (!staggerRaw) { + return undefined; + } + const parsed = parseDurationMs(staggerRaw); + if (!parsed) { + throw new Error("Invalid --stagger; use e.g. 30s, 1m, 5m"); + } + return parsed; + })(); return { kind: "cron" as const, expr: cronExpr, tz: typeof opts.tz === "string" && opts.tz.trim() ? opts.tz.trim() : undefined, + staggerMs, }; })(); diff --git a/src/cli/cron-cli/register.cron-edit.ts b/src/cli/cron-cli/register.cron-edit.ts index bced50e7f0..f1e6c74d77 100644 --- a/src/cli/cron-cli/register.cron-edit.ts +++ b/src/cli/cron-cli/register.cron-edit.ts @@ -1,4 +1,5 @@ import type { Command } from "commander"; +import type { CronJob } from "../../cron/types.js"; import { danger } from "../../globals.js"; import { sanitizeAgentId } from "../../routing/session-key.js"; import { defaultRuntime } from "../../runtime.js"; @@ -41,6 +42,8 @@ export function registerCronEditCommand(cron: Command) { .option("--every ", "Set interval duration like 10m") .option("--cron ", "Set cron expression") .option("--tz ", "Timezone for cron expressions (IANA)") + .option("--stagger ", "Cron stagger window (e.g. 30s, 5m)") + .option("--exact", "Disable cron staggering (set stagger to 0)") .option("--system-event ", "Set systemEvent payload") .option("--message ", "Set agentTurn payload message") .option("--thinking ", "Thinking level for agent jobs") @@ -71,6 +74,24 @@ export function registerCronEditCommand(cron: Command) { if (opts.announce && typeof opts.deliver === "boolean") { throw new Error("Choose --announce or --no-deliver (not multiple)."); } + const staggerRaw = typeof opts.stagger === "string" ? opts.stagger.trim() : ""; + const useExact = Boolean(opts.exact); + if (staggerRaw && useExact) { + throw new Error("Choose either --stagger or --exact, not both"); + } + const requestedStaggerMs = (() => { + if (useExact) { + return 0; + } + if (!staggerRaw) { + return undefined; + } + const parsed = parseDurationMs(staggerRaw); + if (!parsed) { + throw new Error("Invalid --stagger; use e.g. 30s, 1m, 5m"); + } + return parsed; + })(); const patch: Record = {}; if (typeof opts.name === "string") { @@ -117,6 +138,12 @@ export function registerCronEditCommand(cron: Command) { if (scheduleChosen > 1) { throw new Error("Choose at most one schedule change"); } + if ( + (requestedStaggerMs !== undefined || typeof opts.tz === "string") && + (opts.at || opts.every) + ) { + throw new Error("--stagger/--exact/--tz are only valid for cron schedules"); + } if (opts.at) { const atIso = parseAt(String(opts.at)); if (!atIso) { @@ -134,6 +161,27 @@ export function registerCronEditCommand(cron: Command) { kind: "cron", expr: String(opts.cron), tz: typeof opts.tz === "string" && opts.tz.trim() ? opts.tz.trim() : undefined, + staggerMs: requestedStaggerMs, + }; + } else if (requestedStaggerMs !== undefined || typeof opts.tz === "string") { + const listed = (await callGatewayFromCli("cron.list", opts, { + includeDisabled: true, + })) as { jobs?: CronJob[] } | null; + const existing = (listed?.jobs ?? []).find((job) => job.id === id); + if (!existing) { + throw new Error(`unknown cron job id: ${id}`); + } + if (existing.schedule.kind !== "cron") { + throw new Error("Current job is not a cron schedule; use --cron to convert first"); + } + const tz = + typeof opts.tz === "string" ? opts.tz.trim() || undefined : existing.schedule.tz; + patch.schedule = { + kind: "cron", + expr: existing.schedule.expr, + tz, + staggerMs: + requestedStaggerMs !== undefined ? requestedStaggerMs : existing.schedule.staggerMs, }; } diff --git a/src/cli/cron-cli/shared.test.ts b/src/cli/cron-cli/shared.test.ts index ffd67c1f2b..fb453a930a 100644 --- a/src/cli/cron-cli/shared.test.ts +++ b/src/cli/cron-cli/shared.test.ts @@ -60,4 +60,54 @@ describe("printCronList", () => { expect(() => printCronList([jobWithTarget], mockRuntime)).not.toThrow(); expect(logs.some((line) => line.includes("isolated"))).toBe(true); }); + + it("shows stagger label for cron schedules", () => { + const logs: string[] = []; + const mockRuntime = { + log: (msg: string) => logs.push(msg), + error: () => {}, + exit: () => {}, + } as RuntimeEnv; + + const job: CronJob = { + id: "staggered-job", + name: "Staggered", + enabled: true, + createdAtMs: Date.now(), + updatedAtMs: Date.now(), + schedule: { kind: "cron", expr: "0 * * * *", staggerMs: 5 * 60_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: {}, + }; + + printCronList([job], mockRuntime); + expect(logs.some((line) => line.includes("(stagger 5m)"))).toBe(true); + }); + + it("shows exact label for cron schedules with stagger disabled", () => { + const logs: string[] = []; + const mockRuntime = { + log: (msg: string) => logs.push(msg), + error: () => {}, + exit: () => {}, + } as RuntimeEnv; + + const job: CronJob = { + id: "exact-job", + name: "Exact", + enabled: true, + createdAtMs: Date.now(), + updatedAtMs: Date.now(), + schedule: { kind: "cron", expr: "0 7 * * *", staggerMs: 0 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: {}, + }; + + printCronList([job], mockRuntime); + expect(logs.some((line) => line.includes("(exact)"))).toBe(true); + }); }); diff --git a/src/cli/cron-cli/shared.ts b/src/cli/cron-cli/shared.ts index 9c74e10854..97b2bbae85 100644 --- a/src/cli/cron-cli/shared.ts +++ b/src/cli/cron-cli/shared.ts @@ -1,10 +1,11 @@ +import type { CronJob, CronSchedule } from "../../cron/types.js"; +import type { GatewayRpcOpts } from "../gateway-rpc.js"; import { listChannelPlugins } from "../../channels/plugins/index.js"; import { parseAbsoluteTimeMs } from "../../cron/parse.js"; -import type { CronJob, CronSchedule } from "../../cron/types.js"; +import { resolveCronStaggerMs } from "../../cron/stagger.js"; import { formatDurationHuman } from "../../infra/format-time/format-duration.ts"; import { defaultRuntime } from "../../runtime.js"; import { colorize, isRich, theme } from "../../terminal/theme.js"; -import type { GatewayRpcOpts } from "../gateway-rpc.js"; import { callGatewayFromCli } from "../gateway-rpc.js"; export const getCronChannelOptions = () => @@ -137,7 +138,12 @@ const formatSchedule = (schedule: CronSchedule) => { if (schedule.kind === "every") { return `every ${formatDurationHuman(schedule.everyMs)}`; } - return schedule.tz ? `cron ${schedule.expr} @ ${schedule.tz}` : `cron ${schedule.expr}`; + const base = schedule.tz ? `cron ${schedule.expr} @ ${schedule.tz}` : `cron ${schedule.expr}`; + const staggerMs = resolveCronStaggerMs(schedule); + if (staggerMs <= 0) { + return `${base} (exact)`; + } + return `${base} (stagger ${formatDurationHuman(staggerMs)})`; }; const formatStatus = (job: CronJob) => { diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index c381faddee..71ec7ca3f0 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it } from "vitest"; import { normalizeCronJobCreate, normalizeCronJobPatch } from "./normalize.js"; +import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js"; function expectNormalizedAtSchedule(scheduleInput: Record) { const normalized = normalizeCronJobCreate({ @@ -137,6 +138,40 @@ describe("normalizeCronJobCreate", () => { expectNormalizedAtSchedule({ kind: "at", atMs: "2026-01-12T18:00:00" }); }); + it("defaults cron stagger for recurring top-of-hour schedules", () => { + const normalized = normalizeCronJobCreate({ + name: "hourly", + enabled: true, + schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { + kind: "systemEvent", + text: "tick", + }, + }) as unknown as Record; + + const schedule = normalized.schedule as Record; + expect(schedule.staggerMs).toBe(DEFAULT_TOP_OF_HOUR_STAGGER_MS); + }); + + it("preserves explicit exact cron schedule", () => { + const normalized = normalizeCronJobCreate({ + name: "exact", + enabled: true, + schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC", staggerMs: 0 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { + kind: "systemEvent", + text: "tick", + }, + }) as unknown as Record; + + const schedule = normalized.schedule as Record; + expect(schedule.staggerMs).toBe(0); + }); + it("defaults deleteAfterRun for one-shot schedules", () => { const normalized = normalizeCronJobCreate({ name: "default delete", @@ -377,4 +412,13 @@ describe("normalizeCronJobPatch", () => { }) as unknown as Record; expect(cleared.sessionKey).toBeNull(); }); + + it("normalizes cron stagger values in patch schedules", () => { + const normalized = normalizeCronJobPatch({ + schedule: { kind: "cron", expr: "0 * * * *", staggerMs: "30000" }, + }) as unknown as Record; + + const schedule = normalized.schedule as Record; + expect(schedule.staggerMs).toBe(30_000); + }); }); diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index 3d4c533efa..198bccf8ec 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -1,3 +1,4 @@ +import type { CronJobCreate, CronJobPatch } from "./types.js"; import { sanitizeAgentId } from "../routing/session-key.js"; import { isRecord } from "../utils.js"; import { @@ -8,7 +9,7 @@ import { import { parseAbsoluteTimeMs } from "./parse.js"; import { migrateLegacyCronPayload } from "./payload-migration.js"; import { inferLegacyName } from "./service/normalize.js"; -import type { CronJobCreate, CronJobPatch } from "./types.js"; +import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js"; type UnknownRecord = Record; @@ -61,6 +62,13 @@ function coerceSchedule(schedule: UnknownRecord) { delete next.atMs; } + const staggerMs = normalizeCronStaggerMs(schedule.staggerMs); + if (staggerMs !== undefined) { + next.staggerMs = staggerMs; + } else if ("staggerMs" in next) { + delete next.staggerMs; + } + return next; } @@ -420,6 +428,19 @@ export function normalizeCronJobInput( ) { next.deleteAfterRun = true; } + if ("schedule" in next && isRecord(next.schedule) && next.schedule.kind === "cron") { + const schedule = next.schedule as UnknownRecord; + const explicit = normalizeCronStaggerMs(schedule.staggerMs); + if (explicit !== undefined) { + schedule.staggerMs = explicit; + } else { + const expr = typeof schedule.expr === "string" ? schedule.expr : ""; + const defaultStaggerMs = resolveDefaultCronStaggerMs(expr); + if (defaultStaggerMs !== undefined) { + schedule.staggerMs = defaultStaggerMs; + } + } + } const payload = isRecord(next.payload) ? next.payload : null; const payloadKind = payload && typeof payload.kind === "string" ? payload.kind : ""; const sessionTarget = typeof next.sessionTarget === "string" ? next.sessionTarget : ""; diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index 00af22e975..63f76dd98b 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -1,13 +1,14 @@ +import crypto from "node:crypto"; import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CronJob, CronJobState } from "./types.js"; import * as schedule from "./schedule.js"; import { CronService } from "./service.js"; import { computeJobNextRunAtMs } from "./service/jobs.js"; import { createCronServiceState, type CronEvent } from "./service/state.js"; import { onTimer } from "./service/timer.js"; -import type { CronJob, CronJobState } from "./types.js"; const noopLogger = { info: vi.fn(), @@ -16,6 +17,12 @@ const noopLogger = { debug: vi.fn(), trace: vi.fn(), }; +const TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1_000; + +function topOfHourOffsetMs(jobId: string) { + const digest = crypto.createHash("sha256").update(jobId).digest(); + return digest.readUInt32BE(0) % TOP_OF_HOUR_STAGGER_MS; +} let fixtureRoot = ""; let fixtureCount = 0; @@ -101,13 +108,14 @@ describe("Cron issue regressions", () => { wakeMode: "next-heartbeat", payload: { kind: "systemEvent", text: "tick" }, }); - expect(created.state.nextRunAtMs).toBe(Date.parse("2026-02-06T11:00:00.000Z")); + const offsetMs = topOfHourOffsetMs(created.id); + expect(created.state.nextRunAtMs).toBe(Date.parse("2026-02-06T11:00:00.000Z") + offsetMs); const updated = await cron.update(created.id, { schedule: { kind: "cron", expr: "0 */2 * * *", tz: "UTC" }, }); - expect(updated.state.nextRunAtMs).toBe(Date.parse("2026-02-06T12:00:00.000Z")); + expect(updated.state.nextRunAtMs).toBe(Date.parse("2026-02-06T12:00:00.000Z") + offsetMs); const forceNow = await cron.add({ name: "force-now", diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts index 16fa96e36f..7cfe651e8b 100644 --- a/src/cron/service.jobs.test.ts +++ b/src/cron/service.jobs.test.ts @@ -1,6 +1,8 @@ import { describe, expect, it } from "vitest"; -import { applyJobPatch } from "./service/jobs.js"; +import type { CronServiceState } from "./service/state.js"; import type { CronJob, CronJobPatch } from "./types.js"; +import { applyJobPatch, createJob } from "./service/jobs.js"; +import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js"; describe("applyJobPatch", () => { it("clears delivery when switching to main session", () => { @@ -179,3 +181,102 @@ describe("applyJobPatch", () => { expect(job.delivery).toEqual({ mode: "webhook", to: "https://example.invalid/trim" }); }); }); + +function createMockState(now: number): CronServiceState { + return { + deps: { + nowMs: () => now, + }, + } as unknown as CronServiceState; +} + +describe("cron stagger defaults", () => { + it("defaults top-of-hour cron jobs to 5m stagger", () => { + const now = Date.parse("2026-02-08T10:00:00.000Z"); + const state = createMockState(now); + + const job = createJob(state, { + name: "hourly", + enabled: true, + schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "tick" }, + }); + + expect(job.schedule.kind).toBe("cron"); + if (job.schedule.kind === "cron") { + expect(job.schedule.staggerMs).toBe(DEFAULT_TOP_OF_HOUR_STAGGER_MS); + } + }); + + it("keeps exact schedules when staggerMs is explicitly 0", () => { + const now = Date.parse("2026-02-08T10:00:00.000Z"); + const state = createMockState(now); + + const job = createJob(state, { + name: "exact-hourly", + enabled: true, + schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC", staggerMs: 0 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "tick" }, + }); + + expect(job.schedule.kind).toBe("cron"); + if (job.schedule.kind === "cron") { + expect(job.schedule.staggerMs).toBe(0); + } + }); + + it("preserves existing stagger when editing cron expression without stagger", () => { + const now = Date.now(); + const job: CronJob = { + id: "job-keep-stagger", + name: "job-keep-stagger", + enabled: true, + createdAtMs: now, + updatedAtMs: now, + schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC", staggerMs: 120_000 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "tick" }, + state: {}, + }; + + applyJobPatch(job, { + schedule: { kind: "cron", expr: "0 */2 * * *", tz: "UTC" }, + }); + + expect(job.schedule.kind).toBe("cron"); + if (job.schedule.kind === "cron") { + expect(job.schedule.expr).toBe("0 */2 * * *"); + expect(job.schedule.staggerMs).toBe(120_000); + } + }); + + it("applies default stagger when switching from every to top-of-hour cron", () => { + const now = Date.now(); + const job: CronJob = { + id: "job-switch-cron", + name: "job-switch-cron", + enabled: true, + createdAtMs: now, + updatedAtMs: now, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "tick" }, + state: {}, + }; + + applyJobPatch(job, { + schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" }, + }); + + expect(job.schedule.kind).toBe("cron"); + if (job.schedule.kind === "cron") { + expect(job.schedule.staggerMs).toBe(DEFAULT_TOP_OF_HOUR_STAGGER_MS); + } + }); +}); diff --git a/src/cron/service.jobs.top-of-hour-stagger.test.ts b/src/cron/service.jobs.top-of-hour-stagger.test.ts new file mode 100644 index 0000000000..c88de3fb8d --- /dev/null +++ b/src/cron/service.jobs.top-of-hour-stagger.test.ts @@ -0,0 +1,93 @@ +import crypto from "node:crypto"; +import { describe, expect, it } from "vitest"; +import type { CronJob } from "./types.js"; +import { computeJobNextRunAtMs } from "./service/jobs.js"; +import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js"; + +function stableOffsetMs(jobId: string, windowMs: number) { + const digest = crypto.createHash("sha256").update(jobId).digest(); + return digest.readUInt32BE(0) % windowMs; +} + +function createCronJob(params: { + id: string; + expr: string; + tz?: string; + staggerMs?: number; + state?: CronJob["state"]; +}): CronJob { + return { + id: params.id, + name: params.id, + enabled: true, + createdAtMs: Date.parse("2026-02-06T00:00:00.000Z"), + updatedAtMs: Date.parse("2026-02-06T00:00:00.000Z"), + schedule: { kind: "cron", expr: params.expr, tz: params.tz, staggerMs: params.staggerMs }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + state: params.state ?? {}, + }; +} + +describe("computeJobNextRunAtMs top-of-hour staggering", () => { + it("applies deterministic 0..5m stagger for recurring top-of-hour schedules", () => { + const now = Date.parse("2026-02-06T10:05:00.000Z"); + const job = createCronJob({ id: "hourly-job-a", expr: "0 * * * *", tz: "UTC" }); + const offsetMs = stableOffsetMs(job.id, DEFAULT_TOP_OF_HOUR_STAGGER_MS); + + const next = computeJobNextRunAtMs(job, now); + + expect(next).toBe(Date.parse("2026-02-06T11:00:00.000Z") + offsetMs); + expect(offsetMs).toBeGreaterThanOrEqual(0); + expect(offsetMs).toBeLessThan(DEFAULT_TOP_OF_HOUR_STAGGER_MS); + }); + + it("can still fire in the current hour when the staggered slot is ahead", () => { + const now = Date.parse("2026-02-06T10:02:00.000Z"); + const thisHour = Date.parse("2026-02-06T10:00:00.000Z"); + const nextHour = Date.parse("2026-02-06T11:00:00.000Z"); + const job = createCronJob({ id: "hourly-job-b", expr: "0 * * * *", tz: "UTC" }); + const offsetMs = stableOffsetMs(job.id, DEFAULT_TOP_OF_HOUR_STAGGER_MS); + + const expected = thisHour + offsetMs > now ? thisHour + offsetMs : nextHour + offsetMs; + const next = computeJobNextRunAtMs(job, now); + + expect(next).toBe(expected); + }); + + it("also applies to 6-field top-of-hour cron expressions", () => { + const now = Date.parse("2026-02-06T10:05:00.000Z"); + const job = createCronJob({ id: "hourly-job-seconds", expr: "0 0 * * * *", tz: "UTC" }); + const offsetMs = stableOffsetMs(job.id, DEFAULT_TOP_OF_HOUR_STAGGER_MS); + + const next = computeJobNextRunAtMs(job, now); + + expect(next).toBe(Date.parse("2026-02-06T11:00:00.000Z") + offsetMs); + }); + + it("supports explicit stagger for non top-of-hour cron expressions", () => { + const now = Date.parse("2026-02-06T10:05:00.000Z"); + const windowMs = 30_000; + const job = createCronJob({ + id: "minute-17-staggered", + expr: "17 * * * *", + tz: "UTC", + staggerMs: windowMs, + }); + const offsetMs = stableOffsetMs(job.id, windowMs); + + const next = computeJobNextRunAtMs(job, now); + + expect(next).toBe(Date.parse("2026-02-06T10:17:00.000Z") + offsetMs); + }); + + it("keeps schedules exact when staggerMs is set to 0", () => { + const now = Date.parse("2026-02-06T10:05:00.000Z"); + const job = createCronJob({ id: "daily-job", expr: "0 7 * * *", tz: "UTC", staggerMs: 0 }); + + const next = computeJobNextRunAtMs(job, now); + + expect(next).toBe(Date.parse("2026-02-07T07:00:00.000Z")); + }); +}); diff --git a/src/cron/service.store.migration.test.ts b/src/cron/service.store.migration.test.ts index 7ba59757ef..11b5da506a 100644 --- a/src/cron/service.store.migration.test.ts +++ b/src/cron/service.store.migration.test.ts @@ -3,6 +3,7 @@ import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { CronService } from "./service.js"; +import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js"; import { loadCronStore } from "./store.js"; const noopLogger = { @@ -136,4 +137,68 @@ describe("cron store migration", () => { await store.cleanup(); }); + + it("adds default staggerMs to legacy recurring top-of-hour cron schedules", async () => { + const store = await makeStorePath(); + const createdAtMs = 1_700_000_000_000; + const legacyJob = { + id: "job-cron-legacy", + agentId: undefined, + name: "Legacy cron", + description: null, + enabled: true, + deleteAfterRun: false, + createdAtMs, + updatedAtMs: createdAtMs, + schedule: { kind: "cron", expr: "0 */2 * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { + kind: "systemEvent", + text: "tick", + }, + state: {}, + }; + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile(store.storePath, JSON.stringify({ version: 1, jobs: [legacyJob] }, null, 2)); + + const migrated = await migrateAndLoadFirstJob(store.storePath); + const schedule = migrated.schedule as Record; + expect(schedule.kind).toBe("cron"); + expect(schedule.staggerMs).toBe(DEFAULT_TOP_OF_HOUR_STAGGER_MS); + + await store.cleanup(); + }); + + it("adds default staggerMs to legacy 6-field top-of-hour cron schedules", async () => { + const store = await makeStorePath(); + const createdAtMs = 1_700_000_000_000; + const legacyJob = { + id: "job-cron-seconds-legacy", + agentId: undefined, + name: "Legacy cron seconds", + description: null, + enabled: true, + deleteAfterRun: false, + createdAtMs, + updatedAtMs: createdAtMs, + schedule: { kind: "cron", expr: "0 0 */3 * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { + kind: "systemEvent", + text: "tick", + }, + state: {}, + }; + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile(store.storePath, JSON.stringify({ version: 1, jobs: [legacyJob] }, null, 2)); + + const migrated = await migrateAndLoadFirstJob(store.storePath); + const schedule = migrated.schedule as Record; + expect(schedule.kind).toBe("cron"); + expect(schedule.staggerMs).toBe(DEFAULT_TOP_OF_HOUR_STAGGER_MS); + + await store.cleanup(); + }); }); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index b3e246d112..302ccefa25 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -1,6 +1,4 @@ import crypto from "node:crypto"; -import { parseAbsoluteTimeMs } from "../parse.js"; -import { computeNextRunAtMs } from "../schedule.js"; import type { CronDelivery, CronDeliveryPatch, @@ -10,6 +8,14 @@ import type { CronPayload, CronPayloadPatch, } from "../types.js"; +import type { CronServiceState } from "./state.js"; +import { parseAbsoluteTimeMs } from "../parse.js"; +import { computeNextRunAtMs } from "../schedule.js"; +import { + normalizeCronStaggerMs, + resolveCronStaggerMs, + resolveDefaultCronStaggerMs, +} from "../stagger.js"; import { normalizeHttpWebhookUrl } from "../webhook-url.js"; import { normalizeOptionalAgentId, @@ -18,10 +24,45 @@ import { normalizePayloadToSystemText, normalizeRequiredName, } from "./normalize.js"; -import type { CronServiceState } from "./state.js"; const STUCK_RUN_MS = 2 * 60 * 60 * 1000; +function resolveStableCronOffsetMs(jobId: string, staggerMs: number) { + if (staggerMs <= 1) { + return 0; + } + const digest = crypto.createHash("sha256").update(jobId).digest(); + return digest.readUInt32BE(0) % staggerMs; +} + +function computeStaggeredCronNextRunAtMs(job: CronJob, nowMs: number) { + if (job.schedule.kind !== "cron") { + return computeNextRunAtMs(job.schedule, nowMs); + } + + const staggerMs = resolveCronStaggerMs(job.schedule); + const offsetMs = resolveStableCronOffsetMs(job.id, staggerMs); + if (offsetMs <= 0) { + return computeNextRunAtMs(job.schedule, nowMs); + } + + // Shift the schedule cursor backwards by the per-job offset so we can still + // target the current schedule window if its staggered slot has not passed yet. + let cursorMs = Math.max(0, nowMs - offsetMs); + for (let attempt = 0; attempt < 4; attempt += 1) { + const baseNext = computeNextRunAtMs(job.schedule, cursorMs); + if (baseNext === undefined) { + return undefined; + } + const shifted = baseNext + offsetMs; + if (shifted > nowMs) { + return shifted; + } + cursorMs = Math.max(cursorMs + 1, baseNext + 1_000); + } + return undefined; +} + function resolveEveryAnchorMs(params: { schedule: { everyMs: number; anchorMs?: number }; fallbackAnchorMs: number; @@ -97,18 +138,7 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und : null; return atMs !== null ? atMs : undefined; } - const next = computeNextRunAtMs(job.schedule, nowMs); - // Guard against the scheduler returning a time within the same second as - // nowMs. When a cron job completes within the same wall-clock second it - // was scheduled for, some croner versions/timezone combinations may return - // the current second (or computeNextRunAtMs may return undefined, which - // triggers recomputation). Advancing to the next second and retrying - // ensures we always land on the *next* occurrence. (See #17821) - if (next === undefined && job.schedule.kind === "cron") { - const nextSecondMs = (Math.floor(nowMs / 1000) + 1) * 1000; - return computeNextRunAtMs(job.schedule, nextSecondMs); - } - return next; + return computeStaggeredCronNextRunAtMs(job, nowMs); } /** Maximum consecutive schedule errors before auto-disabling a job. */ @@ -288,7 +318,18 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo fallbackAnchorMs: now, }), } - : input.schedule; + : input.schedule.kind === "cron" + ? (() => { + const explicitStaggerMs = normalizeCronStaggerMs(input.schedule.staggerMs); + if (explicitStaggerMs !== undefined) { + return { ...input.schedule, staggerMs: explicitStaggerMs }; + } + const defaultStaggerMs = resolveDefaultCronStaggerMs(input.schedule.expr); + return defaultStaggerMs !== undefined + ? { ...input.schedule, staggerMs: defaultStaggerMs } + : input.schedule; + })() + : input.schedule; const deleteAfterRun = typeof input.deleteAfterRun === "boolean" ? input.deleteAfterRun @@ -335,7 +376,22 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) { job.deleteAfterRun = patch.deleteAfterRun; } if (patch.schedule) { - job.schedule = patch.schedule; + if (patch.schedule.kind === "cron") { + const explicitStaggerMs = normalizeCronStaggerMs(patch.schedule.staggerMs); + if (explicitStaggerMs !== undefined) { + job.schedule = { ...patch.schedule, staggerMs: explicitStaggerMs }; + } else if (job.schedule.kind === "cron") { + job.schedule = { ...patch.schedule, staggerMs: job.schedule.staggerMs }; + } else { + const defaultStaggerMs = resolveDefaultCronStaggerMs(patch.schedule.expr); + job.schedule = + defaultStaggerMs !== undefined + ? { ...patch.schedule, staggerMs: defaultStaggerMs } + : patch.schedule; + } + } else { + job.schedule = patch.schedule; + } } if (patch.sessionTarget) { job.sessionTarget = patch.sessionTarget; diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index b29dd51719..506f7a55a4 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -1,4 +1,6 @@ import fs from "node:fs"; +import type { CronJob } from "../types.js"; +import type { CronServiceState } from "./state.js"; import { buildDeliveryFromLegacyPayload, hasLegacyDeliveryHints, @@ -6,11 +8,10 @@ import { } from "../legacy-delivery.js"; import { parseAbsoluteTimeMs } from "../parse.js"; import { migrateLegacyCronPayload } from "../payload-migration.js"; +import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../stagger.js"; import { loadCronStore, saveCronStore } from "../store.js"; -import type { CronJob } from "../types.js"; import { recomputeNextRuns } from "./jobs.js"; import { inferLegacyName, normalizeOptionalText } from "./normalize.js"; -import type { CronServiceState } from "./state.js"; function buildDeliveryPatchFromLegacyPayload(payload: Record) { const deliver = payload.deliver; @@ -380,6 +381,26 @@ export async function ensureLoaded( mutated = true; } } + + const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : ""; + if (typeof sched.expr === "string" && sched.expr !== exprRaw) { + sched.expr = exprRaw; + mutated = true; + } + if ((kind === "cron" || sched.kind === "cron") && exprRaw) { + const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs); + const defaultStaggerMs = resolveDefaultCronStaggerMs(exprRaw); + const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs; + if (targetStaggerMs === undefined) { + if ("staggerMs" in sched) { + delete sched.staggerMs; + mutated = true; + } + } else if (sched.staggerMs !== targetStaggerMs) { + sched.staggerMs = targetStaggerMs; + mutated = true; + } + } } const delivery = raw.delivery; diff --git a/src/cron/stagger.test.ts b/src/cron/stagger.test.ts new file mode 100644 index 0000000000..d62e3fe3d6 --- /dev/null +++ b/src/cron/stagger.test.ts @@ -0,0 +1,36 @@ +import { describe, expect, it } from "vitest"; +import { + DEFAULT_TOP_OF_HOUR_STAGGER_MS, + isRecurringTopOfHourCronExpr, + normalizeCronStaggerMs, + resolveCronStaggerMs, +} from "./stagger.js"; + +describe("cron stagger helpers", () => { + it("detects recurring top-of-hour cron expressions for 5-field and 6-field cron", () => { + expect(isRecurringTopOfHourCronExpr("0 * * * *")).toBe(true); + expect(isRecurringTopOfHourCronExpr("0 */2 * * *")).toBe(true); + expect(isRecurringTopOfHourCronExpr("0 0 */3 * * *")).toBe(true); + expect(isRecurringTopOfHourCronExpr("0 7 * * *")).toBe(false); + expect(isRecurringTopOfHourCronExpr("15 * * * *")).toBe(false); + }); + + it("normalizes explicit stagger values", () => { + expect(normalizeCronStaggerMs("30000")).toBe(30_000); + expect(normalizeCronStaggerMs(42.8)).toBe(42); + expect(normalizeCronStaggerMs(-10)).toBe(0); + expect(normalizeCronStaggerMs("")).toBeUndefined(); + expect(normalizeCronStaggerMs("abc")).toBeUndefined(); + }); + + it("resolves effective stagger for cron schedules", () => { + expect(resolveCronStaggerMs({ kind: "cron", expr: "0 * * * *" })).toBe( + DEFAULT_TOP_OF_HOUR_STAGGER_MS, + ); + expect(resolveCronStaggerMs({ kind: "cron", expr: "0 * * * *", staggerMs: 30_000 })).toBe( + 30_000, + ); + expect(resolveCronStaggerMs({ kind: "cron", expr: "0 * * * *", staggerMs: 0 })).toBe(0); + expect(resolveCronStaggerMs({ kind: "cron", expr: "15 * * * *" })).toBe(0); + }); +}); diff --git a/src/cron/stagger.ts b/src/cron/stagger.ts new file mode 100644 index 0000000000..2eecdd18f3 --- /dev/null +++ b/src/cron/stagger.ts @@ -0,0 +1,45 @@ +import type { CronSchedule } from "./types.js"; + +export const DEFAULT_TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1000; + +function parseCronFields(expr: string) { + return expr.trim().split(/\s+/).filter(Boolean); +} + +export function isRecurringTopOfHourCronExpr(expr: string) { + const fields = parseCronFields(expr); + if (fields.length === 5) { + const [minuteField, hourField] = fields; + return minuteField === "0" && hourField.includes("*"); + } + if (fields.length === 6) { + const [secondField, minuteField, hourField] = fields; + return secondField === "0" && minuteField === "0" && hourField.includes("*"); + } + return false; +} + +export function normalizeCronStaggerMs(raw: unknown): number | undefined { + const numeric = + typeof raw === "number" + ? raw + : typeof raw === "string" && raw.trim() + ? Number(raw) + : Number.NaN; + if (!Number.isFinite(numeric)) { + return undefined; + } + return Math.max(0, Math.floor(numeric)); +} + +export function resolveDefaultCronStaggerMs(expr: string): number | undefined { + return isRecurringTopOfHourCronExpr(expr) ? DEFAULT_TOP_OF_HOUR_STAGGER_MS : undefined; +} + +export function resolveCronStaggerMs(schedule: Extract): number { + const explicit = normalizeCronStaggerMs(schedule.staggerMs); + if (explicit !== undefined) { + return explicit; + } + return resolveDefaultCronStaggerMs(schedule.expr) ?? 0; +} diff --git a/src/cron/types.ts b/src/cron/types.ts index 2d525ffecd..435a1ddaf3 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -3,7 +3,13 @@ import type { ChannelId } from "../channels/plugins/types.js"; export type CronSchedule = | { kind: "at"; at: string } | { kind: "every"; everyMs: number; anchorMs?: number } - | { kind: "cron"; expr: string; tz?: string }; + | { + kind: "cron"; + expr: string; + tz?: string; + /** Optional deterministic stagger window in milliseconds (0 keeps exact schedule). */ + staggerMs?: number; + }; export type CronSessionTarget = "main" | "isolated"; export type CronWakeMode = "next-heartbeat" | "now"; diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index a45e3403a4..270ca4a742 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -40,6 +40,7 @@ export const CronScheduleSchema = Type.Union([ kind: Type.Literal("cron"), expr: NonEmptyString, tz: Type.Optional(Type.String()), + staggerMs: Type.Optional(Type.Integer({ minimum: 0 })), }, { additionalProperties: false }, ),