add vector search

This commit is contained in:
Swifty
2025-12-04 16:05:47 +01:00
parent 6588110bf2
commit 7b6db6e260
7 changed files with 977 additions and 19 deletions

View File

@@ -0,0 +1,156 @@
"""
Embedding service for generating text embeddings using OpenAI.
Used for vector-based semantic search in the store.
"""
import logging
from typing import Optional
import openai
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
# Model configuration
# Using text-embedding-3-small (1536 dimensions) for compatibility with pgvector indexes
# pgvector IVFFlat/HNSW indexes cap the number of indexable dimensions
# (NOTE(review): 2000 for the `vector` type per the pgvector docs - confirm for the installed version)
EMBEDDING_MODEL = "text-embedding-3-small"
EMBEDDING_DIMENSIONS = 1536
# Input validation limits
# NOTE(review): the token limit quoted here (8191 tokens, ~32k chars) should be the one for
# EMBEDDING_MODEL (text-embedding-3-small) - confirm against the OpenAI model documentation
# We set a conservative character limit to prevent abuse
MAX_TEXT_LENGTH = 10000 # characters
MAX_BATCH_SIZE = 100 # maximum texts per batch request
class EmbeddingService:
    """Service for generating text embeddings using OpenAI.

    The API key is resolved once at construction time and a single
    ``openai.AsyncOpenAI`` client is reused for every request.
    """

    def __init__(self, api_key: Optional[str] = None):
        """
        Resolve the API key and create the async OpenAI client.

        Args:
            api_key: Explicit key. When falsy, falls back to
                ``settings.secrets.openai_internal_api_key`` and then to
                ``settings.secrets.openai_api_key``.

        Raises:
            ValueError: If none of the three sources provides a key.
        """
        settings = Settings()
        self.api_key = (
            api_key
            or settings.secrets.openai_internal_api_key
            or settings.secrets.openai_api_key
        )
        if not self.api_key:
            raise ValueError(
                "OpenAI API key not configured. "
                "Set OPENAI_API_KEY or OPENAI_INTERNAL_API_KEY environment variable."
            )
        self.client = openai.AsyncOpenAI(api_key=self.api_key)

    @staticmethod
    def _validate_text(text: str, label: str = "Text") -> None:
        """
        Reject empty, whitespace-only, or over-long text.

        Shared by the single and batch entry points so both enforce the
        same rules and emit consistently worded errors.

        Args:
            text: The candidate text.
            label: Prefix used in the error message (e.g. "Text at index 3").

        Raises:
            ValueError: If the text is empty/blank or longer than
                MAX_TEXT_LENGTH characters.
        """
        if not text or not text.strip():
            raise ValueError(f"{label} cannot be empty")
        if len(text) > MAX_TEXT_LENGTH:
            raise ValueError(
                f"{label} exceeds maximum length of {MAX_TEXT_LENGTH} characters"
            )

    async def generate_embedding(self, text: str) -> list[float]:
        """
        Generate embedding for a single text string.

        Args:
            text: The text to generate an embedding for.

        Returns:
            A list of floats representing the embedding vector.

        Raises:
            ValueError: If the text is empty or exceeds maximum length.
            openai.APIError: If the OpenAI API call fails.
        """
        self._validate_text(text)

        # Only the network call is guarded; API errors are logged and re-raised.
        try:
            response = await self.client.embeddings.create(
                model=EMBEDDING_MODEL,
                input=text,
                dimensions=EMBEDDING_DIMENSIONS,
            )
        except openai.APIError as e:
            logger.error("OpenAI API error generating embedding: %s", e)
            raise
        return response.data[0].embedding

    async def generate_embeddings(self, texts: list[str]) -> list[list[float]]:
        """
        Generate embeddings for multiple texts (batch).

        Args:
            texts: List of texts to generate embeddings for.

        Returns:
            List of embedding vectors, one per input text.

        Raises:
            ValueError: If any text is invalid or batch size exceeds limit.
            openai.APIError: If the OpenAI API call fails.
        """
        if not texts:
            raise ValueError("Texts list cannot be empty")
        if len(texts) > MAX_BATCH_SIZE:
            raise ValueError(f"Batch size exceeds maximum of {MAX_BATCH_SIZE} texts")
        for i, text in enumerate(texts):
            self._validate_text(text, label=f"Text at index {i}")

        try:
            response = await self.client.embeddings.create(
                model=EMBEDDING_MODEL,
                input=texts,
                dimensions=EMBEDDING_DIMENSIONS,
            )
        except openai.APIError as e:
            logger.error("OpenAI API error generating embeddings: %s", e)
            raise

        # Sort by the index reported for each item so results line up with inputs
        sorted_data = sorted(response.data, key=lambda x: x.index)
        return [item.embedding for item in sorted_data]
