diff --git a/apps/sim/app/api/cron/cleanup-stale-executions/route.ts b/apps/sim/app/api/cron/cleanup-stale-executions/route.ts new file mode 100644 index 000000000..5c4ba7921 --- /dev/null +++ b/apps/sim/app/api/cron/cleanup-stale-executions/route.ts @@ -0,0 +1,90 @@ +import { db } from '@sim/db' +import { workflowExecutionLogs } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, lt, sql } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { verifyCronAuth } from '@/lib/auth/internal' + +const logger = createLogger('CleanupStaleExecutions') + +const STALE_THRESHOLD_MINUTES = 30 + +export async function GET(request: NextRequest) { + try { + const authError = verifyCronAuth(request, 'Stale execution cleanup') + if (authError) { + return authError + } + + logger.info('Starting stale execution cleanup job') + + const staleThreshold = new Date(Date.now() - STALE_THRESHOLD_MINUTES * 60 * 1000) + + const staleExecutions = await db + .select({ + id: workflowExecutionLogs.id, + executionId: workflowExecutionLogs.executionId, + workflowId: workflowExecutionLogs.workflowId, + startedAt: workflowExecutionLogs.startedAt, + }) + .from(workflowExecutionLogs) + .where( + and( + eq(workflowExecutionLogs.status, 'running'), + lt(workflowExecutionLogs.startedAt, staleThreshold) + ) + ) + .limit(100) + + logger.info(`Found ${staleExecutions.length} stale executions to clean up`) + + let cleaned = 0 + let failed = 0 + + for (const execution of staleExecutions) { + try { + const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime() + const staleDurationMinutes = Math.round(staleDurationMs / 60000) + + await db + .update(workflowExecutionLogs) + .set({ + status: 'failed', + endedAt: new Date(), + totalDurationMs: staleDurationMs, + executionData: sql`jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + ARRAY['error'], + to_jsonb(${`Execution terminated: worker timeout or crash after ${staleDurationMinutes} minutes`}::text) + )`, + }) + .where(eq(workflowExecutionLogs.id, execution.id)) + + logger.info(`Cleaned up stale execution ${execution.executionId}`, { + workflowId: execution.workflowId, + staleDurationMinutes, + }) + + cleaned++ + } catch (error) { + logger.error(`Failed to clean up execution ${execution.executionId}:`, { + error: error instanceof Error ? error.message : String(error), + }) + failed++ + } + } + + logger.info(`Stale execution cleanup completed. Cleaned: ${cleaned}, Failed: ${failed}`) + + return NextResponse.json({ + success: true, + found: staleExecutions.length, + cleaned, + failed, + thresholdMinutes: STALE_THRESHOLD_MINUTES, + }) + } catch (error) { + logger.error('Error in stale execution cleanup job:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 2d7fc088b..ca35437e5 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -23,11 +23,11 @@ import { createStreamingResponse } from '@/lib/workflows/streaming/streaming' import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils' import type { WorkflowExecutionPayload } from '@/background/workflow-execution' import { normalizeName } from '@/executor/constants' -import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot' +import { ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types' import type { StreamingExecution } from '@/executor/types' import { Serializer } from '@/serializer' import { CORE_TRIGGER_TYPES } from '@/stores/logs/filters/types' -import type { SubflowType } from '@/stores/workflows/workflow/types' const logger = createLogger('WorkflowExecuteAPI') @@ -541,11 +541,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: blockId: string, blockName: string, blockType: string, - iterationContext?: { - iterationCurrent: number - iterationTotal: number - iterationType: SubflowType - } + iterationContext?: IterationContext ) => { logger.info(`[${requestId}] 🔷 onBlockStart called:`, { blockId, blockName, blockType }) sendEvent({ @@ -571,11 +567,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: blockName: string, blockType: string, callbackData: any, - iterationContext?: { - iterationCurrent: number - iterationTotal: number - iterationType: SubflowType - } + iterationContext?: IterationContext ) => { const hasError = callbackData.output?.error @@ -713,14 +705,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, }) + await loggingSession.markAsFailed('Missing snapshot seed for paused execution') } else { - await PauseResumeManager.persistPauseResult({ - workflowId, - executionId, - pausePoints: result.pausePoints || [], - snapshotSeed: result.snapshotSeed, - executorUserId: result.metadata?.userId, - }) + try { + await PauseResumeManager.persistPauseResult({ + workflowId, + executionId, + pausePoints: result.pausePoints || [], + snapshotSeed: result.snapshotSeed, + executorUserId: result.metadata?.userId, + }) + } catch (pauseError) { + logger.error(`[${requestId}] Failed to persist pause result`, { + executionId, + error: pauseError instanceof Error ? pauseError.message : String(pauseError), + }) + await loggingSession.markAsFailed( + `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` + ) + } } } else { await PauseResumeManager.processQueuedResumes(executionId) diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index d92bbd6ff..85eeaabfd 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -23,7 +23,8 @@ import { getSubBlockValue, } from '@/lib/workflows/schedules/utils' import { REFERENCE } from '@/executor/constants' -import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot' +import { ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionMetadata } from '@/executor/execution/types' import type { ExecutionResult } from '@/executor/types' import { createEnvVarPattern } from '@/executor/utils/reference-validation' import { mergeSubblockState } from '@/stores/workflows/server-utils' @@ -285,14 +286,25 @@ async function runWorkflowExecution({ logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, }) + await loggingSession.markAsFailed('Missing snapshot seed for paused execution') } else { - await PauseResumeManager.persistPauseResult({ - workflowId: payload.workflowId, - executionId, - pausePoints: executionResult.pausePoints || [], - snapshotSeed: executionResult.snapshotSeed, - executorUserId: executionResult.metadata?.userId, - }) + try { + await PauseResumeManager.persistPauseResult({ + workflowId: payload.workflowId, + executionId, + pausePoints: executionResult.pausePoints || [], + snapshotSeed: executionResult.snapshotSeed, + executorUserId: executionResult.metadata?.userId, + }) + } catch (pauseError) { + logger.error(`[${requestId}] Failed to persist pause result`, { + executionId, + error: pauseError instanceof Error ? pauseError.message : String(pauseError), + }) + await loggingSession.markAsFailed( + `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` + ) + } } } else { await PauseResumeManager.processQueuedResumes(executionId) diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index e632ec234..20389689f 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -17,7 +17,8 @@ import { loadWorkflowFromNormalizedTables, } from '@/lib/workflows/persistence/utils' import { getWorkflowById } from '@/lib/workflows/utils' -import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot' +import { ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionMetadata } from '@/executor/execution/types' import type { ExecutionResult } from '@/executor/types' import { Serializer } from '@/serializer' import { mergeSubblockState } from '@/stores/workflows/server-utils' @@ -268,14 +269,25 @@ async function executeWebhookJobInternal( logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, }) + await loggingSession.markAsFailed('Missing snapshot seed for paused execution') } else { - await PauseResumeManager.persistPauseResult({ - workflowId: payload.workflowId, - executionId, - pausePoints: executionResult.pausePoints || [], - snapshotSeed: executionResult.snapshotSeed, - executorUserId: executionResult.metadata?.userId, - }) + try { + await PauseResumeManager.persistPauseResult({ + workflowId: payload.workflowId, + executionId, + pausePoints: executionResult.pausePoints || [], + snapshotSeed: executionResult.snapshotSeed, + executorUserId: executionResult.metadata?.userId, + }) + } catch (pauseError) { + logger.error(`[${requestId}] Failed to persist pause result`, { + executionId, + error: pauseError instanceof Error ? pauseError.message : String(pauseError), + }) + await loggingSession.markAsFailed( + `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` + ) + } } } else { await PauseResumeManager.processQueuedResumes(executionId) @@ -509,14 +521,25 @@ async function executeWebhookJobInternal( logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, }) + await loggingSession.markAsFailed('Missing snapshot seed for paused execution') } else { - await PauseResumeManager.persistPauseResult({ - workflowId: payload.workflowId, - executionId, - pausePoints: executionResult.pausePoints || [], - snapshotSeed: executionResult.snapshotSeed, - executorUserId: executionResult.metadata?.userId, - }) + try { + await PauseResumeManager.persistPauseResult({ + workflowId: payload.workflowId, + executionId, + pausePoints: executionResult.pausePoints || [], + snapshotSeed: executionResult.snapshotSeed, + executorUserId: executionResult.metadata?.userId, + }) + } catch (pauseError) { + logger.error(`[${requestId}] Failed to persist pause result`, { + executionId, + error: pauseError instanceof Error ? pauseError.message : String(pauseError), + }) + await loggingSession.markAsFailed( + `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` + ) + } } } else { await PauseResumeManager.processQueuedResumes(executionId) diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index e5539fa22..9bb1686d6 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -7,7 +7,8 @@ import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' import { getWorkflowById } from '@/lib/workflows/utils' -import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot' +import { ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionMetadata } from '@/executor/execution/types' import type { ExecutionResult } from '@/executor/types' const logger = createLogger('TriggerWorkflowExecution') @@ -112,14 +113,25 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, }) + await loggingSession.markAsFailed('Missing snapshot seed for paused execution') } else { - await PauseResumeManager.persistPauseResult({ - workflowId, - executionId, - pausePoints: result.pausePoints || [], - snapshotSeed: result.snapshotSeed, - executorUserId: result.metadata?.userId, - }) + try { + await PauseResumeManager.persistPauseResult({ + workflowId, + executionId, + pausePoints: result.pausePoints || [], + snapshotSeed: result.snapshotSeed, + executorUserId: result.metadata?.userId, + }) + } catch (pauseError) { + logger.error(`[${requestId}] Failed to persist pause result`, { + executionId, + error: pauseError instanceof Error ? pauseError.message : String(pauseError), + }) + await loggingSession.markAsFailed( + `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` + ) + } } } else { await PauseResumeManager.processQueuedResumes(executionId) diff --git a/apps/sim/executor/execution/snapshot-serializer.ts b/apps/sim/executor/execution/snapshot-serializer.ts index ab20e5187..6674a88ab 100644 --- a/apps/sim/executor/execution/snapshot-serializer.ts +++ b/apps/sim/executor/execution/snapshot-serializer.ts @@ -1,9 +1,6 @@ import type { DAG } from '@/executor/dag/builder' -import { - type ExecutionMetadata, - ExecutionSnapshot, - type SerializableExecutionState, -} from '@/executor/execution/snapshot' +import { ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types' import type { ExecutionContext, SerializedSnapshot } from '@/executor/types' function mapFromEntries(map?: Map): Record | undefined { diff --git a/apps/sim/executor/execution/snapshot.ts b/apps/sim/executor/execution/snapshot.ts index dfa0d1cc3..afe9bf52d 100644 --- a/apps/sim/executor/execution/snapshot.ts +++ b/apps/sim/executor/execution/snapshot.ts @@ -1,59 +1,4 @@ -import type { Edge } from 'reactflow' -import type { BlockLog, BlockState } from '@/executor/types' - -export interface ExecutionMetadata { - requestId: string - executionId: string - workflowId: string - workspaceId: string - userId: string - sessionUserId?: string - workflowUserId?: string - triggerType: string - triggerBlockId?: string - useDraftState: boolean - startTime: string - isClientSession?: boolean - pendingBlocks?: string[] - resumeFromSnapshot?: boolean - workflowStateOverride?: { - blocks: Record - edges: Edge[] - loops?: Record - parallels?: Record - deploymentVersionId?: string // ID of deployment version if this is deployed state - } -} - -export interface ExecutionCallbacks { - onStream?: (streamingExec: any) => Promise - onBlockStart?: (blockId: string, blockName: string, blockType: string) => Promise - onBlockComplete?: ( - blockId: string, - blockName: string, - blockType: string, - output: any - ) => Promise -} - -export interface SerializableExecutionState { - blockStates: Record - executedBlocks: string[] - blockLogs: BlockLog[] - decisions: { - router: Record - condition: Record - } - completedLoops: string[] - loopExecutions?: Record - parallelExecutions?: Record - parallelBlockMapping?: Record - activeExecutionPath: string[] - pendingQueue?: string[] - remainingEdges?: Edge[] - dagIncomingEdges?: Record - completedPauseContexts?: string[] -} +import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types' export class ExecutionSnapshot { constructor( diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index 0c5ac50e3..e4a6a5328 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -1,7 +1,68 @@ -import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/snapshot' -import type { BlockState, NormalizedBlockOutput } from '@/executor/types' +import type { Edge } from 'reactflow' +import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types' import type { SubflowType } from '@/stores/workflows/workflow/types' +export interface ExecutionMetadata { + requestId: string + executionId: string + workflowId: string + workspaceId: string + userId: string + sessionUserId?: string + workflowUserId?: string + triggerType: string + triggerBlockId?: string + useDraftState: boolean + startTime: string + isClientSession?: boolean + pendingBlocks?: string[] + resumeFromSnapshot?: boolean + workflowStateOverride?: { + blocks: Record + edges: Edge[] + loops?: Record + parallels?: Record + deploymentVersionId?: string + } +} + +export interface SerializableExecutionState { + blockStates: Record + executedBlocks: string[] + blockLogs: BlockLog[] + decisions: { + router: Record + condition: Record + } + completedLoops: string[] + loopExecutions?: Record + parallelExecutions?: Record + parallelBlockMapping?: Record + activeExecutionPath: string[] + pendingQueue?: string[] + remainingEdges?: Edge[] + dagIncomingEdges?: Record + completedPauseContexts?: string[] +} + +export interface IterationContext { + iterationCurrent: number + iterationTotal: number + iterationType: SubflowType +} + +export interface ExecutionCallbacks { + onStream?: (streamingExec: any) => Promise + onBlockStart?: (blockId: string, blockName: string, blockType: string) => Promise + onBlockComplete?: ( + blockId: string, + blockName: string, + blockType: string, + output: any, + iterationContext?: IterationContext + ) => Promise +} + export interface ContextExtensions { workspaceId?: string executionId?: string @@ -32,22 +93,14 @@ export interface ContextExtensions { blockId: string, blockName: string, blockType: string, - iterationContext?: { - iterationCurrent: number - iterationTotal: number - iterationType: SubflowType - } + iterationContext?: IterationContext ) => Promise onBlockComplete?: ( blockId: string, blockName: string, blockType: string, output: { input?: any; output: NormalizedBlockOutput; executionTime: number }, - iterationContext?: { - iterationCurrent: number - iterationTotal: number - iterationType: SubflowType - } + iterationContext?: IterationContext ) => Promise } diff --git a/apps/sim/lib/logs/execution/logger.test.ts b/apps/sim/lib/logs/execution/logger.test.ts index 805d238e7..12d86635d 100644 --- a/apps/sim/lib/logs/execution/logger.test.ts +++ b/apps/sim/lib/logs/execution/logger.test.ts @@ -313,95 +313,4 @@ describe('ExecutionLogger', () => { expect(files[0].name).toBe('nested.json') }) }) - - describe('cost model merging', () => { - test('should merge cost models correctly', () => { - const loggerInstance = new ExecutionLogger() - const mergeCostModelsMethod = (loggerInstance as any).mergeCostModels.bind(loggerInstance) - - const existing = { - 'gpt-4': { - input: 0.01, - output: 0.02, - total: 0.03, - tokens: { input: 100, output: 200, total: 300 }, - }, - } - - const additional = { - 'gpt-4': { - input: 0.005, - output: 0.01, - total: 0.015, - tokens: { input: 50, output: 100, total: 150 }, - }, - 'gpt-3.5-turbo': { - input: 0.001, - output: 0.002, - total: 0.003, - tokens: { input: 10, output: 20, total: 30 }, - }, - } - - const merged = mergeCostModelsMethod(existing, additional) - - expect(merged['gpt-4'].input).toBe(0.015) - expect(merged['gpt-4'].output).toBe(0.03) - expect(merged['gpt-4'].total).toBe(0.045) - expect(merged['gpt-4'].tokens.input).toBe(150) - expect(merged['gpt-4'].tokens.output).toBe(300) - expect(merged['gpt-4'].tokens.total).toBe(450) - - expect(merged['gpt-3.5-turbo']).toBeDefined() - expect(merged['gpt-3.5-turbo'].total).toBe(0.003) - }) - - test('should handle prompt/completion token aliases', () => { - const loggerInstance = new ExecutionLogger() - const mergeCostModelsMethod = (loggerInstance as any).mergeCostModels.bind(loggerInstance) - - const existing = { - 'gpt-4': { - input: 0.01, - output: 0.02, - total: 0.03, - tokens: { prompt: 100, completion: 200, total: 300 }, - }, - } - - const additional = { - 'gpt-4': { - input: 0.005, - output: 0.01, - total: 0.015, - tokens: { input: 50, output: 100, total: 150 }, - }, - } - - const merged = mergeCostModelsMethod(existing, additional) - - expect(merged['gpt-4'].tokens.input).toBe(150) - expect(merged['gpt-4'].tokens.output).toBe(300) - }) - - test('should handle empty existing models', () => { - const loggerInstance = new ExecutionLogger() - const mergeCostModelsMethod = (loggerInstance as any).mergeCostModels.bind(loggerInstance) - - const existing = {} - const additional = { - 'claude-3': { - input: 0.02, - output: 0.04, - total: 0.06, - tokens: { input: 200, output: 400, total: 600 }, - }, - } - - const merged = mergeCostModelsMethod(existing, additional) - - expect(merged['claude-3']).toBeDefined() - expect(merged['claude-3'].total).toBe(0.06) - }) - }) }) diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index d31840585..cfb24f139 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -9,6 +9,7 @@ import { import { createLogger } from '@sim/logger' import { eq, sql } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' +import { BASE_EXECUTION_CHARGE } from '@/lib/billing/constants' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { checkUsageStatus, @@ -47,34 +48,6 @@ export interface ToolCall { const logger = createLogger('ExecutionLogger') export class ExecutionLogger implements IExecutionLoggerService { - private mergeCostModels( - existing: Record, - additional: Record - ): Record { - const merged = { ...existing } - for (const [model, costs] of Object.entries(additional)) { - if (merged[model]) { - merged[model] = { - input: (merged[model].input || 0) + (costs.input || 0), - output: (merged[model].output || 0) + (costs.output || 0), - total: (merged[model].total || 0) + (costs.total || 0), - tokens: { - input: - (merged[model].tokens?.input || merged[model].tokens?.prompt || 0) + - (costs.tokens?.input || costs.tokens?.prompt || 0), - output: - (merged[model].tokens?.output || merged[model].tokens?.completion || 0) + - (costs.tokens?.output || costs.tokens?.completion || 0), - total: (merged[model].tokens?.total || 0) + (costs.tokens?.total || 0), - }, - } - } else { - merged[model] = costs - } - } - return merged - } - async startWorkflowExecution(params: { workflowId: string workspaceId: string @@ -158,6 +131,13 @@ export class ExecutionLogger implements IExecutionLoggerService { environment, trigger, }, + cost: { + total: BASE_EXECUTION_CHARGE, + input: 0, + output: 0, + tokens: { input: 0, output: 0, total: 0 }, + models: {}, + }, }) .returning() @@ -209,7 +189,7 @@ export class ExecutionLogger implements IExecutionLoggerService { workflowInput?: any isResume?: boolean level?: 'info' | 'error' - status?: 'completed' | 'failed' | 'cancelled' + status?: 'completed' | 'failed' | 'cancelled' | 'pending' }): Promise { const { executionId, @@ -268,43 +248,19 @@ export class ExecutionLogger implements IExecutionLoggerService { const redactedTraceSpans = redactApiKeys(filteredTraceSpans) const redactedFinalOutput = redactApiKeys(filteredFinalOutput) - // Merge costs if resuming - const existingCost = isResume && existingLog?.cost ? existingLog.cost : null - const mergedCost = existingCost - ? { - // For resume, add only the model costs, NOT the base execution charge again - total: (existingCost.total || 0) + costSummary.modelCost, - input: (existingCost.input || 0) + costSummary.totalInputCost, - output: (existingCost.output || 0) + costSummary.totalOutputCost, - tokens: { - input: - (existingCost.tokens?.input || existingCost.tokens?.prompt || 0) + - costSummary.totalPromptTokens, - output: - (existingCost.tokens?.output || existingCost.tokens?.completion || 0) + - costSummary.totalCompletionTokens, - total: (existingCost.tokens?.total || 0) + costSummary.totalTokens, - }, - models: this.mergeCostModels(existingCost.models || {}, costSummary.models), - } - : { - total: costSummary.totalCost, - input: costSummary.totalInputCost, - output: costSummary.totalOutputCost, - tokens: { - input: costSummary.totalPromptTokens, - output: costSummary.totalCompletionTokens, - total: costSummary.totalTokens, - }, - models: costSummary.models, - } + const executionCost = { + total: costSummary.totalCost, + input: costSummary.totalInputCost, + output: costSummary.totalOutputCost, + tokens: { + input: costSummary.totalPromptTokens, + output: costSummary.totalCompletionTokens, + total: costSummary.totalTokens, + }, + models: costSummary.models, + } - // Merge files if resuming - const existingFiles = isResume && existingLog?.files ? existingLog.files : [] - const mergedFiles = [...existingFiles, ...executionFiles] - - // Calculate the actual total duration for resume executions - const actualTotalDuration = + const totalDuration = isResume && existingLog?.startedAt ? new Date(endedAt).getTime() - new Date(existingLog.startedAt).getTime() : totalDurationMs @@ -315,19 +271,19 @@ export class ExecutionLogger implements IExecutionLoggerService { level, status, endedAt: new Date(endedAt), - totalDurationMs: actualTotalDuration, - files: mergedFiles.length > 0 ? mergedFiles : null, + totalDurationMs: totalDuration, + files: executionFiles.length > 0 ? executionFiles : null, executionData: { traceSpans: redactedTraceSpans, finalOutput: redactedFinalOutput, tokens: { - input: mergedCost.tokens.input, - output: mergedCost.tokens.output, - total: mergedCost.tokens.total, + input: executionCost.tokens.input, + output: executionCost.tokens.output, + total: executionCost.tokens.total, }, - models: mergedCost.models, + models: executionCost.models, }, - cost: mergedCost, + cost: executionCost, }) .where(eq(workflowExecutionLogs.executionId, executionId)) .returning() diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index ca3a5896b..37004d688 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -1,4 +1,7 @@ +import { db } from '@sim/db' +import { workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' +import { eq, sql } from 'drizzle-orm' import { BASE_EXECUTION_CHARGE } from '@/lib/billing/constants' import { executionLogger } from '@/lib/logs/execution/logger' import { @@ -50,6 +53,29 @@ export interface SessionCancelledParams { traceSpans?: TraceSpan[] } +export interface SessionPausedParams { + endedAt?: string + totalDurationMs?: number + traceSpans?: TraceSpan[] + workflowInput?: any +} + +interface AccumulatedCost { + total: number + input: number + output: number + tokens: { input: number; output: number; total: number } + models: Record< + string, + { + input: number + output: number + total: number + tokens: { input: number; output: number; total: number } + } + > +} + export class LoggingSession { private workflowId: string private executionId: string @@ -60,6 +86,14 @@ export class LoggingSession { private workflowState?: WorkflowState private isResume = false private completed = false + private accumulatedCost: AccumulatedCost = { + total: BASE_EXECUTION_CHARGE, + input: 0, + output: 0, + tokens: { input: 0, output: 0, total: 0 }, + models: {}, + } + private costFlushed = false constructor( workflowId: string, @@ -73,6 +107,102 @@ export class LoggingSession { this.requestId = requestId } + async onBlockComplete( + blockId: string, + blockName: string, + blockType: string, + output: any + ): Promise { + if (!output?.cost || typeof output.cost.total !== 'number' || output.cost.total <= 0) { + return + } + + const { cost, tokens, model } = output + + this.accumulatedCost.total += cost.total || 0 + this.accumulatedCost.input += cost.input || 0 + this.accumulatedCost.output += cost.output || 0 + + if (tokens) { + this.accumulatedCost.tokens.input += tokens.input || 0 + this.accumulatedCost.tokens.output += tokens.output || 0 + this.accumulatedCost.tokens.total += tokens.total || 0 + } + + if (model) { + if (!this.accumulatedCost.models[model]) { + this.accumulatedCost.models[model] = { + input: 0, + output: 0, + total: 0, + tokens: { input: 0, output: 0, total: 0 }, + } + } + this.accumulatedCost.models[model].input += cost.input || 0 + this.accumulatedCost.models[model].output += cost.output || 0 + this.accumulatedCost.models[model].total += cost.total || 0 + if (tokens) { + this.accumulatedCost.models[model].tokens.input += tokens.input || 0 + this.accumulatedCost.models[model].tokens.output += tokens.output || 0 + this.accumulatedCost.models[model].tokens.total += tokens.total || 0 + } + } + + await this.flushAccumulatedCost() + } + + private async flushAccumulatedCost(): Promise { + try { + await db + .update(workflowExecutionLogs) + .set({ + cost: { + total: this.accumulatedCost.total, + input: this.accumulatedCost.input, + output: this.accumulatedCost.output, + tokens: this.accumulatedCost.tokens, + models: this.accumulatedCost.models, + }, + }) + .where(eq(workflowExecutionLogs.executionId, this.executionId)) + + this.costFlushed = true + } catch (error) { + logger.error(`Failed to flush accumulated cost for execution ${this.executionId}:`, { + error: error instanceof Error ? error.message : String(error), + }) + } + } + + private async loadExistingCost(): Promise { + try { + const [existing] = await db + .select({ cost: workflowExecutionLogs.cost }) + .from(workflowExecutionLogs) + .where(eq(workflowExecutionLogs.executionId, this.executionId)) + .limit(1) + + if (existing?.cost) { + const cost = existing.cost as any + this.accumulatedCost = { + total: cost.total || BASE_EXECUTION_CHARGE, + input: cost.input || 0, + output: cost.output || 0, + tokens: { + input: cost.tokens?.input || 0, + output: cost.tokens?.output || 0, + total: cost.tokens?.total || 0, + }, + models: cost.models || {}, + } + } + } catch (error) { + logger.error(`Failed to load existing cost for execution ${this.executionId}:`, { + error: error instanceof Error ? error.message : String(error), + }) + } + } + async start(params: SessionStartParams): Promise { const { userId, workspaceId, variables, triggerData, skipLogCreation, deploymentVersionId } = params @@ -92,7 +222,6 @@ export class LoggingSession { ? await loadDeployedWorkflowStateForLogging(this.workflowId) : await loadWorkflowStateForExecution(this.workflowId) - // Only create a new log entry if not resuming if (!skipLogCreation) { await executionLogger.startWorkflowExecution({ workflowId: this.workflowId, @@ -108,7 +237,8 @@ export class LoggingSession { logger.debug(`[${this.requestId}] Started logging for execution ${this.executionId}`) } } else { - this.isResume = true // Mark as resume + this.isResume = true + await this.loadExistingCost() if (this.requestId) { logger.debug( `[${this.requestId}] Resuming logging for existing execution ${this.executionId}` @@ -364,6 +494,68 @@ export class LoggingSession { } } + async completeWithPause(params: SessionPausedParams = {}): Promise { + try { + const { endedAt, totalDurationMs, traceSpans, workflowInput } = params + + const endTime = endedAt ? new Date(endedAt) : new Date() + const durationMs = typeof totalDurationMs === 'number' ? totalDurationMs : 0 + + const costSummary = traceSpans?.length + ? calculateCostSummary(traceSpans) + : { + totalCost: BASE_EXECUTION_CHARGE, + totalInputCost: 0, + totalOutputCost: 0, + totalTokens: 0, + totalPromptTokens: 0, + totalCompletionTokens: 0, + baseExecutionCharge: BASE_EXECUTION_CHARGE, + modelCost: 0, + models: {}, + } + + await executionLogger.completeWorkflowExecution({ + executionId: this.executionId, + endedAt: endTime.toISOString(), + totalDurationMs: Math.max(1, durationMs), + costSummary, + finalOutput: { paused: true }, + traceSpans: traceSpans || [], + workflowInput, + status: 'pending', + }) + + try { + const { trackPlatformEvent } = await import('@/lib/core/telemetry') + trackPlatformEvent('platform.workflow.executed', { + 'workflow.id': this.workflowId, + 'execution.duration_ms': Math.max(1, durationMs), + 'execution.status': 'paused', + 'execution.trigger': this.triggerType, + 'execution.blocks_executed': traceSpans?.length || 0, + 'execution.has_errors': false, + 'execution.total_cost': costSummary.totalCost || 0, + }) + } catch (_e) {} + + if (this.requestId) { + logger.debug( + `[${this.requestId}] Completed paused logging for execution ${this.executionId}` + ) + } + } catch (pauseError) { + logger.error(`Failed to complete paused logging for execution ${this.executionId}:`, { + requestId: this.requestId, + workflowId: this.workflowId, + executionId: this.executionId, + error: pauseError instanceof Error ? pauseError.message : String(pauseError), + stack: pauseError instanceof Error ? pauseError.stack : undefined, + }) + throw pauseError + } + } + async safeStart(params: SessionStartParams): Promise { try { await this.start(params) @@ -480,13 +672,64 @@ export class LoggingSession { } } + async safeCompleteWithPause(params?: SessionPausedParams): Promise { + try { + await this.completeWithPause(params) + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error) + logger.warn( + `[${this.requestId || 'unknown'}] CompleteWithPause failed for execution ${this.executionId}, attempting fallback`, + { error: errorMsg } + ) + await this.completeWithCostOnlyLog({ + traceSpans: params?.traceSpans, + endedAt: params?.endedAt, + totalDurationMs: params?.totalDurationMs, + errorMessage: 'Execution paused but failed to store full trace spans', + isError: false, + status: 'pending', + }) + } + } + + async markAsFailed(errorMessage?: string): Promise { + await LoggingSession.markExecutionAsFailed(this.executionId, errorMessage, this.requestId) + } + + static async markExecutionAsFailed( + executionId: string, + errorMessage?: string, + requestId?: string + ): Promise { + try { + const message = errorMessage || 'Execution failed' + await db + .update(workflowExecutionLogs) + .set({ + status: 'failed', + executionData: sql`jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + ARRAY['error'], + to_jsonb(${message}::text) + )`, + }) + .where(eq(workflowExecutionLogs.executionId, executionId)) + + logger.info(`[${requestId || 'unknown'}] Marked execution ${executionId} as failed`) + } catch (error) { + logger.error(`Failed to mark execution ${executionId} as failed:`, { + error: error instanceof Error ? error.message : String(error), + }) + } + } + private async completeWithCostOnlyLog(params: { traceSpans?: TraceSpan[] endedAt?: string totalDurationMs?: number errorMessage: string isError: boolean - status?: 'completed' | 'failed' | 'cancelled' + status?: 'completed' | 'failed' | 'cancelled' | 'pending' }): Promise { if (this.completed) { return diff --git a/apps/sim/lib/logs/types.ts b/apps/sim/lib/logs/types.ts index 6f5fa379e..cef969282 100644 --- a/apps/sim/lib/logs/types.ts +++ b/apps/sim/lib/logs/types.ts @@ -367,5 +367,9 @@ export interface ExecutionLoggerService { } finalOutput: BlockOutputData traceSpans?: TraceSpan[] + workflowInput?: any + isResume?: boolean + level?: 'info' | 'error' + status?: 'completed' | 'failed' | 'cancelled' | 'pending' }): Promise } diff --git a/apps/sim/lib/workflows/executor/execute-workflow.ts b/apps/sim/lib/workflows/executor/execute-workflow.ts index a3ac85f56..b16e6ea82 100644 --- a/apps/sim/lib/workflows/executor/execute-workflow.ts +++ b/apps/sim/lib/workflows/executor/execute-workflow.ts @@ -3,7 +3,8 @@ import { v4 as uuidv4 } from 'uuid' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' -import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot' +import { ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionMetadata } from '@/executor/execution/types' const logger = createLogger('WorkflowExecution') @@ -83,14 +84,25 @@ export async function executeWorkflow( logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, }) + await loggingSession.markAsFailed('Missing snapshot seed for paused execution') } else { - await PauseResumeManager.persistPauseResult({ - workflowId, - executionId, - pausePoints: result.pausePoints || [], - snapshotSeed: result.snapshotSeed, - executorUserId: result.metadata?.userId, - }) + try { + await PauseResumeManager.persistPauseResult({ + workflowId, + executionId, + pausePoints: result.pausePoints || [], + snapshotSeed: result.snapshotSeed, + executorUserId: result.metadata?.userId, + }) + } catch (pauseError) { + logger.error(`[${requestId}] Failed to persist pause result`, { + executionId, + error: pauseError instanceof Error ? pauseError.message : String(pauseError), + }) + await loggingSession.markAsFailed( + `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` + ) + } } } else { await PauseResumeManager.processQueuedResumes(executionId) diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 403d690d7..9e81d8711 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -18,7 +18,8 @@ import { TriggerUtils } from '@/lib/workflows/triggers/triggers' import { updateWorkflowRunCounts } from '@/lib/workflows/utils' import { Executor } from '@/executor' import { REFERENCE } from '@/executor/constants' -import type { ExecutionCallbacks, ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionCallbacks, IterationContext } from '@/executor/execution/types' import type { ExecutionResult } from '@/executor/types' import { createEnvVarPattern } from '@/executor/utils/reference-validation' import { Serializer } from '@/serializer' @@ -316,6 +317,19 @@ export async function executeWorkflowCore( }) } + const wrappedOnBlockComplete = async ( + blockId: string, + blockName: string, + blockType: string, + output: any, + iterationContext?: IterationContext + ) => { + await loggingSession.onBlockComplete(blockId, blockName, blockType, output) + if (onBlockComplete) { + await onBlockComplete(blockId, blockName, blockType, output, iterationContext) + } + } + const contextExtensions: any = { stream: !!onStream, selectedOutputs, @@ -324,7 +338,7 @@ export async function executeWorkflowCore( userId, isDeployedContext: triggerType !== 'manual', onBlockStart, - onBlockComplete, + onBlockComplete: wrappedOnBlockComplete, onStream, resumeFromSnapshot, resumePendingQueue, @@ -386,6 +400,13 @@ export async function executeWorkflowCore( } if (result.status === 'paused') { + await loggingSession.safeCompleteWithPause({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + workflowInput: processedInput, + }) + await clearExecutionCancellation(executionId) logger.info(`[${requestId}] Workflow execution paused`, { diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 5d99eabb4..d05adfb64 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -155,11 +155,6 @@ export class PauseResumeManager { }, }) - await db - .update(workflowExecutionLogs) - .set({ status: 'pending' }) - .where(eq(workflowExecutionLogs.executionId, executionId)) - await PauseResumeManager.processQueuedResumes(executionId) } @@ -302,18 +297,34 @@ export class PauseResumeManager { }) if (result.status === 'paused') { + const effectiveExecutionId = result.metadata?.executionId ?? resumeExecutionId if (!result.snapshotSeed) { logger.error('Missing snapshot seed for paused resume execution', { resumeExecutionId, }) + await LoggingSession.markExecutionAsFailed( + effectiveExecutionId, + 'Missing snapshot seed for paused execution' + ) } else { - await PauseResumeManager.persistPauseResult({ - workflowId: pausedExecution.workflowId, - executionId: result.metadata?.executionId ?? resumeExecutionId, - pausePoints: result.pausePoints || [], - snapshotSeed: result.snapshotSeed, - executorUserId: result.metadata?.userId, - }) + try { + await PauseResumeManager.persistPauseResult({ + workflowId: pausedExecution.workflowId, + executionId: effectiveExecutionId, + pausePoints: result.pausePoints || [], + snapshotSeed: result.snapshotSeed, + executorUserId: result.metadata?.userId, + }) + } catch (pauseError) { + logger.error('Failed to persist pause result for resumed execution', { + resumeExecutionId, + error: pauseError instanceof Error ? pauseError.message : String(pauseError), + }) + await LoggingSession.markExecutionAsFailed( + effectiveExecutionId, + `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` + ) + } } } else { await PauseResumeManager.updateSnapshotAfterResume({