fix(scheduling): fix error where scheduled executions were not actually executed

This commit is contained in:
Waleed Latif
2025-02-19 23:06:28 -08:00
parent 5029c99ce9
commit 49614a4e54
2 changed files with 328 additions and 193 deletions

View File

@@ -31,24 +31,47 @@ function calculateNextRunTime(
const scheduleType = getSubBlockValue(starterBlock, 'scheduleType')
// If there's a cron expression, use that first regardless of schedule type
if (schedule.cronExpression) {
const cron = new Cron(schedule.cronExpression)
const nextDate = cron.nextRun()
if (!nextDate) throw new Error('Invalid cron expression or no future occurrences')
return nextDate
}
switch (scheduleType) {
case 'minutes': {
const interval = parseInt(getSubBlockValue(starterBlock, 'minutesInterval') || '15')
// If this is the first run and we have a starting time preference
const startingAt = getSubBlockValue(starterBlock, 'minutesStartingAt')
// If we have a specific starting time and this is the first run
if (!schedule.lastRanAt && startingAt) {
const [hours, minutes] = startingAt.split(':')
const startTime = new Date()
startTime.setHours(parseInt(hours), parseInt(minutes), 0, 0)
// If start time is in the past, add interval until it's in the future
while (startTime <= new Date()) {
startTime.setMinutes(startTime.getMinutes() + interval)
}
return startTime
}
// For subsequent runs, just add the interval to the last run
const nextRun = new Date(schedule.lastRanAt || new Date())
nextRun.setMinutes(nextRun.getMinutes() + interval)
// For subsequent runs or if no starting time specified
const baseTime = schedule.lastRanAt ? new Date(schedule.lastRanAt) : new Date()
const currentMinutes = baseTime.getMinutes()
// Find the next interval boundary after the base time
const nextIntervalBoundary = Math.ceil(currentMinutes / interval) * interval
const nextRun = new Date(baseTime)
// Handle minute rollover properly
const minutesToAdd = nextIntervalBoundary - currentMinutes
nextRun.setMinutes(nextRun.getMinutes() + minutesToAdd, 0, 0)
// If we're already past this time, add another interval
if (nextRun <= new Date()) {
nextRun.setMinutes(nextRun.getMinutes() + interval)
}
return nextRun
}
case 'hourly': {
@@ -119,7 +142,7 @@ function calculateNextRunTime(
const EnvVarsSchema = z.record(z.string())
export const config = {
runtime: 'edge',
runtime: 'nodejs',
schedule: '*/1 * * * *',
}
@@ -129,177 +152,229 @@ const runningExecutions = new Set<string>()
// Add GET handler for cron job
export async function GET(req: NextRequest) {
const now = new Date()
console.log('Starting scheduled execution check at:', now.toISOString())
// Query schedules due for execution
const dueSchedules = await db
.select()
.from(workflowSchedule)
.where(lte(workflowSchedule.nextRunAt, now))
// Limit to 10 workflows per minute to prevent overload
.limit(10)
let dueSchedules: (typeof workflowSchedule.$inferSelect)[] = []
for (const schedule of dueSchedules) {
try {
// Skip if this workflow is already running
if (runningExecutions.has(schedule.workflowId)) {
console.log(`Skipping workflow ${schedule.workflowId} - already running`)
continue
}
try {
// Query schedules due for execution
dueSchedules = await db
.select()
.from(workflowSchedule)
.where(lte(workflowSchedule.nextRunAt, now))
// Limit to 10 workflows per minute to prevent overload
.limit(10)
runningExecutions.add(schedule.workflowId)
console.log('Found due schedules:', dueSchedules.length)
// Retrieve the workflow record
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
for (const schedule of dueSchedules) {
console.log('Processing schedule:', {
id: schedule.id,
workflowId: schedule.workflowId,
nextRunAt: schedule.nextRunAt,
cronExpression: schedule.cronExpression,
})
if (!workflowRecord) {
runningExecutions.delete(schedule.workflowId)
continue
}
try {
// Skip if this workflow is already running
if (runningExecutions.has(schedule.workflowId)) {
console.log(`Skipping workflow ${schedule.workflowId} - already running`)
continue
}
// The state in the database is exactly what we store in localStorage
const state = workflowRecord.state as WorkflowState
const { blocks, edges, loops } = state
runningExecutions.add(schedule.workflowId)
// Use the same execution flow as in use-workflow-execution.ts
const mergedStates = mergeSubblockState(blocks)
// Retrieve the workflow record
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
// Retrieve environment variables for this user
const [userEnv] = await db
.select()
.from(environment)
.where(eq(environment.userId, workflowRecord.userId))
.limit(1)
if (!workflowRecord) {
runningExecutions.delete(schedule.workflowId)
continue
}
if (!userEnv) {
throw new Error('No environment variables found for this user')
}
// The state in the database is exactly what we store in localStorage
const state = workflowRecord.state as WorkflowState
const { blocks, edges, loops } = state
// Parse and validate environment variables
const variables = EnvVarsSchema.parse(userEnv.variables)
// Use the same execution flow as in use-workflow-execution.ts
const mergedStates = mergeSubblockState(blocks)
// Replace environment variables in the block states
const currentBlockStates = await Object.entries(mergedStates).reduce(
async (accPromise, [id, block]) => {
const acc = await accPromise
acc[id] = await Object.entries(block.subBlocks).reduce(
async (subAccPromise, [key, subBlock]) => {
const subAcc = await subAccPromise
let value = subBlock.value
// Retrieve environment variables for this user
const [userEnv] = await db
.select()
.from(environment)
.where(eq(environment.userId, workflowRecord.userId))
.limit(1)
// If the value is a string and contains environment variable syntax
if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
const matches = value.match(/{{([^}]+)}}/g)
if (matches) {
// Process all matches sequentially
for (const match of matches) {
const varName = match.slice(2, -2) // Remove {{ and }}
const encryptedValue = variables[varName]
if (!encryptedValue) {
throw new Error(`Environment variable "${varName}" was not found`)
}
if (!userEnv) {
throw new Error('No environment variables found for this user')
}
try {
const { decrypted } = await decryptSecret(encryptedValue)
value = (value as string).replace(match, decrypted)
} catch (error: any) {
console.error('Error decrypting value:', error)
throw new Error(
`Failed to decrypt environment variable "${varName}": ${error.message}`
)
// Parse and validate environment variables
const variables = EnvVarsSchema.parse(userEnv.variables)
// Replace environment variables in the block states
const currentBlockStates = await Object.entries(mergedStates).reduce(
async (accPromise, [id, block]) => {
const acc = await accPromise
acc[id] = await Object.entries(block.subBlocks).reduce(
async (subAccPromise, [key, subBlock]) => {
const subAcc = await subAccPromise
let value = subBlock.value
// If the value is a string and contains environment variable syntax
if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
const matches = value.match(/{{([^}]+)}}/g)
if (matches) {
// Process all matches sequentially
for (const match of matches) {
const varName = match.slice(2, -2) // Remove {{ and }}
const encryptedValue = variables[varName]
if (!encryptedValue) {
throw new Error(`Environment variable "${varName}" was not found`)
}
try {
const { decrypted } = await decryptSecret(encryptedValue)
value = (value as string).replace(match, decrypted)
} catch (error: any) {
console.error('Error decrypting value:', error)
throw new Error(
`Failed to decrypt environment variable "${varName}": ${error.message}`
)
}
}
}
}
}
subAcc[key] = value
return subAcc
},
Promise.resolve({} as Record<string, any>)
)
return acc
},
Promise.resolve({} as Record<string, Record<string, any>>)
)
subAcc[key] = value
return subAcc
},
Promise.resolve({} as Record<string, any>)
)
return acc
},
Promise.resolve({} as Record<string, Record<string, any>>)
)
// Create a map of decrypted environment variables
const decryptedEnvVars: Record<string, string> = {}
for (const [key, encryptedValue] of Object.entries(variables)) {
try {
const { decrypted } = await decryptSecret(encryptedValue)
decryptedEnvVars[key] = decrypted
} catch (error: any) {
console.error(`Failed to decrypt ${key}:`, error)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
// Create a map of decrypted environment variables
const decryptedEnvVars: Record<string, string> = {}
for (const [key, encryptedValue] of Object.entries(variables)) {
try {
const { decrypted } = await decryptSecret(encryptedValue)
decryptedEnvVars[key] = decrypted
} catch (error: any) {
console.error(`Failed to decrypt ${key}:`, error)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
}
}
}
// Serialize and execute the workflow
const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops)
const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars)
const executionId = uuidv4()
const result = await executor.execute(schedule.workflowId)
// Serialize and execute the workflow
const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops)
const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars)
const executionId = uuidv4()
const result = await executor.execute(schedule.workflowId)
// Log each execution step
for (const log of result.logs || []) {
// Log each execution step
for (const log of result.logs || []) {
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId,
level: log.success ? 'info' : 'error',
message: `Block ${log.blockName || log.blockId} (${log.blockType}): ${
log.error || `Completed successfully`
}`,
duration: log.success ? `${log.durationMs}ms` : 'NA',
createdAt: new Date(log.endedAt || log.startedAt),
})
}
// Calculate total duration from successful block logs
const totalDuration = (result.logs || [])
.filter((log) => log.success)
.reduce((sum, log) => sum + log.durationMs, 0)
// Log the final execution result
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId,
level: log.success ? 'info' : 'error',
message: `Block ${log.blockName || log.blockId} (${log.blockType}): ${
log.error || `Completed successfully`
}`,
duration: log.success ? `${log.durationMs}ms` : 'NA',
createdAt: new Date(log.endedAt || log.startedAt),
level: result.success ? 'info' : 'error',
message: result.success
? 'Scheduled workflow executed successfully'
: `Scheduled workflow execution failed: ${result.error}`,
duration: result.success ? `${totalDuration}ms` : 'NA',
createdAt: new Date(),
})
// Only update next_run_at if execution was successful
if (result.success) {
// Calculate the next run time based on the schedule configuration
const nextRunAt = calculateNextRunTime(schedule, blocks)
console.log('Calculated next run time:', nextRunAt.toISOString())
// Update the schedule with the next run time
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
})
.where(eq(workflowSchedule.id, schedule.id))
console.log('Updated schedule with new run time')
} else {
// If execution failed, increment next_run_at by a small delay to prevent immediate retries
const retryDelay = 1 * 60 * 1000 // 1 minute delay
const nextRetryAt = new Date(now.getTime() + retryDelay)
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
console.log('Execution failed, scheduled retry at:', nextRetryAt.toISOString())
}
} catch (error: any) {
console.error('Error executing workflow:', error)
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId: uuidv4(),
level: 'error',
message: error.message || 'Unknown error during scheduled workflow execution',
createdAt: new Date(),
})
// On error, increment next_run_at by a small delay to prevent immediate retries
const retryDelay = 1 * 60 * 1000 // 1 minute delay
const nextRetryAt = new Date(now.getTime() + retryDelay)
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
console.log('Execution error, scheduled retry at:', nextRetryAt.toISOString())
} finally {
runningExecutions.delete(schedule.workflowId)
}
// Calculate total duration from successful block logs
const totalDuration = (result.logs || [])
.filter((log) => log.success)
.reduce((sum, log) => sum + log.durationMs, 0)
// Log the final execution result
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId,
level: result.success ? 'info' : 'error',
message: result.success
? 'Scheduled workflow executed successfully'
: `Scheduled workflow execution failed: ${result.error}`,
duration: result.success ? `${totalDuration}ms` : 'NA',
createdAt: new Date(),
})
// Calculate the next run time based on the schedule configuration
const nextRunAt = calculateNextRunTime(schedule, blocks)
// Update the schedule with the next run time
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
})
.where(eq(workflowSchedule.id, schedule.id))
} catch (error: any) {
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId: uuidv4(),
level: 'error',
message: error.message || 'Unknown error during scheduled workflow execution',
createdAt: new Date(),
})
} finally {
runningExecutions.delete(schedule.workflowId)
}
} catch (error: any) {
console.error('Error in scheduled execution:', error)
return NextResponse.json({ error: error.message }, { status: 500 })
}
return NextResponse.json({

View File

@@ -56,39 +56,72 @@ export async function POST(req: NextRequest) {
// Get schedule configuration from starter block
const scheduleType = getSubBlockValue(starterBlock, 'scheduleType')
// Calculate initial next run time based on schedule type
// Calculate cron expression based on schedule type
let cronExpression: string | null = null
let nextRunAt = new Date()
let shouldUpdateNextRunAt = false
let nextRunAt: Date | undefined
// First check if there's an existing schedule
const existingSchedule = await db
.select()
.from(workflowSchedule)
.where(eq(workflowSchedule.workflowId, workflowId))
.limit(1)
switch (scheduleType) {
case 'minutes': {
const interval = parseInt(getSubBlockValue(starterBlock, 'minutesInterval') || '15')
const startingAt = getSubBlockValue(starterBlock, 'minutesStartingAt')
if (startingAt) {
const [hours, minutes] = startingAt.split(':')
nextRunAt.setHours(parseInt(hours), parseInt(minutes), 0, 0)
while (nextRunAt <= new Date()) {
nextRunAt.setMinutes(nextRunAt.getMinutes() + interval)
}
} else {
nextRunAt.setMinutes(nextRunAt.getMinutes() + interval)
}
cronExpression = `*/${interval} * * * *`
// Check if we need to update next_run_at
if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) {
shouldUpdateNextRunAt = true
nextRunAt = new Date()
const startingAt = getSubBlockValue(starterBlock, 'minutesStartingAt')
if (startingAt) {
const [hours, minutes] = startingAt.split(':')
nextRunAt.setHours(parseInt(hours), parseInt(minutes), 0, 0)
while (nextRunAt <= new Date()) {
nextRunAt.setMinutes(nextRunAt.getMinutes() + interval)
}
} else {
// Round down to nearest interval boundary
const now = new Date()
const currentMinutes = now.getMinutes()
const lastIntervalBoundary = Math.floor(currentMinutes / interval) * interval
nextRunAt = new Date(now)
nextRunAt.setMinutes(lastIntervalBoundary, 0, 0)
while (nextRunAt <= now) {
nextRunAt.setMinutes(nextRunAt.getMinutes() + interval)
}
}
}
break
}
case 'hourly': {
const minute = parseInt(getSubBlockValue(starterBlock, 'hourlyMinute') || '0')
nextRunAt.setHours(nextRunAt.getHours() + 1, minute, 0, 0)
cronExpression = `${minute} * * * *`
if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) {
shouldUpdateNextRunAt = true
nextRunAt = new Date()
nextRunAt.setHours(nextRunAt.getHours() + 1, minute, 0, 0)
}
break
}
case 'daily': {
const [hours, minutes] = getSubBlockValue(starterBlock, 'dailyTime').split(':')
nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
if (nextRunAt <= new Date()) {
nextRunAt.setDate(nextRunAt.getDate() + 1)
}
cronExpression = `${minutes || '0'} ${hours || '9'} * * *`
if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) {
shouldUpdateNextRunAt = true
nextRunAt = new Date()
nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
if (nextRunAt <= new Date()) {
nextRunAt.setDate(nextRunAt.getDate() + 1)
}
}
break
}
case 'weekly': {
@@ -103,22 +136,32 @@ export async function POST(req: NextRequest) {
}
const targetDay = dayMap[getSubBlockValue(starterBlock, 'weeklyDay') || 'MON']
const [hours, minutes] = getSubBlockValue(starterBlock, 'weeklyDayTime').split(':')
nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
while (nextRunAt.getDay() !== targetDay || nextRunAt <= new Date()) {
nextRunAt.setDate(nextRunAt.getDate() + 1)
}
cronExpression = `${minutes || '0'} ${hours || '9'} * * ${targetDay}`
if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) {
shouldUpdateNextRunAt = true
nextRunAt = new Date()
nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
while (nextRunAt.getDay() !== targetDay || nextRunAt <= new Date()) {
nextRunAt.setDate(nextRunAt.getDate() + 1)
}
}
break
}
case 'monthly': {
const day = parseInt(getSubBlockValue(starterBlock, 'monthlyDay') || '1')
const [hours, minutes] = getSubBlockValue(starterBlock, 'monthlyTime').split(':')
nextRunAt.setDate(day)
nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
if (nextRunAt <= new Date()) {
nextRunAt.setMonth(nextRunAt.getMonth() + 1)
}
cronExpression = `${minutes || '0'} ${hours || '9'} ${day} * *`
if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) {
shouldUpdateNextRunAt = true
nextRunAt = new Date()
nextRunAt.setDate(day)
nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
if (nextRunAt <= new Date()) {
nextRunAt.setMonth(nextRunAt.getMonth() + 1)
}
}
break
}
case 'custom': {
@@ -129,39 +172,56 @@ export async function POST(req: NextRequest) {
{ status: 400 }
)
}
// The execute-workflows endpoint will handle custom cron calculation
nextRunAt = new Date()
nextRunAt.setMinutes(nextRunAt.getMinutes() + 1) // Start in 1 minute
if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) {
shouldUpdateNextRunAt = true
nextRunAt = new Date()
nextRunAt.setMinutes(nextRunAt.getMinutes() + 1)
}
break
}
default:
return NextResponse.json({ error: 'Invalid schedule type' }, { status: 400 })
}
// Prepare the values for upsert
const values: any = {
id: crypto.randomUUID(),
workflowId,
cronExpression,
triggerType: 'schedule',
createdAt: new Date(),
updatedAt: new Date(),
}
// Only include next_run_at if it should be updated
if (shouldUpdateNextRunAt && nextRunAt) {
values.nextRunAt = nextRunAt
}
// Prepare the set values for update
const setValues: any = {
cronExpression,
updatedAt: new Date(),
}
// Only include next_run_at in the update if it should be updated
if (shouldUpdateNextRunAt && nextRunAt) {
setValues.nextRunAt = nextRunAt
}
// Upsert the schedule
await db
.insert(workflowSchedule)
.values({
id: crypto.randomUUID(),
workflowId,
cronExpression,
nextRunAt,
triggerType: 'schedule',
createdAt: new Date(),
updatedAt: new Date(),
})
.values(values)
.onConflictDoUpdate({
target: [workflowSchedule.workflowId],
set: {
cronExpression,
nextRunAt,
updatedAt: new Date(),
},
set: setValues,
})
return NextResponse.json({
message: 'Schedule updated',
nextRunAt,
nextRunAt: shouldUpdateNextRunAt ? nextRunAt : existingSchedule[0]?.nextRunAt,
cronExpression,
})
} catch (error) {