diff --git a/autogpt_platform/backend/backend/monitoring/instrumentation.py b/autogpt_platform/backend/backend/monitoring/instrumentation.py
new file mode 100644
index 0000000000..898324deaa
--- /dev/null
+++ b/autogpt_platform/backend/backend/monitoring/instrumentation.py
@@ -0,0 +1,287 @@
+"""
+Prometheus instrumentation for FastAPI services.
+
+This module provides centralized metrics collection and instrumentation
+for all FastAPI services in the AutoGPT platform.
+"""
+
+import logging
+from typing import Optional
+
+from fastapi import FastAPI
+from prometheus_client import Counter, Gauge, Histogram, Info
+from prometheus_fastapi_instrumentator import Instrumentator, metrics
+
+logger = logging.getLogger(__name__)
+
+# Custom business metrics with controlled cardinality
+GRAPH_EXECUTIONS = Counter(
+    "autogpt_graph_executions_total",
+    "Total number of graph executions",
+    labelnames=[
+        "status"
+    ],  # Removed graph_id and user_id to prevent cardinality explosion
+)
+
+GRAPH_EXECUTIONS_BY_USER = Counter(
+    "autogpt_graph_executions_by_user_total",
+    "Total number of graph executions by user (sampled)",
+    labelnames=["status"],  # Only status, user_id tracked separately when needed
+)
+
+BLOCK_EXECUTIONS = Counter(
+    "autogpt_block_executions_total",
+    "Total number of block executions",
+    labelnames=["block_type", "status"],  # block_type is bounded
+)
+
+BLOCK_DURATION = Histogram(
+    "autogpt_block_duration_seconds",
+    "Duration of block executions in seconds",
+    labelnames=["block_type"],
+    buckets=[0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60],
+)
+
+WEBSOCKET_CONNECTIONS = Gauge(
+    "autogpt_websocket_connections_total",
+    "Total number of active WebSocket connections",
+    # Removed user_id label - track total only to prevent cardinality explosion
+)
+
+SCHEDULER_JOBS = Gauge(
+    "autogpt_scheduler_jobs",
+    "Current number of scheduled jobs",
+    labelnames=["job_type", "status"],
+)
+
+DATABASE_QUERIES = Histogram(
+    "autogpt_database_query_duration_seconds",
+    "Duration of database queries in seconds",
+    labelnames=["operation", "table"],
+    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5],
+)
+
+RABBITMQ_MESSAGES = Counter(
+    "autogpt_rabbitmq_messages_total",
+    "Total number of RabbitMQ messages",
+    labelnames=["queue", "status"],
+)
+
+AUTHENTICATION_ATTEMPTS = Counter(
+    "autogpt_auth_attempts_total",
+    "Total number of authentication attempts",
+    labelnames=["method", "status"],
+)
+
+API_KEY_USAGE = Counter(
+    "autogpt_api_key_usage_total",
+    "API key usage by provider",
+    labelnames=["provider", "block_type", "status"],
+)
+
+# Function/operation level metrics with controlled cardinality
+GRAPH_OPERATIONS = Counter(
+    "autogpt_graph_operations_total",
+    "Graph operations by type",
+    labelnames=["operation", "status"],  # create, update, delete, execute, etc.
+)
+
+USER_OPERATIONS = Counter(
+    "autogpt_user_operations_total",
+    "User operations by type",
+    labelnames=["operation", "status"],  # login, register, update_profile, etc.
+)
+
+RATE_LIMIT_HITS = Counter(
+    "autogpt_rate_limit_hits_total",
+    "Number of rate limit hits",
+    labelnames=["endpoint"],  # Removed user_id to prevent cardinality explosion
+)
+
+SERVICE_INFO = Info(
+    "autogpt_service",
+    "Service information",
+)
+
+
+def instrument_fastapi(
+    app: FastAPI,
+    service_name: str,
+    expose_endpoint: bool = True,
+    endpoint: str = "/metrics",
+    include_in_schema: bool = False,
+    excluded_handlers: Optional[list] = None,
+) -> Instrumentator:
+    """
+    Instrument a FastAPI application with Prometheus metrics.
+
+    Args:
+        app: FastAPI application instance
+        service_name: Name of the service for metrics labeling
+        expose_endpoint: Whether to expose /metrics endpoint
+        endpoint: Path for metrics endpoint
+        include_in_schema: Whether to include metrics endpoint in OpenAPI schema
+        excluded_handlers: List of paths to exclude from metrics
+
+    Returns:
+        Configured Instrumentator instance
+    """
+
+    # Set service info
+    try:
+        from importlib.metadata import version
+
+        service_version = version("autogpt-platform-backend")
+    except Exception:
+        service_version = "unknown"
+
+    SERVICE_INFO.info(
+        {
+            "service": service_name,
+            "version": service_version,
+        }
+    )
+
+    # Create instrumentator with default metrics
+    instrumentator = Instrumentator(
+        should_group_status_codes=True,
+        should_ignore_untemplated=True,
+        should_respect_env_var=True,
+        should_instrument_requests_inprogress=True,
+        excluded_handlers=excluded_handlers or ["/health", "/readiness"],
+        env_var_name="ENABLE_METRICS",
+        inprogress_name="autogpt_http_requests_inprogress",
+        inprogress_labels=True,
+    )
+
+    # Add default HTTP metrics
+    instrumentator.add(
+        metrics.default(
+            metric_namespace="autogpt",
+            metric_subsystem=service_name.replace("-", "_"),
+        )
+    )
+
+    # Add request size metrics
+    instrumentator.add(
+        metrics.request_size(
+            metric_namespace="autogpt",
+            metric_subsystem=service_name.replace("-", "_"),
+        )
+    )
+
+    # Add response size metrics
+    instrumentator.add(
+        metrics.response_size(
+            metric_namespace="autogpt",
+            metric_subsystem=service_name.replace("-", "_"),
+        )
+    )
+
+    # Add latency metrics with custom buckets for better granularity
+    instrumentator.add(
+        metrics.latency(
+            metric_namespace="autogpt",
+            metric_subsystem=service_name.replace("-", "_"),
+            buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60],
+        )
+    )
+
+    # Add combined size metrics (combined request and response payload size)
+    instrumentator.add(
+        metrics.combined_size(
+            metric_namespace="autogpt",
+            metric_subsystem=service_name.replace("-", "_"),
+        )
+    )
+
+    # Instrument the app
+    instrumentator.instrument(app)
+
+    # Expose metrics endpoint if requested
+    if expose_endpoint:
+        instrumentator.expose(
+            app,
+            endpoint=endpoint,
+            include_in_schema=include_in_schema,
+            tags=["monitoring"] if include_in_schema else None,
+        )
+        logger.info(f"Metrics endpoint exposed at {endpoint} for {service_name}")
+
+    return instrumentator
+
+
+def record_graph_execution(graph_id: str, status: str, user_id: str):
+    """Record a graph execution event.
+
+    Args:
+        graph_id: Graph identifier (kept for future sampling/debugging)
+        status: Execution status (success/error/validation_error)
+        user_id: User identifier (kept for future sampling/debugging)
+    """
+    # Track overall executions without high-cardinality labels
+    GRAPH_EXECUTIONS.labels(status=status).inc()
+
+    # Optionally track per-user executions (implement sampling if needed)
+    # For now, just track status to avoid cardinality explosion
+    GRAPH_EXECUTIONS_BY_USER.labels(status=status).inc()
+
+
+def record_block_execution(block_type: str, status: str, duration: float):
+    """Record a block execution event with duration."""
+    BLOCK_EXECUTIONS.labels(block_type=block_type, status=status).inc()
+    BLOCK_DURATION.labels(block_type=block_type).observe(duration)
+
+
+def update_websocket_connections(user_id: str, delta: int):
+    """Update the number of active WebSocket connections.
+
+    Args:
+        user_id: User identifier (kept for future sampling/debugging)
+        delta: Change in connection count (+1 for connect, -1 for disconnect)
+    """
+    # Track total connections without user_id to prevent cardinality explosion
+    if delta > 0:
+        WEBSOCKET_CONNECTIONS.inc(delta)
+    else:
+        WEBSOCKET_CONNECTIONS.dec(abs(delta))
+
+
+def record_database_query(operation: str, table: str, duration: float):
+    """Record a database query with duration."""
+    DATABASE_QUERIES.labels(operation=operation, table=table).observe(duration)
+
+
+def record_rabbitmq_message(queue: str, status: str):
+    """Record a RabbitMQ message event."""
+    RABBITMQ_MESSAGES.labels(queue=queue, status=status).inc()
+
+
+def record_authentication_attempt(method: str, status: str):
+    """Record an authentication attempt."""
+    AUTHENTICATION_ATTEMPTS.labels(method=method, status=status).inc()
+
+
+def record_api_key_usage(provider: str, block_type: str, status: str):
+    """Record API key usage by provider and block."""
+    API_KEY_USAGE.labels(provider=provider, block_type=block_type, status=status).inc()
+
+
+def record_rate_limit_hit(endpoint: str, user_id: str):
+    """Record a rate limit hit.
+
+    Args:
+        endpoint: API endpoint that was rate limited
+        user_id: User identifier (kept for future sampling/debugging)
+    """
+    RATE_LIMIT_HITS.labels(endpoint=endpoint).inc()
+
+
+def record_graph_operation(operation: str, status: str):
+    """Record a graph operation (create, update, delete, execute, etc.)."""
+    GRAPH_OPERATIONS.labels(operation=operation, status=status).inc()
+
+
+def record_user_operation(operation: str, status: str):
+    """Record a user operation (login, register, etc.)."""
+    USER_OPERATIONS.labels(operation=operation, status=status).inc()
diff --git a/autogpt_platform/backend/backend/server/external/api.py b/autogpt_platform/backend/backend/server/external/api.py
index 8ed15c1385..ee2cec2fa1 100644
--- a/autogpt_platform/backend/backend/server/external/api.py
+++ b/autogpt_platform/backend/backend/server/external/api.py
@@ -1,5 +1,6 @@
 from fastapi import FastAPI
 
