From c741d008dd683b9bb8e9784cd56587ad74487e30 Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Sat, 7 Feb 2026 16:57:22 -0800 Subject: [PATCH] Memory: chain forced QMD queue and fail over on busy index --- src/memory/qmd-manager.test.ts | 125 ++++++++++++++++++++++++++++++++- src/memory/qmd-manager.ts | 55 ++++++++++----- 2 files changed, 162 insertions(+), 18 deletions(-) diff --git a/src/memory/qmd-manager.test.ts b/src/memory/qmd-manager.test.ts index 4eee92874c..0475aec502 100644 --- a/src/memory/qmd-manager.test.ts +++ b/src/memory/qmd-manager.test.ts @@ -308,6 +308,75 @@ describe("QmdMemoryManager", () => { await manager.close(); }); + it("honors multiple forced sync requests while forced queue is active", async () => { + cfg = { + ...cfg, + memory: { + backend: "qmd", + qmd: { + includeDefaultMemory: false, + update: { + interval: "0s", + debounceMs: 0, + onBoot: false, + updateTimeoutMs: 1_000, + }, + paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }], + }, + }, + } as OpenClawConfig; + + let updateCalls = 0; + let releaseFirstUpdate: (() => void) | null = null; + let releaseSecondUpdate: (() => void) | null = null; + spawnMock.mockImplementation((_cmd: string, args: string[]) => { + if (args[0] === "update") { + updateCalls += 1; + if (updateCalls === 1) { + const first = createMockChild({ autoClose: false }); + releaseFirstUpdate = () => first.closeWith(0); + return first; + } + if (updateCalls === 2) { + const second = createMockChild({ autoClose: false }); + releaseSecondUpdate = () => second.closeWith(0); + return second; + } + return createMockChild(); + } + return createMockChild(); + }); + + const resolved = resolveMemoryBackendConfig({ cfg, agentId }); + const manager = await QmdMemoryManager.create({ cfg, agentId, resolved }); + expect(manager).toBeTruthy(); + if (!manager) { + throw new Error("manager missing"); + } + + const inFlight = manager.sync({ reason: "interval" }); + const forcedOne = manager.sync({ reason: "manual", force: true }); + + await new Promise((resolve) => setTimeout(resolve, 20)); + expect(updateCalls).toBe(1); + if (!releaseFirstUpdate) { + throw new Error("first update release missing"); + } + releaseFirstUpdate(); + + await waitForCondition(() => updateCalls >= 2, 200); + const forcedTwo = manager.sync({ reason: "manual-again", force: true }); + + if (!releaseSecondUpdate) { + throw new Error("second update release missing"); + } + releaseSecondUpdate(); + + await Promise.all([inFlight, forcedOne, forcedTwo]); + expect(updateCalls).toBe(3); + await manager.close(); + }); + it("logs and continues when qmd embed times out", async () => { cfg = { ...cfg, @@ -398,7 +467,7 @@ describe("QmdMemoryManager", () => { await manager.close(); }); - it("skips doc lookup when sqlite index is busy", async () => { + it("throws when sqlite index is busy", async () => { const resolved = resolveMemoryBackendConfig({ cfg, agentId }); const manager = await QmdMemoryManager.create({ cfg, agentId, resolved }); expect(manager).toBeTruthy(); @@ -417,7 +486,59 @@ describe("QmdMemoryManager", () => { }), close: () => {}, }; - await expect(inner.resolveDocLocation("abc123")).resolves.toBeNull(); + await expect(inner.resolveDocLocation("abc123")).rejects.toThrow( + "qmd index busy while reading results", + ); + await manager.close(); + }); + + it("fails search when sqlite index is busy so caller can fallback", async () => { + spawnMock.mockImplementation((_cmd: string, args: string[]) => { + if (args[0] === "query") { + const child = createMockChild({ autoClose: false }); + setTimeout(() => { + child.stdout.emit( + "data", + JSON.stringify([{ docid: "abc123", score: 1, snippet: "@@ -1,1\nremember this" }]), + ); + child.closeWith(0); + }, 0); + return child; + } + return createMockChild(); + }); + + const resolved = resolveMemoryBackendConfig({ cfg, agentId }); + const manager = await QmdMemoryManager.create({ cfg, agentId, resolved }); + expect(manager).toBeTruthy(); + if (!manager) { + throw new Error("manager missing"); + } + const inner = manager as unknown as { + db: { prepare: () => { get: () => never }; close: () => void } | null; + }; + inner.db = { + prepare: () => ({ + get: () => { + throw new Error("SQLITE_BUSY: database is locked"); + }, + }), + close: () => {}, + }; + await expect( + manager.search("busy lookup", { sessionKey: "agent:main:slack:dm:u123" }), + ).rejects.toThrow("qmd index busy while reading results"); await manager.close(); }); }); + +async function waitForCondition(check: () => boolean, timeoutMs: number): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (check()) { + return; + } + await new Promise((resolve) => setTimeout(resolve, 5)); + } + throw new Error("condition was not met in time"); +} diff --git a/src/memory/qmd-manager.ts b/src/memory/qmd-manager.ts index 1aca2223a2..66ef31a01d 100644 --- a/src/memory/qmd-manager.ts +++ b/src/memory/qmd-manager.ts @@ -85,6 +85,7 @@ export class QmdMemoryManager implements MemorySearchManager { private updateTimer: NodeJS.Timeout | null = null; private pendingUpdate: Promise | null = null; private queuedForcedUpdate: Promise | null = null; + private queuedForcedRuns = 0; private closed = false; private db: SqliteDatabase | null = null; private lastUpdateAt: number | null = null; @@ -386,36 +387,35 @@ export class QmdMemoryManager implements MemorySearchManager { clearInterval(this.updateTimer); this.updateTimer = null; } + this.queuedForcedRuns = 0; await this.pendingUpdate?.catch(() => undefined); + await this.queuedForcedUpdate?.catch(() => undefined); if (this.db) { this.db.close(); this.db = null; } } - private async runUpdate(reason: string, force?: boolean): Promise { + private async runUpdate( + reason: string, + force?: boolean, + opts?: { fromForcedQueue?: boolean }, + ): Promise { if (this.closed) { return; } if (this.pendingUpdate) { if (force) { - if (!this.queuedForcedUpdate) { - this.queuedForcedUpdate = this.pendingUpdate - .catch(() => undefined) - .then(async () => { - if (this.closed) { - return; - } - await this.runUpdate(`${reason}:queued`, true); - }) - .finally(() => { - this.queuedForcedUpdate = null; - }); - } - return this.queuedForcedUpdate; + return this.enqueueForcedUpdate(reason); } return this.pendingUpdate; } + if (this.queuedForcedUpdate && !opts?.fromForcedQueue) { + if (force) { + return this.enqueueForcedUpdate(reason); + } + return this.queuedForcedUpdate; + } if (this.shouldSkipUpdate(force)) { return; } @@ -446,6 +446,24 @@ export class QmdMemoryManager implements MemorySearchManager { await this.pendingUpdate; } + private enqueueForcedUpdate(reason: string): Promise { + this.queuedForcedRuns += 1; + if (!this.queuedForcedUpdate) { + this.queuedForcedUpdate = this.drainForcedUpdates(reason).finally(() => { + this.queuedForcedUpdate = null; + }); + } + return this.queuedForcedUpdate; + } + + private async drainForcedUpdates(reason: string): Promise { + await this.pendingUpdate?.catch(() => undefined); + while (!this.closed && this.queuedForcedRuns > 0) { + this.queuedForcedRuns -= 1; + await this.runUpdate(`${reason}:queued`, true, { fromForcedQueue: true }); + } + } + private async runQmd( args: string[], opts?: { timeoutMs?: number }, @@ -577,7 +595,7 @@ export class QmdMemoryManager implements MemorySearchManager { } catch (err) { if (this.isSqliteBusyError(err)) { log.debug(`qmd index is busy while resolving doc path: ${String(err)}`); - return null; + throw this.createQmdBusyError(err); } throw err; } @@ -862,6 +880,11 @@ export class QmdMemoryManager implements MemorySearchManager { return normalized.includes("sqlite_busy") || normalized.includes("database is locked"); } + private createQmdBusyError(err: unknown): Error { + const message = err instanceof Error ? err.message : String(err); + return new Error(`qmd index busy while reading results: ${message}`); + } + private async waitForPendingUpdateBeforeSearch(): Promise { const pending = this.pendingUpdate; if (!pending) {