Files
sim/apps/sim/hooks/use-execution-stream.ts
Waleed 81dfeb0bb0 fix(terminal): reconnect to running executions after page refresh (#3200)
* fix(terminal): reconnect to running executions after page refresh

* fix(terminal): use ExecutionEvent type instead of any in reconnection stream

* fix(execution): type event buffer with ExecutionEvent instead of Record<string, unknown>

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(execution): validate fromEventId query param in reconnection endpoint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Fix some bugs

* fix(variables): fix tag dropdown and cursor alignment in variables block (#3199)

* feat(confluence): added list space labels, delete label, delete page prop (#3201)

* updated route

* ack comments

* fix(execution): reset execution state in reconnection cleanup to unblock re-entry

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(execution): restore running entries when reconnection is interrupted by navigation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* done

* remove cast in ioredis types

* ack PR comments

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Siddharth Ganesan <siddharthganesan@gmail.com>
2026-02-11 19:31:29 -08:00

355 lines
11 KiB
TypeScript

import { useCallback } from 'react'
import { createLogger } from '@sim/logger'
import type {
BlockCompletedData,
BlockErrorData,
BlockStartedData,
ExecutionCancelledData,
ExecutionCompletedData,
ExecutionErrorData,
ExecutionEvent,
ExecutionStartedData,
StreamChunkData,
StreamDoneData,
} from '@/lib/workflows/executor/execution-events'
import type { SerializableExecutionState } from '@/executor/execution/types'
const logger = createLogger('useExecutionStream')
/**
* Detects errors caused by the browser killing a fetch (page refresh, navigation, tab close).
* These should be treated as clean disconnects, not execution errors.
*/
function isClientDisconnectError(error: any): boolean {
if (error.name === 'AbortError') return true
const msg = (error.message ?? '').toLowerCase()
return (
msg.includes('network error') || msg.includes('failed to fetch') || msg.includes('load failed')
)
}
/**
* Processes SSE events from a response body and invokes appropriate callbacks.
*/
async function processSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
callbacks: ExecutionStreamCallbacks,
logPrefix: string
): Promise<void> {
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) continue
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info(`${logPrefix} stream completed`)
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
switch (event.type) {
case 'execution:started':
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
}
export interface ExecutionStreamCallbacks {
onExecutionStarted?: (data: ExecutionStartedData) => void
onExecutionCompleted?: (data: ExecutionCompletedData) => void
onExecutionError?: (data: ExecutionErrorData) => void
onExecutionCancelled?: (data: ExecutionCancelledData) => void
onBlockStarted?: (data: BlockStartedData) => void
onBlockCompleted?: (data: BlockCompletedData) => void
onBlockError?: (data: BlockErrorData) => void
onStreamChunk?: (data: StreamChunkData) => void
onStreamDone?: (data: StreamDoneData) => void
}
export interface ExecuteStreamOptions {
workflowId: string
input?: any
workflowInput?: any
currentBlockStates?: Record<string, any>
envVarValues?: Record<string, string>
workflowVariables?: Record<string, any>
selectedOutputs?: string[]
startBlockId?: string
triggerType?: string
useDraftState?: boolean
isClientSession?: boolean
workflowStateOverride?: {
blocks: Record<string, any>
edges: any[]
loops?: Record<string, any>
parallels?: Record<string, any>
}
stopAfterBlockId?: string
onExecutionId?: (executionId: string) => void
callbacks?: ExecutionStreamCallbacks
}
export interface ExecuteFromBlockOptions {
workflowId: string
startBlockId: string
sourceSnapshot: SerializableExecutionState
input?: any
onExecutionId?: (executionId: string) => void
callbacks?: ExecutionStreamCallbacks
}
export interface ReconnectStreamOptions {
workflowId: string
executionId: string
fromEventId?: number
callbacks?: ExecutionStreamCallbacks
}
/**
* Module-level map shared across all hook instances.
* Ensures ANY instance can cancel streams started by ANY other instance,
* which is critical for SPA navigation where the original hook instance unmounts
* but the SSE stream must be cancellable from the new instance.
*/
const sharedAbortControllers = new Map<string, AbortController>()
/**
* Hook for executing workflows via server-side SSE streaming.
* Supports concurrent executions via per-workflow AbortController maps.
*/
export function useExecutionStream() {
const execute = useCallback(async (options: ExecuteStreamOptions) => {
const { workflowId, callbacks = {}, onExecutionId, ...payload } = options
const existing = sharedAbortControllers.get(workflowId)
if (existing) {
existing.abort()
}
const abortController = new AbortController()
sharedAbortControllers.set(workflowId, abortController)
try {
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ ...payload, stream: true }),
signal: abortController.signal,
})
if (!response.ok) {
const errorResponse = await response.json()
const error = new Error(errorResponse.error || 'Failed to start execution')
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
}
if (!response.body) {
throw new Error('No response body')
}
const serverExecutionId = response.headers.get('X-Execution-Id')
if (serverExecutionId) {
onExecutionId?.(serverExecutionId)
}
const reader = response.body.getReader()
await processSSEStream(reader, callbacks, 'Execution')
} catch (error: any) {
if (isClientDisconnectError(error)) {
logger.info('Execution stream disconnected (page unload or abort)')
return
}
logger.error('Execution stream error:', error)
callbacks.onExecutionError?.({
error: error.message || 'Unknown error',
duration: 0,
})
throw error
} finally {
if (sharedAbortControllers.get(workflowId) === abortController) {
sharedAbortControllers.delete(workflowId)
}
}
}, [])
const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => {
const {
workflowId,
startBlockId,
sourceSnapshot,
input,
onExecutionId,
callbacks = {},
} = options
const existing = sharedAbortControllers.get(workflowId)
if (existing) {
existing.abort()
}
const abortController = new AbortController()
sharedAbortControllers.set(workflowId, abortController)
try {
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
stream: true,
input,
runFromBlock: { startBlockId, sourceSnapshot },
}),
signal: abortController.signal,
})
if (!response.ok) {
let errorResponse: any
try {
errorResponse = await response.json()
} catch {
throw new Error(`Server error (${response.status}): ${response.statusText}`)
}
const error = new Error(errorResponse.error || 'Failed to start execution')
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
}
if (!response.body) {
throw new Error('No response body')
}
const serverExecutionId = response.headers.get('X-Execution-Id')
if (serverExecutionId) {
onExecutionId?.(serverExecutionId)
}
const reader = response.body.getReader()
await processSSEStream(reader, callbacks, 'Run-from-block')
} catch (error: any) {
if (isClientDisconnectError(error)) {
logger.info('Run-from-block stream disconnected (page unload or abort)')
return
}
logger.error('Run-from-block execution error:', error)
callbacks.onExecutionError?.({
error: error.message || 'Unknown error',
duration: 0,
})
throw error
} finally {
if (sharedAbortControllers.get(workflowId) === abortController) {
sharedAbortControllers.delete(workflowId)
}
}
}, [])
const reconnect = useCallback(async (options: ReconnectStreamOptions) => {
const { workflowId, executionId, fromEventId = 0, callbacks = {} } = options
const existing = sharedAbortControllers.get(workflowId)
if (existing) {
existing.abort()
}
const abortController = new AbortController()
sharedAbortControllers.set(workflowId, abortController)
try {
const response = await fetch(
`/api/workflows/${workflowId}/executions/${executionId}/stream?from=${fromEventId}`,
{ signal: abortController.signal }
)
if (!response.ok) throw new Error(`Reconnect failed (${response.status})`)
if (!response.body) throw new Error('No response body')
await processSSEStream(response.body.getReader(), callbacks, 'Reconnect')
} catch (error: any) {
if (isClientDisconnectError(error)) return
logger.error('Reconnection stream error:', error)
throw error
} finally {
if (sharedAbortControllers.get(workflowId) === abortController) {
sharedAbortControllers.delete(workflowId)
}
}
}, [])
const cancel = useCallback((workflowId?: string) => {
if (workflowId) {
const controller = sharedAbortControllers.get(workflowId)
if (controller) {
controller.abort()
sharedAbortControllers.delete(workflowId)
}
} else {
for (const [, controller] of sharedAbortControllers) {
controller.abort()
}
sharedAbortControllers.clear()
}
}, [])
return {
execute,
executeFromBlock,
reconnect,
cancel,
}
}