Run u ntil block

This commit is contained in:
Siddharth Ganesan
2026-01-27 12:13:09 -08:00
parent 6e541949ec
commit 23ab11a40d
13 changed files with 215 additions and 88 deletions

View File

@@ -53,6 +53,7 @@ const ExecuteWorkflowSchema = z.object({
parallels: z.record(z.any()).optional(),
})
.optional(),
stopAfterBlockId: z.string().optional(),
})
export const runtime = 'nodejs'
@@ -222,6 +223,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
workflowStateOverride,
stopAfterBlockId,
} = validation.data
// For API key and internal JWT auth, the entire body is the input (except for our control fields)
@@ -237,6 +239,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
workflowStateOverride,
stopAfterBlockId: _stopAfterBlockId,
workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
...rest
} = body
@@ -434,6 +437,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
loggingSession,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
})
const outputWithBase64 = includeFileBase64
@@ -722,6 +726,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
abortSignal: abortController.signal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
})
if (result.status === 'paused') {

View File

@@ -4,6 +4,7 @@ import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn'
import { cn } from '@/lib/core/utils/cn'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useExecutionStore } from '@/stores/execution'
@@ -50,6 +51,7 @@ export const ActionBar = memo(
collaborativeBatchToggleBlockHandles,
} = useCollaborativeWorkflow()
const { setPendingSelection } = useWorkflowRegistry()
const { handleRunFromBlock } = useWorkflowExecution()
const addNotification = useNotificationStore((s) => s.addNotification)
@@ -101,6 +103,7 @@ export const ActionBar = memo(
const { activeWorkflowId } = useWorkflowRegistry()
const { isExecuting, getLastExecutionSnapshot } = useExecutionStore()
const userPermissions = useUserPermissionsContext()
const edges = useWorkflowStore((state) => state.edges)
const isStartBlock = isInputDefinitionTrigger(blockType)
const isResponseBlock = blockType === 'response'
@@ -109,29 +112,29 @@ export const ActionBar = memo(
const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel')
// Check if run-from-block is available
const hasExecutionSnapshot = activeWorkflowId
? !!getLastExecutionSnapshot(activeWorkflowId)
: false
const wasExecuted = activeWorkflowId
? getLastExecutionSnapshot(activeWorkflowId)?.executedBlocks.includes(blockId) ?? false
: false
// Block can run if all its upstream dependencies have cached outputs
const snapshot = activeWorkflowId ? getLastExecutionSnapshot(activeWorkflowId) : null
const hasExecutionSnapshot = !!snapshot
const dependenciesSatisfied = (() => {
if (!snapshot) return false
// Find all blocks that feed into this block
const incomingEdges = edges.filter((edge) => edge.target === blockId)
// If no incoming edges (trigger/start block), dependencies are satisfied
if (incomingEdges.length === 0) return true
// All source blocks must have been executed (have cached outputs)
return incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source))
})()
const canRunFromBlock =
hasExecutionSnapshot &&
wasExecuted &&
dependenciesSatisfied &&
!isNoteBlock &&
!isInsideSubflow &&
!isExecuting
const handleRunFromBlock = useCallback(() => {
const handleRunFromBlockClick = useCallback(() => {
if (!activeWorkflowId || !canRunFromBlock) return
// Dispatch a custom event to trigger run-from-block execution
window.dispatchEvent(
new CustomEvent('run-from-block', {
detail: { blockId, workflowId: activeWorkflowId },
})
)
}, [blockId, activeWorkflowId, canRunFromBlock])
handleRunFromBlock(blockId, activeWorkflowId)
}, [blockId, activeWorkflowId, canRunFromBlock, handleRunFromBlock])
/**
* Get appropriate tooltip message based on disabled state
@@ -165,7 +168,7 @@ export const ActionBar = memo(
onClick={(e) => {
e.stopPropagation()
if (canRunFromBlock && !disabled) {
handleRunFromBlock()
handleRunFromBlockClick()
}
}}
className={ACTION_BUTTON_STYLES}
@@ -179,7 +182,7 @@ export const ActionBar = memo(
if (disabled) return getTooltipMessage('Run from this block')
if (isExecuting) return 'Execution in progress'
if (!hasExecutionSnapshot) return 'Run workflow first'
if (!wasExecuted) return 'Block not executed in last run'
if (!dependenciesSatisfied) return 'Run upstream blocks first'
if (isInsideSubflow) return 'Cannot run from inside subflow'
return 'Run from this block'
})()}

View File

@@ -41,6 +41,7 @@ export interface BlockMenuProps {
onOpenEditor: () => void
onRename: () => void
onRunFromBlock?: () => void
onRunUntilBlock?: () => void
hasClipboard?: boolean
showRemoveFromSubflow?: boolean
/** Whether run from block is available (has snapshot, was executed, not inside subflow) */
@@ -72,6 +73,7 @@ export function BlockMenu({
onOpenEditor,
onRename,
onRunFromBlock,
onRunUntilBlock,
hasClipboard = false,
showRemoveFromSubflow = false,
canRunFromBlock = false,
@@ -213,7 +215,7 @@ export function BlockMenu({
</PopoverItem>
)}
{/* Run from block - only for single non-note block selection */}
{/* Run from/until block - only for single non-note block selection */}
{isSingleBlock && !allNoteBlocks && (
<>
<PopoverDivider />
@@ -232,6 +234,17 @@ export function BlockMenu({
? runFromBlockDisabledReason
: 'Run from this block'}
</PopoverItem>
<PopoverItem
disabled={isExecuting}
onClick={() => {
if (!isExecuting) {
onRunUntilBlock?.()
onClose()
}
}}
>
{isExecuting ? 'Execution in progress...' : 'Run until this block'}
</PopoverItem>
</>
)}

View File

@@ -33,8 +33,6 @@ import { useWorkflowStore } from '@/stores/workflows/workflow/store'
const logger = createLogger('useWorkflowExecution')
// Module-level guard to prevent duplicate run-from-block executions across hook instances
let runFromBlockGlobalLock = false
// Debug state validation result
interface DebugValidationResult {
@@ -674,7 +672,8 @@ export function useWorkflowExecution() {
onStream?: (se: StreamingExecution) => Promise<void>,
executionId?: string,
onBlockComplete?: (blockId: string, output: any) => Promise<void>,
overrideTriggerType?: 'chat' | 'manual' | 'api'
overrideTriggerType?: 'chat' | 'manual' | 'api',
stopAfterBlockId?: string
): Promise<ExecutionResult | StreamingExecution> => {
// Use diff workflow for execution when available, regardless of canvas view state
const executionWorkflowState = null as {
@@ -895,6 +894,7 @@ export function useWorkflowExecution() {
triggerType: overrideTriggerType || 'manual',
useDraftState: true,
isClientSession: true,
stopAfterBlockId,
workflowStateOverride: executionWorkflowState
? {
blocks: executionWorkflowState.blocks,
@@ -1080,19 +1080,47 @@ export function useWorkflowExecution() {
// Store execution snapshot for run-from-block
if (data.success && activeWorkflowId) {
const snapshot: SerializableExecutionState = {
blockStates: Object.fromEntries(accumulatedBlockStates),
executedBlocks: Array.from(executedBlockIds),
blockLogs: accumulatedBlockLogs,
decisions: { router: {}, condition: {} },
completedLoops: [],
activeExecutionPath: Array.from(executedBlockIds),
if (stopAfterBlockId) {
// Partial run (run-until-block): merge with existing snapshot
const existingSnapshot = getLastExecutionSnapshot(activeWorkflowId)
const mergedBlockStates = {
...(existingSnapshot?.blockStates || {}),
...Object.fromEntries(accumulatedBlockStates),
}
const mergedExecutedBlocks = new Set([
...(existingSnapshot?.executedBlocks || []),
...executedBlockIds,
])
const snapshot: SerializableExecutionState = {
blockStates: mergedBlockStates,
executedBlocks: Array.from(mergedExecutedBlocks),
blockLogs: [...(existingSnapshot?.blockLogs || []), ...accumulatedBlockLogs],
decisions: existingSnapshot?.decisions || { router: {}, condition: {} },
completedLoops: existingSnapshot?.completedLoops || [],
activeExecutionPath: Array.from(mergedExecutedBlocks),
}
setLastExecutionSnapshot(activeWorkflowId, snapshot)
logger.info('Merged execution snapshot after run-until-block', {
workflowId: activeWorkflowId,
newBlocksExecuted: executedBlockIds.size,
totalExecutedBlocks: mergedExecutedBlocks.size,
})
} else {
// Full run: replace snapshot entirely
const snapshot: SerializableExecutionState = {
blockStates: Object.fromEntries(accumulatedBlockStates),
executedBlocks: Array.from(executedBlockIds),
blockLogs: accumulatedBlockLogs,
decisions: { router: {}, condition: {} },
completedLoops: [],
activeExecutionPath: Array.from(executedBlockIds),
}
setLastExecutionSnapshot(activeWorkflowId, snapshot)
logger.info('Stored execution snapshot for run-from-block', {
workflowId: activeWorkflowId,
executedBlocksCount: executedBlockIds.size,
})
}
setLastExecutionSnapshot(activeWorkflowId, snapshot)
logger.info('Stored execution snapshot for run-from-block', {
workflowId: activeWorkflowId,
executedBlocksCount: executedBlockIds.size,
})
}
},
@@ -1419,26 +1447,21 @@ export function useWorkflowExecution() {
*/
const handleRunFromBlock = useCallback(
async (blockId: string, workflowId: string) => {
// Prevent duplicate executions across multiple hook instances (panel.tsx and chat.tsx)
if (runFromBlockGlobalLock) {
logger.debug('Run-from-block already in progress (global lock), ignoring duplicate request', {
workflowId,
blockId,
})
return
}
runFromBlockGlobalLock = true
const snapshot = getLastExecutionSnapshot(workflowId)
if (!snapshot) {
logger.error('No execution snapshot available for run-from-block', { workflowId, blockId })
runFromBlockGlobalLock = false
return
}
if (!snapshot.executedBlocks.includes(blockId)) {
logger.error('Block was not executed in the source run', { workflowId, blockId })
runFromBlockGlobalLock = false
// Check if all upstream dependencies have cached outputs
const workflowEdges = useWorkflowStore.getState().edges
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
const dependenciesSatisfied =
incomingEdges.length === 0 ||
incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source))
if (!dependenciesSatisfied) {
logger.error('Upstream dependencies not satisfied for run-from-block', { workflowId, blockId })
return
}
@@ -1449,8 +1472,6 @@ export function useWorkflowExecution() {
})
setIsExecuting(true)
const workflowEdges = useWorkflowStore.getState().edges
const executionId = uuidv4()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
@@ -1612,7 +1633,6 @@ export function useWorkflowExecution() {
} finally {
setIsExecuting(false)
setActiveBlocks(new Set())
runFromBlockGlobalLock = false
}
},
[
@@ -1627,18 +1647,45 @@ export function useWorkflowExecution() {
]
)
// Listen for run-from-block events from the action bar
useEffect(() => {
const handleRunFromBlockEvent = (event: CustomEvent<{ blockId: string; workflowId: string }>) => {
const { blockId, workflowId } = event.detail
handleRunFromBlock(blockId, workflowId)
}
/**
* Handles running workflow until a specific block (stops after that block completes)
*/
const handleRunUntilBlock = useCallback(
async (blockId: string, workflowId: string) => {
if (!workflowId || workflowId !== activeWorkflowId) {
logger.error('Invalid workflow ID for run-until-block', { workflowId, activeWorkflowId })
return
}
window.addEventListener('run-from-block', handleRunFromBlockEvent as EventListener)
return () => {
window.removeEventListener('run-from-block', handleRunFromBlockEvent as EventListener)
}
}, [handleRunFromBlock])
logger.info('Starting run-until-block execution', { workflowId, stopAfterBlockId: blockId })
setExecutionResult(null)
setIsExecuting(true)
const executionId = uuidv4()
try {
const result = await executeWorkflow(
undefined,
undefined,
executionId,
undefined,
'manual',
blockId
)
if (result && 'success' in result) {
setExecutionResult(result)
}
} catch (error) {
const errorResult = handleExecutionError(error, { executionId })
return errorResult
} finally {
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
}
},
[activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks]
)
return {
isExecuting,
@@ -1651,5 +1698,6 @@ export function useWorkflowExecution() {
handleCancelDebug,
handleCancelExecution,
handleRunFromBlock,
handleRunUntilBlock,
}
}

View File

@@ -47,6 +47,7 @@ import {
useCurrentWorkflow,
useNodeUtilities,
useShiftSelectionLock,
useWorkflowExecution,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import {
calculateContainerDimensions,
@@ -325,6 +326,8 @@ const WorkflowContent = React.memo(() => {
const showTrainingModal = useCopilotTrainingStore((state) => state.showModal)
const { handleRunFromBlock, handleRunUntilBlock } = useWorkflowExecution()
const snapToGridSize = useSnapToGridSize()
const snapToGrid = snapToGridSize > 0
@@ -994,12 +997,14 @@ const WorkflowContent = React.memo(() => {
const handleContextRunFromBlock = useCallback(() => {
if (contextMenuBlocks.length !== 1) return
const blockId = contextMenuBlocks[0].id
window.dispatchEvent(
new CustomEvent('run-from-block', {
detail: { blockId, workflowId: workflowIdParam },
})
)
}, [contextMenuBlocks, workflowIdParam])
handleRunFromBlock(blockId, workflowIdParam)
}, [contextMenuBlocks, workflowIdParam, handleRunFromBlock])
const handleContextRunUntilBlock = useCallback(() => {
if (contextMenuBlocks.length !== 1) return
const blockId = contextMenuBlocks[0].id
handleRunUntilBlock(blockId, workflowIdParam)
}, [contextMenuBlocks, workflowIdParam, handleRunUntilBlock])
const handleContextAddBlock = useCallback(() => {
useSearchModalStore.getState().open()
@@ -3322,6 +3327,7 @@ const WorkflowContent = React.memo(() => {
onOpenEditor={handleContextOpenEditor}
onRename={handleContextRename}
onRunFromBlock={handleContextRunFromBlock}
onRunUntilBlock={handleContextRunUntilBlock}
hasClipboard={hasClipboard()}
showRemoveFromSubflow={contextMenuBlocks.some(
(b) => b.parentId && (b.parentType === 'loop' || b.parentType === 'parallel')
@@ -3331,11 +3337,16 @@ const WorkflowContent = React.memo(() => {
(() => {
const block = contextMenuBlocks[0]
const snapshot = getLastExecutionSnapshot(workflowIdParam)
const wasExecuted = snapshot?.executedBlocks.includes(block.id) ?? false
if (!snapshot) return false
// Check if all upstream dependencies have cached outputs
const incomingEdges = edges.filter((edge) => edge.target === block.id)
const dependenciesSatisfied =
incomingEdges.length === 0 ||
incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source))
const isNoteBlock = block.type === 'note'
const isInsideSubflow =
block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel')
return !!snapshot && wasExecuted && !isNoteBlock && !isInsideSubflow && !isExecuting
return dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting
})()
}
runFromBlockDisabledReason={
@@ -3343,11 +3354,15 @@ const WorkflowContent = React.memo(() => {
? (() => {
const block = contextMenuBlocks[0]
const snapshot = getLastExecutionSnapshot(workflowIdParam)
const wasExecuted = snapshot?.executedBlocks.includes(block.id) ?? false
if (!snapshot) return 'Run workflow first'
// Check if all upstream dependencies have cached outputs
const incomingEdges = edges.filter((edge) => edge.target === block.id)
const dependenciesSatisfied =
incomingEdges.length === 0 ||
incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source))
const isInsideSubflow =
block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel')
if (!snapshot) return 'Run workflow first'
if (!wasExecuted) return 'Block not executed in last run'
if (!dependenciesSatisfied) return 'Run upstream blocks first'
if (isInsideSubflow) return 'Cannot run from inside subflow'
return undefined
})()

View File

@@ -26,6 +26,7 @@ export class ExecutionEngine {
private allowResumeTriggers: boolean
private cancelledFlag = false
private errorFlag = false
private stoppedEarlyFlag = false
private executionError: Error | null = null
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
@@ -105,7 +106,7 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
if ((await this.checkCancellation()) || this.errorFlag) {
if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) {
break
}
await this.processQueue()
@@ -396,6 +397,13 @@ export class ExecutionEngine {
this.finalOutput = output
}
// Check if we should stop after this block (run-until-block feature)
if (this.context.stopAfterBlockId === nodeId) {
logger.info('Stopping execution after target block', { nodeId })
this.stoppedEarlyFlag = true
return
}
const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false)
logger.info('Processing outgoing edges', {

View File

@@ -296,6 +296,7 @@ export class DAGExecutor {
includeFileBase64: this.contextExtensions.includeFileBase64,
base64MaxBytes: this.contextExtensions.base64MaxBytes,
runFromBlockContext: overrides?.runFromBlockContext,
stopAfterBlockId: this.contextExtensions.stopAfterBlockId,
}
if (this.contextExtensions.resumeFromSnapshot) {

View File

@@ -112,6 +112,11 @@ export interface ContextExtensions {
* execution mode starting from the specified block.
*/
runFromBlockContext?: RunFromBlockContext
/**
* Stop execution after this block completes. Used for "run until block" feature.
*/
stopAfterBlockId?: string
}
export interface WorkflowInput {

View File

@@ -257,6 +257,11 @@ export interface ExecutionContext {
* will be executed; others return cached outputs from the source snapshot.
*/
runFromBlockContext?: RunFromBlockContext
/**
* Stop execution after this block completes. Used for "run until block" feature.
*/
stopAfterBlockId?: string
}
export interface ExecutionResult {

View File

@@ -311,14 +311,25 @@ describe('validateRunFromBlock', () => {
expect(result.error).toContain('sentinel')
})
it('rejects unexecuted blocks', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A']) // B was not executed
it('rejects blocks with unexecuted upstream dependencies', () => {
// A → B, only A executed but B depends on A
const dag = createDAG([createNode('A', [{ target: 'B' }]), createNode('B')])
const executedBlocks = new Set<string>() // A was not executed
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('was not executed')
expect(result.error).toContain('Upstream dependency not executed')
})
it('allows blocks with no dependencies even if not previously executed', () => {
// A and B are independent (no edges)
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A']) // B was not executed but has no deps
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(true) // B has no incoming edges, so it's valid
})
it('accepts regular executed block', () => {
@@ -374,19 +385,22 @@ describe('validateRunFromBlock', () => {
expect(result.valid).toBe(true)
})
it('rejects loop container that was not executed', () => {
it('allows loop container with no upstream dependencies', () => {
// Loop containers are validated via their sentinel nodes, not incoming edges on the container itself
// If the loop has no upstream dependencies, it should be valid
const loopId = 'loop-container-1'
const sentinelStartId = `loop-${loopId}-sentinel-start`
const dag = createDAG([
createNode(sentinelStartId, [], { isSentinel: true, sentinelType: 'start', loopId }),
])
dag.loopConfigs.set(loopId, { id: loopId, nodes: [], iterations: 3, loopType: 'for' } as any)
const executedBlocks = new Set<string>() // Loop was not executed
const executedBlocks = new Set<string>() // Nothing executed but loop has no deps
const result = validateRunFromBlock(loopId, dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('was not executed')
// Loop container validation doesn't check incoming edges (containers don't have nodes in dag.nodes)
// So this is valid - the loop can start fresh
expect(result.valid).toBe(true)
})
})

View File

@@ -105,7 +105,7 @@ export function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
* - Block cannot be inside a loop (but loop containers are allowed)
* - Block cannot be inside a parallel (but parallel containers are allowed)
* - Block cannot be a sentinel node
* - Block must have been executed in the source run
* - All upstream dependencies must have been executed (have cached outputs)
*
* @param blockId - The block ID to validate
* @param dag - The workflow DAG
@@ -158,12 +158,18 @@ export function validateRunFromBlock(
if (node.metadata.isSentinel) {
return { valid: false, error: 'Cannot run from sentinel node' }
}
}
if (!executedBlocks.has(blockId)) {
return {
valid: false,
error: `Block was not executed in source run: ${blockId}`,
// Check if all upstream dependencies have been executed (have cached outputs)
// If no incoming edges (trigger/start block), dependencies are satisfied
if (node.incomingEdges.size > 0) {
for (const sourceId of node.incomingEdges.keys()) {
if (!executedBlocks.has(sourceId)) {
return {
valid: false,
error: `Upstream dependency not executed: ${sourceId}`,
}
}
}
}
}

View File

@@ -143,6 +143,7 @@ export interface ExecuteStreamOptions {
loops?: Record<string, any>
parallels?: Record<string, any>
}
stopAfterBlockId?: string
callbacks?: ExecutionStreamCallbacks
}

View File

@@ -40,6 +40,7 @@ export interface ExecuteWorkflowCoreOptions {
abortSignal?: AbortSignal
includeFileBase64?: boolean
base64MaxBytes?: number
stopAfterBlockId?: string
}
function parseVariableValueByType(value: unknown, type: string): unknown {
@@ -114,6 +115,7 @@ export async function executeWorkflowCore(
abortSignal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
} = options
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
@@ -297,6 +299,7 @@ export async function executeWorkflowCore(
abortSignal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
}
const executorInstance = new Executor({