Compare commits

...

15 Commits

Author SHA1 Message Date
Siddharth Ganesan
c14c614e33 Consolidation 2026-01-27 12:55:27 -08:00
Siddharth Ganesan
415acda403 Allow run from block for triggers 2026-01-27 12:50:16 -08:00
Siddharth Ganesan
8dc45e6e7e Fix 2026-01-27 12:32:18 -08:00
Siddharth Ganesan
7a0aaa460d Clean up 2026-01-27 12:30:46 -08:00
Siddharth Ganesan
2c333bfd98 Lint 2026-01-27 12:25:27 -08:00
Siddharth Ganesan
23ab11a40d Run u ntil block 2026-01-27 12:13:09 -08:00
Siddharth Ganesan
6e541949ec Change ordering 2026-01-27 11:37:36 -08:00
Siddharth Ganesan
3231955a07 Fix loop l ogs 2026-01-27 11:36:09 -08:00
Siddharth Ganesan
d38fb29e05 Fix trace spans 2026-01-27 11:21:42 -08:00
Siddharth Ganesan
5c1e620831 Fix 2026-01-27 11:03:34 -08:00
Siddharth Ganesan
72594df766 Minor improvements 2026-01-27 11:03:13 -08:00
Siddharth Ganesan
be95a7dbd8 Fix 2026-01-27 10:33:31 -08:00
Siddharth Ganesan
da5d4ac9d5 Fix 2026-01-26 17:16:35 -08:00
Siddharth Ganesan
e8534bea7a Fixes 2026-01-26 16:40:14 -08:00
Siddharth Ganesan
3d0b810a8e Run from block 2026-01-26 16:19:41 -08:00
19 changed files with 1798 additions and 129 deletions

View File

