fix(execution-store): allowed for multiple concurrent block executions

This commit is contained in:
Emir Karabeg
2025-02-16 00:49:23 -08:00
parent 6896f74579
commit dc85adc444
3 changed files with 85 additions and 84 deletions

View File

@@ -33,7 +33,7 @@ export function WorkflowBlock({ id, data, selected }: NodeProps<WorkflowBlockPro
const contentRef = useRef<HTMLDivElement>(null)
const updateNodeInternals = useUpdateNodeInternals()
// Store selectors
// Workflow store selectors
const isEnabled = useWorkflowStore((state) => state.blocks[id]?.enabled ?? true)
const horizontalHandles = useWorkflowStore(
(state) => state.blocks[id]?.horizontalHandles ?? false
@@ -41,13 +41,13 @@ export function WorkflowBlock({ id, data, selected }: NodeProps<WorkflowBlockPro
const isWide = useWorkflowStore((state) => state.blocks[id]?.isWide ?? false)
const blockHeight = useWorkflowStore((state) => state.blocks[id]?.height ?? 0)
// Store actions
// Workflow store actions
const updateBlockName = useWorkflowStore((state) => state.updateBlockName)
const toggleBlockWide = useWorkflowStore((state) => state.toggleBlockWide)
const updateBlockHeight = useWorkflowStore((state) => state.updateBlockHeight)
// Add execution state
const isActiveBlock = useExecutionStore((state) => state.activeBlockId === id)
// Execution store
const isActiveBlock = useExecutionStore((state) => state.activeBlockIds.has(id))
// const isExecuting = useExecutionStore((state) => state.isExecuting)
// Update node internals when handles change

View File

@@ -290,78 +290,89 @@ export class Executor {
return true
})
// Execute all blocks in the current layer in parallel
const layerResults = await Promise.all(
executableBlocks.map(async (blockId) => {
const block = blocks.find((b) => b.id === blockId)
if (!block) throw new Error(`Block ${blockId} not found`)
// Create a Set to track active blocks in the current layer
const { setActiveBlocks } = useExecutionStore.getState()
const inputs = this.resolveInputs(block, context)
const result = await this.executeBlock(block, inputs, context)
context.blockStates.set(blockId, result)
lastOutput = result
try {
// Set all blocks in the layer as active before execution
setActiveBlocks(new Set(executableBlocks))
if (block.metadata?.id === 'router') {
const routerResult = result as {
response: {
content: string
model: string
tokens: { prompt: number; completion: number; total: number }
selectedPath: { blockId: string }
// Execute all blocks in the current layer in parallel
const layerResults = await Promise.all(
executableBlocks.map(async (blockId) => {
const block = blocks.find((b) => b.id === blockId)
if (!block) throw new Error(`Block ${blockId} not found`)
const inputs = this.resolveInputs(block, context)
const result = await this.executeBlock(block, inputs, context)
context.blockStates.set(blockId, result)
lastOutput = result
if (block.metadata?.id === 'router') {
const routerResult = result as {
response: {
content: string
model: string
tokens: { prompt: number; completion: number; total: number }
selectedPath: { blockId: string }
}
}
routerDecisions.set(block.id, routerResult.response.selectedPath.blockId)
} else if (block.metadata?.id === 'condition') {
const conditionResult = await this.executeConditionalBlock(block, context)
activeConditionalPaths.set(block.id, conditionResult.selectedConditionId)
}
return blockId
})
)
// Process outgoing connections and update queue using the updateInDegree helper
for (const finishedBlockId of layerResults) {
const outgoingConns = connections.filter((conn) => conn.source === finishedBlockId)
for (const conn of outgoingConns) {
this.updateInDegree(
conn,
inDegree,
queue,
blocks,
routerDecisions,
activeConditionalPaths
)
}
}
// Check if we need to reset any loops
for (const [loopId, loop] of Object.entries(this.workflow.loops || {})) {
const loopBlocks = new Set(loop.nodes)
const executedLoopBlocks = layerResults.filter((blockId) => loopBlocks.has(blockId))
if (executedLoopBlocks.length > 0) {
const iterations = this.loopIterations.get(loopId) || 0
// Only process if we haven't hit max iterations
if (iterations < loop.maxIterations) {
// Check if any block in the loop has outgoing connections to other blocks in the loop
const hasLoopConnection = executedLoopBlocks.some((blockId) => {
const outgoingConns = connections.filter((conn) => conn.source === blockId)
return outgoingConns.some((conn) => loopBlocks.has(conn.target))
})
// Check if this was the last block in the loop (e.g., a condition block)
const isLoopComplete = executedLoopBlocks.some((blockId) => {
const block = blocks.find((b) => b.id === blockId)
return block?.metadata?.id === 'condition'
})
if (hasLoopConnection && isLoopComplete) {
this.loopIterations.set(loopId, iterations + 1)
}
}
routerDecisions.set(block.id, routerResult.response.selectedPath.blockId)
} else if (block.metadata?.id === 'condition') {
const conditionResult = await this.executeConditionalBlock(block, context)
activeConditionalPaths.set(block.id, conditionResult.selectedConditionId)
}
return blockId
})
)
// Process outgoing connections and update queue using the updateInDegree helper
for (const finishedBlockId of layerResults) {
const outgoingConns = connections.filter((conn) => conn.source === finishedBlockId)
for (const conn of outgoingConns) {
this.updateInDegree(
conn,
inDegree,
queue,
blocks,
routerDecisions,
activeConditionalPaths
)
}
}
// Check if we need to reset any loops
for (const [loopId, loop] of Object.entries(this.workflow.loops || {})) {
const loopBlocks = new Set(loop.nodes)
const executedLoopBlocks = layerResults.filter((blockId) => loopBlocks.has(blockId))
if (executedLoopBlocks.length > 0) {
const iterations = this.loopIterations.get(loopId) || 0
// Only process if we haven't hit max iterations
if (iterations < loop.maxIterations) {
// Check if any block in the loop has outgoing connections to other blocks in the loop
const hasLoopConnection = executedLoopBlocks.some((blockId) => {
const outgoingConns = connections.filter((conn) => conn.source === blockId)
return outgoingConns.some((conn) => loopBlocks.has(conn.target))
})
// Check if this was the last block in the loop (e.g., a condition block)
const isLoopComplete = executedLoopBlocks.some((blockId) => {
const block = blocks.find((b) => b.id === blockId)
return block?.metadata?.id === 'condition'
})
if (hasLoopConnection && isLoopComplete) {
this.loopIterations.set(loopId, iterations + 1)
}
}
}
} finally {
// Clear active blocks after layer execution
setActiveBlocks(new Set())
}
}
@@ -436,12 +447,8 @@ export class Executor {
): Promise<BlockOutput> {
const blockLog = this.startBlockLog(block)
const addConsole = useConsoleStore.getState().addConsole
const { setActiveBlock } = useExecutionStore.getState()
try {
// Set active block before execution
setActiveBlock(block.id)
if (block.enabled === false) {
throw new Error(`Cannot execute disabled block: ${block.metadata?.name || block.id}`)
}
@@ -610,9 +617,6 @@ export class Executor {
context.blockStates.set(block.id, output)
// Clear active block
setActiveBlock(null)
return output
} catch (error: any) {
// Log error
@@ -634,9 +638,6 @@ export class Executor {
blockType: block.metadata?.id || 'unknown',
})
// Clear active block
setActiveBlock(null)
throw error
}
}

View File

@@ -1,25 +1,25 @@
import { create } from 'zustand'
interface ExecutionState {
activeBlockId: string | null
activeBlockIds: Set<string>
isExecuting: boolean
}
interface ExecutionActions {
setActiveBlock: (blockId: string | null) => void
setActiveBlocks: (blockIds: Set<string>) => void
setIsExecuting: (isExecuting: boolean) => void
reset: () => void
}
const initialState: ExecutionState = {
activeBlockId: null,
activeBlockIds: new Set(),
isExecuting: false,
}
export const useExecutionStore = create<ExecutionState & ExecutionActions>()((set) => ({
...initialState,
setActiveBlock: (blockId) => set({ activeBlockId: blockId }),
setActiveBlocks: (blockIds) => set({ activeBlockIds: new Set(blockIds) }),
setIsExecuting: (isExecuting) => set({ isExecuting }),
reset: () => set(initialState),
}))