From de656e319471768b41ef74d8a16da6e251a62605 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Thu, 19 Feb 2026 02:36:47 -0800 Subject: [PATCH] fix(otel): complete diagnostics-otel OpenTelemetry v2 API migration (#12897) * fix(otel): complete diagnostics-otel OpenTelemetry v2 API migration * chore(format): align otel files with updated oxfmt config * chore(format): apply updated oxfmt spacing to otel diagnostics --- .../diagnostics-otel/src/service.test.ts | 5 +- extensions/diagnostics-otel/src/service.ts | 302 ++++++++++-------- src/infra/diagnostic-events.ts | 60 +++- 3 files changed, 219 insertions(+), 148 deletions(-) diff --git a/extensions/diagnostics-otel/src/service.test.ts b/extensions/diagnostics-otel/src/service.test.ts index 6607cb1097..6a7620c54b 100644 --- a/extensions/diagnostics-otel/src/service.test.ts +++ b/extensions/diagnostics-otel/src/service.test.ts @@ -70,7 +70,6 @@ vi.mock("@opentelemetry/exporter-logs-otlp-http", () => ({ vi.mock("@opentelemetry/sdk-logs", () => ({ BatchLogRecordProcessor: class {}, LoggerProvider: class { - addLogRecordProcessor = vi.fn(); getLogger = vi.fn(() => ({ emit: logEmit, })); @@ -96,9 +95,7 @@ vi.mock("@opentelemetry/resources", () => ({ })); vi.mock("@opentelemetry/semantic-conventions", () => ({ - SemanticResourceAttributes: { - SERVICE_NAME: "service.name", - }, + ATTR_SERVICE_NAME: "service.name", })); vi.mock("openclaw/plugin-sdk", async () => { diff --git a/extensions/diagnostics-otel/src/service.ts b/extensions/diagnostics-otel/src/service.ts index f1babf4732..78975eb36e 100644 --- a/extensions/diagnostics-otel/src/service.ts +++ b/extensions/diagnostics-otel/src/service.ts @@ -8,7 +8,7 @@ import { BatchLogRecordProcessor, LoggerProvider } from "@opentelemetry/sdk-logs import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics"; import { NodeSDK } from "@opentelemetry/sdk-node"; import { ParentBasedSampler, TraceIdRatioBasedSampler } from "@opentelemetry/sdk-trace-base"; -import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions"; +import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions"; import type { DiagnosticEventPayload, OpenClawPluginService } from "openclaw/plugin-sdk"; import { onDiagnosticEvent, registerLogTransport } from "openclaw/plugin-sdk"; @@ -40,6 +40,20 @@ function resolveSampleRate(value: number | undefined): number | undefined { return value; } +function formatError(err: unknown): string { + if (err instanceof Error) { + return err.stack ?? err.message; + } + if (typeof err === "string") { + return err; + } + try { + return JSON.stringify(err); + } catch { + return String(err); + } +} + export function createDiagnosticsOtelService(): OpenClawPluginService { let sdk: NodeSDK | null = null; let logProvider: LoggerProvider | null = null; @@ -75,7 +89,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { } const resource = resourceFromAttributes({ - [SemanticResourceAttributes.SERVICE_NAME]: serviceName, + [ATTR_SERVICE_NAME]: serviceName, }); const traceUrl = resolveOtelUrl(endpoint, "v1/traces"); @@ -118,7 +132,12 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { : {}), }); - sdk.start(); + try { + await sdk.start(); + } catch (err) { + ctx.logger.error(`diagnostics-otel: failed to start SDK: ${formatError(err)}`); + throw err; + } } const logSeverityMap: Record = { @@ -211,115 +230,122 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { ...(logUrl ? { url: logUrl } : {}), ...(headers ? { headers } : {}), }); - const processor = new BatchLogRecordProcessor( + const logProcessor = new BatchLogRecordProcessor( logExporter, typeof otel.flushIntervalMs === "number" ? { scheduledDelayMillis: Math.max(1000, otel.flushIntervalMs) } : {}, ); - logProvider = new LoggerProvider({ resource, processors: [processor] }); + logProvider = new LoggerProvider({ + resource, + processors: [logProcessor], + }); const otelLogger = logProvider.getLogger("openclaw"); stopLogTransport = registerLogTransport((logObj) => { - const safeStringify = (value: unknown) => { - try { - return JSON.stringify(value); - } catch { - return String(value); - } - }; - const meta = (logObj as Record)._meta as - | { - logLevelName?: string; - date?: Date; - name?: string; - parentNames?: string[]; - path?: { - filePath?: string; - fileLine?: string; - fileColumn?: string; - filePathWithLine?: string; - method?: string; - }; + try { + const safeStringify = (value: unknown) => { + try { + return JSON.stringify(value); + } catch { + return String(value); } - | undefined; - const logLevelName = meta?.logLevelName ?? "INFO"; - const severityNumber = logSeverityMap[logLevelName] ?? (9 as SeverityNumber); + }; + const meta = (logObj as Record)._meta as + | { + logLevelName?: string; + date?: Date; + name?: string; + parentNames?: string[]; + path?: { + filePath?: string; + fileLine?: string; + fileColumn?: string; + filePathWithLine?: string; + method?: string; + }; + } + | undefined; + const logLevelName = meta?.logLevelName ?? "INFO"; + const severityNumber = logSeverityMap[logLevelName] ?? (9 as SeverityNumber); - const numericArgs = Object.entries(logObj) - .filter(([key]) => /^\d+$/.test(key)) - .toSorted((a, b) => Number(a[0]) - Number(b[0])) - .map(([, value]) => value); + const numericArgs = Object.entries(logObj) + .filter(([key]) => /^\d+$/.test(key)) + .toSorted((a, b) => Number(a[0]) - Number(b[0])) + .map(([, value]) => value); - let bindings: Record | undefined; - if (typeof numericArgs[0] === "string" && numericArgs[0].trim().startsWith("{")) { - try { - const parsed = JSON.parse(numericArgs[0]); - if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { - bindings = parsed as Record; - numericArgs.shift(); - } - } catch { - // ignore malformed json bindings - } - } - - let message = ""; - if (numericArgs.length > 0 && typeof numericArgs[numericArgs.length - 1] === "string") { - message = String(numericArgs.pop()); - } else if (numericArgs.length === 1) { - message = safeStringify(numericArgs[0]); - numericArgs.length = 0; - } - if (!message) { - message = "log"; - } - - const attributes: Record = { - "openclaw.log.level": logLevelName, - }; - if (meta?.name) { - attributes["openclaw.logger"] = meta.name; - } - if (meta?.parentNames?.length) { - attributes["openclaw.logger.parents"] = meta.parentNames.join("."); - } - if (bindings) { - for (const [key, value] of Object.entries(bindings)) { - if ( - typeof value === "string" || - typeof value === "number" || - typeof value === "boolean" - ) { - attributes[`openclaw.${key}`] = value; - } else if (value != null) { - attributes[`openclaw.${key}`] = safeStringify(value); + let bindings: Record | undefined; + if (typeof numericArgs[0] === "string" && numericArgs[0].trim().startsWith("{")) { + try { + const parsed = JSON.parse(numericArgs[0]); + if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { + bindings = parsed as Record; + numericArgs.shift(); + } + } catch { + // ignore malformed json bindings } } - } - if (numericArgs.length > 0) { - attributes["openclaw.log.args"] = safeStringify(numericArgs); - } - if (meta?.path?.filePath) { - attributes["code.filepath"] = meta.path.filePath; - } - if (meta?.path?.fileLine) { - attributes["code.lineno"] = Number(meta.path.fileLine); - } - if (meta?.path?.method) { - attributes["code.function"] = meta.path.method; - } - if (meta?.path?.filePathWithLine) { - attributes["openclaw.code.location"] = meta.path.filePathWithLine; - } - otelLogger.emit({ - body: message, - severityText: logLevelName, - severityNumber, - attributes, - timestamp: meta?.date ?? new Date(), - }); + let message = ""; + if (numericArgs.length > 0 && typeof numericArgs[numericArgs.length - 1] === "string") { + message = String(numericArgs.pop()); + } else if (numericArgs.length === 1) { + message = safeStringify(numericArgs[0]); + numericArgs.length = 0; + } + if (!message) { + message = "log"; + } + + const attributes: Record = { + "openclaw.log.level": logLevelName, + }; + if (meta?.name) { + attributes["openclaw.logger"] = meta.name; + } + if (meta?.parentNames?.length) { + attributes["openclaw.logger.parents"] = meta.parentNames.join("."); + } + if (bindings) { + for (const [key, value] of Object.entries(bindings)) { + if ( + typeof value === "string" || + typeof value === "number" || + typeof value === "boolean" + ) { + attributes[`openclaw.${key}`] = value; + } else if (value != null) { + attributes[`openclaw.${key}`] = safeStringify(value); + } + } + } + if (numericArgs.length > 0) { + attributes["openclaw.log.args"] = safeStringify(numericArgs); + } + if (meta?.path?.filePath) { + attributes["code.filepath"] = meta.path.filePath; + } + if (meta?.path?.fileLine) { + attributes["code.lineno"] = Number(meta.path.fileLine); + } + if (meta?.path?.method) { + attributes["code.function"] = meta.path.method; + } + if (meta?.path?.filePathWithLine) { + attributes["openclaw.code.location"] = meta.path.filePathWithLine; + } + + otelLogger.emit({ + body: message, + severityText: logLevelName, + severityNumber, + attributes, + timestamp: meta?.date ?? new Date(), + }); + } catch (err) { + ctx.logger.error(`diagnostics-otel: log transport failed: ${formatError(err)}`); + } }); } @@ -572,43 +598,49 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { }; unsubscribe = onDiagnosticEvent((evt: DiagnosticEventPayload) => { - switch (evt.type) { - case "model.usage": - recordModelUsage(evt); - return; - case "webhook.received": - recordWebhookReceived(evt); - return; - case "webhook.processed": - recordWebhookProcessed(evt); - return; - case "webhook.error": - recordWebhookError(evt); - return; - case "message.queued": - recordMessageQueued(evt); - return; - case "message.processed": - recordMessageProcessed(evt); - return; - case "queue.lane.enqueue": - recordLaneEnqueue(evt); - return; - case "queue.lane.dequeue": - recordLaneDequeue(evt); - return; - case "session.state": - recordSessionState(evt); - return; - case "session.stuck": - recordSessionStuck(evt); - return; - case "run.attempt": - recordRunAttempt(evt); - return; - case "diagnostic.heartbeat": - recordHeartbeat(evt); - return; + try { + switch (evt.type) { + case "model.usage": + recordModelUsage(evt); + return; + case "webhook.received": + recordWebhookReceived(evt); + return; + case "webhook.processed": + recordWebhookProcessed(evt); + return; + case "webhook.error": + recordWebhookError(evt); + return; + case "message.queued": + recordMessageQueued(evt); + return; + case "message.processed": + recordMessageProcessed(evt); + return; + case "queue.lane.enqueue": + recordLaneEnqueue(evt); + return; + case "queue.lane.dequeue": + recordLaneDequeue(evt); + return; + case "session.state": + recordSessionState(evt); + return; + case "session.stuck": + recordSessionStuck(evt); + return; + case "run.attempt": + recordRunAttempt(evt); + return; + case "diagnostic.heartbeat": + recordHeartbeat(evt); + return; + } + } catch (err) { + ctx.logger.error( + `diagnostics-otel: event handler failed (${evt.type}): ${formatError(err)}`, + ); } }); diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index cf8958dd71..5acf0483a8 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -167,34 +167,76 @@ export type DiagnosticEventInput = DiagnosticEventPayload extends infer Event ? Omit : never : never; -let seq = 0; -const listeners = new Set<(evt: DiagnosticEventPayload) => void>(); + +type DiagnosticEventsGlobalState = { + seq: number; + listeners: Set<(evt: DiagnosticEventPayload) => void>; + dispatchDepth: number; +}; + +function getDiagnosticEventsState(): DiagnosticEventsGlobalState { + const globalStore = globalThis as typeof globalThis & { + __openclawDiagnosticEventsState?: DiagnosticEventsGlobalState; + }; + if (!globalStore.__openclawDiagnosticEventsState) { + globalStore.__openclawDiagnosticEventsState = { + seq: 0, + listeners: new Set<(evt: DiagnosticEventPayload) => void>(), + dispatchDepth: 0, + }; + } + return globalStore.__openclawDiagnosticEventsState; +} export function isDiagnosticsEnabled(config?: OpenClawConfig): boolean { return config?.diagnostics?.enabled === true; } export function emitDiagnosticEvent(event: DiagnosticEventInput) { + const state = getDiagnosticEventsState(); + if (state.dispatchDepth > 100) { + console.error( + `[diagnostic-events] recursion guard tripped at depth=${state.dispatchDepth}, dropping type=${event.type}`, + ); + return; + } + const enriched = { ...event, - seq: (seq += 1), + seq: (state.seq += 1), ts: Date.now(), } satisfies DiagnosticEventPayload; - for (const listener of listeners) { + state.dispatchDepth += 1; + for (const listener of state.listeners) { try { listener(enriched); - } catch { + } catch (err) { + const errorMessage = + err instanceof Error + ? (err.stack ?? err.message) + : typeof err === "string" + ? err + : String(err); + console.error( + `[diagnostic-events] listener error type=${enriched.type} seq=${enriched.seq}: ${errorMessage}`, + ); // Ignore listener failures. } } + state.dispatchDepth -= 1; } export function onDiagnosticEvent(listener: (evt: DiagnosticEventPayload) => void): () => void { - listeners.add(listener); - return () => listeners.delete(listener); + const state = getDiagnosticEventsState(); + state.listeners.add(listener); + return () => { + state.listeners.delete(listener); + }; } export function resetDiagnosticEventsForTest(): void { - seq = 0; - listeners.clear(); + const state = getDiagnosticEventsState(); + state.seq = 0; + state.listeners.clear(); + state.dispatchDepth = 0; }