fix(schedules): locking schedules to prevent double runs (#1854)

* fix(schedules): locking schedules to prevent double runs

* add migration file

* fix
This commit is contained in:
Vikhyath Mondreti
2025-11-07 19:45:25 -08:00
committed by GitHub
parent e91a8af7cd
commit 7a8d47a72e
7 changed files with 8451 additions and 584 deletions

View File

@@ -53,14 +53,14 @@ describe('Scheduled Workflow Execution API Route', () => {
and: vi.fn((...conditions) => ({ type: 'and', conditions })),
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
lt: vi.fn((field, value) => ({ field, value, type: 'lt' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
isNull: vi.fn((field) => ({ type: 'isNull', field })),
or: vi.fn((...conditions) => ({ type: 'or', conditions })),
}))
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => [
const returningSchedules = [
{
id: 'schedule-1',
workflowId: 'workflow-1',
@@ -68,15 +68,31 @@ describe('Scheduled Workflow Execution API Route', () => {
cronExpression: null,
lastRanAt: null,
failedCount: 0,
nextRunAt: new Date('2025-01-01T00:00:00.000Z'),
lastQueuedAt: undefined,
},
]),
})),
})),
}
]
const mockReturning = vi.fn().mockReturnValue(returningSchedules)
const mockWhere = vi.fn().mockReturnValue({ returning: mockReturning })
const mockSet = vi.fn().mockReturnValue({ where: mockWhere })
const mockUpdate = vi.fn().mockReturnValue({ set: mockSet })
return {
db: mockDb,
workflowSchedule: {},
db: {
update: mockUpdate,
},
workflowSchedule: {
id: 'id',
workflowId: 'workflowId',
blockId: 'blockId',
cronExpression: 'cronExpression',
lastRanAt: 'lastRanAt',
failedCount: 'failedCount',
status: 'status',
nextRunAt: 'nextRunAt',
lastQueuedAt: 'lastQueuedAt',
},
}
})
@@ -114,14 +130,14 @@ describe('Scheduled Workflow Execution API Route', () => {
and: vi.fn((...conditions) => ({ type: 'and', conditions })),
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
lt: vi.fn((field, value) => ({ field, value, type: 'lt' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
isNull: vi.fn((field) => ({ type: 'isNull', field })),
or: vi.fn((...conditions) => ({ type: 'or', conditions })),
}))
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => [
const returningSchedules = [
{
id: 'schedule-1',
workflowId: 'workflow-1',
@@ -129,15 +145,31 @@ describe('Scheduled Workflow Execution API Route', () => {
cronExpression: null,
lastRanAt: null,
failedCount: 0,
nextRunAt: new Date('2025-01-01T00:00:00.000Z'),
lastQueuedAt: undefined,
},
]),
})),
})),
}
]
const mockReturning = vi.fn().mockReturnValue(returningSchedules)
const mockWhere = vi.fn().mockReturnValue({ returning: mockReturning })
const mockSet = vi.fn().mockReturnValue({ where: mockWhere })
const mockUpdate = vi.fn().mockReturnValue({ set: mockSet })
return {
db: mockDb,
workflowSchedule: {},
db: {
update: mockUpdate,
},
workflowSchedule: {
id: 'id',
workflowId: 'workflowId',
blockId: 'blockId',
cronExpression: 'cronExpression',
lastRanAt: 'lastRanAt',
failedCount: 'failedCount',
status: 'status',
nextRunAt: 'nextRunAt',
lastQueuedAt: 'lastQueuedAt',
},
}
})
@@ -170,21 +202,33 @@ describe('Scheduled Workflow Execution API Route', () => {
and: vi.fn((...conditions) => ({ type: 'and', conditions })),
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
lt: vi.fn((field, value) => ({ field, value, type: 'lt' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
isNull: vi.fn((field) => ({ type: 'isNull', field })),
or: vi.fn((...conditions) => ({ type: 'or', conditions })),
}))
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => []),
})),
})),
}
const mockReturning = vi.fn().mockReturnValue([])
const mockWhere = vi.fn().mockReturnValue({ returning: mockReturning })
const mockSet = vi.fn().mockReturnValue({ where: mockWhere })
const mockUpdate = vi.fn().mockReturnValue({ set: mockSet })
return {
db: mockDb,
workflowSchedule: {},
db: {
update: mockUpdate,
},
workflowSchedule: {
id: 'id',
workflowId: 'workflowId',
blockId: 'blockId',
cronExpression: 'cronExpression',
lastRanAt: 'lastRanAt',
failedCount: 'failedCount',
status: 'status',
nextRunAt: 'nextRunAt',
lastQueuedAt: 'lastQueuedAt',
},
}
})
@@ -217,14 +261,14 @@ describe('Scheduled Workflow Execution API Route', () => {
and: vi.fn((...conditions) => ({ type: 'and', conditions })),
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
lt: vi.fn((field, value) => ({ field, value, type: 'lt' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
isNull: vi.fn((field) => ({ type: 'isNull', field })),
or: vi.fn((...conditions) => ({ type: 'or', conditions })),
}))
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => [
const returningSchedules = [
{
id: 'schedule-1',
workflowId: 'workflow-1',
@@ -232,6 +276,8 @@ describe('Scheduled Workflow Execution API Route', () => {
cronExpression: null,
lastRanAt: null,
failedCount: 0,
nextRunAt: new Date('2025-01-01T00:00:00.000Z'),
lastQueuedAt: undefined,
},
{
id: 'schedule-2',
@@ -240,15 +286,31 @@ describe('Scheduled Workflow Execution API Route', () => {
cronExpression: null,
lastRanAt: null,
failedCount: 0,
nextRunAt: new Date('2025-01-01T01:00:00.000Z'),
lastQueuedAt: undefined,
},
]),
})),
})),
}
]
const mockReturning = vi.fn().mockReturnValue(returningSchedules)
const mockWhere = vi.fn().mockReturnValue({ returning: mockReturning })
const mockSet = vi.fn().mockReturnValue({ where: mockWhere })
const mockUpdate = vi.fn().mockReturnValue({ set: mockSet })
return {
db: mockDb,
workflowSchedule: {},
db: {
update: mockUpdate,
},
workflowSchedule: {
id: 'id',
workflowId: 'workflowId',
blockId: 'blockId',
cronExpression: 'cronExpression',
lastRanAt: 'lastRanAt',
failedCount: 'failedCount',
status: 'status',
nextRunAt: 'nextRunAt',
lastQueuedAt: 'lastQueuedAt',
},
}
})

