fix(schedules): migrate to trigger dev (#1618)

* fix(schedules): add cron job auth like other cron routes

* migrate schedules to trigger dev

* remove executions check

* fix tests
This commit is contained in:
Vikhyath Mondreti
2025-10-13 13:12:55 -07:00
committed by GitHub
parent 09cccd5487
commit 3dd36a8a35
4 changed files with 868 additions and 851 deletions

View File

@@ -3,81 +3,50 @@
*
* @vitest-environment node
*/
import type { NextRequest } from 'next/server'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import {
mockExecutionDependencies,
mockScheduleExecuteDb,
sampleWorkflowState,
} from '@/app/api/__test-utils__/utils'
function createMockRequest(): NextRequest {
const mockHeaders = new Map([
['authorization', 'Bearer test-cron-secret'],
['content-type', 'application/json'],
])
return {
headers: {
get: (key: string) => mockHeaders.get(key.toLowerCase()) || null,
},
url: 'http://localhost:3000/api/schedules/execute',
} as NextRequest
}
describe('Scheduled Workflow Execution API Route', () => {
beforeEach(() => {
vi.clearAllMocks()
vi.resetModules()
})
mockExecutionDependencies()
afterEach(() => {
vi.clearAllMocks()
vi.resetModules()
})
// Mock all dependencies
vi.doMock('@/services/queue', () => ({
RateLimiter: vi.fn().mockImplementation(() => ({
checkRateLimitWithSubscription: vi.fn().mockResolvedValue({
allowed: true,
remaining: 100,
resetAt: new Date(Date.now() + 60000),
}),
})),
it('should execute scheduled workflows with Trigger.dev disabled', async () => {
const mockExecuteScheduleJob = vi.fn().mockResolvedValue(undefined)
vi.doMock('@/lib/auth/internal', () => ({
verifyCronAuth: vi.fn().mockReturnValue(null),
}))
vi.doMock('@/lib/billing', () => ({
checkServerSideUsageLimits: vi.fn().mockResolvedValue({ isExceeded: false }),
vi.doMock('@/background/schedule-execution', () => ({
executeScheduleJob: mockExecuteScheduleJob,
}))
vi.doMock('@/lib/billing/core/subscription', () => ({
getHighestPrioritySubscription: vi.fn().mockResolvedValue({
plan: 'pro',
status: 'active',
}),
}))
vi.doMock('@/lib/environment/utils', () => ({
getPersonalAndWorkspaceEnv: vi.fn().mockResolvedValue({
personalEncrypted: {},
workspaceEncrypted: {},
}),
}))
vi.doMock('@/lib/logs/execution/logging-session', () => ({
LoggingSession: vi.fn().mockImplementation(() => ({
safeStart: vi.fn().mockResolvedValue(undefined),
safeComplete: vi.fn().mockResolvedValue(undefined),
safeCompleteWithError: vi.fn().mockResolvedValue(undefined),
setupExecutor: vi.fn(),
})),
}))
vi.doMock('@/lib/workflows/db-helpers', () => ({
loadDeployedWorkflowState: vi.fn().mockResolvedValue({
blocks: sampleWorkflowState.blocks,
edges: sampleWorkflowState.edges || [],
loops: sampleWorkflowState.loops || {},
parallels: sampleWorkflowState.parallels || {},
}),
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
blocks: sampleWorkflowState.blocks,
edges: sampleWorkflowState.edges || [],
loops: sampleWorkflowState.loops || {},
parallels: {},
isFromNormalizedTables: true,
}),
}))
vi.doMock('@/stores/workflows/server-utils', () => ({
mergeSubblockState: vi.fn().mockReturnValue(sampleWorkflowState.blocks),
}))
vi.doMock('@/lib/schedules/utils', () => ({
calculateNextRunTime: vi.fn().mockReturnValue(new Date(Date.now() + 60000)),
getScheduleTimeValues: vi.fn().mockReturnValue({}),
getSubBlockValue: vi.fn().mockReturnValue('manual'),
vi.doMock('@/lib/env', () => ({
env: {
TRIGGER_DEV_ENABLED: false,
},
isTruthy: vi.fn(() => false),
}))
vi.doMock('drizzle-orm', () => ({
@@ -85,198 +54,209 @@ describe('Scheduled Workflow Execution API Route', () => {
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
sql: vi.fn((strings, ...values) => ({ strings, values, type: 'sql' })),
}))
vi.doMock('croner', () => ({
Cron: vi.fn().mockImplementation(() => ({
nextRun: vi.fn().mockReturnValue(new Date(Date.now() + 60000)), // Next run in 1 minute
})),
}))
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation((_table: any) => ({
where: vi.fn().mockImplementation((_cond: any) => ({
limit: vi.fn().mockImplementation((n?: number) => {
// Always return empty array - no due schedules
return []
}),
})),
})),
})),
update: vi.fn().mockImplementation(() => ({
set: vi.fn().mockImplementation(() => ({
where: vi.fn().mockResolvedValue([]),
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => [
{
id: 'schedule-1',
workflowId: 'workflow-1',
blockId: null,
cronExpression: null,
lastRanAt: null,
failedCount: 0,
},
]),
})),
})),
}
return {
db: mockDb,
userStats: {
userId: 'userId',
totalScheduledExecutions: 'totalScheduledExecutions',
lastActive: 'lastActive',
},
workflow: { id: 'id', userId: 'userId', state: 'state' },
workflowSchedule: {
id: 'id',
workflowId: 'workflowId',
nextRunAt: 'nextRunAt',
status: 'status',
},
workflowSchedule: {},
}
})
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET(createMockRequest())
expect(response).toBeDefined()
expect(response.status).toBe(200)
const data = await response.json()
expect(data).toHaveProperty('message')
expect(data).toHaveProperty('executedCount', 1)
})
afterEach(() => {
vi.clearAllMocks()
})
it('should queue schedules to Trigger.dev when enabled', async () => {
const mockTrigger = vi.fn().mockResolvedValue({ id: 'task-id-123' })
it('should execute scheduled workflows successfully', async () => {
const executeMock = vi.fn().mockResolvedValue({
success: true,
output: { response: 'Scheduled execution completed' },
logs: [],
metadata: {
duration: 100,
startTime: new Date().toISOString(),
endTime: new Date().toISOString(),
vi.doMock('@/lib/auth/internal', () => ({
verifyCronAuth: vi.fn().mockReturnValue(null),
}))
vi.doMock('@trigger.dev/sdk', () => ({
tasks: {
trigger: mockTrigger,
},
})
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
}))
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET()
expect(response).toBeDefined()
const data = await response.json()
expect(data).toHaveProperty('message')
expect(data).toHaveProperty('executedCount')
})
it('should handle errors during scheduled execution gracefully', async () => {
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: vi.fn().mockRejectedValue(new Error('Execution failed')),
})),
vi.doMock('@/lib/env', () => ({
env: {
TRIGGER_DEV_ENABLED: true,
},
isTruthy: vi.fn(() => true),
}))
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET()
vi.doMock('drizzle-orm', () => ({
and: vi.fn((...conditions) => ({ type: 'and', conditions })),
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
}))
expect(response).toBeDefined()
const data = await response.json()
expect(data).toHaveProperty('message')
})
it('should handle case with no due schedules', async () => {
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => ({
limit: vi.fn().mockImplementation(() => []),
})),
})),
})),
update: vi.fn().mockImplementation(() => ({
set: vi.fn().mockImplementation(() => ({
where: vi.fn().mockResolvedValue([]),
where: vi.fn().mockImplementation(() => [
{
id: 'schedule-1',
workflowId: 'workflow-1',
blockId: null,
cronExpression: null,
lastRanAt: null,
failedCount: 0,
},
]),
})),
})),
}
return { db: mockDb }
return {
db: mockDb,
workflowSchedule: {},
}
})
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET()
const response = await GET(createMockRequest())
expect(response).toBeDefined()
expect(response.status).toBe(200)
const data = await response.json()
expect(data).toHaveProperty('executedCount', 0)
const executeMock = vi.fn()
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
}))
expect(executeMock).not.toHaveBeenCalled()
expect(data).toHaveProperty('executedCount', 1)
})
// Removed: Test isolation issues with mocks make this unreliable
it('should execute schedules that are explicitly marked as active', async () => {
const executeMock = vi.fn().mockResolvedValue({ success: true, metadata: {} })
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
it('should handle case with no due schedules', async () => {
vi.doMock('@/lib/auth/internal', () => ({
verifyCronAuth: vi.fn().mockReturnValue(null),
}))
mockScheduleExecuteDb({
schedules: [
{
id: 'schedule-active',
workflowId: 'workflow-id',
userId: 'user-id',
status: 'active',
nextRunAt: new Date(Date.now() - 60_000),
lastRanAt: null,
cronExpression: null,
failedCount: 0,
},
],
vi.doMock('@/background/schedule-execution', () => ({
executeScheduleJob: vi.fn().mockResolvedValue(undefined),
}))
vi.doMock('@/lib/env', () => ({
env: {
TRIGGER_DEV_ENABLED: false,
},
isTruthy: vi.fn(() => false),
}))
vi.doMock('drizzle-orm', () => ({
and: vi.fn((...conditions) => ({ type: 'and', conditions })),
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
}))
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => []),
})),
})),
}
return {
db: mockDb,
workflowSchedule: {},
}
})
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET()
expect(response.status).toBe(200)
})
it('should not execute schedules that are disabled', async () => {
const executeMock = vi.fn()
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
}))
mockScheduleExecuteDb({
schedules: [
{
id: 'schedule-disabled',
workflowId: 'workflow-id',
userId: 'user-id',
status: 'disabled',
nextRunAt: new Date(Date.now() - 60_000),
lastRanAt: null,
cronExpression: null,
failedCount: 0,
},
],
})
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET()
const response = await GET(createMockRequest())
expect(response.status).toBe(200)
const data = await response.json()
expect(data).toHaveProperty('message')
expect(data).toHaveProperty('executedCount', 0)
})
expect(executeMock).not.toHaveBeenCalled()
it('should execute multiple schedules in parallel', async () => {
vi.doMock('@/lib/auth/internal', () => ({
verifyCronAuth: vi.fn().mockReturnValue(null),
}))
vi.doMock('@/background/schedule-execution', () => ({
executeScheduleJob: vi.fn().mockResolvedValue(undefined),
}))
vi.doMock('@/lib/env', () => ({
env: {
TRIGGER_DEV_ENABLED: false,
},
isTruthy: vi.fn(() => false),
}))
vi.doMock('drizzle-orm', () => ({
and: vi.fn((...conditions) => ({ type: 'and', conditions })),
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
}))
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => [
{
id: 'schedule-1',
workflowId: 'workflow-1',
blockId: null,
cronExpression: null,
lastRanAt: null,
failedCount: 0,
},
{
id: 'schedule-2',
workflowId: 'workflow-2',
blockId: null,
cronExpression: null,
lastRanAt: null,
failedCount: 0,
},
]),
})),
})),
}
return {
db: mockDb,
workflowSchedule: {},
}
})
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET(createMockRequest())
expect(response.status).toBe(200)
const data = await response.json()
expect(data).toHaveProperty('executedCount', 2)
})
})

