mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
feat(backend): Reduce excessive application log, Make DB configurable, Set DB connection limit & timeout (#9605)
It's hard to configure the DB parameter at the moment, this PR gives an option to provide URL parameter on the Prisma DATABASE_URL. ### Changes 🏗️ Changes: * Reduce excessive application log * Make DB configurable * Set DB connection limit & timeout ### Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan:
This commit is contained in:
@@ -2,9 +2,18 @@ DB_USER=postgres
|
||||
DB_PASS=your-super-secret-and-long-postgres-password
|
||||
DB_NAME=postgres
|
||||
DB_PORT=5432
|
||||
DATABASE_URL="postgresql://${DB_USER}:${DB_PASS}@localhost:${DB_PORT}/${DB_NAME}?connect_timeout=60&schema=platform"
|
||||
DB_HOST=localhost
|
||||
DB_CONNECTION_LIMIT=12
|
||||
DB_CONNECT_TIMEOUT=60
|
||||
DB_POOL_TIMEOUT=300
|
||||
DB_SCHEMA=platform
|
||||
DATABASE_URL="postgresql://${DB_USER}:${DB_PASS}@${DB_HOST}:${DB_PORT}/${DB_NAME}?schema=${DB_SCHEMA}&connect_timeout=${DB_CONNECT_TIMEOUT}"
|
||||
PRISMA_SCHEMA="postgres/schema.prisma"
|
||||
|
||||
# EXECUTOR
|
||||
NUM_GRAPH_WORKERS=10
|
||||
NUM_NODE_WORKERS=3
|
||||
|
||||
BACKEND_CORS_ALLOW_ORIGINS=["http://localhost:3000"]
|
||||
|
||||
# generate using `from cryptography.fernet import Fernet;Fernet.generate_key().decode()`
|
||||
|
||||
@@ -2,6 +2,7 @@ import logging
|
||||
import os
|
||||
import zlib
|
||||
from contextlib import asynccontextmanager
|
||||
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
|
||||
from uuid import uuid4
|
||||
|
||||
from dotenv import load_dotenv
|
||||
@@ -15,7 +16,38 @@ load_dotenv()
|
||||
PRISMA_SCHEMA = os.getenv("PRISMA_SCHEMA", "schema.prisma")
|
||||
os.environ["PRISMA_SCHEMA_PATH"] = PRISMA_SCHEMA
|
||||
|
||||
prisma = Prisma(auto_register=True)
|
||||
|
||||
def add_param(url: str, key: str, value: str) -> str:
|
||||
p = urlparse(url)
|
||||
qs = dict(parse_qsl(p.query))
|
||||
qs[key] = value
|
||||
return urlunparse(p._replace(query=urlencode(qs)))
|
||||
|
||||
|
||||
DATABASE_URL = os.getenv("DATABASE_URL")
|
||||
if not DATABASE_URL:
|
||||
raise ValueError("DATABASE_URL is not set.")
|
||||
|
||||
CONN_LIMIT = os.getenv("DB_CONNECTION_LIMIT")
|
||||
if CONN_LIMIT:
|
||||
DATABASE_URL = add_param(DATABASE_URL, "connection_limit", CONN_LIMIT)
|
||||
|
||||
CONN_TIMEOUT = os.getenv("DB_CONNECT_TIMEOUT")
|
||||
if CONN_TIMEOUT:
|
||||
DATABASE_URL = add_param(DATABASE_URL, "connect_timeout", CONN_TIMEOUT)
|
||||
|
||||
POOL_TIMEOUT = os.getenv("DB_POOL_TIMEOUT")
|
||||
if POOL_TIMEOUT:
|
||||
DATABASE_URL = add_param(DATABASE_URL, "pool_timeout", POOL_TIMEOUT)
|
||||
|
||||
HTTP_TIMEOUT = int(POOL_TIMEOUT) if POOL_TIMEOUT else None
|
||||
|
||||
prisma = Prisma(
|
||||
auto_register=True,
|
||||
http={"timeout": HTTP_TIMEOUT},
|
||||
datasource={"url": DATABASE_URL},
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ class BaseRedisEventBus(Generic[M], ABC):
|
||||
def _serialize_message(self, item: M, channel_key: str) -> tuple[str, str]:
|
||||
message = json.dumps(item.model_dump(), cls=DateTimeEncoder)
|
||||
channel_name = f"{self.event_bus_name}/{channel_key}"
|
||||
logger.info(f"[{channel_name}] Publishing an event to Redis {message}")
|
||||
logger.debug(f"[{channel_name}] Publishing an event to Redis {message}")
|
||||
return message, channel_name
|
||||
|
||||
def _deserialize_message(self, msg: Any, channel_key: str) -> M | None:
|
||||
@@ -44,7 +44,7 @@ class BaseRedisEventBus(Generic[M], ABC):
|
||||
return None
|
||||
try:
|
||||
data = json.loads(msg["data"])
|
||||
logger.info(f"Consuming an event from Redis {data}")
|
||||
logger.debug(f"Consuming an event from Redis {data}")
|
||||
return self.Model(**data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse event result from Redis {msg} {e}")
|
||||
|
||||
@@ -109,7 +109,10 @@ class LogMetadata:
|
||||
logger.exception(msg, extra={"json_fields": {**self.metadata, **extra}})
|
||||
|
||||
def _wrap(self, msg: str, **extra):
|
||||
return f"{self.prefix} {msg} {extra}"
|
||||
extra_msg = str(extra or "")
|
||||
if len(extra_msg) > 1000:
|
||||
extra_msg = extra_msg[:1000] + "..."
|
||||
return f"{self.prefix} {msg} {extra_msg}"
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
@@ -56,6 +56,7 @@ config = Config()
|
||||
api_host = config.pyro_host
|
||||
api_comm_retry = config.pyro_client_comm_retry
|
||||
api_comm_timeout = config.pyro_client_comm_timeout
|
||||
api_call_timeout = config.rpc_client_call_timeout
|
||||
pyro_config.MAX_RETRIES = api_comm_retry # type: ignore
|
||||
pyro_config.COMMTIMEOUT = api_comm_timeout # type: ignore
|
||||
|
||||
@@ -264,7 +265,11 @@ class FastApiAppService(BaseAppService, ABC):
|
||||
def _handle_internal_http_error(status_code: int = 500, log_error: bool = True):
|
||||
def handler(request: Request, exc: Exception):
|
||||
if log_error:
|
||||
logger.exception(f"{request.method} {request.url.path} failed: {exc}")
|
||||
if status_code == 500:
|
||||
log = logger.exception
|
||||
else:
|
||||
log = logger.error
|
||||
log(f"{request.method} {request.url.path} failed: {exc}")
|
||||
return responses.JSONResponse(
|
||||
status_code=status_code,
|
||||
content=RemoteCallError(
|
||||
@@ -429,7 +434,10 @@ def fastapi_close_service_client(client: Any) -> None:
|
||||
|
||||
|
||||
@conn_retry("FastAPI client", "Creating service client", max_retry=api_comm_retry)
|
||||
def fastapi_get_service_client(service_type: Type[AS]) -> AS:
|
||||
def fastapi_get_service_client(
|
||||
service_type: Type[AS],
|
||||
call_timeout: int | None = api_call_timeout,
|
||||
) -> AS:
|
||||
class DynamicClient:
|
||||
def __init__(self):
|
||||
host = service_type.get_host()
|
||||
@@ -437,7 +445,7 @@ def fastapi_get_service_client(service_type: Type[AS]) -> AS:
|
||||
self.base_url = f"http://{host}:{port}".rstrip("/")
|
||||
self.client = httpx.Client(
|
||||
base_url=self.base_url,
|
||||
timeout=api_comm_timeout,
|
||||
timeout=call_timeout,
|
||||
)
|
||||
|
||||
def _call_method(self, method_name: str, **kwargs) -> Any:
|
||||
|
||||
@@ -81,6 +81,10 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
|
||||
default=3,
|
||||
description="The default number of retries for Pyro client connections.",
|
||||
)
|
||||
rpc_client_call_timeout: int = Field(
|
||||
default=300,
|
||||
description="The default timeout in seconds, for RPC client calls.",
|
||||
)
|
||||
enable_auth: bool = Field(
|
||||
default=True,
|
||||
description="If authentication is enabled or not",
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
// THIS FILE IS AUTO-GENERATED, RUN `poetry run schema` TO UPDATE
|
||||
datasource db {
|
||||
provider = "postgresql"
|
||||
url = env("DATABASE_URL")
|
||||
|
||||
Reference in New Issue
Block a user