diff --git a/extensions/memory-neo4j/config.ts b/extensions/memory-neo4j/config.ts index c2915ab193..c9a729034b 100644 --- a/extensions/memory-neo4j/config.ts +++ b/extensions/memory-neo4j/config.ts @@ -38,6 +38,18 @@ export type MemoryNeo4jConfig = { */ refreshAtContextPercent?: number; }; + /** + * Maximum relationship hops for graph search spreading activation. + * Default: 1 (direct + 1-hop neighbors). + * Setting to 2+ enables deeper traversal but may slow queries. + */ + graphSearchDepth: number; + /** + * Per-category decay curve parameters. Each category can have its own + * half-life (days) controlling how fast memories in that category decay. + * Categories not listed use the sleep cycle's default (30 days). + */ + decayCurves: Record<string, { halfLifeDays: number }>; }; /** @@ -204,6 +216,8 @@ export const memoryNeo4jConfigSchema = { "autoRecallMinScore", "coreMemory", "extraction", + "graphSearchDepth", + "decayCurves", ], "memory-neo4j config", ); @@ -320,6 +334,32 @@ export const memoryNeo4jConfigSchema = { } } + // Parse decayCurves: per-category decay curve overrides + const decayCurvesRaw = cfg.decayCurves as Record<string, unknown> | undefined; + const decayCurves: Record<string, { halfLifeDays: number }> = {}; + if (decayCurvesRaw && typeof decayCurvesRaw === "object") { + for (const [cat, val] of Object.entries(decayCurvesRaw)) { + if (val && typeof val === "object" && "halfLifeDays" in val) { + const hl = (val as Record<string, unknown>).halfLifeDays; + if (typeof hl === "number" && hl > 0) { + decayCurves[cat] = { halfLifeDays: hl }; + } else { + throw new Error(`decayCurves.${cat}.halfLifeDays must be a positive number`); + } + } + } + } + + // Parse graphSearchDepth: must be 1-3, default 1 + const rawDepth = cfg.graphSearchDepth; + let graphSearchDepth = 1; + if (typeof rawDepth === "number") { + if (rawDepth < 1 || rawDepth > 3 || !Number.isInteger(rawDepth)) { + throw new Error(`graphSearchDepth must be 1, 2, or 3, got: ${rawDepth}`); + } + graphSearchDepth = rawDepth; + } + return { neo4j: { uri: neo4jRaw.uri, @@ -341,6 +381,8 @@ export
const memoryNeo4jConfigSchema = { maxEntries: coreMemoryMaxEntries, refreshAtContextPercent, }, + graphSearchDepth, + decayCurves, }; }, }; diff --git a/extensions/memory-neo4j/embeddings.ts b/extensions/memory-neo4j/embeddings.ts index 0df3b137f8..7328ee9e1f 100644 --- a/extensions/memory-neo4j/embeddings.ts +++ b/extensions/memory-neo4j/embeddings.ts @@ -280,3 +280,24 @@ export class Embeddings { return data.embeddings[0]; } } + +/** + * Compute cosine similarity between two embedding vectors. + * Returns a value between -1 and 1 (1 = identical, 0 = orthogonal). + * Returns 0 if either vector is empty or they differ in length. + */ +export function cosineSimilarity(a: number[], b: number[]): number { + if (a.length === 0 || a.length !== b.length) { + return 0; + } + let dot = 0; + let normA = 0; + let normB = 0; + for (let i = 0; i < a.length; i++) { + dot += a[i] * b[i]; + normA += a[i] * a[i]; + normB += b[i] * b[i]; + } + const denom = Math.sqrt(normA) * Math.sqrt(normB); + return denom === 0 ? 0 : dot / denom; +} diff --git a/extensions/memory-neo4j/extractor.test.ts b/extensions/memory-neo4j/extractor.test.ts index ef66bb4c29..8e115e898c 100644 --- a/extensions/memory-neo4j/extractor.test.ts +++ b/extensions/memory-neo4j/extractor.test.ts @@ -377,6 +377,22 @@ describe("extractUserMessages", () => { // extractEntities() — tests validateExtractionResult() indirectly // ============================================================================ +/** + * Create a ReadableStream that emits SSE-formatted chunks from a content string. + * Used to mock streaming LLM responses. 
+ */ +function mockSSEStream(content: string): ReadableStream<Uint8Array> { + const encoder = new TextEncoder(); + // Send the content in one SSE data event, then [DONE] + const sseData = `data: ${JSON.stringify({ choices: [{ delta: { content } }] })}\n\ndata: [DONE]\n\n`; + return new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(sseData)); + controller.close(); + }, + }); +} + describe("extractEntities", () => { // We need to mock `fetch` since callOpenRouter uses global fetch const originalFetch = globalThis.fetch; @@ -408,6 +424,9 @@ describe("extractEntities", () => { ok: status >= 200 && status < 300, status, text: () => Promise.resolve(content), + // Streaming response format (used by extractEntities via callOpenRouterStream) + body: status >= 200 && status < 300 ? mockSSEStream(content) : null, + // Non-streaming format (used by other LLM calls via callOpenRouter) json: () => Promise.resolve({ choices: [{ message: { content } }], @@ -792,6 +811,7 @@ describe("runBackgroundExtraction", () => { globalThis.fetch = vi.fn().mockResolvedValue({ ok: true, status: 200, + body: mockSSEStream(content), json: () => Promise.resolve({ choices: [{ message: { content } }], @@ -1928,10 +1948,10 @@ describe("runSleepCycle", () => { texts: ["text", "text", "text"], importances: [0.5, 0.6, 0.7], similarities: new Map([ - ["a:b", 0.8], - ["a:c", 0.78], + ["a:b", 0.85], + ["a:c", 0.81], ["b:c", 0.82], - ]), // All in 0.75-0.95 range + ]), // All above SEMANTIC_DEDUP_VECTOR_THRESHOLD (0.8) }, ]); diff --git a/extensions/memory-neo4j/extractor.ts b/extensions/memory-neo4j/extractor.ts index f5461efe5a..f1fc26ebe0 100644 --- a/extensions/memory-neo4j/extractor.ts +++ b/extensions/memory-neo4j/extractor.ts @@ -68,11 +68,17 @@ const FETCH_TIMEOUT_MS = 30_000; async function callOpenRouter( config: ExtractionConfig, prompt: string | Array<{ role: string; content: string }>, + abortSignal?: AbortSignal, ): Promise<string | null> { const messages = typeof prompt === "string" ?
[{ role: "user", content: prompt }] : prompt; for (let attempt = 0; attempt <= config.maxRetries; attempt++) { try { + // Combine the caller's abort signal with a per-request timeout + const signal = abortSignal + ? AbortSignal.any([abortSignal, AbortSignal.timeout(FETCH_TIMEOUT_MS)]) + : AbortSignal.timeout(FETCH_TIMEOUT_MS); + const response = await fetch(`${config.baseUrl}/chat/completions`, { method: "POST", headers: { @@ -85,7 +91,7 @@ async function callOpenRouter( temperature: config.temperature, response_format: { type: "json_object" }, }), - signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + signal, }); if (!response.ok) { @@ -108,6 +114,105 @@ async function callOpenRouter( return null; } +/** + * Streaming variant of callOpenRouter. Uses the streaming API to receive chunks + * incrementally, allowing earlier cancellation via abort signal and better + * latency characteristics for long responses. + * + * Accumulates all chunks into a single response string since extraction + * uses JSON mode (which requires the complete object to parse). + */ +async function callOpenRouterStream( + config: ExtractionConfig, + prompt: string | Array<{ role: string; content: string }>, + abortSignal?: AbortSignal, +): Promise<string | null> { + const messages = typeof prompt === "string" ? [{ role: "user", content: prompt }] : prompt; + + for (let attempt = 0; attempt <= config.maxRetries; attempt++) { + try { + const signal = abortSignal + ?
AbortSignal.any([abortSignal, AbortSignal.timeout(FETCH_TIMEOUT_MS)]) + : AbortSignal.timeout(FETCH_TIMEOUT_MS); + + const response = await fetch(`${config.baseUrl}/chat/completions`, { + method: "POST", + headers: { + Authorization: `Bearer ${config.apiKey}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + model: config.model, + messages, + temperature: config.temperature, + response_format: { type: "json_object" }, + stream: true, + }), + signal, + }); + + if (!response.ok) { + const body = await response.text().catch(() => ""); + throw new Error(`OpenRouter API error ${response.status}: ${body}`); + } + + if (!response.body) { + throw new Error("No response body for streaming request"); + } + + // Read SSE stream and accumulate content chunks + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let accumulated = ""; + let buffer = ""; + + for (;;) { + // Check abort between chunks for responsive cancellation + if (abortSignal?.aborted) { + reader.cancel().catch(() => {}); + return null; + } + + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // Parse SSE lines + const lines = buffer.split("\n"); + buffer = lines.pop() ?? 
""; + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed.startsWith("data: ")) continue; + const data = trimmed.slice(6); + if (data === "[DONE]") continue; + + try { + const parsed = JSON.parse(data) as { + choices?: Array<{ delta?: { content?: string } }>; + }; + const chunk = parsed.choices?.[0]?.delta?.content; + if (chunk) { + accumulated += chunk; + } + } catch { + // Skip malformed SSE chunks + } + } + } + + return accumulated || null; + } catch (err) { + if (attempt >= config.maxRetries) { + throw err; + } + await new Promise((resolve) => setTimeout(resolve, 500 * Math.pow(2, attempt))); + } + } + return null; +} + // ============================================================================ // Entity Extraction // ============================================================================ @@ -143,6 +248,8 @@ function isTransientError(err: unknown): boolean { /** * Extract entities and relationships from a memory text using LLM. * + * Uses streaming for responsive abort signal handling and better latency. 
+ * * Returns { result, transientFailure }: * - result is the ExtractionResult or null if extraction returned nothing useful * - transientFailure is true if the failure was due to a network/timeout issue @@ -151,6 +258,7 @@ function isTransientError(err: unknown): boolean { export async function extractEntities( text: string, config: ExtractionConfig, + abortSignal?: AbortSignal, ): Promise<{ result: ExtractionResult | null; transientFailure: boolean }> { if (!config.enabled) { return { result: null, transientFailure: false }; @@ -164,7 +272,8 @@ export async function extractEntities( let content: string | null; try { - content = await callOpenRouter(config, messages); + // Use streaming for extraction — allows responsive abort and better latency + content = await callOpenRouterStream(config, messages, abortSignal); } catch (err) { // Network/timeout errors are transient — caller should retry return { result: null, transientFailure: isTransientError(err) }; @@ -264,22 +373,27 @@ export async function resolveConflict( memA: string, memB: string, config: ExtractionConfig, + abortSignal?: AbortSignal, ): Promise<"a" | "b" | "both" | "skip"> { if (!config.enabled) return "skip"; try { - const content = await callOpenRouter(config, [ - { - role: "system", - content: `Two memories may conflict with each other. Determine which should be kept. + const content = await callOpenRouter( + config, + [ + { + role: "system", + content: `Two memories may conflict with each other. Determine which should be kept. If they genuinely contradict each other, keep the one that is more current, specific, or accurate. If they don't actually conflict (they cover different aspects or are both valid), keep both. 
Return JSON: {"keep": "a"|"b"|"both", "reason": "brief explanation"}`, - }, - { role: "user", content: `Memory A: "${memA}"\nMemory B: "${memB}"` }, - ]); + }, + { role: "user", content: `Memory A: "${memA}"\nMemory B: "${memB}"` }, + ], + abortSignal, + ); if (!content) return "skip"; const parsed = JSON.parse(content) as { keep?: string }; @@ -319,6 +433,7 @@ export async function runBackgroundExtraction( config: ExtractionConfig, logger: Logger, currentRetries: number = 0, + abortSignal?: AbortSignal, ): Promise<void> { if (!config.enabled) { await db.updateExtractionStatus(memoryId, "skipped").catch(() => {}); @@ -326,7 +441,7 @@ } try { - const { result, transientFailure } = await extractEntities(text, config); + const { result, transientFailure } = await extractEntities(text, config, abortSignal); if (!result) { if (transientFailure) { @@ -483,6 +598,7 @@ export type SleepCycleOptions = { decayRetentionThreshold?: number; // Below this, memory is pruned (default: 0.1) decayBaseHalfLifeDays?: number; // Base half-life in days (default: 30) decayImportanceMultiplier?: number; // How much importance extends half-life (default: 2) + decayCurves?: Record<string, { halfLifeDays: number }>; // Per-category decay curve overrides // Progress callback onPhaseStart?: ( @@ -552,6 +668,7 @@ export async function runSleepCycle( decayRetentionThreshold = 0.1, decayBaseHalfLifeDays = 30, decayImportanceMultiplier = 2, + decayCurves, extractionBatchSize = 50, extractionDelayMs = 1000, onPhaseStart, @@ -642,7 +759,7 @@ export async function runSleepCycle( onPhaseStart?.("semanticDedup"); logger.info("memory-neo4j: [sleep] Phase 1b: Semantic Deduplication (0.75-0.95 band)"); - // Collect all candidate pairs upfront + // Collect all candidate pairs upfront (with pairwise similarity for pre-screening) type DedupPair = { textA: string; textB: string; @@ -650,6 +767,7 @@ idB: string; importanceA: number; importanceB: number; +
similarity?: number; }; const allPairs: DedupPair[] = []; @@ -657,6 +775,7 @@ export async function runSleepCycle( if (cluster.memoryIds.length < 2) continue; for (let i = 0; i < cluster.memoryIds.length - 1; i++) { for (let j = i + 1; j < cluster.memoryIds.length; j++) { + const pairKey = makePairKey(cluster.memoryIds[i], cluster.memoryIds[j]); allPairs.push({ textA: cluster.texts[i], textB: cluster.texts[j], @@ -664,6 +783,7 @@ export async function runSleepCycle( idB: cluster.memoryIds[j], importanceA: cluster.importances[i], importanceB: cluster.importances[j], + similarity: cluster.similarities?.get(pairKey), }); } } @@ -683,7 +803,9 @@ export async function runSleepCycle( if (activeBatch.length === 0) continue; const outcomes = await Promise.allSettled( - activeBatch.map((p) => isSemanticDuplicate(p.textA, p.textB, config)), + activeBatch.map((p) => + isSemanticDuplicate(p.textA, p.textB, config, p.similarity, abortSignal), + ), ); for (let k = 0; k < outcomes.length; k++) { @@ -737,7 +859,9 @@ export async function runSleepCycle( for (let i = 0; i < pairs.length && !abortSignal?.aborted; i += LLM_CONCURRENCY) { const chunk = pairs.slice(i, i + LLM_CONCURRENCY); const outcomes = await Promise.allSettled( - chunk.map((pair) => resolveConflict(pair.memoryA.text, pair.memoryB.text, config)), + chunk.map((pair) => + resolveConflict(pair.memoryA.text, pair.memoryB.text, config, abortSignal), + ), ); for (let k = 0; k < outcomes.length; k++) { @@ -927,6 +1051,7 @@ export async function runSleepCycle( config, logger, memory.extractionRetries, + abortSignal, ), ), ); @@ -978,6 +1103,7 @@ export async function runSleepCycle( retentionThreshold: decayRetentionThreshold, baseHalfLifeDays: decayBaseHalfLifeDays, importanceMultiplier: decayImportanceMultiplier, + decayCurves, agentId, }); @@ -1004,19 +1130,23 @@ export async function runSleepCycle( try { // Clean up orphan entities - const orphanEntities = await db.findOrphanEntities(); - if (orphanEntities.length > 0) { 
- result.cleanup.entitiesRemoved = await db.deleteOrphanEntities( - orphanEntities.map((e) => e.id), - ); - onProgress?.("cleanup", `Removed ${result.cleanup.entitiesRemoved} orphan entities`); + if (!abortSignal?.aborted) { + const orphanEntities = await db.findOrphanEntities(); + if (orphanEntities.length > 0) { + result.cleanup.entitiesRemoved = await db.deleteOrphanEntities( + orphanEntities.map((e) => e.id), + ); + onProgress?.("cleanup", `Removed ${result.cleanup.entitiesRemoved} orphan entities`); + } } // Clean up orphan tags - const orphanTags = await db.findOrphanTags(); - if (orphanTags.length > 0) { - result.cleanup.tagsRemoved = await db.deleteOrphanTags(orphanTags.map((t) => t.id)); - onProgress?.("cleanup", `Removed ${result.cleanup.tagsRemoved} orphan tags`); + if (!abortSignal?.aborted) { + const orphanTags = await db.findOrphanTags(); + if (orphanTags.length > 0) { + result.cleanup.tagsRemoved = await db.deleteOrphanTags(orphanTags.map((t) => t.id)); + onProgress?.("cleanup", `Removed ${result.cleanup.tagsRemoved} orphan tags`); + } } logger.info( @@ -1108,9 +1238,21 @@ Rules: Return JSON: {"verdict": "duplicate"|"unique", "reason": "brief explanation"}`; +/** + * Minimum cosine similarity to proceed with the LLM comparison. + * Below this threshold, texts are too dissimilar to be semantic duplicates, + * saving an expensive LLM call. Exported for testing. + */ +export const SEMANTIC_DEDUP_VECTOR_THRESHOLD = 0.8; + /** * Check whether new text is semantically a duplicate of an existing memory. - * Uses an LLM to compare meaning rather than surface similarity. + * + * When a pre-computed vector similarity score is provided (from findSimilar + * or findDuplicateClusters), the LLM call is skipped entirely for pairs + * below SEMANTIC_DEDUP_VECTOR_THRESHOLD — a fast pre-screen that avoids + * the most expensive part of the pipeline. + * * Returns true if the new text is a duplicate (should be skipped). * Returns false on any failure (allow storage). 
*/ @@ -1118,16 +1260,27 @@ export async function isSemanticDuplicate( newText: string, existingText: string, config: ExtractionConfig, + vectorSimilarity?: number, + abortSignal?: AbortSignal, ): Promise<boolean> { if (!config.enabled) { return false; } + // Vector pre-screen: skip LLM call when similarity is below threshold + if (vectorSimilarity !== undefined && vectorSimilarity < SEMANTIC_DEDUP_VECTOR_THRESHOLD) { + return false; + } + try { - const content = await callOpenRouter(config, [ - { role: "system", content: SEMANTIC_DEDUP_SYSTEM }, - { role: "user", content: `Existing memory: "${existingText}"\nNew text: "${newText}"` }, - ]); + const content = await callOpenRouter( + config, + [ + { role: "system", content: SEMANTIC_DEDUP_SYSTEM }, + { role: "user", content: `Existing memory: "${existingText}"\nNew text: "${newText}"` }, + ], + abortSignal, + ); if (!content) { return false; } diff --git a/extensions/memory-neo4j/index.ts b/extensions/memory-neo4j/index.ts index 5a1b279612..ac30a0a701 100644 --- a/extensions/memory-neo4j/index.ts +++ b/extensions/memory-neo4j/index.ts @@ -126,6 +126,7 @@ const memoryNeo4jPlugin = { limit, agentId, extractionConfig.enabled, + { graphSearchDepth: cfg.graphSearchDepth }, ); if (results.length === 0) { @@ -434,6 +435,7 @@ const memoryNeo4jPlugin = { parseInt(opts.limit, 10), "default", extractionConfig.enabled, + { graphSearchDepth: cfg.graphSearchDepth }, ); const output = results.map((r) => ({ id: r.id, @@ -594,6 +596,8 @@ const memoryNeo4jPlugin = { promotionMinAgeDays: promotionMinAge, decayRetentionThreshold: decayThreshold, decayBaseHalfLifeDays: decayHalfLife, + decayCurves: + Object.keys(cfg.decayCurves).length > 0 ?
cfg.decayCurves : undefined, extractionBatchSize: batchSize, extractionDelayMs: delay, onPhaseStart: (phase) => { @@ -964,6 +968,7 @@ const memoryNeo4jPlugin = { 3, agentId, extractionConfig.enabled, + { graphSearchDepth: cfg.graphSearchDepth }, ); // Feature 1: Filter out low-relevance results below min RRF score @@ -1213,10 +1218,12 @@ async function captureMessage( } // Semantic dedup: check moderate-similarity memories (0.75-0.95) + // Pass the vector similarity score as a pre-screen to skip LLM calls + // for pairs below SEMANTIC_DEDUP_VECTOR_THRESHOLD. const candidates = await db.findSimilar(vector, 0.75, 3); if (candidates.length > 0) { for (const candidate of candidates) { - if (await isSemanticDuplicate(text, candidate.text, extractionConfig)) { + if (await isSemanticDuplicate(text, candidate.text, extractionConfig, candidate.score)) { logger.debug?.( `memory-neo4j: semantic dedup — skipped "${text.slice(0, 60)}..." (duplicate of "${candidate.text.slice(0, 60)}...")`, ); diff --git a/extensions/memory-neo4j/neo4j-client.ts b/extensions/memory-neo4j/neo4j-client.ts index 5d1e337f27..f3a3814d51 100644 --- a/extensions/memory-neo4j/neo4j-client.ts +++ b/extensions/memory-neo4j/neo4j-client.ts @@ -518,6 +518,7 @@ export class Neo4jMemoryClient { limit: number, firingThreshold: number = 0.3, agentId?: string, + maxHops: number = 1, ): Promise<SearchSignalResult[]> { await this.ensureInitialized(); const escaped = escapeLucene(query); @@ -529,8 +530,10 @@ return await this.retryOnTransient(async () => { const session = this.driver!.session(); try { - // Single query: entity fulltext lookup → direct mentions + 1-hop spreading activation + // Single query: entity fulltext lookup → direct mentions + N-hop spreading activation const agentFilter = agentId ?
"AND m.agentId = $agentId" : ""; + // Variable-length relationship pattern: 1..maxHops hops through entity relationships + const hopRange = `1..${Math.max(1, Math.min(3, maxHops))}`; const result = await session.run( `// Find matching entities via fulltext index CALL db.index.fulltext.queryNodes('entity_fulltext_index', $query) @@ -560,12 +563,12 @@ ORDER BY score DESC LIMIT 5 - // 1-hop: Entity → relationship → Entity ← MENTIONS ← Memory - OPTIONAL MATCH (entity)-[r1:${RELATIONSHIP_TYPE_PATTERN}]-(e2:Entity) - WHERE coalesce(r1.confidence, 0.7) >= $firingThreshold + // N-hop: Entity -[rels*1..N]-> Entity ← MENTIONS ← Memory + OPTIONAL MATCH (entity)-[rels:${RELATIONSHIP_TYPE_PATTERN}*${hopRange}]-(e2:Entity) + WHERE ALL(r IN rels WHERE coalesce(r.confidence, 0.7) >= $firingThreshold) OPTIONAL MATCH (e2)<-[rm:MENTIONS]-(m:Memory) WHERE m IS NOT NULL ${agentFilter} - WITH m, coalesce(r1.confidence, 0.7) * coalesce(rm.confidence, 1.0) AS hopScore + WITH m, reduce(s = 1.0, r IN rels | s * coalesce(r.confidence, 0.7)) * coalesce(rm.confidence, 1.0) AS hopScore WHERE m IS NOT NULL RETURN m.id AS id, m.text AS text, m.category AS category, @@ -1373,6 +1376,8 @@ export class Neo4jMemoryClient { retentionThreshold?: number; // Below this score, memory is pruned (default: 0.1) baseHalfLifeDays?: number; // Base half-life for decay (default: 30) importanceMultiplier?: number; // How much importance extends half-life (default: 2) + /** Per-category half-life overrides. Categories not listed use baseHalfLifeDays. */ + decayCurves?: Record<string, { halfLifeDays: number }>; agentId?: string; limit?: number; } = {}, @@ -1383,6 +1388,7 @@ retentionThreshold = 0.1, baseHalfLifeDays = 30, importanceMultiplier = 2, + decayCurves, agentId, limit = 500, } = options; @@ -1391,6 +1397,19 @@ const session = this.driver!.session(); try { const agentFilter = agentId ?
"AND m.agentId = $agentId" : ""; + + // Build per-category half-life CASE expression if curves are configured + let halfLifeExpr = "$baseHalfLife"; + if (decayCurves && Object.keys(decayCurves).length > 0) { + const cases = Object.entries(decayCurves) + .map( + ([cat, { halfLifeDays }]) => + `WHEN m.category = '${cat.replace(/'/g, "\\'")}' THEN ${halfLifeDays}`, + ) + .join(" "); + halfLifeExpr = `CASE ${cases} ELSE $baseHalfLife END`; + } + const result = await session.run( `MATCH (m:Memory) WHERE m.createdAt IS NOT NULL @@ -1400,7 +1419,7 @@ export class Neo4jMemoryClient { duration.between(datetime(m.createdAt), datetime()).days AS ageDays, m.importance AS importance WITH m, ageDays, importance, - $baseHalfLife * (1.0 + importance * $importanceMult) AS halfLife + ${halfLifeExpr} * (1.0 + importance * $importanceMult) AS halfLife WITH m, ageDays, importance, halfLife, importance * exp(-1.0 * ageDays / halfLife) AS decayScore WHERE decayScore < $threshold diff --git a/extensions/memory-neo4j/openclaw.plugin.json b/extensions/memory-neo4j/openclaw.plugin.json index b0ca8eb66a..5ea257df7c 100644 --- a/extensions/memory-neo4j/openclaw.plugin.json +++ b/extensions/memory-neo4j/openclaw.plugin.json @@ -75,6 +75,14 @@ "label": "Extraction Base URL", "placeholder": "https://openrouter.ai/api/v1", "help": "Base URL for extraction API (e.g., https://openrouter.ai/api/v1 or http://localhost:11434/v1 for Ollama)" + }, + "graphSearchDepth": { + "label": "Graph Search Depth", + "help": "Maximum relationship hops for graph search spreading activation (1-3, default: 1)" + }, + "decayCurves": { + "label": "Decay Curves", + "help": "Per-category decay curve overrides. 
Example: {\"fact\": {\"halfLifeDays\": 60}, \"other\": {\"halfLifeDays\": 14}}" } }, "configSchema": { @@ -162,6 +170,24 @@ "type": "string" } } + }, + "graphSearchDepth": { + "type": "number", + "minimum": 1, + "maximum": 3 + }, + "decayCurves": { + "type": "object", + "additionalProperties": { + "type": "object", + "properties": { + "halfLifeDays": { + "type": "number", + "minimum": 1 + } + }, + "required": ["halfLifeDays"] + } } }, "required": ["neo4j"] diff --git a/extensions/memory-neo4j/search.ts b/extensions/memory-neo4j/search.ts index db0a6b22ac..528c801584 100644 --- a/extensions/memory-neo4j/search.ts +++ b/extensions/memory-neo4j/search.ts @@ -217,9 +217,15 @@ export async function hybridSearch( rrfK?: number; candidateMultiplier?: number; graphFiringThreshold?: number; + graphSearchDepth?: number; } = {}, ): Promise { - const { rrfK = 60, candidateMultiplier = 4, graphFiringThreshold = 0.3 } = options; + const { + rrfK = 60, + candidateMultiplier = 4, + graphFiringThreshold = 0.3, + graphSearchDepth = 1, + } = options; const candidateLimit = Math.floor(Math.min(200, Math.max(1, limit * candidateMultiplier))); @@ -235,7 +241,7 @@ export async function hybridSearch( db.vectorSearch(queryEmbedding, candidateLimit, 0.1, agentId), db.bm25Search(query, candidateLimit, agentId), graphEnabled - ? db.graphSearch(query, candidateLimit, graphFiringThreshold, agentId) + ? db.graphSearch(query, candidateLimit, graphFiringThreshold, agentId, graphSearchDepth) : Promise.resolve([] as SearchSignalResult[]), ]);