mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
fix(cron): isolate schedule errors to prevent one bad job from breaking all jobs (#14385)
Previously, if one cron job had a malformed schedule expression (e.g. invalid cron syntax), the error would propagate up and break the entire scheduler loop. This meant one misconfigured job could prevent ALL cron jobs from running.

Changes:
- Wrap per-job schedule computation in try/catch in recomputeNextRuns()
- Track consecutive schedule errors via new scheduleErrorCount field
- Log warnings for schedule errors with job ID and name
- Auto-disable jobs after 3 consecutive schedule errors (with error-level log)
- Clear error count when schedule computation succeeds
- Continue processing other jobs even when one fails

This ensures the scheduler is resilient to individual job misconfigurations while still providing visibility into problems through logging.

Co-authored-by: Marvin <numegilagent@gmail.com>
This commit is contained in:
189
src/cron/service/jobs.schedule-error-isolation.test.ts
Normal file
189
src/cron/service/jobs.schedule-error-isolation.test.ts
Normal file
@@ -0,0 +1,189 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { CronJob, CronStoreFile } from "../types.js";
|
||||
import type { CronServiceState } from "./state.js";
|
||||
import { recomputeNextRuns } from "./jobs.js";
|
||||
|
||||
/**
 * Builds a minimal stand-in for the cron service state used by these tests.
 *
 * Every dependency callback is a fresh `vi.fn()` spy, the clock delegates to
 * `Date.now()` (so it follows vitest's fake timers when they are active), and
 * the store wraps the supplied jobs array without copying it.
 *
 * @param jobs - Jobs to place in the mock store; the same array instance is used.
 * @returns An object cast to `CronServiceState`; only the fields set here exist.
 */
function createMockState(jobs: CronJob[]): CronServiceState {
  // Spies for each log level so tests can assert on emitted warnings/errors.
  const log = {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
  };

  const persistence = {
    read: vi.fn(),
    write: vi.fn(),
  };

  const deps = {
    cronEnabled: true,
    // Resolve Date.now at call time rather than capturing it up front.
    nowMs: () => Date.now(),
    log,
    enqueueSystemEvent: vi.fn(),
    requestHeartbeatNow: vi.fn(),
    runHeartbeatOnce: vi.fn(),
    runIsolatedAgentJob: vi.fn(),
    onEvent: vi.fn(),
    persistence,
  };

  const store: CronStoreFile = { version: 1, jobs };

  const mockState = {
    deps,
    store,
    timer: null,
    running: false,
  };

  // The mock only provides the fields listed above, hence the double cast.
  return mockState as unknown as CronServiceState;
}
|
||||
|
||||
/**
 * Creates a cron job fixture: an enabled job with an hourly cron schedule, a
 * system-event payload, and empty runtime state, with any supplied fields
 * replacing the defaults.
 *
 * @param overrides - Top-level fields to replace on the default job. The merge
 *   is shallow, so passing `state` or `schedule` replaces that object entirely.
 * @returns A new job object combining the defaults with `overrides`.
 */
function createJob(overrides: Partial<CronJob> = {}): CronJob {
  const ageMs = 100_000;

  const defaults: CronJob = {
    id: "test-job-1",
    name: "Test Job",
    enabled: true,
    createdAtMs: Date.now() - ageMs,
    updatedAtMs: Date.now() - ageMs,
    // "0 * * * *" fires at minute zero of every hour.
    schedule: { kind: "cron", expr: "0 * * * *" },
    sessionTarget: "main",
    wakeMode: "now",
    payload: { kind: "systemEvent", text: "test" },
    state: {},
  };

  return { ...defaults, ...overrides };
}
|
||||
|
||||
/**
 * Exercises recomputeNextRuns() against jobs whose cron expressions these
 * tests expect to be rejected, checking that the failure is recorded on the
 * offending job (error count, lastError, eventual auto-disable) while other
 * jobs in the same store are still scheduled.
 */
describe("cron schedule error isolation", () => {
  /**
   * Shorthand for the recurring "bad-job" fixture: same id and name in every
   * test, differing only in the cron expression and the starting state.
   */
  const brokenJob = (expr: string, state: CronJob["state"] = {}): CronJob =>
    createJob({
      id: "bad-job",
      name: "Bad Job",
      schedule: { kind: "cron", expr },
      state,
    });

  // Freeze the clock so Date.now() is stable for the duration of each test.
  beforeEach(() => {
    vi.useFakeTimers();
    vi.setSystemTime(new Date("2025-01-15T10:30:00.000Z"));
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it("continues processing other jobs when one has a malformed schedule", () => {
    const healthyBefore = createJob({ id: "good-1", name: "Good Job 1" });
    const broken = brokenJob("invalid cron expression");
    const healthyAfter = createJob({ id: "good-2", name: "Good Job 2" });

    // The broken job sits between the healthy ones on purpose: the job after
    // it must still be processed.
    const state = createMockState([healthyBefore, broken, healthyAfter]);
    const changed = recomputeNextRuns(state);

    expect(changed).toBe(true);

    // Healthy jobs on both sides of the failure get a next run time.
    expect(healthyBefore.state.nextRunAtMs).toBeDefined();
    expect(healthyAfter.state.nextRunAtMs).toBeDefined();

    // The broken job has no next run, carries the error, and counts one failure.
    expect(broken.state.nextRunAtMs).toBeUndefined();
    expect(broken.state.lastError).toMatch(/schedule error/);
    expect(broken.state.scheduleErrorCount).toBe(1);

    // A single failure is below the auto-disable threshold.
    expect(broken.enabled).toBe(true);
  });

  it("logs a warning for the first schedule error", () => {
    const state = createMockState([brokenJob("not valid")]);

    recomputeNextRuns(state);

    const expectedFields = expect.objectContaining({
      jobId: "bad-job",
      name: "Bad Job",
      errorCount: 1,
    });
    expect(state.deps.log.warn).toHaveBeenCalledWith(
      expectedFields,
      expect.stringContaining("failed to compute next run"),
    );
  });

  it("auto-disables job after 3 consecutive schedule errors", () => {
    // Start with two recorded failures so this run is the third.
    const broken = brokenJob("garbage", { scheduleErrorCount: 2 });
    const state = createMockState([broken]);

    recomputeNextRuns(state);

    expect(broken.enabled).toBe(false);
    expect(broken.state.scheduleErrorCount).toBe(3);

    const expectedFields = expect.objectContaining({
      jobId: "bad-job",
      name: "Bad Job",
      errorCount: 3,
    });
    expect(state.deps.log.error).toHaveBeenCalledWith(
      expectedFields,
      expect.stringContaining("auto-disabled job"),
    );
  });

  it("clears scheduleErrorCount when schedule computation succeeds", () => {
    // Valid hourly schedule, but with failures left over from earlier runs.
    const recovering = createJob({
      id: "recovering-job",
      name: "Recovering Job",
      schedule: { kind: "cron", expr: "0 * * * *" },
      state: { scheduleErrorCount: 2 },
    });
    const state = createMockState([recovering]);

    const changed = recomputeNextRuns(state);

    expect(changed).toBe(true);
    expect(recovering.state.nextRunAtMs).toBeDefined();
    expect(recovering.state.scheduleErrorCount).toBeUndefined();
  });

  it("does not modify disabled jobs", () => {
    const disabled = createJob({
      id: "disabled-bad",
      name: "Disabled Bad Job",
      enabled: false,
      schedule: { kind: "cron", expr: "invalid" },
    });
    const state = createMockState([disabled]);

    recomputeNextRuns(state);

    // No failure is counted and nothing is logged for a disabled job.
    expect(disabled.state.scheduleErrorCount).toBeUndefined();
    expect(state.deps.log.warn).not.toHaveBeenCalled();
  });

  it("increments error count on each failed computation", () => {
    const broken = brokenJob("@@@@", { scheduleErrorCount: 1 });
    const state = createMockState([broken]);

    recomputeNextRuns(state);

    expect(broken.state.scheduleErrorCount).toBe(2);
    // Two failures is still below the auto-disable threshold.
    expect(broken.enabled).toBe(true);
  });

  it("stores error message in lastError", () => {
    const broken = brokenJob("invalid expression here");
    const state = createMockState([broken]);

    recomputeNextRuns(state);

    const { lastError } = broken.state;
    expect(lastError).toMatch(/^schedule error:/);
    expect(lastError).toBeTruthy();
  });
});
|
||||
@@ -87,6 +87,9 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
|
||||
return computeNextRunAtMs(job.schedule, nowMs);
|
||||
}
|
||||
|
||||
/** Maximum consecutive schedule errors before auto-disabling a job. */
|
||||
const MAX_SCHEDULE_ERRORS = 3;
|
||||
|
||||
export function recomputeNextRuns(state: CronServiceState): boolean {
|
||||
if (!state.store) {
|
||||
return false;
|
||||
@@ -124,10 +127,36 @@ export function recomputeNextRuns(state: CronServiceState): boolean {
|
||||
const nextRun = job.state.nextRunAtMs;
|
||||
const isDueOrMissing = nextRun === undefined || now >= nextRun;
|
||||
if (isDueOrMissing) {
|
||||
const newNext = computeJobNextRunAtMs(job, now);
|
||||
if (job.state.nextRunAtMs !== newNext) {
|
||||
job.state.nextRunAtMs = newNext;
|
||||
try {
|
||||
const newNext = computeJobNextRunAtMs(job, now);
|
||||
if (job.state.nextRunAtMs !== newNext) {
|
||||
job.state.nextRunAtMs = newNext;
|
||||
changed = true;
|
||||
}
|
||||
// Clear schedule error count on successful computation.
|
||||
if (job.state.scheduleErrorCount) {
|
||||
job.state.scheduleErrorCount = undefined;
|
||||
changed = true;
|
||||
}
|
||||
} catch (err) {
|
||||
const errorCount = (job.state.scheduleErrorCount ?? 0) + 1;
|
||||
job.state.scheduleErrorCount = errorCount;
|
||||
job.state.nextRunAtMs = undefined;
|
||||
job.state.lastError = `schedule error: ${String(err)}`;
|
||||
changed = true;
|
||||
|
||||
if (errorCount >= MAX_SCHEDULE_ERRORS) {
|
||||
job.enabled = false;
|
||||
state.deps.log.error(
|
||||
{ jobId: job.id, name: job.name, errorCount, err: String(err) },
|
||||
"cron: auto-disabled job after repeated schedule errors",
|
||||
);
|
||||
} else {
|
||||
state.deps.log.warn(
|
||||
{ jobId: job.id, name: job.name, errorCount, err: String(err) },
|
||||
"cron: failed to compute next run for job (skipping)",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,6 +61,8 @@ export type CronJobState = {
|
||||
lastDurationMs?: number;
|
||||
/** Number of consecutive execution errors (reset on success). Used for backoff. */
|
||||
consecutiveErrors?: number;
|
||||
/** Number of consecutive schedule computation errors. Auto-disables job after threshold. */
|
||||
scheduleErrorCount?: number;
|
||||
};
|
||||
|
||||
export type CronJob = {
|
||||
|
||||
Reference in New Issue
Block a user