Files
sim/apps/sim/executor/execution/snapshot-serializer.ts
Vikhyath Mondreti dc3de95c39 fix(logging): hitl + trigger dev crash protection (#2664)
* hitl gaps

* deal with trigger worker crashes

* cleanup import structure
2026-01-02 14:01:01 -08:00

131 lines
4.2 KiB
TypeScript

import type { DAG } from '@/executor/dag/builder'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionContext, SerializedSnapshot } from '@/executor/types'
/**
 * Converts a string-keyed Map into a plain object.
 *
 * @param map - Map to convert; may be omitted.
 * @returns An object holding the Map's entries, or `undefined` when no Map was given.
 */
function mapFromEntries<T>(map?: Map<string, T>): Record<string, T> | undefined {
  return map ? Object.fromEntries(map.entries()) : undefined
}
/**
 * Turns the loop-scope Map into a plain object keyed by loop id.
 *
 * Each scope is shallow-copied. Its `currentIterationOutputs` is replaced by a
 * plain object: a Map is converted with `Object.fromEntries`, any other value is
 * kept as-is, and a null/undefined value becomes `{}`.
 *
 * @param loopExecutions - Map of loop id to loop scope; may be omitted.
 * @returns The plain-object form, or `undefined` when no Map was given.
 */
function serializeLoopExecutions(
  loopExecutions?: Map<string, any>
): Record<string, any> | undefined {
  if (!loopExecutions) return undefined

  const serialized: Record<string, any> = {}
  for (const [id, loopScope] of loopExecutions) {
    const plainOutputs: any =
      loopScope.currentIterationOutputs instanceof Map
        ? Object.fromEntries(loopScope.currentIterationOutputs)
        : (loopScope.currentIterationOutputs ?? {})

    // Spread first so the converted outputs override the original Map.
    serialized[id] = { ...loopScope, currentIterationOutputs: plainOutputs }
  }
  return serialized
}
/**
 * Turns the parallel-scope Map into a plain object keyed by parallel id.
 *
 * Each scope is shallow-copied. Its `branchOutputs` is replaced by a plain
 * object: a Map is converted with `Object.fromEntries`, any other value is kept
 * as-is, and a null/undefined value becomes `{}`.
 *
 * @param parallelExecutions - Map of parallel id to parallel scope; may be omitted.
 * @returns The plain-object form, or `undefined` when no Map was given.
 */
function serializeParallelExecutions(
  parallelExecutions?: Map<string, any>
): Record<string, any> | undefined {
  if (!parallelExecutions) return undefined

  const serialized: Record<string, any> = {}
  for (const [id, parallelScope] of parallelExecutions) {
    const plainBranchOutputs: any =
      parallelScope.branchOutputs instanceof Map
        ? Object.fromEntries(parallelScope.branchOutputs)
        : (parallelScope.branchOutputs ?? {})

    // Spread first so the converted outputs override the original Map.
    serialized[id] = { ...parallelScope, branchOutputs: plainBranchOutputs }
  }
  return serialized
}
/**
 * Builds the serialized snapshot for a paused execution.
 *
 * Map/Set-based fields of the execution context are converted to plain objects
 * and arrays, execution metadata is assembled (falling back to context values
 * and defaults where the context metadata lacks them), and the result is wrapped
 * in an `ExecutionSnapshot`.
 *
 * @param context - Execution context whose state is captured.
 * @param triggerBlockIds - Block ids stored as the pending queue and returned as
 *   `triggerIds`; the first one becomes the metadata's `triggerBlockId`.
 * @param dag - Optional DAG; when given, each node's incoming edges are recorded.
 * @returns The snapshot's `toJSON()` output together with the trigger ids.
 * @throws Error when neither the context metadata nor the context supplies a workspaceId.
 */
export function serializePauseSnapshot(
  context: ExecutionContext,
  triggerBlockIds: string[],
  dag?: DAG
): SerializedSnapshot {
  const meta = context.metadata as ExecutionMetadata | undefined

  // An explicit metadata flag wins; otherwise draft state is used unless the
  // context is explicitly marked as deployed.
  const useDraftState: boolean =
    meta?.useDraftState !== undefined ? meta.useDraftState : context.isDeployedContext !== true

  // Record, per node id, the list of incoming edges (only when a DAG is supplied).
  let dagIncomingEdges: Record<string, string[]> | undefined
  if (dag) {
    const edgeEntries = Array.from(dag.nodes.entries()).map(
      ([nodeId, node]): [string, string[]] => [nodeId, Array.from(node.incomingEdges)]
    )
    dagIncomingEdges = Object.fromEntries(edgeEntries)
  }

  const state: SerializableExecutionState = {
    blockStates: Object.fromEntries(context.blockStates),
    executedBlocks: Array.from(context.executedBlocks),
    blockLogs: context.blockLogs,
    decisions: {
      router: Object.fromEntries(context.decisions.router),
      condition: Object.fromEntries(context.decisions.condition),
    },
    completedLoops: Array.from(context.completedLoops),
    loopExecutions: serializeLoopExecutions(context.loopExecutions),
    parallelExecutions: serializeParallelExecutions(context.parallelExecutions),
    parallelBlockMapping: mapFromEntries(context.parallelBlockMapping),
    activeExecutionPath: Array.from(context.activeExecutionPath),
    pendingQueue: triggerBlockIds,
    dagIncomingEdges,
  }

  // Metadata takes precedence over the context for the workspace id; one of them must provide it.
  const workspaceId = meta?.workspaceId ?? context.workspaceId
  if (!workspaceId) {
    throw new Error(
      `Cannot serialize pause snapshot: missing workspaceId for workflow ${context.workflowId}`
    )
  }

  const requestId = meta?.requestId ?? context.executionId ?? context.workflowId ?? 'unknown'
  const startTime = meta?.startTime ?? new Date().toISOString()

  const executionMetadata: ExecutionMetadata = {
    requestId,
    executionId: context.executionId ?? 'unknown',
    workflowId: context.workflowId,
    workspaceId,
    userId: meta?.userId ?? '',
    sessionUserId: meta?.sessionUserId,
    workflowUserId: meta?.workflowUserId,
    triggerType: meta?.triggerType ?? 'manual',
    triggerBlockId: triggerBlockIds[0],
    useDraftState,
    startTime,
    isClientSession: meta?.isClientSession,
  }

  // NOTE(review): the third constructor argument is passed as an empty object;
  // its meaning is defined by ExecutionSnapshot — confirm against that class.
  return {
    snapshot: new ExecutionSnapshot(
      executionMetadata,
      context.workflow,
      {},
      context.workflowVariables ?? {},
      context.selectedOutputs ?? [],
      state
    ).toJSON(),
    triggerIds: triggerBlockIds,
  }
}