Compare commits

...

2 Commits

Author SHA1 Message Date
Claude
00730496e3 fix(backend): use SET LOCAL search_path for vector queries
The connection-level search_path via options parameter is being ignored,
likely by PgBouncer/Supavisor in transaction pooling mode. Logs showed
connections with wrong search_path ('"$user", public, extensions') and
wrong current_schema ('public' instead of 'platform').

This fix wraps vector-type queries (those using ::vector or <=>) in a
transaction with SET LOCAL search_path to ensure pgvector types are
always resolvable, regardless of PgBouncer configuration.

Also exposed SEARCH_PATH constant for use in other modules if needed.
2026-01-25 23:36:30 +00:00
Claude
21c753b971 fix(backend): ensure pgvector search_path for all pooled connections
Fix random "type 'vector' does not exist" errors during copilot chat
block search. The error occurred because pooled database connections
could have inconsistent search_path configurations that didn't include
the schema where pgvector was installed.

The fix adds search_path to the PostgreSQL connection options to ensure
all connections include both the application schema and common extension
schemas (public, extensions) where pgvector may be installed across
different environments (local, CI, Supabase).

Also adds connection debug logging for "does not exist" errors to help
diagnose the related "CoPilotUnderstanding table does not exist" issue,
which may indicate connections routing to different database instances.

Debug info includes: search_path, current_schema, server_addr, backend_pid.
2026-01-25 22:26:10 +00:00
2 changed files with 109 additions and 8 deletions

View File

@@ -26,6 +26,25 @@ def add_param(url: str, key: str, value: str) -> str:
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432")
# Extract the application schema from DATABASE_URL for use in queries
_parsed = urlparse(DATABASE_URL)
_query_params = dict(parse_qsl(_parsed.query))
_app_schema = _query_params.get("schema", "public")
# Build search_path that includes app schema and extension schemas where pgvector may live.
# This is used both in connection options (may be ignored by PgBouncer) and in SET LOCAL
# statements before raw queries (guaranteed to work).
SEARCH_PATH = f"{_app_schema},extensions,public" if _app_schema != "public" else "public,extensions"
# Try to set search_path via PostgreSQL options parameter at connection time.
# NOTE: This may be ignored by PgBouncer in transaction pooling mode.
# As a fallback, we also SET LOCAL search_path before raw queries.
if "options" in _query_params:
_query_params["options"] = _query_params["options"] + f" -c search_path={SEARCH_PATH}"
else:
_query_params["options"] = f"-c search_path={SEARCH_PATH}"
DATABASE_URL = urlunparse(_parsed._replace(query=urlencode(_query_params)))
CONN_LIMIT = os.getenv("DB_CONNECTION_LIMIT")
if CONN_LIMIT:
DATABASE_URL = add_param(DATABASE_URL, "connection_limit", CONN_LIMIT)
@@ -108,6 +127,34 @@ def get_database_schema() -> str:
return query_params.get("schema", "public")
async def get_connection_debug_info() -> dict:
"""Get diagnostic info about the current database connection.
Useful for debugging "table does not exist" or "type does not exist" errors
that may indicate connections going to different database instances.
Returns dict with: search_path, current_schema, server_version, pg_backend_pid
"""
import prisma as prisma_module
try:
result = await prisma_module.get_client().query_raw(
"""
SELECT
current_setting('search_path') as search_path,
current_schema() as current_schema,
current_database() as current_database,
inet_server_addr() as server_addr,
inet_server_port() as server_port,
pg_backend_pid() as backend_pid,
version() as server_version
"""
)
return result[0] if result else {}
except Exception as e:
return {"error": str(e)}
async def _raw_with_schema(
query_template: str,
*args,
@@ -124,8 +171,9 @@ async def _raw_with_schema(
Note on pgvector types:
Use unqualified ::vector and <=> operator in queries. PostgreSQL resolves
these via search_path, which includes the schema where pgvector is installed
on all environments (local, CI, dev).
these via search_path. The connection's search_path is configured at module
load to include common extension schemas (public, extensions) where pgvector
may be installed across different environments (local, CI, Supabase).
Args:
query_template: SQL query with {schema_prefix} and/or {schema} placeholders
@@ -155,12 +203,45 @@ async def _raw_with_schema(
db_client = client if client else prisma_module.get_client()
if execute:
result = await db_client.execute_raw(formatted_query, *args) # type: ignore
else:
result = await db_client.query_raw(formatted_query, *args) # type: ignore
# For queries that might use pgvector types (::vector or <=> operator),
# we need to ensure search_path includes the schema where pgvector is installed.
# PgBouncer in transaction mode may ignore connection-level options, so we
# use SET LOCAL within a transaction to guarantee correct search_path.
needs_vector_search_path = "::vector" in formatted_query or "<=>" in formatted_query
return result
try:
if needs_vector_search_path and client is None:
# Use transaction to set search_path for vector queries
async with db_client.tx() as tx:
await tx.execute_raw(f"SET LOCAL search_path TO {SEARCH_PATH}")
if execute:
result = await tx.execute_raw(formatted_query, *args) # type: ignore
else:
result = await tx.query_raw(formatted_query, *args) # type: ignore
else:
# Regular query without vector types, or already in a transaction
if execute:
result = await db_client.execute_raw(formatted_query, *args) # type: ignore
else:
result = await db_client.query_raw(formatted_query, *args) # type: ignore
return result
except Exception as e:
error_msg = str(e)
# Log connection debug info for "does not exist" errors to help diagnose
# whether connections are going to different database instances
if "does not exist" in error_msg:
try:
debug_info = await get_connection_debug_info()
logger.error(
f"Database object not found. Connection debug info: {debug_info}. "
f"Query template: {query_template[:200]}... Error: {error_msg}"
)
except Exception:
logger.error(
f"Database object not found (debug info unavailable). "
f"Query template: {query_template[:200]}... Error: {error_msg}"
)
raise
async def query_raw_with_schema(query_template: str, *args) -> list[dict]:

View File

@@ -216,7 +216,27 @@ async def get_business_understanding(
# Cache miss - load from database
logger.debug(f"Business understanding cache miss for user {user_id}")
record = await CoPilotUnderstanding.prisma().find_unique(where={"userId": user_id})
try:
record = await CoPilotUnderstanding.prisma().find_unique(where={"userId": user_id})
except Exception as e:
error_msg = str(e)
if "does not exist" in error_msg:
# Log connection debug info to diagnose if connections go to different DBs
from backend.data.db import get_connection_debug_info
try:
debug_info = await get_connection_debug_info()
logger.error(
f"CoPilotUnderstanding table not found. Connection debug: {debug_info}. "
f"Error: {error_msg}"
)
except Exception:
logger.error(
f"CoPilotUnderstanding table not found (debug unavailable). "
f"Error: {error_msg}"
)
raise
if record is None:
return None