mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-24 14:27:56 -05:00
Compare commits
6 Commits
fix/hitl
...
fix/nested
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bf22dd75ad | ||
|
|
eb767b5ede | ||
|
|
594bcac5f2 | ||
|
|
d3f20311d0 | ||
|
|
587d44ad6f | ||
|
|
8bf2e69942 |
@@ -30,6 +30,7 @@ import { normalizeName } from '@/executor/constants'
|
|||||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||||
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
|
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
|
||||||
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
|
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
|
||||||
|
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||||
import { Serializer } from '@/serializer'
|
import { Serializer } from '@/serializer'
|
||||||
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
|
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
|
||||||
|
|
||||||
@@ -467,17 +468,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
|||||||
}
|
}
|
||||||
|
|
||||||
return NextResponse.json(filteredResult)
|
return NextResponse.json(filteredResult)
|
||||||
} catch (error: any) {
|
} catch (error: unknown) {
|
||||||
const errorMessage = error.message || 'Unknown error'
|
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||||
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
|
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
|
||||||
|
|
||||||
const executionResult = error.executionResult
|
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||||
|
|
||||||
return NextResponse.json(
|
return NextResponse.json(
|
||||||
{
|
{
|
||||||
success: false,
|
success: false,
|
||||||
output: executionResult?.output,
|
output: executionResult?.output,
|
||||||
error: executionResult?.error || error.message || 'Execution failed',
|
error: executionResult?.error || errorMessage || 'Execution failed',
|
||||||
metadata: executionResult?.metadata
|
metadata: executionResult?.metadata
|
||||||
? {
|
? {
|
||||||
duration: executionResult.metadata.duration,
|
duration: executionResult.metadata.duration,
|
||||||
@@ -788,11 +789,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
|||||||
|
|
||||||
// Cleanup base64 cache for this execution
|
// Cleanup base64 cache for this execution
|
||||||
await cleanupExecutionBase64Cache(executionId)
|
await cleanupExecutionBase64Cache(executionId)
|
||||||
} catch (error: any) {
|
} catch (error: unknown) {
|
||||||
const errorMessage = error.message || 'Unknown error'
|
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||||
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
|
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
|
||||||
|
|
||||||
const executionResult = error.executionResult
|
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||||
|
|
||||||
sendEvent({
|
sendEvent({
|
||||||
type: 'execution:error',
|
type: 'execution:error',
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import {
|
|||||||
} from '@/lib/workflows/triggers/triggers'
|
} from '@/lib/workflows/triggers/triggers'
|
||||||
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
||||||
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
|
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
|
||||||
|
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||||
import { coerceValue } from '@/executor/utils/start-block'
|
import { coerceValue } from '@/executor/utils/start-block'
|
||||||
import { subscriptionKeys } from '@/hooks/queries/subscription'
|
import { subscriptionKeys } from '@/hooks/queries/subscription'
|
||||||
import { useExecutionStream } from '@/hooks/use-execution-stream'
|
import { useExecutionStream } from '@/hooks/use-execution-stream'
|
||||||
@@ -76,17 +77,6 @@ function normalizeErrorMessage(error: unknown): string {
|
|||||||
return WORKFLOW_EXECUTION_FAILURE_MESSAGE
|
return WORKFLOW_EXECUTION_FAILURE_MESSAGE
|
||||||
}
|
}
|
||||||
|
|
||||||
function isExecutionResult(value: unknown): value is ExecutionResult {
|
|
||||||
if (!isRecord(value)) return false
|
|
||||||
return typeof value.success === 'boolean' && isRecord(value.output)
|
|
||||||
}
|
|
||||||
|
|
||||||
function extractExecutionResult(error: unknown): ExecutionResult | null {
|
|
||||||
if (!isRecord(error)) return null
|
|
||||||
const candidate = error.executionResult
|
|
||||||
return isExecutionResult(candidate) ? candidate : null
|
|
||||||
}
|
|
||||||
|
|
||||||
export function useWorkflowExecution() {
|
export function useWorkflowExecution() {
|
||||||
const queryClient = useQueryClient()
|
const queryClient = useQueryClient()
|
||||||
const currentWorkflow = useCurrentWorkflow()
|
const currentWorkflow = useCurrentWorkflow()
|
||||||
@@ -1138,11 +1128,11 @@ export function useWorkflowExecution() {
|
|||||||
|
|
||||||
const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
|
const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
|
||||||
const normalizedMessage = normalizeErrorMessage(error)
|
const normalizedMessage = normalizeErrorMessage(error)
|
||||||
const executionResultFromError = extractExecutionResult(error)
|
|
||||||
|
|
||||||
let errorResult: ExecutionResult
|
let errorResult: ExecutionResult
|
||||||
|
|
||||||
if (executionResultFromError) {
|
if (hasExecutionResult(error)) {
|
||||||
|
const executionResultFromError = error.executionResult
|
||||||
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []
|
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []
|
||||||
|
|
||||||
errorResult = {
|
errorResult = {
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ import {
|
|||||||
} from '@/lib/workflows/schedules/utils'
|
} from '@/lib/workflows/schedules/utils'
|
||||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||||
import type { ExecutionResult } from '@/executor/types'
|
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||||
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
|
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
|
||||||
|
|
||||||
const logger = createLogger('TriggerScheduleExecution')
|
const logger = createLogger('TriggerScheduleExecution')
|
||||||
@@ -231,8 +231,7 @@ async function runWorkflowExecution({
|
|||||||
} catch (error: unknown) {
|
} catch (error: unknown) {
|
||||||
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
|
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
|
||||||
|
|
||||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||||
const executionResult = errorWithResult?.executionResult
|
|
||||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||||
|
|
||||||
await loggingSession.safeCompleteWithError({
|
await loggingSession.safeCompleteWithError({
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
|
|||||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||||
import type { ExecutionResult } from '@/executor/types'
|
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||||
import { safeAssign } from '@/tools/safe-assign'
|
import { safeAssign } from '@/tools/safe-assign'
|
||||||
import { getTrigger, isTriggerValid } from '@/triggers'
|
import { getTrigger, isTriggerValid } from '@/triggers'
|
||||||
|
|
||||||
@@ -578,12 +578,13 @@ async function executeWebhookJobInternal(
|
|||||||
deploymentVersionId,
|
deploymentVersionId,
|
||||||
})
|
})
|
||||||
|
|
||||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
const executionResult = hasExecutionResult(error)
|
||||||
const executionResult = errorWithResult?.executionResult || {
|
? error.executionResult
|
||||||
success: false,
|
: {
|
||||||
output: {},
|
success: false,
|
||||||
logs: [],
|
output: {},
|
||||||
}
|
logs: [],
|
||||||
|
}
|
||||||
const { traceSpans } = buildTraceSpans(executionResult)
|
const { traceSpans } = buildTraceSpans(executionResult)
|
||||||
|
|
||||||
await loggingSession.safeCompleteWithError({
|
await loggingSession.safeCompleteWithError({
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-m
|
|||||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||||
import type { ExecutionResult } from '@/executor/types'
|
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||||
import type { CoreTriggerType } from '@/stores/logs/filters/types'
|
import type { CoreTriggerType } from '@/stores/logs/filters/types'
|
||||||
|
|
||||||
const logger = createLogger('TriggerWorkflowExecution')
|
const logger = createLogger('TriggerWorkflowExecution')
|
||||||
@@ -160,8 +160,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
|
|||||||
executionId,
|
executionId,
|
||||||
})
|
})
|
||||||
|
|
||||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||||
const executionResult = errorWithResult?.executionResult
|
|
||||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||||
|
|
||||||
await loggingSession.safeCompleteWithError({
|
await loggingSession.safeCompleteWithError({
|
||||||
|
|||||||
31
apps/sim/executor/errors/child-workflow-error.ts
Normal file
31
apps/sim/executor/errors/child-workflow-error.ts
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
import type { TraceSpan } from '@/lib/logs/types'
|
||||||
|
import type { ExecutionResult } from '@/executor/types'
|
||||||
|
|
||||||
|
interface ChildWorkflowErrorOptions {
|
||||||
|
message: string
|
||||||
|
childWorkflowName: string
|
||||||
|
childTraceSpans?: TraceSpan[]
|
||||||
|
executionResult?: ExecutionResult
|
||||||
|
cause?: Error
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Error raised when a child workflow execution fails.
|
||||||
|
*/
|
||||||
|
export class ChildWorkflowError extends Error {
|
||||||
|
readonly childTraceSpans: TraceSpan[]
|
||||||
|
readonly childWorkflowName: string
|
||||||
|
readonly executionResult?: ExecutionResult
|
||||||
|
|
||||||
|
constructor(options: ChildWorkflowErrorOptions) {
|
||||||
|
super(options.message, { cause: options.cause })
|
||||||
|
this.name = 'ChildWorkflowError'
|
||||||
|
this.childWorkflowName = options.childWorkflowName
|
||||||
|
this.childTraceSpans = options.childTraceSpans ?? []
|
||||||
|
this.executionResult = options.executionResult
|
||||||
|
}
|
||||||
|
|
||||||
|
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
|
||||||
|
return error instanceof ChildWorkflowError
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -13,6 +13,7 @@ import {
|
|||||||
isSentinelBlockType,
|
isSentinelBlockType,
|
||||||
} from '@/executor/constants'
|
} from '@/executor/constants'
|
||||||
import type { DAGNode } from '@/executor/dag/builder'
|
import type { DAGNode } from '@/executor/dag/builder'
|
||||||
|
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
|
||||||
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
|
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
|
||||||
import {
|
import {
|
||||||
generatePauseContextId,
|
generatePauseContextId,
|
||||||
@@ -213,24 +214,26 @@ export class BlockExecutor {
|
|||||||
? resolvedInputs
|
? resolvedInputs
|
||||||
: ((block.config?.params as Record<string, any> | undefined) ?? {})
|
: ((block.config?.params as Record<string, any> | undefined) ?? {})
|
||||||
|
|
||||||
|
const errorOutput: NormalizedBlockOutput = {
|
||||||
|
error: errorMessage,
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ChildWorkflowError.isChildWorkflowError(error)) {
|
||||||
|
errorOutput.childTraceSpans = error.childTraceSpans
|
||||||
|
errorOutput.childWorkflowName = error.childWorkflowName
|
||||||
|
}
|
||||||
|
|
||||||
|
this.state.setBlockOutput(node.id, errorOutput, duration)
|
||||||
|
|
||||||
if (blockLog) {
|
if (blockLog) {
|
||||||
blockLog.endedAt = new Date().toISOString()
|
blockLog.endedAt = new Date().toISOString()
|
||||||
blockLog.durationMs = duration
|
blockLog.durationMs = duration
|
||||||
blockLog.success = false
|
blockLog.success = false
|
||||||
blockLog.error = errorMessage
|
blockLog.error = errorMessage
|
||||||
blockLog.input = input
|
blockLog.input = input
|
||||||
|
blockLog.output = this.filterOutputForLog(block, errorOutput)
|
||||||
}
|
}
|
||||||
|
|
||||||
const errorOutput: NormalizedBlockOutput = {
|
|
||||||
error: errorMessage,
|
|
||||||
}
|
|
||||||
|
|
||||||
if (error && typeof error === 'object' && 'childTraceSpans' in error) {
|
|
||||||
errorOutput.childTraceSpans = (error as any).childTraceSpans
|
|
||||||
}
|
|
||||||
|
|
||||||
this.state.setBlockOutput(node.id, errorOutput, duration)
|
|
||||||
|
|
||||||
logger.error(
|
logger.error(
|
||||||
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
|
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ import type {
|
|||||||
PausePoint,
|
PausePoint,
|
||||||
ResumeStatus,
|
ResumeStatus,
|
||||||
} from '@/executor/types'
|
} from '@/executor/types'
|
||||||
import { normalizeError } from '@/executor/utils/errors'
|
import { attachExecutionResult, normalizeError } from '@/executor/utils/errors'
|
||||||
|
|
||||||
const logger = createLogger('ExecutionEngine')
|
const logger = createLogger('ExecutionEngine')
|
||||||
|
|
||||||
@@ -170,8 +170,8 @@ export class ExecutionEngine {
|
|||||||
metadata: this.context.metadata,
|
metadata: this.context.metadata,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (error && typeof error === 'object') {
|
if (error instanceof Error) {
|
||||||
;(error as any).executionResult = executionResult
|
attachExecutionResult(error, executionResult)
|
||||||
}
|
}
|
||||||
throw error
|
throw error
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,12 +4,14 @@ import type { TraceSpan } from '@/lib/logs/types'
|
|||||||
import type { BlockOutput } from '@/blocks/types'
|
import type { BlockOutput } from '@/blocks/types'
|
||||||
import { Executor } from '@/executor'
|
import { Executor } from '@/executor'
|
||||||
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
|
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
|
||||||
|
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
|
||||||
import type {
|
import type {
|
||||||
BlockHandler,
|
BlockHandler,
|
||||||
ExecutionContext,
|
ExecutionContext,
|
||||||
ExecutionResult,
|
ExecutionResult,
|
||||||
StreamingExecution,
|
StreamingExecution,
|
||||||
} from '@/executor/types'
|
} from '@/executor/types'
|
||||||
|
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||||
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
|
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
|
||||||
import { parseJSON } from '@/executor/utils/json'
|
import { parseJSON } from '@/executor/utils/json'
|
||||||
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
|
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
|
||||||
@@ -137,39 +139,39 @@ export class WorkflowBlockHandler implements BlockHandler {
|
|||||||
)
|
)
|
||||||
|
|
||||||
return mappedResult
|
return mappedResult
|
||||||
} catch (error: any) {
|
} catch (error: unknown) {
|
||||||
logger.error(`Error executing child workflow ${workflowId}:`, error)
|
logger.error(`Error executing child workflow ${workflowId}:`, error)
|
||||||
|
|
||||||
const { workflows } = useWorkflowRegistry.getState()
|
const { workflows } = useWorkflowRegistry.getState()
|
||||||
const workflowMetadata = workflows[workflowId]
|
const workflowMetadata = workflows[workflowId]
|
||||||
const childWorkflowName = workflowMetadata?.name || workflowId
|
const childWorkflowName = workflowMetadata?.name || workflowId
|
||||||
|
|
||||||
const originalError = error.message || 'Unknown error'
|
const originalError = error instanceof Error ? error.message : 'Unknown error'
|
||||||
const wrappedError = new Error(
|
let childTraceSpans: WorkflowTraceSpan[] = []
|
||||||
`Error in child workflow "${childWorkflowName}": ${originalError}`
|
let executionResult: ExecutionResult | undefined
|
||||||
)
|
|
||||||
|
|
||||||
if (error.executionResult?.logs) {
|
if (hasExecutionResult(error) && error.executionResult.logs) {
|
||||||
const executionResult = error.executionResult as ExecutionResult
|
executionResult = error.executionResult
|
||||||
|
|
||||||
logger.info(`Extracting child trace spans from error.executionResult`, {
|
logger.info(`Extracting child trace spans from error.executionResult`, {
|
||||||
hasLogs: (executionResult.logs?.length ?? 0) > 0,
|
hasLogs: (executionResult.logs?.length ?? 0) > 0,
|
||||||
logCount: executionResult.logs?.length ?? 0,
|
logCount: executionResult.logs?.length ?? 0,
|
||||||
})
|
})
|
||||||
|
|
||||||
const childTraceSpans = this.captureChildWorkflowLogs(
|
childTraceSpans = this.captureChildWorkflowLogs(executionResult, childWorkflowName, ctx)
|
||||||
executionResult,
|
|
||||||
childWorkflowName,
|
|
||||||
ctx
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
|
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
|
||||||
;(wrappedError as any).childTraceSpans = childTraceSpans
|
} else if (ChildWorkflowError.isChildWorkflowError(error)) {
|
||||||
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
|
childTraceSpans = error.childTraceSpans
|
||||||
;(wrappedError as any).childTraceSpans = error.childTraceSpans
|
|
||||||
}
|
}
|
||||||
|
|
||||||
throw wrappedError
|
throw new ChildWorkflowError({
|
||||||
|
message: `Error in child workflow "${childWorkflowName}": ${originalError}`,
|
||||||
|
childWorkflowName,
|
||||||
|
childTraceSpans,
|
||||||
|
executionResult,
|
||||||
|
cause: error instanceof Error ? error : undefined,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -441,11 +443,11 @@ export class WorkflowBlockHandler implements BlockHandler {
|
|||||||
|
|
||||||
if (!success) {
|
if (!success) {
|
||||||
logger.warn(`Child workflow ${childWorkflowName} failed`)
|
logger.warn(`Child workflow ${childWorkflowName} failed`)
|
||||||
const error = new Error(
|
throw new ChildWorkflowError({
|
||||||
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`
|
message: `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`,
|
||||||
)
|
childWorkflowName,
|
||||||
;(error as any).childTraceSpans = childTraceSpans || []
|
childTraceSpans: childTraceSpans || [],
|
||||||
throw error
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -1,6 +1,39 @@
|
|||||||
import type { ExecutionContext } from '@/executor/types'
|
import type { ExecutionContext, ExecutionResult } from '@/executor/types'
|
||||||
import type { SerializedBlock } from '@/serializer/types'
|
import type { SerializedBlock } from '@/serializer/types'
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface for errors that carry an ExecutionResult.
|
||||||
|
* Used when workflow execution fails and we want to preserve partial results.
|
||||||
|
*/
|
||||||
|
export interface ErrorWithExecutionResult extends Error {
|
||||||
|
executionResult: ExecutionResult
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Type guard to check if an error carries an ExecutionResult.
|
||||||
|
* Validates that executionResult has required fields (success, output).
|
||||||
|
*/
|
||||||
|
export function hasExecutionResult(error: unknown): error is ErrorWithExecutionResult {
|
||||||
|
if (
|
||||||
|
!(error instanceof Error) ||
|
||||||
|
!('executionResult' in error) ||
|
||||||
|
error.executionResult == null ||
|
||||||
|
typeof error.executionResult !== 'object'
|
||||||
|
) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = error.executionResult as Record<string, unknown>
|
||||||
|
return typeof result.success === 'boolean' && result.output != null
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attaches an ExecutionResult to an error for propagation to parent workflows.
|
||||||
|
*/
|
||||||
|
export function attachExecutionResult(error: Error, executionResult: ExecutionResult): void {
|
||||||
|
Object.assign(error, { executionResult })
|
||||||
|
}
|
||||||
|
|
||||||
export interface BlockExecutionErrorDetails {
|
export interface BlockExecutionErrorDetails {
|
||||||
block: SerializedBlock
|
block: SerializedBlock
|
||||||
error: Error | string
|
error: Error | string
|
||||||
|
|||||||
@@ -100,8 +100,13 @@ export function useExecutionStream() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
if (!response.ok) {
|
if (!response.ok) {
|
||||||
const error = await response.json()
|
const errorResponse = await response.json()
|
||||||
throw new Error(error.error || 'Failed to start execution')
|
const error = new Error(errorResponse.error || 'Failed to start execution')
|
||||||
|
// Attach the execution result from server response for error handling
|
||||||
|
if (errorResponse && typeof errorResponse === 'object') {
|
||||||
|
Object.assign(error, { executionResult: errorResponse })
|
||||||
|
}
|
||||||
|
throw error
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!response.body) {
|
if (!response.body) {
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import type {
|
|||||||
IterationContext,
|
IterationContext,
|
||||||
} from '@/executor/execution/types'
|
} from '@/executor/execution/types'
|
||||||
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
|
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
|
||||||
|
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||||
import { Serializer } from '@/serializer'
|
import { Serializer } from '@/serializer'
|
||||||
import { mergeSubblockState } from '@/stores/workflows/server-utils'
|
import { mergeSubblockState } from '@/stores/workflows/server-utils'
|
||||||
|
|
||||||
@@ -383,20 +384,15 @@ export async function executeWorkflowCore(
|
|||||||
} catch (error: unknown) {
|
} catch (error: unknown) {
|
||||||
logger.error(`[${requestId}] Execution failed:`, error)
|
logger.error(`[${requestId}] Execution failed:`, error)
|
||||||
|
|
||||||
const errorWithResult = error as {
|
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||||
executionResult?: ExecutionResult
|
|
||||||
message?: string
|
|
||||||
stack?: string
|
|
||||||
}
|
|
||||||
const executionResult = errorWithResult?.executionResult
|
|
||||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||||
|
|
||||||
await loggingSession.safeCompleteWithError({
|
await loggingSession.safeCompleteWithError({
|
||||||
endedAt: new Date().toISOString(),
|
endedAt: new Date().toISOString(),
|
||||||
totalDurationMs: executionResult?.metadata?.duration || 0,
|
totalDurationMs: executionResult?.metadata?.duration || 0,
|
||||||
error: {
|
error: {
|
||||||
message: errorWithResult?.message || 'Execution failed',
|
message: error instanceof Error ? error.message : 'Execution failed',
|
||||||
stackTrace: errorWithResult?.stack,
|
stackTrace: error instanceof Error ? error.stack : undefined,
|
||||||
},
|
},
|
||||||
traceSpans,
|
traceSpans,
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user