diff --git a/apps/sim/executor/constants.ts b/apps/sim/executor/constants.ts index 134f6de28..fd7c44436 100644 --- a/apps/sim/executor/constants.ts +++ b/apps/sim/executor/constants.ts @@ -53,6 +53,7 @@ export const EDGE = { LOOP_CONTINUE: 'loop_continue', LOOP_CONTINUE_ALT: 'loop-continue-source', LOOP_EXIT: 'loop_exit', + PARALLEL_EXIT: 'parallel_exit', ERROR: 'error', SOURCE: 'source', DEFAULT: 'default', @@ -88,6 +89,16 @@ export const PARALLEL = { SUFFIX: '₎', }, + SENTINEL: { + PREFIX: 'parallel-', + START_SUFFIX: '-sentinel-start', + END_SUFFIX: '-sentinel-end', + START_TYPE: 'start' as SentinelType, + END_TYPE: 'end' as SentinelType, + START_NAME_PREFIX: 'Parallel Start', + END_NAME_PREFIX: 'Parallel End', + }, + DEFAULT_COUNT: 1, } as const diff --git a/apps/sim/executor/dag/builder.ts b/apps/sim/executor/dag/builder.ts index 592bb683a..b072faaed 100644 --- a/apps/sim/executor/dag/builder.ts +++ b/apps/sim/executor/dag/builder.ts @@ -2,9 +2,14 @@ import { createLogger } from '@sim/logger' import { EdgeConstructor } from '@/executor/dag/construction/edges' import { LoopConstructor } from '@/executor/dag/construction/loops' import { NodeConstructor } from '@/executor/dag/construction/nodes' +import { ParallelConstructor } from '@/executor/dag/construction/parallels' import { PathConstructor } from '@/executor/dag/construction/paths' import type { DAGEdge, NodeMetadata } from '@/executor/dag/types' -import { buildSentinelStartId, extractBaseBlockId } from '@/executor/utils/subflow-utils' +import { + buildParallelSentinelStartId, + buildSentinelStartId, + extractBaseBlockId, +} from '@/executor/utils/subflow-utils' import type { SerializedBlock, SerializedLoop, @@ -31,6 +36,7 @@ export interface DAG { export class DAGBuilder { private pathConstructor = new PathConstructor() private loopConstructor = new LoopConstructor() + private parallelConstructor = new ParallelConstructor() private nodeConstructor = new NodeConstructor() private edgeConstructor = new EdgeConstructor() @@ -50,6 +56,7 @@ export class DAGBuilder { const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId) this.loopConstructor.execute(dag, reachableBlocks) + this.parallelConstructor.execute(dag, reachableBlocks) const { blocksInLoops, blocksInParallels, pauseTriggerMapping } = this.nodeConstructor.execute( workflow, @@ -135,7 +142,9 @@ export class DAGBuilder { ) } - const sentinelStartNode = dag.nodes.get(buildSentinelStartId(id)) + const sentinelStartId = + type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id) + const sentinelStartNode = dag.nodes.get(sentinelStartId) if (!sentinelStartNode) return const hasConnections = Array.from(sentinelStartNode.outgoingEdges.values()).some((edge) => diff --git a/apps/sim/executor/dag/construction/edges.test.ts b/apps/sim/executor/dag/construction/edges.test.ts index 211e547f6..aac75bd27 100644 --- a/apps/sim/executor/dag/construction/edges.test.ts +++ b/apps/sim/executor/dag/construction/edges.test.ts @@ -564,4 +564,548 @@ describe('EdgeConstructor', () => { expect(outsideNode.outgoingEdges.size).toBe(0) }) }) + + describe('Subflow-to-subflow edge wiring', () => { + describe('Parallel → Parallel', () => { + it('should wire parallel sentinel end to next parallel sentinel start', () => { + const parallel1Id = 'parallel-1' + const parallel2Id = 'parallel-2' + const task1Id = 'task-1' + const task2Id = 'task-2' + const task1TemplateId = `${task1Id}__branch-0` + const task2TemplateId = `${task2Id}__branch-0` + const parallel1SentinelStart = `parallel-${parallel1Id}-sentinel-start` + const parallel1SentinelEnd = `parallel-${parallel1Id}-sentinel-end` + const parallel2SentinelStart = `parallel-${parallel2Id}-sentinel-start` + const parallel2SentinelEnd = `parallel-${parallel2Id}-sentinel-end` + + const workflow = createMockWorkflow( + [ + createMockBlock(parallel1Id, 'parallel'), + createMockBlock(parallel2Id, 'parallel'), + createMockBlock(task1Id), + createMockBlock(task2Id), + ], + [{ source: parallel1Id, target: parallel2Id }], + {}, + { + [parallel1Id]: { id: parallel1Id, nodes: [task1Id], count: 2 }, + [parallel2Id]: { id: parallel2Id, nodes: [task2Id], count: 2 }, + } + ) + + const dag = createMockDAG([ + task1TemplateId, + task2TemplateId, + parallel1SentinelStart, + parallel1SentinelEnd, + parallel2SentinelStart, + parallel2SentinelEnd, + ]) + dag.parallelConfigs.set(parallel1Id, { id: parallel1Id, nodes: [task1Id], count: 2 }) + dag.parallelConfigs.set(parallel2Id, { id: parallel2Id, nodes: [task2Id], count: 2 }) + + edgeConstructor.execute( + workflow, + dag, + new Set([task1Id, task2Id]), + new Set(), + new Set([ + task1TemplateId, + task2TemplateId, + parallel1SentinelStart, + parallel1SentinelEnd, + parallel2SentinelStart, + parallel2SentinelEnd, + ]), + new Map() + ) + + const parallel1EndNode = dag.nodes.get(parallel1SentinelEnd)! + const edgesToParallel2 = Array.from(parallel1EndNode.outgoingEdges.values()).filter( + (e) => e.target === parallel2SentinelStart + ) + expect(edgesToParallel2.length).toBeGreaterThan(0) + expect(edgesToParallel2[0].sourceHandle).toBe('parallel_exit') + }) + }) + + describe('Loop → Loop', () => { + it('should wire loop sentinel end to next loop sentinel start', () => { + const loop1Id = 'loop-1' + const loop2Id = 'loop-2' + const task1Id = 'task-1' + const task2Id = 'task-2' + const loop1SentinelStart = `loop-${loop1Id}-sentinel-start` + const loop1SentinelEnd = `loop-${loop1Id}-sentinel-end` + const loop2SentinelStart = `loop-${loop2Id}-sentinel-start` + const loop2SentinelEnd = `loop-${loop2Id}-sentinel-end` + + const workflow = createMockWorkflow( + [ + createMockBlock(loop1Id, 'loop'), + createMockBlock(loop2Id, 'loop'), + createMockBlock(task1Id), + createMockBlock(task2Id), + ], + [{ source: loop1Id, target: loop2Id }], + { + [loop1Id]: { + id: loop1Id, + nodes: [task1Id], + iterations: 3, + loopType: 'for', + } as SerializedLoop, + [loop2Id]: { + id: loop2Id, + nodes: [task2Id], + iterations: 3, + loopType: 'for', + } as SerializedLoop, + } + ) + + const dag = createMockDAG([ + task1Id, + task2Id, + loop1SentinelStart, + loop1SentinelEnd, + loop2SentinelStart, + loop2SentinelEnd, + ]) + dag.loopConfigs.set(loop1Id, { + id: loop1Id, + nodes: [task1Id], + iterations: 3, + loopType: 'for', + } as SerializedLoop) + dag.loopConfigs.set(loop2Id, { + id: loop2Id, + nodes: [task2Id], + iterations: 3, + loopType: 'for', + } as SerializedLoop) + + edgeConstructor.execute( + workflow, + dag, + new Set(), + new Set([task1Id, task2Id]), + new Set([ + task1Id, + task2Id, + loop1SentinelStart, + loop1SentinelEnd, + loop2SentinelStart, + loop2SentinelEnd, + ]), + new Map() + ) + + const loop1EndNode = dag.nodes.get(loop1SentinelEnd)! + const edgesToLoop2 = Array.from(loop1EndNode.outgoingEdges.values()).filter( + (e) => e.target === loop2SentinelStart + ) + expect(edgesToLoop2.length).toBeGreaterThan(0) + expect(edgesToLoop2[0].sourceHandle).toBe('loop_exit') + + const loop1StartNode = dag.nodes.get(loop1SentinelStart)! + const earlyExitEdges = Array.from(loop1StartNode.outgoingEdges.values()).filter( + (e) => e.target === loop2SentinelStart && e.sourceHandle === 'loop_exit' + ) + expect(earlyExitEdges.length).toBeGreaterThan(0) + }) + }) + + describe('Parallel → Loop', () => { + it('should wire parallel sentinel end to loop sentinel start', () => { + const parallelId = 'parallel-1' + const loopId = 'loop-1' + const taskInParallelId = 'parallel-task' + const taskInParallelTemplateId = `${taskInParallelId}__branch-0` + const taskInLoopId = 'loop-task' + const parallelSentinelStart = `parallel-${parallelId}-sentinel-start` + const parallelSentinelEnd = `parallel-${parallelId}-sentinel-end` + const loopSentinelStart = `loop-${loopId}-sentinel-start` + const loopSentinelEnd = `loop-${loopId}-sentinel-end` + + const workflow = createMockWorkflow( + [ + createMockBlock(parallelId, 'parallel'), + createMockBlock(loopId, 'loop'), + createMockBlock(taskInParallelId), + createMockBlock(taskInLoopId), + ], + [{ source: parallelId, target: loopId }], + { + [loopId]: { + id: loopId, + nodes: [taskInLoopId], + iterations: 3, + loopType: 'for', + } as SerializedLoop, + }, + { + [parallelId]: { id: parallelId, nodes: [taskInParallelId], count: 2 }, + } + ) + + const dag = createMockDAG([ + taskInParallelTemplateId, + taskInLoopId, + parallelSentinelStart, + parallelSentinelEnd, + loopSentinelStart, + loopSentinelEnd, + ]) + dag.parallelConfigs.set(parallelId, { + id: parallelId, + nodes: [taskInParallelId], + count: 2, + }) + dag.loopConfigs.set(loopId, { + id: loopId, + nodes: [taskInLoopId], + iterations: 3, + loopType: 'for', + } as SerializedLoop) + + edgeConstructor.execute( + workflow, + dag, + new Set([taskInParallelId]), + new Set([taskInLoopId]), + new Set([ + taskInParallelTemplateId, + taskInLoopId, + parallelSentinelStart, + parallelSentinelEnd, + loopSentinelStart, + loopSentinelEnd, + ]), + new Map() + ) + + const parallelEndNode = dag.nodes.get(parallelSentinelEnd)! + const edgesToLoop = Array.from(parallelEndNode.outgoingEdges.values()).filter( + (e) => e.target === loopSentinelStart + ) + expect(edgesToLoop.length).toBeGreaterThan(0) + expect(edgesToLoop[0].sourceHandle).toBe('parallel_exit') + }) + }) + + describe('Loop → Parallel', () => { + it('should wire loop sentinel end to parallel sentinel start', () => { + const loopId = 'loop-1' + const parallelId = 'parallel-1' + const taskInLoopId = 'loop-task' + const taskInParallelId = 'parallel-task' + const taskInParallelTemplateId = `${taskInParallelId}__branch-0` + const loopSentinelStart = `loop-${loopId}-sentinel-start` + const loopSentinelEnd = `loop-${loopId}-sentinel-end` + const parallelSentinelStart = `parallel-${parallelId}-sentinel-start` + const parallelSentinelEnd = `parallel-${parallelId}-sentinel-end` + + const workflow = createMockWorkflow( + [ + createMockBlock(loopId, 'loop'), + createMockBlock(parallelId, 'parallel'), + createMockBlock(taskInLoopId), + createMockBlock(taskInParallelId), + ], + [{ source: loopId, target: parallelId }], + { + [loopId]: { + id: loopId, + nodes: [taskInLoopId], + iterations: 3, + loopType: 'for', + } as SerializedLoop, + }, + { + [parallelId]: { id: parallelId, nodes: [taskInParallelId], count: 2 }, + } + ) + + const dag = createMockDAG([ + taskInLoopId, + taskInParallelTemplateId, + loopSentinelStart, + loopSentinelEnd, + parallelSentinelStart, + parallelSentinelEnd, + ]) + dag.loopConfigs.set(loopId, { + id: loopId, + nodes: [taskInLoopId], + iterations: 3, + loopType: 'for', + } as SerializedLoop) + dag.parallelConfigs.set(parallelId, { + id: parallelId, + nodes: [taskInParallelId], + count: 2, + }) + + edgeConstructor.execute( + workflow, + dag, + new Set([taskInParallelId]), + new Set([taskInLoopId]), + new Set([ + taskInLoopId, + taskInParallelTemplateId, + loopSentinelStart, + loopSentinelEnd, + parallelSentinelStart, + parallelSentinelEnd, + ]), + new Map() + ) + + const loopEndNode = dag.nodes.get(loopSentinelEnd)! + const edgesToParallel = Array.from(loopEndNode.outgoingEdges.values()).filter( + (e) => e.target === parallelSentinelStart + ) + expect(edgesToParallel.length).toBeGreaterThan(0) + expect(edgesToParallel[0].sourceHandle).toBe('loop_exit') + + const loopStartNode = dag.nodes.get(loopSentinelStart)! + const earlyExitEdges = Array.from(loopStartNode.outgoingEdges.values()).filter( + (e) => e.target === parallelSentinelStart && e.sourceHandle === 'loop_exit' + ) + expect(earlyExitEdges.length).toBeGreaterThan(0) + }) + }) + + describe('Subflow → Regular block', () => { + it('should wire parallel sentinel end to regular block', () => { + const parallelId = 'parallel-1' + const taskInParallelId = 'parallel-task' + const taskInParallelTemplateId = `${taskInParallelId}__branch-0` + const regularBlockId = 'regular-block' + const parallelSentinelStart = `parallel-${parallelId}-sentinel-start` + const parallelSentinelEnd = `parallel-${parallelId}-sentinel-end` + + const workflow = createMockWorkflow( + [ + createMockBlock(parallelId, 'parallel'), + createMockBlock(taskInParallelId), + createMockBlock(regularBlockId), + ], + [{ source: parallelId, target: regularBlockId }], + {}, + { + [parallelId]: { id: parallelId, nodes: [taskInParallelId], count: 2 }, + } + ) + + const dag = createMockDAG([ + taskInParallelTemplateId, + regularBlockId, + parallelSentinelStart, + parallelSentinelEnd, + ]) + dag.parallelConfigs.set(parallelId, { + id: parallelId, + nodes: [taskInParallelId], + count: 2, + }) + + edgeConstructor.execute( + workflow, + dag, + new Set([taskInParallelId]), + new Set(), + new Set([ + taskInParallelTemplateId, + regularBlockId, + parallelSentinelStart, + parallelSentinelEnd, + ]), + new Map() + ) + + const parallelEndNode = dag.nodes.get(parallelSentinelEnd)! + const edgesToRegular = Array.from(parallelEndNode.outgoingEdges.values()).filter( + (e) => e.target === regularBlockId + ) + expect(edgesToRegular.length).toBe(1) + expect(edgesToRegular[0].sourceHandle).toBe('parallel_exit') + + const regularBlockNode = dag.nodes.get(regularBlockId)! + expect(regularBlockNode.incomingEdges.has(parallelSentinelEnd)).toBe(true) + }) + + it('should wire loop sentinel end to regular block', () => { + const loopId = 'loop-1' + const taskInLoopId = 'loop-task' + const regularBlockId = 'regular-block' + const loopSentinelStart = `loop-${loopId}-sentinel-start` + const loopSentinelEnd = `loop-${loopId}-sentinel-end` + + const workflow = createMockWorkflow( + [ + createMockBlock(loopId, 'loop'), + createMockBlock(taskInLoopId), + createMockBlock(regularBlockId), + ], + [{ source: loopId, target: regularBlockId }], + { + [loopId]: { + id: loopId, + nodes: [taskInLoopId], + iterations: 3, + loopType: 'for', + } as SerializedLoop, + } + ) + + const dag = createMockDAG([ + taskInLoopId, + regularBlockId, + loopSentinelStart, + loopSentinelEnd, + ]) + dag.loopConfigs.set(loopId, { + id: loopId, + nodes: [taskInLoopId], + iterations: 3, + loopType: 'for', + } as SerializedLoop) + + edgeConstructor.execute( + workflow, + dag, + new Set(), + new Set([taskInLoopId]), + new Set([taskInLoopId, regularBlockId, loopSentinelStart, loopSentinelEnd]), + new Map() + ) + + const loopEndNode = dag.nodes.get(loopSentinelEnd)! + const edgesToRegular = Array.from(loopEndNode.outgoingEdges.values()).filter( + (e) => e.target === regularBlockId + ) + expect(edgesToRegular.length).toBe(1) + expect(edgesToRegular[0].sourceHandle).toBe('loop_exit') + }) + }) + + describe('Regular block → Subflow', () => { + it('should wire regular block to parallel sentinel start', () => { + const regularBlockId = 'regular-block' + const parallelId = 'parallel-1' + const taskInParallelId = 'parallel-task' + const taskInParallelTemplateId = `${taskInParallelId}__branch-0` + const parallelSentinelStart = `parallel-${parallelId}-sentinel-start` + const parallelSentinelEnd = `parallel-${parallelId}-sentinel-end` + + const workflow = createMockWorkflow( + [ + createMockBlock(regularBlockId), + createMockBlock(parallelId, 'parallel'), + createMockBlock(taskInParallelId), + ], + [{ source: regularBlockId, target: parallelId }], + {}, + { + [parallelId]: { id: parallelId, nodes: [taskInParallelId], count: 2 }, + } + ) + + const dag = createMockDAG([ + regularBlockId, + taskInParallelTemplateId, + parallelSentinelStart, + parallelSentinelEnd, + ]) + dag.parallelConfigs.set(parallelId, { + id: parallelId, + nodes: [taskInParallelId], + count: 2, + }) + + edgeConstructor.execute( + workflow, + dag, + new Set([taskInParallelId]), + new Set(), + new Set([ + regularBlockId, + taskInParallelTemplateId, + parallelSentinelStart, + parallelSentinelEnd, + ]), + new Map() + ) + + const regularBlockNode = dag.nodes.get(regularBlockId)! + const edgesToParallel = Array.from(regularBlockNode.outgoingEdges.values()).filter( + (e) => e.target === parallelSentinelStart + ) + expect(edgesToParallel.length).toBe(1) + + const parallelStartNode = dag.nodes.get(parallelSentinelStart)! + expect(parallelStartNode.incomingEdges.has(regularBlockId)).toBe(true) + }) + + it('should wire regular block to loop sentinel start', () => { + const regularBlockId = 'regular-block' + const loopId = 'loop-1' + const taskInLoopId = 'loop-task' + const loopSentinelStart = `loop-${loopId}-sentinel-start` + const loopSentinelEnd = `loop-${loopId}-sentinel-end` + + const workflow = createMockWorkflow( + [ + createMockBlock(regularBlockId), + createMockBlock(loopId, 'loop'), + createMockBlock(taskInLoopId), + ], + [{ source: regularBlockId, target: loopId }], + { + [loopId]: { + id: loopId, + nodes: [taskInLoopId], + iterations: 3, + loopType: 'for', + } as SerializedLoop, + } + ) + + const dag = createMockDAG([ + regularBlockId, + taskInLoopId, + loopSentinelStart, + loopSentinelEnd, + ]) + dag.loopConfigs.set(loopId, { + id: loopId, + nodes: [taskInLoopId], + iterations: 3, + loopType: 'for', + } as SerializedLoop) + + edgeConstructor.execute( + workflow, + dag, + new Set(), + new Set([taskInLoopId]), + new Set([regularBlockId, taskInLoopId, loopSentinelStart, loopSentinelEnd]), + new Map() + ) + + const regularBlockNode = dag.nodes.get(regularBlockId)! + const edgesToLoop = Array.from(regularBlockNode.outgoingEdges.values()).filter( + (e) => e.target === loopSentinelStart + ) + expect(edgesToLoop.length).toBe(1) + + const loopStartNode = dag.nodes.get(loopSentinelStart)! + expect(loopStartNode.incomingEdges.has(regularBlockId)).toBe(true) + }) + }) + }) }) diff --git a/apps/sim/executor/dag/construction/edges.ts b/apps/sim/executor/dag/construction/edges.ts index e41d9a64b..f3021ab6f 100644 --- a/apps/sim/executor/dag/construction/edges.ts +++ b/apps/sim/executor/dag/construction/edges.ts @@ -3,11 +3,11 @@ import { EDGE, isConditionBlockType, isRouterBlockType } from '@/executor/consta import type { DAG } from '@/executor/dag/builder' import { buildBranchNodeId, + buildParallelSentinelEndId, + buildParallelSentinelStartId, buildSentinelEndId, buildSentinelStartId, - calculateBranchCount, extractBaseBlockId, - parseDistributionItems, } from '@/executor/utils/subflow-utils' import type { SerializedWorkflow } from '@/serializer/types' @@ -51,7 +51,7 @@ export class EdgeConstructor { ) this.wireLoopSentinels(dag, reachableBlocks) - this.wireParallelBlocks(workflow, dag, loopBlockIds, parallelBlockIds, pauseTriggerMapping) + this.wireParallelSentinels(dag) } private buildMetadataMaps(workflow: SerializedWorkflow): EdgeMetadata { @@ -157,43 +157,45 @@ export class EdgeConstructor { const sourceIsParallelBlock = parallelBlockIds.has(source) const targetIsParallelBlock = parallelBlockIds.has(target) - if ( - sourceIsLoopBlock || - targetIsLoopBlock || - sourceIsParallelBlock || - targetIsParallelBlock - ) { - let loopSentinelStartId: string | undefined + let loopSentinelStartId: string | undefined - if (sourceIsLoopBlock) { - const sentinelEndId = buildSentinelEndId(originalSource) - loopSentinelStartId = buildSentinelStartId(originalSource) - - if (!dag.nodes.has(sentinelEndId) || !dag.nodes.has(loopSentinelStartId)) { - continue - } - - source = sentinelEndId - sourceHandle = EDGE.LOOP_EXIT - } - - if (targetIsLoopBlock) { - const sentinelStartId = buildSentinelStartId(target) - - if (!dag.nodes.has(sentinelStartId)) { - continue - } - - target = sentinelStartId - } - - if (loopSentinelStartId) { - this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle) - } - - if (sourceIsParallelBlock || targetIsParallelBlock) { + if (sourceIsLoopBlock) { + const sentinelEndId = buildSentinelEndId(originalSource) + loopSentinelStartId = buildSentinelStartId(originalSource) + if (!dag.nodes.has(sentinelEndId) || !dag.nodes.has(loopSentinelStartId)) { continue } + source = sentinelEndId + sourceHandle = EDGE.LOOP_EXIT + } + + if (targetIsLoopBlock) { + const sentinelStartId = buildSentinelStartId(target) + if (!dag.nodes.has(sentinelStartId)) { + continue + } + target = sentinelStartId + } + + if (sourceIsParallelBlock) { + const sentinelEndId = buildParallelSentinelEndId(originalSource) + if (!dag.nodes.has(sentinelEndId)) { + continue + } + source = sentinelEndId + sourceHandle = EDGE.PARALLEL_EXIT + } + + if (targetIsParallelBlock) { + const sentinelStartId = buildParallelSentinelStartId(target) + if (!dag.nodes.has(sentinelStartId)) { + continue + } + target = sentinelStartId + } + + if (loopSentinelStartId) { + this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle) } if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) { @@ -209,19 +211,12 @@ export class EdgeConstructor { const targetParallelId = this.getParallelId(target, dag) if (sourceParallelId === targetParallelId) { - this.wireParallelInternalEdge( - source, - target, - sourceParallelId!, - dag, - sourceHandle, - targetHandle, - pauseTriggerMapping - ) + this.wireParallelTemplateEdge(source, target, dag, sourceHandle, targetHandle) } else { logger.warn('Edge between different parallels - invalid workflow', { source, target }) } } else if (blocksInParallels.has(source) || blocksInParallels.has(target)) { + // Skip - will be handled by sentinel wiring } else { const resolvedSource = pauseTriggerMapping.get(originalSource) ?? source this.addEdge(dag, resolvedSource, target, sourceHandle, targetHandle) @@ -256,81 +251,32 @@ export class EdgeConstructor { } } - private wireParallelBlocks( - workflow: SerializedWorkflow, - dag: DAG, - loopBlockIds: Set, - parallelBlockIds: Set, - pauseTriggerMapping: Map - ): void { + private wireParallelSentinels(dag: DAG): void { for (const [parallelId, parallelConfig] of dag.parallelConfigs) { const nodes = parallelConfig.nodes if (nodes.length === 0) continue - const { entryNodes, terminalNodes, branchCount } = this.findParallelBoundaryNodes( - nodes, - parallelId, - dag - ) + const sentinelStartId = buildParallelSentinelStartId(parallelId) + const sentinelEndId = buildParallelSentinelEndId(parallelId) - logger.info('Wiring parallel block edges', { - parallelId, - entryNodes, - terminalNodes, - branchCount, - }) + if (!dag.nodes.has(sentinelStartId) || !dag.nodes.has(sentinelEndId)) { + continue + } - for (const connection of workflow.connections) { - const { source, target, sourceHandle, targetHandle } = connection + const { entryNodes, terminalNodes } = this.findParallelBoundaryNodes(nodes, dag) - if (target === parallelId) { - if (loopBlockIds.has(source) || parallelBlockIds.has(source)) continue - - if (nodes.includes(source)) { - logger.warn('Invalid: parallel block connected from its own internal node', { - parallelId, - source, - }) - continue - } - - logger.info('Wiring edge to parallel block', { source, parallelId, entryNodes }) - - for (const entryNodeId of entryNodes) { - for (let i = 0; i < branchCount; i++) { - const branchNodeId = buildBranchNodeId(entryNodeId, i) - - if (dag.nodes.has(branchNodeId)) { - this.addEdge(dag, source, branchNodeId, sourceHandle, targetHandle) - } - } - } + for (const entryNodeId of entryNodes) { + const templateNodeId = buildBranchNodeId(entryNodeId, 0) + if (dag.nodes.has(templateNodeId)) { + this.addEdge(dag, sentinelStartId, templateNodeId) } + } - if (source === parallelId) { - if (loopBlockIds.has(target) || parallelBlockIds.has(target)) continue - - if (nodes.includes(target)) { - logger.warn('Invalid: parallel block connected to its own internal node', { - parallelId, - target, - }) - continue - } - - logger.info('Wiring edge from parallel block', { parallelId, target, terminalNodes }) - - for (const terminalNodeId of terminalNodes) { - for (let i = 0; i < branchCount; i++) { - const branchNodeId = buildBranchNodeId(terminalNodeId, i) - - if (dag.nodes.has(branchNodeId)) { - const resolvedSourceId = pauseTriggerMapping.get(branchNodeId) ?? branchNodeId - this.addEdge(dag, resolvedSourceId, target, sourceHandle, targetHandle) - } - } - } + for (const terminalNodeId of terminalNodes) { + const templateNodeId = buildBranchNodeId(terminalNodeId, 0) + if (dag.nodes.has(templateNodeId)) { + this.addEdge(dag, templateNodeId, sentinelEndId) } } } @@ -384,30 +330,16 @@ export class EdgeConstructor { return true } - private wireParallelInternalEdge( + private wireParallelTemplateEdge( source: string, target: string, - parallelId: string, dag: DAG, sourceHandle?: string, - targetHandle?: string, - pauseTriggerMapping?: Map + targetHandle?: string ): void { - const parallelConfig = dag.parallelConfigs.get(parallelId) - - if (!parallelConfig) { - throw new Error(`Parallel config not found: ${parallelId}`) - } - - const distributionItems = parseDistributionItems(parallelConfig) - const count = calculateBranchCount(parallelConfig, distributionItems) - - for (let i = 0; i < count; i++) { - const sourceNodeId = buildBranchNodeId(source, i) - const targetNodeId = buildBranchNodeId(target, i) - const resolvedSourceId = pauseTriggerMapping?.get(sourceNodeId) ?? sourceNodeId - this.addEdge(dag, resolvedSourceId, targetNodeId, sourceHandle, targetHandle) - } + const sourceNodeId = buildBranchNodeId(source, 0) + const targetNodeId = buildBranchNodeId(target, 0) + this.addEdge(dag, sourceNodeId, targetNodeId, sourceHandle, targetHandle) } private findLoopBoundaryNodes( @@ -465,92 +397,44 @@ export class EdgeConstructor { private findParallelBoundaryNodes( nodes: string[], - parallelId: string, dag: DAG - ): { entryNodes: string[]; terminalNodes: string[]; branchCount: number } { + ): { entryNodes: string[]; terminalNodes: string[] } { const nodesSet = new Set(nodes) - const entryNodesSet = new Set() - const terminalNodesSet = new Set() - const parallelConfig = dag.parallelConfigs.get(parallelId) - - if (!parallelConfig) { - throw new Error(`Parallel config not found: ${parallelId}`) - } - - const distributionItems = parseDistributionItems(parallelConfig) - const branchCount = calculateBranchCount(parallelConfig, distributionItems) + const entryNodes: string[] = [] + const terminalNodes: string[] = [] for (const nodeId of nodes) { - let hasAnyBranch = false + const templateId = buildBranchNodeId(nodeId, 0) + const templateNode = dag.nodes.get(templateId) - for (let i = 0; i < branchCount; i++) { - if (dag.nodes.has(buildBranchNodeId(nodeId, i))) { - hasAnyBranch = true - break - } - } - - if (!hasAnyBranch) continue - - const firstBranchId = buildBranchNodeId(nodeId, 0) - const firstBranchNode = dag.nodes.get(firstBranchId) - - if (!firstBranchNode) continue + if (!templateNode) continue let hasIncomingFromParallel = false - - for (const incomingNodeId of firstBranchNode.incomingEdges) { + for (const incomingNodeId of templateNode.incomingEdges) { const originalNodeId = extractBaseBlockId(incomingNodeId) - if (nodesSet.has(originalNodeId)) { hasIncomingFromParallel = true break } } - if (!hasIncomingFromParallel) { - entryNodesSet.add(nodeId) + entryNodes.push(nodeId) } - } - - for (const nodeId of nodes) { - let hasAnyBranch = false - - for (let i = 0; i < branchCount; i++) { - if (dag.nodes.has(buildBranchNodeId(nodeId, i))) { - hasAnyBranch = true - break - } - } - - if (!hasAnyBranch) continue - - const firstBranchId = buildBranchNodeId(nodeId, 0) - const firstBranchNode = dag.nodes.get(firstBranchId) - - if (!firstBranchNode) continue let hasOutgoingToParallel = false - - for (const [_, edge] of firstBranchNode.outgoingEdges) { + for (const [, edge] of templateNode.outgoingEdges) { const originalTargetId = extractBaseBlockId(edge.target) - if (nodesSet.has(originalTargetId)) { hasOutgoingToParallel = true break } } - if (!hasOutgoingToParallel) { - terminalNodesSet.add(nodeId) + terminalNodes.push(nodeId) } } - return { - entryNodes: Array.from(entryNodesSet), - terminalNodes: Array.from(terminalNodesSet), - branchCount, - } + return { entryNodes, terminalNodes } } private getParallelId(blockId: string, dag: DAG): string | null { diff --git a/apps/sim/executor/dag/construction/nodes.ts b/apps/sim/executor/dag/construction/nodes.ts index b4ceeb1ae..42c575600 100644 --- a/apps/sim/executor/dag/construction/nodes.ts +++ b/apps/sim/executor/dag/construction/nodes.ts @@ -1,18 +1,8 @@ import { BlockType, isMetadataOnlyBlockType } from '@/executor/constants' -import type { DAG, DAGNode } from '@/executor/dag/builder' -import { - buildBranchNodeId, - calculateBranchCount, - parseDistributionItems, -} from '@/executor/utils/subflow-utils' +import type { DAG } from '@/executor/dag/builder' +import { buildBranchNodeId } from '@/executor/utils/subflow-utils' import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types' -interface ParallelExpansion { - parallelId: string - branchCount: number - distributionItems: any[] -} - export class NodeConstructor { execute( workflow: SerializedWorkflow, @@ -37,7 +27,7 @@ export class NodeConstructor { const parallelId = this.findParallelForBlock(block.id, dag) if (parallelId) { - this.createParallelBranchNodes(block, parallelId, dag) + this.createParallelTemplateNode(block, parallelId, dag) } else { this.createRegularOrLoopNode(block, blocksInLoops, dag) } @@ -100,57 +90,27 @@ export class NodeConstructor { } } - private createParallelBranchNodes(block: SerializedBlock, parallelId: string, dag: DAG): void { - const expansion = this.calculateParallelExpansion(parallelId, dag) - - for (let branchIndex = 0; branchIndex < expansion.branchCount; branchIndex++) { - const branchNode = this.createParallelBranchNode(block, branchIndex, expansion) - dag.nodes.set(branchNode.id, branchNode) - } - } - - private calculateParallelExpansion(parallelId: string, dag: DAG): ParallelExpansion { - const config = dag.parallelConfigs.get(parallelId) - - if (!config) { - throw new Error(`Parallel config not found: ${parallelId}`) - } - - const distributionItems = parseDistributionItems(config) - const branchCount = calculateBranchCount(config, distributionItems) - - return { - parallelId, - branchCount, - distributionItems, - } - } - - private createParallelBranchNode( - baseBlock: SerializedBlock, - branchIndex: number, - expansion: ParallelExpansion - ): DAGNode { - const branchNodeId = buildBranchNodeId(baseBlock.id, branchIndex) + private createParallelTemplateNode(block: SerializedBlock, parallelId: string, dag: DAG): void { + const templateNodeId = buildBranchNodeId(block.id, 0) const blockClone: SerializedBlock = { - ...baseBlock, - id: branchNodeId, + ...block, + id: templateNodeId, } - return { - id: branchNodeId, + + dag.nodes.set(templateNodeId, { + id: templateNodeId, block: blockClone, incomingEdges: new Set(), outgoingEdges: new Map(), metadata: { isParallelBranch: true, - parallelId: expansion.parallelId, - branchIndex, - branchTotal: expansion.branchCount, - distributionItem: expansion.distributionItems[branchIndex], - isPauseResponse: baseBlock.metadata?.id === BlockType.HUMAN_IN_THE_LOOP, - originalBlockId: baseBlock.id, + parallelId, + branchIndex: 0, + branchTotal: 1, + isPauseResponse: block.metadata?.id === BlockType.HUMAN_IN_THE_LOOP, + originalBlockId: block.id, }, - } + }) } private createRegularOrLoopNode( @@ -176,44 +136,6 @@ export class NodeConstructor { }) } - private createTriggerNode( - block: SerializedBlock, - triggerId: string, - options: { - loopId?: string - isParallelBranch?: boolean - parallelId?: string - branchIndex?: number - branchTotal?: number - } - ): DAGNode { - const triggerBlock: SerializedBlock = { - ...block, - id: triggerId, - enabled: true, - metadata: { - ...block.metadata, - id: BlockType.START_TRIGGER, - }, - } - - return { - id: triggerId, - block: triggerBlock, - incomingEdges: new Set(), - outgoingEdges: new Map(), - metadata: { - isResumeTrigger: true, - originalBlockId: block.id, - loopId: options.loopId, - isParallelBranch: options.isParallelBranch, - parallelId: options.parallelId, - branchIndex: options.branchIndex, - branchTotal: options.branchTotal, - }, - } - } - private findLoopIdForBlock(blockId: string, dag: DAG): string | undefined { for (const [loopId, loopConfig] of dag.loopConfigs) { if (loopConfig.nodes.includes(blockId)) { diff --git a/apps/sim/executor/dag/construction/parallels.ts b/apps/sim/executor/dag/construction/parallels.ts new file mode 100644 index 000000000..cbb55a7a0 --- /dev/null +++ b/apps/sim/executor/dag/construction/parallels.ts @@ -0,0 +1,88 @@ +import { createLogger } from '@sim/logger' +import { BlockType, PARALLEL, type SentinelType } from '@/executor/constants' +import type { DAG, DAGNode } from '@/executor/dag/builder' +import { + buildParallelSentinelEndId, + buildParallelSentinelStartId, +} from '@/executor/utils/subflow-utils' + +const logger = createLogger('ParallelConstructor') + +export class ParallelConstructor { + execute(dag: DAG, reachableBlocks: Set): void { + for (const [parallelId, parallelConfig] of dag.parallelConfigs) { + const parallelNodes = parallelConfig.nodes + + if (parallelNodes.length === 0) { + continue + } + + if (!this.hasReachableNodes(parallelNodes, reachableBlocks)) { + continue + } + + this.createSentinelPair(dag, parallelId) + } + } + + private hasReachableNodes(parallelNodes: string[], reachableBlocks: Set): boolean { + return parallelNodes.some((nodeId) => reachableBlocks.has(nodeId)) + } + + private createSentinelPair(dag: DAG, parallelId: string): void { + const startId = buildParallelSentinelStartId(parallelId) + const endId = buildParallelSentinelEndId(parallelId) + + dag.nodes.set( + startId, + this.createSentinelNode({ + id: startId, + parallelId, + sentinelType: PARALLEL.SENTINEL.START_TYPE, + blockType: BlockType.SENTINEL_START, + name: `${PARALLEL.SENTINEL.START_NAME_PREFIX} (${parallelId})`, + }) + ) + + dag.nodes.set( + endId, + this.createSentinelNode({ + id: endId, + parallelId, + sentinelType: PARALLEL.SENTINEL.END_TYPE, + blockType: BlockType.SENTINEL_END, + name: `${PARALLEL.SENTINEL.END_NAME_PREFIX} (${parallelId})`, + }) + ) + } + + private createSentinelNode(config: { + id: string + parallelId: string + sentinelType: SentinelType + blockType: BlockType + name: string + }): DAGNode { + return { + id: config.id, + block: { + id: config.id, + enabled: true, + metadata: { + id: config.blockType, + name: config.name, + parallelId: config.parallelId, + }, + config: { params: {} }, + } as any, + incomingEdges: new Set(), + outgoingEdges: new Map(), + metadata: { + isSentinel: true, + isParallelSentinel: true, + sentinelType: config.sentinelType, + parallelId: config.parallelId, + }, + } + } +} diff --git a/apps/sim/executor/dag/types.ts b/apps/sim/executor/dag/types.ts index cbd1374d4..2bedb57f3 100644 --- a/apps/sim/executor/dag/types.ts +++ b/apps/sim/executor/dag/types.ts @@ -7,6 +7,7 @@ export interface DAGEdge { export interface NodeMetadata { isParallelBranch?: boolean + isParallelSentinel?: boolean parallelId?: string branchIndex?: number branchTotal?: number diff --git a/apps/sim/executor/execution/edge-manager.ts b/apps/sim/executor/execution/edge-manager.ts index e4451506a..0a7395aad 100644 --- a/apps/sim/executor/execution/edge-manager.ts +++ b/apps/sim/executor/execution/edge-manager.ts @@ -129,6 +129,10 @@ export class EdgeManager { return handle === EDGE.LOOP_CONTINUE || handle === EDGE.LOOP_CONTINUE_ALT } + if (output.selectedRoute === EDGE.PARALLEL_EXIT) { + return handle === EDGE.PARALLEL_EXIT + } + if (!handle) { return true } diff --git a/apps/sim/executor/orchestrators/node.ts b/apps/sim/executor/orchestrators/node.ts index d6721ad94..e5d7bc1a1 100644 --- a/apps/sim/executor/orchestrators/node.ts +++ b/apps/sim/executor/orchestrators/node.ts @@ -53,18 +53,11 @@ export class NodeExecutionOrchestrator { } } - // Initialize parallel scope BEFORE execution so can be resolved const parallelId = node.metadata.parallelId if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) { - const totalBranches = node.metadata.branchTotal || 1 const parallelConfig = this.dag.parallelConfigs.get(parallelId) - const nodesInParallel = (parallelConfig as any)?.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope( - ctx, - parallelId, - totalBranches, - nodesInParallel - ) + const nodesInParallel = parallelConfig?.nodes?.length || 1 + this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } if (node.metadata.isSentinel) { @@ -92,6 +85,12 @@ export class NodeExecutionOrchestrator { ): Promise { const sentinelType = node.metadata.sentinelType const loopId = node.metadata.loopId + const parallelId = node.metadata.parallelId + const isParallelSentinel = node.metadata.isParallelSentinel + + if (isParallelSentinel) { + return this.handleParallelSentinel(ctx, node, sentinelType, parallelId) + } switch (sentinelType) { case 'start': { @@ -140,6 +139,42 @@ export class NodeExecutionOrchestrator { } } + private handleParallelSentinel( + ctx: ExecutionContext, + node: DAGNode, + sentinelType: string | undefined, + parallelId: string | undefined + ): NormalizedBlockOutput { + if (!parallelId) { + logger.warn('Parallel sentinel called without parallelId') + return {} + } + + if (sentinelType === 'start') { + if (!this.parallelOrchestrator.getParallelScope(ctx, parallelId)) { + const parallelConfig = this.dag.parallelConfigs.get(parallelId) + if (parallelConfig) { + const nodesInParallel = parallelConfig.nodes?.length || 1 + this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) + } + } + return { sentinelStart: true } + } + + if (sentinelType === 'end') { + const result = this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) + return { + results: result.results || [], + sentinelEnd: true, + selectedRoute: EDGE.PARALLEL_EXIT, + totalBranches: result.totalBranches, + } + } + + logger.warn('Unknown parallel sentinel type', { sentinelType }) + return {} + } + async handleNodeCompletion( ctx: ExecutionContext, nodeId: string, @@ -188,15 +223,9 @@ export class NodeExecutionOrchestrator { ): void { const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId) if (!scope) { - const totalBranches = node.metadata.branchTotal || 1 const parallelConfig = this.dag.parallelConfigs.get(parallelId) - const nodesInParallel = (parallelConfig as any)?.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope( - ctx, - parallelId, - totalBranches, - nodesInParallel - ) + const nodesInParallel = parallelConfig?.nodes?.length || 1 + this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } const allComplete = this.parallelOrchestrator.handleParallelBranchCompletion( ctx, diff --git a/apps/sim/executor/orchestrators/parallel.ts b/apps/sim/executor/orchestrators/parallel.ts index ad702a376..ef17d624a 100644 --- a/apps/sim/executor/orchestrators/parallel.ts +++ b/apps/sim/executor/orchestrators/parallel.ts @@ -1,17 +1,14 @@ import { createLogger } from '@sim/logger' import { DEFAULTS } from '@/executor/constants' -import type { DAG, DAGNode } from '@/executor/dag/builder' +import type { DAG } from '@/executor/dag/builder' import type { ParallelScope } from '@/executor/execution/state' import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types' import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types' import type { ParallelConfigWithNodes } from '@/executor/types/parallel' +import { ParallelExpander } from '@/executor/utils/parallel-expansion' import { addSubflowErrorLog, - buildBranchNodeId, - calculateBranchCount, - extractBaseBlockId, extractBranchIndex, - parseDistributionItems, resolveArrayInput, validateMaxCount, } from '@/executor/utils/subflow-utils' @@ -37,6 +34,7 @@ export interface ParallelAggregationResult { export class ParallelOrchestrator { private resolver: VariableResolver | null = null private contextExtensions: ContextExtensions | null = null + private expander = new ParallelExpander() constructor( private dag: DAG, @@ -54,100 +52,95 @@ export class ParallelOrchestrator { initializeParallelScope( ctx: ExecutionContext, parallelId: string, - totalBranches: number, terminalNodesCount = 1 ): ParallelScope { const parallelConfig = this.dag.parallelConfigs.get(parallelId) - - let items: any[] | undefined - if (parallelConfig) { - try { - items = this.resolveDistributionItems(ctx, parallelConfig) - } catch (error) { - const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}` - logger.error(errorMessage, { - parallelId, - distribution: parallelConfig.distribution, - }) - this.addParallelErrorLog(ctx, parallelId, errorMessage, { - distribution: parallelConfig.distribution, - }) - this.setErrorScope(ctx, parallelId, errorMessage) - throw new Error(errorMessage) - } + if (!parallelConfig) { + throw new Error(`Parallel config not found: ${parallelId}`) } - const actualBranchCount = items && items.length > totalBranches ? items.length : totalBranches + let items: any[] | undefined + let branchCount: number + + try { + const resolved = this.resolveBranchCount(ctx, parallelConfig) + branchCount = resolved.branchCount + items = resolved.items + } catch (error) { + const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}` + logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution }) + this.addParallelErrorLog(ctx, parallelId, errorMessage, { + distribution: parallelConfig.distribution, + }) + this.setErrorScope(ctx, parallelId, errorMessage) + throw new Error(errorMessage) + } const branchError = validateMaxCount( - actualBranchCount, + branchCount, DEFAULTS.MAX_PARALLEL_BRANCHES, 'Parallel branch count' ) if (branchError) { - logger.error(branchError, { parallelId, actualBranchCount }) + logger.error(branchError, { parallelId, branchCount }) this.addParallelErrorLog(ctx, parallelId, branchError, { - distribution: parallelConfig?.distribution, - branchCount: actualBranchCount, + distribution: parallelConfig.distribution, + branchCount, }) this.setErrorScope(ctx, parallelId, branchError) throw new Error(branchError) } + const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items) + const scope: ParallelScope = { parallelId, - totalBranches: actualBranchCount, + totalBranches: branchCount, branchOutputs: new Map(), completedCount: 0, - totalExpectedNodes: actualBranchCount * terminalNodesCount, + totalExpectedNodes: branchCount * terminalNodesCount, items, } + if (!ctx.parallelExecutions) { ctx.parallelExecutions = new Map() } ctx.parallelExecutions.set(parallelId, scope) - // Dynamically expand DAG if needed - if (items && items.length > totalBranches && parallelConfig) { - logger.info('Dynamically expanding parallel branches', { - parallelId, - existingBranches: totalBranches, - targetBranches: items.length, - itemsCount: items.length, - }) - - const newEntryNodes = this.expandParallelBranches( - parallelId, - parallelConfig, - totalBranches, - items.length - ) - - logger.info('Parallel expansion complete', { - parallelId, - newEntryNodes, - totalNodesInDag: this.dag.nodes.size, - }) - - // Add new entry nodes to pending dynamic nodes so the engine can schedule them - if (newEntryNodes.length > 0) { - if (!ctx.pendingDynamicNodes) { - ctx.pendingDynamicNodes = [] - } - ctx.pendingDynamicNodes.push(...newEntryNodes) + const newEntryNodes = entryNodes.filter((nodeId) => !nodeId.endsWith('__branch-0')) + if (newEntryNodes.length > 0) { + if (!ctx.pendingDynamicNodes) { + ctx.pendingDynamicNodes = [] } - } else { - logger.info('No parallel expansion needed', { - parallelId, - itemsLength: items?.length, - totalBranches, - hasParallelConfig: !!parallelConfig, - }) + ctx.pendingDynamicNodes.push(...newEntryNodes) } + logger.info('Parallel scope initialized', { + parallelId, + branchCount, + entryNodeCount: entryNodes.length, + newEntryNodes: newEntryNodes.length, + }) + return scope } + private resolveBranchCount( + ctx: ExecutionContext, + config: SerializedParallel + ): { branchCount: number; items?: any[] } { + if (config.parallelType === 'count') { + return { branchCount: config.count ?? 1 } + } + + const items = this.resolveDistributionItems(ctx, config) + if (items.length === 0) { + return { branchCount: config.count ?? 1 } + } + + return { branchCount: items.length, items } + } + private addParallelErrorLog( ctx: ExecutionContext, parallelId: string, @@ -180,189 +173,6 @@ export class ParallelOrchestrator { ctx.parallelExecutions.set(parallelId, scope) } - /** - * Dynamically expand the DAG to include additional branch nodes when - * the resolved item count exceeds the pre-built branch count. - */ - private expandParallelBranches( - parallelId: string, - config: SerializedParallel, - existingBranchCount: number, - targetBranchCount: number - ): string[] { - // Get all blocks that are part of this parallel - const blocksInParallel = config.nodes - const blocksInParallelSet = new Set(blocksInParallel) - - // Step 1: Create all new nodes first - for (const blockId of blocksInParallel) { - const branch0NodeId = buildBranchNodeId(blockId, 0) - const templateNode = this.dag.nodes.get(branch0NodeId) - - if (!templateNode) { - logger.warn('Template node not found for parallel expansion', { blockId, branch0NodeId }) - continue - } - - for (let branchIndex = existingBranchCount; branchIndex < targetBranchCount; branchIndex++) { - const newNodeId = buildBranchNodeId(blockId, branchIndex) - - const newNode: DAGNode = { - id: newNodeId, - block: { - ...templateNode.block, - id: newNodeId, - }, - incomingEdges: new Set(), - outgoingEdges: new Map(), - metadata: { - ...templateNode.metadata, - branchIndex, - branchTotal: targetBranchCount, - originalBlockId: blockId, - }, - } - - this.dag.nodes.set(newNodeId, newNode) - } - } - - // Step 2: Wire edges between the new branch nodes - this.wireExpandedBranchEdges( - parallelId, - blocksInParallel, - existingBranchCount, - targetBranchCount - ) - - // Step 3: Update metadata on existing nodes to reflect new total - this.updateExistingBranchMetadata(blocksInParallel, existingBranchCount, targetBranchCount) - - // Step 4: Identify entry nodes AFTER edges are wired - // Entry nodes are those with no INTERNAL incoming edges (edges from outside parallel don't count) - const newEntryNodes: string[] = [] - for (const blockId of blocksInParallel) { - const branch0NodeId = buildBranchNodeId(blockId, 0) - const templateNode = this.dag.nodes.get(branch0NodeId) - if (!templateNode) continue - - // Check if template has any INTERNAL incoming edges - let hasInternalIncoming = false - for (const incomingId of templateNode.incomingEdges) { - const baseIncomingId = extractBaseBlockId(incomingId) - if (blocksInParallelSet.has(baseIncomingId)) { - hasInternalIncoming = true - break - } - } - - // If no internal incoming edges, the new branches of this block are entry nodes - if (!hasInternalIncoming) { - for ( - let branchIndex = existingBranchCount; - branchIndex < targetBranchCount; - branchIndex++ - ) { - newEntryNodes.push(buildBranchNodeId(blockId, branchIndex)) - } - } - } - - return newEntryNodes - } - - /** - * Wire edges between expanded branch nodes by replicating the edge pattern from branch 0. - * Handles both internal edges (within the parallel) and exit edges (to blocks after the parallel). - */ - private wireExpandedBranchEdges( - parallelId: string, - blocksInParallel: string[], - existingBranchCount: number, - targetBranchCount: number - ): void { - const blocksInParallelSet = new Set(blocksInParallel) - - // For each block, look at branch 0's outgoing edges and replicate for new branches - for (const blockId of blocksInParallel) { - const branch0NodeId = buildBranchNodeId(blockId, 0) - const branch0Node = this.dag.nodes.get(branch0NodeId) - - if (!branch0Node) continue - - // Replicate outgoing edges for each new branch - for (const [, edge] of branch0Node.outgoingEdges) { - // Use edge.target (the actual target node ID), not the Map key which may be a formatted edge ID - const actualTargetNodeId = edge.target - - // Extract the base target block ID - const baseTargetId = extractBaseBlockId(actualTargetNodeId) - - // Check if target is inside or outside the parallel - const isInternalEdge = blocksInParallelSet.has(baseTargetId) - - for ( - let branchIndex = existingBranchCount; - branchIndex < targetBranchCount; - branchIndex++ - ) { - const sourceNodeId = buildBranchNodeId(blockId, branchIndex) - const sourceNode = this.dag.nodes.get(sourceNodeId) - - if (!sourceNode) continue - - if (isInternalEdge) { - // Internal edge: wire to the corresponding branch of the target - const newTargetNodeId = buildBranchNodeId(baseTargetId, branchIndex) - const targetNode = this.dag.nodes.get(newTargetNodeId) - - if (targetNode) { - sourceNode.outgoingEdges.set(newTargetNodeId, { - target: newTargetNodeId, - sourceHandle: edge.sourceHandle, - targetHandle: edge.targetHandle, - }) - targetNode.incomingEdges.add(sourceNodeId) - } - } else { - // Exit edge: wire to the same external target (blocks after the parallel) - // All branches point to the same external node - const externalTargetNode = this.dag.nodes.get(actualTargetNodeId) - - if (externalTargetNode) { - sourceNode.outgoingEdges.set(actualTargetNodeId, { - target: actualTargetNodeId, - sourceHandle: edge.sourceHandle, - targetHandle: edge.targetHandle, - }) - // Add incoming edge from this new branch to the external node - externalTargetNode.incomingEdges.add(sourceNodeId) - } - } - } - } - } - } - - /** - * Update existing branch nodes' metadata to reflect the new total branch count. - */ - private updateExistingBranchMetadata( - blocksInParallel: string[], - existingBranchCount: number, - targetBranchCount: number - ): void { - for (const blockId of blocksInParallel) { - for (let branchIndex = 0; branchIndex < existingBranchCount; branchIndex++) { - const nodeId = buildBranchNodeId(blockId, branchIndex) - const node = this.dag.nodes.get(nodeId) - if (node) { - node.metadata.branchTotal = targetBranchCount - } - } - } - } - private resolveDistributionItems(ctx: ExecutionContext, config: SerializedParallel): any[] { if (config.parallelType === 'count') { return [] @@ -429,28 +239,25 @@ export class ParallelOrchestrator { } } extractBranchMetadata(nodeId: string): ParallelBranchMetadata | null { + const node = this.dag.nodes.get(nodeId) + if (!node?.metadata.isParallelBranch) { + return null + } + const branchIndex = extractBranchIndex(nodeId) if (branchIndex === null) { return null } - const baseId = extractBaseBlockId(nodeId) - const parallelId = this.findParallelIdForNode(baseId) + const parallelId = node.metadata.parallelId if (!parallelId) { return null } - const parallelConfig = this.dag.parallelConfigs.get(parallelId) - if (!parallelConfig) { - return null - } - const { totalBranches, distributionItem } = this.getParallelConfigInfo( - parallelConfig, - branchIndex - ) + return { branchIndex, - branchTotal: totalBranches, - distributionItem, + branchTotal: node.metadata.branchTotal ?? 1, + distributionItem: node.metadata.distributionItem, parallelId, } } @@ -468,18 +275,4 @@ export class ParallelOrchestrator { } return undefined } - - private getParallelConfigInfo( - parallelConfig: SerializedParallel, - branchIndex: number - ): { totalBranches: number; distributionItem?: any } { - const distributionItems = parseDistributionItems(parallelConfig) - const totalBranches = calculateBranchCount(parallelConfig, distributionItems) - - let distributionItem: any - if (Array.isArray(distributionItems) && branchIndex < distributionItems.length) { - distributionItem = distributionItems[branchIndex] - } - return { totalBranches, distributionItem } - } } diff --git a/apps/sim/executor/utils/parallel-expansion.ts b/apps/sim/executor/utils/parallel-expansion.ts new file mode 100644 index 000000000..00c458353 --- /dev/null +++ b/apps/sim/executor/utils/parallel-expansion.ts @@ -0,0 +1,255 @@ +import { createLogger } from '@sim/logger' +import type { DAG, DAGNode } from '@/executor/dag/builder' +import type { SerializedBlock } from '@/serializer/types' +import { + buildBranchNodeId, + buildParallelSentinelEndId, + buildParallelSentinelStartId, + extractBaseBlockId, +} from './subflow-utils' + +const logger = createLogger('ParallelExpansion') + +export interface ExpansionResult { + entryNodes: string[] + terminalNodes: string[] + allBranchNodes: string[] +} + +export class ParallelExpander { + expandParallel( + dag: DAG, + parallelId: string, + branchCount: number, + distributionItems?: any[] + ): ExpansionResult { + const config = dag.parallelConfigs.get(parallelId) + if (!config) { + throw new Error(`Parallel config not found: ${parallelId}`) + } + + const blocksInParallel = config.nodes || [] + if (blocksInParallel.length === 0) { + return { entryNodes: [], terminalNodes: [], allBranchNodes: [] } + } + + const blocksSet = new Set(blocksInParallel) + const allBranchNodes: string[] = [] + + for (const blockId of blocksInParallel) { + const templateId = buildBranchNodeId(blockId, 0) + const templateNode = dag.nodes.get(templateId) + + if (!templateNode) { + logger.warn('Template node not found', { blockId, templateId }) + continue + } + + for (let i = 0; i < branchCount; i++) { + const branchNodeId = buildBranchNodeId(blockId, i) + allBranchNodes.push(branchNodeId) + + if (i === 0) { + this.updateBranchMetadata(templateNode, i, branchCount, distributionItems?.[i]) + continue + } + + const branchNode = this.cloneTemplateNode( + templateNode, + blockId, + i, + branchCount, + distributionItems?.[i] + ) + dag.nodes.set(branchNodeId, branchNode) + } + } + + this.wireInternalEdges(dag, blocksInParallel, blocksSet, branchCount) + + const { entryNodes, terminalNodes } = this.identifyBoundaryNodes( + dag, + blocksInParallel, + blocksSet, + branchCount + ) + + this.wireSentinelEdges(dag, parallelId, entryNodes, terminalNodes, branchCount) + + logger.info('Parallel expanded', { + parallelId, + branchCount, + blocksCount: blocksInParallel.length, + totalNodes: allBranchNodes.length, + }) + + return { entryNodes, terminalNodes, allBranchNodes } + } + + private updateBranchMetadata( + node: DAGNode, + branchIndex: number, + branchTotal: number, + distributionItem?: any + ): void { + node.metadata.branchIndex = branchIndex + node.metadata.branchTotal = branchTotal + if (distributionItem !== undefined) { + node.metadata.distributionItem = distributionItem + } + } + + private cloneTemplateNode( + template: DAGNode, + originalBlockId: string, + branchIndex: number, + branchTotal: number, + distributionItem?: any + ): DAGNode { + const branchNodeId = buildBranchNodeId(originalBlockId, branchIndex) + const blockClone: SerializedBlock = { + ...template.block, + id: branchNodeId, + } + + return { + id: branchNodeId, + block: blockClone, + incomingEdges: new Set(), + outgoingEdges: new Map(), + metadata: { + ...template.metadata, + branchIndex, + branchTotal, + distributionItem, + originalBlockId, + }, + } + } + + private wireInternalEdges( + dag: DAG, + blocksInParallel: string[], + blocksSet: Set, + branchCount: number + ): void { + for (const blockId of blocksInParallel) { + const templateId = buildBranchNodeId(blockId, 0) + const templateNode = dag.nodes.get(templateId) + if (!templateNode) continue + + for (const [, edge] of templateNode.outgoingEdges) { + const baseTargetId = extractBaseBlockId(edge.target) + if (!blocksSet.has(baseTargetId)) continue + + for (let i = 1; i < branchCount; i++) { + const sourceNodeId = buildBranchNodeId(blockId, i) + const targetNodeId = buildBranchNodeId(baseTargetId, i) + const sourceNode = dag.nodes.get(sourceNodeId) + const targetNode = dag.nodes.get(targetNodeId) + + if (!sourceNode || !targetNode) continue + + const edgeId = edge.sourceHandle + ? `${sourceNodeId}→${targetNodeId}-${edge.sourceHandle}` + : `${sourceNodeId}→${targetNodeId}` + + sourceNode.outgoingEdges.set(edgeId, { + target: targetNodeId, + sourceHandle: edge.sourceHandle, + targetHandle: edge.targetHandle, + }) + targetNode.incomingEdges.add(sourceNodeId) + } + } + } + } + + private identifyBoundaryNodes( + dag: DAG, + blocksInParallel: string[], + blocksSet: Set, + branchCount: number + ): { entryNodes: string[]; terminalNodes: string[] } { + const entryNodes: string[] = [] + const terminalNodes: string[] = [] + + for (const blockId of blocksInParallel) { + const templateId = buildBranchNodeId(blockId, 0) + const templateNode = dag.nodes.get(templateId) + if (!templateNode) continue + + const hasInternalIncoming = this.hasInternalIncomingEdge(templateNode, blocksSet) + const hasInternalOutgoing = this.hasInternalOutgoingEdge(templateNode, blocksSet) + + for (let i = 0; i < branchCount; i++) { + const branchNodeId = buildBranchNodeId(blockId, i) + if (!hasInternalIncoming) { + entryNodes.push(branchNodeId) + } + if (!hasInternalOutgoing) { + terminalNodes.push(branchNodeId) + } + } + } + + return { entryNodes, terminalNodes } + } + + private hasInternalIncomingEdge(node: DAGNode, blocksSet: Set): boolean { + for (const incomingId of node.incomingEdges) { + const baseId = extractBaseBlockId(incomingId) + if (blocksSet.has(baseId)) { + return true + } + } + return false + } + + private hasInternalOutgoingEdge(node: DAGNode, blocksSet: Set): boolean { + for (const [, edge] of node.outgoingEdges) { + const baseId = extractBaseBlockId(edge.target) + if (blocksSet.has(baseId)) { + return true + } + } + return false + } + + private wireSentinelEdges( + dag: DAG, + parallelId: string, + entryNodes: string[], + terminalNodes: string[], + branchCount: number + ): void { + const sentinelStartId = buildParallelSentinelStartId(parallelId) + const sentinelEndId = buildParallelSentinelEndId(parallelId) + const sentinelStart = dag.nodes.get(sentinelStartId) + const sentinelEnd = dag.nodes.get(sentinelEndId) + + if (!sentinelStart || !sentinelEnd) { + logger.warn('Sentinel nodes not found', { parallelId, sentinelStartId, sentinelEndId }) + return + } + + sentinelStart.outgoingEdges.clear() + for (const entryNodeId of entryNodes) { + const entryNode = dag.nodes.get(entryNodeId) + if (!entryNode) continue + + const edgeId = `${sentinelStartId}→${entryNodeId}` + sentinelStart.outgoingEdges.set(edgeId, { target: entryNodeId }) + entryNode.incomingEdges.add(sentinelStartId) + } + + for (const terminalNodeId of terminalNodes) { + const terminalNode = dag.nodes.get(terminalNodeId) + if (!terminalNode) continue + + const edgeId = `${terminalNodeId}→${sentinelEndId}` + terminalNode.outgoingEdges.set(edgeId, { target: sentinelEndId }) + sentinelEnd.incomingEdges.add(terminalNodeId) + } + } +} diff --git a/apps/sim/executor/utils/subflow-utils.ts b/apps/sim/executor/utils/subflow-utils.ts index 05fbf709d..7f741b2f0 100644 --- a/apps/sim/executor/utils/subflow-utils.ts +++ b/apps/sim/executor/utils/subflow-utils.ts @@ -3,101 +3,70 @@ import { LOOP, PARALLEL, PARSING, REFERENCE } from '@/executor/constants' import type { ContextExtensions } from '@/executor/execution/types' import type { BlockLog, ExecutionContext } from '@/executor/types' import type { VariableResolver } from '@/executor/variables/resolver' -import type { SerializedParallel } from '@/serializer/types' const logger = createLogger('SubflowUtils') const BRANCH_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}\\d+${PARALLEL.BRANCH.SUFFIX}$`) const BRANCH_INDEX_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}(\\d+)${PARALLEL.BRANCH.SUFFIX}$`) -const SENTINEL_START_PATTERN = new RegExp( +const LOOP_SENTINEL_START_PATTERN = new RegExp( `${LOOP.SENTINEL.PREFIX}(.+)${LOOP.SENTINEL.START_SUFFIX}` ) -const SENTINEL_END_PATTERN = new RegExp(`${LOOP.SENTINEL.PREFIX}(.+)${LOOP.SENTINEL.END_SUFFIX}`) +const LOOP_SENTINEL_END_PATTERN = new RegExp( + `${LOOP.SENTINEL.PREFIX}(.+)${LOOP.SENTINEL.END_SUFFIX}` +) +const PARALLEL_SENTINEL_START_PATTERN = new RegExp( + `${PARALLEL.SENTINEL.PREFIX}(.+)${PARALLEL.SENTINEL.START_SUFFIX}` +) +const PARALLEL_SENTINEL_END_PATTERN = new RegExp( + `${PARALLEL.SENTINEL.PREFIX}(.+)${PARALLEL.SENTINEL.END_SUFFIX}` +) -/** Build sentinel start node ID */ export function buildSentinelStartId(loopId: string): string { return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.START_SUFFIX}` } -/** - * Build sentinel end node ID - */ + export function buildSentinelEndId(loopId: string): string { return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.END_SUFFIX}` } -/** - * Check if a node ID is a sentinel node - */ -export function isSentinelNodeId(nodeId: string): boolean { + +export function buildParallelSentinelStartId(parallelId: string): string { + return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.START_SUFFIX}` +} + +export function buildParallelSentinelEndId(parallelId: string): string { + return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.END_SUFFIX}` +} + +export function isLoopSentinelNodeId(nodeId: string): boolean { return nodeId.includes(LOOP.SENTINEL.START_SUFFIX) || nodeId.includes(LOOP.SENTINEL.END_SUFFIX) } +export function isParallelSentinelNodeId(nodeId: string): boolean { + return ( + nodeId.includes(PARALLEL.SENTINEL.START_SUFFIX) || nodeId.includes(PARALLEL.SENTINEL.END_SUFFIX) + ) +} + +export function isSentinelNodeId(nodeId: string): boolean { + return isLoopSentinelNodeId(nodeId) || isParallelSentinelNodeId(nodeId) +} + export function extractLoopIdFromSentinel(sentinelId: string): string | null { - const startMatch = sentinelId.match(SENTINEL_START_PATTERN) + const startMatch = sentinelId.match(LOOP_SENTINEL_START_PATTERN) if (startMatch) return startMatch[1] - const endMatch = sentinelId.match(SENTINEL_END_PATTERN) + const endMatch = sentinelId.match(LOOP_SENTINEL_END_PATTERN) if (endMatch) return endMatch[1] return null } -/** - * Parse distribution items from parallel config - * Handles: arrays, JSON strings, objects, and references - * Note: References (starting with '<') cannot be resolved at DAG construction time, - * they must be resolved at runtime. This function returns [] for references. - */ -export function parseDistributionItems(config: SerializedParallel): any[] { - const rawItems = config.distribution ?? [] - - // Already an array - return as-is - if (Array.isArray(rawItems)) { - return rawItems - } - - // Object - convert to entries array (consistent with loop forEach behavior) - if (typeof rawItems === 'object' && rawItems !== null) { - return Object.entries(rawItems) - } - - // String handling - if (typeof rawItems === 'string') { - // References cannot be resolved at DAG construction time - if (rawItems.startsWith(REFERENCE.START) && rawItems.endsWith(REFERENCE.END)) { - return [] - } - - // Try to parse as JSON - try { - const normalizedJSON = rawItems.replace(/'/g, '"') - const parsed = JSON.parse(normalizedJSON) - if (Array.isArray(parsed)) { - return parsed - } - // Parsed to non-array (e.g. object) - convert to entries - if (typeof parsed === 'object' && parsed !== null) { - return Object.entries(parsed) - } - return [] - } catch (error) { - logger.error('Failed to parse distribution items', { - rawItems, - error: error instanceof Error ? error.message : String(error), - }) - return [] - } - } - - return [] -} -/** - * Calculate branch count from parallel config - */ -export function calculateBranchCount(config: SerializedParallel, distributionItems: any[]): number { - const explicitCount = config.count ?? PARALLEL.DEFAULT_COUNT - if (config.parallelType === PARALLEL.TYPE.COLLECTION && distributionItems.length > 0) { - return distributionItems.length - } - return explicitCount +export function extractParallelIdFromSentinel(sentinelId: string): string | null { + const startMatch = sentinelId.match(PARALLEL_SENTINEL_START_PATTERN) + if (startMatch) return startMatch[1] + const endMatch = sentinelId.match(PARALLEL_SENTINEL_END_PATTERN) + if (endMatch) return endMatch[1] + return null } + /** * Build branch node ID with subscript notation * Example: ("blockId", 2) → "blockId₍2₎" @@ -119,20 +88,23 @@ export function isBranchNodeId(nodeId: string): boolean { } export function isLoopNode(nodeId: string): boolean { - return isSentinelNodeId(nodeId) || nodeId.startsWith(LOOP.SENTINEL.PREFIX) + return isLoopSentinelNodeId(nodeId) || nodeId.startsWith(LOOP.SENTINEL.PREFIX) } export function isParallelNode(nodeId: string): boolean { - return isBranchNodeId(nodeId) + return isBranchNodeId(nodeId) || isParallelSentinelNodeId(nodeId) } export function normalizeNodeId(nodeId: string): string { if (isBranchNodeId(nodeId)) { return extractBaseBlockId(nodeId) } - if (isSentinelNodeId(nodeId)) { + if (isLoopSentinelNodeId(nodeId)) { return extractLoopIdFromSentinel(nodeId) || nodeId } + if (isParallelSentinelNodeId(nodeId)) { + return extractParallelIdFromSentinel(nodeId) || nodeId + } return nodeId } diff --git a/apps/sim/lib/copilot/tools/server/workflow/edit-workflow.ts b/apps/sim/lib/copilot/tools/server/workflow/edit-workflow.ts index 120651244..86fe3669a 100644 --- a/apps/sim/lib/copilot/tools/server/workflow/edit-workflow.ts +++ b/apps/sim/lib/copilot/tools/server/workflow/edit-workflow.ts @@ -56,6 +56,7 @@ type SkippedItemType = | 'invalid_subblock_field' | 'missing_required_params' | 'invalid_subflow_parent' + | 'nested_subflow_not_allowed' | 'duplicate_block_name' /** @@ -1487,6 +1488,17 @@ function applyOperationsToWorkflowState( return } + if (childBlock.type === 'loop' || childBlock.type === 'parallel') { + logSkippedItem(skippedItems, { + type: 'nested_subflow_not_allowed', + operationType: 'edit_nested_node', + blockId: childId, + reason: `Cannot nest ${childBlock.type} inside ${block.type} - nested subflows are not supported`, + details: { parentType: block.type, childType: childBlock.type }, + }) + return + } + const childBlockState = createBlockFromParams( childId, childBlock, @@ -1733,6 +1745,17 @@ function applyOperationsToWorkflowState( return } + if (childBlock.type === 'loop' || childBlock.type === 'parallel') { + logSkippedItem(skippedItems, { + type: 'nested_subflow_not_allowed', + operationType: 'add_nested_node', + blockId: childId, + reason: `Cannot nest ${childBlock.type} inside ${params.type} - nested subflows are not supported`, + details: { parentType: params.type, childType: childBlock.type }, + }) + return + } + const childBlockState = createBlockFromParams( childId, childBlock, @@ -1799,6 +1822,17 @@ function applyOperationsToWorkflowState( break } + if (params.type === 'loop' || params.type === 'parallel') { + logSkippedItem(skippedItems, { + type: 'nested_subflow_not_allowed', + operationType: 'insert_into_subflow', + blockId: block_id, + reason: `Cannot nest ${params.type} inside ${subflowBlock.type} - nested subflows are not supported`, + details: { parentType: subflowBlock.type, childType: params.type }, + }) + break + } + // Get block configuration const blockConfig = getAllBlocks().find((block) => block.type === params.type) @@ -1806,6 +1840,17 @@ function applyOperationsToWorkflowState( const existingBlock = modifiedState.blocks[block_id] if (existingBlock) { + if (existingBlock.type === 'loop' || existingBlock.type === 'parallel') { + logSkippedItem(skippedItems, { + type: 'nested_subflow_not_allowed', + operationType: 'insert_into_subflow', + blockId: block_id, + reason: `Cannot move ${existingBlock.type} into ${subflowBlock.type} - nested subflows are not supported`, + details: { parentType: subflowBlock.type, childType: existingBlock.type }, + }) + break + } + // Moving existing block into subflow - just update parent existingBlock.data = { ...existingBlock.data,