From c68cda63ae99f389022c14358353014ded56e241 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 27 Jan 2026 17:53:37 -0800 Subject: [PATCH] Cleanup --- .../hooks/use-workflow-execution.ts | 65 +---------- apps/sim/executor/execution/executor.ts | 12 --- .../sim/executor/utils/run-from-block.test.ts | 101 +++++++++++++++++- apps/sim/executor/utils/run-from-block.ts | 24 ----- .../lib/workflows/executor/execution-core.ts | 13 +-- 5 files changed, 105 insertions(+), 110 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 5e4125db7..471cecd9b 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -1482,31 +1482,9 @@ export function useWorkflowExecution() { const candidates = resolveStartCandidates(mergedStates, { execution: 'manual' }) const candidate = candidates.find((c) => c.blockId === blockId) - logger.info('Run-from-block trigger analysis', { - blockId, - blockType: workflowBlocks[blockId]?.type, - blockTriggerMode: workflowBlocks[blockId]?.triggerMode, - candidateFound: !!candidate, - candidatePath: candidate?.path, - allCandidates: candidates.map((c) => ({ - blockId: c.blockId, - type: c.block.type, - path: c.path, - })), - }) - if (candidate) { - const needsMockPayload = triggerNeedsMockPayload(candidate) - logger.info('Trigger mock payload check', { - needsMockPayload, - path: candidate.path, - isExternalTrigger: candidate.path === StartBlockPath.EXTERNAL_TRIGGER, - blockType: candidate.block.type, - }) - - if (needsMockPayload) { + if (triggerNeedsMockPayload(candidate)) { workflowInput = extractTriggerMockPayload(candidate) - logger.info('Extracted mock payload for trigger block', { blockId, workflowInput }) } else if ( candidate.path === StartBlockPath.SPLIT_API || candidate.path === StartBlockPath.SPLIT_INPUT || @@ -1522,46 +1500,27 @@ export function useWorkflowExecution() { }) if (Object.keys(testInput).length > 0) { workflowInput = testInput - logger.info('Extracted test input for trigger block', { blockId, workflowInput }) } } } } else { - // Fallback for trigger blocks not found in candidates - // This can happen when the block is a trigger by position (no incoming edges) - // but wasn't classified as a start candidate (e.g., triggerMode not set) + // Fallback: block is trigger by position but not classified as start candidate const block = mergedStates[blockId] if (block) { const blockConfig = getBlock(block.type) const hasTriggers = blockConfig?.triggers?.available?.length if (hasTriggers || block.triggerMode) { - // Block has trigger capability - extract mock payload - const syntheticCandidate = { + workflowInput = extractTriggerMockPayload({ blockId, block, path: StartBlockPath.EXTERNAL_TRIGGER, - } - workflowInput = extractTriggerMockPayload(syntheticCandidate) - logger.info('Extracted mock payload for trigger block (fallback)', { - blockId, - blockType: block.type, - hasTriggers, - triggerMode: block.triggerMode, - workflowInput, }) } } } } - logger.info('Starting run-from-block execution', { - workflowId, - startBlockId: blockId, - isTriggerBlock, - hasInput: !!workflowInput, - }) - setIsExecuting(true) const executionId = uuidv4() const accumulatedBlockLogs: BlockLog[] = [] @@ -1576,10 +1535,6 @@ export function useWorkflowExecution() { sourceSnapshot: effectiveSnapshot, input: workflowInput, callbacks: { - onExecutionStarted: (data) => { - logger.info('Run-from-block execution started:', data) - }, - onBlockStarted: (data) => { activeBlocksSet.add(data.blockId) setActiveBlocks(new Set(activeBlocksSet)) @@ -1702,17 +1657,10 @@ export function useWorkflowExecution() { activeExecutionPath: Array.from(mergedExecutedBlocks), } setLastExecutionSnapshot(workflowId, updatedSnapshot) - logger.info('Updated execution snapshot after run-from-block', { - workflowId, - newBlocksExecuted: executedBlockIds.size, - }) } }, onExecutionError: (data) => { - logger.error('Run-from-block execution error:', data.error) - - // If block not found, the snapshot is stale - clear it if (data.error?.includes('Block not found in workflow')) { clearLastExecutionSnapshot(workflowId) addNotification({ @@ -1720,7 +1668,6 @@ export function useWorkflowExecution() { message: 'Workflow was modified. Run the workflow again to refresh.', workflowId, }) - logger.info('Cleared stale execution snapshot', { workflowId }) } else { addNotification({ level: 'error', @@ -1729,15 +1676,11 @@ export function useWorkflowExecution() { }) } }, - - onExecutionCancelled: () => { - logger.info('Run-from-block execution cancelled') - }, }, }) } catch (error) { if ((error as Error).name !== 'AbortError') { - logger.error('Run-from-block execution failed:', error) + logger.error('Run-from-block failed:', error) } } finally { setIsExecuting(false) diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index db6d32cfb..6048167cf 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -145,9 +145,6 @@ export class DAGExecutor { effectiveStartBlockId, dirtySetSize: dirtySet.size, upstreamSetSize: upstreamSet.size, - filteredBlockStatesCount: Object.keys(filteredBlockStates).length, - totalBlocks: dag.nodes.size, - dirtyBlocks: Array.from(dirtySet), }) // Remove incoming edges from non-dirty sources so convergent blocks don't wait for cached upstream @@ -164,10 +161,6 @@ export class DAGExecutor { for (const sourceId of nonDirtyIncoming) { node.incomingEdges.delete(sourceId) - logger.debug('Removed non-dirty incoming edge for run-from-block', { - nodeId, - sourceId, - }) } } @@ -327,11 +320,6 @@ export class DAGExecutor { if (isRegularBlock) { this.initializeStarterBlock(context, state, startBlockId) - logger.info('Run-from-block mode: initialized start block', { startBlockId }) - } else { - logger.info('Run-from-block mode: skipping starter block init for container/sentinel', { - startBlockId, - }) } } else { this.initializeStarterBlock(context, state, triggerBlockId) diff --git a/apps/sim/executor/utils/run-from-block.test.ts b/apps/sim/executor/utils/run-from-block.test.ts index 284379095..07e39c58d 100644 --- a/apps/sim/executor/utils/run-from-block.test.ts +++ b/apps/sim/executor/utils/run-from-block.test.ts @@ -1,9 +1,16 @@ import { describe, expect, it } from 'vitest' import type { DAG, DAGNode } from '@/executor/dag/builder' import type { DAGEdge, NodeMetadata } from '@/executor/dag/types' -import { computeDirtySet, validateRunFromBlock } from '@/executor/utils/run-from-block' +import { computeExecutionSets, validateRunFromBlock } from '@/executor/utils/run-from-block' import type { SerializedLoop, SerializedParallel } from '@/serializer/types' +/** + * Helper to extract dirty set from computeExecutionSets + */ +function computeDirtySet(dag: DAG, startBlockId: string): Set { + return computeExecutionSets(dag, startBlockId).dirtySet +} + /** * Helper to create a DAG node for testing */ @@ -491,3 +498,95 @@ describe('computeDirtySet with containers', () => { expect(dirtySet.has('A')).toBe(false) }) }) + +describe('computeExecutionSets upstream set', () => { + it('includes all upstream blocks in linear workflow', () => { + // A → B → C → D + const dag = createDAG([ + createNode('A', [{ target: 'B' }]), + createNode('B', [{ target: 'C' }]), + createNode('C', [{ target: 'D' }]), + createNode('D'), + ]) + + const { upstreamSet } = computeExecutionSets(dag, 'C') + + expect(upstreamSet.has('A')).toBe(true) + expect(upstreamSet.has('B')).toBe(true) + expect(upstreamSet.has('C')).toBe(false) // start block not in upstream + expect(upstreamSet.has('D')).toBe(false) // downstream + }) + + it('includes all branches in convergent upstream', () => { + // A → C + // B → C → D + const dag = createDAG([ + createNode('A', [{ target: 'C' }]), + createNode('B', [{ target: 'C' }]), + createNode('C', [{ target: 'D' }]), + createNode('D'), + ]) + + const { upstreamSet } = computeExecutionSets(dag, 'C') + + expect(upstreamSet.has('A')).toBe(true) + expect(upstreamSet.has('B')).toBe(true) + expect(upstreamSet.has('C')).toBe(false) + expect(upstreamSet.has('D')).toBe(false) + }) + + it('excludes parallel branches not in upstream path', () => { + // A → B → D + // A → C → D + // Running from B: upstream is A only, not C + const dag = createDAG([ + createNode('A', [{ target: 'B' }, { target: 'C' }]), + createNode('B', [{ target: 'D' }]), + createNode('C', [{ target: 'D' }]), + createNode('D'), + ]) + + const { upstreamSet, dirtySet } = computeExecutionSets(dag, 'B') + + // Upstream should only contain A + expect(upstreamSet.has('A')).toBe(true) + expect(upstreamSet.has('C')).toBe(false) // parallel branch, not upstream of B + // Dirty should contain B and D + expect(dirtySet.has('B')).toBe(true) + expect(dirtySet.has('D')).toBe(true) + expect(dirtySet.has('C')).toBe(false) + }) + + it('handles diamond pattern upstream correctly', () => { + // B + // ↗ ↘ + // A D → E + // ↘ ↗ + // C + // Running from D: upstream should be A, B, C + const dag = createDAG([ + createNode('A', [{ target: 'B' }, { target: 'C' }]), + createNode('B', [{ target: 'D' }]), + createNode('C', [{ target: 'D' }]), + createNode('D', [{ target: 'E' }]), + createNode('E'), + ]) + + const { upstreamSet, dirtySet } = computeExecutionSets(dag, 'D') + + expect(upstreamSet.has('A')).toBe(true) + expect(upstreamSet.has('B')).toBe(true) + expect(upstreamSet.has('C')).toBe(true) + expect(upstreamSet.has('D')).toBe(false) + expect(dirtySet.has('D')).toBe(true) + expect(dirtySet.has('E')).toBe(true) + }) + + it('returns empty upstream set for root block', () => { + const dag = createDAG([createNode('A', [{ target: 'B' }]), createNode('B')]) + + const { upstreamSet } = computeExecutionSets(dag, 'A') + + expect(upstreamSet.size).toBe(0) + }) +}) diff --git a/apps/sim/executor/utils/run-from-block.ts b/apps/sim/executor/utils/run-from-block.ts index 8bb6df179..5813d52b5 100644 --- a/apps/sim/executor/utils/run-from-block.ts +++ b/apps/sim/executor/utils/run-from-block.ts @@ -1,9 +1,6 @@ -import { createLogger } from '@sim/logger' import { LOOP, PARALLEL } from '@/executor/constants' import type { DAG } from '@/executor/dag/builder' -const logger = createLogger('run-from-block') - /** * Builds the sentinel-start node ID for a loop. */ @@ -112,30 +109,9 @@ export function computeExecutionSets(dag: DAG, startBlockId: string): ExecutionS } } - logger.debug('Computed execution sets', { - startBlockId, - traversalStartId, - dirtySetSize: dirty.size, - upstreamSetSize: upstream.size, - }) - return { dirtySet: dirty, upstreamSet: upstream } } -/** - * @deprecated Use computeExecutionSets instead for combined computation - */ -export function computeDirtySet(dag: DAG, startBlockId: string): Set { - return computeExecutionSets(dag, startBlockId).dirtySet -} - -/** - * @deprecated Use computeExecutionSets instead for combined computation - */ -export function computeUpstreamSet(dag: DAG, blockId: string): Set { - return computeExecutionSets(dag, blockId).upstreamSet -} - /** * Validates that a block can be used as a run-from-block starting point. * diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index fb15fb0bd..557bb284e 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -27,10 +27,7 @@ import type { } from '@/executor/execution/types' import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types' import { hasExecutionResult } from '@/executor/utils/errors' -import { - buildParallelSentinelEndId, - buildSentinelEndId, -} from '@/executor/utils/subflow-utils' +import { buildParallelSentinelEndId, buildSentinelEndId } from '@/executor/utils/subflow-utils' import { Serializer } from '@/serializer' const logger = createLogger('ExecutionCore') @@ -264,16 +261,8 @@ export async function executeWorkflowCore( if (stopAfterBlockId) { if (serializedWorkflow.loops?.[stopAfterBlockId]) { resolvedStopAfterBlockId = buildSentinelEndId(stopAfterBlockId) - logger.info(`[${requestId}] Resolved loop container to sentinel-end`, { - original: stopAfterBlockId, - resolved: resolvedStopAfterBlockId, - }) } else if (serializedWorkflow.parallels?.[stopAfterBlockId]) { resolvedStopAfterBlockId = buildParallelSentinelEndId(stopAfterBlockId) - logger.info(`[${requestId}] Resolved parallel container to sentinel-end`, { - original: stopAfterBlockId, - resolved: resolvedStopAfterBlockId, - }) } }