mirror of https://github.com/simstudioai/sim.git
synced 2026-01-27 15:58:11 -05:00

Compare commits
15 Commits: feat/block ... feat/run-f
| SHA1 |
|---|
| c14c614e33 |
| 415acda403 |
| 8dc45e6e7e |
| 7a0aaa460d |
| 2c333bfd98 |
| 23ab11a40d |
| 6e541949ec |
| 3231955a07 |
| d38fb29e05 |
| 5c1e620831 |
| 72594df766 |
| be95a7dbd8 |
| da5d4ac9d5 |
| e8534bea7a |
| 3d0b810a8e |
apps/sim/app/api/workflows/[id]/execute-from-block/route.ts (new file, 305 lines)

@@ -0,0 +1,305 @@
import { db, workflow as workflowTable } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { IterationContext, SerializableExecutionState } from '@/executor/execution/types'
import type { NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'

const logger = createLogger('ExecuteFromBlockAPI')

const ExecuteFromBlockSchema = z.object({
  startBlockId: z.string().min(1, 'Start block ID is required'),
  sourceSnapshot: z.object({
    blockStates: z.record(z.any()),
    executedBlocks: z.array(z.string()),
    blockLogs: z.array(z.any()),
    decisions: z.object({
      router: z.record(z.string()),
      condition: z.record(z.string()),
    }),
    completedLoops: z.array(z.string()),
    loopExecutions: z.record(z.any()).optional(),
    parallelExecutions: z.record(z.any()).optional(),
    parallelBlockMapping: z.record(z.any()).optional(),
    activeExecutionPath: z.array(z.string()),
  }),
})

export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'

export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
  const requestId = generateRequestId()
  const { id: workflowId } = await params

  try {
    const auth = await checkHybridAuth(req, { requireWorkflowId: false })
    if (!auth.success || !auth.userId) {
      return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
    }
    const userId = auth.userId

    let body: unknown
    try {
      body = await req.json()
    } catch {
      return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 })
    }

    const validation = ExecuteFromBlockSchema.safeParse(body)
    if (!validation.success) {
      logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors)
      return NextResponse.json(
        {
          error: 'Invalid request body',
          details: validation.error.errors.map((e) => ({
            path: e.path.join('.'),
            message: e.message,
          })),
        },
        { status: 400 }
      )
    }

    const { startBlockId, sourceSnapshot } = validation.data
    const executionId = uuidv4()

    const [workflowRecord] = await db
      .select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId })
      .from(workflowTable)
      .where(eq(workflowTable.id, workflowId))
      .limit(1)

    if (!workflowRecord?.workspaceId) {
      return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
    }

    const workspaceId = workflowRecord.workspaceId
    const workflowUserId = workflowRecord.userId

    logger.info(`[${requestId}] Starting run-from-block execution`, {
      workflowId,
      startBlockId,
      executedBlocksCount: sourceSnapshot.executedBlocks.length,
    })

    const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
    const abortController = new AbortController()
    let isStreamClosed = false

    const stream = new ReadableStream<Uint8Array>({
      async start(controller) {
        const sendEvent = (event: ExecutionEvent) => {
          if (isStreamClosed) return
          try {
            controller.enqueue(encodeSSEEvent(event))
          } catch {
            isStreamClosed = true
          }
        }

        const snapshot = new ExecutionSnapshot({
          requestId,
          workflowId,
          userId,
          executionId,
          triggerType: 'manual',
          workspaceId,
          workflowUserId,
          useDraftState: true,
          isClientSession: true,
        })

        try {
          const startTime = new Date()

          sendEvent({
            type: 'execution:started',
            timestamp: startTime.toISOString(),
            executionId,
            workflowId,
            data: { startTime: startTime.toISOString() },
          })

          const result = await executeWorkflowCore({
            snapshot,
            loggingSession,
            abortSignal: abortController.signal,
            runFromBlock: {
              startBlockId,
              sourceSnapshot: sourceSnapshot as SerializableExecutionState,
            },
            callbacks: {
              onBlockStart: async (
                blockId: string,
                blockName: string,
                blockType: string,
                iterationContext?: IterationContext
              ) => {
                sendEvent({
                  type: 'block:started',
                  timestamp: new Date().toISOString(),
                  executionId,
                  workflowId,
                  data: {
                    blockId,
                    blockName,
                    blockType,
                    ...(iterationContext && {
                      iterationCurrent: iterationContext.iterationCurrent,
                      iterationTotal: iterationContext.iterationTotal,
                      iterationType: iterationContext.iterationType,
                    }),
                  },
                })
              },
              onBlockComplete: async (
                blockId: string,
                blockName: string,
                blockType: string,
                callbackData: {
                  input?: unknown
                  output: NormalizedBlockOutput
                  executionTime: number
                },
                iterationContext?: IterationContext
              ) => {
                const hasError = (callbackData.output as any)?.error
                sendEvent({
                  type: hasError ? 'block:error' : 'block:completed',
                  timestamp: new Date().toISOString(),
                  executionId,
                  workflowId,
                  data: {
                    blockId,
                    blockName,
                    blockType,
                    input: callbackData.input,
                    ...(hasError
                      ? { error: (callbackData.output as any).error }
                      : { output: callbackData.output }),
                    durationMs: callbackData.executionTime || 0,
                    ...(iterationContext && {
                      iterationCurrent: iterationContext.iterationCurrent,
                      iterationTotal: iterationContext.iterationTotal,
                      iterationType: iterationContext.iterationType,
                    }),
                  },
                })
              },
              onStream: async (streamingExecution: unknown) => {
                const streamingExec = streamingExecution as {
                  stream: ReadableStream
                  execution: any
                }
                const blockId = streamingExec.execution?.blockId
                const reader = streamingExec.stream.getReader()
                const decoder = new TextDecoder()

                try {
                  while (true) {
                    const { done, value } = await reader.read()
                    if (done) break
                    const chunk = decoder.decode(value, { stream: true })
                    sendEvent({
                      type: 'stream:chunk',
                      timestamp: new Date().toISOString(),
                      executionId,
                      workflowId,
                      data: { blockId, chunk },
                    })
                  }
                  sendEvent({
                    type: 'stream:done',
                    timestamp: new Date().toISOString(),
                    executionId,
                    workflowId,
                    data: { blockId },
                  })
                } finally {
                  try {
                    reader.releaseLock()
                  } catch {}
                }
              },
            },
          })

          if (result.status === 'cancelled') {
            sendEvent({
              type: 'execution:cancelled',
              timestamp: new Date().toISOString(),
              executionId,
              workflowId,
              data: { duration: result.metadata?.duration || 0 },
            })
          } else {
            sendEvent({
              type: 'execution:completed',
              timestamp: new Date().toISOString(),
              executionId,
              workflowId,
              data: {
                success: result.success,
                output: result.output,
                duration: result.metadata?.duration || 0,
                startTime: result.metadata?.startTime || startTime.toISOString(),
                endTime: result.metadata?.endTime || new Date().toISOString(),
              },
            })
          }
        } catch (error: unknown) {
          const errorMessage = error instanceof Error ? error.message : 'Unknown error'
          logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)

          const executionResult = hasExecutionResult(error) ? error.executionResult : undefined

          sendEvent({
            type: 'execution:error',
            timestamp: new Date().toISOString(),
            executionId,
            workflowId,
            data: {
              error: executionResult?.error || errorMessage,
              duration: executionResult?.metadata?.duration || 0,
            },
          })
        } finally {
          if (!isStreamClosed) {
            try {
              controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
              controller.close()
            } catch {}
          }
        }
      },
      cancel() {
        isStreamClosed = true
        abortController.abort()
        markExecutionCancelled(executionId).catch(() => {})
      },
    })

    return new NextResponse(stream, {
      headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId },
    })
  } catch (error: unknown) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    logger.error(`[${requestId}] Failed to start run-from-block execution:`, error)
    return NextResponse.json(
      { error: errorMessage || 'Failed to start execution' },
      { status: 500 }
    )
  }
}
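The handler streams ExecutionEvents as SSE frames and closes with a `data: [DONE]` sentinel, echoing the execution id in the `X-Execution-Id` response header. A minimal client sketch follows (hypothetical code, not part of this change; it assumes encodeSSEEvent emits standard `data: <json>` frames separated by blank lines and that the caller already holds credentials accepted by checkHybridAuth):

```ts
// Sketch: consume the execute-from-block SSE stream from a browser or Node 18+.
// The body shape mirrors ExecuteFromBlockSchema above; `snapshot` is a
// SerializableExecutionState captured from a prior full run (assumption: the
// caller persisted it, as the UI does via the execution store).
async function runFromBlock(workflowId: string, startBlockId: string, snapshot: unknown) {
  const res = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ startBlockId, sourceSnapshot: snapshot }),
  })
  if (!res.ok || !res.body) throw new Error(`Request failed: ${res.status}`)
  console.log('execution id:', res.headers.get('X-Execution-Id'))

  const reader = res.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    // SSE events are separated by blank lines; each payload line starts with "data: "
    const events = buffer.split('\n\n')
    buffer = events.pop() ?? ''
    for (const event of events) {
      const payload = event.replace(/^data: /, '')
      if (payload === '[DONE]') return
      const parsed = JSON.parse(payload) // { type, timestamp, executionId, workflowId, data }
      console.log(parsed.type, parsed.data)
    }
  }
}
```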
@@ -53,6 +53,7 @@ const ExecuteWorkflowSchema = z.object({
      parallels: z.record(z.any()).optional(),
    })
    .optional(),
  stopAfterBlockId: z.string().optional(),
})

export const runtime = 'nodejs'

@@ -222,6 +223,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
    includeFileBase64,
    base64MaxBytes,
    workflowStateOverride,
    stopAfterBlockId,
  } = validation.data

  // For API key and internal JWT auth, the entire body is the input (except for our control fields)

@@ -237,6 +239,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
    includeFileBase64,
    base64MaxBytes,
    workflowStateOverride,
    stopAfterBlockId: _stopAfterBlockId,
    workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
    ...rest
  } = body

@@ -434,6 +437,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
    loggingSession,
    includeFileBase64,
    base64MaxBytes,
    stopAfterBlockId,
  })

  const outputWithBase64 = includeFileBase64

@@ -722,6 +726,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
    abortSignal: abortController.signal,
    includeFileBase64,
    base64MaxBytes,
    stopAfterBlockId,
  })

  if (result.status === 'paused') {
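These hunks thread the new optional stopAfterBlockId through the existing execute route's schema, body destructuring, and both executeWorkflowCore call sites, so "run until block" reuses the normal execution path. A hypothetical request sketch (the execute route's exact path is an assumption inferred from the sibling file's layout):

```ts
// Sketch: trigger "run until block" via the regular execute endpoint.
// stopAfterBlockId is the only control field added by this change; the engine
// stops once that block completes (see stoppedEarlyFlag in ExecutionEngine below).
// Auth omitted: checkHybridAuth accepts session or API-key credentials.
await fetch(`/api/workflows/${workflowId}/execute`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    stopAfterBlockId: 'block-uuid-to-stop-after',
    // ...workflow input fields pass through as `rest` per the destructuring above
  }),
})
```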
@@ -1,11 +1,13 @@
 import { memo, useCallback } from 'react'
-import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react'
+import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react'
 import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn'
 import { cn } from '@/lib/core/utils/cn'
 import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
 import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
 import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
 import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
 import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
 import { useExecutionStore } from '@/stores/execution'
 import { useNotificationStore } from '@/stores/notifications'
 import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
 import { useWorkflowStore } from '@/stores/workflows/workflow/store'

@@ -49,6 +51,7 @@ export const ActionBar = memo(
    collaborativeBatchToggleBlockHandles,
  } = useCollaborativeWorkflow()
  const { setPendingSelection } = useWorkflowRegistry()
  const { handleRunFromBlock } = useWorkflowExecution()

  const addNotification = useNotificationStore((s) => s.addNotification)

@@ -97,12 +100,30 @@
      )
    )

    const { activeWorkflowId } = useWorkflowRegistry()
    const { isExecuting, getLastExecutionSnapshot } = useExecutionStore()
    const userPermissions = useUserPermissionsContext()
    const edges = useWorkflowStore((state) => state.edges)

    const isStartBlock = isInputDefinitionTrigger(blockType)
    const isResponseBlock = blockType === 'response'
    const isNoteBlock = blockType === 'note'
    const isSubflowBlock = blockType === 'loop' || blockType === 'parallel'
    const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel')

    const snapshot = activeWorkflowId ? getLastExecutionSnapshot(activeWorkflowId) : null
    const incomingEdges = edges.filter((edge) => edge.target === blockId)
    const isTriggerBlock = incomingEdges.length === 0
    const dependenciesSatisfied =
      isTriggerBlock ||
      (snapshot && incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)))
    const canRunFromBlock =
      dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting

    const handleRunFromBlockClick = useCallback(() => {
      if (!activeWorkflowId || !canRunFromBlock) return
      handleRunFromBlock(blockId, activeWorkflowId)
    }, [blockId, activeWorkflowId, canRunFromBlock, handleRunFromBlock])

    /**
     * Get appropriate tooltip message based on disabled state

@@ -135,23 +156,29 @@
            variant='ghost'
            onClick={(e) => {
              e.stopPropagation()
-             if (!disabled) {
-               collaborativeBatchToggleBlockEnabled([blockId])
+             if (canRunFromBlock && !disabled) {
+               handleRunFromBlockClick()
              }
            }}
            className={ACTION_BUTTON_STYLES}
-           disabled={disabled}
+           disabled={disabled || !canRunFromBlock}
          >
-           {isEnabled ? <Circle className={ICON_SIZE} /> : <CircleOff className={ICON_SIZE} />}
+           <Play className={ICON_SIZE} />
          </Button>
        </Tooltip.Trigger>
        <Tooltip.Content side='top'>
-         {getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')}
+         {(() => {
+           if (disabled) return getTooltipMessage('Run from this block')
+           if (isExecuting) return 'Execution in progress'
+           if (isInsideSubflow) return 'Cannot run from inside subflow'
+           if (!dependenciesSatisfied) return 'Run upstream blocks first'
+           return 'Run from this block'
+         })()}
        </Tooltip.Content>
      </Tooltip.Root>
    )}

-   {isSubflowBlock && (
+   {!isNoteBlock && (
      <Tooltip.Root>
        <Tooltip.Trigger asChild>
          <Button
@@ -40,9 +40,16 @@ export interface BlockMenuProps {
  onRemoveFromSubflow: () => void
  onOpenEditor: () => void
  onRename: () => void
  onRunFromBlock?: () => void
  onRunUntilBlock?: () => void
  hasClipboard?: boolean
  showRemoveFromSubflow?: boolean
  /** Whether run from block is available (has snapshot, was executed, not inside subflow) */
  canRunFromBlock?: boolean
  /** Reason why run from block is disabled (for tooltip) */
  runFromBlockDisabledReason?: string
  disableEdit?: boolean
  isExecuting?: boolean
}

@@ -65,9 +72,14 @@ export function BlockMenu({
  onRemoveFromSubflow,
  onOpenEditor,
  onRename,
  onRunFromBlock,
  onRunUntilBlock,
  hasClipboard = false,
  showRemoveFromSubflow = false,
  canRunFromBlock = false,
  runFromBlockDisabledReason,
  disableEdit = false,
  isExecuting = false,
}: BlockMenuProps) {
  const isSingleBlock = selectedBlocks.length === 1

@@ -203,6 +215,39 @@
        </PopoverItem>
      )}

      {/* Run from/until block - only for single non-note block selection */}
      {isSingleBlock && !allNoteBlocks && (
        <>
          <PopoverDivider />
          <PopoverItem
            disabled={!canRunFromBlock || isExecuting}
            onClick={() => {
              if (canRunFromBlock && !isExecuting) {
                onRunFromBlock?.()
                onClose()
              }
            }}
          >
            {isExecuting
              ? 'Execution in progress...'
              : !canRunFromBlock && runFromBlockDisabledReason
                ? runFromBlockDisabledReason
                : 'Run from this block'}
          </PopoverItem>
          <PopoverItem
            disabled={isExecuting}
            onClick={() => {
              if (!isExecuting) {
                onRunUntilBlock?.()
                onClose()
              }
            }}
          >
            {isExecuting ? 'Execution in progress...' : 'Run until this block'}
          </PopoverItem>
        </>
      )}

      {/* Destructive action */}
      <PopoverDivider />
      <PopoverItem
@@ -15,7 +15,8 @@ import {
  TriggerUtils,
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
-import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
+import type { SerializableExecutionState } from '@/executor/execution/types'
+import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'

@@ -98,6 +99,8 @@ export function useWorkflowExecution() {
    setActiveBlocks,
    setBlockRunStatus,
    setEdgeRunStatus,
    setLastExecutionSnapshot,
    getLastExecutionSnapshot,
  } = useExecutionStore()
  const [executionResult, setExecutionResult] = useState<ExecutionResult | null>(null)
  const executionStream = useExecutionStream()

@@ -668,7 +671,8 @@
      onStream?: (se: StreamingExecution) => Promise<void>,
      executionId?: string,
      onBlockComplete?: (blockId: string, output: any) => Promise<void>,
-     overrideTriggerType?: 'chat' | 'manual' | 'api'
+     overrideTriggerType?: 'chat' | 'manual' | 'api',
+     stopAfterBlockId?: string
    ): Promise<ExecutionResult | StreamingExecution> => {
      // Use diff workflow for execution when available, regardless of canvas view state
      const executionWorkflowState = null as {

@@ -876,6 +880,8 @@
      const activeBlocksSet = new Set<string>()
      const streamedContent = new Map<string, string>()
      const accumulatedBlockLogs: BlockLog[] = []
      const accumulatedBlockStates = new Map<string, BlockState>()
      const executedBlockIds = new Set<string>()

      // Execute the workflow
      try {

@@ -887,6 +893,7 @@
          triggerType: overrideTriggerType || 'manual',
          useDraftState: true,
          isClientSession: true,
          stopAfterBlockId,
          workflowStateOverride: executionWorkflowState
            ? {
                blocks: executionWorkflowState.blocks,

@@ -916,18 +923,22 @@
            logger.info('onBlockCompleted received:', { data })

            activeBlocksSet.delete(data.blockId)
            // Create a new Set to trigger React re-render
            setActiveBlocks(new Set(activeBlocksSet))

            // Track successful block execution in run path
            setBlockRunStatus(data.blockId, 'success')

            // Edges already tracked in onBlockStarted, no need to track again
            executedBlockIds.add(data.blockId)
            accumulatedBlockStates.set(data.blockId, {
              output: data.output,
              executed: true,
              executionTime: data.durationMs,
            })

            const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
            if (isContainerBlock) return

            const startedAt = new Date(Date.now() - data.durationMs).toISOString()
            const endedAt = new Date().toISOString()

            // Accumulate block log for the execution result
            accumulatedBlockLogs.push({
              blockId: data.blockId,
              blockName: data.blockName || 'Unknown Block',

@@ -1056,6 +1067,48 @@
              },
              logs: accumulatedBlockLogs,
            }

            if (data.success && activeWorkflowId) {
              if (stopAfterBlockId) {
                const existingSnapshot = getLastExecutionSnapshot(activeWorkflowId)
                const mergedBlockStates = {
                  ...(existingSnapshot?.blockStates || {}),
                  ...Object.fromEntries(accumulatedBlockStates),
                }
                const mergedExecutedBlocks = new Set([
                  ...(existingSnapshot?.executedBlocks || []),
                  ...executedBlockIds,
                ])
                const snapshot: SerializableExecutionState = {
                  blockStates: mergedBlockStates,
                  executedBlocks: Array.from(mergedExecutedBlocks),
                  blockLogs: [...(existingSnapshot?.blockLogs || []), ...accumulatedBlockLogs],
                  decisions: existingSnapshot?.decisions || { router: {}, condition: {} },
                  completedLoops: existingSnapshot?.completedLoops || [],
                  activeExecutionPath: Array.from(mergedExecutedBlocks),
                }
                setLastExecutionSnapshot(activeWorkflowId, snapshot)
                logger.info('Merged execution snapshot after run-until-block', {
                  workflowId: activeWorkflowId,
                  newBlocksExecuted: executedBlockIds.size,
                  totalExecutedBlocks: mergedExecutedBlocks.size,
                })
              } else {
                const snapshot: SerializableExecutionState = {
                  blockStates: Object.fromEntries(accumulatedBlockStates),
                  executedBlocks: Array.from(executedBlockIds),
                  blockLogs: accumulatedBlockLogs,
                  decisions: { router: {}, condition: {} },
                  completedLoops: [],
                  activeExecutionPath: Array.from(executedBlockIds),
                }
                setLastExecutionSnapshot(activeWorkflowId, snapshot)
                logger.info('Stored execution snapshot for run-from-block', {
                  workflowId: activeWorkflowId,
                  executedBlocksCount: executedBlockIds.size,
                })
              }
            }
          },

          onExecutionError: (data) => {

@@ -1376,6 +1429,265 @@
    setActiveBlocks,
  ])

  /**
   * Handles running workflow from a specific block using cached outputs
   */
  const handleRunFromBlock = useCallback(
    async (blockId: string, workflowId: string) => {
      const snapshot = getLastExecutionSnapshot(workflowId)
      const workflowEdges = useWorkflowStore.getState().edges
      const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
      const isTriggerBlock = incomingEdges.length === 0

      if (!snapshot && !isTriggerBlock) {
        logger.error('No execution snapshot available for run-from-block', { workflowId, blockId })
        return
      }

      const dependenciesSatisfied =
        isTriggerBlock ||
        (snapshot && incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)))

      if (!dependenciesSatisfied) {
        logger.error('Upstream dependencies not satisfied for run-from-block', {
          workflowId,
          blockId,
        })
        return
      }

      // For trigger blocks with no snapshot, create an empty one
      const effectiveSnapshot: SerializableExecutionState = snapshot || {
        blockStates: {},
        executedBlocks: [],
        blockLogs: [],
        decisions: { router: {}, condition: {} },
        completedLoops: [],
        activeExecutionPath: [],
      }

      logger.info('Starting run-from-block execution', {
        workflowId,
        startBlockId: blockId,
        isTriggerBlock,
      })

      setIsExecuting(true)
      const executionId = uuidv4()
      const accumulatedBlockLogs: BlockLog[] = []
      const accumulatedBlockStates = new Map<string, BlockState>()
      const executedBlockIds = new Set<string>()
      const activeBlocksSet = new Set<string>()

      try {
        await executionStream.executeFromBlock({
          workflowId,
          startBlockId: blockId,
          sourceSnapshot: effectiveSnapshot,
          callbacks: {
            onExecutionStarted: (data) => {
              logger.info('Run-from-block execution started:', data)
            },

            onBlockStarted: (data) => {
              activeBlocksSet.add(data.blockId)
              setActiveBlocks(new Set(activeBlocksSet))

              const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
              incomingEdges.forEach((edge) => {
                setEdgeRunStatus(edge.id, 'success')
              })
            },

            onBlockCompleted: (data) => {
              activeBlocksSet.delete(data.blockId)
              setActiveBlocks(new Set(activeBlocksSet))

              setBlockRunStatus(data.blockId, 'success')

              executedBlockIds.add(data.blockId)
              accumulatedBlockStates.set(data.blockId, {
                output: data.output,
                executed: true,
                executionTime: data.durationMs,
              })

              const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
              if (isContainerBlock) return

              const startedAt = new Date(Date.now() - data.durationMs).toISOString()
              const endedAt = new Date().toISOString()

              accumulatedBlockLogs.push({
                blockId: data.blockId,
                blockName: data.blockName || 'Unknown Block',
                blockType: data.blockType || 'unknown',
                input: data.input || {},
                output: data.output,
                success: true,
                durationMs: data.durationMs,
                startedAt,
                endedAt,
              })

              addConsole({
                input: data.input || {},
                output: data.output,
                success: true,
                durationMs: data.durationMs,
                startedAt,
                endedAt,
                workflowId,
                blockId: data.blockId,
                executionId,
                blockName: data.blockName || 'Unknown Block',
                blockType: data.blockType || 'unknown',
                iterationCurrent: data.iterationCurrent,
                iterationTotal: data.iterationTotal,
                iterationType: data.iterationType,
              })
            },

            onBlockError: (data) => {
              activeBlocksSet.delete(data.blockId)
              setActiveBlocks(new Set(activeBlocksSet))

              setBlockRunStatus(data.blockId, 'error')

              const startedAt = new Date(Date.now() - data.durationMs).toISOString()
              const endedAt = new Date().toISOString()

              accumulatedBlockLogs.push({
                blockId: data.blockId,
                blockName: data.blockName || 'Unknown Block',
                blockType: data.blockType || 'unknown',
                input: data.input || {},
                output: {},
                success: false,
                error: data.error,
                durationMs: data.durationMs,
                startedAt,
                endedAt,
              })

              addConsole({
                input: data.input || {},
                output: {},
                success: false,
                error: data.error,
                durationMs: data.durationMs,
                startedAt,
                endedAt,
                workflowId,
                blockId: data.blockId,
                executionId,
                blockName: data.blockName,
                blockType: data.blockType,
                iterationCurrent: data.iterationCurrent,
                iterationTotal: data.iterationTotal,
                iterationType: data.iterationType,
              })
            },

            onExecutionCompleted: (data) => {
              if (data.success) {
                const mergedBlockStates: Record<string, BlockState> = {
                  ...effectiveSnapshot.blockStates,
                }
                for (const [bId, state] of accumulatedBlockStates) {
                  mergedBlockStates[bId] = state
                }

                const mergedExecutedBlocks = new Set([
                  ...effectiveSnapshot.executedBlocks,
                  ...executedBlockIds,
                ])

                const updatedSnapshot: SerializableExecutionState = {
                  ...effectiveSnapshot,
                  blockStates: mergedBlockStates,
                  executedBlocks: Array.from(mergedExecutedBlocks),
                  blockLogs: [...effectiveSnapshot.blockLogs, ...accumulatedBlockLogs],
                  activeExecutionPath: Array.from(mergedExecutedBlocks),
                }
                setLastExecutionSnapshot(workflowId, updatedSnapshot)
                logger.info('Updated execution snapshot after run-from-block', {
                  workflowId,
                  newBlocksExecuted: executedBlockIds.size,
                })
              }
            },

            onExecutionError: (data) => {
              logger.error('Run-from-block execution error:', data.error)
            },

            onExecutionCancelled: () => {
              logger.info('Run-from-block execution cancelled')
            },
          },
        })
      } catch (error) {
        if ((error as Error).name !== 'AbortError') {
          logger.error('Run-from-block execution failed:', error)
        }
      } finally {
        setIsExecuting(false)
        setActiveBlocks(new Set())
      }
    },
    [
      getLastExecutionSnapshot,
      setLastExecutionSnapshot,
      setIsExecuting,
      setActiveBlocks,
      setBlockRunStatus,
      setEdgeRunStatus,
      addConsole,
      executionStream,
    ]
  )

  /**
   * Handles running workflow until a specific block (stops after that block completes)
   */
  const handleRunUntilBlock = useCallback(
    async (blockId: string, workflowId: string) => {
      if (!workflowId || workflowId !== activeWorkflowId) {
        logger.error('Invalid workflow ID for run-until-block', { workflowId, activeWorkflowId })
        return
      }

      logger.info('Starting run-until-block execution', { workflowId, stopAfterBlockId: blockId })

      setExecutionResult(null)
      setIsExecuting(true)

      const executionId = uuidv4()
      try {
        const result = await executeWorkflow(
          undefined,
          undefined,
          executionId,
          undefined,
          'manual',
          blockId
        )
        if (result && 'success' in result) {
          setExecutionResult(result)
        }
      } catch (error) {
        const errorResult = handleExecutionError(error, { executionId })
        return errorResult
      } finally {
        setIsExecuting(false)
        setIsDebugging(false)
        setActiveBlocks(new Set())
      }
    },
    [activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks]
  )

  return {
    isExecuting,
    isDebugging,

@@ -1386,5 +1698,7 @@
    handleResumeDebug,
    handleCancelDebug,
    handleCancelExecution,
    handleRunFromBlock,
    handleRunUntilBlock,
  }
}
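Both the run-until-block branch in the main execute path and the run-from-block completion handler above apply the same merge rule: newer block states overwrite older ones, executed-block sets union, and logs append. A standalone sketch of that rule (hypothetical helper for illustration only; the diff inlines this logic rather than extracting it):

```ts
import type { SerializableExecutionState } from '@/executor/execution/types'

// Sketch: the snapshot-merge rule shared by the run-until-block and
// run-from-block completion handlers above.
function mergeSnapshots(
  base: SerializableExecutionState | null,
  delta: Pick<SerializableExecutionState, 'blockStates' | 'executedBlocks' | 'blockLogs'>
): SerializableExecutionState {
  const executed = new Set([...(base?.executedBlocks ?? []), ...delta.executedBlocks])
  return {
    blockStates: { ...(base?.blockStates ?? {}), ...delta.blockStates }, // newest run wins
    executedBlocks: Array.from(executed),
    blockLogs: [...(base?.blockLogs ?? []), ...delta.blockLogs],
    decisions: base?.decisions ?? { router: {}, condition: {} },
    completedLoops: base?.completedLoops ?? [],
    activeExecutionPath: Array.from(executed),
  }
}
```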
@@ -47,6 +47,7 @@ import {
  useCurrentWorkflow,
  useNodeUtilities,
  useShiftSelectionLock,
  useWorkflowExecution,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import {
  calculateContainerDimensions,

@@ -325,6 +326,8 @@ const WorkflowContent = React.memo(() => {

  const showTrainingModal = useCopilotTrainingStore((state) => state.showModal)

  const { handleRunFromBlock, handleRunUntilBlock } = useWorkflowExecution()

  const snapToGridSize = useSnapToGridSize()
  const snapToGrid = snapToGridSize > 0

@@ -758,13 +761,16 @@
    [collaborativeBatchAddBlocks, setSelectedEdges, setPendingSelection]
  )

- const { activeBlockIds, pendingBlocks, isDebugging } = useExecutionStore(
-   useShallow((state) => ({
-     activeBlockIds: state.activeBlockIds,
-     pendingBlocks: state.pendingBlocks,
-     isDebugging: state.isDebugging,
-   }))
- )
+ const { activeBlockIds, pendingBlocks, isDebugging, isExecuting, getLastExecutionSnapshot } =
+   useExecutionStore(
+     useShallow((state) => ({
+       activeBlockIds: state.activeBlockIds,
+       pendingBlocks: state.pendingBlocks,
+       isDebugging: state.isDebugging,
+       isExecuting: state.isExecuting,
+       getLastExecutionSnapshot: state.getLastExecutionSnapshot,
+     }))
+   )

  const [dragStartParentId, setDragStartParentId] = useState<string | null>(null)

@@ -988,6 +994,41 @@
    }
  }, [contextMenuBlocks])

  const handleContextRunFromBlock = useCallback(() => {
    if (contextMenuBlocks.length !== 1) return
    const blockId = contextMenuBlocks[0].id
    handleRunFromBlock(blockId, workflowIdParam)
  }, [contextMenuBlocks, workflowIdParam, handleRunFromBlock])

  const handleContextRunUntilBlock = useCallback(() => {
    if (contextMenuBlocks.length !== 1) return
    const blockId = contextMenuBlocks[0].id
    handleRunUntilBlock(blockId, workflowIdParam)
  }, [contextMenuBlocks, workflowIdParam, handleRunUntilBlock])

  const runFromBlockState = useMemo(() => {
    if (contextMenuBlocks.length !== 1) {
      return { canRun: false, reason: undefined }
    }
    const block = contextMenuBlocks[0]
    const snapshot = getLastExecutionSnapshot(workflowIdParam)
    const incomingEdges = edges.filter((edge) => edge.target === block.id)
    const isTriggerBlock = incomingEdges.length === 0
    const dependenciesSatisfied =
      isTriggerBlock ||
      (snapshot && incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)))
    const isNoteBlock = block.type === 'note'
    const isInsideSubflow =
      block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel')

    if (isInsideSubflow) return { canRun: false, reason: 'Cannot run from inside subflow' }
    if (!dependenciesSatisfied) return { canRun: false, reason: 'Run upstream blocks first' }
    if (isNoteBlock) return { canRun: false, reason: undefined }
    if (isExecuting) return { canRun: false, reason: undefined }

    return { canRun: true, reason: undefined }
  }, [contextMenuBlocks, edges, workflowIdParam, getLastExecutionSnapshot, isExecuting])

  const handleContextAddBlock = useCallback(() => {
    useSearchModalStore.getState().open()
  }, [])

@@ -3308,11 +3349,16 @@
  onRemoveFromSubflow={handleContextRemoveFromSubflow}
  onOpenEditor={handleContextOpenEditor}
  onRename={handleContextRename}
  onRunFromBlock={handleContextRunFromBlock}
  onRunUntilBlock={handleContextRunUntilBlock}
  hasClipboard={hasClipboard()}
  showRemoveFromSubflow={contextMenuBlocks.some(
    (b) => b.parentId && (b.parentType === 'loop' || b.parentType === 'parallel')
  )}
  canRunFromBlock={runFromBlockState.canRun}
  runFromBlockDisabledReason={runFromBlockState.reason}
  disableEdit={!effectivePermissions.canEdit}
  isExecuting={isExecuting}
/>

<CanvasMenu
@@ -26,6 +26,7 @@ export class ExecutionEngine {
  private allowResumeTriggers: boolean
  private cancelledFlag = false
  private errorFlag = false
  private stoppedEarlyFlag = false
  private executionError: Error | null = null
  private lastCancellationCheck = 0
  private readonly useRedisCancellation: boolean

@@ -105,7 +106,7 @@
    this.initializeQueue(triggerBlockId)

    while (this.hasWork()) {
-     if ((await this.checkCancellation()) || this.errorFlag) {
+     if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) {
        break
      }
      await this.processQueue()

@@ -259,6 +260,16 @@
  }

  private initializeQueue(triggerBlockId?: string): void {
    if (this.context.runFromBlockContext) {
      const { startBlockId } = this.context.runFromBlockContext
      logger.info('Initializing queue for run-from-block mode', {
        startBlockId,
        dirtySetSize: this.context.runFromBlockContext.dirtySet.size,
      })
      this.addToQueue(startBlockId)
      return
    }

    const pendingBlocks = this.context.metadata.pendingBlocks
    const remainingEdges = (this.context.metadata as any).remainingEdges

@@ -385,6 +396,12 @@
      this.finalOutput = output
    }

    if (this.context.stopAfterBlockId === nodeId) {
      logger.info('Stopping execution after target block', { nodeId })
      this.stoppedEarlyFlag = true
      return
    }

    const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false)

    logger.info('Processing outgoing edges', {
@@ -5,12 +5,22 @@ import { BlockExecutor } from '@/executor/execution/block-executor'
import { EdgeManager } from '@/executor/execution/edge-manager'
import { ExecutionEngine } from '@/executor/execution/engine'
import { ExecutionState } from '@/executor/execution/state'
-import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
+import type {
+  ContextExtensions,
+  SerializableExecutionState,
+  WorkflowInput,
+} from '@/executor/execution/types'
import { createBlockHandlers } from '@/executor/handlers/registry'
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
import {
  computeDirtySet,
  type RunFromBlockContext,
  resolveContainerToSentinelStart,
  validateRunFromBlock,
} from '@/executor/utils/run-from-block'
import {
  buildResolutionFromBlock,
  buildStartBlockOutput,

@@ -89,17 +99,108 @@ export class DAGExecutor {
    }
  }

  /**
   * Execute from a specific block using cached outputs for upstream blocks.
   */
  async executeFromBlock(
    workflowId: string,
    startBlockId: string,
    sourceSnapshot: SerializableExecutionState
  ): Promise<ExecutionResult> {
    const dag = this.dagBuilder.build(this.workflow)

    const executedBlocks = new Set(sourceSnapshot.executedBlocks)
    const validation = validateRunFromBlock(startBlockId, dag, executedBlocks)
    if (!validation.valid) {
      throw new Error(validation.error)
    }

    const dirtySet = computeDirtySet(dag, startBlockId)
    const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId

    logger.info('Executing from block', {
      workflowId,
      startBlockId,
      effectiveStartBlockId,
      dirtySetSize: dirtySet.size,
      totalBlocks: dag.nodes.size,
      dirtyBlocks: Array.from(dirtySet),
    })

    // Remove incoming edges from non-dirty sources so convergent blocks don't wait for cached upstream
    for (const nodeId of dirtySet) {
      const node = dag.nodes.get(nodeId)
      if (!node) continue

      const nonDirtyIncoming: string[] = []
      for (const sourceId of node.incomingEdges) {
        if (!dirtySet.has(sourceId)) {
          nonDirtyIncoming.push(sourceId)
        }
      }

      for (const sourceId of nonDirtyIncoming) {
        node.incomingEdges.delete(sourceId)
        logger.debug('Removed non-dirty incoming edge for run-from-block', {
          nodeId,
          sourceId,
        })
      }
    }

    const runFromBlockContext = { startBlockId: effectiveStartBlockId, dirtySet }
    const { context, state } = this.createExecutionContext(workflowId, undefined, {
      snapshotState: sourceSnapshot,
      runFromBlockContext,
    })

    const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
    const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
    loopOrchestrator.setContextExtensions(this.contextExtensions)
    const parallelOrchestrator = new ParallelOrchestrator(dag, state)
    parallelOrchestrator.setResolver(resolver)
    parallelOrchestrator.setContextExtensions(this.contextExtensions)
    const allHandlers = createBlockHandlers()
    const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
    const edgeManager = new EdgeManager(dag)
    loopOrchestrator.setEdgeManager(edgeManager)
    const nodeOrchestrator = new NodeExecutionOrchestrator(
      dag,
      state,
      blockExecutor,
      loopOrchestrator,
      parallelOrchestrator
    )
    const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)

    return await engine.run()
  }

  private createExecutionContext(
    workflowId: string,
-   triggerBlockId?: string
+   triggerBlockId?: string,
+   overrides?: {
+     snapshotState?: SerializableExecutionState
+     runFromBlockContext?: RunFromBlockContext
+   }
  ): { context: ExecutionContext; state: ExecutionState } {
-   const snapshotState = this.contextExtensions.snapshotState
+   const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState
    const blockStates = snapshotState?.blockStates
      ? new Map(Object.entries(snapshotState.blockStates))
      : new Map<string, BlockState>()
-   const executedBlocks = snapshotState?.executedBlocks
+   let executedBlocks = snapshotState?.executedBlocks
      ? new Set(snapshotState.executedBlocks)
      : new Set<string>()

    if (overrides?.runFromBlockContext) {
      const { dirtySet } = overrides.runFromBlockContext
      executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id)))
      logger.info('Cleared executed status for dirty blocks', {
        dirtySetSize: dirtySet.size,
        remainingExecutedBlocks: executedBlocks.size,
      })
    }

    const state = new ExecutionState(blockStates, executedBlocks)

    const context: ExecutionContext = {

@@ -109,7 +210,7 @@
      userId: this.contextExtensions.userId,
      isDeployedContext: this.contextExtensions.isDeployedContext,
      blockStates: state.getBlockStates(),
-     blockLogs: snapshotState?.blockLogs ?? [],
+     blockLogs: overrides?.runFromBlockContext ? [] : (snapshotState?.blockLogs ?? []),
      metadata: {
        ...this.contextExtensions.metadata,
        startTime: new Date().toISOString(),

@@ -169,6 +270,8 @@
      abortSignal: this.contextExtensions.abortSignal,
      includeFileBase64: this.contextExtensions.includeFileBase64,
      base64MaxBytes: this.contextExtensions.base64MaxBytes,
      runFromBlockContext: overrides?.runFromBlockContext,
      stopAfterBlockId: this.contextExtensions.stopAfterBlockId,
    }

    if (this.contextExtensions.resumeFromSnapshot) {

@@ -193,6 +296,10 @@
        pendingBlocks: context.metadata.pendingBlocks,
        skipStarterBlockInit: true,
      })
    } else if (overrides?.runFromBlockContext) {
      logger.info('Run-from-block mode: skipping starter block initialization', {
        startBlockId: overrides.runFromBlockContext.startBlockId,
      })
    } else {
      this.initializeStarterBlock(context, state, triggerBlockId)
    }
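computeDirtySet and validateRunFromBlock are imported from @/executor/utils/run-from-block, but their bodies are not part of this diff; the test file at the end pins down computeDirtySet's contract as the downstream closure of the start block over outgoing edges, with the start block always included even when absent from the DAG. A sketch consistent with those tests (an assumption — the real implementation may differ in detail):

```ts
import type { DAG } from '@/executor/dag/builder'

// Sketch: BFS downstream closure over outgoing edges, matching the behavior
// the run-from-block.test.ts cases below assert.
function computeDirtySetSketch(dag: DAG, startBlockId: string): Set<string> {
  const dirty = new Set<string>([startBlockId]) // start block always included
  const queue = [startBlockId]
  while (queue.length > 0) {
    const nodeId = queue.shift()!
    const node = dag.nodes.get(nodeId)
    if (!node) continue // tolerate a start block missing from the DAG
    for (const [, edge] of node.outgoingEdges) {
      if (!dirty.has(edge.target)) {
        dirty.add(edge.target)
        queue.push(edge.target)
      }
    }
  }
  return dirty
}
```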
@@ -1,5 +1,6 @@
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SubflowType } from '@/stores/workflows/workflow/types'

export interface ExecutionMetadata {

@@ -105,6 +106,17 @@ export interface ContextExtensions {
    output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
    iterationContext?: IterationContext
  ) => Promise<void>

  /**
   * Run-from-block configuration. When provided, executor runs in partial
   * execution mode starting from the specified block.
   */
  runFromBlockContext?: RunFromBlockContext

  /**
   * Stop execution after this block completes. Used for "run until block" feature.
   */
  stopAfterBlockId?: string
}

export interface WorkflowInput {
@@ -276,7 +276,16 @@ export class LoopOrchestrator {
    scope: LoopScope
  ): LoopContinuationResult {
    const results = scope.allIterationOutputs
-   this.state.setBlockOutput(loopId, { results }, DEFAULTS.EXECUTION_TIME)
+   const output = { results }
+   this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)

    // Emit onBlockComplete for the loop container so the UI can track it
    if (this.contextExtensions?.onBlockComplete) {
      this.contextExtensions.onBlockComplete(loopId, 'Loop', 'loop', {
        output,
        executionTime: DEFAULTS.EXECUTION_TIME,
      })
    }

    return {
      shouldContinue: false,
@@ -31,7 +31,18 @@ export class NodeExecutionOrchestrator {
      throw new Error(`Node not found in DAG: ${nodeId}`)
    }

-   if (this.state.hasExecuted(nodeId)) {
+   if (ctx.runFromBlockContext && !ctx.runFromBlockContext.dirtySet.has(nodeId)) {
+     const cachedOutput = this.state.getBlockOutput(nodeId) || {}
+     logger.debug('Skipping non-dirty block in run-from-block mode', { nodeId })
+     return {
+       nodeId,
+       output: cachedOutput,
+       isFinalOutput: false,
+     }
+   }
+
+   const isDirtyBlock = ctx.runFromBlockContext?.dirtySet.has(nodeId) ?? false
+   if (!isDirtyBlock && this.state.hasExecuted(nodeId)) {
      const output = this.state.getBlockOutput(nodeId) || {}
      return {
        nodeId,
@@ -228,9 +228,17 @@
      const branchOutputs = scope.branchOutputs.get(i) || []
      results.push(branchOutputs)
    }
-   this.state.setBlockOutput(parallelId, {
-     results,
-   })
+   const output = { results }
+   this.state.setBlockOutput(parallelId, output)

    // Emit onBlockComplete for the parallel container so the UI can track it
    if (this.contextExtensions?.onBlockComplete) {
      this.contextExtensions.onBlockComplete(parallelId, 'Parallel', 'parallel', {
        output,
        executionTime: 0,
      })
    }

    return {
      allBranchesComplete: true,
      results,
@@ -1,6 +1,7 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import type { BlockOutput } from '@/blocks/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'

export interface UserFile {

@@ -250,6 +251,17 @@ export interface ExecutionContext {
   * will not have their base64 content fetched.
   */
  base64MaxBytes?: number

  /**
   * Context for "run from block" mode. When present, only blocks in dirtySet
   * will be executed; others return cached outputs from the source snapshot.
   */
  runFromBlockContext?: RunFromBlockContext

  /**
   * Stop execution after this block completes. Used for "run until block" feature.
   */
  stopAfterBlockId?: string
}

export interface ExecutionResult {
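RunFromBlockContext itself is also defined outside this diff; from its construction in DAGExecutor.executeFromBlock and its reads in ExecutionEngine and NodeExecutionOrchestrator, it carries at least the effective start block and the dirty set. An inferred sketch (assumption only, not the authoritative type):

```ts
// Inferred from usage: `{ startBlockId: effectiveStartBlockId, dirtySet }` in
// DAGExecutor.executeFromBlock. The real type lives in
// @/executor/utils/run-from-block and may carry more fields.
interface RunFromBlockContextSketch {
  /** Block (or resolved sentinel start) the partial run begins from */
  startBlockId: string
  /** Blocks to re-execute; everything else is served from the cached snapshot */
  dirtySet: Set<string>
}
```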
apps/sim/executor/utils/run-from-block.test.ts (new file, 493 lines)

@@ -0,0 +1,493 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import type { DAG, DAGNode } from '@/executor/dag/builder'
|
||||
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
|
||||
import { computeDirtySet, validateRunFromBlock } from '@/executor/utils/run-from-block'
|
||||
import type { SerializedLoop, SerializedParallel } from '@/serializer/types'
|
||||
|
||||
/**
|
||||
* Helper to create a DAG node for testing
|
||||
*/
|
||||
function createNode(
|
||||
id: string,
|
||||
outgoingEdges: Array<{ target: string; sourceHandle?: string }> = [],
|
||||
metadata: Partial<NodeMetadata> = {}
|
||||
): DAGNode {
|
||||
const edges = new Map<string, DAGEdge>()
|
||||
for (const edge of outgoingEdges) {
|
||||
edges.set(edge.target, { target: edge.target, sourceHandle: edge.sourceHandle })
|
||||
}
|
||||
|
||||
return {
|
||||
id,
|
||||
block: {
|
||||
id,
|
||||
position: { x: 0, y: 0 },
|
||||
config: { tool: 'test', params: {} },
|
||||
inputs: {},
|
||||
outputs: {},
|
||||
metadata: { id: 'test', name: `block-${id}`, category: 'tools' },
|
||||
enabled: true,
|
||||
},
|
||||
incomingEdges: new Set<string>(),
|
||||
outgoingEdges: edges,
|
||||
metadata: {
|
||||
isParallelBranch: false,
|
||||
isLoopNode: false,
|
||||
isSentinel: false,
|
||||
...metadata,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to create a DAG for testing
|
||||
*/
|
||||
function createDAG(nodes: DAGNode[]): DAG {
|
||||
const nodeMap = new Map<string, DAGNode>()
|
||||
for (const node of nodes) {
|
||||
nodeMap.set(node.id, node)
|
||||
}
|
||||
|
||||
// Set up incoming edges based on outgoing edges
|
||||
for (const node of nodes) {
|
||||
for (const [, edge] of node.outgoingEdges) {
|
||||
const targetNode = nodeMap.get(edge.target)
|
||||
if (targetNode) {
|
||||
targetNode.incomingEdges.add(node.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
nodes: nodeMap,
|
||||
loopConfigs: new Map<string, SerializedLoop>(),
|
||||
parallelConfigs: new Map<string, SerializedParallel>(),
|
||||
}
|
||||
}
|
||||
|
||||
describe('computeDirtySet', () => {
|
||||
it('includes start block in dirty set', () => {
|
||||
const dag = createDAG([createNode('A'), createNode('B'), createNode('C')])
|
||||
|
||||
const dirtySet = computeDirtySet(dag, 'B')
|
||||
|
||||
expect(dirtySet.has('B')).toBe(true)
|
||||
})
|
||||
|
||||
it('includes all downstream blocks in linear workflow', () => {
|
||||
// A → B → C → D
|
||||
const dag = createDAG([
|
||||
createNode('A', [{ target: 'B' }]),
|
||||
createNode('B', [{ target: 'C' }]),
|
||||
createNode('C', [{ target: 'D' }]),
|
||||
createNode('D'),
|
||||
])
|
||||
|
||||
const dirtySet = computeDirtySet(dag, 'B')
|
||||
|
||||
expect(dirtySet.has('A')).toBe(false)
|
||||
expect(dirtySet.has('B')).toBe(true)
|
||||
expect(dirtySet.has('C')).toBe(true)
|
||||
expect(dirtySet.has('D')).toBe(true)
|
||||
expect(dirtySet.size).toBe(3)
|
||||
})
|
||||
|
||||
it('handles branching paths', () => {
|
||||
// A → B → C
|
||||
// ↓
|
||||
// D → E
|
||||
const dag = createDAG([
|
||||
createNode('A', [{ target: 'B' }]),
|
||||
createNode('B', [{ target: 'C' }, { target: 'D' }]),
|
||||
createNode('C'),
|
||||
createNode('D', [{ target: 'E' }]),
|
||||
createNode('E'),
|
||||
])
|
||||
|
||||
const dirtySet = computeDirtySet(dag, 'B')
|
||||
|
||||
expect(dirtySet.has('A')).toBe(false)
|
||||
expect(dirtySet.has('B')).toBe(true)
|
||||
expect(dirtySet.has('C')).toBe(true)
|
||||
expect(dirtySet.has('D')).toBe(true)
|
||||
expect(dirtySet.has('E')).toBe(true)
|
||||
expect(dirtySet.size).toBe(4)
|
||||
})
|
||||
|
||||
it('handles convergence points', () => {
|
||||
// A → C
|
||||
// B → C → D
|
||||
const dag = createDAG([
|
||||
createNode('A', [{ target: 'C' }]),
|
||||
createNode('B', [{ target: 'C' }]),
|
||||
createNode('C', [{ target: 'D' }]),
|
||||
createNode('D'),
|
||||
])
|
||||
|
||||
// Run from A: should include A, C, D (but not B)
|
||||
const dirtySet = computeDirtySet(dag, 'A')
|
||||
|
||||
expect(dirtySet.has('A')).toBe(true)
|
||||
expect(dirtySet.has('B')).toBe(false)
|
||||
expect(dirtySet.has('C')).toBe(true)
|
||||
expect(dirtySet.has('D')).toBe(true)
|
||||
expect(dirtySet.size).toBe(3)
|
||||
})
|
||||
|
||||
it('handles diamond pattern', () => {
|
||||
// B
|
||||
// ↗ ↘
|
||||
// A D
|
||||
// ↘ ↗
|
||||
// C
|
||||
const dag = createDAG([
|
||||
createNode('A', [{ target: 'B' }, { target: 'C' }]),
|
||||
createNode('B', [{ target: 'D' }]),
|
||||
createNode('C', [{ target: 'D' }]),
|
||||
createNode('D'),
|
||||
])
|
||||
|
||||
const dirtySet = computeDirtySet(dag, 'A')
|
||||
|
||||
expect(dirtySet.has('A')).toBe(true)
|
||||
expect(dirtySet.has('B')).toBe(true)
|
||||
expect(dirtySet.has('C')).toBe(true)
|
||||
expect(dirtySet.has('D')).toBe(true)
|
||||
expect(dirtySet.size).toBe(4)
|
||||
})
|
||||
|
||||
it('stops at graph boundaries', () => {
|
||||
// A → B C → D (disconnected)
|
||||
const dag = createDAG([
|
||||
createNode('A', [{ target: 'B' }]),
|
||||
createNode('B'),
|
||||
createNode('C', [{ target: 'D' }]),
|
||||
createNode('D'),
|
||||
])
|
||||
|
||||
const dirtySet = computeDirtySet(dag, 'A')
|
||||
|
||||
expect(dirtySet.has('A')).toBe(true)
|
||||
expect(dirtySet.has('B')).toBe(true)
|
||||
expect(dirtySet.has('C')).toBe(false)
|
||||
expect(dirtySet.has('D')).toBe(false)
|
||||
expect(dirtySet.size).toBe(2)
|
||||
})
|
||||
|
||||
it('handles single node workflow', () => {
|
||||
const dag = createDAG([createNode('A')])
|
||||
|
||||
const dirtySet = computeDirtySet(dag, 'A')
|
||||
|
||||
expect(dirtySet.has('A')).toBe(true)
|
||||
expect(dirtySet.size).toBe(1)
|
||||
})
|
||||
|
||||
it('handles node not in DAG gracefully', () => {
|
||||
const dag = createDAG([createNode('A'), createNode('B')])
|
||||
|
||||
const dirtySet = computeDirtySet(dag, 'nonexistent')
|
||||
|
||||
// Should just contain the start block ID even if not found
|
||||
expect(dirtySet.has('nonexistent')).toBe(true)
|
||||
expect(dirtySet.size).toBe(1)
|
||||
})
|
||||
|
||||
it('includes convergent block when running from one branch of parallel', () => {
|
||||
// Parallel branches converging:
|
||||
// A → B → D
|
||||
// A → C → D
|
||||
// Running from B should include B and D (but not A or C)
|
||||
const dag = createDAG([
|
||||
createNode('A', [{ target: 'B' }, { target: 'C' }]),
|
||||
createNode('B', [{ target: 'D' }]),
|
||||
createNode('C', [{ target: 'D' }]),
|
||||
createNode('D'),
|
||||
])
|
||||
|
||||
const dirtySet = computeDirtySet(dag, 'B')
|
||||
|
||||
expect(dirtySet.has('A')).toBe(false)
|
||||
expect(dirtySet.has('B')).toBe(true)
|
||||
expect(dirtySet.has('C')).toBe(false)
|
||||
expect(dirtySet.has('D')).toBe(true)
|
||||
expect(dirtySet.size).toBe(2)
|
||||
})
|
||||
  it('handles running from convergent block itself (all upstream non-dirty)', () => {
    // A → C
    // B → C
    // Running from C should include only C and its downstream D
    const dag = createDAG([
      createNode('A', [{ target: 'C' }]),
      createNode('B', [{ target: 'C' }]),
      createNode('C', [{ target: 'D' }]),
      createNode('D'),
    ])

    const dirtySet = computeDirtySet(dag, 'C')

    expect(dirtySet.has('A')).toBe(false)
    expect(dirtySet.has('B')).toBe(false)
    expect(dirtySet.has('C')).toBe(true)
    expect(dirtySet.has('D')).toBe(true)
    expect(dirtySet.size).toBe(2)
  })

  it('handles deep downstream chains', () => {
    // A → B → C → D → E → F
    // Running from C should include C, D, E, F
    const dag = createDAG([
      createNode('A', [{ target: 'B' }]),
      createNode('B', [{ target: 'C' }]),
      createNode('C', [{ target: 'D' }]),
      createNode('D', [{ target: 'E' }]),
      createNode('E', [{ target: 'F' }]),
      createNode('F'),
    ])

    const dirtySet = computeDirtySet(dag, 'C')

    expect(dirtySet.has('A')).toBe(false)
    expect(dirtySet.has('B')).toBe(false)
    expect(dirtySet.has('C')).toBe(true)
    expect(dirtySet.has('D')).toBe(true)
    expect(dirtySet.has('E')).toBe(true)
    expect(dirtySet.has('F')).toBe(true)
    expect(dirtySet.size).toBe(4)
  })
})

describe('validateRunFromBlock', () => {
  it('accepts valid block', () => {
    const dag = createDAG([createNode('A'), createNode('B')])
    const executedBlocks = new Set(['A', 'B'])

    const result = validateRunFromBlock('A', dag, executedBlocks)

    expect(result.valid).toBe(true)
    expect(result.error).toBeUndefined()
  })

  it('rejects block not found in DAG', () => {
    const dag = createDAG([createNode('A')])
    const executedBlocks = new Set(['A', 'B'])

    const result = validateRunFromBlock('B', dag, executedBlocks)

    expect(result.valid).toBe(false)
    expect(result.error).toContain('Block not found')
  })

  it('rejects blocks inside loops', () => {
    const dag = createDAG([createNode('A', [], { isLoopNode: true, loopId: 'loop-1' })])
    const executedBlocks = new Set(['A'])

    const result = validateRunFromBlock('A', dag, executedBlocks)

    expect(result.valid).toBe(false)
    expect(result.error).toContain('inside loop')
    expect(result.error).toContain('loop-1')
  })

  it('rejects blocks inside parallels', () => {
    const dag = createDAG([
      createNode('A', [], { isParallelBranch: true, parallelId: 'parallel-1' }),
    ])
    const executedBlocks = new Set(['A'])

    const result = validateRunFromBlock('A', dag, executedBlocks)

    expect(result.valid).toBe(false)
    expect(result.error).toContain('inside parallel')
    expect(result.error).toContain('parallel-1')
  })

  it('rejects sentinel nodes', () => {
    const dag = createDAG([createNode('A', [], { isSentinel: true, sentinelType: 'start' })])
    const executedBlocks = new Set(['A'])

    const result = validateRunFromBlock('A', dag, executedBlocks)

    expect(result.valid).toBe(false)
    expect(result.error).toContain('sentinel')
  })

  it('rejects blocks with unexecuted upstream dependencies', () => {
    // A → B; A was not executed, but B depends on A
    const dag = createDAG([createNode('A', [{ target: 'B' }]), createNode('B')])
    const executedBlocks = new Set<string>() // A was not executed

    const result = validateRunFromBlock('B', dag, executedBlocks)

    expect(result.valid).toBe(false)
    expect(result.error).toContain('Upstream dependency not executed')
  })

  it('allows blocks with no dependencies even if not previously executed', () => {
    // A and B are independent (no edges)
    const dag = createDAG([createNode('A'), createNode('B')])
    const executedBlocks = new Set(['A']) // B was not executed but has no deps

    const result = validateRunFromBlock('B', dag, executedBlocks)

    expect(result.valid).toBe(true) // B has no incoming edges, so it's valid
  })

  it('accepts regular executed block', () => {
    const dag = createDAG([
      createNode('trigger', [{ target: 'A' }]),
      createNode('A', [{ target: 'B' }]),
      createNode('B'),
    ])
    const executedBlocks = new Set(['trigger', 'A', 'B'])

    const result = validateRunFromBlock('A', dag, executedBlocks)

    expect(result.valid).toBe(true)
  })

  it('accepts loop container when executed', () => {
    // Loop container with sentinel nodes
    const loopId = 'loop-container-1'
    const sentinelStartId = `loop-${loopId}-sentinel-start`
    const sentinelEndId = `loop-${loopId}-sentinel-end`
    const dag = createDAG([
      createNode('A', [{ target: sentinelStartId }]),
      createNode(sentinelStartId, [{ target: 'B' }], {
        isSentinel: true,
        sentinelType: 'start',
        loopId,
      }),
      createNode('B', [{ target: sentinelEndId }], { isLoopNode: true, loopId }),
      createNode(sentinelEndId, [{ target: 'C' }], {
        isSentinel: true,
        sentinelType: 'end',
        loopId,
      }),
      createNode('C'),
    ])
    dag.loopConfigs.set(loopId, { id: loopId, nodes: ['B'], iterations: 3, loopType: 'for' } as any)
    const executedBlocks = new Set(['A', loopId, sentinelStartId, 'B', sentinelEndId, 'C'])

    const result = validateRunFromBlock(loopId, dag, executedBlocks)

    expect(result.valid).toBe(true)
  })

  it('accepts parallel container when executed', () => {
    // Parallel container with sentinel nodes
    const parallelId = 'parallel-container-1'
    const sentinelStartId = `parallel-${parallelId}-sentinel-start`
    const sentinelEndId = `parallel-${parallelId}-sentinel-end`
    const dag = createDAG([
      createNode('A', [{ target: sentinelStartId }]),
      createNode(sentinelStartId, [{ target: 'B₍0₎' }], {
        isSentinel: true,
        sentinelType: 'start',
        parallelId,
      }),
      createNode('B₍0₎', [{ target: sentinelEndId }], { isParallelBranch: true, parallelId }),
      createNode(sentinelEndId, [{ target: 'C' }], {
        isSentinel: true,
        sentinelType: 'end',
        parallelId,
      }),
      createNode('C'),
    ])
    dag.parallelConfigs.set(parallelId, { id: parallelId, nodes: ['B'], count: 2 } as any)
    const executedBlocks = new Set(['A', parallelId, sentinelStartId, 'B₍0₎', sentinelEndId, 'C'])

    const result = validateRunFromBlock(parallelId, dag, executedBlocks)

    expect(result.valid).toBe(true)
  })

  it('allows loop container with no upstream dependencies', () => {
    // Loop containers are validated via their sentinel nodes, not incoming edges on the container itself
    // If the loop has no upstream dependencies, it should be valid
    const loopId = 'loop-container-1'
    const sentinelStartId = `loop-${loopId}-sentinel-start`
    const dag = createDAG([
      createNode(sentinelStartId, [], { isSentinel: true, sentinelType: 'start', loopId }),
    ])
    dag.loopConfigs.set(loopId, { id: loopId, nodes: [], iterations: 3, loopType: 'for' } as any)
    const executedBlocks = new Set<string>() // Nothing executed but loop has no deps

    const result = validateRunFromBlock(loopId, dag, executedBlocks)

    // Loop container validation doesn't check incoming edges (containers don't have nodes in dag.nodes)
    // So this is valid - the loop can start fresh
    expect(result.valid).toBe(true)
  })
})

describe('computeDirtySet with containers', () => {
  it('includes loop container and all downstream when running from loop', () => {
    // A → loop-sentinel-start → B (inside loop) → loop-sentinel-end → C
    const loopId = 'loop-1'
    const sentinelStartId = `loop-${loopId}-sentinel-start`
    const sentinelEndId = `loop-${loopId}-sentinel-end`
    const dag = createDAG([
      createNode('A', [{ target: sentinelStartId }]),
      createNode(sentinelStartId, [{ target: 'B' }], {
        isSentinel: true,
        sentinelType: 'start',
        loopId,
      }),
      createNode('B', [{ target: sentinelEndId }], { isLoopNode: true, loopId }),
      createNode(sentinelEndId, [{ target: 'C' }], {
        isSentinel: true,
        sentinelType: 'end',
        loopId,
      }),
      createNode('C'),
    ])
    dag.loopConfigs.set(loopId, { id: loopId, nodes: ['B'], iterations: 3, loopType: 'for' } as any)

    const dirtySet = computeDirtySet(dag, loopId)

    // Should include loop container, sentinel-start, B, sentinel-end, C
    expect(dirtySet.has(loopId)).toBe(true)
    expect(dirtySet.has(sentinelStartId)).toBe(true)
    expect(dirtySet.has('B')).toBe(true)
    expect(dirtySet.has(sentinelEndId)).toBe(true)
    expect(dirtySet.has('C')).toBe(true)
    // Should NOT include A (upstream)
    expect(dirtySet.has('A')).toBe(false)
  })

  it('includes parallel container and all downstream when running from parallel', () => {
    // A → parallel-sentinel-start → B₍0₎ → parallel-sentinel-end → C
    const parallelId = 'parallel-1'
    const sentinelStartId = `parallel-${parallelId}-sentinel-start`
    const sentinelEndId = `parallel-${parallelId}-sentinel-end`
    const dag = createDAG([
      createNode('A', [{ target: sentinelStartId }]),
      createNode(sentinelStartId, [{ target: 'B₍0₎' }], {
        isSentinel: true,
        sentinelType: 'start',
        parallelId,
      }),
      createNode('B₍0₎', [{ target: sentinelEndId }], { isParallelBranch: true, parallelId }),
      createNode(sentinelEndId, [{ target: 'C' }], {
        isSentinel: true,
        sentinelType: 'end',
        parallelId,
      }),
      createNode('C'),
    ])
    dag.parallelConfigs.set(parallelId, { id: parallelId, nodes: ['B'], count: 2 } as any)

    const dirtySet = computeDirtySet(dag, parallelId)

    // Should include parallel container, sentinel-start, B₍0₎, sentinel-end, C
    expect(dirtySet.has(parallelId)).toBe(true)
    expect(dirtySet.has(sentinelStartId)).toBe(true)
    expect(dirtySet.has('B₍0₎')).toBe(true)
    expect(dirtySet.has(sentinelEndId)).toBe(true)
    expect(dirtySet.has('C')).toBe(true)
    // Should NOT include A (upstream)
    expect(dirtySet.has('A')).toBe(false)
  })
})

169
apps/sim/executor/utils/run-from-block.ts
Normal file
@@ -0,0 +1,169 @@
import { createLogger } from '@sim/logger'
import { LOOP, PARALLEL } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'

const logger = createLogger('run-from-block')

/**
 * Builds the sentinel-start node ID for a loop.
 */
function buildLoopSentinelStartId(loopId: string): string {
  return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.START_SUFFIX}`
}

/**
 * Builds the sentinel-start node ID for a parallel.
 */
function buildParallelSentinelStartId(parallelId: string): string {
  return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.START_SUFFIX}`
}

/**
 * Checks if a block ID is a loop or parallel container and returns the sentinel-start ID if so.
 * Returns null if the block is not a container.
 */
export function resolveContainerToSentinelStart(blockId: string, dag: DAG): string | null {
  if (dag.loopConfigs.has(blockId)) {
    return buildLoopSentinelStartId(blockId)
  }
  if (dag.parallelConfigs.has(blockId)) {
    return buildParallelSentinelStartId(blockId)
  }
  return null
}
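
// Illustrative resolution, assuming the sentinel constants expand as exercised in
// the tests above (prefixes 'loop-' / 'parallel-', suffix '-sentinel-start'):
//
//   resolveContainerToSentinelStart('loop-1', dag)      // 'loop-loop-1-sentinel-start'
//   resolveContainerToSentinelStart('parallel-1', dag)  // 'parallel-parallel-1-sentinel-start'
//   resolveContainerToSentinelStart('plain-block', dag) // null (not a container)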

/**
 * Result of validating a block for run-from-block execution.
 */
export interface RunFromBlockValidation {
  valid: boolean
  error?: string
}

/**
 * Context for run-from-block execution mode.
 */
export interface RunFromBlockContext {
  /** The block ID to start execution from */
  startBlockId: string
  /** Set of block IDs that need re-execution (start block + all downstream) */
  dirtySet: Set<string>
}

/**
 * Computes all blocks that need re-execution when running from a specific block.
 * Uses BFS to find all downstream blocks reachable via outgoing edges.
 *
 * For loop/parallel containers, starts from the sentinel-start node and includes
 * the container ID itself in the dirty set.
 *
 * @param dag - The workflow DAG
 * @param startBlockId - The block to start execution from
 * @returns Set of block IDs that are "dirty" and need re-execution
 */
export function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
  const dirty = new Set<string>([startBlockId])
  const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag)
  const traversalStartId = sentinelStartId ?? startBlockId

  if (sentinelStartId) {
    dirty.add(sentinelStartId)
  }

  const queue = [traversalStartId]

  while (queue.length > 0) {
    const nodeId = queue.shift()!
    const node = dag.nodes.get(nodeId)
    if (!node) continue

    for (const [, edge] of node.outgoingEdges) {
      if (!dirty.has(edge.target)) {
        dirty.add(edge.target)
        queue.push(edge.target)
      }
    }
  }

  logger.debug('Computed dirty set', {
    startBlockId,
    traversalStartId,
    dirtySetSize: dirty.size,
    dirtyBlocks: Array.from(dirty),
  })

  return dirty
}
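
// A minimal usage sketch (hypothetical block IDs; `dag` comes from the DAG builder).
// For a chain A → B → C, running from B replays A's cached output and re-executes
// only the downstream portion:
//
//   const dirty = computeDirtySet(dag, 'B')
//   dirty.has('A') // false: A's output is reused from the source snapshot
//   dirty.has('B') // true
//   dirty.has('C') // true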

/**
 * Validates that a block can be used as a run-from-block starting point.
 *
 * Validation rules:
 * - Block must exist in the DAG (or be a loop/parallel container)
 * - Block cannot be inside a loop (but loop containers are allowed)
 * - Block cannot be inside a parallel (but parallel containers are allowed)
 * - Block cannot be a sentinel node
 * - All upstream dependencies must have been executed (have cached outputs)
 *
 * @param blockId - The block ID to validate
 * @param dag - The workflow DAG
 * @param executedBlocks - Set of blocks that were executed in the source run
 * @returns Validation result with error message if invalid
 */
export function validateRunFromBlock(
  blockId: string,
  dag: DAG,
  executedBlocks: Set<string>
): RunFromBlockValidation {
  const node = dag.nodes.get(blockId)
  const isLoopContainer = dag.loopConfigs.has(blockId)
  const isParallelContainer = dag.parallelConfigs.has(blockId)
  const isContainer = isLoopContainer || isParallelContainer

  if (!node && !isContainer) {
    return { valid: false, error: `Block not found in workflow: ${blockId}` }
  }

  if (isContainer) {
    const sentinelStartId = resolveContainerToSentinelStart(blockId, dag)
    if (!sentinelStartId || !dag.nodes.has(sentinelStartId)) {
      return {
        valid: false,
        error: `Container sentinel not found for: ${blockId}`,
      }
    }
  }

  if (node) {
    if (node.metadata.isLoopNode) {
      return {
        valid: false,
        error: `Cannot run from block inside loop: ${node.metadata.loopId}`,
      }
    }

    if (node.metadata.isParallelBranch) {
      return {
        valid: false,
        error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`,
      }
    }

    if (node.metadata.isSentinel) {
      return { valid: false, error: 'Cannot run from sentinel node' }
    }

    if (node.incomingEdges.size > 0) {
      for (const sourceId of node.incomingEdges.keys()) {
        if (!executedBlocks.has(sourceId)) {
          return {
            valid: false,
            error: `Upstream dependency not executed: ${sourceId}`,
          }
        }
      }
    }
  }

  return { valid: true }
}
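
// Sketch of how a caller might combine the two helpers (hypothetical wiring; the
// real call site is the execute-from-block route):
//
//   const validation = validateRunFromBlock(startBlockId, dag, executedBlocks)
//   if (!validation.valid) throw new Error(validation.error)
//   const context: RunFromBlockContext = {
//     startBlockId,
//     dirtySet: computeDirtySet(dag, startBlockId),
//   }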

@@ -1,10 +1,85 @@
import { useCallback, useRef } from 'react'
import { createLogger } from '@sim/logger'
import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'

const logger = createLogger('useExecutionStream')

/**
 * Processes SSE events from a response body and invokes appropriate callbacks.
 */
async function processSSEStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  callbacks: ExecutionStreamCallbacks,
  logPrefix: string
): Promise<void> {
  const decoder = new TextDecoder()
  let buffer = ''

  try {
    while (true) {
      const { done, value } = await reader.read()

      if (done) break

      buffer += decoder.decode(value, { stream: true })
      const lines = buffer.split('\n\n')
      buffer = lines.pop() || ''

      for (const line of lines) {
        if (!line.trim() || !line.startsWith('data: ')) continue

        const data = line.substring(6).trim()
        if (data === '[DONE]') {
          logger.info(`${logPrefix} stream completed`)
          continue
        }

        try {
          const event = JSON.parse(data) as ExecutionEvent

          switch (event.type) {
            case 'execution:started':
              callbacks.onExecutionStarted?.(event.data)
              break
            case 'execution:completed':
              callbacks.onExecutionCompleted?.(event.data)
              break
            case 'execution:error':
              callbacks.onExecutionError?.(event.data)
              break
            case 'execution:cancelled':
              callbacks.onExecutionCancelled?.(event.data)
              break
            case 'block:started':
              callbacks.onBlockStarted?.(event.data)
              break
            case 'block:completed':
              callbacks.onBlockCompleted?.(event.data)
              break
            case 'block:error':
              callbacks.onBlockError?.(event.data)
              break
            case 'stream:chunk':
              callbacks.onStreamChunk?.(event.data)
              break
            case 'stream:done':
              callbacks.onStreamDone?.(event.data)
              break
            default:
              logger.warn('Unknown event type:', (event as any).type)
          }
        } catch (error) {
          logger.error('Failed to parse SSE event:', error, { data })
        }
      }
    }
  } finally {
    reader.releaseLock()
  }
}

export interface ExecutionStreamCallbacks {
  onExecutionStarted?: (data: { startTime: string }) => void
  onExecutionCompleted?: (data: {
@@ -68,6 +143,14 @@ export interface ExecuteStreamOptions {
    loops?: Record<string, any>
    parallels?: Record<string, any>
  }
  stopAfterBlockId?: string
  callbacks?: ExecutionStreamCallbacks
}

export interface ExecuteFromBlockOptions {
  workflowId: string
  startBlockId: string
  sourceSnapshot: SerializableExecutionState
  callbacks?: ExecutionStreamCallbacks
}

@@ -119,91 +202,7 @@ export function useExecutionStream() {
      }

      const reader = response.body.getReader()
      const decoder = new TextDecoder()
      let buffer = ''

      try {
        while (true) {
          const { done, value } = await reader.read()

          if (done) {
            break
          }

          buffer += decoder.decode(value, { stream: true })

          const lines = buffer.split('\n\n')

          buffer = lines.pop() || ''

          for (const line of lines) {
            if (!line.trim() || !line.startsWith('data: ')) {
              continue
            }

            const data = line.substring(6).trim()

            if (data === '[DONE]') {
              logger.info('Stream completed')
              continue
            }

            try {
              const event = JSON.parse(data) as ExecutionEvent

              logger.info('📡 SSE Event received:', {
                type: event.type,
                executionId: event.executionId,
                data: event.data,
              })

              switch (event.type) {
                case 'execution:started':
                  logger.info('🚀 Execution started')
                  callbacks.onExecutionStarted?.(event.data)
                  break
                case 'execution:completed':
                  logger.info('✅ Execution completed')
                  callbacks.onExecutionCompleted?.(event.data)
                  break
                case 'execution:error':
                  logger.error('❌ Execution error')
                  callbacks.onExecutionError?.(event.data)
                  break
                case 'execution:cancelled':
                  logger.warn('🛑 Execution cancelled')
                  callbacks.onExecutionCancelled?.(event.data)
                  break
                case 'block:started':
                  logger.info('🔷 Block started:', event.data.blockId)
                  callbacks.onBlockStarted?.(event.data)
                  break
                case 'block:completed':
                  logger.info('✓ Block completed:', event.data.blockId)
                  callbacks.onBlockCompleted?.(event.data)
                  break
                case 'block:error':
                  logger.error('✗ Block error:', event.data.blockId)
                  callbacks.onBlockError?.(event.data)
                  break
                case 'stream:chunk':
                  callbacks.onStreamChunk?.(event.data)
                  break
                case 'stream:done':
                  logger.info('Stream done:', event.data.blockId)
                  callbacks.onStreamDone?.(event.data)
                  break
                default:
                  logger.warn('Unknown event type:', (event as any).type)
              }
            } catch (error) {
              logger.error('Failed to parse SSE event:', error, { data })
            }
          }
        }
      } finally {
        reader.releaseLock()
      }
      await processSSEStream(reader, callbacks, 'Execution')
    } catch (error: any) {
      if (error.name === 'AbortError') {
        logger.info('Execution stream cancelled')
@@ -222,6 +221,65 @@ export function useExecutionStream() {
    }
  }, [])

  const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => {
    const { workflowId, startBlockId, sourceSnapshot, callbacks = {} } = options

    if (abortControllerRef.current) {
      abortControllerRef.current.abort()
    }

    const abortController = new AbortController()
    abortControllerRef.current = abortController
    currentExecutionRef.current = null

    try {
      const response = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({ startBlockId, sourceSnapshot }),
        signal: abortController.signal,
      })

      if (!response.ok) {
        const errorResponse = await response.json()
        const error = new Error(errorResponse.error || 'Failed to start execution')
        if (errorResponse && typeof errorResponse === 'object') {
          Object.assign(error, { executionResult: errorResponse })
        }
        throw error
      }

      if (!response.body) {
        throw new Error('No response body')
      }

      const executionId = response.headers.get('X-Execution-Id')
      if (executionId) {
        currentExecutionRef.current = { workflowId, executionId }
      }

      const reader = response.body.getReader()
      await processSSEStream(reader, callbacks, 'Run-from-block')
    } catch (error: any) {
      if (error.name === 'AbortError') {
        logger.info('Run-from-block execution cancelled')
        callbacks.onExecutionCancelled?.({ duration: 0 })
      } else {
        logger.error('Run-from-block execution error:', error)
        callbacks.onExecutionError?.({
          error: error.message || 'Unknown error',
          duration: 0,
        })
      }
      throw error
    } finally {
      abortControllerRef.current = null
      currentExecutionRef.current = null
    }
  }, [])
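
  // Hypothetical call-site sketch (component wiring is illustrative, not part of this
  // hook): replay a previous run from a single block using the stored snapshot.
  //
  //   const { executeFromBlock } = useExecutionStream()
  //   const snapshot = useExecutionStore.getState().getLastExecutionSnapshot(workflowId)
  //   if (snapshot) {
  //     await executeFromBlock({
  //       workflowId,
  //       startBlockId: blockId,
  //       sourceSnapshot: snapshot,
  //       callbacks: { onBlockCompleted: (data) => logger.info('block done', data) },
  //     })
  //   }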

  const cancel = useCallback(() => {
    const execution = currentExecutionRef.current
    if (execution) {
@@ -239,6 +297,7 @@ export function useExecutionStream() {

  return {
    execute,
    executeFromBlock,
    cancel,
  }
}

@@ -22,6 +22,7 @@ import type {
  ContextExtensions,
  ExecutionCallbacks,
  IterationContext,
  SerializableExecutionState,
} from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
@@ -40,6 +41,12 @@ export interface ExecuteWorkflowCoreOptions {
  abortSignal?: AbortSignal
  includeFileBase64?: boolean
  base64MaxBytes?: number
  stopAfterBlockId?: string
  /** Run-from-block mode: execute starting from a specific block using cached upstream outputs */
  runFromBlock?: {
    startBlockId: string
    sourceSnapshot: SerializableExecutionState
  }
}
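
// Sketch of the new option in use (field names from this diff; the surrounding
// options object is elided and hypothetical):
//
//   await executeWorkflowCore({
//     ...baseOptions, // snapshot, callbacks, etc. are unchanged from a normal run
//     runFromBlock: {
//       startBlockId,
//       sourceSnapshot, // SerializableExecutionState captured from the prior run
//     },
//   })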

function parseVariableValueByType(value: unknown, type: string): unknown {
@@ -114,6 +121,8 @@ export async function executeWorkflowCore(
    abortSignal,
    includeFileBase64,
    base64MaxBytes,
    stopAfterBlockId,
    runFromBlock,
  } = options
  const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
  const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
@@ -297,6 +306,7 @@ export async function executeWorkflowCore(
    abortSignal,
    includeFileBase64,
    base64MaxBytes,
    stopAfterBlockId,
  }

  const executorInstance = new Executor({
@@ -319,10 +329,13 @@ export async function executeWorkflowCore(
    }
  }

  const result = (await executorInstance.execute(
    workflowId,
    resolvedTriggerBlockId
  )) as ExecutionResult
  const result = runFromBlock
    ? ((await executorInstance.executeFromBlock(
        workflowId,
        runFromBlock.startBlockId,
        runFromBlock.sourceSnapshot
      )) as ExecutionResult)
    : ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult)

  // Build trace spans for logging from the full execution result
  const { traceSpans, totalDuration } = buildTraceSpans(result)

@@ -35,4 +35,23 @@ export const useExecutionStore = create<ExecutionState & ExecutionActions>()((se
  },
  clearRunPath: () => set({ lastRunPath: new Map(), lastRunEdges: new Map() }),
  reset: () => set(initialState),

  setLastExecutionSnapshot: (workflowId, snapshot) => {
    const { lastExecutionSnapshots } = get()
    const newSnapshots = new Map(lastExecutionSnapshots)
    newSnapshots.set(workflowId, snapshot)
    set({ lastExecutionSnapshots: newSnapshots })
  },

  getLastExecutionSnapshot: (workflowId) => {
    const { lastExecutionSnapshots } = get()
    return lastExecutionSnapshots.get(workflowId)
  },

  clearLastExecutionSnapshot: (workflowId) => {
    const { lastExecutionSnapshots } = get()
    const newSnapshots = new Map(lastExecutionSnapshots)
    newSnapshots.delete(workflowId)
    set({ lastExecutionSnapshots: newSnapshots })
  },
}))
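
// Illustrative flow (store actions from this diff; the wiring shown is hypothetical):
//
//   const store = useExecutionStore.getState()
//   store.setLastExecutionSnapshot(workflowId, snapshot) // after a run completes
//   const cached = store.getLastExecutionSnapshot(workflowId)
//   if (cached) {
//     // pass `cached` as the sourceSnapshot for executeFromBlock(...)
//   }
//   store.clearLastExecutionSnapshot(workflowId) // e.g. when the workflow is edited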

@@ -1,4 +1,5 @@
import type { Executor } from '@/executor'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionContext } from '@/executor/types'

/**
@@ -18,16 +19,9 @@ export interface ExecutionState {
  pendingBlocks: string[]
  executor: Executor | null
  debugContext: ExecutionContext | null
  /**
   * Tracks blocks from the last execution run and their success/error status.
   * Cleared when a new run starts. Used to show run path indicators (rings on blocks).
   */
  lastRunPath: Map<string, BlockRunStatus>
  /**
   * Tracks edges from the last execution run and their success/error status.
   * Cleared when a new run starts. Used to show run path indicators on edges.
   */
  lastRunEdges: Map<string, EdgeRunStatus>
  lastExecutionSnapshots: Map<string, SerializableExecutionState>
}

export interface ExecutionActions {
@@ -41,6 +35,9 @@ export interface ExecutionActions {
  setEdgeRunStatus: (edgeId: string, status: EdgeRunStatus) => void
  clearRunPath: () => void
  reset: () => void
  setLastExecutionSnapshot: (workflowId: string, snapshot: SerializableExecutionState) => void
  getLastExecutionSnapshot: (workflowId: string) => SerializableExecutionState | undefined
  clearLastExecutionSnapshot: (workflowId: string) => void
}

export const initialState: ExecutionState = {
@@ -52,4 +49,5 @@ export const initialState: ExecutionState = {
  debugContext: null,
  lastRunPath: new Map(),
  lastRunEdges: new Map(),
  lastExecutionSnapshots: new Map(),
}
Block a user