diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 856a1a3c9..47f81ef12 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -53,6 +53,7 @@ const ExecuteWorkflowSchema = z.object({ parallels: z.record(z.any()).optional(), }) .optional(), + stopAfterBlockId: z.string().optional(), }) export const runtime = 'nodejs' @@ -222,6 +223,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: includeFileBase64, base64MaxBytes, workflowStateOverride, + stopAfterBlockId, } = validation.data // For API key and internal JWT auth, the entire body is the input (except for our control fields) @@ -237,6 +239,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: includeFileBase64, base64MaxBytes, workflowStateOverride, + stopAfterBlockId: _stopAfterBlockId, workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth ...rest } = body @@ -434,6 +437,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: loggingSession, includeFileBase64, base64MaxBytes, + stopAfterBlockId, }) const outputWithBase64 = includeFileBase64 @@ -722,6 +726,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: abortSignal: abortController.signal, includeFileBase64, base64MaxBytes, + stopAfterBlockId, }) if (result.status === 'paused') { diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx index 162c4f24f..33edef0ec 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx @@ -4,6 +4,7 @@ import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn' import { cn } from '@/lib/core/utils/cn' import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers' import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider' +import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks' import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils' import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow' import { useExecutionStore } from '@/stores/execution' @@ -50,6 +51,7 @@ export const ActionBar = memo( collaborativeBatchToggleBlockHandles, } = useCollaborativeWorkflow() const { setPendingSelection } = useWorkflowRegistry() + const { handleRunFromBlock } = useWorkflowExecution() const addNotification = useNotificationStore((s) => s.addNotification) @@ -101,6 +103,7 @@ export const ActionBar = memo( const { activeWorkflowId } = useWorkflowRegistry() const { isExecuting, getLastExecutionSnapshot } = useExecutionStore() const userPermissions = useUserPermissionsContext() + const edges = useWorkflowStore((state) => state.edges) const isStartBlock = isInputDefinitionTrigger(blockType) const isResponseBlock = blockType === 'response' @@ -109,29 +112,29 @@ export const ActionBar = memo( const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel') // Check if run-from-block is available - const hasExecutionSnapshot = activeWorkflowId - ? !!getLastExecutionSnapshot(activeWorkflowId) - : false - const wasExecuted = activeWorkflowId - ? getLastExecutionSnapshot(activeWorkflowId)?.executedBlocks.includes(blockId) ?? false - : false + // Block can run if all its upstream dependencies have cached outputs + const snapshot = activeWorkflowId ? getLastExecutionSnapshot(activeWorkflowId) : null + const hasExecutionSnapshot = !!snapshot + const dependenciesSatisfied = (() => { + if (!snapshot) return false + // Find all blocks that feed into this block + const incomingEdges = edges.filter((edge) => edge.target === blockId) + // If no incoming edges (trigger/start block), dependencies are satisfied + if (incomingEdges.length === 0) return true + // All source blocks must have been executed (have cached outputs) + return incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)) + })() const canRunFromBlock = hasExecutionSnapshot && - wasExecuted && + dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting - const handleRunFromBlock = useCallback(() => { + const handleRunFromBlockClick = useCallback(() => { if (!activeWorkflowId || !canRunFromBlock) return - - // Dispatch a custom event to trigger run-from-block execution - window.dispatchEvent( - new CustomEvent('run-from-block', { - detail: { blockId, workflowId: activeWorkflowId }, - }) - ) - }, [blockId, activeWorkflowId, canRunFromBlock]) + handleRunFromBlock(blockId, activeWorkflowId) + }, [blockId, activeWorkflowId, canRunFromBlock, handleRunFromBlock]) /** * Get appropriate tooltip message based on disabled state @@ -165,7 +168,7 @@ export const ActionBar = memo( onClick={(e) => { e.stopPropagation() if (canRunFromBlock && !disabled) { - handleRunFromBlock() + handleRunFromBlockClick() } }} className={ACTION_BUTTON_STYLES} @@ -179,7 +182,7 @@ export const ActionBar = memo( if (disabled) return getTooltipMessage('Run from this block') if (isExecuting) return 'Execution in progress' if (!hasExecutionSnapshot) return 'Run workflow first' - if (!wasExecuted) return 'Block not executed in last run' + if (!dependenciesSatisfied) return 'Run upstream blocks first' if (isInsideSubflow) return 'Cannot run from inside subflow' return 'Run from this block' })()} diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/block-menu/block-menu.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/block-menu/block-menu.tsx index 28edd6784..8e1290ab6 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/block-menu/block-menu.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/block-menu/block-menu.tsx @@ -41,6 +41,7 @@ export interface BlockMenuProps { onOpenEditor: () => void onRename: () => void onRunFromBlock?: () => void + onRunUntilBlock?: () => void hasClipboard?: boolean showRemoveFromSubflow?: boolean /** Whether run from block is available (has snapshot, was executed, not inside subflow) */ @@ -72,6 +73,7 @@ export function BlockMenu({ onOpenEditor, onRename, onRunFromBlock, + onRunUntilBlock, hasClipboard = false, showRemoveFromSubflow = false, canRunFromBlock = false, @@ -213,7 +215,7 @@ export function BlockMenu({ )} - {/* Run from block - only for single non-note block selection */} + {/* Run from/until block - only for single non-note block selection */} {isSingleBlock && !allNoteBlocks && ( <> @@ -232,6 +234,17 @@ export function BlockMenu({ ? runFromBlockDisabledReason : 'Run from this block'} + { + if (!isExecuting) { + onRunUntilBlock?.() + onClose() + } + }} + > + {isExecuting ? 'Execution in progress...' : 'Run until this block'} + )} diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 7c05cb040..f2907a19b 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -33,8 +33,6 @@ import { useWorkflowStore } from '@/stores/workflows/workflow/store' const logger = createLogger('useWorkflowExecution') -// Module-level guard to prevent duplicate run-from-block executions across hook instances -let runFromBlockGlobalLock = false // Debug state validation result interface DebugValidationResult { @@ -674,7 +672,8 @@ export function useWorkflowExecution() { onStream?: (se: StreamingExecution) => Promise, executionId?: string, onBlockComplete?: (blockId: string, output: any) => Promise, - overrideTriggerType?: 'chat' | 'manual' | 'api' + overrideTriggerType?: 'chat' | 'manual' | 'api', + stopAfterBlockId?: string ): Promise => { // Use diff workflow for execution when available, regardless of canvas view state const executionWorkflowState = null as { @@ -895,6 +894,7 @@ export function useWorkflowExecution() { triggerType: overrideTriggerType || 'manual', useDraftState: true, isClientSession: true, + stopAfterBlockId, workflowStateOverride: executionWorkflowState ? { blocks: executionWorkflowState.blocks, @@ -1080,19 +1080,47 @@ export function useWorkflowExecution() { // Store execution snapshot for run-from-block if (data.success && activeWorkflowId) { - const snapshot: SerializableExecutionState = { - blockStates: Object.fromEntries(accumulatedBlockStates), - executedBlocks: Array.from(executedBlockIds), - blockLogs: accumulatedBlockLogs, - decisions: { router: {}, condition: {} }, - completedLoops: [], - activeExecutionPath: Array.from(executedBlockIds), + if (stopAfterBlockId) { + // Partial run (run-until-block): merge with existing snapshot + const existingSnapshot = getLastExecutionSnapshot(activeWorkflowId) + const mergedBlockStates = { + ...(existingSnapshot?.blockStates || {}), + ...Object.fromEntries(accumulatedBlockStates), + } + const mergedExecutedBlocks = new Set([ + ...(existingSnapshot?.executedBlocks || []), + ...executedBlockIds, + ]) + const snapshot: SerializableExecutionState = { + blockStates: mergedBlockStates, + executedBlocks: Array.from(mergedExecutedBlocks), + blockLogs: [...(existingSnapshot?.blockLogs || []), ...accumulatedBlockLogs], + decisions: existingSnapshot?.decisions || { router: {}, condition: {} }, + completedLoops: existingSnapshot?.completedLoops || [], + activeExecutionPath: Array.from(mergedExecutedBlocks), + } + setLastExecutionSnapshot(activeWorkflowId, snapshot) + logger.info('Merged execution snapshot after run-until-block', { + workflowId: activeWorkflowId, + newBlocksExecuted: executedBlockIds.size, + totalExecutedBlocks: mergedExecutedBlocks.size, + }) + } else { + // Full run: replace snapshot entirely + const snapshot: SerializableExecutionState = { + blockStates: Object.fromEntries(accumulatedBlockStates), + executedBlocks: Array.from(executedBlockIds), + blockLogs: accumulatedBlockLogs, + decisions: { router: {}, condition: {} }, + completedLoops: [], + activeExecutionPath: Array.from(executedBlockIds), + } + setLastExecutionSnapshot(activeWorkflowId, snapshot) + logger.info('Stored execution snapshot for run-from-block', { + workflowId: activeWorkflowId, + executedBlocksCount: executedBlockIds.size, + }) } - setLastExecutionSnapshot(activeWorkflowId, snapshot) - logger.info('Stored execution snapshot for run-from-block', { - workflowId: activeWorkflowId, - executedBlocksCount: executedBlockIds.size, - }) } }, @@ -1419,26 +1447,21 @@ export function useWorkflowExecution() { */ const handleRunFromBlock = useCallback( async (blockId: string, workflowId: string) => { - // Prevent duplicate executions across multiple hook instances (panel.tsx and chat.tsx) - if (runFromBlockGlobalLock) { - logger.debug('Run-from-block already in progress (global lock), ignoring duplicate request', { - workflowId, - blockId, - }) - return - } - runFromBlockGlobalLock = true - const snapshot = getLastExecutionSnapshot(workflowId) if (!snapshot) { logger.error('No execution snapshot available for run-from-block', { workflowId, blockId }) - runFromBlockGlobalLock = false return } - if (!snapshot.executedBlocks.includes(blockId)) { - logger.error('Block was not executed in the source run', { workflowId, blockId }) - runFromBlockGlobalLock = false + // Check if all upstream dependencies have cached outputs + const workflowEdges = useWorkflowStore.getState().edges + const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId) + const dependenciesSatisfied = + incomingEdges.length === 0 || + incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)) + + if (!dependenciesSatisfied) { + logger.error('Upstream dependencies not satisfied for run-from-block', { workflowId, blockId }) return } @@ -1449,8 +1472,6 @@ export function useWorkflowExecution() { }) setIsExecuting(true) - - const workflowEdges = useWorkflowStore.getState().edges const executionId = uuidv4() const accumulatedBlockLogs: BlockLog[] = [] const accumulatedBlockStates = new Map() @@ -1612,7 +1633,6 @@ export function useWorkflowExecution() { } finally { setIsExecuting(false) setActiveBlocks(new Set()) - runFromBlockGlobalLock = false } }, [ @@ -1627,18 +1647,45 @@ export function useWorkflowExecution() { ] ) - // Listen for run-from-block events from the action bar - useEffect(() => { - const handleRunFromBlockEvent = (event: CustomEvent<{ blockId: string; workflowId: string }>) => { - const { blockId, workflowId } = event.detail - handleRunFromBlock(blockId, workflowId) - } + /** + * Handles running workflow until a specific block (stops after that block completes) + */ + const handleRunUntilBlock = useCallback( + async (blockId: string, workflowId: string) => { + if (!workflowId || workflowId !== activeWorkflowId) { + logger.error('Invalid workflow ID for run-until-block', { workflowId, activeWorkflowId }) + return + } - window.addEventListener('run-from-block', handleRunFromBlockEvent as EventListener) - return () => { - window.removeEventListener('run-from-block', handleRunFromBlockEvent as EventListener) - } - }, [handleRunFromBlock]) + logger.info('Starting run-until-block execution', { workflowId, stopAfterBlockId: blockId }) + + setExecutionResult(null) + setIsExecuting(true) + + const executionId = uuidv4() + try { + const result = await executeWorkflow( + undefined, + undefined, + executionId, + undefined, + 'manual', + blockId + ) + if (result && 'success' in result) { + setExecutionResult(result) + } + } catch (error) { + const errorResult = handleExecutionError(error, { executionId }) + return errorResult + } finally { + setIsExecuting(false) + setIsDebugging(false) + setActiveBlocks(new Set()) + } + }, + [activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks] + ) return { isExecuting, @@ -1651,5 +1698,6 @@ export function useWorkflowExecution() { handleCancelDebug, handleCancelExecution, handleRunFromBlock, + handleRunUntilBlock, } } diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx index 491995b0b..0d13f7d63 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx @@ -47,6 +47,7 @@ import { useCurrentWorkflow, useNodeUtilities, useShiftSelectionLock, + useWorkflowExecution, } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks' import { calculateContainerDimensions, @@ -325,6 +326,8 @@ const WorkflowContent = React.memo(() => { const showTrainingModal = useCopilotTrainingStore((state) => state.showModal) + const { handleRunFromBlock, handleRunUntilBlock } = useWorkflowExecution() + const snapToGridSize = useSnapToGridSize() const snapToGrid = snapToGridSize > 0 @@ -994,12 +997,14 @@ const WorkflowContent = React.memo(() => { const handleContextRunFromBlock = useCallback(() => { if (contextMenuBlocks.length !== 1) return const blockId = contextMenuBlocks[0].id - window.dispatchEvent( - new CustomEvent('run-from-block', { - detail: { blockId, workflowId: workflowIdParam }, - }) - ) - }, [contextMenuBlocks, workflowIdParam]) + handleRunFromBlock(blockId, workflowIdParam) + }, [contextMenuBlocks, workflowIdParam, handleRunFromBlock]) + + const handleContextRunUntilBlock = useCallback(() => { + if (contextMenuBlocks.length !== 1) return + const blockId = contextMenuBlocks[0].id + handleRunUntilBlock(blockId, workflowIdParam) + }, [contextMenuBlocks, workflowIdParam, handleRunUntilBlock]) const handleContextAddBlock = useCallback(() => { useSearchModalStore.getState().open() @@ -3322,6 +3327,7 @@ const WorkflowContent = React.memo(() => { onOpenEditor={handleContextOpenEditor} onRename={handleContextRename} onRunFromBlock={handleContextRunFromBlock} + onRunUntilBlock={handleContextRunUntilBlock} hasClipboard={hasClipboard()} showRemoveFromSubflow={contextMenuBlocks.some( (b) => b.parentId && (b.parentType === 'loop' || b.parentType === 'parallel') @@ -3331,11 +3337,16 @@ const WorkflowContent = React.memo(() => { (() => { const block = contextMenuBlocks[0] const snapshot = getLastExecutionSnapshot(workflowIdParam) - const wasExecuted = snapshot?.executedBlocks.includes(block.id) ?? false + if (!snapshot) return false + // Check if all upstream dependencies have cached outputs + const incomingEdges = edges.filter((edge) => edge.target === block.id) + const dependenciesSatisfied = + incomingEdges.length === 0 || + incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)) const isNoteBlock = block.type === 'note' const isInsideSubflow = block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel') - return !!snapshot && wasExecuted && !isNoteBlock && !isInsideSubflow && !isExecuting + return dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting })() } runFromBlockDisabledReason={ @@ -3343,11 +3354,15 @@ const WorkflowContent = React.memo(() => { ? (() => { const block = contextMenuBlocks[0] const snapshot = getLastExecutionSnapshot(workflowIdParam) - const wasExecuted = snapshot?.executedBlocks.includes(block.id) ?? false + if (!snapshot) return 'Run workflow first' + // Check if all upstream dependencies have cached outputs + const incomingEdges = edges.filter((edge) => edge.target === block.id) + const dependenciesSatisfied = + incomingEdges.length === 0 || + incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)) const isInsideSubflow = block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel') - if (!snapshot) return 'Run workflow first' - if (!wasExecuted) return 'Block not executed in last run' + if (!dependenciesSatisfied) return 'Run upstream blocks first' if (isInsideSubflow) return 'Cannot run from inside subflow' return undefined })() diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 9cea32218..b519fceef 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -26,6 +26,7 @@ export class ExecutionEngine { private allowResumeTriggers: boolean private cancelledFlag = false private errorFlag = false + private stoppedEarlyFlag = false private executionError: Error | null = null private lastCancellationCheck = 0 private readonly useRedisCancellation: boolean @@ -105,7 +106,7 @@ export class ExecutionEngine { this.initializeQueue(triggerBlockId) while (this.hasWork()) { - if ((await this.checkCancellation()) || this.errorFlag) { + if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) { break } await this.processQueue() @@ -396,6 +397,13 @@ export class ExecutionEngine { this.finalOutput = output } + // Check if we should stop after this block (run-until-block feature) + if (this.context.stopAfterBlockId === nodeId) { + logger.info('Stopping execution after target block', { nodeId }) + this.stoppedEarlyFlag = true + return + } + const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false) logger.info('Processing outgoing edges', { diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index dd303217e..4112eb6e6 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -296,6 +296,7 @@ export class DAGExecutor { includeFileBase64: this.contextExtensions.includeFileBase64, base64MaxBytes: this.contextExtensions.base64MaxBytes, runFromBlockContext: overrides?.runFromBlockContext, + stopAfterBlockId: this.contextExtensions.stopAfterBlockId, } if (this.contextExtensions.resumeFromSnapshot) { diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index 9a4ffb691..40c4c61cd 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -112,6 +112,11 @@ export interface ContextExtensions { * execution mode starting from the specified block. */ runFromBlockContext?: RunFromBlockContext + + /** + * Stop execution after this block completes. Used for "run until block" feature. + */ + stopAfterBlockId?: string } export interface WorkflowInput { diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index 35ff1c3c0..9752b7701 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -257,6 +257,11 @@ export interface ExecutionContext { * will be executed; others return cached outputs from the source snapshot. */ runFromBlockContext?: RunFromBlockContext + + /** + * Stop execution after this block completes. Used for "run until block" feature. + */ + stopAfterBlockId?: string } export interface ExecutionResult { diff --git a/apps/sim/executor/utils/run-from-block.test.ts b/apps/sim/executor/utils/run-from-block.test.ts index a641d805b..098d6e5c9 100644 --- a/apps/sim/executor/utils/run-from-block.test.ts +++ b/apps/sim/executor/utils/run-from-block.test.ts @@ -311,14 +311,25 @@ describe('validateRunFromBlock', () => { expect(result.error).toContain('sentinel') }) - it('rejects unexecuted blocks', () => { - const dag = createDAG([createNode('A'), createNode('B')]) - const executedBlocks = new Set(['A']) // B was not executed + it('rejects blocks with unexecuted upstream dependencies', () => { + // A → B, only A executed but B depends on A + const dag = createDAG([createNode('A', [{ target: 'B' }]), createNode('B')]) + const executedBlocks = new Set() // A was not executed const result = validateRunFromBlock('B', dag, executedBlocks) expect(result.valid).toBe(false) - expect(result.error).toContain('was not executed') + expect(result.error).toContain('Upstream dependency not executed') + }) + + it('allows blocks with no dependencies even if not previously executed', () => { + // A and B are independent (no edges) + const dag = createDAG([createNode('A'), createNode('B')]) + const executedBlocks = new Set(['A']) // B was not executed but has no deps + + const result = validateRunFromBlock('B', dag, executedBlocks) + + expect(result.valid).toBe(true) // B has no incoming edges, so it's valid }) it('accepts regular executed block', () => { @@ -374,19 +385,22 @@ describe('validateRunFromBlock', () => { expect(result.valid).toBe(true) }) - it('rejects loop container that was not executed', () => { + it('allows loop container with no upstream dependencies', () => { + // Loop containers are validated via their sentinel nodes, not incoming edges on the container itself + // If the loop has no upstream dependencies, it should be valid const loopId = 'loop-container-1' const sentinelStartId = `loop-${loopId}-sentinel-start` const dag = createDAG([ createNode(sentinelStartId, [], { isSentinel: true, sentinelType: 'start', loopId }), ]) dag.loopConfigs.set(loopId, { id: loopId, nodes: [], iterations: 3, loopType: 'for' } as any) - const executedBlocks = new Set() // Loop was not executed + const executedBlocks = new Set() // Nothing executed but loop has no deps const result = validateRunFromBlock(loopId, dag, executedBlocks) - expect(result.valid).toBe(false) - expect(result.error).toContain('was not executed') + // Loop container validation doesn't check incoming edges (containers don't have nodes in dag.nodes) + // So this is valid - the loop can start fresh + expect(result.valid).toBe(true) }) }) diff --git a/apps/sim/executor/utils/run-from-block.ts b/apps/sim/executor/utils/run-from-block.ts index 0b364063c..a12be5ab2 100644 --- a/apps/sim/executor/utils/run-from-block.ts +++ b/apps/sim/executor/utils/run-from-block.ts @@ -105,7 +105,7 @@ export function computeDirtySet(dag: DAG, startBlockId: string): Set { * - Block cannot be inside a loop (but loop containers are allowed) * - Block cannot be inside a parallel (but parallel containers are allowed) * - Block cannot be a sentinel node - * - Block must have been executed in the source run + * - All upstream dependencies must have been executed (have cached outputs) * * @param blockId - The block ID to validate * @param dag - The workflow DAG @@ -158,12 +158,18 @@ export function validateRunFromBlock( if (node.metadata.isSentinel) { return { valid: false, error: 'Cannot run from sentinel node' } } - } - if (!executedBlocks.has(blockId)) { - return { - valid: false, - error: `Block was not executed in source run: ${blockId}`, + // Check if all upstream dependencies have been executed (have cached outputs) + // If no incoming edges (trigger/start block), dependencies are satisfied + if (node.incomingEdges.size > 0) { + for (const sourceId of node.incomingEdges.keys()) { + if (!executedBlocks.has(sourceId)) { + return { + valid: false, + error: `Upstream dependency not executed: ${sourceId}`, + } + } + } } } diff --git a/apps/sim/hooks/use-execution-stream.ts b/apps/sim/hooks/use-execution-stream.ts index bd36cb55d..0273165e4 100644 --- a/apps/sim/hooks/use-execution-stream.ts +++ b/apps/sim/hooks/use-execution-stream.ts @@ -143,6 +143,7 @@ export interface ExecuteStreamOptions { loops?: Record parallels?: Record } + stopAfterBlockId?: string callbacks?: ExecutionStreamCallbacks } diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index c2b300f08..2ce87873f 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -40,6 +40,7 @@ export interface ExecuteWorkflowCoreOptions { abortSignal?: AbortSignal includeFileBase64?: boolean base64MaxBytes?: number + stopAfterBlockId?: string } function parseVariableValueByType(value: unknown, type: string): unknown { @@ -114,6 +115,7 @@ export async function executeWorkflowCore( abortSignal, includeFileBase64, base64MaxBytes, + stopAfterBlockId, } = options const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } = @@ -297,6 +299,7 @@ export async function executeWorkflowCore( abortSignal, includeFileBase64, base64MaxBytes, + stopAfterBlockId, } const executorInstance = new Executor({