mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-11 23:14:58 -05:00
* fix(terminal): reconnect to running executions after page refresh * fix(terminal): use ExecutionEvent type instead of any in reconnection stream * fix(execution): type event buffer with ExecutionEvent instead of Record<string, unknown> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(execution): validate fromEventId query param in reconnection endpoint Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fix some bugs * fix(variables): fix tag dropdown and cursor alignment in variables block (#3199) * feat(confluence): added list space labels, delete label, delete page prop (#3201) * updated route * ack comments * fix(execution): reset execution state in reconnection cleanup to unblock re-entry Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(execution): restore running entries when reconnection is interrupted by navigation Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * done * remove cast in ioredis types * ack PR comments --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Siddharth Ganesan <siddharthganesan@gmail.com>
355 lines
11 KiB
TypeScript
355 lines
11 KiB
TypeScript
import { useCallback } from 'react'
|
|
import { createLogger } from '@sim/logger'
|
|
import type {
|
|
BlockCompletedData,
|
|
BlockErrorData,
|
|
BlockStartedData,
|
|
ExecutionCancelledData,
|
|
ExecutionCompletedData,
|
|
ExecutionErrorData,
|
|
ExecutionEvent,
|
|
ExecutionStartedData,
|
|
StreamChunkData,
|
|
StreamDoneData,
|
|
} from '@/lib/workflows/executor/execution-events'
|
|
import type { SerializableExecutionState } from '@/executor/execution/types'
|
|
|
|
const logger = createLogger('useExecutionStream')
|
|
|
|
/**
|
|
* Detects errors caused by the browser killing a fetch (page refresh, navigation, tab close).
|
|
* These should be treated as clean disconnects, not execution errors.
|
|
*/
|
|
function isClientDisconnectError(error: any): boolean {
|
|
if (error.name === 'AbortError') return true
|
|
const msg = (error.message ?? '').toLowerCase()
|
|
return (
|
|
msg.includes('network error') || msg.includes('failed to fetch') || msg.includes('load failed')
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Processes SSE events from a response body and invokes appropriate callbacks.
|
|
*/
|
|
async function processSSEStream(
|
|
reader: ReadableStreamDefaultReader<Uint8Array>,
|
|
callbacks: ExecutionStreamCallbacks,
|
|
logPrefix: string
|
|
): Promise<void> {
|
|
const decoder = new TextDecoder()
|
|
let buffer = ''
|
|
|
|
try {
|
|
while (true) {
|
|
const { done, value } = await reader.read()
|
|
|
|
if (done) break
|
|
|
|
buffer += decoder.decode(value, { stream: true })
|
|
const lines = buffer.split('\n\n')
|
|
buffer = lines.pop() || ''
|
|
|
|
for (const line of lines) {
|
|
if (!line.trim() || !line.startsWith('data: ')) continue
|
|
|
|
const data = line.substring(6).trim()
|
|
if (data === '[DONE]') {
|
|
logger.info(`${logPrefix} stream completed`)
|
|
continue
|
|
}
|
|
|
|
try {
|
|
const event = JSON.parse(data) as ExecutionEvent
|
|
|
|
switch (event.type) {
|
|
case 'execution:started':
|
|
callbacks.onExecutionStarted?.(event.data)
|
|
break
|
|
case 'execution:completed':
|
|
callbacks.onExecutionCompleted?.(event.data)
|
|
break
|
|
case 'execution:error':
|
|
callbacks.onExecutionError?.(event.data)
|
|
break
|
|
case 'execution:cancelled':
|
|
callbacks.onExecutionCancelled?.(event.data)
|
|
break
|
|
case 'block:started':
|
|
callbacks.onBlockStarted?.(event.data)
|
|
break
|
|
case 'block:completed':
|
|
callbacks.onBlockCompleted?.(event.data)
|
|
break
|
|
case 'block:error':
|
|
callbacks.onBlockError?.(event.data)
|
|
break
|
|
case 'stream:chunk':
|
|
callbacks.onStreamChunk?.(event.data)
|
|
break
|
|
case 'stream:done':
|
|
callbacks.onStreamDone?.(event.data)
|
|
break
|
|
default:
|
|
logger.warn('Unknown event type:', (event as any).type)
|
|
}
|
|
} catch (error) {
|
|
logger.error('Failed to parse SSE event:', error, { data })
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
reader.releaseLock()
|
|
}
|
|
}
|
|
|
|
export interface ExecutionStreamCallbacks {
|
|
onExecutionStarted?: (data: ExecutionStartedData) => void
|
|
onExecutionCompleted?: (data: ExecutionCompletedData) => void
|
|
onExecutionError?: (data: ExecutionErrorData) => void
|
|
onExecutionCancelled?: (data: ExecutionCancelledData) => void
|
|
onBlockStarted?: (data: BlockStartedData) => void
|
|
onBlockCompleted?: (data: BlockCompletedData) => void
|
|
onBlockError?: (data: BlockErrorData) => void
|
|
onStreamChunk?: (data: StreamChunkData) => void
|
|
onStreamDone?: (data: StreamDoneData) => void
|
|
}
|
|
|
|
export interface ExecuteStreamOptions {
|
|
workflowId: string
|
|
input?: any
|
|
workflowInput?: any
|
|
currentBlockStates?: Record<string, any>
|
|
envVarValues?: Record<string, string>
|
|
workflowVariables?: Record<string, any>
|
|
selectedOutputs?: string[]
|
|
startBlockId?: string
|
|
triggerType?: string
|
|
useDraftState?: boolean
|
|
isClientSession?: boolean
|
|
workflowStateOverride?: {
|
|
blocks: Record<string, any>
|
|
edges: any[]
|
|
loops?: Record<string, any>
|
|
parallels?: Record<string, any>
|
|
}
|
|
stopAfterBlockId?: string
|
|
onExecutionId?: (executionId: string) => void
|
|
callbacks?: ExecutionStreamCallbacks
|
|
}
|
|
|
|
export interface ExecuteFromBlockOptions {
|
|
workflowId: string
|
|
startBlockId: string
|
|
sourceSnapshot: SerializableExecutionState
|
|
input?: any
|
|
onExecutionId?: (executionId: string) => void
|
|
callbacks?: ExecutionStreamCallbacks
|
|
}
|
|
|
|
export interface ReconnectStreamOptions {
|
|
workflowId: string
|
|
executionId: string
|
|
fromEventId?: number
|
|
callbacks?: ExecutionStreamCallbacks
|
|
}
|
|
|
|
/**
|
|
* Module-level map shared across all hook instances.
|
|
* Ensures ANY instance can cancel streams started by ANY other instance,
|
|
* which is critical for SPA navigation where the original hook instance unmounts
|
|
* but the SSE stream must be cancellable from the new instance.
|
|
*/
|
|
const sharedAbortControllers = new Map<string, AbortController>()
|
|
|
|
/**
|
|
* Hook for executing workflows via server-side SSE streaming.
|
|
* Supports concurrent executions via per-workflow AbortController maps.
|
|
*/
|
|
export function useExecutionStream() {
|
|
const execute = useCallback(async (options: ExecuteStreamOptions) => {
|
|
const { workflowId, callbacks = {}, onExecutionId, ...payload } = options
|
|
|
|
const existing = sharedAbortControllers.get(workflowId)
|
|
if (existing) {
|
|
existing.abort()
|
|
}
|
|
|
|
const abortController = new AbortController()
|
|
sharedAbortControllers.set(workflowId, abortController)
|
|
|
|
try {
|
|
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
|
|
method: 'POST',
|
|
headers: {
|
|
'Content-Type': 'application/json',
|
|
},
|
|
body: JSON.stringify({ ...payload, stream: true }),
|
|
signal: abortController.signal,
|
|
})
|
|
|
|
if (!response.ok) {
|
|
const errorResponse = await response.json()
|
|
const error = new Error(errorResponse.error || 'Failed to start execution')
|
|
if (errorResponse && typeof errorResponse === 'object') {
|
|
Object.assign(error, { executionResult: errorResponse })
|
|
}
|
|
throw error
|
|
}
|
|
|
|
if (!response.body) {
|
|
throw new Error('No response body')
|
|
}
|
|
|
|
const serverExecutionId = response.headers.get('X-Execution-Id')
|
|
if (serverExecutionId) {
|
|
onExecutionId?.(serverExecutionId)
|
|
}
|
|
|
|
const reader = response.body.getReader()
|
|
await processSSEStream(reader, callbacks, 'Execution')
|
|
} catch (error: any) {
|
|
if (isClientDisconnectError(error)) {
|
|
logger.info('Execution stream disconnected (page unload or abort)')
|
|
return
|
|
}
|
|
logger.error('Execution stream error:', error)
|
|
callbacks.onExecutionError?.({
|
|
error: error.message || 'Unknown error',
|
|
duration: 0,
|
|
})
|
|
throw error
|
|
} finally {
|
|
if (sharedAbortControllers.get(workflowId) === abortController) {
|
|
sharedAbortControllers.delete(workflowId)
|
|
}
|
|
}
|
|
}, [])
|
|
|
|
const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => {
|
|
const {
|
|
workflowId,
|
|
startBlockId,
|
|
sourceSnapshot,
|
|
input,
|
|
onExecutionId,
|
|
callbacks = {},
|
|
} = options
|
|
|
|
const existing = sharedAbortControllers.get(workflowId)
|
|
if (existing) {
|
|
existing.abort()
|
|
}
|
|
|
|
const abortController = new AbortController()
|
|
sharedAbortControllers.set(workflowId, abortController)
|
|
|
|
try {
|
|
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
|
|
method: 'POST',
|
|
headers: {
|
|
'Content-Type': 'application/json',
|
|
},
|
|
body: JSON.stringify({
|
|
stream: true,
|
|
input,
|
|
runFromBlock: { startBlockId, sourceSnapshot },
|
|
}),
|
|
signal: abortController.signal,
|
|
})
|
|
|
|
if (!response.ok) {
|
|
let errorResponse: any
|
|
try {
|
|
errorResponse = await response.json()
|
|
} catch {
|
|
throw new Error(`Server error (${response.status}): ${response.statusText}`)
|
|
}
|
|
const error = new Error(errorResponse.error || 'Failed to start execution')
|
|
if (errorResponse && typeof errorResponse === 'object') {
|
|
Object.assign(error, { executionResult: errorResponse })
|
|
}
|
|
throw error
|
|
}
|
|
|
|
if (!response.body) {
|
|
throw new Error('No response body')
|
|
}
|
|
|
|
const serverExecutionId = response.headers.get('X-Execution-Id')
|
|
if (serverExecutionId) {
|
|
onExecutionId?.(serverExecutionId)
|
|
}
|
|
|
|
const reader = response.body.getReader()
|
|
await processSSEStream(reader, callbacks, 'Run-from-block')
|
|
} catch (error: any) {
|
|
if (isClientDisconnectError(error)) {
|
|
logger.info('Run-from-block stream disconnected (page unload or abort)')
|
|
return
|
|
}
|
|
logger.error('Run-from-block execution error:', error)
|
|
callbacks.onExecutionError?.({
|
|
error: error.message || 'Unknown error',
|
|
duration: 0,
|
|
})
|
|
throw error
|
|
} finally {
|
|
if (sharedAbortControllers.get(workflowId) === abortController) {
|
|
sharedAbortControllers.delete(workflowId)
|
|
}
|
|
}
|
|
}, [])
|
|
|
|
const reconnect = useCallback(async (options: ReconnectStreamOptions) => {
|
|
const { workflowId, executionId, fromEventId = 0, callbacks = {} } = options
|
|
|
|
const existing = sharedAbortControllers.get(workflowId)
|
|
if (existing) {
|
|
existing.abort()
|
|
}
|
|
|
|
const abortController = new AbortController()
|
|
sharedAbortControllers.set(workflowId, abortController)
|
|
try {
|
|
const response = await fetch(
|
|
`/api/workflows/${workflowId}/executions/${executionId}/stream?from=${fromEventId}`,
|
|
{ signal: abortController.signal }
|
|
)
|
|
if (!response.ok) throw new Error(`Reconnect failed (${response.status})`)
|
|
if (!response.body) throw new Error('No response body')
|
|
|
|
await processSSEStream(response.body.getReader(), callbacks, 'Reconnect')
|
|
} catch (error: any) {
|
|
if (isClientDisconnectError(error)) return
|
|
logger.error('Reconnection stream error:', error)
|
|
throw error
|
|
} finally {
|
|
if (sharedAbortControllers.get(workflowId) === abortController) {
|
|
sharedAbortControllers.delete(workflowId)
|
|
}
|
|
}
|
|
}, [])
|
|
|
|
const cancel = useCallback((workflowId?: string) => {
|
|
if (workflowId) {
|
|
const controller = sharedAbortControllers.get(workflowId)
|
|
if (controller) {
|
|
controller.abort()
|
|
sharedAbortControllers.delete(workflowId)
|
|
}
|
|
} else {
|
|
for (const [, controller] of sharedAbortControllers) {
|
|
controller.abort()
|
|
}
|
|
sharedAbortControllers.clear()
|
|
}
|
|
}, [])
|
|
|
|
return {
|
|
execute,
|
|
executeFromBlock,
|
|
reconnect,
|
|
cancel,
|
|
}
|
|
}
|