mirror of
https://github.com/simstudioai/sim.git
synced 2026-03-15 03:00:33 -04:00
fix(terminal): thread executionOrder through child workflow SSE events for loop support (#3346)
* fix(terminal): thread executionOrder through child workflow SSE events for loop support * ran lint * fix(terminal): render iteration children through EntryNodeRow for workflow block expansion IterationNodeRow was rendering all children as flat BlockRow components, ignoring nodeType. Workflow blocks inside loop iterations were never rendered as WorkflowNodeRow, so they had no expand chevron or child tree. * fix(terminal): add childWorkflowBlockId to matchesEntryForUpdate Sub-executors reset executionOrderCounter, so child blocks across loop iterations share the same blockId + executionOrder. Without checking childWorkflowBlockId, updateConsole for iteration N overwrites entries from iterations 0..N-1, causing all child blocks to be grouped under the last iteration's workflow instance.
This commit is contained in:
@@ -987,7 +987,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
const onChildWorkflowInstanceReady = (
|
||||
blockId: string,
|
||||
childWorkflowInstanceId: string,
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
executionOrder?: number
|
||||
) => {
|
||||
sendEvent({
|
||||
type: 'block:childWorkflowStarted',
|
||||
@@ -1001,6 +1002,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
iterationCurrent: iterationContext.iterationCurrent,
|
||||
iterationContainerId: iterationContext.iterationContainerId,
|
||||
}),
|
||||
...(executionOrder !== undefined && { executionOrder }),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
@@ -160,12 +160,16 @@ const IterationNodeRow = memo(function IterationNodeRow({
|
||||
onSelectEntry,
|
||||
isExpanded,
|
||||
onToggle,
|
||||
expandedNodes,
|
||||
onToggleNode,
|
||||
}: {
|
||||
node: EntryNode
|
||||
selectedEntryId: string | null
|
||||
onSelectEntry: (entry: ConsoleEntry) => void
|
||||
isExpanded: boolean
|
||||
onToggle: () => void
|
||||
expandedNodes: Set<string>
|
||||
onToggleNode: (nodeId: string) => void
|
||||
}) {
|
||||
const { entry, children, iterationInfo } = node
|
||||
const hasError = Boolean(entry.error) || children.some((c) => c.entry.error)
|
||||
@@ -226,11 +230,13 @@ const IterationNodeRow = memo(function IterationNodeRow({
|
||||
{isExpanded && hasChildren && (
|
||||
<div className={ROW_STYLES.nested}>
|
||||
{children.map((child) => (
|
||||
<BlockRow
|
||||
<EntryNodeRow
|
||||
key={child.entry.id}
|
||||
entry={child.entry}
|
||||
isSelected={selectedEntryId === child.entry.id}
|
||||
onSelect={onSelectEntry}
|
||||
node={child}
|
||||
selectedEntryId={selectedEntryId}
|
||||
onSelectEntry={onSelectEntry}
|
||||
expandedNodes={expandedNodes}
|
||||
onToggleNode={onToggleNode}
|
||||
/>
|
||||
))}
|
||||
</div>
|
||||
@@ -346,6 +352,8 @@ const SubflowNodeRow = memo(function SubflowNodeRow({
|
||||
onSelectEntry={onSelectEntry}
|
||||
isExpanded={expandedNodes.has(iterNode.entry.id)}
|
||||
onToggle={() => onToggleNode(iterNode.entry.id)}
|
||||
expandedNodes={expandedNodes}
|
||||
onToggleNode={onToggleNode}
|
||||
/>
|
||||
))}
|
||||
</div>
|
||||
@@ -520,6 +528,8 @@ const EntryNodeRow = memo(function EntryNodeRow({
|
||||
onSelectEntry={onSelectEntry}
|
||||
isExpanded={expandedNodes.has(node.entry.id)}
|
||||
onToggle={() => onToggleNode(node.entry.id)}
|
||||
expandedNodes={expandedNodes}
|
||||
onToggleNode={onToggleNode}
|
||||
/>
|
||||
)
|
||||
}
|
||||
|
||||
@@ -554,6 +554,7 @@ export function useWorkflowExecution() {
|
||||
childWorkflowInstanceId: string
|
||||
iterationCurrent?: number
|
||||
iterationContainerId?: string
|
||||
executionOrder?: number
|
||||
}) => {
|
||||
if (isStaleExecution()) return
|
||||
updateConsole(
|
||||
@@ -564,6 +565,7 @@ export function useWorkflowExecution() {
|
||||
...(data.iterationContainerId !== undefined && {
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
}),
|
||||
...(data.executionOrder !== undefined && { executionOrder: data.executionOrder }),
|
||||
},
|
||||
executionIdRef.current
|
||||
)
|
||||
|
||||
@@ -80,7 +80,10 @@ export class BlockExecutor {
|
||||
const startTime = performance.now()
|
||||
let resolvedInputs: Record<string, any> = {}
|
||||
|
||||
const nodeMetadata = this.buildNodeMetadata(node)
|
||||
const nodeMetadata = {
|
||||
...this.buildNodeMetadata(node),
|
||||
executionOrder: blockLog?.executionOrder,
|
||||
}
|
||||
let cleanupSelfReference: (() => void) | undefined
|
||||
|
||||
if (block.metadata?.id === BlockType.HUMAN_IN_THE_LOOP) {
|
||||
|
||||
@@ -89,7 +89,8 @@ export interface ExecutionCallbacks {
|
||||
onChildWorkflowInstanceReady?: (
|
||||
blockId: string,
|
||||
childWorkflowInstanceId: string,
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
executionOrder?: number
|
||||
) => void
|
||||
}
|
||||
|
||||
@@ -155,7 +156,8 @@ export interface ContextExtensions {
|
||||
onChildWorkflowInstanceReady?: (
|
||||
blockId: string,
|
||||
childWorkflowInstanceId: string,
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
executionOrder?: number
|
||||
) => void
|
||||
|
||||
/**
|
||||
|
||||
@@ -62,6 +62,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
branchTotal?: number
|
||||
originalBlockId?: string
|
||||
isLoopNode?: boolean
|
||||
executionOrder?: number
|
||||
}
|
||||
): Promise<BlockOutput | StreamingExecution> {
|
||||
return this._executeCore(ctx, block, inputs, nodeMetadata)
|
||||
@@ -79,6 +80,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
branchTotal?: number
|
||||
originalBlockId?: string
|
||||
isLoopNode?: boolean
|
||||
executionOrder?: number
|
||||
}
|
||||
): Promise<BlockOutput | StreamingExecution> {
|
||||
logger.info(`Executing workflow block: ${block.id}`)
|
||||
@@ -169,7 +171,12 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
const iterationContext = nodeMetadata
|
||||
? this.getIterationContext(ctx, nodeMetadata)
|
||||
: undefined
|
||||
ctx.onChildWorkflowInstanceReady?.(effectiveBlockId, instanceId, iterationContext)
|
||||
ctx.onChildWorkflowInstanceReady?.(
|
||||
effectiveBlockId,
|
||||
instanceId,
|
||||
iterationContext,
|
||||
nodeMetadata?.executionOrder
|
||||
)
|
||||
}
|
||||
|
||||
const subExecutor = new Executor({
|
||||
|
||||
@@ -264,7 +264,8 @@ export interface ExecutionContext {
|
||||
onChildWorkflowInstanceReady?: (
|
||||
blockId: string,
|
||||
childWorkflowInstanceId: string,
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
executionOrder?: number
|
||||
) => void
|
||||
|
||||
/**
|
||||
@@ -377,6 +378,7 @@ export interface BlockHandler {
|
||||
branchTotal?: number
|
||||
originalBlockId?: string
|
||||
isLoopNode?: boolean
|
||||
executionOrder?: number
|
||||
}
|
||||
) => Promise<BlockOutput | StreamingExecution>
|
||||
}
|
||||
|
||||
@@ -155,6 +155,7 @@ export interface BlockChildWorkflowStartedEvent extends BaseExecutionEvent {
|
||||
childWorkflowInstanceId: string
|
||||
iterationCurrent?: number
|
||||
iterationContainerId?: string
|
||||
executionOrder?: number
|
||||
}
|
||||
}
|
||||
|
||||
@@ -396,7 +397,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
const onChildWorkflowInstanceReady = (
|
||||
blockId: string,
|
||||
childWorkflowInstanceId: string,
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
executionOrder?: number
|
||||
) => {
|
||||
sendEvent({
|
||||
type: 'block:childWorkflowStarted',
|
||||
@@ -410,6 +412,7 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
iterationCurrent: iterationContext.iterationCurrent,
|
||||
iterationContainerId: iterationContext.iterationContainerId,
|
||||
}),
|
||||
...(executionOrder !== undefined && { executionOrder }),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
@@ -91,6 +91,13 @@ const matchesEntryForUpdate = (
|
||||
return false
|
||||
}
|
||||
|
||||
if (
|
||||
update.childWorkflowBlockId !== undefined &&
|
||||
entry.childWorkflowBlockId !== update.childWorkflowBlockId
|
||||
) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user