Mirror of https://github.com/simstudioai/sim.git
consolidate workflow execution and run-from-block hook code
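The change replaces two near-duplicate sets of inline streaming callbacks (the full-run path and the run-from-block path) with a single `buildBlockEventHandlers` factory driven by a `BlockEventHandlerConfig`. A minimal sketch of the pattern is below; `BlockEvent`, `HandlerConfig`, and `logConsole` are simplified stand-ins for illustration, not the real event types or console store used in the diff:

```ts
// Sketch only: payload and config shapes are simplified stand-ins, not the real
// types from '@/lib/workflows/executor/execution-events' or the console store.
interface BlockEvent {
  blockId: string
  output?: unknown
  error?: string
}

interface HandlerConfig {
  consoleMode: 'update' | 'add' // full run updates the running entry; run-from-block appends
  includeStartConsoleEntry: boolean // only the full run logs a "running" entry on start
  activeBlocksSet: Set<string>
  logConsole: (entry: { blockId: string; status: string }) => void
}

function buildBlockEventHandlers(config: HandlerConfig) {
  const onBlockStarted = (data: BlockEvent) => {
    config.activeBlocksSet.add(data.blockId)
    if (config.includeStartConsoleEntry) {
      config.logConsole({ blockId: data.blockId, status: 'running' })
    }
  }

  const onBlockCompleted = (data: BlockEvent) => {
    config.activeBlocksSet.delete(data.blockId)
    config.logConsole({
      blockId: data.blockId,
      status: config.consoleMode === 'update' ? 'updated in place' : 'appended',
    })
  }

  const onBlockError = (data: BlockEvent) => {
    config.activeBlocksSet.delete(data.blockId)
    config.logConsole({ blockId: data.blockId, status: `error: ${data.error}` })
  }

  return { onBlockStarted, onBlockCompleted, onBlockError }
}

// Both execution paths now build their callbacks from the same factory:
const fullRunHandlers = buildBlockEventHandlers({
  consoleMode: 'update',
  includeStartConsoleEntry: true,
  activeBlocksSet: new Set<string>(),
  logConsole: (entry) => console.log(entry),
})
const runFromBlockHandlers = buildBlockEventHandlers({
  consoleMode: 'add',
  includeStartConsoleEntry: false,
  activeBlocksSet: new Set<string>(),
  logConsole: (entry) => console.log(entry),
})

fullRunHandlers.onBlockStarted({ blockId: 'agent-1' })
fullRunHandlers.onBlockCompleted({ blockId: 'agent-1', output: { text: 'done' } })
runFromBlockHandlers.onBlockError({ blockId: 'api-2', error: 'HTTP 500' })
```

Per the diff, the two call sites differ only in configuration: the full run uses `consoleMode: 'update'` with a start entry per block, while run-from-block uses `consoleMode: 'add'` and skips start entries.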
@@ -4,6 +4,11 @@ import { useQueryClient } from '@tanstack/react-query'
import { v4 as uuidv4 } from 'uuid'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { processStreamingBlockLogs } from '@/lib/tokenization'
import type {
  BlockCompletedData,
  BlockErrorData,
  BlockStartedData,
} from '@/lib/workflows/executor/execution-events'
import {
  extractTriggerMockPayload,
  selectBestTrigger,
@@ -17,7 +22,13 @@ import {
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import { getBlock } from '@/blocks'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
import type {
  BlockLog,
  BlockState,
  ExecutionResult,
  NormalizedBlockOutput,
  StreamingExecution,
} from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
@@ -41,6 +52,19 @@ interface DebugValidationResult {
  error?: string
}

interface BlockEventHandlerConfig {
  workflowId?: string
  executionId?: string
  workflowEdges: Array<{ id: string; target: string }>
  activeBlocksSet: Set<string>
  accumulatedBlockLogs: BlockLog[]
  accumulatedBlockStates: Map<string, BlockState>
  executedBlockIds: Set<string>
  consoleMode: 'update' | 'add'
  includeStartConsoleEntry: boolean
  onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise<void>
}

const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed'

function isRecord(value: unknown): value is Record<string, unknown> {
@@ -149,6 +173,340 @@ export function useWorkflowExecution() {
|
||||
setActiveBlocks,
|
||||
])
|
||||
|
||||
/**
|
||||
* Builds timing fields for execution-level console entries.
|
||||
*/
|
||||
const buildExecutionTiming = useCallback((durationMs?: number) => {
|
||||
const normalizedDuration = durationMs || 0
|
||||
return {
|
||||
durationMs: normalizedDuration,
|
||||
startedAt: new Date(Date.now() - normalizedDuration).toISOString(),
|
||||
endedAt: new Date().toISOString(),
|
||||
}
|
||||
}, [])
|
||||
|
||||
/**
|
||||
* Adds an execution-level error entry to the console when appropriate.
|
||||
*/
|
||||
const addExecutionErrorConsoleEntry = useCallback(
|
||||
(params: {
|
||||
workflowId?: string
|
||||
executionId?: string
|
||||
error?: string
|
||||
durationMs?: number
|
||||
blockLogs: BlockLog[]
|
||||
isPreExecutionError?: boolean
|
||||
}) => {
|
||||
if (!params.workflowId) return
|
||||
|
||||
const hasBlockError = params.blockLogs.some((log) => log.error)
|
||||
const isPreExecutionError = params.isPreExecutionError ?? false
|
||||
if (!isPreExecutionError && hasBlockError) {
|
||||
return
|
||||
}
|
||||
|
||||
const errorMessage = params.error || 'Execution failed'
|
||||
const isTimeout = errorMessage.toLowerCase().includes('timed out')
|
||||
const timing = buildExecutionTiming(params.durationMs)
|
||||
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: errorMessage,
|
||||
durationMs: timing.durationMs,
|
||||
startedAt: timing.startedAt,
|
||||
executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER,
|
||||
endedAt: timing.endedAt,
|
||||
workflowId: params.workflowId,
|
||||
blockId: isPreExecutionError
|
||||
? 'validation'
|
||||
: isTimeout
|
||||
? 'timeout-error'
|
||||
: 'execution-error',
|
||||
executionId: params.executionId,
|
||||
blockName: isPreExecutionError
|
||||
? 'Workflow Validation'
|
||||
: isTimeout
|
||||
? 'Timeout Error'
|
||||
: 'Execution Error',
|
||||
blockType: isPreExecutionError ? 'validation' : 'error',
|
||||
})
|
||||
},
|
||||
[addConsole, buildExecutionTiming]
|
||||
)
|
||||
|
||||
/**
|
||||
* Adds an execution-level cancellation entry to the console.
|
||||
*/
|
||||
const addExecutionCancelledConsoleEntry = useCallback(
|
||||
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
|
||||
if (!params.workflowId) return
|
||||
|
||||
const timing = buildExecutionTiming(params.durationMs)
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: 'Execution was cancelled',
|
||||
durationMs: timing.durationMs,
|
||||
startedAt: timing.startedAt,
|
||||
executionOrder: Number.MAX_SAFE_INTEGER,
|
||||
endedAt: timing.endedAt,
|
||||
workflowId: params.workflowId,
|
||||
blockId: 'cancelled',
|
||||
executionId: params.executionId,
|
||||
blockName: 'Execution Cancelled',
|
||||
blockType: 'cancelled',
|
||||
})
|
||||
},
|
||||
[addConsole, buildExecutionTiming]
|
||||
)
|
||||
|
||||
/**
|
||||
* Handles workflow-level execution errors for console output.
|
||||
*/
|
||||
const handleExecutionErrorConsole = useCallback(
|
||||
(params: {
|
||||
workflowId?: string
|
||||
executionId?: string
|
||||
error?: string
|
||||
durationMs?: number
|
||||
blockLogs: BlockLog[]
|
||||
isPreExecutionError?: boolean
|
||||
}) => {
|
||||
if (params.workflowId) {
|
||||
cancelRunningEntries(params.workflowId)
|
||||
}
|
||||
addExecutionErrorConsoleEntry(params)
|
||||
},
|
||||
[addExecutionErrorConsoleEntry, cancelRunningEntries]
|
||||
)
|
||||
|
||||
/**
|
||||
* Handles workflow-level execution cancellations for console output.
|
||||
*/
|
||||
const handleExecutionCancelledConsole = useCallback(
|
||||
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
|
||||
if (params.workflowId) {
|
||||
cancelRunningEntries(params.workflowId)
|
||||
}
|
||||
addExecutionCancelledConsoleEntry(params)
|
||||
},
|
||||
[addExecutionCancelledConsoleEntry, cancelRunningEntries]
|
||||
)
|
||||
|
||||
const buildBlockEventHandlers = useCallback(
|
||||
(config: BlockEventHandlerConfig) => {
|
||||
const {
|
||||
workflowId,
|
||||
executionId,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
consoleMode,
|
||||
includeStartConsoleEntry,
|
||||
onBlockCompleteCallback,
|
||||
} = config
|
||||
|
||||
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
|
||||
if (isActive) {
|
||||
activeBlocksSet.add(blockId)
|
||||
} else {
|
||||
activeBlocksSet.delete(blockId)
|
||||
}
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
}
|
||||
|
||||
const markIncomingEdges = (blockId: string) => {
|
||||
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
|
||||
incomingEdges.forEach((edge) => {
|
||||
setEdgeRunStatus(edge.id, 'success')
|
||||
})
|
||||
}
|
||||
|
||||
const isContainerBlockType = (blockType?: string) => {
|
||||
return blockType === 'loop' || blockType === 'parallel'
|
||||
}
|
||||
|
||||
const createBlockLogEntry = (
|
||||
data: BlockCompletedData | BlockErrorData,
|
||||
options: { success: boolean; output?: unknown; error?: string }
|
||||
): BlockLog => ({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: options.output ?? {},
|
||||
success: options.success,
|
||||
error: options.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
})
|
||||
|
||||
const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => {
|
||||
if (!workflowId) return
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
}
|
||||
|
||||
const addConsoleErrorEntry = (data: BlockErrorData) => {
|
||||
if (!workflowId) return
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
}
|
||||
|
||||
const updateConsoleEntry = (data: BlockCompletedData) => {
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
input: data.input || {},
|
||||
replaceOutput: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
endedAt: data.endedAt,
|
||||
isRunning: false,
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
},
|
||||
executionId
|
||||
)
|
||||
}
|
||||
|
||||
const updateConsoleErrorEntry = (data: BlockErrorData) => {
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
input: data.input || {},
|
||||
replaceOutput: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
endedAt: data.endedAt,
|
||||
isRunning: false,
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
},
|
||||
executionId
|
||||
)
|
||||
}
|
||||
|
||||
const onBlockStarted = (data: BlockStartedData) => {
|
||||
updateActiveBlocks(data.blockId, true)
|
||||
markIncomingEdges(data.blockId)
|
||||
|
||||
if (!includeStartConsoleEntry || !workflowId) return
|
||||
|
||||
const startedAt = new Date().toISOString()
|
||||
addConsole({
|
||||
input: {},
|
||||
output: undefined,
|
||||
success: undefined,
|
||||
durationMs: undefined,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: undefined,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
isRunning: true,
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
}
|
||||
|
||||
const onBlockCompleted = (data: BlockCompletedData) => {
|
||||
updateActiveBlocks(data.blockId, false)
|
||||
setBlockRunStatus(data.blockId, 'success')
|
||||
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: data.output,
|
||||
executed: true,
|
||||
executionTime: data.durationMs,
|
||||
})
|
||||
|
||||
if (isContainerBlockType(data.blockType)) {
|
||||
return
|
||||
}
|
||||
|
||||
accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))
|
||||
|
||||
if (consoleMode === 'update') {
|
||||
updateConsoleEntry(data)
|
||||
} else {
|
||||
addConsoleEntry(data, data.output as NormalizedBlockOutput)
|
||||
}
|
||||
|
||||
if (onBlockCompleteCallback) {
|
||||
onBlockCompleteCallback(data.blockId, data.output).catch((error) => {
|
||||
logger.error('Error in onBlockComplete callback:', error)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const onBlockError = (data: BlockErrorData) => {
|
||||
updateActiveBlocks(data.blockId, false)
|
||||
setBlockRunStatus(data.blockId, 'error')
|
||||
|
||||
accumulatedBlockLogs.push(
|
||||
createBlockLogEntry(data, { success: false, output: {}, error: data.error })
|
||||
)
|
||||
|
||||
if (consoleMode === 'update') {
|
||||
updateConsoleErrorEntry(data)
|
||||
} else {
|
||||
addConsoleErrorEntry(data)
|
||||
}
|
||||
}
|
||||
|
||||
return { onBlockStarted, onBlockCompleted, onBlockError }
|
||||
},
|
||||
[addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole]
|
||||
)
|
||||
|
||||
/**
|
||||
* Checks if debug session is complete based on execution result
|
||||
*/
|
||||
@@ -917,6 +1275,19 @@ export function useWorkflowExecution() {
|
||||
|
||||
// Execute the workflow
|
||||
try {
|
||||
const blockHandlers = buildBlockEventHandlers({
|
||||
workflowId: activeWorkflowId,
|
||||
executionId,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
consoleMode: 'update',
|
||||
includeStartConsoleEntry: true,
|
||||
onBlockCompleteCallback: onBlockComplete,
|
||||
})
|
||||
|
||||
await executionStream.execute({
|
||||
workflowId: activeWorkflowId,
|
||||
input: finalWorkflowInput,
|
||||
@@ -939,145 +1310,9 @@ export function useWorkflowExecution() {
|
||||
logger.info('Server execution started:', data)
|
||||
},
|
||||
|
||||
onBlockStarted: (data) => {
|
||||
activeBlocksSet.add(data.blockId)
|
||||
// Create a new Set to trigger React re-render
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
|
||||
// Track edges that led to this block as soon as execution starts
|
||||
const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
|
||||
incomingEdges.forEach((edge) => {
|
||||
setEdgeRunStatus(edge.id, 'success')
|
||||
})
|
||||
|
||||
// Add entry to terminal immediately with isRunning=true
|
||||
// Use server-provided executionOrder to ensure correct sort order
|
||||
const startedAt = new Date().toISOString()
|
||||
addConsole({
|
||||
input: {},
|
||||
output: undefined,
|
||||
success: undefined,
|
||||
durationMs: undefined,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: undefined,
|
||||
workflowId: activeWorkflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
isRunning: true,
|
||||
// Pass through iteration context for subflow grouping
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
},
|
||||
|
||||
onBlockCompleted: (data) => {
|
||||
activeBlocksSet.delete(data.blockId)
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
setBlockRunStatus(data.blockId, 'success')
|
||||
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: data.output,
|
||||
executed: true,
|
||||
executionTime: data.durationMs,
|
||||
})
|
||||
|
||||
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
|
||||
if (isContainerBlock) return
|
||||
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt,
|
||||
})
|
||||
|
||||
// Update existing console entry (created in onBlockStarted) with completion data
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
input: data.input || {},
|
||||
replaceOutput: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
endedAt,
|
||||
isRunning: false,
|
||||
// Pass through iteration context for subflow grouping
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
},
|
||||
executionId
|
||||
)
|
||||
|
||||
// Call onBlockComplete callback if provided
|
||||
if (onBlockComplete) {
|
||||
onBlockComplete(data.blockId, data.output).catch((error) => {
|
||||
logger.error('Error in onBlockComplete callback:', error)
|
||||
})
|
||||
}
|
||||
},
|
||||
|
||||
onBlockError: (data) => {
|
||||
activeBlocksSet.delete(data.blockId)
|
||||
// Create a new Set to trigger React re-render
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
|
||||
// Track failed block execution in run path
|
||||
setBlockRunStatus(data.blockId, 'error')
|
||||
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
// Accumulate block error log for the execution result
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt,
|
||||
})
|
||||
|
||||
// Update existing console entry (created in onBlockStarted) with error data
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
input: data.input || {},
|
||||
replaceOutput: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
endedAt,
|
||||
isRunning: false,
|
||||
// Pass through iteration context for subflow grouping
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
},
|
||||
executionId
|
||||
)
|
||||
},
|
||||
onBlockStarted: blockHandlers.onBlockStarted,
|
||||
onBlockCompleted: blockHandlers.onBlockCompleted,
|
||||
onBlockError: blockHandlers.onBlockError,
|
||||
|
||||
onStreamChunk: (data) => {
|
||||
const existing = streamedContent.get(data.blockId) || ''
|
||||
@@ -1182,66 +1417,22 @@ export function useWorkflowExecution() {
|
||||
logs: accumulatedBlockLogs,
|
||||
}
|
||||
|
||||
if (activeWorkflowId) {
|
||||
cancelRunningEntries(activeWorkflowId)
|
||||
}
|
||||
|
||||
const isPreExecutionError = accumulatedBlockLogs.length === 0
|
||||
// Check if any block already has this error - don't duplicate block errors
|
||||
const blockAlreadyHasError = accumulatedBlockLogs.some((log) => log.error)
|
||||
|
||||
// Only add workflow-level error entry for:
|
||||
// 1. Pre-execution errors (validation) - no blocks ran
|
||||
// 2. Workflow-level errors (timeout, etc.) - no block has the error
|
||||
if (isPreExecutionError || !blockAlreadyHasError) {
|
||||
// Determine if this is a timeout error based on the error message
|
||||
const isTimeout = data.error?.toLowerCase().includes('timed out')
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.duration || 0,
|
||||
startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
|
||||
executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER,
|
||||
endedAt: new Date().toISOString(),
|
||||
workflowId: activeWorkflowId,
|
||||
blockId: isPreExecutionError
|
||||
? 'validation'
|
||||
: isTimeout
|
||||
? 'timeout-error'
|
||||
: 'execution-error',
|
||||
executionId,
|
||||
blockName: isPreExecutionError
|
||||
? 'Workflow Validation'
|
||||
: isTimeout
|
||||
? 'Timeout Error'
|
||||
: 'Execution Error',
|
||||
blockType: isPreExecutionError ? 'validation' : 'error',
|
||||
})
|
||||
}
|
||||
handleExecutionErrorConsole({
|
||||
workflowId: activeWorkflowId,
|
||||
executionId,
|
||||
error: data.error,
|
||||
durationMs: data.duration,
|
||||
blockLogs: accumulatedBlockLogs,
|
||||
isPreExecutionError,
|
||||
})
|
||||
},
|
||||
|
||||
onExecutionCancelled: (data) => {
|
||||
if (activeWorkflowId) {
|
||||
cancelRunningEntries(activeWorkflowId)
|
||||
}
|
||||
|
||||
// Add console entry for cancellation
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: 'Execution was cancelled',
|
||||
durationMs: data?.duration || 0,
|
||||
startedAt: new Date(Date.now() - (data?.duration || 0)).toISOString(),
|
||||
executionOrder: Number.MAX_SAFE_INTEGER,
|
||||
endedAt: new Date().toISOString(),
|
||||
handleExecutionCancelledConsole({
|
||||
workflowId: activeWorkflowId,
|
||||
blockId: 'cancelled',
|
||||
executionId,
|
||||
blockName: 'Execution Cancelled',
|
||||
blockType: 'cancelled',
|
||||
durationMs: data?.duration,
|
||||
})
|
||||
},
|
||||
},
|
||||
@@ -1638,115 +1829,27 @@ export function useWorkflowExecution() {
|
||||
const activeBlocksSet = new Set<string>()
|
||||
|
||||
try {
|
||||
const blockHandlers = buildBlockEventHandlers({
|
||||
workflowId,
|
||||
executionId,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
consoleMode: 'add',
|
||||
includeStartConsoleEntry: false,
|
||||
})
|
||||
|
||||
await executionStream.executeFromBlock({
|
||||
workflowId,
|
||||
startBlockId: blockId,
|
||||
sourceSnapshot: effectiveSnapshot,
|
||||
input: workflowInput,
|
||||
callbacks: {
|
||||
onBlockStarted: (data) => {
|
||||
activeBlocksSet.add(data.blockId)
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
|
||||
const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
|
||||
incomingEdges.forEach((edge) => {
|
||||
setEdgeRunStatus(edge.id, 'success')
|
||||
})
|
||||
},
|
||||
|
||||
onBlockCompleted: (data) => {
|
||||
activeBlocksSet.delete(data.blockId)
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(data.blockId, 'success')
|
||||
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: data.output,
|
||||
executed: true,
|
||||
executionTime: data.durationMs,
|
||||
})
|
||||
|
||||
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
|
||||
if (isContainerBlock) return
|
||||
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt,
|
||||
})
|
||||
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
},
|
||||
|
||||
onBlockError: (data) => {
|
||||
activeBlocksSet.delete(data.blockId)
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(data.blockId, 'error')
|
||||
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
executionOrder: data.executionOrder,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
endedAt,
|
||||
})
|
||||
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName,
|
||||
blockType: data.blockType,
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
},
|
||||
onBlockStarted: blockHandlers.onBlockStarted,
|
||||
onBlockCompleted: blockHandlers.onBlockCompleted,
|
||||
onBlockError: blockHandlers.onBlockError,
|
||||
|
||||
onExecutionCompleted: (data) => {
|
||||
if (data.success) {
|
||||
@@ -1791,51 +1894,20 @@ export function useWorkflowExecution() {
|
||||
})
|
||||
}
|
||||
|
||||
cancelRunningEntries(workflowId)
|
||||
|
||||
// Check if any block already has an error - don't duplicate block errors
|
||||
const blockAlreadyHasError = accumulatedBlockLogs.some((log) => log.error)
|
||||
|
||||
// Only add execution error entry if no block has the error
|
||||
if (!blockAlreadyHasError) {
|
||||
// Determine if this is a timeout error based on the error message
|
||||
const isTimeout = data.error?.toLowerCase().includes('timed out')
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.duration || 0,
|
||||
startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
|
||||
executionOrder: Number.MAX_SAFE_INTEGER,
|
||||
endedAt: new Date().toISOString(),
|
||||
workflowId,
|
||||
blockId: isTimeout ? 'timeout-error' : 'execution-error',
|
||||
executionId,
|
||||
blockName: isTimeout ? 'Timeout Error' : 'Execution Error',
|
||||
blockType: 'error',
|
||||
})
|
||||
}
|
||||
handleExecutionErrorConsole({
|
||||
workflowId,
|
||||
executionId,
|
||||
error: data.error,
|
||||
durationMs: data.duration,
|
||||
blockLogs: accumulatedBlockLogs,
|
||||
})
|
||||
},
|
||||
|
||||
onExecutionCancelled: (data) => {
|
||||
cancelRunningEntries(workflowId)
|
||||
|
||||
// Add console entry for cancellation
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: 'Execution was cancelled',
|
||||
durationMs: data?.duration || 0,
|
||||
startedAt: new Date(Date.now() - (data?.duration || 0)).toISOString(),
|
||||
executionOrder: Number.MAX_SAFE_INTEGER,
|
||||
endedAt: new Date().toISOString(),
|
||||
handleExecutionCancelledConsole({
|
||||
workflowId,
|
||||
blockId: 'cancelled',
|
||||
executionId,
|
||||
blockName: 'Execution Cancelled',
|
||||
blockType: 'cancelled',
|
||||
durationMs: data?.duration,
|
||||
})
|
||||
},
|
||||
},
|
||||
@@ -1858,8 +1930,9 @@ export function useWorkflowExecution() {
      setBlockRunStatus,
      setEdgeRunStatus,
      addNotification,
      addConsole,
      cancelRunningEntries,
      buildBlockEventHandlers,
      handleExecutionErrorConsole,
      handleExecutionCancelledConsole,
      executionStream,
    ]
  )

@@ -171,6 +171,7 @@ export async function createStreamingResponse(
  options: StreamingResponseOptions
): Promise<ReadableStream> {
  const { requestId, workflow, input, executingUserId, streamConfig, executionId } = options
  const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs)

  return new ReadableStream({
    async start(controller) {
@@ -270,8 +271,6 @@ export async function createStreamingResponse(
        }
      }

      const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs)

      try {
        const result = await executeWorkflow(
          workflow,
@@ -348,5 +347,17 @@ export async function createStreamingResponse(
        timeoutController.cleanup()
      }
    },
    async cancel(reason) {
      logger.info(`[${requestId}] Streaming response cancelled`, { reason })
      timeoutController.abort()
      timeoutController.cleanup()
      if (executionId) {
        try {
          await cleanupExecutionBase64Cache(executionId)
        } catch (error) {
          logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error })
        }
      }
    },
  })
}
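For context on the new `cancel` hook in `createStreamingResponse`: per the Web Streams contract, when the consumer cancels its reader the stream's `cancel(reason)` callback runs, which is where the timeout abort and base64-cache cleanup above are attached. A self-contained sketch of that contract, with placeholder cleanup logic standing in for the real helpers:

```ts
// Minimal sketch of the ReadableStream cancel contract the new hook relies on.
// `onCancel` stands in for the real cleanup (abort the timeout controller, clear caches).
function makeExecutionStream(onCancel: (reason: unknown) => Promise<void>) {
  return new ReadableStream<string>({
    start(controller) {
      controller.enqueue('block.started')
      // further execution events would be enqueued here as the workflow runs
    },
    async cancel(reason) {
      await onCancel(reason)
    },
  })
}

async function demo() {
  const stream = makeExecutionStream(async (reason) => {
    console.log('cleanup ran because:', reason)
  })

  const reader = stream.getReader()
  await reader.read() // consume one event
  await reader.cancel('client disconnected') // invokes the stream's cancel(reason)
}

demo()
```

Whether a real client disconnect reaches this callback depends on the server runtime propagating the abort to the response stream; either way, the `cancel` hook is the natural place for the cleanup the diff adds.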