fix(parallel): auto-number Loop and Parallel blocks so multiple instances get distinct names (e.g. "Parallel 1", "Parallel 2")

This commit is contained in:
Waleed Latif
2025-05-26 23:40:53 -07:00
parent 80a7bf5e5f
commit 79bea155b3
8 changed files with 606 additions and 53 deletions

View File

@@ -245,7 +245,11 @@ function WorkflowContent() {
if (type === 'loop' || type === 'parallel') {
// Create a unique ID and name for the container
const id = crypto.randomUUID()
const name = type === 'loop' ? 'Loop' : 'Parallel'
// Auto-number the blocks based on existing blocks of the same type
const existingBlocksOfType = Object.values(blocks).filter((b) => b.type === type)
const blockNumber = existingBlocksOfType.length + 1
const name = type === 'loop' ? `Loop ${blockNumber}` : `Parallel ${blockNumber}`
// Calculate the center position of the viewport
const centerPosition = project({
@@ -363,7 +367,11 @@ function WorkflowContent() {
if (data.type === 'loop' || data.type === 'parallel') {
// Create a unique ID and name for the container
const id = crypto.randomUUID()
const name = data.type === 'loop' ? 'Loop' : 'Parallel'
// Auto-number the blocks based on existing blocks of the same type
const existingBlocksOfType = Object.values(blocks).filter((b) => b.type === data.type)
const blockNumber = existingBlocksOfType.length + 1
const name = data.type === 'loop' ? `Loop ${blockNumber}` : `Parallel ${blockNumber}`
// Check if we're dropping inside another container
if (containerInfo) {
@@ -467,9 +475,9 @@ function WorkflowContent() {
const id = crypto.randomUUID()
const name =
data.type === 'loop'
? 'Loop'
? `Loop ${Object.values(blocks).filter((b) => b.type === 'loop').length + 1}`
: data.type === 'parallel'
? 'Parallel'
? `Parallel ${Object.values(blocks).filter((b) => b.type === 'parallel').length + 1}`
: `${blockConfig!.name} ${Object.values(blocks).filter((b) => b.type === data.type).length + 1}`
if (containerInfo) {

View File

@@ -277,4 +277,215 @@ describe('ParallelBlockHandler', () => {
// Should not have items when no distribution
expect(context.loopItems.has('parallel-1_items')).toBe(false)
})
describe('multiple downstream connections', () => {
it('should make results available to all downstream blocks', async () => {
const handler = new ParallelBlockHandler()
const parallelBlock = createMockBlock('parallel-1')
parallelBlock.config.params = {
parallelType: 'collection',
count: 3,
}
const parallel: SerializedParallel = {
id: 'parallel-1',
nodes: ['agent-1'],
distribution: ['item1', 'item2', 'item3'],
}
const context = createMockContext(parallel)
context.workflow!.connections = [
{
source: 'parallel-1',
target: 'agent-1',
sourceHandle: 'parallel-start-source',
},
{
source: 'parallel-1',
target: 'function-1',
sourceHandle: 'parallel-end-source',
},
{
source: 'parallel-1',
target: 'parallel-2',
sourceHandle: 'parallel-end-source',
},
]
// Initialize parallel
const initResult = await handler.execute(parallelBlock, {}, context)
expect((initResult as any).response.started).toBe(true)
expect((initResult as any).response.parallelCount).toBe(3)
// Simulate all virtual blocks being executed
const parallelState = context.parallelExecutions?.get('parallel-1')
expect(parallelState).toBeDefined()
// Mark all virtual blocks as executed and store results
for (let i = 0; i < 3; i++) {
const virtualBlockId = `agent-1_parallel_parallel-1_iteration_${i}`
context.executedBlocks.add(virtualBlockId)
// Store iteration results
parallelState!.executionResults.set(`iteration_${i}`, {
'agent-1': {
response: {
content: `Result from iteration ${i}`,
model: 'test-model',
},
},
})
}
// Re-execute to aggregate results
const aggregatedResult = await handler.execute(parallelBlock, {}, context)
// Verify results are aggregated
expect((aggregatedResult as any).response.completed).toBe(true)
expect((aggregatedResult as any).response.results).toHaveLength(3)
// Verify block state is stored
const blockState = context.blockStates.get('parallel-1')
expect(blockState).toBeDefined()
expect(blockState?.output.response.results).toHaveLength(3)
// Verify both downstream blocks are activated
expect(context.activeExecutionPath.has('function-1')).toBe(true)
expect(context.activeExecutionPath.has('parallel-2')).toBe(true)
// Verify parallel is marked as completed
expect(context.completedLoops.has('parallel-1')).toBe(true)
// Simulate downstream blocks trying to access results
// This should work without errors
const storedResults = context.blockStates.get('parallel-1')?.output.response.results
expect(storedResults).toBeDefined()
expect(storedResults).toHaveLength(3)
})
it('should handle reference resolution when multiple parallel blocks exist', async () => {
const handler = new ParallelBlockHandler()
// Create first parallel block
const parallel1Block = createMockBlock('parallel-1')
parallel1Block.config.params = {
parallelType: 'collection',
count: 2,
}
// Create second parallel block (even if not connected)
const parallel2Block = createMockBlock('parallel-2')
parallel2Block.config.params = {
parallelType: 'collection',
collection: '<parallel.response.results>', // This references the first parallel
}
// Set up context with both parallels
const context: ExecutionContext = {
workflowId: 'test-workflow',
blockStates: new Map(),
blockLogs: [],
metadata: { duration: 0 },
environmentVariables: {},
decisions: { router: new Map(), condition: new Map() },
loopIterations: new Map(),
loopItems: new Map(),
completedLoops: new Set(),
executedBlocks: new Set(),
activeExecutionPath: new Set(),
workflow: {
version: '1.0',
blocks: [
parallel1Block,
parallel2Block,
{
id: 'agent-1',
position: { x: 0, y: 0 },
config: { tool: 'agent', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'agent', name: 'Agent 1' },
enabled: true,
},
{
id: 'function-1',
position: { x: 0, y: 0 },
config: {
tool: 'function',
params: {
code: 'return <parallel.response.results>;',
},
},
inputs: {},
outputs: {},
metadata: { id: 'function', name: 'Function 1' },
enabled: true,
},
],
connections: [
{
source: 'parallel-1',
target: 'agent-1',
sourceHandle: 'parallel-start-source',
},
{
source: 'parallel-1',
target: 'function-1',
sourceHandle: 'parallel-end-source',
},
{
source: 'parallel-1',
target: 'parallel-2',
sourceHandle: 'parallel-end-source',
},
],
loops: {},
parallels: {
'parallel-1': {
id: 'parallel-1',
nodes: ['agent-1'],
distribution: ['item1', 'item2'],
},
'parallel-2': {
id: 'parallel-2',
nodes: [],
distribution: '<parallel.response.results>',
},
},
},
}
// Initialize first parallel
await handler.execute(parallel1Block, {}, context)
// Simulate execution of agent blocks
const parallelState = context.parallelExecutions?.get('parallel-1')
for (let i = 0; i < 2; i++) {
context.executedBlocks.add(`agent-1_parallel_parallel-1_iteration_${i}`)
parallelState!.executionResults.set(`iteration_${i}`, {
'agent-1': { response: { content: `Result ${i}` } },
})
}
// Re-execute first parallel to aggregate results
const result = await handler.execute(parallel1Block, {}, context)
expect((result as any).response.completed).toBe(true)
// Verify the block state is available
const blockState = context.blockStates.get('parallel-1')
expect(blockState).toBeDefined()
expect(blockState?.output.response.results).toHaveLength(2)
// Now when function block tries to resolve <parallel.response.results>, it should work
// even though parallel-2 exists on the canvas
expect(() => {
// This simulates what the resolver would do
const state = context.blockStates.get('parallel-1')
if (!state) throw new Error('No state found for block parallel-1')
const results = state.output?.response?.results
if (!results) throw new Error('No results found')
return results
}).not.toThrow()
})
})
})

View File

@@ -39,6 +39,76 @@ export class ParallelBlockHandler implements BlockHandler {
// Get or initialize the parallel state
let parallelState = context.parallelExecutions.get(block.id)
// Check if all virtual blocks have completed (even before initialization)
if (parallelState) {
const allCompleted = this.checkAllIterationsCompleted(block.id, context)
if (allCompleted && !context.completedLoops.has(block.id)) {
logger.info(`All iterations completed for parallel ${block.id}, aggregating results`)
// Mark this parallel as completed
context.completedLoops.add(block.id)
// Check if we already have aggregated results stored (from a previous completion check)
const existingBlockState = context.blockStates.get(block.id)
if (existingBlockState?.output?.response?.results) {
logger.info(`Parallel ${block.id} already has aggregated results, returning them`)
return existingBlockState.output
}
// Aggregate results
const results = []
for (let i = 0; i < parallelState.parallelCount; i++) {
const result = parallelState.executionResults.get(`iteration_${i}`)
if (result) {
results.push(result)
}
}
// Store the aggregated results in the block state so subsequent blocks can reference them
const aggregatedOutput = {
response: {
parallelId: block.id,
parallelCount: parallelState.parallelCount,
completed: true,
results,
message: `Completed all ${parallelState.parallelCount} executions`,
},
}
// Store the aggregated results in context so blocks connected to parallel-end-source can access them
context.blockStates.set(block.id, {
output: aggregatedOutput,
executed: true,
executionTime: 0, // Parallel coordination doesn't have meaningful execution time
})
// Activate the parallel-end-source connection to continue workflow
const parallelEndConnections =
context.workflow?.connections.filter(
(conn) => conn.source === block.id && conn.sourceHandle === 'parallel-end-source'
) || []
for (const conn of parallelEndConnections) {
context.activeExecutionPath.add(conn.target)
logger.info(`Activated post-parallel path to ${conn.target}`)
}
// Clean up iteration data
if (context.loopItems.has(`${block.id}_items`)) {
context.loopItems.delete(`${block.id}_items`)
}
if (context.loopItems.has(block.id)) {
context.loopItems.delete(block.id)
}
if (context.loopIterations.has(block.id)) {
context.loopIterations.delete(block.id)
}
return aggregatedOutput
}
}
if (!parallelState) {
logger.info(`Initializing parallel block ${block.id}`)
@@ -123,59 +193,74 @@ export class ParallelBlockHandler implements BlockHandler {
if (allCompleted) {
logger.info(`All iterations completed for parallel ${block.id}`)
// Mark this parallel as completed
context.completedLoops.add(block.id)
// This case should have been handled earlier, but as a safety check
if (!context.completedLoops.has(block.id)) {
// Mark this parallel as completed
context.completedLoops.add(block.id)
// Aggregate results
const results = []
for (let i = 0; i < parallelState.parallelCount; i++) {
const result = parallelState.executionResults.get(`iteration_${i}`)
if (result) {
results.push(result)
// Check if we already have aggregated results stored (from a previous completion check)
const existingBlockState = context.blockStates.get(block.id)
if (existingBlockState?.output?.response?.results) {
logger.info(`Parallel ${block.id} already has aggregated results, returning them`)
return existingBlockState.output
}
}
// Store the aggregated results in the block state so subsequent blocks can reference them
const aggregatedOutput = {
response: {
parallelId: block.id,
parallelCount: parallelState.parallelCount,
completed: true,
results,
message: `Completed all ${parallelState.parallelCount} executions`,
},
}
// Aggregate results
const results = []
for (let i = 0; i < parallelState.parallelCount; i++) {
const result = parallelState.executionResults.get(`iteration_${i}`)
if (result) {
results.push(result)
}
}
// Store the aggregated results in context so blocks connected to parallel-end-source can access them
context.blockStates.set(block.id, {
output: aggregatedOutput,
executed: true,
executionTime: 0, // Parallel coordination doesn't have meaningful execution time
})
// Store the aggregated results in the block state so subsequent blocks can reference them
const aggregatedOutput = {
response: {
parallelId: block.id,
parallelCount: parallelState.parallelCount,
completed: true,
results,
message: `Completed all ${parallelState.parallelCount} executions`,
},
}
// Activate the parallel-end-source connection to continue workflow
const parallelEndConnections =
context.workflow?.connections.filter(
(conn) => conn.source === block.id && conn.sourceHandle === 'parallel-end-source'
) || []
// Store the aggregated results in context so blocks connected to parallel-end-source can access them
context.blockStates.set(block.id, {
output: aggregatedOutput,
executed: true,
executionTime: 0, // Parallel coordination doesn't have meaningful execution time
})
for (const conn of parallelEndConnections) {
context.activeExecutionPath.add(conn.target)
logger.info(`Activated post-parallel path to ${conn.target}`)
}
// Activate the parallel-end-source connection to continue workflow
const parallelEndConnections =
context.workflow?.connections.filter(
(conn) => conn.source === block.id && conn.sourceHandle === 'parallel-end-source'
) || []
// Clean up iteration data
if (context.loopItems.has(`${block.id}_items`)) {
context.loopItems.delete(`${block.id}_items`)
}
if (context.loopItems.has(block.id)) {
context.loopItems.delete(block.id)
}
if (context.loopIterations.has(block.id)) {
context.loopIterations.delete(block.id)
}
for (const conn of parallelEndConnections) {
context.activeExecutionPath.add(conn.target)
logger.info(`Activated post-parallel path to ${conn.target}`)
}
return aggregatedOutput
// Clean up iteration data
if (context.loopItems.has(`${block.id}_items`)) {
context.loopItems.delete(`${block.id}_items`)
}
if (context.loopItems.has(block.id)) {
context.loopItems.delete(block.id)
}
if (context.loopIterations.has(block.id)) {
context.loopIterations.delete(block.id)
}
return aggregatedOutput
}
// Already completed, return the stored results
const existingBlockState = context.blockStates.get(block.id)
if (existingBlockState?.output) {
return existingBlockState.output
}
}
// Still waiting for iterations to complete

View File

@@ -110,8 +110,33 @@ export class ParallelManager {
)
if (allVirtualBlocksExecuted && !context.completedLoops.has(parallelId)) {
// Check if the parallel block already has aggregated results stored
const blockState = context.blockStates.get(parallelId)
if (blockState?.output?.response?.completed && blockState?.output?.response?.results) {
logger.info(
`Parallel ${parallelId} already has aggregated results, marking as completed without re-execution`
)
// Just mark it as completed without re-execution
context.completedLoops.add(parallelId)
// Activate the parallel-end-source connections if not already done
const parallelEndConnections =
context.workflow?.connections.filter(
(conn) => conn.source === parallelId && conn.sourceHandle === 'parallel-end-source'
) || []
for (const conn of parallelEndConnections) {
if (!context.activeExecutionPath.has(conn.target)) {
context.activeExecutionPath.add(conn.target)
logger.info(`Activated post-parallel path to ${conn.target}`)
}
}
continue
}
logger.info(
`All virtual blocks completed for parallel ${parallelId}, re-executing to check completion`
`All virtual blocks completed for parallel ${parallelId}, re-executing to aggregate results`
)
// Re-execute the parallel block to check completion and trigger end connections

View File

@@ -944,4 +944,213 @@ describe('InputResolver', () => {
expect(resolvedInputs.allItems).toEqual(items)
})
})
describe('parallel references', () => {
it('should resolve parallel references when block is inside a parallel', () => {
const workflow: SerializedWorkflow = {
version: '1.0',
blocks: [
{
id: 'parallel-1',
position: { x: 0, y: 0 },
config: { tool: 'parallel', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'parallel', name: 'Parallel 1' },
enabled: true,
},
{
id: 'function-1',
position: { x: 0, y: 0 },
config: { tool: 'function', params: { code: '<parallel.currentItem>' } },
inputs: {},
outputs: {},
metadata: { id: 'function', name: 'Function 1' },
enabled: true,
},
],
connections: [],
loops: {},
parallels: {
'parallel-1': {
id: 'parallel-1',
nodes: ['function-1'],
distribution: ['item1', 'item2'],
},
},
}
const resolver = new InputResolver(workflow, {})
const context: ExecutionContext = {
workflowId: 'test',
blockStates: new Map(),
blockLogs: [],
metadata: { duration: 0 },
environmentVariables: {},
decisions: { router: new Map(), condition: new Map() },
loopIterations: new Map(),
loopItems: new Map([['parallel-1', 'test-item']]),
completedLoops: new Set(),
executedBlocks: new Set(),
activeExecutionPath: new Set(['function-1']),
workflow,
}
const block = workflow.blocks[1]
const result = resolver.resolveInputs(block, context)
expect(result.code).toBe('test-item')
})
it('should resolve parallel references by block name when multiple parallels exist', () => {
const workflow: SerializedWorkflow = {
version: '1.0',
blocks: [
{
id: 'parallel-1',
position: { x: 0, y: 0 },
config: { tool: 'parallel', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'parallel', name: 'Parallel 1' },
enabled: true,
},
{
id: 'parallel-2',
position: { x: 0, y: 0 },
config: { tool: 'parallel', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'parallel', name: 'Parallel 2' },
enabled: true,
},
{
id: 'function-1',
position: { x: 0, y: 0 },
config: { tool: 'function', params: { code: '<Parallel1.response.results>' } },
inputs: {},
outputs: {},
metadata: { id: 'function', name: 'Function 1' },
enabled: true,
},
],
connections: [],
loops: {},
parallels: {
'parallel-1': {
id: 'parallel-1',
nodes: [],
},
'parallel-2': {
id: 'parallel-2',
nodes: [],
},
},
}
const resolver = new InputResolver(workflow, {})
const context: ExecutionContext = {
workflowId: 'test',
blockStates: new Map([
[
'parallel-1',
{
output: { response: { results: ['result1', 'result2'] } },
executed: true,
executionTime: 0,
},
],
[
'parallel-2',
{
output: { response: { results: ['result3', 'result4'] } },
executed: true,
executionTime: 0,
},
],
]),
blockLogs: [],
metadata: { duration: 0 },
environmentVariables: {},
decisions: { router: new Map(), condition: new Map() },
loopIterations: new Map(),
loopItems: new Map(),
completedLoops: new Set(),
executedBlocks: new Set(),
activeExecutionPath: new Set(['parallel-1', 'parallel-2', 'function-1']),
workflow,
}
const block = workflow.blocks[2]
const result = resolver.resolveInputs(block, context)
// Should resolve to Parallel 1's results
expect(result.code).toBe('["result1","result2"]')
})
it('should resolve parallel references by block ID when needed', () => {
const workflow: SerializedWorkflow = {
version: '1.0',
blocks: [
{
id: 'parallel-1',
position: { x: 0, y: 0 },
config: { tool: 'parallel', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'parallel', name: 'Parallel 1' },
enabled: true,
},
{
id: 'function-1',
position: { x: 0, y: 0 },
config: { tool: 'function', params: { code: '<parallel-1.response.results>' } },
inputs: {},
outputs: {},
metadata: { id: 'function', name: 'Function 1' },
enabled: true,
},
],
connections: [],
loops: {},
parallels: {
'parallel-1': {
id: 'parallel-1',
nodes: [],
},
},
}
const resolver = new InputResolver(workflow, {})
const context: ExecutionContext = {
workflowId: 'test',
blockStates: new Map([
[
'parallel-1',
{
output: { response: { results: ['result1', 'result2'] } },
executed: true,
executionTime: 0,
},
],
]),
blockLogs: [],
metadata: { duration: 0 },
environmentVariables: {},
decisions: { router: new Map(), condition: new Map() },
loopIterations: new Map(),
loopItems: new Map(),
completedLoops: new Set(),
executedBlocks: new Set(),
activeExecutionPath: new Set(['parallel-1', 'function-1']),
workflow,
}
const block = workflow.blocks[1]
const result = resolver.resolveInputs(block, context)
// Should successfully resolve the reference using block ID
expect(result.code).toBe('["result1","result2"]')
})
})
})

View File

@@ -163,12 +163,17 @@ export function setupUnloadPersistence(): void {
? currentState.generateLoopBlocks()
: {}
// Generate parallels from the current blocks for consistency
const generatedParallels = currentState.generateParallelBlocks
? currentState.generateParallelBlocks()
: {}
// Save the complete state including history which is added by middleware
saveWorkflowState(currentId, {
blocks: currentState.blocks,
edges: currentState.edges,
loops: generatedLoops,
parallels: currentState.parallels,
parallels: generatedParallels,
isDeployed: currentState.isDeployed,
deployedAt: currentState.deployedAt,
lastSaved: Date.now(),

View File

@@ -493,12 +493,20 @@ export const useWorkflowRegistry = create<WorkflowRegistry>()(
if (currentId) {
const currentState = useWorkflowStore.getState()
// Generate loops and parallels from current blocks
const generatedLoops = currentState.generateLoopBlocks
? currentState.generateLoopBlocks()
: {}
const generatedParallels = currentState.generateParallelBlocks
? currentState.generateParallelBlocks()
: {}
// Save the complete state for the current workflow
saveWorkflowState(currentId, {
blocks: currentState.blocks,
edges: currentState.edges,
loops: currentState.loops,
parallels: currentState.parallels,
loops: generatedLoops,
parallels: generatedParallels,
history: currentState.history,
isDeployed: currentState.isDeployed,
deployedAt: currentState.deployedAt,

View File

@@ -474,10 +474,12 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
if (activeWorkflowId) {
const currentState = get()
const generatedLoops = currentState.generateLoopBlocks()
const generatedParallels = currentState.generateParallelBlocks()
saveWorkflowState(activeWorkflowId, {
blocks: currentState.blocks,
edges: currentState.edges,
loops: generatedLoops,
parallels: generatedParallels,
history: currentState.history,
// Include both legacy and new deployment status fields
isDeployed: currentState.isDeployed,