This commit is contained in:
Siddharth Ganesan
2025-11-10 19:46:27 -08:00
parent ff99d75055
commit 6d46b44e51
9 changed files with 8619 additions and 6 deletions

View File

@@ -528,14 +528,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
try {
const trimmedStartBlockId = startBlockId!.trim()
const triggerBlockIdForPlan = latestState.triggerBlockId || startCandidate.blockId
const plan = buildRunFromBlockPlan({
serializedWorkflow,
previousState: latestState.serializedState,
previousResolvedInputs: latestState.resolvedInputs,
previousResolvedOutputs: latestState.resolvedOutputs,
previousWorkflow: latestState.serializedWorkflow,
startBlockId: trimmedStartBlockId,
triggerBlockId: triggerBlockIdForPlan,
})
const triggerBlockIdForPlan = latestState.triggerBlockId || startCandidate.blockId
runFromBlockPlan = {
snapshotState: plan.snapshotState,
resumePendingQueue: plan.resumePendingQueue,

View File

@@ -19,6 +19,7 @@ const logger = createLogger('ExecutionEngine')
export class ExecutionEngine {
private readyQueue: string[] = []
private executing = new Set<Promise<void>>()
private executingNodes = new Set<string>()
private queueLock = Promise.resolve()
private finalOutput: NormalizedBlockOutput = {}
private pausedBlocks: Map<string, PauseMetadata> = new Map()

View File

@@ -4,6 +4,7 @@ import { and, eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import type { SerializableExecutionState } from '@/executor/execution/snapshot'
import type { SerializedWorkflow } from '@/serializer/types'
const logger = createLogger('WorkflowExecutionStateService')
@@ -16,6 +17,7 @@ export interface WorkflowExecutionStateRecord {
executionId: string
runVersion: string | null
serializedState: SerializableExecutionState
serializedWorkflow?: SerializedWorkflow
resolvedInputs: Record<string, any>
resolvedOutputs: Record<string, any>
status: WorkflowExecutionStateStatus
@@ -28,6 +30,7 @@ export interface UpsertWorkflowExecutionStateParams {
executionId: string
runVersion?: string | null
serializedState: SerializableExecutionState
serializedWorkflow?: SerializedWorkflow
resolvedInputs: Record<string, any>
resolvedOutputs: Record<string, any>
status: WorkflowExecutionStateStatus
@@ -43,6 +46,7 @@ export async function upsertWorkflowExecutionState(
executionId,
runVersion = null,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
@@ -56,6 +60,7 @@ export async function upsertWorkflowExecutionState(
executionId,
runVersion,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
@@ -71,6 +76,7 @@ export async function upsertWorkflowExecutionState(
executionId,
runVersion,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,
@@ -123,6 +129,7 @@ function mapRow(row: typeof workflowExecutionStates.$inferSelect): WorkflowExecu
executionId: row.executionId,
runVersion: row.runVersion,
serializedState: row.serializedState as SerializableExecutionState,
serializedWorkflow: row.serializedWorkflow as SerializedWorkflow | undefined,
resolvedInputs: row.resolvedInputs as Record<string, any>,
resolvedOutputs: row.resolvedOutputs as Record<string, any>,
status: row.status as WorkflowExecutionStateStatus,

View File

@@ -386,6 +386,7 @@ export async function executeWorkflowCore(
executionId: metadata.executionId ?? metadata.requestId ?? '',
runVersion,
serializedState,
serializedWorkflow,
resolvedInputs,
resolvedOutputs,
status,

View File

@@ -2,6 +2,7 @@ import { DAGBuilder } from '@/executor/dag/builder'
import type { DAGNode } from '@/executor/dag/builder'
import type { SerializableExecutionState } from '@/executor/execution/snapshot'
import type { SerializedWorkflow } from '@/serializer/types'
import { createLogger } from '@/lib/logs/console/logger'
export interface RunFromBlockPlan {
snapshotState: SerializableExecutionState
@@ -11,11 +12,31 @@ export interface RunFromBlockPlan {
/**
 * Inputs for {@link buildRunFromBlockPlan}.
 */
export interface BuildRunFromBlockPlanParams {
  /** Workflow definition for the current run. */
  serializedWorkflow: SerializedWorkflow
  /** Execution snapshot captured by the previous run. */
  previousState: SerializableExecutionState
  /** Resolved inputs recorded by the previous run, keyed by block id, when available. */
  previousResolvedInputs?: Record<string, any>
  /** Resolved outputs recorded by the previous run, keyed by block id, when available. */
  previousResolvedOutputs?: Record<string, any>
  /** Workflow definition from the previous run, used to diff block definitions. */
  previousWorkflow?: SerializedWorkflow
  /** Block the caller asked to run from. */
  startBlockId: string
  /** Trigger block that roots the upstream analysis traversal. */
  triggerBlockId: string
}
const logger = createLogger('RunFromBlockPlanner')
/**
* Builds an execution plan for running a workflow starting from a specific block.
*
* Performs forward impact detection, upstream change analysis, backward pruning,
* and snapshot pruning so that only the minimally required nodes are re-executed.
*/
export function buildRunFromBlockPlan(params: BuildRunFromBlockPlanParams): RunFromBlockPlan {
const { serializedWorkflow, previousState, startBlockId } = params
const {
serializedWorkflow,
previousState,
previousWorkflow,
startBlockId,
triggerBlockId,
previousResolvedInputs,
previousResolvedOutputs,
} = params
const dagBuilder = new DAGBuilder()
const dag = dagBuilder.build(serializedWorkflow)
@@ -31,11 +52,46 @@ export function buildRunFromBlockPlan(params: BuildRunFromBlockPlanParams): RunF
}
}
const restartScope = collectDownstreamNodes(dag.nodes, new Set(startNodeIds))
const triggerNodeIds = determineTriggerNodeIds(triggerBlockId, originalToNodeIds, dag.nodes)
const stopNodeIds = new Set(startNodeIds)
const forwardImpact = collectDownstreamNodes(dag.nodes, new Set(startNodeIds))
const upstreamAnalysis = analyzeUpstreamDifferences({
dag,
triggerNodeIds,
stopNodeIds,
previousState,
previousResolvedInputs,
previousResolvedOutputs,
currentWorkflow: serializedWorkflow,
previousWorkflow,
})
const sinkNodes = identifySinkNodes(dag.nodes)
const ancestorSet = collectAncestors(dag.nodes, sinkNodes)
const prunedStartCandidates = deriveStartSet({
upstreamCandidates: upstreamAnalysis.startCandidates,
ancestorSet,
stopNodeIds,
})
if (prunedStartCandidates.size === 0) {
for (const nodeId of startNodeIds) {
prunedStartCandidates.add(nodeId)
}
}
const restartSeeds = new Set<string>([...prunedStartCandidates, ...stopNodeIds])
const restartScope = collectDownstreamNodes(dag.nodes, restartSeeds)
const loopIdsToReset = new Set<string>()
const parallelIdsToReset = new Set<string>()
const originalIdsInScope = new Set<string>()
// Queue all pruned upstream changes - executor handles dependency resolution for downstream
const queueStartSet = new Set(prunedStartCandidates)
for (const nodeId of restartScope) {
const node = dag.nodes.get(nodeId)
if (!node) continue
@@ -102,7 +158,8 @@ export function buildRunFromBlockPlan(params: BuildRunFromBlockPlanParams): RunF
)
}
snapshotState.pendingQueue = [...startNodeIds]
const resumePendingQueue = Array.from(queueStartSet).sort()
snapshotState.pendingQueue = [...resumePendingQueue]
if (snapshotState.activeExecutionPath) {
snapshotState.activeExecutionPath = snapshotState.activeExecutionPath.filter(
@@ -110,9 +167,22 @@ export function buildRunFromBlockPlan(params: BuildRunFromBlockPlanParams): RunF
)
}
logPlanSummary({
startBlockId,
triggerBlockId,
startNodeIds,
forwardImpact,
upstreamAnalysis,
sinkNodes,
ancestorSet,
prunedStartSet: prunedStartCandidates,
queueStartSet,
restartScope,
})
return {
snapshotState,
resumePendingQueue: [...startNodeIds],
resumePendingQueue,
}
}
@@ -151,6 +221,32 @@ function collectDownstreamNodes(
return visited
}
/**
 * Collects every node reachable by walking incoming edges backwards from the
 * given sink nodes. The sinks themselves are included in the returned set.
 * Node ids with no entry in `nodes` are kept in the result but not expanded.
 */
function collectAncestors(nodes: Map<string, DAGNode>, sinks: Set<string>): Set<string> {
  const seen = new Set<string>()
  const pending: string[] = Array.from(sinks)
  while (pending.length > 0) {
    const nodeId = pending.pop()!
    if (seen.has(nodeId)) {
      continue
    }
    seen.add(nodeId)
    const node = nodes.get(nodeId)
    if (!node) {
      continue
    }
    // Enqueue each parent (source of an incoming edge) not yet visited.
    for (const parentId of node.incomingEdges.values()) {
      if (!seen.has(parentId)) {
        pending.push(parentId)
      }
    }
  }
  return seen
}
function cloneSerializableState(
state: SerializableExecutionState
): SerializableExecutionState {
@@ -172,3 +268,262 @@ function filterDecisionMap(
return filtered
}
/**
 * Resolves the trigger block id to concrete DAG node ids, in priority order:
 * 1. the explicit original-id -> node-ids mapping;
 * 2. the trigger id itself, when it is already a DAG node;
 * 3. all root nodes (no incoming edges);
 * 4. as a last resort, the raw trigger id even though it resolves to nothing.
 */
function determineTriggerNodeIds(
  triggerBlockId: string,
  originalToNodeIds: Map<string, string[]>,
  nodes: Map<string, DAGNode>
): string[] {
  const mapped = originalToNodeIds.get(triggerBlockId)
  if (mapped && mapped.length > 0) {
    return mapped
  }
  if (nodes.has(triggerBlockId)) {
    return [triggerBlockId]
  }
  const roots: string[] = []
  for (const node of nodes.values()) {
    if (node.incomingEdges.size === 0) {
      roots.push(node.id)
    }
  }
  return roots.length > 0 ? roots : [triggerBlockId]
}
/** Inputs for analyzeUpstreamDifferences. */
interface UpstreamAnalysisParams {
  /** Built DAG for the current workflow. */
  dag: ReturnType<DAGBuilder['build']>
  /** Node ids the forward traversal starts from. */
  triggerNodeIds: string[]
  /** Nodes at which traversal stops (the run-from targets). */
  stopNodeIds: Set<string>
  /** Execution snapshot captured by the previous run. */
  previousState: SerializableExecutionState
  /** Resolved inputs from the previous run, keyed by original block id, when available. */
  previousResolvedInputs?: Record<string, any>
  /** Resolved outputs from the previous run, keyed by original block id, when available. */
  previousResolvedOutputs?: Record<string, any>
  /** Current workflow definition. */
  currentWorkflow: SerializedWorkflow
  /** Previous run's workflow definition, when available; enables block-definition diffing. */
  previousWorkflow?: SerializedWorkflow
}
/** Result of the upstream diff traversal. */
interface UpstreamAnalysisResult {
  /** Node id -> reasons the node must re-execute. */
  startCandidates: Map<string, string[]>
  /** Every node id visited during the traversal. */
  traversedNodes: Set<string>
}
/**
 * Walks the DAG forward from the trigger nodes, flagging "start candidates":
 * nodes whose prior results cannot be reused for this run-from-block execution.
 *
 * A node is flagged for any of these local reasons:
 *  - 'missing_prior_state': no block state, executed flag, or resolved IO from the previous run
 *  - 'block_definition_changed': its type/subBlocks differ between previous and current workflow
 *  - 'new_block': present in the current workflow but not the previous one
 *  - 'target_block': it is one of the stop (run-from) nodes
 *
 * Traversal does not continue past nodes in `stopNodeIds`.
 *
 * @returns the candidate map plus the full set of visited node ids.
 */
function analyzeUpstreamDifferences(params: UpstreamAnalysisParams): UpstreamAnalysisResult {
  const {
    dag,
    triggerNodeIds,
    stopNodeIds,
    previousState,
    previousResolvedInputs,
    previousResolvedOutputs,
    currentWorkflow,
    previousWorkflow,
  } = params
  const startCandidates = new Map<string, string[]>()
  const traversedNodes = new Set<string>()
  // DFS frontier; each entry carries whether something upstream of it changed.
  const stack = triggerNodeIds.map((nodeId) => ({ nodeId, upstreamChanged: false as boolean }))
  const executedBlocks = new Set<string>(previousState.executedBlocks || [])
  // Build a map of block IDs to their current and previous definitions
  const currentBlocks = new Map<string, any>()
  const previousBlocks = new Map<string, any>()
  for (const block of currentWorkflow.blocks || []) {
    currentBlocks.set(block.id, block)
  }
  if (previousWorkflow) {
    for (const block of previousWorkflow.blocks || []) {
      previousBlocks.set(block.id, block)
    }
  }
  while (stack.length > 0) {
    const { nodeId, upstreamChanged } = stack.pop()!
    // First visit wins: a node re-reached with a different upstreamChanged flag is skipped.
    if (traversedNodes.has(nodeId)) {
      continue
    }
    traversedNodes.add(nodeId)
    const node = dag.nodes.get(nodeId)
    if (!node) {
      continue
    }
    const originalId = node.metadata?.originalBlockId ?? nodeId
    const reasons: string[] = []
    // Prior state may be keyed by either the DAG node id or the original block id.
    const previousBlockState =
      previousState.blockStates[nodeId] ?? previousState.blockStates[originalId]
    const hasPriorState =
      previousBlockState !== undefined ||
      executedBlocks.has(nodeId) ||
      executedBlocks.has(originalId) ||
      previousResolvedOutputs?.[originalId] !== undefined ||
      previousResolvedInputs?.[originalId] !== undefined
    if (!hasPriorState) {
      reasons.push('missing_prior_state')
    }
    // Check if the block definition itself changed
    const currentBlock = currentBlocks.get(originalId)
    const previousBlock = previousBlocks.get(originalId)
    if (currentBlock && previousBlock) {
      // Compare the block definitions (excluding metadata like position)
      const currentDefinition = JSON.stringify({
        type: currentBlock.type,
        subBlocks: currentBlock.subBlocks,
      })
      const previousDefinition = JSON.stringify({
        type: previousBlock.type,
        subBlocks: previousBlock.subBlocks,
      })
      if (currentDefinition !== previousDefinition) {
        reasons.push('block_definition_changed')
      }
    } else if (currentBlock && !previousBlock) {
      reasons.push('new_block')
    }
    // Note: We intentionally do NOT check incoming_edges_changed here because:
    // - Workflow topology changes (adding/removing unrelated blocks) shouldn't invalidate this block
    // - The output/input comparisons above already catch meaningful dependency changes
    // - This prevents false positives when the DAG structure evolves between runs
    if (stopNodeIds.has(nodeId)) {
      reasons.push('target_block')
    }
    const hasLocalChange = reasons.length > 0
    if (hasLocalChange) {
      startCandidates.set(nodeId, reasons)
    }
    // NOTE(review): shouldPropagateChange is pushed to children but never read back
    // into startCandidates — presumably reserved for future use; confirm intent.
    const shouldPropagateChange = upstreamChanged || hasLocalChange
    // Stop nodes terminate traversal; their downstream is covered by the restart scope.
    if (stopNodeIds.has(nodeId)) {
      continue
    }
    for (const { target } of node.outgoingEdges.values()) {
      stack.push({ nodeId: target, upstreamChanged: shouldPropagateChange })
    }
  }
  return {
    startCandidates,
    traversedNodes,
  }
}
/** Inputs for deriveStartSet. */
interface StartSetParams {
  /** Candidates flagged by the upstream diff analysis (node id -> reasons). */
  upstreamCandidates: Map<string, string[]>
  /** Ancestors of the workflow's sinks; candidates outside this set are pruned. */
  ancestorSet: Set<string>
  /** The run-from target nodes; always included in the final start set. */
  stopNodeIds: Set<string>
}
function deriveStartSet(params: StartSetParams): Set<string> {
const { upstreamCandidates, ancestorSet, stopNodeIds } = params
const finalStartSet = new Set<string>()
for (const candidate of upstreamCandidates.keys()) {
if (ancestorSet.has(candidate) || stopNodeIds.has(candidate)) {
finalStartSet.add(candidate)
}
}
for (const nodeId of stopNodeIds) {
finalStartSet.add(nodeId)
}
return finalStartSet
}
/** Returns the ids of every node with no outgoing edges (the workflow's sinks). */
function identifySinkNodes(nodes: Map<string, DAGNode>): Set<string> {
  const sinkIds = Array.from(nodes.values())
    .filter((node) => node.outgoingEdges.size === 0)
    .map((node) => node.id)
  return new Set(sinkIds)
}
/** True when both sets contain exactly the same string members. */
function areStringSetsEqual(a: Set<string>, b: Set<string>): boolean {
  return a.size === b.size && Array.from(a).every((value) => b.has(value))
}
/** Inputs for logPlanSummary — one field per planning-phase artifact. */
interface PlanSummaryLogParams {
  /** Block the caller asked to run from. */
  startBlockId: string
  /** Trigger block that rooted the upstream analysis. */
  triggerBlockId: string
  /** DAG node ids resolved from startBlockId. */
  startNodeIds: string[]
  /** Nodes downstream of the start nodes (forward impact). */
  forwardImpact: Set<string>
  /** Output of the upstream diff traversal. */
  upstreamAnalysis: UpstreamAnalysisResult
  /** Nodes with no outgoing edges. */
  sinkNodes: Set<string>
  /** Ancestors of the sink nodes. */
  ancestorSet: Set<string>
  /** Start candidates after backward pruning. */
  prunedStartSet: Set<string>
  /** Nodes queued for re-execution. */
  queueStartSet: Set<string>
  /** Full set of nodes whose state is reset for the restart. */
  restartScope: Set<string>
}
/**
 * Emits one structured log line per run-from-block planning phase:
 * forward impact, upstream diff analysis, backward pruning, and the
 * final resume queue / restart scope.
 */
function logPlanSummary(params: PlanSummaryLogParams): void {
  logger.info('Run-from-block forward impact traversal completed', {
    startBlockId: params.startBlockId,
    startNodeIds: params.startNodeIds,
    affectedCount: params.forwardImpact.size,
    affectedNodes: Array.from(params.forwardImpact),
  })
  // Flatten the candidate map into log-friendly { nodeId, reasons } entries.
  const startCandidates = Array.from(params.upstreamAnalysis.startCandidates.entries()).map(
    ([nodeId, reasons]) => ({ nodeId, reasons })
  )
  logger.info('Run-from-block upstream diff analysis', {
    triggerBlockId: params.triggerBlockId,
    traversedNodes: Array.from(params.upstreamAnalysis.traversedNodes),
    startCandidates,
  })
  logger.info('Run-from-block backward pruning summary', {
    sinkNodes: Array.from(params.sinkNodes),
    ancestorCount: params.ancestorSet.size,
    ancestorNodes: Array.from(params.ancestorSet),
    prunedStartSet: Array.from(params.prunedStartSet),
  })
  logger.info('Run-from-block queue and restart scope', {
    resumePendingQueue: Array.from(params.queueStartSet),
    restartScope: Array.from(params.restartScope),
    restartScopeSize: params.restartScope.size,
  })
}

View File

@@ -0,0 +1 @@
ALTER TABLE "workflow_execution_states" ADD COLUMN "serialized_workflow" jsonb;

File diff suppressed because it is too large Load Diff

View File

@@ -764,6 +764,13 @@
"when": 1762826396153,
"tag": "0109_solid_shiva",
"breakpoints": true
},
{
"idx": 110,
"version": "7",
"when": 1762832732250,
"tag": "0110_stale_impossible_man",
"breakpoints": true
}
]
}

View File

@@ -337,6 +337,7 @@ export const workflowExecutionStates = pgTable(
executionId: text('execution_id').notNull(),
runVersion: text('run_version'),
serializedState: jsonb('serialized_state').notNull(),
serializedWorkflow: jsonb('serialized_workflow'),
resolvedInputs: jsonb('resolved_inputs').notNull().default(sql`'{}'::jsonb`),
resolvedOutputs: jsonb('resolved_outputs').notNull().default(sql`'{}'::jsonb`),
status: workflowExecutionStateStatusEnum('status').notNull().default('success'),