mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
cron: log model+token usage per run + add usage report script
This commit is contained in:
committed by
Peter Steinberger
parent
edbc68e9f1
commit
ddea5458d0
248
scripts/cron_usage_report.ts
Normal file
248
scripts/cron_usage_report.ts
Normal file
@@ -0,0 +1,248 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
|
||||
type Usage = {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
|
||||
type CronRunLogEntry = {
|
||||
ts: number;
|
||||
jobId: string;
|
||||
action: "finished";
|
||||
status?: "ok" | "error" | "skipped";
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: Usage;
|
||||
};
|
||||
|
||||
function parseArgs(argv: string[]) {
|
||||
const args: Record<string, string | boolean> = {};
|
||||
for (let i = 2; i < argv.length; i++) {
|
||||
const a = argv[i] ?? "";
|
||||
if (!a.startsWith("--")) {
|
||||
continue;
|
||||
}
|
||||
const key = a.slice(2);
|
||||
const next = argv[i + 1];
|
||||
if (next && !next.startsWith("--")) {
|
||||
args[key] = next;
|
||||
i++;
|
||||
} else {
|
||||
args[key] = true;
|
||||
}
|
||||
}
|
||||
return args;
|
||||
}
|
||||
|
||||
function usageAndExit(code: number): never {
|
||||
console.error(
|
||||
[
|
||||
"cron_usage_report.ts",
|
||||
"",
|
||||
"Required (choose one):",
|
||||
" --store <path-to-cron-store-json> (derive runs dir as dirname(store)/runs)",
|
||||
" --runsDir <path-to-runs-dir>",
|
||||
"",
|
||||
"Time window:",
|
||||
" --hours <n> (default 24)",
|
||||
" --from <iso> (overrides --hours)",
|
||||
" --to <iso> (default now)",
|
||||
"",
|
||||
"Filters:",
|
||||
" --jobId <id>",
|
||||
" --model <name>",
|
||||
"",
|
||||
"Output:",
|
||||
" --json (emit JSON)",
|
||||
].join("\n"),
|
||||
);
|
||||
process.exit(code);
|
||||
}
|
||||
|
||||
async function listJsonlFiles(dir: string): Promise<string[]> {
|
||||
const entries = await fs.readdir(dir, { withFileTypes: true }).catch(() => []);
|
||||
return entries
|
||||
.filter((e) => e.isFile() && e.name.endsWith(".jsonl"))
|
||||
.map((e) => path.join(dir, e.name));
|
||||
}
|
||||
|
||||
function safeParseLine(line: string): CronRunLogEntry | null {
|
||||
try {
|
||||
const obj = JSON.parse(line) as Partial<CronRunLogEntry> | null;
|
||||
if (!obj || typeof obj !== "object") return null;
|
||||
if (obj.action !== "finished") return null;
|
||||
if (typeof obj.ts !== "number" || !Number.isFinite(obj.ts)) return null;
|
||||
if (typeof obj.jobId !== "string" || !obj.jobId.trim()) return null;
|
||||
return obj as CronRunLogEntry;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function fmtInt(n: number) {
|
||||
return new Intl.NumberFormat("en-US", { maximumFractionDigits: 0 }).format(n);
|
||||
}
|
||||
|
||||
export async function main() {
|
||||
const args = parseArgs(process.argv);
|
||||
const store = typeof args.store === "string" ? args.store : undefined;
|
||||
const runsDirArg = typeof args.runsDir === "string" ? args.runsDir : undefined;
|
||||
const runsDir = runsDirArg ?? (store ? path.join(path.dirname(path.resolve(store)), "runs") : null);
|
||||
if (!runsDir) {
|
||||
usageAndExit(2);
|
||||
}
|
||||
|
||||
const hours = typeof args.hours === "string" ? Number(args.hours) : 24;
|
||||
const toMs = typeof args.to === "string" ? Date.parse(args.to) : Date.now();
|
||||
const fromMs =
|
||||
typeof args.from === "string"
|
||||
? Date.parse(args.from)
|
||||
: toMs - Math.max(1, Number.isFinite(hours) ? hours : 24) * 60 * 60 * 1000;
|
||||
|
||||
if (!Number.isFinite(fromMs) || !Number.isFinite(toMs)) {
|
||||
console.error("Invalid --from/--to timestamp");
|
||||
process.exit(2);
|
||||
}
|
||||
|
||||
const filterJobId = typeof args.jobId === "string" ? args.jobId.trim() : "";
|
||||
const filterModel = typeof args.model === "string" ? args.model.trim() : "";
|
||||
const asJson = args.json === true;
|
||||
|
||||
const files = await listJsonlFiles(runsDir);
|
||||
const totalsByJob: Record<
|
||||
string,
|
||||
{
|
||||
jobId: string;
|
||||
runs: number;
|
||||
models: Record<
|
||||
string,
|
||||
{
|
||||
model: string;
|
||||
runs: number;
|
||||
input_tokens: number;
|
||||
output_tokens: number;
|
||||
total_tokens: number;
|
||||
missingUsageRuns: number;
|
||||
}
|
||||
>;
|
||||
input_tokens: number;
|
||||
output_tokens: number;
|
||||
total_tokens: number;
|
||||
missingUsageRuns: number;
|
||||
}
|
||||
> = {};
|
||||
|
||||
for (const file of files) {
|
||||
const raw = await fs.readFile(file, "utf-8").catch(() => "");
|
||||
if (!raw.trim()) continue;
|
||||
const lines = raw.split("\n");
|
||||
for (const line of lines) {
|
||||
const entry = safeParseLine(line.trim());
|
||||
if (!entry) continue;
|
||||
if (entry.ts < fromMs || entry.ts > toMs) continue;
|
||||
if (filterJobId && entry.jobId !== filterJobId) continue;
|
||||
const model = (entry.model ?? "<unknown>").trim() || "<unknown>";
|
||||
if (filterModel && model !== filterModel) continue;
|
||||
|
||||
const jobId = entry.jobId;
|
||||
const usage = entry.usage;
|
||||
const hasUsage = Boolean(usage && (usage.total_tokens ?? usage.input_tokens ?? usage.output_tokens) !== undefined);
|
||||
|
||||
const jobAgg = (totalsByJob[jobId] ??= {
|
||||
jobId,
|
||||
runs: 0,
|
||||
models: {},
|
||||
input_tokens: 0,
|
||||
output_tokens: 0,
|
||||
total_tokens: 0,
|
||||
missingUsageRuns: 0,
|
||||
});
|
||||
jobAgg.runs++;
|
||||
|
||||
const modelAgg = (jobAgg.models[model] ??= {
|
||||
model,
|
||||
runs: 0,
|
||||
input_tokens: 0,
|
||||
output_tokens: 0,
|
||||
total_tokens: 0,
|
||||
missingUsageRuns: 0,
|
||||
});
|
||||
modelAgg.runs++;
|
||||
|
||||
if (!hasUsage) {
|
||||
jobAgg.missingUsageRuns++;
|
||||
modelAgg.missingUsageRuns++;
|
||||
continue;
|
||||
}
|
||||
|
||||
const input = Math.max(0, Math.trunc(usage?.input_tokens ?? 0));
|
||||
const output = Math.max(0, Math.trunc(usage?.output_tokens ?? 0));
|
||||
const total = Math.max(0, Math.trunc(usage?.total_tokens ?? input + output));
|
||||
|
||||
jobAgg.input_tokens += input;
|
||||
jobAgg.output_tokens += output;
|
||||
jobAgg.total_tokens += total;
|
||||
|
||||
modelAgg.input_tokens += input;
|
||||
modelAgg.output_tokens += output;
|
||||
modelAgg.total_tokens += total;
|
||||
}
|
||||
}
|
||||
|
||||
const rows = Object.values(totalsByJob)
|
||||
.map((r) => ({
|
||||
...r,
|
||||
models: Object.values(r.models).toSorted((a, b) => b.total_tokens - a.total_tokens),
|
||||
}))
|
||||
.toSorted((a, b) => b.total_tokens - a.total_tokens);
|
||||
|
||||
if (asJson) {
|
||||
process.stdout.write(
|
||||
JSON.stringify(
|
||||
{
|
||||
from: new Date(fromMs).toISOString(),
|
||||
to: new Date(toMs).toISOString(),
|
||||
runsDir,
|
||||
jobs: rows,
|
||||
},
|
||||
null,
|
||||
2,
|
||||
) + "\n",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`Cron usage report`);
|
||||
console.log(` runsDir: ${runsDir}`);
|
||||
console.log(` window: ${new Date(fromMs).toISOString()} → ${new Date(toMs).toISOString()}`);
|
||||
if (filterJobId) console.log(` filter jobId: ${filterJobId}`);
|
||||
if (filterModel) console.log(` filter model: ${filterModel}`);
|
||||
console.log("");
|
||||
|
||||
if (rows.length === 0) {
|
||||
console.log("No matching cron run entries found.");
|
||||
return;
|
||||
}
|
||||
|
||||
for (const job of rows) {
|
||||
console.log(`jobId: ${job.jobId}`);
|
||||
console.log(` runs: ${fmtInt(job.runs)} (missing usage: ${fmtInt(job.missingUsageRuns)})`);
|
||||
console.log(
|
||||
` tokens: total ${fmtInt(job.total_tokens)} (in ${fmtInt(job.input_tokens)} / out ${fmtInt(job.output_tokens)})`,
|
||||
);
|
||||
for (const m of job.models) {
|
||||
console.log(
|
||||
` model ${m.model}: runs ${fmtInt(m.runs)} (missing usage: ${fmtInt(m.missingUsageRuns)}), total ${fmtInt(m.total_tokens)} (in ${fmtInt(m.input_tokens)} / out ${fmtInt(m.output_tokens)})`,
|
||||
);
|
||||
}
|
||||
console.log("");
|
||||
}
|
||||
}
|
||||
|
||||
if (import.meta.url === `file://${process.argv[1]}`) {
|
||||
void main();
|
||||
}
|
||||
@@ -116,6 +116,17 @@ export type RunCronAgentTurnResult = {
|
||||
* messages. See: https://github.com/openclaw/openclaw/issues/15692
|
||||
*/
|
||||
delivered?: boolean;
|
||||
|
||||
// Telemetry (best-effort)
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
};
|
||||
|
||||
export async function runCronIsolatedAgentTurn(params: {
|
||||
@@ -474,6 +485,20 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
const payloads = runResult.payloads ?? [];
|
||||
|
||||
// Update token+model fields in the session store.
|
||||
// Also collect best-effort telemetry for the cron run log.
|
||||
let telemetry:
|
||||
| {
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
}
|
||||
| undefined;
|
||||
{
|
||||
const usage = runResult.meta.agentMeta?.usage;
|
||||
const promptTokens = runResult.meta.agentMeta?.promptTokens;
|
||||
@@ -504,6 +529,21 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
cronSession.sessionEntry.outputTokens = output;
|
||||
cronSession.sessionEntry.totalTokens = totalTokens;
|
||||
cronSession.sessionEntry.totalTokensFresh = true;
|
||||
|
||||
telemetry = {
|
||||
model: modelUsed,
|
||||
provider: providerUsed,
|
||||
usage: {
|
||||
input_tokens: input,
|
||||
output_tokens: output,
|
||||
total_tokens: totalTokens,
|
||||
},
|
||||
};
|
||||
} else {
|
||||
telemetry = {
|
||||
model: modelUsed,
|
||||
provider: providerUsed,
|
||||
};
|
||||
}
|
||||
await persistSessionEntry();
|
||||
}
|
||||
@@ -552,7 +592,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
});
|
||||
}
|
||||
logWarn(`[cron:${params.job.id}] ${resolvedDelivery.error.message}`);
|
||||
return withRunSession({ status: "ok", summary, outputText });
|
||||
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
|
||||
}
|
||||
if (!resolvedDelivery.to) {
|
||||
const message = "cron delivery target is missing";
|
||||
@@ -565,7 +605,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
});
|
||||
}
|
||||
logWarn(`[cron:${params.job.id}] ${message}`);
|
||||
return withRunSession({ status: "ok", summary, outputText });
|
||||
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
|
||||
}
|
||||
const identity = resolveAgentOutboundIdentity(cfgWithAgentDefaults, agentId);
|
||||
|
||||
@@ -643,7 +683,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
if (activeSubagentRuns > 0) {
|
||||
// Parent orchestration is still in progress; avoid announcing a partial
|
||||
// update to the main requester.
|
||||
return withRunSession({ status: "ok", summary, outputText });
|
||||
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
|
||||
}
|
||||
if (
|
||||
(hadActiveDescendants || expectedSubagentFollowup) &&
|
||||
@@ -653,10 +693,10 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
) {
|
||||
// Descendants existed but no post-orchestration synthesis arrived, so
|
||||
// suppress stale parent text like "on it, pulling everything together".
|
||||
return withRunSession({ status: "ok", summary, outputText });
|
||||
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
|
||||
}
|
||||
if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) {
|
||||
return withRunSession({ status: "ok", summary, outputText });
|
||||
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
|
||||
}
|
||||
try {
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
@@ -703,5 +743,5 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
}
|
||||
}
|
||||
|
||||
return withRunSession({ status: "ok", summary, outputText, delivered });
|
||||
return withRunSession({ status: "ok", summary, outputText, delivered, ...telemetry });
|
||||
}
|
||||
|
||||
@@ -13,6 +13,17 @@ export type CronEvent = {
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
nextRunAtMs?: number;
|
||||
|
||||
// Telemetry (best-effort)
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
};
|
||||
|
||||
export type Logger = {
|
||||
@@ -60,6 +71,17 @@ export type CronServiceDeps = {
|
||||
* https://github.com/openclaw/openclaw/issues/15692
|
||||
*/
|
||||
delivered?: boolean;
|
||||
|
||||
// Telemetry (best-effort)
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
}>;
|
||||
onEvent?: (evt: CronEvent) => void;
|
||||
};
|
||||
|
||||
@@ -216,6 +216,15 @@ export async function onTimer(state: CronServiceState) {
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
startedAt: number;
|
||||
endedAt: number;
|
||||
}> = [];
|
||||
@@ -426,6 +435,15 @@ async function executeJobCore(
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
}> {
|
||||
if (job.sessionTarget === "main") {
|
||||
const text = resolveJobPayloadTextForMain(job);
|
||||
@@ -515,6 +533,9 @@ async function executeJobCore(
|
||||
summary: res.summary,
|
||||
sessionId: res.sessionId,
|
||||
sessionKey: res.sessionKey,
|
||||
model: res.model,
|
||||
provider: res.provider,
|
||||
usage: res.usage,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -542,6 +563,15 @@ export async function executeJob(
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
};
|
||||
try {
|
||||
coreResult = await executeJobCore(state, job);
|
||||
@@ -574,6 +604,15 @@ function emitJobFinished(
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
},
|
||||
runAtMs: number,
|
||||
) {
|
||||
@@ -588,6 +627,9 @@ function emitJobFinished(
|
||||
runAtMs,
|
||||
durationMs: job.state.lastDurationMs,
|
||||
nextRunAtMs: job.state.nextRunAtMs,
|
||||
model: result.model,
|
||||
provider: result.provider,
|
||||
usage: result.usage,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -213,6 +213,9 @@ export function buildGatewayCronService(params: {
|
||||
runAtMs: evt.runAtMs,
|
||||
durationMs: evt.durationMs,
|
||||
nextRunAtMs: evt.nextRunAtMs,
|
||||
model: evt.model,
|
||||
provider: evt.provider,
|
||||
usage: evt.usage,
|
||||
}).catch((err) => {
|
||||
cronLogger.warn({ err: String(err), logPath }, "cron: run log append failed");
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user