diff --git a/autogpt_platform/backend/backend/copilot/tools/agent_output.py b/autogpt_platform/backend/backend/copilot/tools/agent_output.py
index d11c5ade29..88b26ccc5f 100644
--- a/autogpt_platform/backend/backend/copilot/tools/agent_output.py
+++ b/autogpt_platform/backend/backend/copilot/tools/agent_output.py
@@ -268,6 +268,7 @@ class AgentOutputTool(BaseTool):
             ExecutionStatus.RUNNING,
             ExecutionStatus.QUEUED,
             ExecutionStatus.INCOMPLETE,
+            ExecutionStatus.REVIEW,
         ]
     )
 
@@ -349,6 +350,11 @@ class AgentOutputTool(BaseTool):
             message = f"Execution for agent '{agent.name}' failed"
         elif execution.status == ExecutionStatus.TERMINATED:
             message = f"Execution for agent '{agent.name}' was terminated"
+        elif execution.status == ExecutionStatus.REVIEW:
+            message = (
+                f"Execution for agent '{agent.name}' is awaiting human review. "
+                "The user needs to approve it before it can continue."
+            )
         elif execution.status in (
             ExecutionStatus.RUNNING,
             ExecutionStatus.QUEUED,
diff --git a/autogpt_platform/backend/backend/copilot/tools/execution_utils.py b/autogpt_platform/backend/backend/copilot/tools/execution_utils.py
index 4f9d446dc6..193f98dceb 100644
--- a/autogpt_platform/backend/backend/copilot/tools/execution_utils.py
+++ b/autogpt_platform/backend/backend/copilot/tools/execution_utils.py
@@ -23,6 +23,16 @@ TERMINAL_STATUSES = frozenset(
     }
 )
 
+# Statuses where execution is paused but not finished (e.g. human-in-the-loop)
+PAUSED_STATUSES = frozenset(
+    {
+        ExecutionStatus.REVIEW,
+    }
+)
+
+# Statuses that mean "stop waiting" (terminal or paused)
+STOP_WAITING_STATUSES = TERMINAL_STATUSES | PAUSED_STATUSES
+
 
 async def wait_for_execution(
     user_id: str,
@@ -31,10 +41,10 @@ async def wait_for_execution(
     timeout_seconds: int,
 ) -> GraphExecution | None:
     """
-    Wait for an execution to reach a terminal status using Redis pubsub.
+    Wait for an execution to reach a terminal or paused status using Redis pubsub.
 
-    Uses asyncio.wait_for to ensure timeout is respected even when no events
-    are received.
+    Handles the race condition between checking status and subscribing by
+    re-checking the DB after the subscription is established.
 
     Args:
         user_id: User ID
@@ -47,7 +57,7 @@
     """
     exec_db = execution_db()
 
-    # First check current status - maybe it's already done
+    # Quick check — maybe it's already done
    execution = await exec_db.get_graph_execution(
         user_id=user_id,
         execution_id=execution_id,
@@ -56,10 +66,10 @@
     if not execution:
         return None
 
-    # If already in terminal state, return immediately
-    if execution.status in TERMINAL_STATUSES:
+    if execution.status in STOP_WAITING_STATUSES:
         logger.debug(
-            f"Execution {execution_id} already in terminal state: {execution.status}"
+            f"Execution {execution_id} already in stop-waiting state: "
+            f"{execution.status}"
         )
         return execution
 
@@ -68,14 +78,12 @@
         f"(current status: {execution.status})"
     )
 
-    # Subscribe to execution updates via Redis pubsub
     event_bus = AsyncRedisExecutionEventBus()
     channel_key = f"{user_id}/{graph_id}/{execution_id}"
 
     try:
-        # Use wait_for to enforce timeout on the entire listen operation
         result = await asyncio.wait_for(
-            _listen_for_terminal_status(
+            _subscribe_and_wait(
                 event_bus, channel_key, user_id, execution_id, exec_db
             ),
             timeout=timeout_seconds,
@@ -85,6 +93,8 @@
         logger.info(f"Timeout waiting for execution {execution_id}")
     except Exception as e:
         logger.error(f"Error waiting for execution: {e}", exc_info=True)
+    finally:
+        await event_bus.close()
 
     # Return current state on timeout/error
     return await exec_db.get_graph_execution(
@@ -94,7 +104,7 @@
     )
 
 
-async def _listen_for_terminal_status(
+async def _subscribe_and_wait(
     event_bus: AsyncRedisExecutionEventBus,
     channel_key: str,
     user_id: str,
@@ -102,24 +112,54 @@
     exec_db: Any,
 ) -> GraphExecution | None:
     """
-    Listen for execution events until a terminal status is reached.
+    Subscribe to execution events and wait for a terminal/paused status.
 
-    This is a helper that gets wrapped in asyncio.wait_for for timeout handling.
+    To avoid the race condition where the execution completes between the
+    initial DB check and the Redis subscription, we:
+    1. Start listening (which subscribes internally)
+    2. Re-check the DB after subscription is active
+    3. If still running, wait for pubsub events
     """
-    async for event in event_bus.listen_events(channel_key):
-        # Only process GraphExecutionEvents (not NodeExecutionEvents)
-        if isinstance(event, GraphExecutionEvent):
-            logger.debug(f"Received execution update: {event.status}")
-            if event.status in TERMINAL_STATUSES:
-                # Fetch full execution with outputs
-                return await exec_db.get_graph_execution(
-                    user_id=user_id,
-                    execution_id=execution_id,
-                    include_node_executions=False,
-                )
+    listen_iter = event_bus.listen_events(channel_key).__aiter__()
 
-    # Should not reach here normally (generator should yield indefinitely)
-    return None
+    # Start a background task to consume pubsub events
+    result_event: GraphExecutionEvent | None = None
+    done = asyncio.Event()
+
+    async def _consume():
+        nonlocal result_event
+        async for event in listen_iter:
+            if isinstance(event, GraphExecutionEvent):
+                logger.debug(f"Received execution update: {event.status}")
+                if event.status in STOP_WAITING_STATUSES:
+                    result_event = event
+                    done.set()
+                    return
+
+    consume_task = asyncio.create_task(_consume())
+
+    # Give the subscription a moment to establish, then re-check DB
+    await asyncio.sleep(0.1)
+
+    execution = await exec_db.get_graph_execution(
+        user_id=user_id,
+        execution_id=execution_id,
+        include_node_executions=False,
+    )
+    if execution and execution.status in STOP_WAITING_STATUSES:
+        consume_task.cancel()
+        return execution
+
+    # Wait for the pubsub consumer to find a terminal event
+    await done.wait()
+    consume_task.cancel()
+
+    # Fetch full execution
+    return await exec_db.get_graph_execution(
+        user_id=user_id,
+        execution_id=execution_id,
+        include_node_executions=False,
+    )
 
 
 def get_execution_outputs(execution: GraphExecution | None) -> dict[str, Any] | None:
diff --git a/autogpt_platform/backend/backend/copilot/tools/run_agent.py b/autogpt_platform/backend/backend/copilot/tools/run_agent.py
index 4d56f2e2a8..e63a5d2258 100644
--- a/autogpt_platform/backend/backend/copilot/tools/run_agent.py
+++ b/autogpt_platform/backend/backend/copilot/tools/run_agent.py
@@ -510,12 +510,34 @@ class RunAgentTool(BaseTool):
                 ),
                 session_id=session_id,
             )
+        elif completed and completed.status == ExecutionStatus.TERMINATED:
+            return ErrorResponse(
+                message=(
+                    f"Agent '{library_agent.name}' execution was terminated. "
+                    f"View details at {library_agent_link}."
+                ),
+                session_id=session_id,
+            )
+        elif completed and completed.status == ExecutionStatus.REVIEW:
+            return ExecutionStartedResponse(
+                message=(
+                    f"Agent '{library_agent.name}' is awaiting human review. "
+                    f"Check at {library_agent_link}."
+                ),
+                session_id=session_id,
+                execution_id=execution.id,
+                graph_id=library_agent.graph_id,
+                graph_name=library_agent.name,
+                library_agent_id=library_agent.id,
+                library_agent_link=library_agent_link,
+            )
         else:
             status = completed.status.value if completed else "unknown"
             return ExecutionStartedResponse(
                 message=(
                     f"Agent '{library_agent.name}' is still {status} after "
-                    f"{wait_for_result}s. Check results later at {library_agent_link}. "
+                    f"{wait_for_result}s. Check results later at "
+                    f"{library_agent_link}. "
                     f"Use view_agent_output with wait_if_running to check again."
                 ),
                 session_id=session_id,
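Note on the waiting pattern introduced in `_subscribe_and_wait`: the new docstring describes a check/subscribe/re-check sequence that closes the window where an execution could finish between the initial DB read and the pubsub subscription. Below is a minimal, self-contained sketch of that pattern using plain asyncio; `fetch_status`, `wait_for_stop`, and the queue-based event feed are hypothetical stand-ins, not the platform's actual `AsyncRedisExecutionEventBus` or `exec_db` APIs.

```python
import asyncio
from typing import Optional

# Statuses that should stop the wait (terminal or paused), mirroring the diff's intent.
STOP_WAITING = {"COMPLETED", "FAILED", "TERMINATED", "REVIEW"}


async def fetch_status(db: dict, execution_id: str) -> Optional[str]:
    # Stand-in for the DB lookup (e.g. exec_db.get_graph_execution in the diff).
    return db.get(execution_id)


async def wait_for_stop(
    db: dict,
    events: "asyncio.Queue[tuple[str, str]]",
    execution_id: str,
    timeout_seconds: float,
) -> Optional[str]:
    # 1) Quick check: the execution may already be finished or paused.
    status = await fetch_status(db, execution_id)
    if status in STOP_WAITING:
        return status

    done = asyncio.Event()
    result: Optional[str] = None

    async def consume() -> None:
        # 3) Consume events until a stop-waiting status arrives.
        nonlocal result
        while True:
            exec_id, new_status = await events.get()
            if exec_id == execution_id and new_status in STOP_WAITING:
                result = new_status
                done.set()
                return

    consumer = asyncio.create_task(consume())
    try:
        # 2) Re-check the store after the consumer is running, so a status
        #    change that landed before we started listening is not missed.
        status = await fetch_status(db, execution_id)
        if status in STOP_WAITING:
            return status

        # 4) Otherwise block on the consumer, bounded by an overall timeout.
        await asyncio.wait_for(done.wait(), timeout=timeout_seconds)
        return result
    except asyncio.TimeoutError:
        # On timeout, report whatever the store says right now.
        return await fetch_status(db, execution_id)
    finally:
        consumer.cancel()


async def main() -> None:
    db = {"exec-1": "RUNNING"}
    events: asyncio.Queue = asyncio.Queue()

    async def finish_later() -> None:
        await asyncio.sleep(0.2)
        db["exec-1"] = "REVIEW"
        await events.put(("exec-1", "REVIEW"))

    notifier = asyncio.create_task(finish_later())
    status = await wait_for_stop(db, events, "exec-1", timeout_seconds=5)
    await notifier
    print(status)  # REVIEW


if __name__ == "__main__":
    asyncio.run(main())
```

The sketch waits on an in-memory queue rather than a Redis channel, so it only illustrates the ordering (check, subscribe, re-check, wait with timeout); the real helper additionally closes the event bus in a `finally` block, as the diff shows.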