From 0ff7e57f99ead61c515b14eafc334a5804ce0303 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 10 Feb 2026 11:41:45 -0800 Subject: [PATCH] fix(billing): attribute cost to caller when info available" --- .../[executionId]/[contextId]/route.ts | 1 - apps/sim/lib/execution/preprocessing.ts | 170 ++++-------------- apps/sim/lib/logs/execution/logger.ts | 69 ++++--- .../executor/human-in-the-loop-manager.ts | 1 - 4 files changed, 80 insertions(+), 161 deletions(-) diff --git a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts index 6c0f44ff5..0f3252d85 100644 --- a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts +++ b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts @@ -59,7 +59,6 @@ export async function POST( checkDeployment: false, // Resuming existing execution, deployment already checked skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits workspaceId: workflow.workspaceId || undefined, - isResumeContext: true, // Enable billing fallback for paused workflow resumes }) if (!preprocessResult.success) { diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index 3eb14813e..5b0a65262 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -19,94 +19,6 @@ const BILLING_ERROR_MESSAGES = { BILLING_ERROR_GENERIC: 'Error resolving billing account', } as const -/** - * Attempts to resolve billing actor with fallback for resume contexts. - * Returns the resolved actor user ID or null if resolution fails and should block execution. - * - * For resume contexts, this function allows fallback to the workflow owner if workspace - * billing cannot be resolved, ensuring users can complete their paused workflows even - * if billing configuration changes mid-execution. - * - * @returns Object containing actorUserId (null if should block) and shouldBlock flag - */ -async function resolveBillingActorWithFallback(params: { - requestId: string - workflowId: string - workspaceId: string - executionId: string - triggerType: string - workflowRecord: WorkflowRecord - userId: string - isResumeContext: boolean - baseActorUserId: string | null - failureReason: 'null' | 'error' - error?: unknown - loggingSession?: LoggingSession -}): Promise< - { actorUserId: string; shouldBlock: false } | { actorUserId: null; shouldBlock: true } -> { - const { - requestId, - workflowId, - workspaceId, - executionId, - triggerType, - workflowRecord, - userId, - isResumeContext, - baseActorUserId, - failureReason, - error, - loggingSession, - } = params - - if (baseActorUserId) { - return { actorUserId: baseActorUserId, shouldBlock: false } - } - - const workflowOwner = workflowRecord.userId?.trim() - if (isResumeContext && workflowOwner) { - const logMessage = - failureReason === 'null' - ? '[BILLING_FALLBACK] Workspace billing account is null. Using workflow owner for billing.' - : '[BILLING_FALLBACK] Exception during workspace billing resolution. Using workflow owner for billing.' - - logger.warn(`[${requestId}] ${logMessage}`, { - workflowId, - workspaceId, - fallbackUserId: workflowOwner, - ...(error ? { error } : {}), - }) - - return { actorUserId: workflowOwner, shouldBlock: false } - } - - const fallbackUserId = workflowRecord.userId || userId || 'unknown' - const errorMessage = - failureReason === 'null' - ? BILLING_ERROR_MESSAGES.BILLING_REQUIRED - : BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC - - logger.warn(`[${requestId}] ${errorMessage}`, { - workflowId, - workspaceId, - ...(error ? { error } : {}), - }) - - await logPreprocessingError({ - workflowId, - executionId, - triggerType, - requestId, - userId: fallbackUserId, - workspaceId, - errorMessage, - loggingSession, - }) - - return { actorUserId: null, shouldBlock: true } -} - export interface PreprocessExecutionOptions { // Required fields workflowId: string @@ -123,7 +35,7 @@ export interface PreprocessExecutionOptions { // Context information workspaceId?: string // If known, used for billing resolution loggingSession?: LoggingSession // If provided, will be used for error logging - isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes) + isResumeContext?: boolean // Deprecated: no billing fallback is allowed useAuthenticatedUserAsActor?: boolean // If true, use the authenticated userId as actorUserId (for client-side executions and personal API keys) /** @deprecated No longer used - background/async executions always use deployed state */ useDraftState?: boolean @@ -170,7 +82,7 @@ export async function preprocessExecution( skipUsageLimits = false, workspaceId: providedWorkspaceId, loggingSession: providedLoggingSession, - isResumeContext = false, + isResumeContext: _isResumeContext = false, useAuthenticatedUserAsActor = false, } = options @@ -274,68 +186,54 @@ export async function preprocessExecution( } if (!actorUserId) { - actorUserId = workflowRecord.userId || userId - logger.info(`[${requestId}] Using workflow owner as actor: ${actorUserId}`) - } - - if (!actorUserId) { - const result = await resolveBillingActorWithFallback({ - requestId, + const fallbackUserId = userId || workflowRecord.userId || 'unknown' + logger.warn(`[${requestId}] ${BILLING_ERROR_MESSAGES.BILLING_REQUIRED}`, { workflowId, workspaceId, + }) + + await logPreprocessingError({ + workflowId, executionId, triggerType, - workflowRecord, - userId, - isResumeContext, - baseActorUserId: actorUserId, - failureReason: 'null', + requestId, + userId: fallbackUserId, + workspaceId, + errorMessage: BILLING_ERROR_MESSAGES.BILLING_REQUIRED, loggingSession: providedLoggingSession, }) - if (result.shouldBlock) { - return { - success: false, - error: { - message: 'Unable to resolve billing account', - statusCode: 500, - logCreated: true, - }, - } - } - - actorUserId = result.actorUserId - } - } catch (error) { - logger.error(`[${requestId}] Error resolving billing actor`, { error, workflowId }) - - const result = await resolveBillingActorWithFallback({ - requestId, - workflowId, - workspaceId, - executionId, - triggerType, - workflowRecord, - userId, - isResumeContext, - baseActorUserId: null, - failureReason: 'error', - error, - loggingSession: providedLoggingSession, - }) - - if (result.shouldBlock) { return { success: false, error: { - message: 'Error resolving billing account', + message: 'Unable to resolve billing account', statusCode: 500, logCreated: true, }, } } + } catch (error) { + logger.error(`[${requestId}] Error resolving billing actor`, { error, workflowId }) + const fallbackUserId = userId || workflowRecord.userId || 'unknown' + await logPreprocessingError({ + workflowId, + executionId, + triggerType, + requestId, + userId: fallbackUserId, + workspaceId, + errorMessage: BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC, + loggingSession: providedLoggingSession, + }) - actorUserId = result.actorUserId + return { + success: false, + error: { + message: 'Error resolving billing account', + statusCode: 500, + logCreated: true, + }, + } } // ========== STEP 4: Get User Subscription ========== diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 3c8fa4224..3cecd3585 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -33,7 +33,6 @@ import type { WorkflowExecutionSnapshot, WorkflowState, } from '@/lib/logs/types' -import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import type { SerializableExecutionState } from '@/executor/execution/types' export interface ToolCall { @@ -210,16 +209,15 @@ export class ExecutionLogger implements IExecutionLoggerService { logger.debug(`Completing workflow execution ${executionId}`, { isResume }) - // If this is a resume, fetch the existing log to merge data - let existingLog: any = null - if (isResume) { - const [existing] = await db - .select() - .from(workflowExecutionLogs) - .where(eq(workflowExecutionLogs.executionId, executionId)) - .limit(1) - existingLog = existing - } + const [existingLog] = await db + .select() + .from(workflowExecutionLogs) + .where(eq(workflowExecutionLogs.executionId, executionId)) + .limit(1) + const billingUserId = this.extractBillingUserId(existingLog?.executionData) + const existingExecutionData = existingLog?.executionData as + | { traceSpans?: TraceSpan[] } + | undefined // Determine if workflow failed by checking trace spans for errors // Use the override if provided (for cost-only fallback scenarios) @@ -244,7 +242,7 @@ export class ExecutionLogger implements IExecutionLoggerService { const mergedTraceSpans = isResume ? traceSpans && traceSpans.length > 0 ? traceSpans - : existingLog?.executionData?.traceSpans || [] + : existingExecutionData?.traceSpans || [] : traceSpans const filteredTraceSpans = filterForDisplay(mergedTraceSpans) @@ -329,7 +327,8 @@ export class ExecutionLogger implements IExecutionLoggerService { updatedLog.workflowId, costSummary, updatedLog.trigger as ExecutionTrigger['type'], - executionId + executionId, + billingUserId ) const limit = before.usageData.limit @@ -367,7 +366,8 @@ export class ExecutionLogger implements IExecutionLoggerService { updatedLog.workflowId, costSummary, updatedLog.trigger as ExecutionTrigger['type'], - executionId + executionId, + billingUserId ) const percentBefore = @@ -393,7 +393,8 @@ export class ExecutionLogger implements IExecutionLoggerService { updatedLog.workflowId, costSummary, updatedLog.trigger as ExecutionTrigger['type'], - executionId + executionId, + billingUserId ) } } else { @@ -401,7 +402,8 @@ export class ExecutionLogger implements IExecutionLoggerService { updatedLog.workflowId, costSummary, updatedLog.trigger as ExecutionTrigger['type'], - executionId + executionId, + billingUserId ) } } catch (e) { @@ -410,7 +412,8 @@ export class ExecutionLogger implements IExecutionLoggerService { updatedLog.workflowId, costSummary, updatedLog.trigger as ExecutionTrigger['type'], - executionId + executionId, + billingUserId ) } catch {} logger.warn('Usage threshold notification check failed (non-fatal)', { error: e }) @@ -472,6 +475,22 @@ export class ExecutionLogger implements IExecutionLoggerService { * Updates user stats with cost and token information * Maintains same logic as original execution logger for billing consistency */ + private extractBillingUserId(executionData: unknown): string | null { + if (!executionData || typeof executionData !== 'object') { + return null + } + + const environment = (executionData as { environment?: { userId?: unknown } }).environment + const userId = environment?.userId + + if (typeof userId !== 'string') { + return null + } + + const trimmedUserId = userId.trim() + return trimmedUserId.length > 0 ? trimmedUserId : null + } + private async updateUserStats( workflowId: string | null, costSummary: { @@ -494,7 +513,8 @@ export class ExecutionLogger implements IExecutionLoggerService { > }, trigger: ExecutionTrigger['type'], - executionId?: string + executionId?: string, + billingUserId?: string | null ): Promise { if (!isBillingEnabled) { logger.debug('Billing is disabled, skipping user stats cost update') @@ -512,7 +532,6 @@ export class ExecutionLogger implements IExecutionLoggerService { } try { - // Get the workflow record to get workspace and fallback userId const [workflowRecord] = await db .select() .from(workflow) @@ -524,12 +543,16 @@ export class ExecutionLogger implements IExecutionLoggerService { return } - let billingUserId: string | null = null - if (workflowRecord.workspaceId) { - billingUserId = await getWorkspaceBilledAccountUserId(workflowRecord.workspaceId) + const userId = billingUserId?.trim() || null + if (!userId) { + logger.error('Missing billing actor in execution context; skipping stats update', { + workflowId, + trigger, + executionId, + }) + return } - const userId = billingUserId || workflowRecord.userId const costToStore = costSummary.totalCost const existing = await db.select().from(userStats).where(eq(userStats.userId, userId)) diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index ee176ad9b..5649cf40e 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -739,7 +739,6 @@ export class PauseResumeManager { skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits workspaceId: baseSnapshot.metadata.workspaceId, loggingSession, - isResumeContext: true, // Enable billing fallback for paused workflow resumes }) if (!preprocessingResult.success) {