Compare commits

...

2 Commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| Siddharth Ganesan | 6d46b44e51 | Stash | 2025-11-10 19:46:27 -08:00 |
| Siddharth Ganesan | ff99d75055 | Run from block v1 | 2025-11-10 19:09:43 -08:00 |
26 changed files with 17806 additions and 51 deletions

View File

@@ -11,15 +11,23 @@ import {
loadDeployedWorkflowState,
loadWorkflowFromNormalizedTables,
} from '@/lib/workflows/db-helpers'
import type { NormalizedWorkflowData } from '@/lib/workflows/db-helpers'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager'
import { createStreamingResponse } from '@/lib/workflows/streaming'
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import {
type ExecutionMetadata,
type SerializableExecutionState,
ExecutionSnapshot,
} from '@/executor/execution/snapshot'
import type { StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
import type { SubflowType } from '@/stores/workflows/workflow/types'
import { getWorkflowExecutionState } from '@/lib/workflows/execution-state/service'
import { buildRunFromBlockPlan } from '@/lib/workflows/run-from-block/planner'
import { TriggerUtils } from '@/lib/workflows/triggers'
const logger = createLogger('WorkflowExecuteAPI')
@@ -30,6 +38,7 @@ const ExecuteWorkflowSchema = z.object({
useDraftState: z.boolean().optional(),
input: z.any().optional(),
startBlockId: z.string().optional(),
executionMode: z.enum(['run_from_block']).optional(),
})
export const runtime = 'nodejs'
@@ -123,6 +132,7 @@ export async function executeWorkflow(
triggerType,
useDraftState: false,
startTime: new Date().toISOString(),
executionMode: 'full',
}
const snapshot = new ExecutionSnapshot(
@@ -310,6 +320,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
stream: streamParam,
useDraftState,
input: validatedInput,
startBlockId,
executionMode,
} = validation.data
// For API key auth, the entire body is the input (except for our control fields)
@@ -322,16 +334,37 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
})()
: validatedInput
const shouldUseDraftState = useDraftState ?? auth.authType === 'session'
let shouldUseDraftState = useDraftState ?? auth.authType === 'session'
const isRunFromBlock = executionMode === 'run_from_block'
if (isRunFromBlock && auth.authType !== 'session') {
return NextResponse.json(
{ error: 'Run from block is only available within the client editor' },
{ status: 403 }
)
}
if (isRunFromBlock) {
if (!startBlockId) {
return NextResponse.json(
{ error: 'Run from block requires a block identifier.' },
{ status: 400 }
)
}
shouldUseDraftState = true
}
const streamHeader = req.headers.get('X-Stream-Response') === 'true'
const enableSSE = streamHeader || streamParam === true
const effectiveTriggerType = isRunFromBlock ? 'manual' : triggerType
logger.info(`[${requestId}] Starting server-side execution`, {
workflowId,
userId,
hasInput: !!input,
triggerType,
triggerType: effectiveTriggerType,
authType: auth.authType,
streamParam,
streamHeader,
@@ -342,13 +375,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
type LoggingTriggerType = 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
let loggingTriggerType: LoggingTriggerType = 'manual'
if (
triggerType === 'api' ||
triggerType === 'chat' ||
triggerType === 'webhook' ||
triggerType === 'schedule' ||
triggerType === 'manual'
effectiveTriggerType === 'api' ||
effectiveTriggerType === 'chat' ||
effectiveTriggerType === 'webhook' ||
effectiveTriggerType === 'schedule' ||
effectiveTriggerType === 'manual'
) {
loggingTriggerType = triggerType as LoggingTriggerType
loggingTriggerType = effectiveTriggerType as LoggingTriggerType
}
const loggingSession = new LoggingSession(
workflowId,
@@ -366,7 +399,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
triggerType,
triggerType: effectiveTriggerType,
})
await loggingSession.safeStart({
@@ -395,8 +428,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
// Process file fields in workflow input (base64/URL to UserFile conversion)
let processedInput = input
let workflowData: NormalizedWorkflowData | null = null
try {
const workflowData = shouldUseDraftState
workflowData = shouldUseDraftState
? await loadWorkflowFromNormalizedTables(workflowId)
: await loadDeployedWorkflowState(workflowId)
@@ -448,6 +482,83 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
)
}
let runFromBlockPlan:
| {
snapshotState: SerializableExecutionState
resumePendingQueue: string[]
triggerBlockId: string
}
| null = null
if (isRunFromBlock) {
if (!workflowData) {
return NextResponse.json(
{ error: 'Unable to load workflow state for run from block execution' },
{ status: 400 }
)
}
const startCandidate = TriggerUtils.findStartBlock(workflowData.blocks, 'manual', false)
if (!startCandidate) {
return NextResponse.json(
{ error: 'No manual trigger block found for this workflow' },
{ status: 400 }
)
}
const latestState = await getWorkflowExecutionState(workflowId, startCandidate.blockId)
if (!latestState) {
return NextResponse.json(
{
error:
'No prior execution snapshot found. Run the workflow once before using run from block.',
},
{ status: 400 }
)
}
const serializedWorkflow = new Serializer().serializeWorkflow(
workflowData.blocks,
workflowData.edges,
workflowData.loops,
workflowData.parallels,
true
)
try {
const trimmedStartBlockId = startBlockId!.trim()
const triggerBlockIdForPlan = latestState.triggerBlockId || startCandidate.blockId
const plan = buildRunFromBlockPlan({
serializedWorkflow,
previousState: latestState.serializedState,
previousResolvedInputs: latestState.resolvedInputs,
previousResolvedOutputs: latestState.resolvedOutputs,
previousWorkflow: latestState.serializedWorkflow,
startBlockId: trimmedStartBlockId,
triggerBlockId: triggerBlockIdForPlan,
})
runFromBlockPlan = {
snapshotState: plan.snapshotState,
resumePendingQueue: plan.resumePendingQueue,
triggerBlockId: triggerBlockIdForPlan,
}
} catch (planError) {
logger.error(`[${requestId}] Failed to build run-from-block plan`, {
error: planError,
})
return NextResponse.json(
{
error:
planError instanceof Error ? planError.message : 'Unable to build run from block plan',
},
{ status: 400 }
)
}
}
if (!enableSSE) {
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
try {
@@ -457,9 +568,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowId,
workspaceId: workflow.workspaceId,
userId,
triggerType,
triggerType: effectiveTriggerType,
triggerBlockId: runFromBlockPlan?.triggerBlockId,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
resumeFromSnapshot: isRunFromBlock,
executionMode: isRunFromBlock ? 'run_from_block' : 'full',
pendingBlocks: runFromBlockPlan?.resumePendingQueue,
}
const snapshot = new ExecutionSnapshot(
@@ -468,7 +583,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
processedInput,
{},
workflow.variables || {},
selectedOutputs
selectedOutputs,
runFromBlockPlan?.snapshotState
)
const result = await executeWorkflowCore({
@@ -528,7 +644,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
streamConfig: {
selectedOutputs: resolvedSelectedOutputs,
isSecureMode: false,
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
workflowTriggerType: effectiveTriggerType === 'chat' ? 'chat' : 'api',
},
createFilteredResult,
executionId,
@@ -711,9 +827,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowId,
workspaceId: workflow.workspaceId,
userId,
triggerType,
triggerType: effectiveTriggerType,
triggerBlockId: runFromBlockPlan?.triggerBlockId,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
resumeFromSnapshot: isRunFromBlock,
executionMode: isRunFromBlock ? 'run_from_block' : 'full',
pendingBlocks: runFromBlockPlan?.resumePendingQueue,
}
const snapshot = new ExecutionSnapshot(
@@ -722,7 +842,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
processedInput,
{},
workflow.variables || {},
selectedOutputs
selectedOutputs,
runFromBlockPlan?.snapshotState
)
const result = await executeWorkflowCore({

View File

@@ -1,5 +1,5 @@
import { memo, useCallback } from 'react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react'
import { Button, Duplicate, Tooltip, Trash2 } from '@/components/emcn'
import { cn } from '@/lib/utils'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
@@ -16,6 +16,10 @@ interface ActionBarProps {
blockType: string
/** Whether the action bar is disabled */
disabled?: boolean
/** Whether an execution is currently in progress */
isExecuting?: boolean
/** Handler to run the workflow starting from this block */
onRunFromBlock?: (blockId: string) => Promise<any> | void
}
/**
@@ -25,7 +29,13 @@ interface ActionBarProps {
* @component
*/
export const ActionBar = memo(
function ActionBar({ blockId, blockType, disabled = false }: ActionBarProps) {
function ActionBar({
blockId,
blockType,
disabled = false,
isExecuting = false,
onRunFromBlock,
}: ActionBarProps) {
const {
collaborativeRemoveBlock,
collaborativeToggleBlockEnabled,
@@ -69,6 +79,8 @@ export const ActionBar = memo(
return defaultMessage
}
const canRunFromBlock = !disabled && !isExecuting && Boolean(onRunFromBlock)
return (
<div
className={cn(
@@ -78,6 +90,26 @@ export const ActionBar = memo(
'gap-[6px] rounded-[10px] bg-[#242424] p-[6px]'
)}
>
{onRunFromBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={() => {
if (canRunFromBlock) {
onRunFromBlock?.(blockId)
}
}}
className='h-[30px] w-[30px] rounded-[8px] bg-[#363636] p-0 text-[#868686] hover:bg-[#33B4FF] hover:text-[#1B1B1B] dark:text-[#868686] dark:hover:bg-[#33B4FF] dark:hover:text-[#1B1B1B]'
disabled={!canRunFromBlock}
>
<Play className='h-[14px] w-[14px]' />
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='right'>{getTooltipMessage('Run From Here')}</Tooltip.Content>
</Tooltip.Root>
)}
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
@@ -204,7 +236,9 @@ export const ActionBar = memo(
return (
prevProps.blockId === nextProps.blockId &&
prevProps.blockType === nextProps.blockType &&
prevProps.disabled === nextProps.disabled
prevProps.disabled === nextProps.disabled &&
prevProps.isExecuting === nextProps.isExecuting &&
prevProps.onRunFromBlock === nextProps.onRunFromBlock
)
}
)

View File

@@ -24,6 +24,7 @@ import {
} from './hooks'
import type { WorkflowBlockProps } from './types'
import { debounce, getProviderName, shouldSkipBlockRender } from './utils'
import { useWorkflowExecution } from '../../hooks/use-workflow-execution'
const logger = createLogger('WorkflowBlock')
@@ -195,6 +196,7 @@ export const WorkflowBlock = memo(function WorkflowBlock({
currentWorkflow,
data
)
const { handleRunFromBlock, isExecuting } = useWorkflowExecution()
const { horizontalHandles, blockHeight, blockWidth, displayAdvancedMode, displayTriggerMode } =
useBlockProperties(
@@ -600,7 +602,13 @@ export const WorkflowBlock = memo(function WorkflowBlock({
</div>
)}
<ActionBar blockId={id} blockType={type} disabled={!userPermissions.canEdit} />
<ActionBar
blockId={id}
blockType={type}
disabled={!userPermissions.canEdit}
isExecuting={isExecuting}
onRunFromBlock={handleRunFromBlock}
/>
{shouldShowDefaultHandles && (
<Connections blockId={id} horizontalHandles={horizontalHandles} />

View File

@@ -643,7 +643,11 @@ export function useWorkflowExecution() {
onStream?: (se: StreamingExecution) => Promise<void>,
executionId?: string,
onBlockComplete?: (blockId: string, output: any) => Promise<void>,
overrideTriggerType?: 'chat' | 'manual' | 'api'
overrideTriggerType?: 'chat' | 'manual' | 'api',
overrides?: {
startBlockId?: string
executionMode?: 'run_from_block'
}
): Promise<ExecutionResult | StreamingExecution> => {
// Use currentWorkflow but check if we're in diff mode
const { blocks: workflowBlocks, edges: workflowEdges } = currentWorkflow
@@ -725,10 +729,19 @@ export function useWorkflowExecution() {
}
// Determine start block and workflow input based on execution type
let startBlockId: string | undefined
const overrideStartBlockId = overrides?.startBlockId?.trim()
let startBlockId: string | undefined = overrideStartBlockId
let finalWorkflowInput = workflowInput
if (isExecutingFromChat) {
if (overrideStartBlockId) {
if (!filteredStates[overrideStartBlockId]) {
setIsExecuting(false)
throw new Error('Selected block is not part of this workflow')
}
}
if (!startBlockId && isExecutingFromChat) {
// For chat execution, find the appropriate chat trigger
const startBlock = TriggerUtils.findStartBlock(filteredStates, 'chat')
@@ -737,7 +750,7 @@ export function useWorkflowExecution() {
}
startBlockId = startBlock.blockId
} else {
} else if (!startBlockId) {
// Manual execution: detect and group triggers by paths
const candidates = resolveStartCandidates(filteredStates, {
execution: 'manual',
@@ -838,6 +851,7 @@ export function useWorkflowExecution() {
selectedOutputs,
triggerType: overrideTriggerType || 'manual',
useDraftState: true,
executionMode: overrides?.executionMode,
callbacks: {
onExecutionStarted: (data) => {
logger.info('Server execution started:', data)
@@ -1208,6 +1222,63 @@ export function useWorkflowExecution() {
handleDebugExecutionError,
])
/**
* Handles running the workflow starting from a specific block
*/
const handleRunFromBlock = useCallback(
async (blockId: string) => {
if (!activeWorkflowId || !blockId?.trim()) {
logger.warn('Run from block requested without active workflow or block id')
return
}
setExecutionResult(null)
setIsExecuting(true)
setIsDebugging(false)
try {
const runResult = await executeWorkflow(
undefined,
undefined,
undefined,
undefined,
'manual',
{
startBlockId: blockId,
executionMode: 'run_from_block',
}
)
if (runResult && 'metadata' in runResult && runResult.metadata?.isDebugSession) {
setDebugContext(runResult.metadata.context || null)
if (runResult.metadata.pendingBlocks) {
setPendingBlocks(runResult.metadata.pendingBlocks)
}
} else if (runResult && 'success' in runResult) {
setExecutionResult(runResult)
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
}
return runResult
} catch (error) {
return handleExecutionError(error, { executionId: undefined })
}
},
[
activeWorkflowId,
executeWorkflow,
handleExecutionError,
setExecutionResult,
setIsExecuting,
setIsDebugging,
setDebugContext,
setPendingBlocks,
setActiveBlocks,
]
)
/**
* Handles cancelling the current debugging session
*/
@@ -1249,6 +1320,7 @@ export function useWorkflowExecution() {
pendingBlocks,
executionResult,
handleRunWorkflow,
handleRunFromBlock,
handleStepDebug,
handleResumeDebug,
handleCancelDebug,

View File

@@ -438,6 +438,7 @@ async function runWorkflowExecution({
triggerBlockId: payload.blockId || undefined,
useDraftState: false,
startTime: new Date().toISOString(),
executionMode: 'full',
}
const snapshot = new ExecutionSnapshot(

View File

@@ -234,6 +234,7 @@ async function executeWebhookJobInternal(
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
executionMode: 'full',
}
const snapshot = new ExecutionSnapshot(
@@ -447,6 +448,7 @@ async function executeWebhookJobInternal(
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
executionMode: 'full',
}
const snapshot = new ExecutionSnapshot(

View File

@@ -103,6 +103,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
triggerType: payload.triggerType || 'api',
useDraftState: false,
startTime: new Date().toISOString(),
executionMode: 'full',
}
const snapshot = new ExecutionSnapshot(

View File

@@ -0,0 +1,181 @@
# Run-From-Block Execution Design
## Background
- Manual debugging and partial reruns currently require executing an entire workflow from the trigger onward.
- We already capture block outputs, router/condition choices, loop scopes, and DAG topology inside `SerializableExecutionState` snapshots.
- The goal is to restart execution at any block while preserving as much prior computation as possible and maintaining true parallel semantics.
## Objectives
- Persist block inputs/outputs, branch decisions, and DAG metadata for every successful execution.
- Given a target block, recompute only the minimal subgraph whose upstream state changed or was explicitly invalidated.
- Hydrate the executor with historical state so unresolved blocks behave exactly as in the full run.
- Preserve orchestrator behaviour for loops, parallels, routers, and pause/resume blocks.
- Provide observability that explains which blocks reran and why.
## Terminology
- **Execution Snapshot**: persisted copy of `SerializableExecutionState` plus workflow metadata for a prior run.
- **Target Block**: the block the user wants to “run from”.
- **Start Set**: the minimal set of blocks that must be enqueued as new starting points.
- **Resolved Block**: a block whose prior output remains valid and is reused without re-executing.
- **Restart Scope**: the set of blocks that must be re-executed (includes the target block and any dependent nodes).
## Persisted State Model
- Create migration (separate task) adding `workflow_execution_states` table storing:
- `execution_id`, `workflow_id`, `trigger_block_id`, `run_version`, `serialized_state`, `resolved_inputs`, `resolved_outputs`, `status`, `attempt_at`.
- Treat `(workflow_id, trigger_block_id)` as the logical key; each trigger maintains its own latest snapshot.
- Extend `BlockExecutor` to persist `resolvedInputs` alongside `blockLogs` so we can diff future inputs accurately.
- Store `dagIncomingEdges` and `remainingEdges` from snapshots to allow edge restoration when pruning.
- Snapshot persistence is limited to executions initiated from the client/manual surface to avoid capturing automation noise.
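
For orientation, the payload stored in `serialized_state` follows the shape produced by `buildSerializableExecutionState`; the sketch below is illustrative and simplifies the exact types.

```ts
// Illustrative shape of the persisted snapshot; field names mirror
// SerializableExecutionState / buildSerializableExecutionState, with
// value types simplified for readability.
interface SerializableExecutionStateSketch {
  blockStates: Record<
    string,
    { inputs?: Record<string, any>; output: any; executed: boolean; executionTime: number }
  >
  executedBlocks: string[]
  blockLogs: any[]
  decisions: {
    router: Record<string, string>
    condition: Record<string, string>
  }
  completedLoops: string[]
  loopExecutions: Record<string, any>
  parallelExecutions: Record<string, any>
  parallelBlockMapping: Record<string, any>
  activeExecutionPath: string[]
  pendingQueue: string[]
  dagIncomingEdges?: Record<string, string[]>
}
```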
## High-Level Run-From-Block Flow
1. **Load Context**
- Fetch latest snapshot (regardless of status) for the requested `(workflow_id, trigger_block_id)` pair, matching deploy/draft version.
- Build fresh DAG from current workflow definition using `DAGBuilder`.
2. **Analyse Changes**
- Compute forward impact from target block.
- Detect upstream changes and build initial Start Set.
- Run backward pruning from sinks to drop unnecessary starts.
3. **Hydrate Executor**
- Reconstruct `ExecutionState` from snapshot.
- Remove state for restart blocks; keep outputs for resolved blocks.
- Clear loop/parallel scopes for affected constructs.
4. **Seed Queue & Execute**
- Set `context.metadata.pendingBlocks` to pruned Start Set.
- Ensure incoming edge sets reflect resolved upstream nodes.
- Invoke `ExecutionEngine.run()`; existing concurrency model provides parallelism.
- After execution completes (success, pause, or failure), persist the resulting state as the new snapshot for this trigger, overwriting the previous attempt.
- Tag `context.metadata.executionMode = 'run_from_block'` so downstream logging, billing, and analytics can classify the run.
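
To make the final step concrete, here is a minimal sketch of the execution metadata seeded for a run-from-block request; field names follow `ExecutionMetadata`, and `plan` stands in for the planner output.

```ts
// Sketch: metadata handed to the executor for a run-from-block request.
// `plan` is assumed to be the planner output (pruned snapshot + pending queue + trigger id).
declare const plan: { triggerBlockId: string; resumePendingQueue: string[] }

const metadata = {
  triggerType: 'manual',
  triggerBlockId: plan.triggerBlockId,
  useDraftState: true,
  startTime: new Date().toISOString(),
  resumeFromSnapshot: true,
  executionMode: 'run_from_block' as const,
  pendingBlocks: plan.resumePendingQueue,
}
```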
## Detecting Blocks to Re-Run
### Step 1: Forward Impact DFS
- Start at the target node ID (branch suffix and sentinel aware).
- Traverse outgoing edges using the current DAG (post-builder).
- Collect every reachable node into `affectedSet`.
- Include sentinel nodes and parallel branch clones (IDs with subscript) to avoid missing orchestrator plumbing.
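
A condensed sketch of this traversal, assuming the DAG node shape used by the builder (only `outgoingEdges` matters here):

```ts
// Sketch: collect every node reachable from the seed set by following
// outgoing edges of the built DAG. Seeds are included in the result.
function collectDownstream(
  nodes: Map<string, { outgoingEdges: Map<string, { target: string }> }>,
  seeds: Set<string>
): Set<string> {
  const visited = new Set(seeds)
  const stack = [...seeds]
  while (stack.length > 0) {
    const node = nodes.get(stack.pop()!)
    if (!node) continue
    for (const edge of node.outgoingEdges.values()) {
      if (!visited.has(edge.target)) {
        visited.add(edge.target)
        stack.push(edge.target)
      }
    }
  }
  return visited
}
```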
### Step 2: Upstream Change Detection
- Determine the trigger block by reusing `resolveExecutorStartBlock` logic with current workflow metadata and the snapshot's trigger info.
- Perform DFS from the trigger through current DAG edges until:
- Visiting the target block (stop descending past target).
- Encountering nodes not present in the saved snapshot (mark as changed).
- For each visited node:
- Compare stored block configuration hash vs. current (hash metadata + params + code).
- Resolve current inputs via `VariableResolver` with hydrated state; compare with stored `resolvedInputs`.
- Compare stored outputs if the block ID exists in snapshot; missing outputs imply change.
- If any mismatch or block is the explicit target, add node ID to `startCandidates`.
- Propagate a “changed” flag downstream in this DFS so children get marked if a parent changed.
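
One way to implement the configuration comparison is to hash the execution-relevant parts of each block definition. This is a hedged sketch: the planner in this change compares JSON-serialized `{ type, subBlocks }` directly, and `params` here is an assumed extra field.

```ts
import { createHash } from 'crypto'

// Hedged sketch: hash the parts of a block definition that affect execution.
// A block becomes a start candidate when hashBlockConfig(current) !== hashBlockConfig(previous).
function hashBlockConfig(block: { type: string; subBlocks?: unknown; params?: unknown }): string {
  const canonical = JSON.stringify({
    type: block.type,
    subBlocks: block.subBlocks,
    params: block.params,
  })
  return createHash('sha256').update(canonical).digest('hex')
}
```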
### Step 3: Backward Pruning from Sinks
- Identify sink nodes (no outgoing edges or response blocks).
- Build reverse adjacency map once per run.
- DFS backward from each sink, collecting `ancestorSet`.
- The minimal Start Set is `startCandidates ∩ ancestorSet`.
- Always include target block even if intersection would remove it.
- Remove nodes already in `affectedSet` but whose outputs remain identical and have no path to sinks (defensive guard).
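
The pruning reduces to a set intersection plus a guarantee that the target block survives; a condensed sketch follows (the planner's `deriveStartSet` tracks candidate reasons in a `Map`, simplified to a `Set` here):

```ts
// Sketch of backward pruning: keep only candidates that still have a path to a
// sink, and always retain the target block(s).
function deriveStartSet(
  startCandidates: Set<string>,
  ancestorsOfSinks: Set<string>,
  targetNodeIds: Set<string>
): Set<string> {
  const startSet = new Set<string>()
  for (const candidate of startCandidates) {
    if (ancestorsOfSinks.has(candidate) || targetNodeIds.has(candidate)) {
      startSet.add(candidate)
    }
  }
  for (const target of targetNodeIds) {
    startSet.add(target)
  }
  return startSet
}
```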
### Step 4: Final Restart Scope
- `restartScope = affectedSet ∪ startSet`.
- Any block not in `restartScope` remains resolved and treated as completed.
## Queue Preparation
- Hydrate `ExecutionState` with stored block states.
- For every node in `restartScope`:
- Remove entry from `ExecutionState.blockStates` and `executedBlocks`.
- Delete related router/condition decisions.
- Restore incoming edges using `dagIncomingEdges` snapshot.
- For each resolved upstream node:
- Confirm its outgoing edges are processed via `edgeManager.processOutgoingEdges(node, storedOutput, false)` to drop satisfied dependencies and enqueue children when needed.
- Populate `context.metadata.pendingBlocks` with the Start Set (deduped, sorted for determinism).
- ExecutionEngine's `initializeQueue` path sees `pendingBlocks` and pushes them onto the ready queue.
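
A sketch of the seeding step; the `edgeManager` and `context` shapes are assumptions based on the surrounding design, not the exact executor API.

```ts
// Sketch: mark resolved upstream edges as satisfied, then seed the ready queue.
function seedQueue(
  resolvedUpstreamIds: string[],
  startSet: Set<string>,
  dag: { nodes: Map<string, any> },
  edgeManager: { processOutgoingEdges(node: any, output: any, enqueue: boolean): void },
  blockStates: Map<string, { output: any }>,
  context: { metadata: { pendingBlocks?: string[] } }
): void {
  for (const nodeId of resolvedUpstreamIds) {
    const node = dag.nodes.get(nodeId)
    const storedOutput = blockStates.get(nodeId)?.output
    if (node && storedOutput !== undefined) {
      // Drop satisfied dependencies without enqueueing children directly.
      edgeManager.processOutgoingEdges(node, storedOutput, false)
    }
  }
  // Deduped and sorted for deterministic queue ordering.
  context.metadata.pendingBlocks = Array.from(startSet).sort()
}
```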
## Loop Handling
- Loops are restarted from iteration zero when any block inside the loop body requires re-execution.
- Implementation:
- When diff logic marks a loop body node (or sentinel) as changed, include `loopId` in `loopRestartSet`.
- Before execution:
- Drop entries for `loopId` from `context.loopExecutions`.
- Call `LoopOrchestrator.clearLoopExecutionState(loopId)` and `restoreLoopEdges(loopId)` to reset sentinel edges.
- Enqueue the sentinel start node if it belongs to Start Set; loop orchestrator will rebuild scopes as nodes run.
- Snapshot values for loops (aggregated outputs) are reused only if loop not in restart scope.
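
On the snapshot side, resetting an affected loop amounts to dropping its scope and completion marker, mirroring the pruning the planner applies:

```ts
// Sketch: prune loop state from the snapshot for loops whose body re-runs.
// `snapshotState` fields follow SerializableExecutionState.
function resetLoops(
  snapshotState: {
    loopExecutions?: Record<string, unknown>
    completedLoops?: string[]
  },
  loopIdsToReset: Set<string>
): void {
  for (const loopId of loopIdsToReset) {
    if (snapshotState.loopExecutions) {
      delete snapshotState.loopExecutions[loopId]
    }
  }
  if (snapshotState.completedLoops) {
    snapshotState.completedLoops = snapshotState.completedLoops.filter(
      (loopId) => !loopIdsToReset.has(loopId)
    )
  }
}
```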
## Parallel Handling
- Parallel branches already map to distinct node IDs (`blockId₍index₎`).
- Forward/backward DFS naturally include affected branch nodes.
- When any branch node re-runs:
- Clear saved outputs for that branch node only.
- `ParallelOrchestrator` lazily recreates scope the first time a branch completes (scope removed from `context.parallelExecutions` for impacted `parallelId`).
- Branches outside restart scope retain previous outputs, preserving aggregated results.
## Routers and Conditions
- Decisions stored in `context.decisions.router` / `context.decisions.condition`.
- When a router or condition is in restart scope, remove its decision before queuing so `EdgeManager.shouldActivateEdge` re-evaluates the correct branch.
- For resolved routers/conditions keep decisions to avoid re-activating previously pruned edges.
## Pause/Resume Blocks
- If an approval block lies in restart scope:
- Strip `_pauseMetadata` before execution so we do not treat stored metadata as fresh pause output.
- Re-running will register new pause contexts (or skip if configuration changed).
- If outside restart scope:
- Preserve `_pauseMetadata` in reused outputs but flag them as historical (optional boolean) so the UI can show they are not actionable resumes.
- Resume triggers are allowed only when `context.metadata.resumeFromSnapshot` is true; run-from-block will set `allowResumeTriggers = false` to avoid accidental resume nodes unless explicitly in Start Set.
## Variable Resolution
- Hydrate `VariableResolver` with reconstructed `ExecutionState`; resolved blocks provide outputs to satisfy `<block.field>` references.
- For blocks in restart scope, removal from `ExecutionState` forces resolver to recompute when node executes.
- During diffing:
- Temporarily use hydrated state (before pruning) to resolve current inputs; if resolution fails (missing upstream output), mark block as changed.
- Workflow variables and environment variables come from persisted snapshot metadata to ensure deterministic comparisons.
## Concurrency & Queueing Semantics
- Existing `ExecutionEngine` already executes all ready nodes concurrently; no changes needed for “true parallelism”.
- Optionally expose `maxConcurrency` in `RunFromBlockPlan` for future throttling; default remains unbounded.
- `readyQueue` is seeded exclusively with Start Set; downstream nodes become ready through normal edge processing when dependencies complete.
- Reuse streaming callbacks (`onBlockStart`, `onBlockComplete`, `onStream`) so rerun blocks stream back to the client console in real time; resolved blocks remain silent.
## Error Handling
- Missing snapshot or version mismatch → if no historical state exists, fall back to a full-run plan (Start Set becomes the original trigger).
- Missing target node in current DAG → abort (workflow changed too much).
- If diff fails due to unresolvable reference, treat as changed and re-run from that node.
- Execution failures or aborts overwrite the stored snapshot with the partial state, ensuring subsequent attempts resume from the latest client-visible data.
## Observability
- Add structured logs via `logger.info`:
- Snapshot metadata (execution ID, createdAt, version).
- Forward DFS result size (`affectedCount`).
- Upstream diff summary (block IDs and reasons).
- Loop/parallel scopes cleared.
- Start Set list before queueing.
- Emit metrics counters (future) for number of blocks reused vs. re-executed.
## Storage & Migration Plan
- Migration (later task) creates new table; store raw JSON of execution state and run version.
- Repository layer exposes:
- `saveExecutionState(executionResult)` after each client-initiated run, regardless of outcome.
- `loadLatestExecutionState(workflowId, runVersion)` when run-from-block invoked.
- Consider TTL or pruning policy to limit history growth.
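
A usage sketch of this repository layer, assuming the `upsertWorkflowExecutionState` / `getWorkflowExecutionState` signatures from `@/lib/workflows/execution-state/service` (placeholder values, error handling omitted):

```ts
import {
  getWorkflowExecutionState,
  upsertWorkflowExecutionState,
} from '@/lib/workflows/execution-state/service'
import type { SerializableExecutionState } from '@/executor/execution/snapshot'

// Usage sketch: read the latest snapshot for a trigger, then overwrite it
// after the next client-initiated run. Argument values are placeholders.
async function persistRunSnapshot(
  workflowId: string,
  triggerBlockId: string,
  executionId: string,
  serializedState: SerializableExecutionState
) {
  const previous = await getWorkflowExecutionState(workflowId, triggerBlockId)
  // ...plan and execute using `previous?.serializedState`...
  await upsertWorkflowExecutionState({
    workflowId,
    triggerBlockId,
    executionId,
    serializedState,
    resolvedInputs: {},
    resolvedOutputs: {},
    status: 'success',
  })
}
```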
## Testing Strategy
- Unit tests:
- Forward/backward DFS utilities with loops/parallels.
- Diff logic for input/output/config changes.
- Loop restart logic resets scopes.
- Pause block behaviour (metadata stripping vs. reuse).
- Integration tests:
- Full workflow with branches, loops, approvals run once, mutate input, run-from-block, assert only necessary blocks rerun.
- Scenario where upstream config change triggers deeper downstream re-execution.
- Regression tests ensuring original full-run path unchanged.
## Open Questions
- Should we expose UI controls to override Start Set suggestions?
- How do we surface “historical” pause blocks to users?
- Should we snapshot tool execution side-effects (e.g., notifications) or always rerun them?
## Future Enhancements
- Cache hashed block configurations to speed change detection.
- Allow selecting older execution snapshots.
- Layer on speculative parallelism controls (per-branch throttling).
- Add dry-run mode that reports Start Set without executing.
- Extend the client block toolbar so hovering a block reveals the “Run from block” action alongside existing controls, invoking the documented flow with proper logging and billing hooks.

View File

@@ -87,6 +87,13 @@ export class BlockExecutor {
}
cleanupSelfReference?.()
if (!isSentinel) {
this.state.setBlockInputs(node.id, resolvedInputs)
if (blockLog) {
blockLog.input = resolvedInputs
}
}
try {
const output = handler.executeWithNode
? await handler.executeWithNode(ctx, block, resolvedInputs, nodeMetadata)

View File

@@ -19,6 +19,7 @@ const logger = createLogger('ExecutionEngine')
export class ExecutionEngine {
private readyQueue: string[] = []
private executing = new Set<Promise<void>>()
private executingNodes = new Set<string>()
private queueLock = Promise.resolve()
private finalOutput: NormalizedBlockOutput = {}
private pausedBlocks: Map<string, PauseMetadata> = new Map()

View File

@@ -1,17 +1,20 @@
import { createLogger } from '@/lib/logs/console/logger'
import { StartBlockPath } from '@/lib/workflows/triggers'
import type { BlockOutput } from '@/blocks/types'
import type { DAG } from '@/executor/dag/builder'
import { DAGBuilder } from '@/executor/dag/builder'
import { BlockExecutor } from '@/executor/execution/block-executor'
import { EdgeManager } from '@/executor/execution/edge-manager'
import { ExecutionEngine } from '@/executor/execution/engine'
import { ExecutionState } from '@/executor/execution/state'
import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
import { buildSerializableExecutionState } from '@/executor/execution/snapshot-serializer'
import { createBlockHandlers } from '@/executor/handlers/registry'
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
import type { SerializableExecutionState } from '@/executor/execution/snapshot'
import {
buildResolutionFromBlock,
buildStartBlockOutput,
@@ -39,6 +42,8 @@ export class DAGExecutor {
private contextExtensions: ContextExtensions
private isCancelled = false
private dagBuilder: DAGBuilder
private lastContext?: ExecutionContext
private lastDag?: DAG
constructor(options: DAGExecutorOptions) {
this.workflow = options.workflow
@@ -53,6 +58,9 @@ export class DAGExecutor {
const savedIncomingEdges = this.contextExtensions.dagIncomingEdges
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
context.metadata.triggerBlockId = triggerBlockId
this.lastContext = context
this.lastDag = dag
// Link cancellation flag to context
Object.defineProperty(context, 'isCancelled', {
@@ -78,6 +86,11 @@ export class DAGExecutor {
return await engine.run(triggerBlockId)
}
getSerializableState(pendingQueue: string[] = []): SerializableExecutionState | undefined {
if (!this.lastContext) return undefined
return buildSerializableExecutionState(this.lastContext, this.lastDag, pendingQueue)
}
cancel(): void {
this.isCancelled = true
}

View File

@@ -55,6 +55,38 @@ function serializeParallelExecutions(
return result
}
export function buildSerializableExecutionState(
context: ExecutionContext,
dag?: DAG,
pendingQueue: string[] = []
): SerializableExecutionState {
const dagIncomingEdges: Record<string, string[]> | undefined = dag
? Object.fromEntries(
Array.from(dag.nodes.entries()).map(([nodeId, node]) => [
nodeId,
Array.from(node.incomingEdges),
])
)
: undefined
return {
blockStates: Object.fromEntries(context.blockStates),
executedBlocks: Array.from(context.executedBlocks),
blockLogs: context.blockLogs,
decisions: {
router: Object.fromEntries(context.decisions.router),
condition: Object.fromEntries(context.decisions.condition),
},
completedLoops: Array.from(context.completedLoops),
loopExecutions: serializeLoopExecutions(context.loopExecutions),
parallelExecutions: serializeParallelExecutions(context.parallelExecutions),
parallelBlockMapping: mapFromEntries(context.parallelBlockMapping),
activeExecutionPath: Array.from(context.activeExecutionPath),
pendingQueue,
dagIncomingEdges,
}
}
export function serializePauseSnapshot(
context: ExecutionContext,
triggerBlockIds: string[],
@@ -70,31 +102,7 @@ export function serializePauseSnapshot(
useDraftState = true
}
const dagIncomingEdges: Record<string, string[]> | undefined = dag
? Object.fromEntries(
Array.from(dag.nodes.entries()).map(([nodeId, node]) => [
nodeId,
Array.from(node.incomingEdges),
])
)
: undefined
const state: SerializableExecutionState = {
blockStates: Object.fromEntries(context.blockStates),
executedBlocks: Array.from(context.executedBlocks),
blockLogs: context.blockLogs,
decisions: {
router: Object.fromEntries(context.decisions.router),
condition: Object.fromEntries(context.decisions.condition),
},
completedLoops: Array.from(context.completedLoops),
loopExecutions: serializeLoopExecutions(context.loopExecutions),
parallelExecutions: serializeParallelExecutions(context.parallelExecutions),
parallelBlockMapping: mapFromEntries(context.parallelBlockMapping),
activeExecutionPath: Array.from(context.activeExecutionPath),
pendingQueue: triggerBlockIds,
dagIncomingEdges,
}
const state = buildSerializableExecutionState(context, dag, triggerBlockIds)
const executionMetadata = {
requestId:

View File

@@ -13,6 +13,7 @@ export interface ExecutionMetadata {
startTime: string
pendingBlocks?: string[]
resumeFromSnapshot?: boolean
executionMode?: 'full' | 'run_from_block' | 'resume'
}
export interface ExecutionCallbacks {

View File

@@ -71,10 +71,31 @@ export class ExecutionState implements BlockStateController {
}
setBlockOutput(blockId: string, output: NormalizedBlockOutput, executionTime = 0): void {
this.blockStates.set(blockId, { output, executed: true, executionTime })
const existing = this.blockStates.get(blockId)
const inputs = existing?.inputs
this.blockStates.set(blockId, {
inputs,
output,
executed: true,
executionTime,
})
this.executedBlocks.add(blockId)
}
setBlockInputs(blockId: string, inputs: Record<string, any>): void {
const existing = this.blockStates.get(blockId)
if (existing) {
this.blockStates.set(blockId, { ...existing, inputs })
} else {
this.blockStates.set(blockId, {
inputs,
output: {},
executed: false,
executionTime: 0,
})
}
}
setBlockState(blockId: string, state: BlockState): void {
this.blockStates.set(blockId, state)
if (state.executed) {

View File

@@ -54,6 +54,7 @@ export interface BlockStateReader {
}
export interface BlockStateWriter {
setBlockInputs(blockId: string, inputs: Record<string, any>): void
setBlockOutput(blockId: string, output: NormalizedBlockOutput, executionTime?: number): void
setBlockState(blockId: string, state: BlockState): void
deleteBlockState(blockId: string): void

View File

@@ -136,9 +136,11 @@ export interface ExecutionMetadata {
triggerBlockId?: string
useDraftState?: boolean
resumeFromSnapshot?: boolean
executionMode?: 'full' | 'run_from_block' | 'resume'
}
export interface BlockState {
inputs?: Record<string, any>
output: NormalizedBlockOutput
executed: boolean
executionTime: number

View File

@@ -61,6 +61,7 @@ export interface ExecuteStreamOptions {
startBlockId?: string
triggerType?: string
useDraftState?: boolean
executionMode?: 'run_from_block'
callbacks?: ExecutionStreamCallbacks
}

View File

@@ -0,0 +1,139 @@
import { db } from '@sim/db'
import { workflowExecutionStates } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import type { SerializableExecutionState } from '@/executor/execution/snapshot'
import type { SerializedWorkflow } from '@/serializer/types'
const logger = createLogger('WorkflowExecutionStateService')
export type WorkflowExecutionStateStatus = 'success' | 'failed' | 'paused'
export interface WorkflowExecutionStateRecord {
id: string
workflowId: string
triggerBlockId: string
executionId: string
runVersion: string | null
serializedState: SerializableExecutionState
serializedWorkflow?: SerializedWorkflow
resolvedInputs: Record<string, any>
resolvedOutputs: Record<string, any>
status: WorkflowExecutionStateStatus
attemptAt: string
}
export interface UpsertWorkflowExecutionStateParams {
workflowId: string
triggerBlockId: string
executionId: string
runVersion?: string | null
serializedState: SerializableExecutionState
serializedWorkflow?: SerializedWorkflow
resolvedInputs: Record<string, any>
resolvedOutputs: Record<string, any>
status: WorkflowExecutionStateStatus
attemptAt?: Date
}
export async function upsertWorkflowExecutionState(
params: UpsertWorkflowExecutionStateParams
): Promise<WorkflowExecutionStateRecord> {
const {
workflowId,
triggerBlockId,
executionId,
runVersion = null,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
} = params
const attemptAt = params.attemptAt ?? new Date()
const insertValues = {
id: uuidv4(),
workflowId,
triggerBlockId,
executionId,
runVersion,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
attemptAt,
}
const [row] = await db
.insert(workflowExecutionStates)
.values(insertValues)
.onConflictDoUpdate({
target: [workflowExecutionStates.workflowId, workflowExecutionStates.triggerBlockId],
set: {
executionId,
runVersion,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
attemptAt,
},
})
.returning()
if (!row) {
throw new Error('Failed to upsert workflow execution state')
}
logger.info('Persisted workflow execution state', {
workflowId,
triggerBlockId,
executionId,
status,
})
return mapRow(row)
}
export async function getWorkflowExecutionState(
workflowId: string,
triggerBlockId: string
): Promise<WorkflowExecutionStateRecord | null> {
const [row] = await db
.select()
.from(workflowExecutionStates)
.where(
and(
eq(workflowExecutionStates.workflowId, workflowId),
eq(workflowExecutionStates.triggerBlockId, triggerBlockId)
)
)
.limit(1)
if (!row) {
return null
}
return mapRow(row)
}
function mapRow(row: typeof workflowExecutionStates.$inferSelect): WorkflowExecutionStateRecord {
return {
id: row.id,
workflowId: row.workflowId,
triggerBlockId: row.triggerBlockId,
executionId: row.executionId,
runVersion: row.runVersion,
serializedState: row.serializedState as SerializableExecutionState,
serializedWorkflow: row.serializedWorkflow as SerializedWorkflow | undefined,
resolvedInputs: row.resolvedInputs as Record<string, any>,
resolvedOutputs: row.resolvedOutputs as Record<string, any>,
status: row.status as WorkflowExecutionStateStatus,
attemptAt: row.attemptAt.toISOString(),
}
}

View File

@@ -21,6 +21,10 @@ import type { ExecutionCallbacks, ExecutionSnapshot } from '@/executor/execution
import type { ExecutionResult } from '@/executor/types'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import {
upsertWorkflowExecutionState,
type WorkflowExecutionStateStatus,
} from '@/lib/workflows/execution-state/service'
const logger = createLogger('ExecutionCore')
@@ -335,6 +339,73 @@ export async function executeWorkflowCore(
resolvedTriggerBlockId
)) as ExecutionResult
const executionMode =
snapshot.metadata.executionMode ?? (metadata.resumeFromSnapshot ? 'resume' : 'full')
snapshot.metadata.executionMode = executionMode
const storageTriggerBlockId = metadata.triggerBlockId ?? resolvedTriggerBlockId
metadata.triggerBlockId = storageTriggerBlockId
if (result.metadata) {
result.metadata.executionMode = result.metadata.executionMode ?? executionMode
if (storageTriggerBlockId) {
result.metadata.triggerBlockId =
result.metadata.triggerBlockId ?? storageTriggerBlockId
}
}
if (snapshot.metadata.useDraftState && storageTriggerBlockId) {
try {
const pendingQueue =
result.metadata?.pendingBlocks && result.metadata.pendingBlocks.length > 0
? result.metadata.pendingBlocks
: []
const serializedState = executorInstance.getSerializableState(pendingQueue)
if (serializedState) {
const resolvedInputs: Record<string, any> = {}
const resolvedOutputs: Record<string, any> = {}
for (const [blockId, state] of Object.entries(serializedState.blockStates)) {
resolvedInputs[blockId] = state.inputs ?? {}
resolvedOutputs[blockId] = state.output ?? {}
}
let status: WorkflowExecutionStateStatus = 'success'
if (result.status === 'paused') {
status = 'paused'
} else if (!result.success) {
status = 'failed'
}
const runVersion =
typeof (workflow as any)?.updatedAt === 'string'
? ((workflow as any).updatedAt as string)
: null
await upsertWorkflowExecutionState({
workflowId,
triggerBlockId: storageTriggerBlockId,
executionId: metadata.executionId ?? metadata.requestId ?? '',
runVersion,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
})
} else {
logger.warn(`[${requestId}] Unable to capture execution state for persistence`, {
workflowId,
triggerBlockId: storageTriggerBlockId,
})
}
} catch (stateError) {
logger.error(`[${requestId}] Failed to persist workflow execution state`, {
workflowId,
triggerBlockId: storageTriggerBlockId,
error: stateError,
})
}
}
// Build trace spans for logging from the full execution result
const { traceSpans, totalDuration } = buildTraceSpans(result)

View File

@@ -0,0 +1,529 @@
import { DAGBuilder } from '@/executor/dag/builder'
import type { DAGNode } from '@/executor/dag/builder'
import type { SerializableExecutionState } from '@/executor/execution/snapshot'
import type { SerializedWorkflow } from '@/serializer/types'
import { createLogger } from '@/lib/logs/console/logger'
export interface RunFromBlockPlan {
snapshotState: SerializableExecutionState
resumePendingQueue: string[]
}
export interface BuildRunFromBlockPlanParams {
serializedWorkflow: SerializedWorkflow
previousState: SerializableExecutionState
previousResolvedInputs?: Record<string, any>
previousResolvedOutputs?: Record<string, any>
previousWorkflow?: SerializedWorkflow
startBlockId: string
triggerBlockId: string
}
const logger = createLogger('RunFromBlockPlanner')
/**
* Builds an execution plan for running a workflow starting from a specific block.
*
* Performs forward impact detection, upstream change analysis, backward pruning,
* and snapshot pruning so that only the minimally required nodes are re-executed.
*/
export function buildRunFromBlockPlan(params: BuildRunFromBlockPlanParams): RunFromBlockPlan {
const {
serializedWorkflow,
previousState,
previousWorkflow,
startBlockId,
triggerBlockId,
previousResolvedInputs,
previousResolvedOutputs,
} = params
const dagBuilder = new DAGBuilder()
const dag = dagBuilder.build(serializedWorkflow)
const originalToNodeIds = mapOriginalIdsToNodes(dag.nodes)
let startNodeIds = originalToNodeIds.get(startBlockId)
if (!startNodeIds || startNodeIds.length === 0) {
if (dag.nodes.has(startBlockId)) {
startNodeIds = [startBlockId]
} else {
throw new Error(`Start block ${startBlockId} not found in workflow DAG`)
}
}
const triggerNodeIds = determineTriggerNodeIds(triggerBlockId, originalToNodeIds, dag.nodes)
const stopNodeIds = new Set(startNodeIds)
const forwardImpact = collectDownstreamNodes(dag.nodes, new Set(startNodeIds))
const upstreamAnalysis = analyzeUpstreamDifferences({
dag,
triggerNodeIds,
stopNodeIds,
previousState,
previousResolvedInputs,
previousResolvedOutputs,
currentWorkflow: serializedWorkflow,
previousWorkflow,
})
const sinkNodes = identifySinkNodes(dag.nodes)
const ancestorSet = collectAncestors(dag.nodes, sinkNodes)
const prunedStartCandidates = deriveStartSet({
upstreamCandidates: upstreamAnalysis.startCandidates,
ancestorSet,
stopNodeIds,
})
if (prunedStartCandidates.size === 0) {
for (const nodeId of startNodeIds) {
prunedStartCandidates.add(nodeId)
}
}
const restartSeeds = new Set<string>([...prunedStartCandidates, ...stopNodeIds])
const restartScope = collectDownstreamNodes(dag.nodes, restartSeeds)
const loopIdsToReset = new Set<string>()
const parallelIdsToReset = new Set<string>()
const originalIdsInScope = new Set<string>()
// Queue all pruned upstream changes - executor handles dependency resolution for downstream
const queueStartSet = new Set(prunedStartCandidates)
for (const nodeId of restartScope) {
const node = dag.nodes.get(nodeId)
if (!node) continue
if (node.metadata?.loopId) {
loopIdsToReset.add(node.metadata.loopId)
}
if (node.metadata?.parallelId) {
parallelIdsToReset.add(node.metadata.parallelId)
}
if (node.metadata?.originalBlockId) {
originalIdsInScope.add(node.metadata.originalBlockId)
}
}
const snapshotState = cloneSerializableState(previousState)
// Reset block states and execution markers for nodes in restart scope
for (const nodeId of restartScope) {
delete snapshotState.blockStates[nodeId]
if (snapshotState.parallelBlockMapping) {
delete snapshotState.parallelBlockMapping[nodeId]
}
}
for (const originalId of originalIdsInScope) {
delete snapshotState.blockStates[originalId]
}
snapshotState.executedBlocks = (snapshotState.executedBlocks || []).filter(
(executedId) => !restartScope.has(executedId) && !originalIdsInScope.has(executedId)
)
snapshotState.blockLogs = (snapshotState.blockLogs || []).filter(
(log) => !restartScope.has(log.blockId) && !originalIdsInScope.has(log.blockId)
)
if (snapshotState.decisions) {
snapshotState.decisions.router = filterDecisionMap(
snapshotState.decisions.router,
restartScope,
originalIdsInScope
)
snapshotState.decisions.condition = filterDecisionMap(
snapshotState.decisions.condition,
restartScope,
originalIdsInScope
)
}
if (snapshotState.parallelExecutions) {
for (const parallelId of parallelIdsToReset) {
delete snapshotState.parallelExecutions[parallelId]
}
}
if (snapshotState.loopExecutions) {
for (const loopId of loopIdsToReset) {
delete snapshotState.loopExecutions[loopId]
}
}
if (snapshotState.completedLoops) {
snapshotState.completedLoops = snapshotState.completedLoops.filter(
(loopId) => !loopIdsToReset.has(loopId)
)
}
const resumePendingQueue = Array.from(queueStartSet).sort()
snapshotState.pendingQueue = [...resumePendingQueue]
if (snapshotState.activeExecutionPath) {
snapshotState.activeExecutionPath = snapshotState.activeExecutionPath.filter(
(nodeId) => !restartScope.has(nodeId)
)
}
logPlanSummary({
startBlockId,
triggerBlockId,
startNodeIds,
forwardImpact,
upstreamAnalysis,
sinkNodes,
ancestorSet,
prunedStartSet: prunedStartCandidates,
queueStartSet,
restartScope,
})
return {
snapshotState,
resumePendingQueue,
}
}
function mapOriginalIdsToNodes(nodes: Map<string, DAGNode>): Map<string, string[]> {
const mapping = new Map<string, string[]>()
for (const [nodeId, node] of nodes.entries()) {
const originalId = node.metadata?.originalBlockId ?? nodeId
if (!mapping.has(originalId)) {
mapping.set(originalId, [])
}
mapping.get(originalId)!.push(nodeId)
}
return mapping
}
function collectDownstreamNodes(
nodes: Map<string, DAGNode>,
seeds: Set<string>
): Set<string> {
const visited = new Set<string>(seeds)
const stack = [...seeds]
while (stack.length > 0) {
const current = stack.pop()!
const node = nodes.get(current)
if (!node) continue
for (const edge of node.outgoingEdges.values()) {
if (!visited.has(edge.target)) {
visited.add(edge.target)
stack.push(edge.target)
}
}
}
return visited
}
function collectAncestors(nodes: Map<string, DAGNode>, sinks: Set<string>): Set<string> {
const visited = new Set<string>()
const stack = [...sinks]
while (stack.length > 0) {
const current = stack.pop()!
if (visited.has(current)) {
continue
}
visited.add(current)
const node = nodes.get(current)
if (!node) {
continue
}
for (const incoming of node.incomingEdges.values()) {
if (!visited.has(incoming)) {
stack.push(incoming)
}
}
}
return visited
}
function cloneSerializableState(
state: SerializableExecutionState
): SerializableExecutionState {
return JSON.parse(JSON.stringify(state)) as SerializableExecutionState
}
function filterDecisionMap(
decisions: Record<string, string>,
restartScope: Set<string>,
originalIds: Set<string>
): Record<string, string> {
const filtered: Record<string, string> = {}
for (const [key, value] of Object.entries(decisions || {})) {
if (restartScope.has(key) || originalIds.has(key)) {
continue
}
filtered[key] = value
}
return filtered
}
function determineTriggerNodeIds(
triggerBlockId: string,
originalToNodeIds: Map<string, string[]>,
nodes: Map<string, DAGNode>
): string[] {
let triggerNodes = originalToNodeIds.get(triggerBlockId)
if ((!triggerNodes || triggerNodes.length === 0) && nodes.has(triggerBlockId)) {
triggerNodes = [triggerBlockId]
}
if (!triggerNodes || triggerNodes.length === 0) {
const rootNodes = Array.from(nodes.values())
.filter((node) => node.incomingEdges.size === 0)
.map((node) => node.id)
triggerNodes = rootNodes.length > 0 ? rootNodes : [triggerBlockId]
}
return triggerNodes
}
interface UpstreamAnalysisParams {
dag: ReturnType<DAGBuilder['build']>
triggerNodeIds: string[]
stopNodeIds: Set<string>
previousState: SerializableExecutionState
previousResolvedInputs?: Record<string, any>
previousResolvedOutputs?: Record<string, any>
currentWorkflow: SerializedWorkflow
previousWorkflow?: SerializedWorkflow
}
interface UpstreamAnalysisResult {
startCandidates: Map<string, string[]>
traversedNodes: Set<string>
}
function analyzeUpstreamDifferences(params: UpstreamAnalysisParams): UpstreamAnalysisResult {
const {
dag,
triggerNodeIds,
stopNodeIds,
previousState,
previousResolvedInputs,
previousResolvedOutputs,
currentWorkflow,
previousWorkflow,
} = params
const startCandidates = new Map<string, string[]>()
const traversedNodes = new Set<string>()
const stack = triggerNodeIds.map((nodeId) => ({ nodeId, upstreamChanged: false as boolean }))
const executedBlocks = new Set<string>(previousState.executedBlocks || [])
// Build a map of block IDs to their current and previous definitions
const currentBlocks = new Map<string, any>()
const previousBlocks = new Map<string, any>()
for (const block of currentWorkflow.blocks || []) {
currentBlocks.set(block.id, block)
}
if (previousWorkflow) {
for (const block of previousWorkflow.blocks || []) {
previousBlocks.set(block.id, block)
}
}
while (stack.length > 0) {
const { nodeId, upstreamChanged } = stack.pop()!
if (traversedNodes.has(nodeId)) {
continue
}
traversedNodes.add(nodeId)
const node = dag.nodes.get(nodeId)
if (!node) {
continue
}
const originalId = node.metadata?.originalBlockId ?? nodeId
const reasons: string[] = []
const previousBlockState =
previousState.blockStates[nodeId] ?? previousState.blockStates[originalId]
const hasPriorState =
previousBlockState !== undefined ||
executedBlocks.has(nodeId) ||
executedBlocks.has(originalId) ||
previousResolvedOutputs?.[originalId] !== undefined ||
previousResolvedInputs?.[originalId] !== undefined
if (!hasPriorState) {
reasons.push('missing_prior_state')
}
// Check if the block definition itself changed
const currentBlock = currentBlocks.get(originalId)
const previousBlock = previousBlocks.get(originalId)
if (currentBlock && previousBlock) {
// Compare the block definitions (excluding metadata like position)
const currentDefinition = JSON.stringify({
type: currentBlock.type,
subBlocks: currentBlock.subBlocks,
})
const previousDefinition = JSON.stringify({
type: previousBlock.type,
subBlocks: previousBlock.subBlocks,
})
if (currentDefinition !== previousDefinition) {
reasons.push('block_definition_changed')
}
} else if (currentBlock && !previousBlock) {
reasons.push('new_block')
}
// Note: We intentionally do NOT check incoming_edges_changed here because:
// - Workflow topology changes (adding/removing unrelated blocks) shouldn't invalidate this block
// - The output/input comparisons above already catch meaningful dependency changes
// - This prevents false positives when the DAG structure evolves between runs
if (stopNodeIds.has(nodeId)) {
reasons.push('target_block')
}
const hasLocalChange = reasons.length > 0
if (hasLocalChange) {
startCandidates.set(nodeId, reasons)
}
const shouldPropagateChange = upstreamChanged || hasLocalChange
if (stopNodeIds.has(nodeId)) {
continue
}
for (const { target } of node.outgoingEdges.values()) {
stack.push({ nodeId: target, upstreamChanged: shouldPropagateChange })
}
}
return {
startCandidates,
traversedNodes,
}
}
interface StartSetParams {
upstreamCandidates: Map<string, string[]>
ancestorSet: Set<string>
stopNodeIds: Set<string>
}
function deriveStartSet(params: StartSetParams): Set<string> {
const { upstreamCandidates, ancestorSet, stopNodeIds } = params
const finalStartSet = new Set<string>()
for (const candidate of upstreamCandidates.keys()) {
if (ancestorSet.has(candidate) || stopNodeIds.has(candidate)) {
finalStartSet.add(candidate)
}
}
for (const nodeId of stopNodeIds) {
finalStartSet.add(nodeId)
}
return finalStartSet
}
function identifySinkNodes(nodes: Map<string, DAGNode>): Set<string> {
const sinks = new Set<string>()
for (const node of nodes.values()) {
if (node.outgoingEdges.size === 0) {
sinks.add(node.id)
}
}
return sinks
}
function areStringSetsEqual(a: Set<string>, b: Set<string>): boolean {
if (a.size !== b.size) {
return false
}
for (const value of a) {
if (!b.has(value)) {
return false
}
}
return true
}
interface PlanSummaryLogParams {
startBlockId: string
triggerBlockId: string
startNodeIds: string[]
forwardImpact: Set<string>
upstreamAnalysis: UpstreamAnalysisResult
sinkNodes: Set<string>
ancestorSet: Set<string>
prunedStartSet: Set<string>
queueStartSet: Set<string>
restartScope: Set<string>
}
function logPlanSummary(params: PlanSummaryLogParams): void {
const {
startBlockId,
triggerBlockId,
startNodeIds,
forwardImpact,
upstreamAnalysis,
sinkNodes,
ancestorSet,
prunedStartSet,
queueStartSet,
restartScope,
} = params
logger.info('Run-from-block forward impact traversal completed', {
startBlockId,
startNodeIds,
affectedCount: forwardImpact.size,
affectedNodes: Array.from(forwardImpact),
})
const upstreamDetails = Array.from(upstreamAnalysis.startCandidates.entries()).map(
([nodeId, reasons]) => ({
nodeId,
reasons,
})
)
logger.info('Run-from-block upstream diff analysis', {
triggerBlockId,
traversedNodes: Array.from(upstreamAnalysis.traversedNodes),
startCandidates: upstreamDetails,
})
logger.info('Run-from-block backward pruning summary', {
sinkNodes: Array.from(sinkNodes),
ancestorCount: ancestorSet.size,
ancestorNodes: Array.from(ancestorSet),
prunedStartSet: Array.from(prunedStartSet),
})
logger.info('Run-from-block queue and restart scope', {
resumePendingQueue: Array.from(queueStartSet),
restartScope: Array.from(restartScope),
restartScopeSize: restartScope.size,
})
}

View File

@@ -0,0 +1,22 @@
CREATE TYPE "public"."workflow_execution_state_status" AS ENUM('success', 'failed', 'paused');--> statement-breakpoint
CREATE TABLE "workflow_execution_states" (
"id" text PRIMARY KEY NOT NULL,
"workflow_id" text NOT NULL,
"trigger_block_id" text NOT NULL,
"execution_id" text NOT NULL,
"run_version" text,
"serialized_state" jsonb NOT NULL,
"resolved_inputs" jsonb DEFAULT '{}'::jsonb NOT NULL,
"resolved_outputs" jsonb DEFAULT '{}'::jsonb NOT NULL,
"status" "workflow_execution_state_status" DEFAULT 'success' NOT NULL,
"attempt_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "workflow_execution_states" ADD CONSTRAINT "workflow_execution_states_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_execution_states" ADD CONSTRAINT "workflow_execution_states_trigger_block_id_workflow_blocks_id_fk" FOREIGN KEY ("trigger_block_id") REFERENCES "public"."workflow_blocks"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE UNIQUE INDEX "workflow_execution_states_workflow_trigger_unique" ON "workflow_execution_states" USING btree ("workflow_id","trigger_block_id");--> statement-breakpoint
CREATE INDEX "workflow_execution_states_workflow_idx" ON "workflow_execution_states" USING btree ("workflow_id");--> statement-breakpoint
CREATE INDEX "workflow_execution_states_trigger_idx" ON "workflow_execution_states" USING btree ("trigger_block_id");--> statement-breakpoint
CREATE INDEX "workflow_execution_states_execution_idx" ON "workflow_execution_states" USING btree ("execution_id");--> statement-breakpoint
CREATE INDEX "workflow_execution_states_status_idx" ON "workflow_execution_states" USING btree ("status");--> statement-breakpoint
CREATE INDEX "workflow_execution_states_attempt_at_idx" ON "workflow_execution_states" USING btree ("attempt_at");

View File

@@ -0,0 +1 @@
ALTER TABLE "workflow_execution_states" ADD COLUMN "serialized_workflow" jsonb;

File diff suppressed because it is too large.

File diff suppressed because it is too large.

View File

@@ -757,6 +757,20 @@
"when": 1762572820066,
"tag": "0108_cuddly_scream",
"breakpoints": true
},
{
"idx": 109,
"version": "7",
"when": 1762826396153,
"tag": "0109_solid_shiva",
"breakpoints": true
},
{
"idx": 110,
"version": "7",
"when": 1762832732250,
"tag": "0110_stale_impossible_man",
"breakpoints": true
}
]
}
}

View File

@@ -318,6 +318,44 @@ export const workflowExecutionLogs = pgTable(
})
)
export const workflowExecutionStateStatusEnum = pgEnum('workflow_execution_state_status', [
'success',
'failed',
'paused',
])
export const workflowExecutionStates = pgTable(
'workflow_execution_states',
{
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
triggerBlockId: text('trigger_block_id')
.notNull()
.references(() => workflowBlocks.id, { onDelete: 'cascade' }),
executionId: text('execution_id').notNull(),
runVersion: text('run_version'),
serializedState: jsonb('serialized_state').notNull(),
serializedWorkflow: jsonb('serialized_workflow'),
resolvedInputs: jsonb('resolved_inputs').notNull().default(sql`'{}'::jsonb`),
resolvedOutputs: jsonb('resolved_outputs').notNull().default(sql`'{}'::jsonb`),
status: workflowExecutionStateStatusEnum('status').notNull().default('success'),
attemptAt: timestamp('attempt_at').notNull().defaultNow(),
},
(table) => ({
workflowTriggerUnique: uniqueIndex('workflow_execution_states_workflow_trigger_unique').on(
table.workflowId,
table.triggerBlockId
),
workflowIdx: index('workflow_execution_states_workflow_idx').on(table.workflowId),
triggerIdx: index('workflow_execution_states_trigger_idx').on(table.triggerBlockId),
executionIdx: index('workflow_execution_states_execution_idx').on(table.executionId),
statusIdx: index('workflow_execution_states_status_idx').on(table.status),
attemptAtIdx: index('workflow_execution_states_attempt_at_idx').on(table.attemptAt),
})
)
export const pausedExecutions = pgTable(
'paused_executions',
{