fix(schedules): release lastQueuedAt lock on all exit paths to prevent stuck schedules

Multiple error/early-return paths in executeScheduleJob and executeJobInline
were exiting without clearing lastQueuedAt, causing the dueFilter to permanently
skip those schedules — resulting in stale "X hours ago" display for nextRunAt.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
waleed
2026-03-11 16:45:03 -07:00
parent 6fd871268e
commit 511e3a9011
2 changed files with 54 additions and 14 deletions

View File

@@ -5,7 +5,11 @@ import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { generateRequestId } from '@/lib/core/utils/request'
import { executeJobInline, executeScheduleJob } from '@/background/schedule-execution'
import {
executeJobInline,
executeScheduleJob,
releaseScheduleLock,
} from '@/background/schedule-execution'
export const dynamic = 'force-dynamic'
@@ -150,6 +154,12 @@ export async function GET(request: NextRequest) {
logger.error(`[${requestId}] Job execution failed for ${job.id}`, {
error: error instanceof Error ? error.message : String(error),
})
await releaseScheduleLock(
job.id,
requestId,
queuedAt,
`Failed to release lock for job ${job.id}`
)
}
})()
})

View File

@@ -51,7 +51,7 @@ async function applyScheduleUpdate(
}
}
async function releaseScheduleLock(
export async function releaseScheduleLock(
scheduleId: string,
requestId: string,
now: Date,
@@ -400,6 +400,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
{
updatedAt: now,
nextRunAt: nextRetryAt,
lastQueuedAt: null,
},
requestId,
`Error updating schedule ${payload.scheduleId} for rate limit`
@@ -409,18 +410,19 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
case 402: {
logger.warn(`[${requestId}] Usage limit exceeded, scheduling next run`)
const nextRunAt = await calculateNextRunFromDeployment(payload, requestId)
if (nextRunAt) {
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} after usage limit check`
)
}
const nextRunAt =
(await calculateNextRunFromDeployment(payload, requestId)) ??
new Date(now.getTime() + 60 * 60 * 1000)
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
lastQueuedAt: null,
nextRunAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} after usage limit check`
)
return
}
@@ -440,6 +442,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
payload.scheduleId,
{
updatedAt: now,
lastQueuedAt: null,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
@@ -456,6 +459,12 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
const { actorUserId, workflowRecord } = preprocessResult
if (!actorUserId || !workflowRecord) {
logger.error(`[${requestId}] Missing required preprocessing data`)
await releaseScheduleLock(
payload.scheduleId,
requestId,
now,
`Failed to release schedule ${payload.scheduleId} after missing preprocessing data`
)
return
}
@@ -519,6 +528,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
payload.scheduleId,
{
updatedAt: now,
lastQueuedAt: null,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
@@ -540,6 +550,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
payload.scheduleId,
{
updatedAt: now,
lastQueuedAt: null,
nextRunAt: nextRetryAt,
},
requestId,
@@ -564,6 +575,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
payload.scheduleId,
{
updatedAt: now,
lastQueuedAt: null,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
@@ -575,6 +587,12 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
}
} catch (error: unknown) {
logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error)
await releaseScheduleLock(
payload.scheduleId,
requestId,
now,
`Failed to release schedule ${payload.scheduleId} after unhandled error`
)
}
}
@@ -777,6 +795,12 @@ export async function executeJobInline(payload: JobExecutionPayload) {
logger.error(`[${requestId}] Job record missing required fields`, {
scheduleId: payload.scheduleId,
})
await releaseScheduleLock(
payload.scheduleId,
requestId,
now,
`Failed to release job ${payload.scheduleId} after missing fields`
)
return
}
@@ -784,6 +808,12 @@ export async function executeJobInline(payload: JobExecutionPayload) {
logger.info(`[${requestId}] Job already completed, skipping`, {
scheduleId: payload.scheduleId,
})
await releaseScheduleLock(
payload.scheduleId,
requestId,
now,
`Failed to release job ${payload.scheduleId} after completed skip`
)
return
}