mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
feat(chat): add check_operation_status tool for long-running ops
Lets the CoPilot agent query whether a create_agent/edit_agent operation is still running, completed, or failed. Accepts operation_id or task_id from a previous operation_started response and looks up the task status in Redis via stream_registry.
This commit is contained in:
@@ -10,6 +10,7 @@ from .add_understanding import AddUnderstandingTool
|
||||
from .agent_output import AgentOutputTool
|
||||
from .base import BaseTool
|
||||
from .bash_exec import BashExecTool
|
||||
from .check_operation_status import CheckOperationStatusTool
|
||||
from .create_agent import CreateAgentTool
|
||||
from .customize_agent import CustomizeAgentTool
|
||||
from .edit_agent import EditAgentTool
|
||||
@@ -45,6 +46,7 @@ TOOL_REGISTRY: dict[str, BaseTool] = {
|
||||
"run_agent": RunAgentTool(),
|
||||
"run_block": RunBlockTool(),
|
||||
"view_agent_output": AgentOutputTool(),
|
||||
"check_operation_status": CheckOperationStatusTool(),
|
||||
"search_docs": SearchDocsTool(),
|
||||
"get_doc_page": GetDocPageTool(),
|
||||
# Web fetch for safe URL retrieval
|
||||
|
||||
@@ -0,0 +1,127 @@
|
||||
"""CheckOperationStatusTool — query the status of a long-running operation."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from backend.api.features.chat.model import ChatSession
|
||||
from backend.api.features.chat.tools.base import BaseTool
|
||||
from backend.api.features.chat.tools.models import (
|
||||
ErrorResponse,
|
||||
ResponseType,
|
||||
ToolResponseBase,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OperationStatusResponse(ToolResponseBase):
|
||||
"""Response for check_operation_status tool."""
|
||||
|
||||
type: ResponseType = ResponseType.OPERATION_STATUS
|
||||
task_id: str
|
||||
operation_id: str
|
||||
status: str # "running", "completed", "failed"
|
||||
tool_name: str | None = None
|
||||
message: str = ""
|
||||
|
||||
|
||||
class CheckOperationStatusTool(BaseTool):
|
||||
"""Check the status of a long-running operation (create_agent, edit_agent, etc.).
|
||||
|
||||
The CoPilot uses this tool to report back to the user whether an
|
||||
operation that was started earlier has completed, failed, or is still
|
||||
running.
|
||||
"""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "check_operation_status"
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return (
|
||||
"Check the current status of a long-running operation such as "
|
||||
"create_agent or edit_agent. Accepts either an operation_id or "
|
||||
"task_id from a previous operation_started response. "
|
||||
"Returns the current status: running, completed, or failed."
|
||||
)
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"operation_id": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"The operation_id from an operation_started response."
|
||||
),
|
||||
},
|
||||
"task_id": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"The task_id from an operation_started response. "
|
||||
"Used as fallback if operation_id is not provided."
|
||||
),
|
||||
},
|
||||
},
|
||||
"required": [],
|
||||
}
|
||||
|
||||
@property
|
||||
def requires_auth(self) -> bool:
|
||||
return False
|
||||
|
||||
async def _execute(
|
||||
self,
|
||||
user_id: str | None,
|
||||
session: ChatSession,
|
||||
**kwargs,
|
||||
) -> ToolResponseBase:
|
||||
from backend.api.features.chat import stream_registry
|
||||
|
||||
operation_id: str = kwargs.get("operation_id", "").strip()
|
||||
task_id: str = kwargs.get("task_id", "").strip()
|
||||
|
||||
if not operation_id and not task_id:
|
||||
return ErrorResponse(
|
||||
message="Please provide an operation_id or task_id.",
|
||||
error="missing_parameter",
|
||||
)
|
||||
|
||||
task = None
|
||||
if operation_id:
|
||||
task = await stream_registry.find_task_by_operation_id(operation_id)
|
||||
if task is None and task_id:
|
||||
task = await stream_registry.get_task(task_id)
|
||||
|
||||
if task is None:
|
||||
# Task not in Redis — it may have already expired (TTL).
|
||||
# Check conversation history for the result instead.
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
"Operation not found — it may have already completed and "
|
||||
"expired from the status tracker. Check the conversation "
|
||||
"history for the result."
|
||||
),
|
||||
error="not_found",
|
||||
)
|
||||
|
||||
status_messages = {
|
||||
"running": (
|
||||
f"The {task.tool_name or 'operation'} is still running. "
|
||||
"Please wait for it to complete."
|
||||
),
|
||||
"completed": (
|
||||
f"The {task.tool_name or 'operation'} has completed successfully."
|
||||
),
|
||||
"failed": f"The {task.tool_name or 'operation'} has failed.",
|
||||
}
|
||||
|
||||
return OperationStatusResponse(
|
||||
task_id=task.task_id,
|
||||
operation_id=task.operation_id,
|
||||
status=task.status,
|
||||
tool_name=task.tool_name,
|
||||
message=status_messages.get(task.status, f"Status: {task.status}"),
|
||||
)
|
||||
@@ -44,6 +44,8 @@ class ResponseType(str, Enum):
|
||||
WEB_FETCH = "web_fetch"
|
||||
# Code execution
|
||||
BASH_EXEC = "bash_exec"
|
||||
# Operation status check
|
||||
OPERATION_STATUS = "operation_status"
|
||||
|
||||
|
||||
# Base response model
|
||||
|
||||
Reference in New Issue
Block a user