From a73e7786e7dc63c239234cee7f64af5c2870cbb5 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 16 Feb 2026 01:29:01 +0000 Subject: [PATCH] refactor(cron): share runnable job filter --- src/cron/service/timer.ts | 98 ++++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 48 deletions(-) diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 3c18a5e03f..8526785526 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -342,19 +342,54 @@ function findDueJobs(state: CronServiceState): CronJob[] { return []; } const now = state.deps.nowMs(); - return state.store.jobs.filter((j) => { - if (!j.state) { - j.state = {}; - } - if (!j.enabled) { - return false; - } - if (typeof j.state.runningAtMs === "number") { - return false; - } - const next = j.state.nextRunAtMs; - return typeof next === "number" && now >= next; - }); + return collectRunnableJobs(state, now); +} + +function isRunnableJob(params: { + job: CronJob; + nowMs: number; + skipJobIds?: ReadonlySet; + skipAtIfAlreadyRan?: boolean; +}): boolean { + const { job, nowMs } = params; + if (!job.state) { + job.state = {}; + } + if (!job.enabled) { + return false; + } + if (params.skipJobIds?.has(job.id)) { + return false; + } + if (typeof job.state.runningAtMs === "number") { + return false; + } + if (params.skipAtIfAlreadyRan && job.schedule.kind === "at" && job.state.lastStatus) { + // Any terminal status (ok, error, skipped) means the job already ran at least once. + // Don't re-fire it on restart — applyJobResult disables one-shot jobs, but guard + // here defensively (#13845). + return false; + } + const next = job.state.nextRunAtMs; + return typeof next === "number" && nowMs >= next; +} + +function collectRunnableJobs( + state: CronServiceState, + nowMs: number, + opts?: { skipJobIds?: ReadonlySet; skipAtIfAlreadyRan?: boolean }, +): CronJob[] { + if (!state.store) { + return []; + } + return state.store.jobs.filter((job) => + isRunnableJob({ + job, + nowMs, + skipJobIds: opts?.skipJobIds, + skipAtIfAlreadyRan: opts?.skipAtIfAlreadyRan, + }), + ); } export async function runMissedJobs( @@ -366,28 +401,7 @@ export async function runMissedJobs( } const now = state.deps.nowMs(); const skipJobIds = opts?.skipJobIds; - const missed = state.store.jobs.filter((j) => { - if (!j.state) { - j.state = {}; - } - if (!j.enabled) { - return false; - } - if (skipJobIds?.has(j.id)) { - return false; - } - if (typeof j.state.runningAtMs === "number") { - return false; - } - const next = j.state.nextRunAtMs; - if (j.schedule.kind === "at" && j.state.lastStatus) { - // Any terminal status (ok, error, skipped) means the job already - // ran at least once. Don't re-fire it on restart — applyJobResult - // disables one-shot jobs, but guard here defensively (#13845). - return false; - } - return typeof next === "number" && now >= next; - }); + const missed = collectRunnableJobs(state, now, { skipJobIds, skipAtIfAlreadyRan: true }); if (missed.length > 0) { state.deps.log.info( @@ -405,19 +419,7 @@ export async function runDueJobs(state: CronServiceState) { return; } const now = state.deps.nowMs(); - const due = state.store.jobs.filter((j) => { - if (!j.state) { - j.state = {}; - } - if (!j.enabled) { - return false; - } - if (typeof j.state.runningAtMs === "number") { - return false; - } - const next = j.state.nextRunAtMs; - return typeof next === "number" && now >= next; - }); + const due = collectRunnableJobs(state, now); for (const job of due) { await executeJob(state, job, now, { forced: false }); }