def create_search_text(name: str, sub_heading: str, description: str) -> str:
    """
    Build the single text string that gets embedded for an agent.

    Falsy fields (empty strings or None) are dropped, the remaining ones
    are joined with a single space in the order name, sub-heading,
    description, and the joined result is stripped of outer whitespace.

    Args:
        name: The agent name.
        sub_heading: The agent sub-heading/tagline.
        description: The agent description.

    Returns:
        A single string combining all non-empty fields.
    """
    present = [field for field in (name, sub_heading, description) if field]
    combined = " ".join(present)
    return combined.strip()
# Module-level cache for the shared service; filled on first request
_embedding_service: Optional[EmbeddingService] = None


async def get_embedding_service() -> EmbeddingService:
    """
    Return the shared EmbeddingService, constructing it on first use.

    Returns:
        The shared EmbeddingService instance.

    Raises:
        ValueError: If OpenAI API key is not configured.
    """
    global _embedding_service
    service = _embedding_service
    if service is None:
        # Construction may raise; the cache is only written on success
        service = EmbeddingService()
        _embedding_service = service
    return service

View File

@@ -0,0 +1,231 @@
"""Tests for the embedding service.
This module tests:
- create_search_text utility function
- EmbeddingService input validation
- EmbeddingService API interaction (mocked)
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from backend.integrations.embeddings import (
EMBEDDING_DIMENSIONS,
MAX_BATCH_SIZE,
MAX_TEXT_LENGTH,
EmbeddingService,
create_search_text,
)
class TestCreateSearchText:
    """Tests for the create_search_text utility function."""

    def test_combines_all_fields(self):
        result = create_search_text("Agent Name", "A cool agent", "Does amazing things")
        assert result == "Agent Name A cool agent Does amazing things"

    def test_handles_empty_name(self):
        result = create_search_text("", "Sub heading", "Description")
        assert result == "Sub heading Description"

    def test_handles_empty_sub_heading(self):
        result = create_search_text("Name", "", "Description")
        assert result == "Name Description"

    def test_handles_empty_description(self):
        result = create_search_text("Name", "Sub heading", "")
        assert result == "Name Sub heading"

    def test_handles_all_empty(self):
        result = create_search_text("", "", "")
        assert result == ""

    def test_handles_none_values(self):
        # The function expects strings but should handle None gracefully
        result = create_search_text(None, None, None)  # type: ignore
        assert result == ""

    def test_preserves_content_strips_outer_whitespace(self):
        # Only the outer edges of the joined string are stripped; the
        # whitespace belonging to each part is preserved. Between two words
        # that leaves: trailing space + joining space + leading space.
        result = create_search_text(" Name ", " Sub ", " Desc ")
        assert result == "Name   Sub   Desc"

    def test_handles_only_whitespace(self):
        # Whitespace-only parts are truthy, so they survive the filter;
        # the final strip() on the joined string is what empties the result
        result = create_search_text(" ", " ", " ")
        assert result == ""
class TestEmbeddingServiceValidation:
    """Tests for EmbeddingService input validation."""

    @pytest.fixture
    def mock_settings(self):
        """Mock settings with a test API key."""
        with patch("backend.integrations.embeddings.Settings") as mock:
            mock_instance = MagicMock()
            mock_instance.secrets.openai_internal_api_key = "test-api-key"
            mock_instance.secrets.openai_api_key = ""
            mock.return_value = mock_instance
            yield mock

    @pytest.fixture
    def service(self, mock_settings):
        """Create an EmbeddingService instance with mocked settings."""
        with patch("backend.integrations.embeddings.openai.AsyncOpenAI"):
            return EmbeddingService()

    def test_init_requires_api_key(self):
        """Test that initialization fails without an API key."""
        with patch("backend.integrations.embeddings.Settings") as mock:
            mock_instance = MagicMock()
            mock_instance.secrets.openai_internal_api_key = ""
            mock_instance.secrets.openai_api_key = ""
            mock.return_value = mock_instance
            with pytest.raises(ValueError, match="OpenAI API key not configured"):
                EmbeddingService()

    def test_init_accepts_explicit_api_key(self):
        """Test that explicit API key overrides settings."""
        with patch("backend.integrations.embeddings.Settings") as mock:
            mock_instance = MagicMock()
            # Settings keys are deliberately non-empty: with empty keys the
            # assertion below would pass even if precedence were broken.
            mock_instance.secrets.openai_internal_api_key = "settings-internal-key"
            mock_instance.secrets.openai_api_key = "settings-key"
            mock.return_value = mock_instance
            with patch(
                "backend.integrations.embeddings.openai.AsyncOpenAI"
            ) as mock_openai:
                service = EmbeddingService(api_key="explicit-key")
                assert service.api_key == "explicit-key"
                # The client must be built with the explicit key as well
                mock_openai.assert_called_once_with(api_key="explicit-key")

    @pytest.mark.asyncio
    async def test_generate_embedding_empty_text(self, service):
        """Test that empty text raises ValueError."""
        with pytest.raises(ValueError, match="Text cannot be empty"):
            await service.generate_embedding("")

    @pytest.mark.asyncio
    async def test_generate_embedding_whitespace_only(self, service):
        """Test that whitespace-only text raises ValueError."""
        with pytest.raises(ValueError, match="Text cannot be empty"):
            await service.generate_embedding(" ")

    @pytest.mark.asyncio
    async def test_generate_embedding_exceeds_max_length(self, service):
        """Test that text exceeding max length raises ValueError."""
        long_text = "a" * (MAX_TEXT_LENGTH + 1)
        with pytest.raises(ValueError, match="exceeds maximum length"):
            await service.generate_embedding(long_text)

    @pytest.mark.asyncio
    async def test_generate_embeddings_empty_list(self, service):
        """Test that empty list raises ValueError."""
        with pytest.raises(ValueError, match="Texts list cannot be empty"):
            await service.generate_embeddings([])

    @pytest.mark.asyncio
    async def test_generate_embeddings_exceeds_batch_size(self, service):
        """Test that batch exceeding max size raises ValueError."""
        texts = ["text"] * (MAX_BATCH_SIZE + 1)
        with pytest.raises(ValueError, match="Batch size exceeds maximum"):
            await service.generate_embeddings(texts)

    @pytest.mark.asyncio
    async def test_generate_embeddings_empty_text_in_batch(self, service):
        """Test that empty text in batch raises ValueError with index."""
        with pytest.raises(ValueError, match="Text at index 1 cannot be empty"):
            await service.generate_embeddings(["valid", "", "also valid"])

    @pytest.mark.asyncio
    async def test_generate_embeddings_long_text_in_batch(self, service):
        """Test that long text in batch raises ValueError with index."""
        long_text = "a" * (MAX_TEXT_LENGTH + 1)
        with pytest.raises(ValueError, match="Text at index 2 exceeds maximum length"):
            await service.generate_embeddings(["short", "also short", long_text])
class TestEmbeddingServiceAPI:
    """Tests for EmbeddingService API interaction."""

    @pytest.fixture
    def mock_openai_client(self):
        """Provide a stand-in for the OpenAI client object."""
        client = MagicMock()
        client.embeddings = MagicMock()
        return client

    @pytest.fixture
    def service_with_mock_client(self, mock_openai_client):
        """Build an EmbeddingService whose client is the mocked OpenAI client."""
        with patch(
            "backend.integrations.embeddings.Settings"
        ) as settings_cls, patch(
            "backend.integrations.embeddings.openai.AsyncOpenAI"
        ) as openai_cls:
            fake_settings = MagicMock()
            fake_settings.secrets.openai_internal_api_key = "test-key"
            fake_settings.secrets.openai_api_key = ""
            settings_cls.return_value = fake_settings
            openai_cls.return_value = mock_openai_client
            built = EmbeddingService()
            return built, mock_openai_client

    @pytest.mark.asyncio
    async def test_generate_embedding_success(self, service_with_mock_client):
        """A single text yields the embedding carried by the API response."""
        svc, client = service_with_mock_client

        # Response carrying exactly one embedding item
        expected = [0.1] * EMBEDDING_DIMENSIONS
        api_response = MagicMock()
        api_response.data = [MagicMock(embedding=expected)]
        client.embeddings.create = AsyncMock(return_value=api_response)

        actual = await svc.generate_embedding("test text")

        assert actual == expected
        client.embeddings.create.assert_called_once()

    @pytest.mark.asyncio
    async def test_generate_embeddings_success(self, service_with_mock_client):
        """A batch yields one embedding per input from a single API call."""
        svc, client = service_with_mock_client

        # Response carrying two embeddings, already in index order
        expected = [[0.1] * EMBEDDING_DIMENSIONS, [0.2] * EMBEDDING_DIMENSIONS]
        api_response = MagicMock()
        api_response.data = [
            MagicMock(embedding=expected[0], index=0),
            MagicMock(embedding=expected[1], index=1),
        ]
        client.embeddings.create = AsyncMock(return_value=api_response)

        actual = await svc.generate_embeddings(["text1", "text2"])

        assert actual == expected
        client.embeddings.create.assert_called_once()

    @pytest.mark.asyncio
    async def test_generate_embeddings_preserves_order(self, service_with_mock_client):
        """Batch results follow input order even when the API responds out of order."""
        svc, client = service_with_mock_client

        expected = [[0.1] * EMBEDDING_DIMENSIONS, [0.2] * EMBEDDING_DIMENSIONS]
        api_response = MagicMock()
        # Items deliberately listed in reverse index order
        api_response.data = [
            MagicMock(embedding=expected[1], index=1),
            MagicMock(embedding=expected[0], index=0),
        ]
        client.embeddings.create = AsyncMock(return_value=api_response)

        actual = await svc.generate_embeddings(["text1", "text2"])

        # The service is expected to reorder by index
        assert actual[0] == expected[0]
        assert actual[1] == expected[1]

View File

@@ -0,0 +1,168 @@
"""
Script to backfill embeddings for existing store listing versions.
This script should be run after the migration to add the embedding column
to populate embeddings for all existing store listing versions.
Usage:
poetry run python -m backend.server.v2.store.backfill_embeddings
poetry run python -m backend.server.v2.store.backfill_embeddings --dry-run
poetry run python -m backend.server.v2.store.backfill_embeddings --batch-size 25
"""
import argparse
import asyncio
import logging
import sys
from backend.data.db import connect, disconnect, query_raw_with_schema
from backend.integrations.embeddings import EmbeddingService, create_search_text
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Default batch size for processing
DEFAULT_BATCH_SIZE = 50

# Delay between batches to avoid rate limits (seconds)
BATCH_DELAY_SECONDS = 1.0


async def backfill_embeddings(
    dry_run: bool = False,
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> tuple[int, int]:
    """
    Backfill embeddings for all store listing versions without embeddings.

    Versions are embedded one at a time; ``batch_size`` only controls how
    many are handled between progress logs and rate-limit pauses. A failure
    on one version is logged and counted, and processing continues.

    Args:
        dry_run: If True, don't make any changes, just report what would be done.
        batch_size: Number of versions to process in each batch (must be >= 1).

    Returns:
        Tuple of (processed_count, error_count)

    Raises:
        ValueError: If batch_size is less than 1, or (when there is work to
            do) if the embedding service cannot be constructed.
    """
    # Validate before opening a connection: 0 would crash inside range() and
    # a negative value would silently process nothing yet report success.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    await connect()
    try:
        # Get all versions without embeddings
        versions = await query_raw_with_schema(
            """
            SELECT id, name, "subHeading", description
            FROM {schema_prefix}"StoreListingVersion"
            WHERE embedding IS NULL
            ORDER BY "createdAt" DESC
            """
        )
        total = len(versions)
        logger.info(f"Found {total} versions without embeddings")

        if dry_run:
            logger.info("Dry run mode - no changes will be made")
            return (0, 0)
        if total == 0:
            logger.info("No versions need embeddings")
            return (0, 0)

        # Built only once there is real work, so a dry run (or an empty
        # backlog) does not require an API key to be configured.
        embedding_service = EmbeddingService()

        processed = 0
        errors = 0
        total_batches = (total + batch_size - 1) // batch_size  # ceiling division

        for i in range(0, total, batch_size):
            batch = versions[i : i + batch_size]
            batch_num = (i // batch_size) + 1
            logger.info(f"Processing batch {batch_num}/{total_batches}")

            for version in batch:
                version_id = version["id"]
                try:
                    search_text = create_search_text(
                        version["name"] or "",
                        version["subHeading"] or "",
                        version["description"] or "",
                    )
                    if not search_text:
                        logger.warning(f"Skipping {version_id} - no searchable text")
                        continue

                    embedding = await embedding_service.generate_embedding(search_text)
                    # Text form "[v1,v2,...]" that the query casts with ::vector
                    embedding_str = "[" + ",".join(map(str, embedding)) + "]"
                    await query_raw_with_schema(
                        """
                        UPDATE {schema_prefix}"StoreListingVersion"
                        SET embedding = $1::vector
                        WHERE id = $2
                        """,
                        embedding_str,
                        version_id,
                    )
                    processed += 1
                except Exception as e:
                    # Best effort: record the failure and move on to the next version
                    logger.error(f"Error processing {version_id}: {e}")
                    errors += 1

            logger.info(f"Progress: {processed}/{total} processed, {errors} errors")

            # Rate limit: wait between batches to avoid hitting API limits
            if i + batch_size < total:
                await asyncio.sleep(BATCH_DELAY_SECONDS)

        logger.info(f"Backfill complete: {processed} processed, {errors} errors")
        return (processed, errors)
    finally:
        await disconnect()
def main():
    """
    Command-line entry point for the backfill.

    Parses --dry-run and --batch-size, runs backfill_embeddings, and exits
    with 0 on success, 1 if any version failed or a fatal error occurred,
    and 130 when interrupted from the keyboard.
    """
    cli = argparse.ArgumentParser(
        description="Backfill embeddings for store listing versions"
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Don't make any changes, just report what would be done",
    )
    cli.add_argument(
        "--batch-size",
        type=int,
        default=DEFAULT_BATCH_SIZE,
        help=f"Number of versions to process in each batch (default: {DEFAULT_BATCH_SIZE})",
    )
    options = cli.parse_args()

    try:
        _, error_count = asyncio.run(
            backfill_embeddings(dry_run=options.dry_run, batch_size=options.batch_size)
        )
        if error_count > 0:
            logger.warning(f"Completed with {error_count} errors")
            exit_code = 1
        else:
            logger.info("Completed successfully")
            exit_code = 0
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
        exit_code = 130
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        exit_code = 1

    # Single exit point; the status was decided above
    sys.exit(exit_code)


if __name__ == "__main__":
    main()

View File

@@ -26,6 +26,7 @@ from backend.data.notifications import (
AgentRejectionData,
NotificationEventModel,
)
from backend.integrations.embeddings import create_search_text, get_embedding_service
from backend.notifications.notifications import queue_notification_async
from backend.util.exceptions import DatabaseError
from backend.util.settings import Settings
@@ -56,31 +57,40 @@ async def get_store_agents(
)
try:
# If search_query is provided, use full-text search
# If search_query is provided, use vector similarity search
if search_query:
offset = (page - 1) * page_size
# Generate embedding for search query
embedding_service = await get_embedding_service()
query_embedding = await embedding_service.generate_embedding(search_query)
# Convert embedding to PostgreSQL array format
embedding_str = "[" + ",".join(map(str, query_embedding)) + "]"
# Whitelist allowed order_by columns
# For vector search, we use similarity instead of rank
ALLOWED_ORDER_BY = {
"rating": "rating DESC, rank DESC",
"runs": "runs DESC, rank DESC",
"name": "agent_name ASC, rank ASC",
"updated_at": "updated_at DESC, rank DESC",
"rating": "rating DESC, similarity DESC",
"runs": "runs DESC, similarity DESC",
"name": "agent_name ASC, similarity DESC",
"updated_at": "updated_at DESC, similarity DESC",
}
# Validate and get order clause
if sorted_by and sorted_by in ALLOWED_ORDER_BY:
order_by_clause = ALLOWED_ORDER_BY[sorted_by]
else:
order_by_clause = "updated_at DESC, rank DESC"
# Default: order by vector similarity (most similar first)
order_by_clause = "similarity DESC, updated_at DESC"
# Build WHERE conditions and parameters list
where_parts: list[str] = []
params: list[typing.Any] = [search_query] # $1 - search term
params: list[typing.Any] = [embedding_str] # $1 - query embedding
param_index = 2 # Start at $2 for next parameter
# Always filter for available agents
# Always filter for available agents and agents with embeddings
where_parts.append("is_available = true")
where_parts.append("embedding IS NOT NULL")
if featured:
where_parts.append("featured = true")
@@ -103,7 +113,9 @@ async def get_store_agents(
limit_param = f"${param_index}"
offset_param = f"${param_index + 1}"
# Execute full-text search query with parameterized values
# Vector similarity search query using cosine distance
# The <=> operator returns cosine distance (0 = identical, 2 = opposite)
# We convert to similarity: 1 - distance gives cosine similarity in range [-1, 1]
sql_query = f"""
SELECT
slug,
@@ -119,22 +131,18 @@ async def get_store_agents(
featured,
is_available,
updated_at,
ts_rank_cd(search, query) AS rank
FROM {{schema_prefix}}"StoreAgent",
plainto_tsquery('english', $1) AS query
1 - (embedding <=> $1::vector) AS similarity
FROM {{schema_prefix}}"StoreAgent"
WHERE {sql_where_clause}
AND search @@ query
ORDER BY {order_by_clause}
LIMIT {limit_param} OFFSET {offset_param}
"""
# Count query for pagination - only uses search term parameter
# Count query for pagination
count_query = f"""
SELECT COUNT(*) as count
FROM {{schema_prefix}}"StoreAgent",
plainto_tsquery('english', $1) AS query
FROM {{schema_prefix}}"StoreAgent"
WHERE {sql_where_clause}
AND search @@ query
"""
# Execute both queries with parameters
@@ -255,6 +263,56 @@ async def log_search_term(search_query: str):
logger.error(f"Error logging search term: {e}")
async def _generate_and_store_embedding(
    store_listing_version_id: str,
    name: str,
    sub_heading: str,
    description: str,
) -> None:
    """
    Embed a store listing version's text and persist the vector.

    The name, sub_heading and description are combined into one search
    text; if that text is empty nothing is generated or written. Any
    exception is logged and swallowed so the calling operation continues.

    Args:
        store_listing_version_id: The ID of the store listing version.
        name: The agent name.
        sub_heading: The agent sub-heading/tagline.
        description: The agent description.
    """
    try:
        service = await get_embedding_service()
        text = create_search_text(name, sub_heading, description)

        if text:
            vector = await service.generate_embedding(text)
            # Text form "[v1,v2,...]" that the query casts with ::vector
            vector_literal = "[" + ",".join(str(value) for value in vector) + "]"
            await query_raw_with_schema(
                """
                UPDATE {schema_prefix}"StoreListingVersion"
                SET embedding = $1::vector
                WHERE id = $2
                """,
                vector_literal,
                store_listing_version_id,
            )
            logger.debug(f"Generated embedding for version {store_listing_version_id}")
        else:
            logger.warning(
                f"No searchable text for version {store_listing_version_id}, "
                "skipping embedding generation"
            )
    except Exception as e:
        # Deliberately best-effort: a missing embedding must not fail the
        # surrounding operation; it can be produced later via backfill
        logger.error(
            f"Failed to generate embedding for {store_listing_version_id}: {e}"
        )
async def get_store_agent_details(
username: str, agent_name: str
) -> backend.server.v2.store.model.StoreAgentDetails:
@@ -801,6 +859,12 @@ async def create_store_submission(
else None
)
# Generate embedding for semantic search
if store_listing_version_id:
await _generate_and_store_embedding(
store_listing_version_id, name, sub_heading, description
)
logger.debug(f"Created store listing for agent {agent_id}")
# Return submission details
return backend.server.v2.store.model.StoreSubmission(
@@ -963,6 +1027,12 @@ async def edit_store_submission(
if not updated_version:
raise DatabaseError("Failed to update store listing version")
# Regenerate embedding with updated content
await _generate_and_store_embedding(
store_listing_version_id, name, sub_heading, description
)
return backend.server.v2.store.model.StoreSubmission(
agent_id=current_version.agentGraphId,
agent_version=current_version.agentGraphVersion,
@@ -1093,6 +1163,12 @@ async def create_store_version(
logger.debug(
f"Created new version for listing {store_listing_id} of agent {agent_id}"
)
# Generate embedding for semantic search
await _generate_and_store_embedding(
new_version.id, name, sub_heading, description
)
# Return submission details
return backend.server.v2.store.model.StoreSubmission(
agent_id=agent_id,

View File

@@ -405,3 +405,237 @@ async def test_get_store_agents_search_category_array_injection():
# Verify the query executed without error
# Category should be parameterized, preventing SQL injection
assert isinstance(result.agents, list)
# Vector search tests
@pytest.mark.asyncio(loop_scope="session")
async def test_get_store_agents_vector_search_mocked(mocker):
    """Searching embeds the query text once and tolerates an empty result set."""
    from backend.integrations.embeddings import EMBEDDING_DIMENSIONS

    # Stub embedding service that yields a fixed vector
    fake_vector = [0.1] * EMBEDDING_DIMENSIONS
    fake_service = mocker.MagicMock()
    fake_service.generate_embedding = mocker.AsyncMock(return_value=fake_vector)
    mocker.patch(
        "backend.server.v2.store.db.get_embedding_service",
        mocker.AsyncMock(return_value=fake_service),
    )

    # Two database calls are stubbed: no rows, then a zero count
    mocker.patch(
        "backend.server.v2.store.db.query_raw_with_schema",
        mocker.AsyncMock(side_effect=[[], [{"count": 0}]]),
    )

    response = await db.get_store_agents(search_query="test query")

    # The raw query text must be what gets embedded
    fake_service.generate_embedding.assert_called_once_with("test query")

    assert isinstance(response.agents, list)
    assert len(response.agents) == 0
@pytest.mark.asyncio(loop_scope="session")
async def test_get_store_agents_vector_search_with_results(mocker):
    """Rows returned by the vector query are mapped onto the response model."""
    from backend.integrations.embeddings import EMBEDDING_DIMENSIONS

    # Stub embedding service that yields a fixed vector
    fake_service = mocker.MagicMock()
    fake_service.generate_embedding = mocker.AsyncMock(
        return_value=[0.1] * EMBEDDING_DIMENSIONS
    )
    mocker.patch(
        "backend.server.v2.store.db.get_embedding_service",
        mocker.AsyncMock(return_value=fake_service),
    )

    # One row shaped like the search query's SELECT list
    agent_row = {
        "slug": "test-agent",
        "agent_name": "Test Agent",
        "agent_image": ["image.jpg"],
        "creator_username": "creator",
        "creator_avatar": "avatar.jpg",
        "sub_heading": "Test heading",
        "description": "Test description",
        "runs": 10,
        "rating": 4.5,
        "categories": ["test"],
        "featured": False,
        "is_available": True,
        "updated_at": datetime.now(),
        "similarity": 0.95,
    }
    count_rows = [{"count": 1}]
    mocker.patch(
        "backend.server.v2.store.db.query_raw_with_schema",
        mocker.AsyncMock(side_effect=[[agent_row], count_rows]),
    )

    response = await db.get_store_agents(search_query="test query")

    assert len(response.agents) == 1
    first_agent = response.agents[0]
    assert first_agent.slug == "test-agent"
    assert first_agent.agent_name == "Test Agent"
    assert response.pagination.total_items == 1
@pytest.mark.asyncio(loop_scope="session")
async def test_get_store_agents_vector_search_with_filters(mocker):
    """Filters are combined with the vector search in the generated SQL."""
    from backend.integrations.embeddings import EMBEDDING_DIMENSIONS

    # Stub embedding service that yields a fixed vector
    fake_service = mocker.MagicMock()
    fake_service.generate_embedding = mocker.AsyncMock(
        return_value=[0.1] * EMBEDDING_DIMENSIONS
    )
    mocker.patch(
        "backend.server.v2.store.db.get_embedding_service",
        mocker.AsyncMock(return_value=fake_service),
    )

    # Capture the SQL handed to the database layer
    db_call = mocker.patch(
        "backend.server.v2.store.db.query_raw_with_schema",
        mocker.AsyncMock(side_effect=[[], [{"count": 0}]]),
    )

    await db.get_store_agents(
        search_query="test query",
        featured=True,
        creators=["creator1", "creator2"],
        category="AI",
        sorted_by="rating",
    )

    # One call for the page of results, one for the count
    assert db_call.call_count == 2

    # The first positional argument of the first call is the main SQL text
    main_sql = db_call.call_args_list[0][0][0]

    expected_fragments = (
        "embedding <=> $1::vector",
        "featured = true",
        "creator_username = ANY($",
        "= ANY(categories)",
    )
    for fragment in expected_fragments:
        assert fragment in main_sql
@pytest.mark.asyncio(loop_scope="session")
async def test_generate_and_store_embedding_success(mocker):
    """The combined text is embedded and the vector is written with an UPDATE."""
    from backend.integrations.embeddings import EMBEDDING_DIMENSIONS

    # Stub embedding service that yields a fixed vector
    fake_service = mocker.MagicMock()
    fake_service.generate_embedding = mocker.AsyncMock(
        return_value=[0.1] * EMBEDDING_DIMENSIONS
    )
    mocker.patch(
        "backend.server.v2.store.db.get_embedding_service",
        mocker.AsyncMock(return_value=fake_service),
    )

    # Capture the database write
    db_call = mocker.patch(
        "backend.server.v2.store.db.query_raw_with_schema",
        mocker.AsyncMock(return_value=[]),
    )

    await db._generate_and_store_embedding(
        store_listing_version_id="version-123",
        name="Test Agent",
        sub_heading="A test agent",
        description="Does testing",
    )

    # Name, sub-heading and description are joined with single spaces
    fake_service.generate_embedding.assert_called_once_with(
        "Test Agent A test agent Does testing"
    )

    # Exactly one write: positional args are (sql, embedding text, version id)
    db_call.assert_called_once()
    positional = db_call.call_args[0]
    assert "UPDATE" in positional[0]
    assert "embedding = $1::vector" in positional[0]
    assert positional[2] == "version-123"
@pytest.mark.asyncio(loop_scope="session")
async def test_generate_and_store_embedding_empty_text(mocker):
    """With no searchable text, nothing is embedded and nothing is written."""
    # Stub embedding service; it should never be invoked
    fake_service = mocker.MagicMock()
    fake_service.generate_embedding = mocker.AsyncMock()
    mocker.patch(
        "backend.server.v2.store.db.get_embedding_service",
        mocker.AsyncMock(return_value=fake_service),
    )

    # Stub database layer; it should never be invoked either
    db_call = mocker.patch(
        "backend.server.v2.store.db.query_raw_with_schema",
        mocker.AsyncMock(return_value=[]),
    )

    await db._generate_and_store_embedding(
        store_listing_version_id="version-123",
        name="",
        sub_heading="",
        description="",
    )

    fake_service.generate_embedding.assert_not_called()
    db_call.assert_not_called()
@pytest.mark.asyncio(loop_scope="session")
async def test_generate_and_store_embedding_handles_error(mocker):
    """A failing embedding call is swallowed instead of propagating."""
    # Embedding service whose generate call always fails
    failing_service = mocker.MagicMock()
    failing_service.generate_embedding = mocker.AsyncMock(
        side_effect=Exception("API error")
    )
    mocker.patch(
        "backend.server.v2.store.db.get_embedding_service",
        mocker.AsyncMock(return_value=failing_service),
    )

    # Must return normally: the error is logged, not re-raised
    await db._generate_and_store_embedding(
        store_listing_version_id="version-123",
        name="Test Agent",
        sub_heading="A test agent",
        description="Does testing",
    )

    # The failing call was attempted exactly once
    failing_service.generate_embedding.assert_called_once()

View File

@@ -0,0 +1,92 @@
-- Migration: Replace full-text search with pgvector-based vector search
-- This migration:
-- 1. Enables the pgvector extension
-- 2. Drops the StoreAgent view (depends on search column)
-- 3. Removes the full-text search infrastructure (trigger, function, tsvector column)
-- 4. Adds a vector embedding column for semantic search
-- 5. Creates an index for fast vector similarity search
-- 6. Recreates the StoreAgent view with the embedding column
-- Note: the new column starts out NULL for every existing row; embeddings
-- have to be populated separately after this migration runs.
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- First drop the view that depends on the search column
DROP VIEW IF EXISTS "StoreAgent";
-- Remove full-text search infrastructure
DROP TRIGGER IF EXISTS "update_tsvector" ON "StoreListingVersion";
DROP FUNCTION IF EXISTS update_tsvector_column();
-- Drop the tsvector search column (destructive: the stored tsvector data is discarded)
ALTER TABLE "StoreListingVersion" DROP COLUMN IF EXISTS "search";
-- Add embedding column for vector search (1536 dimensions for text-embedding-3-small)
ALTER TABLE "StoreListingVersion"
ADD COLUMN IF NOT EXISTS "embedding" vector(1536);
-- Create IVFFlat index for fast similarity search
-- Using cosine distance (vector_cosine_ops) which is standard for text embeddings
-- lists = 100 is a fixed starting value
-- NOTE(review): pgvector's guidance reportedly sizes lists from the row count
-- (about rows / 1000 up to 1M rows), which would make 100 fit roughly 100k rows - TODO confirm
-- NOTE(review): this index is built right after the column is added, i.e. while
-- every embedding is still NULL; IVFFlat is presumably best built on populated
-- data, so consider a REINDEX after embeddings are backfilled - TODO confirm against pgvector docs
CREATE INDEX IF NOT EXISTS idx_store_listing_version_embedding
ON "StoreListingVersion"
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
-- Recreate StoreAgent view WITHOUT search column, WITH embedding column
-- The view exposes, per listing, the highest-numbered APPROVED version
CREATE OR REPLACE VIEW "StoreAgent" AS
-- latest_versions: highest approved version number per listing
WITH latest_versions AS (
SELECT
"storeListingId",
MAX(version) AS max_version
FROM "StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
GROUP BY "storeListingId"
),
-- agent_versions: all approved version numbers per listing, as text
-- (ordered by the text value, not numerically)
agent_versions AS (
SELECT
"storeListingId",
array_agg(DISTINCT version::text ORDER BY version::text) AS versions
FROM "StoreListingVersion"
WHERE "submissionStatus" = 'APPROVED'
GROUP BY "storeListingId"
)
SELECT
sl.id AS listing_id,
slv.id AS "storeListingVersionId",
slv."createdAt" AS updated_at,
sl.slug,
COALESCE(slv.name, '') AS agent_name,
slv."videoUrl" AS agent_video,
COALESCE(slv."imageUrls", ARRAY[]::text[]) AS agent_image,
slv."isFeatured" AS featured,
p.username AS creator_username,
p."avatarUrl" AS creator_avatar,
slv."subHeading" AS sub_heading,
slv.description,
slv.categories,
-- Embedding of the selected version; NULL until one has been generated
slv.embedding,
COALESCE(ar.run_count, 0::bigint) AS runs,
COALESCE(rs.avg_rating, 0.0)::double precision AS rating,
COALESCE(av.versions, ARRAY[slv.version::text]) AS versions,
COALESCE(sl."useForOnboarding", false) AS "useForOnboarding",
slv."isAvailable" AS is_available
FROM "StoreListing" sl
JOIN latest_versions lv
ON sl.id = lv."storeListingId"
JOIN "StoreListingVersion" slv
ON slv."storeListingId" = lv."storeListingId"
AND slv.version = lv.max_version
AND slv."submissionStatus" = 'APPROVED'
JOIN "AgentGraph" a
ON slv."agentGraphId" = a.id
AND slv."agentGraphVersion" = a.version
LEFT JOIN "Profile" p
ON sl."owningUserId" = p."userId"
LEFT JOIN "mv_review_stats" rs
ON sl.id = rs."storeListingId"
LEFT JOIN "mv_agent_run_counts" ar
ON a.id = ar."agentGraphId"
LEFT JOIN agent_versions av
ON sl.id = av."storeListingId"
-- Only non-deleted listings that have at least one approved version
WHERE sl."isDeleted" = false
AND sl."hasApprovedVersion" = true;

View File

@@ -712,7 +712,7 @@ view StoreAgent {
sub_heading String
description String
categories String[]
search Unsupported("tsvector")? @default(dbgenerated("''::tsvector"))
embedding Unsupported("vector(1536)")?
runs Int
rating Float
versions String[]
@@ -847,7 +847,8 @@ model StoreListingVersion {
// Old versions can be made unavailable by the author if desired
isAvailable Boolean @default(true)
search Unsupported("tsvector")? @default(dbgenerated("''::tsvector"))
// Vector embedding for semantic search (replaces tsvector full-text search)
embedding Unsupported("vector(1536)")?
// Version workflow state
submissionStatus SubmissionStatus @default(DRAFT)