# Scrape metadata (preserved as comments so the module stays importable):
# AutoGPT/docs/search.py — retrieved 2025-12-16 15:04:38 +01:00
# 716 lines, 23 KiB, Python
#!/usr/bin/env python3
"""
Documentation Search
Hybrid search combining:
- Semantic search (OpenAI embeddings + cosine similarity)
- Lexical search (BM25)
- Authority ranking (PageRank)
- Title matching
- Content quality signals
Based on ZIM-Plus search architecture with tunable weights.
Usage:
python search.py "your query" [--index index.bin] [--top-k 10]
"""
import argparse
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import numpy as np
try:
from openai import OpenAI
HAS_OPENAI = True
except ImportError:
HAS_OPENAI = False
try:
from rank_bm25 import BM25Okapi
HAS_BM25 = True
except ImportError:
HAS_BM25 = False
try:
from sentence_transformers import SentenceTransformer
HAS_SENTENCE_TRANSFORMERS = True
except ImportError:
HAS_SENTENCE_TRANSFORMERS = False
from index import SearchIndex, Chunk, Document, load_index, tokenize
# ============================================================================
# Search Configuration
# ============================================================================
@dataclass
class SearchWeights:
    """
    Hybrid search weight configuration.

    Based on ZIM-Plus reranking signals:
    - semantic: Cosine similarity from embeddings
    - title_match: Query terms appearing in title
    - url_path_match: Query terms appearing in URL/path
    - bm25: Sparse lexical matching score
    - content_quality: Penalizes TOC/nav/boilerplate chunks
    - pagerank: Link authority score
    - position_boost: Prefers earlier chunks in document

    All weights should sum to 1.0 for interpretability.
    """
    semantic: float = 0.30
    title_match: float = 0.20
    url_path_match: float = 0.15
    bm25: float = 0.15
    content_quality: float = 0.10
    pagerank: float = 0.05
    position_boost: float = 0.05
    # Diversity penalty: max chunks per document (not part of the weight sum)
    max_chunks_per_doc: int = 2

    def validate(self) -> None:
        """Print a warning when the ranking weights do not sum to ~1.0."""
        total = sum((
            self.semantic,
            self.title_match,
            self.url_path_match,
            self.bm25,
            self.content_quality,
            self.pagerank,
            self.position_boost,
        ))
        if abs(total - 1.0) > 0.01:
            print(f"Warning: Weights sum to {total:.3f}, not 1.0")
# Default weights (tuned for documentation search)
DEFAULT_WEIGHTS = SearchWeights()

# Alternative weight presets for different use cases
WEIGHT_PRESETS = {
    # Lean on embedding similarity; suits natural-language questions.
    "semantic_heavy": SearchWeights(
        semantic=0.50, title_match=0.15, url_path_match=0.10,
        bm25=0.10, content_quality=0.05, pagerank=0.05, position_boost=0.05
    ),
    # Lean on BM25 lexical matching; suits exact-term / identifier queries.
    "keyword_heavy": SearchWeights(
        semantic=0.20, title_match=0.20, url_path_match=0.15,
        bm25=0.30, content_quality=0.05, pagerank=0.05, position_boost=0.05
    ),
    # Lean on PageRank link authority; favors heavily-linked pages.
    "authority_heavy": SearchWeights(
        semantic=0.25, title_match=0.15, url_path_match=0.10,
        bm25=0.15, content_quality=0.10, pagerank=0.20, position_boost=0.05
    ),
}
# ============================================================================
# Search Result
# ============================================================================
@dataclass
class SearchResult:
    """A single search result with its combined score and signal breakdown."""
    chunk: "Chunk"
    score: float  # Final combined score
    # Individual signal scores, kept for debugging and weight tuning.
    semantic_score: float = 0.0
    title_score: float = 0.0
    path_score: float = 0.0
    bm25_score: float = 0.0
    quality_score: float = 0.0
    pagerank_score: float = 0.0
    position_score: float = 0.0

    def __str__(self) -> str:
        breakdown = (
            f"sem={self.semantic_score:.2f} title={self.title_score:.2f} "
            f"path={self.path_score:.2f} bm25={self.bm25_score:.2f} "
            f"qual={self.quality_score:.2f} pr={self.pagerank_score:.2f} "
            f"pos={self.position_score:.2f}"
        )
        parts = [
            f"[{self.score:.3f}] {self.chunk.doc_title}",
            f" Path: {self.chunk.doc_path}",
            f" Heading: {self.chunk.heading}",
            f" Scores: {breakdown}",
        ]
        return "\n".join(parts)
