From cb63e98dc8265bc6e53f99bbc536cbeb86ca4acd Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 4 Feb 2026 01:31:30 -0800 Subject: [PATCH] consolidate workflow execution and run from block hook code --- .../hooks/use-workflow-execution.ts | 751 ++++++++++-------- apps/sim/lib/workflows/streaming/streaming.ts | 15 +- 2 files changed, 425 insertions(+), 341 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 2ef9afe59..1b514dccd 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -4,6 +4,11 @@ import { useQueryClient } from '@tanstack/react-query' import { v4 as uuidv4 } from 'uuid' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { processStreamingBlockLogs } from '@/lib/tokenization' +import type { + BlockCompletedData, + BlockErrorData, + BlockStartedData, +} from '@/lib/workflows/executor/execution-events' import { extractTriggerMockPayload, selectBestTrigger, @@ -17,7 +22,13 @@ import { import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow' import { getBlock } from '@/blocks' import type { SerializableExecutionState } from '@/executor/execution/types' -import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types' +import type { + BlockLog, + BlockState, + ExecutionResult, + NormalizedBlockOutput, + StreamingExecution, +} from '@/executor/types' import { hasExecutionResult } from '@/executor/utils/errors' import { coerceValue } from '@/executor/utils/start-block' import { subscriptionKeys } from '@/hooks/queries/subscription' @@ -41,6 +52,19 @@ interface DebugValidationResult { error?: string } +interface BlockEventHandlerConfig { + workflowId?: string + executionId?: string + workflowEdges: Array<{ id: string; target: string }> + activeBlocksSet: Set + accumulatedBlockLogs: BlockLog[] + accumulatedBlockStates: Map + executedBlockIds: Set + consoleMode: 'update' | 'add' + includeStartConsoleEntry: boolean + onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise +} + const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed' function isRecord(value: unknown): value is Record { @@ -149,6 +173,340 @@ export function useWorkflowExecution() { setActiveBlocks, ]) + /** + * Builds timing fields for execution-level console entries. + */ + const buildExecutionTiming = useCallback((durationMs?: number) => { + const normalizedDuration = durationMs || 0 + return { + durationMs: normalizedDuration, + startedAt: new Date(Date.now() - normalizedDuration).toISOString(), + endedAt: new Date().toISOString(), + } + }, []) + + /** + * Adds an execution-level error entry to the console when appropriate. + */ + const addExecutionErrorConsoleEntry = useCallback( + (params: { + workflowId?: string + executionId?: string + error?: string + durationMs?: number + blockLogs: BlockLog[] + isPreExecutionError?: boolean + }) => { + if (!params.workflowId) return + + const hasBlockError = params.blockLogs.some((log) => log.error) + const isPreExecutionError = params.isPreExecutionError ?? false + if (!isPreExecutionError && hasBlockError) { + return + } + + const errorMessage = params.error || 'Execution failed' + const isTimeout = errorMessage.toLowerCase().includes('timed out') + const timing = buildExecutionTiming(params.durationMs) + + addConsole({ + input: {}, + output: {}, + success: false, + error: errorMessage, + durationMs: timing.durationMs, + startedAt: timing.startedAt, + executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER, + endedAt: timing.endedAt, + workflowId: params.workflowId, + blockId: isPreExecutionError + ? 'validation' + : isTimeout + ? 'timeout-error' + : 'execution-error', + executionId: params.executionId, + blockName: isPreExecutionError + ? 'Workflow Validation' + : isTimeout + ? 'Timeout Error' + : 'Execution Error', + blockType: isPreExecutionError ? 'validation' : 'error', + }) + }, + [addConsole, buildExecutionTiming] + ) + + /** + * Adds an execution-level cancellation entry to the console. + */ + const addExecutionCancelledConsoleEntry = useCallback( + (params: { workflowId?: string; executionId?: string; durationMs?: number }) => { + if (!params.workflowId) return + + const timing = buildExecutionTiming(params.durationMs) + addConsole({ + input: {}, + output: {}, + success: false, + error: 'Execution was cancelled', + durationMs: timing.durationMs, + startedAt: timing.startedAt, + executionOrder: Number.MAX_SAFE_INTEGER, + endedAt: timing.endedAt, + workflowId: params.workflowId, + blockId: 'cancelled', + executionId: params.executionId, + blockName: 'Execution Cancelled', + blockType: 'cancelled', + }) + }, + [addConsole, buildExecutionTiming] + ) + + /** + * Handles workflow-level execution errors for console output. + */ + const handleExecutionErrorConsole = useCallback( + (params: { + workflowId?: string + executionId?: string + error?: string + durationMs?: number + blockLogs: BlockLog[] + isPreExecutionError?: boolean + }) => { + if (params.workflowId) { + cancelRunningEntries(params.workflowId) + } + addExecutionErrorConsoleEntry(params) + }, + [addExecutionErrorConsoleEntry, cancelRunningEntries] + ) + + /** + * Handles workflow-level execution cancellations for console output. + */ + const handleExecutionCancelledConsole = useCallback( + (params: { workflowId?: string; executionId?: string; durationMs?: number }) => { + if (params.workflowId) { + cancelRunningEntries(params.workflowId) + } + addExecutionCancelledConsoleEntry(params) + }, + [addExecutionCancelledConsoleEntry, cancelRunningEntries] + ) + + const buildBlockEventHandlers = useCallback( + (config: BlockEventHandlerConfig) => { + const { + workflowId, + executionId, + workflowEdges, + activeBlocksSet, + accumulatedBlockLogs, + accumulatedBlockStates, + executedBlockIds, + consoleMode, + includeStartConsoleEntry, + onBlockCompleteCallback, + } = config + + const updateActiveBlocks = (blockId: string, isActive: boolean) => { + if (isActive) { + activeBlocksSet.add(blockId) + } else { + activeBlocksSet.delete(blockId) + } + setActiveBlocks(new Set(activeBlocksSet)) + } + + const markIncomingEdges = (blockId: string) => { + const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId) + incomingEdges.forEach((edge) => { + setEdgeRunStatus(edge.id, 'success') + }) + } + + const isContainerBlockType = (blockType?: string) => { + return blockType === 'loop' || blockType === 'parallel' + } + + const createBlockLogEntry = ( + data: BlockCompletedData | BlockErrorData, + options: { success: boolean; output?: unknown; error?: string } + ): BlockLog => ({ + blockId: data.blockId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + input: data.input || {}, + output: options.output ?? {}, + success: options.success, + error: options.error, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + }) + + const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => { + if (!workflowId) return + addConsole({ + input: data.input || {}, + output, + success: true, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + workflowId, + blockId: data.blockId, + executionId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }) + } + + const addConsoleErrorEntry = (data: BlockErrorData) => { + if (!workflowId) return + addConsole({ + input: data.input || {}, + output: {}, + success: false, + error: data.error, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + workflowId, + blockId: data.blockId, + executionId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }) + } + + const updateConsoleEntry = (data: BlockCompletedData) => { + updateConsole( + data.blockId, + { + input: data.input || {}, + replaceOutput: data.output, + success: true, + durationMs: data.durationMs, + startedAt: data.startedAt, + endedAt: data.endedAt, + isRunning: false, + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }, + executionId + ) + } + + const updateConsoleErrorEntry = (data: BlockErrorData) => { + updateConsole( + data.blockId, + { + input: data.input || {}, + replaceOutput: {}, + success: false, + error: data.error, + durationMs: data.durationMs, + startedAt: data.startedAt, + endedAt: data.endedAt, + isRunning: false, + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }, + executionId + ) + } + + const onBlockStarted = (data: BlockStartedData) => { + updateActiveBlocks(data.blockId, true) + markIncomingEdges(data.blockId) + + if (!includeStartConsoleEntry || !workflowId) return + + const startedAt = new Date().toISOString() + addConsole({ + input: {}, + output: undefined, + success: undefined, + durationMs: undefined, + startedAt, + executionOrder: data.executionOrder, + endedAt: undefined, + workflowId, + blockId: data.blockId, + executionId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + isRunning: true, + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }) + } + + const onBlockCompleted = (data: BlockCompletedData) => { + updateActiveBlocks(data.blockId, false) + setBlockRunStatus(data.blockId, 'success') + + executedBlockIds.add(data.blockId) + accumulatedBlockStates.set(data.blockId, { + output: data.output, + executed: true, + executionTime: data.durationMs, + }) + + if (isContainerBlockType(data.blockType)) { + return + } + + accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output })) + + if (consoleMode === 'update') { + updateConsoleEntry(data) + } else { + addConsoleEntry(data, data.output as NormalizedBlockOutput) + } + + if (onBlockCompleteCallback) { + onBlockCompleteCallback(data.blockId, data.output).catch((error) => { + logger.error('Error in onBlockComplete callback:', error) + }) + } + } + + const onBlockError = (data: BlockErrorData) => { + updateActiveBlocks(data.blockId, false) + setBlockRunStatus(data.blockId, 'error') + + accumulatedBlockLogs.push( + createBlockLogEntry(data, { success: false, output: {}, error: data.error }) + ) + + if (consoleMode === 'update') { + updateConsoleErrorEntry(data) + } else { + addConsoleErrorEntry(data) + } + } + + return { onBlockStarted, onBlockCompleted, onBlockError } + }, + [addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole] + ) + /** * Checks if debug session is complete based on execution result */ @@ -917,6 +1275,19 @@ export function useWorkflowExecution() { // Execute the workflow try { + const blockHandlers = buildBlockEventHandlers({ + workflowId: activeWorkflowId, + executionId, + workflowEdges, + activeBlocksSet, + accumulatedBlockLogs, + accumulatedBlockStates, + executedBlockIds, + consoleMode: 'update', + includeStartConsoleEntry: true, + onBlockCompleteCallback: onBlockComplete, + }) + await executionStream.execute({ workflowId: activeWorkflowId, input: finalWorkflowInput, @@ -939,145 +1310,9 @@ export function useWorkflowExecution() { logger.info('Server execution started:', data) }, - onBlockStarted: (data) => { - activeBlocksSet.add(data.blockId) - // Create a new Set to trigger React re-render - setActiveBlocks(new Set(activeBlocksSet)) - - // Track edges that led to this block as soon as execution starts - const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId) - incomingEdges.forEach((edge) => { - setEdgeRunStatus(edge.id, 'success') - }) - - // Add entry to terminal immediately with isRunning=true - // Use server-provided executionOrder to ensure correct sort order - const startedAt = new Date().toISOString() - addConsole({ - input: {}, - output: undefined, - success: undefined, - durationMs: undefined, - startedAt, - executionOrder: data.executionOrder, - endedAt: undefined, - workflowId: activeWorkflowId, - blockId: data.blockId, - executionId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - isRunning: true, - // Pass through iteration context for subflow grouping - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - }) - }, - - onBlockCompleted: (data) => { - activeBlocksSet.delete(data.blockId) - setActiveBlocks(new Set(activeBlocksSet)) - setBlockRunStatus(data.blockId, 'success') - - executedBlockIds.add(data.blockId) - accumulatedBlockStates.set(data.blockId, { - output: data.output, - executed: true, - executionTime: data.durationMs, - }) - - const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel' - if (isContainerBlock) return - - const startedAt = data.startedAt - const endedAt = data.endedAt - - accumulatedBlockLogs.push({ - blockId: data.blockId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - input: data.input || {}, - output: data.output, - success: true, - durationMs: data.durationMs, - startedAt, - executionOrder: data.executionOrder, - endedAt, - }) - - // Update existing console entry (created in onBlockStarted) with completion data - updateConsole( - data.blockId, - { - input: data.input || {}, - replaceOutput: data.output, - success: true, - durationMs: data.durationMs, - startedAt, - endedAt, - isRunning: false, - // Pass through iteration context for subflow grouping - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - }, - executionId - ) - - // Call onBlockComplete callback if provided - if (onBlockComplete) { - onBlockComplete(data.blockId, data.output).catch((error) => { - logger.error('Error in onBlockComplete callback:', error) - }) - } - }, - - onBlockError: (data) => { - activeBlocksSet.delete(data.blockId) - // Create a new Set to trigger React re-render - setActiveBlocks(new Set(activeBlocksSet)) - - // Track failed block execution in run path - setBlockRunStatus(data.blockId, 'error') - - const startedAt = data.startedAt - const endedAt = data.endedAt - - // Accumulate block error log for the execution result - accumulatedBlockLogs.push({ - blockId: data.blockId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - input: data.input || {}, - output: {}, - success: false, - error: data.error, - durationMs: data.durationMs, - startedAt, - executionOrder: data.executionOrder, - endedAt, - }) - - // Update existing console entry (created in onBlockStarted) with error data - updateConsole( - data.blockId, - { - input: data.input || {}, - replaceOutput: {}, - success: false, - error: data.error, - durationMs: data.durationMs, - startedAt, - endedAt, - isRunning: false, - // Pass through iteration context for subflow grouping - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - }, - executionId - ) - }, + onBlockStarted: blockHandlers.onBlockStarted, + onBlockCompleted: blockHandlers.onBlockCompleted, + onBlockError: blockHandlers.onBlockError, onStreamChunk: (data) => { const existing = streamedContent.get(data.blockId) || '' @@ -1182,66 +1417,22 @@ export function useWorkflowExecution() { logs: accumulatedBlockLogs, } - if (activeWorkflowId) { - cancelRunningEntries(activeWorkflowId) - } - const isPreExecutionError = accumulatedBlockLogs.length === 0 - // Check if any block already has this error - don't duplicate block errors - const blockAlreadyHasError = accumulatedBlockLogs.some((log) => log.error) - - // Only add workflow-level error entry for: - // 1. Pre-execution errors (validation) - no blocks ran - // 2. Workflow-level errors (timeout, etc.) - no block has the error - if (isPreExecutionError || !blockAlreadyHasError) { - // Determine if this is a timeout error based on the error message - const isTimeout = data.error?.toLowerCase().includes('timed out') - addConsole({ - input: {}, - output: {}, - success: false, - error: data.error, - durationMs: data.duration || 0, - startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(), - executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER, - endedAt: new Date().toISOString(), - workflowId: activeWorkflowId, - blockId: isPreExecutionError - ? 'validation' - : isTimeout - ? 'timeout-error' - : 'execution-error', - executionId, - blockName: isPreExecutionError - ? 'Workflow Validation' - : isTimeout - ? 'Timeout Error' - : 'Execution Error', - blockType: isPreExecutionError ? 'validation' : 'error', - }) - } + handleExecutionErrorConsole({ + workflowId: activeWorkflowId, + executionId, + error: data.error, + durationMs: data.duration, + blockLogs: accumulatedBlockLogs, + isPreExecutionError, + }) }, onExecutionCancelled: (data) => { - if (activeWorkflowId) { - cancelRunningEntries(activeWorkflowId) - } - - // Add console entry for cancellation - addConsole({ - input: {}, - output: {}, - success: false, - error: 'Execution was cancelled', - durationMs: data?.duration || 0, - startedAt: new Date(Date.now() - (data?.duration || 0)).toISOString(), - executionOrder: Number.MAX_SAFE_INTEGER, - endedAt: new Date().toISOString(), + handleExecutionCancelledConsole({ workflowId: activeWorkflowId, - blockId: 'cancelled', executionId, - blockName: 'Execution Cancelled', - blockType: 'cancelled', + durationMs: data?.duration, }) }, }, @@ -1638,115 +1829,27 @@ export function useWorkflowExecution() { const activeBlocksSet = new Set() try { + const blockHandlers = buildBlockEventHandlers({ + workflowId, + executionId, + workflowEdges, + activeBlocksSet, + accumulatedBlockLogs, + accumulatedBlockStates, + executedBlockIds, + consoleMode: 'add', + includeStartConsoleEntry: false, + }) + await executionStream.executeFromBlock({ workflowId, startBlockId: blockId, sourceSnapshot: effectiveSnapshot, input: workflowInput, callbacks: { - onBlockStarted: (data) => { - activeBlocksSet.add(data.blockId) - setActiveBlocks(new Set(activeBlocksSet)) - - const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId) - incomingEdges.forEach((edge) => { - setEdgeRunStatus(edge.id, 'success') - }) - }, - - onBlockCompleted: (data) => { - activeBlocksSet.delete(data.blockId) - setActiveBlocks(new Set(activeBlocksSet)) - - setBlockRunStatus(data.blockId, 'success') - - executedBlockIds.add(data.blockId) - accumulatedBlockStates.set(data.blockId, { - output: data.output, - executed: true, - executionTime: data.durationMs, - }) - - const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel' - if (isContainerBlock) return - - const startedAt = data.startedAt - const endedAt = data.endedAt - - accumulatedBlockLogs.push({ - blockId: data.blockId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - input: data.input || {}, - output: data.output, - success: true, - durationMs: data.durationMs, - startedAt, - executionOrder: data.executionOrder, - endedAt, - }) - - addConsole({ - input: data.input || {}, - output: data.output, - success: true, - durationMs: data.durationMs, - startedAt, - executionOrder: data.executionOrder, - endedAt, - workflowId, - blockId: data.blockId, - executionId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - }) - }, - - onBlockError: (data) => { - activeBlocksSet.delete(data.blockId) - setActiveBlocks(new Set(activeBlocksSet)) - - setBlockRunStatus(data.blockId, 'error') - - const startedAt = data.startedAt - const endedAt = data.endedAt - - accumulatedBlockLogs.push({ - blockId: data.blockId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - input: data.input || {}, - output: {}, - success: false, - error: data.error, - executionOrder: data.executionOrder, - durationMs: data.durationMs, - startedAt, - endedAt, - }) - - addConsole({ - input: data.input || {}, - output: {}, - success: false, - error: data.error, - durationMs: data.durationMs, - startedAt, - executionOrder: data.executionOrder, - endedAt, - workflowId, - blockId: data.blockId, - executionId, - blockName: data.blockName, - blockType: data.blockType, - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - }) - }, + onBlockStarted: blockHandlers.onBlockStarted, + onBlockCompleted: blockHandlers.onBlockCompleted, + onBlockError: blockHandlers.onBlockError, onExecutionCompleted: (data) => { if (data.success) { @@ -1791,51 +1894,20 @@ export function useWorkflowExecution() { }) } - cancelRunningEntries(workflowId) - - // Check if any block already has an error - don't duplicate block errors - const blockAlreadyHasError = accumulatedBlockLogs.some((log) => log.error) - - // Only add execution error entry if no block has the error - if (!blockAlreadyHasError) { - // Determine if this is a timeout error based on the error message - const isTimeout = data.error?.toLowerCase().includes('timed out') - addConsole({ - input: {}, - output: {}, - success: false, - error: data.error, - durationMs: data.duration || 0, - startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(), - executionOrder: Number.MAX_SAFE_INTEGER, - endedAt: new Date().toISOString(), - workflowId, - blockId: isTimeout ? 'timeout-error' : 'execution-error', - executionId, - blockName: isTimeout ? 'Timeout Error' : 'Execution Error', - blockType: 'error', - }) - } + handleExecutionErrorConsole({ + workflowId, + executionId, + error: data.error, + durationMs: data.duration, + blockLogs: accumulatedBlockLogs, + }) }, onExecutionCancelled: (data) => { - cancelRunningEntries(workflowId) - - // Add console entry for cancellation - addConsole({ - input: {}, - output: {}, - success: false, - error: 'Execution was cancelled', - durationMs: data?.duration || 0, - startedAt: new Date(Date.now() - (data?.duration || 0)).toISOString(), - executionOrder: Number.MAX_SAFE_INTEGER, - endedAt: new Date().toISOString(), + handleExecutionCancelledConsole({ workflowId, - blockId: 'cancelled', executionId, - blockName: 'Execution Cancelled', - blockType: 'cancelled', + durationMs: data?.duration, }) }, }, @@ -1858,8 +1930,9 @@ export function useWorkflowExecution() { setBlockRunStatus, setEdgeRunStatus, addNotification, - addConsole, - cancelRunningEntries, + buildBlockEventHandlers, + handleExecutionErrorConsole, + handleExecutionCancelledConsole, executionStream, ] ) diff --git a/apps/sim/lib/workflows/streaming/streaming.ts b/apps/sim/lib/workflows/streaming/streaming.ts index f290d3e99..0bb84b66b 100644 --- a/apps/sim/lib/workflows/streaming/streaming.ts +++ b/apps/sim/lib/workflows/streaming/streaming.ts @@ -171,6 +171,7 @@ export async function createStreamingResponse( options: StreamingResponseOptions ): Promise { const { requestId, workflow, input, executingUserId, streamConfig, executionId } = options + const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs) return new ReadableStream({ async start(controller) { @@ -270,8 +271,6 @@ export async function createStreamingResponse( } } - const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs) - try { const result = await executeWorkflow( workflow, @@ -348,5 +347,17 @@ export async function createStreamingResponse( timeoutController.cleanup() } }, + async cancel(reason) { + logger.info(`[${requestId}] Streaming response cancelled`, { reason }) + timeoutController.abort() + timeoutController.cleanup() + if (executionId) { + try { + await cleanupExecutionBase64Cache(executionId) + } catch (error) { + logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error }) + } + } + }, }) }