From 7a0aaa460dd211b9870a119e9f4eac7718a9ed8f Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 27 Jan 2026 12:30:46 -0800 Subject: [PATCH] Clean up --- .../hooks/use-workflow-execution.ts | 6 ----- apps/sim/executor/execution/engine.ts | 2 -- apps/sim/executor/execution/executor.ts | 27 ++----------------- apps/sim/executor/utils/run-from-block.ts | 8 ------ 4 files changed, 2 insertions(+), 41 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 10ee2ba75..c52785294 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -1068,10 +1068,8 @@ export function useWorkflowExecution() { logs: accumulatedBlockLogs, } - // Store execution snapshot for run-from-block if (data.success && activeWorkflowId) { if (stopAfterBlockId) { - // Partial run (run-until-block): merge with existing snapshot const existingSnapshot = getLastExecutionSnapshot(activeWorkflowId) const mergedBlockStates = { ...(existingSnapshot?.blockStates || {}), @@ -1096,7 +1094,6 @@ export function useWorkflowExecution() { totalExecutedBlocks: mergedExecutedBlocks.size, }) } else { - // Full run: replace snapshot entirely const snapshot: SerializableExecutionState = { blockStates: Object.fromEntries(accumulatedBlockStates), executedBlocks: Array.from(executedBlockIds), @@ -1443,7 +1440,6 @@ export function useWorkflowExecution() { return } - // Check if all upstream dependencies have cached outputs const workflowEdges = useWorkflowStore.getState().edges const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId) const dependenciesSatisfied = @@ -1504,7 +1500,6 @@ export function useWorkflowExecution() { executionTime: data.durationMs, }) - // Skip adding loop/parallel containers to console and logs const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel' if (isContainerBlock) return @@ -1584,7 +1579,6 @@ export function useWorkflowExecution() { onExecutionCompleted: (data) => { if (data.success) { - // Merge new states with snapshot states for updated snapshot const mergedBlockStates: Record = { ...snapshot.blockStates } for (const [bId, state] of accumulatedBlockStates) { mergedBlockStates[bId] = state diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index b519fceef..0c6b6a1e5 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -260,7 +260,6 @@ export class ExecutionEngine { } private initializeQueue(triggerBlockId?: string): void { - // Run-from-block mode: start directly from specified block if (this.context.runFromBlockContext) { const { startBlockId } = this.context.runFromBlockContext logger.info('Initializing queue for run-from-block mode', { @@ -397,7 +396,6 @@ export class ExecutionEngine { this.finalOutput = output } - // Check if we should stop after this block (run-until-block feature) if (this.context.stopAfterBlockId === nodeId) { logger.info('Stopping execution after target block', { nodeId }) this.stoppedEarlyFlag = true diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index d20b5a22f..ec5fde24f 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -100,39 +100,22 @@ export class DAGExecutor { } /** - * Execute workflow starting from a specific block, using cached outputs - * for all upstream/unaffected blocks from the source snapshot. - * - * This implements Jupyter notebook-style execution where: - * - The start block and all downstream blocks are re-executed - * - Upstream blocks retain their cached outputs from the source snapshot - * - The result is a merged execution state - * - * @param workflowId - The workflow ID - * @param startBlockId - The block to start execution from - * @param sourceSnapshot - The execution state from a previous run - * @returns Merged execution result with cached + fresh outputs + * Execute from a specific block using cached outputs for upstream blocks. */ async executeFromBlock( workflowId: string, startBlockId: string, sourceSnapshot: SerializableExecutionState ): Promise { - // Build full DAG (no trigger constraint - we need all blocks for validation) const dag = this.dagBuilder.build(this.workflow) - // Validate the start block const executedBlocks = new Set(sourceSnapshot.executedBlocks) const validation = validateRunFromBlock(startBlockId, dag, executedBlocks) if (!validation.valid) { throw new Error(validation.error) } - // Compute dirty set (blocks that will be re-executed) const dirtySet = computeDirtySet(dag, startBlockId) - - // Resolve container IDs to sentinel IDs for execution - // The engine needs to start from the sentinel node, not the container ID const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId logger.info('Executing from block', { @@ -144,9 +127,7 @@ export class DAGExecutor { dirtyBlocks: Array.from(dirtySet), }) - // For convergent blocks in the dirty set, remove incoming edges from non-dirty sources. - // This ensures that a dirty block waiting on multiple inputs doesn't wait for non-dirty - // upstream blocks (whose outputs are already cached). + // Remove incoming edges from non-dirty sources so convergent blocks don't wait for cached upstream for (const nodeId of dirtySet) { const node = dag.nodes.get(nodeId) if (!node) continue @@ -167,14 +148,12 @@ export class DAGExecutor { } } - // Create context with snapshot state + runFromBlockContext const runFromBlockContext = { startBlockId: effectiveStartBlockId, dirtySet } const { context, state } = this.createExecutionContext(workflowId, undefined, { snapshotState: sourceSnapshot, runFromBlockContext, }) - // Setup orchestrators and engine (same as execute()) const resolver = new VariableResolver(this.workflow, this.workflowVariables, state) const loopOrchestrator = new LoopOrchestrator(dag, state, resolver) loopOrchestrator.setContextExtensions(this.contextExtensions) @@ -194,7 +173,6 @@ export class DAGExecutor { ) const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) - // Run and return result return await engine.run() } @@ -214,7 +192,6 @@ export class DAGExecutor { ? new Set(snapshotState.executedBlocks) : new Set() - // In run-from-block mode, clear the executed status for dirty blocks so they can be re-executed if (overrides?.runFromBlockContext) { const { dirtySet } = overrides.runFromBlockContext executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id))) diff --git a/apps/sim/executor/utils/run-from-block.ts b/apps/sim/executor/utils/run-from-block.ts index a12be5ab2..260ffe796 100644 --- a/apps/sim/executor/utils/run-from-block.ts +++ b/apps/sim/executor/utils/run-from-block.ts @@ -63,8 +63,6 @@ export interface RunFromBlockContext { */ export function computeDirtySet(dag: DAG, startBlockId: string): Set { const dirty = new Set([startBlockId]) - - // For loop/parallel containers, resolve to sentinel-start for BFS traversal const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag) const traversalStartId = sentinelStartId ?? startBlockId @@ -118,8 +116,6 @@ export function validateRunFromBlock( executedBlocks: Set ): RunFromBlockValidation { const node = dag.nodes.get(blockId) - - // Check if this is a loop or parallel container (not in dag.nodes but in configs) const isLoopContainer = dag.loopConfigs.has(blockId) const isParallelContainer = dag.parallelConfigs.has(blockId) const isContainer = isLoopContainer || isParallelContainer @@ -128,7 +124,6 @@ export function validateRunFromBlock( return { valid: false, error: `Block not found in workflow: ${blockId}` } } - // For containers, verify the sentinel-start exists if (isContainer) { const sentinelStartId = resolveContainerToSentinelStart(blockId, dag) if (!sentinelStartId || !dag.nodes.has(sentinelStartId)) { @@ -139,7 +134,6 @@ export function validateRunFromBlock( } } - // For regular nodes, check if inside loop/parallel if (node) { if (node.metadata.isLoopNode) { return { @@ -159,8 +153,6 @@ export function validateRunFromBlock( return { valid: false, error: 'Cannot run from sentinel node' } } - // Check if all upstream dependencies have been executed (have cached outputs) - // If no incoming edges (trigger/start block), dependencies are satisfied if (node.incomingEdges.size > 0) { for (const sourceId of node.incomingEdges.keys()) { if (!executedBlocks.has(sourceId)) {