@@ -0,0 +1,305 @@
import { db, workflow as workflowTable } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { IterationContext, SerializableExecutionState } from '@/executor/execution/types'
import type { NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
const logger = createLogger('ExecuteFromBlockAPI')
const ExecuteFromBlockSchema = z.object({
startBlockId: z.string().min(1, 'Start block ID is required'),
sourceSnapshot: z.object({
blockStates: z.record(z.any()),
executedBlocks: z.array(z.string()),
blockLogs: z.array(z.any()),
decisions: z.object({
router: z.record(z.string()),
condition: z.record(z.string()),
}),
completedLoops: z.array(z.string()),
loopExecutions: z.record(z.any()).optional(),
parallelExecutions: z.record(z.any()).optional(),
parallelBlockMapping: z.record(z.any()).optional(),
activeExecutionPath: z.array(z.string()),
}),
})
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const { id: workflowId } = await params
try {
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}
const userId = auth.userId
let body: unknown
try {
body = await req.json()
} catch {
return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 })
}
const validation = ExecuteFromBlockSchema.safeParse(body)
if (!validation.success) {
logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors)
return NextResponse.json(
{
error: 'Invalid request body',
details: validation.error.errors.map((e) => ({
path: e.path.join('.'),
message: e.message,
})),
},
{ status: 400 }
)
}
const { startBlockId, sourceSnapshot } = validation.data
const executionId = uuidv4()
const [workflowRecord] = await db
.select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId })
.from(workflowTable)
.where(eq(workflowTable.id, workflowId))
.limit(1)
if (!workflowRecord?.workspaceId) {
return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
}
const workspaceId = workflowRecord.workspaceId
const workflowUserId = workflowRecord.userId
logger.info(`[${requestId}] Starting run-from-block execution`, {
workflowId,
startBlockId,
executedBlocksCount: sourceSnapshot.executedBlocks.length,
})
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
const abortController = new AbortController()
let isStreamClosed = false
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed) return
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
isStreamClosed = true
}
}
const snapshot = new ExecutionSnapshot({
requestId,
workflowId,
userId,
executionId,
triggerType: 'manual',
workspaceId,
workflowUserId,
useDraftState: true,
isClientSession: true,
})
try {
const startTime = new Date()
sendEvent({
type: 'execution:started',
timestamp: startTime.toISOString(),
executionId,
workflowId,
data: { startTime: startTime.toISOString() },
})
const result = await executeWorkflowCore({
snapshot,
loggingSession,
abortSignal: abortController.signal,
runFromBlock: {
startBlockId,
sourceSnapshot: sourceSnapshot as SerializableExecutionState,
},
callbacks: {
onBlockStart: async (
blockId: string,
blockName: string,
blockType: string,
iterationContext?: IterationContext
) => {
sendEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
},
onBlockComplete: async (
blockId: string,
blockName: string,
blockType: string,
callbackData: {
input?: unknown
output: NormalizedBlockOutput
executionTime: number
},
iterationContext?: IterationContext
) => {
const hasError = (callbackData.output as any)?.error
sendEvent({
type: hasError ? 'block:error' : 'block:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
...(hasError
? { error: (callbackData.output as any).error }
: { output: callbackData.output }),
durationMs: callbackData.executionTime || 0,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
},
onStream: async (streamingExecution: unknown) => {
const streamingExec = streamingExecution as {
stream: ReadableStream
execution: any
}
const blockId = streamingExec.execution?.blockId
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
sendEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId, chunk },
})
}
sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId },
})
} finally {
try {
reader.releaseLock()
} catch {}
}
},
},
})
if (result.status === 'cancelled') {
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { duration: result.metadata?.duration || 0 },
})
} else {
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: executionResult?.error || errorMessage,
duration: executionResult?.metadata?.duration || 0,
},
})
} finally {
if (!isStreamClosed) {
try {
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
controller.close()
} catch {}
}
}
},
cancel() {
isStreamClosed = true
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})
},
})
return new NextResponse(stream, {
headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId },
})
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Failed to start run-from-block execution:`, error)
return NextResponse.json(
{ error: errorMessage || 'Failed to start execution' },
{ status: 500 }
)
}
}

View File

@@ -53,6 +53,7 @@ const ExecuteWorkflowSchema = z.object({
parallels: z.record(z.any()).optional(),
})
.optional(),
stopAfterBlockId: z.string().optional(),
})
export const runtime = 'nodejs'
@@ -222,6 +223,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
workflowStateOverride,
stopAfterBlockId,
} = validation.data
// For API key and internal JWT auth, the entire body is the input (except for our control fields)
@@ -237,6 +239,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
workflowStateOverride,
stopAfterBlockId: _stopAfterBlockId,
workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
...rest
} = body
@@ -434,6 +437,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
loggingSession,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
})
const outputWithBase64 = includeFileBase64
@@ -722,6 +726,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
abortSignal: abortController.signal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
})
if (result.status === 'paused') {

View File

@@ -1,11 +1,13 @@
import { memo, useCallback } from 'react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react'
import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn'
import { cn } from '@/lib/core/utils/cn'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
@@ -49,6 +51,7 @@ export const ActionBar = memo(
collaborativeBatchToggleBlockHandles,
} = useCollaborativeWorkflow()
const { setPendingSelection } = useWorkflowRegistry()
const { handleRunFromBlock } = useWorkflowExecution()
const addNotification = useNotificationStore((s) => s.addNotification)
@@ -97,12 +100,30 @@ export const ActionBar = memo(
)
)
const { activeWorkflowId } = useWorkflowRegistry()
const { isExecuting, getLastExecutionSnapshot } = useExecutionStore()
const userPermissions = useUserPermissionsContext()
const edges = useWorkflowStore((state) => state.edges)
const isStartBlock = isInputDefinitionTrigger(blockType)
const isResponseBlock = blockType === 'response'
const isNoteBlock = blockType === 'note'
const isSubflowBlock = blockType === 'loop' || blockType === 'parallel'
const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel')
const snapshot = activeWorkflowId ? getLastExecutionSnapshot(activeWorkflowId) : null
const incomingEdges = edges.filter((edge) => edge.target === blockId)
const isTriggerBlock = incomingEdges.length === 0
const dependenciesSatisfied =
isTriggerBlock ||
(snapshot && incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)))
const canRunFromBlock =
dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting
const handleRunFromBlockClick = useCallback(() => {
if (!activeWorkflowId || !canRunFromBlock) return
handleRunFromBlock(blockId, activeWorkflowId)
}, [blockId, activeWorkflowId, canRunFromBlock, handleRunFromBlock])
/**
* Get appropriate tooltip message based on disabled state
@@ -135,23 +156,29 @@ export const ActionBar = memo(
variant='ghost'
onClick={(e) => {
e.stopPropagation()
if (!disabled) {
collaborativeBatchToggleBlockEnabled([blockId])
if (canRunFromBlock && !disabled) {
handleRunFromBlockClick()
}
}}
className={ACTION_BUTTON_STYLES}
disabled={disabled}
disabled={disabled || !canRunFromBlock}
>
{isEnabled ? <Circle className={ICON_SIZE} /> : <CircleOff className={ICON_SIZE} />}
<Play className={ICON_SIZE} />
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>
{getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')}
{(() => {
if (disabled) return getTooltipMessage('Run from this block')
if (isExecuting) return 'Execution in progress'
if (isInsideSubflow) return 'Cannot run from inside subflow'
if (!dependenciesSatisfied) return 'Run upstream blocks first'
return 'Run from this block'
})()}
</Tooltip.Content>
</Tooltip.Root>
)}
{isSubflowBlock && (
{!isNoteBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button

View File

@@ -40,9 +40,16 @@ export interface BlockMenuProps {
onRemoveFromSubflow: () => void
onOpenEditor: () => void
onRename: () => void
onRunFromBlock?: () => void
onRunUntilBlock?: () => void
hasClipboard?: boolean
showRemoveFromSubflow?: boolean
/** Whether run from block is available (has snapshot, was executed, not inside subflow) */
canRunFromBlock?: boolean
/** Reason why run from block is disabled (for tooltip) */
runFromBlockDisabledReason?: string
disableEdit?: boolean
isExecuting?: boolean
}
/**
@@ -65,9 +72,14 @@ export function BlockMenu({
onRemoveFromSubflow,
onOpenEditor,
onRename,
onRunFromBlock,
onRunUntilBlock,
hasClipboard = false,
showRemoveFromSubflow = false,
canRunFromBlock = false,
runFromBlockDisabledReason,
disableEdit = false,
isExecuting = false,
}: BlockMenuProps) {
const isSingleBlock = selectedBlocks.length === 1
@@ -203,6 +215,39 @@ export function BlockMenu({
</PopoverItem>
)}
{/* Run from/until block - only for single non-note block selection */}
{isSingleBlock && !allNoteBlocks && (
<>
<PopoverDivider />
<PopoverItem
disabled={!canRunFromBlock || isExecuting}
onClick={() => {
if (canRunFromBlock && !isExecuting) {
onRunFromBlock?.()
onClose()
}
}}
>
{isExecuting
? 'Execution in progress...'
: !canRunFromBlock && runFromBlockDisabledReason
? runFromBlockDisabledReason
: 'Run from this block'}
</PopoverItem>
<PopoverItem
disabled={isExecuting}
onClick={() => {
if (!isExecuting) {
onRunUntilBlock?.()
onClose()
}
}}
>
{isExecuting ? 'Execution in progress...' : 'Run until this block'}
</PopoverItem>
</>
)}
{/* Destructive action */}
<PopoverDivider />
<PopoverItem

View File

@@ -15,7 +15,8 @@ import {
TriggerUtils,
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
@@ -98,6 +99,8 @@ export function useWorkflowExecution() {
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
setLastExecutionSnapshot,
getLastExecutionSnapshot,
} = useExecutionStore()
const [executionResult, setExecutionResult] = useState<ExecutionResult | null>(null)
const executionStream = useExecutionStream()
@@ -668,7 +671,8 @@ export function useWorkflowExecution() {
onStream?: (se: StreamingExecution) => Promise<void>,
executionId?: string,
onBlockComplete?: (blockId: string, output: any) => Promise<void>,
overrideTriggerType?: 'chat' | 'manual' | 'api'
overrideTriggerType?: 'chat' | 'manual' | 'api',
stopAfterBlockId?: string
): Promise<ExecutionResult | StreamingExecution> => {
// Use diff workflow for execution when available, regardless of canvas view state
const executionWorkflowState = null as {
@@ -876,6 +880,8 @@ export function useWorkflowExecution() {
const activeBlocksSet = new Set<string>()
const streamedContent = new Map<string, string>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
// Execute the workflow
try {
@@ -887,6 +893,7 @@ export function useWorkflowExecution() {
triggerType: overrideTriggerType || 'manual',
useDraftState: true,
isClientSession: true,
stopAfterBlockId,
workflowStateOverride: executionWorkflowState
? {
blocks: executionWorkflowState.blocks,
@@ -916,18 +923,22 @@ export function useWorkflowExecution() {
logger.info('onBlockCompleted received:', { data })
activeBlocksSet.delete(data.blockId)
// Create a new Set to trigger React re-render
setActiveBlocks(new Set(activeBlocksSet))
// Track successful block execution in run path
setBlockRunStatus(data.blockId, 'success')
// Edges already tracked in onBlockStarted, no need to track again
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
if (isContainerBlock) return
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
// Accumulate block log for the execution result
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
@@ -1056,6 +1067,48 @@ export function useWorkflowExecution() {
},
logs: accumulatedBlockLogs,
}
if (data.success && activeWorkflowId) {
if (stopAfterBlockId) {
const existingSnapshot = getLastExecutionSnapshot(activeWorkflowId)
const mergedBlockStates = {
...(existingSnapshot?.blockStates || {}),
...Object.fromEntries(accumulatedBlockStates),
}
const mergedExecutedBlocks = new Set([
...(existingSnapshot?.executedBlocks || []),
...executedBlockIds,
])
const snapshot: SerializableExecutionState = {
blockStates: mergedBlockStates,
executedBlocks: Array.from(mergedExecutedBlocks),
blockLogs: [...(existingSnapshot?.blockLogs || []), ...accumulatedBlockLogs],
decisions: existingSnapshot?.decisions || { router: {}, condition: {} },
completedLoops: existingSnapshot?.completedLoops || [],
activeExecutionPath: Array.from(mergedExecutedBlocks),
}
setLastExecutionSnapshot(activeWorkflowId, snapshot)
logger.info('Merged execution snapshot after run-until-block', {
workflowId: activeWorkflowId,
newBlocksExecuted: executedBlockIds.size,
totalExecutedBlocks: mergedExecutedBlocks.size,
})
} else {
const snapshot: SerializableExecutionState = {
blockStates: Object.fromEntries(accumulatedBlockStates),
executedBlocks: Array.from(executedBlockIds),
blockLogs: accumulatedBlockLogs,
decisions: { router: {}, condition: {} },
completedLoops: [],
activeExecutionPath: Array.from(executedBlockIds),
}
setLastExecutionSnapshot(activeWorkflowId, snapshot)
logger.info('Stored execution snapshot for run-from-block', {
workflowId: activeWorkflowId,
executedBlocksCount: executedBlockIds.size,
})
}
}
},
onExecutionError: (data) => {
@@ -1376,6 +1429,265 @@ export function useWorkflowExecution() {
setActiveBlocks,
])
/**
* Handles running workflow from a specific block using cached outputs
*/
const handleRunFromBlock = useCallback(
async (blockId: string, workflowId: string) => {
const snapshot = getLastExecutionSnapshot(workflowId)
const workflowEdges = useWorkflowStore.getState().edges
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
const isTriggerBlock = incomingEdges.length === 0
if (!snapshot && !isTriggerBlock) {
logger.error('No execution snapshot available for run-from-block', { workflowId, blockId })
return
}
const dependenciesSatisfied =
isTriggerBlock ||
(snapshot && incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)))
if (!dependenciesSatisfied) {
logger.error('Upstream dependencies not satisfied for run-from-block', {
workflowId,
blockId,
})
return
}
// For trigger blocks with no snapshot, create an empty one
const effectiveSnapshot: SerializableExecutionState = snapshot || {
blockStates: {},
executedBlocks: [],
blockLogs: [],
decisions: { router: {}, condition: {} },
completedLoops: [],
activeExecutionPath: [],
}
logger.info('Starting run-from-block execution', {
workflowId,
startBlockId: blockId,
isTriggerBlock,
})
setIsExecuting(true)
const executionId = uuidv4()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
const activeBlocksSet = new Set<string>()
try {
await executionStream.executeFromBlock({
workflowId,
startBlockId: blockId,
sourceSnapshot: effectiveSnapshot,
callbacks: {
onExecutionStarted: (data) => {
logger.info('Run-from-block execution started:', data)
},
onBlockStarted: (data) => {
activeBlocksSet.add(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(edge.id, 'success')
})
},
onBlockCompleted: (data) => {
activeBlocksSet.delete(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setBlockRunStatus(data.blockId, 'success')
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
if (isContainerBlock) return
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
})
addConsole({
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
workflowId,
blockId: data.blockId,
executionId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
})
},
onBlockError: (data) => {
activeBlocksSet.delete(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setBlockRunStatus(data.blockId, 'error')
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
})
addConsole({
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
workflowId,
blockId: data.blockId,
executionId,
blockName: data.blockName,
blockType: data.blockType,
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
})
},
onExecutionCompleted: (data) => {
if (data.success) {
const mergedBlockStates: Record<string, BlockState> = {
...effectiveSnapshot.blockStates,
}
for (const [bId, state] of accumulatedBlockStates) {
mergedBlockStates[bId] = state
}
const mergedExecutedBlocks = new Set([
...effectiveSnapshot.executedBlocks,
...executedBlockIds,
])
const updatedSnapshot: SerializableExecutionState = {
...effectiveSnapshot,
blockStates: mergedBlockStates,
executedBlocks: Array.from(mergedExecutedBlocks),
blockLogs: [...effectiveSnapshot.blockLogs, ...accumulatedBlockLogs],
activeExecutionPath: Array.from(mergedExecutedBlocks),
}
setLastExecutionSnapshot(workflowId, updatedSnapshot)
logger.info('Updated execution snapshot after run-from-block', {
workflowId,
newBlocksExecuted: executedBlockIds.size,
})
}
},
onExecutionError: (data) => {
logger.error('Run-from-block execution error:', data.error)
},
onExecutionCancelled: () => {
logger.info('Run-from-block execution cancelled')
},
},
})
} catch (error) {
if ((error as Error).name !== 'AbortError') {
logger.error('Run-from-block execution failed:', error)
}
} finally {
setIsExecuting(false)
setActiveBlocks(new Set())
}
},
[
getLastExecutionSnapshot,
setLastExecutionSnapshot,
setIsExecuting,
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
addConsole,
executionStream,
]
)
/**
* Handles running workflow until a specific block (stops after that block completes)
*/
const handleRunUntilBlock = useCallback(
async (blockId: string, workflowId: string) => {
if (!workflowId || workflowId !== activeWorkflowId) {
logger.error('Invalid workflow ID for run-until-block', { workflowId, activeWorkflowId })
return
}
logger.info('Starting run-until-block execution', { workflowId, stopAfterBlockId: blockId })
setExecutionResult(null)
setIsExecuting(true)
const executionId = uuidv4()
try {
const result = await executeWorkflow(
undefined,
undefined,
executionId,
undefined,
'manual',
blockId
)
if (result && 'success' in result) {
setExecutionResult(result)
}
} catch (error) {
const errorResult = handleExecutionError(error, { executionId })
return errorResult
} finally {
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
}
},
[activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks]
)
return {
isExecuting,
isDebugging,
@@ -1386,5 +1698,7 @@ export function useWorkflowExecution() {
handleResumeDebug,
handleCancelDebug,
handleCancelExecution,
handleRunFromBlock,
handleRunUntilBlock,
}
}

View File

@@ -47,6 +47,7 @@ import {
useCurrentWorkflow,
useNodeUtilities,
useShiftSelectionLock,
useWorkflowExecution,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import {
calculateContainerDimensions,
@@ -325,6 +326,8 @@ const WorkflowContent = React.memo(() => {
const showTrainingModal = useCopilotTrainingStore((state) => state.showModal)
const { handleRunFromBlock, handleRunUntilBlock } = useWorkflowExecution()
const snapToGridSize = useSnapToGridSize()
const snapToGrid = snapToGridSize > 0
@@ -758,13 +761,16 @@ const WorkflowContent = React.memo(() => {
[collaborativeBatchAddBlocks, setSelectedEdges, setPendingSelection]
)
const { activeBlockIds, pendingBlocks, isDebugging } = useExecutionStore(
useShallow((state) => ({
activeBlockIds: state.activeBlockIds,
pendingBlocks: state.pendingBlocks,
isDebugging: state.isDebugging,
}))
)
const { activeBlockIds, pendingBlocks, isDebugging, isExecuting, getLastExecutionSnapshot } =
useExecutionStore(
useShallow((state) => ({
activeBlockIds: state.activeBlockIds,
pendingBlocks: state.pendingBlocks,
isDebugging: state.isDebugging,
isExecuting: state.isExecuting,
getLastExecutionSnapshot: state.getLastExecutionSnapshot,
}))
)
const [dragStartParentId, setDragStartParentId] = useState<string | null>(null)
@@ -988,6 +994,41 @@ const WorkflowContent = React.memo(() => {
}
}, [contextMenuBlocks])
const handleContextRunFromBlock = useCallback(() => {
if (contextMenuBlocks.length !== 1) return
const blockId = contextMenuBlocks[0].id
handleRunFromBlock(blockId, workflowIdParam)
}, [contextMenuBlocks, workflowIdParam, handleRunFromBlock])
const handleContextRunUntilBlock = useCallback(() => {
if (contextMenuBlocks.length !== 1) return
const blockId = contextMenuBlocks[0].id
handleRunUntilBlock(blockId, workflowIdParam)
}, [contextMenuBlocks, workflowIdParam, handleRunUntilBlock])
const runFromBlockState = useMemo(() => {
if (contextMenuBlocks.length !== 1) {
return { canRun: false, reason: undefined }
}
const block = contextMenuBlocks[0]
const snapshot = getLastExecutionSnapshot(workflowIdParam)
const incomingEdges = edges.filter((edge) => edge.target === block.id)
const isTriggerBlock = incomingEdges.length === 0
const dependenciesSatisfied =
isTriggerBlock ||
(snapshot && incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)))
const isNoteBlock = block.type === 'note'
const isInsideSubflow =
block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel')
if (isInsideSubflow) return { canRun: false, reason: 'Cannot run from inside subflow' }
if (!dependenciesSatisfied) return { canRun: false, reason: 'Run upstream blocks first' }
if (isNoteBlock) return { canRun: false, reason: undefined }
if (isExecuting) return { canRun: false, reason: undefined }
return { canRun: true, reason: undefined }
}, [contextMenuBlocks, edges, workflowIdParam, getLastExecutionSnapshot, isExecuting])
const handleContextAddBlock = useCallback(() => {
useSearchModalStore.getState().open()
}, [])
@@ -3308,11 +3349,16 @@ const WorkflowContent = React.memo(() => {
onRemoveFromSubflow={handleContextRemoveFromSubflow}
onOpenEditor={handleContextOpenEditor}
onRename={handleContextRename}
onRunFromBlock={handleContextRunFromBlock}
onRunUntilBlock={handleContextRunUntilBlock}
hasClipboard={hasClipboard()}
showRemoveFromSubflow={contextMenuBlocks.some(
(b) => b.parentId && (b.parentType === 'loop' || b.parentType === 'parallel')
)}
canRunFromBlock={runFromBlockState.canRun}
runFromBlockDisabledReason={runFromBlockState.reason}
disableEdit={!effectivePermissions.canEdit}
isExecuting={isExecuting}
/>
<CanvasMenu

View File

@@ -26,6 +26,7 @@ export class ExecutionEngine {
private allowResumeTriggers: boolean
private cancelledFlag = false
private errorFlag = false
private stoppedEarlyFlag = false
private executionError: Error | null = null
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
@@ -105,7 +106,7 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
if ((await this.checkCancellation()) || this.errorFlag) {
if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) {
break
}
await this.processQueue()
@@ -259,6 +260,16 @@ export class ExecutionEngine {
}
private initializeQueue(triggerBlockId?: string): void {
if (this.context.runFromBlockContext) {
const { startBlockId } = this.context.runFromBlockContext
logger.info('Initializing queue for run-from-block mode', {
startBlockId,
dirtySetSize: this.context.runFromBlockContext.dirtySet.size,
})
this.addToQueue(startBlockId)
return
}
const pendingBlocks = this.context.metadata.pendingBlocks
const remainingEdges = (this.context.metadata as any).remainingEdges
@@ -385,6 +396,12 @@ export class ExecutionEngine {
this.finalOutput = output
}
if (this.context.stopAfterBlockId === nodeId) {
logger.info('Stopping execution after target block', { nodeId })
this.stoppedEarlyFlag = true
return
}
const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false)
logger.info('Processing outgoing edges', {

View File

@@ -5,12 +5,22 @@ import { BlockExecutor } from '@/executor/execution/block-executor'
import { EdgeManager } from '@/executor/execution/edge-manager'
import { ExecutionEngine } from '@/executor/execution/engine'
import { ExecutionState } from '@/executor/execution/state'
import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
import type {
ContextExtensions,
SerializableExecutionState,
WorkflowInput,
} from '@/executor/execution/types'
import { createBlockHandlers } from '@/executor/handlers/registry'
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
import {
computeDirtySet,
type RunFromBlockContext,
resolveContainerToSentinelStart,
validateRunFromBlock,
} from '@/executor/utils/run-from-block'
import {
buildResolutionFromBlock,
buildStartBlockOutput,
@@ -89,17 +99,108 @@ export class DAGExecutor {
}
}
/**
* Execute from a specific block using cached outputs for upstream blocks.
*/
async executeFromBlock(
workflowId: string,
startBlockId: string,
sourceSnapshot: SerializableExecutionState
): Promise<ExecutionResult> {
const dag = this.dagBuilder.build(this.workflow)
const executedBlocks = new Set(sourceSnapshot.executedBlocks)
const validation = validateRunFromBlock(startBlockId, dag, executedBlocks)
if (!validation.valid) {
throw new Error(validation.error)
}
const dirtySet = computeDirtySet(dag, startBlockId)
const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId
logger.info('Executing from block', {
workflowId,
startBlockId,
effectiveStartBlockId,
dirtySetSize: dirtySet.size,
totalBlocks: dag.nodes.size,
dirtyBlocks: Array.from(dirtySet),
})
// Remove incoming edges from non-dirty sources so convergent blocks don't wait for cached upstream
for (const nodeId of dirtySet) {
const node = dag.nodes.get(nodeId)
if (!node) continue
const nonDirtyIncoming: string[] = []
for (const sourceId of node.incomingEdges) {
if (!dirtySet.has(sourceId)) {
nonDirtyIncoming.push(sourceId)
}
}
for (const sourceId of nonDirtyIncoming) {
node.incomingEdges.delete(sourceId)
logger.debug('Removed non-dirty incoming edge for run-from-block', {
nodeId,
sourceId,
})
}
}
const runFromBlockContext = { startBlockId: effectiveStartBlockId, dirtySet }
const { context, state } = this.createExecutionContext(workflowId, undefined, {
snapshotState: sourceSnapshot,
runFromBlockContext,
})
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
loopOrchestrator.setContextExtensions(this.contextExtensions)
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
parallelOrchestrator.setResolver(resolver)
parallelOrchestrator.setContextExtensions(this.contextExtensions)
const allHandlers = createBlockHandlers()
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
const edgeManager = new EdgeManager(dag)
loopOrchestrator.setEdgeManager(edgeManager)
const nodeOrchestrator = new NodeExecutionOrchestrator(
dag,
state,
blockExecutor,
loopOrchestrator,
parallelOrchestrator
)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
return await engine.run()
}
private createExecutionContext(
workflowId: string,
triggerBlockId?: string
triggerBlockId?: string,
overrides?: {
snapshotState?: SerializableExecutionState
runFromBlockContext?: RunFromBlockContext
}
): { context: ExecutionContext; state: ExecutionState } {
const snapshotState = this.contextExtensions.snapshotState
const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState
const blockStates = snapshotState?.blockStates
? new Map(Object.entries(snapshotState.blockStates))
: new Map<string, BlockState>()
const executedBlocks = snapshotState?.executedBlocks
let executedBlocks = snapshotState?.executedBlocks
? new Set(snapshotState.executedBlocks)
: new Set<string>()
if (overrides?.runFromBlockContext) {
const { dirtySet } = overrides.runFromBlockContext
executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id)))
logger.info('Cleared executed status for dirty blocks', {
dirtySetSize: dirtySet.size,
remainingExecutedBlocks: executedBlocks.size,
})
}
const state = new ExecutionState(blockStates, executedBlocks)
const context: ExecutionContext = {
@@ -109,7 +210,7 @@ export class DAGExecutor {
userId: this.contextExtensions.userId,
isDeployedContext: this.contextExtensions.isDeployedContext,
blockStates: state.getBlockStates(),
blockLogs: snapshotState?.blockLogs ?? [],
blockLogs: overrides?.runFromBlockContext ? [] : (snapshotState?.blockLogs ?? []),
metadata: {
...this.contextExtensions.metadata,
startTime: new Date().toISOString(),
@@ -169,6 +270,8 @@ export class DAGExecutor {
abortSignal: this.contextExtensions.abortSignal,
includeFileBase64: this.contextExtensions.includeFileBase64,
base64MaxBytes: this.contextExtensions.base64MaxBytes,
runFromBlockContext: overrides?.runFromBlockContext,
stopAfterBlockId: this.contextExtensions.stopAfterBlockId,
}
if (this.contextExtensions.resumeFromSnapshot) {
@@ -193,6 +296,10 @@ export class DAGExecutor {
pendingBlocks: context.metadata.pendingBlocks,
skipStarterBlockInit: true,
})
} else if (overrides?.runFromBlockContext) {
logger.info('Run-from-block mode: skipping starter block initialization', {
startBlockId: overrides.runFromBlockContext.startBlockId,
})
} else {
this.initializeStarterBlock(context, state, triggerBlockId)
}

View File

@@ -1,5 +1,6 @@
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SubflowType } from '@/stores/workflows/workflow/types'
export interface ExecutionMetadata {
@@ -105,6 +106,17 @@ export interface ContextExtensions {
output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
iterationContext?: IterationContext
) => Promise<void>
/**
* Run-from-block configuration. When provided, executor runs in partial
* execution mode starting from the specified block.
*/
runFromBlockContext?: RunFromBlockContext
/**
* Stop execution after this block completes. Used for "run until block" feature.
*/
stopAfterBlockId?: string
}
export interface WorkflowInput {

View File

@@ -276,7 +276,16 @@ export class LoopOrchestrator {
scope: LoopScope
): LoopContinuationResult {
const results = scope.allIterationOutputs
this.state.setBlockOutput(loopId, { results }, DEFAULTS.EXECUTION_TIME)
const output = { results }
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
// Emit onBlockComplete for the loop container so the UI can track it
if (this.contextExtensions?.onBlockComplete) {
this.contextExtensions.onBlockComplete(loopId, 'Loop', 'loop', {
output,
executionTime: DEFAULTS.EXECUTION_TIME,
})
}
return {
shouldContinue: false,

View File

@@ -31,7 +31,18 @@ export class NodeExecutionOrchestrator {
throw new Error(`Node not found in DAG: ${nodeId}`)
}
if (this.state.hasExecuted(nodeId)) {
if (ctx.runFromBlockContext && !ctx.runFromBlockContext.dirtySet.has(nodeId)) {
const cachedOutput = this.state.getBlockOutput(nodeId) || {}
logger.debug('Skipping non-dirty block in run-from-block mode', { nodeId })
return {
nodeId,
output: cachedOutput,
isFinalOutput: false,
}
}
const isDirtyBlock = ctx.runFromBlockContext?.dirtySet.has(nodeId) ?? false
if (!isDirtyBlock && this.state.hasExecuted(nodeId)) {
const output = this.state.getBlockOutput(nodeId) || {}
return {
nodeId,

View File

@@ -228,9 +228,17 @@ export class ParallelOrchestrator {
const branchOutputs = scope.branchOutputs.get(i) || []
results.push(branchOutputs)
}
this.state.setBlockOutput(parallelId, {
results,
})
const output = { results }
this.state.setBlockOutput(parallelId, output)
// Emit onBlockComplete for the parallel container so the UI can track it
if (this.contextExtensions?.onBlockComplete) {
this.contextExtensions.onBlockComplete(parallelId, 'Parallel', 'parallel', {
output,
executionTime: 0,
})
}
return {
allBranchesComplete: true,
results,

View File

@@ -1,6 +1,7 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import type { BlockOutput } from '@/blocks/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
export interface UserFile {
@@ -250,6 +251,17 @@ export interface ExecutionContext {
* will not have their base64 content fetched.
*/
base64MaxBytes?: number
/**
* Context for "run from block" mode. When present, only blocks in dirtySet
* will be executed; others return cached outputs from the source snapshot.
*/
runFromBlockContext?: RunFromBlockContext
/**
* Stop execution after this block completes. Used for "run until block" feature.
*/
stopAfterBlockId?: string
}
export interface ExecutionResult {

View File

@@ -0,0 +1,493 @@
import { describe, expect, it } from 'vitest'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
import { computeDirtySet, validateRunFromBlock } from '@/executor/utils/run-from-block'
import type { SerializedLoop, SerializedParallel } from '@/serializer/types'
/**
* Helper to create a DAG node for testing
*/
function createNode(
id: string,
outgoingEdges: Array<{ target: string; sourceHandle?: string }> = [],
metadata: Partial<NodeMetadata> = {}
): DAGNode {
const edges = new Map<string, DAGEdge>()
for (const edge of outgoingEdges) {
edges.set(edge.target, { target: edge.target, sourceHandle: edge.sourceHandle })
}
return {
id,
block: {
id,
position: { x: 0, y: 0 },
config: { tool: 'test', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'test', name: `block-${id}`, category: 'tools' },
enabled: true,
},
incomingEdges: new Set<string>(),
outgoingEdges: edges,
metadata: {
isParallelBranch: false,
isLoopNode: false,
isSentinel: false,
...metadata,
},
}
}
/**
* Helper to create a DAG for testing
*/
function createDAG(nodes: DAGNode[]): DAG {
const nodeMap = new Map<string, DAGNode>()
for (const node of nodes) {
nodeMap.set(node.id, node)
}
// Set up incoming edges based on outgoing edges
for (const node of nodes) {
for (const [, edge] of node.outgoingEdges) {
const targetNode = nodeMap.get(edge.target)
if (targetNode) {
targetNode.incomingEdges.add(node.id)
}
}
}
return {
nodes: nodeMap,
loopConfigs: new Map<string, SerializedLoop>(),
parallelConfigs: new Map<string, SerializedParallel>(),
}
}
describe('computeDirtySet', () => {
it('includes start block in dirty set', () => {
const dag = createDAG([createNode('A'), createNode('B'), createNode('C')])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('B')).toBe(true)
})
it('includes all downstream blocks in linear workflow', () => {
// A → B → C → D
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(3)
})
it('handles branching paths', () => {
// A → B → C
// ↓
// D → E
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }, { target: 'D' }]),
createNode('C'),
createNode('D', [{ target: 'E' }]),
createNode('E'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('E')).toBe(true)
expect(dirtySet.size).toBe(4)
})
it('handles convergence points', () => {
// A → C
// B → C → D
const dag = createDAG([
createNode('A', [{ target: 'C' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
// Run from A: should include A, C, D (but not B)
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(3)
})
it('handles diamond pattern', () => {
// B
// ↗ ↘
// A D
// ↘ ↗
// C
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(4)
})
it('stops at graph boundaries', () => {
// A → B C → D (disconnected)
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B'),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
expect(dirtySet.has('D')).toBe(false)
expect(dirtySet.size).toBe(2)
})
it('handles single node workflow', () => {
const dag = createDAG([createNode('A')])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.size).toBe(1)
})
it('handles node not in DAG gracefully', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const dirtySet = computeDirtySet(dag, 'nonexistent')
// Should just contain the start block ID even if not found
expect(dirtySet.has('nonexistent')).toBe(true)
expect(dirtySet.size).toBe(1)
})
it('includes convergent block when running from one branch of parallel', () => {
// Parallel branches converging:
// A → B → D
// A → C → D
// Running from B should include B and D (but not A or C)
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(2)
})
it('handles running from convergent block itself (all upstream non-dirty)', () => {
// A → C
// B → C
// Running from C should only include C
const dag = createDAG([
createNode('A', [{ target: 'C' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'C')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(2)
})
it('handles deep downstream chains', () => {
// A → B → C → D → E → F
// Running from C should include C, D, E, F
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D', [{ target: 'E' }]),
createNode('E', [{ target: 'F' }]),
createNode('F'),
])
const dirtySet = computeDirtySet(dag, 'C')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('E')).toBe(true)
expect(dirtySet.has('F')).toBe(true)
expect(dirtySet.size).toBe(4)
})
})
describe('validateRunFromBlock', () => {
it('accepts valid block', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A', 'B'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(true)
expect(result.error).toBeUndefined()
})
it('rejects block not found in DAG', () => {
const dag = createDAG([createNode('A')])
const executedBlocks = new Set(['A', 'B'])
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('Block not found')
})
it('rejects blocks inside loops', () => {
const dag = createDAG([createNode('A', [], { isLoopNode: true, loopId: 'loop-1' })])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('inside loop')
expect(result.error).toContain('loop-1')
})
it('rejects blocks inside parallels', () => {
const dag = createDAG([
createNode('A', [], { isParallelBranch: true, parallelId: 'parallel-1' }),
])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('inside parallel')
expect(result.error).toContain('parallel-1')
})
it('rejects sentinel nodes', () => {
const dag = createDAG([createNode('A', [], { isSentinel: true, sentinelType: 'start' })])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('sentinel')
})
it('rejects blocks with unexecuted upstream dependencies', () => {
// A → B, only A executed but B depends on A
const dag = createDAG([createNode('A', [{ target: 'B' }]), createNode('B')])
const executedBlocks = new Set<string>() // A was not executed
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('Upstream dependency not executed')
})
it('allows blocks with no dependencies even if not previously executed', () => {
// A and B are independent (no edges)
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A']) // B was not executed but has no deps
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(true) // B has no incoming edges, so it's valid
})
it('accepts regular executed block', () => {
const dag = createDAG([
createNode('trigger', [{ target: 'A' }]),
createNode('A', [{ target: 'B' }]),
createNode('B'),
])
const executedBlocks = new Set(['trigger', 'A', 'B'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(true)
})
it('accepts loop container when executed', () => {
// Loop container with sentinel nodes
const loopId = 'loop-container-1'
const sentinelStartId = `loop-${loopId}-sentinel-start`
const sentinelEndId = `loop-${loopId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B' }], {
isSentinel: true,
sentinelType: 'start',
loopId,
}),
createNode('B', [{ target: sentinelEndId }], { isLoopNode: true, loopId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
loopId,
}),
createNode('C'),
])
dag.loopConfigs.set(loopId, { id: loopId, nodes: ['B'], iterations: 3, loopType: 'for' } as any)
const executedBlocks = new Set(['A', loopId, sentinelStartId, 'B', sentinelEndId, 'C'])
const result = validateRunFromBlock(loopId, dag, executedBlocks)
expect(result.valid).toBe(true)
})
it('accepts parallel container when executed', () => {
// Parallel container with sentinel nodes
const parallelId = 'parallel-container-1'
const sentinelStartId = `parallel-${parallelId}-sentinel-start`
const sentinelEndId = `parallel-${parallelId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B₍0₎' }], {
isSentinel: true,
sentinelType: 'start',
parallelId,
}),
createNode('B₍0₎', [{ target: sentinelEndId }], { isParallelBranch: true, parallelId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
parallelId,
}),
createNode('C'),
])
dag.parallelConfigs.set(parallelId, { id: parallelId, nodes: ['B'], count: 2 } as any)
const executedBlocks = new Set(['A', parallelId, sentinelStartId, 'B₍0₎', sentinelEndId, 'C'])
const result = validateRunFromBlock(parallelId, dag, executedBlocks)
expect(result.valid).toBe(true)
})
it('allows loop container with no upstream dependencies', () => {
// Loop containers are validated via their sentinel nodes, not incoming edges on the container itself
// If the loop has no upstream dependencies, it should be valid
const loopId = 'loop-container-1'
const sentinelStartId = `loop-${loopId}-sentinel-start`
const dag = createDAG([
createNode(sentinelStartId, [], { isSentinel: true, sentinelType: 'start', loopId }),
])
dag.loopConfigs.set(loopId, { id: loopId, nodes: [], iterations: 3, loopType: 'for' } as any)
const executedBlocks = new Set<string>() // Nothing executed but loop has no deps
const result = validateRunFromBlock(loopId, dag, executedBlocks)
// Loop container validation doesn't check incoming edges (containers don't have nodes in dag.nodes)
// So this is valid - the loop can start fresh
expect(result.valid).toBe(true)
})
})
describe('computeDirtySet with containers', () => {
it('includes loop container and all downstream when running from loop', () => {
// A → loop-sentinel-start → B (inside loop) → loop-sentinel-end → C
const loopId = 'loop-1'
const sentinelStartId = `loop-${loopId}-sentinel-start`
const sentinelEndId = `loop-${loopId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B' }], {
isSentinel: true,
sentinelType: 'start',
loopId,
}),
createNode('B', [{ target: sentinelEndId }], { isLoopNode: true, loopId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
loopId,
}),
createNode('C'),
])
dag.loopConfigs.set(loopId, { id: loopId, nodes: ['B'], iterations: 3, loopType: 'for' } as any)
const dirtySet = computeDirtySet(dag, loopId)
// Should include loop container, sentinel-start, B, sentinel-end, C
expect(dirtySet.has(loopId)).toBe(true)
expect(dirtySet.has(sentinelStartId)).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has(sentinelEndId)).toBe(true)
expect(dirtySet.has('C')).toBe(true)
// Should NOT include A (upstream)
expect(dirtySet.has('A')).toBe(false)
})
it('includes parallel container and all downstream when running from parallel', () => {
// A → parallel-sentinel-start → B₍0₎ → parallel-sentinel-end → C
const parallelId = 'parallel-1'
const sentinelStartId = `parallel-${parallelId}-sentinel-start`
const sentinelEndId = `parallel-${parallelId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B₍0₎' }], {
isSentinel: true,
sentinelType: 'start',
parallelId,
}),
createNode('B₍0₎', [{ target: sentinelEndId }], { isParallelBranch: true, parallelId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
parallelId,
}),
createNode('C'),
])
dag.parallelConfigs.set(parallelId, { id: parallelId, nodes: ['B'], count: 2 } as any)
const dirtySet = computeDirtySet(dag, parallelId)
// Should include parallel container, sentinel-start, B₍0₎, sentinel-end, C
expect(dirtySet.has(parallelId)).toBe(true)
expect(dirtySet.has(sentinelStartId)).toBe(true)
expect(dirtySet.has('B₍0₎')).toBe(true)
expect(dirtySet.has(sentinelEndId)).toBe(true)
expect(dirtySet.has('C')).toBe(true)
// Should NOT include A (upstream)
expect(dirtySet.has('A')).toBe(false)
})
})

View File

@@ -0,0 +1,169 @@
import { createLogger } from '@sim/logger'
import { LOOP, PARALLEL } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
const logger = createLogger('run-from-block')
/**
* Builds the sentinel-start node ID for a loop.
*/
function buildLoopSentinelStartId(loopId: string): string {
return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.START_SUFFIX}`
}
/**
* Builds the sentinel-start node ID for a parallel.
*/
function buildParallelSentinelStartId(parallelId: string): string {
return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.START_SUFFIX}`
}
/**
* Checks if a block ID is a loop or parallel container and returns the sentinel-start ID if so.
* Returns null if the block is not a container.
*/
export function resolveContainerToSentinelStart(blockId: string, dag: DAG): string | null {
if (dag.loopConfigs.has(blockId)) {
return buildLoopSentinelStartId(blockId)
}
if (dag.parallelConfigs.has(blockId)) {
return buildParallelSentinelStartId(blockId)
}
return null
}
/**
* Result of validating a block for run-from-block execution.
*/
export interface RunFromBlockValidation {
valid: boolean
error?: string
}
/**
* Context for run-from-block execution mode.
*/
export interface RunFromBlockContext {
/** The block ID to start execution from */
startBlockId: string
/** Set of block IDs that need re-execution (start block + all downstream) */
dirtySet: Set<string>
}
/**
* Computes all blocks that need re-execution when running from a specific block.
* Uses BFS to find all downstream blocks reachable via outgoing edges.
*
* For loop/parallel containers, starts from the sentinel-start node and includes
* the container ID itself in the dirty set.
*
* @param dag - The workflow DAG
* @param startBlockId - The block to start execution from
* @returns Set of block IDs that are "dirty" and need re-execution
*/
export function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
const dirty = new Set<string>([startBlockId])
const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag)
const traversalStartId = sentinelStartId ?? startBlockId
if (sentinelStartId) {
dirty.add(sentinelStartId)
}
const queue = [traversalStartId]
while (queue.length > 0) {
const nodeId = queue.shift()!
const node = dag.nodes.get(nodeId)
if (!node) continue
for (const [, edge] of node.outgoingEdges) {
if (!dirty.has(edge.target)) {
dirty.add(edge.target)
queue.push(edge.target)
}
}
}
logger.debug('Computed dirty set', {
startBlockId,
traversalStartId,
dirtySetSize: dirty.size,
dirtyBlocks: Array.from(dirty),
})
return dirty
}
/**
* Validates that a block can be used as a run-from-block starting point.
*
* Validation rules:
* - Block must exist in the DAG (or be a loop/parallel container)
* - Block cannot be inside a loop (but loop containers are allowed)
* - Block cannot be inside a parallel (but parallel containers are allowed)
* - Block cannot be a sentinel node
* - All upstream dependencies must have been executed (have cached outputs)
*
* @param blockId - The block ID to validate
* @param dag - The workflow DAG
* @param executedBlocks - Set of blocks that were executed in the source run
* @returns Validation result with error message if invalid
*/
export function validateRunFromBlock(
blockId: string,
dag: DAG,
executedBlocks: Set<string>
): RunFromBlockValidation {
const node = dag.nodes.get(blockId)
const isLoopContainer = dag.loopConfigs.has(blockId)
const isParallelContainer = dag.parallelConfigs.has(blockId)
const isContainer = isLoopContainer || isParallelContainer
if (!node && !isContainer) {
return { valid: false, error: `Block not found in workflow: ${blockId}` }
}
if (isContainer) {
const sentinelStartId = resolveContainerToSentinelStart(blockId, dag)
if (!sentinelStartId || !dag.nodes.has(sentinelStartId)) {
return {
valid: false,
error: `Container sentinel not found for: ${blockId}`,
}
}
}
if (node) {
if (node.metadata.isLoopNode) {
return {
valid: false,
error: `Cannot run from block inside loop: ${node.metadata.loopId}`,
}
}
if (node.metadata.isParallelBranch) {
return {
valid: false,
error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`,
}
}
if (node.metadata.isSentinel) {
return { valid: false, error: 'Cannot run from sentinel node' }
}
if (node.incomingEdges.size > 0) {
for (const sourceId of node.incomingEdges.keys()) {
if (!executedBlocks.has(sourceId)) {
return {
valid: false,
error: `Upstream dependency not executed: ${sourceId}`,
}
}
}
}
}
return { valid: true }
}

