mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
added embedded store search
This commit is contained in:
@@ -9,6 +9,7 @@ import prisma.enums
|
||||
|
||||
import backend.server.v2.store.cache as store_cache
|
||||
import backend.server.v2.store.db
|
||||
import backend.server.v2.store.embeddings as store_embeddings
|
||||
import backend.server.v2.store.model
|
||||
import backend.util.json
|
||||
|
||||
@@ -152,3 +153,54 @@ async def admin_download_agent_file(
|
||||
return fastapi.responses.FileResponse(
|
||||
tmp_file.name, filename=file_name, media_type="application/json"
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/embeddings/stats",
|
||||
summary="Get Embedding Statistics",
|
||||
)
|
||||
async def get_embedding_stats() -> dict[str, typing.Any]:
|
||||
"""
|
||||
Get statistics about embedding coverage for store listings.
|
||||
|
||||
Returns counts of total approved listings, listings with embeddings,
|
||||
listings without embeddings, and coverage percentage.
|
||||
"""
|
||||
try:
|
||||
stats = await store_embeddings.get_embedding_stats()
|
||||
return stats
|
||||
except Exception as e:
|
||||
logger.exception("Error getting embedding stats: %s", e)
|
||||
raise fastapi.HTTPException(
|
||||
status_code=500,
|
||||
detail="An error occurred while retrieving embedding stats",
|
||||
)
|
||||
|
||||
|
||||
@router.post(
|
||||
"/embeddings/backfill",
|
||||
summary="Backfill Missing Embeddings",
|
||||
)
|
||||
async def backfill_embeddings(
|
||||
batch_size: int = 10,
|
||||
) -> dict[str, typing.Any]:
|
||||
"""
|
||||
Trigger backfill of embeddings for approved listings that don't have them.
|
||||
|
||||
Args:
|
||||
batch_size: Number of embeddings to generate in one call (default 10)
|
||||
|
||||
Returns:
|
||||
Dict with processed count, success count, failure count, and message
|
||||
"""
|
||||
try:
|
||||
result = await store_embeddings.backfill_missing_embeddings(
|
||||
batch_size=batch_size
|
||||
)
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.exception("Error backfilling embeddings: %s", e)
|
||||
raise fastapi.HTTPException(
|
||||
status_code=500,
|
||||
detail="An error occurred while backfilling embeddings",
|
||||
)
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import typing
|
||||
from datetime import datetime, timezone
|
||||
from typing import Literal
|
||||
|
||||
@@ -12,7 +11,7 @@ import prisma.types
|
||||
|
||||
import backend.server.v2.store.exceptions
|
||||
import backend.server.v2.store.model
|
||||
from backend.data.db import query_raw_with_schema, transaction
|
||||
from backend.data.db import transaction
|
||||
from backend.data.graph import (
|
||||
GraphMeta,
|
||||
GraphModel,
|
||||
@@ -56,95 +55,21 @@ async def get_store_agents(
|
||||
)
|
||||
|
||||
try:
|
||||
# If search_query is provided, use full-text search
|
||||
# If search_query is provided, use hybrid search (embeddings + tsvector)
|
||||
if search_query:
|
||||
offset = (page - 1) * page_size
|
||||
from backend.server.v2.store.hybrid_search import hybrid_search
|
||||
|
||||
# Whitelist allowed order_by columns
|
||||
ALLOWED_ORDER_BY = {
|
||||
"rating": "rating DESC, rank DESC",
|
||||
"runs": "runs DESC, rank DESC",
|
||||
"name": "agent_name ASC, rank ASC",
|
||||
"updated_at": "updated_at DESC, rank DESC",
|
||||
}
|
||||
# Use hybrid search combining semantic and lexical signals
|
||||
agents, total = await hybrid_search(
|
||||
query=search_query,
|
||||
featured=featured,
|
||||
creators=creators,
|
||||
category=category,
|
||||
sorted_by="relevance", # Use hybrid scoring for relevance
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
)
|
||||
|
||||
# Validate and get order clause
|
||||
if sorted_by and sorted_by in ALLOWED_ORDER_BY:
|
||||
order_by_clause = ALLOWED_ORDER_BY[sorted_by]
|
||||
else:
|
||||
order_by_clause = "updated_at DESC, rank DESC"
|
||||
|
||||
# Build WHERE conditions and parameters list
|
||||
where_parts: list[str] = []
|
||||
params: list[typing.Any] = [search_query] # $1 - search term
|
||||
param_index = 2 # Start at $2 for next parameter
|
||||
|
||||
# Always filter for available agents
|
||||
where_parts.append("is_available = true")
|
||||
|
||||
if featured:
|
||||
where_parts.append("featured = true")
|
||||
|
||||
if creators and creators:
|
||||
# Use ANY with array parameter
|
||||
where_parts.append(f"creator_username = ANY(${param_index})")
|
||||
params.append(creators)
|
||||
param_index += 1
|
||||
|
||||
if category and category:
|
||||
where_parts.append(f"${param_index} = ANY(categories)")
|
||||
params.append(category)
|
||||
param_index += 1
|
||||
|
||||
sql_where_clause: str = " AND ".join(where_parts) if where_parts else "1=1"
|
||||
|
||||
# Add pagination params
|
||||
params.extend([page_size, offset])
|
||||
limit_param = f"${param_index}"
|
||||
offset_param = f"${param_index + 1}"
|
||||
|
||||
# Execute full-text search query with parameterized values
|
||||
sql_query = f"""
|
||||
SELECT
|
||||
slug,
|
||||
agent_name,
|
||||
agent_image,
|
||||
creator_username,
|
||||
creator_avatar,
|
||||
sub_heading,
|
||||
description,
|
||||
runs,
|
||||
rating,
|
||||
categories,
|
||||
featured,
|
||||
is_available,
|
||||
updated_at,
|
||||
ts_rank_cd(search, query) AS rank
|
||||
FROM {{schema_prefix}}"StoreAgent",
|
||||
plainto_tsquery('english', $1) AS query
|
||||
WHERE {sql_where_clause}
|
||||
AND search @@ query
|
||||
ORDER BY {order_by_clause}
|
||||
LIMIT {limit_param} OFFSET {offset_param}
|
||||
"""
|
||||
|
||||
# Count query for pagination - only uses search term parameter
|
||||
count_query = f"""
|
||||
SELECT COUNT(*) as count
|
||||
FROM {{schema_prefix}}"StoreAgent",
|
||||
plainto_tsquery('english', $1) AS query
|
||||
WHERE {sql_where_clause}
|
||||
AND search @@ query
|
||||
"""
|
||||
|
||||
# Execute both queries with parameters
|
||||
agents = await query_raw_with_schema(sql_query, *params)
|
||||
|
||||
# For count, use params without pagination (last 2 params)
|
||||
count_params = params[:-2]
|
||||
count_result = await query_raw_with_schema(count_query, *count_params)
|
||||
|
||||
total = count_result[0]["count"] if count_result else 0
|
||||
total_pages = (total + page_size - 1) // page_size
|
||||
|
||||
# Convert raw results to StoreAgent models
|
||||
@@ -1539,6 +1464,24 @@ async def review_store_submission(
|
||||
},
|
||||
)
|
||||
|
||||
# Generate embedding for approved listing (non-blocking)
|
||||
try:
|
||||
from backend.server.v2.store.embeddings import ensure_embedding
|
||||
|
||||
await ensure_embedding(
|
||||
version_id=store_listing_version_id,
|
||||
name=store_listing_version.name,
|
||||
description=store_listing_version.description,
|
||||
sub_heading=store_listing_version.subHeading,
|
||||
categories=store_listing_version.categories or [],
|
||||
)
|
||||
except Exception as e:
|
||||
# Don't fail approval if embedding generation fails
|
||||
logger.warning(
|
||||
f"Failed to generate embedding for approved listing "
|
||||
f"{store_listing_version_id}: {e}"
|
||||
)
|
||||
|
||||
# If rejecting an approved agent, update the StoreListing accordingly
|
||||
if is_rejecting_approved:
|
||||
# Check if there are other approved versions
|
||||
|
||||
406
autogpt_platform/backend/backend/server/v2/store/embeddings.py
Normal file
406
autogpt_platform/backend/backend/server/v2/store/embeddings.py
Normal file
@@ -0,0 +1,406 @@
|
||||
"""
|
||||
Store Listing Embeddings Service
|
||||
|
||||
Handles generation and storage of OpenAI embeddings for store listings
|
||||
to enable semantic/hybrid search.
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import prisma
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# OpenAI embedding model configuration
|
||||
EMBEDDING_MODEL = "text-embedding-3-small"
|
||||
EMBEDDING_DIM = 1536
|
||||
|
||||
|
||||
def build_searchable_text(
|
||||
name: str,
|
||||
description: str,
|
||||
sub_heading: str,
|
||||
categories: list[str],
|
||||
) -> str:
|
||||
"""
|
||||
Build searchable text from listing version fields.
|
||||
|
||||
Combines relevant fields into a single string for embedding.
|
||||
"""
|
||||
parts = []
|
||||
|
||||
# Name is important - include it
|
||||
if name:
|
||||
parts.append(name)
|
||||
|
||||
# Sub-heading provides context
|
||||
if sub_heading:
|
||||
parts.append(sub_heading)
|
||||
|
||||
# Description is the main content
|
||||
if description:
|
||||
parts.append(description)
|
||||
|
||||
# Categories help with semantic matching
|
||||
if categories:
|
||||
parts.append(" ".join(categories))
|
||||
|
||||
return " ".join(parts)
|
||||
|
||||
|
||||
def compute_content_hash(text: str) -> str:
|
||||
"""Compute MD5 hash of text for change detection."""
|
||||
return hashlib.md5(text.encode()).hexdigest()
|
||||
|
||||
|
||||
async def generate_embedding(text: str) -> list[float] | None:
|
||||
"""
|
||||
Generate embedding for text using OpenAI API.
|
||||
|
||||
Returns None if embedding generation fails.
|
||||
"""
|
||||
try:
|
||||
from openai import OpenAI
|
||||
|
||||
api_key = os.getenv("OPENAI_API_KEY")
|
||||
if not api_key:
|
||||
logger.warning("OPENAI_API_KEY not set, cannot generate embedding")
|
||||
return None
|
||||
|
||||
client = OpenAI(api_key=api_key)
|
||||
|
||||
# Truncate text to avoid token limits (~32k chars for safety)
|
||||
truncated_text = text[:32000]
|
||||
|
||||
response = client.embeddings.create(
|
||||
model=EMBEDDING_MODEL,
|
||||
input=truncated_text,
|
||||
)
|
||||
|
||||
embedding = response.data[0].embedding
|
||||
logger.debug(f"Generated embedding with {len(embedding)} dimensions")
|
||||
return embedding
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to generate embedding: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def store_embedding(
|
||||
version_id: str,
|
||||
embedding: list[float],
|
||||
searchable_text: str,
|
||||
content_hash: str,
|
||||
tx: prisma.Prisma | None = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Store embedding in the database.
|
||||
|
||||
Uses raw SQL since Prisma doesn't natively support pgvector.
|
||||
"""
|
||||
try:
|
||||
client = tx if tx else prisma.get_client()
|
||||
|
||||
# Convert embedding to PostgreSQL vector format
|
||||
embedding_str = "[" + ",".join(str(x) for x in embedding) + "]"
|
||||
|
||||
# Upsert the embedding
|
||||
await client.execute_raw(
|
||||
"""
|
||||
INSERT INTO "StoreListingEmbedding" (
|
||||
"id", "storeListingVersionId", "embedding",
|
||||
"searchableText", "contentHash", "createdAt", "updatedAt"
|
||||
)
|
||||
VALUES (
|
||||
gen_random_uuid(), $1, $2::vector,
|
||||
$3, $4, NOW(), NOW()
|
||||
)
|
||||
ON CONFLICT ("storeListingVersionId")
|
||||
DO UPDATE SET
|
||||
"embedding" = $2::vector,
|
||||
"searchableText" = $3,
|
||||
"contentHash" = $4,
|
||||
"updatedAt" = NOW()
|
||||
""",
|
||||
version_id,
|
||||
embedding_str,
|
||||
searchable_text,
|
||||
content_hash,
|
||||
)
|
||||
|
||||
logger.info(f"Stored embedding for version {version_id}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to store embedding for version {version_id}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def get_embedding(version_id: str) -> dict[str, Any] | None:
|
||||
"""
|
||||
Retrieve embedding record for a listing version.
|
||||
|
||||
Returns dict with embedding, searchableText, contentHash or None if not found.
|
||||
"""
|
||||
try:
|
||||
client = prisma.get_client()
|
||||
|
||||
result = await client.query_raw(
|
||||
"""
|
||||
SELECT
|
||||
"id",
|
||||
"storeListingVersionId",
|
||||
"embedding"::text as "embedding",
|
||||
"searchableText",
|
||||
"contentHash",
|
||||
"createdAt",
|
||||
"updatedAt"
|
||||
FROM "StoreListingEmbedding"
|
||||
WHERE "storeListingVersionId" = $1
|
||||
""",
|
||||
version_id,
|
||||
)
|
||||
|
||||
if result and len(result) > 0:
|
||||
return result[0]
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get embedding for version {version_id}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def ensure_embedding(
|
||||
version_id: str,
|
||||
name: str,
|
||||
description: str,
|
||||
sub_heading: str,
|
||||
categories: list[str],
|
||||
force: bool = False,
|
||||
tx: prisma.Prisma | None = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Ensure an embedding exists for the listing version.
|
||||
|
||||
Creates embedding if missing or if content has changed.
|
||||
Skips if content hash matches existing embedding.
|
||||
|
||||
Args:
|
||||
version_id: The StoreListingVersion ID
|
||||
name: Agent name
|
||||
description: Agent description
|
||||
sub_heading: Agent sub-heading
|
||||
categories: Agent categories
|
||||
force: Force regeneration even if hash matches
|
||||
tx: Optional transaction client
|
||||
|
||||
Returns:
|
||||
True if embedding exists/was created, False on failure
|
||||
"""
|
||||
try:
|
||||
# Build searchable text and compute hash
|
||||
searchable_text = build_searchable_text(
|
||||
name, description, sub_heading, categories
|
||||
)
|
||||
content_hash = compute_content_hash(searchable_text)
|
||||
|
||||
# Check if embedding already exists with same hash
|
||||
if not force:
|
||||
existing = await get_embedding(version_id)
|
||||
if existing and existing.get("contentHash") == content_hash:
|
||||
logger.debug(
|
||||
f"Embedding for version {version_id} is up to date (hash match)"
|
||||
)
|
||||
return True
|
||||
|
||||
# Generate new embedding
|
||||
embedding = await generate_embedding(searchable_text)
|
||||
if embedding is None:
|
||||
logger.warning(f"Could not generate embedding for version {version_id}")
|
||||
return False
|
||||
|
||||
# Store the embedding
|
||||
return await store_embedding(
|
||||
version_id=version_id,
|
||||
embedding=embedding,
|
||||
searchable_text=searchable_text,
|
||||
content_hash=content_hash,
|
||||
tx=tx,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to ensure embedding for version {version_id}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def delete_embedding(version_id: str) -> bool:
|
||||
"""
|
||||
Delete embedding for a listing version.
|
||||
|
||||
Note: This is usually handled automatically by CASCADE delete,
|
||||
but provided for manual cleanup if needed.
|
||||
"""
|
||||
try:
|
||||
client = prisma.get_client()
|
||||
|
||||
await client.execute_raw(
|
||||
"""
|
||||
DELETE FROM "StoreListingEmbedding"
|
||||
WHERE "storeListingVersionId" = $1
|
||||
""",
|
||||
version_id,
|
||||
)
|
||||
|
||||
logger.info(f"Deleted embedding for version {version_id}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete embedding for version {version_id}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def get_embedding_stats() -> dict[str, Any]:
|
||||
"""
|
||||
Get statistics about embedding coverage.
|
||||
|
||||
Returns counts of:
|
||||
- Total approved listing versions
|
||||
- Versions with embeddings
|
||||
- Versions without embeddings
|
||||
"""
|
||||
try:
|
||||
client = prisma.get_client()
|
||||
|
||||
# Count approved versions
|
||||
approved_result = await client.query_raw(
|
||||
"""
|
||||
SELECT COUNT(*) as count
|
||||
FROM "StoreListingVersion"
|
||||
WHERE "submissionStatus" = 'APPROVED'
|
||||
AND "isDeleted" = false
|
||||
"""
|
||||
)
|
||||
total_approved = approved_result[0]["count"] if approved_result else 0
|
||||
|
||||
# Count versions with embeddings
|
||||
embedded_result = await client.query_raw(
|
||||
"""
|
||||
SELECT COUNT(*) as count
|
||||
FROM "StoreListingVersion" slv
|
||||
JOIN "StoreListingEmbedding" sle ON slv.id = sle."storeListingVersionId"
|
||||
WHERE slv."submissionStatus" = 'APPROVED'
|
||||
AND slv."isDeleted" = false
|
||||
"""
|
||||
)
|
||||
with_embeddings = embedded_result[0]["count"] if embedded_result else 0
|
||||
|
||||
return {
|
||||
"total_approved": total_approved,
|
||||
"with_embeddings": with_embeddings,
|
||||
"without_embeddings": total_approved - with_embeddings,
|
||||
"coverage_percent": (
|
||||
round(with_embeddings / total_approved * 100, 1)
|
||||
if total_approved > 0
|
||||
else 0
|
||||
),
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get embedding stats: {e}")
|
||||
return {
|
||||
"total_approved": 0,
|
||||
"with_embeddings": 0,
|
||||
"without_embeddings": 0,
|
||||
"coverage_percent": 0,
|
||||
"error": str(e),
|
||||
}
|
||||
|
||||
|
||||
async def backfill_missing_embeddings(batch_size: int = 10) -> dict[str, Any]:
|
||||
"""
|
||||
Generate embeddings for approved listings that don't have them.
|
||||
|
||||
Args:
|
||||
batch_size: Number of embeddings to generate in one call
|
||||
|
||||
Returns:
|
||||
Dict with success/failure counts
|
||||
"""
|
||||
try:
|
||||
client = prisma.get_client()
|
||||
|
||||
# Find approved versions without embeddings
|
||||
missing = await client.query_raw(
|
||||
"""
|
||||
SELECT
|
||||
slv.id,
|
||||
slv.name,
|
||||
slv.description,
|
||||
slv."subHeading",
|
||||
slv.categories
|
||||
FROM "StoreListingVersion" slv
|
||||
LEFT JOIN "StoreListingEmbedding" sle ON slv.id = sle."storeListingVersionId"
|
||||
WHERE slv."submissionStatus" = 'APPROVED'
|
||||
AND slv."isDeleted" = false
|
||||
AND sle.id IS NULL
|
||||
LIMIT $1
|
||||
""",
|
||||
batch_size,
|
||||
)
|
||||
|
||||
if not missing:
|
||||
return {
|
||||
"processed": 0,
|
||||
"success": 0,
|
||||
"failed": 0,
|
||||
"message": "No missing embeddings",
|
||||
}
|
||||
|
||||
success = 0
|
||||
failed = 0
|
||||
|
||||
for row in missing:
|
||||
result = await ensure_embedding(
|
||||
version_id=row["id"],
|
||||
name=row["name"],
|
||||
description=row["description"],
|
||||
sub_heading=row["subHeading"],
|
||||
categories=row["categories"] or [],
|
||||
)
|
||||
if result:
|
||||
success += 1
|
||||
else:
|
||||
failed += 1
|
||||
|
||||
return {
|
||||
"processed": len(missing),
|
||||
"success": success,
|
||||
"failed": failed,
|
||||
"message": f"Backfilled {success} embeddings, {failed} failed",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to backfill embeddings: {e}")
|
||||
return {
|
||||
"processed": 0,
|
||||
"success": 0,
|
||||
"failed": 0,
|
||||
"error": str(e),
|
||||
}
|
||||
|
||||
|
||||
async def embed_query(query: str) -> list[float] | None:
|
||||
"""
|
||||
Generate embedding for a search query.
|
||||
|
||||
Same as generate_embedding but with clearer intent.
|
||||
"""
|
||||
return await generate_embedding(query)
|
||||
|
||||
|
||||
def embedding_to_vector_string(embedding: list[float]) -> str:
|
||||
"""Convert embedding list to PostgreSQL vector string format."""
|
||||
return "[" + ",".join(str(x) for x in embedding) + "]"
|
||||
@@ -0,0 +1,434 @@
|
||||
"""
|
||||
Hybrid Search for Store Agents
|
||||
|
||||
Combines semantic (embedding) search with lexical (tsvector) search
|
||||
for improved relevance in marketplace agent discovery.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any, Literal
|
||||
|
||||
import prisma
|
||||
|
||||
from backend.server.v2.store.embeddings import embed_query, embedding_to_vector_string
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HybridSearchWeights:
|
||||
"""Weights for combining search signals."""
|
||||
|
||||
semantic: float = 0.35 # Embedding cosine similarity
|
||||
lexical: float = 0.35 # tsvector ts_rank_cd score
|
||||
category: float = 0.20 # Category match boost
|
||||
recency: float = 0.10 # Newer agents ranked higher
|
||||
|
||||
|
||||
DEFAULT_WEIGHTS = HybridSearchWeights()
|
||||
|
||||
# Minimum relevance score threshold - agents below this are filtered out
|
||||
# With weights (0.35 semantic + 0.35 lexical + 0.20 category + 0.10 recency):
|
||||
# - 0.20 means at least ~50% semantic match OR strong lexical match required
|
||||
# - Ensures only genuinely relevant results are returned
|
||||
# - Recency alone (0.10 max) won't pass the threshold
|
||||
DEFAULT_MIN_SCORE = 0.20
|
||||
|
||||
|
||||
@dataclass
|
||||
class HybridSearchResult:
|
||||
"""A single search result with score breakdown."""
|
||||
|
||||
slug: str
|
||||
agent_name: str
|
||||
agent_image: str
|
||||
creator_username: str
|
||||
creator_avatar: str
|
||||
sub_heading: str
|
||||
description: str
|
||||
runs: int
|
||||
rating: float
|
||||
categories: list[str]
|
||||
featured: bool
|
||||
is_available: bool
|
||||
updated_at: datetime
|
||||
|
||||
# Score breakdown (for debugging/tuning)
|
||||
combined_score: float
|
||||
semantic_score: float = 0.0
|
||||
lexical_score: float = 0.0
|
||||
category_score: float = 0.0
|
||||
recency_score: float = 0.0
|
||||
|
||||
|
||||
async def hybrid_search(
|
||||
query: str,
|
||||
featured: bool = False,
|
||||
creators: list[str] | None = None,
|
||||
category: str | None = None,
|
||||
sorted_by: (
|
||||
Literal["relevance", "rating", "runs", "name", "updated_at"] | None
|
||||
) = None,
|
||||
page: int = 1,
|
||||
page_size: int = 20,
|
||||
weights: HybridSearchWeights | None = None,
|
||||
min_score: float | None = None,
|
||||
) -> tuple[list[dict[str, Any]], int]:
|
||||
"""
|
||||
Perform hybrid search combining semantic and lexical signals.
|
||||
|
||||
Args:
|
||||
query: Search query string
|
||||
featured: Filter for featured agents only
|
||||
creators: Filter by creator usernames
|
||||
category: Filter by category
|
||||
sorted_by: Sort order (relevance uses hybrid scoring)
|
||||
page: Page number (1-indexed)
|
||||
page_size: Results per page
|
||||
weights: Custom weights for search signals
|
||||
min_score: Minimum relevance score threshold (0-1). Results below
|
||||
this score are filtered out. Defaults to DEFAULT_MIN_SCORE.
|
||||
|
||||
Returns:
|
||||
Tuple of (results list, total count). Returns empty list if no
|
||||
results meet the minimum relevance threshold.
|
||||
"""
|
||||
if weights is None:
|
||||
weights = DEFAULT_WEIGHTS
|
||||
if min_score is None:
|
||||
min_score = DEFAULT_MIN_SCORE
|
||||
|
||||
offset = (page - 1) * page_size
|
||||
client = prisma.get_client()
|
||||
|
||||
# Generate query embedding
|
||||
query_embedding = await embed_query(query)
|
||||
|
||||
# Build WHERE clause conditions
|
||||
where_parts: list[str] = ["sa.is_available = true"]
|
||||
params: list[Any] = []
|
||||
param_index = 1
|
||||
|
||||
# Add search query for lexical matching
|
||||
params.append(query)
|
||||
query_param = f"${param_index}"
|
||||
param_index += 1
|
||||
|
||||
if featured:
|
||||
where_parts.append("sa.featured = true")
|
||||
|
||||
if creators:
|
||||
where_parts.append(f"sa.creator_username = ANY(${param_index})")
|
||||
params.append(creators)
|
||||
param_index += 1
|
||||
|
||||
if category:
|
||||
where_parts.append(f"${param_index} = ANY(sa.categories)")
|
||||
params.append(category)
|
||||
param_index += 1
|
||||
|
||||
where_clause = " AND ".join(where_parts)
|
||||
|
||||
# Determine if we can use hybrid search (have query embedding)
|
||||
use_hybrid = query_embedding is not None
|
||||
|
||||
if use_hybrid:
|
||||
# Add embedding parameter
|
||||
embedding_str = embedding_to_vector_string(query_embedding)
|
||||
params.append(embedding_str)
|
||||
embedding_param = f"${param_index}"
|
||||
param_index += 1
|
||||
|
||||
# Build hybrid search query with weighted scoring
|
||||
# The semantic score is (1 - cosine_distance), normalized to [0,1]
|
||||
# The lexical score is ts_rank_cd, normalized by max value
|
||||
sql_query = f"""
|
||||
WITH search_scores AS (
|
||||
SELECT
|
||||
sa.*,
|
||||
-- Semantic score: cosine similarity (1 - distance)
|
||||
COALESCE(1 - (sle.embedding <=> {embedding_param}::vector), 0) as semantic_score,
|
||||
-- Lexical score: ts_rank_cd normalized
|
||||
COALESCE(ts_rank_cd(sa.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw,
|
||||
-- Category match: 1 if query term appears in categories, else 0
|
||||
CASE
|
||||
WHEN EXISTS (
|
||||
SELECT 1 FROM unnest(sa.categories) cat
|
||||
WHERE LOWER(cat) LIKE '%' || LOWER({query_param}) || '%'
|
||||
) THEN 1.0
|
||||
ELSE 0.0
|
||||
END as category_score,
|
||||
-- Recency score: exponential decay over 90 days
|
||||
EXP(-EXTRACT(EPOCH FROM (NOW() - sa.updated_at)) / (90 * 24 * 3600)) as recency_score
|
||||
FROM "StoreAgent" sa
|
||||
LEFT JOIN "StoreListing" sl ON sa.slug = sl.slug
|
||||
LEFT JOIN "StoreListingVersion" slv ON sl."activeVersionId" = slv.id
|
||||
LEFT JOIN "StoreListingEmbedding" sle ON slv.id = sle."storeListingVersionId"
|
||||
WHERE {where_clause}
|
||||
AND (
|
||||
sa.search @@ plainto_tsquery('english', {query_param})
|
||||
OR sle.embedding IS NOT NULL
|
||||
)
|
||||
),
|
||||
normalized AS (
|
||||
SELECT
|
||||
*,
|
||||
-- Normalize lexical score by max in result set
|
||||
CASE
|
||||
WHEN MAX(lexical_raw) OVER () > 0
|
||||
THEN lexical_raw / MAX(lexical_raw) OVER ()
|
||||
ELSE 0
|
||||
END as lexical_score
|
||||
FROM search_scores
|
||||
),
|
||||
scored AS (
|
||||
SELECT
|
||||
slug,
|
||||
agent_name,
|
||||
agent_image,
|
||||
creator_username,
|
||||
creator_avatar,
|
||||
sub_heading,
|
||||
description,
|
||||
runs,
|
||||
rating,
|
||||
categories,
|
||||
featured,
|
||||
is_available,
|
||||
updated_at,
|
||||
semantic_score,
|
||||
lexical_score,
|
||||
category_score,
|
||||
recency_score,
|
||||
(
|
||||
{weights.semantic} * semantic_score +
|
||||
{weights.lexical} * lexical_score +
|
||||
{weights.category} * category_score +
|
||||
{weights.recency} * recency_score
|
||||
) as combined_score
|
||||
FROM normalized
|
||||
)
|
||||
SELECT * FROM scored
|
||||
WHERE combined_score >= {min_score}
|
||||
ORDER BY combined_score DESC
|
||||
LIMIT ${param_index} OFFSET ${param_index + 1}
|
||||
"""
|
||||
|
||||
# Add pagination params
|
||||
params.extend([page_size, offset])
|
||||
|
||||
# Count query - must also filter by min_score
|
||||
count_query = f"""
|
||||
WITH search_scores AS (
|
||||
SELECT
|
||||
sa.slug,
|
||||
COALESCE(1 - (sle.embedding <=> {embedding_param}::vector), 0) as semantic_score,
|
||||
COALESCE(ts_rank_cd(sa.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw,
|
||||
CASE
|
||||
WHEN EXISTS (
|
||||
SELECT 1 FROM unnest(sa.categories) cat
|
||||
WHERE LOWER(cat) LIKE '%' || LOWER({query_param}) || '%'
|
||||
) THEN 1.0
|
||||
ELSE 0.0
|
||||
END as category_score,
|
||||
EXP(-EXTRACT(EPOCH FROM (NOW() - sa.updated_at)) / (90 * 24 * 3600)) as recency_score
|
||||
FROM "StoreAgent" sa
|
||||
LEFT JOIN "StoreListing" sl ON sa.slug = sl.slug
|
||||
LEFT JOIN "StoreListingVersion" slv ON sl."activeVersionId" = slv.id
|
||||
LEFT JOIN "StoreListingEmbedding" sle ON slv.id = sle."storeListingVersionId"
|
||||
WHERE {where_clause}
|
||||
AND (
|
||||
sa.search @@ plainto_tsquery('english', {query_param})
|
||||
OR sle.embedding IS NOT NULL
|
||||
)
|
||||
),
|
||||
normalized AS (
|
||||
SELECT
|
||||
slug,
|
||||
semantic_score,
|
||||
category_score,
|
||||
recency_score,
|
||||
CASE
|
||||
WHEN MAX(lexical_raw) OVER () > 0
|
||||
THEN lexical_raw / MAX(lexical_raw) OVER ()
|
||||
ELSE 0
|
||||
END as lexical_score
|
||||
FROM search_scores
|
||||
),
|
||||
scored AS (
|
||||
SELECT
|
||||
slug,
|
||||
(
|
||||
{weights.semantic} * semantic_score +
|
||||
{weights.lexical} * lexical_score +
|
||||
{weights.category} * category_score +
|
||||
{weights.recency} * recency_score
|
||||
) as combined_score
|
||||
FROM normalized
|
||||
)
|
||||
SELECT COUNT(*) as count FROM scored
|
||||
WHERE combined_score >= {min_score}
|
||||
"""
|
||||
|
||||
else:
|
||||
# Fallback to lexical-only search (existing behavior)
|
||||
# Note: For lexical-only, we still require tsvector match but don't
|
||||
# apply min_score since ts_rank_cd isn't normalized to [0,1]
|
||||
logger.warning("Falling back to lexical-only search (no query embedding)")
|
||||
|
||||
sql_query = f"""
|
||||
WITH lexical_scores AS (
|
||||
SELECT
|
||||
slug,
|
||||
agent_name,
|
||||
agent_image,
|
||||
creator_username,
|
||||
creator_avatar,
|
||||
sub_heading,
|
||||
description,
|
||||
runs,
|
||||
rating,
|
||||
categories,
|
||||
featured,
|
||||
is_available,
|
||||
updated_at,
|
||||
0.0 as semantic_score,
|
||||
ts_rank_cd(search, plainto_tsquery('english', {query_param})) as lexical_raw,
|
||||
CASE
|
||||
WHEN EXISTS (
|
||||
SELECT 1 FROM unnest(categories) cat
|
||||
WHERE LOWER(cat) LIKE '%' || LOWER({query_param}) || '%'
|
||||
) THEN 1.0
|
||||
ELSE 0.0
|
||||
END as category_score,
|
||||
EXP(-EXTRACT(EPOCH FROM (NOW() - updated_at)) / (90 * 24 * 3600)) as recency_score
|
||||
FROM "StoreAgent" sa
|
||||
WHERE {where_clause}
|
||||
AND search @@ plainto_tsquery('english', {query_param})
|
||||
),
|
||||
normalized AS (
|
||||
SELECT
|
||||
*,
|
||||
CASE
|
||||
WHEN MAX(lexical_raw) OVER () > 0
|
||||
THEN lexical_raw / MAX(lexical_raw) OVER ()
|
||||
ELSE 0
|
||||
END as lexical_score
|
||||
FROM lexical_scores
|
||||
),
|
||||
scored AS (
|
||||
SELECT
|
||||
slug,
|
||||
agent_name,
|
||||
agent_image,
|
||||
creator_username,
|
||||
creator_avatar,
|
||||
sub_heading,
|
||||
description,
|
||||
runs,
|
||||
rating,
|
||||
categories,
|
||||
featured,
|
||||
is_available,
|
||||
updated_at,
|
||||
semantic_score,
|
||||
lexical_score,
|
||||
category_score,
|
||||
recency_score,
|
||||
(
|
||||
{weights.lexical} * lexical_score +
|
||||
{weights.category} * category_score +
|
||||
{weights.recency} * recency_score
|
||||
) as combined_score
|
||||
FROM normalized
|
||||
)
|
||||
SELECT * FROM scored
|
||||
WHERE combined_score >= {min_score}
|
||||
ORDER BY combined_score DESC
|
||||
LIMIT ${param_index} OFFSET ${param_index + 1}
|
||||
"""
|
||||
|
||||
params.extend([page_size, offset])
|
||||
|
||||
count_query = f"""
|
||||
WITH lexical_scores AS (
|
||||
SELECT
|
||||
slug,
|
||||
ts_rank_cd(search, plainto_tsquery('english', {query_param})) as lexical_raw,
|
||||
CASE
|
||||
WHEN EXISTS (
|
||||
SELECT 1 FROM unnest(categories) cat
|
||||
WHERE LOWER(cat) LIKE '%' || LOWER({query_param}) || '%'
|
||||
) THEN 1.0
|
||||
ELSE 0.0
|
||||
END as category_score,
|
||||
EXP(-EXTRACT(EPOCH FROM (NOW() - updated_at)) / (90 * 24 * 3600)) as recency_score
|
||||
FROM "StoreAgent" sa
|
||||
WHERE {where_clause}
|
||||
AND search @@ plainto_tsquery('english', {query_param})
|
||||
),
|
||||
normalized AS (
|
||||
SELECT
|
||||
slug,
|
||||
category_score,
|
||||
recency_score,
|
||||
CASE
|
||||
WHEN MAX(lexical_raw) OVER () > 0
|
||||
THEN lexical_raw / MAX(lexical_raw) OVER ()
|
||||
ELSE 0
|
||||
END as lexical_score
|
||||
FROM lexical_scores
|
||||
),
|
||||
scored AS (
|
||||
SELECT
|
||||
slug,
|
||||
(
|
||||
{weights.lexical} * lexical_score +
|
||||
{weights.category} * category_score +
|
||||
{weights.recency} * recency_score
|
||||
) as combined_score
|
||||
FROM normalized
|
||||
)
|
||||
SELECT COUNT(*) as count FROM scored
|
||||
WHERE combined_score >= {min_score}
|
||||
"""
|
||||
|
||||
try:
|
||||
# Execute search query
|
||||
# Dynamic SQL is safe here - all user inputs are parameterized ($1, $2, etc.)
|
||||
results = await client.query_raw(sql_query, *params) # type: ignore[arg-type]
|
||||
|
||||
# Execute count query (without pagination params)
|
||||
count_params = params[:-2] # Remove LIMIT and OFFSET params
|
||||
count_result = await client.query_raw(count_query, *count_params) # type: ignore[arg-type]
|
||||
total = count_result[0]["count"] if count_result else 0
|
||||
|
||||
logger.info(
|
||||
f"Hybrid search for '{query}': {len(results)} results, {total} total "
|
||||
f"(hybrid={use_hybrid})"
|
||||
)
|
||||
|
||||
return results, total
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Hybrid search failed: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def hybrid_search_simple(
|
||||
query: str,
|
||||
page: int = 1,
|
||||
page_size: int = 20,
|
||||
) -> tuple[list[dict[str, Any]], int]:
|
||||
"""
|
||||
Simplified hybrid search for common use cases.
|
||||
|
||||
Uses default weights and no filters.
|
||||
"""
|
||||
return await hybrid_search(
|
||||
query=query,
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
)
|
||||
@@ -0,0 +1,41 @@
|
||||
-- Migration: Add pgvector extension and StoreListingEmbedding table
|
||||
-- This enables hybrid search combining semantic (embedding) and lexical (tsvector) search
|
||||
|
||||
-- Enable pgvector extension for vector similarity search
|
||||
CREATE EXTENSION IF NOT EXISTS vector;
|
||||
|
||||
-- Create table to store embeddings for store listing versions
|
||||
CREATE TABLE "StoreListingEmbedding" (
|
||||
"id" TEXT NOT NULL DEFAULT gen_random_uuid(),
|
||||
"storeListingVersionId" TEXT NOT NULL,
|
||||
"embedding" vector(1536), -- OpenAI text-embedding-3-small produces 1536 dimensions
|
||||
"searchableText" TEXT, -- The text that was embedded (for debugging/recomputation)
|
||||
"contentHash" TEXT, -- MD5 hash of searchable text for change detection
|
||||
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
|
||||
CONSTRAINT "StoreListingEmbedding_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- Unique constraint: one embedding per listing version
|
||||
CREATE UNIQUE INDEX "StoreListingEmbedding_storeListingVersionId_key"
|
||||
ON "StoreListingEmbedding"("storeListingVersionId");
|
||||
|
||||
-- HNSW index for fast approximate nearest neighbor search
|
||||
-- Using cosine distance (vector_cosine_ops) which is standard for text embeddings
|
||||
CREATE INDEX "StoreListingEmbedding_embedding_idx"
|
||||
ON "StoreListingEmbedding"
|
||||
USING hnsw ("embedding" vector_cosine_ops);
|
||||
|
||||
-- Index on content hash for fast lookup during change detection
|
||||
CREATE INDEX "StoreListingEmbedding_contentHash_idx"
|
||||
ON "StoreListingEmbedding"("contentHash");
|
||||
|
||||
-- Foreign key to StoreListingVersion with CASCADE delete
|
||||
-- When a listing version is deleted, its embedding is automatically removed
|
||||
ALTER TABLE "StoreListingEmbedding"
|
||||
ADD CONSTRAINT "StoreListingEmbedding_storeListingVersionId_fkey"
|
||||
FOREIGN KEY ("storeListingVersionId")
|
||||
REFERENCES "StoreListingVersion"("id")
|
||||
ON DELETE CASCADE
|
||||
ON UPDATE CASCADE;
|
||||
@@ -0,0 +1,5 @@
|
||||
-- DropIndex
|
||||
DROP INDEX "StoreListingEmbedding_embedding_idx";
|
||||
|
||||
-- AlterTable
|
||||
ALTER TABLE "StoreListingEmbedding" ALTER COLUMN "id" DROP DEFAULT;
|
||||
@@ -982,6 +982,9 @@ model StoreListingVersion {
|
||||
// Reviews for this specific version
|
||||
Reviews StoreListingReview[]
|
||||
|
||||
// Embedding for semantic search (one-to-one)
|
||||
Embedding StoreListingEmbedding?
|
||||
|
||||
@@unique([storeListingId, version])
|
||||
@@index([storeListingId, submissionStatus, isAvailable])
|
||||
@@index([submissionStatus])
|
||||
@@ -1007,6 +1010,24 @@ model StoreListingReview {
|
||||
@@index([reviewByUserId])
|
||||
}
|
||||
|
||||
// Stores vector embeddings for semantic search of store listings
|
||||
// Uses pgvector extension for efficient similarity search
|
||||
model StoreListingEmbedding {
|
||||
id String @id @default(uuid())
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @default(now()) @updatedAt
|
||||
|
||||
storeListingVersionId String @unique
|
||||
StoreListingVersion StoreListingVersion @relation(fields: [storeListingVersionId], references: [id], onDelete: Cascade)
|
||||
|
||||
// pgvector embedding - stored as Unsupported type since Prisma doesn't natively support vector
|
||||
embedding Unsupported("vector(1536)")?
|
||||
searchableText String? // The text that was embedded (for debugging/recomputation)
|
||||
contentHash String? // MD5 hash for change detection
|
||||
|
||||
@@index([contentHash])
|
||||
}
|
||||
|
||||
enum SubmissionStatus {
|
||||
DRAFT // Being prepared, not yet submitted
|
||||
PENDING // Submitted, awaiting review
|
||||
|
||||
Reference in New Issue
Block a user