diff --git a/app/api/scheduled/execute/route.ts b/app/api/scheduled/execute/route.ts index 871dae915..09c7fb87c 100644 --- a/app/api/scheduled/execute/route.ts +++ b/app/api/scheduled/execute/route.ts @@ -31,24 +31,47 @@ function calculateNextRunTime( const scheduleType = getSubBlockValue(starterBlock, 'scheduleType') + // If there's a cron expression, use that first regardless of schedule type + if (schedule.cronExpression) { + const cron = new Cron(schedule.cronExpression) + const nextDate = cron.nextRun() + if (!nextDate) throw new Error('Invalid cron expression or no future occurrences') + return nextDate + } + switch (scheduleType) { case 'minutes': { const interval = parseInt(getSubBlockValue(starterBlock, 'minutesInterval') || '15') - // If this is the first run and we have a starting time preference const startingAt = getSubBlockValue(starterBlock, 'minutesStartingAt') + + // If we have a specific starting time and this is the first run if (!schedule.lastRanAt && startingAt) { const [hours, minutes] = startingAt.split(':') const startTime = new Date() startTime.setHours(parseInt(hours), parseInt(minutes), 0, 0) - // If start time is in the past, add interval until it's in the future while (startTime <= new Date()) { startTime.setMinutes(startTime.getMinutes() + interval) } return startTime } - // For subsequent runs, just add the interval to the last run - const nextRun = new Date(schedule.lastRanAt || new Date()) - nextRun.setMinutes(nextRun.getMinutes() + interval) + + // For subsequent runs or if no starting time specified + const baseTime = schedule.lastRanAt ? new Date(schedule.lastRanAt) : new Date() + const currentMinutes = baseTime.getMinutes() + + // Find the next interval boundary after the base time + const nextIntervalBoundary = Math.ceil(currentMinutes / interval) * interval + const nextRun = new Date(baseTime) + + // Handle minute rollover properly + const minutesToAdd = nextIntervalBoundary - currentMinutes + nextRun.setMinutes(nextRun.getMinutes() + minutesToAdd, 0, 0) + + // If we're already past this time, add another interval + if (nextRun <= new Date()) { + nextRun.setMinutes(nextRun.getMinutes() + interval) + } + return nextRun } case 'hourly': { @@ -119,7 +142,7 @@ function calculateNextRunTime( const EnvVarsSchema = z.record(z.string()) export const config = { - runtime: 'edge', + runtime: 'nodejs', schedule: '*/1 * * * *', } @@ -129,177 +152,229 @@ const runningExecutions = new Set() // Add GET handler for cron job export async function GET(req: NextRequest) { const now = new Date() + console.log('Starting scheduled execution check at:', now.toISOString()) - // Query schedules due for execution - const dueSchedules = await db - .select() - .from(workflowSchedule) - .where(lte(workflowSchedule.nextRunAt, now)) - // Limit to 10 workflows per minute to prevent overload - .limit(10) + let dueSchedules: (typeof workflowSchedule.$inferSelect)[] = [] - for (const schedule of dueSchedules) { - try { - // Skip if this workflow is already running - if (runningExecutions.has(schedule.workflowId)) { - console.log(`Skipping workflow ${schedule.workflowId} - already running`) - continue - } + try { + // Query schedules due for execution + dueSchedules = await db + .select() + .from(workflowSchedule) + .where(lte(workflowSchedule.nextRunAt, now)) + // Limit to 10 workflows per minute to prevent overload + .limit(10) - runningExecutions.add(schedule.workflowId) + console.log('Found due schedules:', dueSchedules.length) - // Retrieve the workflow record - const [workflowRecord] = await db - .select() - .from(workflow) - .where(eq(workflow.id, schedule.workflowId)) - .limit(1) + for (const schedule of dueSchedules) { + console.log('Processing schedule:', { + id: schedule.id, + workflowId: schedule.workflowId, + nextRunAt: schedule.nextRunAt, + cronExpression: schedule.cronExpression, + }) - if (!workflowRecord) { - runningExecutions.delete(schedule.workflowId) - continue - } + try { + // Skip if this workflow is already running + if (runningExecutions.has(schedule.workflowId)) { + console.log(`Skipping workflow ${schedule.workflowId} - already running`) + continue + } - // The state in the database is exactly what we store in localStorage - const state = workflowRecord.state as WorkflowState - const { blocks, edges, loops } = state + runningExecutions.add(schedule.workflowId) - // Use the same execution flow as in use-workflow-execution.ts - const mergedStates = mergeSubblockState(blocks) + // Retrieve the workflow record + const [workflowRecord] = await db + .select() + .from(workflow) + .where(eq(workflow.id, schedule.workflowId)) + .limit(1) - // Retrieve environment variables for this user - const [userEnv] = await db - .select() - .from(environment) - .where(eq(environment.userId, workflowRecord.userId)) - .limit(1) + if (!workflowRecord) { + runningExecutions.delete(schedule.workflowId) + continue + } - if (!userEnv) { - throw new Error('No environment variables found for this user') - } + // The state in the database is exactly what we store in localStorage + const state = workflowRecord.state as WorkflowState + const { blocks, edges, loops } = state - // Parse and validate environment variables - const variables = EnvVarsSchema.parse(userEnv.variables) + // Use the same execution flow as in use-workflow-execution.ts + const mergedStates = mergeSubblockState(blocks) - // Replace environment variables in the block states - const currentBlockStates = await Object.entries(mergedStates).reduce( - async (accPromise, [id, block]) => { - const acc = await accPromise - acc[id] = await Object.entries(block.subBlocks).reduce( - async (subAccPromise, [key, subBlock]) => { - const subAcc = await subAccPromise - let value = subBlock.value + // Retrieve environment variables for this user + const [userEnv] = await db + .select() + .from(environment) + .where(eq(environment.userId, workflowRecord.userId)) + .limit(1) - // If the value is a string and contains environment variable syntax - if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) { - const matches = value.match(/{{([^}]+)}}/g) - if (matches) { - // Process all matches sequentially - for (const match of matches) { - const varName = match.slice(2, -2) // Remove {{ and }} - const encryptedValue = variables[varName] - if (!encryptedValue) { - throw new Error(`Environment variable "${varName}" was not found`) - } + if (!userEnv) { + throw new Error('No environment variables found for this user') + } - try { - const { decrypted } = await decryptSecret(encryptedValue) - value = (value as string).replace(match, decrypted) - } catch (error: any) { - console.error('Error decrypting value:', error) - throw new Error( - `Failed to decrypt environment variable "${varName}": ${error.message}` - ) + // Parse and validate environment variables + const variables = EnvVarsSchema.parse(userEnv.variables) + + // Replace environment variables in the block states + const currentBlockStates = await Object.entries(mergedStates).reduce( + async (accPromise, [id, block]) => { + const acc = await accPromise + acc[id] = await Object.entries(block.subBlocks).reduce( + async (subAccPromise, [key, subBlock]) => { + const subAcc = await subAccPromise + let value = subBlock.value + + // If the value is a string and contains environment variable syntax + if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) { + const matches = value.match(/{{([^}]+)}}/g) + if (matches) { + // Process all matches sequentially + for (const match of matches) { + const varName = match.slice(2, -2) // Remove {{ and }} + const encryptedValue = variables[varName] + if (!encryptedValue) { + throw new Error(`Environment variable "${varName}" was not found`) + } + + try { + const { decrypted } = await decryptSecret(encryptedValue) + value = (value as string).replace(match, decrypted) + } catch (error: any) { + console.error('Error decrypting value:', error) + throw new Error( + `Failed to decrypt environment variable "${varName}": ${error.message}` + ) + } } } } - } - subAcc[key] = value - return subAcc - }, - Promise.resolve({} as Record) - ) - return acc - }, - Promise.resolve({} as Record>) - ) + subAcc[key] = value + return subAcc + }, + Promise.resolve({} as Record) + ) + return acc + }, + Promise.resolve({} as Record>) + ) - // Create a map of decrypted environment variables - const decryptedEnvVars: Record = {} - for (const [key, encryptedValue] of Object.entries(variables)) { - try { - const { decrypted } = await decryptSecret(encryptedValue) - decryptedEnvVars[key] = decrypted - } catch (error: any) { - console.error(`Failed to decrypt ${key}:`, error) - throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`) + // Create a map of decrypted environment variables + const decryptedEnvVars: Record = {} + for (const [key, encryptedValue] of Object.entries(variables)) { + try { + const { decrypted } = await decryptSecret(encryptedValue) + decryptedEnvVars[key] = decrypted + } catch (error: any) { + console.error(`Failed to decrypt ${key}:`, error) + throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`) + } } - } - // Serialize and execute the workflow - const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops) - const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars) - const executionId = uuidv4() - const result = await executor.execute(schedule.workflowId) + // Serialize and execute the workflow + const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops) + const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars) + const executionId = uuidv4() + const result = await executor.execute(schedule.workflowId) - // Log each execution step - for (const log of result.logs || []) { + // Log each execution step + for (const log of result.logs || []) { + await persistLog({ + id: uuidv4(), + workflowId: schedule.workflowId, + executionId, + level: log.success ? 'info' : 'error', + message: `Block ${log.blockName || log.blockId} (${log.blockType}): ${ + log.error || `Completed successfully` + }`, + duration: log.success ? `${log.durationMs}ms` : 'NA', + createdAt: new Date(log.endedAt || log.startedAt), + }) + } + + // Calculate total duration from successful block logs + const totalDuration = (result.logs || []) + .filter((log) => log.success) + .reduce((sum, log) => sum + log.durationMs, 0) + + // Log the final execution result await persistLog({ id: uuidv4(), workflowId: schedule.workflowId, executionId, - level: log.success ? 'info' : 'error', - message: `Block ${log.blockName || log.blockId} (${log.blockType}): ${ - log.error || `Completed successfully` - }`, - duration: log.success ? `${log.durationMs}ms` : 'NA', - createdAt: new Date(log.endedAt || log.startedAt), + level: result.success ? 'info' : 'error', + message: result.success + ? 'Scheduled workflow executed successfully' + : `Scheduled workflow execution failed: ${result.error}`, + duration: result.success ? `${totalDuration}ms` : 'NA', + createdAt: new Date(), }) + + // Only update next_run_at if execution was successful + if (result.success) { + // Calculate the next run time based on the schedule configuration + const nextRunAt = calculateNextRunTime(schedule, blocks) + console.log('Calculated next run time:', nextRunAt.toISOString()) + + // Update the schedule with the next run time + await db + .update(workflowSchedule) + .set({ + lastRanAt: now, + updatedAt: now, + nextRunAt, + }) + .where(eq(workflowSchedule.id, schedule.id)) + + console.log('Updated schedule with new run time') + } else { + // If execution failed, increment next_run_at by a small delay to prevent immediate retries + const retryDelay = 1 * 60 * 1000 // 1 minute delay + const nextRetryAt = new Date(now.getTime() + retryDelay) + + await db + .update(workflowSchedule) + .set({ + updatedAt: now, + nextRunAt: nextRetryAt, + }) + .where(eq(workflowSchedule.id, schedule.id)) + + console.log('Execution failed, scheduled retry at:', nextRetryAt.toISOString()) + } + } catch (error: any) { + console.error('Error executing workflow:', error) + await persistLog({ + id: uuidv4(), + workflowId: schedule.workflowId, + executionId: uuidv4(), + level: 'error', + message: error.message || 'Unknown error during scheduled workflow execution', + createdAt: new Date(), + }) + + // On error, increment next_run_at by a small delay to prevent immediate retries + const retryDelay = 1 * 60 * 1000 // 1 minute delay + const nextRetryAt = new Date(now.getTime() + retryDelay) + + await db + .update(workflowSchedule) + .set({ + updatedAt: now, + nextRunAt: nextRetryAt, + }) + .where(eq(workflowSchedule.id, schedule.id)) + + console.log('Execution error, scheduled retry at:', nextRetryAt.toISOString()) + } finally { + runningExecutions.delete(schedule.workflowId) } - - // Calculate total duration from successful block logs - const totalDuration = (result.logs || []) - .filter((log) => log.success) - .reduce((sum, log) => sum + log.durationMs, 0) - - // Log the final execution result - await persistLog({ - id: uuidv4(), - workflowId: schedule.workflowId, - executionId, - level: result.success ? 'info' : 'error', - message: result.success - ? 'Scheduled workflow executed successfully' - : `Scheduled workflow execution failed: ${result.error}`, - duration: result.success ? `${totalDuration}ms` : 'NA', - createdAt: new Date(), - }) - - // Calculate the next run time based on the schedule configuration - const nextRunAt = calculateNextRunTime(schedule, blocks) - - // Update the schedule with the next run time - await db - .update(workflowSchedule) - .set({ - lastRanAt: now, - updatedAt: now, - nextRunAt, - }) - .where(eq(workflowSchedule.id, schedule.id)) - } catch (error: any) { - await persistLog({ - id: uuidv4(), - workflowId: schedule.workflowId, - executionId: uuidv4(), - level: 'error', - message: error.message || 'Unknown error during scheduled workflow execution', - createdAt: new Date(), - }) - } finally { - runningExecutions.delete(schedule.workflowId) } + } catch (error: any) { + console.error('Error in scheduled execution:', error) + return NextResponse.json({ error: error.message }, { status: 500 }) } return NextResponse.json({ diff --git a/app/api/scheduled/schedule/route.ts b/app/api/scheduled/schedule/route.ts index 85e76a486..72541d4cb 100644 --- a/app/api/scheduled/schedule/route.ts +++ b/app/api/scheduled/schedule/route.ts @@ -56,39 +56,72 @@ export async function POST(req: NextRequest) { // Get schedule configuration from starter block const scheduleType = getSubBlockValue(starterBlock, 'scheduleType') - // Calculate initial next run time based on schedule type + // Calculate cron expression based on schedule type let cronExpression: string | null = null - let nextRunAt = new Date() + let shouldUpdateNextRunAt = false + let nextRunAt: Date | undefined + + // First check if there's an existing schedule + const existingSchedule = await db + .select() + .from(workflowSchedule) + .where(eq(workflowSchedule.workflowId, workflowId)) + .limit(1) switch (scheduleType) { case 'minutes': { const interval = parseInt(getSubBlockValue(starterBlock, 'minutesInterval') || '15') - const startingAt = getSubBlockValue(starterBlock, 'minutesStartingAt') - if (startingAt) { - const [hours, minutes] = startingAt.split(':') - nextRunAt.setHours(parseInt(hours), parseInt(minutes), 0, 0) - while (nextRunAt <= new Date()) { - nextRunAt.setMinutes(nextRunAt.getMinutes() + interval) - } - } else { - nextRunAt.setMinutes(nextRunAt.getMinutes() + interval) - } cronExpression = `*/${interval} * * * *` + + // Check if we need to update next_run_at + if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) { + shouldUpdateNextRunAt = true + nextRunAt = new Date() + const startingAt = getSubBlockValue(starterBlock, 'minutesStartingAt') + + if (startingAt) { + const [hours, minutes] = startingAt.split(':') + nextRunAt.setHours(parseInt(hours), parseInt(minutes), 0, 0) + while (nextRunAt <= new Date()) { + nextRunAt.setMinutes(nextRunAt.getMinutes() + interval) + } + } else { + // Round down to nearest interval boundary + const now = new Date() + const currentMinutes = now.getMinutes() + const lastIntervalBoundary = Math.floor(currentMinutes / interval) * interval + nextRunAt = new Date(now) + nextRunAt.setMinutes(lastIntervalBoundary, 0, 0) + while (nextRunAt <= now) { + nextRunAt.setMinutes(nextRunAt.getMinutes() + interval) + } + } + } break } case 'hourly': { const minute = parseInt(getSubBlockValue(starterBlock, 'hourlyMinute') || '0') - nextRunAt.setHours(nextRunAt.getHours() + 1, minute, 0, 0) cronExpression = `${minute} * * * *` + + if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) { + shouldUpdateNextRunAt = true + nextRunAt = new Date() + nextRunAt.setHours(nextRunAt.getHours() + 1, minute, 0, 0) + } break } case 'daily': { const [hours, minutes] = getSubBlockValue(starterBlock, 'dailyTime').split(':') - nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) - if (nextRunAt <= new Date()) { - nextRunAt.setDate(nextRunAt.getDate() + 1) - } cronExpression = `${minutes || '0'} ${hours || '9'} * * *` + + if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) { + shouldUpdateNextRunAt = true + nextRunAt = new Date() + nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) + if (nextRunAt <= new Date()) { + nextRunAt.setDate(nextRunAt.getDate() + 1) + } + } break } case 'weekly': { @@ -103,22 +136,32 @@ export async function POST(req: NextRequest) { } const targetDay = dayMap[getSubBlockValue(starterBlock, 'weeklyDay') || 'MON'] const [hours, minutes] = getSubBlockValue(starterBlock, 'weeklyDayTime').split(':') - nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) - while (nextRunAt.getDay() !== targetDay || nextRunAt <= new Date()) { - nextRunAt.setDate(nextRunAt.getDate() + 1) - } cronExpression = `${minutes || '0'} ${hours || '9'} * * ${targetDay}` + + if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) { + shouldUpdateNextRunAt = true + nextRunAt = new Date() + nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) + while (nextRunAt.getDay() !== targetDay || nextRunAt <= new Date()) { + nextRunAt.setDate(nextRunAt.getDate() + 1) + } + } break } case 'monthly': { const day = parseInt(getSubBlockValue(starterBlock, 'monthlyDay') || '1') const [hours, minutes] = getSubBlockValue(starterBlock, 'monthlyTime').split(':') - nextRunAt.setDate(day) - nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) - if (nextRunAt <= new Date()) { - nextRunAt.setMonth(nextRunAt.getMonth() + 1) - } cronExpression = `${minutes || '0'} ${hours || '9'} ${day} * *` + + if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) { + shouldUpdateNextRunAt = true + nextRunAt = new Date() + nextRunAt.setDate(day) + nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) + if (nextRunAt <= new Date()) { + nextRunAt.setMonth(nextRunAt.getMonth() + 1) + } + } break } case 'custom': { @@ -129,39 +172,56 @@ export async function POST(req: NextRequest) { { status: 400 } ) } - // The execute-workflows endpoint will handle custom cron calculation - nextRunAt = new Date() - nextRunAt.setMinutes(nextRunAt.getMinutes() + 1) // Start in 1 minute + + if (!existingSchedule[0] || existingSchedule[0].cronExpression !== cronExpression) { + shouldUpdateNextRunAt = true + nextRunAt = new Date() + nextRunAt.setMinutes(nextRunAt.getMinutes() + 1) + } break } default: return NextResponse.json({ error: 'Invalid schedule type' }, { status: 400 }) } + // Prepare the values for upsert + const values: any = { + id: crypto.randomUUID(), + workflowId, + cronExpression, + triggerType: 'schedule', + createdAt: new Date(), + updatedAt: new Date(), + } + + // Only include next_run_at if it should be updated + if (shouldUpdateNextRunAt && nextRunAt) { + values.nextRunAt = nextRunAt + } + + // Prepare the set values for update + const setValues: any = { + cronExpression, + updatedAt: new Date(), + } + + // Only include next_run_at in the update if it should be updated + if (shouldUpdateNextRunAt && nextRunAt) { + setValues.nextRunAt = nextRunAt + } + // Upsert the schedule await db .insert(workflowSchedule) - .values({ - id: crypto.randomUUID(), - workflowId, - cronExpression, - nextRunAt, - triggerType: 'schedule', - createdAt: new Date(), - updatedAt: new Date(), - }) + .values(values) .onConflictDoUpdate({ target: [workflowSchedule.workflowId], - set: { - cronExpression, - nextRunAt, - updatedAt: new Date(), - }, + set: setValues, }) return NextResponse.json({ message: 'Schedule updated', - nextRunAt, + nextRunAt: shouldUpdateNextRunAt ? nextRunAt : existingSchedule[0]?.nextRunAt, cronExpression, }) } catch (error) {