support streaming and async paths'

This commit is contained in:
Vikhyath Mondreti
2026-02-03 18:22:08 -08:00
parent d2e4afd15b
commit c332efd1e4
10 changed files with 153 additions and 58 deletions

View File

@@ -217,16 +217,16 @@ Different subscription plans have different usage limits:
Workflows have maximum execution time limits based on your subscription plan:
| Plan | Sync Execution Limit |
|------|---------------------|
| **Free** | 5 minutes |
| **Pro** | 60 minutes |
| **Team** | 60 minutes |
| **Enterprise** | 60 minutes |
| Plan | Sync Execution | Async Execution |
|------|----------------|-----------------|
| **Free** | 5 minutes | 10 minutes |
| **Pro** | 60 minutes | 90 minutes |
| **Team** | 60 minutes | 90 minutes |
| **Enterprise** | 60 minutes | 90 minutes |
**Sync executions** run immediately and return results directly. These are triggered via the API with `async: false` (default) or through the UI.
**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background with a 90-minute time limit for all plans.
**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background. Async time limits are 2x the sync limit, capped at 90 minutes.
<Callout type="info">
If a workflow exceeds its time limit, it will be terminated and marked as failed with a timeout error. Design long-running workflows to use async execution or break them into smaller workflows.

View File

@@ -1,7 +1,7 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { checkInternalAuth } from '@/lib/auth/hybrid'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { downloadFileFromStorage } from '@/lib/uploads/utils/file-utils.server'
import type { UserFile } from '@/executor/types'
import type { VideoRequestBody } from '@/tools/video/types'
@@ -328,7 +328,7 @@ async function generateWithRunway(
logger.info(`[${requestId}] Runway task created: ${taskId}`)
const pollIntervalMs = 5000
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
let attempts = 0
while (attempts < maxAttempts) {
@@ -372,7 +372,7 @@ async function generateWithRunway(
attempts++
}
throw new Error('Runway generation timed out after 10 minutes')
throw new Error('Runway generation timed out')
}
async function generateWithVeo(
@@ -432,7 +432,7 @@ async function generateWithVeo(
logger.info(`[${requestId}] Veo operation created: ${operationName}`)
const pollIntervalMs = 5000
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
let attempts = 0
while (attempts < maxAttempts) {
@@ -488,7 +488,7 @@ async function generateWithVeo(
attempts++
}
throw new Error('Veo generation timed out after 5 minutes')
throw new Error('Veo generation timed out')
}
async function generateWithLuma(
@@ -545,7 +545,7 @@ async function generateWithLuma(
logger.info(`[${requestId}] Luma generation created: ${generationId}`)
const pollIntervalMs = 5000
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
let attempts = 0
while (attempts < maxAttempts) {
@@ -596,7 +596,7 @@ async function generateWithLuma(
attempts++
}
throw new Error('Luma generation timed out after 10 minutes')
throw new Error('Luma generation timed out')
}
async function generateWithMiniMax(
@@ -663,7 +663,7 @@ async function generateWithMiniMax(
logger.info(`[${requestId}] MiniMax task created: ${taskId}`)
const pollIntervalMs = 5000
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
let attempts = 0
while (attempts < maxAttempts) {
@@ -746,7 +746,7 @@ async function generateWithMiniMax(
attempts++
}
throw new Error('MiniMax generation timed out after 10 minutes')
throw new Error('MiniMax generation timed out')
}
// Helper function to strip subpaths from Fal.ai model IDs for status/result endpoints
@@ -865,7 +865,7 @@ async function generateWithFalAI(
const baseModelId = getBaseModelId(falModelId)
const pollIntervalMs = 5000
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
let attempts = 0
while (attempts < maxAttempts) {
@@ -942,7 +942,7 @@ async function generateWithFalAI(
attempts++
}
throw new Error('Fal.ai generation timed out after 8 minutes')
throw new Error('Fal.ai generation timed out')
}
function getVideoDimensions(

View File

@@ -517,6 +517,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
cachedWorkflowData?.blocks || {}
)
const streamVariables = cachedWorkflowData?.variables ?? (workflow as any).variables
const streamingTimeout = preprocessResult.executionTimeout?.sync
const stream = await createStreamingResponse({
requestId,
@@ -535,6 +536,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
includeFileBase64,
base64MaxBytes,
abortSignal: streamingTimeout ? AbortSignal.timeout(streamingTimeout) : undefined,
},
executionId,
})

View File

@@ -4,6 +4,7 @@ import { task } from '@trigger.dev/sdk'
import { Cron } from 'croner'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -120,6 +121,7 @@ async function runWorkflowExecution({
loggingSession,
requestId,
executionId,
asyncTimeout,
}: {
payload: ScheduleExecutionPayload
workflowRecord: WorkflowRecord
@@ -127,6 +129,7 @@ async function runWorkflowExecution({
loggingSession: LoggingSession
requestId: string
executionId: string
asyncTimeout?: number
}): Promise<RunWorkflowResult> {
try {
logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
@@ -181,15 +184,38 @@ async function runWorkflowExecution({
[]
)
const executionResult = await executeWorkflowCore({
snapshot,
callbacks: {},
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
})
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (executionResult.status === 'paused') {
if (asyncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, asyncTimeout)
}
let executionResult
try {
executionResult = await executeWorkflowCore({
snapshot,
callbacks: {},
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
abortSignal: abortController.signal,
})
} finally {
if (timeoutId) clearTimeout(timeoutId)
}
if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
logger.info(`[${requestId}] Scheduled workflow execution timed out`, {
timeoutMs: asyncTimeout,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
@@ -453,6 +479,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
loggingSession,
requestId,
executionId,
asyncTimeout: preprocessResult.executionTimeout?.async,
})
if (executionResult.status === 'skip') {

View File

@@ -4,7 +4,10 @@ import { createLogger } from '@sim/logger'
import { task } from '@trigger.dev/sdk'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { getHighestPrioritySubscription } from '@/lib/billing'
import { getExecutionTimeout, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import { processExecutionFiles } from '@/lib/execution/files'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -134,7 +137,22 @@ async function executeWebhookJobInternal(
requestId
)
// Track deploymentVersionId at function scope so it's available in catch block
const userSubscription = await getHighestPrioritySubscription(payload.userId)
const asyncTimeout = getExecutionTimeout(
userSubscription?.plan as SubscriptionPlan | undefined,
'async'
)
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (asyncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, asyncTimeout)
}
let deploymentVersionId: string | undefined
try {
@@ -241,11 +259,18 @@ async function executeWebhookJobInternal(
snapshot,
callbacks: {},
loggingSession,
includeFileBase64: true, // Enable base64 hydration
base64MaxBytes: undefined, // Use default limit
includeFileBase64: true,
base64MaxBytes: undefined,
abortSignal: abortController.signal,
})
if (executionResult.status === 'paused') {
if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
logger.info(`[${requestId}] Airtable webhook execution timed out`, {
timeoutMs: asyncTimeout,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
@@ -497,9 +522,14 @@ async function executeWebhookJobInternal(
callbacks: {},
loggingSession,
includeFileBase64: true,
abortSignal: abortController.signal,
})
if (executionResult.status === 'paused') {
if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
logger.info(`[${requestId}] Webhook execution timed out`, { timeoutMs: asyncTimeout })
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (executionResult.status === 'paused') {
if (!executionResult.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
@@ -601,6 +631,8 @@ async function executeWebhookJobInternal(
}
throw error
} finally {
if (timeoutId) clearTimeout(timeoutId)
}
}

View File

@@ -1,6 +1,7 @@
import { createLogger } from '@sim/logger'
import { task } from '@trigger.dev/sdk'
import { v4 as uuidv4 } from 'uuid'
import { getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -103,15 +104,37 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
[]
)
const result = await executeWorkflowCore({
snapshot,
callbacks: {},
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
})
const asyncTimeout = preprocessResult.executionTimeout?.async
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (result.status === 'paused') {
if (asyncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, asyncTimeout)
}
let result
try {
result = await executeWorkflowCore({
snapshot,
callbacks: {},
loggingSession,
includeFileBase64: true,
base64MaxBytes: undefined,
abortSignal: abortController.signal,
})
} finally {
if (timeoutId) clearTimeout(timeoutId)
}
if (result.status === 'cancelled' && isTimedOut && asyncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: asyncTimeout })
await loggingSession.markAsFailed(timeoutErrorMessage)
} else if (result.status === 'paused') {
if (!result.snapshotSeed) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,

View File

@@ -6,14 +6,15 @@ interface ExecutionTimeoutConfig {
async: number
}
const DEFAULT_SYNC_TIMEOUTS = {
const DEFAULT_SYNC_TIMEOUTS_SECONDS = {
free: 300,
pro: 3600,
team: 3600,
enterprise: 3600,
} as const
const ASYNC_TIMEOUT_SECONDS = 5400
const ASYNC_MULTIPLIER = 2
const MAX_ASYNC_TIMEOUT_SECONDS = 5400
function getSyncTimeoutForPlan(plan: SubscriptionPlan): number {
const envVarMap: Record<SubscriptionPlan, string | undefined> = {
@@ -22,25 +23,32 @@ function getSyncTimeoutForPlan(plan: SubscriptionPlan): number {
team: env.EXECUTION_TIMEOUT_TEAM,
enterprise: env.EXECUTION_TIMEOUT_ENTERPRISE,
}
return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS[plan]) * 1000
return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS_SECONDS[plan]) * 1000
}
function getAsyncTimeoutForPlan(plan: SubscriptionPlan): number {
const syncMs = getSyncTimeoutForPlan(plan)
const asyncMs = syncMs * ASYNC_MULTIPLIER
const maxAsyncMs = MAX_ASYNC_TIMEOUT_SECONDS * 1000
return Math.min(asyncMs, maxAsyncMs)
}
const EXECUTION_TIMEOUTS: Record<SubscriptionPlan, ExecutionTimeoutConfig> = {
free: {
sync: getSyncTimeoutForPlan('free'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
async: getAsyncTimeoutForPlan('free'),
},
pro: {
sync: getSyncTimeoutForPlan('pro'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
async: getAsyncTimeoutForPlan('pro'),
},
team: {
sync: getSyncTimeoutForPlan('team'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
async: getAsyncTimeoutForPlan('team'),
},
enterprise: {
sync: getSyncTimeoutForPlan('enterprise'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
async: getAsyncTimeoutForPlan('enterprise'),
},
}
@@ -58,18 +66,17 @@ export function getMaxExecutionTimeout(): number {
export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync
export function isTimeoutError(error: unknown): boolean {
if (!(error instanceof Error)) return false
if (!error) return false
const name = error.name.toLowerCase()
const message = error.message.toLowerCase()
if (error instanceof Error) {
return error.name === 'TimeoutError'
}
return (
name === 'timeouterror' ||
name === 'aborterror' ||
message.includes('timeout') ||
message.includes('timed out') ||
message.includes('aborted')
)
if (typeof error === 'object' && 'name' in error) {
return (error as { name: string }).name === 'TimeoutError'
}
return false
}
export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): string {

View File

@@ -19,6 +19,7 @@ export interface ExecuteWorkflowOptions {
skipLoggingComplete?: boolean
includeFileBase64?: boolean
base64MaxBytes?: number
abortSignal?: AbortSignal
}
export interface WorkflowInfo {
@@ -82,6 +83,7 @@ export async function executeWorkflow(
loggingSession,
includeFileBase64: streamConfig?.includeFileBase64,
base64MaxBytes: streamConfig?.base64MaxBytes,
abortSignal: streamConfig?.abortSignal,
})
if (result.status === 'paused') {

View File

@@ -32,6 +32,7 @@ export interface StreamingConfig {
workflowTriggerType?: 'api' | 'chat'
includeFileBase64?: boolean
base64MaxBytes?: number
abortSignal?: AbortSignal
}
export interface StreamingResponseOptions {
@@ -284,6 +285,7 @@ export async function createStreamingResponse(
skipLoggingComplete: true,
includeFileBase64: streamConfig.includeFileBase64,
base64MaxBytes: streamConfig.base64MaxBytes,
abortSignal: streamConfig.abortSignal,
},
executionId
)

View File

@@ -1,12 +1,12 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types'
import type { ToolConfig, ToolResponse } from '@/tools/types'
const logger = createLogger('BrowserUseTool')
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
const MAX_POLL_TIME_MS = getMaxExecutionTimeout()
const MAX_CONSECUTIVE_ERRORS = 3
async function createSessionWithProfile(