View File

@@ -1,6 +1,6 @@
import { db, workflowSchedule } from '@sim/db'
import { tasks } from '@trigger.dev/sdk'
import { and, eq, lte, not } from 'drizzle-orm'
import { and, eq, isNull, lt, lte, not, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { env, isTruthy } from '@/lib/env'
@@ -21,15 +21,35 @@ export async function GET(request: NextRequest) {
return authError
}
const now = new Date()
const queuedAt = new Date()
try {
const dueSchedules = await db
.select()
.from(workflowSchedule)
.update(workflowSchedule)
.set({
lastQueuedAt: queuedAt,
updatedAt: queuedAt,
})
.where(
and(lte(workflowSchedule.nextRunAt, now), not(eq(workflowSchedule.status, 'disabled')))
and(
lte(workflowSchedule.nextRunAt, queuedAt),
not(eq(workflowSchedule.status, 'disabled')),
or(
isNull(workflowSchedule.lastQueuedAt),
lt(workflowSchedule.lastQueuedAt, workflowSchedule.nextRunAt)
)
)
)
.returning({
id: workflowSchedule.id,
workflowId: workflowSchedule.workflowId,
blockId: workflowSchedule.blockId,
cronExpression: workflowSchedule.cronExpression,
lastRanAt: workflowSchedule.lastRanAt,
failedCount: workflowSchedule.failedCount,
nextRunAt: workflowSchedule.nextRunAt,
lastQueuedAt: workflowSchedule.lastQueuedAt,
})
logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`)
logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`)
@@ -38,6 +58,8 @@ export async function GET(request: NextRequest) {
if (useTrigger) {
const triggerPromises = dueSchedules.map(async (schedule) => {
const queueTime = schedule.lastQueuedAt ?? queuedAt
try {
const payload = {
scheduleId: schedule.id,
@@ -46,7 +68,8 @@ export async function GET(request: NextRequest) {
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: now.toISOString(),
now: queueTime.toISOString(),
scheduledFor: schedule.nextRunAt?.toISOString(),
}
const handle = await tasks.trigger('schedule-execution', payload)
@@ -68,6 +91,8 @@ export async function GET(request: NextRequest) {
logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions to Trigger.dev`)
} else {
const directExecutionPromises = dueSchedules.map(async (schedule) => {
const queueTime = schedule.lastQueuedAt ?? queuedAt
const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
@@ -75,7 +100,8 @@ export async function GET(request: NextRequest) {
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: now.toISOString(),
now: queueTime.toISOString(),
scheduledFor: schedule.nextRunAt?.toISOString(),
}
void executeScheduleJob(payload).catch((error) => {

View File

@@ -3,6 +3,7 @@ import { task } from '@trigger.dev/sdk'
import { Cron } from 'croner'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import type { ZodRecord, ZodString } from 'zod'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
@@ -20,7 +21,6 @@ import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { Serializer } from '@/serializer'
import { RateLimiter } from '@/services/queue'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -28,71 +28,70 @@ const logger = createLogger('TriggerScheduleExecution')
const MAX_CONSECUTIVE_FAILURES = 3
export type ScheduleExecutionPayload = {
scheduleId: string
workflowId: string
blockId?: string
cronExpression?: string
lastRanAt?: string
failedCount?: number
now: string
type WorkflowRecord = typeof workflow.$inferSelect
type WorkflowScheduleUpdate = Partial<typeof workflowSchedule.$inferInsert>
type ExecutionCoreResult = Awaited<ReturnType<typeof executeWorkflowCore>>
type RunWorkflowResult =
| { status: 'skip'; blocks: Record<string, BlockState> }
| { status: 'success'; blocks: Record<string, BlockState>; executionResult: ExecutionCoreResult }
| { status: 'failure'; blocks: Record<string, BlockState>; executionResult: ExecutionCoreResult }
async function applyScheduleUpdate(
scheduleId: string,
updates: WorkflowScheduleUpdate,
requestId: string,
context: string,
successLog?: string
) {
try {
await db.update(workflowSchedule).set(updates).where(eq(workflowSchedule.id, scheduleId))
if (successLog) {
logger.debug(`[${requestId}] ${successLog}`)
}
} catch (error) {
logger.error(`[${requestId}] ${context}`, error)
}
}
function calculateNextRunTime(
schedule: { cronExpression?: string; lastRanAt?: string },
blocks: Record<string, BlockState>
): Date {
const scheduleBlock = Object.values(blocks).find(
(block) => block.type === 'starter' || block.type === 'schedule'
)
if (!scheduleBlock) throw new Error('No starter or schedule block found')
const scheduleType = getSubBlockValue(scheduleBlock, 'scheduleType')
const scheduleValues = getScheduleTimeValues(scheduleBlock)
const timezone = scheduleValues.timezone || 'UTC'
if (schedule.cronExpression) {
const cron = new Cron(schedule.cronExpression, {
timezone,
})
const nextDate = cron.nextRun()
if (!nextDate) throw new Error('Invalid cron expression or no future occurrences')
return nextDate
async function releaseScheduleLock(
scheduleId: string,
requestId: string,
now: Date,
context: string,
nextRunAt?: Date | null
) {
const updates: WorkflowScheduleUpdate = {
updatedAt: now,
lastQueuedAt: null,
}
const lastRanAt = schedule.lastRanAt ? new Date(schedule.lastRanAt) : null
return calculateNextTime(scheduleType, scheduleValues, lastRanAt)
if (nextRunAt) {
updates.nextRunAt = nextRunAt
}
await applyScheduleUpdate(scheduleId, updates, requestId, context)
}
export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
const now = new Date(payload.now)
async function resolveActorUserId(workflowRecord: WorkflowRecord) {
if (workflowRecord.workspaceId) {
const actor = await getWorkspaceBilledAccountUserId(workflowRecord.workspaceId)
if (actor) {
return actor
}
}
logger.info(`[${requestId}] Starting schedule execution`, {
scheduleId: payload.scheduleId,
workflowId: payload.workflowId,
executionId,
})
return workflowRecord.userId ?? null
}
const EnvVarsSchema = (await import('zod')).z.record((await import('zod')).z.string())
try {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, payload.workflowId))
.limit(1)
if (!workflowRecord) {
logger.warn(`[${requestId}] Workflow ${payload.workflowId} not found`)
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
async function handleWorkflowNotFound(
payload: ScheduleExecutionPayload,
executionId: string,
requestId: string,
now: Date
) {
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'schedule', requestId)
await loggingSession.safeStart({
userId: 'unknown',
@@ -109,30 +108,27 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
traceSpans: [],
})
return
}
let actorUserId: string | null = null
if (workflowRecord.workspaceId) {
actorUserId = await getWorkspaceBilledAccountUserId(workflowRecord.workspaceId)
}
if (!actorUserId) {
actorUserId = workflowRecord.userId ?? null
}
if (!actorUserId) {
logger.warn(
`[${requestId}] Skipping schedule ${payload.scheduleId}: unable to resolve billed account.`
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
lastQueuedAt: null,
status: 'disabled',
},
requestId,
`Failed to disable schedule ${payload.scheduleId} after missing workflow`,
`Disabled schedule ${payload.scheduleId} because the workflow no longer exists`
)
}
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
async function handleMissingActor(
payload: ScheduleExecutionPayload,
workflowRecord: WorkflowRecord,
executionId: string,
requestId: string,
now: Date
) {
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'schedule', requestId)
await loggingSession.safeStart({
userId: workflowRecord.userId ?? 'unknown',
@@ -149,19 +145,24 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
traceSpans: [],
})
return
}
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
await releaseScheduleLock(
payload.scheduleId,
requestId,
now,
`Failed to release schedule ${payload.scheduleId} after billing account lookup`
)
}
const rateLimiter = new RateLimiter()
async function ensureRateLimit(
actorUserId: string,
userSubscription: Awaited<ReturnType<typeof getHighestPrioritySubscription>>,
rateLimiter: RateLimiter,
loggingSession: LoggingSession,
payload: ScheduleExecutionPayload,
workflowRecord: WorkflowRecord,
requestId: string,
now: Date
) {
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
actorUserId,
userSubscription,
@@ -169,15 +170,15 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
false
)
if (!rateLimitCheck.allowed) {
logger.warn(
`[${requestId}] Rate limit exceeded for scheduled workflow ${payload.workflowId}`,
{
if (rateLimitCheck.allowed) {
return true
}
logger.warn(`[${requestId}] Rate limit exceeded for scheduled workflow ${payload.workflowId}`, {
userId: workflowRecord.userId,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
}
)
})
await loggingSession.safeStart({
userId: actorUserId,
@@ -187,7 +188,9 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
await loggingSession.safeCompleteWithError({
error: {
message: `Rate limit exceeded. ${rateLimitCheck.remaining || 0} requests remaining. Resets at ${rateLimitCheck.resetAt ? new Date(rateLimitCheck.resetAt).toISOString() : 'unknown'}. Schedule will retry in 5 minutes.`,
message: `Rate limit exceeded. ${rateLimitCheck.remaining || 0} requests remaining. Resets at ${
rateLimitCheck.resetAt ? new Date(rateLimitCheck.resetAt).toISOString() : 'unknown'
}. Schedule will retry in 5 minutes.`,
stackTrace: undefined,
},
traceSpans: [],
@@ -196,25 +199,49 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
const retryDelay = 5 * 60 * 1000
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
await db
.update(workflowSchedule)
.set({
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, payload.scheduleId))
},
requestId,
`Error updating schedule ${payload.scheduleId} for rate limit`,
`Updated next retry time for schedule ${payload.scheduleId} due to rate limit`
)
logger.debug(`[${requestId}] Updated next retry time due to rate limit`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule for rate limit:`, updateError)
}
return
return false
}
async function calculateNextRunFromDeployment(
payload: ScheduleExecutionPayload,
requestId: string
) {
try {
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
return calculateNextRunTime(payload, deployedData.blocks as Record<string, BlockState>)
} catch (error) {
logger.warn(
`[${requestId}] Unable to calculate nextRunAt for schedule ${payload.scheduleId}`,
error
)
return null
}
}
async function ensureUsageLimits(
actorUserId: string,
payload: ScheduleExecutionPayload,
workflowRecord: WorkflowRecord,
loggingSession: LoggingSession,
requestId: string,
now: Date
) {
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
if (!usageCheck.isExceeded) {
return true
}
logger.warn(
`[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`,
{
@@ -240,26 +267,119 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
traceSpans: [],
})
try {
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
const nextRunAt = calculateNextRunTime(payload, deployedData.blocks as any)
await db
.update(workflowSchedule)
.set({ updatedAt: now, nextRunAt })
.where(eq(workflowSchedule.id, payload.scheduleId))
} catch (calcErr) {
logger.warn(
`[${requestId}] Unable to calculate nextRunAt while skipping schedule ${payload.scheduleId}`,
calcErr
const nextRunAt = await calculateNextRunFromDeployment(payload, requestId)
if (nextRunAt) {
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} after usage limit check`,
`Scheduled next run for ${payload.scheduleId} after usage limit`
)
}
return false
}
async function determineNextRunAfterError(
payload: ScheduleExecutionPayload,
now: Date,
requestId: string
) {
try {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, payload.workflowId))
.limit(1)
if (workflowRecord?.isDeployed) {
const nextRunAt = await calculateNextRunFromDeployment(payload, requestId)
if (nextRunAt) {
return nextRunAt
}
}
} catch (workflowError) {
logger.error(`[${requestId}] Error retrieving workflow for next run calculation`, workflowError)
}
return new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
async function ensureBlockVariablesResolvable(
blocks: Record<string, BlockState>,
variables: Record<string, string>,
requestId: string
) {
await Promise.all(
Object.values(blocks).map(async (block) => {
const subBlocks = block.subBlocks ?? {}
await Promise.all(
Object.values(subBlocks).map(async (subBlock) => {
const value = subBlock.value
if (typeof value !== 'string' || !value.includes('{{') || !value.includes('}}')) {
return
}
logger.info(`[${requestId}] Executing scheduled workflow ${payload.workflowId}`)
const matches = value.match(/{{([^}]+)}}/g)
if (!matches) {
return
}
for (const match of matches) {
const varName = match.slice(2, -2)
const encryptedValue = variables[varName]
if (!encryptedValue) {
throw new Error(`Environment variable "${varName}" was not found`)
}
try {
const executionSuccess = await (async () => {
await decryptSecret(encryptedValue)
} catch (error) {
logger.error(`[${requestId}] Error decrypting value for variable "${varName}"`, error)
const message = error instanceof Error ? error.message : 'Unknown error'
throw new Error(`Failed to decrypt environment variable "${varName}": ${message}`)
}
}
})
)
})
)
}
async function ensureEnvVarsDecryptable(variables: Record<string, string>, requestId: string) {
for (const [key, encryptedValue] of Object.entries(variables)) {
try {
await decryptSecret(encryptedValue)
} catch (error) {
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}"`, error)
const message = error instanceof Error ? error.message : 'Unknown error'
throw new Error(`Failed to decrypt environment variable "${key}": ${message}`)
}
}
}
async function runWorkflowExecution({
payload,
workflowRecord,
actorUserId,
loggingSession,
requestId,
executionId,
EnvVarsSchema,
}: {
payload: ScheduleExecutionPayload
workflowRecord: WorkflowRecord
actorUserId: string
loggingSession: LoggingSession
requestId: string
executionId: string
EnvVarsSchema: ZodRecord<ZodString, ZodString>
}): Promise<RunWorkflowResult> {
try {
logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
@@ -277,21 +397,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
`[${requestId}] Schedule trigger block ${payload.blockId} not found in deployed workflow ${payload.workflowId}. Skipping execution.`
)
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Trigger block not deployed. The schedule trigger (block ${payload.blockId}) is not present in the deployed workflow. Please redeploy the workflow.`,
stackTrace: undefined,
},
traceSpans: [],
})
return { skip: true, blocks: {} as Record<string, BlockState> }
return { status: 'skip', blocks: {} as Record<string, BlockState> }
}
}
@@ -301,118 +407,14 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
actorUserId,
workflowRecord.workspaceId || undefined
)
const variables = EnvVarsSchema.parse({
...personalEncrypted,
...workspaceEncrypted,
})
const currentBlockStates = await Object.entries(mergedStates).reduce(
async (accPromise, [id, block]) => {
const acc = await accPromise
acc[id] = await Object.entries(block.subBlocks).reduce(
async (subAccPromise, [key, subBlock]) => {
const subAcc = await subAccPromise
let value = subBlock.value
if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
const matches = value.match(/{{([^}]+)}}/g)
if (matches) {
for (const match of matches) {
const varName = match.slice(2, -2)
const encryptedValue = variables[varName]
if (!encryptedValue) {
throw new Error(`Environment variable "${varName}" was not found`)
}
try {
const { decrypted } = await decryptSecret(encryptedValue)
value = (value as string).replace(match, decrypted)
} catch (error: any) {
logger.error(
`[${requestId}] Error decrypting value for variable "${varName}"`,
error
)
throw new Error(
`Failed to decrypt environment variable "${varName}": ${error.message}`
)
}
}
}
}
subAcc[key] = value
return subAcc
},
Promise.resolve({} as Record<string, any>)
)
return acc
},
Promise.resolve({} as Record<string, Record<string, any>>)
)
const decryptedEnvVars: Record<string, string> = {}
for (const [key, encryptedValue] of Object.entries(variables)) {
try {
const { decrypted } = await decryptSecret(encryptedValue)
decryptedEnvVars[key] = decrypted
} catch (error: any) {
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}"`, error)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
}
}
const processedBlockStates = Object.entries(currentBlockStates).reduce(
(acc, [blockId, blockState]) => {
if (blockState.responseFormat && typeof blockState.responseFormat === 'string') {
const responseFormatValue = blockState.responseFormat.trim()
if (responseFormatValue.startsWith('<') && responseFormatValue.includes('>')) {
logger.debug(
`[${requestId}] Response format contains variable reference for block ${blockId}`
)
acc[blockId] = blockState
} else if (responseFormatValue === '') {
acc[blockId] = {
...blockState,
responseFormat: undefined,
}
} else {
try {
logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`)
const parsedResponseFormat = JSON.parse(responseFormatValue)
acc[blockId] = {
...blockState,
responseFormat: parsedResponseFormat,
}
} catch (error) {
logger.warn(
`[${requestId}] Failed to parse responseFormat for block ${blockId}, using undefined`,
error
)
acc[blockId] = {
...blockState,
responseFormat: undefined,
}
}
}
} else {
acc[blockId] = blockState
}
return acc
},
{} as Record<string, Record<string, any>>
)
const workflowVariables = (workflowRecord.variables as Record<string, any>) || {}
const serializedWorkflow = new Serializer().serializeWorkflow(
mergedStates,
edges,
loops,
parallels,
true
)
await ensureBlockVariablesResolvable(mergedStates, variables, requestId)
await ensureEnvVarsDecryptable(variables, requestId)
const input = {
_context: {
@@ -476,8 +478,12 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
executionTime: executionResult.metadata?.duration,
})
return { success: executionResult.success, blocks, executionResult }
} catch (earlyError: any) {
if (executionResult.success) {
return { status: 'success', blocks, executionResult }
}
return { status: 'failure', blocks, executionResult }
} catch (earlyError) {
logger.error(
`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`,
earlyError
@@ -486,109 +492,73 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
try {
await loggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed: ${earlyError.message}`,
stackTrace: earlyError.stack,
message: `Schedule execution failed: ${
earlyError instanceof Error ? earlyError.message : String(earlyError)
}`,
stackTrace: earlyError instanceof Error ? earlyError.stack : undefined,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to complete log entry for schedule failure`,
loggingError
)
logger.error(`[${requestId}] Failed to complete log entry for schedule failure`, loggingError)
}
throw earlyError
}
})()
}
if ('skip' in executionSuccess && executionSuccess.skip) {
return
}
export type ScheduleExecutionPayload = {
scheduleId: string
workflowId: string
blockId?: string
cronExpression?: string
lastRanAt?: string
failedCount?: number
now: string
scheduledFor?: string
}
if (executionSuccess.success) {
logger.info(`[${requestId}] Workflow ${payload.workflowId} executed successfully`)
const nextRunAt = calculateNextRunTime(payload, executionSuccess.blocks)
logger.debug(
`[${requestId}] Calculated next run time: ${nextRunAt.toISOString()} for workflow ${payload.workflowId}`
function calculateNextRunTime(
schedule: { cronExpression?: string; lastRanAt?: string },
blocks: Record<string, BlockState>
): Date {
const scheduleBlock = Object.values(blocks).find(
(block) => block.type === 'starter' || block.type === 'schedule'
)
if (!scheduleBlock) throw new Error('No starter or schedule block found')
const scheduleType = getSubBlockValue(scheduleBlock, 'scheduleType')
const scheduleValues = getScheduleTimeValues(scheduleBlock)
try {
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
failedCount: 0,
const timezone = scheduleValues.timezone || 'UTC'
if (schedule.cronExpression) {
const cron = new Cron(schedule.cronExpression, {
timezone,
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(
`[${requestId}] Updated next run time for workflow ${payload.workflowId} to ${nextRunAt.toISOString()}`
)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after success:`, updateError)
}
} else {
logger.warn(`[${requestId}] Workflow ${payload.workflowId} execution failed`)
const newFailedCount = (payload.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
const nextRunAt = calculateNextRunTime(payload, executionSuccess.blocks)
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${payload.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
const nextDate = cron.nextRun()
if (!nextDate) throw new Error('Invalid cron expression or no future occurrences')
return nextDate
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
const lastRanAt = schedule.lastRanAt ? new Date(schedule.lastRanAt) : null
return calculateNextTime(scheduleType, scheduleValues, lastRanAt)
}
export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
const now = new Date(payload.now)
const scheduledFor = payload.scheduledFor ? new Date(payload.scheduledFor) : null
logger.info(`[${requestId}] Starting schedule execution`, {
scheduleId: payload.scheduleId,
workflowId: payload.workflowId,
executionId,
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated schedule after failure`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after failure:`, updateError)
}
}
} catch (error: any) {
if (error.message?.includes('Service overloaded')) {
logger.warn(`[${requestId}] Service overloaded, retrying schedule in 5 minutes`)
const zod = await import('zod')
const EnvVarsSchema = zod.z.record(zod.z.string())
const retryDelay = 5 * 60 * 1000
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated schedule retry time due to service overload`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule for service overload:`, updateError)
}
} else {
logger.error(
`[${requestId}] Error executing scheduled workflow ${payload.workflowId}`,
error
)
let nextRunAt: Date
try {
const [workflowRecord] = await db
.select()
@@ -596,24 +566,150 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
.where(eq(workflow.id, payload.workflowId))
.limit(1)
if (workflowRecord?.isDeployed) {
try {
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
nextRunAt = calculateNextRunTime(payload, deployedData.blocks as any)
} catch {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} else {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} catch (workflowError) {
logger.error(
`[${requestId}] Error retrieving workflow for next run calculation`,
workflowError
)
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
if (!workflowRecord) {
logger.warn(`[${requestId}] Workflow ${payload.workflowId} not found`)
await handleWorkflowNotFound(payload, executionId, requestId, now)
return
}
const actorUserId = await resolveActorUserId(workflowRecord)
if (!actorUserId) {
logger.warn(
`[${requestId}] Skipping schedule ${payload.scheduleId}: unable to resolve billed account.`
)
await handleMissingActor(payload, workflowRecord, executionId, requestId, now)
return
}
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
const rateLimiter = new RateLimiter()
const withinRateLimit = await ensureRateLimit(
actorUserId,
userSubscription,
rateLimiter,
loggingSession,
payload,
workflowRecord,
requestId,
now
)
if (!withinRateLimit) {
return
}
const withinUsageLimits = await ensureUsageLimits(
actorUserId,
payload,
workflowRecord,
loggingSession,
requestId,
now
)
if (!withinUsageLimits) {
return
}
logger.info(`[${requestId}] Executing scheduled workflow ${payload.workflowId}`)
try {
const executionResult = await runWorkflowExecution({
payload,
workflowRecord,
actorUserId,
loggingSession,
requestId,
executionId,
EnvVarsSchema,
})
if (executionResult.status === 'skip') {
await releaseScheduleLock(
payload.scheduleId,
requestId,
now,
`Failed to release schedule ${payload.scheduleId} after skip`,
scheduledFor ?? now
)
return
}
if (executionResult.status === 'success') {
logger.info(`[${requestId}] Workflow ${payload.workflowId} executed successfully`)
const nextRunAt = calculateNextRunTime(payload, executionResult.blocks)
await applyScheduleUpdate(
payload.scheduleId,
{
lastRanAt: now,
updatedAt: now,
nextRunAt,
failedCount: 0,
},
requestId,
`Error updating schedule ${payload.scheduleId} after success`,
`Updated next run time for workflow ${payload.workflowId} to ${nextRunAt.toISOString()}`
)
return
}
logger.warn(`[${requestId}] Workflow ${payload.workflowId} execution failed`)
const newFailedCount = (payload.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${payload.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
const nextRunAt = calculateNextRunTime(payload, executionResult.blocks)
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
},
requestId,
`Error updating schedule ${payload.scheduleId} after failure`,
`Updated schedule ${payload.scheduleId} after failure`
)
} catch (error: any) {
if (error?.message?.includes('Service overloaded')) {
logger.warn(`[${requestId}] Service overloaded, retrying schedule in 5 minutes`)
const retryDelay = 5 * 60 * 1000
const nextRetryAt = new Date(now.getTime() + retryDelay)
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt: nextRetryAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} for service overload`,
`Updated schedule ${payload.scheduleId} retry time due to service overload`
)
return
}
logger.error(`[${requestId}] Error executing scheduled workflow ${payload.workflowId}`, error)
const nextRunAt = await determineNextRunAfterError(payload, now, requestId)
const newFailedCount = (payload.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
@@ -623,23 +719,19 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
)
}
try {
await db
.update(workflowSchedule)
.set({
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated schedule after execution error`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after execution error:`, updateError)
}
}
},
requestId,
`Error updating schedule ${payload.scheduleId} after execution error`,
`Updated schedule ${payload.scheduleId} after execution error`
)
}
} catch (error: any) {
logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error)

View File

@@ -0,0 +1 @@
ALTER TABLE "workflow_schedule" ADD COLUMN "last_queued_at" timestamp;

File diff suppressed because it is too large Load Diff

View File

@@ -750,6 +750,13 @@
"when": 1762565365042,
"tag": "0107_silky_agent_brand",
"breakpoints": true
},
{
"idx": 108,
"version": "7",
"when": 1762572820066,
"tag": "0108_cuddly_scream",
"breakpoints": true
}
]
}

View File

@@ -443,6 +443,7 @@ export const workflowSchedule = pgTable(
cronExpression: text('cron_expression'),
nextRunAt: timestamp('next_run_at'),
lastRanAt: timestamp('last_ran_at'),
lastQueuedAt: timestamp('last_queued_at'),
triggerType: text('trigger_type').notNull(), // "manual", "webhook", "schedule"
timezone: text('timezone').notNull().default('UTC'),
failedCount: integer('failed_count').notNull().default(0), // Track consecutive failures