View File

@@ -1,10 +1,85 @@
import { useCallback, useRef } from 'react'
import { createLogger } from '@sim/logger'
import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
const logger = createLogger('useExecutionStream')
/**
* Processes SSE events from a response body and invokes appropriate callbacks.
*/
async function processSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
callbacks: ExecutionStreamCallbacks,
logPrefix: string
): Promise<void> {
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) continue
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info(`${logPrefix} stream completed`)
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
switch (event.type) {
case 'execution:started':
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
}
export interface ExecutionStreamCallbacks {
onExecutionStarted?: (data: { startTime: string }) => void
onExecutionCompleted?: (data: {
@@ -68,6 +143,14 @@ export interface ExecuteStreamOptions {
loops?: Record<string, any>
parallels?: Record<string, any>
}
stopAfterBlockId?: string
callbacks?: ExecutionStreamCallbacks
}
export interface ExecuteFromBlockOptions {
workflowId: string
startBlockId: string
sourceSnapshot: SerializableExecutionState
callbacks?: ExecutionStreamCallbacks
}
@@ -119,91 +202,7 @@ export function useExecutionStream() {
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) {
continue
}
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info('Stream completed')
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
logger.info('📡 SSE Event received:', {
type: event.type,
executionId: event.executionId,
data: event.data,
})
switch (event.type) {
case 'execution:started':
logger.info('🚀 Execution started')
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
logger.info('✅ Execution completed')
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
logger.error('❌ Execution error')
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
logger.warn('🛑 Execution cancelled')
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
logger.info('🔷 Block started:', event.data.blockId)
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
logger.info('✓ Block completed:', event.data.blockId)
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
logger.error('✗ Block error:', event.data.blockId)
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
logger.info('Stream done:', event.data.blockId)
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
await processSSEStream(reader, callbacks, 'Execution')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Execution stream cancelled')
@@ -222,6 +221,65 @@ export function useExecutionStream() {
}
}, [])
const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => {
const { workflowId, startBlockId, sourceSnapshot, callbacks = {} } = options
if (abortControllerRef.current) {
abortControllerRef.current.abort()
}
const abortController = new AbortController()
abortControllerRef.current = abortController
currentExecutionRef.current = null
try {
const response = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ startBlockId, sourceSnapshot }),
signal: abortController.signal,
})
if (!response.ok) {
const errorResponse = await response.json()
const error = new Error(errorResponse.error || 'Failed to start execution')
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
}
if (!response.body) {
throw new Error('No response body')
}
const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current = { workflowId, executionId }
}
const reader = response.body.getReader()
await processSSEStream(reader, callbacks, 'Run-from-block')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Run-from-block execution cancelled')
callbacks.onExecutionCancelled?.({ duration: 0 })
} else {
logger.error('Run-from-block execution error:', error)
callbacks.onExecutionError?.({
error: error.message || 'Unknown error',
duration: 0,
})
}
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = null
}
}, [])
const cancel = useCallback(() => {
const execution = currentExecutionRef.current
if (execution) {
@@ -239,6 +297,7 @@ export function useExecutionStream() {
return {
execute,
executeFromBlock,
cancel,
}
}

View File

@@ -22,6 +22,7 @@ import type {
ContextExtensions,
ExecutionCallbacks,
IterationContext,
SerializableExecutionState,
} from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
@@ -40,6 +41,12 @@ export interface ExecuteWorkflowCoreOptions {
abortSignal?: AbortSignal
includeFileBase64?: boolean
base64MaxBytes?: number
stopAfterBlockId?: string
/** Run-from-block mode: execute starting from a specific block using cached upstream outputs */
runFromBlock?: {
startBlockId: string
sourceSnapshot: SerializableExecutionState
}
}
function parseVariableValueByType(value: unknown, type: string): unknown {
@@ -114,6 +121,8 @@ export async function executeWorkflowCore(
abortSignal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
runFromBlock,
} = options
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
@@ -297,6 +306,7 @@ export async function executeWorkflowCore(
abortSignal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
}
const executorInstance = new Executor({
@@ -319,10 +329,13 @@ export async function executeWorkflowCore(
}
}
const result = (await executorInstance.execute(
workflowId,
resolvedTriggerBlockId
)) as ExecutionResult
const result = runFromBlock
? ((await executorInstance.executeFromBlock(
workflowId,
runFromBlock.startBlockId,
runFromBlock.sourceSnapshot
)) as ExecutionResult)
: ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult)
// Build trace spans for logging from the full execution result
const { traceSpans, totalDuration } = buildTraceSpans(result)

View File

@@ -35,4 +35,23 @@ export const useExecutionStore = create<ExecutionState & ExecutionActions>()((se
},
clearRunPath: () => set({ lastRunPath: new Map(), lastRunEdges: new Map() }),
reset: () => set(initialState),
setLastExecutionSnapshot: (workflowId, snapshot) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
newSnapshots.set(workflowId, snapshot)
set({ lastExecutionSnapshots: newSnapshots })
},
getLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
return lastExecutionSnapshots.get(workflowId)
},
clearLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
newSnapshots.delete(workflowId)
set({ lastExecutionSnapshots: newSnapshots })
},
}))

