From 7c1e7273de7f63b209a3bc475f1b32e235601882 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 2 Feb 2026 19:24:09 -0800 Subject: [PATCH] feat(timeouts): execution timeout limits --- .../sim/app/api/mcp/serve/[serverId]/route.ts | 3 +- apps/sim/app/api/mcp/tools/execute/route.ts | 11 +- .../[id]/execute-from-block/route.ts | 65 ++++++++-- .../app/api/workflows/[id]/execute/route.ts | 96 ++++++++++---- .../hooks/use-workflow-execution.ts | 74 ++++++----- apps/sim/executor/constants.ts | 10 +- .../executor/handlers/wait/wait-handler.ts | 10 +- apps/sim/lib/a2a/constants.ts | 5 +- .../sim/lib/copilot/tools/client/base-tool.ts | 6 +- apps/sim/lib/core/config/env.ts | 5 + apps/sim/lib/core/execution-limits/index.ts | 1 + apps/sim/lib/core/execution-limits/types.ts | 122 ++++++++++++++++++ apps/sim/lib/execution/constants.ts | 8 +- apps/sim/lib/execution/preprocessing.ts | 16 ++- .../sim/lib/logs/execution/logging-session.ts | 11 +- apps/sim/lib/mcp/client.ts | 3 +- apps/sim/lib/mcp/shared.ts | 2 +- apps/sim/lib/mcp/utils.test.ts | 8 +- apps/sim/lib/mcp/utils.ts | 16 +-- .../workflows/executor/execution-events.ts | 6 - apps/sim/tools/apify/run_actor_async.ts | 5 +- apps/sim/tools/browser_use/run_task.ts | 3 +- apps/sim/tools/exa/research.ts | 5 +- apps/sim/tools/firecrawl/agent.ts | 5 +- apps/sim/tools/firecrawl/crawl.ts | 5 +- apps/sim/tools/firecrawl/extract.ts | 5 +- apps/sim/tools/index.ts | 4 +- apps/sim/tools/utils.ts | 5 +- apps/sim/trigger.config.ts | 2 +- 29 files changed, 390 insertions(+), 127 deletions(-) create mode 100644 apps/sim/lib/core/execution-limits/index.ts create mode 100644 apps/sim/lib/core/execution-limits/types.ts diff --git a/apps/sim/app/api/mcp/serve/[serverId]/route.ts b/apps/sim/app/api/mcp/serve/[serverId]/route.ts index 2359d9019..9d9f917bd 100644 --- a/apps/sim/app/api/mcp/serve/[serverId]/route.ts +++ b/apps/sim/app/api/mcp/serve/[serverId]/route.ts @@ -21,6 +21,7 @@ import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { checkHybridAuth } from '@/lib/auth/hybrid' import { generateInternalToken } from '@/lib/auth/internal' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { getBaseUrl } from '@/lib/core/utils/urls' const logger = createLogger('WorkflowMcpServeAPI') @@ -264,7 +265,7 @@ async function handleToolsCall( method: 'POST', headers, body: JSON.stringify({ input: params.arguments || {}, triggerType: 'mcp' }), - signal: AbortSignal.timeout(600000), // 10 minute timeout + signal: AbortSignal.timeout(getMaxExecutionTimeout()), }) const executeResult = await response.json() diff --git a/apps/sim/app/api/mcp/tools/execute/route.ts b/apps/sim/app/api/mcp/tools/execute/route.ts index fe0736ba1..8371b88c2 100644 --- a/apps/sim/app/api/mcp/tools/execute/route.ts +++ b/apps/sim/app/api/mcp/tools/execute/route.ts @@ -1,5 +1,7 @@ import { createLogger } from '@sim/logger' import type { NextRequest } from 'next/server' +import { getHighestPrioritySubscription } from '@/lib/billing/core/plan' +import { getExecutionTimeout } from '@/lib/core/execution-limits' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' import { mcpService } from '@/lib/mcp/service' import type { McpTool, McpToolCall, McpToolResult } from '@/lib/mcp/types' @@ -7,7 +9,6 @@ import { categorizeError, createMcpErrorResponse, createMcpSuccessResponse, - MCP_CONSTANTS, validateStringParam, } from '@/lib/mcp/utils' @@ -171,13 +172,13 @@ export const POST = withMcpAuth('read')( arguments: args, } + const userSubscription = await getHighestPrioritySubscription(userId) + const executionTimeout = getExecutionTimeout(userSubscription?.plan, 'sync') + const result = await Promise.race([ mcpService.executeTool(userId, serverId, toolCall, workspaceId), new Promise((_, reject) => - setTimeout( - () => reject(new Error('Tool execution timeout')), - MCP_CONSTANTS.EXECUTION_TIMEOUT - ) + setTimeout(() => reject(new Error('Tool execution timeout')), executionTimeout) ), ]) diff --git a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts index dd59f3338..03654e61b 100644 --- a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts @@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' import { z } from 'zod' import { checkHybridAuth } from '@/lib/auth/hybrid' +import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits' import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { markExecutionCancelled } from '@/lib/execution/cancellation' @@ -116,6 +117,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId) const abortController = new AbortController() let isStreamClosed = false + let isTimedOut = false + + const syncTimeout = preprocessResult.executionTimeout?.sync + let timeoutId: NodeJS.Timeout | undefined + if (syncTimeout) { + timeoutId = setTimeout(() => { + isTimedOut = true + abortController.abort() + }, syncTimeout) + } const stream = new ReadableStream({ async start(controller) { @@ -167,13 +178,33 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }) if (result.status === 'cancelled') { - sendEvent({ - type: 'execution:cancelled', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { duration: result.metadata?.duration || 0 }, - }) + if (isTimedOut && syncTimeout) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout) + logger.info(`[${requestId}] Run-from-block execution timed out`, { + timeoutMs: syncTimeout, + }) + + await loggingSession.markAsFailed(timeoutErrorMessage) + + sendEvent({ + type: 'execution:error', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + error: timeoutErrorMessage, + duration: result.metadata?.duration || 0, + }, + }) + } else { + sendEvent({ + type: 'execution:cancelled', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { duration: result.metadata?.duration || 0 }, + }) + } } else { sendEvent({ type: 'execution:completed', @@ -190,11 +221,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }) } } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' - logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`) + const isTimeout = isTimeoutError(error) || isTimedOut + const errorMessage = isTimeout + ? getTimeoutErrorMessage(error, syncTimeout) + : error instanceof Error + ? error.message + : 'Unknown error' + + logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`, { + isTimeout, + }) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + await loggingSession.safeCompleteWithError({ + totalDurationMs: executionResult?.metadata?.duration, + error: { message: errorMessage }, + traceSpans: executionResult?.logs as any, + }) + sendEvent({ type: 'execution:error', timestamp: new Date().toISOString(), @@ -206,6 +251,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, }) } finally { + if (timeoutId) clearTimeout(timeoutId) if (!isStreamClosed) { try { controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')) @@ -216,6 +262,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, cancel() { isStreamClosed = true + if (timeoutId) clearTimeout(timeoutId) abortController.abort() markExecutionCancelled(executionId).catch(() => {}) }, diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 53161e42a..e7905aa53 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -5,6 +5,7 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid' import { z } from 'zod' import { checkHybridAuth } from '@/lib/auth/hybrid' import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags' +import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits' import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getBaseUrl } from '@/lib/core/utils/urls' @@ -120,10 +121,6 @@ type AsyncExecutionParams = { triggerType: CoreTriggerType } -/** - * Handles async workflow execution by queueing a background job. - * Returns immediately with a 202 Accepted response containing the job ID. - */ async function handleAsyncExecution(params: AsyncExecutionParams): Promise { const { requestId, workflowId, userId, input, triggerType } = params @@ -405,6 +402,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: if (!enableSSE) { logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`) + const syncTimeout = preprocessResult.executionTimeout?.sync try { const metadata: ExecutionMetadata = { requestId, @@ -438,6 +436,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: includeFileBase64, base64MaxBytes, stopAfterBlockId, + abortSignal: syncTimeout ? AbortSignal.timeout(syncTimeout) : undefined, }) const outputWithBase64 = includeFileBase64 @@ -473,11 +472,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: return NextResponse.json(filteredResult) } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' - logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`) + const isTimeout = isTimeoutError(error) + const errorMessage = isTimeout + ? getTimeoutErrorMessage(error, syncTimeout) + : error instanceof Error + ? error.message + : 'Unknown error' + + logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`, { isTimeout }) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + await loggingSession.safeCompleteWithError({ + totalDurationMs: executionResult?.metadata?.duration, + error: { message: errorMessage }, + traceSpans: executionResult?.logs as any, + }) + return NextResponse.json( { success: false, @@ -491,7 +502,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } : undefined, }, - { status: 500 } + { status: isTimeout ? 408 : 500 } ) } } @@ -537,6 +548,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const encoder = new TextEncoder() const abortController = new AbortController() let isStreamClosed = false + let isTimedOut = false + + const syncTimeout = preprocessResult.executionTimeout?.sync + let timeoutId: NodeJS.Timeout | undefined + if (syncTimeout) { + timeoutId = setTimeout(() => { + isTimedOut = true + abortController.abort() + }, syncTimeout) + } const stream = new ReadableStream({ async start(controller) { @@ -763,16 +784,35 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } if (result.status === 'cancelled') { - logger.info(`[${requestId}] Workflow execution was cancelled`) - sendEvent({ - type: 'execution:cancelled', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - duration: result.metadata?.duration || 0, - }, - }) + if (isTimedOut && syncTimeout) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout) + logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: syncTimeout }) + + await loggingSession.markAsFailed(timeoutErrorMessage) + + sendEvent({ + type: 'execution:error', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + error: timeoutErrorMessage, + duration: result.metadata?.duration || 0, + }, + }) + } else { + logger.info(`[${requestId}] Workflow execution was cancelled`) + + sendEvent({ + type: 'execution:cancelled', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + duration: result.metadata?.duration || 0, + }, + }) + } return } @@ -799,11 +839,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: // Cleanup base64 cache for this execution await cleanupExecutionBase64Cache(executionId) } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' - logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`) + const isTimeout = isTimeoutError(error) || isTimedOut + const errorMessage = isTimeout + ? getTimeoutErrorMessage(error, syncTimeout) + : error instanceof Error + ? error.message + : 'Unknown error' + + logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout }) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + await loggingSession.safeCompleteWithError({ + totalDurationMs: executionResult?.metadata?.duration, + error: { message: errorMessage }, + traceSpans: executionResult?.logs as any, + }) + sendEvent({ type: 'execution:error', timestamp: new Date().toISOString(), @@ -815,18 +867,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, }) } finally { + if (timeoutId) clearTimeout(timeoutId) if (!isStreamClosed) { try { controller.enqueue(encoder.encode('data: [DONE]\n\n')) controller.close() - } catch { - // Stream already closed - nothing to do - } + } catch {} } } }, cancel() { isStreamClosed = true + if (timeoutId) clearTimeout(timeoutId) logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`) abortController.abort() markExecutionCancelled(executionId).catch(() => {}) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 6dcad6c17..74b3df7ad 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -27,7 +27,7 @@ import { useExecutionStore } from '@/stores/execution' import { useNotificationStore } from '@/stores/notifications' import { useVariablesStore } from '@/stores/panel' import { useEnvironmentStore } from '@/stores/settings/environment' -import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal' +import { useTerminalConsoleStore } from '@/stores/terminal' import { useWorkflowDiffStore } from '@/stores/workflow-diff' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { mergeSubblockState } from '@/stores/workflows/utils' @@ -1153,30 +1153,29 @@ export function useWorkflowExecution() { logs: accumulatedBlockLogs, } - // Only add workflow-level error if no blocks have executed yet - // This catches pre-execution errors (validation, serialization, etc.) - // Block execution errors are already logged via onBlockError callback - const { entries } = useTerminalConsoleStore.getState() - const existingLogs = entries.filter( - (log: ConsoleEntry) => log.executionId === executionId - ) + if (activeWorkflowId) { + cancelRunningEntries(activeWorkflowId) + } - if (existingLogs.length === 0) { - // No blocks executed yet - this is a pre-execution error - addConsole({ - input: {}, - output: {}, - success: false, - error: data.error, - durationMs: data.duration || 0, - startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(), - endedAt: new Date().toISOString(), - workflowId: activeWorkflowId, - blockId: 'validation', - executionId, - blockName: 'Workflow Validation', - blockType: 'validation', - }) + addConsole({ + input: {}, + output: {}, + success: false, + error: data.error, + durationMs: data.duration || 0, + startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(), + endedAt: new Date().toISOString(), + workflowId: activeWorkflowId, + blockId: 'workflow-error', + executionId, + blockName: 'Workflow Error', + blockType: 'error', + }) + }, + + onExecutionCancelled: () => { + if (activeWorkflowId) { + cancelRunningEntries(activeWorkflowId) } }, }, @@ -1718,13 +1717,28 @@ export function useWorkflowExecution() { 'Workflow was modified. Run the workflow again to enable running from block.', workflowId, }) - } else { - addNotification({ - level: 'error', - message: data.error || 'Run from block failed', - workflowId, - }) } + + cancelRunningEntries(workflowId) + + addConsole({ + input: {}, + output: {}, + success: false, + error: data.error, + durationMs: data.duration || 0, + startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(), + endedAt: new Date().toISOString(), + workflowId, + blockId: 'workflow-error', + executionId, + blockName: 'Workflow Error', + blockType: 'error', + }) + }, + + onExecutionCancelled: () => { + cancelRunningEntries(workflowId) }, }, }) diff --git a/apps/sim/executor/constants.ts b/apps/sim/executor/constants.ts index 8cfbaed58..dcbe957db 100644 --- a/apps/sim/executor/constants.ts +++ b/apps/sim/executor/constants.ts @@ -185,10 +185,16 @@ export const HTTP = { }, } as const +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' + export const AGENT = { DEFAULT_MODEL: 'claude-sonnet-4-5', - DEFAULT_FUNCTION_TIMEOUT: 600000, - REQUEST_TIMEOUT: 600000, + get DEFAULT_FUNCTION_TIMEOUT() { + return getMaxExecutionTimeout() + }, + get REQUEST_TIMEOUT() { + return getMaxExecutionTimeout() + }, CUSTOM_TOOL_PREFIX: 'custom_', } as const diff --git a/apps/sim/executor/handlers/wait/wait-handler.ts b/apps/sim/executor/handlers/wait/wait-handler.ts index 5d62509f0..09ce055ec 100644 --- a/apps/sim/executor/handlers/wait/wait-handler.ts +++ b/apps/sim/executor/handlers/wait/wait-handler.ts @@ -14,7 +14,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise = const { signal, executionId } = options const useRedis = isRedisCancellationEnabled() && !!executionId - if (!useRedis && signal?.aborted) { + if (signal?.aborted) { return false } @@ -27,7 +27,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise = const cleanup = () => { if (mainTimeoutId) clearTimeout(mainTimeoutId) if (checkIntervalId) clearInterval(checkIntervalId) - if (!useRedis && signal) signal.removeEventListener('abort', onAbort) + if (signal) signal.removeEventListener('abort', onAbort) } const onAbort = () => { @@ -37,6 +37,10 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise = resolve(false) } + if (signal) { + signal.addEventListener('abort', onAbort, { once: true }) + } + if (useRedis) { checkIntervalId = setInterval(async () => { if (resolved) return @@ -49,8 +53,6 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise = } } catch {} }, CANCELLATION_CHECK_INTERVAL_MS) - } else if (signal) { - signal.addEventListener('abort', onAbort, { once: true }) } mainTimeoutId = setTimeout(() => { diff --git a/apps/sim/lib/a2a/constants.ts b/apps/sim/lib/a2a/constants.ts index 902715185..d55dec2f5 100644 --- a/apps/sim/lib/a2a/constants.ts +++ b/apps/sim/lib/a2a/constants.ts @@ -1,7 +1,10 @@ export { AGENT_CARD_PATH } from '@a2a-js/sdk' + +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' + export const A2A_PROTOCOL_VERSION = '0.3.0' -export const A2A_DEFAULT_TIMEOUT = 300000 +export const A2A_DEFAULT_TIMEOUT = DEFAULT_EXECUTION_TIMEOUT_MS /** * Maximum number of messages stored per task in the database. diff --git a/apps/sim/lib/copilot/tools/client/base-tool.ts b/apps/sim/lib/copilot/tools/client/base-tool.ts index 75b02bfe2..522dc1d04 100644 --- a/apps/sim/lib/copilot/tools/client/base-tool.ts +++ b/apps/sim/lib/copilot/tools/client/base-tool.ts @@ -5,11 +5,9 @@ import type { ToolUIConfig } from './ui-config' const baseToolLogger = createLogger('BaseClientTool') -/** Default timeout for tool execution (5 minutes) */ -const DEFAULT_TOOL_TIMEOUT_MS = 2 * 60 * 1000 +const DEFAULT_TOOL_TIMEOUT_MS = 5 * 60 * 1000 -/** Timeout for tools that run workflows (10 minutes) */ -export const WORKFLOW_EXECUTION_TIMEOUT_MS = 10 * 60 * 1000 +export const WORKFLOW_EXECUTION_TIMEOUT_MS = 5 * 60 * 1000 // Client tool call states used by the new runtime export enum ClientToolCallState { diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 6bd9df299..60dfb73b3 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -170,6 +170,11 @@ export const env = createEnv({ RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute + EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'), + EXECUTION_TIMEOUT_PRO: z.string().optional().default('3600'), + EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3600'), + EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3600'), + // Knowledge Base Processing Configuration - Shared across all processing methods KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes) KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts diff --git a/apps/sim/lib/core/execution-limits/index.ts b/apps/sim/lib/core/execution-limits/index.ts new file mode 100644 index 000000000..c9f6f047d --- /dev/null +++ b/apps/sim/lib/core/execution-limits/index.ts @@ -0,0 +1 @@ +export * from './types' diff --git a/apps/sim/lib/core/execution-limits/types.ts b/apps/sim/lib/core/execution-limits/types.ts new file mode 100644 index 000000000..e7e69e232 --- /dev/null +++ b/apps/sim/lib/core/execution-limits/types.ts @@ -0,0 +1,122 @@ +import { env } from '@/lib/core/config/env' +import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types' + +export interface ExecutionTimeoutConfig { + sync: number + async: number +} + +const DEFAULT_SYNC_TIMEOUTS = { + free: 300, + pro: 3600, + team: 3600, + enterprise: 3600, +} as const + +const ASYNC_TIMEOUT_SECONDS = 5400 + +function getSyncTimeoutForPlan(plan: SubscriptionPlan): number { + const envVarMap: Record = { + free: env.EXECUTION_TIMEOUT_FREE, + pro: env.EXECUTION_TIMEOUT_PRO, + team: env.EXECUTION_TIMEOUT_TEAM, + enterprise: env.EXECUTION_TIMEOUT_ENTERPRISE, + } + return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS[plan]) * 1000 +} + +export const EXECUTION_TIMEOUTS: Record = { + free: { + sync: getSyncTimeoutForPlan('free'), + async: ASYNC_TIMEOUT_SECONDS * 1000, + }, + pro: { + sync: getSyncTimeoutForPlan('pro'), + async: ASYNC_TIMEOUT_SECONDS * 1000, + }, + team: { + sync: getSyncTimeoutForPlan('team'), + async: ASYNC_TIMEOUT_SECONDS * 1000, + }, + enterprise: { + sync: getSyncTimeoutForPlan('enterprise'), + async: ASYNC_TIMEOUT_SECONDS * 1000, + }, +} + +export function getExecutionTimeout( + plan: SubscriptionPlan | undefined, + type: 'sync' | 'async' = 'sync' +): number { + return EXECUTION_TIMEOUTS[plan || 'free'][type] +} + +export function getExecutionTimeoutSeconds( + plan: SubscriptionPlan | undefined, + type: 'sync' | 'async' = 'sync' +): number { + return Math.floor(getExecutionTimeout(plan, type) / 1000) +} + +export function getMaxExecutionTimeout(): number { + return EXECUTION_TIMEOUTS.enterprise.async +} + +export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync + +export class ExecutionTimeoutError extends Error { + constructor( + public readonly timeoutMs: number, + public readonly plan?: SubscriptionPlan + ) { + const timeoutSeconds = Math.floor(timeoutMs / 1000) + const timeoutMinutes = Math.floor(timeoutSeconds / 60) + const displayTime = + timeoutMinutes > 0 + ? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}` + : `${timeoutSeconds} seconds` + super(`Execution timed out after ${displayTime}`) + this.name = 'ExecutionTimeoutError' + } +} + +export function isTimeoutError(error: unknown): boolean { + if (error instanceof ExecutionTimeoutError) return true + if (!(error instanceof Error)) return false + + const name = error.name.toLowerCase() + const message = error.message.toLowerCase() + + return ( + name === 'timeouterror' || + name === 'aborterror' || + message.includes('timeout') || + message.includes('timed out') || + message.includes('aborted') + ) +} + +export function createTimeoutError( + timeoutMs: number, + plan?: SubscriptionPlan +): ExecutionTimeoutError { + return new ExecutionTimeoutError(timeoutMs, plan) +} + +export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): string { + if (error instanceof ExecutionTimeoutError) { + return error.message + } + + if (timeoutMs) { + const timeoutSeconds = Math.floor(timeoutMs / 1000) + const timeoutMinutes = Math.floor(timeoutSeconds / 60) + const displayTime = + timeoutMinutes > 0 + ? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}` + : `${timeoutSeconds} seconds` + return `Execution timed out after ${displayTime}` + } + + return 'Execution timed out' +} diff --git a/apps/sim/lib/execution/constants.ts b/apps/sim/lib/execution/constants.ts index bf095770b..faca9679b 100644 --- a/apps/sim/lib/execution/constants.ts +++ b/apps/sim/lib/execution/constants.ts @@ -1,7 +1,3 @@ -/** - * Execution timeout constants - * - * DEFAULT_EXECUTION_TIMEOUT_MS: The default timeout for executing user code (10 minutes) - */ +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' -export const DEFAULT_EXECUTION_TIMEOUT_MS = 600000 // 10 minutes (600 seconds) +export { DEFAULT_EXECUTION_TIMEOUT_MS } diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index 5bbb834e8..f14b78e2d 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' +import { getExecutionTimeout } from '@/lib/core/execution-limits' import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' @@ -133,10 +134,10 @@ export interface PreprocessExecutionResult { success: boolean error?: { message: string - statusCode: number // HTTP status code (401, 402, 403, 404, 429, 500) - logCreated: boolean // Whether error was logged to execution_logs + statusCode: number + logCreated: boolean } - actorUserId?: string // The user ID that will be billed + actorUserId?: string workflowRecord?: WorkflowRecord userSubscription?: SubscriptionInfo | null rateLimitInfo?: { @@ -144,6 +145,10 @@ export interface PreprocessExecutionResult { remaining: number resetAt: Date } + executionTimeout?: { + sync: number + async: number + } } type WorkflowRecord = typeof workflow.$inferSelect @@ -484,12 +489,17 @@ export async function preprocessExecution( triggerType, }) + const plan = userSubscription?.plan return { success: true, actorUserId, workflowRecord, userSubscription, rateLimitInfo, + executionTimeout: { + sync: getExecutionTimeout(plan, 'sync'), + async: getExecutionTimeout(plan, 'async'), + }, } } diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index fd3ae55ba..be1515686 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -776,11 +776,16 @@ export class LoggingSession { await db .update(workflowExecutionLogs) .set({ + level: 'error', status: 'failed', executionData: sql`jsonb_set( - COALESCE(execution_data, '{}'::jsonb), - ARRAY['error'], - to_jsonb(${message}::text) + jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + ARRAY['error'], + to_jsonb(${message}::text) + ), + ARRAY['finalOutput'], + jsonb_build_object('error', ${message}::text) )`, }) .where(eq(workflowExecutionLogs.executionId, executionId)) diff --git a/apps/sim/lib/mcp/client.ts b/apps/sim/lib/mcp/client.ts index 5fdb4adac..b65e9a145 100644 --- a/apps/sim/lib/mcp/client.ts +++ b/apps/sim/lib/mcp/client.ts @@ -12,6 +12,7 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js' import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js' import type { ListToolsResult, Tool } from '@modelcontextprotocol/sdk/types.js' import { createLogger } from '@sim/logger' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { McpConnectionError, type McpConnectionStatus, @@ -202,7 +203,7 @@ export class McpClient { const sdkResult = await this.client.callTool( { name: toolCall.name, arguments: toolCall.arguments }, undefined, - { timeout: 600000 } // 10 minutes - override SDK's 60s default + { timeout: getMaxExecutionTimeout() } ) return sdkResult as McpToolResult diff --git a/apps/sim/lib/mcp/shared.ts b/apps/sim/lib/mcp/shared.ts index 3f3bdae66..33d844199 100644 --- a/apps/sim/lib/mcp/shared.ts +++ b/apps/sim/lib/mcp/shared.ts @@ -34,7 +34,7 @@ export function sanitizeHeaders( * Client-safe MCP constants */ export const MCP_CLIENT_CONSTANTS = { - CLIENT_TIMEOUT: 600000, + CLIENT_TIMEOUT: 5 * 60 * 1000, MAX_RETRIES: 3, RECONNECT_DELAY: 1000, } as const diff --git a/apps/sim/lib/mcp/utils.test.ts b/apps/sim/lib/mcp/utils.test.ts index 1b0cabf98..f46b3932d 100644 --- a/apps/sim/lib/mcp/utils.test.ts +++ b/apps/sim/lib/mcp/utils.test.ts @@ -81,8 +81,8 @@ describe('generateMcpServerId', () => { }) describe('MCP_CONSTANTS', () => { - it.concurrent('has correct execution timeout (10 minutes)', () => { - expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(600000) + it.concurrent('has correct execution timeout (5 minutes)', () => { + expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(300000) }) it.concurrent('has correct cache timeout (5 minutes)', () => { @@ -107,8 +107,8 @@ describe('MCP_CONSTANTS', () => { }) describe('MCP_CLIENT_CONSTANTS', () => { - it.concurrent('has correct client timeout (10 minutes)', () => { - expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(600000) + it.concurrent('has correct client timeout (5 minutes)', () => { + expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(300000) }) it.concurrent('has correct auto refresh interval (5 minutes)', () => { diff --git a/apps/sim/lib/mcp/utils.ts b/apps/sim/lib/mcp/utils.ts index ab28a66ee..58bf16c84 100644 --- a/apps/sim/lib/mcp/utils.ts +++ b/apps/sim/lib/mcp/utils.ts @@ -1,12 +1,11 @@ import { NextResponse } from 'next/server' +import { DEFAULT_EXECUTION_TIMEOUT_MS, getExecutionTimeout } from '@/lib/core/execution-limits' +import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types' import type { McpApiResponse } from '@/lib/mcp/types' import { isMcpTool, MCP } from '@/executor/constants' -/** - * MCP-specific constants - */ export const MCP_CONSTANTS = { - EXECUTION_TIMEOUT: 600000, + EXECUTION_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS, CACHE_TIMEOUT: 5 * 60 * 1000, DEFAULT_RETRIES: 3, DEFAULT_CONNECTION_TIMEOUT: 30000, @@ -14,6 +13,10 @@ export const MCP_CONSTANTS = { MAX_CONSECUTIVE_FAILURES: 3, } as const +export function getMcpExecutionTimeout(plan?: SubscriptionPlan): number { + return getExecutionTimeout(plan, 'sync') +} + /** * Core MCP tool parameter keys that are metadata, not user-entered test values. * These should be preserved when cleaning up params during schema updates. @@ -45,11 +48,8 @@ export function sanitizeHeaders( ) } -/** - * Client-safe MCP constants - */ export const MCP_CLIENT_CONSTANTS = { - CLIENT_TIMEOUT: 600000, + CLIENT_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS, AUTO_REFRESH_INTERVAL: 5 * 60 * 1000, } as const diff --git a/apps/sim/lib/workflows/executor/execution-events.ts b/apps/sim/lib/workflows/executor/execution-events.ts index 6c3998e23..ee6e75b56 100644 --- a/apps/sim/lib/workflows/executor/execution-events.ts +++ b/apps/sim/lib/workflows/executor/execution-events.ts @@ -62,9 +62,6 @@ export interface ExecutionErrorEvent extends BaseExecutionEvent { } } -/** - * Execution cancelled event - */ export interface ExecutionCancelledEvent extends BaseExecutionEvent { type: 'execution:cancelled' workflowId: string @@ -171,9 +168,6 @@ export type ExecutionEvent = | StreamChunkEvent | StreamDoneEvent -/** - * Extracted data types for use in callbacks - */ export type ExecutionStartedData = ExecutionStartedEvent['data'] export type ExecutionCompletedData = ExecutionCompletedEvent['data'] export type ExecutionErrorData = ExecutionErrorEvent['data'] diff --git a/apps/sim/tools/apify/run_actor_async.ts b/apps/sim/tools/apify/run_actor_async.ts index 0eeef731d..ca600e11d 100644 --- a/apps/sim/tools/apify/run_actor_async.ts +++ b/apps/sim/tools/apify/run_actor_async.ts @@ -1,8 +1,9 @@ +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { RunActorParams, RunActorResult } from '@/tools/apify/types' import type { ToolConfig } from '@/tools/types' -const POLL_INTERVAL_MS = 5000 // 5 seconds between polls -const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time +const POLL_INTERVAL_MS = 5000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS export const apifyRunActorAsyncTool: ToolConfig = { id: 'apify_run_actor_async', diff --git a/apps/sim/tools/browser_use/run_task.ts b/apps/sim/tools/browser_use/run_task.ts index 9dbeeb5b6..6f30e8a03 100644 --- a/apps/sim/tools/browser_use/run_task.ts +++ b/apps/sim/tools/browser_use/run_task.ts @@ -1,11 +1,12 @@ import { createLogger } from '@sim/logger' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types' import type { ToolConfig, ToolResponse } from '@/tools/types' const logger = createLogger('BrowserUseTool') const POLL_INTERVAL_MS = 5000 -const MAX_POLL_TIME_MS = 600000 // 10 minutes +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS const MAX_CONSECUTIVE_ERRORS = 3 async function createSessionWithProfile( diff --git a/apps/sim/tools/exa/research.ts b/apps/sim/tools/exa/research.ts index 95f08cd07..8af21576f 100644 --- a/apps/sim/tools/exa/research.ts +++ b/apps/sim/tools/exa/research.ts @@ -1,11 +1,12 @@ import { createLogger } from '@sim/logger' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { ExaResearchParams, ExaResearchResponse } from '@/tools/exa/types' import type { ToolConfig } from '@/tools/types' const logger = createLogger('ExaResearchTool') -const POLL_INTERVAL_MS = 5000 // 5 seconds between polls -const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time +const POLL_INTERVAL_MS = 5000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS export const researchTool: ToolConfig = { id: 'exa_research', diff --git a/apps/sim/tools/firecrawl/agent.ts b/apps/sim/tools/firecrawl/agent.ts index 9b5c2e691..bd4b4f41e 100644 --- a/apps/sim/tools/firecrawl/agent.ts +++ b/apps/sim/tools/firecrawl/agent.ts @@ -1,11 +1,12 @@ import { createLogger } from '@sim/logger' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { AgentParams, AgentResponse } from '@/tools/firecrawl/types' import type { ToolConfig } from '@/tools/types' const logger = createLogger('FirecrawlAgentTool') -const POLL_INTERVAL_MS = 5000 // 5 seconds between polls -const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time +const POLL_INTERVAL_MS = 5000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS export const agentTool: ToolConfig = { id: 'firecrawl_agent', diff --git a/apps/sim/tools/firecrawl/crawl.ts b/apps/sim/tools/firecrawl/crawl.ts index d490994ff..04886632d 100644 --- a/apps/sim/tools/firecrawl/crawl.ts +++ b/apps/sim/tools/firecrawl/crawl.ts @@ -1,12 +1,13 @@ import { createLogger } from '@sim/logger' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { FirecrawlCrawlParams, FirecrawlCrawlResponse } from '@/tools/firecrawl/types' import { CRAWLED_PAGE_OUTPUT_PROPERTIES } from '@/tools/firecrawl/types' import type { ToolConfig } from '@/tools/types' const logger = createLogger('FirecrawlCrawlTool') -const POLL_INTERVAL_MS = 5000 // 5 seconds between polls -const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time +const POLL_INTERVAL_MS = 5000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS export const crawlTool: ToolConfig = { id: 'firecrawl_crawl', diff --git a/apps/sim/tools/firecrawl/extract.ts b/apps/sim/tools/firecrawl/extract.ts index 86d76d502..a82aa3eb2 100644 --- a/apps/sim/tools/firecrawl/extract.ts +++ b/apps/sim/tools/firecrawl/extract.ts @@ -1,11 +1,12 @@ import { createLogger } from '@sim/logger' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { ExtractParams, ExtractResponse } from '@/tools/firecrawl/types' import type { ToolConfig } from '@/tools/types' const logger = createLogger('FirecrawlExtractTool') -const POLL_INTERVAL_MS = 5000 // 5 seconds between polls -const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time +const POLL_INTERVAL_MS = 5000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS export const extractTool: ToolConfig = { id: 'firecrawl_extract', diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index 3b1c0f15b..b2f36adf6 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -1,5 +1,6 @@ import { createLogger } from '@sim/logger' import { generateInternalToken } from '@/lib/auth/internal' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import { secureFetchWithPinnedIP, validateUrlWithDNS } from '@/lib/core/security/input-validation' import { generateRequestId } from '@/lib/core/utils/request' import { getBaseUrl } from '@/lib/core/utils/urls' @@ -625,9 +626,8 @@ async function executeToolRequest( let response: Response if (isInternalRoute) { - // Set up AbortController for timeout support on internal routes const controller = new AbortController() - const timeout = requestParams.timeout || 300000 + const timeout = requestParams.timeout || DEFAULT_EXECUTION_TIMEOUT_MS const timeoutId = setTimeout(() => controller.abort(), timeout) try { diff --git a/apps/sim/tools/utils.ts b/apps/sim/tools/utils.ts index 12ab81772..c20b8720e 100644 --- a/apps/sim/tools/utils.ts +++ b/apps/sim/tools/utils.ts @@ -1,4 +1,5 @@ import { createLogger } from '@sim/logger' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { getBaseUrl } from '@/lib/core/utils/urls' import { AGENT, isCustomTool } from '@/executor/constants' import { getCustomTool } from '@/hooks/queries/custom-tools' @@ -123,9 +124,7 @@ export function formatRequestParams(tool: ToolConfig, params: Record