Compare commits

...

4 Commits

Author SHA1 Message Date
Vikhyath Mondreti
d3c623e02b only reachable subflow nodes should hit validation 2026-01-28 12:08:36 -08:00
Vikhyath Mondreti
e7e2135639 order of ops for validations 2026-01-28 12:03:06 -08:00
Siddharth Ganesan
0070433cca Cleanup 2026-01-28 11:31:10 -08:00
Siddharth Ganesan
44db6a120d Fix 2026-01-28 11:25:38 -08:00
7 changed files with 75 additions and 20 deletions

View File

@@ -207,6 +207,7 @@ export class EdgeConstructor {
for (const connection of workflow.connections) { for (const connection of workflow.connections) {
let { source, target } = connection let { source, target } = connection
const originalSource = source const originalSource = source
const originalTarget = target
let sourceHandle = this.generateSourceHandle( let sourceHandle = this.generateSourceHandle(
source, source,
target, target,
@@ -257,14 +258,14 @@ export class EdgeConstructor {
target = sentinelStartId target = sentinelStartId
} }
if (loopSentinelStartId) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) { if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
continue continue
} }
if (loopSentinelStartId && !blocksInLoops.has(originalTarget)) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}
if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) { if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {
continue continue
} }

View File

@@ -77,15 +77,16 @@ export class EdgeManager {
} }
} }
// Check if any deactivation targets that previously received an activated edge are now ready if (output.selectedRoute !== EDGE.LOOP_EXIT && output.selectedRoute !== EDGE.PARALLEL_EXIT) {
for (const { target } of edgesToDeactivate) { for (const { target } of edgesToDeactivate) {
if ( if (
!readyNodes.includes(target) && !readyNodes.includes(target) &&
!activatedTargets.includes(target) && !activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) && this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target) this.isTargetReady(target)
) { ) {
readyNodes.push(target) readyNodes.push(target)
}
} }
} }

View File

@@ -390,6 +390,12 @@ export class ExecutionEngine {
logger.info('Processing outgoing edges', { logger.info('Processing outgoing edges', {
nodeId, nodeId,
outgoingEdgesCount: node.outgoingEdges.size, outgoingEdgesCount: node.outgoingEdges.size,
outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({
id,
target: e.target,
sourceHandle: e.sourceHandle,
})),
output,
readyNodesCount: readyNodes.length, readyNodesCount: readyNodes.length,
readyNodes, readyNodes,
}) })

View File

@@ -27,6 +27,8 @@ export interface ParallelScope {
items?: any[] items?: any[]
/** Error message if parallel validation failed (e.g., exceeded max branches) */ /** Error message if parallel validation failed (e.g., exceeded max branches) */
validationError?: string validationError?: string
/** Whether the parallel has an empty distribution and should be skipped */
isEmpty?: boolean
} }
export class ExecutionState implements BlockStateController { export class ExecutionState implements BlockStateController {

View File

@@ -386,10 +386,10 @@ export class LoopOrchestrator {
return true return true
} }
// forEach: skip if items array is empty
if (scope.loopType === 'forEach') { if (scope.loopType === 'forEach') {
if (!scope.items || scope.items.length === 0) { if (!scope.items || scope.items.length === 0) {
logger.info('ForEach loop has empty items, skipping loop body', { loopId }) logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
return false return false
} }
return true return true
@@ -399,6 +399,8 @@ export class LoopOrchestrator {
if (scope.loopType === 'for') { if (scope.loopType === 'for') {
if (scope.maxIterations === 0) { if (scope.maxIterations === 0) {
logger.info('For loop has 0 iterations, skipping loop body', { loopId }) logger.info('For loop has 0 iterations, skipping loop body', { loopId })
// Set empty output for the loop
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
return false return false
} }
return true return true

View File

@@ -97,7 +97,7 @@ export class NodeExecutionOrchestrator {
if (loopId) { if (loopId) {
const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId) const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId)
if (!shouldExecute) { if (!shouldExecute) {
logger.info('While loop initial condition false, skipping loop body', { loopId }) logger.info('Loop initial condition false, skipping loop body', { loopId })
return { return {
sentinelStart: true, sentinelStart: true,
shouldExit: true, shouldExit: true,
@@ -158,6 +158,17 @@ export class NodeExecutionOrchestrator {
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
} }
} }
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
if (scope?.isEmpty) {
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
return {
sentinelStart: true,
shouldExit: true,
selectedRoute: EDGE.PARALLEL_EXIT,
}
}
return { sentinelStart: true } return { sentinelStart: true }
} }

View File

@@ -61,11 +61,13 @@ export class ParallelOrchestrator {
let items: any[] | undefined let items: any[] | undefined
let branchCount: number let branchCount: number
let isEmpty = false
try { try {
const resolved = this.resolveBranchCount(ctx, parallelConfig) const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId)
branchCount = resolved.branchCount branchCount = resolved.branchCount
items = resolved.items items = resolved.items
isEmpty = resolved.isEmpty ?? false
} catch (error) { } catch (error) {
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}` const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution }) logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
@@ -91,6 +93,34 @@ export class ParallelOrchestrator {
throw new Error(branchError) throw new Error(branchError)
} }
// Handle empty distribution - skip parallel body
if (isEmpty || branchCount === 0) {
const scope: ParallelScope = {
parallelId,
totalBranches: 0,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 0,
items: [],
isEmpty: true,
}
if (!ctx.parallelExecutions) {
ctx.parallelExecutions = new Map()
}
ctx.parallelExecutions.set(parallelId, scope)
// Set empty output for the parallel
this.state.setBlockOutput(parallelId, { results: [] })
logger.info('Parallel scope initialized with empty distribution, skipping body', {
parallelId,
branchCount: 0,
})
return scope
}
const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items) const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)
const scope: ParallelScope = { const scope: ParallelScope = {
@@ -127,15 +157,17 @@ export class ParallelOrchestrator {
private resolveBranchCount( private resolveBranchCount(
ctx: ExecutionContext, ctx: ExecutionContext,
config: SerializedParallel config: SerializedParallel,
): { branchCount: number; items?: any[] } { parallelId: string
): { branchCount: number; items?: any[]; isEmpty?: boolean } {
if (config.parallelType === 'count') { if (config.parallelType === 'count') {
return { branchCount: config.count ?? 1 } return { branchCount: config.count ?? 1 }
} }
const items = this.resolveDistributionItems(ctx, config) const items = this.resolveDistributionItems(ctx, config)
if (items.length === 0) { if (items.length === 0) {
return { branchCount: config.count ?? 1 } logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
return { branchCount: 0, items: [], isEmpty: true }
} }
return { branchCount: items.length, items } return { branchCount: items.length, items }