From c8a8d9608a4f69cd0d64d7834de933770406a0c1 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 19 Feb 2025 11:56:18 -0800 Subject: [PATCH] feat(starter): add GET for vercel cron job to execute scheduled workflows --- app/api/scheduled/execute/route.ts | 104 +++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/app/api/scheduled/execute/route.ts b/app/api/scheduled/execute/route.ts index 7ca3195bd..5cc24071f 100644 --- a/app/api/scheduled/execute/route.ts +++ b/app/api/scheduled/execute/route.ts @@ -224,3 +224,107 @@ export async function POST(req: NextRequest) { executedCount: dueSchedules.length, }) } + +// Add GET handler for cron job +export async function GET(req: NextRequest) { + const now = new Date() + + // Query schedules due for execution + const dueSchedules = await db + .select() + .from(workflowSchedule) + .where(lte(workflowSchedule.nextRunAt, now)) + // Limit to 10 workflows per minute to prevent overload + .limit(10) + + for (const schedule of dueSchedules) { + try { + // Skip if this workflow is already running + if (runningExecutions.has(schedule.workflowId)) { + console.log(`Skipping workflow ${schedule.workflowId} - already running`) + continue + } + + runningExecutions.add(schedule.workflowId) + + // Retrieve the workflow record + const [workflowRecord] = await db + .select() + .from(workflow) + .where(eq(workflow.id, schedule.workflowId)) + .limit(1) + + if (!workflowRecord) { + runningExecutions.delete(schedule.workflowId) + continue + } + + // The state in the database is exactly what we store in localStorage + const state = workflowRecord.state as WorkflowState + const { blocks, edges, loops } = state + + // Use the same execution flow as in use-workflow-execution.ts + const mergedStates = mergeSubblockState(blocks) + const currentBlockStates = Object.entries(mergedStates).reduce( + (acc, [id, block]) => { + acc[id] = Object.entries(block.subBlocks).reduce( + (subAcc, [key, subBlock]) => { + subAcc[key] = subBlock.value + return subAcc + }, + {} as Record + ) + return acc + }, + {} as Record> + ) + + // Serialize and execute the workflow + const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops) + const executor = new Executor(serializedWorkflow, currentBlockStates, {}) + const executionId = uuidv4() + const result = await executor.execute(schedule.workflowId) + + // Log the execution result + await persistLog({ + id: uuidv4(), + workflowId: schedule.workflowId, + executionId, + level: result.success ? 'info' : 'error', + message: result.success + ? 'Scheduled workflow executed successfully' + : `Scheduled workflow execution failed: ${result.error}`, + createdAt: new Date(), + }) + + // Calculate the next run time based on the schedule configuration + const nextRunAt = calculateNextRunTime(schedule, blocks) + + // Update the schedule with the next run time + await db + .update(workflowSchedule) + .set({ + lastRanAt: now, + updatedAt: now, + nextRunAt, + }) + .where(eq(workflowSchedule.id, schedule.id)) + } catch (error: any) { + await persistLog({ + id: uuidv4(), + workflowId: schedule.workflowId, + executionId: uuidv4(), + level: 'error', + message: error.message || 'Unknown error during scheduled workflow execution', + createdAt: new Date(), + }) + } finally { + runningExecutions.delete(schedule.workflowId) + } + } + + return NextResponse.json({ + message: 'Scheduled workflow executions processed', + executedCount: dueSchedules.length, + }) +}