fix(parallel): add parallel sentinel to make parallel-parallel and parallel-loop work correctly (#2593)

* fix(parallel): add parallel sentinel to make parallel-parallel and parallel-loop work correctly

* fix regular -> parallel wiring and copilot handling of nested subflows

* add tests

* consolidate: always expand the parallel DAG at runtime
Vikhyath Mondreti
2025-12-26 16:51:54 -08:00
committed by GitHub
parent b60b98e42c
commit 298546daf1
13 changed files with 1207 additions and 650 deletions

View File

@@ -53,6 +53,7 @@ export const EDGE = {
LOOP_CONTINUE: 'loop_continue',
LOOP_CONTINUE_ALT: 'loop-continue-source',
LOOP_EXIT: 'loop_exit',
PARALLEL_EXIT: 'parallel_exit',
ERROR: 'error',
SOURCE: 'source',
DEFAULT: 'default',
@@ -88,6 +89,16 @@ export const PARALLEL = {
SUFFIX: '₎',
},
SENTINEL: {
PREFIX: 'parallel-',
START_SUFFIX: '-sentinel-start',
END_SUFFIX: '-sentinel-end',
START_TYPE: 'start' as SentinelType,
END_TYPE: 'end' as SentinelType,
START_NAME_PREFIX: 'Parallel Start',
END_NAME_PREFIX: 'Parallel End',
},
DEFAULT_COUNT: 1,
} as const
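The sentinel ID helpers added to subflow-utils compose these constants; a minimal usage sketch (helper names and module path are the ones introduced later in this commit):

import {
  buildParallelSentinelEndId,
  buildParallelSentinelStartId,
} from '@/executor/utils/subflow-utils'

// PREFIX + parallelId + suffix, matching the IDs asserted in the new edge tests.
buildParallelSentinelStartId('parallel-1') // 'parallel-parallel-1-sentinel-start'
buildParallelSentinelEndId('parallel-1')   // 'parallel-parallel-1-sentinel-end'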

View File

@@ -2,9 +2,14 @@ import { createLogger } from '@sim/logger'
import { EdgeConstructor } from '@/executor/dag/construction/edges'
import { LoopConstructor } from '@/executor/dag/construction/loops'
import { NodeConstructor } from '@/executor/dag/construction/nodes'
import { ParallelConstructor } from '@/executor/dag/construction/parallels'
import { PathConstructor } from '@/executor/dag/construction/paths'
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
import { buildSentinelStartId, extractBaseBlockId } from '@/executor/utils/subflow-utils'
import {
buildParallelSentinelStartId,
buildSentinelStartId,
extractBaseBlockId,
} from '@/executor/utils/subflow-utils'
import type {
SerializedBlock,
SerializedLoop,
@@ -31,6 +36,7 @@ export interface DAG {
export class DAGBuilder {
private pathConstructor = new PathConstructor()
private loopConstructor = new LoopConstructor()
private parallelConstructor = new ParallelConstructor()
private nodeConstructor = new NodeConstructor()
private edgeConstructor = new EdgeConstructor()
@@ -50,6 +56,7 @@ export class DAGBuilder {
const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId)
this.loopConstructor.execute(dag, reachableBlocks)
this.parallelConstructor.execute(dag, reachableBlocks)
const { blocksInLoops, blocksInParallels, pauseTriggerMapping } = this.nodeConstructor.execute(
workflow,
@@ -135,7 +142,9 @@ export class DAGBuilder {
)
}
const sentinelStartNode = dag.nodes.get(buildSentinelStartId(id))
const sentinelStartId =
type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id)
const sentinelStartNode = dag.nodes.get(sentinelStartId)
if (!sentinelStartNode) return
const hasConnections = Array.from(sentinelStartNode.outgoingEdges.values()).some((edge) =>

View File

@@ -564,4 +564,548 @@ describe('EdgeConstructor', () => {
expect(outsideNode.outgoingEdges.size).toBe(0)
})
})
describe('Subflow-to-subflow edge wiring', () => {
describe('Parallel → Parallel', () => {
it('should wire parallel sentinel end to next parallel sentinel start', () => {
const parallel1Id = 'parallel-1'
const parallel2Id = 'parallel-2'
const task1Id = 'task-1'
const task2Id = 'task-2'
const task1TemplateId = `${task1Id}__branch-0`
const task2TemplateId = `${task2Id}__branch-0`
const parallel1SentinelStart = `parallel-${parallel1Id}-sentinel-start`
const parallel1SentinelEnd = `parallel-${parallel1Id}-sentinel-end`
const parallel2SentinelStart = `parallel-${parallel2Id}-sentinel-start`
const parallel2SentinelEnd = `parallel-${parallel2Id}-sentinel-end`
const workflow = createMockWorkflow(
[
createMockBlock(parallel1Id, 'parallel'),
createMockBlock(parallel2Id, 'parallel'),
createMockBlock(task1Id),
createMockBlock(task2Id),
],
[{ source: parallel1Id, target: parallel2Id }],
{},
{
[parallel1Id]: { id: parallel1Id, nodes: [task1Id], count: 2 },
[parallel2Id]: { id: parallel2Id, nodes: [task2Id], count: 2 },
}
)
const dag = createMockDAG([
task1TemplateId,
task2TemplateId,
parallel1SentinelStart,
parallel1SentinelEnd,
parallel2SentinelStart,
parallel2SentinelEnd,
])
dag.parallelConfigs.set(parallel1Id, { id: parallel1Id, nodes: [task1Id], count: 2 })
dag.parallelConfigs.set(parallel2Id, { id: parallel2Id, nodes: [task2Id], count: 2 })
edgeConstructor.execute(
workflow,
dag,
new Set([task1Id, task2Id]),
new Set(),
new Set([
task1TemplateId,
task2TemplateId,
parallel1SentinelStart,
parallel1SentinelEnd,
parallel2SentinelStart,
parallel2SentinelEnd,
]),
new Map()
)
const parallel1EndNode = dag.nodes.get(parallel1SentinelEnd)!
const edgesToParallel2 = Array.from(parallel1EndNode.outgoingEdges.values()).filter(
(e) => e.target === parallel2SentinelStart
)
expect(edgesToParallel2.length).toBeGreaterThan(0)
expect(edgesToParallel2[0].sourceHandle).toBe('parallel_exit')
})
})
describe('Loop → Loop', () => {
it('should wire loop sentinel end to next loop sentinel start', () => {
const loop1Id = 'loop-1'
const loop2Id = 'loop-2'
const task1Id = 'task-1'
const task2Id = 'task-2'
const loop1SentinelStart = `loop-${loop1Id}-sentinel-start`
const loop1SentinelEnd = `loop-${loop1Id}-sentinel-end`
const loop2SentinelStart = `loop-${loop2Id}-sentinel-start`
const loop2SentinelEnd = `loop-${loop2Id}-sentinel-end`
const workflow = createMockWorkflow(
[
createMockBlock(loop1Id, 'loop'),
createMockBlock(loop2Id, 'loop'),
createMockBlock(task1Id),
createMockBlock(task2Id),
],
[{ source: loop1Id, target: loop2Id }],
{
[loop1Id]: {
id: loop1Id,
nodes: [task1Id],
iterations: 3,
loopType: 'for',
} as SerializedLoop,
[loop2Id]: {
id: loop2Id,
nodes: [task2Id],
iterations: 3,
loopType: 'for',
} as SerializedLoop,
}
)
const dag = createMockDAG([
task1Id,
task2Id,
loop1SentinelStart,
loop1SentinelEnd,
loop2SentinelStart,
loop2SentinelEnd,
])
dag.loopConfigs.set(loop1Id, {
id: loop1Id,
nodes: [task1Id],
iterations: 3,
loopType: 'for',
} as SerializedLoop)
dag.loopConfigs.set(loop2Id, {
id: loop2Id,
nodes: [task2Id],
iterations: 3,
loopType: 'for',
} as SerializedLoop)
edgeConstructor.execute(
workflow,
dag,
new Set(),
new Set([task1Id, task2Id]),
new Set([
task1Id,
task2Id,
loop1SentinelStart,
loop1SentinelEnd,
loop2SentinelStart,
loop2SentinelEnd,
]),
new Map()
)
const loop1EndNode = dag.nodes.get(loop1SentinelEnd)!
const edgesToLoop2 = Array.from(loop1EndNode.outgoingEdges.values()).filter(
(e) => e.target === loop2SentinelStart
)
expect(edgesToLoop2.length).toBeGreaterThan(0)
expect(edgesToLoop2[0].sourceHandle).toBe('loop_exit')
const loop1StartNode = dag.nodes.get(loop1SentinelStart)!
const earlyExitEdges = Array.from(loop1StartNode.outgoingEdges.values()).filter(
(e) => e.target === loop2SentinelStart && e.sourceHandle === 'loop_exit'
)
expect(earlyExitEdges.length).toBeGreaterThan(0)
})
})
describe('Parallel → Loop', () => {
it('should wire parallel sentinel end to loop sentinel start', () => {
const parallelId = 'parallel-1'
const loopId = 'loop-1'
const taskInParallelId = 'parallel-task'
const taskInParallelTemplateId = `${taskInParallelId}__branch-0`
const taskInLoopId = 'loop-task'
const parallelSentinelStart = `parallel-${parallelId}-sentinel-start`
const parallelSentinelEnd = `parallel-${parallelId}-sentinel-end`
const loopSentinelStart = `loop-${loopId}-sentinel-start`
const loopSentinelEnd = `loop-${loopId}-sentinel-end`
const workflow = createMockWorkflow(
[
createMockBlock(parallelId, 'parallel'),
createMockBlock(loopId, 'loop'),
createMockBlock(taskInParallelId),
createMockBlock(taskInLoopId),
],
[{ source: parallelId, target: loopId }],
{
[loopId]: {
id: loopId,
nodes: [taskInLoopId],
iterations: 3,
loopType: 'for',
} as SerializedLoop,
},
{
[parallelId]: { id: parallelId, nodes: [taskInParallelId], count: 2 },
}
)
const dag = createMockDAG([
taskInParallelTemplateId,
taskInLoopId,
parallelSentinelStart,
parallelSentinelEnd,
loopSentinelStart,
loopSentinelEnd,
])
dag.parallelConfigs.set(parallelId, {
id: parallelId,
nodes: [taskInParallelId],
count: 2,
})
dag.loopConfigs.set(loopId, {
id: loopId,
nodes: [taskInLoopId],
iterations: 3,
loopType: 'for',
} as SerializedLoop)
edgeConstructor.execute(
workflow,
dag,
new Set([taskInParallelId]),
new Set([taskInLoopId]),
new Set([
taskInParallelTemplateId,
taskInLoopId,
parallelSentinelStart,
parallelSentinelEnd,
loopSentinelStart,
loopSentinelEnd,
]),
new Map()
)
const parallelEndNode = dag.nodes.get(parallelSentinelEnd)!
const edgesToLoop = Array.from(parallelEndNode.outgoingEdges.values()).filter(
(e) => e.target === loopSentinelStart
)
expect(edgesToLoop.length).toBeGreaterThan(0)
expect(edgesToLoop[0].sourceHandle).toBe('parallel_exit')
})
})
describe('Loop → Parallel', () => {
it('should wire loop sentinel end to parallel sentinel start', () => {
const loopId = 'loop-1'
const parallelId = 'parallel-1'
const taskInLoopId = 'loop-task'
const taskInParallelId = 'parallel-task'
const taskInParallelTemplateId = `${taskInParallelId}__branch-0`
const loopSentinelStart = `loop-${loopId}-sentinel-start`
const loopSentinelEnd = `loop-${loopId}-sentinel-end`
const parallelSentinelStart = `parallel-${parallelId}-sentinel-start`
const parallelSentinelEnd = `parallel-${parallelId}-sentinel-end`
const workflow = createMockWorkflow(
[
createMockBlock(loopId, 'loop'),
createMockBlock(parallelId, 'parallel'),
createMockBlock(taskInLoopId),
createMockBlock(taskInParallelId),
],
[{ source: loopId, target: parallelId }],
{
[loopId]: {
id: loopId,
nodes: [taskInLoopId],
iterations: 3,
loopType: 'for',
} as SerializedLoop,
},
{
[parallelId]: { id: parallelId, nodes: [taskInParallelId], count: 2 },
}
)
const dag = createMockDAG([
taskInLoopId,
taskInParallelTemplateId,
loopSentinelStart,
loopSentinelEnd,
parallelSentinelStart,
parallelSentinelEnd,
])
dag.loopConfigs.set(loopId, {
id: loopId,
nodes: [taskInLoopId],
iterations: 3,
loopType: 'for',
} as SerializedLoop)
dag.parallelConfigs.set(parallelId, {
id: parallelId,
nodes: [taskInParallelId],
count: 2,
})
edgeConstructor.execute(
workflow,
dag,
new Set([taskInParallelId]),
new Set([taskInLoopId]),
new Set([
taskInLoopId,
taskInParallelTemplateId,
loopSentinelStart,
loopSentinelEnd,
parallelSentinelStart,
parallelSentinelEnd,
]),
new Map()
)
const loopEndNode = dag.nodes.get(loopSentinelEnd)!
const edgesToParallel = Array.from(loopEndNode.outgoingEdges.values()).filter(
(e) => e.target === parallelSentinelStart
)
expect(edgesToParallel.length).toBeGreaterThan(0)
expect(edgesToParallel[0].sourceHandle).toBe('loop_exit')
const loopStartNode = dag.nodes.get(loopSentinelStart)!
const earlyExitEdges = Array.from(loopStartNode.outgoingEdges.values()).filter(
(e) => e.target === parallelSentinelStart && e.sourceHandle === 'loop_exit'
)
expect(earlyExitEdges.length).toBeGreaterThan(0)
})
})
describe('Subflow → Regular block', () => {
it('should wire parallel sentinel end to regular block', () => {
const parallelId = 'parallel-1'
const taskInParallelId = 'parallel-task'
const taskInParallelTemplateId = `${taskInParallelId}__branch-0`
const regularBlockId = 'regular-block'
const parallelSentinelStart = `parallel-${parallelId}-sentinel-start`
const parallelSentinelEnd = `parallel-${parallelId}-sentinel-end`
const workflow = createMockWorkflow(
[
createMockBlock(parallelId, 'parallel'),
createMockBlock(taskInParallelId),
createMockBlock(regularBlockId),
],
[{ source: parallelId, target: regularBlockId }],
{},
{
[parallelId]: { id: parallelId, nodes: [taskInParallelId], count: 2 },
}
)
const dag = createMockDAG([
taskInParallelTemplateId,
regularBlockId,
parallelSentinelStart,
parallelSentinelEnd,
])
dag.parallelConfigs.set(parallelId, {
id: parallelId,
nodes: [taskInParallelId],
count: 2,
})
edgeConstructor.execute(
workflow,
dag,
new Set([taskInParallelId]),
new Set(),
new Set([
taskInParallelTemplateId,
regularBlockId,
parallelSentinelStart,
parallelSentinelEnd,
]),
new Map()
)
const parallelEndNode = dag.nodes.get(parallelSentinelEnd)!
const edgesToRegular = Array.from(parallelEndNode.outgoingEdges.values()).filter(
(e) => e.target === regularBlockId
)
expect(edgesToRegular.length).toBe(1)
expect(edgesToRegular[0].sourceHandle).toBe('parallel_exit')
const regularBlockNode = dag.nodes.get(regularBlockId)!
expect(regularBlockNode.incomingEdges.has(parallelSentinelEnd)).toBe(true)
})
it('should wire loop sentinel end to regular block', () => {
const loopId = 'loop-1'
const taskInLoopId = 'loop-task'
const regularBlockId = 'regular-block'
const loopSentinelStart = `loop-${loopId}-sentinel-start`
const loopSentinelEnd = `loop-${loopId}-sentinel-end`
const workflow = createMockWorkflow(
[
createMockBlock(loopId, 'loop'),
createMockBlock(taskInLoopId),
createMockBlock(regularBlockId),
],
[{ source: loopId, target: regularBlockId }],
{
[loopId]: {
id: loopId,
nodes: [taskInLoopId],
iterations: 3,
loopType: 'for',
} as SerializedLoop,
}
)
const dag = createMockDAG([
taskInLoopId,
regularBlockId,
loopSentinelStart,
loopSentinelEnd,
])
dag.loopConfigs.set(loopId, {
id: loopId,
nodes: [taskInLoopId],
iterations: 3,
loopType: 'for',
} as SerializedLoop)
edgeConstructor.execute(
workflow,
dag,
new Set(),
new Set([taskInLoopId]),
new Set([taskInLoopId, regularBlockId, loopSentinelStart, loopSentinelEnd]),
new Map()
)
const loopEndNode = dag.nodes.get(loopSentinelEnd)!
const edgesToRegular = Array.from(loopEndNode.outgoingEdges.values()).filter(
(e) => e.target === regularBlockId
)
expect(edgesToRegular.length).toBe(1)
expect(edgesToRegular[0].sourceHandle).toBe('loop_exit')
})
})
describe('Regular block → Subflow', () => {
it('should wire regular block to parallel sentinel start', () => {
const regularBlockId = 'regular-block'
const parallelId = 'parallel-1'
const taskInParallelId = 'parallel-task'
const taskInParallelTemplateId = `${taskInParallelId}__branch-0`
const parallelSentinelStart = `parallel-${parallelId}-sentinel-start`
const parallelSentinelEnd = `parallel-${parallelId}-sentinel-end`
const workflow = createMockWorkflow(
[
createMockBlock(regularBlockId),
createMockBlock(parallelId, 'parallel'),
createMockBlock(taskInParallelId),
],
[{ source: regularBlockId, target: parallelId }],
{},
{
[parallelId]: { id: parallelId, nodes: [taskInParallelId], count: 2 },
}
)
const dag = createMockDAG([
regularBlockId,
taskInParallelTemplateId,
parallelSentinelStart,
parallelSentinelEnd,
])
dag.parallelConfigs.set(parallelId, {
id: parallelId,
nodes: [taskInParallelId],
count: 2,
})
edgeConstructor.execute(
workflow,
dag,
new Set([taskInParallelId]),
new Set(),
new Set([
regularBlockId,
taskInParallelTemplateId,
parallelSentinelStart,
parallelSentinelEnd,
]),
new Map()
)
const regularBlockNode = dag.nodes.get(regularBlockId)!
const edgesToParallel = Array.from(regularBlockNode.outgoingEdges.values()).filter(
(e) => e.target === parallelSentinelStart
)
expect(edgesToParallel.length).toBe(1)
const parallelStartNode = dag.nodes.get(parallelSentinelStart)!
expect(parallelStartNode.incomingEdges.has(regularBlockId)).toBe(true)
})
it('should wire regular block to loop sentinel start', () => {
const regularBlockId = 'regular-block'
const loopId = 'loop-1'
const taskInLoopId = 'loop-task'
const loopSentinelStart = `loop-${loopId}-sentinel-start`
const loopSentinelEnd = `loop-${loopId}-sentinel-end`
const workflow = createMockWorkflow(
[
createMockBlock(regularBlockId),
createMockBlock(loopId, 'loop'),
createMockBlock(taskInLoopId),
],
[{ source: regularBlockId, target: loopId }],
{
[loopId]: {
id: loopId,
nodes: [taskInLoopId],
iterations: 3,
loopType: 'for',
} as SerializedLoop,
}
)
const dag = createMockDAG([
regularBlockId,
taskInLoopId,
loopSentinelStart,
loopSentinelEnd,
])
dag.loopConfigs.set(loopId, {
id: loopId,
nodes: [taskInLoopId],
iterations: 3,
loopType: 'for',
} as SerializedLoop)
edgeConstructor.execute(
workflow,
dag,
new Set(),
new Set([taskInLoopId]),
new Set([regularBlockId, taskInLoopId, loopSentinelStart, loopSentinelEnd]),
new Map()
)
const regularBlockNode = dag.nodes.get(regularBlockId)!
const edgesToLoop = Array.from(regularBlockNode.outgoingEdges.values()).filter(
(e) => e.target === loopSentinelStart
)
expect(edgesToLoop.length).toBe(1)
const loopStartNode = dag.nodes.get(loopSentinelStart)!
expect(loopStartNode.incomingEdges.has(regularBlockId)).toBe(true)
})
})
})
})

View File

@@ -3,11 +3,11 @@ import { EDGE, isConditionBlockType, isRouterBlockType } from '@/executor/consta
import type { DAG } from '@/executor/dag/builder'
import {
buildBranchNodeId,
buildParallelSentinelEndId,
buildParallelSentinelStartId,
buildSentinelEndId,
buildSentinelStartId,
calculateBranchCount,
extractBaseBlockId,
parseDistributionItems,
} from '@/executor/utils/subflow-utils'
import type { SerializedWorkflow } from '@/serializer/types'
@@ -51,7 +51,7 @@ export class EdgeConstructor {
)
this.wireLoopSentinels(dag, reachableBlocks)
this.wireParallelBlocks(workflow, dag, loopBlockIds, parallelBlockIds, pauseTriggerMapping)
this.wireParallelSentinels(dag)
}
private buildMetadataMaps(workflow: SerializedWorkflow): EdgeMetadata {
@@ -157,43 +157,45 @@ export class EdgeConstructor {
const sourceIsParallelBlock = parallelBlockIds.has(source)
const targetIsParallelBlock = parallelBlockIds.has(target)
if (
sourceIsLoopBlock ||
targetIsLoopBlock ||
sourceIsParallelBlock ||
targetIsParallelBlock
) {
let loopSentinelStartId: string | undefined
let loopSentinelStartId: string | undefined
if (sourceIsLoopBlock) {
const sentinelEndId = buildSentinelEndId(originalSource)
loopSentinelStartId = buildSentinelStartId(originalSource)
if (!dag.nodes.has(sentinelEndId) || !dag.nodes.has(loopSentinelStartId)) {
continue
}
source = sentinelEndId
sourceHandle = EDGE.LOOP_EXIT
}
if (targetIsLoopBlock) {
const sentinelStartId = buildSentinelStartId(target)
if (!dag.nodes.has(sentinelStartId)) {
continue
}
target = sentinelStartId
}
if (loopSentinelStartId) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}
if (sourceIsParallelBlock || targetIsParallelBlock) {
if (sourceIsLoopBlock) {
const sentinelEndId = buildSentinelEndId(originalSource)
loopSentinelStartId = buildSentinelStartId(originalSource)
if (!dag.nodes.has(sentinelEndId) || !dag.nodes.has(loopSentinelStartId)) {
continue
}
source = sentinelEndId
sourceHandle = EDGE.LOOP_EXIT
}
if (targetIsLoopBlock) {
const sentinelStartId = buildSentinelStartId(target)
if (!dag.nodes.has(sentinelStartId)) {
continue
}
target = sentinelStartId
}
if (sourceIsParallelBlock) {
const sentinelEndId = buildParallelSentinelEndId(originalSource)
if (!dag.nodes.has(sentinelEndId)) {
continue
}
source = sentinelEndId
sourceHandle = EDGE.PARALLEL_EXIT
}
if (targetIsParallelBlock) {
const sentinelStartId = buildParallelSentinelStartId(target)
if (!dag.nodes.has(sentinelStartId)) {
continue
}
target = sentinelStartId
}
if (loopSentinelStartId) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
@@ -209,19 +211,12 @@ export class EdgeConstructor {
const targetParallelId = this.getParallelId(target, dag)
if (sourceParallelId === targetParallelId) {
this.wireParallelInternalEdge(
source,
target,
sourceParallelId!,
dag,
sourceHandle,
targetHandle,
pauseTriggerMapping
)
this.wireParallelTemplateEdge(source, target, dag, sourceHandle, targetHandle)
} else {
logger.warn('Edge between different parallels - invalid workflow', { source, target })
}
} else if (blocksInParallels.has(source) || blocksInParallels.has(target)) {
// Skip - will be handled by sentinel wiring
} else {
const resolvedSource = pauseTriggerMapping.get(originalSource) ?? source
this.addEdge(dag, resolvedSource, target, sourceHandle, targetHandle)
@@ -256,81 +251,32 @@ export class EdgeConstructor {
}
}
private wireParallelBlocks(
workflow: SerializedWorkflow,
dag: DAG,
loopBlockIds: Set<string>,
parallelBlockIds: Set<string>,
pauseTriggerMapping: Map<string, string>
): void {
private wireParallelSentinels(dag: DAG): void {
for (const [parallelId, parallelConfig] of dag.parallelConfigs) {
const nodes = parallelConfig.nodes
if (nodes.length === 0) continue
const { entryNodes, terminalNodes, branchCount } = this.findParallelBoundaryNodes(
nodes,
parallelId,
dag
)
const sentinelStartId = buildParallelSentinelStartId(parallelId)
const sentinelEndId = buildParallelSentinelEndId(parallelId)
logger.info('Wiring parallel block edges', {
parallelId,
entryNodes,
terminalNodes,
branchCount,
})
if (!dag.nodes.has(sentinelStartId) || !dag.nodes.has(sentinelEndId)) {
continue
}
for (const connection of workflow.connections) {
const { source, target, sourceHandle, targetHandle } = connection
const { entryNodes, terminalNodes } = this.findParallelBoundaryNodes(nodes, dag)
if (target === parallelId) {
if (loopBlockIds.has(source) || parallelBlockIds.has(source)) continue
if (nodes.includes(source)) {
logger.warn('Invalid: parallel block connected from its own internal node', {
parallelId,
source,
})
continue
}
logger.info('Wiring edge to parallel block', { source, parallelId, entryNodes })
for (const entryNodeId of entryNodes) {
for (let i = 0; i < branchCount; i++) {
const branchNodeId = buildBranchNodeId(entryNodeId, i)
if (dag.nodes.has(branchNodeId)) {
this.addEdge(dag, source, branchNodeId, sourceHandle, targetHandle)
}
}
}
for (const entryNodeId of entryNodes) {
const templateNodeId = buildBranchNodeId(entryNodeId, 0)
if (dag.nodes.has(templateNodeId)) {
this.addEdge(dag, sentinelStartId, templateNodeId)
}
}
if (source === parallelId) {
if (loopBlockIds.has(target) || parallelBlockIds.has(target)) continue
if (nodes.includes(target)) {
logger.warn('Invalid: parallel block connected to its own internal node', {
parallelId,
target,
})
continue
}
logger.info('Wiring edge from parallel block', { parallelId, target, terminalNodes })
for (const terminalNodeId of terminalNodes) {
for (let i = 0; i < branchCount; i++) {
const branchNodeId = buildBranchNodeId(terminalNodeId, i)
if (dag.nodes.has(branchNodeId)) {
const resolvedSourceId = pauseTriggerMapping.get(branchNodeId) ?? branchNodeId
this.addEdge(dag, resolvedSourceId, target, sourceHandle, targetHandle)
}
}
}
for (const terminalNodeId of terminalNodes) {
const templateNodeId = buildBranchNodeId(terminalNodeId, 0)
if (dag.nodes.has(templateNodeId)) {
this.addEdge(dag, templateNodeId, sentinelEndId)
}
}
}
@@ -384,30 +330,16 @@ export class EdgeConstructor {
return true
}
private wireParallelInternalEdge(
private wireParallelTemplateEdge(
source: string,
target: string,
parallelId: string,
dag: DAG,
sourceHandle?: string,
targetHandle?: string,
pauseTriggerMapping?: Map<string, string>
targetHandle?: string
): void {
const parallelConfig = dag.parallelConfigs.get(parallelId)
if (!parallelConfig) {
throw new Error(`Parallel config not found: ${parallelId}`)
}
const distributionItems = parseDistributionItems(parallelConfig)
const count = calculateBranchCount(parallelConfig, distributionItems)
for (let i = 0; i < count; i++) {
const sourceNodeId = buildBranchNodeId(source, i)
const targetNodeId = buildBranchNodeId(target, i)
const resolvedSourceId = pauseTriggerMapping?.get(sourceNodeId) ?? sourceNodeId
this.addEdge(dag, resolvedSourceId, targetNodeId, sourceHandle, targetHandle)
}
const sourceNodeId = buildBranchNodeId(source, 0)
const targetNodeId = buildBranchNodeId(target, 0)
this.addEdge(dag, sourceNodeId, targetNodeId, sourceHandle, targetHandle)
}
private findLoopBoundaryNodes(
@@ -465,92 +397,44 @@ export class EdgeConstructor {
private findParallelBoundaryNodes(
nodes: string[],
parallelId: string,
dag: DAG
): { entryNodes: string[]; terminalNodes: string[]; branchCount: number } {
): { entryNodes: string[]; terminalNodes: string[] } {
const nodesSet = new Set(nodes)
const entryNodesSet = new Set<string>()
const terminalNodesSet = new Set<string>()
const parallelConfig = dag.parallelConfigs.get(parallelId)
if (!parallelConfig) {
throw new Error(`Parallel config not found: ${parallelId}`)
}
const distributionItems = parseDistributionItems(parallelConfig)
const branchCount = calculateBranchCount(parallelConfig, distributionItems)
const entryNodes: string[] = []
const terminalNodes: string[] = []
for (const nodeId of nodes) {
let hasAnyBranch = false
const templateId = buildBranchNodeId(nodeId, 0)
const templateNode = dag.nodes.get(templateId)
for (let i = 0; i < branchCount; i++) {
if (dag.nodes.has(buildBranchNodeId(nodeId, i))) {
hasAnyBranch = true
break
}
}
if (!hasAnyBranch) continue
const firstBranchId = buildBranchNodeId(nodeId, 0)
const firstBranchNode = dag.nodes.get(firstBranchId)
if (!firstBranchNode) continue
if (!templateNode) continue
let hasIncomingFromParallel = false
for (const incomingNodeId of firstBranchNode.incomingEdges) {
for (const incomingNodeId of templateNode.incomingEdges) {
const originalNodeId = extractBaseBlockId(incomingNodeId)
if (nodesSet.has(originalNodeId)) {
hasIncomingFromParallel = true
break
}
}
if (!hasIncomingFromParallel) {
entryNodesSet.add(nodeId)
entryNodes.push(nodeId)
}
}
for (const nodeId of nodes) {
let hasAnyBranch = false
for (let i = 0; i < branchCount; i++) {
if (dag.nodes.has(buildBranchNodeId(nodeId, i))) {
hasAnyBranch = true
break
}
}
if (!hasAnyBranch) continue
const firstBranchId = buildBranchNodeId(nodeId, 0)
const firstBranchNode = dag.nodes.get(firstBranchId)
if (!firstBranchNode) continue
let hasOutgoingToParallel = false
for (const [_, edge] of firstBranchNode.outgoingEdges) {
for (const [, edge] of templateNode.outgoingEdges) {
const originalTargetId = extractBaseBlockId(edge.target)
if (nodesSet.has(originalTargetId)) {
hasOutgoingToParallel = true
break
}
}
if (!hasOutgoingToParallel) {
terminalNodesSet.add(nodeId)
terminalNodes.push(nodeId)
}
}
return {
entryNodes: Array.from(entryNodesSet),
terminalNodes: Array.from(terminalNodesSet),
branchCount,
}
return { entryNodes, terminalNodes }
}
private getParallelId(blockId: string, dag: DAG): string | null {

View File

@@ -1,18 +1,8 @@
import { BlockType, isMetadataOnlyBlockType } from '@/executor/constants'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import {
buildBranchNodeId,
calculateBranchCount,
parseDistributionItems,
} from '@/executor/utils/subflow-utils'
import type { DAG } from '@/executor/dag/builder'
import { buildBranchNodeId } from '@/executor/utils/subflow-utils'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
interface ParallelExpansion {
parallelId: string
branchCount: number
distributionItems: any[]
}
export class NodeConstructor {
execute(
workflow: SerializedWorkflow,
@@ -37,7 +27,7 @@ export class NodeConstructor {
const parallelId = this.findParallelForBlock(block.id, dag)
if (parallelId) {
this.createParallelBranchNodes(block, parallelId, dag)
this.createParallelTemplateNode(block, parallelId, dag)
} else {
this.createRegularOrLoopNode(block, blocksInLoops, dag)
}
@@ -100,57 +90,27 @@ export class NodeConstructor {
}
}
private createParallelBranchNodes(block: SerializedBlock, parallelId: string, dag: DAG): void {
const expansion = this.calculateParallelExpansion(parallelId, dag)
for (let branchIndex = 0; branchIndex < expansion.branchCount; branchIndex++) {
const branchNode = this.createParallelBranchNode(block, branchIndex, expansion)
dag.nodes.set(branchNode.id, branchNode)
}
}
private calculateParallelExpansion(parallelId: string, dag: DAG): ParallelExpansion {
const config = dag.parallelConfigs.get(parallelId)
if (!config) {
throw new Error(`Parallel config not found: ${parallelId}`)
}
const distributionItems = parseDistributionItems(config)
const branchCount = calculateBranchCount(config, distributionItems)
return {
parallelId,
branchCount,
distributionItems,
}
}
private createParallelBranchNode(
baseBlock: SerializedBlock,
branchIndex: number,
expansion: ParallelExpansion
): DAGNode {
const branchNodeId = buildBranchNodeId(baseBlock.id, branchIndex)
private createParallelTemplateNode(block: SerializedBlock, parallelId: string, dag: DAG): void {
const templateNodeId = buildBranchNodeId(block.id, 0)
const blockClone: SerializedBlock = {
...baseBlock,
id: branchNodeId,
...block,
id: templateNodeId,
}
return {
id: branchNodeId,
dag.nodes.set(templateNodeId, {
id: templateNodeId,
block: blockClone,
incomingEdges: new Set(),
outgoingEdges: new Map(),
metadata: {
isParallelBranch: true,
parallelId: expansion.parallelId,
branchIndex,
branchTotal: expansion.branchCount,
distributionItem: expansion.distributionItems[branchIndex],
isPauseResponse: baseBlock.metadata?.id === BlockType.HUMAN_IN_THE_LOOP,
originalBlockId: baseBlock.id,
parallelId,
branchIndex: 0,
branchTotal: 1,
isPauseResponse: block.metadata?.id === BlockType.HUMAN_IN_THE_LOOP,
originalBlockId: block.id,
},
}
})
}
private createRegularOrLoopNode(
@@ -176,44 +136,6 @@ export class NodeConstructor {
})
}
private createTriggerNode(
block: SerializedBlock,
triggerId: string,
options: {
loopId?: string
isParallelBranch?: boolean
parallelId?: string
branchIndex?: number
branchTotal?: number
}
): DAGNode {
const triggerBlock: SerializedBlock = {
...block,
id: triggerId,
enabled: true,
metadata: {
...block.metadata,
id: BlockType.START_TRIGGER,
},
}
return {
id: triggerId,
block: triggerBlock,
incomingEdges: new Set(),
outgoingEdges: new Map(),
metadata: {
isResumeTrigger: true,
originalBlockId: block.id,
loopId: options.loopId,
isParallelBranch: options.isParallelBranch,
parallelId: options.parallelId,
branchIndex: options.branchIndex,
branchTotal: options.branchTotal,
},
}
}
private findLoopIdForBlock(blockId: string, dag: DAG): string | undefined {
for (const [loopId, loopConfig] of dag.loopConfigs) {
if (loopConfig.nodes.includes(blockId)) {

View File

@@ -0,0 +1,88 @@
import { createLogger } from '@sim/logger'
import { BlockType, PARALLEL, type SentinelType } from '@/executor/constants'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import {
buildParallelSentinelEndId,
buildParallelSentinelStartId,
} from '@/executor/utils/subflow-utils'
const logger = createLogger('ParallelConstructor')
export class ParallelConstructor {
execute(dag: DAG, reachableBlocks: Set<string>): void {
for (const [parallelId, parallelConfig] of dag.parallelConfigs) {
const parallelNodes = parallelConfig.nodes
if (parallelNodes.length === 0) {
continue
}
if (!this.hasReachableNodes(parallelNodes, reachableBlocks)) {
continue
}
this.createSentinelPair(dag, parallelId)
}
}
private hasReachableNodes(parallelNodes: string[], reachableBlocks: Set<string>): boolean {
return parallelNodes.some((nodeId) => reachableBlocks.has(nodeId))
}
private createSentinelPair(dag: DAG, parallelId: string): void {
const startId = buildParallelSentinelStartId(parallelId)
const endId = buildParallelSentinelEndId(parallelId)
dag.nodes.set(
startId,
this.createSentinelNode({
id: startId,
parallelId,
sentinelType: PARALLEL.SENTINEL.START_TYPE,
blockType: BlockType.SENTINEL_START,
name: `${PARALLEL.SENTINEL.START_NAME_PREFIX} (${parallelId})`,
})
)
dag.nodes.set(
endId,
this.createSentinelNode({
id: endId,
parallelId,
sentinelType: PARALLEL.SENTINEL.END_TYPE,
blockType: BlockType.SENTINEL_END,
name: `${PARALLEL.SENTINEL.END_NAME_PREFIX} (${parallelId})`,
})
)
}
private createSentinelNode(config: {
id: string
parallelId: string
sentinelType: SentinelType
blockType: BlockType
name: string
}): DAGNode {
return {
id: config.id,
block: {
id: config.id,
enabled: true,
metadata: {
id: config.blockType,
name: config.name,
parallelId: config.parallelId,
},
config: { params: {} },
} as any,
incomingEdges: new Set(),
outgoingEdges: new Map(),
metadata: {
isSentinel: true,
isParallelSentinel: true,
sentinelType: config.sentinelType,
parallelId: config.parallelId,
},
}
}
}
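A rough usage sketch of the new pass (assumes dag and reachableBlocks come from DAGBuilder as wired in builder.ts above; 'parallel-1' is a hypothetical block ID):

import type { DAG } from '@/executor/dag/builder'
import { ParallelConstructor } from '@/executor/dag/construction/parallels'

declare const dag: DAG
declare const reachableBlocks: Set<string>

// Adds a start/end sentinel pair for every parallel whose member nodes are reachable.
new ParallelConstructor().execute(dag, reachableBlocks)

const start = dag.nodes.get('parallel-parallel-1-sentinel-start')
start?.metadata.isParallelSentinel // true
start?.metadata.sentinelType       // 'start'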

View File

@@ -7,6 +7,7 @@ export interface DAGEdge {
export interface NodeMetadata {
isParallelBranch?: boolean
isParallelSentinel?: boolean
parallelId?: string
branchIndex?: number
branchTotal?: number

View File

@@ -129,6 +129,10 @@ export class EdgeManager {
return handle === EDGE.LOOP_CONTINUE || handle === EDGE.LOOP_CONTINUE_ALT
}
if (output.selectedRoute === EDGE.PARALLEL_EXIT) {
return handle === EDGE.PARALLEL_EXIT
}
if (!handle) {
return true
}
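A sketch of the routing rule this hunk adds (the logic mirrors the check above; the output shape is what the parallel end sentinel returns):

// When the parallel end sentinel reports selectedRoute: 'parallel_exit', only edges
// whose sourceHandle is 'parallel_exit' are followed out of that node.
const output = { sentinelEnd: true, selectedRoute: 'parallel_exit' }
const shouldFollow = (handle?: string): boolean =>
  output.selectedRoute === 'parallel_exit' ? handle === 'parallel_exit' : true

shouldFollow('parallel_exit') // true  - the exit edge wired by EdgeConstructor
shouldFollow('source')        // false - other handles from the end sentinel are skipped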

View File

@@ -53,18 +53,11 @@ export class NodeExecutionOrchestrator {
}
}
// Initialize parallel scope BEFORE execution so <parallel.currentItem> can be resolved
const parallelId = node.metadata.parallelId
if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) {
const totalBranches = node.metadata.branchTotal || 1
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
const nodesInParallel = (parallelConfig as any)?.nodes?.length || 1
this.parallelOrchestrator.initializeParallelScope(
ctx,
parallelId,
totalBranches,
nodesInParallel
)
const nodesInParallel = parallelConfig?.nodes?.length || 1
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
}
if (node.metadata.isSentinel) {
@@ -92,6 +85,12 @@ export class NodeExecutionOrchestrator {
): Promise<NormalizedBlockOutput> {
const sentinelType = node.metadata.sentinelType
const loopId = node.metadata.loopId
const parallelId = node.metadata.parallelId
const isParallelSentinel = node.metadata.isParallelSentinel
if (isParallelSentinel) {
return this.handleParallelSentinel(ctx, node, sentinelType, parallelId)
}
switch (sentinelType) {
case 'start': {
@@ -140,6 +139,42 @@ export class NodeExecutionOrchestrator {
}
}
private handleParallelSentinel(
ctx: ExecutionContext,
node: DAGNode,
sentinelType: string | undefined,
parallelId: string | undefined
): NormalizedBlockOutput {
if (!parallelId) {
logger.warn('Parallel sentinel called without parallelId')
return {}
}
if (sentinelType === 'start') {
if (!this.parallelOrchestrator.getParallelScope(ctx, parallelId)) {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
if (parallelConfig) {
const nodesInParallel = parallelConfig.nodes?.length || 1
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
}
}
return { sentinelStart: true }
}
if (sentinelType === 'end') {
const result = this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId)
return {
results: result.results || [],
sentinelEnd: true,
selectedRoute: EDGE.PARALLEL_EXIT,
totalBranches: result.totalBranches,
}
}
logger.warn('Unknown parallel sentinel type', { sentinelType })
return {}
}
async handleNodeCompletion(
ctx: ExecutionContext,
nodeId: string,
@@ -188,15 +223,9 @@ export class NodeExecutionOrchestrator {
): void {
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
if (!scope) {
const totalBranches = node.metadata.branchTotal || 1
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
const nodesInParallel = (parallelConfig as any)?.nodes?.length || 1
this.parallelOrchestrator.initializeParallelScope(
ctx,
parallelId,
totalBranches,
nodesInParallel
)
const nodesInParallel = parallelConfig?.nodes?.length || 1
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
}
const allComplete = this.parallelOrchestrator.handleParallelBranchCompletion(
ctx,

View File

@@ -1,17 +1,14 @@
import { createLogger } from '@sim/logger'
import { DEFAULTS } from '@/executor/constants'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { DAG } from '@/executor/dag/builder'
import type { ParallelScope } from '@/executor/execution/state'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { ParallelConfigWithNodes } from '@/executor/types/parallel'
import { ParallelExpander } from '@/executor/utils/parallel-expansion'
import {
addSubflowErrorLog,
buildBranchNodeId,
calculateBranchCount,
extractBaseBlockId,
extractBranchIndex,
parseDistributionItems,
resolveArrayInput,
validateMaxCount,
} from '@/executor/utils/subflow-utils'
@@ -37,6 +34,7 @@ export interface ParallelAggregationResult {
export class ParallelOrchestrator {
private resolver: VariableResolver | null = null
private contextExtensions: ContextExtensions | null = null
private expander = new ParallelExpander()
constructor(
private dag: DAG,
@@ -54,100 +52,95 @@ export class ParallelOrchestrator {
initializeParallelScope(
ctx: ExecutionContext,
parallelId: string,
totalBranches: number,
terminalNodesCount = 1
): ParallelScope {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
let items: any[] | undefined
if (parallelConfig) {
try {
items = this.resolveDistributionItems(ctx, parallelConfig)
} catch (error) {
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, {
parallelId,
distribution: parallelConfig.distribution,
})
this.addParallelErrorLog(ctx, parallelId, errorMessage, {
distribution: parallelConfig.distribution,
})
this.setErrorScope(ctx, parallelId, errorMessage)
throw new Error(errorMessage)
}
if (!parallelConfig) {
throw new Error(`Parallel config not found: ${parallelId}`)
}
const actualBranchCount = items && items.length > totalBranches ? items.length : totalBranches
let items: any[] | undefined
let branchCount: number
try {
const resolved = this.resolveBranchCount(ctx, parallelConfig)
branchCount = resolved.branchCount
items = resolved.items
} catch (error) {
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
this.addParallelErrorLog(ctx, parallelId, errorMessage, {
distribution: parallelConfig.distribution,
})
this.setErrorScope(ctx, parallelId, errorMessage)
throw new Error(errorMessage)
}
const branchError = validateMaxCount(
actualBranchCount,
branchCount,
DEFAULTS.MAX_PARALLEL_BRANCHES,
'Parallel branch count'
)
if (branchError) {
logger.error(branchError, { parallelId, actualBranchCount })
logger.error(branchError, { parallelId, branchCount })
this.addParallelErrorLog(ctx, parallelId, branchError, {
distribution: parallelConfig?.distribution,
branchCount: actualBranchCount,
distribution: parallelConfig.distribution,
branchCount,
})
this.setErrorScope(ctx, parallelId, branchError)
throw new Error(branchError)
}
const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)
const scope: ParallelScope = {
parallelId,
totalBranches: actualBranchCount,
totalBranches: branchCount,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: actualBranchCount * terminalNodesCount,
totalExpectedNodes: branchCount * terminalNodesCount,
items,
}
if (!ctx.parallelExecutions) {
ctx.parallelExecutions = new Map()
}
ctx.parallelExecutions.set(parallelId, scope)
// Dynamically expand DAG if needed
if (items && items.length > totalBranches && parallelConfig) {
logger.info('Dynamically expanding parallel branches', {
parallelId,
existingBranches: totalBranches,
targetBranches: items.length,
itemsCount: items.length,
})
const newEntryNodes = this.expandParallelBranches(
parallelId,
parallelConfig,
totalBranches,
items.length
)
logger.info('Parallel expansion complete', {
parallelId,
newEntryNodes,
totalNodesInDag: this.dag.nodes.size,
})
// Add new entry nodes to pending dynamic nodes so the engine can schedule them
if (newEntryNodes.length > 0) {
if (!ctx.pendingDynamicNodes) {
ctx.pendingDynamicNodes = []
}
ctx.pendingDynamicNodes.push(...newEntryNodes)
const newEntryNodes = entryNodes.filter((nodeId) => !nodeId.endsWith('__branch-0'))
if (newEntryNodes.length > 0) {
if (!ctx.pendingDynamicNodes) {
ctx.pendingDynamicNodes = []
}
} else {
logger.info('No parallel expansion needed', {
parallelId,
itemsLength: items?.length,
totalBranches,
hasParallelConfig: !!parallelConfig,
})
ctx.pendingDynamicNodes.push(...newEntryNodes)
}
logger.info('Parallel scope initialized', {
parallelId,
branchCount,
entryNodeCount: entryNodes.length,
newEntryNodes: newEntryNodes.length,
})
return scope
}
private resolveBranchCount(
ctx: ExecutionContext,
config: SerializedParallel
): { branchCount: number; items?: any[] } {
if (config.parallelType === 'count') {
return { branchCount: config.count ?? 1 }
}
const items = this.resolveDistributionItems(ctx, config)
if (items.length === 0) {
return { branchCount: config.count ?? 1 }
}
return { branchCount: items.length, items }
}
private addParallelErrorLog(
ctx: ExecutionContext,
parallelId: string,
@@ -180,189 +173,6 @@ export class ParallelOrchestrator {
ctx.parallelExecutions.set(parallelId, scope)
}
/**
* Dynamically expand the DAG to include additional branch nodes when
* the resolved item count exceeds the pre-built branch count.
*/
private expandParallelBranches(
parallelId: string,
config: SerializedParallel,
existingBranchCount: number,
targetBranchCount: number
): string[] {
// Get all blocks that are part of this parallel
const blocksInParallel = config.nodes
const blocksInParallelSet = new Set(blocksInParallel)
// Step 1: Create all new nodes first
for (const blockId of blocksInParallel) {
const branch0NodeId = buildBranchNodeId(blockId, 0)
const templateNode = this.dag.nodes.get(branch0NodeId)
if (!templateNode) {
logger.warn('Template node not found for parallel expansion', { blockId, branch0NodeId })
continue
}
for (let branchIndex = existingBranchCount; branchIndex < targetBranchCount; branchIndex++) {
const newNodeId = buildBranchNodeId(blockId, branchIndex)
const newNode: DAGNode = {
id: newNodeId,
block: {
...templateNode.block,
id: newNodeId,
},
incomingEdges: new Set(),
outgoingEdges: new Map(),
metadata: {
...templateNode.metadata,
branchIndex,
branchTotal: targetBranchCount,
originalBlockId: blockId,
},
}
this.dag.nodes.set(newNodeId, newNode)
}
}
// Step 2: Wire edges between the new branch nodes
this.wireExpandedBranchEdges(
parallelId,
blocksInParallel,
existingBranchCount,
targetBranchCount
)
// Step 3: Update metadata on existing nodes to reflect new total
this.updateExistingBranchMetadata(blocksInParallel, existingBranchCount, targetBranchCount)
// Step 4: Identify entry nodes AFTER edges are wired
// Entry nodes are those with no INTERNAL incoming edges (edges from outside parallel don't count)
const newEntryNodes: string[] = []
for (const blockId of blocksInParallel) {
const branch0NodeId = buildBranchNodeId(blockId, 0)
const templateNode = this.dag.nodes.get(branch0NodeId)
if (!templateNode) continue
// Check if template has any INTERNAL incoming edges
let hasInternalIncoming = false
for (const incomingId of templateNode.incomingEdges) {
const baseIncomingId = extractBaseBlockId(incomingId)
if (blocksInParallelSet.has(baseIncomingId)) {
hasInternalIncoming = true
break
}
}
// If no internal incoming edges, the new branches of this block are entry nodes
if (!hasInternalIncoming) {
for (
let branchIndex = existingBranchCount;
branchIndex < targetBranchCount;
branchIndex++
) {
newEntryNodes.push(buildBranchNodeId(blockId, branchIndex))
}
}
}
return newEntryNodes
}
/**
* Wire edges between expanded branch nodes by replicating the edge pattern from branch 0.
* Handles both internal edges (within the parallel) and exit edges (to blocks after the parallel).
*/
private wireExpandedBranchEdges(
parallelId: string,
blocksInParallel: string[],
existingBranchCount: number,
targetBranchCount: number
): void {
const blocksInParallelSet = new Set(blocksInParallel)
// For each block, look at branch 0's outgoing edges and replicate for new branches
for (const blockId of blocksInParallel) {
const branch0NodeId = buildBranchNodeId(blockId, 0)
const branch0Node = this.dag.nodes.get(branch0NodeId)
if (!branch0Node) continue
// Replicate outgoing edges for each new branch
for (const [, edge] of branch0Node.outgoingEdges) {
// Use edge.target (the actual target node ID), not the Map key which may be a formatted edge ID
const actualTargetNodeId = edge.target
// Extract the base target block ID
const baseTargetId = extractBaseBlockId(actualTargetNodeId)
// Check if target is inside or outside the parallel
const isInternalEdge = blocksInParallelSet.has(baseTargetId)
for (
let branchIndex = existingBranchCount;
branchIndex < targetBranchCount;
branchIndex++
) {
const sourceNodeId = buildBranchNodeId(blockId, branchIndex)
const sourceNode = this.dag.nodes.get(sourceNodeId)
if (!sourceNode) continue
if (isInternalEdge) {
// Internal edge: wire to the corresponding branch of the target
const newTargetNodeId = buildBranchNodeId(baseTargetId, branchIndex)
const targetNode = this.dag.nodes.get(newTargetNodeId)
if (targetNode) {
sourceNode.outgoingEdges.set(newTargetNodeId, {
target: newTargetNodeId,
sourceHandle: edge.sourceHandle,
targetHandle: edge.targetHandle,
})
targetNode.incomingEdges.add(sourceNodeId)
}
} else {
// Exit edge: wire to the same external target (blocks after the parallel)
// All branches point to the same external node
const externalTargetNode = this.dag.nodes.get(actualTargetNodeId)
if (externalTargetNode) {
sourceNode.outgoingEdges.set(actualTargetNodeId, {
target: actualTargetNodeId,
sourceHandle: edge.sourceHandle,
targetHandle: edge.targetHandle,
})
// Add incoming edge from this new branch to the external node
externalTargetNode.incomingEdges.add(sourceNodeId)
}
}
}
}
}
}
/**
* Update existing branch nodes' metadata to reflect the new total branch count.
*/
private updateExistingBranchMetadata(
blocksInParallel: string[],
existingBranchCount: number,
targetBranchCount: number
): void {
for (const blockId of blocksInParallel) {
for (let branchIndex = 0; branchIndex < existingBranchCount; branchIndex++) {
const nodeId = buildBranchNodeId(blockId, branchIndex)
const node = this.dag.nodes.get(nodeId)
if (node) {
node.metadata.branchTotal = targetBranchCount
}
}
}
}
private resolveDistributionItems(ctx: ExecutionContext, config: SerializedParallel): any[] {
if (config.parallelType === 'count') {
return []
@@ -429,28 +239,25 @@ export class ParallelOrchestrator {
}
}
extractBranchMetadata(nodeId: string): ParallelBranchMetadata | null {
const node = this.dag.nodes.get(nodeId)
if (!node?.metadata.isParallelBranch) {
return null
}
const branchIndex = extractBranchIndex(nodeId)
if (branchIndex === null) {
return null
}
const baseId = extractBaseBlockId(nodeId)
const parallelId = this.findParallelIdForNode(baseId)
const parallelId = node.metadata.parallelId
if (!parallelId) {
return null
}
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
if (!parallelConfig) {
return null
}
const { totalBranches, distributionItem } = this.getParallelConfigInfo(
parallelConfig,
branchIndex
)
return {
branchIndex,
branchTotal: totalBranches,
distributionItem,
branchTotal: node.metadata.branchTotal ?? 1,
distributionItem: node.metadata.distributionItem,
parallelId,
}
}
@@ -468,18 +275,4 @@ export class ParallelOrchestrator {
}
return undefined
}
private getParallelConfigInfo(
parallelConfig: SerializedParallel,
branchIndex: number
): { totalBranches: number; distributionItem?: any } {
const distributionItems = parseDistributionItems(parallelConfig)
const totalBranches = calculateBranchCount(parallelConfig, distributionItems)
let distributionItem: any
if (Array.isArray(distributionItems) && branchIndex < distributionItems.length) {
distributionItem = distributionItems[branchIndex]
}
return { totalBranches, distributionItem }
}
}
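A sketch of the branch-count rule in resolveBranchCount (names here are hypothetical; the logic mirrors the method above: count-type parallels use config.count, collection-type parallels use the resolved item count and fall back to count when nothing resolves):

type ParallelCfg = { parallelType?: string; count?: number }

function branchCountFor(cfg: ParallelCfg, resolvedItems: unknown[]): number {
  if (cfg.parallelType === 'count') return cfg.count ?? 1
  return resolvedItems.length > 0 ? resolvedItems.length : (cfg.count ?? 1)
}

branchCountFor({ parallelType: 'count', count: 3 }, [])              // 3
branchCountFor({ parallelType: 'collection' }, ['a', 'b', 'c', 'd']) // 4 - one branch per item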

View File

@@ -0,0 +1,255 @@
import { createLogger } from '@sim/logger'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { SerializedBlock } from '@/serializer/types'
import {
buildBranchNodeId,
buildParallelSentinelEndId,
buildParallelSentinelStartId,
extractBaseBlockId,
} from './subflow-utils'
const logger = createLogger('ParallelExpansion')
export interface ExpansionResult {
entryNodes: string[]
terminalNodes: string[]
allBranchNodes: string[]
}
export class ParallelExpander {
expandParallel(
dag: DAG,
parallelId: string,
branchCount: number,
distributionItems?: any[]
): ExpansionResult {
const config = dag.parallelConfigs.get(parallelId)
if (!config) {
throw new Error(`Parallel config not found: ${parallelId}`)
}
const blocksInParallel = config.nodes || []
if (blocksInParallel.length === 0) {
return { entryNodes: [], terminalNodes: [], allBranchNodes: [] }
}
const blocksSet = new Set(blocksInParallel)
const allBranchNodes: string[] = []
for (const blockId of blocksInParallel) {
const templateId = buildBranchNodeId(blockId, 0)
const templateNode = dag.nodes.get(templateId)
if (!templateNode) {
logger.warn('Template node not found', { blockId, templateId })
continue
}
for (let i = 0; i < branchCount; i++) {
const branchNodeId = buildBranchNodeId(blockId, i)
allBranchNodes.push(branchNodeId)
if (i === 0) {
this.updateBranchMetadata(templateNode, i, branchCount, distributionItems?.[i])
continue
}
const branchNode = this.cloneTemplateNode(
templateNode,
blockId,
i,
branchCount,
distributionItems?.[i]
)
dag.nodes.set(branchNodeId, branchNode)
}
}
this.wireInternalEdges(dag, blocksInParallel, blocksSet, branchCount)
const { entryNodes, terminalNodes } = this.identifyBoundaryNodes(
dag,
blocksInParallel,
blocksSet,
branchCount
)
this.wireSentinelEdges(dag, parallelId, entryNodes, terminalNodes, branchCount)
logger.info('Parallel expanded', {
parallelId,
branchCount,
blocksCount: blocksInParallel.length,
totalNodes: allBranchNodes.length,
})
return { entryNodes, terminalNodes, allBranchNodes }
}
private updateBranchMetadata(
node: DAGNode,
branchIndex: number,
branchTotal: number,
distributionItem?: any
): void {
node.metadata.branchIndex = branchIndex
node.metadata.branchTotal = branchTotal
if (distributionItem !== undefined) {
node.metadata.distributionItem = distributionItem
}
}
private cloneTemplateNode(
template: DAGNode,
originalBlockId: string,
branchIndex: number,
branchTotal: number,
distributionItem?: any
): DAGNode {
const branchNodeId = buildBranchNodeId(originalBlockId, branchIndex)
const blockClone: SerializedBlock = {
...template.block,
id: branchNodeId,
}
return {
id: branchNodeId,
block: blockClone,
incomingEdges: new Set(),
outgoingEdges: new Map(),
metadata: {
...template.metadata,
branchIndex,
branchTotal,
distributionItem,
originalBlockId,
},
}
}
private wireInternalEdges(
dag: DAG,
blocksInParallel: string[],
blocksSet: Set<string>,
branchCount: number
): void {
for (const blockId of blocksInParallel) {
const templateId = buildBranchNodeId(blockId, 0)
const templateNode = dag.nodes.get(templateId)
if (!templateNode) continue
for (const [, edge] of templateNode.outgoingEdges) {
const baseTargetId = extractBaseBlockId(edge.target)
if (!blocksSet.has(baseTargetId)) continue
for (let i = 1; i < branchCount; i++) {
const sourceNodeId = buildBranchNodeId(blockId, i)
const targetNodeId = buildBranchNodeId(baseTargetId, i)
const sourceNode = dag.nodes.get(sourceNodeId)
const targetNode = dag.nodes.get(targetNodeId)
if (!sourceNode || !targetNode) continue
const edgeId = edge.sourceHandle
? `${sourceNodeId}${targetNodeId}-${edge.sourceHandle}`
: `${sourceNodeId}${targetNodeId}`
sourceNode.outgoingEdges.set(edgeId, {
target: targetNodeId,
sourceHandle: edge.sourceHandle,
targetHandle: edge.targetHandle,
})
targetNode.incomingEdges.add(sourceNodeId)
}
}
}
}
private identifyBoundaryNodes(
dag: DAG,
blocksInParallel: string[],
blocksSet: Set<string>,
branchCount: number
): { entryNodes: string[]; terminalNodes: string[] } {
const entryNodes: string[] = []
const terminalNodes: string[] = []
for (const blockId of blocksInParallel) {
const templateId = buildBranchNodeId(blockId, 0)
const templateNode = dag.nodes.get(templateId)
if (!templateNode) continue
const hasInternalIncoming = this.hasInternalIncomingEdge(templateNode, blocksSet)
const hasInternalOutgoing = this.hasInternalOutgoingEdge(templateNode, blocksSet)
for (let i = 0; i < branchCount; i++) {
const branchNodeId = buildBranchNodeId(blockId, i)
if (!hasInternalIncoming) {
entryNodes.push(branchNodeId)
}
if (!hasInternalOutgoing) {
terminalNodes.push(branchNodeId)
}
}
}
return { entryNodes, terminalNodes }
}
private hasInternalIncomingEdge(node: DAGNode, blocksSet: Set<string>): boolean {
for (const incomingId of node.incomingEdges) {
const baseId = extractBaseBlockId(incomingId)
if (blocksSet.has(baseId)) {
return true
}
}
return false
}
private hasInternalOutgoingEdge(node: DAGNode, blocksSet: Set<string>): boolean {
for (const [, edge] of node.outgoingEdges) {
const baseId = extractBaseBlockId(edge.target)
if (blocksSet.has(baseId)) {
return true
}
}
return false
}
private wireSentinelEdges(
dag: DAG,
parallelId: string,
entryNodes: string[],
terminalNodes: string[],
branchCount: number
): void {
const sentinelStartId = buildParallelSentinelStartId(parallelId)
const sentinelEndId = buildParallelSentinelEndId(parallelId)
const sentinelStart = dag.nodes.get(sentinelStartId)
const sentinelEnd = dag.nodes.get(sentinelEndId)
if (!sentinelStart || !sentinelEnd) {
logger.warn('Sentinel nodes not found', { parallelId, sentinelStartId, sentinelEndId })
return
}
sentinelStart.outgoingEdges.clear()
for (const entryNodeId of entryNodes) {
const entryNode = dag.nodes.get(entryNodeId)
if (!entryNode) continue
const edgeId = `${sentinelStartId}${entryNodeId}`
sentinelStart.outgoingEdges.set(edgeId, { target: entryNodeId })
entryNode.incomingEdges.add(sentinelStartId)
}
for (const terminalNodeId of terminalNodes) {
const terminalNode = dag.nodes.get(terminalNodeId)
if (!terminalNode) continue
const edgeId = `${terminalNodeId}${sentinelEndId}`
terminalNode.outgoingEdges.set(edgeId, { target: sentinelEndId })
sentinelEnd.incomingEdges.add(terminalNodeId)
}
}
}
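A rough sketch of the runtime flow that replaces the old build-time branch explosion (dag is the executor DAG; the parallel ID, branch count, and items are hypothetical):

import type { DAG } from '@/executor/dag/builder'
import { ParallelExpander } from '@/executor/utils/parallel-expansion'

declare const dag: DAG

// The template branch already exists from NodeConstructor; expandParallel clones it
// for the remaining branch indices, wires internal edges, and rewires the sentinels.
const { entryNodes, terminalNodes, allBranchNodes } = new ParallelExpander().expandParallel(
  dag,
  'parallel-1',   // parallel block ID
  3,              // branch count resolved at runtime by the orchestrator
  ['a', 'b', 'c'] // optional distribution items, one per branch
)
// entryNodes are fed from the parallel's start sentinel, terminalNodes drain into the
// end sentinel, and allBranchNodes lists every branch node now present in the DAG.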

View File

@@ -3,101 +3,70 @@ import { LOOP, PARALLEL, PARSING, REFERENCE } from '@/executor/constants'
import type { ContextExtensions } from '@/executor/execution/types'
import type { BlockLog, ExecutionContext } from '@/executor/types'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedParallel } from '@/serializer/types'
const logger = createLogger('SubflowUtils')
const BRANCH_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}\\d+${PARALLEL.BRANCH.SUFFIX}$`)
const BRANCH_INDEX_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}(\\d+)${PARALLEL.BRANCH.SUFFIX}$`)
const SENTINEL_START_PATTERN = new RegExp(
const LOOP_SENTINEL_START_PATTERN = new RegExp(
`${LOOP.SENTINEL.PREFIX}(.+)${LOOP.SENTINEL.START_SUFFIX}`
)
const SENTINEL_END_PATTERN = new RegExp(`${LOOP.SENTINEL.PREFIX}(.+)${LOOP.SENTINEL.END_SUFFIX}`)
const LOOP_SENTINEL_END_PATTERN = new RegExp(
`${LOOP.SENTINEL.PREFIX}(.+)${LOOP.SENTINEL.END_SUFFIX}`
)
const PARALLEL_SENTINEL_START_PATTERN = new RegExp(
`${PARALLEL.SENTINEL.PREFIX}(.+)${PARALLEL.SENTINEL.START_SUFFIX}`
)
const PARALLEL_SENTINEL_END_PATTERN = new RegExp(
`${PARALLEL.SENTINEL.PREFIX}(.+)${PARALLEL.SENTINEL.END_SUFFIX}`
)
/** Build sentinel start node ID */
export function buildSentinelStartId(loopId: string): string {
return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.START_SUFFIX}`
}
/**
* Build sentinel end node ID
*/
export function buildSentinelEndId(loopId: string): string {
return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.END_SUFFIX}`
}
/**
* Check if a node ID is a sentinel node
*/
export function isSentinelNodeId(nodeId: string): boolean {
export function buildParallelSentinelStartId(parallelId: string): string {
return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.START_SUFFIX}`
}
export function buildParallelSentinelEndId(parallelId: string): string {
return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.END_SUFFIX}`
}
export function isLoopSentinelNodeId(nodeId: string): boolean {
return nodeId.includes(LOOP.SENTINEL.START_SUFFIX) || nodeId.includes(LOOP.SENTINEL.END_SUFFIX)
}
export function isParallelSentinelNodeId(nodeId: string): boolean {
return (
nodeId.includes(PARALLEL.SENTINEL.START_SUFFIX) || nodeId.includes(PARALLEL.SENTINEL.END_SUFFIX)
)
}
export function isSentinelNodeId(nodeId: string): boolean {
return isLoopSentinelNodeId(nodeId) || isParallelSentinelNodeId(nodeId)
}
export function extractLoopIdFromSentinel(sentinelId: string): string | null {
const startMatch = sentinelId.match(SENTINEL_START_PATTERN)
const startMatch = sentinelId.match(LOOP_SENTINEL_START_PATTERN)
if (startMatch) return startMatch[1]
const endMatch = sentinelId.match(SENTINEL_END_PATTERN)
const endMatch = sentinelId.match(LOOP_SENTINEL_END_PATTERN)
if (endMatch) return endMatch[1]
return null
}
/**
* Parse distribution items from parallel config
* Handles: arrays, JSON strings, objects, and references
* Note: References (starting with '<') cannot be resolved at DAG construction time,
* they must be resolved at runtime. This function returns [] for references.
*/
export function parseDistributionItems(config: SerializedParallel): any[] {
const rawItems = config.distribution ?? []
// Already an array - return as-is
if (Array.isArray(rawItems)) {
return rawItems
}
// Object - convert to entries array (consistent with loop forEach behavior)
if (typeof rawItems === 'object' && rawItems !== null) {
return Object.entries(rawItems)
}
// String handling
if (typeof rawItems === 'string') {
// References cannot be resolved at DAG construction time
if (rawItems.startsWith(REFERENCE.START) && rawItems.endsWith(REFERENCE.END)) {
return []
}
// Try to parse as JSON
try {
const normalizedJSON = rawItems.replace(/'/g, '"')
const parsed = JSON.parse(normalizedJSON)
if (Array.isArray(parsed)) {
return parsed
}
// Parsed to non-array (e.g. object) - convert to entries
if (typeof parsed === 'object' && parsed !== null) {
return Object.entries(parsed)
}
return []
} catch (error) {
logger.error('Failed to parse distribution items', {
rawItems,
error: error instanceof Error ? error.message : String(error),
})
return []
}
}
return []
}
/**
* Calculate branch count from parallel config
*/
export function calculateBranchCount(config: SerializedParallel, distributionItems: any[]): number {
const explicitCount = config.count ?? PARALLEL.DEFAULT_COUNT
if (config.parallelType === PARALLEL.TYPE.COLLECTION && distributionItems.length > 0) {
return distributionItems.length
}
return explicitCount
export function extractParallelIdFromSentinel(sentinelId: string): string | null {
const startMatch = sentinelId.match(PARALLEL_SENTINEL_START_PATTERN)
if (startMatch) return startMatch[1]
const endMatch = sentinelId.match(PARALLEL_SENTINEL_END_PATTERN)
if (endMatch) return endMatch[1]
return null
}
/**
* Build branch node ID with subscript notation
* Example: ("blockId", 2) → "blockId₍2₎"
@@ -119,20 +88,23 @@ export function isBranchNodeId(nodeId: string): boolean {
}
export function isLoopNode(nodeId: string): boolean {
return isSentinelNodeId(nodeId) || nodeId.startsWith(LOOP.SENTINEL.PREFIX)
return isLoopSentinelNodeId(nodeId) || nodeId.startsWith(LOOP.SENTINEL.PREFIX)
}
export function isParallelNode(nodeId: string): boolean {
return isBranchNodeId(nodeId)
return isBranchNodeId(nodeId) || isParallelSentinelNodeId(nodeId)
}
export function normalizeNodeId(nodeId: string): string {
if (isBranchNodeId(nodeId)) {
return extractBaseBlockId(nodeId)
}
if (isSentinelNodeId(nodeId)) {
if (isLoopSentinelNodeId(nodeId)) {
return extractLoopIdFromSentinel(nodeId) || nodeId
}
if (isParallelSentinelNodeId(nodeId)) {
return extractParallelIdFromSentinel(nodeId) || nodeId
}
return nodeId
}
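A quick sketch of the split loop/parallel ID helpers (ID values follow the sentinel naming convention asserted in the tests above):

import {
  buildParallelSentinelEndId,
  buildParallelSentinelStartId,
  buildSentinelStartId,
  extractLoopIdFromSentinel,
  extractParallelIdFromSentinel,
  isParallelSentinelNodeId,
} from '@/executor/utils/subflow-utils'

extractLoopIdFromSentinel(buildSentinelStartId('loop-1'))               // 'loop-1'
extractParallelIdFromSentinel(buildParallelSentinelEndId('parallel-1')) // 'parallel-1'
isParallelSentinelNodeId(buildParallelSentinelStartId('parallel-1'))    // true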

View File

@@ -56,6 +56,7 @@ type SkippedItemType =
| 'invalid_subblock_field'
| 'missing_required_params'
| 'invalid_subflow_parent'
| 'nested_subflow_not_allowed'
| 'duplicate_block_name'
/**
@@ -1487,6 +1488,17 @@ function applyOperationsToWorkflowState(
return
}
if (childBlock.type === 'loop' || childBlock.type === 'parallel') {
logSkippedItem(skippedItems, {
type: 'nested_subflow_not_allowed',
operationType: 'edit_nested_node',
blockId: childId,
reason: `Cannot nest ${childBlock.type} inside ${block.type} - nested subflows are not supported`,
details: { parentType: block.type, childType: childBlock.type },
})
return
}
const childBlockState = createBlockFromParams(
childId,
childBlock,
@@ -1733,6 +1745,17 @@ function applyOperationsToWorkflowState(
return
}
if (childBlock.type === 'loop' || childBlock.type === 'parallel') {
logSkippedItem(skippedItems, {
type: 'nested_subflow_not_allowed',
operationType: 'add_nested_node',
blockId: childId,
reason: `Cannot nest ${childBlock.type} inside ${params.type} - nested subflows are not supported`,
details: { parentType: params.type, childType: childBlock.type },
})
return
}
const childBlockState = createBlockFromParams(
childId,
childBlock,
@@ -1799,6 +1822,17 @@ function applyOperationsToWorkflowState(
break
}
if (params.type === 'loop' || params.type === 'parallel') {
logSkippedItem(skippedItems, {
type: 'nested_subflow_not_allowed',
operationType: 'insert_into_subflow',
blockId: block_id,
reason: `Cannot nest ${params.type} inside ${subflowBlock.type} - nested subflows are not supported`,
details: { parentType: subflowBlock.type, childType: params.type },
})
break
}
// Get block configuration
const blockConfig = getAllBlocks().find((block) => block.type === params.type)
@@ -1806,6 +1840,17 @@ function applyOperationsToWorkflowState(
const existingBlock = modifiedState.blocks[block_id]
if (existingBlock) {
if (existingBlock.type === 'loop' || existingBlock.type === 'parallel') {
logSkippedItem(skippedItems, {
type: 'nested_subflow_not_allowed',
operationType: 'insert_into_subflow',
blockId: block_id,
reason: `Cannot move ${existingBlock.type} into ${subflowBlock.type} - nested subflows are not supported`,
details: { parentType: subflowBlock.type, childType: existingBlock.type },
})
break
}
// Moving existing block into subflow - just update parent
existingBlock.data = {
...existingBlock.data,