Mirror of https://github.com/simstudioai/sim.git, synced 2026-02-03 03:04:57 -05:00
feat(timeouts): execution timeout limits
@@ -21,6 +21,7 @@ import { and, eq } from 'drizzle-orm'
  import { type NextRequest, NextResponse } from 'next/server'
  import { checkHybridAuth } from '@/lib/auth/hybrid'
  import { generateInternalToken } from '@/lib/auth/internal'
+ import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
  import { getBaseUrl } from '@/lib/core/utils/urls'

  const logger = createLogger('WorkflowMcpServeAPI')
@@ -264,7 +265,7 @@ async function handleToolsCall(
      method: 'POST',
      headers,
      body: JSON.stringify({ input: params.arguments || {}, triggerType: 'mcp' }),
-     signal: AbortSignal.timeout(600000), // 10 minute timeout
+     signal: AbortSignal.timeout(getMaxExecutionTimeout()),
    })

    const executeResult = await response.json()
@@ -1,5 +1,7 @@
  import { createLogger } from '@sim/logger'
  import type { NextRequest } from 'next/server'
+ import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
+ import { getExecutionTimeout } from '@/lib/core/execution-limits'
  import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
  import { mcpService } from '@/lib/mcp/service'
  import type { McpTool, McpToolCall, McpToolResult } from '@/lib/mcp/types'
@@ -7,7 +9,6 @@ import {
    categorizeError,
    createMcpErrorResponse,
    createMcpSuccessResponse,
-   MCP_CONSTANTS,
    validateStringParam,
  } from '@/lib/mcp/utils'
@@ -171,13 +172,13 @@ export const POST = withMcpAuth('read')(
      arguments: args,
    }

+   const userSubscription = await getHighestPrioritySubscription(userId)
+   const executionTimeout = getExecutionTimeout(userSubscription?.plan, 'sync')
+
    const result = await Promise.race([
      mcpService.executeTool(userId, serverId, toolCall, workspaceId),
      new Promise<never>((_, reject) =>
-       setTimeout(
-         () => reject(new Error('Tool execution timeout')),
-         MCP_CONSTANTS.EXECUTION_TIMEOUT
-       )
+       setTimeout(() => reject(new Error('Tool execution timeout')), executionTimeout)
      ),
    ])
@@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server'
  import { v4 as uuidv4 } from 'uuid'
  import { z } from 'zod'
  import { checkHybridAuth } from '@/lib/auth/hybrid'
+ import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits'
  import { generateRequestId } from '@/lib/core/utils/request'
  import { SSE_HEADERS } from '@/lib/core/utils/sse'
  import { markExecutionCancelled } from '@/lib/execution/cancellation'
@@ -116,6 +117,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
  const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
  const abortController = new AbortController()
  let isStreamClosed = false
+ let isTimedOut = false
+
+ const syncTimeout = preprocessResult.executionTimeout?.sync
+ let timeoutId: NodeJS.Timeout | undefined
+ if (syncTimeout) {
+   timeoutId = setTimeout(() => {
+     isTimedOut = true
+     abortController.abort()
+   }, syncTimeout)
+ }

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
@@ -167,13 +178,33 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
        })

        if (result.status === 'cancelled') {
-         sendEvent({
-           type: 'execution:cancelled',
-           timestamp: new Date().toISOString(),
-           executionId,
-           workflowId,
-           data: { duration: result.metadata?.duration || 0 },
-         })
+         if (isTimedOut && syncTimeout) {
+           const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
+           logger.info(`[${requestId}] Run-from-block execution timed out`, {
+             timeoutMs: syncTimeout,
+           })
+
+           await loggingSession.markAsFailed(timeoutErrorMessage)
+
+           sendEvent({
+             type: 'execution:error',
+             timestamp: new Date().toISOString(),
+             executionId,
+             workflowId,
+             data: {
+               error: timeoutErrorMessage,
+               duration: result.metadata?.duration || 0,
+             },
+           })
+         } else {
+           sendEvent({
+             type: 'execution:cancelled',
+             timestamp: new Date().toISOString(),
+             executionId,
+             workflowId,
+             data: { duration: result.metadata?.duration || 0 },
+           })
+         }
        } else {
          sendEvent({
            type: 'execution:completed',
@@ -190,11 +221,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
          })
        }
      } catch (error: unknown) {
-       const errorMessage = error instanceof Error ? error.message : 'Unknown error'
-       logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
+       const isTimeout = isTimeoutError(error) || isTimedOut
+       const errorMessage = isTimeout
+         ? getTimeoutErrorMessage(error, syncTimeout)
+         : error instanceof Error
+           ? error.message
+           : 'Unknown error'
+
+       logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`, {
+         isTimeout,
+       })

        const executionResult = hasExecutionResult(error) ? error.executionResult : undefined

+       await loggingSession.safeCompleteWithError({
+         totalDurationMs: executionResult?.metadata?.duration,
+         error: { message: errorMessage },
+         traceSpans: executionResult?.logs as any,
+       })
+
        sendEvent({
          type: 'execution:error',
          timestamp: new Date().toISOString(),
@@ -206,6 +251,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
          },
        })
      } finally {
+       if (timeoutId) clearTimeout(timeoutId)
        if (!isStreamClosed) {
          try {
            controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
@@ -216,6 +262,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
      },
      cancel() {
        isStreamClosed = true
+       if (timeoutId) clearTimeout(timeoutId)
        abortController.abort()
        markExecutionCancelled(executionId).catch(() => {})
      },
@@ -5,6 +5,7 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
  import { z } from 'zod'
  import { checkHybridAuth } from '@/lib/auth/hybrid'
  import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
+ import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits'
  import { generateRequestId } from '@/lib/core/utils/request'
  import { SSE_HEADERS } from '@/lib/core/utils/sse'
  import { getBaseUrl } from '@/lib/core/utils/urls'
@@ -120,10 +121,6 @@ type AsyncExecutionParams = {
    triggerType: CoreTriggerType
  }

- /**
-  * Handles async workflow execution by queueing a background job.
-  * Returns immediately with a 202 Accepted response containing the job ID.
-  */
  async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
    const { requestId, workflowId, userId, input, triggerType } = params
@@ -405,6 +402,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:

  if (!enableSSE) {
    logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
+   const syncTimeout = preprocessResult.executionTimeout?.sync
    try {
      const metadata: ExecutionMetadata = {
        requestId,
@@ -438,6 +436,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
        includeFileBase64,
        base64MaxBytes,
        stopAfterBlockId,
+       abortSignal: syncTimeout ? AbortSignal.timeout(syncTimeout) : undefined,
      })

      const outputWithBase64 = includeFileBase64
@@ -473,11 +472,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:

      return NextResponse.json(filteredResult)
    } catch (error: unknown) {
-     const errorMessage = error instanceof Error ? error.message : 'Unknown error'
-     logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
+     const isTimeout = isTimeoutError(error)
+     const errorMessage = isTimeout
+       ? getTimeoutErrorMessage(error, syncTimeout)
+       : error instanceof Error
+         ? error.message
+         : 'Unknown error'
+
+     logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`, { isTimeout })

      const executionResult = hasExecutionResult(error) ? error.executionResult : undefined

+     await loggingSession.safeCompleteWithError({
+       totalDurationMs: executionResult?.metadata?.duration,
+       error: { message: errorMessage },
+       traceSpans: executionResult?.logs as any,
+     })
+
      return NextResponse.json(
        {
          success: false,
@@ -491,7 +502,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
            }
          : undefined,
        },
-       { status: 500 }
+       { status: isTimeout ? 408 : 500 }
      )
    }
  }
@@ -537,6 +548,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
  const encoder = new TextEncoder()
  const abortController = new AbortController()
  let isStreamClosed = false
+ let isTimedOut = false
+
+ const syncTimeout = preprocessResult.executionTimeout?.sync
+ let timeoutId: NodeJS.Timeout | undefined
+ if (syncTimeout) {
+   timeoutId = setTimeout(() => {
+     isTimedOut = true
+     abortController.abort()
+   }, syncTimeout)
+ }

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
@@ -763,16 +784,35 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
        }

        if (result.status === 'cancelled') {
-         logger.info(`[${requestId}] Workflow execution was cancelled`)
-         sendEvent({
-           type: 'execution:cancelled',
-           timestamp: new Date().toISOString(),
-           executionId,
-           workflowId,
-           data: {
-             duration: result.metadata?.duration || 0,
-           },
-         })
+         if (isTimedOut && syncTimeout) {
+           const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
+           logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: syncTimeout })
+
+           await loggingSession.markAsFailed(timeoutErrorMessage)
+
+           sendEvent({
+             type: 'execution:error',
+             timestamp: new Date().toISOString(),
+             executionId,
+             workflowId,
+             data: {
+               error: timeoutErrorMessage,
+               duration: result.metadata?.duration || 0,
+             },
+           })
+         } else {
+           logger.info(`[${requestId}] Workflow execution was cancelled`)
+
+           sendEvent({
+             type: 'execution:cancelled',
+             timestamp: new Date().toISOString(),
+             executionId,
+             workflowId,
+             data: {
+               duration: result.metadata?.duration || 0,
+             },
+           })
+         }
          return
        }
@@ -799,11 +839,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
        // Cleanup base64 cache for this execution
        await cleanupExecutionBase64Cache(executionId)
      } catch (error: unknown) {
-       const errorMessage = error instanceof Error ? error.message : 'Unknown error'
-       logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
+       const isTimeout = isTimeoutError(error) || isTimedOut
+       const errorMessage = isTimeout
+         ? getTimeoutErrorMessage(error, syncTimeout)
+         : error instanceof Error
+           ? error.message
+           : 'Unknown error'
+
+       logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout })

        const executionResult = hasExecutionResult(error) ? error.executionResult : undefined

+       await loggingSession.safeCompleteWithError({
+         totalDurationMs: executionResult?.metadata?.duration,
+         error: { message: errorMessage },
+         traceSpans: executionResult?.logs as any,
+       })
+
        sendEvent({
          type: 'execution:error',
          timestamp: new Date().toISOString(),
@@ -815,18 +867,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
          },
        })
      } finally {
+       if (timeoutId) clearTimeout(timeoutId)
        if (!isStreamClosed) {
          try {
            controller.enqueue(encoder.encode('data: [DONE]\n\n'))
            controller.close()
-         } catch {
-           // Stream already closed - nothing to do
-         }
+         } catch {}
        }
      }
    },
    cancel() {
      isStreamClosed = true
+     if (timeoutId) clearTimeout(timeoutId)
      logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
      abortController.abort()
      markExecutionCancelled(executionId).catch(() => {})
@@ -27,7 +27,7 @@ import { useExecutionStore } from '@/stores/execution'
  import { useNotificationStore } from '@/stores/notifications'
  import { useVariablesStore } from '@/stores/panel'
  import { useEnvironmentStore } from '@/stores/settings/environment'
- import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal'
+ import { useTerminalConsoleStore } from '@/stores/terminal'
  import { useWorkflowDiffStore } from '@/stores/workflow-diff'
  import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
  import { mergeSubblockState } from '@/stores/workflows/utils'
@@ -1153,30 +1153,29 @@ export function useWorkflowExecution() {
            logs: accumulatedBlockLogs,
          }

-         // Only add workflow-level error if no blocks have executed yet
-         // This catches pre-execution errors (validation, serialization, etc.)
-         // Block execution errors are already logged via onBlockError callback
-         const { entries } = useTerminalConsoleStore.getState()
-         const existingLogs = entries.filter(
-           (log: ConsoleEntry) => log.executionId === executionId
-         )
+         if (activeWorkflowId) {
+           cancelRunningEntries(activeWorkflowId)
+         }

-         if (existingLogs.length === 0) {
-           // No blocks executed yet - this is a pre-execution error
-           addConsole({
-             input: {},
-             output: {},
-             success: false,
-             error: data.error,
-             durationMs: data.duration || 0,
-             startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
-             endedAt: new Date().toISOString(),
-             workflowId: activeWorkflowId,
-             blockId: 'validation',
-             executionId,
-             blockName: 'Workflow Validation',
-             blockType: 'validation',
-           })
+         addConsole({
+           input: {},
+           output: {},
+           success: false,
+           error: data.error,
+           durationMs: data.duration || 0,
+           startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
+           endedAt: new Date().toISOString(),
+           workflowId: activeWorkflowId,
+           blockId: 'workflow-error',
+           executionId,
+           blockName: 'Workflow Error',
+           blockType: 'error',
+         })
        },

+       onExecutionCancelled: () => {
+         if (activeWorkflowId) {
+           cancelRunningEntries(activeWorkflowId)
+         }
+       },
      },
@@ -1718,13 +1717,28 @@ export function useWorkflowExecution() {
            'Workflow was modified. Run the workflow again to enable running from block.',
          workflowId,
        })
      } else {
        addNotification({
          level: 'error',
          message: data.error || 'Run from block failed',
          workflowId,
        })
      }

+     cancelRunningEntries(workflowId)
+
+     addConsole({
+       input: {},
+       output: {},
+       success: false,
+       error: data.error,
+       durationMs: data.duration || 0,
+       startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
+       endedAt: new Date().toISOString(),
+       workflowId,
+       blockId: 'workflow-error',
+       executionId,
+       blockName: 'Workflow Error',
+       blockType: 'error',
+     })
    },

+   onExecutionCancelled: () => {
+     cancelRunningEntries(workflowId)
+   },
  },
})
@@ -185,10 +185,16 @@ export const HTTP = {
  },
} as const

+ import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
+
export const AGENT = {
  DEFAULT_MODEL: 'claude-sonnet-4-5',
- DEFAULT_FUNCTION_TIMEOUT: 600000,
- REQUEST_TIMEOUT: 600000,
+ get DEFAULT_FUNCTION_TIMEOUT() {
+   return getMaxExecutionTimeout()
+ },
+ get REQUEST_TIMEOUT() {
+   return getMaxExecutionTimeout()
+ },
  CUSTOM_TOOL_PREFIX: 'custom_',
} as const
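Note: the hard-coded 10-minute literals above become getters, presumably so this constants module no longer needs the limit at its own evaluation time and instead reads it when the property is accessed. A minimal sketch of the difference; STATIC_REQUEST_TIMEOUT and AGENT_SKETCH are illustrative names, not part of the commit:

import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'

// A plain property would be computed once, when this constants module is evaluated:
const STATIC_REQUEST_TIMEOUT = getMaxExecutionTimeout()

// A getter defers the call until the property is read at the call site:
const AGENT_SKETCH = {
  get REQUEST_TIMEOUT() {
    return getMaxExecutionTimeout()
  },
} as const

// Both yield a number of milliseconds when accessed:
console.log(STATIC_REQUEST_TIMEOUT, AGENT_SKETCH.REQUEST_TIMEOUT)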
@@ -14,7 +14,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
  const { signal, executionId } = options
  const useRedis = isRedisCancellationEnabled() && !!executionId

- if (!useRedis && signal?.aborted) {
+ if (signal?.aborted) {
    return false
  }
@@ -27,7 +27,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
  const cleanup = () => {
    if (mainTimeoutId) clearTimeout(mainTimeoutId)
    if (checkIntervalId) clearInterval(checkIntervalId)
-   if (!useRedis && signal) signal.removeEventListener('abort', onAbort)
+   if (signal) signal.removeEventListener('abort', onAbort)
  }

  const onAbort = () => {
@@ -37,6 +37,10 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
    resolve(false)
  }

+ if (signal) {
+   signal.addEventListener('abort', onAbort, { once: true })
+ }
+
  if (useRedis) {
    checkIntervalId = setInterval(async () => {
      if (resolved) return
@@ -49,8 +53,6 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
      }
    } catch {}
  }, CANCELLATION_CHECK_INTERVAL_MS)
- } else if (signal) {
-   signal.addEventListener('abort', onAbort, { once: true })
  }

  mainTimeoutId = setTimeout(() => {
@@ -1,7 +1,10 @@
export { AGENT_CARD_PATH } from '@a2a-js/sdk'

+ import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
+
export const A2A_PROTOCOL_VERSION = '0.3.0'

- export const A2A_DEFAULT_TIMEOUT = 300000
+ export const A2A_DEFAULT_TIMEOUT = DEFAULT_EXECUTION_TIMEOUT_MS

/**
 * Maximum number of messages stored per task in the database.
@@ -5,11 +5,9 @@ import type { ToolUIConfig } from './ui-config'

const baseToolLogger = createLogger('BaseClientTool')

- /** Default timeout for tool execution (5 minutes) */
- const DEFAULT_TOOL_TIMEOUT_MS = 2 * 60 * 1000
+ const DEFAULT_TOOL_TIMEOUT_MS = 5 * 60 * 1000

- /** Timeout for tools that run workflows (10 minutes) */
- export const WORKFLOW_EXECUTION_TIMEOUT_MS = 10 * 60 * 1000
+ export const WORKFLOW_EXECUTION_TIMEOUT_MS = 5 * 60 * 1000

// Client tool call states used by the new runtime
export enum ClientToolCallState {
@@ -170,6 +170,11 @@ export const env = createEnv({
    RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute
    RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute

+   EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'),
+   EXECUTION_TIMEOUT_PRO: z.string().optional().default('3600'),
+   EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3600'),
+   EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3600'),
+
    // Knowledge Base Processing Configuration - Shared across all processing methods
    KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
    KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts
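Note: these env values are strings of seconds; the new execution-limits module added below multiplies them by 1000 and falls back to the per-plan defaults when they are unset or unparsable. A minimal sketch of how an operator override would resolve; the 1800 value is illustrative, not a default shipped by the commit:

import { getExecutionTimeout, getExecutionTimeoutSeconds } from '@/lib/core/execution-limits'

// With EXECUTION_TIMEOUT_PRO=1800 set in the environment (seconds):
const proSyncMs = getExecutionTimeout('pro', 'sync') // 1800 * 1000 = 1_800_000 ms
const proSyncSeconds = getExecutionTimeoutSeconds('pro', 'sync') // 1800

// Unset or non-numeric values fall back to the defaults above (free: 300 s, others: 3600 s):
const freeSyncMs = getExecutionTimeout('free', 'sync') // 300_000 ms unless EXECUTION_TIMEOUT_FREE overrides it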
apps/sim/lib/core/execution-limits/index.ts (new file, 1 line)
@@ -0,0 +1 @@
+ export * from './types'

apps/sim/lib/core/execution-limits/types.ts (new file, 122 lines)
@@ -0,0 +1,122 @@
import { env } from '@/lib/core/config/env'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'

export interface ExecutionTimeoutConfig {
  sync: number
  async: number
}

const DEFAULT_SYNC_TIMEOUTS = {
  free: 300,
  pro: 3600,
  team: 3600,
  enterprise: 3600,
} as const

const ASYNC_TIMEOUT_SECONDS = 5400

function getSyncTimeoutForPlan(plan: SubscriptionPlan): number {
  const envVarMap: Record<SubscriptionPlan, string | undefined> = {
    free: env.EXECUTION_TIMEOUT_FREE,
    pro: env.EXECUTION_TIMEOUT_PRO,
    team: env.EXECUTION_TIMEOUT_TEAM,
    enterprise: env.EXECUTION_TIMEOUT_ENTERPRISE,
  }
  return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS[plan]) * 1000
}

export const EXECUTION_TIMEOUTS: Record<SubscriptionPlan, ExecutionTimeoutConfig> = {
  free: {
    sync: getSyncTimeoutForPlan('free'),
    async: ASYNC_TIMEOUT_SECONDS * 1000,
  },
  pro: {
    sync: getSyncTimeoutForPlan('pro'),
    async: ASYNC_TIMEOUT_SECONDS * 1000,
  },
  team: {
    sync: getSyncTimeoutForPlan('team'),
    async: ASYNC_TIMEOUT_SECONDS * 1000,
  },
  enterprise: {
    sync: getSyncTimeoutForPlan('enterprise'),
    async: ASYNC_TIMEOUT_SECONDS * 1000,
  },
}

export function getExecutionTimeout(
  plan: SubscriptionPlan | undefined,
  type: 'sync' | 'async' = 'sync'
): number {
  return EXECUTION_TIMEOUTS[plan || 'free'][type]
}

export function getExecutionTimeoutSeconds(
  plan: SubscriptionPlan | undefined,
  type: 'sync' | 'async' = 'sync'
): number {
  return Math.floor(getExecutionTimeout(plan, type) / 1000)
}

export function getMaxExecutionTimeout(): number {
  return EXECUTION_TIMEOUTS.enterprise.async
}

export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync

export class ExecutionTimeoutError extends Error {
  constructor(
    public readonly timeoutMs: number,
    public readonly plan?: SubscriptionPlan
  ) {
    const timeoutSeconds = Math.floor(timeoutMs / 1000)
    const timeoutMinutes = Math.floor(timeoutSeconds / 60)
    const displayTime =
      timeoutMinutes > 0
        ? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}`
        : `${timeoutSeconds} seconds`
    super(`Execution timed out after ${displayTime}`)
    this.name = 'ExecutionTimeoutError'
  }
}

export function isTimeoutError(error: unknown): boolean {
  if (error instanceof ExecutionTimeoutError) return true
  if (!(error instanceof Error)) return false

  const name = error.name.toLowerCase()
  const message = error.message.toLowerCase()

  return (
    name === 'timeouterror' ||
    name === 'aborterror' ||
    message.includes('timeout') ||
    message.includes('timed out') ||
    message.includes('aborted')
  )
}

export function createTimeoutError(
  timeoutMs: number,
  plan?: SubscriptionPlan
): ExecutionTimeoutError {
  return new ExecutionTimeoutError(timeoutMs, plan)
}

export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): string {
  if (error instanceof ExecutionTimeoutError) {
    return error.message
  }

  if (timeoutMs) {
    const timeoutSeconds = Math.floor(timeoutMs / 1000)
    const timeoutMinutes = Math.floor(timeoutSeconds / 60)
    const displayTime =
      timeoutMinutes > 0
        ? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}`
        : `${timeoutSeconds} seconds`
    return `Execution timed out after ${displayTime}`
  }

  return 'Execution timed out'
}
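Note: taken together, the route handlers in this commit consume the new module roughly as follows. This is a condensed sketch, not code from the commit: runWorkflow is a placeholder for the real executor entry point, and the error handling mirrors the SSE and non-SSE handlers above.

import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import {
  getExecutionTimeout,
  getTimeoutErrorMessage,
  isTimeoutError,
} from '@/lib/core/execution-limits'

// Placeholder for the real executor call used by the routes.
declare function runWorkflow(options: { abortSignal?: AbortSignal }): Promise<unknown>

async function executeWithPlanTimeout(plan: SubscriptionPlan | undefined) {
  const syncTimeout = getExecutionTimeout(plan, 'sync') // milliseconds for this plan

  try {
    // The abort signal cancels the run once the plan's synchronous budget is spent.
    return await runWorkflow({ abortSignal: AbortSignal.timeout(syncTimeout) })
  } catch (error) {
    if (isTimeoutError(error)) {
      // Produces a user-facing message like "Execution timed out after 60 minutes".
      throw new Error(getTimeoutErrorMessage(error, syncTimeout))
    }
    throw error
  }
}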
@@ -1,7 +1,3 @@
- /**
-  * Execution timeout constants
-  *
-  * DEFAULT_EXECUTION_TIMEOUT_MS: The default timeout for executing user code (10 minutes)
-  */
+ import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'

- export const DEFAULT_EXECUTION_TIMEOUT_MS = 600000 // 10 minutes (600 seconds)
+ export { DEFAULT_EXECUTION_TIMEOUT_MS }
@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
  import { eq } from 'drizzle-orm'
  import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor'
  import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
+ import { getExecutionTimeout } from '@/lib/core/execution-limits'
  import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
  import { LoggingSession } from '@/lib/logs/execution/logging-session'
  import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
@@ -133,10 +134,10 @@ export interface PreprocessExecutionResult {
  success: boolean
  error?: {
    message: string
-   statusCode: number // HTTP status code (401, 402, 403, 404, 429, 500)
-   logCreated: boolean // Whether error was logged to execution_logs
+   statusCode: number
+   logCreated: boolean
  }
- actorUserId?: string // The user ID that will be billed
+ actorUserId?: string
  workflowRecord?: WorkflowRecord
  userSubscription?: SubscriptionInfo | null
  rateLimitInfo?: {
@@ -144,6 +145,10 @@
    remaining: number
    resetAt: Date
  }
+ executionTimeout?: {
+   sync: number
+   async: number
+ }
}

type WorkflowRecord = typeof workflow.$inferSelect
@@ -484,12 +489,17 @@ export async function preprocessExecution(
    triggerType,
  })

+ const plan = userSubscription?.plan
  return {
    success: true,
    actorUserId,
    workflowRecord,
    userSubscription,
    rateLimitInfo,
+   executionTimeout: {
+     sync: getExecutionTimeout(plan, 'sync'),
+     async: getExecutionTimeout(plan, 'async'),
+   },
  }
}
@@ -776,11 +776,16 @@
    await db
      .update(workflowExecutionLogs)
      .set({
        level: 'error',
        status: 'failed',
        executionData: sql`jsonb_set(
-         COALESCE(execution_data, '{}'::jsonb),
-         ARRAY['error'],
-         to_jsonb(${message}::text)
+         jsonb_set(
+           COALESCE(execution_data, '{}'::jsonb),
+           ARRAY['error'],
+           to_jsonb(${message}::text)
+         ),
+         ARRAY['finalOutput'],
+         jsonb_build_object('error', ${message}::text)
        )`,
      })
      .where(eq(workflowExecutionLogs.executionId, executionId))
@@ -12,6 +12,7 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js'
  import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'
  import type { ListToolsResult, Tool } from '@modelcontextprotocol/sdk/types.js'
  import { createLogger } from '@sim/logger'
+ import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
  import {
    McpConnectionError,
    type McpConnectionStatus,
@@ -202,7 +203,7 @@
      const sdkResult = await this.client.callTool(
        { name: toolCall.name, arguments: toolCall.arguments },
        undefined,
-       { timeout: 600000 } // 10 minutes - override SDK's 60s default
+       { timeout: getMaxExecutionTimeout() }
      )

      return sdkResult as McpToolResult
@@ -34,7 +34,7 @@ export function sanitizeHeaders(
 * Client-safe MCP constants
 */
export const MCP_CLIENT_CONSTANTS = {
- CLIENT_TIMEOUT: 600000,
+ CLIENT_TIMEOUT: 5 * 60 * 1000,
  MAX_RETRIES: 3,
  RECONNECT_DELAY: 1000,
} as const
@@ -81,8 +81,8 @@ describe('generateMcpServerId', () => {
})

describe('MCP_CONSTANTS', () => {
- it.concurrent('has correct execution timeout (10 minutes)', () => {
-   expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(600000)
+ it.concurrent('has correct execution timeout (5 minutes)', () => {
+   expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(300000)
  })

  it.concurrent('has correct cache timeout (5 minutes)', () => {
@@ -107,8 +107,8 @@ describe('MCP_CONSTANTS', () => {
})

describe('MCP_CLIENT_CONSTANTS', () => {
- it.concurrent('has correct client timeout (10 minutes)', () => {
-   expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(600000)
+ it.concurrent('has correct client timeout (5 minutes)', () => {
+   expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(300000)
  })

  it.concurrent('has correct auto refresh interval (5 minutes)', () => {
@@ -1,12 +1,11 @@
  import { NextResponse } from 'next/server'
+ import { DEFAULT_EXECUTION_TIMEOUT_MS, getExecutionTimeout } from '@/lib/core/execution-limits'
+ import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
  import type { McpApiResponse } from '@/lib/mcp/types'
  import { isMcpTool, MCP } from '@/executor/constants'

- /**
-  * MCP-specific constants
-  */
  export const MCP_CONSTANTS = {
-   EXECUTION_TIMEOUT: 600000,
+   EXECUTION_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
    CACHE_TIMEOUT: 5 * 60 * 1000,
    DEFAULT_RETRIES: 3,
    DEFAULT_CONNECTION_TIMEOUT: 30000,
@@ -14,6 +13,10 @@ export const MCP_CONSTANTS = {
    MAX_CONSECUTIVE_FAILURES: 3,
  } as const

+ export function getMcpExecutionTimeout(plan?: SubscriptionPlan): number {
+   return getExecutionTimeout(plan, 'sync')
+ }
+
  /**
   * Core MCP tool parameter keys that are metadata, not user-entered test values.
   * These should be preserved when cleaning up params during schema updates.
@@ -45,11 +48,8 @@ export function sanitizeHeaders(
    )
  }

- /**
-  * Client-safe MCP constants
-  */
  export const MCP_CLIENT_CONSTANTS = {
-   CLIENT_TIMEOUT: 600000,
+   CLIENT_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
    AUTO_REFRESH_INTERVAL: 5 * 60 * 1000,
  } as const
@@ -62,9 +62,6 @@ export interface ExecutionErrorEvent extends BaseExecutionEvent {
  }
}

- /**
-  * Execution cancelled event
-  */
export interface ExecutionCancelledEvent extends BaseExecutionEvent {
  type: 'execution:cancelled'
  workflowId: string
@@ -171,9 +168,6 @@ export type ExecutionEvent =
  | StreamChunkEvent
  | StreamDoneEvent

- /**
-  * Extracted data types for use in callbacks
-  */
export type ExecutionStartedData = ExecutionStartedEvent['data']
export type ExecutionCompletedData = ExecutionCompletedEvent['data']
export type ExecutionErrorData = ExecutionErrorEvent['data']
@@ -1,8 +1,9 @@
+ import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
  import type { RunActorParams, RunActorResult } from '@/tools/apify/types'
  import type { ToolConfig } from '@/tools/types'

- const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
- const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
+ const POLL_INTERVAL_MS = 5000
+ const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS

  export const apifyRunActorAsyncTool: ToolConfig<RunActorParams, RunActorResult> = {
    id: 'apify_run_actor_async',

@@ -1,11 +1,12 @@
  import { createLogger } from '@sim/logger'
+ import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
  import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types'
  import type { ToolConfig, ToolResponse } from '@/tools/types'

  const logger = createLogger('BrowserUseTool')

  const POLL_INTERVAL_MS = 5000
- const MAX_POLL_TIME_MS = 600000 // 10 minutes
+ const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
  const MAX_CONSECUTIVE_ERRORS = 3

  async function createSessionWithProfile(

@@ -1,11 +1,12 @@
  import { createLogger } from '@sim/logger'
+ import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
  import type { ExaResearchParams, ExaResearchResponse } from '@/tools/exa/types'
  import type { ToolConfig } from '@/tools/types'

  const logger = createLogger('ExaResearchTool')

- const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
- const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
+ const POLL_INTERVAL_MS = 5000
+ const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS

  export const researchTool: ToolConfig<ExaResearchParams, ExaResearchResponse> = {
    id: 'exa_research',

@@ -1,11 +1,12 @@
  import { createLogger } from '@sim/logger'
+ import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
  import type { AgentParams, AgentResponse } from '@/tools/firecrawl/types'
  import type { ToolConfig } from '@/tools/types'

  const logger = createLogger('FirecrawlAgentTool')

- const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
- const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
+ const POLL_INTERVAL_MS = 5000
+ const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS

  export const agentTool: ToolConfig<AgentParams, AgentResponse> = {
    id: 'firecrawl_agent',

@@ -1,12 +1,13 @@
  import { createLogger } from '@sim/logger'
+ import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
  import type { FirecrawlCrawlParams, FirecrawlCrawlResponse } from '@/tools/firecrawl/types'
  import { CRAWLED_PAGE_OUTPUT_PROPERTIES } from '@/tools/firecrawl/types'
  import type { ToolConfig } from '@/tools/types'

  const logger = createLogger('FirecrawlCrawlTool')

- const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
- const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
+ const POLL_INTERVAL_MS = 5000
+ const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS

  export const crawlTool: ToolConfig<FirecrawlCrawlParams, FirecrawlCrawlResponse> = {
    id: 'firecrawl_crawl',

@@ -1,11 +1,12 @@
  import { createLogger } from '@sim/logger'
+ import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
  import type { ExtractParams, ExtractResponse } from '@/tools/firecrawl/types'
  import type { ToolConfig } from '@/tools/types'

  const logger = createLogger('FirecrawlExtractTool')

- const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
- const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
+ const POLL_INTERVAL_MS = 5000
+ const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS

  export const extractTool: ToolConfig<ExtractParams, ExtractResponse> = {
    id: 'firecrawl_extract',
@@ -1,5 +1,6 @@
  import { createLogger } from '@sim/logger'
  import { generateInternalToken } from '@/lib/auth/internal'
+ import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
  import { secureFetchWithPinnedIP, validateUrlWithDNS } from '@/lib/core/security/input-validation'
  import { generateRequestId } from '@/lib/core/utils/request'
  import { getBaseUrl } from '@/lib/core/utils/urls'
@@ -625,9 +626,8 @@ async function executeToolRequest(
  let response: Response

  if (isInternalRoute) {
-   // Set up AbortController for timeout support on internal routes
    const controller = new AbortController()
-   const timeout = requestParams.timeout || 300000
+   const timeout = requestParams.timeout || DEFAULT_EXECUTION_TIMEOUT_MS
    const timeoutId = setTimeout(() => controller.abort(), timeout)

    try {
@@ -1,4 +1,5 @@
  import { createLogger } from '@sim/logger'
+ import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
  import { getBaseUrl } from '@/lib/core/utils/urls'
  import { AGENT, isCustomTool } from '@/executor/constants'
  import { getCustomTool } from '@/hooks/queries/custom-tools'
@@ -123,9 +124,7 @@ export function formatRequestParams(tool: ToolConfig, params: Record<string, any
    }
  }

- // Get timeout from params (if specified) and validate
- // Must be a finite positive number, max 600000ms (10 minutes) as documented
- const MAX_TIMEOUT_MS = 600000
+ const MAX_TIMEOUT_MS = getMaxExecutionTimeout()
  const rawTimeout = params.timeout
  const timeout = rawTimeout != null ? Number(rawTimeout) : undefined
  const validTimeout =
@@ -6,7 +6,7 @@ export default defineConfig({
  project: env.TRIGGER_PROJECT_ID!,
  runtime: 'node',
  logLevel: 'log',
- maxDuration: 600,
+ maxDuration: 5400,
  retries: {
    enabledInDev: false,
    default: {