Compare commits

...

4 Commits

Author SHA1 Message Date
Waleed
af592349d3 v0.5.99: local dev improvements, live workflow logs in terminal 2026-02-23 00:24:49 -08:00
Waleed
69ec70af13 feat(terminal): expandable child workflow blocks in console (#3306)
* feat(terminal): expandable child workflow blocks in console

* fix(terminal): cycle guard in collectWorkflowDescendants, workflow node running/canceled state

* fix(terminal): expand workflow blocks nested inside loop/parallel iterations

* fix(terminal): prevent child block mixing across loop iterations for workflow blocks

* ack PR comments, remove extraneous logs

* feat(terminal): real-time child workflow block propagation in console

* fix(terminal): align parallel guard in WorkflowBlockHandler.getIterationContext with BlockExecutor

* fix(terminal): fire onChildWorkflowInstanceReady regardless of nodeMetadata presence

* fix(terminal): use shared isWorkflowBlockType from executor/constants
2026-02-23 00:17:44 -08:00
Waleed
687c12528b fix(parallel): correct active state pulsing and duration display for parallel subflow blocks (#3305)
* fix(executor): resolve block ID for parallel subflow active state

* fix timing for parallel block

* refactor(parallel): extract shared updateActiveBlockRefCount helper

* fix(parallel): error-sticky block run status to prevent branch success masking failure

* Revert "fix(parallel): error-sticky block run status to prevent branch success masking failure"

This reverts commit 9c087cd466.
2026-02-22 15:03:33 -08:00
Waleed
996dc96d6e fix(security): allow HTTP for localhost and loopback addresses (#3304)
* fix(security): allow localhost HTTP without weakening SSRF protections

* fix(security): remove extraneous comments and fix failing SSRF test

* fix(security): derive isLocalhost from hostname not resolved IP in validateUrlWithDNS

* fix(security): verify resolved IP is loopback when hostname is localhost in validateUrlWithDNS

---------

Co-authored-by: aayush598 <aayushgid598@gmail.com>
2026-02-22 14:58:11 -08:00
22 changed files with 862 additions and 131 deletions

View File

@@ -211,7 +211,7 @@ describe('Function Execute API Route', () => {
it.concurrent('should block SSRF attacks through secure fetch wrapper', async () => {
expect(validateProxyUrl('http://169.254.169.254/latest/meta-data/').isValid).toBe(false)
- expect(validateProxyUrl('http://127.0.0.1:8080/admin').isValid).toBe(false)
+ expect(validateProxyUrl('http://127.0.0.1:8080/admin').isValid).toBe(true)
expect(validateProxyUrl('http://192.168.1.1/config').isValid).toBe(false)
expect(validateProxyUrl('http://10.0.0.1/internal').isValid).toBe(false)
})

View File

@@ -38,6 +38,7 @@ import { executeWorkflowJob, type WorkflowExecutionPayload } from '@/background/
import { normalizeName } from '@/executor/constants'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type {
ChildWorkflowContext,
ExecutionMetadata,
IterationContext,
SerializableExecutionState,
@@ -742,7 +743,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
blockName: string,
blockType: string,
executionOrder: number,
- iterationContext?: IterationContext
+ iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => {
logger.info(`[${requestId}] 🔷 onBlockStart called:`, { blockId, blockName, blockType })
sendEvent({
@@ -761,6 +763,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}),
...(childWorkflowContext && {
childWorkflowBlockId: childWorkflowContext.parentBlockId,
childWorkflowName: childWorkflowContext.workflowName,
}),
},
})
}
@@ -770,9 +776,20 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
blockName: string,
blockType: string,
callbackData: any,
- iterationContext?: IterationContext
+ iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => {
const hasError = callbackData.output?.error
const childWorkflowData = childWorkflowContext
? {
childWorkflowBlockId: childWorkflowContext.parentBlockId,
childWorkflowName: childWorkflowContext.workflowName,
}
: {}
const instanceData = callbackData.childWorkflowInstanceId
? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId }
: {}
if (hasError) {
logger.info(`[${requestId}] ✗ onBlockComplete (error) called:`, {
@@ -802,6 +819,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}),
...childWorkflowData,
...instanceData,
},
})
} else {
@@ -831,6 +850,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}),
...childWorkflowData,
...instanceData,
},
})
}
@@ -898,12 +919,34 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
selectedOutputs
)
const onChildWorkflowInstanceReady = (
blockId: string,
childWorkflowInstanceId: string,
iterationContext?: IterationContext
) => {
sendEvent({
type: 'block:childWorkflowStarted',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
childWorkflowInstanceId,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationContainerId: iterationContext.iterationContainerId,
}),
},
})
}
const result = await executeWorkflowCore({
snapshot,
callbacks: {
onBlockStart,
onBlockComplete,
onStream,
onChildWorkflowInstanceReady,
},
loggingSession,
abortSignal: timeoutController.signal,

View File

@@ -41,6 +41,7 @@ import {
} from '@/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/hooks'
import { ROW_STYLES } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/types'
import {
collectExpandableNodeIds,
type EntryNode,
type ExecutionGroup,
flattenBlockEntriesOnly,
@@ -67,6 +68,21 @@ const MIN_HEIGHT = TERMINAL_HEIGHT.MIN
const DEFAULT_EXPANDED_HEIGHT = TERMINAL_HEIGHT.DEFAULT
const MIN_OUTPUT_PANEL_WIDTH_PX = OUTPUT_PANEL_WIDTH.MIN
/** Returns true if any node in the subtree has an error */
function hasErrorInTree(nodes: EntryNode[]): boolean {
  for (const node of nodes) {
    if (node.entry.error) return true
    if (hasErrorInTree(node.children)) return true
  }
  return false
}
/** Returns true if any node in the subtree is currently running */
function hasRunningInTree(nodes: EntryNode[]): boolean {
  for (const node of nodes) {
    if (node.entry.isRunning) return true
    if (hasRunningInTree(node.children)) return true
  }
  return false
}
/** Returns true if any node in the subtree was canceled */
function hasCanceledInTree(nodes: EntryNode[]): boolean {
  for (const node of nodes) {
    if (node.entry.isCanceled) return true
    if (hasCanceledInTree(node.children)) return true
  }
  return false
}
/**
* Block row component for displaying actual block entries
*/
@@ -338,6 +354,122 @@ const SubflowNodeRow = memo(function SubflowNodeRow({
)
})
/**
 * Workflow node component — renders a child-workflow block as a collapsible row:
 * a header (icon, name, chevron, status) plus, when expanded, its nested child
 * block entries rendered through EntryNodeRow so loop/parallel nodes inside the
 * child workflow keep their full tree behavior.
 */
const WorkflowNodeRow = memo(function WorkflowNodeRow({
  node,
  selectedEntryId,
  onSelectEntry,
  expandedNodes,
  onToggleNode,
}: {
  node: EntryNode
  selectedEntryId: string | null
  onSelectEntry: (entry: ConsoleEntry) => void
  expandedNodes: Set<string>
  onToggleNode: (nodeId: string) => void
}) {
  const { entry, children } = node
  const BlockIcon = getBlockIcon(entry.blockType)
  const bgColor = getBlockColor(entry.blockType)
  // Expansion state is owned by the parent and shared via `expandedNodes`, keyed by entry id.
  const nodeId = entry.id
  const isExpanded = expandedNodes.has(nodeId)
  const hasChildren = children.length > 0
  const isSelected = selectedEntryId === entry.id
  // Status rolls up from this entry and every descendant in the subtree.
  const hasError = useMemo(
    () => Boolean(entry.error) || hasErrorInTree(children),
    [entry.error, children]
  )
  const hasRunningDescendant = useMemo(
    () => Boolean(entry.isRunning) || hasRunningInTree(children),
    [entry.isRunning, children]
  )
  // Canceled display is suppressed while anything in the subtree is still running.
  const hasCanceledDescendant = useMemo(
    () => (Boolean(entry.isCanceled) || hasCanceledInTree(children)) && !hasRunningDescendant,
    [entry.isCanceled, children, hasRunningDescendant]
  )
  return (
    <div className='flex min-w-0 flex-col'>
      {/* Workflow Block Header — clicking selects the entry and toggles expansion */}
      <div
        className={clsx(
          ROW_STYLES.base,
          'h-[26px]',
          isSelected ? ROW_STYLES.selected : ROW_STYLES.hover
        )}
        onClick={(e) => {
          e.stopPropagation()
          if (!isSelected) onSelectEntry(entry)
          if (hasChildren) onToggleNode(nodeId)
        }}
      >
        <div className='flex min-w-0 flex-1 items-center gap-[8px]'>
          <div
            className='flex h-[14px] w-[14px] flex-shrink-0 items-center justify-center rounded-[4px]'
            style={{ background: bgColor }}
          >
            {BlockIcon && <BlockIcon className='h-[9px] w-[9px] text-white' />}
          </div>
          <span
            className={clsx(
              'min-w-0 truncate font-medium text-[13px]',
              hasError
                ? 'text-[var(--text-error)]'
                : isSelected || isExpanded
                  ? 'text-[var(--text-primary)]'
                  : 'text-[var(--text-tertiary)] group-hover:text-[var(--text-primary)]'
            )}
          >
            {entry.blockName}
          </span>
          {hasChildren && (
            <ChevronDown
              className={clsx(
                'h-[8px] w-[8px] flex-shrink-0 text-[var(--text-tertiary)] transition-transform duration-100 group-hover:text-[var(--text-primary)]',
                !isExpanded && '-rotate-90'
              )}
            />
          )}
        </div>
        <span
          className={clsx(
            'flex-shrink-0 font-medium text-[13px]',
            !hasRunningDescendant &&
              (hasCanceledDescendant
                ? 'text-[var(--text-secondary)]'
                : 'text-[var(--text-tertiary)]')
          )}
        >
          <StatusDisplay
            isRunning={hasRunningDescendant}
            isCanceled={hasCanceledDescendant}
            formattedDuration={formatDuration(entry.durationMs, { precision: 2 }) ?? '-'}
          />
        </span>
      </div>
      {/* Nested Child Blocks — rendered through EntryNodeRow for full loop/parallel support */}
      {isExpanded && hasChildren && (
        <div className={ROW_STYLES.nested}>
          {children.map((child) => (
            <EntryNodeRow
              key={child.entry.id}
              node={child}
              selectedEntryId={selectedEntryId}
              onSelectEntry={onSelectEntry}
              expandedNodes={expandedNodes}
              onToggleNode={onToggleNode}
            />
          ))}
        </div>
      )}
    </div>
  )
})
/**
* Entry node component - dispatches to appropriate component based on node type
*/
@@ -368,6 +500,18 @@ const EntryNodeRow = memo(function EntryNodeRow({
)
}
if (nodeType === 'workflow') {
return (
<WorkflowNodeRow
node={node}
selectedEntryId={selectedEntryId}
onSelectEntry={onSelectEntry}
expandedNodes={expandedNodes}
onToggleNode={onToggleNode}
/>
)
}
if (nodeType === 'iteration') {
return (
<IterationNodeRow
@@ -659,27 +803,15 @@ export const Terminal = memo(function Terminal() {
])
/**
- * Auto-expand subflows and iterations when new entries arrive.
+ * Auto-expand subflows, iterations, and workflow nodes when new entries arrive.
* Recursively walks the full tree so nested nodes (e.g. a workflow block inside
* a loop iteration) are also expanded automatically.
* This always runs regardless of autoSelectEnabled - new runs should always be visible.
*/
useEffect(() => {
if (executionGroups.length === 0) return
const newestExec = executionGroups[0]
// Collect all node IDs that should be expanded (subflows and their iterations)
const nodeIdsToExpand: string[] = []
for (const node of newestExec.entryTree) {
if (node.nodeType === 'subflow' && node.children.length > 0) {
nodeIdsToExpand.push(node.entry.id)
// Also expand all iteration children
for (const iterNode of node.children) {
if (iterNode.nodeType === 'iteration') {
nodeIdsToExpand.push(iterNode.entry.id)
}
}
}
}
const nodeIdsToExpand = collectExpandableNodeIds(executionGroups[0].entryTree)
if (nodeIdsToExpand.length > 0) {
setExpandedNodes((prev) => {

View File

@@ -1,6 +1,14 @@
import type React from 'react'
import { AlertTriangleIcon, BanIcon, RepeatIcon, SplitIcon, XCircleIcon } from 'lucide-react'
import {
AlertTriangleIcon,
BanIcon,
NetworkIcon,
RepeatIcon,
SplitIcon,
XCircleIcon,
} from 'lucide-react'
import { getBlock } from '@/blocks'
import { isWorkflowBlockType } from '@/executor/constants'
import { TERMINAL_BLOCK_COLUMN_WIDTH } from '@/stores/constants'
import type { ConsoleEntry } from '@/stores/terminal'
@@ -12,6 +20,8 @@ const SUBFLOW_COLORS = {
parallel: '#FEE12B',
} as const
const WORKFLOW_COLOR = '#8b5cf6'
/**
* Special block type colors for errors and system messages
*/
@@ -41,6 +51,10 @@ export function getBlockIcon(
return SplitIcon
}
if (blockType === 'workflow') {
return NetworkIcon
}
if (blockType === 'error') {
return XCircleIcon
}
@@ -71,6 +85,9 @@ export function getBlockColor(blockType: string): string {
if (blockType === 'parallel') {
return SUBFLOW_COLORS.parallel
}
if (blockType === 'workflow') {
return WORKFLOW_COLOR
}
// Special block types for errors and system messages
if (blockType === 'error') {
return SPECIAL_BLOCK_COLORS.error
@@ -120,7 +137,7 @@ export function isSubflowBlockType(blockType: string): boolean {
/**
* Node type for the tree structure
*/
- export type EntryNodeType = 'block' | 'subflow' | 'iteration'
+ export type EntryNodeType = 'block' | 'subflow' | 'iteration' | 'workflow'
/**
* Entry node for tree structure - represents a block, subflow, or iteration
@@ -168,6 +185,36 @@ interface IterationGroup {
startTimeMs: number
}
/**
 * Recursively collects all descendant entries owned by a workflow block.
 * This includes direct children and the children of any nested workflow blocks,
 * enabling correct tree construction for deeply-nested child workflows.
 */
function collectWorkflowDescendants(
  instanceKey: string,
  workflowChildGroups: Map<string, ConsoleEntry[]>,
  visited: Set<string> = new Set()
): ConsoleEntry[] {
  // Cycle guard: never re-enter a workflow instance we have already walked.
  if (visited.has(instanceKey)) return []
  visited.add(instanceKey)
  const directChildren = workflowChildGroups.get(instanceKey) ?? []
  // Recurse into nested workflow blocks. Use childWorkflowInstanceId when available
  // (unique per-invocation) to correctly separate children across loop iterations
  // of the same workflow block.
  const nestedDescendants = directChildren
    .filter((child) => isWorkflowBlockType(child.blockType))
    .flatMap((child) =>
      collectWorkflowDescendants(
        child.childWorkflowInstanceId ?? child.blockId,
        workflowChildGroups,
        visited
      )
    )
  return [...directChildren, ...nestedDescendants]
}
/**
* Builds a tree structure from flat entries.
* Groups iteration entries by (iterationType, iterationContainerId, iterationCurrent), showing all blocks
@@ -175,18 +222,37 @@ interface IterationGroup {
* Sorts by start time to ensure chronological order.
*/
function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
// Separate regular blocks from iteration entries
// Separate entries into three buckets:
// 1. Iteration entries (loop/parallel children)
// 2. Workflow child entries (blocks inside a child workflow)
// 3. Regular blocks
const regularBlocks: ConsoleEntry[] = []
const iterationEntries: ConsoleEntry[] = []
const workflowChildEntries: ConsoleEntry[] = []
for (const entry of entries) {
if (entry.iterationType && entry.iterationCurrent !== undefined) {
if (entry.childWorkflowBlockId) {
// Child workflow entries take priority over iteration classification
workflowChildEntries.push(entry)
} else if (entry.iterationType && entry.iterationCurrent !== undefined) {
iterationEntries.push(entry)
} else {
regularBlocks.push(entry)
}
}
// Group workflow child entries by the parent workflow block ID
const workflowChildGroups = new Map<string, ConsoleEntry[]>()
for (const entry of workflowChildEntries) {
const parentId = entry.childWorkflowBlockId!
const group = workflowChildGroups.get(parentId)
if (group) {
group.push(entry)
} else {
workflowChildGroups.set(parentId, [entry])
}
}
// Group iteration entries by (iterationType, iterationContainerId, iterationCurrent)
const iterationGroupsMap = new Map<string, IterationGroup>()
for (const entry of iterationEntries) {
@@ -261,6 +327,9 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
...allBlocks.map((b) => new Date(b.endedAt || b.timestamp).getTime())
)
const totalDuration = allBlocks.reduce((sum, b) => sum + (b.durationMs || 0), 0)
// Parallel branches run concurrently — use wall-clock time. Loop iterations run serially — use sum.
const subflowDuration =
iterationType === 'parallel' ? subflowEndMs - subflowStartMs : totalDuration
// Create synthetic subflow parent entry
// Use the minimum executionOrder from all child blocks for proper ordering
@@ -276,7 +345,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
startedAt: new Date(subflowStartMs).toISOString(),
executionOrder: subflowExecutionOrder,
endedAt: new Date(subflowEndMs).toISOString(),
- durationMs: totalDuration,
+ durationMs: subflowDuration,
success: !allBlocks.some((b) => b.error),
}
@@ -291,6 +360,9 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
...iterBlocks.map((b) => new Date(b.endedAt || b.timestamp).getTime())
)
const iterDuration = iterBlocks.reduce((sum, b) => sum + (b.durationMs || 0), 0)
// Parallel branches run concurrently — use wall-clock time. Loop iterations run serially — use sum.
const iterDisplayDuration =
iterationType === 'parallel' ? iterEndMs - iterStartMs : iterDuration
// Use the minimum executionOrder from blocks in this iteration
const iterExecutionOrder = Math.min(...iterBlocks.map((b) => b.executionOrder))
@@ -305,7 +377,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
startedAt: new Date(iterStartMs).toISOString(),
executionOrder: iterExecutionOrder,
endedAt: new Date(iterEndMs).toISOString(),
- durationMs: iterDuration,
+ durationMs: iterDisplayDuration,
success: !iterBlocks.some((b) => b.error),
iterationCurrent: iterGroup.iterationCurrent,
iterationTotal: iterGroup.iterationTotal,
@@ -313,12 +385,24 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
iterationContainerId: iterGroup.iterationContainerId,
}
// Block nodes within this iteration
const blockNodes: EntryNode[] = iterBlocks.map((block) => ({
entry: block,
children: [],
nodeType: 'block' as const,
}))
// Block nodes within this iteration — workflow blocks get their full subtree
const blockNodes: EntryNode[] = iterBlocks.map((block) => {
if (isWorkflowBlockType(block.blockType)) {
const instanceKey = block.childWorkflowInstanceId ?? block.blockId
const allDescendants = collectWorkflowDescendants(instanceKey, workflowChildGroups)
const rawChildren = allDescendants.map((c) => ({
...c,
childWorkflowBlockId:
c.childWorkflowBlockId === instanceKey ? undefined : c.childWorkflowBlockId,
}))
return {
entry: block,
children: buildEntryTree(rawChildren),
nodeType: 'workflow' as const,
}
}
return { entry: block, children: [], nodeType: 'block' as const }
})
return {
entry: syntheticIteration,
@@ -338,19 +422,61 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
})
}
// Build nodes for regular blocks
const regularNodes: EntryNode[] = regularBlocks.map((entry) => ({
// Build workflow nodes for regular blocks that are workflow block types
const workflowNodes: EntryNode[] = []
const remainingRegularBlocks: ConsoleEntry[] = []
for (const block of regularBlocks) {
if (isWorkflowBlockType(block.blockType)) {
const instanceKey = block.childWorkflowInstanceId ?? block.blockId
const allDescendants = collectWorkflowDescendants(instanceKey, workflowChildGroups)
const rawChildren = allDescendants.map((c) => ({
...c,
childWorkflowBlockId:
c.childWorkflowBlockId === instanceKey ? undefined : c.childWorkflowBlockId,
}))
const children = buildEntryTree(rawChildren)
workflowNodes.push({ entry: block, children, nodeType: 'workflow' as const })
} else {
remainingRegularBlocks.push(block)
}
}
// Build nodes for remaining regular blocks
const regularNodes: EntryNode[] = remainingRegularBlocks.map((entry) => ({
entry,
children: [],
nodeType: 'block' as const,
}))
// Combine all nodes and sort by executionOrder ascending (oldest first, top-down)
- const allNodes = [...subflowNodes, ...regularNodes]
+ const allNodes = [...subflowNodes, ...workflowNodes, ...regularNodes]
allNodes.sort((a, b) => a.entry.executionOrder - b.entry.executionOrder)
return allNodes
}
/**
 * Recursively collects IDs of all nodes that should be auto-expanded.
 * Includes subflow, iteration, and workflow nodes that have children.
 */
export function collectExpandableNodeIds(nodes: EntryNode[]): string[] {
  return nodes.flatMap((node) => {
    // Leaf nodes contribute nothing and have no subtree to walk.
    if (node.children.length === 0) return []
    const isExpandableType =
      node.nodeType === 'subflow' || node.nodeType === 'iteration' || node.nodeType === 'workflow'
    const descendantIds = collectExpandableNodeIds(node.children)
    return isExpandableType ? [node.entry.id, ...descendantIds] : descendantIds
  })
}
/**
* Groups console entries by execution ID and builds a tree structure.
* Pre-computes timestamps for efficient sorting.
@@ -458,7 +584,7 @@ export function flattenBlockEntriesOnly(
): NavigableBlockEntry[] {
const result: NavigableBlockEntry[] = []
for (const node of nodes) {
- if (node.nodeType === 'block') {
+ if (node.nodeType === 'block' || node.nodeType === 'workflow') {
result.push({
entry: node.entry,
executionId,

View File

@@ -20,6 +20,7 @@ import {
TriggerUtils,
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import { updateActiveBlockRefCount } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
import { getBlock } from '@/blocks'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type {
@@ -63,6 +64,7 @@ interface BlockEventHandlerConfig {
executionIdRef: { current: string }
workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }>
activeBlocksSet: Set<string>
activeBlockRefCounts: Map<string, number>
accumulatedBlockLogs: BlockLog[]
accumulatedBlockStates: Map<string, BlockState>
executedBlockIds: Set<string>
@@ -309,6 +311,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
@@ -327,11 +330,7 @@ export function useWorkflowExecution() {
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
if (!workflowId) return
if (isActive) {
activeBlocksSet.add(blockId)
} else {
activeBlocksSet.delete(blockId)
}
updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive)
setActiveBlocks(workflowId, new Set(activeBlocksSet))
}
@@ -384,6 +383,9 @@ export function useWorkflowExecution() {
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
childWorkflowBlockId: data.childWorkflowBlockId,
childWorkflowName: data.childWorkflowName,
childWorkflowInstanceId: data.childWorkflowInstanceId,
})
}
@@ -407,6 +409,9 @@ export function useWorkflowExecution() {
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
childWorkflowBlockId: data.childWorkflowBlockId,
childWorkflowName: data.childWorkflowName,
childWorkflowInstanceId: data.childWorkflowInstanceId,
})
}
@@ -426,6 +431,9 @@ export function useWorkflowExecution() {
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
childWorkflowBlockId: data.childWorkflowBlockId,
childWorkflowName: data.childWorkflowName,
childWorkflowInstanceId: data.childWorkflowInstanceId,
},
executionIdRef.current
)
@@ -448,6 +456,9 @@ export function useWorkflowExecution() {
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
childWorkflowBlockId: data.childWorkflowBlockId,
childWorkflowName: data.childWorkflowName,
childWorkflowInstanceId: data.childWorkflowInstanceId,
},
executionIdRef.current
)
@@ -479,6 +490,8 @@ export function useWorkflowExecution() {
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
childWorkflowBlockId: data.childWorkflowBlockId,
childWorkflowName: data.childWorkflowName,
})
}
@@ -536,7 +549,27 @@ export function useWorkflowExecution() {
}
}
return { onBlockStarted, onBlockCompleted, onBlockError }
const onBlockChildWorkflowStarted = (data: {
blockId: string
childWorkflowInstanceId: string
iterationCurrent?: number
iterationContainerId?: string
}) => {
if (isStaleExecution()) return
updateConsole(
data.blockId,
{
childWorkflowInstanceId: data.childWorkflowInstanceId,
...(data.iterationCurrent !== undefined && { iterationCurrent: data.iterationCurrent }),
...(data.iterationContainerId !== undefined && {
iterationContainerId: data.iterationContainerId,
}),
},
executionIdRef.current
)
}
return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted }
},
[addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole]
)
@@ -1280,6 +1313,7 @@ export function useWorkflowExecution() {
}
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
const streamedContent = new Map<string, string>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
@@ -1292,6 +1326,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
@@ -1334,6 +1369,7 @@ export function useWorkflowExecution() {
onBlockStarted: blockHandlers.onBlockStarted,
onBlockCompleted: blockHandlers.onBlockCompleted,
onBlockError: blockHandlers.onBlockError,
onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted,
onStreamChunk: (data) => {
const existing = streamedContent.get(data.blockId) || ''
@@ -1902,6 +1938,7 @@ export function useWorkflowExecution() {
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
try {
const blockHandlers = buildBlockEventHandlers({
@@ -1909,6 +1946,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
@@ -1929,6 +1967,7 @@ export function useWorkflowExecution() {
onBlockStarted: blockHandlers.onBlockStarted,
onBlockCompleted: blockHandlers.onBlockCompleted,
onBlockError: blockHandlers.onBlockError,
onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted,
onExecutionCompleted: (data) => {
if (data.success) {
@@ -2104,6 +2143,7 @@ export function useWorkflowExecution() {
const workflowEdges = useWorkflowStore.getState().edges
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
@@ -2115,6 +2155,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
@@ -2155,6 +2196,10 @@ export function useWorkflowExecution() {
clearOnce()
handlers.onBlockError(data)
},
onBlockChildWorkflowStarted: (data) => {
clearOnce()
handlers.onBlockChildWorkflowStarted(data)
},
onExecutionCompleted: () => {
const currentId = useExecutionStore
.getState()

View File

@@ -5,6 +5,30 @@ import { useTerminalConsoleStore } from '@/stores/terminal'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
/**
 * Updates the active blocks set and ref counts for a single block.
 * Ref counting ensures a block stays active until all parallel branches for it complete.
 */
export function updateActiveBlockRefCount(
  refCounts: Map<string, number>,
  activeSet: Set<string>,
  blockId: string,
  isActive: boolean
): void {
  if (isActive) {
    // Activation: bump the ref count and make sure the block shows as active.
    const current = refCounts.get(blockId) ?? 0
    refCounts.set(blockId, current + 1)
    activeSet.add(blockId)
    return
  }
  // Deactivation: an untracked block defaults to a single reference so it is cleared.
  const remaining = (refCounts.get(blockId) ?? 1) - 1
  if (remaining > 0) {
    refCounts.set(blockId, remaining)
  } else {
    refCounts.delete(blockId)
    activeSet.delete(blockId)
  }
}
export interface WorkflowExecutionOptions {
workflowInput?: any
onStream?: (se: StreamingExecution) => Promise<void>
@@ -39,6 +63,7 @@ export async function executeWorkflowWithFullLogging(
const workflowEdges = useWorkflowStore.getState().edges
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
const payload: any = {
input: options.workflowInput,
@@ -103,7 +128,12 @@ export async function executeWorkflowWithFullLogging(
switch (event.type) {
case 'block:started': {
activeBlocksSet.add(event.data.blockId)
updateActiveBlockRefCount(
activeBlockRefCounts,
activeBlocksSet,
event.data.blockId,
true
)
setActiveBlocks(wfId, new Set(activeBlocksSet))
const incomingEdges = workflowEdges.filter(
@@ -115,8 +145,13 @@ export async function executeWorkflowWithFullLogging(
break
}
- case 'block:completed':
- activeBlocksSet.delete(event.data.blockId)
+ case 'block:completed': {
updateActiveBlockRefCount(
activeBlockRefCounts,
activeBlocksSet,
event.data.blockId,
false
)
setActiveBlocks(wfId, new Set(activeBlocksSet))
setBlockRunStatus(wfId, event.data.blockId, 'success')
@@ -138,15 +173,24 @@ export async function executeWorkflowWithFullLogging(
iterationTotal: event.data.iterationTotal,
iterationType: event.data.iterationType,
iterationContainerId: event.data.iterationContainerId,
childWorkflowBlockId: event.data.childWorkflowBlockId,
childWorkflowName: event.data.childWorkflowName,
childWorkflowInstanceId: event.data.childWorkflowInstanceId,
})
if (options.onBlockComplete) {
options.onBlockComplete(event.data.blockId, event.data.output).catch(() => {})
}
break
}
- case 'block:error':
- activeBlocksSet.delete(event.data.blockId)
+ case 'block:error': {
updateActiveBlockRefCount(
activeBlockRefCounts,
activeBlocksSet,
event.data.blockId,
false
)
setActiveBlocks(wfId, new Set(activeBlocksSet))
setBlockRunStatus(wfId, event.data.blockId, 'error')
@@ -169,8 +213,30 @@ export async function executeWorkflowWithFullLogging(
iterationTotal: event.data.iterationTotal,
iterationType: event.data.iterationType,
iterationContainerId: event.data.iterationContainerId,
childWorkflowBlockId: event.data.childWorkflowBlockId,
childWorkflowName: event.data.childWorkflowName,
childWorkflowInstanceId: event.data.childWorkflowInstanceId,
})
break
}
case 'block:childWorkflowStarted': {
const { updateConsole } = useTerminalConsoleStore.getState()
updateConsole(
event.data.blockId,
{
childWorkflowInstanceId: event.data.childWorkflowInstanceId,
...(event.data.iterationCurrent !== undefined && {
iterationCurrent: event.data.iterationCurrent,
}),
...(event.data.iterationContainerId !== undefined && {
iterationContainerId: event.data.iterationContainerId,
}),
},
executionId
)
break
}
case 'execution:completed':
executionResult = {

View File

@@ -159,6 +159,7 @@ export const DEFAULTS = {
MAX_FOREACH_ITEMS: 1000,
MAX_PARALLEL_BRANCHES: 20,
MAX_WORKFLOW_DEPTH: 10,
MAX_SSE_CHILD_DEPTH: 3,
EXECUTION_TIME: 0,
TOKENS: {
PROMPT: 0,

View File

@@ -7,6 +7,7 @@ interface ChildWorkflowErrorOptions {
childTraceSpans?: TraceSpan[]
executionResult?: ExecutionResult
childWorkflowSnapshotId?: string
childWorkflowInstanceId?: string
cause?: Error
}
@@ -18,6 +19,8 @@ export class ChildWorkflowError extends Error {
readonly childWorkflowName: string
readonly executionResult?: ExecutionResult
readonly childWorkflowSnapshotId?: string
/** Per-invocation unique ID used to correlate child block events with this workflow block. */
readonly childWorkflowInstanceId?: string
constructor(options: ChildWorkflowErrorOptions) {
super(options.message, { cause: options.cause })
@@ -26,6 +29,7 @@ export class ChildWorkflowError extends Error {
this.childTraceSpans = options.childTraceSpans ?? []
this.executionResult = options.executionResult
this.childWorkflowSnapshotId = options.childWorkflowSnapshotId
this.childWorkflowInstanceId = options.childWorkflowInstanceId
}
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {

View File

@@ -166,6 +166,9 @@ export class BlockExecutor {
this.state.setBlockOutput(node.id, normalizedOutput, duration)
if (!isSentinel && blockLog) {
const childWorkflowInstanceId = normalizedOutput._childWorkflowInstanceId as
| string
| undefined
const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, {
block,
})
@@ -178,7 +181,8 @@ export class BlockExecutor {
duration,
blockLog.startedAt,
blockLog.executionOrder,
blockLog.endedAt
blockLog.endedAt,
childWorkflowInstanceId
)
}
@@ -204,6 +208,8 @@ export class BlockExecutor {
parallelId?: string
branchIndex?: number
branchTotal?: number
originalBlockId?: string
isLoopNode?: boolean
} {
const metadata = node?.metadata ?? {}
return {
@@ -212,6 +218,8 @@ export class BlockExecutor {
parallelId: metadata.parallelId,
branchIndex: metadata.branchIndex,
branchTotal: metadata.branchTotal,
originalBlockId: metadata.originalBlockId,
isLoopNode: metadata.isLoopNode,
}
}
@@ -276,6 +284,9 @@ export class BlockExecutor {
)
if (!isSentinel && blockLog) {
const childWorkflowInstanceId = ChildWorkflowError.isChildWorkflowError(error)
? error.childWorkflowInstanceId
: undefined
const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
this.callOnBlockComplete(
ctx,
@@ -286,7 +297,8 @@ export class BlockExecutor {
duration,
blockLog.startedAt,
blockLog.executionOrder,
blockLog.endedAt
blockLog.endedAt,
childWorkflowInstanceId
)
}
@@ -428,7 +440,7 @@ export class BlockExecutor {
block: SerializedBlock,
executionOrder: number
): void {
const blockId = node.id
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
@@ -440,7 +452,8 @@ export class BlockExecutor {
blockName,
blockType,
executionOrder,
iterationContext
iterationContext,
ctx.childWorkflowContext
)
}
}
@@ -454,9 +467,10 @@ export class BlockExecutor {
duration: number,
startedAt: string,
executionOrder: number,
endedAt: string
endedAt: string,
childWorkflowInstanceId?: string
): void {
const blockId = node.id
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
@@ -474,8 +488,10 @@ export class BlockExecutor {
startedAt,
executionOrder,
endedAt,
childWorkflowInstanceId,
},
iterationContext
iterationContext,
ctx.childWorkflowContext
)
}
}

View File

@@ -322,7 +322,9 @@ export class DAGExecutor {
onStream: this.contextExtensions.onStream,
onBlockStart: this.contextExtensions.onBlockStart,
onBlockComplete: this.contextExtensions.onBlockComplete,
onChildWorkflowInstanceReady: this.contextExtensions.onChildWorkflowInstanceReady,
abortSignal: this.contextExtensions.abortSignal,
childWorkflowContext: this.contextExtensions.childWorkflowContext,
includeFileBase64: this.contextExtensions.includeFileBase64,
base64MaxBytes: this.contextExtensions.base64MaxBytes,
runFromBlockContext: overrides?.runFromBlockContext,

View File

@@ -54,6 +54,17 @@ export interface IterationContext {
iterationContainerId?: string
}
/**
 * Identifies an execution as running inside a child workflow spawned by a
 * workflow block, so that block events emitted by the child execution can be
 * attributed to the spawning block in the parent.
 */
export interface ChildWorkflowContext {
/**
 * The workflow block's ID in the parent execution.
 * NOTE(review): WorkflowBlockHandler passes the per-invocation instanceId
 * here rather than the block ID — confirm which is intended and align the
 * field name/comment accordingly.
 */
parentBlockId: string
/** Display name of the child workflow */
workflowName: string
/** Child workflow ID */
workflowId: string
/** Nesting depth (1 = first level child) */
depth: number
}
export interface ExecutionCallbacks {
onStream?: (streamingExec: any) => Promise<void>
onBlockStart?: (
@@ -61,15 +72,23 @@ export interface ExecutionCallbacks {
blockName: string,
blockType: string,
executionOrder: number,
iterationContext?: IterationContext
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => Promise<void>
onBlockComplete?: (
blockId: string,
blockName: string,
blockType: string,
output: any,
iterationContext?: IterationContext
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => Promise<void>
/** Fires immediately after instanceId is generated, before child execution begins. */
onChildWorkflowInstanceReady?: (
blockId: string,
childWorkflowInstanceId: string,
iterationContext?: IterationContext
) => void
}
export interface ContextExtensions {
@@ -105,7 +124,8 @@ export interface ContextExtensions {
blockName: string,
blockType: string,
executionOrder: number,
iterationContext?: IterationContext
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => Promise<void>
onBlockComplete?: (
blockId: string,
@@ -118,10 +138,23 @@ export interface ContextExtensions {
startedAt: string
executionOrder: number
endedAt: string
/** Per-invocation unique ID linking this workflow block execution to its child block events. */
childWorkflowInstanceId?: string
},
iterationContext?: IterationContext
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => Promise<void>
/** Context identifying this execution as a child of a workflow block */
childWorkflowContext?: ChildWorkflowContext
/** Fires immediately after instanceId is generated, before child execution begins. */
onChildWorkflowInstanceReady?: (
blockId: string,
childWorkflowInstanceId: string,
iterationContext?: IterationContext
) => void
/**
* Run-from-block configuration. When provided, executor runs in partial
* execution mode starting from the specified block.

View File

@@ -6,6 +6,7 @@ import type { BlockOutput } from '@/blocks/types'
import { Executor } from '@/executor'
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type { IterationContext } from '@/executor/execution/types'
import type {
BlockHandler,
ExecutionContext,
@@ -44,6 +45,40 @@ export class WorkflowBlockHandler implements BlockHandler {
ctx: ExecutionContext,
block: SerializedBlock,
inputs: Record<string, any>
): Promise<BlockOutput | StreamingExecution> {
return this._executeCore(ctx, block, inputs)
}
async executeWithNode(
ctx: ExecutionContext,
block: SerializedBlock,
inputs: Record<string, any>,
nodeMetadata: {
nodeId: string
loopId?: string
parallelId?: string
branchIndex?: number
branchTotal?: number
originalBlockId?: string
isLoopNode?: boolean
}
): Promise<BlockOutput | StreamingExecution> {
return this._executeCore(ctx, block, inputs, nodeMetadata)
}
private async _executeCore(
ctx: ExecutionContext,
block: SerializedBlock,
inputs: Record<string, any>,
nodeMetadata?: {
nodeId: string
loopId?: string
parallelId?: string
branchIndex?: number
branchTotal?: number
originalBlockId?: string
isLoopNode?: boolean
}
): Promise<BlockOutput | StreamingExecution> {
logger.info(`Executing workflow block: ${block.id}`)
@@ -58,6 +93,10 @@ export class WorkflowBlockHandler implements BlockHandler {
const workflowMetadata = workflows[workflowId]
let childWorkflowName = workflowMetadata?.name || workflowId
// Unique ID per invocation — used to correlate child block events with this specific
// workflow block execution, preventing cross-iteration child mixing in loop contexts.
const instanceId = crypto.randomUUID()
let childWorkflowSnapshotId: string | undefined
try {
const currentDepth = (ctx.workflowId?.split('_sub_').length || 1) - 1
@@ -115,6 +154,19 @@ export class WorkflowBlockHandler implements BlockHandler {
)
childWorkflowSnapshotId = childSnapshotResult.snapshot.id
const childDepth = (ctx.childWorkflowContext?.depth ?? 0) + 1
const shouldPropagateCallbacks = childDepth <= DEFAULTS.MAX_SSE_CHILD_DEPTH
if (shouldPropagateCallbacks) {
const effectiveBlockId = nodeMetadata
? (nodeMetadata.originalBlockId ?? nodeMetadata.nodeId)
: block.id
const iterationContext = nodeMetadata
? this.getIterationContext(ctx, nodeMetadata)
: undefined
ctx.onChildWorkflowInstanceReady?.(effectiveBlockId, instanceId, iterationContext)
}
const subExecutor = new Executor({
workflow: childWorkflow.serializedState,
workflowInput: childWorkflowInput,
@@ -127,6 +179,18 @@ export class WorkflowBlockHandler implements BlockHandler {
userId: ctx.userId,
executionId: ctx.executionId,
abortSignal: ctx.abortSignal,
...(shouldPropagateCallbacks && {
onBlockStart: ctx.onBlockStart,
onBlockComplete: ctx.onBlockComplete,
onStream: ctx.onStream as ((streamingExecution: unknown) => Promise<void>) | undefined,
onChildWorkflowInstanceReady: ctx.onChildWorkflowInstanceReady,
childWorkflowContext: {
parentBlockId: instanceId,
workflowName: childWorkflowName,
workflowId,
depth: childDepth,
},
}),
},
})
@@ -148,6 +212,7 @@ export class WorkflowBlockHandler implements BlockHandler {
workflowId,
childWorkflowName,
duration,
instanceId,
childTraceSpans,
childWorkflowSnapshotId
)
@@ -183,11 +248,46 @@ export class WorkflowBlockHandler implements BlockHandler {
childTraceSpans,
executionResult,
childWorkflowSnapshotId,
childWorkflowInstanceId: instanceId,
cause: error instanceof Error ? error : undefined,
})
}
}
/**
 * Derives the iteration context (loop / parallel position) for a workflow
 * block from its DAG node metadata.
 *
 * Returns undefined when the block is not executing inside an iteration
 * container, or when the loop scope has no current iteration yet.
 */
private getIterationContext(
  ctx: ExecutionContext,
  nodeMetadata: {
    loopId?: string
    parallelId?: string
    branchIndex?: number
    branchTotal?: number
    isLoopNode?: boolean
  }
): IterationContext | undefined {
  const { loopId, parallelId, branchIndex, branchTotal, isLoopNode } = nodeMetadata

  // Parallel branch: position is carried directly on the node metadata.
  if (branchIndex !== undefined && branchTotal !== undefined) {
    return {
      iterationCurrent: branchIndex,
      iterationTotal: branchTotal,
      iterationType: 'parallel',
      iterationContainerId: parallelId,
    }
  }

  // Loop node: position comes from the live loop scope on the context.
  if (isLoopNode && loopId) {
    const scope = ctx.loopExecutions?.get(loopId)
    if (scope?.iteration !== undefined) {
      return {
        iterationCurrent: scope.iteration,
        iterationTotal: scope.maxIterations,
        iterationType: 'loop',
        iterationContainerId: loopId,
      }
    }
  }

  return undefined
}
/**
* Builds a cleaner error message for nested workflow errors.
* Parses nested error messages to extract workflow chain and root error.
@@ -525,6 +625,7 @@ export class WorkflowBlockHandler implements BlockHandler {
childWorkflowId: string,
childWorkflowName: string,
duration: number,
instanceId: string,
childTraceSpans?: WorkflowTraceSpan[],
childWorkflowSnapshotId?: string
): BlockOutput {
@@ -538,6 +639,7 @@ export class WorkflowBlockHandler implements BlockHandler {
childWorkflowName,
childTraceSpans: childTraceSpans || [],
childWorkflowSnapshotId,
childWorkflowInstanceId: instanceId,
})
}
@@ -548,6 +650,7 @@ export class WorkflowBlockHandler implements BlockHandler {
...(childWorkflowSnapshotId ? { childWorkflowSnapshotId } : {}),
result,
childTraceSpans: childTraceSpans || [],
_childWorkflowInstanceId: instanceId,
} as Record<string, any>
}
}

View File

@@ -1,7 +1,11 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import type { BlockOutput } from '@/blocks/types'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type {
ChildWorkflowContext,
IterationContext,
SerializableExecutionState,
} from '@/executor/execution/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
@@ -239,15 +243,29 @@ export interface ExecutionContext {
blockId: string,
blockName: string,
blockType: string,
executionOrder: number
executionOrder: number,
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => Promise<void>
onBlockComplete?: (
blockId: string,
blockName: string,
blockType: string,
output: any
output: any,
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => Promise<void>
/** Context identifying this execution as a child of a workflow block */
childWorkflowContext?: ChildWorkflowContext
/** Fires immediately after instanceId is generated, before child execution begins. */
onChildWorkflowInstanceReady?: (
blockId: string,
childWorkflowInstanceId: string,
iterationContext?: IterationContext
) => void
/**
* AbortSignal for cancellation support.
* When the signal is aborted, execution should stop gracefully.
@@ -350,6 +368,8 @@ export interface BlockHandler {
parallelId?: string
branchIndex?: number
branchTotal?: number
originalBlockId?: string
isLoopNode?: boolean
}
) => Promise<BlockOutput | StreamingExecution>
}

View File

@@ -1,6 +1,7 @@
import { useCallback } from 'react'
import { createLogger } from '@sim/logger'
import type {
BlockChildWorkflowStartedData,
BlockCompletedData,
BlockErrorData,
BlockStartedData,
@@ -83,6 +84,9 @@ async function processSSEStream(
case 'block:error':
callbacks.onBlockError?.(event.data)
break
case 'block:childWorkflowStarted':
callbacks.onBlockChildWorkflowStarted?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
@@ -110,6 +114,7 @@ export interface ExecutionStreamCallbacks {
onBlockStarted?: (data: BlockStartedData) => void
onBlockCompleted?: (data: BlockCompletedData) => void
onBlockError?: (data: BlockErrorData) => void
onBlockChildWorkflowStarted?: (data: BlockChildWorkflowStartedData) => void
onStreamChunk?: (data: StreamChunkData) => void
onStreamDone?: (data: StreamDoneData) => void
}

View File

@@ -64,10 +64,31 @@ export async function validateUrlWithDNS(
const parsedUrl = new URL(url!)
const hostname = parsedUrl.hostname
try {
const { address } = await dns.lookup(hostname)
const hostnameLower = hostname.toLowerCase()
const cleanHostname =
hostnameLower.startsWith('[') && hostnameLower.endsWith(']')
? hostnameLower.slice(1, -1)
: hostnameLower
if (isPrivateOrReservedIP(address)) {
let isLocalhost = cleanHostname === 'localhost'
if (ipaddr.isValid(cleanHostname)) {
const processedIP = ipaddr.process(cleanHostname).toString()
if (processedIP === '127.0.0.1' || processedIP === '::1') {
isLocalhost = true
}
}
try {
const { address } = await dns.lookup(cleanHostname, { verbatim: true })
const resolvedIsLoopback =
ipaddr.isValid(address) &&
(() => {
const ip = ipaddr.process(address).toString()
return ip === '127.0.0.1' || ip === '::1'
})()
if (isPrivateOrReservedIP(address) && !(isLocalhost && resolvedIsLoopback)) {
logger.warn('URL resolves to blocked IP address', {
paramName,
hostname,
@@ -189,8 +210,6 @@ export async function secureFetchWithPinnedIP(
const agent = isHttps ? new https.Agent(agentOptions) : new http.Agent(agentOptions)
// Remove accept-encoding since Node.js http/https doesn't auto-decompress
// Headers are lowercase due to Web Headers API normalization in executeToolRequest
const { 'accept-encoding': _, ...sanitizedHeaders } = options.headers ?? {}
const requestOptions: http.RequestOptions = {
@@ -200,7 +219,7 @@ export async function secureFetchWithPinnedIP(
method: options.method || 'GET',
headers: sanitizedHeaders,
agent,
timeout: options.timeout || 300000, // Default 5 minutes
timeout: options.timeout || 300000,
}
const protocol = isHttps ? https : http

View File

@@ -569,10 +569,28 @@ describe('validateUrlWithDNS', () => {
expect(result.error).toContain('https://')
})
it('should reject localhost URLs', async () => {
it('should accept https localhost URLs', async () => {
const result = await validateUrlWithDNS('https://localhost/api')
expect(result.isValid).toBe(false)
expect(result.error).toContain('localhost')
expect(result.isValid).toBe(true)
expect(result.resolvedIP).toBeDefined()
})
it('should accept http localhost URLs', async () => {
const result = await validateUrlWithDNS('http://localhost/api')
expect(result.isValid).toBe(true)
expect(result.resolvedIP).toBeDefined()
})
it('should accept IPv4 loopback URLs', async () => {
const result = await validateUrlWithDNS('http://127.0.0.1/api')
expect(result.isValid).toBe(true)
expect(result.resolvedIP).toBeDefined()
})
it('should accept IPv6 loopback URLs', async () => {
const result = await validateUrlWithDNS('http://[::1]/api')
expect(result.isValid).toBe(true)
expect(result.resolvedIP).toBeDefined()
})
it('should reject private IP URLs', async () => {
@@ -898,17 +916,37 @@ describe('validateExternalUrl', () => {
expect(result.isValid).toBe(false)
expect(result.error).toContain('valid URL')
})
})
it.concurrent('should reject localhost', () => {
describe('localhost and loopback addresses', () => {
it.concurrent('should accept https localhost', () => {
const result = validateExternalUrl('https://localhost/api')
expect(result.isValid).toBe(false)
expect(result.error).toContain('localhost')
expect(result.isValid).toBe(true)
})
it.concurrent('should reject 127.0.0.1', () => {
it.concurrent('should accept http localhost', () => {
const result = validateExternalUrl('http://localhost/api')
expect(result.isValid).toBe(true)
})
it.concurrent('should accept https 127.0.0.1', () => {
const result = validateExternalUrl('https://127.0.0.1/api')
expect(result.isValid).toBe(false)
expect(result.error).toContain('private IP')
expect(result.isValid).toBe(true)
})
it.concurrent('should accept http 127.0.0.1', () => {
const result = validateExternalUrl('http://127.0.0.1/api')
expect(result.isValid).toBe(true)
})
it.concurrent('should accept https IPv6 loopback', () => {
const result = validateExternalUrl('https://[::1]/api')
expect(result.isValid).toBe(true)
})
it.concurrent('should accept http IPv6 loopback', () => {
const result = validateExternalUrl('http://[::1]/api')
expect(result.isValid).toBe(true)
})
it.concurrent('should reject 0.0.0.0', () => {
@@ -989,9 +1027,9 @@ describe('validateImageUrl', () => {
expect(result.isValid).toBe(true)
})
it.concurrent('should reject localhost URLs', () => {
it.concurrent('should accept localhost URLs', () => {
const result = validateImageUrl('https://localhost/image.png')
expect(result.isValid).toBe(false)
expect(result.isValid).toBe(true)
})
it.concurrent('should use imageUrl as default param name', () => {

View File

@@ -89,9 +89,9 @@ export function validatePathSegment(
const pathTraversalPatterns = [
'..',
'./',
'.\\.', // Windows path traversal
'%2e%2e', // URL encoded ..
'%252e%252e', // Double URL encoded ..
'.\\.',
'%2e%2e',
'%252e%252e',
'..%2f',
'..%5c',
'%2e%2e%2f',
@@ -391,7 +391,6 @@ export function validateHostname(
const lowerHostname = hostname.toLowerCase()
// Block localhost
if (lowerHostname === 'localhost') {
logger.warn('Hostname is localhost', { paramName })
return {
@@ -400,7 +399,6 @@ export function validateHostname(
}
}
// Use ipaddr.js to check if hostname is an IP and if it's private/reserved
if (ipaddr.isValid(lowerHostname)) {
if (isPrivateOrReservedIP(lowerHostname)) {
logger.warn('Hostname matches blocked IP range', {
@@ -414,7 +412,6 @@ export function validateHostname(
}
}
// Basic hostname format validation
const hostnamePattern =
/^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?(\.[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?)*$/i
@@ -460,10 +457,7 @@ export function validateFileExtension(
}
}
// Remove leading dot if present
const ext = extension.startsWith('.') ? extension.slice(1) : extension
// Normalize to lowercase
const normalizedExt = ext.toLowerCase()
if (!allowedExtensions.map((e) => e.toLowerCase()).includes(normalizedExt)) {
@@ -515,7 +509,6 @@ export function validateMicrosoftGraphId(
}
}
// Check for path traversal patterns (../)
const pathTraversalPatterns = [
'../',
'..\\',
@@ -525,7 +518,7 @@ export function validateMicrosoftGraphId(
'%2e%2e%5c',
'%2e%2e\\',
'..%5c',
'%252e%252e%252f', // double encoded
'%252e%252e%252f',
]
const lowerValue = value.toLowerCase()
@@ -542,7 +535,6 @@ export function validateMicrosoftGraphId(
}
}
// Check for control characters and null bytes
if (/[\x00-\x1f\x7f]/.test(value) || value.includes('%00')) {
logger.warn('Control characters in Microsoft Graph ID', { paramName })
return {
@@ -551,7 +543,6 @@ export function validateMicrosoftGraphId(
}
}
// Check for newlines (which could be used for header injection)
if (value.includes('\n') || value.includes('\r')) {
return {
isValid: false,
@@ -559,8 +550,6 @@ export function validateMicrosoftGraphId(
}
}
// Microsoft Graph IDs can contain many characters, but not suspicious patterns
// We've blocked path traversal, so allow the rest
return { isValid: true, sanitized: value }
}
@@ -583,7 +572,6 @@ export function validateJiraCloudId(
value: string | null | undefined,
paramName = 'cloudId'
): ValidationResult {
// Jira cloud IDs are alphanumeric with hyphens (UUID-like)
return validatePathSegment(value, {
paramName,
allowHyphens: true,
@@ -612,7 +600,6 @@ export function validateJiraIssueKey(
value: string | null | undefined,
paramName = 'issueKey'
): ValidationResult {
// Jira issue keys: letters, numbers, hyphens (PROJECT-123 format)
return validatePathSegment(value, {
paramName,
allowHyphens: true,
@@ -653,7 +640,6 @@ export function validateExternalUrl(
}
}
// Must be a valid URL
let parsedUrl: URL
try {
parsedUrl = new URL(url)
@@ -664,28 +650,29 @@ export function validateExternalUrl(
}
}
// Only allow https protocol
if (parsedUrl.protocol !== 'https:') {
const protocol = parsedUrl.protocol
const hostname = parsedUrl.hostname.toLowerCase()
const cleanHostname =
hostname.startsWith('[') && hostname.endsWith(']') ? hostname.slice(1, -1) : hostname
let isLocalhost = cleanHostname === 'localhost'
if (ipaddr.isValid(cleanHostname)) {
const processedIP = ipaddr.process(cleanHostname).toString()
if (processedIP === '127.0.0.1' || processedIP === '::1') {
isLocalhost = true
}
}
if (protocol !== 'https:' && !(protocol === 'http:' && isLocalhost)) {
return {
isValid: false,
error: `${paramName} must use https:// protocol`,
}
}
// Block private IP ranges and localhost
const hostname = parsedUrl.hostname.toLowerCase()
// Block localhost
if (hostname === 'localhost') {
return {
isValid: false,
error: `${paramName} cannot point to localhost`,
}
}
// Use ipaddr.js to check if hostname is an IP and if it's private/reserved
if (ipaddr.isValid(hostname)) {
if (isPrivateOrReservedIP(hostname)) {
if (!isLocalhost && ipaddr.isValid(cleanHostname)) {
if (isPrivateOrReservedIP(cleanHostname)) {
return {
isValid: false,
error: `${paramName} cannot point to private IP addresses`,

View File

@@ -619,7 +619,6 @@ function cleanupWorker(workerId: number) {
workerInfo.activeExecutions = 0
workers.delete(workerId)
logger.info('Worker removed from pool', { workerId, poolSize: workers.size })
}
function resetWorkerIdleTimeout(workerId: number) {
@@ -635,7 +634,6 @@ function resetWorkerIdleTimeout(workerId: number) {
workerInfo.idleTimeout = setTimeout(() => {
const w = workers.get(workerId)
if (w && w.activeExecutions === 0) {
logger.info('Cleaning up idle worker', { workerId })
cleanupWorker(workerId)
}
}, WORKER_IDLE_TIMEOUT_MS)

View File

@@ -20,6 +20,7 @@ import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { Executor } from '@/executor'
import type { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type {
ChildWorkflowContext,
ContextExtensions,
ExecutionCallbacks,
IterationContext,
@@ -128,7 +129,7 @@ export async function executeWorkflowCore(
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
metadata
const { onBlockStart, onBlockComplete, onStream } = callbacks
const { onBlockStart, onBlockComplete, onStream, onChildWorkflowInstanceReady } = callbacks
const providedWorkspaceId = metadata.workspaceId
if (!providedWorkspaceId) {
@@ -287,11 +288,19 @@ export async function executeWorkflowCore(
startedAt: string
endedAt: string
},
iterationContext?: IterationContext
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => {
await loggingSession.onBlockComplete(blockId, blockName, blockType, output)
if (onBlockComplete) {
await onBlockComplete(blockId, blockName, blockType, output, iterationContext)
await onBlockComplete(
blockId,
blockName,
blockType,
output,
iterationContext,
childWorkflowContext
)
}
}
@@ -320,6 +329,7 @@ export async function executeWorkflowCore(
includeFileBase64,
base64MaxBytes,
stopAfterBlockId: resolvedStopAfterBlockId,
onChildWorkflowInstanceReady,
}
const executorInstance = new Executor({

View File

@@ -1,3 +1,4 @@
import type { ChildWorkflowContext, IterationContext } from '@/executor/execution/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
export type ExecutionEventType =
@@ -8,6 +9,7 @@ export type ExecutionEventType =
| 'block:started'
| 'block:completed'
| 'block:error'
| 'block:childWorkflowStarted'
| 'stream:chunk'
| 'stream:done'
@@ -81,6 +83,8 @@ export interface BlockStartedEvent extends BaseExecutionEvent {
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
childWorkflowBlockId?: string
childWorkflowName?: string
}
}
@@ -104,6 +108,10 @@ export interface BlockCompletedEvent extends BaseExecutionEvent {
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
childWorkflowBlockId?: string
childWorkflowName?: string
/** Per-invocation unique ID for correlating child block events with this workflow block. */
childWorkflowInstanceId?: string
}
}
@@ -127,6 +135,26 @@ export interface BlockErrorEvent extends BaseExecutionEvent {
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
childWorkflowBlockId?: string
childWorkflowName?: string
/** Per-invocation unique ID for correlating child block events with this workflow block. */
childWorkflowInstanceId?: string
}
}
/**
* Block child workflow started event — fires when a workflow block generates its instanceId,
* before child execution begins. Allows clients to pre-associate the running entry with
* the instanceId so child block events can be correlated in real-time.
*/
export interface BlockChildWorkflowStartedEvent extends BaseExecutionEvent {
type: 'block:childWorkflowStarted'
workflowId: string
data: {
blockId: string
childWorkflowInstanceId: string
iterationCurrent?: number
iterationContainerId?: string
}
}
@@ -164,6 +192,7 @@ export type ExecutionEvent =
| BlockStartedEvent
| BlockCompletedEvent
| BlockErrorEvent
| BlockChildWorkflowStartedEvent
| StreamChunkEvent
| StreamDoneEvent
@@ -174,6 +203,7 @@ export type ExecutionCancelledData = ExecutionCancelledEvent['data']
export type BlockStartedData = BlockStartedEvent['data']
export type BlockCompletedData = BlockCompletedEvent['data']
export type BlockErrorData = BlockErrorEvent['data']
export type BlockChildWorkflowStartedData = BlockChildWorkflowStartedEvent['data']
export type StreamChunkData = StreamChunkEvent['data']
export type StreamDoneData = StreamDoneEvent['data']
@@ -222,12 +252,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
blockName: string,
blockType: string,
executionOrder: number,
iterationContext?: {
iterationCurrent: number
iterationTotal?: number
iterationType: string
iterationContainerId?: string
}
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => {
sendEvent({
type: 'block:started',
@@ -242,9 +268,13 @@ export function createSSECallbacks(options: SSECallbackOptions) {
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}),
...(childWorkflowContext && {
childWorkflowBlockId: childWorkflowContext.parentBlockId,
childWorkflowName: childWorkflowContext.workflowName,
}),
},
})
}
@@ -260,23 +290,30 @@ export function createSSECallbacks(options: SSECallbackOptions) {
startedAt: string
executionOrder: number
endedAt: string
childWorkflowInstanceId?: string
},
iterationContext?: {
iterationCurrent: number
iterationTotal?: number
iterationType: string
iterationContainerId?: string
}
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => {
const hasError = callbackData.output?.error
const iterationData = iterationContext
? {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}
: {}
const childWorkflowData = childWorkflowContext
? {
childWorkflowBlockId: childWorkflowContext.parentBlockId,
childWorkflowName: childWorkflowContext.workflowName,
}
: {}
const instanceData = callbackData.childWorkflowInstanceId
? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId }
: {}
if (hasError) {
sendEvent({
@@ -295,6 +332,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
executionOrder: callbackData.executionOrder,
endedAt: callbackData.endedAt,
...iterationData,
...childWorkflowData,
...instanceData,
},
})
} else {
@@ -314,6 +353,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
executionOrder: callbackData.executionOrder,
endedAt: callbackData.endedAt,
...iterationData,
...childWorkflowData,
...instanceData,
},
})
}
@@ -352,5 +393,26 @@ export function createSSECallbacks(options: SSECallbackOptions) {
}
}
return { sendEvent, onBlockStart, onBlockComplete, onStream }
// Emits a 'block:childWorkflowStarted' SSE event as soon as a workflow block
// has generated its per-invocation instanceId — before the child execution
// begins — so clients can pre-associate the running entry with the instance.
const onChildWorkflowInstanceReady = (
blockId: string,
childWorkflowInstanceId: string,
iterationContext?: IterationContext
) => {
sendEvent({
type: 'block:childWorkflowStarted',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
childWorkflowInstanceId,
// Iteration position is included only when the block runs inside a
// loop/parallel container; keys are omitted entirely otherwise.
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationContainerId: iterationContext.iterationContainerId,
}),
},
})
}
return { sendEvent, onBlockStart, onBlockComplete, onStream, onChildWorkflowInstanceReady }
}

View File

@@ -420,6 +420,18 @@ export const useTerminalConsoleStore = create<ConsoleStore>()(
updatedEntry.iterationContainerId = update.iterationContainerId
}
if (update.childWorkflowBlockId !== undefined) {
updatedEntry.childWorkflowBlockId = update.childWorkflowBlockId
}
if (update.childWorkflowName !== undefined) {
updatedEntry.childWorkflowName = update.childWorkflowName
}
if (update.childWorkflowInstanceId !== undefined) {
updatedEntry.childWorkflowInstanceId = update.childWorkflowInstanceId
}
return updatedEntry
})

View File

@@ -24,6 +24,12 @@ export interface ConsoleEntry {
iterationContainerId?: string
isRunning?: boolean
isCanceled?: boolean
/** ID of the workflow block in the parent execution that spawned this child block */
childWorkflowBlockId?: string
/** Display name of the child workflow this block belongs to */
childWorkflowName?: string
/** Per-invocation unique ID linking this workflow block to its child block events */
childWorkflowInstanceId?: string
}
export interface ConsoleUpdate {
@@ -44,6 +50,9 @@ export interface ConsoleUpdate {
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
childWorkflowBlockId?: string
childWorkflowName?: string
childWorkflowInstanceId?: string
}
export interface ConsoleStore {