From 81a8e6f558c0c260aa564ee446041fd1b29502d1 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Thu, 5 Feb 2026 05:42:07 +0000 Subject: [PATCH] refactor(diagnostics): use model functions and clean up code - Move all internal/local imports to top-level - Remove unnecessary try-except blocks that just re-raise - Replace direct Prisma count queries with get_graph_executions_count - Add updated_time_gte/lte filter to get_graph_executions_count for querying failed/completed executions by update time - Fix N+1 query in cleanup_orphaned_schedules_bulk by fetching all schedules once before the loop - Extract SYSTEM_JOB_IDS constant to module level to avoid duplication Co-authored-by: Nicholas Tindle --- .../backend/backend/data/diagnostics.py | 1491 ++++++++--------- .../backend/backend/data/execution.py | 10 + 2 files changed, 692 insertions(+), 809 deletions(-) diff --git a/autogpt_platform/backend/backend/data/diagnostics.py b/autogpt_platform/backend/backend/data/diagnostics.py index 2f9a9f641e..f5d62a83b1 100644 --- a/autogpt_platform/backend/backend/data/diagnostics.py +++ b/autogpt_platform/backend/backend/data/diagnostics.py @@ -3,27 +3,40 @@ Diagnostics data layer for admin operations. Provides functions to query and manage system diagnostics including executions and agents. """ +import asyncio import logging from datetime import datetime, timedelta, timezone from typing import List, Optional +from croniter import croniter from prisma.enums import AgentExecutionStatus -from prisma.models import AgentGraphExecution +from prisma.models import AgentGraph, AgentGraphExecution, LibraryAgent from pydantic import BaseModel from backend.data.execution import get_graph_executions, get_graph_executions_count from backend.data.rabbitmq import SyncRabbitMQ from backend.executor.utils import ( GRAPH_EXECUTION_CANCEL_EXCHANGE, + GRAPH_EXECUTION_CANCEL_QUEUE_NAME, GRAPH_EXECUTION_QUEUE_NAME, CancelExecutionEvent, create_execution_queue_config, ) -from backend.util.clients import get_async_execution_queue +from backend.util.clients import get_async_execution_queue, get_scheduler_client logger = logging.getLogger(__name__) +# System job IDs (exclude from user schedule counts) +SYSTEM_JOB_IDS = { + "cleanup_expired_files", + "report_late_executions", + "report_block_error_rates", + "process_existing_batches", + "process_weekly_summary", +} + + class RunningExecutionDetail(BaseModel): """Details about a running execution for admin view""" @@ -164,148 +177,137 @@ async def get_execution_diagnostics() -> ExecutionDiagnosticsSummary: Returns: ExecutionDiagnosticsSummary with current execution state """ - try: - now = datetime.now(timezone.utc) - one_hour_ago = now - timedelta(hours=1) - twenty_four_hours_ago = now - timedelta(hours=24) + now = datetime.now(timezone.utc) + one_hour_ago = now - timedelta(hours=1) + twenty_four_hours_ago = now - timedelta(hours=24) - # Get running executions count - running_count = await get_graph_executions_count( - statuses=[AgentExecutionStatus.RUNNING] - ) + # Get running executions count + running_count = await get_graph_executions_count( + statuses=[AgentExecutionStatus.RUNNING] + ) - # Get queued executions from database - queued_db_count = await get_graph_executions_count( - statuses=[AgentExecutionStatus.QUEUED] - ) + # Get queued executions from database + queued_db_count = await get_graph_executions_count( + statuses=[AgentExecutionStatus.QUEUED] + ) - # Get RabbitMQ queue depths (both execution and cancel queues) - rabbitmq_queue_depth = get_rabbitmq_queue_depth() - cancel_queue_depth = get_rabbitmq_cancel_queue_depth() + # Get RabbitMQ queue depths (both execution and cancel queues) + rabbitmq_queue_depth = get_rabbitmq_queue_depth() + cancel_queue_depth = get_rabbitmq_cancel_queue_depth() - # Orphaned execution detection (>24h old, likely not in executor) - orphaned_running = await get_graph_executions_count( - statuses=[AgentExecutionStatus.RUNNING], - created_time_lte=twenty_four_hours_ago, - ) + # Orphaned execution detection (>24h old, likely not in executor) + orphaned_running = await get_graph_executions_count( + statuses=[AgentExecutionStatus.RUNNING], + created_time_lte=twenty_four_hours_ago, + ) - orphaned_queued = await get_graph_executions_count( - statuses=[AgentExecutionStatus.QUEUED], - created_time_lte=twenty_four_hours_ago, - ) + orphaned_queued = await get_graph_executions_count( + statuses=[AgentExecutionStatus.QUEUED], + created_time_lte=twenty_four_hours_ago, + ) - # Failure metrics - failed_count_1h = await AgentGraphExecution.prisma().count( - where={ - "executionStatus": AgentExecutionStatus.FAILED, - "updatedAt": {"gte": one_hour_ago}, - } - ) + # Failure metrics - use updated_time filter for "failed within time window" + failed_count_1h = await get_graph_executions_count( + statuses=[AgentExecutionStatus.FAILED], + updated_time_gte=one_hour_ago, + ) - failed_count_24h = await AgentGraphExecution.prisma().count( - where={ - "executionStatus": AgentExecutionStatus.FAILED, - "updatedAt": {"gte": twenty_four_hours_ago}, - } - ) + failed_count_24h = await get_graph_executions_count( + statuses=[AgentExecutionStatus.FAILED], + updated_time_gte=twenty_four_hours_ago, + ) - failure_rate_24h = failed_count_24h / 24.0 if failed_count_24h > 0 else 0.0 + failure_rate_24h = failed_count_24h / 24.0 if failed_count_24h > 0 else 0.0 - # Long-running detection (started running >24h ago, still running) - stuck_running_24h = await get_graph_executions_count( - statuses=[AgentExecutionStatus.RUNNING], - started_time_lte=twenty_four_hours_ago, - ) + # Long-running detection (started running >24h ago, still running) + stuck_running_24h = await get_graph_executions_count( + statuses=[AgentExecutionStatus.RUNNING], + started_time_lte=twenty_four_hours_ago, + ) - stuck_running_1h = await get_graph_executions_count( - statuses=[AgentExecutionStatus.RUNNING], - started_time_lte=one_hour_ago, - ) + stuck_running_1h = await get_graph_executions_count( + statuses=[AgentExecutionStatus.RUNNING], + started_time_lte=one_hour_ago, + ) - # Find oldest running execution (by when it started running) - oldest_running_list = await get_graph_executions( - statuses=[AgentExecutionStatus.RUNNING], - order_by="startedAt", - order_direction="asc", - limit=1, - ) + # Find oldest running execution (by when it started running) + oldest_running_list = await get_graph_executions( + statuses=[AgentExecutionStatus.RUNNING], + order_by="startedAt", + order_direction="asc", + limit=1, + ) - oldest_running_hours = None - if oldest_running_list and oldest_running_list[0].started_at: - age_seconds = (now - oldest_running_list[0].started_at).total_seconds() - oldest_running_hours = age_seconds / 3600.0 + oldest_running_hours = None + if oldest_running_list and oldest_running_list[0].started_at: + age_seconds = (now - oldest_running_list[0].started_at).total_seconds() + oldest_running_hours = age_seconds / 3600.0 - # Stuck queued detection - stuck_queued_1h = await get_graph_executions_count( - statuses=[AgentExecutionStatus.QUEUED], - created_time_lte=one_hour_ago, - ) + # Stuck queued detection + stuck_queued_1h = await get_graph_executions_count( + statuses=[AgentExecutionStatus.QUEUED], + created_time_lte=one_hour_ago, + ) - queued_never_started = await AgentGraphExecution.prisma().count( - where={ - "executionStatus": AgentExecutionStatus.QUEUED, - "startedAt": None, - } - ) + # These queries need direct Prisma access for null checks which aren't in model functions + queued_never_started = await AgentGraphExecution.prisma().count( + where={ + "executionStatus": AgentExecutionStatus.QUEUED, + "startedAt": None, + } + ) - # Invalid state detection (data corruption - requires manual investigation) - invalid_queued_with_start = await AgentGraphExecution.prisma().count( - where={ - "executionStatus": AgentExecutionStatus.QUEUED, - # ideally this is not None but thats not a valid query - "startedAt": {"lte": now}, - } - ) + # Invalid state detection (data corruption - requires manual investigation) + # QUEUED but has startedAt - use "not None" check + invalid_queued_with_start = await AgentGraphExecution.prisma().count( + where={ + "executionStatus": AgentExecutionStatus.QUEUED, + "startedAt": {"not": None}, + } + ) - invalid_running_without_start = await AgentGraphExecution.prisma().count( - where={ - "executionStatus": AgentExecutionStatus.RUNNING, - "startedAt": None, - } - ) + invalid_running_without_start = await AgentGraphExecution.prisma().count( + where={ + "executionStatus": AgentExecutionStatus.RUNNING, + "startedAt": None, + } + ) - # Throughput metrics - completed_1h = await AgentGraphExecution.prisma().count( - where={ - "executionStatus": AgentExecutionStatus.COMPLETED, - "updatedAt": {"gte": one_hour_ago}, - } - ) + # Throughput metrics - use updated_time filter for "completed within time window" + completed_1h = await get_graph_executions_count( + statuses=[AgentExecutionStatus.COMPLETED], + updated_time_gte=one_hour_ago, + ) - completed_24h = await AgentGraphExecution.prisma().count( - where={ - "executionStatus": AgentExecutionStatus.COMPLETED, - "updatedAt": {"gte": twenty_four_hours_ago}, - } - ) + completed_24h = await get_graph_executions_count( + statuses=[AgentExecutionStatus.COMPLETED], + updated_time_gte=twenty_four_hours_ago, + ) - throughput_per_hour = completed_24h / 24.0 if completed_24h > 0 else 0.0 + throughput_per_hour = completed_24h / 24.0 if completed_24h > 0 else 0.0 - return ExecutionDiagnosticsSummary( - running_count=running_count, - queued_db_count=queued_db_count, - rabbitmq_queue_depth=rabbitmq_queue_depth, - cancel_queue_depth=cancel_queue_depth, - orphaned_running=orphaned_running, - orphaned_queued=orphaned_queued, - failed_count_1h=failed_count_1h, - failed_count_24h=failed_count_24h, - failure_rate_24h=failure_rate_24h, - stuck_running_24h=stuck_running_24h, - stuck_running_1h=stuck_running_1h, - oldest_running_hours=oldest_running_hours, - stuck_queued_1h=stuck_queued_1h, - queued_never_started=queued_never_started, - invalid_queued_with_start=invalid_queued_with_start, - invalid_running_without_start=invalid_running_without_start, - completed_1h=completed_1h, - completed_24h=completed_24h, - throughput_per_hour=throughput_per_hour, - timestamp=now.isoformat(), - ) - except Exception as e: - logger.error(f"Error getting execution diagnostics: {e}") - raise + return ExecutionDiagnosticsSummary( + running_count=running_count, + queued_db_count=queued_db_count, + rabbitmq_queue_depth=rabbitmq_queue_depth, + cancel_queue_depth=cancel_queue_depth, + orphaned_running=orphaned_running, + orphaned_queued=orphaned_queued, + failed_count_1h=failed_count_1h, + failed_count_24h=failed_count_24h, + failure_rate_24h=failure_rate_24h, + stuck_running_24h=stuck_running_24h, + stuck_running_1h=stuck_running_1h, + oldest_running_hours=oldest_running_hours, + stuck_queued_1h=stuck_queued_1h, + queued_never_started=queued_never_started, + invalid_queued_with_start=invalid_queued_with_start, + invalid_running_without_start=invalid_running_without_start, + completed_1h=completed_1h, + completed_24h=completed_24h, + throughput_per_hour=throughput_per_hour, + timestamp=now.isoformat(), + ) async def get_agent_diagnostics() -> AgentDiagnosticsSummary: @@ -315,24 +317,20 @@ async def get_agent_diagnostics() -> AgentDiagnosticsSummary: Returns: AgentDiagnosticsSummary with agent metrics """ - try: - # Get distinct agent graph IDs with active executions - executions = await AgentGraphExecution.prisma().find_many( - where={ - "executionStatus": { - "in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED] # type: ignore - } - }, - distinct=["agentGraphId"], - ) + # Get distinct agent graph IDs with active executions + executions = await AgentGraphExecution.prisma().find_many( + where={ + "executionStatus": { + "in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED] # type: ignore + } + }, + distinct=["agentGraphId"], + ) - return AgentDiagnosticsSummary( - agents_with_active_executions=len(executions), - timestamp=datetime.now(timezone.utc).isoformat(), - ) - except Exception as e: - logger.error(f"Error getting agent diagnostics: {e}") - raise + return AgentDiagnosticsSummary( + agents_with_active_executions=len(executions), + timestamp=datetime.now(timezone.utc).isoformat(), + ) async def get_schedule_health_metrics() -> ScheduleHealthMetrics: @@ -342,77 +340,60 @@ async def get_schedule_health_metrics() -> ScheduleHealthMetrics: Returns: ScheduleHealthMetrics with schedule health info """ - try: - from backend.util.clients import get_scheduler_client + scheduler = get_scheduler_client() - scheduler = get_scheduler_client() + # Get all schedules from scheduler service + all_schedules = await scheduler.get_execution_schedules() - # System job IDs (exclude from user schedule counts) - SYSTEM_JOB_IDS = { - "cleanup_expired_files", - "report_late_executions", - "report_block_error_rates", - "process_existing_batches", - "process_weekly_summary", - } + # Filter user vs system schedules + user_schedules = [s for s in all_schedules if s.id not in SYSTEM_JOB_IDS] + system_schedules_count = len(all_schedules) - len(user_schedules) - # Get all schedules from scheduler service - all_schedules = await scheduler.get_execution_schedules() + # Detect orphaned schedules + orphans = await _detect_orphaned_schedules(user_schedules) - # Filter user vs system schedules - user_schedules = [s for s in all_schedules if s.id not in SYSTEM_JOB_IDS] - system_schedules_count = len(all_schedules) - len(user_schedules) + # Count schedules by next run time + now = datetime.now(timezone.utc) + one_hour_from_now = now + timedelta(hours=1) + twenty_four_hours_from_now = now + timedelta(hours=24) - # Detect orphaned schedules - orphans = await _detect_orphaned_schedules(user_schedules) + schedules_next_hour = sum( + 1 + for s in user_schedules + if s.next_run_time + and datetime.fromisoformat(s.next_run_time.replace("Z", "+00:00")) + <= one_hour_from_now + ) - # Count schedules by next run time - now = datetime.now(timezone.utc) - one_hour_from_now = now + timedelta(hours=1) - twenty_four_hours_from_now = now + timedelta(hours=24) + schedules_next_24h = sum( + 1 + for s in user_schedules + if s.next_run_time + and datetime.fromisoformat(s.next_run_time.replace("Z", "+00:00")) + <= twenty_four_hours_from_now + ) - schedules_next_hour = sum( - 1 - for s in user_schedules - if s.next_run_time - and datetime.fromisoformat(s.next_run_time.replace("Z", "+00:00")) - <= one_hour_from_now - ) + # Calculate total execution runs (not just unique schedules) + total_runs_next_hour = _calculate_total_runs(user_schedules, now, one_hour_from_now) + total_runs_next_24h = _calculate_total_runs( + user_schedules, now, twenty_four_hours_from_now + ) - schedules_next_24h = sum( - 1 - for s in user_schedules - if s.next_run_time - and datetime.fromisoformat(s.next_run_time.replace("Z", "+00:00")) - <= twenty_four_hours_from_now - ) - - # Calculate total execution runs (not just unique schedules) - total_runs_next_hour = _calculate_total_runs( - user_schedules, now, one_hour_from_now - ) - total_runs_next_24h = _calculate_total_runs( - user_schedules, now, twenty_four_hours_from_now - ) - - return ScheduleHealthMetrics( - total_schedules=len(all_schedules), - user_schedules=len(user_schedules), - system_schedules=system_schedules_count, - orphaned_deleted_graph=len(orphans["deleted_graph"]), - orphaned_no_library_access=len(orphans["no_library_access"]), - orphaned_invalid_credentials=len(orphans["invalid_credentials"]), - orphaned_validation_failed=len(orphans["validation_failed"]), - total_orphaned=sum(len(v) for v in orphans.values()), - schedules_next_hour=schedules_next_hour, - schedules_next_24h=schedules_next_24h, - total_runs_next_hour=total_runs_next_hour, - total_runs_next_24h=total_runs_next_24h, - timestamp=now.isoformat(), - ) - except Exception as e: - logger.error(f"Error getting schedule diagnostics: {e}") - raise + return ScheduleHealthMetrics( + total_schedules=len(all_schedules), + user_schedules=len(user_schedules), + system_schedules=system_schedules_count, + orphaned_deleted_graph=len(orphans["deleted_graph"]), + orphaned_no_library_access=len(orphans["no_library_access"]), + orphaned_invalid_credentials=len(orphans["invalid_credentials"]), + orphaned_validation_failed=len(orphans["validation_failed"]), + total_orphaned=sum(len(v) for v in orphans.values()), + schedules_next_hour=schedules_next_hour, + schedules_next_24h=schedules_next_24h, + total_runs_next_hour=total_runs_next_hour, + total_runs_next_24h=total_runs_next_24h, + timestamp=now.isoformat(), + ) def _calculate_total_runs( @@ -429,8 +410,6 @@ def _calculate_total_runs( Returns: Total number of execution runs across all schedules """ - from croniter import croniter - total_runs = 0 for schedule in schedules: @@ -472,8 +451,6 @@ async def _detect_orphaned_schedules(schedules: list) -> dict: Returns: Dict categorizing orphans by type """ - from prisma.models import AgentGraph, LibraryAgent - orphans = { "deleted_graph": [], "no_library_access": [], @@ -515,8 +492,6 @@ async def _detect_orphaned_schedules(schedules: list) -> dict: # Note: Full credential validation would require integration_creds_manager # For now, skip credential validation to avoid complexity # Orphaned credentials will be caught during execution attempt - # if schedule.input_credentials: - # # TODO: Add credential validation when needed except Exception as e: logger.error(f"Error validating schedule {schedule.id}: {e}") @@ -562,201 +537,6 @@ def get_rabbitmq_queue_depth() -> int: return -1 -async def get_all_schedules_details( - limit: int = 100, offset: int = 0 -) -> List[ScheduleDetail]: - """ - Get detailed information about all user schedules via Scheduler service. - - Args: - limit: Maximum number of schedules to return - offset: Number of schedules to skip - - Returns: - List of ScheduleDetail objects - """ - try: - from prisma.models import AgentGraph - - from backend.util.clients import get_scheduler_client - - scheduler = get_scheduler_client() - - # System job IDs to exclude - SYSTEM_JOB_IDS = { - "cleanup_expired_files", - "report_late_executions", - "report_block_error_rates", - "process_existing_batches", - "process_weekly_summary", - } - - # Get all schedules from scheduler - all_schedules = await scheduler.get_execution_schedules() - - # Filter to user schedules only - user_schedules = [s for s in all_schedules if s.id not in SYSTEM_JOB_IDS] - - # Apply pagination - paginated_schedules = user_schedules[offset : offset + limit] - - # Enrich with graph and user details - results = [] - for schedule in paginated_schedules: - # Get graph name - graph = await AgentGraph.prisma().find_unique( - where={ - "graphVersionId": { - "id": schedule.graph_id, - "version": schedule.graph_version, - } - }, - include={"User": True}, - ) - - graph_name = graph.name if graph and graph.name else "Unknown" - user_email = graph.User.email if graph and graph.User else None - - results.append( - ScheduleDetail( - schedule_id=schedule.id, - schedule_name=schedule.name, - graph_id=schedule.graph_id, - graph_name=graph_name, - graph_version=schedule.graph_version, - user_id=schedule.user_id, - user_email=user_email, - cron=schedule.cron, - timezone=schedule.timezone, - next_run_time=schedule.next_run_time, - ) - ) - - return results - except Exception as e: - logger.error(f"Error getting schedule details: {e}") - raise - - -async def get_orphaned_schedules_details() -> List[OrphanedScheduleDetail]: - """ - Get detailed list of orphaned schedules with orphan reasons. - - Returns: - List of OrphanedScheduleDetail objects - """ - try: - from backend.util.clients import get_scheduler_client - - scheduler = get_scheduler_client() - - # System job IDs to exclude - SYSTEM_JOB_IDS = { - "cleanup_expired_files", - "report_late_executions", - "report_block_error_rates", - "process_existing_batches", - "process_weekly_summary", - } - - # Get all schedules - all_schedules = await scheduler.get_execution_schedules() - user_schedules = [s for s in all_schedules if s.id not in SYSTEM_JOB_IDS] - - # Detect orphans with categorization - orphan_categories = await _detect_orphaned_schedules(user_schedules) - - # Build detailed orphan list - results = [] - for orphan_type, schedule_ids in orphan_categories.items(): - for schedule_id in schedule_ids: - # Find the schedule - schedule = next( - (s for s in user_schedules if s.id == schedule_id), None - ) - if not schedule: - continue - - results.append( - OrphanedScheduleDetail( - schedule_id=schedule.id, - schedule_name=schedule.name, - graph_id=schedule.graph_id, - graph_version=schedule.graph_version, - user_id=schedule.user_id, - orphan_reason=orphan_type, - error_detail=None, # Could add more detail in future - next_run_time=schedule.next_run_time, - ) - ) - - return results - except Exception as e: - logger.error(f"Error getting orphaned schedule details: {e}") - raise - - -async def cleanup_orphaned_schedules_bulk( - schedule_ids: List[str], admin_user_id: str -) -> int: - """ - Cleanup multiple orphaned schedules by deleting from scheduler. - - Args: - schedule_ids: List of schedule IDs to delete - admin_user_id: ID of the admin user performing the operation - - Returns: - Number of schedules successfully deleted - """ - import asyncio - - try: - from backend.util.clients import get_scheduler_client - - logger.info( - f"Admin user {admin_user_id} cleaning up {len(schedule_ids)} orphaned schedules" - ) - - scheduler = get_scheduler_client() - - # Delete schedules in parallel - async def delete_schedule(schedule_id: str) -> bool: - try: - # Note: delete_schedule requires user_id but we're admin - # We'll need to get the user_id from the schedule first - all_schedules = await scheduler.get_execution_schedules() - schedule = next((s for s in all_schedules if s.id == schedule_id), None) - - if not schedule: - logger.warning(f"Schedule {schedule_id} not found") - return False - - await scheduler.delete_schedule( - schedule_id=schedule_id, user_id=schedule.user_id - ) - return True - except Exception as e: - logger.error(f"Failed to delete schedule {schedule_id}: {e}") - return False - - results = await asyncio.gather( - *[delete_schedule(schedule_id) for schedule_id in schedule_ids], - return_exceptions=False, - ) - - deleted_count = sum(1 for success in results if success) - - logger.info( - f"Admin {admin_user_id} deleted {deleted_count}/{len(schedule_ids)} orphaned schedules" - ) - - return deleted_count - except Exception as e: - logger.error(f"Error cleaning up orphaned schedules: {e}") - return 0 - - def get_rabbitmq_cancel_queue_depth() -> int: """ Get the number of messages in the RabbitMQ cancel queue. @@ -765,8 +545,6 @@ def get_rabbitmq_cancel_queue_depth() -> int: Number of messages in cancel queue, or -1 if error """ try: - from backend.executor.utils import GRAPH_EXECUTION_CANCEL_QUEUE_NAME - # Create a temporary connection to query the queue config = create_execution_queue_config() rabbitmq = SyncRabbitMQ(config) @@ -796,6 +574,159 @@ def get_rabbitmq_cancel_queue_depth() -> int: return -1 +async def get_all_schedules_details( + limit: int = 100, offset: int = 0 +) -> List[ScheduleDetail]: + """ + Get detailed information about all user schedules via Scheduler service. + + Args: + limit: Maximum number of schedules to return + offset: Number of schedules to skip + + Returns: + List of ScheduleDetail objects + """ + scheduler = get_scheduler_client() + + # Get all schedules from scheduler + all_schedules = await scheduler.get_execution_schedules() + + # Filter to user schedules only + user_schedules = [s for s in all_schedules if s.id not in SYSTEM_JOB_IDS] + + # Apply pagination + paginated_schedules = user_schedules[offset : offset + limit] + + # Enrich with graph and user details + results = [] + for schedule in paginated_schedules: + # Get graph name + graph = await AgentGraph.prisma().find_unique( + where={ + "graphVersionId": { + "id": schedule.graph_id, + "version": schedule.graph_version, + } + }, + include={"User": True}, + ) + + graph_name = graph.name if graph and graph.name else "Unknown" + user_email = graph.User.email if graph and graph.User else None + + results.append( + ScheduleDetail( + schedule_id=schedule.id, + schedule_name=schedule.name, + graph_id=schedule.graph_id, + graph_name=graph_name, + graph_version=schedule.graph_version, + user_id=schedule.user_id, + user_email=user_email, + cron=schedule.cron, + timezone=schedule.timezone, + next_run_time=schedule.next_run_time, + ) + ) + + return results + + +async def get_orphaned_schedules_details() -> List[OrphanedScheduleDetail]: + """ + Get detailed list of orphaned schedules with orphan reasons. + + Returns: + List of OrphanedScheduleDetail objects + """ + scheduler = get_scheduler_client() + + # Get all schedules + all_schedules = await scheduler.get_execution_schedules() + user_schedules = [s for s in all_schedules if s.id not in SYSTEM_JOB_IDS] + + # Detect orphans with categorization + orphan_categories = await _detect_orphaned_schedules(user_schedules) + + # Build detailed orphan list + results = [] + for orphan_type, schedule_ids in orphan_categories.items(): + for schedule_id in schedule_ids: + # Find the schedule + schedule = next((s for s in user_schedules if s.id == schedule_id), None) + if not schedule: + continue + + results.append( + OrphanedScheduleDetail( + schedule_id=schedule.id, + schedule_name=schedule.name, + graph_id=schedule.graph_id, + graph_version=schedule.graph_version, + user_id=schedule.user_id, + orphan_reason=orphan_type, + error_detail=None, # Could add more detail in future + next_run_time=schedule.next_run_time, + ) + ) + + return results + + +async def cleanup_orphaned_schedules_bulk( + schedule_ids: List[str], admin_user_id: str +) -> int: + """ + Cleanup multiple orphaned schedules by deleting from scheduler. + + Args: + schedule_ids: List of schedule IDs to delete + admin_user_id: ID of the admin user performing the operation + + Returns: + Number of schedules successfully deleted + """ + logger.info( + f"Admin user {admin_user_id} cleaning up {len(schedule_ids)} orphaned schedules" + ) + + scheduler = get_scheduler_client() + + # Fetch all schedules once to avoid N+1 queries + all_schedules = await scheduler.get_execution_schedules() + schedule_map = {s.id: s for s in all_schedules} + + # Delete schedules in parallel + async def delete_schedule(schedule_id: str) -> bool: + schedule = schedule_map.get(schedule_id) + if not schedule: + logger.warning(f"Schedule {schedule_id} not found") + return False + + try: + await scheduler.delete_schedule( + schedule_id=schedule_id, user_id=schedule.user_id + ) + return True + except Exception as e: + logger.error(f"Failed to delete schedule {schedule_id}: {e}") + return False + + results = await asyncio.gather( + *[delete_schedule(schedule_id) for schedule_id in schedule_ids], + return_exceptions=False, + ) + + deleted_count = sum(1 for success in results if success) + + logger.info( + f"Admin {admin_user_id} deleted {deleted_count}/{len(schedule_ids)} orphaned schedules" + ) + + return deleted_count + + async def get_running_executions_details( limit: int = 100, offset: int = 0 ) -> List[RunningExecutionDetail]: @@ -809,47 +740,43 @@ async def get_running_executions_details( Returns: List of RunningExecutionDetail objects """ - try: - executions = await AgentGraphExecution.prisma().find_many( - where={ - "executionStatus": { - "in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED] # type: ignore - } - }, - include={ - "AgentGraph": True, - "User": True, - }, - take=limit, - skip=offset, - order={"createdAt": "desc"}, + executions = await AgentGraphExecution.prisma().find_many( + where={ + "executionStatus": { + "in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED] # type: ignore + } + }, + include={ + "AgentGraph": True, + "User": True, + }, + take=limit, + skip=offset, + order={"createdAt": "desc"}, + ) + + results = [] + for exec in executions: + results.append( + RunningExecutionDetail( + execution_id=exec.id, + graph_id=exec.agentGraphId, + graph_name=( + exec.AgentGraph.name + if exec.AgentGraph and exec.AgentGraph.name + else "Unknown" + ), + graph_version=exec.agentGraphVersion, + user_id=exec.userId, + user_email=exec.User.email if exec.User else None, + status=exec.executionStatus, + created_at=exec.createdAt, + started_at=exec.startedAt, + queue_status=None, # Queue status not available from AgentGraphExecution model + ) ) - results = [] - for exec in executions: - results.append( - RunningExecutionDetail( - execution_id=exec.id, - graph_id=exec.agentGraphId, - graph_name=( - exec.AgentGraph.name - if exec.AgentGraph and exec.AgentGraph.name - else "Unknown" - ), - graph_version=exec.agentGraphVersion, - user_id=exec.userId, - user_email=exec.User.email if exec.User else None, - status=exec.executionStatus, - created_at=exec.createdAt, - started_at=exec.startedAt, - queue_status=None, # Queue status not available from AgentGraphExecution model - ) - ) - - return results - except Exception as e: - logger.error(f"Error getting running execution details: {e}") - raise + return results async def get_orphaned_executions_details( @@ -865,51 +792,47 @@ async def get_orphaned_executions_details( Returns: List of orphaned RunningExecutionDetail objects """ - try: - # Executions older than 24 hours are likely orphaned - cutoff = datetime.now(timezone.utc) - timedelta(hours=24) + # Executions older than 24 hours are likely orphaned + cutoff = datetime.now(timezone.utc) - timedelta(hours=24) - executions = await AgentGraphExecution.prisma().find_many( - where={ - "executionStatus": { - "in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED] # type: ignore - }, - "createdAt": {"lt": cutoff}, + executions = await AgentGraphExecution.prisma().find_many( + where={ + "executionStatus": { + "in": [AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED] # type: ignore }, - include={ - "AgentGraph": True, - "User": True, - }, - take=limit, - skip=offset, - order={"createdAt": "asc"}, # Oldest first for orphaned + "createdAt": {"lt": cutoff}, + }, + include={ + "AgentGraph": True, + "User": True, + }, + take=limit, + skip=offset, + order={"createdAt": "asc"}, # Oldest first for orphaned + ) + + results = [] + for exec in executions: + results.append( + RunningExecutionDetail( + execution_id=exec.id, + graph_id=exec.agentGraphId, + graph_name=( + exec.AgentGraph.name + if exec.AgentGraph and exec.AgentGraph.name + else "Unknown" + ), + graph_version=exec.agentGraphVersion, + user_id=exec.userId, + user_email=exec.User.email if exec.User else None, + status=exec.executionStatus, + created_at=exec.createdAt, + started_at=exec.startedAt, + queue_status=None, + ) ) - results = [] - for exec in executions: - results.append( - RunningExecutionDetail( - execution_id=exec.id, - graph_id=exec.agentGraphId, - graph_name=( - exec.AgentGraph.name - if exec.AgentGraph and exec.AgentGraph.name - else "Unknown" - ), - graph_version=exec.agentGraphVersion, - user_id=exec.userId, - user_email=exec.User.email if exec.User else None, - status=exec.executionStatus, - created_at=exec.createdAt, - started_at=exec.startedAt, - queue_status=None, - ) - ) - - return results - except Exception as e: - logger.error(f"Error getting orphaned execution details: {e}") - raise + return results async def get_long_running_executions_details( @@ -925,49 +848,45 @@ async def get_long_running_executions_details( Returns: List of long-running RunningExecutionDetail objects """ - try: - # RUNNING executions that started running >24 hours ago - cutoff = datetime.now(timezone.utc) - timedelta(hours=24) + # RUNNING executions that started running >24 hours ago + cutoff = datetime.now(timezone.utc) - timedelta(hours=24) - executions = await AgentGraphExecution.prisma().find_many( - where={ - "executionStatus": AgentExecutionStatus.RUNNING, - "startedAt": {"lt": cutoff}, - }, - include={ - "AgentGraph": True, - "User": True, - }, - take=limit, - skip=offset, - order={"startedAt": "asc"}, # Oldest first by start time + executions = await AgentGraphExecution.prisma().find_many( + where={ + "executionStatus": AgentExecutionStatus.RUNNING, + "startedAt": {"lt": cutoff}, + }, + include={ + "AgentGraph": True, + "User": True, + }, + take=limit, + skip=offset, + order={"startedAt": "asc"}, # Oldest first by start time + ) + + results = [] + for exec in executions: + results.append( + RunningExecutionDetail( + execution_id=exec.id, + graph_id=exec.agentGraphId, + graph_name=( + exec.AgentGraph.name + if exec.AgentGraph and exec.AgentGraph.name + else "Unknown" + ), + graph_version=exec.agentGraphVersion, + user_id=exec.userId, + user_email=exec.User.email if exec.User else None, + status=exec.executionStatus, + created_at=exec.createdAt, + started_at=exec.startedAt, + queue_status=None, + ) ) - results = [] - for exec in executions: - results.append( - RunningExecutionDetail( - execution_id=exec.id, - graph_id=exec.agentGraphId, - graph_name=( - exec.AgentGraph.name - if exec.AgentGraph and exec.AgentGraph.name - else "Unknown" - ), - graph_version=exec.agentGraphVersion, - user_id=exec.userId, - user_email=exec.User.email if exec.User else None, - status=exec.executionStatus, - created_at=exec.createdAt, - started_at=exec.startedAt, - queue_status=None, - ) - ) - - return results - except Exception as e: - logger.error(f"Error getting long-running execution details: {e}") - raise + return results async def get_stuck_queued_executions_details( @@ -983,49 +902,45 @@ async def get_stuck_queued_executions_details( Returns: List of stuck queued RunningExecutionDetail objects """ - try: - # QUEUED executions older than 1 hour that never started - one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) + # QUEUED executions older than 1 hour that never started + one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) - executions = await AgentGraphExecution.prisma().find_many( - where={ - "executionStatus": AgentExecutionStatus.QUEUED, - "createdAt": {"lt": one_hour_ago}, - }, - include={ - "AgentGraph": True, - "User": True, - }, - take=limit, - skip=offset, - order={"createdAt": "asc"}, # Oldest first + executions = await AgentGraphExecution.prisma().find_many( + where={ + "executionStatus": AgentExecutionStatus.QUEUED, + "createdAt": {"lt": one_hour_ago}, + }, + include={ + "AgentGraph": True, + "User": True, + }, + take=limit, + skip=offset, + order={"createdAt": "asc"}, # Oldest first + ) + + results = [] + for exec in executions: + results.append( + RunningExecutionDetail( + execution_id=exec.id, + graph_id=exec.agentGraphId, + graph_name=( + exec.AgentGraph.name + if exec.AgentGraph and exec.AgentGraph.name + else "Unknown" + ), + graph_version=exec.agentGraphVersion, + user_id=exec.userId, + user_email=exec.User.email if exec.User else None, + status=exec.executionStatus, + created_at=exec.createdAt, + started_at=exec.startedAt, + queue_status=None, + ) ) - results = [] - for exec in executions: - results.append( - RunningExecutionDetail( - execution_id=exec.id, - graph_id=exec.agentGraphId, - graph_name=( - exec.AgentGraph.name - if exec.AgentGraph and exec.AgentGraph.name - else "Unknown" - ), - graph_version=exec.agentGraphVersion, - user_id=exec.userId, - user_email=exec.User.email if exec.User else None, - status=exec.executionStatus, - created_at=exec.createdAt, - started_at=exec.startedAt, - queue_status=None, - ) - ) - - return results - except Exception as e: - logger.error(f"Error getting stuck queued execution details: {e}") - raise + return results async def get_invalid_executions_details( @@ -1047,57 +962,53 @@ async def get_invalid_executions_details( Returns: List of invalid RunningExecutionDetail objects """ - try: - # Find both types of invalid states - executions = await AgentGraphExecution.prisma().find_many( - where={ - "OR": [ # type: ignore - { - # QUEUED but has startedAt - "executionStatus": AgentExecutionStatus.QUEUED, - "startedAt": {"not": None}, # type: ignore - }, - { - # RUNNING but no startedAt - "executionStatus": AgentExecutionStatus.RUNNING, - "startedAt": None, - }, - ] - }, - include={ - "AgentGraph": True, - "User": True, - }, - take=limit, - skip=offset, - order={"createdAt": "desc"}, # Most recent first + # Find both types of invalid states + executions = await AgentGraphExecution.prisma().find_many( + where={ + "OR": [ # type: ignore + { + # QUEUED but has startedAt + "executionStatus": AgentExecutionStatus.QUEUED, + "startedAt": {"not": None}, # type: ignore + }, + { + # RUNNING but no startedAt + "executionStatus": AgentExecutionStatus.RUNNING, + "startedAt": None, + }, + ] + }, + include={ + "AgentGraph": True, + "User": True, + }, + take=limit, + skip=offset, + order={"createdAt": "desc"}, # Most recent first + ) + + results = [] + for exec in executions: + results.append( + RunningExecutionDetail( + execution_id=exec.id, + graph_id=exec.agentGraphId, + graph_name=( + exec.AgentGraph.name + if exec.AgentGraph and exec.AgentGraph.name + else "Unknown" + ), + graph_version=exec.agentGraphVersion, + user_id=exec.userId, + user_email=exec.User.email if exec.User else None, + status=exec.executionStatus, + created_at=exec.createdAt, + started_at=exec.startedAt, + queue_status=None, + ) ) - results = [] - for exec in executions: - results.append( - RunningExecutionDetail( - execution_id=exec.id, - graph_id=exec.agentGraphId, - graph_name=( - exec.AgentGraph.name - if exec.AgentGraph and exec.AgentGraph.name - else "Unknown" - ), - graph_version=exec.agentGraphVersion, - user_id=exec.userId, - user_email=exec.User.email if exec.User else None, - status=exec.executionStatus, - created_at=exec.createdAt, - started_at=exec.startedAt, - queue_status=None, - ) - ) - - return results - except Exception as e: - logger.error(f"Error getting invalid execution details: {e}") - raise + return results async def get_failed_executions_count(hours: int = 24) -> int: @@ -1110,16 +1021,12 @@ async def get_failed_executions_count(hours: int = 24) -> int: Returns: Count of failed executions """ - try: - cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) - count = await get_graph_executions_count( - statuses=[AgentExecutionStatus.FAILED], - created_time_gte=cutoff, - ) - return count - except Exception as e: - logger.error(f"Error getting failed executions count: {e}") - raise + cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) + count = await get_graph_executions_count( + statuses=[AgentExecutionStatus.FAILED], + created_time_gte=cutoff, + ) + return count async def get_failed_executions_details( @@ -1136,54 +1043,50 @@ async def get_failed_executions_details( Returns: List of FailedExecutionDetail objects """ - try: - cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) + cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) - executions = await AgentGraphExecution.prisma().find_many( - where={ - "executionStatus": AgentExecutionStatus.FAILED, - "updatedAt": {"gte": cutoff}, - }, - include={ - "AgentGraph": True, - "User": True, - }, - take=limit, - skip=offset, - order={"updatedAt": "desc"}, # Most recent failures first + executions = await AgentGraphExecution.prisma().find_many( + where={ + "executionStatus": AgentExecutionStatus.FAILED, + "updatedAt": {"gte": cutoff}, + }, + include={ + "AgentGraph": True, + "User": True, + }, + take=limit, + skip=offset, + order={"updatedAt": "desc"}, # Most recent failures first + ) + + results = [] + for exec in executions: + # Extract error from stats JSON field + error_message = None + if exec.stats and isinstance(exec.stats, dict): + error_message = exec.stats.get("error") + + results.append( + FailedExecutionDetail( + execution_id=exec.id, + graph_id=exec.agentGraphId, + graph_name=( + exec.AgentGraph.name + if exec.AgentGraph and exec.AgentGraph.name + else "Unknown" + ), + graph_version=exec.agentGraphVersion, + user_id=exec.userId, + user_email=exec.User.email if exec.User else None, + status=exec.executionStatus, + created_at=exec.createdAt, + started_at=exec.startedAt, + failed_at=exec.updatedAt, + error_message=error_message, + ) ) - results = [] - for exec in executions: - # Extract error from stats JSON field - error_message = None - if exec.stats and isinstance(exec.stats, dict): - error_message = exec.stats.get("error") - - results.append( - FailedExecutionDetail( - execution_id=exec.id, - graph_id=exec.agentGraphId, - graph_name=( - exec.AgentGraph.name - if exec.AgentGraph and exec.AgentGraph.name - else "Unknown" - ), - graph_version=exec.agentGraphVersion, - user_id=exec.userId, - user_email=exec.User.email if exec.User else None, - status=exec.executionStatus, - created_at=exec.createdAt, - started_at=exec.startedAt, - failed_at=exec.updatedAt, - error_message=error_message, - ) - ) - - return results - except Exception as e: - logger.error(f"Error getting failed execution details: {e}") - raise + return results async def cleanup_orphaned_execution(execution_id: str, admin_user_id: str) -> bool: @@ -1198,27 +1101,23 @@ async def cleanup_orphaned_execution(execution_id: str, admin_user_id: str) -> b Returns: True if execution was cleaned up, False otherwise """ - try: - logger.info( - f"Admin user {admin_user_id} cleaning up orphaned execution {execution_id}" - ) + logger.info( + f"Admin user {admin_user_id} cleaning up orphaned execution {execution_id}" + ) - # Update DB status directly without sending cancel signal - result = await AgentGraphExecution.prisma().update( - where={"id": execution_id}, - data={ - "executionStatus": AgentExecutionStatus.FAILED, - "updatedAt": datetime.now(timezone.utc), - }, - ) + # Update DB status directly without sending cancel signal + result = await AgentGraphExecution.prisma().update( + where={"id": execution_id}, + data={ + "executionStatus": AgentExecutionStatus.FAILED, + "updatedAt": datetime.now(timezone.utc), + }, + ) - logger.info( - f"Admin {admin_user_id} marked orphaned execution {execution_id} as FAILED" - ) - return result is not None - except Exception as e: - logger.error(f"Error cleaning up orphaned execution {execution_id}: {e}") - return False + logger.info( + f"Admin {admin_user_id} marked orphaned execution {execution_id} as FAILED" + ) + return result is not None async def stop_all_long_running_executions(admin_user_id: str) -> int: @@ -1231,67 +1130,59 @@ async def stop_all_long_running_executions(admin_user_id: str) -> int: Returns: Number of executions for which cancel signals were sent """ - import asyncio + logger.info(f"Admin user {admin_user_id} stopping ALL long-running executions") - try: - logger.info(f"Admin user {admin_user_id} stopping ALL long-running executions") + # Find all long-running executions (started running >24h ago) + cutoff = datetime.now(timezone.utc) - timedelta(hours=24) + executions = await get_graph_executions( + statuses=[AgentExecutionStatus.RUNNING], + started_time_lte=cutoff, + ) - # Find all long-running executions (started running >24h ago) - cutoff = datetime.now(timezone.utc) - timedelta(hours=24) - executions = await get_graph_executions( - statuses=[AgentExecutionStatus.RUNNING], - started_time_lte=cutoff, - ) - - if not executions: - logger.info("No long-running executions to stop") - return 0 - - queue_client = await get_async_execution_queue() - - # Send cancel signals in parallel - async def send_cancel_signal(exec_id: str) -> bool: - try: - await queue_client.publish_message( - routing_key="", - message=CancelExecutionEvent( - graph_exec_id=exec_id - ).model_dump_json(), - exchange=GRAPH_EXECUTION_CANCEL_EXCHANGE, - ) - return True - except Exception as e: - logger.error(f"Failed to send cancel for {exec_id}: {e}") - return False - - # Send cancel signals in parallel - await asyncio.gather( - *[send_cancel_signal(exec.id) for exec in executions], - return_exceptions=True, # Don't fail if some signals fail - ) - - # ALSO update DB status directly (don't rely on executor) - # This ensures executions are marked FAILED even if executor restarted - result = await AgentGraphExecution.prisma().update_many( - where={ - "executionStatus": AgentExecutionStatus.RUNNING, - "startedAt": {"lt": cutoff}, - }, - data={ - "executionStatus": AgentExecutionStatus.FAILED, - "updatedAt": datetime.now(timezone.utc), - }, - ) - - logger.info( - f"Admin {admin_user_id} stopped {result} long-running executions (sent cancel signals + updated DB)" - ) - - return result - except Exception as e: - logger.error(f"Error stopping all long-running executions: {e}") + if not executions: + logger.info("No long-running executions to stop") return 0 + queue_client = await get_async_execution_queue() + + # Send cancel signals in parallel + async def send_cancel_signal(exec_id: str) -> bool: + try: + await queue_client.publish_message( + routing_key="", + message=CancelExecutionEvent(graph_exec_id=exec_id).model_dump_json(), + exchange=GRAPH_EXECUTION_CANCEL_EXCHANGE, + ) + return True + except Exception as e: + logger.error(f"Failed to send cancel for {exec_id}: {e}") + return False + + # Send cancel signals in parallel + await asyncio.gather( + *[send_cancel_signal(exec.id) for exec in executions], + return_exceptions=True, # Don't fail if some signals fail + ) + + # ALSO update DB status directly (don't rely on executor) + # This ensures executions are marked FAILED even if executor restarted + result = await AgentGraphExecution.prisma().update_many( + where={ + "executionStatus": AgentExecutionStatus.RUNNING, + "startedAt": {"lt": cutoff}, + }, + data={ + "executionStatus": AgentExecutionStatus.FAILED, + "updatedAt": datetime.now(timezone.utc), + }, + ) + + logger.info( + f"Admin {admin_user_id} stopped {result} long-running executions (sent cancel signals + updated DB)" + ) + + return result + async def get_all_orphaned_execution_ids() -> List[str]: """ @@ -1300,18 +1191,14 @@ async def get_all_orphaned_execution_ids() -> List[str]: Returns: List of execution IDs that are orphaned """ - try: - cutoff = datetime.now(timezone.utc) - timedelta(hours=24) + cutoff = datetime.now(timezone.utc) - timedelta(hours=24) - executions = await get_graph_executions( - statuses=[AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED], - created_time_lte=cutoff, - ) + executions = await get_graph_executions( + statuses=[AgentExecutionStatus.RUNNING, AgentExecutionStatus.QUEUED], + created_time_lte=cutoff, + ) - return [e.id for e in executions] - except Exception as e: - logger.error(f"Error getting all orphaned execution IDs: {e}") - raise + return [e.id for e in executions] async def cleanup_orphaned_executions_bulk( @@ -1328,28 +1215,24 @@ async def cleanup_orphaned_executions_bulk( Returns: Number of executions successfully cleaned up """ - try: - logger.info( - f"Admin user {admin_user_id} cleaning up {len(execution_ids)} orphaned executions" - ) + logger.info( + f"Admin user {admin_user_id} cleaning up {len(execution_ids)} orphaned executions" + ) - # Update all executions in DB directly (no cancel signals) - result = await AgentGraphExecution.prisma().update_many( - where={"id": {"in": execution_ids}}, - data={ - "executionStatus": AgentExecutionStatus.FAILED, - "updatedAt": datetime.now(timezone.utc), - }, - ) + # Update all executions in DB directly (no cancel signals) + result = await AgentGraphExecution.prisma().update_many( + where={"id": {"in": execution_ids}}, + data={ + "executionStatus": AgentExecutionStatus.FAILED, + "updatedAt": datetime.now(timezone.utc), + }, + ) - logger.info( - f"Admin {admin_user_id} marked {result} orphaned executions as FAILED in DB" - ) + logger.info( + f"Admin {admin_user_id} marked {result} orphaned executions as FAILED in DB" + ) - return result - except Exception as e: - logger.error(f"Error cleaning up orphaned executions in bulk: {e}") - return 0 + return result async def get_all_stuck_queued_execution_ids() -> List[str]: @@ -1359,18 +1242,14 @@ async def get_all_stuck_queued_execution_ids() -> List[str]: Returns: List of execution IDs that are stuck in QUEUED status """ - try: - one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) + one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) - executions = await get_graph_executions( - statuses=[AgentExecutionStatus.QUEUED], - created_time_lte=one_hour_ago, - ) + executions = await get_graph_executions( + statuses=[AgentExecutionStatus.QUEUED], + created_time_lte=one_hour_ago, + ) - return [e.id for e in executions] - except Exception as e: - logger.error(f"Error getting all stuck queued execution IDs: {e}") - raise + return [e.id for e in executions] async def cleanup_all_stuck_queued_executions(admin_user_id: str) -> int: @@ -1384,30 +1263,24 @@ async def cleanup_all_stuck_queued_executions(admin_user_id: str) -> int: Returns: Number of executions successfully cleaned up """ - try: - logger.info( - f"Admin user {admin_user_id} cleaning up ALL stuck queued executions" - ) + logger.info(f"Admin user {admin_user_id} cleaning up ALL stuck queued executions") - # Find all stuck queued executions (>1h old) - one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) + # Find all stuck queued executions (>1h old) + one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) - result = await AgentGraphExecution.prisma().update_many( - where={ - "executionStatus": AgentExecutionStatus.QUEUED, - "createdAt": {"lt": one_hour_ago}, - }, - data={ - "executionStatus": AgentExecutionStatus.FAILED, - "updatedAt": datetime.now(timezone.utc), - }, - ) + result = await AgentGraphExecution.prisma().update_many( + where={ + "executionStatus": AgentExecutionStatus.QUEUED, + "createdAt": {"lt": one_hour_ago}, + }, + data={ + "executionStatus": AgentExecutionStatus.FAILED, + "updatedAt": datetime.now(timezone.utc), + }, + ) - logger.info( - f"Admin {admin_user_id} marked {result} stuck queued executions as FAILED in DB" - ) + logger.info( + f"Admin {admin_user_id} marked {result} stuck queued executions as FAILED in DB" + ) - return result - except Exception as e: - logger.error(f"Error cleaning up all stuck queued executions: {e}") - return 0 + return result diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index 00ae40bd2f..c4c03e9b6c 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -546,6 +546,8 @@ async def get_graph_executions_count( created_time_lte: Optional[datetime] = None, started_time_gte: Optional[datetime] = None, started_time_lte: Optional[datetime] = None, + updated_time_gte: Optional[datetime] = None, + updated_time_lte: Optional[datetime] = None, ) -> int: """ Get count of graph executions with optional filters. @@ -558,6 +560,8 @@ async def get_graph_executions_count( created_time_lte: Optional maximum creation time started_time_gte: Optional minimum start time (when execution started running) started_time_lte: Optional maximum start time (when execution started running) + updated_time_gte: Optional minimum update time + updated_time_lte: Optional maximum update time Returns: Count of matching graph executions @@ -584,6 +588,12 @@ async def get_graph_executions_count( "lte": started_time_lte or datetime.max.replace(tzinfo=timezone.utc), } + if updated_time_gte or updated_time_lte: + where_filter["updatedAt"] = { + "gte": updated_time_gte or datetime.min.replace(tzinfo=timezone.utc), + "lte": updated_time_lte or datetime.max.replace(tzinfo=timezone.utc), + } + if statuses: where_filter["OR"] = [{"executionStatus": status} for status in statuses]