fix(parallel): variable resolution in collection (#2314)

* Fix var resolution in parallel

* Fix parallel

* Clean logs

* FIx loop error port
This commit is contained in:
Siddharth Ganesan
2025-12-11 13:51:22 -08:00
committed by GitHub
parent b595273c3f
commit c0bb85479d
9 changed files with 367 additions and 8 deletions

View File

@@ -86,6 +86,27 @@ export class EdgeManager {
this.deactivatedEdges.clear()
}
/**
* Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration).
* This ensures error/success edges can be re-evaluated on each iteration.
*/
clearDeactivatedEdgesForNodes(nodeIds: Set<string>): void {
const edgesToRemove: string[] = []
for (const edgeKey of this.deactivatedEdges) {
// Edge key format is "sourceId-targetId-handle"
// Check if either source or target is in the nodeIds set
for (const nodeId of nodeIds) {
if (edgeKey.startsWith(`${nodeId}-`) || edgeKey.includes(`-${nodeId}-`)) {
edgesToRemove.push(edgeKey)
break
}
}
}
for (const edgeKey of edgesToRemove) {
this.deactivatedEdges.delete(edgeKey)
}
}
private shouldActivateEdge(edge: DAGEdge, output: NormalizedBlockOutput): boolean {
const handle = edge.sourceHandle
@@ -180,7 +201,7 @@ export class EdgeManager {
const sourceNode = this.dag.nodes.get(sourceId)
if (!sourceNode) continue
for (const [_, edge] of sourceNode.outgoingEdges) {
for (const [, edge] of sourceNode.outgoingEdges) {
if (edge.target === node.id) {
const edgeKey = this.createEdgeKey(sourceId, edge.target, edge.sourceHandle)
if (!this.deactivatedEdges.has(edgeKey)) {

View File

@@ -279,6 +279,14 @@ export class ExecutionEngine {
})
this.addMultipleToQueue(readyNodes)
// Check for dynamically added nodes (e.g., from parallel expansion)
if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) {
const dynamicNodes = this.context.pendingDynamicNodes
this.context.pendingDynamicNodes = []
logger.info('Adding dynamically expanded parallel nodes', { dynamicNodes })
this.addMultipleToQueue(dynamicNodes)
}
}
private buildPausedResult(startTime: number): ExecutionResult {

View File

@@ -64,9 +64,11 @@ export class DAGExecutor {
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
parallelOrchestrator.setResolver(resolver)
const allHandlers = createBlockHandlers()
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
const edgeManager = new EdgeManager(dag)
loopOrchestrator.setEdgeManager(edgeManager)
const nodeOrchestrator = new NodeExecutionOrchestrator(
dag,
state,

View File

@@ -22,6 +22,7 @@ export interface ParallelScope {
branchOutputs: Map<number, NormalizedBlockOutput[]>
completedCount: number
totalExpectedNodes: number
items?: any[]
}
export class ExecutionState implements BlockStateController {

View File

@@ -1,6 +1,7 @@
import { createLogger } from '@/lib/logs/console/logger'
import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
import type { EdgeManager } from '@/executor/execution/edge-manager'
import type { LoopScope } from '@/executor/execution/state'
import type { BlockStateController } from '@/executor/execution/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
@@ -26,12 +27,18 @@ export interface LoopContinuationResult {
}
export class LoopOrchestrator {
private edgeManager: EdgeManager | null = null
constructor(
private dag: DAG,
private state: BlockStateController,
private resolver: VariableResolver
) {}
setEdgeManager(edgeManager: EdgeManager): void {
this.edgeManager = edgeManager
}
initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope {
const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined
if (!loopConfig) {
@@ -216,7 +223,11 @@ export class LoopOrchestrator {
const loopNodes = loopConfig.nodes
const allLoopNodeIds = new Set([sentinelStartId, sentinelEndId, ...loopNodes])
let restoredCount = 0
// Clear deactivated edges for loop nodes so error/success edges can be re-evaluated
if (this.edgeManager) {
this.edgeManager.clearDeactivatedEdgesForNodes(allLoopNodeIds)
}
for (const nodeId of allLoopNodeIds) {
const nodeToRestore = this.dag.nodes.get(nodeId)
if (!nodeToRestore) continue
@@ -224,7 +235,7 @@ export class LoopOrchestrator {
for (const [potentialSourceId, potentialSourceNode] of this.dag.nodes) {
if (!allLoopNodeIds.has(potentialSourceId)) continue
for (const [_, edge] of potentialSourceNode.outgoingEdges) {
for (const [, edge] of potentialSourceNode.outgoingEdges) {
if (edge.target === nodeId) {
const isBackwardEdge =
edge.sourceHandle === EDGE.LOOP_CONTINUE ||
@@ -232,7 +243,6 @@ export class LoopOrchestrator {
if (!isBackwardEdge) {
nodeToRestore.incomingEdges.add(potentialSourceId)
restoredCount++
}
}
}

View File

@@ -53,6 +53,20 @@ export class NodeExecutionOrchestrator {
}
}
// Initialize parallel scope BEFORE execution so <parallel.currentItem> can be resolved
const parallelId = node.metadata.parallelId
if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) {
const totalBranches = node.metadata.branchTotal || 1
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
const nodesInParallel = (parallelConfig as any)?.nodes?.length || 1
this.parallelOrchestrator.initializeParallelScope(
ctx,
parallelId,
totalBranches,
nodesInParallel
)
}
if (node.metadata.isSentinel) {
const output = this.handleSentinel(ctx, node)
const isFinalOutput = node.outgoingEdges.size === 0

View File

@@ -1,15 +1,17 @@
import { createLogger } from '@/lib/logs/console/logger'
import type { DAG } from '@/executor/dag/builder'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { ParallelScope } from '@/executor/execution/state'
import type { BlockStateWriter } from '@/executor/execution/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { ParallelConfigWithNodes } from '@/executor/types/parallel'
import {
buildBranchNodeId,
calculateBranchCount,
extractBaseBlockId,
extractBranchIndex,
parseDistributionItems,
} from '@/executor/utils/subflow-utils'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedParallel } from '@/serializer/types'
const logger = createLogger('ParallelOrchestrator')
@@ -29,31 +31,325 @@ export interface ParallelAggregationResult {
}
export class ParallelOrchestrator {
private resolver: VariableResolver | null = null
constructor(
private dag: DAG,
private state: BlockStateWriter
) {}
setResolver(resolver: VariableResolver): void {
this.resolver = resolver
}
initializeParallelScope(
ctx: ExecutionContext,
parallelId: string,
totalBranches: number,
terminalNodesCount = 1
): ParallelScope {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
const items = parallelConfig ? this.resolveDistributionItems(ctx, parallelConfig) : undefined
// If we have more items than pre-built branches, expand the DAG
const actualBranchCount = items && items.length > totalBranches ? items.length : totalBranches
const scope: ParallelScope = {
parallelId,
totalBranches,
totalBranches: actualBranchCount,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: totalBranches * terminalNodesCount,
totalExpectedNodes: actualBranchCount * terminalNodesCount,
items,
}
if (!ctx.parallelExecutions) {
ctx.parallelExecutions = new Map()
}
ctx.parallelExecutions.set(parallelId, scope)
// Dynamically expand DAG if needed
if (items && items.length > totalBranches && parallelConfig) {
logger.info('Dynamically expanding parallel branches', {
parallelId,
existingBranches: totalBranches,
targetBranches: items.length,
itemsCount: items.length,
})
const newEntryNodes = this.expandParallelBranches(
parallelId,
parallelConfig,
totalBranches,
items.length
)
logger.info('Parallel expansion complete', {
parallelId,
newEntryNodes,
totalNodesInDag: this.dag.nodes.size,
})
// Add new entry nodes to pending dynamic nodes so the engine can schedule them
if (newEntryNodes.length > 0) {
if (!ctx.pendingDynamicNodes) {
ctx.pendingDynamicNodes = []
}
ctx.pendingDynamicNodes.push(...newEntryNodes)
}
} else {
logger.info('No parallel expansion needed', {
parallelId,
itemsLength: items?.length,
totalBranches,
hasParallelConfig: !!parallelConfig,
})
}
return scope
}
/**
* Dynamically expand the DAG to include additional branch nodes when
* the resolved item count exceeds the pre-built branch count.
*/
private expandParallelBranches(
parallelId: string,
config: SerializedParallel,
existingBranchCount: number,
targetBranchCount: number
): string[] {
// Get all blocks that are part of this parallel
const blocksInParallel = config.nodes
const blocksInParallelSet = new Set(blocksInParallel)
// Step 1: Create all new nodes first
for (const blockId of blocksInParallel) {
const branch0NodeId = buildBranchNodeId(blockId, 0)
const templateNode = this.dag.nodes.get(branch0NodeId)
if (!templateNode) {
logger.warn('Template node not found for parallel expansion', { blockId, branch0NodeId })
continue
}
for (let branchIndex = existingBranchCount; branchIndex < targetBranchCount; branchIndex++) {
const newNodeId = buildBranchNodeId(blockId, branchIndex)
const newNode: DAGNode = {
id: newNodeId,
block: {
...templateNode.block,
id: newNodeId,
},
incomingEdges: new Set(),
outgoingEdges: new Map(),
metadata: {
...templateNode.metadata,
branchIndex,
branchTotal: targetBranchCount,
originalBlockId: blockId,
},
}
this.dag.nodes.set(newNodeId, newNode)
}
}
// Step 2: Wire edges between the new branch nodes
this.wireExpandedBranchEdges(
parallelId,
blocksInParallel,
existingBranchCount,
targetBranchCount
)
// Step 3: Update metadata on existing nodes to reflect new total
this.updateExistingBranchMetadata(blocksInParallel, existingBranchCount, targetBranchCount)
// Step 4: Identify entry nodes AFTER edges are wired
// Entry nodes are those with no INTERNAL incoming edges (edges from outside parallel don't count)
const newEntryNodes: string[] = []
for (const blockId of blocksInParallel) {
const branch0NodeId = buildBranchNodeId(blockId, 0)
const templateNode = this.dag.nodes.get(branch0NodeId)
if (!templateNode) continue
// Check if template has any INTERNAL incoming edges
let hasInternalIncoming = false
for (const incomingId of templateNode.incomingEdges) {
const baseIncomingId = extractBaseBlockId(incomingId)
if (blocksInParallelSet.has(baseIncomingId)) {
hasInternalIncoming = true
break
}
}
// If no internal incoming edges, the new branches of this block are entry nodes
if (!hasInternalIncoming) {
for (
let branchIndex = existingBranchCount;
branchIndex < targetBranchCount;
branchIndex++
) {
newEntryNodes.push(buildBranchNodeId(blockId, branchIndex))
}
}
}
return newEntryNodes
}
/**
* Wire edges between expanded branch nodes by replicating the edge pattern from branch 0.
* Handles both internal edges (within the parallel) and exit edges (to blocks after the parallel).
*/
private wireExpandedBranchEdges(
parallelId: string,
blocksInParallel: string[],
existingBranchCount: number,
targetBranchCount: number
): void {
const blocksInParallelSet = new Set(blocksInParallel)
// For each block, look at branch 0's outgoing edges and replicate for new branches
for (const blockId of blocksInParallel) {
const branch0NodeId = buildBranchNodeId(blockId, 0)
const branch0Node = this.dag.nodes.get(branch0NodeId)
if (!branch0Node) continue
// Replicate outgoing edges for each new branch
for (const [, edge] of branch0Node.outgoingEdges) {
// Use edge.target (the actual target node ID), not the Map key which may be a formatted edge ID
const actualTargetNodeId = edge.target
// Extract the base target block ID
const baseTargetId = extractBaseBlockId(actualTargetNodeId)
// Check if target is inside or outside the parallel
const isInternalEdge = blocksInParallelSet.has(baseTargetId)
for (
let branchIndex = existingBranchCount;
branchIndex < targetBranchCount;
branchIndex++
) {
const sourceNodeId = buildBranchNodeId(blockId, branchIndex)
const sourceNode = this.dag.nodes.get(sourceNodeId)
if (!sourceNode) continue
if (isInternalEdge) {
// Internal edge: wire to the corresponding branch of the target
const newTargetNodeId = buildBranchNodeId(baseTargetId, branchIndex)
const targetNode = this.dag.nodes.get(newTargetNodeId)
if (targetNode) {
sourceNode.outgoingEdges.set(newTargetNodeId, {
target: newTargetNodeId,
sourceHandle: edge.sourceHandle,
targetHandle: edge.targetHandle,
})
targetNode.incomingEdges.add(sourceNodeId)
}
} else {
// Exit edge: wire to the same external target (blocks after the parallel)
// All branches point to the same external node
const externalTargetNode = this.dag.nodes.get(actualTargetNodeId)
if (externalTargetNode) {
sourceNode.outgoingEdges.set(actualTargetNodeId, {
target: actualTargetNodeId,
sourceHandle: edge.sourceHandle,
targetHandle: edge.targetHandle,
})
// Add incoming edge from this new branch to the external node
externalTargetNode.incomingEdges.add(sourceNodeId)
}
}
}
}
}
}
/**
* Update existing branch nodes' metadata to reflect the new total branch count.
*/
private updateExistingBranchMetadata(
blocksInParallel: string[],
existingBranchCount: number,
targetBranchCount: number
): void {
for (const blockId of blocksInParallel) {
for (let branchIndex = 0; branchIndex < existingBranchCount; branchIndex++) {
const nodeId = buildBranchNodeId(blockId, branchIndex)
const node = this.dag.nodes.get(nodeId)
if (node) {
node.metadata.branchTotal = targetBranchCount
}
}
}
}
/**
* Resolve distribution items at runtime, handling references like <previousBlock.items>
* This mirrors how LoopOrchestrator.resolveForEachItems works.
*/
private resolveDistributionItems(ctx: ExecutionContext, config: SerializedParallel): any[] {
const rawItems = config.distribution
if (rawItems === undefined || rawItems === null) {
return []
}
// Already an array - return as-is
if (Array.isArray(rawItems)) {
return rawItems
}
// Object - convert to entries array (consistent with loop forEach behavior)
if (typeof rawItems === 'object') {
return Object.entries(rawItems)
}
// String handling
if (typeof rawItems === 'string') {
// Resolve references at runtime using the variable resolver
if (rawItems.startsWith('<') && rawItems.endsWith('>') && this.resolver) {
const resolved = this.resolver.resolveSingleReference(ctx, '', rawItems)
if (Array.isArray(resolved)) {
return resolved
}
if (typeof resolved === 'object' && resolved !== null) {
return Object.entries(resolved)
}
logger.warn('Distribution reference did not resolve to array or object', {
rawItems,
resolved,
})
return []
}
// Try to parse as JSON
try {
const normalized = rawItems.replace(/'/g, '"')
const parsed = JSON.parse(normalized)
if (Array.isArray(parsed)) {
return parsed
}
if (typeof parsed === 'object' && parsed !== null) {
return Object.entries(parsed)
}
return []
} catch (error) {
logger.error('Failed to parse distribution items', { rawItems, error })
return []
}
}
return []
}
handleParallelBranchCompletion(
ctx: ExecutionContext,
parallelId: string,

View File

@@ -190,6 +190,7 @@ export interface ExecutionContext {
completedCount: number
totalExpectedNodes: number
parallelType?: 'count' | 'collection'
items?: any[]
}
>
@@ -223,6 +224,9 @@ export interface ExecutionContext {
// Cancellation support
isCancelled?: boolean
// Dynamically added nodes that need to be scheduled (e.g., from parallel expansion)
pendingDynamicNodes?: string[]
}
export interface ExecutionResult {

View File

@@ -49,7 +49,10 @@ export class ParallelResolver implements Resolver {
return undefined
}
const distributionItems = this.getDistributionItems(parallelConfig)
// First try to get items from the parallel scope (resolved at runtime)
// This is the same pattern as LoopResolver reading from loopScope.items
const parallelScope = context.executionContext.parallelExecutions?.get(parallelId)
const distributionItems = parallelScope?.items ?? this.getDistributionItems(parallelConfig)
let value: any
switch (property) {