From be95a7dbd896a78a005b17e5a9e9ecc289f55fe8 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 27 Jan 2026 10:33:31 -0800 Subject: [PATCH] Fix --- .../[id]/execute-from-block/route.ts | 54 ++++++++- .../components/trace-spans/trace-spans.tsx | 9 +- .../components/action-bar/action-bar.tsx | 17 ++- .../components/block-menu/block-menu.tsx | 32 +++++ .../workflow-block/workflow-block.tsx | 4 +- .../[workspaceId]/w/[workflowId]/workflow.tsx | 56 +++++++-- apps/sim/executor/orchestrators/loop.ts | 13 ++- apps/sim/executor/orchestrators/parallel.ts | 16 ++- .../sim/executor/utils/run-from-block.test.ts | 109 ++++++++++++++++++ apps/sim/executor/utils/run-from-block.ts | 93 ++++++++++++--- apps/sim/stores/logs/filters/types.ts | 2 + 11 files changed, 365 insertions(+), 40 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts index 14cc81248..5a2bb9f34 100644 --- a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts @@ -8,7 +8,9 @@ import { checkHybridAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' -import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { clearExecutionCancellation, markExecutionCancelled } from '@/lib/execution/cancellation' +import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events' import { DAGExecutor } from '@/executor/execution/executor' @@ -93,7 +95,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: // Load workflow record to get workspaceId const [workflowRecord] = await db - .select({ workspaceId: workflowTable.workspaceId }) + .select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId }) .from(workflowTable) .where(eq(workflowTable.id, workflowId)) .limit(1) @@ -103,6 +105,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } const workspaceId = workflowRecord.workspaceId + const workflowUserId = workflowRecord.userId + + // Initialize logging session for cost tracking + const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId) // Load workflow state const workflowData = await loadWorkflowFromNormalizedTables(workflowId) @@ -131,6 +137,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: true ) + // Start logging session + await loggingSession.safeStart({ + userId, + workspaceId, + variables: {}, + }) + const encoder = new TextEncoder() const abortController = new AbortController() let isStreamClosed = false @@ -191,6 +204,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: callbackData: { input?: unknown; output: NormalizedBlockOutput; executionTime: number }, iterationContext?: IterationContext ) => { + // Log to session for cost tracking + await loggingSession.onBlockComplete(blockId, blockName, blockType, callbackData) + const hasError = (callbackData.output as any)?.error if (hasError) { @@ -299,7 +315,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: sourceSnapshot as SerializableExecutionState ) + // Build trace spans from fresh execution logs only + // Trace spans show what actually executed in this run + const { traceSpans, totalDuration } = buildTraceSpans(result) + if (result.status === 'cancelled') { + await loggingSession.safeCompleteWithCancellation({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + }) + await clearExecutionCancellation(executionId) + sendEvent({ type: 'execution:cancelled', timestamp: new Date().toISOString(), @@ -312,6 +339,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: return } + // Complete logging session + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + finalOutput: result.output || {}, + traceSpans: traceSpans || [], + workflowInput: {}, + }) + await clearExecutionCancellation(executionId) + sendEvent({ type: 'execution:completed', timestamp: new Date().toISOString(), @@ -330,6 +367,19 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] } + + // Complete logging session with error + await loggingSession.safeCompleteWithError({ + endedAt: new Date().toISOString(), + totalDurationMs: executionResult?.metadata?.duration || 0, + error: { + message: errorMessage, + stackTrace: error instanceof Error ? error.stack : undefined, + }, + traceSpans, + }) + await clearExecutionCancellation(executionId) sendEvent({ type: 'execution:error', diff --git a/apps/sim/app/workspace/[workspaceId]/logs/components/log-details/components/trace-spans/trace-spans.tsx b/apps/sim/app/workspace/[workspaceId]/logs/components/log-details/components/trace-spans/trace-spans.tsx index dab65614c..73998bb2f 100644 --- a/apps/sim/app/workspace/[workspaceId]/logs/components/log-details/components/trace-spans/trace-spans.tsx +++ b/apps/sim/app/workspace/[workspaceId]/logs/components/log-details/components/trace-spans/trace-spans.tsx @@ -528,6 +528,7 @@ const TraceSpanNode = memo(function TraceSpanNode({ const isDirectError = span.status === 'error' const hasNestedError = hasErrorInTree(span) const showErrorStyle = isDirectError || hasNestedError + const isCached = span.cached === true const { icon: BlockIcon, bgColor } = getBlockIconAndColor(span.type, span.name) @@ -586,7 +587,7 @@ const TraceSpanNode = memo(function TraceSpanNode({ isIterationType(lowerType) || lowerType === 'workflow' || lowerType === 'workflow_input' return ( -
+
{/* Node Header Row */}
{!isIterationType(span.type) && (
{BlockIcon && } @@ -623,6 +627,7 @@ const TraceSpanNode = memo(function TraceSpanNode({ style={{ color: showErrorStyle ? 'var(--text-error)' : 'var(--text-secondary)' }} > {span.name} + {isCached && (cached)} {isToggleable && ( )} - {canRunFromBlock && ( + {!isNoteBlock && ( - {isExecuting ? 'Execution in progress' : getTooltipMessage('Run from this block')} + {(() => { + if (disabled) return getTooltipMessage('Run from this block') + if (isExecuting) return 'Execution in progress' + if (!hasExecutionSnapshot) return 'Run workflow first' + if (!wasExecuted) return 'Block not executed in last run' + if (isInsideSubflow) return 'Cannot run from inside subflow' + return 'Run from this block' + })()} )} diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/block-menu/block-menu.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/block-menu/block-menu.tsx index c3a4d2ea8..28edd6784 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/block-menu/block-menu.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/block-menu/block-menu.tsx @@ -40,9 +40,15 @@ export interface BlockMenuProps { onRemoveFromSubflow: () => void onOpenEditor: () => void onRename: () => void + onRunFromBlock?: () => void hasClipboard?: boolean showRemoveFromSubflow?: boolean + /** Whether run from block is available (has snapshot, was executed, not inside subflow) */ + canRunFromBlock?: boolean + /** Reason why run from block is disabled (for tooltip) */ + runFromBlockDisabledReason?: string disableEdit?: boolean + isExecuting?: boolean } /** @@ -65,9 +71,13 @@ export function BlockMenu({ onRemoveFromSubflow, onOpenEditor, onRename, + onRunFromBlock, hasClipboard = false, showRemoveFromSubflow = false, + canRunFromBlock = false, + runFromBlockDisabledReason, disableEdit = false, + isExecuting = false, }: BlockMenuProps) { const isSingleBlock = selectedBlocks.length === 1 @@ -203,6 +213,28 @@ export function BlockMenu({ )} + {/* Run from block - only for single non-note block selection */} + {isSingleBlock && !allNoteBlocks && ( + <> + + { + if (canRunFromBlock && !isExecuting) { + onRunFromBlock?.() + onClose() + } + }} + > + {isExecuting + ? 'Execution in progress...' + : !canRunFromBlock && runFromBlockDisabledReason + ? runFromBlockDisabledReason + : 'Run from this block'} + + + )} + {/* Destructive action */} {isPending && (
diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx index 030781c4e..491995b0b 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx @@ -758,13 +758,16 @@ const WorkflowContent = React.memo(() => { [collaborativeBatchAddBlocks, setSelectedEdges, setPendingSelection] ) - const { activeBlockIds, pendingBlocks, isDebugging } = useExecutionStore( - useShallow((state) => ({ - activeBlockIds: state.activeBlockIds, - pendingBlocks: state.pendingBlocks, - isDebugging: state.isDebugging, - })) - ) + const { activeBlockIds, pendingBlocks, isDebugging, isExecuting, getLastExecutionSnapshot } = + useExecutionStore( + useShallow((state) => ({ + activeBlockIds: state.activeBlockIds, + pendingBlocks: state.pendingBlocks, + isDebugging: state.isDebugging, + isExecuting: state.isExecuting, + getLastExecutionSnapshot: state.getLastExecutionSnapshot, + })) + ) const [dragStartParentId, setDragStartParentId] = useState(null) @@ -988,6 +991,16 @@ const WorkflowContent = React.memo(() => { } }, [contextMenuBlocks]) + const handleContextRunFromBlock = useCallback(() => { + if (contextMenuBlocks.length !== 1) return + const blockId = contextMenuBlocks[0].id + window.dispatchEvent( + new CustomEvent('run-from-block', { + detail: { blockId, workflowId: workflowIdParam }, + }) + ) + }, [contextMenuBlocks, workflowIdParam]) + const handleContextAddBlock = useCallback(() => { useSearchModalStore.getState().open() }, []) @@ -3308,11 +3321,40 @@ const WorkflowContent = React.memo(() => { onRemoveFromSubflow={handleContextRemoveFromSubflow} onOpenEditor={handleContextOpenEditor} onRename={handleContextRename} + onRunFromBlock={handleContextRunFromBlock} hasClipboard={hasClipboard()} showRemoveFromSubflow={contextMenuBlocks.some( (b) => b.parentId && (b.parentType === 'loop' || b.parentType === 'parallel') )} + canRunFromBlock={ + contextMenuBlocks.length === 1 && + (() => { + const block = contextMenuBlocks[0] + const snapshot = getLastExecutionSnapshot(workflowIdParam) + const wasExecuted = snapshot?.executedBlocks.includes(block.id) ?? false + const isNoteBlock = block.type === 'note' + const isInsideSubflow = + block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel') + return !!snapshot && wasExecuted && !isNoteBlock && !isInsideSubflow && !isExecuting + })() + } + runFromBlockDisabledReason={ + contextMenuBlocks.length === 1 + ? (() => { + const block = contextMenuBlocks[0] + const snapshot = getLastExecutionSnapshot(workflowIdParam) + const wasExecuted = snapshot?.executedBlocks.includes(block.id) ?? false + const isInsideSubflow = + block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel') + if (!snapshot) return 'Run workflow first' + if (!wasExecuted) return 'Block not executed in last run' + if (isInsideSubflow) return 'Cannot run from inside subflow' + return undefined + })() + : undefined + } disableEdit={!effectivePermissions.canEdit} + isExecuting={isExecuting} /> { expect(result.valid).toBe(true) }) + + it('accepts loop container when executed', () => { + // Loop container with sentinel nodes + const loopId = 'loop-container-1' + const sentinelStartId = `loop-${loopId}-sentinel-start` + const sentinelEndId = `loop-${loopId}-sentinel-end` + const dag = createDAG([ + createNode('A', [{ target: sentinelStartId }]), + createNode(sentinelStartId, [{ target: 'B' }], { isSentinel: true, sentinelType: 'start', loopId }), + createNode('B', [{ target: sentinelEndId }], { isLoopNode: true, loopId }), + createNode(sentinelEndId, [{ target: 'C' }], { isSentinel: true, sentinelType: 'end', loopId }), + createNode('C'), + ]) + dag.loopConfigs.set(loopId, { id: loopId, nodes: ['B'], iterations: 3, loopType: 'for' } as any) + const executedBlocks = new Set(['A', loopId, sentinelStartId, 'B', sentinelEndId, 'C']) + + const result = validateRunFromBlock(loopId, dag, executedBlocks) + + expect(result.valid).toBe(true) + }) + + it('accepts parallel container when executed', () => { + // Parallel container with sentinel nodes + const parallelId = 'parallel-container-1' + const sentinelStartId = `parallel-${parallelId}-sentinel-start` + const sentinelEndId = `parallel-${parallelId}-sentinel-end` + const dag = createDAG([ + createNode('A', [{ target: sentinelStartId }]), + createNode(sentinelStartId, [{ target: 'B₍0₎' }], { isSentinel: true, sentinelType: 'start', parallelId }), + createNode('B₍0₎', [{ target: sentinelEndId }], { isParallelBranch: true, parallelId }), + createNode(sentinelEndId, [{ target: 'C' }], { isSentinel: true, sentinelType: 'end', parallelId }), + createNode('C'), + ]) + dag.parallelConfigs.set(parallelId, { id: parallelId, nodes: ['B'], count: 2 } as any) + const executedBlocks = new Set(['A', parallelId, sentinelStartId, 'B₍0₎', sentinelEndId, 'C']) + + const result = validateRunFromBlock(parallelId, dag, executedBlocks) + + expect(result.valid).toBe(true) + }) + + it('rejects loop container that was not executed', () => { + const loopId = 'loop-container-1' + const sentinelStartId = `loop-${loopId}-sentinel-start` + const dag = createDAG([ + createNode(sentinelStartId, [], { isSentinel: true, sentinelType: 'start', loopId }), + ]) + dag.loopConfigs.set(loopId, { id: loopId, nodes: [], iterations: 3, loopType: 'for' } as any) + const executedBlocks = new Set() // Loop was not executed + + const result = validateRunFromBlock(loopId, dag, executedBlocks) + + expect(result.valid).toBe(false) + expect(result.error).toContain('was not executed') + }) +}) + +describe('computeDirtySet with containers', () => { + it('includes loop container and all downstream when running from loop', () => { + // A → loop-sentinel-start → B (inside loop) → loop-sentinel-end → C + const loopId = 'loop-1' + const sentinelStartId = `loop-${loopId}-sentinel-start` + const sentinelEndId = `loop-${loopId}-sentinel-end` + const dag = createDAG([ + createNode('A', [{ target: sentinelStartId }]), + createNode(sentinelStartId, [{ target: 'B' }], { isSentinel: true, sentinelType: 'start', loopId }), + createNode('B', [{ target: sentinelEndId }], { isLoopNode: true, loopId }), + createNode(sentinelEndId, [{ target: 'C' }], { isSentinel: true, sentinelType: 'end', loopId }), + createNode('C'), + ]) + dag.loopConfigs.set(loopId, { id: loopId, nodes: ['B'], iterations: 3, loopType: 'for' } as any) + + const dirtySet = computeDirtySet(dag, loopId) + + // Should include loop container, sentinel-start, B, sentinel-end, C + expect(dirtySet.has(loopId)).toBe(true) + expect(dirtySet.has(sentinelStartId)).toBe(true) + expect(dirtySet.has('B')).toBe(true) + expect(dirtySet.has(sentinelEndId)).toBe(true) + expect(dirtySet.has('C')).toBe(true) + // Should NOT include A (upstream) + expect(dirtySet.has('A')).toBe(false) + }) + + it('includes parallel container and all downstream when running from parallel', () => { + // A → parallel-sentinel-start → B₍0₎ → parallel-sentinel-end → C + const parallelId = 'parallel-1' + const sentinelStartId = `parallel-${parallelId}-sentinel-start` + const sentinelEndId = `parallel-${parallelId}-sentinel-end` + const dag = createDAG([ + createNode('A', [{ target: sentinelStartId }]), + createNode(sentinelStartId, [{ target: 'B₍0₎' }], { isSentinel: true, sentinelType: 'start', parallelId }), + createNode('B₍0₎', [{ target: sentinelEndId }], { isParallelBranch: true, parallelId }), + createNode(sentinelEndId, [{ target: 'C' }], { isSentinel: true, sentinelType: 'end', parallelId }), + createNode('C'), + ]) + dag.parallelConfigs.set(parallelId, { id: parallelId, nodes: ['B'], count: 2 } as any) + + const dirtySet = computeDirtySet(dag, parallelId) + + // Should include parallel container, sentinel-start, B₍0₎, sentinel-end, C + expect(dirtySet.has(parallelId)).toBe(true) + expect(dirtySet.has(sentinelStartId)).toBe(true) + expect(dirtySet.has('B₍0₎')).toBe(true) + expect(dirtySet.has(sentinelEndId)).toBe(true) + expect(dirtySet.has('C')).toBe(true) + // Should NOT include A (upstream) + expect(dirtySet.has('A')).toBe(false) + }) }) diff --git a/apps/sim/executor/utils/run-from-block.ts b/apps/sim/executor/utils/run-from-block.ts index 57e1e81e8..94dccb056 100644 --- a/apps/sim/executor/utils/run-from-block.ts +++ b/apps/sim/executor/utils/run-from-block.ts @@ -1,8 +1,37 @@ import { createLogger } from '@sim/logger' +import { LOOP, PARALLEL } from '@/executor/constants' import type { DAG } from '@/executor/dag/builder' const logger = createLogger('run-from-block') +/** + * Builds the sentinel-start node ID for a loop. + */ +function buildLoopSentinelStartId(loopId: string): string { + return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.START_SUFFIX}` +} + +/** + * Builds the sentinel-start node ID for a parallel. + */ +function buildParallelSentinelStartId(parallelId: string): string { + return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.START_SUFFIX}` +} + +/** + * Checks if a block ID is a loop or parallel container and returns the sentinel-start ID if so. + * Returns null if the block is not a container. + */ +function resolveContainerToSentinelStart(blockId: string, dag: DAG): string | null { + if (dag.loopConfigs.has(blockId)) { + return buildLoopSentinelStartId(blockId) + } + if (dag.parallelConfigs.has(blockId)) { + return buildParallelSentinelStartId(blockId) + } + return null +} + /** * Result of validating a block for run-from-block execution. */ @@ -25,13 +54,25 @@ export interface RunFromBlockContext { * Computes all blocks that need re-execution when running from a specific block. * Uses BFS to find all downstream blocks reachable via outgoing edges. * + * For loop/parallel containers, starts from the sentinel-start node and includes + * the container ID itself in the dirty set. + * * @param dag - The workflow DAG * @param startBlockId - The block to start execution from * @returns Set of block IDs that are "dirty" and need re-execution */ export function computeDirtySet(dag: DAG, startBlockId: string): Set { const dirty = new Set([startBlockId]) - const queue = [startBlockId] + + // For loop/parallel containers, resolve to sentinel-start for BFS traversal + const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag) + const traversalStartId = sentinelStartId ?? startBlockId + + if (sentinelStartId) { + dirty.add(sentinelStartId) + } + + const queue = [traversalStartId] while (queue.length > 0) { const nodeId = queue.shift()! @@ -48,6 +89,7 @@ export function computeDirtySet(dag: DAG, startBlockId: string): Set { logger.debug('Computed dirty set', { startBlockId, + traversalStartId, dirtySetSize: dirty.size, dirtyBlocks: Array.from(dirty), }) @@ -59,9 +101,9 @@ export function computeDirtySet(dag: DAG, startBlockId: string): Set { * Validates that a block can be used as a run-from-block starting point. * * Validation rules: - * - Block must exist in the DAG - * - Block cannot be inside a loop - * - Block cannot be inside a parallel + * - Block must exist in the DAG (or be a loop/parallel container) + * - Block cannot be inside a loop (but loop containers are allowed) + * - Block cannot be inside a parallel (but parallel containers are allowed) * - Block cannot be a sentinel node * - Block must have been executed in the source run * @@ -77,26 +119,45 @@ export function validateRunFromBlock( ): RunFromBlockValidation { const node = dag.nodes.get(blockId) - if (!node) { + // Check if this is a loop or parallel container (not in dag.nodes but in configs) + const isLoopContainer = dag.loopConfigs.has(blockId) + const isParallelContainer = dag.parallelConfigs.has(blockId) + const isContainer = isLoopContainer || isParallelContainer + + if (!node && !isContainer) { return { valid: false, error: `Block not found in workflow: ${blockId}` } } - if (node.metadata.isLoopNode) { - return { - valid: false, - error: `Cannot run from block inside loop: ${node.metadata.loopId}`, + // For containers, verify the sentinel-start exists + if (isContainer) { + const sentinelStartId = resolveContainerToSentinelStart(blockId, dag) + if (!sentinelStartId || !dag.nodes.has(sentinelStartId)) { + return { + valid: false, + error: `Container sentinel not found for: ${blockId}`, + } } } - if (node.metadata.isParallelBranch) { - return { - valid: false, - error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`, + // For regular nodes, check if inside loop/parallel + if (node) { + if (node.metadata.isLoopNode) { + return { + valid: false, + error: `Cannot run from block inside loop: ${node.metadata.loopId}`, + } } - } - if (node.metadata.isSentinel) { - return { valid: false, error: 'Cannot run from sentinel node' } + if (node.metadata.isParallelBranch) { + return { + valid: false, + error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`, + } + } + + if (node.metadata.isSentinel) { + return { valid: false, error: 'Cannot run from sentinel node' } + } } if (!executedBlocks.has(blockId)) { diff --git a/apps/sim/stores/logs/filters/types.ts b/apps/sim/stores/logs/filters/types.ts index f533b6996..de6ec7b89 100644 --- a/apps/sim/stores/logs/filters/types.ts +++ b/apps/sim/stores/logs/filters/types.ts @@ -98,6 +98,8 @@ export interface TraceSpan { total?: number } providerTiming?: ProviderTiming + /** Whether this span represents a cached (not re-executed) block in run-from-block mode */ + cached?: boolean } export interface WorkflowLog {