From 4996eea2ee893e5736df52622474a4d84a972a63 Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Tue, 27 Jan 2026 17:38:41 -0800
Subject: [PATCH] Fix run-from-block snapshot scoping and stop-after container
 resolution

Compute the upstream (ancestor) set alongside the dirty set when running
from a block, filter the source snapshot so that only upstream block state
is carried over, and resolve loop/parallel container IDs passed as
stopAfterBlockId to their sentinel-end node IDs.
---
 apps/sim/executor/execution/executor.ts      | 23 ++++++-
 apps/sim/executor/utils/run-from-block.ts    | 65 +++++++++++++++----
 .../lib/workflows/executor/execution-core.ts | 24 ++++++-
 3 files changed, 96 insertions(+), 16 deletions(-)

diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts
index 1dd3d4fb1..db6d32cfb 100644
--- a/apps/sim/executor/execution/executor.ts
+++ b/apps/sim/executor/execution/executor.ts
@@ -16,7 +16,7 @@ import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
 import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
 import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
 import {
-  computeDirtySet,
+  computeExecutionSets,
   type RunFromBlockContext,
   resolveContainerToSentinelStart,
   validateRunFromBlock,
@@ -121,14 +121,31 @@ export class DAGExecutor {
       throw new Error(validation.error)
     }
 
-    const dirtySet = computeDirtySet(dag, startBlockId)
+    const { dirtySet, upstreamSet } = computeExecutionSets(dag, startBlockId)
     const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId
 
+    // Filter the snapshot to upstream blocks so stale non-upstream state cannot be referenced
+    const filteredBlockStates: Record<string, BlockState> = {}
+    for (const [blockId, state] of Object.entries(sourceSnapshot.blockStates)) {
+      if (upstreamSet.has(blockId)) {
+        filteredBlockStates[blockId] = state
+      }
+    }
+    const filteredExecutedBlocks = sourceSnapshot.executedBlocks.filter((id) => upstreamSet.has(id))
+
+    const filteredSnapshot: SerializableExecutionState = {
+      ...sourceSnapshot,
+      blockStates: filteredBlockStates,
+      executedBlocks: filteredExecutedBlocks,
+    }
+
     logger.info('Executing from block', {
       workflowId,
       startBlockId,
       effectiveStartBlockId,
       dirtySetSize: dirtySet.size,
+      upstreamSetSize: upstreamSet.size,
+      filteredBlockStatesCount: Object.keys(filteredBlockStates).length,
       totalBlocks: dag.nodes.size,
       dirtyBlocks: Array.from(dirtySet),
     })
@@ -156,7 +173,7 @@ export class DAGExecutor {
     const runFromBlockContext = { startBlockId: effectiveStartBlockId, dirtySet }
 
     const { context, state } = this.createExecutionContext(workflowId, undefined, {
-      snapshotState: sourceSnapshot,
+      snapshotState: filteredSnapshot,
       runFromBlockContext,
     })
 
diff --git a/apps/sim/executor/utils/run-from-block.ts b/apps/sim/executor/utils/run-from-block.ts
index 260ffe796..8bb6df179 100644
--- a/apps/sim/executor/utils/run-from-block.ts
+++ b/apps/sim/executor/utils/run-from-block.ts
@@ -51,18 +51,30 @@ export interface RunFromBlockContext {
 }
 
 /**
- * Computes all blocks that need re-execution when running from a specific block.
- * Uses BFS to find all downstream blocks reachable via outgoing edges.
+ * Result of computing execution sets for run-from-block mode.
+ */
+export interface ExecutionSets {
+  /** Blocks that need re-execution (start block + all downstream) */
+  dirtySet: Set<string>
+  /** Blocks that are upstream (ancestors) of the start block */
+  upstreamSet: Set<string>
+}
+
+/**
+ * Computes both the dirty set (downstream) and the upstream set in a single call:
+ * - Dirty set: start block + all blocks reachable via outgoing edges (need re-execution)
+ * - Upstream set: all blocks reachable via incoming edges (can be referenced)
  *
  * For loop/parallel containers, starts from the sentinel-start node and includes
  * the container ID itself in the dirty set.
  *
  * @param dag - The workflow DAG
  * @param startBlockId - The block to start execution from
- * @returns Set of block IDs that are "dirty" and need re-execution
+ * @returns Object containing both dirtySet and upstreamSet
  */
-export function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
+export function computeExecutionSets(dag: DAG, startBlockId: string): ExecutionSets {
   const dirty = new Set([startBlockId])
+  const upstream = new Set<string>()
   const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag)
   const traversalStartId = sentinelStartId ?? startBlockId
@@ -70,29 +82,58 @@ export function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
     dirty.add(sentinelStartId)
   }
 
-  const queue = [traversalStartId]
-
-  while (queue.length > 0) {
-    const nodeId = queue.shift()!
+  // BFS downstream for dirty set
+  const downstreamQueue = [traversalStartId]
+  while (downstreamQueue.length > 0) {
+    const nodeId = downstreamQueue.shift()!
     const node = dag.nodes.get(nodeId)
     if (!node) continue
 
     for (const [, edge] of node.outgoingEdges) {
       if (!dirty.has(edge.target)) {
         dirty.add(edge.target)
-        queue.push(edge.target)
+        downstreamQueue.push(edge.target)
       }
     }
   }
 
-  logger.debug('Computed dirty set', {
+  // BFS upstream for upstream set
+  const upstreamQueue = [traversalStartId]
+  while (upstreamQueue.length > 0) {
+    const nodeId = upstreamQueue.shift()!
+    const node = dag.nodes.get(nodeId)
+    if (!node) continue
+
+    for (const sourceId of node.incomingEdges) {
+      if (!upstream.has(sourceId)) {
+        upstream.add(sourceId)
+        upstreamQueue.push(sourceId)
+      }
+    }
+  }
+
+  logger.debug('Computed execution sets', {
     startBlockId,
     traversalStartId,
     dirtySetSize: dirty.size,
-    dirtyBlocks: Array.from(dirty),
+    upstreamSetSize: upstream.size,
   })
 
-  return dirty
+  return { dirtySet: dirty, upstreamSet: upstream }
+}
+
+/**
+ * @deprecated Use computeExecutionSets instead; it returns both sets in one call.
+ */
+export function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
+  return computeExecutionSets(dag, startBlockId).dirtySet
+}
+
+/**
+ * @deprecated Use computeExecutionSets instead; it returns both sets in one call.
+ */
+export function computeUpstreamSet(dag: DAG, blockId: string): Set<string> {
+  return computeExecutionSets(dag, blockId).upstreamSet
 }
 
 /**
diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts
index bd9e2f69f..fb15fb0bd 100644
--- a/apps/sim/lib/workflows/executor/execution-core.ts
+++ b/apps/sim/lib/workflows/executor/execution-core.ts
@@ -27,6 +27,10 @@ import type {
 } from '@/executor/execution/types'
 import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
 import { hasExecutionResult } from '@/executor/utils/errors'
+import {
+  buildParallelSentinelEndId,
+  buildSentinelEndId,
+} from '@/executor/utils/subflow-utils'
 import { Serializer } from '@/serializer'
 
 const logger = createLogger('ExecutionCore')
@@ -255,6 +259,24 @@ export async function executeWorkflowCore(
 
   processedInput = input || {}
 
+  // Resolve stopAfterBlockId for loop/parallel containers to their sentinel-end IDs
+  let resolvedStopAfterBlockId = stopAfterBlockId
+  if (stopAfterBlockId) {
+    if (serializedWorkflow.loops?.[stopAfterBlockId]) {
+      resolvedStopAfterBlockId = buildSentinelEndId(stopAfterBlockId)
+      logger.info(`[${requestId}] Resolved loop container to sentinel-end`, {
+        original: stopAfterBlockId,
+        resolved: resolvedStopAfterBlockId,
+      })
+    } else if (serializedWorkflow.parallels?.[stopAfterBlockId]) {
+      resolvedStopAfterBlockId = buildParallelSentinelEndId(stopAfterBlockId)
+      logger.info(`[${requestId}] Resolved parallel container to sentinel-end`, {
+        original: stopAfterBlockId,
+        resolved: resolvedStopAfterBlockId,
+      })
+    }
+  }
+
   // Create and execute workflow with callbacks
   if (resumeFromSnapshot) {
     logger.info(`[${requestId}] Resume execution detected`, {
@@ -305,7 +327,7 @@ export async function executeWorkflowCore(
     abortSignal,
     includeFileBase64,
     base64MaxBytes,
-    stopAfterBlockId,
+    stopAfterBlockId: resolvedStopAfterBlockId,
   }
 
   const executorInstance = new Executor({
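
Reviewer note (not part of the patch): below is a minimal, self-contained
sketch of the traversal semantics introduced in run-from-block.ts, runnable
with ts-node. The adjacency-map DAG shape is an illustrative assumption; the
executor's real DAG/DAGNode types differ.

// Toy adjacency-map DAG; a stand-in for the executor's real DAG types.
interface ToyNode {
  outgoing: string[] // IDs of edge targets
  incoming: string[] // IDs of edge sources
}
type ToyDag = Map<string, ToyNode>

function toyExecutionSets(dag: ToyDag, startBlockId: string) {
  const dirtySet = new Set<string>([startBlockId])
  const upstreamSet = new Set<string>()

  // BFS along outgoing edges: the start block and everything below it re-runs.
  const down: string[] = [startBlockId]
  while (down.length > 0) {
    const node = dag.get(down.shift()!)
    if (!node) continue
    for (const target of node.outgoing) {
      if (!dirtySet.has(target)) {
        dirtySet.add(target)
        down.push(target)
      }
    }
  }

  // BFS along incoming edges: ancestors whose cached outputs stay referencable.
  const up: string[] = [startBlockId]
  while (up.length > 0) {
    const node = dag.get(up.shift()!)
    if (!node) continue
    for (const source of node.incoming) {
      if (!upstreamSet.has(source)) {
        upstreamSet.add(source)
        up.push(source)
      }
    }
  }

  return { dirtySet, upstreamSet }
}

// Diamond workflow A -> {B, C} -> D, running from B:
const dag: ToyDag = new Map([
  ['A', { outgoing: ['B', 'C'], incoming: [] }],
  ['B', { outgoing: ['D'], incoming: ['A'] }],
  ['C', { outgoing: ['D'], incoming: ['A'] }],
  ['D', { outgoing: [], incoming: ['B', 'C'] }],
])
const { dirtySet, upstreamSet } = toyExecutionSets(dag, 'B')
console.log([...dirtySet])    // [ 'B', 'D' ]  -> re-executed
console.log([...upstreamSet]) // [ 'A' ]      -> kept in the filtered snapshot

Note that the sibling branch C lands in neither set: it is not re-executed,
and the executor.ts filtering above drops its cached state from the snapshot,
which is the behavior this patch enforces.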
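
A similar sketch for the stopAfterBlockId resolution in execution-core.ts.
The sentinel-end ID builders here are hypothetical stand-ins; the real scheme
lives in @/executor/utils/subflow-utils and may produce different IDs.

// Hypothetical stand-ins for buildSentinelEndId / buildParallelSentinelEndId.
const buildSentinelEndIdStandIn = (loopId: string) => `${loopId}-sentinel-end`
const buildParallelSentinelEndIdStandIn = (parallelId: string) => `${parallelId}-sentinel-end`

interface WorkflowShape {
  loops?: Record<string, unknown>
  parallels?: Record<string, unknown>
}

// Mirrors the execution-core.ts change: a container ID passed as
// stopAfterBlockId maps to its sentinel-end node; plain block IDs pass through.
function resolveStopAfter(workflow: WorkflowShape, stopAfterBlockId?: string): string | undefined {
  if (!stopAfterBlockId) return stopAfterBlockId
  if (workflow.loops?.[stopAfterBlockId]) return buildSentinelEndIdStandIn(stopAfterBlockId)
  if (workflow.parallels?.[stopAfterBlockId]) return buildParallelSentinelEndIdStandIn(stopAfterBlockId)
  return stopAfterBlockId
}

console.log(resolveStopAfter({ loops: { loop1: {} } }, 'loop1')) // 'loop1-sentinel-end'
console.log(resolveStopAfter({}, 'block9'))                      // 'block9'

Resolving a container to its sentinel-end, rather than stopping at the
container ID (which never executes as a node itself), presumably lets the run
halt only after the container's final iteration completes.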