# ============================================================================
# Search Engine
# ============================================================================
class HybridSearcher:
    """
    Hybrid search engine combining multiple ranking signals.

    Signals (weighted linear combination, see SearchWeights): semantic
    cosine similarity, BM25, title match, URL/path match, content quality,
    PageRank and in-document position.
    """

    def __init__(
        self,
        index: SearchIndex,
        weights: SearchWeights = DEFAULT_WEIGHTS,
        openai_client: Optional["OpenAI"] = None
    ):
        """
        Args:
            index: Prebuilt SearchIndex with chunks, embeddings, BM25
                corpus and PageRank scores.
            weights: Signal weights. NOTE: the default is the shared
                module-level DEFAULT_WEIGHTS instance, not a copy.
            openai_client: Optional OpenAI client used to embed queries.

        BUGFIX: `OpenAI` is quoted in the annotation above because the
        openai import at module top is optional (try/except ImportError).
        An unquoted name is evaluated at class-definition time, so the
        whole module raised NameError when the openai package was not
        installed — even though the local-embedding path needs no OpenAI.
        """
        self.index = index
        self.weights = weights
        self.weights.validate()

        # Detect if the index was built with local sentence-transformers
        # embeddings rather than an OpenAI embedding model.
        self.use_local_embeddings = index.embedding_model in [
            "all-MiniLM-L6-v2", "all-mpnet-base-v2", "paraphrase-MiniLM-L6-v2"
        ]

        # Initialize local embedding model if needed (and installed).
        self.local_model = None
        if self.use_local_embeddings and HAS_SENTENCE_TRANSFORMERS:
            self.local_model = SentenceTransformer(index.embedding_model)

        # Initialize OpenAI client for query embedding (only if not local).
        self.openai_client = None
        if not self.use_local_embeddings:
            self.openai_client = openai_client
            if openai_client is None and HAS_OPENAI:
                self.openai_client = OpenAI()

        # Build BM25 scorer from the tokenized corpus stored in the index.
        self.bm25 = None
        if HAS_BM25 and index.bm25_corpus:
            self.bm25 = BM25Okapi(index.bm25_corpus)

        # Precompute L2-normalized embeddings so cosine similarity becomes
        # a single matrix-vector product at query time.
        self.normalized_embeddings = None
        if index.embeddings is not None:
            norms = np.linalg.norm(index.embeddings, axis=1, keepdims=True)
            # Epsilon guards against division by zero for all-zero rows.
            self.normalized_embeddings = index.embeddings / (norms + 1e-10)

    def embed_query(self, query: str) -> Optional[np.ndarray]:
        """Return the query embedding, or None when no backend is available."""
        # Prefer the local model when the index was built with one.
        if self.local_model is not None:
            embedding = self.local_model.encode(query, convert_to_numpy=True)
            return embedding.astype(np.float32)
        # Fall back to OpenAI.
        if self.openai_client is None:
            return None
        response = self.openai_client.embeddings.create(
            model=self.index.embedding_model,
            input=query
        )
        return np.array(response.data[0].embedding, dtype=np.float32)

    def compute_semantic_scores(self, query_embedding: np.ndarray) -> np.ndarray:
        """Cosine similarity between the query and every chunk, mapped to [0, 1]."""
        if self.normalized_embeddings is None:
            return np.zeros(len(self.index.chunks))
        # Normalize query embedding (epsilon avoids divide-by-zero).
        query_norm = query_embedding / (np.linalg.norm(query_embedding) + 1e-10)
        # Cosine similarity via dot product of normalized vectors.
        similarities = self.normalized_embeddings @ query_norm
        # Map from cosine's [-1, 1] range to [0, 1].
        similarities = (similarities + 1) / 2
        return similarities

    def compute_bm25_scores(self, query_tokens: list[str]) -> np.ndarray:
        """BM25 scores for all chunks, max-normalized into [0, 1]."""
        if self.bm25 is None:
            return np.zeros(len(self.index.chunks))
        scores = self.bm25.get_scores(query_tokens)
        # Max-normalize so BM25 is comparable with the other signals.
        if scores.max() > 0:
            scores = scores / scores.max()
        return scores

    def compute_title_scores(self, query_tokens: list[str]) -> np.ndarray:
        """Score chunks by exact and substring overlap with the document title."""
        scores = np.zeros(len(self.index.chunks))
        query_set = set(query_tokens)
        for chunk_idx, chunk in enumerate(self.index.chunks):
            title_tokens = set(tokenize(chunk.doc_title))
            # Exact token matches count double vs substring matches.
            exact_matches = len(query_set & title_tokens)
            partial_matches = 0
            for qt in query_tokens:
                for tt in title_tokens:
                    if qt in tt or tt in qt:
                        partial_matches += 0.5
            if query_tokens:
                scores[chunk_idx] = (exact_matches * 2 + partial_matches) / (len(query_tokens) * 2)
        return np.clip(scores, 0, 1)

    def compute_path_scores(self, query_tokens: list[str]) -> np.ndarray:
        """Score chunks by query-term overlap with URL/path components."""
        scores = np.zeros(len(self.index.chunks))
        query_set = set(query_tokens)
        for chunk_idx, chunk in enumerate(self.index.chunks):
            # Split path on separators and drop the .md extension.
            path_parts = re.split(r'[/_-]', chunk.doc_path.lower())
            path_parts = [p.replace('.md', '') for p in path_parts if p]
            path_set = set(path_parts)
            matches = len(query_set & path_set)
            # Substring matches count half as much as exact matches.
            partial = 0
            for qt in query_tokens:
                for pp in path_parts:
                    if qt in pp or pp in qt:
                        partial += 0.5
            if query_tokens:
                scores[chunk_idx] = (matches * 2 + partial) / (len(query_tokens) * 2)
        return np.clip(scores, 0, 1)

    def compute_quality_scores(self) -> np.ndarray:
        """
        Content quality scores in [0, 1] (1.0 = no penalties applied).

        Penalizes:
        - TOC/navigation chunks (many markdown links, little content)
        - Very short chunks
        - Chunks that are mostly fenced code
        - index.md navigation pages
        """
        scores = np.ones(len(self.index.chunks))
        for chunk_idx, chunk in enumerate(self.index.chunks):
            text = chunk.text
            penalty = 0.0
            # Markdown links [text](url); many of them suggests a TOC page.
            link_count = len(re.findall(r'\[([^\]]+)\]\([^)]+\)', text))
            if link_count > 10:
                penalty += 0.3
            # Very short chunks carry little information.
            if len(text) < 200:
                penalty += 0.2
            # Ratio of fenced-code characters to total chunk length.
            code_blocks = re.findall(r'```[\s\S]*?```', text)
            code_length = sum(len(b) for b in code_blocks)
            if len(text) > 0 and code_length / len(text) > 0.8:
                penalty += 0.2
            # index.md pages are usually navigation, not content.
            if chunk.doc_path.endswith('index.md'):
                penalty += 0.1
            scores[chunk_idx] = max(0, 1 - penalty)
        return scores

    def compute_pagerank_scores(self) -> np.ndarray:
        """Per-chunk PageRank scores, looked up by the owning document path."""
        scores = np.zeros(len(self.index.chunks))
        for chunk_idx, chunk in enumerate(self.index.chunks):
            scores[chunk_idx] = self.index.pagerank.get(chunk.doc_path, 0.0)
        return scores

    def compute_position_scores(self) -> np.ndarray:
        """Position boost: earlier chunks within a document score higher."""
        scores = np.zeros(len(self.index.chunks))
        # Group chunk indices by owning document, preserving chunk order.
        doc_chunks: dict = {}
        for chunk_idx, chunk in enumerate(self.index.chunks):
            doc_chunks.setdefault(chunk.doc_path, []).append(chunk_idx)
        for chunk_indices in doc_chunks.values():
            n = len(chunk_indices)
            for i, chunk_idx in enumerate(chunk_indices):
                # Linear decay from 1.0 (first chunk) toward 0.
                scores[chunk_idx] = 1 - (i / max(n, 1))
        return scores

    def search(
        self,
        query: str,
        top_k: int = 10,
        apply_diversity: bool = True
    ) -> list[SearchResult]:
        """
        Perform hybrid search.

        Args:
            query: Search query string
            top_k: Number of results to return
            apply_diversity: Apply diversity penalty (max chunks per doc)

        Returns:
            List of SearchResult objects sorted by descending score
        """
        query_tokens = tokenize(query)
        if not query_tokens:
            return []

        # Semantic scores only when embeddings AND a backend are available.
        semantic_scores = np.zeros(len(self.index.chunks))
        if self.normalized_embeddings is not None and (self.local_model or self.openai_client):
            query_embedding = self.embed_query(query)
            if query_embedding is not None:
                semantic_scores = self.compute_semantic_scores(query_embedding)

        bm25_scores = self.compute_bm25_scores(query_tokens)
        title_scores = self.compute_title_scores(query_tokens)
        path_scores = self.compute_path_scores(query_tokens)
        quality_scores = self.compute_quality_scores()
        pagerank_scores = self.compute_pagerank_scores()
        position_scores = self.compute_position_scores()

        # Weighted linear combination of all signals.
        w = self.weights
        combined_scores = (
            w.semantic * semantic_scores +
            w.title_match * title_scores +
            w.url_path_match * path_scores +
            w.bm25 * bm25_scores +
            w.content_quality * quality_scores +
            w.pagerank * pagerank_scores +
            w.position_boost * position_scores
        )

        # Materialize one result per chunk with the full score breakdown.
        results = []
        for chunk_idx in range(len(self.index.chunks)):
            results.append(SearchResult(
                chunk=self.index.chunks[chunk_idx],
                score=combined_scores[chunk_idx],
                semantic_score=semantic_scores[chunk_idx],
                title_score=title_scores[chunk_idx],
                path_score=path_scores[chunk_idx],
                bm25_score=bm25_scores[chunk_idx],
                quality_score=quality_scores[chunk_idx],
                pagerank_score=pagerank_scores[chunk_idx],
                position_score=position_scores[chunk_idx]
            ))
        results.sort(key=lambda r: r.score, reverse=True)

        if apply_diversity:
            results = self._apply_diversity(results, top_k)
        return results[:top_k]

    def _apply_diversity(
        self,
        results: list[SearchResult],
        target_k: int
    ) -> list[SearchResult]:
        """
        Deduplicate results from the same document unless they point to
        different sections (headings) within the page.

        Logic:
        1. Keep only one result per unique (doc_path, heading) combination
        2. Limit total chunks per document to weights.max_chunks_per_doc
        """
        seen_sections = set()   # (doc_path, heading) tuples already emitted
        doc_counts = {}         # doc_path -> number of kept chunks
        filtered = []
        for result in results:
            doc_path = result.chunk.doc_path
            heading = result.chunk.heading
            section_key = (doc_path, heading)
            # Skip exact duplicate sections.
            if section_key in seen_sections:
                continue
            # Enforce the per-document cap.
            doc_count = doc_counts.get(doc_path, 0)
            if doc_count >= self.weights.max_chunks_per_doc:
                continue
            seen_sections.add(section_key)
            doc_counts[doc_path] = doc_count + 1
            filtered.append(result)
            # Collect a 2x buffer so callers can still trim to target_k.
            if len(filtered) >= target_k * 2:
                break
        return filtered

    def search_title_only(self, query: str, top_k: int = 10) -> list[SearchResult]:
        """
        Fallback search using only the title index and PageRank.
        Useful when embeddings aren't available.
        """
        query_tokens = tokenize(query)
        if not query_tokens:
            return []
        # Accumulate title-match scores per document.
        doc_scores = {}
        for token in query_tokens:
            if token in self.index.title_index:
                for doc_idx, score in self.index.title_index[token]:
                    doc_path = self.index.documents[doc_idx].path
                    doc_scores[doc_path] = doc_scores.get(doc_path, 0) + score
        # Multiplicative PageRank boost (zero PR leaves the score unchanged).
        for doc_path in doc_scores:
            pr = self.index.pagerank.get(doc_path, 0.0)
            doc_scores[doc_path] *= (1 + pr)
        # Emit the first chunk of each of the top-scoring documents.
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
        results = []
        for doc_path, score in sorted_docs[:top_k]:
            # Linear scan for the document's first chunk.
            for chunk in self.index.chunks:
                if chunk.doc_path == doc_path:
                    results.append(SearchResult(
                        chunk=chunk,
                        score=score,
                        title_score=score
                    ))
                    break
        return results
