mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-29 00:38:03 -05:00
order of ops for validations
This commit is contained in:
@@ -136,18 +136,18 @@ export class DAGBuilder {
|
||||
nodes: string[] | undefined,
|
||||
type: 'Loop' | 'Parallel'
|
||||
): void {
|
||||
const sentinelStartId =
|
||||
type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id)
|
||||
const sentinelStartNode = dag.nodes.get(sentinelStartId)
|
||||
|
||||
if (!sentinelStartNode) return
|
||||
|
||||
if (!nodes || nodes.length === 0) {
|
||||
throw new Error(
|
||||
`${type} has no blocks inside. Add at least one block to the ${type.toLowerCase()}.`
|
||||
)
|
||||
}
|
||||
|
||||
const sentinelStartId =
|
||||
type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id)
|
||||
const sentinelStartNode = dag.nodes.get(sentinelStartId)
|
||||
|
||||
if (!sentinelStartNode) return
|
||||
|
||||
const hasConnections = Array.from(sentinelStartNode.outgoingEdges.values()).some((edge) =>
|
||||
nodes.includes(extractBaseBlockId(edge.target))
|
||||
)
|
||||
|
||||
@@ -27,6 +27,8 @@ export interface ParallelScope {
|
||||
items?: any[]
|
||||
/** Error message if parallel validation failed (e.g., exceeded max branches) */
|
||||
validationError?: string
|
||||
/** Whether the parallel has an empty distribution and should be skipped */
|
||||
isEmpty?: boolean
|
||||
}
|
||||
|
||||
export class ExecutionState implements BlockStateController {
|
||||
|
||||
@@ -386,10 +386,10 @@ export class LoopOrchestrator {
|
||||
return true
|
||||
}
|
||||
|
||||
// forEach: skip if items array is empty
|
||||
if (scope.loopType === 'forEach') {
|
||||
if (!scope.items || scope.items.length === 0) {
|
||||
logger.info('ForEach loop has empty items, skipping loop body', { loopId })
|
||||
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
|
||||
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
@@ -399,6 +399,8 @@ export class LoopOrchestrator {
|
||||
if (scope.loopType === 'for') {
|
||||
if (scope.maxIterations === 0) {
|
||||
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
|
||||
// Set empty output for the loop
|
||||
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
||||
@@ -158,6 +158,17 @@ export class NodeExecutionOrchestrator {
|
||||
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
|
||||
}
|
||||
}
|
||||
|
||||
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
|
||||
if (scope?.isEmpty) {
|
||||
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
|
||||
return {
|
||||
sentinelStart: true,
|
||||
shouldExit: true,
|
||||
selectedRoute: EDGE.PARALLEL_EXIT,
|
||||
}
|
||||
}
|
||||
|
||||
return { sentinelStart: true }
|
||||
}
|
||||
|
||||
|
||||
@@ -61,11 +61,13 @@ export class ParallelOrchestrator {
|
||||
|
||||
let items: any[] | undefined
|
||||
let branchCount: number
|
||||
let isEmpty = false
|
||||
|
||||
try {
|
||||
const resolved = this.resolveBranchCount(ctx, parallelConfig)
|
||||
const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId)
|
||||
branchCount = resolved.branchCount
|
||||
items = resolved.items
|
||||
isEmpty = resolved.isEmpty ?? false
|
||||
} catch (error) {
|
||||
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
|
||||
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
|
||||
@@ -91,6 +93,34 @@ export class ParallelOrchestrator {
|
||||
throw new Error(branchError)
|
||||
}
|
||||
|
||||
// Handle empty distribution - skip parallel body
|
||||
if (isEmpty || branchCount === 0) {
|
||||
const scope: ParallelScope = {
|
||||
parallelId,
|
||||
totalBranches: 0,
|
||||
branchOutputs: new Map(),
|
||||
completedCount: 0,
|
||||
totalExpectedNodes: 0,
|
||||
items: [],
|
||||
isEmpty: true,
|
||||
}
|
||||
|
||||
if (!ctx.parallelExecutions) {
|
||||
ctx.parallelExecutions = new Map()
|
||||
}
|
||||
ctx.parallelExecutions.set(parallelId, scope)
|
||||
|
||||
// Set empty output for the parallel
|
||||
this.state.setBlockOutput(parallelId, { results: [] })
|
||||
|
||||
logger.info('Parallel scope initialized with empty distribution, skipping body', {
|
||||
parallelId,
|
||||
branchCount: 0,
|
||||
})
|
||||
|
||||
return scope
|
||||
}
|
||||
|
||||
const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)
|
||||
|
||||
const scope: ParallelScope = {
|
||||
@@ -127,15 +157,17 @@ export class ParallelOrchestrator {
|
||||
|
||||
private resolveBranchCount(
|
||||
ctx: ExecutionContext,
|
||||
config: SerializedParallel
|
||||
): { branchCount: number; items?: any[] } {
|
||||
config: SerializedParallel,
|
||||
parallelId: string
|
||||
): { branchCount: number; items?: any[]; isEmpty?: boolean } {
|
||||
if (config.parallelType === 'count') {
|
||||
return { branchCount: config.count ?? 1 }
|
||||
}
|
||||
|
||||
const items = this.resolveDistributionItems(ctx, config)
|
||||
if (items.length === 0) {
|
||||
return { branchCount: config.count ?? 1 }
|
||||
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
|
||||
return { branchCount: 0, items: [], isEmpty: true }
|
||||
}
|
||||
|
||||
return { branchCount: items.length, items }
|
||||
|
||||
Reference in New Issue
Block a user