From cb650132c75c3434dd69d21232c60c5ce3e12fab Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com> Date: Mon, 26 Jan 2026 17:25:09 -0800 Subject: [PATCH] fix(executor): fix. convergent error edges (#3015) --- .../executor/execution/edge-manager.test.ts | 170 ++++++++++++++++++ apps/sim/executor/execution/edge-manager.ts | 29 ++- 2 files changed, 198 insertions(+), 1 deletion(-) diff --git a/apps/sim/executor/execution/edge-manager.test.ts b/apps/sim/executor/execution/edge-manager.test.ts index f78bb8cf2..f7b332792 100644 --- a/apps/sim/executor/execution/edge-manager.test.ts +++ b/apps/sim/executor/execution/edge-manager.test.ts @@ -773,6 +773,176 @@ describe('EdgeManager', () => { }) }) + describe('Multiple error ports to same target', () => { + it('should mark target ready when one source errors and another succeeds', () => { + // This tests the case where a node has multiple incoming error edges + // from different sources. When one source errors (activating its error edge) + // and another source succeeds (deactivating its error edge), the target + // should become ready after both sources complete. + // + // Workflow 1 (errors) ─── error ───┐ + // ├──→ Error Handler + // Workflow 7 (succeeds) ─ error ───┘ + + const workflow1Id = 'workflow-1' + const workflow7Id = 'workflow-7' + const errorHandlerId = 'error-handler' + + const workflow1Node = createMockNode(workflow1Id, [ + { target: errorHandlerId, sourceHandle: 'error' }, + ]) + + const workflow7Node = createMockNode(workflow7Id, [ + { target: errorHandlerId, sourceHandle: 'error' }, + ]) + + const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id]) + + const nodes = new Map([ + [workflow1Id, workflow1Node], + [workflow7Id, workflow7Node], + [errorHandlerId, errorHandlerNode], + ]) + + const dag = createMockDAG(nodes) + const edgeManager = new EdgeManager(dag) + + // Workflow 1 errors first - error edge activates + const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, { + error: 'Something went wrong', + }) + // Error handler should NOT be ready yet (waiting for workflow 7) + expect(readyAfterWorkflow1).not.toContain(errorHandlerId) + + // Workflow 7 succeeds - error edge deactivates + const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, { + result: 'success', + }) + // Error handler SHOULD be ready now (workflow 1's error edge activated) + expect(readyAfterWorkflow7).toContain(errorHandlerId) + }) + + it('should mark target ready when first source succeeds then second errors', () => { + // Opposite order: first source succeeds, then second errors + + const workflow1Id = 'workflow-1' + const workflow7Id = 'workflow-7' + const errorHandlerId = 'error-handler' + + const workflow1Node = createMockNode(workflow1Id, [ + { target: errorHandlerId, sourceHandle: 'error' }, + ]) + + const workflow7Node = createMockNode(workflow7Id, [ + { target: errorHandlerId, sourceHandle: 'error' }, + ]) + + const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id]) + + const nodes = new Map([ + [workflow1Id, workflow1Node], + [workflow7Id, workflow7Node], + [errorHandlerId, errorHandlerNode], + ]) + + const dag = createMockDAG(nodes) + const edgeManager = new EdgeManager(dag) + + // Workflow 1 succeeds first - error edge deactivates + const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, { + result: 'success', + }) + // Error handler should NOT be ready yet (waiting for workflow 7) + expect(readyAfterWorkflow1).not.toContain(errorHandlerId) + + // Workflow 7 errors - error edge activates + const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, { + error: 'Something went wrong', + }) + // Error handler SHOULD be ready now (workflow 7's error edge activated) + expect(readyAfterWorkflow7).toContain(errorHandlerId) + }) + + it('should NOT mark target ready when all sources succeed (no errors)', () => { + // When neither source errors, the error handler should NOT run + + const workflow1Id = 'workflow-1' + const workflow7Id = 'workflow-7' + const errorHandlerId = 'error-handler' + + const workflow1Node = createMockNode(workflow1Id, [ + { target: errorHandlerId, sourceHandle: 'error' }, + ]) + + const workflow7Node = createMockNode(workflow7Id, [ + { target: errorHandlerId, sourceHandle: 'error' }, + ]) + + const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id]) + + const nodes = new Map([ + [workflow1Id, workflow1Node], + [workflow7Id, workflow7Node], + [errorHandlerId, errorHandlerNode], + ]) + + const dag = createMockDAG(nodes) + const edgeManager = new EdgeManager(dag) + + // Both workflows succeed - both error edges deactivate + const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, { + result: 'success', + }) + expect(readyAfterWorkflow1).not.toContain(errorHandlerId) + + const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, { + result: 'success', + }) + // Error handler should NOT be ready (no errors occurred) + expect(readyAfterWorkflow7).not.toContain(errorHandlerId) + }) + + it('should mark target ready when both sources error', () => { + // When both sources error, the error handler should run + + const workflow1Id = 'workflow-1' + const workflow7Id = 'workflow-7' + const errorHandlerId = 'error-handler' + + const workflow1Node = createMockNode(workflow1Id, [ + { target: errorHandlerId, sourceHandle: 'error' }, + ]) + + const workflow7Node = createMockNode(workflow7Id, [ + { target: errorHandlerId, sourceHandle: 'error' }, + ]) + + const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id]) + + const nodes = new Map([ + [workflow1Id, workflow1Node], + [workflow7Id, workflow7Node], + [errorHandlerId, errorHandlerNode], + ]) + + const dag = createMockDAG(nodes) + const edgeManager = new EdgeManager(dag) + + // Workflow 1 errors + const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, { + error: 'Error 1', + }) + expect(readyAfterWorkflow1).not.toContain(errorHandlerId) + + // Workflow 7 errors + const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, { + error: 'Error 2', + }) + // Error handler SHOULD be ready (both edges activated) + expect(readyAfterWorkflow7).toContain(errorHandlerId) + }) + }) + describe('Chained conditions', () => { it('should handle sequential conditions (condition1 → condition2)', () => { const condition1Id = 'condition-1' diff --git a/apps/sim/executor/execution/edge-manager.ts b/apps/sim/executor/execution/edge-manager.ts index 28416333c..f0ac33fa7 100644 --- a/apps/sim/executor/execution/edge-manager.ts +++ b/apps/sim/executor/execution/edge-manager.ts @@ -8,6 +8,7 @@ const logger = createLogger('EdgeManager') export class EdgeManager { private deactivatedEdges = new Set() + private nodesWithActivatedEdge = new Set() constructor(private dag: DAG) {} @@ -35,6 +36,11 @@ export class EdgeManager { activatedTargets.push(edge.target) } + // Track nodes that have received at least one activated edge + for (const targetId of activatedTargets) { + this.nodesWithActivatedEdge.add(targetId) + } + const cascadeTargets = new Set() for (const { target, handle } of edgesToDeactivate) { this.deactivateEdgeAndDescendants(node.id, target, handle, cascadeTargets) @@ -71,6 +77,18 @@ export class EdgeManager { } } + // Check if any deactivation targets that previously received an activated edge are now ready + for (const { target } of edgesToDeactivate) { + if ( + !readyNodes.includes(target) && + !activatedTargets.includes(target) && + this.nodesWithActivatedEdge.has(target) && + this.isTargetReady(target) + ) { + readyNodes.push(target) + } + } + return readyNodes } @@ -90,6 +108,7 @@ export class EdgeManager { clearDeactivatedEdges(): void { this.deactivatedEdges.clear() + this.nodesWithActivatedEdge.clear() } /** @@ -108,6 +127,10 @@ export class EdgeManager { for (const edgeKey of edgesToRemove) { this.deactivatedEdges.delete(edgeKey) } + // Also clear activated edge tracking for these nodes + for (const nodeId of nodeIds) { + this.nodesWithActivatedEdge.delete(nodeId) + } } private isTargetReady(targetId: string): boolean { @@ -210,7 +233,11 @@ export class EdgeManager { cascadeTargets?.add(targetId) } - if (this.hasActiveIncomingEdges(targetNode, edgeKey)) { + // Don't cascade if node has active incoming edges OR has received an activated edge + if ( + this.hasActiveIncomingEdges(targetNode, edgeKey) || + this.nodesWithActivatedEdge.has(targetId) + ) { return }