fix: context overflow compaction and subagent announce improvements (#11664) (thanks @tyler6204)

* initial commit

* feat: implement deriveSessionTotalTokens function and update usage tests

* Added deriveSessionTotalTokens function to calculate total tokens based on usage and context tokens.
* Updated usage tests to include cases for derived session total tokens.
* Refactored session usage calculations in multiple files to utilize the new function for improved accuracy.

* fix: restore overflow truncation fallback + changelog/test hardening (#11551) (thanks @tyler6204)
This commit is contained in:
Tyler Yust
2026-02-07 20:02:32 -08:00
committed by GitHub
parent 8fae55e8e0
commit 191da1feb5
31 changed files with 889 additions and 178 deletions

View File

@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Cron: route text-only isolated agent announces through the shared subagent announce flow; add exponential backoff for repeated errors; preserve future `nextRunAtMs` on restart; include current-boundary schedule matches; prevent stale threadId reuse across targets; and add per-job execution timeout. (#11641) Thanks @tyler6204.
- Subagents: stabilize announce timing, preserve compaction metrics across retries, clamp overflow-prone long timeouts, and cap impossible context usage token totals. (#11551) Thanks @tyler6204.
- Agents: recover from context overflow caused by oversized tool results (pre-emptive capping + fallback truncation). (#11579) Thanks @tyler6204.
- Gateway/CLI: when `gateway.bind=lan`, use a LAN IP for probe URLs and Control UI links. (#11448) Thanks @AnonO6.
- Memory: set Voyage embeddings `input_type` for improved retrieval. (#10818) Thanks @mcinteerj.

View File

@@ -30,7 +30,9 @@ const shardCount = isWindowsCi
: 2
: 1;
const windowsCiArgs = isWindowsCi ? ["--dangerouslyIgnoreUnhandledErrors"] : [];
const passthroughArgs = process.argv.slice(2);
const rawPassthroughArgs = process.argv.slice(2);
const passthroughArgs =
rawPassthroughArgs[0] === "--" ? rawPassthroughArgs.slice(1) : rawPassthroughArgs;
const overrideWorkers = Number.parseInt(process.env.OPENCLAW_TEST_WORKERS ?? "", 10);
const resolvedOverride =
Number.isFinite(overrideWorkers) && overrideWorkers > 0 ? overrideWorkers : null;

View File

@@ -82,6 +82,16 @@ describe("openclaw-tools: subagents", () => {
endedAt: 4000,
};
}
if (request.method === "chat.history") {
return {
messages: [
{
role: "assistant",
content: [{ type: "text", text: "done" }],
},
],
};
}
if (request.method === "sessions.delete") {
const params = request.params as { key?: string } | undefined;
deletedKey = params?.key;

View File

@@ -23,7 +23,6 @@ vi.mock("../config/config.js", async (importOriginal) => {
import { emitAgentEvent } from "../infra/agent-events.js";
import "./test-helpers/fast-core-tools.js";
import { sleep } from "../utils.js";
import { createOpenClawTools } from "./openclaw-tools.js";
import { resetSubagentRegistryForTests } from "./subagent-registry.js";
@@ -202,19 +201,22 @@ describe("openclaw-tools: subagents", () => {
if (!childRunId) {
throw new Error("missing child runId");
}
emitAgentEvent({
runId: childRunId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 1234,
endedAt: 2345,
},
});
vi.useFakeTimers();
try {
emitAgentEvent({
runId: childRunId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 1234,
endedAt: 2345,
},
});
await sleep(0);
await sleep(0);
await sleep(0);
await vi.runAllTimersAsync();
} finally {
vi.useRealTimers();
}
const childWait = waitCalls.find((call) => call.runId === childRunId);
expect(childWait?.timeoutMs).toBe(1000);
@@ -313,19 +315,22 @@ describe("openclaw-tools: subagents", () => {
if (!childRunId) {
throw new Error("missing child runId");
}
emitAgentEvent({
runId: childRunId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 1000,
endedAt: 2000,
},
});
vi.useFakeTimers();
try {
emitAgentEvent({
runId: childRunId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 1000,
endedAt: 2000,
},
});
await sleep(0);
await sleep(0);
await sleep(0);
await vi.runAllTimersAsync();
} finally {
vi.useRealTimers();
}
const agentCalls = calls.filter((call) => call.method === "agent");
expect(agentCalls).toHaveLength(2);

View File

@@ -95,6 +95,16 @@ describe("openclaw-tools: subagents", () => {
patchParams = { key: params?.key, label: params?.label };
return { ok: true };
}
if (request.method === "chat.history") {
return {
messages: [
{
role: "assistant",
content: [{ type: "text", text: "done" }],
},
],
};
}
if (request.method === "sessions.delete") {
return { ok: true };
}

View File

@@ -88,6 +88,7 @@ vi.mock("../failover-error.js", () => ({
vi.mock("../usage.js", () => ({
normalizeUsage: vi.fn(() => undefined),
hasNonzeroUsage: vi.fn(() => false),
}));
vi.mock("./lanes.js", () => ({
@@ -108,6 +109,15 @@ vi.mock("./run/payloads.js", () => ({
buildEmbeddedRunPayloads: vi.fn(() => []),
}));
vi.mock("./tool-result-truncation.js", () => ({
truncateOversizedToolResultsInSession: vi.fn(async () => ({
truncated: false,
truncatedCount: 0,
reason: "no oversized tool results",
})),
sessionLikelyHasOversizedToolResults: vi.fn(() => false),
}));
vi.mock("./utils.js", () => ({
describeUnknownError: vi.fn((err: unknown) => {
if (err instanceof Error) {
@@ -140,6 +150,7 @@ vi.mock("../pi-embedded-helpers.js", async () => {
isBillingAssistantError: vi.fn(() => false),
classifyFailoverReason: vi.fn(() => null),
formatAssistantErrorText: vi.fn(() => ""),
parseImageSizeError: vi.fn(() => null),
pickFallbackThinkingLevel: vi.fn(() => null),
isTimeoutErrorMessage: vi.fn(() => false),
parseImageDimensionError: vi.fn(() => null),
@@ -151,9 +162,17 @@ import { compactEmbeddedPiSessionDirect } from "./compact.js";
import { log } from "./logger.js";
import { runEmbeddedPiAgent } from "./run.js";
import { runEmbeddedAttempt } from "./run/attempt.js";
import {
sessionLikelyHasOversizedToolResults,
truncateOversizedToolResultsInSession,
} from "./tool-result-truncation.js";
const mockedRunEmbeddedAttempt = vi.mocked(runEmbeddedAttempt);
const mockedCompactDirect = vi.mocked(compactEmbeddedPiSessionDirect);
const mockedSessionLikelyHasOversizedToolResults = vi.mocked(sessionLikelyHasOversizedToolResults);
const mockedTruncateOversizedToolResultsInSession = vi.mocked(
truncateOversizedToolResultsInSession,
);
function makeAttemptResult(
overrides: Partial<EmbeddedRunAttemptResult> = {},
@@ -188,6 +207,12 @@ const baseParams = {
describe("overflow compaction in run loop", () => {
beforeEach(() => {
vi.clearAllMocks();
mockedSessionLikelyHasOversizedToolResults.mockReturnValue(false);
mockedTruncateOversizedToolResultsInSession.mockResolvedValue({
truncated: false,
truncatedCount: 0,
reason: "no oversized tool results",
});
});
it("retries after successful compaction on context overflow promptError", async () => {
@@ -244,6 +269,43 @@ describe("overflow compaction in run loop", () => {
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("auto-compaction failed"));
});
it("falls back to tool-result truncation and retries when oversized results are detected", async () => {
const overflowError = new Error("request_too_large: Request size exceeds model context window");
mockedRunEmbeddedAttempt
.mockResolvedValueOnce(
makeAttemptResult({
promptError: overflowError,
messagesSnapshot: [{ role: "assistant", content: "big tool output" }],
}),
)
.mockResolvedValueOnce(makeAttemptResult({ promptError: null }));
mockedCompactDirect.mockResolvedValueOnce({
ok: false,
compacted: false,
reason: "nothing to compact",
});
mockedSessionLikelyHasOversizedToolResults.mockReturnValue(true);
mockedTruncateOversizedToolResultsInSession.mockResolvedValueOnce({
truncated: true,
truncatedCount: 1,
});
const result = await runEmbeddedPiAgent(baseParams);
expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
expect(mockedSessionLikelyHasOversizedToolResults).toHaveBeenCalledWith(
expect.objectContaining({ contextWindowTokens: 200000 }),
);
expect(mockedTruncateOversizedToolResultsInSession).toHaveBeenCalledWith(
expect.objectContaining({ sessionFile: "/tmp/session.json" }),
);
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("Truncated 1 tool result(s)"));
expect(result.meta.error).toBeUndefined();
});
it("retries compaction up to 3 times before giving up", async () => {
const overflowError = new Error("request_too_large: Request size exceeds model context window");
@@ -323,4 +385,52 @@ describe("overflow compaction in run loop", () => {
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(1);
expect(result.meta.error?.kind).toBe("compaction_failure");
});
it("retries after successful compaction on assistant context overflow errors", async () => {
mockedRunEmbeddedAttempt
.mockResolvedValueOnce(
makeAttemptResult({
promptError: null,
lastAssistant: {
stopReason: "error",
errorMessage: "request_too_large: Request size exceeds model context window",
} as EmbeddedRunAttemptResult["lastAssistant"],
}),
)
.mockResolvedValueOnce(makeAttemptResult({ promptError: null }));
mockedCompactDirect.mockResolvedValueOnce({
ok: true,
compacted: true,
result: {
summary: "Compacted session",
firstKeptEntryId: "entry-5",
tokensBefore: 150000,
},
});
const result = await runEmbeddedPiAgent(baseParams);
expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("source=assistantError"));
expect(result.meta.error).toBeUndefined();
});
it("does not treat stale assistant overflow as current-attempt overflow when promptError is non-overflow", async () => {
mockedRunEmbeddedAttempt.mockResolvedValue(
makeAttemptResult({
promptError: new Error("transport disconnected"),
lastAssistant: {
stopReason: "error",
errorMessage: "request_too_large: Request size exceeds model context window",
} as EmbeddedRunAttemptResult["lastAssistant"],
}),
);
await expect(runEmbeddedPiAgent(baseParams)).rejects.toThrow("transport disconnected");
expect(mockedCompactDirect).not.toHaveBeenCalled();
expect(log.warn).not.toHaveBeenCalledWith(expect.stringContaining("source=assistantError"));
});
});

View File

@@ -74,6 +74,66 @@ function scrubAnthropicRefusalMagic(prompt: string): string {
);
}
type UsageAccumulator = {
input: number;
output: number;
cacheRead: number;
cacheWrite: number;
total: number;
};
const createUsageAccumulator = (): UsageAccumulator => ({
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
total: 0,
});
const hasUsageValues = (
usage: ReturnType<typeof normalizeUsage>,
): usage is NonNullable<ReturnType<typeof normalizeUsage>> =>
!!usage &&
[usage.input, usage.output, usage.cacheRead, usage.cacheWrite, usage.total].some(
(value) => typeof value === "number" && Number.isFinite(value) && value > 0,
);
const mergeUsageIntoAccumulator = (
target: UsageAccumulator,
usage: ReturnType<typeof normalizeUsage>,
) => {
if (!hasUsageValues(usage)) {
return;
}
target.input += usage.input ?? 0;
target.output += usage.output ?? 0;
target.cacheRead += usage.cacheRead ?? 0;
target.cacheWrite += usage.cacheWrite ?? 0;
target.total +=
usage.total ??
(usage.input ?? 0) + (usage.output ?? 0) + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
};
const toNormalizedUsage = (usage: UsageAccumulator) => {
const hasUsage =
usage.input > 0 ||
usage.output > 0 ||
usage.cacheRead > 0 ||
usage.cacheWrite > 0 ||
usage.total > 0;
if (!hasUsage) {
return undefined;
}
const derivedTotal = usage.input + usage.output + usage.cacheRead + usage.cacheWrite;
return {
input: usage.input || undefined,
output: usage.output || undefined,
cacheRead: usage.cacheRead || undefined,
cacheWrite: usage.cacheWrite || undefined,
total: usage.total || derivedTotal || undefined,
};
};
export async function runEmbeddedPiAgent(
params: RunEmbeddedPiAgentParams,
): Promise<EmbeddedPiRunResult> {
@@ -326,6 +386,8 @@ export async function runEmbeddedPiAgent(
const MAX_OVERFLOW_COMPACTION_ATTEMPTS = 3;
let overflowCompactionAttempts = 0;
let toolResultTruncationAttempted = false;
const usageAccumulator = createUsageAccumulator();
let autoCompactionCount = 0;
try {
while (true) {
attemptedThinking.add(thinkLevel);
@@ -392,119 +454,151 @@ export async function runEmbeddedPiAgent(
});
const { aborted, promptError, timedOut, sessionIdUsed, lastAssistant } = attempt;
mergeUsageIntoAccumulator(
usageAccumulator,
attempt.attemptUsage ?? normalizeUsage(lastAssistant?.usage as UsageLike),
);
autoCompactionCount += Math.max(0, attempt.compactionCount ?? 0);
const formattedAssistantErrorText = lastAssistant
? formatAssistantErrorText(lastAssistant, {
cfg: params.config,
sessionKey: params.sessionKey ?? params.sessionId,
})
: undefined;
const assistantErrorText =
lastAssistant?.stopReason === "error"
? lastAssistant.errorMessage?.trim() || formattedAssistantErrorText
: undefined;
if (promptError && !aborted) {
const errorText = describeUnknownError(promptError);
if (isContextOverflowError(errorText)) {
const msgCount = attempt.messagesSnapshot?.length ?? 0;
const contextOverflowError = !aborted
? (() => {
if (promptError) {
const errorText = describeUnknownError(promptError);
if (isContextOverflowError(errorText)) {
return { text: errorText, source: "promptError" as const };
}
// Prompt submission failed with a non-overflow error. Do not
// inspect prior assistant errors from history for this attempt.
return null;
}
if (assistantErrorText && isContextOverflowError(assistantErrorText)) {
return { text: assistantErrorText, source: "assistantError" as const };
}
return null;
})()
: null;
if (contextOverflowError) {
const errorText = contextOverflowError.text;
const msgCount = attempt.messagesSnapshot?.length ?? 0;
log.warn(
`[context-overflow-diag] sessionKey=${params.sessionKey ?? params.sessionId} ` +
`provider=${provider}/${modelId} source=${contextOverflowError.source} ` +
`messages=${msgCount} sessionFile=${params.sessionFile} ` +
`compactionAttempts=${overflowCompactionAttempts} error=${errorText.slice(0, 200)}`,
);
const isCompactionFailure = isCompactionFailureError(errorText);
// Attempt auto-compaction on context overflow (not compaction_failure)
if (
!isCompactionFailure &&
overflowCompactionAttempts < MAX_OVERFLOW_COMPACTION_ATTEMPTS
) {
overflowCompactionAttempts++;
log.warn(
`[context-overflow-diag] sessionKey=${params.sessionKey ?? params.sessionId} ` +
`provider=${provider}/${modelId} messages=${msgCount} ` +
`sessionFile=${params.sessionFile} compactionAttempts=${overflowCompactionAttempts} ` +
`error=${errorText.slice(0, 200)}`,
`context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`,
);
const isCompactionFailure = isCompactionFailureError(errorText);
// Attempt auto-compaction on context overflow (not compaction_failure)
if (
!isCompactionFailure &&
overflowCompactionAttempts < MAX_OVERFLOW_COMPACTION_ATTEMPTS
) {
overflowCompactionAttempts++;
const compactResult = await compactEmbeddedPiSessionDirect({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
messageChannel: params.messageChannel,
messageProvider: params.messageProvider,
agentAccountId: params.agentAccountId,
authProfileId: lastProfileId,
sessionFile: params.sessionFile,
workspaceDir: resolvedWorkspace,
agentDir,
config: params.config,
skillsSnapshot: params.skillsSnapshot,
senderIsOwner: params.senderIsOwner,
provider,
model: modelId,
thinkLevel,
reasoningLevel: params.reasoningLevel,
bashElevated: params.bashElevated,
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
});
if (compactResult.compacted) {
autoCompactionCount += 1;
log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`);
continue;
}
log.warn(
`auto-compaction failed for ${provider}/${modelId}: ${compactResult.reason ?? "nothing to compact"}`,
);
}
// Fallback: try truncating oversized tool results in the session.
// This handles the case where a single tool result exceeds the
// context window and compaction cannot reduce it further.
if (!toolResultTruncationAttempted) {
const contextWindowTokens = ctxInfo.tokens;
const hasOversized = attempt.messagesSnapshot
? sessionLikelyHasOversizedToolResults({
messages: attempt.messagesSnapshot,
contextWindowTokens,
})
: false;
if (hasOversized) {
toolResultTruncationAttempted = true;
log.warn(
`context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`,
`[context-overflow-recovery] Attempting tool result truncation for ${provider}/${modelId} ` +
`(contextWindow=${contextWindowTokens} tokens)`,
);
const compactResult = await compactEmbeddedPiSessionDirect({
const truncResult = await truncateOversizedToolResultsInSession({
sessionFile: params.sessionFile,
contextWindowTokens,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
messageChannel: params.messageChannel,
messageProvider: params.messageProvider,
agentAccountId: params.agentAccountId,
authProfileId: lastProfileId,
sessionFile: params.sessionFile,
workspaceDir: resolvedWorkspace,
agentDir,
config: params.config,
skillsSnapshot: params.skillsSnapshot,
senderIsOwner: params.senderIsOwner,
provider,
model: modelId,
thinkLevel,
reasoningLevel: params.reasoningLevel,
bashElevated: params.bashElevated,
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
});
if (compactResult.compacted) {
log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`);
if (truncResult.truncated) {
log.info(
`[context-overflow-recovery] Truncated ${truncResult.truncatedCount} tool result(s); retrying prompt`,
);
// Session is now smaller; allow compaction retries again.
overflowCompactionAttempts = 0;
continue;
}
log.warn(
`auto-compaction failed for ${provider}/${modelId}: ${compactResult.reason ?? "nothing to compact"}`,
`[context-overflow-recovery] Tool result truncation did not help: ${truncResult.reason ?? "unknown"}`,
);
}
// Fallback: try truncating oversized tool results in the session.
// This handles the case where a single tool result (e.g., reading a
// huge file or getting a massive PR diff) exceeds the context window,
// and compaction can't help because there's no older history to compact.
if (!toolResultTruncationAttempted) {
const contextWindowTokens = ctxInfo.tokens;
const hasOversized = attempt.messagesSnapshot
? sessionLikelyHasOversizedToolResults({
messages: attempt.messagesSnapshot,
contextWindowTokens,
})
: false;
if (hasOversized) {
toolResultTruncationAttempted = true;
log.warn(
`[context-overflow-recovery] Attempting tool result truncation for ${provider}/${modelId} ` +
`(contextWindow=${contextWindowTokens} tokens)`,
);
const truncResult = await truncateOversizedToolResultsInSession({
sessionFile: params.sessionFile,
contextWindowTokens,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
});
if (truncResult.truncated) {
log.info(
`[context-overflow-recovery] Truncated ${truncResult.truncatedCount} tool result(s); retrying prompt`,
);
// Reset compaction attempts so compaction can be tried again
// after truncation (the session is now smaller)
overflowCompactionAttempts = 0;
continue;
}
log.warn(
`[context-overflow-recovery] Tool result truncation did not help: ${truncResult.reason ?? "unknown"}`,
);
}
}
const kind = isCompactionFailure ? "compaction_failure" : "context_overflow";
return {
payloads: [
{
text:
"Context overflow: prompt too large for the model. " +
"Try again with less input or a larger-context model.",
isError: true,
},
],
meta: {
durationMs: Date.now() - started,
agentMeta: {
sessionId: sessionIdUsed,
provider,
model: model.id,
},
systemPromptReport: attempt.systemPromptReport,
error: { kind, message: errorText },
},
};
}
const kind = isCompactionFailure ? "compaction_failure" : "context_overflow";
return {
payloads: [
{
text:
"Context overflow: prompt too large for the model. " +
"Try again with less input or a larger-context model.",
isError: true,
},
],
meta: {
durationMs: Date.now() - started,
agentMeta: {
sessionId: sessionIdUsed,
provider,
model: model.id,
},
systemPromptReport: attempt.systemPromptReport,
error: { kind, message: errorText },
},
};
}
if (promptError && !aborted) {
const errorText = describeUnknownError(promptError);
// Handle role ordering errors with a user-friendly message
if (/incorrect role information|roles must alternate/i.test(errorText)) {
return {
@@ -702,12 +796,13 @@ export async function runEmbeddedPiAgent(
}
}
const usage = normalizeUsage(lastAssistant?.usage as UsageLike);
const usage = toNormalizedUsage(usageAccumulator);
const agentMeta: EmbeddedPiAgentMeta = {
sessionId: sessionIdUsed,
provider: lastAssistant?.provider ?? provider,
model: lastAssistant?.model ?? model.id,
usage,
compactionCount: autoCompactionCount > 0 ? autoCompactionCount : undefined,
};
const payloads = buildEmbeddedRunPayloads({

View File

@@ -650,6 +650,8 @@ export async function runEmbeddedAttempt(
getMessagingToolSentTargets,
didSendViaMessagingTool,
getLastToolError,
getUsageTotals,
getCompactionCount,
} = subscription;
const queueHandle: EmbeddedPiQueueHandle = {
@@ -908,6 +910,8 @@ export async function runEmbeddedAttempt(
cloudCodeAssistFormatError: Boolean(
lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage),
),
attemptUsage: getUsageTotals(),
compactionCount: getCompactionCount(),
// Client tool call detected (OpenResponses hosted tools)
clientToolCall: clientToolCallDetected ?? undefined,
};

View File

@@ -9,6 +9,7 @@ import type { MessagingToolSend } from "../../pi-embedded-messaging.js";
import type { BlockReplyChunking, ToolResultFormat } from "../../pi-embedded-subscribe.js";
import type { AuthStorage, ModelRegistry } from "../../pi-model-discovery.js";
import type { SkillSnapshot } from "../../skills.js";
import type { NormalizedUsage } from "../../usage.js";
import type { ClientToolDefinition } from "./params.js";
export type EmbeddedRunAttemptParams = {
@@ -106,6 +107,8 @@ export type EmbeddedRunAttemptResult = {
messagingToolSentTexts: string[];
messagingToolSentTargets: MessagingToolSend[];
cloudCodeAssistFormatError: boolean;
attemptUsage?: NormalizedUsage;
compactionCount?: number;
/** Client tool call detected (OpenResponses hosted tools). */
clientToolCall?: { name: string; params: Record<string, unknown> };
};

View File

@@ -5,6 +5,7 @@ export type EmbeddedPiAgentMeta = {
sessionId: string;
provider: string;
model: string;
compactionCount?: number;
usage?: {
input?: number;
output?: number;

View File

@@ -21,6 +21,7 @@ export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) {
export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) {
ctx.state.compactionInFlight = true;
ctx.incrementCompactionCount();
ctx.ensureCompactionPromise();
ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`);
emitAgentEvent({

View File

@@ -198,6 +198,7 @@ export function handleMessageEnd(
}
const assistantMessage = msg;
ctx.recordAssistantUsage((assistantMessage as { usage?: unknown }).usage);
promoteThinkingTagsToBlocks(assistantMessage);
const rawText = extractAssistantText(assistantMessage);

View File

@@ -8,6 +8,7 @@ import type {
BlockReplyChunking,
SubscribeEmbeddedPiSessionParams,
} from "./pi-embedded-subscribe.types.js";
import type { NormalizedUsage } from "./usage.js";
export type EmbeddedSubscribeLogger = {
debug: (message: string) => void;
@@ -100,6 +101,10 @@ export type EmbeddedPiSubscribeContext = {
noteCompactionRetry: () => void;
resolveCompactionRetry: () => void;
maybeResolveCompactionWait: () => void;
recordAssistantUsage: (usage: unknown) => void;
incrementCompactionCount: () => void;
getUsageTotals: () => NormalizedUsage | undefined;
getCompactionCount: () => number;
};
export type EmbeddedPiSubscribeEvent =

View File

@@ -16,6 +16,7 @@ import {
} from "./pi-embedded-helpers.js";
import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js";
import { formatReasoningMessage } from "./pi-embedded-utils.js";
import { hasNonzeroUsage, normalizeUsage, type UsageLike } from "./usage.js";
const THINKING_TAG_SCAN_RE = /<\s*(\/?)\s*(?:think(?:ing)?|thought|antthinking)\s*>/gi;
const FINAL_TAG_SCAN_RE = /<\s*(\/?)\s*final\s*>/gi;
@@ -69,6 +70,14 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
pendingMessagingTexts: new Map(),
pendingMessagingTargets: new Map(),
};
const usageTotals = {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
total: 0,
};
let compactionCount = 0;
const assistantTexts = state.assistantTexts;
const toolMetas = state.toolMetas;
@@ -222,6 +231,43 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
state.compactionRetryPromise = null;
}
};
const recordAssistantUsage = (usageLike: unknown) => {
const usage = normalizeUsage((usageLike ?? undefined) as UsageLike | undefined);
if (!hasNonzeroUsage(usage)) {
return;
}
usageTotals.input += usage.input ?? 0;
usageTotals.output += usage.output ?? 0;
usageTotals.cacheRead += usage.cacheRead ?? 0;
usageTotals.cacheWrite += usage.cacheWrite ?? 0;
const usageTotal =
usage.total ??
(usage.input ?? 0) + (usage.output ?? 0) + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
usageTotals.total += usageTotal;
};
const getUsageTotals = () => {
const hasUsage =
usageTotals.input > 0 ||
usageTotals.output > 0 ||
usageTotals.cacheRead > 0 ||
usageTotals.cacheWrite > 0 ||
usageTotals.total > 0;
if (!hasUsage) {
return undefined;
}
const derivedTotal =
usageTotals.input + usageTotals.output + usageTotals.cacheRead + usageTotals.cacheWrite;
return {
input: usageTotals.input || undefined,
output: usageTotals.output || undefined,
cacheRead: usageTotals.cacheRead || undefined,
cacheWrite: usageTotals.cacheWrite || undefined,
total: usageTotals.total || derivedTotal || undefined,
};
};
const incrementCompactionCount = () => {
compactionCount += 1;
};
const blockChunking = params.blockReplyChunking;
const blockChunker = blockChunking ? new EmbeddedBlockChunker(blockChunking) : null;
@@ -530,6 +576,10 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
noteCompactionRetry,
resolveCompactionRetry,
maybeResolveCompactionWait,
recordAssistantUsage,
incrementCompactionCount,
getUsageTotals,
getCompactionCount: () => compactionCount,
};
const unsubscribe = params.session.subscribe(createEmbeddedPiSessionEventHandler(ctx));
@@ -546,6 +596,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
// which is generated AFTER the tool sends the actual answer.
didSendViaMessagingTool: () => messagingToolSentTexts.length > 0,
getLastToolError: () => (state.lastToolError ? { ...state.lastToolError } : undefined),
getUsageTotals,
getCompactionCount: () => compactionCount,
waitForCompactionRetry: () => {
if (state.compactionInFlight || state.pendingCompactionRetry > 0) {
ensureCompactionPromise();

View File

@@ -1,6 +1,8 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const agentSpy = vi.fn(async () => ({ runId: "run-main", status: "ok" }));
const sessionsDeleteSpy = vi.fn();
const readLatestAssistantReplyMock = vi.fn(async () => "raw subagent reply");
const embeddedRunMock = {
isEmbeddedPiRunActive: vi.fn(() => false),
isEmbeddedPiRunStreaming: vi.fn(() => false),
@@ -28,6 +30,7 @@ vi.mock("../gateway/call.js", () => ({
return {};
}
if (typed.method === "sessions.delete") {
sessionsDeleteSpy(typed);
return {};
}
return {};
@@ -35,7 +38,7 @@ vi.mock("../gateway/call.js", () => ({
}));
vi.mock("./tools/agent-step.js", () => ({
readLatestAssistantReply: vi.fn(async () => "raw subagent reply"),
readLatestAssistantReply: readLatestAssistantReplyMock,
}));
vi.mock("../config/sessions.js", () => ({
@@ -60,10 +63,12 @@ vi.mock("../config/config.js", async (importOriginal) => {
describe("subagent announce formatting", () => {
beforeEach(() => {
agentSpy.mockClear();
sessionsDeleteSpy.mockClear();
embeddedRunMock.isEmbeddedPiRunActive.mockReset().mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReset().mockReturnValue(false);
embeddedRunMock.queueEmbeddedPiMessage.mockReset().mockReturnValue(false);
embeddedRunMock.waitForEmbeddedPiRunEnd.mockReset().mockResolvedValue(true);
readLatestAssistantReplyMock.mockReset().mockResolvedValue("raw subagent reply");
sessionStore = {};
configOverride = {
session: {
@@ -356,6 +361,95 @@ describe("subagent announce formatting", () => {
expect(call?.params?.accountId).toBe("acct-123");
});
it("retries reading subagent output when early lifecycle completion had no text", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValueOnce(true).mockReturnValue(false);
embeddedRunMock.waitForEmbeddedPiRunEnd.mockResolvedValue(true);
readLatestAssistantReplyMock
.mockResolvedValueOnce(undefined)
.mockResolvedValueOnce("Read #12 complete.");
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-1",
},
};
await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-child",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "context-stress-test",
timeoutMs: 1000,
cleanup: "keep",
waitForCompletion: false,
startedAt: 10,
endedAt: 20,
outcome: { status: "ok" },
});
expect(embeddedRunMock.waitForEmbeddedPiRunEnd).toHaveBeenCalledWith("child-session-1", 1000);
const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
expect(call?.params?.message).toContain("Read #12 complete.");
expect(call?.params?.message).not.toContain("(no output)");
});
it("defers announce when child run is still active after wait timeout", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
embeddedRunMock.waitForEmbeddedPiRunEnd.mockResolvedValue(false);
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-active",
},
};
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-child-active",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "context-stress-test",
timeoutMs: 1000,
cleanup: "keep",
waitForCompletion: false,
startedAt: 10,
endedAt: 20,
outcome: { status: "ok" },
});
expect(didAnnounce).toBe(false);
expect(agentSpy).not.toHaveBeenCalled();
});
it("does not delete child session when announce is deferred for an active run", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
embeddedRunMock.waitForEmbeddedPiRunEnd.mockResolvedValue(false);
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-active",
},
};
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-child-active-delete",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "context-stress-test",
timeoutMs: 1000,
cleanup: "delete",
waitForCompletion: false,
startedAt: 10,
endedAt: 20,
outcome: { status: "ok" },
});
expect(didAnnounce).toBe(false);
expect(sessionsDeleteSpy).not.toHaveBeenCalled();
});
it("normalizes requesterOrigin for direct announce delivery", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false);

View File

@@ -17,7 +17,11 @@ import {
mergeDeliveryContext,
normalizeDeliveryContext,
} from "../utils/delivery-context.js";
import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js";
import {
isEmbeddedPiRunActive,
queueEmbeddedPiMessage,
waitForEmbeddedPiRunEnd,
} from "./pi-embedded.js";
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
import { readLatestAssistantReply } from "./tools/agent-step.js";
@@ -288,6 +292,35 @@ async function buildSubagentStatsLine(params: {
return `Stats: ${parts.join(" \u2022 ")}`;
}
function loadSessionEntryByKey(sessionKey: string) {
const cfg = loadConfig();
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const storePath = resolveStorePath(cfg.session?.store, { agentId });
const store = loadSessionStore(storePath);
return store[sessionKey];
}
async function readLatestAssistantReplyWithRetry(params: {
sessionKey: string;
initialReply?: string;
maxWaitMs: number;
}): Promise<string | undefined> {
let reply = params.initialReply?.trim() ? params.initialReply : undefined;
if (reply) {
return reply;
}
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000));
while (Date.now() < deadline) {
await new Promise((resolve) => setTimeout(resolve, 300));
const latest = await readLatestAssistantReply({ sessionKey: params.sessionKey });
if (latest?.trim()) {
return latest;
}
}
return reply;
}
export function buildSubagentSystemPrompt(params: {
requesterSessionKey?: string;
requesterOrigin?: DeliveryContext;
@@ -365,12 +398,33 @@ export async function runSubagentAnnounceFlow(params: {
announceType?: SubagentAnnounceType;
}): Promise<boolean> {
let didAnnounce = false;
let shouldDeleteChildSession = params.cleanup === "delete";
try {
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
const childSessionId = (() => {
const entry = loadSessionEntryByKey(params.childSessionKey);
return typeof entry?.sessionId === "string" && entry.sessionId.trim()
? entry.sessionId.trim()
: undefined;
})();
const settleTimeoutMs = Math.min(Math.max(params.timeoutMs, 1), 120_000);
let reply = params.roundOneReply;
let outcome: SubagentRunOutcome | undefined = params.outcome;
// Lifecycle "end" can arrive before auto-compaction retries finish. If the
// subagent is still active, wait for the embedded run to fully settle.
if (childSessionId && isEmbeddedPiRunActive(childSessionId)) {
const settled = await waitForEmbeddedPiRunEnd(childSessionId, settleTimeoutMs);
if (!settled && isEmbeddedPiRunActive(childSessionId)) {
// The child run is still active (e.g., compaction retry still in progress).
// Defer announcement so we don't report stale/partial output.
// Keep the child session so output is not lost while the run is still active.
shouldDeleteChildSession = false;
return false;
}
}
if (!reply && params.waitForCompletion !== false) {
const waitMs = Math.min(params.timeoutMs, 60_000);
const waitMs = settleTimeoutMs;
const wait = await callGateway<{
status?: string;
startedAt?: number;
@@ -403,17 +457,27 @@ export async function runSubagentAnnounceFlow(params: {
outcome = { status: "timeout" };
}
}
reply = await readLatestAssistantReply({
sessionKey: params.childSessionKey,
});
reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey });
}
if (!reply) {
reply = await readLatestAssistantReply({
reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey });
}
if (!reply?.trim()) {
reply = await readLatestAssistantReplyWithRetry({
sessionKey: params.childSessionKey,
initialReply: reply,
maxWaitMs: params.timeoutMs,
});
}
if (!reply?.trim() && childSessionId && isEmbeddedPiRunActive(childSessionId)) {
// Avoid announcing "(no output)" while the child run is still producing output.
shouldDeleteChildSession = false;
return false;
}
if (!outcome) {
outcome = { status: "unknown" };
}
@@ -508,7 +572,7 @@ export async function runSubagentAnnounceFlow(params: {
// Best-effort
}
}
if (params.cleanup === "delete") {
if (shouldDeleteChildSession) {
try {
await callGateway({
method: "sessions.delete",

View File

@@ -230,4 +230,53 @@ describe("subagent registry persistence", () => {
};
expect(afterSecond.runs["run-3"].cleanupCompletedAt).toBeDefined();
});
it("keeps delete-mode runs retryable when announce is deferred", async () => {
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-"));
process.env.OPENCLAW_STATE_DIR = tempStateDir;
const registryPath = path.join(tempStateDir, "subagents", "runs.json");
const persisted = {
version: 2,
runs: {
"run-4": {
runId: "run-4",
childSessionKey: "agent:main:subagent:four",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "deferred announce",
cleanup: "delete",
createdAt: 1,
startedAt: 1,
endedAt: 2,
},
},
};
await fs.mkdir(path.dirname(registryPath), { recursive: true });
await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8");
announceSpy.mockResolvedValueOnce(false);
vi.resetModules();
const mod1 = await import("./subagent-registry.js");
mod1.initSubagentRegistry();
await new Promise((r) => setTimeout(r, 0));
expect(announceSpy).toHaveBeenCalledTimes(1);
const afterFirst = JSON.parse(await fs.readFile(registryPath, "utf8")) as {
runs: Record<string, { cleanupHandled?: boolean }>;
};
expect(afterFirst.runs["run-4"]?.cleanupHandled).toBe(false);
announceSpy.mockResolvedValueOnce(true);
vi.resetModules();
const mod2 = await import("./subagent-registry.js");
mod2.initSubagentRegistry();
await new Promise((r) => setTimeout(r, 0));
expect(announceSpy).toHaveBeenCalledTimes(2);
const afterSecond = JSON.parse(await fs.readFile(registryPath, "utf8")) as {
runs?: Record<string, unknown>;
};
expect(afterSecond.runs?.["run-4"]).toBeUndefined();
});
});

View File

@@ -33,6 +33,7 @@ let listenerStarted = false;
let listenerStop: (() => void) | null = null;
// Use var to avoid TDZ when init runs across circular imports during bootstrap.
var restoreAttempted = false;
const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
function persistSubagentRuns() {
try {
@@ -68,7 +69,7 @@ function resumeSubagentRun(runId: string) {
requesterOrigin,
requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task,
timeoutMs: 30_000,
timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,
cleanup: entry.cleanup,
waitForCompletion: false,
startedAt: entry.startedAt,
@@ -229,7 +230,7 @@ function ensureListener() {
requesterOrigin,
requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task,
timeoutMs: 30_000,
timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,
cleanup: entry.cleanup,
waitForCompletion: false,
startedAt: entry.startedAt,
@@ -247,14 +248,14 @@ function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didA
if (!entry) {
return;
}
if (cleanup === "delete") {
subagentRuns.delete(runId);
if (!didAnnounce) {
// Allow retry on the next wake if announce was deferred or failed.
entry.cleanupHandled = false;
persistSubagentRuns();
return;
}
if (!didAnnounce) {
// Allow retry on the next wake if the announce failed.
entry.cleanupHandled = false;
if (cleanup === "delete") {
subagentRuns.delete(runId);
persistSubagentRuns();
return;
}
@@ -373,7 +374,7 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
requesterOrigin,
requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task,
timeoutMs: 30_000,
timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,
cleanup: entry.cleanup,
waitForCompletion: false,
startedAt: entry.startedAt,

View File

@@ -0,0 +1,14 @@
import { describe, expect, it } from "vitest";
import { resolveAgentTimeoutMs } from "./timeout.js";
describe("resolveAgentTimeoutMs", () => {
it("uses a timer-safe sentinel for no-timeout overrides", () => {
expect(resolveAgentTimeoutMs({ overrideSeconds: 0 })).toBe(2_147_000_000);
expect(resolveAgentTimeoutMs({ overrideMs: 0 })).toBe(2_147_000_000);
});
it("clamps very large timeout overrides to timer-safe values", () => {
expect(resolveAgentTimeoutMs({ overrideSeconds: 9_999_999 })).toBe(2_147_000_000);
expect(resolveAgentTimeoutMs({ overrideMs: 9_999_999_999 })).toBe(2_147_000_000);
});
});

View File

@@ -1,6 +1,7 @@
import type { OpenClawConfig } from "../config/config.js";
const DEFAULT_AGENT_TIMEOUT_SECONDS = 600;
const MAX_SAFE_TIMEOUT_MS = 2_147_000_000;
const normalizeNumber = (value: unknown): number | undefined =>
typeof value === "number" && Number.isFinite(value) ? Math.floor(value) : undefined;
@@ -18,10 +19,11 @@ export function resolveAgentTimeoutMs(opts: {
minMs?: number;
}): number {
const minMs = Math.max(normalizeNumber(opts.minMs) ?? 1, 1);
const defaultMs = resolveAgentTimeoutSeconds(opts.cfg) * 1000;
// Use a very large timeout value (30 days) to represent "no timeout"
// when explicitly set to 0. This avoids setTimeout issues with Infinity.
const NO_TIMEOUT_MS = 30 * 24 * 60 * 60 * 1000;
const clampTimeoutMs = (valueMs: number) =>
Math.min(Math.max(valueMs, minMs), MAX_SAFE_TIMEOUT_MS);
const defaultMs = clampTimeoutMs(resolveAgentTimeoutSeconds(opts.cfg) * 1000);
// Use the maximum timer-safe timeout to represent "no timeout" when explicitly set to 0.
const NO_TIMEOUT_MS = MAX_SAFE_TIMEOUT_MS;
const overrideMs = normalizeNumber(opts.overrideMs);
if (overrideMs !== undefined) {
if (overrideMs === 0) {
@@ -30,7 +32,7 @@ export function resolveAgentTimeoutMs(opts: {
if (overrideMs < 0) {
return defaultMs;
}
return Math.max(overrideMs, minMs);
return clampTimeoutMs(overrideMs);
}
const overrideSeconds = normalizeNumber(opts.overrideSeconds);
if (overrideSeconds !== undefined) {
@@ -40,7 +42,7 @@ export function resolveAgentTimeoutMs(opts: {
if (overrideSeconds < 0) {
return defaultMs;
}
return Math.max(overrideSeconds * 1000, minMs);
return clampTimeoutMs(overrideSeconds * 1000);
}
return Math.max(defaultMs, minMs);
return defaultMs;
}

View File

@@ -1,5 +1,5 @@
import { describe, expect, it } from "vitest";
import { hasNonzeroUsage, normalizeUsage } from "./usage.js";
import { deriveSessionTotalTokens, hasNonzeroUsage, normalizeUsage } from "./usage.js";
describe("normalizeUsage", () => {
it("normalizes Anthropic-style snake_case usage", () => {
@@ -46,4 +46,32 @@ describe("normalizeUsage", () => {
expect(hasNonzeroUsage({ input: 1 })).toBe(true);
expect(hasNonzeroUsage({ total: 1 })).toBe(true);
});
it("caps derived session total tokens to the context window", () => {
expect(
deriveSessionTotalTokens({
usage: {
input: 27,
cacheRead: 2_400_000,
cacheWrite: 0,
total: 2_402_300,
},
contextTokens: 200_000,
}),
).toBe(200_000);
});
it("uses prompt tokens when within context window", () => {
expect(
deriveSessionTotalTokens({
usage: {
input: 1_200,
cacheRead: 300,
cacheWrite: 50,
total: 2_000,
},
contextTokens: 200_000,
}),
).toBe(1_550);
});
});

View File

@@ -103,3 +103,34 @@ export function derivePromptTokens(usage?: {
const sum = input + cacheRead + cacheWrite;
return sum > 0 ? sum : undefined;
}
export function deriveSessionTotalTokens(params: {
usage?: {
input?: number;
total?: number;
cacheRead?: number;
cacheWrite?: number;
};
contextTokens?: number;
}): number | undefined {
const usage = params.usage;
if (!usage) {
return undefined;
}
const input = usage.input ?? 0;
const promptTokens = derivePromptTokens({
input: usage.input,
cacheRead: usage.cacheRead,
cacheWrite: usage.cacheWrite,
});
let total = promptTokens ?? usage.total ?? input;
if (!(total > 0)) {
return undefined;
}
const contextTokens = params.contextTokens;
if (typeof contextTokens === "number" && Number.isFinite(contextTokens) && contextTokens > 0) {
total = Math.min(total, contextTokens);
}
return total;
}

View File

@@ -1,5 +1,9 @@
import { setCliSessionId } from "../../agents/cli-session.js";
import { hasNonzeroUsage, type NormalizedUsage } from "../../agents/usage.js";
import {
deriveSessionTotalTokens,
hasNonzeroUsage,
type NormalizedUsage,
} from "../../agents/usage.js";
import {
type SessionSystemPromptReport,
type SessionEntry,
@@ -32,15 +36,18 @@ export async function persistSessionUsageUpdate(params: {
update: async (entry) => {
const input = params.usage?.input ?? 0;
const output = params.usage?.output ?? 0;
const promptTokens =
input + (params.usage?.cacheRead ?? 0) + (params.usage?.cacheWrite ?? 0);
const resolvedContextTokens = params.contextTokensUsed ?? entry.contextTokens;
const patch: Partial<SessionEntry> = {
inputTokens: input,
outputTokens: output,
totalTokens: promptTokens > 0 ? promptTokens : (params.usage?.total ?? input),
totalTokens:
deriveSessionTotalTokens({
usage: params.usage,
contextTokens: resolvedContextTokens,
}) ?? input,
modelProvider: params.providerUsed ?? entry.modelProvider,
model: params.modelUsed ?? entry.model,
contextTokens: params.contextTokensUsed ?? entry.contextTokens,
contextTokens: resolvedContextTokens,
systemPromptReport: params.systemPromptReport ?? entry.systemPromptReport,
updatedAt: Date.now(),
};

View File

@@ -3,7 +3,7 @@ import { setCliSessionId } from "../../agents/cli-session.js";
import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { isCliProvider } from "../../agents/model-selection.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import { deriveSessionTotalTokens, hasNonzeroUsage } from "../../agents/usage.js";
import { type SessionEntry, updateSessionStore } from "../../config/sessions.js";
type RunResult = Awaited<
@@ -37,6 +37,7 @@ export async function updateSessionStoreAfterAgentRun(params: {
} = params;
const usage = result.meta.agentMeta?.usage;
const compactionsThisRun = Math.max(0, result.meta.agentMeta?.compactionCount ?? 0);
const modelUsed = result.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
const providerUsed = result.meta.agentMeta?.provider ?? fallbackProvider ?? defaultProvider;
const contextTokens =
@@ -64,10 +65,16 @@ export async function updateSessionStoreAfterAgentRun(params: {
if (hasNonzeroUsage(usage)) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens = input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
next.inputTokens = input;
next.outputTokens = output;
next.totalTokens = promptTokens > 0 ? promptTokens : (usage.total ?? input);
next.totalTokens =
deriveSessionTotalTokens({
usage,
contextTokens,
}) ?? input;
}
if (compactionsThisRun > 0) {
next.compactionCount = (entry.compactionCount ?? 0) + compactionsThisRun;
}
sessionStore[sessionKey] = next;
await updateSessionStore(storePath, (store) => {

View File

@@ -33,7 +33,7 @@ import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js";
import { getSkillsSnapshotVersion } from "../../agents/skills/refresh.js";
import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js";
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import { deriveSessionTotalTokens, hasNonzeroUsage } from "../../agents/usage.js";
import { ensureAgentWorkspace } from "../../agents/workspace.js";
import {
normalizeThinkLevel,
@@ -454,11 +454,13 @@ export async function runCronIsolatedAgentTurn(params: {
if (hasNonzeroUsage(usage)) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens = input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
cronSession.sessionEntry.inputTokens = input;
cronSession.sessionEntry.outputTokens = output;
cronSession.sessionEntry.totalTokens =
promptTokens > 0 ? promptTokens : (usage.total ?? input);
deriveSessionTotalTokens({
usage,
contextTokens,
}) ?? input;
}
await persistSessionEntry();
}

View File

@@ -294,6 +294,29 @@ describe("callGateway error details", () => {
expect(err?.message).toContain("Bind: loopback");
});
it("does not overflow very large timeout values", async () => {
startMode = "silent";
loadConfig.mockReturnValue({
gateway: { mode: "local", bind: "loopback" },
});
resolveGatewayPort.mockReturnValue(18789);
pickPrimaryTailnetIPv4.mockReturnValue(undefined);
vi.useFakeTimers();
let err: Error | null = null;
const promise = callGateway({ method: "health", timeoutMs: 2_592_010_000 }).catch((caught) => {
err = caught as Error;
});
await vi.advanceTimersByTimeAsync(1);
expect(err).toBeNull();
lastClientOptions?.onClose?.(1006, "");
await promise;
expect(err?.message).toContain("gateway closed (1006");
});
it("fails fast when remote mode is missing remote url", async () => {
loadConfig.mockReturnValue({
gateway: { mode: "remote", bind: "loopback", remote: {} },

View File

@@ -156,7 +156,9 @@ export function buildGatewayConnectionDetails(
export async function callGateway<T = Record<string, unknown>>(
opts: CallGatewayOptions,
): Promise<T> {
const timeoutMs = opts.timeoutMs ?? 10_000;
const timeoutMs =
typeof opts.timeoutMs === "number" && Number.isFinite(opts.timeoutMs) ? opts.timeoutMs : 10_000;
const safeTimerTimeoutMs = Math.max(1, Math.min(Math.floor(timeoutMs), 2_147_483_647));
const config = opts.config ?? loadConfig();
const isRemoteMode = config.gateway?.mode === "remote";
const remote = isRemoteMode ? config.gateway?.remote : undefined;
@@ -299,7 +301,7 @@ export async function callGateway<T = Record<string, unknown>>(
ignoreClose = true;
client.stop();
stop(new Error(formatTimeoutError()));
}, timeoutMs);
}, safeTimerTimeoutMs);
client.start();
});

View File

@@ -127,7 +127,8 @@ export async function waitForAgentJob(params: {
recordAgentRunSnapshot(snapshot);
finish(snapshot);
});
const timer = setTimeout(() => finish(null), Math.max(1, timeoutMs));
const timerDelayMs = Math.max(1, Math.min(Math.floor(timeoutMs), 2_147_483_647));
const timer = setTimeout(() => finish(null), timerDelayMs);
});
}

View File

@@ -564,27 +564,51 @@
/* Compaction indicator */
.compaction-indicator {
align-self: center;
display: inline-flex;
align-items: center;
gap: 6px;
font-size: 13px;
padding: 10px 12px;
line-height: 1.2;
padding: 6px 14px;
margin-bottom: 8px;
border-radius: 999px;
border: 1px solid var(--border);
background: var(--panel-strong);
color: var(--text);
white-space: nowrap;
user-select: none;
animation: fade-in 0.2s var(--ease-out);
}
.compaction-indicator svg {
width: 16px;
height: 16px;
stroke: currentColor;
fill: none;
stroke-width: 1.5px;
stroke-linecap: round;
stroke-linejoin: round;
flex-shrink: 0;
}
.compaction-indicator--active {
animation: compaction-pulse 1.5s ease-in-out infinite;
color: var(--info);
border-color: rgba(59, 130, 246, 0.35);
}
.compaction-indicator--active svg {
animation: compaction-spin 1s linear infinite;
}
.compaction-indicator--complete {
animation: fade-in 0.2s var(--ease-out);
color: var(--ok);
border-color: rgba(34, 197, 94, 0.35);
}
@keyframes compaction-pulse {
0%,
100% {
opacity: 0.7;
}
50% {
opacity: 1;
@keyframes compaction-spin {
to {
transform: rotate(360deg);
}
}

View File

@@ -49,6 +49,68 @@ function createProps(overrides: Partial<ChatProps> = {}): ChatProps {
}
describe("chat view", () => {
it("renders compacting indicator as a badge", () => {
const container = document.createElement("div");
render(
renderChat(
createProps({
compactionStatus: {
active: true,
startedAt: Date.now(),
completedAt: null,
},
}),
),
container,
);
const indicator = container.querySelector(".compaction-indicator--active");
expect(indicator).not.toBeNull();
expect(indicator?.textContent).toContain("Compacting context...");
});
it("renders completion indicator shortly after compaction", () => {
const container = document.createElement("div");
const nowSpy = vi.spyOn(Date, "now").mockReturnValue(1_000);
render(
renderChat(
createProps({
compactionStatus: {
active: false,
startedAt: 900,
completedAt: 900,
},
}),
),
container,
);
const indicator = container.querySelector(".compaction-indicator--complete");
expect(indicator).not.toBeNull();
expect(indicator?.textContent).toContain("Context compacted");
nowSpy.mockRestore();
});
it("hides stale compaction completion indicator", () => {
const container = document.createElement("div");
const nowSpy = vi.spyOn(Date, "now").mockReturnValue(10_000);
render(
renderChat(
createProps({
compactionStatus: {
active: false,
startedAt: 0,
completedAt: 0,
},
}),
),
container,
);
expect(container.querySelector(".compaction-indicator")).toBeNull();
nowSpy.mockRestore();
});
it("shows a stop button when aborting is available", () => {
const container = document.createElement("div");
const onAbort = vi.fn();

View File

@@ -85,7 +85,7 @@ function renderCompactionIndicator(status: CompactionIndicatorStatus | null | un
// Show "compacting..." while active
if (status.active) {
return html`
<div class="callout info compaction-indicator compaction-indicator--active">
<div class="compaction-indicator compaction-indicator--active" role="status" aria-live="polite">
${icons.loader} Compacting context...
</div>
`;
@@ -96,7 +96,7 @@ function renderCompactionIndicator(status: CompactionIndicatorStatus | null | un
const elapsed = Date.now() - status.completedAt;
if (elapsed < COMPACTION_TOAST_DURATION_MS) {
return html`
<div class="callout success compaction-indicator compaction-indicator--complete">
<div class="compaction-indicator compaction-indicator--complete" role="status" aria-live="polite">
${icons.check} Context compacted
</div>
`;
@@ -268,8 +268,6 @@ export function renderChat(props: ChatProps) {
${props.error ? html`<div class="callout danger">${props.error}</div>` : nothing}
${renderCompactionIndicator(props.compactionStatus)}
${
props.focusMode
? html`
@@ -353,6 +351,8 @@ export function renderChat(props: ChatProps) {
: nothing
}
${renderCompactionIndicator(props.compactionStatus)}
${
props.showNewMessages
? html`