This commit is contained in:
Siddharth Ganesan
2026-01-27 14:08:32 -08:00
parent c14c614e33
commit f55f6cc453
2 changed files with 153 additions and 106 deletions

View File

@@ -10,10 +10,9 @@ import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { createSSECallbacks } from '@/lib/workflows/executor/execution-events'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { IterationContext, SerializableExecutionState } from '@/executor/execution/types'
import type { NormalizedBlockOutput } from '@/executor/types'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import { hasExecutionResult } from '@/executor/utils/errors'
const logger = createLogger('ExecuteFromBlockAPI')
@@ -100,16 +99,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed) return
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
const { sendEvent, onBlockStart, onBlockComplete, onStream } = createSSECallbacks({
executionId,
workflowId,
controller,
isStreamClosed: () => isStreamClosed,
setStreamClosed: () => {
isStreamClosed = true
}
}
},
})
const snapshot = new ExecutionSnapshot({
const metadata: ExecutionMetadata = {
requestId,
workflowId,
userId,
@@ -119,7 +119,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowUserId,
useDraftState: true,
isClientSession: true,
})
startTime: new Date().toISOString(),
}
const snapshot = new ExecutionSnapshot(metadata, {}, {}, {})
try {
const startTime = new Date()
@@ -140,100 +143,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
startBlockId,
sourceSnapshot: sourceSnapshot as SerializableExecutionState,
},
callbacks: {
onBlockStart: async (
blockId: string,
blockName: string,
blockType: string,
iterationContext?: IterationContext
) => {
sendEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
},
onBlockComplete: async (
blockId: string,
blockName: string,
blockType: string,
callbackData: {
input?: unknown
output: NormalizedBlockOutput
executionTime: number
},
iterationContext?: IterationContext
) => {
const hasError = (callbackData.output as any)?.error
sendEvent({
type: hasError ? 'block:error' : 'block:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
...(hasError
? { error: (callbackData.output as any).error }
: { output: callbackData.output }),
durationMs: callbackData.executionTime || 0,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
},
onStream: async (streamingExecution: unknown) => {
const streamingExec = streamingExecution as {
stream: ReadableStream
execution: any
}
const blockId = streamingExec.execution?.blockId
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
sendEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId, chunk },
})
}
sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId },
})
} finally {
try {
reader.releaseLock()
} catch {}
}
},
},
callbacks: { onBlockStart, onBlockComplete, onStream },
})
if (result.status === 'cancelled') {

View File

@@ -180,3 +180,140 @@ export function formatSSEEvent(event: ExecutionEvent): string {
export function encodeSSEEvent(event: ExecutionEvent): Uint8Array {
return new TextEncoder().encode(formatSSEEvent(event))
}
/**
* Options for creating SSE execution callbacks
*/
export interface SSECallbackOptions {
executionId: string
workflowId: string
controller: ReadableStreamDefaultController<Uint8Array>
isStreamClosed: () => boolean
setStreamClosed: () => void
}
/**
* Creates SSE callbacks for workflow execution streaming
*/
export function createSSECallbacks(options: SSECallbackOptions) {
const { executionId, workflowId, controller, isStreamClosed, setStreamClosed } = options
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed()) return
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
setStreamClosed()
}
}
const onBlockStart = async (
blockId: string,
blockName: string,
blockType: string,
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
) => {
sendEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
}),
},
})
}
const onBlockComplete = async (
blockId: string,
blockName: string,
blockType: string,
callbackData: { input?: unknown; output: any; executionTime: number },
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
) => {
const hasError = callbackData.output?.error
const iterationData = iterationContext
? {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
}
: {}
if (hasError) {
sendEvent({
type: 'block:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
error: callbackData.output.error,
durationMs: callbackData.executionTime || 0,
...iterationData,
},
})
} else {
sendEvent({
type: 'block:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
output: callbackData.output,
durationMs: callbackData.executionTime || 0,
...iterationData,
},
})
}
}
const onStream = async (streamingExecution: unknown) => {
const streamingExec = streamingExecution as { stream: ReadableStream; execution: any }
const blockId = streamingExec.execution?.blockId
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
sendEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId, chunk },
})
}
sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId },
})
} finally {
try {
reader.releaseLock()
} catch {}
}
}
return { sendEvent, onBlockStart, onBlockComplete, onStream }
}