mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
fix: remap session JSONL chunk line numbers to original source positions (#12102)
* fix: remap session JSONL chunk line numbers to original source positions buildSessionEntry() flattens JSONL messages into plain text before chunkMarkdown() assigns line numbers. The stored startLine/endLine values therefore reference positions in the flattened text, not the original JSONL file. - Add lineMap to SessionFileEntry tracking which JSONL line each extracted message came from - Add remapChunkLines() to translate chunk positions back to original JSONL lines after chunking - Guard remap with source === "sessions" to prevent misapplication - Include lineMap in content hash so existing sessions get re-indexed Fixes #12044 * memory: dedupe session JSONL parsing --------- Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -2,7 +2,12 @@ import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import { chunkMarkdown, listMemoryFiles, normalizeExtraMemoryPaths } from "./internal.js";
|
||||
import {
|
||||
chunkMarkdown,
|
||||
listMemoryFiles,
|
||||
normalizeExtraMemoryPaths,
|
||||
remapChunkLines,
|
||||
} from "./internal.js";
|
||||
|
||||
describe("normalizeExtraMemoryPaths", () => {
|
||||
it("trims, resolves, and dedupes paths", () => {
|
||||
@@ -123,3 +128,65 @@ describe("chunkMarkdown", () => {
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("remapChunkLines", () => {
|
||||
it("remaps chunk line numbers using a lineMap", () => {
|
||||
// Simulate 5 content lines that came from JSONL lines [4, 6, 7, 10, 13] (1-indexed)
|
||||
const lineMap = [4, 6, 7, 10, 13];
|
||||
|
||||
// Create chunks from content that has 5 lines
|
||||
const content = "User: Hello\nAssistant: Hi\nUser: Question\nAssistant: Answer\nUser: Thanks";
|
||||
const chunks = chunkMarkdown(content, { tokens: 400, overlap: 0 });
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
|
||||
// Before remapping, startLine/endLine reference content line numbers (1-indexed)
|
||||
expect(chunks[0].startLine).toBe(1);
|
||||
|
||||
// Remap
|
||||
remapChunkLines(chunks, lineMap);
|
||||
|
||||
// After remapping, line numbers should reference original JSONL lines
|
||||
// Content line 1 → JSONL line 4, content line 5 → JSONL line 13
|
||||
expect(chunks[0].startLine).toBe(4);
|
||||
const lastChunk = chunks[chunks.length - 1];
|
||||
expect(lastChunk.endLine).toBe(13);
|
||||
});
|
||||
|
||||
it("preserves original line numbers when lineMap is undefined", () => {
|
||||
const content = "Line one\nLine two\nLine three";
|
||||
const chunks = chunkMarkdown(content, { tokens: 400, overlap: 0 });
|
||||
const originalStart = chunks[0].startLine;
|
||||
const originalEnd = chunks[chunks.length - 1].endLine;
|
||||
|
||||
remapChunkLines(chunks, undefined);
|
||||
|
||||
expect(chunks[0].startLine).toBe(originalStart);
|
||||
expect(chunks[chunks.length - 1].endLine).toBe(originalEnd);
|
||||
});
|
||||
|
||||
it("handles multi-chunk content with correct remapping", () => {
|
||||
// Use small chunk size to force multiple chunks
|
||||
// lineMap: 10 content lines from JSONL lines [2, 5, 8, 11, 14, 17, 20, 23, 26, 29]
|
||||
const lineMap = [2, 5, 8, 11, 14, 17, 20, 23, 26, 29];
|
||||
const contentLines = lineMap.map((_, i) =>
|
||||
i % 2 === 0 ? `User: Message ${i}` : `Assistant: Reply ${i}`,
|
||||
);
|
||||
const content = contentLines.join("\n");
|
||||
|
||||
// Use very small chunk size to force splitting
|
||||
const chunks = chunkMarkdown(content, { tokens: 10, overlap: 0 });
|
||||
expect(chunks.length).toBeGreaterThan(1);
|
||||
|
||||
remapChunkLines(chunks, lineMap);
|
||||
|
||||
// First chunk should start at JSONL line 2
|
||||
expect(chunks[0].startLine).toBe(2);
|
||||
// Last chunk should end at JSONL line 29
|
||||
expect(chunks[chunks.length - 1].endLine).toBe(29);
|
||||
|
||||
// Each chunk's startLine should be ≤ its endLine
|
||||
for (const chunk of chunks) {
|
||||
expect(chunk.startLine).toBeLessThanOrEqual(chunk.endLine);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -246,6 +246,27 @@ export function chunkMarkdown(
|
||||
return chunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remap chunk startLine/endLine from content-relative positions to original
|
||||
* source file positions using a lineMap. Each entry in lineMap gives the
|
||||
* 1-indexed source line for the corresponding 0-indexed content line.
|
||||
*
|
||||
* This is used for session JSONL files where buildSessionEntry() flattens
|
||||
* messages into a plain-text string before chunking. Without remapping the
|
||||
* stored line numbers would reference positions in the flattened text rather
|
||||
* than the original JSONL file.
|
||||
*/
|
||||
export function remapChunkLines(chunks: MemoryChunk[], lineMap: number[] | undefined): void {
|
||||
if (!lineMap || lineMap.length === 0) {
|
||||
return;
|
||||
}
|
||||
for (const chunk of chunks) {
|
||||
// startLine/endLine are 1-indexed; lineMap is 0-indexed by content line
|
||||
chunk.startLine = lineMap[chunk.startLine - 1] ?? chunk.startLine;
|
||||
chunk.endLine = lineMap[chunk.endLine - 1] ?? chunk.endLine;
|
||||
}
|
||||
}
|
||||
|
||||
export function parseEmbedding(raw: string): number[] {
|
||||
try {
|
||||
const parsed = JSON.parse(raw) as number[];
|
||||
|
||||
@@ -50,10 +50,17 @@ import {
|
||||
type MemoryChunk,
|
||||
type MemoryFileEntry,
|
||||
parseEmbedding,
|
||||
remapChunkLines,
|
||||
runWithConcurrency,
|
||||
} from "./internal.js";
|
||||
import { searchKeyword, searchVector } from "./manager-search.js";
|
||||
import { ensureMemoryIndexSchema } from "./memory-schema.js";
|
||||
import {
|
||||
buildSessionEntry,
|
||||
listSessionFilesForAgent,
|
||||
sessionPathForFile,
|
||||
type SessionFileEntry,
|
||||
} from "./session-files.js";
|
||||
import { loadSqliteVecExtension } from "./sqlite-vec.js";
|
||||
import { requireNodeSqlite } from "./sqlite.js";
|
||||
|
||||
@@ -66,15 +73,6 @@ type MemoryIndexMeta = {
|
||||
vectorDims?: number;
|
||||
};
|
||||
|
||||
type SessionFileEntry = {
|
||||
path: string;
|
||||
absPath: string;
|
||||
mtimeMs: number;
|
||||
size: number;
|
||||
hash: string;
|
||||
content: string;
|
||||
};
|
||||
|
||||
type MemorySyncProgressState = {
|
||||
completed: number;
|
||||
total: number;
|
||||
@@ -1147,8 +1145,8 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
needsFullReindex: boolean;
|
||||
progress?: MemorySyncProgressState;
|
||||
}) {
|
||||
const files = await this.listSessionFiles();
|
||||
const activePaths = new Set(files.map((file) => this.sessionPathForFile(file)));
|
||||
const files = await listSessionFilesForAgent(this.agentId);
|
||||
const activePaths = new Set(files.map((file) => sessionPathForFile(file)));
|
||||
const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0;
|
||||
log.debug("memory sync: indexing session files", {
|
||||
files: files.length,
|
||||
@@ -1177,7 +1175,7 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
}
|
||||
return;
|
||||
}
|
||||
const entry = await this.buildSessionEntry(absPath);
|
||||
const entry = await buildSessionEntry(absPath);
|
||||
if (!entry) {
|
||||
if (params.progress) {
|
||||
params.progress.completed += 1;
|
||||
@@ -1545,113 +1543,6 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
.run(META_KEY, value);
|
||||
}
|
||||
|
||||
private async listSessionFiles(): Promise<string[]> {
|
||||
const dir = resolveSessionTranscriptsDirForAgent(this.agentId);
|
||||
try {
|
||||
const entries = await fs.readdir(dir, { withFileTypes: true });
|
||||
return entries
|
||||
.filter((entry) => entry.isFile())
|
||||
.map((entry) => entry.name)
|
||||
.filter((name) => name.endsWith(".jsonl"))
|
||||
.map((name) => path.join(dir, name));
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Relative index path ("sessions/<basename>") for a transcript file,
 * normalized to forward slashes so stored paths match across platforms.
 */
private sessionPathForFile(absPath: string): string {
  const joined = path.join("sessions", path.basename(absPath));
  // path.join produces "\" separators on Windows; the index stores "/" only.
  return joined.replace(/\\/g, "/");
}
|
||||
|
||||
/**
 * Flatten a message fragment to one line: newlines (and adjacent whitespace)
 * become single spaces, remaining whitespace runs collapse, and the result
 * is trimmed.
 */
private normalizeSessionText(value: string): string {
  const withoutNewlines = value.replace(/\s*\n+\s*/g, " ");
  return withoutNewlines.replace(/\s+/g, " ").trim();
}
|
||||
|
||||
/**
 * Extract plain text from a message "content" value.
 *
 * Accepts either a bare string or an array of content blocks; from arrays,
 * only blocks shaped like { type: "text", text: string } contribute. Each
 * piece is whitespace-normalized, and the pieces are joined with a single
 * space. Returns null when nothing textual remains.
 */
private extractSessionText(content: unknown): string | null {
  if (typeof content === "string") {
    const normalized = this.normalizeSessionText(content);
    return normalized === "" ? null : normalized;
  }
  if (!Array.isArray(content)) {
    return null;
  }
  const pieces: string[] = [];
  for (const item of content) {
    // Skip null/non-object entries; objects are always truthy.
    if (item === null || typeof item !== "object") {
      continue;
    }
    const candidate = item as { type?: unknown; text?: unknown };
    if (candidate.type !== "text" || typeof candidate.text !== "string") {
      continue;
    }
    const normalized = this.normalizeSessionText(candidate.text);
    if (normalized !== "") {
      pieces.push(normalized);
    }
  }
  return pieces.length > 0 ? pieces.join(" ") : null;
}
|
||||
|
||||
/**
 * Read a session transcript (JSONL) and build an indexable entry from it.
 *
 * Each JSONL line holding a record of type "message" with a user/assistant
 * role contributes one "Label: text" line to the flattened content; all
 * other lines (blank, unparseable, metadata, tool records) are skipped.
 *
 * NOTE(review): the flattening discards the original JSONL line positions,
 * so chunk line numbers computed from `content` reference the flattened
 * text, not the source file.
 *
 * Returns null when the file cannot be read or stat'd.
 */
private async buildSessionEntry(absPath: string): Promise<SessionFileEntry | null> {
try {
const stat = await fs.stat(absPath);
const raw = await fs.readFile(absPath, "utf-8");
const lines = raw.split("\n");
const collected: string[] = [];
for (const line of lines) {
// Blank lines carry no record.
if (!line.trim()) {
continue;
}
let record: unknown;
try {
record = JSON.parse(line);
} catch {
// Tolerate malformed lines rather than failing the whole file.
continue;
}
// Only records of type "message" are indexed.
if (
!record ||
typeof record !== "object" ||
(record as { type?: unknown }).type !== "message"
) {
continue;
}
const message = (record as { message?: unknown }).message as
| { role?: unknown; content?: unknown }
| undefined;
if (!message || typeof message.role !== "string") {
continue;
}
// Keep conversation turns only; other roles are skipped.
if (message.role !== "user" && message.role !== "assistant") {
continue;
}
const text = this.extractSessionText(message.content);
if (!text) {
continue;
}
const label = message.role === "user" ? "User" : "Assistant";
collected.push(`${label}: ${text}`);
}
const content = collected.join("\n");
return {
path: this.sessionPathForFile(absPath),
absPath,
mtimeMs: stat.mtimeMs,
size: stat.size,
// Hash of the flattened content drives change detection / re-indexing.
hash: hashText(content),
content,
};
} catch (err) {
// Best-effort: unreadable session files are logged and skipped.
log.debug(`Failed reading session file ${absPath}: ${String(err)}`);
return null;
}
}
|
||||
|
||||
private estimateEmbeddingTokens(text: string): number {
|
||||
if (!text) {
|
||||
return 0;
|
||||
@@ -2318,6 +2209,9 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
const chunks = chunkMarkdown(content, this.settings.chunking).filter(
|
||||
(chunk) => chunk.text.trim().length > 0,
|
||||
);
|
||||
if (options.source === "sessions" && "lineMap" in entry) {
|
||||
remapChunkLines(chunks, entry.lineMap);
|
||||
}
|
||||
const embeddings = this.batch.enabled
|
||||
? await this.embedChunksWithBatch(chunks, entry, options.source)
|
||||
: await this.embedChunksInBatches(chunks);
|
||||
|
||||
src/memory/session-files.test.ts (new file, 87 lines)
@@ -0,0 +1,87 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import { buildSessionEntry } from "./session-files.js";
|
||||
|
||||
describe("buildSessionEntry", () => {
|
||||
let tmpDir: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "session-entry-test-"));
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("returns lineMap tracking original JSONL line numbers", async () => {
|
||||
// Simulate a real session JSONL file with metadata records interspersed
|
||||
// Lines 1-3: non-message metadata records
|
||||
// Line 4: user message
|
||||
// Line 5: metadata
|
||||
// Line 6: assistant message
|
||||
// Line 7: user message
|
||||
const jsonlLines = [
|
||||
JSON.stringify({ type: "custom", customType: "model-snapshot", data: {} }),
|
||||
JSON.stringify({ type: "custom", customType: "openclaw.cache-ttl", data: {} }),
|
||||
JSON.stringify({ type: "session-meta", agentId: "test" }),
|
||||
JSON.stringify({ type: "message", message: { role: "user", content: "Hello world" } }),
|
||||
JSON.stringify({ type: "custom", customType: "tool-result", data: {} }),
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: { role: "assistant", content: "Hi there, how can I help?" },
|
||||
}),
|
||||
JSON.stringify({ type: "message", message: { role: "user", content: "Tell me a joke" } }),
|
||||
];
|
||||
const filePath = path.join(tmpDir, "session.jsonl");
|
||||
await fs.writeFile(filePath, jsonlLines.join("\n"));
|
||||
|
||||
const entry = await buildSessionEntry(filePath);
|
||||
expect(entry).not.toBeNull();
|
||||
|
||||
// The content should have 3 lines (3 message records)
|
||||
const contentLines = entry!.content.split("\n");
|
||||
expect(contentLines).toHaveLength(3);
|
||||
expect(contentLines[0]).toContain("User: Hello world");
|
||||
expect(contentLines[1]).toContain("Assistant: Hi there");
|
||||
expect(contentLines[2]).toContain("User: Tell me a joke");
|
||||
|
||||
// lineMap should map each content line to its original JSONL line (1-indexed)
|
||||
// Content line 0 → JSONL line 4 (the first user message)
|
||||
// Content line 1 → JSONL line 6 (the assistant message)
|
||||
// Content line 2 → JSONL line 7 (the second user message)
|
||||
expect(entry!.lineMap).toBeDefined();
|
||||
expect(entry!.lineMap).toEqual([4, 6, 7]);
|
||||
});
|
||||
|
||||
it("returns empty lineMap when no messages are found", async () => {
|
||||
const jsonlLines = [
|
||||
JSON.stringify({ type: "custom", customType: "model-snapshot", data: {} }),
|
||||
JSON.stringify({ type: "session-meta", agentId: "test" }),
|
||||
];
|
||||
const filePath = path.join(tmpDir, "empty-session.jsonl");
|
||||
await fs.writeFile(filePath, jsonlLines.join("\n"));
|
||||
|
||||
const entry = await buildSessionEntry(filePath);
|
||||
expect(entry).not.toBeNull();
|
||||
expect(entry!.content).toBe("");
|
||||
expect(entry!.lineMap).toEqual([]);
|
||||
});
|
||||
|
||||
it("skips blank lines and invalid JSON without breaking lineMap", async () => {
|
||||
const jsonlLines = [
|
||||
"",
|
||||
"not valid json",
|
||||
JSON.stringify({ type: "message", message: { role: "user", content: "First" } }),
|
||||
"",
|
||||
JSON.stringify({ type: "message", message: { role: "assistant", content: "Second" } }),
|
||||
];
|
||||
const filePath = path.join(tmpDir, "gaps.jsonl");
|
||||
await fs.writeFile(filePath, jsonlLines.join("\n"));
|
||||
|
||||
const entry = await buildSessionEntry(filePath);
|
||||
expect(entry).not.toBeNull();
|
||||
expect(entry!.lineMap).toEqual([3, 5]);
|
||||
});
|
||||
});
|
||||
@@ -14,6 +14,8 @@ export type SessionFileEntry = {
|
||||
size: number;
|
||||
hash: string;
|
||||
content: string;
|
||||
/** Maps each content line (0-indexed) to its 1-indexed JSONL source line. */
|
||||
lineMap: number[];
|
||||
};
|
||||
|
||||
export async function listSessionFilesForAgent(agentId: string): Promise<string[]> {
|
||||
@@ -75,7 +77,9 @@ export async function buildSessionEntry(absPath: string): Promise<SessionFileEnt
|
||||
const raw = await fs.readFile(absPath, "utf-8");
|
||||
const lines = raw.split("\n");
|
||||
const collected: string[] = [];
|
||||
for (const line of lines) {
|
||||
const lineMap: number[] = [];
|
||||
for (let jsonlIdx = 0; jsonlIdx < lines.length; jsonlIdx++) {
|
||||
const line = lines[jsonlIdx];
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
@@ -108,6 +112,7 @@ export async function buildSessionEntry(absPath: string): Promise<SessionFileEnt
|
||||
const safe = redactSensitiveText(text, { mode: "tools" });
|
||||
const label = message.role === "user" ? "User" : "Assistant";
|
||||
collected.push(`${label}: ${safe}`);
|
||||
lineMap.push(jsonlIdx + 1);
|
||||
}
|
||||
const content = collected.join("\n");
|
||||
return {
|
||||
@@ -115,8 +120,9 @@ export async function buildSessionEntry(absPath: string): Promise<SessionFileEnt
|
||||
absPath,
|
||||
mtimeMs: stat.mtimeMs,
|
||||
size: stat.size,
|
||||
hash: hashText(content),
|
||||
hash: hashText(content + "\n" + lineMap.join(",")),
|
||||
content,
|
||||
lineMap,
|
||||
};
|
||||
} catch (err) {
|
||||
log.debug(`Failed reading session file ${absPath}: ${String(err)}`);
|
||||
|
||||
Reference in New Issue
Block a user