From e7e2135639ca0963e03b357f37d231bba6c83e54 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 28 Jan 2026 12:03:06 -0800 Subject: [PATCH] order of ops for validations --- apps/sim/executor/dag/builder.ts | 12 +++---- apps/sim/executor/execution/state.ts | 2 ++ apps/sim/executor/orchestrators/loop.ts | 6 ++-- apps/sim/executor/orchestrators/node.ts | 11 ++++++ apps/sim/executor/orchestrators/parallel.ts | 40 ++++++++++++++++++--- 5 files changed, 59 insertions(+), 12 deletions(-) diff --git a/apps/sim/executor/dag/builder.ts b/apps/sim/executor/dag/builder.ts index 5465df634..a0c612d17 100644 --- a/apps/sim/executor/dag/builder.ts +++ b/apps/sim/executor/dag/builder.ts @@ -136,18 +136,18 @@ export class DAGBuilder { nodes: string[] | undefined, type: 'Loop' | 'Parallel' ): void { - const sentinelStartId = - type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id) - const sentinelStartNode = dag.nodes.get(sentinelStartId) - - if (!sentinelStartNode) return - if (!nodes || nodes.length === 0) { throw new Error( `${type} has no blocks inside. Add at least one block to the ${type.toLowerCase()}.` ) } + const sentinelStartId = + type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id) + const sentinelStartNode = dag.nodes.get(sentinelStartId) + + if (!sentinelStartNode) return + const hasConnections = Array.from(sentinelStartNode.outgoingEdges.values()).some((edge) => nodes.includes(extractBaseBlockId(edge.target)) ) diff --git a/apps/sim/executor/execution/state.ts b/apps/sim/executor/execution/state.ts index 7cf849c9e..bbbc7bc42 100644 --- a/apps/sim/executor/execution/state.ts +++ b/apps/sim/executor/execution/state.ts @@ -27,6 +27,8 @@ export interface ParallelScope { items?: any[] /** Error message if parallel validation failed (e.g., exceeded max branches) */ validationError?: string + /** Whether the parallel has an empty distribution and should be skipped */ + isEmpty?: boolean } export class ExecutionState implements BlockStateController { diff --git a/apps/sim/executor/orchestrators/loop.ts b/apps/sim/executor/orchestrators/loop.ts index b9a5bd335..f0757e642 100644 --- a/apps/sim/executor/orchestrators/loop.ts +++ b/apps/sim/executor/orchestrators/loop.ts @@ -386,10 +386,10 @@ export class LoopOrchestrator { return true } - // forEach: skip if items array is empty if (scope.loopType === 'forEach') { if (!scope.items || scope.items.length === 0) { - logger.info('ForEach loop has empty items, skipping loop body', { loopId }) + logger.info('ForEach loop has empty collection, skipping loop body', { loopId }) + this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) return false } return true @@ -399,6 +399,8 @@ export class LoopOrchestrator { if (scope.loopType === 'for') { if (scope.maxIterations === 0) { logger.info('For loop has 0 iterations, skipping loop body', { loopId }) + // Set empty output for the loop + this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) return false } return true diff --git a/apps/sim/executor/orchestrators/node.ts b/apps/sim/executor/orchestrators/node.ts index 6ac9661ce..7ec669bd3 100644 --- a/apps/sim/executor/orchestrators/node.ts +++ b/apps/sim/executor/orchestrators/node.ts @@ -158,6 +158,17 @@ export class NodeExecutionOrchestrator { this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } } + + const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId) + if (scope?.isEmpty) { + logger.info('Parallel has empty distribution, skipping parallel body', { parallelId }) + return { + sentinelStart: true, + shouldExit: true, + selectedRoute: EDGE.PARALLEL_EXIT, + } + } + return { sentinelStart: true } } diff --git a/apps/sim/executor/orchestrators/parallel.ts b/apps/sim/executor/orchestrators/parallel.ts index ef17d624a..12ae70f72 100644 --- a/apps/sim/executor/orchestrators/parallel.ts +++ b/apps/sim/executor/orchestrators/parallel.ts @@ -61,11 +61,13 @@ export class ParallelOrchestrator { let items: any[] | undefined let branchCount: number + let isEmpty = false try { - const resolved = this.resolveBranchCount(ctx, parallelConfig) + const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId) branchCount = resolved.branchCount items = resolved.items + isEmpty = resolved.isEmpty ?? false } catch (error) { const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}` logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution }) @@ -91,6 +93,34 @@ export class ParallelOrchestrator { throw new Error(branchError) } + // Handle empty distribution - skip parallel body + if (isEmpty || branchCount === 0) { + const scope: ParallelScope = { + parallelId, + totalBranches: 0, + branchOutputs: new Map(), + completedCount: 0, + totalExpectedNodes: 0, + items: [], + isEmpty: true, + } + + if (!ctx.parallelExecutions) { + ctx.parallelExecutions = new Map() + } + ctx.parallelExecutions.set(parallelId, scope) + + // Set empty output for the parallel + this.state.setBlockOutput(parallelId, { results: [] }) + + logger.info('Parallel scope initialized with empty distribution, skipping body', { + parallelId, + branchCount: 0, + }) + + return scope + } + const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items) const scope: ParallelScope = { @@ -127,15 +157,17 @@ export class ParallelOrchestrator { private resolveBranchCount( ctx: ExecutionContext, - config: SerializedParallel - ): { branchCount: number; items?: any[] } { + config: SerializedParallel, + parallelId: string + ): { branchCount: number; items?: any[]; isEmpty?: boolean } { if (config.parallelType === 'count') { return { branchCount: config.count ?? 1 } } const items = this.resolveDistributionItems(ctx, config) if (items.length === 0) { - return { branchCount: config.count ?? 1 } + logger.info('Parallel has empty distribution, skipping parallel body', { parallelId }) + return { branchCount: 0, items: [], isEmpty: true } } return { branchCount: items.length, items }