feat(metrics): add user_stats table that tracks more granular usage information per user (#177)

* added user_stats table to track total number of workflow runs

* added metrics for different types of triggers and runs

* add total_tokens_used and total_cost metrics

* ran migrations for the new table and fixed a build issue; added a user/stats route to fetch all user stats for the dashboard we'll eventually create

* fix bug with API deployment status not appearing
This commit is contained in:
Waleed Latif
2025-03-24 14:49:54 -07:00
committed by GitHub
parent a2ff120440
commit 78e701d8bb
17 changed files with 1625 additions and 43 deletions

View File

@@ -1,16 +1,18 @@
import { NextRequest, NextResponse } from 'next/server'
import { Cron } from 'croner'
import { eq, lte } from 'drizzle-orm'
import { sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { createLogger } from '@/lib/logs/console-logger'
import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/execution-logger'
import { buildTraceSpans } from '@/lib/logs/trace-spans'
import { decryptSecret } from '@/lib/utils'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { BlockState, WorkflowState } from '@/stores/workflows/workflow/types'
import { db } from '@/db'
import { environment, workflow, workflowSchedule } from '@/db/schema'
import { environment, userStats, workflow, workflowSchedule } from '@/db/schema'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
@@ -326,6 +328,25 @@ export async function GET(req: NextRequest) {
)
const result = await executor.execute(schedule.workflowId)
logger.info(`[${requestId}] Workflow execution completed: ${schedule.workflowId}`, {
success: result.success,
executionTime: result.metadata?.duration,
})
// Update workflow run counts if execution was successful
if (result.success) {
await updateWorkflowRunCounts(schedule.workflowId)
// Track scheduled execution in user stats
await db
.update(userStats)
.set({
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
lastActive: new Date(),
})
.where(eq(userStats.userId, workflowRecord.userId))
}
// Build trace spans from execution logs
const { traceSpans, totalDuration } = buildTraceSpans(result)
@@ -371,10 +392,6 @@ export async function GET(req: NextRequest) {
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(
`[${requestId}] Scheduled retry for workflow ${schedule.workflowId} at ${nextRetryAt.toISOString()}`
)
}
} catch (error: any) {
logger.error(

View File

@@ -6,7 +6,7 @@ import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { workflowSchedule } from '@/db/schema'
const logger = createLogger('Scheduled API')
const logger = createLogger('ScheduledAPI')
/**
* Get schedule information for a workflow

View File

@@ -0,0 +1,68 @@
import { NextRequest, NextResponse } from 'next/server'
import { eq, sql } from 'drizzle-orm'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { userStats, workflow } from '@/db/schema'
const logger = createLogger('UserStatsAPI')
/**
 * GET endpoint to retrieve user statistics for the authenticated user,
 * including a count of the workflows they own.
 *
 * Responses: 200 with stats + workflowCount, 401 when unauthenticated,
 * 500 on unexpected failure.
 */
export async function GET(request: NextRequest) {
  try {
    // Get the user session
    const session = await getSession()
    if (!session?.user?.id) {
      logger.warn('Unauthorized user stats access attempt')
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }
    const userId = session.user.id

    // Get workflow count for user (::int so the driver returns a number, not a string)
    const [workflowCountResult] = await db
      .select({ count: sql`count(*)::int` })
      .from(workflow)
      .where(eq(workflow.userId, userId))
    const workflowCount = workflowCountResult?.count ?? 0

    // Get user stats record
    let userStatsRecords = await db.select().from(userStats).where(eq(userStats.userId, userId))

    // If no stats record exists, create one. onConflictDoNothing guards against
    // a concurrent request inserting the same user_id first (user_id is UNIQUE).
    if (userStatsRecords.length === 0) {
      const newStats = {
        id: crypto.randomUUID(),
        userId,
        totalManualExecutions: 0,
        totalApiCalls: 0,
        totalWebhookTriggers: 0,
        totalScheduledExecutions: 0,
        totalTokensUsed: 0,
        totalCost: '0.00',
        lastActive: new Date(),
      }
      await db.insert(userStats).values(newStats).onConflictDoNothing()

      // Re-read so we return whichever row actually won the race
      userStatsRecords = await db.select().from(userStats).where(eq(userStats.userId, userId))
      if (userStatsRecords.length === 0) {
        // Extremely unlikely fallback: return the values we attempted to insert
        return NextResponse.json({ ...newStats, workflowCount })
      }
    }

    // Return stats with workflow count
    const stats = userStatsRecords[0]
    return NextResponse.json({
      ...stats,
      workflowCount,
    })
  } catch (error) {
    logger.error('Error fetching user stats:', error)
    return NextResponse.json({ error: 'Failed to fetch user statistics' }, { status: 500 })
  }
}

View File

@@ -1,14 +1,15 @@
import { NextRequest, NextResponse } from 'next/server'
import { and, eq } from 'drizzle-orm'
import { and, eq, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console-logger'
import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/execution-logger'
import { buildTraceSpans } from '@/lib/logs/trace-spans'
import { closeRedisConnection, hasProcessedMessage, markMessageAsProcessed } from '@/lib/redis'
import { decryptSecret } from '@/lib/utils'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { mergeSubblockStateAsync } from '@/stores/workflows/utils'
import { db } from '@/db'
import { environment, webhook, workflow } from '@/db/schema'
import { environment, userStats, webhook, workflow } from '@/db/schema'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
@@ -559,6 +560,20 @@ async function processWebhook(
executionTime: result.metadata?.duration,
})
// Update workflow run counts if execution was successful
if (result.success) {
await updateWorkflowRunCounts(foundWorkflow.id)
// Track webhook trigger in user stats
await db
.update(userStats)
.set({
totalWebhookTriggers: sql`total_webhook_triggers + 1`,
lastActive: new Date(),
})
.where(eq(userStats.userId, foundWorkflow.userId))
}
// Build trace spans from execution logs
const { traceSpans, totalDuration } = buildTraceSpans(result)

View File

@@ -12,6 +12,59 @@ const logger = createLogger('WorkflowDeployAPI')
export const dynamic = 'force-dynamic'
export const runtime = 'nodejs'
/**
 * GET endpoint returning deployment details (API key, deployed flag, timestamp)
 * for a single workflow. Requires read access via validateWorkflowAccess.
 */
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
  const requestId = crypto.randomUUID().slice(0, 8)
  const { id } = await params

  try {
    logger.debug(`[${requestId}] Fetching deployment info for workflow: ${id}`)

    const validation = await validateWorkflowAccess(request, id, false)
    if (validation.error) {
      logger.warn(`[${requestId}] Failed to fetch deployment info: ${validation.error.message}`)
      return createErrorResponse(validation.error.message, validation.error.status)
    }

    // Pull only the deployment-related columns for this workflow
    const rows = await db
      .select({
        apiKey: workflow.apiKey,
        isDeployed: workflow.isDeployed,
        deployedAt: workflow.deployedAt,
      })
      .from(workflow)
      .where(eq(workflow.id, id))
      .limit(1)

    const deployment = rows[0]
    if (!deployment) {
      logger.warn(`[${requestId}] Workflow not found: ${id}`)
      return createErrorResponse('Workflow not found', 404)
    }

    // Not deployed (or missing an API key) — report an empty deployment state
    if (!deployment.isDeployed || !deployment.apiKey) {
      logger.info(`[${requestId}] Workflow is not deployed: ${id}`)
      return createSuccessResponse({
        isDeployed: false,
        apiKey: null,
        deployedAt: null,
      })
    }

    logger.info(`[${requestId}] Successfully retrieved deployment info: ${id}`)
    return createSuccessResponse({
      apiKey: deployment.apiKey,
      isDeployed: deployment.isDeployed,
      deployedAt: deployment.deployedAt,
    })
  } catch (error: any) {
    logger.error(`[${requestId}] Error fetching deployment info: ${id}`, error)
    return createErrorResponse(error.message || 'Failed to fetch deployment information', 500)
  }
}
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = crypto.randomUUID().slice(0, 8)
const { id } = await params

View File

@@ -1,15 +1,16 @@
import { NextRequest } from 'next/server'
import { eq } from 'drizzle-orm'
import { eq, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { createLogger } from '@/lib/logs/console-logger'
import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/execution-logger'
import { buildTraceSpans } from '@/lib/logs/trace-spans'
import { decryptSecret } from '@/lib/utils'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { WorkflowState } from '@/stores/workflows/workflow/types'
import { db } from '@/db'
import { environment } from '@/db/schema'
import { environment, userStats } from '@/db/schema'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { validateWorkflowAccess } from '../../middleware'
@@ -159,6 +160,20 @@ async function executeWorkflow(workflow: any, requestId: string, input?: any) {
executionTime: result.metadata?.duration,
})
// Update workflow run counts if execution was successful
if (result.success) {
await updateWorkflowRunCounts(workflowId)
// Track API call in user stats
await db
.update(userStats)
.set({
totalApiCalls: sql`total_api_calls + 1`,
lastActive: new Date(),
})
.where(eq(userStats.userId, workflow.userId))
}
// Build trace spans from execution logs
const { traceSpans, totalDuration } = buildTraceSpans(result)

View File

@@ -0,0 +1,89 @@
import { NextRequest, NextResponse } from 'next/server'
import { eq, sql } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { userStats, workflow } from '@/db/schema'
const logger = createLogger('WorkflowStatsAPI')
/**
 * POST endpoint recording `runs` additional executions for a workflow:
 * increments workflow.run_count / last_run_at and upserts the owner's
 * user_stats row (total_manual_executions).
 *
 * Query param: runs — integer between 1 and 100 (default 1).
 */
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
  const { id } = await params
  const searchParams = request.nextUrl.searchParams
  const runs = parseInt(searchParams.get('runs') || '1', 10)

  if (isNaN(runs) || runs < 1 || runs > 100) {
    logger.error(`Invalid number of runs: ${runs}`)
    return NextResponse.json(
      { error: 'Invalid number of runs. Must be between 1 and 100.' },
      { status: 400 }
    )
  }

  try {
    // Get workflow record
    const [workflowRecord] = await db.select().from(workflow).where(eq(workflow.id, id)).limit(1)
    if (!workflowRecord) {
      return NextResponse.json({ error: `Workflow ${id} not found` }, { status: 404 })
    }

    // Increment run_count atomically in SQL: a JS read-modify-write
    // (runCount + runs computed here) loses updates under concurrent requests.
    try {
      await db
        .update(workflow)
        .set({
          runCount: sql`run_count + ${runs}`,
          lastRunAt: new Date(),
        })
        .where(eq(workflow.id, id))
    } catch (error) {
      logger.error('Error updating workflow runCount:', error)
      throw error
    }

    // Upsert user stats record
    try {
      // Check if record exists
      const userStatsRecords = await db
        .select()
        .from(userStats)
        .where(eq(userStats.userId, workflowRecord.userId))

      if (userStatsRecords.length === 0) {
        // Create new record if none exists. onConflictDoNothing guards against
        // a concurrent insert hitting the user_id unique constraint.
        await db
          .insert(userStats)
          .values({
            id: crypto.randomUUID(),
            userId: workflowRecord.userId,
            totalManualExecutions: runs,
            totalApiCalls: 0,
            totalWebhookTriggers: 0,
            totalScheduledExecutions: 0,
            totalTokensUsed: 0,
            totalCost: '0.00',
            lastActive: new Date(),
          })
          .onConflictDoNothing()
      } else {
        // Update existing record with an atomic SQL increment
        await db
          .update(userStats)
          .set({
            totalManualExecutions: sql`total_manual_executions + ${runs}`,
            lastActive: new Date(),
          })
          .where(eq(userStats.userId, workflowRecord.userId))
      }
    } catch (error) {
      logger.error(`Error upserting userStats for userId ${workflowRecord.userId}:`, error)
      // Don't rethrow - we want to continue even if this fails
    }

    return NextResponse.json({
      success: true,
      runsAdded: runs,
      // Derived from the value read above; may lag behind concurrent increments
      newTotal: workflowRecord.runCount + runs,
    })
  } catch (error) {
    logger.error('Error updating workflow stats:', error)
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}

View File

@@ -1,6 +1,6 @@
import { NextRequest } from 'next/server'
import { createLogger } from '@/lib/logs/console-logger'
import { getWorkflowById } from '@/lib/workflows'
import { getWorkflowById } from '@/lib/workflows/utils'
const logger = createLogger('WorkflowMiddleware')

View File

@@ -208,7 +208,7 @@ export function ControlBar() {
try {
setIsDeploying(true)
const response = await fetch(`/api/workflows/${activeWorkflowId}/deploy/info`)
const response = await fetch(`/api/workflows/${activeWorkflowId}/deploy`)
if (!response.ok) throw new Error('Failed to fetch deployment info')
const { apiKey } = await response.json()
@@ -313,6 +313,19 @@ export function ControlBar() {
await handleRunWorkflow()
setCompletedRuns(i + 1)
}
// Update workflow stats after all runs are complete
if (activeWorkflowId) {
const response = await fetch(`/api/workflows/${activeWorkflowId}/stats?runs=${runCount}`, {
method: 'POST',
})
if (!response.ok) {
const errorData = await response.json()
logger.error(`Failed to update workflow stats: ${JSON.stringify(errorData)}`)
throw new Error('Failed to update workflow stats')
}
}
} catch (error) {
logger.error('Error during multiple workflow runs:', { error })
addNotification('error', 'Failed to complete all workflow runs', activeWorkflowId)

View File

@@ -0,0 +1,16 @@
-- Per-user usage statistics: one row per user (user_id is UNIQUE),
-- tracking execution counts per trigger type plus token/cost totals.
CREATE TABLE "user_stats" (
	"id" text PRIMARY KEY NOT NULL,
	"user_id" text NOT NULL,
	"total_manual_executions" integer DEFAULT 0 NOT NULL,
	"total_api_calls" integer DEFAULT 0 NOT NULL,
	"total_webhook_triggers" integer DEFAULT 0 NOT NULL,
	"total_scheduled_executions" integer DEFAULT 0 NOT NULL,
	"total_tokens_used" integer DEFAULT 0 NOT NULL,
	"total_cost" numeric DEFAULT '0' NOT NULL,
	"last_active" timestamp DEFAULT now() NOT NULL,
	CONSTRAINT "user_stats_user_id_unique" UNIQUE("user_id")
);
--> statement-breakpoint
-- Denormalized run metrics on workflow itself; stats rows cascade on user delete.
ALTER TABLE "workflow" ADD COLUMN "run_count" integer DEFAULT 0 NOT NULL;--> statement-breakpoint
ALTER TABLE "workflow" ADD COLUMN "last_run_at" timestamp;--> statement-breakpoint
ALTER TABLE "user_stats" ADD CONSTRAINT "user_stats_user_id_user_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;

File diff suppressed because it is too large Load Diff

View File

@@ -148,6 +148,13 @@
"when": 1742552444912,
"tag": "0020_clear_skreet",
"breakpoints": true
},
{
"idx": 21,
"version": "7",
"when": 1742850849852,
"tag": "0021_shocking_korath",
"breakpoints": true
}
]
}

View File

@@ -1,4 +1,13 @@
import { boolean, json, pgTable, text, timestamp, uniqueIndex, integer } from 'drizzle-orm/pg-core'
import {
boolean,
decimal,
integer,
json,
pgTable,
text,
timestamp,
uniqueIndex,
} from 'drizzle-orm/pg-core'
export const user = pgTable('user', {
id: text('id').primaryKey(),
@@ -67,6 +76,8 @@ export const workflow = pgTable('workflow', {
apiKey: text('api_key'),
isPublished: boolean('is_published').notNull().default(false),
collaborators: json('collaborators').notNull().default('[]'),
runCount: integer('run_count').notNull().default(0),
lastRunAt: timestamp('last_run_at'),
})
export const waitlist = pgTable('waitlist', {
@@ -179,17 +190,36 @@ export const marketplace = pgTable('marketplace', {
updatedAt: timestamp('updated_at').notNull().defaultNow(),
})
export const marketplaceStar = pgTable('marketplace_star', {
export const marketplaceStar = pgTable(
'marketplace_star',
{
id: text('id').primaryKey(),
marketplaceId: text('marketplace_id')
.notNull()
.references(() => marketplace.id, { onDelete: 'cascade' }),
userId: text('user_id')
.notNull()
.references(() => user.id),
createdAt: timestamp('created_at').notNull().defaultNow(),
},
(table) => {
return {
userMarketplaceIdx: uniqueIndex('user_marketplace_idx').on(table.userId, table.marketplaceId),
}
}
)
export const userStats = pgTable('user_stats', {
id: text('id').primaryKey(),
marketplaceId: text('marketplace_id')
.notNull()
.references(() => marketplace.id, { onDelete: 'cascade' }),
userId: text('user_id')
.notNull()
.references(() => user.id),
createdAt: timestamp('created_at').notNull().defaultNow(),
}, (table) => {
return {
userMarketplaceIdx: uniqueIndex('user_marketplace_idx').on(table.userId, table.marketplaceId),
}
})
.references(() => user.id, { onDelete: 'cascade' })
.unique(), // One record per user
totalManualExecutions: integer('total_manual_executions').notNull().default(0),
totalApiCalls: integer('total_api_calls').notNull().default(0),
totalWebhookTriggers: integer('total_webhook_triggers').notNull().default(0),
totalScheduledExecutions: integer('total_scheduled_executions').notNull().default(0),
totalTokensUsed: integer('total_tokens_used').notNull().default(0),
totalCost: decimal('total_cost').notNull().default('0'),
lastActive: timestamp('last_active').notNull().defaultNow(),
})

View File

@@ -1,7 +1,8 @@
import { eq, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { workflowLogs } from '@/db/schema'
import { userStats, workflow, workflowLogs } from '@/db/schema'
import { ExecutionResult as ExecutorResult } from '@/executor/types'
const logger = createLogger('ExecutionLogger')
@@ -69,6 +70,20 @@ export async function persistExecutionLogs(
triggerType: 'api' | 'webhook' | 'schedule' | 'manual'
) {
try {
// Get the workflow record to get the userId
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!workflowRecord) {
logger.error(`Workflow ${workflowId} not found`)
return
}
const userId = workflowRecord.userId
// Track accumulated cost data across all agent blocks
let totalCost = 0
let totalInputCost = 0
@@ -512,14 +527,39 @@ export async function persistExecutionLogs(
}
}
logger.info(`Workflow execution total cost: ${totalCost}`, {
workflowId,
executionId,
totalCost,
inputCost: totalInputCost,
outputCost: totalOutputCost,
models: Object.keys(modelCounts),
})
if (userId) {
try {
const userStatsRecords = await db
.select()
.from(userStats)
.where(eq(userStats.userId, userId))
if (userStatsRecords.length === 0) {
await db.insert(userStats).values({
id: crypto.randomUUID(),
userId: userId,
totalManualExecutions: 0,
totalApiCalls: 0,
totalWebhookTriggers: 0,
totalScheduledExecutions: 0,
totalTokensUsed: totalTokens,
totalCost: totalCost.toString(),
lastActive: new Date(),
})
} else {
await db
.update(userStats)
.set({
totalTokensUsed: sql`total_tokens_used + ${totalTokens}`,
totalCost: sql`total_cost + ${totalCost}`,
lastActive: new Date(),
})
.where(eq(userStats.userId, userId))
}
} catch (error) {
logger.error(`Error upserting user stats:`, error)
}
}
}
// Log the final execution result

View File

@@ -1,9 +0,0 @@
import { eq } from 'drizzle-orm'
import { db } from '@/db'
import { workflow as workflowTable } from '@/db/schema'
/** Look up a single workflow row by id; resolves to undefined when absent. */
export async function getWorkflowById(id: string) {
  const [found] = await db
    .select()
    .from(workflowTable)
    .where(eq(workflowTable.id, id))
    .limit(1)
  return found
}

View File

@@ -0,0 +1,35 @@
import { eq } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { userStats, workflow as workflowTable } from '@/db/schema'
const logger = createLogger('WorkflowUtils')
/**
 * Fetch one workflow row by primary key.
 * Resolves to undefined when no workflow with that id exists.
 */
export async function getWorkflowById(id: string) {
  const rows = await db
    .select()
    .from(workflowTable)
    .where(eq(workflowTable.id, id))
    .limit(1)
  return rows[0]
}
/**
 * Record `runs` additional executions for a workflow by POSTing to the
 * stats endpoint (which increments run_count and the owner's user_stats).
 *
 * @param workflowId - id of the workflow to update
 * @param runs - number of runs to add (default 1)
 * @returns parsed JSON body from the stats endpoint
 * @throws when the workflow does not exist or the stats request fails
 */
export async function updateWorkflowRunCounts(workflowId: string, runs: number = 1) {
  try {
    const workflow = await getWorkflowById(workflowId)
    if (!workflow) {
      logger.error(`Workflow ${workflowId} not found`)
      throw new Error(`Workflow ${workflowId} not found`)
    }

    // A relative URL only works in the browser. This helper is also called from
    // server code (schedule/webhook/API routes), where Node's fetch requires an
    // absolute URL — derive the origin accordingly.
    // NOTE(review): assumes NEXT_PUBLIC_APP_URL holds the deployment origin — confirm.
    const origin =
      typeof window !== 'undefined'
        ? window.location.origin
        : process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000'

    const response = await fetch(`${origin}/api/workflows/${workflowId}/stats?runs=${runs}`, {
      method: 'POST',
    })

    if (!response.ok) {
      const error = await response.json()
      throw new Error(error.error || 'Failed to update workflow stats')
    }

    return response.json()
  } catch (error) {
    logger.error(`Error updating workflow run counts:`, error)
    throw error
  }
}

View File

@@ -2,7 +2,6 @@ import { StateCreator } from 'zustand'
import { saveSubblockValues, saveWorkflowState } from './persistence'
import { useWorkflowRegistry } from './registry/store'
import { useSubBlockStore } from './subblock/store'
import { mergeSubblockState } from './utils'
import { WorkflowState, WorkflowStore } from './workflow/types'
// Types