Compare commits

...

1 Commit

Author SHA1 Message Date
seer-by-sentry[bot]
fb0d7fa31e feat(backend): Add admin diagnostics routes for execution monitoring 2025-11-03 20:32:20 +00:00
3 changed files with 250 additions and 0 deletions

View File

@@ -0,0 +1,147 @@
"""
Diagnostics module for monitoring and troubleshooting execution status.
"""
import logging
from datetime import datetime
from typing import Optional
from prisma.models import AgentGraphExecution
from pydantic import BaseModel
from backend.data.execution import ExecutionStatus
logger = logging.getLogger(__name__)
class RunningExecutionDetails(BaseModel):
    """Diagnostic record describing one execution that is still in flight."""

    # Which execution this is, and which graph (and graph version) it runs.
    execution_id: str
    graph_id: str
    graph_name: str
    graph_version: int

    # Owner of the execution; the e-mail may be None.
    user_id: str
    user_email: Optional[str]

    # Execution status as a plain string, plus timing / queue information.
    status: str
    started_at: Optional[datetime]
    queue_status: Optional[str] = None
class ExecutionDiagnostics(BaseModel):
    """Aggregate execution counts, one total per tracked status."""

    total_running: int  # number of executions counted as running
    total_queued: int  # number of executions counted as queued
    total_incomplete: int  # number of executions counted as incomplete
async def get_running_executions_details(
    limit: int = 10,
    offset: int = 0,
) -> list[RunningExecutionDetails]:
    """
    Get detailed information about currently running or queued executions.

    Non-deleted executions whose status is RUNNING or QUEUED are fetched
    newest-first (ordered by ``createdAt``) together with their graph and
    user relations, then converted to ``RunningExecutionDetails``.

    Args:
        limit: Maximum number of executions to return
        offset: Number of executions to skip

    Returns:
        List of execution details, one entry per matching execution

    Raises:
        Exception: Any error hit while querying or building the result is
            logged and re-raised unchanged
    """
    try:
        rows = await AgentGraphExecution.prisma().find_many(
            where={
                "isDeleted": False,
                "OR": [
                    {"executionStatus": ExecutionStatus.RUNNING},
                    {"executionStatus": ExecutionStatus.QUEUED},
                ],
            },
            include={
                "AgentGraph": True,
                "User": True,
            },
            order={"createdAt": "desc"},
            skip=offset,
            take=limit,
        )

        details = []
        # `row` rather than `exec`, so the builtin is not shadowed.
        for row in rows:
            graph = row.AgentGraph
            user = row.User

            # The status may be a plain string or an enum member. A str-based
            # enum member also passes isinstance(..., str), so unwrap `.value`
            # whenever it exists to always hand the model the underlying value.
            raw_status = row.executionStatus
            status_str = getattr(raw_status, "value", raw_status)

            details.append(
                RunningExecutionDetails(
                    execution_id=row.id,
                    graph_id=row.agentGraphId,
                    # Fall back when the relation is missing or the graph has
                    # no (or an empty) name; graph_name is a required string.
                    graph_name=(graph.name if graph else None) or "Unknown",
                    graph_version=row.agentGraphVersion,
                    user_id=row.userId,
                    user_email=user.email if user else None,
                    status=status_str,
                    started_at=row.startedAt,
                    # The attribute may not exist on the row; default to None.
                    queue_status=getattr(row, "queueStatus", None),
                )
            )
        return details
    except Exception as e:
        logger.error(f"Error getting running execution details: {e}")
        raise
async def get_execution_diagnostics() -> ExecutionDiagnostics:
    """
    Count non-deleted executions per tracked status.

    One count query is issued per status, in the order RUNNING, QUEUED,
    INCOMPLETE.

    Returns:
        ExecutionDiagnostics holding the three totals

    Raises:
        Exception: Any error raised by a count query is logged and re-raised
    """
    try:
        # Maps each ExecutionDiagnostics field to the status it counts.
        tracked = (
            ("total_running", ExecutionStatus.RUNNING),
            ("total_queued", ExecutionStatus.QUEUED),
            ("total_incomplete", ExecutionStatus.INCOMPLETE),
        )
        totals = {}
        for field_name, status in tracked:
            totals[field_name] = await AgentGraphExecution.prisma().count(
                where={
                    "isDeleted": False,
                    "executionStatus": status,
                }
            )
        return ExecutionDiagnostics(**totals)
    except Exception as e:
        logger.error(f"Error getting execution diagnostics: {e}")
        raise

