mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-24 14:27:56 -05:00
Compare commits
6 Commits
fix/hitl
...
fix/nested
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bf22dd75ad | ||
|
|
eb767b5ede | ||
|
|
594bcac5f2 | ||
|
|
d3f20311d0 | ||
|
|
587d44ad6f | ||
|
|
8bf2e69942 |
@@ -30,6 +30,7 @@ import { normalizeName } from '@/executor/constants'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
|
||||
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { Serializer } from '@/serializer'
|
||||
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
|
||||
|
||||
@@ -467,17 +468,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
}
|
||||
|
||||
return NextResponse.json(filteredResult)
|
||||
} catch (error: any) {
|
||||
const errorMessage = error.message || 'Unknown error'
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = error.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
output: executionResult?.output,
|
||||
error: executionResult?.error || error.message || 'Execution failed',
|
||||
error: executionResult?.error || errorMessage || 'Execution failed',
|
||||
metadata: executionResult?.metadata
|
||||
? {
|
||||
duration: executionResult.metadata.duration,
|
||||
@@ -788,11 +789,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
|
||||
// Cleanup base64 cache for this execution
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
} catch (error: any) {
|
||||
const errorMessage = error.message || 'Unknown error'
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = error.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:error',
|
||||
|
||||
@@ -16,6 +16,7 @@ import {
|
||||
} from '@/lib/workflows/triggers/triggers'
|
||||
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
||||
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { coerceValue } from '@/executor/utils/start-block'
|
||||
import { subscriptionKeys } from '@/hooks/queries/subscription'
|
||||
import { useExecutionStream } from '@/hooks/use-execution-stream'
|
||||
@@ -76,17 +77,6 @@ function normalizeErrorMessage(error: unknown): string {
|
||||
return WORKFLOW_EXECUTION_FAILURE_MESSAGE
|
||||
}
|
||||
|
||||
function isExecutionResult(value: unknown): value is ExecutionResult {
|
||||
if (!isRecord(value)) return false
|
||||
return typeof value.success === 'boolean' && isRecord(value.output)
|
||||
}
|
||||
|
||||
function extractExecutionResult(error: unknown): ExecutionResult | null {
|
||||
if (!isRecord(error)) return null
|
||||
const candidate = error.executionResult
|
||||
return isExecutionResult(candidate) ? candidate : null
|
||||
}
|
||||
|
||||
export function useWorkflowExecution() {
|
||||
const queryClient = useQueryClient()
|
||||
const currentWorkflow = useCurrentWorkflow()
|
||||
@@ -1138,11 +1128,11 @@ export function useWorkflowExecution() {
|
||||
|
||||
const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
|
||||
const normalizedMessage = normalizeErrorMessage(error)
|
||||
const executionResultFromError = extractExecutionResult(error)
|
||||
|
||||
let errorResult: ExecutionResult
|
||||
|
||||
if (executionResultFromError) {
|
||||
if (hasExecutionResult(error)) {
|
||||
const executionResultFromError = error.executionResult
|
||||
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []
|
||||
|
||||
errorResult = {
|
||||
|
||||
@@ -21,7 +21,7 @@ import {
|
||||
} from '@/lib/workflows/schedules/utils'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
|
||||
|
||||
const logger = createLogger('TriggerScheduleExecution')
|
||||
@@ -231,8 +231,7 @@ async function runWorkflowExecution({
|
||||
} catch (error: unknown) {
|
||||
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
|
||||
|
||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
||||
const executionResult = errorWithResult?.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
|
||||
@@ -16,7 +16,7 @@ import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
|
||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { safeAssign } from '@/tools/safe-assign'
|
||||
import { getTrigger, isTriggerValid } from '@/triggers'
|
||||
|
||||
@@ -578,12 +578,13 @@ async function executeWebhookJobInternal(
|
||||
deploymentVersionId,
|
||||
})
|
||||
|
||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
||||
const executionResult = errorWithResult?.executionResult || {
|
||||
success: false,
|
||||
output: {},
|
||||
logs: [],
|
||||
}
|
||||
const executionResult = hasExecutionResult(error)
|
||||
? error.executionResult
|
||||
: {
|
||||
success: false,
|
||||
output: {},
|
||||
logs: [],
|
||||
}
|
||||
const { traceSpans } = buildTraceSpans(executionResult)
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
|
||||
@@ -9,7 +9,7 @@ import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-m
|
||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import type { CoreTriggerType } from '@/stores/logs/filters/types'
|
||||
|
||||
const logger = createLogger('TriggerWorkflowExecution')
|
||||
@@ -160,8 +160,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
|
||||
executionId,
|
||||
})
|
||||
|
||||
const errorWithResult = error as { executionResult?: ExecutionResult }
|
||||
const executionResult = errorWithResult?.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
|
||||
31
apps/sim/executor/errors/child-workflow-error.ts
Normal file
31
apps/sim/executor/errors/child-workflow-error.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import type { TraceSpan } from '@/lib/logs/types'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
|
||||
interface ChildWorkflowErrorOptions {
|
||||
message: string
|
||||
childWorkflowName: string
|
||||
childTraceSpans?: TraceSpan[]
|
||||
executionResult?: ExecutionResult
|
||||
cause?: Error
|
||||
}
|
||||
|
||||
/**
|
||||
* Error raised when a child workflow execution fails.
|
||||
*/
|
||||
export class ChildWorkflowError extends Error {
|
||||
readonly childTraceSpans: TraceSpan[]
|
||||
readonly childWorkflowName: string
|
||||
readonly executionResult?: ExecutionResult
|
||||
|
||||
constructor(options: ChildWorkflowErrorOptions) {
|
||||
super(options.message, { cause: options.cause })
|
||||
this.name = 'ChildWorkflowError'
|
||||
this.childWorkflowName = options.childWorkflowName
|
||||
this.childTraceSpans = options.childTraceSpans ?? []
|
||||
this.executionResult = options.executionResult
|
||||
}
|
||||
|
||||
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
|
||||
return error instanceof ChildWorkflowError
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import {
|
||||
isSentinelBlockType,
|
||||
} from '@/executor/constants'
|
||||
import type { DAGNode } from '@/executor/dag/builder'
|
||||
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
|
||||
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
|
||||
import {
|
||||
generatePauseContextId,
|
||||
@@ -213,24 +214,26 @@ export class BlockExecutor {
|
||||
? resolvedInputs
|
||||
: ((block.config?.params as Record<string, any> | undefined) ?? {})
|
||||
|
||||
const errorOutput: NormalizedBlockOutput = {
|
||||
error: errorMessage,
|
||||
}
|
||||
|
||||
if (ChildWorkflowError.isChildWorkflowError(error)) {
|
||||
errorOutput.childTraceSpans = error.childTraceSpans
|
||||
errorOutput.childWorkflowName = error.childWorkflowName
|
||||
}
|
||||
|
||||
this.state.setBlockOutput(node.id, errorOutput, duration)
|
||||
|
||||
if (blockLog) {
|
||||
blockLog.endedAt = new Date().toISOString()
|
||||
blockLog.durationMs = duration
|
||||
blockLog.success = false
|
||||
blockLog.error = errorMessage
|
||||
blockLog.input = input
|
||||
blockLog.output = this.filterOutputForLog(block, errorOutput)
|
||||
}
|
||||
|
||||
const errorOutput: NormalizedBlockOutput = {
|
||||
error: errorMessage,
|
||||
}
|
||||
|
||||
if (error && typeof error === 'object' && 'childTraceSpans' in error) {
|
||||
errorOutput.childTraceSpans = (error as any).childTraceSpans
|
||||
}
|
||||
|
||||
this.state.setBlockOutput(node.id, errorOutput, duration)
|
||||
|
||||
logger.error(
|
||||
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
|
||||
{
|
||||
|
||||
@@ -13,7 +13,7 @@ import type {
|
||||
PausePoint,
|
||||
ResumeStatus,
|
||||
} from '@/executor/types'
|
||||
import { normalizeError } from '@/executor/utils/errors'
|
||||
import { attachExecutionResult, normalizeError } from '@/executor/utils/errors'
|
||||
|
||||
const logger = createLogger('ExecutionEngine')
|
||||
|
||||
@@ -170,8 +170,8 @@ export class ExecutionEngine {
|
||||
metadata: this.context.metadata,
|
||||
}
|
||||
|
||||
if (error && typeof error === 'object') {
|
||||
;(error as any).executionResult = executionResult
|
||||
if (error instanceof Error) {
|
||||
attachExecutionResult(error, executionResult)
|
||||
}
|
||||
throw error
|
||||
}
|
||||
|
||||
@@ -85,11 +85,7 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
|
||||
const sourceBlockId = ctx.workflow?.connections.find((conn) => conn.target === block.id)?.source
|
||||
const evalContext = this.buildEvaluationContext(ctx, sourceBlockId)
|
||||
const rawSourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null
|
||||
|
||||
// Filter out _pauseMetadata from source output to prevent the engine from
|
||||
// thinking this block is pausing (it was already resumed by the HITL block)
|
||||
const sourceOutput = this.filterPauseMetadata(rawSourceOutput)
|
||||
const sourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null
|
||||
|
||||
const outgoingConnections = ctx.workflow?.connections.filter((conn) => conn.source === block.id)
|
||||
|
||||
@@ -129,14 +125,6 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private filterPauseMetadata(output: any): any {
|
||||
if (!output || typeof output !== 'object') {
|
||||
return output
|
||||
}
|
||||
const { _pauseMetadata, ...rest } = output
|
||||
return rest
|
||||
}
|
||||
|
||||
private parseConditions(input: any): Array<{ id: string; title: string; value: string }> {
|
||||
try {
|
||||
const conditions = Array.isArray(input) ? input : JSON.parse(input || '[]')
|
||||
|
||||
@@ -4,12 +4,14 @@ import type { TraceSpan } from '@/lib/logs/types'
|
||||
import type { BlockOutput } from '@/blocks/types'
|
||||
import { Executor } from '@/executor'
|
||||
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
|
||||
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
|
||||
import type {
|
||||
BlockHandler,
|
||||
ExecutionContext,
|
||||
ExecutionResult,
|
||||
StreamingExecution,
|
||||
} from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
|
||||
import { parseJSON } from '@/executor/utils/json'
|
||||
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
|
||||
@@ -137,39 +139,39 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
)
|
||||
|
||||
return mappedResult
|
||||
} catch (error: any) {
|
||||
} catch (error: unknown) {
|
||||
logger.error(`Error executing child workflow ${workflowId}:`, error)
|
||||
|
||||
const { workflows } = useWorkflowRegistry.getState()
|
||||
const workflowMetadata = workflows[workflowId]
|
||||
const childWorkflowName = workflowMetadata?.name || workflowId
|
||||
|
||||
const originalError = error.message || 'Unknown error'
|
||||
const wrappedError = new Error(
|
||||
`Error in child workflow "${childWorkflowName}": ${originalError}`
|
||||
)
|
||||
const originalError = error instanceof Error ? error.message : 'Unknown error'
|
||||
let childTraceSpans: WorkflowTraceSpan[] = []
|
||||
let executionResult: ExecutionResult | undefined
|
||||
|
||||
if (error.executionResult?.logs) {
|
||||
const executionResult = error.executionResult as ExecutionResult
|
||||
if (hasExecutionResult(error) && error.executionResult.logs) {
|
||||
executionResult = error.executionResult
|
||||
|
||||
logger.info(`Extracting child trace spans from error.executionResult`, {
|
||||
hasLogs: (executionResult.logs?.length ?? 0) > 0,
|
||||
logCount: executionResult.logs?.length ?? 0,
|
||||
})
|
||||
|
||||
const childTraceSpans = this.captureChildWorkflowLogs(
|
||||
executionResult,
|
||||
childWorkflowName,
|
||||
ctx
|
||||
)
|
||||
childTraceSpans = this.captureChildWorkflowLogs(executionResult, childWorkflowName, ctx)
|
||||
|
||||
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
|
||||
;(wrappedError as any).childTraceSpans = childTraceSpans
|
||||
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
|
||||
;(wrappedError as any).childTraceSpans = error.childTraceSpans
|
||||
} else if (ChildWorkflowError.isChildWorkflowError(error)) {
|
||||
childTraceSpans = error.childTraceSpans
|
||||
}
|
||||
|
||||
throw wrappedError
|
||||
throw new ChildWorkflowError({
|
||||
message: `Error in child workflow "${childWorkflowName}": ${originalError}`,
|
||||
childWorkflowName,
|
||||
childTraceSpans,
|
||||
executionResult,
|
||||
cause: error instanceof Error ? error : undefined,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -441,11 +443,11 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
|
||||
if (!success) {
|
||||
logger.warn(`Child workflow ${childWorkflowName} failed`)
|
||||
const error = new Error(
|
||||
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`
|
||||
)
|
||||
;(error as any).childTraceSpans = childTraceSpans || []
|
||||
throw error
|
||||
throw new ChildWorkflowError({
|
||||
message: `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`,
|
||||
childWorkflowName,
|
||||
childTraceSpans: childTraceSpans || [],
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
@@ -1,6 +1,39 @@
|
||||
import type { ExecutionContext } from '@/executor/types'
|
||||
import type { ExecutionContext, ExecutionResult } from '@/executor/types'
|
||||
import type { SerializedBlock } from '@/serializer/types'
|
||||
|
||||
/**
|
||||
* Interface for errors that carry an ExecutionResult.
|
||||
* Used when workflow execution fails and we want to preserve partial results.
|
||||
*/
|
||||
export interface ErrorWithExecutionResult extends Error {
|
||||
executionResult: ExecutionResult
|
||||
}
|
||||
|
||||
/**
|
||||
* Type guard to check if an error carries an ExecutionResult.
|
||||
* Validates that executionResult has required fields (success, output).
|
||||
*/
|
||||
export function hasExecutionResult(error: unknown): error is ErrorWithExecutionResult {
|
||||
if (
|
||||
!(error instanceof Error) ||
|
||||
!('executionResult' in error) ||
|
||||
error.executionResult == null ||
|
||||
typeof error.executionResult !== 'object'
|
||||
) {
|
||||
return false
|
||||
}
|
||||
|
||||
const result = error.executionResult as Record<string, unknown>
|
||||
return typeof result.success === 'boolean' && result.output != null
|
||||
}
|
||||
|
||||
/**
|
||||
* Attaches an ExecutionResult to an error for propagation to parent workflows.
|
||||
*/
|
||||
export function attachExecutionResult(error: Error, executionResult: ExecutionResult): void {
|
||||
Object.assign(error, { executionResult })
|
||||
}
|
||||
|
||||
export interface BlockExecutionErrorDetails {
|
||||
block: SerializedBlock
|
||||
error: Error | string
|
||||
|
||||
@@ -100,8 +100,13 @@ export function useExecutionStream() {
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
const error = await response.json()
|
||||
throw new Error(error.error || 'Failed to start execution')
|
||||
const errorResponse = await response.json()
|
||||
const error = new Error(errorResponse.error || 'Failed to start execution')
|
||||
// Attach the execution result from server response for error handling
|
||||
if (errorResponse && typeof errorResponse === 'object') {
|
||||
Object.assign(error, { executionResult: errorResponse })
|
||||
}
|
||||
throw error
|
||||
}
|
||||
|
||||
if (!response.body) {
|
||||
|
||||
@@ -24,6 +24,7 @@ import type {
|
||||
IterationContext,
|
||||
} from '@/executor/execution/types'
|
||||
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { Serializer } from '@/serializer'
|
||||
import { mergeSubblockState } from '@/stores/workflows/server-utils'
|
||||
|
||||
@@ -383,20 +384,15 @@ export async function executeWorkflowCore(
|
||||
} catch (error: unknown) {
|
||||
logger.error(`[${requestId}] Execution failed:`, error)
|
||||
|
||||
const errorWithResult = error as {
|
||||
executionResult?: ExecutionResult
|
||||
message?: string
|
||||
stack?: string
|
||||
}
|
||||
const executionResult = errorWithResult?.executionResult
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
endedAt: new Date().toISOString(),
|
||||
totalDurationMs: executionResult?.metadata?.duration || 0,
|
||||
error: {
|
||||
message: errorWithResult?.message || 'Execution failed',
|
||||
stackTrace: errorWithResult?.stack,
|
||||
message: error instanceof Error ? error.message : 'Execution failed',
|
||||
stackTrace: error instanceof Error ? error.stack : undefined,
|
||||
},
|
||||
traceSpans,
|
||||
})
|
||||
|
||||
@@ -567,32 +567,6 @@ export class PauseResumeManager {
|
||||
|
||||
stateCopy.blockStates[stateBlockKey] = pauseBlockState
|
||||
|
||||
// Update the block log entry with the merged output so logs show the submission data
|
||||
if (Array.isArray(stateCopy.blockLogs)) {
|
||||
const blockLogIndex = stateCopy.blockLogs.findIndex(
|
||||
(log: { blockId: string }) =>
|
||||
log.blockId === stateBlockKey ||
|
||||
log.blockId === pauseBlockId ||
|
||||
log.blockId === contextId
|
||||
)
|
||||
if (blockLogIndex !== -1) {
|
||||
// Filter output for logging (exclude internal fields and response)
|
||||
const filteredOutput: Record<string, unknown> = {}
|
||||
for (const [key, value] of Object.entries(mergedOutput)) {
|
||||
if (key.startsWith('_')) continue
|
||||
if (key === 'response') continue
|
||||
filteredOutput[key] = value
|
||||
}
|
||||
stateCopy.blockLogs[blockLogIndex] = {
|
||||
...stateCopy.blockLogs[blockLogIndex],
|
||||
blockId: stateBlockKey,
|
||||
output: filteredOutput,
|
||||
durationMs: pauseDurationMs,
|
||||
endedAt: new Date().toISOString(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (Array.isArray(stateCopy.executedBlocks)) {
|
||||
const filtered = stateCopy.executedBlocks.filter(
|
||||
(id: string) => id !== pauseBlockId && id !== contextId
|
||||
|
||||
Reference in New Issue
Block a user