# ============================================================================
# Reciprocal Rank Fusion (Alternative scoring method)
# ============================================================================
def reciprocal_rank_fusion(
    rankings: list[list[int]],
    k: int = 60
) -> list[tuple[int, float]]:
    """
    Combine multiple rankings using Reciprocal Rank Fusion.

    RRF is an alternative to weighted linear combination that's less
    sensitive to score scale differences: each item contributes
    1 / (k + position) per ranking, with position starting at 1.

    Args:
        rankings: List of rankings (each is a list of chunk indices)
        k: RRF parameter (default 60 works well)

    Returns:
        List of (chunk_idx, rrf_score) tuples sorted by descending score
    """
    fused: dict[int, float] = {}
    for ranking in rankings:
        # enumerate from 1 so the first-ranked item scores 1 / (k + 1).
        for position, chunk_idx in enumerate(ranking, start=1):
            fused[chunk_idx] = fused.get(chunk_idx, 0) + 1 / (k + position)
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)
class RRFSearcher(HybridSearcher):
    """
    Alternative searcher using Reciprocal Rank Fusion instead of
    weighted linear combination.
    """

    def search(
        self,
        query: str,
        top_k: int = 10,
        apply_diversity: bool = True
    ) -> list[SearchResult]:
        """Search by fusing semantic, BM25 and title rankings with RRF."""
        tokens = tokenize(query)
        if not tokens:
            return []

        def ranked(signal_scores) -> list[int]:
            # Indices sorted by descending score.
            return np.argsort(signal_scores)[::-1].tolist()

        rankings: list[list[int]] = []
        # Semantic ranking (requires embeddings plus an embedding backend).
        if self.normalized_embeddings is not None and (self.local_model or self.openai_client):
            query_vec = self.embed_query(query)
            if query_vec is not None:
                rankings.append(ranked(self.compute_semantic_scores(query_vec)))
        # BM25 ranking.
        if self.bm25:
            rankings.append(ranked(self.compute_bm25_scores(tokens)))
        # Title ranking.
        rankings.append(ranked(self.compute_title_scores(tokens)))
        if not rankings:
            return []

        fused = reciprocal_rank_fusion(rankings)
        # Oversample so the diversity filter still leaves top_k results.
        results = [
            SearchResult(chunk=self.index.chunks[chunk_idx], score=rrf_score)
            for chunk_idx, rrf_score in fused[:top_k * 3]
        ]
        if apply_diversity:
            results = self._apply_diversity(results, top_k)
        return results[:top_k]
# ============================================================================
# CLI
# ============================================================================
def format_result(result: "SearchResult", show_text: bool = True) -> str:
    """Render a search result (score, title, path, section, snippet) for display."""
    out = [
        f"\n{'=' * 60}",
        f"Score: {result.score:.3f}",
        f"Title: {result.chunk.doc_title}",
        f"Path: {result.chunk.doc_path}",
        f"Section: {result.chunk.heading}",
    ]
    if show_text:
        # Truncate long chunk text to a 500-character snippet.
        snippet = result.chunk.text[:500]
        if len(result.chunk.text) > 500:
            snippet += "..."
        out.append(f"\n{snippet}")
    return '\n'.join(out)
def main():
    """CLI entry point: parse arguments, load the index, search, print results."""
    parser = argparse.ArgumentParser(
        description="Search documentation using hybrid search"
    )
    parser.add_argument('query', type=str, help='Search query')
    parser.add_argument(
        '--index', type=Path, default=Path('./index.bin'),
        help='Path to index file (default: ./index.bin)'
    )
    parser.add_argument(
        '--top-k', type=int, default=5,
        help='Number of results (default: 5)'
    )
    parser.add_argument(
        '--weights', type=str,
        choices=['default', 'semantic_heavy', 'keyword_heavy', 'authority_heavy'],
        default='default',
        help='Weight preset (default: default)'
    )
    parser.add_argument(
        '--rrf', action='store_true',
        help='Use Reciprocal Rank Fusion instead of weighted combination'
    )
    parser.add_argument(
        '--no-diversity', action='store_true',
        help='Disable diversity penalty'
    )
    parser.add_argument(
        '--debug', action='store_true',
        help='Show detailed scoring breakdown'
    )
    parser.add_argument(
        '--no-text', action='store_true',
        help='Hide result text snippets'
    )
    args = parser.parse_args()

    if not args.index.exists():
        print(f"Error: Index file not found: {args.index}")
        print("Run 'python index.py' first to create the index.")
        sys.exit(1)

    # Load the prebuilt index from disk.
    print(f"Loading index from {args.index}...")
    index = load_index(args.index)
    print(f"Loaded {len(index.chunks)} chunks from {len(index.documents)} documents")

    # 'default' is not a preset key, so .get falls back to DEFAULT_WEIGHTS.
    weights = WEIGHT_PRESETS.get(args.weights, DEFAULT_WEIGHTS)

    searcher_cls = RRFSearcher if args.rrf else HybridSearcher
    searcher = searcher_cls(index, weights)

    print(f"\nSearching for: '{args.query}'")
    results = searcher.search(
        args.query,
        top_k=args.top_k,
        apply_diversity=not args.no_diversity
    )
    if not results:
        print("No results found.")
        return

    print(f"\nFound {len(results)} results:")
    for rank, result in enumerate(results, 1):
        print(f"\n--- Result {rank} ---")
        print(format_result(result, show_text=not args.no_text))
        if args.debug:
            print("\nScore breakdown:")
            for label, value in (
                ("Semantic", result.semantic_score),
                ("Title match", result.title_score),
                ("Path match", result.path_score),
                ("BM25", result.bm25_score),
                ("Quality", result.quality_score),
                ("PageRank", result.pagerank_score),
                ("Position", result.position_score),
            ):
                print(f" {label}: {value:.3f}")


if __name__ == '__main__':
    main()