This commit is contained in:
Siddharth Ganesan
2026-01-26 16:40:14 -08:00
parent 3d0b810a8e
commit e8534bea7a
6 changed files with 145 additions and 209 deletions

View File

@@ -182,29 +182,6 @@ export const ActionBar = memo(
</Tooltip.Root>
)}
{isSubflowBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={(e) => {
e.stopPropagation()
if (!disabled) {
collaborativeBatchToggleBlockEnabled([blockId])
}
}}
className={ACTION_BUTTON_STYLES}
disabled={disabled}
>
{isEnabled ? <Circle className={ICON_SIZE} /> : <CircleOff className={ICON_SIZE} />}
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>
{getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')}
</Tooltip.Content>
</Tooltip.Root>
)}
{canRunFromBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>

View File

@@ -15,7 +15,11 @@ import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
import { computeDirtySet, validateRunFromBlock } from '@/executor/utils/run-from-block'
import {
computeDirtySet,
type RunFromBlockContext,
validateRunFromBlock,
} from '@/executor/utils/run-from-block'
import {
buildResolutionFromBlock,
buildStartBlockOutput,
@@ -134,6 +138,29 @@ export class DAGExecutor {
dirtyBlocks: Array.from(dirtySet),
})
// For convergent blocks in the dirty set, remove incoming edges from non-dirty sources.
// This ensures that a dirty block waiting on multiple inputs doesn't wait for non-dirty
// upstream blocks (whose outputs are already cached).
for (const nodeId of dirtySet) {
const node = dag.nodes.get(nodeId)
if (!node) continue
const nonDirtyIncoming: string[] = []
for (const sourceId of node.incomingEdges) {
if (!dirtySet.has(sourceId)) {
nonDirtyIncoming.push(sourceId)
}
}
for (const sourceId of nonDirtyIncoming) {
node.incomingEdges.delete(sourceId)
logger.debug('Removed non-dirty incoming edge for run-from-block', {
nodeId,
sourceId,
})
}
}
// Create context with snapshot state + runFromBlockContext
const runFromBlockContext = { startBlockId, dirtySet }
const { context, state } = this.createExecutionContext(workflowId, undefined, {
@@ -170,7 +197,7 @@ export class DAGExecutor {
triggerBlockId?: string,
overrides?: {
snapshotState?: SerializableExecutionState
runFromBlockContext?: { startBlockId: string; dirtySet: Set<string> }
runFromBlockContext?: RunFromBlockContext
}
): { context: ExecutionContext; state: ExecutionState } {
const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState

View File

@@ -1,5 +1,6 @@
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SubflowType } from '@/stores/workflows/workflow/types'
export interface ExecutionMetadata {
@@ -110,10 +111,7 @@ export interface ContextExtensions {
* Run-from-block configuration. When provided, executor runs in partial
* execution mode starting from the specified block.
*/
runFromBlockContext?: {
startBlockId: string
dirtySet: Set<string>
}
runFromBlockContext?: RunFromBlockContext
}
export interface WorkflowInput {

View File

@@ -1,6 +1,7 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import type { BlockOutput } from '@/blocks/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
export interface UserFile {
@@ -255,10 +256,7 @@ export interface ExecutionContext {
* Context for "run from block" mode. When present, only blocks in dirtySet
* will be executed; others return cached outputs from the source snapshot.
*/
runFromBlockContext?: {
startBlockId: string
dirtySet: Set<string>
}
runFromBlockContext?: RunFromBlockContext
}
export interface ExecutionResult {

View File

@@ -66,7 +66,7 @@ function createDAG(nodes: DAGNode[]): DAG {
}
describe('computeDirtySet', () => {
it.concurrent('includes start block in dirty set', () => {
it('includes start block in dirty set', () => {
const dag = createDAG([createNode('A'), createNode('B'), createNode('C')])
const dirtySet = computeDirtySet(dag, 'B')
@@ -74,7 +74,7 @@ describe('computeDirtySet', () => {
expect(dirtySet.has('B')).toBe(true)
})
it.concurrent('includes all downstream blocks in linear workflow', () => {
it('includes all downstream blocks in linear workflow', () => {
// A → B → C → D
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
@@ -92,7 +92,7 @@ describe('computeDirtySet', () => {
expect(dirtySet.size).toBe(3)
})
it.concurrent('handles branching paths', () => {
it('handles branching paths', () => {
// A → B → C
// ↓
// D → E
@@ -114,7 +114,7 @@ describe('computeDirtySet', () => {
expect(dirtySet.size).toBe(4)
})
it.concurrent('handles convergence points', () => {
it('handles convergence points', () => {
// A → C
// B → C → D
const dag = createDAG([
@@ -134,7 +134,7 @@ describe('computeDirtySet', () => {
expect(dirtySet.size).toBe(3)
})
it.concurrent('handles diamond pattern', () => {
it('handles diamond pattern', () => {
// B
// ↗ ↘
// A D
@@ -156,7 +156,7 @@ describe('computeDirtySet', () => {
expect(dirtySet.size).toBe(4)
})
it.concurrent('stops at graph boundaries', () => {
it('stops at graph boundaries', () => {
// A → B C → D (disconnected)
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
@@ -174,7 +174,7 @@ describe('computeDirtySet', () => {
expect(dirtySet.size).toBe(2)
})
it.concurrent('handles single node workflow', () => {
it('handles single node workflow', () => {
const dag = createDAG([createNode('A')])
const dirtySet = computeDirtySet(dag, 'A')
@@ -183,7 +183,7 @@ describe('computeDirtySet', () => {
expect(dirtySet.size).toBe(1)
})
it.concurrent('handles node not in DAG gracefully', () => {
it('handles node not in DAG gracefully', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const dirtySet = computeDirtySet(dag, 'nonexistent')
@@ -192,10 +192,31 @@ describe('computeDirtySet', () => {
expect(dirtySet.has('nonexistent')).toBe(true)
expect(dirtySet.size).toBe(1)
})
it('includes convergent block when running from one branch of parallel', () => {
// Parallel branches converging:
// A → B → D
// A → C → D
// Running from B should include B and D (but not A or C)
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(2)
})
})
describe('validateRunFromBlock', () => {
it.concurrent('accepts valid block', () => {
it('accepts valid block', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A', 'B'])
@@ -205,7 +226,7 @@ describe('validateRunFromBlock', () => {
expect(result.error).toBeUndefined()
})
it.concurrent('rejects block not found in DAG', () => {
it('rejects block not found in DAG', () => {
const dag = createDAG([createNode('A')])
const executedBlocks = new Set(['A', 'B'])
@@ -215,7 +236,7 @@ describe('validateRunFromBlock', () => {
expect(result.error).toContain('Block not found')
})
it.concurrent('rejects blocks inside loops', () => {
it('rejects blocks inside loops', () => {
const dag = createDAG([createNode('A', [], { isLoopNode: true, loopId: 'loop-1' })])
const executedBlocks = new Set(['A'])
@@ -226,7 +247,7 @@ describe('validateRunFromBlock', () => {
expect(result.error).toContain('loop-1')
})
it.concurrent('rejects blocks inside parallels', () => {
it('rejects blocks inside parallels', () => {
const dag = createDAG([createNode('A', [], { isParallelBranch: true, parallelId: 'parallel-1' })])
const executedBlocks = new Set(['A'])
@@ -237,7 +258,7 @@ describe('validateRunFromBlock', () => {
expect(result.error).toContain('parallel-1')
})
it.concurrent('rejects sentinel nodes', () => {
it('rejects sentinel nodes', () => {
const dag = createDAG([createNode('A', [], { isSentinel: true, sentinelType: 'start' })])
const executedBlocks = new Set(['A'])
@@ -247,7 +268,7 @@ describe('validateRunFromBlock', () => {
expect(result.error).toContain('sentinel')
})
it.concurrent('rejects unexecuted blocks', () => {
it('rejects unexecuted blocks', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A']) // B was not executed
@@ -257,7 +278,7 @@ describe('validateRunFromBlock', () => {
expect(result.error).toContain('was not executed')
})
it.concurrent('accepts regular executed block', () => {
it('accepts regular executed block', () => {
const dag = createDAG([
createNode('trigger', [{ target: 'A' }]),
createNode('A', [{ target: 'B' }]),

View File

@@ -6,6 +6,80 @@ import type { SubflowType } from '@/stores/workflows/workflow/types'
const logger = createLogger('useExecutionStream')
/**
* Processes SSE events from a response body and invokes appropriate callbacks.
*/
async function processSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
callbacks: ExecutionStreamCallbacks,
logPrefix: string
): Promise<void> {
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) continue
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info(`${logPrefix} stream completed`)
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
switch (event.type) {
case 'execution:started':
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
}
export interface ExecutionStreamCallbacks {
onExecutionStarted?: (data: { startTime: string }) => void
onExecutionCompleted?: (data: {
@@ -127,91 +201,7 @@ export function useExecutionStream() {
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) {
continue
}
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info('Stream completed')
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
logger.info('📡 SSE Event received:', {
type: event.type,
executionId: event.executionId,
data: event.data,
})
switch (event.type) {
case 'execution:started':
logger.info('🚀 Execution started')
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
logger.info('✅ Execution completed')
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
logger.error('❌ Execution error')
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
logger.warn('🛑 Execution cancelled')
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
logger.info('🔷 Block started:', event.data.blockId)
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
logger.info('✓ Block completed:', event.data.blockId)
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
logger.error('✗ Block error:', event.data.blockId)
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
logger.info('Stream done:', event.data.blockId)
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
await processSSEStream(reader, callbacks, 'Execution')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Execution stream cancelled')
@@ -270,82 +260,7 @@ export function useExecutionStream() {
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) {
continue
}
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info('Run-from-block stream completed')
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
logger.info('📡 Run-from-block SSE Event:', {
type: event.type,
executionId: event.executionId,
})
switch (event.type) {
case 'execution:started':
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
await processSSEStream(reader, callbacks, 'Run-from-block')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Run-from-block execution cancelled')