mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-31 09:48:06 -05:00
fix(executor): condition inside parallel
This commit is contained in:
@@ -322,7 +322,8 @@ describe('ConditionBlockHandler', () => {
|
||||
|
||||
await handler.execute(mockContext, mockBlock, inputs)
|
||||
|
||||
expect(mockCollectBlockData).toHaveBeenCalledWith(mockContext)
|
||||
// collectBlockData is now called with the current node ID for parallel branch context
|
||||
expect(mockCollectBlockData).toHaveBeenCalledWith(mockContext, mockBlock.id)
|
||||
})
|
||||
|
||||
it('should handle function_execute tool failure', async () => {
|
||||
@@ -620,4 +621,248 @@ describe('ConditionBlockHandler', () => {
|
||||
expect(mockContext.decisions.condition.has(mockBlock.id)).toBe(false)
|
||||
})
|
||||
})
|
||||
|
||||
describe('Parallel branch handling', () => {
|
||||
it('should resolve connections and block data correctly when inside a parallel branch', async () => {
|
||||
// Simulate a condition block inside a parallel branch
|
||||
// Virtual block ID uses subscript notation: blockId₍branchIndex₎
|
||||
const parallelConditionBlock: SerializedBlock = {
|
||||
id: 'cond-block-1₍0₎', // Virtual ID for branch 0
|
||||
metadata: { id: 'condition', name: 'Condition' },
|
||||
position: { x: 0, y: 0 },
|
||||
config: {},
|
||||
}
|
||||
|
||||
// Source block also has a virtual ID in the same branch
|
||||
const sourceBlockVirtualId = 'agent-block-1₍0₎'
|
||||
|
||||
// Set up workflow with connections using BASE block IDs (as they are in the workflow definition)
|
||||
const parallelWorkflow: SerializedWorkflow = {
|
||||
blocks: [
|
||||
{
|
||||
id: 'agent-block-1',
|
||||
metadata: { id: 'agent', name: 'Agent' },
|
||||
position: { x: 0, y: 0 },
|
||||
config: {},
|
||||
},
|
||||
{
|
||||
id: 'cond-block-1',
|
||||
metadata: { id: 'condition', name: 'Condition' },
|
||||
position: { x: 100, y: 0 },
|
||||
config: {},
|
||||
},
|
||||
{
|
||||
id: 'target-block-1',
|
||||
metadata: { id: 'api', name: 'Target' },
|
||||
position: { x: 200, y: 0 },
|
||||
config: {},
|
||||
},
|
||||
],
|
||||
connections: [
|
||||
// Connections use base IDs, not virtual IDs
|
||||
{ source: 'agent-block-1', target: 'cond-block-1' },
|
||||
{ source: 'cond-block-1', target: 'target-block-1', sourceHandle: 'condition-cond1' },
|
||||
],
|
||||
loops: [],
|
||||
parallels: [],
|
||||
}
|
||||
|
||||
// Block states use virtual IDs (as outputs are stored per-branch)
|
||||
const parallelBlockStates = new Map<string, BlockState>([
|
||||
[
|
||||
sourceBlockVirtualId,
|
||||
{ output: { response: 'hello from branch 0', success: true }, executed: true },
|
||||
],
|
||||
])
|
||||
|
||||
const parallelContext: ExecutionContext = {
|
||||
workflowId: 'test-workflow-id',
|
||||
workspaceId: 'test-workspace-id',
|
||||
workflow: parallelWorkflow,
|
||||
blockStates: parallelBlockStates,
|
||||
blockLogs: [],
|
||||
completedBlocks: new Set(),
|
||||
decisions: {
|
||||
router: new Map(),
|
||||
condition: new Map(),
|
||||
},
|
||||
environmentVariables: {},
|
||||
workflowVariables: {},
|
||||
}
|
||||
|
||||
const conditions = [
|
||||
{ id: 'cond1', title: 'if', value: 'context.response === "hello from branch 0"' },
|
||||
{ id: 'else1', title: 'else', value: '' },
|
||||
]
|
||||
const inputs = { conditions: JSON.stringify(conditions) }
|
||||
|
||||
const result = await handler.execute(parallelContext, parallelConditionBlock, inputs)
|
||||
|
||||
// The condition should evaluate to true because:
|
||||
// 1. Connection lookup uses base ID 'cond-block-1' (extracted from 'cond-block-1₍0₎')
|
||||
// 2. Source block output is found at virtual ID 'agent-block-1₍0₎' (same branch)
|
||||
// 3. The evaluation context contains { response: 'hello from branch 0' }
|
||||
expect((result as any).conditionResult).toBe(true)
|
||||
expect((result as any).selectedOption).toBe('cond1')
|
||||
expect((result as any).selectedPath).toEqual({
|
||||
blockId: 'target-block-1',
|
||||
blockType: 'api',
|
||||
blockTitle: 'Target',
|
||||
})
|
||||
})
|
||||
|
||||
it('should find correct source block output in parallel branch context', async () => {
|
||||
// Test that when multiple branches exist, the correct branch output is used
|
||||
const parallelConditionBlock: SerializedBlock = {
|
||||
id: 'cond-block-1₍1₎', // Virtual ID for branch 1
|
||||
metadata: { id: 'condition', name: 'Condition' },
|
||||
position: { x: 0, y: 0 },
|
||||
config: {},
|
||||
}
|
||||
|
||||
const parallelWorkflow: SerializedWorkflow = {
|
||||
blocks: [
|
||||
{
|
||||
id: 'agent-block-1',
|
||||
metadata: { id: 'agent', name: 'Agent' },
|
||||
position: { x: 0, y: 0 },
|
||||
config: {},
|
||||
},
|
||||
{
|
||||
id: 'cond-block-1',
|
||||
metadata: { id: 'condition', name: 'Condition' },
|
||||
position: { x: 100, y: 0 },
|
||||
config: {},
|
||||
},
|
||||
{
|
||||
id: 'target-block-1',
|
||||
metadata: { id: 'api', name: 'Target' },
|
||||
position: { x: 200, y: 0 },
|
||||
config: {},
|
||||
},
|
||||
],
|
||||
connections: [
|
||||
{ source: 'agent-block-1', target: 'cond-block-1' },
|
||||
{ source: 'cond-block-1', target: 'target-block-1', sourceHandle: 'condition-cond1' },
|
||||
],
|
||||
loops: [],
|
||||
parallels: [],
|
||||
}
|
||||
|
||||
// Multiple branches have executed - each has different output
|
||||
const parallelBlockStates = new Map<string, BlockState>([
|
||||
['agent-block-1₍0₎', { output: { value: 10 }, executed: true }],
|
||||
['agent-block-1₍1₎', { output: { value: 25 }, executed: true }], // Branch 1 has value 25
|
||||
['agent-block-1₍2₎', { output: { value: 5 }, executed: true }],
|
||||
])
|
||||
|
||||
const parallelContext: ExecutionContext = {
|
||||
workflowId: 'test-workflow-id',
|
||||
workspaceId: 'test-workspace-id',
|
||||
workflow: parallelWorkflow,
|
||||
blockStates: parallelBlockStates,
|
||||
blockLogs: [],
|
||||
completedBlocks: new Set(),
|
||||
decisions: {
|
||||
router: new Map(),
|
||||
condition: new Map(),
|
||||
},
|
||||
environmentVariables: {},
|
||||
workflowVariables: {},
|
||||
}
|
||||
|
||||
// Condition checks if value > 20 - should be true for branch 1 (value=25)
|
||||
const conditions = [
|
||||
{ id: 'cond1', title: 'if', value: 'context.value > 20' },
|
||||
{ id: 'else1', title: 'else', value: '' },
|
||||
]
|
||||
const inputs = { conditions: JSON.stringify(conditions) }
|
||||
|
||||
const result = await handler.execute(parallelContext, parallelConditionBlock, inputs)
|
||||
|
||||
// Should evaluate using branch 1's data (value=25), not branch 0 (value=10) or branch 2 (value=5)
|
||||
expect((result as any).conditionResult).toBe(true)
|
||||
expect((result as any).selectedOption).toBe('cond1')
|
||||
})
|
||||
|
||||
it('should fall back to else when condition is false in parallel branch', async () => {
|
||||
const parallelConditionBlock: SerializedBlock = {
|
||||
id: 'cond-block-1₍2₎', // Virtual ID for branch 2
|
||||
metadata: { id: 'condition', name: 'Condition' },
|
||||
position: { x: 0, y: 0 },
|
||||
config: {},
|
||||
}
|
||||
|
||||
const parallelWorkflow: SerializedWorkflow = {
|
||||
blocks: [
|
||||
{
|
||||
id: 'agent-block-1',
|
||||
metadata: { id: 'agent', name: 'Agent' },
|
||||
position: { x: 0, y: 0 },
|
||||
config: {},
|
||||
},
|
||||
{
|
||||
id: 'cond-block-1',
|
||||
metadata: { id: 'condition', name: 'Condition' },
|
||||
position: { x: 100, y: 0 },
|
||||
config: {},
|
||||
},
|
||||
{
|
||||
id: 'target-true',
|
||||
metadata: { id: 'api', name: 'True Path' },
|
||||
position: { x: 200, y: 0 },
|
||||
config: {},
|
||||
},
|
||||
{
|
||||
id: 'target-false',
|
||||
metadata: { id: 'api', name: 'False Path' },
|
||||
position: { x: 200, y: 100 },
|
||||
config: {},
|
||||
},
|
||||
],
|
||||
connections: [
|
||||
{ source: 'agent-block-1', target: 'cond-block-1' },
|
||||
{ source: 'cond-block-1', target: 'target-true', sourceHandle: 'condition-cond1' },
|
||||
{ source: 'cond-block-1', target: 'target-false', sourceHandle: 'condition-else1' },
|
||||
],
|
||||
loops: [],
|
||||
parallels: [],
|
||||
}
|
||||
|
||||
const parallelBlockStates = new Map<string, BlockState>([
|
||||
['agent-block-1₍0₎', { output: { value: 100 }, executed: true }],
|
||||
['agent-block-1₍1₎', { output: { value: 50 }, executed: true }],
|
||||
['agent-block-1₍2₎', { output: { value: 5 }, executed: true }], // Branch 2 has value 5
|
||||
])
|
||||
|
||||
const parallelContext: ExecutionContext = {
|
||||
workflowId: 'test-workflow-id',
|
||||
workspaceId: 'test-workspace-id',
|
||||
workflow: parallelWorkflow,
|
||||
blockStates: parallelBlockStates,
|
||||
blockLogs: [],
|
||||
completedBlocks: new Set(),
|
||||
decisions: {
|
||||
router: new Map(),
|
||||
condition: new Map(),
|
||||
},
|
||||
environmentVariables: {},
|
||||
workflowVariables: {},
|
||||
}
|
||||
|
||||
// Condition checks if value > 20 - should be false for branch 2 (value=5)
|
||||
const conditions = [
|
||||
{ id: 'cond1', title: 'if', value: 'context.value > 20' },
|
||||
{ id: 'else1', title: 'else', value: '' },
|
||||
]
|
||||
const inputs = { conditions: JSON.stringify(conditions) }
|
||||
|
||||
const result = await handler.execute(parallelContext, parallelConditionBlock, inputs)
|
||||
|
||||
// Should fall back to else path because branch 2's value (5) is not > 20
|
||||
expect((result as any).conditionResult).toBe(true)
|
||||
expect((result as any).selectedOption).toBe('else1')
|
||||
expect((result as any).selectedPath.blockId).toBe('target-false')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -3,6 +3,12 @@ import type { BlockOutput } from '@/blocks/types'
|
||||
import { BlockType, CONDITION, DEFAULTS, EDGE } from '@/executor/constants'
|
||||
import type { BlockHandler, ExecutionContext } from '@/executor/types'
|
||||
import { collectBlockData } from '@/executor/utils/block-data'
|
||||
import {
|
||||
buildBranchNodeId,
|
||||
extractBaseBlockId,
|
||||
extractBranchIndex,
|
||||
isBranchNodeId,
|
||||
} from '@/executor/utils/subflow-utils'
|
||||
import type { SerializedBlock } from '@/serializer/types'
|
||||
import { executeTool } from '@/tools'
|
||||
|
||||
@@ -14,11 +20,17 @@ const CONDITION_TIMEOUT_MS = 5000
|
||||
* Evaluates a single condition expression.
|
||||
* Variable resolution is handled consistently with the function block via the function_execute tool.
|
||||
* Returns true if condition is met, false otherwise.
|
||||
*
|
||||
* @param ctx - Execution context
|
||||
* @param conditionExpression - The condition expression to evaluate
|
||||
* @param providedEvalContext - Optional evaluation context with variables
|
||||
* @param currentNodeId - Optional current node ID for parallel branch context resolution
|
||||
*/
|
||||
export async function evaluateConditionExpression(
|
||||
ctx: ExecutionContext,
|
||||
conditionExpression: string,
|
||||
providedEvalContext?: Record<string, any>
|
||||
providedEvalContext?: Record<string, any>,
|
||||
currentNodeId?: string
|
||||
): Promise<boolean> {
|
||||
const evalContext = providedEvalContext || {}
|
||||
|
||||
@@ -26,7 +38,7 @@ export async function evaluateConditionExpression(
|
||||
const contextSetup = `const context = ${JSON.stringify(evalContext)};`
|
||||
const code = `${contextSetup}\nreturn Boolean(${conditionExpression})`
|
||||
|
||||
const { blockData, blockNameMapping, blockOutputSchemas } = collectBlockData(ctx)
|
||||
const { blockData, blockNameMapping, blockOutputSchemas } = collectBlockData(ctx, currentNodeId)
|
||||
|
||||
const result = await executeTool(
|
||||
'function_execute',
|
||||
@@ -83,7 +95,22 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
): Promise<BlockOutput> {
|
||||
const conditions = this.parseConditions(inputs.conditions)
|
||||
|
||||
const sourceBlockId = ctx.workflow?.connections.find((conn) => conn.target === block.id)?.source
|
||||
// In parallel branches, block.id is virtual (e.g., "condition₍0₎") but connections
|
||||
// use original IDs (e.g., "condition"). Extract the base ID for connection lookups.
|
||||
const baseBlockId = extractBaseBlockId(block.id)
|
||||
const branchIndex = isBranchNodeId(block.id) ? extractBranchIndex(block.id) : null
|
||||
|
||||
const sourceConnection = ctx.workflow?.connections.find((conn) => conn.target === baseBlockId)
|
||||
let sourceBlockId = sourceConnection?.source
|
||||
|
||||
// If we're in a parallel branch, look up the virtual source block ID for the same branch
|
||||
if (sourceBlockId && branchIndex !== null) {
|
||||
const virtualSourceId = buildBranchNodeId(sourceBlockId, branchIndex)
|
||||
if (ctx.blockStates.has(virtualSourceId)) {
|
||||
sourceBlockId = virtualSourceId
|
||||
}
|
||||
}
|
||||
|
||||
const evalContext = this.buildEvaluationContext(ctx, sourceBlockId)
|
||||
const rawSourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null
|
||||
|
||||
@@ -91,13 +118,16 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
// thinking this block is pausing (it was already resumed by the HITL block)
|
||||
const sourceOutput = this.filterPauseMetadata(rawSourceOutput)
|
||||
|
||||
const outgoingConnections = ctx.workflow?.connections.filter((conn) => conn.source === block.id)
|
||||
const outgoingConnections = ctx.workflow?.connections.filter(
|
||||
(conn) => conn.source === baseBlockId
|
||||
)
|
||||
|
||||
const { selectedConnection, selectedCondition } = await this.evaluateConditions(
|
||||
conditions,
|
||||
outgoingConnections || [],
|
||||
evalContext,
|
||||
ctx
|
||||
ctx,
|
||||
block.id
|
||||
)
|
||||
|
||||
if (!selectedConnection || !selectedCondition) {
|
||||
@@ -170,7 +200,8 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
conditions: Array<{ id: string; title: string; value: string }>,
|
||||
outgoingConnections: Array<{ source: string; target: string; sourceHandle?: string }>,
|
||||
evalContext: Record<string, any>,
|
||||
ctx: ExecutionContext
|
||||
ctx: ExecutionContext,
|
||||
currentNodeId?: string
|
||||
): Promise<{
|
||||
selectedConnection: { target: string; sourceHandle?: string } | null
|
||||
selectedCondition: { id: string; title: string; value: string } | null
|
||||
@@ -189,7 +220,8 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
const conditionMet = await evaluateConditionExpression(
|
||||
ctx,
|
||||
conditionValueString,
|
||||
evalContext
|
||||
evalContext,
|
||||
currentNodeId
|
||||
)
|
||||
|
||||
if (conditionMet) {
|
||||
|
||||
@@ -2,6 +2,11 @@ import { normalizeInputFormatValue } from '@/lib/workflows/input-format'
|
||||
import { isTriggerBehavior, normalizeName } from '@/executor/constants'
|
||||
import type { ExecutionContext } from '@/executor/types'
|
||||
import type { OutputSchema } from '@/executor/utils/block-reference'
|
||||
import {
|
||||
extractBaseBlockId,
|
||||
extractBranchIndex,
|
||||
isBranchNodeId,
|
||||
} from '@/executor/utils/subflow-utils'
|
||||
import type { SerializedBlock } from '@/serializer/types'
|
||||
import type { ToolConfig } from '@/tools/types'
|
||||
import { getTool } from '@/tools/utils'
|
||||
@@ -86,14 +91,35 @@ export function getBlockSchema(
|
||||
return undefined
|
||||
}
|
||||
|
||||
export function collectBlockData(ctx: ExecutionContext): BlockDataCollection {
|
||||
export function collectBlockData(
|
||||
ctx: ExecutionContext,
|
||||
currentNodeId?: string
|
||||
): BlockDataCollection {
|
||||
const blockData: Record<string, unknown> = {}
|
||||
const blockNameMapping: Record<string, string> = {}
|
||||
const blockOutputSchemas: Record<string, OutputSchema> = {}
|
||||
|
||||
// Determine if we're in a parallel branch context
|
||||
const branchIndex =
|
||||
currentNodeId && isBranchNodeId(currentNodeId) ? extractBranchIndex(currentNodeId) : null
|
||||
|
||||
for (const [id, state] of ctx.blockStates.entries()) {
|
||||
if (state.output !== undefined) {
|
||||
blockData[id] = state.output
|
||||
|
||||
// If we're in a parallel branch and this state is from the same branch,
|
||||
// also map it under the base block ID so references like <BlockName.field>
|
||||
// resolve correctly within the same branch context
|
||||
if (branchIndex !== null && isBranchNodeId(id)) {
|
||||
const stateBranchIndex = extractBranchIndex(id)
|
||||
if (stateBranchIndex === branchIndex) {
|
||||
const baseId = extractBaseBlockId(id)
|
||||
// Only set if not already set (prefer branch-specific data)
|
||||
if (blockData[baseId] === undefined) {
|
||||
blockData[baseId] = state.output
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user