Compare commits

...

4 Commits

Author SHA1 Message Date
Vikhyath Mondreti
d3c623e02b only reachable subflow nodes should hit validation 2026-01-28 12:08:36 -08:00
Vikhyath Mondreti
e7e2135639 order of ops for validations 2026-01-28 12:03:06 -08:00
Siddharth Ganesan
0070433cca Cleanup 2026-01-28 11:31:10 -08:00
Siddharth Ganesan
44db6a120d Fix 2026-01-28 11:25:38 -08:00
7 changed files with 75 additions and 20 deletions

View File

@@ -207,6 +207,7 @@ export class EdgeConstructor {
for (const connection of workflow.connections) {
let { source, target } = connection
const originalSource = source
const originalTarget = target
let sourceHandle = this.generateSourceHandle(
source,
target,
@@ -257,14 +258,14 @@ export class EdgeConstructor {
target = sentinelStartId
}
if (loopSentinelStartId) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
continue
}
if (loopSentinelStartId && !blocksInLoops.has(originalTarget)) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}
if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {
continue
}

View File

@@ -77,15 +77,16 @@ export class EdgeManager {
}
}
// Check if any deactivation targets that previously received an activated edge are now ready
for (const { target } of edgesToDeactivate) {
if (
!readyNodes.includes(target) &&
!activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target)
) {
readyNodes.push(target)
if (output.selectedRoute !== EDGE.LOOP_EXIT && output.selectedRoute !== EDGE.PARALLEL_EXIT) {
for (const { target } of edgesToDeactivate) {
if (
!readyNodes.includes(target) &&
!activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target)
) {
readyNodes.push(target)
}
}
}

View File

@@ -390,6 +390,12 @@ export class ExecutionEngine {
logger.info('Processing outgoing edges', {
nodeId,
outgoingEdgesCount: node.outgoingEdges.size,
outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({
id,
target: e.target,
sourceHandle: e.sourceHandle,
})),
output,
readyNodesCount: readyNodes.length,
readyNodes,
})

View File

@@ -27,6 +27,8 @@ export interface ParallelScope {
items?: any[]
/** Error message if parallel validation failed (e.g., exceeded max branches) */
validationError?: string
/** Whether the parallel has an empty distribution and should be skipped */
isEmpty?: boolean
}
export class ExecutionState implements BlockStateController {

View File

@@ -386,10 +386,10 @@ export class LoopOrchestrator {
return true
}
// forEach: skip if items array is empty
if (scope.loopType === 'forEach') {
if (!scope.items || scope.items.length === 0) {
logger.info('ForEach loop has empty items, skipping loop body', { loopId })
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
return false
}
return true
@@ -399,6 +399,8 @@ export class LoopOrchestrator {
if (scope.loopType === 'for') {
if (scope.maxIterations === 0) {
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
// Set empty output for the loop
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
return false
}
return true

View File

@@ -97,7 +97,7 @@ export class NodeExecutionOrchestrator {
if (loopId) {
const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId)
if (!shouldExecute) {
logger.info('While loop initial condition false, skipping loop body', { loopId })
logger.info('Loop initial condition false, skipping loop body', { loopId })
return {
sentinelStart: true,
shouldExit: true,
@@ -158,6 +158,17 @@ export class NodeExecutionOrchestrator {
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
}
}
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
if (scope?.isEmpty) {
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
return {
sentinelStart: true,
shouldExit: true,
selectedRoute: EDGE.PARALLEL_EXIT,
}
}
return { sentinelStart: true }
}

View File

@@ -61,11 +61,13 @@ export class ParallelOrchestrator {
let items: any[] | undefined
let branchCount: number
let isEmpty = false
try {
const resolved = this.resolveBranchCount(ctx, parallelConfig)
const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId)
branchCount = resolved.branchCount
items = resolved.items
isEmpty = resolved.isEmpty ?? false
} catch (error) {
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
@@ -91,6 +93,34 @@ export class ParallelOrchestrator {
throw new Error(branchError)
}
// Handle empty distribution - skip parallel body
if (isEmpty || branchCount === 0) {
const scope: ParallelScope = {
parallelId,
totalBranches: 0,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 0,
items: [],
isEmpty: true,
}
if (!ctx.parallelExecutions) {
ctx.parallelExecutions = new Map()
}
ctx.parallelExecutions.set(parallelId, scope)
// Set empty output for the parallel
this.state.setBlockOutput(parallelId, { results: [] })
logger.info('Parallel scope initialized with empty distribution, skipping body', {
parallelId,
branchCount: 0,
})
return scope
}
const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)
const scope: ParallelScope = {
@@ -127,15 +157,17 @@ export class ParallelOrchestrator {
private resolveBranchCount(
ctx: ExecutionContext,
config: SerializedParallel
): { branchCount: number; items?: any[] } {
config: SerializedParallel,
parallelId: string
): { branchCount: number; items?: any[]; isEmpty?: boolean } {
if (config.parallelType === 'count') {
return { branchCount: config.count ?? 1 }
}
const items = this.resolveDistributionItems(ctx, config)
if (items.length === 0) {
return { branchCount: config.count ?? 1 }
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
return { branchCount: 0, items: [], isEmpty: true }
}
return { branchCount: items.length, items }