Compare commits

...

6 Commits

Author SHA1 Message Date
Vikhyath Mondreti
bf22dd75ad address bugbot comments 2026-01-24 02:13:06 -08:00
Vikhyath Mondreti
eb767b5ede remove more dead code 2026-01-24 01:58:02 -08:00
Vikhyath Mondreti
594bcac5f2 type more code 2026-01-24 01:54:09 -08:00
Vikhyath Mondreti
d3f20311d0 update type check 2026-01-24 01:45:03 -08:00
Vikhyath Mondreti
587d44ad6f remove overly defensive programming 2026-01-24 01:44:53 -08:00
Vikhyath Mondreti
8bf2e69942 fix(child-workflow): nested spans handoff 2026-01-24 01:37:17 -08:00
12 changed files with 138 additions and 78 deletions

View File

@@ -30,6 +30,7 @@ import { normalizeName } from '@/executor/constants'
import { ExecutionSnapshot } from '@/executor/execution/snapshot' import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types' import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types' import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { Serializer } from '@/serializer' import { Serializer } from '@/serializer'
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types' import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
@@ -467,17 +468,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
} }
return NextResponse.json(filteredResult) return NextResponse.json(filteredResult)
} catch (error: any) { } catch (error: unknown) {
const errorMessage = error.message || 'Unknown error' const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`) logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
const executionResult = error.executionResult const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
return NextResponse.json( return NextResponse.json(
{ {
success: false, success: false,
output: executionResult?.output, output: executionResult?.output,
error: executionResult?.error || error.message || 'Execution failed', error: executionResult?.error || errorMessage || 'Execution failed',
metadata: executionResult?.metadata metadata: executionResult?.metadata
? { ? {
duration: executionResult.metadata.duration, duration: executionResult.metadata.duration,
@@ -788,11 +789,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
// Cleanup base64 cache for this execution // Cleanup base64 cache for this execution
await cleanupExecutionBase64Cache(executionId) await cleanupExecutionBase64Cache(executionId)
} catch (error: any) { } catch (error: unknown) {
const errorMessage = error.message || 'Unknown error' const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`) logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
const executionResult = error.executionResult const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
sendEvent({ sendEvent({
type: 'execution:error', type: 'execution:error',

View File

@@ -16,6 +16,7 @@ import {
} from '@/lib/workflows/triggers/triggers' } from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow' import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types' import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block' import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription' import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream' import { useExecutionStream } from '@/hooks/use-execution-stream'
@@ -76,17 +77,6 @@ function normalizeErrorMessage(error: unknown): string {
return WORKFLOW_EXECUTION_FAILURE_MESSAGE return WORKFLOW_EXECUTION_FAILURE_MESSAGE
} }
function isExecutionResult(value: unknown): value is ExecutionResult {
if (!isRecord(value)) return false
return typeof value.success === 'boolean' && isRecord(value.output)
}
function extractExecutionResult(error: unknown): ExecutionResult | null {
if (!isRecord(error)) return null
const candidate = error.executionResult
return isExecutionResult(candidate) ? candidate : null
}
export function useWorkflowExecution() { export function useWorkflowExecution() {
const queryClient = useQueryClient() const queryClient = useQueryClient()
const currentWorkflow = useCurrentWorkflow() const currentWorkflow = useCurrentWorkflow()
@@ -1138,11 +1128,11 @@ export function useWorkflowExecution() {
const handleExecutionError = (error: unknown, options?: { executionId?: string }) => { const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
const normalizedMessage = normalizeErrorMessage(error) const normalizedMessage = normalizeErrorMessage(error)
const executionResultFromError = extractExecutionResult(error)
let errorResult: ExecutionResult let errorResult: ExecutionResult
if (executionResultFromError) { if (hasExecutionResult(error)) {
const executionResultFromError = error.executionResult
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : [] const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []
errorResult = { errorResult = {

View File

@@ -21,7 +21,7 @@ import {
} from '@/lib/workflows/schedules/utils' } from '@/lib/workflows/schedules/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot' import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types' import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types' import { hasExecutionResult } from '@/executor/utils/errors'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants' import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
const logger = createLogger('TriggerScheduleExecution') const logger = createLogger('TriggerScheduleExecution')
@@ -231,8 +231,7 @@ async function runWorkflowExecution({
} catch (error: unknown) { } catch (error: unknown) {
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error) logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
const errorWithResult = error as { executionResult?: ExecutionResult } const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const executionResult = errorWithResult?.executionResult
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] } const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({ await loggingSession.safeCompleteWithError({

View File

@@ -16,7 +16,7 @@ import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
import { getWorkflowById } from '@/lib/workflows/utils' import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot' import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types' import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types' import { hasExecutionResult } from '@/executor/utils/errors'
import { safeAssign } from '@/tools/safe-assign' import { safeAssign } from '@/tools/safe-assign'
import { getTrigger, isTriggerValid } from '@/triggers' import { getTrigger, isTriggerValid } from '@/triggers'
@@ -578,12 +578,13 @@ async function executeWebhookJobInternal(
deploymentVersionId, deploymentVersionId,
}) })
const errorWithResult = error as { executionResult?: ExecutionResult } const executionResult = hasExecutionResult(error)
const executionResult = errorWithResult?.executionResult || { ? error.executionResult
success: false, : {
output: {}, success: false,
logs: [], output: {},
} logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult) const { traceSpans } = buildTraceSpans(executionResult)
await loggingSession.safeCompleteWithError({ await loggingSession.safeCompleteWithError({

View File

@@ -9,7 +9,7 @@ import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-m
import { getWorkflowById } from '@/lib/workflows/utils' import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot' import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types' import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types' import { hasExecutionResult } from '@/executor/utils/errors'
import type { CoreTriggerType } from '@/stores/logs/filters/types' import type { CoreTriggerType } from '@/stores/logs/filters/types'
const logger = createLogger('TriggerWorkflowExecution') const logger = createLogger('TriggerWorkflowExecution')
@@ -160,8 +160,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
executionId, executionId,
}) })
const errorWithResult = error as { executionResult?: ExecutionResult } const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const executionResult = errorWithResult?.executionResult
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] } const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({ await loggingSession.safeCompleteWithError({

View File

@@ -0,0 +1,31 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { ExecutionResult } from '@/executor/types'
interface ChildWorkflowErrorOptions {
message: string
childWorkflowName: string
childTraceSpans?: TraceSpan[]
executionResult?: ExecutionResult
cause?: Error
}
/**
* Error raised when a child workflow execution fails.
*/
export class ChildWorkflowError extends Error {
readonly childTraceSpans: TraceSpan[]
readonly childWorkflowName: string
readonly executionResult?: ExecutionResult
constructor(options: ChildWorkflowErrorOptions) {
super(options.message, { cause: options.cause })
this.name = 'ChildWorkflowError'
this.childWorkflowName = options.childWorkflowName
this.childTraceSpans = options.childTraceSpans ?? []
this.executionResult = options.executionResult
}
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
return error instanceof ChildWorkflowError
}
}

View File

@@ -13,6 +13,7 @@ import {
isSentinelBlockType, isSentinelBlockType,
} from '@/executor/constants' } from '@/executor/constants'
import type { DAGNode } from '@/executor/dag/builder' import type { DAGNode } from '@/executor/dag/builder'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types' import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import { import {
generatePauseContextId, generatePauseContextId,
@@ -213,24 +214,26 @@ export class BlockExecutor {
? resolvedInputs ? resolvedInputs
: ((block.config?.params as Record<string, any> | undefined) ?? {}) : ((block.config?.params as Record<string, any> | undefined) ?? {})
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
if (ChildWorkflowError.isChildWorkflowError(error)) {
errorOutput.childTraceSpans = error.childTraceSpans
errorOutput.childWorkflowName = error.childWorkflowName
}
this.state.setBlockOutput(node.id, errorOutput, duration)
if (blockLog) { if (blockLog) {
blockLog.endedAt = new Date().toISOString() blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration blockLog.durationMs = duration
blockLog.success = false blockLog.success = false
blockLog.error = errorMessage blockLog.error = errorMessage
blockLog.input = input blockLog.input = input
blockLog.output = this.filterOutputForLog(block, errorOutput)
} }
const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}
if (error && typeof error === 'object' && 'childTraceSpans' in error) {
errorOutput.childTraceSpans = (error as any).childTraceSpans
}
this.state.setBlockOutput(node.id, errorOutput, duration)
logger.error( logger.error(
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed', phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
{ {

View File

@@ -13,7 +13,7 @@ import type {
PausePoint, PausePoint,
ResumeStatus, ResumeStatus,
} from '@/executor/types' } from '@/executor/types'
import { normalizeError } from '@/executor/utils/errors' import { attachExecutionResult, normalizeError } from '@/executor/utils/errors'
const logger = createLogger('ExecutionEngine') const logger = createLogger('ExecutionEngine')
@@ -170,8 +170,8 @@ export class ExecutionEngine {
metadata: this.context.metadata, metadata: this.context.metadata,
} }
if (error && typeof error === 'object') { if (error instanceof Error) {
;(error as any).executionResult = executionResult attachExecutionResult(error, executionResult)
} }
throw error throw error
} }

View File

@@ -4,12 +4,14 @@ import type { TraceSpan } from '@/lib/logs/types'
import type { BlockOutput } from '@/blocks/types' import type { BlockOutput } from '@/blocks/types'
import { Executor } from '@/executor' import { Executor } from '@/executor'
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants' import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type { import type {
BlockHandler, BlockHandler,
ExecutionContext, ExecutionContext,
ExecutionResult, ExecutionResult,
StreamingExecution, StreamingExecution,
} from '@/executor/types' } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http' import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
import { parseJSON } from '@/executor/utils/json' import { parseJSON } from '@/executor/utils/json'
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup' import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
@@ -137,39 +139,39 @@ export class WorkflowBlockHandler implements BlockHandler {
) )
return mappedResult return mappedResult
} catch (error: any) { } catch (error: unknown) {
logger.error(`Error executing child workflow ${workflowId}:`, error) logger.error(`Error executing child workflow ${workflowId}:`, error)
const { workflows } = useWorkflowRegistry.getState() const { workflows } = useWorkflowRegistry.getState()
const workflowMetadata = workflows[workflowId] const workflowMetadata = workflows[workflowId]
const childWorkflowName = workflowMetadata?.name || workflowId const childWorkflowName = workflowMetadata?.name || workflowId
const originalError = error.message || 'Unknown error' const originalError = error instanceof Error ? error.message : 'Unknown error'
const wrappedError = new Error( let childTraceSpans: WorkflowTraceSpan[] = []
`Error in child workflow "${childWorkflowName}": ${originalError}` let executionResult: ExecutionResult | undefined
)
if (error.executionResult?.logs) { if (hasExecutionResult(error) && error.executionResult.logs) {
const executionResult = error.executionResult as ExecutionResult executionResult = error.executionResult
logger.info(`Extracting child trace spans from error.executionResult`, { logger.info(`Extracting child trace spans from error.executionResult`, {
hasLogs: (executionResult.logs?.length ?? 0) > 0, hasLogs: (executionResult.logs?.length ?? 0) > 0,
logCount: executionResult.logs?.length ?? 0, logCount: executionResult.logs?.length ?? 0,
}) })
const childTraceSpans = this.captureChildWorkflowLogs( childTraceSpans = this.captureChildWorkflowLogs(executionResult, childWorkflowName, ctx)
executionResult,
childWorkflowName,
ctx
)
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`) logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
;(wrappedError as any).childTraceSpans = childTraceSpans } else if (ChildWorkflowError.isChildWorkflowError(error)) {
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) { childTraceSpans = error.childTraceSpans
;(wrappedError as any).childTraceSpans = error.childTraceSpans
} }
throw wrappedError throw new ChildWorkflowError({
message: `Error in child workflow "${childWorkflowName}": ${originalError}`,
childWorkflowName,
childTraceSpans,
executionResult,
cause: error instanceof Error ? error : undefined,
})
} }
} }
@@ -441,11 +443,11 @@ export class WorkflowBlockHandler implements BlockHandler {
if (!success) { if (!success) {
logger.warn(`Child workflow ${childWorkflowName} failed`) logger.warn(`Child workflow ${childWorkflowName} failed`)
const error = new Error( throw new ChildWorkflowError({
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}` message: `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`,
) childWorkflowName,
;(error as any).childTraceSpans = childTraceSpans || [] childTraceSpans: childTraceSpans || [],
throw error })
} }
return { return {

View File

@@ -1,6 +1,39 @@
import type { ExecutionContext } from '@/executor/types' import type { ExecutionContext, ExecutionResult } from '@/executor/types'
import type { SerializedBlock } from '@/serializer/types' import type { SerializedBlock } from '@/serializer/types'
/**
* Interface for errors that carry an ExecutionResult.
* Used when workflow execution fails and we want to preserve partial results.
*/
export interface ErrorWithExecutionResult extends Error {
executionResult: ExecutionResult
}
/**
* Type guard to check if an error carries an ExecutionResult.
* Validates that executionResult has required fields (success, output).
*/
export function hasExecutionResult(error: unknown): error is ErrorWithExecutionResult {
if (
!(error instanceof Error) ||
!('executionResult' in error) ||
error.executionResult == null ||
typeof error.executionResult !== 'object'
) {
return false
}
const result = error.executionResult as Record<string, unknown>
return typeof result.success === 'boolean' && result.output != null
}
/**
* Attaches an ExecutionResult to an error for propagation to parent workflows.
*/
export function attachExecutionResult(error: Error, executionResult: ExecutionResult): void {
Object.assign(error, { executionResult })
}
export interface BlockExecutionErrorDetails { export interface BlockExecutionErrorDetails {
block: SerializedBlock block: SerializedBlock
error: Error | string error: Error | string

View File

@@ -100,8 +100,13 @@ export function useExecutionStream() {
}) })
if (!response.ok) { if (!response.ok) {
const error = await response.json() const errorResponse = await response.json()
throw new Error(error.error || 'Failed to start execution') const error = new Error(errorResponse.error || 'Failed to start execution')
// Attach the execution result from server response for error handling
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
} }
if (!response.body) { if (!response.body) {

View File

@@ -24,6 +24,7 @@ import type {
IterationContext, IterationContext,
} from '@/executor/execution/types' } from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types' import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { Serializer } from '@/serializer' import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils' import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -383,20 +384,15 @@ export async function executeWorkflowCore(
} catch (error: unknown) { } catch (error: unknown) {
logger.error(`[${requestId}] Execution failed:`, error) logger.error(`[${requestId}] Execution failed:`, error)
const errorWithResult = error as { const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
executionResult?: ExecutionResult
message?: string
stack?: string
}
const executionResult = errorWithResult?.executionResult
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] } const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({ await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(), endedAt: new Date().toISOString(),
totalDurationMs: executionResult?.metadata?.duration || 0, totalDurationMs: executionResult?.metadata?.duration || 0,
error: { error: {
message: errorWithResult?.message || 'Execution failed', message: error instanceof Error ? error.message : 'Execution failed',
stackTrace: errorWithResult?.stack, stackTrace: error instanceof Error ? error.stack : undefined,
}, },
traceSpans, traceSpans,
}) })