Compare commits

..

31 Commits

Author SHA1 Message Date
Siddharth Ganesan
0d6b485d40 Fix 2026-01-27 20:08:27 -08:00
Siddharth Ganesan
a9f271cdb0 I think it works?? 2026-01-27 20:04:53 -08:00
Siddharth Ganesan
0ead5aa04e Fix 2026-01-27 18:35:33 -08:00
Siddharth Ganesan
28fbd0c086 Fix 2026-01-27 18:25:12 -08:00
Siddharth Ganesan
994a664172 Catch error 2026-01-27 18:22:16 -08:00
Siddharth Ganesan
79857e1a04 Fix bugs 2026-01-27 18:16:46 -08:00
Siddharth Ganesan
07dfedd5f1 Fix test 2026-01-27 18:07:36 -08:00
Siddharth Ganesan
c68cda63ae Cleanup 2026-01-27 17:53:37 -08:00
Siddharth Ganesan
4996eea2ee Fix 2026-01-27 17:38:41 -08:00
Siddharth Ganesan
d9631424dc Fix loops and parallels 2026-01-27 17:08:39 -08:00
Vikhyath Mondreti
08bea875e5 Merge branch 'staging' into feat/run-from-block-2 2026-01-27 17:01:31 -08:00
Siddharth Ganesan
c201a7ca91 Fix trigger clear snapshot 2026-01-27 16:53:13 -08:00
Siddharth Ganesan
d80608cdd5 Fix 2026-01-27 16:20:41 -08:00
Siddharth Ganesan
6f66d33e62 Fix mock payload 2026-01-27 14:59:34 -08:00
Siddharth Ganesan
2f504ce07e Fix 2026-01-27 14:55:07 -08:00
Siddharth Ganesan
f55f6cc453 Fix lint 2026-01-27 14:08:32 -08:00
Siddharth Ganesan
c14c614e33 Consolidation 2026-01-27 12:55:27 -08:00
Siddharth Ganesan
415acda403 Allow run from block for triggers 2026-01-27 12:50:16 -08:00
Siddharth Ganesan
8dc45e6e7e Fix 2026-01-27 12:32:18 -08:00
Siddharth Ganesan
7a0aaa460d Clean up 2026-01-27 12:30:46 -08:00
Siddharth Ganesan
2c333bfd98 Lint 2026-01-27 12:25:27 -08:00
Siddharth Ganesan
23ab11a40d Run u ntil block 2026-01-27 12:13:09 -08:00
Siddharth Ganesan
6e541949ec Change ordering 2026-01-27 11:37:36 -08:00
Siddharth Ganesan
3231955a07 Fix loop l ogs 2026-01-27 11:36:09 -08:00
Siddharth Ganesan
d38fb29e05 Fix trace spans 2026-01-27 11:21:42 -08:00
Siddharth Ganesan
5c1e620831 Fix 2026-01-27 11:03:34 -08:00
Siddharth Ganesan
72594df766 Minor improvements 2026-01-27 11:03:13 -08:00
Siddharth Ganesan
be95a7dbd8 Fix 2026-01-27 10:33:31 -08:00
Siddharth Ganesan
da5d4ac9d5 Fix 2026-01-26 17:16:35 -08:00
Siddharth Ganesan
e8534bea7a Fixes 2026-01-26 16:40:14 -08:00
Siddharth Ganesan
3d0b810a8e Run from block 2026-01-26 16:19:41 -08:00
54 changed files with 2505 additions and 602 deletions

View File

@@ -55,21 +55,21 @@ export const {serviceName}{Action}Tool: ToolConfig<
},
params: {
// Hidden params (system-injected, only use hidden for oauth accessToken)
// Hidden params (system-injected)
accessToken: {
type: 'string',
required: true,
visibility: 'hidden',
description: 'OAuth access token',
},
// User-only params (credentials, api key, IDs user must provide)
// User-only params (credentials, IDs user must provide)
someId: {
type: 'string',
required: true,
visibility: 'user-only',
description: 'The ID of the resource',
},
// User-or-LLM params (everything else, can be provided by user OR computed by LLM)
// User-or-LLM params (can be provided by user OR computed by LLM)
query: {
type: 'string',
required: false, // Use false for optional
@@ -114,8 +114,8 @@ export const {serviceName}{Action}Tool: ToolConfig<
### Visibility Options
- `'hidden'` - System-injected (OAuth tokens, internal params). User never sees.
- `'user-only'` - User must provide (credentials, api keys, account-specific IDs)
- `'user-or-llm'` - User provides OR LLM can compute (search queries, content, filters, most fall into this category)
- `'user-only'` - User must provide (credentials, account-specific IDs)
- `'user-or-llm'` - User provides OR LLM can compute (search queries, content, filters)
### Parameter Types
- `'string'` - Text values

View File

@@ -35,7 +35,8 @@ const AutoLayoutRequestSchema = z.object({
})
.optional()
.default({}),
gridSize: z.number().min(0).max(50).optional(),
// Optional: if provided, use these blocks instead of loading from DB
// This allows using blocks with live measurements from the UI
blocks: z.record(z.any()).optional(),
edges: z.array(z.any()).optional(),
loops: z.record(z.any()).optional(),
@@ -52,6 +53,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
const { id: workflowId } = await params
try {
// Get the session
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized autolayout attempt for workflow ${workflowId}`)
@@ -60,6 +62,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
const userId = session.user.id
// Parse request body
const body = await request.json()
const layoutOptions = AutoLayoutRequestSchema.parse(body)
@@ -67,6 +70,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
userId,
})
// Fetch the workflow to check ownership/access
const accessContext = await getWorkflowAccessContext(workflowId, userId)
const workflowData = accessContext?.workflow
@@ -75,6 +79,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
// Check if user has permission to update this workflow
const canUpdate =
accessContext?.isOwner ||
(workflowData.workspaceId
@@ -89,6 +94,8 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
// Use provided blocks/edges if available (with live measurements from UI),
// otherwise load from database
let currentWorkflowData: NormalizedWorkflowData | null
if (layoutOptions.blocks && layoutOptions.edges) {
@@ -118,7 +125,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
y: layoutOptions.padding?.y ?? DEFAULT_LAYOUT_PADDING.y,
},
alignment: layoutOptions.alignment,
gridSize: layoutOptions.gridSize,
}
const layoutResult = applyAutoLayout(

View File

@@ -0,0 +1,216 @@
import { db, workflow as workflowTable } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { createSSECallbacks } from '@/lib/workflows/executor/execution-events'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import { hasExecutionResult } from '@/executor/utils/errors'
const logger = createLogger('ExecuteFromBlockAPI')
const ExecuteFromBlockSchema = z.object({
startBlockId: z.string().min(1, 'Start block ID is required'),
sourceSnapshot: z.object({
blockStates: z.record(z.any()),
executedBlocks: z.array(z.string()),
blockLogs: z.array(z.any()),
decisions: z.object({
router: z.record(z.string()),
condition: z.record(z.string()),
}),
completedLoops: z.array(z.string()),
loopExecutions: z.record(z.any()).optional(),
parallelExecutions: z.record(z.any()).optional(),
parallelBlockMapping: z.record(z.any()).optional(),
activeExecutionPath: z.array(z.string()),
}),
input: z.any().optional(),
})
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const { id: workflowId } = await params
try {
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}
const userId = auth.userId
let body: unknown
try {
body = await req.json()
} catch {
return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 })
}
const validation = ExecuteFromBlockSchema.safeParse(body)
if (!validation.success) {
logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors)
return NextResponse.json(
{
error: 'Invalid request body',
details: validation.error.errors.map((e) => ({
path: e.path.join('.'),
message: e.message,
})),
},
{ status: 400 }
)
}
const { startBlockId, sourceSnapshot, input } = validation.data
const executionId = uuidv4()
const [workflowRecord] = await db
.select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId })
.from(workflowTable)
.where(eq(workflowTable.id, workflowId))
.limit(1)
if (!workflowRecord?.workspaceId) {
return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
}
const workspaceId = workflowRecord.workspaceId
const workflowUserId = workflowRecord.userId
logger.info(`[${requestId}] Starting run-from-block execution`, {
workflowId,
startBlockId,
executedBlocksCount: sourceSnapshot.executedBlocks.length,
})
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
const abortController = new AbortController()
let isStreamClosed = false
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const { sendEvent, onBlockStart, onBlockComplete, onStream } = createSSECallbacks({
executionId,
workflowId,
controller,
isStreamClosed: () => isStreamClosed,
setStreamClosed: () => {
isStreamClosed = true
},
})
const metadata: ExecutionMetadata = {
requestId,
workflowId,
userId,
executionId,
triggerType: 'manual',
workspaceId,
workflowUserId,
useDraftState: true,
isClientSession: true,
startTime: new Date().toISOString(),
}
const snapshot = new ExecutionSnapshot(metadata, {}, input || {}, {})
try {
const startTime = new Date()
sendEvent({
type: 'execution:started',
timestamp: startTime.toISOString(),
executionId,
workflowId,
data: { startTime: startTime.toISOString() },
})
const result = await executeWorkflowCore({
snapshot,
loggingSession,
abortSignal: abortController.signal,
runFromBlock: {
startBlockId,
sourceSnapshot: sourceSnapshot as SerializableExecutionState,
},
callbacks: { onBlockStart, onBlockComplete, onStream },
})
if (result.status === 'cancelled') {
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { duration: result.metadata?.duration || 0 },
})
} else {
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: executionResult?.error || errorMessage,
duration: executionResult?.metadata?.duration || 0,
},
})
} finally {
if (!isStreamClosed) {
try {
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
controller.close()
} catch {}
}
}
},
cancel() {
isStreamClosed = true
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})
},
})
return new NextResponse(stream, {
headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId },
})
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Failed to start run-from-block execution:`, error)
return NextResponse.json(
{ error: errorMessage || 'Failed to start execution' },
{ status: 500 }
)
}
}

View File

@@ -53,6 +53,7 @@ const ExecuteWorkflowSchema = z.object({
parallels: z.record(z.any()).optional(),
})
.optional(),
stopAfterBlockId: z.string().optional(),
})
export const runtime = 'nodejs'
@@ -222,6 +223,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
workflowStateOverride,
stopAfterBlockId,
} = validation.data
// For API key and internal JWT auth, the entire body is the input (except for our control fields)
@@ -237,6 +239,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
workflowStateOverride,
stopAfterBlockId: _stopAfterBlockId,
workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
...rest
} = body
@@ -434,6 +437,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
loggingSession,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
})
const outputWithBase64 = includeFileBase64
@@ -722,6 +726,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
abortSignal: abortController.signal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
})
if (result.status === 'paused') {

View File

@@ -0,0 +1,108 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { generateRequestId } from '@/lib/core/utils/request'
import { applyAutoLayout } from '@/lib/workflows/autolayout'
import {
DEFAULT_HORIZONTAL_SPACING,
DEFAULT_LAYOUT_PADDING,
DEFAULT_VERTICAL_SPACING,
} from '@/lib/workflows/autolayout/constants'
const logger = createLogger('YamlAutoLayoutAPI')
const AutoLayoutRequestSchema = z.object({
workflowState: z.object({
blocks: z.record(z.any()),
edges: z.array(z.any()),
loops: z.record(z.any()).optional().default({}),
parallels: z.record(z.any()).optional().default({}),
}),
options: z
.object({
spacing: z
.object({
horizontal: z.number().optional(),
vertical: z.number().optional(),
})
.optional(),
alignment: z.enum(['start', 'center', 'end']).optional(),
padding: z
.object({
x: z.number().optional(),
y: z.number().optional(),
})
.optional(),
})
.optional(),
})
export async function POST(request: NextRequest) {
const requestId = generateRequestId()
try {
const body = await request.json()
const { workflowState, options } = AutoLayoutRequestSchema.parse(body)
logger.info(`[${requestId}] Applying auto layout`, {
blockCount: Object.keys(workflowState.blocks).length,
edgeCount: workflowState.edges.length,
})
const autoLayoutOptions = {
horizontalSpacing: options?.spacing?.horizontal ?? DEFAULT_HORIZONTAL_SPACING,
verticalSpacing: options?.spacing?.vertical ?? DEFAULT_VERTICAL_SPACING,
padding: {
x: options?.padding?.x ?? DEFAULT_LAYOUT_PADDING.x,
y: options?.padding?.y ?? DEFAULT_LAYOUT_PADDING.y,
},
alignment: options?.alignment ?? 'center',
}
const layoutResult = applyAutoLayout(
workflowState.blocks,
workflowState.edges,
autoLayoutOptions
)
if (!layoutResult.success || !layoutResult.blocks) {
logger.error(`[${requestId}] Auto layout failed:`, {
error: layoutResult.error,
})
return NextResponse.json(
{
success: false,
errors: [layoutResult.error || 'Unknown auto layout error'],
},
{ status: 500 }
)
}
logger.info(`[${requestId}] Auto layout completed successfully:`, {
success: true,
blockCount: Object.keys(layoutResult.blocks).length,
})
const transformedResponse = {
success: true,
workflowState: {
blocks: layoutResult.blocks,
edges: workflowState.edges,
loops: workflowState.loops || {},
parallels: workflowState.parallels || {},
},
}
return NextResponse.json(transformedResponse)
} catch (error) {
logger.error(`[${requestId}] Auto layout failed:`, error)
return NextResponse.json(
{
success: false,
errors: [error instanceof Error ? error.message : 'Unknown auto layout error'],
},
{ status: 500 }
)
}
}

View File

@@ -1,11 +1,13 @@
import { memo, useCallback } from 'react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react'
import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn'
import { cn } from '@/lib/core/utils/cn'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
@@ -49,6 +51,7 @@ export const ActionBar = memo(
collaborativeBatchToggleBlockHandles,
} = useCollaborativeWorkflow()
const { setPendingSelection } = useWorkflowRegistry()
const { handleRunFromBlock } = useWorkflowExecution()
const addNotification = useNotificationStore((s) => s.addNotification)
@@ -97,12 +100,39 @@ export const ActionBar = memo(
)
)
const { activeWorkflowId } = useWorkflowRegistry()
const { isExecuting, getLastExecutionSnapshot } = useExecutionStore()
const userPermissions = useUserPermissionsContext()
const edges = useWorkflowStore((state) => state.edges)
const isStartBlock = isInputDefinitionTrigger(blockType)
const isResponseBlock = blockType === 'response'
const isNoteBlock = blockType === 'note'
const isSubflowBlock = blockType === 'loop' || blockType === 'parallel'
const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel')
const snapshot = activeWorkflowId ? getLastExecutionSnapshot(activeWorkflowId) : null
const incomingEdges = edges.filter((edge) => edge.target === blockId)
const isTriggerBlock = incomingEdges.length === 0
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
const isSourceSatisfied = (sourceId: string) => {
if (snapshot?.executedBlocks.includes(sourceId)) return true
// Check if source is a trigger (has no incoming edges itself)
const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId)
return sourceIncomingEdges.length === 0
}
// Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
const dependenciesSatisfied =
isTriggerBlock || (snapshot && incomingEdges.every((edge) => isSourceSatisfied(edge.source)))
const canRunFromBlock =
dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting
const handleRunFromBlockClick = useCallback(() => {
if (!activeWorkflowId || !canRunFromBlock) return
handleRunFromBlock(blockId, activeWorkflowId)
}, [blockId, activeWorkflowId, canRunFromBlock, handleRunFromBlock])
/**
* Get appropriate tooltip message based on disabled state
@@ -128,30 +158,35 @@ export const ActionBar = memo(
'dark:border-transparent dark:bg-[var(--surface-4)]'
)}
>
{!isNoteBlock && (
{!isNoteBlock && !isInsideSubflow && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={(e) => {
e.stopPropagation()
if (!disabled) {
collaborativeBatchToggleBlockEnabled([blockId])
if (canRunFromBlock && !disabled) {
handleRunFromBlockClick()
}
}}
className={ACTION_BUTTON_STYLES}
disabled={disabled}
disabled={disabled || !canRunFromBlock}
>
{isEnabled ? <Circle className={ICON_SIZE} /> : <CircleOff className={ICON_SIZE} />}
<Play className={ICON_SIZE} />
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>
{getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')}
{(() => {
if (disabled) return getTooltipMessage('Run from block')
if (isExecuting) return 'Execution in progress'
if (!dependenciesSatisfied) return 'Run upstream blocks first'
return 'Run from block'
})()}
</Tooltip.Content>
</Tooltip.Root>
)}
{isSubflowBlock && (
{!isNoteBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button

View File

@@ -40,9 +40,16 @@ export interface BlockMenuProps {
onRemoveFromSubflow: () => void
onOpenEditor: () => void
onRename: () => void
onRunFromBlock?: () => void
onRunUntilBlock?: () => void
hasClipboard?: boolean
showRemoveFromSubflow?: boolean
/** Whether run from block is available (has snapshot, was executed, not inside subflow) */
canRunFromBlock?: boolean
disableEdit?: boolean
isExecuting?: boolean
/** Whether the selected block is a trigger (has no incoming edges) */
isPositionalTrigger?: boolean
}
/**
@@ -65,9 +72,14 @@ export function BlockMenu({
onRemoveFromSubflow,
onOpenEditor,
onRename,
onRunFromBlock,
onRunUntilBlock,
hasClipboard = false,
showRemoveFromSubflow = false,
canRunFromBlock = false,
disableEdit = false,
isExecuting = false,
isPositionalTrigger = false,
}: BlockMenuProps) {
const isSingleBlock = selectedBlocks.length === 1
@@ -78,10 +90,15 @@ export function BlockMenu({
(b) =>
TriggerUtils.requiresSingleInstance(b.type) || TriggerUtils.isSingleInstanceBlockType(b.type)
)
const hasTriggerBlock = selectedBlocks.some((b) => TriggerUtils.isTriggerBlock(b))
// A block is a trigger if it's explicitly a trigger type OR has no incoming edges (positional trigger)
const hasTriggerBlock =
selectedBlocks.some((b) => TriggerUtils.isTriggerBlock(b)) || isPositionalTrigger
const allNoteBlocks = selectedBlocks.every((b) => b.type === 'note')
const isSubflow =
isSingleBlock && (selectedBlocks[0]?.type === 'loop' || selectedBlocks[0]?.type === 'parallel')
const isInsideSubflow =
isSingleBlock &&
(selectedBlocks[0]?.parentType === 'loop' || selectedBlocks[0]?.parentType === 'parallel')
const canRemoveFromSubflow = showRemoveFromSubflow && !hasTriggerBlock
@@ -203,6 +220,38 @@ export function BlockMenu({
</PopoverItem>
)}
{/* Run from/until block - only for single non-note block, not inside subflows */}
{isSingleBlock && !allNoteBlocks && !isInsideSubflow && (
<>
<PopoverDivider />
<PopoverItem
disabled={!canRunFromBlock || isExecuting}
onClick={() => {
if (canRunFromBlock && !isExecuting) {
onRunFromBlock?.()
onClose()
}
}}
>
Run from block
</PopoverItem>
{/* Hide "Run until" for triggers - they're always at the start */}
{!hasTriggerBlock && (
<PopoverItem
disabled={isExecuting}
onClick={() => {
if (!isExecuting) {
onRunUntilBlock?.()
onClose()
}
}}
>
Run until block
</PopoverItem>
)}
</>
)}
{/* Destructive action */}
<PopoverDivider />
<PopoverItem

