fix(parallel): correct active state pulsing and duration display for parallel subflow blocks (#3305)

* fix(executor): resolve block ID for parallel subflow active state

* fix timing for parallel block

* refactor(parallel): extract shared updateActiveBlockRefCount helper

* fix(parallel): error-sticky block run status to prevent branch success masking failure

* Revert "fix(parallel): error-sticky block run status to prevent branch success masking failure"

This reverts commit 9c087cd466.
This commit is contained in:
Waleed
2026-02-22 15:03:33 -08:00
committed by GitHub
parent 996dc96d6e
commit 687c12528b
4 changed files with 67 additions and 14 deletions

View File

@@ -261,6 +261,9 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
...allBlocks.map((b) => new Date(b.endedAt || b.timestamp).getTime())
)
const totalDuration = allBlocks.reduce((sum, b) => sum + (b.durationMs || 0), 0)
// Parallel branches run concurrently — use wall-clock time. Loop iterations run serially — use sum.
const subflowDuration =
iterationType === 'parallel' ? subflowEndMs - subflowStartMs : totalDuration
// Create synthetic subflow parent entry
// Use the minimum executionOrder from all child blocks for proper ordering
@@ -276,7 +279,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
startedAt: new Date(subflowStartMs).toISOString(),
executionOrder: subflowExecutionOrder,
endedAt: new Date(subflowEndMs).toISOString(),
durationMs: totalDuration,
durationMs: subflowDuration,
success: !allBlocks.some((b) => b.error),
}
@@ -291,6 +294,9 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
...iterBlocks.map((b) => new Date(b.endedAt || b.timestamp).getTime())
)
const iterDuration = iterBlocks.reduce((sum, b) => sum + (b.durationMs || 0), 0)
// Parallel branches run concurrently — use wall-clock time. Loop iterations run serially — use sum.
const iterDisplayDuration =
iterationType === 'parallel' ? iterEndMs - iterStartMs : iterDuration
// Use the minimum executionOrder from blocks in this iteration
const iterExecutionOrder = Math.min(...iterBlocks.map((b) => b.executionOrder))
@@ -305,7 +311,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
startedAt: new Date(iterStartMs).toISOString(),
executionOrder: iterExecutionOrder,
endedAt: new Date(iterEndMs).toISOString(),
durationMs: iterDuration,
durationMs: iterDisplayDuration,
success: !iterBlocks.some((b) => b.error),
iterationCurrent: iterGroup.iterationCurrent,
iterationTotal: iterGroup.iterationTotal,

View File

@@ -20,6 +20,7 @@ import {
TriggerUtils,
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import { updateActiveBlockRefCount } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
import { getBlock } from '@/blocks'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type {
@@ -63,6 +64,7 @@ interface BlockEventHandlerConfig {
executionIdRef: { current: string }
workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }>
activeBlocksSet: Set<string>
activeBlockRefCounts: Map<string, number>
accumulatedBlockLogs: BlockLog[]
accumulatedBlockStates: Map<string, BlockState>
executedBlockIds: Set<string>
@@ -309,6 +311,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
@@ -327,11 +330,7 @@ export function useWorkflowExecution() {
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
if (!workflowId) return
if (isActive) {
activeBlocksSet.add(blockId)
} else {
activeBlocksSet.delete(blockId)
}
updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive)
setActiveBlocks(workflowId, new Set(activeBlocksSet))
}
@@ -1280,6 +1279,7 @@ export function useWorkflowExecution() {
}
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
const streamedContent = new Map<string, string>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
@@ -1292,6 +1292,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
@@ -1902,6 +1903,7 @@ export function useWorkflowExecution() {
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
try {
const blockHandlers = buildBlockEventHandlers({
@@ -1909,6 +1911,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
@@ -2104,6 +2107,7 @@ export function useWorkflowExecution() {
const workflowEdges = useWorkflowStore.getState().edges
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
@@ -2115,6 +2119,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,

View File

@@ -5,6 +5,30 @@ import { useTerminalConsoleStore } from '@/stores/terminal'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
/**
* Updates the active blocks set and ref counts for a single block.
* Ref counting ensures a block stays active until all parallel branches for it complete.
*/
export function updateActiveBlockRefCount(
refCounts: Map<string, number>,
activeSet: Set<string>,
blockId: string,
isActive: boolean
): void {
if (isActive) {
refCounts.set(blockId, (refCounts.get(blockId) ?? 0) + 1)
activeSet.add(blockId)
} else {
const next = (refCounts.get(blockId) ?? 1) - 1
if (next <= 0) {
refCounts.delete(blockId)
activeSet.delete(blockId)
} else {
refCounts.set(blockId, next)
}
}
}
export interface WorkflowExecutionOptions {
workflowInput?: any
onStream?: (se: StreamingExecution) => Promise<void>
@@ -39,6 +63,7 @@ export async function executeWorkflowWithFullLogging(
const workflowEdges = useWorkflowStore.getState().edges
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
const payload: any = {
input: options.workflowInput,
@@ -103,7 +128,12 @@ export async function executeWorkflowWithFullLogging(
switch (event.type) {
case 'block:started': {
activeBlocksSet.add(event.data.blockId)
updateActiveBlockRefCount(
activeBlockRefCounts,
activeBlocksSet,
event.data.blockId,
true
)
setActiveBlocks(wfId, new Set(activeBlocksSet))
const incomingEdges = workflowEdges.filter(
@@ -115,8 +145,13 @@ export async function executeWorkflowWithFullLogging(
break
}
case 'block:completed':
activeBlocksSet.delete(event.data.blockId)
case 'block:completed': {
updateActiveBlockRefCount(
activeBlockRefCounts,
activeBlocksSet,
event.data.blockId,
false
)
setActiveBlocks(wfId, new Set(activeBlocksSet))
setBlockRunStatus(wfId, event.data.blockId, 'success')
@@ -144,9 +179,15 @@ export async function executeWorkflowWithFullLogging(
options.onBlockComplete(event.data.blockId, event.data.output).catch(() => {})
}
break
}
case 'block:error':
activeBlocksSet.delete(event.data.blockId)
case 'block:error': {
updateActiveBlockRefCount(
activeBlockRefCounts,
activeBlocksSet,
event.data.blockId,
false
)
setActiveBlocks(wfId, new Set(activeBlocksSet))
setBlockRunStatus(wfId, event.data.blockId, 'error')
@@ -171,6 +212,7 @@ export async function executeWorkflowWithFullLogging(
iterationContainerId: event.data.iterationContainerId,
})
break
}
case 'execution:completed':
executionResult = {

View File

@@ -428,7 +428,7 @@ export class BlockExecutor {
block: SerializedBlock,
executionOrder: number
): void {
const blockId = node.id
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
@@ -456,7 +456,7 @@ export class BlockExecutor {
executionOrder: number,
endedAt: string
): void {
const blockId = node.id
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE