Files
AutoGPT/autogpt_platform/backend/backend/copilot/executor/utils.py
Reinier van der Leer d23248f065 feat(backend/copilot): Copilot Executor Microservice (#12057)
Uncouple Copilot task execution from the REST API server. This should
help performance and scalability, and allows task execution to continue
regardless of the state of the user's connection.

- Resolves #12023

### Changes 🏗️

- Add the `CoPilotExecutor` service in `backend.copilot.executor` (set up
similarly to `ExecutionManager` in `backend.executor`).

This executor service uses RabbitMQ-based task distribution, and sticks
with the existing Redis Streams setup for task output. It uses a cluster
lock mechanism to ensure a task is only executed by one pod, and the
`DatabaseManager` for pooled DB access.

- Add `backend.data.db_accessors` for automatic choice of direct/proxied
DB access

Chat requests now flow: API → RabbitMQ → CoPilot Executor → Redis
Streams → SSE Client. This enables horizontal scaling of chat processing
and isolates long-running LLM operations from the API service.

- Move Copilot code that is not part of the API layer from
`backend.api.features.chat` into `backend.copilot`
  - Updated import paths for all usages

- Move `backend.executor.database` to `backend.data.db_manager` and add
methods for copilot executor
  - Updated import paths for all usages
- Make `backend.copilot.db` RPC-compatible (DB operations now return
Pydantic models instead of Prisma models)
  - Make `backend.data.workspace` RPC-compatible
  - Make `backend.data.graphs.get_store_listed_graphs` RPC-compatible

DX:
- Add `copilot_executor` service to Docker setup

Config:
- Add `Config.num_copilot_workers` (default 5) and
`Config.copilot_executor_port` (default 8008)
- Remove unused `Config.agent_server_port`

> [!WARNING]
> **This change adds a new microservice to the system, with entrypoint
`backend.copilot.executor`.**
> The `docker compose` setup has been updated, but if you run the
Platform on something else, you'll have to update your deployment config
to include this new service.
>
> When running locally, the `CoPilotExecutor` uses port 8008 by default.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Copilot works
    - [x] Processes messages when triggered
    - [x] Can use its tools

#### For configuration changes:

- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under **Changes**)

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
2026-02-17 16:15:28 +00:00

208 lines
6.2 KiB
Python

"""RabbitMQ queue configuration for CoPilot executor.
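
Defines two exchanges and queues following the graph executor pattern:
- 'copilot_execution' (DIRECT) for chat generation tasks
- 'copilot_cancel' (FANOUT) for cancellation requests

Also provides the `CoPilotLogMetadata` logging helper, the queue message
models, and `enqueue_copilot_task` for publishing tasks to the execution queue.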
"""
import logging
from pydantic import BaseModel
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.util.logging import TruncatedLogger, is_structured_logging_enabled
logger = logging.getLogger(__name__)
# ============ Logging Helper ============ #
class CoPilotLogMetadata(TruncatedLogger):
    """Truncating logger that tags its output as coming from the CoPilot executor.

    The log prefix depends on `is_structured_logging_enabled()`:

    - enabled: a fixed ``[CoPilotExecutor]`` prefix is used and the metadata is
      only handed to `TruncatedLogger` through its ``metadata`` argument.
    - disabled: the metadata pairs are additionally spelled out in the prefix,
      e.g. ``[CoPilotExecutor|task_id:abc|session_id:xyz]``, for easier
      debugging in plain-text logs.

    Args:
        logger: The underlying logger instance.
        max_length: Maximum log message length before truncation.
        **kwargs: Metadata key-value pairs (e.g., task_id="abc",
            session_id="xyz"). Pairs whose value is None are dropped.
    """

    def __init__(
        self,
        logger: logging.Logger,
        max_length: int = 1000,
        **kwargs: str | None,
    ):
        # Drop pairs without a value. "component" is always set here and
        # replaces any caller-supplied value for that key.
        metadata = {
            **{name: value for name, value in kwargs.items() if value is not None},
            "component": "CoPilotExecutor",
        }

        prefix = "[CoPilotExecutor]"
        if not is_structured_logging_enabled():
            # Without structured logging, surface the metadata in the prefix
            # itself; "component" is already implied by the prefix label.
            details = "|".join(
                f"{name}:{value}"
                for name, value in metadata.items()
                if name != "component"
            )
            if details:
                prefix = f"[CoPilotExecutor|{details}]"

        super().__init__(
            logger,
            max_length=max_length,
            prefix=prefix,
            metadata=metadata,
        )
# ============ Exchange and Queue Configuration ============ #
# Chat generation tasks: DIRECT exchange, delivered by routing key.
COPILOT_EXECUTION_EXCHANGE = Exchange(
    name="copilot_execution",
    type=ExchangeType.DIRECT,
    auto_delete=False,
    durable=True,
)
COPILOT_EXECUTION_QUEUE_NAME = "copilot_execution_queue"
COPILOT_EXECUTION_ROUTING_KEY = "copilot.run"

# Cancellation requests: FANOUT exchange, so no routing key is needed.
COPILOT_CANCEL_EXCHANGE = Exchange(
    name="copilot_cancel",
    type=ExchangeType.FANOUT,
    auto_delete=False,
    durable=True,
)
COPILOT_CANCEL_QUEUE_NAME = "copilot_cancel_queue"

# Upper bound on how long a consumer may hold a task. CoPilot operations can
# include extended thinking and agent generation, which may take 30+ minutes.
COPILOT_CONSUMER_TIMEOUT_SECONDS = 60 * 60  # 1 hour

# How long a graceful shutdown waits for in-flight operations to complete.
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
def create_copilot_queue_config() -> RabbitMQConfig:
    """Build the RabbitMQ topology used by the CoPilot executor.

    Declares two durable, non-auto-deleting queues, each bound to its own
    exchange:
    - `COPILOT_EXECUTION_QUEUE_NAME` on `COPILOT_EXECUTION_EXCHANGE`, bound
      with `COPILOT_EXECUTION_ROUTING_KEY`, for chat generation tasks
    - `COPILOT_CANCEL_QUEUE_NAME` on `COPILOT_CANCEL_EXCHANGE`, for
      cancellation requests

    Returns:
        RabbitMQConfig for vhost "/" listing both exchanges and both queues.
    """
    # `x-consumer-timeout` is given in milliseconds. The default 30-minute
    # timeout is too short for extended thinking and agent generation, which
    # can take 30+ minutes, so the execution queue overrides it.
    consumer_timeout_ms = COPILOT_CONSUMER_TIMEOUT_SECONDS * 1000

    return RabbitMQConfig(
        vhost="/",
        exchanges=[COPILOT_EXECUTION_EXCHANGE, COPILOT_CANCEL_EXCHANGE],
        queues=[
            Queue(
                name=COPILOT_EXECUTION_QUEUE_NAME,
                exchange=COPILOT_EXECUTION_EXCHANGE,
                routing_key=COPILOT_EXECUTION_ROUTING_KEY,
                durable=True,
                auto_delete=False,
                arguments={"x-consumer-timeout": consumer_timeout_ms},
            ),
            # NOTE(review): a FANOUT exchange copies each message to every
            # bound queue, but only this single named queue is declared here.
            # Confirm that every executor instance sees cancel events.
            Queue(
                name=COPILOT_CANCEL_QUEUE_NAME,
                exchange=COPILOT_CANCEL_EXCHANGE,
                routing_key="",  # not used for FANOUT
                durable=True,
                auto_delete=False,
            ),
        ],
    )
# ============ Message Models ============ #
class CoPilotExecutionEntry(BaseModel):
    """Task payload for CoPilot AI generation.

    This model represents a chat generation task to be processed by the executor.
    `enqueue_copilot_task` builds an instance from its arguments and publishes
    the JSON serialization (`model_dump_json()`) to the execution exchange.
    """

    task_id: str
    """Unique identifier for this task (used for stream registry)"""
    session_id: str
    """Chat session ID"""
    # No default: the field must always be supplied, even when the value is None.
    user_id: str | None
    """User ID (may be None for anonymous users)"""
    operation_id: str
    """Operation ID for webhook callbacks and completion tracking"""
    message: str
    """User's message to process"""
    is_user_message: bool = True
    """Whether the message is from the user (vs system/assistant)"""
    context: dict[str, str] | None = None
    """Optional context for the message (e.g., {url: str, content: str})"""
class CancelCoPilotEvent(BaseModel):
    """Event to cancel a CoPilot operation.

    Carries only the ID of the task to cancel.
    NOTE(review): presumably sent via the 'copilot_cancel' exchange; the
    publisher and consumer are not part of this module, so confirm there.
    """

    task_id: str
    """Task ID to cancel"""
# ============ Queue Publishing Helpers ============ #
async def enqueue_copilot_task(
    task_id: str,
    session_id: str,
    user_id: str | None,
    operation_id: str,
    message: str,
    is_user_message: bool = True,
    context: dict[str, str] | None = None,
) -> None:
    """Publish a CoPilot task to RabbitMQ for the executor service to process.

    The arguments are packed into a `CoPilotExecutionEntry`, serialized to
    JSON, and published to `COPILOT_EXECUTION_EXCHANGE` using
    `COPILOT_EXECUTION_ROUTING_KEY`.

    Args:
        task_id: Unique identifier for this task (used for stream registry)
        session_id: Chat session ID
        user_id: User ID (may be None for anonymous users)
        operation_id: Operation ID for webhook callbacks and completion tracking
        message: User's message to process
        is_user_message: Whether the message is from the user (vs system/assistant)
        context: Optional context for the message (e.g., {url: str, content: str})
    """
    # Kept as a function-scope import, as in the original.
    # NOTE(review): presumably this avoids a circular import — confirm.
    from backend.util.clients import get_async_copilot_queue

    # Build (and thereby validate) the payload before touching the queue client.
    task = CoPilotExecutionEntry(
        task_id=task_id,
        session_id=session_id,
        user_id=user_id,
        operation_id=operation_id,
        message=message,
        is_user_message=is_user_message,
        context=context,
    )

    publisher = await get_async_copilot_queue()
    await publisher.publish_message(
        exchange=COPILOT_EXECUTION_EXCHANGE,
        routing_key=COPILOT_EXECUTION_ROUTING_KEY,
        message=task.model_dump_json(),
    )