mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-24 14:27:56 -05:00
Compare commits
6 Commits
fix/ghost-
...
fix/nested
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bf22dd75ad | ||
|
|
eb767b5ede | ||
|
|
594bcac5f2 | ||
|
|
d3f20311d0 | ||
|
|
587d44ad6f | ||
|
|
8bf2e69942 |
@@ -30,6 +30,7 @@ import { normalizeName } from '@/executor/constants'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
|
||||
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { Serializer } from '@/serializer'
|
||||
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
|
||||
|
||||
@@ -467,17 +468,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
}
|
||||
|
||||
return NextResponse.json(filteredResult)
|
||||
} catch (error: any) {
|
||||
const errorMessage = error.message || 'Unknown error'
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = error.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
output: executionResult?.output,
|
||||
error: executionResult?.error || error.message || 'Execution failed',
|
||||
error: executionResult?.error || errorMessage || 'Execution failed',
|
||||
metadata: executionResult?.metadata
|
||||
? {
|
||||
duration: executionResult.metadata.duration,
|
||||
@@ -788,11 +789,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
|
||||
// Cleanup base64 cache for this execution
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
} catch (error: any) {
|
||||
const errorMessage = error.message || 'Unknown error'
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = error.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:error',
|
||||
|
||||
@@ -16,6 +16,7 @@ import {
|
||||
} from '@/lib/workflows/triggers/triggers'
|
||||
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
||||
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { coerceValue } from '@/executor/utils/start-block'
|
||||
import { subscriptionKeys } from '@/hooks/queries/subscription'
|
||||
import { useExecutionStream } from '@/hooks/use-execution-stream'
|
||||
@@ -76,17 +77,6 @@ function normalizeErrorMessage(error: unknown): string {
|
||||
return WORKFLOW_EXECUTION_FAILURE_MESSAGE
|
||||
}
|
||||
|
||||
function isExecutionResult(value: unknown): value is ExecutionResult {
|
||||
if (!isRecord(value)) return false
|
||||
return typeof value.success === 'boolean' && isRecord(value.output)
|
||||
}
|
||||
|
||||
function extractExecutionResult(error: unknown): ExecutionResult | null {
|
||||
if (!isRecord(error)) return null
|
||||
const candidate = error.executionResult
|
||||
return isExecutionResult(candidate) ? candidate : null
|
||||
}
|
||||
|
||||
export function useWorkflowExecution() {
|
||||
const queryClient = useQueryClient()
|
||||
const currentWorkflow = useCurrentWorkflow()
|
||||
@@ -1138,11 +1128,11 @@ export function useWorkflowExecution() {
|
||||
|
||||
const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
|
||||
const normalizedMessage = normalizeErrorMessage(error)
|
||||
const executionResultFromError = extractExecutionResult(error)
|
||||
|
||||
let errorResult: ExecutionResult
|
||||
|
||||
if (executionResultFromError) {
|
||||
if (hasExecutionResult(error)) {
|
||||
const executionResultFromError = error.executionResult
|
||||
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []
|
||||
|
||||
errorResult = {
|
||||
|
||||
@@ -81,7 +81,6 @@ import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
|
||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||
import { getUniqueBlockName, prepareBlockState } from '@/stores/workflows/utils'
|
||||
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
|
||||
import type { BlockState } from '@/stores/workflows/workflow/types'
|
||||
|
||||
/** Lazy-loaded components for non-critical UI that can load after initial render */
|
||||
const LazyChat = lazy(() =>
|
||||
@@ -536,7 +535,8 @@ const WorkflowContent = React.memo(() => {
|
||||
return edgesToFilter.filter((edge) => {
|
||||
const sourceBlock = blocks[edge.source]
|
||||
const targetBlock = blocks[edge.target]
|
||||
return Boolean(sourceBlock && targetBlock)
|
||||
if (!sourceBlock || !targetBlock) return false
|
||||
return !isAnnotationOnlyBlock(sourceBlock.type) && !isAnnotationOnlyBlock(targetBlock.type)
|
||||
})
|
||||
}, [edges, isShowingDiff, isDiffReady, diffAnalysis, blocks])
|
||||
|
||||
@@ -1097,13 +1097,6 @@ const WorkflowContent = React.memo(() => {
|
||||
[collaborativeBatchRemoveEdges]
|
||||
)
|
||||
|
||||
const isAutoConnectSourceCandidate = useCallback((block: BlockState): boolean => {
|
||||
if (!block.enabled) return false
|
||||
if (block.type === 'response') return false
|
||||
if (isAnnotationOnlyBlock(block.type)) return false
|
||||
return true
|
||||
}, [])
|
||||
|
||||
/** Finds the closest block to a position for auto-connect. */
|
||||
const findClosestOutput = useCallback(
|
||||
(newNodePosition: { x: number; y: number }): BlockData | null => {
|
||||
@@ -1116,7 +1109,8 @@ const WorkflowContent = React.memo(() => {
|
||||
position: { x: number; y: number }
|
||||
distanceSquared: number
|
||||
} | null>((acc, [id, block]) => {
|
||||
if (!isAutoConnectSourceCandidate(block)) return acc
|
||||
if (!block.enabled) return acc
|
||||
if (block.type === 'response') return acc
|
||||
const node = nodeIndex.get(id)
|
||||
if (!node) return acc
|
||||
|
||||
@@ -1146,7 +1140,7 @@ const WorkflowContent = React.memo(() => {
|
||||
position: closest.position,
|
||||
}
|
||||
},
|
||||
[blocks, getNodes, getNodeAnchorPosition, isPointInLoopNode, isAutoConnectSourceCandidate]
|
||||
[blocks, getNodes, getNodeAnchorPosition, isPointInLoopNode]
|
||||
)
|
||||
|
||||
/** Determines the appropriate source handle based on block type. */
|
||||
@@ -1214,8 +1208,7 @@ const WorkflowContent = React.memo(() => {
|
||||
position: { x: number; y: number }
|
||||
distanceSquared: number
|
||||
} | null>((acc, block) => {
|
||||
const blockState = blocks[block.id]
|
||||
if (!blockState || !isAutoConnectSourceCandidate(blockState)) return acc
|
||||
if (block.type === 'response') return acc
|
||||
const distanceSquared =
|
||||
(block.position.x - targetPosition.x) ** 2 + (block.position.y - targetPosition.y) ** 2
|
||||
if (!acc || distanceSquared < acc.distanceSquared) {
|
||||
@@ -1232,7 +1225,7 @@ const WorkflowContent = React.memo(() => {
|
||||
}
|
||||
: undefined
|
||||
},
|
||||
[blocks, isAutoConnectSourceCandidate]
|
||||
[]
|
||||
)
|
||||
|
||||
/**
|
||||
@@ -1248,6 +1241,8 @@ const WorkflowContent = React.memo(() => {
|
||||
position: { x: number; y: number },
|
||||
targetBlockId: string,
|
||||
options: {
|
||||
blockType: string
|
||||
enableTriggerMode?: boolean
|
||||
targetParentId?: string | null
|
||||
existingChildBlocks?: { id: string; type: string; position: { x: number; y: number } }[]
|
||||
containerId?: string
|
||||
@@ -1255,6 +1250,17 @@ const WorkflowContent = React.memo(() => {
|
||||
): Edge | undefined => {
|
||||
if (!autoConnectRef.current) return undefined
|
||||
|
||||
// Don't auto-connect starter or annotation-only blocks
|
||||
if (options.blockType === 'starter' || isAnnotationOnlyBlock(options.blockType)) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
// Check if target is a trigger block
|
||||
const targetBlockConfig = getBlock(options.blockType)
|
||||
const isTargetTrigger =
|
||||
options.enableTriggerMode || targetBlockConfig?.category === 'triggers'
|
||||
if (isTargetTrigger) return undefined
|
||||
|
||||
// Case 1: Adding block inside a container with existing children
|
||||
if (options.existingChildBlocks && options.existingChildBlocks.length > 0) {
|
||||
const closestBlock = findClosestBlockInSet(options.existingChildBlocks, position)
|
||||
@@ -1362,6 +1368,7 @@ const WorkflowContent = React.memo(() => {
|
||||
const name = getUniqueBlockName(baseName, blocks)
|
||||
|
||||
const autoConnectEdge = tryCreateAutoConnectEdge(position, id, {
|
||||
blockType: data.type,
|
||||
targetParentId: null,
|
||||
})
|
||||
|
||||
@@ -1432,6 +1439,8 @@ const WorkflowContent = React.memo(() => {
|
||||
.map((b) => ({ id: b.id, type: b.type, position: b.position }))
|
||||
|
||||
const autoConnectEdge = tryCreateAutoConnectEdge(relativePosition, id, {
|
||||
blockType: data.type,
|
||||
enableTriggerMode: data.enableTriggerMode,
|
||||
targetParentId: containerInfo.loopId,
|
||||
existingChildBlocks,
|
||||
containerId: containerInfo.loopId,
|
||||
@@ -1460,6 +1469,8 @@ const WorkflowContent = React.memo(() => {
|
||||
if (checkTriggerConstraints(data.type)) return
|
||||
|
||||
const autoConnectEdge = tryCreateAutoConnectEdge(position, id, {
|
||||
blockType: data.type,
|
||||
enableTriggerMode: data.enableTriggerMode,
|
||||
targetParentId: null,
|
||||
})
|
||||
|
||||
@@ -1515,6 +1526,7 @@ const WorkflowContent = React.memo(() => {
|
||||
const name = getUniqueBlockName(baseName, blocks)
|
||||
|
||||
const autoConnectEdge = tryCreateAutoConnectEdge(basePosition, id, {
|
||||
blockType: type,
|
||||
targetParentId: null,
|
||||
})
|
||||
|
||||
@@ -1550,6 +1562,8 @@ const WorkflowContent = React.memo(() => {
|
||||
const name = getUniqueBlockName(baseName, blocks)
|
||||
|
||||
const autoConnectEdge = tryCreateAutoConnectEdge(basePosition, id, {
|
||||
blockType: type,
|
||||
enableTriggerMode,
|
||||
targetParentId: null,
|
||||
})
|
||||
|
||||
@@ -2350,6 +2364,24 @@ const WorkflowContent = React.memo(() => {
|
||||
|
||||
if (!sourceNode || !targetNode) return
|
||||
|
||||
// Prevent connections to/from annotation-only blocks (non-executable)
|
||||
if (
|
||||
isAnnotationOnlyBlock(sourceNode.data?.type) ||
|
||||
isAnnotationOnlyBlock(targetNode.data?.type)
|
||||
) {
|
||||
return
|
||||
}
|
||||
|
||||
// Prevent incoming connections to trigger blocks (webhook, schedule, etc.)
|
||||
if (targetNode.data?.config?.category === 'triggers') {
|
||||
return
|
||||
}
|
||||
|
||||
// Prevent incoming connections to starter blocks (still keep separate for backward compatibility)
|
||||
if (targetNode.data?.type === 'starter') {
|
||||
return
|
||||
}
|
||||
|
||||
// Get parent information (handle container start node case)
|
||||
const sourceParentId =
|
||||
blocks[sourceNode.id]?.data?.parentId ||
|
||||
@@ -2755,6 +2787,7 @@ const WorkflowContent = React.memo(() => {
|
||||
.map((b) => ({ id: b.id, type: b.type, position: b.position }))
|
||||
|
||||
const autoConnectEdge = tryCreateAutoConnectEdge(relativePositionBefore, node.id, {
|
||||
blockType: node.data?.type || '',
|
||||
targetParentId: potentialParentId,
|
||||
existingChildBlocks,
|
||||
containerId: potentialParentId,
|
||||
|
||||
@@ -352,7 +352,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
})
|
||||
})
|
||||
|
||||
useWorkflowStore.getState().replaceWorkflowState({
|
||||
useWorkflowStore.setState({
|
||||
blocks: workflowState.blocks || {},
|
||||
edges: workflowState.edges || [],
|
||||
loops: workflowState.loops || {},
|
||||
|
||||
@@ -21,7 +21,7 @@ import {
|
||||
} from '@/lib/workflows/schedules/utils'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
|
||||
|
||||
const logger = createLogger('TriggerScheduleExecution')
|
||||
@@ -231,8 +231,7 @@ async function runWorkflowExecution({
|
||||
} catch (error: unknown) {
|
||||
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
|
||||
|
||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
||||
const executionResult = errorWithResult?.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
|
||||
@@ -16,7 +16,7 @@ import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
|
||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { safeAssign } from '@/tools/safe-assign'
|
||||
import { getTrigger, isTriggerValid } from '@/triggers'
|
||||
|
||||
@@ -578,12 +578,13 @@ async function executeWebhookJobInternal(
|
||||
deploymentVersionId,
|
||||
})
|
||||
|
||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
||||
const executionResult = errorWithResult?.executionResult || {
|
||||
success: false,
|
||||
output: {},
|
||||
logs: [],
|
||||
}
|
||||
const executionResult = hasExecutionResult(error)
|
||||
? error.executionResult
|
||||
: {
|
||||
success: false,
|
||||
output: {},
|
||||
logs: [],
|
||||
}
|
||||
const { traceSpans } = buildTraceSpans(executionResult)
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
|
||||
@@ -9,7 +9,7 @@ import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-m
|
||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import type { CoreTriggerType } from '@/stores/logs/filters/types'
|
||||
|
||||
const logger = createLogger('TriggerWorkflowExecution')
|
||||
@@ -160,8 +160,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
|
||||
executionId,
|
||||
})
|
||||
|
||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
||||
const executionResult = errorWithResult?.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
|
||||
31
apps/sim/executor/errors/child-workflow-error.ts
Normal file
31
apps/sim/executor/errors/child-workflow-error.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import type { TraceSpan } from '@/lib/logs/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
|
||||
interface ChildWorkflowErrorOptions {
|
||||
message: string
|
||||
childWorkflowName: string
|
||||
childTraceSpans?: TraceSpan[]
|
||||
executionResult?: ExecutionResult
|
||||
cause?: Error
|
||||
}
|
||||
|
||||
/**
|
||||
* Error raised when a child workflow execution fails.
|
||||
*/
|
||||
export class ChildWorkflowError extends Error {
|
||||
readonly childTraceSpans: TraceSpan[]
|
||||
readonly childWorkflowName: string
|
||||
readonly executionResult?: ExecutionResult
|
||||
|
||||
constructor(options: ChildWorkflowErrorOptions) {
|
||||
super(options.message, { cause: options.cause })
|
||||
this.name = 'ChildWorkflowError'
|
||||
this.childWorkflowName = options.childWorkflowName
|
||||
this.childTraceSpans = options.childTraceSpans ?? []
|
||||
this.executionResult = options.executionResult
|
||||
}
|
||||
|
||||
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
|
||||
return error instanceof ChildWorkflowError
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import {
|
||||
isSentinelBlockType,
|
||||
} from '@/executor/constants'
|
||||
import type { DAGNode } from '@/executor/dag/builder'
|
||||
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
|
||||
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
|
||||
import {
|
||||
generatePauseContextId,
|
||||
@@ -213,24 +214,26 @@ export class BlockExecutor {
|
||||
? resolvedInputs
|
||||
: ((block.config?.params as Record<string, any> | undefined) ?? {})
|
||||
|
||||
const errorOutput: NormalizedBlockOutput = {
|
||||
error: errorMessage,
|
||||
}
|
||||
|
||||
if (ChildWorkflowError.isChildWorkflowError(error)) {
|
||||
errorOutput.childTraceSpans = error.childTraceSpans
|
||||
errorOutput.childWorkflowName = error.childWorkflowName
|
||||
}
|
||||
|
||||
this.state.setBlockOutput(node.id, errorOutput, duration)
|
||||
|
||||
if (blockLog) {
|
||||
blockLog.endedAt = new Date().toISOString()
|
||||
blockLog.durationMs = duration
|
||||
blockLog.success = false
|
||||
blockLog.error = errorMessage
|
||||
blockLog.input = input
|
||||
blockLog.output = this.filterOutputForLog(block, errorOutput)
|
||||
}
|
||||
|
||||
const errorOutput: NormalizedBlockOutput = {
|
||||
error: errorMessage,
|
||||
}
|
||||
|
||||
if (error && typeof error === 'object' && 'childTraceSpans' in error) {
|
||||
errorOutput.childTraceSpans = (error as any).childTraceSpans
|
||||
}
|
||||
|
||||
this.state.setBlockOutput(node.id, errorOutput, duration)
|
||||
|
||||
logger.error(
|
||||
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
|
||||
{
|
||||
|
||||
@@ -13,7 +13,7 @@ import type {
|
||||
PausePoint,
|
||||
ResumeStatus,
|
||||
} from '@/executor/types'
|
||||
import { normalizeError } from '@/executor/utils/errors'
|
||||
import { attachExecutionResult, normalizeError } from '@/executor/utils/errors'
|
||||
|
||||
const logger = createLogger('ExecutionEngine')
|
||||
|
||||
@@ -170,8 +170,8 @@ export class ExecutionEngine {
|
||||
metadata: this.context.metadata,
|
||||
}
|
||||
|
||||
if (error && typeof error === 'object') {
|
||||
;(error as any).executionResult = executionResult
|
||||
if (error instanceof Error) {
|
||||
attachExecutionResult(error, executionResult)
|
||||
}
|
||||
throw error
|
||||
}
|
||||
|
||||
@@ -4,12 +4,14 @@ import type { TraceSpan } from '@/lib/logs/types'
|
||||
import type { BlockOutput } from '@/blocks/types'
|
||||
import { Executor } from '@/executor'
|
||||
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
|
||||
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
|
||||
import type {
|
||||
BlockHandler,
|
||||
ExecutionContext,
|
||||
ExecutionResult,
|
||||
StreamingExecution,
|
||||
} from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
|
||||
import { parseJSON } from '@/executor/utils/json'
|
||||
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
|
||||
@@ -137,39 +139,39 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
)
|
||||
|
||||
return mappedResult
|
||||
} catch (error: any) {
|
||||
} catch (error: unknown) {
|
||||
logger.error(`Error executing child workflow ${workflowId}:`, error)
|
||||
|
||||
const { workflows } = useWorkflowRegistry.getState()
|
||||
const workflowMetadata = workflows[workflowId]
|
||||
const childWorkflowName = workflowMetadata?.name || workflowId
|
||||
|
||||
const originalError = error.message || 'Unknown error'
|
||||
const wrappedError = new Error(
|
||||
`Error in child workflow "${childWorkflowName}": ${originalError}`
|
||||
)
|
||||
const originalError = error instanceof Error ? error.message : 'Unknown error'
|
||||
let childTraceSpans: WorkflowTraceSpan[] = []
|
||||
let executionResult: ExecutionResult | undefined
|
||||
|
||||
if (error.executionResult?.logs) {
|
||||
const executionResult = error.executionResult as ExecutionResult
|
||||
if (hasExecutionResult(error) && error.executionResult.logs) {
|
||||
executionResult = error.executionResult
|
||||
|
||||
logger.info(`Extracting child trace spans from error.executionResult`, {
|
||||
hasLogs: (executionResult.logs?.length ?? 0) > 0,
|
||||
logCount: executionResult.logs?.length ?? 0,
|
||||
})
|
||||
|
||||
const childTraceSpans = this.captureChildWorkflowLogs(
|
||||
executionResult,
|
||||
childWorkflowName,
|
||||
ctx
|
||||
)
|
||||
childTraceSpans = this.captureChildWorkflowLogs(executionResult, childWorkflowName, ctx)
|
||||
|
||||
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
|
||||
;(wrappedError as any).childTraceSpans = childTraceSpans
|
||||
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
|
||||
;(wrappedError as any).childTraceSpans = error.childTraceSpans
|
||||
} else if (ChildWorkflowError.isChildWorkflowError(error)) {
|
||||
childTraceSpans = error.childTraceSpans
|
||||
}
|
||||
|
||||
throw wrappedError
|
||||
throw new ChildWorkflowError({
|
||||
message: `Error in child workflow "${childWorkflowName}": ${originalError}`,
|
||||
childWorkflowName,
|
||||
childTraceSpans,
|
||||
executionResult,
|
||||
cause: error instanceof Error ? error : undefined,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -441,11 +443,11 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
|
||||
if (!success) {
|
||||
logger.warn(`Child workflow ${childWorkflowName} failed`)
|
||||
const error = new Error(
|
||||
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`
|
||||
)
|
||||
;(error as any).childTraceSpans = childTraceSpans || []
|
||||
throw error
|
||||
throw new ChildWorkflowError({
|
||||
message: `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`,
|
||||
childWorkflowName,
|
||||
childTraceSpans: childTraceSpans || [],
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
@@ -1,6 +1,39 @@
|
||||
import type { ExecutionContext } from '@/executor/types'
|
||||
import type { ExecutionContext, ExecutionResult } from '@/executor/types'
|
||||
import type { SerializedBlock } from '@/serializer/types'
|
||||
|
||||
/**
|
||||
* Interface for errors that carry an ExecutionResult.
|
||||
* Used when workflow execution fails and we want to preserve partial results.
|
||||
*/
|
||||
export interface ErrorWithExecutionResult extends Error {
|
||||
executionResult: ExecutionResult
|
||||
}
|
||||
|
||||
/**
|
||||
* Type guard to check if an error carries an ExecutionResult.
|
||||
* Validates that executionResult has required fields (success, output).
|
||||
*/
|
||||
export function hasExecutionResult(error: unknown): error is ErrorWithExecutionResult {
|
||||
if (
|
||||
!(error instanceof Error) ||
|
||||
!('executionResult' in error) ||
|
||||
error.executionResult == null ||
|
||||
typeof error.executionResult !== 'object'
|
||||
) {
|
||||
return false
|
||||
}
|
||||
|
||||
const result = error.executionResult as Record<string, unknown>
|
||||
return typeof result.success === 'boolean' && result.output != null
|
||||
}
|
||||
|
||||
/**
|
||||
* Attaches an ExecutionResult to an error for propagation to parent workflows.
|
||||
*/
|
||||
export function attachExecutionResult(error: Error, executionResult: ExecutionResult): void {
|
||||
Object.assign(error, { executionResult })
|
||||
}
|
||||
|
||||
export interface BlockExecutionErrorDetails {
|
||||
block: SerializedBlock
|
||||
error: Error | string
|
||||
|
||||
@@ -100,8 +100,13 @@ export function useExecutionStream() {
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
const error = await response.json()
|
||||
throw new Error(error.error || 'Failed to start execution')
|
||||
const errorResponse = await response.json()
|
||||
const error = new Error(errorResponse.error || 'Failed to start execution')
|
||||
// Attach the execution result from server response for error handling
|
||||
if (errorResponse && typeof errorResponse === 'object') {
|
||||
Object.assign(error, { executionResult: errorResponse })
|
||||
}
|
||||
throw error
|
||||
}
|
||||
|
||||
if (!response.body) {
|
||||
|
||||
@@ -24,6 +24,7 @@ import type {
|
||||
IterationContext,
|
||||
} from '@/executor/execution/types'
|
||||
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { Serializer } from '@/serializer'
|
||||
import { mergeSubblockState } from '@/stores/workflows/server-utils'
|
||||
|
||||
@@ -383,20 +384,15 @@ export async function executeWorkflowCore(
|
||||
} catch (error: unknown) {
|
||||
logger.error(`[${requestId}] Execution failed:`, error)
|
||||
|
||||
const errorWithResult = error as {
|
||||
executionResult?: ExecutionResult
|
||||
message?: string
|
||||
stack?: string
|
||||
}
|
||||
const executionResult = errorWithResult?.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
endedAt: new Date().toISOString(),
|
||||
totalDurationMs: executionResult?.metadata?.duration || 0,
|
||||
error: {
|
||||
message: errorWithResult?.message || 'Execution failed',
|
||||
stackTrace: errorWithResult?.stack,
|
||||
message: error instanceof Error ? error.message : 'Execution failed',
|
||||
stackTrace: error instanceof Error ? error.stack : undefined,
|
||||
},
|
||||
traceSpans,
|
||||
})
|
||||
|
||||
@@ -4,10 +4,9 @@ import { create } from 'zustand'
|
||||
import { devtools } from 'zustand/middleware'
|
||||
import { DEFAULT_DUPLICATE_OFFSET } from '@/lib/workflows/autolayout/constants'
|
||||
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
|
||||
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
|
||||
import { getBlock } from '@/blocks'
|
||||
import type { SubBlockConfig } from '@/blocks/types'
|
||||
import { isAnnotationOnlyBlock, normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
|
||||
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
|
||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
|
||||
import { filterNewEdges, getUniqueBlockName, mergeSubblockState } from '@/stores/workflows/utils'
|
||||
@@ -91,26 +90,6 @@ function resolveInitialSubblockValue(config: SubBlockConfig): unknown {
|
||||
return null
|
||||
}
|
||||
|
||||
function isValidEdge(
|
||||
edge: Edge,
|
||||
blocks: Record<string, { type: string; triggerMode?: boolean }>
|
||||
): boolean {
|
||||
const sourceBlock = blocks[edge.source]
|
||||
const targetBlock = blocks[edge.target]
|
||||
if (!sourceBlock || !targetBlock) return false
|
||||
if (isAnnotationOnlyBlock(sourceBlock.type)) return false
|
||||
if (isAnnotationOnlyBlock(targetBlock.type)) return false
|
||||
if (TriggerUtils.isTriggerBlock(targetBlock)) return false
|
||||
return true
|
||||
}
|
||||
|
||||
function filterValidEdges(
|
||||
edges: Edge[],
|
||||
blocks: Record<string, { type: string; triggerMode?: boolean }>
|
||||
): Edge[] {
|
||||
return edges.filter((edge) => isValidEdge(edge, blocks))
|
||||
}
|
||||
|
||||
const initialState = {
|
||||
blocks: {},
|
||||
edges: [],
|
||||
@@ -381,9 +360,8 @@ export const useWorkflowStore = create<WorkflowStore>()(
|
||||
}
|
||||
|
||||
if (edges && edges.length > 0) {
|
||||
const validEdges = filterValidEdges(edges, newBlocks)
|
||||
const existingEdgeIds = new Set(currentEdges.map((e) => e.id))
|
||||
for (const edge of validEdges) {
|
||||
for (const edge of edges) {
|
||||
if (!existingEdgeIds.has(edge.id)) {
|
||||
newEdges.push({
|
||||
id: edge.id || crypto.randomUUID(),
|
||||
@@ -517,11 +495,8 @@ export const useWorkflowStore = create<WorkflowStore>()(
|
||||
},
|
||||
|
||||
batchAddEdges: (edges: Edge[]) => {
|
||||
const blocks = get().blocks
|
||||
const currentEdges = get().edges
|
||||
|
||||
const validEdges = filterValidEdges(edges, blocks)
|
||||
const filtered = filterNewEdges(validEdges, currentEdges)
|
||||
const filtered = filterNewEdges(edges, currentEdges)
|
||||
const newEdges = [...currentEdges]
|
||||
|
||||
for (const edge of filtered) {
|
||||
@@ -537,6 +512,7 @@ export const useWorkflowStore = create<WorkflowStore>()(
|
||||
})
|
||||
}
|
||||
|
||||
const blocks = get().blocks
|
||||
set({
|
||||
blocks: { ...blocks },
|
||||
edges: newEdges,
|
||||
@@ -596,7 +572,7 @@ export const useWorkflowStore = create<WorkflowStore>()(
|
||||
) => {
|
||||
set((state) => {
|
||||
const nextBlocks = workflowState.blocks || {}
|
||||
const nextEdges = filterValidEdges(workflowState.edges || [], nextBlocks)
|
||||
const nextEdges = workflowState.edges || []
|
||||
const nextLoops =
|
||||
Object.keys(workflowState.loops || {}).length > 0
|
||||
? workflowState.loops
|
||||
@@ -1107,7 +1083,7 @@ export const useWorkflowStore = create<WorkflowStore>()(
|
||||
|
||||
const newState = {
|
||||
blocks: deployedState.blocks,
|
||||
edges: filterValidEdges(deployedState.edges ?? [], deployedState.blocks),
|
||||
edges: deployedState.edges,
|
||||
loops: deployedState.loops || {},
|
||||
parallels: deployedState.parallels || {},
|
||||
needsRedeployment: false,
|
||||
|
||||
Reference in New Issue
Block a user