This commit is contained in:
Siddharth Ganesan
2026-01-27 18:35:33 -08:00
parent 28fbd0c086
commit 0ead5aa04e
4 changed files with 78 additions and 18 deletions

View File

@@ -91,6 +91,9 @@ export function BlockMenu({
const allNoteBlocks = selectedBlocks.every((b) => b.type === 'note')
const isSubflow =
isSingleBlock && (selectedBlocks[0]?.type === 'loop' || selectedBlocks[0]?.type === 'parallel')
const isInsideSubflow =
isSingleBlock &&
(selectedBlocks[0]?.parentType === 'loop' || selectedBlocks[0]?.parentType === 'parallel')
const canRemoveFromSubflow = showRemoveFromSubflow && !hasTriggerBlock
@@ -212,8 +215,8 @@ export function BlockMenu({
</PopoverItem>
)}
{/* Run from/until block - only for single non-note block selection */}
{isSingleBlock && !allNoteBlocks && (
{/* Run from/until block - only for single non-note block, not inside subflows */}
{isSingleBlock && !allNoteBlocks && !isInsideSubflow && (
<>
<PopoverDivider />
<PopoverItem
@@ -227,17 +230,20 @@ export function BlockMenu({
>
Run from block
</PopoverItem>
<PopoverItem
disabled={isExecuting}
onClick={() => {
if (!isExecuting) {
onRunUntilBlock?.()
onClose()
}
}}
>
Run until block
</PopoverItem>
{/* Hide "Run until" for triggers - they're always at the start */}
{!hasTriggerBlock && (
<PopoverItem
disabled={isExecuting}
onClick={() => {
if (!isExecuting) {
onRunUntilBlock?.()
onClose()
}
}}
>
Run until block
</PopoverItem>
)}
</>
)}

View File

@@ -1128,8 +1128,22 @@ const WorkflowContent = React.memo(() => {
const snapshot = getLastExecutionSnapshot(workflowIdParam)
const incomingEdges = edges.filter((edge) => edge.target === block.id)
const isTriggerBlock = incomingEdges.length === 0
const isSubflow = block.type === 'loop' || block.type === 'parallel'
// For subflows, check if the sentinel-end was executed (meaning the subflow completed at least once)
// Sentinel IDs follow the pattern: loop-{id}-sentinel-end or parallel-{id}-sentinel-end
const subflowWasExecuted =
isSubflow &&
snapshot &&
snapshot.executedBlocks.some(
(executedId) =>
executedId === `loop-${block.id}-sentinel-end` ||
executedId === `parallel-${block.id}-sentinel-end`
)
const dependenciesSatisfied =
isTriggerBlock ||
subflowWasExecuted ||
(snapshot && incomingEdges.every((edge) => snapshot.executedBlocks.includes(edge.source)))
const isNoteBlock = block.type === 'note'
const isInsideSubflow =

View File

@@ -397,9 +397,14 @@ export class ExecutionEngine {
}
if (this.context.stopAfterBlockId === nodeId) {
logger.info('Stopping execution after target block', { nodeId })
this.stoppedEarlyFlag = true
return
// For loop/parallel sentinels, only stop if the subflow has fully exited (all iterations done)
// shouldContinue: true means more iterations, shouldExit: true means loop is done
const shouldContinueLoop = output.shouldContinue === true
if (!shouldContinueLoop) {
logger.info('Stopping execution after target block', { nodeId })
this.stoppedEarlyFlag = true
return
}
}
const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false)

View File

@@ -26,6 +26,10 @@ import {
buildStartBlockOutput,
resolveExecutorStartBlock,
} from '@/executor/utils/start-block'
import {
extractLoopIdFromSentinel,
extractParallelIdFromSentinel,
} from '@/executor/utils/subflow-utils'
import { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedWorkflow } from '@/serializer/types'
@@ -119,19 +123,50 @@ export class DAGExecutor {
const { dirtySet, upstreamSet } = computeExecutionSets(dag, startBlockId)
const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId
// Extract container IDs from sentinel IDs in upstream set
const upstreamContainerIds = new Set<string>()
for (const nodeId of upstreamSet) {
const loopId = extractLoopIdFromSentinel(nodeId)
if (loopId) upstreamContainerIds.add(loopId)
const parallelId = extractParallelIdFromSentinel(nodeId)
if (parallelId) upstreamContainerIds.add(parallelId)
}
// Filter snapshot to only include upstream blocks - prevents references to non-upstream blocks
const filteredBlockStates: Record<string, any> = {}
for (const [blockId, state] of Object.entries(sourceSnapshot.blockStates)) {
if (upstreamSet.has(blockId)) {
if (upstreamSet.has(blockId) || upstreamContainerIds.has(blockId)) {
filteredBlockStates[blockId] = state
}
}
const filteredExecutedBlocks = sourceSnapshot.executedBlocks.filter((id) => upstreamSet.has(id))
const filteredExecutedBlocks = sourceSnapshot.executedBlocks.filter(
(id) => upstreamSet.has(id) || upstreamContainerIds.has(id)
)
// Filter loop/parallel executions to only include upstream containers
const filteredLoopExecutions: Record<string, any> = {}
if (sourceSnapshot.loopExecutions) {
for (const [loopId, execution] of Object.entries(sourceSnapshot.loopExecutions)) {
if (upstreamContainerIds.has(loopId)) {
filteredLoopExecutions[loopId] = execution
}
}
}
const filteredParallelExecutions: Record<string, any> = {}
if (sourceSnapshot.parallelExecutions) {
for (const [parallelId, execution] of Object.entries(sourceSnapshot.parallelExecutions)) {
if (upstreamContainerIds.has(parallelId)) {
filteredParallelExecutions[parallelId] = execution
}
}
}
const filteredSnapshot: SerializableExecutionState = {
...sourceSnapshot,
blockStates: filteredBlockStates,
executedBlocks: filteredExecutedBlocks,
loopExecutions: filteredLoopExecutions,
parallelExecutions: filteredParallelExecutions,
}
logger.info('Executing from block', {