View File

@@ -3,7 +3,6 @@ import { createLogger } from '@sim/logger'
import { useReactFlow } from 'reactflow'
import type { AutoLayoutOptions } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/auto-layout-utils'
import { applyAutoLayoutAndUpdateStore as applyAutoLayoutStandalone } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/auto-layout-utils'
import { useSnapToGridSize } from '@/hooks/queries/general-settings'
import { useCanvasViewport } from '@/hooks/use-canvas-viewport'
export type { AutoLayoutOptions }
@@ -14,28 +13,21 @@ const logger = createLogger('useAutoLayout')
* Hook providing auto-layout functionality for workflows.
* Binds workflowId context and provides memoized callback for React components.
* Includes automatic fitView animation after successful layout.
* Automatically uses the user's snap-to-grid setting for grid-aligned layout.
*
* Note: This hook requires a ReactFlowProvider ancestor.
*/
export function useAutoLayout(workflowId: string | null) {
const reactFlowInstance = useReactFlow()
const { fitViewToBounds } = useCanvasViewport(reactFlowInstance)
const snapToGridSize = useSnapToGridSize()
const applyAutoLayoutAndUpdateStore = useCallback(
async (options: AutoLayoutOptions = {}) => {
if (!workflowId) {
return { success: false, error: 'No workflow ID provided' }
}
// Include gridSize from user's snap-to-grid setting
const optionsWithGrid: AutoLayoutOptions = {
...options,
gridSize: options.gridSize ?? (snapToGridSize > 0 ? snapToGridSize : undefined),
}
return applyAutoLayoutStandalone(workflowId, optionsWithGrid)
return applyAutoLayoutStandalone(workflowId, options)
},
[workflowId, snapToGridSize]
[workflowId]
)
/**

View File

@@ -15,13 +15,16 @@ import {
TriggerUtils,
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { getBlock } from '@/blocks'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { WorkflowValidationError } from '@/serializer'
import { useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useVariablesStore } from '@/stores/panel'
import { useEnvironmentStore } from '@/stores/settings/environment'
import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal'
@@ -98,11 +101,15 @@ export function useWorkflowExecution() {
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
setLastExecutionSnapshot,
getLastExecutionSnapshot,
clearLastExecutionSnapshot,
} = useExecutionStore()
const [executionResult, setExecutionResult] = useState<ExecutionResult | null>(null)
const executionStream = useExecutionStream()
const currentChatExecutionIdRef = useRef<string | null>(null)
const isViewingDiff = useWorkflowDiffStore((state) => state.isShowingDiff)
const addNotification = useNotificationStore((state) => state.addNotification)
/**
* Validates debug state before performing debug operations
@@ -668,7 +675,8 @@ export function useWorkflowExecution() {
onStream?: (se: StreamingExecution) => Promise<void>,
executionId?: string,
onBlockComplete?: (blockId: string, output: any) => Promise<void>,
overrideTriggerType?: 'chat' | 'manual' | 'api'
overrideTriggerType?: 'chat' | 'manual' | 'api',
stopAfterBlockId?: string
): Promise<ExecutionResult | StreamingExecution> => {
// Use diff workflow for execution when available, regardless of canvas view state
const executionWorkflowState = null as {
@@ -876,6 +884,8 @@ export function useWorkflowExecution() {
const activeBlocksSet = new Set<string>()
const streamedContent = new Map<string, string>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
// Execute the workflow
try {
@@ -887,6 +897,7 @@ export function useWorkflowExecution() {
triggerType: overrideTriggerType || 'manual',
useDraftState: true,
isClientSession: true,
stopAfterBlockId,
workflowStateOverride: executionWorkflowState
? {
blocks: executionWorkflowState.blocks,
@@ -916,18 +927,22 @@ export function useWorkflowExecution() {
logger.info('onBlockCompleted received:', { data })
activeBlocksSet.delete(data.blockId)
// Create a new Set to trigger React re-render
setActiveBlocks(new Set(activeBlocksSet))
// Track successful block execution in run path
setBlockRunStatus(data.blockId, 'success')
// Edges already tracked in onBlockStarted, no need to track again
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
if (isContainerBlock) return
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
// Accumulate block log for the execution result
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
@@ -1056,6 +1071,53 @@ export function useWorkflowExecution() {
},
logs: accumulatedBlockLogs,
}
// Add trigger block to executed blocks so downstream blocks can use run-from-block
if (data.success && startBlockId) {
executedBlockIds.add(startBlockId)
}
if (data.success && activeWorkflowId) {
if (stopAfterBlockId) {
const existingSnapshot = getLastExecutionSnapshot(activeWorkflowId)
const mergedBlockStates = {
...(existingSnapshot?.blockStates || {}),
...Object.fromEntries(accumulatedBlockStates),
}
const mergedExecutedBlocks = new Set([
...(existingSnapshot?.executedBlocks || []),
...executedBlockIds,
])
const snapshot: SerializableExecutionState = {
blockStates: mergedBlockStates,
executedBlocks: Array.from(mergedExecutedBlocks),
blockLogs: [...(existingSnapshot?.blockLogs || []), ...accumulatedBlockLogs],
decisions: existingSnapshot?.decisions || { router: {}, condition: {} },
completedLoops: existingSnapshot?.completedLoops || [],
activeExecutionPath: Array.from(mergedExecutedBlocks),
}
setLastExecutionSnapshot(activeWorkflowId, snapshot)
logger.info('Merged execution snapshot after run-until-block', {
workflowId: activeWorkflowId,
newBlocksExecuted: executedBlockIds.size,
totalExecutedBlocks: mergedExecutedBlocks.size,
})
} else {
const snapshot: SerializableExecutionState = {
blockStates: Object.fromEntries(accumulatedBlockStates),
executedBlocks: Array.from(executedBlockIds),
blockLogs: accumulatedBlockLogs,
decisions: { router: {}, condition: {} },
completedLoops: [],
activeExecutionPath: Array.from(executedBlockIds),
}
setLastExecutionSnapshot(activeWorkflowId, snapshot)
logger.info('Stored execution snapshot for run-from-block', {
workflowId: activeWorkflowId,
executedBlocksCount: executedBlockIds.size,
})
}
}
},
onExecutionError: (data) => {
@@ -1376,6 +1438,330 @@ export function useWorkflowExecution() {
setActiveBlocks,
])
/**
* Handles running workflow from a specific block using cached outputs
*/
const handleRunFromBlock = useCallback(
async (blockId: string, workflowId: string) => {
const snapshot = getLastExecutionSnapshot(workflowId)
const workflowEdges = useWorkflowStore.getState().edges
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
const isTriggerBlock = incomingEdges.length === 0
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
const isSourceSatisfied = (sourceId: string) => {
if (snapshot?.executedBlocks.includes(sourceId)) return true
// Check if source is a trigger (has no incoming edges itself)
const sourceIncomingEdges = workflowEdges.filter((edge) => edge.target === sourceId)
return sourceIncomingEdges.length === 0
}
// Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
if (!snapshot && !isTriggerBlock) {
logger.error('No execution snapshot available for run-from-block', { workflowId, blockId })
return
}
const dependenciesSatisfied =
isTriggerBlock || incomingEdges.every((edge) => isSourceSatisfied(edge.source))
if (!dependenciesSatisfied) {
logger.error('Upstream dependencies not satisfied for run-from-block', {
workflowId,
blockId,
})
return
}
// For trigger blocks, always use empty snapshot to prevent stale data from different
// execution paths from being resolved. For non-trigger blocks, use the existing snapshot.
const emptySnapshot: SerializableExecutionState = {
blockStates: {},
executedBlocks: [],
blockLogs: [],
decisions: { router: {}, condition: {} },
completedLoops: [],
activeExecutionPath: [],
}
const effectiveSnapshot: SerializableExecutionState = isTriggerBlock
? emptySnapshot
: snapshot || emptySnapshot
// Extract mock payload for trigger blocks
let workflowInput: any
if (isTriggerBlock) {
const workflowBlocks = useWorkflowStore.getState().blocks
const mergedStates = mergeSubblockState(workflowBlocks, workflowId)
const candidates = resolveStartCandidates(mergedStates, { execution: 'manual' })
const candidate = candidates.find((c) => c.blockId === blockId)
if (candidate) {
if (triggerNeedsMockPayload(candidate)) {
workflowInput = extractTriggerMockPayload(candidate)
} else if (
candidate.path === StartBlockPath.SPLIT_API ||
candidate.path === StartBlockPath.SPLIT_INPUT ||
candidate.path === StartBlockPath.UNIFIED
) {
const inputFormatValue = candidate.block.subBlocks?.inputFormat?.value
if (Array.isArray(inputFormatValue)) {
const testInput: Record<string, any> = {}
inputFormatValue.forEach((field: any) => {
if (field && typeof field === 'object' && field.name && field.value !== undefined) {
testInput[field.name] = coerceValue(field.type, field.value)
}
})
if (Object.keys(testInput).length > 0) {
workflowInput = testInput
}
}
}
} else {
// Fallback: block is trigger by position but not classified as start candidate
const block = mergedStates[blockId]
if (block) {
const blockConfig = getBlock(block.type)
const hasTriggers = blockConfig?.triggers?.available?.length
if (hasTriggers || block.triggerMode) {
workflowInput = extractTriggerMockPayload({
blockId,
block,
path: StartBlockPath.EXTERNAL_TRIGGER,
})
}
}
}
}
setIsExecuting(true)
const executionId = uuidv4()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
const activeBlocksSet = new Set<string>()
try {
await executionStream.executeFromBlock({
workflowId,
startBlockId: blockId,
sourceSnapshot: effectiveSnapshot,
input: workflowInput,
callbacks: {
onBlockStarted: (data) => {
activeBlocksSet.add(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(edge.id, 'success')
})
},
onBlockCompleted: (data) => {
activeBlocksSet.delete(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setBlockRunStatus(data.blockId, 'success')
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
if (isContainerBlock) return
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
})
addConsole({
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
workflowId,
blockId: data.blockId,
executionId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
})
},
onBlockError: (data) => {
activeBlocksSet.delete(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setBlockRunStatus(data.blockId, 'error')
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
})
addConsole({
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
workflowId,
blockId: data.blockId,
executionId,
blockName: data.blockName,
blockType: data.blockType,
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
})
},
onExecutionCompleted: (data) => {
if (data.success) {
// Add the start block (trigger) to executed blocks
executedBlockIds.add(blockId)
const mergedBlockStates: Record<string, BlockState> = {
...effectiveSnapshot.blockStates,
}
for (const [bId, state] of accumulatedBlockStates) {
mergedBlockStates[bId] = state
}
const mergedExecutedBlocks = new Set([
...effectiveSnapshot.executedBlocks,
...executedBlockIds,
])
const updatedSnapshot: SerializableExecutionState = {
...effectiveSnapshot,
blockStates: mergedBlockStates,
executedBlocks: Array.from(mergedExecutedBlocks),
blockLogs: [...effectiveSnapshot.blockLogs, ...accumulatedBlockLogs],
activeExecutionPath: Array.from(mergedExecutedBlocks),
}
setLastExecutionSnapshot(workflowId, updatedSnapshot)
}
},
onExecutionError: (data) => {
const isWorkflowModified =
data.error?.includes('Block not found in workflow') ||
data.error?.includes('Upstream dependency not executed')
if (isWorkflowModified) {
clearLastExecutionSnapshot(workflowId)
addNotification({
level: 'error',
message:
'Workflow was modified. Run the workflow again to enable running from block.',
workflowId,
})
} else {
addNotification({
level: 'error',
message: data.error || 'Run from block failed',
workflowId,
})
}
},
},
})
} catch (error) {
if ((error as Error).name !== 'AbortError') {
logger.error('Run-from-block failed:', error)
}
} finally {
setIsExecuting(false)
setActiveBlocks(new Set())
}
},
[
getLastExecutionSnapshot,
setLastExecutionSnapshot,
clearLastExecutionSnapshot,
setIsExecuting,
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
addNotification,
addConsole,
executionStream,
]
)
/**
* Handles running workflow until a specific block (stops after that block completes)
*/
const handleRunUntilBlock = useCallback(
async (blockId: string, workflowId: string) => {
if (!workflowId || workflowId !== activeWorkflowId) {
logger.error('Invalid workflow ID for run-until-block', { workflowId, activeWorkflowId })
return
}
logger.info('Starting run-until-block execution', { workflowId, stopAfterBlockId: blockId })
setExecutionResult(null)
setIsExecuting(true)
const executionId = uuidv4()
try {
const result = await executeWorkflow(
undefined,
undefined,
executionId,
undefined,
'manual',
blockId
)
if (result && 'success' in result) {
setExecutionResult(result)
}
} catch (error) {
const errorResult = handleExecutionError(error, { executionId })
return errorResult
} finally {
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
}
},
[activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks]
)
return {
isExecuting,
isDebugging,
@@ -1386,5 +1772,7 @@ export function useWorkflowExecution() {
handleResumeDebug,
handleCancelDebug,
handleCancelExecution,
handleRunFromBlock,
handleRunUntilBlock,
}
}

