Merge pull request #71 from simstudioai/feature/starter

feat(starter): add routes for scheduled workflows
This commit is contained in:
waleedlatif1
2025-02-19 01:10:39 -08:00
committed by GitHub
7 changed files with 522 additions and 0 deletions

View File

@@ -0,0 +1,226 @@
import { NextRequest, NextResponse } from 'next/server'
import { Cron } from 'croner'
import { eq, lte } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { persistLog } from '@/lib/logging'
import { BlockState, WorkflowState } from '@/stores/workflow/types'
import { mergeSubblockState } from '@/stores/workflow/utils'
import { db } from '@/db'
import { workflow, workflowSchedule } from '@/db/schema'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
interface SubBlockValue {
value: string
}
function getSubBlockValue(block: BlockState, id: string): string {
const subBlock = block.subBlocks[id] as SubBlockValue | undefined
return subBlock?.value || ''
}
function calculateNextRunTime(
schedule: typeof workflowSchedule.$inferSelect,
blocks: Record<string, BlockState>
): Date {
// Find the starter block
const starterBlock = Object.values(blocks).find((block) => block.type === 'starter')
if (!starterBlock) throw new Error('No starter block found')
const scheduleType = getSubBlockValue(starterBlock, 'scheduleType')
const timezone = getSubBlockValue(starterBlock, 'timezone') || 'UTC'
switch (scheduleType) {
case 'minutes': {
const interval = parseInt(getSubBlockValue(starterBlock, 'minutesInterval') || '15')
// If this is the first run and we have a starting time preference
const startingAt = getSubBlockValue(starterBlock, 'minutesStartingAt')
if (!schedule.lastRanAt && startingAt) {
const [hours, minutes] = startingAt.split(':')
const startTime = new Date()
startTime.setHours(parseInt(hours), parseInt(minutes), 0, 0)
// If start time is in the past, add interval until it's in the future
while (startTime <= new Date()) {
startTime.setMinutes(startTime.getMinutes() + interval)
}
return startTime
}
// For subsequent runs, just add the interval to the last run
const nextRun = new Date(schedule.lastRanAt || new Date())
nextRun.setMinutes(nextRun.getMinutes() + interval)
return nextRun
}
case 'hourly': {
const minute = parseInt(getSubBlockValue(starterBlock, 'hourlyMinute') || '0')
const nextRun = new Date()
nextRun.setHours(nextRun.getHours() + 1, minute, 0, 0)
return nextRun
}
case 'daily': {
const [hours, minutes] = getSubBlockValue(starterBlock, 'dailyTime').split(':')
const nextRun = new Date()
nextRun.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
if (nextRun <= new Date()) {
nextRun.setDate(nextRun.getDate() + 1)
}
return nextRun
}
case 'weekly': {
const dayMap: Record<string, number> = {
MON: 1,
TUE: 2,
WED: 3,
THU: 4,
FRI: 5,
SAT: 6,
SUN: 0,
}
const targetDay = dayMap[getSubBlockValue(starterBlock, 'weeklyDay') || 'MON']
const [hours, minutes] = getSubBlockValue(starterBlock, 'weeklyDayTime').split(':')
const nextRun = new Date()
nextRun.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
while (nextRun.getDay() !== targetDay || nextRun <= new Date()) {
nextRun.setDate(nextRun.getDate() + 1)
}
return nextRun
}
case 'monthly': {
const day = parseInt(getSubBlockValue(starterBlock, 'monthlyDay') || '1')
const [hours, minutes] = getSubBlockValue(starterBlock, 'monthlyTime').split(':')
const nextRun = new Date()
nextRun.setDate(day)
nextRun.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
if (nextRun <= new Date()) {
nextRun.setMonth(nextRun.getMonth() + 1)
}
return nextRun
}
case 'custom': {
const cronExpression = getSubBlockValue(starterBlock, 'cronExpression')
if (!cronExpression) throw new Error('No cron expression provided')
// Create a new cron instance with the expression and timezone
const cron = new Cron(cronExpression, { timezone })
// Get the next occurrence after now
const nextDate = cron.nextRun()
if (!nextDate) throw new Error('Invalid cron expression or no future occurrences')
return nextDate
}
default:
throw new Error(`Unsupported schedule type: ${scheduleType}`)
}
}
export const config = {
runtime: 'edge',
schedule: '*/1 * * * *',
}
// Keep track of running executions to prevent overlap
const runningExecutions = new Set<string>()
export async function POST(req: NextRequest) {
const now = new Date()
// Query schedules due for execution
const dueSchedules = await db
.select()
.from(workflowSchedule)
.where(lte(workflowSchedule.nextRunAt, now))
// Limit to 10 workflows per minute to prevent overload
.limit(10)
for (const schedule of dueSchedules) {
try {
// Skip if this workflow is already running
if (runningExecutions.has(schedule.workflowId)) {
console.log(`Skipping workflow ${schedule.workflowId} - already running`)
continue
}
runningExecutions.add(schedule.workflowId)
// Retrieve the workflow record
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
if (!workflowRecord) {
runningExecutions.delete(schedule.workflowId)
continue
}
// The state in the database is exactly what we store in localStorage
const state = workflowRecord.state as WorkflowState
const { blocks, edges, loops } = state
// Use the same execution flow as in use-workflow-execution.ts
const mergedStates = mergeSubblockState(blocks)
const currentBlockStates = Object.entries(mergedStates).reduce(
(acc, [id, block]) => {
acc[id] = Object.entries(block.subBlocks).reduce(
(subAcc, [key, subBlock]) => {
subAcc[key] = subBlock.value
return subAcc
},
{} as Record<string, any>
)
return acc
},
{} as Record<string, Record<string, any>>
)
// Serialize and execute the workflow
const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops)
const executor = new Executor(serializedWorkflow, currentBlockStates, {})
const executionId = uuidv4()
const result = await executor.execute(schedule.workflowId)
// Log the execution result
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId,
level: result.success ? 'info' : 'error',
message: result.success
? 'Scheduled workflow executed successfully'
: `Scheduled workflow execution failed: ${result.error}`,
createdAt: new Date(),
})
// Calculate the next run time based on the schedule configuration
const nextRunAt = calculateNextRunTime(schedule, blocks)
// Update the schedule with the next run time
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
})
.where(eq(workflowSchedule.id, schedule.id))
} catch (error: any) {
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId: uuidv4(),
level: 'error',
message: error.message || 'Unknown error during scheduled workflow execution',
createdAt: new Date(),
})
} finally {
runningExecutions.delete(schedule.workflowId)
}
}
return NextResponse.json({
message: 'Scheduled workflow executions processed',
executedCount: dueSchedules.length,
})
}

