Mirror of https://github.com/simstudioai/sim.git (synced 2026-01-08 22:48:14 -05:00)
improvement(logs): fixed logs for parallel and loop execution flow (#2468)
* Fixed logs for parallel and loop execution flow
* Fixed array check for collection
* Fixed empty loop and parallel blocks and showed input on the dashboard
* Extracted utility functions
* Fixed the referencing errors and made sure they propagate to the console
* Fixed parallel
* Fixed tests

---------

Co-authored-by: priyanshu.solanki <priyanshu.solanki@saviynt.com>
Co-authored-by: Siddharth Ganesan <siddharthganesan@gmail.com>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Committed by: GitHub
Parent: b0748c82f9
Commit: c252e885af
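The core pattern this change introduces: subflow limits are validated up front with `validateMaxCount`, the failure is recorded as a block log via `addSubflowErrorLog` (so it reaches the console and dashboard), and only then is the error thrown. A minimal self-contained sketch of that flow, assuming nothing beyond what the diff below shows; the trimmed `BlockLog` shape and the `initializeForLoop` wrapper are illustrative, not the project's real API.

```typescript
// Sketch of the validate -> log -> throw pattern used by the loop and parallel
// orchestrators in this commit. The 1000-iteration cap and the error wording come
// from the diff; BlockLog is trimmed and initializeForLoop is a made-up wrapper.
interface BlockLog {
  blockId: string
  blockType: 'loop' | 'parallel'
  success: boolean
  error?: string
}

const MAX_LOOP_ITERATIONS = 1000

function validateMaxCount(count: number, max: number, itemType: string): string | undefined {
  if (count > max) {
    return `${itemType} (${count}) exceeds maximum allowed (${max}). Execution blocked.`
  }
  return undefined
}

function initializeForLoop(loopId: string, requestedIterations: number, blockLogs: BlockLog[]): number {
  const error = validateMaxCount(requestedIterations, MAX_LOOP_ITERATIONS, 'For loop iterations')
  if (error) {
    // Record the failure as a failed block log first so it still shows up in the logs UI...
    blockLogs.push({ blockId: loopId, blockType: 'loop', success: false, error })
    // ...then abort the execution.
    throw new Error(error)
  }
  return requestedIterations
}

// Usage: a loop configured with 5000 iterations now fails fast instead of running silently.
const logs: BlockLog[] = []
try {
  initializeForLoop('loop-1', 5000, logs)
} catch {
  console.log(logs[0]?.error) // "For loop iterations (5000) exceeds maximum allowed (1000). Execution blocked."
}
```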
@@ -128,6 +128,8 @@ export const DEFAULTS = {
  BLOCK_TITLE: 'Untitled Block',
  WORKFLOW_NAME: 'Workflow',
  MAX_LOOP_ITERATIONS: 1000,
  MAX_FOREACH_ITEMS: 1000,
  MAX_PARALLEL_BRANCHES: 20,
  MAX_WORKFLOW_DEPTH: 10,
  EXECUTION_TIME: 0,
  TOKENS: {

@@ -4,6 +4,7 @@ import { LoopConstructor } from '@/executor/dag/construction/loops'
import { NodeConstructor } from '@/executor/dag/construction/nodes'
import { PathConstructor } from '@/executor/dag/construction/paths'
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
import { buildSentinelStartId, extractBaseBlockId } from '@/executor/utils/subflow-utils'
import type {
  SerializedBlock,
  SerializedLoop,
@@ -79,6 +80,9 @@ export class DAGBuilder {
      }
    }

    // Validate loop and parallel structure
    this.validateSubflowStructure(dag)

    logger.info('DAG built', {
      totalNodes: dag.nodes.size,
      loopCount: dag.loopConfigs.size,
@@ -105,4 +109,43 @@ export class DAGBuilder {
      }
    }
  }

  /**
   * Validates that loops and parallels have proper internal structure.
   * Throws an error if a loop/parallel has no blocks inside or no connections from start.
   */
  private validateSubflowStructure(dag: DAG): void {
    for (const [id, config] of dag.loopConfigs) {
      this.validateSubflow(dag, id, config.nodes, 'Loop')
    }
    for (const [id, config] of dag.parallelConfigs) {
      this.validateSubflow(dag, id, config.nodes, 'Parallel')
    }
  }

  private validateSubflow(
    dag: DAG,
    id: string,
    nodes: string[] | undefined,
    type: 'Loop' | 'Parallel'
  ): void {
    if (!nodes || nodes.length === 0) {
      throw new Error(
        `${type} has no blocks inside. Add at least one block to the ${type.toLowerCase()}.`
      )
    }

    const sentinelStartNode = dag.nodes.get(buildSentinelStartId(id))
    if (!sentinelStartNode) return

    const hasConnections = Array.from(sentinelStartNode.outgoingEdges.values()).some((edge) =>
      nodes.includes(extractBaseBlockId(edge.target))
    )

    if (!hasConnections) {
      throw new Error(
        `${type} start is not connected to any blocks. Connect a block to the ${type.toLowerCase()} start.`
      )
    }
  }
}

@@ -63,8 +63,10 @@ export class DAGExecutor {

    const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
    const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
    loopOrchestrator.setContextExtensions(this.contextExtensions)
    const parallelOrchestrator = new ParallelOrchestrator(dag, state)
    parallelOrchestrator.setResolver(resolver)
    parallelOrchestrator.setContextExtensions(this.contextExtensions)
    const allHandlers = createBlockHandlers()
    const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
    const edgeManager = new EdgeManager(dag)

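The executor hunk above hands the same `ContextExtensions` object to both orchestrators. Only the `onBlockComplete` callback is visible in this diff, so the sketch below is an assumed shape; the real type lives in `@/executor/execution/types` and may carry more fields.

```typescript
// Assumed shape, derived from the onBlockComplete call in addSubflowErrorLog later in this diff.
// The real ContextExtensions type in '@/executor/execution/types' may include additional members.
interface ContextExtensionsLike {
  onBlockComplete?: (
    blockId: string,
    blockName: string,
    blockType: string,
    data: { input: unknown; output: unknown; executionTime: number }
  ) => void
}

// Wiring both orchestrators with the same extensions object is what lets a validation
// failure inside a loop or parallel surface as a failed block in the live console.
```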
@@ -14,6 +14,8 @@ export interface LoopScope {
  condition?: string
  loopType?: 'for' | 'forEach' | 'while' | 'doWhile'
  skipFirstConditionCheck?: boolean
  /** Error message if loop validation failed (e.g., exceeded max iterations) */
  validationError?: string
}

export interface ParallelScope {
@@ -23,6 +25,8 @@ export interface ParallelScope {
  completedCount: number
  totalExpectedNodes: number
  items?: any[]
  /** Error message if parallel validation failed (e.g., exceeded max branches) */
  validationError?: string
}

export class ExecutionState implements BlockStateController {

@@ -5,14 +5,17 @@ import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
import type { EdgeManager } from '@/executor/execution/edge-manager'
import type { LoopScope } from '@/executor/execution/state'
import type { BlockStateController } from '@/executor/execution/types'
import type { BlockStateController, ContextExtensions } from '@/executor/execution/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { LoopConfigWithNodes } from '@/executor/types/loop'
import { replaceValidReferences } from '@/executor/utils/reference-validation'
import {
  addSubflowErrorLog,
  buildSentinelEndId,
  buildSentinelStartId,
  extractBaseBlockId,
  resolveArrayInput,
  validateMaxCount,
} from '@/executor/utils/subflow-utils'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedLoop } from '@/serializer/types'
@@ -32,6 +35,7 @@ export interface LoopContinuationResult {

export class LoopOrchestrator {
  private edgeManager: EdgeManager | null = null
  private contextExtensions: ContextExtensions | null = null

  constructor(
    private dag: DAG,
@@ -39,6 +43,10 @@ export class LoopOrchestrator {
    private resolver: VariableResolver
  ) {}

  setContextExtensions(contextExtensions: ContextExtensions): void {
    this.contextExtensions = contextExtensions
  }

  setEdgeManager(edgeManager: EdgeManager): void {
    this.edgeManager = edgeManager
  }
@@ -48,7 +56,6 @@ export class LoopOrchestrator {
    if (!loopConfig) {
      throw new Error(`Loop config not found: ${loopId}`)
    }

    const scope: LoopScope = {
      iteration: 0,
      currentIterationOutputs: new Map(),
@@ -58,15 +65,70 @@ export class LoopOrchestrator {
    const loopType = loopConfig.loopType

    switch (loopType) {
      case 'for':
      case 'for': {
        scope.loopType = 'for'
        scope.maxIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
        const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS

        const iterationError = validateMaxCount(
          requestedIterations,
          DEFAULTS.MAX_LOOP_ITERATIONS,
          'For loop iterations'
        )
        if (iterationError) {
          logger.error(iterationError, { loopId, requestedIterations })
          this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
            iterations: requestedIterations,
          })
          scope.maxIterations = 0
          scope.validationError = iterationError
          scope.condition = buildLoopIndexCondition(0)
          ctx.loopExecutions?.set(loopId, scope)
          throw new Error(iterationError)
        }

        scope.maxIterations = requestedIterations
        scope.condition = buildLoopIndexCondition(scope.maxIterations)
        break
      }

      case 'forEach': {
        scope.loopType = 'forEach'
        const items = this.resolveForEachItems(ctx, loopConfig.forEachItems)
        let items: any[]
        try {
          items = this.resolveForEachItems(ctx, loopConfig.forEachItems)
        } catch (error) {
          const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}`
          logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
          this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
            forEachItems: loopConfig.forEachItems,
          })
          scope.items = []
          scope.maxIterations = 0
          scope.validationError = errorMessage
          scope.condition = buildLoopIndexCondition(0)
          ctx.loopExecutions?.set(loopId, scope)
          throw new Error(errorMessage)
        }

        const sizeError = validateMaxCount(
          items.length,
          DEFAULTS.MAX_FOREACH_ITEMS,
          'ForEach loop collection size'
        )
        if (sizeError) {
          logger.error(sizeError, { loopId, collectionSize: items.length })
          this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
            forEachItems: loopConfig.forEachItems,
            collectionSize: items.length,
          })
          scope.items = []
          scope.maxIterations = 0
          scope.validationError = sizeError
          scope.condition = buildLoopIndexCondition(0)
          ctx.loopExecutions?.set(loopId, scope)
          throw new Error(sizeError)
        }

        scope.items = items
        scope.maxIterations = items.length
        scope.item = items[0]
@@ -79,15 +141,35 @@ export class LoopOrchestrator {
        scope.condition = loopConfig.whileCondition
        break

      case 'doWhile':
      case 'doWhile': {
        scope.loopType = 'doWhile'
        if (loopConfig.doWhileCondition) {
          scope.condition = loopConfig.doWhileCondition
        } else {
          scope.maxIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
          const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS

          const iterationError = validateMaxCount(
            requestedIterations,
            DEFAULTS.MAX_LOOP_ITERATIONS,
            'Do-While loop iterations'
          )
          if (iterationError) {
            logger.error(iterationError, { loopId, requestedIterations })
            this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
              iterations: requestedIterations,
            })
            scope.maxIterations = 0
            scope.validationError = iterationError
            scope.condition = buildLoopIndexCondition(0)
            ctx.loopExecutions?.set(loopId, scope)
            throw new Error(iterationError)
          }

          scope.maxIterations = requestedIterations
          scope.condition = buildLoopIndexCondition(scope.maxIterations)
        }
        break
      }

      default:
        throw new Error(`Unknown loop type: ${loopType}`)
@@ -100,6 +182,23 @@ export class LoopOrchestrator {
    return scope
  }

  private addLoopErrorLog(
    ctx: ExecutionContext,
    loopId: string,
    loopType: string,
    errorMessage: string,
    inputData?: any
  ): void {
    addSubflowErrorLog(
      ctx,
      loopId,
      'loop',
      errorMessage,
      { loopType, ...inputData },
      this.contextExtensions
    )
  }

  storeLoopNodeOutput(
    ctx: ExecutionContext,
    loopId: string,
@@ -412,54 +511,6 @@ export class LoopOrchestrator {
  }

  private resolveForEachItems(ctx: ExecutionContext, items: any): any[] {
    if (Array.isArray(items)) {
      return items
    }

    if (typeof items === 'object' && items !== null) {
      return Object.entries(items)
    }

    if (typeof items === 'string') {
      if (items.startsWith('<') && items.endsWith('>')) {
        const resolved = this.resolver.resolveSingleReference(ctx, '', items)
        if (Array.isArray(resolved)) {
          return resolved
        }
        return []
      }

      try {
        const normalized = items.replace(/'/g, '"')
        const parsed = JSON.parse(normalized)
        if (Array.isArray(parsed)) {
          return parsed
        }
        return []
      } catch (error) {
        logger.error('Failed to parse forEach items', { items, error })
        return []
      }
    }

    try {
      const resolved = this.resolver.resolveInputs(ctx, 'loop_foreach_items', { items }).items

      if (Array.isArray(resolved)) {
        return resolved
      }

      logger.warn('ForEach items did not resolve to array', {
        items,
        resolved,
      })

      return []
    } catch (error: any) {
      logger.error('Error resolving forEach items, returning empty array:', {
        error: error.message,
      })
      return []
    }
    return resolveArrayInput(ctx, items, this.resolver)
  }
}

@@ -1,15 +1,19 @@
import { createLogger } from '@/lib/logs/console/logger'
import { DEFAULTS } from '@/executor/constants'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { ParallelScope } from '@/executor/execution/state'
import type { BlockStateWriter } from '@/executor/execution/types'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { ParallelConfigWithNodes } from '@/executor/types/parallel'
import {
  addSubflowErrorLog,
  buildBranchNodeId,
  calculateBranchCount,
  extractBaseBlockId,
  extractBranchIndex,
  parseDistributionItems,
  resolveArrayInput,
  validateMaxCount,
} from '@/executor/utils/subflow-utils'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedParallel } from '@/serializer/types'
@@ -32,6 +36,7 @@ export interface ParallelAggregationResult {

export class ParallelOrchestrator {
  private resolver: VariableResolver | null = null
  private contextExtensions: ContextExtensions | null = null

  constructor(
    private dag: DAG,
@@ -42,6 +47,10 @@ export class ParallelOrchestrator {
    this.resolver = resolver
  }

  setContextExtensions(contextExtensions: ContextExtensions): void {
    this.contextExtensions = contextExtensions
  }

  initializeParallelScope(
    ctx: ExecutionContext,
    parallelId: string,
@@ -49,11 +58,42 @@
    terminalNodesCount = 1
  ): ParallelScope {
    const parallelConfig = this.dag.parallelConfigs.get(parallelId)
    const items = parallelConfig ? this.resolveDistributionItems(ctx, parallelConfig) : undefined

    // If we have more items than pre-built branches, expand the DAG
    let items: any[] | undefined
    if (parallelConfig) {
      try {
        items = this.resolveDistributionItems(ctx, parallelConfig)
      } catch (error) {
        const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
        logger.error(errorMessage, {
          parallelId,
          distribution: parallelConfig.distribution,
        })
        this.addParallelErrorLog(ctx, parallelId, errorMessage, {
          distribution: parallelConfig.distribution,
        })
        this.setErrorScope(ctx, parallelId, errorMessage)
        throw new Error(errorMessage)
      }
    }

    const actualBranchCount = items && items.length > totalBranches ? items.length : totalBranches

    const branchError = validateMaxCount(
      actualBranchCount,
      DEFAULTS.MAX_PARALLEL_BRANCHES,
      'Parallel branch count'
    )
    if (branchError) {
      logger.error(branchError, { parallelId, actualBranchCount })
      this.addParallelErrorLog(ctx, parallelId, branchError, {
        distribution: parallelConfig?.distribution,
        branchCount: actualBranchCount,
      })
      this.setErrorScope(ctx, parallelId, branchError)
      throw new Error(branchError)
    }

    const scope: ParallelScope = {
      parallelId,
      totalBranches: actualBranchCount,
@@ -108,6 +148,38 @@
    return scope
  }

  private addParallelErrorLog(
    ctx: ExecutionContext,
    parallelId: string,
    errorMessage: string,
    inputData?: any
  ): void {
    addSubflowErrorLog(
      ctx,
      parallelId,
      'parallel',
      errorMessage,
      inputData || {},
      this.contextExtensions
    )
  }

  private setErrorScope(ctx: ExecutionContext, parallelId: string, errorMessage: string): void {
    const scope: ParallelScope = {
      parallelId,
      totalBranches: 0,
      branchOutputs: new Map(),
      completedCount: 0,
      totalExpectedNodes: 0,
      items: [],
      validationError: errorMessage,
    }
    if (!ctx.parallelExecutions) {
      ctx.parallelExecutions = new Map()
    }
    ctx.parallelExecutions.set(parallelId, scope)
  }

  /**
   * Dynamically expand the DAG to include additional branch nodes when
   * the resolved item count exceeds the pre-built branch count.
@@ -291,63 +363,19 @@
    }
  }

  /**
   * Resolve distribution items at runtime, handling references like <previousBlock.items>
   * This mirrors how LoopOrchestrator.resolveForEachItems works.
   */
  private resolveDistributionItems(ctx: ExecutionContext, config: SerializedParallel): any[] {
    const rawItems = config.distribution

    if (rawItems === undefined || rawItems === null) {
      if (config.parallelType === 'count') {
        return []
      }

    // Already an array - return as-is
    if (Array.isArray(rawItems)) {
      return rawItems
    if (
      config.distribution === undefined ||
      config.distribution === null ||
      config.distribution === ''
    ) {
      return []
    }

    // Object - convert to entries array (consistent with loop forEach behavior)
    if (typeof rawItems === 'object') {
      return Object.entries(rawItems)
    }

    // String handling
    if (typeof rawItems === 'string') {
      // Resolve references at runtime using the variable resolver
      if (rawItems.startsWith('<') && rawItems.endsWith('>') && this.resolver) {
        const resolved = this.resolver.resolveSingleReference(ctx, '', rawItems)
        if (Array.isArray(resolved)) {
          return resolved
        }
        if (typeof resolved === 'object' && resolved !== null) {
          return Object.entries(resolved)
        }
        logger.warn('Distribution reference did not resolve to array or object', {
          rawItems,
          resolved,
        })
        return []
      }

      // Try to parse as JSON
      try {
        const normalized = rawItems.replace(/'/g, '"')
        const parsed = JSON.parse(normalized)
        if (Array.isArray(parsed)) {
          return parsed
        }
        if (typeof parsed === 'object' && parsed !== null) {
          return Object.entries(parsed)
        }
        return []
      } catch (error) {
        logger.error('Failed to parse distribution items', { rawItems, error })
        return []
      }
    }

    return []
    return resolveArrayInput(ctx, config.distribution, this.resolver)
  }

  handleParallelBranchCompletion(

@@ -1,5 +1,8 @@
import { createLogger } from '@/lib/logs/console/logger'
import { LOOP, PARALLEL, PARSING, REFERENCE } from '@/executor/constants'
import type { ContextExtensions } from '@/executor/execution/types'
import type { BlockLog, ExecutionContext } from '@/executor/types'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedParallel } from '@/serializer/types'

const logger = createLogger('SubflowUtils')
@@ -132,3 +135,131 @@ export function normalizeNodeId(nodeId: string): string {
  }
  return nodeId
}

/**
 * Validates that a count doesn't exceed a maximum limit.
 * Returns an error message if validation fails, undefined otherwise.
 */
export function validateMaxCount(count: number, max: number, itemType: string): string | undefined {
  if (count > max) {
    return `${itemType} (${count}) exceeds maximum allowed (${max}). Execution blocked.`
  }
  return undefined
}

/**
 * Resolves array input at runtime. Handles arrays, objects, references, and JSON strings.
 * Used by both loop forEach and parallel distribution resolution.
 * Throws an error if resolution fails.
 */
export function resolveArrayInput(
  ctx: ExecutionContext,
  items: any,
  resolver: VariableResolver | null
): any[] {
  if (Array.isArray(items)) {
    return items
  }

  if (typeof items === 'object' && items !== null) {
    return Object.entries(items)
  }

  if (typeof items === 'string') {
    if (items.startsWith(REFERENCE.START) && items.endsWith(REFERENCE.END) && resolver) {
      try {
        const resolved = resolver.resolveSingleReference(ctx, '', items)
        if (Array.isArray(resolved)) {
          return resolved
        }
        if (typeof resolved === 'object' && resolved !== null) {
          return Object.entries(resolved)
        }
        throw new Error(`Reference "${items}" did not resolve to an array or object`)
      } catch (error) {
        if (error instanceof Error && error.message.startsWith('Reference "')) {
          throw error
        }
        throw new Error(
          `Failed to resolve reference "${items}": ${error instanceof Error ? error.message : String(error)}`
        )
      }
    }

    try {
      const normalized = items.replace(/'/g, '"')
      const parsed = JSON.parse(normalized)
      if (Array.isArray(parsed)) {
        return parsed
      }
      if (typeof parsed === 'object' && parsed !== null) {
        return Object.entries(parsed)
      }
      throw new Error(`Parsed value is not an array or object`)
    } catch (error) {
      if (error instanceof Error && error.message.startsWith('Parsed value')) {
        throw error
      }
      throw new Error(`Failed to parse items as JSON: "${items}"`)
    }
  }

  if (resolver) {
    try {
      const resolved = resolver.resolveInputs(ctx, 'subflow_items', { items }).items
      if (Array.isArray(resolved)) {
        return resolved
      }
      throw new Error(`Resolved items is not an array`)
    } catch (error) {
      if (error instanceof Error && error.message.startsWith('Resolved items')) {
        throw error
      }
      throw new Error(
        `Failed to resolve items: ${error instanceof Error ? error.message : String(error)}`
      )
    }
  }

  return []
}

/**
 * Creates and logs an error for a subflow (loop or parallel).
 */
export function addSubflowErrorLog(
  ctx: ExecutionContext,
  blockId: string,
  blockType: 'loop' | 'parallel',
  errorMessage: string,
  inputData: Record<string, any>,
  contextExtensions: ContextExtensions | null
): void {
  const now = new Date().toISOString()

  const block = ctx.workflow?.blocks?.find((b) => b.id === blockId)
  const blockName = block?.metadata?.name || (blockType === 'loop' ? 'Loop' : 'Parallel')

  const blockLog: BlockLog = {
    blockId,
    blockName,
    blockType,
    startedAt: now,
    endedAt: now,
    durationMs: 0,
    success: false,
    error: errorMessage,
    input: inputData,
    output: { error: errorMessage },
    ...(blockType === 'loop' ? { loopId: blockId } : { parallelId: blockId }),
  }
  ctx.blockLogs.push(blockLog)

  if (contextExtensions?.onBlockComplete) {
    contextExtensions.onBlockComplete(blockId, blockName, blockType, {
      input: inputData,
      output: { error: errorMessage },
      executionTime: 0,
    })
  }
}
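A quick illustration (not part of the PR) of how the extracted `resolveArrayInput` helper behaves, based on the implementation above. The empty `ExecutionContext` cast is a shortcut for the sketch, and a `null` resolver simply skips reference resolution.

```typescript
import type { ExecutionContext } from '@/executor/types'
import { resolveArrayInput } from '@/executor/utils/subflow-utils'

// ctx is only touched when a resolver is present, so an empty object is enough here.
const ctx = {} as ExecutionContext

resolveArrayInput(ctx, [1, 2, 3], null)      // -> [1, 2, 3]            (arrays pass through)
resolveArrayInput(ctx, { a: 1, b: 2 }, null) // -> [['a', 1], ['b', 2]] (objects become entries)
resolveArrayInput(ctx, "['x', 'y']", null)   // -> ['x', 'y']           (single quotes normalized, then JSON-parsed)
resolveArrayInput(ctx, 'not json', null)     // throws 'Failed to parse items as JSON: "not json"'
```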
@@ -471,8 +471,10 @@ function groupIterationBlocks(spans: TraceSpan[]): TraceSpan[] {
    }
  })

  // Include loop/parallel spans that have errors (e.g., validation errors that blocked execution)
  // These won't have iteration children, so they should appear directly in results
  const nonIterationContainerSpans = normalSpans.filter(
    (span) => span.type !== 'parallel' && span.type !== 'loop'
    (span) => (span.type !== 'parallel' && span.type !== 'loop') || span.status === 'error'
  )

  if (iterationSpans.length > 0) {

@@ -144,7 +144,7 @@ describe('workflow store', () => {
    expect(state.blocks.parallel1?.data?.count).toBe(5)

    expect(state.parallels.parallel1).toBeDefined()
    expect(state.parallels.parallel1.distribution).toBe('')
    expect(state.parallels.parallel1.distribution).toBeUndefined()
  })

  it.concurrent('should regenerate parallels when updateParallelCollection is called', () => {

@@ -103,7 +103,7 @@ export function convertParallelBlockToParallel(
      : 'collection'

  const distribution =
    validatedParallelType === 'collection' ? parallelBlock.data?.collection || '' : ''
    validatedParallelType === 'collection' ? parallelBlock.data?.collection || '' : undefined

  const count = parallelBlock.data?.count || 5

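The last two hunks encode the same behavioral change: a count-based parallel no longer stores an empty-string `distribution`, so downstream code can tell "no collection configured" apart from an empty collection string. The guard below mirrors the new check in `resolveDistributionItems`; the standalone function itself is illustrative only.

```typescript
// Mirrors the distribution check added in ParallelOrchestrator.resolveDistributionItems.
function hasCollectionDistribution(distribution: unknown): boolean {
  return distribution !== undefined && distribution !== null && distribution !== ''
}

hasCollectionDistribution(undefined)        // false: count-based parallel after this change
hasCollectionDistribution('<blockA.items>') // true: collection-based parallel with a reference
```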