View File

@@ -21,7 +21,6 @@ export interface AutoLayoutOptions {
x?: number
y?: number
}
gridSize?: number
}
/**
@@ -63,7 +62,6 @@ export async function applyAutoLayoutAndUpdateStore(
x: options.padding?.x ?? DEFAULT_LAYOUT_PADDING.x,
y: options.padding?.y ?? DEFAULT_LAYOUT_PADDING.y,
},
gridSize: options.gridSize,
}
// Call the autolayout API route

View File

@@ -47,6 +47,7 @@ import {
useCurrentWorkflow,
useNodeUtilities,
useShiftSelectionLock,
useWorkflowExecution,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import {
calculateContainerDimensions,
@@ -302,6 +303,8 @@ const WorkflowContent = React.memo(() => {
const showTrainingModal = useCopilotTrainingStore((state) => state.showModal)
const { handleRunFromBlock, handleRunUntilBlock } = useWorkflowExecution()
const snapToGridSize = useSnapToGridSize()
const snapToGrid = snapToGridSize > 0
@@ -733,13 +736,16 @@ const WorkflowContent = React.memo(() => {
[collaborativeBatchAddBlocks, setSelectedEdges, setPendingSelection]
)
const { activeBlockIds, pendingBlocks, isDebugging } = useExecutionStore(
useShallow((state) => ({
activeBlockIds: state.activeBlockIds,
pendingBlocks: state.pendingBlocks,
isDebugging: state.isDebugging,
}))
)
const { activeBlockIds, pendingBlocks, isDebugging, isExecuting, getLastExecutionSnapshot } =
useExecutionStore(
useShallow((state) => ({
activeBlockIds: state.activeBlockIds,
pendingBlocks: state.pendingBlocks,
isDebugging: state.isDebugging,
isExecuting: state.isExecuting,
getLastExecutionSnapshot: state.getLastExecutionSnapshot,
}))
)
const [dragStartParentId, setDragStartParentId] = useState<string | null>(null)
@@ -1102,6 +1108,50 @@ const WorkflowContent = React.memo(() => {
}
}, [contextMenuBlocks])
const handleContextRunFromBlock = useCallback(() => {
if (contextMenuBlocks.length !== 1) return
const blockId = contextMenuBlocks[0].id
handleRunFromBlock(blockId, workflowIdParam)
}, [contextMenuBlocks, workflowIdParam, handleRunFromBlock])
const handleContextRunUntilBlock = useCallback(() => {
if (contextMenuBlocks.length !== 1) return
const blockId = contextMenuBlocks[0].id
handleRunUntilBlock(blockId, workflowIdParam)
}, [contextMenuBlocks, workflowIdParam, handleRunUntilBlock])
const runFromBlockState = useMemo(() => {
if (contextMenuBlocks.length !== 1) {
return { canRun: false, reason: undefined }
}
const block = contextMenuBlocks[0]
const snapshot = getLastExecutionSnapshot(workflowIdParam)
const incomingEdges = edges.filter((edge) => edge.target === block.id)
const isTriggerBlock = incomingEdges.length === 0
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
const isSourceSatisfied = (sourceId: string) => {
if (snapshot?.executedBlocks.includes(sourceId)) return true
// Check if source is a trigger (has no incoming edges itself)
const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId)
return sourceIncomingEdges.length === 0
}
// Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
const dependenciesSatisfied =
isTriggerBlock || (snapshot && incomingEdges.every((edge) => isSourceSatisfied(edge.source)))
const isNoteBlock = block.type === 'note'
const isInsideSubflow =
block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel')
if (isInsideSubflow) return { canRun: false, reason: 'Cannot run from inside subflow' }
if (!dependenciesSatisfied) return { canRun: false, reason: 'Run upstream blocks first' }
if (isNoteBlock) return { canRun: false, reason: undefined }
if (isExecuting) return { canRun: false, reason: undefined }
return { canRun: true, reason: undefined }
}, [contextMenuBlocks, edges, workflowIdParam, getLastExecutionSnapshot, isExecuting])
const handleContextAddBlock = useCallback(() => {
useSearchModalStore.getState().open()
}, [])
@@ -3418,11 +3468,19 @@ const WorkflowContent = React.memo(() => {
onRemoveFromSubflow={handleContextRemoveFromSubflow}
onOpenEditor={handleContextOpenEditor}
onRename={handleContextRename}
onRunFromBlock={handleContextRunFromBlock}
onRunUntilBlock={handleContextRunUntilBlock}
hasClipboard={hasClipboard()}
showRemoveFromSubflow={contextMenuBlocks.some(
(b) => b.parentId && (b.parentType === 'loop' || b.parentType === 'parallel')
)}
canRunFromBlock={runFromBlockState.canRun}
disableEdit={!effectivePermissions.canEdit}
isExecuting={isExecuting}
isPositionalTrigger={
contextMenuBlocks.length === 1 &&
edges.filter((e) => e.target === contextMenuBlocks[0]?.id).length === 0
}
/>
<CanvasMenu

View File

@@ -33,6 +33,15 @@ export interface DAG {
parallelConfigs: Map<string, SerializedParallel>
}
export interface DAGBuildOptions {
/** Trigger block ID to start path construction from */
triggerBlockId?: string
/** Saved incoming edges from snapshot for resumption */
savedIncomingEdges?: Record<string, string[]>
/** Include all enabled blocks instead of only those reachable from trigger */
includeAllBlocks?: boolean
}
export class DAGBuilder {
private pathConstructor = new PathConstructor()
private loopConstructor = new LoopConstructor()
@@ -40,11 +49,9 @@ export class DAGBuilder {
private nodeConstructor = new NodeConstructor()
private edgeConstructor = new EdgeConstructor()
build(
workflow: SerializedWorkflow,
triggerBlockId?: string,
savedIncomingEdges?: Record<string, string[]>
): DAG {
build(workflow: SerializedWorkflow, options: DAGBuildOptions = {}): DAG {
const { triggerBlockId, savedIncomingEdges, includeAllBlocks } = options
const dag: DAG = {
nodes: new Map(),
loopConfigs: new Map(),
@@ -53,7 +60,7 @@ export class DAGBuilder {
this.initializeConfigs(workflow, dag)
const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId)
const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId, includeAllBlocks)
this.loopConstructor.execute(dag, reachableBlocks)
this.parallelConstructor.execute(dag, reachableBlocks)

View File

@@ -6,7 +6,16 @@ import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
const logger = createLogger('PathConstructor')
export class PathConstructor {
execute(workflow: SerializedWorkflow, triggerBlockId?: string): Set<string> {
execute(
workflow: SerializedWorkflow,
triggerBlockId?: string,
includeAllBlocks?: boolean
): Set<string> {
// For run-from-block mode, include all enabled blocks regardless of trigger reachability
if (includeAllBlocks) {
return this.getAllEnabledBlocks(workflow)
}
const resolvedTriggerId = this.findTriggerBlock(workflow, triggerBlockId)
if (!resolvedTriggerId) {

View File

@@ -28,7 +28,6 @@ import type {
} from '@/executor/types'
import { streamingResponseFormatProcessor } from '@/executor/utils'
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
import { isJSONString } from '@/executor/utils/json'
import { filterOutputForLog } from '@/executor/utils/output-filter'
import { validateBlockType } from '@/executor/utils/permission-check'
import type { VariableResolver } from '@/executor/variables/resolver'
@@ -87,7 +86,7 @@ export class BlockExecutor {
resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block)
if (blockLog) {
blockLog.input = this.parseJsonInputs(resolvedInputs)
blockLog.input = resolvedInputs
}
} catch (error) {
cleanupSelfReference?.()
@@ -158,14 +157,7 @@ export class BlockExecutor {
const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, {
block,
})
this.callOnBlockComplete(
ctx,
node,
block,
this.parseJsonInputs(resolvedInputs),
displayOutput,
duration
)
this.callOnBlockComplete(ctx, node, block, resolvedInputs, displayOutput, duration)
}
return normalizedOutput
@@ -241,7 +233,7 @@ export class BlockExecutor {
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
blockLog.input = this.parseJsonInputs(input)
blockLog.input = input
blockLog.output = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
}
@@ -256,14 +248,7 @@ export class BlockExecutor {
if (!isSentinel) {
const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
this.callOnBlockComplete(
ctx,
node,
block,
this.parseJsonInputs(input),
displayOutput,
duration
)
this.callOnBlockComplete(ctx, node, block, input, displayOutput, duration)
}
const hasErrorPort = this.hasErrorPortEdge(node)
@@ -351,36 +336,6 @@ export class BlockExecutor {
return { result: output }
}
/**
* Parse JSON string inputs to objects for log display only.
* Attempts to parse any string that looks like JSON.
* Returns a new object - does not mutate the original inputs.
*/
private parseJsonInputs(inputs: Record<string, any>): Record<string, any> {
let result = inputs
let hasChanges = false
for (const [key, value] of Object.entries(inputs)) {
// isJSONString is a quick heuristic (checks for { or [), not a validator.
// Invalid JSON is safely caught below - this just avoids JSON.parse on every string.
if (typeof value !== 'string' || !isJSONString(value)) {
continue
}
try {
if (!hasChanges) {
result = { ...inputs }
hasChanges = true
}
result[key] = JSON.parse(value.trim())
} catch {
// Not valid JSON, keep original string
}
}
return result
}
private callOnBlockStart(ctx: ExecutionContext, node: DAGNode, block: SerializedBlock): void {
const blockId = node.id
const blockName = block.metadata?.name ?? blockId

View File

@@ -26,6 +26,7 @@ export class ExecutionEngine {
private allowResumeTriggers: boolean
private cancelledFlag = false
private errorFlag = false
private stoppedEarlyFlag = false
private executionError: Error | null = null
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
@@ -105,7 +106,7 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
if ((await this.checkCancellation()) || this.errorFlag) {
if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) {
break
}
await this.processQueue()
@@ -259,6 +260,16 @@ export class ExecutionEngine {
}
private initializeQueue(triggerBlockId?: string): void {
if (this.context.runFromBlockContext) {
const { startBlockId } = this.context.runFromBlockContext
logger.info('Initializing queue for run-from-block mode', {
startBlockId,
dirtySetSize: this.context.runFromBlockContext.dirtySet.size,
})
this.addToQueue(startBlockId)
return
}
const pendingBlocks = this.context.metadata.pendingBlocks
const remainingEdges = (this.context.metadata as any).remainingEdges
@@ -385,6 +396,17 @@ export class ExecutionEngine {
this.finalOutput = output
}
if (this.context.stopAfterBlockId === nodeId) {
// For loop/parallel sentinels, only stop if the subflow has fully exited (all iterations done)
// shouldContinue: true means more iterations, shouldExit: true means loop is done
const shouldContinueLoop = output.shouldContinue === true
if (!shouldContinueLoop) {
logger.info('Stopping execution after target block', { nodeId })
this.stoppedEarlyFlag = true
return
}
}
const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false)
logger.info('Processing outgoing edges', {

View File

@@ -5,17 +5,31 @@ import { BlockExecutor } from '@/executor/execution/block-executor'
import { EdgeManager } from '@/executor/execution/edge-manager'
import { ExecutionEngine } from '@/executor/execution/engine'
import { ExecutionState } from '@/executor/execution/state'
import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
import type {
ContextExtensions,
SerializableExecutionState,
WorkflowInput,
} from '@/executor/execution/types'
import { createBlockHandlers } from '@/executor/handlers/registry'
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
import {
computeExecutionSets,
type RunFromBlockContext,
resolveContainerToSentinelStart,
validateRunFromBlock,
} from '@/executor/utils/run-from-block'
import {
buildResolutionFromBlock,
buildStartBlockOutput,
resolveExecutorStartBlock,
} from '@/executor/utils/start-block'
import {
extractLoopIdFromSentinel,
extractParallelIdFromSentinel,
} from '@/executor/utils/subflow-utils'
import { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedWorkflow } from '@/serializer/types'
@@ -48,7 +62,10 @@ export class DAGExecutor {
async execute(workflowId: string, triggerBlockId?: string): Promise<ExecutionResult> {
const savedIncomingEdges = this.contextExtensions.dagIncomingEdges
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const dag = this.dagBuilder.build(this.workflow, {
triggerBlockId,
savedIncomingEdges,
})
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
@@ -89,17 +106,151 @@ export class DAGExecutor {
}
}
/**
* Execute from a specific block using cached outputs for upstream blocks.
*/
async executeFromBlock(
workflowId: string,
startBlockId: string,
sourceSnapshot: SerializableExecutionState
): Promise<ExecutionResult> {
// Build full DAG with all blocks to compute upstream set for snapshot filtering
// includeAllBlocks is needed because the startBlockId might be a trigger not reachable from the main trigger
const dag = this.dagBuilder.build(this.workflow, { includeAllBlocks: true })
const executedBlocks = new Set(sourceSnapshot.executedBlocks)
const validation = validateRunFromBlock(startBlockId, dag, executedBlocks)
if (!validation.valid) {
throw new Error(validation.error)
}
const { dirtySet, upstreamSet } = computeExecutionSets(dag, startBlockId)
const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId
// Extract container IDs from sentinel IDs in upstream set
const upstreamContainerIds = new Set<string>()
for (const nodeId of upstreamSet) {
const loopId = extractLoopIdFromSentinel(nodeId)
if (loopId) upstreamContainerIds.add(loopId)
const parallelId = extractParallelIdFromSentinel(nodeId)
if (parallelId) upstreamContainerIds.add(parallelId)
}
// Filter snapshot to only include upstream blocks - prevents references to non-upstream blocks
const filteredBlockStates: Record<string, any> = {}
for (const [blockId, state] of Object.entries(sourceSnapshot.blockStates)) {
if (upstreamSet.has(blockId) || upstreamContainerIds.has(blockId)) {
filteredBlockStates[blockId] = state
}
}
const filteredExecutedBlocks = sourceSnapshot.executedBlocks.filter(
(id) => upstreamSet.has(id) || upstreamContainerIds.has(id)
)
// Filter loop/parallel executions to only include upstream containers
const filteredLoopExecutions: Record<string, any> = {}
if (sourceSnapshot.loopExecutions) {
for (const [loopId, execution] of Object.entries(sourceSnapshot.loopExecutions)) {
if (upstreamContainerIds.has(loopId)) {
filteredLoopExecutions[loopId] = execution
}
}
}
const filteredParallelExecutions: Record<string, any> = {}
if (sourceSnapshot.parallelExecutions) {
for (const [parallelId, execution] of Object.entries(sourceSnapshot.parallelExecutions)) {
if (upstreamContainerIds.has(parallelId)) {
filteredParallelExecutions[parallelId] = execution
}
}
}
const filteredSnapshot: SerializableExecutionState = {
...sourceSnapshot,
blockStates: filteredBlockStates,
executedBlocks: filteredExecutedBlocks,
loopExecutions: filteredLoopExecutions,
parallelExecutions: filteredParallelExecutions,
}
logger.info('Executing from block', {
workflowId,
startBlockId,
effectiveStartBlockId,
dirtySetSize: dirtySet.size,
upstreamSetSize: upstreamSet.size,
})
// Remove incoming edges from non-dirty sources so convergent blocks don't wait for cached upstream
for (const nodeId of dirtySet) {
const node = dag.nodes.get(nodeId)
if (!node) continue
const nonDirtyIncoming: string[] = []
for (const sourceId of node.incomingEdges) {
if (!dirtySet.has(sourceId)) {
nonDirtyIncoming.push(sourceId)
}
}
for (const sourceId of nonDirtyIncoming) {
node.incomingEdges.delete(sourceId)
}
}
const runFromBlockContext = { startBlockId: effectiveStartBlockId, dirtySet }
const { context, state } = this.createExecutionContext(workflowId, undefined, {
snapshotState: filteredSnapshot,
runFromBlockContext,
})
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
loopOrchestrator.setContextExtensions(this.contextExtensions)
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
parallelOrchestrator.setResolver(resolver)
parallelOrchestrator.setContextExtensions(this.contextExtensions)
const allHandlers = createBlockHandlers()
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
const edgeManager = new EdgeManager(dag)
loopOrchestrator.setEdgeManager(edgeManager)
const nodeOrchestrator = new NodeExecutionOrchestrator(
dag,
state,
blockExecutor,
loopOrchestrator,
parallelOrchestrator
)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
return await engine.run()
}
private createExecutionContext(
workflowId: string,
triggerBlockId?: string
triggerBlockId?: string,
overrides?: {
snapshotState?: SerializableExecutionState
runFromBlockContext?: RunFromBlockContext
}
): { context: ExecutionContext; state: ExecutionState } {
const snapshotState = this.contextExtensions.snapshotState
const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState
const blockStates = snapshotState?.blockStates
? new Map(Object.entries(snapshotState.blockStates))
: new Map<string, BlockState>()
const executedBlocks = snapshotState?.executedBlocks
let executedBlocks = snapshotState?.executedBlocks
? new Set(snapshotState.executedBlocks)
: new Set<string>()
if (overrides?.runFromBlockContext) {
const { dirtySet } = overrides.runFromBlockContext
executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id)))
logger.info('Cleared executed status for dirty blocks', {
dirtySetSize: dirtySet.size,
remainingExecutedBlocks: executedBlocks.size,
})
}
const state = new ExecutionState(blockStates, executedBlocks)
const context: ExecutionContext = {
@@ -109,7 +260,7 @@ export class DAGExecutor {
userId: this.contextExtensions.userId,
isDeployedContext: this.contextExtensions.isDeployedContext,
blockStates: state.getBlockStates(),
blockLogs: snapshotState?.blockLogs ?? [],
blockLogs: overrides?.runFromBlockContext ? [] : (snapshotState?.blockLogs ?? []),
metadata: {
...this.contextExtensions.metadata,
startTime: new Date().toISOString(),
@@ -169,6 +320,8 @@ export class DAGExecutor {
abortSignal: this.contextExtensions.abortSignal,
includeFileBase64: this.contextExtensions.includeFileBase64,
base64MaxBytes: this.contextExtensions.base64MaxBytes,
runFromBlockContext: overrides?.runFromBlockContext,
stopAfterBlockId: this.contextExtensions.stopAfterBlockId,
}
if (this.contextExtensions.resumeFromSnapshot) {
@@ -193,6 +346,15 @@ export class DAGExecutor {
pendingBlocks: context.metadata.pendingBlocks,
skipStarterBlockInit: true,
})
} else if (overrides?.runFromBlockContext) {
// In run-from-block mode, initialize the start block only if it's a regular block
// Skip for sentinels/containers (loop/parallel) which aren't real blocks
const startBlockId = overrides.runFromBlockContext.startBlockId
const isRegularBlock = this.workflow.blocks.some((b) => b.id === startBlockId)
if (isRegularBlock) {
this.initializeStarterBlock(context, state, startBlockId)
}
} else {
this.initializeStarterBlock(context, state, triggerBlockId)
}

View File

@@ -1,5 +1,6 @@
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SubflowType } from '@/stores/workflows/workflow/types'
export interface ExecutionMetadata {
@@ -105,6 +106,17 @@ export interface ContextExtensions {
output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
iterationContext?: IterationContext
) => Promise<void>
/**
* Run-from-block configuration. When provided, executor runs in partial
* execution mode starting from the specified block.
*/
runFromBlockContext?: RunFromBlockContext
/**
* Stop execution after this block completes. Used for "run until block" feature.
*/
stopAfterBlockId?: string
}
export interface WorkflowInput {

View File

@@ -936,12 +936,8 @@ export class AgentBlockHandler implements BlockHandler {
systemPrompt: validMessages ? undefined : inputs.systemPrompt,
context: validMessages ? undefined : stringifyJSON(messages),
tools: formattedTools,
temperature:
inputs.temperature != null && inputs.temperature !== ''
? Number(inputs.temperature)
: undefined,
maxTokens:
inputs.maxTokens != null && inputs.maxTokens !== '' ? Number(inputs.maxTokens) : undefined,
temperature: inputs.temperature,
maxTokens: inputs.maxTokens,
apiKey: inputs.apiKey,
azureEndpoint: inputs.azureEndpoint,
azureApiVersion: inputs.azureApiVersion,

View File

@@ -14,8 +14,8 @@ export interface AgentInputs {
slidingWindowSize?: string // For message-based sliding window
slidingWindowTokens?: string // For token-based sliding window
// LLM parameters
temperature?: string
maxTokens?: string
temperature?: number
maxTokens?: number
apiKey?: string
azureEndpoint?: string
azureApiVersion?: string

View File

@@ -276,7 +276,16 @@ export class LoopOrchestrator {
scope: LoopScope
): LoopContinuationResult {
const results = scope.allIterationOutputs
this.state.setBlockOutput(loopId, { results }, DEFAULTS.EXECUTION_TIME)
const output = { results }
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
// Emit onBlockComplete for the loop container so the UI can track it
if (this.contextExtensions?.onBlockComplete) {
this.contextExtensions.onBlockComplete(loopId, 'Loop', 'loop', {
output,
executionTime: DEFAULTS.EXECUTION_TIME,
})
}
return {
shouldContinue: false,

View File

@@ -31,7 +31,18 @@ export class NodeExecutionOrchestrator {
throw new Error(`Node not found in DAG: ${nodeId}`)
}
if (this.state.hasExecuted(nodeId)) {
if (ctx.runFromBlockContext && !ctx.runFromBlockContext.dirtySet.has(nodeId)) {
const cachedOutput = this.state.getBlockOutput(nodeId) || {}
logger.debug('Skipping non-dirty block in run-from-block mode', { nodeId })
return {
nodeId,
output: cachedOutput,
isFinalOutput: false,
}
}
const isDirtyBlock = ctx.runFromBlockContext?.dirtySet.has(nodeId) ?? false
if (!isDirtyBlock && this.state.hasExecuted(nodeId)) {
const output = this.state.getBlockOutput(nodeId) || {}
return {
nodeId,

View File

@@ -228,9 +228,17 @@ export class ParallelOrchestrator {
const branchOutputs = scope.branchOutputs.get(i) || []
results.push(branchOutputs)
}
this.state.setBlockOutput(parallelId, {
results,
})
const output = { results }
this.state.setBlockOutput(parallelId, output)
// Emit onBlockComplete for the parallel container so the UI can track it
if (this.contextExtensions?.onBlockComplete) {
this.contextExtensions.onBlockComplete(parallelId, 'Parallel', 'parallel', {
output,
executionTime: 0,
})
}
return {
allBranchesComplete: true,
results,

View File

@@ -1,6 +1,7 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import type { BlockOutput } from '@/blocks/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
export interface UserFile {
@@ -250,6 +251,17 @@ export interface ExecutionContext {
* will not have their base64 content fetched.
*/
base64MaxBytes?: number
/**
* Context for "run from block" mode. When present, only blocks in dirtySet
* will be executed; others return cached outputs from the source snapshot.
*/
runFromBlockContext?: RunFromBlockContext
/**
* Stop execution after this block completes. Used for "run until block" feature.
*/
stopAfterBlockId?: string
}
export interface ExecutionResult {

View File

@@ -0,0 +1,610 @@
import { describe, expect, it } from 'vitest'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
import { computeExecutionSets, validateRunFromBlock } from '@/executor/utils/run-from-block'
import type { SerializedLoop, SerializedParallel } from '@/serializer/types'
/**
* Helper to extract dirty set from computeExecutionSets
*/
function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
return computeExecutionSets(dag, startBlockId).dirtySet
}
/**
* Helper to create a DAG node for testing
*/
function createNode(
id: string,
outgoingEdges: Array<{ target: string; sourceHandle?: string }> = [],
metadata: Partial<NodeMetadata> = {}
): DAGNode {
const edges = new Map<string, DAGEdge>()
for (const edge of outgoingEdges) {
edges.set(edge.target, { target: edge.target, sourceHandle: edge.sourceHandle })
}
return {
id,
block: {
id,
position: { x: 0, y: 0 },
config: { tool: 'test', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'test', name: `block-${id}`, category: 'tools' },
enabled: true,
},
incomingEdges: new Set<string>(),
outgoingEdges: edges,
metadata: {
isParallelBranch: false,
isLoopNode: false,
isSentinel: false,
...metadata,
},
}
}
/**
* Helper to create a DAG for testing
*/
function createDAG(nodes: DAGNode[]): DAG {
const nodeMap = new Map<string, DAGNode>()
for (const node of nodes) {
nodeMap.set(node.id, node)
}
// Set up incoming edges based on outgoing edges
for (const node of nodes) {
for (const [, edge] of node.outgoingEdges) {
const targetNode = nodeMap.get(edge.target)
if (targetNode) {
targetNode.incomingEdges.add(node.id)
}
}
}
return {
nodes: nodeMap,
loopConfigs: new Map<string, SerializedLoop>(),
parallelConfigs: new Map<string, SerializedParallel>(),
}
}
describe('computeDirtySet', () => {
it('includes start block in dirty set', () => {
const dag = createDAG([createNode('A'), createNode('B'), createNode('C')])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('B')).toBe(true)
})
it('includes all downstream blocks in linear workflow', () => {
// A → B → C → D
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(3)
})
it('handles branching paths', () => {
// A → B → C
// ↓
// D → E
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }, { target: 'D' }]),
createNode('C'),
createNode('D', [{ target: 'E' }]),
createNode('E'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('E')).toBe(true)
expect(dirtySet.size).toBe(4)
})
it('handles convergence points', () => {
// A → C
// B → C → D
const dag = createDAG([
createNode('A', [{ target: 'C' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
// Run from A: should include A, C, D (but not B)
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(3)
})
it('handles diamond pattern', () => {
// B
// ↗ ↘
// A D
// ↘ ↗
// C
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(4)
})
it('stops at graph boundaries', () => {
// A → B C → D (disconnected)
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B'),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
expect(dirtySet.has('D')).toBe(false)
expect(dirtySet.size).toBe(2)
})
it('handles single node workflow', () => {
const dag = createDAG([createNode('A')])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.size).toBe(1)
})
it('handles node not in DAG gracefully', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const dirtySet = computeDirtySet(dag, 'nonexistent')
// Should just contain the start block ID even if not found
expect(dirtySet.has('nonexistent')).toBe(true)
expect(dirtySet.size).toBe(1)
})
it('includes convergent block when running from one branch of parallel', () => {
// Parallel branches converging:
// A → B → D
// A → C → D
// Running from B should include B and D (but not A or C)
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(2)
})
it('handles running from convergent block itself (all upstream non-dirty)', () => {
// A → C
// B → C
// Running from C should only include C
const dag = createDAG([
createNode('A', [{ target: 'C' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'C')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(2)
})
it('handles deep downstream chains', () => {
// A → B → C → D → E → F
// Running from C should include C, D, E, F
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D', [{ target: 'E' }]),
createNode('E', [{ target: 'F' }]),
createNode('F'),
])
const dirtySet = computeDirtySet(dag, 'C')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('E')).toBe(true)
expect(dirtySet.has('F')).toBe(true)
expect(dirtySet.size).toBe(4)
})
})
describe('validateRunFromBlock', () => {
it('accepts valid block', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A', 'B'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(true)
expect(result.error).toBeUndefined()
})
it('rejects block not found in DAG', () => {
const dag = createDAG([createNode('A')])
const executedBlocks = new Set(['A', 'B'])
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('Block not found')
})
it('rejects blocks inside loops', () => {
const dag = createDAG([createNode('A', [], { isLoopNode: true, loopId: 'loop-1' })])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('inside loop')
expect(result.error).toContain('loop-1')
})
it('rejects blocks inside parallels', () => {
const dag = createDAG([
createNode('A', [], { isParallelBranch: true, parallelId: 'parallel-1' }),
])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('inside parallel')
expect(result.error).toContain('parallel-1')
})
it('rejects sentinel nodes', () => {
const dag = createDAG([createNode('A', [], { isSentinel: true, sentinelType: 'start' })])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('sentinel')
})
it('rejects blocks with unexecuted upstream dependencies', () => {
// A → B, only A executed but B depends on A
const dag = createDAG([createNode('A', [{ target: 'B' }]), createNode('B')])
const executedBlocks = new Set<string>() // A was not executed
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('Upstream dependency not executed')
})
it('allows running from block when immediate predecessor was executed (ignores transitive)', () => {
// A → X → B → C, where X is new (not executed)
// Running from C is allowed because B (immediate predecessor) was executed
// C will use B's cached output - doesn't matter that X is new
const dag = createDAG([
createNode('A', [{ target: 'X' }]),
createNode('X', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C'),
])
const executedBlocks = new Set(['A', 'B', 'C']) // X was not executed (new block)
const result = validateRunFromBlock('C', dag, executedBlocks)
// Valid because C's immediate predecessor B was executed
expect(result.valid).toBe(true)
})
it('allows blocks with no dependencies even if not previously executed', () => {
// A and B are independent (no edges)
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A']) // B was not executed but has no deps
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(true) // B has no incoming edges, so it's valid
})
it('accepts regular executed block', () => {
const dag = createDAG([
createNode('trigger', [{ target: 'A' }]),
createNode('A', [{ target: 'B' }]),
createNode('B'),
])
const executedBlocks = new Set(['trigger', 'A', 'B'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(true)
})
it('accepts loop container when executed', () => {
// Loop container with sentinel nodes
const loopId = 'loop-container-1'
const sentinelStartId = `loop-${loopId}-sentinel-start`
const sentinelEndId = `loop-${loopId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B' }], {
isSentinel: true,
sentinelType: 'start',
loopId,
}),
createNode('B', [{ target: sentinelEndId }], { isLoopNode: true, loopId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
loopId,
}),
createNode('C'),
])
dag.loopConfigs.set(loopId, { id: loopId, nodes: ['B'], iterations: 3, loopType: 'for' } as any)
const executedBlocks = new Set(['A', loopId, sentinelStartId, 'B', sentinelEndId, 'C'])
const result = validateRunFromBlock(loopId, dag, executedBlocks)
expect(result.valid).toBe(true)
})
it('accepts parallel container when executed', () => {
// Parallel container with sentinel nodes
const parallelId = 'parallel-container-1'
const sentinelStartId = `parallel-${parallelId}-sentinel-start`
const sentinelEndId = `parallel-${parallelId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B₍0₎' }], {
isSentinel: true,
sentinelType: 'start',
parallelId,
}),
createNode('B₍0₎', [{ target: sentinelEndId }], { isParallelBranch: true, parallelId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
parallelId,
}),
createNode('C'),
])
dag.parallelConfigs.set(parallelId, { id: parallelId, nodes: ['B'], count: 2 } as any)
const executedBlocks = new Set(['A', parallelId, sentinelStartId, 'B₍0₎', sentinelEndId, 'C'])
const result = validateRunFromBlock(parallelId, dag, executedBlocks)
expect(result.valid).toBe(true)
})
it('allows loop container with no upstream dependencies', () => {
// Loop containers are validated via their sentinel nodes, not incoming edges on the container itself
// If the loop has no upstream dependencies, it should be valid
const loopId = 'loop-container-1'
const sentinelStartId = `loop-${loopId}-sentinel-start`
const dag = createDAG([
createNode(sentinelStartId, [], { isSentinel: true, sentinelType: 'start', loopId }),
])
dag.loopConfigs.set(loopId, { id: loopId, nodes: [], iterations: 3, loopType: 'for' } as any)
const executedBlocks = new Set<string>() // Nothing executed but loop has no deps
const result = validateRunFromBlock(loopId, dag, executedBlocks)
// Loop container validation doesn't check incoming edges (containers don't have nodes in dag.nodes)
// So this is valid - the loop can start fresh
expect(result.valid).toBe(true)
})
})
describe('computeDirtySet with containers', () => {
it('includes loop container and all downstream when running from loop', () => {
// A → loop-sentinel-start → B (inside loop) → loop-sentinel-end → C
const loopId = 'loop-1'
const sentinelStartId = `loop-${loopId}-sentinel-start`
const sentinelEndId = `loop-${loopId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B' }], {
isSentinel: true,
sentinelType: 'start',
loopId,
}),
createNode('B', [{ target: sentinelEndId }], { isLoopNode: true, loopId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
loopId,
}),
createNode('C'),
])
dag.loopConfigs.set(loopId, { id: loopId, nodes: ['B'], iterations: 3, loopType: 'for' } as any)
const dirtySet = computeDirtySet(dag, loopId)
// Should include loop container, sentinel-start, B, sentinel-end, C
expect(dirtySet.has(loopId)).toBe(true)
expect(dirtySet.has(sentinelStartId)).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has(sentinelEndId)).toBe(true)
expect(dirtySet.has('C')).toBe(true)
// Should NOT include A (upstream)
expect(dirtySet.has('A')).toBe(false)
})
it('includes parallel container and all downstream when running from parallel', () => {
// A → parallel-sentinel-start → B₍0₎ → parallel-sentinel-end → C
const parallelId = 'parallel-1'
const sentinelStartId = `parallel-${parallelId}-sentinel-start`
const sentinelEndId = `parallel-${parallelId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B₍0₎' }], {
isSentinel: true,
sentinelType: 'start',
parallelId,
}),
createNode('B₍0₎', [{ target: sentinelEndId }], { isParallelBranch: true, parallelId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
parallelId,
}),
createNode('C'),
])
dag.parallelConfigs.set(parallelId, { id: parallelId, nodes: ['B'], count: 2 } as any)
const dirtySet = computeDirtySet(dag, parallelId)
// Should include parallel container, sentinel-start, B₍0₎, sentinel-end, C
expect(dirtySet.has(parallelId)).toBe(true)
expect(dirtySet.has(sentinelStartId)).toBe(true)
expect(dirtySet.has('B₍0₎')).toBe(true)
expect(dirtySet.has(sentinelEndId)).toBe(true)
expect(dirtySet.has('C')).toBe(true)
// Should NOT include A (upstream)
expect(dirtySet.has('A')).toBe(false)
})
})
describe('computeExecutionSets upstream set', () => {
it('includes all upstream blocks in linear workflow', () => {
// A → B → C → D
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const { upstreamSet } = computeExecutionSets(dag, 'C')
expect(upstreamSet.has('A')).toBe(true)
expect(upstreamSet.has('B')).toBe(true)
expect(upstreamSet.has('C')).toBe(false) // start block not in upstream
expect(upstreamSet.has('D')).toBe(false) // downstream
})
it('includes all branches in convergent upstream', () => {
// A → C
// B → C → D
const dag = createDAG([
createNode('A', [{ target: 'C' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const { upstreamSet } = computeExecutionSets(dag, 'C')
expect(upstreamSet.has('A')).toBe(true)
expect(upstreamSet.has('B')).toBe(true)
expect(upstreamSet.has('C')).toBe(false)
expect(upstreamSet.has('D')).toBe(false)
})
it('excludes parallel branches not in upstream path', () => {
// A → B → D
// A → C → D
// Running from B: upstream is A only, not C
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const { upstreamSet, dirtySet } = computeExecutionSets(dag, 'B')
// Upstream should only contain A
expect(upstreamSet.has('A')).toBe(true)
expect(upstreamSet.has('C')).toBe(false) // parallel branch, not upstream of B
// Dirty should contain B and D
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
})
it('handles diamond pattern upstream correctly', () => {
// B
// ↗ ↘
// A D → E
// ↘ ↗
// C
// Running from D: upstream should be A, B, C
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D', [{ target: 'E' }]),
createNode('E'),
])
const { upstreamSet, dirtySet } = computeExecutionSets(dag, 'D')
expect(upstreamSet.has('A')).toBe(true)
expect(upstreamSet.has('B')).toBe(true)
expect(upstreamSet.has('C')).toBe(true)
expect(upstreamSet.has('D')).toBe(false)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('E')).toBe(true)
})
it('returns empty upstream set for root block', () => {
const dag = createDAG([createNode('A', [{ target: 'B' }]), createNode('B')])
const { upstreamSet } = computeExecutionSets(dag, 'A')
expect(upstreamSet.size).toBe(0)
})
})

View File

@@ -0,0 +1,193 @@
import { LOOP, PARALLEL } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
/**
* Builds the sentinel-start node ID for a loop.
*/
function buildLoopSentinelStartId(loopId: string): string {
return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.START_SUFFIX}`
}
/**
* Builds the sentinel-start node ID for a parallel.
*/
function buildParallelSentinelStartId(parallelId: string): string {
return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.START_SUFFIX}`
}
/**
* Checks if a block ID is a loop or parallel container and returns the sentinel-start ID if so.
* Returns null if the block is not a container.
*/
export function resolveContainerToSentinelStart(blockId: string, dag: DAG): string | null {
if (dag.loopConfigs.has(blockId)) {
return buildLoopSentinelStartId(blockId)
}
if (dag.parallelConfigs.has(blockId)) {
return buildParallelSentinelStartId(blockId)
}
return null
}
/**
* Result of validating a block for run-from-block execution.
*/
export interface RunFromBlockValidation {
valid: boolean
error?: string
}
/**
* Context for run-from-block execution mode.
*/
export interface RunFromBlockContext {
/** The block ID to start execution from */
startBlockId: string
/** Set of block IDs that need re-execution (start block + all downstream) */
dirtySet: Set<string>
}
/**
* Result of computing execution sets for run-from-block mode.
*/
export interface ExecutionSets {
/** Blocks that need re-execution (start block + all downstream) */
dirtySet: Set<string>
/** Blocks that are upstream (ancestors) of the start block */
upstreamSet: Set<string>
}
/**
* Computes both the dirty set (downstream) and upstream set in a single traversal pass.
* - Dirty set: start block + all blocks reachable via outgoing edges (need re-execution)
* - Upstream set: all blocks reachable via incoming edges (can be referenced)
*
* For loop/parallel containers, starts from the sentinel-start node and includes
* the container ID itself in the dirty set.
*
* @param dag - The workflow DAG
* @param startBlockId - The block to start execution from
* @returns Object containing both dirtySet and upstreamSet
*/
export function computeExecutionSets(dag: DAG, startBlockId: string): ExecutionSets {
const dirty = new Set<string>([startBlockId])
const upstream = new Set<string>()
const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag)
const traversalStartId = sentinelStartId ?? startBlockId
if (sentinelStartId) {
dirty.add(sentinelStartId)
}
// BFS downstream for dirty set
const downstreamQueue = [traversalStartId]
while (downstreamQueue.length > 0) {
const nodeId = downstreamQueue.shift()!
const node = dag.nodes.get(nodeId)
if (!node) continue
for (const [, edge] of node.outgoingEdges) {
if (!dirty.has(edge.target)) {
dirty.add(edge.target)
downstreamQueue.push(edge.target)
}
}
}
// BFS upstream for upstream set
const upstreamQueue = [traversalStartId]
while (upstreamQueue.length > 0) {
const nodeId = upstreamQueue.shift()!
const node = dag.nodes.get(nodeId)
if (!node) continue
for (const sourceId of node.incomingEdges) {
if (!upstream.has(sourceId)) {
upstream.add(sourceId)
upstreamQueue.push(sourceId)
}
}
}
return { dirtySet: dirty, upstreamSet: upstream }
}
/**
* Validates that a block can be used as a run-from-block starting point.
*
* Validation rules:
* - Block must exist in the DAG (or be a loop/parallel container)
* - Block cannot be inside a loop (but loop containers are allowed)
* - Block cannot be inside a parallel (but parallel containers are allowed)
* - Block cannot be a sentinel node
* - All upstream dependencies must have been executed (have cached outputs)
*
* @param blockId - The block ID to validate
* @param dag - The workflow DAG
* @param executedBlocks - Set of blocks that were executed in the source run
* @returns Validation result with error message if invalid
*/
export function validateRunFromBlock(
blockId: string,
dag: DAG,
executedBlocks: Set<string>
): RunFromBlockValidation {
const node = dag.nodes.get(blockId)
const isLoopContainer = dag.loopConfigs.has(blockId)
const isParallelContainer = dag.parallelConfigs.has(blockId)
const isContainer = isLoopContainer || isParallelContainer
if (!node && !isContainer) {
return { valid: false, error: `Block not found in workflow: ${blockId}` }
}
if (isContainer) {
const sentinelStartId = resolveContainerToSentinelStart(blockId, dag)
if (!sentinelStartId || !dag.nodes.has(sentinelStartId)) {
return {
valid: false,
error: `Container sentinel not found for: ${blockId}`,
}
}
}
if (node) {
if (node.metadata.isLoopNode) {
return {
valid: false,
error: `Cannot run from block inside loop: ${node.metadata.loopId}`,
}
}
if (node.metadata.isParallelBranch) {
return {
valid: false,
error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`,
}
}
if (node.metadata.isSentinel) {
return { valid: false, error: 'Cannot run from sentinel node' }
}
// Check immediate upstream dependencies were executed
for (const sourceId of node.incomingEdges) {
const sourceNode = dag.nodes.get(sourceId)
// Skip sentinel nodes - they're internal and not in executedBlocks
if (sourceNode?.metadata.isSentinel) continue
// Skip trigger nodes - they're entry points and don't need prior execution
// A trigger node has no incoming edges
if (sourceNode && sourceNode.incomingEdges.size === 0) continue
if (!executedBlocks.has(sourceId)) {
return {
valid: false,
error: `Upstream dependency not executed: ${sourceId}`,
}
}
}
}
return { valid: true }
}

View File

@@ -1,10 +1,85 @@
import { useCallback, useRef } from 'react'
import { createLogger } from '@sim/logger'
import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
const logger = createLogger('useExecutionStream')
/**
* Processes SSE events from a response body and invokes appropriate callbacks.
*/
async function processSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
callbacks: ExecutionStreamCallbacks,
logPrefix: string
): Promise<void> {
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) continue
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info(`${logPrefix} stream completed`)
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
switch (event.type) {
case 'execution:started':
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
}
export interface ExecutionStreamCallbacks {
onExecutionStarted?: (data: { startTime: string }) => void
onExecutionCompleted?: (data: {
@@ -68,6 +143,15 @@ export interface ExecuteStreamOptions {
loops?: Record<string, any>
parallels?: Record<string, any>
}
stopAfterBlockId?: string
callbacks?: ExecutionStreamCallbacks
}
export interface ExecuteFromBlockOptions {
workflowId: string
startBlockId: string
sourceSnapshot: SerializableExecutionState
input?: any
callbacks?: ExecutionStreamCallbacks
}
@@ -119,91 +203,7 @@ export function useExecutionStream() {
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) {
continue
}
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info('Stream completed')
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
logger.info('📡 SSE Event received:', {
type: event.type,
executionId: event.executionId,
data: event.data,
})
switch (event.type) {
case 'execution:started':
logger.info('🚀 Execution started')
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
logger.info('✅ Execution completed')
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
logger.error('❌ Execution error')
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
logger.warn('🛑 Execution cancelled')
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
logger.info('🔷 Block started:', event.data.blockId)
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
logger.info('✓ Block completed:', event.data.blockId)
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
logger.error('✗ Block error:', event.data.blockId)
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
logger.info('Stream done:', event.data.blockId)
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
await processSSEStream(reader, callbacks, 'Execution')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Execution stream cancelled')
@@ -222,6 +222,70 @@ export function useExecutionStream() {
}
}, [])
const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => {
const { workflowId, startBlockId, sourceSnapshot, input, callbacks = {} } = options
if (abortControllerRef.current) {
abortControllerRef.current.abort()
}
const abortController = new AbortController()
abortControllerRef.current = abortController
currentExecutionRef.current = null
try {
const response = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ startBlockId, sourceSnapshot, input }),
signal: abortController.signal,
})
if (!response.ok) {
let errorResponse: any
try {
errorResponse = await response.json()
} catch {
throw new Error(`Server error (${response.status}): ${response.statusText}`)
}
const error = new Error(errorResponse.error || 'Failed to start execution')
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
}
if (!response.body) {
throw new Error('No response body')
}
const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current = { workflowId, executionId }
}
const reader = response.body.getReader()
await processSSEStream(reader, callbacks, 'Run-from-block')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Run-from-block execution cancelled')
callbacks.onExecutionCancelled?.({ duration: 0 })
} else {
logger.error('Run-from-block execution error:', error)
callbacks.onExecutionError?.({
error: error.message || 'Unknown error',
duration: 0,
})
}
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = null
}
}, [])
const cancel = useCallback(() => {
const execution = currentExecutionRef.current
if (execution) {
@@ -239,6 +303,7 @@ export function useExecutionStream() {
return {
execute,
executeFromBlock,
cancel,
}
}

View File

@@ -8,7 +8,7 @@ const ivm = require('isolated-vm')
const USER_CODE_START_LINE = 4
const pendingFetches = new Map()
let fetchIdCounter = 0
const FETCH_TIMEOUT_MS = 300000 // 5 minutes
const FETCH_TIMEOUT_MS = 30000
/**
* Extract line and column from error stack or message

View File

@@ -34,7 +34,6 @@ export function layoutContainers(
: DEFAULT_CONTAINER_HORIZONTAL_SPACING,
verticalSpacing: options.verticalSpacing ?? DEFAULT_VERTICAL_SPACING,
padding: { x: CONTAINER_PADDING_X, y: CONTAINER_PADDING_Y },
gridSize: options.gridSize,
}
for (const [parentId, childIds] of children.entries()) {
@@ -57,15 +56,18 @@ export function layoutContainers(
continue
}
// Use the shared core layout function with container options
const { nodes, dimensions } = layoutBlocksCore(childBlocks, childEdges, {
isContainer: true,
layoutOptions: containerOptions,
})
// Apply positions back to blocks
for (const node of nodes.values()) {
blocks[node.id].position = node.position
}
// Update container dimensions
const calculatedWidth = dimensions.width
const calculatedHeight = dimensions.height

View File

@@ -9,7 +9,6 @@ import {
getBlockMetrics,
normalizePositions,
prepareBlockMetrics,
snapNodesToGrid,
} from '@/lib/workflows/autolayout/utils'
import { BLOCK_DIMENSIONS, HANDLE_POSITIONS } from '@/lib/workflows/blocks/block-dimensions'
import { EDGE } from '@/executor/constants'
@@ -85,6 +84,7 @@ export function assignLayers(
): Map<string, GraphNode> {
const nodes = new Map<string, GraphNode>()
// Initialize nodes
for (const [id, block] of Object.entries(blocks)) {
nodes.set(id, {
id,
@@ -97,6 +97,7 @@ export function assignLayers(
})
}
// Build a map of target node -> edges coming into it (to check sourceHandle later)
const incomingEdgesMap = new Map<string, Edge[]>()
for (const edge of edges) {
if (!incomingEdgesMap.has(edge.target)) {
@@ -105,6 +106,7 @@ export function assignLayers(
incomingEdgesMap.get(edge.target)!.push(edge)
}
// Build adjacency from edges
for (const edge of edges) {
const sourceNode = nodes.get(edge.source)
const targetNode = nodes.get(edge.target)
@@ -115,6 +117,7 @@ export function assignLayers(
}
}
// Find starter nodes (no incoming edges)
const starterNodes = Array.from(nodes.values()).filter((node) => node.incoming.size === 0)
if (starterNodes.length === 0 && nodes.size > 0) {
@@ -123,6 +126,7 @@ export function assignLayers(
logger.warn('No starter blocks found, using first block as starter', { blockId: firstNode.id })
}
// Topological sort using Kahn's algorithm
const inDegreeCount = new Map<string, number>()
for (const node of nodes.values()) {
@@ -140,6 +144,8 @@ export function assignLayers(
const node = nodes.get(nodeId)!
processed.add(nodeId)
// Calculate layer based on max incoming layer + 1
// For edges from subflow ends, add the subflow's internal depth (minus 1 to avoid double-counting)
if (node.incoming.size > 0) {
let maxEffectiveLayer = -1
const incomingEdges = incomingEdgesMap.get(nodeId) || []
@@ -147,11 +153,16 @@ export function assignLayers(
for (const incomingId of node.incoming) {
const incomingNode = nodes.get(incomingId)
if (incomingNode) {
// Find edges from this incoming node to check if it's a subflow end edge
const edgesFromSource = incomingEdges.filter((e) => e.source === incomingId)
let additionalDepth = 0
// Check if any edge from this source is a subflow end edge
const hasSubflowEndEdge = edgesFromSource.some(isSubflowEndEdge)
if (hasSubflowEndEdge && subflowDepths) {
// Get the internal depth of the subflow
// Subtract 1 because the +1 at the end of layer calculation already accounts for one layer
// E.g., if subflow has 2 internal layers (depth=2), we add 1 extra so total offset is 2
const depth = subflowDepths.get(incomingId) ?? 1
additionalDepth = Math.max(0, depth - 1)
}
@@ -163,6 +174,7 @@ export function assignLayers(
node.layer = maxEffectiveLayer + 1
}
// Add outgoing nodes when all dependencies processed
for (const targetId of node.outgoing) {
const currentCount = inDegreeCount.get(targetId) || 0
inDegreeCount.set(targetId, currentCount - 1)
@@ -173,6 +185,7 @@ export function assignLayers(
}
}
// Handle isolated nodes
for (const node of nodes.values()) {
if (!processed.has(node.id)) {
logger.debug('Isolated node detected, assigning to layer 0', { blockId: node.id })
@@ -211,6 +224,7 @@ function resolveVerticalOverlaps(nodes: GraphNode[], verticalSpacing: number): v
hasOverlap = false
iteration++
// Group nodes by layer for same-layer overlap resolution
const nodesByLayer = new Map<number, GraphNode[]>()
for (const node of nodes) {
if (!nodesByLayer.has(node.layer)) {
@@ -219,9 +233,11 @@ function resolveVerticalOverlaps(nodes: GraphNode[], verticalSpacing: number): v
nodesByLayer.get(node.layer)!.push(node)
}
// Process each layer independently
for (const [layer, layerNodes] of nodesByLayer) {
if (layerNodes.length < 2) continue
// Sort by Y position for consistent processing
layerNodes.sort((a, b) => a.position.y - b.position.y)
for (let i = 0; i < layerNodes.length - 1; i++) {
@@ -286,6 +302,7 @@ export function calculatePositions(
const layerNumbers = Array.from(layers.keys()).sort((a, b) => a - b)
// Calculate max width for each layer
const layerWidths = new Map<number, number>()
for (const layerNum of layerNumbers) {
const nodesInLayer = layers.get(layerNum)!
@@ -293,6 +310,7 @@ export function calculatePositions(
layerWidths.set(layerNum, maxWidth)
}
// Calculate cumulative X positions for each layer based on actual widths
const layerXPositions = new Map<number, number>()
let cumulativeX = padding.x
@@ -301,6 +319,7 @@ export function calculatePositions(
cumulativeX += layerWidths.get(layerNum)! + horizontalSpacing
}
// Build a flat map of all nodes for quick lookups
const allNodes = new Map<string, GraphNode>()
for (const nodesInLayer of layers.values()) {
for (const node of nodesInLayer) {
@@ -308,6 +327,7 @@ export function calculatePositions(
}
}
// Build incoming edges map for handle lookups
const incomingEdgesMap = new Map<string, Edge[]>()
for (const edge of edges) {
if (!incomingEdgesMap.has(edge.target)) {
@@ -316,16 +336,20 @@ export function calculatePositions(
incomingEdgesMap.get(edge.target)!.push(edge)
}
// Position nodes layer by layer, aligning with connected predecessors
for (const layerNum of layerNumbers) {
const nodesInLayer = layers.get(layerNum)!
const xPosition = layerXPositions.get(layerNum)!
// Separate containers and non-containers
const containersInLayer = nodesInLayer.filter(isContainerBlock)
const nonContainersInLayer = nodesInLayer.filter((n) => !isContainerBlock(n))
// For the first layer (layer 0), position sequentially from padding.y
if (layerNum === 0) {
let yOffset = padding.y
// Sort containers by height for visual balance
containersInLayer.sort((a, b) => b.metrics.height - a.metrics.height)
for (const node of containersInLayer) {
@@ -337,6 +361,7 @@ export function calculatePositions(
yOffset += CONTAINER_VERTICAL_CLEARANCE
}
// Sort non-containers by outgoing connections
nonContainersInLayer.sort((a, b) => b.outgoing.size - a.outgoing.size)
for (const node of nonContainersInLayer) {
@@ -346,7 +371,9 @@ export function calculatePositions(
continue
}
// For subsequent layers, align with connected predecessors (handle-to-handle)
for (const node of [...containersInLayer, ...nonContainersInLayer]) {
// Find the bottommost predecessor handle Y (highest value) and align to it
let bestSourceHandleY = -1
let bestEdge: Edge | null = null
const incomingEdges = incomingEdgesMap.get(node.id) || []
@@ -354,6 +381,7 @@ export function calculatePositions(
for (const edge of incomingEdges) {
const predecessor = allNodes.get(edge.source)
if (predecessor) {
// Calculate actual source handle Y position based on block type and handle
const sourceHandleOffset = getSourceHandleYOffset(predecessor.block, edge.sourceHandle)
const sourceHandleY = predecessor.position.y + sourceHandleOffset
@@ -364,16 +392,20 @@ export function calculatePositions(
}
}
// If no predecessors found (shouldn't happen for layer > 0), use padding
if (bestSourceHandleY < 0) {
bestSourceHandleY = padding.y + HANDLE_POSITIONS.DEFAULT_Y_OFFSET
}
// Calculate the target handle Y offset for this node
const targetHandleOffset = getTargetHandleYOffset(node.block, bestEdge?.targetHandle)
// Position node so its target handle aligns with the source handle Y
node.position = { x: xPosition, y: bestSourceHandleY - targetHandleOffset }
}
}
// Resolve vertical overlaps within layers (X overlaps prevented by cumulative positioning)
resolveVerticalOverlaps(Array.from(layers.values()).flat(), verticalSpacing)
}
@@ -403,7 +435,7 @@ export function layoutBlocksCore(
return { nodes: new Map(), dimensions: { width: 0, height: 0 } }
}
const layoutOptions: LayoutOptions =
const layoutOptions =
options.layoutOptions ??
(options.isContainer ? CONTAINER_LAYOUT_OPTIONS : DEFAULT_LAYOUT_OPTIONS)
@@ -420,13 +452,7 @@ export function layoutBlocksCore(
calculatePositions(layers, edges, layoutOptions)
// 5. Normalize positions
let dimensions = normalizePositions(nodes, { isContainer: options.isContainer })
// 6. Snap to grid if gridSize is specified (recalculates dimensions)
const snappedDimensions = snapNodesToGrid(nodes, layoutOptions.gridSize)
if (snappedDimensions) {
dimensions = snappedDimensions
}
const dimensions = normalizePositions(nodes, { isContainer: options.isContainer })
return { nodes, dimensions }
}

View File

@@ -36,13 +36,14 @@ export function applyAutoLayout(
const horizontalSpacing = options.horizontalSpacing ?? DEFAULT_HORIZONTAL_SPACING
const verticalSpacing = options.verticalSpacing ?? DEFAULT_VERTICAL_SPACING
// Pre-calculate container dimensions by laying out their children (bottom-up)
// This ensures accurate widths/heights before root-level layout
prepareContainerDimensions(
blocksCopy,
edges,
layoutBlocksCore,
horizontalSpacing,
verticalSpacing,
options.gridSize
verticalSpacing
)
const { root: rootBlockIds } = getBlocksByParent(blocksCopy)
@@ -57,6 +58,8 @@ export function applyAutoLayout(
(edge) => layoutRootIds.includes(edge.source) && layoutRootIds.includes(edge.target)
)
// Calculate subflow depths before laying out root blocks
// This ensures blocks connected to subflow ends are positioned correctly
const subflowDepths = calculateSubflowDepths(blocksCopy, edges, assignLayers)
if (Object.keys(rootBlocks).length > 0) {
@@ -92,12 +95,13 @@ export function applyAutoLayout(
}
export type { TargetedLayoutOptions } from '@/lib/workflows/autolayout/targeted'
// Function exports
export { applyTargetedLayout } from '@/lib/workflows/autolayout/targeted'
// Type exports
export type { Edge, LayoutOptions, LayoutResult } from '@/lib/workflows/autolayout/types'
export {
getBlockMetrics,
isContainerType,
shouldSkipAutoLayout,
snapPositionToGrid,
transferBlockHeights,
} from '@/lib/workflows/autolayout/utils'

View File

@@ -1,3 +1,4 @@
import { createLogger } from '@sim/logger'
import {
CONTAINER_PADDING,
DEFAULT_HORIZONTAL_SPACING,
@@ -13,11 +14,12 @@ import {
isContainerType,
prepareContainerDimensions,
shouldSkipAutoLayout,
snapPositionToGrid,
} from '@/lib/workflows/autolayout/utils'
import { CONTAINER_DIMENSIONS } from '@/lib/workflows/blocks/block-dimensions'
import type { BlockState } from '@/stores/workflows/workflow/types'
const logger = createLogger('AutoLayout:Targeted')
export interface TargetedLayoutOptions extends LayoutOptions {
changedBlockIds: string[]
verticalSpacing?: number
@@ -37,7 +39,6 @@ export function applyTargetedLayout(
changedBlockIds,
verticalSpacing = DEFAULT_VERTICAL_SPACING,
horizontalSpacing = DEFAULT_HORIZONTAL_SPACING,
gridSize,
} = options
if (!changedBlockIds || changedBlockIds.length === 0) {
@@ -47,17 +48,19 @@ export function applyTargetedLayout(
const changedSet = new Set(changedBlockIds)
const blocksCopy: Record<string, BlockState> = JSON.parse(JSON.stringify(blocks))
// Pre-calculate container dimensions by laying out their children (bottom-up)
// This ensures accurate widths/heights before root-level layout
prepareContainerDimensions(
blocksCopy,
edges,
layoutBlocksCore,
horizontalSpacing,
verticalSpacing,
gridSize
verticalSpacing
)
const groups = getBlocksByParent(blocksCopy)
// Calculate subflow depths before layout to properly position blocks after subflow ends
const subflowDepths = calculateSubflowDepths(blocksCopy, edges, assignLayers)
layoutGroup(
@@ -68,8 +71,7 @@ export function applyTargetedLayout(
changedSet,
verticalSpacing,
horizontalSpacing,
subflowDepths,
gridSize
subflowDepths
)
for (const [parentId, childIds] of groups.children.entries()) {
@@ -81,8 +83,7 @@ export function applyTargetedLayout(
changedSet,
verticalSpacing,
horizontalSpacing,
subflowDepths,
gridSize
subflowDepths
)
}
@@ -100,8 +101,7 @@ function layoutGroup(
changedSet: Set<string>,
verticalSpacing: number,
horizontalSpacing: number,
subflowDepths: Map<string, number>,
gridSize?: number
subflowDepths: Map<string, number>
): void {
if (childIds.length === 0) return
@@ -116,6 +116,7 @@ function layoutGroup(
return
}
// Determine which blocks need repositioning
const requestedLayout = layoutEligibleChildIds.filter((id) => {
const block = blocks[id]
if (!block) return false
@@ -140,6 +141,7 @@ function layoutGroup(
return
}
// Store old positions for anchor calculation
const oldPositions = new Map<string, { x: number; y: number }>()
for (const id of layoutEligibleChildIds) {
const block = blocks[id]
@@ -147,6 +149,8 @@ function layoutGroup(
oldPositions.set(id, { ...block.position })
}
// Compute layout positions using core function
// Only pass subflowDepths for root-level layout (not inside containers)
const layoutPositions = computeLayoutPositions(
layoutEligibleChildIds,
blocks,
@@ -154,8 +158,7 @@ function layoutGroup(
parentBlock,
horizontalSpacing,
verticalSpacing,
parentId === null ? subflowDepths : undefined,
gridSize
parentId === null ? subflowDepths : undefined
)
if (layoutPositions.size === 0) {
@@ -165,6 +168,7 @@ function layoutGroup(
return
}
// Find anchor block (unchanged block with a layout position)
let offsetX = 0
let offsetY = 0
@@ -181,16 +185,20 @@ function layoutGroup(
}
}
// Apply new positions only to blocks that need layout
for (const id of needsLayout) {
const block = blocks[id]
const newPos = layoutPositions.get(id)
if (!block || !newPos) continue
block.position = snapPositionToGrid({ x: newPos.x + offsetX, y: newPos.y + offsetY }, gridSize)
block.position = {
x: newPos.x + offsetX,
y: newPos.y + offsetY,
}
}
}
/**
* Computes layout positions for a subset of blocks using the core layout function
* Computes layout positions for a subset of blocks using the core layout
*/
function computeLayoutPositions(
childIds: string[],
@@ -199,8 +207,7 @@ function computeLayoutPositions(
parentBlock: BlockState | undefined,
horizontalSpacing: number,
verticalSpacing: number,
subflowDepths?: Map<string, number>,
gridSize?: number
subflowDepths?: Map<string, number>
): Map<string, { x: number; y: number }> {
const subsetBlocks: Record<string, BlockState> = {}
for (const id of childIds) {
@@ -221,11 +228,11 @@ function computeLayoutPositions(
layoutOptions: {
horizontalSpacing: isContainer ? horizontalSpacing * 0.85 : horizontalSpacing,
verticalSpacing,
gridSize,
},
subflowDepths,
})
// Update parent container dimensions if applicable
if (parentBlock) {
parentBlock.data = {
...parentBlock.data,
@@ -234,6 +241,7 @@ function computeLayoutPositions(
}
}
// Convert nodes to position map
const positions = new Map<string, { x: number; y: number }>()
for (const node of nodes.values()) {
positions.set(node.id, { x: node.position.x, y: node.position.y })

View File

@@ -7,7 +7,6 @@ export interface LayoutOptions {
horizontalSpacing?: number
verticalSpacing?: number
padding?: { x: number; y: number }
gridSize?: number
}
export interface LayoutResult {

View File

@@ -18,61 +18,6 @@ function resolveNumeric(value: number | undefined, fallback: number): number {
return typeof value === 'number' && Number.isFinite(value) ? value : fallback
}
/**
* Snaps a single coordinate value to the nearest grid position
*/
function snapToGrid(value: number, gridSize: number): number {
return Math.round(value / gridSize) * gridSize
}
/**
* Snaps a position to the nearest grid point.
* Returns the original position if gridSize is 0 or not provided.
*/
export function snapPositionToGrid(
position: { x: number; y: number },
gridSize: number | undefined
): { x: number; y: number } {
if (!gridSize || gridSize <= 0) {
return position
}
return {
x: snapToGrid(position.x, gridSize),
y: snapToGrid(position.y, gridSize),
}
}
/**
* Snaps all node positions in a graph to grid positions and returns updated dimensions.
* Returns null if gridSize is not set or no snapping was needed.
*/
export function snapNodesToGrid(
nodes: Map<string, GraphNode>,
gridSize: number | undefined
): { width: number; height: number } | null {
if (!gridSize || gridSize <= 0 || nodes.size === 0) {
return null
}
let minX = Number.POSITIVE_INFINITY
let minY = Number.POSITIVE_INFINITY
let maxX = Number.NEGATIVE_INFINITY
let maxY = Number.NEGATIVE_INFINITY
for (const node of nodes.values()) {
node.position = snapPositionToGrid(node.position, gridSize)
minX = Math.min(minX, node.position.x)
minY = Math.min(minY, node.position.y)
maxX = Math.max(maxX, node.position.x + node.metrics.width)
maxY = Math.max(maxY, node.position.y + node.metrics.height)
}
return {
width: maxX - minX + CONTAINER_PADDING * 2,
height: maxY - minY + CONTAINER_PADDING * 2,
}
}
/**
* Checks if a block type is a container (loop or parallel)
*/
@@ -369,7 +314,6 @@ export type LayoutFunction = (
horizontalSpacing?: number
verticalSpacing?: number
padding?: { x: number; y: number }
gridSize?: number
}
subflowDepths?: Map<string, number>
}
@@ -385,15 +329,13 @@ export type LayoutFunction = (
* @param layoutFn - The layout function to use for calculating dimensions
* @param horizontalSpacing - Horizontal spacing between blocks
* @param verticalSpacing - Vertical spacing between blocks
* @param gridSize - Optional grid size for snap-to-grid
*/
export function prepareContainerDimensions(
blocks: Record<string, BlockState>,
edges: Edge[],
layoutFn: LayoutFunction,
horizontalSpacing: number,
verticalSpacing: number,
gridSize?: number
verticalSpacing: number
): void {
const { children } = getBlocksByParent(blocks)
@@ -460,7 +402,6 @@ export function prepareContainerDimensions(
layoutOptions: {
horizontalSpacing: horizontalSpacing * 0.85,
verticalSpacing,
gridSize,
},
})

View File

@@ -23,9 +23,11 @@ import type {
ContextExtensions,
ExecutionCallbacks,
IterationContext,
SerializableExecutionState,
} from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { buildParallelSentinelEndId, buildSentinelEndId } from '@/executor/utils/subflow-utils'
import { Serializer } from '@/serializer'
const logger = createLogger('ExecutionCore')
@@ -40,6 +42,12 @@ export interface ExecuteWorkflowCoreOptions {
abortSignal?: AbortSignal
includeFileBase64?: boolean
base64MaxBytes?: number
stopAfterBlockId?: string
/** Run-from-block mode: execute starting from a specific block using cached upstream outputs */
runFromBlock?: {
startBlockId: string
sourceSnapshot: SerializableExecutionState
}
}
function parseVariableValueByType(value: unknown, type: string): unknown {
@@ -114,6 +122,8 @@ export async function executeWorkflowCore(
abortSignal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
runFromBlock,
} = options
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
@@ -246,6 +256,16 @@ export async function executeWorkflowCore(
processedInput = input || {}
// Resolve stopAfterBlockId for loop/parallel containers to their sentinel-end IDs
let resolvedStopAfterBlockId = stopAfterBlockId
if (stopAfterBlockId) {
if (serializedWorkflow.loops?.[stopAfterBlockId]) {
resolvedStopAfterBlockId = buildSentinelEndId(stopAfterBlockId)
} else if (serializedWorkflow.parallels?.[stopAfterBlockId]) {
resolvedStopAfterBlockId = buildParallelSentinelEndId(stopAfterBlockId)
}
}
// Create and execute workflow with callbacks
if (resumeFromSnapshot) {
logger.info(`[${requestId}] Resume execution detected`, {
@@ -296,6 +316,7 @@ export async function executeWorkflowCore(
abortSignal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId: resolvedStopAfterBlockId,
}
const executorInstance = new Executor({
@@ -318,10 +339,13 @@ export async function executeWorkflowCore(
}
}
const result = (await executorInstance.execute(
workflowId,
resolvedTriggerBlockId
)) as ExecutionResult
const result = runFromBlock
? ((await executorInstance.executeFromBlock(
workflowId,
runFromBlock.startBlockId,
runFromBlock.sourceSnapshot
)) as ExecutionResult)
: ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult)
// Build trace spans for logging from the full execution result
const { traceSpans, totalDuration } = buildTraceSpans(result)

View File

@@ -180,3 +180,140 @@ export function formatSSEEvent(event: ExecutionEvent): string {
export function encodeSSEEvent(event: ExecutionEvent): Uint8Array {
return new TextEncoder().encode(formatSSEEvent(event))
}
/**
* Options for creating SSE execution callbacks
*/
export interface SSECallbackOptions {
executionId: string
workflowId: string
controller: ReadableStreamDefaultController<Uint8Array>
isStreamClosed: () => boolean
setStreamClosed: () => void
}
/**
* Creates SSE callbacks for workflow execution streaming
*/
export function createSSECallbacks(options: SSECallbackOptions) {
const { executionId, workflowId, controller, isStreamClosed, setStreamClosed } = options
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed()) return
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
setStreamClosed()
}
}
const onBlockStart = async (
blockId: string,
blockName: string,
blockType: string,
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
) => {
sendEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
}),
},
})
}
const onBlockComplete = async (
blockId: string,
blockName: string,
blockType: string,
callbackData: { input?: unknown; output: any; executionTime: number },
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
) => {
const hasError = callbackData.output?.error
const iterationData = iterationContext
? {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
}
: {}
if (hasError) {
sendEvent({
type: 'block:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
error: callbackData.output.error,
durationMs: callbackData.executionTime || 0,
...iterationData,
},
})
} else {
sendEvent({
type: 'block:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
output: callbackData.output,
durationMs: callbackData.executionTime || 0,
...iterationData,
},
})
}
}
const onStream = async (streamingExecution: unknown) => {
const streamingExec = streamingExecution as { stream: ReadableStream; execution: any }
const blockId = streamingExec.execution?.blockId
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
sendEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId, chunk },
})
}
sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId },
})
} finally {
try {
reader.releaseLock()
} catch {}
}
}
return { sendEvent, onBlockStart, onBlockComplete, onStream }
}

View File

@@ -102,7 +102,7 @@ export const azureOpenAIProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.reasoningEffort !== undefined) payload.reasoning_effort = request.reasoningEffort
if (request.verbosity !== undefined) payload.verbosity = request.verbosity

View File

@@ -77,7 +77,7 @@ export const cerebrasProvider: ProviderConfig = {
messages: allMessages,
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.responseFormat) {
payload.response_format = {
type: 'json_schema',

View File

@@ -81,7 +81,7 @@ export const deepseekProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens != null) payload.max_tokens = request.maxTokens
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
let preparedTools: ReturnType<typeof prepareToolsWithUsageControl> | null = null

View File

@@ -349,7 +349,7 @@ export async function executeGeminiRequest(
if (request.temperature !== undefined) {
geminiConfig.temperature = request.temperature
}
if (request.maxTokens != null) {
if (request.maxTokens !== undefined) {
geminiConfig.maxOutputTokens = request.maxTokens
}
if (systemInstruction) {

View File

@@ -123,21 +123,17 @@ export function extractFunctionCallPart(candidate: Candidate | undefined): Part
}
/**
* Converts usage metadata from SDK response to our format.
* Per Gemini docs, total = promptTokenCount + candidatesTokenCount + toolUsePromptTokenCount + thoughtsTokenCount
* We include toolUsePromptTokenCount in input and thoughtsTokenCount in output for correct billing.
* Converts usage metadata from SDK response to our format
*/
export function convertUsageMetadata(
usageMetadata: GenerateContentResponseUsageMetadata | undefined
): GeminiUsage {
const thoughtsTokenCount = usageMetadata?.thoughtsTokenCount ?? 0
const toolUsePromptTokenCount = usageMetadata?.toolUsePromptTokenCount ?? 0
const promptTokenCount = (usageMetadata?.promptTokenCount ?? 0) + toolUsePromptTokenCount
const candidatesTokenCount = (usageMetadata?.candidatesTokenCount ?? 0) + thoughtsTokenCount
const promptTokenCount = usageMetadata?.promptTokenCount ?? 0
const candidatesTokenCount = usageMetadata?.candidatesTokenCount ?? 0
return {
promptTokenCount,
candidatesTokenCount,
totalTokenCount: usageMetadata?.totalTokenCount ?? 0,
totalTokenCount: usageMetadata?.totalTokenCount ?? promptTokenCount + candidatesTokenCount,
}
}

View File

@@ -74,7 +74,7 @@ export const groqProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.responseFormat) {
payload.response_format = {

View File

@@ -91,7 +91,7 @@ export const mistralProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens != null) payload.max_tokens = request.maxTokens
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.responseFormat) {
payload.response_format = {

View File

@@ -1130,7 +1130,7 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
id: 'cerebras',
name: 'Cerebras',
description: 'Cerebras Cloud LLMs',
defaultModel: 'cerebras/gpt-oss-120b',
defaultModel: 'cerebras/llama-3.3-70b',
modelPatterns: [/^cerebras/],
icon: CerebrasIcon,
capabilities: {
@@ -1138,64 +1138,44 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
},
models: [
{
id: 'cerebras/gpt-oss-120b',
pricing: {
input: 0.35,
output: 0.75,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131000,
},
{
id: 'cerebras/llama3.1-8b',
id: 'cerebras/llama-3.1-8b',
pricing: {
input: 0.1,
output: 0.1,
updatedAt: '2026-01-27',
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 32000,
},
{
id: 'cerebras/llama-3.3-70b',
id: 'cerebras/llama-3.1-70b',
pricing: {
input: 0.85,
output: 1.2,
updatedAt: '2026-01-27',
input: 0.6,
output: 0.6,
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 128000,
},
{
id: 'cerebras/qwen-3-32b',
pricing: {
input: 0.4,
output: 0.8,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131000,
},
{
id: 'cerebras/qwen-3-235b-a22b-instruct-2507',
id: 'cerebras/llama-3.3-70b',
pricing: {
input: 0.6,
output: 1.2,
updatedAt: '2026-01-27',
output: 0.6,
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131000,
contextWindow: 128000,
},
{
id: 'cerebras/zai-glm-4.7',
id: 'cerebras/llama-4-scout-17b-16e-instruct',
pricing: {
input: 2.25,
output: 2.75,
updatedAt: '2026-01-27',
input: 0.11,
output: 0.34,
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131000,
contextWindow: 10000000,
},
],
},
@@ -1214,8 +1194,8 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
id: 'groq/openai/gpt-oss-120b',
pricing: {
input: 0.15,
output: 0.6,
updatedAt: '2026-01-27',
output: 0.75,
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131072,
@@ -1223,29 +1203,9 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
{
id: 'groq/openai/gpt-oss-20b',
pricing: {
input: 0.075,
output: 0.3,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
},
{
id: 'groq/openai/gpt-oss-safeguard-20b',
pricing: {
input: 0.075,
output: 0.3,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
},
{
id: 'groq/qwen/qwen3-32b',
pricing: {
input: 0.29,
output: 0.59,
updatedAt: '2026-01-27',
input: 0.01,
output: 0.25,
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131072,
@@ -1255,7 +1215,7 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
pricing: {
input: 0.05,
output: 0.08,
updatedAt: '2026-01-27',
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131072,
@@ -1265,17 +1225,27 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
pricing: {
input: 0.59,
output: 0.79,
updatedAt: '2026-01-27',
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131072,
},
{
id: 'groq/meta-llama/llama-4-scout-17b-16e-instruct',
id: 'groq/llama-4-scout-17b-instruct',
pricing: {
input: 0.11,
output: 0.34,
updatedAt: '2026-01-27',
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131072,
},
{
id: 'groq/llama-4-maverick-17b-instruct',
pricing: {
input: 0.5,
output: 0.77,
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131072,
@@ -1283,9 +1253,9 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
{
id: 'groq/meta-llama/llama-4-maverick-17b-128e-instruct',
pricing: {
input: 0.2,
output: 0.6,
updatedAt: '2026-01-27',
input: 0.5,
output: 0.77,
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131072,
@@ -1295,7 +1265,7 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
pricing: {
input: 0.04,
output: 0.04,
updatedAt: '2026-01-27',
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 8192,
@@ -1305,37 +1275,27 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
pricing: {
input: 0.59,
output: 0.79,
updatedAt: '2026-01-27',
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 128000,
},
{
id: 'groq/deepseek-r1-distill-qwen-32b',
pricing: {
input: 0.69,
output: 0.69,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 128000,
},
{
id: 'groq/moonshotai/kimi-k2-instruct-0905',
id: 'groq/moonshotai/kimi-k2-instruct',
pricing: {
input: 1.0,
output: 3.0,
updatedAt: '2026-01-27',
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 262144,
contextWindow: 131072,
},
{
id: 'groq/meta-llama/llama-guard-4-12b',
pricing: {
input: 0.2,
output: 0.2,
updatedAt: '2026-01-27',
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131072,

View File

@@ -105,7 +105,7 @@ export const ollamaProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens != null) payload.max_tokens = request.maxTokens
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.responseFormat) {
payload.response_format = {

View File

@@ -81,7 +81,7 @@ export const openaiProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.reasoningEffort !== undefined) payload.reasoning_effort = request.reasoningEffort
if (request.verbosity !== undefined) payload.verbosity = request.verbosity

View File

@@ -121,7 +121,7 @@ export const openRouterProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens != null) payload.max_tokens = request.maxTokens
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
let preparedTools: ReturnType<typeof prepareToolsWithUsageControl> | null = null
let hasActiveTools = false
@@ -516,7 +516,7 @@ export const openRouterProvider: ProviderConfig = {
return streamingResult as StreamingExecution
}
if (request.responseFormat && hasActiveTools) {
if (request.responseFormat && hasActiveTools && toolCalls.length > 0) {
const finalPayload: any = {
model: payload.model,
messages: [...currentMessages],

View File

@@ -135,7 +135,7 @@ export const vllmProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.responseFormat) {
payload.response_format = {

View File

@@ -92,7 +92,7 @@ export const xAIProvider: ProviderConfig = {
}
if (request.temperature !== undefined) basePayload.temperature = request.temperature
if (request.maxTokens != null) basePayload.max_completion_tokens = request.maxTokens
if (request.maxTokens !== undefined) basePayload.max_tokens = request.maxTokens
let preparedTools: ReturnType<typeof prepareToolsWithUsageControl> | null = null
if (tools?.length) {

View File

@@ -35,4 +35,23 @@ export const useExecutionStore = create<ExecutionState & ExecutionActions>()((se
},
clearRunPath: () => set({ lastRunPath: new Map(), lastRunEdges: new Map() }),
reset: () => set(initialState),
setLastExecutionSnapshot: (workflowId, snapshot) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
newSnapshots.set(workflowId, snapshot)
set({ lastExecutionSnapshots: newSnapshots })
},
getLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
return lastExecutionSnapshots.get(workflowId)
},
clearLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
newSnapshots.delete(workflowId)
set({ lastExecutionSnapshots: newSnapshots })
},
}))

View File

@@ -1,4 +1,5 @@
import type { Executor } from '@/executor'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionContext } from '@/executor/types'
/**
@@ -18,16 +19,9 @@ export interface ExecutionState {
pendingBlocks: string[]
executor: Executor | null
debugContext: ExecutionContext | null
/**
* Tracks blocks from the last execution run and their success/error status.
* Cleared when a new run starts. Used to show run path indicators (rings on blocks).
*/
lastRunPath: Map<string, BlockRunStatus>
/**
* Tracks edges from the last execution run and their success/error status.
* Cleared when a new run starts. Used to show run path indicators on edges.
*/
lastRunEdges: Map<string, EdgeRunStatus>
lastExecutionSnapshots: Map<string, SerializableExecutionState>
}
export interface ExecutionActions {
@@ -41,6 +35,9 @@ export interface ExecutionActions {
setEdgeRunStatus: (edgeId: string, status: EdgeRunStatus) => void
clearRunPath: () => void
reset: () => void
setLastExecutionSnapshot: (workflowId: string, snapshot: SerializableExecutionState) => void
getLastExecutionSnapshot: (workflowId: string) => SerializableExecutionState | undefined
clearLastExecutionSnapshot: (workflowId: string) => void
}
export const initialState: ExecutionState = {
@@ -52,4 +49,5 @@ export const initialState: ExecutionState = {
debugContext: null,
lastRunPath: new Map(),
lastRunEdges: new Map(),
lastExecutionSnapshots: new Map(),
}

View File

@@ -253,6 +253,23 @@ describe('executeTool Function', () => {
vi.restoreAllMocks()
})
it('should handle errors from tools', async () => {
setupFetchMock({ status: 400, ok: false, json: { error: 'Bad request' } })
const result = await executeTool(
'http_request',
{
url: 'https://api.example.com/data',
method: 'GET',
},
true
)
expect(result.success).toBe(false)
expect(result.error).toBeDefined()
expect(result.timing).toBeDefined()
})
it('should add timing information to results', async () => {
const result = await executeTool(
'http_request',

View File

@@ -1,84 +0,0 @@
{{- if .Values.certManager.enabled }}
{{- /*
cert-manager Issuer Bootstrap Pattern
PREREQUISITE: cert-manager must be installed in your cluster before enabling this.
The root CA Certificate is created in the namespace specified by certManager.rootCA.namespace
(defaults to "cert-manager"). Ensure this namespace exists and cert-manager is running there.
Install cert-manager: https://cert-manager.io/docs/installation/
This implements the recommended pattern from cert-manager documentation:
1. A self-signed ClusterIssuer (for bootstrapping the root CA only)
2. A root CA Certificate (self-signed, used to sign other certificates)
3. A CA ClusterIssuer (uses the root CA to sign certificates)
Reference: https://cert-manager.io/docs/configuration/selfsigned/
*/ -}}
---
# 1. Self-Signed ClusterIssuer (Bootstrap Only)
# This issuer is used ONLY to create the root CA certificate.
# It should NOT be used directly for application certificates.
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
name: {{ .Values.certManager.selfSignedIssuer.name }}
labels:
{{- include "sim.labels" . | nindent 4 }}
app.kubernetes.io/component: cert-manager
spec:
selfSigned: {}
---
# 2. Root CA Certificate
# This certificate is signed by the self-signed issuer and becomes the root of trust.
# The secret created here will be used by the CA issuer to sign certificates.
# NOTE: This must be created in the cert-manager namespace (or the namespace specified
# in certManager.rootCA.namespace). Ensure cert-manager is installed there first.
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: {{ .Values.certManager.rootCA.certificateName }}
namespace: {{ .Values.certManager.rootCA.namespace | default "cert-manager" }} # Must match cert-manager's cluster-resource-namespace
labels:
{{- include "sim.labels" . | nindent 4 }}
app.kubernetes.io/component: cert-manager
spec:
isCA: true
commonName: {{ .Values.certManager.rootCA.commonName }}
secretName: {{ .Values.certManager.rootCA.secretName }}
duration: {{ .Values.certManager.rootCA.duration | default "87600h" }}
renewBefore: {{ .Values.certManager.rootCA.renewBefore | default "2160h" }}
privateKey:
algorithm: {{ .Values.certManager.rootCA.privateKey.algorithm | default "RSA" }}
size: {{ .Values.certManager.rootCA.privateKey.size | default 4096 }}
subject:
organizations:
{{- if .Values.certManager.rootCA.subject.organizations }}
{{- toYaml .Values.certManager.rootCA.subject.organizations | nindent 6 }}
{{- else }}
- {{ .Release.Name }}
{{- end }}
issuerRef:
name: {{ .Values.certManager.selfSignedIssuer.name }}
kind: ClusterIssuer
group: cert-manager.io
---
# 3. CA ClusterIssuer
# This is the issuer that should be used by applications to obtain certificates.
# It signs certificates using the root CA created above.
# NOTE: This issuer may briefly show "not ready" on first install while cert-manager
# processes the Certificate above and creates the secret. It will auto-reconcile.
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
name: {{ .Values.certManager.caIssuer.name }}
labels:
{{- include "sim.labels" . | nindent 4 }}
app.kubernetes.io/component: cert-manager
spec:
ca:
secretName: {{ .Values.certManager.rootCA.secretName }}
{{- end }}

View File

@@ -1,36 +1,6 @@
{{- if and .Values.ollama.enabled .Values.ollama.gpu.enabled }}
---
# 1. ConfigMap for NVIDIA Device Plugin Configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "sim.fullname" . }}-nvidia-device-plugin-config
namespace: {{ .Release.Namespace }}
labels:
{{- include "sim.labels" . | nindent 4 }}
app.kubernetes.io/component: nvidia-device-plugin
data:
config.yaml: |
version: v1
flags:
{{- if eq .Values.ollama.gpu.strategy "mig" }}
migStrategy: "single"
{{- else }}
migStrategy: "none"
{{- end }}
failOnInitError: false
plugin:
passDeviceSpecs: true
deviceListStrategy: envvar
{{- if eq .Values.ollama.gpu.strategy "time-slicing" }}
sharing:
timeSlicing:
resources:
- name: nvidia.com/gpu
replicas: {{ .Values.ollama.gpu.timeSlicingReplicas | default 5 }}
{{- end }}
---
# 2. NVIDIA Device Plugin DaemonSet for GPU support
# NVIDIA Device Plugin DaemonSet for GPU support
apiVersion: apps/v1
kind: DaemonSet
metadata:
@@ -65,6 +35,9 @@ spec:
# Only schedule on nodes with NVIDIA GPUs
accelerator: nvidia
priorityClassName: system-node-critical
runtimeClassName: nvidia
hostNetwork: true
hostPID: true
volumes:
- name: device-plugin
hostPath:
@@ -75,21 +48,22 @@ spec:
- name: sys
hostPath:
path: /sys
# Volume to mount the ConfigMap
- name: nvidia-device-plugin-config
configMap:
name: {{ include "sim.fullname" . }}-nvidia-device-plugin-config
- name: proc-driver-nvidia
hostPath:
path: /proc/driver/nvidia
containers:
- name: nvidia-device-plugin
image: nvcr.io/nvidia/k8s-device-plugin:v0.18.2
image: nvcr.io/nvidia/k8s-device-plugin:v0.14.5
imagePullPolicy: Always
args:
- "--config-file=/etc/device-plugin/config.yaml"
{{- if eq .Values.ollama.gpu.strategy "mig" }}
- --mig-strategy=single
- --pass-device-specs=true
- --fail-on-init-error=false
- --device-list-strategy=envvar
- --nvidia-driver-root=/host-sys/fs/cgroup
env:
- name: NVIDIA_MIG_MONITOR_DEVICES
value: all
{{- end }}
securityContext:
allowPrivilegeEscalation: false
capabilities:
@@ -100,16 +74,29 @@ spec:
- name: dev
mountPath: /dev
- name: sys
mountPath: /sys
mountPath: /host-sys
readOnly: true
- name: nvidia-device-plugin-config
mountPath: /etc/device-plugin/
- name: proc-driver-nvidia
mountPath: /proc/driver/nvidia
readOnly: true
resources:
requests:
cpu: 50m
memory: 20Mi
memory: 10Mi
limits:
cpu: 50m
memory: 50Mi
{{- end }}
memory: 20Mi
{{- if .Values.nodeSelector }}
nodeSelector:
{{- toYaml .Values.nodeSelector | nindent 8 }}
{{- end }}
---
# RuntimeClass for NVIDIA Container Runtime
apiVersion: node.k8s.io/v1
kind: RuntimeClass
metadata:
name: {{ include "sim.fullname" . }}-nvidia
labels:
{{- include "sim.labels" . | nindent 4 }}
handler: nvidia
{{- end }}

View File

@@ -400,10 +400,8 @@ postgresql:
algorithm: RSA # RSA or ECDSA
size: 4096 # Key size in bits
# Issuer reference (REQUIRED if tls.enabled is true)
# By default, references the CA issuer created by certManager.caIssuer
# Make sure certManager.enabled is true, or provide your own issuer
issuerRef:
name: sim-ca-issuer # Name of your cert-manager Issuer/ClusterIssuer
name: selfsigned-cluster-issuer # Name of your cert-manager Issuer/ClusterIssuer
kind: ClusterIssuer # ClusterIssuer or Issuer
group: "" # Optional: cert-manager.io (leave empty for default)
# Additional DNS names (optional)
@@ -465,26 +463,20 @@ externalDatabase:
ollama:
# Enable/disable Ollama deployment
enabled: false
# Image configuration
image:
repository: ollama/ollama
tag: latest
pullPolicy: Always
# Number of replicas
replicaCount: 1
# GPU configuration
gpu:
enabled: false
count: 1
# GPU sharing strategy: "mig" (Multi-Instance GPU) or "time-slicing"
# - mig: Hardware-level GPU partitioning (requires supported GPUs like A100)
# - time-slicing: Software-level GPU sharing (works with most NVIDIA GPUs)
strategy: "time-slicing"
# Number of time-slicing replicas (only used when strategy is "time-slicing")
timeSlicingReplicas: 5
# Node selector for GPU workloads (adjust labels based on your cluster configuration)
nodeSelector:
@@ -1193,53 +1185,4 @@ externalSecrets:
# External database password (when using managed database services)
externalDatabase:
# Path to external database password in external store
password: ""
# cert-manager configuration
# Prerequisites: Install cert-manager in your cluster first
# See: https://cert-manager.io/docs/installation/
#
# This implements the recommended CA bootstrap pattern from cert-manager:
# 1. Self-signed ClusterIssuer (bootstrap only - creates root CA)
# 2. Root CA Certificate (self-signed, becomes the trust anchor)
# 3. CA ClusterIssuer (signs application certificates using root CA)
#
# Reference: https://cert-manager.io/docs/configuration/selfsigned/
certManager:
# Enable/disable cert-manager issuer resources
enabled: false
# Self-signed ClusterIssuer (used ONLY to bootstrap the root CA)
# Do not reference this issuer directly for application certificates
selfSignedIssuer:
name: "sim-selfsigned-bootstrap-issuer"
# Root CA Certificate configuration
# This certificate is signed by the self-signed issuer and used as the trust anchor
rootCA:
# Name of the Certificate resource
certificateName: "sim-root-ca"
# Namespace where the root CA certificate and secret will be created
# Must match cert-manager's cluster-resource-namespace (default: cert-manager)
namespace: "cert-manager"
# Common name for the root CA certificate
commonName: "sim-root-ca"
# Secret name where the root CA certificate and key will be stored
secretName: "sim-root-ca-secret"
# Certificate validity duration (default: 10 years)
duration: "87600h"
# Renew before expiry (default: 90 days)
renewBefore: "2160h"
# Private key configuration
privateKey:
algorithm: RSA
size: 4096
# Subject configuration
subject:
organizations: []
# If empty, defaults to the release name
# CA ClusterIssuer configuration
# This is the issuer that applications should reference for obtaining certificates
caIssuer:
name: "sim-ca-issuer"
password: ""