Memory: chain forced QMD queue and fail over on busy index

This commit is contained in:
Vignesh Natarajan
2026-02-07 16:57:22 -08:00
committed by Vignesh
parent 0d60ef6fef
commit c741d008dd
2 changed files with 162 additions and 18 deletions

View File

@@ -308,6 +308,75 @@ describe("QmdMemoryManager", () => {
await manager.close();
});
it("honors multiple forced sync requests while forced queue is active", async () => {
cfg = {
...cfg,
memory: {
backend: "qmd",
qmd: {
includeDefaultMemory: false,
update: {
interval: "0s",
debounceMs: 0,
onBoot: false,
updateTimeoutMs: 1_000,
},
paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }],
},
},
} as OpenClawConfig;
let updateCalls = 0;
let releaseFirstUpdate: (() => void) | null = null;
let releaseSecondUpdate: (() => void) | null = null;
spawnMock.mockImplementation((_cmd: string, args: string[]) => {
if (args[0] === "update") {
updateCalls += 1;
if (updateCalls === 1) {
const first = createMockChild({ autoClose: false });
releaseFirstUpdate = () => first.closeWith(0);
return first;
}
if (updateCalls === 2) {
const second = createMockChild({ autoClose: false });
releaseSecondUpdate = () => second.closeWith(0);
return second;
}
return createMockChild();
}
return createMockChild();
});
const resolved = resolveMemoryBackendConfig({ cfg, agentId });
const manager = await QmdMemoryManager.create({ cfg, agentId, resolved });
expect(manager).toBeTruthy();
if (!manager) {
throw new Error("manager missing");
}
const inFlight = manager.sync({ reason: "interval" });
const forcedOne = manager.sync({ reason: "manual", force: true });
await new Promise((resolve) => setTimeout(resolve, 20));
expect(updateCalls).toBe(1);
if (!releaseFirstUpdate) {
throw new Error("first update release missing");
}
releaseFirstUpdate();
await waitForCondition(() => updateCalls >= 2, 200);
const forcedTwo = manager.sync({ reason: "manual-again", force: true });
if (!releaseSecondUpdate) {
throw new Error("second update release missing");
}
releaseSecondUpdate();
await Promise.all([inFlight, forcedOne, forcedTwo]);
expect(updateCalls).toBe(3);
await manager.close();
});
it("logs and continues when qmd embed times out", async () => {
cfg = {
...cfg,
@@ -398,7 +467,7 @@ describe("QmdMemoryManager", () => {
await manager.close();
});
it("skips doc lookup when sqlite index is busy", async () => {
it("throws when sqlite index is busy", async () => {
const resolved = resolveMemoryBackendConfig({ cfg, agentId });
const manager = await QmdMemoryManager.create({ cfg, agentId, resolved });
expect(manager).toBeTruthy();
@@ -417,7 +486,59 @@ describe("QmdMemoryManager", () => {
}),
close: () => {},
};
await expect(inner.resolveDocLocation("abc123")).resolves.toBeNull();
await expect(inner.resolveDocLocation("abc123")).rejects.toThrow(
"qmd index busy while reading results",
);
await manager.close();
});
it("fails search when sqlite index is busy so caller can fallback", async () => {
spawnMock.mockImplementation((_cmd: string, args: string[]) => {
if (args[0] === "query") {
const child = createMockChild({ autoClose: false });
setTimeout(() => {
child.stdout.emit(
"data",
JSON.stringify([{ docid: "abc123", score: 1, snippet: "@@ -1,1\nremember this" }]),
);
child.closeWith(0);
}, 0);
return child;
}
return createMockChild();
});
const resolved = resolveMemoryBackendConfig({ cfg, agentId });
const manager = await QmdMemoryManager.create({ cfg, agentId, resolved });
expect(manager).toBeTruthy();
if (!manager) {
throw new Error("manager missing");
}
const inner = manager as unknown as {
db: { prepare: () => { get: () => never }; close: () => void } | null;
};
inner.db = {
prepare: () => ({
get: () => {
throw new Error("SQLITE_BUSY: database is locked");
},
}),
close: () => {},
};
await expect(
manager.search("busy lookup", { sessionKey: "agent:main:slack:dm:u123" }),
).rejects.toThrow("qmd index busy while reading results");
await manager.close();
});
});
async function waitForCondition(check: () => boolean, timeoutMs: number): Promise<void> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
if (check()) {
return;
}
await new Promise((resolve) => setTimeout(resolve, 5));
}
throw new Error("condition was not met in time");
}

View File

@@ -85,6 +85,7 @@ export class QmdMemoryManager implements MemorySearchManager {
private updateTimer: NodeJS.Timeout | null = null;
private pendingUpdate: Promise<void> | null = null;
private queuedForcedUpdate: Promise<void> | null = null;
private queuedForcedRuns = 0;
private closed = false;
private db: SqliteDatabase | null = null;
private lastUpdateAt: number | null = null;
@@ -386,36 +387,35 @@ export class QmdMemoryManager implements MemorySearchManager {
clearInterval(this.updateTimer);
this.updateTimer = null;
}
this.queuedForcedRuns = 0;
await this.pendingUpdate?.catch(() => undefined);
await this.queuedForcedUpdate?.catch(() => undefined);
if (this.db) {
this.db.close();
this.db = null;
}
}
private async runUpdate(reason: string, force?: boolean): Promise<void> {
private async runUpdate(
reason: string,
force?: boolean,
opts?: { fromForcedQueue?: boolean },
): Promise<void> {
if (this.closed) {
return;
}
if (this.pendingUpdate) {
if (force) {
if (!this.queuedForcedUpdate) {
this.queuedForcedUpdate = this.pendingUpdate
.catch(() => undefined)
.then(async () => {
if (this.closed) {
return;
}
await this.runUpdate(`${reason}:queued`, true);
})
.finally(() => {
this.queuedForcedUpdate = null;
});
}
return this.queuedForcedUpdate;
return this.enqueueForcedUpdate(reason);
}
return this.pendingUpdate;
}
if (this.queuedForcedUpdate && !opts?.fromForcedQueue) {
if (force) {
return this.enqueueForcedUpdate(reason);
}
return this.queuedForcedUpdate;
}
if (this.shouldSkipUpdate(force)) {
return;
}
@@ -446,6 +446,24 @@ export class QmdMemoryManager implements MemorySearchManager {
await this.pendingUpdate;
}
private enqueueForcedUpdate(reason: string): Promise<void> {
this.queuedForcedRuns += 1;
if (!this.queuedForcedUpdate) {
this.queuedForcedUpdate = this.drainForcedUpdates(reason).finally(() => {
this.queuedForcedUpdate = null;
});
}
return this.queuedForcedUpdate;
}
private async drainForcedUpdates(reason: string): Promise<void> {
await this.pendingUpdate?.catch(() => undefined);
while (!this.closed && this.queuedForcedRuns > 0) {
this.queuedForcedRuns -= 1;
await this.runUpdate(`${reason}:queued`, true, { fromForcedQueue: true });
}
}
private async runQmd(
args: string[],
opts?: { timeoutMs?: number },
@@ -577,7 +595,7 @@ export class QmdMemoryManager implements MemorySearchManager {
} catch (err) {
if (this.isSqliteBusyError(err)) {
log.debug(`qmd index is busy while resolving doc path: ${String(err)}`);
return null;
throw this.createQmdBusyError(err);
}
throw err;
}
@@ -862,6 +880,11 @@ export class QmdMemoryManager implements MemorySearchManager {
return normalized.includes("sqlite_busy") || normalized.includes("database is locked");
}
private createQmdBusyError(err: unknown): Error {
const message = err instanceof Error ? err.message : String(err);
return new Error(`qmd index busy while reading results: ${message}`);
}
private async waitForPendingUpdateBeforeSearch(): Promise<void> {
const pending = this.pendingUpdate;
if (!pending) {