fix(condition): execution with subflow sentinels follow-on, snapshot highlighting, duplicate terminal logs (#3429)

* fix(condition): consecutive error logging + execution dequeuing

* fix snapshot highlighting

* address minor gaps

* fix incomplete case

* remove activatedEdges path

* cleanup tests

* address greptile comments

* update tests:
This commit is contained in:
Vikhyath Mondreti
2026-03-05 17:03:02 -08:00
committed by GitHub
parent 28f8e0fd97
commit d640fa0852
11 changed files with 688 additions and 89 deletions

View File

@@ -20,7 +20,10 @@ import {
TriggerUtils,
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import { updateActiveBlockRefCount } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
import {
markOutgoingEdgesFromOutput,
updateActiveBlockRefCount,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
import { getBlock } from '@/blocks'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type {
@@ -63,7 +66,7 @@ interface DebugValidationResult {
interface BlockEventHandlerConfig {
workflowId?: string
executionIdRef: { current: string }
workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }>
workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }>
activeBlocksSet: Set<string>
activeBlockRefCounts: Map<string, number>
accumulatedBlockLogs: BlockLog[]
@@ -335,13 +338,9 @@ export function useWorkflowExecution() {
setActiveBlocks(workflowId, new Set(activeBlocksSet))
}
const markIncomingEdges = (blockId: string) => {
const markOutgoingEdges = (blockId: string, output: Record<string, any> | undefined) => {
if (!workflowId) return
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
incomingEdges.forEach((edge) => {
const status = edge.sourceHandle === 'error' ? 'error' : 'success'
setEdgeRunStatus(workflowId, edge.id, status)
})
markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus)
}
const isContainerBlockType = (blockType?: string) => {
@@ -460,7 +459,6 @@ export function useWorkflowExecution() {
const onBlockStarted = (data: BlockStartedData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, true)
markIncomingEdges(data.blockId)
if (!includeStartConsoleEntry || !workflowId) return
@@ -487,6 +485,7 @@ export function useWorkflowExecution() {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, false)
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
markOutgoingEdges(data.blockId, data.output as Record<string, any> | undefined)
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
@@ -505,7 +504,9 @@ export function useWorkflowExecution() {
}
if (isContainerBlockType(data.blockType) && !data.iterationContainerId) {
return
const output = data.output as Record<string, any> | undefined
const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0
if (!isEmptySubflow) return
}
accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))
@@ -527,6 +528,7 @@ export function useWorkflowExecution() {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, false)
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
markOutgoingEdges(data.blockId, { error: data.error })
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {

View File

@@ -29,6 +29,69 @@ export function updateActiveBlockRefCount(
}
}
/**
* Determines if a workflow edge should be marked as active based on its handle and the block output.
* Mirrors the executor's EdgeManager.shouldActivateEdge logic on the client side.
*/
function shouldActivateEdgeClient(
handle: string | null | undefined,
output: Record<string, any> | undefined
): boolean {
if (!handle) return true
if (handle.startsWith('condition-')) {
return output?.selectedOption === handle.substring('condition-'.length)
}
if (handle.startsWith('router-')) {
return output?.selectedRoute === handle.substring('router-'.length)
}
switch (handle) {
case 'error':
return !!output?.error
case 'source':
return !output?.error
case 'loop-start-source':
case 'loop-end-source':
case 'parallel-start-source':
case 'parallel-end-source':
case 'loop_exit':
case 'loop_continue':
case 'loop_continue_alt':
case 'parallel_exit':
return true
default:
return true
}
}
export function markOutgoingEdgesFromOutput(
blockId: string,
output: Record<string, any> | undefined,
workflowEdges: Array<{
id: string
source: string
target: string
sourceHandle?: string | null
}>,
workflowId: string,
setEdgeRunStatus: (wfId: string, edgeId: string, status: 'success' | 'error') => void
): void {
const outgoing = workflowEdges.filter((edge) => edge.source === blockId)
for (const edge of outgoing) {
const handle = edge.sourceHandle
if (!handle) {
setEdgeRunStatus(workflowId, edge.id, 'success')
continue
}
if (shouldActivateEdgeClient(handle, output)) {
const status = handle === 'error' ? 'error' : 'success'
setEdgeRunStatus(workflowId, edge.id, status)
}
}
}
export interface WorkflowExecutionOptions {
workflowInput?: any
onStream?: (se: StreamingExecution) => Promise<void>
@@ -135,13 +198,6 @@ export async function executeWorkflowWithFullLogging(
true
)
setActiveBlocks(wfId, new Set(activeBlocksSet))
const incomingEdges = workflowEdges.filter(
(edge) => edge.target === event.data.blockId
)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(wfId, edge.id, 'success')
})
break
}
@@ -155,6 +211,13 @@ export async function executeWorkflowWithFullLogging(
setActiveBlocks(wfId, new Set(activeBlocksSet))
setBlockRunStatus(wfId, event.data.blockId, 'success')
markOutgoingEdgesFromOutput(
event.data.blockId,
event.data.output,
workflowEdges,
wfId,
setEdgeRunStatus
)
addConsole({
input: event.data.input || {},
@@ -194,6 +257,13 @@ export async function executeWorkflowWithFullLogging(
setActiveBlocks(wfId, new Set(activeBlocksSet))
setBlockRunStatus(wfId, event.data.blockId, 'error')
markOutgoingEdgesFromOutput(
event.data.blockId,
{ error: event.data.error },
workflowEdges,
wfId,
setEdgeRunStatus
)
addConsole({
input: event.data.input || {},

View File

@@ -145,7 +145,7 @@ interface PreviewWorkflowProps {
/** Cursor style to show when hovering the canvas */
cursorStyle?: 'default' | 'pointer' | 'grab'
/** Map of executed block IDs to their status for highlighting the execution path */
executedBlocks?: Record<string, { status: string }>
executedBlocks?: Record<string, { status: string; output?: unknown }>
/** Currently selected block ID for highlighting */
selectedBlockId?: string | null
/** Skips expensive subblock computations for thumbnails/template previews */
@@ -274,9 +274,9 @@ export function PreviewWorkflow({
/** Maps base block IDs to execution data, handling parallel iteration variants (blockId₍n₎). */
const blockExecutionMap = useMemo(() => {
if (!executedBlocks) return new Map<string, { status: string }>()
if (!executedBlocks) return new Map<string, { status: string; output?: unknown }>()
const map = new Map<string, { status: string }>()
const map = new Map<string, { status: string; output?: unknown }>()
for (const [key, value] of Object.entries(executedBlocks)) {
// Extract base ID (remove iteration suffix like ₍0₎)
const baseId = key.includes('₍') ? key.split('₍')[0] : key
@@ -451,7 +451,6 @@ export function PreviewWorkflow({
const edges: Edge[] = useMemo(() => {
if (!isValidWorkflowState) return []
/** Edge is green if target executed and source condition met by edge type. */
const getEdgeExecutionStatus = (edge: {
source: string
target: string
@@ -463,17 +462,40 @@ export function PreviewWorkflow({
if (!targetStatus?.executed) return 'not-executed'
const sourceStatus = getBlockExecutionStatus(edge.source)
const { sourceHandle } = edge
if (!sourceStatus?.executed) return 'not-executed'
if (sourceHandle === 'error') {
return sourceStatus?.status === 'error' ? 'success' : 'not-executed'
const handle = edge.sourceHandle
if (!handle) {
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
}
if (sourceHandle === 'loop-start-source' || sourceHandle === 'parallel-start-source') {
return 'success'
const sourceOutput = blockExecutionMap.get(edge.source)?.output as
| Record<string, any>
| undefined
if (handle.startsWith('condition-')) {
const conditionValue = handle.substring('condition-'.length)
return sourceOutput?.selectedOption === conditionValue ? 'success' : 'not-executed'
}
return sourceStatus?.status === 'success' ? 'success' : 'not-executed'
if (handle.startsWith('router-')) {
const routeId = handle.substring('router-'.length)
return sourceOutput?.selectedRoute === routeId ? 'success' : 'not-executed'
}
switch (handle) {
case 'error':
return sourceStatus.status === 'error' ? 'error' : 'not-executed'
case 'source':
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
case 'loop-start-source':
case 'loop-end-source':
case 'parallel-start-source':
case 'parallel-end-source':
return 'success'
default:
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
}
}
return (workflowState.edges || []).map((edge) => {

View File

@@ -66,11 +66,15 @@ describe('EdgeManager', () => {
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
const readyAfterA = edgeManager.processOutgoingEdges(blockANode, { result: 'done' })
const readyAfterA = edgeManager.processOutgoingEdges(blockANode, {
result: 'done',
})
expect(readyAfterA).toContain(blockBId)
expect(readyAfterA).not.toContain(blockCId)
const readyAfterB = edgeManager.processOutgoingEdges(blockBNode, { result: 'done' })
const readyAfterB = edgeManager.processOutgoingEdges(blockBNode, {
result: 'done',
})
expect(readyAfterB).toContain(blockCId)
})
@@ -591,7 +595,9 @@ describe('EdgeManager', () => {
function1Node.incomingEdges.add(conditionId)
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'if' })
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: 'if',
})
expect(readyNodes).toContain(function1Id)
})
})
@@ -977,11 +983,15 @@ describe('EdgeManager', () => {
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
const ready1 = edgeManager.processOutgoingEdges(condition1Node, { selectedOption: 'if' })
const ready1 = edgeManager.processOutgoingEdges(condition1Node, {
selectedOption: 'if',
})
expect(ready1).toContain(condition2Id)
expect(ready1).not.toContain(target1Id)
const ready2 = edgeManager.processOutgoingEdges(condition2Node, { selectedOption: 'else' })
const ready2 = edgeManager.processOutgoingEdges(condition2Node, {
selectedOption: 'else',
})
expect(ready2).toContain(target1Id)
expect(ready2).not.toContain(target2Id)
})
@@ -1394,10 +1404,14 @@ describe('EdgeManager', () => {
const edgeManager = new EdgeManager(dag)
// Path: condition1(if) → condition2(else) → nodeC → sentinel_end
const ready1 = edgeManager.processOutgoingEdges(condition1Node, { selectedOption: 'if' })
const ready1 = edgeManager.processOutgoingEdges(condition1Node, {
selectedOption: 'if',
})
expect(ready1).toContain(condition2Id)
const ready2 = edgeManager.processOutgoingEdges(condition2Node, { selectedOption: 'else' })
const ready2 = edgeManager.processOutgoingEdges(condition2Node, {
selectedOption: 'else',
})
expect(ready2).toContain(nodeCId)
const ready3 = edgeManager.processOutgoingEdges(nodeCNode, {})
@@ -1448,7 +1462,9 @@ describe('EdgeManager', () => {
const edgeManager = new EdgeManager(dag)
// Test else path through diamond
const ready1 = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
const ready1 = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: 'else',
})
expect(ready1).toContain(nodeBId)
expect(ready1).not.toContain(nodeAId)
@@ -1509,7 +1525,9 @@ describe('EdgeManager', () => {
const edgeManager = new EdgeManager(dag)
// Select else - triggers deep cascade deactivation of if path
const ready1 = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
const ready1 = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: 'else',
})
expect(ready1).toContain(nodeDId)
const ready2 = edgeManager.processOutgoingEdges(nodeDNode, {})
@@ -1566,7 +1584,9 @@ describe('EdgeManager', () => {
const edgeManager = new EdgeManager(dag)
// Test middle branch (elseif2)
const ready1 = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'elseif2' })
const ready1 = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: 'elseif2',
})
expect(ready1).toContain(nodeCId)
expect(ready1).not.toContain(nodeAId)
expect(ready1).not.toContain(nodeBId)
@@ -1629,7 +1649,7 @@ describe('EdgeManager', () => {
// Scenario: Loop with Function 1 → Condition 1 → Function 2
// Condition has "if" branch → Function 2
// Condition has "else" branch → NO connection (dead end)
// When else is selected (selectedOption: null), the loop should continue
// When else is selected, the loop sentinel should still fire
//
// DAG structure:
// sentinel_start → func1 → condition → (if) → func2 → sentinel_end
@@ -1637,11 +1657,12 @@ describe('EdgeManager', () => {
// sentinel_end → (loop_continue) → sentinel_start
//
// When condition takes else with no edge:
// - selectedOption: null (no condition matches)
// - selectedOption is set (condition made a routing decision)
// - The "if" edge gets deactivated
// - func2 has no other active incoming edges, so edge to sentinel_end gets deactivated
// - sentinel_end has no active incoming edges and should become ready
// - sentinel_end is the enclosing loop's sentinel and should become ready
const loopId = 'loop-1'
const sentinelStartId = 'sentinel-start'
const sentinelEndId = 'sentinel-end'
const func1Id = 'func1'
@@ -1649,14 +1670,21 @@ describe('EdgeManager', () => {
const func2Id = 'func2'
const sentinelStartNode = createMockNode(sentinelStartId, [{ target: func1Id }])
sentinelStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId }
const func1Node = createMockNode(func1Id, [{ target: conditionId }], [sentinelStartId])
// Condition only has "if" branch, no "else" edge (dead end)
func1Node.metadata = { loopId, isLoopNode: true }
const conditionNode = createMockNode(
conditionId,
[{ target: func2Id, sourceHandle: 'condition-if' }],
[func1Id]
)
conditionNode.metadata = { loopId, isLoopNode: true }
const func2Node = createMockNode(func2Id, [{ target: sentinelEndId }], [conditionId])
func2Node.metadata = { loopId, isLoopNode: true }
const sentinelEndNode = createMockNode(
sentinelEndId,
[
@@ -1665,6 +1693,8 @@ describe('EdgeManager', () => {
],
[func2Id]
)
sentinelEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId }
const afterLoopNode = createMockNode('after-loop', [], [sentinelEndId])
const nodes = new Map<string, DAGNode>([
@@ -1679,22 +1709,17 @@ describe('EdgeManager', () => {
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Simulate execution: sentinel_start → func1 → condition
// Clear incoming edges as execution progresses (simulating normal flow)
func1Node.incomingEdges.clear()
conditionNode.incomingEdges.clear()
// Condition takes "else" but there's no else edge
// selectedOption: null means no condition branch matches
// Condition selects dead-end else (selectedOption is set — routing decision made)
// but it's inside the loop, so the enclosing sentinel should still fire
const ready = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: null,
conditionResult: false,
selectedOption: 'else-id',
conditionResult: true,
selectedPath: null,
})
// The "if" edge to func2 should be deactivated
// func2 has no other incoming edges, so its edge to sentinel_end gets deactivated
// sentinel_end has no active incoming edges and should be ready
expect(ready).toContain(sentinelEndId)
})
@@ -1763,11 +1788,12 @@ describe('EdgeManager', () => {
// → (else) → [nothing]
// → (else) → [nothing]
//
// When condition1 takes if, then condition2 takes else:
// When condition1 takes if, then condition2 takes else (dead-end):
// - condition2's "if" edge to func gets deactivated
// - func's edge to sentinel_end gets deactivated
// - sentinel_end should become ready
// - sentinel_end is the enclosing loop's sentinel and should become ready
const loopId = 'loop-1'
const sentinelStartId = 'sentinel-start'
const sentinelEndId = 'sentinel-end'
const condition1Id = 'condition1'
@@ -1775,22 +1801,31 @@ describe('EdgeManager', () => {
const funcId = 'func'
const sentinelStartNode = createMockNode(sentinelStartId, [{ target: condition1Id }])
sentinelStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId }
const condition1Node = createMockNode(
condition1Id,
[{ target: condition2Id, sourceHandle: 'condition-if' }],
[sentinelStartId]
)
condition1Node.metadata = { loopId, isLoopNode: true }
const condition2Node = createMockNode(
condition2Id,
[{ target: funcId, sourceHandle: 'condition-if' }],
[condition1Id]
)
condition2Node.metadata = { loopId, isLoopNode: true }
const funcNode = createMockNode(funcId, [{ target: sentinelEndId }], [condition2Id])
funcNode.metadata = { loopId, isLoopNode: true }
const sentinelEndNode = createMockNode(
sentinelEndId,
[{ target: sentinelStartId, sourceHandle: 'loop_continue' }],
[funcId]
)
sentinelEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId }
const nodes = new Map<string, DAGNode>([
[sentinelStartId, sentinelStartNode],
@@ -1803,22 +1838,95 @@ describe('EdgeManager', () => {
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Clear incoming edges as execution progresses
condition1Node.incomingEdges.clear()
// condition1 takes "if" - condition2 becomes ready
const ready1 = edgeManager.processOutgoingEdges(condition1Node, { selectedOption: 'if' })
const ready1 = edgeManager.processOutgoingEdges(condition1Node, {
selectedOption: 'if',
})
expect(ready1).toContain(condition2Id)
condition2Node.incomingEdges.clear()
// condition2 takes "else" (dead end)
const ready2 = edgeManager.processOutgoingEdges(condition2Node, { selectedOption: null })
// condition2 selects dead-end else (selectedOption set — routing decision made)
const ready2 = edgeManager.processOutgoingEdges(condition2Node, {
selectedOption: 'else-id',
})
// sentinel_end should be ready because all paths to it are deactivated
// sentinel_end is the enclosing loop's sentinel and should be ready
expect(ready2).toContain(sentinelEndId)
})
it('should not fire nested subflow sentinel when condition inside outer loop hits dead-end', () => {
// Scenario: outer loop contains condition → (if) → inner loop → sentinel_end
// → (else) → [dead end]
//
// When condition selects dead-end else:
// - The outer loop's sentinel should fire (enclosing subflow)
// - The inner loop's sentinel should NOT fire (downstream subflow)
const outerLoopId = 'outer-loop'
const innerLoopId = 'inner-loop'
const outerStartId = 'outer-start'
const outerEndId = 'outer-end'
const conditionId = 'condition'
const innerStartId = 'inner-start'
const innerBodyId = 'inner-body'
const innerEndId = 'inner-end'
const outerStartNode = createMockNode(outerStartId, [{ target: conditionId }])
outerStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: outerLoopId }
const conditionNode = createMockNode(
conditionId,
[{ target: innerStartId, sourceHandle: 'condition-if' }],
[outerStartId]
)
conditionNode.metadata = { loopId: outerLoopId, isLoopNode: true }
const innerStartNode = createMockNode(innerStartId, [{ target: innerBodyId }], [conditionId])
innerStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: innerLoopId }
const innerBodyNode = createMockNode(innerBodyId, [{ target: innerEndId }], [innerStartId])
innerBodyNode.metadata = { loopId: innerLoopId, isLoopNode: true }
const innerEndNode = createMockNode(
innerEndId,
[{ target: outerEndId, sourceHandle: 'loop_exit' }],
[innerBodyId]
)
innerEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: innerLoopId }
const outerEndNode = createMockNode(
outerEndId,
[{ target: outerStartId, sourceHandle: 'loop_continue' }],
[innerEndId]
)
outerEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: outerLoopId }
const nodes = new Map<string, DAGNode>([
[outerStartId, outerStartNode],
[conditionId, conditionNode],
[innerStartId, innerStartNode],
[innerBodyId, innerBodyNode],
[innerEndId, innerEndNode],
[outerEndId, outerEndNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
conditionNode.incomingEdges.clear()
const ready = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: 'else-id',
})
// Outer loop sentinel should fire (condition is inside outer loop)
expect(ready).toContain(outerEndId)
// Inner loop sentinel should NOT fire (it's a downstream subflow)
expect(ready).not.toContain(innerEndId)
})
it('should NOT execute intermediate nodes in long cascade chains (2+ hops)', () => {
// Regression test: When condition hits dead-end with 2+ intermediate nodes,
// only sentinel_end should be ready, NOT the intermediate nodes.
@@ -1922,7 +2030,9 @@ describe('EdgeManager', () => {
const edgeManager = new EdgeManager(dag)
// Select else path
const ready1 = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
const ready1 = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: 'else',
})
expect(ready1).toContain(nodeBId)
expect(ready1).not.toContain(nodeAId)
@@ -1968,7 +2078,9 @@ describe('EdgeManager', () => {
const edgeManager = new EdgeManager(dag)
// When selectedOption is null, the cascade deactivation makes parallel_end ready
const ready = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: null })
const ready = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: null,
})
expect(ready).toContain(parallelEndId)
})
@@ -2039,11 +2151,15 @@ describe('EdgeManager', () => {
const edgeManager = new EdgeManager(dag)
// Branch 1: condition1 selects else
const ready1 = edgeManager.processOutgoingEdges(condition1Node, { selectedOption: 'else' })
const ready1 = edgeManager.processOutgoingEdges(condition1Node, {
selectedOption: 'else',
})
expect(ready1).toContain(nodeBId)
// Branch 2: condition2 selects if
const ready2 = edgeManager.processOutgoingEdges(condition2Node, { selectedOption: 'if' })
const ready2 = edgeManager.processOutgoingEdges(condition2Node, {
selectedOption: 'if',
})
expect(ready2).toContain(nodeCId)
// Both complete
@@ -2200,7 +2316,9 @@ describe('EdgeManager', () => {
const edgeManager = new EdgeManager(dag)
// nodeA errors
const ready1 = edgeManager.processOutgoingEdges(nodeANode, { error: 'Something failed' })
const ready1 = edgeManager.processOutgoingEdges(nodeANode, {
error: 'Something failed',
})
expect(ready1).toContain(errorNodeId)
expect(ready1).not.toContain(successNodeId)
@@ -2289,7 +2407,9 @@ describe('EdgeManager', () => {
edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'if' })
edgeManager.processOutgoingEdges(nodeANode, {})
const ready2 = edgeManager.processOutgoingEdges(loopEndNode, { selectedRoute: 'loop_exit' })
const ready2 = edgeManager.processOutgoingEdges(loopEndNode, {
selectedRoute: 'loop_exit',
})
expect(ready2).toContain(parallelEndId)
const ready3 = edgeManager.processOutgoingEdges(parallelEndNode, {
@@ -2413,7 +2533,9 @@ describe('EdgeManager', () => {
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
const successReady = edgeManager.processOutgoingEdges(sourceNode, { result: 'ok' })
const successReady = edgeManager.processOutgoingEdges(sourceNode, {
result: 'ok',
})
expect(successReady).toContain(targetId)
})
})
@@ -2472,7 +2594,9 @@ describe('EdgeManager', () => {
const edgeManager = new EdgeManager(dag)
// Condition selects "else" branch, deactivating the "if" branch (which contains the loop)
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: 'else',
})
// Only otherBranch should be ready
expect(readyNodes).toContain(otherBranchId)
@@ -2539,7 +2663,9 @@ describe('EdgeManager', () => {
const edgeManager = new EdgeManager(dag)
// Condition selects "else" branch
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: 'else',
})
expect(readyNodes).toContain(otherBranchId)
expect(readyNodes).not.toContain(parallelStartId)
@@ -2626,6 +2752,171 @@ describe('EdgeManager', () => {
expect(readyNodes).not.toContain(afterLoopId)
})
it('should not queue sentinel-end when condition selects no-edge path (loop)', () => {
// Bug scenario: condition → (if) → sentinel_start → body → sentinel_end → (loop_exit) → after_loop
// → (else) → [NO outgoing edge]
// Condition evaluates false, else is selected but has no edge.
// With selectedOption set (routing decision made), cascadeTargets should NOT be queued.
// Previously sentinel_end was queued via cascadeTargets, causing downstream blocks to execute.
const conditionId = 'condition'
const sentinelStartId = 'sentinel-start'
const loopBodyId = 'loop-body'
const sentinelEndId = 'sentinel-end'
const afterLoopId = 'after-loop'
const conditionNode = createMockNode(conditionId, [
{ target: sentinelStartId, sourceHandle: 'condition-if-id' },
])
const sentinelStartNode = createMockNode(
sentinelStartId,
[{ target: loopBodyId }],
[conditionId]
)
const loopBodyNode = createMockNode(
loopBodyId,
[{ target: sentinelEndId }],
[sentinelStartId]
)
const sentinelEndNode = createMockNode(
sentinelEndId,
[
{ target: sentinelStartId, sourceHandle: 'loop_continue' },
{ target: afterLoopId, sourceHandle: 'loop_exit' },
],
[loopBodyId]
)
const afterLoopNode = createMockNode(afterLoopId, [], [sentinelEndId])
const nodes = new Map<string, DAGNode>([
[conditionId, conditionNode],
[sentinelStartId, sentinelStartNode],
[loopBodyId, loopBodyNode],
[sentinelEndId, sentinelEndNode],
[afterLoopId, afterLoopNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Condition selected else, but else has no outgoing edge.
// selectedOption is set (routing decision was made).
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: 'else-id',
})
// Nothing should be queued -- the entire branch is intentionally dead
expect(readyNodes).not.toContain(sentinelStartId)
expect(readyNodes).not.toContain(loopBodyId)
expect(readyNodes).not.toContain(sentinelEndId)
expect(readyNodes).not.toContain(afterLoopId)
expect(readyNodes).toHaveLength(0)
})
it('should not queue sentinel-end when condition selects no-edge path (parallel)', () => {
// Same scenario with parallel instead of loop
const conditionId = 'condition'
const parallelStartId = 'parallel-start'
const branchId = 'branch-0'
const parallelEndId = 'parallel-end'
const afterParallelId = 'after-parallel'
const conditionNode = createMockNode(conditionId, [
{ target: parallelStartId, sourceHandle: 'condition-if-id' },
])
const parallelStartNode = createMockNode(
parallelStartId,
[{ target: branchId }],
[conditionId]
)
const branchNode = createMockNode(
branchId,
[{ target: parallelEndId, sourceHandle: 'parallel_exit' }],
[parallelStartId]
)
const parallelEndNode = createMockNode(
parallelEndId,
[{ target: afterParallelId, sourceHandle: 'parallel_exit' }],
[branchId]
)
const afterParallelNode = createMockNode(afterParallelId, [], [parallelEndId])
const nodes = new Map<string, DAGNode>([
[conditionId, conditionNode],
[parallelStartId, parallelStartNode],
[branchId, branchNode],
[parallelEndId, parallelEndNode],
[afterParallelId, afterParallelNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: 'else-id',
})
expect(readyNodes).not.toContain(parallelStartId)
expect(readyNodes).not.toContain(branchId)
expect(readyNodes).not.toContain(parallelEndId)
expect(readyNodes).not.toContain(afterParallelId)
expect(readyNodes).toHaveLength(0)
})
it('should still queue sentinel-end inside loop when no condition matches (true dead-end)', () => {
// Contrast: condition INSIDE a loop with selectedOption null (no match, no routing decision).
// This is a true dead-end where cascadeTargets SHOULD fire so the loop sentinel can handle exit.
const sentinelStartId = 'sentinel-start'
const sentinelEndId = 'sentinel-end'
const conditionId = 'condition'
const nodeAId = 'node-a'
const sentinelStartNode = createMockNode(sentinelStartId, [{ target: conditionId }])
const conditionNode = createMockNode(
conditionId,
[{ target: nodeAId, sourceHandle: 'condition-if' }],
[sentinelStartId]
)
const nodeANode = createMockNode(nodeAId, [{ target: sentinelEndId }], [conditionId])
const sentinelEndNode = createMockNode(
sentinelEndId,
[
{ target: sentinelStartId, sourceHandle: 'loop_continue' },
{ target: 'after-loop', sourceHandle: 'loop_exit' },
],
[nodeAId]
)
const nodes = new Map<string, DAGNode>([
[sentinelStartId, sentinelStartNode],
[conditionId, conditionNode],
[nodeAId, nodeANode],
[sentinelEndId, sentinelEndNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
conditionNode.incomingEdges.clear()
// selectedOption: null → no routing decision, true dead-end
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
selectedOption: null,
})
// sentinel-end SHOULD be queued (true dead-end inside loop)
expect(readyNodes).toContain(sentinelEndId)
})
it('should still correctly handle normal loop exit (not deactivate when loop runs)', () => {
// When a loop actually executes and exits normally, after_loop should become ready
const sentinelStartId = 'sentinel-start'

View File

@@ -69,15 +69,23 @@ export class EdgeManager {
}
}
const isDeadEnd = activatedTargets.length === 0
const isRoutedDeadEnd = isDeadEnd && !!(output.selectedOption || output.selectedRoute)
for (const targetId of cascadeTargets) {
if (!readyNodes.includes(targetId) && !activatedTargets.includes(targetId)) {
// Only queue cascade terminal control nodes when ALL outgoing edges from the
// current node were deactivated (dead-end scenario). When some edges are
// activated, terminal control nodes on deactivated branches should NOT be
// queued - they will be reached through the normal activated path's completion.
// This prevents loop/parallel sentinels on fully deactivated paths (e.g., an
// upstream condition took a different branch) from being spuriously executed.
if (activatedTargets.length === 0 && this.isTargetReady(targetId)) {
if (!isDeadEnd || !this.isTargetReady(targetId)) continue
if (isRoutedDeadEnd) {
// A condition/router deliberately selected a dead-end path.
// Only queue the sentinel if it belongs to the SAME subflow as the
// current node (the condition is inside the loop/parallel and the
// loop still needs to continue/exit). Downstream subflow sentinels
// should NOT fire.
if (this.isEnclosingSentinel(node, targetId)) {
readyNodes.push(targetId)
}
} else {
readyNodes.push(targetId)
}
}
@@ -145,6 +153,27 @@ export class EdgeManager {
return targetNode ? this.isNodeReady(targetNode) : false
}
/**
* Checks if the cascade target sentinel belongs to the same subflow as the source node.
* A condition inside a loop that hits a dead-end should still allow the enclosing
* loop's sentinel to fire so the loop can continue or exit.
*/
private isEnclosingSentinel(sourceNode: DAGNode, sentinelId: string): boolean {
const sentinel = this.dag.nodes.get(sentinelId)
if (!sentinel?.metadata.isSentinel) return false
const sourceLoopId = sourceNode.metadata.loopId
const sourceParallelId = sourceNode.metadata.parallelId
const sentinelLoopId = sentinel.metadata.loopId
const sentinelParallelId = sentinel.metadata.parallelId
if (sourceLoopId && sentinelLoopId && sourceLoopId === sentinelLoopId) return true
if (sourceParallelId && sentinelParallelId && sourceParallelId === sentinelParallelId)
return true
return false
}
private isLoopEdge(handle?: string): boolean {
return (
handle === EDGE.LOOP_CONTINUE ||

View File

@@ -555,7 +555,7 @@ describe('ConditionBlockHandler', () => {
})
describe('Condition with no outgoing edge', () => {
it('should return null path when condition matches but has no edge', async () => {
it('should set selectedOption when condition matches but has no edge', async () => {
const conditions = [
{ id: 'cond1', title: 'if', value: 'true' },
{ id: 'else1', title: 'else', value: '' },
@@ -570,9 +570,52 @@ describe('ConditionBlockHandler', () => {
const result = await handler.execute(mockContext, mockBlock, inputs)
// Condition matches but no edge for it
expect((result as any).conditionResult).toBe(false)
expect((result as any).conditionResult).toBe(true)
expect((result as any).selectedPath).toBeNull()
expect((result as any).selectedOption).toBe('cond1')
expect(mockContext.decisions.condition.get(mockBlock.id)).toBe('cond1')
})
it('should set selectedOption when else is selected but has no edge', async () => {
const conditions = [
{ id: 'cond1', title: 'if', value: 'false' },
{ id: 'else1', title: 'else', value: '' },
]
const inputs = { conditions: JSON.stringify(conditions) }
// Only the if branch has an edge; else has no outgoing connection
mockContext.workflow!.connections = [
{ source: mockSourceBlock.id, target: mockBlock.id },
{ source: mockBlock.id, target: mockTargetBlock1.id, sourceHandle: 'condition-cond1' },
]
const result = await handler.execute(mockContext, mockBlock, inputs)
expect((result as any).conditionResult).toBe(true)
expect((result as any).selectedPath).toBeNull()
expect((result as any).selectedOption).toBe('else1')
expect(mockContext.decisions.condition.get(mockBlock.id)).toBe('else1')
})
it('should deactivate if-path when else is selected with no edge', async () => {
const conditions = [
{ id: 'cond1', title: 'if', value: 'context.value > 100' },
{ id: 'else1', title: 'else', value: '' },
]
const inputs = { conditions: JSON.stringify(conditions) }
// Only the if branch has an edge to a loop; else has nothing
mockContext.workflow!.connections = [
{ source: mockSourceBlock.id, target: mockBlock.id },
{ source: mockBlock.id, target: mockTargetBlock1.id, sourceHandle: 'condition-cond1' },
]
const result = await handler.execute(mockContext, mockBlock, inputs)
// Else was selected (value 10 is not > 100), so selectedOption should be 'else1'
// This allows the edge manager to deactivate the cond1 edge
expect((result as any).selectedOption).toBe('else1')
expect((result as any).conditionResult).toBe(true)
})
})
@@ -602,6 +645,67 @@ describe('ConditionBlockHandler', () => {
})
})
describe('Source output filtering', () => {
it('should not propagate error field from source block output', async () => {
;(mockContext.blockStates as any).set(mockSourceBlock.id, {
output: { value: 10, text: 'hello', error: 'upstream block failed' },
executed: true,
executionTime: 100,
})
const conditions = [
{ id: 'cond1', title: 'if', value: 'context.value > 5' },
{ id: 'else1', title: 'else', value: '' },
]
const inputs = { conditions: JSON.stringify(conditions) }
const result = await handler.execute(mockContext, mockBlock, inputs)
expect((result as any).conditionResult).toBe(true)
expect((result as any).selectedOption).toBe('cond1')
expect(result).not.toHaveProperty('error')
})
it('should not propagate _pauseMetadata from source block output', async () => {
;(mockContext.blockStates as any).set(mockSourceBlock.id, {
output: { value: 10, _pauseMetadata: { contextId: 'abc' } },
executed: true,
executionTime: 100,
})
const conditions = [
{ id: 'cond1', title: 'if', value: 'context.value > 5' },
{ id: 'else1', title: 'else', value: '' },
]
const inputs = { conditions: JSON.stringify(conditions) }
const result = await handler.execute(mockContext, mockBlock, inputs)
expect((result as any).conditionResult).toBe(true)
expect(result).not.toHaveProperty('_pauseMetadata')
})
it('should still pass through non-control fields from source output', async () => {
;(mockContext.blockStates as any).set(mockSourceBlock.id, {
output: { value: 10, text: 'hello', customData: { nested: true } },
executed: true,
executionTime: 100,
})
const conditions = [
{ id: 'cond1', title: 'if', value: 'context.value > 5' },
{ id: 'else1', title: 'else', value: '' },
]
const inputs = { conditions: JSON.stringify(conditions) }
const result = await handler.execute(mockContext, mockBlock, inputs)
expect((result as any).value).toBe(10)
expect((result as any).text).toBe('hello')
expect((result as any).customData).toEqual({ nested: true })
})
})
describe('Virtual block ID handling', () => {
it('should use currentVirtualBlockId for decision key when available', async () => {
mockContext.currentVirtualBlockId = 'virtual-block-123'

View File

@@ -108,9 +108,7 @@ export class ConditionBlockHandler implements BlockHandler {
const evalContext = this.buildEvaluationContext(ctx, sourceBlockId)
const rawSourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null
// Filter out _pauseMetadata from source output to prevent the engine from
// thinking this block is pausing (it was already resumed by the HITL block)
const sourceOutput = this.filterPauseMetadata(rawSourceOutput)
const sourceOutput = this.filterSourceOutput(rawSourceOutput)
const outgoingConnections = ctx.workflow?.connections.filter(
(conn) => conn.source === baseBlockId
@@ -124,7 +122,7 @@ export class ConditionBlockHandler implements BlockHandler {
block.id
)
if (!selectedConnection || !selectedCondition) {
if (!selectedCondition) {
return {
...((sourceOutput as any) || {}),
conditionResult: false,
@@ -133,6 +131,17 @@ export class ConditionBlockHandler implements BlockHandler {
}
}
if (!selectedConnection) {
const decisionKey = ctx.currentVirtualBlockId || block.id
ctx.decisions.condition.set(decisionKey, selectedCondition.id)
return {
...((sourceOutput as any) || {}),
conditionResult: true,
selectedPath: null,
selectedOption: selectedCondition.id,
}
}
const targetBlock = ctx.workflow?.blocks.find((b) => b.id === selectedConnection?.target)
if (!targetBlock) {
throw new Error(`Target block ${selectedConnection?.target} not found`)
@@ -153,11 +162,11 @@ export class ConditionBlockHandler implements BlockHandler {
}
}
private filterPauseMetadata(output: any): any {
private filterSourceOutput(output: any): any {
if (!output || typeof output !== 'object') {
return output
}
const { _pauseMetadata, ...rest } = output
const { _pauseMetadata, error, ...rest } = output
return rest
}
@@ -223,8 +232,7 @@ export class ConditionBlockHandler implements BlockHandler {
if (connection) {
return { selectedConnection: connection, selectedCondition: condition }
}
// Condition is true but has no outgoing edge - branch ends gracefully
return { selectedConnection: null, selectedCondition: null }
return { selectedConnection: null, selectedCondition: condition }
}
} catch (error: any) {
logger.error(`Failed to evaluate condition "${condition.title}": ${error.message}`)
@@ -238,7 +246,7 @@ export class ConditionBlockHandler implements BlockHandler {
if (elseConnection) {
return { selectedConnection: elseConnection, selectedCondition: elseCondition }
}
return { selectedConnection: null, selectedCondition: null }
return { selectedConnection: null, selectedCondition: elseCondition }
}
return { selectedConnection: null, selectedCondition: null }

View File

@@ -21,6 +21,7 @@ import {
buildParallelSentinelStartId,
buildSentinelEndId,
buildSentinelStartId,
emitEmptySubflowEvents,
extractBaseBlockId,
resolveArrayInput,
validateMaxCount,
@@ -596,6 +597,7 @@ export class LoopOrchestrator {
if (!scope.items || scope.items.length === 0) {
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}
return true
@@ -605,6 +607,7 @@ export class LoopOrchestrator {
if (scope.maxIterations === 0) {
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}
return true
@@ -617,6 +620,8 @@ export class LoopOrchestrator {
if (scope.loopType === 'while') {
if (!scope.condition) {
logger.warn('No condition defined for while loop', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}
@@ -627,6 +632,11 @@ export class LoopOrchestrator {
result,
})
if (!result) {
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
}
return result
}

View File

@@ -13,6 +13,7 @@ import { buildContainerIterationContext } from '@/executor/utils/iteration-conte
import { ParallelExpander } from '@/executor/utils/parallel-expansion'
import {
addSubflowErrorLog,
emitEmptySubflowEvents,
extractBranchIndex,
resolveArrayInput,
validateMaxCount,
@@ -108,6 +109,8 @@ export class ParallelOrchestrator {
this.state.setBlockOutput(parallelId, { results: [] })
emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions)
logger.info('Parallel scope initialized with empty distribution, skipping body', {
parallelId,
branchCount: 0,

View File

@@ -1,6 +1,7 @@
import { LOOP, PARALLEL, REFERENCE } from '@/executor/constants'
import { DEFAULTS, LOOP, PARALLEL, REFERENCE } from '@/executor/constants'
import type { ContextExtensions } from '@/executor/execution/types'
import { type BlockLog, type ExecutionContext, getNextExecutionOrder } from '@/executor/types'
import { buildContainerIterationContext } from '@/executor/utils/iteration-context'
import type { VariableResolver } from '@/executor/variables/resolver'
const BRANCH_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}\\d+${PARALLEL.BRANCH.SUFFIX}$`)
@@ -309,3 +310,54 @@ export function addSubflowErrorLog(
})
}
}
/**
* Emits block log + SSE events for a loop/parallel that was skipped due to an
* empty collection or false initial condition. This ensures the container block
* appears in terminal logs, execution snapshots, and edge highlighting.
*/
export function emitEmptySubflowEvents(
ctx: ExecutionContext,
blockId: string,
blockType: 'loop' | 'parallel',
contextExtensions: ContextExtensions | null
): void {
const now = new Date().toISOString()
const executionOrder = getNextExecutionOrder(ctx)
const output = { results: [] }
const block = ctx.workflow?.blocks.find((b) => b.id === blockId)
const blockName = block?.metadata?.name ?? blockType
const iterationContext = buildContainerIterationContext(ctx, blockId)
ctx.blockLogs.push({
blockId,
blockName,
blockType,
startedAt: now,
endedAt: now,
durationMs: DEFAULTS.EXECUTION_TIME,
success: true,
output,
executionOrder,
})
if (contextExtensions?.onBlockStart) {
contextExtensions.onBlockStart(blockId, blockName, blockType, executionOrder)
}
if (contextExtensions?.onBlockComplete) {
contextExtensions.onBlockComplete(
blockId,
blockName,
blockType,
{
output,
executionTime: DEFAULTS.EXECUTION_TIME,
startedAt: now,
executionOrder,
endedAt: now,
},
iterationContext
)
}
}

View File

@@ -761,9 +761,17 @@ function groupIterationBlocksRecursive(
}
}
// Non-iteration spans that aren't consumed container sentinels go straight to result
const containerIdsWithIterations = new Set<string>()
for (const span of iterationSpans) {
const outermost = getOutermostContainer(span)
if (outermost) containerIdsWithIterations.add(outermost.containerId)
}
const nonContainerSpans = nonIterationSpans.filter(
(span) => (span.type !== 'parallel' && span.type !== 'loop') || span.status === 'error'
(span) =>
(span.type !== 'parallel' && span.type !== 'loop') ||
span.status === 'error' ||
(span.blockId && !containerIdsWithIterations.has(span.blockId))
)
if (iterationSpans.length === 0) {