View File

@@ -0,0 +1,180 @@
import { NextRequest, NextResponse } from 'next/server'
import { eq } from 'drizzle-orm'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { BlockState } from '@/stores/workflow/types'
import { db } from '@/db'
import { workflow, workflowSchedule } from '@/db/schema'
interface SubBlockValue {
value: string
}
function getSubBlockValue(block: BlockState, id: string): string {
const subBlock = block.subBlocks[id] as SubBlockValue | undefined
return subBlock?.value || ''
}
// Schema for schedule request
const ScheduleRequestSchema = z.object({
workflowId: z.string(),
state: z.object({
blocks: z.record(z.any()),
edges: z.array(z.any()),
loops: z.record(z.any()),
}),
})
export async function POST(req: NextRequest) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const body = await req.json()
const { workflowId, state } = ScheduleRequestSchema.parse(body)
// Find the starter block to check if it's configured for scheduling
const starterBlock = Object.values(state.blocks).find(
(block: any) => block.type === 'starter'
) as BlockState | undefined
if (!starterBlock) {
return NextResponse.json({ error: 'No starter block found in workflow' }, { status: 400 })
}
const startWorkflow = getSubBlockValue(starterBlock, 'startWorkflow')
// If the workflow is not scheduled, delete any existing schedule
if (startWorkflow !== 'schedule') {
await db.delete(workflowSchedule).where(eq(workflowSchedule.workflowId, workflowId))
return NextResponse.json({ message: 'Schedule removed' })
}
// Get schedule configuration from starter block
const scheduleType = getSubBlockValue(starterBlock, 'scheduleType')
const timezone = getSubBlockValue(starterBlock, 'timezone') || 'UTC'
// Calculate initial next run time based on schedule type
let cronExpression: string | null = null
let nextRunAt = new Date()
switch (scheduleType) {
case 'minutes': {
const interval = parseInt(getSubBlockValue(starterBlock, 'minutesInterval') || '15')
const startingAt = getSubBlockValue(starterBlock, 'minutesStartingAt')
if (startingAt) {
const [hours, minutes] = startingAt.split(':')
nextRunAt.setHours(parseInt(hours), parseInt(minutes), 0, 0)
while (nextRunAt <= new Date()) {
nextRunAt.setMinutes(nextRunAt.getMinutes() + interval)
}
} else {
nextRunAt.setMinutes(nextRunAt.getMinutes() + interval)
}
cronExpression = `*/${interval} * * * *`
break
}
case 'hourly': {
const minute = parseInt(getSubBlockValue(starterBlock, 'hourlyMinute') || '0')
nextRunAt.setHours(nextRunAt.getHours() + 1, minute, 0, 0)
cronExpression = `${minute} * * * *`
break
}
case 'daily': {
const [hours, minutes] = getSubBlockValue(starterBlock, 'dailyTime').split(':')
nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
if (nextRunAt <= new Date()) {
nextRunAt.setDate(nextRunAt.getDate() + 1)
}
cronExpression = `${minutes || '0'} ${hours || '9'} * * *`
break
}
case 'weekly': {
const dayMap: Record<string, number> = {
MON: 1,
TUE: 2,
WED: 3,
THU: 4,
FRI: 5,
SAT: 6,
SUN: 0,
}
const targetDay = dayMap[getSubBlockValue(starterBlock, 'weeklyDay') || 'MON']
const [hours, minutes] = getSubBlockValue(starterBlock, 'weeklyDayTime').split(':')
nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
while (nextRunAt.getDay() !== targetDay || nextRunAt <= new Date()) {
nextRunAt.setDate(nextRunAt.getDate() + 1)
}
cronExpression = `${minutes || '0'} ${hours || '9'} * * ${targetDay}`
break
}
case 'monthly': {
const day = parseInt(getSubBlockValue(starterBlock, 'monthlyDay') || '1')
const [hours, minutes] = getSubBlockValue(starterBlock, 'monthlyTime').split(':')
nextRunAt.setDate(day)
nextRunAt.setHours(parseInt(hours || '9'), parseInt(minutes || '0'), 0, 0)
if (nextRunAt <= new Date()) {
nextRunAt.setMonth(nextRunAt.getMonth() + 1)
}
cronExpression = `${minutes || '0'} ${hours || '9'} ${day} * *`
break
}
case 'custom': {
cronExpression = getSubBlockValue(starterBlock, 'cronExpression')
if (!cronExpression) {
return NextResponse.json(
{ error: 'No cron expression provided for custom schedule' },
{ status: 400 }
)
}
// The execute-workflows endpoint will handle custom cron calculation
nextRunAt = new Date()
nextRunAt.setMinutes(nextRunAt.getMinutes() + 1) // Start in 1 minute
break
}
default:
return NextResponse.json({ error: 'Invalid schedule type' }, { status: 400 })
}
// Upsert the schedule
await db
.insert(workflowSchedule)
.values({
id: crypto.randomUUID(),
workflowId,
cronExpression,
nextRunAt,
timezone,
triggerType: 'schedule',
createdAt: new Date(),
updatedAt: new Date(),
})
.onConflictDoUpdate({
target: [workflowSchedule.workflowId],
set: {
cronExpression,
nextRunAt,
timezone,
updatedAt: new Date(),
},
})
return NextResponse.json({
message: 'Schedule updated',
nextRunAt,
cronExpression,
})
} catch (error) {
console.error('Error updating workflow schedule:', error)
if (error instanceof z.ZodError) {
return NextResponse.json(
{ error: 'Invalid request data', details: error.errors },
{ status: 400 }
)
}
return NextResponse.json({ error: 'Failed to update workflow schedule' }, { status: 500 })
}
}

