fix(logs): always capture cost, logging size failures (#2487)

* fix(logs): truncate strings in tracespans crashing insertion

* add depth check to not crash

* custom serialization to not break tracepsans

* log costs even in log creation failure

* code cleanup?

* fix typing

* remove null bytes

* increase char limit

* reduce char limit
This commit is contained in:
Vikhyath Mondreti
2025-12-19 17:39:18 -08:00
committed by GitHub
parent df5f823d1c
commit 50c1c6775b
3 changed files with 233 additions and 61 deletions

View File

@@ -1,6 +1,13 @@
/**
* Type guard to check if an object is a UserFile
*/
const MAX_STRING_LENGTH = 15000
const MAX_DEPTH = 50
function truncateString(value: string, maxLength = MAX_STRING_LENGTH): string {
if (value.length <= maxLength) {
return value
}
return `${value.substring(0, maxLength)}... [truncated ${value.length - maxLength} chars]`
}
export function isUserFile(candidate: unknown): candidate is {
id: string
name: string
@@ -23,11 +30,6 @@ export function isUserFile(candidate: unknown): candidate is {
)
}
/**
* Filter function that transforms UserFile objects for display
* Removes internal fields: key, context
* Keeps user-friendly fields: id, name, url, size, type
*/
function filterUserFile(data: any): any {
if (isUserFile(data)) {
const { id, name, url, size, type } = data
@@ -36,50 +38,152 @@ function filterUserFile(data: any): any {
return data
}
/**
* Registry of filter functions to apply to data for cleaner display in logs/console.
* Add new filter functions here to handle additional data types.
*/
const DISPLAY_FILTERS = [
filterUserFile,
// Add more filters here as needed
]
const DISPLAY_FILTERS = [filterUserFile]
/**
* Generic helper to filter internal/technical fields from data for cleaner display in logs and console.
* Applies all registered filters recursively to the data structure.
*
* To add a new filter:
* 1. Create a filter function that checks and transforms a specific data type
* 2. Add it to the DISPLAY_FILTERS array above
*
* @param data - Data to filter (objects, arrays, primitives)
* @returns Filtered data with internal fields removed
*/
export function filterForDisplay(data: any): any {
if (!data || typeof data !== 'object') {
return data
}
// Apply all registered filters
const filtered = data
for (const filterFn of DISPLAY_FILTERS) {
const result = filterFn(filtered)
if (result !== filtered) {
// Filter matched and transformed the data
return result
}
}
// No filters matched - recursively filter nested structures
if (Array.isArray(filtered)) {
return filtered.map(filterForDisplay)
}
// Recursively filter object properties
const result: any = {}
for (const [key, value] of Object.entries(filtered)) {
result[key] = filterForDisplay(value)
}
return result
const seen = new WeakSet()
return filterForDisplayInternal(data, seen, 0)
}
function getObjectType(data: unknown): string {
return Object.prototype.toString.call(data).slice(8, -1)
}
function filterForDisplayInternal(data: any, seen: WeakSet<object>, depth: number): any {
try {
if (data === null || data === undefined) {
return data
}
const dataType = typeof data
if (dataType === 'string') {
// Remove null bytes which are not allowed in PostgreSQL JSONB
const sanitized = data.includes('\u0000') ? data.replace(/\u0000/g, '') : data
return truncateString(sanitized)
}
if (dataType === 'number') {
if (Number.isNaN(data)) {
return '[NaN]'
}
if (!Number.isFinite(data)) {
return data > 0 ? '[Infinity]' : '[-Infinity]'
}
return data
}
if (dataType === 'boolean') {
return data
}
if (dataType === 'bigint') {
return `[BigInt: ${data.toString()}]`
}
if (dataType === 'symbol') {
return `[Symbol: ${data.toString()}]`
}
if (dataType === 'function') {
return `[Function: ${data.name || 'anonymous'}]`
}
if (dataType !== 'object') {
return '[Unknown Type]'
}
if (seen.has(data)) {
return '[Circular Reference]'
}
if (depth > MAX_DEPTH) {
return '[Max Depth Exceeded]'
}
const objectType = getObjectType(data)
switch (objectType) {
case 'Date': {
const timestamp = (data as Date).getTime()
if (Number.isNaN(timestamp)) {
return '[Invalid Date]'
}
return (data as Date).toISOString()
}
case 'RegExp':
return (data as RegExp).toString()
case 'URL':
return (data as URL).toString()
case 'Error': {
const err = data as Error
return {
name: err.name,
message: truncateString(err.message),
stack: err.stack ? truncateString(err.stack) : undefined,
}
}
case 'ArrayBuffer':
return `[ArrayBuffer: ${(data as ArrayBuffer).byteLength} bytes]`
case 'Map': {
const obj: Record<string, any> = {}
for (const [key, value] of (data as Map<any, any>).entries()) {
const keyStr = typeof key === 'string' ? key : String(key)
obj[keyStr] = filterForDisplayInternal(value, seen, depth + 1)
}
return obj
}
case 'Set':
return Array.from(data as Set<any>).map((item) =>
filterForDisplayInternal(item, seen, depth + 1)
)
case 'WeakMap':
return '[WeakMap]'
case 'WeakSet':
return '[WeakSet]'
case 'WeakRef':
return '[WeakRef]'
case 'Promise':
return '[Promise]'
}
if (ArrayBuffer.isView(data)) {
return `[${objectType}: ${(data as ArrayBufferView).byteLength} bytes]`
}
seen.add(data)
for (const filterFn of DISPLAY_FILTERS) {
const result = filterFn(data)
if (result !== data) {
return filterForDisplayInternal(result, seen, depth + 1)
}
}
if (Array.isArray(data)) {
return data.map((item) => filterForDisplayInternal(item, seen, depth + 1))
}
const result: Record<string, any> = {}
for (const key of Object.keys(data)) {
try {
result[key] = filterForDisplayInternal(data[key], seen, depth + 1)
} catch {
result[key] = '[Error accessing property]'
}
}
return result
} catch {
return '[Unserializable]'
}
}

View File

@@ -230,6 +230,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
traceSpans?: TraceSpan[]
workflowInput?: any
isResume?: boolean // If true, merge with existing data instead of replacing
level?: 'info' | 'error' // Optional override for log level (used in cost-only fallback)
}): Promise<WorkflowExecutionLog> {
const {
executionId,
@@ -240,6 +241,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
traceSpans,
workflowInput,
isResume,
level: levelOverride,
} = params
logger.debug(`Completing workflow execution ${executionId}`, { isResume })
@@ -256,6 +258,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
}
// Determine if workflow failed by checking trace spans for errors
// Use the override if provided (for cost-only fallback scenarios)
const hasErrors = traceSpans?.some((span: any) => {
const checkSpanForErrors = (s: any): boolean => {
if (s.status === 'error') return true
@@ -267,7 +270,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
return checkSpanForErrors(span)
})
const level = hasErrors ? 'error' : 'info'
const level = levelOverride ?? (hasErrors ? 'error' : 'info')
// Extract files from trace spans, final output, and workflow input
const executionFiles = this.extractFilesFromExecution(traceSpans, finalOutput, workflowInput)

View File

@@ -29,7 +29,7 @@ export interface SessionCompleteParams {
endedAt?: string
totalDurationMs?: number
finalOutput?: any
traceSpans?: any[]
traceSpans?: TraceSpan[]
workflowInput?: any
}
@@ -331,20 +331,85 @@ export class LoggingSession {
try {
await this.complete(params)
} catch (error) {
// Error already logged in complete(), log a summary here
const errorMsg = error instanceof Error ? error.message : String(error)
logger.warn(
`[${this.requestId || 'unknown'}] Logging completion failed for execution ${this.executionId} - execution data not persisted`
`[${this.requestId || 'unknown'}] Complete failed for execution ${this.executionId}, attempting fallback`,
{ error: errorMsg }
)
await this.completeWithCostOnlyLog({
traceSpans: params.traceSpans,
endedAt: params.endedAt,
totalDurationMs: params.totalDurationMs,
errorMessage: `Failed to store trace spans: ${errorMsg}`,
isError: false,
})
}
}
async safeCompleteWithError(error?: SessionErrorCompleteParams): Promise<void> {
async safeCompleteWithError(params?: SessionErrorCompleteParams): Promise<void> {
try {
await this.completeWithError(error)
} catch (enhancedError) {
// Error already logged in completeWithError(), log a summary here
await this.completeWithError(params)
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error)
logger.warn(
`[${this.requestId || 'unknown'}] Error logging completion failed for execution ${this.executionId} - execution data not persisted`
`[${this.requestId || 'unknown'}] CompleteWithError failed for execution ${this.executionId}, attempting fallback`,
{ error: errorMsg }
)
await this.completeWithCostOnlyLog({
traceSpans: params?.traceSpans,
endedAt: params?.endedAt,
totalDurationMs: params?.totalDurationMs,
errorMessage:
params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`,
isError: true,
})
}
}
private async completeWithCostOnlyLog(params: {
traceSpans?: TraceSpan[]
endedAt?: string
totalDurationMs?: number
errorMessage: string
isError: boolean
}): Promise<void> {
logger.warn(
`[${this.requestId || 'unknown'}] Logging completion failed for execution ${this.executionId} - attempting cost-only fallback`
)
try {
const costSummary = params.traceSpans?.length
? calculateCostSummary(params.traceSpans)
: {
totalCost: BASE_EXECUTION_CHARGE,
totalInputCost: 0,
totalOutputCost: 0,
totalTokens: 0,
totalPromptTokens: 0,
totalCompletionTokens: 0,
baseExecutionCharge: BASE_EXECUTION_CHARGE,
modelCost: 0,
models: {},
}
await executionLogger.completeWorkflowExecution({
executionId: this.executionId,
endedAt: params.endedAt || new Date().toISOString(),
totalDurationMs: params.totalDurationMs || 0,
costSummary,
finalOutput: { _fallback: true, error: params.errorMessage },
traceSpans: [],
isResume: this.isResume,
level: params.isError ? 'error' : 'info',
})
logger.info(
`[${this.requestId || 'unknown'}] Cost-only fallback succeeded for execution ${this.executionId}`
)
} catch (fallbackError) {
logger.error(
`[${this.requestId || 'unknown'}] Cost-only fallback also failed for execution ${this.executionId}:`,
{ error: fallbackError instanceof Error ? fallbackError.message : String(fallbackError) }
)
}
}