diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index b047f23e8..13fc0ff41 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -747,6 +747,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: iterationCurrent: iterationContext.iterationCurrent, iterationTotal: iterationContext.iterationTotal, iterationType: iterationContext.iterationType, + iterationContainerId: iterationContext.iterationContainerId, }), }, }) @@ -787,6 +788,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: iterationCurrent: iterationContext.iterationCurrent, iterationTotal: iterationContext.iterationTotal, iterationType: iterationContext.iterationType, + iterationContainerId: iterationContext.iterationContainerId, }), }, }) @@ -815,6 +817,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: iterationCurrent: iterationContext.iterationCurrent, iterationTotal: iterationContext.iterationTotal, iterationType: iterationContext.iterationType, + iterationContainerId: iterationContext.iterationContainerId, }), }, }) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts index 861708926..0c285a7b9 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts @@ -161,6 +161,7 @@ export interface ExecutionGroup { */ interface IterationGroup { iterationType: string + iterationContainerId: string iterationCurrent: number iterationTotal?: number blocks: ConsoleEntry[] @@ -169,7 +170,7 @@ interface IterationGroup { /** * Builds a tree structure from flat entries. - * Groups iteration entries by (iterationType, iterationCurrent), showing all blocks + * Groups iteration entries by (iterationType, iterationContainerId, iterationCurrent), showing all blocks * that executed within each iteration. * Sorts by start time to ensure chronological order. */ @@ -186,16 +187,18 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] { } } - // Group iteration entries by (iterationType, iterationCurrent) + // Group iteration entries by (iterationType, iterationContainerId, iterationCurrent) const iterationGroupsMap = new Map() for (const entry of iterationEntries) { - const key = `${entry.iterationType}-${entry.iterationCurrent}` + const iterationContainerId = entry.iterationContainerId || 'unknown' + const key = `${entry.iterationType}-${iterationContainerId}-${entry.iterationCurrent}` let group = iterationGroupsMap.get(key) const entryStartMs = new Date(entry.startedAt || entry.timestamp).getTime() if (!group) { group = { iterationType: entry.iterationType!, + iterationContainerId, iterationCurrent: entry.iterationCurrent!, iterationTotal: entry.iterationTotal, blocks: [], @@ -220,26 +223,34 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] { group.blocks.sort((a, b) => a.executionOrder - b.executionOrder) } - // Group iterations by iterationType to create subflow parents - const subflowGroups = new Map() + // Group iterations by (iterationType, iterationContainerId) to create subflow parents + const subflowGroups = new Map< + string, + { iterationType: string; iterationContainerId: string; groups: IterationGroup[] } + >() for (const group of iterationGroupsMap.values()) { - const type = group.iterationType - let groups = subflowGroups.get(type) - if (!groups) { - groups = [] - subflowGroups.set(type, groups) + const key = `${group.iterationType}-${group.iterationContainerId}` + let subflowGroup = subflowGroups.get(key) + if (!subflowGroup) { + subflowGroup = { + iterationType: group.iterationType, + iterationContainerId: group.iterationContainerId, + groups: [], + } + subflowGroups.set(key, subflowGroup) } - groups.push(group) + subflowGroup.groups.push(group) } // Sort iterations within each subflow by iteration number - for (const groups of subflowGroups.values()) { - groups.sort((a, b) => a.iterationCurrent - b.iterationCurrent) + for (const subflowGroup of subflowGroups.values()) { + subflowGroup.groups.sort((a, b) => a.iterationCurrent - b.iterationCurrent) } // Build subflow nodes with iteration children const subflowNodes: EntryNode[] = [] - for (const [iterationType, iterationGroups] of subflowGroups.entries()) { + for (const subflowGroup of subflowGroups.values()) { + const { iterationType, iterationContainerId, groups: iterationGroups } = subflowGroup // Calculate subflow timing from all its iterations const firstIteration = iterationGroups[0] const allBlocks = iterationGroups.flatMap((g) => g.blocks) @@ -255,10 +266,10 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] { // Use the minimum executionOrder from all child blocks for proper ordering const subflowExecutionOrder = Math.min(...allBlocks.map((b) => b.executionOrder)) const syntheticSubflow: ConsoleEntry = { - id: `subflow-${iterationType}-${firstIteration.blocks[0]?.executionId || 'unknown'}`, + id: `subflow-${iterationType}-${iterationContainerId}-${firstIteration.blocks[0]?.executionId || 'unknown'}`, timestamp: new Date(subflowStartMs).toISOString(), workflowId: firstIteration.blocks[0]?.workflowId || '', - blockId: `${iterationType}-container`, + blockId: `${iterationType}-container-${iterationContainerId}`, blockName: iterationType.charAt(0).toUpperCase() + iterationType.slice(1), blockType: iterationType, executionId: firstIteration.blocks[0]?.executionId, @@ -284,10 +295,10 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] { // Use the minimum executionOrder from blocks in this iteration const iterExecutionOrder = Math.min(...iterBlocks.map((b) => b.executionOrder)) const syntheticIteration: ConsoleEntry = { - id: `iteration-${iterationType}-${iterGroup.iterationCurrent}-${iterBlocks[0]?.executionId || 'unknown'}`, + id: `iteration-${iterationType}-${iterGroup.iterationContainerId}-${iterGroup.iterationCurrent}-${iterBlocks[0]?.executionId || 'unknown'}`, timestamp: new Date(iterStartMs).toISOString(), workflowId: iterBlocks[0]?.workflowId || '', - blockId: `iteration-${iterGroup.iterationCurrent}`, + blockId: `iteration-${iterGroup.iterationContainerId}-${iterGroup.iterationCurrent}`, blockName: `Iteration ${iterGroup.iterationCurrent}${iterGroup.iterationTotal !== undefined ? ` / ${iterGroup.iterationTotal}` : ''}`, blockType: iterationType, executionId: iterBlocks[0]?.executionId, @@ -299,6 +310,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] { iterationCurrent: iterGroup.iterationCurrent, iterationTotal: iterGroup.iterationTotal, iterationType: iterationType as 'loop' | 'parallel', + iterationContainerId: iterGroup.iterationContainerId, } // Block nodes within this iteration diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index d816f17e0..6a2a43f67 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -365,6 +365,7 @@ export function useWorkflowExecution() { iterationCurrent: data.iterationCurrent, iterationTotal: data.iterationTotal, iterationType: data.iterationType, + iterationContainerId: data.iterationContainerId, }) } @@ -387,6 +388,7 @@ export function useWorkflowExecution() { iterationCurrent: data.iterationCurrent, iterationTotal: data.iterationTotal, iterationType: data.iterationType, + iterationContainerId: data.iterationContainerId, }) } @@ -394,6 +396,7 @@ export function useWorkflowExecution() { updateConsole( data.blockId, { + executionOrder: data.executionOrder, input: data.input || {}, replaceOutput: data.output, success: true, @@ -404,6 +407,7 @@ export function useWorkflowExecution() { iterationCurrent: data.iterationCurrent, iterationTotal: data.iterationTotal, iterationType: data.iterationType, + iterationContainerId: data.iterationContainerId, }, executionId ) @@ -413,6 +417,7 @@ export function useWorkflowExecution() { updateConsole( data.blockId, { + executionOrder: data.executionOrder, input: data.input || {}, replaceOutput: {}, success: false, @@ -424,6 +429,7 @@ export function useWorkflowExecution() { iterationCurrent: data.iterationCurrent, iterationTotal: data.iterationTotal, iterationType: data.iterationType, + iterationContainerId: data.iterationContainerId, }, executionId ) @@ -453,6 +459,7 @@ export function useWorkflowExecution() { iterationCurrent: data.iterationCurrent, iterationTotal: data.iterationTotal, iterationType: data.iterationType, + iterationContainerId: data.iterationContainerId, }) } @@ -921,6 +928,7 @@ export function useWorkflowExecution() { useTerminalConsoleStore.getState().updateConsole( log.blockId, { + executionOrder: log.executionOrder, replaceOutput: log.output, success: true, }, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index f032da1f0..1f16dbc8d 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -137,6 +137,7 @@ export async function executeWorkflowWithFullLogging( iterationCurrent: event.data.iterationCurrent, iterationTotal: event.data.iterationTotal, iterationType: event.data.iterationType, + iterationContainerId: event.data.iterationContainerId, }) if (options.onBlockComplete) { @@ -167,6 +168,7 @@ export async function executeWorkflowWithFullLogging( iterationCurrent: event.data.iterationCurrent, iterationTotal: event.data.iterationTotal, iterationType: event.data.iterationType, + iterationContainerId: event.data.iterationContainerId, }) break diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 200a23b5e..fcd5c1297 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -17,7 +17,11 @@ import { } from '@/executor/constants' import type { DAGNode } from '@/executor/dag/builder' import { ChildWorkflowError } from '@/executor/errors/child-workflow-error' -import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types' +import type { + BlockStateWriter, + ContextExtensions, + IterationContext, +} from '@/executor/execution/types' import { generatePauseContextId, mapNodeMetadataToPauseScopes, @@ -473,28 +477,41 @@ export class BlockExecutor { } } - private getIterationContext( - ctx: ExecutionContext, - node: DAGNode - ): { iterationCurrent: number; iterationTotal: number; iterationType: SubflowType } | undefined { + private createIterationContext( + iterationCurrent: number, + iterationType: SubflowType, + iterationContainerId?: string, + iterationTotal?: number + ): IterationContext { + return { + iterationCurrent, + iterationTotal, + iterationType, + iterationContainerId, + } + } + + private getIterationContext(ctx: ExecutionContext, node: DAGNode): IterationContext | undefined { if (!node?.metadata) return undefined - if (node.metadata.branchIndex !== undefined && node.metadata.branchTotal) { - return { - iterationCurrent: node.metadata.branchIndex, - iterationTotal: node.metadata.branchTotal, - iterationType: 'parallel', - } + if (node.metadata.branchIndex !== undefined && node.metadata.branchTotal !== undefined) { + return this.createIterationContext( + node.metadata.branchIndex, + 'parallel', + node.metadata.parallelId, + node.metadata.branchTotal + ) } if (node.metadata.isLoopNode && node.metadata.loopId) { const loopScope = ctx.loopExecutions?.get(node.metadata.loopId) - if (loopScope && loopScope.iteration !== undefined && loopScope.maxIterations) { - return { - iterationCurrent: loopScope.iteration, - iterationTotal: loopScope.maxIterations, - iterationType: 'loop', - } + if (loopScope && loopScope.iteration !== undefined) { + return this.createIterationContext( + loopScope.iteration, + 'loop', + node.metadata.loopId, + loopScope.maxIterations + ) } } diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index 91dfe2c6a..29b79ca03 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -49,8 +49,9 @@ export interface SerializableExecutionState { export interface IterationContext { iterationCurrent: number - iterationTotal: number + iterationTotal?: number iterationType: SubflowType + iterationContainerId?: string } export interface ExecutionCallbacks { diff --git a/apps/sim/lib/workflows/executor/execution-events.ts b/apps/sim/lib/workflows/executor/execution-events.ts index bc5c316c2..b0585dea5 100644 --- a/apps/sim/lib/workflows/executor/execution-events.ts +++ b/apps/sim/lib/workflows/executor/execution-events.ts @@ -80,6 +80,7 @@ export interface BlockStartedEvent extends BaseExecutionEvent { iterationCurrent?: number iterationTotal?: number iterationType?: SubflowType + iterationContainerId?: string } } @@ -102,6 +103,7 @@ export interface BlockCompletedEvent extends BaseExecutionEvent { iterationCurrent?: number iterationTotal?: number iterationType?: SubflowType + iterationContainerId?: string } } @@ -124,6 +126,7 @@ export interface BlockErrorEvent extends BaseExecutionEvent { iterationCurrent?: number iterationTotal?: number iterationType?: SubflowType + iterationContainerId?: string } } @@ -219,7 +222,12 @@ export function createSSECallbacks(options: SSECallbackOptions) { blockName: string, blockType: string, executionOrder: number, - iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string } + iterationContext?: { + iterationCurrent: number + iterationTotal?: number + iterationType: string + iterationContainerId?: string + } ) => { sendEvent({ type: 'block:started', @@ -235,6 +243,7 @@ export function createSSECallbacks(options: SSECallbackOptions) { iterationCurrent: iterationContext.iterationCurrent, iterationTotal: iterationContext.iterationTotal, iterationType: iterationContext.iterationType as any, + iterationContainerId: iterationContext.iterationContainerId, }), }, }) @@ -252,7 +261,12 @@ export function createSSECallbacks(options: SSECallbackOptions) { executionOrder: number endedAt: string }, - iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string } + iterationContext?: { + iterationCurrent: number + iterationTotal?: number + iterationType: string + iterationContainerId?: string + } ) => { const hasError = callbackData.output?.error const iterationData = iterationContext @@ -260,6 +274,7 @@ export function createSSECallbacks(options: SSECallbackOptions) { iterationCurrent: iterationContext.iterationCurrent, iterationTotal: iterationContext.iterationTotal, iterationType: iterationContext.iterationType as any, + iterationContainerId: iterationContext.iterationContainerId, } : {} diff --git a/apps/sim/stores/terminal/console/store.ts b/apps/sim/stores/terminal/console/store.ts index 8bd374204..55b59b135 100644 --- a/apps/sim/stores/terminal/console/store.ts +++ b/apps/sim/stores/terminal/console/store.ts @@ -62,6 +62,38 @@ const shouldSkipEntry = (output: any): boolean => { return false } +const matchesEntryForUpdate = ( + entry: ConsoleEntry, + blockId: string, + executionId: string | undefined, + update: string | ConsoleUpdate +): boolean => { + if (entry.blockId !== blockId || entry.executionId !== executionId) { + return false + } + + if (typeof update !== 'object') { + return true + } + + if (update.executionOrder !== undefined && entry.executionOrder !== update.executionOrder) { + return false + } + + if (update.iterationCurrent !== undefined && entry.iterationCurrent !== update.iterationCurrent) { + return false + } + + if ( + update.iterationContainerId !== undefined && + entry.iterationContainerId !== update.iterationContainerId + ) { + return false + } + + return true +} + interface NotifyBlockErrorParams { error: unknown blockName: string @@ -299,15 +331,7 @@ export const useTerminalConsoleStore = create()( updateConsole: (blockId: string, update: string | ConsoleUpdate, executionId?: string) => { set((state) => { const updatedEntries = state.entries.map((entry) => { - if (entry.blockId !== blockId || entry.executionId !== executionId) { - return entry - } - - if ( - typeof update === 'object' && - update.iterationCurrent !== undefined && - entry.iterationCurrent !== update.iterationCurrent - ) { + if (!matchesEntryForUpdate(entry, blockId, executionId, update)) { return entry } @@ -387,6 +411,10 @@ export const useTerminalConsoleStore = create()( updatedEntry.iterationType = update.iterationType } + if (update.iterationContainerId !== undefined) { + updatedEntry.iterationContainerId = update.iterationContainerId + } + return updatedEntry }) diff --git a/apps/sim/stores/terminal/console/types.ts b/apps/sim/stores/terminal/console/types.ts index 3ddb4b424..f15f36377 100644 --- a/apps/sim/stores/terminal/console/types.ts +++ b/apps/sim/stores/terminal/console/types.ts @@ -21,6 +21,7 @@ export interface ConsoleEntry { iterationCurrent?: number iterationTotal?: number iterationType?: SubflowType + iterationContainerId?: string isRunning?: boolean isCanceled?: boolean } @@ -29,6 +30,7 @@ export interface ConsoleUpdate { content?: string output?: Partial replaceOutput?: NormalizedBlockOutput + executionOrder?: number error?: string | Error | null warning?: string success?: boolean @@ -41,6 +43,7 @@ export interface ConsoleUpdate { iterationCurrent?: number iterationTotal?: number iterationType?: SubflowType + iterationContainerId?: string } export interface ConsoleStore {