feat(starter): add GET for vercel cron job to execute scheduled workflows

This commit is contained in:
Waleed Latif
2025-02-19 11:56:18 -08:00
parent 9de0d6fb95
commit c8a8d9608a

View File

@@ -224,3 +224,107 @@ export async function POST(req: NextRequest) {
executedCount: dueSchedules.length,
})
}
// Add GET handler for cron job
export async function GET(req: NextRequest) {
const now = new Date()
// Query schedules due for execution
const dueSchedules = await db
.select()
.from(workflowSchedule)
.where(lte(workflowSchedule.nextRunAt, now))
// Limit to 10 workflows per minute to prevent overload
.limit(10)
for (const schedule of dueSchedules) {
try {
// Skip if this workflow is already running
if (runningExecutions.has(schedule.workflowId)) {
console.log(`Skipping workflow ${schedule.workflowId} - already running`)
continue
}
runningExecutions.add(schedule.workflowId)
// Retrieve the workflow record
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
if (!workflowRecord) {
runningExecutions.delete(schedule.workflowId)
continue
}
// The state in the database is exactly what we store in localStorage
const state = workflowRecord.state as WorkflowState
const { blocks, edges, loops } = state
// Use the same execution flow as in use-workflow-execution.ts
const mergedStates = mergeSubblockState(blocks)
const currentBlockStates = Object.entries(mergedStates).reduce(
(acc, [id, block]) => {
acc[id] = Object.entries(block.subBlocks).reduce(
(subAcc, [key, subBlock]) => {
subAcc[key] = subBlock.value
return subAcc
},
{} as Record<string, any>
)
return acc
},
{} as Record<string, Record<string, any>>
)
// Serialize and execute the workflow
const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops)
const executor = new Executor(serializedWorkflow, currentBlockStates, {})
const executionId = uuidv4()
const result = await executor.execute(schedule.workflowId)
// Log the execution result
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId,
level: result.success ? 'info' : 'error',
message: result.success
? 'Scheduled workflow executed successfully'
: `Scheduled workflow execution failed: ${result.error}`,
createdAt: new Date(),
})
// Calculate the next run time based on the schedule configuration
const nextRunAt = calculateNextRunTime(schedule, blocks)
// Update the schedule with the next run time
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
})
.where(eq(workflowSchedule.id, schedule.id))
} catch (error: any) {
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId: uuidv4(),
level: 'error',
message: error.message || 'Unknown error during scheduled workflow execution',
createdAt: new Date(),
})
} finally {
runningExecutions.delete(schedule.workflowId)
}
}
return NextResponse.json({
message: 'Scheduled workflow executions processed',
executedCount: dueSchedules.length,
})
}