mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-08 22:48:14 -05:00
feat(audit): added audit log for billing line items (#2500)
* feat(audit): added audit log for billing line items * remove migration * reran migrations after resolving merge conflict * ack PR comment
This commit is contained in:
@@ -3,6 +3,7 @@ import { userStats } from '@sim/db/schema'
|
||||
import { eq, sql } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { logModelUsage } from '@/lib/billing/core/usage-log'
|
||||
import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
|
||||
import { checkInternalApiKey } from '@/lib/copilot/utils'
|
||||
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
|
||||
@@ -14,6 +15,9 @@ const logger = createLogger('BillingUpdateCostAPI')
|
||||
const UpdateCostSchema = z.object({
|
||||
userId: z.string().min(1, 'User ID is required'),
|
||||
cost: z.number().min(0, 'Cost must be a non-negative number'),
|
||||
model: z.string().min(1, 'Model is required'),
|
||||
inputTokens: z.number().min(0).default(0),
|
||||
outputTokens: z.number().min(0).default(0),
|
||||
})
|
||||
|
||||
/**
|
||||
@@ -71,11 +75,12 @@ export async function POST(req: NextRequest) {
|
||||
)
|
||||
}
|
||||
|
||||
const { userId, cost } = validation.data
|
||||
const { userId, cost, model, inputTokens, outputTokens } = validation.data
|
||||
|
||||
logger.info(`[${requestId}] Processing cost update`, {
|
||||
userId,
|
||||
cost,
|
||||
model,
|
||||
})
|
||||
|
||||
// Check if user stats record exists (same as ExecutionLogger)
|
||||
@@ -107,6 +112,16 @@ export async function POST(req: NextRequest) {
|
||||
addedCost: cost,
|
||||
})
|
||||
|
||||
// Log usage for complete audit trail
|
||||
await logModelUsage({
|
||||
userId,
|
||||
source: 'copilot',
|
||||
model,
|
||||
inputTokens,
|
||||
outputTokens,
|
||||
cost,
|
||||
})
|
||||
|
||||
// Check if user has hit overage threshold and bill incrementally
|
||||
await checkAndBillOverageThreshold(userId)
|
||||
|
||||
|
||||
105
apps/sim/app/api/users/me/usage-logs/route.ts
Normal file
105
apps/sim/app/api/users/me/usage-logs/route.ts
Normal file
@@ -0,0 +1,105 @@
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { getUserUsageLogs, type UsageLogSource } from '@/lib/billing/core/usage-log'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
|
||||
const logger = createLogger('UsageLogsAPI')
|
||||
|
||||
const QuerySchema = z.object({
|
||||
source: z.enum(['workflow', 'wand', 'copilot']).optional(),
|
||||
workspaceId: z.string().optional(),
|
||||
period: z.enum(['1d', '7d', '30d', 'all']).optional().default('30d'),
|
||||
limit: z.coerce.number().min(1).max(100).optional().default(50),
|
||||
cursor: z.string().optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* GET /api/users/me/usage-logs
|
||||
* Get usage logs for the authenticated user
|
||||
*/
|
||||
export async function GET(req: NextRequest) {
|
||||
try {
|
||||
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
|
||||
|
||||
if (!auth.success || !auth.userId) {
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
||||
}
|
||||
|
||||
const userId = auth.userId
|
||||
|
||||
const { searchParams } = new URL(req.url)
|
||||
const queryParams = {
|
||||
source: searchParams.get('source') || undefined,
|
||||
workspaceId: searchParams.get('workspaceId') || undefined,
|
||||
period: searchParams.get('period') || '30d',
|
||||
limit: searchParams.get('limit') || '50',
|
||||
cursor: searchParams.get('cursor') || undefined,
|
||||
}
|
||||
|
||||
const validation = QuerySchema.safeParse(queryParams)
|
||||
|
||||
if (!validation.success) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'Invalid query parameters',
|
||||
details: validation.error.issues,
|
||||
},
|
||||
{ status: 400 }
|
||||
)
|
||||
}
|
||||
|
||||
const { source, workspaceId, period, limit, cursor } = validation.data
|
||||
|
||||
let startDate: Date | undefined
|
||||
const endDate = new Date()
|
||||
|
||||
if (period !== 'all') {
|
||||
startDate = new Date()
|
||||
switch (period) {
|
||||
case '1d':
|
||||
startDate.setDate(startDate.getDate() - 1)
|
||||
break
|
||||
case '7d':
|
||||
startDate.setDate(startDate.getDate() - 7)
|
||||
break
|
||||
case '30d':
|
||||
startDate.setDate(startDate.getDate() - 30)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
const result = await getUserUsageLogs(userId, {
|
||||
source: source as UsageLogSource | undefined,
|
||||
workspaceId,
|
||||
startDate,
|
||||
endDate,
|
||||
limit,
|
||||
cursor,
|
||||
})
|
||||
|
||||
logger.debug('Retrieved usage logs', {
|
||||
userId,
|
||||
source,
|
||||
period,
|
||||
logCount: result.logs.length,
|
||||
hasMore: result.pagination.hasMore,
|
||||
})
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
...result,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('Failed to get usage logs', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'Failed to retrieve usage logs',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import { eq, sql } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import OpenAI, { AzureOpenAI } from 'openai'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { logModelUsage } from '@/lib/billing/core/usage-log'
|
||||
import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
|
||||
@@ -88,7 +89,7 @@ async function updateUserStatsForWand(
|
||||
|
||||
try {
|
||||
const [workflowRecord] = await db
|
||||
.select({ userId: workflow.userId })
|
||||
.select({ userId: workflow.userId, workspaceId: workflow.workspaceId })
|
||||
.from(workflow)
|
||||
.where(eq(workflow.id, workflowId))
|
||||
.limit(1)
|
||||
@@ -101,6 +102,7 @@ async function updateUserStatsForWand(
|
||||
}
|
||||
|
||||
const userId = workflowRecord.userId
|
||||
const workspaceId = workflowRecord.workspaceId
|
||||
const totalTokens = usage.total_tokens || 0
|
||||
const promptTokens = usage.prompt_tokens || 0
|
||||
const completionTokens = usage.completion_tokens || 0
|
||||
@@ -137,6 +139,17 @@ async function updateUserStatsForWand(
|
||||
costAdded: costToStore,
|
||||
})
|
||||
|
||||
await logModelUsage({
|
||||
userId,
|
||||
source: 'wand',
|
||||
model: modelName,
|
||||
inputTokens: promptTokens,
|
||||
outputTokens: completionTokens,
|
||||
cost: costToStore,
|
||||
workspaceId: workspaceId ?? undefined,
|
||||
workflowId,
|
||||
})
|
||||
|
||||
await checkAndBillOverageThreshold(userId)
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Failed to update user stats for wand usage`, error)
|
||||
|
||||
421
apps/sim/lib/billing/core/usage-log.ts
Normal file
421
apps/sim/lib/billing/core/usage-log.ts
Normal file
@@ -0,0 +1,421 @@
|
||||
import { db } from '@sim/db'
|
||||
import { usageLog, workflow } from '@sim/db/schema'
|
||||
import { and, desc, eq, gte, lte, sql } from 'drizzle-orm'
|
||||
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
|
||||
const logger = createLogger('UsageLog')
|
||||
|
||||
/**
|
||||
* Usage log category types
|
||||
*/
|
||||
export type UsageLogCategory = 'model' | 'fixed'
|
||||
|
||||
/**
|
||||
* Usage log source types
|
||||
*/
|
||||
export type UsageLogSource = 'workflow' | 'wand' | 'copilot'
|
||||
|
||||
/**
|
||||
* Metadata for 'model' category charges
|
||||
*/
|
||||
export interface ModelUsageMetadata {
|
||||
inputTokens: number
|
||||
outputTokens: number
|
||||
}
|
||||
|
||||
/**
|
||||
* Metadata for 'fixed' category charges (currently empty, extensible)
|
||||
*/
|
||||
export type FixedUsageMetadata = Record<string, never>
|
||||
|
||||
/**
|
||||
* Union type for all metadata types
|
||||
*/
|
||||
export type UsageLogMetadata = ModelUsageMetadata | FixedUsageMetadata | null
|
||||
|
||||
/**
|
||||
* Parameters for logging model usage (token-based charges)
|
||||
*/
|
||||
export interface LogModelUsageParams {
|
||||
userId: string
|
||||
source: UsageLogSource
|
||||
model: string
|
||||
inputTokens: number
|
||||
outputTokens: number
|
||||
cost: number
|
||||
workspaceId?: string
|
||||
workflowId?: string
|
||||
executionId?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* Parameters for logging fixed charges (flat fees)
|
||||
*/
|
||||
export interface LogFixedUsageParams {
|
||||
userId: string
|
||||
source: UsageLogSource
|
||||
description: string
|
||||
cost: number
|
||||
workspaceId?: string
|
||||
workflowId?: string
|
||||
executionId?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a model usage charge (token-based)
|
||||
*/
|
||||
export async function logModelUsage(params: LogModelUsageParams): Promise<void> {
|
||||
if (!isBillingEnabled) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const metadata: ModelUsageMetadata = {
|
||||
inputTokens: params.inputTokens,
|
||||
outputTokens: params.outputTokens,
|
||||
}
|
||||
|
||||
await db.insert(usageLog).values({
|
||||
id: crypto.randomUUID(),
|
||||
userId: params.userId,
|
||||
category: 'model',
|
||||
source: params.source,
|
||||
description: params.model,
|
||||
metadata,
|
||||
cost: params.cost.toString(),
|
||||
workspaceId: params.workspaceId ?? null,
|
||||
workflowId: params.workflowId ?? null,
|
||||
executionId: params.executionId ?? null,
|
||||
})
|
||||
|
||||
logger.debug('Logged model usage', {
|
||||
userId: params.userId,
|
||||
source: params.source,
|
||||
model: params.model,
|
||||
cost: params.cost,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('Failed to log model usage', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
params,
|
||||
})
|
||||
// Don't throw - usage logging should not break the main flow
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a fixed charge (flat fee like base execution charge or search)
|
||||
*/
|
||||
export async function logFixedUsage(params: LogFixedUsageParams): Promise<void> {
|
||||
if (!isBillingEnabled) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
await db.insert(usageLog).values({
|
||||
id: crypto.randomUUID(),
|
||||
userId: params.userId,
|
||||
category: 'fixed',
|
||||
source: params.source,
|
||||
description: params.description,
|
||||
metadata: null,
|
||||
cost: params.cost.toString(),
|
||||
workspaceId: params.workspaceId ?? null,
|
||||
workflowId: params.workflowId ?? null,
|
||||
executionId: params.executionId ?? null,
|
||||
})
|
||||
|
||||
logger.debug('Logged fixed usage', {
|
||||
userId: params.userId,
|
||||
source: params.source,
|
||||
description: params.description,
|
||||
cost: params.cost,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('Failed to log fixed usage', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
params,
|
||||
})
|
||||
// Don't throw - usage logging should not break the main flow
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parameters for batch logging workflow usage
|
||||
*/
|
||||
export interface LogWorkflowUsageBatchParams {
|
||||
userId: string
|
||||
workspaceId?: string
|
||||
workflowId: string
|
||||
executionId?: string
|
||||
baseExecutionCharge?: number
|
||||
models?: Record<
|
||||
string,
|
||||
{
|
||||
total: number
|
||||
tokens: { prompt: number; completion: number }
|
||||
}
|
||||
>
|
||||
}
|
||||
|
||||
/**
|
||||
* Log all workflow usage entries in a single batch insert (performance optimized)
|
||||
*/
|
||||
export async function logWorkflowUsageBatch(params: LogWorkflowUsageBatchParams): Promise<void> {
|
||||
if (!isBillingEnabled) {
|
||||
return
|
||||
}
|
||||
|
||||
const entries: Array<{
|
||||
id: string
|
||||
userId: string
|
||||
category: 'model' | 'fixed'
|
||||
source: 'workflow'
|
||||
description: string
|
||||
metadata: ModelUsageMetadata | null
|
||||
cost: string
|
||||
workspaceId: string | null
|
||||
workflowId: string | null
|
||||
executionId: string | null
|
||||
}> = []
|
||||
|
||||
if (params.baseExecutionCharge && params.baseExecutionCharge > 0) {
|
||||
entries.push({
|
||||
id: crypto.randomUUID(),
|
||||
userId: params.userId,
|
||||
category: 'fixed',
|
||||
source: 'workflow',
|
||||
description: 'execution_fee',
|
||||
metadata: null,
|
||||
cost: params.baseExecutionCharge.toString(),
|
||||
workspaceId: params.workspaceId ?? null,
|
||||
workflowId: params.workflowId,
|
||||
executionId: params.executionId ?? null,
|
||||
})
|
||||
}
|
||||
|
||||
if (params.models) {
|
||||
for (const [modelName, modelData] of Object.entries(params.models)) {
|
||||
if (modelData.total > 0) {
|
||||
entries.push({
|
||||
id: crypto.randomUUID(),
|
||||
userId: params.userId,
|
||||
category: 'model',
|
||||
source: 'workflow',
|
||||
description: modelName,
|
||||
metadata: {
|
||||
inputTokens: modelData.tokens.prompt,
|
||||
outputTokens: modelData.tokens.completion,
|
||||
},
|
||||
cost: modelData.total.toString(),
|
||||
workspaceId: params.workspaceId ?? null,
|
||||
workflowId: params.workflowId,
|
||||
executionId: params.executionId ?? null,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (entries.length === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
await db.insert(usageLog).values(entries)
|
||||
|
||||
logger.debug('Logged workflow usage batch', {
|
||||
userId: params.userId,
|
||||
workflowId: params.workflowId,
|
||||
entryCount: entries.length,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('Failed to log workflow usage batch', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
params,
|
||||
})
|
||||
// Don't throw - usage logging should not break the main flow
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Options for querying usage logs
|
||||
*/
|
||||
export interface GetUsageLogsOptions {
|
||||
/** Filter by source */
|
||||
source?: UsageLogSource
|
||||
/** Filter by workspace */
|
||||
workspaceId?: string
|
||||
/** Start date (inclusive) */
|
||||
startDate?: Date
|
||||
/** End date (inclusive) */
|
||||
endDate?: Date
|
||||
/** Maximum number of results */
|
||||
limit?: number
|
||||
/** Cursor for pagination (log ID) */
|
||||
cursor?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* Usage log entry returned from queries
|
||||
*/
|
||||
export interface UsageLogEntry {
|
||||
id: string
|
||||
createdAt: string
|
||||
category: UsageLogCategory
|
||||
source: UsageLogSource
|
||||
description: string
|
||||
metadata?: UsageLogMetadata
|
||||
cost: number
|
||||
workspaceId?: string
|
||||
workflowId?: string
|
||||
executionId?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* Result from getUserUsageLogs
|
||||
*/
|
||||
export interface UsageLogsResult {
|
||||
logs: UsageLogEntry[]
|
||||
summary: {
|
||||
totalCost: number
|
||||
bySource: Record<string, number>
|
||||
}
|
||||
pagination: {
|
||||
nextCursor?: string
|
||||
hasMore: boolean
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get usage logs for a user with optional filtering and pagination
|
||||
*/
|
||||
export async function getUserUsageLogs(
|
||||
userId: string,
|
||||
options: GetUsageLogsOptions = {}
|
||||
): Promise<UsageLogsResult> {
|
||||
const { source, workspaceId, startDate, endDate, limit = 50, cursor } = options
|
||||
|
||||
try {
|
||||
const conditions = [eq(usageLog.userId, userId)]
|
||||
|
||||
if (source) {
|
||||
conditions.push(eq(usageLog.source, source))
|
||||
}
|
||||
|
||||
if (workspaceId) {
|
||||
conditions.push(eq(usageLog.workspaceId, workspaceId))
|
||||
}
|
||||
|
||||
if (startDate) {
|
||||
conditions.push(gte(usageLog.createdAt, startDate))
|
||||
}
|
||||
|
||||
if (endDate) {
|
||||
conditions.push(lte(usageLog.createdAt, endDate))
|
||||
}
|
||||
|
||||
if (cursor) {
|
||||
const cursorLog = await db
|
||||
.select({ createdAt: usageLog.createdAt })
|
||||
.from(usageLog)
|
||||
.where(eq(usageLog.id, cursor))
|
||||
.limit(1)
|
||||
|
||||
if (cursorLog.length > 0) {
|
||||
conditions.push(
|
||||
sql`(${usageLog.createdAt} < ${cursorLog[0].createdAt} OR (${usageLog.createdAt} = ${cursorLog[0].createdAt} AND ${usageLog.id} < ${cursor}))`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
const logs = await db
|
||||
.select()
|
||||
.from(usageLog)
|
||||
.where(and(...conditions))
|
||||
.orderBy(desc(usageLog.createdAt), desc(usageLog.id))
|
||||
.limit(limit + 1)
|
||||
|
||||
const hasMore = logs.length > limit
|
||||
const resultLogs = hasMore ? logs.slice(0, limit) : logs
|
||||
|
||||
const transformedLogs: UsageLogEntry[] = resultLogs.map((log) => ({
|
||||
id: log.id,
|
||||
createdAt: log.createdAt.toISOString(),
|
||||
category: log.category as UsageLogCategory,
|
||||
source: log.source as UsageLogSource,
|
||||
description: log.description,
|
||||
...(log.metadata ? { metadata: log.metadata as UsageLogMetadata } : {}),
|
||||
cost: Number.parseFloat(log.cost),
|
||||
...(log.workspaceId ? { workspaceId: log.workspaceId } : {}),
|
||||
...(log.workflowId ? { workflowId: log.workflowId } : {}),
|
||||
...(log.executionId ? { executionId: log.executionId } : {}),
|
||||
}))
|
||||
|
||||
const summaryConditions = [eq(usageLog.userId, userId)]
|
||||
if (source) summaryConditions.push(eq(usageLog.source, source))
|
||||
if (workspaceId) summaryConditions.push(eq(usageLog.workspaceId, workspaceId))
|
||||
if (startDate) summaryConditions.push(gte(usageLog.createdAt, startDate))
|
||||
if (endDate) summaryConditions.push(lte(usageLog.createdAt, endDate))
|
||||
|
||||
const summaryResult = await db
|
||||
.select({
|
||||
source: usageLog.source,
|
||||
totalCost: sql<string>`SUM(${usageLog.cost})`,
|
||||
})
|
||||
.from(usageLog)
|
||||
.where(and(...summaryConditions))
|
||||
.groupBy(usageLog.source)
|
||||
|
||||
const bySource: Record<string, number> = {}
|
||||
let totalCost = 0
|
||||
|
||||
for (const row of summaryResult) {
|
||||
const sourceCost = Number.parseFloat(row.totalCost || '0')
|
||||
bySource[row.source] = sourceCost
|
||||
totalCost += sourceCost
|
||||
}
|
||||
|
||||
return {
|
||||
logs: transformedLogs,
|
||||
summary: {
|
||||
totalCost,
|
||||
bySource,
|
||||
},
|
||||
pagination: {
|
||||
nextCursor:
|
||||
hasMore && resultLogs.length > 0 ? resultLogs[resultLogs.length - 1].id : undefined,
|
||||
hasMore,
|
||||
},
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to get usage logs', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
userId,
|
||||
options,
|
||||
})
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the user ID associated with a workflow
|
||||
* Helper function for cases where we only have a workflow ID
|
||||
*/
|
||||
export async function getUserIdFromWorkflow(workflowId: string): Promise<string | null> {
|
||||
try {
|
||||
const [workflowRecord] = await db
|
||||
.select({ userId: workflow.userId })
|
||||
.from(workflow)
|
||||
.where(eq(workflow.id, workflowId))
|
||||
.limit(1)
|
||||
|
||||
return workflowRecord?.userId ?? null
|
||||
} catch (error) {
|
||||
logger.error('Failed to get user ID from workflow', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
workflowId,
|
||||
})
|
||||
return null
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
getOrgUsageLimit,
|
||||
maybeSendUsageThresholdEmail,
|
||||
} from '@/lib/billing/core/usage'
|
||||
import { logWorkflowUsageBatch } from '@/lib/billing/core/usage-log'
|
||||
import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
|
||||
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { redactApiKeys } from '@/lib/core/security/redaction'
|
||||
@@ -46,43 +47,6 @@ export interface ToolCall {
|
||||
const logger = createLogger('ExecutionLogger')
|
||||
|
||||
export class ExecutionLogger implements IExecutionLoggerService {
|
||||
private mergeTraceSpans(existing: TraceSpan[], additional: TraceSpan[]): TraceSpan[] {
|
||||
// If no existing spans, just return additional
|
||||
if (!existing || existing.length === 0) return additional
|
||||
if (!additional || additional.length === 0) return existing
|
||||
|
||||
// Find the root "Workflow Execution" span in both arrays
|
||||
const existingRoot = existing.find((s) => s.name === 'Workflow Execution')
|
||||
const additionalRoot = additional.find((s) => s.name === 'Workflow Execution')
|
||||
|
||||
if (!existingRoot || !additionalRoot) {
|
||||
// If we can't find both roots, just concatenate (fallback)
|
||||
return [...existing, ...additional]
|
||||
}
|
||||
|
||||
// Calculate the full duration from original start to resume end
|
||||
const startTime = existingRoot.startTime
|
||||
const endTime = additionalRoot.endTime || existingRoot.endTime
|
||||
const fullDuration =
|
||||
startTime && endTime
|
||||
? new Date(endTime).getTime() - new Date(startTime).getTime()
|
||||
: (existingRoot.duration || 0) + (additionalRoot.duration || 0)
|
||||
|
||||
// Merge the children of the workflow execution spans
|
||||
const mergedRoot = {
|
||||
...existingRoot,
|
||||
children: [...(existingRoot.children || []), ...(additionalRoot.children || [])],
|
||||
endTime,
|
||||
duration: fullDuration,
|
||||
}
|
||||
|
||||
// Return array with merged root plus any other top-level spans
|
||||
const otherExisting = existing.filter((s) => s.name !== 'Workflow Execution')
|
||||
const otherAdditional = additional.filter((s) => s.name !== 'Workflow Execution')
|
||||
|
||||
return [mergedRoot, ...otherExisting, ...otherAdditional]
|
||||
}
|
||||
|
||||
private mergeCostModels(
|
||||
existing: Record<string, any>,
|
||||
additional: Record<string, any>
|
||||
@@ -383,7 +347,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
await this.updateUserStats(
|
||||
updatedLog.workflowId,
|
||||
costSummary,
|
||||
updatedLog.trigger as ExecutionTrigger['type']
|
||||
updatedLog.trigger as ExecutionTrigger['type'],
|
||||
executionId
|
||||
)
|
||||
|
||||
const limit = before.usageData.limit
|
||||
@@ -420,7 +385,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
await this.updateUserStats(
|
||||
updatedLog.workflowId,
|
||||
costSummary,
|
||||
updatedLog.trigger as ExecutionTrigger['type']
|
||||
updatedLog.trigger as ExecutionTrigger['type'],
|
||||
executionId
|
||||
)
|
||||
|
||||
const percentBefore =
|
||||
@@ -445,14 +411,16 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
await this.updateUserStats(
|
||||
updatedLog.workflowId,
|
||||
costSummary,
|
||||
updatedLog.trigger as ExecutionTrigger['type']
|
||||
updatedLog.trigger as ExecutionTrigger['type'],
|
||||
executionId
|
||||
)
|
||||
}
|
||||
} else {
|
||||
await this.updateUserStats(
|
||||
updatedLog.workflowId,
|
||||
costSummary,
|
||||
updatedLog.trigger as ExecutionTrigger['type']
|
||||
updatedLog.trigger as ExecutionTrigger['type'],
|
||||
executionId
|
||||
)
|
||||
}
|
||||
} catch (e) {
|
||||
@@ -460,7 +428,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
await this.updateUserStats(
|
||||
updatedLog.workflowId,
|
||||
costSummary,
|
||||
updatedLog.trigger as ExecutionTrigger['type']
|
||||
updatedLog.trigger as ExecutionTrigger['type'],
|
||||
executionId
|
||||
)
|
||||
} catch {}
|
||||
logger.warn('Usage threshold notification check failed (non-fatal)', { error: e })
|
||||
@@ -533,8 +502,18 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
totalCompletionTokens: number
|
||||
baseExecutionCharge: number
|
||||
modelCost: number
|
||||
models?: Record<
|
||||
string,
|
||||
{
|
||||
input: number
|
||||
output: number
|
||||
total: number
|
||||
tokens: { prompt: number; completion: number; total: number }
|
||||
}
|
||||
>
|
||||
},
|
||||
trigger: ExecutionTrigger['type']
|
||||
trigger: ExecutionTrigger['type'],
|
||||
executionId?: string
|
||||
): Promise<void> {
|
||||
if (!isBillingEnabled) {
|
||||
logger.debug('Billing is disabled, skipping user stats cost update')
|
||||
@@ -606,6 +585,16 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
addedTokens: costSummary.totalTokens,
|
||||
})
|
||||
|
||||
// Log usage entries for auditing (batch insert for performance)
|
||||
await logWorkflowUsageBatch({
|
||||
userId,
|
||||
workspaceId: workflowRecord.workspaceId ?? undefined,
|
||||
workflowId,
|
||||
executionId,
|
||||
baseExecutionCharge: costSummary.baseExecutionCharge,
|
||||
models: costSummary.models,
|
||||
})
|
||||
|
||||
// Check if user has hit overage threshold and bill incrementally
|
||||
await checkAndBillOverageThreshold(userId)
|
||||
} catch (error) {
|
||||
|
||||
23
packages/db/migrations/0128_swift_terrax.sql
Normal file
23
packages/db/migrations/0128_swift_terrax.sql
Normal file
@@ -0,0 +1,23 @@
|
||||
CREATE TYPE "public"."usage_log_category" AS ENUM('model', 'fixed');--> statement-breakpoint
|
||||
CREATE TYPE "public"."usage_log_source" AS ENUM('workflow', 'wand', 'copilot');--> statement-breakpoint
|
||||
CREATE TABLE "usage_log" (
|
||||
"id" text PRIMARY KEY NOT NULL,
|
||||
"user_id" text NOT NULL,
|
||||
"category" "usage_log_category" NOT NULL,
|
||||
"source" "usage_log_source" NOT NULL,
|
||||
"description" text NOT NULL,
|
||||
"metadata" jsonb,
|
||||
"cost" numeric NOT NULL,
|
||||
"workspace_id" text,
|
||||
"workflow_id" text,
|
||||
"execution_id" text,
|
||||
"created_at" timestamp DEFAULT now() NOT NULL
|
||||
);
|
||||
--> statement-breakpoint
|
||||
ALTER TABLE "usage_log" ADD CONSTRAINT "usage_log_user_id_user_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
|
||||
ALTER TABLE "usage_log" ADD CONSTRAINT "usage_log_workspace_id_workspace_id_fk" FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE set null ON UPDATE no action;--> statement-breakpoint
|
||||
ALTER TABLE "usage_log" ADD CONSTRAINT "usage_log_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE set null ON UPDATE no action;--> statement-breakpoint
|
||||
CREATE INDEX "usage_log_user_created_at_idx" ON "usage_log" USING btree ("user_id","created_at");--> statement-breakpoint
|
||||
CREATE INDEX "usage_log_source_idx" ON "usage_log" USING btree ("source");--> statement-breakpoint
|
||||
CREATE INDEX "usage_log_workspace_id_idx" ON "usage_log" USING btree ("workspace_id");--> statement-breakpoint
|
||||
CREATE INDEX "usage_log_workflow_id_idx" ON "usage_log" USING btree ("workflow_id");
|
||||
8444
packages/db/migrations/meta/0128_snapshot.json
Normal file
8444
packages/db/migrations/meta/0128_snapshot.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -890,6 +890,13 @@
|
||||
"when": 1766209394504,
|
||||
"tag": "0127_flimsy_sister_grimm",
|
||||
"breakpoints": true
|
||||
},
|
||||
{
|
||||
"idx": 128,
|
||||
"version": "7",
|
||||
"when": 1766266581373,
|
||||
"tag": "0128_swift_terrax",
|
||||
"breakpoints": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -1664,3 +1664,51 @@ export const ssoProvider = pgTable(
|
||||
organizationIdIdx: index('sso_provider_organization_id_idx').on(table.organizationId),
|
||||
})
|
||||
)
|
||||
|
||||
// Usage logging for tracking individual billable operations
|
||||
export const usageLogCategoryEnum = pgEnum('usage_log_category', ['model', 'fixed'])
|
||||
export const usageLogSourceEnum = pgEnum('usage_log_source', ['workflow', 'wand', 'copilot'])
|
||||
|
||||
export const usageLog = pgTable(
|
||||
'usage_log',
|
||||
{
|
||||
id: text('id').primaryKey(),
|
||||
userId: text('user_id')
|
||||
.notNull()
|
||||
.references(() => user.id, { onDelete: 'cascade' }),
|
||||
|
||||
// Charge category: 'model' (token-based) or 'fixed' (flat fee)
|
||||
category: usageLogCategoryEnum('category').notNull(),
|
||||
|
||||
// What generated this charge: 'workflow', 'wand', 'copilot'
|
||||
source: usageLogSourceEnum('source').notNull(),
|
||||
|
||||
// For model charges: model name (e.g., 'gpt-4o', 'claude-4.5-opus')
|
||||
// For fixed charges: charge type (e.g., 'execution_fee', 'search_query')
|
||||
description: text('description').notNull(),
|
||||
|
||||
// Category-specific metadata (e.g., tokens for 'model' category)
|
||||
metadata: jsonb('metadata'),
|
||||
|
||||
// Cost in USD
|
||||
cost: decimal('cost').notNull(),
|
||||
|
||||
// Optional context references
|
||||
workspaceId: text('workspace_id').references(() => workspace.id, { onDelete: 'set null' }),
|
||||
workflowId: text('workflow_id').references(() => workflow.id, { onDelete: 'set null' }),
|
||||
executionId: text('execution_id'),
|
||||
|
||||
// Timestamp
|
||||
createdAt: timestamp('created_at').notNull().defaultNow(),
|
||||
},
|
||||
(table) => ({
|
||||
// Index for querying user's usage history (most common query)
|
||||
userCreatedAtIdx: index('usage_log_user_created_at_idx').on(table.userId, table.createdAt),
|
||||
// Index for filtering by source
|
||||
sourceIdx: index('usage_log_source_idx').on(table.source),
|
||||
// Index for workspace-specific queries
|
||||
workspaceIdIdx: index('usage_log_workspace_id_idx').on(table.workspaceId),
|
||||
// Index for workflow-specific queries
|
||||
workflowIdIdx: index('usage_log_workflow_id_idx').on(table.workflowId),
|
||||
})
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user