From aeb3e8a129297afa9ffecce9d6282e94c214d74e Mon Sep 17 00:00:00 2001 From: Otto Date: Tue, 17 Feb 2026 18:10:14 +0000 Subject: [PATCH] feat(copilot): add wait_for_result to run_agent and wait_if_running to agent_output SECRT-2003: Allow CoPilot to wait for agent execution completion. - Add execution_utils.py with Redis pubsub-based wait_for_execution() - run_agent: new wait_for_result param (0-300s), returns outputs on completion - agent_output: new wait_if_running param, waits for running executions - Both tools handle timeout gracefully, returning current state --- .../backend/copilot/tools/agent_output.py | 76 ++++++++++- .../backend/copilot/tools/execution_utils.py | 129 ++++++++++++++++++ .../backend/copilot/tools/run_agent.py | 66 ++++++++- 3 files changed, 264 insertions(+), 7 deletions(-) create mode 100644 autogpt_platform/backend/backend/copilot/tools/execution_utils.py diff --git a/autogpt_platform/backend/backend/copilot/tools/agent_output.py b/autogpt_platform/backend/backend/copilot/tools/agent_output.py index fe4767d09e..d11c5ade29 100644 --- a/autogpt_platform/backend/backend/copilot/tools/agent_output.py +++ b/autogpt_platform/backend/backend/copilot/tools/agent_output.py @@ -5,7 +5,7 @@ import re from datetime import datetime, timedelta, timezone from typing import Any -from pydantic import BaseModel, field_validator +from pydantic import BaseModel, Field, field_validator from backend.api.features.library.model import LibraryAgent from backend.copilot.model import ChatSession @@ -13,6 +13,7 @@ from backend.data.db_accessors import execution_db, library_db from backend.data.execution import ExecutionStatus, GraphExecution, GraphExecutionMeta from .base import BaseTool +from .execution_utils import TERMINAL_STATUSES, wait_for_execution from .models import ( AgentOutputResponse, ErrorResponse, @@ -33,6 +34,7 @@ class AgentOutputInput(BaseModel): store_slug: str = "" execution_id: str = "" run_time: str = "latest" + wait_if_running: int = Field(default=0, ge=0, le=300) @field_validator( "agent_name", @@ -116,6 +118,11 @@ class AgentOutputTool(BaseTool): Select which run to retrieve using: - execution_id: Specific execution ID - run_time: 'latest' (default), 'yesterday', 'last week', or ISO date 'YYYY-MM-DD' + + Wait for completion (optional): + - wait_if_running: Max seconds to wait if execution is still running (0-300). + If the execution is running/queued, waits up to this many seconds for completion. + Returns current status on timeout. If already finished, returns immediately. """ @property @@ -145,6 +152,13 @@ class AgentOutputTool(BaseTool): "Time filter: 'latest', 'yesterday', 'last week', or 'YYYY-MM-DD'" ), }, + "wait_if_running": { + "type": "integer", + "description": ( + "Max seconds to wait if execution is still running (0-300). " + "If running, waits for completion. Returns current state on timeout." + ), + }, }, "required": [], } @@ -224,10 +238,14 @@ class AgentOutputTool(BaseTool): execution_id: str | None, time_start: datetime | None, time_end: datetime | None, + include_running: bool = False, ) -> tuple[GraphExecution | None, list[GraphExecutionMeta], str | None]: """ Fetch execution(s) based on filters. Returns (single_execution, available_executions_meta, error_message). + + Args: + include_running: If True, also look for running/queued executions (for waiting) """ exec_db = execution_db() @@ -242,11 +260,22 @@ class AgentOutputTool(BaseTool): return None, [], f"Execution '{execution_id}' not found" return execution, [], None - # Get completed executions with time filters + # Determine which statuses to query + statuses = [ExecutionStatus.COMPLETED] + if include_running: + statuses.extend( + [ + ExecutionStatus.RUNNING, + ExecutionStatus.QUEUED, + ExecutionStatus.INCOMPLETE, + ] + ) + + # Get executions with time filters executions = await exec_db.get_graph_executions( graph_id=graph_id, user_id=user_id, - statuses=[ExecutionStatus.COMPLETED], + statuses=statuses, created_time_gte=time_start, created_time_lte=time_end, limit=10, @@ -313,10 +342,28 @@ class AgentOutputTool(BaseTool): for e in available_executions[:5] ] - message = f"Found execution outputs for agent '{agent.name}'" + # Build appropriate message based on execution status + if execution.status == ExecutionStatus.COMPLETED: + message = f"Found execution outputs for agent '{agent.name}'" + elif execution.status == ExecutionStatus.FAILED: + message = f"Execution for agent '{agent.name}' failed" + elif execution.status == ExecutionStatus.TERMINATED: + message = f"Execution for agent '{agent.name}' was terminated" + elif execution.status in ( + ExecutionStatus.RUNNING, + ExecutionStatus.QUEUED, + ExecutionStatus.INCOMPLETE, + ): + message = ( + f"Execution for agent '{agent.name}' is still {execution.status.value}. " + "Results may be incomplete. Use wait_if_running to wait for completion." + ) + else: + message = f"Found execution for agent '{agent.name}' (status: {execution.status.value})" + if len(available_executions) > 1: message += ( - f". Showing latest of {len(available_executions)} matching executions." + f" Showing latest of {len(available_executions)} matching executions." ) return AgentOutputResponse( @@ -431,13 +478,17 @@ class AgentOutputTool(BaseTool): # Parse time expression time_start, time_end = parse_time_expression(input_data.run_time) - # Fetch execution(s) + # Check if we should wait for running executions + wait_timeout = input_data.wait_if_running + + # Fetch execution(s) - include running if we're going to wait execution, available_executions, exec_error = await self._get_execution( user_id=user_id, graph_id=agent.graph_id, execution_id=input_data.execution_id or None, time_start=time_start, time_end=time_end, + include_running=wait_timeout > 0, ) if exec_error: @@ -446,4 +497,17 @@ class AgentOutputTool(BaseTool): session_id=session_id, ) + # If we have an execution that's still running and we should wait + if execution and wait_timeout > 0 and execution.status not in TERMINAL_STATUSES: + logger.info( + f"Execution {execution.id} is {execution.status}, " + f"waiting up to {wait_timeout}s for completion" + ) + execution = await wait_for_execution( + user_id=user_id, + graph_id=agent.graph_id, + execution_id=execution.id, + timeout_seconds=wait_timeout, + ) + return self._build_response(agent, execution, available_executions, session_id) diff --git a/autogpt_platform/backend/backend/copilot/tools/execution_utils.py b/autogpt_platform/backend/backend/copilot/tools/execution_utils.py new file mode 100644 index 0000000000..4f9d446dc6 --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/tools/execution_utils.py @@ -0,0 +1,129 @@ +"""Shared utilities for execution waiting and status handling.""" + +import asyncio +import logging +from typing import Any + +from backend.data.db_accessors import execution_db +from backend.data.execution import ( + AsyncRedisExecutionEventBus, + ExecutionStatus, + GraphExecution, + GraphExecutionEvent, +) + +logger = logging.getLogger(__name__) + +# Terminal statuses that indicate execution is complete +TERMINAL_STATUSES = frozenset( + { + ExecutionStatus.COMPLETED, + ExecutionStatus.FAILED, + ExecutionStatus.TERMINATED, + } +) + + +async def wait_for_execution( + user_id: str, + graph_id: str, + execution_id: str, + timeout_seconds: int, +) -> GraphExecution | None: + """ + Wait for an execution to reach a terminal status using Redis pubsub. + + Uses asyncio.wait_for to ensure timeout is respected even when no events + are received. + + Args: + user_id: User ID + graph_id: Graph ID + execution_id: Execution ID to wait for + timeout_seconds: Max seconds to wait + + Returns: + The execution with current status, or None if not found + """ + exec_db = execution_db() + + # First check current status - maybe it's already done + execution = await exec_db.get_graph_execution( + user_id=user_id, + execution_id=execution_id, + include_node_executions=False, + ) + if not execution: + return None + + # If already in terminal state, return immediately + if execution.status in TERMINAL_STATUSES: + logger.debug( + f"Execution {execution_id} already in terminal state: {execution.status}" + ) + return execution + + logger.info( + f"Waiting up to {timeout_seconds}s for execution {execution_id} " + f"(current status: {execution.status})" + ) + + # Subscribe to execution updates via Redis pubsub + event_bus = AsyncRedisExecutionEventBus() + channel_key = f"{user_id}/{graph_id}/{execution_id}" + + try: + # Use wait_for to enforce timeout on the entire listen operation + result = await asyncio.wait_for( + _listen_for_terminal_status( + event_bus, channel_key, user_id, execution_id, exec_db + ), + timeout=timeout_seconds, + ) + return result + except asyncio.TimeoutError: + logger.info(f"Timeout waiting for execution {execution_id}") + except Exception as e: + logger.error(f"Error waiting for execution: {e}", exc_info=True) + + # Return current state on timeout/error + return await exec_db.get_graph_execution( + user_id=user_id, + execution_id=execution_id, + include_node_executions=False, + ) + + +async def _listen_for_terminal_status( + event_bus: AsyncRedisExecutionEventBus, + channel_key: str, + user_id: str, + execution_id: str, + exec_db: Any, +) -> GraphExecution | None: + """ + Listen for execution events until a terminal status is reached. + + This is a helper that gets wrapped in asyncio.wait_for for timeout handling. + """ + async for event in event_bus.listen_events(channel_key): + # Only process GraphExecutionEvents (not NodeExecutionEvents) + if isinstance(event, GraphExecutionEvent): + logger.debug(f"Received execution update: {event.status}") + if event.status in TERMINAL_STATUSES: + # Fetch full execution with outputs + return await exec_db.get_graph_execution( + user_id=user_id, + execution_id=execution_id, + include_node_executions=False, + ) + + # Should not reach here normally (generator should yield indefinitely) + return None + + +def get_execution_outputs(execution: GraphExecution | None) -> dict[str, Any] | None: + """Extract outputs from an execution, or return None.""" + if execution is None: + return None + return execution.outputs diff --git a/autogpt_platform/backend/backend/copilot/tools/run_agent.py b/autogpt_platform/backend/backend/copilot/tools/run_agent.py index 46af6fbcb0..4d56f2e2a8 100644 --- a/autogpt_platform/backend/backend/copilot/tools/run_agent.py +++ b/autogpt_platform/backend/backend/copilot/tools/run_agent.py @@ -9,6 +9,7 @@ from backend.copilot.config import ChatConfig from backend.copilot.model import ChatSession from backend.copilot.tracking import track_agent_run_success, track_agent_scheduled from backend.data.db_accessors import graph_db, library_db, user_db +from backend.data.execution import ExecutionStatus from backend.data.graph import GraphModel from backend.data.model import CredentialsMetaInput from backend.executor import utils as execution_utils @@ -24,6 +25,7 @@ from .helpers import get_inputs_from_schema from .models import ( AgentDetails, AgentDetailsResponse, + AgentOutputResponse, ErrorResponse, ExecutionOptions, ExecutionStartedResponse, @@ -33,6 +35,7 @@ from .models import ( ToolResponseBase, UserReadiness, ) +from .execution_utils import get_execution_outputs, wait_for_execution from .utils import ( build_missing_credentials_from_graph, extract_credentials_from_schema, @@ -66,6 +69,7 @@ class RunAgentInput(BaseModel): schedule_name: str = "" cron: str = "" timezone: str = "UTC" + wait_for_result: int = Field(default=0, ge=0, le=300) @field_validator( "username_agent_slug", @@ -147,6 +151,14 @@ class RunAgentTool(BaseTool): "type": "string", "description": "IANA timezone for schedule (default: UTC)", }, + "wait_for_result": { + "type": "integer", + "description": ( + "Max seconds to wait for execution to complete (0-300). " + "If >0, blocks until the execution finishes or times out. " + "Returns execution outputs when complete." + ), + }, }, "required": [], } @@ -341,6 +353,7 @@ class RunAgentTool(BaseTool): graph=graph, graph_credentials=graph_credentials, inputs=params.inputs, + wait_for_result=params.wait_for_result, ) except NotFoundError as e: @@ -424,8 +437,9 @@ class RunAgentTool(BaseTool): graph: GraphModel, graph_credentials: dict[str, CredentialsMetaInput], inputs: dict[str, Any], + wait_for_result: int = 0, ) -> ToolResponseBase: - """Execute an agent immediately.""" + """Execute an agent immediately, optionally waiting for completion.""" session_id = session.session_id # Check rate limits @@ -462,6 +476,56 @@ class RunAgentTool(BaseTool): ) library_agent_link = f"/library/agents/{library_agent.id}" + + # If wait_for_result is requested, wait for execution to complete + if wait_for_result > 0: + logger.info( + f"Waiting up to {wait_for_result}s for execution {execution.id}" + ) + completed = await wait_for_execution( + user_id=user_id, + graph_id=library_agent.graph_id, + execution_id=execution.id, + timeout_seconds=wait_for_result, + ) + + if completed and completed.status == ExecutionStatus.COMPLETED: + outputs = get_execution_outputs(completed) + return AgentOutputResponse( + message=( + f"Agent '{library_agent.name}' completed successfully. " + f"View at {library_agent_link}." + ), + session_id=session_id, + execution_id=execution.id, + graph_id=library_agent.graph_id, + graph_name=library_agent.name, + outputs=outputs or {}, + ) + elif completed and completed.status == ExecutionStatus.FAILED: + return ErrorResponse( + message=( + f"Agent '{library_agent.name}' execution failed. " + f"View details at {library_agent_link}." + ), + session_id=session_id, + ) + else: + status = completed.status.value if completed else "unknown" + return ExecutionStartedResponse( + message=( + f"Agent '{library_agent.name}' is still {status} after " + f"{wait_for_result}s. Check results later at {library_agent_link}. " + f"Use view_agent_output with wait_if_running to check again." + ), + session_id=session_id, + execution_id=execution.id, + graph_id=library_agent.graph_id, + graph_name=library_agent.name, + library_agent_id=library_agent.id, + library_agent_link=library_agent_link, + ) + return ExecutionStartedResponse( message=( f"Agent '{library_agent.name}' execution started successfully. "