mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-01-26 07:28:44 -05:00
Compare commits
2 Commits
feat/agent
...
claude/fix
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
00730496e3 | ||
|
|
21c753b971 |
@@ -26,6 +26,25 @@ def add_param(url: str, key: str, value: str) -> str:
|
||||
|
||||
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432")
|
||||
|
||||
# Extract the application schema from DATABASE_URL for use in queries
|
||||
_parsed = urlparse(DATABASE_URL)
|
||||
_query_params = dict(parse_qsl(_parsed.query))
|
||||
_app_schema = _query_params.get("schema", "public")
|
||||
|
||||
# Build search_path that includes app schema and extension schemas where pgvector may live.
|
||||
# This is used both in connection options (may be ignored by PgBouncer) and in SET LOCAL
|
||||
# statements before raw queries (guaranteed to work).
|
||||
SEARCH_PATH = f"{_app_schema},extensions,public" if _app_schema != "public" else "public,extensions"
|
||||
|
||||
# Try to set search_path via PostgreSQL options parameter at connection time.
|
||||
# NOTE: This may be ignored by PgBouncer in transaction pooling mode.
|
||||
# As a fallback, we also SET LOCAL search_path before raw queries.
|
||||
if "options" in _query_params:
|
||||
_query_params["options"] = _query_params["options"] + f" -c search_path={SEARCH_PATH}"
|
||||
else:
|
||||
_query_params["options"] = f"-c search_path={SEARCH_PATH}"
|
||||
DATABASE_URL = urlunparse(_parsed._replace(query=urlencode(_query_params)))
|
||||
|
||||
CONN_LIMIT = os.getenv("DB_CONNECTION_LIMIT")
|
||||
if CONN_LIMIT:
|
||||
DATABASE_URL = add_param(DATABASE_URL, "connection_limit", CONN_LIMIT)
|
||||
@@ -108,6 +127,34 @@ def get_database_schema() -> str:
|
||||
return query_params.get("schema", "public")
|
||||
|
||||
|
||||
async def get_connection_debug_info() -> dict:
|
||||
"""Get diagnostic info about the current database connection.
|
||||
|
||||
Useful for debugging "table does not exist" or "type does not exist" errors
|
||||
that may indicate connections going to different database instances.
|
||||
|
||||
Returns dict with: search_path, current_schema, server_version, pg_backend_pid
|
||||
"""
|
||||
import prisma as prisma_module
|
||||
|
||||
try:
|
||||
result = await prisma_module.get_client().query_raw(
|
||||
"""
|
||||
SELECT
|
||||
current_setting('search_path') as search_path,
|
||||
current_schema() as current_schema,
|
||||
current_database() as current_database,
|
||||
inet_server_addr() as server_addr,
|
||||
inet_server_port() as server_port,
|
||||
pg_backend_pid() as backend_pid,
|
||||
version() as server_version
|
||||
"""
|
||||
)
|
||||
return result[0] if result else {}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
async def _raw_with_schema(
|
||||
query_template: str,
|
||||
*args,
|
||||
@@ -124,8 +171,9 @@ async def _raw_with_schema(
|
||||
|
||||
Note on pgvector types:
|
||||
Use unqualified ::vector and <=> operator in queries. PostgreSQL resolves
|
||||
these via search_path, which includes the schema where pgvector is installed
|
||||
on all environments (local, CI, dev).
|
||||
these via search_path. The connection's search_path is configured at module
|
||||
load to include common extension schemas (public, extensions) where pgvector
|
||||
may be installed across different environments (local, CI, Supabase).
|
||||
|
||||
Args:
|
||||
query_template: SQL query with {schema_prefix} and/or {schema} placeholders
|
||||
@@ -155,12 +203,45 @@ async def _raw_with_schema(
|
||||
|
||||
db_client = client if client else prisma_module.get_client()
|
||||
|
||||
if execute:
|
||||
result = await db_client.execute_raw(formatted_query, *args) # type: ignore
|
||||
else:
|
||||
result = await db_client.query_raw(formatted_query, *args) # type: ignore
|
||||
# For queries that might use pgvector types (::vector or <=> operator),
|
||||
# we need to ensure search_path includes the schema where pgvector is installed.
|
||||
# PgBouncer in transaction mode may ignore connection-level options, so we
|
||||
# use SET LOCAL within a transaction to guarantee correct search_path.
|
||||
needs_vector_search_path = "::vector" in formatted_query or "<=>" in formatted_query
|
||||
|
||||
return result
|
||||
try:
|
||||
if needs_vector_search_path and client is None:
|
||||
# Use transaction to set search_path for vector queries
|
||||
async with db_client.tx() as tx:
|
||||
await tx.execute_raw(f"SET LOCAL search_path TO {SEARCH_PATH}")
|
||||
if execute:
|
||||
result = await tx.execute_raw(formatted_query, *args) # type: ignore
|
||||
else:
|
||||
result = await tx.query_raw(formatted_query, *args) # type: ignore
|
||||
else:
|
||||
# Regular query without vector types, or already in a transaction
|
||||
if execute:
|
||||
result = await db_client.execute_raw(formatted_query, *args) # type: ignore
|
||||
else:
|
||||
result = await db_client.query_raw(formatted_query, *args) # type: ignore
|
||||
return result
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
# Log connection debug info for "does not exist" errors to help diagnose
|
||||
# whether connections are going to different database instances
|
||||
if "does not exist" in error_msg:
|
||||
try:
|
||||
debug_info = await get_connection_debug_info()
|
||||
logger.error(
|
||||
f"Database object not found. Connection debug info: {debug_info}. "
|
||||
f"Query template: {query_template[:200]}... Error: {error_msg}"
|
||||
)
|
||||
except Exception:
|
||||
logger.error(
|
||||
f"Database object not found (debug info unavailable). "
|
||||
f"Query template: {query_template[:200]}... Error: {error_msg}"
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
async def query_raw_with_schema(query_template: str, *args) -> list[dict]:
|
||||
|
||||
@@ -216,7 +216,27 @@ async def get_business_understanding(
|
||||
|
||||
# Cache miss - load from database
|
||||
logger.debug(f"Business understanding cache miss for user {user_id}")
|
||||
record = await CoPilotUnderstanding.prisma().find_unique(where={"userId": user_id})
|
||||
try:
|
||||
record = await CoPilotUnderstanding.prisma().find_unique(where={"userId": user_id})
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
if "does not exist" in error_msg:
|
||||
# Log connection debug info to diagnose if connections go to different DBs
|
||||
from backend.data.db import get_connection_debug_info
|
||||
|
||||
try:
|
||||
debug_info = await get_connection_debug_info()
|
||||
logger.error(
|
||||
f"CoPilotUnderstanding table not found. Connection debug: {debug_info}. "
|
||||
f"Error: {error_msg}"
|
||||
)
|
||||
except Exception:
|
||||
logger.error(
|
||||
f"CoPilotUnderstanding table not found (debug unavailable). "
|
||||
f"Error: {error_msg}"
|
||||
)
|
||||
raise
|
||||
|
||||
if record is None:
|
||||
return None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user