Compare commits

..

6 Commits

Author SHA1 Message Date
Vikhyath Mondreti
bf22dd75ad address bugbot comments 2026-01-24 02:13:06 -08:00
Vikhyath Mondreti
eb767b5ede remove more dead code 2026-01-24 01:58:02 -08:00
Vikhyath Mondreti
594bcac5f2 type more code 2026-01-24 01:54:09 -08:00
Vikhyath Mondreti
d3f20311d0 update type check 2026-01-24 01:45:03 -08:00
Vikhyath Mondreti
587d44ad6f remove overly defensive programming 2026-01-24 01:44:53 -08:00
Vikhyath Mondreti
8bf2e69942 fix(child-workflow): nested spans handoff 2026-01-24 01:37:17 -08:00
14 changed files with 139 additions and 117 deletions

View File

@@ -30,6 +30,7 @@ import { normalizeName } from '@/executor/constants'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { Serializer } from '@/serializer'
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
@@ -467,17 +468,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
return NextResponse.json(filteredResult)
} catch (error: any) {
const errorMessage = error.message || 'Unknown error'
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
const executionResult = error.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
return NextResponse.json(
{
success: false,
output: executionResult?.output,
error: executionResult?.error || error.message || 'Execution failed',
error: executionResult?.error || errorMessage || 'Execution failed',
metadata: executionResult?.metadata
? {
duration: executionResult.metadata.duration,
@@ -788,11 +789,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
// Cleanup base64 cache for this execution
await cleanupExecutionBase64Cache(executionId)
} catch (error: any) {
const errorMessage = error.message || 'Unknown error'
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
const executionResult = error.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
sendEvent({
type: 'execution:error',

View File

@@ -16,6 +16,7 @@ import {
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream'
@@ -76,17 +77,6 @@ function normalizeErrorMessage(error: unknown): string {
return WORKFLOW_EXECUTION_FAILURE_MESSAGE
}
function isExecutionResult(value: unknown): value is ExecutionResult {
if (!isRecord(value)) return false
return typeof value.success === 'boolean' && isRecord(value.output)
}
function extractExecutionResult(error: unknown): ExecutionResult | null {
if (!isRecord(error)) return null
const candidate = error.executionResult
return isExecutionResult(candidate) ? candidate : null
}
export function useWorkflowExecution() {
const queryClient = useQueryClient()
const currentWorkflow = useCurrentWorkflow()
@@ -1138,11 +1128,11 @@ export function useWorkflowExecution() {
const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
const normalizedMessage = normalizeErrorMessage(error)
const executionResultFromError = extractExecutionResult(error)
let errorResult: ExecutionResult
if (executionResultFromError) {
if (hasExecutionResult(error)) {
const executionResultFromError = error.executionResult
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []
errorResult = {

View File

@@ -21,7 +21,7 @@ import {
} from '@/lib/workflows/schedules/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
const logger = createLogger('TriggerScheduleExecution')
@@ -231,8 +231,7 @@ async function runWorkflowExecution({
} catch (error: unknown) {
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({

View File

@@ -16,7 +16,7 @@ import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { safeAssign } from '@/tools/safe-assign'
import { getTrigger, isTriggerValid } from '@/triggers'
@@ -578,12 +578,13 @@ async function executeWebhookJobInternal(
deploymentVersionId,
})
const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult || {
success: false,
output: {},
logs: [],
}
const executionResult = hasExecutionResult(error)
? error.executionResult
: {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult)
await loggingSession.safeCompleteWithError({

View File

@@ -9,7 +9,7 @@ import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-m
import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import type { CoreTriggerType } from '@/stores/logs/filters/types'
const logger = createLogger('TriggerWorkflowExecution')
@@ -160,8 +160,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
executionId,
})
const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({

View File

@@ -0,0 +1,31 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { ExecutionResult } from '@/executor/types'
interface ChildWorkflowErrorOptions {
message: string
childWorkflowName: string
childTraceSpans?: TraceSpan[]
executionResult?: ExecutionResult
cause?: Error
}
/**
* Error raised when a child workflow execution fails.
*/
export class ChildWorkflowError extends Error {
readonly childTraceSpans: TraceSpan[]
readonly childWorkflowName: string
readonly executionResult?: ExecutionResult
constructor(options: ChildWorkflowErrorOptions) {
super(options.message, { cause: options.cause })
this.name = 'ChildWorkflowError'
this.childWorkflowName = options.childWorkflowName
this.childTraceSpans = options.childTraceSpans ?? []
this.executionResult = options.executionResult
}
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
return error instanceof ChildWorkflowError
}
}

View File

@@ -13,6 +13,7 @@ import {
isSentinelBlockType,
} from '@/executor/constants'
import type { DAGNode } from '@/executor/dag/builder'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import {
generatePauseContextId,
@@ -213,24 +214,26 @@ export class BlockExecutor {
? resolvedInputs
: ((block.config?.params as Record<string, any> | undefined) ?? {})
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
if (ChildWorkflowError.isChildWorkflowError(error)) {
errorOutput.childTraceSpans = error.childTraceSpans
errorOutput.childWorkflowName = error.childWorkflowName
}
this.state.setBlockOutput(node.id, errorOutput, duration)
if (blockLog) {
blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
blockLog.input = input
blockLog.output = this.filterOutputForLog(block, errorOutput)
}
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
if (error && typeof error === 'object' && 'childTraceSpans' in error) {
errorOutput.childTraceSpans = (error as any).childTraceSpans
}
this.state.setBlockOutput(node.id, errorOutput, duration)
logger.error(
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
{

View File

@@ -13,7 +13,7 @@ import type {
PausePoint,
ResumeStatus,
} from '@/executor/types'
import { normalizeError } from '@/executor/utils/errors'
import { attachExecutionResult, normalizeError } from '@/executor/utils/errors'
const logger = createLogger('ExecutionEngine')
@@ -170,8 +170,8 @@ export class ExecutionEngine {
metadata: this.context.metadata,
}
if (error && typeof error === 'object') {
;(error as any).executionResult = executionResult
if (error instanceof Error) {
attachExecutionResult(error, executionResult)
}
throw error
}

View File

@@ -85,11 +85,7 @@ export class ConditionBlockHandler implements BlockHandler {
const sourceBlockId = ctx.workflow?.connections.find((conn) => conn.target === block.id)?.source
const evalContext = this.buildEvaluationContext(ctx, sourceBlockId)
const rawSourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null
// Filter out _pauseMetadata from source output to prevent the engine from
// thinking this block is pausing (it was already resumed by the HITL block)
const sourceOutput = this.filterPauseMetadata(rawSourceOutput)
const sourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null
const outgoingConnections = ctx.workflow?.connections.filter((conn) => conn.source === block.id)
@@ -129,14 +125,6 @@ export class ConditionBlockHandler implements BlockHandler {
}
}
private filterPauseMetadata(output: any): any {
if (!output || typeof output !== 'object') {
return output
}
const { _pauseMetadata, ...rest } = output
return rest
}
private parseConditions(input: any): Array<{ id: string; title: string; value: string }> {
try {
const conditions = Array.isArray(input) ? input : JSON.parse(input || '[]')

View File

@@ -4,12 +4,14 @@ import type { TraceSpan } from '@/lib/logs/types'
import type { BlockOutput } from '@/blocks/types'
import { Executor } from '@/executor'
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type {
BlockHandler,
ExecutionContext,
ExecutionResult,
StreamingExecution,
} from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
import { parseJSON } from '@/executor/utils/json'
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
@@ -137,39 +139,39 @@ export class WorkflowBlockHandler implements BlockHandler {
)
return mappedResult
} catch (error: any) {
} catch (error: unknown) {
logger.error(`Error executing child workflow ${workflowId}:`, error)
const { workflows } = useWorkflowRegistry.getState()
const workflowMetadata = workflows[workflowId]
const childWorkflowName = workflowMetadata?.name || workflowId
const originalError = error.message || 'Unknown error'
const wrappedError = new Error(
`Error in child workflow "${childWorkflowName}": ${originalError}`
)
const originalError = error instanceof Error ? error.message : 'Unknown error'
let childTraceSpans: WorkflowTraceSpan[] = []
let executionResult: ExecutionResult | undefined
if (error.executionResult?.logs) {
const executionResult = error.executionResult as ExecutionResult
if (hasExecutionResult(error) && error.executionResult.logs) {
executionResult = error.executionResult
logger.info(`Extracting child trace spans from error.executionResult`, {
hasLogs: (executionResult.logs?.length ?? 0) > 0,
logCount: executionResult.logs?.length ?? 0,
})
const childTraceSpans = this.captureChildWorkflowLogs(
executionResult,
childWorkflowName,
ctx
)
childTraceSpans = this.captureChildWorkflowLogs(executionResult, childWorkflowName, ctx)
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
;(wrappedError as any).childTraceSpans = childTraceSpans
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
;(wrappedError as any).childTraceSpans = error.childTraceSpans
} else if (ChildWorkflowError.isChildWorkflowError(error)) {
childTraceSpans = error.childTraceSpans
}
throw wrappedError
throw new ChildWorkflowError({
message: `Error in child workflow "${childWorkflowName}": ${originalError}`,
childWorkflowName,
childTraceSpans,
executionResult,
cause: error instanceof Error ? error : undefined,
})
}
}
@@ -441,11 +443,11 @@ export class WorkflowBlockHandler implements BlockHandler {
if (!success) {
logger.warn(`Child workflow ${childWorkflowName} failed`)
const error = new Error(
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`
)
;(error as any).childTraceSpans = childTraceSpans || []
throw error
throw new ChildWorkflowError({
message: `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`,
childWorkflowName,
childTraceSpans: childTraceSpans || [],
})
}
return {

View File

@@ -1,6 +1,39 @@
import type { ExecutionContext } from '@/executor/types'
import type { ExecutionContext, ExecutionResult } from '@/executor/types'
import type { SerializedBlock } from '@/serializer/types'
/**
* Interface for errors that carry an ExecutionResult.
* Used when workflow execution fails and we want to preserve partial results.
*/
export interface ErrorWithExecutionResult extends Error {
executionResult: ExecutionResult
}
/**
* Type guard to check if an error carries an ExecutionResult.
* Validates that executionResult has required fields (success, output).
*/
export function hasExecutionResult(error: unknown): error is ErrorWithExecutionResult {
if (
!(error instanceof Error) ||
!('executionResult' in error) ||
error.executionResult == null ||
typeof error.executionResult !== 'object'
) {
return false
}
const result = error.executionResult as Record<string, unknown>
return typeof result.success === 'boolean' && result.output != null
}
/**
* Attaches an ExecutionResult to an error for propagation to parent workflows.
*/
export function attachExecutionResult(error: Error, executionResult: ExecutionResult): void {
Object.assign(error, { executionResult })
}
export interface BlockExecutionErrorDetails {
block: SerializedBlock
error: Error | string

View File

@@ -100,8 +100,13 @@ export function useExecutionStream() {
})
if (!response.ok) {
const error = await response.json()
throw new Error(error.error || 'Failed to start execution')
const errorResponse = await response.json()
const error = new Error(errorResponse.error || 'Failed to start execution')
// Attach the execution result from server response for error handling
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
}
if (!response.body) {

View File

@@ -24,6 +24,7 @@ import type {
IterationContext,
} from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -383,20 +384,15 @@ export async function executeWorkflowCore(
} catch (error: unknown) {
logger.error(`[${requestId}] Execution failed:`, error)
const errorWithResult = error as {
executionResult?: ExecutionResult
message?: string
stack?: string
}
const executionResult = errorWithResult?.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: executionResult?.metadata?.duration || 0,
error: {
message: errorWithResult?.message || 'Execution failed',
stackTrace: errorWithResult?.stack,
message: error instanceof Error ? error.message : 'Execution failed',
stackTrace: error instanceof Error ? error.stack : undefined,
},
traceSpans,
})

View File

@@ -567,32 +567,6 @@ export class PauseResumeManager {
stateCopy.blockStates[stateBlockKey] = pauseBlockState
// Update the block log entry with the merged output so logs show the submission data
if (Array.isArray(stateCopy.blockLogs)) {
const blockLogIndex = stateCopy.blockLogs.findIndex(
(log: { blockId: string }) =>
log.blockId === stateBlockKey ||
log.blockId === pauseBlockId ||
log.blockId === contextId
)
if (blockLogIndex !== -1) {
// Filter output for logging (exclude internal fields and response)
const filteredOutput: Record<string, unknown> = {}
for (const [key, value] of Object.entries(mergedOutput)) {
if (key.startsWith('_')) continue
if (key === 'response') continue
filteredOutput[key] = value
}
stateCopy.blockLogs[blockLogIndex] = {
...stateCopy.blockLogs[blockLogIndex],
blockId: stateBlockKey,
output: filteredOutput,
durationMs: pauseDurationMs,
endedAt: new Date().toISOString(),
}
}
}
if (Array.isArray(stateCopy.executedBlocks)) {
const filtered = stateCopy.executedBlocks.filter(
(id: string) => id !== pauseBlockId && id !== contextId