diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index d372e3e26..b89611b17 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -517,8 +517,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: cachedWorkflowData?.blocks || {} ) const streamVariables = cachedWorkflowData?.variables ?? (workflow as any).variables - const streamingTimeout = preprocessResult.executionTimeout?.sync - const stream = await createStreamingResponse({ requestId, workflow: { @@ -536,7 +534,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api', includeFileBase64, base64MaxBytes, - abortSignal: streamingTimeout ? AbortSignal.timeout(streamingTimeout) : undefined, + timeoutMs: preprocessResult.executionTimeout?.sync, }, executionId, }) diff --git a/apps/sim/lib/workflows/streaming/streaming.ts b/apps/sim/lib/workflows/streaming/streaming.ts index b990d53c4..65b0e67ac 100644 --- a/apps/sim/lib/workflows/streaming/streaming.ts +++ b/apps/sim/lib/workflows/streaming/streaming.ts @@ -1,4 +1,5 @@ import { createLogger } from '@sim/logger' +import { getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { extractBlockIdFromOutputId, extractPathFromOutputId, @@ -32,7 +33,7 @@ export interface StreamingConfig { workflowTriggerType?: 'api' | 'chat' includeFileBase64?: boolean base64MaxBytes?: number - abortSignal?: AbortSignal + timeoutMs?: number } export interface StreamingResponseOptions { @@ -269,6 +270,18 @@ export async function createStreamingResponse( } } + const timeoutMs = streamConfig.timeoutMs + const abortController = new AbortController() + let isTimedOut = false + let timeoutId: NodeJS.Timeout | undefined + + if (timeoutMs) { + timeoutId = setTimeout(() => { + isTimedOut = true + abortController.abort() + }, timeoutMs) + } + try { const result = await executeWorkflow( workflow, @@ -285,7 +298,7 @@ export async function createStreamingResponse( skipLoggingComplete: true, includeFileBase64: streamConfig.includeFileBase64, base64MaxBytes: streamConfig.base64MaxBytes, - abortSignal: streamConfig.abortSignal, + abortSignal: abortController.signal, }, executionId ) @@ -295,18 +308,28 @@ export async function createStreamingResponse( processStreamingBlockLogs(result.logs, state.streamedContent) } - await completeLoggingSession(result) + if (result.status === 'cancelled' && isTimedOut && timeoutMs) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutMs) + logger.info(`[${requestId}] Streaming execution timed out`, { timeoutMs }) + if (result._streamingMetadata?.loggingSession) { + await result._streamingMetadata.loggingSession.markAsFailed(timeoutErrorMessage) + } + controller.enqueue(encodeSSE({ event: 'error', error: timeoutErrorMessage })) + } else { + await completeLoggingSession(result) - const minimalResult = await buildMinimalResult( - result, - streamConfig.selectedOutputs, - state.streamedContent, - requestId, - streamConfig.includeFileBase64 ?? true, - streamConfig.base64MaxBytes - ) + const minimalResult = await buildMinimalResult( + result, + streamConfig.selectedOutputs, + state.streamedContent, + requestId, + streamConfig.includeFileBase64 ?? true, + streamConfig.base64MaxBytes + ) + + controller.enqueue(encodeSSE({ event: 'final', data: minimalResult })) + } - controller.enqueue(encodeSSE({ event: 'final', data: minimalResult })) controller.enqueue(encodeSSE('[DONE]')) if (executionId) { @@ -325,6 +348,8 @@ export async function createStreamingResponse( } controller.close() + } finally { + if (timeoutId) clearTimeout(timeoutId) } }, })