mirror of https://github.com/simstudioai/sim.git
synced 2026-02-04 19:55:08 -05:00

Compare commits: fix/restor...feat/timeo
28 Commits
fc2f9853c5
eff4fc922b
e63a642f11
cb63e98dc8
7f23b9057c
7b75f156b4
7d28f62398
b890120afb
c519034c8d
17f02f8ed2
32a571a22a
593bda7d0b
5565677f7c
06ddd80ab4
fe27adfb7c
ee06ee34f6
39d75892a3
424b6e6a61
066850b65a
c332efd1e4
d2e4afd15b
bbf5c66abd
f104659330
eac163cfd0
b53ed5dae0
d7259e304a
501b44e05a
7c1e7273de
@@ -213,6 +213,25 @@ Different subscription plans have different usage limits:
 | **Team** | $40/seat (pooled, adjustable) | 300 sync, 2,500 async |
 | **Enterprise** | Custom | Custom |
+
+## Execution Time Limits
+
+Workflows have maximum execution time limits based on your subscription plan:
+
+| Plan | Sync Execution | Async Execution |
+|------|----------------|-----------------|
+| **Free** | 5 minutes | 10 minutes |
+| **Pro** | 60 minutes | 90 minutes |
+| **Team** | 60 minutes | 90 minutes |
+| **Enterprise** | 60 minutes | 90 minutes |
+
+**Sync executions** run immediately and return results directly. These are triggered via the API with `async: false` (default) or through the UI.
+
+**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background. Async time limits are up to 2x the sync limit, capped at 90 minutes.
+
+<Callout type="info">
+If a workflow exceeds its time limit, it will be terminated and marked as failed with a timeout error. Design long-running workflows to use async execution or break them into smaller workflows.
+</Callout>
+
 ## Billing Model
 
 Sim uses a **base subscription + overage** billing model:
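
The sync/async split documented above is what the route changes below enforce. A caller's-eye sketch of the contract (the endpoint path here is an assumption, not part of this changeset; the `async` flag, the plan limits, and the new 408 timeout response come from the diffs):

// Hypothetical client sketch. The route path is an assumption; the
// `async` body flag, the 408 timeout status, and the plan limits are
// taken from this changeset.
async function runWorkflow(workflowId: string, input: Record<string, unknown>) {
  const res = await fetch(`/api/workflows/${workflowId}/execute`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    // async: false (the default) blocks until the sync limit, e.g. 5 min on Free;
    // async: true queues a background job with up to 2x the sync limit (max 90 min).
    body: JSON.stringify({ input, async: false }),
  })
  if (res.status === 408) {
    // New in this changeset: sync executions that exceed the plan limit are
    // marked as failed and returned with a timeout error message.
    throw new Error(`Timed out: ${(await res.json()).error}`)
  }
  return res.json()
}
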
@@ -11,7 +11,7 @@ import {
   Database,
   DollarSign,
   HardDrive,
-  Workflow,
+  Timer,
 } from 'lucide-react'
 import { useRouter } from 'next/navigation'
 import { cn } from '@/lib/core/utils/cn'

@@ -44,7 +44,7 @@ interface PricingTier {
 const FREE_PLAN_FEATURES: PricingFeature[] = [
   { icon: DollarSign, text: '$20 usage limit' },
   { icon: HardDrive, text: '5GB file storage' },
-  { icon: Workflow, text: 'Public template access' },
+  { icon: Timer, text: '5 min execution limit' },
   { icon: Database, text: 'Limited log retention' },
   { icon: Code2, text: 'CLI/SDK Access' },
 ]

@@ -4,10 +4,12 @@ import { createLogger } from '@sim/logger'
 import { and, eq, lt, sql } from 'drizzle-orm'
 import { type NextRequest, NextResponse } from 'next/server'
 import { verifyCronAuth } from '@/lib/auth/internal'
+import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
 
 const logger = createLogger('CleanupStaleExecutions')
 
-const STALE_THRESHOLD_MINUTES = 30
+const STALE_THRESHOLD_MS = getMaxExecutionTimeout() + 5 * 60 * 1000
+const STALE_THRESHOLD_MINUTES = Math.ceil(STALE_THRESHOLD_MS / 60000)
 const MAX_INT32 = 2_147_483_647
 
 export async function GET(request: NextRequest) {

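
getMaxExecutionTimeout and its siblings come from the new @/lib/core/execution-limits module, which is not itself shown in this compare. A plausible sketch of its shape, inferred from the call sites in these hunks and the plan table in the docs hunk (the names match the imports; the concrete default value is an assumption):

// Sketch only: inferred from call sites and the docs table, not from the
// module source. The value of DEFAULT_EXECUTION_TIMEOUT_MS is assumed.
export type SubscriptionPlan = 'free' | 'pro' | 'team' | 'enterprise'

// Sync limits per the docs table: Free 5 min, all paid plans 60 min.
const SYNC_LIMIT_MS: Record<SubscriptionPlan, number> = {
  free: 5 * 60 * 1000,
  pro: 60 * 60 * 1000,
  team: 60 * 60 * 1000,
  enterprise: 60 * 60 * 1000,
}

// Async limits are "up to 2x the sync limit, capped at 90 minutes".
const ASYNC_CAP_MS = 90 * 60 * 1000

export const DEFAULT_EXECUTION_TIMEOUT_MS = SYNC_LIMIT_MS.free // assumed default

export function getExecutionTimeout(
  plan: SubscriptionPlan | undefined,
  mode: 'sync' | 'async'
): number {
  const sync = SYNC_LIMIT_MS[plan ?? 'free']
  return mode === 'sync' ? sync : Math.min(sync * 2, ASYNC_CAP_MS)
}

// The largest limit any plan can reach; cross-tenant jobs such as the stale
// sweep above add their own grace period on top (+5 minutes at the call site).
export function getMaxExecutionTimeout(): number {
  return ASYNC_CAP_MS
}
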
@@ -21,6 +21,7 @@ import { and, eq } from 'drizzle-orm'
 import { type NextRequest, NextResponse } from 'next/server'
 import { checkHybridAuth } from '@/lib/auth/hybrid'
 import { generateInternalToken } from '@/lib/auth/internal'
+import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
 import { getBaseUrl } from '@/lib/core/utils/urls'
 
 const logger = createLogger('WorkflowMcpServeAPI')

@@ -264,7 +265,7 @@ async function handleToolsCall(
       method: 'POST',
       headers,
       body: JSON.stringify({ input: params.arguments || {}, triggerType: 'mcp' }),
-      signal: AbortSignal.timeout(600000), // 10 minute timeout
+      signal: AbortSignal.timeout(getMaxExecutionTimeout()),
     })
 
     const executeResult = await response.json()

@@ -1,5 +1,8 @@
 import { createLogger } from '@sim/logger'
 import type { NextRequest } from 'next/server'
+import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
+import { getExecutionTimeout } from '@/lib/core/execution-limits'
+import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
 import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
 import { mcpService } from '@/lib/mcp/service'
 import type { McpTool, McpToolCall, McpToolResult } from '@/lib/mcp/types'

@@ -7,7 +10,6 @@ import {
   categorizeError,
   createMcpErrorResponse,
   createMcpSuccessResponse,
-  MCP_CONSTANTS,
   validateStringParam,
 } from '@/lib/mcp/utils'
 

@@ -171,13 +173,16 @@ export const POST = withMcpAuth('read')(
         arguments: args,
       }
 
+      const userSubscription = await getHighestPrioritySubscription(userId)
+      const executionTimeout = getExecutionTimeout(
+        userSubscription?.plan as SubscriptionPlan | undefined,
+        'sync'
+      )
+
       const result = await Promise.race([
         mcpService.executeTool(userId, serverId, toolCall, workspaceId),
         new Promise<never>((_, reject) =>
-          setTimeout(
-            () => reject(new Error('Tool execution timeout')),
-            MCP_CONSTANTS.EXECUTION_TIMEOUT
-          )
+          setTimeout(() => reject(new Error('Tool execution timeout')), executionTimeout)
        ),
      ])
 

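
The change above keeps the Promise.race timeout pattern and only swaps the fixed MCP_CONSTANTS.EXECUTION_TIMEOUT for a per-plan value. A self-contained sketch of the pattern, with one caveat the original shares:

// Standalone sketch of the Promise.race timeout used above. Caveat shared
// with the original: losing the race does not cancel the underlying work,
// it only stops awaiting it; hard cancellation needs an AbortSignal, which
// is what the workflow execution routes below use instead.
async function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: NodeJS.Timeout | undefined
  try {
    return await Promise.race([
      work,
      new Promise<never>((_, reject) => {
        timer = setTimeout(() => reject(new Error('Tool execution timeout')), timeoutMs)
      }),
    ])
  } finally {
    if (timer) clearTimeout(timer) // avoid keeping the event loop alive
  }
}
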
@@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server'
 import { z } from 'zod'
 import { checkInternalAuth } from '@/lib/auth/hybrid'
 import { generateRequestId } from '@/lib/core/utils/request'
+import { httpHeaderSafeJson } from '@/lib/core/utils/validation'
 import { FileInputSchema } from '@/lib/uploads/utils/file-schemas'
 import { processFilesToUserFiles, type RawFileInput } from '@/lib/uploads/utils/file-utils'
 import { downloadFileFromStorage } from '@/lib/uploads/utils/file-utils.server'

@@ -11,16 +12,6 @@ export const dynamic = 'force-dynamic'
 
 const logger = createLogger('DropboxUploadAPI')
 
-/**
- * Escapes non-ASCII characters in JSON string for HTTP header safety.
- * Dropbox API requires characters 0x7F and all non-ASCII to be escaped as \uXXXX.
- */
-function httpHeaderSafeJson(value: object): string {
-  return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => {
-    return '\\u' + ('0000' + c.charCodeAt(0).toString(16)).slice(-4)
-  })
-}
-
 const DropboxUploadSchema = z.object({
   accessToken: z.string().min(1, 'Access token is required'),
   path: z.string().min(1, 'Destination path is required'),

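
The deleted local helper moves to the shared @/lib/core/utils/validation module. Assuming the shared version keeps the behavior shown in the removed body, its effect is:

// Behavior per the removed body above: every character in \u007f-\uffff is
// escaped as \uXXXX so the JSON can travel inside an HTTP header such as
// Dropbox-API-Arg.
httpHeaderSafeJson({ path: '/docs/résumé.pdf', mode: 'add' })
// => '{"path":"/docs/r\u00e9sum\u00e9.pdf","mode":"add"}'
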
@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
 import { type NextRequest, NextResponse } from 'next/server'
 import { extractAudioFromVideo, isVideoFile } from '@/lib/audio/extractor'
 import { checkInternalAuth } from '@/lib/auth/hybrid'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import {
   secureFetchWithPinnedIP,
   validateUrlWithDNS,

@@ -636,7 +637,8 @@ async function transcribeWithAssemblyAI(
 
   let transcript: any
   let attempts = 0
-  const maxAttempts = 60 // 5 minutes with 5-second intervals
+  const pollIntervalMs = 5000
+  const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
 
   while (attempts < maxAttempts) {
     const statusResponse = await fetch(`https://api.assemblyai.com/v2/transcript/${id}`, {

@@ -3,6 +3,7 @@ import { createLogger } from '@sim/logger'
 import { type NextRequest, NextResponse } from 'next/server'
 import { z } from 'zod'
 import { checkInternalAuth } from '@/lib/auth/hybrid'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import { validateAwsRegion, validateS3BucketName } from '@/lib/core/security/input-validation'
 import {
   secureFetchWithPinnedIP,

@@ -226,8 +227,8 @@ async function pollForJobCompletion(
   useAnalyzeDocument: boolean,
   requestId: string
 ): Promise<Record<string, unknown>> {
-  const pollIntervalMs = 5000 // 5 seconds between polls
-  const maxPollTimeMs = 180000 // 3 minutes maximum polling time
+  const pollIntervalMs = 5000
+  const maxPollTimeMs = DEFAULT_EXECUTION_TIMEOUT_MS
   const maxAttempts = Math.ceil(maxPollTimeMs / pollIntervalMs)
 
   const getTarget = useAnalyzeDocument

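
The same refactor recurs in the transcription hunk above, this Textract poller, and every video provider below: hardcoded attempt counts become a time budget divided by the poll interval. The shape, factored out as a sketch:

// The polling-budget pattern used across these routes: derive maxAttempts
// from the execution limit, so raising the limit widens the polling window
// without touching each provider's loop.
async function pollUntil<T>(
  check: () => Promise<T | undefined>,
  budgetMs: number,
  pollIntervalMs = 5000
): Promise<T> {
  const maxAttempts = Math.ceil(budgetMs / pollIntervalMs)
  for (let attempts = 0; attempts < maxAttempts; attempts++) {
    const result = await check() // returns undefined while the job is still pending
    if (result !== undefined) return result
    await new Promise((resolve) => setTimeout(resolve, pollIntervalMs))
  }
  throw new Error('Polling timed out')
}
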
@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
 import type { NextRequest } from 'next/server'
 import { NextResponse } from 'next/server'
 import { checkInternalAuth } from '@/lib/auth/hybrid'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import { validateAlphanumericId } from '@/lib/core/security/input-validation'
 import { getBaseUrl } from '@/lib/core/utils/urls'
 import { StorageService } from '@/lib/uploads'

@@ -60,7 +61,7 @@ export async function POST(request: NextRequest) {
         text,
         model_id: modelId,
       }),
-      signal: AbortSignal.timeout(60000),
+      signal: AbortSignal.timeout(DEFAULT_EXECUTION_TIMEOUT_MS),
     })
 
     if (!response.ok) {

@@ -1,6 +1,7 @@
 import { createLogger } from '@sim/logger'
 import { type NextRequest, NextResponse } from 'next/server'
 import { checkInternalAuth } from '@/lib/auth/hybrid'
+import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
 import { downloadFileFromStorage } from '@/lib/uploads/utils/file-utils.server'
 import type { UserFile } from '@/executor/types'
 import type { VideoRequestBody } from '@/tools/video/types'

@@ -326,11 +327,12 @@ async function generateWithRunway(
 
   logger.info(`[${requestId}] Runway task created: ${taskId}`)
 
-  const maxAttempts = 120 // 10 minutes with 5-second intervals
+  const pollIntervalMs = 5000
+  const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
   let attempts = 0
 
   while (attempts < maxAttempts) {
-    await sleep(5000) // Poll every 5 seconds
+    await sleep(pollIntervalMs)
 
     const statusResponse = await fetch(`https://api.dev.runwayml.com/v1/tasks/${taskId}`, {
       headers: {

@@ -370,7 +372,7 @@ async function generateWithRunway(
     attempts++
   }
 
-  throw new Error('Runway generation timed out after 10 minutes')
+  throw new Error('Runway generation timed out')
 }
 
 async function generateWithVeo(

@@ -429,11 +431,12 @@ async function generateWithVeo(
 
   logger.info(`[${requestId}] Veo operation created: ${operationName}`)
 
-  const maxAttempts = 60 // 5 minutes with 5-second intervals
+  const pollIntervalMs = 5000
+  const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
   let attempts = 0
 
   while (attempts < maxAttempts) {
-    await sleep(5000)
+    await sleep(pollIntervalMs)
 
     const statusResponse = await fetch(
       `https://generativelanguage.googleapis.com/v1beta/${operationName}`,

@@ -485,7 +488,7 @@ async function generateWithVeo(
     attempts++
   }
 
-  throw new Error('Veo generation timed out after 5 minutes')
+  throw new Error('Veo generation timed out')
 }
 
 async function generateWithLuma(

@@ -541,11 +544,12 @@ async function generateWithLuma(
 
   logger.info(`[${requestId}] Luma generation created: ${generationId}`)
 
-  const maxAttempts = 120 // 10 minutes
+  const pollIntervalMs = 5000
+  const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
   let attempts = 0
 
   while (attempts < maxAttempts) {
-    await sleep(5000)
+    await sleep(pollIntervalMs)
 
     const statusResponse = await fetch(
       `https://api.lumalabs.ai/dream-machine/v1/generations/${generationId}`,

@@ -592,7 +596,7 @@ async function generateWithLuma(
     attempts++
   }
 
-  throw new Error('Luma generation timed out after 10 minutes')
+  throw new Error('Luma generation timed out')
 }
 
 async function generateWithMiniMax(

@@ -658,14 +662,13 @@ async function generateWithMiniMax(
 
   logger.info(`[${requestId}] MiniMax task created: ${taskId}`)
 
-  // Poll for completion (6-10 minutes typical)
-  const maxAttempts = 120 // 10 minutes with 5-second intervals
+  const pollIntervalMs = 5000
+  const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
   let attempts = 0
 
   while (attempts < maxAttempts) {
-    await sleep(5000)
+    await sleep(pollIntervalMs)
 
-    // Query task status
     const statusResponse = await fetch(
       `https://api.minimax.io/v1/query/video_generation?task_id=${taskId}`,
       {

@@ -743,7 +746,7 @@ async function generateWithMiniMax(
     attempts++
   }
 
-  throw new Error('MiniMax generation timed out after 10 minutes')
+  throw new Error('MiniMax generation timed out')
 }
 
 // Helper function to strip subpaths from Fal.ai model IDs for status/result endpoints

@@ -861,11 +864,12 @@ async function generateWithFalAI(
   // Get base model ID (without subpath) for status and result endpoints
   const baseModelId = getBaseModelId(falModelId)
 
-  const maxAttempts = 96 // 8 minutes with 5-second intervals
+  const pollIntervalMs = 5000
+  const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
   let attempts = 0
 
   while (attempts < maxAttempts) {
-    await sleep(5000)
+    await sleep(pollIntervalMs)
 
     const statusResponse = await fetch(
       `https://queue.fal.run/${baseModelId}/requests/${requestIdFal}/status`,

@@ -938,7 +942,7 @@ async function generateWithFalAI(
     attempts++
   }
 
-  throw new Error('Fal.ai generation timed out after 8 minutes')
+  throw new Error('Fal.ai generation timed out')
 }
 
 function getVideoDimensions(

@@ -3,11 +3,13 @@ import { type NextRequest, NextResponse } from 'next/server'
 import { v4 as uuidv4 } from 'uuid'
 import { z } from 'zod'
 import { checkHybridAuth } from '@/lib/auth/hybrid'
+import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits'
 import { generateRequestId } from '@/lib/core/utils/request'
 import { SSE_HEADERS } from '@/lib/core/utils/sse'
 import { markExecutionCancelled } from '@/lib/execution/cancellation'
 import { preprocessExecution } from '@/lib/execution/preprocessing'
 import { LoggingSession } from '@/lib/logs/execution/logging-session'
+import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
 import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
 import { createSSECallbacks } from '@/lib/workflows/executor/execution-events'
 import { ExecutionSnapshot } from '@/executor/execution/snapshot'

@@ -116,6 +118,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
   const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
   const abortController = new AbortController()
   let isStreamClosed = false
+  let isTimedOut = false
+
+  const syncTimeout = preprocessResult.executionTimeout?.sync
+  let timeoutId: NodeJS.Timeout | undefined
+  if (syncTimeout) {
+    timeoutId = setTimeout(() => {
+      isTimedOut = true
+      abortController.abort()
+    }, syncTimeout)
+  }
 
   const stream = new ReadableStream<Uint8Array>({
     async start(controller) {

@@ -167,13 +179,33 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
         })
 
         if (result.status === 'cancelled') {
-          sendEvent({
-            type: 'execution:cancelled',
-            timestamp: new Date().toISOString(),
-            executionId,
-            workflowId,
-            data: { duration: result.metadata?.duration || 0 },
-          })
+          if (isTimedOut && syncTimeout) {
+            const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
+            logger.info(`[${requestId}] Run-from-block execution timed out`, {
+              timeoutMs: syncTimeout,
+            })
+
+            await loggingSession.markAsFailed(timeoutErrorMessage)
+
+            sendEvent({
+              type: 'execution:error',
+              timestamp: new Date().toISOString(),
+              executionId,
+              workflowId,
+              data: {
+                error: timeoutErrorMessage,
+                duration: result.metadata?.duration || 0,
+              },
+            })
+          } else {
+            sendEvent({
+              type: 'execution:cancelled',
+              timestamp: new Date().toISOString(),
+              executionId,
+              workflowId,
+              data: { duration: result.metadata?.duration || 0 },
+            })
+          }
         } else {
           sendEvent({
             type: 'execution:completed',

@@ -190,10 +222,27 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
           })
         }
       } catch (error: unknown) {
-        const errorMessage = error instanceof Error ? error.message : 'Unknown error'
-        logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
+        const isTimeout = isTimeoutError(error) || isTimedOut
+        const errorMessage = isTimeout
+          ? getTimeoutErrorMessage(error, syncTimeout)
+          : error instanceof Error
+            ? error.message
+            : 'Unknown error'
+
+        logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`, {
+          isTimeout,
+        })
+
         const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
+        const { traceSpans, totalDuration } = executionResult
+          ? buildTraceSpans(executionResult)
+          : { traceSpans: [], totalDuration: 0 }
+
+        await loggingSession.safeCompleteWithError({
+          totalDurationMs: totalDuration || executionResult?.metadata?.duration,
+          error: { message: errorMessage },
+          traceSpans,
+        })
+
         sendEvent({
           type: 'execution:error',

@@ -206,6 +255,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
           },
         })
       } finally {
+        if (timeoutId) clearTimeout(timeoutId)
         if (!isStreamClosed) {
           try {
             controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))

@@ -216,6 +266,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
     },
     cancel() {
       isStreamClosed = true
+      if (timeoutId) clearTimeout(timeoutId)
       abortController.abort()
       markExecutionCancelled(executionId).catch(() => {})
     },

@@ -5,6 +5,11 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
 import { z } from 'zod'
 import { checkHybridAuth } from '@/lib/auth/hybrid'
 import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
+import {
+  createTimeoutAbortController,
+  getTimeoutErrorMessage,
+  isTimeoutError,
+} from '@/lib/core/execution-limits'
 import { generateRequestId } from '@/lib/core/utils/request'
 import { SSE_HEADERS } from '@/lib/core/utils/sse'
 import { getBaseUrl } from '@/lib/core/utils/urls'

@@ -12,6 +17,7 @@ import { markExecutionCancelled } from '@/lib/execution/cancellation'
 import { processInputFileFields } from '@/lib/execution/files'
 import { preprocessExecution } from '@/lib/execution/preprocessing'
 import { LoggingSession } from '@/lib/logs/execution/logging-session'
+import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
 import {
   cleanupExecutionBase64Cache,
   hydrateUserFilesWithBase64,

@@ -120,10 +126,6 @@ type AsyncExecutionParams = {
   triggerType: CoreTriggerType
 }
 
-/**
- * Handles async workflow execution by queueing a background job.
- * Returns immediately with a 202 Accepted response containing the job ID.
- */
 async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
   const { requestId, workflowId, userId, input, triggerType } = params
 

@@ -405,6 +407,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
 
   if (!enableSSE) {
     logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
+    const timeoutController = createTimeoutAbortController(
+      preprocessResult.executionTimeout?.sync
+    )
+
     try {
       const metadata: ExecutionMetadata = {
         requestId,

@@ -438,8 +444,37 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
         includeFileBase64,
         base64MaxBytes,
         stopAfterBlockId,
+        abortSignal: timeoutController.signal,
       })
 
+      if (
+        result.status === 'cancelled' &&
+        timeoutController.isTimedOut() &&
+        timeoutController.timeoutMs
+      ) {
+        const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
+        logger.info(`[${requestId}] Non-SSE execution timed out`, {
+          timeoutMs: timeoutController.timeoutMs,
+        })
+        await loggingSession.markAsFailed(timeoutErrorMessage)
+
+        return NextResponse.json(
+          {
+            success: false,
+            output: result.output,
+            error: timeoutErrorMessage,
+            metadata: result.metadata
+              ? {
+                  duration: result.metadata.duration,
+                  startTime: result.metadata.startTime,
+                  endTime: result.metadata.endTime,
+                }
+              : undefined,
+          },
+          { status: 408 }
+        )
+      }
+
       const outputWithBase64 = includeFileBase64
         ? ((await hydrateUserFilesWithBase64(result.output, {
             requestId,

@@ -450,9 +485,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
 
       const resultWithBase64 = { ...result, output: outputWithBase64 }
 
-      // Cleanup base64 cache for this execution
-      await cleanupExecutionBase64Cache(executionId)
-
       const hasResponseBlock = workflowHasResponseBlock(resultWithBase64)
       if (hasResponseBlock) {
         return createHttpResponseFromBlock(resultWithBase64)

@@ -474,10 +506,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
       return NextResponse.json(filteredResult)
     } catch (error: unknown) {
       const errorMessage = error instanceof Error ? error.message : 'Unknown error'
 
       logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
 
       const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
 
+      await loggingSession.safeCompleteWithError({
+        totalDurationMs: executionResult?.metadata?.duration,
+        error: { message: errorMessage },
+        traceSpans: executionResult?.logs as any,
+      })
+
       return NextResponse.json(
         {
           success: false,

@@ -493,6 +532,15 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
         },
         { status: 500 }
       )
+    } finally {
+      timeoutController.cleanup()
+      if (executionId) {
+        try {
+          await cleanupExecutionBase64Cache(executionId)
+        } catch (error) {
+          logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error })
+        }
+      }
     }
   }
 

@@ -506,7 +554,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
     cachedWorkflowData?.blocks || {}
   )
   const streamVariables = cachedWorkflowData?.variables ?? (workflow as any).variables
-
   const stream = await createStreamingResponse({
     requestId,
     workflow: {

@@ -524,6 +571,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
       workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
       includeFileBase64,
       base64MaxBytes,
+      timeoutMs: preprocessResult.executionTimeout?.sync,
     },
     executionId,
   })

@@ -535,7 +583,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
   }
 
   const encoder = new TextEncoder()
-  const abortController = new AbortController()
+  const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
   let isStreamClosed = false
 
   const stream = new ReadableStream<Uint8Array>({

@@ -731,7 +779,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
           onStream,
         },
         loggingSession,
-        abortSignal: abortController.signal,
+        abortSignal: timeoutController.signal,
         includeFileBase64,
         base64MaxBytes,
         stopAfterBlockId,

@@ -767,16 +815,37 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
        }
 
        if (result.status === 'cancelled') {
-          logger.info(`[${requestId}] Workflow execution was cancelled`)
-          sendEvent({
-            type: 'execution:cancelled',
-            timestamp: new Date().toISOString(),
-            executionId,
-            workflowId,
-            data: {
-              duration: result.metadata?.duration || 0,
-            },
-          })
+          if (timeoutController.isTimedOut() && timeoutController.timeoutMs) {
+            const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
+            logger.info(`[${requestId}] Workflow execution timed out`, {
+              timeoutMs: timeoutController.timeoutMs,
+            })
+
+            await loggingSession.markAsFailed(timeoutErrorMessage)
+
+            sendEvent({
+              type: 'execution:error',
+              timestamp: new Date().toISOString(),
+              executionId,
+              workflowId,
+              data: {
+                error: timeoutErrorMessage,
+                duration: result.metadata?.duration || 0,
+              },
+            })
+          } else {
+            logger.info(`[${requestId}] Workflow execution was cancelled`)
+
+            sendEvent({
+              type: 'execution:cancelled',
+              timestamp: new Date().toISOString(),
+              executionId,
+              workflowId,
+              data: {
+                duration: result.metadata?.duration || 0,
+              },
+            })
+          }
           return
         }
 

@@ -799,14 +868,26 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
             endTime: result.metadata?.endTime || new Date().toISOString(),
           },
         })
-
-        // Cleanup base64 cache for this execution
-        await cleanupExecutionBase64Cache(executionId)
       } catch (error: unknown) {
-        const errorMessage = error instanceof Error ? error.message : 'Unknown error'
-        logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
+        const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()
+        const errorMessage = isTimeout
+          ? getTimeoutErrorMessage(error, timeoutController.timeoutMs)
+          : error instanceof Error
+            ? error.message
+            : 'Unknown error'
+
+        logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout })
+
         const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
+        const { traceSpans, totalDuration } = executionResult
+          ? buildTraceSpans(executionResult)
+          : { traceSpans: [], totalDuration: 0 }
+
+        await loggingSession.safeCompleteWithError({
+          totalDurationMs: totalDuration || executionResult?.metadata?.duration,
+          error: { message: errorMessage },
+          traceSpans,
+        })
+
         sendEvent({
           type: 'execution:error',

@@ -819,20 +900,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
           },
         })
       } finally {
+        timeoutController.cleanup()
+        if (executionId) {
+          await cleanupExecutionBase64Cache(executionId)
+        }
         if (!isStreamClosed) {
           try {
             controller.enqueue(encoder.encode('data: [DONE]\n\n'))
             controller.close()
-          } catch {
-            // Stream already closed - nothing to do
-          }
+          } catch {}
         }
       }
     },
     cancel() {
       isStreamClosed = true
+      timeoutController.cleanup()
       logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
-      abortController.abort()
+      timeoutController.abort()
       markExecutionCancelled(executionId).catch(() => {})
     },
   })

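
createTimeoutAbortController is imported but not defined anywhere in this compare. From its call sites (signal, timeoutMs, isTimedOut(), abort(), cleanup()) and the hand-rolled equivalent in the run-from-block route above, a likely sketch; the real implementation may differ:

// Reconstructed from usage in this changeset, not from the module source.
export function createTimeoutAbortController(timeoutMs?: number) {
  const controller = new AbortController()
  let timedOut = false
  let timer: NodeJS.Timeout | undefined

  if (timeoutMs) {
    timer = setTimeout(() => {
      timedOut = true
      // The executor observes this as a cancellation; callers then use
      // isTimedOut() to distinguish a timeout from a user-initiated cancel.
      controller.abort()
    }, timeoutMs)
  }

  return {
    timeoutMs,
    signal: controller.signal,
    isTimedOut: () => timedOut,
    abort: () => controller.abort(),
    cleanup: () => {
      if (timer) clearTimeout(timer)
    },
  }
}
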
@@ -1,5 +1,5 @@
 import type React from 'react'
-import { RepeatIcon, SplitIcon } from 'lucide-react'
+import { AlertTriangleIcon, BanIcon, RepeatIcon, SplitIcon, XCircleIcon } from 'lucide-react'
 import { getBlock } from '@/blocks'
 import { TERMINAL_BLOCK_COLUMN_WIDTH } from '@/stores/constants'
 import type { ConsoleEntry } from '@/stores/terminal'

@@ -12,6 +12,15 @@ const SUBFLOW_COLORS = {
   parallel: '#FEE12B',
 } as const
 
+/**
+ * Special block type colors for errors and system messages
+ */
+const SPECIAL_BLOCK_COLORS = {
+  error: '#ef4444',
+  validation: '#f59e0b',
+  cancelled: '#6b7280',
+} as const
+
 /**
  * Retrieves the icon component for a given block type
  */

@@ -32,6 +41,18 @@ export function getBlockIcon(
     return SplitIcon
   }
 
+  if (blockType === 'error') {
+    return XCircleIcon
+  }
+
+  if (blockType === 'validation') {
+    return AlertTriangleIcon
+  }
+
+  if (blockType === 'cancelled') {
+    return BanIcon
+  }
+
   return null
 }

@@ -50,6 +71,16 @@ export function getBlockColor(blockType: string): string {
   if (blockType === 'parallel') {
     return SUBFLOW_COLORS.parallel
   }
+  // Special block types for errors and system messages
+  if (blockType === 'error') {
+    return SPECIAL_BLOCK_COLORS.error
+  }
+  if (blockType === 'validation') {
+    return SPECIAL_BLOCK_COLORS.validation
+  }
+  if (blockType === 'cancelled') {
+    return SPECIAL_BLOCK_COLORS.cancelled
+  }
   return '#6b7280'
 }

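
These synthetic block types are exactly what the console-entry helpers in the next file emit (blockType 'error', 'validation', 'cancelled'). Expected lookups, given getBlockColor(blockType: string) as defined above:

getBlockColor('error')         // '#ef4444' (red, paired with XCircleIcon)
getBlockColor('validation')    // '#f59e0b' (amber, paired with AlertTriangleIcon)
getBlockColor('cancelled')     // '#6b7280' (gray, paired with BanIcon)
getBlockColor('anything-else') // falls through to the default '#6b7280'
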
@@ -4,6 +4,11 @@ import { useQueryClient } from '@tanstack/react-query'
|
|||||||
import { v4 as uuidv4 } from 'uuid'
|
import { v4 as uuidv4 } from 'uuid'
|
||||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||||
import { processStreamingBlockLogs } from '@/lib/tokenization'
|
import { processStreamingBlockLogs } from '@/lib/tokenization'
|
||||||
|
import type {
|
||||||
|
BlockCompletedData,
|
||||||
|
BlockErrorData,
|
||||||
|
BlockStartedData,
|
||||||
|
} from '@/lib/workflows/executor/execution-events'
|
||||||
import {
|
import {
|
||||||
extractTriggerMockPayload,
|
extractTriggerMockPayload,
|
||||||
selectBestTrigger,
|
selectBestTrigger,
|
||||||
@@ -17,7 +22,13 @@ import {
|
|||||||
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
||||||
import { getBlock } from '@/blocks'
|
import { getBlock } from '@/blocks'
|
||||||
import type { SerializableExecutionState } from '@/executor/execution/types'
|
import type { SerializableExecutionState } from '@/executor/execution/types'
|
||||||
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
|
import type {
|
||||||
|
BlockLog,
|
||||||
|
BlockState,
|
||||||
|
ExecutionResult,
|
||||||
|
NormalizedBlockOutput,
|
||||||
|
StreamingExecution,
|
||||||
|
} from '@/executor/types'
|
||||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||||
import { coerceValue } from '@/executor/utils/start-block'
|
import { coerceValue } from '@/executor/utils/start-block'
|
||||||
import { subscriptionKeys } from '@/hooks/queries/subscription'
|
import { subscriptionKeys } from '@/hooks/queries/subscription'
|
||||||
@@ -27,7 +38,7 @@ import { useExecutionStore } from '@/stores/execution'
|
|||||||
import { useNotificationStore } from '@/stores/notifications'
|
import { useNotificationStore } from '@/stores/notifications'
|
||||||
import { useVariablesStore } from '@/stores/panel'
|
import { useVariablesStore } from '@/stores/panel'
|
||||||
import { useEnvironmentStore } from '@/stores/settings/environment'
|
import { useEnvironmentStore } from '@/stores/settings/environment'
|
||||||
import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal'
|
import { useTerminalConsoleStore } from '@/stores/terminal'
|
||||||
import { useWorkflowDiffStore } from '@/stores/workflow-diff'
|
import { useWorkflowDiffStore } from '@/stores/workflow-diff'
|
||||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||||
import { mergeSubblockState } from '@/stores/workflows/utils'
|
import { mergeSubblockState } from '@/stores/workflows/utils'
|
||||||
@@ -41,6 +52,19 @@ interface DebugValidationResult {
|
|||||||
error?: string
|
error?: string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface BlockEventHandlerConfig {
|
||||||
|
workflowId?: string
|
||||||
|
executionId?: string
|
||||||
|
workflowEdges: Array<{ id: string; target: string }>
|
||||||
|
activeBlocksSet: Set<string>
|
||||||
|
accumulatedBlockLogs: BlockLog[]
|
||||||
|
accumulatedBlockStates: Map<string, BlockState>
|
||||||
|
executedBlockIds: Set<string>
|
||||||
|
consoleMode: 'update' | 'add'
|
||||||
|
includeStartConsoleEntry: boolean
|
||||||
|
onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise<void>
|
||||||
|
}
|
||||||
|
|
||||||
const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed'
|
const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed'
|
||||||
|
|
||||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||||
@@ -149,6 +173,340 @@ export function useWorkflowExecution() {
|
|||||||
setActiveBlocks,
|
setActiveBlocks,
|
||||||
])
|
])
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds timing fields for execution-level console entries.
|
||||||
|
*/
|
||||||
|
const buildExecutionTiming = useCallback((durationMs?: number) => {
|
||||||
|
const normalizedDuration = durationMs || 0
|
||||||
|
return {
|
||||||
|
durationMs: normalizedDuration,
|
||||||
|
startedAt: new Date(Date.now() - normalizedDuration).toISOString(),
|
||||||
|
endedAt: new Date().toISOString(),
|
||||||
|
}
|
||||||
|
}, [])
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds an execution-level error entry to the console when appropriate.
|
||||||
|
*/
|
||||||
|
const addExecutionErrorConsoleEntry = useCallback(
|
||||||
|
(params: {
|
||||||
|
workflowId?: string
|
||||||
|
executionId?: string
|
||||||
|
error?: string
|
||||||
|
durationMs?: number
|
||||||
|
blockLogs: BlockLog[]
|
||||||
|
isPreExecutionError?: boolean
|
||||||
|
}) => {
|
||||||
|
if (!params.workflowId) return
|
||||||
|
|
||||||
|
const hasBlockError = params.blockLogs.some((log) => log.error)
|
||||||
|
const isPreExecutionError = params.isPreExecutionError ?? false
|
||||||
|
if (!isPreExecutionError && hasBlockError) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const errorMessage = params.error || 'Execution failed'
|
||||||
|
const isTimeout = errorMessage.toLowerCase().includes('timed out')
|
||||||
|
const timing = buildExecutionTiming(params.durationMs)
|
||||||
|
|
||||||
|
addConsole({
|
||||||
|
input: {},
|
||||||
|
output: {},
|
||||||
|
success: false,
|
||||||
|
error: errorMessage,
|
||||||
|
durationMs: timing.durationMs,
|
||||||
|
startedAt: timing.startedAt,
|
||||||
|
executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER,
|
||||||
|
endedAt: timing.endedAt,
|
||||||
|
workflowId: params.workflowId,
|
||||||
|
blockId: isPreExecutionError
|
||||||
|
? 'validation'
|
||||||
|
: isTimeout
|
||||||
|
? 'timeout-error'
|
||||||
|
: 'execution-error',
|
||||||
|
executionId: params.executionId,
|
||||||
|
blockName: isPreExecutionError
|
||||||
|
? 'Workflow Validation'
|
||||||
|
: isTimeout
|
||||||
|
? 'Timeout Error'
|
||||||
|
: 'Execution Error',
|
||||||
|
blockType: isPreExecutionError ? 'validation' : 'error',
|
||||||
|
})
|
||||||
|
},
|
||||||
|
[addConsole, buildExecutionTiming]
|
||||||
|
)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds an execution-level cancellation entry to the console.
|
||||||
|
*/
|
||||||
|
const addExecutionCancelledConsoleEntry = useCallback(
|
||||||
|
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
|
||||||
|
if (!params.workflowId) return
|
||||||
|
|
||||||
|
const timing = buildExecutionTiming(params.durationMs)
|
||||||
|
addConsole({
|
||||||
|
input: {},
|
||||||
|
output: {},
|
||||||
|
success: false,
|
||||||
|
error: 'Execution was cancelled',
|
||||||
|
durationMs: timing.durationMs,
|
||||||
|
startedAt: timing.startedAt,
|
||||||
|
executionOrder: Number.MAX_SAFE_INTEGER,
|
||||||
|
endedAt: timing.endedAt,
|
||||||
|
workflowId: params.workflowId,
|
||||||
|
blockId: 'cancelled',
|
||||||
|
executionId: params.executionId,
|
||||||
|
blockName: 'Execution Cancelled',
|
||||||
|
blockType: 'cancelled',
|
||||||
|
})
|
||||||
|
},
|
||||||
|
[addConsole, buildExecutionTiming]
|
||||||
|
)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles workflow-level execution errors for console output.
|
||||||
|
*/
|
||||||
|
const handleExecutionErrorConsole = useCallback(
|
||||||
|
(params: {
|
||||||
|
workflowId?: string
|
||||||
|
executionId?: string
|
||||||
|
error?: string
|
||||||
|
durationMs?: number
|
||||||
|
blockLogs: BlockLog[]
|
||||||
|
isPreExecutionError?: boolean
|
||||||
|
}) => {
|
||||||
|
if (params.workflowId) {
|
||||||
|
cancelRunningEntries(params.workflowId)
|
||||||
|
}
|
||||||
|
addExecutionErrorConsoleEntry(params)
|
||||||
|
},
|
||||||
|
[addExecutionErrorConsoleEntry, cancelRunningEntries]
|
||||||
|
)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles workflow-level execution cancellations for console output.
|
||||||
|
*/
|
||||||
|
const handleExecutionCancelledConsole = useCallback(
|
||||||
|
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
|
||||||
|
if (params.workflowId) {
|
||||||
|
cancelRunningEntries(params.workflowId)
|
||||||
|
}
|
||||||
|
addExecutionCancelledConsoleEntry(params)
|
||||||
|
},
|
||||||
|
[addExecutionCancelledConsoleEntry, cancelRunningEntries]
|
||||||
|
)
|
||||||
|
|
||||||
|
const buildBlockEventHandlers = useCallback(
|
||||||
|
(config: BlockEventHandlerConfig) => {
|
||||||
|
const {
|
||||||
|
workflowId,
|
||||||
|
executionId,
|
||||||
|
workflowEdges,
|
||||||
|
activeBlocksSet,
|
||||||
|
accumulatedBlockLogs,
|
||||||
|
accumulatedBlockStates,
|
||||||
|
executedBlockIds,
|
||||||
|
consoleMode,
|
||||||
|
includeStartConsoleEntry,
|
||||||
|
onBlockCompleteCallback,
|
||||||
|
} = config
|
||||||
|
|
||||||
|
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
|
||||||
|
if (isActive) {
|
||||||
|
activeBlocksSet.add(blockId)
|
||||||
|
} else {
|
||||||
|
activeBlocksSet.delete(blockId)
|
||||||
|
}
|
||||||
|
setActiveBlocks(new Set(activeBlocksSet))
|
||||||
|
}
|
||||||
|
|
||||||
|
const markIncomingEdges = (blockId: string) => {
|
||||||
|
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
|
||||||
|
incomingEdges.forEach((edge) => {
|
||||||
|
setEdgeRunStatus(edge.id, 'success')
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
+      const isContainerBlockType = (blockType?: string) => {
+        return blockType === 'loop' || blockType === 'parallel'
+      }
+
+      const createBlockLogEntry = (
+        data: BlockCompletedData | BlockErrorData,
+        options: { success: boolean; output?: unknown; error?: string }
+      ): BlockLog => ({
+        blockId: data.blockId,
+        blockName: data.blockName || 'Unknown Block',
+        blockType: data.blockType || 'unknown',
+        input: data.input || {},
+        output: options.output ?? {},
+        success: options.success,
+        error: options.error,
+        durationMs: data.durationMs,
+        startedAt: data.startedAt,
+        executionOrder: data.executionOrder,
+        endedAt: data.endedAt,
+      })
+
+      const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => {
+        if (!workflowId) return
+        addConsole({
+          input: data.input || {},
+          output,
+          success: true,
+          durationMs: data.durationMs,
+          startedAt: data.startedAt,
+          executionOrder: data.executionOrder,
+          endedAt: data.endedAt,
+          workflowId,
+          blockId: data.blockId,
+          executionId,
+          blockName: data.blockName || 'Unknown Block',
+          blockType: data.blockType || 'unknown',
+          iterationCurrent: data.iterationCurrent,
+          iterationTotal: data.iterationTotal,
+          iterationType: data.iterationType,
+        })
+      }
+
+      const addConsoleErrorEntry = (data: BlockErrorData) => {
+        if (!workflowId) return
+        addConsole({
+          input: data.input || {},
+          output: {},
+          success: false,
+          error: data.error,
+          durationMs: data.durationMs,
+          startedAt: data.startedAt,
+          executionOrder: data.executionOrder,
+          endedAt: data.endedAt,
+          workflowId,
+          blockId: data.blockId,
+          executionId,
+          blockName: data.blockName || 'Unknown Block',
+          blockType: data.blockType || 'unknown',
+          iterationCurrent: data.iterationCurrent,
+          iterationTotal: data.iterationTotal,
+          iterationType: data.iterationType,
+        })
+      }
+
+      const updateConsoleEntry = (data: BlockCompletedData) => {
+        updateConsole(
+          data.blockId,
+          {
+            input: data.input || {},
+            replaceOutput: data.output,
+            success: true,
+            durationMs: data.durationMs,
+            startedAt: data.startedAt,
+            endedAt: data.endedAt,
+            isRunning: false,
+            iterationCurrent: data.iterationCurrent,
+            iterationTotal: data.iterationTotal,
+            iterationType: data.iterationType,
+          },
+          executionId
+        )
+      }
+
+      const updateConsoleErrorEntry = (data: BlockErrorData) => {
+        updateConsole(
+          data.blockId,
+          {
+            input: data.input || {},
+            replaceOutput: {},
+            success: false,
+            error: data.error,
+            durationMs: data.durationMs,
+            startedAt: data.startedAt,
+            endedAt: data.endedAt,
+            isRunning: false,
+            iterationCurrent: data.iterationCurrent,
+            iterationTotal: data.iterationTotal,
+            iterationType: data.iterationType,
+          },
+          executionId
+        )
+      }
+
+      const onBlockStarted = (data: BlockStartedData) => {
+        updateActiveBlocks(data.blockId, true)
+        markIncomingEdges(data.blockId)
+
+        if (!includeStartConsoleEntry || !workflowId) return
+
+        const startedAt = new Date().toISOString()
+        addConsole({
+          input: {},
+          output: undefined,
+          success: undefined,
+          durationMs: undefined,
+          startedAt,
+          executionOrder: data.executionOrder,
+          endedAt: undefined,
+          workflowId,
+          blockId: data.blockId,
+          executionId,
+          blockName: data.blockName || 'Unknown Block',
+          blockType: data.blockType || 'unknown',
+          isRunning: true,
+          iterationCurrent: data.iterationCurrent,
+          iterationTotal: data.iterationTotal,
+          iterationType: data.iterationType,
+        })
+      }
+
+      const onBlockCompleted = (data: BlockCompletedData) => {
+        updateActiveBlocks(data.blockId, false)
+        setBlockRunStatus(data.blockId, 'success')
+
+        executedBlockIds.add(data.blockId)
+        accumulatedBlockStates.set(data.blockId, {
+          output: data.output,
+          executed: true,
+          executionTime: data.durationMs,
+        })
+
+        if (isContainerBlockType(data.blockType)) {
+          return
+        }
+
+        accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))
+
+        if (consoleMode === 'update') {
+          updateConsoleEntry(data)
+        } else {
+          addConsoleEntry(data, data.output as NormalizedBlockOutput)
+        }
+
+        if (onBlockCompleteCallback) {
+          onBlockCompleteCallback(data.blockId, data.output).catch((error) => {
+            logger.error('Error in onBlockComplete callback:', error)
+          })
+        }
+      }
+
+      const onBlockError = (data: BlockErrorData) => {
+        updateActiveBlocks(data.blockId, false)
+        setBlockRunStatus(data.blockId, 'error')
+
+        accumulatedBlockLogs.push(
+          createBlockLogEntry(data, { success: false, output: {}, error: data.error })
+        )
+
+        if (consoleMode === 'update') {
+          updateConsoleErrorEntry(data)
+        } else {
+          addConsoleErrorEntry(data)
+        }
+      }
+
+      return { onBlockStarted, onBlockCompleted, onBlockError }
+    },
+    [addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole]
+  )

 /**
  * Checks if debug session is complete based on execution result
  */
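The factory above closes over per-execution state (active block set, accumulated logs and states) and returns the three stream callbacks. For orientation, a minimal sketch of how it is consumed, mirroring the call sites later in this diff; nothing here is new API, only the comments are added:

    // Sketch of the intended wiring, taken from the call sites later in this diff
    // (executionStream.execute / executeFromBlock).
    const blockHandlers = buildBlockEventHandlers({
      workflowId: activeWorkflowId,
      executionId,
      workflowEdges,
      activeBlocksSet,
      accumulatedBlockLogs,
      accumulatedBlockStates,
      executedBlockIds,
      // 'update' mutates the console entry created in onBlockStarted;
      // 'add' appends a fresh entry per completed block (run-from-block path).
      consoleMode: 'update',
      includeStartConsoleEntry: true,
      onBlockCompleteCallback: onBlockComplete,
    })

    await executionStream.execute({
      workflowId: activeWorkflowId,
      input: finalWorkflowInput,
      callbacks: {
        onBlockStarted: blockHandlers.onBlockStarted,
        onBlockCompleted: blockHandlers.onBlockCompleted,
        onBlockError: blockHandlers.onBlockError,
      },
    })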
@@ -789,7 +1147,12 @@ export function useWorkflowExecution() {
       const startBlock = TriggerUtils.findStartBlock(filteredStates, 'chat')

       if (!startBlock) {
-        throw new Error(TriggerUtils.getTriggerValidationMessage('chat', 'missing'))
+        throw new WorkflowValidationError(
+          TriggerUtils.getTriggerValidationMessage('chat', 'missing'),
+          'validation',
+          'validation',
+          'Workflow Validation'
+        )
       }

       startBlockId = startBlock.blockId
@@ -800,7 +1163,12 @@ export function useWorkflowExecution() {
       })

       if (candidates.length === 0) {
-        const error = new Error('Workflow requires at least one trigger block to execute')
+        const error = new WorkflowValidationError(
+          'Workflow requires at least one trigger block to execute',
+          'validation',
+          'validation',
+          'Workflow Validation'
+        )
         logger.error('No trigger blocks found for manual run', {
           allBlockTypes: Object.values(filteredStates).map((b) => b.type),
         })
@@ -813,7 +1181,12 @@ export function useWorkflowExecution() {
         (candidate) => candidate.path === StartBlockPath.SPLIT_API
       )
       if (apiCandidates.length > 1) {
-        const error = new Error('Multiple API Trigger blocks found. Keep only one.')
+        const error = new WorkflowValidationError(
+          'Multiple API Trigger blocks found. Keep only one.',
+          'validation',
+          'validation',
+          'Workflow Validation'
+        )
         logger.error('Multiple API triggers found')
         setIsExecuting(false)
         throw error
@@ -833,7 +1206,12 @@ export function useWorkflowExecution() {
       const outgoingConnections = workflowEdges.filter((edge) => edge.source === startBlockId)
       if (outgoingConnections.length === 0) {
         const triggerName = selectedTrigger.name || selectedTrigger.type
-        const error = new Error(`${triggerName} must be connected to other blocks to execute`)
+        const error = new WorkflowValidationError(
+          `${triggerName} must be connected to other blocks to execute`,
+          'validation',
+          'validation',
+          'Workflow Validation'
+        )
         logger.error('Trigger has no outgoing connections', { triggerName, startBlockId })
         setIsExecuting(false)
         throw error
@@ -859,7 +1237,12 @@ export function useWorkflowExecution() {

       // If we don't have a valid startBlockId at this point, throw an error
       if (!startBlockId) {
-        const error = new Error('No valid trigger block found to start execution')
+        const error = new WorkflowValidationError(
+          'No valid trigger block found to start execution',
+          'validation',
+          'validation',
+          'Workflow Validation'
+        )
         logger.error('No startBlockId found after trigger search')
         setIsExecuting(false)
         throw error
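Every validation failure in the hunks above now throws `WorkflowValidationError` with the same trailing arguments. The class definition is outside this excerpt; the shape below is a hypothetical reconstruction inferred only from the call sites, with parameter names guessed to match the `blockId`/`blockType`/`blockName` fields the old pre-execution console entry used:

    // Hypothetical - NOT the actual class from this PR. Inferred from call sites
    // like: new WorkflowValidationError(message, 'validation', 'validation',
    // 'Workflow Validation'). Parameter names are assumptions.
    class WorkflowValidationError extends Error {
      constructor(
        message: string,
        readonly blockId: string, // 'validation' at every call site above
        readonly blockType: string, // 'validation' at every call site above
        readonly blockName: string // 'Workflow Validation'
      ) {
        super(message)
        this.name = 'WorkflowValidationError'
      }
    }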
@@ -892,6 +1275,19 @@ export function useWorkflowExecution() {

       // Execute the workflow
       try {
+        const blockHandlers = buildBlockEventHandlers({
+          workflowId: activeWorkflowId,
+          executionId,
+          workflowEdges,
+          activeBlocksSet,
+          accumulatedBlockLogs,
+          accumulatedBlockStates,
+          executedBlockIds,
+          consoleMode: 'update',
+          includeStartConsoleEntry: true,
+          onBlockCompleteCallback: onBlockComplete,
+        })
+
         await executionStream.execute({
           workflowId: activeWorkflowId,
           input: finalWorkflowInput,
@@ -914,145 +1310,9 @@ export function useWorkflowExecution() {
             logger.info('Server execution started:', data)
           },

-          onBlockStarted: (data) => {
-            activeBlocksSet.add(data.blockId)
-            // Create a new Set to trigger React re-render
-            setActiveBlocks(new Set(activeBlocksSet))
-
-            // Track edges that led to this block as soon as execution starts
-            const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
-            incomingEdges.forEach((edge) => {
-              setEdgeRunStatus(edge.id, 'success')
-            })
-
-            // Add entry to terminal immediately with isRunning=true
-            // Use server-provided executionOrder to ensure correct sort order
-            const startedAt = new Date().toISOString()
-            addConsole({
-              input: {},
-              output: undefined,
-              success: undefined,
-              durationMs: undefined,
-              startedAt,
-              executionOrder: data.executionOrder,
-              endedAt: undefined,
-              workflowId: activeWorkflowId,
-              blockId: data.blockId,
-              executionId,
-              blockName: data.blockName || 'Unknown Block',
-              blockType: data.blockType || 'unknown',
-              isRunning: true,
-              // Pass through iteration context for subflow grouping
-              iterationCurrent: data.iterationCurrent,
-              iterationTotal: data.iterationTotal,
-              iterationType: data.iterationType,
-            })
-          },
-
-          onBlockCompleted: (data) => {
-            activeBlocksSet.delete(data.blockId)
-            setActiveBlocks(new Set(activeBlocksSet))
-            setBlockRunStatus(data.blockId, 'success')
-
-            executedBlockIds.add(data.blockId)
-            accumulatedBlockStates.set(data.blockId, {
-              output: data.output,
-              executed: true,
-              executionTime: data.durationMs,
-            })
-
-            const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
-            if (isContainerBlock) return
-
-            const startedAt = data.startedAt
-            const endedAt = data.endedAt
-
-            accumulatedBlockLogs.push({
-              blockId: data.blockId,
-              blockName: data.blockName || 'Unknown Block',
-              blockType: data.blockType || 'unknown',
-              input: data.input || {},
-              output: data.output,
-              success: true,
-              durationMs: data.durationMs,
-              startedAt,
-              executionOrder: data.executionOrder,
-              endedAt,
-            })
-
-            // Update existing console entry (created in onBlockStarted) with completion data
-            updateConsole(
-              data.blockId,
-              {
-                input: data.input || {},
-                replaceOutput: data.output,
-                success: true,
-                durationMs: data.durationMs,
-                startedAt,
-                endedAt,
-                isRunning: false,
-                // Pass through iteration context for subflow grouping
-                iterationCurrent: data.iterationCurrent,
-                iterationTotal: data.iterationTotal,
-                iterationType: data.iterationType,
-              },
-              executionId
-            )
-
-            // Call onBlockComplete callback if provided
-            if (onBlockComplete) {
-              onBlockComplete(data.blockId, data.output).catch((error) => {
-                logger.error('Error in onBlockComplete callback:', error)
-              })
-            }
-          },
-
-          onBlockError: (data) => {
-            activeBlocksSet.delete(data.blockId)
-            // Create a new Set to trigger React re-render
-            setActiveBlocks(new Set(activeBlocksSet))
-
-            // Track failed block execution in run path
-            setBlockRunStatus(data.blockId, 'error')
-
-            const startedAt = data.startedAt
-            const endedAt = data.endedAt
-
-            // Accumulate block error log for the execution result
-            accumulatedBlockLogs.push({
-              blockId: data.blockId,
-              blockName: data.blockName || 'Unknown Block',
-              blockType: data.blockType || 'unknown',
-              input: data.input || {},
-              output: {},
-              success: false,
-              error: data.error,
-              durationMs: data.durationMs,
-              startedAt,
-              executionOrder: data.executionOrder,
-              endedAt,
-            })
-
-            // Update existing console entry (created in onBlockStarted) with error data
-            updateConsole(
-              data.blockId,
-              {
-                input: data.input || {},
-                replaceOutput: {},
-                success: false,
-                error: data.error,
-                durationMs: data.durationMs,
-                startedAt,
-                endedAt,
-                isRunning: false,
-                // Pass through iteration context for subflow grouping
-                iterationCurrent: data.iterationCurrent,
-                iterationTotal: data.iterationTotal,
-                iterationType: data.iterationType,
-              },
-              executionId
-            )
-          },
+          onBlockStarted: blockHandlers.onBlockStarted,
+          onBlockCompleted: blockHandlers.onBlockCompleted,
+          onBlockError: blockHandlers.onBlockError,

           onStreamChunk: (data) => {
             const existing = streamedContent.get(data.blockId) || ''
@@ -1157,39 +1417,23 @@ export function useWorkflowExecution() {
             logs: accumulatedBlockLogs,
           }

-          // Only add workflow-level error if no blocks have executed yet
-          // This catches pre-execution errors (validation, serialization, etc.)
-          // Block execution errors are already logged via onBlockError callback
-          const { entries } = useTerminalConsoleStore.getState()
-          const existingLogs = entries.filter(
-            (log: ConsoleEntry) => log.executionId === executionId
-          )
-
-          if (existingLogs.length === 0) {
-            // No blocks executed yet - this is a pre-execution error
-            // Use 0 for executionOrder so validation errors appear first
-            addConsole({
-              input: {},
-              output: {},
-              success: false,
-              error: data.error,
-              durationMs: data.duration || 0,
-              startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
-              executionOrder: 0,
-              endedAt: new Date().toISOString(),
-              workflowId: activeWorkflowId,
-              blockId: 'validation',
-              executionId,
-              blockName: 'Workflow Validation',
-              blockType: 'validation',
-            })
-          }
+          const isPreExecutionError = accumulatedBlockLogs.length === 0
+          handleExecutionErrorConsole({
+            workflowId: activeWorkflowId,
+            executionId,
+            error: data.error,
+            durationMs: data.duration,
+            blockLogs: accumulatedBlockLogs,
+            isPreExecutionError,
+          })
         },

-        onExecutionCancelled: () => {
-          if (activeWorkflowId) {
-            cancelRunningEntries(activeWorkflowId)
-          }
+        onExecutionCancelled: (data) => {
+          handleExecutionCancelledConsole({
+            workflowId: activeWorkflowId,
+            executionId,
+            durationMs: data?.duration,
+          })
         },
       },
     })
@@ -1585,115 +1829,27 @@ export function useWorkflowExecution() {
       const activeBlocksSet = new Set<string>()

       try {
+        const blockHandlers = buildBlockEventHandlers({
+          workflowId,
+          executionId,
+          workflowEdges,
+          activeBlocksSet,
+          accumulatedBlockLogs,
+          accumulatedBlockStates,
+          executedBlockIds,
+          consoleMode: 'add',
+          includeStartConsoleEntry: false,
+        })
+
         await executionStream.executeFromBlock({
           workflowId,
           startBlockId: blockId,
           sourceSnapshot: effectiveSnapshot,
           input: workflowInput,
           callbacks: {
-            onBlockStarted: (data) => {
-              activeBlocksSet.add(data.blockId)
-              setActiveBlocks(new Set(activeBlocksSet))
-
-              const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
-              incomingEdges.forEach((edge) => {
-                setEdgeRunStatus(edge.id, 'success')
-              })
-            },
-
-            onBlockCompleted: (data) => {
-              activeBlocksSet.delete(data.blockId)
-              setActiveBlocks(new Set(activeBlocksSet))
-
-              setBlockRunStatus(data.blockId, 'success')
-
-              executedBlockIds.add(data.blockId)
-              accumulatedBlockStates.set(data.blockId, {
-                output: data.output,
-                executed: true,
-                executionTime: data.durationMs,
-              })
-
-              const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
-              if (isContainerBlock) return
-
-              const startedAt = data.startedAt
-              const endedAt = data.endedAt
-
-              accumulatedBlockLogs.push({
-                blockId: data.blockId,
-                blockName: data.blockName || 'Unknown Block',
-                blockType: data.blockType || 'unknown',
-                input: data.input || {},
-                output: data.output,
-                success: true,
-                durationMs: data.durationMs,
-                startedAt,
-                executionOrder: data.executionOrder,
-                endedAt,
-              })
-
-              addConsole({
-                input: data.input || {},
-                output: data.output,
-                success: true,
-                durationMs: data.durationMs,
-                startedAt,
-                executionOrder: data.executionOrder,
-                endedAt,
-                workflowId,
-                blockId: data.blockId,
-                executionId,
-                blockName: data.blockName || 'Unknown Block',
-                blockType: data.blockType || 'unknown',
-                iterationCurrent: data.iterationCurrent,
-                iterationTotal: data.iterationTotal,
-                iterationType: data.iterationType,
-              })
-            },
-
-            onBlockError: (data) => {
-              activeBlocksSet.delete(data.blockId)
-              setActiveBlocks(new Set(activeBlocksSet))
-
-              setBlockRunStatus(data.blockId, 'error')
-
-              const startedAt = data.startedAt
-              const endedAt = data.endedAt
-
-              accumulatedBlockLogs.push({
-                blockId: data.blockId,
-                blockName: data.blockName || 'Unknown Block',
-                blockType: data.blockType || 'unknown',
-                input: data.input || {},
-                output: {},
-                success: false,
-                error: data.error,
-                executionOrder: data.executionOrder,
-                durationMs: data.durationMs,
-                startedAt,
-                endedAt,
-              })
-
-              addConsole({
-                input: data.input || {},
-                output: {},
-                success: false,
-                error: data.error,
-                durationMs: data.durationMs,
-                startedAt,
-                executionOrder: data.executionOrder,
-                endedAt,
-                workflowId,
-                blockId: data.blockId,
-                executionId,
-                blockName: data.blockName,
-                blockType: data.blockType,
-                iterationCurrent: data.iterationCurrent,
-                iterationTotal: data.iterationTotal,
-                iterationType: data.iterationType,
-              })
-            },
+            onBlockStarted: blockHandlers.onBlockStarted,
+            onBlockCompleted: blockHandlers.onBlockCompleted,
+            onBlockError: blockHandlers.onBlockError,

             onExecutionCompleted: (data) => {
               if (data.success) {
@@ -1736,17 +1892,23 @@ export function useWorkflowExecution() {
                 'Workflow was modified. Run the workflow again to enable running from block.',
                 workflowId,
               })
-            } else {
-              addNotification({
-                level: 'error',
-                message: data.error || 'Run from block failed',
-                workflowId,
-              })
             }
+
+            handleExecutionErrorConsole({
+              workflowId,
+              executionId,
+              error: data.error,
+              durationMs: data.duration,
+              blockLogs: accumulatedBlockLogs,
+            })
           },

-          onExecutionCancelled: () => {
-            cancelRunningEntries(workflowId)
+          onExecutionCancelled: (data) => {
+            handleExecutionCancelledConsole({
+              workflowId,
+              executionId,
+              durationMs: data?.duration,
+            })
           },
         },
       })
@@ -1768,8 +1930,9 @@ export function useWorkflowExecution() {
       setBlockRunStatus,
       setEdgeRunStatus,
       addNotification,
-      addConsole,
-      cancelRunningEntries,
+      buildBlockEventHandlers,
+      handleExecutionErrorConsole,
+      handleExecutionCancelledConsole,
       executionStream,
     ]
   )
@@ -1,11 +1,11 @@
 import {
-  Building2,
   Clock,
   Database,
   HardDrive,
   HeadphonesIcon,
   Server,
   ShieldCheck,
+  Timer,
   Users,
   Zap,
 } from 'lucide-react'
@@ -15,8 +15,8 @@ import type { PlanFeature } from '@/app/workspace/[workspaceId]/w/components/sid
 export const PRO_PLAN_FEATURES: PlanFeature[] = [
   { icon: Zap, text: '150 runs per minute (sync)' },
   { icon: Clock, text: '1,000 runs per minute (async)' },
+  { icon: Timer, text: '60 min sync execution limit' },
   { icon: HardDrive, text: '50GB file storage' },
-  { icon: Building2, text: 'Unlimited workspaces' },
   { icon: Users, text: 'Unlimited invites' },
   { icon: Database, text: 'Unlimited log retention' },
 ]
@@ -24,8 +24,8 @@ export const PRO_PLAN_FEATURES: PlanFeature[] = [
 export const TEAM_PLAN_FEATURES: PlanFeature[] = [
   { icon: Zap, text: '300 runs per minute (sync)' },
   { icon: Clock, text: '2,500 runs per minute (async)' },
+  { icon: Timer, text: '60 min sync execution limit' },
   { icon: HardDrive, text: '500GB file storage (pooled)' },
-  { icon: Building2, text: 'Unlimited workspaces' },
   { icon: Users, text: 'Unlimited invites' },
   { icon: Database, text: 'Unlimited log retention' },
   { icon: SlackMonoIcon, text: 'Dedicated Slack channel' },
@@ -4,6 +4,7 @@ import { task } from '@trigger.dev/sdk'
 import { Cron } from 'croner'
 import { eq } from 'drizzle-orm'
 import { v4 as uuidv4 } from 'uuid'
+import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
 import { preprocessExecution } from '@/lib/execution/preprocessing'
 import { LoggingSession } from '@/lib/logs/execution/logging-session'
 import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -120,6 +121,7 @@ async function runWorkflowExecution({
   loggingSession,
   requestId,
   executionId,
+  asyncTimeout,
 }: {
   payload: ScheduleExecutionPayload
   workflowRecord: WorkflowRecord
@@ -127,6 +129,7 @@ async function runWorkflowExecution({
   loggingSession: LoggingSession
   requestId: string
   executionId: string
+  asyncTimeout?: number
 }): Promise<RunWorkflowResult> {
   try {
     logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
@@ -181,15 +184,33 @@ async function runWorkflowExecution({
       []
     )

-    const executionResult = await executeWorkflowCore({
-      snapshot,
-      callbacks: {},
-      loggingSession,
-      includeFileBase64: true,
-      base64MaxBytes: undefined,
-    })
-
-    if (executionResult.status === 'paused') {
+    const timeoutController = createTimeoutAbortController(asyncTimeout)
+
+    let executionResult
+    try {
+      executionResult = await executeWorkflowCore({
+        snapshot,
+        callbacks: {},
+        loggingSession,
+        includeFileBase64: true,
+        base64MaxBytes: undefined,
+        abortSignal: timeoutController.signal,
+      })
+    } finally {
+      timeoutController.cleanup()
+    }
+
+    if (
+      executionResult.status === 'cancelled' &&
+      timeoutController.isTimedOut() &&
+      timeoutController.timeoutMs
+    ) {
+      const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
+      logger.info(`[${requestId}] Scheduled workflow execution timed out`, {
+        timeoutMs: timeoutController.timeoutMs,
+      })
+      await loggingSession.markAsFailed(timeoutErrorMessage)
+    } else if (executionResult.status === 'paused') {
       if (!executionResult.snapshotSeed) {
         logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
           executionId,
@@ -453,6 +474,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
     loggingSession,
     requestId,
     executionId,
+    asyncTimeout: preprocessResult.executionTimeout?.async,
   })

   if (executionResult.status === 'skip') {
@@ -4,7 +4,14 @@ import { createLogger } from '@sim/logger'
 import { task } from '@trigger.dev/sdk'
 import { eq } from 'drizzle-orm'
 import { v4 as uuidv4 } from 'uuid'
+import { getHighestPrioritySubscription } from '@/lib/billing'
+import {
+  createTimeoutAbortController,
+  getExecutionTimeout,
+  getTimeoutErrorMessage,
+} from '@/lib/core/execution-limits'
 import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency'
+import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
 import { processExecutionFiles } from '@/lib/execution/files'
 import { LoggingSession } from '@/lib/logs/execution/logging-session'
 import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -134,7 +141,13 @@ async function executeWebhookJobInternal(
     requestId
   )

-  // Track deploymentVersionId at function scope so it's available in catch block
+  const userSubscription = await getHighestPrioritySubscription(payload.userId)
+  const asyncTimeout = getExecutionTimeout(
+    userSubscription?.plan as SubscriptionPlan | undefined,
+    'async'
+  )
+  const timeoutController = createTimeoutAbortController(asyncTimeout)
+
   let deploymentVersionId: string | undefined

   try {
@@ -241,11 +254,22 @@ async function executeWebhookJobInternal(
       snapshot,
       callbacks: {},
       loggingSession,
-      includeFileBase64: true, // Enable base64 hydration
-      base64MaxBytes: undefined, // Use default limit
+      includeFileBase64: true,
+      base64MaxBytes: undefined,
+      abortSignal: timeoutController.signal,
     })

-    if (executionResult.status === 'paused') {
+    if (
+      executionResult.status === 'cancelled' &&
+      timeoutController.isTimedOut() &&
+      timeoutController.timeoutMs
+    ) {
+      const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
+      logger.info(`[${requestId}] Airtable webhook execution timed out`, {
+        timeoutMs: timeoutController.timeoutMs,
+      })
+      await loggingSession.markAsFailed(timeoutErrorMessage)
+    } else if (executionResult.status === 'paused') {
       if (!executionResult.snapshotSeed) {
         logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
           executionId,
@@ -497,9 +521,20 @@ async function executeWebhookJobInternal(
       callbacks: {},
       loggingSession,
       includeFileBase64: true,
+      abortSignal: timeoutController.signal,
     })

-    if (executionResult.status === 'paused') {
+    if (
+      executionResult.status === 'cancelled' &&
+      timeoutController.isTimedOut() &&
+      timeoutController.timeoutMs
+    ) {
+      const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
+      logger.info(`[${requestId}] Webhook execution timed out`, {
+        timeoutMs: timeoutController.timeoutMs,
+      })
+      await loggingSession.markAsFailed(timeoutErrorMessage)
+    } else if (executionResult.status === 'paused') {
       if (!executionResult.snapshotSeed) {
         logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
           executionId,
@@ -601,6 +636,8 @@ async function executeWebhookJobInternal(
     }

     throw error
+  } finally {
+    timeoutController.cleanup()
   }
 }
@@ -1,6 +1,7 @@
 import { createLogger } from '@sim/logger'
 import { task } from '@trigger.dev/sdk'
 import { v4 as uuidv4 } from 'uuid'
+import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
 import { preprocessExecution } from '@/lib/execution/preprocessing'
 import { LoggingSession } from '@/lib/logs/execution/logging-session'
 import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -103,15 +104,33 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
     []
   )

-  const result = await executeWorkflowCore({
-    snapshot,
-    callbacks: {},
-    loggingSession,
-    includeFileBase64: true,
-    base64MaxBytes: undefined,
-  })
-
-  if (result.status === 'paused') {
+  const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.async)
+
+  let result
+  try {
+    result = await executeWorkflowCore({
+      snapshot,
+      callbacks: {},
+      loggingSession,
+      includeFileBase64: true,
+      base64MaxBytes: undefined,
+      abortSignal: timeoutController.signal,
+    })
+  } finally {
+    timeoutController.cleanup()
+  }
+
+  if (
+    result.status === 'cancelled' &&
+    timeoutController.isTimedOut() &&
+    timeoutController.timeoutMs
+  ) {
+    const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
+    logger.info(`[${requestId}] Workflow execution timed out`, {
+      timeoutMs: timeoutController.timeoutMs,
+    })
+    await loggingSession.markAsFailed(timeoutErrorMessage)
+  } else if (result.status === 'paused') {
     if (!result.snapshotSeed) {
       logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
         executionId,
@@ -1,3 +1,4 @@
+import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
 import type { LoopType, ParallelType } from '@/lib/workflows/types'

 /**
@@ -187,8 +188,12 @@ export const HTTP = {

 export const AGENT = {
   DEFAULT_MODEL: 'claude-sonnet-4-5',
-  DEFAULT_FUNCTION_TIMEOUT: 600000,
-  REQUEST_TIMEOUT: 600000,
+  get DEFAULT_FUNCTION_TIMEOUT() {
+    return getMaxExecutionTimeout()
+  },
+  get REQUEST_TIMEOUT() {
+    return getMaxExecutionTimeout()
+  },
   CUSTOM_TOOL_PREFIX: 'custom_',
 } as const
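Turning the two constants into getters makes them lazy: `getMaxExecutionTimeout()` runs at property access instead of at module load, so the value tracks the env-derived enterprise async limit rather than freezing whatever was computed when this module was first imported. A tiny sketch of the distinction, with `resolveTimeout` as an illustrative stand-in:

    // resolveTimeout stands in for getMaxExecutionTimeout(); the value is illustrative.
    const resolveTimeout = () => 90 * 60 * 1000

    const EAGER = { TIMEOUT: resolveTimeout() } // computed once, at module load
    const LAZY = {
      get TIMEOUT() {
        return resolveTimeout() // recomputed on every access
      },
    } as const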
@@ -162,6 +162,8 @@ export class ExecutionEngine {
       }
     }

+    this.finalizeIncompleteLogs()
+
     const errorMessage = normalizeError(error)
     logger.error('Execution failed', { error: errorMessage })
@@ -14,7 +14,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
   const { signal, executionId } = options
   const useRedis = isRedisCancellationEnabled() && !!executionId

-  if (!useRedis && signal?.aborted) {
+  if (signal?.aborted) {
     return false
   }
@@ -27,7 +27,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
   const cleanup = () => {
     if (mainTimeoutId) clearTimeout(mainTimeoutId)
     if (checkIntervalId) clearInterval(checkIntervalId)
-    if (!useRedis && signal) signal.removeEventListener('abort', onAbort)
+    if (signal) signal.removeEventListener('abort', onAbort)
   }

   const onAbort = () => {
@@ -37,6 +37,10 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
     resolve(false)
   }

+  if (signal) {
+    signal.addEventListener('abort', onAbort, { once: true })
+  }
+
   if (useRedis) {
     checkIntervalId = setInterval(async () => {
       if (resolved) return
@@ -49,8 +53,6 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
       }
     } catch {}
   }, CANCELLATION_CHECK_INTERVAL_MS)
-  } else if (signal) {
-    signal.addEventListener('abort', onAbort, { once: true })
   }

   mainTimeoutId = setTimeout(() => {
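The four hunks above make the abort handling unconditional. Previously, when Redis-based cancellation was active, the `AbortSignal` was neither checked up front nor listened to, so a timeout abort could only take effect at the next Redis poll tick. A self-contained sketch of the corrected abort-aware sleep, using only the standard `AbortSignal` API (a simplification of the real function, which also polls Redis):

    // Resolves true if the full duration elapsed, false if aborted early.
    function abortableSleep(ms: number, signal?: AbortSignal): Promise<boolean> {
      if (signal?.aborted) return Promise.resolve(false) // checked unconditionally
      return new Promise((resolve) => {
        const timeoutId = setTimeout(() => {
          signal?.removeEventListener('abort', onAbort)
          resolve(true)
        }, ms)
        const onAbort = () => {
          clearTimeout(timeoutId)
          resolve(false)
        }
        // Registered unconditionally - the core of the fix: the listener must be
        // attached even when a secondary cancellation channel (Redis polling in
        // the real code) is also active.
        signal?.addEventListener('abort', onAbort, { once: true })
      })
    }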
@@ -126,6 +126,7 @@ export class WorkflowBlockHandler implements BlockHandler {
         workspaceId: ctx.workspaceId,
         userId: ctx.userId,
         executionId: ctx.executionId,
+        abortSignal: ctx.abortSignal,
       },
     })
@@ -229,6 +229,10 @@ export function addSubflowErrorLog(
   }
   ctx.blockLogs.push(blockLog)

+  if (contextExtensions?.onBlockStart) {
+    contextExtensions.onBlockStart(blockId, blockName, blockType, execOrder)
+  }
+
   if (contextExtensions?.onBlockComplete) {
     contextExtensions.onBlockComplete(blockId, blockName, blockType, {
       input: inputData,
@@ -1,7 +1,10 @@
 export { AGENT_CARD_PATH } from '@a2a-js/sdk'

+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
+
 export const A2A_PROTOCOL_VERSION = '0.3.0'

-export const A2A_DEFAULT_TIMEOUT = 300000
+export const A2A_DEFAULT_TIMEOUT = DEFAULT_EXECUTION_TIMEOUT_MS

 /**
  * Maximum number of messages stored per task in the database.
@@ -5,10 +5,8 @@ import type { ToolUIConfig } from './ui-config'

 const baseToolLogger = createLogger('BaseClientTool')

-/** Default timeout for tool execution (5 minutes) */
-const DEFAULT_TOOL_TIMEOUT_MS = 2 * 60 * 1000
+const DEFAULT_TOOL_TIMEOUT_MS = 5 * 60 * 1000

-/** Timeout for tools that run workflows (10 minutes) */
 export const WORKFLOW_EXECUTION_TIMEOUT_MS = 10 * 60 * 1000

 // Client tool call states used by the new runtime
@@ -170,6 +170,11 @@ export const env = createEnv({
   RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute
   RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute

+  EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'),
+  EXECUTION_TIMEOUT_PRO: z.string().optional().default('3600'),
+  EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3600'),
+  EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3600'),
+
   // Knowledge Base Processing Configuration - Shared across all processing methods
   KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
   KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts
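These variables are read as seconds and converted downstream by the execution-limits module added below (milliseconds conversion, async = 2x sync, capped at 90 minutes). Working through the defaults:

    // What the defaults above resolve to, per the rules in
    // lib/core/execution-limits/types.ts below.
    const freeSyncMs = 300 * 1000 // 5 min
    const freeAsyncMs = Math.min(freeSyncMs * 2, 5400 * 1000) // 10 min
    const proSyncMs = 3600 * 1000 // 60 min
    const proAsyncMs = Math.min(proSyncMs * 2, 5400 * 1000) // capped at 90 min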
apps/sim/lib/core/execution-limits/index.ts (new file, 1 line)
@@ -0,0 +1 @@
+export * from './types'
apps/sim/lib/core/execution-limits/types.ts (new file, 134 lines)
@@ -0,0 +1,134 @@
+import { env } from '@/lib/core/config/env'
+import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
+
+interface ExecutionTimeoutConfig {
+  sync: number
+  async: number
+}
+
+const DEFAULT_SYNC_TIMEOUTS_SECONDS = {
+  free: 300,
+  pro: 3600,
+  team: 3600,
+  enterprise: 3600,
+} as const
+
+const ASYNC_MULTIPLIER = 2
+const MAX_ASYNC_TIMEOUT_SECONDS = 5400
+
+function getSyncTimeoutForPlan(plan: SubscriptionPlan): number {
+  const envVarMap: Record<SubscriptionPlan, string | undefined> = {
+    free: env.EXECUTION_TIMEOUT_FREE,
+    pro: env.EXECUTION_TIMEOUT_PRO,
+    team: env.EXECUTION_TIMEOUT_TEAM,
+    enterprise: env.EXECUTION_TIMEOUT_ENTERPRISE,
+  }
+  return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS_SECONDS[plan]) * 1000
+}
+
+function getAsyncTimeoutForPlan(plan: SubscriptionPlan): number {
+  const syncMs = getSyncTimeoutForPlan(plan)
+  const asyncMs = syncMs * ASYNC_MULTIPLIER
+  const maxAsyncMs = MAX_ASYNC_TIMEOUT_SECONDS * 1000
+  return Math.min(asyncMs, maxAsyncMs)
+}
+
+const EXECUTION_TIMEOUTS: Record<SubscriptionPlan, ExecutionTimeoutConfig> = {
+  free: {
+    sync: getSyncTimeoutForPlan('free'),
+    async: getAsyncTimeoutForPlan('free'),
+  },
+  pro: {
+    sync: getSyncTimeoutForPlan('pro'),
+    async: getAsyncTimeoutForPlan('pro'),
+  },
+  team: {
+    sync: getSyncTimeoutForPlan('team'),
+    async: getAsyncTimeoutForPlan('team'),
+  },
+  enterprise: {
+    sync: getSyncTimeoutForPlan('enterprise'),
+    async: getAsyncTimeoutForPlan('enterprise'),
+  },
+}
+
+export function getExecutionTimeout(
+  plan: SubscriptionPlan | undefined,
+  type: 'sync' | 'async' = 'sync'
+): number {
+  return EXECUTION_TIMEOUTS[plan || 'free'][type]
+}
+
+export function getMaxExecutionTimeout(): number {
+  return EXECUTION_TIMEOUTS.enterprise.async
+}
+
+export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync
+
+export function isTimeoutError(error: unknown): boolean {
+  if (!error) return false
+
+  if (error instanceof Error) {
+    return error.name === 'TimeoutError'
+  }
+
+  if (typeof error === 'object' && 'name' in error) {
+    return (error as { name: string }).name === 'TimeoutError'
+  }
+
+  return false
+}
+
+export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): string {
+  if (timeoutMs) {
+    const timeoutSeconds = Math.floor(timeoutMs / 1000)
+    const timeoutMinutes = Math.floor(timeoutSeconds / 60)
+    const displayTime =
+      timeoutMinutes > 0
+        ? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}`
+        : `${timeoutSeconds} seconds`
+    return `Execution timed out after ${displayTime}`
+  }
+
+  return 'Execution timed out'
+}
+
+/**
+ * Helper to create an AbortController with timeout handling.
+ * Centralizes the timeout abort pattern used across execution paths.
+ */
+export interface TimeoutAbortController {
+  /** The AbortSignal to pass to execution functions */
+  signal: AbortSignal
+  /** Returns true if the abort was triggered by timeout (not user cancellation) */
+  isTimedOut: () => boolean
+  /** Cleanup function - call in finally block to clear the timeout */
+  cleanup: () => void
+  /** Manually abort the execution (for user cancellation) */
+  abort: () => void
+  /** The timeout duration in milliseconds (undefined if no timeout) */
+  timeoutMs: number | undefined
+}
+
+export function createTimeoutAbortController(timeoutMs?: number): TimeoutAbortController {
+  const abortController = new AbortController()
+  let isTimedOut = false
+  let timeoutId: NodeJS.Timeout | undefined
+
+  if (timeoutMs) {
+    timeoutId = setTimeout(() => {
+      isTimedOut = true
+      abortController.abort()
+    }, timeoutMs)
+  }
+
+  return {
+    signal: abortController.signal,
+    isTimedOut: () => isTimedOut,
+    cleanup: () => {
+      if (timeoutId) clearTimeout(timeoutId)
+    },
+    abort: () => abortController.abort(),
+    timeoutMs,
+  }
+}
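The controller deliberately distinguishes a timer-driven abort from user cancellation: both surface as a cancelled execution, but only the timer path flips `isTimedOut()`. A condensed usage sketch of the pattern the trigger tasks in this diff adopt; `runWorkflow` is a declared stand-in for `executeWorkflowCore`:

    // runWorkflow stands in for executeWorkflowCore, which accepts an abortSignal option.
    declare function runWorkflow(opts: { abortSignal: AbortSignal }): Promise<{ status: string }>

    async function runWithTimeout(plan: SubscriptionPlan | undefined) {
      const controller = createTimeoutAbortController(getExecutionTimeout(plan, 'async'))
      let result
      try {
        result = await runWorkflow({ abortSignal: controller.signal })
      } finally {
        controller.cleanup() // always clear the pending timer
      }
      if (result.status === 'cancelled' && controller.isTimedOut() && controller.timeoutMs) {
        // Timed out, not user-cancelled: report a timeout-specific failure.
        throw new Error(getTimeoutErrorMessage(null, controller.timeoutMs))
      }
      return result
    }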
@@ -4,6 +4,7 @@ import { idempotencyKey } from '@sim/db/schema'
 import { createLogger } from '@sim/logger'
 import { eq } from 'drizzle-orm'
 import { getRedisClient } from '@/lib/core/config/redis'
+import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
 import { getStorageMethod, type StorageMethod } from '@/lib/core/storage'
 import { extractProviderIdentifierFromBody } from '@/lib/webhooks/provider-utils'
@@ -36,9 +37,9 @@ export interface AtomicClaimResult {
   storageMethod: StorageMethod
 }

-const DEFAULT_TTL = 60 * 60 * 24 * 7 // 7 days
+const DEFAULT_TTL = 60 * 60 * 24 * 7
 const REDIS_KEY_PREFIX = 'idempotency:'
-const MAX_WAIT_TIME_MS = 300000 // 5 minutes max wait
+const MAX_WAIT_TIME_MS = getMaxExecutionTimeout()
 const POLL_INTERVAL_MS = 1000

 /**
@@ -50,3 +50,13 @@ export function getInvalidCharacters(name: string): string[] {
   const invalidChars = name.match(/[^a-zA-Z0-9_\s]/g)
   return invalidChars ? [...new Set(invalidChars)] : []
 }
+
+/**
+ * Escapes non-ASCII characters in JSON string for HTTP header safety.
+ * Dropbox API requires characters 0x7F and all non-ASCII to be escaped as \uXXXX.
+ */
+export function httpHeaderSafeJson(value: object): string {
+  return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => {
+    return `\\u${(`0000${c.charCodeAt(0).toString(16)}`).slice(-4)}`
+  })
+}
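A quick check of the escaping, runnable once the function is in scope: 'é' is U+00E9, above 0x7F, so it is rewritten for header transport.

    httpHeaderSafeJson({ path: '/café' })
    // => '{"path":"/caf\\u00e9"}' (the header value carries \u00e9, all ASCII)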
@@ -1,7 +1,3 @@
-/**
- * Execution timeout constants
- *
- * DEFAULT_EXECUTION_TIMEOUT_MS: The default timeout for executing user code (10 minutes)
- */
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'

-export const DEFAULT_EXECUTION_TIMEOUT_MS = 600000 // 10 minutes (600 seconds)
+export { DEFAULT_EXECUTION_TIMEOUT_MS }
@@ -4,7 +4,9 @@ import { createLogger } from '@sim/logger'
 import { eq } from 'drizzle-orm'
 import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor'
 import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
+import { getExecutionTimeout } from '@/lib/core/execution-limits'
 import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
+import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
 import { LoggingSession } from '@/lib/logs/execution/logging-session'
 import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
 import type { CoreTriggerType } from '@/stores/logs/filters/types'
@@ -133,10 +135,10 @@ export interface PreprocessExecutionResult {
   success: boolean
   error?: {
     message: string
-    statusCode: number // HTTP status code (401, 402, 403, 404, 429, 500)
-    logCreated: boolean // Whether error was logged to execution_logs
+    statusCode: number
+    logCreated: boolean
   }
-  actorUserId?: string // The user ID that will be billed
+  actorUserId?: string
   workflowRecord?: WorkflowRecord
   userSubscription?: SubscriptionInfo | null
   rateLimitInfo?: {
@@ -144,6 +146,10 @@ export interface PreprocessExecutionResult {
     remaining: number
     resetAt: Date
   }
+  executionTimeout?: {
+    sync: number
+    async: number
+  }
 }

 type WorkflowRecord = typeof workflow.$inferSelect
@@ -484,12 +490,17 @@ export async function preprocessExecution(
     triggerType,
   })

+  const plan = userSubscription?.plan as SubscriptionPlan | undefined
   return {
     success: true,
     actorUserId,
     workflowRecord,
     userSubscription,
     rateLimitInfo,
+    executionTimeout: {
+      sync: getExecutionTimeout(plan, 'sync'),
+      async: getExecutionTimeout(plan, 'async'),
+    },
   }
 }
@@ -261,10 +261,14 @@ export class ExecutionLogger implements IExecutionLoggerService {
       models: costSummary.models,
     }

-    const totalDuration =
+    const rawDurationMs =
       isResume && existingLog?.startedAt
         ? new Date(endedAt).getTime() - new Date(existingLog.startedAt).getTime()
         : totalDurationMs
+    const totalDuration =
+      typeof rawDurationMs === 'number' && Number.isFinite(rawDurationMs)
+        ? Math.max(0, Math.round(rawDurationMs))
+        : 0

     const [updatedLog] = await db
       .update(workflowExecutionLogs)
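The new guard normalizes the duration before it is persisted: non-finite values fall back to 0, negatives (plausible on resume if the stored `startedAt` postdates `endedAt` under clock skew) clamp to 0, and fractional milliseconds round. Concretely:

    const clampDuration = (raw: unknown) =>
      typeof raw === 'number' && Number.isFinite(raw) ? Math.max(0, Math.round(raw)) : 0

    clampDuration(1234.6) // => 1235
    clampDuration(-50) // => 0 (clock skew on resume)
    clampDuration(Number.NaN) // => 0
    clampDuration(undefined) // => 0 (missing duration)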
@@ -776,11 +776,16 @@ export class LoggingSession {
     await db
       .update(workflowExecutionLogs)
       .set({
+        level: 'error',
         status: 'failed',
         executionData: sql`jsonb_set(
-          COALESCE(execution_data, '{}'::jsonb),
-          ARRAY['error'],
-          to_jsonb(${message}::text)
+          jsonb_set(
+            COALESCE(execution_data, '{}'::jsonb),
+            ARRAY['error'],
+            to_jsonb(${message}::text)
+          ),
+          ARRAY['finalOutput'],
+          jsonb_build_object('error', ${message}::text)
         )`,
       })
       .where(eq(workflowExecutionLogs.executionId, executionId))
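The nested `jsonb_set` writes the failure into two keys of `execution_data` in a single UPDATE. After `markAsFailed(message)` the column holds, approximately (the message value here is illustrative):

    // Approximate post-update shape; keys other than these two keep their
    // previously stored values.
    const executionDataAfter = {
      error: 'Execution timed out after 90 minutes',
      finalOutput: { error: 'Execution timed out after 90 minutes' },
    }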
@@ -12,6 +12,7 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js'
 import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'
 import type { ListToolsResult, Tool } from '@modelcontextprotocol/sdk/types.js'
 import { createLogger } from '@sim/logger'
+import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
 import {
   McpConnectionError,
   type McpConnectionStatus,
@@ -202,7 +203,7 @@ export class McpClient {
     const sdkResult = await this.client.callTool(
       { name: toolCall.name, arguments: toolCall.arguments },
       undefined,
-      { timeout: 600000 } // 10 minutes - override SDK's 60s default
+      { timeout: getMaxExecutionTimeout() }
     )

     return sdkResult as McpToolResult
@@ -32,9 +32,11 @@ export function sanitizeHeaders(

 /**
  * Client-safe MCP constants
+ * Note: CLIENT_TIMEOUT should match DEFAULT_EXECUTION_TIMEOUT_MS from @/lib/core/execution-limits
+ * (5 minutes = 300 seconds for free tier). Keep in sync if that value changes.
  */
 export const MCP_CLIENT_CONSTANTS = {
-  CLIENT_TIMEOUT: 600000,
+  CLIENT_TIMEOUT: 5 * 60 * 1000, // 5 minutes - matches DEFAULT_EXECUTION_TIMEOUT_MS
   MAX_RETRIES: 3,
   RECONNECT_DELAY: 1000,
 } as const
@@ -1,4 +1,5 @@
 import { describe, expect, it } from 'vitest'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import {
   categorizeError,
   createMcpToolId,
@@ -81,8 +82,8 @@ describe('generateMcpServerId', () => {
 })
 
 describe('MCP_CONSTANTS', () => {
-  it.concurrent('has correct execution timeout (10 minutes)', () => {
-    expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(600000)
+  it.concurrent('has correct execution timeout', () => {
+    expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(DEFAULT_EXECUTION_TIMEOUT_MS)
   })
 
   it.concurrent('has correct cache timeout (5 minutes)', () => {
@@ -107,8 +108,8 @@ describe('MCP_CONSTANTS', () => {
 })
 
 describe('MCP_CLIENT_CONSTANTS', () => {
-  it.concurrent('has correct client timeout (10 minutes)', () => {
-    expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(600000)
+  it.concurrent('has correct client timeout', () => {
+    expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(DEFAULT_EXECUTION_TIMEOUT_MS)
   })
 
   it.concurrent('has correct auto refresh interval (5 minutes)', () => {
@@ -1,12 +1,10 @@
 import { NextResponse } from 'next/server'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import type { McpApiResponse } from '@/lib/mcp/types'
 import { isMcpTool, MCP } from '@/executor/constants'
 
-/**
- * MCP-specific constants
- */
 export const MCP_CONSTANTS = {
-  EXECUTION_TIMEOUT: 600000,
+  EXECUTION_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
   CACHE_TIMEOUT: 5 * 60 * 1000,
   DEFAULT_RETRIES: 3,
   DEFAULT_CONNECTION_TIMEOUT: 30000,
@@ -45,11 +43,8 @@ export function sanitizeHeaders(
   )
 }
 
-/**
- * Client-safe MCP constants
- */
 export const MCP_CLIENT_CONSTANTS = {
-  CLIENT_TIMEOUT: 600000,
+  CLIENT_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
   AUTO_REFRESH_INTERVAL: 5 * 60 * 1000,
 } as const
 
@@ -19,6 +19,7 @@ export interface ExecuteWorkflowOptions {
   skipLoggingComplete?: boolean
   includeFileBase64?: boolean
   base64MaxBytes?: number
+  abortSignal?: AbortSignal
 }
 
 export interface WorkflowInfo {
@@ -82,6 +83,7 @@ export async function executeWorkflow(
     loggingSession,
     includeFileBase64: streamConfig?.includeFileBase64,
     base64MaxBytes: streamConfig?.base64MaxBytes,
+    abortSignal: streamConfig?.abortSignal,
   })
 
   if (result.status === 'paused') {
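With `abortSignal` threaded through `ExecuteWorkflowOptions`, any caller can bound a run with a plain `AbortController`. A hypothetical caller-side pattern (the exact `executeWorkflowCore` signature is not shown in this diff, so a stand-in declaration is used):

```ts
declare function executeWorkflowCore(opts: { abortSignal?: AbortSignal }): Promise<{ status: string }>

const controller = new AbortController()
const timer = setTimeout(() => controller.abort(), 30_000) // bound the run to 30s
try {
  const result = await executeWorkflowCore({ abortSignal: controller.signal })
  // result.status === 'cancelled' when the signal fired first (see the resume hunks below)
} finally {
  clearTimeout(timer)
}
```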
@@ -58,9 +58,6 @@ export interface ExecutionErrorEvent extends BaseExecutionEvent {
   }
 }
 
-/**
- * Execution cancelled event
- */
 export interface ExecutionCancelledEvent extends BaseExecutionEvent {
   type: 'execution:cancelled'
   workflowId: string
@@ -167,9 +164,6 @@ export type ExecutionEvent =
   | StreamChunkEvent
   | StreamDoneEvent
 
-/**
- * Extracted data types for use in callbacks
- */
 export type ExecutionStartedData = ExecutionStartedEvent['data']
 export type ExecutionCompletedData = ExecutionCompletedEvent['data']
 export type ExecutionErrorData = ExecutionErrorEvent['data']
@@ -4,6 +4,7 @@ import { pausedExecutions, resumeQueue, workflowExecutionLogs } from '@sim/db/sc
 import { createLogger } from '@sim/logger'
 import { and, asc, desc, eq, inArray, lt, type SQL, sql } from 'drizzle-orm'
 import type { Edge } from 'reactflow'
+import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
 import { preprocessExecution } from '@/lib/execution/preprocessing'
 import { LoggingSession } from '@/lib/logs/execution/logging-session'
 import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
@@ -771,14 +772,39 @@ export class PauseResumeManager {
       actorUserId: metadata.userId,
     })
 
-    return await executeWorkflowCore({
-      snapshot: resumeSnapshot,
-      callbacks: {},
-      loggingSession,
-      skipLogCreation: true, // Reuse existing log entry
-      includeFileBase64: true, // Enable base64 hydration
-      base64MaxBytes: undefined, // Use default limit
-    })
+    const timeoutController = createTimeoutAbortController(
+      preprocessingResult.executionTimeout?.async
+    )
+
+    let result: ExecutionResult
+    try {
+      result = await executeWorkflowCore({
+        snapshot: resumeSnapshot,
+        callbacks: {},
+        loggingSession,
+        skipLogCreation: true, // Reuse existing log entry
+        includeFileBase64: true, // Enable base64 hydration
+        base64MaxBytes: undefined, // Use default limit
+        abortSignal: timeoutController.signal,
+      })
+    } finally {
+      timeoutController.cleanup()
+    }
+
+    if (
+      result.status === 'cancelled' &&
+      timeoutController.isTimedOut() &&
+      timeoutController.timeoutMs
+    ) {
+      const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
+      logger.info('Resume execution timed out', {
+        resumeExecutionId,
+        timeoutMs: timeoutController.timeoutMs,
+      })
+      await loggingSession.markAsFailed(timeoutErrorMessage)
+    }
+
+    return result
   }
 
   private static async markResumeCompleted(args: {
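One subtlety in the hunk above: an aborted run surfaces as `status: 'cancelled'` regardless of why the signal fired, so the code asks the controller whether its own timer was the cause before rewriting the log as a failure. Schematically:

```ts
// One 'cancelled' status, two causes; only a timer-driven abort becomes a failure.
if (result.status === 'cancelled') {
  if (timeoutController.isTimedOut() && timeoutController.timeoutMs) {
    // Deadline hit: persist a timeout failure.
    await loggingSession.markAsFailed(getTimeoutErrorMessage(null, timeoutController.timeoutMs))
  }
  // Otherwise: a deliberate cancellation, left as-is.
}
```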
@@ -1,4 +1,5 @@
 import { createLogger } from '@sim/logger'
+import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
 import {
   extractBlockIdFromOutputId,
   extractPathFromOutputId,
@@ -32,6 +33,7 @@ export interface StreamingConfig {
   workflowTriggerType?: 'api' | 'chat'
   includeFileBase64?: boolean
   base64MaxBytes?: number
+  timeoutMs?: number
 }
 
 export interface StreamingResponseOptions {
@@ -169,6 +171,7 @@ export async function createStreamingResponse(
   options: StreamingResponseOptions
 ): Promise<ReadableStream> {
   const { requestId, workflow, input, executingUserId, streamConfig, executionId } = options
+  const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs)
 
   return new ReadableStream({
     async start(controller) {
@@ -284,6 +287,7 @@ export async function createStreamingResponse(
           skipLoggingComplete: true,
           includeFileBase64: streamConfig.includeFileBase64,
           base64MaxBytes: streamConfig.base64MaxBytes,
+          abortSignal: timeoutController.signal,
         },
         executionId
       )
@@ -293,18 +297,34 @@ export async function createStreamingResponse(
         processStreamingBlockLogs(result.logs, state.streamedContent)
       }
 
-      await completeLoggingSession(result)
+      if (
+        result.status === 'cancelled' &&
+        timeoutController.isTimedOut() &&
+        timeoutController.timeoutMs
+      ) {
+        const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
+        logger.info(`[${requestId}] Streaming execution timed out`, {
+          timeoutMs: timeoutController.timeoutMs,
+        })
+        if (result._streamingMetadata?.loggingSession) {
+          await result._streamingMetadata.loggingSession.markAsFailed(timeoutErrorMessage)
+        }
+        controller.enqueue(encodeSSE({ event: 'error', error: timeoutErrorMessage }))
+      } else {
+        await completeLoggingSession(result)
 
       const minimalResult = await buildMinimalResult(
         result,
         streamConfig.selectedOutputs,
         state.streamedContent,
         requestId,
         streamConfig.includeFileBase64 ?? true,
         streamConfig.base64MaxBytes
       )
 
+        controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
+      }
 
-      controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
       controller.enqueue(encodeSSE('[DONE]'))
 
       if (executionId) {
@@ -323,6 +343,20 @@ export async function createStreamingResponse(
       }
 
       controller.close()
+      } finally {
+        timeoutController.cleanup()
+      }
+    },
+    async cancel(reason) {
+      logger.info(`[${requestId}] Streaming response cancelled`, { reason })
+      timeoutController.abort()
+      timeoutController.cleanup()
+      if (executionId) {
+        try {
+          await cleanupExecutionBase64Cache(executionId)
+        } catch (error) {
+          logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error })
+        }
+      }
     },
   })
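From the consumer side, the streaming change means a timed-out run now emits an explicit error frame before the stream terminates, instead of a final result. A sketch of the frames a client might observe (shape inferred from the `encodeSSE` calls above, payload text illustrative):

```ts
type StreamFrame =
  | { event: 'error'; error: string } // timeout path
  | { event: 'final'; data: unknown } // normal completion
  | '[DONE]' // always the last frame
```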
@@ -1,8 +1,9 @@
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import type { RunActorParams, RunActorResult } from '@/tools/apify/types'
 import type { ToolConfig } from '@/tools/types'
 
-const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
-const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
+const POLL_INTERVAL_MS = 5000
+const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
 
 export const apifyRunActorAsyncTool: ToolConfig<RunActorParams, RunActorResult> = {
   id: 'apify_run_actor_async',
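The same `POLL_INTERVAL_MS` / `MAX_POLL_TIME_MS` pair recurs in the browser_use, exa, and firecrawl diffs below; only the budget constant changes. A hypothetical shape for the loop these constants bound (the tools' actual loop bodies are not part of this diff):

```ts
// Poll until check() yields a value or the time budget is exhausted.
async function pollUntilDone<T>(
  check: () => Promise<T | null>,
  pollIntervalMs = 5000, // POLL_INTERVAL_MS above
  maxPollTimeMs = 5 * 60 * 1000 // MAX_POLL_TIME_MS (DEFAULT_EXECUTION_TIMEOUT_MS, assumed 5 min)
): Promise<T> {
  const deadline = Date.now() + maxPollTimeMs
  while (Date.now() < deadline) {
    const result = await check()
    if (result !== null) return result
    await new Promise((resolve) => setTimeout(resolve, pollIntervalMs))
  }
  throw new Error(`Polling exceeded ${maxPollTimeMs}ms`)
}
```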
@@ -1,11 +1,12 @@
 import { createLogger } from '@sim/logger'
+import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
 import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types'
 import type { ToolConfig, ToolResponse } from '@/tools/types'
 
 const logger = createLogger('BrowserUseTool')
 
 const POLL_INTERVAL_MS = 5000
-const MAX_POLL_TIME_MS = 600000 // 10 minutes
+const MAX_POLL_TIME_MS = getMaxExecutionTimeout()
 const MAX_CONSECUTIVE_ERRORS = 3
 
 async function createSessionWithProfile(
@@ -1,16 +1,7 @@
+import { httpHeaderSafeJson } from '@/lib/core/utils/validation'
 import type { DropboxDownloadParams, DropboxDownloadResponse } from '@/tools/dropbox/types'
 import type { ToolConfig } from '@/tools/types'
 
-/**
- * Escapes non-ASCII characters in JSON string for HTTP header safety.
- * Dropbox API requires characters 0x7F and all non-ASCII to be escaped as \uXXXX.
- */
-function httpHeaderSafeJson(value: object): string {
-  return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => {
-    return '\\u' + ('0000' + c.charCodeAt(0).toString(16)).slice(-4)
-  })
-}
-
 export const dropboxDownloadTool: ToolConfig<DropboxDownloadParams, DropboxDownloadResponse> = {
   id: 'dropbox_download',
   name: 'Dropbox Download File',
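The removed helper is unchanged in behavior, just relocated to the shared `@/lib/core/utils/validation` module. For reference, what the escaping does (example ours):

```ts
// Dropbox's JSON-in-header arguments must be ASCII, so code points >= 0x7F
// become \uXXXX escapes.
httpHeaderSafeJson({ path: '/Ünïcode.txt' })
// => {"path":"/\u00dcn\u00efcode.txt"}
```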
@@ -1,11 +1,12 @@
 import { createLogger } from '@sim/logger'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import type { ExaResearchParams, ExaResearchResponse } from '@/tools/exa/types'
 import type { ToolConfig } from '@/tools/types'
 
 const logger = createLogger('ExaResearchTool')
 
-const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
-const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
+const POLL_INTERVAL_MS = 5000
+const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
 
 export const researchTool: ToolConfig<ExaResearchParams, ExaResearchResponse> = {
   id: 'exa_research',
@@ -1,11 +1,12 @@
 import { createLogger } from '@sim/logger'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import type { AgentParams, AgentResponse } from '@/tools/firecrawl/types'
 import type { ToolConfig } from '@/tools/types'
 
 const logger = createLogger('FirecrawlAgentTool')
 
-const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
-const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
+const POLL_INTERVAL_MS = 5000
+const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
 
 export const agentTool: ToolConfig<AgentParams, AgentResponse> = {
   id: 'firecrawl_agent',
@@ -1,12 +1,13 @@
 import { createLogger } from '@sim/logger'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import type { FirecrawlCrawlParams, FirecrawlCrawlResponse } from '@/tools/firecrawl/types'
 import { CRAWLED_PAGE_OUTPUT_PROPERTIES } from '@/tools/firecrawl/types'
 import type { ToolConfig } from '@/tools/types'
 
 const logger = createLogger('FirecrawlCrawlTool')
 
-const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
-const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
+const POLL_INTERVAL_MS = 5000
+const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
 
 export const crawlTool: ToolConfig<FirecrawlCrawlParams, FirecrawlCrawlResponse> = {
   id: 'firecrawl_crawl',
@@ -1,11 +1,12 @@
 import { createLogger } from '@sim/logger'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import type { ExtractParams, ExtractResponse } from '@/tools/firecrawl/types'
 import type { ToolConfig } from '@/tools/types'
 
 const logger = createLogger('FirecrawlExtractTool')
 
-const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
-const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
+const POLL_INTERVAL_MS = 5000
+const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
 
 export const extractTool: ToolConfig<ExtractParams, ExtractResponse> = {
   id: 'firecrawl_extract',
@@ -1,5 +1,6 @@
 import { createLogger } from '@sim/logger'
 import { generateInternalToken } from '@/lib/auth/internal'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
 import {
   secureFetchWithPinnedIP,
   validateUrlWithDNS,
@@ -628,9 +629,8 @@ async function executeToolRequest(
   let response: Response
 
   if (isInternalRoute) {
-    // Set up AbortController for timeout support on internal routes
     const controller = new AbortController()
-    const timeout = requestParams.timeout || 300000
+    const timeout = requestParams.timeout || DEFAULT_EXECUTION_TIMEOUT_MS
     const timeoutId = setTimeout(() => controller.abort(), timeout)
 
     try {
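The internal-route branch above is the standard fetch-with-deadline idiom. A self-contained version of that idiom (ours, not the proxy's actual code):

```ts
// Fetch with a hard deadline via AbortController; the timer is cleared either way.
async function fetchWithTimeout(url: string, timeoutMs: number): Promise<Response> {
  const controller = new AbortController()
  const timeoutId = setTimeout(() => controller.abort(), timeoutMs)
  try {
    return await fetch(url, { signal: controller.signal })
  } finally {
    clearTimeout(timeoutId)
  }
}
```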
@@ -1,4 +1,5 @@
 import { createLogger } from '@sim/logger'
+import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
 import { getBaseUrl } from '@/lib/core/utils/urls'
 import { AGENT, isCustomTool } from '@/executor/constants'
 import { getCustomTool } from '@/hooks/queries/custom-tools'
@@ -122,9 +123,7 @@ export function formatRequestParams(tool: ToolConfig, params: Record<string, any
     }
   }
 
-  // Get timeout from params (if specified) and validate
-  // Must be a finite positive number, max 600000ms (10 minutes) as documented
-  const MAX_TIMEOUT_MS = 600000
+  const MAX_TIMEOUT_MS = getMaxExecutionTimeout()
   const rawTimeout = params.timeout
   const timeout = rawTimeout != null ? Number(rawTimeout) : undefined
   const validTimeout =
@@ -6,7 +6,7 @@ export default defineConfig({
   project: env.TRIGGER_PROJECT_ID!,
   runtime: 'node',
   logLevel: 'log',
-  maxDuration: 600,
+  maxDuration: 5400,
   retries: {
     enabledInDev: false,
     default: {
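Worth spelling out: Trigger.dev's `maxDuration` is expressed in seconds, so this raises the background task ceiling from 600 s (10 minutes) to 5400 s (90 minutes), which lines up with the 90-minute async execution cap used elsewhere in this change.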