Merge branch 'dev' into feat/sensitive-action-features

Resolved conflicts in embeddings.py by accepting dev's fail-fast error handling approach for missing OpenAI API keys.
This commit is contained in:
Zamil Majdy
2026-01-24 09:01:24 -06:00
6 changed files with 212 additions and 251 deletions

View File

@@ -1552,7 +1552,7 @@ async def review_store_submission(
# Generate embedding for approved listing (blocking - admin operation)
# Inside transaction: if embedding fails, entire transaction rolls back
embedding_success = await ensure_embedding(
await ensure_embedding(
version_id=store_listing_version_id,
name=store_listing_version.name,
description=store_listing_version.description,
@@ -1560,12 +1560,6 @@ async def review_store_submission(
categories=store_listing_version.categories or [],
tx=tx,
)
if not embedding_success:
raise ValueError(
f"Failed to generate embedding for listing {store_listing_version_id}. "
"This is likely due to OpenAI API being unavailable. "
"Please try again later or contact support if the issue persists."
)
await prisma.models.StoreListing.prisma(tx).update(
where={"id": store_listing_version.StoreListing.id},

View File

@@ -105,53 +105,42 @@ def build_searchable_text(
return " ".join(parts)
async def generate_embedding(text: str) -> list[float] | None:
async def generate_embedding(text: str) -> list[float]:
"""
Generate embedding for text using OpenAI API.
Returns None if embedding generation fails.
Fail-fast: no retries to maintain consistency with approval flow.
Raises exceptions on failure - caller should handle.
"""
try:
client = get_openai_client()
if not client:
log_once_per_task(
"openai_api_key_missing",
logger.error,
"openai_internal_api_key not set, cannot generate embeddings",
)
return None
client = get_openai_client()
if not client:
raise RuntimeError("openai_internal_api_key not set, cannot generate embedding")
# Truncate text to token limit using tiktoken
# Character-based truncation is insufficient because token ratios vary by content type
enc = encoding_for_model(EMBEDDING_MODEL)
tokens = enc.encode(text)
if len(tokens) > EMBEDDING_MAX_TOKENS:
tokens = tokens[:EMBEDDING_MAX_TOKENS]
truncated_text = enc.decode(tokens)
logger.info(
f"Truncated text from {len(enc.encode(text))} to {len(tokens)} tokens"
)
else:
truncated_text = text
start_time = time.time()
response = await client.embeddings.create(
model=EMBEDDING_MODEL,
input=truncated_text,
)
latency_ms = (time.time() - start_time) * 1000
embedding = response.data[0].embedding
# Truncate text to token limit using tiktoken
# Character-based truncation is insufficient because token ratios vary by content type
enc = encoding_for_model(EMBEDDING_MODEL)
tokens = enc.encode(text)
if len(tokens) > EMBEDDING_MAX_TOKENS:
tokens = tokens[:EMBEDDING_MAX_TOKENS]
truncated_text = enc.decode(tokens)
logger.info(
f"Generated embedding: {len(embedding)} dims, "
f"{len(tokens)} tokens, {latency_ms:.0f}ms"
f"Truncated text from {len(enc.encode(text))} to {len(tokens)} tokens"
)
return embedding
else:
truncated_text = text
except Exception as e:
logger.error(f"Failed to generate embedding: {e}")
return None
start_time = time.time()
response = await client.embeddings.create(
model=EMBEDDING_MODEL,
input=truncated_text,
)
latency_ms = (time.time() - start_time) * 1000
embedding = response.data[0].embedding
logger.info(
f"Generated embedding: {len(embedding)} dims, "
f"{len(tokens)} tokens, {latency_ms:.0f}ms"
)
return embedding
async def store_embedding(
@@ -190,48 +179,45 @@ async def store_content_embedding(
New function for unified content embedding storage.
Uses raw SQL since Prisma doesn't natively support pgvector.
Raises exceptions on failure - caller should handle.
"""
try:
client = tx if tx else prisma.get_client()
client = tx if tx else prisma.get_client()
# Convert embedding to PostgreSQL vector format
embedding_str = embedding_to_vector_string(embedding)
metadata_json = dumps(metadata or {})
# Convert embedding to PostgreSQL vector format
embedding_str = embedding_to_vector_string(embedding)
metadata_json = dumps(metadata or {})
# Upsert the embedding
# WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
# Use unqualified ::vector - pgvector is in search_path on all environments
await execute_raw_with_schema(
"""
INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
"id", "contentType", "contentId", "userId", "embedding", "searchableText", "metadata", "createdAt", "updatedAt"
)
VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::vector, $5, $6::jsonb, NOW(), NOW())
ON CONFLICT ("contentType", "contentId", "userId")
DO UPDATE SET
"embedding" = $4::vector,
"searchableText" = $5,
"metadata" = $6::jsonb,
"updatedAt" = NOW()
WHERE {schema_prefix}"UnifiedContentEmbedding"."contentType" = $1::{schema_prefix}"ContentType"
AND {schema_prefix}"UnifiedContentEmbedding"."contentId" = $2
AND ({schema_prefix}"UnifiedContentEmbedding"."userId" = $3 OR ($3 IS NULL AND {schema_prefix}"UnifiedContentEmbedding"."userId" IS NULL))
""",
content_type,
content_id,
user_id,
embedding_str,
searchable_text,
metadata_json,
client=client,
# Upsert the embedding
# WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
# Use unqualified ::vector - pgvector is in search_path on all environments
await execute_raw_with_schema(
"""
INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
"id", "contentType", "contentId", "userId", "embedding", "searchableText", "metadata", "createdAt", "updatedAt"
)
VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::vector, $5, $6::jsonb, NOW(), NOW())
ON CONFLICT ("contentType", "contentId", "userId")
DO UPDATE SET
"embedding" = $4::vector,
"searchableText" = $5,
"metadata" = $6::jsonb,
"updatedAt" = NOW()
WHERE {schema_prefix}"UnifiedContentEmbedding"."contentType" = $1::{schema_prefix}"ContentType"
AND {schema_prefix}"UnifiedContentEmbedding"."contentId" = $2
AND ({schema_prefix}"UnifiedContentEmbedding"."userId" = $3 OR ($3 IS NULL AND {schema_prefix}"UnifiedContentEmbedding"."userId" IS NULL))
""",
content_type,
content_id,
user_id,
embedding_str,
searchable_text,
metadata_json,
client=client,
)
logger.info(f"Stored embedding for {content_type}:{content_id}")
return True
except Exception as e:
logger.error(f"Failed to store embedding for {content_type}:{content_id}: {e}")
return False
logger.info(f"Stored embedding for {content_type}:{content_id}")
return True
async def get_embedding(version_id: str) -> dict[str, Any] | None:
@@ -263,34 +249,31 @@ async def get_content_embedding(
New function for unified content embedding retrieval.
Returns dict with contentType, contentId, embedding, timestamps or None if not found.
Raises exceptions on failure - caller should handle.
"""
try:
result = await query_raw_with_schema(
"""
SELECT
"contentType",
"contentId",
"userId",
"embedding"::text as "embedding",
"searchableText",
"metadata",
"createdAt",
"updatedAt"
FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" = $1::{schema_prefix}"ContentType" AND "contentId" = $2 AND ("userId" = $3 OR ($3 IS NULL AND "userId" IS NULL))
""",
content_type,
content_id,
user_id,
)
result = await query_raw_with_schema(
"""
SELECT
"contentType",
"contentId",
"userId",
"embedding"::text as "embedding",
"searchableText",
"metadata",
"createdAt",
"updatedAt"
FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" = $1::{schema_prefix}"ContentType" AND "contentId" = $2 AND ("userId" = $3 OR ($3 IS NULL AND "userId" IS NULL))
""",
content_type,
content_id,
user_id,
)
if result and len(result) > 0:
return result[0]
return None
except Exception as e:
logger.error(f"Failed to get embedding for {content_type}:{content_id}: {e}")
return None
if result and len(result) > 0:
return result[0]
return None
async def ensure_embedding(
@@ -318,51 +301,38 @@ async def ensure_embedding(
tx: Optional transaction client
Returns:
True if embedding exists/was created, False on failure
True if embedding exists/was created
Raises exceptions on failure - caller should handle.
"""
try:
# Check if embedding already exists
if not force:
existing = await get_embedding(version_id)
if existing and existing.get("embedding"):
logger.debug(f"Embedding for version {version_id} already exists")
return True
# Check if embedding already exists
if not force:
existing = await get_embedding(version_id)
if existing and existing.get("embedding"):
logger.debug(f"Embedding for version {version_id} already exists")
return True
# Build searchable text for embedding
searchable_text = build_searchable_text(
name, description, sub_heading, categories
)
# Build searchable text for embedding
searchable_text = build_searchable_text(name, description, sub_heading, categories)
# Generate new embedding
embedding = await generate_embedding(searchable_text)
if embedding is None:
log_once_per_task(
"embedding_generation_failed",
logger.warning,
"Could not generate embeddings (missing API key or service unavailable). "
"Embedding generation is disabled for this task.",
)
return False
# Generate new embedding
embedding = await generate_embedding(searchable_text)
# Store the embedding with metadata using new function
metadata = {
"name": name,
"subHeading": sub_heading,
"categories": categories,
}
return await store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id=version_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata,
user_id=None, # Store agents are public
tx=tx,
)
except Exception as e:
logger.error(f"Failed to ensure embedding for version {version_id}: {e}")
return False
# Store the embedding with metadata using new function
metadata = {
"name": name,
"subHeading": sub_heading,
"categories": categories,
}
return await store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id=version_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata,
user_id=None, # Store agents are public
tx=tx,
)
async def delete_embedding(version_id: str) -> bool:
@@ -572,6 +542,24 @@ async def backfill_all_content_types(batch_size: int = 10) -> dict[str, Any]:
success = sum(1 for result in results if result is True)
failed = len(results) - success
# Aggregate unique errors to avoid Sentry spam
if failed > 0:
# Group errors by type and message
error_summary: dict[str, int] = {}
for result in results:
if isinstance(result, Exception):
error_key = f"{type(result).__name__}: {str(result)}"
error_summary[error_key] = error_summary.get(error_key, 0) + 1
# Log aggregated error summary
error_details = ", ".join(
f"{error} ({count}x)" for error, count in error_summary.items()
)
logger.error(
f"{content_type.value}: {failed}/{len(results)} embeddings failed. "
f"Errors: {error_details}"
)
results_by_type[content_type.value] = {
"processed": len(missing_items),
"success": success,
@@ -608,11 +596,12 @@ async def backfill_all_content_types(batch_size: int = 10) -> dict[str, Any]:
}
async def embed_query(query: str) -> list[float] | None:
async def embed_query(query: str) -> list[float]:
"""
Generate embedding for a search query.
Same as generate_embedding but with clearer intent.
Raises exceptions on failure - caller should handle.
"""
return await generate_embedding(query)
@@ -645,43 +634,30 @@ async def ensure_content_embedding(
tx: Optional transaction client
Returns:
True if embedding exists/was created, False on failure
True if embedding exists/was created
Raises exceptions on failure - caller should handle.
"""
try:
# Check if embedding already exists
if not force:
existing = await get_content_embedding(content_type, content_id, user_id)
if existing and existing.get("embedding"):
logger.debug(
f"Embedding for {content_type}:{content_id} already exists"
)
return True
# Check if embedding already exists
if not force:
existing = await get_content_embedding(content_type, content_id, user_id)
if existing and existing.get("embedding"):
logger.debug(f"Embedding for {content_type}:{content_id} already exists")
return True
# Generate new embedding
embedding = await generate_embedding(searchable_text)
if embedding is None:
log_once_per_task(
"embedding_generation_failed",
logger.warning,
"Could not generate embeddings (missing API key or service unavailable). "
"Embedding generation is disabled for this task.",
)
return False
# Generate new embedding
embedding = await generate_embedding(searchable_text)
# Store the embedding
return await store_content_embedding(
content_type=content_type,
content_id=content_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata or {},
user_id=user_id,
tx=tx,
)
except Exception as e:
logger.error(f"Failed to ensure embedding for {content_type}:{content_id}: {e}")
return False
# Store the embedding
return await store_content_embedding(
content_type=content_type,
content_id=content_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata or {},
user_id=user_id,
tx=tx,
)
async def cleanup_orphaned_embeddings() -> dict[str, Any]:
@@ -908,9 +884,8 @@ async def semantic_search(
limit = 100
# Generate query embedding
query_embedding = await embed_query(query)
if query_embedding is not None:
try:
query_embedding = await embed_query(query)
# Semantic search with embeddings
embedding_str = embedding_to_vector_string(query_embedding)
@@ -961,24 +936,21 @@ async def semantic_search(
"""
)
try:
results = await query_raw_with_schema(sql, *params)
return [
{
"content_id": row["content_id"],
"content_type": row["content_type"],
"searchable_text": row["searchable_text"],
"metadata": row["metadata"],
"similarity": float(row["similarity"]),
}
for row in results
]
except Exception as e:
logger.error(f"Semantic search failed: {e}")
# Fall through to lexical search below
results = await query_raw_with_schema(sql, *params)
return [
{
"content_id": row["content_id"],
"content_type": row["content_type"],
"searchable_text": row["searchable_text"],
"metadata": row["metadata"],
"similarity": float(row["similarity"]),
}
for row in results
]
except Exception as e:
logger.warning(f"Semantic search failed, falling back to lexical search: {e}")
# Fallback to lexical search if embeddings unavailable
logger.warning("Falling back to lexical search (embeddings unavailable)")
params_lexical: list[Any] = [limit]
user_filter = ""

View File

@@ -298,17 +298,16 @@ async def test_schema_handling_error_cases():
mock_client.execute_raw.side_effect = Exception("Database error")
mock_get_client.return_value = mock_client
result = await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id="test-id",
embedding=[0.1] * EMBEDDING_DIM,
searchable_text="test",
metadata=None,
user_id=None,
)
# Should return False on error, not raise
assert result is False
# Should raise exception on error
with pytest.raises(Exception, match="Database error"):
await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id="test-id",
embedding=[0.1] * EMBEDDING_DIM,
searchable_text="test",
metadata=None,
user_id=None,
)
if __name__ == "__main__":

View File

@@ -80,9 +80,8 @@ async def test_generate_embedding_no_api_key():
) as mock_get_client:
mock_get_client.return_value = None
result = await embeddings.generate_embedding("test text")
assert result is None
with pytest.raises(RuntimeError, match="openai_internal_api_key not set"):
await embeddings.generate_embedding("test text")
@pytest.mark.asyncio(loop_scope="session")
@@ -97,9 +96,8 @@ async def test_generate_embedding_api_error():
) as mock_get_client:
mock_get_client.return_value = mock_client
result = await embeddings.generate_embedding("test text")
assert result is None
with pytest.raises(Exception, match="API Error"):
await embeddings.generate_embedding("test text")
@pytest.mark.asyncio(loop_scope="session")
@@ -173,11 +171,10 @@ async def test_store_embedding_database_error(mocker):
embedding = [0.1, 0.2, 0.3]
result = await embeddings.store_embedding(
version_id="test-version-id", embedding=embedding, tx=mock_client
)
assert result is False
with pytest.raises(Exception, match="Database error"):
await embeddings.store_embedding(
version_id="test-version-id", embedding=embedding, tx=mock_client
)
@pytest.mark.asyncio(loop_scope="session")
@@ -277,17 +274,16 @@ async def test_ensure_embedding_create_new(mock_get, mock_store, mock_generate):
async def test_ensure_embedding_generation_fails(mock_get, mock_generate):
"""Test ensure_embedding when generation fails."""
mock_get.return_value = None
mock_generate.return_value = None
mock_generate.side_effect = Exception("Generation failed")
result = await embeddings.ensure_embedding(
version_id="test-id",
name="Test",
description="Test description",
sub_heading="Test heading",
categories=["test"],
)
assert result is False
with pytest.raises(Exception, match="Generation failed"):
await embeddings.ensure_embedding(
version_id="test-id",
name="Test",
description="Test description",
sub_heading="Test heading",
categories=["test"],
)
@pytest.mark.asyncio(loop_scope="session")

View File

@@ -186,13 +186,12 @@ async def unified_hybrid_search(
offset = (page - 1) * page_size
# Generate query embedding
query_embedding = await embed_query(query)
# Graceful degradation if embedding unavailable
if query_embedding is None or not query_embedding:
# Generate query embedding with graceful degradation
try:
query_embedding = await embed_query(query)
except Exception as e:
logger.warning(
"Failed to generate query embedding - falling back to lexical-only search. "
f"Failed to generate query embedding - falling back to lexical-only search: {e}. "
"Check that openai_internal_api_key is configured and OpenAI API is accessible."
)
query_embedding = [0.0] * EMBEDDING_DIM
@@ -464,13 +463,12 @@ async def hybrid_search(
offset = (page - 1) * page_size
# Generate query embedding
query_embedding = await embed_query(query)
# Graceful degradation
if query_embedding is None or not query_embedding:
# Generate query embedding with graceful degradation
try:
query_embedding = await embed_query(query)
except Exception as e:
logger.warning(
"Failed to generate query embedding - falling back to lexical-only search."
f"Failed to generate query embedding - falling back to lexical-only search: {e}"
)
query_embedding = [0.0] * EMBEDDING_DIM
total_non_semantic = (

View File

@@ -172,8 +172,8 @@ async def test_hybrid_search_without_embeddings():
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
# Simulate embedding failure
mock_embed.return_value = None
# Simulate embedding failure by raising exception
mock_embed.side_effect = Exception("Embedding generation failed")
mock_query.return_value = mock_results
# Should NOT raise - graceful degradation
@@ -613,7 +613,9 @@ async def test_unified_hybrid_search_graceful_degradation():
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = mock_results
mock_embed.return_value = None # Embedding failure
mock_embed.side_effect = Exception(
"Embedding generation failed"
) # Embedding failure
# Should NOT raise - graceful degradation
results, total = await unified_hybrid_search(