/**
 * Verifies that `runHeartbeatOnce` rolls the session transcript back after a
 * HEARTBEAT_OK reply and leaves it alone after a meaningful reply.
 *
 * The reply function is mocked, so nothing would normally touch the transcript
 * during the run. Each test therefore makes the mock append a user + assistant
 * turn to the transcript itself; without that growth the assertions could not
 * distinguish "pruned" from "never written".
 */
describe("heartbeat transcript pruning", () => {
  /** Session-store fields a test may seed; id and timestamp fall back to defaults. */
  type SeedSession = {
    sessionId?: string;
    updatedAt?: number;
    lastChannel: string;
    lastProvider: string;
    lastTo: string;
  };

  /** Writes a session store containing exactly one entry, keyed by `sessionKey`. */
  async function seedSessionStore(storePath: string, sessionKey: string, session: SeedSession) {
    const entry = {
      ...session,
      // Defaults go after the spread so an explicit `undefined` cannot erase them.
      sessionId: session.sessionId ?? "sid",
      updatedAt: session.updatedAt ?? Date.now(),
    };
    await fs.writeFile(storePath, JSON.stringify({ [sessionKey]: entry }, null, 2));
  }

  /**
   * Creates a JSONL transcript with a session header and one user/assistant
   * exchange, and returns the exact text written.
   */
  async function createTranscriptWithContent(transcriptPath: string, sessionId: string) {
    const header = {
      type: "session",
      version: 3,
      id: sessionId,
      timestamp: new Date().toISOString(),
      cwd: process.cwd(),
    };
    const existingContent = `${JSON.stringify(header)}\n{"role":"user","content":"Hello"}\n{"role":"assistant","content":"Hi there"}\n`;
    await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
    await fs.writeFile(transcriptPath, existingContent);
    return existingContent;
  }

  /**
   * Appends one user + assistant turn to the transcript, standing in for the
   * turns a heartbeat run writes, and returns the appended text.
   */
  async function appendHeartbeatTurns(transcriptPath: string, assistantText: string) {
    const turns = [
      JSON.stringify({ role: "user", content: "heartbeat" }),
      JSON.stringify({ role: "assistant", content: assistantText }),
      "",
    ].join("\n");
    await fs.appendFile(transcriptPath, turns);
    return turns;
  }

  /**
   * Runs `fn` inside a throwaway directory with `getReplyFromConfig` spied on
   * and `TELEGRAM_BOT_TOKEN` blanked. The spy, the environment variable and the
   * directory are all restored/removed afterwards, even when `fn` throws.
   */
  async function withTempHeartbeatSandbox<T>(
    fn: (ctx: {
      tmpDir: string;
      storePath: string;
      replySpy: ReturnType<typeof vi.spyOn>;
    }) => Promise<T>,
  ) {
    const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-hb-prune-"));
    const storePath = path.join(tmpDir, "sessions.json");
    const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
    const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
    process.env.TELEGRAM_BOT_TOKEN = "";
    try {
      return await fn({ tmpDir, storePath, replySpy });
    } finally {
      replySpy.mockRestore();
      if (prevTelegramToken === undefined) {
        delete process.env.TELEGRAM_BOT_TOKEN;
      } else {
        process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
      }
      await fs.rm(tmpDir, { recursive: true, force: true });
    }
  }

  /**
   * Config shared by both tests.
   * NOTE(review): confirm these keys match the OpenClawConfig schema. The session
   * store must resolve to `storePath`, otherwise the runner never sees the seeded
   * session and the prune path is not exercised.
   */
  function buildConfig(tmpDir: string, storePath: string): OpenClawConfig {
    const cfg: OpenClawConfig = {
      version: 1,
      model: "test-model",
      agent: { workspace: tmpDir },
      sessionStore: storePath,
      channels: { telegram: { showOk: true, showAlerts: true } },
    };
    return cfg;
  }

  it("prunes transcript when heartbeat returns HEARTBEAT_OK", async () => {
    await withTempHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => {
      const sessionKey = resolveMainSessionKey(undefined);
      const sessionId = "test-session-prune";
      const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);

      // Create a transcript with some existing content
      const originalContent = await createTranscriptWithContent(transcriptPath, sessionId);
      const originalSize = (await fs.stat(transcriptPath)).size;

      // Seed session store
      await seedSessionStore(storePath, sessionKey, {
        sessionId,
        lastChannel: "telegram",
        lastProvider: "telegram",
        lastTo: "user123",
      });

      // Grow the transcript during the run, then answer HEARTBEAT_OK (which triggers pruning)
      replySpy.mockImplementationOnce(async () => {
        await appendHeartbeatTurns(transcriptPath, "HEARTBEAT_OK");
        return {
          text: "HEARTBEAT_OK",
          usage: { inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0 },
        };
      });

      await runHeartbeatOnce({
        agentId: undefined,
        reason: "test",
        cfg: buildConfig(tmpDir, storePath),
        deps: { sendTelegram: vi.fn() },
      });

      // The reply must actually have run, otherwise nothing below proves pruning happened
      expect(replySpy).toHaveBeenCalledTimes(1);

      // Verify transcript was truncated back to original size
      const finalContent = await fs.readFile(transcriptPath, "utf-8");
      expect(finalContent).toBe(originalContent);
      const finalSize = (await fs.stat(transcriptPath)).size;
      expect(finalSize).toBe(originalSize);
    });
  });

  it("does not prune transcript when heartbeat returns meaningful content", async () => {
    await withTempHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => {
      const sessionKey = resolveMainSessionKey(undefined);
      const sessionId = "test-session-no-prune";
      const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);

      // Create a transcript with some existing content
      const originalContent = await createTranscriptWithContent(transcriptPath, sessionId);
      const originalSize = (await fs.stat(transcriptPath)).size;

      // Seed session store
      await seedSessionStore(storePath, sessionKey, {
        sessionId,
        lastChannel: "telegram",
        lastProvider: "telegram",
        lastTo: "user123",
      });

      // Grow the transcript during the run, then answer with meaningful content (should NOT trigger pruning)
      const alertText = "Alert: Something needs your attention!";
      let appendedTurns = "";
      replySpy.mockImplementationOnce(async () => {
        appendedTurns = await appendHeartbeatTurns(transcriptPath, alertText);
        return {
          text: alertText,
          usage: { inputTokens: 10, outputTokens: 20, cacheReadTokens: 0, cacheWriteTokens: 0 },
        };
      });

      await runHeartbeatOnce({
        agentId: undefined,
        reason: "test",
        cfg: buildConfig(tmpDir, storePath),
        deps: { sendTelegram: vi.fn() },
      });

      expect(replySpy).toHaveBeenCalledTimes(1);

      // Verify the turns written during the run survived (the file may have grown further)
      const finalContent = await fs.readFile(transcriptPath, "utf-8");
      expect(finalContent.startsWith(originalContent + appendedTurns)).toBe(true);
      const finalSize = (await fs.stat(transcriptPath)).size;
      expect(finalSize).toBeGreaterThan(originalSize);
    });
  });
});
/**
 * Roll a transcript file back to the byte length it had before a heartbeat run.
 *
 * Anything appended after that length is cut off, which drops the turns written
 * during the run. The call is a no-op when no usable snapshot was supplied
 * (missing path, non-numeric size, or a negative size) or when the file is not
 * larger than the snapshot.
 *
 * @param params.transcriptPath - Transcript file to shrink.
 * @param params.preHeartbeatSize - File size in bytes recorded before the run.
 */
