fix(execution-counts): execution counts by trigger type recorded accurately (#1670)

This commit is contained in:
Vikhyath Mondreti
2025-10-17 12:18:11 -07:00
committed by GitHub
parent 29c7827d6f
commit 0fbbbe02c7
6 changed files with 24 additions and 82 deletions

View File

@@ -1,7 +1,4 @@
import { db } from '@sim/db'
import { userStats } from '@sim/db/schema'
import { tasks } from '@trigger.dev/sdk'
import { eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
@@ -134,7 +131,8 @@ export async function executeWorkflow(
throw new Error('Execution is already running')
}
const loggingSession = new LoggingSession(workflowId, executionId, 'api', requestId)
const triggerType: TriggerType = streamConfig?.workflowTriggerType || 'api'
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
@@ -367,14 +365,6 @@ export async function executeWorkflow(
if (result.success) {
await updateWorkflowRunCounts(workflowId)
await db
.update(userStats)
.set({
totalApiCalls: sql`total_api_calls + 1`,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, actorUserId))
}
if (!streamConfig?.skipLoggingComplete) {

View File

@@ -41,20 +41,17 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
throw error
}
// Upsert user stats record
try {
// Check if record exists
const userStatsRecords = await db
.select()
.from(userStats)
.where(eq(userStats.userId, workflowRecord.userId))
if (userStatsRecords.length === 0) {
// Create new record if none exists
await db.insert(userStats).values({
id: crypto.randomUUID(),
userId: workflowRecord.userId,
totalManualExecutions: runs,
totalManualExecutions: 0,
totalApiCalls: 0,
totalWebhookTriggers: 0,
totalScheduledExecutions: 0,
@@ -64,17 +61,15 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
lastActive: sql`now()`,
})
} else {
// Update existing record
await db
.update(userStats)
.set({
totalManualExecutions: sql`total_manual_executions + ${runs}`,
lastActive: new Date(),
lastActive: sql`now()`,
})
.where(eq(userStats.userId, workflowRecord.userId))
}
} catch (error) {
logger.error(`Error upserting userStats for userId ${workflowRecord.userId}:`, error)
logger.error(`Error ensuring userStats for userId ${workflowRecord.userId}:`, error)
// Don't rethrow - we want to continue even if this fails
}

View File

@@ -503,7 +503,8 @@ export function useWorkflowExecution() {
workflowInput,
onStream,
executionId,
onBlockComplete
onBlockComplete,
'chat'
)
// Check if execution was cancelled
@@ -611,7 +612,13 @@ export function useWorkflowExecution() {
// For manual (non-chat) execution
const executionId = uuidv4()
try {
const result = await executeWorkflow(workflowInput, undefined, executionId)
const result = await executeWorkflow(
workflowInput,
undefined,
executionId,
undefined,
'manual'
)
if (result && 'metadata' in result && result.metadata?.isDebugSession) {
setDebugContext(result.metadata.context || null)
if (result.metadata.pendingBlocks) {
@@ -666,7 +673,8 @@ export function useWorkflowExecution() {
workflowInput?: any,
onStream?: (se: StreamingExecution) => Promise<void>,
executionId?: string,
onBlockComplete?: (blockId: string, output: any) => Promise<void>
onBlockComplete?: (blockId: string, output: any) => Promise<void>,
overrideTriggerType?: 'chat' | 'manual' | 'api'
): Promise<ExecutionResult | StreamingExecution> => {
// Use currentWorkflow but check if we're in diff mode
const { blocks: workflowBlocks, edges: workflowEdges } = currentWorkflow
@@ -683,7 +691,8 @@ export function useWorkflowExecution() {
)
const isExecutingFromChat =
workflowInput && typeof workflowInput === 'object' && 'input' in workflowInput
overrideTriggerType === 'chat' ||
(workflowInput && typeof workflowInput === 'object' && 'input' in workflowInput)
logger.info('Executing workflow', {
isDiffMode: currentWorkflow.isDiffMode,

View File

@@ -1,7 +1,7 @@
import { db, userStats, workflow, workflowSchedule } from '@sim/db'
import { db, workflow, workflowSchedule } from '@sim/db'
import { task } from '@trigger.dev/sdk'
import { Cron } from 'croner'
import { eq, sql } from 'drizzle-orm'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { getApiKeyOwnerUserId } from '@/lib/api-key/service'
import { checkServerSideUsageLimits } from '@/lib/billing'
@@ -366,20 +366,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
if (executionResult.success) {
await updateWorkflowRunCounts(payload.workflowId)
try {
await db
.update(userStats)
.set({
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
lastActive: now,
})
.where(eq(userStats.userId, actorUserId))
logger.debug(`[${requestId}] Updated user stats for scheduled execution`)
} catch (statsError) {
logger.error(`[${requestId}] Error updating user stats:`, statsError)
}
}
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { userStats, webhook, workflow as workflowTable } from '@sim/db/schema'
import { webhook, workflow as workflowTable } from '@sim/db/schema'
import { task } from '@trigger.dev/sdk'
import { eq, sql } from 'drizzle-orm'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
@@ -291,15 +291,6 @@ async function executeWebhookJobInternal(
// Update workflow run counts on success
if (executionResult.success) {
await updateWorkflowRunCounts(payload.workflowId)
// Track execution in user stats
await db
.update(userStats)
.set({
totalWebhookTriggers: sql`total_webhook_triggers + 1`,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, payload.userId))
}
// Build trace spans and complete logging session
@@ -492,17 +483,6 @@ async function executeWebhookJobInternal(
// Update workflow run counts on success
if (executionResult.success) {
await updateWorkflowRunCounts(payload.workflowId)
// Track execution in user stats (skip in test mode)
if (!payload.testMode) {
await db
.update(userStats)
.set({
totalWebhookTriggers: sql`total_webhook_triggers + 1`,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, payload.userId))
}
}
// Build trace spans and complete logging session

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { userStats, workflow as workflowTable } from '@sim/db/schema'
import { workflow as workflowTable } from '@sim/db/schema'
import { task } from '@trigger.dev/sdk'
import { eq, sql } from 'drizzle-orm'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
@@ -148,24 +148,6 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
// Update workflow run counts on success
if (executionResult.success) {
await updateWorkflowRunCounts(workflowId)
// Track execution in user stats
const statsUpdate =
triggerType === 'api'
? { totalApiCalls: sql`total_api_calls + 1` }
: triggerType === 'webhook'
? { totalWebhookTriggers: sql`total_webhook_triggers + 1` }
: triggerType === 'schedule'
? { totalScheduledExecutions: sql`total_scheduled_executions + 1` }
: { totalManualExecutions: sql`total_manual_executions + 1` }
await db
.update(userStats)
.set({
...statsUpdate,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, payload.userId))
}
// Build trace spans and complete logging session (for both success and failure)