mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
fix(workflow) Fix embedded workflow logs (#3587)
* Extract workflow run logic into shared util * Fix lint * Fix isRunning being stuck in true * Fix lint --------- Co-authored-by: Theodore Li <theo@sim.ai>
This commit is contained in:
@@ -4,11 +4,6 @@ import { useQueryClient } from '@tanstack/react-query'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
import { processStreamingBlockLogs } from '@/lib/tokenization'
|
||||
import type {
|
||||
BlockCompletedData,
|
||||
BlockErrorData,
|
||||
BlockStartedData,
|
||||
} from '@/lib/workflows/executor/execution-events'
|
||||
import {
|
||||
extractTriggerMockPayload,
|
||||
selectBestTrigger,
|
||||
@@ -21,21 +16,14 @@ import {
|
||||
} from '@/lib/workflows/triggers/triggers'
|
||||
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
||||
import {
|
||||
markOutgoingEdgesFromOutput,
|
||||
updateActiveBlockRefCount,
|
||||
type BlockEventHandlerConfig,
|
||||
createBlockEventHandlers,
|
||||
} from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
|
||||
import { getBlock } from '@/blocks'
|
||||
import type { SerializableExecutionState } from '@/executor/execution/types'
|
||||
import type {
|
||||
BlockLog,
|
||||
BlockState,
|
||||
ExecutionResult,
|
||||
NormalizedBlockOutput,
|
||||
StreamingExecution,
|
||||
} from '@/executor/types'
|
||||
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { coerceValue } from '@/executor/utils/start-block'
|
||||
import { stripCloneSuffixes } from '@/executor/utils/subflow-utils'
|
||||
import { subscriptionKeys } from '@/hooks/queries/subscription'
|
||||
import { useExecutionStream } from '@/hooks/use-execution-stream'
|
||||
import { WorkflowValidationError } from '@/serializer'
|
||||
@@ -63,20 +51,6 @@ interface DebugValidationResult {
|
||||
error?: string
|
||||
}
|
||||
|
||||
interface BlockEventHandlerConfig {
|
||||
workflowId?: string
|
||||
executionIdRef: { current: string }
|
||||
workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }>
|
||||
activeBlocksSet: Set<string>
|
||||
activeBlockRefCounts: Map<string, number>
|
||||
accumulatedBlockLogs: BlockLog[]
|
||||
accumulatedBlockStates: Map<string, BlockState>
|
||||
executedBlockIds: Set<string>
|
||||
consoleMode: 'update' | 'add'
|
||||
includeStartConsoleEntry: boolean
|
||||
onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise<void>
|
||||
}
|
||||
|
||||
const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed'
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
@@ -309,279 +283,15 @@ export function useWorkflowExecution() {
|
||||
)
|
||||
|
||||
const buildBlockEventHandlers = useCallback(
|
||||
(config: BlockEventHandlerConfig) => {
|
||||
const {
|
||||
workflowId,
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
consoleMode,
|
||||
includeStartConsoleEntry,
|
||||
onBlockCompleteCallback,
|
||||
} = config
|
||||
|
||||
/** Returns true if this execution was cancelled or superseded by another run. */
|
||||
const isStaleExecution = () =>
|
||||
!!(
|
||||
workflowId &&
|
||||
executionIdRef.current &&
|
||||
useExecutionStore.getState().getCurrentExecutionId(workflowId) !== executionIdRef.current
|
||||
)
|
||||
|
||||
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
|
||||
if (!workflowId) return
|
||||
updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive)
|
||||
setActiveBlocks(workflowId, new Set(activeBlocksSet))
|
||||
}
|
||||
|
||||
const markOutgoingEdges = (blockId: string, output: Record<string, any> | undefined) => {
|
||||
if (!workflowId) return
|
||||
markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus)
|
||||
}
|
||||
|
||||
const isContainerBlockType = (blockType?: string) => {
|
||||
return blockType === 'loop' || blockType === 'parallel'
|
||||
}
|
||||
|
||||
/** Extracts iteration and child-workflow fields shared across console entry call sites. */
|
||||
const extractIterationFields = (
|
||||
data: BlockStartedData | BlockCompletedData | BlockErrorData
|
||||
) => ({
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
parentIterations: data.parentIterations,
|
||||
childWorkflowBlockId: data.childWorkflowBlockId,
|
||||
childWorkflowName: data.childWorkflowName,
|
||||
...('childWorkflowInstanceId' in data && {
|
||||
childWorkflowInstanceId: data.childWorkflowInstanceId,
|
||||
}),
|
||||
})
|
||||
|
||||
const createBlockLogEntry = (
|
||||
data: BlockCompletedData | BlockErrorData,
|
||||
options: { success: boolean; output?: unknown; error?: string }
|
||||
): BlockLog => ({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: options.output ?? {},
|
||||
success: options.success,
|
||||
error: options.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
})
|
||||
|
||||
const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => {
|
||||
if (!workflowId) return
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId: executionIdRef.current,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
...extractIterationFields(data),
|
||||
})
|
||||
}
|
||||
|
||||
const addConsoleErrorEntry = (data: BlockErrorData) => {
|
||||
if (!workflowId) return
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId: executionIdRef.current,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
...extractIterationFields(data),
|
||||
})
|
||||
}
|
||||
|
||||
const updateConsoleEntry = (data: BlockCompletedData) => {
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
executionOrder: data.executionOrder,
|
||||
input: data.input || {},
|
||||
replaceOutput: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
endedAt: data.endedAt,
|
||||
isRunning: false,
|
||||
...extractIterationFields(data),
|
||||
},
|
||||
executionIdRef.current
|
||||
)
|
||||
}
|
||||
|
||||
const updateConsoleErrorEntry = (data: BlockErrorData) => {
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
executionOrder: data.executionOrder,
|
||||
input: data.input || {},
|
||||
replaceOutput: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
endedAt: data.endedAt,
|
||||
isRunning: false,
|
||||
...extractIterationFields(data),
|
||||
},
|
||||
executionIdRef.current
|
||||
)
|
||||
}
|
||||
|
||||
const onBlockStarted = (data: BlockStartedData) => {
|
||||
if (isStaleExecution()) return
|
||||
updateActiveBlocks(data.blockId, true)
|
||||
|
||||
if (!includeStartConsoleEntry || !workflowId) return
|
||||
|
||||
const startedAt = new Date().toISOString()
|
||||
addConsole({
|
||||
input: {},
|
||||
output: undefined,
|
||||
success: undefined,
|
||||
durationMs: undefined,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: undefined,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId: executionIdRef.current,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
isRunning: true,
|
||||
...extractIterationFields(data),
|
||||
})
|
||||
}
|
||||
|
||||
const onBlockCompleted = (data: BlockCompletedData) => {
|
||||
if (isStaleExecution()) return
|
||||
updateActiveBlocks(data.blockId, false)
|
||||
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
|
||||
markOutgoingEdges(data.blockId, data.output as Record<string, any> | undefined)
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: data.output,
|
||||
executed: true,
|
||||
executionTime: data.durationMs,
|
||||
})
|
||||
|
||||
// For nested containers, the SSE blockId may be a cloned ID (e.g. P1__obranch-0).
|
||||
// Also record the original workflow-level ID so the canvas can highlight it.
|
||||
if (isContainerBlockType(data.blockType)) {
|
||||
const originalId = stripCloneSuffixes(data.blockId)
|
||||
if (originalId !== data.blockId) {
|
||||
executedBlockIds.add(originalId)
|
||||
if (workflowId) setBlockRunStatus(workflowId, originalId, 'success')
|
||||
}
|
||||
}
|
||||
|
||||
if (isContainerBlockType(data.blockType) && !data.iterationContainerId) {
|
||||
const output = data.output as Record<string, any> | undefined
|
||||
const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0
|
||||
if (!isEmptySubflow) return
|
||||
}
|
||||
|
||||
accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))
|
||||
|
||||
if (consoleMode === 'update') {
|
||||
updateConsoleEntry(data)
|
||||
} else {
|
||||
addConsoleEntry(data, data.output as NormalizedBlockOutput)
|
||||
}
|
||||
|
||||
if (onBlockCompleteCallback) {
|
||||
onBlockCompleteCallback(data.blockId, data.output).catch((error) => {
|
||||
logger.error('Error in onBlockComplete callback:', error)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const onBlockError = (data: BlockErrorData) => {
|
||||
if (isStaleExecution()) return
|
||||
updateActiveBlocks(data.blockId, false)
|
||||
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
|
||||
markOutgoingEdges(data.blockId, { error: data.error })
|
||||
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: { error: data.error },
|
||||
executed: true,
|
||||
executionTime: data.durationMs || 0,
|
||||
})
|
||||
|
||||
// For nested containers, also record the original workflow-level ID
|
||||
if (isContainerBlockType(data.blockType)) {
|
||||
const originalId = stripCloneSuffixes(data.blockId)
|
||||
if (originalId !== data.blockId) {
|
||||
executedBlockIds.add(originalId)
|
||||
if (workflowId) setBlockRunStatus(workflowId, originalId, 'error')
|
||||
}
|
||||
}
|
||||
|
||||
accumulatedBlockLogs.push(
|
||||
createBlockLogEntry(data, { success: false, output: {}, error: data.error })
|
||||
)
|
||||
|
||||
if (consoleMode === 'update') {
|
||||
updateConsoleErrorEntry(data)
|
||||
} else {
|
||||
addConsoleErrorEntry(data)
|
||||
}
|
||||
}
|
||||
|
||||
const onBlockChildWorkflowStarted = (data: {
|
||||
blockId: string
|
||||
childWorkflowInstanceId: string
|
||||
iterationCurrent?: number
|
||||
iterationContainerId?: string
|
||||
executionOrder?: number
|
||||
}) => {
|
||||
if (isStaleExecution()) return
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
childWorkflowInstanceId: data.childWorkflowInstanceId,
|
||||
...(data.iterationCurrent !== undefined && { iterationCurrent: data.iterationCurrent }),
|
||||
...(data.iterationContainerId !== undefined && {
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
}),
|
||||
...(data.executionOrder !== undefined && { executionOrder: data.executionOrder }),
|
||||
},
|
||||
executionIdRef.current
|
||||
)
|
||||
}
|
||||
|
||||
return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted }
|
||||
},
|
||||
[addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole]
|
||||
(config: BlockEventHandlerConfig) =>
|
||||
createBlockEventHandlers(config, {
|
||||
addConsole,
|
||||
updateConsole,
|
||||
setActiveBlocks,
|
||||
setBlockRunStatus,
|
||||
setEdgeRunStatus,
|
||||
}),
|
||||
[addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus]
|
||||
)
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,6 +1,23 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import type { ExecutionResult, StreamingExecution } from '@/executor/types'
|
||||
import type {
|
||||
BlockCompletedData,
|
||||
BlockErrorData,
|
||||
BlockStartedData,
|
||||
} from '@/lib/workflows/executor/execution-events'
|
||||
import type {
|
||||
BlockLog,
|
||||
BlockState,
|
||||
ExecutionResult,
|
||||
NormalizedBlockOutput,
|
||||
StreamingExecution,
|
||||
} from '@/executor/types'
|
||||
import { stripCloneSuffixes } from '@/executor/utils/subflow-utils'
|
||||
|
||||
const logger = createLogger('workflow-execution-utils')
|
||||
|
||||
import { useExecutionStore } from '@/stores/execution'
|
||||
import type { ConsoleEntry, ConsoleUpdate } from '@/stores/terminal'
|
||||
import { useTerminalConsoleStore } from '@/stores/terminal'
|
||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
|
||||
@@ -85,6 +102,310 @@ export function markOutgoingEdgesFromOutput(
|
||||
}
|
||||
}
|
||||
|
||||
export interface BlockEventHandlerConfig {
|
||||
workflowId?: string
|
||||
executionIdRef: { current: string }
|
||||
workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }>
|
||||
activeBlocksSet: Set<string>
|
||||
activeBlockRefCounts: Map<string, number>
|
||||
accumulatedBlockLogs: BlockLog[]
|
||||
accumulatedBlockStates: Map<string, BlockState>
|
||||
executedBlockIds: Set<string>
|
||||
consoleMode: 'update' | 'add'
|
||||
includeStartConsoleEntry: boolean
|
||||
onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise<void>
|
||||
}
|
||||
|
||||
export interface BlockEventHandlerDeps {
|
||||
addConsole: (entry: Omit<ConsoleEntry, 'id' | 'timestamp'>) => ConsoleEntry
|
||||
updateConsole: (blockId: string, update: string | ConsoleUpdate, executionId?: string) => void
|
||||
setActiveBlocks: (workflowId: string, blocks: Set<string>) => void
|
||||
setBlockRunStatus: (workflowId: string, blockId: string, status: 'success' | 'error') => void
|
||||
setEdgeRunStatus: (workflowId: string, edgeId: string, status: 'success' | 'error') => void
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates block event handlers for SSE execution events.
|
||||
* Shared by the workflow execution hook and standalone execution utilities.
|
||||
*/
|
||||
export function createBlockEventHandlers(
|
||||
config: BlockEventHandlerConfig,
|
||||
deps: BlockEventHandlerDeps
|
||||
) {
|
||||
const {
|
||||
workflowId,
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
consoleMode,
|
||||
includeStartConsoleEntry,
|
||||
onBlockCompleteCallback,
|
||||
} = config
|
||||
|
||||
const { addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus } = deps
|
||||
|
||||
const isStaleExecution = () =>
|
||||
!!(
|
||||
workflowId &&
|
||||
executionIdRef.current &&
|
||||
useExecutionStore.getState().getCurrentExecutionId(workflowId) !== executionIdRef.current
|
||||
)
|
||||
|
||||
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
|
||||
if (!workflowId) return
|
||||
updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive)
|
||||
setActiveBlocks(workflowId, new Set(activeBlocksSet))
|
||||
}
|
||||
|
||||
const markOutgoingEdges = (blockId: string, output: Record<string, any> | undefined) => {
|
||||
if (!workflowId) return
|
||||
markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus)
|
||||
}
|
||||
|
||||
const isContainerBlockType = (blockType?: string) => {
|
||||
return blockType === 'loop' || blockType === 'parallel'
|
||||
}
|
||||
|
||||
const extractIterationFields = (
|
||||
data: BlockStartedData | BlockCompletedData | BlockErrorData
|
||||
) => ({
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
parentIterations: data.parentIterations,
|
||||
childWorkflowBlockId: data.childWorkflowBlockId,
|
||||
childWorkflowName: data.childWorkflowName,
|
||||
...('childWorkflowInstanceId' in data && {
|
||||
childWorkflowInstanceId: data.childWorkflowInstanceId,
|
||||
}),
|
||||
})
|
||||
|
||||
const createBlockLogEntry = (
|
||||
data: BlockCompletedData | BlockErrorData,
|
||||
options: { success: boolean; output?: unknown; error?: string }
|
||||
): BlockLog => ({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: options.output ?? {},
|
||||
success: options.success,
|
||||
error: options.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
})
|
||||
|
||||
const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => {
|
||||
if (!workflowId) return
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId: executionIdRef.current,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
...extractIterationFields(data),
|
||||
})
|
||||
}
|
||||
|
||||
const addConsoleErrorEntry = (data: BlockErrorData) => {
|
||||
if (!workflowId) return
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId: executionIdRef.current,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
...extractIterationFields(data),
|
||||
})
|
||||
}
|
||||
|
||||
const updateConsoleEntry = (data: BlockCompletedData) => {
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
executionOrder: data.executionOrder,
|
||||
input: data.input || {},
|
||||
replaceOutput: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
endedAt: data.endedAt,
|
||||
isRunning: false,
|
||||
...extractIterationFields(data),
|
||||
},
|
||||
executionIdRef.current
|
||||
)
|
||||
}
|
||||
|
||||
const updateConsoleErrorEntry = (data: BlockErrorData) => {
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
executionOrder: data.executionOrder,
|
||||
input: data.input || {},
|
||||
replaceOutput: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
endedAt: data.endedAt,
|
||||
isRunning: false,
|
||||
...extractIterationFields(data),
|
||||
},
|
||||
executionIdRef.current
|
||||
)
|
||||
}
|
||||
|
||||
const onBlockStarted = (data: BlockStartedData) => {
|
||||
if (isStaleExecution()) return
|
||||
updateActiveBlocks(data.blockId, true)
|
||||
|
||||
if (!includeStartConsoleEntry || !workflowId) return
|
||||
|
||||
const startedAt = new Date().toISOString()
|
||||
addConsole({
|
||||
input: {},
|
||||
output: undefined,
|
||||
success: undefined,
|
||||
durationMs: undefined,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: undefined,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId: executionIdRef.current,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
isRunning: true,
|
||||
...extractIterationFields(data),
|
||||
})
|
||||
}
|
||||
|
||||
const onBlockCompleted = (data: BlockCompletedData) => {
|
||||
if (isStaleExecution()) return
|
||||
updateActiveBlocks(data.blockId, false)
|
||||
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
|
||||
markOutgoingEdges(data.blockId, data.output as Record<string, any> | undefined)
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: data.output,
|
||||
executed: true,
|
||||
executionTime: data.durationMs,
|
||||
})
|
||||
|
||||
if (isContainerBlockType(data.blockType)) {
|
||||
const originalId = stripCloneSuffixes(data.blockId)
|
||||
if (originalId !== data.blockId) {
|
||||
executedBlockIds.add(originalId)
|
||||
if (workflowId) setBlockRunStatus(workflowId, originalId, 'success')
|
||||
}
|
||||
}
|
||||
|
||||
if (isContainerBlockType(data.blockType) && !data.iterationContainerId) {
|
||||
const output = data.output as Record<string, any> | undefined
|
||||
const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0
|
||||
if (!isEmptySubflow) {
|
||||
if (includeStartConsoleEntry) {
|
||||
updateConsoleEntry(data)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))
|
||||
|
||||
if (consoleMode === 'update') {
|
||||
updateConsoleEntry(data)
|
||||
} else {
|
||||
addConsoleEntry(data, data.output as NormalizedBlockOutput)
|
||||
}
|
||||
|
||||
if (onBlockCompleteCallback) {
|
||||
onBlockCompleteCallback(data.blockId, data.output).catch((error) => {
|
||||
logger.error('Error in onBlockComplete callback:', { blockId: data.blockId, error })
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const onBlockError = (data: BlockErrorData) => {
|
||||
if (isStaleExecution()) return
|
||||
updateActiveBlocks(data.blockId, false)
|
||||
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
|
||||
markOutgoingEdges(data.blockId, { error: data.error })
|
||||
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: { error: data.error },
|
||||
executed: true,
|
||||
executionTime: data.durationMs || 0,
|
||||
})
|
||||
|
||||
if (isContainerBlockType(data.blockType)) {
|
||||
const originalId = stripCloneSuffixes(data.blockId)
|
||||
if (originalId !== data.blockId) {
|
||||
executedBlockIds.add(originalId)
|
||||
if (workflowId) setBlockRunStatus(workflowId, originalId, 'error')
|
||||
}
|
||||
}
|
||||
|
||||
accumulatedBlockLogs.push(
|
||||
createBlockLogEntry(data, { success: false, output: {}, error: data.error })
|
||||
)
|
||||
|
||||
if (consoleMode === 'update') {
|
||||
updateConsoleErrorEntry(data)
|
||||
} else {
|
||||
addConsoleErrorEntry(data)
|
||||
}
|
||||
}
|
||||
|
||||
const onBlockChildWorkflowStarted = (data: {
|
||||
blockId: string
|
||||
childWorkflowInstanceId: string
|
||||
iterationCurrent?: number
|
||||
iterationContainerId?: string
|
||||
executionOrder?: number
|
||||
}) => {
|
||||
if (isStaleExecution()) return
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
childWorkflowInstanceId: data.childWorkflowInstanceId,
|
||||
...(data.iterationCurrent !== undefined && { iterationCurrent: data.iterationCurrent }),
|
||||
...(data.iterationContainerId !== undefined && {
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
}),
|
||||
...(data.executionOrder !== undefined && { executionOrder: data.executionOrder }),
|
||||
},
|
||||
executionIdRef.current
|
||||
)
|
||||
}
|
||||
|
||||
return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted }
|
||||
}
|
||||
|
||||
export interface WorkflowExecutionOptions {
|
||||
workflowId?: string
|
||||
workflowInput?: any
|
||||
@@ -115,7 +436,7 @@ export async function executeWorkflowWithFullLogging(
|
||||
}
|
||||
|
||||
const executionId = options.executionId || uuidv4()
|
||||
const { addConsole } = useTerminalConsoleStore.getState()
|
||||
const { addConsole, updateConsole } = useTerminalConsoleStore.getState()
|
||||
const { setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, setCurrentExecutionId } =
|
||||
useExecutionStore.getState()
|
||||
const wfId = targetWorkflowId
|
||||
@@ -123,6 +444,24 @@ export async function executeWorkflowWithFullLogging(
|
||||
|
||||
const activeBlocksSet = new Set<string>()
|
||||
const activeBlockRefCounts = new Map<string, number>()
|
||||
const executionIdRef = { current: executionId }
|
||||
|
||||
const blockHandlers = createBlockEventHandlers(
|
||||
{
|
||||
workflowId: wfId,
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs: [],
|
||||
accumulatedBlockStates: new Map(),
|
||||
executedBlockIds: new Set(),
|
||||
consoleMode: 'update',
|
||||
includeStartConsoleEntry: true,
|
||||
onBlockCompleteCallback: options.onBlockComplete,
|
||||
},
|
||||
{ addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus }
|
||||
)
|
||||
|
||||
const payload: any = {
|
||||
input: options.workflowInput,
|
||||
@@ -188,125 +527,25 @@ export async function executeWorkflowWithFullLogging(
|
||||
switch (event.type) {
|
||||
case 'execution:started': {
|
||||
setCurrentExecutionId(wfId, event.executionId)
|
||||
break
|
||||
}
|
||||
case 'block:started': {
|
||||
updateActiveBlockRefCount(
|
||||
activeBlockRefCounts,
|
||||
activeBlocksSet,
|
||||
event.data.blockId,
|
||||
true
|
||||
)
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
executionIdRef.current = event.executionId || executionId
|
||||
break
|
||||
}
|
||||
|
||||
case 'block:completed': {
|
||||
updateActiveBlockRefCount(
|
||||
activeBlockRefCounts,
|
||||
activeBlocksSet,
|
||||
event.data.blockId,
|
||||
false
|
||||
)
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(wfId, event.data.blockId, 'success')
|
||||
markOutgoingEdgesFromOutput(
|
||||
event.data.blockId,
|
||||
event.data.output,
|
||||
workflowEdges,
|
||||
wfId,
|
||||
setEdgeRunStatus
|
||||
)
|
||||
|
||||
addConsole({
|
||||
input: event.data.input || {},
|
||||
output: event.data.output,
|
||||
success: true,
|
||||
durationMs: event.data.durationMs,
|
||||
startedAt: new Date(Date.now() - event.data.durationMs).toISOString(),
|
||||
executionOrder: event.data.executionOrder,
|
||||
endedAt: new Date().toISOString(),
|
||||
workflowId: targetWorkflowId,
|
||||
blockId: event.data.blockId,
|
||||
executionId,
|
||||
blockName: event.data.blockName,
|
||||
blockType: event.data.blockType,
|
||||
iterationCurrent: event.data.iterationCurrent,
|
||||
iterationTotal: event.data.iterationTotal,
|
||||
iterationType: event.data.iterationType,
|
||||
iterationContainerId: event.data.iterationContainerId,
|
||||
childWorkflowBlockId: event.data.childWorkflowBlockId,
|
||||
childWorkflowName: event.data.childWorkflowName,
|
||||
childWorkflowInstanceId: event.data.childWorkflowInstanceId,
|
||||
})
|
||||
|
||||
if (options.onBlockComplete) {
|
||||
options.onBlockComplete(event.data.blockId, event.data.output).catch(() => {})
|
||||
}
|
||||
case 'block:started':
|
||||
blockHandlers.onBlockStarted(event.data)
|
||||
break
|
||||
}
|
||||
|
||||
case 'block:error': {
|
||||
updateActiveBlockRefCount(
|
||||
activeBlockRefCounts,
|
||||
activeBlocksSet,
|
||||
event.data.blockId,
|
||||
false
|
||||
)
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(wfId, event.data.blockId, 'error')
|
||||
markOutgoingEdgesFromOutput(
|
||||
event.data.blockId,
|
||||
{ error: event.data.error },
|
||||
workflowEdges,
|
||||
wfId,
|
||||
setEdgeRunStatus
|
||||
)
|
||||
|
||||
addConsole({
|
||||
input: event.data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: event.data.error,
|
||||
durationMs: event.data.durationMs,
|
||||
startedAt: new Date(Date.now() - event.data.durationMs).toISOString(),
|
||||
executionOrder: event.data.executionOrder,
|
||||
endedAt: new Date().toISOString(),
|
||||
workflowId: targetWorkflowId,
|
||||
blockId: event.data.blockId,
|
||||
executionId,
|
||||
blockName: event.data.blockName,
|
||||
blockType: event.data.blockType,
|
||||
iterationCurrent: event.data.iterationCurrent,
|
||||
iterationTotal: event.data.iterationTotal,
|
||||
iterationType: event.data.iterationType,
|
||||
iterationContainerId: event.data.iterationContainerId,
|
||||
childWorkflowBlockId: event.data.childWorkflowBlockId,
|
||||
childWorkflowName: event.data.childWorkflowName,
|
||||
childWorkflowInstanceId: event.data.childWorkflowInstanceId,
|
||||
})
|
||||
case 'block:completed':
|
||||
blockHandlers.onBlockCompleted(event.data)
|
||||
break
|
||||
}
|
||||
|
||||
case 'block:childWorkflowStarted': {
|
||||
const { updateConsole } = useTerminalConsoleStore.getState()
|
||||
updateConsole(
|
||||
event.data.blockId,
|
||||
{
|
||||
childWorkflowInstanceId: event.data.childWorkflowInstanceId,
|
||||
...(event.data.iterationCurrent !== undefined && {
|
||||
iterationCurrent: event.data.iterationCurrent,
|
||||
}),
|
||||
...(event.data.iterationContainerId !== undefined && {
|
||||
iterationContainerId: event.data.iterationContainerId,
|
||||
}),
|
||||
},
|
||||
executionId
|
||||
)
|
||||
case 'block:error':
|
||||
blockHandlers.onBlockError(event.data)
|
||||
break
|
||||
|
||||
case 'block:childWorkflowStarted':
|
||||
blockHandlers.onBlockChildWorkflowStarted(event.data)
|
||||
break
|
||||
}
|
||||
|
||||
case 'execution:completed':
|
||||
setCurrentExecutionId(wfId, null)
|
||||
|
||||
Reference in New Issue
Block a user