fix(logging): hitl + trigger.dev crash protection (#2664)

* close HITL (human-in-the-loop) logging gaps

* deal with trigger worker crashes

* clean up import structure
Authored by Vikhyath Mondreti on 2026-01-02 14:01:01 -08:00, committed by GitHub
parent 79be435918
commit dc3de95c39
15 changed files with 602 additions and 311 deletions

View File

@@ -0,0 +1,90 @@
import { db } from '@sim/db'
import { workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
const logger = createLogger('CleanupStaleExecutions')
const STALE_THRESHOLD_MINUTES = 30
export async function GET(request: NextRequest) {
try {
const authError = verifyCronAuth(request, 'Stale execution cleanup')
if (authError) {
return authError
}
logger.info('Starting stale execution cleanup job')
const staleThreshold = new Date(Date.now() - STALE_THRESHOLD_MINUTES * 60 * 1000)
const staleExecutions = await db
.select({
id: workflowExecutionLogs.id,
executionId: workflowExecutionLogs.executionId,
workflowId: workflowExecutionLogs.workflowId,
startedAt: workflowExecutionLogs.startedAt,
})
.from(workflowExecutionLogs)
.where(
and(
eq(workflowExecutionLogs.status, 'running'),
lt(workflowExecutionLogs.startedAt, staleThreshold)
)
)
.limit(100)
logger.info(`Found ${staleExecutions.length} stale executions to clean up`)
let cleaned = 0
let failed = 0
for (const execution of staleExecutions) {
try {
const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime()
const staleDurationMinutes = Math.round(staleDurationMs / 60000)
await db
.update(workflowExecutionLogs)
.set({
status: 'failed',
endedAt: new Date(),
totalDurationMs: staleDurationMs,
executionData: sql`jsonb_set(
COALESCE(execution_data, '{}'::jsonb),
ARRAY['error'],
to_jsonb(${`Execution terminated: worker timeout or crash after ${staleDurationMinutes} minutes`}::text)
)`,
})
.where(eq(workflowExecutionLogs.id, execution.id))
logger.info(`Cleaned up stale execution ${execution.executionId}`, {
workflowId: execution.workflowId,
staleDurationMinutes,
})
cleaned++
} catch (error) {
logger.error(`Failed to clean up execution ${execution.executionId}:`, {
error: error instanceof Error ? error.message : String(error),
})
failed++
}
}
logger.info(`Stale execution cleanup completed. Cleaned: ${cleaned}, Failed: ${failed}`)
return NextResponse.json({
success: true,
found: staleExecutions.length,
cleaned,
failed,
thresholdMinutes: STALE_THRESHOLD_MINUTES,
})
} catch (error) {
logger.error('Error in stale execution cleanup job:', error)
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
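
For context, here is one way this cleanup endpoint might be invoked out of band, e.g. as a smoke test or a manual sweep. This is a sketch only: the route path, the base-URL environment variable, and the bearer-token scheme for verifyCronAuth are assumptions this diff does not show.

// Sketch: manually invoking the stale-execution cleanup endpoint.
// Assumptions not shown in this diff: the route is served at
// /api/cron/cleanup-stale-executions, and verifyCronAuth accepts
// `Authorization: Bearer <CRON_SECRET>`. Both are hypothetical here.
async function runCleanupOnce(): Promise<void> {
  const baseUrl = process.env.APP_BASE_URL ?? 'http://localhost:3000' // hypothetical env var
  const res = await fetch(`${baseUrl}/api/cron/cleanup-stale-executions`, {
    headers: { Authorization: `Bearer ${process.env.CRON_SECRET ?? ''}` },
  })
  if (!res.ok) {
    throw new Error(`Cleanup request failed with status ${res.status}`)
  }
  // Response shape per the handler above:
  // { success, found, cleaned, failed, thresholdMinutes }
  console.log('Stale execution cleanup summary:', await res.json())
}

runCleanupOnce().catch((error) => {
  console.error(error)
  process.exit(1)
})

In production this would typically be wired to a scheduler hitting the same route on an interval shorter than STALE_THRESHOLD_MINUTES, so crashed workers leave executions stuck in 'running' for at most one sweep cycle.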

View File

@@ -23,11 +23,11 @@ import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
import type { WorkflowExecutionPayload } from '@/background/workflow-execution'
import { normalizeName } from '@/executor/constants'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
import type { StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
import { CORE_TRIGGER_TYPES } from '@/stores/logs/filters/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
const logger = createLogger('WorkflowExecuteAPI')
@@ -541,11 +541,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
blockId: string,
blockName: string,
blockType: string,
iterationContext?: {
iterationCurrent: number
iterationTotal: number
iterationType: SubflowType
}
iterationContext?: IterationContext
) => {
logger.info(`[${requestId}] 🔷 onBlockStart called:`, { blockId, blockName, blockType })
sendEvent({
@@ -571,11 +567,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
blockName: string,
blockType: string,
callbackData: any,
iterationContext?: {
iterationCurrent: number
iterationTotal: number
iterationType: SubflowType
}
iterationContext?: IterationContext
) => {
const hasError = callbackData.output?.error
@@ -713,14 +705,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)

View File

@@ -23,7 +23,8 @@ import {
getSubBlockValue,
} from '@/lib/workflows/schedules/utils'
import { REFERENCE } from '@/executor/constants'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { createEnvVarPattern } from '@/executor/utils/reference-validation'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -285,14 +286,25 @@ async function runWorkflowExecution({
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)

View File

@@ -17,7 +17,8 @@ import {
loadWorkflowFromNormalizedTables,
} from '@/lib/workflows/persistence/utils'
import { getWorkflowById } from '@/lib/workflows/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -268,14 +269,25 @@ async function executeWebhookJobInternal(
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
@@ -509,14 +521,25 @@ async function executeWebhookJobInternal(
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)

View File

@@ -7,7 +7,8 @@ import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { getWorkflowById } from '@/lib/workflows/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
const logger = createLogger('TriggerWorkflowExecution')
@@ -112,14 +113,25 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)

View File

@@ -1,9 +1,6 @@
import type { DAG } from '@/executor/dag/builder'
import {
type ExecutionMetadata,
ExecutionSnapshot,
type SerializableExecutionState,
} from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionContext, SerializedSnapshot } from '@/executor/types'
function mapFromEntries<T>(map?: Map<string, T>): Record<string, T> | undefined {

View File

@@ -1,59 +1,4 @@
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState } from '@/executor/types'
export interface ExecutionMetadata {
requestId: string
executionId: string
workflowId: string
workspaceId: string
userId: string
sessionUserId?: string
workflowUserId?: string
triggerType: string
triggerBlockId?: string
useDraftState: boolean
startTime: string
isClientSession?: boolean
pendingBlocks?: string[]
resumeFromSnapshot?: boolean
workflowStateOverride?: {
blocks: Record<string, any>
edges: Edge[]
loops?: Record<string, any>
parallels?: Record<string, any>
deploymentVersionId?: string // ID of deployment version if this is deployed state
}
}
export interface ExecutionCallbacks {
onStream?: (streamingExec: any) => Promise<void>
onBlockStart?: (blockId: string, blockName: string, blockType: string) => Promise<void>
onBlockComplete?: (
blockId: string,
blockName: string,
blockType: string,
output: any
) => Promise<void>
}
export interface SerializableExecutionState {
blockStates: Record<string, BlockState>
executedBlocks: string[]
blockLogs: BlockLog[]
decisions: {
router: Record<string, string>
condition: Record<string, string>
}
completedLoops: string[]
loopExecutions?: Record<string, any>
parallelExecutions?: Record<string, any>
parallelBlockMapping?: Record<string, any>
activeExecutionPath: string[]
pendingQueue?: string[]
remainingEdges?: Edge[]
dagIncomingEdges?: Record<string, string[]>
completedPauseContexts?: string[]
}
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
export class ExecutionSnapshot {
constructor(

View File

@@ -1,7 +1,68 @@
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/snapshot'
import type { BlockState, NormalizedBlockOutput } from '@/executor/types'
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
export interface ExecutionMetadata {
requestId: string
executionId: string
workflowId: string
workspaceId: string
userId: string
sessionUserId?: string
workflowUserId?: string
triggerType: string
triggerBlockId?: string
useDraftState: boolean
startTime: string
isClientSession?: boolean
pendingBlocks?: string[]
resumeFromSnapshot?: boolean
workflowStateOverride?: {
blocks: Record<string, any>
edges: Edge[]
loops?: Record<string, any>
parallels?: Record<string, any>
deploymentVersionId?: string
}
}
export interface SerializableExecutionState {
blockStates: Record<string, BlockState>
executedBlocks: string[]
blockLogs: BlockLog[]
decisions: {
router: Record<string, string>
condition: Record<string, string>
}
completedLoops: string[]
loopExecutions?: Record<string, any>
parallelExecutions?: Record<string, any>
parallelBlockMapping?: Record<string, any>
activeExecutionPath: string[]
pendingQueue?: string[]
remainingEdges?: Edge[]
dagIncomingEdges?: Record<string, string[]>
completedPauseContexts?: string[]
}
export interface IterationContext {
iterationCurrent: number
iterationTotal: number
iterationType: SubflowType
}
export interface ExecutionCallbacks {
onStream?: (streamingExec: any) => Promise<void>
onBlockStart?: (blockId: string, blockName: string, blockType: string) => Promise<void>
onBlockComplete?: (
blockId: string,
blockName: string,
blockType: string,
output: any,
iterationContext?: IterationContext
) => Promise<void>
}
export interface ContextExtensions {
workspaceId?: string
executionId?: string
@@ -32,22 +93,14 @@ export interface ContextExtensions {
blockId: string,
blockName: string,
blockType: string,
iterationContext?: {
iterationCurrent: number
iterationTotal: number
iterationType: SubflowType
}
iterationContext?: IterationContext
) => Promise<void>
onBlockComplete?: (
blockId: string,
blockName: string,
blockType: string,
output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
iterationContext?: {
iterationCurrent: number
iterationTotal: number
iterationType: SubflowType
}
iterationContext?: IterationContext
) => Promise<void>
}
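
As a usage reference for the consolidated types above, a minimal sketch of an ExecutionCallbacks implementation; the console logging is a placeholder for whatever a caller actually does with these events, and the call site is assumed rather than taken from this diff.

import type { ExecutionCallbacks, IterationContext } from '@/executor/execution/types'

// Sketch: satisfying the consolidated callback types with placeholder handlers.
export const loggingCallbacks: ExecutionCallbacks = {
  onBlockStart: async (blockId, blockName, blockType) => {
    console.log(`block start: ${blockName} (${blockType}) [${blockId}]`)
  },
  onBlockComplete: async (blockId, blockName, blockType, output, iterationContext?: IterationContext) => {
    const iteration = iterationContext
      ? ` (iteration ${iterationContext.iterationCurrent}/${iterationContext.iterationTotal}, ${iterationContext.iterationType})`
      : ''
    console.log(`block complete: ${blockName}${iteration}`, output)
  },
}

ContextExtensions.onBlockStart and onBlockComplete (above) take the same IterationContext, so one shared type now covers both the route-level callbacks and the executor context instead of three inlined copies of the object shape.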

View File

@@ -313,95 +313,4 @@ describe('ExecutionLogger', () => {
expect(files[0].name).toBe('nested.json')
})
})
describe('cost model merging', () => {
test('should merge cost models correctly', () => {
const loggerInstance = new ExecutionLogger()
const mergeCostModelsMethod = (loggerInstance as any).mergeCostModels.bind(loggerInstance)
const existing = {
'gpt-4': {
input: 0.01,
output: 0.02,
total: 0.03,
tokens: { input: 100, output: 200, total: 300 },
},
}
const additional = {
'gpt-4': {
input: 0.005,
output: 0.01,
total: 0.015,
tokens: { input: 50, output: 100, total: 150 },
},
'gpt-3.5-turbo': {
input: 0.001,
output: 0.002,
total: 0.003,
tokens: { input: 10, output: 20, total: 30 },
},
}
const merged = mergeCostModelsMethod(existing, additional)
expect(merged['gpt-4'].input).toBe(0.015)
expect(merged['gpt-4'].output).toBe(0.03)
expect(merged['gpt-4'].total).toBe(0.045)
expect(merged['gpt-4'].tokens.input).toBe(150)
expect(merged['gpt-4'].tokens.output).toBe(300)
expect(merged['gpt-4'].tokens.total).toBe(450)
expect(merged['gpt-3.5-turbo']).toBeDefined()
expect(merged['gpt-3.5-turbo'].total).toBe(0.003)
})
test('should handle prompt/completion token aliases', () => {
const loggerInstance = new ExecutionLogger()
const mergeCostModelsMethod = (loggerInstance as any).mergeCostModels.bind(loggerInstance)
const existing = {
'gpt-4': {
input: 0.01,
output: 0.02,
total: 0.03,
tokens: { prompt: 100, completion: 200, total: 300 },
},
}
const additional = {
'gpt-4': {
input: 0.005,
output: 0.01,
total: 0.015,
tokens: { input: 50, output: 100, total: 150 },
},
}
const merged = mergeCostModelsMethod(existing, additional)
expect(merged['gpt-4'].tokens.input).toBe(150)
expect(merged['gpt-4'].tokens.output).toBe(300)
})
test('should handle empty existing models', () => {
const loggerInstance = new ExecutionLogger()
const mergeCostModelsMethod = (loggerInstance as any).mergeCostModels.bind(loggerInstance)
const existing = {}
const additional = {
'claude-3': {
input: 0.02,
output: 0.04,
total: 0.06,
tokens: { input: 200, output: 400, total: 600 },
},
}
const merged = mergeCostModelsMethod(existing, additional)
expect(merged['claude-3']).toBeDefined()
expect(merged['claude-3'].total).toBe(0.06)
})
})
})

View File

@@ -9,6 +9,7 @@ import {
import { createLogger } from '@sim/logger'
import { eq, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { BASE_EXECUTION_CHARGE } from '@/lib/billing/constants'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import {
checkUsageStatus,
@@ -47,34 +48,6 @@ export interface ToolCall {
const logger = createLogger('ExecutionLogger')
export class ExecutionLogger implements IExecutionLoggerService {
private mergeCostModels(
existing: Record<string, any>,
additional: Record<string, any>
): Record<string, any> {
const merged = { ...existing }
for (const [model, costs] of Object.entries(additional)) {
if (merged[model]) {
merged[model] = {
input: (merged[model].input || 0) + (costs.input || 0),
output: (merged[model].output || 0) + (costs.output || 0),
total: (merged[model].total || 0) + (costs.total || 0),
tokens: {
input:
(merged[model].tokens?.input || merged[model].tokens?.prompt || 0) +
(costs.tokens?.input || costs.tokens?.prompt || 0),
output:
(merged[model].tokens?.output || merged[model].tokens?.completion || 0) +
(costs.tokens?.output || costs.tokens?.completion || 0),
total: (merged[model].tokens?.total || 0) + (costs.tokens?.total || 0),
},
}
} else {
merged[model] = costs
}
}
return merged
}
async startWorkflowExecution(params: {
workflowId: string
workspaceId: string
@@ -158,6 +131,13 @@ export class ExecutionLogger implements IExecutionLoggerService {
environment,
trigger,
},
cost: {
total: BASE_EXECUTION_CHARGE,
input: 0,
output: 0,
tokens: { input: 0, output: 0, total: 0 },
models: {},
},
})
.returning()
@@ -209,7 +189,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
workflowInput?: any
isResume?: boolean
level?: 'info' | 'error'
status?: 'completed' | 'failed' | 'cancelled'
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
}): Promise<WorkflowExecutionLog> {
const {
executionId,
@@ -268,43 +248,19 @@ export class ExecutionLogger implements IExecutionLoggerService {
const redactedTraceSpans = redactApiKeys(filteredTraceSpans)
const redactedFinalOutput = redactApiKeys(filteredFinalOutput)
// Merge costs if resuming
const existingCost = isResume && existingLog?.cost ? existingLog.cost : null
const mergedCost = existingCost
? {
// For resume, add only the model costs, NOT the base execution charge again
total: (existingCost.total || 0) + costSummary.modelCost,
input: (existingCost.input || 0) + costSummary.totalInputCost,
output: (existingCost.output || 0) + costSummary.totalOutputCost,
tokens: {
input:
(existingCost.tokens?.input || existingCost.tokens?.prompt || 0) +
costSummary.totalPromptTokens,
output:
(existingCost.tokens?.output || existingCost.tokens?.completion || 0) +
costSummary.totalCompletionTokens,
total: (existingCost.tokens?.total || 0) + costSummary.totalTokens,
},
models: this.mergeCostModels(existingCost.models || {}, costSummary.models),
}
: {
total: costSummary.totalCost,
input: costSummary.totalInputCost,
output: costSummary.totalOutputCost,
tokens: {
input: costSummary.totalPromptTokens,
output: costSummary.totalCompletionTokens,
total: costSummary.totalTokens,
},
models: costSummary.models,
}
const executionCost = {
total: costSummary.totalCost,
input: costSummary.totalInputCost,
output: costSummary.totalOutputCost,
tokens: {
input: costSummary.totalPromptTokens,
output: costSummary.totalCompletionTokens,
total: costSummary.totalTokens,
},
models: costSummary.models,
}
// Merge files if resuming
const existingFiles = isResume && existingLog?.files ? existingLog.files : []
const mergedFiles = [...existingFiles, ...executionFiles]
// Calculate the actual total duration for resume executions
const actualTotalDuration =
const totalDuration =
isResume && existingLog?.startedAt
? new Date(endedAt).getTime() - new Date(existingLog.startedAt).getTime()
: totalDurationMs
@@ -315,19 +271,19 @@ export class ExecutionLogger implements IExecutionLoggerService {
level,
status,
endedAt: new Date(endedAt),
totalDurationMs: actualTotalDuration,
files: mergedFiles.length > 0 ? mergedFiles : null,
totalDurationMs: totalDuration,
files: executionFiles.length > 0 ? executionFiles : null,
executionData: {
traceSpans: redactedTraceSpans,
finalOutput: redactedFinalOutput,
tokens: {
input: mergedCost.tokens.input,
output: mergedCost.tokens.output,
total: mergedCost.tokens.total,
input: executionCost.tokens.input,
output: executionCost.tokens.output,
total: executionCost.tokens.total,
},
models: mergedCost.models,
models: executionCost.models,
},
cost: mergedCost,
cost: executionCost,
})
.where(eq(workflowExecutionLogs.executionId, executionId))
.returning()

View File

@@ -1,4 +1,7 @@
import { db } from '@sim/db'
import { workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq, sql } from 'drizzle-orm'
import { BASE_EXECUTION_CHARGE } from '@/lib/billing/constants'
import { executionLogger } from '@/lib/logs/execution/logger'
import {
@@ -50,6 +53,29 @@ export interface SessionCancelledParams {
traceSpans?: TraceSpan[]
}
export interface SessionPausedParams {
endedAt?: string
totalDurationMs?: number
traceSpans?: TraceSpan[]
workflowInput?: any
}
interface AccumulatedCost {
total: number
input: number
output: number
tokens: { input: number; output: number; total: number }
models: Record<
string,
{
input: number
output: number
total: number
tokens: { input: number; output: number; total: number }
}
>
}
export class LoggingSession {
private workflowId: string
private executionId: string
@@ -60,6 +86,14 @@ export class LoggingSession {
private workflowState?: WorkflowState
private isResume = false
private completed = false
private accumulatedCost: AccumulatedCost = {
total: BASE_EXECUTION_CHARGE,
input: 0,
output: 0,
tokens: { input: 0, output: 0, total: 0 },
models: {},
}
private costFlushed = false
constructor(
workflowId: string,
@@ -73,6 +107,102 @@ export class LoggingSession {
this.requestId = requestId
}
async onBlockComplete(
blockId: string,
blockName: string,
blockType: string,
output: any
): Promise<void> {
if (!output?.cost || typeof output.cost.total !== 'number' || output.cost.total <= 0) {
return
}
const { cost, tokens, model } = output
this.accumulatedCost.total += cost.total || 0
this.accumulatedCost.input += cost.input || 0
this.accumulatedCost.output += cost.output || 0
if (tokens) {
this.accumulatedCost.tokens.input += tokens.input || 0
this.accumulatedCost.tokens.output += tokens.output || 0
this.accumulatedCost.tokens.total += tokens.total || 0
}
if (model) {
if (!this.accumulatedCost.models[model]) {
this.accumulatedCost.models[model] = {
input: 0,
output: 0,
total: 0,
tokens: { input: 0, output: 0, total: 0 },
}
}
this.accumulatedCost.models[model].input += cost.input || 0
this.accumulatedCost.models[model].output += cost.output || 0
this.accumulatedCost.models[model].total += cost.total || 0
if (tokens) {
this.accumulatedCost.models[model].tokens.input += tokens.input || 0
this.accumulatedCost.models[model].tokens.output += tokens.output || 0
this.accumulatedCost.models[model].tokens.total += tokens.total || 0
}
}
await this.flushAccumulatedCost()
}
private async flushAccumulatedCost(): Promise<void> {
try {
await db
.update(workflowExecutionLogs)
.set({
cost: {
total: this.accumulatedCost.total,
input: this.accumulatedCost.input,
output: this.accumulatedCost.output,
tokens: this.accumulatedCost.tokens,
models: this.accumulatedCost.models,
},
})
.where(eq(workflowExecutionLogs.executionId, this.executionId))
this.costFlushed = true
} catch (error) {
logger.error(`Failed to flush accumulated cost for execution ${this.executionId}:`, {
error: error instanceof Error ? error.message : String(error),
})
}
}
private async loadExistingCost(): Promise<void> {
try {
const [existing] = await db
.select({ cost: workflowExecutionLogs.cost })
.from(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.executionId, this.executionId))
.limit(1)
if (existing?.cost) {
const cost = existing.cost as any
this.accumulatedCost = {
total: cost.total || BASE_EXECUTION_CHARGE,
input: cost.input || 0,
output: cost.output || 0,
tokens: {
input: cost.tokens?.input || 0,
output: cost.tokens?.output || 0,
total: cost.tokens?.total || 0,
},
models: cost.models || {},
}
}
} catch (error) {
logger.error(`Failed to load existing cost for execution ${this.executionId}:`, {
error: error instanceof Error ? error.message : String(error),
})
}
}
async start(params: SessionStartParams): Promise<void> {
const { userId, workspaceId, variables, triggerData, skipLogCreation, deploymentVersionId } =
params
@@ -92,7 +222,6 @@ export class LoggingSession {
? await loadDeployedWorkflowStateForLogging(this.workflowId)
: await loadWorkflowStateForExecution(this.workflowId)
// Only create a new log entry if not resuming
if (!skipLogCreation) {
await executionLogger.startWorkflowExecution({
workflowId: this.workflowId,
@@ -108,7 +237,8 @@ export class LoggingSession {
logger.debug(`[${this.requestId}] Started logging for execution ${this.executionId}`)
}
} else {
this.isResume = true // Mark as resume
this.isResume = true
await this.loadExistingCost()
if (this.requestId) {
logger.debug(
`[${this.requestId}] Resuming logging for existing execution ${this.executionId}`
@@ -364,6 +494,68 @@ export class LoggingSession {
}
}
async completeWithPause(params: SessionPausedParams = {}): Promise<void> {
try {
const { endedAt, totalDurationMs, traceSpans, workflowInput } = params
const endTime = endedAt ? new Date(endedAt) : new Date()
const durationMs = typeof totalDurationMs === 'number' ? totalDurationMs : 0
const costSummary = traceSpans?.length
? calculateCostSummary(traceSpans)
: {
totalCost: BASE_EXECUTION_CHARGE,
totalInputCost: 0,
totalOutputCost: 0,
totalTokens: 0,
totalPromptTokens: 0,
totalCompletionTokens: 0,
baseExecutionCharge: BASE_EXECUTION_CHARGE,
modelCost: 0,
models: {},
}
await executionLogger.completeWorkflowExecution({
executionId: this.executionId,
endedAt: endTime.toISOString(),
totalDurationMs: Math.max(1, durationMs),
costSummary,
finalOutput: { paused: true },
traceSpans: traceSpans || [],
workflowInput,
status: 'pending',
})
try {
const { trackPlatformEvent } = await import('@/lib/core/telemetry')
trackPlatformEvent('platform.workflow.executed', {
'workflow.id': this.workflowId,
'execution.duration_ms': Math.max(1, durationMs),
'execution.status': 'paused',
'execution.trigger': this.triggerType,
'execution.blocks_executed': traceSpans?.length || 0,
'execution.has_errors': false,
'execution.total_cost': costSummary.totalCost || 0,
})
} catch (_e) {}
if (this.requestId) {
logger.debug(
`[${this.requestId}] Completed paused logging for execution ${this.executionId}`
)
}
} catch (pauseError) {
logger.error(`Failed to complete paused logging for execution ${this.executionId}:`, {
requestId: this.requestId,
workflowId: this.workflowId,
executionId: this.executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
stack: pauseError instanceof Error ? pauseError.stack : undefined,
})
throw pauseError
}
}
async safeStart(params: SessionStartParams): Promise<boolean> {
try {
await this.start(params)
@@ -480,13 +672,64 @@ export class LoggingSession {
}
}
async safeCompleteWithPause(params?: SessionPausedParams): Promise<void> {
try {
await this.completeWithPause(params)
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error)
logger.warn(
`[${this.requestId || 'unknown'}] CompleteWithPause failed for execution ${this.executionId}, attempting fallback`,
{ error: errorMsg }
)
await this.completeWithCostOnlyLog({
traceSpans: params?.traceSpans,
endedAt: params?.endedAt,
totalDurationMs: params?.totalDurationMs,
errorMessage: 'Execution paused but failed to store full trace spans',
isError: false,
status: 'pending',
})
}
}
async markAsFailed(errorMessage?: string): Promise<void> {
await LoggingSession.markExecutionAsFailed(this.executionId, errorMessage, this.requestId)
}
static async markExecutionAsFailed(
executionId: string,
errorMessage?: string,
requestId?: string
): Promise<void> {
try {
const message = errorMessage || 'Execution failed'
await db
.update(workflowExecutionLogs)
.set({
status: 'failed',
executionData: sql`jsonb_set(
COALESCE(execution_data, '{}'::jsonb),
ARRAY['error'],
to_jsonb(${message}::text)
)`,
})
.where(eq(workflowExecutionLogs.executionId, executionId))
logger.info(`[${requestId || 'unknown'}] Marked execution ${executionId} as failed`)
} catch (error) {
logger.error(`Failed to mark execution ${executionId} as failed:`, {
error: error instanceof Error ? error.message : String(error),
})
}
}
private async completeWithCostOnlyLog(params: {
traceSpans?: TraceSpan[]
endedAt?: string
totalDurationMs?: number
errorMessage: string
isError: boolean
status?: 'completed' | 'failed' | 'cancelled'
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
}): Promise<void> {
if (this.completed) {
return
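
The LoggingSession changes above accumulate cost incrementally instead of recomputing it at completion: the running total starts at the base execution charge and each completed block adds its model cost and tokens, which flushAccumulatedCost then writes to the execution row. Below is a self-contained sketch of that accumulation rule with hypothetical numbers; it is an illustration, not the class itself, and the placeholder BASE_EXECUTION_CHARGE stands in for the constant imported from '@/lib/billing/constants'.

interface BlockCost {
  model: string
  cost: { total: number; input: number; output: number }
  tokens: { input: number; output: number; total: number }
}

const BASE_EXECUTION_CHARGE = 0.001 // hypothetical placeholder, not the real constant

// Sketch of the accumulation rule: base charge plus per-block cost, tokens,
// and a per-model breakdown, mirroring LoggingSession.onBlockComplete above.
function accumulateCost(blocks: BlockCost[]) {
  const acc = {
    total: BASE_EXECUTION_CHARGE,
    input: 0,
    output: 0,
    tokens: { input: 0, output: 0, total: 0 },
    models: {} as Record<
      string,
      { input: number; output: number; total: number; tokens: { input: number; output: number; total: number } }
    >,
  }
  for (const block of blocks) {
    acc.total += block.cost.total
    acc.input += block.cost.input
    acc.output += block.cost.output
    acc.tokens.input += block.tokens.input
    acc.tokens.output += block.tokens.output
    acc.tokens.total += block.tokens.total
    const model = (acc.models[block.model] ??= {
      input: 0,
      output: 0,
      total: 0,
      tokens: { input: 0, output: 0, total: 0 },
    })
    model.input += block.cost.input
    model.output += block.cost.output
    model.total += block.cost.total
    model.tokens.input += block.tokens.input
    model.tokens.output += block.tokens.output
    model.tokens.total += block.tokens.total
  }
  return acc
}

// Two completed blocks on the same model:
const summary = accumulateCost([
  { model: 'gpt-4', cost: { total: 0.03, input: 0.01, output: 0.02 }, tokens: { input: 100, output: 200, total: 300 } },
  { model: 'gpt-4', cost: { total: 0.015, input: 0.005, output: 0.01 }, tokens: { input: 50, output: 100, total: 150 } },
])
console.log(summary.total) // ~0.046 (0.001 base + 0.03 + 0.015, modulo float rounding)
console.log(summary.models['gpt-4'].tokens.total) // 450

Flushing the running total after every block appears to be the point of this change: a crashed worker no longer loses the cost already incurred, and the stale-execution sweep can later mark the run failed without touching billing data.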

View File

@@ -367,5 +367,9 @@ export interface ExecutionLoggerService {
}
finalOutput: BlockOutputData
traceSpans?: TraceSpan[]
workflowInput?: any
isResume?: boolean
level?: 'info' | 'error'
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
}): Promise<WorkflowExecutionLog>
}

View File

@@ -3,7 +3,8 @@ import { v4 as uuidv4 } from 'uuid'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
const logger = createLogger('WorkflowExecution')
@@ -83,14 +84,25 @@ export async function executeWorkflow(
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)

View File

@@ -18,7 +18,8 @@ import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { Executor } from '@/executor'
import { REFERENCE } from '@/executor/constants'
import type { ExecutionCallbacks, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionCallbacks, IterationContext } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { createEnvVarPattern } from '@/executor/utils/reference-validation'
import { Serializer } from '@/serializer'
@@ -316,6 +317,19 @@ export async function executeWorkflowCore(
})
}
const wrappedOnBlockComplete = async (
blockId: string,
blockName: string,
blockType: string,
output: any,
iterationContext?: IterationContext
) => {
await loggingSession.onBlockComplete(blockId, blockName, blockType, output)
if (onBlockComplete) {
await onBlockComplete(blockId, blockName, blockType, output, iterationContext)
}
}
const contextExtensions: any = {
stream: !!onStream,
selectedOutputs,
@@ -324,7 +338,7 @@ export async function executeWorkflowCore(
userId,
isDeployedContext: triggerType !== 'manual',
onBlockStart,
onBlockComplete,
onBlockComplete: wrappedOnBlockComplete,
onStream,
resumeFromSnapshot,
resumePendingQueue,
@@ -386,6 +400,13 @@ export async function executeWorkflowCore(
}
if (result.status === 'paused') {
await loggingSession.safeCompleteWithPause({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
workflowInput: processedInput,
})
await clearExecutionCancellation(executionId)
logger.info(`[${requestId}] Workflow execution paused`, {

View File

@@ -155,11 +155,6 @@ export class PauseResumeManager {
},
})
await db
.update(workflowExecutionLogs)
.set({ status: 'pending' })
.where(eq(workflowExecutionLogs.executionId, executionId))
await PauseResumeManager.processQueuedResumes(executionId)
}
@@ -302,18 +297,34 @@ export class PauseResumeManager {
})
if (result.status === 'paused') {
const effectiveExecutionId = result.metadata?.executionId ?? resumeExecutionId
if (!result.snapshotSeed) {
logger.error('Missing snapshot seed for paused resume execution', {
resumeExecutionId,
})
await LoggingSession.markExecutionAsFailed(
effectiveExecutionId,
'Missing snapshot seed for paused execution'
)
} else {
await PauseResumeManager.persistPauseResult({
workflowId: pausedExecution.workflowId,
executionId: result.metadata?.executionId ?? resumeExecutionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId: pausedExecution.workflowId,
executionId: effectiveExecutionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
} catch (pauseError) {
logger.error('Failed to persist pause result for resumed execution', {
resumeExecutionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await LoggingSession.markExecutionAsFailed(
effectiveExecutionId,
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.updateSnapshotAfterResume({