15
lib/logging.ts Normal file
View File

@@ -0,0 +1,15 @@
import { db } from '@/db'
import { consoleLog } from '@/db/schema'
export interface LogEntry {
id: string
workflowId: string
executionId: string
level: string
message: string
createdAt: Date
}
export async function persistLog(log: LogEntry) {
await db.insert(consoleLog).values(log)
}

46
package-lock.json generated
View File

@@ -26,6 +26,8 @@
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "^1.0.0",
"cron-parser": "^5.0.2",
"croner": "^9.0.0",
"date-fns": "^4.1.0",
"drizzle-orm": "^0.39.3",
"lodash.debounce": "^4.0.8",
@@ -41,6 +43,7 @@
"resend": "^4.1.2",
"tailwind-merge": "^2.6.0",
"tailwindcss-animate": "^1.0.7",
"uuid": "^11.0.5",
"zod": "^3.24.1"
},
"devDependencies": {
@@ -5975,6 +5978,27 @@
"node": "^14.15.0 || ^16.10.0 || >=18.0.0"
}
},
"node_modules/cron-parser": {
"version": "5.0.2",
"resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-5.0.2.tgz",
"integrity": "sha512-RXXr5WuvLInay/DstwRD087/DpLOm24Y3mrkRmOdrMjbdGfQyPiMsx1Ad+SaJF2zN0tN78ckBjcEkIHVd+MX7Q==",
"license": "MIT",
"dependencies": {
"luxon": "^3.5.0"
},
"engines": {
"node": ">=18"
}
},
"node_modules/croner": {
"version": "9.0.0",
"resolved": "https://registry.npmjs.org/croner/-/croner-9.0.0.tgz",
"integrity": "sha512-onMB0OkDjkXunhdW9htFjEhqrD54+M94i6ackoUkjHKbRnXdyEyKRelp4nJ1kAz32+s27jP1FsebpJCVl0BsvA==",
"license": "MIT",
"engines": {
"node": ">=18.0"
}
},
"node_modules/cross-spawn": {
"version": "7.0.6",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz",
@@ -8682,6 +8706,15 @@
"react": "^16.5.1 || ^17.0.0 || ^18.0.0 || ^19.0.0"
}
},
"node_modules/luxon": {
"version": "3.5.0",
"resolved": "https://registry.npmjs.org/luxon/-/luxon-3.5.0.tgz",
"integrity": "sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==",
"license": "MIT",
"engines": {
"node": ">=12"
}
},
"node_modules/make-dir": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/make-dir/-/make-dir-4.0.0.tgz",
@@ -10928,6 +10961,19 @@
"integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==",
"license": "MIT"
},
"node_modules/uuid": {
"version": "11.0.5",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-11.0.5.tgz",
"integrity": "sha512-508e6IcKLrhxKdBbcA2b4KQZlLVp2+J5UwQ6F7Drckkc5N9ZJwFa4TgWtsww9UG8fGHbm6gbV19TdM5pQ4GaIA==",
"funding": [
"https://github.com/sponsors/broofa",
"https://github.com/sponsors/ctavan"
],
"license": "MIT",
"bin": {
"uuid": "dist/esm/bin/uuid"
}
},
"node_modules/v8-to-istanbul": {
"version": "9.3.0",
"resolved": "https://registry.npmjs.org/v8-to-istanbul/-/v8-to-istanbul-9.3.0.tgz",

View File

@@ -34,6 +34,8 @@
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "^1.0.0",
"cron-parser": "^5.0.2",
"croner": "^9.0.0",
"date-fns": "^4.1.0",
"drizzle-orm": "^0.39.3",
"lodash.debounce": "^4.0.8",
@@ -49,6 +51,7 @@
"resend": "^4.1.2",
"tailwind-merge": "^2.6.0",
"tailwindcss-animate": "^1.0.7",
"uuid": "^11.0.5",
"zod": "^3.24.1"
},
"devDependencies": {

View File

@@ -19,6 +19,7 @@ interface WorkflowSyncPayload {
const SYNC_INTERVAL_MS = 30000
const API_ENDPOINTS = {
SYNC: '/api/db/sync',
SCHEDULE: '/api/scheduled/schedule',
LOGIN: '/login',
} as const
@@ -55,9 +56,19 @@ async function prepareSyncPayload(
}
}
// Check if workflow has scheduling enabled
function hasSchedulingEnabled(state: WorkflowSyncPayload['state']): boolean {
const starterBlock = Object.values(state.blocks).find((block) => block.type === 'starter')
if (!starterBlock) return false
const startWorkflow = starterBlock.subBlocks.startWorkflow?.value
return startWorkflow === 'schedule'
}
// Server sync logic
async function syncWorkflowsToServer(payloads: WorkflowSyncPayload[]): Promise<boolean> {
try {
// First sync workflows to the database
const response = await fetch(API_ENDPOINTS.SYNC, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
@@ -76,6 +87,39 @@ async function syncWorkflowsToServer(payloads: WorkflowSyncPayload[]): Promise<b
throw new Error(`Batch sync failed: ${response.statusText}`)
}
// Then update schedules for workflows that have scheduling enabled
const scheduleResults = await Promise.allSettled(
payloads.map(async (payload) => {
// Update schedule if workflow has scheduling enabled
if (hasSchedulingEnabled(payload.state)) {
const response = await fetch(API_ENDPOINTS.SCHEDULE, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
workflowId: payload.id,
state: payload.state,
}),
})
if (!response.ok) {
throw new Error(
`Failed to update schedule for workflow ${payload.id}: ${response.statusText}`
)
}
const result = await response.json()
console.log(`Schedule updated for workflow ${payload.id}:`, result)
}
})
)
// Log any schedule sync failures but don't fail the overall sync
scheduleResults.forEach((result, index) => {
if (result.status === 'rejected') {
console.error(`Failed to sync schedule for workflow ${payloads[index].id}:`, result.reason)
}
})
deletedWorkflowIds.clear()
console.log('Workflows synced successfully')
return true

8
vercel.json Normal file
View File

@@ -0,0 +1,8 @@
{
"crons": [
{
"path": "/api/scheduled/execute",
"schedule": "*/1 * * * *"
}
]
}