Siddharth Ganesan
2026-01-27 17:53:37 -08:00
parent 4996eea2ee
commit c68cda63ae
5 changed files with 105 additions and 110 deletions

View File

@@ -1482,31 +1482,9 @@ export function useWorkflowExecution() {
const candidates = resolveStartCandidates(mergedStates, { execution: 'manual' })
const candidate = candidates.find((c) => c.blockId === blockId)
-logger.info('Run-from-block trigger analysis', {
-blockId,
-blockType: workflowBlocks[blockId]?.type,
-blockTriggerMode: workflowBlocks[blockId]?.triggerMode,
-candidateFound: !!candidate,
-candidatePath: candidate?.path,
-allCandidates: candidates.map((c) => ({
-blockId: c.blockId,
-type: c.block.type,
-path: c.path,
-})),
-})
if (candidate) {
-const needsMockPayload = triggerNeedsMockPayload(candidate)
-logger.info('Trigger mock payload check', {
-needsMockPayload,
-path: candidate.path,
-isExternalTrigger: candidate.path === StartBlockPath.EXTERNAL_TRIGGER,
-blockType: candidate.block.type,
-})
-if (needsMockPayload) {
+if (triggerNeedsMockPayload(candidate)) {
workflowInput = extractTriggerMockPayload(candidate)
-logger.info('Extracted mock payload for trigger block', { blockId, workflowInput })
} else if (
candidate.path === StartBlockPath.SPLIT_API ||
candidate.path === StartBlockPath.SPLIT_INPUT ||
@@ -1522,46 +1500,27 @@ export function useWorkflowExecution() {
})
if (Object.keys(testInput).length > 0) {
workflowInput = testInput
-logger.info('Extracted test input for trigger block', { blockId, workflowInput })
}
}
}
} else {
-// Fallback for trigger blocks not found in candidates
-// This can happen when the block is a trigger by position (no incoming edges)
-// but wasn't classified as a start candidate (e.g., triggerMode not set)
+// Fallback: block is trigger by position but not classified as start candidate
const block = mergedStates[blockId]
if (block) {
const blockConfig = getBlock(block.type)
const hasTriggers = blockConfig?.triggers?.available?.length
if (hasTriggers || block.triggerMode) {
// Block has trigger capability - extract mock payload
-const syntheticCandidate = {
+workflowInput = extractTriggerMockPayload({
blockId,
block,
path: StartBlockPath.EXTERNAL_TRIGGER,
-}
-workflowInput = extractTriggerMockPayload(syntheticCandidate)
+})
-logger.info('Extracted mock payload for trigger block (fallback)', {
-blockId,
-blockType: block.type,
-hasTriggers,
-triggerMode: block.triggerMode,
-workflowInput,
-})
}
}
}
}
-logger.info('Starting run-from-block execution', {
-workflowId,
-startBlockId: blockId,
-isTriggerBlock,
-hasInput: !!workflowInput,
-})
setIsExecuting(true)
const executionId = uuidv4()
const accumulatedBlockLogs: BlockLog[] = []
@@ -1576,10 +1535,6 @@ export function useWorkflowExecution() {
sourceSnapshot: effectiveSnapshot,
input: workflowInput,
callbacks: {
-onExecutionStarted: (data) => {
-logger.info('Run-from-block execution started:', data)
-},
onBlockStarted: (data) => {
activeBlocksSet.add(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
@@ -1702,17 +1657,10 @@ export function useWorkflowExecution() {
activeExecutionPath: Array.from(mergedExecutedBlocks),
}
setLastExecutionSnapshot(workflowId, updatedSnapshot)
-logger.info('Updated execution snapshot after run-from-block', {
-workflowId,
-newBlocksExecuted: executedBlockIds.size,
-})
}
},
onExecutionError: (data) => {
-logger.error('Run-from-block execution error:', data.error)
// If block not found, the snapshot is stale - clear it
if (data.error?.includes('Block not found in workflow')) {
clearLastExecutionSnapshot(workflowId)
addNotification({
@@ -1720,7 +1668,6 @@ export function useWorkflowExecution() {
message: 'Workflow was modified. Run the workflow again to refresh.',
workflowId,
})
-logger.info('Cleared stale execution snapshot', { workflowId })
} else {
addNotification({
level: 'error',
@@ -1729,15 +1676,11 @@ export function useWorkflowExecution() {
})
}
},
-onExecutionCancelled: () => {
-logger.info('Run-from-block execution cancelled')
-},
},
})
} catch (error) {
if ((error as Error).name !== 'AbortError') {
-logger.error('Run-from-block execution failed:', error)
+logger.error('Run-from-block failed:', error)
}
} finally {
setIsExecuting(false)
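
The hunks above strip diagnostic logging from the run-from-block path without changing its input resolution. Condensed below is a sketch of the surviving fallback branch, assuming (as the diff suggests) that `extractTriggerMockPayload` accepts any object with the candidate shape (`blockId`, `block`, `path`) and that `getBlock`, `StartBlockPath`, and the `mergedStates` map behave as in the surrounding code; the function name and simplified types are hypothetical.

```ts
// Sketch only, not the full hook. getBlock, extractTriggerMockPayload, and
// StartBlockPath are the helpers used in the diff above; the simplified
// block type here is an assumption.
function resolveFallbackTriggerInput(
  blockId: string,
  mergedStates: Record<string, { type: string; triggerMode?: boolean }>
): unknown {
  const block = mergedStates[blockId]
  if (!block) return undefined
  const blockConfig = getBlock(block.type)
  // "Trigger by position": the block has trigger capability but was never
  // classified as a start candidate, so a candidate is synthesized inline.
  if (blockConfig?.triggers?.available?.length || block.triggerMode) {
    return extractTriggerMockPayload({
      blockId,
      block,
      path: StartBlockPath.EXTERNAL_TRIGGER,
    })
  }
  return undefined
}
```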

View File

@@ -145,9 +145,6 @@ export class DAGExecutor {
effectiveStartBlockId,
dirtySetSize: dirtySet.size,
upstreamSetSize: upstreamSet.size,
-filteredBlockStatesCount: Object.keys(filteredBlockStates).length,
-totalBlocks: dag.nodes.size,
-dirtyBlocks: Array.from(dirtySet),
})
// Remove incoming edges from non-dirty sources so convergent blocks don't wait for cached upstream
@@ -164,10 +161,6 @@ export class DAGExecutor {
for (const sourceId of nonDirtyIncoming) {
node.incomingEdges.delete(sourceId)
-logger.debug('Removed non-dirty incoming edge for run-from-block', {
-nodeId,
-sourceId,
-})
}
}
@@ -327,11 +320,6 @@ export class DAGExecutor {
if (isRegularBlock) {
this.initializeStarterBlock(context, state, startBlockId)
-logger.info('Run-from-block mode: initialized start block', { startBlockId })
-} else {
-logger.info('Run-from-block mode: skipping starter block init for container/sentinel', {
-startBlockId,
-})
}
} else {
this.initializeStarterBlock(context, state, triggerBlockId)
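
Context for the edge pruning this file performs: when execution starts mid-graph, incoming edges from non-dirty sources are deleted so convergent blocks don't wait on upstream nodes whose outputs are served from the cached snapshot. A minimal sketch, assuming `incomingEdges` is a `Set<string>` of source ids (consistent with the `delete(sourceId)` call above); the names here are illustrative.

```ts
interface SketchNode {
  incomingEdges: Set<string> // source block ids; assumed shape
}

// Illustrative version of the pruning loop shown in the hunk above.
function pruneNonDirtyIncomingEdges(
  nodes: Map<string, SketchNode>,
  dirtySet: Set<string>
): void {
  for (const [nodeId, node] of nodes) {
    if (!dirtySet.has(nodeId)) continue
    for (const sourceId of [...node.incomingEdges]) {
      // A non-dirty source never executes in this pass; its output comes
      // from the previous run's snapshot, so the dependency edge must go.
      if (!dirtySet.has(sourceId)) node.incomingEdges.delete(sourceId)
    }
  }
}
```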

View File

@@ -1,9 +1,16 @@
import { describe, expect, it } from 'vitest'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
-import { computeDirtySet, validateRunFromBlock } from '@/executor/utils/run-from-block'
+import { computeExecutionSets, validateRunFromBlock } from '@/executor/utils/run-from-block'
import type { SerializedLoop, SerializedParallel } from '@/serializer/types'
+/**
+ * Helper to extract dirty set from computeExecutionSets
+ */
+function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
+return computeExecutionSets(dag, startBlockId).dirtySet
+}
/**
* Helper to create a DAG node for testing
*/
@@ -491,3 +498,95 @@ describe('computeDirtySet with containers', () => {
expect(dirtySet.has('A')).toBe(false)
})
})
+describe('computeExecutionSets upstream set', () => {
+it('includes all upstream blocks in linear workflow', () => {
+// A → B → C → D
+const dag = createDAG([
+createNode('A', [{ target: 'B' }]),
+createNode('B', [{ target: 'C' }]),
+createNode('C', [{ target: 'D' }]),
+createNode('D'),
+])
+const { upstreamSet } = computeExecutionSets(dag, 'C')
+expect(upstreamSet.has('A')).toBe(true)
+expect(upstreamSet.has('B')).toBe(true)
+expect(upstreamSet.has('C')).toBe(false) // start block not in upstream
+expect(upstreamSet.has('D')).toBe(false) // downstream
+})
+it('includes all branches in convergent upstream', () => {
+// A → C
+// B → C → D
+const dag = createDAG([
+createNode('A', [{ target: 'C' }]),
+createNode('B', [{ target: 'C' }]),
+createNode('C', [{ target: 'D' }]),
+createNode('D'),
+])
+const { upstreamSet } = computeExecutionSets(dag, 'C')
+expect(upstreamSet.has('A')).toBe(true)
+expect(upstreamSet.has('B')).toBe(true)
+expect(upstreamSet.has('C')).toBe(false)
+expect(upstreamSet.has('D')).toBe(false)
+})
+it('excludes parallel branches not in upstream path', () => {
+// A → B → D
+// A → C → D
+// Running from B: upstream is A only, not C
+const dag = createDAG([
+createNode('A', [{ target: 'B' }, { target: 'C' }]),
+createNode('B', [{ target: 'D' }]),
+createNode('C', [{ target: 'D' }]),
+createNode('D'),
+])
+const { upstreamSet, dirtySet } = computeExecutionSets(dag, 'B')
+// Upstream should only contain A
+expect(upstreamSet.has('A')).toBe(true)
+expect(upstreamSet.has('C')).toBe(false) // parallel branch, not upstream of B
+// Dirty should contain B and D
+expect(dirtySet.has('B')).toBe(true)
+expect(dirtySet.has('D')).toBe(true)
+expect(dirtySet.has('C')).toBe(false)
+})
+it('handles diamond pattern upstream correctly', () => {
+//     B
+//   ↗   ↘
+// A       D → E
+//   ↘   ↗
+//     C
+// Running from D: upstream should be A, B, C
+const dag = createDAG([
+createNode('A', [{ target: 'B' }, { target: 'C' }]),
+createNode('B', [{ target: 'D' }]),
+createNode('C', [{ target: 'D' }]),
+createNode('D', [{ target: 'E' }]),
+createNode('E'),
+])
+const { upstreamSet, dirtySet } = computeExecutionSets(dag, 'D')
+expect(upstreamSet.has('A')).toBe(true)
+expect(upstreamSet.has('B')).toBe(true)
+expect(upstreamSet.has('C')).toBe(true)
+expect(upstreamSet.has('D')).toBe(false)
+expect(dirtySet.has('D')).toBe(true)
+expect(dirtySet.has('E')).toBe(true)
+})
+it('returns empty upstream set for root block', () => {
+const dag = createDAG([createNode('A', [{ target: 'B' }]), createNode('B')])
+const { upstreamSet } = computeExecutionSets(dag, 'A')
+expect(upstreamSet.size).toBe(0)
+})
+})
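
The new tests pin down the semantics of `computeExecutionSets`: the dirty set is the forward closure from the start block (start block included), and the upstream set is the backward closure (start block excluded). Below is a reference sketch consistent with those assertions, using simplified node shapes modeled on the test helpers; the real function also handles loop and parallel sentinels.

```ts
interface MiniNode {
  outgoingEdges: Array<{ target: string }> // assumed shape, per the test helpers
  incomingEdges: Set<string> // source block ids
}

// Reference semantics only; mirrors the test expectations above.
function computeExecutionSetsSketch(
  nodes: Map<string, MiniNode>,
  startBlockId: string
): { dirtySet: Set<string>; upstreamSet: Set<string> } {
  // Forward walk: the start block and everything reachable from it is dirty.
  const dirtySet = new Set<string>([startBlockId])
  const forward = [startBlockId]
  while (forward.length > 0) {
    const id = forward.pop()!
    for (const edge of nodes.get(id)?.outgoingEdges ?? []) {
      if (!dirtySet.has(edge.target)) {
        dirtySet.add(edge.target)
        forward.push(edge.target)
      }
    }
  }
  // Backward walk: ancestors whose cached outputs feed the run; the start
  // block itself is excluded, matching the tests.
  const upstreamSet = new Set<string>()
  const backward = [...(nodes.get(startBlockId)?.incomingEdges ?? [])]
  while (backward.length > 0) {
    const id = backward.pop()!
    if (upstreamSet.has(id)) continue
    upstreamSet.add(id)
    backward.push(...(nodes.get(id)?.incomingEdges ?? []))
  }
  return { dirtySet, upstreamSet }
}
```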

View File

@@ -1,9 +1,6 @@
-import { createLogger } from '@sim/logger'
import { LOOP, PARALLEL } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
-const logger = createLogger('run-from-block')
/**
* Builds the sentinel-start node ID for a loop.
*/
@@ -112,30 +109,9 @@ export function computeExecutionSets(dag: DAG, startBlockId: string): ExecutionS
}
}
-logger.debug('Computed execution sets', {
-startBlockId,
-traversalStartId,
-dirtySetSize: dirty.size,
-upstreamSetSize: upstream.size,
-})
return { dirtySet: dirty, upstreamSet: upstream }
}
-/**
- * @deprecated Use computeExecutionSets instead for combined computation
- */
-export function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
-return computeExecutionSets(dag, startBlockId).dirtySet
-}
-/**
- * @deprecated Use computeExecutionSets instead for combined computation
- */
-export function computeUpstreamSet(dag: DAG, blockId: string): Set<string> {
-return computeExecutionSets(dag, blockId).upstreamSet
-}
/**
* Validates that a block can be used as a run-from-block starting point.
*
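
With the deprecated wrappers removed, a call site that needs both sets makes one call instead of two full traversals. A hypothetical migration, with `dag` and `startBlockId` declared only for illustration:

```ts
import type { DAG } from '@/executor/dag/builder'
import { computeExecutionSets } from '@/executor/utils/run-from-block'

declare const dag: DAG
declare const startBlockId: string

// Before (each removed wrapper re-ran the full traversal):
//   const dirtySet = computeDirtySet(dag, startBlockId)
//   const upstreamSet = computeUpstreamSet(dag, startBlockId)

// After: a single traversal yields both sets.
const { dirtySet, upstreamSet } = computeExecutionSets(dag, startBlockId)
```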

View File

@@ -27,10 +27,7 @@ import type {
} from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
-import {
-buildParallelSentinelEndId,
-buildSentinelEndId,
-} from '@/executor/utils/subflow-utils'
+import { buildParallelSentinelEndId, buildSentinelEndId } from '@/executor/utils/subflow-utils'
import { Serializer } from '@/serializer'
const logger = createLogger('ExecutionCore')
@@ -264,16 +261,8 @@ export async function executeWorkflowCore(
if (stopAfterBlockId) {
if (serializedWorkflow.loops?.[stopAfterBlockId]) {
resolvedStopAfterBlockId = buildSentinelEndId(stopAfterBlockId)
-logger.info(`[${requestId}] Resolved loop container to sentinel-end`, {
-original: stopAfterBlockId,
-resolved: resolvedStopAfterBlockId,
-})
} else if (serializedWorkflow.parallels?.[stopAfterBlockId]) {
resolvedStopAfterBlockId = buildParallelSentinelEndId(stopAfterBlockId)
-logger.info(`[${requestId}] Resolved parallel container to sentinel-end`, {
-original: stopAfterBlockId,
-resolved: resolvedStopAfterBlockId,
-})
}
}
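
What remains of this hunk after the log removal: a container id passed as `stopAfterBlockId` resolves to the container's sentinel-end node, so execution appears to halt only after the whole loop or parallel completes. A condensed sketch using the helpers imported above; the wrapper function and its types are hypothetical.

```ts
import { buildParallelSentinelEndId, buildSentinelEndId } from '@/executor/utils/subflow-utils'

interface WorkflowContainers {
  loops?: Record<string, unknown>
  parallels?: Record<string, unknown>
}

// A container id never executes directly, so "stop after the container"
// means "stop after its sentinel-end node".
function resolveStopAfter(id: string, workflow: WorkflowContainers): string {
  if (workflow.loops?.[id]) return buildSentinelEndId(id)
  if (workflow.parallels?.[id]) return buildParallelSentinelEndId(id)
  return id
}
```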