diff --git a/app/api/scheduled/execute/route.ts b/app/api/scheduled/execute/route.ts new file mode 100644 index 000000000..7ca3195bd --- /dev/null +++ b/app/api/scheduled/execute/route.ts @@ -0,0 +1,226 @@ +import { NextRequest, NextResponse } from 'next/server' +import { Cron } from 'croner' +import { eq, lte } from 'drizzle-orm' +import { v4 as uuidv4 } from 'uuid' +import { persistLog } from '@/lib/logging' +import { BlockState, WorkflowState } from '@/stores/workflow/types' +import { mergeSubblockState } from '@/stores/workflow/utils' +import { db } from '@/db' +import { workflow, workflowSchedule } from '@/db/schema' +import { Executor } from '@/executor' +import { Serializer } from '@/serializer' + +interface SubBlockValue { + value: string +} + +function getSubBlockValue(block: BlockState, id: string): string { + const subBlock = block.subBlocks[id] as SubBlockValue | undefined + return subBlock?.value || '' +} + +function calculateNextRunTime( + schedule: typeof workflowSchedule.$inferSelect, + blocks: Record +): Date { + // Find the starter block + const starterBlock = Object.values(blocks).find((block) => block.type === 'starter') + if (!starterBlock) throw new Error('No starter block found') + + const scheduleType = getSubBlockValue(starterBlock, 'scheduleType') + const timezone = getSubBlockValue(starterBlock, 'timezone') || 'UTC' + + switch (scheduleType) { + case 'minutes': { + const interval = parseInt(getSubBlockValue(starterBlock, 'minutesInterval') || '15') + // If this is the first run and we have a starting time preference + const startingAt = getSubBlockValue(starterBlock, 'minutesStartingAt') + if (!schedule.lastRanAt && startingAt) { + const [hours, minutes] = startingAt.split(':') + const startTime = new Date() + startTime.setHours(parseInt(hours), parseInt(minutes), 0, 0) + // If start time is in the past, add interval until it's in the future + while (startTime <= new Date()) { + startTime.setMinutes(startTime.getMinutes() + interval) + } + return startTime + } + // For subsequent runs, just add the interval to the last run + const nextRun = new Date(schedule.lastRanAt || new Date()) + nextRun.setMinutes(nextRun.getMinutes() + interval) + return nextRun + } + case 'hourly': { + const minute = parseInt(getSubBlockValue(starterBlock, 'hourlyMinute') || '0') + const nextRun = new Date() + nextRun.setHours(nextRun.getHours() + 1, minute, 0, 0) + return nextRun + } + case 'daily': { + const [hours, minutes] = getSubBlockValue(starterBlock, 'dailyTime').split(':') + const nextRun = new Date() + nextRun.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) + if (nextRun <= new Date()) { + nextRun.setDate(nextRun.getDate() + 1) + } + return nextRun + } + case 'weekly': { + const dayMap: Record = { + MON: 1, + TUE: 2, + WED: 3, + THU: 4, + FRI: 5, + SAT: 6, + SUN: 0, + } + const targetDay = dayMap[getSubBlockValue(starterBlock, 'weeklyDay') || 'MON'] + const [hours, minutes] = getSubBlockValue(starterBlock, 'weeklyDayTime').split(':') + const nextRun = new Date() + nextRun.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) + + while (nextRun.getDay() !== targetDay || nextRun <= new Date()) { + nextRun.setDate(nextRun.getDate() + 1) + } + return nextRun + } + case 'monthly': { + const day = parseInt(getSubBlockValue(starterBlock, 'monthlyDay') || '1') + const [hours, minutes] = getSubBlockValue(starterBlock, 'monthlyTime').split(':') + const nextRun = new Date() + nextRun.setDate(day) + nextRun.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) + if (nextRun <= new Date()) { + nextRun.setMonth(nextRun.getMonth() + 1) + } + return nextRun + } + case 'custom': { + const cronExpression = getSubBlockValue(starterBlock, 'cronExpression') + if (!cronExpression) throw new Error('No cron expression provided') + + // Create a new cron instance with the expression and timezone + const cron = new Cron(cronExpression, { timezone }) + + // Get the next occurrence after now + const nextDate = cron.nextRun() + if (!nextDate) throw new Error('Invalid cron expression or no future occurrences') + + return nextDate + } + default: + throw new Error(`Unsupported schedule type: ${scheduleType}`) + } +} + +export const config = { + runtime: 'edge', + schedule: '*/1 * * * *', +} + +// Keep track of running executions to prevent overlap +const runningExecutions = new Set() + +export async function POST(req: NextRequest) { + const now = new Date() + + // Query schedules due for execution + const dueSchedules = await db + .select() + .from(workflowSchedule) + .where(lte(workflowSchedule.nextRunAt, now)) + // Limit to 10 workflows per minute to prevent overload + .limit(10) + + for (const schedule of dueSchedules) { + try { + // Skip if this workflow is already running + if (runningExecutions.has(schedule.workflowId)) { + console.log(`Skipping workflow ${schedule.workflowId} - already running`) + continue + } + + runningExecutions.add(schedule.workflowId) + + // Retrieve the workflow record + const [workflowRecord] = await db + .select() + .from(workflow) + .where(eq(workflow.id, schedule.workflowId)) + .limit(1) + + if (!workflowRecord) { + runningExecutions.delete(schedule.workflowId) + continue + } + + // The state in the database is exactly what we store in localStorage + const state = workflowRecord.state as WorkflowState + const { blocks, edges, loops } = state + + // Use the same execution flow as in use-workflow-execution.ts + const mergedStates = mergeSubblockState(blocks) + const currentBlockStates = Object.entries(mergedStates).reduce( + (acc, [id, block]) => { + acc[id] = Object.entries(block.subBlocks).reduce( + (subAcc, [key, subBlock]) => { + subAcc[key] = subBlock.value + return subAcc + }, + {} as Record + ) + return acc + }, + {} as Record> + ) + + // Serialize and execute the workflow + const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops) + const executor = new Executor(serializedWorkflow, currentBlockStates, {}) + const executionId = uuidv4() + const result = await executor.execute(schedule.workflowId) + + // Log the execution result + await persistLog({ + id: uuidv4(), + workflowId: schedule.workflowId, + executionId, + level: result.success ? 'info' : 'error', + message: result.success + ? 'Scheduled workflow executed successfully' + : `Scheduled workflow execution failed: ${result.error}`, + createdAt: new Date(), + }) + + // Calculate the next run time based on the schedule configuration + const nextRunAt = calculateNextRunTime(schedule, blocks) + + // Update the schedule with the next run time + await db + .update(workflowSchedule) + .set({ + lastRanAt: now, + updatedAt: now, + nextRunAt, + }) + .where(eq(workflowSchedule.id, schedule.id)) + } catch (error: any) { + await persistLog({ + id: uuidv4(), + workflowId: schedule.workflowId, + executionId: uuidv4(), + level: 'error', + message: error.message || 'Unknown error during scheduled workflow execution', + createdAt: new Date(), + }) + } finally { + runningExecutions.delete(schedule.workflowId) + } + } + + return NextResponse.json({ + message: 'Scheduled workflow executions processed', + executedCount: dueSchedules.length, + }) +} diff --git a/app/api/scheduled/schedule/route.ts b/app/api/scheduled/schedule/route.ts new file mode 100644 index 000000000..412285481 --- /dev/null +++ b/app/api/scheduled/schedule/route.ts @@ -0,0 +1,180 @@ +import { NextRequest, NextResponse } from 'next/server' +import { eq } from 'drizzle-orm' +import { z } from 'zod' +import { getSession } from '@/lib/auth' +import { BlockState } from '@/stores/workflow/types' +import { db } from '@/db' +import { workflow, workflowSchedule } from '@/db/schema' + +interface SubBlockValue { + value: string +} + +function getSubBlockValue(block: BlockState, id: string): string { + const subBlock = block.subBlocks[id] as SubBlockValue | undefined + return subBlock?.value || '' +} + +// Schema for schedule request +const ScheduleRequestSchema = z.object({ + workflowId: z.string(), + state: z.object({ + blocks: z.record(z.any()), + edges: z.array(z.any()), + loops: z.record(z.any()), + }), +}) + +export async function POST(req: NextRequest) { + try { + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const body = await req.json() + const { workflowId, state } = ScheduleRequestSchema.parse(body) + + // Find the starter block to check if it's configured for scheduling + const starterBlock = Object.values(state.blocks).find( + (block: any) => block.type === 'starter' + ) as BlockState | undefined + + if (!starterBlock) { + return NextResponse.json({ error: 'No starter block found in workflow' }, { status: 400 }) + } + + const startWorkflow = getSubBlockValue(starterBlock, 'startWorkflow') + + // If the workflow is not scheduled, delete any existing schedule + if (startWorkflow !== 'schedule') { + await db.delete(workflowSchedule).where(eq(workflowSchedule.workflowId, workflowId)) + + return NextResponse.json({ message: 'Schedule removed' }) + } + + // Get schedule configuration from starter block + const scheduleType = getSubBlockValue(starterBlock, 'scheduleType') + const timezone = getSubBlockValue(starterBlock, 'timezone') || 'UTC' + + // Calculate initial next run time based on schedule type + let cronExpression: string | null = null + let nextRunAt = new Date() + + switch (scheduleType) { + case 'minutes': { + const interval = parseInt(getSubBlockValue(starterBlock, 'minutesInterval') || '15') + const startingAt = getSubBlockValue(starterBlock, 'minutesStartingAt') + if (startingAt) { + const [hours, minutes] = startingAt.split(':') + nextRunAt.setHours(parseInt(hours), parseInt(minutes), 0, 0) + while (nextRunAt <= new Date()) { + nextRunAt.setMinutes(nextRunAt.getMinutes() + interval) + } + } else { + nextRunAt.setMinutes(nextRunAt.getMinutes() + interval) + } + cronExpression = `*/${interval} * * * *` + break + } + case 'hourly': { + const minute = parseInt(getSubBlockValue(starterBlock, 'hourlyMinute') || '0') + nextRunAt.setHours(nextRunAt.getHours() + 1, minute, 0, 0) + cronExpression = `${minute} * * * *` + break + } + case 'daily': { + const [hours, minutes] = getSubBlockValue(starterBlock, 'dailyTime').split(':') + nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) + if (nextRunAt <= new Date()) { + nextRunAt.setDate(nextRunAt.getDate() + 1) + } + cronExpression = `${minutes || '0'} ${hours || '9'} * * *` + break + } + case 'weekly': { + const dayMap: Record = { + MON: 1, + TUE: 2, + WED: 3, + THU: 4, + FRI: 5, + SAT: 6, + SUN: 0, + } + const targetDay = dayMap[getSubBlockValue(starterBlock, 'weeklyDay') || 'MON'] + const [hours, minutes] = getSubBlockValue(starterBlock, 'weeklyDayTime').split(':') + nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) + while (nextRunAt.getDay() !== targetDay || nextRunAt <= new Date()) { + nextRunAt.setDate(nextRunAt.getDate() + 1) + } + cronExpression = `${minutes || '0'} ${hours || '9'} * * ${targetDay}` + break + } + case 'monthly': { + const day = parseInt(getSubBlockValue(starterBlock, 'monthlyDay') || '1') + const [hours, minutes] = getSubBlockValue(starterBlock, 'monthlyTime').split(':') + nextRunAt.setDate(day) + nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0) + if (nextRunAt <= new Date()) { + nextRunAt.setMonth(nextRunAt.getMonth() + 1) + } + cronExpression = `${minutes || '0'} ${hours || '9'} ${day} * *` + break + } + case 'custom': { + cronExpression = getSubBlockValue(starterBlock, 'cronExpression') + if (!cronExpression) { + return NextResponse.json( + { error: 'No cron expression provided for custom schedule' }, + { status: 400 } + ) + } + // The execute-workflows endpoint will handle custom cron calculation + nextRunAt = new Date() + nextRunAt.setMinutes(nextRunAt.getMinutes() + 1) // Start in 1 minute + break + } + default: + return NextResponse.json({ error: 'Invalid schedule type' }, { status: 400 }) + } + + // Upsert the schedule + await db + .insert(workflowSchedule) + .values({ + id: crypto.randomUUID(), + workflowId, + cronExpression, + nextRunAt, + timezone, + triggerType: 'schedule', + createdAt: new Date(), + updatedAt: new Date(), + }) + .onConflictDoUpdate({ + target: [workflowSchedule.workflowId], + set: { + cronExpression, + nextRunAt, + timezone, + updatedAt: new Date(), + }, + }) + + return NextResponse.json({ + message: 'Schedule updated', + nextRunAt, + cronExpression, + }) + } catch (error) { + console.error('Error updating workflow schedule:', error) + if (error instanceof z.ZodError) { + return NextResponse.json( + { error: 'Invalid request data', details: error.errors }, + { status: 400 } + ) + } + return NextResponse.json({ error: 'Failed to update workflow schedule' }, { status: 500 }) + } +} diff --git a/lib/logging.ts b/lib/logging.ts new file mode 100644 index 000000000..906621cc5 --- /dev/null +++ b/lib/logging.ts @@ -0,0 +1,15 @@ +import { db } from '@/db' +import { consoleLog } from '@/db/schema' + +export interface LogEntry { + id: string + workflowId: string + executionId: string + level: string + message: string + createdAt: Date +} + +export async function persistLog(log: LogEntry) { + await db.insert(consoleLog).values(log) +} diff --git a/package-lock.json b/package-lock.json index 0e1a21d1a..714d2bb88 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26,6 +26,8 @@ "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "cmdk": "^1.0.0", + "cron-parser": "^5.0.2", + "croner": "^9.0.0", "date-fns": "^4.1.0", "drizzle-orm": "^0.39.3", "lodash.debounce": "^4.0.8", @@ -41,6 +43,7 @@ "resend": "^4.1.2", "tailwind-merge": "^2.6.0", "tailwindcss-animate": "^1.0.7", + "uuid": "^11.0.5", "zod": "^3.24.1" }, "devDependencies": { @@ -5975,6 +5978,27 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/cron-parser": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-5.0.2.tgz", + "integrity": "sha512-RXXr5WuvLInay/DstwRD087/DpLOm24Y3mrkRmOdrMjbdGfQyPiMsx1Ad+SaJF2zN0tN78ckBjcEkIHVd+MX7Q==", + "license": "MIT", + "dependencies": { + "luxon": "^3.5.0" + }, + "engines": { + "node": ">=18" + } + }, + "node_modules/croner": { + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/croner/-/croner-9.0.0.tgz", + "integrity": "sha512-onMB0OkDjkXunhdW9htFjEhqrD54+M94i6ackoUkjHKbRnXdyEyKRelp4nJ1kAz32+s27jP1FsebpJCVl0BsvA==", + "license": "MIT", + "engines": { + "node": ">=18.0" + } + }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", @@ -8682,6 +8706,15 @@ "react": "^16.5.1 || ^17.0.0 || ^18.0.0 || ^19.0.0" } }, + "node_modules/luxon": { + "version": "3.5.0", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.5.0.tgz", + "integrity": "sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==", + "license": "MIT", + "engines": { + "node": ">=12" + } + }, "node_modules/make-dir": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/make-dir/-/make-dir-4.0.0.tgz", @@ -10928,6 +10961,19 @@ "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", "license": "MIT" }, + "node_modules/uuid": { + "version": "11.0.5", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-11.0.5.tgz", + "integrity": "sha512-508e6IcKLrhxKdBbcA2b4KQZlLVp2+J5UwQ6F7Drckkc5N9ZJwFa4TgWtsww9UG8fGHbm6gbV19TdM5pQ4GaIA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "license": "MIT", + "bin": { + "uuid": "dist/esm/bin/uuid" + } + }, "node_modules/v8-to-istanbul": { "version": "9.3.0", "resolved": "https://registry.npmjs.org/v8-to-istanbul/-/v8-to-istanbul-9.3.0.tgz", diff --git a/package.json b/package.json index 4b3f5ef1a..b2268ed00 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,8 @@ "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "cmdk": "^1.0.0", + "cron-parser": "^5.0.2", + "croner": "^9.0.0", "date-fns": "^4.1.0", "drizzle-orm": "^0.39.3", "lodash.debounce": "^4.0.8", @@ -49,6 +51,7 @@ "resend": "^4.1.2", "tailwind-merge": "^2.6.0", "tailwindcss-animate": "^1.0.7", + "uuid": "^11.0.5", "zod": "^3.24.1" }, "devDependencies": { diff --git a/stores/sync-manager.ts b/stores/sync-manager.ts index 813516dfd..75908ae21 100644 --- a/stores/sync-manager.ts +++ b/stores/sync-manager.ts @@ -19,6 +19,7 @@ interface WorkflowSyncPayload { const SYNC_INTERVAL_MS = 30000 const API_ENDPOINTS = { SYNC: '/api/db/sync', + SCHEDULE: '/api/scheduled/schedule', LOGIN: '/login', } as const @@ -55,9 +56,19 @@ async function prepareSyncPayload( } } +// Check if workflow has scheduling enabled +function hasSchedulingEnabled(state: WorkflowSyncPayload['state']): boolean { + const starterBlock = Object.values(state.blocks).find((block) => block.type === 'starter') + if (!starterBlock) return false + + const startWorkflow = starterBlock.subBlocks.startWorkflow?.value + return startWorkflow === 'schedule' +} + // Server sync logic async function syncWorkflowsToServer(payloads: WorkflowSyncPayload[]): Promise { try { + // First sync workflows to the database const response = await fetch(API_ENDPOINTS.SYNC, { method: 'POST', headers: { 'Content-Type': 'application/json' }, @@ -76,6 +87,39 @@ async function syncWorkflowsToServer(payloads: WorkflowSyncPayload[]): Promise { + // Update schedule if workflow has scheduling enabled + if (hasSchedulingEnabled(payload.state)) { + const response = await fetch(API_ENDPOINTS.SCHEDULE, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + workflowId: payload.id, + state: payload.state, + }), + }) + + if (!response.ok) { + throw new Error( + `Failed to update schedule for workflow ${payload.id}: ${response.statusText}` + ) + } + + const result = await response.json() + console.log(`Schedule updated for workflow ${payload.id}:`, result) + } + }) + ) + + // Log any schedule sync failures but don't fail the overall sync + scheduleResults.forEach((result, index) => { + if (result.status === 'rejected') { + console.error(`Failed to sync schedule for workflow ${payloads[index].id}:`, result.reason) + } + }) + deletedWorkflowIds.clear() console.log('Workflows synced successfully') return true diff --git a/vercel.json b/vercel.json new file mode 100644 index 000000000..588158865 --- /dev/null +++ b/vercel.json @@ -0,0 +1,8 @@ +{ + "crons": [ + { + "path": "/api/scheduled/execute", + "schedule": "*/1 * * * *" + } + ] +}