mirror of
https://github.com/simstudioai/sim.git
synced 2026-03-15 03:00:33 -04:00
Compare commits
12 Commits
fix/search
...
v0.5.99
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af592349d3 | ||
|
|
69ec70af13 | ||
|
|
687c12528b | ||
|
|
996dc96d6e | ||
|
|
0d86ea01f0 | ||
|
|
115f04e989 | ||
|
|
34d92fae89 | ||
|
|
67aa4bb332 | ||
|
|
15ace5e63f | ||
|
|
fdca73679d | ||
|
|
da46a387c9 | ||
|
|
b7e377ec4b |
@@ -211,7 +211,7 @@ describe('Function Execute API Route', () => {
|
||||
|
||||
it.concurrent('should block SSRF attacks through secure fetch wrapper', async () => {
|
||||
expect(validateProxyUrl('http://169.254.169.254/latest/meta-data/').isValid).toBe(false)
|
||||
expect(validateProxyUrl('http://127.0.0.1:8080/admin').isValid).toBe(false)
|
||||
expect(validateProxyUrl('http://127.0.0.1:8080/admin').isValid).toBe(true)
|
||||
expect(validateProxyUrl('http://192.168.1.1/config').isValid).toBe(false)
|
||||
expect(validateProxyUrl('http://10.0.0.1/internal').isValid).toBe(false)
|
||||
})
|
||||
|
||||
@@ -38,6 +38,7 @@ import { executeWorkflowJob, type WorkflowExecutionPayload } from '@/background/
|
||||
import { normalizeName } from '@/executor/constants'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type {
|
||||
ChildWorkflowContext,
|
||||
ExecutionMetadata,
|
||||
IterationContext,
|
||||
SerializableExecutionState,
|
||||
@@ -742,7 +743,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
executionOrder: number,
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => {
|
||||
logger.info(`[${requestId}] 🔷 onBlockStart called:`, { blockId, blockName, blockType })
|
||||
sendEvent({
|
||||
@@ -761,6 +763,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
iterationType: iterationContext.iterationType,
|
||||
iterationContainerId: iterationContext.iterationContainerId,
|
||||
}),
|
||||
...(childWorkflowContext && {
|
||||
childWorkflowBlockId: childWorkflowContext.parentBlockId,
|
||||
childWorkflowName: childWorkflowContext.workflowName,
|
||||
}),
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -770,9 +776,20 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
callbackData: any,
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => {
|
||||
const hasError = callbackData.output?.error
|
||||
const childWorkflowData = childWorkflowContext
|
||||
? {
|
||||
childWorkflowBlockId: childWorkflowContext.parentBlockId,
|
||||
childWorkflowName: childWorkflowContext.workflowName,
|
||||
}
|
||||
: {}
|
||||
|
||||
const instanceData = callbackData.childWorkflowInstanceId
|
||||
? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId }
|
||||
: {}
|
||||
|
||||
if (hasError) {
|
||||
logger.info(`[${requestId}] ✗ onBlockComplete (error) called:`, {
|
||||
@@ -802,6 +819,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
iterationType: iterationContext.iterationType,
|
||||
iterationContainerId: iterationContext.iterationContainerId,
|
||||
}),
|
||||
...childWorkflowData,
|
||||
...instanceData,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
@@ -831,6 +850,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
iterationType: iterationContext.iterationType,
|
||||
iterationContainerId: iterationContext.iterationContainerId,
|
||||
}),
|
||||
...childWorkflowData,
|
||||
...instanceData,
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -898,12 +919,34 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
selectedOutputs
|
||||
)
|
||||
|
||||
const onChildWorkflowInstanceReady = (
|
||||
blockId: string,
|
||||
childWorkflowInstanceId: string,
|
||||
iterationContext?: IterationContext
|
||||
) => {
|
||||
sendEvent({
|
||||
type: 'block:childWorkflowStarted',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
blockId,
|
||||
childWorkflowInstanceId,
|
||||
...(iterationContext && {
|
||||
iterationCurrent: iterationContext.iterationCurrent,
|
||||
iterationContainerId: iterationContext.iterationContainerId,
|
||||
}),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
const result = await executeWorkflowCore({
|
||||
snapshot,
|
||||
callbacks: {
|
||||
onBlockStart,
|
||||
onBlockComplete,
|
||||
onStream,
|
||||
onChildWorkflowInstanceReady,
|
||||
},
|
||||
loggingSession,
|
||||
abortSignal: timeoutController.signal,
|
||||
|
||||
@@ -41,6 +41,7 @@ import {
|
||||
} from '@/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/hooks'
|
||||
import { ROW_STYLES } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/types'
|
||||
import {
|
||||
collectExpandableNodeIds,
|
||||
type EntryNode,
|
||||
type ExecutionGroup,
|
||||
flattenBlockEntriesOnly,
|
||||
@@ -67,6 +68,21 @@ const MIN_HEIGHT = TERMINAL_HEIGHT.MIN
|
||||
const DEFAULT_EXPANDED_HEIGHT = TERMINAL_HEIGHT.DEFAULT
|
||||
const MIN_OUTPUT_PANEL_WIDTH_PX = OUTPUT_PANEL_WIDTH.MIN
|
||||
|
||||
/** Returns true if any node in the subtree has an error */
|
||||
function hasErrorInTree(nodes: EntryNode[]): boolean {
|
||||
return nodes.some((n) => Boolean(n.entry.error) || hasErrorInTree(n.children))
|
||||
}
|
||||
|
||||
/** Returns true if any node in the subtree is currently running */
|
||||
function hasRunningInTree(nodes: EntryNode[]): boolean {
|
||||
return nodes.some((n) => Boolean(n.entry.isRunning) || hasRunningInTree(n.children))
|
||||
}
|
||||
|
||||
/** Returns true if any node in the subtree was canceled */
|
||||
function hasCanceledInTree(nodes: EntryNode[]): boolean {
|
||||
return nodes.some((n) => Boolean(n.entry.isCanceled) || hasCanceledInTree(n.children))
|
||||
}
|
||||
|
||||
/**
|
||||
* Block row component for displaying actual block entries
|
||||
*/
|
||||
@@ -338,6 +354,122 @@ const SubflowNodeRow = memo(function SubflowNodeRow({
|
||||
)
|
||||
})
|
||||
|
||||
/**
|
||||
* Workflow node component - shows workflow block header with nested child blocks
|
||||
*/
|
||||
const WorkflowNodeRow = memo(function WorkflowNodeRow({
|
||||
node,
|
||||
selectedEntryId,
|
||||
onSelectEntry,
|
||||
expandedNodes,
|
||||
onToggleNode,
|
||||
}: {
|
||||
node: EntryNode
|
||||
selectedEntryId: string | null
|
||||
onSelectEntry: (entry: ConsoleEntry) => void
|
||||
expandedNodes: Set<string>
|
||||
onToggleNode: (nodeId: string) => void
|
||||
}) {
|
||||
const { entry, children } = node
|
||||
const BlockIcon = getBlockIcon(entry.blockType)
|
||||
const bgColor = getBlockColor(entry.blockType)
|
||||
const nodeId = entry.id
|
||||
const isExpanded = expandedNodes.has(nodeId)
|
||||
const hasChildren = children.length > 0
|
||||
const isSelected = selectedEntryId === entry.id
|
||||
|
||||
const hasError = useMemo(
|
||||
() => Boolean(entry.error) || hasErrorInTree(children),
|
||||
[entry.error, children]
|
||||
)
|
||||
const hasRunningDescendant = useMemo(
|
||||
() => Boolean(entry.isRunning) || hasRunningInTree(children),
|
||||
[entry.isRunning, children]
|
||||
)
|
||||
const hasCanceledDescendant = useMemo(
|
||||
() => (Boolean(entry.isCanceled) || hasCanceledInTree(children)) && !hasRunningDescendant,
|
||||
[entry.isCanceled, children, hasRunningDescendant]
|
||||
)
|
||||
|
||||
return (
|
||||
<div className='flex min-w-0 flex-col'>
|
||||
{/* Workflow Block Header */}
|
||||
<div
|
||||
className={clsx(
|
||||
ROW_STYLES.base,
|
||||
'h-[26px]',
|
||||
isSelected ? ROW_STYLES.selected : ROW_STYLES.hover
|
||||
)}
|
||||
onClick={(e) => {
|
||||
e.stopPropagation()
|
||||
if (!isSelected) onSelectEntry(entry)
|
||||
if (hasChildren) onToggleNode(nodeId)
|
||||
}}
|
||||
>
|
||||
<div className='flex min-w-0 flex-1 items-center gap-[8px]'>
|
||||
<div
|
||||
className='flex h-[14px] w-[14px] flex-shrink-0 items-center justify-center rounded-[4px]'
|
||||
style={{ background: bgColor }}
|
||||
>
|
||||
{BlockIcon && <BlockIcon className='h-[9px] w-[9px] text-white' />}
|
||||
</div>
|
||||
<span
|
||||
className={clsx(
|
||||
'min-w-0 truncate font-medium text-[13px]',
|
||||
hasError
|
||||
? 'text-[var(--text-error)]'
|
||||
: isSelected || isExpanded
|
||||
? 'text-[var(--text-primary)]'
|
||||
: 'text-[var(--text-tertiary)] group-hover:text-[var(--text-primary)]'
|
||||
)}
|
||||
>
|
||||
{entry.blockName}
|
||||
</span>
|
||||
{hasChildren && (
|
||||
<ChevronDown
|
||||
className={clsx(
|
||||
'h-[8px] w-[8px] flex-shrink-0 text-[var(--text-tertiary)] transition-transform duration-100 group-hover:text-[var(--text-primary)]',
|
||||
!isExpanded && '-rotate-90'
|
||||
)}
|
||||
/>
|
||||
)}
|
||||
</div>
|
||||
<span
|
||||
className={clsx(
|
||||
'flex-shrink-0 font-medium text-[13px]',
|
||||
!hasRunningDescendant &&
|
||||
(hasCanceledDescendant
|
||||
? 'text-[var(--text-secondary)]'
|
||||
: 'text-[var(--text-tertiary)]')
|
||||
)}
|
||||
>
|
||||
<StatusDisplay
|
||||
isRunning={hasRunningDescendant}
|
||||
isCanceled={hasCanceledDescendant}
|
||||
formattedDuration={formatDuration(entry.durationMs, { precision: 2 }) ?? '-'}
|
||||
/>
|
||||
</span>
|
||||
</div>
|
||||
|
||||
{/* Nested Child Blocks — rendered through EntryNodeRow for full loop/parallel support */}
|
||||
{isExpanded && hasChildren && (
|
||||
<div className={ROW_STYLES.nested}>
|
||||
{children.map((child) => (
|
||||
<EntryNodeRow
|
||||
key={child.entry.id}
|
||||
node={child}
|
||||
selectedEntryId={selectedEntryId}
|
||||
onSelectEntry={onSelectEntry}
|
||||
expandedNodes={expandedNodes}
|
||||
onToggleNode={onToggleNode}
|
||||
/>
|
||||
))}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)
|
||||
})
|
||||
|
||||
/**
|
||||
* Entry node component - dispatches to appropriate component based on node type
|
||||
*/
|
||||
@@ -368,6 +500,18 @@ const EntryNodeRow = memo(function EntryNodeRow({
|
||||
)
|
||||
}
|
||||
|
||||
if (nodeType === 'workflow') {
|
||||
return (
|
||||
<WorkflowNodeRow
|
||||
node={node}
|
||||
selectedEntryId={selectedEntryId}
|
||||
onSelectEntry={onSelectEntry}
|
||||
expandedNodes={expandedNodes}
|
||||
onToggleNode={onToggleNode}
|
||||
/>
|
||||
)
|
||||
}
|
||||
|
||||
if (nodeType === 'iteration') {
|
||||
return (
|
||||
<IterationNodeRow
|
||||
@@ -659,27 +803,15 @@ export const Terminal = memo(function Terminal() {
|
||||
])
|
||||
|
||||
/**
|
||||
* Auto-expand subflows and iterations when new entries arrive.
|
||||
* Auto-expand subflows, iterations, and workflow nodes when new entries arrive.
|
||||
* Recursively walks the full tree so nested nodes (e.g. a workflow block inside
|
||||
* a loop iteration) are also expanded automatically.
|
||||
* This always runs regardless of autoSelectEnabled - new runs should always be visible.
|
||||
*/
|
||||
useEffect(() => {
|
||||
if (executionGroups.length === 0) return
|
||||
|
||||
const newestExec = executionGroups[0]
|
||||
|
||||
// Collect all node IDs that should be expanded (subflows and their iterations)
|
||||
const nodeIdsToExpand: string[] = []
|
||||
for (const node of newestExec.entryTree) {
|
||||
if (node.nodeType === 'subflow' && node.children.length > 0) {
|
||||
nodeIdsToExpand.push(node.entry.id)
|
||||
// Also expand all iteration children
|
||||
for (const iterNode of node.children) {
|
||||
if (iterNode.nodeType === 'iteration') {
|
||||
nodeIdsToExpand.push(iterNode.entry.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
const nodeIdsToExpand = collectExpandableNodeIds(executionGroups[0].entryTree)
|
||||
|
||||
if (nodeIdsToExpand.length > 0) {
|
||||
setExpandedNodes((prev) => {
|
||||
|
||||
@@ -1,6 +1,14 @@
|
||||
import type React from 'react'
|
||||
import { AlertTriangleIcon, BanIcon, RepeatIcon, SplitIcon, XCircleIcon } from 'lucide-react'
|
||||
import {
|
||||
AlertTriangleIcon,
|
||||
BanIcon,
|
||||
NetworkIcon,
|
||||
RepeatIcon,
|
||||
SplitIcon,
|
||||
XCircleIcon,
|
||||
} from 'lucide-react'
|
||||
import { getBlock } from '@/blocks'
|
||||
import { isWorkflowBlockType } from '@/executor/constants'
|
||||
import { TERMINAL_BLOCK_COLUMN_WIDTH } from '@/stores/constants'
|
||||
import type { ConsoleEntry } from '@/stores/terminal'
|
||||
|
||||
@@ -12,6 +20,8 @@ const SUBFLOW_COLORS = {
|
||||
parallel: '#FEE12B',
|
||||
} as const
|
||||
|
||||
const WORKFLOW_COLOR = '#8b5cf6'
|
||||
|
||||
/**
|
||||
* Special block type colors for errors and system messages
|
||||
*/
|
||||
@@ -41,6 +51,10 @@ export function getBlockIcon(
|
||||
return SplitIcon
|
||||
}
|
||||
|
||||
if (blockType === 'workflow') {
|
||||
return NetworkIcon
|
||||
}
|
||||
|
||||
if (blockType === 'error') {
|
||||
return XCircleIcon
|
||||
}
|
||||
@@ -71,6 +85,9 @@ export function getBlockColor(blockType: string): string {
|
||||
if (blockType === 'parallel') {
|
||||
return SUBFLOW_COLORS.parallel
|
||||
}
|
||||
if (blockType === 'workflow') {
|
||||
return WORKFLOW_COLOR
|
||||
}
|
||||
// Special block types for errors and system messages
|
||||
if (blockType === 'error') {
|
||||
return SPECIAL_BLOCK_COLORS.error
|
||||
@@ -120,7 +137,7 @@ export function isSubflowBlockType(blockType: string): boolean {
|
||||
/**
|
||||
* Node type for the tree structure
|
||||
*/
|
||||
export type EntryNodeType = 'block' | 'subflow' | 'iteration'
|
||||
export type EntryNodeType = 'block' | 'subflow' | 'iteration' | 'workflow'
|
||||
|
||||
/**
|
||||
* Entry node for tree structure - represents a block, subflow, or iteration
|
||||
@@ -168,6 +185,36 @@ interface IterationGroup {
|
||||
startTimeMs: number
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively collects all descendant entries owned by a workflow block.
|
||||
* This includes direct children and the children of any nested workflow blocks,
|
||||
* enabling correct tree construction for deeply-nested child workflows.
|
||||
*/
|
||||
function collectWorkflowDescendants(
|
||||
instanceKey: string,
|
||||
workflowChildGroups: Map<string, ConsoleEntry[]>,
|
||||
visited: Set<string> = new Set()
|
||||
): ConsoleEntry[] {
|
||||
if (visited.has(instanceKey)) return []
|
||||
visited.add(instanceKey)
|
||||
const direct = workflowChildGroups.get(instanceKey) ?? []
|
||||
const result = [...direct]
|
||||
for (const entry of direct) {
|
||||
if (isWorkflowBlockType(entry.blockType)) {
|
||||
// Use childWorkflowInstanceId when available (unique per-invocation) to correctly
|
||||
// separate children across loop iterations of the same workflow block.
|
||||
result.push(
|
||||
...collectWorkflowDescendants(
|
||||
entry.childWorkflowInstanceId ?? entry.blockId,
|
||||
workflowChildGroups,
|
||||
visited
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a tree structure from flat entries.
|
||||
* Groups iteration entries by (iterationType, iterationContainerId, iterationCurrent), showing all blocks
|
||||
@@ -175,18 +222,37 @@ interface IterationGroup {
|
||||
* Sorts by start time to ensure chronological order.
|
||||
*/
|
||||
function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
|
||||
// Separate regular blocks from iteration entries
|
||||
// Separate entries into three buckets:
|
||||
// 1. Iteration entries (loop/parallel children)
|
||||
// 2. Workflow child entries (blocks inside a child workflow)
|
||||
// 3. Regular blocks
|
||||
const regularBlocks: ConsoleEntry[] = []
|
||||
const iterationEntries: ConsoleEntry[] = []
|
||||
const workflowChildEntries: ConsoleEntry[] = []
|
||||
|
||||
for (const entry of entries) {
|
||||
if (entry.iterationType && entry.iterationCurrent !== undefined) {
|
||||
if (entry.childWorkflowBlockId) {
|
||||
// Child workflow entries take priority over iteration classification
|
||||
workflowChildEntries.push(entry)
|
||||
} else if (entry.iterationType && entry.iterationCurrent !== undefined) {
|
||||
iterationEntries.push(entry)
|
||||
} else {
|
||||
regularBlocks.push(entry)
|
||||
}
|
||||
}
|
||||
|
||||
// Group workflow child entries by the parent workflow block ID
|
||||
const workflowChildGroups = new Map<string, ConsoleEntry[]>()
|
||||
for (const entry of workflowChildEntries) {
|
||||
const parentId = entry.childWorkflowBlockId!
|
||||
const group = workflowChildGroups.get(parentId)
|
||||
if (group) {
|
||||
group.push(entry)
|
||||
} else {
|
||||
workflowChildGroups.set(parentId, [entry])
|
||||
}
|
||||
}
|
||||
|
||||
// Group iteration entries by (iterationType, iterationContainerId, iterationCurrent)
|
||||
const iterationGroupsMap = new Map<string, IterationGroup>()
|
||||
for (const entry of iterationEntries) {
|
||||
@@ -261,6 +327,9 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
|
||||
...allBlocks.map((b) => new Date(b.endedAt || b.timestamp).getTime())
|
||||
)
|
||||
const totalDuration = allBlocks.reduce((sum, b) => sum + (b.durationMs || 0), 0)
|
||||
// Parallel branches run concurrently — use wall-clock time. Loop iterations run serially — use sum.
|
||||
const subflowDuration =
|
||||
iterationType === 'parallel' ? subflowEndMs - subflowStartMs : totalDuration
|
||||
|
||||
// Create synthetic subflow parent entry
|
||||
// Use the minimum executionOrder from all child blocks for proper ordering
|
||||
@@ -276,7 +345,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
|
||||
startedAt: new Date(subflowStartMs).toISOString(),
|
||||
executionOrder: subflowExecutionOrder,
|
||||
endedAt: new Date(subflowEndMs).toISOString(),
|
||||
durationMs: totalDuration,
|
||||
durationMs: subflowDuration,
|
||||
success: !allBlocks.some((b) => b.error),
|
||||
}
|
||||
|
||||
@@ -291,6 +360,9 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
|
||||
...iterBlocks.map((b) => new Date(b.endedAt || b.timestamp).getTime())
|
||||
)
|
||||
const iterDuration = iterBlocks.reduce((sum, b) => sum + (b.durationMs || 0), 0)
|
||||
// Parallel branches run concurrently — use wall-clock time. Loop iterations run serially — use sum.
|
||||
const iterDisplayDuration =
|
||||
iterationType === 'parallel' ? iterEndMs - iterStartMs : iterDuration
|
||||
|
||||
// Use the minimum executionOrder from blocks in this iteration
|
||||
const iterExecutionOrder = Math.min(...iterBlocks.map((b) => b.executionOrder))
|
||||
@@ -305,7 +377,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
|
||||
startedAt: new Date(iterStartMs).toISOString(),
|
||||
executionOrder: iterExecutionOrder,
|
||||
endedAt: new Date(iterEndMs).toISOString(),
|
||||
durationMs: iterDuration,
|
||||
durationMs: iterDisplayDuration,
|
||||
success: !iterBlocks.some((b) => b.error),
|
||||
iterationCurrent: iterGroup.iterationCurrent,
|
||||
iterationTotal: iterGroup.iterationTotal,
|
||||
@@ -313,12 +385,24 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
|
||||
iterationContainerId: iterGroup.iterationContainerId,
|
||||
}
|
||||
|
||||
// Block nodes within this iteration
|
||||
const blockNodes: EntryNode[] = iterBlocks.map((block) => ({
|
||||
entry: block,
|
||||
children: [],
|
||||
nodeType: 'block' as const,
|
||||
}))
|
||||
// Block nodes within this iteration — workflow blocks get their full subtree
|
||||
const blockNodes: EntryNode[] = iterBlocks.map((block) => {
|
||||
if (isWorkflowBlockType(block.blockType)) {
|
||||
const instanceKey = block.childWorkflowInstanceId ?? block.blockId
|
||||
const allDescendants = collectWorkflowDescendants(instanceKey, workflowChildGroups)
|
||||
const rawChildren = allDescendants.map((c) => ({
|
||||
...c,
|
||||
childWorkflowBlockId:
|
||||
c.childWorkflowBlockId === instanceKey ? undefined : c.childWorkflowBlockId,
|
||||
}))
|
||||
return {
|
||||
entry: block,
|
||||
children: buildEntryTree(rawChildren),
|
||||
nodeType: 'workflow' as const,
|
||||
}
|
||||
}
|
||||
return { entry: block, children: [], nodeType: 'block' as const }
|
||||
})
|
||||
|
||||
return {
|
||||
entry: syntheticIteration,
|
||||
@@ -338,19 +422,61 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
|
||||
})
|
||||
}
|
||||
|
||||
// Build nodes for regular blocks
|
||||
const regularNodes: EntryNode[] = regularBlocks.map((entry) => ({
|
||||
// Build workflow nodes for regular blocks that are workflow block types
|
||||
const workflowNodes: EntryNode[] = []
|
||||
const remainingRegularBlocks: ConsoleEntry[] = []
|
||||
|
||||
for (const block of regularBlocks) {
|
||||
if (isWorkflowBlockType(block.blockType)) {
|
||||
const instanceKey = block.childWorkflowInstanceId ?? block.blockId
|
||||
const allDescendants = collectWorkflowDescendants(instanceKey, workflowChildGroups)
|
||||
const rawChildren = allDescendants.map((c) => ({
|
||||
...c,
|
||||
childWorkflowBlockId:
|
||||
c.childWorkflowBlockId === instanceKey ? undefined : c.childWorkflowBlockId,
|
||||
}))
|
||||
const children = buildEntryTree(rawChildren)
|
||||
workflowNodes.push({ entry: block, children, nodeType: 'workflow' as const })
|
||||
} else {
|
||||
remainingRegularBlocks.push(block)
|
||||
}
|
||||
}
|
||||
|
||||
// Build nodes for remaining regular blocks
|
||||
const regularNodes: EntryNode[] = remainingRegularBlocks.map((entry) => ({
|
||||
entry,
|
||||
children: [],
|
||||
nodeType: 'block' as const,
|
||||
}))
|
||||
|
||||
// Combine all nodes and sort by executionOrder ascending (oldest first, top-down)
|
||||
const allNodes = [...subflowNodes, ...regularNodes]
|
||||
const allNodes = [...subflowNodes, ...workflowNodes, ...regularNodes]
|
||||
allNodes.sort((a, b) => a.entry.executionOrder - b.entry.executionOrder)
|
||||
return allNodes
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively collects IDs of all nodes that should be auto-expanded.
|
||||
* Includes subflow, iteration, and workflow nodes that have children.
|
||||
*/
|
||||
export function collectExpandableNodeIds(nodes: EntryNode[]): string[] {
|
||||
const ids: string[] = []
|
||||
for (const node of nodes) {
|
||||
if (
|
||||
(node.nodeType === 'subflow' ||
|
||||
node.nodeType === 'iteration' ||
|
||||
node.nodeType === 'workflow') &&
|
||||
node.children.length > 0
|
||||
) {
|
||||
ids.push(node.entry.id)
|
||||
}
|
||||
if (node.children.length > 0) {
|
||||
ids.push(...collectExpandableNodeIds(node.children))
|
||||
}
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
/**
|
||||
* Groups console entries by execution ID and builds a tree structure.
|
||||
* Pre-computes timestamps for efficient sorting.
|
||||
@@ -458,7 +584,7 @@ export function flattenBlockEntriesOnly(
|
||||
): NavigableBlockEntry[] {
|
||||
const result: NavigableBlockEntry[] = []
|
||||
for (const node of nodes) {
|
||||
if (node.nodeType === 'block') {
|
||||
if (node.nodeType === 'block' || node.nodeType === 'workflow') {
|
||||
result.push({
|
||||
entry: node.entry,
|
||||
executionId,
|
||||
|
||||
@@ -20,6 +20,7 @@ import {
|
||||
TriggerUtils,
|
||||
} from '@/lib/workflows/triggers/triggers'
|
||||
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
||||
import { updateActiveBlockRefCount } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
|
||||
import { getBlock } from '@/blocks'
|
||||
import type { SerializableExecutionState } from '@/executor/execution/types'
|
||||
import type {
|
||||
@@ -63,6 +64,7 @@ interface BlockEventHandlerConfig {
|
||||
executionIdRef: { current: string }
|
||||
workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }>
|
||||
activeBlocksSet: Set<string>
|
||||
activeBlockRefCounts: Map<string, number>
|
||||
accumulatedBlockLogs: BlockLog[]
|
||||
accumulatedBlockStates: Map<string, BlockState>
|
||||
executedBlockIds: Set<string>
|
||||
@@ -309,6 +311,7 @@ export function useWorkflowExecution() {
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
@@ -327,11 +330,7 @@ export function useWorkflowExecution() {
|
||||
|
||||
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
|
||||
if (!workflowId) return
|
||||
if (isActive) {
|
||||
activeBlocksSet.add(blockId)
|
||||
} else {
|
||||
activeBlocksSet.delete(blockId)
|
||||
}
|
||||
updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive)
|
||||
setActiveBlocks(workflowId, new Set(activeBlocksSet))
|
||||
}
|
||||
|
||||
@@ -384,6 +383,9 @@ export function useWorkflowExecution() {
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
childWorkflowBlockId: data.childWorkflowBlockId,
|
||||
childWorkflowName: data.childWorkflowName,
|
||||
childWorkflowInstanceId: data.childWorkflowInstanceId,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -407,6 +409,9 @@ export function useWorkflowExecution() {
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
childWorkflowBlockId: data.childWorkflowBlockId,
|
||||
childWorkflowName: data.childWorkflowName,
|
||||
childWorkflowInstanceId: data.childWorkflowInstanceId,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -426,6 +431,9 @@ export function useWorkflowExecution() {
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
childWorkflowBlockId: data.childWorkflowBlockId,
|
||||
childWorkflowName: data.childWorkflowName,
|
||||
childWorkflowInstanceId: data.childWorkflowInstanceId,
|
||||
},
|
||||
executionIdRef.current
|
||||
)
|
||||
@@ -448,6 +456,9 @@ export function useWorkflowExecution() {
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
childWorkflowBlockId: data.childWorkflowBlockId,
|
||||
childWorkflowName: data.childWorkflowName,
|
||||
childWorkflowInstanceId: data.childWorkflowInstanceId,
|
||||
},
|
||||
executionIdRef.current
|
||||
)
|
||||
@@ -479,6 +490,8 @@ export function useWorkflowExecution() {
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
childWorkflowBlockId: data.childWorkflowBlockId,
|
||||
childWorkflowName: data.childWorkflowName,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -536,7 +549,27 @@ export function useWorkflowExecution() {
|
||||
}
|
||||
}
|
||||
|
||||
return { onBlockStarted, onBlockCompleted, onBlockError }
|
||||
const onBlockChildWorkflowStarted = (data: {
|
||||
blockId: string
|
||||
childWorkflowInstanceId: string
|
||||
iterationCurrent?: number
|
||||
iterationContainerId?: string
|
||||
}) => {
|
||||
if (isStaleExecution()) return
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
childWorkflowInstanceId: data.childWorkflowInstanceId,
|
||||
...(data.iterationCurrent !== undefined && { iterationCurrent: data.iterationCurrent }),
|
||||
...(data.iterationContainerId !== undefined && {
|
||||
iterationContainerId: data.iterationContainerId,
|
||||
}),
|
||||
},
|
||||
executionIdRef.current
|
||||
)
|
||||
}
|
||||
|
||||
return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted }
|
||||
},
|
||||
[addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole]
|
||||
)
|
||||
@@ -1280,6 +1313,7 @@ export function useWorkflowExecution() {
|
||||
}
|
||||
|
||||
const activeBlocksSet = new Set<string>()
|
||||
const activeBlockRefCounts = new Map<string, number>()
|
||||
const streamedContent = new Map<string, string>()
|
||||
const accumulatedBlockLogs: BlockLog[] = []
|
||||
const accumulatedBlockStates = new Map<string, BlockState>()
|
||||
@@ -1292,6 +1326,7 @@ export function useWorkflowExecution() {
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
@@ -1334,6 +1369,7 @@ export function useWorkflowExecution() {
|
||||
onBlockStarted: blockHandlers.onBlockStarted,
|
||||
onBlockCompleted: blockHandlers.onBlockCompleted,
|
||||
onBlockError: blockHandlers.onBlockError,
|
||||
onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted,
|
||||
|
||||
onStreamChunk: (data) => {
|
||||
const existing = streamedContent.get(data.blockId) || ''
|
||||
@@ -1902,6 +1938,7 @@ export function useWorkflowExecution() {
|
||||
const accumulatedBlockStates = new Map<string, BlockState>()
|
||||
const executedBlockIds = new Set<string>()
|
||||
const activeBlocksSet = new Set<string>()
|
||||
const activeBlockRefCounts = new Map<string, number>()
|
||||
|
||||
try {
|
||||
const blockHandlers = buildBlockEventHandlers({
|
||||
@@ -1909,6 +1946,7 @@ export function useWorkflowExecution() {
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
@@ -1929,6 +1967,7 @@ export function useWorkflowExecution() {
|
||||
onBlockStarted: blockHandlers.onBlockStarted,
|
||||
onBlockCompleted: blockHandlers.onBlockCompleted,
|
||||
onBlockError: blockHandlers.onBlockError,
|
||||
onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted,
|
||||
|
||||
onExecutionCompleted: (data) => {
|
||||
if (data.success) {
|
||||
@@ -2104,6 +2143,7 @@ export function useWorkflowExecution() {
|
||||
|
||||
const workflowEdges = useWorkflowStore.getState().edges
|
||||
const activeBlocksSet = new Set<string>()
|
||||
const activeBlockRefCounts = new Map<string, number>()
|
||||
const accumulatedBlockLogs: BlockLog[] = []
|
||||
const accumulatedBlockStates = new Map<string, BlockState>()
|
||||
const executedBlockIds = new Set<string>()
|
||||
@@ -2115,6 +2155,7 @@ export function useWorkflowExecution() {
|
||||
executionIdRef,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
activeBlockRefCounts,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
@@ -2155,6 +2196,10 @@ export function useWorkflowExecution() {
|
||||
clearOnce()
|
||||
handlers.onBlockError(data)
|
||||
},
|
||||
onBlockChildWorkflowStarted: (data) => {
|
||||
clearOnce()
|
||||
handlers.onBlockChildWorkflowStarted(data)
|
||||
},
|
||||
onExecutionCompleted: () => {
|
||||
const currentId = useExecutionStore
|
||||
.getState()
|
||||
|
||||
@@ -5,6 +5,30 @@ import { useTerminalConsoleStore } from '@/stores/terminal'
|
||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
|
||||
|
||||
/**
|
||||
* Updates the active blocks set and ref counts for a single block.
|
||||
* Ref counting ensures a block stays active until all parallel branches for it complete.
|
||||
*/
|
||||
export function updateActiveBlockRefCount(
|
||||
refCounts: Map<string, number>,
|
||||
activeSet: Set<string>,
|
||||
blockId: string,
|
||||
isActive: boolean
|
||||
): void {
|
||||
if (isActive) {
|
||||
refCounts.set(blockId, (refCounts.get(blockId) ?? 0) + 1)
|
||||
activeSet.add(blockId)
|
||||
} else {
|
||||
const next = (refCounts.get(blockId) ?? 1) - 1
|
||||
if (next <= 0) {
|
||||
refCounts.delete(blockId)
|
||||
activeSet.delete(blockId)
|
||||
} else {
|
||||
refCounts.set(blockId, next)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export interface WorkflowExecutionOptions {
|
||||
workflowInput?: any
|
||||
onStream?: (se: StreamingExecution) => Promise<void>
|
||||
@@ -39,6 +63,7 @@ export async function executeWorkflowWithFullLogging(
|
||||
const workflowEdges = useWorkflowStore.getState().edges
|
||||
|
||||
const activeBlocksSet = new Set<string>()
|
||||
const activeBlockRefCounts = new Map<string, number>()
|
||||
|
||||
const payload: any = {
|
||||
input: options.workflowInput,
|
||||
@@ -103,7 +128,12 @@ export async function executeWorkflowWithFullLogging(
|
||||
|
||||
switch (event.type) {
|
||||
case 'block:started': {
|
||||
activeBlocksSet.add(event.data.blockId)
|
||||
updateActiveBlockRefCount(
|
||||
activeBlockRefCounts,
|
||||
activeBlocksSet,
|
||||
event.data.blockId,
|
||||
true
|
||||
)
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
const incomingEdges = workflowEdges.filter(
|
||||
@@ -115,8 +145,13 @@ export async function executeWorkflowWithFullLogging(
|
||||
break
|
||||
}
|
||||
|
||||
case 'block:completed':
|
||||
activeBlocksSet.delete(event.data.blockId)
|
||||
case 'block:completed': {
|
||||
updateActiveBlockRefCount(
|
||||
activeBlockRefCounts,
|
||||
activeBlocksSet,
|
||||
event.data.blockId,
|
||||
false
|
||||
)
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(wfId, event.data.blockId, 'success')
|
||||
@@ -138,15 +173,24 @@ export async function executeWorkflowWithFullLogging(
|
||||
iterationTotal: event.data.iterationTotal,
|
||||
iterationType: event.data.iterationType,
|
||||
iterationContainerId: event.data.iterationContainerId,
|
||||
childWorkflowBlockId: event.data.childWorkflowBlockId,
|
||||
childWorkflowName: event.data.childWorkflowName,
|
||||
childWorkflowInstanceId: event.data.childWorkflowInstanceId,
|
||||
})
|
||||
|
||||
if (options.onBlockComplete) {
|
||||
options.onBlockComplete(event.data.blockId, event.data.output).catch(() => {})
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
case 'block:error':
|
||||
activeBlocksSet.delete(event.data.blockId)
|
||||
case 'block:error': {
|
||||
updateActiveBlockRefCount(
|
||||
activeBlockRefCounts,
|
||||
activeBlocksSet,
|
||||
event.data.blockId,
|
||||
false
|
||||
)
|
||||
setActiveBlocks(wfId, new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(wfId, event.data.blockId, 'error')
|
||||
@@ -169,8 +213,30 @@ export async function executeWorkflowWithFullLogging(
|
||||
iterationTotal: event.data.iterationTotal,
|
||||
iterationType: event.data.iterationType,
|
||||
iterationContainerId: event.data.iterationContainerId,
|
||||
childWorkflowBlockId: event.data.childWorkflowBlockId,
|
||||
childWorkflowName: event.data.childWorkflowName,
|
||||
childWorkflowInstanceId: event.data.childWorkflowInstanceId,
|
||||
})
|
||||
break
|
||||
}
|
||||
|
||||
case 'block:childWorkflowStarted': {
|
||||
const { updateConsole } = useTerminalConsoleStore.getState()
|
||||
updateConsole(
|
||||
event.data.blockId,
|
||||
{
|
||||
childWorkflowInstanceId: event.data.childWorkflowInstanceId,
|
||||
...(event.data.iterationCurrent !== undefined && {
|
||||
iterationCurrent: event.data.iterationCurrent,
|
||||
}),
|
||||
...(event.data.iterationContainerId !== undefined && {
|
||||
iterationContainerId: event.data.iterationContainerId,
|
||||
}),
|
||||
},
|
||||
executionId
|
||||
)
|
||||
break
|
||||
}
|
||||
|
||||
case 'execution:completed':
|
||||
executionResult = {
|
||||
|
||||
@@ -159,6 +159,7 @@ export const DEFAULTS = {
|
||||
MAX_FOREACH_ITEMS: 1000,
|
||||
MAX_PARALLEL_BRANCHES: 20,
|
||||
MAX_WORKFLOW_DEPTH: 10,
|
||||
MAX_SSE_CHILD_DEPTH: 3,
|
||||
EXECUTION_TIME: 0,
|
||||
TOKENS: {
|
||||
PROMPT: 0,
|
||||
|
||||
@@ -7,6 +7,7 @@ interface ChildWorkflowErrorOptions {
|
||||
childTraceSpans?: TraceSpan[]
|
||||
executionResult?: ExecutionResult
|
||||
childWorkflowSnapshotId?: string
|
||||
childWorkflowInstanceId?: string
|
||||
cause?: Error
|
||||
}
|
||||
|
||||
@@ -18,6 +19,8 @@ export class ChildWorkflowError extends Error {
|
||||
readonly childWorkflowName: string
|
||||
readonly executionResult?: ExecutionResult
|
||||
readonly childWorkflowSnapshotId?: string
|
||||
/** Per-invocation unique ID used to correlate child block events with this workflow block. */
|
||||
readonly childWorkflowInstanceId?: string
|
||||
|
||||
constructor(options: ChildWorkflowErrorOptions) {
|
||||
super(options.message, { cause: options.cause })
|
||||
@@ -26,6 +29,7 @@ export class ChildWorkflowError extends Error {
|
||||
this.childTraceSpans = options.childTraceSpans ?? []
|
||||
this.executionResult = options.executionResult
|
||||
this.childWorkflowSnapshotId = options.childWorkflowSnapshotId
|
||||
this.childWorkflowInstanceId = options.childWorkflowInstanceId
|
||||
}
|
||||
|
||||
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
|
||||
|
||||
@@ -166,6 +166,9 @@ export class BlockExecutor {
|
||||
this.state.setBlockOutput(node.id, normalizedOutput, duration)
|
||||
|
||||
if (!isSentinel && blockLog) {
|
||||
const childWorkflowInstanceId = normalizedOutput._childWorkflowInstanceId as
|
||||
| string
|
||||
| undefined
|
||||
const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, {
|
||||
block,
|
||||
})
|
||||
@@ -178,7 +181,8 @@ export class BlockExecutor {
|
||||
duration,
|
||||
blockLog.startedAt,
|
||||
blockLog.executionOrder,
|
||||
blockLog.endedAt
|
||||
blockLog.endedAt,
|
||||
childWorkflowInstanceId
|
||||
)
|
||||
}
|
||||
|
||||
@@ -204,6 +208,8 @@ export class BlockExecutor {
|
||||
parallelId?: string
|
||||
branchIndex?: number
|
||||
branchTotal?: number
|
||||
originalBlockId?: string
|
||||
isLoopNode?: boolean
|
||||
} {
|
||||
const metadata = node?.metadata ?? {}
|
||||
return {
|
||||
@@ -212,6 +218,8 @@ export class BlockExecutor {
|
||||
parallelId: metadata.parallelId,
|
||||
branchIndex: metadata.branchIndex,
|
||||
branchTotal: metadata.branchTotal,
|
||||
originalBlockId: metadata.originalBlockId,
|
||||
isLoopNode: metadata.isLoopNode,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -276,6 +284,9 @@ export class BlockExecutor {
|
||||
)
|
||||
|
||||
if (!isSentinel && blockLog) {
|
||||
const childWorkflowInstanceId = ChildWorkflowError.isChildWorkflowError(error)
|
||||
? error.childWorkflowInstanceId
|
||||
: undefined
|
||||
const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
|
||||
this.callOnBlockComplete(
|
||||
ctx,
|
||||
@@ -286,7 +297,8 @@ export class BlockExecutor {
|
||||
duration,
|
||||
blockLog.startedAt,
|
||||
blockLog.executionOrder,
|
||||
blockLog.endedAt
|
||||
blockLog.endedAt,
|
||||
childWorkflowInstanceId
|
||||
)
|
||||
}
|
||||
|
||||
@@ -428,7 +440,7 @@ export class BlockExecutor {
|
||||
block: SerializedBlock,
|
||||
executionOrder: number
|
||||
): void {
|
||||
const blockId = node.id
|
||||
const blockId = node.metadata?.originalBlockId ?? node.id
|
||||
const blockName = block.metadata?.name ?? blockId
|
||||
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
|
||||
|
||||
@@ -440,7 +452,8 @@ export class BlockExecutor {
|
||||
blockName,
|
||||
blockType,
|
||||
executionOrder,
|
||||
iterationContext
|
||||
iterationContext,
|
||||
ctx.childWorkflowContext
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -454,9 +467,10 @@ export class BlockExecutor {
|
||||
duration: number,
|
||||
startedAt: string,
|
||||
executionOrder: number,
|
||||
endedAt: string
|
||||
endedAt: string,
|
||||
childWorkflowInstanceId?: string
|
||||
): void {
|
||||
const blockId = node.id
|
||||
const blockId = node.metadata?.originalBlockId ?? node.id
|
||||
const blockName = block.metadata?.name ?? blockId
|
||||
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
|
||||
|
||||
@@ -474,8 +488,10 @@ export class BlockExecutor {
|
||||
startedAt,
|
||||
executionOrder,
|
||||
endedAt,
|
||||
childWorkflowInstanceId,
|
||||
},
|
||||
iterationContext
|
||||
iterationContext,
|
||||
ctx.childWorkflowContext
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -322,7 +322,9 @@ export class DAGExecutor {
|
||||
onStream: this.contextExtensions.onStream,
|
||||
onBlockStart: this.contextExtensions.onBlockStart,
|
||||
onBlockComplete: this.contextExtensions.onBlockComplete,
|
||||
onChildWorkflowInstanceReady: this.contextExtensions.onChildWorkflowInstanceReady,
|
||||
abortSignal: this.contextExtensions.abortSignal,
|
||||
childWorkflowContext: this.contextExtensions.childWorkflowContext,
|
||||
includeFileBase64: this.contextExtensions.includeFileBase64,
|
||||
base64MaxBytes: this.contextExtensions.base64MaxBytes,
|
||||
runFromBlockContext: overrides?.runFromBlockContext,
|
||||
|
||||
@@ -54,6 +54,17 @@ export interface IterationContext {
|
||||
iterationContainerId?: string
|
||||
}
|
||||
|
||||
export interface ChildWorkflowContext {
|
||||
/** The workflow block's ID in the parent execution */
|
||||
parentBlockId: string
|
||||
/** Display name of the child workflow */
|
||||
workflowName: string
|
||||
/** Child workflow ID */
|
||||
workflowId: string
|
||||
/** Nesting depth (1 = first level child) */
|
||||
depth: number
|
||||
}
|
||||
|
||||
export interface ExecutionCallbacks {
|
||||
onStream?: (streamingExec: any) => Promise<void>
|
||||
onBlockStart?: (
|
||||
@@ -61,15 +72,23 @@ export interface ExecutionCallbacks {
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
executionOrder: number,
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => Promise<void>
|
||||
onBlockComplete?: (
|
||||
blockId: string,
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
output: any,
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => Promise<void>
|
||||
/** Fires immediately after instanceId is generated, before child execution begins. */
|
||||
onChildWorkflowInstanceReady?: (
|
||||
blockId: string,
|
||||
childWorkflowInstanceId: string,
|
||||
iterationContext?: IterationContext
|
||||
) => void
|
||||
}
|
||||
|
||||
export interface ContextExtensions {
|
||||
@@ -105,7 +124,8 @@ export interface ContextExtensions {
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
executionOrder: number,
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => Promise<void>
|
||||
onBlockComplete?: (
|
||||
blockId: string,
|
||||
@@ -118,10 +138,23 @@ export interface ContextExtensions {
|
||||
startedAt: string
|
||||
executionOrder: number
|
||||
endedAt: string
|
||||
/** Per-invocation unique ID linking this workflow block execution to its child block events. */
|
||||
childWorkflowInstanceId?: string
|
||||
},
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => Promise<void>
|
||||
|
||||
/** Context identifying this execution as a child of a workflow block */
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
|
||||
/** Fires immediately after instanceId is generated, before child execution begins. */
|
||||
onChildWorkflowInstanceReady?: (
|
||||
blockId: string,
|
||||
childWorkflowInstanceId: string,
|
||||
iterationContext?: IterationContext
|
||||
) => void
|
||||
|
||||
/**
|
||||
* Run-from-block configuration. When provided, executor runs in partial
|
||||
* execution mode starting from the specified block.
|
||||
|
||||
@@ -6,6 +6,7 @@ import type { BlockOutput } from '@/blocks/types'
|
||||
import { Executor } from '@/executor'
|
||||
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
|
||||
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
|
||||
import type { IterationContext } from '@/executor/execution/types'
|
||||
import type {
|
||||
BlockHandler,
|
||||
ExecutionContext,
|
||||
@@ -44,6 +45,40 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
ctx: ExecutionContext,
|
||||
block: SerializedBlock,
|
||||
inputs: Record<string, any>
|
||||
): Promise<BlockOutput | StreamingExecution> {
|
||||
return this._executeCore(ctx, block, inputs)
|
||||
}
|
||||
|
||||
async executeWithNode(
|
||||
ctx: ExecutionContext,
|
||||
block: SerializedBlock,
|
||||
inputs: Record<string, any>,
|
||||
nodeMetadata: {
|
||||
nodeId: string
|
||||
loopId?: string
|
||||
parallelId?: string
|
||||
branchIndex?: number
|
||||
branchTotal?: number
|
||||
originalBlockId?: string
|
||||
isLoopNode?: boolean
|
||||
}
|
||||
): Promise<BlockOutput | StreamingExecution> {
|
||||
return this._executeCore(ctx, block, inputs, nodeMetadata)
|
||||
}
|
||||
|
||||
private async _executeCore(
|
||||
ctx: ExecutionContext,
|
||||
block: SerializedBlock,
|
||||
inputs: Record<string, any>,
|
||||
nodeMetadata?: {
|
||||
nodeId: string
|
||||
loopId?: string
|
||||
parallelId?: string
|
||||
branchIndex?: number
|
||||
branchTotal?: number
|
||||
originalBlockId?: string
|
||||
isLoopNode?: boolean
|
||||
}
|
||||
): Promise<BlockOutput | StreamingExecution> {
|
||||
logger.info(`Executing workflow block: ${block.id}`)
|
||||
|
||||
@@ -58,6 +93,10 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
const workflowMetadata = workflows[workflowId]
|
||||
let childWorkflowName = workflowMetadata?.name || workflowId
|
||||
|
||||
// Unique ID per invocation — used to correlate child block events with this specific
|
||||
// workflow block execution, preventing cross-iteration child mixing in loop contexts.
|
||||
const instanceId = crypto.randomUUID()
|
||||
|
||||
let childWorkflowSnapshotId: string | undefined
|
||||
try {
|
||||
const currentDepth = (ctx.workflowId?.split('_sub_').length || 1) - 1
|
||||
@@ -115,6 +154,19 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
)
|
||||
childWorkflowSnapshotId = childSnapshotResult.snapshot.id
|
||||
|
||||
const childDepth = (ctx.childWorkflowContext?.depth ?? 0) + 1
|
||||
const shouldPropagateCallbacks = childDepth <= DEFAULTS.MAX_SSE_CHILD_DEPTH
|
||||
|
||||
if (shouldPropagateCallbacks) {
|
||||
const effectiveBlockId = nodeMetadata
|
||||
? (nodeMetadata.originalBlockId ?? nodeMetadata.nodeId)
|
||||
: block.id
|
||||
const iterationContext = nodeMetadata
|
||||
? this.getIterationContext(ctx, nodeMetadata)
|
||||
: undefined
|
||||
ctx.onChildWorkflowInstanceReady?.(effectiveBlockId, instanceId, iterationContext)
|
||||
}
|
||||
|
||||
const subExecutor = new Executor({
|
||||
workflow: childWorkflow.serializedState,
|
||||
workflowInput: childWorkflowInput,
|
||||
@@ -127,6 +179,18 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
userId: ctx.userId,
|
||||
executionId: ctx.executionId,
|
||||
abortSignal: ctx.abortSignal,
|
||||
...(shouldPropagateCallbacks && {
|
||||
onBlockStart: ctx.onBlockStart,
|
||||
onBlockComplete: ctx.onBlockComplete,
|
||||
onStream: ctx.onStream as ((streamingExecution: unknown) => Promise<void>) | undefined,
|
||||
onChildWorkflowInstanceReady: ctx.onChildWorkflowInstanceReady,
|
||||
childWorkflowContext: {
|
||||
parentBlockId: instanceId,
|
||||
workflowName: childWorkflowName,
|
||||
workflowId,
|
||||
depth: childDepth,
|
||||
},
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
@@ -148,6 +212,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
workflowId,
|
||||
childWorkflowName,
|
||||
duration,
|
||||
instanceId,
|
||||
childTraceSpans,
|
||||
childWorkflowSnapshotId
|
||||
)
|
||||
@@ -183,11 +248,46 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
childTraceSpans,
|
||||
executionResult,
|
||||
childWorkflowSnapshotId,
|
||||
childWorkflowInstanceId: instanceId,
|
||||
cause: error instanceof Error ? error : undefined,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
private getIterationContext(
|
||||
ctx: ExecutionContext,
|
||||
nodeMetadata: {
|
||||
loopId?: string
|
||||
parallelId?: string
|
||||
branchIndex?: number
|
||||
branchTotal?: number
|
||||
isLoopNode?: boolean
|
||||
}
|
||||
): IterationContext | undefined {
|
||||
if (nodeMetadata.branchIndex !== undefined && nodeMetadata.branchTotal !== undefined) {
|
||||
return {
|
||||
iterationCurrent: nodeMetadata.branchIndex,
|
||||
iterationTotal: nodeMetadata.branchTotal,
|
||||
iterationType: 'parallel',
|
||||
iterationContainerId: nodeMetadata.parallelId,
|
||||
}
|
||||
}
|
||||
|
||||
if (nodeMetadata.isLoopNode && nodeMetadata.loopId) {
|
||||
const loopScope = ctx.loopExecutions?.get(nodeMetadata.loopId)
|
||||
if (loopScope && loopScope.iteration !== undefined) {
|
||||
return {
|
||||
iterationCurrent: loopScope.iteration,
|
||||
iterationTotal: loopScope.maxIterations,
|
||||
iterationType: 'loop',
|
||||
iterationContainerId: nodeMetadata.loopId,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return undefined
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a cleaner error message for nested workflow errors.
|
||||
* Parses nested error messages to extract workflow chain and root error.
|
||||
@@ -525,6 +625,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
childWorkflowId: string,
|
||||
childWorkflowName: string,
|
||||
duration: number,
|
||||
instanceId: string,
|
||||
childTraceSpans?: WorkflowTraceSpan[],
|
||||
childWorkflowSnapshotId?: string
|
||||
): BlockOutput {
|
||||
@@ -538,6 +639,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
childWorkflowName,
|
||||
childTraceSpans: childTraceSpans || [],
|
||||
childWorkflowSnapshotId,
|
||||
childWorkflowInstanceId: instanceId,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -548,6 +650,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
...(childWorkflowSnapshotId ? { childWorkflowSnapshotId } : {}),
|
||||
result,
|
||||
childTraceSpans: childTraceSpans || [],
|
||||
_childWorkflowInstanceId: instanceId,
|
||||
} as Record<string, any>
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
import type { TraceSpan } from '@/lib/logs/types'
|
||||
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
|
||||
import type { BlockOutput } from '@/blocks/types'
|
||||
import type { SerializableExecutionState } from '@/executor/execution/types'
|
||||
import type {
|
||||
ChildWorkflowContext,
|
||||
IterationContext,
|
||||
SerializableExecutionState,
|
||||
} from '@/executor/execution/types'
|
||||
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
|
||||
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
|
||||
|
||||
@@ -239,15 +243,29 @@ export interface ExecutionContext {
|
||||
blockId: string,
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
executionOrder: number
|
||||
executionOrder: number,
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => Promise<void>
|
||||
onBlockComplete?: (
|
||||
blockId: string,
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
output: any
|
||||
output: any,
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => Promise<void>
|
||||
|
||||
/** Context identifying this execution as a child of a workflow block */
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
|
||||
/** Fires immediately after instanceId is generated, before child execution begins. */
|
||||
onChildWorkflowInstanceReady?: (
|
||||
blockId: string,
|
||||
childWorkflowInstanceId: string,
|
||||
iterationContext?: IterationContext
|
||||
) => void
|
||||
|
||||
/**
|
||||
* AbortSignal for cancellation support.
|
||||
* When the signal is aborted, execution should stop gracefully.
|
||||
@@ -350,6 +368,8 @@ export interface BlockHandler {
|
||||
parallelId?: string
|
||||
branchIndex?: number
|
||||
branchTotal?: number
|
||||
originalBlockId?: string
|
||||
isLoopNode?: boolean
|
||||
}
|
||||
) => Promise<BlockOutput | StreamingExecution>
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { useCallback } from 'react'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import type {
|
||||
BlockChildWorkflowStartedData,
|
||||
BlockCompletedData,
|
||||
BlockErrorData,
|
||||
BlockStartedData,
|
||||
@@ -83,6 +84,9 @@ async function processSSEStream(
|
||||
case 'block:error':
|
||||
callbacks.onBlockError?.(event.data)
|
||||
break
|
||||
case 'block:childWorkflowStarted':
|
||||
callbacks.onBlockChildWorkflowStarted?.(event.data)
|
||||
break
|
||||
case 'stream:chunk':
|
||||
callbacks.onStreamChunk?.(event.data)
|
||||
break
|
||||
@@ -110,6 +114,7 @@ export interface ExecutionStreamCallbacks {
|
||||
onBlockStarted?: (data: BlockStartedData) => void
|
||||
onBlockCompleted?: (data: BlockCompletedData) => void
|
||||
onBlockError?: (data: BlockErrorData) => void
|
||||
onBlockChildWorkflowStarted?: (data: BlockChildWorkflowStartedData) => void
|
||||
onStreamChunk?: (data: StreamChunkData) => void
|
||||
onStreamDone?: (data: StreamDoneData) => void
|
||||
}
|
||||
|
||||
@@ -64,10 +64,31 @@ export async function validateUrlWithDNS(
|
||||
const parsedUrl = new URL(url!)
|
||||
const hostname = parsedUrl.hostname
|
||||
|
||||
try {
|
||||
const { address } = await dns.lookup(hostname)
|
||||
const hostnameLower = hostname.toLowerCase()
|
||||
const cleanHostname =
|
||||
hostnameLower.startsWith('[') && hostnameLower.endsWith(']')
|
||||
? hostnameLower.slice(1, -1)
|
||||
: hostnameLower
|
||||
|
||||
if (isPrivateOrReservedIP(address)) {
|
||||
let isLocalhost = cleanHostname === 'localhost'
|
||||
if (ipaddr.isValid(cleanHostname)) {
|
||||
const processedIP = ipaddr.process(cleanHostname).toString()
|
||||
if (processedIP === '127.0.0.1' || processedIP === '::1') {
|
||||
isLocalhost = true
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const { address } = await dns.lookup(cleanHostname, { verbatim: true })
|
||||
|
||||
const resolvedIsLoopback =
|
||||
ipaddr.isValid(address) &&
|
||||
(() => {
|
||||
const ip = ipaddr.process(address).toString()
|
||||
return ip === '127.0.0.1' || ip === '::1'
|
||||
})()
|
||||
|
||||
if (isPrivateOrReservedIP(address) && !(isLocalhost && resolvedIsLoopback)) {
|
||||
logger.warn('URL resolves to blocked IP address', {
|
||||
paramName,
|
||||
hostname,
|
||||
@@ -189,8 +210,6 @@ export async function secureFetchWithPinnedIP(
|
||||
|
||||
const agent = isHttps ? new https.Agent(agentOptions) : new http.Agent(agentOptions)
|
||||
|
||||
// Remove accept-encoding since Node.js http/https doesn't auto-decompress
|
||||
// Headers are lowercase due to Web Headers API normalization in executeToolRequest
|
||||
const { 'accept-encoding': _, ...sanitizedHeaders } = options.headers ?? {}
|
||||
|
||||
const requestOptions: http.RequestOptions = {
|
||||
@@ -200,7 +219,7 @@ export async function secureFetchWithPinnedIP(
|
||||
method: options.method || 'GET',
|
||||
headers: sanitizedHeaders,
|
||||
agent,
|
||||
timeout: options.timeout || 300000, // Default 5 minutes
|
||||
timeout: options.timeout || 300000,
|
||||
}
|
||||
|
||||
const protocol = isHttps ? https : http
|
||||
|
||||
@@ -569,10 +569,28 @@ describe('validateUrlWithDNS', () => {
|
||||
expect(result.error).toContain('https://')
|
||||
})
|
||||
|
||||
it('should reject localhost URLs', async () => {
|
||||
it('should accept https localhost URLs', async () => {
|
||||
const result = await validateUrlWithDNS('https://localhost/api')
|
||||
expect(result.isValid).toBe(false)
|
||||
expect(result.error).toContain('localhost')
|
||||
expect(result.isValid).toBe(true)
|
||||
expect(result.resolvedIP).toBeDefined()
|
||||
})
|
||||
|
||||
it('should accept http localhost URLs', async () => {
|
||||
const result = await validateUrlWithDNS('http://localhost/api')
|
||||
expect(result.isValid).toBe(true)
|
||||
expect(result.resolvedIP).toBeDefined()
|
||||
})
|
||||
|
||||
it('should accept IPv4 loopback URLs', async () => {
|
||||
const result = await validateUrlWithDNS('http://127.0.0.1/api')
|
||||
expect(result.isValid).toBe(true)
|
||||
expect(result.resolvedIP).toBeDefined()
|
||||
})
|
||||
|
||||
it('should accept IPv6 loopback URLs', async () => {
|
||||
const result = await validateUrlWithDNS('http://[::1]/api')
|
||||
expect(result.isValid).toBe(true)
|
||||
expect(result.resolvedIP).toBeDefined()
|
||||
})
|
||||
|
||||
it('should reject private IP URLs', async () => {
|
||||
@@ -898,17 +916,37 @@ describe('validateExternalUrl', () => {
|
||||
expect(result.isValid).toBe(false)
|
||||
expect(result.error).toContain('valid URL')
|
||||
})
|
||||
})
|
||||
|
||||
it.concurrent('should reject localhost', () => {
|
||||
describe('localhost and loopback addresses', () => {
|
||||
it.concurrent('should accept https localhost', () => {
|
||||
const result = validateExternalUrl('https://localhost/api')
|
||||
expect(result.isValid).toBe(false)
|
||||
expect(result.error).toContain('localhost')
|
||||
expect(result.isValid).toBe(true)
|
||||
})
|
||||
|
||||
it.concurrent('should reject 127.0.0.1', () => {
|
||||
it.concurrent('should accept http localhost', () => {
|
||||
const result = validateExternalUrl('http://localhost/api')
|
||||
expect(result.isValid).toBe(true)
|
||||
})
|
||||
|
||||
it.concurrent('should accept https 127.0.0.1', () => {
|
||||
const result = validateExternalUrl('https://127.0.0.1/api')
|
||||
expect(result.isValid).toBe(false)
|
||||
expect(result.error).toContain('private IP')
|
||||
expect(result.isValid).toBe(true)
|
||||
})
|
||||
|
||||
it.concurrent('should accept http 127.0.0.1', () => {
|
||||
const result = validateExternalUrl('http://127.0.0.1/api')
|
||||
expect(result.isValid).toBe(true)
|
||||
})
|
||||
|
||||
it.concurrent('should accept https IPv6 loopback', () => {
|
||||
const result = validateExternalUrl('https://[::1]/api')
|
||||
expect(result.isValid).toBe(true)
|
||||
})
|
||||
|
||||
it.concurrent('should accept http IPv6 loopback', () => {
|
||||
const result = validateExternalUrl('http://[::1]/api')
|
||||
expect(result.isValid).toBe(true)
|
||||
})
|
||||
|
||||
it.concurrent('should reject 0.0.0.0', () => {
|
||||
@@ -989,9 +1027,9 @@ describe('validateImageUrl', () => {
|
||||
expect(result.isValid).toBe(true)
|
||||
})
|
||||
|
||||
it.concurrent('should reject localhost URLs', () => {
|
||||
it.concurrent('should accept localhost URLs', () => {
|
||||
const result = validateImageUrl('https://localhost/image.png')
|
||||
expect(result.isValid).toBe(false)
|
||||
expect(result.isValid).toBe(true)
|
||||
})
|
||||
|
||||
it.concurrent('should use imageUrl as default param name', () => {
|
||||
|
||||
@@ -89,9 +89,9 @@ export function validatePathSegment(
|
||||
const pathTraversalPatterns = [
|
||||
'..',
|
||||
'./',
|
||||
'.\\.', // Windows path traversal
|
||||
'%2e%2e', // URL encoded ..
|
||||
'%252e%252e', // Double URL encoded ..
|
||||
'.\\.',
|
||||
'%2e%2e',
|
||||
'%252e%252e',
|
||||
'..%2f',
|
||||
'..%5c',
|
||||
'%2e%2e%2f',
|
||||
@@ -391,7 +391,6 @@ export function validateHostname(
|
||||
|
||||
const lowerHostname = hostname.toLowerCase()
|
||||
|
||||
// Block localhost
|
||||
if (lowerHostname === 'localhost') {
|
||||
logger.warn('Hostname is localhost', { paramName })
|
||||
return {
|
||||
@@ -400,7 +399,6 @@ export function validateHostname(
|
||||
}
|
||||
}
|
||||
|
||||
// Use ipaddr.js to check if hostname is an IP and if it's private/reserved
|
||||
if (ipaddr.isValid(lowerHostname)) {
|
||||
if (isPrivateOrReservedIP(lowerHostname)) {
|
||||
logger.warn('Hostname matches blocked IP range', {
|
||||
@@ -414,7 +412,6 @@ export function validateHostname(
|
||||
}
|
||||
}
|
||||
|
||||
// Basic hostname format validation
|
||||
const hostnamePattern =
|
||||
/^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?(\.[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?)*$/i
|
||||
|
||||
@@ -460,10 +457,7 @@ export function validateFileExtension(
|
||||
}
|
||||
}
|
||||
|
||||
// Remove leading dot if present
|
||||
const ext = extension.startsWith('.') ? extension.slice(1) : extension
|
||||
|
||||
// Normalize to lowercase
|
||||
const normalizedExt = ext.toLowerCase()
|
||||
|
||||
if (!allowedExtensions.map((e) => e.toLowerCase()).includes(normalizedExt)) {
|
||||
@@ -515,7 +509,6 @@ export function validateMicrosoftGraphId(
|
||||
}
|
||||
}
|
||||
|
||||
// Check for path traversal patterns (../)
|
||||
const pathTraversalPatterns = [
|
||||
'../',
|
||||
'..\\',
|
||||
@@ -525,7 +518,7 @@ export function validateMicrosoftGraphId(
|
||||
'%2e%2e%5c',
|
||||
'%2e%2e\\',
|
||||
'..%5c',
|
||||
'%252e%252e%252f', // double encoded
|
||||
'%252e%252e%252f',
|
||||
]
|
||||
|
||||
const lowerValue = value.toLowerCase()
|
||||
@@ -542,7 +535,6 @@ export function validateMicrosoftGraphId(
|
||||
}
|
||||
}
|
||||
|
||||
// Check for control characters and null bytes
|
||||
if (/[\x00-\x1f\x7f]/.test(value) || value.includes('%00')) {
|
||||
logger.warn('Control characters in Microsoft Graph ID', { paramName })
|
||||
return {
|
||||
@@ -551,7 +543,6 @@ export function validateMicrosoftGraphId(
|
||||
}
|
||||
}
|
||||
|
||||
// Check for newlines (which could be used for header injection)
|
||||
if (value.includes('\n') || value.includes('\r')) {
|
||||
return {
|
||||
isValid: false,
|
||||
@@ -559,8 +550,6 @@ export function validateMicrosoftGraphId(
|
||||
}
|
||||
}
|
||||
|
||||
// Microsoft Graph IDs can contain many characters, but not suspicious patterns
|
||||
// We've blocked path traversal, so allow the rest
|
||||
return { isValid: true, sanitized: value }
|
||||
}
|
||||
|
||||
@@ -583,7 +572,6 @@ export function validateJiraCloudId(
|
||||
value: string | null | undefined,
|
||||
paramName = 'cloudId'
|
||||
): ValidationResult {
|
||||
// Jira cloud IDs are alphanumeric with hyphens (UUID-like)
|
||||
return validatePathSegment(value, {
|
||||
paramName,
|
||||
allowHyphens: true,
|
||||
@@ -612,7 +600,6 @@ export function validateJiraIssueKey(
|
||||
value: string | null | undefined,
|
||||
paramName = 'issueKey'
|
||||
): ValidationResult {
|
||||
// Jira issue keys: letters, numbers, hyphens (PROJECT-123 format)
|
||||
return validatePathSegment(value, {
|
||||
paramName,
|
||||
allowHyphens: true,
|
||||
@@ -653,7 +640,6 @@ export function validateExternalUrl(
|
||||
}
|
||||
}
|
||||
|
||||
// Must be a valid URL
|
||||
let parsedUrl: URL
|
||||
try {
|
||||
parsedUrl = new URL(url)
|
||||
@@ -664,28 +650,29 @@ export function validateExternalUrl(
|
||||
}
|
||||
}
|
||||
|
||||
// Only allow https protocol
|
||||
if (parsedUrl.protocol !== 'https:') {
|
||||
const protocol = parsedUrl.protocol
|
||||
const hostname = parsedUrl.hostname.toLowerCase()
|
||||
|
||||
const cleanHostname =
|
||||
hostname.startsWith('[') && hostname.endsWith(']') ? hostname.slice(1, -1) : hostname
|
||||
|
||||
let isLocalhost = cleanHostname === 'localhost'
|
||||
if (ipaddr.isValid(cleanHostname)) {
|
||||
const processedIP = ipaddr.process(cleanHostname).toString()
|
||||
if (processedIP === '127.0.0.1' || processedIP === '::1') {
|
||||
isLocalhost = true
|
||||
}
|
||||
}
|
||||
|
||||
if (protocol !== 'https:' && !(protocol === 'http:' && isLocalhost)) {
|
||||
return {
|
||||
isValid: false,
|
||||
error: `${paramName} must use https:// protocol`,
|
||||
}
|
||||
}
|
||||
|
||||
// Block private IP ranges and localhost
|
||||
const hostname = parsedUrl.hostname.toLowerCase()
|
||||
|
||||
// Block localhost
|
||||
if (hostname === 'localhost') {
|
||||
return {
|
||||
isValid: false,
|
||||
error: `${paramName} cannot point to localhost`,
|
||||
}
|
||||
}
|
||||
|
||||
// Use ipaddr.js to check if hostname is an IP and if it's private/reserved
|
||||
if (ipaddr.isValid(hostname)) {
|
||||
if (isPrivateOrReservedIP(hostname)) {
|
||||
if (!isLocalhost && ipaddr.isValid(cleanHostname)) {
|
||||
if (isPrivateOrReservedIP(cleanHostname)) {
|
||||
return {
|
||||
isValid: false,
|
||||
error: `${paramName} cannot point to private IP addresses`,
|
||||
|
||||
@@ -619,7 +619,6 @@ function cleanupWorker(workerId: number) {
|
||||
workerInfo.activeExecutions = 0
|
||||
|
||||
workers.delete(workerId)
|
||||
logger.info('Worker removed from pool', { workerId, poolSize: workers.size })
|
||||
}
|
||||
|
||||
function resetWorkerIdleTimeout(workerId: number) {
|
||||
@@ -635,7 +634,6 @@ function resetWorkerIdleTimeout(workerId: number) {
|
||||
workerInfo.idleTimeout = setTimeout(() => {
|
||||
const w = workers.get(workerId)
|
||||
if (w && w.activeExecutions === 0) {
|
||||
logger.info('Cleaning up idle worker', { workerId })
|
||||
cleanupWorker(workerId)
|
||||
}
|
||||
}, WORKER_IDLE_TIMEOUT_MS)
|
||||
|
||||
@@ -20,6 +20,7 @@ import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
|
||||
import { Executor } from '@/executor'
|
||||
import type { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type {
|
||||
ChildWorkflowContext,
|
||||
ContextExtensions,
|
||||
ExecutionCallbacks,
|
||||
IterationContext,
|
||||
@@ -128,7 +129,7 @@ export async function executeWorkflowCore(
|
||||
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
|
||||
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
|
||||
metadata
|
||||
const { onBlockStart, onBlockComplete, onStream } = callbacks
|
||||
const { onBlockStart, onBlockComplete, onStream, onChildWorkflowInstanceReady } = callbacks
|
||||
|
||||
const providedWorkspaceId = metadata.workspaceId
|
||||
if (!providedWorkspaceId) {
|
||||
@@ -287,11 +288,19 @@ export async function executeWorkflowCore(
|
||||
startedAt: string
|
||||
endedAt: string
|
||||
},
|
||||
iterationContext?: IterationContext
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => {
|
||||
await loggingSession.onBlockComplete(blockId, blockName, blockType, output)
|
||||
if (onBlockComplete) {
|
||||
await onBlockComplete(blockId, blockName, blockType, output, iterationContext)
|
||||
await onBlockComplete(
|
||||
blockId,
|
||||
blockName,
|
||||
blockType,
|
||||
output,
|
||||
iterationContext,
|
||||
childWorkflowContext
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,6 +329,7 @@ export async function executeWorkflowCore(
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
stopAfterBlockId: resolvedStopAfterBlockId,
|
||||
onChildWorkflowInstanceReady,
|
||||
}
|
||||
|
||||
const executorInstance = new Executor({
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import type { ChildWorkflowContext, IterationContext } from '@/executor/execution/types'
|
||||
import type { SubflowType } from '@/stores/workflows/workflow/types'
|
||||
|
||||
export type ExecutionEventType =
|
||||
@@ -8,6 +9,7 @@ export type ExecutionEventType =
|
||||
| 'block:started'
|
||||
| 'block:completed'
|
||||
| 'block:error'
|
||||
| 'block:childWorkflowStarted'
|
||||
| 'stream:chunk'
|
||||
| 'stream:done'
|
||||
|
||||
@@ -81,6 +83,8 @@ export interface BlockStartedEvent extends BaseExecutionEvent {
|
||||
iterationTotal?: number
|
||||
iterationType?: SubflowType
|
||||
iterationContainerId?: string
|
||||
childWorkflowBlockId?: string
|
||||
childWorkflowName?: string
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,6 +108,10 @@ export interface BlockCompletedEvent extends BaseExecutionEvent {
|
||||
iterationTotal?: number
|
||||
iterationType?: SubflowType
|
||||
iterationContainerId?: string
|
||||
childWorkflowBlockId?: string
|
||||
childWorkflowName?: string
|
||||
/** Per-invocation unique ID for correlating child block events with this workflow block. */
|
||||
childWorkflowInstanceId?: string
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,6 +135,26 @@ export interface BlockErrorEvent extends BaseExecutionEvent {
|
||||
iterationTotal?: number
|
||||
iterationType?: SubflowType
|
||||
iterationContainerId?: string
|
||||
childWorkflowBlockId?: string
|
||||
childWorkflowName?: string
|
||||
/** Per-invocation unique ID for correlating child block events with this workflow block. */
|
||||
childWorkflowInstanceId?: string
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Block child workflow started event — fires when a workflow block generates its instanceId,
|
||||
* before child execution begins. Allows clients to pre-associate the running entry with
|
||||
* the instanceId so child block events can be correlated in real-time.
|
||||
*/
|
||||
export interface BlockChildWorkflowStartedEvent extends BaseExecutionEvent {
|
||||
type: 'block:childWorkflowStarted'
|
||||
workflowId: string
|
||||
data: {
|
||||
blockId: string
|
||||
childWorkflowInstanceId: string
|
||||
iterationCurrent?: number
|
||||
iterationContainerId?: string
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,6 +192,7 @@ export type ExecutionEvent =
|
||||
| BlockStartedEvent
|
||||
| BlockCompletedEvent
|
||||
| BlockErrorEvent
|
||||
| BlockChildWorkflowStartedEvent
|
||||
| StreamChunkEvent
|
||||
| StreamDoneEvent
|
||||
|
||||
@@ -174,6 +203,7 @@ export type ExecutionCancelledData = ExecutionCancelledEvent['data']
|
||||
export type BlockStartedData = BlockStartedEvent['data']
|
||||
export type BlockCompletedData = BlockCompletedEvent['data']
|
||||
export type BlockErrorData = BlockErrorEvent['data']
|
||||
export type BlockChildWorkflowStartedData = BlockChildWorkflowStartedEvent['data']
|
||||
export type StreamChunkData = StreamChunkEvent['data']
|
||||
export type StreamDoneData = StreamDoneEvent['data']
|
||||
|
||||
@@ -222,12 +252,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
executionOrder: number,
|
||||
iterationContext?: {
|
||||
iterationCurrent: number
|
||||
iterationTotal?: number
|
||||
iterationType: string
|
||||
iterationContainerId?: string
|
||||
}
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => {
|
||||
sendEvent({
|
||||
type: 'block:started',
|
||||
@@ -242,9 +268,13 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
...(iterationContext && {
|
||||
iterationCurrent: iterationContext.iterationCurrent,
|
||||
iterationTotal: iterationContext.iterationTotal,
|
||||
iterationType: iterationContext.iterationType as any,
|
||||
iterationType: iterationContext.iterationType,
|
||||
iterationContainerId: iterationContext.iterationContainerId,
|
||||
}),
|
||||
...(childWorkflowContext && {
|
||||
childWorkflowBlockId: childWorkflowContext.parentBlockId,
|
||||
childWorkflowName: childWorkflowContext.workflowName,
|
||||
}),
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -260,23 +290,30 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
startedAt: string
|
||||
executionOrder: number
|
||||
endedAt: string
|
||||
childWorkflowInstanceId?: string
|
||||
},
|
||||
iterationContext?: {
|
||||
iterationCurrent: number
|
||||
iterationTotal?: number
|
||||
iterationType: string
|
||||
iterationContainerId?: string
|
||||
}
|
||||
iterationContext?: IterationContext,
|
||||
childWorkflowContext?: ChildWorkflowContext
|
||||
) => {
|
||||
const hasError = callbackData.output?.error
|
||||
const iterationData = iterationContext
|
||||
? {
|
||||
iterationCurrent: iterationContext.iterationCurrent,
|
||||
iterationTotal: iterationContext.iterationTotal,
|
||||
iterationType: iterationContext.iterationType as any,
|
||||
iterationType: iterationContext.iterationType,
|
||||
iterationContainerId: iterationContext.iterationContainerId,
|
||||
}
|
||||
: {}
|
||||
const childWorkflowData = childWorkflowContext
|
||||
? {
|
||||
childWorkflowBlockId: childWorkflowContext.parentBlockId,
|
||||
childWorkflowName: childWorkflowContext.workflowName,
|
||||
}
|
||||
: {}
|
||||
|
||||
const instanceData = callbackData.childWorkflowInstanceId
|
||||
? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId }
|
||||
: {}
|
||||
|
||||
if (hasError) {
|
||||
sendEvent({
|
||||
@@ -295,6 +332,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
executionOrder: callbackData.executionOrder,
|
||||
endedAt: callbackData.endedAt,
|
||||
...iterationData,
|
||||
...childWorkflowData,
|
||||
...instanceData,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
@@ -314,6 +353,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
executionOrder: callbackData.executionOrder,
|
||||
endedAt: callbackData.endedAt,
|
||||
...iterationData,
|
||||
...childWorkflowData,
|
||||
...instanceData,
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -352,5 +393,26 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
}
|
||||
}
|
||||
|
||||
return { sendEvent, onBlockStart, onBlockComplete, onStream }
|
||||
const onChildWorkflowInstanceReady = (
|
||||
blockId: string,
|
||||
childWorkflowInstanceId: string,
|
||||
iterationContext?: IterationContext
|
||||
) => {
|
||||
sendEvent({
|
||||
type: 'block:childWorkflowStarted',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
blockId,
|
||||
childWorkflowInstanceId,
|
||||
...(iterationContext && {
|
||||
iterationCurrent: iterationContext.iterationCurrent,
|
||||
iterationContainerId: iterationContext.iterationContainerId,
|
||||
}),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return { sendEvent, onBlockStart, onBlockComplete, onStream, onChildWorkflowInstanceReady }
|
||||
}
|
||||
|
||||
@@ -420,6 +420,18 @@ export const useTerminalConsoleStore = create<ConsoleStore>()(
|
||||
updatedEntry.iterationContainerId = update.iterationContainerId
|
||||
}
|
||||
|
||||
if (update.childWorkflowBlockId !== undefined) {
|
||||
updatedEntry.childWorkflowBlockId = update.childWorkflowBlockId
|
||||
}
|
||||
|
||||
if (update.childWorkflowName !== undefined) {
|
||||
updatedEntry.childWorkflowName = update.childWorkflowName
|
||||
}
|
||||
|
||||
if (update.childWorkflowInstanceId !== undefined) {
|
||||
updatedEntry.childWorkflowInstanceId = update.childWorkflowInstanceId
|
||||
}
|
||||
|
||||
return updatedEntry
|
||||
})
|
||||
|
||||
|
||||
@@ -24,6 +24,12 @@ export interface ConsoleEntry {
|
||||
iterationContainerId?: string
|
||||
isRunning?: boolean
|
||||
isCanceled?: boolean
|
||||
/** ID of the workflow block in the parent execution that spawned this child block */
|
||||
childWorkflowBlockId?: string
|
||||
/** Display name of the child workflow this block belongs to */
|
||||
childWorkflowName?: string
|
||||
/** Per-invocation unique ID linking this workflow block to its child block events */
|
||||
childWorkflowInstanceId?: string
|
||||
}
|
||||
|
||||
export interface ConsoleUpdate {
|
||||
@@ -44,6 +50,9 @@ export interface ConsoleUpdate {
|
||||
iterationTotal?: number
|
||||
iterationType?: SubflowType
|
||||
iterationContainerId?: string
|
||||
childWorkflowBlockId?: string
|
||||
childWorkflowName?: string
|
||||
childWorkflowInstanceId?: string
|
||||
}
|
||||
|
||||
export interface ConsoleStore {
|
||||
|
||||
Reference in New Issue
Block a user