diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py index 2622fc7234..34a9a00439 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py @@ -458,96 +458,6 @@ class RunAgentTool(BaseTool): trigger_info=trigger_info, ) - async def _wait_for_execution_completion( - self, - user_id: str, - graph_id: str, - execution_id: str, - timeout_seconds: int, - ) -> tuple[ExecutionStatus, dict[str, Any] | None]: - """ - Wait for an execution to reach a terminal status using Redis pubsub. - - Args: - user_id: User ID - graph_id: Graph ID - execution_id: Execution ID to wait for - timeout_seconds: Max seconds to wait - - Returns: - Tuple of (final_status, outputs_dict_or_None) - """ - # First check current status - maybe it's already done - execution = await execution_db.get_graph_execution( - user_id=user_id, - execution_id=execution_id, - include_node_executions=False, - ) - if not execution: - return ExecutionStatus.FAILED, None - - # If already in terminal state, return immediately - if execution.status in TERMINAL_STATUSES: - logger.debug( - f"Execution {execution_id} already in terminal state: {execution.status}" - ) - return execution.status, execution.outputs - - logger.info( - f"Waiting up to {timeout_seconds}s for execution {execution_id} " - f"(current status: {execution.status})" - ) - - # Subscribe to execution updates via Redis pubsub - event_bus = AsyncRedisExecutionEventBus() - channel_key = f"{user_id}/{graph_id}/{execution_id}" - - try: - deadline = asyncio.get_event_loop().time() + timeout_seconds - - async for event in event_bus.listen_events(channel_key): - # Check if we've exceeded the timeout - remaining = deadline - asyncio.get_event_loop().time() - if remaining <= 0: - logger.info(f"Timeout waiting for execution {execution_id}") - break - - # Only process GraphExecutionEvents (not NodeExecutionEvents) - if isinstance(event, GraphExecutionEvent): - logger.debug(f"Received execution update: {event.status}") - if event.status in TERMINAL_STATUSES: - # Fetch full execution with outputs - final_exec = await execution_db.get_graph_execution( - user_id=user_id, - execution_id=execution_id, - include_node_executions=False, - ) - if final_exec: - return final_exec.status, final_exec.outputs - return event.status, None - - except asyncio.TimeoutError: - logger.info(f"Timeout waiting for execution {execution_id}") - except Exception as e: - logger.error(f"Error waiting for execution: {e}", exc_info=True) - finally: - # Clean up pubsub connection - try: - if hasattr(event_bus, "_pubsub") and event_bus._pubsub: - await event_bus._pubsub.close() - except Exception: - pass - - # Return current state on timeout - execution = await execution_db.get_graph_execution( - user_id=user_id, - execution_id=execution_id, - include_node_executions=False, - ) - if execution: - return execution.status, execution.outputs - return ExecutionStatus.QUEUED, None - async def _run_agent( self, user_id: str, @@ -660,6 +570,96 @@ class RunAgentTool(BaseTool): library_agent_link=library_agent_link, ) + async def _wait_for_execution_completion( + self, + user_id: str, + graph_id: str, + execution_id: str, + timeout_seconds: int, + ) -> tuple[ExecutionStatus, dict[str, Any] | None]: + """ + Wait for an execution to reach a terminal status using Redis pubsub. + + Args: + user_id: User ID + graph_id: Graph ID + execution_id: Execution ID to wait for + timeout_seconds: Max seconds to wait + + Returns: + Tuple of (final_status, outputs_dict_or_None) + """ + # First check current status - maybe it's already done + execution = await execution_db.get_graph_execution( + user_id=user_id, + execution_id=execution_id, + include_node_executions=False, + ) + if not execution: + return ExecutionStatus.FAILED, None + + # If already in terminal state, return immediately + if execution.status in TERMINAL_STATUSES: + logger.debug( + f"Execution {execution_id} already in terminal state: {execution.status}" + ) + return execution.status, execution.outputs + + logger.info( + f"Waiting up to {timeout_seconds}s for execution {execution_id} " + f"(current status: {execution.status})" + ) + + # Subscribe to execution updates via Redis pubsub + event_bus = AsyncRedisExecutionEventBus() + channel_key = f"{user_id}/{graph_id}/{execution_id}" + + try: + deadline = asyncio.get_event_loop().time() + timeout_seconds + + async for event in event_bus.listen_events(channel_key): + # Check if we've exceeded the timeout + remaining = deadline - asyncio.get_event_loop().time() + if remaining <= 0: + logger.info(f"Timeout waiting for execution {execution_id}") + break + + # Only process GraphExecutionEvents (not NodeExecutionEvents) + if isinstance(event, GraphExecutionEvent): + logger.debug(f"Received execution update: {event.status}") + if event.status in TERMINAL_STATUSES: + # Fetch full execution with outputs + final_exec = await execution_db.get_graph_execution( + user_id=user_id, + execution_id=execution_id, + include_node_executions=False, + ) + if final_exec: + return final_exec.status, final_exec.outputs + return event.status, None + + except asyncio.TimeoutError: + logger.info(f"Timeout waiting for execution {execution_id}") + except Exception as e: + logger.error(f"Error waiting for execution: {e}", exc_info=True) + finally: + # Clean up pubsub connection + try: + if hasattr(event_bus, "_pubsub") and event_bus._pubsub: + await event_bus._pubsub.close() + except Exception: + pass + + # Return current state on timeout + execution = await execution_db.get_graph_execution( + user_id=user_id, + execution_id=execution_id, + include_node_executions=False, + ) + if execution: + return execution.status, execution.outputs + return ExecutionStatus.QUEUED, None + async def _schedule_agent( self, user_id: str,