memory-neo4j: medium-term fixes — index, batching, parallelism, module extraction

- Add composite index on (agentId, category) for faster filtered queries
- Combine graph search into a single UNION Cypher query (previously 2 sequential queries)
- Parallelize conflict resolution in chunks of LLM_CONCURRENCY
- Batch entity operations (merge, mentions, relationships, tags, category,
  extraction status) into a single managed transaction (call shape sketched below)
- Make auto-capture fire-and-forget with a shared captureMessage helper
- Extract attention-gate.ts and message-utils.ts modules from index.ts
  and extractor.ts for better separation of concerns
- Update tests to match new batched/combined APIs
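
The main call-shape changes, condensed from the diffs below (illustrative sketch,
not verbatim source; imports and surrounding setup omitted):

    // Composite index added at startup (neo4j-client.ts):
    //   CREATE INDEX memory_agent_category_index IF NOT EXISTS
    //   FOR (m:Memory) ON (m.agentId, m.category)

    // extractor.ts: one managed transaction instead of per-entity calls
    await db.batchEntityOperations(
      memoryId,
      result.entities.map((e) => ({
        id: randomUUID(), name: e.name, type: e.type,
        aliases: e.aliases, description: e.description,
      })),
      result.relationships, // [{ source, target, type, confidence }]
      result.tags,          // [{ name, category }]
      result.category,      // optional; only overwrites 'other'
    );

    // index.ts: agent_end hook no longer awaits capture
    api.on("agent_end", (event, ctx) => {
      void runAutoCapture(event.messages, ctx.agentId || "default", ctx.sessionKey,
        db, embeddings, extractionConfig, api.logger);
    });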

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tarun Sukhani
2026-02-09 19:20:14 +08:00
parent 1ae3afbd6b
commit 1f80d4f0d2
7 changed files with 724 additions and 535 deletions

View File

@@ -0,0 +1,151 @@
/**
* Attention gate — lightweight heuristic filter (phase 1 of memory pipeline).
*
* Rejects obvious noise without any LLM call, analogous to how the brain's
* sensory gating filters out irrelevant stimuli before they enter working
* memory. Everything that passes gets stored; the sleep cycle decides what
* matters.
*/
const NOISE_PATTERNS = [
// Greetings / acknowledgments (exact match, with optional punctuation)
/^(hi|hey|hello|yo|sup|ok|okay|sure|thanks|thank you|thx|ty|yep|yup|nope|no|yes|yeah|cool|nice|great|got it|sounds good|perfect|alright|fine|noted|ack|kk|k)\s*[.!?]*$/i,
// Two-word affirmations: "ok great", "sounds good", "yes please", etc.
/^(ok|okay|yes|yeah|yep|sure|no|nope|alright|right|fine|cool|nice|great)\s+(great|good|sure|thanks|please|ok|fine|cool|yeah|perfect|noted|absolutely|definitely|exactly)\s*[.!?]*$/i,
// Deictic: messages that are only pronouns/articles/common verbs — no standalone meaning
// e.g. "I need those", "let me do it", "ok let me test it out", "I got it"
/^(ok[,.]?\s+)?(i('ll|'m|'d|'ve)?\s+)?(just\s+)?(need|want|got|have|let|let's|let me|give me|send|do|did|try|check|see|look at|test|take|get|go|use)\s+(it|that|this|those|these|them|some|one|the|a|an|me|him|her|us)\s*(out|up|now|then|too|again|later|first|here|there|please)?\s*[.!?]*$/i,
// Short acknowledgments with trailing context: "ok, ..." / "yes, ..." when total is brief
/^(ok|okay|yes|yeah|yep|sure|no|nope|right|alright|fine|cool|nice|great|perfect)[,.]?\s+.{0,20}$/i,
// Conversational filler / noise phrases (standalone, with optional punctuation)
/^(hmm+|huh|haha|ha|lol|lmao|rofl|nah|meh|idk|brb|ttyl|omg|wow|whoa|welp|oops|ooh|aah|ugh|bleh|pfft|smh|ikr|tbh|imo|fwiw|np|nvm|nm|wut|wat|wha|heh|tsk|sigh|yay|woo+|boo|dang|darn|geez|gosh|sheesh|oof)\s*[.!?]*$/i,
// Single-word or near-empty
/^\S{0,3}$/,
// Pure emoji
/^[\p{Emoji}\s]+$/u,
// System/XML markup
/^<[a-z-]+>[\s\S]*<\/[a-z-]+>$/i,
// --- Session reset prompts (from /new and /reset commands) ---
/^A new session was started via/i,
// --- System infrastructure messages (never user-generated) ---
// Heartbeat prompts
/Read HEARTBEAT\.md if it exists/i,
// Pre-compaction flush prompts
/^Pre-compaction memory flush/i,
// System timestamp messages (cron outputs, reminders, exec reports)
/^System:\s*\[/i,
// Cron job wrappers
/^\[cron:[0-9a-f-]+/i,
// Gateway restart JSON payloads
/^GatewayRestart:\s*\{/i,
// Background task completion reports
/^\[\w{3}\s+\d{4}-\d{2}-\d{2}\s.*\]\s*A background task/i,
];
/** Maximum message length — code dumps, logs, etc. are not memories. */
const MAX_CAPTURE_CHARS = 2000;
/** Minimum message length — too short to be meaningful. */
const MIN_CAPTURE_CHARS = 30;
/** Minimum word count — short contextual phrases lack standalone meaning. */
const MIN_WORD_COUNT = 5;
export function passesAttentionGate(text: string): boolean {
const trimmed = text.trim();
// Length bounds
if (trimmed.length < MIN_CAPTURE_CHARS || trimmed.length > MAX_CAPTURE_CHARS) {
return false;
}
// Word count — short phrases ("I need those") lack context for recall
const wordCount = trimmed.split(/\s+/).length;
if (wordCount < MIN_WORD_COUNT) {
return false;
}
// Injected context from the memory system itself
if (trimmed.includes("<relevant-memories>") || trimmed.includes("<core-memory-refresh>")) {
return false;
}
// Noise patterns
if (NOISE_PATTERNS.some((r) => r.test(trimmed))) {
return false;
}
// Excessive emoji (likely reaction, not substance)
const emojiCount = (trimmed.match(/[\u{1F300}-\u{1F9FF}]/gu) || []).length;
if (emojiCount > 3) {
return false;
}
// Passes gate — retain for short-term storage
return true;
}
// ============================================================================
// Assistant attention gate — stricter filter for assistant messages
// ============================================================================
/** Maximum assistant message length — shorter than user to avoid code dumps. */
const MAX_ASSISTANT_CAPTURE_CHARS = 1000;
/** Minimum word count for assistant messages — higher than user. */
const MIN_ASSISTANT_WORD_COUNT = 10;
export function passesAssistantAttentionGate(text: string): boolean {
const trimmed = text.trim();
// Length bounds (stricter than user)
if (trimmed.length < MIN_CAPTURE_CHARS || trimmed.length > MAX_ASSISTANT_CAPTURE_CHARS) {
return false;
}
// Word count — higher threshold than user messages
const wordCount = trimmed.split(/\s+/).length;
if (wordCount < MIN_ASSISTANT_WORD_COUNT) {
return false;
}
// Reject messages that are mostly code (>50% inside triple-backtick fences)
const codeBlockRegex = /```[\s\S]*?```/g;
let codeChars = 0;
let match: RegExpExecArray | null;
while ((match = codeBlockRegex.exec(trimmed)) !== null) {
codeChars += match[0].length;
}
if (codeChars > trimmed.length * 0.5) {
return false;
}
// Reject messages that are mostly tool output
if (
trimmed.includes("<tool_result>") ||
trimmed.includes("<tool_use>") ||
trimmed.includes("<function_call>")
) {
return false;
}
// Injected context from the memory system itself
if (trimmed.includes("<relevant-memories>") || trimmed.includes("<core-memory-refresh>")) {
return false;
}
// Noise patterns (same as user gate)
if (NOISE_PATTERNS.some((r) => r.test(trimmed))) {
return false;
}
// Excessive emoji (likely reaction, not substance)
const emojiCount = (trimmed.match(/[\u{1F300}-\u{1F9FF}]/gu) || []).length;
if (emojiCount > 3) {
return false;
}
return true;
}

View File

@@ -744,11 +744,7 @@ describe("runBackgroundExtraction", () => {
let mockDb: {
updateExtractionStatus: ReturnType<typeof vi.fn>;
mergeEntity: ReturnType<typeof vi.fn>;
createMentions: ReturnType<typeof vi.fn>;
createEntityRelationship: ReturnType<typeof vi.fn>;
tagMemory: ReturnType<typeof vi.fn>;
updateMemoryCategory: ReturnType<typeof vi.fn>;
batchEntityOperations: ReturnType<typeof vi.fn>;
};
let mockEmbeddings: {
@@ -766,11 +762,7 @@ describe("runBackgroundExtraction", () => {
};
mockDb = {
updateExtractionStatus: vi.fn().mockResolvedValue(undefined),
mergeEntity: vi.fn().mockResolvedValue(undefined),
createMentions: vi.fn().mockResolvedValue(undefined),
createEntityRelationship: vi.fn().mockResolvedValue(undefined),
tagMemory: vi.fn().mockResolvedValue(undefined),
updateMemoryCategory: vi.fn().mockResolvedValue(undefined),
batchEntityOperations: vi.fn().mockResolvedValue(undefined),
};
mockEmbeddings = {
embed: vi.fn().mockResolvedValue([0.1, 0.2, 0.3]),
@@ -857,7 +849,7 @@ describe("runBackgroundExtraction", () => {
expect(mockDb.updateExtractionStatus).toHaveBeenCalledWith("mem-1", "complete");
});
it("should merge entities, create mentions, and mark complete", async () => {
it("should batch entities, relationships, tags, and category in one call", async () => {
mockFetchResponse(
JSON.stringify({
category: "fact",
@@ -876,18 +868,16 @@ describe("runBackgroundExtraction", () => {
mockLogger,
);
expect(mockDb.mergeEntity).toHaveBeenCalledWith(
expect.objectContaining({
name: "alice",
type: "person",
}),
expect(mockDb.batchEntityOperations).toHaveBeenCalledWith(
"mem-1",
[expect.objectContaining({ name: "alice", type: "person" })],
[],
[],
"fact",
);
expect(mockDb.createMentions).toHaveBeenCalledWith("mem-1", "alice", "context", 1.0);
expect(mockDb.updateMemoryCategory).toHaveBeenCalledWith("mem-1", "fact");
expect(mockDb.updateExtractionStatus).toHaveBeenCalledWith("mem-1", "complete");
});
it("should create entity relationships", async () => {
it("should pass relationships to batchEntityOperations", async () => {
mockFetchResponse(
JSON.stringify({
entities: [
@@ -908,10 +898,19 @@ describe("runBackgroundExtraction", () => {
mockLogger,
);
expect(mockDb.createEntityRelationship).toHaveBeenCalledWith("alice", "acme", "WORKS_AT", 0.9);
expect(mockDb.batchEntityOperations).toHaveBeenCalledWith(
"mem-1",
expect.arrayContaining([
expect.objectContaining({ name: "alice", type: "person" }),
expect.objectContaining({ name: "acme", type: "organization" }),
]),
[{ source: "alice", target: "acme", type: "WORKS_AT", confidence: 0.9 }],
[],
undefined,
);
});
it("should tag memories", async () => {
it("should pass tags to batchEntityOperations", async () => {
mockFetchResponse(
JSON.stringify({
entities: [],
@@ -929,10 +928,16 @@ describe("runBackgroundExtraction", () => {
mockLogger,
);
expect(mockDb.tagMemory).toHaveBeenCalledWith("mem-1", "programming", "tech");
expect(mockDb.batchEntityOperations).toHaveBeenCalledWith(
"mem-1",
[],
[],
[{ name: "programming", category: "tech" }],
undefined,
);
});
it("should not update category when result has no category", async () => {
it("should pass undefined category when result has no category", async () => {
mockFetchResponse(
JSON.stringify({
entities: [{ name: "Test", type: "concept" }],
@@ -950,10 +955,16 @@ describe("runBackgroundExtraction", () => {
mockLogger,
);
expect(mockDb.updateMemoryCategory).not.toHaveBeenCalled();
expect(mockDb.batchEntityOperations).toHaveBeenCalledWith(
"mem-1",
[expect.objectContaining({ name: "test", type: "concept" })],
[],
[],
undefined,
);
});
it("should handle entity merge failure gracefully", async () => {
it("should handle batchEntityOperations failure gracefully", async () => {
mockFetchResponse(
JSON.stringify({
entities: [
@@ -965,9 +976,7 @@ describe("runBackgroundExtraction", () => {
}),
);
// First entity merge fails, second succeeds
mockDb.mergeEntity.mockRejectedValueOnce(new Error("merge failed"));
mockDb.mergeEntity.mockResolvedValueOnce(undefined);
mockDb.batchEntityOperations.mockRejectedValueOnce(new Error("batch failed"));
await runBackgroundExtraction(
"mem-1",
@@ -978,9 +987,8 @@ describe("runBackgroundExtraction", () => {
mockLogger,
);
// Should still continue and complete
expect(mockDb.mergeEntity).toHaveBeenCalledTimes(2);
expect(mockDb.updateExtractionStatus).toHaveBeenCalledWith("mem-1", "complete");
// Should handle error and mark as failed
expect(mockDb.batchEntityOperations).toHaveBeenCalledTimes(1);
expect(mockLogger.warn).toHaveBeenCalled();
});

View File

@@ -361,50 +361,22 @@ export async function runBackgroundExtraction(
return;
}
// MERGE Entity nodes (entities use fulltext search, not vector embeddings)
for (const entity of result.entities) {
try {
await db.mergeEntity({
id: randomUUID(),
name: entity.name,
type: entity.type,
aliases: entity.aliases,
description: entity.description,
});
// Batch all entity operations into a single transaction:
// entity merges, mentions, relationships, tags, category, and extraction status
await db.batchEntityOperations(
memoryId,
result.entities.map((e) => ({
id: randomUUID(),
name: e.name,
type: e.type,
aliases: e.aliases,
description: e.description,
})),
result.relationships,
result.tags,
result.category,
);
// Create MENTIONS relationship
await db.createMentions(memoryId, entity.name, "context", 1.0);
} catch (err) {
logger.warn(`memory-neo4j: entity merge failed for "${entity.name}": ${String(err)}`);
}
}
// Create inter-Entity relationships
for (const rel of result.relationships) {
try {
await db.createEntityRelationship(rel.source, rel.target, rel.type, rel.confidence);
} catch (err) {
logger.debug?.(
`memory-neo4j: relationship creation failed: ${rel.source}->${rel.target}: ${String(err)}`,
);
}
}
// Tag the memory
for (const tag of result.tags) {
try {
await db.tagMemory(memoryId, tag.name, tag.category);
} catch (err) {
logger.debug?.(`memory-neo4j: tagging failed for "${tag.name}": ${String(err)}`);
}
}
// Update category if the LLM classified it (only overwrites 'other')
if (result.category) {
await db.updateMemoryCategory(memoryId, result.category);
}
await db.updateExtractionStatus(memoryId, "complete");
logger.info(
`memory-neo4j: extraction complete for ${memoryId.slice(0, 8)}` +
`${result.entities.length} entities, ${result.relationships.length} rels, ${result.tags.length} tags` +
@@ -761,26 +733,42 @@ export async function runSleepCycle(
const pairs = await db.findConflictingMemories(agentId);
result.conflict.pairsFound = pairs.length;
for (const pair of pairs) {
if (abortSignal?.aborted) break;
// Process conflict pairs in parallel chunks of LLM_CONCURRENCY
for (let i = 0; i < pairs.length && !abortSignal?.aborted; i += LLM_CONCURRENCY) {
const chunk = pairs.slice(i, i + LLM_CONCURRENCY);
const outcomes = await Promise.allSettled(
chunk.map((pair) => resolveConflict(pair.memoryA.text, pair.memoryB.text, config)),
);
const decision = await resolveConflict(pair.memoryA.text, pair.memoryB.text, config);
for (let k = 0; k < outcomes.length; k++) {
if (abortSignal?.aborted) break;
const pair = chunk[k];
const outcome = outcomes[k];
if (outcome.status !== "fulfilled") continue;
if (decision === "a") {
await db.invalidateMemory(pair.memoryB.id);
result.conflict.invalidated++;
result.conflict.resolved++;
onProgress?.("conflict", `Kept A, invalidated B: "${pair.memoryB.text.slice(0, 40)}..."`);
} else if (decision === "b") {
await db.invalidateMemory(pair.memoryA.id);
result.conflict.invalidated++;
result.conflict.resolved++;
onProgress?.("conflict", `Kept B, invalidated A: "${pair.memoryA.text.slice(0, 40)}..."`);
} else if (decision === "both") {
result.conflict.resolved++;
onProgress?.("conflict", `Kept both: no real conflict`);
const decision = outcome.value;
if (decision === "a") {
await db.invalidateMemory(pair.memoryB.id);
result.conflict.invalidated++;
result.conflict.resolved++;
onProgress?.(
"conflict",
`Kept A, invalidated B: "${pair.memoryB.text.slice(0, 40)}..."`,
);
} else if (decision === "b") {
await db.invalidateMemory(pair.memoryA.id);
result.conflict.invalidated++;
result.conflict.resolved++;
onProgress?.(
"conflict",
`Kept B, invalidated A: "${pair.memoryA.text.slice(0, 40)}..."`,
);
} else if (decision === "both") {
result.conflict.resolved++;
onProgress?.("conflict", `Kept both: no real conflict`);
}
// "skip" = LLM unavailable, don't count as resolved
}
// "skip" = LLM unavailable, don't count as resolved
}
logger.info(
@@ -1051,143 +1039,15 @@ export async function runSleepCycle(
}
// ============================================================================
// Message Extraction Helper
// Message Extraction (re-exported from message-utils.ts)
// ============================================================================
/**
* Extract user message texts from the event.messages array.
* Handles both string content and content block arrays.
*/
export function extractUserMessages(messages: unknown[]): string[] {
const texts: string[] = [];
for (const msg of messages) {
if (!msg || typeof msg !== "object") {
continue;
}
const msgObj = msg as Record<string, unknown>;
// Only process user messages for auto-capture
if (msgObj.role !== "user") {
continue;
}
const content = msgObj.content;
if (typeof content === "string") {
texts.push(content);
continue;
}
if (Array.isArray(content)) {
for (const block of content) {
if (
block &&
typeof block === "object" &&
"type" in block &&
(block as Record<string, unknown>).type === "text" &&
"text" in block &&
typeof (block as Record<string, unknown>).text === "string"
) {
texts.push((block as Record<string, unknown>).text as string);
}
}
}
}
// Strip wrappers then filter by length
return texts.map(stripMessageWrappers).filter((t) => t.length >= 10);
}
/**
* Strip injected context, channel metadata wrappers, and system prefixes
* so the attention gate sees only the raw user text.
* Exported for use by the cleanup command.
*/
export function stripMessageWrappers(text: string): string {
let s = text;
// Injected context from memory system
s = s.replace(/<relevant-memories>[\s\S]*?<\/relevant-memories>\s*/g, "");
s = s.replace(/<core-memory-refresh>[\s\S]*?<\/core-memory-refresh>\s*/g, "");
s = s.replace(/<system>[\s\S]*?<\/system>\s*/g, "");
// File attachments (PDFs, images, etc. forwarded inline by channels)
s = s.replace(/<file\b[^>]*>[\s\S]*?<\/file>\s*/g, "");
// Media attachment preamble (appears before Telegram wrapper)
s = s.replace(/^\[media attached:[^\]]*\]\s*(?:To send an image[^\n]*\n?)*/i, "");
// System exec output blocks (may appear before Telegram wrapper)
s = s.replace(/^(?:System:\s*\[[^\]]*\][^\n]*\n?)+/gi, "");
// Telegram wrapper — may now be at start after previous strips
s = s.replace(/^\s*\[Telegram\s[^\]]+\]\s*/i, "");
// "[message_id: NNN]" suffix (Telegram)
s = s.replace(/\n?\[message_id:\s*\d+\]\s*$/i, "");
// Slack wrapper — "[Slack <workspace> #channel @user] MESSAGE [slack message id: ...]"
s = s.replace(/^\s*\[Slack\s[^\]]+\]\s*/i, "");
s = s.replace(/\n?\[slack message id:\s*[^\]]*\]\s*$/i, "");
return s.trim();
}
// ============================================================================
// Assistant Message Extraction
// ============================================================================
/**
* Strip tool-use, thinking, and code-output blocks from assistant messages
* so the attention gate sees only the substantive assistant text.
*/
export function stripAssistantWrappers(text: string): string {
let s = text;
// Tool-use / tool-result / function_call blocks
s = s.replace(/<tool_use>[\s\S]*?<\/tool_use>\s*/g, "");
s = s.replace(/<tool_result>[\s\S]*?<\/tool_result>\s*/g, "");
s = s.replace(/<function_call>[\s\S]*?<\/function_call>\s*/g, "");
// Thinking tags
s = s.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, "");
s = s.replace(/<antThinking>[\s\S]*?<\/antThinking>\s*/g, "");
// Code execution output
s = s.replace(/<code_output>[\s\S]*?<\/code_output>\s*/g, "");
return s.trim();
}
/**
* Extract assistant message texts from the event.messages array.
* Handles both string content and content block arrays.
*/
export function extractAssistantMessages(messages: unknown[]): string[] {
const texts: string[] = [];
for (const msg of messages) {
if (!msg || typeof msg !== "object") {
continue;
}
const msgObj = msg as Record<string, unknown>;
if (msgObj.role !== "assistant") {
continue;
}
const content = msgObj.content;
if (typeof content === "string") {
texts.push(content);
continue;
}
if (Array.isArray(content)) {
for (const block of content) {
if (
block &&
typeof block === "object" &&
"type" in block &&
(block as Record<string, unknown>).type === "text" &&
"text" in block &&
typeof (block as Record<string, unknown>).text === "string"
) {
texts.push((block as Record<string, unknown>).text as string);
}
}
}
}
return texts.map(stripAssistantWrappers).filter((t) => t.length >= 10);
}
export {
extractUserMessages,
extractAssistantMessages,
stripMessageWrappers,
stripAssistantWrappers,
} from "./message-utils.js";
// ============================================================================
// LLM-Judged Importance Rating

View File

@@ -17,6 +17,7 @@ import { Type } from "@sinclair/typebox";
import { randomUUID } from "node:crypto";
import { stringEnum } from "openclaw/plugin-sdk";
import type { MemoryCategory, MemorySource } from "./schema.js";
import { passesAttentionGate, passesAssistantAttentionGate } from "./attention-gate.js";
import {
DEFAULT_EMBEDDING_DIMS,
EMBEDDING_DIMENSIONS,
@@ -1104,7 +1105,7 @@ const memoryNeo4jPlugin = {
);
if (cfg.autoCapture) {
api.logger.debug?.("memory-neo4j: registering agent_end hook for auto-capture");
api.on("agent_end", async (event, ctx) => {
api.on("agent_end", (event, ctx) => {
api.logger.debug?.(
`memory-neo4j: agent_end fired (success=${event.success}, messages=${event.messages?.length ?? 0})`,
);
@@ -1116,141 +1117,17 @@ const memoryNeo4jPlugin = {
const agentId = ctx.agentId || "default";
const sessionKey = ctx.sessionKey;
try {
let stored = 0;
// Process user messages
const userMessages = extractUserMessages(event.messages);
const retained = userMessages.filter((text) => passesAttentionGate(text));
let semanticDeduped = 0;
for (const text of retained) {
try {
const vector = await embeddings.embed(text);
// Quick dedup (same content already stored — cosine ≥ 0.95)
const existing = await db.findSimilar(vector, 0.95, 1);
if (existing.length > 0) {
continue;
}
// Importance rating — moved before semantic dedup to avoid expensive LLM calls on low-value memories
const importance = await rateImportance(text, extractionConfig);
// Skip low-importance memories (not worth the semantic dedup cost)
if (importance < 0.3) {
continue;
}
// Semantic dedup: check moderate-similarity memories (0.75–0.95)
// with LLM to catch paraphrases and reformulations
const candidates = await db.findSimilar(vector, 0.75, 3);
if (candidates.length > 0) {
let isDuplicate = false;
for (const candidate of candidates) {
if (await isSemanticDuplicate(text, candidate.text, extractionConfig)) {
api.logger.debug?.(
`memory-neo4j: semantic dedup — skipped "${text.slice(0, 60)}..." (duplicate of "${candidate.text.slice(0, 60)}...")`,
);
isDuplicate = true;
semanticDeduped++;
break;
}
}
if (isDuplicate) {
continue;
}
}
await db.storeMemory({
id: randomUUID(),
text,
embedding: vector,
importance,
category: "other", // sleep cycle will categorize
source: "auto-capture",
extractionStatus: extractionConfig.enabled ? "pending" : "skipped",
agentId,
sessionKey,
});
stored++;
} catch (err) {
api.logger.debug?.(`memory-neo4j: auto-capture item failed: ${String(err)}`);
}
}
// Process assistant messages
const assistantMessages = extractAssistantMessages(event.messages);
const retainedAssistant = assistantMessages.filter((text) =>
passesAssistantAttentionGate(text),
);
for (const text of retainedAssistant) {
try {
const importance = await rateImportance(text, extractionConfig);
// Only store assistant messages that are genuinely important
if (importance < 0.7) {
continue;
}
const vector = await embeddings.embed(text);
const existing = await db.findSimilar(vector, 0.95, 1);
if (existing.length > 0) {
continue;
}
// Semantic dedup for assistant messages too
const candidates = await db.findSimilar(vector, 0.75, 3);
if (candidates.length > 0) {
let isDuplicate = false;
for (const candidate of candidates) {
if (await isSemanticDuplicate(text, candidate.text, extractionConfig)) {
api.logger.debug?.(
`memory-neo4j: semantic dedup (assistant) — skipped "${text.slice(0, 60)}..."`,
);
isDuplicate = true;
semanticDeduped++;
break;
}
}
if (isDuplicate) {
continue;
}
}
await db.storeMemory({
id: randomUUID(),
text,
embedding: vector,
importance: importance * 0.75, // discount assistant importance proportionally
category: "other",
source: "auto-capture-assistant",
extractionStatus: extractionConfig.enabled ? "pending" : "skipped",
agentId,
sessionKey,
});
stored++;
} catch (err) {
api.logger.debug?.(
`memory-neo4j: assistant auto-capture item failed: ${String(err)}`,
);
}
}
if (stored > 0 || semanticDeduped > 0) {
api.logger.info(
`memory-neo4j: auto-captured ${stored} memories (attention-gated)${semanticDeduped > 0 ? `, ${semanticDeduped} semantic dupes skipped` : ""}`,
);
} else if (userMessages.length > 0 || assistantMessages.length > 0) {
api.logger.info(
`memory-neo4j: auto-capture ran (0 stored, ${userMessages.length} user msgs, ${retained.length} passed gate, ${assistantMessages.length} assistant msgs, ${retainedAssistant.length} passed gate)`,
);
}
} catch (err) {
api.logger.warn(`memory-neo4j: auto-capture failed: ${String(err)}`);
}
// Fire-and-forget: run auto-capture asynchronously so it doesn't
// block the agent_end hook (which otherwise adds 2-10s per turn).
void runAutoCapture(
event.messages,
agentId,
sessionKey,
db,
embeddings,
extractionConfig,
api.logger,
);
});
}
@@ -1283,159 +1160,170 @@ const memoryNeo4jPlugin = {
};
// ============================================================================
// Attention gate — lightweight heuristic filter (phase 1 of memory pipeline)
//
// Rejects obvious noise without any LLM call, analogous to how the brain's
// sensory gating filters out irrelevant stimuli before they enter working
// memory. Everything that passes gets stored; the sleep cycle decides what
// matters.
// Auto-capture pipeline (fire-and-forget from agent_end hook)
// ============================================================================
const NOISE_PATTERNS = [
// Greetings / acknowledgments (exact match, with optional punctuation)
/^(hi|hey|hello|yo|sup|ok|okay|sure|thanks|thank you|thx|ty|yep|yup|nope|no|yes|yeah|cool|nice|great|got it|sounds good|perfect|alright|fine|noted|ack|kk|k)\s*[.!?]*$/i,
// Two-word affirmations: "ok great", "sounds good", "yes please", etc.
/^(ok|okay|yes|yeah|yep|sure|no|nope|alright|right|fine|cool|nice|great)\s+(great|good|sure|thanks|please|ok|fine|cool|yeah|perfect|noted|absolutely|definitely|exactly)\s*[.!?]*$/i,
// Deictic: messages that are only pronouns/articles/common verbs — no standalone meaning
// e.g. "I need those", "let me do it", "ok let me test it out", "I got it"
/^(ok[,.]?\s+)?(i('ll|'m|'d|'ve)?\s+)?(just\s+)?(need|want|got|have|let|let's|let me|give me|send|do|did|try|check|see|look at|test|take|get|go|use)\s+(it|that|this|those|these|them|some|one|the|a|an|me|him|her|us)\s*(out|up|now|then|too|again|later|first|here|there|please)?\s*[.!?]*$/i,
// Short acknowledgments with trailing context: "ok, ..." / "yes, ..." when total is brief
/^(ok|okay|yes|yeah|yep|sure|no|nope|right|alright|fine|cool|nice|great|perfect)[,.]?\s+.{0,20}$/i,
// Conversational filler / noise phrases (standalone, with optional punctuation)
/^(hmm+|huh|haha|ha|lol|lmao|rofl|nah|meh|idk|brb|ttyl|omg|wow|whoa|welp|oops|ooh|aah|ugh|bleh|pfft|smh|ikr|tbh|imo|fwiw|np|nvm|nm|wut|wat|wha|heh|tsk|sigh|yay|woo+|boo|dang|darn|geez|gosh|sheesh|oof)\s*[.!?]*$/i,
// Single-word or near-empty
/^\S{0,3}$/,
// Pure emoji
/^[\p{Emoji}\s]+$/u,
// System/XML markup
/^<[a-z-]+>[\s\S]*<\/[a-z-]+>$/i,
type AutoCaptureLogger = {
info: (msg: string) => void;
warn: (msg: string) => void;
debug?: (msg: string) => void;
};
// --- Session reset prompts (from /new and /reset commands) ---
/^A new session was started via/i,
/**
* Shared capture logic for both user and assistant messages.
* Extracts the common embed → dedup → rate → store pipeline.
*/
async function captureMessage(
text: string,
source: "auto-capture" | "auto-capture-assistant",
importanceThreshold: number,
importanceDiscount: number,
agentId: string,
sessionKey: string | undefined,
db: import("./neo4j-client.js").Neo4jMemoryClient,
embeddings: import("./embeddings.js").Embeddings,
extractionConfig: import("./config.js").ExtractionConfig,
logger: AutoCaptureLogger,
): Promise<{ stored: boolean; semanticDeduped: boolean }> {
// For assistant messages, rate importance first (before embedding) to skip early
const rateFirst = source === "auto-capture-assistant";
// --- System infrastructure messages (never user-generated) ---
// Heartbeat prompts
/Read HEARTBEAT\.md if it exists/i,
// Pre-compaction flush prompts
/^Pre-compaction memory flush/i,
// System timestamp messages (cron outputs, reminders, exec reports)
/^System:\s*\[/i,
// Cron job wrappers
/^\[cron:[0-9a-f-]+/i,
// Gateway restart JSON payloads
/^GatewayRestart:\s*\{/i,
// Background task completion reports
/^\[\w{3}\s+\d{4}-\d{2}-\d{2}\s.*\]\s*A background task/i,
];
/** Maximum message length — code dumps, logs, etc. are not memories. */
const MAX_CAPTURE_CHARS = 2000;
/** Minimum message length — too short to be meaningful. */
const MIN_CAPTURE_CHARS = 30;
/** Minimum word count — short contextual phrases lack standalone meaning. */
const MIN_WORD_COUNT = 5;
function passesAttentionGate(text: string): boolean {
const trimmed = text.trim();
// Length bounds
if (trimmed.length < MIN_CAPTURE_CHARS || trimmed.length > MAX_CAPTURE_CHARS) {
return false;
let importance: number | undefined;
if (rateFirst) {
importance = await rateImportance(text, extractionConfig);
if (importance < importanceThreshold) {
return { stored: false, semanticDeduped: false };
}
}
// Word count — short phrases ("I need those") lack context for recall
const wordCount = trimmed.split(/\s+/).length;
if (wordCount < MIN_WORD_COUNT) {
return false;
const vector = await embeddings.embed(text);
// Quick dedup (same content already stored — cosine >= 0.95)
const existing = await db.findSimilar(vector, 0.95, 1);
if (existing.length > 0) {
return { stored: false, semanticDeduped: false };
}
// Injected context from the memory system itself
if (trimmed.includes("<relevant-memories>") || trimmed.includes("<core-memory-refresh>")) {
return false;
// Rate importance if not already done
if (importance === undefined) {
importance = await rateImportance(text, extractionConfig);
if (importance < importanceThreshold) {
return { stored: false, semanticDeduped: false };
}
}
// Noise patterns
if (NOISE_PATTERNS.some((r) => r.test(trimmed))) {
return false;
// Semantic dedup: check moderate-similarity memories (0.75-0.95)
const candidates = await db.findSimilar(vector, 0.75, 3);
if (candidates.length > 0) {
for (const candidate of candidates) {
if (await isSemanticDuplicate(text, candidate.text, extractionConfig)) {
logger.debug?.(
`memory-neo4j: semantic dedup — skipped "${text.slice(0, 60)}..." (duplicate of "${candidate.text.slice(0, 60)}...")`,
);
return { stored: false, semanticDeduped: true };
}
}
}
// Excessive emoji (likely reaction, not substance)
const emojiCount = (trimmed.match(/[\u{1F300}-\u{1F9FF}]/gu) || []).length;
if (emojiCount > 3) {
return false;
}
// Passes gate — retain for short-term storage
return true;
await db.storeMemory({
id: randomUUID(),
text,
embedding: vector,
importance: importance * importanceDiscount,
category: "other",
source,
extractionStatus: extractionConfig.enabled ? "pending" : "skipped",
agentId,
sessionKey,
});
return { stored: true, semanticDeduped: false };
}
// ============================================================================
// Assistant attention gate — stricter filter for assistant messages
// ============================================================================
/**
* Run the full auto-capture pipeline asynchronously.
* Processes user and assistant messages through attention gate → capture.
*/
async function runAutoCapture(
messages: unknown[],
agentId: string,
sessionKey: string | undefined,
db: import("./neo4j-client.js").Neo4jMemoryClient,
embeddings: import("./embeddings.js").Embeddings,
extractionConfig: import("./config.js").ExtractionConfig,
logger: AutoCaptureLogger,
): Promise<void> {
try {
let stored = 0;
let semanticDeduped = 0;
/** Maximum assistant message length — shorter than user to avoid code dumps. */
const MAX_ASSISTANT_CAPTURE_CHARS = 1000;
// Process user messages
const userMessages = extractUserMessages(messages);
const retained = userMessages.filter((text) => passesAttentionGate(text));
/** Minimum word count for assistant messages — higher than user. */
const MIN_ASSISTANT_WORD_COUNT = 10;
for (const text of retained) {
try {
const result = await captureMessage(
text,
"auto-capture",
0.3,
1.0,
agentId,
sessionKey,
db,
embeddings,
extractionConfig,
logger,
);
if (result.stored) stored++;
if (result.semanticDeduped) semanticDeduped++;
} catch (err) {
logger.debug?.(`memory-neo4j: auto-capture item failed: ${String(err)}`);
}
}
function passesAssistantAttentionGate(text: string): boolean {
const trimmed = text.trim();
// Process assistant messages
const assistantMessages = extractAssistantMessages(messages);
const retainedAssistant = assistantMessages.filter((text) =>
passesAssistantAttentionGate(text),
);
// Length bounds (stricter than user)
if (trimmed.length < MIN_CAPTURE_CHARS || trimmed.length > MAX_ASSISTANT_CAPTURE_CHARS) {
return false;
for (const text of retainedAssistant) {
try {
const result = await captureMessage(
text,
"auto-capture-assistant",
0.7,
0.75,
agentId,
sessionKey,
db,
embeddings,
extractionConfig,
logger,
);
if (result.stored) stored++;
if (result.semanticDeduped) semanticDeduped++;
} catch (err) {
logger.debug?.(`memory-neo4j: assistant auto-capture item failed: ${String(err)}`);
}
}
if (stored > 0 || semanticDeduped > 0) {
logger.info(
`memory-neo4j: auto-captured ${stored} memories (attention-gated)${semanticDeduped > 0 ? `, ${semanticDeduped} semantic dupes skipped` : ""}`,
);
} else if (userMessages.length > 0 || assistantMessages.length > 0) {
logger.info(
`memory-neo4j: auto-capture ran (0 stored, ${userMessages.length} user msgs, ${retained.length} passed gate, ${assistantMessages.length} assistant msgs, ${retainedAssistant.length} passed gate)`,
);
}
} catch (err) {
logger.warn(`memory-neo4j: auto-capture failed: ${String(err)}`);
}
// Word count — higher threshold than user messages
const wordCount = trimmed.split(/\s+/).length;
if (wordCount < MIN_ASSISTANT_WORD_COUNT) {
return false;
}
// Reject messages that are mostly code (>50% inside triple-backtick fences)
const codeBlockRegex = /```[\s\S]*?```/g;
let codeChars = 0;
let match: RegExpExecArray | null;
while ((match = codeBlockRegex.exec(trimmed)) !== null) {
codeChars += match[0].length;
}
if (codeChars > trimmed.length * 0.5) {
return false;
}
// Reject messages that are mostly tool output
if (
trimmed.includes("<tool_result>") ||
trimmed.includes("<tool_use>") ||
trimmed.includes("<function_call>")
) {
return false;
}
// Injected context from the memory system itself
if (trimmed.includes("<relevant-memories>") || trimmed.includes("<core-memory-refresh>")) {
return false;
}
// Noise patterns (same as user gate)
if (NOISE_PATTERNS.some((r) => r.test(trimmed))) {
return false;
}
// Excessive emoji (likely reaction, not substance)
const emojiCount = (trimmed.match(/[\u{1F300}-\u{1F9FF}]/gu) || []).length;
if (emojiCount > 3) {
return false;
}
return true;
}
// Exported for testing
export { passesAttentionGate, passesAssistantAttentionGate };
// Re-export attention gate for backwards compatibility (tests import from here)
export { passesAttentionGate, passesAssistantAttentionGate } from "./attention-gate.js";
// ============================================================================
// Export

View File

@@ -0,0 +1,147 @@
/**
* Message extraction utilities for the memory pipeline.
*
* Extracts and cleans user/assistant messages from the raw event.messages
* array, stripping channel wrappers, injected context, tool output, and
* other noise so downstream consumers (attention gate, memory store) see
* only the substantive text.
*/
// ============================================================================
// User Message Extraction
// ============================================================================
/**
* Extract user message texts from the event.messages array.
* Handles both string content and content block arrays.
*/
export function extractUserMessages(messages: unknown[]): string[] {
const texts: string[] = [];
for (const msg of messages) {
if (!msg || typeof msg !== "object") {
continue;
}
const msgObj = msg as Record<string, unknown>;
// Only process user messages for auto-capture
if (msgObj.role !== "user") {
continue;
}
const content = msgObj.content;
if (typeof content === "string") {
texts.push(content);
continue;
}
if (Array.isArray(content)) {
for (const block of content) {
if (
block &&
typeof block === "object" &&
"type" in block &&
(block as Record<string, unknown>).type === "text" &&
"text" in block &&
typeof (block as Record<string, unknown>).text === "string"
) {
texts.push((block as Record<string, unknown>).text as string);
}
}
}
}
// Strip wrappers then filter by length
return texts.map(stripMessageWrappers).filter((t) => t.length >= 10);
}
/**
* Strip injected context, channel metadata wrappers, and system prefixes
* so the attention gate sees only the raw user text.
* Exported for use by the cleanup command.
*/
export function stripMessageWrappers(text: string): string {
let s = text;
// Injected context from memory system
s = s.replace(/<relevant-memories>[\s\S]*?<\/relevant-memories>\s*/g, "");
s = s.replace(/<core-memory-refresh>[\s\S]*?<\/core-memory-refresh>\s*/g, "");
s = s.replace(/<system>[\s\S]*?<\/system>\s*/g, "");
// File attachments (PDFs, images, etc. forwarded inline by channels)
s = s.replace(/<file\b[^>]*>[\s\S]*?<\/file>\s*/g, "");
// Media attachment preamble (appears before Telegram wrapper)
s = s.replace(/^\[media attached:[^\]]*\]\s*(?:To send an image[^\n]*\n?)*/i, "");
// System exec output blocks (may appear before Telegram wrapper)
s = s.replace(/^(?:System:\s*\[[^\]]*\][^\n]*\n?)+/gi, "");
// Telegram wrapper — may now be at start after previous strips
s = s.replace(/^\s*\[Telegram\s[^\]]+\]\s*/i, "");
// "[message_id: NNN]" suffix (Telegram)
s = s.replace(/\n?\[message_id:\s*\d+\]\s*$/i, "");
// Slack wrapper — "[Slack <workspace> #channel @user] MESSAGE [slack message id: ...]"
s = s.replace(/^\s*\[Slack\s[^\]]+\]\s*/i, "");
s = s.replace(/\n?\[slack message id:\s*[^\]]*\]\s*$/i, "");
return s.trim();
}
// ============================================================================
// Assistant Message Extraction
// ============================================================================
/**
* Strip tool-use, thinking, and code-output blocks from assistant messages
* so the attention gate sees only the substantive assistant text.
*/
export function stripAssistantWrappers(text: string): string {
let s = text;
// Tool-use / tool-result / function_call blocks
s = s.replace(/<tool_use>[\s\S]*?<\/tool_use>\s*/g, "");
s = s.replace(/<tool_result>[\s\S]*?<\/tool_result>\s*/g, "");
s = s.replace(/<function_call>[\s\S]*?<\/function_call>\s*/g, "");
// Thinking tags
s = s.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, "");
s = s.replace(/<antThinking>[\s\S]*?<\/antThinking>\s*/g, "");
// Code execution output
s = s.replace(/<code_output>[\s\S]*?<\/code_output>\s*/g, "");
return s.trim();
}
/**
* Extract assistant message texts from the event.messages array.
* Handles both string content and content block arrays.
*/
export function extractAssistantMessages(messages: unknown[]): string[] {
const texts: string[] = [];
for (const msg of messages) {
if (!msg || typeof msg !== "object") {
continue;
}
const msgObj = msg as Record<string, unknown>;
if (msgObj.role !== "assistant") {
continue;
}
const content = msgObj.content;
if (typeof content === "string") {
texts.push(content);
continue;
}
if (Array.isArray(content)) {
for (const block of content) {
if (
block &&
typeof block === "object" &&
"type" in block &&
(block as Record<string, unknown>).type === "text" &&
"text" in block &&
typeof (block as Record<string, unknown>).text === "string"
) {
texts.push((block as Record<string, unknown>).text as string);
}
}
}
}
return texts.map(stripAssistantWrappers).filter((t) => t.length >= 10);
}

View File

@@ -1375,23 +1375,7 @@ describe("Neo4jMemoryClient", () => {
});
it("should perform graph search with entity traversal", async () => {
// Mock entity search
mockSession.run.mockResolvedValueOnce({
records: [
{
get: vi.fn((key) => {
const data: Record<string, any> = {
entityId: "e1",
name: "tarun",
score: 0.95,
};
return data[key];
}),
},
],
});
// Mock memory search via entities
// Combined single-query now returns memory records directly
mockSession.run.mockResolvedValueOnce({
records: [
{

View File

@@ -154,6 +154,12 @@ export class Neo4jMemoryClient {
session,
"CREATE INDEX entity_name_index IF NOT EXISTS FOR (e:Entity) ON (e.name)",
);
// Composite index for queries that filter by both agentId and category
// (e.g. listByCategory, promotion/demotion filtering in sleep cycle)
await this.runSafe(
session,
"CREATE INDEX memory_agent_category_index IF NOT EXISTS FOR (m:Memory) ON (m.agentId, m.category)",
);
this.logger.info("memory-neo4j: indexes ensured");
} finally {
@@ -523,30 +529,21 @@ export class Neo4jMemoryClient {
return await this.retryOnTransient(async () => {
const session = this.driver!.session();
try {
// Step 1: Find matching entities
const entityResult = await session.run(
`CALL db.index.fulltext.queryNodes('entity_fulltext_index', $query)
YIELD node, score
WHERE score >= 0.5
RETURN node.id AS entityId, node.name AS name, score
ORDER BY score DESC
LIMIT 5`,
{ query: escaped },
);
const entityIds = entityResult.records.map((r) => r.get("entityId") as string);
if (entityIds.length === 0) {
return [];
}
// Step 2 + 3: Direct mentions + 1-hop spreading activation
// Single query: entity fulltext lookup → direct mentions + 1-hop spreading activation
const agentFilter = agentId ? "AND m.agentId = $agentId" : "";
const result = await session.run(
`UNWIND $entityIds AS eid
`// Find matching entities via fulltext index
CALL db.index.fulltext.queryNodes('entity_fulltext_index', $query)
YIELD node AS entity, score
WHERE score >= 0.5
WITH entity
ORDER BY score DESC
LIMIT 5
// Direct: Entity ← MENTIONS ← Memory
OPTIONAL MATCH (e:Entity {id: eid})<-[rm:MENTIONS]-(m:Memory)
OPTIONAL MATCH (entity)<-[rm:MENTIONS]-(m:Memory)
WHERE m IS NOT NULL ${agentFilter}
WITH m, coalesce(rm.confidence, 1.0) AS directScore
WITH m, coalesce(rm.confidence, 1.0) AS directScore, entity
WHERE m IS NOT NULL
RETURN m.id AS id, m.text AS text, m.category AS category,
@@ -555,9 +552,16 @@ export class Neo4jMemoryClient {
UNION
UNWIND $entityIds AS eid
// Find matching entities via fulltext index (repeated for UNION)
CALL db.index.fulltext.queryNodes('entity_fulltext_index', $query)
YIELD node AS entity, score
WHERE score >= 0.5
WITH entity
ORDER BY score DESC
LIMIT 5
// 1-hop: Entity → relationship → Entity ← MENTIONS ← Memory
OPTIONAL MATCH (e:Entity {id: eid})-[r1:${RELATIONSHIP_TYPE_PATTERN}]-(e2:Entity)
OPTIONAL MATCH (entity)-[r1:${RELATIONSHIP_TYPE_PATTERN}]-(e2:Entity)
WHERE coalesce(r1.confidence, 0.7) >= $firingThreshold
OPTIONAL MATCH (e2)<-[rm:MENTIONS]-(m:Memory)
WHERE m IS NOT NULL ${agentFilter}
@@ -567,7 +571,7 @@ export class Neo4jMemoryClient {
RETURN m.id AS id, m.text AS text, m.category AS category,
m.importance AS importance, m.createdAt AS createdAt,
max(hopScore) AS graphScore`,
{ entityIds, firingThreshold, ...(agentId ? { agentId } : {}) },
{ query: escaped, firingThreshold, ...(agentId ? { agentId } : {}) },
);
// Deduplicate by id, keeping highest score
@@ -873,6 +877,153 @@ export class Neo4jMemoryClient {
}
}
/**
* Batch all entity operations from an extraction result into a single managed
* transaction. Replaces the previous pattern of N individual session-per-call
* operations with a single atomic write.
*
* Operations performed atomically:
* 1. MERGE all Entity nodes
* 2. Create MENTIONS relationships (Memory → Entity)
* 3. Create inter-Entity relationships (validated against allowlist)
* 4. MERGE Tag nodes and create TAGGED relationships
* 5. Update memory category (if classified and current is 'other')
* 6. Set extractionStatus to 'complete'
*/
async batchEntityOperations(
memoryId: string,
entities: Array<{
id: string;
name: string;
type: string;
aliases?: string[];
description?: string;
}>,
relationships: Array<{
source: string;
target: string;
type: string;
confidence: number;
}>,
tags: Array<{ name: string; category: string }>,
category?: string,
): Promise<void> {
await this.ensureInitialized();
return this.retryOnTransient(async () => {
const session = this.driver!.session();
try {
await session.executeWrite(async (tx) => {
const now = new Date().toISOString();
// 1. MERGE all entities in one UNWIND
if (entities.length > 0) {
await tx.run(
`UNWIND $entities AS e
MERGE (n:Entity {name: e.name})
ON CREATE SET
n.id = e.id, n.type = e.type, n.aliases = e.aliases,
n.description = e.description,
n.firstSeen = $now, n.lastSeen = $now, n.mentionCount = 1
ON MATCH SET
n.type = COALESCE(e.type, n.type),
n.description = COALESCE(e.description, n.description),
n.lastSeen = $now,
n.mentionCount = n.mentionCount + 1`,
{
entities: entities.map((e) => ({
id: e.id,
name: e.name.trim().toLowerCase(),
type: e.type,
aliases: e.aliases ?? [],
description: e.description ?? null,
})),
now,
},
);
// 2. Create MENTIONS relationships in one UNWIND
await tx.run(
`UNWIND $entityNames AS eName
MATCH (m:Memory {id: $memoryId})
MATCH (e:Entity {name: eName})
MERGE (m)-[r:MENTIONS]->(e)
ON CREATE SET r.role = 'context', r.confidence = 1.0`,
{
memoryId,
entityNames: entities.map((e) => e.name.trim().toLowerCase()),
},
);
}
// 3. Create inter-Entity relationships (filter valid types)
const validRels = relationships.filter((r) => validateRelationshipType(r.type));
if (validRels.length > 0) {
// Group by relationship type since Cypher requires literal rel types
const byType = new Map<string, typeof validRels>();
for (const rel of validRels) {
const group = byType.get(rel.type) ?? [];
group.push(rel);
byType.set(rel.type, group);
}
for (const [relType, rels] of byType) {
await tx.run(
`UNWIND $rels AS r
MATCH (e1:Entity {name: r.source})
MATCH (e2:Entity {name: r.target})
MERGE (e1)-[rel:${relType}]->(e2)
ON CREATE SET rel.confidence = r.confidence, rel.createdAt = $now
ON MATCH SET rel.confidence = CASE WHEN r.confidence > rel.confidence THEN r.confidence ELSE rel.confidence END`,
{
rels: rels.map((r) => ({
source: r.source.trim().toLowerCase(),
target: r.target.trim().toLowerCase(),
confidence: r.confidence,
})),
now,
},
);
}
}
// 4. MERGE Tags and create TAGGED relationships in one UNWIND
if (tags.length > 0) {
await tx.run(
`UNWIND $tags AS t
MERGE (tag:Tag {name: t.name})
ON CREATE SET tag.id = t.id, tag.category = t.category, tag.createdAt = $now
WITH tag, t
MATCH (m:Memory {id: $memoryId})
MERGE (m)-[r:TAGGED]->(tag)
ON CREATE SET r.confidence = 1.0`,
{
memoryId,
tags: tags.map((t) => ({
name: t.name.trim().toLowerCase(),
category: t.category,
id: randomUUID(),
})),
now,
},
);
}
// 5. Update category + 6. Set extraction status (in one statement)
const categoryClause = category
? ", m.category = CASE WHEN m.category = 'other' THEN $category ELSE m.category END"
: "";
await tx.run(
`MATCH (m:Memory {id: $memoryId})
SET m.extractionStatus = 'complete', m.updatedAt = $now${categoryClause}`,
{ memoryId, now, ...(category ? { category } : {}) },
);
});
} finally {
await session.close();
}
});
}
/**
* List memories with pending extraction status.
* Used by the sleep cycle to batch-process extractions.