fix(backend): use public schema for pgvector types

pgvector extension is installed in the 'public' schema, not the
application schema (e.g., 'platform'). This fix introduces a
{pgvector_schema} placeholder that defaults to 'public' for
qualifying pgvector types and operators.

Changes:
- Add pgvector_schema placeholder to db.py raw query helpers
- Update embeddings.py to use {pgvector_schema}.vector
- Update hybrid_search.py to use {pgvector_schema} for operators
- Add type ignore for rank_bm25 untyped import

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Zamil Majdy
2026-01-20 16:48:29 +00:00
parent 0a48c49902
commit fb5c89d881
3 changed files with 48 additions and 23 deletions

View File

@@ -154,16 +154,16 @@ async def store_content_embedding(
# Upsert the embedding
# WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
# Use {schema}.vector for explicit pgvector type qualification
# Use {pgvector_schema}.vector for explicit pgvector type qualification
await execute_raw_with_schema(
"""
INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
"id", "contentType", "contentId", "userId", "embedding", "searchableText", "metadata", "createdAt", "updatedAt"
)
VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::{schema}.vector, $5, $6::jsonb, NOW(), NOW())
VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::{pgvector_schema}.vector, $5, $6::jsonb, NOW(), NOW())
ON CONFLICT ("contentType", "contentId", "userId")
DO UPDATE SET
"embedding" = $4::{schema}.vector,
"embedding" = $4::{pgvector_schema}.vector,
"searchableText" = $5,
"metadata" = $6::jsonb,
"updatedAt" = NOW()
@@ -870,7 +870,7 @@ async def semantic_search(
# Add content type parameters and build placeholders dynamically
content_type_start_idx = len(params) + 1
content_type_placeholders = ", ".join(
'$' + str(content_type_start_idx + i) + '::{schema_prefix}"ContentType"'
"$" + str(content_type_start_idx + i) + '::{schema_prefix}"ContentType"'
for i in range(len(content_types))
)
params.extend([ct.value for ct in content_types])
@@ -880,21 +880,33 @@ async def semantic_search(
params.append(min_similarity)
# Use regular string (not f-string) for template to preserve {schema_prefix} and {schema} placeholders
# Use OPERATOR({schema}.<=>) for explicit operator schema qualification
sql = """
# Use OPERATOR({pgvector_schema}.<=>) for explicit operator schema qualification
sql = (
"""
SELECT
"contentId" as content_id,
"contentType" as content_type,
"searchableText" as searchable_text,
metadata,
1 - (embedding OPERATOR({schema}.<=>) '""" + embedding_str + """'::{schema}.vector) as similarity
1 - (embedding OPERATOR({pgvector_schema}.<=>) '"""
+ embedding_str
+ """'::{pgvector_schema}.vector) as similarity
FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" IN (""" + content_type_placeholders + """)
""" + user_filter + """
AND 1 - (embedding OPERATOR({schema}.<=>) '""" + embedding_str + """'::{schema}.vector) >= $""" + str(min_similarity_idx) + """
WHERE "contentType" IN ("""
+ content_type_placeholders
+ """)
"""
+ user_filter
+ """
AND 1 - (embedding OPERATOR({pgvector_schema}.<=>) '"""
+ embedding_str
+ """'::{pgvector_schema}.vector) >= $"""
+ str(min_similarity_idx)
+ """
ORDER BY similarity DESC
LIMIT $1
"""
)
try:
results = await query_raw_with_schema(sql, *params)
@@ -924,7 +936,7 @@ async def semantic_search(
# Add content type parameters and build placeholders dynamically
content_type_start_idx = len(params_lexical) + 1
content_type_placeholders_lexical = ", ".join(
'$' + str(content_type_start_idx + i) + '::{schema_prefix}"ContentType"'
"$" + str(content_type_start_idx + i) + '::{schema_prefix}"ContentType"'
for i in range(len(content_types))
)
params_lexical.extend([ct.value for ct in content_types])
@@ -934,7 +946,8 @@ async def semantic_search(
params_lexical.append(f"%{query}%")
# Use regular string (not f-string) for template to preserve {schema_prefix} placeholders
sql_lexical = """
sql_lexical = (
"""
SELECT
"contentId" as content_id,
"contentType" as content_type,
@@ -942,12 +955,19 @@ async def semantic_search(
metadata,
0.0 as similarity
FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" IN (""" + content_type_placeholders_lexical + """)
""" + user_filter + """
AND "searchableText" ILIKE $""" + str(query_param_idx) + """
WHERE "contentType" IN ("""
+ content_type_placeholders_lexical
+ """)
"""
+ user_filter
+ """
AND "searchableText" ILIKE $"""
+ str(query_param_idx)
+ """
ORDER BY "updatedAt" DESC
LIMIT $1
"""
)
try:
results = await query_raw_with_schema(sql_lexical, *params_lexical)

View File

@@ -12,7 +12,7 @@ from dataclasses import dataclass
from typing import Any, Literal
from prisma.enums import ContentType
from rank_bm25 import BM25Okapi
from rank_bm25 import BM25Okapi # type: ignore[import-untyped]
from backend.api.features.store.embeddings import (
EMBEDDING_DIM,
@@ -295,7 +295,7 @@ async def unified_hybrid_search(
FROM {{schema_prefix}}"UnifiedContentEmbedding" uce
WHERE uce."contentType" = ANY({content_types_param}::{{schema_prefix}}"ContentType"[])
{user_filter}
ORDER BY uce.embedding OPERATOR({{schema}}.<=>) {embedding_param}::{{schema}}.vector
ORDER BY uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector
LIMIT 200
)
),
@@ -307,7 +307,7 @@ async def unified_hybrid_search(
uce.metadata,
uce."updatedAt" as updated_at,
-- Semantic score: cosine similarity (1 - distance)
COALESCE(1 - (uce.embedding OPERATOR({{schema}}.<=>) {embedding_param}::{{schema}}.vector), 0) as semantic_score,
COALESCE(1 - (uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector), 0) as semantic_score,
-- Lexical score: ts_rank_cd
COALESCE(ts_rank_cd(uce.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw,
-- Category match from metadata
@@ -583,7 +583,7 @@ async def hybrid_search(
WHERE uce."contentType" = 'STORE_AGENT'::{{schema_prefix}}"ContentType"
AND uce."userId" IS NULL
AND {where_clause}
ORDER BY uce.embedding OPERATOR({{schema}}.<=>) {embedding_param}::{{schema}}.vector
ORDER BY uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector
LIMIT 200
) uce
),
@@ -605,7 +605,7 @@ async def hybrid_search(
-- Searchable text for BM25 reranking
COALESCE(sa.agent_name, '') || ' ' || COALESCE(sa.sub_heading, '') || ' ' || COALESCE(sa.description, '') as searchable_text,
-- Semantic score
COALESCE(1 - (uce.embedding OPERATOR({{schema}}.<=>) {embedding_param}::{{schema}}.vector), 0) as semantic_score,
COALESCE(1 - (uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector), 0) as semantic_score,
-- Lexical score (raw, will normalize)
COALESCE(ts_rank_cd(uce.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw,
-- Category match

View File

@@ -120,10 +120,11 @@ async def _raw_with_schema(
Supports placeholders:
- {schema_prefix}: Table/type prefix (e.g., "platform".)
- {schema}: Raw schema name (e.g., platform) for pgvector types and operators
- {schema}: Raw schema name for application tables (e.g., platform)
- {pgvector_schema}: Schema where pgvector is installed (defaults to "public")
Args:
query_template: SQL query with {schema_prefix} and/or {schema} placeholders
query_template: SQL query with {schema_prefix}, {schema}, and/or {pgvector_schema} placeholders
*args: Query parameters
execute: If False, executes SELECT query. If True, executes INSERT/UPDATE/DELETE.
client: Optional Prisma client for transactions (only used when execute=True).
@@ -134,16 +135,20 @@ async def _raw_with_schema(
Example with vector type:
await execute_raw_with_schema(
'INSERT INTO {schema_prefix}"Embedding" (vec) VALUES ($1::{schema}.vector)',
'INSERT INTO {schema_prefix}"Embedding" (vec) VALUES ($1::{pgvector_schema}.vector)',
embedding_data
)
"""
schema = get_database_schema()
schema_prefix = f'"{schema}".' if schema != "public" else ""
# pgvector extension is typically installed in "public" schema
# On Supabase it may be in "extensions" but "public" is the common default
pgvector_schema = "public"
formatted_query = query_template.format(
schema_prefix=schema_prefix,
schema=schema,
pgvector_schema=pgvector_schema,
)
import prisma as prisma_module