From 11f3da7669fc89facc0307548ce87b5860789fd8 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 16 Feb 2026 22:30:16 +0000 Subject: [PATCH] refactor(test): dedupe cron service test harness setup --- ...runs-one-shot-main-job-disables-it.test.ts | 283 +++++++++--------- 1 file changed, 138 insertions(+), 145 deletions(-) diff --git a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts index 6fa4c0fa62..64f5e39ae1 100644 --- a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts +++ b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts @@ -238,45 +238,74 @@ function createCronEventHarness() { return { onEvent, waitFor, events }; } -async function createMainOneShotHarness() { +type CronHarnessOptions = { + runIsolatedAgentJob?: ReturnType; + runHeartbeatOnce?: ReturnType; + nowMs?: () => number; + wakeNowHeartbeatBusyMaxWaitMs?: number; + wakeNowHeartbeatBusyRetryDelayMs?: number; + withEvents?: boolean; +}; + +async function createCronHarness(options: CronHarnessOptions = {}) { ensureDir(fixturesRoot); const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); - const events = createCronEventHarness(); + const events = options.withEvents === false ? undefined : createCronEventHarness(); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, + ...(options.nowMs ? { nowMs: options.nowMs } : {}), + ...(options.wakeNowHeartbeatBusyMaxWaitMs !== undefined + ? { wakeNowHeartbeatBusyMaxWaitMs: options.wakeNowHeartbeatBusyMaxWaitMs } + : {}), + ...(options.wakeNowHeartbeatBusyRetryDelayMs !== undefined + ? { wakeNowHeartbeatBusyRetryDelayMs: options.wakeNowHeartbeatBusyRetryDelayMs } + : {}), enqueueSystemEvent, requestHeartbeatNow, - runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), - onEvent: events.onEvent, + ...(options.runHeartbeatOnce ? { runHeartbeatOnce: options.runHeartbeatOnce } : {}), + runIsolatedAgentJob: options.runIsolatedAgentJob ?? vi.fn(async () => ({ status: "ok" })), + ...(events ? { onEvent: events.onEvent } : {}), }); await cron.start(); return { store, cron, enqueueSystemEvent, requestHeartbeatNow, events }; } +async function createMainOneShotHarness() { + const harness = await createCronHarness(); + if (!harness.events) { + throw new Error("missing event harness"); + } + return { ...harness, events: harness.events }; +} + async function createIsolatedAnnounceHarness(runIsolatedAgentJob: ReturnType) { - ensureDir(fixturesRoot); - const store = await makeStorePath(); - const enqueueSystemEvent = vi.fn(); - const requestHeartbeatNow = vi.fn(); - const events = createCronEventHarness(); - - const cron = new CronService({ - storePath: store.storePath, - cronEnabled: true, - log: noopLogger, - enqueueSystemEvent, - requestHeartbeatNow, + const harness = await createCronHarness({ runIsolatedAgentJob, - onEvent: events.onEvent, }); + if (!harness.events) { + throw new Error("missing event harness"); + } + return { ...harness, events: harness.events }; +} - await cron.start(); - return { store, cron, enqueueSystemEvent, requestHeartbeatNow, events }; +async function createWakeModeNowMainHarness(options: { + nowMs?: () => number; + runHeartbeatOnce: ReturnType; + wakeNowHeartbeatBusyMaxWaitMs?: number; + wakeNowHeartbeatBusyRetryDelayMs?: number; +}) { + return createCronHarness({ + runHeartbeatOnce: options.runHeartbeatOnce, + nowMs: options.nowMs, + wakeNowHeartbeatBusyMaxWaitMs: options.wakeNowHeartbeatBusyMaxWaitMs, + wakeNowHeartbeatBusyRetryDelayMs: options.wakeNowHeartbeatBusyRetryDelayMs, + withEvents: false, + }); } async function addDefaultIsolatedAnnounceJob(cron: CronService, name: string) { @@ -293,6 +322,60 @@ async function addDefaultIsolatedAnnounceJob(cron: CronService, name: string) { return { job, runAt }; } +async function runIsolatedAnnounceJobAndWait(params: { + cron: CronService; + events: ReturnType; + name: string; + status: "ok" | "error"; +}) { + const { job, runAt } = await addDefaultIsolatedAnnounceJob(params.cron, params.name); + vi.setSystemTime(runAt); + await vi.runOnlyPendingTimersAsync(); + await params.events.waitFor( + (evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === params.status, + ); + return job; +} + +async function addWakeModeNowMainSystemEventJob( + cron: CronService, + options?: { name?: string; agentId?: string }, +) { + return cron.add({ + name: options?.name ?? "wakeMode now", + ...(options?.agentId ? { agentId: options.agentId } : {}), + enabled: true, + schedule: { kind: "at", at: new Date(1).toISOString() }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "hello" }, + }); +} + +function createLegacyDeliveryMigrationJob(options: { + id: string; + payload: { provider?: string; channel?: string }; +}) { + return { + id: options.id, + name: "legacy", + enabled: true, + createdAtMs: Date.now(), + updatedAtMs: Date.now(), + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { + kind: "agentTurn", + message: "hi", + deliver: true, + ...options.payload, + to: "7200373102", + }, + state: {}, + }; +} + async function loadLegacyDeliveryMigration(rawJob: Record) { ensureDir(fixturesRoot); const store = await makeStorePath(); @@ -377,11 +460,6 @@ describe("CronService", () => { }); it("wakeMode now waits for heartbeat completion when available", async () => { - ensureDir(fixturesRoot); - const store = await makeStorePath(); - const enqueueSystemEvent = vi.fn(); - const requestHeartbeatNow = vi.fn(); - let now = 0; const nowMs = () => { now += 10; @@ -396,26 +474,12 @@ describe("CronService", () => { }), ); - const cron = new CronService({ - storePath: store.storePath, - cronEnabled: true, - log: noopLogger, - nowMs, - enqueueSystemEvent, - requestHeartbeatNow, - runHeartbeatOnce, - runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), - }); - - await cron.start(); - const job = await cron.add({ - name: "wakeMode now waits", - enabled: true, - schedule: { kind: "at", at: new Date(1).toISOString() }, - sessionTarget: "main", - wakeMode: "now", - payload: { kind: "systemEvent", text: "hello" }, - }); + const { store, cron, enqueueSystemEvent, requestHeartbeatNow } = + await createWakeModeNowMainHarness({ + runHeartbeatOnce, + nowMs, + }); + const job = await addWakeModeNowMainSystemEventJob(cron, { name: "wakeMode now waits" }); const runPromise = cron.run(job.id, "force"); for (let i = 0; i < 10; i++) { @@ -445,34 +509,19 @@ describe("CronService", () => { }); it("passes agentId to runHeartbeatOnce for main-session wakeMode now jobs", async () => { - ensureDir(fixturesRoot); - const store = await makeStorePath(); - const enqueueSystemEvent = vi.fn(); - const requestHeartbeatNow = vi.fn(); const runHeartbeatOnce = vi.fn(async () => ({ status: "ran" as const, durationMs: 1 })); - const cron = new CronService({ - storePath: store.storePath, - cronEnabled: true, - log: noopLogger, - // Perf: avoid advancing fake timers by 2+ minutes for the busy-heartbeat fallback. - wakeNowHeartbeatBusyMaxWaitMs: 1, - wakeNowHeartbeatBusyRetryDelayMs: 2, - enqueueSystemEvent, - requestHeartbeatNow, - runHeartbeatOnce, - runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), - }); + const { store, cron, enqueueSystemEvent, requestHeartbeatNow } = + await createWakeModeNowMainHarness({ + runHeartbeatOnce, + // Perf: avoid advancing fake timers by 2+ minutes for the busy-heartbeat fallback. + wakeNowHeartbeatBusyMaxWaitMs: 1, + wakeNowHeartbeatBusyRetryDelayMs: 2, + }); - await cron.start(); - const job = await cron.add({ + const job = await addWakeModeNowMainSystemEventJob(cron, { name: "wakeMode now with agent", agentId: "ops", - enabled: true, - schedule: { kind: "at", at: new Date(1).toISOString() }, - sessionTarget: "main", - wakeMode: "now", - payload: { kind: "systemEvent", text: "hello" }, }); await cron.run(job.id, "force"); @@ -495,10 +544,6 @@ describe("CronService", () => { }); it("wakeMode now falls back to queued heartbeat when main lane stays busy", async () => { - ensureDir(fixturesRoot); - const store = await makeStorePath(); - const enqueueSystemEvent = vi.fn(); - const requestHeartbeatNow = vi.fn(); const runHeartbeatOnce = vi.fn(async () => ({ status: "skipped" as const, reason: "requests-in-flight", @@ -509,29 +554,15 @@ describe("CronService", () => { return now; }; - const cron = new CronService({ - storePath: store.storePath, - cronEnabled: true, - log: noopLogger, + const { store, cron, requestHeartbeatNow } = await createWakeModeNowMainHarness({ + runHeartbeatOnce, nowMs, // Perf: avoid advancing fake timers by 2+ minutes for the busy-heartbeat fallback. wakeNowHeartbeatBusyMaxWaitMs: 1, wakeNowHeartbeatBusyRetryDelayMs: 2, - enqueueSystemEvent, - requestHeartbeatNow, - runHeartbeatOnce, - runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); - await cron.start(); - const job = await cron.add({ - name: "wakeMode now fallback", - enabled: true, - schedule: { kind: "at", at: new Date(1).toISOString() }, - sessionTarget: "main", - wakeMode: "now", - payload: { kind: "systemEvent", text: "hello" }, - }); + const job = await addWakeModeNowMainSystemEventJob(cron, { name: "wakeMode now fallback" }); await cron.run(job.id, "force"); @@ -549,14 +580,7 @@ describe("CronService", () => { const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" })); const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } = await createIsolatedAnnounceHarness(runIsolatedAgentJob); - const { job, runAt } = await addDefaultIsolatedAnnounceJob(cron, "weekly"); - - vi.setSystemTime(runAt); - await vi.runOnlyPendingTimersAsync(); - - await events.waitFor( - (evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "ok", - ); + await runIsolatedAnnounceJobAndWait({ cron, events, name: "weekly", status: "ok" }); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(enqueueSystemEvent).toHaveBeenCalledWith( "Cron: done", @@ -575,14 +599,12 @@ describe("CronService", () => { })); const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } = await createIsolatedAnnounceHarness(runIsolatedAgentJob); - - const { job, runAt } = await addDefaultIsolatedAnnounceJob(cron, "weekly delivered"); - vi.setSystemTime(runAt); - await vi.runOnlyPendingTimersAsync(); - - await events.waitFor( - (evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "ok", - ); + await runIsolatedAnnounceJobAndWait({ + cron, + events, + name: "weekly delivered", + status: "ok", + }); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(enqueueSystemEvent).not.toHaveBeenCalled(); expect(requestHeartbeatNow).not.toHaveBeenCalled(); @@ -591,24 +613,10 @@ describe("CronService", () => { }); it("migrates legacy payload.provider to payload.channel on load", async () => { - const rawJob = { + const rawJob = createLegacyDeliveryMigrationJob({ id: "legacy-1", - name: "legacy", - enabled: true, - createdAtMs: Date.now(), - updatedAtMs: Date.now(), - schedule: { kind: "cron", expr: "* * * * *" }, - sessionTarget: "isolated", - wakeMode: "now", - payload: { - kind: "agentTurn", - message: "hi", - deliver: true, - provider: " TeLeGrAm ", - to: "7200373102", - }, - state: {}, - }; + payload: { provider: " TeLeGrAm " }, + }); const { store, cron, job } = await loadLegacyDeliveryMigration(rawJob); // Legacy delivery fields are migrated to the top-level delivery object const delivery = job?.delivery as unknown as Record; @@ -622,24 +630,10 @@ describe("CronService", () => { }); it("canonicalizes payload.channel casing on load", async () => { - const rawJob = { + const rawJob = createLegacyDeliveryMigrationJob({ id: "legacy-2", - name: "legacy", - enabled: true, - createdAtMs: Date.now(), - updatedAtMs: Date.now(), - schedule: { kind: "cron", expr: "* * * * *" }, - sessionTarget: "isolated", - wakeMode: "now", - payload: { - kind: "agentTurn", - message: "hi", - deliver: true, - channel: "Telegram", - to: "7200373102", - }, - state: {}, - }; + payload: { channel: "Telegram" }, + }); const { store, cron, job } = await loadLegacyDeliveryMigration(rawJob); // Legacy delivery fields are migrated to the top-level delivery object const delivery = job?.delivery as unknown as Record; @@ -657,13 +651,12 @@ describe("CronService", () => { })); const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } = await createIsolatedAnnounceHarness(runIsolatedAgentJob); - const { job, runAt } = await addDefaultIsolatedAnnounceJob(cron, "isolated error test"); - - vi.setSystemTime(runAt); - await vi.runOnlyPendingTimersAsync(); - await events.waitFor( - (evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "error", - ); + await runIsolatedAnnounceJobAndWait({ + cron, + events, + name: "isolated error test", + status: "error", + }); expect(enqueueSystemEvent).toHaveBeenCalledWith( "Cron (error): last output",