fix streaming path

This commit is contained in:
Vikhyath Mondreti
2026-02-03 18:25:04 -08:00
parent c332efd1e4
commit 066850b65a
2 changed files with 38 additions and 15 deletions

View File

@@ -517,8 +517,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
cachedWorkflowData?.blocks || {}
)
const streamVariables = cachedWorkflowData?.variables ?? (workflow as any).variables
const streamingTimeout = preprocessResult.executionTimeout?.sync
const stream = await createStreamingResponse({
requestId,
workflow: {
@@ -536,7 +534,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
includeFileBase64,
base64MaxBytes,
abortSignal: streamingTimeout ? AbortSignal.timeout(streamingTimeout) : undefined,
timeoutMs: preprocessResult.executionTimeout?.sync,
},
executionId,
})

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import {
extractBlockIdFromOutputId,
extractPathFromOutputId,
@@ -32,7 +33,7 @@ export interface StreamingConfig {
workflowTriggerType?: 'api' | 'chat'
includeFileBase64?: boolean
base64MaxBytes?: number
abortSignal?: AbortSignal
timeoutMs?: number
}
export interface StreamingResponseOptions {
@@ -269,6 +270,18 @@ export async function createStreamingResponse(
}
}
const timeoutMs = streamConfig.timeoutMs
const abortController = new AbortController()
let isTimedOut = false
let timeoutId: NodeJS.Timeout | undefined
if (timeoutMs) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, timeoutMs)
}
try {
const result = await executeWorkflow(
workflow,
@@ -285,7 +298,7 @@ export async function createStreamingResponse(
skipLoggingComplete: true,
includeFileBase64: streamConfig.includeFileBase64,
base64MaxBytes: streamConfig.base64MaxBytes,
abortSignal: streamConfig.abortSignal,
abortSignal: abortController.signal,
},
executionId
)
@@ -295,18 +308,28 @@ export async function createStreamingResponse(
processStreamingBlockLogs(result.logs, state.streamedContent)
}
await completeLoggingSession(result)
if (result.status === 'cancelled' && isTimedOut && timeoutMs) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutMs)
logger.info(`[${requestId}] Streaming execution timed out`, { timeoutMs })
if (result._streamingMetadata?.loggingSession) {
await result._streamingMetadata.loggingSession.markAsFailed(timeoutErrorMessage)
}
controller.enqueue(encodeSSE({ event: 'error', error: timeoutErrorMessage }))
} else {
await completeLoggingSession(result)
const minimalResult = await buildMinimalResult(
result,
streamConfig.selectedOutputs,
state.streamedContent,
requestId,
streamConfig.includeFileBase64 ?? true,
streamConfig.base64MaxBytes
)
const minimalResult = await buildMinimalResult(
result,
streamConfig.selectedOutputs,
state.streamedContent,
requestId,
streamConfig.includeFileBase64 ?? true,
streamConfig.base64MaxBytes
)
controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
}
controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
controller.enqueue(encodeSSE('[DONE]'))
if (executionId) {
@@ -325,6 +348,8 @@ export async function createStreamingResponse(
}
controller.close()
} finally {
if (timeoutId) clearTimeout(timeoutId)
}
},
})