View File

@@ -1,4 +1,5 @@
import type { Executor } from '@/executor'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionContext } from '@/executor/types'
/**
@@ -18,16 +19,9 @@ export interface ExecutionState {
pendingBlocks: string[]
executor: Executor | null
debugContext: ExecutionContext | null
/**
* Tracks blocks from the last execution run and their success/error status.
* Cleared when a new run starts. Used to show run path indicators (rings on blocks).
*/
lastRunPath: Map<string, BlockRunStatus>
/**
* Tracks edges from the last execution run and their success/error status.
* Cleared when a new run starts. Used to show run path indicators on edges.
*/
lastRunEdges: Map<string, EdgeRunStatus>
lastExecutionSnapshots: Map<string, SerializableExecutionState>
}
export interface ExecutionActions {
@@ -41,6 +35,9 @@ export interface ExecutionActions {
setEdgeRunStatus: (edgeId: string, status: EdgeRunStatus) => void
clearRunPath: () => void
reset: () => void
setLastExecutionSnapshot: (workflowId: string, snapshot: SerializableExecutionState) => void
getLastExecutionSnapshot: (workflowId: string) => SerializableExecutionState | undefined
clearLastExecutionSnapshot: (workflowId: string) => void
}
export const initialState: ExecutionState = {
@@ -52,4 +49,5 @@ export const initialState: ExecutionState = {
debugContext: null,
lastRunPath: new Map(),
lastRunEdges: new Map(),
lastExecutionSnapshots: new Map(),
}