From c14c614e3373262e227f1c557d68d139dca9ae8a Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 27 Jan 2026 12:55:27 -0800 Subject: [PATCH] Consolidation --- .../[id]/execute-from-block/route.ts | 386 ++++++------------ .../lib/workflows/executor/execution-core.ts | 18 +- 2 files changed, 145 insertions(+), 259 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts index 88d6de179..d7d284a06 100644 --- a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts @@ -7,18 +7,14 @@ import { z } from 'zod' import { checkHybridAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' -import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' -import { clearExecutionCancellation, markExecutionCancelled } from '@/lib/execution/cancellation' +import { markExecutionCancelled } from '@/lib/execution/cancellation' import { LoggingSession } from '@/lib/logs/execution/logging-session' -import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' +import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events' -import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' -import { DAGExecutor } from '@/executor/execution/executor' +import { ExecutionSnapshot } from '@/executor/execution/snapshot' import type { IterationContext, SerializableExecutionState } from '@/executor/execution/types' import type { NormalizedBlockOutput } from '@/executor/types' import { hasExecutionResult } from '@/executor/utils/errors' -import { Serializer } from '@/serializer' -import { mergeSubblockState } from '@/stores/workflows/server-utils' const logger = createLogger('ExecuteFromBlockAPI') @@ -43,12 +39,6 @@ const ExecuteFromBlockSchema = z.object({ export const runtime = 'nodejs' export const dynamic = 'force-dynamic' -/** - * POST /api/workflows/[id]/execute-from-block - * - * Executes a workflow starting from a specific block using cached outputs - * for upstream/unaffected blocks from the source snapshot. - */ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) { const requestId = generateRequestId() const { id: workflowId } = await params @@ -83,17 +73,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } const { startBlockId, sourceSnapshot } = validation.data - - logger.info(`[${requestId}] Starting run-from-block execution`, { - workflowId, - userId, - startBlockId, - executedBlocksCount: sourceSnapshot.executedBlocks.length, - }) - const executionId = uuidv4() - // Load workflow record to get workspaceId const [workflowRecord] = await db .select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId }) .from(workflowTable) @@ -107,44 +88,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const workspaceId = workflowRecord.workspaceId const workflowUserId = workflowRecord.userId - // Initialize logging session for cost tracking - const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId) - - // Load workflow state - const workflowData = await loadWorkflowFromNormalizedTables(workflowId) - if (!workflowData) { - return NextResponse.json({ error: 'Workflow state not found' }, { status: 404 }) - } - - const { blocks, edges, loops, parallels } = workflowData - - // Merge block states - const mergedStates = mergeSubblockState(blocks) - - // Get environment variables - const { personalDecrypted, workspaceDecrypted } = await getPersonalAndWorkspaceEnv( - userId, - workspaceId - ) - const decryptedEnvVars: Record = { ...personalDecrypted, ...workspaceDecrypted } - - // Serialize workflow - const serializedWorkflow = new Serializer().serializeWorkflow( - mergedStates, - edges, - loops, - parallels, - true - ) - - // Start logging session - await loggingSession.safeStart({ - userId, - workspaceId, - variables: {}, + logger.info(`[${requestId}] Starting run-from-block execution`, { + workflowId, + startBlockId, + executedBlocksCount: sourceSnapshot.executedBlocks.length, }) - const encoder = new TextEncoder() + const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId) const abortController = new AbortController() let isStreamClosed = false @@ -152,7 +102,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: async start(controller) { const sendEvent = (event: ExecutionEvent) => { if (isStreamClosed) return - try { controller.enqueue(encodeSSEEvent(event)) } catch { @@ -160,6 +109,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } } + const snapshot = new ExecutionSnapshot({ + requestId, + workflowId, + userId, + executionId, + triggerType: 'manual', + workspaceId, + workflowUserId, + useDraftState: true, + isClientSession: true, + }) + try { const startTime = new Date() @@ -168,220 +129,141 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: timestamp: startTime.toISOString(), executionId, workflowId, - data: { - startTime: startTime.toISOString(), - }, + data: { startTime: startTime.toISOString() }, }) - const onBlockStart = async ( - blockId: string, - blockName: string, - blockType: string, - iterationContext?: IterationContext - ) => { - sendEvent({ - type: 'block:started', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - blockId, - blockName, - blockType, - ...(iterationContext && { - iterationCurrent: iterationContext.iterationCurrent, - iterationTotal: iterationContext.iterationTotal, - iterationType: iterationContext.iterationType, - }), - }, - }) - } - - const onBlockComplete = async ( - blockId: string, - blockName: string, - blockType: string, - callbackData: { input?: unknown; output: NormalizedBlockOutput; executionTime: number }, - iterationContext?: IterationContext - ) => { - // Log to session for cost tracking - await loggingSession.onBlockComplete(blockId, blockName, blockType, callbackData) - - const hasError = (callbackData.output as any)?.error - - if (hasError) { - sendEvent({ - type: 'block:error', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - blockId, - blockName, - blockType, - input: callbackData.input, - error: (callbackData.output as any).error, - durationMs: callbackData.executionTime || 0, - ...(iterationContext && { - iterationCurrent: iterationContext.iterationCurrent, - iterationTotal: iterationContext.iterationTotal, - iterationType: iterationContext.iterationType, - }), - }, - }) - } else { - sendEvent({ - type: 'block:completed', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - blockId, - blockName, - blockType, - input: callbackData.input, - output: callbackData.output, - durationMs: callbackData.executionTime || 0, - ...(iterationContext && { - iterationCurrent: iterationContext.iterationCurrent, - iterationTotal: iterationContext.iterationTotal, - iterationType: iterationContext.iterationType, - }), - }, - }) - } - } - - const onStream = async (streamingExecution: unknown) => { - const streamingExec = streamingExecution as { stream: ReadableStream; execution: any } - const blockId = streamingExec.execution?.blockId - - const reader = streamingExec.stream.getReader() - const decoder = new TextDecoder() - - try { - while (true) { - const { done, value } = await reader.read() - if (done) break - - const chunk = decoder.decode(value, { stream: true }) + const result = await executeWorkflowCore({ + snapshot, + loggingSession, + abortSignal: abortController.signal, + runFromBlock: { + startBlockId, + sourceSnapshot: sourceSnapshot as SerializableExecutionState, + }, + callbacks: { + onBlockStart: async ( + blockId: string, + blockName: string, + blockType: string, + iterationContext?: IterationContext + ) => { sendEvent({ - type: 'stream:chunk', + type: 'block:started', timestamp: new Date().toISOString(), executionId, workflowId, - data: { blockId, chunk }, + data: { + blockId, + blockName, + blockType, + ...(iterationContext && { + iterationCurrent: iterationContext.iterationCurrent, + iterationTotal: iterationContext.iterationTotal, + iterationType: iterationContext.iterationType, + }), + }, }) - } + }, + onBlockComplete: async ( + blockId: string, + blockName: string, + blockType: string, + callbackData: { + input?: unknown + output: NormalizedBlockOutput + executionTime: number + }, + iterationContext?: IterationContext + ) => { + const hasError = (callbackData.output as any)?.error + sendEvent({ + type: hasError ? 'block:error' : 'block:completed', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + blockId, + blockName, + blockType, + input: callbackData.input, + ...(hasError + ? { error: (callbackData.output as any).error } + : { output: callbackData.output }), + durationMs: callbackData.executionTime || 0, + ...(iterationContext && { + iterationCurrent: iterationContext.iterationCurrent, + iterationTotal: iterationContext.iterationTotal, + iterationType: iterationContext.iterationType, + }), + }, + }) + }, + onStream: async (streamingExecution: unknown) => { + const streamingExec = streamingExecution as { + stream: ReadableStream + execution: any + } + const blockId = streamingExec.execution?.blockId + const reader = streamingExec.stream.getReader() + const decoder = new TextDecoder() - sendEvent({ - type: 'stream:done', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { blockId }, - }) - } catch (error) { - logger.error(`[${requestId}] Error streaming block content:`, error) - } finally { - try { - reader.releaseLock() - } catch {} - } - } - - // Create executor and run from block - const executor = new DAGExecutor({ - workflow: serializedWorkflow, - envVarValues: decryptedEnvVars, - workflowInput: {}, - workflowVariables: {}, - contextExtensions: { - stream: true, - executionId, - workspaceId, - userId, - isDeployedContext: false, - onBlockStart, - onBlockComplete, - onStream, - abortSignal: abortController.signal, + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + const chunk = decoder.decode(value, { stream: true }) + sendEvent({ + type: 'stream:chunk', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { blockId, chunk }, + }) + } + sendEvent({ + type: 'stream:done', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { blockId }, + }) + } finally { + try { + reader.releaseLock() + } catch {} + } + }, }, }) - const result = await executor.executeFromBlock( - workflowId, - startBlockId, - sourceSnapshot as SerializableExecutionState - ) - - // Build trace spans from fresh execution logs only - // Trace spans show what actually executed in this run - const { traceSpans, totalDuration } = buildTraceSpans(result) - if (result.status === 'cancelled') { - await loggingSession.safeCompleteWithCancellation({ - endedAt: new Date().toISOString(), - totalDurationMs: totalDuration || 0, - traceSpans: traceSpans || [], - }) - await clearExecutionCancellation(executionId) - sendEvent({ type: 'execution:cancelled', timestamp: new Date().toISOString(), executionId, workflowId, + data: { duration: result.metadata?.duration || 0 }, + }) + } else { + sendEvent({ + type: 'execution:completed', + timestamp: new Date().toISOString(), + executionId, + workflowId, data: { + success: result.success, + output: result.output, duration: result.metadata?.duration || 0, + startTime: result.metadata?.startTime || startTime.toISOString(), + endTime: result.metadata?.endTime || new Date().toISOString(), }, }) - return } - - // Complete logging session - await loggingSession.safeComplete({ - endedAt: new Date().toISOString(), - totalDurationMs: totalDuration || 0, - finalOutput: result.output || {}, - traceSpans: traceSpans || [], - workflowInput: {}, - }) - await clearExecutionCancellation(executionId) - - sendEvent({ - type: 'execution:completed', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - success: result.success, - output: result.output, - duration: result.metadata?.duration || 0, - startTime: result.metadata?.startTime || startTime.toISOString(), - endTime: result.metadata?.endTime || new Date().toISOString(), - }, - }) } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined - const { traceSpans } = executionResult - ? buildTraceSpans(executionResult) - : { traceSpans: [] } - - // Complete logging session with error - await loggingSession.safeCompleteWithError({ - endedAt: new Date().toISOString(), - totalDurationMs: executionResult?.metadata?.duration || 0, - error: { - message: errorMessage, - stackTrace: error instanceof Error ? error.stack : undefined, - }, - traceSpans, - }) - await clearExecutionCancellation(executionId) sendEvent({ type: 'execution:error', @@ -396,27 +278,21 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } finally { if (!isStreamClosed) { try { - controller.enqueue(encoder.encode('data: [DONE]\n\n')) + controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')) controller.close() - } catch { - // Stream already closed - } + } catch {} } } }, cancel() { isStreamClosed = true - logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`) abortController.abort() markExecutionCancelled(executionId).catch(() => {}) }, }) return new NextResponse(stream, { - headers: { - ...SSE_HEADERS, - 'X-Execution-Id': executionId, - }, + headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId }, }) } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 2ce87873f..350e06b2f 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -22,6 +22,7 @@ import type { ContextExtensions, ExecutionCallbacks, IterationContext, + SerializableExecutionState, } from '@/executor/execution/types' import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types' import { hasExecutionResult } from '@/executor/utils/errors' @@ -41,6 +42,11 @@ export interface ExecuteWorkflowCoreOptions { includeFileBase64?: boolean base64MaxBytes?: number stopAfterBlockId?: string + /** Run-from-block mode: execute starting from a specific block using cached upstream outputs */ + runFromBlock?: { + startBlockId: string + sourceSnapshot: SerializableExecutionState + } } function parseVariableValueByType(value: unknown, type: string): unknown { @@ -116,6 +122,7 @@ export async function executeWorkflowCore( includeFileBase64, base64MaxBytes, stopAfterBlockId, + runFromBlock, } = options const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } = @@ -322,10 +329,13 @@ export async function executeWorkflowCore( } } - const result = (await executorInstance.execute( - workflowId, - resolvedTriggerBlockId - )) as ExecutionResult + const result = runFromBlock + ? ((await executorInstance.executeFromBlock( + workflowId, + runFromBlock.startBlockId, + runFromBlock.sourceSnapshot + )) as ExecutionResult) + : ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult) // Build trace spans for logging from the full execution result const { traceSpans, totalDuration } = buildTraceSpans(result)