diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx index f8a816a32..1976d9e81 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx @@ -182,29 +182,6 @@ export const ActionBar = memo( )} - {isSubflowBlock && ( - - - - - - {getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')} - - - )} - {canRunFromBlock && ( diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index 3351aaff8..021fda022 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -15,7 +15,11 @@ import { LoopOrchestrator } from '@/executor/orchestrators/loop' import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node' import { ParallelOrchestrator } from '@/executor/orchestrators/parallel' import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types' -import { computeDirtySet, validateRunFromBlock } from '@/executor/utils/run-from-block' +import { + computeDirtySet, + type RunFromBlockContext, + validateRunFromBlock, +} from '@/executor/utils/run-from-block' import { buildResolutionFromBlock, buildStartBlockOutput, @@ -134,6 +138,29 @@ export class DAGExecutor { dirtyBlocks: Array.from(dirtySet), }) + // For convergent blocks in the dirty set, remove incoming edges from non-dirty sources. + // This ensures that a dirty block waiting on multiple inputs doesn't wait for non-dirty + // upstream blocks (whose outputs are already cached). + for (const nodeId of dirtySet) { + const node = dag.nodes.get(nodeId) + if (!node) continue + + const nonDirtyIncoming: string[] = [] + for (const sourceId of node.incomingEdges) { + if (!dirtySet.has(sourceId)) { + nonDirtyIncoming.push(sourceId) + } + } + + for (const sourceId of nonDirtyIncoming) { + node.incomingEdges.delete(sourceId) + logger.debug('Removed non-dirty incoming edge for run-from-block', { + nodeId, + sourceId, + }) + } + } + // Create context with snapshot state + runFromBlockContext const runFromBlockContext = { startBlockId, dirtySet } const { context, state } = this.createExecutionContext(workflowId, undefined, { @@ -170,7 +197,7 @@ export class DAGExecutor { triggerBlockId?: string, overrides?: { snapshotState?: SerializableExecutionState - runFromBlockContext?: { startBlockId: string; dirtySet: Set } + runFromBlockContext?: RunFromBlockContext } ): { context: ExecutionContext; state: ExecutionState } { const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index 73e6e11ba..9a4ffb691 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -1,5 +1,6 @@ import type { Edge } from 'reactflow' import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types' +import type { RunFromBlockContext } from '@/executor/utils/run-from-block' import type { SubflowType } from '@/stores/workflows/workflow/types' export interface ExecutionMetadata { @@ -110,10 +111,7 @@ export interface ContextExtensions { * Run-from-block configuration. When provided, executor runs in partial * execution mode starting from the specified block. */ - runFromBlockContext?: { - startBlockId: string - dirtySet: Set - } + runFromBlockContext?: RunFromBlockContext } export interface WorkflowInput { diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index aa4e05523..35ff1c3c0 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -1,6 +1,7 @@ import type { TraceSpan } from '@/lib/logs/types' import type { PermissionGroupConfig } from '@/lib/permission-groups/types' import type { BlockOutput } from '@/blocks/types' +import type { RunFromBlockContext } from '@/executor/utils/run-from-block' import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types' export interface UserFile { @@ -255,10 +256,7 @@ export interface ExecutionContext { * Context for "run from block" mode. When present, only blocks in dirtySet * will be executed; others return cached outputs from the source snapshot. */ - runFromBlockContext?: { - startBlockId: string - dirtySet: Set - } + runFromBlockContext?: RunFromBlockContext } export interface ExecutionResult { diff --git a/apps/sim/executor/utils/run-from-block.test.ts b/apps/sim/executor/utils/run-from-block.test.ts index 2e66d9fdf..687e86f0d 100644 --- a/apps/sim/executor/utils/run-from-block.test.ts +++ b/apps/sim/executor/utils/run-from-block.test.ts @@ -66,7 +66,7 @@ function createDAG(nodes: DAGNode[]): DAG { } describe('computeDirtySet', () => { - it.concurrent('includes start block in dirty set', () => { + it('includes start block in dirty set', () => { const dag = createDAG([createNode('A'), createNode('B'), createNode('C')]) const dirtySet = computeDirtySet(dag, 'B') @@ -74,7 +74,7 @@ describe('computeDirtySet', () => { expect(dirtySet.has('B')).toBe(true) }) - it.concurrent('includes all downstream blocks in linear workflow', () => { + it('includes all downstream blocks in linear workflow', () => { // A → B → C → D const dag = createDAG([ createNode('A', [{ target: 'B' }]), @@ -92,7 +92,7 @@ describe('computeDirtySet', () => { expect(dirtySet.size).toBe(3) }) - it.concurrent('handles branching paths', () => { + it('handles branching paths', () => { // A → B → C // ↓ // D → E @@ -114,7 +114,7 @@ describe('computeDirtySet', () => { expect(dirtySet.size).toBe(4) }) - it.concurrent('handles convergence points', () => { + it('handles convergence points', () => { // A → C // B → C → D const dag = createDAG([ @@ -134,7 +134,7 @@ describe('computeDirtySet', () => { expect(dirtySet.size).toBe(3) }) - it.concurrent('handles diamond pattern', () => { + it('handles diamond pattern', () => { // B // ↗ ↘ // A D @@ -156,7 +156,7 @@ describe('computeDirtySet', () => { expect(dirtySet.size).toBe(4) }) - it.concurrent('stops at graph boundaries', () => { + it('stops at graph boundaries', () => { // A → B C → D (disconnected) const dag = createDAG([ createNode('A', [{ target: 'B' }]), @@ -174,7 +174,7 @@ describe('computeDirtySet', () => { expect(dirtySet.size).toBe(2) }) - it.concurrent('handles single node workflow', () => { + it('handles single node workflow', () => { const dag = createDAG([createNode('A')]) const dirtySet = computeDirtySet(dag, 'A') @@ -183,7 +183,7 @@ describe('computeDirtySet', () => { expect(dirtySet.size).toBe(1) }) - it.concurrent('handles node not in DAG gracefully', () => { + it('handles node not in DAG gracefully', () => { const dag = createDAG([createNode('A'), createNode('B')]) const dirtySet = computeDirtySet(dag, 'nonexistent') @@ -192,10 +192,31 @@ describe('computeDirtySet', () => { expect(dirtySet.has('nonexistent')).toBe(true) expect(dirtySet.size).toBe(1) }) + + it('includes convergent block when running from one branch of parallel', () => { + // Parallel branches converging: + // A → B → D + // A → C → D + // Running from B should include B and D (but not A or C) + const dag = createDAG([ + createNode('A', [{ target: 'B' }, { target: 'C' }]), + createNode('B', [{ target: 'D' }]), + createNode('C', [{ target: 'D' }]), + createNode('D'), + ]) + + const dirtySet = computeDirtySet(dag, 'B') + + expect(dirtySet.has('A')).toBe(false) + expect(dirtySet.has('B')).toBe(true) + expect(dirtySet.has('C')).toBe(false) + expect(dirtySet.has('D')).toBe(true) + expect(dirtySet.size).toBe(2) + }) }) describe('validateRunFromBlock', () => { - it.concurrent('accepts valid block', () => { + it('accepts valid block', () => { const dag = createDAG([createNode('A'), createNode('B')]) const executedBlocks = new Set(['A', 'B']) @@ -205,7 +226,7 @@ describe('validateRunFromBlock', () => { expect(result.error).toBeUndefined() }) - it.concurrent('rejects block not found in DAG', () => { + it('rejects block not found in DAG', () => { const dag = createDAG([createNode('A')]) const executedBlocks = new Set(['A', 'B']) @@ -215,7 +236,7 @@ describe('validateRunFromBlock', () => { expect(result.error).toContain('Block not found') }) - it.concurrent('rejects blocks inside loops', () => { + it('rejects blocks inside loops', () => { const dag = createDAG([createNode('A', [], { isLoopNode: true, loopId: 'loop-1' })]) const executedBlocks = new Set(['A']) @@ -226,7 +247,7 @@ describe('validateRunFromBlock', () => { expect(result.error).toContain('loop-1') }) - it.concurrent('rejects blocks inside parallels', () => { + it('rejects blocks inside parallels', () => { const dag = createDAG([createNode('A', [], { isParallelBranch: true, parallelId: 'parallel-1' })]) const executedBlocks = new Set(['A']) @@ -237,7 +258,7 @@ describe('validateRunFromBlock', () => { expect(result.error).toContain('parallel-1') }) - it.concurrent('rejects sentinel nodes', () => { + it('rejects sentinel nodes', () => { const dag = createDAG([createNode('A', [], { isSentinel: true, sentinelType: 'start' })]) const executedBlocks = new Set(['A']) @@ -247,7 +268,7 @@ describe('validateRunFromBlock', () => { expect(result.error).toContain('sentinel') }) - it.concurrent('rejects unexecuted blocks', () => { + it('rejects unexecuted blocks', () => { const dag = createDAG([createNode('A'), createNode('B')]) const executedBlocks = new Set(['A']) // B was not executed @@ -257,7 +278,7 @@ describe('validateRunFromBlock', () => { expect(result.error).toContain('was not executed') }) - it.concurrent('accepts regular executed block', () => { + it('accepts regular executed block', () => { const dag = createDAG([ createNode('trigger', [{ target: 'A' }]), createNode('A', [{ target: 'B' }]), diff --git a/apps/sim/hooks/use-execution-stream.ts b/apps/sim/hooks/use-execution-stream.ts index ae5ab2d04..bd36cb55d 100644 --- a/apps/sim/hooks/use-execution-stream.ts +++ b/apps/sim/hooks/use-execution-stream.ts @@ -6,6 +6,80 @@ import type { SubflowType } from '@/stores/workflows/workflow/types' const logger = createLogger('useExecutionStream') +/** + * Processes SSE events from a response body and invokes appropriate callbacks. + */ +async function processSSEStream( + reader: ReadableStreamDefaultReader, + callbacks: ExecutionStreamCallbacks, + logPrefix: string +): Promise { + const decoder = new TextDecoder() + let buffer = '' + + try { + while (true) { + const { done, value } = await reader.read() + + if (done) break + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n\n') + buffer = lines.pop() || '' + + for (const line of lines) { + if (!line.trim() || !line.startsWith('data: ')) continue + + const data = line.substring(6).trim() + if (data === '[DONE]') { + logger.info(`${logPrefix} stream completed`) + continue + } + + try { + const event = JSON.parse(data) as ExecutionEvent + + switch (event.type) { + case 'execution:started': + callbacks.onExecutionStarted?.(event.data) + break + case 'execution:completed': + callbacks.onExecutionCompleted?.(event.data) + break + case 'execution:error': + callbacks.onExecutionError?.(event.data) + break + case 'execution:cancelled': + callbacks.onExecutionCancelled?.(event.data) + break + case 'block:started': + callbacks.onBlockStarted?.(event.data) + break + case 'block:completed': + callbacks.onBlockCompleted?.(event.data) + break + case 'block:error': + callbacks.onBlockError?.(event.data) + break + case 'stream:chunk': + callbacks.onStreamChunk?.(event.data) + break + case 'stream:done': + callbacks.onStreamDone?.(event.data) + break + default: + logger.warn('Unknown event type:', (event as any).type) + } + } catch (error) { + logger.error('Failed to parse SSE event:', error, { data }) + } + } + } + } finally { + reader.releaseLock() + } +} + export interface ExecutionStreamCallbacks { onExecutionStarted?: (data: { startTime: string }) => void onExecutionCompleted?: (data: { @@ -127,91 +201,7 @@ export function useExecutionStream() { } const reader = response.body.getReader() - const decoder = new TextDecoder() - let buffer = '' - - try { - while (true) { - const { done, value } = await reader.read() - - if (done) { - break - } - - buffer += decoder.decode(value, { stream: true }) - - const lines = buffer.split('\n\n') - - buffer = lines.pop() || '' - - for (const line of lines) { - if (!line.trim() || !line.startsWith('data: ')) { - continue - } - - const data = line.substring(6).trim() - - if (data === '[DONE]') { - logger.info('Stream completed') - continue - } - - try { - const event = JSON.parse(data) as ExecutionEvent - - logger.info('📡 SSE Event received:', { - type: event.type, - executionId: event.executionId, - data: event.data, - }) - - switch (event.type) { - case 'execution:started': - logger.info('🚀 Execution started') - callbacks.onExecutionStarted?.(event.data) - break - case 'execution:completed': - logger.info('✅ Execution completed') - callbacks.onExecutionCompleted?.(event.data) - break - case 'execution:error': - logger.error('❌ Execution error') - callbacks.onExecutionError?.(event.data) - break - case 'execution:cancelled': - logger.warn('🛑 Execution cancelled') - callbacks.onExecutionCancelled?.(event.data) - break - case 'block:started': - logger.info('🔷 Block started:', event.data.blockId) - callbacks.onBlockStarted?.(event.data) - break - case 'block:completed': - logger.info('✓ Block completed:', event.data.blockId) - callbacks.onBlockCompleted?.(event.data) - break - case 'block:error': - logger.error('✗ Block error:', event.data.blockId) - callbacks.onBlockError?.(event.data) - break - case 'stream:chunk': - callbacks.onStreamChunk?.(event.data) - break - case 'stream:done': - logger.info('Stream done:', event.data.blockId) - callbacks.onStreamDone?.(event.data) - break - default: - logger.warn('Unknown event type:', (event as any).type) - } - } catch (error) { - logger.error('Failed to parse SSE event:', error, { data }) - } - } - } - } finally { - reader.releaseLock() - } + await processSSEStream(reader, callbacks, 'Execution') } catch (error: any) { if (error.name === 'AbortError') { logger.info('Execution stream cancelled') @@ -270,82 +260,7 @@ export function useExecutionStream() { } const reader = response.body.getReader() - const decoder = new TextDecoder() - let buffer = '' - - try { - while (true) { - const { done, value } = await reader.read() - - if (done) { - break - } - - buffer += decoder.decode(value, { stream: true }) - - const lines = buffer.split('\n\n') - - buffer = lines.pop() || '' - - for (const line of lines) { - if (!line.trim() || !line.startsWith('data: ')) { - continue - } - - const data = line.substring(6).trim() - - if (data === '[DONE]') { - logger.info('Run-from-block stream completed') - continue - } - - try { - const event = JSON.parse(data) as ExecutionEvent - - logger.info('📡 Run-from-block SSE Event:', { - type: event.type, - executionId: event.executionId, - }) - - switch (event.type) { - case 'execution:started': - callbacks.onExecutionStarted?.(event.data) - break - case 'execution:completed': - callbacks.onExecutionCompleted?.(event.data) - break - case 'execution:error': - callbacks.onExecutionError?.(event.data) - break - case 'execution:cancelled': - callbacks.onExecutionCancelled?.(event.data) - break - case 'block:started': - callbacks.onBlockStarted?.(event.data) - break - case 'block:completed': - callbacks.onBlockCompleted?.(event.data) - break - case 'block:error': - callbacks.onBlockError?.(event.data) - break - case 'stream:chunk': - callbacks.onStreamChunk?.(event.data) - break - case 'stream:done': - callbacks.onStreamDone?.(event.data) - break - default: - logger.warn('Unknown event type:', (event as any).type) - } - } catch (error) { - logger.error('Failed to parse SSE event:', error, { data }) - } - } - } - } finally { - reader.releaseLock() - } + await processSSEStream(reader, callbacks, 'Run-from-block') } catch (error: any) { if (error.name === 'AbortError') { logger.info('Run-from-block execution cancelled')