Compare commits

..

13 Commits

Author SHA1 Message Date
Vikhyath Mondreti
e09e525ea2 consistent target 2026-01-22 18:28:33 -08:00
Vikhyath Mondreti
ae113f76fd fix cleanup 2026-01-22 18:26:25 -08:00
Vikhyath Mondreti
a160c8e145 simplify namespace filtering logic 2026-01-22 18:13:53 -08:00
Vikhyath Mondreti
03bb2d2763 delete needs to account for namespace 2026-01-22 18:08:55 -08:00
Vikhyath Mondreti
e0d301aca7 fix(idempotency): add conflict target to atomicallyClaimDb query 2026-01-22 18:04:23 -08:00
Vikhyath Mondreti
aa99db6fdd fix(subflows): tag dropdown + resolution logic (#2949)
* fix(subflows): tag dropdown + resolution logic

* fixes;

* revert parallel change
2026-01-22 17:57:55 -08:00
Waleed
748793e07d fix(executor): handle condition dead-end branches in loops (#2944) 2026-01-22 13:30:11 -08:00
Siddharth Ganesan
91da7e183a fix(copilot): always allow, credential masking (#2947)
* Fix always allow, credential validation

* Credential masking

* Autoload
2026-01-22 13:07:16 -08:00
Waleed
ab09a5ad23 feat(router): expose reasoning output in router v2 block (#2945) 2026-01-22 12:43:57 -08:00
Vikhyath Mondreti
fcd0240db6 fix(resolver): consolidate reference resolution (#2941)
* fix(resolver): consolidate code to resolve references

* fix edge cases

* use already formatted error

* fix multi index

* fix backwards compat reachability

* handle backwards compatibility accurately

* use shared constant correctly
2026-01-22 12:38:50 -08:00
Waleed
4e4149792a fix(gmail): expose messageId field in read email block (#2943) 2026-01-22 11:46:34 -08:00
Waleed
9a8b591257 improvement(helm): add per-deployment extraVolumes support (#2942) 2026-01-22 11:35:23 -08:00
Waleed
f3ae3f8442 fix(executor): stop parallel execution when block errors (#2940) 2026-01-22 11:34:40 -08:00
48 changed files with 13538 additions and 560 deletions

View File

@@ -313,7 +313,7 @@ describe('Function Execute API Route', () => {
'block-2': 'world',
},
blockNameMapping: {
validVar: 'block-1',
validvar: 'block-1',
another_valid: 'block-2',
},
})
@@ -539,7 +539,7 @@ describe('Function Execute API Route', () => {
'block-complex': complexData,
},
blockNameMapping: {
complexData: 'block-complex',
complexdata: 'block-complex',
},
})

View File

@@ -6,11 +6,11 @@ import { executeInE2B } from '@/lib/execution/e2b'
import { executeInIsolatedVM } from '@/lib/execution/isolated-vm'
import { CodeLanguage, DEFAULT_CODE_LANGUAGE, isValidCodeLanguage } from '@/lib/execution/languages'
import { escapeRegExp, normalizeName, REFERENCE } from '@/executor/constants'
import { type OutputSchema, resolveBlockReference } from '@/executor/utils/block-reference'
import {
createEnvVarPattern,
createWorkflowVariablePattern,
} from '@/executor/utils/reference-validation'
import { navigatePath } from '@/executor/variables/resolvers/reference'
export const dynamic = 'force-dynamic'
export const runtime = 'nodejs'
@@ -470,14 +470,17 @@ function resolveEnvironmentVariables(
function resolveTagVariables(
code: string,
blockData: Record<string, any>,
blockData: Record<string, unknown>,
blockNameMapping: Record<string, string>,
contextVariables: Record<string, any>
blockOutputSchemas: Record<string, OutputSchema>,
contextVariables: Record<string, unknown>,
language = 'javascript'
): string {
let resolvedCode = code
const undefinedLiteral = language === 'python' ? 'None' : 'undefined'
const tagPattern = new RegExp(
`${REFERENCE.START}([a-zA-Z_][a-zA-Z0-9_${REFERENCE.PATH_DELIMITER}]*[a-zA-Z0-9_])${REFERENCE.END}`,
`${REFERENCE.START}([a-zA-Z_](?:[a-zA-Z0-9_${REFERENCE.PATH_DELIMITER}]*[a-zA-Z0-9_])?)${REFERENCE.END}`,
'g'
)
const tagMatches = resolvedCode.match(tagPattern) || []
@@ -486,41 +489,37 @@ function resolveTagVariables(
const tagName = match.slice(REFERENCE.START.length, -REFERENCE.END.length).trim()
const pathParts = tagName.split(REFERENCE.PATH_DELIMITER)
const blockName = pathParts[0]
const fieldPath = pathParts.slice(1)
const blockId = blockNameMapping[blockName]
if (!blockId) {
const result = resolveBlockReference(blockName, fieldPath, {
blockNameMapping,
blockData,
blockOutputSchemas,
})
if (!result) {
continue
}
const blockOutput = blockData[blockId]
if (blockOutput === undefined) {
continue
}
let tagValue: any
if (pathParts.length === 1) {
tagValue = blockOutput
} else {
tagValue = navigatePath(blockOutput, pathParts.slice(1))
}
let tagValue = result.value
if (tagValue === undefined) {
resolvedCode = resolvedCode.replace(new RegExp(escapeRegExp(match), 'g'), undefinedLiteral)
continue
}
if (
typeof tagValue === 'string' &&
tagValue.length > 100 &&
(tagValue.startsWith('{') || tagValue.startsWith('['))
) {
try {
tagValue = JSON.parse(tagValue)
} catch {
// Keep as-is
if (typeof tagValue === 'string') {
const trimmed = tagValue.trimStart()
if (trimmed.startsWith('{') || trimmed.startsWith('[')) {
try {
tagValue = JSON.parse(tagValue)
} catch {
// Keep as string if not valid JSON
}
}
}
const safeVarName = `__tag_${tagName.replace(/[^a-zA-Z0-9_]/g, '_')}`
const safeVarName = `__tag_${tagName.replace(/_/g, '_1').replace(/\./g, '_0')}`
contextVariables[safeVarName] = tagValue
resolvedCode = resolvedCode.replace(new RegExp(escapeRegExp(match), 'g'), safeVarName)
}
@@ -537,18 +536,27 @@ function resolveTagVariables(
*/
function resolveCodeVariables(
code: string,
params: Record<string, any>,
params: Record<string, unknown>,
envVars: Record<string, string> = {},
blockData: Record<string, any> = {},
blockData: Record<string, unknown> = {},
blockNameMapping: Record<string, string> = {},
workflowVariables: Record<string, any> = {}
): { resolvedCode: string; contextVariables: Record<string, any> } {
blockOutputSchemas: Record<string, OutputSchema> = {},
workflowVariables: Record<string, unknown> = {},
language = 'javascript'
): { resolvedCode: string; contextVariables: Record<string, unknown> } {
let resolvedCode = code
const contextVariables: Record<string, any> = {}
const contextVariables: Record<string, unknown> = {}
resolvedCode = resolveWorkflowVariables(resolvedCode, workflowVariables, contextVariables)
resolvedCode = resolveEnvironmentVariables(resolvedCode, params, envVars, contextVariables)
resolvedCode = resolveTagVariables(resolvedCode, blockData, blockNameMapping, contextVariables)
resolvedCode = resolveTagVariables(
resolvedCode,
blockData,
blockNameMapping,
blockOutputSchemas,
contextVariables,
language
)
return { resolvedCode, contextVariables }
}
@@ -585,6 +593,7 @@ export async function POST(req: NextRequest) {
envVars = {},
blockData = {},
blockNameMapping = {},
blockOutputSchemas = {},
workflowVariables = {},
workflowId,
isCustomTool = false,
@@ -601,20 +610,21 @@ export async function POST(req: NextRequest) {
isCustomTool,
})
// Resolve variables in the code with workflow environment variables
const lang = isValidCodeLanguage(language) ? language : DEFAULT_CODE_LANGUAGE
const codeResolution = resolveCodeVariables(
code,
executionParams,
envVars,
blockData,
blockNameMapping,
workflowVariables
blockOutputSchemas,
workflowVariables,
lang
)
resolvedCode = codeResolution.resolvedCode
const contextVariables = codeResolution.contextVariables
const lang = isValidCodeLanguage(language) ? language : DEFAULT_CODE_LANGUAGE
let jsImports = ''
let jsRemainingCode = resolvedCode
let hasImports = false
@@ -670,7 +680,11 @@ export async function POST(req: NextRequest) {
prologue += `const environmentVariables = JSON.parse(${JSON.stringify(JSON.stringify(envVars))});\n`
prologueLineCount++
for (const [k, v] of Object.entries(contextVariables)) {
prologue += `const ${k} = JSON.parse(${JSON.stringify(JSON.stringify(v))});\n`
if (v === undefined) {
prologue += `const ${k} = undefined;\n`
} else {
prologue += `const ${k} = JSON.parse(${JSON.stringify(JSON.stringify(v))});\n`
}
prologueLineCount++
}
@@ -741,7 +755,11 @@ export async function POST(req: NextRequest) {
prologue += `environmentVariables = json.loads(${JSON.stringify(JSON.stringify(envVars))})\n`
prologueLineCount++
for (const [k, v] of Object.entries(contextVariables)) {
prologue += `${k} = json.loads(${JSON.stringify(JSON.stringify(v))})\n`
if (v === undefined) {
prologue += `${k} = None\n`
} else {
prologue += `${k} = json.loads(${JSON.stringify(JSON.stringify(v))})\n`
}
prologueLineCount++
}
const wrapped = [

View File

@@ -151,6 +151,29 @@ export const ActionBar = memo(
</Tooltip.Root>
)}
{isSubflowBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={(e) => {
e.stopPropagation()
if (!disabled) {
collaborativeBatchToggleBlockEnabled([blockId])
}
}}
className={ACTION_BUTTON_STYLES}
disabled={disabled}
>
{isEnabled ? <Circle className={ICON_SIZE} /> : <CircleOff className={ICON_SIZE} />}
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>
{getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')}
</Tooltip.Content>
</Tooltip.Root>
)}
{!isStartBlock && !isResponseBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
@@ -222,29 +245,6 @@ export const ActionBar = memo(
</Tooltip.Root>
)}
{isSubflowBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={(e) => {
e.stopPropagation()
if (!disabled) {
collaborativeBatchToggleBlockEnabled([blockId])
}
}}
className={ACTION_BUTTON_STYLES}
disabled={disabled}
>
{isEnabled ? <Circle className={ICON_SIZE} /> : <CircleOff className={ICON_SIZE} />}
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>
{getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')}
</Tooltip.Content>
</Tooltip.Root>
)}
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button

View File

@@ -1312,15 +1312,16 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
if (currentLoop && isLoopBlock) {
containingLoopBlockId = blockId
const loopType = currentLoop.loopType || 'for'
const contextualTags: string[] = ['index']
if (loopType === 'forEach') {
contextualTags.push('currentItem')
contextualTags.push('items')
}
const loopBlock = blocks[blockId]
if (loopBlock) {
const loopBlockName = loopBlock.name || loopBlock.type
const normalizedLoopName = normalizeName(loopBlockName)
const contextualTags: string[] = [`${normalizedLoopName}.index`]
if (loopType === 'forEach') {
contextualTags.push(`${normalizedLoopName}.currentItem`)
contextualTags.push(`${normalizedLoopName}.items`)
}
loopBlockGroup = {
blockName: loopBlockName,
@@ -1328,21 +1329,23 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
blockType: 'loop',
tags: contextualTags,
distance: 0,
isContextual: true,
}
}
} else if (containingLoop) {
const [loopId, loop] = containingLoop
containingLoopBlockId = loopId
const loopType = loop.loopType || 'for'
const contextualTags: string[] = ['index']
if (loopType === 'forEach') {
contextualTags.push('currentItem')
contextualTags.push('items')
}
const containingLoopBlock = blocks[loopId]
if (containingLoopBlock) {
const loopBlockName = containingLoopBlock.name || containingLoopBlock.type
const normalizedLoopName = normalizeName(loopBlockName)
const contextualTags: string[] = [`${normalizedLoopName}.index`]
if (loopType === 'forEach') {
contextualTags.push(`${normalizedLoopName}.currentItem`)
contextualTags.push(`${normalizedLoopName}.items`)
}
loopBlockGroup = {
blockName: loopBlockName,
@@ -1350,6 +1353,7 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
blockType: 'loop',
tags: contextualTags,
distance: 0,
isContextual: true,
}
}
}
@@ -1363,15 +1367,16 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
const [parallelId, parallel] = containingParallel
containingParallelBlockId = parallelId
const parallelType = parallel.parallelType || 'count'
const contextualTags: string[] = ['index']
if (parallelType === 'collection') {
contextualTags.push('currentItem')
contextualTags.push('items')
}
const containingParallelBlock = blocks[parallelId]
if (containingParallelBlock) {
const parallelBlockName = containingParallelBlock.name || containingParallelBlock.type
const normalizedParallelName = normalizeName(parallelBlockName)
const contextualTags: string[] = [`${normalizedParallelName}.index`]
if (parallelType === 'collection') {
contextualTags.push(`${normalizedParallelName}.currentItem`)
contextualTags.push(`${normalizedParallelName}.items`)
}
parallelBlockGroup = {
blockName: parallelBlockName,
@@ -1379,6 +1384,7 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
blockType: 'parallel',
tags: contextualTags,
distance: 0,
isContextual: true,
}
}
}
@@ -1645,38 +1651,29 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
const nestedBlockTagGroups: NestedBlockTagGroup[] = useMemo(() => {
return filteredBlockTagGroups.map((group: BlockTagGroup) => {
const normalizedBlockName = normalizeName(group.blockName)
// Handle loop/parallel contextual tags (index, currentItem, items)
const directTags: NestedTag[] = []
const tagsForTree: string[] = []
group.tags.forEach((tag: string) => {
const tagParts = tag.split('.')
// Loop/parallel contextual tags without block prefix
if (
(group.blockType === 'loop' || group.blockType === 'parallel') &&
tagParts.length === 1
) {
if (tagParts.length === 1) {
directTags.push({
key: tag,
display: tag,
fullTag: tag,
})
} else if (tagParts.length === 2) {
// Direct property like blockname.property
directTags.push({
key: tagParts[1],
display: tagParts[1],
fullTag: tag,
})
} else {
// Nested property - add to tree builder
tagsForTree.push(tag)
}
})
// Build recursive tree from nested tags
const nestedTags = [...directTags, ...buildNestedTagTree(tagsForTree, normalizedBlockName)]
return {
@@ -1800,13 +1797,19 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
processedTag = tag
}
} else if (
blockGroup &&
blockGroup?.isContextual &&
(blockGroup.blockType === 'loop' || blockGroup.blockType === 'parallel')
) {
if (!tag.includes('.') && ['index', 'currentItem', 'items'].includes(tag)) {
processedTag = `${blockGroup.blockType}.${tag}`
const tagParts = tag.split('.')
if (tagParts.length === 1) {
processedTag = blockGroup.blockType
} else {
processedTag = tag
const lastPart = tagParts[tagParts.length - 1]
if (['index', 'currentItem', 'items'].includes(lastPart)) {
processedTag = `${blockGroup.blockType}.${lastPart}`
} else {
processedTag = tag
}
}
}

View File

@@ -7,6 +7,8 @@ export interface BlockTagGroup {
blockType: string
tags: string[]
distance: number
/** True if this is a contextual group (loop/parallel iteration context available inside the subflow) */
isContextual?: boolean
}
/**

View File

@@ -242,15 +242,9 @@ Return ONLY the email body - no explanations, no extra text.`,
id: 'messageId',
title: 'Message ID',
type: 'short-input',
placeholder: 'Enter message ID to read (optional)',
condition: {
field: 'operation',
value: 'read_gmail',
and: {
field: 'folder',
value: '',
},
},
placeholder: 'Read specific email by ID (overrides label/folder)',
condition: { field: 'operation', value: 'read_gmail' },
mode: 'advanced',
},
// Search Fields
{

View File

@@ -129,12 +129,9 @@ ROUTING RULES:
3. If the context is even partially related to a route's description, select that route
4. ONLY output NO_MATCH if the context is completely unrelated to ALL route descriptions
OUTPUT FORMAT:
- Output EXACTLY one route ID (copied exactly as shown above) OR "NO_MATCH"
- No explanation, no punctuation, no additional text
- Just the route ID or NO_MATCH
Your response:`
Respond with a JSON object containing:
- route: EXACTLY one route ID (copied exactly as shown above) OR "NO_MATCH"
- reasoning: A brief explanation (1-2 sentences) of why you chose this route`
}
/**
@@ -272,6 +269,7 @@ interface RouterV2Response extends ToolResponse {
total: number
}
selectedRoute: string
reasoning: string
selectedPath: {
blockId: string
blockType: string
@@ -355,6 +353,7 @@ export const RouterV2Block: BlockConfig<RouterV2Response> = {
tokens: { type: 'json', description: 'Token usage' },
cost: { type: 'json', description: 'Cost information' },
selectedRoute: { type: 'string', description: 'Selected route ID' },
reasoning: { type: 'string', description: 'Explanation of why this route was chosen' },
selectedPath: { type: 'json', description: 'Selected routing path' },
},
}

View File

@@ -120,6 +120,12 @@ export const SPECIAL_REFERENCE_PREFIXES = [
REFERENCE.PREFIX.VARIABLE,
] as const
export const RESERVED_BLOCK_NAMES = [
REFERENCE.PREFIX.LOOP,
REFERENCE.PREFIX.PARALLEL,
REFERENCE.PREFIX.VARIABLE,
] as const
export const LOOP_REFERENCE = {
ITERATION: 'iteration',
INDEX: 'index',

View File

@@ -24,6 +24,71 @@ function createBlock(id: string, metadataId: string): SerializedBlock {
}
}
describe('DAGBuilder disabled subflow validation', () => {
it('skips validation for disabled loops with no blocks inside', () => {
const workflow: SerializedWorkflow = {
version: '1',
blocks: [
createBlock('start', BlockType.STARTER),
{ ...createBlock('loop-block', BlockType.FUNCTION), enabled: false },
],
connections: [],
loops: {
'loop-1': {
id: 'loop-1',
nodes: [], // Empty loop - would normally throw
iterations: 3,
},
},
}
const builder = new DAGBuilder()
// Should not throw even though loop has no blocks inside
expect(() => builder.build(workflow)).not.toThrow()
})
it('skips validation for disabled parallels with no blocks inside', () => {
const workflow: SerializedWorkflow = {
version: '1',
blocks: [createBlock('start', BlockType.STARTER)],
connections: [],
loops: {},
parallels: {
'parallel-1': {
id: 'parallel-1',
nodes: [], // Empty parallel - would normally throw
},
},
}
const builder = new DAGBuilder()
// Should not throw even though parallel has no blocks inside
expect(() => builder.build(workflow)).not.toThrow()
})
it('skips validation for loops where all inner blocks are disabled', () => {
const workflow: SerializedWorkflow = {
version: '1',
blocks: [
createBlock('start', BlockType.STARTER),
{ ...createBlock('inner-block', BlockType.FUNCTION), enabled: false },
],
connections: [],
loops: {
'loop-1': {
id: 'loop-1',
nodes: ['inner-block'], // Has node but it's disabled
iterations: 3,
},
},
}
const builder = new DAGBuilder()
// Should not throw - loop is effectively disabled since all inner blocks are disabled
expect(() => builder.build(workflow)).not.toThrow()
})
})
describe('DAGBuilder human-in-the-loop transformation', () => {
it('creates trigger nodes and rewires edges for pause blocks', () => {
const workflow: SerializedWorkflow = {

View File

@@ -136,17 +136,18 @@ export class DAGBuilder {
nodes: string[] | undefined,
type: 'Loop' | 'Parallel'
): void {
const sentinelStartId =
type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id)
const sentinelStartNode = dag.nodes.get(sentinelStartId)
if (!sentinelStartNode) return
if (!nodes || nodes.length === 0) {
throw new Error(
`${type} has no blocks inside. Add at least one block to the ${type.toLowerCase()}.`
)
}
const sentinelStartId =
type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id)
const sentinelStartNode = dag.nodes.get(sentinelStartId)
if (!sentinelStartNode) return
const hasConnections = Array.from(sentinelStartNode.outgoingEdges.values()).some((edge) =>
nodes.includes(extractBaseBlockId(edge.target))
)

File diff suppressed because it is too large Load Diff

View File

@@ -20,21 +20,13 @@ export class EdgeManager {
const activatedTargets: string[] = []
const edgesToDeactivate: Array<{ target: string; handle?: string }> = []
// First pass: categorize edges as activating or deactivating
// Don't modify incomingEdges yet - we need the original state for deactivation checks
for (const [edgeId, edge] of node.outgoingEdges) {
for (const [, edge] of node.outgoingEdges) {
if (skipBackwardsEdge && this.isBackwardsEdge(edge.sourceHandle)) {
continue
}
const shouldActivate = this.shouldActivateEdge(edge, output)
if (!shouldActivate) {
const isLoopEdge =
edge.sourceHandle === EDGE.LOOP_CONTINUE ||
edge.sourceHandle === EDGE.LOOP_CONTINUE_ALT ||
edge.sourceHandle === EDGE.LOOP_EXIT
if (!isLoopEdge) {
if (!this.shouldActivateEdge(edge, output)) {
if (!this.isLoopEdge(edge.sourceHandle)) {
edgesToDeactivate.push({ target: edge.target, handle: edge.sourceHandle })
}
continue
@@ -43,13 +35,19 @@ export class EdgeManager {
activatedTargets.push(edge.target)
}
// Second pass: process deactivations while incomingEdges is still intact
// This ensures hasActiveIncomingEdges can find all potential sources
const cascadeTargets = new Set<string>()
for (const { target, handle } of edgesToDeactivate) {
this.deactivateEdgeAndDescendants(node.id, target, handle)
this.deactivateEdgeAndDescendants(node.id, target, handle, cascadeTargets)
}
if (activatedTargets.length === 0) {
for (const { target } of edgesToDeactivate) {
if (this.isTerminalControlNode(target)) {
cascadeTargets.add(target)
}
}
}
// Third pass: update incomingEdges for activated targets
for (const targetId of activatedTargets) {
const targetNode = this.dag.nodes.get(targetId)
if (!targetNode) {
@@ -59,28 +57,25 @@ export class EdgeManager {
targetNode.incomingEdges.delete(node.id)
}
// Fourth pass: check readiness after all edge processing is complete
for (const targetId of activatedTargets) {
const targetNode = this.dag.nodes.get(targetId)
if (targetNode && this.isNodeReady(targetNode)) {
if (this.isTargetReady(targetId)) {
readyNodes.push(targetId)
}
}
for (const targetId of cascadeTargets) {
if (!readyNodes.includes(targetId) && !activatedTargets.includes(targetId)) {
if (this.isTargetReady(targetId)) {
readyNodes.push(targetId)
}
}
}
return readyNodes
}
isNodeReady(node: DAGNode): boolean {
if (node.incomingEdges.size === 0) {
return true
}
const activeIncomingCount = this.countActiveIncomingEdges(node)
if (activeIncomingCount > 0) {
return false
}
return true
return node.incomingEdges.size === 0 || this.countActiveIncomingEdges(node) === 0
}
restoreIncomingEdge(targetNodeId: string, sourceNodeId: string): void {
@@ -99,13 +94,10 @@ export class EdgeManager {
/**
* Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration).
* This ensures error/success edges can be re-evaluated on each iteration.
*/
clearDeactivatedEdgesForNodes(nodeIds: Set<string>): void {
const edgesToRemove: string[] = []
for (const edgeKey of this.deactivatedEdges) {
// Edge key format is "sourceId-targetId-handle"
// Check if either source or target is in the nodeIds set
for (const nodeId of nodeIds) {
if (edgeKey.startsWith(`${nodeId}-`) || edgeKey.includes(`-${nodeId}-`)) {
edgesToRemove.push(edgeKey)
@@ -118,6 +110,44 @@ export class EdgeManager {
}
}
private isTargetReady(targetId: string): boolean {
const targetNode = this.dag.nodes.get(targetId)
return targetNode ? this.isNodeReady(targetNode) : false
}
private isLoopEdge(handle?: string): boolean {
return (
handle === EDGE.LOOP_CONTINUE ||
handle === EDGE.LOOP_CONTINUE_ALT ||
handle === EDGE.LOOP_EXIT
)
}
private isControlEdge(handle?: string): boolean {
return (
handle === EDGE.LOOP_CONTINUE ||
handle === EDGE.LOOP_CONTINUE_ALT ||
handle === EDGE.LOOP_EXIT ||
handle === EDGE.PARALLEL_EXIT
)
}
private isBackwardsEdge(sourceHandle?: string): boolean {
return sourceHandle === EDGE.LOOP_CONTINUE || sourceHandle === EDGE.LOOP_CONTINUE_ALT
}
private isTerminalControlNode(nodeId: string): boolean {
const node = this.dag.nodes.get(nodeId)
if (!node || node.outgoingEdges.size === 0) return false
for (const [, edge] of node.outgoingEdges) {
if (!this.isControlEdge(edge.sourceHandle)) {
return false
}
}
return true
}
private shouldActivateEdge(edge: DAGEdge, output: NormalizedBlockOutput): boolean {
const handle = edge.sourceHandle
@@ -159,14 +189,12 @@ export class EdgeManager {
}
}
private isBackwardsEdge(sourceHandle?: string): boolean {
return sourceHandle === EDGE.LOOP_CONTINUE || sourceHandle === EDGE.LOOP_CONTINUE_ALT
}
private deactivateEdgeAndDescendants(
sourceId: string,
targetId: string,
sourceHandle?: string
sourceHandle?: string,
cascadeTargets?: Set<string>,
isCascade = false
): void {
const edgeKey = this.createEdgeKey(sourceId, targetId, sourceHandle)
if (this.deactivatedEdges.has(edgeKey)) {
@@ -174,38 +202,46 @@ export class EdgeManager {
}
this.deactivatedEdges.add(edgeKey)
const targetNode = this.dag.nodes.get(targetId)
if (!targetNode) return
// Check if target has other active incoming edges
// Pass the specific edge key being deactivated, not just source ID,
// to handle multiple edges from same source to same target (e.g., condition branches)
const hasOtherActiveIncoming = this.hasActiveIncomingEdges(targetNode, edgeKey)
if (!hasOtherActiveIncoming) {
for (const [_, outgoingEdge] of targetNode.outgoingEdges) {
this.deactivateEdgeAndDescendants(targetId, outgoingEdge.target, outgoingEdge.sourceHandle)
if (isCascade && this.isTerminalControlNode(targetId)) {
cascadeTargets?.add(targetId)
}
if (this.hasActiveIncomingEdges(targetNode, edgeKey)) {
return
}
for (const [, outgoingEdge] of targetNode.outgoingEdges) {
if (!this.isControlEdge(outgoingEdge.sourceHandle)) {
this.deactivateEdgeAndDescendants(
targetId,
outgoingEdge.target,
outgoingEdge.sourceHandle,
cascadeTargets,
true
)
}
}
}
/**
* Checks if a node has any active incoming edges besides the one being excluded.
* This properly handles the case where multiple edges from the same source go to
* the same target (e.g., multiple condition branches pointing to one block).
*/
private hasActiveIncomingEdges(node: DAGNode, excludeEdgeKey: string): boolean {
for (const incomingSourceId of node.incomingEdges) {
const incomingNode = this.dag.nodes.get(incomingSourceId)
if (!incomingNode) continue
for (const [_, incomingEdge] of incomingNode.outgoingEdges) {
for (const [, incomingEdge] of incomingNode.outgoingEdges) {
if (incomingEdge.target === node.id) {
const incomingEdgeKey = this.createEdgeKey(
incomingSourceId,
node.id,
incomingEdge.sourceHandle
)
// Skip the specific edge being excluded, but check other edges from same source
if (incomingEdgeKey === excludeEdgeKey) continue
if (!this.deactivatedEdges.has(incomingEdgeKey)) {
return true

View File

@@ -554,6 +554,413 @@ describe('ExecutionEngine', () => {
})
})
describe('Error handling in execution', () => {
it('should fail execution when a single node throws an error', async () => {
const startNode = createMockNode('start', 'starter')
const errorNode = createMockNode('error-node', 'function')
startNode.outgoingEdges.set('edge1', { target: 'error-node' })
const dag = createMockDAG([startNode, errorNode])
const context = createMockContext()
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return ['error-node']
return []
})
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
if (nodeId === 'error-node') {
throw new Error('Block execution failed')
}
return { nodeId, output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await expect(engine.run('start')).rejects.toThrow('Block execution failed')
})
it('should stop parallel branches when one branch throws an error', async () => {
const startNode = createMockNode('start', 'starter')
const parallelNodes = Array.from({ length: 5 }, (_, i) =>
createMockNode(`parallel${i}`, 'function')
)
parallelNodes.forEach((_, i) => {
startNode.outgoingEdges.set(`edge${i}`, { target: `parallel${i}` })
})
const dag = createMockDAG([startNode, ...parallelNodes])
const context = createMockContext()
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return parallelNodes.map((_, i) => `parallel${i}`)
return []
})
const executedNodes: string[] = []
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
executedNodes.push(nodeId)
if (nodeId === 'parallel0') {
await new Promise((resolve) => setTimeout(resolve, 10))
throw new Error('Parallel branch failed')
}
await new Promise((resolve) => setTimeout(resolve, 100))
return { nodeId, output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await expect(engine.run('start')).rejects.toThrow('Parallel branch failed')
})
it('should capture only the first error when multiple parallel branches fail', async () => {
const startNode = createMockNode('start', 'starter')
const parallelNodes = Array.from({ length: 3 }, (_, i) =>
createMockNode(`parallel${i}`, 'function')
)
parallelNodes.forEach((_, i) => {
startNode.outgoingEdges.set(`edge${i}`, { target: `parallel${i}` })
})
const dag = createMockDAG([startNode, ...parallelNodes])
const context = createMockContext()
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return parallelNodes.map((_, i) => `parallel${i}`)
return []
})
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
if (nodeId === 'parallel0') {
await new Promise((resolve) => setTimeout(resolve, 10))
throw new Error('First error')
}
if (nodeId === 'parallel1') {
await new Promise((resolve) => setTimeout(resolve, 20))
throw new Error('Second error')
}
if (nodeId === 'parallel2') {
await new Promise((resolve) => setTimeout(resolve, 30))
throw new Error('Third error')
}
return { nodeId, output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await expect(engine.run('start')).rejects.toThrow('First error')
})
it('should wait for ongoing executions to complete before throwing error', async () => {
const startNode = createMockNode('start', 'starter')
const fastErrorNode = createMockNode('fast-error', 'function')
const slowNode = createMockNode('slow', 'function')
startNode.outgoingEdges.set('edge1', { target: 'fast-error' })
startNode.outgoingEdges.set('edge2', { target: 'slow' })
const dag = createMockDAG([startNode, fastErrorNode, slowNode])
const context = createMockContext()
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return ['fast-error', 'slow']
return []
})
let slowNodeCompleted = false
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
if (nodeId === 'fast-error') {
await new Promise((resolve) => setTimeout(resolve, 10))
throw new Error('Fast error')
}
if (nodeId === 'slow') {
await new Promise((resolve) => setTimeout(resolve, 50))
slowNodeCompleted = true
return { nodeId, output: {}, isFinalOutput: false }
}
return { nodeId, output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await expect(engine.run('start')).rejects.toThrow('Fast error')
expect(slowNodeCompleted).toBe(true)
})
it('should not queue new nodes after an error occurs', async () => {
const startNode = createMockNode('start', 'starter')
const errorNode = createMockNode('error-node', 'function')
const afterErrorNode = createMockNode('after-error', 'function')
startNode.outgoingEdges.set('edge1', { target: 'error-node' })
errorNode.outgoingEdges.set('edge2', { target: 'after-error' })
const dag = createMockDAG([startNode, errorNode, afterErrorNode])
const context = createMockContext()
const queuedNodes: string[] = []
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') {
queuedNodes.push('error-node')
return ['error-node']
}
if (node.id === 'error-node') {
queuedNodes.push('after-error')
return ['after-error']
}
return []
})
const executedNodes: string[] = []
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
executedNodes.push(nodeId)
if (nodeId === 'error-node') {
throw new Error('Node error')
}
return { nodeId, output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await expect(engine.run('start')).rejects.toThrow('Node error')
expect(executedNodes).not.toContain('after-error')
})
it('should populate error result with metadata when execution fails', async () => {
const startNode = createMockNode('start', 'starter')
const errorNode = createMockNode('error-node', 'function')
startNode.outgoingEdges.set('edge1', { target: 'error-node' })
const dag = createMockDAG([startNode, errorNode])
const context = createMockContext()
context.blockLogs.push({
blockId: 'start',
blockName: 'Start',
blockType: 'starter',
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
durationMs: 10,
success: true,
})
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return ['error-node']
return []
})
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
if (nodeId === 'error-node') {
const error = new Error('Execution failed') as any
error.executionResult = {
success: false,
output: { partial: 'data' },
logs: context.blockLogs,
metadata: context.metadata,
}
throw error
}
return { nodeId, output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
try {
await engine.run('start')
expect.fail('Should have thrown')
} catch (error: any) {
expect(error.executionResult).toBeDefined()
expect(error.executionResult.metadata.endTime).toBeDefined()
expect(error.executionResult.metadata.duration).toBeDefined()
}
})
it('should prefer cancellation status over error when both occur', async () => {
const abortController = new AbortController()
const startNode = createMockNode('start', 'starter')
const errorNode = createMockNode('error-node', 'function')
startNode.outgoingEdges.set('edge1', { target: 'error-node' })
const dag = createMockDAG([startNode, errorNode])
const context = createMockContext({ abortSignal: abortController.signal })
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return ['error-node']
return []
})
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
if (nodeId === 'error-node') {
abortController.abort()
throw new Error('Node error')
}
return { nodeId, output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('start')
expect(result.status).toBe('cancelled')
expect(result.success).toBe(false)
})
it('should stop loop iteration when error occurs in loop body', async () => {
const loopStartNode = createMockNode('loop-start', 'loop_sentinel')
loopStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: 'loop1' }
const loopBodyNode = createMockNode('loop-body', 'function')
loopBodyNode.metadata = { isLoopNode: true, loopId: 'loop1' }
const loopEndNode = createMockNode('loop-end', 'loop_sentinel')
loopEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: 'loop1' }
const afterLoopNode = createMockNode('after-loop', 'function')
loopStartNode.outgoingEdges.set('edge1', { target: 'loop-body' })
loopBodyNode.outgoingEdges.set('edge2', { target: 'loop-end' })
loopEndNode.outgoingEdges.set('loop_continue', {
target: 'loop-start',
sourceHandle: 'loop_continue',
})
loopEndNode.outgoingEdges.set('loop_complete', {
target: 'after-loop',
sourceHandle: 'loop_complete',
})
const dag = createMockDAG([loopStartNode, loopBodyNode, loopEndNode, afterLoopNode])
const context = createMockContext()
let iterationCount = 0
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'loop-start') return ['loop-body']
if (node.id === 'loop-body') return ['loop-end']
if (node.id === 'loop-end') {
iterationCount++
if (iterationCount < 5) return ['loop-start']
return ['after-loop']
}
return []
})
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
if (nodeId === 'loop-body' && iterationCount >= 2) {
throw new Error('Loop body error on iteration 3')
}
return { nodeId, output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await expect(engine.run('loop-start')).rejects.toThrow('Loop body error on iteration 3')
expect(iterationCount).toBeLessThanOrEqual(3)
})
it('should handle error that is not an Error instance', async () => {
const startNode = createMockNode('start', 'starter')
const errorNode = createMockNode('error-node', 'function')
startNode.outgoingEdges.set('edge1', { target: 'error-node' })
const dag = createMockDAG([startNode, errorNode])
const context = createMockContext()
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return ['error-node']
return []
})
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
if (nodeId === 'error-node') {
throw 'String error message'
}
return { nodeId, output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await expect(engine.run('start')).rejects.toThrow('String error message')
})
it('should preserve partial output when error occurs after some blocks complete', async () => {
const startNode = createMockNode('start', 'starter')
const successNode = createMockNode('success', 'function')
const errorNode = createMockNode('error-node', 'function')
startNode.outgoingEdges.set('edge1', { target: 'success' })
successNode.outgoingEdges.set('edge2', { target: 'error-node' })
const dag = createMockDAG([startNode, successNode, errorNode])
const context = createMockContext()
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return ['success']
if (node.id === 'success') return ['error-node']
return []
})
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
if (nodeId === 'success') {
return { nodeId, output: { successData: 'preserved' }, isFinalOutput: false }
}
if (nodeId === 'error-node') {
throw new Error('Late error')
}
return { nodeId, output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
try {
await engine.run('start')
expect.fail('Should have thrown')
} catch (error: any) {
// Verify the error was thrown
expect(error.message).toBe('Late error')
// The partial output should be available in executionResult if attached
if (error.executionResult) {
expect(error.executionResult.output).toBeDefined()
}
}
})
})
describe('Cancellation flag behavior', () => {
it('should set cancelledFlag when abort signal fires', async () => {
const abortController = new AbortController()

View File

@@ -25,6 +25,8 @@ export class ExecutionEngine {
private pausedBlocks: Map<string, PauseMetadata> = new Map()
private allowResumeTriggers: boolean
private cancelledFlag = false
private errorFlag = false
private executionError: Error | null = null
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
@@ -103,7 +105,7 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
if (await this.checkCancellation()) {
if ((await this.checkCancellation()) || this.errorFlag) {
break
}
await this.processQueue()
@@ -113,6 +115,11 @@ export class ExecutionEngine {
await this.waitForAllExecutions()
}
// Rethrow the captured error so it's handled by the catch block
if (this.errorFlag && this.executionError) {
throw this.executionError
}
if (this.pausedBlocks.size > 0) {
return this.buildPausedResult(startTime)
}
@@ -196,11 +203,17 @@ export class ExecutionEngine {
}
private trackExecution(promise: Promise<void>): void {
this.executing.add(promise)
promise.catch(() => {})
promise.finally(() => {
this.executing.delete(promise)
})
const trackedPromise = promise
.catch((error) => {
if (!this.errorFlag) {
this.errorFlag = true
this.executionError = error instanceof Error ? error : new Error(String(error))
}
})
.finally(() => {
this.executing.delete(trackedPromise)
})
this.executing.add(trackedPromise)
}
private async waitForAnyExecution(): Promise<void> {
@@ -315,7 +328,7 @@ export class ExecutionEngine {
private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if (await this.checkCancellation()) {
if ((await this.checkCancellation()) || this.errorFlag) {
break
}
const nodeId = this.dequeue()
@@ -324,7 +337,7 @@ export class ExecutionEngine {
this.trackExecution(promise)
}
if (this.executing.size > 0 && !this.cancelledFlag) {
if (this.executing.size > 0 && !this.cancelledFlag && !this.errorFlag) {
await this.waitForAnyExecution()
}
}

View File

@@ -305,7 +305,7 @@ export class AgentBlockHandler implements BlockHandler {
base.executeFunction = async (callParams: Record<string, any>) => {
const mergedParams = mergeToolParameters(userProvidedParams, callParams)
const { blockData, blockNameMapping } = collectBlockData(ctx)
const { blockData, blockNameMapping, blockOutputSchemas } = collectBlockData(ctx)
const result = await executeTool(
'function_execute',
@@ -317,6 +317,7 @@ export class AgentBlockHandler implements BlockHandler {
workflowVariables: ctx.workflowVariables || {},
blockData,
blockNameMapping,
blockOutputSchemas,
isCustomTool: true,
_context: {
workflowId: ctx.workflowId,

View File

@@ -26,7 +26,7 @@ export async function evaluateConditionExpression(
const contextSetup = `const context = ${JSON.stringify(evalContext)};`
const code = `${contextSetup}\nreturn Boolean(${conditionExpression})`
const { blockData, blockNameMapping } = collectBlockData(ctx)
const { blockData, blockNameMapping, blockOutputSchemas } = collectBlockData(ctx)
const result = await executeTool(
'function_execute',
@@ -37,6 +37,7 @@ export async function evaluateConditionExpression(
workflowVariables: ctx.workflowVariables || {},
blockData,
blockNameMapping,
blockOutputSchemas,
_context: {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,

View File

@@ -75,7 +75,12 @@ describe('FunctionBlockHandler', () => {
workflowVariables: {},
blockData: {},
blockNameMapping: {},
_context: { workflowId: mockContext.workflowId, workspaceId: mockContext.workspaceId },
blockOutputSchemas: {},
_context: {
workflowId: mockContext.workflowId,
workspaceId: mockContext.workspaceId,
isDeployedContext: mockContext.isDeployedContext,
},
}
const expectedOutput: any = { result: 'Success' }
@@ -84,8 +89,8 @@ describe('FunctionBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'function_execute',
expectedToolParams,
false, // skipPostProcess
mockContext // execution context
false,
mockContext
)
expect(result).toEqual(expectedOutput)
})
@@ -107,7 +112,12 @@ describe('FunctionBlockHandler', () => {
workflowVariables: {},
blockData: {},
blockNameMapping: {},
_context: { workflowId: mockContext.workflowId, workspaceId: mockContext.workspaceId },
blockOutputSchemas: {},
_context: {
workflowId: mockContext.workflowId,
workspaceId: mockContext.workspaceId,
isDeployedContext: mockContext.isDeployedContext,
},
}
const expectedOutput: any = { result: 'Success' }
@@ -116,8 +126,8 @@ describe('FunctionBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'function_execute',
expectedToolParams,
false, // skipPostProcess
mockContext // execution context
false,
mockContext
)
expect(result).toEqual(expectedOutput)
})
@@ -132,7 +142,12 @@ describe('FunctionBlockHandler', () => {
workflowVariables: {},
blockData: {},
blockNameMapping: {},
_context: { workflowId: mockContext.workflowId, workspaceId: mockContext.workspaceId },
blockOutputSchemas: {},
_context: {
workflowId: mockContext.workflowId,
workspaceId: mockContext.workspaceId,
isDeployedContext: mockContext.isDeployedContext,
},
}
await handler.execute(mockContext, mockBlock, inputs)

View File

@@ -23,7 +23,7 @@ export class FunctionBlockHandler implements BlockHandler {
? inputs.code.map((c: { content: string }) => c.content).join('\n')
: inputs.code
const { blockData, blockNameMapping } = collectBlockData(ctx)
const { blockData, blockNameMapping, blockOutputSchemas } = collectBlockData(ctx)
const result = await executeTool(
'function_execute',
@@ -35,6 +35,7 @@ export class FunctionBlockHandler implements BlockHandler {
workflowVariables: ctx.workflowVariables || {},
blockData,
blockNameMapping,
blockOutputSchemas,
_context: {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,

View File

@@ -1,7 +1,7 @@
import '@sim/testing/mocks/executor'
import { beforeEach, describe, expect, it, type Mock, vi } from 'vitest'
import { generateRouterPrompt } from '@/blocks/blocks/router'
import { generateRouterPrompt, generateRouterV2Prompt } from '@/blocks/blocks/router'
import { BlockType } from '@/executor/constants'
import { RouterBlockHandler } from '@/executor/handlers/router/router-handler'
import type { ExecutionContext } from '@/executor/types'
@@ -9,6 +9,7 @@ import { getProviderFromModel } from '@/providers/utils'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
const mockGenerateRouterPrompt = generateRouterPrompt as Mock
const mockGenerateRouterV2Prompt = generateRouterV2Prompt as Mock
const mockGetProviderFromModel = getProviderFromModel as Mock
const mockFetch = global.fetch as unknown as Mock
@@ -44,7 +45,7 @@ describe('RouterBlockHandler', () => {
metadata: { id: BlockType.ROUTER, name: 'Test Router' },
position: { x: 50, y: 50 },
config: { tool: BlockType.ROUTER, params: {} },
inputs: { prompt: 'string', model: 'string' }, // Using ParamType strings
inputs: { prompt: 'string', model: 'string' },
outputs: {},
enabled: true,
}
@@ -72,14 +73,11 @@ describe('RouterBlockHandler', () => {
workflow: mockWorkflow as SerializedWorkflow,
}
// Reset mocks using vi
vi.clearAllMocks()
// Default mock implementations
mockGetProviderFromModel.mockReturnValue('openai')
mockGenerateRouterPrompt.mockReturnValue('Generated System Prompt')
// Set up fetch mock to return a successful response
mockFetch.mockImplementation(() => {
return Promise.resolve({
ok: true,
@@ -147,7 +145,6 @@ describe('RouterBlockHandler', () => {
})
)
// Verify the request body contains the expected data
const fetchCallArgs = mockFetch.mock.calls[0]
const requestBody = JSON.parse(fetchCallArgs[1].body)
expect(requestBody).toMatchObject({
@@ -180,7 +177,6 @@ describe('RouterBlockHandler', () => {
const inputs = { prompt: 'Test' }
mockContext.workflow!.blocks = [mockBlock, mockTargetBlock2]
// Expect execute to throw because getTargetBlocks (called internally) will throw
await expect(handler.execute(mockContext, mockBlock, inputs)).rejects.toThrow(
'Target block target-block-1 not found'
)
@@ -190,7 +186,6 @@ describe('RouterBlockHandler', () => {
it('should throw error if LLM response is not a valid target block ID', async () => {
const inputs = { prompt: 'Test', apiKey: 'test-api-key' }
// Override fetch mock to return an invalid block ID
mockFetch.mockImplementationOnce(() => {
return Promise.resolve({
ok: true,
@@ -228,7 +223,6 @@ describe('RouterBlockHandler', () => {
it('should handle server error responses', async () => {
const inputs = { prompt: 'Test error handling.', apiKey: 'test-api-key' }
// Override fetch mock to return an error
mockFetch.mockImplementationOnce(() => {
return Promise.resolve({
ok: false,
@@ -276,13 +270,12 @@ describe('RouterBlockHandler', () => {
mockGetProviderFromModel.mockReturnValue('vertex')
// Mock the database query for Vertex credential
const mockDb = await import('@sim/db')
const mockAccount = {
id: 'test-vertex-credential-id',
accessToken: 'mock-access-token',
refreshToken: 'mock-refresh-token',
expiresAt: new Date(Date.now() + 3600000), // 1 hour from now
expiresAt: new Date(Date.now() + 3600000),
}
vi.spyOn(mockDb.db.query.account, 'findFirst').mockResolvedValue(mockAccount as any)
@@ -300,3 +293,287 @@ describe('RouterBlockHandler', () => {
expect(requestBody.apiKey).toBe('mock-access-token')
})
})
describe('RouterBlockHandler V2', () => {
let handler: RouterBlockHandler
let mockRouterV2Block: SerializedBlock
let mockContext: ExecutionContext
let mockWorkflow: Partial<SerializedWorkflow>
let mockTargetBlock1: SerializedBlock
let mockTargetBlock2: SerializedBlock
beforeEach(() => {
mockTargetBlock1 = {
id: 'target-block-1',
metadata: { id: 'agent', name: 'Support Agent' },
position: { x: 100, y: 100 },
config: { tool: 'agent', params: {} },
inputs: {},
outputs: {},
enabled: true,
}
mockTargetBlock2 = {
id: 'target-block-2',
metadata: { id: 'agent', name: 'Sales Agent' },
position: { x: 100, y: 150 },
config: { tool: 'agent', params: {} },
inputs: {},
outputs: {},
enabled: true,
}
mockRouterV2Block = {
id: 'router-v2-block-1',
metadata: { id: BlockType.ROUTER_V2, name: 'Test Router V2' },
position: { x: 50, y: 50 },
config: { tool: BlockType.ROUTER_V2, params: {} },
inputs: {},
outputs: {},
enabled: true,
}
mockWorkflow = {
blocks: [mockRouterV2Block, mockTargetBlock1, mockTargetBlock2],
connections: [
{
source: mockRouterV2Block.id,
target: mockTargetBlock1.id,
sourceHandle: 'router-route-support',
},
{
source: mockRouterV2Block.id,
target: mockTargetBlock2.id,
sourceHandle: 'router-route-sales',
},
],
}
handler = new RouterBlockHandler({})
mockContext = {
workflowId: 'test-workflow-id',
blockStates: new Map(),
blockLogs: [],
metadata: { duration: 0 },
environmentVariables: {},
decisions: { router: new Map(), condition: new Map() },
loopExecutions: new Map(),
completedLoops: new Set(),
executedBlocks: new Set(),
activeExecutionPath: new Set(),
workflow: mockWorkflow as SerializedWorkflow,
}
vi.clearAllMocks()
mockGetProviderFromModel.mockReturnValue('openai')
mockGenerateRouterV2Prompt.mockReturnValue('Generated V2 System Prompt')
})
it('should handle router_v2 blocks', () => {
expect(handler.canHandle(mockRouterV2Block)).toBe(true)
})
it('should execute router V2 and return reasoning', async () => {
const inputs = {
context: 'I need help with a billing issue',
model: 'gpt-4o',
apiKey: 'test-api-key',
routes: JSON.stringify([
{ id: 'route-support', title: 'Support', value: 'Customer support inquiries' },
{ id: 'route-sales', title: 'Sales', value: 'Sales and pricing questions' },
]),
}
mockFetch.mockImplementationOnce(() => {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
content: JSON.stringify({
route: 'route-support',
reasoning: 'The user mentioned a billing issue which is a customer support matter.',
}),
model: 'gpt-4o',
tokens: { input: 150, output: 25, total: 175 },
}),
})
})
const result = await handler.execute(mockContext, mockRouterV2Block, inputs)
expect(result).toMatchObject({
context: 'I need help with a billing issue',
model: 'gpt-4o',
selectedRoute: 'route-support',
reasoning: 'The user mentioned a billing issue which is a customer support matter.',
selectedPath: {
blockId: 'target-block-1',
blockType: 'agent',
blockTitle: 'Support Agent',
},
})
})
it('should include responseFormat in provider request', async () => {
const inputs = {
context: 'Test context',
model: 'gpt-4o',
apiKey: 'test-api-key',
routes: JSON.stringify([{ id: 'route-1', title: 'Route 1', value: 'Description 1' }]),
}
mockFetch.mockImplementationOnce(() => {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
content: JSON.stringify({ route: 'route-1', reasoning: 'Test reasoning' }),
model: 'gpt-4o',
tokens: { input: 100, output: 20, total: 120 },
}),
})
})
await handler.execute(mockContext, mockRouterV2Block, inputs)
const fetchCallArgs = mockFetch.mock.calls[0]
const requestBody = JSON.parse(fetchCallArgs[1].body)
expect(requestBody.responseFormat).toEqual({
name: 'router_response',
schema: {
type: 'object',
properties: {
route: {
type: 'string',
description: 'The selected route ID or NO_MATCH',
},
reasoning: {
type: 'string',
description: 'Brief explanation of why this route was chosen',
},
},
required: ['route', 'reasoning'],
additionalProperties: false,
},
strict: true,
})
})
it('should handle NO_MATCH response with reasoning', async () => {
const inputs = {
context: 'Random unrelated query',
model: 'gpt-4o',
apiKey: 'test-api-key',
routes: JSON.stringify([{ id: 'route-1', title: 'Route 1', value: 'Specific topic' }]),
}
mockFetch.mockImplementationOnce(() => {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
content: JSON.stringify({
route: 'NO_MATCH',
reasoning: 'The query does not relate to any available route.',
}),
model: 'gpt-4o',
tokens: { input: 100, output: 20, total: 120 },
}),
})
})
await expect(handler.execute(mockContext, mockRouterV2Block, inputs)).rejects.toThrow(
'Router could not determine a matching route: The query does not relate to any available route.'
)
})
it('should throw error for invalid route ID in response', async () => {
const inputs = {
context: 'Test context',
model: 'gpt-4o',
apiKey: 'test-api-key',
routes: JSON.stringify([{ id: 'route-1', title: 'Route 1', value: 'Description' }]),
}
mockFetch.mockImplementationOnce(() => {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
content: JSON.stringify({ route: 'invalid-route', reasoning: 'Some reasoning' }),
model: 'gpt-4o',
tokens: { input: 100, output: 20, total: 120 },
}),
})
})
await expect(handler.execute(mockContext, mockRouterV2Block, inputs)).rejects.toThrow(
/Router could not determine a valid route/
)
})
it('should handle routes passed as array instead of JSON string', async () => {
const inputs = {
context: 'Test context',
model: 'gpt-4o',
apiKey: 'test-api-key',
routes: [{ id: 'route-1', title: 'Route 1', value: 'Description' }],
}
mockFetch.mockImplementationOnce(() => {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
content: JSON.stringify({ route: 'route-1', reasoning: 'Matched route 1' }),
model: 'gpt-4o',
tokens: { input: 100, output: 20, total: 120 },
}),
})
})
const result = await handler.execute(mockContext, mockRouterV2Block, inputs)
expect(result.selectedRoute).toBe('route-1')
expect(result.reasoning).toBe('Matched route 1')
})
it('should throw error when no routes are defined', async () => {
const inputs = {
context: 'Test context',
model: 'gpt-4o',
apiKey: 'test-api-key',
routes: '[]',
}
await expect(handler.execute(mockContext, mockRouterV2Block, inputs)).rejects.toThrow(
'No routes defined for router'
)
})
it('should handle fallback when JSON parsing fails', async () => {
const inputs = {
context: 'Test context',
model: 'gpt-4o',
apiKey: 'test-api-key',
routes: JSON.stringify([{ id: 'route-1', title: 'Route 1', value: 'Description' }]),
}
mockFetch.mockImplementationOnce(() => {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
content: 'route-1',
model: 'gpt-4o',
tokens: { input: 100, output: 5, total: 105 },
}),
})
})
const result = await handler.execute(mockContext, mockRouterV2Block, inputs)
expect(result.selectedRoute).toBe('route-1')
expect(result.reasoning).toBe('')
})
})

View File

@@ -238,6 +238,25 @@ export class RouterBlockHandler implements BlockHandler {
apiKey: finalApiKey,
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
responseFormat: {
name: 'router_response',
schema: {
type: 'object',
properties: {
route: {
type: 'string',
description: 'The selected route ID or NO_MATCH',
},
reasoning: {
type: 'string',
description: 'Brief explanation of why this route was chosen',
},
},
required: ['route', 'reasoning'],
additionalProperties: false,
},
strict: true,
},
}
if (providerId === 'vertex') {
@@ -277,16 +296,31 @@ export class RouterBlockHandler implements BlockHandler {
const result = await response.json()
const chosenRouteId = result.content.trim()
let chosenRouteId: string
let reasoning = ''
try {
const parsedResponse = JSON.parse(result.content)
chosenRouteId = parsedResponse.route?.trim() || ''
reasoning = parsedResponse.reasoning || ''
} catch (_parseError) {
logger.error('Router response was not valid JSON despite responseFormat', {
content: result.content,
})
chosenRouteId = result.content.trim()
}
if (chosenRouteId === 'NO_MATCH' || chosenRouteId.toUpperCase() === 'NO_MATCH') {
logger.info('Router determined no route matches the context, routing to error path')
throw new Error('Router could not determine a matching route for the given context')
throw new Error(
reasoning
? `Router could not determine a matching route: ${reasoning}`
: 'Router could not determine a matching route for the given context'
)
}
const chosenRoute = routes.find((r) => r.id === chosenRouteId)
// Throw error if LLM returns invalid route ID - this routes through error path
if (!chosenRoute) {
const availableRoutes = routes.map((r) => ({ id: r.id, title: r.title }))
logger.error(
@@ -298,7 +332,6 @@ export class RouterBlockHandler implements BlockHandler {
)
}
// Find the target block connected to this route's handle
const connection = ctx.workflow?.connections.find(
(conn) => conn.source === block.id && conn.sourceHandle === `router-${chosenRoute.id}`
)
@@ -334,6 +367,7 @@ export class RouterBlockHandler implements BlockHandler {
total: cost.total,
},
selectedRoute: chosenRoute.id,
reasoning,
selectedPath: targetBlock
? {
blockId: targetBlock.id,
@@ -353,7 +387,7 @@ export class RouterBlockHandler implements BlockHandler {
}
/**
* Parse routes from input (can be JSON string or array).
* Parse routes from input (can be JSON string or array)
*/
private parseRoutes(input: any): RouteDefinition[] {
try {

View File

@@ -204,26 +204,21 @@ describe('WorkflowBlockHandler', () => {
})
})
it('should map failed child output correctly', () => {
it('should throw error for failed child output so BlockExecutor can check error port', () => {
const childResult = {
success: false,
error: 'Child workflow failed',
}
const result = (handler as any).mapChildOutputToParent(
childResult,
'child-id',
'Child Workflow',
100
)
expect(() =>
(handler as any).mapChildOutputToParent(childResult, 'child-id', 'Child Workflow', 100)
).toThrow('Error in child workflow "Child Workflow": Child workflow failed')
expect(result).toEqual({
success: false,
childWorkflowName: 'Child Workflow',
result: {},
error: 'Child workflow failed',
childTraceSpans: [],
})
try {
;(handler as any).mapChildOutputToParent(childResult, 'child-id', 'Child Workflow', 100)
} catch (error: any) {
expect(error.childTraceSpans).toEqual([])
}
})
it('should handle nested response structures', () => {

View File

@@ -144,6 +144,11 @@ export class WorkflowBlockHandler implements BlockHandler {
const workflowMetadata = workflows[workflowId]
const childWorkflowName = workflowMetadata?.name || workflowId
const originalError = error.message || 'Unknown error'
const wrappedError = new Error(
`Error in child workflow "${childWorkflowName}": ${originalError}`
)
if (error.executionResult?.logs) {
const executionResult = error.executionResult as ExecutionResult
@@ -159,28 +164,12 @@ export class WorkflowBlockHandler implements BlockHandler {
)
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
return {
success: false,
childWorkflowName,
result: {},
error: error.message || 'Child workflow execution failed',
childTraceSpans: childTraceSpans,
} as Record<string, any>
;(wrappedError as any).childTraceSpans = childTraceSpans
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
;(wrappedError as any).childTraceSpans = error.childTraceSpans
}
if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
return {
success: false,
childWorkflowName,
result: {},
error: error.message || 'Child workflow execution failed',
childTraceSpans: error.childTraceSpans,
} as Record<string, any>
}
const originalError = error.message || 'Unknown error'
throw new Error(`Error in child workflow "${childWorkflowName}": ${originalError}`)
throw wrappedError
}
}
@@ -452,17 +441,13 @@ export class WorkflowBlockHandler implements BlockHandler {
if (!success) {
logger.warn(`Child workflow ${childWorkflowName} failed`)
// Return failure with child trace spans so they can be displayed
return {
success: false,
childWorkflowName,
result,
error: childResult.error || 'Child workflow execution failed',
childTraceSpans: childTraceSpans || [],
} as Record<string, any>
const error = new Error(
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`
)
;(error as any).childTraceSpans = childTraceSpans || []
throw error
}
// Success case
return {
success: true,
childWorkflowName,

View File

@@ -1,24 +1,43 @@
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { normalizeName } from '@/executor/constants'
import type { ExecutionContext } from '@/executor/types'
import type { OutputSchema } from '@/executor/utils/block-reference'
export interface BlockDataCollection {
blockData: Record<string, any>
blockData: Record<string, unknown>
blockNameMapping: Record<string, string>
blockOutputSchemas: Record<string, OutputSchema>
}
export function collectBlockData(ctx: ExecutionContext): BlockDataCollection {
const blockData: Record<string, any> = {}
const blockData: Record<string, unknown> = {}
const blockNameMapping: Record<string, string> = {}
const blockOutputSchemas: Record<string, OutputSchema> = {}
for (const [id, state] of ctx.blockStates.entries()) {
if (state.output !== undefined) {
blockData[id] = state.output
const workflowBlock = ctx.workflow?.blocks?.find((b) => b.id === id)
if (workflowBlock?.metadata?.name) {
blockNameMapping[normalizeName(workflowBlock.metadata.name)] = id
}
const workflowBlock = ctx.workflow?.blocks?.find((b) => b.id === id)
if (!workflowBlock) continue
if (workflowBlock.metadata?.name) {
blockNameMapping[normalizeName(workflowBlock.metadata.name)] = id
}
const blockType = workflowBlock.metadata?.id
if (blockType) {
const params = workflowBlock.config?.params as Record<string, unknown> | undefined
const subBlocks = params
? Object.fromEntries(Object.entries(params).map(([k, v]) => [k, { value: v }]))
: undefined
const schema = getBlockOutputs(blockType, subBlocks)
if (schema && Object.keys(schema).length > 0) {
blockOutputSchemas[id] = schema
}
}
}
return { blockData, blockNameMapping }
return { blockData, blockNameMapping, blockOutputSchemas }
}

View File

@@ -0,0 +1,255 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import {
type BlockReferenceContext,
InvalidFieldError,
resolveBlockReference,
} from './block-reference'
describe('resolveBlockReference', () => {
const createContext = (
overrides: Partial<BlockReferenceContext> = {}
): BlockReferenceContext => ({
blockNameMapping: { start: 'block-1', agent: 'block-2' },
blockData: {},
blockOutputSchemas: {},
...overrides,
})
describe('block name resolution', () => {
it('should return undefined when block name does not exist', () => {
const ctx = createContext()
const result = resolveBlockReference('unknown', ['field'], ctx)
expect(result).toBeUndefined()
})
it('should normalize block name before lookup', () => {
const ctx = createContext({
blockNameMapping: { myblock: 'block-1' },
blockData: { 'block-1': { value: 'test' } },
})
const result = resolveBlockReference('MyBlock', ['value'], ctx)
expect(result).toEqual({ value: 'test', blockId: 'block-1' })
})
it('should handle block names with spaces', () => {
const ctx = createContext({
blockNameMapping: { myblock: 'block-1' },
blockData: { 'block-1': { value: 'test' } },
})
const result = resolveBlockReference('My Block', ['value'], ctx)
expect(result).toEqual({ value: 'test', blockId: 'block-1' })
})
})
describe('field resolution', () => {
it('should return entire block output when no path specified', () => {
const ctx = createContext({
blockData: { 'block-1': { input: 'hello', other: 'data' } },
})
const result = resolveBlockReference('start', [], ctx)
expect(result).toEqual({
value: { input: 'hello', other: 'data' },
blockId: 'block-1',
})
})
it('should resolve simple field path', () => {
const ctx = createContext({
blockData: { 'block-1': { input: 'hello' } },
})
const result = resolveBlockReference('start', ['input'], ctx)
expect(result).toEqual({ value: 'hello', blockId: 'block-1' })
})
it('should resolve nested field path', () => {
const ctx = createContext({
blockData: { 'block-1': { response: { data: { name: 'test' } } } },
})
const result = resolveBlockReference('start', ['response', 'data', 'name'], ctx)
expect(result).toEqual({ value: 'test', blockId: 'block-1' })
})
it('should resolve array index path', () => {
const ctx = createContext({
blockData: { 'block-1': { items: ['a', 'b', 'c'] } },
})
const result = resolveBlockReference('start', ['items', '1'], ctx)
expect(result).toEqual({ value: 'b', blockId: 'block-1' })
})
it('should return undefined value when field exists but has no value', () => {
const ctx = createContext({
blockData: { 'block-1': { input: undefined } },
blockOutputSchemas: {
'block-1': { input: { type: 'string' } },
},
})
const result = resolveBlockReference('start', ['input'], ctx)
expect(result).toEqual({ value: undefined, blockId: 'block-1' })
})
it('should return null value when field has null', () => {
const ctx = createContext({
blockData: { 'block-1': { input: null } },
})
const result = resolveBlockReference('start', ['input'], ctx)
expect(result).toEqual({ value: null, blockId: 'block-1' })
})
})
describe('schema validation', () => {
it('should throw InvalidFieldError when field not in schema', () => {
const ctx = createContext({
blockData: { 'block-1': { existing: 'value' } },
blockOutputSchemas: {
'block-1': {
input: { type: 'string' },
conversationId: { type: 'string' },
},
},
})
expect(() => resolveBlockReference('start', ['invalid'], ctx)).toThrow(InvalidFieldError)
expect(() => resolveBlockReference('start', ['invalid'], ctx)).toThrow(
/"invalid" doesn't exist on block "start"/
)
})
it('should include available fields in error message', () => {
const ctx = createContext({
blockData: { 'block-1': {} },
blockOutputSchemas: {
'block-1': {
input: { type: 'string' },
conversationId: { type: 'string' },
files: { type: 'files' },
},
},
})
try {
resolveBlockReference('start', ['typo'], ctx)
expect.fail('Should have thrown')
} catch (error) {
expect(error).toBeInstanceOf(InvalidFieldError)
const fieldError = error as InvalidFieldError
expect(fieldError.availableFields).toContain('input')
expect(fieldError.availableFields).toContain('conversationId')
expect(fieldError.availableFields).toContain('files')
}
})
it('should allow valid field even when value is undefined', () => {
const ctx = createContext({
blockData: { 'block-1': {} },
blockOutputSchemas: {
'block-1': { input: { type: 'string' } },
},
})
const result = resolveBlockReference('start', ['input'], ctx)
expect(result).toEqual({ value: undefined, blockId: 'block-1' })
})
it('should validate path when block has no output yet', () => {
const ctx = createContext({
blockData: {},
blockOutputSchemas: {
'block-1': { input: { type: 'string' } },
},
})
expect(() => resolveBlockReference('start', ['invalid'], ctx)).toThrow(InvalidFieldError)
})
it('should return undefined for valid field when block has no output', () => {
const ctx = createContext({
blockData: {},
blockOutputSchemas: {
'block-1': { input: { type: 'string' } },
},
})
const result = resolveBlockReference('start', ['input'], ctx)
expect(result).toEqual({ value: undefined, blockId: 'block-1' })
})
})
describe('without schema (pass-through mode)', () => {
it('should return undefined value without throwing when no schema', () => {
const ctx = createContext({
blockData: { 'block-1': { existing: 'value' } },
})
const result = resolveBlockReference('start', ['missing'], ctx)
expect(result).toEqual({ value: undefined, blockId: 'block-1' })
})
})
describe('file type handling', () => {
it('should allow file property access', () => {
const ctx = createContext({
blockData: {
'block-1': {
files: [{ name: 'test.txt', url: 'http://example.com/file' }],
},
},
blockOutputSchemas: {
'block-1': { files: { type: 'files' } },
},
})
const result = resolveBlockReference('start', ['files', '0', 'name'], ctx)
expect(result).toEqual({ value: 'test.txt', blockId: 'block-1' })
})
it('should validate file property names', () => {
const ctx = createContext({
blockData: { 'block-1': { files: [] } },
blockOutputSchemas: {
'block-1': { files: { type: 'files' } },
},
})
expect(() => resolveBlockReference('start', ['files', '0', 'invalid'], ctx)).toThrow(
InvalidFieldError
)
})
})
})
describe('InvalidFieldError', () => {
it('should have correct properties', () => {
const error = new InvalidFieldError('myBlock', 'invalid.path', ['field1', 'field2'])
expect(error.blockName).toBe('myBlock')
expect(error.fieldPath).toBe('invalid.path')
expect(error.availableFields).toEqual(['field1', 'field2'])
expect(error.name).toBe('InvalidFieldError')
})
it('should format message correctly', () => {
const error = new InvalidFieldError('start', 'typo', ['input', 'files'])
expect(error.message).toBe(
'"typo" doesn\'t exist on block "start". Available fields: input, files'
)
})
it('should handle empty available fields', () => {
const error = new InvalidFieldError('start', 'field', [])
expect(error.message).toBe('"field" doesn\'t exist on block "start". Available fields: none')
})
})

View File

@@ -0,0 +1,210 @@
import { USER_FILE_ACCESSIBLE_PROPERTIES } from '@/lib/workflows/types'
import { normalizeName } from '@/executor/constants'
import { navigatePath } from '@/executor/variables/resolvers/reference'
export type OutputSchema = Record<string, { type?: string; description?: string } | unknown>
export interface BlockReferenceContext {
blockNameMapping: Record<string, string>
blockData: Record<string, unknown>
blockOutputSchemas?: Record<string, OutputSchema>
}
export interface BlockReferenceResult {
value: unknown
blockId: string
}
export class InvalidFieldError extends Error {
constructor(
public readonly blockName: string,
public readonly fieldPath: string,
public readonly availableFields: string[]
) {
super(
`"${fieldPath}" doesn't exist on block "${blockName}". ` +
`Available fields: ${availableFields.length > 0 ? availableFields.join(', ') : 'none'}`
)
this.name = 'InvalidFieldError'
}
}
function isFileType(value: unknown): boolean {
if (typeof value !== 'object' || value === null) return false
const typed = value as { type?: string }
return typed.type === 'file[]' || typed.type === 'files'
}
function isArrayType(value: unknown): value is { type: 'array'; items?: unknown } {
if (typeof value !== 'object' || value === null) return false
return (value as { type?: string }).type === 'array'
}
function getArrayItems(schema: unknown): unknown {
if (typeof schema !== 'object' || schema === null) return undefined
return (schema as { items?: unknown }).items
}
function getProperties(schema: unknown): Record<string, unknown> | undefined {
if (typeof schema !== 'object' || schema === null) return undefined
const props = (schema as { properties?: unknown }).properties
return typeof props === 'object' && props !== null
? (props as Record<string, unknown>)
: undefined
}
function lookupField(schema: unknown, fieldName: string): unknown | undefined {
if (typeof schema !== 'object' || schema === null) return undefined
const typed = schema as Record<string, unknown>
if (fieldName in typed) {
return typed[fieldName]
}
const props = getProperties(schema)
if (props && fieldName in props) {
return props[fieldName]
}
return undefined
}
function isPathInSchema(schema: OutputSchema | undefined, pathParts: string[]): boolean {
if (!schema || pathParts.length === 0) {
return true
}
let current: unknown = schema
for (let i = 0; i < pathParts.length; i++) {
const part = pathParts[i]
if (current === null || current === undefined) {
return false
}
if (/^\d+$/.test(part)) {
if (isFileType(current)) {
const nextPart = pathParts[i + 1]
return (
!nextPart ||
USER_FILE_ACCESSIBLE_PROPERTIES.includes(
nextPart as (typeof USER_FILE_ACCESSIBLE_PROPERTIES)[number]
)
)
}
if (isArrayType(current)) {
current = getArrayItems(current)
}
continue
}
const arrayMatch = part.match(/^([^[]+)\[(\d+)\]$/)
if (arrayMatch) {
const [, prop] = arrayMatch
const fieldDef = lookupField(current, prop)
if (!fieldDef) return false
if (isFileType(fieldDef)) {
const nextPart = pathParts[i + 1]
return (
!nextPart ||
USER_FILE_ACCESSIBLE_PROPERTIES.includes(
nextPart as (typeof USER_FILE_ACCESSIBLE_PROPERTIES)[number]
)
)
}
current = isArrayType(fieldDef) ? getArrayItems(fieldDef) : fieldDef
continue
}
if (
isFileType(current) &&
USER_FILE_ACCESSIBLE_PROPERTIES.includes(
part as (typeof USER_FILE_ACCESSIBLE_PROPERTIES)[number]
)
) {
return true
}
const fieldDef = lookupField(current, part)
if (fieldDef !== undefined) {
if (isFileType(fieldDef)) {
const nextPart = pathParts[i + 1]
if (!nextPart) return true
if (/^\d+$/.test(nextPart)) {
const afterIndex = pathParts[i + 2]
return (
!afterIndex ||
USER_FILE_ACCESSIBLE_PROPERTIES.includes(
afterIndex as (typeof USER_FILE_ACCESSIBLE_PROPERTIES)[number]
)
)
}
return USER_FILE_ACCESSIBLE_PROPERTIES.includes(
nextPart as (typeof USER_FILE_ACCESSIBLE_PROPERTIES)[number]
)
}
current = fieldDef
continue
}
if (isArrayType(current)) {
const items = getArrayItems(current)
const itemField = lookupField(items, part)
if (itemField !== undefined) {
current = itemField
continue
}
}
return false
}
return true
}
function getSchemaFieldNames(schema: OutputSchema | undefined): string[] {
if (!schema) return []
return Object.keys(schema)
}
export function resolveBlockReference(
blockName: string,
pathParts: string[],
context: BlockReferenceContext
): BlockReferenceResult | undefined {
const normalizedName = normalizeName(blockName)
const blockId = context.blockNameMapping[normalizedName]
if (!blockId) {
return undefined
}
const blockOutput = context.blockData[blockId]
const schema = context.blockOutputSchemas?.[blockId]
if (blockOutput === undefined) {
if (schema && pathParts.length > 0) {
if (!isPathInSchema(schema, pathParts)) {
throw new InvalidFieldError(blockName, pathParts.join('.'), getSchemaFieldNames(schema))
}
}
return { value: undefined, blockId }
}
if (pathParts.length === 0) {
return { value: blockOutput, blockId }
}
const value = navigatePath(blockOutput, pathParts)
if (value === undefined && schema) {
if (!isPathInSchema(schema, pathParts)) {
throw new InvalidFieldError(blockName, pathParts.join('.'), getSchemaFieldNames(schema))
}
}
return { value, blockId }
}

View File

@@ -1,11 +1,15 @@
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { USER_FILE_ACCESSIBLE_PROPERTIES } from '@/lib/workflows/types'
import {
isReference,
normalizeName,
parseReferencePath,
SPECIAL_REFERENCE_PREFIXES,
} from '@/executor/constants'
import {
InvalidFieldError,
type OutputSchema,
resolveBlockReference,
} from '@/executor/utils/block-reference'
import {
navigatePath,
type ResolutionContext,
@@ -14,123 +18,6 @@ import {
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
import { getTool } from '@/tools/utils'
function isPathInOutputSchema(
outputs: Record<string, any> | undefined,
pathParts: string[]
): boolean {
if (!outputs || pathParts.length === 0) {
return true
}
const isFileArrayType = (value: any): boolean =>
value?.type === 'file[]' || value?.type === 'files'
let current: any = outputs
for (let i = 0; i < pathParts.length; i++) {
const part = pathParts[i]
const arrayMatch = part.match(/^([^[]+)\[(\d+)\]$/)
if (arrayMatch) {
const [, prop] = arrayMatch
let fieldDef: any
if (prop in current) {
fieldDef = current[prop]
} else if (current.properties && prop in current.properties) {
fieldDef = current.properties[prop]
} else if (current.type === 'array' && current.items) {
if (current.items.properties && prop in current.items.properties) {
fieldDef = current.items.properties[prop]
} else if (prop in current.items) {
fieldDef = current.items[prop]
}
}
if (!fieldDef) {
return false
}
if (isFileArrayType(fieldDef)) {
if (i + 1 < pathParts.length) {
return USER_FILE_ACCESSIBLE_PROPERTIES.includes(pathParts[i + 1] as any)
}
return true
}
if (fieldDef.type === 'array' && fieldDef.items) {
current = fieldDef.items
continue
}
current = fieldDef
continue
}
if (/^\d+$/.test(part)) {
if (isFileArrayType(current)) {
if (i + 1 < pathParts.length) {
const nextPart = pathParts[i + 1]
return USER_FILE_ACCESSIBLE_PROPERTIES.includes(nextPart as any)
}
return true
}
continue
}
if (current === null || current === undefined) {
return false
}
if (part in current) {
const nextCurrent = current[part]
if (nextCurrent?.type === 'file[]' && i + 1 < pathParts.length) {
const nextPart = pathParts[i + 1]
if (/^\d+$/.test(nextPart) && i + 2 < pathParts.length) {
const propertyPart = pathParts[i + 2]
return USER_FILE_ACCESSIBLE_PROPERTIES.includes(propertyPart as any)
}
}
current = nextCurrent
continue
}
if (current.properties && part in current.properties) {
current = current.properties[part]
continue
}
if (current.type === 'array' && current.items) {
if (current.items.properties && part in current.items.properties) {
current = current.items.properties[part]
continue
}
if (part in current.items) {
current = current.items[part]
continue
}
}
if (isFileArrayType(current) && USER_FILE_ACCESSIBLE_PROPERTIES.includes(part as any)) {
return true
}
if ('type' in current && typeof current.type === 'string') {
if (!current.properties && !current.items) {
return false
}
}
return false
}
return true
}
function getSchemaFieldNames(outputs: Record<string, any> | undefined): string[] {
if (!outputs) return []
return Object.keys(outputs)
}
export class BlockResolver implements Resolver {
private nameToBlockId: Map<string, string>
private blockById: Map<string, SerializedBlock>
@@ -170,83 +57,94 @@ export class BlockResolver implements Resolver {
return undefined
}
const block = this.blockById.get(blockId)
const block = this.blockById.get(blockId)!
const output = this.getBlockOutput(blockId, context)
if (output === undefined) {
const blockData: Record<string, unknown> = {}
const blockOutputSchemas: Record<string, OutputSchema> = {}
if (output !== undefined) {
blockData[blockId] = output
}
const blockType = block.metadata?.id
const params = block.config?.params as Record<string, unknown> | undefined
const subBlocks = params
? Object.fromEntries(Object.entries(params).map(([k, v]) => [k, { value: v }]))
: undefined
const toolId = block.config?.tool
const toolConfig = toolId ? getTool(toolId) : undefined
const outputSchema =
toolConfig?.outputs ?? (blockType ? getBlockOutputs(blockType, subBlocks) : block.outputs)
if (outputSchema && Object.keys(outputSchema).length > 0) {
blockOutputSchemas[blockId] = outputSchema
}
try {
const result = resolveBlockReference(blockName, pathParts, {
blockNameMapping: Object.fromEntries(this.nameToBlockId),
blockData,
blockOutputSchemas,
})!
if (result.value !== undefined) {
return result.value
}
return this.handleBackwardsCompat(block, output, pathParts)
} catch (error) {
if (error instanceof InvalidFieldError) {
const fallback = this.handleBackwardsCompat(block, output, pathParts)
if (fallback !== undefined) {
return fallback
}
throw new Error(error.message)
}
throw error
}
}
private handleBackwardsCompat(
block: SerializedBlock,
output: unknown,
pathParts: string[]
): unknown {
if (output === undefined || pathParts.length === 0) {
return undefined
}
if (pathParts.length === 0) {
return output
}
// Try the original path first
let result = navigatePath(output, pathParts)
// If successful, return it immediately
if (result !== undefined) {
return result
}
// Response block backwards compatibility:
// Old: <responseBlock.response.data> -> New: <responseBlock.data>
// Only apply fallback if:
// 1. Block type is 'response'
// 2. Path starts with 'response.'
// 3. Output doesn't have a 'response' key (confirming it's the new format)
if (
block?.metadata?.id === 'response' &&
block.metadata?.id === 'response' &&
pathParts[0] === 'response' &&
output?.response === undefined
(output as Record<string, unknown>)?.response === undefined
) {
const adjustedPathParts = pathParts.slice(1)
if (adjustedPathParts.length === 0) {
return output
}
result = navigatePath(output, adjustedPathParts)
if (result !== undefined) {
return result
const fallbackResult = navigatePath(output, adjustedPathParts)
if (fallbackResult !== undefined) {
return fallbackResult
}
}
// Workflow block backwards compatibility:
// Old: <workflowBlock.result.response.data> -> New: <workflowBlock.result.data>
// Only apply fallback if:
// 1. Block type is 'workflow' or 'workflow_input'
// 2. Path starts with 'result.response.'
// 3. output.result.response doesn't exist (confirming child used new format)
const isWorkflowBlock =
block?.metadata?.id === 'workflow' || block?.metadata?.id === 'workflow_input'
block.metadata?.id === 'workflow' || block.metadata?.id === 'workflow_input'
const outputRecord = output as Record<string, Record<string, unknown> | undefined>
if (
isWorkflowBlock &&
pathParts[0] === 'result' &&
pathParts[1] === 'response' &&
output?.result?.response === undefined
outputRecord?.result?.response === undefined
) {
const adjustedPathParts = ['result', ...pathParts.slice(2)]
result = navigatePath(output, adjustedPathParts)
if (result !== undefined) {
return result
const fallbackResult = navigatePath(output, adjustedPathParts)
if (fallbackResult !== undefined) {
return fallbackResult
}
}
const blockType = block?.metadata?.id
const params = block?.config?.params as Record<string, unknown> | undefined
const subBlocks = params
? Object.fromEntries(Object.entries(params).map(([k, v]) => [k, { value: v }]))
: undefined
const toolId = block?.config?.tool
const toolConfig = toolId ? getTool(toolId) : undefined
const outputSchema =
toolConfig?.outputs ?? (blockType ? getBlockOutputs(blockType, subBlocks) : block?.outputs)
const schemaFields = getSchemaFieldNames(outputSchema)
if (schemaFields.length > 0 && !isPathInOutputSchema(outputSchema, pathParts)) {
throw new Error(
`"${pathParts.join('.')}" doesn't exist on block "${blockName}". ` +
`Available fields: ${schemaFields.join(', ')}`
)
}
return undefined
}

View File

@@ -1,6 +1,7 @@
import { loggerMock } from '@sim/testing'
import { describe, expect, it, vi } from 'vitest'
import type { LoopScope } from '@/executor/execution/state'
import { InvalidFieldError } from '@/executor/utils/block-reference'
import { LoopResolver } from './loop'
import type { ResolutionContext } from './reference'
@@ -62,7 +63,12 @@ function createTestContext(
describe('LoopResolver', () => {
describe('canResolve', () => {
it.concurrent('should return true for loop references', () => {
it.concurrent('should return true for bare loop reference', () => {
const resolver = new LoopResolver(createTestWorkflow())
expect(resolver.canResolve('<loop>')).toBe(true)
})
it.concurrent('should return true for known loop properties', () => {
const resolver = new LoopResolver(createTestWorkflow())
expect(resolver.canResolve('<loop.index>')).toBe(true)
expect(resolver.canResolve('<loop.iteration>')).toBe(true)
@@ -78,6 +84,13 @@ describe('LoopResolver', () => {
expect(resolver.canResolve('<loop.items.0>')).toBe(true)
})
it.concurrent('should return true for unknown loop properties (validates in resolve)', () => {
const resolver = new LoopResolver(createTestWorkflow())
expect(resolver.canResolve('<loop.results>')).toBe(true)
expect(resolver.canResolve('<loop.output>')).toBe(true)
expect(resolver.canResolve('<loop.unknownProperty>')).toBe(true)
})
it.concurrent('should return false for non-loop references', () => {
const resolver = new LoopResolver(createTestWorkflow())
expect(resolver.canResolve('<block.output>')).toBe(false)
@@ -181,20 +194,34 @@ describe('LoopResolver', () => {
})
describe('edge cases', () => {
it.concurrent('should return undefined for invalid loop reference (missing property)', () => {
it.concurrent('should return context object for bare loop reference', () => {
const resolver = new LoopResolver(createTestWorkflow())
const loopScope = createLoopScope({ iteration: 0 })
const loopScope = createLoopScope({ iteration: 2, item: 'test', items: ['a', 'b', 'c'] })
const ctx = createTestContext('block-1', loopScope)
expect(resolver.resolve('<loop>', ctx)).toBeUndefined()
expect(resolver.resolve('<loop>', ctx)).toEqual({
index: 2,
currentItem: 'test',
items: ['a', 'b', 'c'],
})
})
it.concurrent('should return undefined for unknown loop property', () => {
it.concurrent('should return minimal context object for for-loop (no items)', () => {
const resolver = new LoopResolver(createTestWorkflow())
const loopScope = createLoopScope({ iteration: 5 })
const ctx = createTestContext('block-1', loopScope)
expect(resolver.resolve('<loop>', ctx)).toEqual({
index: 5,
})
})
it.concurrent('should throw InvalidFieldError for unknown loop property', () => {
const resolver = new LoopResolver(createTestWorkflow())
const loopScope = createLoopScope({ iteration: 0 })
const ctx = createTestContext('block-1', loopScope)
expect(resolver.resolve('<loop.unknownProperty>', ctx)).toBeUndefined()
expect(() => resolver.resolve('<loop.unknownProperty>', ctx)).toThrow(InvalidFieldError)
})
it.concurrent('should handle iteration index 0 correctly', () => {

View File

@@ -1,5 +1,6 @@
import { createLogger } from '@sim/logger'
import { isReference, parseReferencePath, REFERENCE } from '@/executor/constants'
import { InvalidFieldError } from '@/executor/utils/block-reference'
import { extractBaseBlockId } from '@/executor/utils/subflow-utils'
import {
navigatePath,
@@ -13,6 +14,8 @@ const logger = createLogger('LoopResolver')
export class LoopResolver implements Resolver {
constructor(private workflow: SerializedWorkflow) {}
private static KNOWN_PROPERTIES = ['iteration', 'index', 'item', 'currentItem', 'items']
canResolve(reference: string): boolean {
if (!isReference(reference)) {
return false
@@ -27,16 +30,15 @@ export class LoopResolver implements Resolver {
resolve(reference: string, context: ResolutionContext): any {
const parts = parseReferencePath(reference)
if (parts.length < 2) {
logger.warn('Invalid loop reference - missing property', { reference })
if (parts.length === 0) {
logger.warn('Invalid loop reference', { reference })
return undefined
}
const [_, property, ...pathParts] = parts
const loopId = this.findLoopForBlock(context.currentNodeId)
let loopScope = context.loopScope
if (!loopScope) {
const loopId = this.findLoopForBlock(context.currentNodeId)
if (!loopId) {
return undefined
}
@@ -48,6 +50,27 @@ export class LoopResolver implements Resolver {
return undefined
}
const isForEach = loopId ? this.isForEachLoop(loopId) : loopScope.items !== undefined
if (parts.length === 1) {
const result: Record<string, any> = {
index: loopScope.iteration,
}
if (loopScope.item !== undefined) {
result.currentItem = loopScope.item
}
if (loopScope.items !== undefined) {
result.items = loopScope.items
}
return result
}
const [_, property, ...pathParts] = parts
if (!LoopResolver.KNOWN_PROPERTIES.includes(property)) {
const availableFields = isForEach ? ['index', 'currentItem', 'items'] : ['index']
throw new InvalidFieldError('loop', property, availableFields)
}
let value: any
switch (property) {
case 'iteration':
@@ -61,12 +84,8 @@ export class LoopResolver implements Resolver {
case 'items':
value = loopScope.items
break
default:
logger.warn('Unknown loop property', { property })
return undefined
}
// If there are additional path parts, navigate deeper
if (pathParts.length > 0) {
return navigatePath(value, pathParts)
}
@@ -85,4 +104,9 @@ export class LoopResolver implements Resolver {
return undefined
}
private isForEachLoop(loopId: string): boolean {
const loopConfig = this.workflow.loops?.[loopId]
return loopConfig?.loopType === 'forEach'
}
}

View File

@@ -1,5 +1,6 @@
import { loggerMock } from '@sim/testing'
import { describe, expect, it, vi } from 'vitest'
import { InvalidFieldError } from '@/executor/utils/block-reference'
import { ParallelResolver } from './parallel'
import type { ResolutionContext } from './reference'
@@ -81,7 +82,12 @@ function createTestContext(
describe('ParallelResolver', () => {
describe('canResolve', () => {
it.concurrent('should return true for parallel references', () => {
it.concurrent('should return true for bare parallel reference', () => {
const resolver = new ParallelResolver(createTestWorkflow())
expect(resolver.canResolve('<parallel>')).toBe(true)
})
it.concurrent('should return true for known parallel properties', () => {
const resolver = new ParallelResolver(createTestWorkflow())
expect(resolver.canResolve('<parallel.index>')).toBe(true)
expect(resolver.canResolve('<parallel.currentItem>')).toBe(true)
@@ -94,6 +100,16 @@ describe('ParallelResolver', () => {
expect(resolver.canResolve('<parallel.items.0>')).toBe(true)
})
it.concurrent(
'should return true for unknown parallel properties (validates in resolve)',
() => {
const resolver = new ParallelResolver(createTestWorkflow())
expect(resolver.canResolve('<parallel.results>')).toBe(true)
expect(resolver.canResolve('<parallel.output>')).toBe(true)
expect(resolver.canResolve('<parallel.unknownProperty>')).toBe(true)
}
)
it.concurrent('should return false for non-parallel references', () => {
const resolver = new ParallelResolver(createTestWorkflow())
expect(resolver.canResolve('<block.output>')).toBe(false)
@@ -254,24 +270,40 @@ describe('ParallelResolver', () => {
})
describe('edge cases', () => {
it.concurrent(
'should return undefined for invalid parallel reference (missing property)',
() => {
const resolver = new ParallelResolver(createTestWorkflow())
const ctx = createTestContext('block-1₍0₎')
it.concurrent('should return context object for bare parallel reference', () => {
const workflow = createTestWorkflow({
'parallel-1': { nodes: ['block-1'], distribution: ['a', 'b', 'c'] },
})
const resolver = new ParallelResolver(workflow)
const ctx = createTestContext('block-1₍1₎')
expect(resolver.resolve('<parallel>', ctx)).toBeUndefined()
}
)
expect(resolver.resolve('<parallel>', ctx)).toEqual({
index: 1,
currentItem: 'b',
items: ['a', 'b', 'c'],
})
})
it.concurrent('should return undefined for unknown parallel property', () => {
it.concurrent('should return minimal context object when no distribution', () => {
const workflow = createTestWorkflow({
'parallel-1': { nodes: ['block-1'] },
})
const resolver = new ParallelResolver(workflow)
const ctx = createTestContext('block-1₍0₎')
const result = resolver.resolve('<parallel>', ctx)
expect(result).toHaveProperty('index', 0)
expect(result).toHaveProperty('items')
})
it.concurrent('should throw InvalidFieldError for unknown parallel property', () => {
const workflow = createTestWorkflow({
'parallel-1': { nodes: ['block-1'], distribution: ['a'] },
})
const resolver = new ParallelResolver(workflow)
const ctx = createTestContext('block-1₍0₎')
expect(resolver.resolve('<parallel.unknownProperty>', ctx)).toBeUndefined()
expect(() => resolver.resolve('<parallel.unknownProperty>', ctx)).toThrow(InvalidFieldError)
})
it.concurrent('should return undefined when block is not in any parallel', () => {

View File

@@ -1,5 +1,6 @@
import { createLogger } from '@sim/logger'
import { isReference, parseReferencePath, REFERENCE } from '@/executor/constants'
import { InvalidFieldError } from '@/executor/utils/block-reference'
import { extractBaseBlockId, extractBranchIndex } from '@/executor/utils/subflow-utils'
import {
navigatePath,
@@ -13,6 +14,8 @@ const logger = createLogger('ParallelResolver')
export class ParallelResolver implements Resolver {
constructor(private workflow: SerializedWorkflow) {}
private static KNOWN_PROPERTIES = ['index', 'currentItem', 'items']
canResolve(reference: string): boolean {
if (!isReference(reference)) {
return false
@@ -27,12 +30,11 @@ export class ParallelResolver implements Resolver {
resolve(reference: string, context: ResolutionContext): any {
const parts = parseReferencePath(reference)
if (parts.length < 2) {
logger.warn('Invalid parallel reference - missing property', { reference })
if (parts.length === 0) {
logger.warn('Invalid parallel reference', { reference })
return undefined
}
const [_, property, ...pathParts] = parts
const parallelId = this.findParallelForBlock(context.currentNodeId)
if (!parallelId) {
return undefined
@@ -49,11 +51,33 @@ export class ParallelResolver implements Resolver {
return undefined
}
// First try to get items from the parallel scope (resolved at runtime)
// This is the same pattern as LoopResolver reading from loopScope.items
const parallelScope = context.executionContext.parallelExecutions?.get(parallelId)
const distributionItems = parallelScope?.items ?? this.getDistributionItems(parallelConfig)
if (parts.length === 1) {
const result: Record<string, any> = {
index: branchIndex,
}
if (distributionItems !== undefined) {
result.items = distributionItems
if (Array.isArray(distributionItems)) {
result.currentItem = distributionItems[branchIndex]
} else if (typeof distributionItems === 'object' && distributionItems !== null) {
const keys = Object.keys(distributionItems)
const key = keys[branchIndex]
result.currentItem = key !== undefined ? distributionItems[key] : undefined
}
}
return result
}
const [_, property, ...pathParts] = parts
if (!ParallelResolver.KNOWN_PROPERTIES.includes(property)) {
const isCollection = parallelConfig.parallelType === 'collection'
const availableFields = isCollection ? ['index', 'currentItem', 'items'] : ['index']
throw new InvalidFieldError('parallel', property, availableFields)
}
let value: any
switch (property) {
case 'index':
@@ -73,12 +97,8 @@ export class ParallelResolver implements Resolver {
case 'items':
value = distributionItems
break
default:
logger.warn('Unknown parallel property', { property })
return undefined
}
// If there are additional path parts, navigate deeper
if (pathParts.length > 0) {
return navigatePath(value, pathParts)
}

View File

@@ -27,23 +27,28 @@ export function navigatePath(obj: any, path: string[]): any {
return undefined
}
// Handle array indexing like "items[0]" or just numeric indices
const arrayMatch = part.match(/^([^[]+)\[(\d+)\](.*)$/)
const arrayMatch = part.match(/^([^[]+)(\[.+)$/)
if (arrayMatch) {
// Handle complex array access like "items[0]"
const [, prop, index] = arrayMatch
const [, prop, bracketsPart] = arrayMatch
current = current[prop]
if (current === undefined || current === null) {
return undefined
}
const idx = Number.parseInt(index, 10)
current = Array.isArray(current) ? current[idx] : undefined
const indices = bracketsPart.match(/\[(\d+)\]/g)
if (indices) {
for (const indexMatch of indices) {
if (current === null || current === undefined) {
return undefined
}
const idx = Number.parseInt(indexMatch.slice(1, -1), 10)
current = Array.isArray(current) ? current[idx] : undefined
}
}
} else if (/^\d+$/.test(part)) {
// Handle plain numeric index
const index = Number.parseInt(part, 10)
current = Array.isArray(current) ? current[index] : undefined
} else {
// Handle regular property access
current = current[part]
}
}

View File

@@ -5,7 +5,7 @@ import { useShallow } from 'zustand/react/shallow'
import { useSession } from '@/lib/auth/auth-client'
import { useSocket } from '@/app/workspace/providers/socket-provider'
import { getBlock } from '@/blocks'
import { normalizeName } from '@/executor/constants'
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
import { useUndoRedo } from '@/hooks/use-undo-redo'
import {
BLOCK_OPERATIONS,
@@ -740,6 +740,16 @@ export function useCollaborativeWorkflow() {
return { success: false, error: 'Block name cannot be empty' }
}
if ((RESERVED_BLOCK_NAMES as readonly string[]).includes(normalizedNewName)) {
logger.error(`Cannot rename block to reserved name: "${trimmedName}"`)
useNotificationStore.getState().addNotification({
level: 'error',
message: `"${trimmedName}" is a reserved name and cannot be used`,
workflowId: activeWorkflowId || undefined,
})
return { success: false, error: `"${trimmedName}" is a reserved name` }
}
const currentBlocks = useWorkflowStore.getState().blocks
const conflictingBlock = Object.entries(currentBlocks).find(
([blockId, block]) => blockId !== id && normalizeName(block.name) === normalizedNewName

View File

@@ -14,7 +14,7 @@ import { validateWorkflowState } from '@/lib/workflows/sanitization/validation'
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { getAllBlocks, getBlock } from '@/blocks/registry'
import type { SubBlockConfig } from '@/blocks/types'
import { EDGE, normalizeName } from '@/executor/constants'
import { EDGE, normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
import { getUserPermissionConfig } from '@/executor/utils/permission-check'
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
import { TRIGGER_RUNTIME_SUBBLOCK_IDS } from '@/triggers/constants'
@@ -63,6 +63,7 @@ type SkippedItemType =
| 'invalid_subflow_parent'
| 'nested_subflow_not_allowed'
| 'duplicate_block_name'
| 'reserved_block_name'
| 'duplicate_trigger'
| 'duplicate_single_instance_block'
@@ -1683,7 +1684,8 @@ function applyOperationsToWorkflowState(
}
}
if (params?.name !== undefined) {
if (!normalizeName(params.name)) {
const normalizedName = normalizeName(params.name)
if (!normalizedName) {
logSkippedItem(skippedItems, {
type: 'missing_required_params',
operationType: 'edit',
@@ -1691,6 +1693,14 @@ function applyOperationsToWorkflowState(
reason: `Cannot rename to empty name`,
details: { requestedName: params.name },
})
} else if ((RESERVED_BLOCK_NAMES as readonly string[]).includes(normalizedName)) {
logSkippedItem(skippedItems, {
type: 'reserved_block_name',
operationType: 'edit',
blockId: block_id,
reason: `Cannot rename to "${params.name}" - this is a reserved name`,
details: { requestedName: params.name },
})
} else {
const conflictingBlock = findBlockWithDuplicateNormalizedName(
modifiedState.blocks,
@@ -1911,7 +1921,8 @@ function applyOperationsToWorkflowState(
}
case 'add': {
if (!params?.type || !params?.name || !normalizeName(params.name)) {
const addNormalizedName = params?.name ? normalizeName(params.name) : ''
if (!params?.type || !params?.name || !addNormalizedName) {
logSkippedItem(skippedItems, {
type: 'missing_required_params',
operationType: 'add',
@@ -1922,6 +1933,17 @@ function applyOperationsToWorkflowState(
break
}
if ((RESERVED_BLOCK_NAMES as readonly string[]).includes(addNormalizedName)) {
logSkippedItem(skippedItems, {
type: 'reserved_block_name',
operationType: 'add',
blockId: block_id,
reason: `Block name "${params.name}" is a reserved name and cannot be used`,
details: { requestedName: params.name },
})
break
}
const conflictingBlock = findBlockWithDuplicateNormalizedName(
modifiedState.blocks,
params.name,

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { idempotencyKey } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt } from 'drizzle-orm'
import { and, count, inArray, like, lt, max, min, sql } from 'drizzle-orm'
const logger = createLogger('IdempotencyCleanup')
@@ -19,7 +19,8 @@ export interface CleanupOptions {
batchSize?: number
/**
* Specific namespace to clean up, or undefined to clean all namespaces
* Specific namespace prefix to clean up (e.g., 'webhook', 'polling')
* Keys are prefixed with namespace, so this filters by key prefix
*/
namespace?: string
}
@@ -53,13 +54,17 @@ export async function cleanupExpiredIdempotencyKeys(
while (hasMore) {
try {
// Build where condition - filter by cutoff date and optionally by namespace prefix
const whereCondition = namespace
? and(lt(idempotencyKey.createdAt, cutoffDate), eq(idempotencyKey.namespace, namespace))
? and(
lt(idempotencyKey.createdAt, cutoffDate),
like(idempotencyKey.key, `${namespace}:%`)
)
: lt(idempotencyKey.createdAt, cutoffDate)
// First, find IDs to delete with limit
// Find keys to delete with limit
const toDelete = await db
.select({ key: idempotencyKey.key, namespace: idempotencyKey.namespace })
.select({ key: idempotencyKey.key })
.from(idempotencyKey)
.where(whereCondition)
.limit(batchSize)
@@ -68,14 +73,13 @@ export async function cleanupExpiredIdempotencyKeys(
break
}
// Delete the found records
// Delete the found records by key
const deleteResult = await db
.delete(idempotencyKey)
.where(
and(
...toDelete.map((item) =>
and(eq(idempotencyKey.key, item.key), eq(idempotencyKey.namespace, item.namespace))
)
inArray(
idempotencyKey.key,
toDelete.map((item) => item.key)
)
)
.returning({ key: idempotencyKey.key })
@@ -126,6 +130,7 @@ export async function cleanupExpiredIdempotencyKeys(
/**
* Get statistics about idempotency key usage
* Uses SQL aggregations to avoid loading all keys into memory
*/
export async function getIdempotencyKeyStats(): Promise<{
totalKeys: number
@@ -134,34 +139,35 @@ export async function getIdempotencyKeyStats(): Promise<{
newestKey: Date | null
}> {
try {
const allKeys = await db
// Get total count and date range in a single query
const [statsResult] = await db
.select({
namespace: idempotencyKey.namespace,
createdAt: idempotencyKey.createdAt,
totalKeys: count(),
oldestKey: min(idempotencyKey.createdAt),
newestKey: max(idempotencyKey.createdAt),
})
.from(idempotencyKey)
const totalKeys = allKeys.length
// Get counts by namespace prefix using SQL substring
// Extracts everything before the first ':' as the namespace
const namespaceStats = await db
.select({
namespace: sql<string>`split_part(${idempotencyKey.key}, ':', 1)`.as('namespace'),
count: count(),
})
.from(idempotencyKey)
.groupBy(sql`split_part(${idempotencyKey.key}, ':', 1)`)
const keysByNamespace: Record<string, number> = {}
let oldestKey: Date | null = null
let newestKey: Date | null = null
for (const key of allKeys) {
keysByNamespace[key.namespace] = (keysByNamespace[key.namespace] || 0) + 1
if (!oldestKey || key.createdAt < oldestKey) {
oldestKey = key.createdAt
}
if (!newestKey || key.createdAt > newestKey) {
newestKey = key.createdAt
}
for (const row of namespaceStats) {
keysByNamespace[row.namespace || 'unknown'] = row.count
}
return {
totalKeys,
totalKeys: statsResult?.totalKeys ?? 0,
keysByNamespace,
oldestKey,
newestKey,
oldestKey: statsResult?.oldestKey ?? null,
newestKey: statsResult?.newestKey ?? null,
}
} catch (error) {
logger.error('Failed to get idempotency key stats:', error)

View File

@@ -2,7 +2,7 @@ import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { idempotencyKey } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { eq } from 'drizzle-orm'
import { getRedisClient } from '@/lib/core/config/redis'
import { getStorageMethod, type StorageMethod } from '@/lib/core/storage'
import { extractProviderIdentifierFromBody } from '@/lib/webhooks/provider-utils'
@@ -124,12 +124,7 @@ export class IdempotencyService {
const existing = await db
.select({ result: idempotencyKey.result, createdAt: idempotencyKey.createdAt })
.from(idempotencyKey)
.where(
and(
eq(idempotencyKey.key, normalizedKey),
eq(idempotencyKey.namespace, this.config.namespace)
)
)
.where(eq(idempotencyKey.key, normalizedKey))
.limit(1)
if (existing.length > 0) {
@@ -224,11 +219,12 @@ export class IdempotencyService {
.insert(idempotencyKey)
.values({
key: normalizedKey,
namespace: this.config.namespace,
result: inProgressResult,
createdAt: new Date(),
})
.onConflictDoNothing()
.onConflictDoNothing({
target: [idempotencyKey.key],
})
.returning({ key: idempotencyKey.key })
if (insertResult.length > 0) {
@@ -243,12 +239,7 @@ export class IdempotencyService {
const existing = await db
.select({ result: idempotencyKey.result })
.from(idempotencyKey)
.where(
and(
eq(idempotencyKey.key, normalizedKey),
eq(idempotencyKey.namespace, this.config.namespace)
)
)
.where(eq(idempotencyKey.key, normalizedKey))
.limit(1)
const existingResult =
@@ -280,12 +271,7 @@ export class IdempotencyService {
const existing = await db
.select({ result: idempotencyKey.result })
.from(idempotencyKey)
.where(
and(
eq(idempotencyKey.key, normalizedKey),
eq(idempotencyKey.namespace, this.config.namespace)
)
)
.where(eq(idempotencyKey.key, normalizedKey))
.limit(1)
currentResult = existing.length > 0 ? (existing[0].result as ProcessingResult) : null
}
@@ -339,12 +325,11 @@ export class IdempotencyService {
.insert(idempotencyKey)
.values({
key: normalizedKey,
namespace: this.config.namespace,
result: result,
createdAt: new Date(),
})
.onConflictDoUpdate({
target: [idempotencyKey.key, idempotencyKey.namespace],
target: [idempotencyKey.key],
set: {
result: result,
createdAt: new Date(),

View File

@@ -130,7 +130,11 @@ async function executeCode(request) {
await jail.set('environmentVariables', new ivm.ExternalCopy(envVars).copyInto())
for (const [key, value] of Object.entries(contextVariables)) {
await jail.set(key, new ivm.ExternalCopy(value).copyInto())
if (value === undefined) {
await jail.set(key, undefined)
} else {
await jail.set(key, new ivm.ExternalCopy(value).copyInto())
}
}
const fetchCallback = new ivm.Reference(async (url, optionsJson) => {

View File

@@ -7,7 +7,7 @@ import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { getBlock } from '@/blocks'
import type { SubBlockConfig } from '@/blocks/types'
import { normalizeName } from '@/executor/constants'
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { filterNewEdges, getUniqueBlockName, mergeSubblockState } from '@/stores/workflows/utils'
@@ -726,6 +726,11 @@ export const useWorkflowStore = create<WorkflowStore>()(
return { success: false, changedSubblocks: [] }
}
if ((RESERVED_BLOCK_NAMES as readonly string[]).includes(normalizedNewName)) {
logger.error(`Cannot rename block to reserved name: "${name}"`)
return { success: false, changedSubblocks: [] }
}
const newState = {
blocks: {
...get().blocks,

View File

@@ -56,6 +56,7 @@ describe('Function Execute Tool', () => {
workflowVariables: {},
blockData: {},
blockNameMapping: {},
blockOutputSchemas: {},
isCustomTool: false,
language: 'javascript',
timeout: 5000,
@@ -83,6 +84,7 @@ describe('Function Execute Tool', () => {
workflowVariables: {},
blockData: {},
blockNameMapping: {},
blockOutputSchemas: {},
isCustomTool: false,
language: 'javascript',
workflowId: undefined,
@@ -101,6 +103,7 @@ describe('Function Execute Tool', () => {
workflowVariables: {},
blockData: {},
blockNameMapping: {},
blockOutputSchemas: {},
isCustomTool: false,
language: 'javascript',
workflowId: undefined,

View File

@@ -53,6 +53,13 @@ export const functionExecuteTool: ToolConfig<CodeExecutionInput, CodeExecutionOu
description: 'Mapping of block names to block IDs',
default: {},
},
blockOutputSchemas: {
type: 'object',
required: false,
visibility: 'hidden',
description: 'Mapping of block IDs to their output schemas for validation',
default: {},
},
workflowVariables: {
type: 'object',
required: false,
@@ -81,6 +88,7 @@ export const functionExecuteTool: ToolConfig<CodeExecutionInput, CodeExecutionOu
workflowVariables: params.workflowVariables || {},
blockData: params.blockData || {},
blockNameMapping: params.blockNameMapping || {},
blockOutputSchemas: params.blockOutputSchemas || {},
workflowId: params._context?.workflowId,
isCustomTool: params.isCustomTool || false,
}

View File

@@ -11,6 +11,7 @@ export interface CodeExecutionInput {
workflowVariables?: Record<string, unknown>
blockData?: Record<string, unknown>
blockNameMapping?: Record<string, string>
blockOutputSchemas?: Record<string, Record<string, unknown>>
_context?: {
workflowId?: string
}

View File

@@ -110,12 +110,22 @@ spec:
{{- end }}
{{- include "sim.resources" .Values.app | nindent 10 }}
{{- include "sim.securityContext" .Values.app | nindent 10 }}
{{- with .Values.extraVolumeMounts }}
{{- if or .Values.extraVolumeMounts .Values.app.extraVolumeMounts }}
volumeMounts:
{{- with .Values.extraVolumeMounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.app.extraVolumeMounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
{{- end }}
{{- with .Values.extraVolumes }}
{{- if or .Values.extraVolumes .Values.app.extraVolumes }}
volumes:
{{- with .Values.extraVolumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.app.extraVolumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -92,6 +92,7 @@ spec:
{{- toYaml .Values.ollama.readinessProbe | nindent 12 }}
{{- end }}
{{- include "sim.resources" .Values.ollama | nindent 10 }}
{{- if or .Values.ollama.persistence.enabled .Values.extraVolumeMounts .Values.ollama.extraVolumeMounts }}
volumeMounts:
{{- if .Values.ollama.persistence.enabled }}
- name: ollama-data
@@ -100,13 +101,22 @@ spec:
{{- with .Values.extraVolumeMounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
{{- if .Values.ollama.persistence.enabled }}
{{- with .Values.ollama.extraVolumeMounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
{{- end }}
{{- if or .Values.ollama.persistence.enabled .Values.extraVolumes .Values.ollama.extraVolumes }}
volumes:
{{- if .Values.ollama.persistence.enabled }}
- name: ollama-data
persistentVolumeClaim:
claimName: {{ include "sim.fullname" . }}-ollama-data
{{- end }}
{{- with .Values.extraVolumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.ollama.extraVolumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -84,12 +84,22 @@ spec:
{{- end }}
{{- include "sim.resources" .Values.realtime | nindent 10 }}
{{- include "sim.securityContext" .Values.realtime | nindent 10 }}
{{- with .Values.extraVolumeMounts }}
{{- if or .Values.extraVolumeMounts .Values.realtime.extraVolumeMounts }}
volumeMounts:
{{- with .Values.extraVolumeMounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.realtime.extraVolumeMounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
{{- end }}
{{- with .Values.extraVolumes }}
{{- if or .Values.extraVolumes .Values.realtime.extraVolumes }}
volumes:
{{- with .Values.extraVolumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.realtime.extraVolumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -224,6 +224,10 @@ app:
timeoutSeconds: 5
failureThreshold: 3
# Additional volumes for app deployment (e.g., branding assets, custom configs)
extraVolumes: []
extraVolumeMounts: []
# Realtime socket server configuration
realtime:
# Enable/disable the realtime service
@@ -301,6 +305,10 @@ realtime:
timeoutSeconds: 5
failureThreshold: 3
# Additional volumes for realtime deployment
extraVolumes: []
extraVolumeMounts: []
# Database migrations job configuration
migrations:
# Enable/disable migrations job
@@ -539,6 +547,10 @@ ollama:
timeoutSeconds: 5
failureThreshold: 3
# Additional volumes for ollama deployment
extraVolumes: []
extraVolumeMounts: []
# Ingress configuration
ingress:
# Enable/disable ingress

View File

@@ -0,0 +1,4 @@
DROP INDEX "idempotency_key_namespace_unique";--> statement-breakpoint
DROP INDEX "idempotency_key_namespace_idx";--> statement-breakpoint
ALTER TABLE "idempotency_key" ADD PRIMARY KEY ("key");--> statement-breakpoint
ALTER TABLE "idempotency_key" DROP COLUMN "namespace";

File diff suppressed because it is too large Load Diff

View File

@@ -1023,6 +1023,13 @@
"when": 1768867605608,
"tag": "0146_cultured_ikaris",
"breakpoints": true
},
{
"idx": 147,
"version": "7",
"when": 1769134350805,
"tag": "0147_rare_firebrand",
"breakpoints": true
}
]
}

View File

@@ -1656,20 +1656,13 @@ export const workflowDeploymentVersion = pgTable(
export const idempotencyKey = pgTable(
'idempotency_key',
{
key: text('key').notNull(),
namespace: text('namespace').notNull().default('default'),
key: text('key').primaryKey(),
result: json('result').notNull(),
createdAt: timestamp('created_at').notNull().defaultNow(),
},
(table) => ({
// Primary key is combination of key and namespace
keyNamespacePk: uniqueIndex('idempotency_key_namespace_unique').on(table.key, table.namespace),
// Index for cleanup operations by creation time
createdAtIdx: index('idempotency_key_created_at_idx').on(table.createdAt),
// Index for namespace-based queries
namespaceIdx: index('idempotency_key_namespace_idx').on(table.namespace),
})
)