Compare commits

..

1 Commits

Author SHA1 Message Date
Vikhyath Mondreti
a0fd7c8ddf improvement(inputs): sanitize trigger inputs better 2026-01-28 10:47:57 -08:00
9 changed files with 54 additions and 94 deletions

View File

@@ -207,7 +207,6 @@ export class EdgeConstructor {
for (const connection of workflow.connections) {
let { source, target } = connection
const originalSource = source
const originalTarget = target
let sourceHandle = this.generateSourceHandle(
source,
target,
@@ -258,12 +257,12 @@ export class EdgeConstructor {
target = sentinelStartId
}
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
continue
if (loopSentinelStartId) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}
if (loopSentinelStartId && !blocksInLoops.has(originalTarget)) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
continue
}
if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {

View File

@@ -4,6 +4,7 @@ import {
containsUserFileWithMetadata,
hydrateUserFilesWithBase64,
} from '@/lib/uploads/utils/user-file-base64.server'
import { sanitizeInputFormat, sanitizeTools } from '@/lib/workflows/comparison/normalize'
import {
BlockType,
buildResumeApiUrl,
@@ -34,6 +35,7 @@ import { validateBlockType } from '@/executor/utils/permission-check'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedBlock } from '@/serializer/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
import { SYSTEM_SUBBLOCK_IDS } from '@/triggers/constants'
const logger = createLogger('BlockExecutor')
@@ -87,7 +89,7 @@ export class BlockExecutor {
resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block)
if (blockLog) {
blockLog.input = this.parseJsonInputs(resolvedInputs)
blockLog.input = this.sanitizeInputsForLog(resolvedInputs)
}
} catch (error) {
cleanupSelfReference?.()
@@ -162,7 +164,7 @@ export class BlockExecutor {
ctx,
node,
block,
this.parseJsonInputs(resolvedInputs),
this.sanitizeInputsForLog(resolvedInputs),
displayOutput,
duration
)
@@ -241,7 +243,7 @@ export class BlockExecutor {
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
blockLog.input = this.parseJsonInputs(input)
blockLog.input = this.sanitizeInputsForLog(input)
blockLog.output = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
}
@@ -260,7 +262,7 @@ export class BlockExecutor {
ctx,
node,
block,
this.parseJsonInputs(input),
this.sanitizeInputsForLog(input),
displayOutput,
duration
)
@@ -352,29 +354,41 @@ export class BlockExecutor {
}
/**
* Parse JSON string inputs to objects for log display only.
* Attempts to parse any string that looks like JSON.
* Sanitizes inputs for log display.
* - Filters out system fields (UI-only, readonly, internal flags)
* - Removes UI state from inputFormat items (e.g., collapsed)
* - Parses JSON strings to objects for readability
* Returns a new object - does not mutate the original inputs.
*/
private parseJsonInputs(inputs: Record<string, any>): Record<string, any> {
let result = inputs
let hasChanges = false
private sanitizeInputsForLog(inputs: Record<string, any>): Record<string, any> {
const result: Record<string, any> = {}
for (const [key, value] of Object.entries(inputs)) {
// isJSONString is a quick heuristic (checks for { or [), not a validator.
// Invalid JSON is safely caught below - this just avoids JSON.parse on every string.
if (typeof value !== 'string' || !isJSONString(value)) {
if (SYSTEM_SUBBLOCK_IDS.includes(key) || key === 'triggerMode') {
continue
}
try {
if (!hasChanges) {
result = { ...inputs }
hasChanges = true
if (key === 'inputFormat' && Array.isArray(value)) {
result[key] = sanitizeInputFormat(value)
continue
}
if (key === 'tools' && Array.isArray(value)) {
result[key] = sanitizeTools(value)
continue
}
// isJSONString is a quick heuristic (checks for { or [), not a validator.
// Invalid JSON is safely caught below - this just avoids JSON.parse on every string.
if (typeof value === 'string' && isJSONString(value)) {
try {
result[key] = JSON.parse(value.trim())
} catch {
// Not valid JSON, keep original string
result[key] = value
}
result[key] = JSON.parse(value.trim())
} catch {
// Not valid JSON, keep original string
} else {
result[key] = value
}
}

View File

@@ -77,16 +77,15 @@ export class EdgeManager {
}
}
if (output.selectedRoute !== EDGE.LOOP_EXIT && output.selectedRoute !== EDGE.PARALLEL_EXIT) {
for (const { target } of edgesToDeactivate) {
if (
!readyNodes.includes(target) &&
!activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target)
) {
readyNodes.push(target)
}
// Check if any deactivation targets that previously received an activated edge are now ready
for (const { target } of edgesToDeactivate) {
if (
!readyNodes.includes(target) &&
!activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target)
) {
readyNodes.push(target)
}
}

View File

@@ -390,12 +390,6 @@ export class ExecutionEngine {
logger.info('Processing outgoing edges', {
nodeId,
outgoingEdgesCount: node.outgoingEdges.size,
outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({
id,
target: e.target,
sourceHandle: e.sourceHandle,
})),
output,
readyNodesCount: readyNodes.length,
readyNodes,
})

View File

@@ -27,8 +27,6 @@ export interface ParallelScope {
items?: any[]
/** Error message if parallel validation failed (e.g., exceeded max branches) */
validationError?: string
/** Whether the parallel has an empty distribution and should be skipped */
isEmpty?: boolean
}
export class ExecutionState implements BlockStateController {

View File

@@ -386,10 +386,10 @@ export class LoopOrchestrator {
return true
}
// forEach: skip if items array is empty
if (scope.loopType === 'forEach') {
if (!scope.items || scope.items.length === 0) {
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
logger.info('ForEach loop has empty items, skipping loop body', { loopId })
return false
}
return true
@@ -399,8 +399,6 @@ export class LoopOrchestrator {
if (scope.loopType === 'for') {
if (scope.maxIterations === 0) {
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
// Set empty output for the loop
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
return false
}
return true

View File

@@ -97,7 +97,7 @@ export class NodeExecutionOrchestrator {
if (loopId) {
const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId)
if (!shouldExecute) {
logger.info('Loop initial condition false, skipping loop body', { loopId })
logger.info('While loop initial condition false, skipping loop body', { loopId })
return {
sentinelStart: true,
shouldExit: true,
@@ -158,17 +158,6 @@ export class NodeExecutionOrchestrator {
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
}
}
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
if (scope?.isEmpty) {
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
return {
sentinelStart: true,
shouldExit: true,
selectedRoute: EDGE.PARALLEL_EXIT,
}
}
return { sentinelStart: true }
}

View File

@@ -61,13 +61,11 @@ export class ParallelOrchestrator {
let items: any[] | undefined
let branchCount: number
let isEmpty = false
try {
const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId)
const resolved = this.resolveBranchCount(ctx, parallelConfig)
branchCount = resolved.branchCount
items = resolved.items
isEmpty = resolved.isEmpty ?? false
} catch (error) {
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
@@ -93,34 +91,6 @@ export class ParallelOrchestrator {
throw new Error(branchError)
}
// Handle empty distribution - skip parallel body
if (isEmpty || branchCount === 0) {
const scope: ParallelScope = {
parallelId,
totalBranches: 0,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 0,
items: [],
isEmpty: true,
}
if (!ctx.parallelExecutions) {
ctx.parallelExecutions = new Map()
}
ctx.parallelExecutions.set(parallelId, scope)
// Set empty output for the parallel
this.state.setBlockOutput(parallelId, { results: [] })
logger.info('Parallel scope initialized with empty distribution, skipping body', {
parallelId,
branchCount: 0,
})
return scope
}
const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)
const scope: ParallelScope = {
@@ -157,17 +127,15 @@ export class ParallelOrchestrator {
private resolveBranchCount(
ctx: ExecutionContext,
config: SerializedParallel,
parallelId: string
): { branchCount: number; items?: any[]; isEmpty?: boolean } {
config: SerializedParallel
): { branchCount: number; items?: any[] } {
if (config.parallelType === 'count') {
return { branchCount: config.count ?? 1 }
}
const items = this.resolveDistributionItems(ctx, config)
if (items.length === 0) {
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
return { branchCount: 0, items: [], isEmpty: true }
return { branchCount: config.count ?? 1 }
}
return { branchCount: items.length, items }

View File

@@ -10,6 +10,7 @@ export const SYSTEM_SUBBLOCK_IDS: string[] = [
'webhookUrlDisplay', // Webhook URL display
'samplePayload', // Example payload display
'setupScript', // Setup script code (e.g., Apps Script)
'scheduleInfo', // Schedule status display (next run, last run)
]
/**