fix(child-workflow): nested spans handoff

This commit is contained in:
Vikhyath Mondreti
2026-01-24 01:37:17 -08:00
parent 12100e6881
commit 8bf2e69942
3 changed files with 64 additions and 28 deletions

View File

@@ -0,0 +1,31 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { ExecutionResult } from '@/executor/types'
interface ChildWorkflowErrorOptions {
message: string
childWorkflowName: string
childTraceSpans?: TraceSpan[]
executionResult?: ExecutionResult
cause?: Error
}
/**
* Error raised when a child workflow execution fails.
*/
export class ChildWorkflowError extends Error {
readonly childTraceSpans: TraceSpan[]
readonly childWorkflowName: string
readonly executionResult?: ExecutionResult
constructor(options: ChildWorkflowErrorOptions) {
super(options.message, { cause: options.cause })
this.name = 'ChildWorkflowError'
this.childWorkflowName = options.childWorkflowName
this.childTraceSpans = options.childTraceSpans ?? []
this.executionResult = options.executionResult
}
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
return error instanceof ChildWorkflowError
}
}

View File

@@ -13,6 +13,7 @@ import {
isSentinelBlockType,
} from '@/executor/constants'
import type { DAGNode } from '@/executor/dag/builder'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import {
generatePauseContextId,
@@ -213,24 +214,28 @@ export class BlockExecutor {
? resolvedInputs
: ((block.config?.params as Record<string, any> | undefined) ?? {})
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
if (ChildWorkflowError.isChildWorkflowError(error)) {
errorOutput.childTraceSpans = error.childTraceSpans
errorOutput.childWorkflowName = error.childWorkflowName
} else if (error && typeof error === 'object' && 'childTraceSpans' in error) {
errorOutput.childTraceSpans = (error as any).childTraceSpans
}
this.state.setBlockOutput(node.id, errorOutput, duration)
if (blockLog) {
blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
blockLog.input = input
blockLog.output = this.filterOutputForLog(block, errorOutput)
}
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
if (error && typeof error === 'object' && 'childTraceSpans' in error) {
errorOutput.childTraceSpans = (error as any).childTraceSpans
}
this.state.setBlockOutput(node.id, errorOutput, duration)
logger.error(
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
{

View File

@@ -4,6 +4,7 @@ import type { TraceSpan } from '@/lib/logs/types'
import type { BlockOutput } from '@/blocks/types'
import { Executor } from '@/executor'
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type {
BlockHandler,
ExecutionContext,
@@ -145,31 +146,30 @@ export class WorkflowBlockHandler implements BlockHandler {
const childWorkflowName = workflowMetadata?.name || workflowId
const originalError = error.message || 'Unknown error'
const wrappedError = new Error(
`Error in child workflow "${childWorkflowName}": ${originalError}`
)
let childTraceSpans: WorkflowTraceSpan[] = []
let executionResult: ExecutionResult | undefined
if (error.executionResult?.logs) {
const executionResult = error.executionResult as ExecutionResult
executionResult = error.executionResult as ExecutionResult
logger.info(`Extracting child trace spans from error.executionResult`, {
hasLogs: (executionResult.logs?.length ?? 0) > 0,
logCount: executionResult.logs?.length ?? 0,
})
const childTraceSpans = this.captureChildWorkflowLogs(
executionResult,
childWorkflowName,
ctx
)
childTraceSpans = this.captureChildWorkflowLogs(executionResult, childWorkflowName, ctx)
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
;(wrappedError as any).childTraceSpans = childTraceSpans
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
;(wrappedError as any).childTraceSpans = error.childTraceSpans
childTraceSpans = error.childTraceSpans
}
throw wrappedError
throw new ChildWorkflowError({
message: `Error in child workflow "${childWorkflowName}": ${originalError}`,
childWorkflowName,
childTraceSpans,
executionResult,
cause: error,
})
}
}
@@ -441,11 +441,11 @@ export class WorkflowBlockHandler implements BlockHandler {
if (!success) {
logger.warn(`Child workflow ${childWorkflowName} failed`)
const error = new Error(
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`
)
;(error as any).childTraceSpans = childTraceSpans || []
throw error
throw new ChildWorkflowError({
message: `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`,
childWorkflowName,
childTraceSpans: childTraceSpans || [],
})
}
return {