+from backend.monitoring.instrumentation import instrument_fastapi
 from backend.server.middleware.security import SecurityHeadersMiddleware
 
 from .routes.v1 import v1_router
@@ -13,3 +14,12 @@ external_app = FastAPI(
 
 external_app.add_middleware(SecurityHeadersMiddleware)
 external_app.include_router(v1_router, prefix="/v1")
+
+# Add Prometheus instrumentation
+instrument_fastapi(
+    external_app,
+    service_name="external-api",
+    expose_endpoint=True,
+    endpoint="/metrics",
+    include_in_schema=True,
+)
diff --git a/autogpt_platform/backend/backend/server/rest_api.py b/autogpt_platform/backend/backend/server/rest_api.py
index db2796c50f..1c493f7574 100644
--- a/autogpt_platform/backend/backend/server/rest_api.py
+++ b/autogpt_platform/backend/backend/server/rest_api.py
@@ -36,6 +36,7 @@ import backend.util.settings
 from backend.blocks.llm import LlmModel
 from backend.data.model import Credentials
 from backend.integrations.providers import ProviderName
+from backend.monitoring.instrumentation import instrument_fastapi
 from backend.server.external.api import external_app
 from backend.server.middleware.security import SecurityHeadersMiddleware
 from backend.util import json
@@ -139,6 +140,16 @@ app.add_middleware(SecurityHeadersMiddleware)
 # Add 401 responses to authenticated endpoints in OpenAPI spec
 add_auth_responses_to_openapi(app)
 
+# Add Prometheus instrumentation
+instrument_fastapi(
+    app,
+    service_name="rest-api",
+    expose_endpoint=True,
+    endpoint="/metrics",
+    include_in_schema=settings.config.app_env
+    == backend.util.settings.AppEnvironment.LOCAL,
+)
+
 
 def handle_internal_http_error(status_code: int = 500, log_error: bool = True):
     def handler(request: fastapi.Request, exc: Exception):
diff --git a/autogpt_platform/backend/backend/server/routers/v1.py b/autogpt_platform/backend/backend/server/routers/v1.py
index 633c5ea236..d512b8601c 100644
--- a/autogpt_platform/backend/backend/server/routers/v1.py
+++ b/autogpt_platform/backend/backend/server/routers/v1.py
@@ -1,6 +1,7 @@
 import asyncio
 import base64
 import logging
+import time
 from collections import defaultdict
 from datetime import datetime
 from typing import Annotated, Any, Sequence
@@ -63,6 +64,11 @@ from backend.integrations.webhooks.graph_lifecycle_hooks import (
     on_graph_activate,
     on_graph_deactivate,
 )
+from backend.monitoring.instrumentation import (
+    record_block_execution,
+    record_graph_execution,
+    record_graph_operation,
+)
 from backend.server.model import (
     CreateAPIKeyRequest,
     CreateAPIKeyResponse,
@@ -96,6 +102,7 @@ def _create_file_size_error(size_bytes: int, max_size_mb: int) -> HTTPException:
 settings = Settings()
 logger = logging.getLogger(__name__)
+
 
 _user_credit_model = get_user_credit_model()
 
 # Define the API routes
@@ -279,10 +286,26 @@ async def execute_graph_block(block_id: str, data: BlockInput) -> CompletedBlock
     if not obj:
         raise HTTPException(status_code=404, detail=f"Block #{block_id} not found.")
 
-    output = defaultdict(list)
-    async for name, data in obj.execute(data):
-        output[name].append(data)
-    return output
+    start_time = time.time()
+    try:
+        output = defaultdict(list)
+        async for name, data in obj.execute(data):
+            output[name].append(data)
+
+        # Record successful block execution with duration
+        duration = time.time() - start_time
+        block_type = obj.__class__.__name__
+        record_block_execution(
+            block_type=block_type, status="success", duration=duration
+        )
+
+        return output
+    except Exception:
+        # Record failed block execution
+        duration = time.time() - start_time
+        block_type = obj.__class__.__name__
+        record_block_execution(block_type=block_type, status="error", duration=duration)
+        raise
 
 
 @v1_router.post(
@@ -778,7 +801,7 @@ async def execute_graph(
     )
 
     try:
-        return await execution_utils.add_graph_execution(
+        result = await execution_utils.add_graph_execution(
             graph_id=graph_id,
             user_id=user_id,
             inputs=inputs,
@@ -786,7 +809,16 @@
             graph_version=graph_version,
             graph_credentials_inputs=credentials_inputs,
         )
+        # Record successful graph execution
+        record_graph_execution(graph_id=graph_id, status="success", user_id=user_id)
+        record_graph_operation(operation="execute", status="success")
+        return result
     except GraphValidationError as e:
+        # Record failed graph execution
+        record_graph_execution(
+            graph_id=graph_id, status="validation_error", user_id=user_id
+        )
+        record_graph_operation(operation="execute", status="validation_error")
         # Return structured validation errors that the frontend can parse
         raise HTTPException(
             status_code=400,
@@ -797,6 +829,11 @@
                 "node_errors": e.node_errors,
             },
         )
+    except Exception:
+        # Record any other failures
+        record_graph_execution(graph_id=graph_id, status="error", user_id=user_id)
+        record_graph_operation(operation="execute", status="error")
+        raise
 
 
 @v1_router.post(
diff --git a/autogpt_platform/backend/backend/server/ws_api.py b/autogpt_platform/backend/backend/server/ws_api.py
index ac45524684..dc8a64d79f 100644
--- a/autogpt_platform/backend/backend/server/ws_api.py
+++ b/autogpt_platform/backend/backend/server/ws_api.py
@@ -11,6 +11,10 @@ from starlette.middleware.cors import CORSMiddleware
 
 from backend.data.execution import AsyncRedisExecutionEventBus
 from backend.data.user import DEFAULT_USER_ID
+from backend.monitoring.instrumentation import (
+    instrument_fastapi,
+    update_websocket_connections,
+)
 from backend.server.conn_manager import ConnectionManager
 from backend.server.model import (
     WSMessage,
@@ -38,6 +42,15 @@ docs_url = "/docs" if settings.config.app_env == AppEnvironment.LOCAL else None
 app = FastAPI(lifespan=lifespan, docs_url=docs_url)
 _connection_manager = None
 
+# Add Prometheus instrumentation
+instrument_fastapi(
+    app,
+    service_name="websocket-server",
+    expose_endpoint=True,
+    endpoint="/metrics",
+    include_in_schema=settings.config.app_env == AppEnvironment.LOCAL,
+)
+
 
 def get_connection_manager():
     global _connection_manager
@@ -216,6 +229,10 @@ async def websocket_router(
     if not user_id:
         return
     await manager.connect_socket(websocket)
+
+    # Track WebSocket connection
+    update_websocket_connections(user_id, 1)
+
     try:
         while True:
             data = await websocket.receive_text()
@@ -286,6 +303,8 @@
     except WebSocketDisconnect:
         manager.disconnect_socket(websocket)
         logger.debug("WebSocket client disconnected")
+    finally:
+        update_websocket_connections(user_id, -1)
 
 
 @app.get("/")
diff --git a/autogpt_platform/backend/poetry.lock b/autogpt_platform/backend/poetry.lock
index cfb05fae17..059eb1d14f 100644
--- a/autogpt_platform/backend/poetry.lock
+++ b/autogpt_platform/backend/poetry.lock
@@ -4145,6 +4145,22 @@ files = [
 [package.extras]
 twisted = ["twisted"]
 
+[[package]]
+name = "prometheus-fastapi-instrumentator"
+version = "7.1.0"
+description = "Instrument your FastAPI app with Prometheus metrics"
+optional = false
+python-versions = ">=3.8"
+groups = ["main"]
+files = [
+    {file = "prometheus_fastapi_instrumentator-7.1.0-py3-none-any.whl", hash = "sha256:978130f3c0bb7b8ebcc90d35516a6fe13e02d2eb358c8f83887cdef7020c31e9"},
+    {file = "prometheus_fastapi_instrumentator-7.1.0.tar.gz", hash = "sha256:be7cd61eeea4e5912aeccb4261c6631b3f227d8924542d79eaf5af3f439cbe5e"},
+]
+
+[package.dependencies]
+prometheus-client = ">=0.8.0,<1.0.0"
+starlette = ">=0.30.0,<1.0.0"
+
 [[package]]
 name = "propcache"
 version = "0.3.2"
@@ -7143,4 +7159,4 @@ cffi = ["cffi (>=1.11)"]
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.10,<3.14"
-content-hash = "80d4dc2cbcd1ae33b2fa3920db5dcb1f82ad252d1e4a8bfeba8b2f2eebbdda0d"
+content-hash = "2c7e9370f500039b99868376021627c5a120e0ee31c5c5e6de39db2c3d82f414"
diff --git a/autogpt_platform/backend/pyproject.toml b/autogpt_platform/backend/pyproject.toml
index dc568000f8..13d6591e25 100644
--- a/autogpt_platform/backend/pyproject.toml
+++ b/autogpt_platform/backend/pyproject.toml
@@ -45,6 +45,7 @@ postmarker = "^1.0"
 praw = "~7.8.1"
 prisma = "^0.15.0"
 prometheus-client = "^0.22.1"
+prometheus-fastapi-instrumentator = "^7.0.0"
 psutil = "^7.0.0"
 psycopg2-binary = "^2.9.10"
 pydantic = { extras = ["email"], version = "^2.11.7" }