mirror of
https://github.com/simstudioai/sim.git
synced 2026-03-15 03:00:33 -04:00
Compare commits
20 Commits
waleedlati
...
v0.5.106
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ce947566d | ||
|
|
4f45f705a5 | ||
|
|
d640fa0852 | ||
|
|
28f8e0fd97 | ||
|
|
cc38ecaf12 | ||
|
|
70c36cb7aa | ||
|
|
f1ec5fe824 | ||
|
|
e07e3c34cc | ||
|
|
0d2e6ff31d | ||
|
|
4fd0989264 | ||
|
|
67f8a687f6 | ||
|
|
af592349d3 | ||
|
|
0d86ea01f0 | ||
|
|
115f04e989 | ||
|
|
34d92fae89 | ||
|
|
67aa4bb332 | ||
|
|
15ace5e63f | ||
|
|
fdca73679d | ||
|
|
da46a387c9 | ||
|
|
b7e377ec4b |
10
.github/workflows/test-build.yml
vendored
10
.github/workflows/test-build.yml
vendored
@@ -90,6 +90,16 @@ jobs:
|
||||
|
||||
echo "✅ All feature flags are properly configured"
|
||||
|
||||
- name: Check subblock ID stability
|
||||
run: |
|
||||
if [ "${{ github.event_name }}" = "pull_request" ]; then
|
||||
BASE_REF="origin/${{ github.base_ref }}"
|
||||
git fetch --depth=1 origin "${{ github.base_ref }}" 2>/dev/null || true
|
||||
else
|
||||
BASE_REF="HEAD~1"
|
||||
fi
|
||||
bun run apps/sim/scripts/check-subblock-id-stability.ts "$BASE_REF"
|
||||
|
||||
- name: Lint code
|
||||
run: bun run lint:check
|
||||
|
||||
|
||||
@@ -20,7 +20,10 @@ import {
|
||||
TriggerUtils,
|
||||
} from '@/lib/workflows/triggers/triggers'
|
||||
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
||||
import { updateActiveBlockRefCount } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
|
||||
import {
|
||||
markOutgoingEdgesFromOutput,
|
||||
updateActiveBlockRefCount,
|
||||
} from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
|
||||
import { getBlock } from '@/blocks'
|
||||
import type { SerializableExecutionState } from '@/executor/execution/types'
|
||||
import type {
|
||||
@@ -63,7 +66,7 @@ interface DebugValidationResult {
|
||||
interface BlockEventHandlerConfig {
|
||||
workflowId?: string
|
||||
executionIdRef: { current: string }
|
||||
workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }>
|
||||
workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }>
|
||||
activeBlocksSet: Set<string>
|
||||
activeBlockRefCounts: Map<string, number>
|
||||
accumulatedBlockLogs: BlockLog[]
|
||||
@@ -335,13 +338,9 @@ export function useWorkflowExecution() {
|
||||
setActiveBlocks(workflowId, new Set(activeBlocksSet))
|
||||
}
|
||||
|
||||
const markIncomingEdges = (blockId: string) => {
|
||||
const markOutgoingEdges = (blockId: string, output: Record<string, any> | undefined) => {
|
||||
if (!workflowId) return
|
||||
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
|
||||
incomingEdges.forEach((edge) => {
|
||||
const status = edge.sourceHandle === 'error' ? 'error' : 'success'
|
||||
setEdgeRunStatus(workflowId, edge.id, status)
|
||||
})
|
||||
markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus)
|
||||
}
|
||||
|
||||
const isContainerBlockType = (blockType?: string) => {
|
||||
@@ -460,7 +459,6 @@ export function useWorkflowExecution() {
|
||||
const onBlockStarted = (data: BlockStartedData) => {
|
||||
if (isStaleExecution()) return
|
||||
updateActiveBlocks(data.blockId, true)
|
||||
markIncomingEdges(data.blockId)
|
||||
|
||||
if (!includeStartConsoleEntry || !workflowId) return
|
||||
|
||||
@@ -487,6 +485,7 @@ export function useWorkflowExecution() {
|
||||
if (isStaleExecution()) return
|
||||
updateActiveBlocks(data.blockId, false)
|
||||
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
|
||||
markOutgoingEdges(data.blockId, data.output as Record<string, any> | undefined)
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: data.output,
|
||||
@@ -505,7 +504,9 @@ export function useWorkflowExecution() {
|
||||
}
|
||||
|
||||
if (isContainerBlockType(data.blockType) && !data.iterationContainerId) {
|
||||
return
|
||||
const output = data.output as Record<string, any> | undefined
|
||||
const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0
|
||||
if (!isEmptySubflow) return
|
||||
}
|
||||
|
||||
accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))
|
||||
@@ -527,6 +528,7 @@ export function useWorkflowExecution() {
|
||||
if (isStaleExecution()) return
|
||||
updateActiveBlocks(data.blockId, false)
|
||||
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
|
||||
markOutgoingEdges(data.blockId, { error: data.error })
|
||||
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
|
||||
@@ -29,6 +29,62 @@ export function updateActiveBlockRefCount(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if a workflow edge should be marked as active based on its handle and the block output.
|
||||
* Mirrors the executor's EdgeManager.shouldActivateEdge logic on the client side.
|
||||
* Exclude sentinel handles here
|
||||
*/
|
||||
function shouldActivateEdgeClient(
|
||||
handle: string | null | undefined,
|
||||
output: Record<string, any> | undefined
|
||||
): boolean {
|
||||
if (!handle) return true
|
||||
|
||||
if (handle.startsWith('condition-')) {
|
||||
return output?.selectedOption === handle.substring('condition-'.length)
|
||||
}
|
||||
|
||||
if (handle.startsWith('router-')) {
|
||||
return output?.selectedRoute === handle.substring('router-'.length)
|
||||
}
|
||||
|
||||
switch (handle) {
|
||||
case 'error':
|
||||
return !!output?.error
|
||||
case 'source':
|
||||
return !output?.error
|
||||
case 'loop-start-source':
|
||||
case 'loop-end-source':
|
||||
case 'parallel-start-source':
|
||||
case 'parallel-end-source':
|
||||
return true
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
export function markOutgoingEdgesFromOutput(
|
||||
blockId: string,
|
||||
output: Record<string, any> | undefined,
|
||||
workflowEdges: Array<{
|
||||
id: string
|
||||
source: string
|
||||
target: string
|
||||
sourceHandle?: string | null
|
||||
}>,
|
||||
workflowId: string,
|
||||
setEdgeRunStatus: (wfId: string, edgeId: string, status: 'success' | 'error') => void
|
||||
): void {
|
||||
const outgoing = workflowEdges.filter((edge) => edge.source === blockId)
|
||||
for (const edge of outgoing) {
|
||||
const handle = edge.sourceHandle
|
||||
if (shouldActivateEdgeClient(handle, output)) {
|
||||
const status = handle === 'error' ? 'error' : output?.error ? 'error' : 'success'
|
||||
setEdgeRunStatus(workflowId, edge.id, status)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export interface WorkflowExecutionOptions {
|
||||
workflowInput?: any
|
||||
onStream?: (se: StreamingExecution) => Promise<void>
|
||||
@@ -135,13 +191,6 @@ export async function executeWorkflowWithFullLogging(
|
||||
true
|
||||
)
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
const incomingEdges = workflowEdges.filter(
|
||||
(edge) => edge.target === event.data.blockId
|
||||
)
|
||||
incomingEdges.forEach((edge) => {
|
||||
setEdgeRunStatus(wfId, edge.id, 'success')
|
||||
})
|
||||
break
|
||||
}
|
||||
|
||||
@@ -155,6 +204,13 @@ export async function executeWorkflowWithFullLogging(
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(wfId, event.data.blockId, 'success')
|
||||
markOutgoingEdgesFromOutput(
|
||||
event.data.blockId,
|
||||
event.data.output,
|
||||
workflowEdges,
|
||||
wfId,
|
||||
setEdgeRunStatus
|
||||
)
|
||||
|
||||
addConsole({
|
||||
input: event.data.input || {},
|
||||
@@ -194,6 +250,13 @@ export async function executeWorkflowWithFullLogging(
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(wfId, event.data.blockId, 'error')
|
||||
markOutgoingEdgesFromOutput(
|
||||
event.data.blockId,
|
||||
{ error: event.data.error },
|
||||
workflowEdges,
|
||||
wfId,
|
||||
setEdgeRunStatus
|
||||
)
|
||||
|
||||
addConsole({
|
||||
input: event.data.input || {},
|
||||
|
||||
@@ -145,7 +145,7 @@ interface PreviewWorkflowProps {
|
||||
/** Cursor style to show when hovering the canvas */
|
||||
cursorStyle?: 'default' | 'pointer' | 'grab'
|
||||
/** Map of executed block IDs to their status for highlighting the execution path */
|
||||
executedBlocks?: Record<string, { status: string }>
|
||||
executedBlocks?: Record<string, { status: string; output?: unknown }>
|
||||
/** Currently selected block ID for highlighting */
|
||||
selectedBlockId?: string | null
|
||||
/** Skips expensive subblock computations for thumbnails/template previews */
|
||||
@@ -274,9 +274,9 @@ export function PreviewWorkflow({
|
||||
|
||||
/** Maps base block IDs to execution data, handling parallel iteration variants (blockId₍n₎). */
|
||||
const blockExecutionMap = useMemo(() => {
|
||||
if (!executedBlocks) return new Map<string, { status: string }>()
|
||||
if (!executedBlocks) return new Map<string, { status: string; output?: unknown }>()
|
||||
|
||||
const map = new Map<string, { status: string }>()
|
||||
const map = new Map<string, { status: string; output?: unknown }>()
|
||||
for (const [key, value] of Object.entries(executedBlocks)) {
|
||||
// Extract base ID (remove iteration suffix like ₍0₎)
|
||||
const baseId = key.includes('₍') ? key.split('₍')[0] : key
|
||||
@@ -451,7 +451,6 @@ export function PreviewWorkflow({
|
||||
const edges: Edge[] = useMemo(() => {
|
||||
if (!isValidWorkflowState) return []
|
||||
|
||||
/** Edge is green if target executed and source condition met by edge type. */
|
||||
const getEdgeExecutionStatus = (edge: {
|
||||
source: string
|
||||
target: string
|
||||
@@ -463,17 +462,40 @@ export function PreviewWorkflow({
|
||||
if (!targetStatus?.executed) return 'not-executed'
|
||||
|
||||
const sourceStatus = getBlockExecutionStatus(edge.source)
|
||||
const { sourceHandle } = edge
|
||||
if (!sourceStatus?.executed) return 'not-executed'
|
||||
|
||||
if (sourceHandle === 'error') {
|
||||
return sourceStatus?.status === 'error' ? 'success' : 'not-executed'
|
||||
const handle = edge.sourceHandle
|
||||
if (!handle) {
|
||||
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
|
||||
}
|
||||
|
||||
if (sourceHandle === 'loop-start-source' || sourceHandle === 'parallel-start-source') {
|
||||
return 'success'
|
||||
const sourceOutput = blockExecutionMap.get(edge.source)?.output as
|
||||
| Record<string, any>
|
||||
| undefined
|
||||
|
||||
if (handle.startsWith('condition-')) {
|
||||
const conditionValue = handle.substring('condition-'.length)
|
||||
return sourceOutput?.selectedOption === conditionValue ? 'success' : 'not-executed'
|
||||
}
|
||||
|
||||
return sourceStatus?.status === 'success' ? 'success' : 'not-executed'
|
||||
if (handle.startsWith('router-')) {
|
||||
const routeId = handle.substring('router-'.length)
|
||||
return sourceOutput?.selectedRoute === routeId ? 'success' : 'not-executed'
|
||||
}
|
||||
|
||||
switch (handle) {
|
||||
case 'error':
|
||||
return sourceStatus.status === 'error' ? 'error' : 'not-executed'
|
||||
case 'source':
|
||||
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
|
||||
case 'loop-start-source':
|
||||
case 'loop-end-source':
|
||||
case 'parallel-start-source':
|
||||
case 'parallel-end-source':
|
||||
return 'success'
|
||||
default:
|
||||
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
|
||||
}
|
||||
}
|
||||
|
||||
return (workflowState.edges || []).map((edge) => {
|
||||
|
||||
@@ -66,11 +66,15 @@ describe('EdgeManager', () => {
|
||||
const dag = createMockDAG(nodes)
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
const readyAfterA = edgeManager.processOutgoingEdges(blockANode, { result: 'done' })
|
||||
const readyAfterA = edgeManager.processOutgoingEdges(blockANode, {
|
||||
result: 'done',
|
||||
})
|
||||
expect(readyAfterA).toContain(blockBId)
|
||||
expect(readyAfterA).not.toContain(blockCId)
|
||||
|
||||
const readyAfterB = edgeManager.processOutgoingEdges(blockBNode, { result: 'done' })
|
||||
const readyAfterB = edgeManager.processOutgoingEdges(blockBNode, {
|
||||
result: 'done',
|
||||
})
|
||||
expect(readyAfterB).toContain(blockCId)
|
||||
})
|
||||
|
||||
@@ -591,7 +595,9 @@ describe('EdgeManager', () => {
|
||||
|
||||
function1Node.incomingEdges.add(conditionId)
|
||||
|
||||
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'if' })
|
||||
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: 'if',
|
||||
})
|
||||
expect(readyNodes).toContain(function1Id)
|
||||
})
|
||||
})
|
||||
@@ -977,11 +983,15 @@ describe('EdgeManager', () => {
|
||||
const dag = createMockDAG(nodes)
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
const ready1 = edgeManager.processOutgoingEdges(condition1Node, { selectedOption: 'if' })
|
||||
const ready1 = edgeManager.processOutgoingEdges(condition1Node, {
|
||||
selectedOption: 'if',
|
||||
})
|
||||
expect(ready1).toContain(condition2Id)
|
||||
expect(ready1).not.toContain(target1Id)
|
||||
|
||||
const ready2 = edgeManager.processOutgoingEdges(condition2Node, { selectedOption: 'else' })
|
||||
const ready2 = edgeManager.processOutgoingEdges(condition2Node, {
|
||||
selectedOption: 'else',
|
||||
})
|
||||
expect(ready2).toContain(target1Id)
|
||||
expect(ready2).not.toContain(target2Id)
|
||||
})
|
||||
@@ -1394,10 +1404,14 @@ describe('EdgeManager', () => {
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Path: condition1(if) → condition2(else) → nodeC → sentinel_end
|
||||
const ready1 = edgeManager.processOutgoingEdges(condition1Node, { selectedOption: 'if' })
|
||||
const ready1 = edgeManager.processOutgoingEdges(condition1Node, {
|
||||
selectedOption: 'if',
|
||||
})
|
||||
expect(ready1).toContain(condition2Id)
|
||||
|
||||
const ready2 = edgeManager.processOutgoingEdges(condition2Node, { selectedOption: 'else' })
|
||||
const ready2 = edgeManager.processOutgoingEdges(condition2Node, {
|
||||
selectedOption: 'else',
|
||||
})
|
||||
expect(ready2).toContain(nodeCId)
|
||||
|
||||
const ready3 = edgeManager.processOutgoingEdges(nodeCNode, {})
|
||||
@@ -1448,7 +1462,9 @@ describe('EdgeManager', () => {
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Test else path through diamond
|
||||
const ready1 = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
|
||||
const ready1 = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: 'else',
|
||||
})
|
||||
expect(ready1).toContain(nodeBId)
|
||||
expect(ready1).not.toContain(nodeAId)
|
||||
|
||||
@@ -1509,7 +1525,9 @@ describe('EdgeManager', () => {
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Select else - triggers deep cascade deactivation of if path
|
||||
const ready1 = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
|
||||
const ready1 = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: 'else',
|
||||
})
|
||||
expect(ready1).toContain(nodeDId)
|
||||
|
||||
const ready2 = edgeManager.processOutgoingEdges(nodeDNode, {})
|
||||
@@ -1566,7 +1584,9 @@ describe('EdgeManager', () => {
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Test middle branch (elseif2)
|
||||
const ready1 = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'elseif2' })
|
||||
const ready1 = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: 'elseif2',
|
||||
})
|
||||
expect(ready1).toContain(nodeCId)
|
||||
expect(ready1).not.toContain(nodeAId)
|
||||
expect(ready1).not.toContain(nodeBId)
|
||||
@@ -1629,7 +1649,7 @@ describe('EdgeManager', () => {
|
||||
// Scenario: Loop with Function 1 → Condition 1 → Function 2
|
||||
// Condition has "if" branch → Function 2
|
||||
// Condition has "else" branch → NO connection (dead end)
|
||||
// When else is selected (selectedOption: null), the loop should continue
|
||||
// When else is selected, the loop sentinel should still fire
|
||||
//
|
||||
// DAG structure:
|
||||
// sentinel_start → func1 → condition → (if) → func2 → sentinel_end
|
||||
@@ -1637,11 +1657,12 @@ describe('EdgeManager', () => {
|
||||
// sentinel_end → (loop_continue) → sentinel_start
|
||||
//
|
||||
// When condition takes else with no edge:
|
||||
// - selectedOption: null (no condition matches)
|
||||
// - selectedOption is set (condition made a routing decision)
|
||||
// - The "if" edge gets deactivated
|
||||
// - func2 has no other active incoming edges, so edge to sentinel_end gets deactivated
|
||||
// - sentinel_end has no active incoming edges and should become ready
|
||||
// - sentinel_end is the enclosing loop's sentinel and should become ready
|
||||
|
||||
const loopId = 'loop-1'
|
||||
const sentinelStartId = 'sentinel-start'
|
||||
const sentinelEndId = 'sentinel-end'
|
||||
const func1Id = 'func1'
|
||||
@@ -1649,14 +1670,21 @@ describe('EdgeManager', () => {
|
||||
const func2Id = 'func2'
|
||||
|
||||
const sentinelStartNode = createMockNode(sentinelStartId, [{ target: func1Id }])
|
||||
sentinelStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId }
|
||||
|
||||
const func1Node = createMockNode(func1Id, [{ target: conditionId }], [sentinelStartId])
|
||||
// Condition only has "if" branch, no "else" edge (dead end)
|
||||
func1Node.metadata = { loopId, isLoopNode: true }
|
||||
|
||||
const conditionNode = createMockNode(
|
||||
conditionId,
|
||||
[{ target: func2Id, sourceHandle: 'condition-if' }],
|
||||
[func1Id]
|
||||
)
|
||||
conditionNode.metadata = { loopId, isLoopNode: true }
|
||||
|
||||
const func2Node = createMockNode(func2Id, [{ target: sentinelEndId }], [conditionId])
|
||||
func2Node.metadata = { loopId, isLoopNode: true }
|
||||
|
||||
const sentinelEndNode = createMockNode(
|
||||
sentinelEndId,
|
||||
[
|
||||
@@ -1665,6 +1693,8 @@ describe('EdgeManager', () => {
|
||||
],
|
||||
[func2Id]
|
||||
)
|
||||
sentinelEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId }
|
||||
|
||||
const afterLoopNode = createMockNode('after-loop', [], [sentinelEndId])
|
||||
|
||||
const nodes = new Map<string, DAGNode>([
|
||||
@@ -1679,22 +1709,17 @@ describe('EdgeManager', () => {
|
||||
const dag = createMockDAG(nodes)
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Simulate execution: sentinel_start → func1 → condition
|
||||
// Clear incoming edges as execution progresses (simulating normal flow)
|
||||
func1Node.incomingEdges.clear()
|
||||
conditionNode.incomingEdges.clear()
|
||||
|
||||
// Condition takes "else" but there's no else edge
|
||||
// selectedOption: null means no condition branch matches
|
||||
// Condition selects dead-end else (selectedOption is set — routing decision made)
|
||||
// but it's inside the loop, so the enclosing sentinel should still fire
|
||||
const ready = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: null,
|
||||
conditionResult: false,
|
||||
selectedOption: 'else-id',
|
||||
conditionResult: true,
|
||||
selectedPath: null,
|
||||
})
|
||||
|
||||
// The "if" edge to func2 should be deactivated
|
||||
// func2 has no other incoming edges, so its edge to sentinel_end gets deactivated
|
||||
// sentinel_end has no active incoming edges and should be ready
|
||||
expect(ready).toContain(sentinelEndId)
|
||||
})
|
||||
|
||||
@@ -1763,11 +1788,12 @@ describe('EdgeManager', () => {
|
||||
// → (else) → [nothing]
|
||||
// → (else) → [nothing]
|
||||
//
|
||||
// When condition1 takes if, then condition2 takes else:
|
||||
// When condition1 takes if, then condition2 takes else (dead-end):
|
||||
// - condition2's "if" edge to func gets deactivated
|
||||
// - func's edge to sentinel_end gets deactivated
|
||||
// - sentinel_end should become ready
|
||||
// - sentinel_end is the enclosing loop's sentinel and should become ready
|
||||
|
||||
const loopId = 'loop-1'
|
||||
const sentinelStartId = 'sentinel-start'
|
||||
const sentinelEndId = 'sentinel-end'
|
||||
const condition1Id = 'condition1'
|
||||
@@ -1775,22 +1801,31 @@ describe('EdgeManager', () => {
|
||||
const funcId = 'func'
|
||||
|
||||
const sentinelStartNode = createMockNode(sentinelStartId, [{ target: condition1Id }])
|
||||
sentinelStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId }
|
||||
|
||||
const condition1Node = createMockNode(
|
||||
condition1Id,
|
||||
[{ target: condition2Id, sourceHandle: 'condition-if' }],
|
||||
[sentinelStartId]
|
||||
)
|
||||
condition1Node.metadata = { loopId, isLoopNode: true }
|
||||
|
||||
const condition2Node = createMockNode(
|
||||
condition2Id,
|
||||
[{ target: funcId, sourceHandle: 'condition-if' }],
|
||||
[condition1Id]
|
||||
)
|
||||
condition2Node.metadata = { loopId, isLoopNode: true }
|
||||
|
||||
const funcNode = createMockNode(funcId, [{ target: sentinelEndId }], [condition2Id])
|
||||
funcNode.metadata = { loopId, isLoopNode: true }
|
||||
|
||||
const sentinelEndNode = createMockNode(
|
||||
sentinelEndId,
|
||||
[{ target: sentinelStartId, sourceHandle: 'loop_continue' }],
|
||||
[funcId]
|
||||
)
|
||||
sentinelEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId }
|
||||
|
||||
const nodes = new Map<string, DAGNode>([
|
||||
[sentinelStartId, sentinelStartNode],
|
||||
@@ -1803,22 +1838,95 @@ describe('EdgeManager', () => {
|
||||
const dag = createMockDAG(nodes)
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Clear incoming edges as execution progresses
|
||||
condition1Node.incomingEdges.clear()
|
||||
|
||||
// condition1 takes "if" - condition2 becomes ready
|
||||
const ready1 = edgeManager.processOutgoingEdges(condition1Node, { selectedOption: 'if' })
|
||||
const ready1 = edgeManager.processOutgoingEdges(condition1Node, {
|
||||
selectedOption: 'if',
|
||||
})
|
||||
expect(ready1).toContain(condition2Id)
|
||||
|
||||
condition2Node.incomingEdges.clear()
|
||||
|
||||
// condition2 takes "else" (dead end)
|
||||
const ready2 = edgeManager.processOutgoingEdges(condition2Node, { selectedOption: null })
|
||||
// condition2 selects dead-end else (selectedOption set — routing decision made)
|
||||
const ready2 = edgeManager.processOutgoingEdges(condition2Node, {
|
||||
selectedOption: 'else-id',
|
||||
})
|
||||
|
||||
// sentinel_end should be ready because all paths to it are deactivated
|
||||
// sentinel_end is the enclosing loop's sentinel and should be ready
|
||||
expect(ready2).toContain(sentinelEndId)
|
||||
})
|
||||
|
||||
it('should not fire nested subflow sentinel when condition inside outer loop hits dead-end', () => {
|
||||
// Scenario: outer loop contains condition → (if) → inner loop → sentinel_end
|
||||
// → (else) → [dead end]
|
||||
//
|
||||
// When condition selects dead-end else:
|
||||
// - The outer loop's sentinel should fire (enclosing subflow)
|
||||
// - The inner loop's sentinel should NOT fire (downstream subflow)
|
||||
|
||||
const outerLoopId = 'outer-loop'
|
||||
const innerLoopId = 'inner-loop'
|
||||
const outerStartId = 'outer-start'
|
||||
const outerEndId = 'outer-end'
|
||||
const conditionId = 'condition'
|
||||
const innerStartId = 'inner-start'
|
||||
const innerBodyId = 'inner-body'
|
||||
const innerEndId = 'inner-end'
|
||||
|
||||
const outerStartNode = createMockNode(outerStartId, [{ target: conditionId }])
|
||||
outerStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: outerLoopId }
|
||||
|
||||
const conditionNode = createMockNode(
|
||||
conditionId,
|
||||
[{ target: innerStartId, sourceHandle: 'condition-if' }],
|
||||
[outerStartId]
|
||||
)
|
||||
conditionNode.metadata = { loopId: outerLoopId, isLoopNode: true }
|
||||
|
||||
const innerStartNode = createMockNode(innerStartId, [{ target: innerBodyId }], [conditionId])
|
||||
innerStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: innerLoopId }
|
||||
|
||||
const innerBodyNode = createMockNode(innerBodyId, [{ target: innerEndId }], [innerStartId])
|
||||
innerBodyNode.metadata = { loopId: innerLoopId, isLoopNode: true }
|
||||
|
||||
const innerEndNode = createMockNode(
|
||||
innerEndId,
|
||||
[{ target: outerEndId, sourceHandle: 'loop_exit' }],
|
||||
[innerBodyId]
|
||||
)
|
||||
innerEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: innerLoopId }
|
||||
|
||||
const outerEndNode = createMockNode(
|
||||
outerEndId,
|
||||
[{ target: outerStartId, sourceHandle: 'loop_continue' }],
|
||||
[innerEndId]
|
||||
)
|
||||
outerEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: outerLoopId }
|
||||
|
||||
const nodes = new Map<string, DAGNode>([
|
||||
[outerStartId, outerStartNode],
|
||||
[conditionId, conditionNode],
|
||||
[innerStartId, innerStartNode],
|
||||
[innerBodyId, innerBodyNode],
|
||||
[innerEndId, innerEndNode],
|
||||
[outerEndId, outerEndNode],
|
||||
])
|
||||
|
||||
const dag = createMockDAG(nodes)
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
conditionNode.incomingEdges.clear()
|
||||
|
||||
const ready = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: 'else-id',
|
||||
})
|
||||
|
||||
// Outer loop sentinel should fire (condition is inside outer loop)
|
||||
expect(ready).toContain(outerEndId)
|
||||
// Inner loop sentinel should NOT fire (it's a downstream subflow)
|
||||
expect(ready).not.toContain(innerEndId)
|
||||
})
|
||||
|
||||
it('should NOT execute intermediate nodes in long cascade chains (2+ hops)', () => {
|
||||
// Regression test: When condition hits dead-end with 2+ intermediate nodes,
|
||||
// only sentinel_end should be ready, NOT the intermediate nodes.
|
||||
@@ -1922,7 +2030,9 @@ describe('EdgeManager', () => {
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Select else path
|
||||
const ready1 = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
|
||||
const ready1 = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: 'else',
|
||||
})
|
||||
expect(ready1).toContain(nodeBId)
|
||||
expect(ready1).not.toContain(nodeAId)
|
||||
|
||||
@@ -1968,7 +2078,9 @@ describe('EdgeManager', () => {
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// When selectedOption is null, the cascade deactivation makes parallel_end ready
|
||||
const ready = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: null })
|
||||
const ready = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: null,
|
||||
})
|
||||
expect(ready).toContain(parallelEndId)
|
||||
})
|
||||
|
||||
@@ -2039,11 +2151,15 @@ describe('EdgeManager', () => {
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Branch 1: condition1 selects else
|
||||
const ready1 = edgeManager.processOutgoingEdges(condition1Node, { selectedOption: 'else' })
|
||||
const ready1 = edgeManager.processOutgoingEdges(condition1Node, {
|
||||
selectedOption: 'else',
|
||||
})
|
||||
expect(ready1).toContain(nodeBId)
|
||||
|
||||
// Branch 2: condition2 selects if
|
||||
const ready2 = edgeManager.processOutgoingEdges(condition2Node, { selectedOption: 'if' })
|
||||
const ready2 = edgeManager.processOutgoingEdges(condition2Node, {
|
||||
selectedOption: 'if',
|
||||
})
|
||||
expect(ready2).toContain(nodeCId)
|
||||
|
||||
// Both complete
|
||||
@@ -2200,7 +2316,9 @@ describe('EdgeManager', () => {
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// nodeA errors
|
||||
const ready1 = edgeManager.processOutgoingEdges(nodeANode, { error: 'Something failed' })
|
||||
const ready1 = edgeManager.processOutgoingEdges(nodeANode, {
|
||||
error: 'Something failed',
|
||||
})
|
||||
expect(ready1).toContain(errorNodeId)
|
||||
expect(ready1).not.toContain(successNodeId)
|
||||
|
||||
@@ -2289,7 +2407,9 @@ describe('EdgeManager', () => {
|
||||
edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'if' })
|
||||
edgeManager.processOutgoingEdges(nodeANode, {})
|
||||
|
||||
const ready2 = edgeManager.processOutgoingEdges(loopEndNode, { selectedRoute: 'loop_exit' })
|
||||
const ready2 = edgeManager.processOutgoingEdges(loopEndNode, {
|
||||
selectedRoute: 'loop_exit',
|
||||
})
|
||||
expect(ready2).toContain(parallelEndId)
|
||||
|
||||
const ready3 = edgeManager.processOutgoingEdges(parallelEndNode, {
|
||||
@@ -2413,7 +2533,9 @@ describe('EdgeManager', () => {
|
||||
const dag = createMockDAG(nodes)
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
const successReady = edgeManager.processOutgoingEdges(sourceNode, { result: 'ok' })
|
||||
const successReady = edgeManager.processOutgoingEdges(sourceNode, {
|
||||
result: 'ok',
|
||||
})
|
||||
expect(successReady).toContain(targetId)
|
||||
})
|
||||
})
|
||||
@@ -2472,7 +2594,9 @@ describe('EdgeManager', () => {
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Condition selects "else" branch, deactivating the "if" branch (which contains the loop)
|
||||
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
|
||||
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: 'else',
|
||||
})
|
||||
|
||||
// Only otherBranch should be ready
|
||||
expect(readyNodes).toContain(otherBranchId)
|
||||
@@ -2539,7 +2663,9 @@ describe('EdgeManager', () => {
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Condition selects "else" branch
|
||||
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
|
||||
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: 'else',
|
||||
})
|
||||
|
||||
expect(readyNodes).toContain(otherBranchId)
|
||||
expect(readyNodes).not.toContain(parallelStartId)
|
||||
@@ -2626,6 +2752,171 @@ describe('EdgeManager', () => {
|
||||
expect(readyNodes).not.toContain(afterLoopId)
|
||||
})
|
||||
|
||||
it('should not queue sentinel-end when condition selects no-edge path (loop)', () => {
|
||||
// Bug scenario: condition → (if) → sentinel_start → body → sentinel_end → (loop_exit) → after_loop
|
||||
// → (else) → [NO outgoing edge]
|
||||
// Condition evaluates false, else is selected but has no edge.
|
||||
// With selectedOption set (routing decision made), cascadeTargets should NOT be queued.
|
||||
// Previously sentinel_end was queued via cascadeTargets, causing downstream blocks to execute.
|
||||
|
||||
const conditionId = 'condition'
|
||||
const sentinelStartId = 'sentinel-start'
|
||||
const loopBodyId = 'loop-body'
|
||||
const sentinelEndId = 'sentinel-end'
|
||||
const afterLoopId = 'after-loop'
|
||||
|
||||
const conditionNode = createMockNode(conditionId, [
|
||||
{ target: sentinelStartId, sourceHandle: 'condition-if-id' },
|
||||
])
|
||||
|
||||
const sentinelStartNode = createMockNode(
|
||||
sentinelStartId,
|
||||
[{ target: loopBodyId }],
|
||||
[conditionId]
|
||||
)
|
||||
|
||||
const loopBodyNode = createMockNode(
|
||||
loopBodyId,
|
||||
[{ target: sentinelEndId }],
|
||||
[sentinelStartId]
|
||||
)
|
||||
|
||||
const sentinelEndNode = createMockNode(
|
||||
sentinelEndId,
|
||||
[
|
||||
{ target: sentinelStartId, sourceHandle: 'loop_continue' },
|
||||
{ target: afterLoopId, sourceHandle: 'loop_exit' },
|
||||
],
|
||||
[loopBodyId]
|
||||
)
|
||||
|
||||
const afterLoopNode = createMockNode(afterLoopId, [], [sentinelEndId])
|
||||
|
||||
const nodes = new Map<string, DAGNode>([
|
||||
[conditionId, conditionNode],
|
||||
[sentinelStartId, sentinelStartNode],
|
||||
[loopBodyId, loopBodyNode],
|
||||
[sentinelEndId, sentinelEndNode],
|
||||
[afterLoopId, afterLoopNode],
|
||||
])
|
||||
|
||||
const dag = createMockDAG(nodes)
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
// Condition selected else, but else has no outgoing edge.
|
||||
// selectedOption is set (routing decision was made).
|
||||
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: 'else-id',
|
||||
})
|
||||
|
||||
// Nothing should be queued -- the entire branch is intentionally dead
|
||||
expect(readyNodes).not.toContain(sentinelStartId)
|
||||
expect(readyNodes).not.toContain(loopBodyId)
|
||||
expect(readyNodes).not.toContain(sentinelEndId)
|
||||
expect(readyNodes).not.toContain(afterLoopId)
|
||||
expect(readyNodes).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('should not queue sentinel-end when condition selects no-edge path (parallel)', () => {
|
||||
// Same scenario with parallel instead of loop
|
||||
const conditionId = 'condition'
|
||||
const parallelStartId = 'parallel-start'
|
||||
const branchId = 'branch-0'
|
||||
const parallelEndId = 'parallel-end'
|
||||
const afterParallelId = 'after-parallel'
|
||||
|
||||
const conditionNode = createMockNode(conditionId, [
|
||||
{ target: parallelStartId, sourceHandle: 'condition-if-id' },
|
||||
])
|
||||
|
||||
const parallelStartNode = createMockNode(
|
||||
parallelStartId,
|
||||
[{ target: branchId }],
|
||||
[conditionId]
|
||||
)
|
||||
|
||||
const branchNode = createMockNode(
|
||||
branchId,
|
||||
[{ target: parallelEndId, sourceHandle: 'parallel_exit' }],
|
||||
[parallelStartId]
|
||||
)
|
||||
|
||||
const parallelEndNode = createMockNode(
|
||||
parallelEndId,
|
||||
[{ target: afterParallelId, sourceHandle: 'parallel_exit' }],
|
||||
[branchId]
|
||||
)
|
||||
|
||||
const afterParallelNode = createMockNode(afterParallelId, [], [parallelEndId])
|
||||
|
||||
const nodes = new Map<string, DAGNode>([
|
||||
[conditionId, conditionNode],
|
||||
[parallelStartId, parallelStartNode],
|
||||
[branchId, branchNode],
|
||||
[parallelEndId, parallelEndNode],
|
||||
[afterParallelId, afterParallelNode],
|
||||
])
|
||||
|
||||
const dag = createMockDAG(nodes)
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: 'else-id',
|
||||
})
|
||||
|
||||
expect(readyNodes).not.toContain(parallelStartId)
|
||||
expect(readyNodes).not.toContain(branchId)
|
||||
expect(readyNodes).not.toContain(parallelEndId)
|
||||
expect(readyNodes).not.toContain(afterParallelId)
|
||||
expect(readyNodes).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('should still queue sentinel-end inside loop when no condition matches (true dead-end)', () => {
|
||||
// Contrast: condition INSIDE a loop with selectedOption null (no match, no routing decision).
|
||||
// This is a true dead-end where cascadeTargets SHOULD fire so the loop sentinel can handle exit.
|
||||
|
||||
const sentinelStartId = 'sentinel-start'
|
||||
const sentinelEndId = 'sentinel-end'
|
||||
const conditionId = 'condition'
|
||||
const nodeAId = 'node-a'
|
||||
|
||||
const sentinelStartNode = createMockNode(sentinelStartId, [{ target: conditionId }])
|
||||
const conditionNode = createMockNode(
|
||||
conditionId,
|
||||
[{ target: nodeAId, sourceHandle: 'condition-if' }],
|
||||
[sentinelStartId]
|
||||
)
|
||||
const nodeANode = createMockNode(nodeAId, [{ target: sentinelEndId }], [conditionId])
|
||||
const sentinelEndNode = createMockNode(
|
||||
sentinelEndId,
|
||||
[
|
||||
{ target: sentinelStartId, sourceHandle: 'loop_continue' },
|
||||
{ target: 'after-loop', sourceHandle: 'loop_exit' },
|
||||
],
|
||||
[nodeAId]
|
||||
)
|
||||
|
||||
const nodes = new Map<string, DAGNode>([
|
||||
[sentinelStartId, sentinelStartNode],
|
||||
[conditionId, conditionNode],
|
||||
[nodeAId, nodeANode],
|
||||
[sentinelEndId, sentinelEndNode],
|
||||
])
|
||||
|
||||
const dag = createMockDAG(nodes)
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
|
||||
conditionNode.incomingEdges.clear()
|
||||
|
||||
// selectedOption: null → no routing decision, true dead-end
|
||||
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, {
|
||||
selectedOption: null,
|
||||
})
|
||||
|
||||
// sentinel-end SHOULD be queued (true dead-end inside loop)
|
||||
expect(readyNodes).toContain(sentinelEndId)
|
||||
})
|
||||
|
||||
it('should still correctly handle normal loop exit (not deactivate when loop runs)', () => {
|
||||
// When a loop actually executes and exits normally, after_loop should become ready
|
||||
const sentinelStartId = 'sentinel-start'
|
||||
|
||||
@@ -69,15 +69,23 @@ export class EdgeManager {
|
||||
}
|
||||
}
|
||||
|
||||
const isDeadEnd = activatedTargets.length === 0
|
||||
const isRoutedDeadEnd = isDeadEnd && !!(output.selectedOption || output.selectedRoute)
|
||||
|
||||
for (const targetId of cascadeTargets) {
|
||||
if (!readyNodes.includes(targetId) && !activatedTargets.includes(targetId)) {
|
||||
// Only queue cascade terminal control nodes when ALL outgoing edges from the
|
||||
// current node were deactivated (dead-end scenario). When some edges are
|
||||
// activated, terminal control nodes on deactivated branches should NOT be
|
||||
// queued - they will be reached through the normal activated path's completion.
|
||||
// This prevents loop/parallel sentinels on fully deactivated paths (e.g., an
|
||||
// upstream condition took a different branch) from being spuriously executed.
|
||||
if (activatedTargets.length === 0 && this.isTargetReady(targetId)) {
|
||||
if (!isDeadEnd || !this.isTargetReady(targetId)) continue
|
||||
|
||||
if (isRoutedDeadEnd) {
|
||||
// A condition/router deliberately selected a dead-end path.
|
||||
// Only queue the sentinel if it belongs to the SAME subflow as the
|
||||
// current node (the condition is inside the loop/parallel and the
|
||||
// loop still needs to continue/exit). Downstream subflow sentinels
|
||||
// should NOT fire.
|
||||
if (this.isEnclosingSentinel(node, targetId)) {
|
||||
readyNodes.push(targetId)
|
||||
}
|
||||
} else {
|
||||
readyNodes.push(targetId)
|
||||
}
|
||||
}
|
||||
@@ -145,6 +153,27 @@ export class EdgeManager {
|
||||
return targetNode ? this.isNodeReady(targetNode) : false
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the cascade target sentinel belongs to the same subflow as the source node.
|
||||
* A condition inside a loop that hits a dead-end should still allow the enclosing
|
||||
* loop's sentinel to fire so the loop can continue or exit.
|
||||
*/
|
||||
private isEnclosingSentinel(sourceNode: DAGNode, sentinelId: string): boolean {
|
||||
const sentinel = this.dag.nodes.get(sentinelId)
|
||||
if (!sentinel?.metadata.isSentinel) return false
|
||||
|
||||
const sourceLoopId = sourceNode.metadata.loopId
|
||||
const sourceParallelId = sourceNode.metadata.parallelId
|
||||
const sentinelLoopId = sentinel.metadata.loopId
|
||||
const sentinelParallelId = sentinel.metadata.parallelId
|
||||
|
||||
if (sourceLoopId && sentinelLoopId && sourceLoopId === sentinelLoopId) return true
|
||||
if (sourceParallelId && sentinelParallelId && sourceParallelId === sentinelParallelId)
|
||||
return true
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
private isLoopEdge(handle?: string): boolean {
|
||||
return (
|
||||
handle === EDGE.LOOP_CONTINUE ||
|
||||
|
||||
@@ -555,7 +555,7 @@ describe('ConditionBlockHandler', () => {
|
||||
})
|
||||
|
||||
describe('Condition with no outgoing edge', () => {
|
||||
it('should return null path when condition matches but has no edge', async () => {
|
||||
it('should set selectedOption when condition matches but has no edge', async () => {
|
||||
const conditions = [
|
||||
{ id: 'cond1', title: 'if', value: 'true' },
|
||||
{ id: 'else1', title: 'else', value: '' },
|
||||
@@ -570,9 +570,52 @@ describe('ConditionBlockHandler', () => {
|
||||
|
||||
const result = await handler.execute(mockContext, mockBlock, inputs)
|
||||
|
||||
// Condition matches but no edge for it
|
||||
expect((result as any).conditionResult).toBe(false)
|
||||
expect((result as any).conditionResult).toBe(true)
|
||||
expect((result as any).selectedPath).toBeNull()
|
||||
expect((result as any).selectedOption).toBe('cond1')
|
||||
expect(mockContext.decisions.condition.get(mockBlock.id)).toBe('cond1')
|
||||
})
|
||||
|
||||
it('should set selectedOption when else is selected but has no edge', async () => {
|
||||
const conditions = [
|
||||
{ id: 'cond1', title: 'if', value: 'false' },
|
||||
{ id: 'else1', title: 'else', value: '' },
|
||||
]
|
||||
const inputs = { conditions: JSON.stringify(conditions) }
|
||||
|
||||
// Only the if branch has an edge; else has no outgoing connection
|
||||
mockContext.workflow!.connections = [
|
||||
{ source: mockSourceBlock.id, target: mockBlock.id },
|
||||
{ source: mockBlock.id, target: mockTargetBlock1.id, sourceHandle: 'condition-cond1' },
|
||||
]
|
||||
|
||||
const result = await handler.execute(mockContext, mockBlock, inputs)
|
||||
|
||||
expect((result as any).conditionResult).toBe(true)
|
||||
expect((result as any).selectedPath).toBeNull()
|
||||
expect((result as any).selectedOption).toBe('else1')
|
||||
expect(mockContext.decisions.condition.get(mockBlock.id)).toBe('else1')
|
||||
})
|
||||
|
||||
it('should deactivate if-path when else is selected with no edge', async () => {
|
||||
const conditions = [
|
||||
{ id: 'cond1', title: 'if', value: 'context.value > 100' },
|
||||
{ id: 'else1', title: 'else', value: '' },
|
||||
]
|
||||
const inputs = { conditions: JSON.stringify(conditions) }
|
||||
|
||||
// Only the if branch has an edge to a loop; else has nothing
|
||||
mockContext.workflow!.connections = [
|
||||
{ source: mockSourceBlock.id, target: mockBlock.id },
|
||||
{ source: mockBlock.id, target: mockTargetBlock1.id, sourceHandle: 'condition-cond1' },
|
||||
]
|
||||
|
||||
const result = await handler.execute(mockContext, mockBlock, inputs)
|
||||
|
||||
// Else was selected (value 10 is not > 100), so selectedOption should be 'else1'
|
||||
// This allows the edge manager to deactivate the cond1 edge
|
||||
expect((result as any).selectedOption).toBe('else1')
|
||||
expect((result as any).conditionResult).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -602,6 +645,67 @@ describe('ConditionBlockHandler', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('Source output filtering', () => {
|
||||
it('should not propagate error field from source block output', async () => {
|
||||
;(mockContext.blockStates as any).set(mockSourceBlock.id, {
|
||||
output: { value: 10, text: 'hello', error: 'upstream block failed' },
|
||||
executed: true,
|
||||
executionTime: 100,
|
||||
})
|
||||
|
||||
const conditions = [
|
||||
{ id: 'cond1', title: 'if', value: 'context.value > 5' },
|
||||
{ id: 'else1', title: 'else', value: '' },
|
||||
]
|
||||
const inputs = { conditions: JSON.stringify(conditions) }
|
||||
|
||||
const result = await handler.execute(mockContext, mockBlock, inputs)
|
||||
|
||||
expect((result as any).conditionResult).toBe(true)
|
||||
expect((result as any).selectedOption).toBe('cond1')
|
||||
expect(result).not.toHaveProperty('error')
|
||||
})
|
||||
|
||||
it('should not propagate _pauseMetadata from source block output', async () => {
|
||||
;(mockContext.blockStates as any).set(mockSourceBlock.id, {
|
||||
output: { value: 10, _pauseMetadata: { contextId: 'abc' } },
|
||||
executed: true,
|
||||
executionTime: 100,
|
||||
})
|
||||
|
||||
const conditions = [
|
||||
{ id: 'cond1', title: 'if', value: 'context.value > 5' },
|
||||
{ id: 'else1', title: 'else', value: '' },
|
||||
]
|
||||
const inputs = { conditions: JSON.stringify(conditions) }
|
||||
|
||||
const result = await handler.execute(mockContext, mockBlock, inputs)
|
||||
|
||||
expect((result as any).conditionResult).toBe(true)
|
||||
expect(result).not.toHaveProperty('_pauseMetadata')
|
||||
})
|
||||
|
||||
it('should still pass through non-control fields from source output', async () => {
|
||||
;(mockContext.blockStates as any).set(mockSourceBlock.id, {
|
||||
output: { value: 10, text: 'hello', customData: { nested: true } },
|
||||
executed: true,
|
||||
executionTime: 100,
|
||||
})
|
||||
|
||||
const conditions = [
|
||||
{ id: 'cond1', title: 'if', value: 'context.value > 5' },
|
||||
{ id: 'else1', title: 'else', value: '' },
|
||||
]
|
||||
const inputs = { conditions: JSON.stringify(conditions) }
|
||||
|
||||
const result = await handler.execute(mockContext, mockBlock, inputs)
|
||||
|
||||
expect((result as any).value).toBe(10)
|
||||
expect((result as any).text).toBe('hello')
|
||||
expect((result as any).customData).toEqual({ nested: true })
|
||||
})
|
||||
})
|
||||
|
||||
describe('Virtual block ID handling', () => {
|
||||
it('should use currentVirtualBlockId for decision key when available', async () => {
|
||||
mockContext.currentVirtualBlockId = 'virtual-block-123'
|
||||
|
||||
@@ -108,9 +108,7 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
const evalContext = this.buildEvaluationContext(ctx, sourceBlockId)
|
||||
const rawSourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null
|
||||
|
||||
// Filter out _pauseMetadata from source output to prevent the engine from
|
||||
// thinking this block is pausing (it was already resumed by the HITL block)
|
||||
const sourceOutput = this.filterPauseMetadata(rawSourceOutput)
|
||||
const sourceOutput = this.filterSourceOutput(rawSourceOutput)
|
||||
|
||||
const outgoingConnections = ctx.workflow?.connections.filter(
|
||||
(conn) => conn.source === baseBlockId
|
||||
@@ -124,7 +122,7 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
block.id
|
||||
)
|
||||
|
||||
if (!selectedConnection || !selectedCondition) {
|
||||
if (!selectedCondition) {
|
||||
return {
|
||||
...((sourceOutput as any) || {}),
|
||||
conditionResult: false,
|
||||
@@ -133,6 +131,17 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
}
|
||||
}
|
||||
|
||||
if (!selectedConnection) {
|
||||
const decisionKey = ctx.currentVirtualBlockId || block.id
|
||||
ctx.decisions.condition.set(decisionKey, selectedCondition.id)
|
||||
return {
|
||||
...((sourceOutput as any) || {}),
|
||||
conditionResult: true,
|
||||
selectedPath: null,
|
||||
selectedOption: selectedCondition.id,
|
||||
}
|
||||
}
|
||||
|
||||
const targetBlock = ctx.workflow?.blocks.find((b) => b.id === selectedConnection?.target)
|
||||
if (!targetBlock) {
|
||||
throw new Error(`Target block ${selectedConnection?.target} not found`)
|
||||
@@ -153,11 +162,11 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private filterPauseMetadata(output: any): any {
|
||||
private filterSourceOutput(output: any): any {
|
||||
if (!output || typeof output !== 'object') {
|
||||
return output
|
||||
}
|
||||
const { _pauseMetadata, ...rest } = output
|
||||
const { _pauseMetadata, error, ...rest } = output
|
||||
return rest
|
||||
}
|
||||
|
||||
@@ -223,8 +232,7 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
if (connection) {
|
||||
return { selectedConnection: connection, selectedCondition: condition }
|
||||
}
|
||||
// Condition is true but has no outgoing edge - branch ends gracefully
|
||||
return { selectedConnection: null, selectedCondition: null }
|
||||
return { selectedConnection: null, selectedCondition: condition }
|
||||
}
|
||||
} catch (error: any) {
|
||||
logger.error(`Failed to evaluate condition "${condition.title}": ${error.message}`)
|
||||
@@ -238,7 +246,7 @@ export class ConditionBlockHandler implements BlockHandler {
|
||||
if (elseConnection) {
|
||||
return { selectedConnection: elseConnection, selectedCondition: elseCondition }
|
||||
}
|
||||
return { selectedConnection: null, selectedCondition: null }
|
||||
return { selectedConnection: null, selectedCondition: elseCondition }
|
||||
}
|
||||
|
||||
return { selectedConnection: null, selectedCondition: null }
|
||||
|
||||
@@ -21,6 +21,7 @@ import {
|
||||
buildParallelSentinelStartId,
|
||||
buildSentinelEndId,
|
||||
buildSentinelStartId,
|
||||
emitEmptySubflowEvents,
|
||||
extractBaseBlockId,
|
||||
resolveArrayInput,
|
||||
validateMaxCount,
|
||||
@@ -596,6 +597,7 @@ export class LoopOrchestrator {
|
||||
if (!scope.items || scope.items.length === 0) {
|
||||
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
|
||||
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
|
||||
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
@@ -605,6 +607,7 @@ export class LoopOrchestrator {
|
||||
if (scope.maxIterations === 0) {
|
||||
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
|
||||
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
|
||||
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
@@ -617,6 +620,8 @@ export class LoopOrchestrator {
|
||||
if (scope.loopType === 'while') {
|
||||
if (!scope.condition) {
|
||||
logger.warn('No condition defined for while loop', { loopId })
|
||||
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
|
||||
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -627,6 +632,11 @@ export class LoopOrchestrator {
|
||||
result,
|
||||
})
|
||||
|
||||
if (!result) {
|
||||
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
|
||||
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import { buildContainerIterationContext } from '@/executor/utils/iteration-conte
|
||||
import { ParallelExpander } from '@/executor/utils/parallel-expansion'
|
||||
import {
|
||||
addSubflowErrorLog,
|
||||
emitEmptySubflowEvents,
|
||||
extractBranchIndex,
|
||||
resolveArrayInput,
|
||||
validateMaxCount,
|
||||
@@ -108,6 +109,8 @@ export class ParallelOrchestrator {
|
||||
|
||||
this.state.setBlockOutput(parallelId, { results: [] })
|
||||
|
||||
emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions)
|
||||
|
||||
logger.info('Parallel scope initialized with empty distribution, skipping body', {
|
||||
parallelId,
|
||||
branchCount: 0,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { LOOP, PARALLEL, REFERENCE } from '@/executor/constants'
|
||||
import { DEFAULTS, LOOP, PARALLEL, REFERENCE } from '@/executor/constants'
|
||||
import type { ContextExtensions } from '@/executor/execution/types'
|
||||
import { type BlockLog, type ExecutionContext, getNextExecutionOrder } from '@/executor/types'
|
||||
import { buildContainerIterationContext } from '@/executor/utils/iteration-context'
|
||||
import type { VariableResolver } from '@/executor/variables/resolver'
|
||||
|
||||
const BRANCH_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}\\d+${PARALLEL.BRANCH.SUFFIX}$`)
|
||||
@@ -309,3 +310,54 @@ export function addSubflowErrorLog(
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits block log + SSE events for a loop/parallel that was skipped due to an
|
||||
* empty collection or false initial condition. This ensures the container block
|
||||
* appears in terminal logs, execution snapshots, and edge highlighting.
|
||||
*/
|
||||
export function emitEmptySubflowEvents(
|
||||
ctx: ExecutionContext,
|
||||
blockId: string,
|
||||
blockType: 'loop' | 'parallel',
|
||||
contextExtensions: ContextExtensions | null
|
||||
): void {
|
||||
const now = new Date().toISOString()
|
||||
const executionOrder = getNextExecutionOrder(ctx)
|
||||
const output = { results: [] }
|
||||
const block = ctx.workflow?.blocks.find((b) => b.id === blockId)
|
||||
const blockName = block?.metadata?.name ?? blockType
|
||||
const iterationContext = buildContainerIterationContext(ctx, blockId)
|
||||
|
||||
ctx.blockLogs.push({
|
||||
blockId,
|
||||
blockName,
|
||||
blockType,
|
||||
startedAt: now,
|
||||
endedAt: now,
|
||||
durationMs: DEFAULTS.EXECUTION_TIME,
|
||||
success: true,
|
||||
output,
|
||||
executionOrder,
|
||||
})
|
||||
|
||||
if (contextExtensions?.onBlockStart) {
|
||||
contextExtensions.onBlockStart(blockId, blockName, blockType, executionOrder)
|
||||
}
|
||||
|
||||
if (contextExtensions?.onBlockComplete) {
|
||||
contextExtensions.onBlockComplete(
|
||||
blockId,
|
||||
blockName,
|
||||
blockType,
|
||||
{
|
||||
output,
|
||||
executionTime: DEFAULTS.EXECUTION_TIME,
|
||||
startedAt: now,
|
||||
executionOrder,
|
||||
endedAt: now,
|
||||
},
|
||||
iterationContext
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -761,9 +761,17 @@ function groupIterationBlocksRecursive(
|
||||
}
|
||||
}
|
||||
|
||||
// Non-iteration spans that aren't consumed container sentinels go straight to result
|
||||
const containerIdsWithIterations = new Set<string>()
|
||||
for (const span of iterationSpans) {
|
||||
const outermost = getOutermostContainer(span)
|
||||
if (outermost) containerIdsWithIterations.add(outermost.containerId)
|
||||
}
|
||||
|
||||
const nonContainerSpans = nonIterationSpans.filter(
|
||||
(span) => (span.type !== 'parallel' && span.type !== 'loop') || span.status === 'error'
|
||||
(span) =>
|
||||
(span.type !== 'parallel' && span.type !== 'loop') ||
|
||||
span.status === 'error' ||
|
||||
(span.blockId && !containerIdsWithIterations.has(span.blockId))
|
||||
)
|
||||
|
||||
if (iterationSpans.length === 0) {
|
||||
|
||||
183
apps/sim/lib/workflows/migrations/subblock-migrations.test.ts
Normal file
183
apps/sim/lib/workflows/migrations/subblock-migrations.test.ts
Normal file
@@ -0,0 +1,183 @@
|
||||
/**
|
||||
* @vitest-environment node
|
||||
*/
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import type { BlockState } from '@/stores/workflows/workflow/types'
|
||||
import { migrateSubblockIds } from './subblock-migrations'
|
||||
|
||||
function makeBlock(overrides: Partial<BlockState> & { type: string }): BlockState {
|
||||
return {
|
||||
id: 'block-1',
|
||||
name: 'Test',
|
||||
position: { x: 0, y: 0 },
|
||||
subBlocks: {},
|
||||
outputs: {},
|
||||
enabled: true,
|
||||
...overrides,
|
||||
} as BlockState
|
||||
}
|
||||
|
||||
describe('migrateSubblockIds', () => {
|
||||
describe('knowledge block', () => {
|
||||
it('should rename knowledgeBaseId to knowledgeBaseSelector', () => {
|
||||
const input: Record<string, BlockState> = {
|
||||
b1: makeBlock({
|
||||
type: 'knowledge',
|
||||
subBlocks: {
|
||||
operation: { id: 'operation', type: 'dropdown', value: 'search' },
|
||||
knowledgeBaseId: {
|
||||
id: 'knowledgeBaseId',
|
||||
type: 'knowledge-base-selector',
|
||||
value: 'kb-uuid-123',
|
||||
},
|
||||
},
|
||||
}),
|
||||
}
|
||||
|
||||
const { blocks, migrated } = migrateSubblockIds(input)
|
||||
|
||||
expect(migrated).toBe(true)
|
||||
expect(blocks['b1'].subBlocks['knowledgeBaseSelector']).toEqual({
|
||||
id: 'knowledgeBaseSelector',
|
||||
type: 'knowledge-base-selector',
|
||||
value: 'kb-uuid-123',
|
||||
})
|
||||
expect(blocks['b1'].subBlocks['knowledgeBaseId']).toBeUndefined()
|
||||
expect(blocks['b1'].subBlocks['operation'].value).toBe('search')
|
||||
})
|
||||
|
||||
it('should prefer new key when both old and new exist', () => {
|
||||
const input: Record<string, BlockState> = {
|
||||
b1: makeBlock({
|
||||
type: 'knowledge',
|
||||
subBlocks: {
|
||||
knowledgeBaseId: {
|
||||
id: 'knowledgeBaseId',
|
||||
type: 'knowledge-base-selector',
|
||||
value: 'stale-kb',
|
||||
},
|
||||
knowledgeBaseSelector: {
|
||||
id: 'knowledgeBaseSelector',
|
||||
type: 'knowledge-base-selector',
|
||||
value: 'fresh-kb',
|
||||
},
|
||||
},
|
||||
}),
|
||||
}
|
||||
|
||||
const { blocks, migrated } = migrateSubblockIds(input)
|
||||
|
||||
expect(migrated).toBe(true)
|
||||
expect(blocks['b1'].subBlocks['knowledgeBaseSelector'].value).toBe('fresh-kb')
|
||||
expect(blocks['b1'].subBlocks['knowledgeBaseId']).toBeUndefined()
|
||||
})
|
||||
|
||||
it('should not touch blocks that already use the new key', () => {
|
||||
const input: Record<string, BlockState> = {
|
||||
b1: makeBlock({
|
||||
type: 'knowledge',
|
||||
subBlocks: {
|
||||
knowledgeBaseSelector: {
|
||||
id: 'knowledgeBaseSelector',
|
||||
type: 'knowledge-base-selector',
|
||||
value: 'kb-uuid',
|
||||
},
|
||||
},
|
||||
}),
|
||||
}
|
||||
|
||||
const { blocks, migrated } = migrateSubblockIds(input)
|
||||
|
||||
expect(migrated).toBe(false)
|
||||
expect(blocks['b1'].subBlocks['knowledgeBaseSelector'].value).toBe('kb-uuid')
|
||||
})
|
||||
})
|
||||
|
||||
it('should not mutate the input blocks', () => {
|
||||
const input: Record<string, BlockState> = {
|
||||
b1: makeBlock({
|
||||
type: 'knowledge',
|
||||
subBlocks: {
|
||||
knowledgeBaseId: {
|
||||
id: 'knowledgeBaseId',
|
||||
type: 'knowledge-base-selector',
|
||||
value: 'kb-uuid',
|
||||
},
|
||||
},
|
||||
}),
|
||||
}
|
||||
|
||||
const { blocks } = migrateSubblockIds(input)
|
||||
|
||||
expect(input['b1'].subBlocks['knowledgeBaseId']).toBeDefined()
|
||||
expect(blocks['b1'].subBlocks['knowledgeBaseSelector']).toBeDefined()
|
||||
expect(blocks).not.toBe(input)
|
||||
})
|
||||
|
||||
it('should skip blocks with no registered migrations', () => {
|
||||
const input: Record<string, BlockState> = {
|
||||
b1: makeBlock({
|
||||
type: 'function',
|
||||
subBlocks: {
|
||||
code: { id: 'code', type: 'code', value: 'console.log("hi")' },
|
||||
},
|
||||
}),
|
||||
}
|
||||
|
||||
const { blocks, migrated } = migrateSubblockIds(input)
|
||||
|
||||
expect(migrated).toBe(false)
|
||||
expect(blocks['b1'].subBlocks['code'].value).toBe('console.log("hi")')
|
||||
})
|
||||
|
||||
it('should migrate multiple blocks in one pass', () => {
|
||||
const input: Record<string, BlockState> = {
|
||||
b1: makeBlock({
|
||||
id: 'b1',
|
||||
type: 'knowledge',
|
||||
subBlocks: {
|
||||
knowledgeBaseId: {
|
||||
id: 'knowledgeBaseId',
|
||||
type: 'knowledge-base-selector',
|
||||
value: 'kb-1',
|
||||
},
|
||||
},
|
||||
}),
|
||||
b2: makeBlock({
|
||||
id: 'b2',
|
||||
type: 'knowledge',
|
||||
subBlocks: {
|
||||
knowledgeBaseId: {
|
||||
id: 'knowledgeBaseId',
|
||||
type: 'knowledge-base-selector',
|
||||
value: 'kb-2',
|
||||
},
|
||||
},
|
||||
}),
|
||||
b3: makeBlock({
|
||||
id: 'b3',
|
||||
type: 'function',
|
||||
subBlocks: {
|
||||
code: { id: 'code', type: 'code', value: '' },
|
||||
},
|
||||
}),
|
||||
}
|
||||
|
||||
const { blocks, migrated } = migrateSubblockIds(input)
|
||||
|
||||
expect(migrated).toBe(true)
|
||||
expect(blocks['b1'].subBlocks['knowledgeBaseSelector'].value).toBe('kb-1')
|
||||
expect(blocks['b2'].subBlocks['knowledgeBaseSelector'].value).toBe('kb-2')
|
||||
expect(blocks['b3'].subBlocks['code']).toBeDefined()
|
||||
})
|
||||
|
||||
it('should handle blocks with empty subBlocks', () => {
|
||||
const input: Record<string, BlockState> = {
|
||||
b1: makeBlock({ type: 'knowledge', subBlocks: {} }),
|
||||
}
|
||||
|
||||
const { migrated } = migrateSubblockIds(input)
|
||||
|
||||
expect(migrated).toBe(false)
|
||||
})
|
||||
})
|
||||
90
apps/sim/lib/workflows/migrations/subblock-migrations.ts
Normal file
90
apps/sim/lib/workflows/migrations/subblock-migrations.ts
Normal file
@@ -0,0 +1,90 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import type { BlockState } from '@/stores/workflows/workflow/types'
|
||||
|
||||
const logger = createLogger('SubblockMigrations')
|
||||
|
||||
/**
|
||||
* Maps old subblock IDs to their current equivalents per block type.
|
||||
*
|
||||
* When a subblock is renamed in a block definition, old deployed/saved states
|
||||
* still carry the value under the previous key. Without this mapping the
|
||||
* serializer silently drops the value, breaking execution.
|
||||
*
|
||||
* Format: { blockType: { oldSubblockId: newSubblockId } }
|
||||
*/
|
||||
export const SUBBLOCK_ID_MIGRATIONS: Record<string, Record<string, string>> = {
|
||||
knowledge: {
|
||||
knowledgeBaseId: 'knowledgeBaseSelector',
|
||||
},
|
||||
}
|
||||
|
||||
/**
|
||||
* Migrates legacy subblock IDs inside a single block's subBlocks map.
|
||||
* Returns a new subBlocks record if anything changed, or the original if not.
|
||||
*/
|
||||
function migrateBlockSubblockIds(
|
||||
subBlocks: Record<string, BlockState['subBlocks'][string]>,
|
||||
renames: Record<string, string>
|
||||
): { subBlocks: Record<string, BlockState['subBlocks'][string]>; migrated: boolean } {
|
||||
let migrated = false
|
||||
|
||||
for (const oldId of Object.keys(renames)) {
|
||||
if (oldId in subBlocks) {
|
||||
migrated = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (!migrated) return { subBlocks, migrated: false }
|
||||
|
||||
const result = { ...subBlocks }
|
||||
|
||||
for (const [oldId, newId] of Object.entries(renames)) {
|
||||
if (!(oldId in result)) continue
|
||||
|
||||
if (newId in result) {
|
||||
delete result[oldId]
|
||||
continue
|
||||
}
|
||||
|
||||
const oldEntry = result[oldId]
|
||||
result[newId] = { ...oldEntry, id: newId }
|
||||
delete result[oldId]
|
||||
}
|
||||
|
||||
return { subBlocks: result, migrated: true }
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies subblock-ID migrations to every block in a workflow.
|
||||
* Returns a new blocks record with migrated subBlocks where needed.
|
||||
*/
|
||||
export function migrateSubblockIds(blocks: Record<string, BlockState>): {
|
||||
blocks: Record<string, BlockState>
|
||||
migrated: boolean
|
||||
} {
|
||||
let anyMigrated = false
|
||||
const result: Record<string, BlockState> = {}
|
||||
|
||||
for (const [blockId, block] of Object.entries(blocks)) {
|
||||
const renames = SUBBLOCK_ID_MIGRATIONS[block.type]
|
||||
if (!renames || !block.subBlocks) {
|
||||
result[blockId] = block
|
||||
continue
|
||||
}
|
||||
|
||||
const { subBlocks, migrated } = migrateBlockSubblockIds(block.subBlocks, renames)
|
||||
if (migrated) {
|
||||
logger.info('Migrated legacy subblock IDs', {
|
||||
blockId: block.id,
|
||||
blockType: block.type,
|
||||
})
|
||||
anyMigrated = true
|
||||
result[blockId] = { ...block, subBlocks }
|
||||
} else {
|
||||
result[blockId] = block
|
||||
}
|
||||
}
|
||||
|
||||
return { blocks: result, migrated: anyMigrated }
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import { and, desc, eq, inArray, sql } from 'drizzle-orm'
|
||||
import type { Edge } from 'reactflow'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import type { DbOrTx } from '@/lib/db/types'
|
||||
import { migrateSubblockIds } from '@/lib/workflows/migrations/subblock-migrations'
|
||||
import { sanitizeAgentToolsInBlocks } from '@/lib/workflows/sanitization/validation'
|
||||
import type { BlockState, Loop, Parallel, WorkflowState } from '@/stores/workflows/workflow/types'
|
||||
import { SUBFLOW_TYPES } from '@/stores/workflows/workflow/types'
|
||||
@@ -113,10 +114,10 @@ export async function loadDeployedWorkflowState(
|
||||
resolvedWorkspaceId = wfRow?.workspaceId ?? undefined
|
||||
}
|
||||
|
||||
const resolvedBlocks = state.blocks || {}
|
||||
const { blocks: migratedBlocks } = resolvedWorkspaceId
|
||||
? await migrateCredentialIds(resolvedBlocks, resolvedWorkspaceId)
|
||||
: { blocks: resolvedBlocks }
|
||||
const { blocks: migratedBlocks } = await applyBlockMigrations(
|
||||
state.blocks || {},
|
||||
resolvedWorkspaceId
|
||||
)
|
||||
|
||||
return {
|
||||
blocks: migratedBlocks,
|
||||
@@ -133,6 +134,50 @@ export async function loadDeployedWorkflowState(
|
||||
}
|
||||
}
|
||||
|
||||
interface MigrationContext {
|
||||
blocks: Record<string, BlockState>
|
||||
workspaceId?: string
|
||||
migrated: boolean
|
||||
}
|
||||
|
||||
type BlockMigration = (ctx: MigrationContext) => MigrationContext | Promise<MigrationContext>
|
||||
|
||||
function createMigrationPipeline(migrations: BlockMigration[]) {
|
||||
return async (
|
||||
blocks: Record<string, BlockState>,
|
||||
workspaceId?: string
|
||||
): Promise<{ blocks: Record<string, BlockState>; migrated: boolean }> => {
|
||||
let ctx: MigrationContext = { blocks, workspaceId, migrated: false }
|
||||
for (const migration of migrations) {
|
||||
ctx = await migration(ctx)
|
||||
}
|
||||
return { blocks: ctx.blocks, migrated: ctx.migrated }
|
||||
}
|
||||
}
|
||||
|
||||
const applyBlockMigrations = createMigrationPipeline([
|
||||
(ctx) => {
|
||||
const { blocks } = sanitizeAgentToolsInBlocks(ctx.blocks)
|
||||
return { ...ctx, blocks }
|
||||
},
|
||||
|
||||
(ctx) => ({
|
||||
...ctx,
|
||||
blocks: migrateAgentBlocksToMessagesFormat(ctx.blocks),
|
||||
}),
|
||||
|
||||
async (ctx) => {
|
||||
if (!ctx.workspaceId) return ctx
|
||||
const { blocks, migrated } = await migrateCredentialIds(ctx.blocks, ctx.workspaceId)
|
||||
return { ...ctx, blocks, migrated: ctx.migrated || migrated }
|
||||
},
|
||||
|
||||
(ctx) => {
|
||||
const { blocks, migrated } = migrateSubblockIds(ctx.blocks)
|
||||
return { ...ctx, blocks, migrated: ctx.migrated || migrated }
|
||||
},
|
||||
])
|
||||
|
||||
/**
|
||||
* Migrates agent blocks from old format (systemPrompt/userPrompt) to new format (messages array)
|
||||
* This ensures backward compatibility for workflows created before the messages-input refactor.
|
||||
@@ -356,22 +401,16 @@ export async function loadWorkflowFromNormalizedTables(
|
||||
blocksMap[block.id] = assembled
|
||||
})
|
||||
|
||||
// Sanitize any invalid custom tools in agent blocks to prevent client crashes
|
||||
const { blocks: sanitizedBlocks } = sanitizeAgentToolsInBlocks(blocksMap)
|
||||
const { blocks: finalBlocks, migrated } = await applyBlockMigrations(
|
||||
blocksMap,
|
||||
workflowRow?.workspaceId ?? undefined
|
||||
)
|
||||
|
||||
// Migrate old agent block format (systemPrompt/userPrompt) to new messages array format
|
||||
const migratedBlocks = migrateAgentBlocksToMessagesFormat(sanitizedBlocks)
|
||||
|
||||
// Migrate legacy account.id → credential.id in OAuth subblocks
|
||||
const { blocks: credMigratedBlocks, migrated: credentialsMigrated } = workflowRow?.workspaceId
|
||||
? await migrateCredentialIds(migratedBlocks, workflowRow.workspaceId)
|
||||
: { blocks: migratedBlocks, migrated: false }
|
||||
|
||||
if (credentialsMigrated) {
|
||||
if (migrated) {
|
||||
Promise.resolve().then(async () => {
|
||||
try {
|
||||
for (const [blockId, block] of Object.entries(credMigratedBlocks)) {
|
||||
if (block.subBlocks !== migratedBlocks[blockId]?.subBlocks) {
|
||||
for (const [blockId, block] of Object.entries(finalBlocks)) {
|
||||
if (block.subBlocks !== blocksMap[blockId]?.subBlocks) {
|
||||
await db
|
||||
.update(workflowBlocks)
|
||||
.set({ subBlocks: block.subBlocks, updatedAt: new Date() })
|
||||
@@ -381,7 +420,7 @@ export async function loadWorkflowFromNormalizedTables(
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn('Failed to persist credential ID migration', { workflowId, error: err })
|
||||
logger.warn('Failed to persist block migrations', { workflowId, error: err })
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -422,13 +461,13 @@ export async function loadWorkflowFromNormalizedTables(
|
||||
forEachItems: (config as Loop).forEachItems ?? '',
|
||||
whileCondition: (config as Loop).whileCondition ?? '',
|
||||
doWhileCondition: (config as Loop).doWhileCondition ?? '',
|
||||
enabled: credMigratedBlocks[subflow.id]?.enabled ?? true,
|
||||
enabled: finalBlocks[subflow.id]?.enabled ?? true,
|
||||
}
|
||||
loops[subflow.id] = loop
|
||||
|
||||
if (credMigratedBlocks[subflow.id]) {
|
||||
const block = credMigratedBlocks[subflow.id]
|
||||
credMigratedBlocks[subflow.id] = {
|
||||
if (finalBlocks[subflow.id]) {
|
||||
const block = finalBlocks[subflow.id]
|
||||
finalBlocks[subflow.id] = {
|
||||
...block,
|
||||
data: {
|
||||
...block.data,
|
||||
@@ -449,7 +488,7 @@ export async function loadWorkflowFromNormalizedTables(
|
||||
(config as Parallel).parallelType === 'collection'
|
||||
? (config as Parallel).parallelType
|
||||
: 'count',
|
||||
enabled: credMigratedBlocks[subflow.id]?.enabled ?? true,
|
||||
enabled: finalBlocks[subflow.id]?.enabled ?? true,
|
||||
}
|
||||
parallels[subflow.id] = parallel
|
||||
} else {
|
||||
@@ -458,7 +497,7 @@ export async function loadWorkflowFromNormalizedTables(
|
||||
})
|
||||
|
||||
return {
|
||||
blocks: credMigratedBlocks,
|
||||
blocks: finalBlocks,
|
||||
edges: edgesArray,
|
||||
loops,
|
||||
parallels,
|
||||
|
||||
@@ -122,6 +122,40 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
|
||||
},
|
||||
contextWindow: 128000,
|
||||
},
|
||||
{
|
||||
id: 'gpt-5.4',
|
||||
pricing: {
|
||||
input: 2.5,
|
||||
cachedInput: 0.25,
|
||||
output: 15.0,
|
||||
updatedAt: '2026-03-05',
|
||||
},
|
||||
capabilities: {
|
||||
reasoningEffort: {
|
||||
values: ['none', 'low', 'medium', 'high', 'xhigh'],
|
||||
},
|
||||
verbosity: {
|
||||
values: ['low', 'medium', 'high'],
|
||||
},
|
||||
maxOutputTokens: 128000,
|
||||
},
|
||||
contextWindow: 1050000,
|
||||
},
|
||||
{
|
||||
id: 'gpt-5.4-pro',
|
||||
pricing: {
|
||||
input: 30.0,
|
||||
output: 180.0,
|
||||
updatedAt: '2026-03-05',
|
||||
},
|
||||
capabilities: {
|
||||
reasoningEffort: {
|
||||
values: ['medium', 'high', 'xhigh'],
|
||||
},
|
||||
maxOutputTokens: 128000,
|
||||
},
|
||||
contextWindow: 1050000,
|
||||
},
|
||||
{
|
||||
id: 'gpt-5.2',
|
||||
pricing: {
|
||||
@@ -493,6 +527,25 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
|
||||
},
|
||||
contextWindow: 128000,
|
||||
},
|
||||
{
|
||||
id: 'azure/gpt-5.4',
|
||||
pricing: {
|
||||
input: 2.5,
|
||||
cachedInput: 0.25,
|
||||
output: 15.0,
|
||||
updatedAt: '2026-03-05',
|
||||
},
|
||||
capabilities: {
|
||||
reasoningEffort: {
|
||||
values: ['none', 'low', 'medium', 'high', 'xhigh'],
|
||||
},
|
||||
verbosity: {
|
||||
values: ['low', 'medium', 'high'],
|
||||
},
|
||||
maxOutputTokens: 128000,
|
||||
},
|
||||
contextWindow: 1050000,
|
||||
},
|
||||
{
|
||||
id: 'azure/gpt-5.2',
|
||||
pricing: {
|
||||
|
||||
@@ -523,13 +523,16 @@ describe('Model Capabilities', () => {
|
||||
|
||||
it.concurrent('should have GPT-5 models in both reasoning effort and verbosity arrays', () => {
|
||||
const gpt5ModelsWithReasoningEffort = MODELS_WITH_REASONING_EFFORT.filter(
|
||||
(m) => m.includes('gpt-5') && !m.includes('chat-latest')
|
||||
(m) => m.includes('gpt-5') && !m.includes('chat-latest') && !m.includes('gpt-5.4-pro')
|
||||
)
|
||||
const gpt5ModelsWithVerbosity = MODELS_WITH_VERBOSITY.filter(
|
||||
(m) => m.includes('gpt-5') && !m.includes('chat-latest')
|
||||
)
|
||||
expect(gpt5ModelsWithReasoningEffort.sort()).toEqual(gpt5ModelsWithVerbosity.sort())
|
||||
|
||||
expect(MODELS_WITH_REASONING_EFFORT).toContain('gpt-5.4-pro')
|
||||
expect(MODELS_WITH_VERBOSITY).not.toContain('gpt-5.4-pro')
|
||||
|
||||
expect(MODELS_WITH_REASONING_EFFORT).toContain('o1')
|
||||
expect(MODELS_WITH_VERBOSITY).not.toContain('o1')
|
||||
})
|
||||
|
||||
170
apps/sim/scripts/check-subblock-id-stability.ts
Normal file
170
apps/sim/scripts/check-subblock-id-stability.ts
Normal file
@@ -0,0 +1,170 @@
|
||||
#!/usr/bin/env bun
|
||||
|
||||
/**
|
||||
* CI check: detect subblock ID renames that would break deployed workflows.
|
||||
*
|
||||
* Compares the current block registry against the parent commit.
|
||||
* If any subblock ID was removed from a block, it must have a corresponding
|
||||
* entry in SUBBLOCK_ID_MIGRATIONS — otherwise this script exits non-zero.
|
||||
*
|
||||
* Usage:
|
||||
* bun run apps/sim/scripts/check-subblock-id-stability.ts [base-ref]
|
||||
*
|
||||
* base-ref defaults to HEAD~1. In a PR CI pipeline, pass the merge base:
|
||||
* bun run apps/sim/scripts/check-subblock-id-stability.ts origin/main
|
||||
*/
|
||||
|
||||
import { execSync } from 'child_process'
|
||||
import { SUBBLOCK_ID_MIGRATIONS } from '@/lib/workflows/migrations/subblock-migrations'
|
||||
import { getAllBlocks } from '@/blocks/registry'
|
||||
|
||||
const baseRef = process.argv[2] || 'HEAD~1'
|
||||
|
||||
const gitRoot = execSync('git rev-parse --show-toplevel', { encoding: 'utf-8' }).trim()
|
||||
const gitOpts = { encoding: 'utf-8' as const, cwd: gitRoot }
|
||||
|
||||
type IdMap = Record<string, Set<string>>
|
||||
|
||||
/**
|
||||
* Extracts subblock IDs from the `subBlocks: [ ... ]` section of a block
|
||||
* definition. Only grabs the top-level `id:` of each subblock object —
|
||||
* ignores nested IDs inside `options`, `columns`, etc.
|
||||
*/
|
||||
function extractSubBlockIds(source: string): string[] {
|
||||
const startIdx = source.indexOf('subBlocks:')
|
||||
if (startIdx === -1) return []
|
||||
|
||||
const bracketStart = source.indexOf('[', startIdx)
|
||||
if (bracketStart === -1) return []
|
||||
|
||||
const ids: string[] = []
|
||||
let braceDepth = 0
|
||||
let bracketDepth = 0
|
||||
let i = bracketStart + 1
|
||||
bracketDepth = 1
|
||||
|
||||
while (i < source.length && bracketDepth > 0) {
|
||||
const ch = source[i]
|
||||
|
||||
if (ch === '[') bracketDepth++
|
||||
else if (ch === ']') {
|
||||
bracketDepth--
|
||||
if (bracketDepth === 0) break
|
||||
} else if (ch === '{') {
|
||||
braceDepth++
|
||||
if (braceDepth === 1) {
|
||||
const ahead = source.slice(i, i + 200)
|
||||
const idMatch = ahead.match(/{\s*(?:\/\/[^\n]*\n\s*)*id:\s*['"]([^'"]+)['"]/)
|
||||
if (idMatch) {
|
||||
ids.push(idMatch[1])
|
||||
}
|
||||
}
|
||||
} else if (ch === '}') {
|
||||
braceDepth--
|
||||
}
|
||||
|
||||
i++
|
||||
}
|
||||
|
||||
return ids
|
||||
}
|
||||
|
||||
function getCurrentIds(): IdMap {
|
||||
const map: IdMap = {}
|
||||
for (const block of getAllBlocks()) {
|
||||
map[block.type] = new Set(block.subBlocks.map((sb) => sb.id))
|
||||
}
|
||||
return map
|
||||
}
|
||||
|
||||
function getPreviousIds(): IdMap {
|
||||
const registryPath = 'apps/sim/blocks/registry.ts'
|
||||
const blocksDir = 'apps/sim/blocks/blocks'
|
||||
|
||||
let hasChanges = false
|
||||
try {
|
||||
const diff = execSync(
|
||||
`git diff --name-only ${baseRef} HEAD -- ${registryPath} ${blocksDir}`,
|
||||
gitOpts
|
||||
).trim()
|
||||
hasChanges = diff.length > 0
|
||||
} catch {
|
||||
console.log('⚠ Could not diff against base ref — skipping check')
|
||||
process.exit(0)
|
||||
}
|
||||
|
||||
if (!hasChanges) {
|
||||
console.log('✓ No block definition changes detected — nothing to check')
|
||||
process.exit(0)
|
||||
}
|
||||
|
||||
const map: IdMap = {}
|
||||
|
||||
try {
|
||||
const blockFiles = execSync(`git ls-tree -r --name-only ${baseRef} -- ${blocksDir}`, gitOpts)
|
||||
.trim()
|
||||
.split('\n')
|
||||
.filter((f) => f.endsWith('.ts') && !f.endsWith('.test.ts'))
|
||||
|
||||
for (const filePath of blockFiles) {
|
||||
let content: string
|
||||
try {
|
||||
content = execSync(`git show ${baseRef}:${filePath}`, gitOpts)
|
||||
} catch {
|
||||
continue
|
||||
}
|
||||
|
||||
const typeMatch = content.match(/BlockConfig\s*=\s*\{[\s\S]*?type:\s*['"]([^'"]+)['"]/)
|
||||
if (!typeMatch) continue
|
||||
const blockType = typeMatch[1]
|
||||
|
||||
const ids = extractSubBlockIds(content)
|
||||
if (ids.length === 0) continue
|
||||
|
||||
map[blockType] = new Set(ids)
|
||||
}
|
||||
} catch (err) {
|
||||
console.log(`⚠ Could not read previous block files from ${baseRef} — skipping check`, err)
|
||||
process.exit(0)
|
||||
}
|
||||
|
||||
return map
|
||||
}
|
||||
|
||||
const previous = getPreviousIds()
|
||||
const current = getCurrentIds()
|
||||
const errors: string[] = []
|
||||
|
||||
for (const [blockType, prevIds] of Object.entries(previous)) {
|
||||
const currIds = current[blockType]
|
||||
if (!currIds) continue
|
||||
|
||||
const migrations = SUBBLOCK_ID_MIGRATIONS[blockType] ?? {}
|
||||
|
||||
for (const oldId of prevIds) {
|
||||
if (currIds.has(oldId)) continue
|
||||
|
||||
if (oldId in migrations) continue
|
||||
|
||||
errors.push(
|
||||
`Block "${blockType}": subblock ID "${oldId}" was removed.\n` +
|
||||
` → Add a migration in SUBBLOCK_ID_MIGRATIONS (lib/workflows/migrations/subblock-migrations.ts)\n` +
|
||||
` mapping "${oldId}" to its replacement ID.`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if (errors.length > 0) {
|
||||
console.error('✗ Subblock ID stability check FAILED\n')
|
||||
console.error(
|
||||
'Removing subblock IDs breaks deployed workflows.\n' +
|
||||
'Either revert the rename or add a migration entry.\n'
|
||||
)
|
||||
for (const err of errors) {
|
||||
console.error(` ${err}\n`)
|
||||
}
|
||||
process.exit(1)
|
||||
} else {
|
||||
console.log('✓ Subblock ID stability check passed')
|
||||
process.exit(0)
|
||||
}
|
||||
Reference in New Issue
Block a user