Compare commits

..

2 Commits

Author SHA1 Message Date
Vikhyath Mondreti
501b44e05a fix type issues 2026-02-02 19:53:12 -08:00
Vikhyath Mondreti
7c1e7273de feat(timeouts): execution timeout limits 2026-02-02 19:24:09 -08:00
33 changed files with 482 additions and 567 deletions

View File

@@ -21,6 +21,7 @@ import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateInternalToken } from '@/lib/auth/internal'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getBaseUrl } from '@/lib/core/utils/urls'
const logger = createLogger('WorkflowMcpServeAPI')
@@ -264,7 +265,7 @@ async function handleToolsCall(
method: 'POST',
headers,
body: JSON.stringify({ input: params.arguments || {}, triggerType: 'mcp' }),
signal: AbortSignal.timeout(600000), // 10 minute timeout
signal: AbortSignal.timeout(getMaxExecutionTimeout()),
})
const executeResult = await response.json()

View File

@@ -1,5 +1,8 @@
import { createLogger } from '@sim/logger'
import type { NextRequest } from 'next/server'
import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
import { getExecutionTimeout } from '@/lib/core/execution-limits'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpService } from '@/lib/mcp/service'
import type { McpTool, McpToolCall, McpToolResult } from '@/lib/mcp/types'
@@ -7,7 +10,6 @@ import {
categorizeError,
createMcpErrorResponse,
createMcpSuccessResponse,
MCP_CONSTANTS,
validateStringParam,
} from '@/lib/mcp/utils'
@@ -171,13 +173,16 @@ export const POST = withMcpAuth('read')(
arguments: args,
}
const userSubscription = await getHighestPrioritySubscription(userId)
const executionTimeout = getExecutionTimeout(
userSubscription?.plan as SubscriptionPlan | undefined,
'sync'
)
const result = await Promise.race([
mcpService.executeTool(userId, serverId, toolCall, workspaceId),
new Promise<never>((_, reject) =>
setTimeout(
() => reject(new Error('Tool execution timeout')),
MCP_CONSTANTS.EXECUTION_TIMEOUT
)
setTimeout(() => reject(new Error('Tool execution timeout')), executionTimeout)
),
])

View File

@@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
@@ -116,6 +117,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
const abortController = new AbortController()
let isStreamClosed = false
let isTimedOut = false
const syncTimeout = preprocessResult.executionTimeout?.sync
let timeoutId: NodeJS.Timeout | undefined
if (syncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, syncTimeout)
}
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
@@ -167,13 +178,33 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
})
if (result.status === 'cancelled') {
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { duration: result.metadata?.duration || 0 },
})
if (isTimedOut && syncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
logger.info(`[${requestId}] Run-from-block execution timed out`, {
timeoutMs: syncTimeout,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: timeoutErrorMessage,
duration: result.metadata?.duration || 0,
},
})
} else {
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { duration: result.metadata?.duration || 0 },
})
}
} else {
sendEvent({
type: 'execution:completed',
@@ -190,11 +221,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
})
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
const isTimeout = isTimeoutError(error) || isTimedOut
const errorMessage = isTimeout
? getTimeoutErrorMessage(error, syncTimeout)
: error instanceof Error
? error.message
: 'Unknown error'
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`, {
isTimeout,
})
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
await loggingSession.safeCompleteWithError({
totalDurationMs: executionResult?.metadata?.duration,
error: { message: errorMessage },
traceSpans: executionResult?.logs as any,
})
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
@@ -206,6 +251,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
})
} finally {
if (timeoutId) clearTimeout(timeoutId)
if (!isStreamClosed) {
try {
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
@@ -216,6 +262,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
cancel() {
isStreamClosed = true
if (timeoutId) clearTimeout(timeoutId)
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})
},

View File

@@ -5,6 +5,7 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
@@ -120,10 +121,6 @@ type AsyncExecutionParams = {
triggerType: CoreTriggerType
}
/**
* Handles async workflow execution by queueing a background job.
* Returns immediately with a 202 Accepted response containing the job ID.
*/
async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
const { requestId, workflowId, userId, input, triggerType } = params
@@ -405,6 +402,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
if (!enableSSE) {
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
const syncTimeout = preprocessResult.executionTimeout?.sync
try {
const metadata: ExecutionMetadata = {
requestId,
@@ -438,6 +436,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
abortSignal: syncTimeout ? AbortSignal.timeout(syncTimeout) : undefined,
})
const outputWithBase64 = includeFileBase64
@@ -473,11 +472,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
return NextResponse.json(filteredResult)
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
const isTimeout = isTimeoutError(error)
const errorMessage = isTimeout
? getTimeoutErrorMessage(error, syncTimeout)
: error instanceof Error
? error.message
: 'Unknown error'
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`, { isTimeout })
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
await loggingSession.safeCompleteWithError({
totalDurationMs: executionResult?.metadata?.duration,
error: { message: errorMessage },
traceSpans: executionResult?.logs as any,
})
return NextResponse.json(
{
success: false,
@@ -491,7 +502,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
: undefined,
},
{ status: 500 }
{ status: isTimeout ? 408 : 500 }
)
}
}
@@ -537,6 +548,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const encoder = new TextEncoder()
const abortController = new AbortController()
let isStreamClosed = false
let isTimedOut = false
const syncTimeout = preprocessResult.executionTimeout?.sync
let timeoutId: NodeJS.Timeout | undefined
if (syncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, syncTimeout)
}
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
@@ -763,16 +784,35 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
if (result.status === 'cancelled') {
logger.info(`[${requestId}] Workflow execution was cancelled`)
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
duration: result.metadata?.duration || 0,
},
})
if (isTimedOut && syncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: syncTimeout })
await loggingSession.markAsFailed(timeoutErrorMessage)
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: timeoutErrorMessage,
duration: result.metadata?.duration || 0,
},
})
} else {
logger.info(`[${requestId}] Workflow execution was cancelled`)
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
duration: result.metadata?.duration || 0,
},
})
}
return
}
@@ -799,11 +839,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
// Cleanup base64 cache for this execution
await cleanupExecutionBase64Cache(executionId)
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
const isTimeout = isTimeoutError(error) || isTimedOut
const errorMessage = isTimeout
? getTimeoutErrorMessage(error, syncTimeout)
: error instanceof Error
? error.message
: 'Unknown error'
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout })
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
await loggingSession.safeCompleteWithError({
totalDurationMs: executionResult?.metadata?.duration,
error: { message: errorMessage },
traceSpans: executionResult?.logs as any,
})
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
@@ -815,18 +867,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
})
} finally {
if (timeoutId) clearTimeout(timeoutId)
if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
controller.close()
} catch {
// Stream already closed - nothing to do
}
} catch {}
}
}
},
cancel() {
isStreamClosed = true
if (timeoutId) clearTimeout(timeoutId)
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})

