From 92ddb574602aa5f711a6534dcdfaf2828438a3a0 Mon Sep 17 00:00:00 2001 From: Otto Date: Tue, 17 Feb 2026 08:47:08 +0000 Subject: [PATCH] style: move helper after function in agent_output.py too --- .../api/features/chat/tools/agent_output.py | 168 +++++++++--------- 1 file changed, 84 insertions(+), 84 deletions(-) diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py b/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py index 747e9df5b8..aa43ac751d 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py @@ -192,90 +192,6 @@ class AgentOutputTool(BaseTool): def requires_auth(self) -> bool: return True - async def _wait_for_execution_completion( - self, - user_id: str, - graph_id: str, - execution_id: str, - timeout_seconds: int, - ) -> GraphExecution | None: - """ - Wait for an execution to reach a terminal status using Redis pubsub. - - Args: - user_id: User ID - graph_id: Graph ID - execution_id: Execution ID to wait for - timeout_seconds: Max seconds to wait - - Returns: - The execution with current status, or None on error - """ - # First check current status - maybe it's already done - execution = await execution_db.get_graph_execution( - user_id=user_id, - execution_id=execution_id, - include_node_executions=False, - ) - if not execution: - return None - - # If already in terminal state, return immediately - if execution.status in TERMINAL_STATUSES: - logger.debug( - f"Execution {execution_id} already in terminal state: {execution.status}" - ) - return execution - - logger.info( - f"Waiting up to {timeout_seconds}s for execution {execution_id} " - f"(current status: {execution.status})" - ) - - # Subscribe to execution updates via Redis pubsub - event_bus = AsyncRedisExecutionEventBus() - channel_key = f"{user_id}/{graph_id}/{execution_id}" - - try: - deadline = asyncio.get_event_loop().time() + timeout_seconds - - async for event in event_bus.listen_events(channel_key): - # Check if we've exceeded the timeout - remaining = deadline - asyncio.get_event_loop().time() - if remaining <= 0: - logger.info(f"Timeout waiting for execution {execution_id}") - break - - # Only process GraphExecutionEvents (not NodeExecutionEvents) - if isinstance(event, GraphExecutionEvent): - logger.debug(f"Received execution update: {event.status}") - if event.status in TERMINAL_STATUSES: - # Fetch full execution with outputs - return await execution_db.get_graph_execution( - user_id=user_id, - execution_id=execution_id, - include_node_executions=False, - ) - - except asyncio.TimeoutError: - logger.info(f"Timeout waiting for execution {execution_id}") - except Exception as e: - logger.error(f"Error waiting for execution: {e}", exc_info=True) - finally: - # Clean up pubsub connection - try: - if hasattr(event_bus, "_pubsub") and event_bus._pubsub: - await event_bus._pubsub.close() - except Exception: - pass - - # Return current state on timeout - return await execution_db.get_graph_execution( - user_id=user_id, - execution_id=execution_id, - include_node_executions=False, - ) - async def _resolve_agent( self, user_id: str, @@ -616,3 +532,87 @@ class AgentOutputTool(BaseTool): ) return self._build_response(agent, execution, available_executions, session_id) + + async def _wait_for_execution_completion( + self, + user_id: str, + graph_id: str, + execution_id: str, + timeout_seconds: int, + ) -> GraphExecution | None: + """ + Wait for an execution to reach a terminal status using Redis pubsub. + + Args: + user_id: User ID + graph_id: Graph ID + execution_id: Execution ID to wait for + timeout_seconds: Max seconds to wait + + Returns: + The execution with current status, or None on error + """ + # First check current status - maybe it's already done + execution = await execution_db.get_graph_execution( + user_id=user_id, + execution_id=execution_id, + include_node_executions=False, + ) + if not execution: + return None + + # If already in terminal state, return immediately + if execution.status in TERMINAL_STATUSES: + logger.debug( + f"Execution {execution_id} already in terminal state: {execution.status}" + ) + return execution + + logger.info( + f"Waiting up to {timeout_seconds}s for execution {execution_id} " + f"(current status: {execution.status})" + ) + + # Subscribe to execution updates via Redis pubsub + event_bus = AsyncRedisExecutionEventBus() + channel_key = f"{user_id}/{graph_id}/{execution_id}" + + try: + deadline = asyncio.get_event_loop().time() + timeout_seconds + + async for event in event_bus.listen_events(channel_key): + # Check if we've exceeded the timeout + remaining = deadline - asyncio.get_event_loop().time() + if remaining <= 0: + logger.info(f"Timeout waiting for execution {execution_id}") + break + + # Only process GraphExecutionEvents (not NodeExecutionEvents) + if isinstance(event, GraphExecutionEvent): + logger.debug(f"Received execution update: {event.status}") + if event.status in TERMINAL_STATUSES: + # Fetch full execution with outputs + return await execution_db.get_graph_execution( + user_id=user_id, + execution_id=execution_id, + include_node_executions=False, + ) + + except asyncio.TimeoutError: + logger.info(f"Timeout waiting for execution {execution_id}") + except Exception as e: + logger.error(f"Error waiting for execution: {e}", exc_info=True) + finally: + # Clean up pubsub connection + try: + if hasattr(event_bus, "_pubsub") and event_bus._pubsub: + await event_bus._pubsub.close() + except Exception: + pass + + # Return current state on timeout + return await execution_db.get_graph_execution( + user_id=user_id, + execution_id=execution_id, + include_node_executions=False, + )