Compare commits


23 Commits

Author SHA1 Message Date
claude[bot]
81a8e6f558 refactor(diagnostics): use model functions and clean up code
- Move all internal/local imports to top-level
- Remove unnecessary try-except blocks that just re-raise
- Replace direct Prisma count queries with get_graph_executions_count
- Add updated_time_gte/lte filter to get_graph_executions_count for
  querying failed/completed executions by update time
- Fix N+1 query in cleanup_orphaned_schedules_bulk by fetching all
  schedules once before the loop
- Extract SYSTEM_JOB_IDS constant to module level to avoid duplication
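The N+1 fix in `cleanup_orphaned_schedules_bulk` can be sketched as follows. This is a minimal illustration of the pattern, not the repo's actual function signature; `fetch_all_schedules` is a hypothetical stand-in for the single bulk query.

```python
# Hypothetical sketch of the N+1 fix: one bulk fetch before the loop,
# then in-memory lookups instead of one query per schedule ID.

def cleanup_orphaned_schedules_bulk(orphaned_ids, fetch_all_schedules):
    """fetch_all_schedules is assumed to return (id, schedule) pairs."""
    schedules_by_id = dict(fetch_all_schedules())  # single fetch, not N queries
    cleaned = []
    for schedule_id in orphaned_ids:
        schedule = schedules_by_id.get(schedule_id)  # no DB hit inside the loop
        if schedule is not None:
            cleaned.append(schedule_id)
    return cleaned


# Toy usage with an in-memory stand-in for the scheduler store:
store = [("a", {"cron": "* * * * *"}), ("b", {"cron": "0 * * * *"})]
result = cleanup_orphaned_schedules_bulk(["a", "x", "b"], lambda: store)
```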

Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
2026-02-05 05:42:07 +00:00
Nicholas Tindle
265295606b Remove duplicate library router inclusion
Eliminated a redundant app.include_router call for the v2 library routes in rest_api.py to prevent double registration and potential route conflicts.
2025-11-26 12:49:50 -06:00
Nicholas Tindle
6bf1ef6c0d fix: regen spec 2025-11-26 12:40:54 -06:00
Nicholas Tindle
91ee306b0e Merge branch 'dev' into claude/admin-user-management-011CULzkwgiPXZYcvCeozofC 2025-11-26 12:36:58 -06:00
Nicholas Tindle
2e16ef2272 Merge branch 'dev' into claude/admin-user-management-011CULzkwgiPXZYcvCeozofC 2025-11-07 11:28:29 -06:00
Nicholas Tindle
98cb639ab3 Clarify admin execution API descriptions
Updated descriptions for the admin diagnostic endpoints to specify that requeue uses add_graph_execution and stop uses the robust stop_graph_execution method, giving API consumers clearer implementation details.
2025-11-06 15:12:38 -06:00
Nicholas Tindle
d0102f4e1f Refactor admin execution stop/requeue logic and tests
Replaces legacy stop/requeue functions in diagnostics.py with robust implementations using add_graph_execution and stop_graph_execution. Updates admin diagnostics routes to use these new methods, ensuring proper cascading and parallel handling. Adds comprehensive tests for admin routes, including edge cases and validation for bulk operations. Enhances get_graph_executions to support filtering by execution_ids for efficiency.
2025-11-06 15:06:27 -06:00
Nicholas Tindle
4950da2092 Add diagnostics for invalid execution states
Introduces detection and reporting of executions in impossible states (QUEUED with startedAt, RUNNING without startedAt) to backend diagnostics, API, and frontend. Adds a new read-only admin endpoint and UI tab for manual investigation of data corruption cases, updates metrics and OpenAPI spec, and refactors queries to support filtering by startedAt.
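The impossible-state checks described above reduce to two predicates over status and start time. A hedged sketch (field names mirror the Prisma schema shown later in this diff; the helper itself is illustrative):

```python
# Detect executions in impossible states:
#   QUEUED but startedAt is set, or RUNNING but startedAt is missing.

def find_invalid_executions(executions):
    """executions: iterable of dicts with 'id', 'status', 'startedAt' keys."""
    invalid = []
    for ex in executions:
        queued_with_start = ex["status"] == "QUEUED" and ex["startedAt"] is not None
        running_without_start = ex["status"] == "RUNNING" and ex["startedAt"] is None
        if queued_with_start or running_without_start:
            invalid.append(ex["id"])
    return invalid


rows = [
    {"id": "e1", "status": "QUEUED", "startedAt": "2025-11-06T14:00:00Z"},   # impossible
    {"id": "e2", "status": "RUNNING", "startedAt": None},                    # impossible
    {"id": "e3", "status": "RUNNING", "startedAt": "2025-11-06T14:00:00Z"},  # fine
]
bad = find_invalid_executions(rows)
```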
2025-11-06 14:08:42 -06:00
Nicholas Tindle
96e40daf80 Merge branch 'dev' into claude/admin-user-management-011CULzkwgiPXZYcvCeozofC 2025-11-06 12:41:00 -06:00
Nicholas Tindle
f7b332a435 Refactor diagnostics admin routes and add utility functions
Refactored admin diagnostics routes to remove redundant try/except blocks and streamline response handling. Added utility functions in diagnostics.py for fetching all orphaned and stuck queued execution IDs, and for counting failed executions. Updated execution.py to support offset in get_graph_executions. These changes improve maintainability, error logging, and enable bulk operations for admin endpoints.
2025-11-06 12:38:44 -06:00
Nicholas Tindle
4e6fd3f68f Update SchedulesTable.tsx 2025-11-03 19:45:30 -06:00
Nicholas Tindle
43dedd8c42 Move error toast to useEffect in ExecutionsTable
Error toast notification is now shown inside a useEffect hook to avoid side effects during render. Also updated the error message to 'Failed to fetch executions'.
2025-11-03 19:42:48 -06:00
Nicholas Tindle
c1c371bcf3 Add total upcoming execution runs to diagnostics
Backend now calculates and returns the total number of scheduled execution runs in the next hour and 24 hours, not just unique schedules. The frontend displays these new metrics in the diagnostics admin panel. The OpenAPI schema is updated to reflect the new fields.
2025-11-03 19:39:14 -06:00
Nicholas Tindle
6a72440005 Add admin endpoints for bulk stopping and cleanup of executions
Introduces backend and frontend support for stopping all long-running executions and cleaning up all stuck queued executions via new admin endpoints. Updates diagnostics logic to ensure both cancel signals and DB status updates are performed, adds corresponding API routes, and enhances the admin UI to expose these bulk actions. Also updates the sidebar icon for diagnostics.
2025-11-03 19:24:44 -06:00
Nicholas Tindle
1403c8f2de Improve failed executions error extraction and counting
Extract error messages from the stats JSON field in failed executions details. Update the admin diagnostics route to always count the actual number of failed executions within the specified time window, ensuring accurate pagination.
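Extracting an error message from a stats JSON field can be sketched like this; the `"error"` key name and the tolerance for non-string or malformed input are assumptions for illustration, not the repo's confirmed schema.

```python
import json


def extract_error(stats_json):
    """Best-effort pull of an error message from a stats JSON blob."""
    try:
        stats = json.loads(stats_json) if isinstance(stats_json, str) else (stats_json or {})
    except json.JSONDecodeError:
        return None  # malformed stats should not break the listing
    return stats.get("error")


msg = extract_error('{"error": "Node timed out", "cost": 3}')
```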
2025-11-03 18:37:01 -06:00
Nicholas Tindle
6068ed3516 Add admin diagnostics for agent schedules
Introduces backend endpoints and models for schedule diagnostics, including orphaned schedule detection, listing, and bulk cleanup. Updates the frontend to display schedule health metrics and a new schedules table with management actions. OpenAPI spec is updated to document the new endpoints and models.
2025-11-03 18:21:27 -06:00
Nicholas Tindle
53a6de9fdb feat(admin): Enhance diagnostics with comprehensive execution monitoring and management
Add extensive diagnostic capabilities for on-call engineers to monitor and manage execution health.

Backend Enhancements:
- Add 18 diagnostic metrics covering failures, orphaned executions, stuck queued, throughput, and queue health
- Implement orphaned execution detection (>24h old, not in executor)
- Add stuck queued detection (QUEUED >1h, never started)
- Add long-running execution detection (RUNNING >24h)
- Monitor both execution and cancel RabbitMQ queues
- Track failure rates (1h, 24h) and execution throughput metrics

New Backend Endpoints (15 total):
- GET /admin/diagnostics/executions/orphaned - List orphaned executions
- GET /admin/diagnostics/executions/stuck-queued - List stuck queued executions
- GET /admin/diagnostics/executions/long-running - List long-running executions
- GET /admin/diagnostics/executions/failed - List failed executions with error messages
- POST /admin/diagnostics/executions/cleanup-all-orphaned - Cleanup all orphaned (operates on entire dataset)
- POST /admin/diagnostics/executions/requeue - Requeue single stuck execution
- POST /admin/diagnostics/executions/requeue-bulk - Requeue selected executions
- POST /admin/diagnostics/executions/requeue-all-stuck - Requeue all stuck queued (operates on entire dataset)

Execution Management:
- Dual-mode stop: Active executions (cancel signals) vs orphaned (direct DB cleanup)
- Intelligent Stop All: Auto-splits active/orphaned, executes in parallel
- Requeue functionality for stuck QUEUED executions with credit cost warnings
- Stop sends cancel signals to RabbitMQ for graceful termination
- Cleanup orphaned updates DB directly without cancel signals
- ALL endpoints operate on entire datasets (not limited to pagination)
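The "intelligent Stop All" split described above partitions executions into active (send cancel signals) and orphaned (direct DB cleanup). A minimal sketch, assuming the 24h orphan threshold from this commit; the helper name and dict shape are illustrative:

```python
from datetime import datetime, timedelta, timezone

ORPHAN_AGE = timedelta(hours=24)  # threshold stated in this commit message


def split_active_orphaned(executions, now=None):
    """Partition executions: older than ORPHAN_AGE -> orphaned, else active."""
    now = now or datetime.now(timezone.utc)
    active, orphaned = [], []
    for ex in executions:
        (orphaned if now - ex["createdAt"] > ORPHAN_AGE else active).append(ex["id"])
    return active, orphaned


now = datetime(2025, 11, 3, tzinfo=timezone.utc)
execs = [
    {"id": "fresh", "createdAt": now - timedelta(hours=2)},   # gets a cancel signal
    {"id": "stale", "createdAt": now - timedelta(days=10)},   # gets DB cleanup
]
active, orphaned = split_active_orphaned(execs, now=now)
```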

Frontend Enhancements:
- 5-tab filtering interface: All, Orphaned, Stuck Queued, Long-Running, Failed
- Clickable alert cards (🟠 🔴 🟡) automatically switch to relevant tabs
- Tab badges show live counts from diagnostics metrics
- Age column displays execution duration (e.g., "245d 12h")
- Orange row highlighting for orphaned executions (>24h old)
- Error message column for failed executions with hover tooltips
- Click-to-copy for execution IDs and user IDs with visual feedback
- Status badge colors match library view (blue=RUNNING, yellow=QUEUED, red=FAILED)

Tab-Specific Actions:
- Stuck Queued: Cleanup All OR Requeue All buttons with cost warnings
- Stuck Queued per-row: 🟠 Cleanup OR 🔵 Requeue buttons
- Orphaned: Cleanup All (operates on ALL orphaned)
- Long-Running: Stop All (sends cancel signals)
- Failed: View-only with error details
- All: Stop All (intelligent split of active/orphaned)

Alert Cards:
- 🟠 Orphaned: Shows count with RUNNING/QUEUED breakdown, click to view
- 🔴 Failed (24h): Shows count with hourly rate, click to view
- 🟡 Long-Running: Shows count with oldest execution age, click to view

Updated Diagnostic Info Card:
- Color-coded explanations for each execution type
- When to cleanup vs requeue vs stop
- Credit cost implications clearly documented
- Queue health thresholds explained

Provides ~70% coverage of on-call guide requirements for troubleshooting execution issues, orphaned database records, and system health monitoring.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-03 16:57:49 -06:00
Nicholas Tindle
cdd501c031 Merge branch 'dev' into claude/admin-user-management-011CULzkwgiPXZYcvCeozofC 2025-11-03 13:03:39 -06:00
claude[bot]
86b9ccfe5e fix: Apply linting and formatting fixes
- Run ruff, isort, and black on Python files
- Run prettier on TypeScript files
- Remove unused LaunchDarklyIntegration import from metrics.py

Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
2025-10-22 07:48:54 +00:00
claude[bot]
cc1a2cd829 feat(admin): Add execution management table with stop functionality
- Created backend/data/diagnostics.py following Option B data layer pattern
- Refactored diagnostics_admin_routes.py to use the new data layer
- Added endpoints for listing running executions with details
- Added endpoints for stopping executions (single and bulk)
- Created ExecutionsTable component with multi-select and stop buttons
- Integrated execution management table into diagnostics page

Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
2025-10-22 07:09:34 +00:00
claude[bot]
3a8cbe3eb4 fix: Remove unnecessary agent count endpoints from diagnostics
- Removed get_total_agents_count() function
- Removed get_active_agents_count() function
- Updated AgentDiagnosticsResponse model to only include agents_with_active_executions
- Updated frontend to display only agents with active executions metric

Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
2025-10-21 22:00:31 +00:00
Claude
10c3b80cbd refactor(admin): Refactor diagnostics page to follow frontend contribution guidelines
Major refactoring to align with frontend best practices:

Frontend refactoring:
- Replace deprecated BackendAPI with generated React Query hooks
- Use useGetV2GetExecutionDiagnostics and useGetV2GetAgentDiagnostics
- Extract hook logic to useDiagnosticsContent.ts for separation of concerns
- Replace Lucide RefreshCw icon with Phosphor ArrowClockwise icon
- Use ErrorCard component from design system for error handling
- Remove manual useEffect/useState for data fetching

OpenAPI spec updates:
- Add /api/admin/diagnostics/executions endpoint definition
- Add /api/admin/diagnostics/agents endpoint definition
- Add ExecutionDiagnosticsResponse schema
- Add AgentDiagnosticsResponse schema

Component structure improvements:
- DiagnosticsContent.tsx: Render logic only
- useDiagnosticsContent.ts: Data fetching and state management
- Proper TypeScript typing with generated models
- Client-first approach with React Query

Deprecated code removal:
- Remove ExecutionDiagnosticsResponse from deprecated types.ts
- Remove AgentDiagnosticsResponse from deprecated types.ts
- Remove getExecutionDiagnosticsAdmin from BackendAPI client
- Remove getAgentDiagnosticsAdmin from BackendAPI client

This change follows the CONTRIBUTING.md guidelines:
- Uses generated API hooks instead of BackendAPI
- Uses Phosphor icons only
- Uses design system components (ErrorCard)
- Separates render logic from data/behavior logic
- Client-first architecture

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-21 21:12:26 +00:00
Claude
632085528a feat(admin): Add system diagnostics page for execution and agent monitoring
Add new admin diagnostics page to improve on-call diagnostics with the following features:

Backend changes:
- Add ExecutionDiagnosticsResponse and AgentDiagnosticsResponse models
- Create diagnostics_admin_routes.py with endpoints for:
  - /admin/diagnostics/executions - Get running, queued (DB), and queued (RabbitMQ) execution counts
  - /admin/diagnostics/agents - Get total agents, active agents, and agents with active executions
- Register new diagnostics routes in rest_api.py
- Use Prisma for database queries and direct RabbitMQ connection for queue depth

Frontend changes:
- Add new /admin/diagnostics page with real-time metrics display
- Create DiagnosticsContent component with auto-refresh capability
- Add diagnostic metrics cards for:
  - Running executions
  - Queued executions (database)
  - Queued executions (RabbitMQ)
  - Total agents
  - Active agents
  - Agents with active executions
- Add "System Diagnostics" link to admin navigation sidebar
- Update TypeScript types for new API responses

This improves on-call diagnostics by providing visibility into:
- System load (running executions)
- Queue backlog (DB vs RabbitMQ comparison)
- Agent activity levels

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-21 20:55:08 +00:00
17 changed files with 6253 additions and 32 deletions

File diff suppressed because it is too large.


@@ -27,6 +27,7 @@ from prisma.models import (
AgentNodeExecutionKeyValueData,
)
from prisma.types import (
AgentGraphExecutionOrderByInput,
AgentGraphExecutionUpdateManyMutationInput,
AgentGraphExecutionWhereInput,
AgentNodeExecutionCreateInput,
@@ -459,20 +460,39 @@ class NodeExecutionResult(BaseModel):
async def get_graph_executions(
graph_exec_id: Optional[str] = None,
execution_ids: Optional[list[str]] = None,
graph_id: Optional[str] = None,
graph_version: Optional[int] = None,
user_id: Optional[str] = None,
statuses: Optional[list[ExecutionStatus]] = None,
created_time_gte: Optional[datetime] = None,
created_time_lte: Optional[datetime] = None,
started_time_gte: Optional[datetime] = None,
started_time_lte: Optional[datetime] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
order_by: Literal["createdAt", "startedAt", "updatedAt"] = "createdAt",
order_direction: Literal["asc", "desc"] = "desc",
) -> list[GraphExecutionMeta]:
"""⚠️ **Optional `user_id` check**: MUST USE check in user-facing endpoints."""
"""
Get graph executions with optional filters and ordering.
⚠️ **Optional `user_id` check**: MUST USE check in user-facing endpoints.
Args:
graph_exec_id: Filter by single execution ID (mutually exclusive with execution_ids)
execution_ids: Filter by list of execution IDs (mutually exclusive with graph_exec_id)
order_by: Field to order by. Defaults to "createdAt"
order_direction: Sort direction. Defaults to "desc"
"""
where_filter: AgentGraphExecutionWhereInput = {
"isDeleted": False,
}
if graph_exec_id:
where_filter["id"] = graph_exec_id
elif execution_ids:
where_filter["id"] = {"in": execution_ids}
if user_id:
where_filter["userId"] = user_id
if graph_id:
@@ -484,13 +504,36 @@ async def get_graph_executions(
"gte": created_time_gte or datetime.min.replace(tzinfo=timezone.utc),
"lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc),
}
if started_time_gte or started_time_lte:
where_filter["startedAt"] = {
"gte": started_time_gte or datetime.min.replace(tzinfo=timezone.utc),
"lte": started_time_lte or datetime.max.replace(tzinfo=timezone.utc),
}
if statuses:
where_filter["OR"] = [{"executionStatus": status} for status in statuses]
# Build properly typed order clause
# Prisma wants specific typed dicts for each field, so we construct them explicitly
order_clause: AgentGraphExecutionOrderByInput
match (order_by):
case "startedAt":
order_clause = {
"startedAt": order_direction,
}
case "updatedAt":
order_clause = {
"updatedAt": order_direction,
}
case _:
order_clause = {
"createdAt": order_direction,
}
executions = await AgentGraphExecution.prisma().find_many(
where=where_filter,
order={"createdAt": "desc"},
order=order_clause,
take=limit,
skip=offset,
)
return [GraphExecutionMeta.from_db(execution) for execution in executions]
@@ -501,6 +544,10 @@ async def get_graph_executions_count(
statuses: Optional[list[ExecutionStatus]] = None,
created_time_gte: Optional[datetime] = None,
created_time_lte: Optional[datetime] = None,
started_time_gte: Optional[datetime] = None,
started_time_lte: Optional[datetime] = None,
updated_time_gte: Optional[datetime] = None,
updated_time_lte: Optional[datetime] = None,
) -> int:
"""
Get count of graph executions with optional filters.
@@ -511,6 +558,10 @@ async def get_graph_executions_count(
statuses: Optional list of execution statuses to filter by
created_time_gte: Optional minimum creation time
created_time_lte: Optional maximum creation time
started_time_gte: Optional minimum start time (when execution started running)
started_time_lte: Optional maximum start time (when execution started running)
updated_time_gte: Optional minimum update time
updated_time_lte: Optional maximum update time
Returns:
Count of matching graph executions
@@ -530,6 +581,19 @@ async def get_graph_executions_count(
"gte": created_time_gte or datetime.min.replace(tzinfo=timezone.utc),
"lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc),
}
if started_time_gte or started_time_lte:
where_filter["startedAt"] = {
"gte": started_time_gte or datetime.min.replace(tzinfo=timezone.utc),
"lte": started_time_lte or datetime.max.replace(tzinfo=timezone.utc),
}
if updated_time_gte or updated_time_lte:
where_filter["updatedAt"] = {
"gte": updated_time_gte or datetime.min.replace(tzinfo=timezone.utc),
"lte": updated_time_lte or datetime.max.replace(tzinfo=timezone.utc),
}
if statuses:
where_filter["OR"] = [{"executionStatus": status} for status in statuses]
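The open-ended time-window defaults used by these filters (missing bounds fall back to `datetime.min`/`datetime.max` in UTC) can be sketched in isolation; the helper name `time_window` is mine, not the repo's:

```python
from datetime import datetime, timezone


def time_window(gte=None, lte=None):
    """Build a gte/lte filter dict; absent bounds become open-ended."""
    return {
        "gte": gte or datetime.min.replace(tzinfo=timezone.utc),
        "lte": lte or datetime.max.replace(tzinfo=timezone.utc),
    }


w = time_window(gte=datetime(2025, 11, 1, tzinfo=timezone.utc))
```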


@@ -9,6 +9,7 @@ from backend.data.execution import (
get_block_error_stats,
get_child_graph_executions,
get_execution_kv_data,
get_graph_execution,
get_graph_execution_meta,
get_graph_executions,
get_graph_executions_count,
@@ -125,6 +126,7 @@ class DatabaseManager(AppService):
get_child_graph_executions = _(get_child_graph_executions)
get_graph_executions = _(get_graph_executions)
get_graph_executions_count = _(get_graph_executions_count)
get_graph_execution = _(get_graph_execution)
get_graph_execution_meta = _(get_graph_execution_meta)
create_graph_execution = _(create_graph_execution)
get_node_execution = _(get_node_execution)
@@ -198,6 +200,7 @@ class DatabaseManagerClient(AppServiceClient):
# Executions
get_graph_executions = _(d.get_graph_executions)
get_graph_executions_count = _(d.get_graph_executions_count)
get_graph_execution = _(d.get_graph_execution)
get_graph_execution_meta = _(d.get_graph_execution_meta)
get_node_executions = _(d.get_node_executions)
update_node_execution_status = _(d.update_node_execution_status)
@@ -241,6 +244,7 @@ class DatabaseManagerAsyncClient(AppServiceClient):
get_latest_node_execution = d.get_latest_node_execution
get_graph = d.get_graph
get_graph_metadata = d.get_graph_metadata
get_graph_execution = d.get_graph_execution
get_graph_execution_meta = d.get_graph_execution_meta
get_node = d.get_node
get_node_execution = d.get_node_execution


@@ -764,10 +764,15 @@ async def add_graph_execution(
nodes_input_masks: Optional[NodesInputMasks] = None,
parent_graph_exec_id: Optional[str] = None,
is_sub_graph: bool = False,
graph_exec_id: Optional[str] = None,
) -> GraphExecutionWithNodes:
"""
Adds a graph execution to the queue and returns the execution entry.
Supports two modes:
1. CREATE mode (graph_exec_id=None): Validates, creates new DB entry, and queues
2. REQUEUE mode (graph_exec_id provided): Fetches existing execution and re-queues it
Args:
graph_id: The ID of the graph to execute.
user_id: The ID of the user executing the graph.
@@ -779,8 +784,9 @@ async def add_graph_execution(
nodes_input_masks: Node inputs to use in the execution.
parent_graph_exec_id: The ID of the parent graph execution (for nested executions).
is_sub_graph: Whether this is a sub-graph execution.
graph_exec_id: Optional existing execution ID to requeue (skips creation).
Returns:
GraphExecutionEntry: The entry for the graph execution.
GraphExecutionWithNodes: The execution entry.
Raises:
ValueError: If the graph is not found or if there are validation errors.
"""
@@ -789,44 +795,80 @@ async def add_graph_execution(
else:
edb = get_database_manager_async_client()
graph, starting_nodes_input, compiled_nodes_input_masks = (
await validate_and_construct_node_execution_input(
graph_id=graph_id,
user_id=user_id,
graph_inputs=inputs or {},
graph_version=graph_version,
graph_credentials_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
is_sub_graph=is_sub_graph,
)
)
graph_exec = None
compiled_nodes_input_masks = None
try:
# Sanity check: running add_graph_execution with the properties of
# the graph_exec created here should create the same execution again.
graph_exec = await edb.create_graph_execution(
user_id=user_id,
graph_id=graph_id,
graph_version=graph.version,
inputs=inputs or {},
credential_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
starting_nodes_input=starting_nodes_input,
preset_id=preset_id,
parent_graph_exec_id=parent_graph_exec_id,
)
if graph_exec_id:
# REQUEUE mode: Fetch existing execution instead of creating new one
logger.info(f"Requeueing existing execution {graph_exec_id}")
graph_exec_meta = await edb.get_graph_execution_meta(
user_id=user_id,
execution_id=graph_exec_id,
)
if not graph_exec_meta:
raise ValueError(f"Execution {graph_exec_id} not found")
if graph_exec_meta.status != ExecutionStatus.QUEUED:
raise ValueError(
f"Can only requeue QUEUED executions, got {graph_exec_meta.status}"
)
# Fetch full execution with nodes for publishing
graph_exec = await edb.get_graph_execution(
user_id=user_id,
execution_id=graph_exec_id,
include_node_executions=True,
)
if not graph_exec:
raise ValueError(f"Execution {graph_exec_id} not found")
# Use existing execution's parameters
compiled_nodes_input_masks = graph_exec.nodes_input_masks
else:
# CREATE mode: Validate and create new execution
graph, starting_nodes_input, compiled_nodes_input_masks = (
await validate_and_construct_node_execution_input(
graph_id=graph_id,
user_id=user_id,
graph_inputs=inputs or {},
graph_version=graph_version,
graph_credentials_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
is_sub_graph=is_sub_graph,
)
)
# Sanity check: running add_graph_execution with the properties of
# the graph_exec created here should create the same execution again.
graph_exec = await edb.create_graph_execution(
user_id=user_id,
graph_id=graph_id,
graph_version=graph.version,
inputs=inputs or {},
credential_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
starting_nodes_input=starting_nodes_input,
preset_id=preset_id,
parent_graph_exec_id=parent_graph_exec_id,
)
logger.info(
f"Created graph execution #{graph_exec.id} for graph "
f"#{graph_id} with {len(starting_nodes_input)} starting nodes. "
f"Now publishing to execution queue."
)
# Common: Publish to execution queue (works for both create and requeue)
graph_exec_entry = graph_exec.to_graph_execution_entry(
user_context=await get_user_context(user_id),
compiled_nodes_input_masks=compiled_nodes_input_masks,
parent_graph_exec_id=parent_graph_exec_id,
)
logger.info(
f"Created graph execution #{graph_exec.id} for graph "
f"#{graph_id} with {len(starting_nodes_input)} starting nodes. "
f"Now publishing to execution queue."
)
exec_queue = await get_async_execution_queue()
await exec_queue.publish_message(
@@ -836,6 +878,7 @@ async def add_graph_execution(
)
logger.info(f"Published execution {graph_exec.id} to RabbitMQ queue")
# Common: Update status to QUEUED
graph_exec.status = ExecutionStatus.QUEUED
await edb.update_graph_execution_stats(
graph_exec_id=graph_exec.id,


@@ -451,3 +451,144 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
# Both executions should succeed (though they create different objects)
assert result1 == mock_graph_exec
assert result2 == mock_graph_exec_2
@pytest.mark.asyncio
async def test_add_graph_execution_requeue_mode(mocker: MockerFixture):
"""Test that add_graph_execution with graph_exec_id requeues instead of creating"""
from backend.data.execution import ExecutionStatus, GraphExecutionWithNodes
from backend.executor.utils import add_graph_execution
existing_exec_id = "existing-exec-123"
graph_id = "graph-456"
user_id = "user-789"
graph_version = 1
# Mock existing execution (QUEUED, ready to requeue)
mock_existing_exec_meta = mocker.MagicMock()
mock_existing_exec_meta.id = existing_exec_id
mock_existing_exec_meta.user_id = user_id
mock_existing_exec_meta.graph_id = graph_id
mock_existing_exec_meta.graph_version = graph_version
mock_existing_exec_meta.status = ExecutionStatus.QUEUED
mock_existing_exec_full = mocker.MagicMock(spec=GraphExecutionWithNodes)
mock_existing_exec_full.id = existing_exec_id
mock_existing_exec_full.user_id = user_id
mock_existing_exec_full.graph_id = graph_id
mock_existing_exec_full.graph_version = graph_version
mock_existing_exec_full.status = ExecutionStatus.QUEUED
mock_existing_exec_full.nodes_input_masks = {"node1": {"input1": "value1"}}
mock_existing_exec_full.node_executions = []
mock_existing_exec_full.to_graph_execution_entry.return_value = mocker.MagicMock()
mock_existing_exec_full.to_graph_execution_entry.return_value.model_dump_json.return_value = (
"{}"
)
# Mock database manager
mock_edb = mocker.patch("backend.executor.utils.execution_db")
mock_edb.get_graph_execution_meta = mocker.AsyncMock(
return_value=mock_existing_exec_meta
)
mock_edb.get_graph_execution = mocker.AsyncMock(
return_value=mock_existing_exec_full
)
mock_edb.create_graph_execution = mocker.AsyncMock() # Should NOT be called
mock_edb.update_graph_execution_stats = mocker.AsyncMock()
# Mock prisma
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_prisma.is_connected.return_value = True
# Mock queue
mock_queue = mocker.AsyncMock()
mock_queue.publish_message = mocker.AsyncMock()
# Mock event bus
mock_event_bus = mocker.MagicMock()
mock_event_bus.publish = mocker.AsyncMock()
# Mock user context
mock_user_context = mocker.MagicMock()
mocker.patch(
"backend.executor.utils.get_async_execution_queue", return_value=mock_queue
)
mocker.patch(
"backend.executor.utils.get_async_execution_event_bus",
return_value=mock_event_bus,
)
mocker.patch(
"backend.executor.utils.get_user_context", return_value=mock_user_context
)
# Call add_graph_execution in REQUEUE mode
await add_graph_execution(
graph_id=graph_id,
user_id=user_id,
graph_version=graph_version,
graph_exec_id=existing_exec_id, # This triggers REQUEUE mode
)
# Verify: Should NOT create new execution
mock_edb.create_graph_execution.assert_not_called()
# Verify: Should fetch existing execution
mock_edb.get_graph_execution_meta.assert_called_once()
mock_edb.get_graph_execution.assert_called_once()
# Verify: Should publish to queue (same as create mode)
mock_queue.publish_message.assert_called_once()
# Verify: Should update status to QUEUED
mock_edb.update_graph_execution_stats.assert_called_once()
@pytest.mark.asyncio
async def test_add_graph_execution_requeue_fails_if_not_queued(mocker: MockerFixture):
"""Test that requeue mode fails if execution is not in QUEUED status"""
from backend.data.execution import ExecutionStatus
from backend.executor.utils import add_graph_execution
# Mock execution that's RUNNING (not QUEUED)
mock_exec_meta = mocker.MagicMock()
mock_exec_meta.id = "exec-running-123"
mock_exec_meta.user_id = "user-123"
mock_exec_meta.graph_id = "graph-456"
mock_exec_meta.graph_version = 1
mock_exec_meta.status = ExecutionStatus.RUNNING # Wrong status!
mock_edb = mocker.patch("backend.executor.utils.execution_db")
mock_edb.get_graph_execution_meta = mocker.AsyncMock(return_value=mock_exec_meta)
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_prisma.is_connected.return_value = True
# Should raise ValueError
with pytest.raises(ValueError, match="Can only requeue QUEUED executions"):
await add_graph_execution(
graph_id="graph-456",
user_id="user-123",
graph_exec_id="exec-running-123", # Requeue mode
)
@pytest.mark.asyncio
async def test_add_graph_execution_requeue_fails_if_not_found(mocker: MockerFixture):
"""Test that requeue mode fails if execution doesn't exist"""
from backend.executor.utils import add_graph_execution
# Mock execution not found
mock_edb = mocker.patch("backend.executor.utils.execution_db")
mock_edb.get_graph_execution_meta = mocker.AsyncMock(return_value=None)
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_prisma.is_connected.return_value = True
# Should raise ValueError
with pytest.raises(ValueError, match="Execution .* not found"):
await add_graph_execution(
graph_id="graph-456",
user_id="user-123",
graph_exec_id="nonexistent-exec", # Requeue mode
)


@@ -24,6 +24,7 @@ import backend.integrations.webhooks.utils
import backend.server.routers.postmark.postmark
import backend.server.routers.v1
import backend.server.v2.admin.credit_admin_routes
import backend.server.v2.admin.diagnostics_admin_routes
import backend.server.v2.admin.execution_analytics_routes
import backend.server.v2.admin.store_admin_routes
import backend.server.v2.builder
@@ -269,6 +270,11 @@ app.include_router(
tags=["v2", "admin"],
prefix="/api/credits",
)
app.include_router(
backend.server.v2.admin.diagnostics_admin_routes.router,
tags=["v2", "admin"],
prefix="/api",
)
app.include_router(
backend.server.v2.admin.execution_analytics_routes.router,
tags=["v2", "admin"],


@@ -0,0 +1,926 @@
import asyncio
import logging
from typing import List

from autogpt_libs.auth import requires_admin_user
from autogpt_libs.auth.models import User as AuthUser
from fastapi import APIRouter, HTTPException, Security
from prisma.enums import AgentExecutionStatus
from pydantic import BaseModel

from backend.data.diagnostics import (
    FailedExecutionDetail,
    OrphanedScheduleDetail,
    RunningExecutionDetail,
    ScheduleDetail,
    ScheduleHealthMetrics,
    cleanup_all_stuck_queued_executions,
    cleanup_orphaned_executions_bulk,
    cleanup_orphaned_schedules_bulk,
    get_agent_diagnostics,
    get_all_orphaned_execution_ids,
    get_all_schedules_details,
    get_all_stuck_queued_execution_ids,
    get_execution_diagnostics,
    get_failed_executions_count,
    get_failed_executions_details,
    get_invalid_executions_details,
    get_long_running_executions_details,
    get_orphaned_executions_details,
    get_orphaned_schedules_details,
    get_running_executions_details,
    get_schedule_health_metrics,
    get_stuck_queued_executions_details,
    stop_all_long_running_executions,
)
from backend.data.execution import get_graph_executions
from backend.executor.utils import add_graph_execution, stop_graph_execution
from backend.server.v2.admin.model import (
    AgentDiagnosticsResponse,
    ExecutionDiagnosticsResponse,
)
logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/admin",
    tags=["diagnostics", "admin"],
    dependencies=[Security(requires_admin_user)],
)


class RunningExecutionsListResponse(BaseModel):
    """Response model for list of running executions"""

    executions: List[RunningExecutionDetail]
    total: int


class FailedExecutionsListResponse(BaseModel):
    """Response model for list of failed executions"""

    executions: List[FailedExecutionDetail]
    total: int


class StopExecutionRequest(BaseModel):
    """Request model for stopping a single execution"""

    execution_id: str


class StopExecutionsRequest(BaseModel):
    """Request model for stopping multiple executions"""

    execution_ids: List[str]


class StopExecutionResponse(BaseModel):
    """Response model for stop execution operations"""

    success: bool
    stopped_count: int = 0
    message: str


class RequeueExecutionResponse(BaseModel):
    """Response model for requeue execution operations"""

    success: bool
    requeued_count: int = 0
    message: str
@router.get(
    "/diagnostics/executions",
    response_model=ExecutionDiagnosticsResponse,
    summary="Get Execution Diagnostics",
)
async def get_execution_diagnostics_endpoint():
    """
    Get comprehensive diagnostic information about execution status.

    Returns all execution metrics including:
    - Current state (running, queued)
    - Orphaned executions (>24h old, likely not in executor)
    - Failure metrics (1h, 24h, rate)
    - Long-running detection (stuck >1h, >24h)
    - Stuck queued detection
    - Throughput metrics (completions/hour)
    - RabbitMQ queue depths
    """
    logger.info("Getting execution diagnostics")

    diagnostics = await get_execution_diagnostics()

    response = ExecutionDiagnosticsResponse(
        running_executions=diagnostics.running_count,
        queued_executions_db=diagnostics.queued_db_count,
        queued_executions_rabbitmq=diagnostics.rabbitmq_queue_depth,
        cancel_queue_depth=diagnostics.cancel_queue_depth,
        orphaned_running=diagnostics.orphaned_running,
        orphaned_queued=diagnostics.orphaned_queued,
        failed_count_1h=diagnostics.failed_count_1h,
        failed_count_24h=diagnostics.failed_count_24h,
        failure_rate_24h=diagnostics.failure_rate_24h,
        stuck_running_24h=diagnostics.stuck_running_24h,
        stuck_running_1h=diagnostics.stuck_running_1h,
        oldest_running_hours=diagnostics.oldest_running_hours,
        stuck_queued_1h=diagnostics.stuck_queued_1h,
        queued_never_started=diagnostics.queued_never_started,
        invalid_queued_with_start=diagnostics.invalid_queued_with_start,
        invalid_running_without_start=diagnostics.invalid_running_without_start,
        completed_1h=diagnostics.completed_1h,
        completed_24h=diagnostics.completed_24h,
        throughput_per_hour=diagnostics.throughput_per_hour,
        timestamp=diagnostics.timestamp,
    )

    logger.info(
        f"Execution diagnostics: running={diagnostics.running_count}, "
        f"queued_db={diagnostics.queued_db_count}, "
        f"orphaned={diagnostics.orphaned_running + diagnostics.orphaned_queued}, "
        f"failed_24h={diagnostics.failed_count_24h}"
    )

    return response
@router.get(
"/diagnostics/agents",
response_model=AgentDiagnosticsResponse,
summary="Get Agent Diagnostics",
)
async def get_agent_diagnostics_endpoint():
"""
Get diagnostic information about agents.
Returns:
- agents_with_active_executions: Number of unique agents with running/queued executions
- timestamp: Current timestamp
"""
logger.info("Getting agent diagnostics")
diagnostics = await get_agent_diagnostics()
response = AgentDiagnosticsResponse(
agents_with_active_executions=diagnostics.agents_with_active_executions,
timestamp=diagnostics.timestamp,
)
logger.info(
f"Agent diagnostics: with_active_executions={diagnostics.agents_with_active_executions}"
)
return response
@router.get(
"/diagnostics/executions/running",
response_model=RunningExecutionsListResponse,
summary="List Running Executions",
)
async def list_running_executions(
limit: int = 100,
offset: int = 0,
):
"""
Get detailed list of running and queued executions (recent, likely active).
Args:
limit: Maximum number of executions to return (default 100)
offset: Number of executions to skip (default 0)
Returns:
List of running executions with details
"""
logger.info(f"Listing running executions (limit={limit}, offset={offset})")
executions = await get_running_executions_details(limit=limit, offset=offset)
# Get total count for pagination
diagnostics = await get_execution_diagnostics()
total = diagnostics.running_count + diagnostics.queued_db_count
return RunningExecutionsListResponse(executions=executions, total=total)
@router.get(
"/diagnostics/executions/orphaned",
response_model=RunningExecutionsListResponse,
summary="List Orphaned Executions",
)
async def list_orphaned_executions(
limit: int = 100,
offset: int = 0,
):
"""
Get detailed list of orphaned executions (>24h old, likely not in executor).
Args:
limit: Maximum number of executions to return (default 100)
offset: Number of executions to skip (default 0)
Returns:
List of orphaned executions with details
"""
logger.info(f"Listing orphaned executions (limit={limit}, offset={offset})")
executions = await get_orphaned_executions_details(limit=limit, offset=offset)
# Get total count for pagination
diagnostics = await get_execution_diagnostics()
total = diagnostics.orphaned_running + diagnostics.orphaned_queued
return RunningExecutionsListResponse(executions=executions, total=total)
@router.get(
"/diagnostics/executions/failed",
response_model=FailedExecutionsListResponse,
summary="List Failed Executions",
)
async def list_failed_executions(
limit: int = 100,
offset: int = 0,
hours: int = 24,
):
"""
Get detailed list of failed executions.
Args:
limit: Maximum number of executions to return (default 100)
offset: Number of executions to skip (default 0)
hours: Number of hours to look back (default 24)
Returns:
List of failed executions with error details
"""
logger.info(
f"Listing failed executions (limit={limit}, offset={offset}, hours={hours})"
)
executions = await get_failed_executions_details(
limit=limit, offset=offset, hours=hours
)
    # Count the actual total for the given hours window (for pagination)
    total = await get_failed_executions_count(hours=hours)
return FailedExecutionsListResponse(executions=executions, total=total)
@router.get(
"/diagnostics/executions/long-running",
response_model=RunningExecutionsListResponse,
summary="List Long-Running Executions",
)
async def list_long_running_executions(
limit: int = 100,
offset: int = 0,
):
"""
Get detailed list of long-running executions (RUNNING status >24h).
Args:
limit: Maximum number of executions to return (default 100)
offset: Number of executions to skip (default 0)
Returns:
List of long-running executions with details
"""
logger.info(f"Listing long-running executions (limit={limit}, offset={offset})")
executions = await get_long_running_executions_details(limit=limit, offset=offset)
# Get total count for pagination
diagnostics = await get_execution_diagnostics()
total = diagnostics.stuck_running_24h
return RunningExecutionsListResponse(executions=executions, total=total)
@router.get(
"/diagnostics/executions/stuck-queued",
response_model=RunningExecutionsListResponse,
summary="List Stuck Queued Executions",
)
async def list_stuck_queued_executions(
limit: int = 100,
offset: int = 0,
):
"""
Get detailed list of stuck queued executions (QUEUED >1h, never started).
Args:
limit: Maximum number of executions to return (default 100)
offset: Number of executions to skip (default 0)
Returns:
List of stuck queued executions with details
"""
logger.info(f"Listing stuck queued executions (limit={limit}, offset={offset})")
executions = await get_stuck_queued_executions_details(limit=limit, offset=offset)
# Get total count for pagination
diagnostics = await get_execution_diagnostics()
total = diagnostics.stuck_queued_1h
return RunningExecutionsListResponse(executions=executions, total=total)
@router.get(
"/diagnostics/executions/invalid",
response_model=RunningExecutionsListResponse,
summary="List Invalid Executions",
)
async def list_invalid_executions(
limit: int = 100,
offset: int = 0,
):
"""
Get detailed list of executions in invalid states (READ-ONLY).
Invalid states indicate data corruption and require manual investigation:
- QUEUED but has startedAt (impossible - can't start while queued)
- RUNNING but no startedAt (impossible - can't run without starting)
⚠️ NO BULK ACTIONS PROVIDED - These need case-by-case investigation.
Each invalid execution likely has a different root cause (crashes, race conditions,
DB corruption). Investigate the execution history and logs to determine appropriate
action (manual cleanup, status fix, or leave as-is if system recovered).
Args:
limit: Maximum number of executions to return (default 100)
offset: Number of executions to skip (default 0)
Returns:
List of invalid state executions with details
"""
logger.info(f"Listing invalid state executions (limit={limit}, offset={offset})")
executions = await get_invalid_executions_details(limit=limit, offset=offset)
# Get total count for pagination
diagnostics = await get_execution_diagnostics()
total = (
diagnostics.invalid_queued_with_start
+ diagnostics.invalid_running_without_start
)
return RunningExecutionsListResponse(executions=executions, total=total)
@router.post(
"/diagnostics/executions/requeue",
response_model=RequeueExecutionResponse,
summary="Requeue Stuck Execution",
)
async def requeue_single_execution(
request: StopExecutionRequest, # Reuse same request model (has execution_id)
user: AuthUser = Security(requires_admin_user),
):
"""
Requeue a stuck QUEUED execution (admin only).
    Uses add_graph_execution with the existing graph_exec_id to requeue.
⚠️ WARNING: Only use for stuck executions. This will re-execute and may cost credits.
Args:
request: Contains execution_id to requeue
Returns:
Success status and message
"""
logger.info(f"Admin {user.user_id} requeueing execution {request.execution_id}")
# Get the execution (validation - must be QUEUED)
executions = await get_graph_executions(
graph_exec_id=request.execution_id,
statuses=[AgentExecutionStatus.QUEUED],
)
if not executions:
raise HTTPException(
status_code=404,
detail="Execution not found or not in QUEUED status",
)
execution = executions[0]
# Use add_graph_execution in requeue mode
await add_graph_execution(
graph_id=execution.graph_id,
user_id=execution.user_id,
graph_version=execution.graph_version,
graph_exec_id=request.execution_id, # Requeue existing execution
)
return RequeueExecutionResponse(
success=True,
requeued_count=1,
message="Execution requeued successfully",
)
@router.post(
"/diagnostics/executions/requeue-bulk",
response_model=RequeueExecutionResponse,
summary="Requeue Multiple Stuck Executions",
)
async def requeue_multiple_executions(
request: StopExecutionsRequest, # Reuse same request model (has execution_ids)
user: AuthUser = Security(requires_admin_user),
):
"""
Requeue multiple stuck QUEUED executions (admin only).
    Uses add_graph_execution with the existing graph_exec_id to requeue.
⚠️ WARNING: Only use for stuck executions. This will re-execute and may cost credits.
Args:
request: Contains list of execution_ids to requeue
Returns:
Number of executions requeued and success message
"""
logger.info(
f"Admin {user.user_id} requeueing {len(request.execution_ids)} executions"
)
# Get executions by ID list (must be QUEUED)
executions = await get_graph_executions(
execution_ids=request.execution_ids,
statuses=[AgentExecutionStatus.QUEUED],
)
if not executions:
return RequeueExecutionResponse(
success=False,
requeued_count=0,
message="No QUEUED executions found to requeue",
)
# Requeue all executions in parallel using add_graph_execution
    async def requeue_one(execution) -> bool:
        try:
            await add_graph_execution(
                graph_id=execution.graph_id,
                user_id=execution.user_id,
                graph_version=execution.graph_version,
                graph_exec_id=execution.id,  # Requeue existing
            )
            return True
        except Exception as e:
            logger.error(f"Failed to requeue {execution.id}: {e}")
            return False
    results = await asyncio.gather(
        *[requeue_one(execution) for execution in executions], return_exceptions=False
    )
requeued_count = sum(1 for success in results if success)
return RequeueExecutionResponse(
success=requeued_count > 0,
requeued_count=requeued_count,
message=f"Requeued {requeued_count} of {len(request.execution_ids)} executions",
)
@router.post(
"/diagnostics/executions/stop",
response_model=StopExecutionResponse,
summary="Stop Single Execution",
)
async def stop_single_execution(
request: StopExecutionRequest,
user: AuthUser = Security(requires_admin_user),
):
"""
Stop a single execution (admin only).
    Uses the robust stop_graph_execution, which cascades to children and waits for termination.
Args:
request: Contains execution_id to stop
Returns:
Success status and message
"""
logger.info(f"Admin {user.user_id} stopping execution {request.execution_id}")
# Get the execution to find its owner user_id (required by stop_graph_execution)
executions = await get_graph_executions(
graph_exec_id=request.execution_id,
)
if not executions:
raise HTTPException(status_code=404, detail="Execution not found")
execution = executions[0]
# Use robust stop_graph_execution (cascades to children, waits for termination)
await stop_graph_execution(
user_id=execution.user_id,
graph_exec_id=request.execution_id,
wait_timeout=15.0,
cascade=True,
)
return StopExecutionResponse(
success=True,
stopped_count=1,
message="Execution stopped successfully",
)
@router.post(
"/diagnostics/executions/stop-bulk",
response_model=StopExecutionResponse,
summary="Stop Multiple Executions",
)
async def stop_multiple_executions(
request: StopExecutionsRequest,
user: AuthUser = Security(requires_admin_user),
):
"""
Stop multiple active executions (admin only).
    Uses the robust stop_graph_execution, which cascades to children and waits for termination.
Args:
request: Contains list of execution_ids to stop
Returns:
Number of executions stopped and success message
"""
logger.info(
f"Admin {user.user_id} stopping {len(request.execution_ids)} executions"
)
# Get executions by ID list
executions = await get_graph_executions(
execution_ids=request.execution_ids,
)
if not executions:
return StopExecutionResponse(
success=False,
stopped_count=0,
message="No executions found",
)
# Stop all executions in parallel using robust stop_graph_execution
    async def stop_one(execution) -> bool:
        try:
            await stop_graph_execution(
                user_id=execution.user_id,
                graph_exec_id=execution.id,
                wait_timeout=15.0,
                cascade=True,
            )
            return True
        except Exception as e:
            logger.error(f"Failed to stop execution {execution.id}: {e}")
            return False
    results = await asyncio.gather(
        *[stop_one(execution) for execution in executions], return_exceptions=False
    )
stopped_count = sum(1 for success in results if success)
return StopExecutionResponse(
success=stopped_count > 0,
stopped_count=stopped_count,
message=f"Stopped {stopped_count} of {len(request.execution_ids)} executions",
)
@router.post(
"/diagnostics/executions/cleanup-orphaned",
response_model=StopExecutionResponse,
summary="Cleanup Orphaned Executions",
)
async def cleanup_orphaned_executions(
request: StopExecutionsRequest,
user: AuthUser = Security(requires_admin_user),
):
"""
    Clean up orphaned executions by directly updating their DB status (admin only).
For executions in DB but not actually running in executor (old/stale records).
Args:
request: Contains list of execution_ids to cleanup
Returns:
Number of executions cleaned up and success message
"""
logger.info(
f"Admin {user.user_id} cleaning up {len(request.execution_ids)} orphaned executions"
)
cleaned_count = await cleanup_orphaned_executions_bulk(
request.execution_ids, user.user_id
)
return StopExecutionResponse(
success=cleaned_count > 0,
stopped_count=cleaned_count,
message=f"Cleaned up {cleaned_count} of {len(request.execution_ids)} orphaned executions",
)
# ============================================================================
# SCHEDULE DIAGNOSTICS ENDPOINTS
# ============================================================================
class SchedulesListResponse(BaseModel):
"""Response model for list of schedules"""
schedules: List[ScheduleDetail]
total: int
class OrphanedSchedulesListResponse(BaseModel):
"""Response model for list of orphaned schedules"""
schedules: List[OrphanedScheduleDetail]
total: int
class ScheduleCleanupResponse(BaseModel):
"""Response model for schedule cleanup operations"""
success: bool
deleted_count: int = 0
message: str
@router.get(
"/diagnostics/schedules",
response_model=ScheduleHealthMetrics,
summary="Get Schedule Diagnostics",
)
async def get_schedule_diagnostics_endpoint():
"""
Get comprehensive diagnostic information about schedule health.
Returns schedule metrics including:
- Total schedules (user vs system)
- Orphaned schedules by category
- Upcoming executions
"""
logger.info("Getting schedule diagnostics")
diagnostics = await get_schedule_health_metrics()
logger.info(
f"Schedule diagnostics: total={diagnostics.total_schedules}, "
f"user={diagnostics.user_schedules}, "
f"orphaned={diagnostics.total_orphaned}"
)
return diagnostics
@router.get(
"/diagnostics/schedules/all",
response_model=SchedulesListResponse,
summary="List All User Schedules",
)
async def list_all_schedules(
limit: int = 100,
offset: int = 0,
):
"""
Get detailed list of all user schedules (excludes system monitoring jobs).
Args:
limit: Maximum number of schedules to return (default 100)
offset: Number of schedules to skip (default 0)
Returns:
List of schedules with details
"""
logger.info(f"Listing all schedules (limit={limit}, offset={offset})")
schedules = await get_all_schedules_details(limit=limit, offset=offset)
# Get total count
diagnostics = await get_schedule_health_metrics()
total = diagnostics.user_schedules
return SchedulesListResponse(schedules=schedules, total=total)
@router.get(
"/diagnostics/schedules/orphaned",
response_model=OrphanedSchedulesListResponse,
summary="List Orphaned Schedules",
)
async def list_orphaned_schedules():
"""
Get detailed list of orphaned schedules with orphan reasons.
Returns:
List of orphaned schedules categorized by orphan type
"""
logger.info("Listing orphaned schedules")
schedules = await get_orphaned_schedules_details()
return OrphanedSchedulesListResponse(schedules=schedules, total=len(schedules))
@router.post(
"/diagnostics/schedules/cleanup-orphaned",
response_model=ScheduleCleanupResponse,
summary="Cleanup Orphaned Schedules",
)
async def cleanup_orphaned_schedules(
request: StopExecutionsRequest, # Reuse for schedule_ids list
user: AuthUser = Security(requires_admin_user),
):
"""
    Clean up orphaned schedules by deleting them from the scheduler (admin only).
Args:
        request: Contains the schedule IDs to delete (carried in the execution_ids field)
Returns:
Number of schedules deleted and success message
"""
logger.info(
f"Admin {user.user_id} cleaning up {len(request.execution_ids)} orphaned schedules"
)
deleted_count = await cleanup_orphaned_schedules_bulk(
request.execution_ids, user.user_id
)
return ScheduleCleanupResponse(
success=deleted_count > 0,
deleted_count=deleted_count,
message=f"Deleted {deleted_count} of {len(request.execution_ids)} orphaned schedules",
)
@router.post(
"/diagnostics/executions/stop-all-long-running",
response_model=StopExecutionResponse,
summary="Stop ALL Long-Running Executions",
)
async def stop_all_long_running_executions_endpoint(
user: AuthUser = Security(requires_admin_user),
):
"""
Stop ALL long-running executions (RUNNING >24h) by sending cancel signals (admin only).
Operates on entire dataset, not limited to pagination.
Returns:
Number of executions stopped and success message
"""
logger.info(f"Admin {user.user_id} stopping ALL long-running executions")
stopped_count = await stop_all_long_running_executions(user.user_id)
return StopExecutionResponse(
success=stopped_count > 0,
stopped_count=stopped_count,
message=f"Stopped {stopped_count} long-running executions",
)
@router.post(
"/diagnostics/executions/cleanup-all-orphaned",
response_model=StopExecutionResponse,
summary="Cleanup ALL Orphaned Executions",
)
async def cleanup_all_orphaned_executions(
user: AuthUser = Security(requires_admin_user),
):
"""
    Clean up ALL orphaned executions (>24h old) by directly updating their DB status.
Operates on all executions, not just paginated results.
Returns:
Number of executions cleaned up and success message
"""
logger.info(f"Admin {user.user_id} cleaning up ALL orphaned executions")
# Fetch all orphaned execution IDs
execution_ids = await get_all_orphaned_execution_ids()
if not execution_ids:
return StopExecutionResponse(
success=True,
stopped_count=0,
message="No orphaned executions to cleanup",
)
cleaned_count = await cleanup_orphaned_executions_bulk(execution_ids, user.user_id)
return StopExecutionResponse(
success=cleaned_count > 0,
stopped_count=cleaned_count,
message=f"Cleaned up {cleaned_count} orphaned executions",
)
@router.post(
"/diagnostics/executions/cleanup-all-stuck-queued",
response_model=StopExecutionResponse,
summary="Cleanup ALL Stuck Queued Executions",
)
async def cleanup_all_stuck_queued_executions_endpoint(
user: AuthUser = Security(requires_admin_user),
):
"""
    Clean up ALL stuck queued executions (QUEUED >1h) by updating their DB status (admin only).
Operates on entire dataset, not limited to pagination.
Returns:
Number of executions cleaned up and success message
"""
logger.info(f"Admin {user.user_id} cleaning up ALL stuck queued executions")
cleaned_count = await cleanup_all_stuck_queued_executions(user.user_id)
return StopExecutionResponse(
success=cleaned_count > 0,
stopped_count=cleaned_count,
message=f"Cleaned up {cleaned_count} stuck queued executions",
)
@router.post(
"/diagnostics/executions/requeue-all-stuck",
response_model=RequeueExecutionResponse,
summary="Requeue ALL Stuck Queued Executions",
)
async def requeue_all_stuck_executions(
user: AuthUser = Security(requires_admin_user),
):
"""
Requeue ALL stuck queued executions (QUEUED >1h) by publishing to RabbitMQ.
Operates on all executions, not just paginated results.
    Uses add_graph_execution with the existing graph_exec_id to requeue.
⚠️ WARNING: This will re-execute ALL stuck executions and may cost significant credits.
Returns:
Number of executions requeued and success message
"""
logger.info(f"Admin {user.user_id} requeueing ALL stuck queued executions")
# Fetch all stuck queued execution IDs
execution_ids = await get_all_stuck_queued_execution_ids()
if not execution_ids:
return RequeueExecutionResponse(
success=True,
requeued_count=0,
message="No stuck queued executions to requeue",
)
# Get stuck executions by ID list (must be QUEUED)
executions = await get_graph_executions(
execution_ids=execution_ids,
statuses=[AgentExecutionStatus.QUEUED],
)
# Requeue all in parallel using add_graph_execution
    async def requeue_one(execution) -> bool:
        try:
            await add_graph_execution(
                graph_id=execution.graph_id,
                user_id=execution.user_id,
                graph_version=execution.graph_version,
                graph_exec_id=execution.id,  # Requeue existing
            )
            return True
        except Exception as e:
            logger.error(f"Failed to requeue {execution.id}: {e}")
            return False
    results = await asyncio.gather(
        *[requeue_one(execution) for execution in executions], return_exceptions=False
    )
requeued_count = sum(1 for success in results if success)
return RequeueExecutionResponse(
success=requeued_count > 0,
requeued_count=requeued_count,
message=f"Requeued {requeued_count} stuck executions",
)

@@ -0,0 +1,336 @@
from datetime import datetime, timezone
from unittest.mock import AsyncMock
import fastapi
import fastapi.testclient
import pytest
import pytest_mock
from autogpt_libs.auth.jwt_utils import get_jwt_payload
from prisma.enums import AgentExecutionStatus
import backend.server.v2.admin.diagnostics_admin_routes as diagnostics_admin_routes
from backend.data.diagnostics import ExecutionDiagnosticsSummary, RunningExecutionDetail
from backend.data.execution import GraphExecutionMeta
app = fastapi.FastAPI()
app.include_router(diagnostics_admin_routes.router)
client = fastapi.testclient.TestClient(app)
@pytest.fixture(autouse=True)
def setup_app_admin_auth(mock_jwt_admin):
"""Setup admin auth overrides for all tests in this module"""
app.dependency_overrides[get_jwt_payload] = mock_jwt_admin["get_jwt_payload"]
yield
app.dependency_overrides.clear()
def test_get_execution_diagnostics_success(
mocker: pytest_mock.MockFixture,
):
"""Test fetching execution diagnostics with invalid state detection"""
mock_diagnostics = ExecutionDiagnosticsSummary(
running_count=10,
queued_db_count=5,
rabbitmq_queue_depth=3,
cancel_queue_depth=0,
orphaned_running=2,
orphaned_queued=1,
failed_count_1h=5,
failed_count_24h=20,
failure_rate_24h=0.83,
stuck_running_24h=1,
stuck_running_1h=3,
oldest_running_hours=26.5,
stuck_queued_1h=2,
queued_never_started=1,
invalid_queued_with_start=1, # New invalid state
invalid_running_without_start=1, # New invalid state
completed_1h=50,
completed_24h=1200,
throughput_per_hour=50.0,
timestamp=datetime.now(timezone.utc).isoformat(),
)
mocker.patch(
"backend.server.v2.admin.diagnostics_admin_routes.get_execution_diagnostics",
return_value=mock_diagnostics,
)
response = client.get("/admin/diagnostics/executions")
assert response.status_code == 200
data = response.json()
# Verify new invalid state fields are included
assert data["invalid_queued_with_start"] == 1
assert data["invalid_running_without_start"] == 1
# Verify all expected fields present
assert "running_executions" in data
assert "orphaned_running" in data
assert "failed_count_24h" in data
def test_list_invalid_executions(
mocker: pytest_mock.MockFixture,
):
"""Test listing executions in invalid states (read-only endpoint)"""
mock_invalid_executions = [
RunningExecutionDetail(
execution_id="exec-invalid-1",
graph_id="graph-123",
graph_name="Test Graph",
graph_version=1,
user_id="user-123",
user_email="test@example.com",
status="QUEUED",
created_at=datetime.now(timezone.utc),
started_at=datetime.now(
timezone.utc
), # QUEUED but has startedAt - INVALID!
queue_status=None,
),
RunningExecutionDetail(
execution_id="exec-invalid-2",
graph_id="graph-456",
graph_name="Another Graph",
graph_version=2,
user_id="user-456",
user_email="user@example.com",
status="RUNNING",
created_at=datetime.now(timezone.utc),
started_at=None, # RUNNING but no startedAt - INVALID!
queue_status=None,
),
]
mock_diagnostics = ExecutionDiagnosticsSummary(
running_count=10,
queued_db_count=5,
rabbitmq_queue_depth=3,
cancel_queue_depth=0,
orphaned_running=0,
orphaned_queued=0,
failed_count_1h=0,
failed_count_24h=0,
failure_rate_24h=0.0,
stuck_running_24h=0,
stuck_running_1h=0,
oldest_running_hours=None,
stuck_queued_1h=0,
queued_never_started=0,
invalid_queued_with_start=1,
invalid_running_without_start=1,
completed_1h=0,
completed_24h=0,
throughput_per_hour=0.0,
timestamp=datetime.now(timezone.utc).isoformat(),
)
mocker.patch(
"backend.server.v2.admin.diagnostics_admin_routes.get_invalid_executions_details",
return_value=mock_invalid_executions,
)
mocker.patch(
"backend.server.v2.admin.diagnostics_admin_routes.get_execution_diagnostics",
return_value=mock_diagnostics,
)
response = client.get("/admin/diagnostics/executions/invalid?limit=100&offset=0")
assert response.status_code == 200
data = response.json()
assert data["total"] == 2 # Sum of both invalid state types
assert len(data["executions"]) == 2
# Verify both types of invalid states are returned
assert data["executions"][0]["execution_id"] in [
"exec-invalid-1",
"exec-invalid-2",
]
assert data["executions"][1]["execution_id"] in [
"exec-invalid-1",
"exec-invalid-2",
]
def test_requeue_single_execution_with_add_graph_execution(
mocker: pytest_mock.MockFixture,
admin_user_id: str,
):
"""Test requeueing uses add_graph_execution in requeue mode"""
mock_exec_meta = GraphExecutionMeta(
id="exec-stuck-123",
user_id="user-123",
graph_id="graph-456",
graph_version=1,
inputs=None,
credential_inputs=None,
nodes_input_masks=None,
preset_id=None,
status=AgentExecutionStatus.QUEUED,
started_at=datetime.now(timezone.utc),
ended_at=datetime.now(timezone.utc),
stats=None,
)
mocker.patch(
"backend.server.v2.admin.diagnostics_admin_routes.get_graph_executions",
return_value=[mock_exec_meta],
)
mock_add_graph_execution = mocker.patch(
"backend.server.v2.admin.diagnostics_admin_routes.add_graph_execution",
return_value=AsyncMock(),
)
response = client.post(
"/admin/diagnostics/executions/requeue",
json={"execution_id": "exec-stuck-123"},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["requeued_count"] == 1
# Verify it used add_graph_execution in requeue mode
mock_add_graph_execution.assert_called_once()
call_kwargs = mock_add_graph_execution.call_args.kwargs
assert call_kwargs["graph_exec_id"] == "exec-stuck-123" # Requeue mode!
assert call_kwargs["graph_id"] == "graph-456"
assert call_kwargs["user_id"] == "user-123"
def test_stop_single_execution_with_stop_graph_execution(
mocker: pytest_mock.MockFixture,
admin_user_id: str,
):
"""Test stopping uses robust stop_graph_execution"""
mock_exec_meta = GraphExecutionMeta(
id="exec-running-123",
user_id="user-789",
graph_id="graph-999",
graph_version=2,
inputs=None,
credential_inputs=None,
nodes_input_masks=None,
preset_id=None,
status=AgentExecutionStatus.RUNNING,
started_at=datetime.now(timezone.utc),
ended_at=datetime.now(timezone.utc),
stats=None,
)
mocker.patch(
"backend.server.v2.admin.diagnostics_admin_routes.get_graph_executions",
return_value=[mock_exec_meta],
)
mock_stop_graph_execution = mocker.patch(
"backend.server.v2.admin.diagnostics_admin_routes.stop_graph_execution",
return_value=AsyncMock(),
)
response = client.post(
"/admin/diagnostics/executions/stop",
json={"execution_id": "exec-running-123"},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["stopped_count"] == 1
# Verify it used stop_graph_execution with cascade
mock_stop_graph_execution.assert_called_once()
call_kwargs = mock_stop_graph_execution.call_args.kwargs
assert call_kwargs["graph_exec_id"] == "exec-running-123"
assert call_kwargs["user_id"] == "user-789"
assert call_kwargs["cascade"] is True # Stops children too!
assert call_kwargs["wait_timeout"] == 15.0
def test_requeue_not_queued_execution_fails(
mocker: pytest_mock.MockFixture,
):
"""Test that requeue fails if execution is not in QUEUED status"""
# Mock an execution that's RUNNING (not QUEUED)
mocker.patch(
"backend.server.v2.admin.diagnostics_admin_routes.get_graph_executions",
return_value=[], # No QUEUED executions found
)
response = client.post(
"/admin/diagnostics/executions/requeue",
json={"execution_id": "exec-running-123"},
)
assert response.status_code == 404
assert "not found or not in QUEUED status" in response.json()["detail"]
def test_list_invalid_executions_no_bulk_actions(
mocker: pytest_mock.MockFixture,
):
"""Verify invalid executions endpoint is read-only (no bulk actions)"""
# This is a documentation test - the endpoint exists but should not
# have corresponding cleanup/stop/requeue endpoints
# These endpoints should NOT exist for invalid states:
invalid_bulk_endpoints = [
"/admin/diagnostics/executions/cleanup-invalid",
"/admin/diagnostics/executions/stop-invalid",
"/admin/diagnostics/executions/requeue-invalid",
]
for endpoint in invalid_bulk_endpoints:
response = client.post(endpoint, json={"execution_ids": ["test"]})
assert response.status_code == 404, f"{endpoint} should not exist (read-only)"
def test_execution_ids_filter_efficiency(
mocker: pytest_mock.MockFixture,
):
"""Test that bulk operations use efficient execution_ids filter"""
mock_exec_metas = [
GraphExecutionMeta(
id=f"exec-{i}",
user_id=f"user-{i}",
graph_id="graph-123",
graph_version=1,
inputs=None,
credential_inputs=None,
nodes_input_masks=None,
preset_id=None,
status=AgentExecutionStatus.QUEUED,
started_at=datetime.now(timezone.utc),
ended_at=datetime.now(timezone.utc),
stats=None,
)
for i in range(3)
]
mock_get_graph_executions = mocker.patch(
"backend.server.v2.admin.diagnostics_admin_routes.get_graph_executions",
return_value=mock_exec_metas,
)
mocker.patch(
"backend.server.v2.admin.diagnostics_admin_routes.add_graph_execution",
return_value=AsyncMock(),
)
response = client.post(
"/admin/diagnostics/executions/requeue-bulk",
json={"execution_ids": ["exec-0", "exec-1", "exec-2"]},
)
assert response.status_code == 200
# Verify it used execution_ids filter (not fetching all queued)
mock_get_graph_executions.assert_called_once()
call_kwargs = mock_get_graph_executions.call_args.kwargs
assert "execution_ids" in call_kwargs
assert call_kwargs["execution_ids"] == ["exec-0", "exec-1", "exec-2"]
assert call_kwargs["statuses"] == [AgentExecutionStatus.QUEUED]
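The route tests above lean on `unittest.mock.AsyncMock` plus `call_args.kwargs` to verify not only that the robust helpers were invoked, but with which keyword arguments. A self-contained sketch of that assertion style (names mirror the routes, but the wiring here is illustrative):

```python
import asyncio
from unittest.mock import AsyncMock

# Hypothetical stand-in for the patched stop_graph_execution.
stop_graph_execution = AsyncMock(return_value=None)

async def admin_stop(execution: dict) -> None:
    # Mirrors the route's call shape: owner user_id, cascade, bounded wait.
    await stop_graph_execution(
        user_id=execution["user_id"],
        graph_exec_id=execution["id"],
        wait_timeout=15.0,
        cascade=True,
    )

asyncio.run(admin_stop({"id": "exec-1", "user_id": "user-1"}))

# Inspect exactly what the mock received.
stop_graph_execution.assert_called_once()
kwargs = stop_graph_execution.call_args.kwargs
assert kwargs["cascade"] is True
assert kwargs["graph_exec_id"] == "exec-1"
```

Asserting on `call_args.kwargs` rather than just `assert_called_once()` is what lets the tests catch a regression such as dropping `cascade=True`.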

@@ -14,3 +14,70 @@ class UserHistoryResponse(BaseModel):
class AddUserCreditsResponse(BaseModel):
    new_balance: int
    transaction_key: str


class ExecutionDiagnosticsResponse(BaseModel):
    """Response model for execution diagnostics"""

    # Current execution state
    running_executions: int
    queued_executions_db: int
    queued_executions_rabbitmq: int
    cancel_queue_depth: int
    # Orphaned execution detection
    orphaned_running: int
    orphaned_queued: int
    # Failure metrics
    failed_count_1h: int
    failed_count_24h: int
    failure_rate_24h: float
    # Long-running detection
    stuck_running_24h: int
    stuck_running_1h: int
    oldest_running_hours: float | None
    # Stuck queued detection
    stuck_queued_1h: int
    queued_never_started: int
    # Invalid state detection (data corruption - no auto-actions)
    invalid_queued_with_start: int
    invalid_running_without_start: int
    # Throughput metrics
    completed_1h: int
    completed_24h: int
    throughput_per_hour: float
    timestamp: str


class AgentDiagnosticsResponse(BaseModel):
    """Response model for agent diagnostics"""

    agents_with_active_executions: int
    timestamp: str


class ScheduleHealthMetrics(BaseModel):
    """Response model for schedule diagnostics"""

    total_schedules: int
    user_schedules: int
    system_schedules: int
    # Orphan detection
    orphaned_deleted_graph: int
    orphaned_no_library_access: int
    orphaned_invalid_credentials: int
    orphaned_validation_failed: int
    total_orphaned: int
    # Upcoming
    schedules_next_hour: int
    total_runs_next_hour: int
    schedules_next_24h: int
    total_runs_next_24h: int
    timestamp: str
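The derived fields in `ExecutionDiagnosticsResponse` (`failure_rate_24h`, `throughput_per_hour`) are presumably simple per-hour averages over the 24h counts. A minimal sketch assuming that definition (the function name is hypothetical):

```python
def derive_hourly_rates(
    failed_count_24h: int, completed_24h: int, window_hours: float = 24.0
) -> tuple[float, float]:
    """Convert 24h totals into per-hour rates (assumed definition).

    Returns (failure_rate_24h, throughput_per_hour).
    """
    return failed_count_24h / window_hours, completed_24h / window_hours
```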

View File

@@ -0,0 +1,579 @@
"use client";
import { useState } from "react";
import { Button } from "@/components/atoms/Button/Button";
import { Card } from "@/components/atoms/Card/Card";
import {
CardContent,
CardDescription,
CardHeader,
CardTitle,
} from "@/components/__legacy__/ui/card";
import { ArrowClockwise } from "@phosphor-icons/react";
import { ErrorCard } from "@/components/molecules/ErrorCard/ErrorCard";
import { useDiagnosticsContent } from "./useDiagnosticsContent";
import { ExecutionsTable } from "./ExecutionsTable";
import { SchedulesTable } from "./SchedulesTable";
export function DiagnosticsContent() {
const {
executionData,
agentData,
scheduleData,
isLoading,
isError,
error,
refresh,
} = useDiagnosticsContent();
const [activeTab, setActiveTab] = useState<
"all" | "orphaned" | "failed" | "long-running" | "stuck-queued" | "invalid"
>("all");
if (isLoading && !executionData && !agentData) {
return (
<div className="flex h-64 items-center justify-center">
<div className="text-center">
<ArrowClockwise className="mx-auto h-8 w-8 animate-spin text-gray-400" />
<p className="mt-2 text-gray-500">Loading diagnostics...</p>
</div>
</div>
);
}
if (isError) {
return (
<ErrorCard
httpError={error as any}
onRetry={refresh}
context="diagnostics"
/>
);
}
return (
<div className="space-y-6">
<div className="flex items-center justify-between">
<div>
<h1 className="text-3xl font-bold">System Diagnostics</h1>
<p className="text-gray-500">
Monitor execution and agent system health
</p>
</div>
<Button
onClick={refresh}
disabled={isLoading}
variant="outline"
size="small"
>
<ArrowClockwise
className={`mr-2 h-4 w-4 ${isLoading ? "animate-spin" : ""}`}
/>
Refresh
</Button>
</div>
{/* Alert Cards for Critical Issues */}
<div className="grid gap-4 md:grid-cols-3">
{executionData && (
<>
{/* Orphaned Executions Alert */}
{(executionData.orphaned_running > 0 ||
executionData.orphaned_queued > 0) && (
<div
className="cursor-pointer transition-all hover:scale-105"
onClick={() => setActiveTab("orphaned")}
>
<Card className="border-orange-300 bg-orange-50">
<CardHeader className="pb-3">
<CardTitle className="text-orange-800">
Orphaned Executions
</CardTitle>
</CardHeader>
<CardContent>
<p className="text-3xl font-bold text-orange-900">
{executionData.orphaned_running +
executionData.orphaned_queued}
</p>
<p className="text-sm text-orange-700">
{executionData.orphaned_running} running,{" "}
{executionData.orphaned_queued} queued ({">"}24h old)
</p>
<p className="mt-2 text-xs text-orange-600">
Click to view
</p>
</CardContent>
</Card>
</div>
)}
{/* Failed Executions Alert */}
{executionData.failed_count_24h > 0 && (
<div
className="cursor-pointer transition-all hover:scale-105"
onClick={() => setActiveTab("failed")}
>
<Card className="border-red-300 bg-red-50">
<CardHeader className="pb-3">
<CardTitle className="text-red-800">
Failed Executions (24h)
</CardTitle>
</CardHeader>
<CardContent>
<p className="text-3xl font-bold text-red-900">
{executionData.failed_count_24h}
</p>
<p className="text-sm text-red-700">
{executionData.failed_count_1h} in last hour (
{executionData.failure_rate_24h.toFixed(1)}/hr rate)
</p>
<p className="mt-2 text-xs text-red-600">Click to view</p>
</CardContent>
</Card>
</div>
)}
{/* Long-Running Alert */}
{executionData.stuck_running_24h > 0 && (
<div
className="cursor-pointer transition-all hover:scale-105"
onClick={() => setActiveTab("long-running")}
>
<Card className="border-yellow-300 bg-yellow-50">
<CardHeader className="pb-3">
<CardTitle className="text-yellow-800">
Long-Running Executions
</CardTitle>
</CardHeader>
<CardContent>
<p className="text-3xl font-bold text-yellow-900">
{executionData.stuck_running_24h}
</p>
<p className="text-sm text-yellow-700">
Running {">"}24h (oldest:{" "}
{executionData.oldest_running_hours
? `${Math.floor(executionData.oldest_running_hours)}h`
: "N/A"}
)
</p>
<p className="mt-2 text-xs text-yellow-600">
Click to view
</p>
</CardContent>
</Card>
</div>
)}
{/* Orphaned Schedules Alert */}
{scheduleData && scheduleData.total_orphaned > 0 && (
<div
className="cursor-pointer transition-all hover:scale-105"
onClick={() => setActiveTab("all")}
>
<Card className="border-purple-300 bg-purple-50">
<CardHeader className="pb-3">
<CardTitle className="text-purple-800">
Orphaned Schedules
</CardTitle>
</CardHeader>
<CardContent>
<p className="text-3xl font-bold text-purple-900">
{scheduleData.total_orphaned}
</p>
<p className="text-sm text-purple-700">
{scheduleData.orphaned_deleted_graph > 0 &&
`${scheduleData.orphaned_deleted_graph} deleted graph, `}
{scheduleData.orphaned_no_library_access > 0 &&
`${scheduleData.orphaned_no_library_access} no access`}
</p>
<p className="mt-2 text-xs text-purple-600">
Click to view schedules
</p>
</CardContent>
</Card>
</div>
)}
{/* Invalid State Alert */}
{(executionData.invalid_queued_with_start > 0 ||
executionData.invalid_running_without_start > 0) && (
<div
className="cursor-pointer transition-all hover:scale-105"
onClick={() => setActiveTab("invalid")}
>
<Card className="border-pink-300 bg-pink-50">
<CardHeader className="pb-3">
<CardTitle className="text-pink-800">
Invalid States (Data Corruption)
</CardTitle>
</CardHeader>
<CardContent>
<p className="text-3xl font-bold text-pink-900">
{executionData.invalid_queued_with_start +
executionData.invalid_running_without_start}
</p>
<p className="text-sm text-pink-700">
Requires manual investigation
</p>
<p className="mt-2 text-xs text-pink-600">
Click to view (read-only)
</p>
</CardContent>
</Card>
</div>
)}
</>
)}
</div>
<div className="grid gap-6 md:grid-cols-3">
<Card>
<CardHeader>
<CardTitle>Execution Queue Status</CardTitle>
<CardDescription>
Current execution and queue metrics
</CardDescription>
</CardHeader>
<CardContent>
{executionData ? (
<div className="space-y-4">
<div className="flex items-center justify-between rounded-lg border p-4">
<div>
<p className="text-sm font-medium text-gray-500">
Running Executions
</p>
<p className="text-3xl font-bold">
{executionData.running_executions}
</p>
</div>
<div className="flex h-12 w-12 items-center justify-center rounded-full bg-green-100">
<div className="h-6 w-6 rounded-full bg-green-500"></div>
</div>
</div>
<div className="flex items-center justify-between rounded-lg border p-4">
<div>
<p className="text-sm font-medium text-gray-500">
Queued in Database
</p>
<p className="text-3xl font-bold">
{executionData.queued_executions_db}
</p>
{executionData.stuck_queued_1h > 0 && (
<p className="text-xs text-orange-600">
{executionData.stuck_queued_1h} stuck {">"}1h
</p>
)}
</div>
<div className="flex h-12 w-12 items-center justify-center rounded-full bg-blue-100">
<div className="h-6 w-6 rounded-full bg-blue-500"></div>
</div>
</div>
<div className="flex items-center justify-between rounded-lg border p-4">
<div>
<p className="text-sm font-medium text-gray-500">
Queued in RabbitMQ
</p>
<p className="text-3xl font-bold">
{executionData.queued_executions_rabbitmq === -1 ? (
<span className="text-xl text-red-500">Error</span>
) : (
executionData.queued_executions_rabbitmq
)}
</p>
</div>
<div
className={`flex h-12 w-12 items-center justify-center rounded-full ${
executionData.queued_executions_rabbitmq === -1
? "bg-red-100"
: "bg-yellow-100"
}`}
>
<div
className={`h-6 w-6 rounded-full ${
executionData.queued_executions_rabbitmq === -1
? "bg-red-500"
: "bg-yellow-500"
}`}
></div>
</div>
</div>
<div className="text-xs text-gray-400">
Last updated:{" "}
{new Date(executionData.timestamp).toLocaleString()}
</div>
</div>
) : (
<p className="text-gray-500">No data available</p>
)}
</CardContent>
</Card>
<Card>
<CardHeader>
<CardTitle>System Throughput</CardTitle>
<CardDescription>
Execution completion and processing rates
</CardDescription>
</CardHeader>
<CardContent>
{executionData ? (
<div className="space-y-4">
<div className="flex items-center justify-between rounded-lg border p-4">
<div>
<p className="text-sm font-medium text-gray-500">
Completed (24h)
</p>
<p className="text-3xl font-bold">
{executionData.completed_24h}
</p>
<p className="text-xs text-gray-600">
{executionData.completed_1h} in last hour
</p>
</div>
<div className="flex h-12 w-12 items-center justify-center rounded-full bg-green-100">
<div className="h-6 w-6 rounded-full bg-green-500"></div>
</div>
</div>
<div className="flex items-center justify-between rounded-lg border p-4">
<div>
<p className="text-sm font-medium text-gray-500">
Throughput Rate
</p>
<p className="text-3xl font-bold">
{executionData.throughput_per_hour.toFixed(1)}
</p>
<p className="text-xs text-gray-600">
completions per hour
</p>
</div>
<div className="flex h-12 w-12 items-center justify-center rounded-full bg-blue-100">
<div className="h-6 w-6 rounded-full bg-blue-500"></div>
</div>
</div>
<div className="flex items-center justify-between rounded-lg border p-4">
<div>
<p className="text-sm font-medium text-gray-500">
Cancel Queue Depth
</p>
<p className="text-3xl font-bold">
{executionData.cancel_queue_depth === -1 ? (
<span className="text-xl text-red-500">Error</span>
) : (
executionData.cancel_queue_depth
)}
</p>
</div>
<div className="flex h-12 w-12 items-center justify-center rounded-full bg-purple-100">
<div className="h-6 w-6 rounded-full bg-purple-500"></div>
</div>
</div>
<div className="text-xs text-gray-400">
Last updated:{" "}
{new Date(executionData.timestamp).toLocaleString()}
</div>
</div>
) : (
<p className="text-gray-500">No data available</p>
)}
</CardContent>
</Card>
<Card>
<CardHeader>
<CardTitle>Schedules</CardTitle>
<CardDescription>
Scheduled agent executions and health
</CardDescription>
</CardHeader>
<CardContent>
{scheduleData ? (
<div className="space-y-4">
<div className="flex items-center justify-between rounded-lg border p-4">
<div>
<p className="text-sm font-medium text-gray-500">
User Schedules
</p>
<p className="text-3xl font-bold">
{scheduleData.user_schedules}
</p>
{scheduleData.total_orphaned > 0 && (
<p className="text-xs text-orange-600">
{scheduleData.total_orphaned} orphaned
</p>
)}
</div>
<div className="flex h-12 w-12 items-center justify-center rounded-full bg-purple-100">
<div className="h-6 w-6 rounded-full bg-purple-500"></div>
</div>
</div>
<div className="flex items-center justify-between rounded-lg border p-4">
<div>
<p className="text-sm font-medium text-gray-500">
Upcoming Runs (1h)
</p>
<p className="text-3xl font-bold">
{scheduleData.total_runs_next_hour}
</p>
<p className="text-xs text-gray-600">
from {scheduleData.schedules_next_hour} schedule
{scheduleData.schedules_next_hour !== 1 ? "s" : ""}
</p>
</div>
<div className="flex h-12 w-12 items-center justify-center rounded-full bg-blue-100">
<div className="h-6 w-6 rounded-full bg-blue-500"></div>
</div>
</div>
<div className="flex items-center justify-between rounded-lg border p-4">
<div>
<p className="text-sm font-medium text-gray-500">
Upcoming Runs (24h)
</p>
<p className="text-3xl font-bold">
{scheduleData.total_runs_next_24h}
</p>
<p className="text-xs text-gray-600">
from {scheduleData.schedules_next_24h} schedule
{scheduleData.schedules_next_24h !== 1 ? "s" : ""}
</p>
</div>
<div className="flex h-12 w-12 items-center justify-center rounded-full bg-green-100">
<div className="h-6 w-6 rounded-full bg-green-500"></div>
</div>
</div>
<div className="text-xs text-gray-400">
Last updated:{" "}
{new Date(scheduleData.timestamp).toLocaleString()}
</div>
</div>
) : (
<p className="text-gray-500">No data available</p>
)}
</CardContent>
</Card>
</div>
<Card>
<CardHeader>
<CardTitle>Diagnostic Information</CardTitle>
<CardDescription>
Understanding metrics and tabs for on-call diagnostics
</CardDescription>
</CardHeader>
<CardContent>
<div className="space-y-3 text-sm">
<div>
<p className="font-semibold text-orange-700">
🟠 Orphaned Executions:
</p>
<p className="text-gray-600">
Executions {">"}24h old in the database but not actually running in
the executor. Usually caused by executor restarts/crashes. Safe to
clean up (marks them as FAILED in the DB).
</p>
</div>
<div>
<p className="font-semibold text-blue-700">
🔵 Stuck Queued Executions:
</p>
<p className="text-gray-600">
QUEUED {">"}1h but never started. Not in the RabbitMQ queue. Can be
cleaned up (safe) or requeued (costs credits; only requeue after a
temporary issue such as a RabbitMQ purge).
</p>
</div>
<div>
<p className="font-semibold text-yellow-700">
🟡 Long-Running Executions:
</p>
<p className="text-gray-600">
RUNNING status {">"}24h. May be legitimately long jobs or stuck ones.
Review before stopping; stopping sends a cancel signal to the executor.
</p>
</div>
<div>
<p className="font-semibold text-red-700">
🔴 Failed Executions:
</p>
<p className="text-gray-600">
Executions that failed in last 24h. View error messages to
identify patterns. Spike in failures indicates system issues.
</p>
</div>
<div>
<p className="font-semibold text-pink-700">
🩷 Invalid States (Data Corruption):
</p>
<p className="text-gray-600">
Executions in impossible states (QUEUED with startedAt, RUNNING
without startedAt). Indicates DB corruption, race conditions, or
crashes. Each requires manual investigation - no bulk actions
provided.
</p>
</div>
<div>
<p className="font-semibold">Throughput Metrics:</p>
<p className="text-gray-600">
Completions per hour shows system productivity. Declining
throughput indicates performance degradation or executor issues.
</p>
</div>
<div>
<p className="font-semibold">Queue Health:</p>
<p className="text-gray-600">
RabbitMQ queue depths should be low ({"<"}100). High depths indicate
the executor can&apos;t keep up; a cancel-queue backlog indicates
executor processing issues.
</p>
</div>
</div>
</CardContent>
</Card>
{/* Add Executions Table with tab counts */}
<ExecutionsTable
onRefresh={refresh}
initialTab={activeTab}
onTabChange={setActiveTab}
diagnosticsData={
executionData
? {
orphaned_running: executionData.orphaned_running,
orphaned_queued: executionData.orphaned_queued,
failed_count_24h: executionData.failed_count_24h,
stuck_running_24h: executionData.stuck_running_24h,
stuck_queued_1h: executionData.stuck_queued_1h,
invalid_queued_with_start:
executionData.invalid_queued_with_start,
invalid_running_without_start:
executionData.invalid_running_without_start,
}
: undefined
}
/>
{/* Add Schedules Table */}
<SchedulesTable
onRefresh={refresh}
diagnosticsData={
scheduleData
? {
total_orphaned: scheduleData.total_orphaned,
user_schedules: scheduleData.user_schedules,
}
: undefined
}
/>
</div>
);
}

View File

@@ -0,0 +1,424 @@
"use client";
import { Button } from "@/components/atoms/Button/Button";
import { Card } from "@/components/atoms/Card/Card";
import {
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogTitle,
} from "@/components/__legacy__/ui/dialog";
import { toast } from "@/components/molecules/Toast/use-toast";
import { ArrowClockwise, Trash, Copy } from "@phosphor-icons/react";
import React, { useState } from "react";
import {
Table,
TableHeader,
TableBody,
TableHead,
TableRow,
TableCell,
} from "@/components/__legacy__/ui/table";
import { Checkbox } from "@/components/__legacy__/ui/checkbox";
import {
CardHeader,
CardTitle,
CardContent,
} from "@/components/__legacy__/ui/card";
import {
useGetV2ListAllUserSchedules,
useGetV2ListOrphanedSchedules,
usePostV2CleanupOrphanedSchedules,
} from "@/app/api/__generated__/endpoints/admin/admin";
import {
TabsLine,
TabsLineContent,
TabsLineList,
TabsLineTrigger,
} from "@/components/molecules/TabsLine/TabsLine";
interface ScheduleDetail {
schedule_id: string;
schedule_name: string;
graph_id: string;
graph_name: string;
graph_version: number;
user_id: string;
user_email: string | null;
cron: string;
timezone: string;
next_run_time: string;
}
interface OrphanedScheduleDetail {
schedule_id: string;
schedule_name: string;
graph_id: string;
graph_version: number;
user_id: string;
orphan_reason: string;
error_detail: string | null;
next_run_time: string;
}
interface SchedulesTableProps {
onRefresh?: () => void;
diagnosticsData?: {
total_orphaned: number;
user_schedules: number;
};
}
export function SchedulesTable({
onRefresh,
diagnosticsData,
}: SchedulesTableProps) {
const [activeTab, setActiveTab] = useState<"all" | "orphaned">("all");
const [selectedIds, setSelectedIds] = useState<Set<string>>(new Set());
const [showDeleteDialog, setShowDeleteDialog] = useState(false);
const [currentPage, setCurrentPage] = useState(1);
const [pageSize] = useState(10);
// Fetch data based on active tab
const allSchedulesQuery = useGetV2ListAllUserSchedules(
{
limit: pageSize,
offset: (currentPage - 1) * pageSize,
},
{ query: { enabled: activeTab === "all" } },
);
const orphanedSchedulesQuery = useGetV2ListOrphanedSchedules({
query: { enabled: activeTab === "orphaned" },
});
const activeQuery =
activeTab === "orphaned" ? orphanedSchedulesQuery : allSchedulesQuery;
const {
data: schedulesResponse,
isLoading,
error: _error,
refetch,
} = activeQuery;
const schedules =
(schedulesResponse?.data as any)?.schedules || ([] as any[]);
const total = (schedulesResponse?.data as any)?.total || 0;
// Cleanup mutation
const { mutateAsync: cleanupOrphanedSchedules, isPending: isDeleting } =
usePostV2CleanupOrphanedSchedules();
const handleSelectAll = (checked: boolean) => {
if (checked) {
setSelectedIds(new Set(schedules.map((s: any) => s.schedule_id)));
} else {
setSelectedIds(new Set());
}
};
const handleSelectSchedule = (id: string, checked: boolean) => {
const newSelected = new Set(selectedIds);
if (checked) {
newSelected.add(id);
} else {
newSelected.delete(id);
}
setSelectedIds(newSelected);
};
const confirmDelete = () => {
setShowDeleteDialog(true);
};
const handleDelete = async () => {
setShowDeleteDialog(false);
try {
const idsToDelete =
activeTab === "orphaned" && selectedIds.size === 0
? schedules.map((s: any) => s.schedule_id)
: Array.from(selectedIds);
const result = await cleanupOrphanedSchedules({
data: { execution_ids: idsToDelete }, // Reuses execution_ids field name
});
toast({
title: "Success",
description:
(result.data as any)?.message ||
`Deleted ${(result.data as any)?.deleted_count || 0} schedule(s)`,
});
setSelectedIds(new Set());
await refetch();
if (onRefresh) onRefresh();
} catch (error: any) {
console.error("Error deleting schedules:", error);
toast({
title: "Error",
description: error.message || "Failed to delete schedules",
variant: "destructive",
});
}
};
const totalPages = Math.ceil(total / pageSize);
return (
<>
<Card>
<TabsLine
value={activeTab}
onValueChange={(v) => setActiveTab(v as any)}
>
<CardHeader>
<div className="flex items-center justify-between">
<CardTitle>Schedules</CardTitle>
<div className="flex gap-2">
{activeTab === "orphaned" && schedules.length > 0 && (
<Button
variant="destructive"
size="small"
onClick={confirmDelete}
disabled={isDeleting}
>
<Trash className="mr-2 h-4 w-4" />
Delete All Orphaned ({total})
</Button>
)}
{selectedIds.size > 0 && (
<Button
variant="destructive"
size="small"
onClick={confirmDelete}
disabled={isDeleting}
>
<Trash className="mr-2 h-4 w-4" />
Delete Selected ({selectedIds.size})
</Button>
)}
<Button
variant="outline"
size="small"
onClick={() => {
refetch();
if (onRefresh) onRefresh();
}}
disabled={isLoading}
>
<ArrowClockwise
className={`h-4 w-4 ${isLoading ? "animate-spin" : ""}`}
/>
</Button>
</div>
</div>
<TabsLineList className="px-6">
<TabsLineTrigger value="all">
All Schedules
{diagnosticsData && ` (${diagnosticsData.user_schedules})`}
</TabsLineTrigger>
<TabsLineTrigger value="orphaned">
Orphaned
{diagnosticsData && ` (${diagnosticsData.total_orphaned})`}
</TabsLineTrigger>
</TabsLineList>
</CardHeader>
<TabsLineContent value={activeTab}>
<CardContent>
{isLoading && schedules.length === 0 ? (
<div className="flex h-32 items-center justify-center">
<ArrowClockwise className="h-6 w-6 animate-spin text-gray-400" />
</div>
) : schedules.length === 0 ? (
<div className="py-8 text-center text-gray-500">
No schedules found
</div>
) : (
<Table>
<TableHeader>
<TableRow>
<TableHead className="w-12">
<Checkbox
checked={
selectedIds.size === schedules.length &&
schedules.length > 0
}
onCheckedChange={handleSelectAll}
/>
</TableHead>
<TableHead>Name</TableHead>
<TableHead>Graph</TableHead>
<TableHead>User</TableHead>
<TableHead>Cron</TableHead>
<TableHead>Next Run</TableHead>
{activeTab === "orphaned" && (
<TableHead>Orphan Reason</TableHead>
)}
</TableRow>
</TableHeader>
<TableBody>
{schedules.map((schedule: any) => {
const isOrphaned = activeTab === "orphaned";
return (
<TableRow
key={schedule.schedule_id}
className={isOrphaned ? "bg-purple-50" : ""}
>
<TableCell>
<Checkbox
checked={selectedIds.has(schedule.schedule_id)}
onCheckedChange={(checked) =>
handleSelectSchedule(
schedule.schedule_id,
checked as boolean,
)
}
/>
</TableCell>
<TableCell>{schedule.schedule_name}</TableCell>
<TableCell>
<div>{schedule.graph_name || "Unknown"}</div>
<div className="font-mono text-xs text-gray-500">
v{schedule.graph_version}
</div>
</TableCell>
<TableCell>
<div>
{(schedule as ScheduleDetail).user_email || (
<span className="text-gray-400">Unknown</span>
)}
</div>
<div
className="group flex cursor-pointer items-center gap-1 font-mono text-xs text-gray-500 hover:text-gray-700"
onClick={() => {
navigator.clipboard.writeText(schedule.user_id);
toast({
title: "Copied",
description: "User ID copied to clipboard",
});
}}
title="Click to copy user ID"
>
{schedule.user_id.substring(0, 8)}...
<Copy className="h-3 w-3 opacity-0 transition-opacity group-hover:opacity-100" />
</div>
</TableCell>
<TableCell>
<code className="rounded bg-gray-100 px-2 py-1 text-xs">
{schedule.cron}
</code>
<div className="text-xs text-gray-500">
{schedule.timezone}
</div>
</TableCell>
<TableCell>
{schedule.next_run_time
? new Date(
schedule.next_run_time,
).toLocaleString()
: "Not scheduled"}
</TableCell>
{activeTab === "orphaned" && (
<TableCell>
<span className="text-xs text-purple-600">
{(
schedule as OrphanedScheduleDetail
).orphan_reason?.replace(/_/g, " ") ||
"unknown"}
</span>
</TableCell>
)}
</TableRow>
);
})}
</TableBody>
</Table>
)}
{totalPages > 1 && activeTab === "all" && (
<div className="mt-4 flex items-center justify-between">
<div className="text-sm text-gray-600">
Showing {(currentPage - 1) * pageSize + 1} to{" "}
{Math.min(currentPage * pageSize, total)} of {total}{" "}
schedules
</div>
<div className="flex gap-2">
<Button
variant="outline"
size="small"
onClick={() => setCurrentPage(currentPage - 1)}
disabled={currentPage === 1}
>
Previous
</Button>
<div className="flex items-center px-3">
Page {currentPage} of {totalPages}
</div>
<Button
variant="outline"
size="small"
onClick={() => setCurrentPage(currentPage + 1)}
disabled={currentPage === totalPages}
>
Next
</Button>
</div>
</div>
)}
</CardContent>
</TabsLineContent>
</TabsLine>
</Card>
<Dialog open={showDeleteDialog} onOpenChange={setShowDeleteDialog}>
<DialogContent>
<DialogHeader>
<DialogTitle>Confirm Delete Schedules</DialogTitle>
<DialogDescription>
{activeTab === "orphaned" && selectedIds.size === 0 ? (
<>
Are you sure you want to delete ALL {total} orphaned
schedules?
<br />
<br />
These schedules reference deleted graphs or graphs the user no
longer has access to. Deleting them is safe.
</>
) : (
<>
Are you sure you want to delete {selectedIds.size} selected
schedule(s)?
<br />
<br />
This will permanently remove the schedules from the system.
</>
)}
</DialogDescription>
</DialogHeader>
<DialogFooter>
<Button
variant="outline"
onClick={() => setShowDeleteDialog(false)}
>
Cancel
</Button>
<Button
variant="destructive"
onClick={handleDelete}
className="bg-red-600 hover:bg-red-700"
>
Delete Schedules
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
</>
);
}

View File

@@ -0,0 +1,63 @@
import {
useGetV2GetExecutionDiagnostics,
useGetV2GetAgentDiagnostics,
useGetV2GetScheduleDiagnostics,
} from "@/app/api/__generated__/endpoints/admin/admin";
import type { ExecutionDiagnosticsResponse } from "@/app/api/__generated__/models/executionDiagnosticsResponse";
import type { AgentDiagnosticsResponse } from "@/app/api/__generated__/models/agentDiagnosticsResponse";
import type { ScheduleHealthMetrics } from "@/app/api/__generated__/models/scheduleHealthMetrics";
export function useDiagnosticsContent() {
const {
data: executionResponse,
isLoading: isLoadingExecutions,
isError: isExecutionError,
error: executionError,
refetch: refetchExecutions,
} = useGetV2GetExecutionDiagnostics();
const {
data: agentResponse,
isLoading: isLoadingAgents,
isError: isAgentError,
error: agentError,
refetch: refetchAgents,
} = useGetV2GetAgentDiagnostics();
const {
data: scheduleResponse,
isLoading: isLoadingSchedules,
isError: isScheduleError,
error: scheduleError,
refetch: refetchSchedules,
} = useGetV2GetScheduleDiagnostics();
const isLoading =
isLoadingExecutions || isLoadingAgents || isLoadingSchedules;
const isError = isExecutionError || isAgentError || isScheduleError;
const error = executionError || agentError || scheduleError;
const executionData = executionResponse?.data as
| ExecutionDiagnosticsResponse
| undefined;
const agentData = agentResponse?.data as AgentDiagnosticsResponse | undefined;
const scheduleData = scheduleResponse?.data as
| ScheduleHealthMetrics
| undefined;
const refresh = () => {
refetchExecutions();
refetchAgents();
refetchSchedules();
};
return {
executionData,
agentData,
scheduleData,
isLoading,
isError,
error,
refresh,
};
}

View File

@@ -0,0 +1,17 @@
import { withRoleAccess } from "@/lib/withRoleAccess";
import { DiagnosticsContent } from "./components/DiagnosticsContent";
function AdminDiagnostics() {
return (
<div className="mx-auto p-6">
<DiagnosticsContent />
</div>
);
}
export default async function AdminDiagnosticsPage() {
"use server";
const withAdminAccess = await withRoleAccess(["admin"]);
const ProtectedAdminDiagnostics = await withAdminAccess(AdminDiagnostics);
return <ProtectedAdminDiagnostics />;
}

View File

@@ -1,5 +1,6 @@
import { Sidebar } from "@/components/__legacy__/Sidebar";
import { Users, DollarSign, UserSearch, FileText } from "lucide-react";
import { PulseIcon } from "@phosphor-icons/react/ssr";
import { IconSliders } from "@/components/__legacy__/ui/icons";
@@ -16,6 +17,11 @@ const sidebarLinkGroups = [
href: "/admin/spending",
icon: <DollarSign className="h-6 w-6" />,
},
{
text: "System Diagnostics",
href: "/admin/diagnostics",
icon: <PulseIcon className="h-6 w-6" />,
},
{
text: "User Impersonation",
href: "/admin/impersonation",

File diff suppressed because it is too large

View File

@@ -1074,6 +1074,7 @@ export type AddUserCreditsResponse = {
new_balance: number;
transaction_key: string;
};
const _stringFormatToDataTypeMap: Partial<Record<string, DataType>> = {
date: DataType.DATE,
time: DataType.TIME,