From ddea5458d020efed3c0050d9e0962d9a50608582 Mon Sep 17 00:00:00 2001 From: Rob Dunn Date: Mon, 16 Feb 2026 08:07:51 -0700 Subject: [PATCH] cron: log model+token usage per run + add usage report script --- scripts/cron_usage_report.ts | 248 +++++++++++++++++++++++++++++++++ src/cron/isolated-agent/run.ts | 52 ++++++- src/cron/service/state.ts | 22 +++ src/cron/service/timer.ts | 42 ++++++ src/gateway/server-cron.ts | 3 + 5 files changed, 361 insertions(+), 6 deletions(-) create mode 100644 scripts/cron_usage_report.ts diff --git a/scripts/cron_usage_report.ts b/scripts/cron_usage_report.ts new file mode 100644 index 0000000000..e02162fc17 --- /dev/null +++ b/scripts/cron_usage_report.ts @@ -0,0 +1,248 @@ +import fs from "node:fs/promises"; +import path from "node:path"; + +type Usage = { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_read_tokens?: number; + cache_write_tokens?: number; +}; + +type CronRunLogEntry = { + ts: number; + jobId: string; + action: "finished"; + status?: "ok" | "error" | "skipped"; + model?: string; + provider?: string; + usage?: Usage; +}; + +function parseArgs(argv: string[]) { + const args: Record = {}; + for (let i = 2; i < argv.length; i++) { + const a = argv[i] ?? ""; + if (!a.startsWith("--")) { + continue; + } + const key = a.slice(2); + const next = argv[i + 1]; + if (next && !next.startsWith("--")) { + args[key] = next; + i++; + } else { + args[key] = true; + } + } + return args; +} + +function usageAndExit(code: number): never { + console.error( + [ + "cron_usage_report.ts", + "", + "Required (choose one):", + " --store (derive runs dir as dirname(store)/runs)", + " --runsDir ", + "", + "Time window:", + " --hours (default 24)", + " --from (overrides --hours)", + " --to (default now)", + "", + "Filters:", + " --jobId ", + " --model ", + "", + "Output:", + " --json (emit JSON)", + ].join("\n"), + ); + process.exit(code); +} + +async function listJsonlFiles(dir: string): Promise { + const entries = await fs.readdir(dir, { withFileTypes: true }).catch(() => []); + return entries + .filter((e) => e.isFile() && e.name.endsWith(".jsonl")) + .map((e) => path.join(dir, e.name)); +} + +function safeParseLine(line: string): CronRunLogEntry | null { + try { + const obj = JSON.parse(line) as Partial | null; + if (!obj || typeof obj !== "object") return null; + if (obj.action !== "finished") return null; + if (typeof obj.ts !== "number" || !Number.isFinite(obj.ts)) return null; + if (typeof obj.jobId !== "string" || !obj.jobId.trim()) return null; + return obj as CronRunLogEntry; + } catch { + return null; + } +} + +function fmtInt(n: number) { + return new Intl.NumberFormat("en-US", { maximumFractionDigits: 0 }).format(n); +} + +export async function main() { + const args = parseArgs(process.argv); + const store = typeof args.store === "string" ? args.store : undefined; + const runsDirArg = typeof args.runsDir === "string" ? args.runsDir : undefined; + const runsDir = runsDirArg ?? (store ? path.join(path.dirname(path.resolve(store)), "runs") : null); + if (!runsDir) { + usageAndExit(2); + } + + const hours = typeof args.hours === "string" ? Number(args.hours) : 24; + const toMs = typeof args.to === "string" ? Date.parse(args.to) : Date.now(); + const fromMs = + typeof args.from === "string" + ? Date.parse(args.from) + : toMs - Math.max(1, Number.isFinite(hours) ? hours : 24) * 60 * 60 * 1000; + + if (!Number.isFinite(fromMs) || !Number.isFinite(toMs)) { + console.error("Invalid --from/--to timestamp"); + process.exit(2); + } + + const filterJobId = typeof args.jobId === "string" ? args.jobId.trim() : ""; + const filterModel = typeof args.model === "string" ? args.model.trim() : ""; + const asJson = args.json === true; + + const files = await listJsonlFiles(runsDir); + const totalsByJob: Record< + string, + { + jobId: string; + runs: number; + models: Record< + string, + { + model: string; + runs: number; + input_tokens: number; + output_tokens: number; + total_tokens: number; + missingUsageRuns: number; + } + >; + input_tokens: number; + output_tokens: number; + total_tokens: number; + missingUsageRuns: number; + } + > = {}; + + for (const file of files) { + const raw = await fs.readFile(file, "utf-8").catch(() => ""); + if (!raw.trim()) continue; + const lines = raw.split("\n"); + for (const line of lines) { + const entry = safeParseLine(line.trim()); + if (!entry) continue; + if (entry.ts < fromMs || entry.ts > toMs) continue; + if (filterJobId && entry.jobId !== filterJobId) continue; + const model = (entry.model ?? "").trim() || ""; + if (filterModel && model !== filterModel) continue; + + const jobId = entry.jobId; + const usage = entry.usage; + const hasUsage = Boolean(usage && (usage.total_tokens ?? usage.input_tokens ?? usage.output_tokens) !== undefined); + + const jobAgg = (totalsByJob[jobId] ??= { + jobId, + runs: 0, + models: {}, + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + missingUsageRuns: 0, + }); + jobAgg.runs++; + + const modelAgg = (jobAgg.models[model] ??= { + model, + runs: 0, + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + missingUsageRuns: 0, + }); + modelAgg.runs++; + + if (!hasUsage) { + jobAgg.missingUsageRuns++; + modelAgg.missingUsageRuns++; + continue; + } + + const input = Math.max(0, Math.trunc(usage?.input_tokens ?? 0)); + const output = Math.max(0, Math.trunc(usage?.output_tokens ?? 0)); + const total = Math.max(0, Math.trunc(usage?.total_tokens ?? input + output)); + + jobAgg.input_tokens += input; + jobAgg.output_tokens += output; + jobAgg.total_tokens += total; + + modelAgg.input_tokens += input; + modelAgg.output_tokens += output; + modelAgg.total_tokens += total; + } + } + + const rows = Object.values(totalsByJob) + .map((r) => ({ + ...r, + models: Object.values(r.models).toSorted((a, b) => b.total_tokens - a.total_tokens), + })) + .toSorted((a, b) => b.total_tokens - a.total_tokens); + + if (asJson) { + process.stdout.write( + JSON.stringify( + { + from: new Date(fromMs).toISOString(), + to: new Date(toMs).toISOString(), + runsDir, + jobs: rows, + }, + null, + 2, + ) + "\n", + ); + return; + } + + console.log(`Cron usage report`); + console.log(` runsDir: ${runsDir}`); + console.log(` window: ${new Date(fromMs).toISOString()} → ${new Date(toMs).toISOString()}`); + if (filterJobId) console.log(` filter jobId: ${filterJobId}`); + if (filterModel) console.log(` filter model: ${filterModel}`); + console.log(""); + + if (rows.length === 0) { + console.log("No matching cron run entries found."); + return; + } + + for (const job of rows) { + console.log(`jobId: ${job.jobId}`); + console.log(` runs: ${fmtInt(job.runs)} (missing usage: ${fmtInt(job.missingUsageRuns)})`); + console.log( + ` tokens: total ${fmtInt(job.total_tokens)} (in ${fmtInt(job.input_tokens)} / out ${fmtInt(job.output_tokens)})`, + ); + for (const m of job.models) { + console.log( + ` model ${m.model}: runs ${fmtInt(m.runs)} (missing usage: ${fmtInt(m.missingUsageRuns)}), total ${fmtInt(m.total_tokens)} (in ${fmtInt(m.input_tokens)} / out ${fmtInt(m.output_tokens)})`, + ); + } + console.log(""); + } +} + +if (import.meta.url === `file://${process.argv[1]}`) { + void main(); +} diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 9c5bd29278..064d829bc6 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -116,6 +116,17 @@ export type RunCronAgentTurnResult = { * messages. See: https://github.com/openclaw/openclaw/issues/15692 */ delivered?: boolean; + + // Telemetry (best-effort) + model?: string; + provider?: string; + usage?: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_read_tokens?: number; + cache_write_tokens?: number; + }; }; export async function runCronIsolatedAgentTurn(params: { @@ -474,6 +485,20 @@ export async function runCronIsolatedAgentTurn(params: { const payloads = runResult.payloads ?? []; // Update token+model fields in the session store. + // Also collect best-effort telemetry for the cron run log. + let telemetry: + | { + model?: string; + provider?: string; + usage?: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_read_tokens?: number; + cache_write_tokens?: number; + }; + } + | undefined; { const usage = runResult.meta.agentMeta?.usage; const promptTokens = runResult.meta.agentMeta?.promptTokens; @@ -504,6 +529,21 @@ export async function runCronIsolatedAgentTurn(params: { cronSession.sessionEntry.outputTokens = output; cronSession.sessionEntry.totalTokens = totalTokens; cronSession.sessionEntry.totalTokensFresh = true; + + telemetry = { + model: modelUsed, + provider: providerUsed, + usage: { + input_tokens: input, + output_tokens: output, + total_tokens: totalTokens, + }, + }; + } else { + telemetry = { + model: modelUsed, + provider: providerUsed, + }; } await persistSessionEntry(); } @@ -552,7 +592,7 @@ export async function runCronIsolatedAgentTurn(params: { }); } logWarn(`[cron:${params.job.id}] ${resolvedDelivery.error.message}`); - return withRunSession({ status: "ok", summary, outputText }); + return withRunSession({ status: "ok", summary, outputText, ...telemetry }); } if (!resolvedDelivery.to) { const message = "cron delivery target is missing"; @@ -565,7 +605,7 @@ export async function runCronIsolatedAgentTurn(params: { }); } logWarn(`[cron:${params.job.id}] ${message}`); - return withRunSession({ status: "ok", summary, outputText }); + return withRunSession({ status: "ok", summary, outputText, ...telemetry }); } const identity = resolveAgentOutboundIdentity(cfgWithAgentDefaults, agentId); @@ -643,7 +683,7 @@ export async function runCronIsolatedAgentTurn(params: { if (activeSubagentRuns > 0) { // Parent orchestration is still in progress; avoid announcing a partial // update to the main requester. - return withRunSession({ status: "ok", summary, outputText }); + return withRunSession({ status: "ok", summary, outputText, ...telemetry }); } if ( (hadActiveDescendants || expectedSubagentFollowup) && @@ -653,10 +693,10 @@ export async function runCronIsolatedAgentTurn(params: { ) { // Descendants existed but no post-orchestration synthesis arrived, so // suppress stale parent text like "on it, pulling everything together". - return withRunSession({ status: "ok", summary, outputText }); + return withRunSession({ status: "ok", summary, outputText, ...telemetry }); } if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) { - return withRunSession({ status: "ok", summary, outputText }); + return withRunSession({ status: "ok", summary, outputText, ...telemetry }); } try { const didAnnounce = await runSubagentAnnounceFlow({ @@ -703,5 +743,5 @@ export async function runCronIsolatedAgentTurn(params: { } } - return withRunSession({ status: "ok", summary, outputText, delivered }); + return withRunSession({ status: "ok", summary, outputText, delivered, ...telemetry }); } diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 0ba0f86ac7..0d30bc99a0 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -13,6 +13,17 @@ export type CronEvent = { sessionId?: string; sessionKey?: string; nextRunAtMs?: number; + + // Telemetry (best-effort) + model?: string; + provider?: string; + usage?: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_read_tokens?: number; + cache_write_tokens?: number; + }; }; export type Logger = { @@ -60,6 +71,17 @@ export type CronServiceDeps = { * https://github.com/openclaw/openclaw/issues/15692 */ delivered?: boolean; + + // Telemetry (best-effort) + model?: string; + provider?: string; + usage?: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_read_tokens?: number; + cache_write_tokens?: number; + }; }>; onEvent?: (evt: CronEvent) => void; }; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 1d26401afb..cf38396201 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -216,6 +216,15 @@ export async function onTimer(state: CronServiceState) { summary?: string; sessionId?: string; sessionKey?: string; + model?: string; + provider?: string; + usage?: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_read_tokens?: number; + cache_write_tokens?: number; + }; startedAt: number; endedAt: number; }> = []; @@ -426,6 +435,15 @@ async function executeJobCore( summary?: string; sessionId?: string; sessionKey?: string; + model?: string; + provider?: string; + usage?: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_read_tokens?: number; + cache_write_tokens?: number; + }; }> { if (job.sessionTarget === "main") { const text = resolveJobPayloadTextForMain(job); @@ -515,6 +533,9 @@ async function executeJobCore( summary: res.summary, sessionId: res.sessionId, sessionKey: res.sessionKey, + model: res.model, + provider: res.provider, + usage: res.usage, }; } @@ -542,6 +563,15 @@ export async function executeJob( summary?: string; sessionId?: string; sessionKey?: string; + model?: string; + provider?: string; + usage?: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_read_tokens?: number; + cache_write_tokens?: number; + }; }; try { coreResult = await executeJobCore(state, job); @@ -574,6 +604,15 @@ function emitJobFinished( summary?: string; sessionId?: string; sessionKey?: string; + model?: string; + provider?: string; + usage?: { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_read_tokens?: number; + cache_write_tokens?: number; + }; }, runAtMs: number, ) { @@ -588,6 +627,9 @@ function emitJobFinished( runAtMs, durationMs: job.state.lastDurationMs, nextRunAtMs: job.state.nextRunAtMs, + model: result.model, + provider: result.provider, + usage: result.usage, }); } diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index a5d1c711b3..d660148215 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -213,6 +213,9 @@ export function buildGatewayCronService(params: { runAtMs: evt.runAtMs, durationMs: evt.durationMs, nextRunAtMs: evt.nextRunAtMs, + model: evt.model, + provider: evt.provider, + usage: evt.usage, }).catch((err) => { cronLogger.warn({ err: String(err), logPath }, "cron: run log append failed"); });