fix(executor): fix. convergent error edges (#3015)

This commit is contained in:
Siddharth Ganesan
2026-01-26 17:25:09 -08:00
committed by GitHub
parent 9dbf56f9cd
commit cb650132c7
2 changed files with 198 additions and 1 deletions

View File

@@ -773,6 +773,176 @@ describe('EdgeManager', () => {
})
})
describe('Multiple error ports to same target', () => {
it('should mark target ready when one source errors and another succeeds', () => {
// This tests the case where a node has multiple incoming error edges
// from different sources. When one source errors (activating its error edge)
// and another source succeeds (deactivating its error edge), the target
// should become ready after both sources complete.
//
// Workflow 1 (errors) ─── error ───┐
// ├──→ Error Handler
// Workflow 7 (succeeds) ─ error ───┘
const workflow1Id = 'workflow-1'
const workflow7Id = 'workflow-7'
const errorHandlerId = 'error-handler'
const workflow1Node = createMockNode(workflow1Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])
const workflow7Node = createMockNode(workflow7Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])
const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id])
const nodes = new Map<string, DAGNode>([
[workflow1Id, workflow1Node],
[workflow7Id, workflow7Node],
[errorHandlerId, errorHandlerNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Workflow 1 errors first - error edge activates
const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, {
error: 'Something went wrong',
})
// Error handler should NOT be ready yet (waiting for workflow 7)
expect(readyAfterWorkflow1).not.toContain(errorHandlerId)
// Workflow 7 succeeds - error edge deactivates
const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, {
result: 'success',
})
// Error handler SHOULD be ready now (workflow 1's error edge activated)
expect(readyAfterWorkflow7).toContain(errorHandlerId)
})
it('should mark target ready when first source succeeds then second errors', () => {
// Opposite order: first source succeeds, then second errors
const workflow1Id = 'workflow-1'
const workflow7Id = 'workflow-7'
const errorHandlerId = 'error-handler'
const workflow1Node = createMockNode(workflow1Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])
const workflow7Node = createMockNode(workflow7Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])
const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id])
const nodes = new Map<string, DAGNode>([
[workflow1Id, workflow1Node],
[workflow7Id, workflow7Node],
[errorHandlerId, errorHandlerNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Workflow 1 succeeds first - error edge deactivates
const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, {
result: 'success',
})
// Error handler should NOT be ready yet (waiting for workflow 7)
expect(readyAfterWorkflow1).not.toContain(errorHandlerId)
// Workflow 7 errors - error edge activates
const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, {
error: 'Something went wrong',
})
// Error handler SHOULD be ready now (workflow 7's error edge activated)
expect(readyAfterWorkflow7).toContain(errorHandlerId)
})
it('should NOT mark target ready when all sources succeed (no errors)', () => {
// When neither source errors, the error handler should NOT run
const workflow1Id = 'workflow-1'
const workflow7Id = 'workflow-7'
const errorHandlerId = 'error-handler'
const workflow1Node = createMockNode(workflow1Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])
const workflow7Node = createMockNode(workflow7Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])
const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id])
const nodes = new Map<string, DAGNode>([
[workflow1Id, workflow1Node],
[workflow7Id, workflow7Node],
[errorHandlerId, errorHandlerNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Both workflows succeed - both error edges deactivate
const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, {
result: 'success',
})
expect(readyAfterWorkflow1).not.toContain(errorHandlerId)
const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, {
result: 'success',
})
// Error handler should NOT be ready (no errors occurred)
expect(readyAfterWorkflow7).not.toContain(errorHandlerId)
})
it('should mark target ready when both sources error', () => {
// When both sources error, the error handler should run
const workflow1Id = 'workflow-1'
const workflow7Id = 'workflow-7'
const errorHandlerId = 'error-handler'
const workflow1Node = createMockNode(workflow1Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])
const workflow7Node = createMockNode(workflow7Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])
const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id])
const nodes = new Map<string, DAGNode>([
[workflow1Id, workflow1Node],
[workflow7Id, workflow7Node],
[errorHandlerId, errorHandlerNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Workflow 1 errors
const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, {
error: 'Error 1',
})
expect(readyAfterWorkflow1).not.toContain(errorHandlerId)
// Workflow 7 errors
const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, {
error: 'Error 2',
})
// Error handler SHOULD be ready (both edges activated)
expect(readyAfterWorkflow7).toContain(errorHandlerId)
})
})
describe('Chained conditions', () => {
it('should handle sequential conditions (condition1 → condition2)', () => {
const condition1Id = 'condition-1'

View File

@@ -8,6 +8,7 @@ const logger = createLogger('EdgeManager')
export class EdgeManager {
private deactivatedEdges = new Set<string>()
private nodesWithActivatedEdge = new Set<string>()
constructor(private dag: DAG) {}
@@ -35,6 +36,11 @@ export class EdgeManager {
activatedTargets.push(edge.target)
}
// Track nodes that have received at least one activated edge
for (const targetId of activatedTargets) {
this.nodesWithActivatedEdge.add(targetId)
}
const cascadeTargets = new Set<string>()
for (const { target, handle } of edgesToDeactivate) {
this.deactivateEdgeAndDescendants(node.id, target, handle, cascadeTargets)
@@ -71,6 +77,18 @@ export class EdgeManager {
}
}
// Check if any deactivation targets that previously received an activated edge are now ready
for (const { target } of edgesToDeactivate) {
if (
!readyNodes.includes(target) &&
!activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target)
) {
readyNodes.push(target)
}
}
return readyNodes
}
@@ -90,6 +108,7 @@ export class EdgeManager {
clearDeactivatedEdges(): void {
this.deactivatedEdges.clear()
this.nodesWithActivatedEdge.clear()
}
/**
@@ -108,6 +127,10 @@ export class EdgeManager {
for (const edgeKey of edgesToRemove) {
this.deactivatedEdges.delete(edgeKey)
}
// Also clear activated edge tracking for these nodes
for (const nodeId of nodeIds) {
this.nodesWithActivatedEdge.delete(nodeId)
}
}
private isTargetReady(targetId: string): boolean {
@@ -210,7 +233,11 @@ export class EdgeManager {
cascadeTargets?.add(targetId)
}
if (this.hasActiveIncomingEdges(targetNode, edgeKey)) {
// Don't cascade if node has active incoming edges OR has received an activated edge
if (
this.hasActiveIncomingEdges(targetNode, edgeKey) ||
this.nodesWithActivatedEdge.has(targetId)
) {
return
}