fix(terminal): subflow logs rendering (#3189)

This commit is contained in:
Vikhyath Mondreti
2026-02-10 21:16:05 -08:00
committed by GitHub
parent 8a24b56f51
commit af01dce2c3
9 changed files with 136 additions and 47 deletions

View File

@@ -747,6 +747,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}),
},
})
@@ -787,6 +788,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}),
},
})
@@ -815,6 +817,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}),
},
})

View File

@@ -161,6 +161,7 @@ export interface ExecutionGroup {
*/
interface IterationGroup {
iterationType: string
iterationContainerId: string
iterationCurrent: number
iterationTotal?: number
blocks: ConsoleEntry[]
@@ -169,7 +170,7 @@ interface IterationGroup {
/**
* Builds a tree structure from flat entries.
* Groups iteration entries by (iterationType, iterationCurrent), showing all blocks
* Groups iteration entries by (iterationType, iterationContainerId, iterationCurrent), showing all blocks
* that executed within each iteration.
* Sorts by start time to ensure chronological order.
*/
@@ -186,16 +187,18 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
}
}
// Group iteration entries by (iterationType, iterationCurrent)
// Group iteration entries by (iterationType, iterationContainerId, iterationCurrent)
const iterationGroupsMap = new Map<string, IterationGroup>()
for (const entry of iterationEntries) {
const key = `${entry.iterationType}-${entry.iterationCurrent}`
const iterationContainerId = entry.iterationContainerId || 'unknown'
const key = `${entry.iterationType}-${iterationContainerId}-${entry.iterationCurrent}`
let group = iterationGroupsMap.get(key)
const entryStartMs = new Date(entry.startedAt || entry.timestamp).getTime()
if (!group) {
group = {
iterationType: entry.iterationType!,
iterationContainerId,
iterationCurrent: entry.iterationCurrent!,
iterationTotal: entry.iterationTotal,
blocks: [],
@@ -220,26 +223,34 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
group.blocks.sort((a, b) => a.executionOrder - b.executionOrder)
}
// Group iterations by iterationType to create subflow parents
const subflowGroups = new Map<string, IterationGroup[]>()
// Group iterations by (iterationType, iterationContainerId) to create subflow parents
const subflowGroups = new Map<
string,
{ iterationType: string; iterationContainerId: string; groups: IterationGroup[] }
>()
for (const group of iterationGroupsMap.values()) {
const type = group.iterationType
let groups = subflowGroups.get(type)
if (!groups) {
groups = []
subflowGroups.set(type, groups)
const key = `${group.iterationType}-${group.iterationContainerId}`
let subflowGroup = subflowGroups.get(key)
if (!subflowGroup) {
subflowGroup = {
iterationType: group.iterationType,
iterationContainerId: group.iterationContainerId,
groups: [],
}
subflowGroups.set(key, subflowGroup)
}
groups.push(group)
subflowGroup.groups.push(group)
}
// Sort iterations within each subflow by iteration number
for (const groups of subflowGroups.values()) {
groups.sort((a, b) => a.iterationCurrent - b.iterationCurrent)
for (const subflowGroup of subflowGroups.values()) {
subflowGroup.groups.sort((a, b) => a.iterationCurrent - b.iterationCurrent)
}
// Build subflow nodes with iteration children
const subflowNodes: EntryNode[] = []
for (const [iterationType, iterationGroups] of subflowGroups.entries()) {
for (const subflowGroup of subflowGroups.values()) {
const { iterationType, iterationContainerId, groups: iterationGroups } = subflowGroup
// Calculate subflow timing from all its iterations
const firstIteration = iterationGroups[0]
const allBlocks = iterationGroups.flatMap((g) => g.blocks)
@@ -255,10 +266,10 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
// Use the minimum executionOrder from all child blocks for proper ordering
const subflowExecutionOrder = Math.min(...allBlocks.map((b) => b.executionOrder))
const syntheticSubflow: ConsoleEntry = {
id: `subflow-${iterationType}-${firstIteration.blocks[0]?.executionId || 'unknown'}`,
id: `subflow-${iterationType}-${iterationContainerId}-${firstIteration.blocks[0]?.executionId || 'unknown'}`,
timestamp: new Date(subflowStartMs).toISOString(),
workflowId: firstIteration.blocks[0]?.workflowId || '',
blockId: `${iterationType}-container`,
blockId: `${iterationType}-container-${iterationContainerId}`,
blockName: iterationType.charAt(0).toUpperCase() + iterationType.slice(1),
blockType: iterationType,
executionId: firstIteration.blocks[0]?.executionId,
@@ -284,10 +295,10 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
// Use the minimum executionOrder from blocks in this iteration
const iterExecutionOrder = Math.min(...iterBlocks.map((b) => b.executionOrder))
const syntheticIteration: ConsoleEntry = {
id: `iteration-${iterationType}-${iterGroup.iterationCurrent}-${iterBlocks[0]?.executionId || 'unknown'}`,
id: `iteration-${iterationType}-${iterGroup.iterationContainerId}-${iterGroup.iterationCurrent}-${iterBlocks[0]?.executionId || 'unknown'}`,
timestamp: new Date(iterStartMs).toISOString(),
workflowId: iterBlocks[0]?.workflowId || '',
blockId: `iteration-${iterGroup.iterationCurrent}`,
blockId: `iteration-${iterGroup.iterationContainerId}-${iterGroup.iterationCurrent}`,
blockName: `Iteration ${iterGroup.iterationCurrent}${iterGroup.iterationTotal !== undefined ? ` / ${iterGroup.iterationTotal}` : ''}`,
blockType: iterationType,
executionId: iterBlocks[0]?.executionId,
@@ -299,6 +310,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
iterationCurrent: iterGroup.iterationCurrent,
iterationTotal: iterGroup.iterationTotal,
iterationType: iterationType as 'loop' | 'parallel',
iterationContainerId: iterGroup.iterationContainerId,
}
// Block nodes within this iteration

View File

@@ -365,6 +365,7 @@ export function useWorkflowExecution() {
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
})
}
@@ -387,6 +388,7 @@ export function useWorkflowExecution() {
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
})
}
@@ -394,6 +396,7 @@ export function useWorkflowExecution() {
updateConsole(
data.blockId,
{
executionOrder: data.executionOrder,
input: data.input || {},
replaceOutput: data.output,
success: true,
@@ -404,6 +407,7 @@ export function useWorkflowExecution() {
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
},
executionId
)
@@ -413,6 +417,7 @@ export function useWorkflowExecution() {
updateConsole(
data.blockId,
{
executionOrder: data.executionOrder,
input: data.input || {},
replaceOutput: {},
success: false,
@@ -424,6 +429,7 @@ export function useWorkflowExecution() {
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
},
executionId
)
@@ -453,6 +459,7 @@ export function useWorkflowExecution() {
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
})
}
@@ -921,6 +928,7 @@ export function useWorkflowExecution() {
useTerminalConsoleStore.getState().updateConsole(
log.blockId,
{
executionOrder: log.executionOrder,
replaceOutput: log.output,
success: true,
},

View File

@@ -137,6 +137,7 @@ export async function executeWorkflowWithFullLogging(
iterationCurrent: event.data.iterationCurrent,
iterationTotal: event.data.iterationTotal,
iterationType: event.data.iterationType,
iterationContainerId: event.data.iterationContainerId,
})
if (options.onBlockComplete) {
@@ -167,6 +168,7 @@ export async function executeWorkflowWithFullLogging(
iterationCurrent: event.data.iterationCurrent,
iterationTotal: event.data.iterationTotal,
iterationType: event.data.iterationType,
iterationContainerId: event.data.iterationContainerId,
})
break

View File

@@ -17,7 +17,11 @@ import {
} from '@/executor/constants'
import type { DAGNode } from '@/executor/dag/builder'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import type {
BlockStateWriter,
ContextExtensions,
IterationContext,
} from '@/executor/execution/types'
import {
generatePauseContextId,
mapNodeMetadataToPauseScopes,
@@ -473,28 +477,41 @@ export class BlockExecutor {
}
}
private getIterationContext(
ctx: ExecutionContext,
node: DAGNode
): { iterationCurrent: number; iterationTotal: number; iterationType: SubflowType } | undefined {
private createIterationContext(
iterationCurrent: number,
iterationType: SubflowType,
iterationContainerId?: string,
iterationTotal?: number
): IterationContext {
return {
iterationCurrent,
iterationTotal,
iterationType,
iterationContainerId,
}
}
private getIterationContext(ctx: ExecutionContext, node: DAGNode): IterationContext | undefined {
if (!node?.metadata) return undefined
if (node.metadata.branchIndex !== undefined && node.metadata.branchTotal) {
return {
iterationCurrent: node.metadata.branchIndex,
iterationTotal: node.metadata.branchTotal,
iterationType: 'parallel',
}
if (node.metadata.branchIndex !== undefined && node.metadata.branchTotal !== undefined) {
return this.createIterationContext(
node.metadata.branchIndex,
'parallel',
node.metadata.parallelId,
node.metadata.branchTotal
)
}
if (node.metadata.isLoopNode && node.metadata.loopId) {
const loopScope = ctx.loopExecutions?.get(node.metadata.loopId)
if (loopScope && loopScope.iteration !== undefined && loopScope.maxIterations) {
return {
iterationCurrent: loopScope.iteration,
iterationTotal: loopScope.maxIterations,
iterationType: 'loop',
}
if (loopScope && loopScope.iteration !== undefined) {
return this.createIterationContext(
loopScope.iteration,
'loop',
node.metadata.loopId,
loopScope.maxIterations
)
}
}

View File

@@ -49,8 +49,9 @@ export interface SerializableExecutionState {
export interface IterationContext {
iterationCurrent: number
iterationTotal: number
iterationTotal?: number
iterationType: SubflowType
iterationContainerId?: string
}
export interface ExecutionCallbacks {

View File

@@ -80,6 +80,7 @@ export interface BlockStartedEvent extends BaseExecutionEvent {
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
}
}
@@ -102,6 +103,7 @@ export interface BlockCompletedEvent extends BaseExecutionEvent {
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
}
}
@@ -124,6 +126,7 @@ export interface BlockErrorEvent extends BaseExecutionEvent {
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
}
}
@@ -219,7 +222,12 @@ export function createSSECallbacks(options: SSECallbackOptions) {
blockName: string,
blockType: string,
executionOrder: number,
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
iterationContext?: {
iterationCurrent: number
iterationTotal?: number
iterationType: string
iterationContainerId?: string
}
) => {
sendEvent({
type: 'block:started',
@@ -235,6 +243,7 @@ export function createSSECallbacks(options: SSECallbackOptions) {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
iterationContainerId: iterationContext.iterationContainerId,
}),
},
})
@@ -252,7 +261,12 @@ export function createSSECallbacks(options: SSECallbackOptions) {
executionOrder: number
endedAt: string
},
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
iterationContext?: {
iterationCurrent: number
iterationTotal?: number
iterationType: string
iterationContainerId?: string
}
) => {
const hasError = callbackData.output?.error
const iterationData = iterationContext
@@ -260,6 +274,7 @@ export function createSSECallbacks(options: SSECallbackOptions) {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
iterationContainerId: iterationContext.iterationContainerId,
}
: {}

View File

@@ -62,6 +62,38 @@ const shouldSkipEntry = (output: any): boolean => {
return false
}
const matchesEntryForUpdate = (
entry: ConsoleEntry,
blockId: string,
executionId: string | undefined,
update: string | ConsoleUpdate
): boolean => {
if (entry.blockId !== blockId || entry.executionId !== executionId) {
return false
}
if (typeof update !== 'object') {
return true
}
if (update.executionOrder !== undefined && entry.executionOrder !== update.executionOrder) {
return false
}
if (update.iterationCurrent !== undefined && entry.iterationCurrent !== update.iterationCurrent) {
return false
}
if (
update.iterationContainerId !== undefined &&
entry.iterationContainerId !== update.iterationContainerId
) {
return false
}
return true
}
interface NotifyBlockErrorParams {
error: unknown
blockName: string
@@ -299,15 +331,7 @@ export const useTerminalConsoleStore = create<ConsoleStore>()(
updateConsole: (blockId: string, update: string | ConsoleUpdate, executionId?: string) => {
set((state) => {
const updatedEntries = state.entries.map((entry) => {
if (entry.blockId !== blockId || entry.executionId !== executionId) {
return entry
}
if (
typeof update === 'object' &&
update.iterationCurrent !== undefined &&
entry.iterationCurrent !== update.iterationCurrent
) {
if (!matchesEntryForUpdate(entry, blockId, executionId, update)) {
return entry
}
@@ -387,6 +411,10 @@ export const useTerminalConsoleStore = create<ConsoleStore>()(
updatedEntry.iterationType = update.iterationType
}
if (update.iterationContainerId !== undefined) {
updatedEntry.iterationContainerId = update.iterationContainerId
}
return updatedEntry
})

View File

@@ -21,6 +21,7 @@ export interface ConsoleEntry {
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
isRunning?: boolean
isCanceled?: boolean
}
@@ -29,6 +30,7 @@ export interface ConsoleUpdate {
content?: string
output?: Partial<NormalizedBlockOutput>
replaceOutput?: NormalizedBlockOutput
executionOrder?: number
error?: string | Error | null
warning?: string
success?: boolean
@@ -41,6 +43,7 @@ export interface ConsoleUpdate {
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
}
export interface ConsoleStore {