Agents: avoid duplicate compaction

This commit is contained in:
Tak Hoffman
2026-02-05 22:46:45 -06:00
parent 9f703a44dc
commit 508791fd15
9 changed files with 421 additions and 21 deletions

View File

@@ -58,6 +58,7 @@ const makeAttempt = (overrides: Partial<EmbeddedRunAttemptResult>): EmbeddedRunA
messagingToolSentTexts: [],
messagingToolSentTargets: [],
cloudCodeAssistFormatError: false,
didAutoCompaction: false,
...overrides,
});

View File

@@ -171,6 +171,7 @@ function makeAttemptResult(
messagingToolSentTexts: [],
messagingToolSentTargets: [],
cloudCodeAssistFormatError: false,
didAutoCompaction: false,
...overrides,
};
}
@@ -216,10 +217,12 @@ describe("overflow compaction in run loop", () => {
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
expect(log.warn).toHaveBeenCalledWith(
expect.stringContaining(
"context overflow detected (attempt 1/3); attempting auto-compaction",
"[openclaw-overflow-compaction] context overflow detected (attempt 1/3); attempting manual compaction",
),
);
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("auto-compaction succeeded"));
expect(log.info).toHaveBeenCalledWith(
expect.stringContaining("[openclaw-overflow-compaction] manual compaction succeeded"),
);
// Should not be an error result
expect(result.meta.error).toBeUndefined();
});
@@ -241,7 +244,9 @@ describe("overflow compaction in run loop", () => {
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(1);
expect(result.meta.error?.kind).toBe("context_overflow");
expect(result.payloads?.[0]?.isError).toBe(true);
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("auto-compaction failed"));
expect(log.warn).toHaveBeenCalledWith(
expect.stringContaining("[openclaw-overflow-compaction] manual compaction failed"),
);
});
it("retries compaction up to 3 times before giving up", async () => {
@@ -323,4 +328,19 @@ describe("overflow compaction in run loop", () => {
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(1);
expect(result.meta.error?.kind).toBe("compaction_failure");
});
it("skips manual compaction if auto-compaction already ran", async () => {
const overflowError = new Error("request_too_large: Request size exceeds model context window");
mockedRunEmbeddedAttempt.mockResolvedValue(
makeAttemptResult({ promptError: overflowError, didAutoCompaction: true }),
);
const result = await runEmbeddedPiAgent(baseParams);
expect(mockedCompactDirect).not.toHaveBeenCalled();
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(1);
expect(result.meta.error?.kind).toBe("context_overflow");
expect(result.payloads?.[0]?.isError).toBe(true);
});
});

View File

