Consolidation

Siddharth Ganesan
2026-01-27 12:55:27 -08:00
parent 415acda403
commit c14c614e33
2 changed files with 145 additions and 259 deletions

View File

@@ -7,18 +7,14 @@ import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { clearExecutionCancellation, markExecutionCancelled } from '@/lib/execution/cancellation'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { DAGExecutor } from '@/executor/execution/executor'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { IterationContext, SerializableExecutionState } from '@/executor/execution/types'
import type { NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
const logger = createLogger('ExecuteFromBlockAPI')
@@ -43,12 +39,6 @@ const ExecuteFromBlockSchema = z.object({
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
/**
* POST /api/workflows/[id]/execute-from-block
*
* Executes a workflow starting from a specific block using cached outputs
* for upstream/unaffected blocks from the source snapshot.
*/
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const { id: workflowId } = await params
@@ -83,17 +73,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
const { startBlockId, sourceSnapshot } = validation.data
logger.info(`[${requestId}] Starting run-from-block execution`, {
workflowId,
userId,
startBlockId,
executedBlocksCount: sourceSnapshot.executedBlocks.length,
})
const executionId = uuidv4()
// Load workflow record to get workspaceId
const [workflowRecord] = await db
.select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId })
.from(workflowTable)
@@ -107,44 +88,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const workspaceId = workflowRecord.workspaceId
const workflowUserId = workflowRecord.userId
// Initialize logging session for cost tracking
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
// Load workflow state
const workflowData = await loadWorkflowFromNormalizedTables(workflowId)
if (!workflowData) {
return NextResponse.json({ error: 'Workflow state not found' }, { status: 404 })
}
const { blocks, edges, loops, parallels } = workflowData
// Merge block states
const mergedStates = mergeSubblockState(blocks)
// Get environment variables
const { personalDecrypted, workspaceDecrypted } = await getPersonalAndWorkspaceEnv(
userId,
workspaceId
)
const decryptedEnvVars: Record<string, string> = { ...personalDecrypted, ...workspaceDecrypted }
// Serialize workflow
const serializedWorkflow = new Serializer().serializeWorkflow(
mergedStates,
edges,
loops,
parallels,
true
)
// Start logging session
await loggingSession.safeStart({
userId,
workspaceId,
variables: {},
logger.info(`[${requestId}] Starting run-from-block execution`, {
workflowId,
startBlockId,
executedBlocksCount: sourceSnapshot.executedBlocks.length,
})
const encoder = new TextEncoder()
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
const abortController = new AbortController()
let isStreamClosed = false
@@ -152,7 +102,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
async start(controller) {
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed) return
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
@@ -160,6 +109,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
}
const snapshot = new ExecutionSnapshot({
requestId,
workflowId,
userId,
executionId,
triggerType: 'manual',
workspaceId,
workflowUserId,
useDraftState: true,
isClientSession: true,
})
try {
const startTime = new Date()
@@ -168,220 +129,141 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
timestamp: startTime.toISOString(),
executionId,
workflowId,
data: {
startTime: startTime.toISOString(),
},
data: { startTime: startTime.toISOString() },
})
const onBlockStart = async (
blockId: string,
blockName: string,
blockType: string,
iterationContext?: IterationContext
) => {
sendEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
}
const onBlockComplete = async (
blockId: string,
blockName: string,
blockType: string,
callbackData: { input?: unknown; output: NormalizedBlockOutput; executionTime: number },
iterationContext?: IterationContext
) => {
// Log to session for cost tracking
await loggingSession.onBlockComplete(blockId, blockName, blockType, callbackData)
const hasError = (callbackData.output as any)?.error
if (hasError) {
sendEvent({
type: 'block:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
error: (callbackData.output as any).error,
durationMs: callbackData.executionTime || 0,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
} else {
sendEvent({
type: 'block:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
output: callbackData.output,
durationMs: callbackData.executionTime || 0,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
}
}
const onStream = async (streamingExecution: unknown) => {
const streamingExec = streamingExecution as { stream: ReadableStream; execution: any }
const blockId = streamingExec.execution?.blockId
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
const result = await executeWorkflowCore({
snapshot,
loggingSession,
abortSignal: abortController.signal,
runFromBlock: {
startBlockId,
sourceSnapshot: sourceSnapshot as SerializableExecutionState,
},
callbacks: {
onBlockStart: async (
blockId: string,
blockName: string,
blockType: string,
iterationContext?: IterationContext
) => {
sendEvent({
type: 'stream:chunk',
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId, chunk },
data: {
blockId,
blockName,
blockType,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
}
},
onBlockComplete: async (
blockId: string,
blockName: string,
blockType: string,
callbackData: {
input?: unknown
output: NormalizedBlockOutput
executionTime: number
},
iterationContext?: IterationContext
) => {
const hasError = (callbackData.output as any)?.error
sendEvent({
type: hasError ? 'block:error' : 'block:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
...(hasError
? { error: (callbackData.output as any).error }
: { output: callbackData.output }),
durationMs: callbackData.executionTime || 0,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
},
onStream: async (streamingExecution: unknown) => {
const streamingExec = streamingExecution as {
stream: ReadableStream
execution: any
}
const blockId = streamingExec.execution?.blockId
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId },
})
} catch (error) {
logger.error(`[${requestId}] Error streaming block content:`, error)
} finally {
try {
reader.releaseLock()
} catch {}
}
}
// Create executor and run from block
const executor = new DAGExecutor({
workflow: serializedWorkflow,
envVarValues: decryptedEnvVars,
workflowInput: {},
workflowVariables: {},
contextExtensions: {
stream: true,
executionId,
workspaceId,
userId,
isDeployedContext: false,
onBlockStart,
onBlockComplete,
onStream,
abortSignal: abortController.signal,
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
sendEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId, chunk },
})
}
sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId },
})
} finally {
try {
reader.releaseLock()
} catch {}
}
},
},
})
const result = await executor.executeFromBlock(
workflowId,
startBlockId,
sourceSnapshot as SerializableExecutionState
)
// Build trace spans from fresh execution logs only
// Trace spans show what actually executed in this run
const { traceSpans, totalDuration } = buildTraceSpans(result)
if (result.status === 'cancelled') {
await loggingSession.safeCompleteWithCancellation({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
})
await clearExecutionCancellation(executionId)
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { duration: result.metadata?.duration || 0 },
})
} else {
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
return
}
// Complete logging session
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: result.output || {},
traceSpans: traceSpans || [],
workflowInput: {},
})
await clearExecutionCancellation(executionId)
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult
? buildTraceSpans(executionResult)
: { traceSpans: [] }
// Complete logging session with error
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: executionResult?.metadata?.duration || 0,
error: {
message: errorMessage,
stackTrace: error instanceof Error ? error.stack : undefined,
},
traceSpans,
})
await clearExecutionCancellation(executionId)
sendEvent({
type: 'execution:error',
@@ -396,27 +278,21 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
} finally {
if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
controller.close()
} catch {
// Stream already closed
}
} catch {}
}
}
},
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})
},
})
return new NextResponse(stream, {
headers: {
...SSE_HEADERS,
'X-Execution-Id': executionId,
},
headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId },
})
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'

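For context on how the route above is consumed, here is a minimal client-side sketch, not code from this commit: it POSTs `startBlockId` and `sourceSnapshot`, then reads the SSE events emitted by the route until the `data: [DONE]` terminator. The URL shape and request body follow the route and schema shown in the diff; the single-`data:`-line-per-event parsing is an assumption, since encodeSSEEvent's exact wire format is not shown here.

// Illustrative client for the execute-from-block route (not part of this commit).
// Assumes each SSE event arrives as one `data: <json>` line and the stream ends with `data: [DONE]`.
async function runWorkflowFromBlock(
  workflowId: string,
  startBlockId: string,
  sourceSnapshot: unknown,
  signal?: AbortSignal
) {
  const res = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ startBlockId, sourceSnapshot }),
    signal,
  })
  if (!res.ok || !res.body) throw new Error(`execute-from-block failed: ${res.status}`)

  // The route returns the execution id in a response header.
  const executionId = res.headers.get('X-Execution-Id')
  console.log('executionId', executionId)

  const reader = res.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    // SSE frames are separated by a blank line; keep any trailing partial frame in the buffer.
    const frames = buffer.split('\n\n')
    buffer = frames.pop() ?? ''
    for (const frame of frames) {
      const payload = frame.replace(/^data: /, '').trim()
      if (payload === '[DONE]') return
      const event = JSON.parse(payload) // { type, timestamp, executionId, workflowId, data }
      console.log(event.type, event.data)
    }
  }
}

Aborting the fetch (via the optional AbortSignal) cancels the response stream, which lands in the route's cancel() handler above: the server-side AbortController fires and markExecutionCancelled is recorded for the execution.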
View File

@@ -22,6 +22,7 @@ import type {
ContextExtensions,
ExecutionCallbacks,
IterationContext,
SerializableExecutionState,
} from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
@@ -41,6 +42,11 @@ export interface ExecuteWorkflowCoreOptions {
includeFileBase64?: boolean
base64MaxBytes?: number
stopAfterBlockId?: string
/** Run-from-block mode: execute starting from a specific block using cached upstream outputs */
runFromBlock?: {
startBlockId: string
sourceSnapshot: SerializableExecutionState
}
}
function parseVariableValueByType(value: unknown, type: string): unknown {
@@ -116,6 +122,7 @@ export async function executeWorkflowCore(
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
runFromBlock,
} = options
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
@@ -322,10 +329,13 @@ export async function executeWorkflowCore(
}
}
const result = (await executorInstance.execute(
workflowId,
resolvedTriggerBlockId
)) as ExecutionResult
const result = runFromBlock
? ((await executorInstance.executeFromBlock(
workflowId,
runFromBlock.startBlockId,
runFromBlock.sourceSnapshot
)) as ExecutionResult)
: ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult)
// Build trace spans for logging from the full execution result
const { traceSpans, totalDuration } = buildTraceSpans(result)
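The second file adds the runFromBlock option to executeWorkflowCore, so callers no longer build a DAGExecutor themselves. A hedged sketch of the call shape, mirroring only the fields the route above passes (snapshot, loggingSession, abortSignal, callbacks, runFromBlock); the wrapper function and its parameter list are illustrative, not code from this commit.

// Illustrative caller of the consolidated core; mirrors the route's call shape.
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionCallbacks, SerializableExecutionState } from '@/executor/execution/types'

async function rerunFromBlock(
  snapshot: ExecutionSnapshot,
  loggingSession: LoggingSession,
  callbacks: ExecutionCallbacks,
  startBlockId: string,
  sourceSnapshot: SerializableExecutionState,
  abortSignal: AbortSignal
) {
  // With runFromBlock set, the core dispatches to executorInstance.executeFromBlock(workflowId,
  // startBlockId, sourceSnapshot) instead of execute(workflowId, resolvedTriggerBlockId).
  return executeWorkflowCore({
    snapshot,
    loggingSession,
    abortSignal,
    callbacks,
    runFromBlock: { startBlockId, sourceSnapshot },
  })
}

Because both modes now flow through the same trace-span and logging path inside executeWorkflowCore, the route can drop its local DAGExecutor, Serializer, and environment-variable plumbing, which is the consolidation this commit performs.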