View File

@@ -24,6 +24,7 @@ import backend.integrations.webhooks.utils
import backend.server.routers.postmark.postmark
import backend.server.routers.v1
import backend.server.v2.admin.credit_admin_routes
import backend.server.v2.admin.diagnostics_admin_routes
import backend.server.v2.admin.store_admin_routes
import backend.server.v2.builder
import backend.server.v2.builder.routes
@@ -267,6 +268,11 @@ app.include_router(
tags=["v2", "admin"],
prefix="/api/credits",
)
# Admin diagnostics routes. The router declares its own "/admin/diagnostics"
# prefix, so with this "/api" mount prefix its endpoints are served under
# "/api/admin/diagnostics/...".
app.include_router(
backend.server.v2.admin.diagnostics_admin_routes.router,
tags=["v2", "admin"],
prefix="/api",
)
# Library routes, mounted under "/api/library".
app.include_router(
backend.server.v2.library.routes.router, tags=["v2"], prefix="/api/library"
)

View File

@@ -0,0 +1,97 @@
"""
Admin routes for system diagnostics and monitoring.
"""
import logging
from autogpt_libs.auth import requires_admin_user
from fastapi import APIRouter, HTTPException, Query, Security
from backend.data.diagnostics import (
ExecutionDiagnostics,
RunningExecutionDetails,
get_execution_diagnostics,
get_running_executions_details,
)
logger = logging.getLogger(__name__)
# Router for the admin diagnostics endpoints. Every route registered on it
# gets the "/admin/diagnostics" path prefix, the "diagnostics"/"admin" tags and
# the requires_admin_user security dependency declared here.
# NOTE(review): requires_admin_user presumably rejects non-admin callers —
# confirm against autogpt_libs.auth.
router = APIRouter(
prefix="/admin/diagnostics",
tags=["diagnostics", "admin"],
dependencies=[Security(requires_admin_user)],
)
@router.get(
    "/executions/running",
    response_model=list[RunningExecutionDetails],
    summary="List Running Executions",
)
async def list_running_executions(
    limit: int = Query(default=10, ge=1, le=100),
    offset: int = Query(default=0, ge=0),
):
    """
    Get a list of currently running or queued executions with detailed information.

    Args:
        limit: Maximum number of executions to return (1-100)
        offset: Number of executions to skip for pagination

    Returns:
        List of running executions with details

    Raises:
        HTTPException: 500 if the executions or the totals cannot be retrieved
    """
    try:
        logger.info(f"Listing running executions (limit={limit}, offset={offset})")
        executions = await get_running_executions_details(limit=limit, offset=offset)

        # The overall total is only written to the log: the response model is
        # a plain list, so it is not returned to the caller. The module-level
        # import of get_execution_diagnostics is used directly; no local
        # re-import is needed.
        diagnostics = await get_execution_diagnostics()
        total_count = diagnostics.total_running + diagnostics.total_queued
        logger.info(
            f"Found {len(executions)} running executions (total: {total_count})"
        )
        return executions
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error listing running executions: %s", e)
        # Chain the cause so the original error stays attached to the 500.
        raise HTTPException(
            status_code=500,
            detail=f"Error listing running executions: {str(e)}",
        ) from e
@router.get(
    "/executions/stats",
    response_model=ExecutionDiagnostics,
    summary="Get Execution Statistics",
)
async def get_execution_stats():
    """
    Get overall statistics about execution statuses.

    Returns:
        Execution diagnostics with counts by status

    Raises:
        HTTPException: 500 if the statistics cannot be retrieved
    """
    try:
        logger.info("Getting execution statistics")
        diagnostics = await get_execution_diagnostics()
        logger.info(
            f"Execution stats - Running: {diagnostics.total_running}, "
            f"Queued: {diagnostics.total_queued}, "
            f"Incomplete: {diagnostics.total_incomplete}"
        )
        return diagnostics
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error getting execution statistics: %s", e)
        # Chain the cause so the original error stays attached to the 500.
        raise HTTPException(
            status_code=500,
            detail=f"Error getting execution statistics: {str(e)}",
        ) from e