View File

@@ -1,7 +1,7 @@
import type React from 'react'
import { memo, useCallback, useEffect, useMemo, useRef, useState } from 'react'
import { createLogger } from '@sim/logger'
import { ChevronRight, Loader2, ServerIcon, WrenchIcon, XIcon } from 'lucide-react'
import { Loader2, WrenchIcon, XIcon } from 'lucide-react'
import { useParams } from 'next/navigation'
import {
Badge,
@@ -111,33 +111,18 @@ interface ToolInputProps {
* Represents a tool selected and configured in the workflow
*
* @remarks
* Valid types include:
* - Standard block types (e.g., 'api', 'search', 'function')
* - 'custom-tool': User-defined tools with custom code
* - 'mcp': Individual MCP tool from a connected server
* - 'mcp-server': All tools from an MCP server (agent discovery mode).
* At execution time, this expands into individual tool definitions for
* all tools available on the server.
*
* For custom tools (new format), we only store: type, customToolId, usageControl, isExpanded.
* Everything else (title, schema, code) is loaded dynamically from the database.
* Legacy custom tools with inline schema/code are still supported for backwards compatibility.
*/
interface StoredTool {
/**
* Block type identifier.
* 'mcp-server' enables server-level selection where all tools from
* the server are made available to the LLM at execution time.
*/
/** Block type identifier */
type: string
/** Display title for the tool (optional for new custom tool format) */
title?: string
/** Direct tool ID for execution (optional for new custom tool format) */
toolId?: string
/**
* Parameter values configured by the user.
* For 'mcp-server' type, includes: serverId, serverUrl, serverName, toolCount
*/
/** Parameter values configured by the user (optional for new custom tool format) */
params?: Record<string, string>
/** Whether the tool details are expanded in UI */
isExpanded?: boolean
@@ -1022,7 +1007,6 @@ export const ToolInput = memo(function ToolInput({
const [draggedIndex, setDraggedIndex] = useState<number | null>(null)
const [dragOverIndex, setDragOverIndex] = useState<number | null>(null)
const [usageControlPopoverIndex, setUsageControlPopoverIndex] = useState<number | null>(null)
const [expandedMcpServers, setExpandedMcpServers] = useState<Set<string>>(new Set())
const value = isPreview ? previewValue : storeValue
@@ -1252,18 +1236,6 @@ export const ToolInput = memo(function ToolInput({
return selectedTools.some((tool) => tool.type === 'mcp' && tool.toolId === mcpToolId)
}
/**
* Checks if an MCP server is already selected (all tools mode).
*
* @param serverId - The MCP server identifier to check
* @returns `true` if the MCP server is already selected
*/
const isMcpServerAlreadySelected = (serverId: string): boolean => {
return selectedTools.some(
(tool) => tool.type === 'mcp-server' && tool.params?.serverId === serverId
)
}
/**
* Checks if a custom tool is already selected.
*
@@ -1288,37 +1260,6 @@ export const ToolInput = memo(function ToolInput({
)
}
/**
* Groups MCP tools by their parent server.
*
* @returns Map of serverId to array of tools
*/
const mcpToolsByServer = useMemo(() => {
const grouped = new Map<string, typeof availableMcpTools>()
for (const tool of availableMcpTools) {
if (!grouped.has(tool.serverId)) {
grouped.set(tool.serverId, [])
}
grouped.get(tool.serverId)!.push(tool)
}
return grouped
}, [availableMcpTools])
/**
* Toggles the expanded state of an MCP server in the dropdown.
*/
const toggleMcpServerExpanded = useCallback((serverId: string) => {
setExpandedMcpServers((prev) => {
const next = new Set(prev)
if (next.has(serverId)) {
next.delete(serverId)
} else {
next.add(serverId)
}
return next
})
}, [])
/**
* Checks if a block supports multiple operations.
*
@@ -1864,125 +1805,41 @@ export const ToolInput = memo(function ToolInput({
})
}
// MCP Servers section - grouped by server with expandable folders
if (!permissionConfig.disableMcpTools && mcpToolsByServer.size > 0) {
// Create items for each server (as expandable folders)
const serverItems: ComboboxOption[] = []
for (const [serverId, tools] of mcpToolsByServer) {
const server = mcpServers.find((s) => s.id === serverId)
const serverName = tools[0]?.serverName || server?.name || 'Unknown Server'
const isExpanded = expandedMcpServers.has(serverId)
const serverAlreadySelected = isMcpServerAlreadySelected(serverId)
const toolCount = tools.length
// Server folder header (clickable to expand/collapse)
serverItems.push({
label: serverName,
value: `mcp-server-folder-${serverId}`,
iconElement: (
<div className='flex items-center gap-[4px]'>
<ChevronRight
className={cn(
'h-[12px] w-[12px] text-[var(--text-tertiary)] transition-transform',
isExpanded && 'rotate-90'
)}
/>
<div
className='flex h-[16px] w-[16px] flex-shrink-0 items-center justify-center rounded-[4px]'
style={{ background: '#6366F1' }}
>
<ServerIcon className='h-[10px] w-[10px] text-white' />
</div>
</div>
),
onSelect: () => {
toggleMcpServerExpanded(serverId)
},
disabled: false,
keepOpen: true, // Keep dropdown open when toggling folder expansion
})
// If expanded, show "Use all tools" option and individual tools
if (isExpanded) {
// "Use all tools from server" option
serverItems.push({
label: `Use all ${toolCount} tools`,
value: `mcp-server-all-${serverId}`,
iconElement: (
<div className='ml-[20px] flex h-[16px] w-[16px] flex-shrink-0 items-center justify-center rounded-[4px] bg-[#6366F1]'>
<McpIcon className='h-[10px] w-[10px] text-white' />
</div>
),
onSelect: () => {
if (serverAlreadySelected) return
// Remove any individual tools from this server that were previously selected
const filteredTools = selectedTools.filter(
(tool) => !(tool.type === 'mcp' && tool.params?.serverId === serverId)
)
const newTool: StoredTool = {
type: 'mcp-server',
title: `${serverName} (all tools)`,
toolId: `mcp-server-${serverId}`,
params: {
serverId,
...(server?.url && { serverUrl: server.url }),
serverName,
toolCount: String(toolCount),
},
isExpanded: false,
usageControl: 'auto',
}
setStoreValue([
...filteredTools.map((tool) => ({ ...tool, isExpanded: false })),
newTool,
])
setOpen(false)
},
disabled: isPreview || disabled || serverAlreadySelected,
})
// Individual tools from this server
for (const mcpTool of tools) {
const alreadySelected = isMcpToolAlreadySelected(mcpTool.id) || serverAlreadySelected
serverItems.push({
label: mcpTool.name,
value: `mcp-${mcpTool.id}`,
iconElement: (
<div className='ml-[20px]'>
{createToolIcon(mcpTool.bgColor || '#6366F1', mcpTool.icon || McpIcon)}
</div>
),
onSelect: () => {
if (alreadySelected) return
const newTool: StoredTool = {
type: 'mcp',
title: mcpTool.name,
toolId: mcpTool.id,
params: {
serverId: mcpTool.serverId,
...(server?.url && { serverUrl: server.url }),
toolName: mcpTool.name,
serverName: mcpTool.serverName,
},
isExpanded: true,
usageControl: 'auto',
schema: {
...mcpTool.inputSchema,
description: mcpTool.description,
},
}
handleMcpToolSelect(newTool, true)
},
disabled: isPreview || disabled || alreadySelected,
})
}
}
}
// MCP Tools section
if (!permissionConfig.disableMcpTools && availableMcpTools.length > 0) {
groups.push({
section: 'MCP Servers',
items: serverItems,
section: 'MCP Tools',
items: availableMcpTools.map((mcpTool) => {
const server = mcpServers.find((s) => s.id === mcpTool.serverId)
const alreadySelected = isMcpToolAlreadySelected(mcpTool.id)
return {
label: mcpTool.name,
value: `mcp-${mcpTool.id}`,
iconElement: createToolIcon(mcpTool.bgColor || '#6366F1', mcpTool.icon || McpIcon),
onSelect: () => {
if (alreadySelected) return
const newTool: StoredTool = {
type: 'mcp',
title: mcpTool.name,
toolId: mcpTool.id,
params: {
serverId: mcpTool.serverId,
...(server?.url && { serverUrl: server.url }),
toolName: mcpTool.name,
serverName: mcpTool.serverName,
},
isExpanded: true,
usageControl: 'auto',
schema: {
...mcpTool.inputSchema,
description: mcpTool.description,
},
}
handleMcpToolSelect(newTool, true)
},
disabled: isPreview || disabled || alreadySelected,
}
}),
})
}
@@ -2065,8 +1922,6 @@ export const ToolInput = memo(function ToolInput({
customTools,
availableMcpTools,
mcpServers,
mcpToolsByServer,
expandedMcpServers,
toolBlocks,
isPreview,
disabled,
@@ -2080,10 +1935,8 @@ export const ToolInput = memo(function ToolInput({
getToolIdForOperation,
isToolAlreadySelected,
isMcpToolAlreadySelected,
isMcpServerAlreadySelected,
isCustomToolAlreadySelected,
isWorkflowAlreadySelected,
toggleMcpServerExpanded,
])
const toolRequiresOAuth = (toolId: string): boolean => {
@@ -2510,25 +2363,24 @@ export const ToolInput = memo(function ToolInput({
{/* Selected Tools List */}
{selectedTools.length > 0 &&
selectedTools.map((tool, toolIndex) => {
// Handle custom tools, MCP tools, MCP servers, and workflow tools differently
// Handle custom tools, MCP tools, and workflow tools differently
const isCustomTool = tool.type === 'custom-tool'
const isMcpTool = tool.type === 'mcp'
const isMcpServer = tool.type === 'mcp-server'
const isWorkflowTool = tool.type === 'workflow'
const toolBlock =
!isCustomTool && !isMcpTool && !isMcpServer
!isCustomTool && !isMcpTool
? toolBlocks.find((block) => block.type === tool.type)
: null
// Get the current tool ID (may change based on operation)
const currentToolId =
!isCustomTool && !isMcpTool && !isMcpServer
!isCustomTool && !isMcpTool
? getToolIdForOperation(tool.type, tool.operation) || tool.toolId || ''
: tool.toolId || ''
// Get tool parameters using the new utility with block type for UI components
const toolParams =
!isCustomTool && !isMcpTool && !isMcpServer && currentToolId
!isCustomTool && !isMcpTool && currentToolId
? getToolParametersConfig(currentToolId, tool.type, {
operation: tool.operation,
...tool.params,
@@ -2597,32 +2449,21 @@ export const ToolInput = memo(function ToolInput({
? customToolParams
: isMcpTool
? mcpToolParams
: isMcpServer
? [] // MCP servers have no user-configurable params
: toolParams?.userInputParameters || []
: toolParams?.userInputParameters || []
// Check if tool requires OAuth
const requiresOAuth =
!isCustomTool &&
!isMcpTool &&
!isMcpServer &&
currentToolId &&
toolRequiresOAuth(currentToolId)
!isCustomTool && !isMcpTool && currentToolId && toolRequiresOAuth(currentToolId)
const oauthConfig =
!isCustomTool && !isMcpTool && !isMcpServer && currentToolId
? getToolOAuthConfig(currentToolId)
: null
!isCustomTool && !isMcpTool && currentToolId ? getToolOAuthConfig(currentToolId) : null
// Determine if tool has expandable body content
const hasOperations =
!isCustomTool && !isMcpTool && !isMcpServer && hasMultipleOperations(tool.type)
const hasOperations = !isCustomTool && !isMcpTool && hasMultipleOperations(tool.type)
const filteredDisplayParams = displayParams.filter((param) =>
evaluateParameterCondition(param, tool)
)
// MCP servers are expandable to show tool list
const hasToolBody = isMcpServer
? true
: hasOperations || (requiresOAuth && oauthConfig) || filteredDisplayParams.length > 0
const hasToolBody =
hasOperations || (requiresOAuth && oauthConfig) || filteredDisplayParams.length > 0
// Only show expansion if tool has body content
const isExpandedForDisplay = hasToolBody
@@ -2631,11 +2472,6 @@ export const ToolInput = memo(function ToolInput({
: !!tool.isExpanded
: false
// For MCP servers, get the list of tools for display
const mcpServerTools = isMcpServer
? availableMcpTools.filter((t) => t.serverId === tool.params?.serverId)
: []
return (
<div
key={`${tool.customToolId || tool.toolId || toolIndex}-${toolIndex}`}
@@ -2672,7 +2508,7 @@ export const ToolInput = memo(function ToolInput({
style={{
backgroundColor: isCustomTool
? '#3B82F6'
: isMcpTool || isMcpServer
: isMcpTool
? mcpTool?.bgColor || '#6366F1'
: isWorkflowTool
? '#6366F1'
@@ -2683,8 +2519,6 @@ export const ToolInput = memo(function ToolInput({
<WrenchIcon className='h-[10px] w-[10px] text-white' />
) : isMcpTool ? (
<IconComponent icon={McpIcon} className='h-[10px] w-[10px] text-white' />
) : isMcpServer ? (
<ServerIcon className='h-[10px] w-[10px] text-white' />
) : isWorkflowTool ? (
<IconComponent icon={WorkflowIcon} className='h-[10px] w-[10px] text-white' />
) : (
@@ -2697,11 +2531,6 @@ export const ToolInput = memo(function ToolInput({
<span className='truncate font-medium text-[13px] text-[var(--text-primary)]'>
{isCustomTool ? customToolTitle : tool.title}
</span>
{isMcpServer && (
<Badge variant='default' size='sm'>
{tool.params?.toolCount || mcpServerTools.length} tools
</Badge>
)}
{isMcpTool &&
!mcpDataLoading &&
(() => {
@@ -2807,53 +2636,31 @@ export const ToolInput = memo(function ToolInput({
{!isCustomTool && isExpandedForDisplay && (
<div className='flex flex-col gap-[10px] overflow-visible rounded-b-[4px] border-[var(--border-1)] border-t px-[8px] py-[8px]'>
{/* MCP Server tool list (read-only) */}
{isMcpServer && mcpServerTools.length > 0 && (
<div className='flex flex-col gap-[4px]'>
<div className='font-medium text-[12px] text-[var(--text-tertiary)]'>
Available tools:
</div>
<div className='flex flex-wrap gap-[4px]'>
{mcpServerTools.map((serverTool) => (
<Badge
key={serverTool.id}
variant='outline'
size='sm'
className='text-[11px]'
>
{serverTool.name}
</Badge>
))}
</div>
</div>
)}
{/* Operation dropdown for tools with multiple operations */}
{!isMcpServer &&
(() => {
const hasOperations = hasMultipleOperations(tool.type)
const operationOptions = hasOperations ? getOperationOptions(tool.type) : []
{(() => {
const hasOperations = hasMultipleOperations(tool.type)
const operationOptions = hasOperations ? getOperationOptions(tool.type) : []
return hasOperations && operationOptions.length > 0 ? (
<div className='relative space-y-[6px]'>
<div className='font-medium text-[13px] text-[var(--text-primary)]'>
Operation
</div>
<Combobox
options={operationOptions
.filter((option) => option.id !== '')
.map((option) => ({
label: option.label,
value: option.id,
}))}
value={tool.operation || operationOptions[0].id}
onChange={(value) => handleOperationChange(toolIndex, value)}
placeholder='Select operation'
disabled={disabled}
/>
return hasOperations && operationOptions.length > 0 ? (
<div className='relative space-y-[6px]'>
<div className='font-medium text-[13px] text-[var(--text-primary)]'>
Operation
</div>
) : null
})()}
<Combobox
options={operationOptions
.filter((option) => option.id !== '')
.map((option) => ({
label: option.label,
value: option.id,
}))}
value={tool.operation || operationOptions[0].id}
onChange={(value) => handleOperationChange(toolIndex, value)}
placeholder='Select operation'
disabled={disabled}
/>
</div>
) : null
})()}
{/* OAuth credential selector if required */}
{requiresOAuth && oauthConfig && (

View File

@@ -27,7 +27,7 @@ import { useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useVariablesStore } from '@/stores/panel'
import { useEnvironmentStore } from '@/stores/settings/environment'
import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal'
import { useTerminalConsoleStore } from '@/stores/terminal'
import { useWorkflowDiffStore } from '@/stores/workflow-diff'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { mergeSubblockState } from '@/stores/workflows/utils'
@@ -1153,30 +1153,29 @@ export function useWorkflowExecution() {
logs: accumulatedBlockLogs,
}
// Only add workflow-level error if no blocks have executed yet
// This catches pre-execution errors (validation, serialization, etc.)
// Block execution errors are already logged via onBlockError callback
const { entries } = useTerminalConsoleStore.getState()
const existingLogs = entries.filter(
(log: ConsoleEntry) => log.executionId === executionId
)
if (activeWorkflowId) {
cancelRunningEntries(activeWorkflowId)
}
if (existingLogs.length === 0) {
// No blocks executed yet - this is a pre-execution error
addConsole({
input: {},
output: {},
success: false,
error: data.error,
durationMs: data.duration || 0,
startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
endedAt: new Date().toISOString(),
workflowId: activeWorkflowId,
blockId: 'validation',
executionId,
blockName: 'Workflow Validation',
blockType: 'validation',
})
addConsole({
input: {},
output: {},
success: false,
error: data.error,
durationMs: data.duration || 0,
startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
endedAt: new Date().toISOString(),
workflowId: activeWorkflowId,
blockId: 'workflow-error',
executionId,
blockName: 'Workflow Error',
blockType: 'error',
})
},
onExecutionCancelled: () => {
if (activeWorkflowId) {
cancelRunningEntries(activeWorkflowId)
}
},
},
@@ -1718,13 +1717,28 @@ export function useWorkflowExecution() {
'Workflow was modified. Run the workflow again to enable running from block.',
workflowId,
})
} else {
addNotification({
level: 'error',
message: data.error || 'Run from block failed',
workflowId,
})
}
cancelRunningEntries(workflowId)
addConsole({
input: {},
output: {},
success: false,
error: data.error,
durationMs: data.duration || 0,
startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
endedAt: new Date().toISOString(),
workflowId,
blockId: 'workflow-error',
executionId,
blockName: 'Workflow Error',
blockType: 'error',
})
},
onExecutionCancelled: () => {
cancelRunningEntries(workflowId)
},
},
})

View File

@@ -52,8 +52,6 @@ export type ComboboxOption = {
onSelect?: () => void
/** Whether this option is disabled */
disabled?: boolean
/** When true, keep the dropdown open after selecting this option */
keepOpen?: boolean
}
/**
@@ -254,15 +252,13 @@ const Combobox = memo(
* Handles selection of an option
*/
const handleSelect = useCallback(
(selectedValue: string, customOnSelect?: () => void, keepOpen?: boolean) => {
(selectedValue: string, customOnSelect?: () => void) => {
// If option has custom onSelect, use it instead
if (customOnSelect) {
customOnSelect()
if (!keepOpen) {
setOpen(false)
setHighlightedIndex(-1)
setSearchQuery('')
}
setOpen(false)
setHighlightedIndex(-1)
setSearchQuery('')
return
}
@@ -274,13 +270,11 @@ const Combobox = memo(
onMultiSelectChange(newValues)
} else {
onChange?.(selectedValue)
if (!keepOpen) {
setOpen(false)
setHighlightedIndex(-1)
setSearchQuery('')
if (editable && inputRef.current) {
inputRef.current.blur()
}
setOpen(false)
setHighlightedIndex(-1)
setSearchQuery('')
if (editable && inputRef.current) {
inputRef.current.blur()
}
}
},
@@ -349,7 +343,7 @@ const Combobox = memo(
e.preventDefault()
const selectedOption = filteredOptions[highlightedIndex]
if (selectedOption && !selectedOption.disabled) {
handleSelect(selectedOption.value, selectedOption.onSelect, selectedOption.keepOpen)
handleSelect(selectedOption.value, selectedOption.onSelect)
}
} else if (!editable) {
e.preventDefault()
@@ -674,7 +668,7 @@ const Combobox = memo(
e.preventDefault()
e.stopPropagation()
if (!option.disabled) {
handleSelect(option.value, option.onSelect, option.keepOpen)
handleSelect(option.value, option.onSelect)
}
}}
onMouseEnter={() =>
@@ -749,7 +743,7 @@ const Combobox = memo(
e.preventDefault()
e.stopPropagation()
if (!option.disabled) {
handleSelect(option.value, option.onSelect, option.keepOpen)
handleSelect(option.value, option.onSelect)
}
}}
onMouseEnter={() => !option.disabled && setHighlightedIndex(index)}

View File

@@ -185,10 +185,16 @@ export const HTTP = {
},
} as const
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
export const AGENT = {
DEFAULT_MODEL: 'claude-sonnet-4-5',
DEFAULT_FUNCTION_TIMEOUT: 600000,
REQUEST_TIMEOUT: 600000,
get DEFAULT_FUNCTION_TIMEOUT() {
return getMaxExecutionTimeout()
},
get REQUEST_TIMEOUT() {
return getMaxExecutionTimeout()
},
CUSTOM_TOOL_PREFIX: 'custom_',
} as const

View File

@@ -143,7 +143,7 @@ export class AgentBlockHandler implements BlockHandler {
private async validateToolPermissions(ctx: ExecutionContext, tools: ToolInput[]): Promise<void> {
if (!Array.isArray(tools) || tools.length === 0) return
const hasMcpTools = tools.some((t) => t.type === 'mcp' || t.type === 'mcp-server')
const hasMcpTools = tools.some((t) => t.type === 'mcp')
const hasCustomTools = tools.some((t) => t.type === 'custom-tool')
if (hasMcpTools) {
@@ -161,7 +161,7 @@ export class AgentBlockHandler implements BlockHandler {
): Promise<ToolInput[]> {
if (!Array.isArray(tools) || tools.length === 0) return tools
const mcpTools = tools.filter((t) => t.type === 'mcp' || t.type === 'mcp-server')
const mcpTools = tools.filter((t) => t.type === 'mcp')
if (mcpTools.length === 0) return tools
const serverIds = [...new Set(mcpTools.map((t) => t.params?.serverId).filter(Boolean))]
@@ -195,7 +195,7 @@ export class AgentBlockHandler implements BlockHandler {
}
return tools.filter((tool) => {
if (tool.type !== 'mcp' && tool.type !== 'mcp-server') return true
if (tool.type !== 'mcp') return true
const serverId = tool.params?.serverId
if (!serverId) return false
return availableServerIds.has(serverId)
@@ -211,14 +211,11 @@ export class AgentBlockHandler implements BlockHandler {
})
const mcpTools: ToolInput[] = []
const mcpServers: ToolInput[] = []
const otherTools: ToolInput[] = []
for (const tool of filtered) {
if (tool.type === 'mcp') {
mcpTools.push(tool)
} else if (tool.type === 'mcp-server') {
mcpServers.push(tool)
} else {
otherTools.push(tool)
}
@@ -227,12 +224,7 @@ export class AgentBlockHandler implements BlockHandler {
const otherResults = await Promise.all(
otherTools.map(async (tool) => {
try {
if (
tool.type &&
tool.type !== 'custom-tool' &&
tool.type !== 'mcp' &&
tool.type !== 'mcp-server'
) {
if (tool.type && tool.type !== 'custom-tool' && tool.type !== 'mcp') {
await validateBlockType(ctx.userId, tool.type, ctx)
}
if (tool.type === 'custom-tool' && (tool.schema || tool.customToolId)) {
@@ -248,133 +240,12 @@ export class AgentBlockHandler implements BlockHandler {
const mcpResults = await this.processMcpToolsBatched(ctx, mcpTools)
// Process MCP servers (all tools from server mode)
const mcpServerResults = await this.processMcpServerSelections(ctx, mcpServers)
const allTools = [...otherResults, ...mcpResults, ...mcpServerResults]
const allTools = [...otherResults, ...mcpResults]
return allTools.filter(
(tool): tool is NonNullable<typeof tool> => tool !== null && tool !== undefined
)
}
/**
* Process MCP server selections by discovering and formatting all tools from each server.
* This enables "agent discovery" mode where the LLM can call any tool from the server.
*/
private async processMcpServerSelections(
ctx: ExecutionContext,
mcpServerSelections: ToolInput[]
): Promise<any[]> {
if (mcpServerSelections.length === 0) return []
const results: any[] = []
for (const serverSelection of mcpServerSelections) {
const serverId = serverSelection.params?.serverId
const serverName = serverSelection.params?.serverName
const usageControl = serverSelection.usageControl || 'auto'
if (!serverId) {
logger.error('MCP server selection missing serverId:', serverSelection)
continue
}
try {
// Discover all tools from this server
const discoveredTools = await this.discoverMcpToolsForServer(ctx, serverId)
// Create tool definitions for each discovered tool
for (const mcpTool of discoveredTools) {
const created = await this.createMcpToolFromDiscoveredServerTool(
ctx,
mcpTool,
serverId,
serverName || serverId,
usageControl
)
if (created) results.push(created)
}
logger.info(
`[AgentHandler] Expanded MCP server ${serverName} into ${discoveredTools.length} tools`
)
} catch (error) {
logger.error(`[AgentHandler] Failed to process MCP server selection:`, { serverId, error })
}
}
return results
}
/**
* Create an MCP tool from server discovery for the "all tools" mode.
*/
private async createMcpToolFromDiscoveredServerTool(
ctx: ExecutionContext,
mcpTool: any,
serverId: string,
serverName: string,
usageControl: string
): Promise<any> {
const toolName = mcpTool.name
const { filterSchemaForLLM } = await import('@/tools/params')
const filteredSchema = filterSchemaForLLM(
mcpTool.inputSchema || { type: 'object', properties: {} },
{}
)
const toolId = createMcpToolId(serverId, toolName)
return {
id: toolId,
name: toolName,
description: mcpTool.description || `MCP tool ${toolName} from ${serverName}`,
parameters: filteredSchema,
params: {},
usageControl,
executeFunction: async (callParams: Record<string, any>) => {
const headers = await buildAuthHeaders()
const execUrl = buildAPIUrl('/api/mcp/tools/execute')
const execResponse = await fetch(execUrl.toString(), {
method: 'POST',
headers,
body: stringifyJSON({
serverId,
toolName,
arguments: callParams,
workspaceId: ctx.workspaceId,
workflowId: ctx.workflowId,
toolSchema: mcpTool.inputSchema,
}),
})
if (!execResponse.ok) {
throw new Error(
`MCP tool execution failed: ${execResponse.status} ${execResponse.statusText}`
)
}
const result = await execResponse.json()
if (!result.success) {
throw new Error(result.error || 'MCP tool execution failed')
}
return {
success: true,
output: result.data.output || {},
metadata: {
source: 'mcp-server',
serverId,
serverName,
toolName,
},
}
},
}
}
private async createCustomTool(ctx: ExecutionContext, tool: ToolInput): Promise<any> {
const userProvidedParams = tool.params || {}

View File

@@ -29,36 +29,11 @@ export interface AgentInputs {
verbosity?: string
}
/**
* Represents a tool input for the agent block.
*
* @remarks
* Valid types include:
* - Standard block types (e.g., 'api', 'search', 'function')
* - 'custom-tool': User-defined tools with custom code
* - 'mcp': Individual MCP tool from a connected server
* - 'mcp-server': All tools from an MCP server (agent discovery mode).
* At execution time, this is expanded into individual tool definitions
* for all tools available on the server. This enables dynamic capability
* discovery where the LLM can call any tool from the server.
*/
export interface ToolInput {
/**
* Tool type identifier.
* 'mcp-server' enables server-level selection where all tools from
* the server are made available to the LLM at execution time.
*/
type?: string
schema?: any
title?: string
code?: string
/**
* Tool parameters. For 'mcp-server' type, includes:
* - serverId: The MCP server ID
* - serverUrl: The server URL (optional)
* - serverName: Human-readable server name
* - toolCount: Number of tools available (for display)
*/
params?: Record<string, any>
timeout?: number
usageControl?: 'auto' | 'force' | 'none'

View File

@@ -14,7 +14,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
const { signal, executionId } = options
const useRedis = isRedisCancellationEnabled() && !!executionId
if (!useRedis && signal?.aborted) {
if (signal?.aborted) {
return false
}
@@ -27,7 +27,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
const cleanup = () => {
if (mainTimeoutId) clearTimeout(mainTimeoutId)
if (checkIntervalId) clearInterval(checkIntervalId)
if (!useRedis && signal) signal.removeEventListener('abort', onAbort)
if (signal) signal.removeEventListener('abort', onAbort)
}
const onAbort = () => {
@@ -37,6 +37,10 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
resolve(false)
}
if (signal) {
signal.addEventListener('abort', onAbort, { once: true })
}
if (useRedis) {
checkIntervalId = setInterval(async () => {
if (resolved) return
@@ -49,8 +53,6 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
}
} catch {}
}, CANCELLATION_CHECK_INTERVAL_MS)
} else if (signal) {
signal.addEventListener('abort', onAbort, { once: true })
}
mainTimeoutId = setTimeout(() => {

View File

@@ -1,7 +1,10 @@
export { AGENT_CARD_PATH } from '@a2a-js/sdk'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
export const A2A_PROTOCOL_VERSION = '0.3.0'
export const A2A_DEFAULT_TIMEOUT = 300000
export const A2A_DEFAULT_TIMEOUT = DEFAULT_EXECUTION_TIMEOUT_MS
/**
* Maximum number of messages stored per task in the database.

View File

@@ -5,11 +5,9 @@ import type { ToolUIConfig } from './ui-config'
const baseToolLogger = createLogger('BaseClientTool')
/** Default timeout for tool execution (5 minutes) */
const DEFAULT_TOOL_TIMEOUT_MS = 2 * 60 * 1000
const DEFAULT_TOOL_TIMEOUT_MS = 5 * 60 * 1000
/** Timeout for tools that run workflows (10 minutes) */
export const WORKFLOW_EXECUTION_TIMEOUT_MS = 10 * 60 * 1000
export const WORKFLOW_EXECUTION_TIMEOUT_MS = 5 * 60 * 1000
// Client tool call states used by the new runtime
export enum ClientToolCallState {

View File

@@ -170,6 +170,11 @@ export const env = createEnv({
RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute
EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'),
EXECUTION_TIMEOUT_PRO: z.string().optional().default('3600'),
EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3600'),
EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3600'),
// Knowledge Base Processing Configuration - Shared across all processing methods
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts

View File

@@ -0,0 +1 @@
export * from './types'

View File

@@ -0,0 +1,122 @@
import { env } from '@/lib/core/config/env'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
export interface ExecutionTimeoutConfig {
sync: number
async: number
}
const DEFAULT_SYNC_TIMEOUTS = {
free: 300,
pro: 3600,
team: 3600,
enterprise: 3600,
} as const
const ASYNC_TIMEOUT_SECONDS = 5400
function getSyncTimeoutForPlan(plan: SubscriptionPlan): number {
const envVarMap: Record<SubscriptionPlan, string | undefined> = {
free: env.EXECUTION_TIMEOUT_FREE,
pro: env.EXECUTION_TIMEOUT_PRO,
team: env.EXECUTION_TIMEOUT_TEAM,
enterprise: env.EXECUTION_TIMEOUT_ENTERPRISE,
}
return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS[plan]) * 1000
}
export const EXECUTION_TIMEOUTS: Record<SubscriptionPlan, ExecutionTimeoutConfig> = {
free: {
sync: getSyncTimeoutForPlan('free'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
},
pro: {
sync: getSyncTimeoutForPlan('pro'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
},
team: {
sync: getSyncTimeoutForPlan('team'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
},
enterprise: {
sync: getSyncTimeoutForPlan('enterprise'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
},
}
export function getExecutionTimeout(
plan: SubscriptionPlan | undefined,
type: 'sync' | 'async' = 'sync'
): number {
return EXECUTION_TIMEOUTS[plan || 'free'][type]
}
export function getExecutionTimeoutSeconds(
plan: SubscriptionPlan | undefined,
type: 'sync' | 'async' = 'sync'
): number {
return Math.floor(getExecutionTimeout(plan, type) / 1000)
}
export function getMaxExecutionTimeout(): number {
return EXECUTION_TIMEOUTS.enterprise.async
}
export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync
export class ExecutionTimeoutError extends Error {
constructor(
public readonly timeoutMs: number,
public readonly plan?: SubscriptionPlan
) {
const timeoutSeconds = Math.floor(timeoutMs / 1000)
const timeoutMinutes = Math.floor(timeoutSeconds / 60)
const displayTime =
timeoutMinutes > 0
? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}`
: `${timeoutSeconds} seconds`
super(`Execution timed out after ${displayTime}`)
this.name = 'ExecutionTimeoutError'
}
}
export function isTimeoutError(error: unknown): boolean {
if (error instanceof ExecutionTimeoutError) return true
if (!(error instanceof Error)) return false
const name = error.name.toLowerCase()
const message = error.message.toLowerCase()
return (
name === 'timeouterror' ||
name === 'aborterror' ||
message.includes('timeout') ||
message.includes('timed out') ||
message.includes('aborted')
)
}
export function createTimeoutError(
timeoutMs: number,
plan?: SubscriptionPlan
): ExecutionTimeoutError {
return new ExecutionTimeoutError(timeoutMs, plan)
}
export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): string {
if (error instanceof ExecutionTimeoutError) {
return error.message
}
if (timeoutMs) {
const timeoutSeconds = Math.floor(timeoutMs / 1000)
const timeoutMinutes = Math.floor(timeoutSeconds / 60)
const displayTime =
timeoutMinutes > 0
? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}`
: `${timeoutSeconds} seconds`
return `Execution timed out after ${displayTime}`
}
return 'Execution timed out'
}

View File

@@ -1,7 +1,3 @@
/**
* Execution timeout constants
*
* DEFAULT_EXECUTION_TIMEOUT_MS: The default timeout for executing user code (10 minutes)
*/
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
export const DEFAULT_EXECUTION_TIMEOUT_MS = 600000 // 10 minutes (600 seconds)
export { DEFAULT_EXECUTION_TIMEOUT_MS }

View File

@@ -4,7 +4,9 @@ import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getExecutionTimeout } from '@/lib/core/execution-limits'
import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import type { CoreTriggerType } from '@/stores/logs/filters/types'
@@ -133,10 +135,10 @@ export interface PreprocessExecutionResult {
success: boolean
error?: {
message: string
statusCode: number // HTTP status code (401, 402, 403, 404, 429, 500)
logCreated: boolean // Whether error was logged to execution_logs
statusCode: number
logCreated: boolean
}
actorUserId?: string // The user ID that will be billed
actorUserId?: string
workflowRecord?: WorkflowRecord
userSubscription?: SubscriptionInfo | null
rateLimitInfo?: {
@@ -144,6 +146,10 @@ export interface PreprocessExecutionResult {
remaining: number
resetAt: Date
}
executionTimeout?: {
sync: number
async: number
}
}
type WorkflowRecord = typeof workflow.$inferSelect
@@ -484,12 +490,17 @@ export async function preprocessExecution(
triggerType,
})
const plan = userSubscription?.plan as SubscriptionPlan | undefined
return {
success: true,
actorUserId,
workflowRecord,
userSubscription,
rateLimitInfo,
executionTimeout: {
sync: getExecutionTimeout(plan, 'sync'),
async: getExecutionTimeout(plan, 'async'),
},
}
}

View File

@@ -776,11 +776,16 @@ export class LoggingSession {
await db
.update(workflowExecutionLogs)
.set({
level: 'error',
status: 'failed',
executionData: sql`jsonb_set(
COALESCE(execution_data, '{}'::jsonb),
ARRAY['error'],
to_jsonb(${message}::text)
jsonb_set(
COALESCE(execution_data, '{}'::jsonb),
ARRAY['error'],
to_jsonb(${message}::text)
),
ARRAY['finalOutput'],
jsonb_build_object('error', ${message}::text)
)`,
})
.where(eq(workflowExecutionLogs.executionId, executionId))

View File

@@ -12,6 +12,7 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js'
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'
import type { ListToolsResult, Tool } from '@modelcontextprotocol/sdk/types.js'
import { createLogger } from '@sim/logger'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import {
McpConnectionError,
type McpConnectionStatus,
@@ -202,7 +203,7 @@ export class McpClient {
const sdkResult = await this.client.callTool(
{ name: toolCall.name, arguments: toolCall.arguments },
undefined,
{ timeout: 600000 } // 10 minutes - override SDK's 60s default
{ timeout: getMaxExecutionTimeout() }
)
return sdkResult as McpToolResult

View File

@@ -34,7 +34,7 @@ export function sanitizeHeaders(
* Client-safe MCP constants
*/
export const MCP_CLIENT_CONSTANTS = {
CLIENT_TIMEOUT: 600000,
CLIENT_TIMEOUT: 5 * 60 * 1000,
MAX_RETRIES: 3,
RECONNECT_DELAY: 1000,
} as const

View File

@@ -81,8 +81,8 @@ describe('generateMcpServerId', () => {
})
describe('MCP_CONSTANTS', () => {
it.concurrent('has correct execution timeout (10 minutes)', () => {
expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(600000)
it.concurrent('has correct execution timeout (5 minutes)', () => {
expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(300000)
})
it.concurrent('has correct cache timeout (5 minutes)', () => {
@@ -107,8 +107,8 @@ describe('MCP_CONSTANTS', () => {
})
describe('MCP_CLIENT_CONSTANTS', () => {
it.concurrent('has correct client timeout (10 minutes)', () => {
expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(600000)
it.concurrent('has correct client timeout (5 minutes)', () => {
expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(300000)
})
it.concurrent('has correct auto refresh interval (5 minutes)', () => {

View File

@@ -1,12 +1,11 @@
import { NextResponse } from 'next/server'
import { DEFAULT_EXECUTION_TIMEOUT_MS, getExecutionTimeout } from '@/lib/core/execution-limits'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import type { McpApiResponse } from '@/lib/mcp/types'
import { isMcpTool, MCP } from '@/executor/constants'
/**
* MCP-specific constants
*/
export const MCP_CONSTANTS = {
EXECUTION_TIMEOUT: 600000,
EXECUTION_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
CACHE_TIMEOUT: 5 * 60 * 1000,
DEFAULT_RETRIES: 3,
DEFAULT_CONNECTION_TIMEOUT: 30000,
@@ -14,6 +13,10 @@ export const MCP_CONSTANTS = {
MAX_CONSECUTIVE_FAILURES: 3,
} as const
export function getMcpExecutionTimeout(plan?: SubscriptionPlan): number {
return getExecutionTimeout(plan, 'sync')
}
/**
* Core MCP tool parameter keys that are metadata, not user-entered test values.
* These should be preserved when cleaning up params during schema updates.
@@ -45,11 +48,8 @@ export function sanitizeHeaders(
)
}
/**
* Client-safe MCP constants
*/
export const MCP_CLIENT_CONSTANTS = {
CLIENT_TIMEOUT: 600000,
CLIENT_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
AUTO_REFRESH_INTERVAL: 5 * 60 * 1000,
} as const

View File

@@ -62,9 +62,6 @@ export interface ExecutionErrorEvent extends BaseExecutionEvent {
}
}
/**
* Execution cancelled event
*/
export interface ExecutionCancelledEvent extends BaseExecutionEvent {
type: 'execution:cancelled'
workflowId: string
@@ -171,9 +168,6 @@ export type ExecutionEvent =
| StreamChunkEvent
| StreamDoneEvent
/**
* Extracted data types for use in callbacks
*/
export type ExecutionStartedData = ExecutionStartedEvent['data']
export type ExecutionCompletedData = ExecutionCompletedEvent['data']
export type ExecutionErrorData = ExecutionErrorEvent['data']

View File

@@ -1,8 +1,9 @@
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { RunActorParams, RunActorResult } from '@/tools/apify/types'
import type { ToolConfig } from '@/tools/types'
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
export const apifyRunActorAsyncTool: ToolConfig<RunActorParams, RunActorResult> = {
id: 'apify_run_actor_async',

View File

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types'
import type { ToolConfig, ToolResponse } from '@/tools/types'
const logger = createLogger('BrowserUseTool')
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = 600000 // 10 minutes
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
const MAX_CONSECUTIVE_ERRORS = 3
async function createSessionWithProfile(

View File

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { ExaResearchParams, ExaResearchResponse } from '@/tools/exa/types'
import type { ToolConfig } from '@/tools/types'
const logger = createLogger('ExaResearchTool')
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
export const researchTool: ToolConfig<ExaResearchParams, ExaResearchResponse> = {
id: 'exa_research',

View File

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { AgentParams, AgentResponse } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types'
const logger = createLogger('FirecrawlAgentTool')
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
export const agentTool: ToolConfig<AgentParams, AgentResponse> = {
id: 'firecrawl_agent',

View File

@@ -1,12 +1,13 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { FirecrawlCrawlParams, FirecrawlCrawlResponse } from '@/tools/firecrawl/types'
import { CRAWLED_PAGE_OUTPUT_PROPERTIES } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types'
const logger = createLogger('FirecrawlCrawlTool')
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
export const crawlTool: ToolConfig<FirecrawlCrawlParams, FirecrawlCrawlResponse> = {
id: 'firecrawl_crawl',

View File

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { ExtractParams, ExtractResponse } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types'
const logger = createLogger('FirecrawlExtractTool')
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
export const extractTool: ToolConfig<ExtractParams, ExtractResponse> = {
id: 'firecrawl_extract',

View File

@@ -1,5 +1,6 @@
import { createLogger } from '@sim/logger'
import { generateInternalToken } from '@/lib/auth/internal'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import { secureFetchWithPinnedIP, validateUrlWithDNS } from '@/lib/core/security/input-validation'
import { generateRequestId } from '@/lib/core/utils/request'
import { getBaseUrl } from '@/lib/core/utils/urls'
@@ -625,9 +626,8 @@ async function executeToolRequest(
let response: Response
if (isInternalRoute) {
// Set up AbortController for timeout support on internal routes
const controller = new AbortController()
const timeout = requestParams.timeout || 300000
const timeout = requestParams.timeout || DEFAULT_EXECUTION_TIMEOUT_MS
const timeoutId = setTimeout(() => controller.abort(), timeout)
try {

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { AGENT, isCustomTool } from '@/executor/constants'
import { getCustomTool } from '@/hooks/queries/custom-tools'
@@ -123,9 +124,7 @@ export function formatRequestParams(tool: ToolConfig, params: Record<string, any
}
}
// Get timeout from params (if specified) and validate
// Must be a finite positive number, max 600000ms (10 minutes) as documented
const MAX_TIMEOUT_MS = 600000
const MAX_TIMEOUT_MS = getMaxExecutionTimeout()
const rawTimeout = params.timeout
const timeout = rawTimeout != null ? Number(rawTimeout) : undefined
const validTimeout =

View File

@@ -6,7 +6,7 @@ export default defineConfig({
project: env.TRIGGER_PROJECT_ID!,
runtime: 'node',
logLevel: 'log',
maxDuration: 600,
maxDuration: 5400,
retries: {
enabledInDev: false,
default: {