From e19a23520c2dc95bdf62db5e94fa168fee44d371 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Mon, 9 Feb 2026 23:42:35 -0500 Subject: [PATCH] fix: unify session maintenance and cron run pruning (#13083) * fix: prune stale session entries, cap entry count, and rotate sessions.json The sessions.json file grows unbounded over time. Every heartbeat tick (default: 30m) triggers multiple full rewrites, and session keys from groups, threads, and DMs accumulate indefinitely with large embedded objects (skillsSnapshot, systemPromptReport). At >50MB the synchronous JSON parse blocks the event loop, causing Telegram webhook timeouts and effectively taking the bot down. Three mitigations, all running inside saveSessionStoreUnlocked() on every write: 1. Prune stale entries: remove entries with updatedAt older than 30 days (configurable via session.maintenance.pruneDays in openclaw.json) 2. Cap entry count: keep only the 500 most recently updated entries (configurable via session.maintenance.maxEntries). Entries without updatedAt are evicted first. 3. File rotation: if the existing sessions.json exceeds 10MB before a write, rename it to sessions.json.bak.{timestamp} and keep only the 3 most recent backups (configurable via session.maintenance.rotateBytes). All three thresholds are configurable under session.maintenance in openclaw.json with Zod validation. No env vars. Existing tests updated to use Date.now() instead of epoch-relative timestamps (1, 2, 3) that would be incorrectly pruned as stale. 27 new tests covering pruning, capping, rotation, and integration scenarios. * feat: auto-prune expired cron run sessions (#12289) Add TTL-based reaper for isolated cron run sessions that accumulate indefinitely in sessions.json. New config option: cron.sessionRetention: string | false (default: '24h') The reaper runs piggy-backed on the cron timer tick, self-throttled to sweep at most every 5 minutes. It removes session entries matching the pattern cron::run: whose updatedAt + retention < now. Design follows the Kubernetes ttlSecondsAfterFinished pattern: - Sessions are persisted normally (observability/debugging) - A periodic reaper prunes expired entries - Configurable retention with sensible default - Set to false to disable pruning entirely Files changed: - src/config/types.cron.ts: Add sessionRetention to CronConfig - src/config/zod-schema.ts: Add Zod validation for sessionRetention - src/cron/session-reaper.ts: New reaper module (sweepCronRunSessions) - src/cron/session-reaper.test.ts: 12 tests covering all paths - src/cron/service/state.ts: Add cronConfig/sessionStorePath to deps - src/cron/service/timer.ts: Wire reaper into onTimer tick - src/gateway/server-cron.ts: Pass config and session store path to deps Closes #12289 * fix: sweep cron session stores per agent * docs: add changelog for session maintenance (#13083) (thanks @skyfallsin, @Glucksberg) * fix: add warn-only session maintenance mode * fix: warn-only maintenance defaults to active session * fix: deliver maintenance warnings to active session * docs: add session maintenance examples * fix: accept duration and size maintenance thresholds * refactor: share cron run session key check * fix: format issues and replace defaultRuntime.warn with console.warn --------- Co-authored-by: Pradeep Elankumaran Co-authored-by: Glucksberg Co-authored-by: max <40643627+quotentiroler@users.noreply.github.com> Co-authored-by: quotentiroler --- CHANGELOG.md | 1 + docs/gateway/configuration-examples.md | 7 + docs/gateway/configuration.md | 16 + src/auto-reply/reply/session.ts | 22 +- src/cli/parse-bytes.test.ts | 25 + src/cli/parse-bytes.ts | 46 ++ src/config/sessions.test.ts | 14 +- src/config/sessions/store.pruning.test.ts | 562 ++++++++++++++++++++++ src/config/sessions/store.ts | 342 ++++++++++++- src/config/sessions/transcript.ts | 16 +- src/config/types.base.ts | 17 + src/config/types.cron.ts | 6 + src/config/zod-schema.session.ts | 37 ++ src/config/zod-schema.ts | 1 + src/cron/service/state.ts | 9 + src/cron/service/timer.ts | 34 ++ src/cron/session-reaper.test.ts | 203 ++++++++ src/cron/session-reaper.ts | 115 +++++ src/gateway/server-cron.ts | 12 + src/gateway/session-utils.ts | 7 +- src/infra/session-maintenance-warning.ts | 108 +++++ src/infra/state-migrations.ts | 4 +- src/sessions/session-key-utils.ts | 8 + 23 files changed, 1566 insertions(+), 46 deletions(-) create mode 100644 src/cli/parse-bytes.test.ts create mode 100644 src/cli/parse-bytes.ts create mode 100644 src/config/sessions/store.pruning.test.ts create mode 100644 src/cron/session-reaper.test.ts create mode 100644 src/cron/session-reaper.ts create mode 100644 src/infra/session-maintenance-warning.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 59247aa400..197271feaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Sessions: prune stale entries, cap session store size, rotate large stores, accept duration/size thresholds, default to warn-only maintenance, and prune cron run sessions after retention windows. (#13083) Thanks @skyfallsin, @Glucksberg, @gumadeiras. - CI: Implement pipeline and workflow order. Thanks @quotentiroler. - WhatsApp: preserve original filenames for inbound documents. (#12691) Thanks @akramcodez. - Telegram: harden quote parsing; preserve quote context; avoid QUOTE_TEXT_INVALID; avoid nested reply quote misclassification. (#12156) Thanks @rybnikov. diff --git a/docs/gateway/configuration-examples.md b/docs/gateway/configuration-examples.md index 79b6d2acd1..ac3f992930 100644 --- a/docs/gateway/configuration-examples.md +++ b/docs/gateway/configuration-examples.md @@ -160,6 +160,12 @@ Save to `~/.openclaw/openclaw.json` and you can DM the bot from that number. }, resetTriggers: ["/new", "/reset"], store: "~/.openclaw/agents/default/sessions/sessions.json", + maintenance: { + mode: "warn", + pruneAfter: "30d", + maxEntries: 500, + rotateBytes: "10mb", + }, typingIntervalSeconds: 5, sendPolicy: { default: "allow", @@ -344,6 +350,7 @@ Save to `~/.openclaw/openclaw.json` and you can DM the bot from that number. enabled: true, store: "~/.openclaw/cron/cron.json", maxConcurrentRuns: 2, + sessionRetention: "24h", }, // Webhooks diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index 8bb61e65c0..31c115039b 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -2767,6 +2767,12 @@ Controls session scoping, reset policy, reset triggers, and where the session st // Default is already per-agent under ~/.openclaw/agents//sessions/sessions.json // You can override with {agentId} templating: store: "~/.openclaw/agents/{agentId}/sessions/sessions.json", + maintenance: { + mode: "warn", + pruneAfter: "30d", + maxEntries: 500, + rotateBytes: "10mb", + }, // Direct chats collapse to agent:: (default: "main"). mainKey: "main", agentToAgent: { @@ -2803,6 +2809,11 @@ Fields: - `agentToAgent.maxPingPongTurns`: max reply-back turns between requester/target (0–5, default 5). - `sendPolicy.default`: `allow` or `deny` fallback when no rule matches. - `sendPolicy.rules[]`: match by `channel`, `chatType` (`direct|group|room`), or `keyPrefix` (e.g. `cron:`). First deny wins; otherwise allow. +- `maintenance`: session store maintenance settings for pruning, capping, and rotation. + - `mode`: `"warn"` (default) warns the active session (best-effort delivery) when it would be evicted without enforcing maintenance. `"enforce"` applies pruning and rotation. + - `pruneAfter`: remove entries older than this duration (for example `"30m"`, `"1h"`, `"30d"`). Default "30d". + - `maxEntries`: cap the number of session entries kept (default 500). + - `rotateBytes`: rotate `sessions.json` when it exceeds this size (for example `"10kb"`, `"1mb"`, `"10mb"`). Default "10mb". ### `skills` (skills config) @@ -3407,10 +3418,15 @@ Cron is a Gateway-owned scheduler for wakeups and scheduled jobs. See [Cron jobs cron: { enabled: true, maxConcurrentRuns: 2, + sessionRetention: "24h", }, } ``` +Fields: + +- `sessionRetention`: how long to keep completed cron run sessions before pruning. Accepts a duration string like `"24h"` or `"7d"`. Use `false` to disable pruning. Default is 24h. + --- _Next: [Agent Runtime](/concepts/agent)_ 🦞 diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index d3de9ef3fb..a1491da0aa 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -26,6 +26,7 @@ import { type SessionScope, updateSessionStore, } from "../../config/sessions.js"; +import { deliverSessionMaintenanceWarning } from "../../infra/session-maintenance-warning.js"; import { normalizeMainKey } from "../../routing/session-key.js"; import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js"; import { resolveCommandAuthorization } from "../command-auth.js"; @@ -347,10 +348,23 @@ export async function initSessionState(params: { } // Preserve per-session overrides while resetting compaction state on /new. sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...sessionEntry }; - await updateSessionStore(storePath, (store) => { - // Preserve per-session overrides while resetting compaction state on /new. - store[sessionKey] = { ...store[sessionKey], ...sessionEntry }; - }); + await updateSessionStore( + storePath, + (store) => { + // Preserve per-session overrides while resetting compaction state on /new. + store[sessionKey] = { ...store[sessionKey], ...sessionEntry }; + }, + { + activeSessionKey: sessionKey, + onWarn: (warning) => + deliverSessionMaintenanceWarning({ + cfg, + sessionKey, + entry: sessionEntry, + warning, + }), + }, + ); const sessionCtx: TemplateContext = { ...ctx, diff --git a/src/cli/parse-bytes.test.ts b/src/cli/parse-bytes.test.ts new file mode 100644 index 0000000000..a0c1abcb0b --- /dev/null +++ b/src/cli/parse-bytes.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, it } from "vitest"; +import { parseByteSize } from "./parse-bytes.js"; + +describe("parseByteSize", () => { + it("parses bytes with units", () => { + expect(parseByteSize("10kb")).toBe(10 * 1024); + expect(parseByteSize("1mb")).toBe(1024 * 1024); + expect(parseByteSize("2gb")).toBe(2 * 1024 * 1024 * 1024); + }); + + it("parses shorthand units", () => { + expect(parseByteSize("5k")).toBe(5 * 1024); + expect(parseByteSize("1m")).toBe(1024 * 1024); + }); + + it("uses default unit when omitted", () => { + expect(parseByteSize("123")).toBe(123); + }); + + it("rejects invalid values", () => { + expect(() => parseByteSize("")).toThrow(); + expect(() => parseByteSize("nope")).toThrow(); + expect(() => parseByteSize("-5kb")).toThrow(); + }); +}); diff --git a/src/cli/parse-bytes.ts b/src/cli/parse-bytes.ts new file mode 100644 index 0000000000..db993a292f --- /dev/null +++ b/src/cli/parse-bytes.ts @@ -0,0 +1,46 @@ +export type BytesParseOptions = { + defaultUnit?: "b" | "kb" | "mb" | "gb" | "tb"; +}; + +const UNIT_MULTIPLIERS: Record = { + b: 1, + kb: 1024, + k: 1024, + mb: 1024 ** 2, + m: 1024 ** 2, + gb: 1024 ** 3, + g: 1024 ** 3, + tb: 1024 ** 4, + t: 1024 ** 4, +}; + +export function parseByteSize(raw: string, opts?: BytesParseOptions): number { + const trimmed = String(raw ?? "") + .trim() + .toLowerCase(); + if (!trimmed) { + throw new Error("invalid byte size (empty)"); + } + + const m = /^(\d+(?:\.\d+)?)([a-z]+)?$/.exec(trimmed); + if (!m) { + throw new Error(`invalid byte size: ${raw}`); + } + + const value = Number(m[1]); + if (!Number.isFinite(value) || value < 0) { + throw new Error(`invalid byte size: ${raw}`); + } + + const unit = (m[2] ?? opts?.defaultUnit ?? "b").toLowerCase(); + const multiplier = UNIT_MULTIPLIERS[unit]; + if (!multiplier) { + throw new Error(`invalid byte size unit: ${raw}`); + } + + const bytes = Math.round(value * multiplier); + if (!Number.isFinite(bytes)) { + throw new Error(`invalid byte size: ${raw}`); + } + return bytes; +} diff --git a/src/config/sessions.test.ts b/src/config/sessions.test.ts index c6f92246e0..d46c7e97ce 100644 --- a/src/config/sessions.test.ts +++ b/src/config/sessions.test.ts @@ -288,10 +288,10 @@ describe("sessions", () => { await Promise.all([ updateSessionStore(storePath, (store) => { - store["agent:main:one"] = { sessionId: "sess-1", updatedAt: 1 }; + store["agent:main:one"] = { sessionId: "sess-1", updatedAt: Date.now() }; }), updateSessionStore(storePath, (store) => { - store["agent:main:two"] = { sessionId: "sess-2", updatedAt: 2 }; + store["agent:main:two"] = { sessionId: "sess-2", updatedAt: Date.now() }; }), ]); @@ -306,7 +306,7 @@ describe("sessions", () => { await fs.writeFile(storePath, "[]", "utf-8"); await updateSessionStore(storePath, (store) => { - store["agent:main:main"] = { sessionId: "sess-1", updatedAt: 1 }; + store["agent:main:main"] = { sessionId: "sess-1", updatedAt: Date.now() }; }); const store = loadSessionStore(storePath); @@ -324,7 +324,7 @@ describe("sessions", () => { await updateSessionStore(storePath, (store) => { store["agent:main:main"] = { sessionId: "sess-normalized", - updatedAt: 1, + updatedAt: Date.now(), lastChannel: " WhatsApp ", lastTo: " +1555 ", lastAccountId: " acct-1 ", @@ -349,8 +349,8 @@ describe("sessions", () => { storePath, JSON.stringify( { - "agent:main:old": { sessionId: "sess-old", updatedAt: 1 }, - "agent:main:keep": { sessionId: "sess-keep", updatedAt: 2 }, + "agent:main:old": { sessionId: "sess-old", updatedAt: Date.now() }, + "agent:main:keep": { sessionId: "sess-keep", updatedAt: Date.now() }, }, null, 2, @@ -363,7 +363,7 @@ describe("sessions", () => { delete store["agent:main:old"]; }), updateSessionStore(storePath, (store) => { - store["agent:main:new"] = { sessionId: "sess-new", updatedAt: 3 }; + store["agent:main:new"] = { sessionId: "sess-new", updatedAt: Date.now() }; }), ]); diff --git a/src/config/sessions/store.pruning.test.ts b/src/config/sessions/store.pruning.test.ts new file mode 100644 index 0000000000..4a977a61ca --- /dev/null +++ b/src/config/sessions/store.pruning.test.ts @@ -0,0 +1,562 @@ +import crypto from "node:crypto"; +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { SessionEntry } from "./types.js"; +import { + capEntryCount, + clearSessionStoreCacheForTest, + loadSessionStore, + pruneStaleEntries, + rotateSessionFile, + saveSessionStore, +} from "./store.js"; + +// Mock loadConfig so resolveMaintenanceConfig() never reads a real openclaw.json. +// Unit tests always pass explicit overrides so this mock is inert for them. +// Integration tests set return values to control the config. +vi.mock("../config.js", () => ({ + loadConfig: vi.fn().mockReturnValue({}), +})); + +const DAY_MS = 24 * 60 * 60 * 1000; + +function makeEntry(updatedAt: number): SessionEntry { + return { sessionId: crypto.randomUUID(), updatedAt }; +} + +function makeStore(entries: Array<[string, SessionEntry]>): Record { + return Object.fromEntries(entries); +} + +// --------------------------------------------------------------------------- +// Unit tests — each function called with explicit override parameters. +// No config loading needed; overrides bypass resolveMaintenanceConfig(). +// --------------------------------------------------------------------------- + +describe("pruneStaleEntries", () => { + it("removes entries older than maxAgeDays", () => { + const now = Date.now(); + const store = makeStore([ + ["old", makeEntry(now - 31 * DAY_MS)], + ["fresh", makeEntry(now - 1 * DAY_MS)], + ]); + + const pruned = pruneStaleEntries(store, 30 * DAY_MS); + + expect(pruned).toBe(1); + expect(store.old).toBeUndefined(); + expect(store.fresh).toBeDefined(); + }); + + it("keeps entries newer than maxAgeDays", () => { + const now = Date.now(); + const store = makeStore([ + ["a", makeEntry(now - 1 * DAY_MS)], + ["b", makeEntry(now - 6 * DAY_MS)], + ["c", makeEntry(now)], + ]); + + const pruned = pruneStaleEntries(store, 7 * DAY_MS); + + expect(pruned).toBe(0); + expect(Object.keys(store)).toHaveLength(3); + }); + + it("keeps entries with no updatedAt", () => { + const store: Record = { + noDate: { sessionId: crypto.randomUUID() } as SessionEntry, + fresh: makeEntry(Date.now()), + }; + + const pruned = pruneStaleEntries(store, 1 * DAY_MS); + + expect(pruned).toBe(0); + expect(store.noDate).toBeDefined(); + }); + + it("empty store is a no-op", () => { + const store: Record = {}; + const pruned = pruneStaleEntries(store, 30 * DAY_MS); + + expect(pruned).toBe(0); + expect(Object.keys(store)).toHaveLength(0); + }); + + it("all entries stale results in empty store", () => { + const now = Date.now(); + const store = makeStore([ + ["a", makeEntry(now - 10 * DAY_MS)], + ["b", makeEntry(now - 20 * DAY_MS)], + ["c", makeEntry(now - 100 * DAY_MS)], + ]); + + const pruned = pruneStaleEntries(store, 5 * DAY_MS); + + expect(pruned).toBe(3); + expect(Object.keys(store)).toHaveLength(0); + }); + + it("returns count of pruned entries", () => { + const now = Date.now(); + const store = makeStore([ + ["stale1", makeEntry(now - 15 * DAY_MS)], + ["stale2", makeEntry(now - 30 * DAY_MS)], + ["fresh1", makeEntry(now - 5 * DAY_MS)], + ["fresh2", makeEntry(now)], + ]); + + const pruned = pruneStaleEntries(store, 10 * DAY_MS); + + expect(pruned).toBe(2); + expect(Object.keys(store)).toHaveLength(2); + }); + + it("entry exactly at the boundary is kept", () => { + const now = Date.now(); + const store = makeStore([["borderline", makeEntry(now - 30 * DAY_MS + 1000)]]); + + const pruned = pruneStaleEntries(store, 30 * DAY_MS); + + expect(pruned).toBe(0); + expect(store.borderline).toBeDefined(); + }); + + it("falls back to built-in default (30 days) when no override given", () => { + const now = Date.now(); + const store = makeStore([ + ["old", makeEntry(now - 31 * DAY_MS)], + ["fresh", makeEntry(now - 29 * DAY_MS)], + ]); + + // loadConfig mock returns {} → maintenance is undefined → default 30 days + const pruned = pruneStaleEntries(store); + + expect(pruned).toBe(1); + expect(store.old).toBeUndefined(); + expect(store.fresh).toBeDefined(); + }); +}); + +describe("capEntryCount", () => { + it("over limit: keeps N most recent by updatedAt, deletes rest", () => { + const now = Date.now(); + const store = makeStore([ + ["oldest", makeEntry(now - 4 * DAY_MS)], + ["old", makeEntry(now - 3 * DAY_MS)], + ["mid", makeEntry(now - 2 * DAY_MS)], + ["recent", makeEntry(now - 1 * DAY_MS)], + ["newest", makeEntry(now)], + ]); + + const evicted = capEntryCount(store, 3); + + expect(evicted).toBe(2); + expect(Object.keys(store)).toHaveLength(3); + expect(store.newest).toBeDefined(); + expect(store.recent).toBeDefined(); + expect(store.mid).toBeDefined(); + expect(store.oldest).toBeUndefined(); + expect(store.old).toBeUndefined(); + }); + + it("under limit: no-op", () => { + const store = makeStore([ + ["a", makeEntry(Date.now())], + ["b", makeEntry(Date.now() - DAY_MS)], + ]); + + const evicted = capEntryCount(store, 10); + + expect(evicted).toBe(0); + expect(Object.keys(store)).toHaveLength(2); + }); + + it("exactly at limit: no-op", () => { + const now = Date.now(); + const store = makeStore([ + ["a", makeEntry(now)], + ["b", makeEntry(now - DAY_MS)], + ["c", makeEntry(now - 2 * DAY_MS)], + ]); + + const evicted = capEntryCount(store, 3); + + expect(evicted).toBe(0); + expect(Object.keys(store)).toHaveLength(3); + }); + + it("entries without updatedAt are evicted first (lowest priority)", () => { + const now = Date.now(); + const store: Record = { + noDate1: { sessionId: crypto.randomUUID() } as SessionEntry, + noDate2: { sessionId: crypto.randomUUID() } as SessionEntry, + recent: makeEntry(now), + older: makeEntry(now - DAY_MS), + }; + + const evicted = capEntryCount(store, 2); + + expect(evicted).toBe(2); + expect(store.recent).toBeDefined(); + expect(store.older).toBeDefined(); + expect(store.noDate1).toBeUndefined(); + expect(store.noDate2).toBeUndefined(); + }); + + it("returns count of evicted entries", () => { + const now = Date.now(); + const store = makeStore([ + ["a", makeEntry(now)], + ["b", makeEntry(now - DAY_MS)], + ["c", makeEntry(now - 2 * DAY_MS)], + ]); + + const evicted = capEntryCount(store, 1); + + expect(evicted).toBe(2); + expect(Object.keys(store)).toHaveLength(1); + expect(store.a).toBeDefined(); + }); + + it("falls back to built-in default (500) when no override given", () => { + const now = Date.now(); + const entries: Array<[string, SessionEntry]> = []; + for (let i = 0; i < 501; i++) { + entries.push([`key-${i}`, makeEntry(now - i * 1000)]); + } + const store = makeStore(entries); + + // loadConfig mock returns {} → maintenance is undefined → default 500 + const evicted = capEntryCount(store); + + expect(evicted).toBe(1); + expect(Object.keys(store)).toHaveLength(500); + expect(store["key-0"]).toBeDefined(); + expect(store["key-500"]).toBeUndefined(); + }); + + it("empty store is a no-op", () => { + const store: Record = {}; + + const evicted = capEntryCount(store, 5); + + expect(evicted).toBe(0); + }); +}); + +describe("rotateSessionFile", () => { + let testDir: string; + let storePath: string; + + beforeEach(async () => { + testDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-rotate-")); + storePath = path.join(testDir, "sessions.json"); + }); + + afterEach(async () => { + await fs.rm(testDir, { recursive: true, force: true }).catch(() => undefined); + }); + + it("file under maxBytes: no rotation (returns false)", async () => { + await fs.writeFile(storePath, "x".repeat(500), "utf-8"); + + const rotated = await rotateSessionFile(storePath, 1000); + + expect(rotated).toBe(false); + const content = await fs.readFile(storePath, "utf-8"); + expect(content).toBe("x".repeat(500)); + }); + + it("file over maxBytes: renamed to .bak.{timestamp}, returns true", async () => { + const bigContent = "x".repeat(200); + await fs.writeFile(storePath, bigContent, "utf-8"); + + const rotated = await rotateSessionFile(storePath, 100); + + expect(rotated).toBe(true); + await expect(fs.stat(storePath)).rejects.toThrow(); + const files = await fs.readdir(testDir); + const bakFiles = files.filter((f) => f.startsWith("sessions.json.bak.")); + expect(bakFiles).toHaveLength(1); + const bakContent = await fs.readFile(path.join(testDir, bakFiles[0]), "utf-8"); + expect(bakContent).toBe(bigContent); + }); + + it("multiple rotations: only keeps 3 most recent .bak files", async () => { + for (let i = 0; i < 5; i++) { + await fs.writeFile(storePath, `data-${i}-${"x".repeat(100)}`, "utf-8"); + await rotateSessionFile(storePath, 50); + await new Promise((r) => setTimeout(r, 5)); + } + + const files = await fs.readdir(testDir); + const bakFiles = files.filter((f) => f.startsWith("sessions.json.bak.")).toSorted(); + + expect(bakFiles.length).toBeLessThanOrEqual(3); + }); + + it("non-existent file: no rotation (returns false)", async () => { + const missingPath = path.join(testDir, "missing.json"); + + const rotated = await rotateSessionFile(missingPath, 100); + + expect(rotated).toBe(false); + }); + + it("file exactly at maxBytes: no rotation (returns false)", async () => { + await fs.writeFile(storePath, "x".repeat(100), "utf-8"); + + const rotated = await rotateSessionFile(storePath, 100); + + expect(rotated).toBe(false); + }); + + it("backup file name includes a timestamp", async () => { + await fs.writeFile(storePath, "x".repeat(100), "utf-8"); + const before = Date.now(); + + await rotateSessionFile(storePath, 50); + + const after = Date.now(); + const files = await fs.readdir(testDir); + const bakFiles = files.filter((f) => f.startsWith("sessions.json.bak.")); + expect(bakFiles).toHaveLength(1); + const timestamp = Number(bakFiles[0].replace("sessions.json.bak.", "")); + expect(timestamp).toBeGreaterThanOrEqual(before); + expect(timestamp).toBeLessThanOrEqual(after); + }); +}); + +// --------------------------------------------------------------------------- +// Integration tests — exercise saveSessionStore end-to-end. +// The file-level vi.mock("../config.js") stubs loadConfig; per-test +// mockReturnValue controls what resolveMaintenanceConfig() returns. +// --------------------------------------------------------------------------- + +describe("Integration: saveSessionStore with pruning", () => { + let testDir: string; + let storePath: string; + let savedCacheTtl: string | undefined; + let mockLoadConfig: ReturnType; + + beforeEach(async () => { + testDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-pruning-integ-")); + storePath = path.join(testDir, "sessions.json"); + savedCacheTtl = process.env.OPENCLAW_SESSION_CACHE_TTL_MS; + process.env.OPENCLAW_SESSION_CACHE_TTL_MS = "0"; + clearSessionStoreCacheForTest(); + + const configModule = await import("../config.js"); + mockLoadConfig = configModule.loadConfig as ReturnType; + }); + + afterEach(async () => { + vi.restoreAllMocks(); + await fs.rm(testDir, { recursive: true, force: true }).catch(() => undefined); + clearSessionStoreCacheForTest(); + if (savedCacheTtl === undefined) { + delete process.env.OPENCLAW_SESSION_CACHE_TTL_MS; + } else { + process.env.OPENCLAW_SESSION_CACHE_TTL_MS = savedCacheTtl; + } + }); + + it("saveSessionStore prunes stale entries on write", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "7d", + maxEntries: 500, + rotateBytes: 10_485_760, + }, + }, + }); + + const now = Date.now(); + const store: Record = { + stale: makeEntry(now - 30 * DAY_MS), + fresh: makeEntry(now), + }; + + await saveSessionStore(storePath, store); + + const loaded = loadSessionStore(storePath); + expect(loaded.stale).toBeUndefined(); + expect(loaded.fresh).toBeDefined(); + }); + + it("saveSessionStore caps entries over limit", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "30d", + maxEntries: 5, + rotateBytes: 10_485_760, + }, + }, + }); + + const now = Date.now(); + const store: Record = {}; + for (let i = 0; i < 10; i++) { + store[`key-${i}`] = makeEntry(now - i * 1000); + } + + await saveSessionStore(storePath, store); + + const loaded = loadSessionStore(storePath); + expect(Object.keys(loaded)).toHaveLength(5); + for (let i = 0; i < 5; i++) { + expect(loaded[`key-${i}`]).toBeDefined(); + } + for (let i = 5; i < 10; i++) { + expect(loaded[`key-${i}`]).toBeUndefined(); + } + }); + + it("saveSessionStore rotates file when over size limit and creates .bak", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "30d", + maxEntries: 500, + rotateBytes: "100b", + }, + }, + }); + + const now = Date.now(); + const largeStore: Record = {}; + for (let i = 0; i < 50; i++) { + largeStore[`agent:main:session-${crypto.randomUUID()}`] = makeEntry(now - i * 1000); + } + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(largeStore, null, 2), "utf-8"); + + const statBefore = await fs.stat(storePath); + expect(statBefore.size).toBeGreaterThan(100); + + const smallStore: Record = { + only: makeEntry(now), + }; + await saveSessionStore(storePath, smallStore); + + const files = await fs.readdir(testDir); + const bakFiles = files.filter((f) => f.startsWith("sessions.json.bak.")); + expect(bakFiles.length).toBeGreaterThanOrEqual(1); + + const loaded = loadSessionStore(storePath); + expect(loaded.only).toBeDefined(); + }); + + it("saveSessionStore applies both pruning and capping together", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "10d", + maxEntries: 3, + rotateBytes: 10_485_760, + }, + }, + }); + + const now = Date.now(); + const store: Record = { + stale1: makeEntry(now - 15 * DAY_MS), + stale2: makeEntry(now - 20 * DAY_MS), + fresh1: makeEntry(now), + fresh2: makeEntry(now - 1 * DAY_MS), + fresh3: makeEntry(now - 2 * DAY_MS), + fresh4: makeEntry(now - 5 * DAY_MS), + }; + + await saveSessionStore(storePath, store); + + const loaded = loadSessionStore(storePath); + expect(loaded.stale1).toBeUndefined(); + expect(loaded.stale2).toBeUndefined(); + expect(Object.keys(loaded).length).toBeLessThanOrEqual(3); + expect(loaded.fresh1).toBeDefined(); + expect(loaded.fresh2).toBeDefined(); + expect(loaded.fresh3).toBeDefined(); + expect(loaded.fresh4).toBeUndefined(); + }); + + it("saveSessionStore skips enforcement when maintenance mode is warn", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "warn", + pruneAfter: "7d", + maxEntries: 1, + rotateBytes: 10_485_760, + }, + }, + }); + + const now = Date.now(); + const store: Record = { + stale: makeEntry(now - 30 * DAY_MS), + fresh: makeEntry(now), + }; + + await saveSessionStore(storePath, store); + + const loaded = loadSessionStore(storePath); + expect(loaded.stale).toBeDefined(); + expect(loaded.fresh).toBeDefined(); + expect(Object.keys(loaded)).toHaveLength(2); + }); + + it("resolveMaintenanceConfig reads from loadConfig().session.maintenance", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { pruneAfter: "7d", maxEntries: 100, rotateBytes: "5mb" }, + }, + }); + + const { resolveMaintenanceConfig } = await import("./store.js"); + const config = resolveMaintenanceConfig(); + + expect(config).toEqual({ + mode: "warn", + pruneAfterMs: 7 * DAY_MS, + maxEntries: 100, + rotateBytes: 5 * 1024 * 1024, + }); + }); + + it("resolveMaintenanceConfig uses defaults for missing fields", async () => { + mockLoadConfig.mockReturnValue({ session: { maintenance: { pruneAfter: "14d" } } }); + + const { resolveMaintenanceConfig } = await import("./store.js"); + const config = resolveMaintenanceConfig(); + + expect(config).toEqual({ + mode: "warn", + pruneAfterMs: 14 * DAY_MS, + maxEntries: 500, + rotateBytes: 10_485_760, + }); + }); + + it("resolveMaintenanceConfig falls back to deprecated pruneDays", async () => { + mockLoadConfig.mockReturnValue({ session: { maintenance: { pruneDays: 2 } } }); + + const { resolveMaintenanceConfig } = await import("./store.js"); + const config = resolveMaintenanceConfig(); + + expect(config).toEqual({ + mode: "warn", + pruneAfterMs: 2 * DAY_MS, + maxEntries: 500, + rotateBytes: 10_485_760, + }); + }); +}); diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index a7fe48e144..5aea98d4ed 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -3,6 +3,10 @@ import crypto from "node:crypto"; import fs from "node:fs"; import path from "node:path"; import type { MsgContext } from "../../auto-reply/templating.js"; +import type { SessionMaintenanceConfig, SessionMaintenanceMode } from "../types.base.js"; +import { parseByteSize } from "../../cli/parse-bytes.js"; +import { parseDurationMs } from "../../cli/parse-duration.js"; +import { createSubsystemLogger } from "../../logging/subsystem.js"; import { deliveryContextFromSession, mergeDeliveryContext, @@ -11,9 +15,12 @@ import { type DeliveryContext, } from "../../utils/delivery-context.js"; import { getFileMtimeMs, isCacheEnabled, resolveCacheTtlMs } from "../cache-utils.js"; +import { loadConfig } from "../config.js"; import { deriveSessionMetaPatch } from "./metadata.js"; import { mergeSessionEntry, type SessionEntry } from "./types.js"; +const log = createSubsystemLogger("sessions/store"); + // ============================================================================ // Session Store Cache with TTL Support // ============================================================================ @@ -195,15 +202,300 @@ export function readSessionUpdatedAt(params: { } } +// ============================================================================ +// Session Store Pruning, Capping & File Rotation +// ============================================================================ + +const DEFAULT_SESSION_PRUNE_AFTER_MS = 30 * 24 * 60 * 60 * 1000; +const DEFAULT_SESSION_MAX_ENTRIES = 500; +const DEFAULT_SESSION_ROTATE_BYTES = 10_485_760; // 10 MB +const DEFAULT_SESSION_MAINTENANCE_MODE: SessionMaintenanceMode = "warn"; + +export type SessionMaintenanceWarning = { + activeSessionKey: string; + activeUpdatedAt?: number; + totalEntries: number; + pruneAfterMs: number; + maxEntries: number; + wouldPrune: boolean; + wouldCap: boolean; +}; + +type ResolvedSessionMaintenanceConfig = { + mode: SessionMaintenanceMode; + pruneAfterMs: number; + maxEntries: number; + rotateBytes: number; +}; + +function resolvePruneAfterMs(maintenance?: SessionMaintenanceConfig): number { + const raw = maintenance?.pruneAfter ?? maintenance?.pruneDays; + if (raw === undefined || raw === null || raw === "") { + return DEFAULT_SESSION_PRUNE_AFTER_MS; + } + try { + return parseDurationMs(String(raw).trim(), { defaultUnit: "d" }); + } catch { + return DEFAULT_SESSION_PRUNE_AFTER_MS; + } +} + +function resolveRotateBytes(maintenance?: SessionMaintenanceConfig): number { + const raw = maintenance?.rotateBytes; + if (raw === undefined || raw === null || raw === "") { + return DEFAULT_SESSION_ROTATE_BYTES; + } + try { + return parseByteSize(String(raw).trim(), { defaultUnit: "b" }); + } catch { + return DEFAULT_SESSION_ROTATE_BYTES; + } +} + +/** + * Resolve maintenance settings from openclaw.json (`session.maintenance`). + * Falls back to built-in defaults when config is missing or unset. + */ +export function resolveMaintenanceConfig(): ResolvedSessionMaintenanceConfig { + let maintenance: SessionMaintenanceConfig | undefined; + try { + maintenance = loadConfig().session?.maintenance; + } catch { + // Config may not be available (e.g. in tests). Use defaults. + } + return { + mode: maintenance?.mode ?? DEFAULT_SESSION_MAINTENANCE_MODE, + pruneAfterMs: resolvePruneAfterMs(maintenance), + maxEntries: maintenance?.maxEntries ?? DEFAULT_SESSION_MAX_ENTRIES, + rotateBytes: resolveRotateBytes(maintenance), + }; +} + +/** + * Remove entries whose `updatedAt` is older than the configured threshold. + * Entries without `updatedAt` are kept (cannot determine staleness). + * Mutates `store` in-place. + */ +export function pruneStaleEntries( + store: Record, + overrideMaxAgeMs?: number, + opts: { log?: boolean } = {}, +): number { + const maxAgeMs = overrideMaxAgeMs ?? resolveMaintenanceConfig().pruneAfterMs; + const cutoffMs = Date.now() - maxAgeMs; + let pruned = 0; + for (const [key, entry] of Object.entries(store)) { + if (entry?.updatedAt != null && entry.updatedAt < cutoffMs) { + delete store[key]; + pruned++; + } + } + if (pruned > 0 && opts.log !== false) { + log.info("pruned stale session entries", { pruned, maxAgeMs }); + } + return pruned; +} + +/** + * Cap the store to the N most recently updated entries. + * Entries without `updatedAt` are sorted last (removed first when over limit). + * Mutates `store` in-place. + */ +function getEntryUpdatedAt(entry?: SessionEntry): number { + return entry?.updatedAt ?? Number.NEGATIVE_INFINITY; +} + +export function getActiveSessionMaintenanceWarning(params: { + store: Record; + activeSessionKey: string; + pruneAfterMs: number; + maxEntries: number; + nowMs?: number; +}): SessionMaintenanceWarning | null { + const activeSessionKey = params.activeSessionKey.trim(); + if (!activeSessionKey) { + return null; + } + const activeEntry = params.store[activeSessionKey]; + if (!activeEntry) { + return null; + } + const now = params.nowMs ?? Date.now(); + const cutoffMs = now - params.pruneAfterMs; + const wouldPrune = activeEntry.updatedAt != null ? activeEntry.updatedAt < cutoffMs : false; + const keys = Object.keys(params.store); + const wouldCap = + keys.length > params.maxEntries && + keys + .toSorted((a, b) => getEntryUpdatedAt(params.store[b]) - getEntryUpdatedAt(params.store[a])) + .slice(params.maxEntries) + .includes(activeSessionKey); + + if (!wouldPrune && !wouldCap) { + return null; + } + + return { + activeSessionKey, + activeUpdatedAt: activeEntry.updatedAt, + totalEntries: keys.length, + pruneAfterMs: params.pruneAfterMs, + maxEntries: params.maxEntries, + wouldPrune, + wouldCap, + }; +} + +export function capEntryCount( + store: Record, + overrideMax?: number, + opts: { log?: boolean } = {}, +): number { + const maxEntries = overrideMax ?? resolveMaintenanceConfig().maxEntries; + const keys = Object.keys(store); + if (keys.length <= maxEntries) { + return 0; + } + + // Sort by updatedAt descending; entries without updatedAt go to the end (removed first). + const sorted = keys.toSorted((a, b) => { + const aTime = getEntryUpdatedAt(store[a]); + const bTime = getEntryUpdatedAt(store[b]); + return bTime - aTime; + }); + + const toRemove = sorted.slice(maxEntries); + for (const key of toRemove) { + delete store[key]; + } + if (opts.log !== false) { + log.info("capped session entry count", { removed: toRemove.length, maxEntries }); + } + return toRemove.length; +} + +async function getSessionFileSize(storePath: string): Promise { + try { + const stat = await fs.promises.stat(storePath); + return stat.size; + } catch { + return null; + } +} + +/** + * Rotate the sessions file if it exceeds the configured size threshold. + * Renames the current file to `sessions.json.bak.{timestamp}` and cleans up + * old rotation backups, keeping only the 3 most recent `.bak.*` files. + */ +export async function rotateSessionFile( + storePath: string, + overrideBytes?: number, +): Promise { + const maxBytes = overrideBytes ?? resolveMaintenanceConfig().rotateBytes; + + // Check current file size (file may not exist yet). + const fileSize = await getSessionFileSize(storePath); + if (fileSize == null) { + return false; + } + + if (fileSize <= maxBytes) { + return false; + } + + // Rotate: rename current file to .bak.{timestamp} + const backupPath = `${storePath}.bak.${Date.now()}`; + try { + await fs.promises.rename(storePath, backupPath); + log.info("rotated session store file", { + backupPath: path.basename(backupPath), + sizeBytes: fileSize, + }); + } catch { + // If rename fails (e.g. file disappeared), skip rotation. + return false; + } + + // Clean up old backups — keep only the 3 most recent .bak.* files. + try { + const dir = path.dirname(storePath); + const baseName = path.basename(storePath); + const files = await fs.promises.readdir(dir); + const backups = files + .filter((f) => f.startsWith(`${baseName}.bak.`)) + .toSorted() + .toReversed(); + + const maxBackups = 3; + if (backups.length > maxBackups) { + const toDelete = backups.slice(maxBackups); + for (const old of toDelete) { + await fs.promises.unlink(path.join(dir, old)).catch(() => undefined); + } + log.info("cleaned up old session store backups", { deleted: toDelete.length }); + } + } catch { + // Best-effort cleanup; don't fail the write. + } + + return true; +} + +type SaveSessionStoreOptions = { + /** Skip pruning, capping, and rotation (e.g. during one-time migrations). */ + skipMaintenance?: boolean; + /** Active session key for warn-only maintenance. */ + activeSessionKey?: string; + /** Optional callback for warn-only maintenance. */ + onWarn?: (warning: SessionMaintenanceWarning) => void | Promise; +}; + async function saveSessionStoreUnlocked( storePath: string, store: Record, + opts?: SaveSessionStoreOptions, ): Promise { // Invalidate cache on write to ensure consistency invalidateSessionStoreCache(storePath); normalizeSessionStore(store); + if (!opts?.skipMaintenance) { + // Resolve maintenance config once (avoids repeated loadConfig() calls). + const maintenance = resolveMaintenanceConfig(); + const shouldWarnOnly = maintenance.mode === "warn"; + + if (shouldWarnOnly) { + const activeSessionKey = opts?.activeSessionKey?.trim(); + if (activeSessionKey) { + const warning = getActiveSessionMaintenanceWarning({ + store, + activeSessionKey, + pruneAfterMs: maintenance.pruneAfterMs, + maxEntries: maintenance.maxEntries, + }); + if (warning) { + log.warn("session maintenance would evict active session; skipping enforcement", { + activeSessionKey: warning.activeSessionKey, + wouldPrune: warning.wouldPrune, + wouldCap: warning.wouldCap, + pruneAfterMs: warning.pruneAfterMs, + maxEntries: warning.maxEntries, + }); + await opts?.onWarn?.(warning); + } + } + } else { + // Prune stale entries and cap total count before serializing. + pruneStaleEntries(store, maintenance.pruneAfterMs); + capEntryCount(store, maintenance.maxEntries); + + // Rotate the on-disk file if it exceeds the size threshold. + await rotateSessionFile(storePath, maintenance.rotateBytes); + } + } + await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); const json = JSON.stringify(store, null, 2); @@ -266,21 +558,23 @@ async function saveSessionStoreUnlocked( export async function saveSessionStore( storePath: string, store: Record, + opts?: SaveSessionStoreOptions, ): Promise { await withSessionStoreLock(storePath, async () => { - await saveSessionStoreUnlocked(storePath, store); + await saveSessionStoreUnlocked(storePath, store, opts); }); } export async function updateSessionStore( storePath: string, mutator: (store: Record) => Promise | T, + opts?: SaveSessionStoreOptions, ): Promise { return await withSessionStoreLock(storePath, async () => { // Always re-read inside the lock to avoid clobbering concurrent writers. const store = loadSessionStore(storePath, { skipCache: true }); const result = await mutator(store); - await saveSessionStoreUnlocked(storePath, store); + await saveSessionStoreUnlocked(storePath, store, opts); return result; }); } @@ -381,7 +675,7 @@ export async function updateSessionStoreEntry(params: { } const next = mergeSessionEntry(existing, patch); store[sessionKey] = next; - await saveSessionStoreUnlocked(storePath, store); + await saveSessionStoreUnlocked(storePath, store, { activeSessionKey: sessionKey }); return next; }); } @@ -395,24 +689,28 @@ export async function recordSessionMetaFromInbound(params: { }): Promise { const { storePath, sessionKey, ctx } = params; const createIfMissing = params.createIfMissing ?? true; - return await updateSessionStore(storePath, (store) => { - const existing = store[sessionKey]; - const patch = deriveSessionMetaPatch({ - ctx, - sessionKey, - existing, - groupResolution: params.groupResolution, - }); - if (!patch) { - return existing ?? null; - } - if (!existing && !createIfMissing) { - return null; - } - const next = mergeSessionEntry(existing, patch); - store[sessionKey] = next; - return next; - }); + return await updateSessionStore( + storePath, + (store) => { + const existing = store[sessionKey]; + const patch = deriveSessionMetaPatch({ + ctx, + sessionKey, + existing, + groupResolution: params.groupResolution, + }); + if (!patch) { + return existing ?? null; + } + if (!existing && !createIfMissing) { + return null; + } + const next = mergeSessionEntry(existing, patch); + store[sessionKey] = next; + return next; + }, + { activeSessionKey: sessionKey }, + ); } export async function updateLastRoute(params: { @@ -488,7 +786,7 @@ export async function updateLastRoute(params: { metaPatch ? { ...basePatch, ...metaPatch } : basePatch, ); store[sessionKey] = next; - await saveSessionStoreUnlocked(storePath, store); + await saveSessionStoreUnlocked(storePath, store, { activeSessionKey: sessionKey }); return next; }); } diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index 864825f0b6..593548db70 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -134,12 +134,16 @@ export async function appendAssistantMessageToSessionTranscript(params: { }); if (!entry.sessionFile || entry.sessionFile !== sessionFile) { - await updateSessionStore(storePath, (current) => { - current[sessionKey] = { - ...entry, - sessionFile, - }; - }); + await updateSessionStore( + storePath, + (current) => { + current[sessionKey] = { + ...entry, + sessionFile, + }; + }, + { activeSessionKey: sessionKey }, + ); } emitSessionTranscriptUpdate(sessionFile); diff --git a/src/config/types.base.ts b/src/config/types.base.ts index 9d713b816d..f42cbd54a6 100644 --- a/src/config/types.base.ts +++ b/src/config/types.base.ts @@ -99,6 +99,23 @@ export type SessionConfig = { /** Max ping-pong turns between requester/target (0–5). Default: 5. */ maxPingPongTurns?: number; }; + /** Automatic session store maintenance (pruning, capping, file rotation). */ + maintenance?: SessionMaintenanceConfig; +}; + +export type SessionMaintenanceMode = "enforce" | "warn"; + +export type SessionMaintenanceConfig = { + /** Whether to enforce maintenance or warn only. Default: "warn". */ + mode?: SessionMaintenanceMode; + /** Remove session entries older than this duration (e.g. "30d", "12h"). Default: "30d". */ + pruneAfter?: string | number; + /** Deprecated. Use pruneAfter instead. */ + pruneDays?: number; + /** Maximum number of session entries to keep. Default: 500. */ + maxEntries?: number; + /** Rotate sessions.json when it exceeds this size (e.g. "10mb"). Default: 10mb. */ + rotateBytes?: number | string; }; export type LoggingConfig = { diff --git a/src/config/types.cron.ts b/src/config/types.cron.ts index 2db17f4e29..62a9c1da13 100644 --- a/src/config/types.cron.ts +++ b/src/config/types.cron.ts @@ -2,4 +2,10 @@ export type CronConfig = { enabled?: boolean; store?: string; maxConcurrentRuns?: number; + /** + * How long to retain completed cron run sessions before automatic pruning. + * Accepts a duration string (e.g. "24h", "7d", "1h30m") or `false` to disable pruning. + * Default: "24h". + */ + sessionRetention?: string | false; }; diff --git a/src/config/zod-schema.session.ts b/src/config/zod-schema.session.ts index 555b921cda..ce30509fd9 100644 --- a/src/config/zod-schema.session.ts +++ b/src/config/zod-schema.session.ts @@ -1,4 +1,6 @@ import { z } from "zod"; +import { parseByteSize } from "../cli/parse-bytes.js"; +import { parseDurationMs } from "../cli/parse-duration.js"; import { GroupChatSchema, InboundDebounceSchema, @@ -90,6 +92,41 @@ export const SessionSchema = z }) .strict() .optional(), + maintenance: z + .object({ + mode: z.enum(["enforce", "warn"]).optional(), + pruneAfter: z.union([z.string(), z.number()]).optional(), + /** @deprecated Use pruneAfter instead. */ + pruneDays: z.number().int().positive().optional(), + maxEntries: z.number().int().positive().optional(), + rotateBytes: z.union([z.string(), z.number()]).optional(), + }) + .strict() + .superRefine((val, ctx) => { + if (val.pruneAfter !== undefined) { + try { + parseDurationMs(String(val.pruneAfter).trim(), { defaultUnit: "d" }); + } catch { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["pruneAfter"], + message: "invalid duration (use ms, s, m, h, d)", + }); + } + } + if (val.rotateBytes !== undefined) { + try { + parseByteSize(String(val.rotateBytes).trim(), { defaultUnit: "b" }); + } catch { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["rotateBytes"], + message: "invalid size (use b, kb, mb, gb, tb)", + }); + } + } + }) + .optional(), }) .strict() .optional(); diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index 6947a58760..72396ddd3f 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -292,6 +292,7 @@ export const OpenClawSchema = z enabled: z.boolean().optional(), store: z.string().optional(), maxConcurrentRuns: z.number().int().positive().optional(), + sessionRetention: z.union([z.string(), z.literal(false)]).optional(), }) .strict() .optional(), diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 0847989b3d..c51103f339 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -1,3 +1,4 @@ +import type { CronConfig } from "../../config/types.cron.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import type { CronJob, CronJobCreate, CronJobPatch, CronStoreFile } from "../types.js"; @@ -26,6 +27,14 @@ export type CronServiceDeps = { log: Logger; storePath: string; cronEnabled: boolean; + /** CronConfig for session retention settings. */ + cronConfig?: CronConfig; + /** Default agent id for jobs without an agent id. */ + defaultAgentId?: string; + /** Resolve session store path for a given agent id. */ + resolveSessionStorePath?: (agentId?: string) => string; + /** Path to the session store (sessions.json) for reaper use. */ + sessionStorePath?: string; enqueueSystemEvent: (text: string, opts?: { agentId?: string }) => void; requestHeartbeatNow: (opts?: { reason?: string }) => void; runHeartbeatOnce?: (opts?: { reason?: string }) => Promise; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index d18deddc6d..cda67eb2ae 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -1,7 +1,9 @@ import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import type { CronJob } from "../types.js"; import type { CronEvent, CronServiceState } from "./state.js"; +import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; +import { sweepCronRunSessions } from "../session-reaper.js"; import { computeJobNextRunAtMs, nextWakeAtMs, @@ -273,6 +275,38 @@ export async function onTimer(state: CronServiceState) { await persist(state); }); } + // Piggyback session reaper on timer tick (self-throttled to every 5 min). + const storePaths = new Set(); + if (state.deps.resolveSessionStorePath) { + const defaultAgentId = state.deps.defaultAgentId ?? DEFAULT_AGENT_ID; + if (state.store?.jobs?.length) { + for (const job of state.store.jobs) { + const agentId = + typeof job.agentId === "string" && job.agentId.trim() ? job.agentId : defaultAgentId; + storePaths.add(state.deps.resolveSessionStorePath(agentId)); + } + } else { + storePaths.add(state.deps.resolveSessionStorePath(defaultAgentId)); + } + } else if (state.deps.sessionStorePath) { + storePaths.add(state.deps.sessionStorePath); + } + + if (storePaths.size > 0) { + const nowMs = state.deps.nowMs(); + for (const storePath of storePaths) { + try { + await sweepCronRunSessions({ + cronConfig: state.deps.cronConfig, + sessionStorePath: storePath, + nowMs, + log: state.deps.log, + }); + } catch (err) { + state.deps.log.warn({ err: String(err), storePath }, "cron: session reaper sweep failed"); + } + } + } } finally { state.running = false; armTimer(state); diff --git a/src/cron/session-reaper.test.ts b/src/cron/session-reaper.test.ts new file mode 100644 index 0000000000..3a0c0d57d6 --- /dev/null +++ b/src/cron/session-reaper.test.ts @@ -0,0 +1,203 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { describe, it, expect, beforeEach } from "vitest"; +import type { Logger } from "./service/state.js"; +import { isCronRunSessionKey } from "../sessions/session-key-utils.js"; +import { sweepCronRunSessions, resolveRetentionMs, resetReaperThrottle } from "./session-reaper.js"; + +function createTestLogger(): Logger { + return { + debug: () => {}, + info: () => {}, + warn: () => {}, + error: () => {}, + }; +} + +describe("resolveRetentionMs", () => { + it("returns 24h default when no config", () => { + expect(resolveRetentionMs()).toBe(24 * 3_600_000); + }); + + it("returns 24h default when config is empty", () => { + expect(resolveRetentionMs({})).toBe(24 * 3_600_000); + }); + + it("parses duration string", () => { + expect(resolveRetentionMs({ sessionRetention: "1h" })).toBe(3_600_000); + expect(resolveRetentionMs({ sessionRetention: "7d" })).toBe(7 * 86_400_000); + expect(resolveRetentionMs({ sessionRetention: "30m" })).toBe(30 * 60_000); + }); + + it("returns null when disabled", () => { + expect(resolveRetentionMs({ sessionRetention: false })).toBeNull(); + }); + + it("falls back to default on invalid string", () => { + expect(resolveRetentionMs({ sessionRetention: "abc" })).toBe(24 * 3_600_000); + }); +}); + +describe("isCronRunSessionKey", () => { + it("matches cron run session keys", () => { + expect(isCronRunSessionKey("agent:main:cron:abc-123:run:def-456")).toBe(true); + expect(isCronRunSessionKey("agent:debugger:cron:249ecf82:run:1102aabb")).toBe(true); + }); + + it("does not match base cron session keys", () => { + expect(isCronRunSessionKey("agent:main:cron:abc-123")).toBe(false); + }); + + it("does not match regular session keys", () => { + expect(isCronRunSessionKey("agent:main:telegram:dm:123")).toBe(false); + }); + + it("does not match non-canonical cron-like keys", () => { + expect(isCronRunSessionKey("agent:main:slack:cron:job:run:uuid")).toBe(false); + expect(isCronRunSessionKey("cron:job:run:uuid")).toBe(false); + }); +}); + +describe("sweepCronRunSessions", () => { + let tmpDir: string; + let storePath: string; + const log = createTestLogger(); + + beforeEach(async () => { + resetReaperThrottle(); + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "cron-reaper-")); + storePath = path.join(tmpDir, "sessions.json"); + }); + + it("prunes expired cron run sessions", async () => { + const now = Date.now(); + const store: Record = { + "agent:main:cron:job1": { + sessionId: "base-session", + updatedAt: now, + }, + "agent:main:cron:job1:run:old-run": { + sessionId: "old-run", + updatedAt: now - 25 * 3_600_000, // 25h ago — expired + }, + "agent:main:cron:job1:run:recent-run": { + sessionId: "recent-run", + updatedAt: now - 1 * 3_600_000, // 1h ago — not expired + }, + "agent:main:telegram:dm:123": { + sessionId: "regular-session", + updatedAt: now - 100 * 3_600_000, // old but not a cron run + }, + }; + fs.writeFileSync(storePath, JSON.stringify(store)); + + const result = await sweepCronRunSessions({ + sessionStorePath: storePath, + nowMs: now, + log, + force: true, + }); + + expect(result.swept).toBe(true); + expect(result.pruned).toBe(1); + + const updated = JSON.parse(fs.readFileSync(storePath, "utf-8")); + expect(updated["agent:main:cron:job1"]).toBeDefined(); + expect(updated["agent:main:cron:job1:run:old-run"]).toBeUndefined(); + expect(updated["agent:main:cron:job1:run:recent-run"]).toBeDefined(); + expect(updated["agent:main:telegram:dm:123"]).toBeDefined(); + }); + + it("respects custom retention", async () => { + const now = Date.now(); + const store: Record = { + "agent:main:cron:job1:run:run1": { + sessionId: "run1", + updatedAt: now - 2 * 3_600_000, // 2h ago + }, + }; + fs.writeFileSync(storePath, JSON.stringify(store)); + + const result = await sweepCronRunSessions({ + cronConfig: { sessionRetention: "1h" }, + sessionStorePath: storePath, + nowMs: now, + log, + force: true, + }); + + expect(result.pruned).toBe(1); + }); + + it("does nothing when pruning is disabled", async () => { + const now = Date.now(); + const store: Record = { + "agent:main:cron:job1:run:run1": { + sessionId: "run1", + updatedAt: now - 100 * 3_600_000, + }, + }; + fs.writeFileSync(storePath, JSON.stringify(store)); + + const result = await sweepCronRunSessions({ + cronConfig: { sessionRetention: false }, + sessionStorePath: storePath, + nowMs: now, + log, + force: true, + }); + + expect(result.swept).toBe(false); + expect(result.pruned).toBe(0); + }); + + it("throttles sweeps without force", async () => { + const now = Date.now(); + fs.writeFileSync(storePath, JSON.stringify({})); + + // First sweep runs + const r1 = await sweepCronRunSessions({ + sessionStorePath: storePath, + nowMs: now, + log, + }); + expect(r1.swept).toBe(true); + + // Second sweep (1 second later) is throttled + const r2 = await sweepCronRunSessions({ + sessionStorePath: storePath, + nowMs: now + 1000, + log, + }); + expect(r2.swept).toBe(false); + }); + + it("throttles per store path", async () => { + const now = Date.now(); + const otherPath = path.join(tmpDir, "sessions-other.json"); + fs.writeFileSync(storePath, JSON.stringify({})); + fs.writeFileSync(otherPath, JSON.stringify({})); + + const r1 = await sweepCronRunSessions({ + sessionStorePath: storePath, + nowMs: now, + log, + }); + expect(r1.swept).toBe(true); + + const r2 = await sweepCronRunSessions({ + sessionStorePath: otherPath, + nowMs: now + 1000, + log, + }); + expect(r2.swept).toBe(true); + + const r3 = await sweepCronRunSessions({ + sessionStorePath: storePath, + nowMs: now + 1000, + log, + }); + expect(r3.swept).toBe(false); + }); +}); diff --git a/src/cron/session-reaper.ts b/src/cron/session-reaper.ts new file mode 100644 index 0000000000..f21559902e --- /dev/null +++ b/src/cron/session-reaper.ts @@ -0,0 +1,115 @@ +/** + * Cron session reaper — prunes completed isolated cron run sessions + * from the session store after a configurable retention period. + * + * Pattern: sessions keyed as `...:cron::run:` are ephemeral + * run records. The base session (`...:cron:`) is kept as-is. + */ + +import type { CronConfig } from "../config/types.cron.js"; +import type { Logger } from "./service/state.js"; +import { parseDurationMs } from "../cli/parse-duration.js"; +import { updateSessionStore } from "../config/sessions.js"; +import { isCronRunSessionKey } from "../sessions/session-key-utils.js"; + +const DEFAULT_RETENTION_MS = 24 * 3_600_000; // 24 hours + +/** Minimum interval between reaper sweeps (avoid running every timer tick). */ +const MIN_SWEEP_INTERVAL_MS = 5 * 60_000; // 5 minutes + +const lastSweepAtMsByStore = new Map(); + +export function resolveRetentionMs(cronConfig?: CronConfig): number | null { + if (cronConfig?.sessionRetention === false) { + return null; // pruning disabled + } + const raw = cronConfig?.sessionRetention; + if (typeof raw === "string" && raw.trim()) { + try { + return parseDurationMs(raw.trim(), { defaultUnit: "h" }); + } catch { + return DEFAULT_RETENTION_MS; + } + } + return DEFAULT_RETENTION_MS; +} + +export type ReaperResult = { + swept: boolean; + pruned: number; +}; + +/** + * Sweep the session store and prune expired cron run sessions. + * Designed to be called from the cron timer tick — self-throttles via + * MIN_SWEEP_INTERVAL_MS to avoid excessive I/O. + * + * Lock ordering: this function acquires the session-store file lock via + * `updateSessionStore`. It must be called OUTSIDE of the cron service's + * own `locked()` section to avoid lock-order inversions. The cron timer + * calls this after all `locked()` sections have been released. + */ +export async function sweepCronRunSessions(params: { + cronConfig?: CronConfig; + /** Resolved path to sessions.json — required. */ + sessionStorePath: string; + nowMs?: number; + log: Logger; + /** Override for testing — skips the min-interval throttle. */ + force?: boolean; +}): Promise { + const now = params.nowMs ?? Date.now(); + const storePath = params.sessionStorePath; + const lastSweepAtMs = lastSweepAtMsByStore.get(storePath) ?? 0; + + // Throttle: don't sweep more often than every 5 minutes. + if (!params.force && now - lastSweepAtMs < MIN_SWEEP_INTERVAL_MS) { + return { swept: false, pruned: 0 }; + } + + const retentionMs = resolveRetentionMs(params.cronConfig); + if (retentionMs === null) { + lastSweepAtMsByStore.set(storePath, now); + return { swept: false, pruned: 0 }; + } + + let pruned = 0; + try { + await updateSessionStore(storePath, (store) => { + const cutoff = now - retentionMs; + for (const key of Object.keys(store)) { + if (!isCronRunSessionKey(key)) { + continue; + } + const entry = store[key]; + if (!entry) { + continue; + } + const updatedAt = entry.updatedAt ?? 0; + if (updatedAt < cutoff) { + delete store[key]; + pruned++; + } + } + }); + } catch (err) { + params.log.warn({ err: String(err) }, "cron-reaper: failed to sweep session store"); + return { swept: false, pruned: 0 }; + } + + lastSweepAtMsByStore.set(storePath, now); + + if (pruned > 0) { + params.log.info( + { pruned, retentionMs }, + `cron-reaper: pruned ${pruned} expired cron run session(s)`, + ); + } + + return { swept: true, pruned }; +} + +/** Reset the throttle timer (for tests). */ +export function resetReaperThrottle(): void { + lastSweepAtMsByStore.clear(); +} diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 12b0fe6b6c..10ce4200a6 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -2,6 +2,7 @@ import type { CliDeps } from "../cli/deps.js"; import { resolveDefaultAgentId } from "../agents/agent-scope.js"; import { loadConfig } from "../config/config.js"; import { resolveAgentMainSessionKey } from "../config/sessions.js"; +import { resolveStorePath } from "../config/sessions/paths.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js"; import { CronService } from "../cron/service.js"; @@ -43,9 +44,20 @@ export function buildGatewayCronService(params: { return { agentId, cfg: runtimeConfig }; }; + const defaultAgentId = resolveDefaultAgentId(params.cfg); + const resolveSessionStorePath = (agentId?: string) => + resolveStorePath(params.cfg.session?.store, { + agentId: agentId ?? defaultAgentId, + }); + const sessionStorePath = resolveSessionStorePath(defaultAgentId); + const cron = new CronService({ storePath, cronEnabled, + cronConfig: params.cfg.cron, + defaultAgentId, + resolveSessionStorePath, + sessionStorePath, enqueueSystemEvent: (text, opts) => { const { agentId, cfg: runtimeConfig } = resolveCronAgent(opts?.agentId); const sessionKey = resolveAgentMainSessionKey({ diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index bbbbc575ec..f2bd97874e 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -29,6 +29,7 @@ import { normalizeMainKey, parseAgentSessionKey, } from "../routing/session-key.js"; +import { isCronRunSessionKey } from "../sessions/session-key-utils.js"; import { normalizeSessionDeliveryFields } from "../utils/delivery-context.js"; import { readFirstUserMessageFromTranscript, @@ -207,12 +208,6 @@ export function classifySessionKey(key: string, entry?: SessionEntry): GatewaySe return "direct"; } -function isCronRunSessionKey(key: string): boolean { - const parsed = parseAgentSessionKey(key); - const raw = parsed?.rest ?? key; - return /^cron:[^:]+:run:[^:]+$/.test(raw); -} - export function parseGroupKey( key: string, ): { channel?: string; kind?: "group" | "channel"; id?: string } | null { diff --git a/src/infra/session-maintenance-warning.ts b/src/infra/session-maintenance-warning.ts new file mode 100644 index 0000000000..adb8d2e23c --- /dev/null +++ b/src/infra/session-maintenance-warning.ts @@ -0,0 +1,108 @@ +import type { OpenClawConfig } from "../config/config.js"; +import type { SessionEntry, SessionMaintenanceWarning } from "../config/sessions.js"; +import { isDeliverableMessageChannel, normalizeMessageChannel } from "../utils/message-channel.js"; +import { resolveSessionDeliveryTarget } from "./outbound/targets.js"; +import { enqueueSystemEvent } from "./system-events.js"; + +type WarningParams = { + cfg: OpenClawConfig; + sessionKey: string; + entry: SessionEntry; + warning: SessionMaintenanceWarning; +}; + +const warnedContexts = new Map(); + +function shouldSendWarning(): boolean { + return !process.env.VITEST && process.env.NODE_ENV !== "test"; +} + +function buildWarningContext(params: WarningParams): string { + const { warning } = params; + return [ + warning.activeSessionKey, + warning.pruneAfterMs, + warning.maxEntries, + warning.wouldPrune ? "prune" : "", + warning.wouldCap ? "cap" : "", + ] + .filter(Boolean) + .join("|"); +} + +function formatDuration(ms: number): string { + if (ms >= 86_400_000) { + const days = Math.round(ms / 86_400_000); + return `${days} day${days === 1 ? "" : "s"}`; + } + if (ms >= 3_600_000) { + const hours = Math.round(ms / 3_600_000); + return `${hours} hour${hours === 1 ? "" : "s"}`; + } + if (ms >= 60_000) { + const mins = Math.round(ms / 60_000); + return `${mins} minute${mins === 1 ? "" : "s"}`; + } + const secs = Math.round(ms / 1000); + return `${secs} second${secs === 1 ? "" : "s"}`; +} + +function buildWarningText(warning: SessionMaintenanceWarning): string { + const reasons: string[] = []; + if (warning.wouldPrune) { + reasons.push(`older than ${formatDuration(warning.pruneAfterMs)}`); + } + if (warning.wouldCap) { + reasons.push(`not in the most recent ${warning.maxEntries} sessions`); + } + const reasonText = reasons.length > 0 ? reasons.join(" and ") : "over maintenance limits"; + return ( + `⚠️ Session maintenance warning: this active session would be evicted (${reasonText}). ` + + `Maintenance is set to warn-only, so nothing was reset. ` + + `To enforce cleanup, set \`session.maintenance.mode: "enforce"\` or increase the limits.` + ); +} + +export async function deliverSessionMaintenanceWarning(params: WarningParams): Promise { + if (!shouldSendWarning()) { + return; + } + + const contextKey = buildWarningContext(params); + if (warnedContexts.get(params.sessionKey) === contextKey) { + return; + } + warnedContexts.set(params.sessionKey, contextKey); + + const text = buildWarningText(params.warning); + const target = resolveSessionDeliveryTarget({ + entry: params.entry, + requestedChannel: "last", + }); + + if (!target.channel || !target.to) { + enqueueSystemEvent(text, { sessionKey: params.sessionKey }); + return; + } + + const channel = normalizeMessageChannel(target.channel) ?? target.channel; + if (!isDeliverableMessageChannel(channel)) { + enqueueSystemEvent(text, { sessionKey: params.sessionKey }); + return; + } + + try { + const { deliverOutboundPayloads } = await import("./outbound/deliver.js"); + await deliverOutboundPayloads({ + cfg: params.cfg, + channel, + to: target.to, + accountId: target.accountId, + threadId: target.threadId, + payloads: [{ text }], + }); + } catch (err) { + console.warn(`Failed to deliver session maintenance warning: ${String(err)}`); + enqueueSystemEvent(text, { sessionKey: params.sessionKey }); + } +} diff --git a/src/infra/state-migrations.ts b/src/infra/state-migrations.ts index 36ebe54b3f..9bec6f5789 100644 --- a/src/infra/state-migrations.ts +++ b/src/infra/state-migrations.ts @@ -732,7 +732,9 @@ async function migrateLegacySessions( } normalized[key] = normalizedEntry; } - await saveSessionStore(detected.sessions.targetStorePath, normalized); + await saveSessionStore(detected.sessions.targetStorePath, normalized, { + skipMaintenance: true, + }); changes.push(`Merged sessions store → ${detected.sessions.targetStorePath}`); if (canonicalizedTarget.legacyKeys.length > 0) { changes.push(`Canonicalized ${canonicalizedTarget.legacyKeys.length} legacy session key(s)`); diff --git a/src/sessions/session-key-utils.ts b/src/sessions/session-key-utils.ts index ba867f552e..a8cdb3f947 100644 --- a/src/sessions/session-key-utils.ts +++ b/src/sessions/session-key-utils.ts @@ -25,6 +25,14 @@ export function parseAgentSessionKey( return { agentId, rest }; } +export function isCronRunSessionKey(sessionKey: string | undefined | null): boolean { + const parsed = parseAgentSessionKey(sessionKey); + if (!parsed) { + return false; + } + return /^cron:[^:]+:run:[^:]+$/.test(parsed.rest); +} + export function isSubagentSessionKey(sessionKey: string | undefined | null): boolean { const raw = (sessionKey ?? "").trim(); if (!raw) {