mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-28 16:27:55 -05:00
Compare commits
4 Commits
main
...
fix/loop-e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3c623e02b | ||
|
|
e7e2135639 | ||
|
|
0070433cca | ||
|
|
44db6a120d |
@@ -207,6 +207,7 @@ export class EdgeConstructor {
|
|||||||
for (const connection of workflow.connections) {
|
for (const connection of workflow.connections) {
|
||||||
let { source, target } = connection
|
let { source, target } = connection
|
||||||
const originalSource = source
|
const originalSource = source
|
||||||
|
const originalTarget = target
|
||||||
let sourceHandle = this.generateSourceHandle(
|
let sourceHandle = this.generateSourceHandle(
|
||||||
source,
|
source,
|
||||||
target,
|
target,
|
||||||
@@ -257,14 +258,14 @@ export class EdgeConstructor {
|
|||||||
target = sentinelStartId
|
target = sentinelStartId
|
||||||
}
|
}
|
||||||
|
|
||||||
if (loopSentinelStartId) {
|
|
||||||
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
|
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (loopSentinelStartId && !blocksInLoops.has(originalTarget)) {
|
||||||
|
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
|
||||||
|
}
|
||||||
|
|
||||||
if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {
|
if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -77,15 +77,16 @@ export class EdgeManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if any deactivation targets that previously received an activated edge are now ready
|
if (output.selectedRoute !== EDGE.LOOP_EXIT && output.selectedRoute !== EDGE.PARALLEL_EXIT) {
|
||||||
for (const { target } of edgesToDeactivate) {
|
for (const { target } of edgesToDeactivate) {
|
||||||
if (
|
if (
|
||||||
!readyNodes.includes(target) &&
|
!readyNodes.includes(target) &&
|
||||||
!activatedTargets.includes(target) &&
|
!activatedTargets.includes(target) &&
|
||||||
this.nodesWithActivatedEdge.has(target) &&
|
this.nodesWithActivatedEdge.has(target) &&
|
||||||
this.isTargetReady(target)
|
this.isTargetReady(target)
|
||||||
) {
|
) {
|
||||||
readyNodes.push(target)
|
readyNodes.push(target)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -390,6 +390,12 @@ export class ExecutionEngine {
|
|||||||
logger.info('Processing outgoing edges', {
|
logger.info('Processing outgoing edges', {
|
||||||
nodeId,
|
nodeId,
|
||||||
outgoingEdgesCount: node.outgoingEdges.size,
|
outgoingEdgesCount: node.outgoingEdges.size,
|
||||||
|
outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({
|
||||||
|
id,
|
||||||
|
target: e.target,
|
||||||
|
sourceHandle: e.sourceHandle,
|
||||||
|
})),
|
||||||
|
output,
|
||||||
readyNodesCount: readyNodes.length,
|
readyNodesCount: readyNodes.length,
|
||||||
readyNodes,
|
readyNodes,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -27,6 +27,8 @@ export interface ParallelScope {
|
|||||||
items?: any[]
|
items?: any[]
|
||||||
/** Error message if parallel validation failed (e.g., exceeded max branches) */
|
/** Error message if parallel validation failed (e.g., exceeded max branches) */
|
||||||
validationError?: string
|
validationError?: string
|
||||||
|
/** Whether the parallel has an empty distribution and should be skipped */
|
||||||
|
isEmpty?: boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
export class ExecutionState implements BlockStateController {
|
export class ExecutionState implements BlockStateController {
|
||||||
|
|||||||
@@ -386,10 +386,10 @@ export class LoopOrchestrator {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// forEach: skip if items array is empty
|
|
||||||
if (scope.loopType === 'forEach') {
|
if (scope.loopType === 'forEach') {
|
||||||
if (!scope.items || scope.items.length === 0) {
|
if (!scope.items || scope.items.length === 0) {
|
||||||
logger.info('ForEach loop has empty items, skipping loop body', { loopId })
|
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
|
||||||
|
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
@@ -399,6 +399,8 @@ export class LoopOrchestrator {
|
|||||||
if (scope.loopType === 'for') {
|
if (scope.loopType === 'for') {
|
||||||
if (scope.maxIterations === 0) {
|
if (scope.maxIterations === 0) {
|
||||||
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
|
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
|
||||||
|
// Set empty output for the loop
|
||||||
|
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
|||||||
@@ -97,7 +97,7 @@ export class NodeExecutionOrchestrator {
|
|||||||
if (loopId) {
|
if (loopId) {
|
||||||
const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId)
|
const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId)
|
||||||
if (!shouldExecute) {
|
if (!shouldExecute) {
|
||||||
logger.info('While loop initial condition false, skipping loop body', { loopId })
|
logger.info('Loop initial condition false, skipping loop body', { loopId })
|
||||||
return {
|
return {
|
||||||
sentinelStart: true,
|
sentinelStart: true,
|
||||||
shouldExit: true,
|
shouldExit: true,
|
||||||
@@ -158,6 +158,17 @@ export class NodeExecutionOrchestrator {
|
|||||||
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
|
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
|
||||||
|
if (scope?.isEmpty) {
|
||||||
|
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
|
||||||
|
return {
|
||||||
|
sentinelStart: true,
|
||||||
|
shouldExit: true,
|
||||||
|
selectedRoute: EDGE.PARALLEL_EXIT,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return { sentinelStart: true }
|
return { sentinelStart: true }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -61,11 +61,13 @@ export class ParallelOrchestrator {
|
|||||||
|
|
||||||
let items: any[] | undefined
|
let items: any[] | undefined
|
||||||
let branchCount: number
|
let branchCount: number
|
||||||
|
let isEmpty = false
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const resolved = this.resolveBranchCount(ctx, parallelConfig)
|
const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId)
|
||||||
branchCount = resolved.branchCount
|
branchCount = resolved.branchCount
|
||||||
items = resolved.items
|
items = resolved.items
|
||||||
|
isEmpty = resolved.isEmpty ?? false
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
|
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
|
||||||
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
|
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
|
||||||
@@ -91,6 +93,34 @@ export class ParallelOrchestrator {
|
|||||||
throw new Error(branchError)
|
throw new Error(branchError)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle empty distribution - skip parallel body
|
||||||
|
if (isEmpty || branchCount === 0) {
|
||||||
|
const scope: ParallelScope = {
|
||||||
|
parallelId,
|
||||||
|
totalBranches: 0,
|
||||||
|
branchOutputs: new Map(),
|
||||||
|
completedCount: 0,
|
||||||
|
totalExpectedNodes: 0,
|
||||||
|
items: [],
|
||||||
|
isEmpty: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ctx.parallelExecutions) {
|
||||||
|
ctx.parallelExecutions = new Map()
|
||||||
|
}
|
||||||
|
ctx.parallelExecutions.set(parallelId, scope)
|
||||||
|
|
||||||
|
// Set empty output for the parallel
|
||||||
|
this.state.setBlockOutput(parallelId, { results: [] })
|
||||||
|
|
||||||
|
logger.info('Parallel scope initialized with empty distribution, skipping body', {
|
||||||
|
parallelId,
|
||||||
|
branchCount: 0,
|
||||||
|
})
|
||||||
|
|
||||||
|
return scope
|
||||||
|
}
|
||||||
|
|
||||||
const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)
|
const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)
|
||||||
|
|
||||||
const scope: ParallelScope = {
|
const scope: ParallelScope = {
|
||||||
@@ -127,15 +157,17 @@ export class ParallelOrchestrator {
|
|||||||
|
|
||||||
private resolveBranchCount(
|
private resolveBranchCount(
|
||||||
ctx: ExecutionContext,
|
ctx: ExecutionContext,
|
||||||
config: SerializedParallel
|
config: SerializedParallel,
|
||||||
): { branchCount: number; items?: any[] } {
|
parallelId: string
|
||||||
|
): { branchCount: number; items?: any[]; isEmpty?: boolean } {
|
||||||
if (config.parallelType === 'count') {
|
if (config.parallelType === 'count') {
|
||||||
return { branchCount: config.count ?? 1 }
|
return { branchCount: config.count ?? 1 }
|
||||||
}
|
}
|
||||||
|
|
||||||
const items = this.resolveDistributionItems(ctx, config)
|
const items = this.resolveDistributionItems(ctx, config)
|
||||||
if (items.length === 0) {
|
if (items.length === 0) {
|
||||||
return { branchCount: config.count ?? 1 }
|
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
|
||||||
|
return { branchCount: 0, items: [], isEmpty: true }
|
||||||
}
|
}
|
||||||
|
|
||||||
return { branchCount: items.length, items }
|
return { branchCount: items.length, items }
|
||||||
|
|||||||
Reference in New Issue
Block a user