Mirror of https://github.com/Significant-Gravitas/AutoGPT.git (synced 2026-02-08 22:05:08 -05:00)
Compare commits (23 commits): fix/execut... → claude/adm...

Commit SHA1s:
81a8e6f558, 265295606b, 6bf1ef6c0d, 91ee306b0e, 2e16ef2272, 98cb639ab3,
d0102f4e1f, 4950da2092, 96e40daf80, f7b332a435, 4e6fd3f68f, 43dedd8c42,
c1c371bcf3, 6a72440005, 1403c8f2de, 6068ed3516, 53a6de9fdb, cdd501c031,
86b9ccfe5e, cc1a2cd829, 3a8cbe3eb4, 10c3b80cbd, 632085528a
autogpt_platform/backend/backend/data/diagnostics.py (new file, 1286 lines)
File diff suppressed because it is too large.
@@ -27,6 +27,7 @@ from prisma.models import (
    AgentNodeExecutionKeyValueData,
)
from prisma.types import (
    AgentGraphExecutionOrderByInput,
    AgentGraphExecutionUpdateManyMutationInput,
    AgentGraphExecutionWhereInput,
    AgentNodeExecutionCreateInput,
@@ -459,20 +460,39 @@ class NodeExecutionResult(BaseModel):

async def get_graph_executions(
    graph_exec_id: Optional[str] = None,
    execution_ids: Optional[list[str]] = None,
    graph_id: Optional[str] = None,
    graph_version: Optional[int] = None,
    user_id: Optional[str] = None,
    statuses: Optional[list[ExecutionStatus]] = None,
    created_time_gte: Optional[datetime] = None,
    created_time_lte: Optional[datetime] = None,
    started_time_gte: Optional[datetime] = None,
    started_time_lte: Optional[datetime] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    order_by: Literal["createdAt", "startedAt", "updatedAt"] = "createdAt",
    order_direction: Literal["asc", "desc"] = "desc",
) -> list[GraphExecutionMeta]:
    """⚠️ **Optional `user_id` check**: MUST USE check in user-facing endpoints."""
    """
    Get graph executions with optional filters and ordering.

    ⚠️ **Optional `user_id` check**: MUST USE check in user-facing endpoints.

    Args:
        graph_exec_id: Filter by single execution ID (mutually exclusive with execution_ids)
        execution_ids: Filter by list of execution IDs (mutually exclusive with graph_exec_id)
        order_by: Field to order by. Defaults to "createdAt"
        order_direction: Sort direction. Defaults to "desc"
    """
    where_filter: AgentGraphExecutionWhereInput = {
        "isDeleted": False,
    }
    if graph_exec_id:
        where_filter["id"] = graph_exec_id
    elif execution_ids:
        where_filter["id"] = {"in": execution_ids}

    if user_id:
        where_filter["userId"] = user_id
    if graph_id:
@@ -484,13 +504,36 @@ async def get_graph_executions(
            "gte": created_time_gte or datetime.min.replace(tzinfo=timezone.utc),
            "lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc),
        }
    if started_time_gte or started_time_lte:
        where_filter["startedAt"] = {
            "gte": started_time_gte or datetime.min.replace(tzinfo=timezone.utc),
            "lte": started_time_lte or datetime.max.replace(tzinfo=timezone.utc),
        }
    if statuses:
        where_filter["OR"] = [{"executionStatus": status} for status in statuses]

    # Build properly typed order clause
    # Prisma wants specific typed dicts for each field, so we construct them explicitly
    order_clause: AgentGraphExecutionOrderByInput
    match (order_by):
        case "startedAt":
            order_clause = {
                "startedAt": order_direction,
            }
        case "updatedAt":
            order_clause = {
                "updatedAt": order_direction,
            }
        case _:
            order_clause = {
                "createdAt": order_direction,
            }

    executions = await AgentGraphExecution.prisma().find_many(
        where=where_filter,
        order={"createdAt": "desc"},
        order=order_clause,
        take=limit,
        skip=offset,
    )
    return [GraphExecutionMeta.from_db(execution) for execution in executions]
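The new ID filters and `order_by` / `order_direction` parameters are what the admin diagnostics routes later in this diff rely on. A minimal usage sketch, assuming the same imports used by the tests below (the helper name and the specific filter values are illustrative, not part of this diff):

```python
from backend.data.execution import ExecutionStatus, get_graph_executions


async def oldest_queued_for_user(user_id: str):
    # Fetch the 20 oldest QUEUED runs for one user, oldest first.
    return await get_graph_executions(
        user_id=user_id,                    # always pass this in user-facing code paths
        statuses=[ExecutionStatus.QUEUED],  # expands to an OR filter on executionStatus
        order_by="createdAt",
        order_direction="asc",
        limit=20,
    )
```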
@@ -501,6 +544,10 @@ async def get_graph_executions_count(
    statuses: Optional[list[ExecutionStatus]] = None,
    created_time_gte: Optional[datetime] = None,
    created_time_lte: Optional[datetime] = None,
    started_time_gte: Optional[datetime] = None,
    started_time_lte: Optional[datetime] = None,
    updated_time_gte: Optional[datetime] = None,
    updated_time_lte: Optional[datetime] = None,
) -> int:
    """
    Get count of graph executions with optional filters.
@@ -511,6 +558,10 @@ async def get_graph_executions_count(
        statuses: Optional list of execution statuses to filter by
        created_time_gte: Optional minimum creation time
        created_time_lte: Optional maximum creation time
        started_time_gte: Optional minimum start time (when execution started running)
        started_time_lte: Optional maximum start time (when execution started running)
        updated_time_gte: Optional minimum update time
        updated_time_lte: Optional maximum update time

    Returns:
        Count of matching graph executions
@@ -530,6 +581,19 @@ async def get_graph_executions_count(
            "gte": created_time_gte or datetime.min.replace(tzinfo=timezone.utc),
            "lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc),
        }

    if started_time_gte or started_time_lte:
        where_filter["startedAt"] = {
            "gte": started_time_gte or datetime.min.replace(tzinfo=timezone.utc),
            "lte": started_time_lte or datetime.max.replace(tzinfo=timezone.utc),
        }

    if updated_time_gte or updated_time_lte:
        where_filter["updatedAt"] = {
            "gte": updated_time_gte or datetime.min.replace(tzinfo=timezone.utc),
            "lte": updated_time_lte or datetime.max.replace(tzinfo=timezone.utc),
        }

    if statuses:
        where_filter["OR"] = [{"executionStatus": status} for status in statuses]

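The count helper combines with the new time-window filters in the same way. A short sketch, assuming the one-hour window as an example (the window and helper name are not from this diff):

```python
from datetime import datetime, timedelta, timezone

from backend.data.execution import ExecutionStatus, get_graph_executions_count


async def queued_created_in_last_hour() -> int:
    # Count executions created in the last hour that are still QUEUED.
    one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
    return await get_graph_executions_count(
        statuses=[ExecutionStatus.QUEUED],
        created_time_gte=one_hour_ago,
    )
```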
@@ -9,6 +9,7 @@ from backend.data.execution import (
    get_block_error_stats,
    get_child_graph_executions,
    get_execution_kv_data,
    get_graph_execution,
    get_graph_execution_meta,
    get_graph_executions,
    get_graph_executions_count,
@@ -125,6 +126,7 @@ class DatabaseManager(AppService):
    get_child_graph_executions = _(get_child_graph_executions)
    get_graph_executions = _(get_graph_executions)
    get_graph_executions_count = _(get_graph_executions_count)
    get_graph_execution = _(get_graph_execution)
    get_graph_execution_meta = _(get_graph_execution_meta)
    create_graph_execution = _(create_graph_execution)
    get_node_execution = _(get_node_execution)
@@ -198,6 +200,7 @@ class DatabaseManagerClient(AppServiceClient):
    # Executions
    get_graph_executions = _(d.get_graph_executions)
    get_graph_executions_count = _(d.get_graph_executions_count)
    get_graph_execution = _(d.get_graph_execution)
    get_graph_execution_meta = _(d.get_graph_execution_meta)
    get_node_executions = _(d.get_node_executions)
    update_node_execution_status = _(d.update_node_execution_status)
@@ -241,6 +244,7 @@ class DatabaseManagerAsyncClient(AppServiceClient):
    get_latest_node_execution = d.get_latest_node_execution
    get_graph = d.get_graph
    get_graph_metadata = d.get_graph_metadata
    get_graph_execution = d.get_graph_execution
    get_graph_execution_meta = d.get_graph_execution_meta
    get_node = d.get_node
    get_node_execution = d.get_node_execution

@@ -764,10 +764,15 @@ async def add_graph_execution(
    nodes_input_masks: Optional[NodesInputMasks] = None,
    parent_graph_exec_id: Optional[str] = None,
    is_sub_graph: bool = False,
    graph_exec_id: Optional[str] = None,
) -> GraphExecutionWithNodes:
    """
    Adds a graph execution to the queue and returns the execution entry.

    Supports two modes:
    1. CREATE mode (graph_exec_id=None): Validates, creates new DB entry, and queues
    2. REQUEUE mode (graph_exec_id provided): Fetches existing execution and re-queues it

    Args:
        graph_id: The ID of the graph to execute.
        user_id: The ID of the user executing the graph.
@@ -779,8 +784,9 @@ async def add_graph_execution(
        nodes_input_masks: Node inputs to use in the execution.
        parent_graph_exec_id: The ID of the parent graph execution (for nested executions).
        is_sub_graph: Whether this is a sub-graph execution.
        graph_exec_id: Optional existing execution ID to requeue (skips creation).
    Returns:
        GraphExecutionEntry: The entry for the graph execution.
        GraphExecutionWithNodes: The execution entry.
    Raises:
        ValueError: If the graph is not found or if there are validation errors.
    """
@@ -789,44 +795,80 @@ async def add_graph_execution(
    else:
        edb = get_database_manager_async_client()

    graph, starting_nodes_input, compiled_nodes_input_masks = (
        await validate_and_construct_node_execution_input(
            graph_id=graph_id,
            user_id=user_id,
            graph_inputs=inputs or {},
            graph_version=graph_version,
            graph_credentials_inputs=graph_credentials_inputs,
            nodes_input_masks=nodes_input_masks,
            is_sub_graph=is_sub_graph,
        )
    )
    graph_exec = None
    compiled_nodes_input_masks = None

    try:
        # Sanity check: running add_graph_execution with the properties of
        # the graph_exec created here should create the same execution again.
        graph_exec = await edb.create_graph_execution(
            user_id=user_id,
            graph_id=graph_id,
            graph_version=graph.version,
            inputs=inputs or {},
            credential_inputs=graph_credentials_inputs,
            nodes_input_masks=nodes_input_masks,
            starting_nodes_input=starting_nodes_input,
            preset_id=preset_id,
            parent_graph_exec_id=parent_graph_exec_id,
        )
        if graph_exec_id:
            # REQUEUE mode: Fetch existing execution instead of creating new one
            logger.info(f"Requeueing existing execution {graph_exec_id}")

            graph_exec_meta = await edb.get_graph_execution_meta(
                user_id=user_id,
                execution_id=graph_exec_id,
            )

            if not graph_exec_meta:
                raise ValueError(f"Execution {graph_exec_id} not found")

            if graph_exec_meta.status != ExecutionStatus.QUEUED:
                raise ValueError(
                    f"Can only requeue QUEUED executions, got {graph_exec_meta.status}"
                )

            # Fetch full execution with nodes for publishing
            graph_exec = await edb.get_graph_execution(
                user_id=user_id,
                execution_id=graph_exec_id,
                include_node_executions=True,
            )

            if not graph_exec:
                raise ValueError(f"Execution {graph_exec_id} not found")

            # Use existing execution's parameters
            compiled_nodes_input_masks = graph_exec.nodes_input_masks

        else:
            # CREATE mode: Validate and create new execution
            graph, starting_nodes_input, compiled_nodes_input_masks = (
                await validate_and_construct_node_execution_input(
                    graph_id=graph_id,
                    user_id=user_id,
                    graph_inputs=inputs or {},
                    graph_version=graph_version,
                    graph_credentials_inputs=graph_credentials_inputs,
                    nodes_input_masks=nodes_input_masks,
                    is_sub_graph=is_sub_graph,
                )
            )

            # Sanity check: running add_graph_execution with the properties of
            # the graph_exec created here should create the same execution again.
            graph_exec = await edb.create_graph_execution(
                user_id=user_id,
                graph_id=graph_id,
                graph_version=graph.version,
                inputs=inputs or {},
                credential_inputs=graph_credentials_inputs,
                nodes_input_masks=nodes_input_masks,
                starting_nodes_input=starting_nodes_input,
                preset_id=preset_id,
                parent_graph_exec_id=parent_graph_exec_id,
            )

            logger.info(
                f"Created graph execution #{graph_exec.id} for graph "
                f"#{graph_id} with {len(starting_nodes_input)} starting nodes. "
                f"Now publishing to execution queue."
            )

        # Common: Publish to execution queue (works for both create and requeue)
        graph_exec_entry = graph_exec.to_graph_execution_entry(
            user_context=await get_user_context(user_id),
            compiled_nodes_input_masks=compiled_nodes_input_masks,
            parent_graph_exec_id=parent_graph_exec_id,
        )
        logger.info(
            f"Created graph execution #{graph_exec.id} for graph "
            f"#{graph_id} with {len(starting_nodes_input)} starting nodes. "
            f"Now publishing to execution queue."
        )

        exec_queue = await get_async_execution_queue()
        await exec_queue.publish_message(
@@ -836,6 +878,7 @@ async def add_graph_execution(
        )
        logger.info(f"Published execution {graph_exec.id} to RabbitMQ queue")

        # Common: Update status to QUEUED
        graph_exec.status = ExecutionStatus.QUEUED
        await edb.update_graph_execution_stats(
            graph_exec_id=graph_exec.id,
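The two modes described in the docstring come down to whether the caller passes `graph_exec_id`. A hedged sketch of both call shapes, using only keyword arguments that appear in this diff and the tests below (the ID values are placeholders):

```python
from backend.executor.utils import add_graph_execution


async def example_calls():
    # CREATE mode: no graph_exec_id -> validate inputs, create a new DB entry, queue it.
    new_exec = await add_graph_execution(
        graph_id="graph-456",
        user_id="user-789",
        graph_version=1,
    )

    # REQUEUE mode: pass an existing execution ID; it must currently be QUEUED,
    # otherwise ValueError("Can only requeue QUEUED executions, ...") is raised.
    await add_graph_execution(
        graph_id="graph-456",
        user_id="user-789",
        graph_version=1,
        graph_exec_id="existing-exec-123",  # e.g. a stuck QUEUED run
    )
    return new_exec
```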
@@ -451,3 +451,144 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
    # Both executions should succeed (though they create different objects)
    assert result1 == mock_graph_exec
    assert result2 == mock_graph_exec_2


@pytest.mark.asyncio
async def test_add_graph_execution_requeue_mode(mocker: MockerFixture):
    """Test that add_graph_execution with graph_exec_id requeues instead of creating"""
    from backend.data.execution import ExecutionStatus, GraphExecutionWithNodes
    from backend.executor.utils import add_graph_execution

    existing_exec_id = "existing-exec-123"
    graph_id = "graph-456"
    user_id = "user-789"
    graph_version = 1

    # Mock existing execution (QUEUED, ready to requeue)
    mock_existing_exec_meta = mocker.MagicMock()
    mock_existing_exec_meta.id = existing_exec_id
    mock_existing_exec_meta.user_id = user_id
    mock_existing_exec_meta.graph_id = graph_id
    mock_existing_exec_meta.graph_version = graph_version
    mock_existing_exec_meta.status = ExecutionStatus.QUEUED

    mock_existing_exec_full = mocker.MagicMock(spec=GraphExecutionWithNodes)
    mock_existing_exec_full.id = existing_exec_id
    mock_existing_exec_full.user_id = user_id
    mock_existing_exec_full.graph_id = graph_id
    mock_existing_exec_full.graph_version = graph_version
    mock_existing_exec_full.status = ExecutionStatus.QUEUED
    mock_existing_exec_full.nodes_input_masks = {"node1": {"input1": "value1"}}
    mock_existing_exec_full.node_executions = []
    mock_existing_exec_full.to_graph_execution_entry.return_value = mocker.MagicMock()
    mock_existing_exec_full.to_graph_execution_entry.return_value.model_dump_json.return_value = (
        "{}"
    )

    # Mock database manager
    mock_edb = mocker.patch("backend.executor.utils.execution_db")
    mock_edb.get_graph_execution_meta = mocker.AsyncMock(
        return_value=mock_existing_exec_meta
    )
    mock_edb.get_graph_execution = mocker.AsyncMock(
        return_value=mock_existing_exec_full
    )
    mock_edb.create_graph_execution = mocker.AsyncMock()  # Should NOT be called
    mock_edb.update_graph_execution_stats = mocker.AsyncMock()

    # Mock prisma
    mock_prisma = mocker.patch("backend.executor.utils.prisma")
    mock_prisma.is_connected.return_value = True

    # Mock queue
    mock_queue = mocker.AsyncMock()
    mock_queue.publish_message = mocker.AsyncMock()

    # Mock event bus
    mock_event_bus = mocker.MagicMock()
    mock_event_bus.publish = mocker.AsyncMock()

    # Mock user context
    mock_user_context = mocker.MagicMock()

    mocker.patch(
        "backend.executor.utils.get_async_execution_queue", return_value=mock_queue
    )
    mocker.patch(
        "backend.executor.utils.get_async_execution_event_bus",
        return_value=mock_event_bus,
    )
    mocker.patch(
        "backend.executor.utils.get_user_context", return_value=mock_user_context
    )

    # Call add_graph_execution in REQUEUE mode
    await add_graph_execution(
        graph_id=graph_id,
        user_id=user_id,
        graph_version=graph_version,
        graph_exec_id=existing_exec_id,  # This triggers REQUEUE mode
    )

    # Verify: Should NOT create new execution
    mock_edb.create_graph_execution.assert_not_called()

    # Verify: Should fetch existing execution
    mock_edb.get_graph_execution_meta.assert_called_once()
    mock_edb.get_graph_execution.assert_called_once()

    # Verify: Should publish to queue (same as create mode)
    mock_queue.publish_message.assert_called_once()

    # Verify: Should update status to QUEUED
    mock_edb.update_graph_execution_stats.assert_called_once()


@pytest.mark.asyncio
async def test_add_graph_execution_requeue_fails_if_not_queued(mocker: MockerFixture):
    """Test that requeue mode fails if execution is not in QUEUED status"""
    from backend.data.execution import ExecutionStatus
    from backend.executor.utils import add_graph_execution

    # Mock execution that's RUNNING (not QUEUED)
    mock_exec_meta = mocker.MagicMock()
    mock_exec_meta.id = "exec-running-123"
    mock_exec_meta.user_id = "user-123"
    mock_exec_meta.graph_id = "graph-456"
    mock_exec_meta.graph_version = 1
    mock_exec_meta.status = ExecutionStatus.RUNNING  # Wrong status!

    mock_edb = mocker.patch("backend.executor.utils.execution_db")
    mock_edb.get_graph_execution_meta = mocker.AsyncMock(return_value=mock_exec_meta)

    mock_prisma = mocker.patch("backend.executor.utils.prisma")
    mock_prisma.is_connected.return_value = True

    # Should raise ValueError
    with pytest.raises(ValueError, match="Can only requeue QUEUED executions"):
        await add_graph_execution(
            graph_id="graph-456",
            user_id="user-123",
            graph_exec_id="exec-running-123",  # Requeue mode
        )


@pytest.mark.asyncio
async def test_add_graph_execution_requeue_fails_if_not_found(mocker: MockerFixture):
    """Test that requeue mode fails if execution doesn't exist"""
    from backend.executor.utils import add_graph_execution

    # Mock execution not found
    mock_edb = mocker.patch("backend.executor.utils.execution_db")
    mock_edb.get_graph_execution_meta = mocker.AsyncMock(return_value=None)

    mock_prisma = mocker.patch("backend.executor.utils.prisma")
    mock_prisma.is_connected.return_value = True

    # Should raise ValueError
    with pytest.raises(ValueError, match="Execution .* not found"):
        await add_graph_execution(
            graph_id="graph-456",
            user_id="user-123",
            graph_exec_id="nonexistent-exec",  # Requeue mode
        )

@@ -24,6 +24,7 @@ import backend.integrations.webhooks.utils
import backend.server.routers.postmark.postmark
import backend.server.routers.v1
import backend.server.v2.admin.credit_admin_routes
import backend.server.v2.admin.diagnostics_admin_routes
import backend.server.v2.admin.execution_analytics_routes
import backend.server.v2.admin.store_admin_routes
import backend.server.v2.builder
@@ -269,6 +270,11 @@ app.include_router(
    tags=["v2", "admin"],
    prefix="/api/credits",
)
app.include_router(
    backend.server.v2.admin.diagnostics_admin_routes.router,
    tags=["v2", "admin"],
    prefix="/api",
)
app.include_router(
    backend.server.v2.admin.execution_analytics_routes.router,
    tags=["v2", "admin"],
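Because the new router (defined in the file below) carries `prefix="/admin"` and is mounted here with `prefix="/api"`, the diagnostics endpoints end up under `/api/admin/diagnostics/...`. A minimal sketch of how the prefixes compose, modelled on the TestClient setup used by the route tests at the end of this diff (illustration only; the real deployment still enforces the admin auth dependency):

```python
import fastapi
import fastapi.testclient

import backend.server.v2.admin.diagnostics_admin_routes as diagnostics_admin_routes

app = fastapi.FastAPI()
app.include_router(
    diagnostics_admin_routes.router,  # router already carries prefix="/admin"
    prefix="/api",
)
client = fastapi.testclient.TestClient(app)

# The summary endpoint is therefore reachable at /api/admin/diagnostics/executions
# (expect an auth error here unless the admin dependency is overridden, as in the tests).
resp = client.get("/api/admin/diagnostics/executions")
```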
@@ -0,0 +1,926 @@
import asyncio
import logging
from typing import List

from autogpt_libs.auth import requires_admin_user
from autogpt_libs.auth.models import User as AuthUser
from fastapi import APIRouter, HTTPException, Security
from prisma.enums import AgentExecutionStatus
from pydantic import BaseModel

from backend.data.diagnostics import (
    FailedExecutionDetail,
    OrphanedScheduleDetail,
    RunningExecutionDetail,
    ScheduleDetail,
    ScheduleHealthMetrics,
    cleanup_all_stuck_queued_executions,
    cleanup_orphaned_executions_bulk,
    cleanup_orphaned_schedules_bulk,
    get_agent_diagnostics,
    get_all_orphaned_execution_ids,
    get_all_schedules_details,
    get_all_stuck_queued_execution_ids,
    get_execution_diagnostics,
    get_failed_executions_count,
    get_failed_executions_details,
    get_invalid_executions_details,
    get_long_running_executions_details,
    get_orphaned_executions_details,
    get_orphaned_schedules_details,
    get_running_executions_details,
    get_schedule_health_metrics,
    get_stuck_queued_executions_details,
    stop_all_long_running_executions,
)
from backend.data.execution import get_graph_executions
from backend.executor.utils import add_graph_execution, stop_graph_execution
from backend.server.v2.admin.model import (
    AgentDiagnosticsResponse,
    ExecutionDiagnosticsResponse,
)

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/admin",
    tags=["diagnostics", "admin"],
    dependencies=[Security(requires_admin_user)],
)


class RunningExecutionsListResponse(BaseModel):
    """Response model for list of running executions"""

    executions: List[RunningExecutionDetail]
    total: int


class FailedExecutionsListResponse(BaseModel):
    """Response model for list of failed executions"""

    executions: List[FailedExecutionDetail]
    total: int


class StopExecutionRequest(BaseModel):
    """Request model for stopping a single execution"""

    execution_id: str


class StopExecutionsRequest(BaseModel):
    """Request model for stopping multiple executions"""

    execution_ids: List[str]


class StopExecutionResponse(BaseModel):
    """Response model for stop execution operations"""

    success: bool
    stopped_count: int = 0
    message: str


class RequeueExecutionResponse(BaseModel):
    """Response model for requeue execution operations"""

    success: bool
    requeued_count: int = 0
    message: str


@router.get(
    "/diagnostics/executions",
    response_model=ExecutionDiagnosticsResponse,
    summary="Get Execution Diagnostics",
)
async def get_execution_diagnostics_endpoint():
    """
    Get comprehensive diagnostic information about execution status.

    Returns all execution metrics including:
    - Current state (running, queued)
    - Orphaned executions (>24h old, likely not in executor)
    - Failure metrics (1h, 24h, rate)
    - Long-running detection (stuck >1h, >24h)
    - Stuck queued detection
    - Throughput metrics (completions/hour)
    - RabbitMQ queue depths
    """
    logger.info("Getting execution diagnostics")

    diagnostics = await get_execution_diagnostics()

    response = ExecutionDiagnosticsResponse(
        running_executions=diagnostics.running_count,
        queued_executions_db=diagnostics.queued_db_count,
        queued_executions_rabbitmq=diagnostics.rabbitmq_queue_depth,
        cancel_queue_depth=diagnostics.cancel_queue_depth,
        orphaned_running=diagnostics.orphaned_running,
        orphaned_queued=diagnostics.orphaned_queued,
        failed_count_1h=diagnostics.failed_count_1h,
        failed_count_24h=diagnostics.failed_count_24h,
        failure_rate_24h=diagnostics.failure_rate_24h,
        stuck_running_24h=diagnostics.stuck_running_24h,
        stuck_running_1h=diagnostics.stuck_running_1h,
        oldest_running_hours=diagnostics.oldest_running_hours,
        stuck_queued_1h=diagnostics.stuck_queued_1h,
        queued_never_started=diagnostics.queued_never_started,
        invalid_queued_with_start=diagnostics.invalid_queued_with_start,
        invalid_running_without_start=diagnostics.invalid_running_without_start,
        completed_1h=diagnostics.completed_1h,
        completed_24h=diagnostics.completed_24h,
        throughput_per_hour=diagnostics.throughput_per_hour,
        timestamp=diagnostics.timestamp,
    )

    logger.info(
        f"Execution diagnostics: running={diagnostics.running_count}, "
        f"queued_db={diagnostics.queued_db_count}, "
        f"orphaned={diagnostics.orphaned_running + diagnostics.orphaned_queued}, "
        f"failed_24h={diagnostics.failed_count_24h}"
    )

    return response

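For operators consuming this endpoint, the response is a flat set of counters. A small hedged sketch of turning it into a go/no-go signal; the thresholds are invented for illustration and are not part of the API, only the field names come from the response model above:

```python
def looks_unhealthy(d: dict) -> bool:
    # `d` is the JSON body of GET /api/admin/diagnostics/executions.
    return (
        d["orphaned_running"] + d["orphaned_queued"] > 0
        or d["stuck_queued_1h"] > 0
        or d["failure_rate_24h"] > 0.5  # illustrative threshold
    )
```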
@router.get(
    "/diagnostics/agents",
    response_model=AgentDiagnosticsResponse,
    summary="Get Agent Diagnostics",
)
async def get_agent_diagnostics_endpoint():
    """
    Get diagnostic information about agents.

    Returns:
        - agents_with_active_executions: Number of unique agents with running/queued executions
        - timestamp: Current timestamp
    """
    logger.info("Getting agent diagnostics")

    diagnostics = await get_agent_diagnostics()

    response = AgentDiagnosticsResponse(
        agents_with_active_executions=diagnostics.agents_with_active_executions,
        timestamp=diagnostics.timestamp,
    )

    logger.info(
        f"Agent diagnostics: with_active_executions={diagnostics.agents_with_active_executions}"
    )

    return response


@router.get(
    "/diagnostics/executions/running",
    response_model=RunningExecutionsListResponse,
    summary="List Running Executions",
)
async def list_running_executions(
    limit: int = 100,
    offset: int = 0,
):
    """
    Get detailed list of running and queued executions (recent, likely active).

    Args:
        limit: Maximum number of executions to return (default 100)
        offset: Number of executions to skip (default 0)

    Returns:
        List of running executions with details
    """
    logger.info(f"Listing running executions (limit={limit}, offset={offset})")

    executions = await get_running_executions_details(limit=limit, offset=offset)

    # Get total count for pagination
    diagnostics = await get_execution_diagnostics()
    total = diagnostics.running_count + diagnostics.queued_db_count

    return RunningExecutionsListResponse(executions=executions, total=total)


@router.get(
    "/diagnostics/executions/orphaned",
    response_model=RunningExecutionsListResponse,
    summary="List Orphaned Executions",
)
async def list_orphaned_executions(
    limit: int = 100,
    offset: int = 0,
):
    """
    Get detailed list of orphaned executions (>24h old, likely not in executor).

    Args:
        limit: Maximum number of executions to return (default 100)
        offset: Number of executions to skip (default 0)

    Returns:
        List of orphaned executions with details
    """
    logger.info(f"Listing orphaned executions (limit={limit}, offset={offset})")

    executions = await get_orphaned_executions_details(limit=limit, offset=offset)

    # Get total count for pagination
    diagnostics = await get_execution_diagnostics()
    total = diagnostics.orphaned_running + diagnostics.orphaned_queued

    return RunningExecutionsListResponse(executions=executions, total=total)


@router.get(
    "/diagnostics/executions/failed",
    response_model=FailedExecutionsListResponse,
    summary="List Failed Executions",
)
async def list_failed_executions(
    limit: int = 100,
    offset: int = 0,
    hours: int = 24,
):
    """
    Get detailed list of failed executions.

    Args:
        limit: Maximum number of executions to return (default 100)
        offset: Number of executions to skip (default 0)
        hours: Number of hours to look back (default 24)

    Returns:
        List of failed executions with error details
    """
    logger.info(
        f"Listing failed executions (limit={limit}, offset={offset}, hours={hours})"
    )

    executions = await get_failed_executions_details(
        limit=limit, offset=offset, hours=hours
    )

    # Get total count for pagination
    # Always count actual total for given hours parameter
    total = await get_failed_executions_count(hours=hours)

    return FailedExecutionsListResponse(executions=executions, total=total)


@router.get(
    "/diagnostics/executions/long-running",
    response_model=RunningExecutionsListResponse,
    summary="List Long-Running Executions",
)
async def list_long_running_executions(
    limit: int = 100,
    offset: int = 0,
):
    """
    Get detailed list of long-running executions (RUNNING status >24h).

    Args:
        limit: Maximum number of executions to return (default 100)
        offset: Number of executions to skip (default 0)

    Returns:
        List of long-running executions with details
    """
    logger.info(f"Listing long-running executions (limit={limit}, offset={offset})")

    executions = await get_long_running_executions_details(limit=limit, offset=offset)

    # Get total count for pagination
    diagnostics = await get_execution_diagnostics()
    total = diagnostics.stuck_running_24h

    return RunningExecutionsListResponse(executions=executions, total=total)


@router.get(
    "/diagnostics/executions/stuck-queued",
    response_model=RunningExecutionsListResponse,
    summary="List Stuck Queued Executions",
)
async def list_stuck_queued_executions(
    limit: int = 100,
    offset: int = 0,
):
    """
    Get detailed list of stuck queued executions (QUEUED >1h, never started).

    Args:
        limit: Maximum number of executions to return (default 100)
        offset: Number of executions to skip (default 0)

    Returns:
        List of stuck queued executions with details
    """
    logger.info(f"Listing stuck queued executions (limit={limit}, offset={offset})")

    executions = await get_stuck_queued_executions_details(limit=limit, offset=offset)

    # Get total count for pagination
    diagnostics = await get_execution_diagnostics()
    total = diagnostics.stuck_queued_1h

    return RunningExecutionsListResponse(executions=executions, total=total)


@router.get(
    "/diagnostics/executions/invalid",
    response_model=RunningExecutionsListResponse,
    summary="List Invalid Executions",
)
async def list_invalid_executions(
    limit: int = 100,
    offset: int = 0,
):
    """
    Get detailed list of executions in invalid states (READ-ONLY).

    Invalid states indicate data corruption and require manual investigation:
    - QUEUED but has startedAt (impossible - can't start while queued)
    - RUNNING but no startedAt (impossible - can't run without starting)

    ⚠️ NO BULK ACTIONS PROVIDED - These need case-by-case investigation.

    Each invalid execution likely has a different root cause (crashes, race conditions,
    DB corruption). Investigate the execution history and logs to determine appropriate
    action (manual cleanup, status fix, or leave as-is if system recovered).

    Args:
        limit: Maximum number of executions to return (default 100)
        offset: Number of executions to skip (default 0)

    Returns:
        List of invalid state executions with details
    """
    logger.info(f"Listing invalid state executions (limit={limit}, offset={offset})")

    executions = await get_invalid_executions_details(limit=limit, offset=offset)

    # Get total count for pagination
    diagnostics = await get_execution_diagnostics()
    total = (
        diagnostics.invalid_queued_with_start
        + diagnostics.invalid_running_without_start
    )

    return RunningExecutionsListResponse(executions=executions, total=total)

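The two "impossible" combinations called out in the docstring reduce to a simple invariant over status and start time. A sketch of that check, using the status strings and `started_at` field as they appear in the `RunningExecutionDetail` rows in the tests further down:

```python
from datetime import datetime
from typing import Optional


def is_invalid_state(status: str, started_at: Optional[datetime]) -> bool:
    # QUEUED rows must not have a start time; RUNNING rows must have one.
    return (status == "QUEUED" and started_at is not None) or (
        status == "RUNNING" and started_at is None
    )
```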
@router.post(
    "/diagnostics/executions/requeue",
    response_model=RequeueExecutionResponse,
    summary="Requeue Stuck Execution",
)
async def requeue_single_execution(
    request: StopExecutionRequest,  # Reuse same request model (has execution_id)
    user: AuthUser = Security(requires_admin_user),
):
    """
    Requeue a stuck QUEUED execution (admin only).

    Uses add_graph_execution with existing graph_exec_id to requeue.

    ⚠️ WARNING: Only use for stuck executions. This will re-execute and may cost credits.

    Args:
        request: Contains execution_id to requeue

    Returns:
        Success status and message
    """
    logger.info(f"Admin {user.user_id} requeueing execution {request.execution_id}")

    # Get the execution (validation - must be QUEUED)
    executions = await get_graph_executions(
        graph_exec_id=request.execution_id,
        statuses=[AgentExecutionStatus.QUEUED],
    )

    if not executions:
        raise HTTPException(
            status_code=404,
            detail="Execution not found or not in QUEUED status",
        )

    execution = executions[0]

    # Use add_graph_execution in requeue mode
    await add_graph_execution(
        graph_id=execution.graph_id,
        user_id=execution.user_id,
        graph_version=execution.graph_version,
        graph_exec_id=request.execution_id,  # Requeue existing execution
    )

    return RequeueExecutionResponse(
        success=True,
        requeued_count=1,
        message="Execution requeued successfully",
    )

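A hedged example of driving this endpoint from an operator script. The base URL and the bearer-token header are assumptions (only the path prefixes and the `StopExecutionRequest` / `RequeueExecutionResponse` shapes come from this diff):

```python
import httpx


async def requeue_stuck(execution_id: str, admin_token: str) -> dict:
    # Path combines the "/api" mount prefix with the router's "/admin" prefix.
    async with httpx.AsyncClient(base_url="http://localhost:8006") as client:  # assumed host/port
        resp = await client.post(
            "/api/admin/diagnostics/executions/requeue",
            json={"execution_id": execution_id},  # StopExecutionRequest body
            headers={"Authorization": f"Bearer {admin_token}"},
        )
        resp.raise_for_status()
        return resp.json()  # RequeueExecutionResponse: success, requeued_count, message
```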
@router.post(
    "/diagnostics/executions/requeue-bulk",
    response_model=RequeueExecutionResponse,
    summary="Requeue Multiple Stuck Executions",
)
async def requeue_multiple_executions(
    request: StopExecutionsRequest,  # Reuse same request model (has execution_ids)
    user: AuthUser = Security(requires_admin_user),
):
    """
    Requeue multiple stuck QUEUED executions (admin only).

    Uses add_graph_execution with existing graph_exec_id to requeue.

    ⚠️ WARNING: Only use for stuck executions. This will re-execute and may cost credits.

    Args:
        request: Contains list of execution_ids to requeue

    Returns:
        Number of executions requeued and success message
    """
    logger.info(
        f"Admin {user.user_id} requeueing {len(request.execution_ids)} executions"
    )

    # Get executions by ID list (must be QUEUED)
    executions = await get_graph_executions(
        execution_ids=request.execution_ids,
        statuses=[AgentExecutionStatus.QUEUED],
    )

    if not executions:
        return RequeueExecutionResponse(
            success=False,
            requeued_count=0,
            message="No QUEUED executions found to requeue",
        )

    # Requeue all executions in parallel using add_graph_execution
    async def requeue_one(exec) -> bool:
        try:
            await add_graph_execution(
                graph_id=exec.graph_id,
                user_id=exec.user_id,
                graph_version=exec.graph_version,
                graph_exec_id=exec.id,  # Requeue existing
            )
            return True
        except Exception as e:
            logger.error(f"Failed to requeue {exec.id}: {e}")
            return False

    results = await asyncio.gather(
        *[requeue_one(exec) for exec in executions], return_exceptions=False
    )

    requeued_count = sum(1 for success in results if success)

    return RequeueExecutionResponse(
        success=requeued_count > 0,
        requeued_count=requeued_count,
        message=f"Requeued {requeued_count} of {len(request.execution_ids)} executions",
    )


@router.post(
    "/diagnostics/executions/stop",
    response_model=StopExecutionResponse,
    summary="Stop Single Execution",
)
async def stop_single_execution(
    request: StopExecutionRequest,
    user: AuthUser = Security(requires_admin_user),
):
    """
    Stop a single execution (admin only).

    Uses robust stop_graph_execution which cascades to children and waits for termination.

    Args:
        request: Contains execution_id to stop

    Returns:
        Success status and message
    """
    logger.info(f"Admin {user.user_id} stopping execution {request.execution_id}")

    # Get the execution to find its owner user_id (required by stop_graph_execution)
    executions = await get_graph_executions(
        graph_exec_id=request.execution_id,
    )

    if not executions:
        raise HTTPException(status_code=404, detail="Execution not found")

    execution = executions[0]

    # Use robust stop_graph_execution (cascades to children, waits for termination)
    await stop_graph_execution(
        user_id=execution.user_id,
        graph_exec_id=request.execution_id,
        wait_timeout=15.0,
        cascade=True,
    )

    return StopExecutionResponse(
        success=True,
        stopped_count=1,
        message="Execution stopped successfully",
    )


@router.post(
    "/diagnostics/executions/stop-bulk",
    response_model=StopExecutionResponse,
    summary="Stop Multiple Executions",
)
async def stop_multiple_executions(
    request: StopExecutionsRequest,
    user: AuthUser = Security(requires_admin_user),
):
    """
    Stop multiple active executions (admin only).

    Uses robust stop_graph_execution which cascades to children and waits for termination.

    Args:
        request: Contains list of execution_ids to stop

    Returns:
        Number of executions stopped and success message
    """

    logger.info(
        f"Admin {user.user_id} stopping {len(request.execution_ids)} executions"
    )

    # Get executions by ID list
    executions = await get_graph_executions(
        execution_ids=request.execution_ids,
    )

    if not executions:
        return StopExecutionResponse(
            success=False,
            stopped_count=0,
            message="No executions found",
        )

    # Stop all executions in parallel using robust stop_graph_execution
    async def stop_one(exec) -> bool:
        try:
            await stop_graph_execution(
                user_id=exec.user_id,
                graph_exec_id=exec.id,
                wait_timeout=15.0,
                cascade=True,
            )
            return True
        except Exception as e:
            logger.error(f"Failed to stop execution {exec.id}: {e}")
            return False

    results = await asyncio.gather(
        *[stop_one(exec) for exec in executions], return_exceptions=False
    )

    stopped_count = sum(1 for success in results if success)

    return StopExecutionResponse(
        success=stopped_count > 0,
        stopped_count=stopped_count,
        message=f"Stopped {stopped_count} of {len(request.execution_ids)} executions",
    )


@router.post(
    "/diagnostics/executions/cleanup-orphaned",
    response_model=StopExecutionResponse,
    summary="Cleanup Orphaned Executions",
)
async def cleanup_orphaned_executions(
    request: StopExecutionsRequest,
    user: AuthUser = Security(requires_admin_user),
):
    """
    Cleanup orphaned executions by directly updating DB status (admin only).
    For executions in DB but not actually running in executor (old/stale records).

    Args:
        request: Contains list of execution_ids to cleanup

    Returns:
        Number of executions cleaned up and success message
    """
    logger.info(
        f"Admin {user.user_id} cleaning up {len(request.execution_ids)} orphaned executions"
    )

    cleaned_count = await cleanup_orphaned_executions_bulk(
        request.execution_ids, user.user_id
    )

    return StopExecutionResponse(
        success=cleaned_count > 0,
        stopped_count=cleaned_count,
        message=f"Cleaned up {cleaned_count} of {len(request.execution_ids)} orphaned executions",
    )


# ============================================================================
# SCHEDULE DIAGNOSTICS ENDPOINTS
# ============================================================================


class SchedulesListResponse(BaseModel):
    """Response model for list of schedules"""

    schedules: List[ScheduleDetail]
    total: int


class OrphanedSchedulesListResponse(BaseModel):
    """Response model for list of orphaned schedules"""

    schedules: List[OrphanedScheduleDetail]
    total: int


class ScheduleCleanupResponse(BaseModel):
    """Response model for schedule cleanup operations"""

    success: bool
    deleted_count: int = 0
    message: str


@router.get(
    "/diagnostics/schedules",
    response_model=ScheduleHealthMetrics,
    summary="Get Schedule Diagnostics",
)
async def get_schedule_diagnostics_endpoint():
    """
    Get comprehensive diagnostic information about schedule health.

    Returns schedule metrics including:
    - Total schedules (user vs system)
    - Orphaned schedules by category
    - Upcoming executions
    """
    logger.info("Getting schedule diagnostics")

    diagnostics = await get_schedule_health_metrics()

    logger.info(
        f"Schedule diagnostics: total={diagnostics.total_schedules}, "
        f"user={diagnostics.user_schedules}, "
        f"orphaned={diagnostics.total_orphaned}"
    )

    return diagnostics


@router.get(
    "/diagnostics/schedules/all",
    response_model=SchedulesListResponse,
    summary="List All User Schedules",
)
async def list_all_schedules(
    limit: int = 100,
    offset: int = 0,
):
    """
    Get detailed list of all user schedules (excludes system monitoring jobs).

    Args:
        limit: Maximum number of schedules to return (default 100)
        offset: Number of schedules to skip (default 0)

    Returns:
        List of schedules with details
    """
    logger.info(f"Listing all schedules (limit={limit}, offset={offset})")

    schedules = await get_all_schedules_details(limit=limit, offset=offset)

    # Get total count
    diagnostics = await get_schedule_health_metrics()
    total = diagnostics.user_schedules

    return SchedulesListResponse(schedules=schedules, total=total)


@router.get(
    "/diagnostics/schedules/orphaned",
    response_model=OrphanedSchedulesListResponse,
    summary="List Orphaned Schedules",
)
async def list_orphaned_schedules():
    """
    Get detailed list of orphaned schedules with orphan reasons.

    Returns:
        List of orphaned schedules categorized by orphan type
    """
    logger.info("Listing orphaned schedules")

    schedules = await get_orphaned_schedules_details()

    return OrphanedSchedulesListResponse(schedules=schedules, total=len(schedules))


@router.post(
    "/diagnostics/schedules/cleanup-orphaned",
    response_model=ScheduleCleanupResponse,
    summary="Cleanup Orphaned Schedules",
)
async def cleanup_orphaned_schedules(
    request: StopExecutionsRequest,  # Reuse for schedule_ids list
    user: AuthUser = Security(requires_admin_user),
):
    """
    Cleanup orphaned schedules by deleting from scheduler (admin only).

    Args:
        request: Contains list of schedule_ids to delete

    Returns:
        Number of schedules deleted and success message
    """
    logger.info(
        f"Admin {user.user_id} cleaning up {len(request.execution_ids)} orphaned schedules"
    )

    deleted_count = await cleanup_orphaned_schedules_bulk(
        request.execution_ids, user.user_id
    )

    return ScheduleCleanupResponse(
        success=deleted_count > 0,
        deleted_count=deleted_count,
        message=f"Deleted {deleted_count} of {len(request.execution_ids)} orphaned schedules",
    )

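Note that the schedule cleanup endpoint above reuses `StopExecutionsRequest`, so the schedule IDs travel in the `execution_ids` field. A short sketch of the request body (IDs are placeholders):

```python
# Body for POST .../diagnostics/schedules/cleanup-orphaned: the field is named
# execution_ids because the endpoint reuses StopExecutionsRequest, but the values
# here are schedule IDs.
payload = {"execution_ids": ["schedule-abc", "schedule-def"]}
```

A dedicated request model with a `schedule_ids` field would make the contract clearer, at the cost of one more model.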
@router.post(
    "/diagnostics/executions/stop-all-long-running",
    response_model=StopExecutionResponse,
    summary="Stop ALL Long-Running Executions",
)
async def stop_all_long_running_executions_endpoint(
    user: AuthUser = Security(requires_admin_user),
):
    """
    Stop ALL long-running executions (RUNNING >24h) by sending cancel signals (admin only).
    Operates on entire dataset, not limited to pagination.

    Returns:
        Number of executions stopped and success message
    """
    logger.info(f"Admin {user.user_id} stopping ALL long-running executions")

    stopped_count = await stop_all_long_running_executions(user.user_id)

    return StopExecutionResponse(
        success=stopped_count > 0,
        stopped_count=stopped_count,
        message=f"Stopped {stopped_count} long-running executions",
    )


@router.post(
    "/diagnostics/executions/cleanup-all-orphaned",
    response_model=StopExecutionResponse,
    summary="Cleanup ALL Orphaned Executions",
)
async def cleanup_all_orphaned_executions(
    user: AuthUser = Security(requires_admin_user),
):
    """
    Cleanup ALL orphaned executions (>24h old) by directly updating DB status.
    Operates on all executions, not just paginated results.

    Returns:
        Number of executions cleaned up and success message
    """
    logger.info(f"Admin {user.user_id} cleaning up ALL orphaned executions")

    # Fetch all orphaned execution IDs
    execution_ids = await get_all_orphaned_execution_ids()

    if not execution_ids:
        return StopExecutionResponse(
            success=True,
            stopped_count=0,
            message="No orphaned executions to cleanup",
        )

    cleaned_count = await cleanup_orphaned_executions_bulk(execution_ids, user.user_id)

    return StopExecutionResponse(
        success=cleaned_count > 0,
        stopped_count=cleaned_count,
        message=f"Cleaned up {cleaned_count} orphaned executions",
    )


@router.post(
    "/diagnostics/executions/cleanup-all-stuck-queued",
    response_model=StopExecutionResponse,
    summary="Cleanup ALL Stuck Queued Executions",
)
async def cleanup_all_stuck_queued_executions_endpoint(
    user: AuthUser = Security(requires_admin_user),
):
    """
    Cleanup ALL stuck queued executions (QUEUED >1h) by updating DB status (admin only).
    Operates on entire dataset, not limited to pagination.

    Returns:
        Number of executions cleaned up and success message
    """
    logger.info(f"Admin {user.user_id} cleaning up ALL stuck queued executions")

    cleaned_count = await cleanup_all_stuck_queued_executions(user.user_id)

    return StopExecutionResponse(
        success=cleaned_count > 0,
        stopped_count=cleaned_count,
        message=f"Cleaned up {cleaned_count} stuck queued executions",
    )


@router.post(
    "/diagnostics/executions/requeue-all-stuck",
    response_model=RequeueExecutionResponse,
    summary="Requeue ALL Stuck Queued Executions",
)
async def requeue_all_stuck_executions(
    user: AuthUser = Security(requires_admin_user),
):
    """
    Requeue ALL stuck queued executions (QUEUED >1h) by publishing to RabbitMQ.
    Operates on all executions, not just paginated results.

    Uses add_graph_execution with existing graph_exec_id to requeue.

    ⚠️ WARNING: This will re-execute ALL stuck executions and may cost significant credits.

    Returns:
        Number of executions requeued and success message
    """
    logger.info(f"Admin {user.user_id} requeueing ALL stuck queued executions")

    # Fetch all stuck queued execution IDs
    execution_ids = await get_all_stuck_queued_execution_ids()

    if not execution_ids:
        return RequeueExecutionResponse(
            success=True,
            requeued_count=0,
            message="No stuck queued executions to requeue",
        )

    # Get stuck executions by ID list (must be QUEUED)
    executions = await get_graph_executions(
        execution_ids=execution_ids,
        statuses=[AgentExecutionStatus.QUEUED],
    )

    # Requeue all in parallel using add_graph_execution
    async def requeue_one(exec) -> bool:
        try:
            await add_graph_execution(
                graph_id=exec.graph_id,
                user_id=exec.user_id,
                graph_version=exec.graph_version,
                graph_exec_id=exec.id,  # Requeue existing
            )
            return True
        except Exception as e:
            logger.error(f"Failed to requeue {exec.id}: {e}")
            return False

    results = await asyncio.gather(
        *[requeue_one(exec) for exec in executions], return_exceptions=False
    )

    requeued_count = sum(1 for success in results if success)

    return RequeueExecutionResponse(
        success=requeued_count > 0,
        requeued_count=requeued_count,
        message=f"Requeued {requeued_count} stuck executions",
    )
@@ -0,0 +1,336 @@
from datetime import datetime, timezone
from unittest.mock import AsyncMock

import fastapi
import fastapi.testclient
import pytest
import pytest_mock
from autogpt_libs.auth.jwt_utils import get_jwt_payload
from prisma.enums import AgentExecutionStatus

import backend.server.v2.admin.diagnostics_admin_routes as diagnostics_admin_routes
from backend.data.diagnostics import ExecutionDiagnosticsSummary, RunningExecutionDetail
from backend.data.execution import GraphExecutionMeta

app = fastapi.FastAPI()
app.include_router(diagnostics_admin_routes.router)

client = fastapi.testclient.TestClient(app)


@pytest.fixture(autouse=True)
def setup_app_admin_auth(mock_jwt_admin):
    """Setup admin auth overrides for all tests in this module"""
    app.dependency_overrides[get_jwt_payload] = mock_jwt_admin["get_jwt_payload"]
    yield
    app.dependency_overrides.clear()


def test_get_execution_diagnostics_success(
    mocker: pytest_mock.MockFixture,
):
    """Test fetching execution diagnostics with invalid state detection"""
    mock_diagnostics = ExecutionDiagnosticsSummary(
        running_count=10,
        queued_db_count=5,
        rabbitmq_queue_depth=3,
        cancel_queue_depth=0,
        orphaned_running=2,
        orphaned_queued=1,
        failed_count_1h=5,
        failed_count_24h=20,
        failure_rate_24h=0.83,
        stuck_running_24h=1,
        stuck_running_1h=3,
        oldest_running_hours=26.5,
        stuck_queued_1h=2,
        queued_never_started=1,
        invalid_queued_with_start=1,  # New invalid state
        invalid_running_without_start=1,  # New invalid state
        completed_1h=50,
        completed_24h=1200,
        throughput_per_hour=50.0,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )

    mocker.patch(
        "backend.server.v2.admin.diagnostics_admin_routes.get_execution_diagnostics",
        return_value=mock_diagnostics,
    )

    response = client.get("/admin/diagnostics/executions")

    assert response.status_code == 200
    data = response.json()

    # Verify new invalid state fields are included
    assert data["invalid_queued_with_start"] == 1
    assert data["invalid_running_without_start"] == 1
    # Verify all expected fields present
    assert "running_executions" in data
    assert "orphaned_running" in data
    assert "failed_count_24h" in data


def test_list_invalid_executions(
    mocker: pytest_mock.MockFixture,
):
    """Test listing executions in invalid states (read-only endpoint)"""
    mock_invalid_executions = [
        RunningExecutionDetail(
            execution_id="exec-invalid-1",
            graph_id="graph-123",
            graph_name="Test Graph",
            graph_version=1,
            user_id="user-123",
            user_email="test@example.com",
            status="QUEUED",
            created_at=datetime.now(timezone.utc),
            started_at=datetime.now(
                timezone.utc
            ),  # QUEUED but has startedAt - INVALID!
            queue_status=None,
        ),
        RunningExecutionDetail(
            execution_id="exec-invalid-2",
            graph_id="graph-456",
            graph_name="Another Graph",
            graph_version=2,
            user_id="user-456",
            user_email="user@example.com",
            status="RUNNING",
            created_at=datetime.now(timezone.utc),
            started_at=None,  # RUNNING but no startedAt - INVALID!
            queue_status=None,
        ),
    ]

    mock_diagnostics = ExecutionDiagnosticsSummary(
        running_count=10,
        queued_db_count=5,
        rabbitmq_queue_depth=3,
        cancel_queue_depth=0,
        orphaned_running=0,
        orphaned_queued=0,
        failed_count_1h=0,
        failed_count_24h=0,
        failure_rate_24h=0.0,
        stuck_running_24h=0,
        stuck_running_1h=0,
        oldest_running_hours=None,
        stuck_queued_1h=0,
        queued_never_started=0,
        invalid_queued_with_start=1,
        invalid_running_without_start=1,
        completed_1h=0,
        completed_24h=0,
        throughput_per_hour=0.0,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )

    mocker.patch(
        "backend.server.v2.admin.diagnostics_admin_routes.get_invalid_executions_details",
        return_value=mock_invalid_executions,
    )
    mocker.patch(
        "backend.server.v2.admin.diagnostics_admin_routes.get_execution_diagnostics",
        return_value=mock_diagnostics,
    )

    response = client.get("/admin/diagnostics/executions/invalid?limit=100&offset=0")

    assert response.status_code == 200
    data = response.json()
    assert data["total"] == 2  # Sum of both invalid state types
    assert len(data["executions"]) == 2
    # Verify both types of invalid states are returned
    assert data["executions"][0]["execution_id"] in [
        "exec-invalid-1",
        "exec-invalid-2",
    ]
    assert data["executions"][1]["execution_id"] in [
        "exec-invalid-1",
        "exec-invalid-2",
    ]


def test_requeue_single_execution_with_add_graph_execution(
    mocker: pytest_mock.MockFixture,
    admin_user_id: str,
):
    """Test requeueing uses add_graph_execution in requeue mode"""
    mock_exec_meta = GraphExecutionMeta(
        id="exec-stuck-123",
        user_id="user-123",
        graph_id="graph-456",
        graph_version=1,
        inputs=None,
        credential_inputs=None,
        nodes_input_masks=None,
        preset_id=None,
        status=AgentExecutionStatus.QUEUED,
        started_at=datetime.now(timezone.utc),
        ended_at=datetime.now(timezone.utc),
        stats=None,
    )

    mocker.patch(
        "backend.server.v2.admin.diagnostics_admin_routes.get_graph_executions",
        return_value=[mock_exec_meta],
    )

    mock_add_graph_execution = mocker.patch(
        "backend.server.v2.admin.diagnostics_admin_routes.add_graph_execution",
        return_value=AsyncMock(),
    )

    response = client.post(
        "/admin/diagnostics/executions/requeue",
        json={"execution_id": "exec-stuck-123"},
    )

    assert response.status_code == 200
    data = response.json()
    assert data["success"] is True
    assert data["requeued_count"] == 1

    # Verify it used add_graph_execution in requeue mode
    mock_add_graph_execution.assert_called_once()
    call_kwargs = mock_add_graph_execution.call_args.kwargs
    assert call_kwargs["graph_exec_id"] == "exec-stuck-123"  # Requeue mode!
    assert call_kwargs["graph_id"] == "graph-456"
    assert call_kwargs["user_id"] == "user-123"


def test_stop_single_execution_with_stop_graph_execution(
    mocker: pytest_mock.MockFixture,
    admin_user_id: str,
):
    """Test stopping uses robust stop_graph_execution"""
    mock_exec_meta = GraphExecutionMeta(
        id="exec-running-123",
        user_id="user-789",
        graph_id="graph-999",
        graph_version=2,
        inputs=None,
        credential_inputs=None,
        nodes_input_masks=None,
        preset_id=None,
        status=AgentExecutionStatus.RUNNING,
        started_at=datetime.now(timezone.utc),
        ended_at=datetime.now(timezone.utc),
        stats=None,
    )

    mocker.patch(
        "backend.server.v2.admin.diagnostics_admin_routes.get_graph_executions",
        return_value=[mock_exec_meta],
    )

    mock_stop_graph_execution = mocker.patch(
        "backend.server.v2.admin.diagnostics_admin_routes.stop_graph_execution",
        return_value=AsyncMock(),
    )

    response = client.post(
        "/admin/diagnostics/executions/stop",
        json={"execution_id": "exec-running-123"},
    )

    assert response.status_code == 200
    data = response.json()
    assert data["success"] is True
    assert data["stopped_count"] == 1

    # Verify it used stop_graph_execution with cascade
    mock_stop_graph_execution.assert_called_once()
    call_kwargs = mock_stop_graph_execution.call_args.kwargs
    assert call_kwargs["graph_exec_id"] == "exec-running-123"
    assert call_kwargs["user_id"] == "user-789"
    assert call_kwargs["cascade"] is True  # Stops children too!
    assert call_kwargs["wait_timeout"] == 15.0


def test_requeue_not_queued_execution_fails(
    mocker: pytest_mock.MockFixture,
):
    """Test that requeue fails if execution is not in QUEUED status"""
    # Mock an execution that's RUNNING (not QUEUED)
    mocker.patch(
        "backend.server.v2.admin.diagnostics_admin_routes.get_graph_executions",
        return_value=[],  # No QUEUED executions found
    )

    response = client.post(
        "/admin/diagnostics/executions/requeue",
        json={"execution_id": "exec-running-123"},
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
||||
assert "not found or not in QUEUED status" in response.json()["detail"]
|
||||
|
||||
|
||||
def test_list_invalid_executions_no_bulk_actions(
|
||||
mocker: pytest_mock.MockFixture,
|
||||
):
|
||||
"""Verify invalid executions endpoint is read-only (no bulk actions)"""
|
||||
# This is a documentation test - the endpoint exists but should not
|
||||
# have corresponding cleanup/stop/requeue endpoints
|
||||
|
||||
# These endpoints should NOT exist for invalid states:
|
||||
invalid_bulk_endpoints = [
|
||||
"/admin/diagnostics/executions/cleanup-invalid",
|
||||
"/admin/diagnostics/executions/stop-invalid",
|
||||
"/admin/diagnostics/executions/requeue-invalid",
|
||||
]
|
||||
|
||||
for endpoint in invalid_bulk_endpoints:
|
||||
response = client.post(endpoint, json={"execution_ids": ["test"]})
|
||||
assert response.status_code == 404, f"{endpoint} should not exist (read-only)"
|
||||
|
||||
|
||||
def test_execution_ids_filter_efficiency(
|
||||
mocker: pytest_mock.MockFixture,
|
||||
):
|
||||
"""Test that bulk operations use efficient execution_ids filter"""
|
||||
mock_exec_metas = [
|
||||
GraphExecutionMeta(
|
||||
id=f"exec-{i}",
|
||||
user_id=f"user-{i}",
|
||||
graph_id="graph-123",
|
||||
graph_version=1,
|
||||
inputs=None,
|
||||
credential_inputs=None,
|
||||
nodes_input_masks=None,
|
||||
preset_id=None,
|
||||
status=AgentExecutionStatus.QUEUED,
|
||||
started_at=datetime.now(timezone.utc),
|
||||
ended_at=datetime.now(timezone.utc),
|
||||
stats=None,
|
||||
)
|
||||
for i in range(3)
|
||||
]
|
||||
|
||||
mock_get_graph_executions = mocker.patch(
|
||||
"backend.server.v2.admin.diagnostics_admin_routes.get_graph_executions",
|
||||
return_value=mock_exec_metas,
|
||||
)
|
||||
|
||||
mocker.patch(
|
||||
"backend.server.v2.admin.diagnostics_admin_routes.add_graph_execution",
|
||||
return_value=AsyncMock(),
|
||||
)
|
||||
|
||||
response = client.post(
|
||||
"/admin/diagnostics/executions/requeue-bulk",
|
||||
json={"execution_ids": ["exec-0", "exec-1", "exec-2"]},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify it used execution_ids filter (not fetching all queued)
|
||||
mock_get_graph_executions.assert_called_once()
|
||||
call_kwargs = mock_get_graph_executions.call_args.kwargs
|
||||
assert "execution_ids" in call_kwargs
|
||||
assert call_kwargs["execution_ids"] == ["exec-0", "exec-1", "exec-2"]
|
||||
assert call_kwargs["statuses"] == [AgentExecutionStatus.QUEUED]
|
||||
@@ -14,3 +14,70 @@ class UserHistoryResponse(BaseModel):
class AddUserCreditsResponse(BaseModel):
    new_balance: int
    transaction_key: str


class ExecutionDiagnosticsResponse(BaseModel):
    """Response model for execution diagnostics"""

    # Current execution state
    running_executions: int
    queued_executions_db: int
    queued_executions_rabbitmq: int
    cancel_queue_depth: int

    # Orphaned execution detection
    orphaned_running: int
    orphaned_queued: int

    # Failure metrics
    failed_count_1h: int
    failed_count_24h: int
    failure_rate_24h: float

    # Long-running detection
    stuck_running_24h: int
    stuck_running_1h: int
    oldest_running_hours: float | None

    # Stuck queued detection
    stuck_queued_1h: int
    queued_never_started: int

    # Invalid state detection (data corruption - no auto-actions)
    invalid_queued_with_start: int
    invalid_running_without_start: int

    # Throughput metrics
    completed_1h: int
    completed_24h: int
    throughput_per_hour: float

    timestamp: str


class AgentDiagnosticsResponse(BaseModel):
    """Response model for agent diagnostics"""

    agents_with_active_executions: int
    timestamp: str


class ScheduleHealthMetrics(BaseModel):
    """Response model for schedule diagnostics"""

    total_schedules: int
    user_schedules: int
    system_schedules: int

    # Orphan detection
    orphaned_deleted_graph: int
    orphaned_no_library_access: int
    orphaned_invalid_credentials: int
    orphaned_validation_failed: int
    total_orphaned: int

    # Upcoming
    schedules_next_hour: int
    schedules_next_24h: int

    timestamp: str
@@ -0,0 +1,579 @@
"use client";

import { useState } from "react";
import { Button } from "@/components/atoms/Button/Button";
import { Card } from "@/components/atoms/Card/Card";
import {
  CardContent,
  CardDescription,
  CardHeader,
  CardTitle,
} from "@/components/__legacy__/ui/card";
import { ArrowClockwise } from "@phosphor-icons/react";
import { ErrorCard } from "@/components/molecules/ErrorCard/ErrorCard";
import { useDiagnosticsContent } from "./useDiagnosticsContent";
import { ExecutionsTable } from "./ExecutionsTable";
import { SchedulesTable } from "./SchedulesTable";

export function DiagnosticsContent() {
  const {
    executionData,
    agentData,
    scheduleData,
    isLoading,
    isError,
    error,
    refresh,
  } = useDiagnosticsContent();

  const [activeTab, setActiveTab] = useState<
    "all" | "orphaned" | "failed" | "long-running" | "stuck-queued" | "invalid"
  >("all");

  if (isLoading && !executionData && !agentData) {
    return (
      <div className="flex h-64 items-center justify-center">
        <div className="text-center">
          <ArrowClockwise className="mx-auto h-8 w-8 animate-spin text-gray-400" />
          <p className="mt-2 text-gray-500">Loading diagnostics...</p>
        </div>
      </div>
    );
  }

  if (isError) {
    return (
      <ErrorCard
        httpError={error as any}
        onRetry={refresh}
        context="diagnostics"
      />
    );
  }

  return (
    <div className="space-y-6">
      <div className="flex items-center justify-between">
        <div>
          <h1 className="text-3xl font-bold">System Diagnostics</h1>
          <p className="text-gray-500">
            Monitor execution and agent system health
          </p>
        </div>
        <Button
          onClick={refresh}
          disabled={isLoading}
          variant="outline"
          size="small"
        >
          <ArrowClockwise
            className={`mr-2 h-4 w-4 ${isLoading ? "animate-spin" : ""}`}
          />
          Refresh
        </Button>
      </div>

      {/* Alert Cards for Critical Issues */}
      <div className="grid gap-4 md:grid-cols-3">
        {executionData && (
          <>
            {/* Orphaned Executions Alert */}
            {(executionData.orphaned_running > 0 ||
              executionData.orphaned_queued > 0) && (
              <div
                className="cursor-pointer transition-all hover:scale-105"
                onClick={() => setActiveTab("orphaned")}
              >
                <Card className="border-orange-300 bg-orange-50">
                  <CardHeader className="pb-3">
                    <CardTitle className="text-orange-800">
                      Orphaned Executions
                    </CardTitle>
                  </CardHeader>
                  <CardContent>
                    <p className="text-3xl font-bold text-orange-900">
                      {executionData.orphaned_running +
                        executionData.orphaned_queued}
                    </p>
                    <p className="text-sm text-orange-700">
                      {executionData.orphaned_running} running,{" "}
                      {executionData.orphaned_queued} queued ({">"}24h old)
                    </p>
                    <p className="mt-2 text-xs text-orange-600">
                      Click to view →
                    </p>
                  </CardContent>
                </Card>
              </div>
            )}

            {/* Failed Executions Alert */}
            {executionData.failed_count_24h > 0 && (
              <div
                className="cursor-pointer transition-all hover:scale-105"
                onClick={() => setActiveTab("failed")}
              >
                <Card className="border-red-300 bg-red-50">
                  <CardHeader className="pb-3">
                    <CardTitle className="text-red-800">
                      Failed Executions (24h)
                    </CardTitle>
                  </CardHeader>
                  <CardContent>
                    <p className="text-3xl font-bold text-red-900">
                      {executionData.failed_count_24h}
                    </p>
                    <p className="text-sm text-red-700">
                      {executionData.failed_count_1h} in last hour (
                      {executionData.failure_rate_24h.toFixed(1)}/hr rate)
                    </p>
                    <p className="mt-2 text-xs text-red-600">Click to view →</p>
                  </CardContent>
                </Card>
              </div>
            )}

            {/* Long-Running Alert */}
            {executionData.stuck_running_24h > 0 && (
              <>
                <div
                  className="cursor-pointer transition-all hover:scale-105"
                  onClick={() => setActiveTab("long-running")}
                >
                  <Card className="border-yellow-300 bg-yellow-50">
                    <CardHeader className="pb-3">
                      <CardTitle className="text-yellow-800">
                        Long-Running Executions
                      </CardTitle>
                    </CardHeader>
                    <CardContent>
                      <p className="text-3xl font-bold text-yellow-900">
                        {executionData.stuck_running_24h}
                      </p>
                      <p className="text-sm text-yellow-700">
                        Running {">"}24h (oldest:{" "}
                        {executionData.oldest_running_hours
                          ? `${Math.floor(executionData.oldest_running_hours)}h`
                          : "N/A"}
                        )
                      </p>
                      <p className="mt-2 text-xs text-yellow-600">
                        Click to view →
                      </p>
                    </CardContent>
                  </Card>
                </div>
              </>
            )}

            {/* Orphaned Schedules Alert */}
            {scheduleData && scheduleData.total_orphaned > 0 && (
              <div
                className="cursor-pointer transition-all hover:scale-105"
                onClick={() => setActiveTab("all")}
              >
                <Card className="border-purple-300 bg-purple-50">
                  <CardHeader className="pb-3">
                    <CardTitle className="text-purple-800">
                      Orphaned Schedules
                    </CardTitle>
                  </CardHeader>
                  <CardContent>
                    <p className="text-3xl font-bold text-purple-900">
                      {scheduleData.total_orphaned}
                    </p>
                    <p className="text-sm text-purple-700">
                      {scheduleData.orphaned_deleted_graph > 0 &&
                        `${scheduleData.orphaned_deleted_graph} deleted graph, `}
                      {scheduleData.orphaned_no_library_access > 0 &&
                        `${scheduleData.orphaned_no_library_access} no access`}
                    </p>
                    <p className="mt-2 text-xs text-purple-600">
                      Click to view schedules →
                    </p>
                  </CardContent>
                </Card>
              </div>
            )}

            {/* Invalid State Alert */}
            {(executionData.invalid_queued_with_start > 0 ||
              executionData.invalid_running_without_start > 0) && (
              <div
                className="cursor-pointer transition-all hover:scale-105"
                onClick={() => setActiveTab("invalid")}
              >
                <Card className="border-pink-300 bg-pink-50">
                  <CardHeader className="pb-3">
                    <CardTitle className="text-pink-800">
                      Invalid States (Data Corruption)
                    </CardTitle>
                  </CardHeader>
                  <CardContent>
                    <p className="text-3xl font-bold text-pink-900">
                      {executionData.invalid_queued_with_start +
                        executionData.invalid_running_without_start}
                    </p>
                    <p className="text-sm text-pink-700">
                      Requires manual investigation
                    </p>
                    <p className="mt-2 text-xs text-pink-600">
                      Click to view (read-only) →
                    </p>
                  </CardContent>
                </Card>
              </div>
            )}
          </>
        )}
      </div>

      <div className="grid gap-6 md:grid-cols-3">
        <Card>
          <CardHeader>
            <CardTitle>Execution Queue Status</CardTitle>
            <CardDescription>
              Current execution and queue metrics
            </CardDescription>
          </CardHeader>
          <CardContent>
            {executionData ? (
              <div className="space-y-4">
                <div className="flex items-center justify-between rounded-lg border p-4">
                  <div>
                    <p className="text-sm font-medium text-gray-500">
                      Running Executions
                    </p>
                    <p className="text-3xl font-bold">
                      {executionData.running_executions}
                    </p>
                  </div>
                  <div className="flex h-12 w-12 items-center justify-center rounded-full bg-green-100">
                    <div className="h-6 w-6 rounded-full bg-green-500"></div>
                  </div>
                </div>

                <div className="flex items-center justify-between rounded-lg border p-4">
                  <div>
                    <p className="text-sm font-medium text-gray-500">
                      Queued in Database
                    </p>
                    <p className="text-3xl font-bold">
                      {executionData.queued_executions_db}
                    </p>
                    {executionData.stuck_queued_1h > 0 && (
                      <p className="text-xs text-orange-600">
                        {executionData.stuck_queued_1h} stuck {">"}1h
                      </p>
                    )}
                  </div>
                  <div className="flex h-12 w-12 items-center justify-center rounded-full bg-blue-100">
                    <div className="h-6 w-6 rounded-full bg-blue-500"></div>
                  </div>
                </div>

                <div className="flex items-center justify-between rounded-lg border p-4">
                  <div>
                    <p className="text-sm font-medium text-gray-500">
                      Queued in RabbitMQ
                    </p>
                    <p className="text-3xl font-bold">
                      {executionData.queued_executions_rabbitmq === -1 ? (
                        <span className="text-xl text-red-500">Error</span>
                      ) : (
                        executionData.queued_executions_rabbitmq
                      )}
                    </p>
                  </div>
                  <div
                    className={`flex h-12 w-12 items-center justify-center rounded-full ${
                      executionData.queued_executions_rabbitmq === -1
                        ? "bg-red-100"
                        : "bg-yellow-100"
                    }`}
                  >
                    <div
                      className={`h-6 w-6 rounded-full ${
                        executionData.queued_executions_rabbitmq === -1
                          ? "bg-red-500"
                          : "bg-yellow-500"
                      }`}
                    ></div>
                  </div>
                </div>

                <div className="text-xs text-gray-400">
                  Last updated:{" "}
                  {new Date(executionData.timestamp).toLocaleString()}
                </div>
              </div>
            ) : (
              <p className="text-gray-500">No data available</p>
            )}
          </CardContent>
        </Card>

        <Card>
          <CardHeader>
            <CardTitle>System Throughput</CardTitle>
            <CardDescription>
              Execution completion and processing rates
            </CardDescription>
          </CardHeader>
          <CardContent>
            {executionData ? (
              <div className="space-y-4">
                <div className="flex items-center justify-between rounded-lg border p-4">
                  <div>
                    <p className="text-sm font-medium text-gray-500">
                      Completed (24h)
                    </p>
                    <p className="text-3xl font-bold">
                      {executionData.completed_24h}
                    </p>
                    <p className="text-xs text-gray-600">
                      {executionData.completed_1h} in last hour
                    </p>
                  </div>
                  <div className="flex h-12 w-12 items-center justify-center rounded-full bg-green-100">
                    <div className="h-6 w-6 rounded-full bg-green-500"></div>
                  </div>
                </div>

                <div className="flex items-center justify-between rounded-lg border p-4">
                  <div>
                    <p className="text-sm font-medium text-gray-500">
                      Throughput Rate
                    </p>
                    <p className="text-3xl font-bold">
                      {executionData.throughput_per_hour.toFixed(1)}
                    </p>
                    <p className="text-xs text-gray-600">
                      completions per hour
                    </p>
                  </div>
                  <div className="flex h-12 w-12 items-center justify-center rounded-full bg-blue-100">
                    <div className="h-6 w-6 rounded-full bg-blue-500"></div>
                  </div>
                </div>

                <div className="flex items-center justify-between rounded-lg border p-4">
                  <div>
                    <p className="text-sm font-medium text-gray-500">
                      Cancel Queue Depth
                    </p>
                    <p className="text-3xl font-bold">
                      {executionData.cancel_queue_depth === -1 ? (
                        <span className="text-xl text-red-500">Error</span>
                      ) : (
                        executionData.cancel_queue_depth
                      )}
                    </p>
                  </div>
                  <div className="flex h-12 w-12 items-center justify-center rounded-full bg-purple-100">
                    <div className="h-6 w-6 rounded-full bg-purple-500"></div>
                  </div>
                </div>

                <div className="text-xs text-gray-400">
                  Last updated:{" "}
                  {new Date(executionData.timestamp).toLocaleString()}
                </div>
              </div>
            ) : (
              <p className="text-gray-500">No data available</p>
            )}
          </CardContent>
        </Card>

        <Card>
          <CardHeader>
            <CardTitle>Schedules</CardTitle>
            <CardDescription>
              Scheduled agent executions and health
            </CardDescription>
          </CardHeader>
          <CardContent>
            {scheduleData ? (
              <div className="space-y-4">
                <div className="flex items-center justify-between rounded-lg border p-4">
                  <div>
                    <p className="text-sm font-medium text-gray-500">
                      User Schedules
                    </p>
                    <p className="text-3xl font-bold">
                      {scheduleData.user_schedules}
                    </p>
                    {scheduleData.total_orphaned > 0 && (
                      <p className="text-xs text-orange-600">
                        {scheduleData.total_orphaned} orphaned
                      </p>
                    )}
                  </div>
                  <div className="flex h-12 w-12 items-center justify-center rounded-full bg-purple-100">
                    <div className="h-6 w-6 rounded-full bg-purple-500"></div>
                  </div>
                </div>

                <div className="flex items-center justify-between rounded-lg border p-4">
                  <div>
                    <p className="text-sm font-medium text-gray-500">
                      Upcoming Runs (1h)
                    </p>
                    <p className="text-3xl font-bold">
                      {scheduleData.total_runs_next_hour}
                    </p>
                    <p className="text-xs text-gray-600">
                      from {scheduleData.schedules_next_hour} schedule
                      {scheduleData.schedules_next_hour !== 1 ? "s" : ""}
                    </p>
                  </div>
                  <div className="flex h-12 w-12 items-center justify-center rounded-full bg-blue-100">
                    <div className="h-6 w-6 rounded-full bg-blue-500"></div>
                  </div>
                </div>

                <div className="flex items-center justify-between rounded-lg border p-4">
                  <div>
                    <p className="text-sm font-medium text-gray-500">
                      Upcoming Runs (24h)
                    </p>
                    <p className="text-3xl font-bold">
                      {scheduleData.total_runs_next_24h}
                    </p>
                    <p className="text-xs text-gray-600">
                      from {scheduleData.schedules_next_24h} schedule
                      {scheduleData.schedules_next_24h !== 1 ? "s" : ""}
                    </p>
                  </div>
                  <div className="flex h-12 w-12 items-center justify-center rounded-full bg-green-100">
                    <div className="h-6 w-6 rounded-full bg-green-500"></div>
                  </div>
                </div>

                <div className="text-xs text-gray-400">
                  Last updated:{" "}
                  {new Date(scheduleData.timestamp).toLocaleString()}
                </div>
              </div>
            ) : (
              <p className="text-gray-500">No data available</p>
            )}
          </CardContent>
        </Card>
      </div>

      <Card>
        <CardHeader>
          <CardTitle>Diagnostic Information</CardTitle>
          <CardDescription>
            Understanding metrics and tabs for on-call diagnostics
          </CardDescription>
        </CardHeader>
        <CardContent>
          <div className="space-y-3 text-sm">
            <div>
              <p className="font-semibold text-orange-700">
                🟠 Orphaned Executions:
              </p>
              <p className="text-gray-600">
                Executions {">"}24h old in database but not actually running in
                executor. Usually from executor restarts/crashes. Safe to
                cleanup (marks as FAILED in DB).
              </p>
            </div>
            <div>
              <p className="font-semibold text-blue-700">
                🔵 Stuck Queued Executions:
              </p>
              <p className="text-gray-600">
                QUEUED {">"}1h but never started. Not in RabbitMQ queue. Can
                cleanup (safe) or requeue (⚠️ costs credits - only if temporary
                issue like RabbitMQ purge).
              </p>
            </div>
            <div>
              <p className="font-semibold text-yellow-700">
                🟡 Long-Running Executions:
              </p>
              <p className="text-gray-600">
                RUNNING status {">"}24h. May be legitimately long jobs or stuck.
                Review before stopping. Sends cancel signal to executor.
              </p>
            </div>
            <div>
              <p className="font-semibold text-red-700">
                🔴 Failed Executions:
              </p>
              <p className="text-gray-600">
                Executions that failed in last 24h. View error messages to
                identify patterns. Spike in failures indicates system issues.
              </p>
            </div>
            <div>
              <p className="font-semibold text-pink-700">
                🩷 Invalid States (Data Corruption):
              </p>
              <p className="text-gray-600">
                Executions in impossible states (QUEUED with startedAt, RUNNING
                without startedAt). Indicates DB corruption, race conditions, or
                crashes. Each requires manual investigation - no bulk actions
                provided.
              </p>
            </div>
            <div>
              <p className="font-semibold">Throughput Metrics:</p>
              <p className="text-gray-600">
                Completions per hour shows system productivity. Declining
                throughput indicates performance degradation or executor issues.
              </p>
            </div>
            <div>
              <p className="font-semibold">Queue Health:</p>
              <p className="text-gray-600">
                RabbitMQ depths should be low ({"<"}100). High queues indicate
                executor can't keep up. Cancel queue backlog indicates
                executor processing issues.
              </p>
            </div>
          </div>
        </CardContent>
      </Card>

      {/* Add Executions Table with tab counts */}
      <ExecutionsTable
        onRefresh={refresh}
        initialTab={activeTab}
        onTabChange={setActiveTab}
        diagnosticsData={
          executionData
            ? {
                orphaned_running: executionData.orphaned_running,
                orphaned_queued: executionData.orphaned_queued,
                failed_count_24h: executionData.failed_count_24h,
                stuck_running_24h: executionData.stuck_running_24h,
                stuck_queued_1h: executionData.stuck_queued_1h,
                invalid_queued_with_start:
                  executionData.invalid_queued_with_start,
                invalid_running_without_start:
                  executionData.invalid_running_without_start,
              }
            : undefined
        }
      />

      {/* Add Schedules Table */}
      <SchedulesTable
        onRefresh={refresh}
        diagnosticsData={
          scheduleData
            ? {
                total_orphaned: scheduleData.total_orphaned,
                user_schedules: scheduleData.user_schedules,
              }
            : undefined
        }
      />
    </div>
  );
}
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,424 @@
"use client";

import { Button } from "@/components/atoms/Button/Button";
import { Card } from "@/components/atoms/Card/Card";
import {
  Dialog,
  DialogContent,
  DialogDescription,
  DialogFooter,
  DialogHeader,
  DialogTitle,
} from "@/components/__legacy__/ui/dialog";
import { toast } from "@/components/molecules/Toast/use-toast";
import { ArrowClockwise, Trash, Copy } from "@phosphor-icons/react";
import React, { useState } from "react";
import {
  Table,
  TableHeader,
  TableBody,
  TableHead,
  TableRow,
  TableCell,
} from "@/components/__legacy__/ui/table";
import { Checkbox } from "@/components/__legacy__/ui/checkbox";
import {
  CardHeader,
  CardTitle,
  CardContent,
} from "@/components/__legacy__/ui/card";
import {
  useGetV2ListAllUserSchedules,
  useGetV2ListOrphanedSchedules,
  usePostV2CleanupOrphanedSchedules,
} from "@/app/api/__generated__/endpoints/admin/admin";
import {
  TabsLine,
  TabsLineContent,
  TabsLineList,
  TabsLineTrigger,
} from "@/components/molecules/TabsLine/TabsLine";

interface ScheduleDetail {
  schedule_id: string;
  schedule_name: string;
  graph_id: string;
  graph_name: string;
  graph_version: number;
  user_id: string;
  user_email: string | null;
  cron: string;
  timezone: string;
  next_run_time: string;
}

interface OrphanedScheduleDetail {
  schedule_id: string;
  schedule_name: string;
  graph_id: string;
  graph_version: number;
  user_id: string;
  orphan_reason: string;
  error_detail: string | null;
  next_run_time: string;
}

interface SchedulesTableProps {
  onRefresh?: () => void;
  diagnosticsData?: {
    total_orphaned: number;
    user_schedules: number;
  };
}

export function SchedulesTable({
  onRefresh,
  diagnosticsData,
}: SchedulesTableProps) {
  const [activeTab, setActiveTab] = useState<"all" | "orphaned">("all");
  const [selectedIds, setSelectedIds] = useState<Set<string>>(new Set());
  const [showDeleteDialog, setShowDeleteDialog] = useState(false);
  const [currentPage, setCurrentPage] = useState(1);
  const [pageSize] = useState(10);

  // Fetch data based on active tab
  const allSchedulesQuery = useGetV2ListAllUserSchedules(
    {
      limit: pageSize,
      offset: (currentPage - 1) * pageSize,
    },
    { query: { enabled: activeTab === "all" } },
  );

  const orphanedSchedulesQuery = useGetV2ListOrphanedSchedules({
    query: { enabled: activeTab === "orphaned" },
  });

  const activeQuery =
    activeTab === "orphaned" ? orphanedSchedulesQuery : allSchedulesQuery;

  const {
    data: schedulesResponse,
    isLoading,
    error: _error,
    refetch,
  } = activeQuery;

  const schedules =
    (schedulesResponse?.data as any)?.schedules || ([] as any[]);
  const total = (schedulesResponse?.data as any)?.total || 0;

  // Cleanup mutation
  const { mutateAsync: cleanupOrphanedSchedules, isPending: isDeleting } =
    usePostV2CleanupOrphanedSchedules();

  const handleSelectAll = (checked: boolean) => {
    if (checked) {
      setSelectedIds(new Set(schedules.map((s: any) => s.schedule_id)));
    } else {
      setSelectedIds(new Set());
    }
  };

  const handleSelectSchedule = (id: string, checked: boolean) => {
    const newSelected = new Set(selectedIds);
    if (checked) {
      newSelected.add(id);
    } else {
      newSelected.delete(id);
    }
    setSelectedIds(newSelected);
  };

  const confirmDelete = () => {
    setShowDeleteDialog(true);
  };

  const handleDelete = async () => {
    setShowDeleteDialog(false);

    try {
      const idsToDelete =
        activeTab === "orphaned" && selectedIds.size === 0
          ? schedules.map((s: any) => s.schedule_id)
          : Array.from(selectedIds);

      const result = await cleanupOrphanedSchedules({
        data: { execution_ids: idsToDelete }, // Reuses execution_ids field name
      });

      toast({
        title: "Success",
        description:
          (result.data as any)?.message ||
          `Deleted ${(result.data as any)?.deleted_count || 0} schedule(s)`,
      });

      setSelectedIds(new Set());
      await refetch();
      if (onRefresh) onRefresh();
    } catch (error: any) {
      console.error("Error deleting schedules:", error);
      toast({
        title: "Error",
        description: error.message || "Failed to delete schedules",
        variant: "destructive",
      });
    }
  };

  const totalPages = Math.ceil(total / pageSize);

  return (
    <>
      <Card>
        <TabsLine
          value={activeTab}
          onValueChange={(v) => setActiveTab(v as any)}
        >
          <CardHeader>
            <div className="flex items-center justify-between">
              <CardTitle>Schedules</CardTitle>
              <div className="flex gap-2">
                {activeTab === "orphaned" && schedules.length > 0 && (
                  <Button
                    variant="destructive"
                    size="small"
                    onClick={confirmDelete}
                    disabled={isDeleting}
                  >
                    <Trash className="mr-2 h-4 w-4" />
                    Delete All Orphaned ({total})
                  </Button>
                )}
                {selectedIds.size > 0 && (
                  <Button
                    variant="destructive"
                    size="small"
                    onClick={confirmDelete}
                    disabled={isDeleting}
                  >
                    <Trash className="mr-2 h-4 w-4" />
                    Delete Selected ({selectedIds.size})
                  </Button>
                )}
                <Button
                  variant="outline"
                  size="small"
                  onClick={() => {
                    refetch();
                    if (onRefresh) onRefresh();
                  }}
                  disabled={isLoading}
                >
                  <ArrowClockwise
                    className={`h-4 w-4 ${isLoading ? "animate-spin" : ""}`}
                  />
                </Button>
              </div>
            </div>

            <TabsLineList className="px-6">
              <TabsLineTrigger value="all">
                All Schedules
                {diagnosticsData && ` (${diagnosticsData.user_schedules})`}
              </TabsLineTrigger>
              <TabsLineTrigger value="orphaned">
                Orphaned
                {diagnosticsData && ` (${diagnosticsData.total_orphaned})`}
              </TabsLineTrigger>
            </TabsLineList>
          </CardHeader>

          <TabsLineContent value={activeTab}>
            <CardContent>
              {isLoading && schedules.length === 0 ? (
                <div className="flex h-32 items-center justify-center">
                  <ArrowClockwise className="h-6 w-6 animate-spin text-gray-400" />
                </div>
              ) : schedules.length === 0 ? (
                <div className="py-8 text-center text-gray-500">
                  No schedules found
                </div>
              ) : (
                <Table>
                  <TableHeader>
                    <TableRow>
                      <TableHead className="w-12">
                        <Checkbox
                          checked={
                            selectedIds.size === schedules.length &&
                            schedules.length > 0
                          }
                          onCheckedChange={handleSelectAll}
                        />
                      </TableHead>
                      <TableHead>Name</TableHead>
                      <TableHead>Graph</TableHead>
                      <TableHead>User</TableHead>
                      <TableHead>Cron</TableHead>
                      <TableHead>Next Run</TableHead>
                      {activeTab === "orphaned" && (
                        <TableHead>Orphan Reason</TableHead>
                      )}
                    </TableRow>
                  </TableHeader>
                  <TableBody>
                    {schedules.map((schedule: any) => {
                      const isOrphaned = activeTab === "orphaned";
                      return (
                        <TableRow
                          key={schedule.schedule_id}
                          className={isOrphaned ? "bg-purple-50" : ""}
                        >
                          <TableCell>
                            <Checkbox
                              checked={selectedIds.has(schedule.schedule_id)}
                              onCheckedChange={(checked) =>
                                handleSelectSchedule(
                                  schedule.schedule_id,
                                  checked as boolean,
                                )
                              }
                            />
                          </TableCell>
                          <TableCell>{schedule.schedule_name}</TableCell>
                          <TableCell>
                            <div>{schedule.graph_name || "Unknown"}</div>
                            <div className="font-mono text-xs text-gray-500">
                              v{schedule.graph_version}
                            </div>
                          </TableCell>
                          <TableCell>
                            <div>
                              {(schedule as ScheduleDetail).user_email || (
                                <span className="text-gray-400">Unknown</span>
                              )}
                            </div>
                            <div
                              className="group flex cursor-pointer items-center gap-1 font-mono text-xs text-gray-500 hover:text-gray-700"
                              onClick={() => {
                                navigator.clipboard.writeText(schedule.user_id);
                                toast({
                                  title: "Copied",
                                  description: "User ID copied to clipboard",
                                });
                              }}
                              title="Click to copy user ID"
                            >
                              {schedule.user_id.substring(0, 8)}...
                              <Copy className="h-3 w-3 opacity-0 transition-opacity group-hover:opacity-100" />
                            </div>
                          </TableCell>
                          <TableCell>
                            <code className="rounded bg-gray-100 px-2 py-1 text-xs">
                              {schedule.cron}
                            </code>
                            <div className="text-xs text-gray-500">
                              {schedule.timezone}
                            </div>
                          </TableCell>
                          <TableCell>
                            {schedule.next_run_time
                              ? new Date(
                                  schedule.next_run_time,
                                ).toLocaleString()
                              : "Not scheduled"}
                          </TableCell>
                          {activeTab === "orphaned" && (
                            <TableCell>
                              <span className="text-xs text-purple-600">
                                {(
                                  schedule as OrphanedScheduleDetail
                                ).orphan_reason?.replace(/_/g, " ") ||
                                  "unknown"}
                              </span>
                            </TableCell>
                          )}
                        </TableRow>
                      );
                    })}
                  </TableBody>
                </Table>
              )}

              {totalPages > 1 && activeTab === "all" && (
                <div className="mt-4 flex items-center justify-between">
                  <div className="text-sm text-gray-600">
                    Showing {(currentPage - 1) * pageSize + 1} to{" "}
                    {Math.min(currentPage * pageSize, total)} of {total}{" "}
                    schedules
                  </div>
                  <div className="flex gap-2">
                    <Button
                      variant="outline"
                      size="small"
                      onClick={() => setCurrentPage(currentPage - 1)}
                      disabled={currentPage === 1}
                    >
                      Previous
                    </Button>
                    <div className="flex items-center px-3">
                      Page {currentPage} of {totalPages}
                    </div>
                    <Button
                      variant="outline"
                      size="small"
                      onClick={() => setCurrentPage(currentPage + 1)}
                      disabled={currentPage === totalPages}
                    >
                      Next
                    </Button>
                  </div>
                </div>
              )}
            </CardContent>
          </TabsLineContent>
        </TabsLine>
      </Card>

      <Dialog open={showDeleteDialog} onOpenChange={setShowDeleteDialog}>
        <DialogContent>
          <DialogHeader>
            <DialogTitle>Confirm Delete Schedules</DialogTitle>
            <DialogDescription>
              {activeTab === "orphaned" && selectedIds.size === 0 ? (
                <>
                  Are you sure you want to delete ALL {total} orphaned
                  schedules?
                  <br />
                  <br />
                  These schedules reference deleted graphs or graphs the user no
                  longer has access to. Deleting them is safe.
                </>
              ) : (
                <>
                  Are you sure you want to delete {selectedIds.size} selected
                  schedule(s)?
                  <br />
                  <br />
                  This will permanently remove the schedules from the system.
                </>
              )}
            </DialogDescription>
          </DialogHeader>
          <DialogFooter>
            <Button
              variant="outline"
              onClick={() => setShowDeleteDialog(false)}
            >
              Cancel
            </Button>
            <Button
              variant="destructive"
              onClick={handleDelete}
              className="bg-red-600 hover:bg-red-700"
            >
              Delete Schedules
            </Button>
          </DialogFooter>
        </DialogContent>
      </Dialog>
    </>
  );
}
@@ -0,0 +1,63 @@
import {
  useGetV2GetExecutionDiagnostics,
  useGetV2GetAgentDiagnostics,
  useGetV2GetScheduleDiagnostics,
} from "@/app/api/__generated__/endpoints/admin/admin";
import type { ExecutionDiagnosticsResponse } from "@/app/api/__generated__/models/executionDiagnosticsResponse";
import type { AgentDiagnosticsResponse } from "@/app/api/__generated__/models/agentDiagnosticsResponse";
import type { ScheduleHealthMetrics } from "@/app/api/__generated__/models/scheduleHealthMetrics";

export function useDiagnosticsContent() {
  const {
    data: executionResponse,
    isLoading: isLoadingExecutions,
    isError: isExecutionError,
    error: executionError,
    refetch: refetchExecutions,
  } = useGetV2GetExecutionDiagnostics();

  const {
    data: agentResponse,
    isLoading: isLoadingAgents,
    isError: isAgentError,
    error: agentError,
    refetch: refetchAgents,
  } = useGetV2GetAgentDiagnostics();

  const {
    data: scheduleResponse,
    isLoading: isLoadingSchedules,
    isError: isScheduleError,
    error: scheduleError,
    refetch: refetchSchedules,
  } = useGetV2GetScheduleDiagnostics();

  const isLoading =
    isLoadingExecutions || isLoadingAgents || isLoadingSchedules;
  const isError = isExecutionError || isAgentError || isScheduleError;
  const error = executionError || agentError || scheduleError;

  const executionData = executionResponse?.data as
    | ExecutionDiagnosticsResponse
    | undefined;
  const agentData = agentResponse?.data as AgentDiagnosticsResponse | undefined;
  const scheduleData = scheduleResponse?.data as
    | ScheduleHealthMetrics
    | undefined;

  const refresh = () => {
    refetchExecutions();
    refetchAgents();
    refetchSchedules();
  };

  return {
    executionData,
    agentData,
    scheduleData,
    isLoading,
    isError,
    error,
    refresh,
  };
}
@@ -0,0 +1,17 @@
import { withRoleAccess } from "@/lib/withRoleAccess";
import { DiagnosticsContent } from "./components/DiagnosticsContent";

function AdminDiagnostics() {
  return (
    <div className="mx-auto p-6">
      <DiagnosticsContent />
    </div>
  );
}

export default async function AdminDiagnosticsPage() {
  "use server";
  const withAdminAccess = await withRoleAccess(["admin"]);
  const ProtectedAdminDiagnostics = await withAdminAccess(AdminDiagnostics);
  return <ProtectedAdminDiagnostics />;
}
@@ -1,5 +1,6 @@
import { Sidebar } from "@/components/__legacy__/Sidebar";
import { Users, DollarSign, UserSearch, FileText } from "lucide-react";
import { PulseIcon } from "@phosphor-icons/react/ssr";

import { IconSliders } from "@/components/__legacy__/ui/icons";

@@ -16,6 +17,11 @@ const sidebarLinkGroups = [
      href: "/admin/spending",
      icon: <DollarSign className="h-6 w-6" />,
    },
    {
      text: "System Diagnostics",
      href: "/admin/diagnostics",
      icon: <PulseIcon className="h-6 w-6" />,
    },
    {
      text: "User Impersonation",
      href: "/admin/impersonation",
File diff suppressed because it is too large
Load Diff
@@ -1074,6 +1074,7 @@ export type AddUserCreditsResponse = {
  new_balance: number;
  transaction_key: string;
};

const _stringFormatToDataTypeMap: Partial<Record<string, DataType>> = {
  date: DataType.DATE,
  time: DataType.TIME,