diff --git a/apps/docs/content/docs/en/execution/costs.mdx b/apps/docs/content/docs/en/execution/costs.mdx index 9be402ce7..c3524f180 100644 --- a/apps/docs/content/docs/en/execution/costs.mdx +++ b/apps/docs/content/docs/en/execution/costs.mdx @@ -217,16 +217,16 @@ Different subscription plans have different usage limits: Workflows have maximum execution time limits based on your subscription plan: -| Plan | Sync Execution Limit | -|------|---------------------| -| **Free** | 5 minutes | -| **Pro** | 60 minutes | -| **Team** | 60 minutes | -| **Enterprise** | 60 minutes | +| Plan | Sync Execution | Async Execution | +|------|----------------|-----------------| +| **Free** | 5 minutes | 10 minutes | +| **Pro** | 60 minutes | 90 minutes | +| **Team** | 60 minutes | 90 minutes | +| **Enterprise** | 60 minutes | 90 minutes | **Sync executions** run immediately and return results directly. These are triggered via the API with `async: false` (default) or through the UI. -**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background with a 90-minute time limit for all plans. +**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background. Async time limits are 2x the sync limit, capped at 90 minutes. If a workflow exceeds its time limit, it will be terminated and marked as failed with a timeout error. Design long-running workflows to use async execution or break them into smaller workflows. diff --git a/apps/sim/app/api/tools/video/route.ts b/apps/sim/app/api/tools/video/route.ts index 7391acf58..dd131cad3 100644 --- a/apps/sim/app/api/tools/video/route.ts +++ b/apps/sim/app/api/tools/video/route.ts @@ -1,7 +1,7 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { checkInternalAuth } from '@/lib/auth/hybrid' -import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { downloadFileFromStorage } from '@/lib/uploads/utils/file-utils.server' import type { UserFile } from '@/executor/types' import type { VideoRequestBody } from '@/tools/video/types' @@ -328,7 +328,7 @@ async function generateWithRunway( logger.info(`[${requestId}] Runway task created: ${taskId}`) const pollIntervalMs = 5000 - const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs) + const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs) let attempts = 0 while (attempts < maxAttempts) { @@ -372,7 +372,7 @@ async function generateWithRunway( attempts++ } - throw new Error('Runway generation timed out after 10 minutes') + throw new Error('Runway generation timed out') } async function generateWithVeo( @@ -432,7 +432,7 @@ async function generateWithVeo( logger.info(`[${requestId}] Veo operation created: ${operationName}`) const pollIntervalMs = 5000 - const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs) + const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs) let attempts = 0 while (attempts < maxAttempts) { @@ -488,7 +488,7 @@ async function generateWithVeo( attempts++ } - throw new Error('Veo generation timed out after 5 minutes') + throw new Error('Veo generation timed out') } async function generateWithLuma( @@ -545,7 +545,7 @@ async function generateWithLuma( logger.info(`[${requestId}] Luma generation created: ${generationId}`) const pollIntervalMs = 5000 - const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs) + const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs) let attempts = 0 while (attempts < maxAttempts) { @@ -596,7 +596,7 @@ async function generateWithLuma( attempts++ } - throw new Error('Luma generation timed out after 10 minutes') + throw new Error('Luma generation timed out') } async function generateWithMiniMax( @@ -663,7 +663,7 @@ async function generateWithMiniMax( logger.info(`[${requestId}] MiniMax task created: ${taskId}`) const pollIntervalMs = 5000 - const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs) + const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs) let attempts = 0 while (attempts < maxAttempts) { @@ -746,7 +746,7 @@ async function generateWithMiniMax( attempts++ } - throw new Error('MiniMax generation timed out after 10 minutes') + throw new Error('MiniMax generation timed out') } // Helper function to strip subpaths from Fal.ai model IDs for status/result endpoints @@ -865,7 +865,7 @@ async function generateWithFalAI( const baseModelId = getBaseModelId(falModelId) const pollIntervalMs = 5000 - const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs) + const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs) let attempts = 0 while (attempts < maxAttempts) { @@ -942,7 +942,7 @@ async function generateWithFalAI( attempts++ } - throw new Error('Fal.ai generation timed out after 8 minutes') + throw new Error('Fal.ai generation timed out') } function getVideoDimensions( diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index e7905aa53..d372e3e26 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -517,6 +517,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: cachedWorkflowData?.blocks || {} ) const streamVariables = cachedWorkflowData?.variables ?? (workflow as any).variables + const streamingTimeout = preprocessResult.executionTimeout?.sync const stream = await createStreamingResponse({ requestId, @@ -535,6 +536,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api', includeFileBase64, base64MaxBytes, + abortSignal: streamingTimeout ? AbortSignal.timeout(streamingTimeout) : undefined, }, executionId, }) diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index c52676b8d..c43bb6f53 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -4,6 +4,7 @@ import { task } from '@trigger.dev/sdk' import { Cron } from 'croner' import { eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' +import { getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -120,6 +121,7 @@ async function runWorkflowExecution({ loggingSession, requestId, executionId, + asyncTimeout, }: { payload: ScheduleExecutionPayload workflowRecord: WorkflowRecord @@ -127,6 +129,7 @@ async function runWorkflowExecution({ loggingSession: LoggingSession requestId: string executionId: string + asyncTimeout?: number }): Promise { try { logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`) @@ -181,15 +184,38 @@ async function runWorkflowExecution({ [] ) - const executionResult = await executeWorkflowCore({ - snapshot, - callbacks: {}, - loggingSession, - includeFileBase64: true, - base64MaxBytes: undefined, - }) + const abortController = new AbortController() + let isTimedOut = false + let timeoutId: NodeJS.Timeout | undefined - if (executionResult.status === 'paused') { + if (asyncTimeout) { + timeoutId = setTimeout(() => { + isTimedOut = true + abortController.abort() + }, asyncTimeout) + } + + let executionResult + try { + executionResult = await executeWorkflowCore({ + snapshot, + callbacks: {}, + loggingSession, + includeFileBase64: true, + base64MaxBytes: undefined, + abortSignal: abortController.signal, + }) + } finally { + if (timeoutId) clearTimeout(timeoutId) + } + + if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout) + logger.info(`[${requestId}] Scheduled workflow execution timed out`, { + timeoutMs: asyncTimeout, + }) + await loggingSession.markAsFailed(timeoutErrorMessage) + } else if (executionResult.status === 'paused') { if (!executionResult.snapshotSeed) { logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, @@ -453,6 +479,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { loggingSession, requestId, executionId, + asyncTimeout: preprocessResult.executionTimeout?.async, }) if (executionResult.status === 'skip') { diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index e5e3d3007..30403a3d8 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -4,7 +4,10 @@ import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' import { eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' +import { getHighestPrioritySubscription } from '@/lib/billing' +import { getExecutionTimeout, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency' +import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types' import { processExecutionFiles } from '@/lib/execution/files' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -134,7 +137,22 @@ async function executeWebhookJobInternal( requestId ) - // Track deploymentVersionId at function scope so it's available in catch block + const userSubscription = await getHighestPrioritySubscription(payload.userId) + const asyncTimeout = getExecutionTimeout( + userSubscription?.plan as SubscriptionPlan | undefined, + 'async' + ) + const abortController = new AbortController() + let isTimedOut = false + let timeoutId: NodeJS.Timeout | undefined + + if (asyncTimeout) { + timeoutId = setTimeout(() => { + isTimedOut = true + abortController.abort() + }, asyncTimeout) + } + let deploymentVersionId: string | undefined try { @@ -241,11 +259,18 @@ async function executeWebhookJobInternal( snapshot, callbacks: {}, loggingSession, - includeFileBase64: true, // Enable base64 hydration - base64MaxBytes: undefined, // Use default limit + includeFileBase64: true, + base64MaxBytes: undefined, + abortSignal: abortController.signal, }) - if (executionResult.status === 'paused') { + if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout) + logger.info(`[${requestId}] Airtable webhook execution timed out`, { + timeoutMs: asyncTimeout, + }) + await loggingSession.markAsFailed(timeoutErrorMessage) + } else if (executionResult.status === 'paused') { if (!executionResult.snapshotSeed) { logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, @@ -497,9 +522,14 @@ async function executeWebhookJobInternal( callbacks: {}, loggingSession, includeFileBase64: true, + abortSignal: abortController.signal, }) - if (executionResult.status === 'paused') { + if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout) + logger.info(`[${requestId}] Webhook execution timed out`, { timeoutMs: asyncTimeout }) + await loggingSession.markAsFailed(timeoutErrorMessage) + } else if (executionResult.status === 'paused') { if (!executionResult.snapshotSeed) { logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, @@ -601,6 +631,8 @@ async function executeWebhookJobInternal( } throw error + } finally { + if (timeoutId) clearTimeout(timeoutId) } } diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index 99e83d54c..47d2fcb64 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -1,6 +1,7 @@ import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' import { v4 as uuidv4 } from 'uuid' +import { getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -103,15 +104,37 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { [] ) - const result = await executeWorkflowCore({ - snapshot, - callbacks: {}, - loggingSession, - includeFileBase64: true, - base64MaxBytes: undefined, - }) + const asyncTimeout = preprocessResult.executionTimeout?.async + const abortController = new AbortController() + let isTimedOut = false + let timeoutId: NodeJS.Timeout | undefined - if (result.status === 'paused') { + if (asyncTimeout) { + timeoutId = setTimeout(() => { + isTimedOut = true + abortController.abort() + }, asyncTimeout) + } + + let result + try { + result = await executeWorkflowCore({ + snapshot, + callbacks: {}, + loggingSession, + includeFileBase64: true, + base64MaxBytes: undefined, + abortSignal: abortController.signal, + }) + } finally { + if (timeoutId) clearTimeout(timeoutId) + } + + if (result.status === 'cancelled' && isTimedOut && asyncTimeout) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout) + logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: asyncTimeout }) + await loggingSession.markAsFailed(timeoutErrorMessage) + } else if (result.status === 'paused') { if (!result.snapshotSeed) { logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, diff --git a/apps/sim/lib/core/execution-limits/types.ts b/apps/sim/lib/core/execution-limits/types.ts index 51cfbcb10..ea4b31b86 100644 --- a/apps/sim/lib/core/execution-limits/types.ts +++ b/apps/sim/lib/core/execution-limits/types.ts @@ -6,14 +6,15 @@ interface ExecutionTimeoutConfig { async: number } -const DEFAULT_SYNC_TIMEOUTS = { +const DEFAULT_SYNC_TIMEOUTS_SECONDS = { free: 300, pro: 3600, team: 3600, enterprise: 3600, } as const -const ASYNC_TIMEOUT_SECONDS = 5400 +const ASYNC_MULTIPLIER = 2 +const MAX_ASYNC_TIMEOUT_SECONDS = 5400 function getSyncTimeoutForPlan(plan: SubscriptionPlan): number { const envVarMap: Record = { @@ -22,25 +23,32 @@ function getSyncTimeoutForPlan(plan: SubscriptionPlan): number { team: env.EXECUTION_TIMEOUT_TEAM, enterprise: env.EXECUTION_TIMEOUT_ENTERPRISE, } - return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS[plan]) * 1000 + return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS_SECONDS[plan]) * 1000 +} + +function getAsyncTimeoutForPlan(plan: SubscriptionPlan): number { + const syncMs = getSyncTimeoutForPlan(plan) + const asyncMs = syncMs * ASYNC_MULTIPLIER + const maxAsyncMs = MAX_ASYNC_TIMEOUT_SECONDS * 1000 + return Math.min(asyncMs, maxAsyncMs) } const EXECUTION_TIMEOUTS: Record = { free: { sync: getSyncTimeoutForPlan('free'), - async: ASYNC_TIMEOUT_SECONDS * 1000, + async: getAsyncTimeoutForPlan('free'), }, pro: { sync: getSyncTimeoutForPlan('pro'), - async: ASYNC_TIMEOUT_SECONDS * 1000, + async: getAsyncTimeoutForPlan('pro'), }, team: { sync: getSyncTimeoutForPlan('team'), - async: ASYNC_TIMEOUT_SECONDS * 1000, + async: getAsyncTimeoutForPlan('team'), }, enterprise: { sync: getSyncTimeoutForPlan('enterprise'), - async: ASYNC_TIMEOUT_SECONDS * 1000, + async: getAsyncTimeoutForPlan('enterprise'), }, } @@ -58,18 +66,17 @@ export function getMaxExecutionTimeout(): number { export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync export function isTimeoutError(error: unknown): boolean { - if (!(error instanceof Error)) return false + if (!error) return false - const name = error.name.toLowerCase() - const message = error.message.toLowerCase() + if (error instanceof Error) { + return error.name === 'TimeoutError' + } - return ( - name === 'timeouterror' || - name === 'aborterror' || - message.includes('timeout') || - message.includes('timed out') || - message.includes('aborted') - ) + if (typeof error === 'object' && 'name' in error) { + return (error as { name: string }).name === 'TimeoutError' + } + + return false } export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): string { diff --git a/apps/sim/lib/workflows/executor/execute-workflow.ts b/apps/sim/lib/workflows/executor/execute-workflow.ts index 1ed65c119..8edce5526 100644 --- a/apps/sim/lib/workflows/executor/execute-workflow.ts +++ b/apps/sim/lib/workflows/executor/execute-workflow.ts @@ -19,6 +19,7 @@ export interface ExecuteWorkflowOptions { skipLoggingComplete?: boolean includeFileBase64?: boolean base64MaxBytes?: number + abortSignal?: AbortSignal } export interface WorkflowInfo { @@ -82,6 +83,7 @@ export async function executeWorkflow( loggingSession, includeFileBase64: streamConfig?.includeFileBase64, base64MaxBytes: streamConfig?.base64MaxBytes, + abortSignal: streamConfig?.abortSignal, }) if (result.status === 'paused') { diff --git a/apps/sim/lib/workflows/streaming/streaming.ts b/apps/sim/lib/workflows/streaming/streaming.ts index 88e7a584d..b990d53c4 100644 --- a/apps/sim/lib/workflows/streaming/streaming.ts +++ b/apps/sim/lib/workflows/streaming/streaming.ts @@ -32,6 +32,7 @@ export interface StreamingConfig { workflowTriggerType?: 'api' | 'chat' includeFileBase64?: boolean base64MaxBytes?: number + abortSignal?: AbortSignal } export interface StreamingResponseOptions { @@ -284,6 +285,7 @@ export async function createStreamingResponse( skipLoggingComplete: true, includeFileBase64: streamConfig.includeFileBase64, base64MaxBytes: streamConfig.base64MaxBytes, + abortSignal: streamConfig.abortSignal, }, executionId ) diff --git a/apps/sim/tools/browser_use/run_task.ts b/apps/sim/tools/browser_use/run_task.ts index 6f30e8a03..9422282a0 100644 --- a/apps/sim/tools/browser_use/run_task.ts +++ b/apps/sim/tools/browser_use/run_task.ts @@ -1,12 +1,12 @@ import { createLogger } from '@sim/logger' -import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types' import type { ToolConfig, ToolResponse } from '@/tools/types' const logger = createLogger('BrowserUseTool') const POLL_INTERVAL_MS = 5000 -const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS +const MAX_POLL_TIME_MS = getMaxExecutionTimeout() const MAX_CONSECUTIVE_ERRORS = 3 async function createSessionWithProfile(