diff --git a/extensions/memory-neo4j/attention-gate.ts b/extensions/memory-neo4j/attention-gate.ts new file mode 100644 index 0000000000..cd704eb800 --- /dev/null +++ b/extensions/memory-neo4j/attention-gate.ts @@ -0,0 +1,151 @@ +/** + * Attention gate — lightweight heuristic filter (phase 1 of memory pipeline). + * + * Rejects obvious noise without any LLM call, analogous to how the brain's + * sensory gating filters out irrelevant stimuli before they enter working + * memory. Everything that passes gets stored; the sleep cycle decides what + * matters. + */ + +const NOISE_PATTERNS = [ + // Greetings / acknowledgments (exact match, with optional punctuation) + /^(hi|hey|hello|yo|sup|ok|okay|sure|thanks|thank you|thx|ty|yep|yup|nope|no|yes|yeah|cool|nice|great|got it|sounds good|perfect|alright|fine|noted|ack|kk|k)\s*[.!?]*$/i, + // Two-word affirmations: "ok great", "sounds good", "yes please", etc. + /^(ok|okay|yes|yeah|yep|sure|no|nope|alright|right|fine|cool|nice|great)\s+(great|good|sure|thanks|please|ok|fine|cool|yeah|perfect|noted|absolutely|definitely|exactly)\s*[.!?]*$/i, + // Deictic: messages that are only pronouns/articles/common verbs — no standalone meaning + // e.g. "I need those", "let me do it", "ok let me test it out", "I got it" + /^(ok[,.]?\s+)?(i('ll|'m|'d|'ve)?\s+)?(just\s+)?(need|want|got|have|let|let's|let me|give me|send|do|did|try|check|see|look at|test|take|get|go|use)\s+(it|that|this|those|these|them|some|one|the|a|an|me|him|her|us)\s*(out|up|now|then|too|again|later|first|here|there|please)?\s*[.!?]*$/i, + // Short acknowledgments with trailing context: "ok, ..." / "yes, ..." when total is brief + /^(ok|okay|yes|yeah|yep|sure|no|nope|right|alright|fine|cool|nice|great|perfect)[,.]?\s+.{0,20}$/i, + // Conversational filler / noise phrases (standalone, with optional punctuation) + /^(hmm+|huh|haha|ha|lol|lmao|rofl|nah|meh|idk|brb|ttyl|omg|wow|whoa|welp|oops|ooh|aah|ugh|bleh|pfft|smh|ikr|tbh|imo|fwiw|np|nvm|nm|wut|wat|wha|heh|tsk|sigh|yay|woo+|boo|dang|darn|geez|gosh|sheesh|oof)\s*[.!?]*$/i, + // Single-word or near-empty + /^\S{0,3}$/, + // Pure emoji + /^[\p{Emoji}\s]+$/u, + // System/XML markup + /^<[a-z-]+>[\s\S]*<\/[a-z-]+>$/i, + + // --- Session reset prompts (from /new and /reset commands) --- + /^A new session was started via/i, + + // --- System infrastructure messages (never user-generated) --- + // Heartbeat prompts + /Read HEARTBEAT\.md if it exists/i, + // Pre-compaction flush prompts + /^Pre-compaction memory flush/i, + // System timestamp messages (cron outputs, reminders, exec reports) + /^System:\s*\[/i, + // Cron job wrappers + /^\[cron:[0-9a-f-]+/i, + // Gateway restart JSON payloads + /^GatewayRestart:\s*\{/i, + // Background task completion reports + /^\[\w{3}\s+\d{4}-\d{2}-\d{2}\s.*\]\s*A background task/i, +]; + +/** Maximum message length — code dumps, logs, etc. are not memories. */ +const MAX_CAPTURE_CHARS = 2000; + +/** Minimum message length — too short to be meaningful. */ +const MIN_CAPTURE_CHARS = 30; + +/** Minimum word count — short contextual phrases lack standalone meaning. */ +const MIN_WORD_COUNT = 5; + +export function passesAttentionGate(text: string): boolean { + const trimmed = text.trim(); + + // Length bounds + if (trimmed.length < MIN_CAPTURE_CHARS || trimmed.length > MAX_CAPTURE_CHARS) { + return false; + } + + // Word count — short phrases ("I need those") lack context for recall + const wordCount = trimmed.split(/\s+/).length; + if (wordCount < MIN_WORD_COUNT) { + return false; + } + + // Injected context from the memory system itself + if (trimmed.includes("") || trimmed.includes("")) { + return false; + } + + // Noise patterns + if (NOISE_PATTERNS.some((r) => r.test(trimmed))) { + return false; + } + + // Excessive emoji (likely reaction, not substance) + const emojiCount = (trimmed.match(/[\u{1F300}-\u{1F9FF}]/gu) || []).length; + if (emojiCount > 3) { + return false; + } + + // Passes gate — retain for short-term storage + return true; +} + +// ============================================================================ +// Assistant attention gate — stricter filter for assistant messages +// ============================================================================ + +/** Maximum assistant message length — shorter than user to avoid code dumps. */ +const MAX_ASSISTANT_CAPTURE_CHARS = 1000; + +/** Minimum word count for assistant messages — higher than user. */ +const MIN_ASSISTANT_WORD_COUNT = 10; + +export function passesAssistantAttentionGate(text: string): boolean { + const trimmed = text.trim(); + + // Length bounds (stricter than user) + if (trimmed.length < MIN_CAPTURE_CHARS || trimmed.length > MAX_ASSISTANT_CAPTURE_CHARS) { + return false; + } + + // Word count — higher threshold than user messages + const wordCount = trimmed.split(/\s+/).length; + if (wordCount < MIN_ASSISTANT_WORD_COUNT) { + return false; + } + + // Reject messages that are mostly code (>50% inside triple-backtick fences) + const codeBlockRegex = /```[\s\S]*?```/g; + let codeChars = 0; + let match: RegExpExecArray | null; + while ((match = codeBlockRegex.exec(trimmed)) !== null) { + codeChars += match[0].length; + } + if (codeChars > trimmed.length * 0.5) { + return false; + } + + // Reject messages that are mostly tool output + if ( + trimmed.includes("") || + trimmed.includes("") || + trimmed.includes("") + ) { + return false; + } + + // Injected context from the memory system itself + if (trimmed.includes("") || trimmed.includes("")) { + return false; + } + + // Noise patterns (same as user gate) + if (NOISE_PATTERNS.some((r) => r.test(trimmed))) { + return false; + } + + // Excessive emoji (likely reaction, not substance) + const emojiCount = (trimmed.match(/[\u{1F300}-\u{1F9FF}]/gu) || []).length; + if (emojiCount > 3) { + return false; + } + + return true; +} diff --git a/extensions/memory-neo4j/extractor.test.ts b/extensions/memory-neo4j/extractor.test.ts index 8f47bc41ad..ef66bb4c29 100644 --- a/extensions/memory-neo4j/extractor.test.ts +++ b/extensions/memory-neo4j/extractor.test.ts @@ -744,11 +744,7 @@ describe("runBackgroundExtraction", () => { let mockDb: { updateExtractionStatus: ReturnType; - mergeEntity: ReturnType; - createMentions: ReturnType; - createEntityRelationship: ReturnType; - tagMemory: ReturnType; - updateMemoryCategory: ReturnType; + batchEntityOperations: ReturnType; }; let mockEmbeddings: { @@ -766,11 +762,7 @@ describe("runBackgroundExtraction", () => { }; mockDb = { updateExtractionStatus: vi.fn().mockResolvedValue(undefined), - mergeEntity: vi.fn().mockResolvedValue(undefined), - createMentions: vi.fn().mockResolvedValue(undefined), - createEntityRelationship: vi.fn().mockResolvedValue(undefined), - tagMemory: vi.fn().mockResolvedValue(undefined), - updateMemoryCategory: vi.fn().mockResolvedValue(undefined), + batchEntityOperations: vi.fn().mockResolvedValue(undefined), }; mockEmbeddings = { embed: vi.fn().mockResolvedValue([0.1, 0.2, 0.3]), @@ -857,7 +849,7 @@ describe("runBackgroundExtraction", () => { expect(mockDb.updateExtractionStatus).toHaveBeenCalledWith("mem-1", "complete"); }); - it("should merge entities, create mentions, and mark complete", async () => { + it("should batch entities, relationships, tags, and category in one call", async () => { mockFetchResponse( JSON.stringify({ category: "fact", @@ -876,18 +868,16 @@ describe("runBackgroundExtraction", () => { mockLogger, ); - expect(mockDb.mergeEntity).toHaveBeenCalledWith( - expect.objectContaining({ - name: "alice", - type: "person", - }), + expect(mockDb.batchEntityOperations).toHaveBeenCalledWith( + "mem-1", + [expect.objectContaining({ name: "alice", type: "person" })], + [], + [], + "fact", ); - expect(mockDb.createMentions).toHaveBeenCalledWith("mem-1", "alice", "context", 1.0); - expect(mockDb.updateMemoryCategory).toHaveBeenCalledWith("mem-1", "fact"); - expect(mockDb.updateExtractionStatus).toHaveBeenCalledWith("mem-1", "complete"); }); - it("should create entity relationships", async () => { + it("should pass relationships to batchEntityOperations", async () => { mockFetchResponse( JSON.stringify({ entities: [ @@ -908,10 +898,19 @@ describe("runBackgroundExtraction", () => { mockLogger, ); - expect(mockDb.createEntityRelationship).toHaveBeenCalledWith("alice", "acme", "WORKS_AT", 0.9); + expect(mockDb.batchEntityOperations).toHaveBeenCalledWith( + "mem-1", + expect.arrayContaining([ + expect.objectContaining({ name: "alice", type: "person" }), + expect.objectContaining({ name: "acme", type: "organization" }), + ]), + [{ source: "alice", target: "acme", type: "WORKS_AT", confidence: 0.9 }], + [], + undefined, + ); }); - it("should tag memories", async () => { + it("should pass tags to batchEntityOperations", async () => { mockFetchResponse( JSON.stringify({ entities: [], @@ -929,10 +928,16 @@ describe("runBackgroundExtraction", () => { mockLogger, ); - expect(mockDb.tagMemory).toHaveBeenCalledWith("mem-1", "programming", "tech"); + expect(mockDb.batchEntityOperations).toHaveBeenCalledWith( + "mem-1", + [], + [], + [{ name: "programming", category: "tech" }], + undefined, + ); }); - it("should not update category when result has no category", async () => { + it("should pass undefined category when result has no category", async () => { mockFetchResponse( JSON.stringify({ entities: [{ name: "Test", type: "concept" }], @@ -950,10 +955,16 @@ describe("runBackgroundExtraction", () => { mockLogger, ); - expect(mockDb.updateMemoryCategory).not.toHaveBeenCalled(); + expect(mockDb.batchEntityOperations).toHaveBeenCalledWith( + "mem-1", + [expect.objectContaining({ name: "test", type: "concept" })], + [], + [], + undefined, + ); }); - it("should handle entity merge failure gracefully", async () => { + it("should handle batchEntityOperations failure gracefully", async () => { mockFetchResponse( JSON.stringify({ entities: [ @@ -965,9 +976,7 @@ describe("runBackgroundExtraction", () => { }), ); - // First entity merge fails, second succeeds - mockDb.mergeEntity.mockRejectedValueOnce(new Error("merge failed")); - mockDb.mergeEntity.mockResolvedValueOnce(undefined); + mockDb.batchEntityOperations.mockRejectedValueOnce(new Error("batch failed")); await runBackgroundExtraction( "mem-1", @@ -978,9 +987,8 @@ describe("runBackgroundExtraction", () => { mockLogger, ); - // Should still continue and complete - expect(mockDb.mergeEntity).toHaveBeenCalledTimes(2); - expect(mockDb.updateExtractionStatus).toHaveBeenCalledWith("mem-1", "complete"); + // Should handle error and mark as failed + expect(mockDb.batchEntityOperations).toHaveBeenCalledTimes(1); expect(mockLogger.warn).toHaveBeenCalled(); }); diff --git a/extensions/memory-neo4j/extractor.ts b/extensions/memory-neo4j/extractor.ts index 8f3370c621..f5461efe5a 100644 --- a/extensions/memory-neo4j/extractor.ts +++ b/extensions/memory-neo4j/extractor.ts @@ -361,50 +361,22 @@ export async function runBackgroundExtraction( return; } - // MERGE Entity nodes (entities use fulltext search, not vector embeddings) - for (const entity of result.entities) { - try { - await db.mergeEntity({ - id: randomUUID(), - name: entity.name, - type: entity.type, - aliases: entity.aliases, - description: entity.description, - }); + // Batch all entity operations into a single transaction: + // entity merges, mentions, relationships, tags, category, and extraction status + await db.batchEntityOperations( + memoryId, + result.entities.map((e) => ({ + id: randomUUID(), + name: e.name, + type: e.type, + aliases: e.aliases, + description: e.description, + })), + result.relationships, + result.tags, + result.category, + ); - // Create MENTIONS relationship - await db.createMentions(memoryId, entity.name, "context", 1.0); - } catch (err) { - logger.warn(`memory-neo4j: entity merge failed for "${entity.name}": ${String(err)}`); - } - } - - // Create inter-Entity relationships - for (const rel of result.relationships) { - try { - await db.createEntityRelationship(rel.source, rel.target, rel.type, rel.confidence); - } catch (err) { - logger.debug?.( - `memory-neo4j: relationship creation failed: ${rel.source}->${rel.target}: ${String(err)}`, - ); - } - } - - // Tag the memory - for (const tag of result.tags) { - try { - await db.tagMemory(memoryId, tag.name, tag.category); - } catch (err) { - logger.debug?.(`memory-neo4j: tagging failed for "${tag.name}": ${String(err)}`); - } - } - - // Update category if the LLM classified it (only overwrites 'other') - if (result.category) { - await db.updateMemoryCategory(memoryId, result.category); - } - - await db.updateExtractionStatus(memoryId, "complete"); logger.info( `memory-neo4j: extraction complete for ${memoryId.slice(0, 8)} — ` + `${result.entities.length} entities, ${result.relationships.length} rels, ${result.tags.length} tags` + @@ -761,26 +733,42 @@ export async function runSleepCycle( const pairs = await db.findConflictingMemories(agentId); result.conflict.pairsFound = pairs.length; - for (const pair of pairs) { - if (abortSignal?.aborted) break; + // Process conflict pairs in parallel chunks of LLM_CONCURRENCY + for (let i = 0; i < pairs.length && !abortSignal?.aborted; i += LLM_CONCURRENCY) { + const chunk = pairs.slice(i, i + LLM_CONCURRENCY); + const outcomes = await Promise.allSettled( + chunk.map((pair) => resolveConflict(pair.memoryA.text, pair.memoryB.text, config)), + ); - const decision = await resolveConflict(pair.memoryA.text, pair.memoryB.text, config); + for (let k = 0; k < outcomes.length; k++) { + if (abortSignal?.aborted) break; + const pair = chunk[k]; + const outcome = outcomes[k]; + if (outcome.status !== "fulfilled") continue; - if (decision === "a") { - await db.invalidateMemory(pair.memoryB.id); - result.conflict.invalidated++; - result.conflict.resolved++; - onProgress?.("conflict", `Kept A, invalidated B: "${pair.memoryB.text.slice(0, 40)}..."`); - } else if (decision === "b") { - await db.invalidateMemory(pair.memoryA.id); - result.conflict.invalidated++; - result.conflict.resolved++; - onProgress?.("conflict", `Kept B, invalidated A: "${pair.memoryA.text.slice(0, 40)}..."`); - } else if (decision === "both") { - result.conflict.resolved++; - onProgress?.("conflict", `Kept both: no real conflict`); + const decision = outcome.value; + if (decision === "a") { + await db.invalidateMemory(pair.memoryB.id); + result.conflict.invalidated++; + result.conflict.resolved++; + onProgress?.( + "conflict", + `Kept A, invalidated B: "${pair.memoryB.text.slice(0, 40)}..."`, + ); + } else if (decision === "b") { + await db.invalidateMemory(pair.memoryA.id); + result.conflict.invalidated++; + result.conflict.resolved++; + onProgress?.( + "conflict", + `Kept B, invalidated A: "${pair.memoryA.text.slice(0, 40)}..."`, + ); + } else if (decision === "both") { + result.conflict.resolved++; + onProgress?.("conflict", `Kept both: no real conflict`); + } + // "skip" = LLM unavailable, don't count as resolved } - // "skip" = LLM unavailable, don't count as resolved } logger.info( @@ -1051,143 +1039,15 @@ export async function runSleepCycle( } // ============================================================================ -// Message Extraction Helper +// Message Extraction (re-exported from message-utils.ts) // ============================================================================ -/** - * Extract user message texts from the event.messages array. - * Handles both string content and content block arrays. - */ -export function extractUserMessages(messages: unknown[]): string[] { - const texts: string[] = []; - - for (const msg of messages) { - if (!msg || typeof msg !== "object") { - continue; - } - const msgObj = msg as Record; - - // Only process user messages for auto-capture - if (msgObj.role !== "user") { - continue; - } - - const content = msgObj.content; - if (typeof content === "string") { - texts.push(content); - continue; - } - - if (Array.isArray(content)) { - for (const block of content) { - if ( - block && - typeof block === "object" && - "type" in block && - (block as Record).type === "text" && - "text" in block && - typeof (block as Record).text === "string" - ) { - texts.push((block as Record).text as string); - } - } - } - } - - // Strip wrappers then filter by length - return texts.map(stripMessageWrappers).filter((t) => t.length >= 10); -} - -/** - * Strip injected context, channel metadata wrappers, and system prefixes - * so the attention gate sees only the raw user text. - * Exported for use by the cleanup command. - */ -export function stripMessageWrappers(text: string): string { - let s = text; - // Injected context from memory system - s = s.replace(/[\s\S]*?<\/relevant-memories>\s*/g, ""); - s = s.replace(/[\s\S]*?<\/core-memory-refresh>\s*/g, ""); - s = s.replace(/[\s\S]*?<\/system>\s*/g, ""); - // File attachments (PDFs, images, etc. forwarded inline by channels) - s = s.replace(/]*>[\s\S]*?<\/file>\s*/g, ""); - // Media attachment preamble (appears before Telegram wrapper) - s = s.replace(/^\[media attached:[^\]]*\]\s*(?:To send an image[^\n]*\n?)*/i, ""); - // System exec output blocks (may appear before Telegram wrapper) - s = s.replace(/^(?:System:\s*\[[^\]]*\][^\n]*\n?)+/gi, ""); - // Telegram wrapper — may now be at start after previous strips - s = s.replace(/^\s*\[Telegram\s[^\]]+\]\s*/i, ""); - // "[message_id: NNN]" suffix (Telegram) - s = s.replace(/\n?\[message_id:\s*\d+\]\s*$/i, ""); - // Slack wrapper — "[Slack #channel @user] MESSAGE [slack message id: ...]" - s = s.replace(/^\s*\[Slack\s[^\]]+\]\s*/i, ""); - s = s.replace(/\n?\[slack message id:\s*[^\]]*\]\s*$/i, ""); - return s.trim(); -} - -// ============================================================================ -// Assistant Message Extraction -// ============================================================================ - -/** - * Strip tool-use, thinking, and code-output blocks from assistant messages - * so the attention gate sees only the substantive assistant text. - */ -export function stripAssistantWrappers(text: string): string { - let s = text; - // Tool-use / tool-result / function_call blocks - s = s.replace(/[\s\S]*?<\/tool_use>\s*/g, ""); - s = s.replace(/[\s\S]*?<\/tool_result>\s*/g, ""); - s = s.replace(/[\s\S]*?<\/function_call>\s*/g, ""); - // Thinking tags - s = s.replace(/[\s\S]*?<\/thinking>\s*/g, ""); - s = s.replace(/[\s\S]*?<\/antThinking>\s*/g, ""); - // Code execution output - s = s.replace(/[\s\S]*?<\/code_output>\s*/g, ""); - return s.trim(); -} - -/** - * Extract assistant message texts from the event.messages array. - * Handles both string content and content block arrays. - */ -export function extractAssistantMessages(messages: unknown[]): string[] { - const texts: string[] = []; - - for (const msg of messages) { - if (!msg || typeof msg !== "object") { - continue; - } - const msgObj = msg as Record; - - if (msgObj.role !== "assistant") { - continue; - } - - const content = msgObj.content; - if (typeof content === "string") { - texts.push(content); - continue; - } - - if (Array.isArray(content)) { - for (const block of content) { - if ( - block && - typeof block === "object" && - "type" in block && - (block as Record).type === "text" && - "text" in block && - typeof (block as Record).text === "string" - ) { - texts.push((block as Record).text as string); - } - } - } - } - - return texts.map(stripAssistantWrappers).filter((t) => t.length >= 10); -} +export { + extractUserMessages, + extractAssistantMessages, + stripMessageWrappers, + stripAssistantWrappers, +} from "./message-utils.js"; // ============================================================================ // LLM-Judged Importance Rating diff --git a/extensions/memory-neo4j/index.ts b/extensions/memory-neo4j/index.ts index 4a188098ee..5a1b279612 100644 --- a/extensions/memory-neo4j/index.ts +++ b/extensions/memory-neo4j/index.ts @@ -17,6 +17,7 @@ import { Type } from "@sinclair/typebox"; import { randomUUID } from "node:crypto"; import { stringEnum } from "openclaw/plugin-sdk"; import type { MemoryCategory, MemorySource } from "./schema.js"; +import { passesAttentionGate, passesAssistantAttentionGate } from "./attention-gate.js"; import { DEFAULT_EMBEDDING_DIMS, EMBEDDING_DIMENSIONS, @@ -1104,7 +1105,7 @@ const memoryNeo4jPlugin = { ); if (cfg.autoCapture) { api.logger.debug?.("memory-neo4j: registering agent_end hook for auto-capture"); - api.on("agent_end", async (event, ctx) => { + api.on("agent_end", (event, ctx) => { api.logger.debug?.( `memory-neo4j: agent_end fired (success=${event.success}, messages=${event.messages?.length ?? 0})`, ); @@ -1116,141 +1117,17 @@ const memoryNeo4jPlugin = { const agentId = ctx.agentId || "default"; const sessionKey = ctx.sessionKey; - try { - let stored = 0; - - // Process user messages - const userMessages = extractUserMessages(event.messages); - const retained = userMessages.filter((text) => passesAttentionGate(text)); - - let semanticDeduped = 0; - for (const text of retained) { - try { - const vector = await embeddings.embed(text); - - // Quick dedup (same content already stored — cosine ≥ 0.95) - const existing = await db.findSimilar(vector, 0.95, 1); - if (existing.length > 0) { - continue; - } - - // Importance rating — moved before semantic dedup to avoid expensive LLM calls on low-value memories - const importance = await rateImportance(text, extractionConfig); - - // Skip low-importance memories (not worth the semantic dedup cost) - if (importance < 0.3) { - continue; - } - - // Semantic dedup: check moderate-similarity memories (0.75–0.95) - // with LLM to catch paraphrases and reformulations - const candidates = await db.findSimilar(vector, 0.75, 3); - if (candidates.length > 0) { - let isDuplicate = false; - for (const candidate of candidates) { - if (await isSemanticDuplicate(text, candidate.text, extractionConfig)) { - api.logger.debug?.( - `memory-neo4j: semantic dedup — skipped "${text.slice(0, 60)}..." (duplicate of "${candidate.text.slice(0, 60)}...")`, - ); - isDuplicate = true; - semanticDeduped++; - break; - } - } - if (isDuplicate) { - continue; - } - } - - await db.storeMemory({ - id: randomUUID(), - text, - embedding: vector, - importance, - category: "other", // sleep cycle will categorize - source: "auto-capture", - extractionStatus: extractionConfig.enabled ? "pending" : "skipped", - agentId, - sessionKey, - }); - stored++; - } catch (err) { - api.logger.debug?.(`memory-neo4j: auto-capture item failed: ${String(err)}`); - } - } - - // Process assistant messages - const assistantMessages = extractAssistantMessages(event.messages); - const retainedAssistant = assistantMessages.filter((text) => - passesAssistantAttentionGate(text), - ); - - for (const text of retainedAssistant) { - try { - const importance = await rateImportance(text, extractionConfig); - - // Only store assistant messages that are genuinely important - if (importance < 0.7) { - continue; - } - - const vector = await embeddings.embed(text); - - const existing = await db.findSimilar(vector, 0.95, 1); - if (existing.length > 0) { - continue; - } - - // Semantic dedup for assistant messages too - const candidates = await db.findSimilar(vector, 0.75, 3); - if (candidates.length > 0) { - let isDuplicate = false; - for (const candidate of candidates) { - if (await isSemanticDuplicate(text, candidate.text, extractionConfig)) { - api.logger.debug?.( - `memory-neo4j: semantic dedup (assistant) — skipped "${text.slice(0, 60)}..."`, - ); - isDuplicate = true; - semanticDeduped++; - break; - } - } - if (isDuplicate) { - continue; - } - } - - await db.storeMemory({ - id: randomUUID(), - text, - embedding: vector, - importance: importance * 0.75, // discount assistant importance proportionally - category: "other", - source: "auto-capture-assistant", - extractionStatus: extractionConfig.enabled ? "pending" : "skipped", - agentId, - sessionKey, - }); - stored++; - } catch (err) { - api.logger.debug?.( - `memory-neo4j: assistant auto-capture item failed: ${String(err)}`, - ); - } - } - - if (stored > 0 || semanticDeduped > 0) { - api.logger.info( - `memory-neo4j: auto-captured ${stored} memories (attention-gated)${semanticDeduped > 0 ? `, ${semanticDeduped} semantic dupes skipped` : ""}`, - ); - } else if (userMessages.length > 0 || assistantMessages.length > 0) { - api.logger.info( - `memory-neo4j: auto-capture ran (0 stored, ${userMessages.length} user msgs, ${retained.length} passed gate, ${assistantMessages.length} assistant msgs, ${retainedAssistant.length} passed gate)`, - ); - } - } catch (err) { - api.logger.warn(`memory-neo4j: auto-capture failed: ${String(err)}`); - } + // Fire-and-forget: run auto-capture asynchronously so it doesn't + // block the agent_end hook (which otherwise adds 2-10s per turn). + void runAutoCapture( + event.messages, + agentId, + sessionKey, + db, + embeddings, + extractionConfig, + api.logger, + ); }); } @@ -1283,159 +1160,170 @@ const memoryNeo4jPlugin = { }; // ============================================================================ -// Attention gate — lightweight heuristic filter (phase 1 of memory pipeline) -// -// Rejects obvious noise without any LLM call, analogous to how the brain's -// sensory gating filters out irrelevant stimuli before they enter working -// memory. Everything that passes gets stored; the sleep cycle decides what -// matters. +// Auto-capture pipeline (fire-and-forget from agent_end hook) // ============================================================================ -const NOISE_PATTERNS = [ - // Greetings / acknowledgments (exact match, with optional punctuation) - /^(hi|hey|hello|yo|sup|ok|okay|sure|thanks|thank you|thx|ty|yep|yup|nope|no|yes|yeah|cool|nice|great|got it|sounds good|perfect|alright|fine|noted|ack|kk|k)\s*[.!?]*$/i, - // Two-word affirmations: "ok great", "sounds good", "yes please", etc. - /^(ok|okay|yes|yeah|yep|sure|no|nope|alright|right|fine|cool|nice|great)\s+(great|good|sure|thanks|please|ok|fine|cool|yeah|perfect|noted|absolutely|definitely|exactly)\s*[.!?]*$/i, - // Deictic: messages that are only pronouns/articles/common verbs — no standalone meaning - // e.g. "I need those", "let me do it", "ok let me test it out", "I got it" - /^(ok[,.]?\s+)?(i('ll|'m|'d|'ve)?\s+)?(just\s+)?(need|want|got|have|let|let's|let me|give me|send|do|did|try|check|see|look at|test|take|get|go|use)\s+(it|that|this|those|these|them|some|one|the|a|an|me|him|her|us)\s*(out|up|now|then|too|again|later|first|here|there|please)?\s*[.!?]*$/i, - // Short acknowledgments with trailing context: "ok, ..." / "yes, ..." when total is brief - /^(ok|okay|yes|yeah|yep|sure|no|nope|right|alright|fine|cool|nice|great|perfect)[,.]?\s+.{0,20}$/i, - // Conversational filler / noise phrases (standalone, with optional punctuation) - /^(hmm+|huh|haha|ha|lol|lmao|rofl|nah|meh|idk|brb|ttyl|omg|wow|whoa|welp|oops|ooh|aah|ugh|bleh|pfft|smh|ikr|tbh|imo|fwiw|np|nvm|nm|wut|wat|wha|heh|tsk|sigh|yay|woo+|boo|dang|darn|geez|gosh|sheesh|oof)\s*[.!?]*$/i, - // Single-word or near-empty - /^\S{0,3}$/, - // Pure emoji - /^[\p{Emoji}\s]+$/u, - // System/XML markup - /^<[a-z-]+>[\s\S]*<\/[a-z-]+>$/i, +type AutoCaptureLogger = { + info: (msg: string) => void; + warn: (msg: string) => void; + debug?: (msg: string) => void; +}; - // --- Session reset prompts (from /new and /reset commands) --- - /^A new session was started via/i, +/** + * Shared capture logic for both user and assistant messages. + * Extracts the common embed → dedup → rate → store pipeline. + */ +async function captureMessage( + text: string, + source: "auto-capture" | "auto-capture-assistant", + importanceThreshold: number, + importanceDiscount: number, + agentId: string, + sessionKey: string | undefined, + db: import("./neo4j-client.js").Neo4jMemoryClient, + embeddings: import("./embeddings.js").Embeddings, + extractionConfig: import("./config.js").ExtractionConfig, + logger: AutoCaptureLogger, +): Promise<{ stored: boolean; semanticDeduped: boolean }> { + // For assistant messages, rate importance first (before embedding) to skip early + const rateFirst = source === "auto-capture-assistant"; - // --- System infrastructure messages (never user-generated) --- - // Heartbeat prompts - /Read HEARTBEAT\.md if it exists/i, - // Pre-compaction flush prompts - /^Pre-compaction memory flush/i, - // System timestamp messages (cron outputs, reminders, exec reports) - /^System:\s*\[/i, - // Cron job wrappers - /^\[cron:[0-9a-f-]+/i, - // Gateway restart JSON payloads - /^GatewayRestart:\s*\{/i, - // Background task completion reports - /^\[\w{3}\s+\d{4}-\d{2}-\d{2}\s.*\]\s*A background task/i, -]; - -/** Maximum message length — code dumps, logs, etc. are not memories. */ -const MAX_CAPTURE_CHARS = 2000; - -/** Minimum message length — too short to be meaningful. */ -const MIN_CAPTURE_CHARS = 30; - -/** Minimum word count — short contextual phrases lack standalone meaning. */ -const MIN_WORD_COUNT = 5; - -function passesAttentionGate(text: string): boolean { - const trimmed = text.trim(); - - // Length bounds - if (trimmed.length < MIN_CAPTURE_CHARS || trimmed.length > MAX_CAPTURE_CHARS) { - return false; + let importance: number | undefined; + if (rateFirst) { + importance = await rateImportance(text, extractionConfig); + if (importance < importanceThreshold) { + return { stored: false, semanticDeduped: false }; + } } - // Word count — short phrases ("I need those") lack context for recall - const wordCount = trimmed.split(/\s+/).length; - if (wordCount < MIN_WORD_COUNT) { - return false; + const vector = await embeddings.embed(text); + + // Quick dedup (same content already stored — cosine >= 0.95) + const existing = await db.findSimilar(vector, 0.95, 1); + if (existing.length > 0) { + return { stored: false, semanticDeduped: false }; } - // Injected context from the memory system itself - if (trimmed.includes("") || trimmed.includes("")) { - return false; + // Rate importance if not already done + if (importance === undefined) { + importance = await rateImportance(text, extractionConfig); + if (importance < importanceThreshold) { + return { stored: false, semanticDeduped: false }; + } } - // Noise patterns - if (NOISE_PATTERNS.some((r) => r.test(trimmed))) { - return false; + // Semantic dedup: check moderate-similarity memories (0.75-0.95) + const candidates = await db.findSimilar(vector, 0.75, 3); + if (candidates.length > 0) { + for (const candidate of candidates) { + if (await isSemanticDuplicate(text, candidate.text, extractionConfig)) { + logger.debug?.( + `memory-neo4j: semantic dedup — skipped "${text.slice(0, 60)}..." (duplicate of "${candidate.text.slice(0, 60)}...")`, + ); + return { stored: false, semanticDeduped: true }; + } + } } - // Excessive emoji (likely reaction, not substance) - const emojiCount = (trimmed.match(/[\u{1F300}-\u{1F9FF}]/gu) || []).length; - if (emojiCount > 3) { - return false; - } - - // Passes gate — retain for short-term storage - return true; + await db.storeMemory({ + id: randomUUID(), + text, + embedding: vector, + importance: importance * importanceDiscount, + category: "other", + source, + extractionStatus: extractionConfig.enabled ? "pending" : "skipped", + agentId, + sessionKey, + }); + return { stored: true, semanticDeduped: false }; } -// ============================================================================ -// Assistant attention gate — stricter filter for assistant messages -// ============================================================================ +/** + * Run the full auto-capture pipeline asynchronously. + * Processes user and assistant messages through attention gate → capture. + */ +async function runAutoCapture( + messages: unknown[], + agentId: string, + sessionKey: string | undefined, + db: import("./neo4j-client.js").Neo4jMemoryClient, + embeddings: import("./embeddings.js").Embeddings, + extractionConfig: import("./config.js").ExtractionConfig, + logger: AutoCaptureLogger, +): Promise { + try { + let stored = 0; + let semanticDeduped = 0; -/** Maximum assistant message length — shorter than user to avoid code dumps. */ -const MAX_ASSISTANT_CAPTURE_CHARS = 1000; + // Process user messages + const userMessages = extractUserMessages(messages); + const retained = userMessages.filter((text) => passesAttentionGate(text)); -/** Minimum word count for assistant messages — higher than user. */ -const MIN_ASSISTANT_WORD_COUNT = 10; + for (const text of retained) { + try { + const result = await captureMessage( + text, + "auto-capture", + 0.3, + 1.0, + agentId, + sessionKey, + db, + embeddings, + extractionConfig, + logger, + ); + if (result.stored) stored++; + if (result.semanticDeduped) semanticDeduped++; + } catch (err) { + logger.debug?.(`memory-neo4j: auto-capture item failed: ${String(err)}`); + } + } -function passesAssistantAttentionGate(text: string): boolean { - const trimmed = text.trim(); + // Process assistant messages + const assistantMessages = extractAssistantMessages(messages); + const retainedAssistant = assistantMessages.filter((text) => + passesAssistantAttentionGate(text), + ); - // Length bounds (stricter than user) - if (trimmed.length < MIN_CAPTURE_CHARS || trimmed.length > MAX_ASSISTANT_CAPTURE_CHARS) { - return false; + for (const text of retainedAssistant) { + try { + const result = await captureMessage( + text, + "auto-capture-assistant", + 0.7, + 0.75, + agentId, + sessionKey, + db, + embeddings, + extractionConfig, + logger, + ); + if (result.stored) stored++; + if (result.semanticDeduped) semanticDeduped++; + } catch (err) { + logger.debug?.(`memory-neo4j: assistant auto-capture item failed: ${String(err)}`); + } + } + + if (stored > 0 || semanticDeduped > 0) { + logger.info( + `memory-neo4j: auto-captured ${stored} memories (attention-gated)${semanticDeduped > 0 ? `, ${semanticDeduped} semantic dupes skipped` : ""}`, + ); + } else if (userMessages.length > 0 || assistantMessages.length > 0) { + logger.info( + `memory-neo4j: auto-capture ran (0 stored, ${userMessages.length} user msgs, ${retained.length} passed gate, ${assistantMessages.length} assistant msgs, ${retainedAssistant.length} passed gate)`, + ); + } + } catch (err) { + logger.warn(`memory-neo4j: auto-capture failed: ${String(err)}`); } - - // Word count — higher threshold than user messages - const wordCount = trimmed.split(/\s+/).length; - if (wordCount < MIN_ASSISTANT_WORD_COUNT) { - return false; - } - - // Reject messages that are mostly code (>50% inside triple-backtick fences) - const codeBlockRegex = /```[\s\S]*?```/g; - let codeChars = 0; - let match: RegExpExecArray | null; - while ((match = codeBlockRegex.exec(trimmed)) !== null) { - codeChars += match[0].length; - } - if (codeChars > trimmed.length * 0.5) { - return false; - } - - // Reject messages that are mostly tool output - if ( - trimmed.includes("") || - trimmed.includes("") || - trimmed.includes("") - ) { - return false; - } - - // Injected context from the memory system itself - if (trimmed.includes("") || trimmed.includes("")) { - return false; - } - - // Noise patterns (same as user gate) - if (NOISE_PATTERNS.some((r) => r.test(trimmed))) { - return false; - } - - // Excessive emoji (likely reaction, not substance) - const emojiCount = (trimmed.match(/[\u{1F300}-\u{1F9FF}]/gu) || []).length; - if (emojiCount > 3) { - return false; - } - - return true; } -// Exported for testing -export { passesAttentionGate, passesAssistantAttentionGate }; +// Re-export attention gate for backwards compatibility (tests import from here) +export { passesAttentionGate, passesAssistantAttentionGate } from "./attention-gate.js"; // ============================================================================ // Export diff --git a/extensions/memory-neo4j/message-utils.ts b/extensions/memory-neo4j/message-utils.ts new file mode 100644 index 0000000000..08e68db1ec --- /dev/null +++ b/extensions/memory-neo4j/message-utils.ts @@ -0,0 +1,147 @@ +/** + * Message extraction utilities for the memory pipeline. + * + * Extracts and cleans user/assistant messages from the raw event.messages + * array, stripping channel wrappers, injected context, tool output, and + * other noise so downstream consumers (attention gate, memory store) see + * only the substantive text. + */ + +// ============================================================================ +// User Message Extraction +// ============================================================================ + +/** + * Extract user message texts from the event.messages array. + * Handles both string content and content block arrays. + */ +export function extractUserMessages(messages: unknown[]): string[] { + const texts: string[] = []; + + for (const msg of messages) { + if (!msg || typeof msg !== "object") { + continue; + } + const msgObj = msg as Record; + + // Only process user messages for auto-capture + if (msgObj.role !== "user") { + continue; + } + + const content = msgObj.content; + if (typeof content === "string") { + texts.push(content); + continue; + } + + if (Array.isArray(content)) { + for (const block of content) { + if ( + block && + typeof block === "object" && + "type" in block && + (block as Record).type === "text" && + "text" in block && + typeof (block as Record).text === "string" + ) { + texts.push((block as Record).text as string); + } + } + } + } + + // Strip wrappers then filter by length + return texts.map(stripMessageWrappers).filter((t) => t.length >= 10); +} + +/** + * Strip injected context, channel metadata wrappers, and system prefixes + * so the attention gate sees only the raw user text. + * Exported for use by the cleanup command. + */ +export function stripMessageWrappers(text: string): string { + let s = text; + // Injected context from memory system + s = s.replace(/[\s\S]*?<\/relevant-memories>\s*/g, ""); + s = s.replace(/[\s\S]*?<\/core-memory-refresh>\s*/g, ""); + s = s.replace(/[\s\S]*?<\/system>\s*/g, ""); + // File attachments (PDFs, images, etc. forwarded inline by channels) + s = s.replace(/]*>[\s\S]*?<\/file>\s*/g, ""); + // Media attachment preamble (appears before Telegram wrapper) + s = s.replace(/^\[media attached:[^\]]*\]\s*(?:To send an image[^\n]*\n?)*/i, ""); + // System exec output blocks (may appear before Telegram wrapper) + s = s.replace(/^(?:System:\s*\[[^\]]*\][^\n]*\n?)+/gi, ""); + // Telegram wrapper — may now be at start after previous strips + s = s.replace(/^\s*\[Telegram\s[^\]]+\]\s*/i, ""); + // "[message_id: NNN]" suffix (Telegram) + s = s.replace(/\n?\[message_id:\s*\d+\]\s*$/i, ""); + // Slack wrapper — "[Slack #channel @user] MESSAGE [slack message id: ...]" + s = s.replace(/^\s*\[Slack\s[^\]]+\]\s*/i, ""); + s = s.replace(/\n?\[slack message id:\s*[^\]]*\]\s*$/i, ""); + return s.trim(); +} + +// ============================================================================ +// Assistant Message Extraction +// ============================================================================ + +/** + * Strip tool-use, thinking, and code-output blocks from assistant messages + * so the attention gate sees only the substantive assistant text. + */ +export function stripAssistantWrappers(text: string): string { + let s = text; + // Tool-use / tool-result / function_call blocks + s = s.replace(/[\s\S]*?<\/tool_use>\s*/g, ""); + s = s.replace(/[\s\S]*?<\/tool_result>\s*/g, ""); + s = s.replace(/[\s\S]*?<\/function_call>\s*/g, ""); + // Thinking tags + s = s.replace(/[\s\S]*?<\/thinking>\s*/g, ""); + s = s.replace(/[\s\S]*?<\/antThinking>\s*/g, ""); + // Code execution output + s = s.replace(/[\s\S]*?<\/code_output>\s*/g, ""); + return s.trim(); +} + +/** + * Extract assistant message texts from the event.messages array. + * Handles both string content and content block arrays. + */ +export function extractAssistantMessages(messages: unknown[]): string[] { + const texts: string[] = []; + + for (const msg of messages) { + if (!msg || typeof msg !== "object") { + continue; + } + const msgObj = msg as Record; + + if (msgObj.role !== "assistant") { + continue; + } + + const content = msgObj.content; + if (typeof content === "string") { + texts.push(content); + continue; + } + + if (Array.isArray(content)) { + for (const block of content) { + if ( + block && + typeof block === "object" && + "type" in block && + (block as Record).type === "text" && + "text" in block && + typeof (block as Record).text === "string" + ) { + texts.push((block as Record).text as string); + } + } + } + } + + return texts.map(stripAssistantWrappers).filter((t) => t.length >= 10); +} diff --git a/extensions/memory-neo4j/neo4j-client.test.ts b/extensions/memory-neo4j/neo4j-client.test.ts index f720be4a41..8f5e59115c 100644 --- a/extensions/memory-neo4j/neo4j-client.test.ts +++ b/extensions/memory-neo4j/neo4j-client.test.ts @@ -1375,23 +1375,7 @@ describe("Neo4jMemoryClient", () => { }); it("should perform graph search with entity traversal", async () => { - // Mock entity search - mockSession.run.mockResolvedValueOnce({ - records: [ - { - get: vi.fn((key) => { - const data: Record = { - entityId: "e1", - name: "tarun", - score: 0.95, - }; - return data[key]; - }), - }, - ], - }); - - // Mock memory search via entities + // Combined single-query now returns memory records directly mockSession.run.mockResolvedValueOnce({ records: [ { diff --git a/extensions/memory-neo4j/neo4j-client.ts b/extensions/memory-neo4j/neo4j-client.ts index 269f786d92..5d1e337f27 100644 --- a/extensions/memory-neo4j/neo4j-client.ts +++ b/extensions/memory-neo4j/neo4j-client.ts @@ -154,6 +154,12 @@ export class Neo4jMemoryClient { session, "CREATE INDEX entity_name_index IF NOT EXISTS FOR (e:Entity) ON (e.name)", ); + // Composite index for queries that filter by both agentId and category + // (e.g. listByCategory, promotion/demotion filtering in sleep cycle) + await this.runSafe( + session, + "CREATE INDEX memory_agent_category_index IF NOT EXISTS FOR (m:Memory) ON (m.agentId, m.category)", + ); this.logger.info("memory-neo4j: indexes ensured"); } finally { @@ -523,30 +529,21 @@ export class Neo4jMemoryClient { return await this.retryOnTransient(async () => { const session = this.driver!.session(); try { - // Step 1: Find matching entities - const entityResult = await session.run( - `CALL db.index.fulltext.queryNodes('entity_fulltext_index', $query) - YIELD node, score - WHERE score >= 0.5 - RETURN node.id AS entityId, node.name AS name, score - ORDER BY score DESC - LIMIT 5`, - { query: escaped }, - ); - - const entityIds = entityResult.records.map((r) => r.get("entityId") as string); - if (entityIds.length === 0) { - return []; - } - - // Step 2 + 3: Direct mentions + 1-hop spreading activation + // Single query: entity fulltext lookup → direct mentions + 1-hop spreading activation const agentFilter = agentId ? "AND m.agentId = $agentId" : ""; const result = await session.run( - `UNWIND $entityIds AS eid + `// Find matching entities via fulltext index + CALL db.index.fulltext.queryNodes('entity_fulltext_index', $query) + YIELD node AS entity, score + WHERE score >= 0.5 + WITH entity + ORDER BY score DESC + LIMIT 5 + // Direct: Entity ← MENTIONS ← Memory - OPTIONAL MATCH (e:Entity {id: eid})<-[rm:MENTIONS]-(m:Memory) + OPTIONAL MATCH (entity)<-[rm:MENTIONS]-(m:Memory) WHERE m IS NOT NULL ${agentFilter} - WITH m, coalesce(rm.confidence, 1.0) AS directScore + WITH m, coalesce(rm.confidence, 1.0) AS directScore, entity WHERE m IS NOT NULL RETURN m.id AS id, m.text AS text, m.category AS category, @@ -555,9 +552,16 @@ export class Neo4jMemoryClient { UNION - UNWIND $entityIds AS eid + // Find matching entities via fulltext index (repeated for UNION) + CALL db.index.fulltext.queryNodes('entity_fulltext_index', $query) + YIELD node AS entity, score + WHERE score >= 0.5 + WITH entity + ORDER BY score DESC + LIMIT 5 + // 1-hop: Entity → relationship → Entity ← MENTIONS ← Memory - OPTIONAL MATCH (e:Entity {id: eid})-[r1:${RELATIONSHIP_TYPE_PATTERN}]-(e2:Entity) + OPTIONAL MATCH (entity)-[r1:${RELATIONSHIP_TYPE_PATTERN}]-(e2:Entity) WHERE coalesce(r1.confidence, 0.7) >= $firingThreshold OPTIONAL MATCH (e2)<-[rm:MENTIONS]-(m:Memory) WHERE m IS NOT NULL ${agentFilter} @@ -567,7 +571,7 @@ export class Neo4jMemoryClient { RETURN m.id AS id, m.text AS text, m.category AS category, m.importance AS importance, m.createdAt AS createdAt, max(hopScore) AS graphScore`, - { entityIds, firingThreshold, ...(agentId ? { agentId } : {}) }, + { query: escaped, firingThreshold, ...(agentId ? { agentId } : {}) }, ); // Deduplicate by id, keeping highest score @@ -873,6 +877,153 @@ export class Neo4jMemoryClient { } } + /** + * Batch all entity operations from an extraction result into a single managed + * transaction. Replaces the previous pattern of N individual session-per-call + * operations with a single atomic write. + * + * Operations performed atomically: + * 1. MERGE all Entity nodes + * 2. Create MENTIONS relationships (Memory → Entity) + * 3. Create inter-Entity relationships (validated against allowlist) + * 4. MERGE Tag nodes and create TAGGED relationships + * 5. Update memory category (if classified and current is 'other') + * 6. Set extractionStatus to 'complete' + */ + async batchEntityOperations( + memoryId: string, + entities: Array<{ + id: string; + name: string; + type: string; + aliases?: string[]; + description?: string; + }>, + relationships: Array<{ + source: string; + target: string; + type: string; + confidence: number; + }>, + tags: Array<{ name: string; category: string }>, + category?: string, + ): Promise { + await this.ensureInitialized(); + return this.retryOnTransient(async () => { + const session = this.driver!.session(); + try { + await session.executeWrite(async (tx) => { + const now = new Date().toISOString(); + + // 1. MERGE all entities in one UNWIND + if (entities.length > 0) { + await tx.run( + `UNWIND $entities AS e + MERGE (n:Entity {name: e.name}) + ON CREATE SET + n.id = e.id, n.type = e.type, n.aliases = e.aliases, + n.description = e.description, + n.firstSeen = $now, n.lastSeen = $now, n.mentionCount = 1 + ON MATCH SET + n.type = COALESCE(e.type, n.type), + n.description = COALESCE(e.description, n.description), + n.lastSeen = $now, + n.mentionCount = n.mentionCount + 1`, + { + entities: entities.map((e) => ({ + id: e.id, + name: e.name.trim().toLowerCase(), + type: e.type, + aliases: e.aliases ?? [], + description: e.description ?? null, + })), + now, + }, + ); + + // 2. Create MENTIONS relationships in one UNWIND + await tx.run( + `UNWIND $entityNames AS eName + MATCH (m:Memory {id: $memoryId}) + MATCH (e:Entity {name: eName}) + MERGE (m)-[r:MENTIONS]->(e) + ON CREATE SET r.role = 'context', r.confidence = 1.0`, + { + memoryId, + entityNames: entities.map((e) => e.name.trim().toLowerCase()), + }, + ); + } + + // 3. Create inter-Entity relationships (filter valid types) + const validRels = relationships.filter((r) => validateRelationshipType(r.type)); + if (validRels.length > 0) { + // Group by relationship type since Cypher requires literal rel types + const byType = new Map(); + for (const rel of validRels) { + const group = byType.get(rel.type) ?? []; + group.push(rel); + byType.set(rel.type, group); + } + + for (const [relType, rels] of byType) { + await tx.run( + `UNWIND $rels AS r + MATCH (e1:Entity {name: r.source}) + MATCH (e2:Entity {name: r.target}) + MERGE (e1)-[rel:${relType}]->(e2) + ON CREATE SET rel.confidence = r.confidence, rel.createdAt = $now + ON MATCH SET rel.confidence = CASE WHEN r.confidence > rel.confidence THEN r.confidence ELSE rel.confidence END`, + { + rels: rels.map((r) => ({ + source: r.source.trim().toLowerCase(), + target: r.target.trim().toLowerCase(), + confidence: r.confidence, + })), + now, + }, + ); + } + } + + // 4. MERGE Tags and create TAGGED relationships in one UNWIND + if (tags.length > 0) { + await tx.run( + `UNWIND $tags AS t + MERGE (tag:Tag {name: t.name}) + ON CREATE SET tag.id = t.id, tag.category = t.category, tag.createdAt = $now + WITH tag, t + MATCH (m:Memory {id: $memoryId}) + MERGE (m)-[r:TAGGED]->(tag) + ON CREATE SET r.confidence = 1.0`, + { + memoryId, + tags: tags.map((t) => ({ + name: t.name.trim().toLowerCase(), + category: t.category, + id: randomUUID(), + })), + now, + }, + ); + } + + // 5. Update category + 6. Set extraction status (in one statement) + const categoryClause = category + ? ", m.category = CASE WHEN m.category = 'other' THEN $category ELSE m.category END" + : ""; + await tx.run( + `MATCH (m:Memory {id: $memoryId}) + SET m.extractionStatus = 'complete', m.updatedAt = $now${categoryClause}`, + { memoryId, now, ...(category ? { category } : {}) }, + ); + }); + } finally { + await session.close(); + } + }); + } + /** * List memories with pending extraction status. * Used by the sleep cycle to batch-process extractions.