From cc1a2cd82912cc452e477f75e5a8404d79279c70 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Wed, 22 Oct 2025 07:09:34 +0000 Subject: [PATCH] feat(admin): Add execution management table with stop functionality - Created backend/data/diagnostics.py following Option B data layer pattern - Refactored diagnostics_admin_routes.py to use the new data layer - Added endpoints for listing running executions with details - Added endpoints for stopping executions (single and bulk) - Created ExecutionsTable component with multi-select and stop buttons - Integrated execution management table into diagnostics page Co-authored-by: Nicholas Tindle --- .../backend/backend/data/diagnostics.py | 252 +++++++++++ .../v2/admin/diagnostics_admin_routes.py | 225 ++++++---- .../components/DiagnosticsContent.tsx | 4 + .../components/ExecutionsTable.tsx | 400 ++++++++++++++++++ 4 files changed, 800 insertions(+), 81 deletions(-) create mode 100644 autogpt_platform/backend/backend/data/diagnostics.py create mode 100644 autogpt_platform/frontend/src/app/(platform)/admin/diagnostics/components/ExecutionsTable.tsx diff --git a/autogpt_platform/backend/backend/data/diagnostics.py b/autogpt_platform/backend/backend/data/diagnostics.py new file mode 100644 index 0000000000..8fb63f1b70 --- /dev/null +++ b/autogpt_platform/backend/backend/data/diagnostics.py @@ -0,0 +1,252 @@ +""" +Diagnostics data layer for admin operations. +Provides functions to query and manage system diagnostics including executions and agents. +""" + +import logging +from datetime import datetime, timezone +from typing import List, Optional + +from prisma.enums import AgentExecutionStatus +from prisma.models import AgentGraphExecution, AgentGraph, User +from pydantic import BaseModel + +from backend.data.rabbitmq import SyncRabbitMQ +from backend.executor.utils import create_execution_queue_config, GRAPH_EXECUTION_QUEUE_NAME + +logger = logging.getLogger(__name__) + + +class RunningExecutionDetail(BaseModel): + """Details about a running execution for admin view""" + + execution_id: str + graph_id: str + graph_name: str + graph_version: int + user_id: str + user_email: Optional[str] + status: str + started_at: Optional[datetime] + queue_status: Optional[str] + + +class ExecutionDiagnosticsSummary(BaseModel): + """Summary of execution diagnostics""" + + running_count: int + queued_db_count: int + rabbitmq_queue_depth: int + timestamp: str + + +class AgentDiagnosticsSummary(BaseModel): + """Summary of agent diagnostics""" + + agents_with_active_executions: int + timestamp: str + + +async def get_execution_diagnostics() -> ExecutionDiagnosticsSummary: + """ + Get comprehensive execution diagnostics including database and queue metrics. + + Returns: + ExecutionDiagnosticsSummary with current execution state + """ + try: + # Get running executions count + running_count = await AgentGraphExecution.prisma().count( + where={"executionStatus": AgentExecutionStatus.RUNNING} + ) + + # Get queued executions from database + queued_db_count = await AgentGraphExecution.prisma().count( + where={"executionStatus": AgentExecutionStatus.QUEUED} + ) + + # Get RabbitMQ queue depth + rabbitmq_queue_depth = get_rabbitmq_queue_depth() + + return ExecutionDiagnosticsSummary( + running_count=running_count, + queued_db_count=queued_db_count, + rabbitmq_queue_depth=rabbitmq_queue_depth, + timestamp=datetime.now(timezone.utc).isoformat() + ) + except Exception as e: + logger.error(f"Error getting execution diagnostics: {e}") + raise + + +async def get_agent_diagnostics() -> AgentDiagnosticsSummary: + """ + Get comprehensive agent diagnostics. + + Returns: + AgentDiagnosticsSummary with agent metrics + """ + try: + # Get distinct agent graph IDs with active executions + executions = await AgentGraphExecution.prisma().find_many( + where={ + "executionStatus": { + "in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED] + } + }, + distinct=["agentGraphId"], + ) + + return AgentDiagnosticsSummary( + agents_with_active_executions=len(executions), + timestamp=datetime.now(timezone.utc).isoformat() + ) + except Exception as e: + logger.error(f"Error getting agent diagnostics: {e}") + raise + + +def get_rabbitmq_queue_depth() -> int: + """ + Get the number of messages in the RabbitMQ execution queue. + + Returns: + Number of messages in queue, or -1 if error + """ + try: + # Create a temporary connection to query the queue + config = create_execution_queue_config() + rabbitmq = SyncRabbitMQ(config) + rabbitmq.connect() + + # Use passive queue_declare to get queue info without modifying it + method_frame = rabbitmq._channel.queue_declare( + queue=GRAPH_EXECUTION_QUEUE_NAME, + passive=True + ) + + message_count = method_frame.method.message_count + + # Clean up connection + rabbitmq.disconnect() + + return message_count + except Exception as e: + logger.error(f"Error getting RabbitMQ queue depth: {e}") + # Return -1 to indicate an error state rather than failing the entire request + return -1 + + +async def get_running_executions_details( + limit: int = 100, + offset: int = 0 +) -> List[RunningExecutionDetail]: + """ + Get detailed information about running executions. + + Args: + limit: Maximum number of executions to return + offset: Number of executions to skip + + Returns: + List of RunningExecutionDetail objects + """ + try: + executions = await AgentGraphExecution.prisma().find_many( + where={ + "executionStatus": { + "in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED] + } + }, + include={ + "AgentGraph": True, + "User": True, + }, + take=limit, + skip=offset, + order={"createdAt": "desc"} + ) + + results = [] + for exec in executions: + results.append(RunningExecutionDetail( + execution_id=exec.id, + graph_id=exec.agentGraphId, + graph_name=exec.AgentGraph.name if exec.AgentGraph else "Unknown", + graph_version=exec.agentGraphVersion, + user_id=exec.userId, + user_email=exec.User.email if exec.User else None, + status=exec.executionStatus.value, + started_at=exec.startedAt, + queue_status=exec.queueStatus if hasattr(exec, 'queueStatus') else None, + )) + + return results + except Exception as e: + logger.error(f"Error getting running execution details: {e}") + raise + + +async def stop_execution(execution_id: str, admin_user_id: str) -> bool: + """ + Stop a single execution by setting its status to FAILED. + Admin-only operation. + + Args: + execution_id: ID of the execution to stop + admin_user_id: ID of the admin user performing the operation + + Returns: + True if execution was stopped, False otherwise + """ + try: + logger.info(f"Admin user {admin_user_id} stopping execution {execution_id}") + + # Update the execution status to FAILED + result = await AgentGraphExecution.prisma().update( + where={"id": execution_id}, + data={ + "executionStatus": AgentExecutionStatus.FAILED, + "error": "Execution stopped by admin", + "updatedAt": datetime.now(timezone.utc), + } + ) + + return result is not None + except Exception as e: + logger.error(f"Error stopping execution {execution_id}: {e}") + return False + + +async def stop_executions_bulk(execution_ids: List[str], admin_user_id: str) -> int: + """ + Stop multiple executions by setting their status to FAILED. + Admin-only operation. + + Args: + execution_ids: List of execution IDs to stop + admin_user_id: ID of the admin user performing the operation + + Returns: + Number of executions successfully stopped + """ + try: + logger.info(f"Admin user {admin_user_id} stopping {len(execution_ids)} executions") + + # Update all executions to FAILED status + result = await AgentGraphExecution.prisma().update_many( + where={ + "id": {"in": execution_ids}, + "executionStatus": {"in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED]} + }, + data={ + "executionStatus": AgentExecutionStatus.FAILED, + "error": "Execution stopped by admin", + "updatedAt": datetime.now(timezone.utc), + } + ) + + return result + except Exception as e: + logger.error(f"Error stopping executions in bulk: {e}") + return 0 \ No newline at end of file diff --git a/autogpt_platform/backend/backend/server/v2/admin/diagnostics_admin_routes.py b/autogpt_platform/backend/backend/server/v2/admin/diagnostics_admin_routes.py index 74bae6b662..c5275ea962 100644 --- a/autogpt_platform/backend/backend/server/v2/admin/diagnostics_admin_routes.py +++ b/autogpt_platform/backend/backend/server/v2/admin/diagnostics_admin_routes.py @@ -1,13 +1,19 @@ import logging -from datetime import datetime, timezone +from typing import List, Optional from autogpt_libs.auth import requires_admin_user -from fastapi import APIRouter, HTTPException, Security -from prisma.enums import AgentExecutionStatus -from prisma.models import AgentGraphExecution, AgentGraph +from autogpt_libs.auth.models import User as AuthUser +from fastapi import APIRouter, Depends, HTTPException, Security +from pydantic import BaseModel -from backend.data.rabbitmq import SyncRabbitMQ -from backend.executor.utils import create_execution_queue_config, GRAPH_EXECUTION_QUEUE_NAME +from backend.data.diagnostics import ( + get_execution_diagnostics, + get_agent_diagnostics, + get_running_executions_details, + stop_execution, + stop_executions_bulk, + RunningExecutionDetail, +) from backend.server.v2.admin.model import ( ExecutionDiagnosticsResponse, AgentDiagnosticsResponse, @@ -22,72 +28,27 @@ router = APIRouter( ) -async def get_running_executions_count() -> int: - """Get the count of currently running executions from the database.""" - try: - count = await AgentGraphExecution.prisma().count( - where={"executionStatus": AgentExecutionStatus.RUNNING} - ) - return count - except Exception as e: - logger.error(f"Error getting running executions count: {e}") - raise +class RunningExecutionsListResponse(BaseModel): + """Response model for list of running executions""" + executions: List[RunningExecutionDetail] + total: int -async def get_queued_executions_db_count() -> int: - """Get the count of queued executions from the database.""" - try: - count = await AgentGraphExecution.prisma().count( - where={"executionStatus": AgentExecutionStatus.QUEUED} - ) - return count - except Exception as e: - logger.error(f"Error getting queued executions count from DB: {e}") - raise +class StopExecutionRequest(BaseModel): + """Request model for stopping a single execution""" + execution_id: str -def get_rabbitmq_queue_depth() -> int: - """Get the number of messages in the RabbitMQ execution queue.""" - try: - # Create a temporary connection to query the queue - config = create_execution_queue_config() - rabbitmq = SyncRabbitMQ(config) - rabbitmq.connect() - - # Use passive queue_declare to get queue info without modifying it - method_frame = rabbitmq._channel.queue_declare( - queue=GRAPH_EXECUTION_QUEUE_NAME, - passive=True - ) - - message_count = method_frame.method.message_count - - # Clean up connection - rabbitmq.disconnect() - - return message_count - except Exception as e: - logger.error(f"Error getting RabbitMQ queue depth: {e}") - # Return -1 to indicate an error state rather than failing the entire request - return -1 +class StopExecutionsRequest(BaseModel): + """Request model for stopping multiple executions""" + execution_ids: List[str] -async def get_agents_with_active_executions_count() -> int: - """Get the count of unique agents that have running or queued executions.""" - try: - # Get distinct agent graph IDs with active executions - executions = await AgentGraphExecution.prisma().find_many( - where={ - "executionStatus": { - "in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED] - } - }, - distinct=["agentGraphId"], - ) - return len(executions) - except Exception as e: - logger.error(f"Error getting agents with active executions count: {e}") - raise +class StopExecutionResponse(BaseModel): + """Response model for stop execution operations""" + success: bool + stopped_count: int = 0 + message: str @router.get( @@ -95,7 +56,7 @@ async def get_agents_with_active_executions_count() -> int: response_model=ExecutionDiagnosticsResponse, summary="Get Execution Diagnostics", ) -async def get_execution_diagnostics(): +async def get_execution_diagnostics_endpoint(): """ Get diagnostic information about execution status. @@ -108,20 +69,18 @@ async def get_execution_diagnostics(): try: logger.info("Getting execution diagnostics") - running_count = await get_running_executions_count() - queued_db_count = await get_queued_executions_db_count() - rabbitmq_count = get_rabbitmq_queue_depth() + diagnostics = await get_execution_diagnostics() response = ExecutionDiagnosticsResponse( - running_executions=running_count, - queued_executions_db=queued_db_count, - queued_executions_rabbitmq=rabbitmq_count, - timestamp=datetime.now(timezone.utc).isoformat(), + running_executions=diagnostics.running_count, + queued_executions_db=diagnostics.queued_db_count, + queued_executions_rabbitmq=diagnostics.rabbitmq_queue_depth, + timestamp=diagnostics.timestamp, ) logger.info( - f"Execution diagnostics: running={running_count}, " - f"queued_db={queued_db_count}, queued_rabbitmq={rabbitmq_count}" + f"Execution diagnostics: running={diagnostics.running_count}, " + f"queued_db={diagnostics.queued_db_count}, queued_rabbitmq={diagnostics.rabbitmq_queue_depth}" ) return response @@ -135,7 +94,7 @@ async def get_execution_diagnostics(): response_model=AgentDiagnosticsResponse, summary="Get Agent Diagnostics", ) -async def get_agent_diagnostics(): +async def get_agent_diagnostics_endpoint(): """ Get diagnostic information about agents. @@ -146,18 +105,122 @@ async def get_agent_diagnostics(): try: logger.info("Getting agent diagnostics") - active_executions_count = await get_agents_with_active_executions_count() + diagnostics = await get_agent_diagnostics() response = AgentDiagnosticsResponse( - agents_with_active_executions=active_executions_count, - timestamp=datetime.now(timezone.utc).isoformat(), + agents_with_active_executions=diagnostics.agents_with_active_executions, + timestamp=diagnostics.timestamp, ) logger.info( - f"Agent diagnostics: with_active_executions={active_executions_count}" + f"Agent diagnostics: with_active_executions={diagnostics.agents_with_active_executions}" ) return response except Exception as e: logger.exception(f"Error getting agent diagnostics: {e}") raise HTTPException(status_code=500, detail=str(e)) + + +@router.get( + "/diagnostics/executions/running", + response_model=RunningExecutionsListResponse, + summary="List Running Executions", +) +async def list_running_executions( + limit: int = 100, + offset: int = 0, +): + """ + Get detailed list of running and queued executions. + + Args: + limit: Maximum number of executions to return (default 100) + offset: Number of executions to skip (default 0) + + Returns: + List of running executions with details + """ + try: + logger.info(f"Listing running executions (limit={limit}, offset={offset})") + + executions = await get_running_executions_details(limit=limit, offset=offset) + + # Get total count for pagination + from backend.data.diagnostics import get_execution_diagnostics as get_diag + diagnostics = await get_diag() + total = diagnostics.running_count + diagnostics.queued_db_count + + return RunningExecutionsListResponse( + executions=executions, + total=total + ) + except Exception as e: + logger.exception(f"Error listing running executions: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post( + "/diagnostics/executions/stop", + response_model=StopExecutionResponse, + summary="Stop Single Execution", +) +async def stop_single_execution( + request: StopExecutionRequest, + user: AuthUser = Security(requires_admin_user), +): + """ + Stop a single execution (admin only). + + Args: + request: Contains execution_id to stop + + Returns: + Success status and message + """ + try: + logger.info(f"Admin {user.id} stopping execution {request.execution_id}") + + success = await stop_execution(request.execution_id, user.id) + + return StopExecutionResponse( + success=success, + stopped_count=1 if success else 0, + message="Execution stopped successfully" if success else "Failed to stop execution" + ) + except Exception as e: + logger.exception(f"Error stopping execution: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post( + "/diagnostics/executions/stop-bulk", + response_model=StopExecutionResponse, + summary="Stop Multiple Executions", +) +async def stop_multiple_executions( + request: StopExecutionsRequest, + user: AuthUser = Security(requires_admin_user), +): + """ + Stop multiple executions (admin only). + + Args: + request: Contains list of execution_ids to stop + + Returns: + Number of executions stopped and success message + """ + try: + logger.info(f"Admin {user.id} stopping {len(request.execution_ids)} executions") + + stopped_count = await stop_executions_bulk(request.execution_ids, user.id) + + return StopExecutionResponse( + success=stopped_count > 0, + stopped_count=stopped_count, + message=f"Stopped {stopped_count} of {len(request.execution_ids)} executions" + ) + except Exception as e: + logger.exception(f"Error stopping multiple executions: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/autogpt_platform/frontend/src/app/(platform)/admin/diagnostics/components/DiagnosticsContent.tsx b/autogpt_platform/frontend/src/app/(platform)/admin/diagnostics/components/DiagnosticsContent.tsx index 620ded4548..69ec826302 100644 --- a/autogpt_platform/frontend/src/app/(platform)/admin/diagnostics/components/DiagnosticsContent.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/admin/diagnostics/components/DiagnosticsContent.tsx @@ -11,6 +11,7 @@ import { Button } from "@/components/ui/button"; import { ArrowClockwise } from "@phosphor-icons/react"; import { ErrorCard } from "@/components/molecules/ErrorCard/ErrorCard"; import { useDiagnosticsContent } from "./useDiagnosticsContent"; +import { ExecutionsTable } from "./ExecutionsTable"; export function DiagnosticsContent() { const { executionData, agentData, isLoading, isError, error, refresh } = @@ -215,6 +216,9 @@ export function DiagnosticsContent() { + + {/* Add Executions Table */} + ); } diff --git a/autogpt_platform/frontend/src/app/(platform)/admin/diagnostics/components/ExecutionsTable.tsx b/autogpt_platform/frontend/src/app/(platform)/admin/diagnostics/components/ExecutionsTable.tsx new file mode 100644 index 0000000000..5460b37aa8 --- /dev/null +++ b/autogpt_platform/frontend/src/app/(platform)/admin/diagnostics/components/ExecutionsTable.tsx @@ -0,0 +1,400 @@ +"use client"; + +import React, { useState, useEffect } from "react"; +import { + Table, + TableBody, + TableCell, + TableHead, + TableHeader, + TableRow, +} from "@/components/ui/table"; +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Button } from "@/components/ui/button"; +import { Checkbox } from "@/components/ui/checkbox"; +import { toast } from "@/components/ui/use-toast"; +import { + AlertDialog, + AlertDialogAction, + AlertDialogCancel, + AlertDialogContent, + AlertDialogDescription, + AlertDialogFooter, + AlertDialogHeader, + AlertDialogTitle, +} from "@/components/ui/alert-dialog"; +import { + Stop, + StopCircle, + ArrowClockwise, + CaretLeft, + CaretRight, +} from "@phosphor-icons/react"; +import { apiUrl } from "@/lib/autogpt-server-api"; + +interface RunningExecutionDetail { + execution_id: string; + graph_id: string; + graph_name: string; + graph_version: number; + user_id: string; + user_email: string | null; + status: string; + started_at: string | null; + queue_status: string | null; +} + +interface ExecutionsTableProps { + onRefresh?: () => void; +} + +export function ExecutionsTable({ onRefresh }: ExecutionsTableProps) { + const [executions, setExecutions] = useState([]); + const [selectedIds, setSelectedIds] = useState>(new Set()); + const [isLoading, setIsLoading] = useState(false); + const [isStopping, setIsStopping] = useState(false); + const [showStopDialog, setShowStopDialog] = useState(false); + const [stopTarget, setStopTarget] = useState< + "single" | "selected" | "all" + >("single"); + const [singleStopId, setSingleStopId] = useState(null); + const [total, setTotal] = useState(0); + const [currentPage, setCurrentPage] = useState(1); + const [pageSize] = useState(10); + + const fetchExecutions = async () => { + setIsLoading(true); + try { + const offset = (currentPage - 1) * pageSize; + const response = await fetch( + `${apiUrl}/admin/diagnostics/executions/running?limit=${pageSize}&offset=${offset}`, + { + credentials: "include", + } + ); + + if (!response.ok) { + throw new Error("Failed to fetch executions"); + } + + const data = await response.json(); + setExecutions(data.executions || []); + setTotal(data.total || 0); + } catch (error) { + console.error("Error fetching executions:", error); + toast({ + title: "Error", + description: "Failed to fetch running executions", + variant: "destructive", + }); + } finally { + setIsLoading(false); + } + }; + + useEffect(() => { + fetchExecutions(); + }, [currentPage]); + + const handleSelectAll = (checked: boolean) => { + if (checked) { + setSelectedIds(new Set(executions.map((e) => e.execution_id))); + } else { + setSelectedIds(new Set()); + } + }; + + const handleSelectExecution = (id: string, checked: boolean) => { + const newSelected = new Set(selectedIds); + if (checked) { + newSelected.add(id); + } else { + newSelected.delete(id); + } + setSelectedIds(newSelected); + }; + + const confirmStop = ( + target: "single" | "selected" | "all", + singleId?: string + ) => { + setStopTarget(target); + setSingleStopId(singleId || null); + setShowStopDialog(true); + }; + + const handleStop = async () => { + setShowStopDialog(false); + setIsStopping(true); + + let endpoint: string; + let body: any; + + if (stopTarget === "single" && singleStopId) { + endpoint = `${apiUrl}/admin/diagnostics/executions/stop`; + body = { execution_id: singleStopId }; + } else { + let idsToStop: string[] = []; + if (stopTarget === "selected") { + idsToStop = Array.from(selectedIds); + } else if (stopTarget === "all") { + idsToStop = executions.map((e) => e.execution_id); + } + + endpoint = `${apiUrl}/admin/diagnostics/executions/stop-bulk`; + body = { execution_ids: idsToStop }; + } + + try { + const response = await fetch(endpoint, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + credentials: "include", + body: JSON.stringify(body), + }); + + const data = await response.json(); + + if (!response.ok) { + throw new Error(data.detail || "Failed to stop executions"); + } + + toast({ + title: "Success", + description: data.message || "Executions stopped successfully", + }); + + // Clear selections and refresh + setSelectedIds(new Set()); + await fetchExecutions(); + if (onRefresh) { + onRefresh(); + } + } catch (error: any) { + console.error("Error stopping executions:", error); + toast({ + title: "Error", + description: error.message || "Failed to stop executions", + variant: "destructive", + }); + } finally { + setIsStopping(false); + } + }; + + const totalPages = Math.ceil(total / pageSize); + + return ( + <> + + +
+ Running Executions +
+ {selectedIds.size > 0 && ( + + )} + + +
+
+
+ + {isLoading && executions.length === 0 ? ( +
+ +
+ ) : executions.length === 0 ? ( +
+ No running executions +
+ ) : ( + <> + + + + + 0 + } + onCheckedChange={handleSelectAll} + /> + + Execution ID + Agent Name + Version + User + Status + Started At + Actions + + + + {executions.map((execution) => ( + + + + handleSelectExecution( + execution.execution_id, + checked as boolean + ) + } + /> + + + {execution.execution_id.substring(0, 8)}... + + {execution.graph_name} + {execution.graph_version} + +
+ {execution.user_email || ( + Unknown + )} +
+
+ {execution.user_id.substring(0, 8)}... +
+
+ + + {execution.status} + + + + {execution.started_at + ? new Date(execution.started_at).toLocaleString() + : "-"} + + + + +
+ ))} +
+
+ + {totalPages > 1 && ( +
+
+ Showing {(currentPage - 1) * pageSize + 1} to{" "} + {Math.min(currentPage * pageSize, total)} of {total}{" "} + executions +
+
+ +
+ Page {currentPage} of {totalPages} +
+ +
+
+ )} + + )} +
+
+ + + + + Confirm Stop Executions + + {stopTarget === "single" && ( + <>Are you sure you want to stop this execution? + )} + {stopTarget === "selected" && ( + <> + Are you sure you want to stop {selectedIds.size} selected + execution(s)? + + )} + {stopTarget === "all" && ( + <> + Are you sure you want to stop ALL {executions.length} running + executions? + + )} +
+
+ This action cannot be undone. The executions will be marked as + FAILED. +
+
+ + Cancel + + Stop Executions + + +
+
+ + ); +} \ No newline at end of file