feat(hitl): add human in the loop block (#1832)

* fix(billing): should allow restoring subscription (#1728)

* fix(already-cancelled-sub): UI should allow restoring subscription

* restore functionality fixed

* fix

* Add pause resume block

* Add db schema

* Initial test passes

* Tests pass

* Execution pauses

* Snapshot serializer

* Ui checkpoint

* Works 1

* Pause resume simple v1

* Hitl block works in parallel branches without timing overlap

* Pending status to logs

* Pause resume ui link

* Big context consolidation

* HITL works in loops

* Fix parallels

* Reference blocks properly

* Fix tag dropdown and start block resolution

* Filter console logs for hitl block

* Fix notifs

* Fix logs page

* Fix logs page again

* Fix

* Checkpoint

* Cleanup v1

* Refactor v2

* Refactor v3

* Refactor v4

* Refactor v5

* Resume page

* Fix variables in loops

* Fix var res bugs

* Ui changes

* Approval block

* Hitl works e2e v1

* Fix tets

* Row level lock

---------

Co-authored-by: Waleed <walif6@gmail.com>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
This commit is contained in:
Siddharth Ganesan
2025-11-06 15:59:28 -08:00
committed by GitHub
parent f9ce65eddf
commit 742d59f54d
90 changed files with 13498 additions and 1128 deletions

View File

@@ -1,17 +1,30 @@
import { createLogger } from '@/lib/logs/console/logger'
import { DEFAULTS, EDGE, isSentinelBlockType } from '@/executor/consts'
import { getBaseUrl } from '@/lib/urls/utils'
import {
BlockType,
buildResumeApiUrl,
buildResumeUiUrl,
DEFAULTS,
EDGE,
isSentinelBlockType,
} from '@/executor/consts'
import type { DAGNode } from '@/executor/dag/builder'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import {
generatePauseContextId,
mapNodeMetadataToPauseScopes,
} from '@/executor/pause-resume/utils.ts'
import type {
BlockHandler,
BlockLog,
BlockState,
ExecutionContext,
NormalizedBlockOutput,
} from '@/executor/types'
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedBlock } from '@/serializer/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
import type { DAGNode } from '../dag/builder'
import type { VariableResolver } from '../variables/resolver'
import type { ExecutionState } from './state'
import type { ContextExtensions } from './types'
const logger = createLogger('BlockExecutor')
@@ -20,7 +33,7 @@ export class BlockExecutor {
private blockHandlers: BlockHandler[],
private resolver: VariableResolver,
private contextExtensions: ContextExtensions,
private state?: ExecutionState
private state: BlockStateWriter
) {}
async execute(
@@ -30,7 +43,11 @@ export class BlockExecutor {
): Promise<NormalizedBlockOutput> {
const handler = this.findHandler(block)
if (!handler) {
throw new Error(`No handler found for block type: ${block.metadata?.id}`)
throw buildBlockExecutionError({
block,
context: ctx,
error: `No handler found for block type: ${block.metadata?.id ?? 'unknown'}`,
})
}
const isSentinel = isSentinelBlockType(block.metadata?.id ?? '')
@@ -45,9 +62,23 @@ export class BlockExecutor {
const startTime = Date.now()
let resolvedInputs: Record<string, any> = {}
const nodeMetadata = this.buildNodeMetadata(node)
let cleanupSelfReference: (() => void) | undefined
if (block.metadata?.id === BlockType.APPROVAL) {
cleanupSelfReference = this.preparePauseResumeSelfReference(ctx, node, block, nodeMetadata)
}
try {
resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block)
const output = await handler.execute(ctx, block, resolvedInputs)
} finally {
cleanupSelfReference?.()
}
try {
const output = handler.executeWithNode
? await handler.executeWithNode(ctx, block, resolvedInputs, nodeMetadata)
: await handler.execute(ctx, block, resolvedInputs)
const isStreamingExecution =
output && typeof output === 'object' && 'stream' in output && 'execution' in output
@@ -65,7 +96,7 @@ export class BlockExecutor {
}
normalizedOutput = this.normalizeOutput(
streamingExec.execution.output || streamingExec.execution
streamingExec.execution.output ?? streamingExec.execution
)
} else {
normalizedOutput = this.normalizeOutput(output)
@@ -77,23 +108,20 @@ export class BlockExecutor {
blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration
blockLog.success = true
blockLog.output = normalizedOutput
blockLog.output = this.filterOutputForLog(block, normalizedOutput)
}
ctx.blockStates.set(node.id, {
output: normalizedOutput,
executed: true,
executionTime: duration,
})
this.state.setBlockOutput(node.id, normalizedOutput, duration)
if (!isSentinel) {
this.callOnBlockComplete(ctx, node, block, resolvedInputs, normalizedOutput, duration)
const filteredOutput = this.filterOutputForLog(block, normalizedOutput)
this.callOnBlockComplete(ctx, node, block, resolvedInputs, filteredOutput, duration)
}
return normalizedOutput
} catch (error) {
const duration = Date.now() - startTime
const errorMessage = error instanceof Error ? error.message : String(error)
const errorMessage = normalizeError(error)
if (blockLog) {
blockLog.endedAt = new Date().toISOString()
@@ -106,11 +134,7 @@ export class BlockExecutor {
error: errorMessage,
}
ctx.blockStates.set(node.id, {
output: errorOutput,
executed: true,
executionTime: duration,
})
this.state.setBlockOutput(node.id, errorOutput, duration)
logger.error('Block execution failed', {
blockId: node.id,
@@ -132,7 +156,39 @@ export class BlockExecutor {
return errorOutput
}
throw error
let errorToThrow: Error | string
if (error instanceof Error) {
errorToThrow = error
} else {
errorToThrow = errorMessage
}
throw buildBlockExecutionError({
block,
error: errorToThrow,
context: ctx,
additionalInfo: {
nodeId: node.id,
executionTime: duration,
},
})
}
}
private buildNodeMetadata(node: DAGNode): {
nodeId: string
loopId?: string
parallelId?: string
branchIndex?: number
branchTotal?: number
} {
const metadata = node?.metadata ?? {}
return {
nodeId: node.id,
loopId: metadata.loopId,
parallelId: metadata.parallelId,
branchIndex: metadata.branchIndex,
branchTotal: metadata.branchTotal,
}
}
@@ -155,7 +211,7 @@ export class BlockExecutor {
block: SerializedBlock,
node: DAGNode
): BlockLog {
let blockName = block.metadata?.name || blockId
let blockName = block.metadata?.name ?? blockId
let loopId: string | undefined
let parallelId: string | undefined
let iterationIndex: number | undefined
@@ -165,24 +221,12 @@ export class BlockExecutor {
blockName = `${blockName} (iteration ${node.metadata.branchIndex})`
iterationIndex = node.metadata.branchIndex
parallelId = node.metadata.parallelId
logger.debug('Added parallel iteration suffix', {
blockId,
parallelId,
branchIndex: node.metadata.branchIndex,
blockName,
})
} else if (node.metadata.isLoopNode && node.metadata.loopId && this.state) {
} else if (node.metadata.isLoopNode && node.metadata.loopId) {
loopId = node.metadata.loopId
const loopScope = this.state.getLoopScope(loopId)
const loopScope = ctx.loopExecutions?.get(loopId)
if (loopScope && loopScope.iteration !== undefined) {
blockName = `${blockName} (iteration ${loopScope.iteration})`
iterationIndex = loopScope.iteration
logger.debug('Added loop iteration suffix', {
blockId,
loopId,
iteration: loopScope.iteration,
blockName,
})
} else {
logger.warn('Loop scope not found for block', { blockId, loopId })
}
@@ -192,7 +236,7 @@ export class BlockExecutor {
return {
blockId,
blockName,
blockType: block.metadata?.id || DEFAULTS.BLOCK_TYPE,
blockType: block.metadata?.id ?? DEFAULTS.BLOCK_TYPE,
startedAt: new Date().toISOString(),
endedAt: '',
durationMs: 0,
@@ -215,12 +259,28 @@ export class BlockExecutor {
return { result: output }
}
private filterOutputForLog(
block: SerializedBlock,
output: NormalizedBlockOutput
): NormalizedBlockOutput {
if (block.metadata?.id === BlockType.APPROVAL) {
const filtered: NormalizedBlockOutput = {}
for (const [key, value] of Object.entries(output)) {
if (key.startsWith('_')) continue
if (key === 'response') continue
filtered[key] = value
}
return filtered
}
return output
}
private callOnBlockStart(ctx: ExecutionContext, node: DAGNode, block: SerializedBlock): void {
const blockId = node.id
const blockName = block.metadata?.name || blockId
const blockType = block.metadata?.id || DEFAULTS.BLOCK_TYPE
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
const iterationContext = this.getIterationContext(node)
const iterationContext = this.getIterationContext(ctx, node)
if (this.contextExtensions.onBlockStart) {
this.contextExtensions.onBlockStart(blockId, blockName, blockType, iterationContext)
@@ -236,10 +296,10 @@ export class BlockExecutor {
duration: number
): void {
const blockId = node.id
const blockName = block.metadata?.name || blockId
const blockType = block.metadata?.id || DEFAULTS.BLOCK_TYPE
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
const iterationContext = this.getIterationContext(node)
const iterationContext = this.getIterationContext(ctx, node)
if (this.contextExtensions.onBlockComplete) {
this.contextExtensions.onBlockComplete(
@@ -257,6 +317,7 @@ export class BlockExecutor {
}
private getIterationContext(
ctx: ExecutionContext,
node: DAGNode
): { iterationCurrent: number; iterationTotal: number; iterationType: SubflowType } | undefined {
if (!node?.metadata) return undefined
@@ -269,8 +330,8 @@ export class BlockExecutor {
}
}
if (node.metadata.isLoopNode && node.metadata.loopId && this.state) {
const loopScope = this.state.getLoopScope(node.metadata.loopId)
if (node.metadata.isLoopNode && node.metadata.loopId) {
const loopScope = ctx.loopExecutions?.get(node.metadata.loopId)
if (loopScope && loopScope.iteration !== undefined && loopScope.maxIterations) {
return {
iterationCurrent: loopScope.iteration,
@@ -282,4 +343,74 @@ export class BlockExecutor {
return undefined
}
private preparePauseResumeSelfReference(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
nodeMetadata: {
nodeId: string
loopId?: string
parallelId?: string
branchIndex?: number
branchTotal?: number
}
): (() => void) | undefined {
const blockId = node.id
const existingState = ctx.blockStates.get(blockId)
if (existingState?.executed) {
return undefined
}
const executionId = ctx.executionId ?? ctx.metadata?.executionId
const workflowId = ctx.workflowId
if (!executionId || !workflowId) {
return undefined
}
const { loopScope } = mapNodeMetadataToPauseScopes(ctx, nodeMetadata)
const contextId = generatePauseContextId(block.id, nodeMetadata, loopScope)
let resumeLinks: { apiUrl: string; uiUrl: string }
try {
const baseUrl = getBaseUrl()
resumeLinks = {
apiUrl: buildResumeApiUrl(baseUrl, workflowId, executionId, contextId),
uiUrl: buildResumeUiUrl(baseUrl, workflowId, executionId),
}
} catch {
resumeLinks = {
apiUrl: buildResumeApiUrl(undefined, workflowId, executionId, contextId),
uiUrl: buildResumeUiUrl(undefined, workflowId, executionId),
}
}
let previousState: BlockState | undefined
if (existingState) {
previousState = { ...existingState }
}
const hadPrevious = existingState !== undefined
const placeholderState: BlockState = {
output: {
uiUrl: resumeLinks.uiUrl,
apiUrl: resumeLinks.apiUrl,
},
executed: false,
executionTime: existingState?.executionTime ?? 0,
}
this.state.setBlockState(blockId, placeholderState)
return () => {
if (hadPrevious && previousState) {
this.state.setBlockState(blockId, previousState)
} else {
this.state.deleteBlockState(blockId)
}
}
}
}

View File

@@ -1,8 +1,8 @@
import { createLogger } from '@/lib/logs/console/logger'
import { EDGE } from '@/executor/consts'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { DAGEdge } from '@/executor/dag/types'
import type { NormalizedBlockOutput } from '@/executor/types'
import type { DAG, DAGNode } from '../dag/builder'
import type { DAGEdge } from '../dag/types'
const logger = createLogger('EdgeManager')
@@ -17,15 +17,9 @@ export class EdgeManager {
skipBackwardsEdge = false
): string[] {
const readyNodes: string[] = []
logger.debug('Processing outgoing edges', {
nodeId: node.id,
edgeCount: node.outgoingEdges.size,
skipBackwardsEdge,
})
for (const [edgeId, edge] of node.outgoingEdges) {
if (skipBackwardsEdge && this.isBackwardsEdge(edge.sourceHandle)) {
logger.debug('Skipping backwards edge', { edgeId })
continue
}
@@ -40,14 +34,6 @@ export class EdgeManager {
this.deactivateEdgeAndDescendants(node.id, edge.target, edge.sourceHandle)
}
logger.debug('Edge not activated', {
edgeId,
sourceHandle: edge.sourceHandle,
from: node.id,
to: edge.target,
isLoopEdge,
deactivatedDescendants: !isLoopEdge,
})
continue
}
@@ -58,14 +44,8 @@ export class EdgeManager {
}
targetNode.incomingEdges.delete(node.id)
logger.debug('Removed incoming edge', {
from: node.id,
target: edge.target,
remainingIncomingEdges: targetNode.incomingEdges.size,
})
if (this.isNodeReady(targetNode)) {
logger.debug('Node ready', { nodeId: targetNode.id })
readyNodes.push(targetNode.id)
}
}
@@ -80,18 +60,9 @@ export class EdgeManager {
const activeIncomingCount = this.countActiveIncomingEdges(node)
if (activeIncomingCount > 0) {
logger.debug('Node not ready - waiting for active incoming edges', {
nodeId: node.id,
totalIncoming: node.incomingEdges.size,
activeIncoming: activeIncomingCount,
})
return false
}
logger.debug('Node ready - all remaining edges are deactivated', {
nodeId: node.id,
totalIncoming: node.incomingEdges.size,
})
return true
}
@@ -103,10 +74,6 @@ export class EdgeManager {
}
targetNode.incomingEdges.add(sourceNodeId)
logger.debug('Restored incoming edge', {
from: sourceNodeId,
to: targetNodeId,
})
}
clearDeactivatedEdges(): void {
@@ -116,33 +83,37 @@ export class EdgeManager {
private shouldActivateEdge(edge: DAGEdge, output: NormalizedBlockOutput): boolean {
const handle = edge.sourceHandle
if (handle?.startsWith(EDGE.CONDITION_PREFIX)) {
if (!handle) {
return true
}
if (handle.startsWith(EDGE.CONDITION_PREFIX)) {
const conditionValue = handle.substring(EDGE.CONDITION_PREFIX.length)
return output.selectedOption === conditionValue
}
if (handle?.startsWith(EDGE.ROUTER_PREFIX)) {
if (handle.startsWith(EDGE.ROUTER_PREFIX)) {
const routeId = handle.substring(EDGE.ROUTER_PREFIX.length)
return output.selectedRoute === routeId
}
if (handle === EDGE.LOOP_CONTINUE || handle === EDGE.LOOP_CONTINUE_ALT) {
return output.selectedRoute === EDGE.LOOP_CONTINUE
}
switch (handle) {
case EDGE.LOOP_CONTINUE:
case EDGE.LOOP_CONTINUE_ALT:
return output.selectedRoute === EDGE.LOOP_CONTINUE
if (handle === EDGE.LOOP_EXIT) {
return output.selectedRoute === EDGE.LOOP_EXIT
}
case EDGE.LOOP_EXIT:
return output.selectedRoute === EDGE.LOOP_EXIT
if (handle === EDGE.ERROR && !output.error) {
return false
}
case EDGE.ERROR:
return !!output.error
if (handle === EDGE.SOURCE && output.error) {
return false
}
case EDGE.SOURCE:
return !output.error
return true
default:
return true
}
}
private isBackwardsEdge(sourceHandle?: string): boolean {
@@ -165,7 +136,6 @@ export class EdgeManager {
const hasOtherActiveIncoming = this.hasActiveIncomingEdges(targetNode, sourceId)
if (!hasOtherActiveIncoming) {
logger.debug('Deactivating descendants of unreachable node', { nodeId: targetId })
for (const [_, outgoingEdge] of targetNode.outgoingEdges) {
this.deactivateEdgeAndDescendants(targetId, outgoingEdge.target, outgoingEdge.sourceHandle)
}
@@ -218,6 +188,6 @@ export class EdgeManager {
}
private createEdgeKey(sourceId: string, targetId: string, sourceHandle?: string): string {
return `${sourceId}-${targetId}-${sourceHandle || EDGE.DEFAULT}`
return `${sourceId}-${targetId}-${sourceHandle ?? EDGE.DEFAULT}`
}
}

View File

@@ -1,9 +1,18 @@
import { createLogger } from '@/lib/logs/console/logger'
import { BlockType } from '@/executor/consts'
import type { ExecutionContext, ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import type { DAG } from '../dag/builder'
import type { NodeExecutionOrchestrator } from '../orchestrators/node'
import type { EdgeManager } from './edge-manager'
import type { DAG } from '@/executor/dag/builder'
import type { EdgeManager } from '@/executor/execution/edge-manager'
import { serializePauseSnapshot } from '@/executor/execution/snapshot-serializer'
import type { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import type {
ExecutionContext,
ExecutionResult,
NormalizedBlockOutput,
PauseMetadata,
PausePoint,
ResumeStatus,
} from '@/executor/types'
import { normalizeError } from '@/executor/utils/errors'
const logger = createLogger('ExecutionEngine')
@@ -12,32 +21,32 @@ export class ExecutionEngine {
private executing = new Set<Promise<void>>()
private queueLock = Promise.resolve()
private finalOutput: NormalizedBlockOutput = {}
private pausedBlocks: Map<string, PauseMetadata> = new Map()
private allowResumeTriggers: boolean
constructor(
private context: ExecutionContext,
private dag: DAG,
private edgeManager: EdgeManager,
private nodeOrchestrator: NodeExecutionOrchestrator,
private context: ExecutionContext
) {}
private nodeOrchestrator: NodeExecutionOrchestrator
) {
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
}
async run(triggerBlockId?: string): Promise<ExecutionResult> {
const startTime = Date.now()
try {
this.initializeQueue(triggerBlockId)
logger.debug('Starting execution loop', {
initialQueueSize: this.readyQueue.length,
startNodeId: triggerBlockId,
})
while (this.hasWork()) {
await this.processQueue()
}
logger.debug('Execution loop completed', {
finalOutputKeys: Object.keys(this.finalOutput),
})
await this.waitForAllExecutions()
if (this.pausedBlocks.size > 0) {
return this.buildPausedResult(startTime)
}
const endTime = Date.now()
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime
@@ -53,7 +62,7 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime
const errorMessage = error instanceof Error ? error.message : String(error)
const errorMessage = normalizeError(error)
logger.error('Execution failed', { error: errorMessage })
const executionResult: ExecutionResult = {
@@ -74,9 +83,13 @@ export class ExecutionEngine {
}
private addToQueue(nodeId: string): void {
const node = this.dag.nodes.get(nodeId)
if (node?.metadata?.isResumeTrigger && !this.allowResumeTriggers) {
return
}
if (!this.readyQueue.includes(nodeId)) {
this.readyQueue.push(nodeId)
logger.debug('Added to queue', { nodeId, queueLength: this.readyQueue.length })
}
}
@@ -122,6 +135,56 @@ export class ExecutionEngine {
}
private initializeQueue(triggerBlockId?: string): void {
const pendingBlocks = this.context.metadata.pendingBlocks
const remainingEdges = (this.context.metadata as any).remainingEdges
if (remainingEdges && Array.isArray(remainingEdges) && remainingEdges.length > 0) {
logger.info('Removing edges from resumed pause blocks', {
edgeCount: remainingEdges.length,
edges: remainingEdges,
})
for (const edge of remainingEdges) {
const targetNode = this.dag.nodes.get(edge.target)
if (targetNode) {
const hadEdge = targetNode.incomingEdges.has(edge.source)
targetNode.incomingEdges.delete(edge.source)
if (this.edgeManager.isNodeReady(targetNode)) {
logger.info('Node became ready after edge removal', { nodeId: targetNode.id })
this.addToQueue(targetNode.id)
}
}
}
logger.info('Edge removal complete, queued ready nodes', {
queueLength: this.readyQueue.length,
queuedNodes: this.readyQueue,
})
return
}
if (pendingBlocks && pendingBlocks.length > 0) {
logger.info('Initializing queue from pending blocks (resume mode)', {
pendingBlocks,
allowResumeTriggers: this.allowResumeTriggers,
dagNodeCount: this.dag.nodes.size,
})
for (const nodeId of pendingBlocks) {
this.addToQueue(nodeId)
}
logger.info('Pending blocks queued', {
queueLength: this.readyQueue.length,
queuedNodes: this.readyQueue,
})
this.context.metadata.pendingBlocks = []
return
}
if (triggerBlockId) {
this.addToQueue(triggerBlockId)
return
@@ -155,18 +218,17 @@ export class ExecutionEngine {
private async executeNodeAsync(nodeId: string): Promise<void> {
try {
const wasAlreadyExecuted = this.context.executedBlocks.has(nodeId)
const result = await this.nodeOrchestrator.executeNode(nodeId, this.context)
const node = this.dag.nodes.get(nodeId)
const result = await this.nodeOrchestrator.executeNode(this.context, nodeId)
if (!wasAlreadyExecuted) {
await this.withQueueLock(async () => {
await this.handleNodeCompletion(nodeId, result.output, result.isFinalOutput)
})
} else {
logger.debug('Node was already executed, skipping edge processing to avoid loops', {
nodeId,
})
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
const errorMessage = normalizeError(error)
logger.error('Node execution failed', { nodeId, error: errorMessage })
throw error
}
@@ -183,19 +245,73 @@ export class ExecutionEngine {
return
}
await this.nodeOrchestrator.handleNodeCompletion(nodeId, output, this.context)
if (output._pauseMetadata) {
const pauseMetadata = output._pauseMetadata
this.pausedBlocks.set(pauseMetadata.contextId, pauseMetadata)
this.context.metadata.status = 'paused'
this.context.metadata.pausePoints = Array.from(this.pausedBlocks.keys())
return
}
await this.nodeOrchestrator.handleNodeCompletion(this.context, nodeId, output)
if (isFinalOutput) {
this.finalOutput = output
}
const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false)
this.addMultipleToQueue(readyNodes)
logger.debug('Node completion handled', {
logger.info('Processing outgoing edges', {
nodeId,
outgoingEdgesCount: node.outgoingEdges.size,
readyNodesCount: readyNodes.length,
queueSize: this.readyQueue.length,
readyNodes,
})
this.addMultipleToQueue(readyNodes)
}
private buildPausedResult(startTime: number): ExecutionResult {
const endTime = Date.now()
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime
this.context.metadata.status = 'paused'
const snapshotSeed = serializePauseSnapshot(this.context, [], this.dag)
const pausePoints: PausePoint[] = Array.from(this.pausedBlocks.values()).map((pause) => ({
contextId: pause.contextId,
blockId: pause.blockId,
response: pause.response,
registeredAt: pause.timestamp,
resumeStatus: 'paused' as ResumeStatus,
snapshotReady: true,
parallelScope: pause.parallelScope,
loopScope: pause.loopScope,
resumeLinks: pause.resumeLinks,
}))
return {
success: true,
output: this.collectPauseResponses(),
logs: this.context.blockLogs,
metadata: this.context.metadata,
status: 'paused',
pausePoints,
snapshotSeed,
}
}
private collectPauseResponses(): NormalizedBlockOutput {
const responses = Array.from(this.pausedBlocks.values()).map((pause) => pause.response)
if (responses.length === 1) {
return responses[0]
}
return {
pausedBlocks: responses,
pauseCount: responses.length,
}
}
}

View File

@@ -1,23 +1,24 @@
import { createLogger } from '@/lib/logs/console/logger'
import { StartBlockPath } from '@/lib/workflows/triggers'
import type { BlockOutput } from '@/blocks/types'
import { DAGBuilder } from '@/executor/dag/builder'
import { BlockExecutor } from '@/executor/execution/block-executor'
import { EdgeManager } from '@/executor/execution/edge-manager'
import { ExecutionEngine } from '@/executor/execution/engine'
import { ExecutionState } from '@/executor/execution/state'
import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
import { createBlockHandlers } from '@/executor/handlers/registry'
import type { ExecutionContext, ExecutionResult } from '@/executor/types'
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
import {
buildResolutionFromBlock,
buildStartBlockOutput,
resolveExecutorStartBlock,
} from '@/executor/utils/start-block'
import { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedWorkflow } from '@/serializer/types'
import { DAGBuilder } from '../dag/builder'
import { LoopOrchestrator } from '../orchestrators/loop'
import { NodeExecutionOrchestrator } from '../orchestrators/node'
import { ParallelOrchestrator } from '../orchestrators/parallel'
import { VariableResolver } from '../variables/resolver'
import { BlockExecutor } from './block-executor'
import { EdgeManager } from './edge-manager'
import { ExecutionEngine } from './engine'
import { ExecutionState } from './state'
import type { ContextExtensions, WorkflowInput } from './types'
const logger = createLogger('DAGExecutor')
@@ -32,7 +33,6 @@ export interface DAGExecutorOptions {
export class DAGExecutor {
private workflow: SerializedWorkflow
private initialBlockStates: Record<string, BlockOutput>
private environmentVariables: Record<string, string>
private workflowInput: WorkflowInput
private workflowVariables: Record<string, unknown>
@@ -42,19 +42,25 @@ export class DAGExecutor {
constructor(options: DAGExecutorOptions) {
this.workflow = options.workflow
this.initialBlockStates = options.currentBlockStates || {}
this.environmentVariables = options.envVarValues || {}
this.workflowInput = options.workflowInput || {}
this.workflowVariables = options.workflowVariables || {}
this.contextExtensions = options.contextExtensions || {}
this.environmentVariables = options.envVarValues ?? {}
this.workflowInput = options.workflowInput ?? {}
this.workflowVariables = options.workflowVariables ?? {}
this.contextExtensions = options.contextExtensions ?? {}
this.dagBuilder = new DAGBuilder()
}
async execute(workflowId: string, triggerBlockId?: string): Promise<ExecutionResult> {
const dag = this.dagBuilder.build(this.workflow, triggerBlockId)
const context = this.createExecutionContext(workflowId, triggerBlockId)
// Create state with shared references to context's maps/sets for single source of truth
const state = new ExecutionState(context.blockStates, context.executedBlocks)
const savedIncomingEdges = this.contextExtensions.dagIncomingEdges
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
// Link cancellation flag to context
Object.defineProperty(context, 'isCancelled', {
get: () => this.isCancelled,
enumerable: true,
configurable: true,
})
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
@@ -68,7 +74,7 @@ export class DAGExecutor {
loopOrchestrator,
parallelOrchestrator
)
const engine = new ExecutionEngine(dag, edgeManager, nodeOrchestrator, context)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
return await engine.run(triggerBlockId)
}
@@ -77,14 +83,14 @@ export class DAGExecutor {
}
async continueExecution(
pendingBlocks: string[],
_pendingBlocks: string[],
context: ExecutionContext
): Promise<ExecutionResult> {
logger.warn('Debug mode (continueExecution) is not yet implemented in the refactored executor')
return {
success: false,
output: {},
logs: context.blockLogs || [],
logs: context.blockLogs ?? [],
error: 'Debug mode is not yet supported in the refactored executor',
metadata: {
duration: 0,
@@ -93,44 +99,118 @@ export class DAGExecutor {
}
}
private createExecutionContext(workflowId: string, triggerBlockId?: string): ExecutionContext {
private createExecutionContext(
workflowId: string,
triggerBlockId?: string
): { context: ExecutionContext; state: ExecutionState } {
const snapshotState = this.contextExtensions.snapshotState
const blockStates = snapshotState?.blockStates
? new Map(Object.entries(snapshotState.blockStates))
: new Map<string, BlockState>()
const executedBlocks = snapshotState?.executedBlocks
? new Set(snapshotState.executedBlocks)
: new Set<string>()
const state = new ExecutionState(blockStates, executedBlocks)
const context: ExecutionContext = {
workflowId,
workspaceId: this.contextExtensions.workspaceId,
executionId: this.contextExtensions.executionId,
userId: this.contextExtensions.userId,
isDeployedContext: this.contextExtensions.isDeployedContext,
blockStates: new Map(),
blockLogs: [],
blockStates: state.getBlockStates(),
blockLogs: snapshotState?.blockLogs ?? [],
metadata: {
startTime: new Date().toISOString(),
duration: 0,
useDraftState: this.contextExtensions.isDeployedContext !== true,
},
environmentVariables: this.environmentVariables,
workflowVariables: this.workflowVariables,
decisions: {
router: new Map(),
condition: new Map(),
router: snapshotState?.decisions?.router
? new Map(Object.entries(snapshotState.decisions.router))
: new Map(),
condition: snapshotState?.decisions?.condition
? new Map(Object.entries(snapshotState.decisions.condition))
: new Map(),
},
loopIterations: new Map(),
loopItems: new Map(),
completedLoops: new Set(),
executedBlocks: new Set(),
activeExecutionPath: new Set(),
completedLoops: snapshotState?.completedLoops
? new Set(snapshotState.completedLoops)
: new Set(),
loopExecutions: snapshotState?.loopExecutions
? new Map(
Object.entries(snapshotState.loopExecutions).map(([loopId, scope]) => [
loopId,
{
...scope,
currentIterationOutputs: scope.currentIterationOutputs
? new Map(Object.entries(scope.currentIterationOutputs))
: new Map(),
},
])
)
: new Map(),
parallelExecutions: snapshotState?.parallelExecutions
? new Map(
Object.entries(snapshotState.parallelExecutions).map(([parallelId, scope]) => [
parallelId,
{
...scope,
branchOutputs: scope.branchOutputs
? new Map(Object.entries(scope.branchOutputs).map(([k, v]) => [Number(k), v]))
: new Map(),
},
])
)
: new Map(),
executedBlocks: state.getExecutedBlocks(),
activeExecutionPath: snapshotState?.activeExecutionPath
? new Set(snapshotState.activeExecutionPath)
: new Set(),
workflow: this.workflow,
stream: this.contextExtensions.stream || false,
selectedOutputs: this.contextExtensions.selectedOutputs || [],
edges: this.contextExtensions.edges || [],
stream: this.contextExtensions.stream ?? false,
selectedOutputs: this.contextExtensions.selectedOutputs ?? [],
edges: this.contextExtensions.edges ?? [],
onStream: this.contextExtensions.onStream,
onBlockStart: this.contextExtensions.onBlockStart,
onBlockComplete: this.contextExtensions.onBlockComplete,
}
this.initializeStarterBlock(context, triggerBlockId)
return context
if (this.contextExtensions.resumeFromSnapshot) {
context.metadata.resumeFromSnapshot = true
logger.info('Resume from snapshot enabled', {
resumePendingQueue: this.contextExtensions.resumePendingQueue,
remainingEdges: this.contextExtensions.remainingEdges,
triggerBlockId,
})
}
if (this.contextExtensions.remainingEdges) {
;(context.metadata as any).remainingEdges = this.contextExtensions.remainingEdges
logger.info('Set remaining edges for resume', {
edgeCount: this.contextExtensions.remainingEdges.length,
})
}
if (this.contextExtensions.resumePendingQueue?.length) {
context.metadata.pendingBlocks = [...this.contextExtensions.resumePendingQueue]
logger.info('Set pending blocks from resume queue', {
pendingBlocks: context.metadata.pendingBlocks,
skipStarterBlockInit: true,
})
} else {
this.initializeStarterBlock(context, state, triggerBlockId)
}
return { context, state }
}
private initializeStarterBlock(context: ExecutionContext, triggerBlockId?: string): void {
private initializeStarterBlock(
context: ExecutionContext,
state: ExecutionState,
triggerBlockId?: string
): void {
let startResolution: ReturnType<typeof resolveExecutorStartBlock> | null = null
if (triggerBlockId) {
@@ -145,14 +225,10 @@ export class DAGExecutor {
startResolution = buildResolutionFromBlock(triggerBlock)
if (!startResolution) {
logger.debug('Creating generic resolution for trigger block', {
triggerBlockId,
blockType: triggerBlock.metadata?.id,
})
startResolution = {
blockId: triggerBlock.id,
block: triggerBlock,
path: 'split_manual' as any,
path: StartBlockPath.SPLIT_MANUAL,
}
}
} else {
@@ -167,21 +243,20 @@ export class DAGExecutor {
}
}
if (state.getBlockStates().has(startResolution.block.id)) {
return
}
const blockOutput = buildStartBlockOutput({
resolution: startResolution,
workflowInput: this.workflowInput,
isDeployedExecution: this.contextExtensions?.isDeployedContext === true,
})
context.blockStates.set(startResolution.block.id, {
state.setBlockState(startResolution.block.id, {
output: blockOutput,
executed: true,
executed: false,
executionTime: 0,
})
logger.debug('Initialized start block', {
blockId: startResolution.block.id,
blockType: startResolution.block.metadata?.id,
})
}
}

View File

@@ -0,0 +1,129 @@
import type { DAG } from '@/executor/dag/builder'
import type { SerializableExecutionState } from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionContext, ExecutionMetadata, SerializedSnapshot } from '@/executor/types'
function mapFromEntries<T>(map?: Map<string, T>): Record<string, T> | undefined {
if (!map) return undefined
return Object.fromEntries(map)
}
function setToArray<T>(set?: Set<T>): T[] | undefined {
if (!set) return undefined
return Array.from(set)
}
function serializeLoopExecutions(
loopExecutions?: Map<string, any>
): Record<string, any> | undefined {
if (!loopExecutions) return undefined
const result: Record<string, any> = {}
for (const [loopId, scope] of loopExecutions.entries()) {
let currentIterationOutputs: any
if (scope.currentIterationOutputs instanceof Map) {
currentIterationOutputs = Object.fromEntries(scope.currentIterationOutputs)
} else {
currentIterationOutputs = scope.currentIterationOutputs ?? {}
}
result[loopId] = {
...scope,
currentIterationOutputs,
}
}
return result
}
function serializeParallelExecutions(
parallelExecutions?: Map<string, any>
): Record<string, any> | undefined {
if (!parallelExecutions) return undefined
const result: Record<string, any> = {}
for (const [parallelId, scope] of parallelExecutions.entries()) {
let branchOutputs: any
if (scope.branchOutputs instanceof Map) {
branchOutputs = Object.fromEntries(scope.branchOutputs)
} else {
branchOutputs = scope.branchOutputs ?? {}
}
result[parallelId] = {
...scope,
branchOutputs,
}
}
return result
}
export function serializePauseSnapshot(
context: ExecutionContext,
triggerBlockIds: string[],
dag?: DAG
): SerializedSnapshot {
const metadataFromContext = context.metadata as ExecutionMetadata | undefined
let useDraftState: boolean
if (metadataFromContext?.useDraftState !== undefined) {
useDraftState = metadataFromContext.useDraftState
} else if (context.isDeployedContext === true) {
useDraftState = false
} else {
useDraftState = true
}
const dagIncomingEdges: Record<string, string[]> | undefined = dag
? Object.fromEntries(
Array.from(dag.nodes.entries()).map(([nodeId, node]) => [
nodeId,
Array.from(node.incomingEdges),
])
)
: undefined
const state: SerializableExecutionState = {
blockStates: Object.fromEntries(context.blockStates),
executedBlocks: Array.from(context.executedBlocks),
blockLogs: context.blockLogs,
decisions: {
router: Object.fromEntries(context.decisions.router),
condition: Object.fromEntries(context.decisions.condition),
},
completedLoops: Array.from(context.completedLoops),
loopExecutions: serializeLoopExecutions(context.loopExecutions),
parallelExecutions: serializeParallelExecutions(context.parallelExecutions),
parallelBlockMapping: mapFromEntries(context.parallelBlockMapping),
activeExecutionPath: Array.from(context.activeExecutionPath),
pendingQueue: triggerBlockIds,
dagIncomingEdges,
}
const executionMetadata = {
requestId:
(context.metadata as any)?.requestId ??
context.executionId ??
context.workflowId ??
'unknown',
executionId: context.executionId ?? 'unknown',
workflowId: context.workflowId,
workspaceId: context.workspaceId,
userId: (context.metadata as any)?.userId ?? '',
triggerType: (context.metadata as any)?.triggerType ?? 'manual',
triggerBlockId: triggerBlockIds[0],
useDraftState,
startTime: context.metadata.startTime ?? new Date().toISOString(),
}
const snapshot = new ExecutionSnapshot(
executionMetadata,
context.workflow,
{},
context.environmentVariables ?? {},
context.workflowVariables ?? {},
context.selectedOutputs ?? [],
state
)
return {
snapshot: snapshot.toJSON(),
triggerIds: triggerBlockIds,
}
}

View File

@@ -11,6 +11,8 @@ export interface ExecutionMetadata {
triggerBlockId?: string
useDraftState: boolean
startTime: string
pendingBlocks?: string[]
resumeFromSnapshot?: boolean
}
export interface ExecutionCallbacks {
@@ -33,8 +35,6 @@ export interface SerializableExecutionState {
router: Record<string, string>
condition: Record<string, string>
}
loopIterations: Record<string, number>
loopItems: Record<string, any>
completedLoops: string[]
loopExecutions?: Record<string, any>
parallelExecutions?: Record<string, any>
@@ -42,6 +42,8 @@ export interface SerializableExecutionState {
activeExecutionPath: string[]
pendingQueue?: string[]
remainingEdges?: Edge[]
dagIncomingEdges?: Record<string, string[]>
completedPauseContexts?: string[]
}
export class ExecutionSnapshot {
@@ -80,19 +82,3 @@ export class ExecutionSnapshot {
)
}
}
// TODO: Implement pause/resume functionality
//
// Future implementation should include:
// 1. executor.pause() - Captures current state mid-execution
// - Serialize ExecutionContext (blockStates, decisions, loops, etc) to state property
// - Save snapshot.toJSON() to database
// 2. executor.resume(snapshot) - Reconstructs execution from saved state
// - Load snapshot from database
// - Restore ExecutionContext from state property
// - Continue execution from pendingQueue
// 3. API endpoints:
// - POST /api/executions/[id]/pause
// - POST /api/executions/[id]/resume
// 4. Database schema:
// - execution_snapshots table with snapshot JSON column

View File

@@ -1,4 +1,9 @@
import type { NormalizedBlockOutput } from '@/executor/types'
import type { BlockStateController } from '@/executor/execution/types'
import type { BlockState, NormalizedBlockOutput } from '@/executor/types'
function normalizeLookupId(id: string): string {
return id.replace(/\d+/gu, '').replace(/_loop\d+/g, '')
}
export interface LoopScope {
iteration: number
currentIterationOutputs: Map<string, NormalizedBlockOutput>
@@ -18,53 +23,77 @@ export interface ParallelScope {
totalExpectedNodes: number
}
export class ExecutionState {
// Shared references with ExecutionContext for single source of truth
readonly blockStates: Map<
string,
{ output: NormalizedBlockOutput; executed: boolean; executionTime: number }
>
readonly executedBlocks: Set<string>
readonly loopScopes = new Map<string, LoopScope>()
readonly parallelScopes = new Map<string, ParallelScope>()
export class ExecutionState implements BlockStateController {
private readonly blockStates: Map<string, BlockState>
private readonly executedBlocks: Set<string>
constructor(
blockStates: Map<
string,
{ output: NormalizedBlockOutput; executed: boolean; executionTime: number }
>,
executedBlocks: Set<string>
) {
this.blockStates = blockStates
this.executedBlocks = executedBlocks
constructor(blockStates?: Map<string, BlockState>, executedBlocks?: Set<string>) {
this.blockStates = blockStates ?? new Map()
this.executedBlocks = executedBlocks ?? new Set()
}
getBlockOutput(blockId: string): NormalizedBlockOutput | undefined {
return this.blockStates.get(blockId)?.output
getBlockStates(): ReadonlyMap<string, BlockState> {
return this.blockStates
}
setBlockOutput(blockId: string, output: NormalizedBlockOutput): void {
this.blockStates.set(blockId, { output, executed: true, executionTime: 0 })
getExecutedBlocks(): ReadonlySet<string> {
return this.executedBlocks
}
getBlockOutput(blockId: string, currentNodeId?: string): NormalizedBlockOutput | undefined {
const direct = this.blockStates.get(blockId)?.output
if (direct !== undefined) {
return direct
}
const normalizedId = normalizeLookupId(blockId)
if (normalizedId !== blockId) {
return undefined
}
if (currentNodeId) {
const currentSuffix = currentNodeId.replace(normalizedId, '').match(/₍\d+₎/g)?.[0] ?? ''
const loopSuffix = currentNodeId.match(/_loop\d+/)?.[0] ?? ''
const withSuffix = `${blockId}${currentSuffix}${loopSuffix}`
const suffixedOutput = this.blockStates.get(withSuffix)?.output
if (suffixedOutput !== undefined) {
return suffixedOutput
}
}
for (const [storedId, state] of this.blockStates.entries()) {
if (normalizeLookupId(storedId) === blockId) {
return state.output
}
}
return undefined
}
setBlockOutput(blockId: string, output: NormalizedBlockOutput, executionTime = 0): void {
this.blockStates.set(blockId, { output, executed: true, executionTime })
this.executedBlocks.add(blockId)
}
setBlockState(blockId: string, state: BlockState): void {
this.blockStates.set(blockId, state)
if (state.executed) {
this.executedBlocks.add(blockId)
} else {
this.executedBlocks.delete(blockId)
}
}
deleteBlockState(blockId: string): void {
this.blockStates.delete(blockId)
this.executedBlocks.delete(blockId)
}
unmarkExecuted(blockId: string): void {
this.executedBlocks.delete(blockId)
}
hasExecuted(blockId: string): boolean {
return this.executedBlocks.has(blockId)
}
getLoopScope(loopId: string): LoopScope | undefined {
return this.loopScopes.get(loopId)
}
setLoopScope(loopId: string, scope: LoopScope): void {
this.loopScopes.set(loopId, scope)
}
getParallelScope(parallelId: string): ParallelScope | undefined {
return this.parallelScopes.get(parallelId)
}
setParallelScope(parallelId: string, scope: ParallelScope): void {
this.parallelScopes.set(parallelId, scope)
}
}

View File

@@ -1,4 +1,4 @@
import type { NormalizedBlockOutput } from '@/executor/types'
import type { BlockState, NormalizedBlockOutput } from '@/executor/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
export interface ContextExtensions {
@@ -10,6 +10,16 @@ export interface ContextExtensions {
edges?: Array<{ source: string; target: string }>
isDeployedContext?: boolean
isChildExecution?: boolean
resumeFromSnapshot?: boolean
resumePendingQueue?: string[]
remainingEdges?: Array<{
source: string
target: string
sourceHandle?: string
targetHandle?: string
}>
dagIncomingEdges?: Record<string, string[]>
snapshotState?: import('@/executor/execution/snapshot').SerializableExecutionState
onStream?: (streamingExecution: unknown) => Promise<void>
onBlockStart?: (
blockId: string,
@@ -37,3 +47,17 @@ export interface ContextExtensions {
export interface WorkflowInput {
[key: string]: unknown
}
export interface BlockStateReader {
getBlockOutput(blockId: string, currentNodeId?: string): NormalizedBlockOutput | undefined
hasExecuted(blockId: string): boolean
}
export interface BlockStateWriter {
setBlockOutput(blockId: string, output: NormalizedBlockOutput, executionTime?: number): void
setBlockState(blockId: string, state: BlockState): void
deleteBlockState(blockId: string): void
unmarkExecuted(blockId: string): void
}
export type BlockStateController = BlockStateReader & BlockStateWriter