mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
perf(test): stop polling cron job list
This commit is contained in:
@@ -2,7 +2,7 @@ import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
|
||||
import type { CronJob } from "./types.js";
|
||||
import type { CronEvent } from "./service.js";
|
||||
import { CronService } from "./service.js";
|
||||
import {
|
||||
createCronStoreHarness,
|
||||
@@ -14,16 +14,46 @@ const noopLogger = createNoopLogger();
|
||||
const { makeStorePath } = createCronStoreHarness();
|
||||
installCronTestHooks({ logger: noopLogger });
|
||||
|
||||
async function waitForJobs(cron: CronService, predicate: (jobs: CronJob[]) => boolean) {
|
||||
let latest: CronJob[] = [];
|
||||
for (let i = 0; i < 30; i++) {
|
||||
latest = await cron.list({ includeDisabled: true });
|
||||
if (predicate(latest)) {
|
||||
return latest;
|
||||
function createDeferred<T>() {
|
||||
let resolve!: (value: T) => void;
|
||||
let reject!: (reason?: unknown) => void;
|
||||
const promise = new Promise<T>((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
}
|
||||
|
||||
function createCronEventHarness() {
|
||||
const events: CronEvent[] = [];
|
||||
const waiters: Array<{
|
||||
predicate: (evt: CronEvent) => boolean;
|
||||
deferred: ReturnType<typeof createDeferred<CronEvent>>;
|
||||
}> = [];
|
||||
|
||||
const onEvent = (evt: CronEvent) => {
|
||||
events.push(evt);
|
||||
for (let i = waiters.length - 1; i >= 0; i -= 1) {
|
||||
const waiter = waiters[i];
|
||||
if (waiter && waiter.predicate(evt)) {
|
||||
waiters.splice(i, 1);
|
||||
waiter.deferred.resolve(evt);
|
||||
}
|
||||
}
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
}
|
||||
return latest;
|
||||
};
|
||||
|
||||
const waitFor = (predicate: (evt: CronEvent) => boolean) => {
|
||||
for (const evt of events) {
|
||||
if (predicate(evt)) {
|
||||
return Promise.resolve(evt);
|
||||
}
|
||||
}
|
||||
const deferred = createDeferred<CronEvent>();
|
||||
waiters.push({ predicate, deferred });
|
||||
return deferred.promise;
|
||||
};
|
||||
|
||||
return { onEvent, waitFor, events };
|
||||
}
|
||||
|
||||
describe("CronService", () => {
|
||||
@@ -31,6 +61,7 @@ describe("CronService", () => {
|
||||
const store = await makeStorePath();
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
const events = createCronEventHarness();
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
@@ -39,6 +70,7 @@ describe("CronService", () => {
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||
onEvent: events.onEvent,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
@@ -57,10 +89,9 @@ describe("CronService", () => {
|
||||
|
||||
vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z"));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
await events.waitFor((evt) => evt.jobId === job.id && evt.action === "finished");
|
||||
|
||||
const jobs = await waitForJobs(cron, (items) =>
|
||||
items.some((item) => item.id === job.id && !item.enabled),
|
||||
);
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
const updated = jobs.find((j) => j.id === job.id);
|
||||
expect(updated?.enabled).toBe(false);
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", {
|
||||
@@ -77,6 +108,7 @@ describe("CronService", () => {
|
||||
const store = await makeStorePath();
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
const events = createCronEventHarness();
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
@@ -85,6 +117,7 @@ describe("CronService", () => {
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||
onEvent: events.onEvent,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
@@ -100,8 +133,9 @@ describe("CronService", () => {
|
||||
|
||||
vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z"));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
await events.waitFor((evt) => evt.jobId === job.id && evt.action === "removed");
|
||||
|
||||
const jobs = await waitForJobs(cron, (items) => !items.some((item) => item.id === job.id));
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
expect(jobs.find((j) => j.id === job.id)).toBeUndefined();
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", {
|
||||
agentId: undefined,
|
||||
@@ -282,6 +316,7 @@ describe("CronService", () => {
|
||||
status: "ok" as const,
|
||||
summary: "done",
|
||||
}));
|
||||
const events = createCronEventHarness();
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
@@ -290,11 +325,12 @@ describe("CronService", () => {
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob,
|
||||
onEvent: events.onEvent,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
|
||||
await cron.add({
|
||||
const job = await cron.add({
|
||||
enabled: true,
|
||||
name: "weekly",
|
||||
schedule: { kind: "at", at: new Date(atMs).toISOString() },
|
||||
@@ -307,7 +343,9 @@ describe("CronService", () => {
|
||||
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
|
||||
await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "ok"));
|
||||
await events.waitFor(
|
||||
(evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "ok",
|
||||
);
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done", {
|
||||
agentId: undefined,
|
||||
@@ -326,6 +364,7 @@ describe("CronService", () => {
|
||||
summary: "done",
|
||||
delivered: true,
|
||||
}));
|
||||
const events = createCronEventHarness();
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
@@ -334,11 +373,12 @@ describe("CronService", () => {
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob,
|
||||
onEvent: events.onEvent,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
|
||||
await cron.add({
|
||||
const job = await cron.add({
|
||||
enabled: true,
|
||||
name: "weekly delivered",
|
||||
schedule: { kind: "at", at: new Date(atMs).toISOString() },
|
||||
@@ -351,7 +391,9 @@ describe("CronService", () => {
|
||||
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
|
||||
await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "ok"));
|
||||
await events.waitFor(
|
||||
(evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "ok",
|
||||
);
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
@@ -473,6 +515,7 @@ describe("CronService", () => {
|
||||
summary: "last output",
|
||||
error: "boom",
|
||||
}));
|
||||
const events = createCronEventHarness();
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
@@ -481,11 +524,12 @@ describe("CronService", () => {
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob,
|
||||
onEvent: events.onEvent,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
|
||||
await cron.add({
|
||||
const job = await cron.add({
|
||||
name: "isolated error test",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", at: new Date(atMs).toISOString() },
|
||||
@@ -497,7 +541,9 @@ describe("CronService", () => {
|
||||
|
||||
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "error"));
|
||||
await events.waitFor(
|
||||
(evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "error",
|
||||
);
|
||||
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron (error): last output", {
|
||||
agentId: undefined,
|
||||
@@ -551,6 +597,7 @@ describe("CronService", () => {
|
||||
const store = await makeStorePath();
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
const events = createCronEventHarness();
|
||||
|
||||
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
@@ -581,17 +628,21 @@ describe("CronService", () => {
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||
onEvent: events.onEvent,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
|
||||
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
await events.waitFor(
|
||||
(evt) => evt.jobId === "job-1" && evt.action === "finished" && evt.status === "skipped",
|
||||
);
|
||||
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
|
||||
const jobs = await waitForJobs(cron, (items) => items[0]?.state.lastStatus === "skipped");
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
expect(jobs[0]?.state.lastStatus).toBe("skipped");
|
||||
expect(jobs[0]?.state.lastError).toMatch(/main job requires/i);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user