mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-15 00:44:56 -05:00
fix(executor): resolve block ID for parallel subflow active state
This commit is contained in:
@@ -63,6 +63,7 @@ interface BlockEventHandlerConfig {
|
||||
executionIdRef: { current: string }
|
||||
workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }>
|
||||
activeBlocksSet: Set<string>
|
||||
activeBlockRefCounts: Map<string, number>
|
||||
accumulatedBlockLogs: BlockLog[]
|
||||
accumulatedBlockStates: Map<string, BlockState>
|
||||
executedBlockIds: Set<string>
|
||||
@@ -309,6 +310,7 @@ export function useWorkflowExecution() {
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
@@ -328,9 +330,18 @@ export function useWorkflowExecution() {
|
||||
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
|
||||
if (!workflowId) return
|
||||
if (isActive) {
|
||||
const count = activeBlockRefCounts.get(blockId) ?? 0
|
||||
activeBlockRefCounts.set(blockId, count + 1)
|
||||
activeBlocksSet.add(blockId)
|
||||
} else {
|
||||
activeBlocksSet.delete(blockId)
|
||||
const count = activeBlockRefCounts.get(blockId) ?? 1
|
||||
const next = count - 1
|
||||
if (next <= 0) {
|
||||
activeBlockRefCounts.delete(blockId)
|
||||
activeBlocksSet.delete(blockId)
|
||||
} else {
|
||||
activeBlockRefCounts.set(blockId, next)
|
||||
}
|
||||
}
|
||||
setActiveBlocks(workflowId, new Set(activeBlocksSet))
|
||||
}
|
||||
@@ -1280,6 +1291,7 @@ export function useWorkflowExecution() {
|
||||
}
|
||||
|
||||
const activeBlocksSet = new Set<string>()
|
||||
const activeBlockRefCounts = new Map<string, number>()
|
||||
const streamedContent = new Map<string, string>()
|
||||
const accumulatedBlockLogs: BlockLog[] = []
|
||||
const accumulatedBlockStates = new Map<string, BlockState>()
|
||||
@@ -1292,6 +1304,7 @@ export function useWorkflowExecution() {
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
@@ -1902,6 +1915,7 @@ export function useWorkflowExecution() {
|
||||
const accumulatedBlockStates = new Map<string, BlockState>()
|
||||
const executedBlockIds = new Set<string>()
|
||||
const activeBlocksSet = new Set<string>()
|
||||
const activeBlockRefCounts = new Map<string, number>()
|
||||
|
||||
try {
|
||||
const blockHandlers = buildBlockEventHandlers({
|
||||
@@ -1909,6 +1923,7 @@ export function useWorkflowExecution() {
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
@@ -2104,6 +2119,7 @@ export function useWorkflowExecution() {
|
||||
|
||||
const workflowEdges = useWorkflowStore.getState().edges
|
||||
const activeBlocksSet = new Set<string>()
|
||||
const activeBlockRefCounts = new Map<string, number>()
|
||||
const accumulatedBlockLogs: BlockLog[] = []
|
||||
const accumulatedBlockStates = new Map<string, BlockState>()
|
||||
const executedBlockIds = new Set<string>()
|
||||
@@ -2115,6 +2131,7 @@ export function useWorkflowExecution() {
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
|
||||
@@ -39,6 +39,7 @@ export async function executeWorkflowWithFullLogging(
|
||||
const workflowEdges = useWorkflowStore.getState().edges
|
||||
|
||||
const activeBlocksSet = new Set<string>()
|
||||
const activeBlockRefCounts = new Map<string, number>()
|
||||
|
||||
const payload: any = {
|
||||
input: options.workflowInput,
|
||||
@@ -103,6 +104,8 @@ export async function executeWorkflowWithFullLogging(
|
||||
|
||||
switch (event.type) {
|
||||
case 'block:started': {
|
||||
const startCount = activeBlockRefCounts.get(event.data.blockId) ?? 0
|
||||
activeBlockRefCounts.set(event.data.blockId, startCount + 1)
|
||||
activeBlocksSet.add(event.data.blockId)
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
@@ -115,8 +118,14 @@ export async function executeWorkflowWithFullLogging(
|
||||
break
|
||||
}
|
||||
|
||||
case 'block:completed':
|
||||
activeBlocksSet.delete(event.data.blockId)
|
||||
case 'block:completed': {
|
||||
const completeCount = activeBlockRefCounts.get(event.data.blockId) ?? 1
|
||||
if (completeCount <= 1) {
|
||||
activeBlockRefCounts.delete(event.data.blockId)
|
||||
activeBlocksSet.delete(event.data.blockId)
|
||||
} else {
|
||||
activeBlockRefCounts.set(event.data.blockId, completeCount - 1)
|
||||
}
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(wfId, event.data.blockId, 'success')
|
||||
@@ -144,9 +153,16 @@ export async function executeWorkflowWithFullLogging(
|
||||
options.onBlockComplete(event.data.blockId, event.data.output).catch(() => {})
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
case 'block:error':
|
||||
activeBlocksSet.delete(event.data.blockId)
|
||||
case 'block:error': {
|
||||
const errorCount = activeBlockRefCounts.get(event.data.blockId) ?? 1
|
||||
if (errorCount <= 1) {
|
||||
activeBlockRefCounts.delete(event.data.blockId)
|
||||
activeBlocksSet.delete(event.data.blockId)
|
||||
} else {
|
||||
activeBlockRefCounts.set(event.data.blockId, errorCount - 1)
|
||||
}
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(wfId, event.data.blockId, 'error')
|
||||
@@ -171,6 +187,7 @@ export async function executeWorkflowWithFullLogging(
|
||||
iterationContainerId: event.data.iterationContainerId,
|
||||
})
|
||||
break
|
||||
}
|
||||
|
||||
case 'execution:completed':
|
||||
executionResult = {
|
||||
|
||||
@@ -428,7 +428,7 @@ export class BlockExecutor {
|
||||
block: SerializedBlock,
|
||||
executionOrder: number
|
||||
): void {
|
||||
const blockId = node.id
|
||||
const blockId = node.metadata?.originalBlockId ?? node.id
|
||||
const blockName = block.metadata?.name ?? blockId
|
||||
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
|
||||
|
||||
@@ -456,7 +456,7 @@ export class BlockExecutor {
|
||||
executionOrder: number,
|
||||
endedAt: string
|
||||
): void {
|
||||
const blockId = node.id
|
||||
const blockId = node.metadata?.originalBlockId ?? node.id
|
||||
const blockName = block.metadata?.name ?? blockId
|
||||
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
|
||||
|
||||
|
||||
Reference in New Issue
Block a user