mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
fix(backend): ensure pgvector search_path for all pooled connections
Fix random "type 'vector' does not exist" errors during copilot chat block search. The error occurred because pooled database connections could have inconsistent search_path configurations that didn't include the schema where pgvector was installed. The fix adds search_path to the PostgreSQL connection options to ensure all connections include both the application schema and common extension schemas (public, extensions) where pgvector may be installed across different environments (local, CI, Supabase). Also adds connection debug logging for "does not exist" errors to help diagnose the related "CoPilotUnderstanding table does not exist" issue, which may indicate connections routing to different database instances. Debug info includes: search_path, current_schema, server_addr, backend_pid.
This commit is contained in:
@@ -26,6 +26,24 @@ def add_param(url: str, key: str, value: str) -> str:
|
||||
|
||||
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432")
|
||||
|
||||
# Ensure search_path includes common extension schemas for pgvector resolution.
|
||||
# This fixes random "type 'vector' does not exist" errors when connection pool
|
||||
# returns connections that don't have pgvector's schema in their search_path.
|
||||
# We extract the app schema and prepend it, then add 'extensions' and 'public'
|
||||
# where pgvector is commonly installed (Supabase uses 'extensions', local uses 'public').
|
||||
_parsed = urlparse(DATABASE_URL)
|
||||
_query_params = dict(parse_qsl(_parsed.query))
|
||||
_app_schema = _query_params.get("schema", "public")
|
||||
# Build search_path: app_schema first (for tables), then extension schemas
|
||||
_search_path = f"{_app_schema},extensions,public" if _app_schema != "public" else "public,extensions"
|
||||
# Add search_path via PostgreSQL options parameter
|
||||
if "options" in _query_params:
|
||||
# Append to existing options
|
||||
_query_params["options"] = _query_params["options"] + f" -c search_path={_search_path}"
|
||||
else:
|
||||
_query_params["options"] = f"-c search_path={_search_path}"
|
||||
DATABASE_URL = urlunparse(_parsed._replace(query=urlencode(_query_params)))
|
||||
|
||||
CONN_LIMIT = os.getenv("DB_CONNECTION_LIMIT")
|
||||
if CONN_LIMIT:
|
||||
DATABASE_URL = add_param(DATABASE_URL, "connection_limit", CONN_LIMIT)
|
||||
@@ -108,6 +126,34 @@ def get_database_schema() -> str:
|
||||
return query_params.get("schema", "public")
|
||||
|
||||
|
||||
async def get_connection_debug_info() -> dict:
|
||||
"""Get diagnostic info about the current database connection.
|
||||
|
||||
Useful for debugging "table does not exist" or "type does not exist" errors
|
||||
that may indicate connections going to different database instances.
|
||||
|
||||
Returns dict with: search_path, current_schema, server_version, pg_backend_pid
|
||||
"""
|
||||
import prisma as prisma_module
|
||||
|
||||
try:
|
||||
result = await prisma_module.get_client().query_raw(
|
||||
"""
|
||||
SELECT
|
||||
current_setting('search_path') as search_path,
|
||||
current_schema() as current_schema,
|
||||
current_database() as current_database,
|
||||
inet_server_addr() as server_addr,
|
||||
inet_server_port() as server_port,
|
||||
pg_backend_pid() as backend_pid,
|
||||
version() as server_version
|
||||
"""
|
||||
)
|
||||
return result[0] if result else {}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
async def _raw_with_schema(
|
||||
query_template: str,
|
||||
*args,
|
||||
@@ -124,8 +170,9 @@ async def _raw_with_schema(
|
||||
|
||||
Note on pgvector types:
|
||||
Use unqualified ::vector and <=> operator in queries. PostgreSQL resolves
|
||||
these via search_path, which includes the schema where pgvector is installed
|
||||
on all environments (local, CI, dev).
|
||||
these via search_path. The connection's search_path is configured at module
|
||||
load to include common extension schemas (public, extensions) where pgvector
|
||||
may be installed across different environments (local, CI, Supabase).
|
||||
|
||||
Args:
|
||||
query_template: SQL query with {schema_prefix} and/or {schema} placeholders
|
||||
@@ -155,12 +202,29 @@ async def _raw_with_schema(
|
||||
|
||||
db_client = client if client else prisma_module.get_client()
|
||||
|
||||
if execute:
|
||||
result = await db_client.execute_raw(formatted_query, *args) # type: ignore
|
||||
else:
|
||||
result = await db_client.query_raw(formatted_query, *args) # type: ignore
|
||||
|
||||
return result
|
||||
try:
|
||||
if execute:
|
||||
result = await db_client.execute_raw(formatted_query, *args) # type: ignore
|
||||
else:
|
||||
result = await db_client.query_raw(formatted_query, *args) # type: ignore
|
||||
return result
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
# Log connection debug info for "does not exist" errors to help diagnose
|
||||
# whether connections are going to different database instances
|
||||
if "does not exist" in error_msg:
|
||||
try:
|
||||
debug_info = await get_connection_debug_info()
|
||||
logger.error(
|
||||
f"Database object not found. Connection debug info: {debug_info}. "
|
||||
f"Query template: {query_template[:200]}... Error: {error_msg}"
|
||||
)
|
||||
except Exception:
|
||||
logger.error(
|
||||
f"Database object not found (debug info unavailable). "
|
||||
f"Query template: {query_template[:200]}... Error: {error_msg}"
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
async def query_raw_with_schema(query_template: str, *args) -> list[dict]:
|
||||
|
||||
@@ -216,7 +216,27 @@ async def get_business_understanding(
|
||||
|
||||
# Cache miss - load from database
|
||||
logger.debug(f"Business understanding cache miss for user {user_id}")
|
||||
record = await CoPilotUnderstanding.prisma().find_unique(where={"userId": user_id})
|
||||
try:
|
||||
record = await CoPilotUnderstanding.prisma().find_unique(where={"userId": user_id})
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
if "does not exist" in error_msg:
|
||||
# Log connection debug info to diagnose if connections go to different DBs
|
||||
from backend.data.db import get_connection_debug_info
|
||||
|
||||
try:
|
||||
debug_info = await get_connection_debug_info()
|
||||
logger.error(
|
||||
f"CoPilotUnderstanding table not found. Connection debug: {debug_info}. "
|
||||
f"Error: {error_msg}"
|
||||
)
|
||||
except Exception:
|
||||
logger.error(
|
||||
f"CoPilotUnderstanding table not found (debug unavailable). "
|
||||
f"Error: {error_msg}"
|
||||
)
|
||||
raise
|
||||
|
||||
if record is None:
|
||||
return None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user