View File

@@ -1,673 +1,108 @@
import { db, userStats, workflow, workflowSchedule } from '@sim/db'
import { Cron } from 'croner'
import { and, eq, lte, not, sql } from 'drizzle-orm'
import { NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { getApiKeyOwnerUserId } from '@/lib/api-key/service'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { db, workflowSchedule } from '@sim/db'
import { tasks } from '@trigger.dev/sdk'
import { and, eq, lte, not } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { env, isTruthy } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import {
type BlockState,
calculateNextRunTime as calculateNextTime,
getScheduleTimeValues,
getSubBlockValue,
} from '@/lib/schedules/utils'
import { decryptSecret, generateRequestId } from '@/lib/utils'
import { blockExistsInDeployment, loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { RateLimiter } from '@/services/queue'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import { generateRequestId } from '@/lib/utils'
import { executeScheduleJob } from '@/background/schedule-execution'
export const dynamic = 'force-dynamic'
const logger = createLogger('ScheduledExecuteAPI')
// Maximum number of consecutive failures before disabling a schedule
const MAX_CONSECUTIVE_FAILURES = 3
export async function GET(request: NextRequest) {
const requestId = generateRequestId()
logger.info(`[${requestId}] Scheduled execution triggered at ${new Date().toISOString()}`)
/**
* Calculate the next run time for a schedule
* This is a wrapper around the utility function in schedule-utils.ts
*/
function calculateNextRunTime(
schedule: typeof workflowSchedule.$inferSelect,
blocks: Record<string, BlockState>
): Date {
// Look for either starter block or schedule trigger block
const scheduleBlock = Object.values(blocks).find(
(block) => block.type === 'starter' || block.type === 'schedule'
)
if (!scheduleBlock) throw new Error('No starter or schedule block found')
const scheduleType = getSubBlockValue(scheduleBlock, 'scheduleType')
const scheduleValues = getScheduleTimeValues(scheduleBlock)
if (schedule.cronExpression) {
const cron = new Cron(schedule.cronExpression)
const nextDate = cron.nextRun()
if (!nextDate) throw new Error('Invalid cron expression or no future occurrences')
return nextDate
const authError = verifyCronAuth(request, 'Schedule execution')
if (authError) {
return authError
}
const lastRanAt = schedule.lastRanAt ? new Date(schedule.lastRanAt) : null
return calculateNextTime(scheduleType, scheduleValues, lastRanAt)
}
const EnvVarsSchema = z.record(z.string())
const runningExecutions = new Set<string>()
export async function GET() {
logger.info(`Scheduled execution triggered at ${new Date().toISOString()}`)
const requestId = generateRequestId()
const now = new Date()
let dueSchedules: (typeof workflowSchedule.$inferSelect)[] = []
try {
dueSchedules = await db
const dueSchedules = await db
.select()
.from(workflowSchedule)
.where(
and(lte(workflowSchedule.nextRunAt, now), not(eq(workflowSchedule.status, 'disabled')))
)
.limit(10)
logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`)
logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`)
for (const schedule of dueSchedules) {
const executionId = uuidv4()
try {
if (runningExecutions.has(schedule.workflowId)) {
logger.debug(`[${requestId}] Skipping workflow ${schedule.workflowId} - already running`)
continue
}
runningExecutions.add(schedule.workflowId)
logger.debug(`[${requestId}] Starting execution of workflow ${schedule.workflowId}`)
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
if (!workflowRecord) {
logger.warn(`[${requestId}] Workflow ${schedule.workflowId} not found`)
runningExecutions.delete(schedule.workflowId)
continue
}
const actorUserId = await getApiKeyOwnerUserId(workflowRecord.pinnedApiKeyId)
if (!actorUserId) {
logger.warn(
`[${requestId}] Skipping schedule ${schedule.id}: pinned API key required to attribute usage.`
)
runningExecutions.delete(schedule.workflowId)
continue
}
// Check rate limits for scheduled execution (checks both personal and org subscriptions)
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
actorUserId,
userSubscription,
'schedule',
false // schedules are always sync
)
if (!rateLimitCheck.allowed) {
logger.warn(
`[${requestId}] Rate limit exceeded for scheduled workflow ${schedule.workflowId}`,
{
userId: workflowRecord.userId,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
}
)
// Retry in 5 minutes for rate limit
const retryDelay = 5 * 60 * 1000 // 5 minutes
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated next retry time due to rate limit`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule for rate limit:`, updateError)
}
runningExecutions.delete(schedule.workflowId)
continue
}
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: schedule.workflowId,
}
)
try {
const deployedData = await loadDeployedWorkflowState(schedule.workflowId)
const nextRunAt = calculateNextRunTime(schedule, deployedData.blocks as any)
await db
.update(workflowSchedule)
.set({ updatedAt: now, nextRunAt })
.where(eq(workflowSchedule.id, schedule.id))
} catch (calcErr) {
logger.warn(
`[${requestId}] Unable to calculate nextRunAt while skipping schedule ${schedule.id}`,
calcErr
)
}
runningExecutions.delete(schedule.workflowId)
continue
}
// Execute scheduled workflow immediately (no queuing)
logger.info(`[${requestId}] Executing scheduled workflow ${schedule.workflowId}`)
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
if (useTrigger) {
const triggerPromises = dueSchedules.map(async (schedule) => {
try {
const executionSuccess = await (async () => {
// Create logging session inside the execution callback
const loggingSession = new LoggingSession(
schedule.workflowId,
executionId,
'schedule',
requestId
)
try {
logger.debug(`[${requestId}] Loading deployed workflow ${schedule.workflowId}`)
const deployedData = await loadDeployedWorkflowState(schedule.workflowId)
const blocks = deployedData.blocks
const edges = deployedData.edges
const loops = deployedData.loops
const parallels = deployedData.parallels
logger.info(`[${requestId}] Loaded deployed workflow ${schedule.workflowId}`)
// Validate that the schedule's trigger block exists in the deployed state
if (schedule.blockId) {
const blockExists = await blockExistsInDeployment(
schedule.workflowId,
schedule.blockId
)
if (!blockExists) {
logger.warn(
`[${requestId}] Schedule trigger block ${schedule.blockId} not found in deployed workflow ${schedule.workflowId}. Skipping execution.`
)
return { skip: true, blocks: {} as Record<string, BlockState> }
}
}
const mergedStates = mergeSubblockState(blocks)
// Retrieve environment variables with workspace precedence
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
actorUserId,
workflowRecord.workspaceId || undefined
)
const variables = EnvVarsSchema.parse({
...personalEncrypted,
...workspaceEncrypted,
})
const currentBlockStates = await Object.entries(mergedStates).reduce(
async (accPromise, [id, block]) => {
const acc = await accPromise
acc[id] = await Object.entries(block.subBlocks).reduce(
async (subAccPromise, [key, subBlock]) => {
const subAcc = await subAccPromise
let value = subBlock.value
if (
typeof value === 'string' &&
value.includes('{{') &&
value.includes('}}')
) {
const matches = value.match(/{{([^}]+)}}/g)
if (matches) {
for (const match of matches) {
const varName = match.slice(2, -2)
const encryptedValue = variables[varName]
if (!encryptedValue) {
throw new Error(`Environment variable "${varName}" was not found`)
}
try {
const { decrypted } = await decryptSecret(encryptedValue)
value = (value as string).replace(match, decrypted)
} catch (error: any) {
logger.error(
`[${requestId}] Error decrypting value for variable "${varName}"`,
error
)
throw new Error(
`Failed to decrypt environment variable "${varName}": ${error.message}`
)
}
}
}
}
subAcc[key] = value
return subAcc
},
Promise.resolve({} as Record<string, any>)
)
return acc
},
Promise.resolve({} as Record<string, Record<string, any>>)
)
const decryptedEnvVars: Record<string, string> = {}
for (const [key, encryptedValue] of Object.entries(variables)) {
try {
const { decrypted } = await decryptSecret(encryptedValue)
decryptedEnvVars[key] = decrypted
} catch (error: any) {
logger.error(
`[${requestId}] Failed to decrypt environment variable "${key}"`,
error
)
throw new Error(
`Failed to decrypt environment variable "${key}": ${error.message}`
)
}
}
// Process the block states to ensure response formats are properly parsed
const processedBlockStates = Object.entries(currentBlockStates).reduce(
(acc, [blockId, blockState]) => {
// Check if this block has a responseFormat that needs to be parsed
if (blockState.responseFormat && typeof blockState.responseFormat === 'string') {
const responseFormatValue = blockState.responseFormat.trim()
// Check for variable references like <start.input>
if (responseFormatValue.startsWith('<') && responseFormatValue.includes('>')) {
logger.debug(
`[${requestId}] Response format contains variable reference for block ${blockId}`
)
// Keep variable references as-is - they will be resolved during execution
acc[blockId] = blockState
} else if (responseFormatValue === '') {
// Empty string - remove response format
acc[blockId] = {
...blockState,
responseFormat: undefined,
}
} else {
try {
logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`)
// Attempt to parse the responseFormat if it's a string
const parsedResponseFormat = JSON.parse(responseFormatValue)
acc[blockId] = {
...blockState,
responseFormat: parsedResponseFormat,
}
} catch (error) {
logger.warn(
`[${requestId}] Failed to parse responseFormat for block ${blockId}, using undefined`,
error
)
// Set to undefined instead of keeping malformed JSON - this allows execution to continue
acc[blockId] = {
...blockState,
responseFormat: undefined,
}
}
}
} else {
acc[blockId] = blockState
}
return acc
},
{} as Record<string, Record<string, any>>
)
// Get workflow variables
let workflowVariables = {}
if (workflowRecord.variables) {
try {
if (typeof workflowRecord.variables === 'string') {
workflowVariables = JSON.parse(workflowRecord.variables)
} else {
workflowVariables = workflowRecord.variables
}
} catch (error) {
logger.error(`Failed to parse workflow variables: ${schedule.workflowId}`, error)
}
}
const serializedWorkflow = new Serializer().serializeWorkflow(
mergedStates,
edges,
loops,
parallels,
true // Enable validation during execution
)
const input = {
_context: {
workflowId: schedule.workflowId,
},
}
// Start logging with environment variables
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: variables || {},
})
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: input,
workflowVariables,
contextExtensions: {
executionId,
workspaceId: workflowRecord.workspaceId || '',
isDeployedContext: true,
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)
const result = await executor.execute(
schedule.workflowId,
schedule.blockId || undefined
)
const executionResult =
'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Workflow execution completed: ${schedule.workflowId}`, {
success: executionResult.success,
executionTime: executionResult.metadata?.duration,
})
if (executionResult.success) {
await updateWorkflowRunCounts(schedule.workflowId)
try {
await db
.update(userStats)
.set({
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
lastActive: now,
})
.where(eq(userStats.userId, actorUserId))
logger.debug(`[${requestId}] Updated user stats for scheduled execution`)
} catch (statsError) {
logger.error(`[${requestId}] Error updating user stats:`, statsError)
}
}
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
// Complete logging
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: (traceSpans || []) as any,
})
return { success: executionResult.success, blocks, executionResult }
} catch (earlyError: any) {
// Handle errors that occur before workflow execution (e.g., missing data, env vars, etc.)
logger.error(
`[${requestId}] Early failure in scheduled workflow ${schedule.workflowId}`,
earlyError
)
// Create a minimal log entry for early failures
try {
await loggingSession.safeStart({
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed before workflow started: ${earlyError.message}`,
stackTrace: earlyError.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to create log entry for early schedule failure`,
loggingError
)
}
// Re-throw the error to be handled by the outer catch block
throw earlyError
}
})()
// Check if execution was skipped (e.g., trigger block not found)
if ('skip' in executionSuccess && executionSuccess.skip) {
runningExecutions.delete(schedule.workflowId)
continue
const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: now.toISOString(),
}
if (executionSuccess.success) {
logger.info(`[${requestId}] Workflow ${schedule.workflowId} executed successfully`)
const nextRunAt = calculateNextRunTime(schedule, executionSuccess.blocks)
logger.debug(
`[${requestId}] Calculated next run time: ${nextRunAt.toISOString()} for workflow ${schedule.workflowId}`
)
try {
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
failedCount: 0, // Reset failure count on success
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(
`[${requestId}] Updated next run time for workflow ${schedule.workflowId} to ${nextRunAt.toISOString()}`
)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after success:`, updateError)
}
} else {
logger.warn(`[${requestId}] Workflow ${schedule.workflowId} execution failed`)
const newFailedCount = (schedule.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
const nextRunAt = calculateNextRunTime(schedule, executionSuccess.blocks)
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${schedule.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated schedule after failure`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after failure:`, updateError)
}
}
} catch (error: any) {
// Handle sync queue overload
if (error.message?.includes('Service overloaded')) {
logger.warn(`[${requestId}] Service overloaded, retrying schedule in 5 minutes`)
const retryDelay = 5 * 60 * 1000 // 5 minutes
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated schedule retry time due to service overload`)
} catch (updateError) {
logger.error(
`[${requestId}] Error updating schedule for service overload:`,
updateError
)
}
} else {
logger.error(
`[${requestId}] Error executing scheduled workflow ${schedule.workflowId}`,
error
)
// Ensure we create a log entry for this failed execution
try {
const failureLoggingSession = new LoggingSession(
schedule.workflowId,
executionId,
'schedule',
requestId
)
await failureLoggingSession.safeStart({
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await failureLoggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed: ${error.message}`,
stackTrace: error.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to create log entry for failed schedule execution`,
loggingError
)
}
let nextRunAt: Date
try {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
if (workflowRecord?.isDeployed) {
try {
const deployedData = await loadDeployedWorkflowState(schedule.workflowId)
nextRunAt = calculateNextRunTime(schedule, deployedData.blocks as any)
} catch {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} else {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} catch (workflowError) {
logger.error(
`[${requestId}] Error retrieving workflow for next run calculation`,
workflowError
)
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) // 24 hours as a fallback
}
const newFailedCount = (schedule.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${schedule.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated schedule after execution error`)
} catch (updateError) {
logger.error(
`[${requestId}] Error updating schedule after execution error:`,
updateError
)
}
}
} finally {
runningExecutions.delete(schedule.workflowId)
const handle = await tasks.trigger('schedule-execution', payload)
logger.info(
`[${requestId}] Queued schedule execution task ${handle.id} for workflow ${schedule.workflowId}`
)
return handle
} catch (error) {
logger.error(
`[${requestId}] Failed to trigger schedule execution for workflow ${schedule.workflowId}`,
error
)
return null
}
} catch (error: any) {
logger.error(`[${requestId}] Error in scheduled execution handler`, error)
return NextResponse.json({ error: error.message }, { status: 500 })
}
})
await Promise.allSettled(triggerPromises)
logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions to Trigger.dev`)
} else {
const directExecutionPromises = dueSchedules.map(async (schedule) => {
const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: now.toISOString(),
}
void executeScheduleJob(payload).catch((error) => {
logger.error(
`[${requestId}] Direct schedule execution failed for workflow ${schedule.workflowId}`,
error
)
})
logger.info(
`[${requestId}] Queued direct schedule execution for workflow ${schedule.workflowId} (Trigger.dev disabled)`
)
})
await Promise.allSettled(directExecutionPromises)
logger.info(
`[${requestId}] Queued ${dueSchedules.length} direct schedule executions (Trigger.dev disabled)`
)
}
return NextResponse.json({
message: 'Scheduled workflow executions processed',
executedCount: dueSchedules.length,
})
} catch (error: any) {
logger.error(`[${requestId}] Error in scheduled execution handler`, error)
return NextResponse.json({ error: error.message }, { status: 500 })
}
return NextResponse.json({
message: 'Scheduled workflow executions processed',
executedCount: dueSchedules.length,
})
}

View File

@@ -0,0 +1,598 @@
import { db, userStats, workflow, workflowSchedule } from '@sim/db'
import { task } from '@trigger.dev/sdk'
import { Cron } from 'croner'
import { eq, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { getApiKeyOwnerUserId } from '@/lib/api-key/service'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import {
type BlockState,
calculateNextRunTime as calculateNextTime,
getScheduleTimeValues,
getSubBlockValue,
} from '@/lib/schedules/utils'
import { decryptSecret } from '@/lib/utils'
import { blockExistsInDeployment, loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { RateLimiter } from '@/services/queue'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
const logger = createLogger('TriggerScheduleExecution')
const MAX_CONSECUTIVE_FAILURES = 3
export type ScheduleExecutionPayload = {
scheduleId: string
workflowId: string
blockId?: string
cronExpression?: string
lastRanAt?: string
failedCount?: number
now: string
}
function calculateNextRunTime(
schedule: { cronExpression?: string; lastRanAt?: string },
blocks: Record<string, BlockState>
): Date {
const scheduleBlock = Object.values(blocks).find(
(block) => block.type === 'starter' || block.type === 'schedule'
)
if (!scheduleBlock) throw new Error('No starter or schedule block found')
const scheduleType = getSubBlockValue(scheduleBlock, 'scheduleType')
const scheduleValues = getScheduleTimeValues(scheduleBlock)
if (schedule.cronExpression) {
const cron = new Cron(schedule.cronExpression)
const nextDate = cron.nextRun()
if (!nextDate) throw new Error('Invalid cron expression or no future occurrences')
return nextDate
}
const lastRanAt = schedule.lastRanAt ? new Date(schedule.lastRanAt) : null
return calculateNextTime(scheduleType, scheduleValues, lastRanAt)
}
export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
const now = new Date(payload.now)
logger.info(`[${requestId}] Starting schedule execution`, {
scheduleId: payload.scheduleId,
workflowId: payload.workflowId,
executionId,
})
const EnvVarsSchema = (await import('zod')).z.record((await import('zod')).z.string())
try {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, payload.workflowId))
.limit(1)
if (!workflowRecord) {
logger.warn(`[${requestId}] Workflow ${payload.workflowId} not found`)
return
}
const actorUserId = await getApiKeyOwnerUserId(workflowRecord.pinnedApiKeyId)
if (!actorUserId) {
logger.warn(
`[${requestId}] Skipping schedule ${payload.scheduleId}: pinned API key required to attribute usage.`
)
return
}
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
actorUserId,
userSubscription,
'schedule',
false
)
if (!rateLimitCheck.allowed) {
logger.warn(
`[${requestId}] Rate limit exceeded for scheduled workflow ${payload.workflowId}`,
{
userId: workflowRecord.userId,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
}
)
const retryDelay = 5 * 60 * 1000
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated next retry time due to rate limit`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule for rate limit:`, updateError)
}
return
}
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: payload.workflowId,
}
)
try {
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
const nextRunAt = calculateNextRunTime(payload, deployedData.blocks as any)
await db
.update(workflowSchedule)
.set({ updatedAt: now, nextRunAt })
.where(eq(workflowSchedule.id, payload.scheduleId))
} catch (calcErr) {
logger.warn(
`[${requestId}] Unable to calculate nextRunAt while skipping schedule ${payload.scheduleId}`,
calcErr
)
}
return
}
logger.info(`[${requestId}] Executing scheduled workflow ${payload.workflowId}`)
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
try {
const executionSuccess = await (async () => {
try {
logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
const blocks = deployedData.blocks
const edges = deployedData.edges
const loops = deployedData.loops
const parallels = deployedData.parallels
logger.info(`[${requestId}] Loaded deployed workflow ${payload.workflowId}`)
if (payload.blockId) {
const blockExists = await blockExistsInDeployment(payload.workflowId, payload.blockId)
if (!blockExists) {
logger.warn(
`[${requestId}] Schedule trigger block ${payload.blockId} not found in deployed workflow ${payload.workflowId}. Skipping execution.`
)
return { skip: true, blocks: {} as Record<string, BlockState> }
}
}
const mergedStates = mergeSubblockState(blocks)
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
actorUserId,
workflowRecord.workspaceId || undefined
)
const variables = EnvVarsSchema.parse({
...personalEncrypted,
...workspaceEncrypted,
})
const currentBlockStates = await Object.entries(mergedStates).reduce(
async (accPromise, [id, block]) => {
const acc = await accPromise
acc[id] = await Object.entries(block.subBlocks).reduce(
async (subAccPromise, [key, subBlock]) => {
const subAcc = await subAccPromise
let value = subBlock.value
if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
const matches = value.match(/{{([^}]+)}}/g)
if (matches) {
for (const match of matches) {
const varName = match.slice(2, -2)
const encryptedValue = variables[varName]
if (!encryptedValue) {
throw new Error(`Environment variable "${varName}" was not found`)
}
try {
const { decrypted } = await decryptSecret(encryptedValue)
value = (value as string).replace(match, decrypted)
} catch (error: any) {
logger.error(
`[${requestId}] Error decrypting value for variable "${varName}"`,
error
)
throw new Error(
`Failed to decrypt environment variable "${varName}": ${error.message}`
)
}
}
}
}
subAcc[key] = value
return subAcc
},
Promise.resolve({} as Record<string, any>)
)
return acc
},
Promise.resolve({} as Record<string, Record<string, any>>)
)
const decryptedEnvVars: Record<string, string> = {}
for (const [key, encryptedValue] of Object.entries(variables)) {
try {
const { decrypted } = await decryptSecret(encryptedValue)
decryptedEnvVars[key] = decrypted
} catch (error: any) {
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}"`, error)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
}
}
const processedBlockStates = Object.entries(currentBlockStates).reduce(
(acc, [blockId, blockState]) => {
if (blockState.responseFormat && typeof blockState.responseFormat === 'string') {
const responseFormatValue = blockState.responseFormat.trim()
if (responseFormatValue.startsWith('<') && responseFormatValue.includes('>')) {
logger.debug(
`[${requestId}] Response format contains variable reference for block ${blockId}`
)
acc[blockId] = blockState
} else if (responseFormatValue === '') {
acc[blockId] = {
...blockState,
responseFormat: undefined,
}
} else {
try {
logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`)
const parsedResponseFormat = JSON.parse(responseFormatValue)
acc[blockId] = {
...blockState,
responseFormat: parsedResponseFormat,
}
} catch (error) {
logger.warn(
`[${requestId}] Failed to parse responseFormat for block ${blockId}, using undefined`,
error
)
acc[blockId] = {
...blockState,
responseFormat: undefined,
}
}
}
} else {
acc[blockId] = blockState
}
return acc
},
{} as Record<string, Record<string, any>>
)
let workflowVariables = {}
if (workflowRecord.variables) {
try {
if (typeof workflowRecord.variables === 'string') {
workflowVariables = JSON.parse(workflowRecord.variables)
} else {
workflowVariables = workflowRecord.variables
}
} catch (error) {
logger.error(`Failed to parse workflow variables: ${payload.workflowId}`, error)
}
}
const serializedWorkflow = new Serializer().serializeWorkflow(
mergedStates,
edges,
loops,
parallels,
true
)
const input = {
_context: {
workflowId: payload.workflowId,
},
}
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: variables || {},
})
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: input,
workflowVariables,
contextExtensions: {
executionId,
workspaceId: workflowRecord.workspaceId || '',
isDeployedContext: true,
},
})
loggingSession.setupExecutor(executor)
const result = await executor.execute(payload.workflowId, payload.blockId || undefined)
const executionResult =
'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Workflow execution completed: ${payload.workflowId}`, {
success: executionResult.success,
executionTime: executionResult.metadata?.duration,
})
if (executionResult.success) {
await updateWorkflowRunCounts(payload.workflowId)
try {
await db
.update(userStats)
.set({
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
lastActive: now,
})
.where(eq(userStats.userId, actorUserId))
logger.debug(`[${requestId}] Updated user stats for scheduled execution`)
} catch (statsError) {
logger.error(`[${requestId}] Error updating user stats:`, statsError)
}
}
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: (traceSpans || []) as any,
})
return { success: executionResult.success, blocks, executionResult }
} catch (earlyError: any) {
logger.error(
`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`,
earlyError
)
try {
await loggingSession.safeStart({
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed before workflow started: ${earlyError.message}`,
stackTrace: earlyError.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to create log entry for early schedule failure`,
loggingError
)
}
throw earlyError
}
})()
if ('skip' in executionSuccess && executionSuccess.skip) {
return
}
if (executionSuccess.success) {
logger.info(`[${requestId}] Workflow ${payload.workflowId} executed successfully`)
const nextRunAt = calculateNextRunTime(payload, executionSuccess.blocks)
logger.debug(
`[${requestId}] Calculated next run time: ${nextRunAt.toISOString()} for workflow ${payload.workflowId}`
)
try {
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
failedCount: 0,
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(
`[${requestId}] Updated next run time for workflow ${payload.workflowId} to ${nextRunAt.toISOString()}`
)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after success:`, updateError)
}
} else {
logger.warn(`[${requestId}] Workflow ${payload.workflowId} execution failed`)
const newFailedCount = (payload.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
const nextRunAt = calculateNextRunTime(payload, executionSuccess.blocks)
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${payload.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated schedule after failure`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after failure:`, updateError)
}
}
} catch (error: any) {
if (error.message?.includes('Service overloaded')) {
logger.warn(`[${requestId}] Service overloaded, retrying schedule in 5 minutes`)
const retryDelay = 5 * 60 * 1000
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated schedule retry time due to service overload`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule for service overload:`, updateError)
}
} else {
logger.error(
`[${requestId}] Error executing scheduled workflow ${payload.workflowId}`,
error
)
try {
const failureLoggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
await failureLoggingSession.safeStart({
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await failureLoggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed: ${error.message}`,
stackTrace: error.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to create log entry for failed schedule execution`,
loggingError
)
}
let nextRunAt: Date
try {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, payload.workflowId))
.limit(1)
if (workflowRecord?.isDeployed) {
try {
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
nextRunAt = calculateNextRunTime(payload, deployedData.blocks as any)
} catch {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} else {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} catch (workflowError) {
logger.error(
`[${requestId}] Error retrieving workflow for next run calculation`,
workflowError
)
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
const newFailedCount = (payload.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${payload.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated schedule after execution error`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after execution error:`, updateError)
}
}
}
} catch (error: any) {
logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error)
}
}
export const scheduleExecution = task({
id: 'schedule-execution',
retry: {
maxAttempts: 1,
},
run: async (payload: ScheduleExecutionPayload) => executeScheduleJob(payload),
})

View File

@@ -41,6 +41,9 @@ spec:
securityContext:
{{- toYaml . | nindent 14 }}
{{- end }}
env:
- name: CRON_SECRET
value: {{ $.Values.app.env.CRON_SECRET | quote }}
command:
- /bin/sh
- -c
@@ -58,6 +61,7 @@ spec:
if curl -f -s -S --max-time 60 --retry 2 --retry-delay 5 \
-H "Content-Type: application/json" \
-H "User-Agent: Kubernetes-CronJob/{{ $jobConfig.name }}" \
-H "Authorization: Bearer ${CRON_SECRET}" \
"$SERVICE_URL{{ $jobConfig.path }}"; then
echo "Success: HTTP request completed"
exit 0