mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-11 07:04:58 -05:00
fix(billing): attribute cost to caller when info available"
This commit is contained in:
@@ -59,7 +59,6 @@ export async function POST(
|
||||
checkDeployment: false, // Resuming existing execution, deployment already checked
|
||||
skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits
|
||||
workspaceId: workflow.workspaceId || undefined,
|
||||
isResumeContext: true, // Enable billing fallback for paused workflow resumes
|
||||
})
|
||||
|
||||
if (!preprocessResult.success) {
|
||||
|
||||
@@ -19,94 +19,6 @@ const BILLING_ERROR_MESSAGES = {
|
||||
BILLING_ERROR_GENERIC: 'Error resolving billing account',
|
||||
} as const
|
||||
|
||||
/**
|
||||
* Attempts to resolve billing actor with fallback for resume contexts.
|
||||
* Returns the resolved actor user ID or null if resolution fails and should block execution.
|
||||
*
|
||||
* For resume contexts, this function allows fallback to the workflow owner if workspace
|
||||
* billing cannot be resolved, ensuring users can complete their paused workflows even
|
||||
* if billing configuration changes mid-execution.
|
||||
*
|
||||
* @returns Object containing actorUserId (null if should block) and shouldBlock flag
|
||||
*/
|
||||
async function resolveBillingActorWithFallback(params: {
|
||||
requestId: string
|
||||
workflowId: string
|
||||
workspaceId: string
|
||||
executionId: string
|
||||
triggerType: string
|
||||
workflowRecord: WorkflowRecord
|
||||
userId: string
|
||||
isResumeContext: boolean
|
||||
baseActorUserId: string | null
|
||||
failureReason: 'null' | 'error'
|
||||
error?: unknown
|
||||
loggingSession?: LoggingSession
|
||||
}): Promise<
|
||||
{ actorUserId: string; shouldBlock: false } | { actorUserId: null; shouldBlock: true }
|
||||
> {
|
||||
const {
|
||||
requestId,
|
||||
workflowId,
|
||||
workspaceId,
|
||||
executionId,
|
||||
triggerType,
|
||||
workflowRecord,
|
||||
userId,
|
||||
isResumeContext,
|
||||
baseActorUserId,
|
||||
failureReason,
|
||||
error,
|
||||
loggingSession,
|
||||
} = params
|
||||
|
||||
if (baseActorUserId) {
|
||||
return { actorUserId: baseActorUserId, shouldBlock: false }
|
||||
}
|
||||
|
||||
const workflowOwner = workflowRecord.userId?.trim()
|
||||
if (isResumeContext && workflowOwner) {
|
||||
const logMessage =
|
||||
failureReason === 'null'
|
||||
? '[BILLING_FALLBACK] Workspace billing account is null. Using workflow owner for billing.'
|
||||
: '[BILLING_FALLBACK] Exception during workspace billing resolution. Using workflow owner for billing.'
|
||||
|
||||
logger.warn(`[${requestId}] ${logMessage}`, {
|
||||
workflowId,
|
||||
workspaceId,
|
||||
fallbackUserId: workflowOwner,
|
||||
...(error ? { error } : {}),
|
||||
})
|
||||
|
||||
return { actorUserId: workflowOwner, shouldBlock: false }
|
||||
}
|
||||
|
||||
const fallbackUserId = workflowRecord.userId || userId || 'unknown'
|
||||
const errorMessage =
|
||||
failureReason === 'null'
|
||||
? BILLING_ERROR_MESSAGES.BILLING_REQUIRED
|
||||
: BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC
|
||||
|
||||
logger.warn(`[${requestId}] ${errorMessage}`, {
|
||||
workflowId,
|
||||
workspaceId,
|
||||
...(error ? { error } : {}),
|
||||
})
|
||||
|
||||
await logPreprocessingError({
|
||||
workflowId,
|
||||
executionId,
|
||||
triggerType,
|
||||
requestId,
|
||||
userId: fallbackUserId,
|
||||
workspaceId,
|
||||
errorMessage,
|
||||
loggingSession,
|
||||
})
|
||||
|
||||
return { actorUserId: null, shouldBlock: true }
|
||||
}
|
||||
|
||||
export interface PreprocessExecutionOptions {
|
||||
// Required fields
|
||||
workflowId: string
|
||||
@@ -123,7 +35,7 @@ export interface PreprocessExecutionOptions {
|
||||
// Context information
|
||||
workspaceId?: string // If known, used for billing resolution
|
||||
loggingSession?: LoggingSession // If provided, will be used for error logging
|
||||
isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes)
|
||||
isResumeContext?: boolean // Deprecated: no billing fallback is allowed
|
||||
useAuthenticatedUserAsActor?: boolean // If true, use the authenticated userId as actorUserId (for client-side executions and personal API keys)
|
||||
/** @deprecated No longer used - background/async executions always use deployed state */
|
||||
useDraftState?: boolean
|
||||
@@ -170,7 +82,7 @@ export async function preprocessExecution(
|
||||
skipUsageLimits = false,
|
||||
workspaceId: providedWorkspaceId,
|
||||
loggingSession: providedLoggingSession,
|
||||
isResumeContext = false,
|
||||
isResumeContext: _isResumeContext = false,
|
||||
useAuthenticatedUserAsActor = false,
|
||||
} = options
|
||||
|
||||
@@ -274,68 +186,54 @@ export async function preprocessExecution(
|
||||
}
|
||||
|
||||
if (!actorUserId) {
|
||||
actorUserId = workflowRecord.userId || userId
|
||||
logger.info(`[${requestId}] Using workflow owner as actor: ${actorUserId}`)
|
||||
}
|
||||
|
||||
if (!actorUserId) {
|
||||
const result = await resolveBillingActorWithFallback({
|
||||
requestId,
|
||||
const fallbackUserId = userId || workflowRecord.userId || 'unknown'
|
||||
logger.warn(`[${requestId}] ${BILLING_ERROR_MESSAGES.BILLING_REQUIRED}`, {
|
||||
workflowId,
|
||||
workspaceId,
|
||||
})
|
||||
|
||||
await logPreprocessingError({
|
||||
workflowId,
|
||||
executionId,
|
||||
triggerType,
|
||||
workflowRecord,
|
||||
userId,
|
||||
isResumeContext,
|
||||
baseActorUserId: actorUserId,
|
||||
failureReason: 'null',
|
||||
requestId,
|
||||
userId: fallbackUserId,
|
||||
workspaceId,
|
||||
errorMessage: BILLING_ERROR_MESSAGES.BILLING_REQUIRED,
|
||||
loggingSession: providedLoggingSession,
|
||||
})
|
||||
|
||||
if (result.shouldBlock) {
|
||||
return {
|
||||
success: false,
|
||||
error: {
|
||||
message: 'Unable to resolve billing account',
|
||||
statusCode: 500,
|
||||
logCreated: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
actorUserId = result.actorUserId
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error resolving billing actor`, { error, workflowId })
|
||||
|
||||
const result = await resolveBillingActorWithFallback({
|
||||
requestId,
|
||||
workflowId,
|
||||
workspaceId,
|
||||
executionId,
|
||||
triggerType,
|
||||
workflowRecord,
|
||||
userId,
|
||||
isResumeContext,
|
||||
baseActorUserId: null,
|
||||
failureReason: 'error',
|
||||
error,
|
||||
loggingSession: providedLoggingSession,
|
||||
})
|
||||
|
||||
if (result.shouldBlock) {
|
||||
return {
|
||||
success: false,
|
||||
error: {
|
||||
message: 'Error resolving billing account',
|
||||
message: 'Unable to resolve billing account',
|
||||
statusCode: 500,
|
||||
logCreated: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error resolving billing actor`, { error, workflowId })
|
||||
const fallbackUserId = userId || workflowRecord.userId || 'unknown'
|
||||
await logPreprocessingError({
|
||||
workflowId,
|
||||
executionId,
|
||||
triggerType,
|
||||
requestId,
|
||||
userId: fallbackUserId,
|
||||
workspaceId,
|
||||
errorMessage: BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC,
|
||||
loggingSession: providedLoggingSession,
|
||||
})
|
||||
|
||||
actorUserId = result.actorUserId
|
||||
return {
|
||||
success: false,
|
||||
error: {
|
||||
message: 'Error resolving billing account',
|
||||
statusCode: 500,
|
||||
logCreated: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ========== STEP 4: Get User Subscription ==========
|
||||
|
||||
@@ -33,7 +33,6 @@ import type {
|
||||
WorkflowExecutionSnapshot,
|
||||
WorkflowState,
|
||||
} from '@/lib/logs/types'
|
||||
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
|
||||
import type { SerializableExecutionState } from '@/executor/execution/types'
|
||||
|
||||
export interface ToolCall {
|
||||
@@ -210,16 +209,15 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
|
||||
logger.debug(`Completing workflow execution ${executionId}`, { isResume })
|
||||
|
||||
// If this is a resume, fetch the existing log to merge data
|
||||
let existingLog: any = null
|
||||
if (isResume) {
|
||||
const [existing] = await db
|
||||
.select()
|
||||
.from(workflowExecutionLogs)
|
||||
.where(eq(workflowExecutionLogs.executionId, executionId))
|
||||
.limit(1)
|
||||
existingLog = existing
|
||||
}
|
||||
const [existingLog] = await db
|
||||
.select()
|
||||
.from(workflowExecutionLogs)
|
||||
.where(eq(workflowExecutionLogs.executionId, executionId))
|
||||
.limit(1)
|
||||
const billingUserId = this.extractBillingUserId(existingLog?.executionData)
|
||||
const existingExecutionData = existingLog?.executionData as
|
||||
| { traceSpans?: TraceSpan[] }
|
||||
| undefined
|
||||
|
||||
// Determine if workflow failed by checking trace spans for errors
|
||||
// Use the override if provided (for cost-only fallback scenarios)
|
||||
@@ -244,7 +242,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
const mergedTraceSpans = isResume
|
||||
? traceSpans && traceSpans.length > 0
|
||||
? traceSpans
|
||||
: existingLog?.executionData?.traceSpans || []
|
||||
: existingExecutionData?.traceSpans || []
|
||||
: traceSpans
|
||||
|
||||
const filteredTraceSpans = filterForDisplay(mergedTraceSpans)
|
||||
@@ -329,7 +327,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
updatedLog.workflowId,
|
||||
costSummary,
|
||||
updatedLog.trigger as ExecutionTrigger['type'],
|
||||
executionId
|
||||
executionId,
|
||||
billingUserId
|
||||
)
|
||||
|
||||
const limit = before.usageData.limit
|
||||
@@ -367,7 +366,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
updatedLog.workflowId,
|
||||
costSummary,
|
||||
updatedLog.trigger as ExecutionTrigger['type'],
|
||||
executionId
|
||||
executionId,
|
||||
billingUserId
|
||||
)
|
||||
|
||||
const percentBefore =
|
||||
@@ -393,7 +393,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
updatedLog.workflowId,
|
||||
costSummary,
|
||||
updatedLog.trigger as ExecutionTrigger['type'],
|
||||
executionId
|
||||
executionId,
|
||||
billingUserId
|
||||
)
|
||||
}
|
||||
} else {
|
||||
@@ -401,7 +402,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
updatedLog.workflowId,
|
||||
costSummary,
|
||||
updatedLog.trigger as ExecutionTrigger['type'],
|
||||
executionId
|
||||
executionId,
|
||||
billingUserId
|
||||
)
|
||||
}
|
||||
} catch (e) {
|
||||
@@ -410,7 +412,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
updatedLog.workflowId,
|
||||
costSummary,
|
||||
updatedLog.trigger as ExecutionTrigger['type'],
|
||||
executionId
|
||||
executionId,
|
||||
billingUserId
|
||||
)
|
||||
} catch {}
|
||||
logger.warn('Usage threshold notification check failed (non-fatal)', { error: e })
|
||||
@@ -472,6 +475,22 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
* Updates user stats with cost and token information
|
||||
* Maintains same logic as original execution logger for billing consistency
|
||||
*/
|
||||
private extractBillingUserId(executionData: unknown): string | null {
|
||||
if (!executionData || typeof executionData !== 'object') {
|
||||
return null
|
||||
}
|
||||
|
||||
const environment = (executionData as { environment?: { userId?: unknown } }).environment
|
||||
const userId = environment?.userId
|
||||
|
||||
if (typeof userId !== 'string') {
|
||||
return null
|
||||
}
|
||||
|
||||
const trimmedUserId = userId.trim()
|
||||
return trimmedUserId.length > 0 ? trimmedUserId : null
|
||||
}
|
||||
|
||||
private async updateUserStats(
|
||||
workflowId: string | null,
|
||||
costSummary: {
|
||||
@@ -494,7 +513,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
>
|
||||
},
|
||||
trigger: ExecutionTrigger['type'],
|
||||
executionId?: string
|
||||
executionId?: string,
|
||||
billingUserId?: string | null
|
||||
): Promise<void> {
|
||||
if (!isBillingEnabled) {
|
||||
logger.debug('Billing is disabled, skipping user stats cost update')
|
||||
@@ -512,7 +532,6 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
}
|
||||
|
||||
try {
|
||||
// Get the workflow record to get workspace and fallback userId
|
||||
const [workflowRecord] = await db
|
||||
.select()
|
||||
.from(workflow)
|
||||
@@ -524,12 +543,16 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
return
|
||||
}
|
||||
|
||||
let billingUserId: string | null = null
|
||||
if (workflowRecord.workspaceId) {
|
||||
billingUserId = await getWorkspaceBilledAccountUserId(workflowRecord.workspaceId)
|
||||
const userId = billingUserId?.trim() || null
|
||||
if (!userId) {
|
||||
logger.error('Missing billing actor in execution context; skipping stats update', {
|
||||
workflowId,
|
||||
trigger,
|
||||
executionId,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
const userId = billingUserId || workflowRecord.userId
|
||||
const costToStore = costSummary.totalCost
|
||||
|
||||
const existing = await db.select().from(userStats).where(eq(userStats.userId, userId))
|
||||
|
||||
@@ -739,7 +739,6 @@ export class PauseResumeManager {
|
||||
skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits
|
||||
workspaceId: baseSnapshot.metadata.workspaceId,
|
||||
loggingSession,
|
||||
isResumeContext: true, // Enable billing fallback for paused workflow resumes
|
||||
})
|
||||
|
||||
if (!preprocessingResult.success) {
|
||||
|
||||
Reference in New Issue
Block a user