diff --git a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts index 4a0e356fb2..70d78721bd 100644 --- a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts +++ b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts @@ -2,7 +2,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; -import type { CronJob } from "./types.js"; +import type { CronEvent } from "./service.js"; import { CronService } from "./service.js"; import { createCronStoreHarness, @@ -14,16 +14,46 @@ const noopLogger = createNoopLogger(); const { makeStorePath } = createCronStoreHarness(); installCronTestHooks({ logger: noopLogger }); -async function waitForJobs(cron: CronService, predicate: (jobs: CronJob[]) => boolean) { - let latest: CronJob[] = []; - for (let i = 0; i < 30; i++) { - latest = await cron.list({ includeDisabled: true }); - if (predicate(latest)) { - return latest; +function createDeferred() { + let resolve!: (value: T) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +function createCronEventHarness() { + const events: CronEvent[] = []; + const waiters: Array<{ + predicate: (evt: CronEvent) => boolean; + deferred: ReturnType>; + }> = []; + + const onEvent = (evt: CronEvent) => { + events.push(evt); + for (let i = waiters.length - 1; i >= 0; i -= 1) { + const waiter = waiters[i]; + if (waiter && waiter.predicate(evt)) { + waiters.splice(i, 1); + waiter.deferred.resolve(evt); + } } - await vi.runOnlyPendingTimersAsync(); - } - return latest; + }; + + const waitFor = (predicate: (evt: CronEvent) => boolean) => { + for (const evt of events) { + if (predicate(evt)) { + return Promise.resolve(evt); + } + } + const deferred = createDeferred(); + waiters.push({ predicate, deferred }); + return deferred.promise; + }; + + return { onEvent, waitFor, events }; } describe("CronService", () => { @@ -31,6 +61,7 @@ describe("CronService", () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); + const events = createCronEventHarness(); const cron = new CronService({ storePath: store.storePath, @@ -39,6 +70,7 @@ describe("CronService", () => { enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + onEvent: events.onEvent, }); await cron.start(); @@ -57,10 +89,9 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); await vi.runOnlyPendingTimersAsync(); + await events.waitFor((evt) => evt.jobId === job.id && evt.action === "finished"); - const jobs = await waitForJobs(cron, (items) => - items.some((item) => item.id === job.id && !item.enabled), - ); + const jobs = await cron.list({ includeDisabled: true }); const updated = jobs.find((j) => j.id === job.id); expect(updated?.enabled).toBe(false); expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { @@ -77,6 +108,7 @@ describe("CronService", () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); + const events = createCronEventHarness(); const cron = new CronService({ storePath: store.storePath, @@ -85,6 +117,7 @@ describe("CronService", () => { enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + onEvent: events.onEvent, }); await cron.start(); @@ -100,8 +133,9 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); await vi.runOnlyPendingTimersAsync(); + await events.waitFor((evt) => evt.jobId === job.id && evt.action === "removed"); - const jobs = await waitForJobs(cron, (items) => !items.some((item) => item.id === job.id)); + const jobs = await cron.list({ includeDisabled: true }); expect(jobs.find((j) => j.id === job.id)).toBeUndefined(); expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { agentId: undefined, @@ -282,6 +316,7 @@ describe("CronService", () => { status: "ok" as const, summary: "done", })); + const events = createCronEventHarness(); const cron = new CronService({ storePath: store.storePath, @@ -290,11 +325,12 @@ describe("CronService", () => { enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob, + onEvent: events.onEvent, }); await cron.start(); const atMs = Date.parse("2025-12-13T00:00:01.000Z"); - await cron.add({ + const job = await cron.add({ enabled: true, name: "weekly", schedule: { kind: "at", at: new Date(atMs).toISOString() }, @@ -307,7 +343,9 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); - await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "ok")); + await events.waitFor( + (evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "ok", + ); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done", { agentId: undefined, @@ -326,6 +364,7 @@ describe("CronService", () => { summary: "done", delivered: true, })); + const events = createCronEventHarness(); const cron = new CronService({ storePath: store.storePath, @@ -334,11 +373,12 @@ describe("CronService", () => { enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob, + onEvent: events.onEvent, }); await cron.start(); const atMs = Date.parse("2025-12-13T00:00:01.000Z"); - await cron.add({ + const job = await cron.add({ enabled: true, name: "weekly delivered", schedule: { kind: "at", at: new Date(atMs).toISOString() }, @@ -351,7 +391,9 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); - await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "ok")); + await events.waitFor( + (evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "ok", + ); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(enqueueSystemEvent).not.toHaveBeenCalled(); expect(requestHeartbeatNow).not.toHaveBeenCalled(); @@ -473,6 +515,7 @@ describe("CronService", () => { summary: "last output", error: "boom", })); + const events = createCronEventHarness(); const cron = new CronService({ storePath: store.storePath, @@ -481,11 +524,12 @@ describe("CronService", () => { enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob, + onEvent: events.onEvent, }); await cron.start(); const atMs = Date.parse("2025-12-13T00:00:01.000Z"); - await cron.add({ + const job = await cron.add({ name: "isolated error test", enabled: true, schedule: { kind: "at", at: new Date(atMs).toISOString() }, @@ -497,7 +541,9 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); - await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "error")); + await events.waitFor( + (evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "error", + ); expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron (error): last output", { agentId: undefined, @@ -551,6 +597,7 @@ describe("CronService", () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); + const events = createCronEventHarness(); const atMs = Date.parse("2025-12-13T00:00:01.000Z"); await fs.mkdir(path.dirname(store.storePath), { recursive: true }); @@ -581,17 +628,21 @@ describe("CronService", () => { enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + onEvent: events.onEvent, }); await cron.start(); vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); + await events.waitFor( + (evt) => evt.jobId === "job-1" && evt.action === "finished" && evt.status === "skipped", + ); expect(enqueueSystemEvent).not.toHaveBeenCalled(); expect(requestHeartbeatNow).not.toHaveBeenCalled(); - const jobs = await waitForJobs(cron, (items) => items[0]?.state.lastStatus === "skipped"); + const jobs = await cron.list({ includeDisabled: true }); expect(jobs[0]?.state.lastStatus).toBe("skipped"); expect(jobs[0]?.state.lastError).toMatch(/main job requires/i);