mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-28 16:27:55 -05:00
Compare commits
1 Commits
fix/loop-e
...
improvemen
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a0fd7c8ddf |
@@ -207,7 +207,6 @@ export class EdgeConstructor {
|
||||
for (const connection of workflow.connections) {
|
||||
let { source, target } = connection
|
||||
const originalSource = source
|
||||
const originalTarget = target
|
||||
let sourceHandle = this.generateSourceHandle(
|
||||
source,
|
||||
target,
|
||||
@@ -258,12 +257,12 @@ export class EdgeConstructor {
|
||||
target = sentinelStartId
|
||||
}
|
||||
|
||||
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
|
||||
continue
|
||||
if (loopSentinelStartId) {
|
||||
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
|
||||
}
|
||||
|
||||
if (loopSentinelStartId && !blocksInLoops.has(originalTarget)) {
|
||||
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
|
||||
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
|
||||
continue
|
||||
}
|
||||
|
||||
if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {
|
||||
|
||||
@@ -4,6 +4,7 @@ import {
|
||||
containsUserFileWithMetadata,
|
||||
hydrateUserFilesWithBase64,
|
||||
} from '@/lib/uploads/utils/user-file-base64.server'
|
||||
import { sanitizeInputFormat, sanitizeTools } from '@/lib/workflows/comparison/normalize'
|
||||
import {
|
||||
BlockType,
|
||||
buildResumeApiUrl,
|
||||
@@ -34,6 +35,7 @@ import { validateBlockType } from '@/executor/utils/permission-check'
|
||||
import type { VariableResolver } from '@/executor/variables/resolver'
|
||||
import type { SerializedBlock } from '@/serializer/types'
|
||||
import type { SubflowType } from '@/stores/workflows/workflow/types'
|
||||
import { SYSTEM_SUBBLOCK_IDS } from '@/triggers/constants'
|
||||
|
||||
const logger = createLogger('BlockExecutor')
|
||||
|
||||
@@ -87,7 +89,7 @@ export class BlockExecutor {
|
||||
resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block)
|
||||
|
||||
if (blockLog) {
|
||||
blockLog.input = this.parseJsonInputs(resolvedInputs)
|
||||
blockLog.input = this.sanitizeInputsForLog(resolvedInputs)
|
||||
}
|
||||
} catch (error) {
|
||||
cleanupSelfReference?.()
|
||||
@@ -162,7 +164,7 @@ export class BlockExecutor {
|
||||
ctx,
|
||||
node,
|
||||
block,
|
||||
this.parseJsonInputs(resolvedInputs),
|
||||
this.sanitizeInputsForLog(resolvedInputs),
|
||||
displayOutput,
|
||||
duration
|
||||
)
|
||||
@@ -241,7 +243,7 @@ export class BlockExecutor {
|
||||
blockLog.durationMs = duration
|
||||
blockLog.success = false
|
||||
blockLog.error = errorMessage
|
||||
blockLog.input = this.parseJsonInputs(input)
|
||||
blockLog.input = this.sanitizeInputsForLog(input)
|
||||
blockLog.output = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
|
||||
}
|
||||
|
||||
@@ -260,7 +262,7 @@ export class BlockExecutor {
|
||||
ctx,
|
||||
node,
|
||||
block,
|
||||
this.parseJsonInputs(input),
|
||||
this.sanitizeInputsForLog(input),
|
||||
displayOutput,
|
||||
duration
|
||||
)
|
||||
@@ -352,29 +354,41 @@ export class BlockExecutor {
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse JSON string inputs to objects for log display only.
|
||||
* Attempts to parse any string that looks like JSON.
|
||||
* Sanitizes inputs for log display.
|
||||
* - Filters out system fields (UI-only, readonly, internal flags)
|
||||
* - Removes UI state from inputFormat items (e.g., collapsed)
|
||||
* - Parses JSON strings to objects for readability
|
||||
* Returns a new object - does not mutate the original inputs.
|
||||
*/
|
||||
private parseJsonInputs(inputs: Record<string, any>): Record<string, any> {
|
||||
let result = inputs
|
||||
let hasChanges = false
|
||||
private sanitizeInputsForLog(inputs: Record<string, any>): Record<string, any> {
|
||||
const result: Record<string, any> = {}
|
||||
|
||||
for (const [key, value] of Object.entries(inputs)) {
|
||||
// isJSONString is a quick heuristic (checks for { or [), not a validator.
|
||||
// Invalid JSON is safely caught below - this just avoids JSON.parse on every string.
|
||||
if (typeof value !== 'string' || !isJSONString(value)) {
|
||||
if (SYSTEM_SUBBLOCK_IDS.includes(key) || key === 'triggerMode') {
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
if (!hasChanges) {
|
||||
result = { ...inputs }
|
||||
hasChanges = true
|
||||
if (key === 'inputFormat' && Array.isArray(value)) {
|
||||
result[key] = sanitizeInputFormat(value)
|
||||
continue
|
||||
}
|
||||
|
||||
if (key === 'tools' && Array.isArray(value)) {
|
||||
result[key] = sanitizeTools(value)
|
||||
continue
|
||||
}
|
||||
|
||||
// isJSONString is a quick heuristic (checks for { or [), not a validator.
|
||||
// Invalid JSON is safely caught below - this just avoids JSON.parse on every string.
|
||||
if (typeof value === 'string' && isJSONString(value)) {
|
||||
try {
|
||||
result[key] = JSON.parse(value.trim())
|
||||
} catch {
|
||||
// Not valid JSON, keep original string
|
||||
result[key] = value
|
||||
}
|
||||
result[key] = JSON.parse(value.trim())
|
||||
} catch {
|
||||
// Not valid JSON, keep original string
|
||||
} else {
|
||||
result[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -77,16 +77,15 @@ export class EdgeManager {
|
||||
}
|
||||
}
|
||||
|
||||
if (output.selectedRoute !== EDGE.LOOP_EXIT && output.selectedRoute !== EDGE.PARALLEL_EXIT) {
|
||||
for (const { target } of edgesToDeactivate) {
|
||||
if (
|
||||
!readyNodes.includes(target) &&
|
||||
!activatedTargets.includes(target) &&
|
||||
this.nodesWithActivatedEdge.has(target) &&
|
||||
this.isTargetReady(target)
|
||||
) {
|
||||
readyNodes.push(target)
|
||||
}
|
||||
// Check if any deactivation targets that previously received an activated edge are now ready
|
||||
for (const { target } of edgesToDeactivate) {
|
||||
if (
|
||||
!readyNodes.includes(target) &&
|
||||
!activatedTargets.includes(target) &&
|
||||
this.nodesWithActivatedEdge.has(target) &&
|
||||
this.isTargetReady(target)
|
||||
) {
|
||||
readyNodes.push(target)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -390,12 +390,6 @@ export class ExecutionEngine {
|
||||
logger.info('Processing outgoing edges', {
|
||||
nodeId,
|
||||
outgoingEdgesCount: node.outgoingEdges.size,
|
||||
outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({
|
||||
id,
|
||||
target: e.target,
|
||||
sourceHandle: e.sourceHandle,
|
||||
})),
|
||||
output,
|
||||
readyNodesCount: readyNodes.length,
|
||||
readyNodes,
|
||||
})
|
||||
|
||||
@@ -27,8 +27,6 @@ export interface ParallelScope {
|
||||
items?: any[]
|
||||
/** Error message if parallel validation failed (e.g., exceeded max branches) */
|
||||
validationError?: string
|
||||
/** Whether the parallel has an empty distribution and should be skipped */
|
||||
isEmpty?: boolean
|
||||
}
|
||||
|
||||
export class ExecutionState implements BlockStateController {
|
||||
|
||||
@@ -386,10 +386,10 @@ export class LoopOrchestrator {
|
||||
return true
|
||||
}
|
||||
|
||||
// forEach: skip if items array is empty
|
||||
if (scope.loopType === 'forEach') {
|
||||
if (!scope.items || scope.items.length === 0) {
|
||||
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
|
||||
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
|
||||
logger.info('ForEach loop has empty items, skipping loop body', { loopId })
|
||||
return false
|
||||
}
|
||||
return true
|
||||
@@ -399,8 +399,6 @@ export class LoopOrchestrator {
|
||||
if (scope.loopType === 'for') {
|
||||
if (scope.maxIterations === 0) {
|
||||
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
|
||||
// Set empty output for the loop
|
||||
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
||||
@@ -97,7 +97,7 @@ export class NodeExecutionOrchestrator {
|
||||
if (loopId) {
|
||||
const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId)
|
||||
if (!shouldExecute) {
|
||||
logger.info('Loop initial condition false, skipping loop body', { loopId })
|
||||
logger.info('While loop initial condition false, skipping loop body', { loopId })
|
||||
return {
|
||||
sentinelStart: true,
|
||||
shouldExit: true,
|
||||
@@ -158,17 +158,6 @@ export class NodeExecutionOrchestrator {
|
||||
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
|
||||
}
|
||||
}
|
||||
|
||||
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
|
||||
if (scope?.isEmpty) {
|
||||
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
|
||||
return {
|
||||
sentinelStart: true,
|
||||
shouldExit: true,
|
||||
selectedRoute: EDGE.PARALLEL_EXIT,
|
||||
}
|
||||
}
|
||||
|
||||
return { sentinelStart: true }
|
||||
}
|
||||
|
||||
|
||||
@@ -61,13 +61,11 @@ export class ParallelOrchestrator {
|
||||
|
||||
let items: any[] | undefined
|
||||
let branchCount: number
|
||||
let isEmpty = false
|
||||
|
||||
try {
|
||||
const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId)
|
||||
const resolved = this.resolveBranchCount(ctx, parallelConfig)
|
||||
branchCount = resolved.branchCount
|
||||
items = resolved.items
|
||||
isEmpty = resolved.isEmpty ?? false
|
||||
} catch (error) {
|
||||
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
|
||||
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
|
||||
@@ -93,34 +91,6 @@ export class ParallelOrchestrator {
|
||||
throw new Error(branchError)
|
||||
}
|
||||
|
||||
// Handle empty distribution - skip parallel body
|
||||
if (isEmpty || branchCount === 0) {
|
||||
const scope: ParallelScope = {
|
||||
parallelId,
|
||||
totalBranches: 0,
|
||||
branchOutputs: new Map(),
|
||||
completedCount: 0,
|
||||
totalExpectedNodes: 0,
|
||||
items: [],
|
||||
isEmpty: true,
|
||||
}
|
||||
|
||||
if (!ctx.parallelExecutions) {
|
||||
ctx.parallelExecutions = new Map()
|
||||
}
|
||||
ctx.parallelExecutions.set(parallelId, scope)
|
||||
|
||||
// Set empty output for the parallel
|
||||
this.state.setBlockOutput(parallelId, { results: [] })
|
||||
|
||||
logger.info('Parallel scope initialized with empty distribution, skipping body', {
|
||||
parallelId,
|
||||
branchCount: 0,
|
||||
})
|
||||
|
||||
return scope
|
||||
}
|
||||
|
||||
const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)
|
||||
|
||||
const scope: ParallelScope = {
|
||||
@@ -157,17 +127,15 @@ export class ParallelOrchestrator {
|
||||
|
||||
private resolveBranchCount(
|
||||
ctx: ExecutionContext,
|
||||
config: SerializedParallel,
|
||||
parallelId: string
|
||||
): { branchCount: number; items?: any[]; isEmpty?: boolean } {
|
||||
config: SerializedParallel
|
||||
): { branchCount: number; items?: any[] } {
|
||||
if (config.parallelType === 'count') {
|
||||
return { branchCount: config.count ?? 1 }
|
||||
}
|
||||
|
||||
const items = this.resolveDistributionItems(ctx, config)
|
||||
if (items.length === 0) {
|
||||
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
|
||||
return { branchCount: 0, items: [], isEmpty: true }
|
||||
return { branchCount: config.count ?? 1 }
|
||||
}
|
||||
|
||||
return { branchCount: items.length, items }
|
||||
|
||||
@@ -10,6 +10,7 @@ export const SYSTEM_SUBBLOCK_IDS: string[] = [
|
||||
'webhookUrlDisplay', // Webhook URL display
|
||||
'samplePayload', // Example payload display
|
||||
'setupScript', // Setup script code (e.g., Apps Script)
|
||||
'scheduleInfo', // Schedule status display (next run, last run)
|
||||
]
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user