From f55f6cc4532608b4317703bfbafba7572ba40cdd Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 27 Jan 2026 14:08:32 -0800 Subject: [PATCH] Fix lint --- .../[id]/execute-from-block/route.ts | 122 ++-------------- .../workflows/executor/execution-events.ts | 137 ++++++++++++++++++ 2 files changed, 153 insertions(+), 106 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts index d7d284a06..88b0521f2 100644 --- a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts @@ -10,10 +10,9 @@ import { SSE_HEADERS } from '@/lib/core/utils/sse' import { markExecutionCancelled } from '@/lib/execution/cancellation' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' -import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events' +import { createSSECallbacks } from '@/lib/workflows/executor/execution-events' import { ExecutionSnapshot } from '@/executor/execution/snapshot' -import type { IterationContext, SerializableExecutionState } from '@/executor/execution/types' -import type { NormalizedBlockOutput } from '@/executor/types' +import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types' import { hasExecutionResult } from '@/executor/utils/errors' const logger = createLogger('ExecuteFromBlockAPI') @@ -100,16 +99,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const stream = new ReadableStream({ async start(controller) { - const sendEvent = (event: ExecutionEvent) => { - if (isStreamClosed) return - try { - controller.enqueue(encodeSSEEvent(event)) - } catch { + const { sendEvent, onBlockStart, onBlockComplete, onStream } = createSSECallbacks({ + executionId, + workflowId, + controller, + isStreamClosed: () => isStreamClosed, + setStreamClosed: () => { isStreamClosed = true - } - } + }, + }) - const snapshot = new ExecutionSnapshot({ + const metadata: ExecutionMetadata = { requestId, workflowId, userId, @@ -119,7 +119,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: workflowUserId, useDraftState: true, isClientSession: true, - }) + startTime: new Date().toISOString(), + } + + const snapshot = new ExecutionSnapshot(metadata, {}, {}, {}) try { const startTime = new Date() @@ -140,100 +143,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: startBlockId, sourceSnapshot: sourceSnapshot as SerializableExecutionState, }, - callbacks: { - onBlockStart: async ( - blockId: string, - blockName: string, - blockType: string, - iterationContext?: IterationContext - ) => { - sendEvent({ - type: 'block:started', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - blockId, - blockName, - blockType, - ...(iterationContext && { - iterationCurrent: iterationContext.iterationCurrent, - iterationTotal: iterationContext.iterationTotal, - iterationType: iterationContext.iterationType, - }), - }, - }) - }, - onBlockComplete: async ( - blockId: string, - blockName: string, - blockType: string, - callbackData: { - input?: unknown - output: NormalizedBlockOutput - executionTime: number - }, - iterationContext?: IterationContext - ) => { - const hasError = (callbackData.output as any)?.error - sendEvent({ - type: hasError ? 'block:error' : 'block:completed', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - blockId, - blockName, - blockType, - input: callbackData.input, - ...(hasError - ? { error: (callbackData.output as any).error } - : { output: callbackData.output }), - durationMs: callbackData.executionTime || 0, - ...(iterationContext && { - iterationCurrent: iterationContext.iterationCurrent, - iterationTotal: iterationContext.iterationTotal, - iterationType: iterationContext.iterationType, - }), - }, - }) - }, - onStream: async (streamingExecution: unknown) => { - const streamingExec = streamingExecution as { - stream: ReadableStream - execution: any - } - const blockId = streamingExec.execution?.blockId - const reader = streamingExec.stream.getReader() - const decoder = new TextDecoder() - - try { - while (true) { - const { done, value } = await reader.read() - if (done) break - const chunk = decoder.decode(value, { stream: true }) - sendEvent({ - type: 'stream:chunk', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { blockId, chunk }, - }) - } - sendEvent({ - type: 'stream:done', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { blockId }, - }) - } finally { - try { - reader.releaseLock() - } catch {} - } - }, - }, + callbacks: { onBlockStart, onBlockComplete, onStream }, }) if (result.status === 'cancelled') { diff --git a/apps/sim/lib/workflows/executor/execution-events.ts b/apps/sim/lib/workflows/executor/execution-events.ts index 291c119b7..436df010e 100644 --- a/apps/sim/lib/workflows/executor/execution-events.ts +++ b/apps/sim/lib/workflows/executor/execution-events.ts @@ -180,3 +180,140 @@ export function formatSSEEvent(event: ExecutionEvent): string { export function encodeSSEEvent(event: ExecutionEvent): Uint8Array { return new TextEncoder().encode(formatSSEEvent(event)) } + +/** + * Options for creating SSE execution callbacks + */ +export interface SSECallbackOptions { + executionId: string + workflowId: string + controller: ReadableStreamDefaultController + isStreamClosed: () => boolean + setStreamClosed: () => void +} + +/** + * Creates SSE callbacks for workflow execution streaming + */ +export function createSSECallbacks(options: SSECallbackOptions) { + const { executionId, workflowId, controller, isStreamClosed, setStreamClosed } = options + + const sendEvent = (event: ExecutionEvent) => { + if (isStreamClosed()) return + try { + controller.enqueue(encodeSSEEvent(event)) + } catch { + setStreamClosed() + } + } + + const onBlockStart = async ( + blockId: string, + blockName: string, + blockType: string, + iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string } + ) => { + sendEvent({ + type: 'block:started', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + blockId, + blockName, + blockType, + ...(iterationContext && { + iterationCurrent: iterationContext.iterationCurrent, + iterationTotal: iterationContext.iterationTotal, + iterationType: iterationContext.iterationType as any, + }), + }, + }) + } + + const onBlockComplete = async ( + blockId: string, + blockName: string, + blockType: string, + callbackData: { input?: unknown; output: any; executionTime: number }, + iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string } + ) => { + const hasError = callbackData.output?.error + const iterationData = iterationContext + ? { + iterationCurrent: iterationContext.iterationCurrent, + iterationTotal: iterationContext.iterationTotal, + iterationType: iterationContext.iterationType as any, + } + : {} + + if (hasError) { + sendEvent({ + type: 'block:error', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + blockId, + blockName, + blockType, + input: callbackData.input, + error: callbackData.output.error, + durationMs: callbackData.executionTime || 0, + ...iterationData, + }, + }) + } else { + sendEvent({ + type: 'block:completed', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + blockId, + blockName, + blockType, + input: callbackData.input, + output: callbackData.output, + durationMs: callbackData.executionTime || 0, + ...iterationData, + }, + }) + } + } + + const onStream = async (streamingExecution: unknown) => { + const streamingExec = streamingExecution as { stream: ReadableStream; execution: any } + const blockId = streamingExec.execution?.blockId + const reader = streamingExec.stream.getReader() + const decoder = new TextDecoder() + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + const chunk = decoder.decode(value, { stream: true }) + sendEvent({ + type: 'stream:chunk', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { blockId, chunk }, + }) + } + sendEvent({ + type: 'stream:done', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { blockId }, + }) + } finally { + try { + reader.releaseLock() + } catch {} + } + } + + return { sendEvent, onBlockStart, onBlockComplete, onStream } +}