improvement(parallel): update parallel subflow to support conditional routing (#1444)

* improvement(parallel): update parallel subblock to support conditional routing

* ignore disconnected blocks in parallel/loop

* added utils

* fix z-index issues with edges in subflow

* fixed aggregation of results from loop & parallel blocks

* feat(manual-trigger): add manual trigger (#1452)

* feat(manual-trigger): add manual trigger

* consolidate input format extraction

* exclude triggers from console logs + deployed chat error surfacing

* works

* centralize error messages + logging for deployed chat

* fix(css-config): use correct version (#1453)

* fix(css-config): use correct version

* fix lint

* improvement(parallel): update parallel subblock to support conditional routing

* ignore disconnected blocks in parallel/loop

* added utils

* fix z-index issues with edges in subflow

* fixed aggregation of results from loop & parallel blocks

* change z index within component and remvoe global css

* fix cascade deletion subflows sockets case

* improve results array for subflows

* fix onedgeclick inside subflows

* fix test

---------

Co-authored-by: waleed <waleed>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
This commit is contained in:
Waleed
2025-09-25 18:31:34 -07:00
committed by GitHub
parent 18599ac3c3
commit f79e87e4c2
17 changed files with 738 additions and 219 deletions

View File

@@ -178,7 +178,7 @@ export const WorkflowEdge = ({
style={{
transform: `translate(-50%, -50%) translate(${labelX}px,${labelY}px)`,
pointerEvents: 'all',
zIndex: 22,
zIndex: 100,
}}
onClick={(e) => {
e.preventDefault()

View File

@@ -1751,9 +1751,7 @@ const WorkflowContent = React.memo(() => {
// An edge is inside a loop if either source or target has a parent
// If source and target have different parents, prioritize source's parent
const parentLoopId =
(sourceNode?.id && blocks[sourceNode.id]?.data?.parentId) ||
(targetNode?.id && blocks[targetNode.id]?.data?.parentId)
const parentLoopId = sourceNode?.parentId || targetNode?.parentId
// Create a unique identifier that combines edge ID and parent context
const contextId = `${edge.id}${parentLoopId ? `-${parentLoopId}` : ''}`
@@ -1772,9 +1770,7 @@ const WorkflowContent = React.memo(() => {
// Check if this edge connects nodes inside a loop
const sourceNode = getNodes().find((n) => n.id === edge.source)
const targetNode = getNodes().find((n) => n.id === edge.target)
const parentLoopId =
(sourceNode?.id && blocks[sourceNode.id]?.data?.parentId) ||
(targetNode?.id && blocks[targetNode.id]?.data?.parentId)
const parentLoopId = sourceNode?.parentId || targetNode?.parentId
const isInsideLoop = Boolean(parentLoopId)
// Create a unique context ID for this edge
@@ -1867,6 +1863,12 @@ const WorkflowContent = React.memo(() => {
return (
<div className='flex h-screen w-full flex-col overflow-hidden'>
<div className='relative h-full w-full flex-1 transition-all duration-200'>
<style jsx global>{`
/* Ensure edge labels (e.g., delete X) render above group/subflow nodes */
.react-flow__edge-labels {
z-index: 60 !important;
}
`}</style>
<div className='fixed top-0 right-0 z-10'>
<Panel />
</div>

View File

@@ -604,7 +604,8 @@ export const createParallelManagerMock = (options?: {
getIterationItem: vi.fn(),
areAllVirtualBlocksExecuted: vi
.fn()
.mockImplementation((parallelId, parallel, executedBlocks, state) => {
.mockImplementation((parallelId, parallel, executedBlocks, state, context) => {
// Simple mock implementation - check all blocks (ignoring conditional routing for tests)
for (const nodeId of parallel.nodes) {
for (let i = 0; i < state.parallelCount; i++) {
const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${i}`

View File

@@ -14,8 +14,6 @@ export enum BlockType {
RESPONSE = 'response',
WORKFLOW = 'workflow',
STARTER = 'starter',
SCHEDULE = 'schedule',
WEBHOOK_TRIGGER = 'webhook_trigger',
}
/**

View File

@@ -194,8 +194,9 @@ export class ConditionBlockHandler implements BlockHandler {
`Condition block ${block.id} selected path: ${selectedCondition.title} (${selectedCondition.id}) -> ${targetBlock.metadata?.name || targetBlock.id}`
)
// Update context decisions
context.decisions.condition.set(block.id, selectedCondition.id)
// Update context decisions - use virtual block ID if available (for parallel execution)
const decisionKey = context.currentVirtualBlockId || block.id
context.decisions.condition.set(decisionKey, selectedCondition.id)
// Return output, preserving source output structure if possible
return {

View File

@@ -1,6 +1,7 @@
import { createLogger } from '@/lib/logs/console/logger'
import type { BlockOutput } from '@/blocks/types'
import { BlockType } from '@/executor/consts'
import { ParallelRoutingUtils } from '@/executor/parallels/utils'
import type { PathTracker } from '@/executor/path/path'
import type { InputResolver } from '@/executor/resolver/resolver'
import { Routing } from '@/executor/routing/routing'
@@ -338,17 +339,13 @@ export class ParallelBlockHandler implements BlockHandler {
if (!parallel || !parallelState) return false
// Check each node in the parallel for all iterations
for (const nodeId of parallel.nodes) {
for (let i = 0; i < parallelState.parallelCount; i++) {
const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${i}`
if (!context.executedBlocks.has(virtualBlockId)) {
return false
}
}
}
return true
// Use the shared utility that respects conditional routing
return ParallelRoutingUtils.areAllRequiredVirtualBlocksExecuted(
parallel,
parallelState.parallelCount,
context.executedBlocks,
context
)
}
/**

View File

@@ -20,6 +20,7 @@ import {
} from '@/executor/handlers'
import { LoopManager } from '@/executor/loops/loops'
import { ParallelManager } from '@/executor/parallels/parallels'
import { ParallelRoutingUtils } from '@/executor/parallels/utils'
import { PathTracker } from '@/executor/path/path'
import { InputResolver } from '@/executor/resolver/resolver'
import type {
@@ -31,6 +32,7 @@ import type {
StreamingExecution,
} from '@/executor/types'
import { streamingResponseFormatProcessor } from '@/executor/utils'
import { VirtualBlockUtils } from '@/executor/utils/virtual-blocks'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
import { useExecutionStore } from '@/stores/execution/store'
import { useConsoleStore } from '@/stores/panel/console/store'
@@ -1111,7 +1113,7 @@ export class Executor {
if (parallelState) {
let allVirtualInstancesExecuted = true
for (let i = 0; i < parallelState.parallelCount; i++) {
const virtualBlockId = `${block.id}_parallel_${insideParallel}_iteration_${i}`
const virtualBlockId = VirtualBlockUtils.generateParallelId(block.id, insideParallel, i)
if (!executedBlocks.has(virtualBlockId)) {
allVirtualInstancesExecuted = false
break
@@ -1205,7 +1207,9 @@ export class Executor {
}
/**
* Check if a specific parallel iteration is complete (all blocks executed).
* Check if a specific parallel iteration is complete (all blocks that should execute have executed).
* This method now considers conditional execution paths - only blocks in the active execution
* path are expected to execute.
*
* @param parallelId - ID of the parallel block
* @param iteration - Iteration index to check
@@ -1223,8 +1227,16 @@ export class Executor {
return true
}
for (const nodeId of parallel.nodes) {
const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${iteration}`
const expectedBlocks = this.getExpectedBlocksForIteration(
parallelId,
iteration,
parallel,
context
)
// Check if all expected blocks have been executed
for (const nodeId of expectedBlocks) {
const virtualBlockId = VirtualBlockUtils.generateParallelId(nodeId, parallelId, iteration)
if (!context.executedBlocks.has(virtualBlockId)) {
return false
}
@@ -1232,6 +1244,107 @@ export class Executor {
return true
}
/**
* Get the blocks that are expected to execute in a parallel iteration based on
* the active execution path. This handles conditional logic where some blocks
* may not execute due to condition or router blocks.
*
* @param parallelId - ID of the parallel block
* @param iteration - Iteration index
* @param parallel - Parallel configuration
* @param context - Execution context
* @returns Array of node IDs that should execute in this iteration
*/
private getExpectedBlocksForIteration(
parallelId: string,
iteration: number,
parallel: any,
context: ExecutionContext
): string[] {
if (!parallel || !parallel.nodes) {
return []
}
const expectedBlocks: string[] = []
for (const nodeId of parallel.nodes) {
const block = this.actualWorkflow.blocks.find((b) => b.id === nodeId)
// If block doesn't exist in workflow, fall back to original behavior (assume it should execute)
// This maintains compatibility with tests and edge cases
if (!block) {
expectedBlocks.push(nodeId)
continue
}
if (!block.enabled) {
continue
}
const virtualBlockId = VirtualBlockUtils.generateParallelId(nodeId, parallelId, iteration)
// Skip blocks that have already been executed
if (context.executedBlocks.has(virtualBlockId)) {
expectedBlocks.push(nodeId)
continue
}
// Check if this block should execute based on the active execution path
// We need to check if the original block is reachable based on current routing decisions
try {
const shouldExecute = this.shouldBlockExecuteInParallelIteration(
nodeId,
parallelId,
iteration,
context
)
if (shouldExecute) {
expectedBlocks.push(nodeId)
}
} catch (error) {
// If path checking fails, default to including the block to maintain existing behavior
logger.warn(
`Path check failed for block ${nodeId} in parallel ${parallelId}, iteration ${iteration}:`,
error
)
expectedBlocks.push(nodeId)
}
}
return expectedBlocks
}
/**
* Determines if a block should execute in a specific parallel iteration
* based on conditional routing and active execution paths.
*
* Blocks are excluded from execution if they are completely unconnected (no incoming connections).
* Starting blocks (with external connections only) and conditionally routed blocks execute as expected.
*
* @param nodeId - ID of the block to check
* @param parallelId - ID of the parallel block
* @param iteration - Current iteration index
* @param context - Execution context
* @returns Whether the block should execute
*/
private shouldBlockExecuteInParallelIteration(
nodeId: string,
parallelId: string,
iteration: number,
context: ExecutionContext
): boolean {
const parallel = this.actualWorkflow.parallels?.[parallelId]
if (!parallel) return false
return ParallelRoutingUtils.shouldBlockExecuteInParallelIteration(
nodeId,
parallel,
iteration,
context
)
}
/**
* Check if there are more parallel iterations to process.
* This ensures the execution loop continues when iterations are being processed sequentially.
@@ -1269,6 +1382,8 @@ export class Executor {
/**
* Process a single parallel iteration with topological ordering of dependencies.
* Now includes conditional execution logic - only processes blocks that should execute
* based on the active execution path (handles conditions, routers, etc.).
*
* @param parallelId - ID of the parallel block
* @param iteration - Current iteration index
@@ -1293,9 +1408,9 @@ export class Executor {
}
>()
// Build dependency graph for this iteration
// Build dependency graph for this iteration - only include blocks that should execute
for (const nodeId of parallel.nodes) {
const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${iteration}`
const virtualBlockId = VirtualBlockUtils.generateParallelId(nodeId, parallelId, iteration)
const isExecuted = context.executedBlocks.has(virtualBlockId)
if (isExecuted) {
@@ -1305,6 +1420,26 @@ export class Executor {
const block = this.actualWorkflow.blocks.find((b) => b.id === nodeId)
if (!block || !block.enabled) continue
// Check if this block should execute in this iteration based on conditional paths
try {
const shouldExecute = this.shouldBlockExecuteInParallelIteration(
nodeId,
parallelId,
iteration,
context
)
if (!shouldExecute) {
continue
}
} catch (error) {
// If path checking fails, default to processing the block to maintain existing behavior
logger.warn(
`Path check failed for block ${nodeId} in parallel ${parallelId}, iteration ${iteration}:`,
error
)
}
// Find dependencies within this iteration
const incomingConnections = this.actualWorkflow.connections.filter(
(conn) => conn.target === nodeId
@@ -1314,7 +1449,11 @@ export class Executor {
for (const conn of incomingConnections) {
// Check if the source is within the same parallel
if (parallel.nodes.includes(conn.source)) {
const sourceDependencyId = `${conn.source}_parallel_${parallelId}_iteration_${iteration}`
const sourceDependencyId = VirtualBlockUtils.generateParallelId(
conn.source,
parallelId,
iteration
)
dependencies.push(sourceDependencyId)
} else {
// External dependency - check if it's met
@@ -1422,7 +1561,11 @@ export class Executor {
sourceBlock &&
this.actualWorkflow.parallels?.[insideParallel]?.nodes.includes(conn.source)
) {
sourceId = `${conn.source}_parallel_${insideParallel}_iteration_${iterationIndex}`
sourceId = VirtualBlockUtils.generateParallelId(
conn.source,
insideParallel,
iterationIndex
)
}
}
@@ -1769,6 +1912,21 @@ export class Executor {
)
}
// Store result for loops (IDENTICAL to parallel logic)
const containingLoopId = this.resolver.getContainingLoopId(block.id)
if (containingLoopId && !parallelInfo) {
// Only store for loops if not already in a parallel (avoid double storage)
const currentIteration = context.loopIterations.get(containingLoopId)
if (currentIteration !== undefined) {
this.loopManager.storeIterationResult(
context,
containingLoopId,
currentIteration - 1, // Convert to 0-based index
output
)
}
}
// Update the execution log
blockLog.success = true
blockLog.output = output
@@ -1886,6 +2044,21 @@ export class Executor {
)
}
// Store result for loops (IDENTICAL to parallel logic)
const containingLoopId = this.resolver.getContainingLoopId(block.id)
if (containingLoopId && !parallelInfo) {
// Only store for loops if not already in a parallel (avoid double storage)
const currentIteration = context.loopIterations.get(containingLoopId)
if (currentIteration !== undefined) {
this.loopManager.storeIterationResult(
context,
containingLoopId,
currentIteration - 1, // Convert to 0-based index
output
)
}
}
// Update the execution log
blockLog.success = true
blockLog.output = output

View File

@@ -310,7 +310,7 @@ describe('LoopManager', () => {
test('should create new loop state if none exists', () => {
const output = { result: 'test result' }
manager.storeIterationResult(mockContext, 'loop-1', 0, 'block-1', output)
manager.storeIterationResult(mockContext, 'loop-1', 0, output)
expect(mockContext.loopExecutions).toBeDefined()
const loopState = mockContext.loopExecutions!.get('loop-1')
@@ -334,13 +334,14 @@ describe('LoopManager', () => {
const output1 = { result: 'result1' }
const output2 = { result: 'result2' }
manager.storeIterationResult(mockContext, 'loop-1', 0, 'block-1', output1)
manager.storeIterationResult(mockContext, 'loop-1', 0, 'block-2', output2)
manager.storeIterationResult(mockContext, 'loop-1', 0, output1)
manager.storeIterationResult(mockContext, 'loop-1', 0, output2)
const loopState = mockContext.loopExecutions.get('loop-1')
const iterationResults = loopState?.executionResults.get('iteration_0')
expect(iterationResults).toEqual(output2)
// When multiple results are stored for the same iteration, they are combined into an array
expect(iterationResults).toEqual([output1, output2])
})
test('should handle forEach loop state creation', () => {
@@ -349,7 +350,7 @@ describe('LoopManager', () => {
const output = { result: 'test result' }
manager.storeIterationResult(mockContext, 'loop-1', 0, 'block-1', output)
manager.storeIterationResult(mockContext, 'loop-1', 0, output)
const loopState = mockContext.loopExecutions!.get('loop-1')
expect(loopState?.loopType).toBe('forEach')

View File

@@ -1,6 +1,7 @@
import { createLogger } from '@/lib/logs/console/logger'
import { BlockType } from '@/executor/consts'
import type { ExecutionContext } from '@/executor/types'
import { ConnectionUtils } from '@/executor/utils/connections'
import type { SerializedBlock, SerializedConnection, SerializedLoop } from '@/serializer/types'
const logger = createLogger('LoopManager')
@@ -49,26 +50,8 @@ export class LoopManager {
// All blocks in the loop have been executed
const currentIteration = context.loopIterations.get(loopId) || 1
// Store the results from this iteration before potentially resetting blocks
const iterationResults: any[] = []
for (const nodeId of loop.nodes) {
const blockState = context.blockStates.get(nodeId)
if (blockState?.output) {
// Just push the output directly, not nested under block ID
iterationResults.push(blockState.output)
}
}
// Store the iteration results
if (iterationResults.length > 0) {
this.storeIterationResult(
context,
loopId,
currentIteration - 1, // Convert back to 0-based for storage
'iteration',
iterationResults
)
}
// Results are now stored individually as blocks execute (like parallels)
// No need for bulk collection here
// The loop block will handle incrementing the iteration when it executes next
// We just need to reset the blocks so they can run again
@@ -113,11 +96,7 @@ export class LoopManager {
for (let i = 0; i < maxIterations; i++) {
const result = loopState.executionResults.get(`iteration_${i}`)
if (result) {
if (Array.isArray(result)) {
results.push(...result)
} else {
results.push(result)
}
results.push(result)
}
}
}
@@ -169,134 +148,23 @@ export class LoopManager {
}
/**
* Helper to get the length of items for forEach loops
*/
private getItemsLength(forEachItems: any): number {
if (Array.isArray(forEachItems)) {
return forEachItems.length
}
if (typeof forEachItems === 'object' && forEachItems !== null) {
return Object.keys(forEachItems).length
}
if (typeof forEachItems === 'string') {
try {
const parsed = JSON.parse(forEachItems)
if (Array.isArray(parsed)) {
return parsed.length
}
if (typeof parsed === 'object' && parsed !== null) {
return Object.keys(parsed).length
}
} catch {}
}
return 0
}
/**
* Resets all blocks within a loop for the next iteration.
* Checks if all reachable blocks in a loop have been executed.
* This method now excludes completely unconnected blocks from consideration,
* ensuring they don't prevent loop completion.
*
* @param loopId - ID of the loop
* @param loop - The loop configuration
* @param context - Current execution context
*/
private resetLoopBlocks(loopId: string, loop: SerializedLoop, context: ExecutionContext): void {
// Reset all blocks in the loop
for (const nodeId of loop.nodes) {
context.executedBlocks.delete(nodeId)
context.blockStates.delete(nodeId)
context.activeExecutionPath.delete(nodeId)
context.decisions.router.delete(nodeId)
context.decisions.condition.delete(nodeId)
}
}
/**
* Stores the result of a loop iteration.
*/
storeIterationResult(
context: ExecutionContext,
loopId: string,
iterationIndex: number,
_blockId: string, // Not used anymore since we're storing results directly
output: any
): void {
if (!context.loopExecutions) {
context.loopExecutions = new Map()
}
let loopState = context.loopExecutions.get(loopId)
if (!loopState) {
const loop = this.loops[loopId]
const loopType = loop?.loopType === 'forEach' ? 'forEach' : 'for'
const forEachItems = loop?.forEachItems
loopState = {
maxIterations: loop?.iterations || this.defaultIterations,
loopType,
forEachItems:
Array.isArray(forEachItems) || (typeof forEachItems === 'object' && forEachItems !== null)
? forEachItems
: null,
executionResults: new Map(),
currentIteration: 0,
}
context.loopExecutions.set(loopId, loopState)
}
// Store the output directly for this iteration
const iterationKey = `iteration_${iterationIndex}`
loopState.executionResults.set(iterationKey, output)
}
/**
* Gets the correct loop index based on the current block being executed.
*
* @param loopId - ID of the loop
* @param blockId - ID of the block requesting the index
* @param context - Current execution context
* @returns The correct loop index for this block
*/
getLoopIndex(loopId: string, blockId: string, context: ExecutionContext): number {
const loop = this.loops[loopId]
if (!loop) return 0
// Return the current iteration counter
return context.loopIterations.get(loopId) || 0
}
/**
* Gets the iterations for a loop.
*
* @param loopId - ID of the loop
* @returns Iterations for the loop
*/
getIterations(loopId: string): number {
return this.loops[loopId]?.iterations || this.defaultIterations
}
/**
* Gets the current item for a forEach loop.
*
* @param loopId - ID of the loop
* @param context - Current execution context
* @returns Current item in the loop iteration
*/
getCurrentItem(loopId: string, context: ExecutionContext): any {
return context.loopItems.get(loopId)
}
/**
* Checks if all blocks in a list have been executed.
* For routing blocks (condition/router), only checks if the selected path has been executed.
*
* @param nodeIds - IDs of nodes to check
* @param context - Current execution context
* @returns Whether all blocks have been executed
* @param nodeIds - All node IDs in the loop
* @param context - Execution context
* @returns Whether all reachable blocks have been executed
*/
private allBlocksExecuted(nodeIds: string[], context: ExecutionContext): boolean {
return this.allReachableBlocksExecuted(nodeIds, context)
}
/**
* Helper method to check if all reachable blocks have been executed.
* Separated for clarity and potential future testing.
*/
private allReachableBlocksExecuted(nodeIds: string[], context: ExecutionContext): boolean {
// Get all connections within the loop
const loopConnections =
context.workflow?.connections.filter(
@@ -306,17 +174,15 @@ export class LoopManager {
// Build a map of blocks to their outgoing connections within the loop
const blockOutgoingConnections = new Map<string, typeof loopConnections>()
for (const nodeId of nodeIds) {
blockOutgoingConnections.set(
nodeId,
loopConnections.filter((conn) => conn.source === nodeId)
)
const outgoingConnections = ConnectionUtils.getOutgoingConnections(nodeId, loopConnections)
blockOutgoingConnections.set(nodeId, outgoingConnections)
}
// Find blocks that have no incoming connections within the loop (entry points)
const entryBlocks = nodeIds.filter((nodeId) => {
const hasIncomingFromLoop = loopConnections.some((conn) => conn.target === nodeId)
return !hasIncomingFromLoop
})
// Only consider blocks as entry points if they have external connections to the loop
const entryBlocks = nodeIds.filter((nodeId) =>
ConnectionUtils.isEntryPoint(nodeId, nodeIds, context.workflow?.connections || [])
)
// Track which blocks we've visited and determined are reachable
const reachableBlocks = new Set<string>()
@@ -378,6 +244,134 @@ export class LoopManager {
return true
}
/**
* Helper to get the length of items for forEach loops
*/
private getItemsLength(forEachItems: any): number {
if (Array.isArray(forEachItems)) {
return forEachItems.length
}
if (typeof forEachItems === 'object' && forEachItems !== null) {
return Object.keys(forEachItems).length
}
if (typeof forEachItems === 'string') {
try {
const parsed = JSON.parse(forEachItems)
if (Array.isArray(parsed)) {
return parsed.length
}
if (typeof parsed === 'object' && parsed !== null) {
return Object.keys(parsed).length
}
} catch {}
}
return 0
}
/**
* Resets all blocks within a loop for the next iteration.
*
* @param loopId - ID of the loop
* @param loop - The loop configuration
* @param context - Current execution context
*/
private resetLoopBlocks(loopId: string, loop: SerializedLoop, context: ExecutionContext): void {
// Reset all blocks in the loop
for (const nodeId of loop.nodes) {
context.executedBlocks.delete(nodeId)
context.blockStates.delete(nodeId)
context.activeExecutionPath.delete(nodeId)
context.decisions.router.delete(nodeId)
context.decisions.condition.delete(nodeId)
}
}
/**
* Stores the result of a loop iteration.
*/
storeIterationResult(
context: ExecutionContext,
loopId: string,
iterationIndex: number,
output: any
): void {
if (!context.loopExecutions) {
context.loopExecutions = new Map()
}
let loopState = context.loopExecutions.get(loopId)
if (!loopState) {
const loop = this.loops[loopId]
const loopType = loop?.loopType === 'forEach' ? 'forEach' : 'for'
const forEachItems = loop?.forEachItems
loopState = {
maxIterations: loop?.iterations || this.defaultIterations,
loopType,
forEachItems:
Array.isArray(forEachItems) || (typeof forEachItems === 'object' && forEachItems !== null)
? forEachItems
: null,
executionResults: new Map(),
currentIteration: 0,
}
context.loopExecutions.set(loopId, loopState)
}
const iterationKey = `iteration_${iterationIndex}`
const existingResult = loopState.executionResults.get(iterationKey)
if (existingResult) {
if (Array.isArray(existingResult)) {
existingResult.push(output)
} else {
loopState.executionResults.set(iterationKey, [existingResult, output])
}
} else {
loopState.executionResults.set(iterationKey, output)
}
}
/**
* Gets the correct loop index based on the current block being executed.
*
* @param loopId - ID of the loop
* @param blockId - ID of the block requesting the index
* @param context - Current execution context
* @returns The correct loop index for this block
*/
getLoopIndex(loopId: string, blockId: string, context: ExecutionContext): number {
const loop = this.loops[loopId]
if (!loop) return 0
// Return the current iteration counter
return context.loopIterations.get(loopId) || 0
}
/**
* Gets the iterations for a loop.
*
* @param loopId - ID of the loop
* @returns Iterations for the loop
*/
getIterations(loopId: string): number {
return this.loops[loopId]?.iterations || this.defaultIterations
}
/**
* Gets the current item for a forEach loop.
*
* @param loopId - ID of the loop
* @param context - Current execution context
* @returns Current item in the loop iteration
*/
getCurrentItem(loopId: string, context: ExecutionContext): any {
return context.loopItems.get(loopId)
}
/**
* Checks if a connection forms a feedback path in a loop.
* With loop blocks, feedback paths are now handled by loop-to-inner-block connections.

View File

@@ -111,11 +111,24 @@ describe('ParallelManager', () => {
distributionItems: ['a', 'b', 'c'],
})
const context = {
workflow: {
blocks: [],
connections: [],
},
decisions: {
condition: new Map(),
router: new Map(),
},
executedBlocks: new Set(),
} as any
const result = manager.areAllVirtualBlocksExecuted(
'parallel-1',
parallel,
executedBlocks,
state
state,
context
)
expect(result).toBe(true)
@@ -138,11 +151,32 @@ describe('ParallelManager', () => {
distributionItems: ['a', 'b', 'c'],
})
// Create context with external connection to make func-1 a legitimate entry point
const context = {
workflow: {
blocks: [{ id: 'func-1', metadata: { id: 'function' } }],
connections: [
{
source: 'external-block',
target: 'func-1',
sourceHandle: 'output',
targetHandle: 'input',
},
],
},
decisions: {
condition: new Map(),
router: new Map(),
},
executedBlocks: new Set(),
} as any
const result = manager.areAllVirtualBlocksExecuted(
'parallel-1',
parallel,
executedBlocks,
state
state,
context
)
expect(result).toBe(false)

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@/lib/logs/console/logger'
import { ParallelRoutingUtils } from '@/executor/parallels/utils'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { SerializedBlock, SerializedParallel, SerializedWorkflow } from '@/serializer/types'
@@ -56,23 +57,30 @@ export class ParallelManager {
}
/**
* Checks if all virtual blocks for a parallel have been executed.
* Checks if all virtual blocks that SHOULD execute for a parallel have been executed.
* This now respects conditional routing - only checks blocks that should execute.
*/
areAllVirtualBlocksExecuted(
parallelId: string,
parallel: SerializedParallel,
executedBlocks: Set<string>,
parallelState: ParallelState
parallelState: ParallelState,
context: ExecutionContext
): boolean {
for (const nodeId of parallel.nodes) {
for (let i = 0; i < parallelState.parallelCount; i++) {
const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${i}`
if (!executedBlocks.has(virtualBlockId)) {
return false
}
}
const result = ParallelRoutingUtils.areAllRequiredVirtualBlocksExecuted(
parallel,
parallelState.parallelCount,
executedBlocks,
context
)
if (result) {
logger.info(`All required virtual blocks completed for parallel ${parallelId}`)
} else {
logger.info(`Parallel ${parallelId} not complete - some blocks still need to execute`)
}
return true
return result
}
/**
@@ -106,7 +114,8 @@ export class ParallelManager {
parallelId,
parallel,
context.executedBlocks,
parallelState
parallelState,
context
)
if (allVirtualBlocksExecuted && !context.completedLoops.has(parallelId)) {
@@ -214,7 +223,18 @@ export class ParallelManager {
): void {
const parallelState = context.parallelExecutions?.get(parallelId)
if (parallelState) {
parallelState.executionResults.set(`iteration_${iterationIndex}`, output)
const iterationKey = `iteration_${iterationIndex}`
const existingResult = parallelState.executionResults.get(iterationKey)
if (existingResult) {
if (Array.isArray(existingResult)) {
existingResult.push(output)
} else {
parallelState.executionResults.set(iterationKey, [existingResult, output])
}
} else {
parallelState.executionResults.set(iterationKey, output)
}
}
}
}

View File

@@ -0,0 +1,105 @@
import { BlockType } from '@/executor/consts'
import type { ExecutionContext } from '@/executor/types'
import { ConnectionUtils } from '@/executor/utils/connections'
import { VirtualBlockUtils } from '@/executor/utils/virtual-blocks'
import type { SerializedParallel } from '@/serializer/types'
/**
* Utility functions for parallel block conditional routing logic.
* Shared between Executor and ParallelManager to ensure consistent behavior.
*/
export class ParallelRoutingUtils {
/**
* Determines if a block should execute in a specific parallel iteration
* based on conditional routing and active execution paths.
*/
static shouldBlockExecuteInParallelIteration(
nodeId: string,
parallel: SerializedParallel,
iteration: number,
context: ExecutionContext
): boolean {
const internalConnections = ConnectionUtils.getInternalConnections(
nodeId,
parallel.nodes,
context.workflow?.connections || []
)
// If no internal connections, check if this is truly a starting block or an unconnected block
if (internalConnections.length === 0) {
// Use helper to check if this is an unconnected block
if (ConnectionUtils.isUnconnectedBlock(nodeId, context.workflow?.connections || [])) {
return false
}
// If there are external connections, this is a legitimate starting block - should execute
return true
}
// For blocks with dependencies within the parallel, check if any incoming connection is active
// based on routing decisions made by executed source blocks
return internalConnections.some((conn) => {
const sourceVirtualId = VirtualBlockUtils.generateParallelId(
conn.source,
parallel.id,
iteration
)
// Source must be executed for the connection to be considered
if (!context.executedBlocks.has(sourceVirtualId)) {
return false
}
// Get the source block to check its type
const sourceBlock = context.workflow?.blocks.find((b) => b.id === conn.source)
const sourceBlockType = sourceBlock?.metadata?.id
// For condition blocks, check if the specific condition path was selected
if (sourceBlockType === BlockType.CONDITION) {
const selectedCondition = context.decisions.condition.get(sourceVirtualId)
const expectedHandle = `condition-${selectedCondition}`
return conn.sourceHandle === expectedHandle
}
// For router blocks, check if this specific target was selected
if (sourceBlockType === BlockType.ROUTER) {
const selectedTarget = context.decisions.router.get(sourceVirtualId)
return selectedTarget === conn.target
}
// For regular blocks, the connection is active if the source executed successfully
return true
})
}
/**
* Checks if all virtual blocks that SHOULD execute for a parallel have been executed.
* Respects conditional routing - only checks blocks that should execute.
*/
static areAllRequiredVirtualBlocksExecuted(
parallel: SerializedParallel,
parallelCount: number,
executedBlocks: Set<string>,
context: ExecutionContext
): boolean {
for (const nodeId of parallel.nodes) {
for (let i = 0; i < parallelCount; i++) {
// Check if this specific block should execute in this iteration
const shouldExecute = ParallelRoutingUtils.shouldBlockExecuteInParallelIteration(
nodeId,
parallel,
i,
context
)
if (shouldExecute) {
const virtualBlockId = VirtualBlockUtils.generateParallelId(nodeId, parallel.id, i)
if (!executedBlocks.has(virtualBlockId)) {
return false
}
}
}
}
return true
}
}

View File

@@ -2,6 +2,8 @@ import { createLogger } from '@/lib/logs/console/logger'
import { BlockType } from '@/executor/consts'
import { Routing } from '@/executor/routing/routing'
import type { BlockState, ExecutionContext } from '@/executor/types'
import { ConnectionUtils } from '@/executor/utils/connections'
import { VirtualBlockUtils } from '@/executor/utils/virtual-blocks'
import type { SerializedBlock, SerializedConnection, SerializedWorkflow } from '@/serializer/types'
const logger = createLogger('PathTracker')
@@ -38,31 +40,54 @@ export class PathTracker {
/**
* Updates execution paths based on newly executed blocks.
* Handles router and condition block decisions to activate paths without deactivating others.
* Supports both original block IDs and virtual block IDs (for parallel execution).
*
* @param executedBlockIds - IDs of blocks that were just executed
* @param executedBlockIds - IDs of blocks that were just executed (may include virtual IDs)
* @param context - Current execution context
*/
updateExecutionPaths(executedBlockIds: string[], context: ExecutionContext): void {
for (const blockId of executedBlockIds) {
const block = this.getBlock(blockId)
// Handle virtual block IDs from parallel execution
const originalBlockId = this.extractOriginalBlockId(blockId)
const block = this.getBlock(originalBlockId)
if (!block) continue
// Set currentVirtualBlockId so decision setting uses the correct key
const previousVirtualBlockId = context.currentVirtualBlockId
if (blockId !== originalBlockId) {
context.currentVirtualBlockId = blockId
}
this.updatePathForBlock(block, context)
// Restore previous virtual block ID
context.currentVirtualBlockId = previousVirtualBlockId
}
}
/**
* Extract original block ID from virtual block ID.
* Virtual block IDs have format: originalId_parallel_parallelId_iteration_N
*
* @param blockId - Block ID (may be virtual or original)
* @returns Original block ID
*/
private extractOriginalBlockId(blockId: string): string {
return VirtualBlockUtils.extractOriginalId(blockId)
}
/**
* Get all incoming connections to a block
*/
private getIncomingConnections(blockId: string): SerializedConnection[] {
return this.workflow.connections.filter((conn) => conn.target === blockId)
return ConnectionUtils.getIncomingConnections(blockId, this.workflow.connections)
}
/**
* Get all outgoing connections from a block
*/
private getOutgoingConnections(blockId: string): SerializedConnection[] {
return this.workflow.connections.filter((conn) => conn.source === blockId)
return ConnectionUtils.getOutgoingConnections(blockId, this.workflow.connections)
}
/**
@@ -167,11 +192,15 @@ export class PathTracker {
* Update paths for router blocks
*/
private updateRouterPaths(block: SerializedBlock, context: ExecutionContext): void {
const routerOutput = context.blockStates.get(block.id)?.output
const blockStateKey = context.currentVirtualBlockId || block.id
const routerOutput = context.blockStates.get(blockStateKey)?.output
const selectedPath = routerOutput?.selectedPath?.blockId
if (selectedPath) {
context.decisions.router.set(block.id, selectedPath)
const decisionKey = context.currentVirtualBlockId || block.id
if (!context.decisions.router.has(decisionKey)) {
context.decisions.router.set(decisionKey, selectedPath)
}
context.activeExecutionPath.add(selectedPath)
// Check if the selected target should activate downstream paths
@@ -219,12 +248,17 @@ export class PathTracker {
* Update paths for condition blocks
*/
private updateConditionPaths(block: SerializedBlock, context: ExecutionContext): void {
const conditionOutput = context.blockStates.get(block.id)?.output
// Read block state using the correct ID (virtual ID if in parallel execution, otherwise original ID)
const blockStateKey = context.currentVirtualBlockId || block.id
const conditionOutput = context.blockStates.get(blockStateKey)?.output
const selectedConditionId = conditionOutput?.selectedConditionId
if (!selectedConditionId) return
context.decisions.condition.set(block.id, selectedConditionId)
const decisionKey = context.currentVirtualBlockId || block.id
if (!context.decisions.condition.has(decisionKey)) {
context.decisions.condition.set(decisionKey, selectedConditionId)
}
const targetConnections = this.workflow.connections.filter(
(conn) => conn.source === block.id && conn.sourceHandle === `condition-${selectedConditionId}`
@@ -268,7 +302,9 @@ export class PathTracker {
* Update paths for regular blocks
*/
private updateRegularBlockPaths(block: SerializedBlock, context: ExecutionContext): void {
const blockState = context.blockStates.get(block.id)
// Read block state using the correct ID (virtual ID if in parallel execution, otherwise original ID)
const blockStateKey = context.currentVirtualBlockId || block.id
const blockState = context.blockStates.get(blockStateKey)
const hasError = this.blockHasError(blockState)
const outgoingConnections = this.getOutgoingConnections(block.id)

View File

@@ -0,0 +1,83 @@
import type { SerializedConnection } from '@/serializer/types'
/**
* Utility functions for analyzing connections in workflow execution.
* Provides reusable helpers for connection filtering and analysis.
*/
export class ConnectionUtils {
/**
* Get all incoming connections to a specific node.
*/
static getIncomingConnections(
nodeId: string,
connections: SerializedConnection[]
): SerializedConnection[] {
return connections.filter((conn) => conn.target === nodeId)
}
/**
* Get all outgoing connections from a specific node.
*/
static getOutgoingConnections(
nodeId: string,
connections: SerializedConnection[]
): SerializedConnection[] {
return connections.filter((conn) => conn.source === nodeId)
}
/**
* Get connections from within a specific scope (parallel/loop) to a target node.
*/
static getInternalConnections(
nodeId: string,
scopeNodes: string[],
connections: SerializedConnection[]
): SerializedConnection[] {
const incomingConnections = ConnectionUtils.getIncomingConnections(nodeId, connections)
return incomingConnections.filter((conn) => scopeNodes.includes(conn.source))
}
/**
* Check if a block is completely unconnected (has no incoming connections at all).
*/
static isUnconnectedBlock(nodeId: string, connections: SerializedConnection[]): boolean {
return ConnectionUtils.getIncomingConnections(nodeId, connections).length === 0
}
/**
* Check if a block has external connections (connections from outside a scope).
*/
static hasExternalConnections(
nodeId: string,
scopeNodes: string[],
connections: SerializedConnection[]
): boolean {
const incomingConnections = ConnectionUtils.getIncomingConnections(nodeId, connections)
const internalConnections = incomingConnections.filter((conn) =>
scopeNodes.includes(conn.source)
)
// Has external connections if total incoming > internal connections
return incomingConnections.length > internalConnections.length
}
/**
* Determine if a block should be considered an entry point for a scope.
* Entry points are blocks that have no internal connections but do have external connections.
*/
static isEntryPoint(
nodeId: string,
scopeNodes: string[],
connections: SerializedConnection[]
): boolean {
const hasInternalConnections =
ConnectionUtils.getInternalConnections(nodeId, scopeNodes, connections).length > 0
if (hasInternalConnections) {
return false // Has internal connections, not an entry point
}
// Only entry point if it has external connections (not completely unconnected)
return ConnectionUtils.hasExternalConnections(nodeId, scopeNodes, connections)
}
}

View File

@@ -0,0 +1,54 @@
/**
* Utility functions for managing virtual block IDs in parallel execution.
* Virtual blocks allow the same block to be executed multiple times with different contexts.
*/
export class VirtualBlockUtils {
/**
* Generate a virtual block ID for parallel execution.
*/
static generateParallelId(originalId: string, parallelId: string, iteration: number): string {
return `${originalId}_parallel_${parallelId}_iteration_${iteration}`
}
/**
* Extract the original block ID from a virtual block ID.
*/
static extractOriginalId(virtualOrOriginalId: string): string {
if (VirtualBlockUtils.isVirtualId(virtualOrOriginalId)) {
// Virtual IDs have format: originalId_parallel_parallelId_iteration_N
const parts = virtualOrOriginalId.split('_parallel_')
return parts[0] || virtualOrOriginalId
}
return virtualOrOriginalId
}
/**
* Check if an ID is a virtual block ID.
*/
static isVirtualId(id: string): boolean {
return id.includes('_parallel_') && id.includes('_iteration_')
}
/**
* Parse a virtual block ID to extract its components.
* Returns null if the ID is not a virtual ID.
*/
static parseVirtualId(
virtualId: string
): { originalId: string; parallelId: string; iteration: number } | null {
if (!VirtualBlockUtils.isVirtualId(virtualId)) {
return null
}
const parallelMatch = virtualId.match(/^(.+)_parallel_(.+)_iteration_(\d+)$/)
if (parallelMatch) {
return {
originalId: parallelMatch[1]!,
parallelId: parallelMatch[2]!,
iteration: Number.parseInt(parallelMatch[3]!, 10),
}
}
return null
}
}

View File

@@ -981,8 +981,28 @@ export function useCollaborativeWorkflow() {
const collaborativeRemoveEdge = useCallback(
(edgeId: string) => {
const edge = workflowStore.edges.find((e) => e.id === edgeId)
// Skip if edge doesn't exist (already removed during cascade deletion)
if (!edge) {
logger.debug('Edge already removed, skipping operation', { edgeId })
return
}
// Check if the edge's source and target blocks still exist
const sourceExists = workflowStore.blocks[edge.source]
const targetExists = workflowStore.blocks[edge.target]
if (!sourceExists || !targetExists) {
logger.debug('Edge source or target block no longer exists, skipping operation', {
edgeId,
sourceExists: !!sourceExists,
targetExists: !!targetExists,
})
return
}
// Only record edge removal if it's not part of a parent update operation
if (edge && !skipEdgeRecording.current) {
if (!skipEdgeRecording.current) {
undoRedo.recordRemoveEdge(edgeId, edge)
}

View File

@@ -11,7 +11,7 @@
"dev": "next dev --turbo --port 3000",
"dev:classic": "next dev",
"dev:sockets": "bun run socket-server/index.ts",
"dev:full": "concurrently -n \"NextJS,Realtime\" -c \"cyan,magenta\" \"bun run dev\" \"bun run dev:sockets\"",
"dev:full": "concurrently -n \"App,Realtime\" -c \"cyan,magenta\" \"bun run dev\" \"bun run dev:sockets\"",
"build": "next build",
"start": "next start",
"prepare": "cd ../.. && bun husky",