mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-01-12 00:28:31 -05:00
Compare commits
8 Commits
dev
...
swiftyos/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
39839a51b3 | ||
|
|
b114354b1f | ||
|
|
0c0488ec0b | ||
|
|
5388a321c7 | ||
|
|
8ae5cbecd6 | ||
|
|
900b0e736f | ||
|
|
0085e5e6e0 | ||
|
|
5c8dac46f5 |
@@ -17,6 +17,41 @@ DATABASE_URL="postgresql://${DB_USER}:${DB_PASS}@${DB_HOST}:${DB_PORT}/${DB_NAME
|
||||
DIRECT_URL="postgresql://${DB_USER}:${DB_PASS}@${DB_HOST}:${DB_PORT}/${DB_NAME}?schema=${DB_SCHEMA}&connect_timeout=${DB_CONNECT_TIMEOUT}"
|
||||
PRISMA_SCHEMA="postgres/schema.prisma"
|
||||
|
||||
# SQLAlchemy Configuration (for gradual migration from Prisma)
|
||||
# Set to true to enable SQLAlchemy alongside Prisma (both ORMs coexist during migration)
|
||||
ENABLE_SQLALCHEMY=false
|
||||
|
||||
# Connection Pool Configuration
|
||||
# IMPORTANT: With 6 backend processes, total connections = 6 × (POOL_SIZE + MAX_OVERFLOW)
|
||||
# Must stay under PostgreSQL max_connections (default: 100)
|
||||
#
|
||||
# Environment-specific recommendations:
|
||||
# Development: POOL_SIZE=2-3, MAX_OVERFLOW=1-2 (lightweight, fast startup)
|
||||
# Test/CI: POOL_SIZE=2, MAX_OVERFLOW=1 (minimal resources, parallel test safety)
|
||||
# Production: POOL_SIZE=10-20, MAX_OVERFLOW=5-10 (handle real traffic and bursts)
|
||||
#
|
||||
# Default values below are suitable for production use:
|
||||
SQLALCHEMY_POOL_SIZE=10
|
||||
SQLALCHEMY_MAX_OVERFLOW=5
|
||||
|
||||
# Timeout Configuration
|
||||
# POOL_TIMEOUT: How long to wait for an available connection from the pool (when all connections busy)
|
||||
# CONNECT_TIMEOUT: How long to wait when establishing a NEW connection to PostgreSQL
|
||||
#
|
||||
# Environment-specific recommendations:
|
||||
# Development: POOL_TIMEOUT=10-30s, CONNECT_TIMEOUT=5-10s
|
||||
# Test/CI: POOL_TIMEOUT=5-10s, CONNECT_TIMEOUT=5-10s (fail fast)
|
||||
# Production: POOL_TIMEOUT=30s, CONNECT_TIMEOUT=10-15s
|
||||
#
|
||||
# Default values below are suitable for production use:
|
||||
SQLALCHEMY_POOL_TIMEOUT=30
|
||||
SQLALCHEMY_CONNECT_TIMEOUT=10
|
||||
|
||||
# SQL Query Logging
|
||||
# Set to true to log ALL SQL statements (very verbose, useful for debugging)
|
||||
# Should always be false in production
|
||||
SQLALCHEMY_ECHO=false
|
||||
|
||||
## ===== REQUIRED SERVICE CREDENTIALS ===== ##
|
||||
# Redis Configuration
|
||||
REDIS_HOST=localhost
|
||||
|
||||
233
autogpt_platform/backend/backend/data/sqlalchemy.py
Normal file
233
autogpt_platform/backend/backend/data/sqlalchemy.py
Normal file
@@ -0,0 +1,233 @@
|
||||
"""
|
||||
SQLAlchemy infrastructure for AutoGPT Platform.
|
||||
|
||||
This module provides:
|
||||
1. Async engine creation with connection pooling
|
||||
2. Session factory for dependency injection
|
||||
3. Database lifecycle management
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from sqlalchemy.ext.asyncio import (
|
||||
AsyncEngine,
|
||||
AsyncSession,
|
||||
async_sessionmaker,
|
||||
create_async_engine,
|
||||
)
|
||||
|
||||
from backend.util.settings import Config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ============================================================================
|
||||
# CONFIGURATION
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def get_database_url() -> str:
|
||||
"""
|
||||
Extract database URL from environment and convert to async format.
|
||||
|
||||
Prisma URL: postgresql://user:pass@host:port/db?schema=platform&connect_timeout=60
|
||||
Async URL: postgresql+asyncpg://user:pass@host:port/db
|
||||
|
||||
Returns the async-compatible URL without query parameters (handled via connect_args).
|
||||
"""
|
||||
prisma_url = Config().database_url
|
||||
|
||||
# Replace postgresql:// with postgresql+asyncpg://
|
||||
async_url = prisma_url.replace("postgresql://", "postgresql+asyncpg://")
|
||||
|
||||
# Remove ALL query parameters (schema, connect_timeout, etc.)
|
||||
# We'll handle these through connect_args instead
|
||||
async_url = re.sub(r"\?.*$", "", async_url)
|
||||
|
||||
return async_url
|
||||
|
||||
|
||||
def get_database_schema() -> str:
|
||||
"""
|
||||
Extract schema name from DATABASE_URL query parameter.
|
||||
|
||||
Returns 'platform' by default (matches Prisma configuration).
|
||||
"""
|
||||
prisma_url = Config().database_url
|
||||
match = re.search(r"schema=(\w+)", prisma_url)
|
||||
return match.group(1) if match else "platform"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# ENGINE CREATION
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def create_engine() -> AsyncEngine:
|
||||
"""
|
||||
Create async SQLAlchemy engine with connection pooling.
|
||||
|
||||
This should be called ONCE per process at startup.
|
||||
The engine is long-lived and thread-safe.
|
||||
|
||||
Connection Pool Configuration:
|
||||
- pool_size: Number of persistent connections (default: 10)
|
||||
- max_overflow: Additional connections when pool exhausted (default: 5)
|
||||
- pool_timeout: Seconds to wait for connection (default: 30)
|
||||
- pool_pre_ping: Test connections before using (prevents stale connections)
|
||||
|
||||
Total max connections = pool_size + max_overflow = 15
|
||||
"""
|
||||
url = get_database_url()
|
||||
config = Config()
|
||||
|
||||
engine = create_async_engine(
|
||||
url,
|
||||
# Connection pool configuration
|
||||
pool_size=config.sqlalchemy_pool_size, # Persistent connections
|
||||
max_overflow=config.sqlalchemy_max_overflow, # Burst capacity
|
||||
pool_timeout=config.sqlalchemy_pool_timeout, # Wait time for connection
|
||||
pool_pre_ping=True, # Validate connections before use
|
||||
# Async configuration
|
||||
echo=config.sqlalchemy_echo, # Log SQL statements (dev/debug only)
|
||||
future=True, # Use SQLAlchemy 2.0 style
|
||||
# Connection arguments (passed to asyncpg)
|
||||
connect_args={
|
||||
"server_settings": {
|
||||
"search_path": get_database_schema(), # Use 'platform' schema
|
||||
},
|
||||
"timeout": config.sqlalchemy_connect_timeout, # Connection timeout
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"SQLAlchemy engine created: pool_size={config.sqlalchemy_pool_size}, "
|
||||
f"max_overflow={config.sqlalchemy_max_overflow}, "
|
||||
f"schema={get_database_schema()}"
|
||||
)
|
||||
|
||||
return engine
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# SESSION FACTORY
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def create_session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
|
||||
"""
|
||||
Create session factory for creating AsyncSession instances.
|
||||
|
||||
The factory is configured once, then used to create sessions on-demand.
|
||||
Each session represents a single database transaction.
|
||||
|
||||
Args:
|
||||
engine: The async engine (with connection pool)
|
||||
|
||||
Returns:
|
||||
Session factory that creates properly configured AsyncSession instances
|
||||
"""
|
||||
return async_sessionmaker(
|
||||
bind=engine,
|
||||
class_=AsyncSession,
|
||||
expire_on_commit=False, # Don't expire objects after commit
|
||||
autoflush=False, # Manual control over when to flush
|
||||
autocommit=False, # Explicit transaction control
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# DEPENDENCY INJECTION FOR FASTAPI
|
||||
# ============================================================================
|
||||
|
||||
# Global references (set during app startup)
|
||||
_engine: AsyncEngine | None = None
|
||||
_session_factory: async_sessionmaker[AsyncSession] | None = None
|
||||
|
||||
|
||||
def initialize(engine: AsyncEngine) -> None:
|
||||
"""
|
||||
Initialize global engine and session factory.
|
||||
|
||||
Called during FastAPI lifespan startup.
|
||||
|
||||
Args:
|
||||
engine: The async engine to use for this process
|
||||
"""
|
||||
global _engine, _session_factory
|
||||
_engine = engine
|
||||
_session_factory = create_session_factory(engine)
|
||||
logger.info("SQLAlchemy session factory initialized")
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def get_session() -> AsyncGenerator[AsyncSession, None]:
|
||||
"""
|
||||
FastAPI dependency that provides database session.
|
||||
|
||||
Usage in routes:
|
||||
@router.get("/users/{user_id}")
|
||||
async def get_user(
|
||||
user_id: int,
|
||||
session: AsyncSession = Depends(get_session)
|
||||
):
|
||||
result = await session.execute(select(User).where(User.id == user_id))
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
Usage in DatabaseManager RPC methods:
|
||||
@expose
|
||||
async def get_user(user_id: int):
|
||||
async with get_session() as session:
|
||||
result = await session.execute(select(User).where(User.id == user_id))
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
Lifecycle:
|
||||
1. Request arrives
|
||||
2. FastAPI calls this function (or used as context manager)
|
||||
3. Session is created (borrows connection from pool)
|
||||
4. Session is injected into route handler
|
||||
5. Route executes (may commit/rollback)
|
||||
6. Route returns
|
||||
7. Session is closed (returns connection to pool)
|
||||
|
||||
Error handling:
|
||||
- If exception occurs, session is rolled back
|
||||
- Connection is always returned to pool (even on error)
|
||||
"""
|
||||
if _session_factory is None:
|
||||
raise RuntimeError(
|
||||
"SQLAlchemy not initialized. Call initialize() in lifespan context."
|
||||
)
|
||||
|
||||
# Create session (borrows connection from pool)
|
||||
async with _session_factory() as session:
|
||||
try:
|
||||
yield session # Inject into route handler or context manager
|
||||
# If we get here, route succeeded - commit any pending changes
|
||||
await session.commit()
|
||||
except Exception:
|
||||
# Error occurred - rollback transaction
|
||||
await session.rollback()
|
||||
raise
|
||||
finally:
|
||||
# Always close session (returns connection to pool)
|
||||
await session.close()
|
||||
|
||||
|
||||
async def dispose() -> None:
|
||||
"""
|
||||
Dispose of engine and close all connections.
|
||||
|
||||
Called during FastAPI lifespan shutdown.
|
||||
Closes all connections in the pool gracefully.
|
||||
"""
|
||||
global _engine, _session_factory
|
||||
|
||||
if _engine is not None:
|
||||
logger.info("Disposing SQLAlchemy engine...")
|
||||
await _engine.dispose()
|
||||
_engine = None
|
||||
_session_factory = None
|
||||
logger.info("SQLAlchemy engine disposed")
|
||||
721
autogpt_platform/backend/backend/data/sqlalchemy_test.py
Normal file
721
autogpt_platform/backend/backend/data/sqlalchemy_test.py
Normal file
@@ -0,0 +1,721 @@
|
||||
"""
|
||||
Integration tests for SQLAlchemy infrastructure.
|
||||
|
||||
These tests verify:
|
||||
- Engine and session lifecycle management
|
||||
- Connection pool behavior
|
||||
- Database URL parsing and schema handling
|
||||
- Session dependency injection for FastAPI
|
||||
- Error handling and connection cleanup
|
||||
- Integration with the docker compose database
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import literal, select, text
|
||||
from sqlalchemy.exc import DBAPIError
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
|
||||
|
||||
from backend.data import sqlalchemy as sa
|
||||
from backend.util.settings import Config
|
||||
|
||||
# ============================================================================
|
||||
# FIXTURES
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
async def engine_cleanup():
|
||||
"""Cleanup fixture to ensure engine is disposed after each test."""
|
||||
yield
|
||||
# Cleanup after test
|
||||
await sa.dispose()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
async def initialized_sqlalchemy(engine_cleanup):
|
||||
"""
|
||||
Fixture that initializes SQLAlchemy for tests.
|
||||
|
||||
Creates engine and initializes global state.
|
||||
Automatically cleaned up after test.
|
||||
"""
|
||||
engine = sa.create_engine()
|
||||
sa.initialize(engine)
|
||||
yield engine
|
||||
await sa.dispose()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_database_config():
|
||||
"""Fixture to provide test configuration values."""
|
||||
config = Config()
|
||||
return config
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CONFIGURATION TESTS
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_get_database_url_parsing():
|
||||
"""
|
||||
Test database URL conversion from Prisma format to asyncpg format.
|
||||
|
||||
Verifies:
|
||||
- postgresql:// is replaced with postgresql+asyncpg://
|
||||
- schema query parameter is removed
|
||||
- Other connection params are preserved
|
||||
"""
|
||||
# The actual DATABASE_URL should be in the environment
|
||||
url = sa.get_database_url()
|
||||
|
||||
# Verify it uses asyncpg driver
|
||||
assert "postgresql+asyncpg://" in url, "URL should use asyncpg driver"
|
||||
|
||||
# Verify schema parameter is removed from URL
|
||||
assert "?schema=" not in url, "Schema parameter should be removed from URL"
|
||||
assert "&schema=" not in url, "Schema parameter should be removed from URL"
|
||||
|
||||
# Verify it's a valid database URL structure
|
||||
assert re.match(
|
||||
r"postgresql\+asyncpg://.*@.*:\d+/.*", url
|
||||
), "URL should match expected format"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_get_database_schema_extraction():
|
||||
"""
|
||||
Test schema extraction from DATABASE_URL query parameter.
|
||||
|
||||
Verifies the schema name is correctly parsed from the URL.
|
||||
"""
|
||||
schema = sa.get_database_schema()
|
||||
|
||||
# Should extract 'platform' schema (or whatever is configured)
|
||||
assert schema is not None, "Schema should not be None"
|
||||
assert isinstance(schema, str), "Schema should be a string"
|
||||
assert len(schema) > 0, "Schema should not be empty"
|
||||
|
||||
# Based on .env.default, should be 'platform'
|
||||
assert schema == "platform", "Default schema should be 'platform'"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_get_database_schema_default():
|
||||
"""
|
||||
Test default schema when not specified in DATABASE_URL.
|
||||
|
||||
Verifies fallback to 'platform' when schema parameter is missing.
|
||||
"""
|
||||
# Test with mocked Config instance
|
||||
with patch("backend.data.sqlalchemy.Config") as MockConfig:
|
||||
mock_config = MockConfig.return_value
|
||||
mock_config.database_url = "postgresql://user:pass@localhost:5432/testdb"
|
||||
|
||||
schema = sa.get_database_schema()
|
||||
assert (
|
||||
schema == "platform"
|
||||
), "Should default to 'platform' when schema not specified"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_database_url_removes_query_params():
|
||||
"""
|
||||
Test that get_database_url properly removes all query parameters.
|
||||
|
||||
Verifies ?... patterns are completely removed.
|
||||
"""
|
||||
# Test with mocked Config instance
|
||||
with patch("backend.data.sqlalchemy.Config") as MockConfig:
|
||||
mock_config = MockConfig.return_value
|
||||
mock_config.database_url = (
|
||||
"postgresql://user:pass@localhost:5432/db?schema=test&connect_timeout=60"
|
||||
)
|
||||
|
||||
url = sa.get_database_url()
|
||||
assert "?" not in url, "All query parameters should be removed"
|
||||
assert "schema=" not in url, "Schema parameter should be removed"
|
||||
assert (
|
||||
"connect_timeout" not in url
|
||||
), "Connect timeout parameter should be removed"
|
||||
assert (
|
||||
url == "postgresql+asyncpg://user:pass@localhost:5432/db"
|
||||
), "URL should only contain connection details without query params"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# ENGINE CREATION TESTS
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_create_engine_with_default_config(engine_cleanup):
|
||||
"""
|
||||
Test engine creation with default configuration.
|
||||
|
||||
Verifies:
|
||||
- Engine is created successfully
|
||||
- Engine is an AsyncEngine instance
|
||||
- Engine has a connection pool
|
||||
"""
|
||||
engine = sa.create_engine()
|
||||
|
||||
assert engine is not None, "Engine should be created"
|
||||
assert isinstance(engine, AsyncEngine), "Should create AsyncEngine"
|
||||
|
||||
# Verify engine has a pool
|
||||
assert engine.pool is not None, "Engine should have a connection pool"
|
||||
|
||||
# Cleanup
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_create_engine_pool_configuration(test_database_config):
|
||||
"""
|
||||
Test engine pool configuration.
|
||||
|
||||
Verifies pool_size, max_overflow, and timeout settings are applied.
|
||||
"""
|
||||
engine = sa.create_engine()
|
||||
|
||||
# Verify pool configuration
|
||||
pool = engine.pool
|
||||
assert pool is not None, "Engine should have a pool"
|
||||
|
||||
# Check pool size matches configuration
|
||||
config = test_database_config
|
||||
# Note: pool.size() returns the pool size
|
||||
# We use hasattr/getattr to avoid type checker issues with internal APIs
|
||||
if hasattr(pool, "size"):
|
||||
pool_size = pool.size() if callable(pool.size) else pool.size # type: ignore
|
||||
assert (
|
||||
pool_size == config.sqlalchemy_pool_size
|
||||
), f"Pool size should be {config.sqlalchemy_pool_size}"
|
||||
|
||||
# Verify max_overflow is set
|
||||
if hasattr(pool, "_max_overflow"):
|
||||
assert (
|
||||
getattr(pool, "_max_overflow") == config.sqlalchemy_max_overflow
|
||||
), f"Max overflow should be {config.sqlalchemy_max_overflow}"
|
||||
|
||||
# Cleanup
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_create_engine_connection_args():
|
||||
"""
|
||||
Test engine connection arguments.
|
||||
|
||||
Verifies search_path (schema) and timeout are configured correctly.
|
||||
"""
|
||||
engine = sa.create_engine()
|
||||
|
||||
# Get the expected schema
|
||||
expected_schema = sa.get_database_schema()
|
||||
|
||||
# Test by actually connecting and checking search_path
|
||||
async with engine.connect() as conn:
|
||||
# Query current search_path
|
||||
result = await conn.execute(text("SHOW search_path"))
|
||||
search_path = result.scalar()
|
||||
|
||||
# Verify the schema is in the search_path
|
||||
assert search_path is not None, "search_path should not be None"
|
||||
assert (
|
||||
expected_schema in search_path
|
||||
), f"Schema '{expected_schema}' should be in search_path"
|
||||
|
||||
# Cleanup
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# SESSION FACTORY TESTS
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_create_session_factory():
|
||||
"""
|
||||
Test session factory creation.
|
||||
|
||||
Verifies factory is configured correctly with proper settings.
|
||||
"""
|
||||
engine = sa.create_engine()
|
||||
factory = sa.create_session_factory(engine)
|
||||
|
||||
assert factory is not None, "Factory should be created"
|
||||
|
||||
# Verify factory configuration
|
||||
assert (
|
||||
factory.kw.get("expire_on_commit") is False
|
||||
), "expire_on_commit should be False"
|
||||
assert factory.kw.get("autoflush") is False, "autoflush should be False"
|
||||
assert factory.kw.get("autocommit") is False, "autocommit should be False"
|
||||
|
||||
# Cleanup
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_session_factory_creates_sessions():
|
||||
"""
|
||||
Test that session factory can create working sessions.
|
||||
|
||||
Verifies sessions can execute queries successfully.
|
||||
"""
|
||||
engine = sa.create_engine()
|
||||
factory = sa.create_session_factory(engine)
|
||||
|
||||
# Create a session and execute a simple query
|
||||
async with factory() as session:
|
||||
result = await session.execute(select(1))
|
||||
value = result.scalar()
|
||||
assert value == 1, "Should execute simple query successfully"
|
||||
|
||||
# Cleanup
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# INITIALIZATION AND LIFECYCLE TESTS
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_initialize_sets_globals(engine_cleanup):
|
||||
"""
|
||||
Test that initialize() sets global engine and session factory.
|
||||
|
||||
Verifies global state is properly configured.
|
||||
"""
|
||||
engine = sa.create_engine()
|
||||
sa.initialize(engine)
|
||||
|
||||
# Verify globals are set (by checking get_session doesn't raise)
|
||||
async with sa.get_session() as session:
|
||||
assert session is not None, "Should create session from factory"
|
||||
assert isinstance(session, AsyncSession), "Should be AsyncSession"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_get_session_before_initialize_fails():
|
||||
"""
|
||||
Test that get_session() raises RuntimeError when not initialized.
|
||||
|
||||
Verifies proper error handling when used before initialization.
|
||||
"""
|
||||
# Ensure we're not initialized
|
||||
await sa.dispose()
|
||||
|
||||
# Should raise RuntimeError
|
||||
with pytest.raises(RuntimeError) as exc_info:
|
||||
async with sa.get_session():
|
||||
pass
|
||||
|
||||
assert (
|
||||
"not initialized" in str(exc_info.value).lower()
|
||||
), "Error message should mention not initialized"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_get_session_provides_working_session(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that get_session provides a working database session.
|
||||
|
||||
Verifies session can execute queries and access database.
|
||||
"""
|
||||
async with sa.get_session() as session:
|
||||
# Execute a simple query
|
||||
result = await session.execute(select(1))
|
||||
value = result.scalar()
|
||||
assert value == 1, "Should execute query successfully"
|
||||
|
||||
# Verify session is active
|
||||
assert session.is_active, "Session should be active"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_get_session_commits_on_success(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that get_session automatically commits on successful completion.
|
||||
|
||||
Verifies transaction is committed when no exception occurs.
|
||||
"""
|
||||
# This test verifies the commit behavior by checking that
|
||||
# the session successfully completes without errors
|
||||
async with sa.get_session() as session:
|
||||
# Execute a query
|
||||
result = await session.execute(select(1))
|
||||
assert result.scalar() == 1
|
||||
# Session should auto-commit on exit
|
||||
|
||||
# If we get here without exception, commit succeeded
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_dispose_closes_connections(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that dispose() properly closes all connections.
|
||||
|
||||
Verifies cleanup is performed correctly.
|
||||
"""
|
||||
# Create a session to establish connection
|
||||
async with sa.get_session() as session:
|
||||
await session.execute(select(1))
|
||||
|
||||
# Dispose should close all connections
|
||||
await sa.dispose()
|
||||
|
||||
# Verify engine is cleaned up (globals should be None)
|
||||
# After dispose, get_session should fail
|
||||
with pytest.raises(RuntimeError):
|
||||
async with sa.get_session():
|
||||
pass
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CONNECTION POOL INTEGRATION TESTS
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_connection_pool_reuses_connections(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that connection pool reuses connections.
|
||||
|
||||
Verifies connections are borrowed and returned to pool.
|
||||
"""
|
||||
# Execute multiple queries in sequence
|
||||
for i in range(5):
|
||||
async with sa.get_session() as session:
|
||||
result = await session.execute(select(literal(i)))
|
||||
assert result.scalar() == i
|
||||
|
||||
# All queries should complete successfully, reusing connections
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_connection_pool_concurrent_sessions(initialized_sqlalchemy):
|
||||
"""
|
||||
Test multiple concurrent sessions from the pool.
|
||||
|
||||
Verifies pool can handle concurrent access.
|
||||
"""
|
||||
|
||||
async def execute_query(query_id: int):
|
||||
async with sa.get_session() as session:
|
||||
result = await session.execute(select(literal(query_id)))
|
||||
return result.scalar()
|
||||
|
||||
# Run 5 concurrent queries
|
||||
results = await asyncio.gather(
|
||||
execute_query(1),
|
||||
execute_query(2),
|
||||
execute_query(3),
|
||||
execute_query(4),
|
||||
execute_query(5),
|
||||
)
|
||||
|
||||
# Verify all queries succeeded
|
||||
assert results == [
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
], "All concurrent queries should complete successfully"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_connection_pool_respects_limits(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that connection pool respects size limits.
|
||||
|
||||
Verifies pool_size + max_overflow configuration.
|
||||
"""
|
||||
config = Config()
|
||||
max_connections = config.sqlalchemy_pool_size + config.sqlalchemy_max_overflow
|
||||
|
||||
# This test just verifies the pool doesn't crash with concurrent load
|
||||
# Actual limit enforcement is handled by SQLAlchemy
|
||||
|
||||
async def execute_query(query_id: int):
|
||||
async with sa.get_session() as session:
|
||||
await asyncio.sleep(0.1) # Hold connection briefly
|
||||
result = await session.execute(select(literal(query_id)))
|
||||
return result.scalar()
|
||||
|
||||
# Run queries up to the limit
|
||||
tasks = [execute_query(i) for i in range(min(max_connections, 10))]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
assert len(results) == min(
|
||||
max_connections, 10
|
||||
), "Should handle concurrent queries up to pool limit"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_connection_pool_timeout_on_exhaustion(initialized_sqlalchemy):
|
||||
"""
|
||||
Test pool timeout when all connections are exhausted.
|
||||
|
||||
Verifies TimeoutError is raised when waiting for connection.
|
||||
"""
|
||||
# This test is complex and may not be reliable in all environments
|
||||
# We'll test that the pool can handle at least some concurrent load
|
||||
# without timing out
|
||||
|
||||
async def hold_connection(duration: float):
|
||||
async with sa.get_session() as session:
|
||||
await asyncio.sleep(duration)
|
||||
result = await session.execute(select(1))
|
||||
return result.scalar()
|
||||
|
||||
# Run a few concurrent queries
|
||||
tasks = [hold_connection(0.1) for _ in range(3)]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
assert all(
|
||||
r == 1 for r in results
|
||||
), "Should handle concurrent queries within pool capacity"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_connection_pool_pre_ping(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that pool_pre_ping validates connections.
|
||||
|
||||
Verifies stale connections are detected and refreshed.
|
||||
"""
|
||||
# Execute a query to establish a connection
|
||||
async with sa.get_session() as session:
|
||||
result = await session.execute(select(1))
|
||||
assert result.scalar() == 1
|
||||
|
||||
# Execute another query - pre_ping should validate connection
|
||||
async with sa.get_session() as session:
|
||||
result = await session.execute(select(literal(2)))
|
||||
assert result.scalar() == 2
|
||||
|
||||
# If pre_ping is working, both queries succeed
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_schema_search_path_applied(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that queries use the correct schema (search_path).
|
||||
|
||||
Verifies connection search_path is set to platform schema.
|
||||
"""
|
||||
expected_schema = sa.get_database_schema()
|
||||
|
||||
async with sa.get_session() as session:
|
||||
# Check current search_path
|
||||
result = await session.execute(text("SHOW search_path"))
|
||||
search_path = result.scalar()
|
||||
|
||||
# Verify the platform schema is in search_path
|
||||
assert search_path is not None, "search_path should not be None"
|
||||
assert (
|
||||
expected_schema in search_path
|
||||
), f"Schema '{expected_schema}' should be in search_path"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# ERROR HANDLING TESTS
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_get_session_rolls_back_on_error(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that get_session rolls back transaction on exception.
|
||||
|
||||
Verifies automatic rollback on error.
|
||||
"""
|
||||
try:
|
||||
async with sa.get_session() as session:
|
||||
# Execute a valid query
|
||||
await session.execute(select(1))
|
||||
|
||||
# Raise an exception
|
||||
raise ValueError("Test error")
|
||||
except ValueError:
|
||||
pass # Expected
|
||||
|
||||
# Should be able to use a new session after rollback
|
||||
async with sa.get_session() as session:
|
||||
result = await session.execute(select(1))
|
||||
assert (
|
||||
result.scalar() == 1
|
||||
), "Should be able to create new session after rollback"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_get_session_always_closes_session(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that get_session always closes session, even on error.
|
||||
|
||||
Verifies connection is returned to pool on exception.
|
||||
"""
|
||||
session_closed = False
|
||||
|
||||
try:
|
||||
async with sa.get_session() as session:
|
||||
# Execute query
|
||||
await session.execute(select(1))
|
||||
# Raise error
|
||||
raise RuntimeError("Test error")
|
||||
except RuntimeError:
|
||||
# Session should be closed even though we raised
|
||||
session_closed = True
|
||||
|
||||
assert session_closed, "Should have caught the error"
|
||||
|
||||
# Verify we can still create new sessions
|
||||
async with sa.get_session() as session:
|
||||
result = await session.execute(select(1))
|
||||
assert result.scalar() == 1, "Should be able to create new session after error"
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_database_connection_error_handling():
|
||||
"""
|
||||
Test behavior with invalid DATABASE_URL.
|
||||
|
||||
Verifies proper error handling for connection failures.
|
||||
"""
|
||||
try:
|
||||
# Mock Config with invalid URL
|
||||
with patch("backend.data.sqlalchemy.Config") as MockConfig:
|
||||
mock_config = MockConfig.return_value
|
||||
mock_config.database_url = (
|
||||
"postgresql://invalid:invalid@invalid:9999/invalid?schema=platform"
|
||||
)
|
||||
mock_config.sqlalchemy_pool_size = 10
|
||||
mock_config.sqlalchemy_max_overflow = 5
|
||||
mock_config.sqlalchemy_pool_timeout = 30
|
||||
mock_config.sqlalchemy_connect_timeout = 10
|
||||
mock_config.sqlalchemy_echo = False
|
||||
|
||||
engine = sa.create_engine()
|
||||
sa.initialize(engine)
|
||||
|
||||
# Try to use session - should fail with connection error
|
||||
with pytest.raises((DBAPIError, Exception)):
|
||||
async with sa.get_session() as session:
|
||||
await session.execute(select(1))
|
||||
finally:
|
||||
# Cleanup
|
||||
await sa.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_concurrent_session_error_isolation(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that error in one session doesn't affect others.
|
||||
|
||||
Verifies session isolation and independent error handling.
|
||||
"""
|
||||
|
||||
async def failing_query():
|
||||
try:
|
||||
async with sa.get_session() as session:
|
||||
# Execute invalid SQL
|
||||
await session.execute(text("SELECT * FROM nonexistent_table"))
|
||||
except Exception:
|
||||
return "failed"
|
||||
return "succeeded"
|
||||
|
||||
async def successful_query():
|
||||
async with sa.get_session() as session:
|
||||
result = await session.execute(select(1))
|
||||
return result.scalar()
|
||||
|
||||
# Run both concurrently
|
||||
results = await asyncio.gather(
|
||||
failing_query(), successful_query(), return_exceptions=False
|
||||
)
|
||||
|
||||
# First should fail, second should succeed
|
||||
assert results[0] == "failed", "First query should fail"
|
||||
assert results[1] == 1, "Second query should succeed despite first failing"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# FASTAPI INTEGRATION TESTS
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_session_dependency_injection(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that session can be used as FastAPI dependency.
|
||||
|
||||
Verifies Depends(get_session) pattern works.
|
||||
"""
|
||||
# Simulate FastAPI dependency injection
|
||||
async with sa.get_session() as session:
|
||||
# This is how it would be injected into a route
|
||||
assert isinstance(session, AsyncSession), "Should receive AsyncSession instance"
|
||||
|
||||
# Should be able to execute queries
|
||||
result = await session.execute(select(1))
|
||||
assert result.scalar() == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_session_lifecycle_in_endpoint(initialized_sqlalchemy):
|
||||
"""
|
||||
Test full request/response cycle with session.
|
||||
|
||||
Simulates a FastAPI endpoint using the session.
|
||||
"""
|
||||
|
||||
# Simulate an endpoint that uses the session
|
||||
async def mock_endpoint():
|
||||
async with sa.get_session() as session:
|
||||
# Simulate querying data
|
||||
result = await session.execute(select(literal(42)))
|
||||
value = result.scalar()
|
||||
|
||||
# Simulate returning response
|
||||
return {"value": value}
|
||||
|
||||
# Execute the mock endpoint
|
||||
response = await mock_endpoint()
|
||||
|
||||
assert response["value"] == 42, "Endpoint should return correct value"
|
||||
|
||||
# Verify we can still use sessions after endpoint completes
|
||||
async with sa.get_session() as session:
|
||||
result = await session.execute(select(1))
|
||||
assert result.scalar() == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_multiple_requests_share_pool(initialized_sqlalchemy):
|
||||
"""
|
||||
Test that multiple requests share the same connection pool.
|
||||
|
||||
Verifies pool reuse across simulated requests.
|
||||
"""
|
||||
|
||||
async def simulate_request(request_id: int):
|
||||
async with sa.get_session() as session:
|
||||
result = await session.execute(select(literal(request_id)))
|
||||
return result.scalar()
|
||||
|
||||
# Simulate 10 concurrent requests
|
||||
results = await asyncio.gather(*[simulate_request(i) for i in range(10)])
|
||||
|
||||
# All requests should complete successfully
|
||||
assert results == list(range(10)), "All requests should complete using shared pool"
|
||||
@@ -88,10 +88,85 @@ class DatabaseManager(AppService):
|
||||
logger.info(f"[{self.service_name}] ⏳ Connecting to Database...")
|
||||
await db.connect()
|
||||
|
||||
# Initialize SQLAlchemy if enabled (for gradual migration from Prisma)
|
||||
if config.enable_sqlalchemy:
|
||||
from sqlalchemy.exc import DatabaseError, OperationalError
|
||||
from sqlalchemy.exc import TimeoutError as SQLAlchemyTimeoutError
|
||||
|
||||
from backend.data import sqlalchemy as sa
|
||||
|
||||
try:
|
||||
engine = sa.create_engine()
|
||||
sa.initialize(engine)
|
||||
app.state.db_engine = engine
|
||||
logger.info(
|
||||
f"[{self.service_name}] ✓ SQLAlchemy initialized "
|
||||
f"(pool_size={config.sqlalchemy_pool_size}, "
|
||||
f"max_overflow={config.sqlalchemy_max_overflow})"
|
||||
)
|
||||
except OperationalError as e:
|
||||
logger.error(
|
||||
f"[{self.service_name}] Failed to connect to database during SQLAlchemy initialization. "
|
||||
f"Check database connection settings (host, port, credentials). "
|
||||
f"Database URL: {config.database_url.split('@')[-1] if '@' in config.database_url else 'N/A'}. "
|
||||
f"Error: {e}"
|
||||
)
|
||||
raise
|
||||
except SQLAlchemyTimeoutError as e:
|
||||
logger.error(
|
||||
f"[{self.service_name}] Database connection timeout during SQLAlchemy initialization. "
|
||||
f"Timeout setting: {config.sqlalchemy_connect_timeout}s. "
|
||||
f"Check if database is accessible and increase timeout if needed. "
|
||||
f"Error: {e}"
|
||||
)
|
||||
raise
|
||||
except DatabaseError as e:
|
||||
logger.error(
|
||||
f"[{self.service_name}] Database error during SQLAlchemy initialization. "
|
||||
f"Check database permissions and configuration. "
|
||||
f"Error: {e}"
|
||||
)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[{self.service_name}] Unexpected error during SQLAlchemy initialization. "
|
||||
f"Configuration: pool_size={config.sqlalchemy_pool_size}, "
|
||||
f"max_overflow={config.sqlalchemy_max_overflow}, "
|
||||
f"pool_timeout={config.sqlalchemy_pool_timeout}s. "
|
||||
f"Error: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
|
||||
logger.info(f"[{self.service_name}] ✅ Ready")
|
||||
yield
|
||||
|
||||
logger.info(f"[{self.service_name}] ⏳ Disconnecting Database...")
|
||||
|
||||
# Dispose SQLAlchemy if it was enabled
|
||||
if config.enable_sqlalchemy:
|
||||
from sqlalchemy.exc import DatabaseError, OperationalError
|
||||
|
||||
from backend.data import sqlalchemy as sa
|
||||
|
||||
try:
|
||||
await sa.dispose()
|
||||
logger.info(f"[{self.service_name}] ✓ SQLAlchemy disposed")
|
||||
except (OperationalError, DatabaseError) as e:
|
||||
# Log as warning since disposal failures during shutdown are non-critical
|
||||
logger.warning(
|
||||
f"[{self.service_name}] Database error while disposing SQLAlchemy connections. "
|
||||
f"This may leave connections open but won't affect shutdown. "
|
||||
f"Error: {e}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[{self.service_name}] Unexpected error while disposing SQLAlchemy. "
|
||||
f"Connection pool may not be cleanly released. "
|
||||
f"Error: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
await db.disconnect()
|
||||
|
||||
async def health_check(self) -> str:
|
||||
|
||||
@@ -78,6 +78,57 @@ async def lifespan_context(app: fastapi.FastAPI):
|
||||
|
||||
await backend.data.db.connect()
|
||||
|
||||
# Initialize SQLAlchemy if enabled (for gradual migration from Prisma)
|
||||
config = backend.util.settings.Config()
|
||||
if config.enable_sqlalchemy:
|
||||
from sqlalchemy.exc import DatabaseError, OperationalError
|
||||
from sqlalchemy.exc import TimeoutError as SQLAlchemyTimeoutError
|
||||
|
||||
from backend.data import sqlalchemy as sa
|
||||
|
||||
try:
|
||||
engine = sa.create_engine()
|
||||
sa.initialize(engine)
|
||||
app.state.db_engine = engine
|
||||
logger.info(
|
||||
f"✓ AgentServer: SQLAlchemy initialized "
|
||||
f"(pool_size={config.sqlalchemy_pool_size}, "
|
||||
f"max_overflow={config.sqlalchemy_max_overflow})"
|
||||
)
|
||||
except OperationalError as e:
|
||||
logger.error(
|
||||
f"Failed to connect to database during SQLAlchemy initialization. "
|
||||
f"Check database connection settings (host, port, credentials). "
|
||||
f"Database URL: {config.database_url.split('@')[-1] if '@' in config.database_url else 'N/A'}. "
|
||||
f"Error: {e}"
|
||||
)
|
||||
raise
|
||||
except SQLAlchemyTimeoutError as e:
|
||||
logger.error(
|
||||
f"Database connection timeout during SQLAlchemy initialization. "
|
||||
f"Timeout setting: {config.sqlalchemy_connect_timeout}s. "
|
||||
f"Check if database is accessible and increase timeout if needed. "
|
||||
f"Error: {e}"
|
||||
)
|
||||
raise
|
||||
except DatabaseError as e:
|
||||
logger.error(
|
||||
f"Database error during SQLAlchemy initialization. "
|
||||
f"Check database permissions and configuration. "
|
||||
f"Error: {e}"
|
||||
)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Unexpected error during SQLAlchemy initialization. "
|
||||
f"Configuration: pool_size={config.sqlalchemy_pool_size}, "
|
||||
f"max_overflow={config.sqlalchemy_max_overflow}, "
|
||||
f"pool_timeout={config.sqlalchemy_pool_timeout}s. "
|
||||
f"Error: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
|
||||
# Configure thread pool for FastAPI sync operation performance
|
||||
# CRITICAL: FastAPI automatically runs ALL sync functions in this thread pool:
|
||||
# - Any endpoint defined with 'def' (not async def)
|
||||
@@ -118,6 +169,30 @@ async def lifespan_context(app: fastapi.FastAPI):
|
||||
except Exception as e:
|
||||
logger.warning(f"Error shutting down cloud storage handler: {e}")
|
||||
|
||||
# Dispose SQLAlchemy if it was enabled
|
||||
if config.enable_sqlalchemy:
|
||||
from sqlalchemy.exc import DatabaseError, OperationalError
|
||||
|
||||
from backend.data import sqlalchemy as sa
|
||||
|
||||
try:
|
||||
await sa.dispose()
|
||||
logger.info("✓ AgentServer: SQLAlchemy disposed")
|
||||
except (OperationalError, DatabaseError) as e:
|
||||
# Log as warning since disposal failures during shutdown are non-critical
|
||||
logger.warning(
|
||||
f"Database error while disposing SQLAlchemy connections. "
|
||||
f"This may leave connections open but won't affect shutdown. "
|
||||
f"Error: {e}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Unexpected error while disposing SQLAlchemy. "
|
||||
f"Connection pool may not be cleanly released. "
|
||||
f"Error: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
await backend.data.db.disconnect()
|
||||
|
||||
|
||||
|
||||
@@ -65,6 +65,12 @@ class UpdateTrackingModel(BaseModel, Generic[T]):
|
||||
class Config(UpdateTrackingModel["Config"], BaseSettings):
|
||||
"""Config for the server."""
|
||||
|
||||
database_url: str = Field(
|
||||
default="",
|
||||
description="PostgreSQL database connection URL. "
|
||||
"Format: postgresql://user:pass@host:port/db?schema=platform&connect_timeout=60",
|
||||
)
|
||||
|
||||
num_graph_workers: int = Field(
|
||||
default=10,
|
||||
ge=1,
|
||||
@@ -267,6 +273,76 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
|
||||
description="The pool size for the scheduler database connection pool",
|
||||
)
|
||||
|
||||
# SQLAlchemy Configuration
|
||||
enable_sqlalchemy: bool = Field(
|
||||
default=False,
|
||||
description="Enable SQLAlchemy database connections. Set to true to enable gradual migration from Prisma to SQLAlchemy. "
|
||||
"When disabled, only Prisma is used. When enabled, both ORMs coexist during transition.",
|
||||
)
|
||||
|
||||
sqlalchemy_pool_size: int = Field(
|
||||
default=10,
|
||||
ge=1,
|
||||
le=100,
|
||||
description="Number of persistent connections in the SQLAlchemy pool. "
|
||||
"Environment-specific recommendations: "
|
||||
"Development: 2-3 (lightweight, fast startup), "
|
||||
"Test/CI: 2 (minimal resources, avoid connection exhaustion in parallel tests), "
|
||||
"Production: 10-20 for REST API (high traffic), 3-5 for background workers. "
|
||||
"IMPORTANT: Total connections across ALL services (pool_size + max_overflow per service) "
|
||||
"must not exceed PostgreSQL max_connections (default: 100). "
|
||||
"With 6 processes in production (rest-api, executor, database-manager, scheduler, websocket, comms), "
|
||||
"calculate: 6 × (pool_size + max_overflow) ≤ 100.",
|
||||
)
|
||||
|
||||
sqlalchemy_max_overflow: int = Field(
|
||||
default=5,
|
||||
ge=0,
|
||||
le=50,
|
||||
description="Additional connections beyond pool_size when pool is exhausted. "
|
||||
"Total max connections per service = pool_size + max_overflow. "
|
||||
"Environment-specific recommendations: "
|
||||
"Development: 1-2 (handles occasional bursts), "
|
||||
"Test/CI: 1 (minimal extra connections), "
|
||||
"Production: 5-10 (handles traffic spikes without exhausting pool). "
|
||||
"Setting to 0 means strict pool limit (connections fail when pool is exhausted). "
|
||||
"Higher values provide better burst handling but consume more database connections.",
|
||||
)
|
||||
|
||||
sqlalchemy_pool_timeout: int = Field(
|
||||
default=30,
|
||||
ge=1,
|
||||
le=300,
|
||||
description="Seconds to wait for available connection from pool before raising TimeoutError. "
|
||||
"This timeout applies ONLY when all connections (pool_size + max_overflow) are busy. "
|
||||
"Environment-specific recommendations: "
|
||||
"Development: 10-30s (generous for debugging), "
|
||||
"Test/CI: 5-10s (fail fast in tests), "
|
||||
"Production: 30s (balance between user experience and resource holding). "
|
||||
"If you see frequent TimeoutErrors, either increase pool_size/max_overflow or investigate slow queries. "
|
||||
"NOTE: This is different from sqlalchemy_connect_timeout (which applies when establishing new connections).",
|
||||
)
|
||||
|
||||
sqlalchemy_connect_timeout: int = Field(
|
||||
default=10,
|
||||
ge=1,
|
||||
le=60,
|
||||
description="Seconds to wait when establishing NEW connection to PostgreSQL database. "
|
||||
"This timeout applies at the network/TCP level when creating connections (not when acquiring from pool). "
|
||||
"Environment-specific recommendations: "
|
||||
"Development: 5-10s (local database should connect quickly), "
|
||||
"Test/CI: 5-10s (fail fast if database unavailable), "
|
||||
"Production: 10-15s (account for network latency, especially with cloud databases). "
|
||||
"If you see frequent connection timeout errors during startup, check database accessibility "
|
||||
"and network connectivity. "
|
||||
"NOTE: This is different from sqlalchemy_pool_timeout (which applies when waiting for available connections from pool).",
|
||||
)
|
||||
|
||||
sqlalchemy_echo: bool = Field(
|
||||
default=False,
|
||||
description="Whether to log all SQL statements. Useful for debugging but very verbose. Should be False in production.",
|
||||
)
|
||||
|
||||
rabbitmq_host: str = Field(
|
||||
default="localhost",
|
||||
description="The host for the RabbitMQ server",
|
||||
|
||||
69
autogpt_platform/backend/poetry.lock
generated
69
autogpt_platform/backend/poetry.lock
generated
@@ -311,6 +311,73 @@ files = [
|
||||
{file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "asyncpg"
|
||||
version = "0.30.0"
|
||||
description = "An asyncio PostgreSQL driver"
|
||||
optional = false
|
||||
python-versions = ">=3.8.0"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "asyncpg-0.30.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:bfb4dd5ae0699bad2b233672c8fc5ccbd9ad24b89afded02341786887e37927e"},
|
||||
{file = "asyncpg-0.30.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:dc1f62c792752a49f88b7e6f774c26077091b44caceb1983509edc18a2222ec0"},
|
||||
{file = "asyncpg-0.30.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3152fef2e265c9c24eec4ee3d22b4f4d2703d30614b0b6753e9ed4115c8a146f"},
|
||||
{file = "asyncpg-0.30.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c7255812ac85099a0e1ffb81b10dc477b9973345793776b128a23e60148dd1af"},
|
||||
{file = "asyncpg-0.30.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:578445f09f45d1ad7abddbff2a3c7f7c291738fdae0abffbeb737d3fc3ab8b75"},
|
||||
{file = "asyncpg-0.30.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:c42f6bb65a277ce4d93f3fba46b91a265631c8df7250592dd4f11f8b0152150f"},
|
||||
{file = "asyncpg-0.30.0-cp310-cp310-win32.whl", hash = "sha256:aa403147d3e07a267ada2ae34dfc9324e67ccc4cdca35261c8c22792ba2b10cf"},
|
||||
{file = "asyncpg-0.30.0-cp310-cp310-win_amd64.whl", hash = "sha256:fb622c94db4e13137c4c7f98834185049cc50ee01d8f657ef898b6407c7b9c50"},
|
||||
{file = "asyncpg-0.30.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:5e0511ad3dec5f6b4f7a9e063591d407eee66b88c14e2ea636f187da1dcfff6a"},
|
||||
{file = "asyncpg-0.30.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:915aeb9f79316b43c3207363af12d0e6fd10776641a7de8a01212afd95bdf0ed"},
|
||||
{file = "asyncpg-0.30.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1c198a00cce9506fcd0bf219a799f38ac7a237745e1d27f0e1f66d3707c84a5a"},
|
||||
{file = "asyncpg-0.30.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3326e6d7381799e9735ca2ec9fd7be4d5fef5dcbc3cb555d8a463d8460607956"},
|
||||
{file = "asyncpg-0.30.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:51da377487e249e35bd0859661f6ee2b81db11ad1f4fc036194bc9cb2ead5056"},
|
||||
{file = "asyncpg-0.30.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:bc6d84136f9c4d24d358f3b02be4b6ba358abd09f80737d1ac7c444f36108454"},
|
||||
{file = "asyncpg-0.30.0-cp311-cp311-win32.whl", hash = "sha256:574156480df14f64c2d76450a3f3aaaf26105869cad3865041156b38459e935d"},
|
||||
{file = "asyncpg-0.30.0-cp311-cp311-win_amd64.whl", hash = "sha256:3356637f0bd830407b5597317b3cb3571387ae52ddc3bca6233682be88bbbc1f"},
|
||||
{file = "asyncpg-0.30.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:c902a60b52e506d38d7e80e0dd5399f657220f24635fee368117b8b5fce1142e"},
|
||||
{file = "asyncpg-0.30.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:aca1548e43bbb9f0f627a04666fedaca23db0a31a84136ad1f868cb15deb6e3a"},
|
||||
{file = "asyncpg-0.30.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6c2a2ef565400234a633da0eafdce27e843836256d40705d83ab7ec42074efb3"},
|
||||
{file = "asyncpg-0.30.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1292b84ee06ac8a2ad8e51c7475aa309245874b61333d97411aab835c4a2f737"},
|
||||
{file = "asyncpg-0.30.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:0f5712350388d0cd0615caec629ad53c81e506b1abaaf8d14c93f54b35e3595a"},
|
||||
{file = "asyncpg-0.30.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:db9891e2d76e6f425746c5d2da01921e9a16b5a71a1c905b13f30e12a257c4af"},
|
||||
{file = "asyncpg-0.30.0-cp312-cp312-win32.whl", hash = "sha256:68d71a1be3d83d0570049cd1654a9bdfe506e794ecc98ad0873304a9f35e411e"},
|
||||
{file = "asyncpg-0.30.0-cp312-cp312-win_amd64.whl", hash = "sha256:9a0292c6af5c500523949155ec17b7fe01a00ace33b68a476d6b5059f9630305"},
|
||||
{file = "asyncpg-0.30.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:05b185ebb8083c8568ea8a40e896d5f7af4b8554b64d7719c0eaa1eb5a5c3a70"},
|
||||
{file = "asyncpg-0.30.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:c47806b1a8cbb0a0db896f4cd34d89942effe353a5035c62734ab13b9f938da3"},
|
||||
{file = "asyncpg-0.30.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9b6fde867a74e8c76c71e2f64f80c64c0f3163e687f1763cfaf21633ec24ec33"},
|
||||
{file = "asyncpg-0.30.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:46973045b567972128a27d40001124fbc821c87a6cade040cfcd4fa8a30bcdc4"},
|
||||
{file = "asyncpg-0.30.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:9110df111cabc2ed81aad2f35394a00cadf4f2e0635603db6ebbd0fc896f46a4"},
|
||||
{file = "asyncpg-0.30.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:04ff0785ae7eed6cc138e73fc67b8e51d54ee7a3ce9b63666ce55a0bf095f7ba"},
|
||||
{file = "asyncpg-0.30.0-cp313-cp313-win32.whl", hash = "sha256:ae374585f51c2b444510cdf3595b97ece4f233fde739aa14b50e0d64e8a7a590"},
|
||||
{file = "asyncpg-0.30.0-cp313-cp313-win_amd64.whl", hash = "sha256:f59b430b8e27557c3fb9869222559f7417ced18688375825f8f12302c34e915e"},
|
||||
{file = "asyncpg-0.30.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:29ff1fc8b5bf724273782ff8b4f57b0f8220a1b2324184846b39d1ab4122031d"},
|
||||
{file = "asyncpg-0.30.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:64e899bce0600871b55368b8483e5e3e7f1860c9482e7f12e0a771e747988168"},
|
||||
{file = "asyncpg-0.30.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5b290f4726a887f75dcd1b3006f484252db37602313f806e9ffc4e5996cfe5cb"},
|
||||
{file = "asyncpg-0.30.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f86b0e2cd3f1249d6fe6fd6cfe0cd4538ba994e2d8249c0491925629b9104d0f"},
|
||||
{file = "asyncpg-0.30.0-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:393af4e3214c8fa4c7b86da6364384c0d1b3298d45803375572f415b6f673f38"},
|
||||
{file = "asyncpg-0.30.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:fd4406d09208d5b4a14db9a9dbb311b6d7aeeab57bded7ed2f8ea41aeef39b34"},
|
||||
{file = "asyncpg-0.30.0-cp38-cp38-win32.whl", hash = "sha256:0b448f0150e1c3b96cb0438a0d0aa4871f1472e58de14a3ec320dbb2798fb0d4"},
|
||||
{file = "asyncpg-0.30.0-cp38-cp38-win_amd64.whl", hash = "sha256:f23b836dd90bea21104f69547923a02b167d999ce053f3d502081acea2fba15b"},
|
||||
{file = "asyncpg-0.30.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:6f4e83f067b35ab5e6371f8a4c93296e0439857b4569850b178a01385e82e9ad"},
|
||||
{file = "asyncpg-0.30.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:5df69d55add4efcd25ea2a3b02025b669a285b767bfbf06e356d68dbce4234ff"},
|
||||
{file = "asyncpg-0.30.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a3479a0d9a852c7c84e822c073622baca862d1217b10a02dd57ee4a7a081f708"},
|
||||
{file = "asyncpg-0.30.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:26683d3b9a62836fad771a18ecf4659a30f348a561279d6227dab96182f46144"},
|
||||
{file = "asyncpg-0.30.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:1b982daf2441a0ed314bd10817f1606f1c28b1136abd9e4f11335358c2c631cb"},
|
||||
{file = "asyncpg-0.30.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:1c06a3a50d014b303e5f6fc1e5f95eb28d2cee89cf58384b700da621e5d5e547"},
|
||||
{file = "asyncpg-0.30.0-cp39-cp39-win32.whl", hash = "sha256:1b11a555a198b08f5c4baa8f8231c74a366d190755aa4f99aacec5970afe929a"},
|
||||
{file = "asyncpg-0.30.0-cp39-cp39-win_amd64.whl", hash = "sha256:8b684a3c858a83cd876f05958823b68e8d14ec01bb0c0d14a6704c5bf9711773"},
|
||||
{file = "asyncpg-0.30.0.tar.gz", hash = "sha256:c551e9928ab6707602f44811817f82ba3c446e018bfe1d3abecc8ba5f3eac851"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
async-timeout = {version = ">=4.0.3", markers = "python_version < \"3.11.0\""}
|
||||
|
||||
[package.extras]
|
||||
docs = ["Sphinx (>=8.1.3,<8.2.0)", "sphinx-rtd-theme (>=1.2.2)"]
|
||||
gssauth = ["gssapi ; platform_system != \"Windows\"", "sspilib ; platform_system == \"Windows\""]
|
||||
test = ["distro (>=1.9.0,<1.10.0)", "flake8 (>=6.1,<7.0)", "flake8-pyi (>=24.1.0,<24.2.0)", "gssapi ; platform_system == \"Linux\"", "k5test ; platform_system == \"Linux\"", "mypy (>=1.8.0,<1.9.0)", "sspilib ; platform_system == \"Windows\"", "uvloop (>=0.15.3) ; platform_system != \"Windows\" and python_version < \"3.14.0\""]
|
||||
|
||||
[[package]]
|
||||
name = "attrs"
|
||||
version = "25.3.0"
|
||||
@@ -7279,4 +7346,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.10,<3.14"
|
||||
content-hash = "34c27fd178dc9a2837e2865c475b7b0332cb88b73adefe3a5dbbaafa1fb4a3a1"
|
||||
content-hash = "1f4a80587f7b0a10a945f87906120427ff75c109d66fc64980239f7777db11b0"
|
||||
|
||||
@@ -14,6 +14,7 @@ aiohttp = "^3.10.0"
|
||||
aiodns = "^3.5.0"
|
||||
anthropic = "^0.59.0"
|
||||
apscheduler = "^3.11.1"
|
||||
asyncpg = "^0.30.0"
|
||||
autogpt-libs = { path = "../autogpt_libs", develop = true }
|
||||
bleach = { extras = ["css"], version = "^6.2.0" }
|
||||
click = "^8.2.0"
|
||||
|
||||
Reference in New Issue
Block a user