Mirror of https://github.com/simstudioai/sim.git — synced 2026-02-04 03:35:04 -05:00

Compare commits: staging ... feat/timeo... (1 commit)

| Author | SHA1 | Date |
|---|---|---|
| | f811594875 | |
@@ -213,25 +213,6 @@ Different subscription plans have different usage limits:

| **Team** | $40/seat (pooled, adjustable) | 300 sync, 2,500 async |
| **Enterprise** | Custom | Custom |

## Execution Time Limits

Workflows have maximum execution time limits based on your subscription plan:

| Plan | Sync Execution | Async Execution |
|------|----------------|-----------------|
| **Free** | 5 minutes | 10 minutes |
| **Pro** | 60 minutes | 90 minutes |
| **Team** | 60 minutes | 90 minutes |
| **Enterprise** | 60 minutes | 90 minutes |

**Sync executions** run immediately and return results directly. These are triggered via the API with `async: false` (the default) or through the UI.

**Async executions** (triggered via the API with `async: true`, webhooks, or schedules) run in the background. Async time limits are 2x the sync limit, capped at 90 minutes.
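For illustration only (not part of this commit), a minimal sketch of triggering an async run from TypeScript. The endpoint path, API-key header name, and response field access are assumptions for the example; the docs above only state that passing `async: true` queues a background job, which the execution route answers with `202 Accepted` and a job ID.

```typescript
// Illustrative sketch: endpoint path, auth header, and response shape are assumptions.
async function triggerAsyncRun(workflowId: string, input: Record<string, unknown>) {
  const response = await fetch(`https://sim.example.com/api/workflows/${workflowId}/execute`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-API-Key': process.env.SIM_API_KEY ?? '',
    },
    // async: true queues a background job instead of waiting for the result
    body: JSON.stringify({ input, async: true }),
  })

  if (response.status !== 202) {
    throw new Error(`Expected 202 Accepted, got ${response.status}`)
  }

  // The returned job ID can be used to poll for the result once the background run finishes
  return response.json()
}
```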
<Callout type="info">
If a workflow exceeds its time limit, it will be terminated and marked as failed with a timeout error. Design long-running workflows to use async execution or break them into smaller workflows.
</Callout>

## Billing Model

Sim uses a **base subscription + overage** billing model:
@@ -11,7 +11,7 @@ import {
  Database,
  DollarSign,
  HardDrive,
  Timer,
  Workflow,
} from 'lucide-react'
import { useRouter } from 'next/navigation'
import { cn } from '@/lib/core/utils/cn'

@@ -44,7 +44,7 @@ interface PricingTier {
const FREE_PLAN_FEATURES: PricingFeature[] = [
  { icon: DollarSign, text: '$20 usage limit' },
  { icon: HardDrive, text: '5GB file storage' },
  { icon: Timer, text: '5 min execution limit' },
  { icon: Workflow, text: 'Public template access' },
  { icon: Database, text: 'Limited log retention' },
  { icon: Code2, text: 'CLI/SDK Access' },
]
@@ -4,12 +4,10 @@ import { createLogger } from '@sim/logger'
import { and, eq, lt, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'

const logger = createLogger('CleanupStaleExecutions')

const STALE_THRESHOLD_MS = getMaxExecutionTimeout() + 5 * 60 * 1000
const STALE_THRESHOLD_MINUTES = Math.ceil(STALE_THRESHOLD_MS / 60000)
const STALE_THRESHOLD_MINUTES = 30
const MAX_INT32 = 2_147_483_647

export async function GET(request: NextRequest) {
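A small worked check, not part of the diff: with the default enterprise limits defined in the execution-limits module later in this commit (3600 s sync, async capped at 5400 s), the computed form of the stale threshold comes out to 95 minutes.

```typescript
// Assumes the default enterprise async cap of 5400 s (90 min) from the execution-limits module.
const maxExecutionTimeoutMs = 5_400 * 1000 // 5,400,000 ms
const staleThresholdMs = maxExecutionTimeoutMs + 5 * 60 * 1000 // 5,700,000 ms
const staleThresholdMinutes = Math.ceil(staleThresholdMs / 60000) // 95
```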
@@ -21,7 +21,6 @@ import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateInternalToken } from '@/lib/auth/internal'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getBaseUrl } from '@/lib/core/utils/urls'

const logger = createLogger('WorkflowMcpServeAPI')

@@ -265,7 +264,7 @@ async function handleToolsCall(
    method: 'POST',
    headers,
    body: JSON.stringify({ input: params.arguments || {}, triggerType: 'mcp' }),
    signal: AbortSignal.timeout(getMaxExecutionTimeout()),
    signal: AbortSignal.timeout(600000), // 10 minute timeout
  })

  const executeResult = await response.json()
@@ -1,8 +1,5 @@
import { createLogger } from '@sim/logger'
import type { NextRequest } from 'next/server'
import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
import { getExecutionTimeout } from '@/lib/core/execution-limits'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpService } from '@/lib/mcp/service'
import type { McpTool, McpToolCall, McpToolResult } from '@/lib/mcp/types'
@@ -10,6 +7,7 @@ import {
  categorizeError,
  createMcpErrorResponse,
  createMcpSuccessResponse,
  MCP_CONSTANTS,
  validateStringParam,
} from '@/lib/mcp/utils'

@@ -173,16 +171,13 @@ export const POST = withMcpAuth('read')(
      arguments: args,
    }

    const userSubscription = await getHighestPrioritySubscription(userId)
    const executionTimeout = getExecutionTimeout(
      userSubscription?.plan as SubscriptionPlan | undefined,
      'sync'
    )

    const result = await Promise.race([
      mcpService.executeTool(userId, serverId, toolCall, workspaceId),
      new Promise<never>((_, reject) =>
        setTimeout(() => reject(new Error('Tool execution timeout')), executionTimeout)
        setTimeout(
          () => reject(new Error('Tool execution timeout')),
          MCP_CONSTANTS.EXECUTION_TIMEOUT
        )
      ),
    ])
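For reference, the race-a-timer pattern used above can be factored into a small generic helper. This is an illustrative sketch, not something the commit adds; note that the losing promise is not cancelled by `Promise.race`, so the pattern bounds how long the caller waits rather than stopping the underlying tool call.

```typescript
// Generic sketch of the Promise.race timeout pattern (illustrative, not part of the diff).
async function withTimeout<T>(work: Promise<T>, timeoutMs: number, label = 'operation'): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${timeoutMs}ms`)), timeoutMs)
  })
  try {
    // Whichever settles first wins; the other promise is simply ignored, not cancelled.
    return await Promise.race([work, timeout])
  } finally {
    if (timer) clearTimeout(timer) // avoid keeping the event loop alive after the work finishes
  }
}
```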
@@ -17,7 +17,7 @@ const logger = createLogger('DropboxUploadAPI')
 */
function httpHeaderSafeJson(value: object): string {
  return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => {
    return `\\u${(`0000${c.charCodeAt(0).toString(16)}`).slice(-4)}`
    return '\\u' + ('0000' + c.charCodeAt(0).toString(16)).slice(-4)
  })
}
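A quick illustration of what this helper produces (my example, not from the commit): characters in the range U+007F–U+FFFF are rewritten as `\uXXXX` escapes so the serialized JSON can be carried safely in an HTTP header such as Dropbox's `Dropbox-API-Arg`.

```typescript
// Example usage (illustrative): 'é' (U+00E9) falls in the escaped range.
httpHeaderSafeJson({ path: '/résumé.pdf' })
// => {"path":"/r\u00e9sum\u00e9.pdf"}
```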
||||
@@ -2,7 +2,6 @@ import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { extractAudioFromVideo, isVideoFile } from '@/lib/audio/extractor'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
import {
|
||||
secureFetchWithPinnedIP,
|
||||
validateUrlWithDNS,
|
||||
@@ -637,8 +636,7 @@ async function transcribeWithAssemblyAI(
|
||||
|
||||
let transcript: any
|
||||
let attempts = 0
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
|
||||
const maxAttempts = 60 // 5 minutes with 5-second intervals
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
const statusResponse = await fetch(`https://api.assemblyai.com/v2/transcript/${id}`, {
|
||||
|
||||
@@ -3,7 +3,6 @@ import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
import { validateAwsRegion, validateS3BucketName } from '@/lib/core/security/input-validation'
|
||||
import {
|
||||
secureFetchWithPinnedIP,
|
||||
@@ -227,8 +226,8 @@ async function pollForJobCompletion(
|
||||
useAnalyzeDocument: boolean,
|
||||
requestId: string
|
||||
): Promise<Record<string, unknown>> {
|
||||
const pollIntervalMs = 5000
|
||||
const maxPollTimeMs = DEFAULT_EXECUTION_TIMEOUT_MS
|
||||
const pollIntervalMs = 5000 // 5 seconds between polls
|
||||
const maxPollTimeMs = 180000 // 3 minutes maximum polling time
|
||||
const maxAttempts = Math.ceil(maxPollTimeMs / pollIntervalMs)
|
||||
|
||||
const getTarget = useAnalyzeDocument
|
||||
|
||||
@@ -2,7 +2,6 @@ import { createLogger } from '@sim/logger'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { NextResponse } from 'next/server'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
import { validateAlphanumericId } from '@/lib/core/security/input-validation'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { StorageService } from '@/lib/uploads'
|
||||
@@ -61,7 +60,7 @@ export async function POST(request: NextRequest) {
|
||||
text,
|
||||
model_id: modelId,
|
||||
}),
|
||||
signal: AbortSignal.timeout(DEFAULT_EXECUTION_TIMEOUT_MS),
|
||||
signal: AbortSignal.timeout(60000),
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
|
||||
import { downloadFileFromStorage } from '@/lib/uploads/utils/file-utils.server'
|
||||
import type { UserFile } from '@/executor/types'
|
||||
import type { VideoRequestBody } from '@/tools/video/types'
|
||||
@@ -327,12 +326,11 @@ async function generateWithRunway(
|
||||
|
||||
logger.info(`[${requestId}] Runway task created: ${taskId}`)
|
||||
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
|
||||
const maxAttempts = 120 // 10 minutes with 5-second intervals
|
||||
let attempts = 0
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
await sleep(pollIntervalMs)
|
||||
await sleep(5000) // Poll every 5 seconds
|
||||
|
||||
const statusResponse = await fetch(`https://api.dev.runwayml.com/v1/tasks/${taskId}`, {
|
||||
headers: {
|
||||
@@ -372,7 +370,7 @@ async function generateWithRunway(
|
||||
attempts++
|
||||
}
|
||||
|
||||
throw new Error('Runway generation timed out')
|
||||
throw new Error('Runway generation timed out after 10 minutes')
|
||||
}
|
||||
|
||||
async function generateWithVeo(
|
||||
@@ -431,12 +429,11 @@ async function generateWithVeo(
|
||||
|
||||
logger.info(`[${requestId}] Veo operation created: ${operationName}`)
|
||||
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
|
||||
const maxAttempts = 60 // 5 minutes with 5-second intervals
|
||||
let attempts = 0
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
await sleep(pollIntervalMs)
|
||||
await sleep(5000)
|
||||
|
||||
const statusResponse = await fetch(
|
||||
`https://generativelanguage.googleapis.com/v1beta/${operationName}`,
|
||||
@@ -488,7 +485,7 @@ async function generateWithVeo(
|
||||
attempts++
|
||||
}
|
||||
|
||||
throw new Error('Veo generation timed out')
|
||||
throw new Error('Veo generation timed out after 5 minutes')
|
||||
}
|
||||
|
||||
async function generateWithLuma(
|
||||
@@ -544,12 +541,11 @@ async function generateWithLuma(
|
||||
|
||||
logger.info(`[${requestId}] Luma generation created: ${generationId}`)
|
||||
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
|
||||
const maxAttempts = 120 // 10 minutes
|
||||
let attempts = 0
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
await sleep(pollIntervalMs)
|
||||
await sleep(5000)
|
||||
|
||||
const statusResponse = await fetch(
|
||||
`https://api.lumalabs.ai/dream-machine/v1/generations/${generationId}`,
|
||||
@@ -596,7 +592,7 @@ async function generateWithLuma(
|
||||
attempts++
|
||||
}
|
||||
|
||||
throw new Error('Luma generation timed out')
|
||||
throw new Error('Luma generation timed out after 10 minutes')
|
||||
}
|
||||
|
||||
async function generateWithMiniMax(
|
||||
@@ -662,13 +658,14 @@ async function generateWithMiniMax(
|
||||
|
||||
logger.info(`[${requestId}] MiniMax task created: ${taskId}`)
|
||||
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
|
||||
// Poll for completion (6-10 minutes typical)
|
||||
const maxAttempts = 120 // 10 minutes with 5-second intervals
|
||||
let attempts = 0
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
await sleep(pollIntervalMs)
|
||||
await sleep(5000)
|
||||
|
||||
// Query task status
|
||||
const statusResponse = await fetch(
|
||||
`https://api.minimax.io/v1/query/video_generation?task_id=${taskId}`,
|
||||
{
|
||||
@@ -746,7 +743,7 @@ async function generateWithMiniMax(
|
||||
attempts++
|
||||
}
|
||||
|
||||
throw new Error('MiniMax generation timed out')
|
||||
throw new Error('MiniMax generation timed out after 10 minutes')
|
||||
}
|
||||
|
||||
// Helper function to strip subpaths from Fal.ai model IDs for status/result endpoints
|
||||
@@ -864,12 +861,11 @@ async function generateWithFalAI(
|
||||
// Get base model ID (without subpath) for status and result endpoints
|
||||
const baseModelId = getBaseModelId(falModelId)
|
||||
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
|
||||
const maxAttempts = 96 // 8 minutes with 5-second intervals
|
||||
let attempts = 0
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
await sleep(pollIntervalMs)
|
||||
await sleep(5000)
|
||||
|
||||
const statusResponse = await fetch(
|
||||
`https://queue.fal.run/${baseModelId}/requests/${requestIdFal}/status`,
|
||||
@@ -942,7 +938,7 @@ async function generateWithFalAI(
|
||||
attempts++
|
||||
}
|
||||
|
||||
throw new Error('Fal.ai generation timed out')
|
||||
throw new Error('Fal.ai generation timed out after 8 minutes')
|
||||
}
|
||||
|
||||
function getVideoDimensions(
|
||||
|
||||
@@ -3,7 +3,6 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { z } from 'zod'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
||||
import { markExecutionCancelled } from '@/lib/execution/cancellation'
|
||||
@@ -117,16 +116,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
|
||||
const abortController = new AbortController()
|
||||
let isStreamClosed = false
|
||||
let isTimedOut = false
|
||||
|
||||
const syncTimeout = preprocessResult.executionTimeout?.sync
|
||||
let timeoutId: NodeJS.Timeout | undefined
|
||||
if (syncTimeout) {
|
||||
timeoutId = setTimeout(() => {
|
||||
isTimedOut = true
|
||||
abortController.abort()
|
||||
}, syncTimeout)
|
||||
}
|
||||
|
||||
const stream = new ReadableStream<Uint8Array>({
|
||||
async start(controller) {
|
||||
@@ -178,33 +167,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
})
|
||||
|
||||
if (result.status === 'cancelled') {
|
||||
if (isTimedOut && syncTimeout) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
|
||||
logger.info(`[${requestId}] Run-from-block execution timed out`, {
|
||||
timeoutMs: syncTimeout,
|
||||
})
|
||||
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:error',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
error: timeoutErrorMessage,
|
||||
duration: result.metadata?.duration || 0,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
sendEvent({
|
||||
type: 'execution:cancelled',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: { duration: result.metadata?.duration || 0 },
|
||||
})
|
||||
}
|
||||
sendEvent({
|
||||
type: 'execution:cancelled',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: { duration: result.metadata?.duration || 0 },
|
||||
})
|
||||
} else {
|
||||
sendEvent({
|
||||
type: 'execution:completed',
|
||||
@@ -221,25 +190,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
})
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
const isTimeout = isTimeoutError(error) || isTimedOut
|
||||
const errorMessage = isTimeout
|
||||
? getTimeoutErrorMessage(error, syncTimeout)
|
||||
: error instanceof Error
|
||||
? error.message
|
||||
: 'Unknown error'
|
||||
|
||||
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`, {
|
||||
isTimeout,
|
||||
})
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
totalDurationMs: executionResult?.metadata?.duration,
|
||||
error: { message: errorMessage },
|
||||
traceSpans: executionResult?.logs as any,
|
||||
})
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:error',
|
||||
timestamp: new Date().toISOString(),
|
||||
@@ -251,7 +206,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
},
|
||||
})
|
||||
} finally {
|
||||
if (timeoutId) clearTimeout(timeoutId)
|
||||
if (!isStreamClosed) {
|
||||
try {
|
||||
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
|
||||
@@ -262,7 +216,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
},
|
||||
cancel() {
|
||||
isStreamClosed = true
|
||||
if (timeoutId) clearTimeout(timeoutId)
|
||||
abortController.abort()
|
||||
markExecutionCancelled(executionId).catch(() => {})
|
||||
},
|
||||
|
||||
@@ -5,11 +5,6 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
|
||||
import { z } from 'zod'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
|
||||
import {
|
||||
createTimeoutAbortController,
|
||||
getTimeoutErrorMessage,
|
||||
isTimeoutError,
|
||||
} from '@/lib/core/execution-limits'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
@@ -125,6 +120,10 @@ type AsyncExecutionParams = {
|
||||
triggerType: CoreTriggerType
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles async workflow execution by queueing a background job.
|
||||
* Returns immediately with a 202 Accepted response containing the job ID.
|
||||
*/
|
||||
async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
|
||||
const { requestId, workflowId, userId, input, triggerType } = params
|
||||
|
||||
@@ -406,10 +405,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
|
||||
if (!enableSSE) {
|
||||
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
|
||||
const timeoutController = createTimeoutAbortController(
|
||||
preprocessResult.executionTimeout?.sync
|
||||
)
|
||||
|
||||
try {
|
||||
const metadata: ExecutionMetadata = {
|
||||
requestId,
|
||||
@@ -443,39 +438,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
stopAfterBlockId,
|
||||
abortSignal: timeoutController.signal,
|
||||
})
|
||||
|
||||
if (
|
||||
result.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Non-SSE execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
output: result.output,
|
||||
error: timeoutErrorMessage,
|
||||
metadata: result.metadata
|
||||
? {
|
||||
duration: result.metadata.duration,
|
||||
startTime: result.metadata.startTime,
|
||||
endTime: result.metadata.endTime,
|
||||
}
|
||||
: undefined,
|
||||
},
|
||||
{ status: 408 }
|
||||
)
|
||||
}
|
||||
|
||||
const outputWithBase64 = includeFileBase64
|
||||
? ((await hydrateUserFilesWithBase64(result.output, {
|
||||
requestId,
|
||||
@@ -510,17 +474,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
return NextResponse.json(filteredResult)
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
|
||||
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
totalDurationMs: executionResult?.metadata?.duration,
|
||||
error: { message: errorMessage },
|
||||
traceSpans: executionResult?.logs as any,
|
||||
})
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
@@ -536,8 +493,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -551,6 +506,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
cachedWorkflowData?.blocks || {}
|
||||
)
|
||||
const streamVariables = cachedWorkflowData?.variables ?? (workflow as any).variables
|
||||
|
||||
const stream = await createStreamingResponse({
|
||||
requestId,
|
||||
workflow: {
|
||||
@@ -568,7 +524,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
timeoutMs: preprocessResult.executionTimeout?.sync,
|
||||
},
|
||||
executionId,
|
||||
})
|
||||
@@ -580,7 +535,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
}
|
||||
|
||||
const encoder = new TextEncoder()
|
||||
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
|
||||
const abortController = new AbortController()
|
||||
let isStreamClosed = false
|
||||
|
||||
const stream = new ReadableStream<Uint8Array>({
|
||||
@@ -776,7 +731,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
onStream,
|
||||
},
|
||||
loggingSession,
|
||||
abortSignal: timeoutController.signal,
|
||||
abortSignal: abortController.signal,
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
stopAfterBlockId,
|
||||
@@ -812,37 +767,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
}
|
||||
|
||||
if (result.status === 'cancelled') {
|
||||
if (timeoutController.isTimedOut() && timeoutController.timeoutMs) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Workflow execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:error',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
error: timeoutErrorMessage,
|
||||
duration: result.metadata?.duration || 0,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
logger.info(`[${requestId}] Workflow execution was cancelled`)
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:cancelled',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
duration: result.metadata?.duration || 0,
|
||||
},
|
||||
})
|
||||
}
|
||||
logger.info(`[${requestId}] Workflow execution was cancelled`)
|
||||
sendEvent({
|
||||
type: 'execution:cancelled',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
duration: result.metadata?.duration || 0,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -869,23 +803,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
// Cleanup base64 cache for this execution
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
} catch (error: unknown) {
|
||||
const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()
|
||||
const errorMessage = isTimeout
|
||||
? getTimeoutErrorMessage(error, timeoutController.timeoutMs)
|
||||
: error instanceof Error
|
||||
? error.message
|
||||
: 'Unknown error'
|
||||
|
||||
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout })
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
totalDurationMs: executionResult?.metadata?.duration,
|
||||
error: { message: errorMessage },
|
||||
traceSpans: executionResult?.logs as any,
|
||||
})
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:error',
|
||||
timestamp: new Date().toISOString(),
|
||||
@@ -897,20 +819,20 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
},
|
||||
})
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
if (!isStreamClosed) {
|
||||
try {
|
||||
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
|
||||
controller.close()
|
||||
} catch {}
|
||||
} catch {
|
||||
// Stream already closed - nothing to do
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
isStreamClosed = true
|
||||
timeoutController.cleanup()
|
||||
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
|
||||
timeoutController.abort()
|
||||
abortController.abort()
|
||||
markExecutionCancelled(executionId).catch(() => {})
|
||||
},
|
||||
})
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import type React from 'react'
|
||||
import { AlertTriangleIcon, BanIcon, RepeatIcon, SplitIcon, XCircleIcon } from 'lucide-react'
|
||||
import { RepeatIcon, SplitIcon } from 'lucide-react'
|
||||
import { getBlock } from '@/blocks'
|
||||
import { TERMINAL_BLOCK_COLUMN_WIDTH } from '@/stores/constants'
|
||||
import type { ConsoleEntry } from '@/stores/terminal'
|
||||
@@ -12,15 +12,6 @@ const SUBFLOW_COLORS = {
|
||||
parallel: '#FEE12B',
|
||||
} as const
|
||||
|
||||
/**
|
||||
* Special block type colors for errors and system messages
|
||||
*/
|
||||
const SPECIAL_BLOCK_COLORS = {
|
||||
error: '#ef4444',
|
||||
validation: '#f59e0b',
|
||||
cancelled: '#6b7280',
|
||||
} as const
|
||||
|
||||
/**
|
||||
* Retrieves the icon component for a given block type
|
||||
*/
|
||||
@@ -41,18 +32,6 @@ export function getBlockIcon(
|
||||
return SplitIcon
|
||||
}
|
||||
|
||||
if (blockType === 'error') {
|
||||
return XCircleIcon
|
||||
}
|
||||
|
||||
if (blockType === 'validation') {
|
||||
return AlertTriangleIcon
|
||||
}
|
||||
|
||||
if (blockType === 'cancelled') {
|
||||
return BanIcon
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
@@ -71,16 +50,6 @@ export function getBlockColor(blockType: string): string {
|
||||
if (blockType === 'parallel') {
|
||||
return SUBFLOW_COLORS.parallel
|
||||
}
|
||||
// Special block types for errors and system messages
|
||||
if (blockType === 'error') {
|
||||
return SPECIAL_BLOCK_COLORS.error
|
||||
}
|
||||
if (blockType === 'validation') {
|
||||
return SPECIAL_BLOCK_COLORS.validation
|
||||
}
|
||||
if (blockType === 'cancelled') {
|
||||
return SPECIAL_BLOCK_COLORS.cancelled
|
||||
}
|
||||
return '#6b7280'
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ import { useExecutionStore } from '@/stores/execution'
|
||||
import { useNotificationStore } from '@/stores/notifications'
|
||||
import { useVariablesStore } from '@/stores/panel'
|
||||
import { useEnvironmentStore } from '@/stores/settings/environment'
|
||||
import { useTerminalConsoleStore } from '@/stores/terminal'
|
||||
import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal'
|
||||
import { useWorkflowDiffStore } from '@/stores/workflow-diff'
|
||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||
import { mergeSubblockState } from '@/stores/workflows/utils'
|
||||
@@ -789,12 +789,7 @@ export function useWorkflowExecution() {
|
||||
const startBlock = TriggerUtils.findStartBlock(filteredStates, 'chat')
|
||||
|
||||
if (!startBlock) {
|
||||
throw new WorkflowValidationError(
|
||||
TriggerUtils.getTriggerValidationMessage('chat', 'missing'),
|
||||
'validation',
|
||||
'validation',
|
||||
'Workflow Validation'
|
||||
)
|
||||
throw new Error(TriggerUtils.getTriggerValidationMessage('chat', 'missing'))
|
||||
}
|
||||
|
||||
startBlockId = startBlock.blockId
|
||||
@@ -805,12 +800,7 @@ export function useWorkflowExecution() {
|
||||
})
|
||||
|
||||
if (candidates.length === 0) {
|
||||
const error = new WorkflowValidationError(
|
||||
'Workflow requires at least one trigger block to execute',
|
||||
'validation',
|
||||
'validation',
|
||||
'Workflow Validation'
|
||||
)
|
||||
const error = new Error('Workflow requires at least one trigger block to execute')
|
||||
logger.error('No trigger blocks found for manual run', {
|
||||
allBlockTypes: Object.values(filteredStates).map((b) => b.type),
|
||||
})
|
||||
@@ -823,12 +813,7 @@ export function useWorkflowExecution() {
|
||||
(candidate) => candidate.path === StartBlockPath.SPLIT_API
|
||||
)
|
||||
if (apiCandidates.length > 1) {
|
||||
const error = new WorkflowValidationError(
|
||||
'Multiple API Trigger blocks found. Keep only one.',
|
||||
'validation',
|
||||
'validation',
|
||||
'Workflow Validation'
|
||||
)
|
||||
const error = new Error('Multiple API Trigger blocks found. Keep only one.')
|
||||
logger.error('Multiple API triggers found')
|
||||
setIsExecuting(false)
|
||||
throw error
|
||||
@@ -848,12 +833,7 @@ export function useWorkflowExecution() {
|
||||
const outgoingConnections = workflowEdges.filter((edge) => edge.source === startBlockId)
|
||||
if (outgoingConnections.length === 0) {
|
||||
const triggerName = selectedTrigger.name || selectedTrigger.type
|
||||
const error = new WorkflowValidationError(
|
||||
`${triggerName} must be connected to other blocks to execute`,
|
||||
'validation',
|
||||
'validation',
|
||||
'Workflow Validation'
|
||||
)
|
||||
const error = new Error(`${triggerName} must be connected to other blocks to execute`)
|
||||
logger.error('Trigger has no outgoing connections', { triggerName, startBlockId })
|
||||
setIsExecuting(false)
|
||||
throw error
|
||||
@@ -879,12 +859,7 @@ export function useWorkflowExecution() {
|
||||
|
||||
// If we don't have a valid startBlockId at this point, throw an error
|
||||
if (!startBlockId) {
|
||||
const error = new WorkflowValidationError(
|
||||
'No valid trigger block found to start execution',
|
||||
'validation',
|
||||
'validation',
|
||||
'Workflow Validation'
|
||||
)
|
||||
const error = new Error('No valid trigger block found to start execution')
|
||||
logger.error('No startBlockId found after trigger search')
|
||||
setIsExecuting(false)
|
||||
throw error
|
||||
@@ -1182,20 +1157,17 @@ export function useWorkflowExecution() {
|
||||
logs: accumulatedBlockLogs,
|
||||
}
|
||||
|
||||
if (activeWorkflowId) {
|
||||
cancelRunningEntries(activeWorkflowId)
|
||||
}
|
||||
// Only add workflow-level error if no blocks have executed yet
|
||||
// This catches pre-execution errors (validation, serialization, etc.)
|
||||
// Block execution errors are already logged via onBlockError callback
|
||||
const { entries } = useTerminalConsoleStore.getState()
|
||||
const existingLogs = entries.filter(
|
||||
(log: ConsoleEntry) => log.executionId === executionId
|
||||
)
|
||||
|
||||
const isPreExecutionError = accumulatedBlockLogs.length === 0
|
||||
// Check if any block already has this error - don't duplicate block errors
|
||||
const blockAlreadyHasError = accumulatedBlockLogs.some((log) => log.error)
|
||||
|
||||
// Only add workflow-level error entry for:
|
||||
// 1. Pre-execution errors (validation) - no blocks ran
|
||||
// 2. Workflow-level errors (timeout, etc.) - no block has the error
|
||||
if (isPreExecutionError || !blockAlreadyHasError) {
|
||||
// Determine if this is a timeout error based on the error message
|
||||
const isTimeout = data.error?.toLowerCase().includes('timed out')
|
||||
if (existingLogs.length === 0) {
|
||||
// No blocks executed yet - this is a pre-execution error
|
||||
// Use 0 for executionOrder so validation errors appear first
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
@@ -1203,46 +1175,21 @@ export function useWorkflowExecution() {
|
||||
error: data.error,
|
||||
durationMs: data.duration || 0,
|
||||
startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
|
||||
executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER,
|
||||
executionOrder: 0,
|
||||
endedAt: new Date().toISOString(),
|
||||
workflowId: activeWorkflowId,
|
||||
blockId: isPreExecutionError
|
||||
? 'validation'
|
||||
: isTimeout
|
||||
? 'timeout-error'
|
||||
: 'execution-error',
|
||||
blockId: 'validation',
|
||||
executionId,
|
||||
blockName: isPreExecutionError
|
||||
? 'Workflow Validation'
|
||||
: isTimeout
|
||||
? 'Timeout Error'
|
||||
: 'Execution Error',
|
||||
blockType: isPreExecutionError ? 'validation' : 'error',
|
||||
blockName: 'Workflow Validation',
|
||||
blockType: 'validation',
|
||||
})
|
||||
}
|
||||
},
|
||||
|
||||
onExecutionCancelled: (data) => {
|
||||
onExecutionCancelled: () => {
|
||||
if (activeWorkflowId) {
|
||||
cancelRunningEntries(activeWorkflowId)
|
||||
}
|
||||
|
||||
// Add console entry for cancellation
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: 'Execution was cancelled',
|
||||
durationMs: data?.duration || 0,
|
||||
startedAt: new Date(Date.now() - (data?.duration || 0)).toISOString(),
|
||||
executionOrder: Number.MAX_SAFE_INTEGER,
|
||||
endedAt: new Date().toISOString(),
|
||||
workflowId: activeWorkflowId,
|
||||
blockId: 'cancelled',
|
||||
executionId,
|
||||
blockName: 'Execution Cancelled',
|
||||
blockType: 'cancelled',
|
||||
})
|
||||
},
|
||||
},
|
||||
})
|
||||
@@ -1789,54 +1736,17 @@ export function useWorkflowExecution() {
|
||||
'Workflow was modified. Run the workflow again to enable running from block.',
|
||||
workflowId,
|
||||
})
|
||||
}
|
||||
|
||||
cancelRunningEntries(workflowId)
|
||||
|
||||
// Check if any block already has an error - don't duplicate block errors
|
||||
const blockAlreadyHasError = accumulatedBlockLogs.some((log) => log.error)
|
||||
|
||||
// Only add execution error entry if no block has the error
|
||||
if (!blockAlreadyHasError) {
|
||||
// Determine if this is a timeout error based on the error message
|
||||
const isTimeout = data.error?.toLowerCase().includes('timed out')
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.duration || 0,
|
||||
startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
|
||||
executionOrder: Number.MAX_SAFE_INTEGER,
|
||||
endedAt: new Date().toISOString(),
|
||||
} else {
|
||||
addNotification({
|
||||
level: 'error',
|
||||
message: data.error || 'Run from block failed',
|
||||
workflowId,
|
||||
blockId: isTimeout ? 'timeout-error' : 'execution-error',
|
||||
executionId,
|
||||
blockName: isTimeout ? 'Timeout Error' : 'Execution Error',
|
||||
blockType: 'error',
|
||||
})
|
||||
}
|
||||
},
|
||||
|
||||
onExecutionCancelled: (data) => {
|
||||
onExecutionCancelled: () => {
|
||||
cancelRunningEntries(workflowId)
|
||||
|
||||
// Add console entry for cancellation
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: 'Execution was cancelled',
|
||||
durationMs: data?.duration || 0,
|
||||
startedAt: new Date(Date.now() - (data?.duration || 0)).toISOString(),
|
||||
executionOrder: Number.MAX_SAFE_INTEGER,
|
||||
endedAt: new Date().toISOString(),
|
||||
workflowId,
|
||||
blockId: 'cancelled',
|
||||
executionId,
|
||||
blockName: 'Execution Cancelled',
|
||||
blockType: 'cancelled',
|
||||
})
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import {
|
||||
Building2,
|
||||
Clock,
|
||||
Database,
|
||||
HardDrive,
|
||||
HeadphonesIcon,
|
||||
Server,
|
||||
ShieldCheck,
|
||||
Timer,
|
||||
Users,
|
||||
Zap,
|
||||
} from 'lucide-react'
|
||||
@@ -15,8 +15,8 @@ import type { PlanFeature } from '@/app/workspace/[workspaceId]/w/components/sid
|
||||
export const PRO_PLAN_FEATURES: PlanFeature[] = [
|
||||
{ icon: Zap, text: '150 runs per minute (sync)' },
|
||||
{ icon: Clock, text: '1,000 runs per minute (async)' },
|
||||
{ icon: Timer, text: '60 min sync execution limit' },
|
||||
{ icon: HardDrive, text: '50GB file storage' },
|
||||
{ icon: Building2, text: 'Unlimited workspaces' },
|
||||
{ icon: Users, text: 'Unlimited invites' },
|
||||
{ icon: Database, text: 'Unlimited log retention' },
|
||||
]
|
||||
@@ -24,8 +24,8 @@ export const PRO_PLAN_FEATURES: PlanFeature[] = [
|
||||
export const TEAM_PLAN_FEATURES: PlanFeature[] = [
|
||||
{ icon: Zap, text: '300 runs per minute (sync)' },
|
||||
{ icon: Clock, text: '2,500 runs per minute (async)' },
|
||||
{ icon: Timer, text: '60 min sync execution limit' },
|
||||
{ icon: HardDrive, text: '500GB file storage (pooled)' },
|
||||
{ icon: Building2, text: 'Unlimited workspaces' },
|
||||
{ icon: Users, text: 'Unlimited invites' },
|
||||
{ icon: Database, text: 'Unlimited log retention' },
|
||||
{ icon: SlackMonoIcon, text: 'Dedicated Slack channel' },
|
||||
|
||||
@@ -14,6 +14,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { useParams } from 'next/navigation'
|
||||
import { io, type Socket } from 'socket.io-client'
|
||||
import { getEnv } from '@/lib/core/config/env'
|
||||
import { useOperationQueueStore } from '@/stores/operation-queue/store'
|
||||
|
||||
const logger = createLogger('SocketContext')
|
||||
|
||||
@@ -138,6 +139,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
const [authFailed, setAuthFailed] = useState(false)
|
||||
const initializedRef = useRef(false)
|
||||
const socketRef = useRef<Socket | null>(null)
|
||||
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)
|
||||
|
||||
const params = useParams()
|
||||
const urlWorkflowId = params?.workflowId as string | undefined
|
||||
@@ -341,9 +343,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
})
|
||||
})
|
||||
|
||||
socketInstance.on('join-workflow-error', ({ error }) => {
|
||||
socketInstance.on('join-workflow-error', ({ error, code }) => {
|
||||
isRejoiningRef.current = false
|
||||
logger.error('Failed to join workflow:', error)
|
||||
logger.error('Failed to join workflow:', { error, code })
|
||||
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
|
||||
triggerOfflineMode()
|
||||
}
|
||||
})
|
||||
|
||||
socketInstance.on('workflow-operation', (data) => {
|
||||
|
||||
@@ -4,7 +4,6 @@ import { task } from '@trigger.dev/sdk'
|
||||
import { Cron } from 'croner'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
@@ -121,7 +120,6 @@ async function runWorkflowExecution({
|
||||
loggingSession,
|
||||
requestId,
|
||||
executionId,
|
||||
asyncTimeout,
|
||||
}: {
|
||||
payload: ScheduleExecutionPayload
|
||||
workflowRecord: WorkflowRecord
|
||||
@@ -129,7 +127,6 @@ async function runWorkflowExecution({
|
||||
loggingSession: LoggingSession
|
||||
requestId: string
|
||||
executionId: string
|
||||
asyncTimeout?: number
|
||||
}): Promise<RunWorkflowResult> {
|
||||
try {
|
||||
logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
|
||||
@@ -184,33 +181,15 @@ async function runWorkflowExecution({
|
||||
[]
|
||||
)
|
||||
|
||||
const timeoutController = createTimeoutAbortController(asyncTimeout)
|
||||
const executionResult = await executeWorkflowCore({
|
||||
snapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
base64MaxBytes: undefined,
|
||||
})
|
||||
|
||||
let executionResult
|
||||
try {
|
||||
executionResult = await executeWorkflowCore({
|
||||
snapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
base64MaxBytes: undefined,
|
||||
abortSignal: timeoutController.signal,
|
||||
})
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
}
|
||||
|
||||
if (
|
||||
executionResult.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Scheduled workflow execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
} else if (executionResult.status === 'paused') {
|
||||
if (executionResult.status === 'paused') {
|
||||
if (!executionResult.snapshotSeed) {
|
||||
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
|
||||
executionId,
|
||||
@@ -474,7 +453,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
|
||||
loggingSession,
|
||||
requestId,
|
||||
executionId,
|
||||
asyncTimeout: preprocessResult.executionTimeout?.async,
|
||||
})
|
||||
|
||||
if (executionResult.status === 'skip') {
|
||||
|
||||
@@ -4,14 +4,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { task } from '@trigger.dev/sdk'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { getHighestPrioritySubscription } from '@/lib/billing'
|
||||
import {
|
||||
createTimeoutAbortController,
|
||||
getExecutionTimeout,
|
||||
getTimeoutErrorMessage,
|
||||
} from '@/lib/core/execution-limits'
|
||||
import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency'
|
||||
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
|
||||
import { processExecutionFiles } from '@/lib/execution/files'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
@@ -141,13 +134,7 @@ async function executeWebhookJobInternal(
|
||||
requestId
|
||||
)
|
||||
|
||||
const userSubscription = await getHighestPrioritySubscription(payload.userId)
|
||||
const asyncTimeout = getExecutionTimeout(
|
||||
userSubscription?.plan as SubscriptionPlan | undefined,
|
||||
'async'
|
||||
)
|
||||
const timeoutController = createTimeoutAbortController(asyncTimeout)
|
||||
|
||||
// Track deploymentVersionId at function scope so it's available in catch block
|
||||
let deploymentVersionId: string | undefined
|
||||
|
||||
try {
|
||||
@@ -254,22 +241,11 @@ async function executeWebhookJobInternal(
|
||||
snapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
base64MaxBytes: undefined,
|
||||
abortSignal: timeoutController.signal,
|
||||
includeFileBase64: true, // Enable base64 hydration
|
||||
base64MaxBytes: undefined, // Use default limit
|
||||
})
|
||||
|
||||
if (
|
||||
executionResult.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Airtable webhook execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
} else if (executionResult.status === 'paused') {
|
||||
if (executionResult.status === 'paused') {
|
||||
if (!executionResult.snapshotSeed) {
|
||||
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
|
||||
executionId,
|
||||
@@ -521,20 +497,9 @@ async function executeWebhookJobInternal(
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
abortSignal: timeoutController.signal,
|
||||
})
|
||||
|
||||
if (
|
||||
executionResult.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Webhook execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
} else if (executionResult.status === 'paused') {
|
||||
if (executionResult.status === 'paused') {
|
||||
if (!executionResult.snapshotSeed) {
|
||||
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
|
||||
executionId,
|
||||
@@ -636,8 +601,6 @@ async function executeWebhookJobInternal(
|
||||
}
|
||||
|
||||
throw error
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { task } from '@trigger.dev/sdk'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
@@ -104,33 +103,15 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
|
||||
[]
|
||||
)
|
||||
|
||||
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.async)
|
||||
const result = await executeWorkflowCore({
|
||||
snapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
base64MaxBytes: undefined,
|
||||
})
|
||||
|
||||
let result
|
||||
try {
|
||||
result = await executeWorkflowCore({
|
||||
snapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
base64MaxBytes: undefined,
|
||||
abortSignal: timeoutController.signal,
|
||||
})
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
}
|
||||
|
||||
if (
|
||||
result.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Workflow execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
} else if (result.status === 'paused') {
|
||||
if (result.status === 'paused') {
|
||||
if (!result.snapshotSeed) {
|
||||
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
|
||||
executionId,
|
||||
|
||||
@@ -1,4 +1,3 @@
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import type { LoopType, ParallelType } from '@/lib/workflows/types'

/**
@@ -188,12 +187,8 @@ export const HTTP = {

export const AGENT = {
  DEFAULT_MODEL: 'claude-sonnet-4-5',
  get DEFAULT_FUNCTION_TIMEOUT() {
    return getMaxExecutionTimeout()
  },
  get REQUEST_TIMEOUT() {
    return getMaxExecutionTimeout()
  },
  DEFAULT_FUNCTION_TIMEOUT: 600000,
  REQUEST_TIMEOUT: 600000,
  CUSTOM_TOOL_PREFIX: 'custom_',
} as const
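One detail worth noting (editor commentary, not from the commit): the getter form defers the `getMaxExecutionTimeout()` call to the moment the constant is read, whereas a plain property is fixed when the module loads. A minimal contrast, using a hypothetical `readTimeoutMs()` as the configuration source:

```typescript
// Sketch with a hypothetical readTimeoutMs(); only the timing of evaluation is the point.
let configuredTimeoutMs = 600_000
const readTimeoutMs = () => configuredTimeoutMs

const EAGER = { TIMEOUT: readTimeoutMs() } as const       // evaluated once, at module load
const LAZY = { get TIMEOUT() { return readTimeoutMs() } } // evaluated on every access

configuredTimeoutMs = 300_000
console.log(EAGER.TIMEOUT) // 600000 — still the load-time value
console.log(LAZY.TIMEOUT)  // 300000 — reflects the updated configuration
```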
||||
@@ -14,7 +14,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
|
||||
const { signal, executionId } = options
|
||||
const useRedis = isRedisCancellationEnabled() && !!executionId
|
||||
|
||||
if (signal?.aborted) {
|
||||
if (!useRedis && signal?.aborted) {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
|
||||
const cleanup = () => {
|
||||
if (mainTimeoutId) clearTimeout(mainTimeoutId)
|
||||
if (checkIntervalId) clearInterval(checkIntervalId)
|
||||
if (signal) signal.removeEventListener('abort', onAbort)
|
||||
if (!useRedis && signal) signal.removeEventListener('abort', onAbort)
|
||||
}
|
||||
|
||||
const onAbort = () => {
|
||||
@@ -37,10 +37,6 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
|
||||
resolve(false)
|
||||
}
|
||||
|
||||
if (signal) {
|
||||
signal.addEventListener('abort', onAbort, { once: true })
|
||||
}
|
||||
|
||||
if (useRedis) {
|
||||
checkIntervalId = setInterval(async () => {
|
||||
if (resolved) return
|
||||
@@ -53,6 +49,8 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
|
||||
}
|
||||
} catch {}
|
||||
}, CANCELLATION_CHECK_INTERVAL_MS)
|
||||
} else if (signal) {
|
||||
signal.addEventListener('abort', onAbort, { once: true })
|
||||
}
|
||||
|
||||
mainTimeoutId = setTimeout(() => {
|
||||
|
||||
@@ -126,7 +126,6 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
workspaceId: ctx.workspaceId,
|
||||
userId: ctx.userId,
|
||||
executionId: ctx.executionId,
|
||||
abortSignal: ctx.abortSignal,
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
export { AGENT_CARD_PATH } from '@a2a-js/sdk'
|
||||
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
|
||||
export const A2A_PROTOCOL_VERSION = '0.3.0'
|
||||
|
||||
export const A2A_DEFAULT_TIMEOUT = DEFAULT_EXECUTION_TIMEOUT_MS
|
||||
export const A2A_DEFAULT_TIMEOUT = 300000
|
||||
|
||||
/**
|
||||
* Maximum number of messages stored per task in the database.
|
||||
|
||||
@@ -5,8 +5,10 @@ import type { ToolUIConfig } from './ui-config'
|
||||
|
||||
const baseToolLogger = createLogger('BaseClientTool')
|
||||
|
||||
const DEFAULT_TOOL_TIMEOUT_MS = 5 * 60 * 1000
|
||||
/** Default timeout for tool execution (5 minutes) */
|
||||
const DEFAULT_TOOL_TIMEOUT_MS = 2 * 60 * 1000
|
||||
|
||||
/** Timeout for tools that run workflows (10 minutes) */
|
||||
export const WORKFLOW_EXECUTION_TIMEOUT_MS = 10 * 60 * 1000
|
||||
|
||||
// Client tool call states used by the new runtime
|
||||
|
||||
@@ -170,11 +170,6 @@ export const env = createEnv({
  RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute
  RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute

  EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'), // Free tier sync execution timeout in seconds (5 minutes)
  EXECUTION_TIMEOUT_PRO: z.string().optional().default('3600'), // Pro tier sync execution timeout in seconds (60 minutes)
  EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3600'), // Team tier sync execution timeout in seconds (60 minutes)
  EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3600'), // Enterprise tier sync execution timeout in seconds (60 minutes)

  // Knowledge Base Processing Configuration - Shared across all processing methods
  KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
  KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts
@@ -1 +0,0 @@
export * from './types'
@@ -1,134 +0,0 @@
import { env } from '@/lib/core/config/env'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'

interface ExecutionTimeoutConfig {
  sync: number
  async: number
}

const DEFAULT_SYNC_TIMEOUTS_SECONDS = {
  free: 300,
  pro: 3600,
  team: 3600,
  enterprise: 3600,
} as const

const ASYNC_MULTIPLIER = 2
const MAX_ASYNC_TIMEOUT_SECONDS = 5400

function getSyncTimeoutForPlan(plan: SubscriptionPlan): number {
  const envVarMap: Record<SubscriptionPlan, string | undefined> = {
    free: env.EXECUTION_TIMEOUT_FREE,
    pro: env.EXECUTION_TIMEOUT_PRO,
    team: env.EXECUTION_TIMEOUT_TEAM,
    enterprise: env.EXECUTION_TIMEOUT_ENTERPRISE,
  }
  // Env values are seconds; fall back to the plan default, then convert to milliseconds
  return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS_SECONDS[plan]) * 1000
}

function getAsyncTimeoutForPlan(plan: SubscriptionPlan): number {
  const syncMs = getSyncTimeoutForPlan(plan)
  const asyncMs = syncMs * ASYNC_MULTIPLIER
  const maxAsyncMs = MAX_ASYNC_TIMEOUT_SECONDS * 1000
  return Math.min(asyncMs, maxAsyncMs)
}

const EXECUTION_TIMEOUTS: Record<SubscriptionPlan, ExecutionTimeoutConfig> = {
  free: {
    sync: getSyncTimeoutForPlan('free'),
    async: getAsyncTimeoutForPlan('free'),
  },
  pro: {
    sync: getSyncTimeoutForPlan('pro'),
    async: getAsyncTimeoutForPlan('pro'),
  },
  team: {
    sync: getSyncTimeoutForPlan('team'),
    async: getAsyncTimeoutForPlan('team'),
  },
  enterprise: {
    sync: getSyncTimeoutForPlan('enterprise'),
    async: getAsyncTimeoutForPlan('enterprise'),
  },
}

export function getExecutionTimeout(
  plan: SubscriptionPlan | undefined,
  type: 'sync' | 'async' = 'sync'
): number {
  return EXECUTION_TIMEOUTS[plan || 'free'][type]
}

export function getMaxExecutionTimeout(): number {
  return EXECUTION_TIMEOUTS.enterprise.async
}

export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync

export function isTimeoutError(error: unknown): boolean {
  if (!error) return false

  if (error instanceof Error) {
    return error.name === 'TimeoutError'
  }

  if (typeof error === 'object' && 'name' in error) {
    return (error as { name: string }).name === 'TimeoutError'
  }

  return false
}

export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): string {
  if (timeoutMs) {
    const timeoutSeconds = Math.floor(timeoutMs / 1000)
    const timeoutMinutes = Math.floor(timeoutSeconds / 60)
    const displayTime =
      timeoutMinutes > 0
        ? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}`
        : `${timeoutSeconds} seconds`
    return `Execution timed out after ${displayTime}`
  }

  return 'Execution timed out'
}

/**
 * Helper to create an AbortController with timeout handling.
 * Centralizes the timeout abort pattern used across execution paths.
 */
export interface TimeoutAbortController {
  /** The AbortSignal to pass to execution functions */
  signal: AbortSignal
  /** Returns true if the abort was triggered by timeout (not user cancellation) */
  isTimedOut: () => boolean
  /** Cleanup function - call in finally block to clear the timeout */
  cleanup: () => void
  /** Manually abort the execution (for user cancellation) */
  abort: () => void
  /** The timeout duration in milliseconds (undefined if no timeout) */
  timeoutMs: number | undefined
}

export function createTimeoutAbortController(timeoutMs?: number): TimeoutAbortController {
  const abortController = new AbortController()
  let isTimedOut = false
  let timeoutId: NodeJS.Timeout | undefined

  if (timeoutMs) {
    timeoutId = setTimeout(() => {
      isTimedOut = true
      abortController.abort()
    }, timeoutMs)
  }

  return {
    signal: abortController.signal,
    isTimedOut: () => isTimedOut,
    cleanup: () => {
      if (timeoutId) clearTimeout(timeoutId)
    },
    abort: () => abortController.abort(),
    timeoutMs,
  }
}
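The timeout controller above is consumed in the same way across the execution paths touched by this diff (route handlers and trigger.dev tasks). A condensed sketch of that call shape, with `run` standing in for the real `executeWorkflowCore` — the stand-in and the simplified result type are assumptions for illustration:

```typescript
import {
  createTimeoutAbortController,
  getExecutionTimeout,
  getTimeoutErrorMessage,
} from '@/lib/core/execution-limits'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'

// Condensed sketch of the consumption pattern in this diff; `run` stands in for
// executeWorkflowCore and only needs to honour the abort signal and report a status.
async function runWithPlanTimeout(
  plan: SubscriptionPlan | undefined,
  run: (opts: { abortSignal: AbortSignal }) => Promise<{ status: string }>
) {
  const timeoutController = createTimeoutAbortController(getExecutionTimeout(plan, 'async'))

  let result: { status: string }
  try {
    result = await run({ abortSignal: timeoutController.signal })
  } finally {
    timeoutController.cleanup() // always clear the pending timer
  }

  // A cancellation caused by the timer (rather than by the user) is surfaced as a timeout failure.
  if (result.status === 'cancelled' && timeoutController.isTimedOut() && timeoutController.timeoutMs) {
    throw new Error(getTimeoutErrorMessage(null, timeoutController.timeoutMs))
  }
  return result
}
```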
@@ -4,7 +4,6 @@ import { idempotencyKey } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { getRedisClient } from '@/lib/core/config/redis'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getStorageMethod, type StorageMethod } from '@/lib/core/storage'
import { extractProviderIdentifierFromBody } from '@/lib/webhooks/provider-utils'

@@ -37,9 +36,9 @@ export interface AtomicClaimResult {
  storageMethod: StorageMethod
}

const DEFAULT_TTL = 60 * 60 * 24 * 7
const DEFAULT_TTL = 60 * 60 * 24 * 7 // 7 days
const REDIS_KEY_PREFIX = 'idempotency:'
const MAX_WAIT_TIME_MS = getMaxExecutionTimeout()
const MAX_WAIT_TIME_MS = 300000 // 5 minutes max wait
const POLL_INTERVAL_MS = 1000

/**
@@ -1,3 +1,7 @@
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
/**
|
||||
* Execution timeout constants
|
||||
*
|
||||
* DEFAULT_EXECUTION_TIMEOUT_MS: The default timeout for executing user code (10 minutes)
|
||||
*/
|
||||
|
||||
export { DEFAULT_EXECUTION_TIMEOUT_MS }
|
||||
export const DEFAULT_EXECUTION_TIMEOUT_MS = 600000 // 10 minutes (600 seconds)
|
||||
|
||||
@@ -4,9 +4,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor'
|
||||
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
|
||||
import { getExecutionTimeout } from '@/lib/core/execution-limits'
|
||||
import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
|
||||
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
|
||||
import type { CoreTriggerType } from '@/stores/logs/filters/types'
|
||||
@@ -135,10 +133,10 @@ export interface PreprocessExecutionResult {
|
||||
success: boolean
|
||||
error?: {
|
||||
message: string
|
||||
statusCode: number
|
||||
logCreated: boolean
|
||||
statusCode: number // HTTP status code (401, 402, 403, 404, 429, 500)
|
||||
logCreated: boolean // Whether error was logged to execution_logs
|
||||
}
|
||||
actorUserId?: string
|
||||
actorUserId?: string // The user ID that will be billed
|
||||
workflowRecord?: WorkflowRecord
|
||||
userSubscription?: SubscriptionInfo | null
|
||||
rateLimitInfo?: {
|
||||
@@ -146,10 +144,6 @@ export interface PreprocessExecutionResult {
|
||||
remaining: number
|
||||
resetAt: Date
|
||||
}
|
||||
executionTimeout?: {
|
||||
sync: number
|
||||
async: number
|
||||
}
|
||||
}
|
||||
|
||||
type WorkflowRecord = typeof workflow.$inferSelect
|
||||
@@ -490,17 +484,12 @@ export async function preprocessExecution(
|
||||
triggerType,
|
||||
})
|
||||
|
||||
const plan = userSubscription?.plan as SubscriptionPlan | undefined
|
||||
return {
|
||||
success: true,
|
||||
actorUserId,
|
||||
workflowRecord,
|
||||
userSubscription,
|
||||
rateLimitInfo,
|
||||
executionTimeout: {
|
||||
sync: getExecutionTimeout(plan, 'sync'),
|
||||
async: getExecutionTimeout(plan, 'async'),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -776,16 +776,11 @@ export class LoggingSession {
    await db
      .update(workflowExecutionLogs)
      .set({
        level: 'error',
        status: 'failed',
        executionData: sql`jsonb_set(
          jsonb_set(
            COALESCE(execution_data, '{}'::jsonb),
            ARRAY['error'],
            to_jsonb(${message}::text)
          ),
          ARRAY['finalOutput'],
          jsonb_build_object('error', ${message}::text)
          COALESCE(execution_data, '{}'::jsonb),
          ARRAY['error'],
          to_jsonb(${message}::text)
        )`,
      })
      .where(eq(workflowExecutionLogs.executionId, executionId))

@@ -12,7 +12,6 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js'
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'
import type { ListToolsResult, Tool } from '@modelcontextprotocol/sdk/types.js'
import { createLogger } from '@sim/logger'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import {
  McpConnectionError,
  type McpConnectionStatus,
@@ -203,7 +202,7 @@ export class McpClient {
    const sdkResult = await this.client.callTool(
      { name: toolCall.name, arguments: toolCall.arguments },
      undefined,
      { timeout: getMaxExecutionTimeout() }
      { timeout: 600000 } // 10 minutes - override SDK's 60s default
    )

    return sdkResult as McpToolResult

@@ -32,11 +32,9 @@ export function sanitizeHeaders(

/**
 * Client-safe MCP constants
 * Note: CLIENT_TIMEOUT should match DEFAULT_EXECUTION_TIMEOUT_MS from @/lib/core/execution-limits
 * (5 minutes = 300 seconds for free tier). Keep in sync if that value changes.
 */
export const MCP_CLIENT_CONSTANTS = {
  CLIENT_TIMEOUT: 5 * 60 * 1000, // 5 minutes - matches DEFAULT_EXECUTION_TIMEOUT_MS
  CLIENT_TIMEOUT: 600000,
  MAX_RETRIES: 3,
  RECONNECT_DELAY: 1000,
} as const

@@ -1,5 +1,4 @@
import { describe, expect, it } from 'vitest'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import {
  categorizeError,
  createMcpToolId,
@@ -82,8 +81,8 @@ describe('generateMcpServerId', () => {
})

describe('MCP_CONSTANTS', () => {
  it.concurrent('has correct execution timeout', () => {
    expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(DEFAULT_EXECUTION_TIMEOUT_MS)
  it.concurrent('has correct execution timeout (10 minutes)', () => {
    expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(600000)
  })

  it.concurrent('has correct cache timeout (5 minutes)', () => {
@@ -108,8 +107,8 @@ describe('MCP_CONSTANTS', () => {
})

describe('MCP_CLIENT_CONSTANTS', () => {
  it.concurrent('has correct client timeout', () => {
    expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(DEFAULT_EXECUTION_TIMEOUT_MS)
  it.concurrent('has correct client timeout (10 minutes)', () => {
    expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(600000)
  })

  it.concurrent('has correct auto refresh interval (5 minutes)', () => {

@@ -1,10 +1,12 @@
import { NextResponse } from 'next/server'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { McpApiResponse } from '@/lib/mcp/types'
import { isMcpTool, MCP } from '@/executor/constants'

/**
 * MCP-specific constants
 */
export const MCP_CONSTANTS = {
  EXECUTION_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
  EXECUTION_TIMEOUT: 600000,
  CACHE_TIMEOUT: 5 * 60 * 1000,
  DEFAULT_RETRIES: 3,
  DEFAULT_CONNECTION_TIMEOUT: 30000,
@@ -43,8 +45,11 @@ export function sanitizeHeaders(
  )
}

/**
 * Client-safe MCP constants
 */
export const MCP_CLIENT_CONSTANTS = {
  CLIENT_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
  CLIENT_TIMEOUT: 600000,
  AUTO_REFRESH_INTERVAL: 5 * 60 * 1000,
} as const
@@ -19,7 +19,6 @@ export interface ExecuteWorkflowOptions {
  skipLoggingComplete?: boolean
  includeFileBase64?: boolean
  base64MaxBytes?: number
  abortSignal?: AbortSignal
}

export interface WorkflowInfo {
@@ -83,7 +82,6 @@ export async function executeWorkflow(
    loggingSession,
    includeFileBase64: streamConfig?.includeFileBase64,
    base64MaxBytes: streamConfig?.base64MaxBytes,
    abortSignal: streamConfig?.abortSignal,
  })

  if (result.status === 'paused') {

@@ -58,6 +58,9 @@ export interface ExecutionErrorEvent extends BaseExecutionEvent {
  }
}

/**
 * Execution cancelled event
 */
export interface ExecutionCancelledEvent extends BaseExecutionEvent {
  type: 'execution:cancelled'
  workflowId: string
@@ -164,6 +167,9 @@ export type ExecutionEvent =
  | StreamChunkEvent
  | StreamDoneEvent

/**
 * Extracted data types for use in callbacks
 */
export type ExecutionStartedData = ExecutionStartedEvent['data']
export type ExecutionCompletedData = ExecutionCompletedEvent['data']
export type ExecutionErrorData = ExecutionErrorEvent['data']

@@ -4,7 +4,6 @@ import { pausedExecutions, resumeQueue, workflowExecutionLogs } from '@sim/db/sc
import { createLogger } from '@sim/logger'
import { and, asc, desc, eq, inArray, lt, type SQL, sql } from 'drizzle-orm'
import type { Edge } from 'reactflow'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
@@ -772,39 +771,14 @@ export class PauseResumeManager {
      actorUserId: metadata.userId,
    })

    const timeoutController = createTimeoutAbortController(
      preprocessingResult.executionTimeout?.async
    )

    let result: ExecutionResult
    try {
      result = await executeWorkflowCore({
        snapshot: resumeSnapshot,
        callbacks: {},
        loggingSession,
        skipLogCreation: true, // Reuse existing log entry
        includeFileBase64: true, // Enable base64 hydration
        base64MaxBytes: undefined, // Use default limit
        abortSignal: timeoutController.signal,
      })
    } finally {
      timeoutController.cleanup()
    }

    if (
      result.status === 'cancelled' &&
      timeoutController.isTimedOut() &&
      timeoutController.timeoutMs
    ) {
      const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
      logger.info('Resume execution timed out', {
        resumeExecutionId,
        timeoutMs: timeoutController.timeoutMs,
      })
      await loggingSession.markAsFailed(timeoutErrorMessage)
    }

    return result
    return await executeWorkflowCore({
      snapshot: resumeSnapshot,
      callbacks: {},
      loggingSession,
      skipLogCreation: true, // Reuse existing log entry
      includeFileBase64: true, // Enable base64 hydration
      base64MaxBytes: undefined, // Use default limit
    })
  }

  private static async markResumeCompleted(args: {
@@ -1,5 +1,4 @@
import { createLogger } from '@sim/logger'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import {
  extractBlockIdFromOutputId,
  extractPathFromOutputId,
@@ -33,7 +32,6 @@ export interface StreamingConfig {
  workflowTriggerType?: 'api' | 'chat'
  includeFileBase64?: boolean
  base64MaxBytes?: number
  timeoutMs?: number
}

export interface StreamingResponseOptions {
@@ -270,8 +268,6 @@ export async function createStreamingResponse(
        }
      }

      const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs)

      try {
        const result = await executeWorkflow(
          workflow,
@@ -288,7 +284,6 @@ export async function createStreamingResponse(
            skipLoggingComplete: true,
            includeFileBase64: streamConfig.includeFileBase64,
            base64MaxBytes: streamConfig.base64MaxBytes,
            abortSignal: timeoutController.signal,
          },
          executionId
        )
@@ -298,34 +293,18 @@ export async function createStreamingResponse(
          processStreamingBlockLogs(result.logs, state.streamedContent)
        }

        if (
          result.status === 'cancelled' &&
          timeoutController.isTimedOut() &&
          timeoutController.timeoutMs
        ) {
          const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
          logger.info(`[${requestId}] Streaming execution timed out`, {
            timeoutMs: timeoutController.timeoutMs,
          })
          if (result._streamingMetadata?.loggingSession) {
            await result._streamingMetadata.loggingSession.markAsFailed(timeoutErrorMessage)
          }
          controller.enqueue(encodeSSE({ event: 'error', error: timeoutErrorMessage }))
        } else {
          await completeLoggingSession(result)
        await completeLoggingSession(result)

          const minimalResult = await buildMinimalResult(
            result,
            streamConfig.selectedOutputs,
            state.streamedContent,
            requestId,
            streamConfig.includeFileBase64 ?? true,
            streamConfig.base64MaxBytes
          )

          controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
        }
        const minimalResult = await buildMinimalResult(
          result,
          streamConfig.selectedOutputs,
          state.streamedContent,
          requestId,
          streamConfig.includeFileBase64 ?? true,
          streamConfig.base64MaxBytes
        )

        controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
        controller.enqueue(encodeSSE('[DONE]'))

        if (executionId) {
@@ -344,8 +323,6 @@ export async function createStreamingResponse(
        }

        controller.close()
      } finally {
        timeoutController.cleanup()
      }
    },
  })
@@ -12,39 +12,70 @@ import {
import { persistWorkflowOperation } from '@/socket/database/operations'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { checkRolePermission } from '@/socket/middleware/permissions'
import type { IRoomManager } from '@/socket/rooms'
import type { IRoomManager, UserSession } from '@/socket/rooms'
import { WorkflowOperationSchema } from '@/socket/validation/schemas'

const logger = createLogger('OperationsHandlers')

export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
  socket.on('workflow-operation', async (data) => {
    const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
    const session = await roomManager.getUserSession(socket.id)

    if (!workflowId || !session) {
      socket.emit('operation-forbidden', {
        type: 'SESSION_ERROR',
        message: 'Session expired, please rejoin workflow',
      })
      if (data?.operationId) {
        socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
    const emitOperationError = (
      forbidden: { type: string; message: string; operation?: string; target?: string },
      failed?: { error: string; retryable?: boolean }
    ) => {
      socket.emit('operation-forbidden', forbidden)
      if (failed && data?.operationId) {
        socket.emit('operation-failed', { operationId: data.operationId, ...failed })
      }
    }

    if (!roomManager.isReady()) {
      emitOperationError(
        { type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
        { error: 'Realtime unavailable', retryable: true }
      )
      return
    }

    const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
    let workflowId: string | null = null
    let session: UserSession | null = null

    try {
      workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
      session = await roomManager.getUserSession(socket.id)
    } catch (error) {
      logger.error('Error loading session for workflow operation:', error)
      emitOperationError(
        { type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
        { error: 'Realtime unavailable', retryable: true }
      )
      return
    }

    if (!workflowId || !session) {
      emitOperationError(
        { type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' },
        { error: 'Session expired' }
      )
      return
    }

    let hasRoom = false
    try {
      hasRoom = await roomManager.hasWorkflowRoom(workflowId)
    } catch (error) {
      logger.error('Error checking workflow room:', error)
      emitOperationError(
        { type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
        { error: 'Realtime unavailable', retryable: true }
      )
      return
    }
    if (!hasRoom) {
      socket.emit('operation-forbidden', {
        type: 'ROOM_NOT_FOUND',
        message: 'Workflow room not found',
      })
      if (data?.operationId) {
        socket.emit('operation-failed', {
          operationId: data.operationId,
          error: 'Workflow room not found',
        })
      }
      emitOperationError(
        { type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' },
        { error: 'Workflow room not found' }
      )
      return
    }

@@ -77,15 +108,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
    // Check permissions from cached role for all other operations
    if (!userPresence) {
      logger.warn(`User presence not found for socket ${socket.id}`)
      socket.emit('operation-forbidden', {
        type: 'SESSION_ERROR',
        message: 'User session not found',
        operation,
        target,
      })
      if (operationId) {
        socket.emit('operation-failed', { operationId, error: 'User session not found' })
      }
      emitOperationError(
        {
          type: 'SESSION_ERROR',
          message: 'User session not found',
          operation,
          target,
        },
        { error: 'User session not found' }
      )
      return
    }

@@ -97,7 +128,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
      logger.warn(
        `User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
      )
      socket.emit('operation-forbidden', {
      emitOperationError({
        type: 'INSUFFICIENT_PERMISSIONS',
        message: `${permissionCheck.reason} on '${target}'`,
        operation,
@@ -48,6 +48,21 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
      operationId,
    } = data

    if (!roomManager.isReady()) {
      socket.emit('operation-forbidden', {
        type: 'ROOM_MANAGER_UNAVAILABLE',
        message: 'Realtime unavailable',
      })
      if (operationId) {
        socket.emit('operation-failed', {
          operationId,
          error: 'Realtime unavailable',
          retryable: true,
        })
      }
      return
    }

    try {
      const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
      const session = await roomManager.getUserSession(socket.id)

@@ -37,6 +37,21 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
  socket.on('variable-update', async (data) => {
    const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data

    if (!roomManager.isReady()) {
      socket.emit('operation-forbidden', {
        type: 'ROOM_MANAGER_UNAVAILABLE',
        message: 'Realtime unavailable',
      })
      if (operationId) {
        socket.emit('operation-failed', {
          operationId,
          error: 'Realtime unavailable',
          retryable: true,
        })
      }
      return
    }

    try {
      const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
      const session = await roomManager.getUserSession(socket.id)

@@ -20,6 +20,15 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
      return
    }

    if (!roomManager.isReady()) {
      logger.warn(`Join workflow rejected: Room manager unavailable`)
      socket.emit('join-workflow-error', {
        error: 'Realtime unavailable',
        code: 'ROOM_MANAGER_UNAVAILABLE',
      })
      return
    }

    logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)

    // Verify workflow access
@@ -128,12 +137,20 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
      // Undo socket.join and room manager entry if any operation failed
      socket.leave(workflowId)
      await roomManager.removeUserFromRoom(socket.id)
      socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
      const isReady = roomManager.isReady()
      socket.emit('join-workflow-error', {
        error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
        code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
      })
    }
  })

  socket.on('leave-workflow', async () => {
    try {
      if (!roomManager.isReady()) {
        return
      }

      const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
      const session = await roomManager.getUserSession(socket.id)
@@ -26,6 +26,10 @@ export class MemoryRoomManager implements IRoomManager {
    logger.info('MemoryRoomManager initialized (single-pod mode)')
  }

  isReady(): boolean {
    return true
  }

  async shutdown(): Promise<void> {
    this.workflowRooms.clear()
    this.socketToWorkflow.clear()

@@ -96,17 +96,6 @@ export class RedisRoomManager implements IRoomManager {
    this._io = io
    this.redis = createClient({
      url: redisUrl,
      socket: {
        reconnectStrategy: (retries) => {
          if (retries > 10) {
            logger.error('Redis reconnection failed after 10 attempts')
            return new Error('Redis reconnection failed')
          }
          const delay = Math.min(retries * 100, 3000)
          logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
          return delay
        },
      },
    })

    this.redis.on('error', (err) => {
@@ -122,12 +111,21 @@ export class RedisRoomManager implements IRoomManager {
      logger.info('Redis client ready')
      this.isConnected = true
    })

    this.redis.on('end', () => {
      logger.warn('Redis client connection closed')
      this.isConnected = false
    })
  }

  get io(): Server {
    return this._io
  }

  isReady(): boolean {
    return this.isConnected
  }

  async initialize(): Promise<void> {
    if (this.isConnected) return

@@ -48,6 +48,11 @@ export interface IRoomManager {
   */
  initialize(): Promise<void>

  /**
   * Whether the room manager is ready to serve requests
   */
  isReady(): boolean

  /**
   * Clean shutdown
   */

@@ -85,6 +85,11 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
      res.end(JSON.stringify({ error: authResult.error }))
      return
    }

    if (!roomManager.isReady()) {
      sendError(res, 'Room manager unavailable', 503)
      return
    }
  }

  // Handle workflow deletion notifications from the main API
@@ -1,9 +1,8 @@
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { RunActorParams, RunActorResult } from '@/tools/apify/types'
import type { ToolConfig } from '@/tools/types'

const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time

export const apifyRunActorAsyncTool: ToolConfig<RunActorParams, RunActorResult> = {
  id: 'apify_run_actor_async',

@@ -1,12 +1,11 @@
import { createLogger } from '@sim/logger'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types'
import type { ToolConfig, ToolResponse } from '@/tools/types'

const logger = createLogger('BrowserUseTool')

const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = getMaxExecutionTimeout()
const MAX_POLL_TIME_MS = 600000 // 10 minutes
const MAX_CONSECUTIVE_ERRORS = 3

async function createSessionWithProfile(

@@ -7,7 +7,7 @@ import type { ToolConfig } from '@/tools/types'
 */
function httpHeaderSafeJson(value: object): string {
  return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => {
    return `\\u${(`0000${c.charCodeAt(0).toString(16)}`).slice(-4)}`
    return '\\u' + ('0000' + c.charCodeAt(0).toString(16)).slice(-4)
  })
}

@@ -1,12 +1,11 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { ExaResearchParams, ExaResearchResponse } from '@/tools/exa/types'
import type { ToolConfig } from '@/tools/types'

const logger = createLogger('ExaResearchTool')

const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time

export const researchTool: ToolConfig<ExaResearchParams, ExaResearchResponse> = {
  id: 'exa_research',

@@ -1,12 +1,11 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { AgentParams, AgentResponse } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types'

const logger = createLogger('FirecrawlAgentTool')

const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time

export const agentTool: ToolConfig<AgentParams, AgentResponse> = {
  id: 'firecrawl_agent',

@@ -1,13 +1,12 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { FirecrawlCrawlParams, FirecrawlCrawlResponse } from '@/tools/firecrawl/types'
import { CRAWLED_PAGE_OUTPUT_PROPERTIES } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types'

const logger = createLogger('FirecrawlCrawlTool')

const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time

export const crawlTool: ToolConfig<FirecrawlCrawlParams, FirecrawlCrawlResponse> = {
  id: 'firecrawl_crawl',

@@ -1,12 +1,11 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { ExtractParams, ExtractResponse } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types'

const logger = createLogger('FirecrawlExtractTool')

const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time

export const extractTool: ToolConfig<ExtractParams, ExtractResponse> = {
  id: 'firecrawl_extract',

@@ -1,6 +1,5 @@
import { createLogger } from '@sim/logger'
import { generateInternalToken } from '@/lib/auth/internal'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import {
  secureFetchWithPinnedIP,
  validateUrlWithDNS,
@@ -629,8 +628,9 @@ async function executeToolRequest(
  let response: Response

  if (isInternalRoute) {
    // Set up AbortController for timeout support on internal routes
    const controller = new AbortController()
    const timeout = requestParams.timeout || DEFAULT_EXECUTION_TIMEOUT_MS
    const timeout = requestParams.timeout || 300000
    const timeoutId = setTimeout(() => controller.abort(), timeout)

    try {
@@ -1,5 +1,4 @@
import { createLogger } from '@sim/logger'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { AGENT, isCustomTool } from '@/executor/constants'
import { getCustomTool } from '@/hooks/queries/custom-tools'
@@ -123,7 +122,9 @@ export function formatRequestParams(tool: ToolConfig, params: Record<string, any
    }
  }

  const MAX_TIMEOUT_MS = getMaxExecutionTimeout()
  // Get timeout from params (if specified) and validate
  // Must be a finite positive number, max 600000ms (10 minutes) as documented
  const MAX_TIMEOUT_MS = 600000
  const rawTimeout = params.timeout
  const timeout = rawTimeout != null ? Number(rawTimeout) : undefined
  const validTimeout =
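The hunk stops at `const validTimeout =`, so the committed expression is not shown here. Going only by the comment above it (finite, positive, capped at `MAX_TIMEOUT_MS`), a plausible shape is sketched below; `clampTimeout` is a hypothetical helper introduced for illustration, not code from the repository:

```ts
// Sketch only: clamp a caller-supplied timeout to the documented bounds.
function clampTimeout(rawTimeout: unknown, maxTimeoutMs = 600000): number | undefined {
  const timeout = rawTimeout != null ? Number(rawTimeout) : undefined
  // Reject non-numeric, non-finite, and non-positive values; cap the rest.
  return timeout !== undefined && Number.isFinite(timeout) && timeout > 0
    ? Math.min(timeout, maxTimeoutMs)
    : undefined
}
```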
@@ -6,7 +6,7 @@ export default defineConfig({
  project: env.TRIGGER_PROJECT_ID!,
  runtime: 'node',
  logLevel: 'log',
  maxDuration: 5400,
  maxDuration: 600,
  retries: {
    enabledInDev: false,
    default: {