fix(logs): model cost logging issues when workflow execution errors (#2169)

* fix(async-execution): restore async executions

* fix schedules trace span collection

* fix execution trace spans for schedules + cost tracking when a workflow errors
This commit is contained in:
Vikhyath Mondreti
2025-12-02 20:54:17 -08:00
committed by GitHub
parent 3e83fb398c
commit 7de721e090
4 changed files with 100 additions and 20 deletions

View File

@@ -1,9 +1,12 @@
import { tasks } from '@trigger.dev/sdk'
import { type NextRequest, NextResponse } from 'next/server'
import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { env, isTruthy } from '@/lib/core/config/env'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
@@ -17,6 +20,7 @@ import {
} from '@/lib/workflows/persistence/utils'
import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
import type { WorkflowExecutionPayload } from '@/background/workflow-execution'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
@@ -217,6 +221,64 @@ function resolveOutputIds(
})
}
/** Parameters needed to queue a workflow execution as a background job. */
interface AsyncExecutionParams {
  /** Correlation id used to tag log lines for this request. */
  requestId: string
  /** Id of the workflow to execute. */
  workflowId: string
  /** Id of the user the execution runs on behalf of. */
  userId: string
  // Raw workflow input payload — shape depends on the trigger; TODO confirm
  // whether this can be typed more precisely than `any`.
  input: any
  /** Source that initiated the execution. */
  triggerType: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
}
/**
* Handles async workflow execution by queueing a background job.
* Returns immediately with a 202 Accepted response containing the job ID.
*/
/**
 * Handles async workflow execution by queueing a background job.
 * Returns immediately with a 202 Accepted response containing the job ID.
 *
 * @param params - Request correlation id, workflow/user ids, raw input, and trigger source.
 * @returns 202 with the queued job id and a status URL on success,
 *          400 when async mode is disabled, 500 when queueing fails.
 */
async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
  const { requestId, workflowId, userId, input, triggerType } = params

  // Async mode is only available when the Trigger.dev integration is enabled.
  const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
  if (!useTrigger) {
    logger.warn(`[${requestId}] Async mode requested but TRIGGER_DEV_ENABLED is false`)
    return NextResponse.json(
      { error: 'Async execution is not enabled. Set TRIGGER_DEV_ENABLED=true to use async mode.' },
      { status: 400 }
    )
  }

  const payload: WorkflowExecutionPayload = {
    workflowId,
    userId,
    input,
    triggerType,
  }

  try {
    const handle = await tasks.trigger('workflow-execution', payload)
    logger.info(`[${requestId}] Queued async workflow execution`, {
      workflowId,
      jobId: handle.id,
    })
    return NextResponse.json(
      {
        success: true,
        async: true,
        jobId: handle.id,
        message: 'Workflow execution queued',
        // Callers can poll this URL for the job's status.
        statusUrl: `${getBaseUrl()}/api/jobs/${handle.id}`,
      },
      { status: 202 }
    )
  } catch (error: unknown) {
    logger.error(`[${requestId}] Failed to queue async execution`, error)
    // Narrow the unknown catch value before reading .message (strict-mode safe).
    const message = error instanceof Error ? error.message : String(error)
    return NextResponse.json(
      { error: `Failed to queue async execution: ${message}` },
      { status: 500 }
    )
  }
}
/**
* POST /api/workflows/[id]/execute
*
@@ -291,6 +353,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const streamHeader = req.headers.get('X-Stream-Response') === 'true'
const enableSSE = streamHeader || streamParam === true
const executionModeHeader = req.headers.get('X-Execution-Mode')
const isAsyncMode = executionModeHeader === 'async'
logger.info(`[${requestId}] Starting server-side execution`, {
workflowId,
@@ -301,6 +365,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
streamParam,
streamHeader,
enableSSE,
isAsyncMode,
})
const executionId = uuidv4()
@@ -349,6 +414,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workspaceId: workflow.workspaceId,
})
if (isAsyncMode) {
return handleAsyncExecution({
requestId,
workflowId,
userId: actorUserId,
input,
triggerType: loggingTriggerType,
})
}
let cachedWorkflowData: {
blocks: Record<string, any>
edges: any[]

View File

@@ -9,6 +9,7 @@ import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import {
@@ -22,6 +23,7 @@ import {
getSubBlockValue,
} from '@/lib/workflows/schedules/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionResult } from '@/executor/types'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
const logger = createLogger('TriggerScheduleExecution')
@@ -293,6 +295,9 @@ async function runWorkflowExecution({
)
try {
const executionResult = (earlyError as any)?.executionResult as ExecutionResult | undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed: ${
@@ -300,7 +305,7 @@ async function runWorkflowExecution({
}`,
stackTrace: earlyError instanceof Error ? earlyError.stack : undefined,
},
traceSpans: [],
traceSpans,
})
} catch (loggingError) {
logger.error(`[${requestId}] Failed to complete log entry for schedule failure`, loggingError)

View File

@@ -118,11 +118,9 @@ export function calculateCostSummary(traceSpans: any[]): {
totalCost += span.cost.total || 0
totalInputCost += span.cost.input || 0
totalOutputCost += span.cost.output || 0
const promptTokens = span.tokens?.prompt ?? span.tokens?.input ?? 0
const completionTokens = span.tokens?.completion ?? span.tokens?.output ?? 0
totalTokens += span.tokens?.total || 0
totalPromptTokens += promptTokens
totalCompletionTokens += completionTokens
totalPromptTokens += span.tokens?.input ?? span.tokens?.prompt ?? 0
totalCompletionTokens += span.tokens?.output ?? span.tokens?.completion ?? 0
if (span.model) {
const model = span.model
@@ -137,8 +135,8 @@ export function calculateCostSummary(traceSpans: any[]): {
models[model].input += span.cost.input || 0
models[model].output += span.cost.output || 0
models[model].total += span.cost.total || 0
models[model].tokens.prompt += promptTokens
models[model].tokens.completion += completionTokens
models[model].tokens.prompt += span.tokens?.input ?? span.tokens?.prompt ?? 0
models[model].tokens.completion += span.tokens?.output ?? span.tokens?.completion ?? 0
models[model].tokens.total += span.tokens?.total || 0
}
}

View File

@@ -186,22 +186,24 @@ export class LoggingSession {
const durationMs = typeof totalDurationMs === 'number' ? totalDurationMs : 0
const startTime = new Date(endTime.getTime() - Math.max(1, durationMs))
const costSummary = {
totalCost: BASE_EXECUTION_CHARGE,
totalInputCost: 0,
totalOutputCost: 0,
totalTokens: 0,
totalPromptTokens: 0,
totalCompletionTokens: 0,
baseExecutionCharge: BASE_EXECUTION_CHARGE,
modelCost: 0,
models: {},
}
const hasProvidedSpans = Array.isArray(traceSpans) && traceSpans.length > 0
const costSummary = hasProvidedSpans
? calculateCostSummary(traceSpans)
: {
totalCost: BASE_EXECUTION_CHARGE,
totalInputCost: 0,
totalOutputCost: 0,
totalTokens: 0,
totalPromptTokens: 0,
totalCompletionTokens: 0,
baseExecutionCharge: BASE_EXECUTION_CHARGE,
modelCost: 0,
models: {},
}
const message = error?.message || 'Execution failed before starting blocks'
const hasProvidedSpans = Array.isArray(traceSpans) && traceSpans.length > 0
const errorSpan: TraceSpan = {
id: 'workflow-error-root',
name: 'Workflow Error',