mirror of https://github.com/simstudioai/sim.git
synced 2026-01-09 23:17:59 -05:00

Compare commits
2 Commits
main ... feat/run-f

| Author | SHA1 | Date |
|---|---|---|
| | 6d46b44e51 | |
| | ff99d75055 | |
@@ -11,15 +11,23 @@ import {
loadDeployedWorkflowState,
loadWorkflowFromNormalizedTables,
} from '@/lib/workflows/db-helpers'
import type { NormalizedWorkflowData } from '@/lib/workflows/db-helpers'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager'
import { createStreamingResponse } from '@/lib/workflows/streaming'
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import {
type ExecutionMetadata,
type SerializableExecutionState,
ExecutionSnapshot,
} from '@/executor/execution/snapshot'
import type { StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
import type { SubflowType } from '@/stores/workflows/workflow/types'
import { getWorkflowExecutionState } from '@/lib/workflows/execution-state/service'
import { buildRunFromBlockPlan } from '@/lib/workflows/run-from-block/planner'
import { TriggerUtils } from '@/lib/workflows/triggers'

const logger = createLogger('WorkflowExecuteAPI')

@@ -30,6 +38,7 @@ const ExecuteWorkflowSchema = z.object({
useDraftState: z.boolean().optional(),
input: z.any().optional(),
startBlockId: z.string().optional(),
executionMode: z.enum(['run_from_block']).optional(),
})

export const runtime = 'nodejs'
@@ -123,6 +132,7 @@ export async function executeWorkflow(
triggerType,
useDraftState: false,
startTime: new Date().toISOString(),
executionMode: 'full',
}

const snapshot = new ExecutionSnapshot(

@@ -310,6 +320,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
stream: streamParam,
useDraftState,
input: validatedInput,
startBlockId,
executionMode,
} = validation.data

// For API key auth, the entire body is the input (except for our control fields)

@@ -322,16 +334,37 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
})()
: validatedInput

const shouldUseDraftState = useDraftState ?? auth.authType === 'session'
let shouldUseDraftState = useDraftState ?? auth.authType === 'session'

const isRunFromBlock = executionMode === 'run_from_block'

if (isRunFromBlock && auth.authType !== 'session') {
return NextResponse.json(
{ error: 'Run from block is only available within the client editor' },
{ status: 403 }
)
}

if (isRunFromBlock) {
if (!startBlockId) {
return NextResponse.json(
{ error: 'Run from block requires a block identifier.' },
{ status: 400 }
)
}
shouldUseDraftState = true
}

const streamHeader = req.headers.get('X-Stream-Response') === 'true'
const enableSSE = streamHeader || streamParam === true

const effectiveTriggerType = isRunFromBlock ? 'manual' : triggerType

logger.info(`[${requestId}] Starting server-side execution`, {
workflowId,
userId,
hasInput: !!input,
triggerType,
triggerType: effectiveTriggerType,
authType: auth.authType,
streamParam,
streamHeader,

@@ -342,13 +375,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
type LoggingTriggerType = 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
let loggingTriggerType: LoggingTriggerType = 'manual'
if (
triggerType === 'api' ||
triggerType === 'chat' ||
triggerType === 'webhook' ||
triggerType === 'schedule' ||
triggerType === 'manual'
effectiveTriggerType === 'api' ||
effectiveTriggerType === 'chat' ||
effectiveTriggerType === 'webhook' ||
effectiveTriggerType === 'schedule' ||
effectiveTriggerType === 'manual'
) {
loggingTriggerType = triggerType as LoggingTriggerType
loggingTriggerType = effectiveTriggerType as LoggingTriggerType
}
const loggingSession = new LoggingSession(
workflowId,

@@ -366,7 +399,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
triggerType,
triggerType: effectiveTriggerType,
})

await loggingSession.safeStart({

@@ -395,8 +428,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:

// Process file fields in workflow input (base64/URL to UserFile conversion)
let processedInput = input
let workflowData: NormalizedWorkflowData | null = null
try {
const workflowData = shouldUseDraftState
workflowData = shouldUseDraftState
? await loadWorkflowFromNormalizedTables(workflowId)
: await loadDeployedWorkflowState(workflowId)
@@ -448,6 +482,83 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
)
}

let runFromBlockPlan:
| {
snapshotState: SerializableExecutionState
resumePendingQueue: string[]
triggerBlockId: string
}
| null = null

if (isRunFromBlock) {
if (!workflowData) {
return NextResponse.json(
{ error: 'Unable to load workflow state for run from block execution' },
{ status: 400 }
)
}

const startCandidate = TriggerUtils.findStartBlock(workflowData.blocks, 'manual', false)
if (!startCandidate) {
return NextResponse.json(
{ error: 'No manual trigger block found for this workflow' },
{ status: 400 }
)
}

const latestState = await getWorkflowExecutionState(workflowId, startCandidate.blockId)
if (!latestState) {
return NextResponse.json(
{
error:
'No prior execution snapshot found. Run the workflow once before using run from block.',
},
{ status: 400 }
)
}

const serializedWorkflow = new Serializer().serializeWorkflow(
workflowData.blocks,
workflowData.edges,
workflowData.loops,
workflowData.parallels,
true
)

try {
const trimmedStartBlockId = startBlockId!.trim()

const triggerBlockIdForPlan = latestState.triggerBlockId || startCandidate.blockId

const plan = buildRunFromBlockPlan({
serializedWorkflow,
previousState: latestState.serializedState,
previousResolvedInputs: latestState.resolvedInputs,
previousResolvedOutputs: latestState.resolvedOutputs,
previousWorkflow: latestState.serializedWorkflow,
startBlockId: trimmedStartBlockId,
triggerBlockId: triggerBlockIdForPlan,
})

runFromBlockPlan = {
snapshotState: plan.snapshotState,
resumePendingQueue: plan.resumePendingQueue,
triggerBlockId: triggerBlockIdForPlan,
}
} catch (planError) {
logger.error(`[${requestId}] Failed to build run-from-block plan`, {
error: planError,
})
return NextResponse.json(
{
error:
planError instanceof Error ? planError.message : 'Unable to build run from block plan',
},
{ status: 400 }
)
}
}

if (!enableSSE) {
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
try {
@@ -457,9 +568,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowId,
workspaceId: workflow.workspaceId,
userId,
triggerType,
triggerType: effectiveTriggerType,
triggerBlockId: runFromBlockPlan?.triggerBlockId,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
resumeFromSnapshot: isRunFromBlock,
executionMode: isRunFromBlock ? 'run_from_block' : 'full',
pendingBlocks: runFromBlockPlan?.resumePendingQueue,
}

const snapshot = new ExecutionSnapshot(

@@ -468,7 +583,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
processedInput,
{},
workflow.variables || {},
selectedOutputs
selectedOutputs,
runFromBlockPlan?.snapshotState
)

const result = await executeWorkflowCore({

@@ -528,7 +644,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
streamConfig: {
selectedOutputs: resolvedSelectedOutputs,
isSecureMode: false,
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
workflowTriggerType: effectiveTriggerType === 'chat' ? 'chat' : 'api',
},
createFilteredResult,
executionId,

@@ -711,9 +827,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowId,
workspaceId: workflow.workspaceId,
userId,
triggerType,
triggerType: effectiveTriggerType,
triggerBlockId: runFromBlockPlan?.triggerBlockId,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
resumeFromSnapshot: isRunFromBlock,
executionMode: isRunFromBlock ? 'run_from_block' : 'full',
pendingBlocks: runFromBlockPlan?.resumePendingQueue,
}

const snapshot = new ExecutionSnapshot(

@@ -722,7 +842,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
processedInput,
{},
workflow.variables || {},
selectedOutputs
selectedOutputs,
runFromBlockPlan?.snapshotState
)

const result = await executeWorkflowCore({
@@ -1,5 +1,5 @@
import { memo, useCallback } from 'react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react'
import { Button, Duplicate, Tooltip, Trash2 } from '@/components/emcn'
import { cn } from '@/lib/utils'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'

@@ -16,6 +16,10 @@ interface ActionBarProps {
blockType: string
/** Whether the action bar is disabled */
disabled?: boolean
/** Whether an execution is currently in progress */
isExecuting?: boolean
/** Handler to run the workflow starting from this block */
onRunFromBlock?: (blockId: string) => Promise<any> | void
}

/**

@@ -25,7 +29,13 @@ interface ActionBarProps {
* @component
*/
export const ActionBar = memo(
function ActionBar({ blockId, blockType, disabled = false }: ActionBarProps) {
function ActionBar({
blockId,
blockType,
disabled = false,
isExecuting = false,
onRunFromBlock,
}: ActionBarProps) {
const {
collaborativeRemoveBlock,
collaborativeToggleBlockEnabled,

@@ -69,6 +79,8 @@ export const ActionBar = memo(
return defaultMessage
}

const canRunFromBlock = !disabled && !isExecuting && Boolean(onRunFromBlock)

return (
<div
className={cn(

@@ -78,6 +90,26 @@ export const ActionBar = memo(
'gap-[6px] rounded-[10px] bg-[#242424] p-[6px]'
)}
>
{onRunFromBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={() => {
if (canRunFromBlock) {
onRunFromBlock?.(blockId)
}
}}
className='h-[30px] w-[30px] rounded-[8px] bg-[#363636] p-0 text-[#868686] hover:bg-[#33B4FF] hover:text-[#1B1B1B] dark:text-[#868686] dark:hover:bg-[#33B4FF] dark:hover:text-[#1B1B1B]'
disabled={!canRunFromBlock}
>
<Play className='h-[14px] w-[14px]' />
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='right'>{getTooltipMessage('Run From Here')}</Tooltip.Content>
</Tooltip.Root>
)}

<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button

@@ -204,7 +236,9 @@ export const ActionBar = memo(
return (
prevProps.blockId === nextProps.blockId &&
prevProps.blockType === nextProps.blockType &&
prevProps.disabled === nextProps.disabled
prevProps.disabled === nextProps.disabled &&
prevProps.isExecuting === nextProps.isExecuting &&
prevProps.onRunFromBlock === nextProps.onRunFromBlock
)
}
)
@@ -24,6 +24,7 @@ import {
} from './hooks'
import type { WorkflowBlockProps } from './types'
import { debounce, getProviderName, shouldSkipBlockRender } from './utils'
import { useWorkflowExecution } from '../../hooks/use-workflow-execution'

const logger = createLogger('WorkflowBlock')

@@ -195,6 +196,7 @@ export const WorkflowBlock = memo(function WorkflowBlock({
currentWorkflow,
data
)
const { handleRunFromBlock, isExecuting } = useWorkflowExecution()

const { horizontalHandles, blockHeight, blockWidth, displayAdvancedMode, displayTriggerMode } =
useBlockProperties(

@@ -600,7 +602,13 @@ export const WorkflowBlock = memo(function WorkflowBlock({
</div>
)}

<ActionBar blockId={id} blockType={type} disabled={!userPermissions.canEdit} />
<ActionBar
blockId={id}
blockType={type}
disabled={!userPermissions.canEdit}
isExecuting={isExecuting}
onRunFromBlock={handleRunFromBlock}
/>

{shouldShowDefaultHandles && (
<Connections blockId={id} horizontalHandles={horizontalHandles} />
@@ -643,7 +643,11 @@ export function useWorkflowExecution() {
onStream?: (se: StreamingExecution) => Promise<void>,
executionId?: string,
onBlockComplete?: (blockId: string, output: any) => Promise<void>,
overrideTriggerType?: 'chat' | 'manual' | 'api'
overrideTriggerType?: 'chat' | 'manual' | 'api',
overrides?: {
startBlockId?: string
executionMode?: 'run_from_block'
}
): Promise<ExecutionResult | StreamingExecution> => {
// Use currentWorkflow but check if we're in diff mode
const { blocks: workflowBlocks, edges: workflowEdges } = currentWorkflow

@@ -725,10 +729,19 @@ export function useWorkflowExecution() {
}

// Determine start block and workflow input based on execution type
let startBlockId: string | undefined
const overrideStartBlockId = overrides?.startBlockId?.trim()

let startBlockId: string | undefined = overrideStartBlockId
let finalWorkflowInput = workflowInput

if (isExecutingFromChat) {
if (overrideStartBlockId) {
if (!filteredStates[overrideStartBlockId]) {
setIsExecuting(false)
throw new Error('Selected block is not part of this workflow')
}
}

if (!startBlockId && isExecutingFromChat) {
// For chat execution, find the appropriate chat trigger
const startBlock = TriggerUtils.findStartBlock(filteredStates, 'chat')

@@ -737,7 +750,7 @@ export function useWorkflowExecution() {
}

startBlockId = startBlock.blockId
} else {
} else if (!startBlockId) {
// Manual execution: detect and group triggers by paths
const candidates = resolveStartCandidates(filteredStates, {
execution: 'manual',

@@ -838,6 +851,7 @@ export function useWorkflowExecution() {
selectedOutputs,
triggerType: overrideTriggerType || 'manual',
useDraftState: true,
executionMode: overrides?.executionMode,
callbacks: {
onExecutionStarted: (data) => {
logger.info('Server execution started:', data)

@@ -1208,6 +1222,63 @@ export function useWorkflowExecution() {
handleDebugExecutionError,
])
/**
* Handles running the workflow starting from a specific block
*/
const handleRunFromBlock = useCallback(
async (blockId: string) => {
if (!activeWorkflowId || !blockId?.trim()) {
logger.warn('Run from block requested without active workflow or block id')
return
}

setExecutionResult(null)
setIsExecuting(true)
setIsDebugging(false)

try {
const runResult = await executeWorkflow(
undefined,
undefined,
undefined,
undefined,
'manual',
{
startBlockId: blockId,
executionMode: 'run_from_block',
}
)

if (runResult && 'metadata' in runResult && runResult.metadata?.isDebugSession) {
setDebugContext(runResult.metadata.context || null)
if (runResult.metadata.pendingBlocks) {
setPendingBlocks(runResult.metadata.pendingBlocks)
}
} else if (runResult && 'success' in runResult) {
setExecutionResult(runResult)
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
}

return runResult
} catch (error) {
return handleExecutionError(error, { executionId: undefined })
}
},
[
activeWorkflowId,
executeWorkflow,
handleExecutionError,
setExecutionResult,
setIsExecuting,
setIsDebugging,
setDebugContext,
setPendingBlocks,
setActiveBlocks,
]
)

/**
* Handles cancelling the current debugging session
*/

@@ -1249,6 +1320,7 @@ export function useWorkflowExecution() {
pendingBlocks,
executionResult,
handleRunWorkflow,
handleRunFromBlock,
handleStepDebug,
handleResumeDebug,
handleCancelDebug,
@@ -438,6 +438,7 @@ async function runWorkflowExecution({
triggerBlockId: payload.blockId || undefined,
useDraftState: false,
startTime: new Date().toISOString(),
executionMode: 'full',
}

const snapshot = new ExecutionSnapshot(

@@ -234,6 +234,7 @@ async function executeWebhookJobInternal(
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
executionMode: 'full',
}

const snapshot = new ExecutionSnapshot(

@@ -447,6 +448,7 @@ async function executeWebhookJobInternal(
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
executionMode: 'full',
}

const snapshot = new ExecutionSnapshot(

@@ -103,6 +103,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
triggerType: payload.triggerType || 'api',
useDraftState: false,
startTime: new Date().toISOString(),
executionMode: 'full',
}

const snapshot = new ExecutionSnapshot(
181 apps/sim/executor/docs/run-from-block.md (new file)
@@ -0,0 +1,181 @@
# Run-From-Block Execution Design

## Background
- Manual debugging and partial reruns currently require executing an entire workflow from the trigger onward.
- We already capture block outputs, router/condition choices, loop scopes, and DAG topology inside `SerializableExecutionState` snapshots.
- The goal is to restart execution at any block while preserving as much prior computation as possible and maintaining true parallel semantics.

## Objectives
- Persist block inputs/outputs, branch decisions, and DAG metadata for every successful execution.
- Given a target block, recompute only the minimal subgraph whose upstream state changed or was explicitly invalidated.
- Hydrate the executor with historical state so unresolved blocks behave exactly as in the full run.
- Preserve orchestrator behaviour for loops, parallels, routers, and pause/resume blocks.
- Provide observability that explains which blocks reran and why.

## Terminology
- **Execution Snapshot** – persisted copy of `SerializableExecutionState` plus workflow metadata for a prior run.
- **Target Block** – block the user wants to “run from”.
- **Start Set** – minimal set of blocks that must be enqueued as new starting points.
- **Resolved Block** – block whose prior output remains valid and is reused without re-executing.
- **Restart Scope** – set of blocks that must be re-executed (includes the target block and any dependent nodes).

## Persisted State Model
- Create migration (separate task) adding `workflow_execution_states` table storing:
  - `execution_id`, `workflow_id`, `trigger_block_id`, `run_version`, `serialized_state`, `resolved_inputs`, `resolved_outputs`, `status`, `attempt_at`.
- Treat `(workflow_id, trigger_block_id)` as the logical key; each trigger maintains its own latest snapshot.
- Extend `BlockExecutor` to persist `resolvedInputs` alongside `blockLogs` so we can diff future inputs accurately.
- Store `dagIncomingEdges` and `remainingEdges` from snapshots to allow edge restoration when pruning.
- Snapshot persistence is limited to executions initiated from the client/manual surface to avoid capturing automation noise.
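A minimal Drizzle sketch of the proposed table, assuming the columns listed above; the real migration is a separate task, so the exact column types and index name here are illustrative only:

```ts
import { jsonb, pgTable, text, timestamp, uniqueIndex, uuid } from 'drizzle-orm/pg-core'

// Illustrative shape only; the actual migration may differ.
export const workflowExecutionStates = pgTable(
  'workflow_execution_states',
  {
    id: uuid('id').primaryKey(),
    workflowId: text('workflow_id').notNull(),
    triggerBlockId: text('trigger_block_id').notNull(),
    executionId: text('execution_id').notNull(),
    runVersion: text('run_version'),
    serializedState: jsonb('serialized_state').notNull(), // SerializableExecutionState
    resolvedInputs: jsonb('resolved_inputs').notNull(),
    resolvedOutputs: jsonb('resolved_outputs').notNull(),
    status: text('status').notNull(), // 'success' | 'failed' | 'paused'
    attemptAt: timestamp('attempt_at').notNull(),
  },
  (table) => ({
    // Enforces the logical key: one latest snapshot per (workflow, trigger).
    workflowTrigger: uniqueIndex('workflow_execution_states_workflow_trigger_idx').on(
      table.workflowId,
      table.triggerBlockId
    ),
  })
)
```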
## High-Level Run-From-Block Flow
1. **Load Context**
   - Fetch latest snapshot (regardless of status) for the requested `(workflow_id, trigger_block_id)` pair, matching deploy/draft version.
   - Build fresh DAG from current workflow definition using `DAGBuilder`.
2. **Analyse Changes**
   - Compute forward impact from target block.
   - Detect upstream changes and build initial Start Set.
   - Run backward pruning from sinks to drop unnecessary starts.
3. **Hydrate Executor**
   - Reconstruct `ExecutionState` from snapshot.
   - Remove state for restart blocks; keep outputs for resolved blocks.
   - Clear loop/parallel scopes for affected constructs.
4. **Seed Queue & Execute**
   - Set `context.metadata.pendingBlocks` to pruned Start Set.
   - Ensure incoming edge sets reflect resolved upstream nodes.
   - Invoke `ExecutionEngine.run()`; existing concurrency model provides parallelism.
- After execution completes (success, pause, or failure), persist the resulting state as the new snapshot for this trigger, overwriting the previous attempt.
- Tag `context.metadata.executionMode = 'run_from_block'` so downstream logging, billing, and analytics can classify the run.
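End to end, the API route wires these steps together roughly as follows. This condensed sketch reuses the helpers introduced by this change; `planRunFromBlock` is a hypothetical wrapper name, and auth, validation, and HTTP error responses are omitted:

```ts
import type { NormalizedWorkflowData } from '@/lib/workflows/db-helpers'
import { getWorkflowExecutionState } from '@/lib/workflows/execution-state/service'
import { buildRunFromBlockPlan } from '@/lib/workflows/run-from-block/planner'
import { TriggerUtils } from '@/lib/workflows/triggers'
import { Serializer } from '@/serializer'

async function planRunFromBlock(
  workflowId: string,
  startBlockId: string,
  workflowData: NormalizedWorkflowData
) {
  // 1. Load context: the latest snapshot stored for this workflow's manual trigger.
  const trigger = TriggerUtils.findStartBlock(workflowData.blocks, 'manual', false)
  if (!trigger) throw new Error('No manual trigger block found for this workflow')
  const latest = await getWorkflowExecutionState(workflowId, trigger.blockId)
  if (!latest) throw new Error('Run the workflow once before using run from block')

  // 2–3. Analyse changes and prune the prior snapshot down to the Start Set.
  const serializedWorkflow = new Serializer().serializeWorkflow(
    workflowData.blocks,
    workflowData.edges,
    workflowData.loops,
    workflowData.parallels,
    true
  )
  const plan = buildRunFromBlockPlan({
    serializedWorkflow,
    previousState: latest.serializedState,
    previousResolvedInputs: latest.resolvedInputs,
    previousResolvedOutputs: latest.resolvedOutputs,
    previousWorkflow: latest.serializedWorkflow,
    startBlockId,
    triggerBlockId: latest.triggerBlockId || trigger.blockId,
  })

  // 4. Seed queue & execute: the pruned state becomes the ExecutionSnapshot's resume
  //    state, and resumePendingQueue feeds metadata.pendingBlocks with
  //    executionMode set to 'run_from_block'.
  return { snapshotState: plan.snapshotState, pendingBlocks: plan.resumePendingQueue }
}
```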
## Detecting Blocks to Re-Run

### Step 1: Forward Impact DFS
- Start at the target node ID (branch suffix and sentinel aware).
- Traverse outgoing edges using the current DAG (post-builder).
- Collect every reachable node into `affectedSet`.
- Include sentinel nodes and parallel branch clones (IDs with subscript) to avoid missing orchestrator plumbing.

### Step 2: Upstream Change Detection
- Determine the trigger block by reusing `resolveExecutorStartBlock` logic with current workflow metadata and the snapshot’s trigger info.
- Perform DFS from the trigger through current DAG edges until:
  - Visiting the target block (stop descending past target).
  - Encountering nodes not present in the saved snapshot (mark as changed).
- For each visited node:
  - Compare stored block configuration hash vs. current (hash metadata + params + code).
  - Resolve current inputs via `VariableResolver` with hydrated state; compare with stored `resolvedInputs`.
  - Compare stored outputs if the block ID exists in snapshot; missing outputs imply change.
- If any mismatch or block is the explicit target, add node ID to `startCandidates`.
- Propagate a “changed” flag downstream in this DFS so children get marked if a parent changed.
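A simplified sketch of this traversal; the real planner also hashes block configs and re-resolves inputs, which a hypothetical `hasChanged` callback stands in for here:

```ts
// Simplified sketch of the upstream-diff DFS. hasChanged() is a stand-in for the
// config-hash, resolved-input, and stored-output comparisons described above.
interface SketchNode {
  outgoingEdges: Map<string, { target: string }>
}

function findStartCandidates(
  nodes: Map<string, SketchNode>,
  triggerNodeIds: string[],
  targetNodeIds: Set<string>,
  hasChanged: (nodeId: string) => boolean
): Set<string> {
  const startCandidates = new Set<string>()
  const seen = new Set<string>()
  const stack = triggerNodeIds.map((nodeId) => ({ nodeId, upstreamChanged: false }))

  while (stack.length > 0) {
    const { nodeId, upstreamChanged } = stack.pop()!
    // A node restarts if it changed itself, anything upstream changed, or it is the target.
    const changed = upstreamChanged || hasChanged(nodeId) || targetNodeIds.has(nodeId)

    // Revisit a node only when this path newly marks it as changed.
    if (seen.has(nodeId) && (!changed || startCandidates.has(nodeId))) continue
    seen.add(nodeId)
    if (changed) startCandidates.add(nodeId)

    // Stop descending past the target; everything below it is covered by Step 1.
    if (targetNodeIds.has(nodeId)) continue

    const node = nodes.get(nodeId)
    if (!node) continue
    for (const edge of node.outgoingEdges.values()) {
      stack.push({ nodeId: edge.target, upstreamChanged: changed })
    }
  }

  return startCandidates
}
```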
### Step 3: Backward Pruning from Sinks
- Identify sink nodes (no outgoing edges or response blocks).
- Build reverse adjacency map once per run.
- DFS backward from each sink, collecting `ancestorSet`.
- The minimal Start Set is `startCandidates ∩ ancestorSet`.
- Always include target block even if intersection would remove it.
- Remove nodes already in `affectedSet` but whose outputs remain identical and have no path to sinks (defensive guard).
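Once the ancestor set is known, the pruning itself reduces to a set intersection (this is the role the planner's `deriveStartSet` helper plays); a sketch:

```ts
// Sketch of Step 3: keep only candidates that can still reach a sink, and
// never drop the explicit target nodes.
function pruneStartCandidates(
  startCandidates: Set<string>,
  ancestorSet: Set<string>, // nodes reachable backwards from the sinks
  targetNodeIds: Set<string>
): Set<string> {
  const pruned = new Set<string>()
  for (const nodeId of startCandidates) {
    if (ancestorSet.has(nodeId)) pruned.add(nodeId)
  }
  // Always include the target block, even if the intersection removed it.
  for (const nodeId of targetNodeIds) pruned.add(nodeId)
  return pruned
}
```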
### Step 4: Final Restart Scope
- `restartScope = affectedSet ∪ startSet`.
- Any block not in `restartScope` remains resolved and treated as completed.
## Queue Preparation
- Hydrate `ExecutionState` with stored block states.
- For every node in `restartScope`:
  - Remove entry from `ExecutionState.blockStates` and `executedBlocks`.
  - Delete related router/condition decisions.
  - Restore incoming edges using `dagIncomingEdges` snapshot.
- For each resolved upstream node:
  - Confirm its outgoing edges are processed via `edgeManager.processOutgoingEdges(node, storedOutput, false)` to drop satisfied dependencies and enqueue children when needed.
- Populate `context.metadata.pendingBlocks` with the Start Set (deduped, sorted for determinism).
- ExecutionEngine’s `initializeQueue` path sees `pendingBlocks` and pushes them onto the ready queue.
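A sketch of this preparation pass, assuming a minimal structural view of the hydrated state (the `entries()` accessor and exact wiring are illustrative; `deleteBlockState` and `processOutgoingEdges` are the real calls named above):

```ts
// Illustrative queue preparation; the interfaces are structural stand-ins for
// the executor's ExecutionState and EdgeManager.
interface HydratedStateLike {
  entries(): Iterable<[string, { executed: boolean; output: unknown }]>
  deleteBlockState(blockId: string): void
}

interface EdgeManagerLike {
  processOutgoingEdges(nodeId: string, output: unknown, activateAll: boolean): void
}

function prepareQueue(
  state: HydratedStateLike,
  edgeManager: EdgeManagerLike,
  restartScope: Set<string>,
  startSet: Set<string>,
  metadata: { pendingBlocks?: string[] }
): void {
  // Forget outputs and execution markers for everything that must rerun.
  for (const nodeId of restartScope) {
    state.deleteBlockState(nodeId)
  }

  // Resolved upstream nodes: mark their outgoing edges as satisfied so
  // children become ready without re-executing the nodes themselves.
  for (const [nodeId, block] of state.entries()) {
    if (block.executed && !restartScope.has(nodeId)) {
      edgeManager.processOutgoingEdges(nodeId, block.output, false)
    }
  }

  // Seed the ready queue deterministically.
  metadata.pendingBlocks = Array.from(startSet).sort()
}
```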
## Loop Handling
- Loops are restarted from iteration zero when any block inside the loop body requires re-execution.
- Implementation:
  - When diff logic marks a loop body node (or sentinel) as changed, include `loopId` in `loopRestartSet`.
  - Before execution:
    - Drop entries for `loopId` from `context.loopExecutions`.
    - Call `LoopOrchestrator.clearLoopExecutionState(loopId)` and `restoreLoopEdges(loopId)` to reset sentinel edges.
  - Enqueue the sentinel start node if it belongs to Start Set; loop orchestrator will rebuild scopes as nodes run.
- Snapshot values for loops (aggregated outputs) are reused only if loop not in restart scope.
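A sketch of the reset described above, using the orchestrator calls named in this section (the context and orchestrator shapes are structural stand-ins):

```ts
// Illustrative loop reset before a run-from-block execution.
interface LoopContextLike {
  loopExecutions: Map<string, unknown>
  completedLoops: Set<string>
}

interface LoopOrchestratorLike {
  clearLoopExecutionState(loopId: string): void
  restoreLoopEdges(loopId: string): void
}

function resetLoops(
  loopRestartSet: Set<string>,
  context: LoopContextLike,
  loopOrchestrator: LoopOrchestratorLike
): void {
  for (const loopId of loopRestartSet) {
    context.loopExecutions.delete(loopId) // forget per-iteration scopes
    context.completedLoops.delete(loopId) // the loop is no longer "done"
    loopOrchestrator.clearLoopExecutionState(loopId)
    loopOrchestrator.restoreLoopEdges(loopId) // reset sentinel edges
  }
}
```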
## Parallel Handling
- Parallel branches already map to distinct node IDs (`blockId₍index₎`).
- Forward/backward DFS naturally include affected branch nodes.
- When any branch node re-runs:
  - Clear saved outputs for that branch node only.
  - `ParallelOrchestrator` lazily recreates scope the first time a branch completes (scope removed from `context.parallelExecutions` for impacted `parallelId`).
- Branches outside restart scope retain previous outputs, preserving aggregated results.

## Routers and Conditions
- Decisions stored in `context.decisions.router` / `context.decisions.condition`.
- When a router or condition is in restart scope, remove its decision before queuing so `EdgeManager.shouldActivateEdge` re-evaluates the correct branch.
- For resolved routers/conditions keep decisions to avoid re-activating previously pruned edges.

## Pause/Resume Blocks
- If an approval block lies in restart scope:
  - Strip `_pauseMetadata` before execution so we do not treat stored metadata as fresh pause output.
  - Re-running will register new pause contexts (or skip if configuration changed).
- If outside restart scope:
  - Preserve `_pauseMetadata` in reused outputs but flag them as historical (optional boolean) so UI can show they are not actionable resumes.
- Resume triggers are allowed only when `context.metadata.resumeFromSnapshot` is true; run-from-block will set `allowResumeTriggers = false` to avoid accidental resume nodes unless explicitly in Start Set.
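A sketch of the pause-metadata handling above; the `historical` flag is the optional boolean mentioned, and the output shape is a structural stand-in:

```ts
// Illustrative pause-metadata handling for run-from-block.
type PauseAwareOutput = Record<string, any> & { _pauseMetadata?: Record<string, any> }

function preparePauseBlocks(
  blockStates: Record<string, { output: PauseAwareOutput }>,
  restartScope: Set<string>
): void {
  for (const [blockId, state] of Object.entries(blockStates)) {
    if (!state.output?._pauseMetadata) continue
    if (restartScope.has(blockId)) {
      // Re-running: drop stale pause metadata so the block can pause afresh.
      delete state.output._pauseMetadata
    } else {
      // Reused output: keep it, but mark it as historical so the UI does not
      // treat it as an actionable resume.
      state.output._pauseMetadata.historical = true
    }
  }
}
```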
## Variable Resolution
- Hydrate `VariableResolver` with reconstructed `ExecutionState`; resolved blocks provide outputs to satisfy `<block.field>` references.
- For blocks in restart scope, removal from `ExecutionState` forces resolver to recompute when node executes.
- During diffing:
  - Temporarily use hydrated state (before pruning) to resolve current inputs; if resolution fails (missing upstream output), mark block as changed.
- Workflow variables and environment variables come from persisted snapshot metadata to ensure deterministic comparisons.

## Concurrency & Queueing Semantics
- Existing `ExecutionEngine` already executes all ready nodes concurrently; no changes needed for “true parallelism”.
- Optionally expose `maxConcurrency` in `RunFromBlockPlan` for future throttling; default remains unbounded.
- `readyQueue` is seeded exclusively with Start Set; downstream nodes become ready through normal edge processing when dependencies complete.
- Reuse streaming callbacks (`onBlockStart`, `onBlockComplete`, `onStream`) so rerun blocks stream back to the client console in real time; resolved blocks remain silent.

## Error Handling
- Missing snapshot or version mismatch → if no historical state exists, fall back to a full-run plan (Start Set becomes the original trigger).
- Missing target node in current DAG → abort (workflow changed too much).
- If diff fails due to unresolvable reference, treat as changed and re-run from that node.
- Execution failures or aborts overwrite the stored snapshot with the partial state, ensuring subsequent attempts resume from the latest client-visible data.

## Observability
- Add structured logs via `logger.info`:
  - Snapshot metadata (execution ID, createdAt, version).
  - Forward DFS result size (`affectedCount`).
  - Upstream diff summary (block IDs and reasons).
  - Loop/parallel scopes cleared.
  - Start Set list before queueing.
- Emit metrics counters (future) for number of blocks reused vs. re-executed.
## Storage & Migration Plan
- Migration (later task) creates new table; store raw JSON of execution state and run version.
- Repository layer exposes:
  - `saveExecutionState(executionResult)` after each client-initiated run, regardless of outcome.
  - `loadLatestExecutionState(workflowId, runVersion)` when run-from-block invoked.
- Consider TTL or pruning policy to limit history growth.

## Testing Strategy
- Unit tests:
  - Forward/backward DFS utilities with loops/parallels.
  - Diff logic for input/output/config changes.
  - Loop restart logic resets scopes.
  - Pause block behaviour (metadata stripping vs. reuse).
- Integration tests:
  - Full workflow with branches, loops, approvals – run once, mutate input, run-from-block, assert only necessary blocks rerun.
  - Scenario where upstream config change triggers deeper downstream re-execution.
- Regression tests ensuring original full-run path unchanged.

## Open Questions
- Should we expose UI controls to override Start Set suggestions?
- How do we surface “historical” pause blocks to users?
- Should we snapshot tool execution side-effects (e.g., notifications) or always rerun them?

## Future Enhancements
- Cache hashed block configurations to speed change detection.
- Allow selecting older execution snapshots.
- Layer on speculative parallelism controls (per-branch throttling).
- Add dry-run mode that reports Start Set without executing.
- Extend the client block toolbar so hovering a block reveals the “Run from block” action alongside existing controls, invoking the documented flow with proper logging and billing hooks.
@@ -87,6 +87,13 @@ export class BlockExecutor {
}
cleanupSelfReference?.()

if (!isSentinel) {
this.state.setBlockInputs(node.id, resolvedInputs)
if (blockLog) {
blockLog.input = resolvedInputs
}
}

try {
const output = handler.executeWithNode
? await handler.executeWithNode(ctx, block, resolvedInputs, nodeMetadata)

@@ -19,6 +19,7 @@ const logger = createLogger('ExecutionEngine')
export class ExecutionEngine {
private readyQueue: string[] = []
private executing = new Set<Promise<void>>()
private executingNodes = new Set<string>()
private queueLock = Promise.resolve()
private finalOutput: NormalizedBlockOutput = {}
private pausedBlocks: Map<string, PauseMetadata> = new Map()
@@ -1,17 +1,20 @@
import { createLogger } from '@/lib/logs/console/logger'
import { StartBlockPath } from '@/lib/workflows/triggers'
import type { BlockOutput } from '@/blocks/types'
import type { DAG } from '@/executor/dag/builder'
import { DAGBuilder } from '@/executor/dag/builder'
import { BlockExecutor } from '@/executor/execution/block-executor'
import { EdgeManager } from '@/executor/execution/edge-manager'
import { ExecutionEngine } from '@/executor/execution/engine'
import { ExecutionState } from '@/executor/execution/state'
import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
import { buildSerializableExecutionState } from '@/executor/execution/snapshot-serializer'
import { createBlockHandlers } from '@/executor/handlers/registry'
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
import type { SerializableExecutionState } from '@/executor/execution/snapshot'
import {
buildResolutionFromBlock,
buildStartBlockOutput,

@@ -39,6 +42,8 @@ export class DAGExecutor {
private contextExtensions: ContextExtensions
private isCancelled = false
private dagBuilder: DAGBuilder
private lastContext?: ExecutionContext
private lastDag?: DAG

constructor(options: DAGExecutorOptions) {
this.workflow = options.workflow

@@ -53,6 +58,9 @@ export class DAGExecutor {
const savedIncomingEdges = this.contextExtensions.dagIncomingEdges
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
context.metadata.triggerBlockId = triggerBlockId
this.lastContext = context
this.lastDag = dag

// Link cancellation flag to context
Object.defineProperty(context, 'isCancelled', {

@@ -78,6 +86,11 @@ export class DAGExecutor {
return await engine.run(triggerBlockId)
}

getSerializableState(pendingQueue: string[] = []): SerializableExecutionState | undefined {
if (!this.lastContext) return undefined
return buildSerializableExecutionState(this.lastContext, this.lastDag, pendingQueue)
}

cancel(): void {
this.isCancelled = true
}
@@ -55,6 +55,38 @@ function serializeParallelExecutions(
return result
}

export function buildSerializableExecutionState(
context: ExecutionContext,
dag?: DAG,
pendingQueue: string[] = []
): SerializableExecutionState {
const dagIncomingEdges: Record<string, string[]> | undefined = dag
? Object.fromEntries(
Array.from(dag.nodes.entries()).map(([nodeId, node]) => [
nodeId,
Array.from(node.incomingEdges),
])
)
: undefined

return {
blockStates: Object.fromEntries(context.blockStates),
executedBlocks: Array.from(context.executedBlocks),
blockLogs: context.blockLogs,
decisions: {
router: Object.fromEntries(context.decisions.router),
condition: Object.fromEntries(context.decisions.condition),
},
completedLoops: Array.from(context.completedLoops),
loopExecutions: serializeLoopExecutions(context.loopExecutions),
parallelExecutions: serializeParallelExecutions(context.parallelExecutions),
parallelBlockMapping: mapFromEntries(context.parallelBlockMapping),
activeExecutionPath: Array.from(context.activeExecutionPath),
pendingQueue,
dagIncomingEdges,
}
}

export function serializePauseSnapshot(
context: ExecutionContext,
triggerBlockIds: string[],

@@ -70,31 +102,7 @@ export function serializePauseSnapshot(
useDraftState = true
}

const dagIncomingEdges: Record<string, string[]> | undefined = dag
? Object.fromEntries(
Array.from(dag.nodes.entries()).map(([nodeId, node]) => [
nodeId,
Array.from(node.incomingEdges),
])
)
: undefined

const state: SerializableExecutionState = {
blockStates: Object.fromEntries(context.blockStates),
executedBlocks: Array.from(context.executedBlocks),
blockLogs: context.blockLogs,
decisions: {
router: Object.fromEntries(context.decisions.router),
condition: Object.fromEntries(context.decisions.condition),
},
completedLoops: Array.from(context.completedLoops),
loopExecutions: serializeLoopExecutions(context.loopExecutions),
parallelExecutions: serializeParallelExecutions(context.parallelExecutions),
parallelBlockMapping: mapFromEntries(context.parallelBlockMapping),
activeExecutionPath: Array.from(context.activeExecutionPath),
pendingQueue: triggerBlockIds,
dagIncomingEdges,
}
const state = buildSerializableExecutionState(context, dag, triggerBlockIds)

const executionMetadata = {
requestId:

@@ -13,6 +13,7 @@ export interface ExecutionMetadata {
startTime: string
pendingBlocks?: string[]
resumeFromSnapshot?: boolean
executionMode?: 'full' | 'run_from_block' | 'resume'
}

export interface ExecutionCallbacks {
@@ -71,10 +71,31 @@ export class ExecutionState implements BlockStateController {
}

setBlockOutput(blockId: string, output: NormalizedBlockOutput, executionTime = 0): void {
this.blockStates.set(blockId, { output, executed: true, executionTime })
const existing = this.blockStates.get(blockId)
const inputs = existing?.inputs
this.blockStates.set(blockId, {
inputs,
output,
executed: true,
executionTime,
})
this.executedBlocks.add(blockId)
}

setBlockInputs(blockId: string, inputs: Record<string, any>): void {
const existing = this.blockStates.get(blockId)
if (existing) {
this.blockStates.set(blockId, { ...existing, inputs })
} else {
this.blockStates.set(blockId, {
inputs,
output: {},
executed: false,
executionTime: 0,
})
}
}

setBlockState(blockId: string, state: BlockState): void {
this.blockStates.set(blockId, state)
if (state.executed) {

@@ -54,6 +54,7 @@ export interface BlockStateReader {
}

export interface BlockStateWriter {
setBlockInputs(blockId: string, inputs: Record<string, any>): void
setBlockOutput(blockId: string, output: NormalizedBlockOutput, executionTime?: number): void
setBlockState(blockId: string, state: BlockState): void
deleteBlockState(blockId: string): void
@@ -136,9 +136,11 @@ export interface ExecutionMetadata {
triggerBlockId?: string
useDraftState?: boolean
resumeFromSnapshot?: boolean
executionMode?: 'full' | 'run_from_block' | 'resume'
}

export interface BlockState {
inputs?: Record<string, any>
output: NormalizedBlockOutput
executed: boolean
executionTime: number

@@ -61,6 +61,7 @@ export interface ExecuteStreamOptions {
startBlockId?: string
triggerType?: string
useDraftState?: boolean
executionMode?: 'run_from_block'
callbacks?: ExecutionStreamCallbacks
}
139 apps/sim/lib/workflows/execution-state/service.ts (new file)
@@ -0,0 +1,139 @@
import { db } from '@sim/db'
import { workflowExecutionStates } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import type { SerializableExecutionState } from '@/executor/execution/snapshot'
import type { SerializedWorkflow } from '@/serializer/types'

const logger = createLogger('WorkflowExecutionStateService')

export type WorkflowExecutionStateStatus = 'success' | 'failed' | 'paused'

export interface WorkflowExecutionStateRecord {
id: string
workflowId: string
triggerBlockId: string
executionId: string
runVersion: string | null
serializedState: SerializableExecutionState
serializedWorkflow?: SerializedWorkflow
resolvedInputs: Record<string, any>
resolvedOutputs: Record<string, any>
status: WorkflowExecutionStateStatus
attemptAt: string
}

export interface UpsertWorkflowExecutionStateParams {
workflowId: string
triggerBlockId: string
executionId: string
runVersion?: string | null
serializedState: SerializableExecutionState
serializedWorkflow?: SerializedWorkflow
resolvedInputs: Record<string, any>
resolvedOutputs: Record<string, any>
status: WorkflowExecutionStateStatus
attemptAt?: Date
}

export async function upsertWorkflowExecutionState(
params: UpsertWorkflowExecutionStateParams
): Promise<WorkflowExecutionStateRecord> {
const {
workflowId,
triggerBlockId,
executionId,
runVersion = null,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
} = params
const attemptAt = params.attemptAt ?? new Date()

const insertValues = {
id: uuidv4(),
workflowId,
triggerBlockId,
executionId,
runVersion,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
attemptAt,
}

const [row] = await db
.insert(workflowExecutionStates)
.values(insertValues)
.onConflictDoUpdate({
target: [workflowExecutionStates.workflowId, workflowExecutionStates.triggerBlockId],
set: {
executionId,
runVersion,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
attemptAt,
},
})
.returning()

if (!row) {
throw new Error('Failed to upsert workflow execution state')
}

logger.info('Persisted workflow execution state', {
workflowId,
triggerBlockId,
executionId,
status,
})

return mapRow(row)
}

export async function getWorkflowExecutionState(
workflowId: string,
triggerBlockId: string
): Promise<WorkflowExecutionStateRecord | null> {
const [row] = await db
.select()
.from(workflowExecutionStates)
.where(
and(
eq(workflowExecutionStates.workflowId, workflowId),
eq(workflowExecutionStates.triggerBlockId, triggerBlockId)
)
)
.limit(1)

if (!row) {
return null
}

return mapRow(row)
}

function mapRow(row: typeof workflowExecutionStates.$inferSelect): WorkflowExecutionStateRecord {
return {
id: row.id,
workflowId: row.workflowId,
triggerBlockId: row.triggerBlockId,
executionId: row.executionId,
runVersion: row.runVersion,
serializedState: row.serializedState as SerializableExecutionState,
serializedWorkflow: row.serializedWorkflow as SerializedWorkflow | undefined,
resolvedInputs: row.resolvedInputs as Record<string, any>,
resolvedOutputs: row.resolvedOutputs as Record<string, any>,
status: row.status as WorkflowExecutionStateStatus,
attemptAt: row.attemptAt.toISOString(),
}
}
@@ -21,6 +21,10 @@ import type { ExecutionCallbacks, ExecutionSnapshot } from '@/executor/execution
import type { ExecutionResult } from '@/executor/types'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import {
upsertWorkflowExecutionState,
type WorkflowExecutionStateStatus,
} from '@/lib/workflows/execution-state/service'

const logger = createLogger('ExecutionCore')

@@ -335,6 +339,73 @@ export async function executeWorkflowCore(
resolvedTriggerBlockId
)) as ExecutionResult

const executionMode =
snapshot.metadata.executionMode ?? (metadata.resumeFromSnapshot ? 'resume' : 'full')
snapshot.metadata.executionMode = executionMode
const storageTriggerBlockId = metadata.triggerBlockId ?? resolvedTriggerBlockId
metadata.triggerBlockId = storageTriggerBlockId
if (result.metadata) {
result.metadata.executionMode = result.metadata.executionMode ?? executionMode
if (storageTriggerBlockId) {
result.metadata.triggerBlockId =
result.metadata.triggerBlockId ?? storageTriggerBlockId
}
}

if (snapshot.metadata.useDraftState && storageTriggerBlockId) {
try {
const pendingQueue =
result.metadata?.pendingBlocks && result.metadata.pendingBlocks.length > 0
? result.metadata.pendingBlocks
: []
const serializedState = executorInstance.getSerializableState(pendingQueue)
if (serializedState) {
const resolvedInputs: Record<string, any> = {}
const resolvedOutputs: Record<string, any> = {}

for (const [blockId, state] of Object.entries(serializedState.blockStates)) {
resolvedInputs[blockId] = state.inputs ?? {}
resolvedOutputs[blockId] = state.output ?? {}
}

let status: WorkflowExecutionStateStatus = 'success'
if (result.status === 'paused') {
status = 'paused'
} else if (!result.success) {
status = 'failed'
}

const runVersion =
typeof (workflow as any)?.updatedAt === 'string'
? ((workflow as any).updatedAt as string)
: null

await upsertWorkflowExecutionState({
workflowId,
triggerBlockId: storageTriggerBlockId,
executionId: metadata.executionId ?? metadata.requestId ?? '',
runVersion,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
})
} else {
logger.warn(`[${requestId}] Unable to capture execution state for persistence`, {
workflowId,
triggerBlockId: storageTriggerBlockId,
})
}
} catch (stateError) {
logger.error(`[${requestId}] Failed to persist workflow execution state`, {
workflowId,
triggerBlockId: storageTriggerBlockId,
error: stateError,
})
}
}

// Build trace spans for logging from the full execution result
const { traceSpans, totalDuration } = buildTraceSpans(result)
529 apps/sim/lib/workflows/run-from-block/planner.ts (new file)
@@ -0,0 +1,529 @@
import { DAGBuilder } from '@/executor/dag/builder'
import type { DAGNode } from '@/executor/dag/builder'
import type { SerializableExecutionState } from '@/executor/execution/snapshot'
import type { SerializedWorkflow } from '@/serializer/types'
import { createLogger } from '@/lib/logs/console/logger'

export interface RunFromBlockPlan {
snapshotState: SerializableExecutionState
resumePendingQueue: string[]
}

export interface BuildRunFromBlockPlanParams {
serializedWorkflow: SerializedWorkflow
previousState: SerializableExecutionState
previousResolvedInputs?: Record<string, any>
previousResolvedOutputs?: Record<string, any>
previousWorkflow?: SerializedWorkflow
startBlockId: string
triggerBlockId: string
}

const logger = createLogger('RunFromBlockPlanner')

/**
* Builds an execution plan for running a workflow starting from a specific block.
*
* Performs forward impact detection, upstream change analysis, backward pruning,
* and snapshot pruning so that only the minimally required nodes are re-executed.
*/
export function buildRunFromBlockPlan(params: BuildRunFromBlockPlanParams): RunFromBlockPlan {
const {
serializedWorkflow,
previousState,
previousWorkflow,
startBlockId,
triggerBlockId,
previousResolvedInputs,
previousResolvedOutputs,
} = params

const dagBuilder = new DAGBuilder()
const dag = dagBuilder.build(serializedWorkflow)

const originalToNodeIds = mapOriginalIdsToNodes(dag.nodes)
let startNodeIds = originalToNodeIds.get(startBlockId)

if (!startNodeIds || startNodeIds.length === 0) {
if (dag.nodes.has(startBlockId)) {
startNodeIds = [startBlockId]
} else {
throw new Error(`Start block ${startBlockId} not found in workflow DAG`)
}
}

const triggerNodeIds = determineTriggerNodeIds(triggerBlockId, originalToNodeIds, dag.nodes)
const stopNodeIds = new Set(startNodeIds)

const forwardImpact = collectDownstreamNodes(dag.nodes, new Set(startNodeIds))

const upstreamAnalysis = analyzeUpstreamDifferences({
dag,
triggerNodeIds,
stopNodeIds,
previousState,
previousResolvedInputs,
previousResolvedOutputs,
currentWorkflow: serializedWorkflow,
previousWorkflow,
})

const sinkNodes = identifySinkNodes(dag.nodes)
const ancestorSet = collectAncestors(dag.nodes, sinkNodes)

const prunedStartCandidates = deriveStartSet({
upstreamCandidates: upstreamAnalysis.startCandidates,
ancestorSet,
stopNodeIds,
})

if (prunedStartCandidates.size === 0) {
for (const nodeId of startNodeIds) {
prunedStartCandidates.add(nodeId)
}
}

const restartSeeds = new Set<string>([...prunedStartCandidates, ...stopNodeIds])
const restartScope = collectDownstreamNodes(dag.nodes, restartSeeds)
const loopIdsToReset = new Set<string>()
const parallelIdsToReset = new Set<string>()
const originalIdsInScope = new Set<string>()

// Queue all pruned upstream changes - executor handles dependency resolution for downstream
const queueStartSet = new Set(prunedStartCandidates)

for (const nodeId of restartScope) {
const node = dag.nodes.get(nodeId)
if (!node) continue
if (node.metadata?.loopId) {
loopIdsToReset.add(node.metadata.loopId)
}
if (node.metadata?.parallelId) {
parallelIdsToReset.add(node.metadata.parallelId)
}
if (node.metadata?.originalBlockId) {
originalIdsInScope.add(node.metadata.originalBlockId)
}
}

const snapshotState = cloneSerializableState(previousState)

// Reset block states and execution markers for nodes in restart scope
for (const nodeId of restartScope) {
delete snapshotState.blockStates[nodeId]
if (snapshotState.parallelBlockMapping) {
delete snapshotState.parallelBlockMapping[nodeId]
}
}
for (const originalId of originalIdsInScope) {
delete snapshotState.blockStates[originalId]
}

snapshotState.executedBlocks = (snapshotState.executedBlocks || []).filter(
(executedId) => !restartScope.has(executedId) && !originalIdsInScope.has(executedId)
)

snapshotState.blockLogs = (snapshotState.blockLogs || []).filter(
(log) => !restartScope.has(log.blockId) && !originalIdsInScope.has(log.blockId)
)

if (snapshotState.decisions) {
snapshotState.decisions.router = filterDecisionMap(
snapshotState.decisions.router,
restartScope,
originalIdsInScope
)
snapshotState.decisions.condition = filterDecisionMap(
snapshotState.decisions.condition,
restartScope,
originalIdsInScope
)
}

if (snapshotState.parallelExecutions) {
for (const parallelId of parallelIdsToReset) {
delete snapshotState.parallelExecutions[parallelId]
}
}

if (snapshotState.loopExecutions) {
for (const loopId of loopIdsToReset) {
delete snapshotState.loopExecutions[loopId]
}
}

if (snapshotState.completedLoops) {
snapshotState.completedLoops = snapshotState.completedLoops.filter(
(loopId) => !loopIdsToReset.has(loopId)
)
}

const resumePendingQueue = Array.from(queueStartSet).sort()
snapshotState.pendingQueue = [...resumePendingQueue]

if (snapshotState.activeExecutionPath) {
snapshotState.activeExecutionPath = snapshotState.activeExecutionPath.filter(
(nodeId) => !restartScope.has(nodeId)
)
}

logPlanSummary({
startBlockId,
triggerBlockId,
startNodeIds,
forwardImpact,
upstreamAnalysis,
sinkNodes,
ancestorSet,
prunedStartSet: prunedStartCandidates,
queueStartSet,
restartScope,
})

return {
snapshotState,
resumePendingQueue,
}
}
function mapOriginalIdsToNodes(nodes: Map<string, DAGNode>): Map<string, string[]> {
|
||||
const mapping = new Map<string, string[]>()
|
||||
for (const [nodeId, node] of nodes.entries()) {
|
||||
const originalId = node.metadata?.originalBlockId ?? nodeId
|
||||
if (!mapping.has(originalId)) {
|
||||
mapping.set(originalId, [])
|
||||
}
|
||||
mapping.get(originalId)!.push(nodeId)
|
||||
}
|
||||
return mapping
|
||||
}
|
||||
|
||||
function collectDownstreamNodes(
|
||||
nodes: Map<string, DAGNode>,
|
||||
seeds: Set<string>
|
||||
): Set<string> {
|
||||
const visited = new Set<string>(seeds)
|
||||
const stack = [...seeds]
|
||||
|
||||
while (stack.length > 0) {
|
||||
const current = stack.pop()!
|
||||
const node = nodes.get(current)
|
||||
if (!node) continue
|
||||
|
||||
for (const edge of node.outgoingEdges.values()) {
|
||||
if (!visited.has(edge.target)) {
|
||||
visited.add(edge.target)
|
||||
stack.push(edge.target)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return visited
|
||||
}
|
||||
|
||||
function collectAncestors(nodes: Map<string, DAGNode>, sinks: Set<string>): Set<string> {
  const visited = new Set<string>()
  const stack = [...sinks]

  while (stack.length > 0) {
    const current = stack.pop()!
    if (visited.has(current)) {
      continue
    }

    visited.add(current)
    const node = nodes.get(current)
    if (!node) {
      continue
    }

    for (const incoming of node.incomingEdges.values()) {
      if (!visited.has(incoming)) {
        stack.push(incoming)
      }
    }
  }

  return visited
}

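Illustrative sketch (not part of the diff): collectDownstreamNodes walks outgoing edges from the seed set, while collectAncestors walks incoming edges back from the sinks, so together they bound the restart scope to nodes that are both affected by the start block and still able to reach a sink. The toy diamond graph below mimics the edge shapes the helpers expect; ToyNode and toyNode are hypothetical stand-ins for DAGNode.

// Diamond graph: a -> b -> d and a -> c -> d.
type ToyNode = {
  id: string
  outgoingEdges: Map<string, { target: string }>
  incomingEdges: Map<string, string>
}

function toyNode(id: string, outgoing: string[], incoming: string[]): ToyNode {
  return {
    id,
    outgoingEdges: new Map(outgoing.map((t): [string, { target: string }] => [t, { target: t }])),
    incomingEdges: new Map(incoming.map((s): [string, string] => [s, s])),
  }
}

const toyGraph = new Map<string, ToyNode>([
  ['a', toyNode('a', ['b', 'c'], [])],
  ['b', toyNode('b', ['d'], ['a'])],
  ['c', toyNode('c', ['d'], ['a'])],
  ['d', toyNode('d', [], ['b', 'c'])],
])

// collectDownstreamNodes(toyGraph, new Set(['b'])) -> { b, d }
// collectAncestors(toyGraph, new Set(['d']))       -> { d, b, c, a }
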
function cloneSerializableState(
  state: SerializableExecutionState
): SerializableExecutionState {
  return JSON.parse(JSON.stringify(state)) as SerializableExecutionState
}

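A side note on the clone above: the JSON round-trip drops undefined values and functions and turns Dates into strings, which is fine while the persisted state is plain JSON. If richer values ever end up in the state, structuredClone is a possible alternative (a sketch, not what this branch uses):

function cloneSerializableStateAlt<T>(state: T): T {
  // structuredClone preserves Dates, Maps and Sets; the JSON round-trip does not.
  return structuredClone(state)
}
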
function filterDecisionMap(
  decisions: Record<string, string>,
  restartScope: Set<string>,
  originalIds: Set<string>
): Record<string, string> {
  const filtered: Record<string, string> = {}
  for (const [key, value] of Object.entries(decisions || {})) {
    if (restartScope.has(key) || originalIds.has(key)) {
      continue
    }
    filtered[key] = value
  }
  return filtered
}

function determineTriggerNodeIds(
  triggerBlockId: string,
  originalToNodeIds: Map<string, string[]>,
  nodes: Map<string, DAGNode>
): string[] {
  let triggerNodes = originalToNodeIds.get(triggerBlockId)

  if ((!triggerNodes || triggerNodes.length === 0) && nodes.has(triggerBlockId)) {
    triggerNodes = [triggerBlockId]
  }

  if (!triggerNodes || triggerNodes.length === 0) {
    const rootNodes = Array.from(nodes.values())
      .filter((node) => node.incomingEdges.size === 0)
      .map((node) => node.id)

    triggerNodes = rootNodes.length > 0 ? rootNodes : [triggerBlockId]
  }

  return triggerNodes
}

interface UpstreamAnalysisParams {
  dag: ReturnType<DAGBuilder['build']>
  triggerNodeIds: string[]
  stopNodeIds: Set<string>
  previousState: SerializableExecutionState
  previousResolvedInputs?: Record<string, any>
  previousResolvedOutputs?: Record<string, any>
  currentWorkflow: SerializedWorkflow
  previousWorkflow?: SerializedWorkflow
}

interface UpstreamAnalysisResult {
  startCandidates: Map<string, string[]>
  traversedNodes: Set<string>
}

function analyzeUpstreamDifferences(params: UpstreamAnalysisParams): UpstreamAnalysisResult {
  const {
    dag,
    triggerNodeIds,
    stopNodeIds,
    previousState,
    previousResolvedInputs,
    previousResolvedOutputs,
    currentWorkflow,
    previousWorkflow,
  } = params
  const startCandidates = new Map<string, string[]>()
  const traversedNodes = new Set<string>()
  const stack = triggerNodeIds.map((nodeId) => ({ nodeId, upstreamChanged: false as boolean }))
  const executedBlocks = new Set<string>(previousState.executedBlocks || [])

  // Build a map of block IDs to their current and previous definitions
  const currentBlocks = new Map<string, any>()
  const previousBlocks = new Map<string, any>()

  for (const block of currentWorkflow.blocks || []) {
    currentBlocks.set(block.id, block)
  }

  if (previousWorkflow) {
    for (const block of previousWorkflow.blocks || []) {
      previousBlocks.set(block.id, block)
    }
  }

  while (stack.length > 0) {
    const { nodeId, upstreamChanged } = stack.pop()!

    if (traversedNodes.has(nodeId)) {
      continue
    }
    traversedNodes.add(nodeId)

    const node = dag.nodes.get(nodeId)
    if (!node) {
      continue
    }

    const originalId = node.metadata?.originalBlockId ?? nodeId
    const reasons: string[] = []

    const previousBlockState =
      previousState.blockStates[nodeId] ?? previousState.blockStates[originalId]

    const hasPriorState =
      previousBlockState !== undefined ||
      executedBlocks.has(nodeId) ||
      executedBlocks.has(originalId) ||
      previousResolvedOutputs?.[originalId] !== undefined ||
      previousResolvedInputs?.[originalId] !== undefined

    if (!hasPriorState) {
      reasons.push('missing_prior_state')
    }

    // Check if the block definition itself changed
    const currentBlock = currentBlocks.get(originalId)
    const previousBlock = previousBlocks.get(originalId)

    if (currentBlock && previousBlock) {
      // Compare the block definitions (excluding metadata like position)
      const currentDefinition = JSON.stringify({
        type: currentBlock.type,
        subBlocks: currentBlock.subBlocks,
      })
      const previousDefinition = JSON.stringify({
        type: previousBlock.type,
        subBlocks: previousBlock.subBlocks,
      })

      if (currentDefinition !== previousDefinition) {
        reasons.push('block_definition_changed')
      }
    } else if (currentBlock && !previousBlock) {
      reasons.push('new_block')
    }

    // Note: We intentionally do NOT check incoming_edges_changed here because:
    // - Workflow topology changes (adding/removing unrelated blocks) shouldn't invalidate this block
    // - The output/input comparisons above already catch meaningful dependency changes
    // - This prevents false positives when the DAG structure evolves between runs

    if (stopNodeIds.has(nodeId)) {
      reasons.push('target_block')
    }

    const hasLocalChange = reasons.length > 0

    if (hasLocalChange) {
      startCandidates.set(nodeId, reasons)
    }

    const shouldPropagateChange = upstreamChanged || hasLocalChange

    if (stopNodeIds.has(nodeId)) {
      continue
    }

    for (const { target } of node.outgoingEdges.values()) {
      stack.push({ nodeId: target, upstreamChanged: shouldPropagateChange })
    }
  }

  return {
    startCandidates,
    traversedNodes,
  }
}

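Illustrative sketch (not part of the diff): only type and subBlocks participate in the definition comparison, so cosmetic edits such as moving a block on the canvas do not mark it as changed, while editing a sub-block value does. The block literals below are simplified stand-ins for SerializedWorkflow blocks.

const previousAgentBlock = { id: 'agent-1', type: 'agent', subBlocks: { prompt: 'Summarize' } }
const currentAgentBlock = { id: 'agent-1', type: 'agent', subBlocks: { prompt: 'Summarize briefly' } }

// Same comparison shape as above; key-order sensitivity is acceptable when both
// snapshots come from the same serializer.
const definitionChanged =
  JSON.stringify({ type: currentAgentBlock.type, subBlocks: currentAgentBlock.subBlocks }) !==
  JSON.stringify({ type: previousAgentBlock.type, subBlocks: previousAgentBlock.subBlocks })

// definitionChanged === true -> the node is recorded as a start candidate
// with the reason 'block_definition_changed'.
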
interface StartSetParams {
  upstreamCandidates: Map<string, string[]>
  ancestorSet: Set<string>
  stopNodeIds: Set<string>
}

function deriveStartSet(params: StartSetParams): Set<string> {
  const { upstreamCandidates, ancestorSet, stopNodeIds } = params
  const finalStartSet = new Set<string>()

  for (const candidate of upstreamCandidates.keys()) {
    if (ancestorSet.has(candidate) || stopNodeIds.has(candidate)) {
      finalStartSet.add(candidate)
    }
  }

  for (const nodeId of stopNodeIds) {
    finalStartSet.add(nodeId)
  }

  return finalStartSet
}

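Illustrative sketch (not part of the diff): a candidate survives pruning only if it can still reach a sink (it appears in ancestorSet) or is itself a target block, and the target blocks are always added so the resume queue is never empty. The inputs below are hypothetical; the expected result is shown as a comment.

const sampleCandidates = new Map<string, string[]>([
  ['stale-branch', ['missing_prior_state']],
  ['changed-agent', ['block_definition_changed']],
])
const sampleAncestors = new Set(['changed-agent', 'target'])
const sampleStopNodes = new Set(['target'])

// deriveStartSet({ upstreamCandidates: sampleCandidates, ancestorSet: sampleAncestors, stopNodeIds: sampleStopNodes })
// -> Set { 'changed-agent', 'target' }   ('stale-branch' is pruned: it cannot reach a sink)
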
function identifySinkNodes(nodes: Map<string, DAGNode>): Set<string> {
  const sinks = new Set<string>()
  for (const node of nodes.values()) {
    if (node.outgoingEdges.size === 0) {
      sinks.add(node.id)
    }
  }
  return sinks
}

function areStringSetsEqual(a: Set<string>, b: Set<string>): boolean {
  if (a.size !== b.size) {
    return false
  }
  for (const value of a) {
    if (!b.has(value)) {
      return false
    }
  }
  return true
}

interface PlanSummaryLogParams {
  startBlockId: string
  triggerBlockId: string
  startNodeIds: string[]
  forwardImpact: Set<string>
  upstreamAnalysis: UpstreamAnalysisResult
  sinkNodes: Set<string>
  ancestorSet: Set<string>
  prunedStartSet: Set<string>
  queueStartSet: Set<string>
  restartScope: Set<string>
}

function logPlanSummary(params: PlanSummaryLogParams): void {
  const {
    startBlockId,
    triggerBlockId,
    startNodeIds,
    forwardImpact,
    upstreamAnalysis,
    sinkNodes,
    ancestorSet,
    prunedStartSet,
    queueStartSet,
    restartScope,
  } = params

  logger.info('Run-from-block forward impact traversal completed', {
    startBlockId,
    startNodeIds,
    affectedCount: forwardImpact.size,
    affectedNodes: Array.from(forwardImpact),
  })

  const upstreamDetails = Array.from(upstreamAnalysis.startCandidates.entries()).map(
    ([nodeId, reasons]) => ({
      nodeId,
      reasons,
    })
  )

  logger.info('Run-from-block upstream diff analysis', {
    triggerBlockId,
    traversedNodes: Array.from(upstreamAnalysis.traversedNodes),
    startCandidates: upstreamDetails,
  })

  logger.info('Run-from-block backward pruning summary', {
    sinkNodes: Array.from(sinkNodes),
    ancestorCount: ancestorSet.size,
    ancestorNodes: Array.from(ancestorSet),
    prunedStartSet: Array.from(prunedStartSet),
  })

  logger.info('Run-from-block queue and restart scope', {
    resumePendingQueue: Array.from(queueStartSet),
    restartScope: Array.from(restartScope),
    restartScopeSize: restartScope.size,
  })
}

packages/db/migrations/0109_solid_shiva.sql (new file, 22 lines)
@@ -0,0 +1,22 @@
CREATE TYPE "public"."workflow_execution_state_status" AS ENUM('success', 'failed', 'paused');--> statement-breakpoint
CREATE TABLE "workflow_execution_states" (
  "id" text PRIMARY KEY NOT NULL,
  "workflow_id" text NOT NULL,
  "trigger_block_id" text NOT NULL,
  "execution_id" text NOT NULL,
  "run_version" text,
  "serialized_state" jsonb NOT NULL,
  "resolved_inputs" jsonb DEFAULT '{}'::jsonb NOT NULL,
  "resolved_outputs" jsonb DEFAULT '{}'::jsonb NOT NULL,
  "status" "workflow_execution_state_status" DEFAULT 'success' NOT NULL,
  "attempt_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "workflow_execution_states" ADD CONSTRAINT "workflow_execution_states_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_execution_states" ADD CONSTRAINT "workflow_execution_states_trigger_block_id_workflow_blocks_id_fk" FOREIGN KEY ("trigger_block_id") REFERENCES "public"."workflow_blocks"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE UNIQUE INDEX "workflow_execution_states_workflow_trigger_unique" ON "workflow_execution_states" USING btree ("workflow_id","trigger_block_id");--> statement-breakpoint
CREATE INDEX "workflow_execution_states_workflow_idx" ON "workflow_execution_states" USING btree ("workflow_id");--> statement-breakpoint
CREATE INDEX "workflow_execution_states_trigger_idx" ON "workflow_execution_states" USING btree ("trigger_block_id");--> statement-breakpoint
CREATE INDEX "workflow_execution_states_execution_idx" ON "workflow_execution_states" USING btree ("execution_id");--> statement-breakpoint
CREATE INDEX "workflow_execution_states_status_idx" ON "workflow_execution_states" USING btree ("status");--> statement-breakpoint
CREATE INDEX "workflow_execution_states_attempt_at_idx" ON "workflow_execution_states" USING btree ("attempt_at");

packages/db/migrations/0110_stale_impossible_man.sql (new file, 1 line)
@@ -0,0 +1 @@
ALTER TABLE "workflow_execution_states" ADD COLUMN "serialized_workflow" jsonb;

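Hedged sketch (not part of the diff): the unique index on (workflow_id, trigger_block_id) implies one stored snapshot per trigger, so persisting a run naturally looks like a Drizzle upsert against the workflowExecutionStates table defined further down. The import paths, db handle, and saveExecutionState helper are illustrative assumptions, not code from this branch.

import { db } from '@/db'
import { workflowExecutionStates } from '@/db/schema'

async function saveExecutionState(params: {
  workflowId: string
  triggerBlockId: string
  executionId: string
  serializedState: unknown
  serializedWorkflow?: unknown
}) {
  // One row per (workflow, trigger block): insert on first run, overwrite on re-runs.
  await db
    .insert(workflowExecutionStates)
    .values({
      id: crypto.randomUUID(),
      workflowId: params.workflowId,
      triggerBlockId: params.triggerBlockId,
      executionId: params.executionId,
      serializedState: params.serializedState,
      serializedWorkflow: params.serializedWorkflow ?? null,
      status: 'success',
    })
    .onConflictDoUpdate({
      target: [workflowExecutionStates.workflowId, workflowExecutionStates.triggerBlockId],
      set: {
        executionId: params.executionId,
        serializedState: params.serializedState,
        serializedWorkflow: params.serializedWorkflow ?? null,
        status: 'success',
        attemptAt: new Date(),
      },
    })
}
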
packages/db/migrations/meta/0109_snapshot.json (new file, 8230 lines)
File diff suppressed because it is too large
packages/db/migrations/meta/0110_snapshot.json (new file, 8236 lines)
File diff suppressed because it is too large

@@ -757,6 +757,20 @@
      "when": 1762572820066,
      "tag": "0108_cuddly_scream",
      "breakpoints": true
    },
    {
      "idx": 109,
      "version": "7",
      "when": 1762826396153,
      "tag": "0109_solid_shiva",
      "breakpoints": true
    },
    {
      "idx": 110,
      "version": "7",
      "when": 1762832732250,
      "tag": "0110_stale_impossible_man",
      "breakpoints": true
    }
  ]
}

@@ -318,6 +318,44 @@ export const workflowExecutionLogs = pgTable(
  })
)

export const workflowExecutionStateStatusEnum = pgEnum('workflow_execution_state_status', [
  'success',
  'failed',
  'paused',
])

export const workflowExecutionStates = pgTable(
  'workflow_execution_states',
  {
    id: text('id').primaryKey(),
    workflowId: text('workflow_id')
      .notNull()
      .references(() => workflow.id, { onDelete: 'cascade' }),
    triggerBlockId: text('trigger_block_id')
      .notNull()
      .references(() => workflowBlocks.id, { onDelete: 'cascade' }),
    executionId: text('execution_id').notNull(),
    runVersion: text('run_version'),
    serializedState: jsonb('serialized_state').notNull(),
    serializedWorkflow: jsonb('serialized_workflow'),
    resolvedInputs: jsonb('resolved_inputs').notNull().default(sql`'{}'::jsonb`),
    resolvedOutputs: jsonb('resolved_outputs').notNull().default(sql`'{}'::jsonb`),
    status: workflowExecutionStateStatusEnum('status').notNull().default('success'),
    attemptAt: timestamp('attempt_at').notNull().defaultNow(),
  },
  (table) => ({
    workflowTriggerUnique: uniqueIndex('workflow_execution_states_workflow_trigger_unique').on(
      table.workflowId,
      table.triggerBlockId
    ),
    workflowIdx: index('workflow_execution_states_workflow_idx').on(table.workflowId),
    triggerIdx: index('workflow_execution_states_trigger_idx').on(table.triggerBlockId),
    executionIdx: index('workflow_execution_states_execution_idx').on(table.executionId),
    statusIdx: index('workflow_execution_states_status_idx').on(table.status),
    attemptAtIdx: index('workflow_execution_states_attempt_at_idx').on(table.attemptAt),
  })
)

export const pausedExecutions = pgTable(
  'paused_executions',
  {