mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
feat(admin): Add execution management table with stop functionality
- Created backend/data/diagnostics.py following Option B data layer pattern - Refactored diagnostics_admin_routes.py to use the new data layer - Added endpoints for listing running executions with details - Added endpoints for stopping executions (single and bulk) - Created ExecutionsTable component with multi-select and stop buttons - Integrated execution management table into diagnostics page Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
This commit is contained in:
252
autogpt_platform/backend/backend/data/diagnostics.py
Normal file
252
autogpt_platform/backend/backend/data/diagnostics.py
Normal file
@@ -0,0 +1,252 @@
|
||||
"""
|
||||
Diagnostics data layer for admin operations.
|
||||
Provides functions to query and manage system diagnostics including executions and agents.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Optional
|
||||
|
||||
from prisma.enums import AgentExecutionStatus
|
||||
from prisma.models import AgentGraphExecution, AgentGraph, User
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.data.rabbitmq import SyncRabbitMQ
|
||||
from backend.executor.utils import create_execution_queue_config, GRAPH_EXECUTION_QUEUE_NAME
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RunningExecutionDetail(BaseModel):
|
||||
"""Details about a running execution for admin view"""
|
||||
|
||||
execution_id: str
|
||||
graph_id: str
|
||||
graph_name: str
|
||||
graph_version: int
|
||||
user_id: str
|
||||
user_email: Optional[str]
|
||||
status: str
|
||||
started_at: Optional[datetime]
|
||||
queue_status: Optional[str]
|
||||
|
||||
|
||||
class ExecutionDiagnosticsSummary(BaseModel):
|
||||
"""Summary of execution diagnostics"""
|
||||
|
||||
running_count: int
|
||||
queued_db_count: int
|
||||
rabbitmq_queue_depth: int
|
||||
timestamp: str
|
||||
|
||||
|
||||
class AgentDiagnosticsSummary(BaseModel):
|
||||
"""Summary of agent diagnostics"""
|
||||
|
||||
agents_with_active_executions: int
|
||||
timestamp: str
|
||||
|
||||
|
||||
async def get_execution_diagnostics() -> ExecutionDiagnosticsSummary:
|
||||
"""
|
||||
Get comprehensive execution diagnostics including database and queue metrics.
|
||||
|
||||
Returns:
|
||||
ExecutionDiagnosticsSummary with current execution state
|
||||
"""
|
||||
try:
|
||||
# Get running executions count
|
||||
running_count = await AgentGraphExecution.prisma().count(
|
||||
where={"executionStatus": AgentExecutionStatus.RUNNING}
|
||||
)
|
||||
|
||||
# Get queued executions from database
|
||||
queued_db_count = await AgentGraphExecution.prisma().count(
|
||||
where={"executionStatus": AgentExecutionStatus.QUEUED}
|
||||
)
|
||||
|
||||
# Get RabbitMQ queue depth
|
||||
rabbitmq_queue_depth = get_rabbitmq_queue_depth()
|
||||
|
||||
return ExecutionDiagnosticsSummary(
|
||||
running_count=running_count,
|
||||
queued_db_count=queued_db_count,
|
||||
rabbitmq_queue_depth=rabbitmq_queue_depth,
|
||||
timestamp=datetime.now(timezone.utc).isoformat()
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting execution diagnostics: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def get_agent_diagnostics() -> AgentDiagnosticsSummary:
|
||||
"""
|
||||
Get comprehensive agent diagnostics.
|
||||
|
||||
Returns:
|
||||
AgentDiagnosticsSummary with agent metrics
|
||||
"""
|
||||
try:
|
||||
# Get distinct agent graph IDs with active executions
|
||||
executions = await AgentGraphExecution.prisma().find_many(
|
||||
where={
|
||||
"executionStatus": {
|
||||
"in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED]
|
||||
}
|
||||
},
|
||||
distinct=["agentGraphId"],
|
||||
)
|
||||
|
||||
return AgentDiagnosticsSummary(
|
||||
agents_with_active_executions=len(executions),
|
||||
timestamp=datetime.now(timezone.utc).isoformat()
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting agent diagnostics: {e}")
|
||||
raise
|
||||
|
||||
|
||||
def get_rabbitmq_queue_depth() -> int:
|
||||
"""
|
||||
Get the number of messages in the RabbitMQ execution queue.
|
||||
|
||||
Returns:
|
||||
Number of messages in queue, or -1 if error
|
||||
"""
|
||||
try:
|
||||
# Create a temporary connection to query the queue
|
||||
config = create_execution_queue_config()
|
||||
rabbitmq = SyncRabbitMQ(config)
|
||||
rabbitmq.connect()
|
||||
|
||||
# Use passive queue_declare to get queue info without modifying it
|
||||
method_frame = rabbitmq._channel.queue_declare(
|
||||
queue=GRAPH_EXECUTION_QUEUE_NAME,
|
||||
passive=True
|
||||
)
|
||||
|
||||
message_count = method_frame.method.message_count
|
||||
|
||||
# Clean up connection
|
||||
rabbitmq.disconnect()
|
||||
|
||||
return message_count
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting RabbitMQ queue depth: {e}")
|
||||
# Return -1 to indicate an error state rather than failing the entire request
|
||||
return -1
|
||||
|
||||
|
||||
async def get_running_executions_details(
|
||||
limit: int = 100,
|
||||
offset: int = 0
|
||||
) -> List[RunningExecutionDetail]:
|
||||
"""
|
||||
Get detailed information about running executions.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of executions to return
|
||||
offset: Number of executions to skip
|
||||
|
||||
Returns:
|
||||
List of RunningExecutionDetail objects
|
||||
"""
|
||||
try:
|
||||
executions = await AgentGraphExecution.prisma().find_many(
|
||||
where={
|
||||
"executionStatus": {
|
||||
"in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED]
|
||||
}
|
||||
},
|
||||
include={
|
||||
"AgentGraph": True,
|
||||
"User": True,
|
||||
},
|
||||
take=limit,
|
||||
skip=offset,
|
||||
order={"createdAt": "desc"}
|
||||
)
|
||||
|
||||
results = []
|
||||
for exec in executions:
|
||||
results.append(RunningExecutionDetail(
|
||||
execution_id=exec.id,
|
||||
graph_id=exec.agentGraphId,
|
||||
graph_name=exec.AgentGraph.name if exec.AgentGraph else "Unknown",
|
||||
graph_version=exec.agentGraphVersion,
|
||||
user_id=exec.userId,
|
||||
user_email=exec.User.email if exec.User else None,
|
||||
status=exec.executionStatus.value,
|
||||
started_at=exec.startedAt,
|
||||
queue_status=exec.queueStatus if hasattr(exec, 'queueStatus') else None,
|
||||
))
|
||||
|
||||
return results
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting running execution details: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def stop_execution(execution_id: str, admin_user_id: str) -> bool:
|
||||
"""
|
||||
Stop a single execution by setting its status to FAILED.
|
||||
Admin-only operation.
|
||||
|
||||
Args:
|
||||
execution_id: ID of the execution to stop
|
||||
admin_user_id: ID of the admin user performing the operation
|
||||
|
||||
Returns:
|
||||
True if execution was stopped, False otherwise
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Admin user {admin_user_id} stopping execution {execution_id}")
|
||||
|
||||
# Update the execution status to FAILED
|
||||
result = await AgentGraphExecution.prisma().update(
|
||||
where={"id": execution_id},
|
||||
data={
|
||||
"executionStatus": AgentExecutionStatus.FAILED,
|
||||
"error": "Execution stopped by admin",
|
||||
"updatedAt": datetime.now(timezone.utc),
|
||||
}
|
||||
)
|
||||
|
||||
return result is not None
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping execution {execution_id}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def stop_executions_bulk(execution_ids: List[str], admin_user_id: str) -> int:
|
||||
"""
|
||||
Stop multiple executions by setting their status to FAILED.
|
||||
Admin-only operation.
|
||||
|
||||
Args:
|
||||
execution_ids: List of execution IDs to stop
|
||||
admin_user_id: ID of the admin user performing the operation
|
||||
|
||||
Returns:
|
||||
Number of executions successfully stopped
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Admin user {admin_user_id} stopping {len(execution_ids)} executions")
|
||||
|
||||
# Update all executions to FAILED status
|
||||
result = await AgentGraphExecution.prisma().update_many(
|
||||
where={
|
||||
"id": {"in": execution_ids},
|
||||
"executionStatus": {"in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED]}
|
||||
},
|
||||
data={
|
||||
"executionStatus": AgentExecutionStatus.FAILED,
|
||||
"error": "Execution stopped by admin",
|
||||
"updatedAt": datetime.now(timezone.utc),
|
||||
}
|
||||
)
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping executions in bulk: {e}")
|
||||
return 0
|
||||
@@ -1,13 +1,19 @@
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Optional
|
||||
|
||||
from autogpt_libs.auth import requires_admin_user
|
||||
from fastapi import APIRouter, HTTPException, Security
|
||||
from prisma.enums import AgentExecutionStatus
|
||||
from prisma.models import AgentGraphExecution, AgentGraph
|
||||
from autogpt_libs.auth.models import User as AuthUser
|
||||
from fastapi import APIRouter, Depends, HTTPException, Security
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.data.rabbitmq import SyncRabbitMQ
|
||||
from backend.executor.utils import create_execution_queue_config, GRAPH_EXECUTION_QUEUE_NAME
|
||||
from backend.data.diagnostics import (
|
||||
get_execution_diagnostics,
|
||||
get_agent_diagnostics,
|
||||
get_running_executions_details,
|
||||
stop_execution,
|
||||
stop_executions_bulk,
|
||||
RunningExecutionDetail,
|
||||
)
|
||||
from backend.server.v2.admin.model import (
|
||||
ExecutionDiagnosticsResponse,
|
||||
AgentDiagnosticsResponse,
|
||||
@@ -22,72 +28,27 @@ router = APIRouter(
|
||||
)
|
||||
|
||||
|
||||
async def get_running_executions_count() -> int:
|
||||
"""Get the count of currently running executions from the database."""
|
||||
try:
|
||||
count = await AgentGraphExecution.prisma().count(
|
||||
where={"executionStatus": AgentExecutionStatus.RUNNING}
|
||||
)
|
||||
return count
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting running executions count: {e}")
|
||||
raise
|
||||
class RunningExecutionsListResponse(BaseModel):
|
||||
"""Response model for list of running executions"""
|
||||
executions: List[RunningExecutionDetail]
|
||||
total: int
|
||||
|
||||
|
||||
async def get_queued_executions_db_count() -> int:
|
||||
"""Get the count of queued executions from the database."""
|
||||
try:
|
||||
count = await AgentGraphExecution.prisma().count(
|
||||
where={"executionStatus": AgentExecutionStatus.QUEUED}
|
||||
)
|
||||
return count
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting queued executions count from DB: {e}")
|
||||
raise
|
||||
class StopExecutionRequest(BaseModel):
|
||||
"""Request model for stopping a single execution"""
|
||||
execution_id: str
|
||||
|
||||
|
||||
def get_rabbitmq_queue_depth() -> int:
|
||||
"""Get the number of messages in the RabbitMQ execution queue."""
|
||||
try:
|
||||
# Create a temporary connection to query the queue
|
||||
config = create_execution_queue_config()
|
||||
rabbitmq = SyncRabbitMQ(config)
|
||||
rabbitmq.connect()
|
||||
|
||||
# Use passive queue_declare to get queue info without modifying it
|
||||
method_frame = rabbitmq._channel.queue_declare(
|
||||
queue=GRAPH_EXECUTION_QUEUE_NAME,
|
||||
passive=True
|
||||
)
|
||||
|
||||
message_count = method_frame.method.message_count
|
||||
|
||||
# Clean up connection
|
||||
rabbitmq.disconnect()
|
||||
|
||||
return message_count
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting RabbitMQ queue depth: {e}")
|
||||
# Return -1 to indicate an error state rather than failing the entire request
|
||||
return -1
|
||||
class StopExecutionsRequest(BaseModel):
|
||||
"""Request model for stopping multiple executions"""
|
||||
execution_ids: List[str]
|
||||
|
||||
|
||||
async def get_agents_with_active_executions_count() -> int:
|
||||
"""Get the count of unique agents that have running or queued executions."""
|
||||
try:
|
||||
# Get distinct agent graph IDs with active executions
|
||||
executions = await AgentGraphExecution.prisma().find_many(
|
||||
where={
|
||||
"executionStatus": {
|
||||
"in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED]
|
||||
}
|
||||
},
|
||||
distinct=["agentGraphId"],
|
||||
)
|
||||
return len(executions)
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting agents with active executions count: {e}")
|
||||
raise
|
||||
class StopExecutionResponse(BaseModel):
|
||||
"""Response model for stop execution operations"""
|
||||
success: bool
|
||||
stopped_count: int = 0
|
||||
message: str
|
||||
|
||||
|
||||
@router.get(
|
||||
@@ -95,7 +56,7 @@ async def get_agents_with_active_executions_count() -> int:
|
||||
response_model=ExecutionDiagnosticsResponse,
|
||||
summary="Get Execution Diagnostics",
|
||||
)
|
||||
async def get_execution_diagnostics():
|
||||
async def get_execution_diagnostics_endpoint():
|
||||
"""
|
||||
Get diagnostic information about execution status.
|
||||
|
||||
@@ -108,20 +69,18 @@ async def get_execution_diagnostics():
|
||||
try:
|
||||
logger.info("Getting execution diagnostics")
|
||||
|
||||
running_count = await get_running_executions_count()
|
||||
queued_db_count = await get_queued_executions_db_count()
|
||||
rabbitmq_count = get_rabbitmq_queue_depth()
|
||||
diagnostics = await get_execution_diagnostics()
|
||||
|
||||
response = ExecutionDiagnosticsResponse(
|
||||
running_executions=running_count,
|
||||
queued_executions_db=queued_db_count,
|
||||
queued_executions_rabbitmq=rabbitmq_count,
|
||||
timestamp=datetime.now(timezone.utc).isoformat(),
|
||||
running_executions=diagnostics.running_count,
|
||||
queued_executions_db=diagnostics.queued_db_count,
|
||||
queued_executions_rabbitmq=diagnostics.rabbitmq_queue_depth,
|
||||
timestamp=diagnostics.timestamp,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Execution diagnostics: running={running_count}, "
|
||||
f"queued_db={queued_db_count}, queued_rabbitmq={rabbitmq_count}"
|
||||
f"Execution diagnostics: running={diagnostics.running_count}, "
|
||||
f"queued_db={diagnostics.queued_db_count}, queued_rabbitmq={diagnostics.rabbitmq_queue_depth}"
|
||||
)
|
||||
|
||||
return response
|
||||
@@ -135,7 +94,7 @@ async def get_execution_diagnostics():
|
||||
response_model=AgentDiagnosticsResponse,
|
||||
summary="Get Agent Diagnostics",
|
||||
)
|
||||
async def get_agent_diagnostics():
|
||||
async def get_agent_diagnostics_endpoint():
|
||||
"""
|
||||
Get diagnostic information about agents.
|
||||
|
||||
@@ -146,18 +105,122 @@ async def get_agent_diagnostics():
|
||||
try:
|
||||
logger.info("Getting agent diagnostics")
|
||||
|
||||
active_executions_count = await get_agents_with_active_executions_count()
|
||||
diagnostics = await get_agent_diagnostics()
|
||||
|
||||
response = AgentDiagnosticsResponse(
|
||||
agents_with_active_executions=active_executions_count,
|
||||
timestamp=datetime.now(timezone.utc).isoformat(),
|
||||
agents_with_active_executions=diagnostics.agents_with_active_executions,
|
||||
timestamp=diagnostics.timestamp,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Agent diagnostics: with_active_executions={active_executions_count}"
|
||||
f"Agent diagnostics: with_active_executions={diagnostics.agents_with_active_executions}"
|
||||
)
|
||||
|
||||
return response
|
||||
except Exception as e:
|
||||
logger.exception(f"Error getting agent diagnostics: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.get(
|
||||
"/diagnostics/executions/running",
|
||||
response_model=RunningExecutionsListResponse,
|
||||
summary="List Running Executions",
|
||||
)
|
||||
async def list_running_executions(
|
||||
limit: int = 100,
|
||||
offset: int = 0,
|
||||
):
|
||||
"""
|
||||
Get detailed list of running and queued executions.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of executions to return (default 100)
|
||||
offset: Number of executions to skip (default 0)
|
||||
|
||||
Returns:
|
||||
List of running executions with details
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Listing running executions (limit={limit}, offset={offset})")
|
||||
|
||||
executions = await get_running_executions_details(limit=limit, offset=offset)
|
||||
|
||||
# Get total count for pagination
|
||||
from backend.data.diagnostics import get_execution_diagnostics as get_diag
|
||||
diagnostics = await get_diag()
|
||||
total = diagnostics.running_count + diagnostics.queued_db_count
|
||||
|
||||
return RunningExecutionsListResponse(
|
||||
executions=executions,
|
||||
total=total
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error listing running executions: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.post(
|
||||
"/diagnostics/executions/stop",
|
||||
response_model=StopExecutionResponse,
|
||||
summary="Stop Single Execution",
|
||||
)
|
||||
async def stop_single_execution(
|
||||
request: StopExecutionRequest,
|
||||
user: AuthUser = Security(requires_admin_user),
|
||||
):
|
||||
"""
|
||||
Stop a single execution (admin only).
|
||||
|
||||
Args:
|
||||
request: Contains execution_id to stop
|
||||
|
||||
Returns:
|
||||
Success status and message
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Admin {user.id} stopping execution {request.execution_id}")
|
||||
|
||||
success = await stop_execution(request.execution_id, user.id)
|
||||
|
||||
return StopExecutionResponse(
|
||||
success=success,
|
||||
stopped_count=1 if success else 0,
|
||||
message="Execution stopped successfully" if success else "Failed to stop execution"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error stopping execution: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.post(
|
||||
"/diagnostics/executions/stop-bulk",
|
||||
response_model=StopExecutionResponse,
|
||||
summary="Stop Multiple Executions",
|
||||
)
|
||||
async def stop_multiple_executions(
|
||||
request: StopExecutionsRequest,
|
||||
user: AuthUser = Security(requires_admin_user),
|
||||
):
|
||||
"""
|
||||
Stop multiple executions (admin only).
|
||||
|
||||
Args:
|
||||
request: Contains list of execution_ids to stop
|
||||
|
||||
Returns:
|
||||
Number of executions stopped and success message
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Admin {user.id} stopping {len(request.execution_ids)} executions")
|
||||
|
||||
stopped_count = await stop_executions_bulk(request.execution_ids, user.id)
|
||||
|
||||
return StopExecutionResponse(
|
||||
success=stopped_count > 0,
|
||||
stopped_count=stopped_count,
|
||||
message=f"Stopped {stopped_count} of {len(request.execution_ids)} executions"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error stopping multiple executions: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@@ -11,6 +11,7 @@ import { Button } from "@/components/ui/button";
|
||||
import { ArrowClockwise } from "@phosphor-icons/react";
|
||||
import { ErrorCard } from "@/components/molecules/ErrorCard/ErrorCard";
|
||||
import { useDiagnosticsContent } from "./useDiagnosticsContent";
|
||||
import { ExecutionsTable } from "./ExecutionsTable";
|
||||
|
||||
export function DiagnosticsContent() {
|
||||
const { executionData, agentData, isLoading, isError, error, refresh } =
|
||||
@@ -215,6 +216,9 @@ export function DiagnosticsContent() {
|
||||
</div>
|
||||
</CardContent>
|
||||
</Card>
|
||||
|
||||
{/* Add Executions Table */}
|
||||
<ExecutionsTable onRefresh={refresh} />
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,400 @@
|
||||
"use client";
|
||||
|
||||
import React, { useState, useEffect } from "react";
|
||||
import {
|
||||
Table,
|
||||
TableBody,
|
||||
TableCell,
|
||||
TableHead,
|
||||
TableHeader,
|
||||
TableRow,
|
||||
} from "@/components/ui/table";
|
||||
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { Checkbox } from "@/components/ui/checkbox";
|
||||
import { toast } from "@/components/ui/use-toast";
|
||||
import {
|
||||
AlertDialog,
|
||||
AlertDialogAction,
|
||||
AlertDialogCancel,
|
||||
AlertDialogContent,
|
||||
AlertDialogDescription,
|
||||
AlertDialogFooter,
|
||||
AlertDialogHeader,
|
||||
AlertDialogTitle,
|
||||
} from "@/components/ui/alert-dialog";
|
||||
import {
|
||||
Stop,
|
||||
StopCircle,
|
||||
ArrowClockwise,
|
||||
CaretLeft,
|
||||
CaretRight,
|
||||
} from "@phosphor-icons/react";
|
||||
import { apiUrl } from "@/lib/autogpt-server-api";
|
||||
|
||||
interface RunningExecutionDetail {
|
||||
execution_id: string;
|
||||
graph_id: string;
|
||||
graph_name: string;
|
||||
graph_version: number;
|
||||
user_id: string;
|
||||
user_email: string | null;
|
||||
status: string;
|
||||
started_at: string | null;
|
||||
queue_status: string | null;
|
||||
}
|
||||
|
||||
interface ExecutionsTableProps {
|
||||
onRefresh?: () => void;
|
||||
}
|
||||
|
||||
export function ExecutionsTable({ onRefresh }: ExecutionsTableProps) {
|
||||
const [executions, setExecutions] = useState<RunningExecutionDetail[]>([]);
|
||||
const [selectedIds, setSelectedIds] = useState<Set<string>>(new Set());
|
||||
const [isLoading, setIsLoading] = useState(false);
|
||||
const [isStopping, setIsStopping] = useState(false);
|
||||
const [showStopDialog, setShowStopDialog] = useState(false);
|
||||
const [stopTarget, setStopTarget] = useState<
|
||||
"single" | "selected" | "all"
|
||||
>("single");
|
||||
const [singleStopId, setSingleStopId] = useState<string | null>(null);
|
||||
const [total, setTotal] = useState(0);
|
||||
const [currentPage, setCurrentPage] = useState(1);
|
||||
const [pageSize] = useState(10);
|
||||
|
||||
const fetchExecutions = async () => {
|
||||
setIsLoading(true);
|
||||
try {
|
||||
const offset = (currentPage - 1) * pageSize;
|
||||
const response = await fetch(
|
||||
`${apiUrl}/admin/diagnostics/executions/running?limit=${pageSize}&offset=${offset}`,
|
||||
{
|
||||
credentials: "include",
|
||||
}
|
||||
);
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error("Failed to fetch executions");
|
||||
}
|
||||
|
||||
const data = await response.json();
|
||||
setExecutions(data.executions || []);
|
||||
setTotal(data.total || 0);
|
||||
} catch (error) {
|
||||
console.error("Error fetching executions:", error);
|
||||
toast({
|
||||
title: "Error",
|
||||
description: "Failed to fetch running executions",
|
||||
variant: "destructive",
|
||||
});
|
||||
} finally {
|
||||
setIsLoading(false);
|
||||
}
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
fetchExecutions();
|
||||
}, [currentPage]);
|
||||
|
||||
const handleSelectAll = (checked: boolean) => {
|
||||
if (checked) {
|
||||
setSelectedIds(new Set(executions.map((e) => e.execution_id)));
|
||||
} else {
|
||||
setSelectedIds(new Set());
|
||||
}
|
||||
};
|
||||
|
||||
const handleSelectExecution = (id: string, checked: boolean) => {
|
||||
const newSelected = new Set(selectedIds);
|
||||
if (checked) {
|
||||
newSelected.add(id);
|
||||
} else {
|
||||
newSelected.delete(id);
|
||||
}
|
||||
setSelectedIds(newSelected);
|
||||
};
|
||||
|
||||
const confirmStop = (
|
||||
target: "single" | "selected" | "all",
|
||||
singleId?: string
|
||||
) => {
|
||||
setStopTarget(target);
|
||||
setSingleStopId(singleId || null);
|
||||
setShowStopDialog(true);
|
||||
};
|
||||
|
||||
const handleStop = async () => {
|
||||
setShowStopDialog(false);
|
||||
setIsStopping(true);
|
||||
|
||||
let endpoint: string;
|
||||
let body: any;
|
||||
|
||||
if (stopTarget === "single" && singleStopId) {
|
||||
endpoint = `${apiUrl}/admin/diagnostics/executions/stop`;
|
||||
body = { execution_id: singleStopId };
|
||||
} else {
|
||||
let idsToStop: string[] = [];
|
||||
if (stopTarget === "selected") {
|
||||
idsToStop = Array.from(selectedIds);
|
||||
} else if (stopTarget === "all") {
|
||||
idsToStop = executions.map((e) => e.execution_id);
|
||||
}
|
||||
|
||||
endpoint = `${apiUrl}/admin/diagnostics/executions/stop-bulk`;
|
||||
body = { execution_ids: idsToStop };
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(endpoint, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
credentials: "include",
|
||||
body: JSON.stringify(body),
|
||||
});
|
||||
|
||||
const data = await response.json();
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(data.detail || "Failed to stop executions");
|
||||
}
|
||||
|
||||
toast({
|
||||
title: "Success",
|
||||
description: data.message || "Executions stopped successfully",
|
||||
});
|
||||
|
||||
// Clear selections and refresh
|
||||
setSelectedIds(new Set());
|
||||
await fetchExecutions();
|
||||
if (onRefresh) {
|
||||
onRefresh();
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error("Error stopping executions:", error);
|
||||
toast({
|
||||
title: "Error",
|
||||
description: error.message || "Failed to stop executions",
|
||||
variant: "destructive",
|
||||
});
|
||||
} finally {
|
||||
setIsStopping(false);
|
||||
}
|
||||
};
|
||||
|
||||
const totalPages = Math.ceil(total / pageSize);
|
||||
|
||||
return (
|
||||
<>
|
||||
<Card>
|
||||
<CardHeader>
|
||||
<div className="flex items-center justify-between">
|
||||
<CardTitle>Running Executions</CardTitle>
|
||||
<div className="flex gap-2">
|
||||
{selectedIds.size > 0 && (
|
||||
<Button
|
||||
variant="destructive"
|
||||
size="sm"
|
||||
onClick={() => confirmStop("selected")}
|
||||
disabled={isStopping}
|
||||
>
|
||||
<StopCircle className="mr-2 h-4 w-4" />
|
||||
Stop Selected ({selectedIds.size})
|
||||
</Button>
|
||||
)}
|
||||
<Button
|
||||
variant="destructive"
|
||||
size="sm"
|
||||
onClick={() => confirmStop("all")}
|
||||
disabled={isStopping || executions.length === 0}
|
||||
>
|
||||
<StopCircle className="mr-2 h-4 w-4" />
|
||||
Stop All
|
||||
</Button>
|
||||
<Button
|
||||
variant="outline"
|
||||
size="sm"
|
||||
onClick={() => {
|
||||
fetchExecutions();
|
||||
if (onRefresh) onRefresh();
|
||||
}}
|
||||
disabled={isLoading}
|
||||
>
|
||||
<ArrowClockwise
|
||||
className={`h-4 w-4 ${isLoading ? "animate-spin" : ""}`}
|
||||
/>
|
||||
</Button>
|
||||
</div>
|
||||
</div>
|
||||
</CardHeader>
|
||||
<CardContent>
|
||||
{isLoading && executions.length === 0 ? (
|
||||
<div className="flex h-32 items-center justify-center">
|
||||
<ArrowClockwise className="h-6 w-6 animate-spin text-gray-400" />
|
||||
</div>
|
||||
) : executions.length === 0 ? (
|
||||
<div className="py-8 text-center text-gray-500">
|
||||
No running executions
|
||||
</div>
|
||||
) : (
|
||||
<>
|
||||
<Table>
|
||||
<TableHeader>
|
||||
<TableRow>
|
||||
<TableHead className="w-12">
|
||||
<Checkbox
|
||||
checked={
|
||||
selectedIds.size === executions.length &&
|
||||
executions.length > 0
|
||||
}
|
||||
onCheckedChange={handleSelectAll}
|
||||
/>
|
||||
</TableHead>
|
||||
<TableHead>Execution ID</TableHead>
|
||||
<TableHead>Agent Name</TableHead>
|
||||
<TableHead>Version</TableHead>
|
||||
<TableHead>User</TableHead>
|
||||
<TableHead>Status</TableHead>
|
||||
<TableHead>Started At</TableHead>
|
||||
<TableHead className="w-20">Actions</TableHead>
|
||||
</TableRow>
|
||||
</TableHeader>
|
||||
<TableBody>
|
||||
{executions.map((execution) => (
|
||||
<TableRow key={execution.execution_id}>
|
||||
<TableCell>
|
||||
<Checkbox
|
||||
checked={selectedIds.has(execution.execution_id)}
|
||||
onCheckedChange={(checked) =>
|
||||
handleSelectExecution(
|
||||
execution.execution_id,
|
||||
checked as boolean
|
||||
)
|
||||
}
|
||||
/>
|
||||
</TableCell>
|
||||
<TableCell className="font-mono text-xs">
|
||||
{execution.execution_id.substring(0, 8)}...
|
||||
</TableCell>
|
||||
<TableCell>{execution.graph_name}</TableCell>
|
||||
<TableCell>{execution.graph_version}</TableCell>
|
||||
<TableCell>
|
||||
<div>
|
||||
{execution.user_email || (
|
||||
<span className="text-gray-400">Unknown</span>
|
||||
)}
|
||||
</div>
|
||||
<div className="font-mono text-xs text-gray-500">
|
||||
{execution.user_id.substring(0, 8)}...
|
||||
</div>
|
||||
</TableCell>
|
||||
<TableCell>
|
||||
<span
|
||||
className={`inline-flex rounded-full px-2 py-1 text-xs font-semibold ${
|
||||
execution.status === "RUNNING"
|
||||
? "bg-green-100 text-green-800"
|
||||
: "bg-yellow-100 text-yellow-800"
|
||||
}`}
|
||||
>
|
||||
{execution.status}
|
||||
</span>
|
||||
</TableCell>
|
||||
<TableCell>
|
||||
{execution.started_at
|
||||
? new Date(execution.started_at).toLocaleString()
|
||||
: "-"}
|
||||
</TableCell>
|
||||
<TableCell>
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="sm"
|
||||
onClick={() => confirmStop("single", execution.execution_id)}
|
||||
disabled={isStopping}
|
||||
>
|
||||
<Stop className="h-4 w-4" />
|
||||
</Button>
|
||||
</TableCell>
|
||||
</TableRow>
|
||||
))}
|
||||
</TableBody>
|
||||
</Table>
|
||||
|
||||
{totalPages > 1 && (
|
||||
<div className="mt-4 flex items-center justify-between">
|
||||
<div className="text-sm text-gray-600">
|
||||
Showing {(currentPage - 1) * pageSize + 1} to{" "}
|
||||
{Math.min(currentPage * pageSize, total)} of {total}{" "}
|
||||
executions
|
||||
</div>
|
||||
<div className="flex gap-2">
|
||||
<Button
|
||||
variant="outline"
|
||||
size="sm"
|
||||
onClick={() => setCurrentPage(currentPage - 1)}
|
||||
disabled={currentPage === 1}
|
||||
>
|
||||
<CaretLeft className="h-4 w-4" />
|
||||
Previous
|
||||
</Button>
|
||||
<div className="flex items-center px-3">
|
||||
Page {currentPage} of {totalPages}
|
||||
</div>
|
||||
<Button
|
||||
variant="outline"
|
||||
size="sm"
|
||||
onClick={() => setCurrentPage(currentPage + 1)}
|
||||
disabled={currentPage === totalPages}
|
||||
>
|
||||
Next
|
||||
<CaretRight className="h-4 w-4" />
|
||||
</Button>
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
</>
|
||||
)}
|
||||
</CardContent>
|
||||
</Card>
|
||||
|
||||
<AlertDialog open={showStopDialog} onOpenChange={setShowStopDialog}>
|
||||
<AlertDialogContent>
|
||||
<AlertDialogHeader>
|
||||
<AlertDialogTitle>Confirm Stop Executions</AlertDialogTitle>
|
||||
<AlertDialogDescription>
|
||||
{stopTarget === "single" && (
|
||||
<>Are you sure you want to stop this execution?</>
|
||||
)}
|
||||
{stopTarget === "selected" && (
|
||||
<>
|
||||
Are you sure you want to stop {selectedIds.size} selected
|
||||
execution(s)?
|
||||
</>
|
||||
)}
|
||||
{stopTarget === "all" && (
|
||||
<>
|
||||
Are you sure you want to stop ALL {executions.length} running
|
||||
executions?
|
||||
</>
|
||||
)}
|
||||
<br />
|
||||
<br />
|
||||
This action cannot be undone. The executions will be marked as
|
||||
FAILED.
|
||||
</AlertDialogDescription>
|
||||
</AlertDialogHeader>
|
||||
<AlertDialogFooter>
|
||||
<AlertDialogCancel>Cancel</AlertDialogCancel>
|
||||
<AlertDialogAction
|
||||
onClick={handleStop}
|
||||
className="bg-red-600 hover:bg-red-700"
|
||||
>
|
||||
Stop Executions
|
||||
</AlertDialogAction>
|
||||
</AlertDialogFooter>
|
||||
</AlertDialogContent>
|
||||
</AlertDialog>
|
||||
</>
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user