mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-29 00:38:03 -05:00
feat(child-workflows): nested execution snapshots
This commit is contained in:
@@ -6,7 +6,7 @@ import {
|
||||
workflowExecutionSnapshots,
|
||||
} from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import { and, eq, inArray } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
@@ -48,6 +48,7 @@ export async function GET(
|
||||
endedAt: workflowExecutionLogs.endedAt,
|
||||
totalDurationMs: workflowExecutionLogs.totalDurationMs,
|
||||
cost: workflowExecutionLogs.cost,
|
||||
executionData: workflowExecutionLogs.executionData,
|
||||
})
|
||||
.from(workflowExecutionLogs)
|
||||
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
|
||||
@@ -78,10 +79,44 @@ export async function GET(
|
||||
return NextResponse.json({ error: 'Workflow state snapshot not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
const traceSpans =
|
||||
(workflowLog.executionData as { traceSpans?: Array<{ [key: string]: unknown }> })
|
||||
?.traceSpans || []
|
||||
const childSnapshotIds = new Set<string>()
|
||||
const collectSnapshotIds = (spans: Array<{ [key: string]: unknown }>) => {
|
||||
spans.forEach((span) => {
|
||||
const snapshotId = span.childWorkflowSnapshotId
|
||||
if (typeof snapshotId === 'string') {
|
||||
childSnapshotIds.add(snapshotId)
|
||||
}
|
||||
const children = span.children
|
||||
if (Array.isArray(children)) {
|
||||
collectSnapshotIds(children as Array<{ [key: string]: unknown }>)
|
||||
}
|
||||
})
|
||||
}
|
||||
if (traceSpans.length > 0) {
|
||||
collectSnapshotIds(traceSpans)
|
||||
}
|
||||
|
||||
const childWorkflowSnapshots =
|
||||
childSnapshotIds.size > 0
|
||||
? await db
|
||||
.select()
|
||||
.from(workflowExecutionSnapshots)
|
||||
.where(inArray(workflowExecutionSnapshots.id, Array.from(childSnapshotIds)))
|
||||
: []
|
||||
|
||||
const childSnapshotMap = childWorkflowSnapshots.reduce<Record<string, unknown>>((acc, snap) => {
|
||||
acc[snap.id] = snap.stateData
|
||||
return acc
|
||||
}, {})
|
||||
|
||||
const response = {
|
||||
executionId,
|
||||
workflowId: workflowLog.workflowId,
|
||||
workflowState: snapshot.stateData,
|
||||
childWorkflowSnapshots: childSnapshotMap,
|
||||
executionMetadata: {
|
||||
trigger: workflowLog.trigger,
|
||||
startedAt: workflowLog.startedAt.toISOString(),
|
||||
|
||||
@@ -80,6 +80,9 @@ export function ExecutionSnapshot({
|
||||
}, [executionId, closeMenu])
|
||||
|
||||
const workflowState = data?.workflowState as WorkflowState | undefined
|
||||
const childWorkflowSnapshots = data?.childWorkflowSnapshots as
|
||||
| Record<string, WorkflowState>
|
||||
| undefined
|
||||
|
||||
const renderContent = () => {
|
||||
if (isLoading) {
|
||||
@@ -148,6 +151,7 @@ export function ExecutionSnapshot({
|
||||
key={executionId}
|
||||
workflowState={workflowState}
|
||||
traceSpans={traceSpans}
|
||||
childWorkflowSnapshots={childWorkflowSnapshots}
|
||||
className={className}
|
||||
height={height}
|
||||
width={width}
|
||||
|
||||
@@ -690,6 +690,7 @@ interface ExecutionData {
|
||||
output?: unknown
|
||||
status?: string
|
||||
durationMs?: number
|
||||
childWorkflowSnapshotId?: string
|
||||
}
|
||||
|
||||
interface WorkflowVariable {
|
||||
@@ -714,6 +715,8 @@ interface PreviewEditorProps {
|
||||
parallels?: Record<string, Parallel>
|
||||
/** When true, shows "Not Executed" badge if no executionData is provided */
|
||||
isExecutionMode?: boolean
|
||||
/** Child workflow snapshots keyed by snapshot ID (execution mode only) */
|
||||
childWorkflowSnapshots?: Record<string, WorkflowState>
|
||||
/** Optional close handler - if not provided, no close button is shown */
|
||||
onClose?: () => void
|
||||
/** Callback to drill down into a nested workflow block */
|
||||
@@ -739,6 +742,7 @@ function PreviewEditorContent({
|
||||
loops,
|
||||
parallels,
|
||||
isExecutionMode = false,
|
||||
childWorkflowSnapshots,
|
||||
onClose,
|
||||
onDrillDown,
|
||||
}: PreviewEditorProps) {
|
||||
@@ -768,17 +772,31 @@ function PreviewEditorContent({
|
||||
const { data: childWorkflowState, isLoading: isLoadingChildWorkflow } = useWorkflowState(
|
||||
childWorkflowId ?? undefined
|
||||
)
|
||||
const childWorkflowSnapshotId = executionData?.childWorkflowSnapshotId
|
||||
const childWorkflowSnapshotState = childWorkflowSnapshotId
|
||||
? childWorkflowSnapshots?.[childWorkflowSnapshotId]
|
||||
: undefined
|
||||
|
||||
/** Drills down into the child workflow or opens it in a new tab */
|
||||
const handleExpandChildWorkflow = useCallback(() => {
|
||||
if (!childWorkflowId || !childWorkflowState) return
|
||||
if (!childWorkflowId) return
|
||||
|
||||
if (isExecutionMode && onDrillDown) {
|
||||
onDrillDown(block.id, childWorkflowState)
|
||||
const resolvedChildState = childWorkflowSnapshotState ?? childWorkflowState
|
||||
if (!resolvedChildState) return
|
||||
onDrillDown(block.id, resolvedChildState)
|
||||
} else if (workspaceId) {
|
||||
window.open(`/workspace/${workspaceId}/w/${childWorkflowId}`, '_blank', 'noopener,noreferrer')
|
||||
}
|
||||
}, [childWorkflowId, childWorkflowState, isExecutionMode, onDrillDown, block.id, workspaceId])
|
||||
}, [
|
||||
childWorkflowId,
|
||||
childWorkflowSnapshotState,
|
||||
childWorkflowState,
|
||||
isExecutionMode,
|
||||
onDrillDown,
|
||||
block.id,
|
||||
workspaceId,
|
||||
])
|
||||
|
||||
const contentRef = useRef<HTMLDivElement>(null)
|
||||
const subBlocksRef = useRef<HTMLDivElement>(null)
|
||||
|
||||
@@ -19,6 +19,8 @@ interface TraceSpan {
|
||||
status?: string
|
||||
duration?: number
|
||||
children?: TraceSpan[]
|
||||
childWorkflowSnapshotId?: string
|
||||
childWorkflowId?: string
|
||||
}
|
||||
|
||||
interface BlockExecutionData {
|
||||
@@ -28,6 +30,7 @@ interface BlockExecutionData {
|
||||
durationMs: number
|
||||
/** Child trace spans for nested workflow blocks */
|
||||
children?: TraceSpan[]
|
||||
childWorkflowSnapshotId?: string
|
||||
}
|
||||
|
||||
/** Represents a level in the workflow navigation stack */
|
||||
@@ -89,6 +92,7 @@ export function buildBlockExecutions(spans: TraceSpan[]): Record<string, BlockEx
|
||||
status: span.status || 'unknown',
|
||||
durationMs: span.duration || 0,
|
||||
children: span.children,
|
||||
childWorkflowSnapshotId: span.childWorkflowSnapshotId,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -103,6 +107,8 @@ interface PreviewProps {
|
||||
traceSpans?: TraceSpan[]
|
||||
/** Pre-computed block executions (optional - will be built from traceSpans if not provided) */
|
||||
blockExecutions?: Record<string, BlockExecutionData>
|
||||
/** Child workflow snapshots keyed by snapshot ID (execution mode only) */
|
||||
childWorkflowSnapshots?: Record<string, WorkflowState>
|
||||
/** Additional CSS class names */
|
||||
className?: string
|
||||
/** Height of the component */
|
||||
@@ -135,6 +141,7 @@ export function Preview({
|
||||
workflowState: rootWorkflowState,
|
||||
traceSpans: rootTraceSpans,
|
||||
blockExecutions: providedBlockExecutions,
|
||||
childWorkflowSnapshots,
|
||||
className,
|
||||
height = '100%',
|
||||
width = '100%',
|
||||
@@ -284,6 +291,7 @@ export function Preview({
|
||||
loops={workflowState.loops}
|
||||
parallels={workflowState.parallels}
|
||||
isExecutionMode={isExecutionMode}
|
||||
childWorkflowSnapshots={childWorkflowSnapshots}
|
||||
onClose={handleEditorClose}
|
||||
onDrillDown={handleDrillDown}
|
||||
/>
|
||||
|
||||
@@ -6,6 +6,7 @@ interface ChildWorkflowErrorOptions {
|
||||
childWorkflowName: string
|
||||
childTraceSpans?: TraceSpan[]
|
||||
executionResult?: ExecutionResult
|
||||
childWorkflowSnapshotId?: string
|
||||
cause?: Error
|
||||
}
|
||||
|
||||
@@ -16,6 +17,7 @@ export class ChildWorkflowError extends Error {
|
||||
readonly childTraceSpans: TraceSpan[]
|
||||
readonly childWorkflowName: string
|
||||
readonly executionResult?: ExecutionResult
|
||||
readonly childWorkflowSnapshotId?: string
|
||||
|
||||
constructor(options: ChildWorkflowErrorOptions) {
|
||||
super(options.message, { cause: options.cause })
|
||||
@@ -23,6 +25,7 @@ export class ChildWorkflowError extends Error {
|
||||
this.childWorkflowName = options.childWorkflowName
|
||||
this.childTraceSpans = options.childTraceSpans ?? []
|
||||
this.executionResult = options.executionResult
|
||||
this.childWorkflowSnapshotId = options.childWorkflowSnapshotId
|
||||
}
|
||||
|
||||
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
|
||||
|
||||
@@ -237,6 +237,9 @@ export class BlockExecutor {
|
||||
if (ChildWorkflowError.isChildWorkflowError(error)) {
|
||||
errorOutput.childTraceSpans = error.childTraceSpans
|
||||
errorOutput.childWorkflowName = error.childWorkflowName
|
||||
if (error.childWorkflowSnapshotId) {
|
||||
errorOutput.childWorkflowSnapshotId = error.childWorkflowSnapshotId
|
||||
}
|
||||
}
|
||||
|
||||
this.state.setBlockOutput(node.id, errorOutput, duration)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
import type { TraceSpan } from '@/lib/logs/types'
|
||||
import type { BlockOutput } from '@/blocks/types'
|
||||
@@ -57,6 +58,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
const workflowMetadata = workflows[workflowId]
|
||||
let childWorkflowName = workflowMetadata?.name || workflowId
|
||||
|
||||
let childWorkflowSnapshotId: string | undefined
|
||||
try {
|
||||
const currentDepth = (ctx.workflowId?.split('_sub_').length || 1) - 1
|
||||
if (currentDepth >= DEFAULTS.MAX_WORKFLOW_DEPTH) {
|
||||
@@ -107,6 +109,12 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
childWorkflowInput = inputs.input
|
||||
}
|
||||
|
||||
const childSnapshotResult = await snapshotService.createSnapshotWithDeduplication(
|
||||
workflowId,
|
||||
childWorkflow.workflowState
|
||||
)
|
||||
childWorkflowSnapshotId = childSnapshotResult.snapshot.id
|
||||
|
||||
const subExecutor = new Executor({
|
||||
workflow: childWorkflow.serializedState,
|
||||
workflowInput: childWorkflowInput,
|
||||
@@ -139,7 +147,8 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
workflowId,
|
||||
childWorkflowName,
|
||||
duration,
|
||||
childTraceSpans
|
||||
childTraceSpans,
|
||||
childWorkflowSnapshotId
|
||||
)
|
||||
|
||||
return mappedResult
|
||||
@@ -172,6 +181,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
childWorkflowName,
|
||||
childTraceSpans,
|
||||
executionResult,
|
||||
childWorkflowSnapshotId,
|
||||
cause: error instanceof Error ? error : undefined,
|
||||
})
|
||||
}
|
||||
@@ -279,6 +289,10 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
)
|
||||
|
||||
const workflowVariables = (workflowData.variables as Record<string, any>) || {}
|
||||
const workflowStateWithVariables = {
|
||||
...workflowState,
|
||||
variables: workflowVariables,
|
||||
}
|
||||
|
||||
if (Object.keys(workflowVariables).length > 0) {
|
||||
logger.info(
|
||||
@@ -290,6 +304,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
name: workflowData.name,
|
||||
serializedState: serializedWorkflow,
|
||||
variables: workflowVariables,
|
||||
workflowState: workflowStateWithVariables,
|
||||
rawBlocks: workflowState.blocks,
|
||||
}
|
||||
}
|
||||
@@ -358,11 +373,16 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
)
|
||||
|
||||
const workflowVariables = (wfData?.variables as Record<string, any>) || {}
|
||||
const workflowStateWithVariables = {
|
||||
...deployedState,
|
||||
variables: workflowVariables,
|
||||
}
|
||||
|
||||
return {
|
||||
name: wfData?.name || DEFAULTS.WORKFLOW_NAME,
|
||||
serializedState: serializedWorkflow,
|
||||
variables: workflowVariables,
|
||||
workflowState: workflowStateWithVariables,
|
||||
rawBlocks: deployedState.blocks,
|
||||
}
|
||||
}
|
||||
@@ -504,7 +524,8 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
childWorkflowId: string,
|
||||
childWorkflowName: string,
|
||||
duration: number,
|
||||
childTraceSpans?: WorkflowTraceSpan[]
|
||||
childTraceSpans?: WorkflowTraceSpan[],
|
||||
childWorkflowSnapshotId?: string
|
||||
): BlockOutput {
|
||||
const success = childResult.success !== false
|
||||
const result = childResult.output || {}
|
||||
@@ -521,6 +542,8 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
return {
|
||||
success: true,
|
||||
childWorkflowName,
|
||||
childWorkflowId,
|
||||
childWorkflowSnapshotId,
|
||||
result,
|
||||
childTraceSpans: childTraceSpans || [],
|
||||
} as Record<string, any>
|
||||
|
||||
@@ -210,6 +210,7 @@ export interface ExecutionSnapshotData {
|
||||
executionId: string
|
||||
workflowId: string
|
||||
workflowState: Record<string, unknown>
|
||||
childWorkflowSnapshots?: Record<string, Record<string, unknown>>
|
||||
executionMetadata: {
|
||||
trigger: string
|
||||
startedAt: string
|
||||
|
||||
@@ -112,6 +112,26 @@ export function buildTraceSpans(result: ExecutionResult): {
|
||||
const duration = log.durationMs || 0
|
||||
|
||||
let output = log.output || {}
|
||||
let childWorkflowSnapshotId: string | undefined
|
||||
let childWorkflowId: string | undefined
|
||||
|
||||
if (output && typeof output === 'object') {
|
||||
const outputRecord = output as Record<string, unknown>
|
||||
childWorkflowSnapshotId =
|
||||
typeof outputRecord.childWorkflowSnapshotId === 'string'
|
||||
? outputRecord.childWorkflowSnapshotId
|
||||
: undefined
|
||||
childWorkflowId =
|
||||
typeof outputRecord.childWorkflowId === 'string' ? outputRecord.childWorkflowId : undefined
|
||||
if (childWorkflowSnapshotId || childWorkflowId) {
|
||||
const {
|
||||
childWorkflowSnapshotId: _childSnapshotId,
|
||||
childWorkflowId: _childWorkflowId,
|
||||
...outputRest
|
||||
} = outputRecord
|
||||
output = outputRest
|
||||
}
|
||||
}
|
||||
|
||||
if (log.error) {
|
||||
output = {
|
||||
@@ -134,6 +154,8 @@ export function buildTraceSpans(result: ExecutionResult): {
|
||||
blockId: log.blockId,
|
||||
input: log.input || {},
|
||||
output: output,
|
||||
...(childWorkflowSnapshotId ? { childWorkflowSnapshotId } : {}),
|
||||
...(childWorkflowId ? { childWorkflowId } : {}),
|
||||
...(log.loopId && { loopId: log.loopId }),
|
||||
...(log.parallelId && { parallelId: log.parallelId }),
|
||||
...(log.iterationIndex !== undefined && { iterationIndex: log.iterationIndex }),
|
||||
|
||||
@@ -178,6 +178,8 @@ export interface TraceSpan {
|
||||
blockId?: string
|
||||
input?: Record<string, unknown>
|
||||
output?: Record<string, unknown>
|
||||
childWorkflowSnapshotId?: string
|
||||
childWorkflowId?: string
|
||||
model?: string
|
||||
cost?: {
|
||||
input?: number
|
||||
|
||||
Reference in New Issue
Block a user