@@ -399,14 +399,16 @@ export async function runEmbeddedPiAgent(
`error=${errorText.slice(0, 200)}`,
);
const isCompactionFailure = isCompactionFailureError(errorText);
// Attempt auto-compaction on context overflow (not compaction_failure)
// Attempt manual overflow compaction on context overflow (not compaction_failure).
// If Pi already auto-compacted during this attempt, skip our manual compaction to avoid duplicates.
if (
!isCompactionFailure &&
overflowCompactionAttempts < MAX_OVERFLOW_COMPACTION_ATTEMPTS
overflowCompactionAttempts < MAX_OVERFLOW_COMPACTION_ATTEMPTS &&
!attempt.didAutoCompaction
) {
overflowCompactionAttempts++;
log.warn(
`context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`,
`[openclaw-overflow-compaction] context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting manual compaction for ${provider}/${modelId}`,
);
const compactResult = await compactEmbeddedPiSessionDirect({
sessionId: params.sessionId,
@@ -430,11 +432,13 @@ export async function runEmbeddedPiAgent(
ownerNumbers: params.ownerNumbers,
});
if (compactResult.compacted) {
log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`);
log.info(
`[openclaw-overflow-compaction] manual compaction succeeded for ${provider}/${modelId}; retrying prompt`,
);
continue;
}
log.warn(
`auto-compaction failed for ${provider}/${modelId}: ${compactResult.reason ?? "nothing to compact"}`,
`[openclaw-overflow-compaction] manual compaction failed for ${provider}/${modelId}: ${compactResult.reason ?? "nothing to compact"}`,
);
}
const kind = isCompactionFailure ? "compaction_failure" : "context_overflow";

View File

@@ -0,0 +1,366 @@
import type { Api, Model } from "@mariozechner/pi-ai";
import { beforeEach, describe, expect, it, vi } from "vitest";
const appendCacheTtlTimestamp = vi.fn();
const isCacheTtlEligibleProvider = vi.fn(() => true);
const waitOrder: string[] = [];
const didAutoCompaction = vi.fn();
const waitForCompactionRetry = vi.fn(async () => {
waitOrder.push("wait");
});
vi.mock("../cache-ttl.js", () => ({
appendCacheTtlTimestamp: (...args: unknown[]) => appendCacheTtlTimestamp(...args),
isCacheTtlEligibleProvider: (...args: unknown[]) => isCacheTtlEligibleProvider(...args),
}));
vi.mock("../../pi-embedded-subscribe.js", () => ({
subscribeEmbeddedPiSession: () => ({
assistantTexts: [],
toolMetas: [],
unsubscribe: vi.fn(),
waitForCompactionRetry,
didAutoCompaction: () => didAutoCompaction(),
isCompacting: () => false,
getMessagingToolSentTexts: () => [],
getMessagingToolSentTargets: () => [],
didSendViaMessagingTool: () => false,
getLastToolError: () => undefined,
}),
}));
vi.mock("@mariozechner/pi-ai", () => ({
streamSimple: vi.fn(),
}));
vi.mock("@mariozechner/pi-coding-agent", () => {
const sessionManager = {
getLeafEntry: () => null,
branch: vi.fn(),
resetLeaf: vi.fn(),
buildSessionContext: () => ({ messages: [] }),
appendCustomEntry: vi.fn(),
flushPendingToolResults: vi.fn(),
};
const session = {
sessionId: "session:test",
agent: { replaceMessages: vi.fn(), streamFn: vi.fn() },
messages: [],
isStreaming: false,
prompt: vi.fn(async () => {}),
steer: vi.fn(async () => {}),
dispose: vi.fn(),
};
return {
SessionManager: { open: vi.fn(() => sessionManager) },
SettingsManager: { create: vi.fn(() => ({})) },
createAgentSession: vi.fn(async () => ({ session })),
};
});
vi.mock("../../../auto-reply/heartbeat.js", () => ({
resolveHeartbeatPrompt: vi.fn(() => undefined),
}));
vi.mock("../../../config/channel-capabilities.js", () => ({
resolveChannelCapabilities: vi.fn(() => ({ supportsImages: false })),
}));
vi.mock("../../../infra/machine-name.js", () => ({
getMachineDisplayName: vi.fn(() => "test-host"),
}));
vi.mock("../../../media/constants.js", () => ({
MAX_IMAGE_BYTES: 5_000_000,
}));
vi.mock("../../../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: vi.fn(() => ({ hasHooks: () => false })),
}));
vi.mock("../../../routing/session-key.js", () => ({
isSubagentSessionKey: vi.fn(() => false),
}));
vi.mock("../../../signal/reaction-level.js", () => ({
resolveSignalReactionLevel: vi.fn(() => "off"),
}));
vi.mock("../../../telegram/inline-buttons.js", () => ({
resolveTelegramInlineButtonsScope: vi.fn(() => "off"),
}));
vi.mock("../../../telegram/reaction-level.js", () => ({
resolveTelegramReactionLevel: vi.fn(() => "off"),
}));
vi.mock("../../../tts/tts.js", () => ({
buildTtsSystemPromptHint: vi.fn(() => undefined),
}));
vi.mock("../../../utils.js", () => ({
resolveUserPath: vi.fn((p: string) => p),
}));
vi.mock("../../../utils/message-channel.js", () => ({
normalizeMessageChannel: vi.fn((v?: string) => v),
}));
vi.mock("../../../utils/provider-utils.js", () => ({
isReasoningTagProvider: vi.fn(() => false),
}));
vi.mock("../../agent-paths.js", () => ({
resolveOpenClawAgentDir: vi.fn(() => "/tmp/agent-dir"),
}));
vi.mock("../../agent-scope.js", () => ({
resolveSessionAgentIds: vi.fn(() => ({ sessionAgentId: "main", defaultAgentId: "main" })),
}));
vi.mock("../../anthropic-payload-log.js", () => ({
createAnthropicPayloadLogger: vi.fn(() => ({
recordUsage: vi.fn(),
wrapStreamFn: vi.fn((fn: unknown) => fn),
})),
}));
vi.mock("../../bootstrap-files.js", () => ({
makeBootstrapWarn: vi.fn(() => ({ warn: vi.fn() })),
resolveBootstrapContextForRun: vi.fn(async () => ({
bootstrapFiles: [],
contextFiles: [],
})),
}));
vi.mock("../../cache-trace.js", () => ({
createCacheTrace: vi.fn(() => ({
recordStage: vi.fn(),
wrapStreamFn: vi.fn((fn: unknown) => fn),
})),
}));
vi.mock("../../channel-tools.js", () => ({
listChannelSupportedActions: vi.fn(() => []),
resolveChannelMessageToolHints: vi.fn(() => undefined),
}));
vi.mock("../../docs-path.js", () => ({
resolveOpenClawDocsPath: vi.fn(async () => undefined),
}));
vi.mock("../../failover-error.js", () => ({
isTimeoutError: vi.fn(() => false),
}));
vi.mock("../../model-auth.js", () => ({
resolveModelAuthMode: vi.fn(() => "api-key"),
}));
vi.mock("../../model-selection.js", () => ({
resolveDefaultModelForAgent: vi.fn(() => ({ provider: "anthropic", model: "claude" })),
}));
vi.mock("../../pi-embedded-helpers.js", () => ({
isCloudCodeAssistFormatError: vi.fn(() => false),
resolveBootstrapMaxChars: vi.fn(() => 0),
validateAnthropicTurns: vi.fn(() => {}),
validateGeminiTurns: vi.fn(() => {}),
}));
vi.mock("../../pi-settings.js", () => ({
ensurePiCompactionReserveTokens: vi.fn(() => {}),
resolveCompactionReserveTokensFloor: vi.fn(() => 0),
}));
vi.mock("../../pi-tool-definition-adapter.js", () => ({
toClientToolDefinitions: vi.fn(() => []),
}));
vi.mock("../../pi-tools.js", () => ({
createOpenClawCodingTools: vi.fn(() => []),
}));
vi.mock("../../sandbox.js", () => ({
resolveSandboxContext: vi.fn(async () => undefined),
}));
vi.mock("../../sandbox/runtime-status.js", () => ({
resolveSandboxRuntimeStatus: vi.fn(() => ({ mode: "off", sandboxed: false })),
}));
vi.mock("../../session-file-repair.js", () => ({
repairSessionFileIfNeeded: vi.fn(async () => false),
}));
vi.mock("../../session-tool-result-guard-wrapper.js", () => ({
guardSessionManager: vi.fn((sm) => sm),
}));
vi.mock("../../session-write-lock.js", () => ({
acquireSessionWriteLock: vi.fn(async () => ({ release: vi.fn(async () => {}) })),
}));
vi.mock("../../skills.js", () => ({
applySkillEnvOverrides: vi.fn(() => () => {}),
applySkillEnvOverridesFromSnapshot: vi.fn(() => () => {}),
loadWorkspaceSkillEntries: vi.fn(() => []),
resolveSkillsPromptForRun: vi.fn(() => ""),
}));
vi.mock("../../system-prompt-params.js", () => ({
buildSystemPromptParams: vi.fn(() => ({
runtimeInfo: {},
userTimezone: "UTC",
userTime: "00:00",
userTimeFormat: "24h",
})),
}));
vi.mock("../../system-prompt-report.js", () => ({
buildSystemPromptReport: vi.fn(() => ({ systemPrompt: "" })),
}));
vi.mock("../../transcript-policy.js", () => ({
resolveTranscriptPolicy: vi.fn(() => ({ allowSyntheticToolResults: false })),
}));
vi.mock("../../workspace.js", () => ({
DEFAULT_BOOTSTRAP_FILENAME: "OPENCLAW.md",
}));
vi.mock("../abort.js", () => ({
isAbortError: vi.fn(() => false),
}));
vi.mock("../extensions.js", () => ({
buildEmbeddedExtensionPaths: vi.fn(() => []),
}));
vi.mock("../extra-params.js", () => ({
applyExtraParamsToAgent: vi.fn(() => {}),
}));
vi.mock("../google.js", () => ({
logToolSchemasForGoogle: vi.fn(() => {}),
sanitizeSessionHistory: vi.fn((messages) => messages),
sanitizeToolsForGoogle: vi.fn((tools) => tools),
}));
vi.mock("../history.js", () => ({
getDmHistoryLimitFromSessionKey: vi.fn(() => undefined),
limitHistoryTurns: vi.fn(() => []),
}));
vi.mock("../logger.js", () => ({
log: { debug: vi.fn(), warn: vi.fn(), error: vi.fn() },
}));
vi.mock("../model.js", () => ({
buildModelAliasLines: vi.fn(() => []),
}));
vi.mock("../runs.js", () => ({
clearActiveEmbeddedRun: vi.fn(() => {}),
setActiveEmbeddedRun: vi.fn(() => {}),
}));
vi.mock("../sandbox-info.js", () => ({
buildEmbeddedSandboxInfo: vi.fn(() => undefined),
}));
vi.mock("../session-manager-cache.js", () => ({
prewarmSessionFile: vi.fn(async () => {}),
trackSessionManagerAccess: vi.fn(() => {}),
}));
vi.mock("../session-manager-init.js", () => ({
prepareSessionManagerForRun: vi.fn(async () => {}),
}));
vi.mock("../system-prompt.js", () => ({
applySystemPromptOverrideToSession: vi.fn(() => {}),
buildEmbeddedSystemPrompt: vi.fn(() => ""),
createSystemPromptOverride: vi.fn((prompt: string) => () => prompt),
}));
vi.mock("../tool-split.js", () => ({
splitSdkTools: vi.fn(() => ({ builtInTools: [], customTools: [] })),
}));
vi.mock("../utils.js", () => ({
describeUnknownError: vi.fn((err: unknown) => String(err)),
mapThinkingLevel: vi.fn(() => "off"),
}));
vi.mock("./images.js", () => ({
detectAndLoadPromptImages: vi.fn(async () => ({
images: [],
historyImagesByIndex: new Map(),
})),
}));
import { runEmbeddedAttempt } from "./attempt.js";
const model = {
id: "claude-3",
provider: "anthropic",
api: "messages",
input: ["text"],
} as unknown as Model<Api>;
const baseParams = {
sessionId: "session:test",
sessionKey: "main",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp/workspace",
prompt: "hi",
provider: "anthropic",
modelId: "claude-3",
model,
authStorage: {},
modelRegistry: {},
thinkLevel: "off",
timeoutMs: 1000,
runId: "run-1",
config: {
agents: {
defaults: {
contextPruning: { mode: "cache-ttl" },
},
},
},
};
describe("runEmbeddedAttempt cache-ttl timing", () => {
beforeEach(() => {
appendCacheTtlTimestamp.mockClear();
isCacheTtlEligibleProvider.mockClear();
didAutoCompaction.mockReset();
waitForCompactionRetry.mockClear();
waitOrder.length = 0;
});
it("skips cache-ttl append when auto-compaction ran", async () => {
didAutoCompaction.mockReturnValue(true);
await runEmbeddedAttempt(baseParams);
expect(waitForCompactionRetry).toHaveBeenCalledTimes(1);
expect(appendCacheTtlTimestamp).not.toHaveBeenCalled();
});
it("appends cache-ttl after compaction retry wait when no auto-compaction ran", async () => {
didAutoCompaction.mockReturnValue(false);
appendCacheTtlTimestamp.mockImplementation(() => {
waitOrder.push("append");
});
await runEmbeddedAttempt(baseParams);
expect(waitForCompactionRetry).toHaveBeenCalledTimes(1);
expect(appendCacheTtlTimestamp).toHaveBeenCalledTimes(1);
expect(waitOrder).toEqual(["wait", "append"]);
});
});

View File

@@ -644,6 +644,7 @@ export async function runEmbeddedAttempt(
toolMetas,
unsubscribe,
waitForCompactionRetry,
didAutoCompaction,
getMessagingToolSentTexts,
getMessagingToolSentTargets,
didSendViaMessagingTool,
@@ -799,17 +800,6 @@ export async function runEmbeddedAttempt(
note: `images: prompt=${imageResult.images.length} history=${imageResult.historyImagesByIndex.size}`,
});
const shouldTrackCacheTtl =
params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" &&
isCacheTtlEligibleProvider(params.provider, params.modelId);
if (shouldTrackCacheTtl) {
appendCacheTtlTimestamp(sessionManager, {
timestamp: Date.now(),
provider: params.provider,
modelId: params.modelId,
});
}
// Only pass images option if there are actually images to pass
// This avoids potential issues with models that don't expect the images parameter
if (imageResult.images.length > 0) {
@@ -837,6 +827,17 @@ export async function runEmbeddedAttempt(
}
}
const shouldTrackCacheTtl =
params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" &&
isCacheTtlEligibleProvider(params.provider, params.modelId);
if (shouldTrackCacheTtl && !didAutoCompaction()) {
appendCacheTtlTimestamp(sessionManager, {
timestamp: Date.now(),
provider: params.provider,
modelId: params.modelId,
});
}
messagesSnapshot = activeSession.messages.slice();
sessionIdUsed = activeSession.sessionId;
cacheTrace?.recordStage("session:after", {
@@ -906,6 +907,7 @@ export async function runEmbeddedAttempt(
cloudCodeAssistFormatError: Boolean(
lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage),
),
didAutoCompaction: didAutoCompaction(),
// Client tool call detected (OpenResponses hosted tools)
clientToolCall: clientToolCallDetected ?? undefined,
};

View File

@@ -106,6 +106,7 @@ export type EmbeddedRunAttemptResult = {
messagingToolSentTexts: string[];
messagingToolSentTargets: MessagingToolSend[];
cloudCodeAssistFormatError: boolean;
didAutoCompaction: boolean;
/** Client tool call detected (OpenResponses hosted tools). */
clientToolCall?: { name: string; params: Record<string, unknown> };
};

View File

@@ -20,9 +20,12 @@ export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) {
}
export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) {
ctx.state.autoCompactionAttempts += 1;
ctx.state.compactionInFlight = true;
ctx.ensureCompactionPromise();
ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`);
ctx.log.debug(
`[pi-auto-compaction] start: runId=${ctx.params.runId} attempt=${ctx.state.autoCompactionAttempts}`,
);
emitAgentEvent({
runId: ctx.params.runId,
stream: "compaction",
@@ -43,7 +46,7 @@ export function handleAutoCompactionEnd(
if (willRetry) {
ctx.noteCompactionRetry();
ctx.resetForCompactionRetry();
ctx.log.debug(`embedded run compaction retry: runId=${ctx.params.runId}`);
ctx.log.debug(`[pi-auto-compaction] retry: runId=${ctx.params.runId}`);
} else {
ctx.maybeResolveCompactionWait();
}

View File

@@ -54,6 +54,7 @@ export type EmbeddedPiSubscribeState = {
pendingCompactionRetry: number;
compactionRetryResolve?: () => void;
compactionRetryPromise: Promise<void> | null;
autoCompactionAttempts: number;
messagingToolSentTexts: string[];
messagingToolSentTextsNormalized: string[];

View File

@@ -63,6 +63,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
pendingCompactionRetry: 0,
compactionRetryResolve: undefined,
compactionRetryPromise: null,
autoCompactionAttempts: 0,
messagingToolSentTexts: [],
messagingToolSentTextsNormalized: [],
messagingToolSentTargets: [],
@@ -539,6 +540,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
toolMetas,
unsubscribe,
isCompacting: () => state.compactionInFlight || state.pendingCompactionRetry > 0,
didAutoCompaction: () => state.autoCompactionAttempts > 0,
getMessagingToolSentTexts: () => messagingToolSentTexts.slice(),
getMessagingToolSentTargets: () => messagingToolSentTargets.slice(),
// Returns true if any messaging tool successfully sent a message.