refactor(cron): share runnable job filter

This commit is contained in:
Peter Steinberger
2026-02-16 01:29:01 +00:00
parent 2679089e9e
commit a73e7786e7

View File

@@ -342,19 +342,54 @@ function findDueJobs(state: CronServiceState): CronJob[] {
return [];
}
const now = state.deps.nowMs();
return state.store.jobs.filter((j) => {
if (!j.state) {
j.state = {};
}
if (!j.enabled) {
return false;
}
if (typeof j.state.runningAtMs === "number") {
return false;
}
const next = j.state.nextRunAtMs;
return typeof next === "number" && now >= next;
});
return collectRunnableJobs(state, now);
}
function isRunnableJob(params: {
job: CronJob;
nowMs: number;
skipJobIds?: ReadonlySet<string>;
skipAtIfAlreadyRan?: boolean;
}): boolean {
const { job, nowMs } = params;
if (!job.state) {
job.state = {};
}
if (!job.enabled) {
return false;
}
if (params.skipJobIds?.has(job.id)) {
return false;
}
if (typeof job.state.runningAtMs === "number") {
return false;
}
if (params.skipAtIfAlreadyRan && job.schedule.kind === "at" && job.state.lastStatus) {
// Any terminal status (ok, error, skipped) means the job already ran at least once.
// Don't re-fire it on restart — applyJobResult disables one-shot jobs, but guard
// here defensively (#13845).
return false;
}
const next = job.state.nextRunAtMs;
return typeof next === "number" && nowMs >= next;
}
function collectRunnableJobs(
state: CronServiceState,
nowMs: number,
opts?: { skipJobIds?: ReadonlySet<string>; skipAtIfAlreadyRan?: boolean },
): CronJob[] {
if (!state.store) {
return [];
}
return state.store.jobs.filter((job) =>
isRunnableJob({
job,
nowMs,
skipJobIds: opts?.skipJobIds,
skipAtIfAlreadyRan: opts?.skipAtIfAlreadyRan,
}),
);
}
export async function runMissedJobs(
@@ -366,28 +401,7 @@ export async function runMissedJobs(
}
const now = state.deps.nowMs();
const skipJobIds = opts?.skipJobIds;
const missed = state.store.jobs.filter((j) => {
if (!j.state) {
j.state = {};
}
if (!j.enabled) {
return false;
}
if (skipJobIds?.has(j.id)) {
return false;
}
if (typeof j.state.runningAtMs === "number") {
return false;
}
const next = j.state.nextRunAtMs;
if (j.schedule.kind === "at" && j.state.lastStatus) {
// Any terminal status (ok, error, skipped) means the job already
// ran at least once. Don't re-fire it on restart — applyJobResult
// disables one-shot jobs, but guard here defensively (#13845).
return false;
}
return typeof next === "number" && now >= next;
});
const missed = collectRunnableJobs(state, now, { skipJobIds, skipAtIfAlreadyRan: true });
if (missed.length > 0) {
state.deps.log.info(
@@ -405,19 +419,7 @@ export async function runDueJobs(state: CronServiceState) {
return;
}
const now = state.deps.nowMs();
const due = state.store.jobs.filter((j) => {
if (!j.state) {
j.state = {};
}
if (!j.enabled) {
return false;
}
if (typeof j.state.runningAtMs === "number") {
return false;
}
const next = j.state.nextRunAtMs;
return typeof next === "number" && now >= next;
});
const due = collectRunnableJobs(state, now);
for (const job of due) {
await executeJob(state, job, now, { forced: false });
}