From a10f228a5b89ead45cff67e278091c03c2151069 Mon Sep 17 00:00:00 2001 From: Kyle Tse Date: Thu, 12 Feb 2026 23:02:30 +0000 Subject: [PATCH] fix: update totalTokens after compaction using last-call usage (#15018) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 9214291bf7e9e62ba8661aa46b4739113794056a Co-authored-by: shtse8 <8020099+shtse8@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras --- src/agents/pi-embedded-runner/run.ts | 7 + src/agents/pi-embedded-runner/types.ts | 14 + ...to-compaction-updates-total-tokens.test.ts | 240 ++++++++++++++++++ src/auto-reply/reply/agent-runner.ts | 10 +- src/auto-reply/reply/followup-runner.test.ts | 71 +++++- src/auto-reply/reply/followup-runner.ts | 26 +- .../reply/session-run-accounting.ts | 46 ++++ ...n-updates.incrementcompactioncount.test.ts | 98 +++++++ src/auto-reply/reply/session-usage.test.ts | 95 +++++++ src/auto-reply/reply/session-usage.ts | 14 +- 10 files changed, 602 insertions(+), 19 deletions(-) create mode 100644 src/auto-reply/reply/agent-runner.auto-compaction-updates-total-tokens.test.ts create mode 100644 src/auto-reply/reply/session-run-accounting.ts create mode 100644 src/auto-reply/reply/session-updates.incrementcompactioncount.test.ts create mode 100644 src/auto-reply/reply/session-usage.test.ts diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 18e6234960..d56d188b5b 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -820,11 +820,18 @@ export async function runEmbeddedPiAgent( } const usage = toNormalizedUsage(usageAccumulator); + // Extract the last individual API call's usage for context-window + // utilization display. The accumulated `usage` sums input tokens + // across all calls (tool-use loops, compaction retries), which + // overstates the actual context size. `lastCallUsage` reflects only + // the final call, giving an accurate snapshot of current context. + const lastCallUsage = normalizeUsage(lastAssistant?.usage as UsageLike); const agentMeta: EmbeddedPiAgentMeta = { sessionId: sessionIdUsed, provider: lastAssistant?.provider ?? provider, model: lastAssistant?.model ?? model.id, usage, + lastCallUsage: lastCallUsage ?? undefined, compactionCount: autoCompactionCount > 0 ? autoCompactionCount : undefined, }; diff --git a/src/agents/pi-embedded-runner/types.ts b/src/agents/pi-embedded-runner/types.ts index 9217b48319..2f845de6b0 100644 --- a/src/agents/pi-embedded-runner/types.ts +++ b/src/agents/pi-embedded-runner/types.ts @@ -13,6 +13,20 @@ export type EmbeddedPiAgentMeta = { cacheWrite?: number; total?: number; }; + /** + * Usage from the last individual API call (not accumulated across tool-use + * loops or compaction retries). Used for context-window utilization display + * (`totalTokens` in sessions.json) because the accumulated `usage.input` + * sums input tokens from every API call in the run, which overstates the + * actual context size. + */ + lastCallUsage?: { + input?: number; + output?: number; + cacheRead?: number; + cacheWrite?: number; + total?: number; + }; }; export type EmbeddedPiRunMeta = { diff --git a/src/auto-reply/reply/agent-runner.auto-compaction-updates-total-tokens.test.ts b/src/auto-reply/reply/agent-runner.auto-compaction-updates-total-tokens.test.ts new file mode 100644 index 0000000000..c0596f4d02 --- /dev/null +++ b/src/auto-reply/reply/agent-runner.auto-compaction-updates-total-tokens.test.ts @@ -0,0 +1,240 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it, vi } from "vitest"; +import type { TemplateContext } from "../templating.js"; +import type { FollowupRun, QueueSettings } from "./queue.js"; +import { createMockTypingController } from "./test-helpers.js"; + +const runEmbeddedPiAgentMock = vi.fn(); + +type EmbeddedRunParams = { + prompt?: string; + extraSystemPrompt?: string; + onAgentEvent?: (evt: { stream?: string; data?: { phase?: string; willRetry?: boolean } }) => void; +}; + +vi.mock("../../agents/model-fallback.js", () => ({ + runWithModelFallback: async ({ + provider, + model, + run, + }: { + provider: string; + model: string; + run: (provider: string, model: string) => Promise; + }) => ({ + result: await run(provider, model), + provider, + model, + }), +})); + +vi.mock("../../agents/cli-runner.js", () => ({ + runCliAgent: vi.fn(), +})); + +vi.mock("../../agents/pi-embedded.js", () => ({ + queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), +})); + +vi.mock("./queue.js", async () => { + const actual = await vi.importActual("./queue.js"); + return { + ...actual, + enqueueFollowupRun: vi.fn(), + scheduleFollowupDrain: vi.fn(), + }; +}); + +import { runReplyAgent } from "./agent-runner.js"; + +async function seedSessionStore(params: { + storePath: string; + sessionKey: string; + entry: Record; +}) { + await fs.mkdir(path.dirname(params.storePath), { recursive: true }); + await fs.writeFile( + params.storePath, + JSON.stringify({ [params.sessionKey]: params.entry }, null, 2), + "utf-8", + ); +} + +function createBaseRun(params: { + storePath: string; + sessionEntry: Record; + config?: Record; +}) { + const typing = createMockTypingController(); + const sessionCtx = { + Provider: "whatsapp", + OriginatingTo: "+15550001111", + AccountId: "primary", + MessageSid: "msg", + } as unknown as TemplateContext; + const resolvedQueue = { mode: "interrupt" } as unknown as QueueSettings; + const followupRun = { + prompt: "hello", + summaryLine: "hello", + enqueuedAt: Date.now(), + run: { + agentId: "main", + agentDir: "/tmp/agent", + sessionId: "session", + sessionKey: "main", + messageProvider: "whatsapp", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + config: params.config ?? {}, + skillsSnapshot: {}, + provider: "anthropic", + model: "claude", + thinkLevel: "low", + verboseLevel: "off", + elevatedLevel: "off", + bashElevated: { enabled: false, allowed: false, defaultLevel: "off" }, + timeoutMs: 1_000, + blockReplyBreak: "message_end", + }, + } as unknown as FollowupRun; + return { typing, sessionCtx, resolvedQueue, followupRun }; +} + +describe("runReplyAgent auto-compaction token update", () => { + it("updates totalTokens after auto-compaction using lastCallUsage", async () => { + runEmbeddedPiAgentMock.mockReset(); + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-tokens-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 181_000, + compactionCount: 0, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { + // Simulate auto-compaction during agent run + params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } }); + params.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry: false } }); + return { + payloads: [{ text: "done" }], + meta: { + agentMeta: { + // Accumulated usage across pre+post compaction calls — inflated + usage: { input: 190_000, output: 8_000, total: 198_000 }, + // Last individual API call's usage — actual post-compaction context + lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, + compactionCount: 1, + }, + }, + }; + }); + + // Disable memory flush so we isolate the auto-compaction path + const config = { + agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } }, + }; + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 200_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + // totalTokens should reflect actual post-compaction context (~10k), not + // the stale pre-compaction value (181k) or the inflated accumulated (190k) + expect(stored[sessionKey].totalTokens).toBe(10_000); + // compactionCount should be incremented + expect(stored[sessionKey].compactionCount).toBe(1); + }); + + it("updates totalTokens from lastCallUsage even without compaction", async () => { + runEmbeddedPiAgentMock.mockReset(); + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-usage-last-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 50_000, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runEmbeddedPiAgentMock.mockImplementation(async (_params: EmbeddedRunParams) => ({ + payloads: [{ text: "ok" }], + meta: { + agentMeta: { + // Tool-use loop: accumulated input is higher than last call's input + usage: { input: 75_000, output: 5_000, total: 80_000 }, + lastCallUsage: { input: 55_000, output: 2_000, total: 57_000 }, + }, + }, + })); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 200_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + // totalTokens should use lastCallUsage (55k), not accumulated (75k) + expect(stored[sessionKey].totalTokens).toBe(55_000); + }); +}); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 3ca6e39774..9f0db99753 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -38,8 +38,7 @@ import { resolveBlockStreamingCoalescing } from "./block-streaming.js"; import { createFollowupRunner } from "./followup-runner.js"; import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js"; import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js"; -import { incrementCompactionCount } from "./session-updates.js"; -import { persistSessionUsageUpdate } from "./session-usage.js"; +import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js"; import { createTypingSignaler } from "./typing-mode.js"; const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000; @@ -384,10 +383,11 @@ export async function runReplyAgent(params: { activeSessionEntry?.contextTokens ?? DEFAULT_CONTEXT_TOKENS; - await persistSessionUsageUpdate({ + await persistRunSessionUsage({ storePath, sessionKey, usage, + lastCallUsage: runResult.meta.agentMeta?.lastCallUsage, modelUsed, providerUsed, contextTokensUsed, @@ -495,11 +495,13 @@ export async function runReplyAgent(params: { let finalPayloads = replyPayloads; const verboseEnabled = resolvedVerboseLevel !== "off"; if (autoCompactionCompleted) { - const count = await incrementCompactionCount({ + const count = await incrementRunCompactionCount({ sessionEntry: activeSessionEntry, sessionStore: activeSessionStore, sessionKey, storePath, + lastCallUsage: runResult.meta.agentMeta?.lastCallUsage, + contextTokensUsed, }); if (verboseEnabled) { const suffix = typeof count === "number" ? ` (count ${count})` : ""; diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 3ae3e318cf..96d1b6016b 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -131,6 +131,68 @@ describe("createFollowupRunner compaction", () => { expect(onBlockReply.mock.calls[0][0].text).toContain("Auto-compaction complete"); expect(sessionStore.main.compactionCount).toBe(1); }); + + it("updates totalTokens after auto-compaction using lastCallUsage", async () => { + const storePath = path.join( + await fs.mkdtemp(path.join(tmpdir(), "openclaw-followup-compaction-")), + "sessions.json", + ); + const sessionKey = "main"; + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 180_000, + compactionCount: 0, + }; + const sessionStore: Record = { [sessionKey]: sessionEntry }; + await saveSessionStore(storePath, sessionStore); + const onBlockReply = vi.fn(async () => {}); + + runEmbeddedPiAgentMock.mockImplementationOnce( + async (params: { + onAgentEvent?: (evt: { stream: string; data: Record }) => void; + }) => { + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false }, + }); + return { + payloads: [{ text: "done" }], + meta: { + agentMeta: { + // Accumulated usage across pre+post compaction calls. + usage: { input: 190_000, output: 8_000, total: 198_000 }, + // Last call usage reflects post-compaction context. + lastCallUsage: { input: 11_000, output: 2_000, total: 13_000 }, + model: "claude-opus-4-5", + provider: "anthropic", + }, + }, + }; + }, + ); + + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + sessionEntry, + sessionStore, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 200_000, + }); + + await runner(baseQueuedRun()); + + const store = loadSessionStore(storePath, { skipCache: true }); + expect(store[sessionKey]?.compactionCount).toBe(1); + expect(store[sessionKey]?.totalTokens).toBe(11_000); + // We only keep the total estimate after compaction. + expect(store[sessionKey]?.inputTokens).toBeUndefined(); + expect(store[sessionKey]?.outputTokens).toBeUndefined(); + }); }); describe("createFollowupRunner messaging tool dedupe", () => { @@ -212,7 +274,8 @@ describe("createFollowupRunner messaging tool dedupe", () => { messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], meta: { agentMeta: { - usage: { input: 10, output: 5 }, + usage: { input: 1_000, output: 50 }, + lastCallUsage: { input: 400, output: 20 }, model: "claude-opus-4-5", provider: "anthropic", }, @@ -234,7 +297,11 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).not.toHaveBeenCalled(); const store = loadSessionStore(storePath, { skipCache: true }); - expect(store[sessionKey]?.totalTokens ?? 0).toBeGreaterThan(0); + // totalTokens should reflect the last call usage snapshot, not the accumulated input. + expect(store[sessionKey]?.totalTokens).toBe(400); expect(store[sessionKey]?.model).toBe("claude-opus-4-5"); + // Accumulated usage is still stored for usage/cost tracking. + expect(store[sessionKey]?.inputTokens).toBe(1_000); + expect(store[sessionKey]?.outputTokens).toBe(50); }); }); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index e4c23aa043..eb8ce09fa8 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -22,8 +22,7 @@ import { } from "./reply-payloads.js"; import { resolveReplyToMode } from "./reply-threading.js"; import { isRoutableChannel, routeReply } from "./route-reply.js"; -import { incrementCompactionCount } from "./session-updates.js"; -import { persistSessionUsageUpdate } from "./session-usage.js"; +import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js"; import { createTypingSignaler } from "./typing-mode.js"; export function createFollowupRunner(params: { @@ -194,19 +193,20 @@ export function createFollowupRunner(params: { return; } - if (storePath && sessionKey) { - const usage = runResult.meta.agentMeta?.usage; - const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; - const contextTokensUsed = - agentCfgContextTokens ?? - lookupContextTokens(modelUsed) ?? - sessionEntry?.contextTokens ?? - DEFAULT_CONTEXT_TOKENS; + const usage = runResult.meta.agentMeta?.usage; + const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; + const contextTokensUsed = + agentCfgContextTokens ?? + lookupContextTokens(modelUsed) ?? + sessionEntry?.contextTokens ?? + DEFAULT_CONTEXT_TOKENS; - await persistSessionUsageUpdate({ + if (storePath && sessionKey) { + await persistRunSessionUsage({ storePath, sessionKey, usage, + lastCallUsage: runResult.meta.agentMeta?.lastCallUsage, modelUsed, providerUsed: fallbackProvider, contextTokensUsed, @@ -263,11 +263,13 @@ export function createFollowupRunner(params: { } if (autoCompactionCompleted) { - const count = await incrementCompactionCount({ + const count = await incrementRunCompactionCount({ sessionEntry, sessionStore, sessionKey, storePath, + lastCallUsage: runResult.meta.agentMeta?.lastCallUsage, + contextTokensUsed, }); if (queued.run.verboseLevel && queued.run.verboseLevel !== "off") { const suffix = typeof count === "number" ? ` (count ${count})` : ""; diff --git a/src/auto-reply/reply/session-run-accounting.ts b/src/auto-reply/reply/session-run-accounting.ts new file mode 100644 index 0000000000..4316a6573e --- /dev/null +++ b/src/auto-reply/reply/session-run-accounting.ts @@ -0,0 +1,46 @@ +import { deriveSessionTotalTokens, type NormalizedUsage } from "../../agents/usage.js"; +import { incrementCompactionCount } from "./session-updates.js"; +import { persistSessionUsageUpdate } from "./session-usage.js"; + +type PersistRunSessionUsageParams = Parameters[0]; + +type IncrementRunCompactionCountParams = Omit< + Parameters[0], + "tokensAfter" +> & { + lastCallUsage?: NormalizedUsage; + contextTokensUsed?: number; +}; + +export async function persistRunSessionUsage(params: PersistRunSessionUsageParams): Promise { + await persistSessionUsageUpdate({ + storePath: params.storePath, + sessionKey: params.sessionKey, + usage: params.usage, + lastCallUsage: params.lastCallUsage, + modelUsed: params.modelUsed, + providerUsed: params.providerUsed, + contextTokensUsed: params.contextTokensUsed, + systemPromptReport: params.systemPromptReport, + cliSessionId: params.cliSessionId, + logLabel: params.logLabel, + }); +} + +export async function incrementRunCompactionCount( + params: IncrementRunCompactionCountParams, +): Promise { + const tokensAfterCompaction = params.lastCallUsage + ? deriveSessionTotalTokens({ + usage: params.lastCallUsage, + contextTokens: params.contextTokensUsed, + }) + : undefined; + return incrementCompactionCount({ + sessionEntry: params.sessionEntry, + sessionStore: params.sessionStore, + sessionKey: params.sessionKey, + storePath: params.storePath, + tokensAfter: tokensAfterCompaction, + }); +} diff --git a/src/auto-reply/reply/session-updates.incrementcompactioncount.test.ts b/src/auto-reply/reply/session-updates.incrementcompactioncount.test.ts new file mode 100644 index 0000000000..5a90b4ed5f --- /dev/null +++ b/src/auto-reply/reply/session-updates.incrementcompactioncount.test.ts @@ -0,0 +1,98 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import type { SessionEntry } from "../../config/sessions.js"; +import { incrementCompactionCount } from "./session-updates.js"; + +async function seedSessionStore(params: { + storePath: string; + sessionKey: string; + entry: Record; +}) { + await fs.mkdir(path.dirname(params.storePath), { recursive: true }); + await fs.writeFile( + params.storePath, + JSON.stringify({ [params.sessionKey]: params.entry }, null, 2), + "utf-8", + ); +} + +describe("incrementCompactionCount", () => { + it("increments compaction count", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const entry = { sessionId: "s1", updatedAt: Date.now(), compactionCount: 2 } as SessionEntry; + const sessionStore: Record = { [sessionKey]: entry }; + await seedSessionStore({ storePath, sessionKey, entry }); + + const count = await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + }); + expect(count).toBe(3); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].compactionCount).toBe(3); + }); + + it("updates totalTokens when tokensAfter is provided", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const entry = { + sessionId: "s1", + updatedAt: Date.now(), + compactionCount: 0, + totalTokens: 180_000, + inputTokens: 170_000, + outputTokens: 10_000, + } as SessionEntry; + const sessionStore: Record = { [sessionKey]: entry }; + await seedSessionStore({ storePath, sessionKey, entry }); + + await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + tokensAfter: 12_000, + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].compactionCount).toBe(1); + expect(stored[sessionKey].totalTokens).toBe(12_000); + // input/output cleared since we only have the total estimate + expect(stored[sessionKey].inputTokens).toBeUndefined(); + expect(stored[sessionKey].outputTokens).toBeUndefined(); + }); + + it("does not update totalTokens when tokensAfter is not provided", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const entry = { + sessionId: "s1", + updatedAt: Date.now(), + compactionCount: 0, + totalTokens: 180_000, + } as SessionEntry; + const sessionStore: Record = { [sessionKey]: entry }; + await seedSessionStore({ storePath, sessionKey, entry }); + + await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].compactionCount).toBe(1); + // totalTokens unchanged + expect(stored[sessionKey].totalTokens).toBe(180_000); + }); +}); diff --git a/src/auto-reply/reply/session-usage.test.ts b/src/auto-reply/reply/session-usage.test.ts new file mode 100644 index 0000000000..d592cad21e --- /dev/null +++ b/src/auto-reply/reply/session-usage.test.ts @@ -0,0 +1,95 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import { persistSessionUsageUpdate } from "./session-usage.js"; + +async function seedSessionStore(params: { + storePath: string; + sessionKey: string; + entry: Record; +}) { + await fs.mkdir(path.dirname(params.storePath), { recursive: true }); + await fs.writeFile( + params.storePath, + JSON.stringify({ [params.sessionKey]: params.entry }, null, 2), + "utf-8", + ); +} + +describe("persistSessionUsageUpdate", () => { + it("uses lastCallUsage for totalTokens when provided", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-usage-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + await seedSessionStore({ + storePath, + sessionKey, + entry: { sessionId: "s1", updatedAt: Date.now(), totalTokens: 100_000 }, + }); + + // Accumulated usage (sums all API calls) — inflated + const accumulatedUsage = { input: 180_000, output: 10_000, total: 190_000 }; + // Last individual API call's usage — actual context after compaction + const lastCallUsage = { input: 12_000, output: 2_000, total: 14_000 }; + + await persistSessionUsageUpdate({ + storePath, + sessionKey, + usage: accumulatedUsage, + lastCallUsage, + contextTokensUsed: 200_000, + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + // totalTokens should reflect lastCallUsage (12_000 input), not accumulated (180_000) + expect(stored[sessionKey].totalTokens).toBe(12_000); + // inputTokens/outputTokens still reflect accumulated usage for cost tracking + expect(stored[sessionKey].inputTokens).toBe(180_000); + expect(stored[sessionKey].outputTokens).toBe(10_000); + }); + + it("falls back to accumulated usage for totalTokens when lastCallUsage not provided", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-usage-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + await seedSessionStore({ + storePath, + sessionKey, + entry: { sessionId: "s1", updatedAt: Date.now() }, + }); + + await persistSessionUsageUpdate({ + storePath, + sessionKey, + usage: { input: 50_000, output: 5_000, total: 55_000 }, + contextTokensUsed: 200_000, + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].totalTokens).toBe(50_000); + }); + + it("caps totalTokens at context window even with lastCallUsage", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-usage-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + await seedSessionStore({ + storePath, + sessionKey, + entry: { sessionId: "s1", updatedAt: Date.now() }, + }); + + await persistSessionUsageUpdate({ + storePath, + sessionKey, + usage: { input: 300_000, output: 10_000, total: 310_000 }, + lastCallUsage: { input: 250_000, output: 5_000, total: 255_000 }, + contextTokensUsed: 200_000, + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + // Capped at context window + expect(stored[sessionKey].totalTokens).toBe(200_000); + }); +}); diff --git a/src/auto-reply/reply/session-usage.ts b/src/auto-reply/reply/session-usage.ts index a562c20054..2922564b71 100644 --- a/src/auto-reply/reply/session-usage.ts +++ b/src/auto-reply/reply/session-usage.ts @@ -15,6 +15,13 @@ export async function persistSessionUsageUpdate(params: { storePath?: string; sessionKey?: string; usage?: NormalizedUsage; + /** + * Usage from the last individual API call (not accumulated). When provided, + * this is used for `totalTokens` instead of the accumulated `usage` so that + * context-window utilization reflects the actual current context size rather + * than the sum of input tokens across all API calls in the run. + */ + lastCallUsage?: NormalizedUsage; modelUsed?: string; providerUsed?: string; contextTokensUsed?: number; @@ -37,12 +44,17 @@ export async function persistSessionUsageUpdate(params: { const input = params.usage?.input ?? 0; const output = params.usage?.output ?? 0; const resolvedContextTokens = params.contextTokensUsed ?? entry.contextTokens; + // Use last-call usage for totalTokens when available. The accumulated + // `usage.input` sums input tokens from every API call in the run + // (tool-use loops, compaction retries), overstating actual context. + // `lastCallUsage` reflects only the final API call — the true context. + const usageForContext = params.lastCallUsage ?? params.usage; const patch: Partial = { inputTokens: input, outputTokens: output, totalTokens: deriveSessionTotalTokens({ - usage: params.usage, + usage: usageForContext, contextTokens: resolvedContextTokens, }) ?? input, modelProvider: params.providerUsed ?? entry.modelProvider,