mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-28 03:00:29 -04:00
feat(executor): run from/until block (#3029)
* Run from block * Fixes * Fix * Fix * Minor improvements * Fix * Fix trace spans * Fix loop l ogs * Change ordering * Run u ntil block * Lint * Clean up * Fix * Allow run from block for triggers * Consolidation * Fix lint * Fix * Fix mock payload * Fix * Fix trigger clear snapshot * Fix loops and parallels * Fix * Cleanup * Fix test * Fix bugs * Catch error * Fix * Fix * I think it works?? * Fix * Fix * Add tests * Fix lint --------- Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
This commit is contained in:
committed by
GitHub
parent
72a2f79701
commit
655fe4f3b7
@@ -33,6 +33,15 @@ export interface DAG {
|
||||
parallelConfigs: Map<string, SerializedParallel>
|
||||
}
|
||||
|
||||
export interface DAGBuildOptions {
|
||||
/** Trigger block ID to start path construction from */
|
||||
triggerBlockId?: string
|
||||
/** Saved incoming edges from snapshot for resumption */
|
||||
savedIncomingEdges?: Record<string, string[]>
|
||||
/** Include all enabled blocks instead of only those reachable from trigger */
|
||||
includeAllBlocks?: boolean
|
||||
}
|
||||
|
||||
export class DAGBuilder {
|
||||
private pathConstructor = new PathConstructor()
|
||||
private loopConstructor = new LoopConstructor()
|
||||
@@ -40,11 +49,9 @@ export class DAGBuilder {
|
||||
private nodeConstructor = new NodeConstructor()
|
||||
private edgeConstructor = new EdgeConstructor()
|
||||
|
||||
build(
|
||||
workflow: SerializedWorkflow,
|
||||
triggerBlockId?: string,
|
||||
savedIncomingEdges?: Record<string, string[]>
|
||||
): DAG {
|
||||
build(workflow: SerializedWorkflow, options: DAGBuildOptions = {}): DAG {
|
||||
const { triggerBlockId, savedIncomingEdges, includeAllBlocks } = options
|
||||
|
||||
const dag: DAG = {
|
||||
nodes: new Map(),
|
||||
loopConfigs: new Map(),
|
||||
@@ -53,7 +60,7 @@ export class DAGBuilder {
|
||||
|
||||
this.initializeConfigs(workflow, dag)
|
||||
|
||||
const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId)
|
||||
const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId, includeAllBlocks)
|
||||
|
||||
this.loopConstructor.execute(dag, reachableBlocks)
|
||||
this.parallelConstructor.execute(dag, reachableBlocks)
|
||||
|
||||
@@ -6,7 +6,16 @@ import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
|
||||
const logger = createLogger('PathConstructor')
|
||||
|
||||
export class PathConstructor {
|
||||
execute(workflow: SerializedWorkflow, triggerBlockId?: string): Set<string> {
|
||||
execute(
|
||||
workflow: SerializedWorkflow,
|
||||
triggerBlockId?: string,
|
||||
includeAllBlocks?: boolean
|
||||
): Set<string> {
|
||||
// For run-from-block mode, include all enabled blocks regardless of trigger reachability
|
||||
if (includeAllBlocks) {
|
||||
return this.getAllEnabledBlocks(workflow)
|
||||
}
|
||||
|
||||
const resolvedTriggerId = this.findTriggerBlock(workflow, triggerBlockId)
|
||||
|
||||
if (!resolvedTriggerId) {
|
||||
|
||||
@@ -26,6 +26,7 @@ export class ExecutionEngine {
|
||||
private allowResumeTriggers: boolean
|
||||
private cancelledFlag = false
|
||||
private errorFlag = false
|
||||
private stoppedEarlyFlag = false
|
||||
private executionError: Error | null = null
|
||||
private lastCancellationCheck = 0
|
||||
private readonly useRedisCancellation: boolean
|
||||
@@ -105,7 +106,7 @@ export class ExecutionEngine {
|
||||
this.initializeQueue(triggerBlockId)
|
||||
|
||||
while (this.hasWork()) {
|
||||
if ((await this.checkCancellation()) || this.errorFlag) {
|
||||
if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) {
|
||||
break
|
||||
}
|
||||
await this.processQueue()
|
||||
@@ -259,6 +260,16 @@ export class ExecutionEngine {
|
||||
}
|
||||
|
||||
private initializeQueue(triggerBlockId?: string): void {
|
||||
if (this.context.runFromBlockContext) {
|
||||
const { startBlockId } = this.context.runFromBlockContext
|
||||
logger.info('Initializing queue for run-from-block mode', {
|
||||
startBlockId,
|
||||
dirtySetSize: this.context.runFromBlockContext.dirtySet.size,
|
||||
})
|
||||
this.addToQueue(startBlockId)
|
||||
return
|
||||
}
|
||||
|
||||
const pendingBlocks = this.context.metadata.pendingBlocks
|
||||
const remainingEdges = (this.context.metadata as any).remainingEdges
|
||||
|
||||
@@ -385,6 +396,17 @@ export class ExecutionEngine {
|
||||
this.finalOutput = output
|
||||
}
|
||||
|
||||
if (this.context.stopAfterBlockId === nodeId) {
|
||||
// For loop/parallel sentinels, only stop if the subflow has fully exited (all iterations done)
|
||||
// shouldContinue: true means more iterations, shouldExit: true means loop is done
|
||||
const shouldContinueLoop = output.shouldContinue === true
|
||||
if (!shouldContinueLoop) {
|
||||
logger.info('Stopping execution after target block', { nodeId })
|
||||
this.stoppedEarlyFlag = true
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false)
|
||||
|
||||
logger.info('Processing outgoing edges', {
|
||||
|
||||
@@ -5,17 +5,31 @@ import { BlockExecutor } from '@/executor/execution/block-executor'
|
||||
import { EdgeManager } from '@/executor/execution/edge-manager'
|
||||
import { ExecutionEngine } from '@/executor/execution/engine'
|
||||
import { ExecutionState } from '@/executor/execution/state'
|
||||
import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
|
||||
import type {
|
||||
ContextExtensions,
|
||||
SerializableExecutionState,
|
||||
WorkflowInput,
|
||||
} from '@/executor/execution/types'
|
||||
import { createBlockHandlers } from '@/executor/handlers/registry'
|
||||
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
|
||||
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
|
||||
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
|
||||
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
|
||||
import {
|
||||
computeExecutionSets,
|
||||
type RunFromBlockContext,
|
||||
resolveContainerToSentinelStart,
|
||||
validateRunFromBlock,
|
||||
} from '@/executor/utils/run-from-block'
|
||||
import {
|
||||
buildResolutionFromBlock,
|
||||
buildStartBlockOutput,
|
||||
resolveExecutorStartBlock,
|
||||
} from '@/executor/utils/start-block'
|
||||
import {
|
||||
extractLoopIdFromSentinel,
|
||||
extractParallelIdFromSentinel,
|
||||
} from '@/executor/utils/subflow-utils'
|
||||
import { VariableResolver } from '@/executor/variables/resolver'
|
||||
import type { SerializedWorkflow } from '@/serializer/types'
|
||||
|
||||
@@ -48,7 +62,10 @@ export class DAGExecutor {
|
||||
|
||||
async execute(workflowId: string, triggerBlockId?: string): Promise<ExecutionResult> {
|
||||
const savedIncomingEdges = this.contextExtensions.dagIncomingEdges
|
||||
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
|
||||
const dag = this.dagBuilder.build(this.workflow, {
|
||||
triggerBlockId,
|
||||
savedIncomingEdges,
|
||||
})
|
||||
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
|
||||
|
||||
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
|
||||
@@ -89,17 +106,156 @@ export class DAGExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute from a specific block using cached outputs for upstream blocks.
|
||||
*/
|
||||
async executeFromBlock(
|
||||
workflowId: string,
|
||||
startBlockId: string,
|
||||
sourceSnapshot: SerializableExecutionState
|
||||
): Promise<ExecutionResult> {
|
||||
// Build full DAG with all blocks to compute upstream set for snapshot filtering
|
||||
// includeAllBlocks is needed because the startBlockId might be a trigger not reachable from the main trigger
|
||||
const dag = this.dagBuilder.build(this.workflow, { includeAllBlocks: true })
|
||||
|
||||
const executedBlocks = new Set(sourceSnapshot.executedBlocks)
|
||||
const validation = validateRunFromBlock(startBlockId, dag, executedBlocks)
|
||||
if (!validation.valid) {
|
||||
throw new Error(validation.error)
|
||||
}
|
||||
|
||||
const { dirtySet, upstreamSet, reachableUpstreamSet } = computeExecutionSets(dag, startBlockId)
|
||||
const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId
|
||||
|
||||
// Extract container IDs from sentinel IDs in reachable upstream set
|
||||
// Use reachableUpstreamSet (not upstreamSet) to preserve sibling branch outputs
|
||||
// Example: A->C, B->C where C references A.result || B.result
|
||||
// When running from A, B's output should be preserved for C to reference
|
||||
const reachableContainerIds = new Set<string>()
|
||||
for (const nodeId of reachableUpstreamSet) {
|
||||
const loopId = extractLoopIdFromSentinel(nodeId)
|
||||
if (loopId) reachableContainerIds.add(loopId)
|
||||
const parallelId = extractParallelIdFromSentinel(nodeId)
|
||||
if (parallelId) reachableContainerIds.add(parallelId)
|
||||
}
|
||||
|
||||
// Filter snapshot to include all blocks reachable from dirty blocks
|
||||
// This preserves sibling branch outputs that dirty blocks may reference
|
||||
const filteredBlockStates: Record<string, any> = {}
|
||||
for (const [blockId, state] of Object.entries(sourceSnapshot.blockStates)) {
|
||||
if (reachableUpstreamSet.has(blockId) || reachableContainerIds.has(blockId)) {
|
||||
filteredBlockStates[blockId] = state
|
||||
}
|
||||
}
|
||||
const filteredExecutedBlocks = sourceSnapshot.executedBlocks.filter(
|
||||
(id) => reachableUpstreamSet.has(id) || reachableContainerIds.has(id)
|
||||
)
|
||||
|
||||
// Filter loop/parallel executions to only include reachable containers
|
||||
const filteredLoopExecutions: Record<string, any> = {}
|
||||
if (sourceSnapshot.loopExecutions) {
|
||||
for (const [loopId, execution] of Object.entries(sourceSnapshot.loopExecutions)) {
|
||||
if (reachableContainerIds.has(loopId)) {
|
||||
filteredLoopExecutions[loopId] = execution
|
||||
}
|
||||
}
|
||||
}
|
||||
const filteredParallelExecutions: Record<string, any> = {}
|
||||
if (sourceSnapshot.parallelExecutions) {
|
||||
for (const [parallelId, execution] of Object.entries(sourceSnapshot.parallelExecutions)) {
|
||||
if (reachableContainerIds.has(parallelId)) {
|
||||
filteredParallelExecutions[parallelId] = execution
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const filteredSnapshot: SerializableExecutionState = {
|
||||
...sourceSnapshot,
|
||||
blockStates: filteredBlockStates,
|
||||
executedBlocks: filteredExecutedBlocks,
|
||||
loopExecutions: filteredLoopExecutions,
|
||||
parallelExecutions: filteredParallelExecutions,
|
||||
}
|
||||
|
||||
logger.info('Executing from block', {
|
||||
workflowId,
|
||||
startBlockId,
|
||||
effectiveStartBlockId,
|
||||
dirtySetSize: dirtySet.size,
|
||||
upstreamSetSize: upstreamSet.size,
|
||||
reachableUpstreamSetSize: reachableUpstreamSet.size,
|
||||
})
|
||||
|
||||
// Remove incoming edges from non-dirty sources so convergent blocks don't wait for cached upstream
|
||||
for (const nodeId of dirtySet) {
|
||||
const node = dag.nodes.get(nodeId)
|
||||
if (!node) continue
|
||||
|
||||
const nonDirtyIncoming: string[] = []
|
||||
for (const sourceId of node.incomingEdges) {
|
||||
if (!dirtySet.has(sourceId)) {
|
||||
nonDirtyIncoming.push(sourceId)
|
||||
}
|
||||
}
|
||||
|
||||
for (const sourceId of nonDirtyIncoming) {
|
||||
node.incomingEdges.delete(sourceId)
|
||||
}
|
||||
}
|
||||
|
||||
const runFromBlockContext = { startBlockId: effectiveStartBlockId, dirtySet }
|
||||
const { context, state } = this.createExecutionContext(workflowId, undefined, {
|
||||
snapshotState: filteredSnapshot,
|
||||
runFromBlockContext,
|
||||
})
|
||||
|
||||
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
|
||||
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
|
||||
loopOrchestrator.setContextExtensions(this.contextExtensions)
|
||||
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
|
||||
parallelOrchestrator.setResolver(resolver)
|
||||
parallelOrchestrator.setContextExtensions(this.contextExtensions)
|
||||
const allHandlers = createBlockHandlers()
|
||||
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
|
||||
const edgeManager = new EdgeManager(dag)
|
||||
loopOrchestrator.setEdgeManager(edgeManager)
|
||||
const nodeOrchestrator = new NodeExecutionOrchestrator(
|
||||
dag,
|
||||
state,
|
||||
blockExecutor,
|
||||
loopOrchestrator,
|
||||
parallelOrchestrator
|
||||
)
|
||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||
|
||||
return await engine.run()
|
||||
}
|
||||
|
||||
private createExecutionContext(
|
||||
workflowId: string,
|
||||
triggerBlockId?: string
|
||||
triggerBlockId?: string,
|
||||
overrides?: {
|
||||
snapshotState?: SerializableExecutionState
|
||||
runFromBlockContext?: RunFromBlockContext
|
||||
}
|
||||
): { context: ExecutionContext; state: ExecutionState } {
|
||||
const snapshotState = this.contextExtensions.snapshotState
|
||||
const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState
|
||||
const blockStates = snapshotState?.blockStates
|
||||
? new Map(Object.entries(snapshotState.blockStates))
|
||||
: new Map<string, BlockState>()
|
||||
const executedBlocks = snapshotState?.executedBlocks
|
||||
let executedBlocks = snapshotState?.executedBlocks
|
||||
? new Set(snapshotState.executedBlocks)
|
||||
: new Set<string>()
|
||||
|
||||
if (overrides?.runFromBlockContext) {
|
||||
const { dirtySet } = overrides.runFromBlockContext
|
||||
executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id)))
|
||||
logger.info('Cleared executed status for dirty blocks', {
|
||||
dirtySetSize: dirtySet.size,
|
||||
remainingExecutedBlocks: executedBlocks.size,
|
||||
})
|
||||
}
|
||||
|
||||
const state = new ExecutionState(blockStates, executedBlocks)
|
||||
|
||||
const context: ExecutionContext = {
|
||||
@@ -109,7 +265,7 @@ export class DAGExecutor {
|
||||
userId: this.contextExtensions.userId,
|
||||
isDeployedContext: this.contextExtensions.isDeployedContext,
|
||||
blockStates: state.getBlockStates(),
|
||||
blockLogs: snapshotState?.blockLogs ?? [],
|
||||
blockLogs: overrides?.runFromBlockContext ? [] : (snapshotState?.blockLogs ?? []),
|
||||
metadata: {
|
||||
...this.contextExtensions.metadata,
|
||||
startTime: new Date().toISOString(),
|
||||
@@ -169,6 +325,8 @@ export class DAGExecutor {
|
||||
abortSignal: this.contextExtensions.abortSignal,
|
||||
includeFileBase64: this.contextExtensions.includeFileBase64,
|
||||
base64MaxBytes: this.contextExtensions.base64MaxBytes,
|
||||
runFromBlockContext: overrides?.runFromBlockContext,
|
||||
stopAfterBlockId: this.contextExtensions.stopAfterBlockId,
|
||||
}
|
||||
|
||||
if (this.contextExtensions.resumeFromSnapshot) {
|
||||
@@ -193,6 +351,15 @@ export class DAGExecutor {
|
||||
pendingBlocks: context.metadata.pendingBlocks,
|
||||
skipStarterBlockInit: true,
|
||||
})
|
||||
} else if (overrides?.runFromBlockContext) {
|
||||
// In run-from-block mode, initialize the start block only if it's a regular block
|
||||
// Skip for sentinels/containers (loop/parallel) which aren't real blocks
|
||||
const startBlockId = overrides.runFromBlockContext.startBlockId
|
||||
const isRegularBlock = this.workflow.blocks.some((b) => b.id === startBlockId)
|
||||
|
||||
if (isRegularBlock) {
|
||||
this.initializeStarterBlock(context, state, startBlockId)
|
||||
}
|
||||
} else {
|
||||
this.initializeStarterBlock(context, state, triggerBlockId)
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { Edge } from 'reactflow'
|
||||
import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types'
|
||||
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
|
||||
import type { SubflowType } from '@/stores/workflows/workflow/types'
|
||||
|
||||
export interface ExecutionMetadata {
|
||||
@@ -105,6 +106,17 @@ export interface ContextExtensions {
|
||||
output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
|
||||
iterationContext?: IterationContext
|
||||
) => Promise<void>
|
||||
|
||||
/**
|
||||
* Run-from-block configuration. When provided, executor runs in partial
|
||||
* execution mode starting from the specified block.
|
||||
*/
|
||||
runFromBlockContext?: RunFromBlockContext
|
||||
|
||||
/**
|
||||
* Stop execution after this block completes. Used for "run until block" feature.
|
||||
*/
|
||||
stopAfterBlockId?: string
|
||||
}
|
||||
|
||||
export interface WorkflowInput {
|
||||
|
||||
@@ -276,7 +276,16 @@ export class LoopOrchestrator {
|
||||
scope: LoopScope
|
||||
): LoopContinuationResult {
|
||||
const results = scope.allIterationOutputs
|
||||
this.state.setBlockOutput(loopId, { results }, DEFAULTS.EXECUTION_TIME)
|
||||
const output = { results }
|
||||
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
|
||||
|
||||
// Emit onBlockComplete for the loop container so the UI can track it
|
||||
if (this.contextExtensions?.onBlockComplete) {
|
||||
this.contextExtensions.onBlockComplete(loopId, 'Loop', 'loop', {
|
||||
output,
|
||||
executionTime: DEFAULTS.EXECUTION_TIME,
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
shouldContinue: false,
|
||||
|
||||
@@ -31,7 +31,18 @@ export class NodeExecutionOrchestrator {
|
||||
throw new Error(`Node not found in DAG: ${nodeId}`)
|
||||
}
|
||||
|
||||
if (this.state.hasExecuted(nodeId)) {
|
||||
if (ctx.runFromBlockContext && !ctx.runFromBlockContext.dirtySet.has(nodeId)) {
|
||||
const cachedOutput = this.state.getBlockOutput(nodeId) || {}
|
||||
logger.debug('Skipping non-dirty block in run-from-block mode', { nodeId })
|
||||
return {
|
||||
nodeId,
|
||||
output: cachedOutput,
|
||||
isFinalOutput: false,
|
||||
}
|
||||
}
|
||||
|
||||
const isDirtyBlock = ctx.runFromBlockContext?.dirtySet.has(nodeId) ?? false
|
||||
if (!isDirtyBlock && this.state.hasExecuted(nodeId)) {
|
||||
const output = this.state.getBlockOutput(nodeId) || {}
|
||||
return {
|
||||
nodeId,
|
||||
|
||||
@@ -260,9 +260,17 @@ export class ParallelOrchestrator {
|
||||
const branchOutputs = scope.branchOutputs.get(i) || []
|
||||
results.push(branchOutputs)
|
||||
}
|
||||
this.state.setBlockOutput(parallelId, {
|
||||
results,
|
||||
})
|
||||
const output = { results }
|
||||
this.state.setBlockOutput(parallelId, output)
|
||||
|
||||
// Emit onBlockComplete for the parallel container so the UI can track it
|
||||
if (this.contextExtensions?.onBlockComplete) {
|
||||
this.contextExtensions.onBlockComplete(parallelId, 'Parallel', 'parallel', {
|
||||
output,
|
||||
executionTime: 0,
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
allBranchesComplete: true,
|
||||
results,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { TraceSpan } from '@/lib/logs/types'
|
||||
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
|
||||
import type { BlockOutput } from '@/blocks/types'
|
||||
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
|
||||
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
|
||||
|
||||
export interface UserFile {
|
||||
@@ -250,6 +251,17 @@ export interface ExecutionContext {
|
||||
* will not have their base64 content fetched.
|
||||
*/
|
||||
base64MaxBytes?: number
|
||||
|
||||
/**
|
||||
* Context for "run from block" mode. When present, only blocks in dirtySet
|
||||
* will be executed; others return cached outputs from the source snapshot.
|
||||
*/
|
||||
runFromBlockContext?: RunFromBlockContext
|
||||
|
||||
/**
|
||||
* Stop execution after this block completes. Used for "run until block" feature.
|
||||
*/
|
||||
stopAfterBlockId?: string
|
||||
}
|
||||
|
||||
export interface ExecutionResult {
|
||||
|
||||
1537
apps/sim/executor/utils/run-from-block.test.ts
Normal file
1537
apps/sim/executor/utils/run-from-block.test.ts
Normal file
File diff suppressed because it is too large
Load Diff
219
apps/sim/executor/utils/run-from-block.ts
Normal file
219
apps/sim/executor/utils/run-from-block.ts
Normal file
@@ -0,0 +1,219 @@
|
||||
import { LOOP, PARALLEL } from '@/executor/constants'
|
||||
import type { DAG } from '@/executor/dag/builder'
|
||||
|
||||
/**
|
||||
* Builds the sentinel-start node ID for a loop.
|
||||
*/
|
||||
function buildLoopSentinelStartId(loopId: string): string {
|
||||
return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.START_SUFFIX}`
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the sentinel-start node ID for a parallel.
|
||||
*/
|
||||
function buildParallelSentinelStartId(parallelId: string): string {
|
||||
return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.START_SUFFIX}`
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a block ID is a loop or parallel container and returns the sentinel-start ID if so.
|
||||
* Returns null if the block is not a container.
|
||||
*/
|
||||
export function resolveContainerToSentinelStart(blockId: string, dag: DAG): string | null {
|
||||
if (dag.loopConfigs.has(blockId)) {
|
||||
return buildLoopSentinelStartId(blockId)
|
||||
}
|
||||
if (dag.parallelConfigs.has(blockId)) {
|
||||
return buildParallelSentinelStartId(blockId)
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* Result of validating a block for run-from-block execution.
|
||||
*/
|
||||
export interface RunFromBlockValidation {
|
||||
valid: boolean
|
||||
error?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* Context for run-from-block execution mode.
|
||||
*/
|
||||
export interface RunFromBlockContext {
|
||||
/** The block ID to start execution from */
|
||||
startBlockId: string
|
||||
/** Set of block IDs that need re-execution (start block + all downstream) */
|
||||
dirtySet: Set<string>
|
||||
}
|
||||
|
||||
/**
|
||||
* Result of computing execution sets for run-from-block mode.
|
||||
*/
|
||||
export interface ExecutionSets {
|
||||
/** Blocks that need re-execution (start block + all downstream) */
|
||||
dirtySet: Set<string>
|
||||
/** Blocks that are upstream (ancestors) of the start block */
|
||||
upstreamSet: Set<string>
|
||||
/** Blocks that are upstream of any dirty block (for snapshot preservation) */
|
||||
reachableUpstreamSet: Set<string>
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the dirty set, upstream set, and reachable upstream set.
|
||||
* - Dirty set: start block + all blocks reachable via outgoing edges (need re-execution)
|
||||
* - Upstream set: all blocks reachable via incoming edges from the start block
|
||||
* - Reachable upstream set: all non-dirty blocks that are upstream of ANY dirty block
|
||||
* (includes sibling branches that dirty blocks may reference)
|
||||
*
|
||||
* For loop/parallel containers, starts from the sentinel-start node and includes
|
||||
* the container ID itself in the dirty set.
|
||||
*
|
||||
* @param dag - The workflow DAG
|
||||
* @param startBlockId - The block to start execution from
|
||||
* @returns Object containing dirtySet, upstreamSet, and reachableUpstreamSet
|
||||
*/
|
||||
export function computeExecutionSets(dag: DAG, startBlockId: string): ExecutionSets {
|
||||
const dirty = new Set<string>([startBlockId])
|
||||
const upstream = new Set<string>()
|
||||
const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag)
|
||||
const traversalStartId = sentinelStartId ?? startBlockId
|
||||
|
||||
if (sentinelStartId) {
|
||||
dirty.add(sentinelStartId)
|
||||
}
|
||||
|
||||
// BFS downstream for dirty set
|
||||
const downstreamQueue = [traversalStartId]
|
||||
while (downstreamQueue.length > 0) {
|
||||
const nodeId = downstreamQueue.shift()!
|
||||
const node = dag.nodes.get(nodeId)
|
||||
if (!node) continue
|
||||
|
||||
for (const [, edge] of node.outgoingEdges) {
|
||||
if (!dirty.has(edge.target)) {
|
||||
dirty.add(edge.target)
|
||||
downstreamQueue.push(edge.target)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BFS upstream from start block for upstream set
|
||||
const upstreamQueue = [traversalStartId]
|
||||
while (upstreamQueue.length > 0) {
|
||||
const nodeId = upstreamQueue.shift()!
|
||||
const node = dag.nodes.get(nodeId)
|
||||
if (!node) continue
|
||||
|
||||
for (const sourceId of node.incomingEdges) {
|
||||
if (!upstream.has(sourceId)) {
|
||||
upstream.add(sourceId)
|
||||
upstreamQueue.push(sourceId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Compute reachable upstream: all non-dirty blocks upstream of ANY dirty block
|
||||
// This handles the case where a dirty block (like C in A->C, B->C) may reference
|
||||
// sibling branches (like B when running from A)
|
||||
const reachableUpstream = new Set<string>()
|
||||
for (const dirtyNodeId of dirty) {
|
||||
const node = dag.nodes.get(dirtyNodeId)
|
||||
if (!node) continue
|
||||
|
||||
// BFS upstream from this dirty node
|
||||
const queue = [...node.incomingEdges]
|
||||
while (queue.length > 0) {
|
||||
const sourceId = queue.shift()!
|
||||
if (reachableUpstream.has(sourceId) || dirty.has(sourceId)) continue
|
||||
|
||||
reachableUpstream.add(sourceId)
|
||||
const sourceNode = dag.nodes.get(sourceId)
|
||||
if (sourceNode) {
|
||||
queue.push(...sourceNode.incomingEdges)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return { dirtySet: dirty, upstreamSet: upstream, reachableUpstreamSet: reachableUpstream }
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that a block can be used as a run-from-block starting point.
|
||||
*
|
||||
* Validation rules:
|
||||
* - Block must exist in the DAG (or be a loop/parallel container)
|
||||
* - Block cannot be inside a loop (but loop containers are allowed)
|
||||
* - Block cannot be inside a parallel (but parallel containers are allowed)
|
||||
* - Block cannot be a sentinel node
|
||||
* - All upstream dependencies must have been executed (have cached outputs)
|
||||
*
|
||||
* @param blockId - The block ID to validate
|
||||
* @param dag - The workflow DAG
|
||||
* @param executedBlocks - Set of blocks that were executed in the source run
|
||||
* @returns Validation result with error message if invalid
|
||||
*/
|
||||
export function validateRunFromBlock(
|
||||
blockId: string,
|
||||
dag: DAG,
|
||||
executedBlocks: Set<string>
|
||||
): RunFromBlockValidation {
|
||||
const node = dag.nodes.get(blockId)
|
||||
const isLoopContainer = dag.loopConfigs.has(blockId)
|
||||
const isParallelContainer = dag.parallelConfigs.has(blockId)
|
||||
const isContainer = isLoopContainer || isParallelContainer
|
||||
|
||||
if (!node && !isContainer) {
|
||||
return { valid: false, error: `Block not found in workflow: ${blockId}` }
|
||||
}
|
||||
|
||||
if (isContainer) {
|
||||
const sentinelStartId = resolveContainerToSentinelStart(blockId, dag)
|
||||
if (!sentinelStartId || !dag.nodes.has(sentinelStartId)) {
|
||||
return {
|
||||
valid: false,
|
||||
error: `Container sentinel not found for: ${blockId}`,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (node) {
|
||||
if (node.metadata.isLoopNode) {
|
||||
return {
|
||||
valid: false,
|
||||
error: `Cannot run from block inside loop: ${node.metadata.loopId}`,
|
||||
}
|
||||
}
|
||||
|
||||
if (node.metadata.isParallelBranch) {
|
||||
return {
|
||||
valid: false,
|
||||
error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`,
|
||||
}
|
||||
}
|
||||
|
||||
if (node.metadata.isSentinel) {
|
||||
return { valid: false, error: 'Cannot run from sentinel node' }
|
||||
}
|
||||
|
||||
// Check immediate upstream dependencies were executed
|
||||
for (const sourceId of node.incomingEdges) {
|
||||
const sourceNode = dag.nodes.get(sourceId)
|
||||
// Skip sentinel nodes - they're internal and not in executedBlocks
|
||||
if (sourceNode?.metadata.isSentinel) continue
|
||||
|
||||
// Skip trigger nodes - they're entry points and don't need prior execution
|
||||
// A trigger node has no incoming edges
|
||||
if (sourceNode && sourceNode.incomingEdges.size === 0) continue
|
||||
|
||||
if (!executedBlocks.has(sourceId)) {
|
||||
return {
|
||||
valid: false,
|
||||
error: `Upstream dependency not executed: ${sourceId}`,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return { valid: true }
|
||||
}
|
||||
Reference in New Issue
Block a user