async function pruneHeartbeatTranscript(params: {
  transcriptPath?: string;
  preHeartbeatSize?: number;
}) {
  const filePath = params.transcriptPath;
  const sizeBeforeRun = params.preHeartbeatSize;

  if (!filePath) {
    return;
  }
  if (typeof sizeBeforeRun !== "number") {
    return;
  }
  if (sizeBeforeRun < 0) {
    return;
  }

  try {
    const { size: sizeNow } = await fs.stat(filePath);
    const grewDuringRun = sizeNow > sizeBeforeRun;
    if (grewDuringRun) {
      await fs.truncate(filePath, sizeBeforeRun);
    }
  } catch {
    // Best effort: a missing or inaccessible transcript leaves nothing to prune.
  }
}
/**
 * Snapshot the transcript location and byte size for a session before a heartbeat run.
 *
 * The session entry is looked up in the store at `storePath`; its transcript path is
 * resolved relative to the directory that contains the store file, and the file is
 * stat'ed for its current size.
 *
 * @returns `{ transcriptPath, preHeartbeatSize }` on success, or an empty object when
 *   the store has no entry with a `sessionId` for `sessionKey`, or when any step throws
 *   (for example, the transcript file does not exist yet).
 */
async function captureTranscriptState(params: {
  storePath: string;
  sessionKey: string;
  agentId?: string;
}): Promise<{ transcriptPath?: string; preHeartbeatSize?: number }> {
  try {
    const entry = loadSessionStore(params.storePath)[params.sessionKey];
    if (!entry?.sessionId) {
      return {};
    }

    const sessionsDir = path.dirname(params.storePath);
    const transcriptPath = resolveSessionFilePath(entry.sessionId, entry, {
      agentId: params.agentId,
      sessionsDir,
    });

    const { size } = await fs.stat(transcriptPath);
    return { transcriptPath, preHeartbeatSize: size };
  } catch {
    // No readable session or transcript yet, so there is nothing to snapshot.
    return {};
  }
}
transcript to remove duplicate heartbeat turns + await pruneHeartbeatTranscript(transcriptState); emitHeartbeatEvent({ status: "skipped", reason: "duplicate",