From ecb9fdae25dfcc7d3a554aec0bb0d244566c98c6 Mon Sep 17 00:00:00 2001 From: Reinier van der Leer Date: Tue, 10 Feb 2026 22:48:01 +0100 Subject: [PATCH] feat(backend/copilot): Copilot Executor Microservice --- .../backend/backend/api/external/v1/tools.py | 6 +- .../backend/api/features/chat/routes.py | 112 ++--- .../backend/backend/api/rest_api.py | 4 +- autogpt_platform/backend/backend/app.py | 2 + .../backend/backend/copilot/__init__.py | 8 + .../chat => copilot}/completion_consumer.py | 5 +- .../chat => copilot}/completion_handler.py | 0 .../{api/features/chat => copilot}/config.py | 0 .../{api/features/chat => copilot}/db.py | 0 .../backend/copilot/executor/__init__.py | 5 + .../backend/copilot/executor/__main__.py | 18 + .../backend/copilot/executor/manager.py | 462 ++++++++++++++++++ .../backend/copilot/executor/processor.py | 252 ++++++++++ .../backend/backend/copilot/executor/utils.py | 207 ++++++++ .../{api/features/chat => copilot}/model.py | 0 .../features/chat => copilot}/model_test.py | 0 .../chat => copilot}/response_model.py | 0 .../{api/features/chat => copilot}/service.py | 0 .../features/chat => copilot}/service_test.py | 0 .../chat => copilot}/stream_registry.py | 0 .../features/chat => copilot}/tools/IDEAS.md | 0 .../chat => copilot}/tools/__init__.py | 6 +- .../chat => copilot}/tools/_test_data.py | 2 +- .../tools/add_understanding.py | 2 +- .../tools/agent_generator/__init__.py | 0 .../tools/agent_generator/core.py | 0 .../tools/agent_generator/errors.py | 0 .../tools/agent_generator/service.py | 0 .../chat => copilot}/tools/agent_output.py | 2 +- .../chat => copilot}/tools/agent_search.py | 0 .../features/chat => copilot}/tools/base.py | 4 +- .../chat => copilot}/tools/create_agent.py | 2 +- .../chat => copilot}/tools/customize_agent.py | 2 +- .../chat => copilot}/tools/edit_agent.py | 2 +- .../chat => copilot}/tools/find_agent.py | 2 +- .../chat => copilot}/tools/find_block.py | 8 +- .../chat => copilot}/tools/find_block_test.py | 4 +- .../tools/find_library_agent.py | 2 +- .../chat => copilot}/tools/get_doc_page.py | 6 +- .../chat => copilot}/tools/helpers.py | 0 .../features/chat => copilot}/tools/models.py | 0 .../chat => copilot}/tools/run_agent.py | 9 +- .../chat => copilot}/tools/run_agent_test.py | 0 .../chat => copilot}/tools/run_block.py | 4 +- .../chat => copilot}/tools/run_block_test.py | 4 +- .../chat => copilot}/tools/search_docs.py | 8 +- .../features/chat => copilot}/tools/utils.py | 0 .../chat => copilot}/tools/workspace_files.py | 2 +- .../features/chat => copilot}/tracking.py | 0 .../backend/backend/util/clients.py | 16 +- .../backend/backend/util/settings.py | 17 +- .../agent_generator/test_core_integration.py | 6 +- .../agent_generator/test_library_agents.py | 2 +- .../test/agent_generator/test_service.py | 2 +- 54 files changed, 1056 insertions(+), 139 deletions(-) create mode 100644 autogpt_platform/backend/backend/copilot/__init__.py rename autogpt_platform/backend/backend/{api/features/chat => copilot}/completion_consumer.py (99%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/completion_handler.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/config.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/db.py (100%) create mode 100644 autogpt_platform/backend/backend/copilot/executor/__init__.py create mode 100644 autogpt_platform/backend/backend/copilot/executor/__main__.py create mode 100644 autogpt_platform/backend/backend/copilot/executor/manager.py create mode 100644 autogpt_platform/backend/backend/copilot/executor/processor.py create mode 100644 autogpt_platform/backend/backend/copilot/executor/utils.py rename autogpt_platform/backend/backend/{api/features/chat => copilot}/model.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/model_test.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/response_model.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/service.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/service_test.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/stream_registry.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/IDEAS.md (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/__init__.py (93%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/_test_data.py (99%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/add_understanding.py (98%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/agent_generator/__init__.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/agent_generator/core.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/agent_generator/errors.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/agent_generator/service.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/agent_output.py (99%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/agent_search.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/base.py (96%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/create_agent.py (99%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/customize_agent.py (99%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/edit_agent.py (99%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/find_agent.py (95%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/find_block.py (97%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/find_block_test.py (97%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/find_library_agent.py (96%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/get_doc_page.py (96%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/helpers.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/models.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/run_agent.py (98%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/run_agent_test.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/run_block.py (99%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/run_block_test.py (96%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/search_docs.py (97%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/utils.py (100%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tools/workspace_files.py (99%) rename autogpt_platform/backend/backend/{api/features/chat => copilot}/tracking.py (100%) diff --git a/autogpt_platform/backend/backend/api/external/v1/tools.py b/autogpt_platform/backend/backend/api/external/v1/tools.py index 07734dd0c9..7b8b327919 100644 --- a/autogpt_platform/backend/backend/api/external/v1/tools.py +++ b/autogpt_platform/backend/backend/api/external/v1/tools.py @@ -15,9 +15,9 @@ from prisma.enums import APIKeyPermission from pydantic import BaseModel, Field from backend.api.external.middleware import require_permission -from backend.api.features.chat.model import ChatSession -from backend.api.features.chat.tools import find_agent_tool, run_agent_tool -from backend.api.features.chat.tools.models import ToolResponseBase +from backend.copilot.model import ChatSession +from backend.copilot.tools import find_agent_tool, run_agent_tool +from backend.copilot.tools.models import ToolResponseBase from backend.data.auth.base import APIAuthorizationInfo logger = logging.getLogger(__name__) diff --git a/autogpt_platform/backend/backend/api/features/chat/routes.py b/autogpt_platform/backend/backend/api/features/chat/routes.py index c6f37569b7..9982b5b2f2 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes.py @@ -10,15 +10,22 @@ from fastapi import APIRouter, Depends, Header, HTTPException, Query, Response, from fastapi.responses import StreamingResponse from pydantic import BaseModel -from backend.util.exceptions import NotFoundError - -from . import service as chat_service -from . import stream_registry -from .completion_handler import process_operation_failure, process_operation_success -from .config import ChatConfig -from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions -from .response_model import StreamFinish, StreamHeartbeat -from .tools.models import ( +from backend.copilot import service as chat_service +from backend.copilot import stream_registry +from backend.copilot.completion_handler import ( + process_operation_failure, + process_operation_success, +) +from backend.copilot.config import ChatConfig +from backend.copilot.executor.utils import enqueue_copilot_task +from backend.copilot.model import ( + ChatSession, + create_chat_session, + get_chat_session, + get_user_sessions, +) +from backend.copilot.response_model import StreamFinish, StreamHeartbeat +from backend.copilot.tools.models import ( AgentDetailsResponse, AgentOutputResponse, AgentPreviewResponse, @@ -40,6 +47,7 @@ from .tools.models import ( SetupRequirementsResponse, UnderstandingUpdatedResponse, ) +from backend.util.exceptions import NotFoundError config = ChatConfig() @@ -301,7 +309,7 @@ async def stream_chat_post( extra={"json_fields": log_meta}, ) - session = await _validate_and_get_session(session_id, user_id) + _session = await _validate_and_get_session(session_id, user_id) # noqa: F841 logger.info( f"[TIMING] session validated in {(time.perf_counter() - stream_start_time)*1000:.1f}ms", extra={ @@ -336,82 +344,20 @@ async def stream_chat_post( }, ) - # Background task that runs the AI generation independently of SSE connection - async def run_ai_generation(): - import time as time_module + # Enqueue the task to RabbitMQ for processing by the CoPilot executor + await enqueue_copilot_task( + task_id=task_id, + session_id=session_id, + user_id=user_id, + operation_id=operation_id, + message=request.message, + is_user_message=request.is_user_message, + context=request.context, + ) - gen_start_time = time_module.perf_counter() - logger.info( - f"[TIMING] run_ai_generation STARTED, task={task_id}, session={session_id}, user={user_id}", - extra={"json_fields": log_meta}, - ) - first_chunk_time, ttfc = None, None - chunk_count = 0 - try: - async for chunk in chat_service.stream_chat_completion( - session_id, - request.message, - is_user_message=request.is_user_message, - user_id=user_id, - session=session, # Pass pre-fetched session to avoid double-fetch - context=request.context, - _task_id=task_id, # Pass task_id so service emits start with taskId for reconnection - ): - chunk_count += 1 - if first_chunk_time is None: - first_chunk_time = time_module.perf_counter() - ttfc = first_chunk_time - gen_start_time - logger.info( - f"[TIMING] FIRST AI CHUNK at {ttfc:.2f}s, type={type(chunk).__name__}", - extra={ - "json_fields": { - **log_meta, - "chunk_type": type(chunk).__name__, - "time_to_first_chunk_ms": ttfc * 1000, - } - }, - ) - # Write to Redis (subscribers will receive via XREAD) - await stream_registry.publish_chunk(task_id, chunk) - - gen_end_time = time_module.perf_counter() - total_time = (gen_end_time - gen_start_time) * 1000 - logger.info( - f"[TIMING] run_ai_generation FINISHED in {total_time/1000:.1f}s; " - f"task={task_id}, session={session_id}, " - f"ttfc={ttfc or -1:.2f}s, n_chunks={chunk_count}", - extra={ - "json_fields": { - **log_meta, - "total_time_ms": total_time, - "time_to_first_chunk_ms": ( - ttfc * 1000 if ttfc is not None else None - ), - "n_chunks": chunk_count, - } - }, - ) - await stream_registry.mark_task_completed(task_id, "completed") - except Exception as e: - elapsed = time_module.perf_counter() - gen_start_time - logger.error( - f"[TIMING] run_ai_generation ERROR after {elapsed:.2f}s: {e}", - extra={ - "json_fields": { - **log_meta, - "elapsed_ms": elapsed * 1000, - "error": str(e), - } - }, - ) - await stream_registry.mark_task_completed(task_id, "failed") - - # Start the AI generation in a background task - bg_task = asyncio.create_task(run_ai_generation()) - await stream_registry.set_task_asyncio_task(task_id, bg_task) setup_time = (time.perf_counter() - stream_start_time) * 1000 logger.info( - f"[TIMING] Background task started, setup={setup_time:.1f}ms", + f"[TIMING] Task enqueued to RabbitMQ, setup={setup_time:.1f}ms", extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}}, ) diff --git a/autogpt_platform/backend/backend/api/rest_api.py b/autogpt_platform/backend/backend/api/rest_api.py index 0eef76193e..7220845679 100644 --- a/autogpt_platform/backend/backend/api/rest_api.py +++ b/autogpt_platform/backend/backend/api/rest_api.py @@ -40,11 +40,11 @@ import backend.data.user import backend.integrations.webhooks.utils import backend.util.service import backend.util.settings -from backend.api.features.chat.completion_consumer import ( +from backend.blocks.llm import DEFAULT_LLM_MODEL +from backend.copilot.completion_consumer import ( start_completion_consumer, stop_completion_consumer, ) -from backend.blocks.llm import DEFAULT_LLM_MODEL from backend.data.model import Credentials from backend.integrations.providers import ProviderName from backend.monitoring.instrumentation import instrument_fastapi diff --git a/autogpt_platform/backend/backend/app.py b/autogpt_platform/backend/backend/app.py index 0afed130ed..d3abd80b12 100644 --- a/autogpt_platform/backend/backend/app.py +++ b/autogpt_platform/backend/backend/app.py @@ -38,6 +38,7 @@ def main(**kwargs): from backend.api.rest_api import AgentServer from backend.api.ws_api import WebsocketServer + from backend.copilot.executor.manager import CoPilotExecutor from backend.executor import DatabaseManager, ExecutionManager, Scheduler from backend.notifications import NotificationManager @@ -48,6 +49,7 @@ def main(**kwargs): WebsocketServer(), AgentServer(), ExecutionManager(), + CoPilotExecutor(), **kwargs, ) diff --git a/autogpt_platform/backend/backend/copilot/__init__.py b/autogpt_platform/backend/backend/copilot/__init__.py new file mode 100644 index 0000000000..65a2efefe1 --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/__init__.py @@ -0,0 +1,8 @@ +"""CoPilot module - AI assistant for AutoGPT platform. + +This module contains the core CoPilot functionality including: +- AI generation service (LLM calls) +- Tool execution +- Session management +- Stream registry for SSE reconnection +""" diff --git a/autogpt_platform/backend/backend/api/features/chat/completion_consumer.py b/autogpt_platform/backend/backend/copilot/completion_consumer.py similarity index 99% rename from autogpt_platform/backend/backend/api/features/chat/completion_consumer.py rename to autogpt_platform/backend/backend/copilot/completion_consumer.py index f447d46bd7..622760f5d8 100644 --- a/autogpt_platform/backend/backend/api/features/chat/completion_consumer.py +++ b/autogpt_platform/backend/backend/copilot/completion_consumer.py @@ -119,8 +119,9 @@ class ChatCompletionConsumer: """Lazily initialize Prisma client on first use.""" if self._prisma is None: database_url = os.getenv("DATABASE_URL", "postgresql://localhost:5432") - self._prisma = Prisma(datasource={"url": database_url}) - await self._prisma.connect() + prisma = Prisma(datasource={"url": database_url}) + await prisma.connect() + self._prisma = prisma logger.info("[COMPLETION] Consumer Prisma client connected (lazy init)") return self._prisma diff --git a/autogpt_platform/backend/backend/api/features/chat/completion_handler.py b/autogpt_platform/backend/backend/copilot/completion_handler.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/completion_handler.py rename to autogpt_platform/backend/backend/copilot/completion_handler.py diff --git a/autogpt_platform/backend/backend/api/features/chat/config.py b/autogpt_platform/backend/backend/copilot/config.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/config.py rename to autogpt_platform/backend/backend/copilot/config.py diff --git a/autogpt_platform/backend/backend/api/features/chat/db.py b/autogpt_platform/backend/backend/copilot/db.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/db.py rename to autogpt_platform/backend/backend/copilot/db.py diff --git a/autogpt_platform/backend/backend/copilot/executor/__init__.py b/autogpt_platform/backend/backend/copilot/executor/__init__.py new file mode 100644 index 0000000000..373a13f0a7 --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/executor/__init__.py @@ -0,0 +1,5 @@ +"""CoPilot Executor - Dedicated service for AI generation and tool execution. + +This module contains the executor service that processes CoPilot tasks +from RabbitMQ, following the graph executor pattern. +""" diff --git a/autogpt_platform/backend/backend/copilot/executor/__main__.py b/autogpt_platform/backend/backend/copilot/executor/__main__.py new file mode 100644 index 0000000000..00d42d6d95 --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/executor/__main__.py @@ -0,0 +1,18 @@ +"""Entry point for running the CoPilot Executor service. + +Usage: + python -m backend.copilot.executor +""" + +from backend.app import run_processes + +from .manager import CoPilotExecutor + + +def main(): + """Run the CoPilot Executor service.""" + run_processes(CoPilotExecutor()) + + +if __name__ == "__main__": + main() diff --git a/autogpt_platform/backend/backend/copilot/executor/manager.py b/autogpt_platform/backend/backend/copilot/executor/manager.py new file mode 100644 index 0000000000..ce0c11d4de --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/executor/manager.py @@ -0,0 +1,462 @@ +"""CoPilot Executor Manager - main service for CoPilot task execution. + +This module contains the CoPilotExecutor class that consumes chat tasks from +RabbitMQ and processes them using a thread pool, following the graph executor pattern. +""" + +import logging +import os +import threading +import time +import uuid +from concurrent.futures import Future, ThreadPoolExecutor + +from pika.adapters.blocking_connection import BlockingChannel +from pika.spec import Basic, BasicProperties +from prometheus_client import Gauge, start_http_server + +from backend.data import redis_client as redis +from backend.data.rabbitmq import SyncRabbitMQ +from backend.executor.cluster_lock import ClusterLock +from backend.util.decorator import error_logged +from backend.util.logging import TruncatedLogger +from backend.util.process import AppProcess +from backend.util.retry import continuous_retry, func_retry +from backend.util.settings import Settings + +from .processor import execute_copilot_task, init_worker +from .utils import ( + COPILOT_CANCEL_QUEUE_NAME, + COPILOT_EXECUTION_QUEUE_NAME, + GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS, + CancelCoPilotEvent, + CoPilotExecutionEntry, + create_copilot_queue_config, +) + +logger = TruncatedLogger(logging.getLogger(__name__), prefix="[CoPilotExecutor]") +settings = Settings() + +# Prometheus metrics +active_tasks_gauge = Gauge( + "copilot_executor_active_tasks", + "Number of active CoPilot tasks", +) +pool_size_gauge = Gauge( + "copilot_executor_pool_size", + "Maximum number of CoPilot executor workers", +) +utilization_gauge = Gauge( + "copilot_executor_utilization_ratio", + "Ratio of active tasks to pool size", +) + + +class CoPilotExecutor(AppProcess): + """CoPilot Executor service for processing chat generation tasks. + + This service consumes tasks from RabbitMQ, processes them using a thread pool, + and publishes results to Redis Streams. It follows the graph executor pattern + for reliable message handling and graceful shutdown. + + Key features: + - RabbitMQ-based task distribution with manual acknowledgment + - Thread pool executor for concurrent task processing + - Cluster lock for duplicate prevention across pods + - Graceful shutdown with timeout for in-flight tasks + - FANOUT exchange for cancellation broadcast + """ + + def __init__(self): + super().__init__() + self.pool_size = settings.config.num_copilot_workers + self.active_tasks: dict[str, tuple[Future, threading.Event]] = {} + self.executor_id = str(uuid.uuid4()) + + self._executor = None + self._stop_consuming = None + + self._cancel_thread = None + self._cancel_client = None + self._run_thread = None + self._run_client = None + + self._task_locks: dict[str, ClusterLock] = {} + + # ============ Main Entry Points (AppProcess interface) ============ # + + def run(self): + """Main service loop - consume from RabbitMQ.""" + logger.info(f"Pod assigned executor_id: {self.executor_id}") + logger.info(f"Spawn max-{self.pool_size} workers...") + + pool_size_gauge.set(self.pool_size) + self._update_metrics() + start_http_server(settings.config.copilot_executor_port) + + self.cancel_thread.start() + self.run_thread.start() + + while True: + time.sleep(1e5) + + def cleanup(self): + """Graceful shutdown with active execution waiting.""" + pid = os.getpid() + logger.info(f"[cleanup {pid}] Starting graceful shutdown...") + + # Signal the consumer thread to stop + try: + self.stop_consuming.set() + run_channel = self.run_client.get_channel() + run_channel.connection.add_callback_threadsafe( + lambda: run_channel.stop_consuming() + ) + logger.info(f"[cleanup {pid}] Consumer has been signaled to stop") + except Exception as e: + logger.error(f"[cleanup {pid}] Error stopping consumer: {e}") + + # Wait for active executions to complete + if self.active_tasks: + logger.info( + f"[cleanup {pid}] Waiting for {len(self.active_tasks)} active tasks to complete (timeout: {GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS}s)..." + ) + + start_time = time.monotonic() + last_refresh = start_time + lock_refresh_interval = settings.config.cluster_lock_timeout / 10 + + while ( + self.active_tasks + and (time.monotonic() - start_time) < GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS + ): + self._cleanup_completed_tasks() + if not self.active_tasks: + break + + # Refresh cluster locks periodically + current_time = time.monotonic() + if current_time - last_refresh >= lock_refresh_interval: + for lock in self._task_locks.values(): + try: + lock.refresh() + except Exception as e: + logger.warning( + f"[cleanup {pid}] Failed to refresh lock: {e}" + ) + last_refresh = current_time + + logger.info( + f"[cleanup {pid}] {len(self.active_tasks)} tasks still active, waiting..." + ) + time.sleep(10.0) + + # Stop message consumers + if self._run_thread: + self._stop_message_consumers( + self._run_thread, self.run_client, "[cleanup][run]" + ) + if self._cancel_thread: + self._stop_message_consumers( + self._cancel_thread, self.cancel_client, "[cleanup][cancel]" + ) + + # Shutdown executor + if self._executor: + logger.info(f"[cleanup {pid}] Shutting down executor...") + self._executor.shutdown(wait=False) + + # Release any remaining locks + for task_id, lock in list(self._task_locks.items()): + try: + lock.release() + logger.info(f"[cleanup {pid}] Released lock for {task_id}") + except Exception as e: + logger.error( + f"[cleanup {pid}] Failed to release lock for {task_id}: {e}" + ) + + logger.info(f"[cleanup {pid}] Graceful shutdown completed") + + # ============ RabbitMQ Consumer Methods ============ # + + @continuous_retry() + def _consume_cancel(self): + """Consume cancellation messages from FANOUT exchange.""" + if self.stop_consuming.is_set() and not self.active_tasks: + logger.info("Stop reconnecting cancel consumer - service cleaned up") + return + + if not self.cancel_client.is_ready: + self.cancel_client.disconnect() + self.cancel_client.connect() + cancel_channel = self.cancel_client.get_channel() + cancel_channel.basic_consume( + queue=COPILOT_CANCEL_QUEUE_NAME, + on_message_callback=self._handle_cancel_message, + auto_ack=True, + ) + logger.info("Starting cancel message consumer...") + cancel_channel.start_consuming() + if not self.stop_consuming.is_set() or self.active_tasks: + raise RuntimeError("Cancel message consumer stopped unexpectedly") + logger.info("Cancel message consumer stopped gracefully") + + @continuous_retry() + def _consume_run(self): + """Consume run messages from DIRECT exchange.""" + if self.stop_consuming.is_set(): + logger.info("Stop reconnecting run consumer - service cleaned up") + return + + if not self.run_client.is_ready: + self.run_client.disconnect() + self.run_client.connect() + run_channel = self.run_client.get_channel() + run_channel.basic_qos(prefetch_count=self.pool_size) + + run_channel.basic_consume( + queue=COPILOT_EXECUTION_QUEUE_NAME, + on_message_callback=self._handle_run_message, + auto_ack=False, + consumer_tag="copilot_execution_consumer", + ) + run_channel.confirm_delivery() + logger.info("Starting to consume run messages...") + run_channel.start_consuming() + if not self.stop_consuming.is_set(): + raise RuntimeError("Run message consumer stopped unexpectedly") + logger.info("Run message consumer stopped gracefully") + + # ============ Message Handlers ============ # + + @error_logged(swallow=True) + def _handle_cancel_message( + self, + _channel: BlockingChannel, + _method: Basic.Deliver, + _properties: BasicProperties, + body: bytes, + ): + """Handle cancel message from FANOUT exchange.""" + request = CancelCoPilotEvent.model_validate_json(body) + task_id = request.task_id + if not task_id: + logger.warning("Cancel message missing 'task_id'") + return + if task_id not in self.active_tasks: + logger.debug(f"Cancel received for {task_id} but not active") + return + + _, cancel_event = self.active_tasks[task_id] + logger.info(f"Received cancel for {task_id}") + if not cancel_event.is_set(): + cancel_event.set() + else: + logger.debug(f"Cancel already set for {task_id}") + + def _handle_run_message( + self, + _channel: BlockingChannel, + method: Basic.Deliver, + _properties: BasicProperties, + body: bytes, + ): + """Handle run message from DIRECT exchange.""" + delivery_tag = method.delivery_tag + + @func_retry + def ack_message(reject: bool, requeue: bool): + """Acknowledge or reject the message.""" + channel = self.run_client.get_channel() + if reject: + channel.connection.add_callback_threadsafe( + lambda: channel.basic_nack(delivery_tag, requeue=requeue) + ) + else: + channel.connection.add_callback_threadsafe( + lambda: channel.basic_ack(delivery_tag) + ) + + # Check if we're shutting down + if self.stop_consuming.is_set(): + logger.info("Rejecting new task during shutdown") + ack_message(reject=True, requeue=True) + return + + # Check if we can accept more tasks + self._cleanup_completed_tasks() + if len(self.active_tasks) >= self.pool_size: + ack_message(reject=True, requeue=True) + return + + try: + entry = CoPilotExecutionEntry.model_validate_json(body) + except Exception as e: + logger.error(f"Could not parse run message: {e}, body={body}") + ack_message(reject=True, requeue=False) + return + + task_id = entry.task_id + + # Check for local duplicate + if task_id in self.active_tasks: + logger.warning(f"Task {task_id} already running locally") + ack_message(reject=True, requeue=True) + return + + # Try to acquire cluster-wide lock + cluster_lock = ClusterLock( + redis=redis.get_redis(), + key=f"copilot_lock:{task_id}", + owner_id=self.executor_id, + timeout=settings.config.cluster_lock_timeout, + ) + current_owner = cluster_lock.try_acquire() + if current_owner != self.executor_id: + if current_owner is not None: + logger.warning(f"Task {task_id} already running on pod {current_owner}") + ack_message(reject=True, requeue=False) + else: + logger.warning( + f"Could not acquire lock for {task_id} - Redis unavailable" + ) + ack_message(reject=True, requeue=True) + return + + # Execute the task + try: + self._task_locks[task_id] = cluster_lock + + logger.info( + f"Acquired cluster lock for {task_id}, executor_id={self.executor_id}" + ) + + cancel_event = threading.Event() + future = self.executor.submit( + execute_copilot_task, entry, cancel_event, cluster_lock + ) + self.active_tasks[task_id] = (future, cancel_event) + except Exception as e: + logger.warning(f"Failed to setup execution for {task_id}: {e}") + cluster_lock.release() + if task_id in self._task_locks: + del self._task_locks[task_id] + ack_message(reject=True, requeue=True) + return + + self._update_metrics() + + def on_run_done(f: Future): + logger.info(f"Run completed for {task_id}") + try: + if exec_error := f.exception(): + logger.error(f"Execution for {task_id} failed: {exec_error}") + ack_message(reject=True, requeue=True) + else: + ack_message(reject=False, requeue=False) + except BaseException as e: + logger.exception(f"Error in run completion callback: {e}") + finally: + # Release the cluster lock + if task_id in self._task_locks: + logger.info(f"Releasing cluster lock for {task_id}") + self._task_locks[task_id].release() + del self._task_locks[task_id] + self._cleanup_completed_tasks() + + future.add_done_callback(on_run_done) + + # ============ Helper Methods ============ # + + def _cleanup_completed_tasks(self) -> list[str]: + """Remove completed futures from active_tasks and update metrics.""" + completed_tasks = [] + for task_id, (future, _) in self.active_tasks.items(): + if future.done(): + completed_tasks.append(task_id) + + for task_id in completed_tasks: + logger.info(f"Cleaned up completed task {task_id}") + self.active_tasks.pop(task_id, None) + + self._update_metrics() + return completed_tasks + + def _update_metrics(self): + """Update Prometheus metrics.""" + active_count = len(self.active_tasks) + active_tasks_gauge.set(active_count) + if self.stop_consuming.is_set(): + utilization_gauge.set(1.0) + else: + utilization_gauge.set( + active_count / self.pool_size if self.pool_size > 0 else 0 + ) + + def _stop_message_consumers( + self, thread: threading.Thread, client: SyncRabbitMQ, prefix: str + ): + """Stop a message consumer thread.""" + try: + channel = client.get_channel() + channel.connection.add_callback_threadsafe(lambda: channel.stop_consuming()) + + try: + thread.join(timeout=300) + except TimeoutError: + logger.error( + f"{prefix} Thread did not finish in time, forcing disconnect" + ) + + client.disconnect() + logger.info(f"{prefix} Client disconnected") + except Exception as e: + logger.error(f"{prefix} Error disconnecting client: {e}") + + # ============ Lazy-initialized Properties ============ # + + @property + def cancel_thread(self) -> threading.Thread: + if self._cancel_thread is None: + self._cancel_thread = threading.Thread( + target=lambda: self._consume_cancel(), + daemon=True, + ) + return self._cancel_thread + + @property + def run_thread(self) -> threading.Thread: + if self._run_thread is None: + self._run_thread = threading.Thread( + target=lambda: self._consume_run(), + daemon=True, + ) + return self._run_thread + + @property + def stop_consuming(self) -> threading.Event: + if self._stop_consuming is None: + self._stop_consuming = threading.Event() + return self._stop_consuming + + @property + def executor(self) -> ThreadPoolExecutor: + if self._executor is None: + self._executor = ThreadPoolExecutor( + max_workers=self.pool_size, + initializer=init_worker, + ) + return self._executor + + @property + def cancel_client(self) -> SyncRabbitMQ: + if self._cancel_client is None: + self._cancel_client = SyncRabbitMQ(create_copilot_queue_config()) + return self._cancel_client + + @property + def run_client(self) -> SyncRabbitMQ: + if self._run_client is None: + self._run_client = SyncRabbitMQ(create_copilot_queue_config()) + return self._run_client diff --git a/autogpt_platform/backend/backend/copilot/executor/processor.py b/autogpt_platform/backend/backend/copilot/executor/processor.py new file mode 100644 index 0000000000..85f555672a --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/executor/processor.py @@ -0,0 +1,252 @@ +"""CoPilot execution processor - per-worker execution logic. + +This module contains the processor class that handles CoPilot task execution +in a thread-local context, following the graph executor pattern. +""" + +import asyncio +import logging +import threading +import time + +from backend.copilot import service as copilot_service +from backend.copilot import stream_registry +from backend.copilot.response_model import StreamError, StreamFinish, StreamFinishStep +from backend.executor.cluster_lock import ClusterLock +from backend.util.decorator import error_logged +from backend.util.logging import TruncatedLogger, configure_logging +from backend.util.process import set_service_name +from backend.util.retry import func_retry + +from .utils import CoPilotExecutionEntry, CoPilotLogMetadata + +logger = TruncatedLogger(logging.getLogger(__name__), prefix="[CoPilotExecutor]") + + +# ============ Module Entry Points ============ # + +# Thread-local storage for processor instances +_tls = threading.local() + + +def execute_copilot_task( + entry: CoPilotExecutionEntry, + cancel: threading.Event, + cluster_lock: ClusterLock, +): + """Execute a CoPilot task using the thread-local processor. + + This function is the entry point called by the thread pool executor. + + Args: + entry: The task payload + cancel: Threading event to signal cancellation + cluster_lock: Distributed lock for this execution + """ + processor: CoPilotProcessor = _tls.processor + return processor.execute(entry, cancel, cluster_lock) + + +def init_worker(): + """Initialize the processor for the current worker thread. + + This function is called by the thread pool executor when a new worker + thread is created. It ensures each worker has its own processor instance. + """ + _tls.processor = CoPilotProcessor() + _tls.processor.on_executor_start() + + +# ============ Processor Class ============ # + + +class CoPilotProcessor: + """Per-worker execution logic for CoPilot tasks. + + This class is instantiated once per worker thread and handles the execution + of CoPilot chat generation tasks. It maintains an async event loop for + running the async service code. + + The execution flow: + 1. CoPilot task is picked from RabbitMQ queue + 2. Manager submits task to thread pool + 3. Processor executes the task in its event loop + 4. Results are published to Redis Streams + """ + + @func_retry + def on_executor_start(self): + """Initialize the processor when the worker thread starts. + + This method is called once per worker thread to set up the async event + loop, connect to Prisma, and initialize any required resources. + """ + configure_logging() + set_service_name("CoPilotExecutor") + self.tid = threading.get_ident() + self.execution_loop = asyncio.new_event_loop() + self.execution_thread = threading.Thread( + target=self.execution_loop.run_forever, daemon=True + ) + self.execution_thread.start() + + # Connect to Prisma in the worker's event loop + # This is required because the CoPilot service uses Prisma directly + # TODO: Use DatabaseManager, avoid direct Prisma connection(?) + asyncio.run_coroutine_threadsafe( + self._connect_prisma(), self.execution_loop + ).result(timeout=30.0) + + logger.info(f"[CoPilotExecutor] Worker {self.tid} started") + + async def _connect_prisma(self): + """Connect to Prisma database in the worker's event loop.""" + from backend.data import db + + if not db.is_connected(): + await db.connect() + logger.info(f"[CoPilotExecutor] Worker {self.tid} connected to Prisma") + + @error_logged(swallow=False) + def execute( + self, + entry: CoPilotExecutionEntry, + cancel: threading.Event, + cluster_lock: ClusterLock, + ): + """Execute a CoPilot task. + + This is the main entry point for task execution. It runs the async + execution logic in the worker's event loop and handles errors. + + Args: + entry: The task payload containing session and message info + cancel: Threading event to signal cancellation + cluster_lock: Distributed lock to prevent duplicate execution + """ + log = CoPilotLogMetadata( + logging.getLogger(__name__), + task_id=entry.task_id, + session_id=entry.session_id, + user_id=entry.user_id, + ) + log.info("Starting execution") + + start_time = time.monotonic() + + try: + # Run the async execution in our event loop + future = asyncio.run_coroutine_threadsafe( + self._execute_async(entry, cancel, cluster_lock, log), + self.execution_loop, + ) + + # Wait for completion, checking cancel periodically + while not future.done(): + try: + future.result(timeout=1.0) + except asyncio.TimeoutError: + if cancel.is_set(): + log.info("Cancellation requested") + future.cancel() + break + # Refresh cluster lock to maintain ownership + cluster_lock.refresh() + + if not future.cancelled(): + # Get result to propagate any exceptions + future.result() + + elapsed = time.monotonic() - start_time + log.info(f"Execution completed in {elapsed:.2f}s") + + except Exception as e: + elapsed = time.monotonic() - start_time + log.error(f"Execution failed after {elapsed:.2f}s: {e}") + # Ensure task is marked as failed in stream registry + asyncio.run_coroutine_threadsafe( + self._mark_task_failed(entry.task_id, str(e)), + self.execution_loop, + ).result(timeout=10.0) + raise + + async def _execute_async( + self, + entry: CoPilotExecutionEntry, + cancel: threading.Event, + cluster_lock: ClusterLock, + log: CoPilotLogMetadata, + ): + """Async execution logic for CoPilot task. + + This method calls the existing stream_chat_completion service function + and publishes results to the stream registry. + + Args: + entry: The task payload + cancel: Threading event to signal cancellation + cluster_lock: Distributed lock for refresh + log: Structured logger for this task + """ + last_refresh = time.monotonic() + refresh_interval = 30.0 # Refresh lock every 30 seconds + + try: + # Stream chat completion and publish chunks to Redis + async for chunk in copilot_service.stream_chat_completion( + session_id=entry.session_id, + message=entry.message if entry.message else None, + is_user_message=entry.is_user_message, + user_id=entry.user_id, + context=entry.context, + _task_id=entry.task_id, + ): + # Check for cancellation + if cancel.is_set(): + log.info("Cancelled during streaming") + await stream_registry.publish_chunk( + entry.task_id, StreamError(errorText="Operation cancelled") + ) + await stream_registry.publish_chunk( + entry.task_id, StreamFinishStep() + ) + await stream_registry.publish_chunk(entry.task_id, StreamFinish()) + await stream_registry.mark_task_completed( + entry.task_id, status="failed" + ) + return + + # Refresh cluster lock periodically + current_time = time.monotonic() + if current_time - last_refresh >= refresh_interval: + cluster_lock.refresh() + last_refresh = current_time + + # Publish chunk to stream registry + await stream_registry.publish_chunk(entry.task_id, chunk) + + # Mark task as completed + await stream_registry.mark_task_completed(entry.task_id, status="completed") + log.info("Task completed successfully") + + except asyncio.CancelledError: + log.info("Task cancelled") + await stream_registry.mark_task_completed(entry.task_id, status="failed") + raise + + except Exception as e: + log.error(f"Task failed: {e}") + await self._mark_task_failed(entry.task_id, str(e)) + raise + + async def _mark_task_failed(self, task_id: str, error_message: str): + """Mark a task as failed and publish error to stream registry.""" + try: + await stream_registry.publish_chunk( + task_id, StreamError(errorText=error_message) + ) + await stream_registry.publish_chunk(task_id, StreamFinishStep()) + await stream_registry.publish_chunk(task_id, StreamFinish()) + await stream_registry.mark_task_completed(task_id, status="failed") + except Exception as e: + logger.error(f"Failed to mark task {task_id} as failed: {e}") diff --git a/autogpt_platform/backend/backend/copilot/executor/utils.py b/autogpt_platform/backend/backend/copilot/executor/utils.py new file mode 100644 index 0000000000..473a1ff80a --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/executor/utils.py @@ -0,0 +1,207 @@ +"""RabbitMQ queue configuration for CoPilot executor. + +Defines two exchanges and queues following the graph executor pattern: +- 'copilot_execution' (DIRECT) for chat generation tasks +- 'copilot_cancel' (FANOUT) for cancellation requests +""" + +import logging + +from pydantic import BaseModel + +from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig +from backend.util.logging import TruncatedLogger, is_structured_logging_enabled + +logger = logging.getLogger(__name__) + + +# ============ Logging Helper ============ # + + +class CoPilotLogMetadata(TruncatedLogger): + """Structured logging helper for CoPilot executor. + + In cloud environments (structured logging enabled), uses a simple prefix + and passes metadata via json_fields. In local environments, uses a detailed + prefix with all metadata key-value pairs for easier debugging. + + Args: + logger: The underlying logger instance + max_length: Maximum log message length before truncation + **kwargs: Metadata key-value pairs (e.g., task_id="abc", session_id="xyz") + These are added to json_fields in cloud mode, or to the prefix in local mode. + """ + + def __init__( + self, + logger: logging.Logger, + max_length: int = 1000, + **kwargs: str | None, + ): + # Filter out None values + metadata = {k: v for k, v in kwargs.items() if v is not None} + metadata["component"] = "CoPilotExecutor" + + if is_structured_logging_enabled(): + prefix = "[CoPilotExecutor]" + else: + # Build prefix from metadata key-value pairs + meta_parts = "|".join( + f"{k}:{v}" for k, v in metadata.items() if k != "component" + ) + prefix = ( + f"[CoPilotExecutor|{meta_parts}]" if meta_parts else "[CoPilotExecutor]" + ) + + super().__init__( + logger, + max_length=max_length, + prefix=prefix, + metadata=metadata, + ) + + +# ============ Exchange and Queue Configuration ============ # + +COPILOT_EXECUTION_EXCHANGE = Exchange( + name="copilot_execution", + type=ExchangeType.DIRECT, + durable=True, + auto_delete=False, +) +COPILOT_EXECUTION_QUEUE_NAME = "copilot_execution_queue" +COPILOT_EXECUTION_ROUTING_KEY = "copilot.run" + +COPILOT_CANCEL_EXCHANGE = Exchange( + name="copilot_cancel", + type=ExchangeType.FANOUT, + durable=True, + auto_delete=True, +) +COPILOT_CANCEL_QUEUE_NAME = "copilot_cancel_queue" + +# CoPilot operations can include extended thinking and agent generation +# which may take 30+ minutes to complete +COPILOT_CONSUMER_TIMEOUT_SECONDS = 60 * 60 # 1 hour + +# Graceful shutdown timeout - allow in-flight operations to complete +GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS = 30 * 60 # 30 minutes + + +def create_copilot_queue_config() -> RabbitMQConfig: + """Create RabbitMQ configuration for CoPilot executor. + + Defines two exchanges and queues: + - 'copilot_execution' (DIRECT) for chat generation tasks + - 'copilot_cancel' (FANOUT) for cancellation requests + + Returns: + RabbitMQConfig with exchanges and queues defined + """ + run_queue = Queue( + name=COPILOT_EXECUTION_QUEUE_NAME, + exchange=COPILOT_EXECUTION_EXCHANGE, + routing_key=COPILOT_EXECUTION_ROUTING_KEY, + durable=True, + auto_delete=False, + arguments={ + # Extended consumer timeout for long-running LLM operations + # Default 30-minute timeout is insufficient for extended thinking + # and agent generation which can take 30+ minutes + "x-consumer-timeout": COPILOT_CONSUMER_TIMEOUT_SECONDS + * 1000, + }, + ) + cancel_queue = Queue( + name=COPILOT_CANCEL_QUEUE_NAME, + exchange=COPILOT_CANCEL_EXCHANGE, + routing_key="", # not used for FANOUT + durable=True, + auto_delete=False, + ) + return RabbitMQConfig( + vhost="/", + exchanges=[COPILOT_EXECUTION_EXCHANGE, COPILOT_CANCEL_EXCHANGE], + queues=[run_queue, cancel_queue], + ) + + +# ============ Message Models ============ # + + +class CoPilotExecutionEntry(BaseModel): + """Task payload for CoPilot AI generation. + + This model represents a chat generation task to be processed by the executor. + """ + + task_id: str + """Unique identifier for this task (used for stream registry)""" + + session_id: str + """Chat session ID""" + + user_id: str | None + """User ID (may be None for anonymous users)""" + + operation_id: str + """Operation ID for webhook callbacks and completion tracking""" + + message: str + """User's message to process""" + + is_user_message: bool = True + """Whether the message is from the user (vs system/assistant)""" + + context: dict[str, str] | None = None + """Optional context for the message (e.g., {url: str, content: str})""" + + +class CancelCoPilotEvent(BaseModel): + """Event to cancel a CoPilot operation.""" + + task_id: str + """Task ID to cancel""" + + +# ============ Queue Publishing Helpers ============ # + + +async def enqueue_copilot_task( + task_id: str, + session_id: str, + user_id: str | None, + operation_id: str, + message: str, + is_user_message: bool = True, + context: dict[str, str] | None = None, +) -> None: + """Enqueue a CoPilot task for processing by the executor service. + + Args: + task_id: Unique identifier for this task (used for stream registry) + session_id: Chat session ID + user_id: User ID (may be None for anonymous users) + operation_id: Operation ID for webhook callbacks and completion tracking + message: User's message to process + is_user_message: Whether the message is from the user (vs system/assistant) + context: Optional context for the message (e.g., {url: str, content: str}) + """ + from backend.util.clients import get_async_copilot_queue + + entry = CoPilotExecutionEntry( + task_id=task_id, + session_id=session_id, + user_id=user_id, + operation_id=operation_id, + message=message, + is_user_message=is_user_message, + context=context, + ) + + queue_client = await get_async_copilot_queue() + await queue_client.publish_message( + routing_key=COPILOT_EXECUTION_ROUTING_KEY, + message=entry.model_dump_json(), + exchange=COPILOT_EXECUTION_EXCHANGE, + ) diff --git a/autogpt_platform/backend/backend/api/features/chat/model.py b/autogpt_platform/backend/backend/copilot/model.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/model.py rename to autogpt_platform/backend/backend/copilot/model.py diff --git a/autogpt_platform/backend/backend/api/features/chat/model_test.py b/autogpt_platform/backend/backend/copilot/model_test.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/model_test.py rename to autogpt_platform/backend/backend/copilot/model_test.py diff --git a/autogpt_platform/backend/backend/api/features/chat/response_model.py b/autogpt_platform/backend/backend/copilot/response_model.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/response_model.py rename to autogpt_platform/backend/backend/copilot/response_model.py diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/copilot/service.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/service.py rename to autogpt_platform/backend/backend/copilot/service.py diff --git a/autogpt_platform/backend/backend/api/features/chat/service_test.py b/autogpt_platform/backend/backend/copilot/service_test.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/service_test.py rename to autogpt_platform/backend/backend/copilot/service_test.py diff --git a/autogpt_platform/backend/backend/api/features/chat/stream_registry.py b/autogpt_platform/backend/backend/copilot/stream_registry.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/stream_registry.py rename to autogpt_platform/backend/backend/copilot/stream_registry.py diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/IDEAS.md b/autogpt_platform/backend/backend/copilot/tools/IDEAS.md similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tools/IDEAS.md rename to autogpt_platform/backend/backend/copilot/tools/IDEAS.md diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py b/autogpt_platform/backend/backend/copilot/tools/__init__.py similarity index 93% rename from autogpt_platform/backend/backend/api/features/chat/tools/__init__.py rename to autogpt_platform/backend/backend/copilot/tools/__init__.py index dcbc35ef37..e24c927112 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py +++ b/autogpt_platform/backend/backend/copilot/tools/__init__.py @@ -3,8 +3,8 @@ from typing import TYPE_CHECKING, Any from openai.types.chat import ChatCompletionToolParam -from backend.api.features.chat.model import ChatSession -from backend.api.features.chat.tracking import track_tool_called +from backend.copilot.model import ChatSession +from backend.copilot.tracking import track_tool_called from .add_understanding import AddUnderstandingTool from .agent_output import AgentOutputTool @@ -27,7 +27,7 @@ from .workspace_files import ( ) if TYPE_CHECKING: - from backend.api.features.chat.response_model import StreamToolOutputAvailable + from backend.copilot.response_model import StreamToolOutputAvailable logger = logging.getLogger(__name__) diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/_test_data.py b/autogpt_platform/backend/backend/copilot/tools/_test_data.py similarity index 99% rename from autogpt_platform/backend/backend/api/features/chat/tools/_test_data.py rename to autogpt_platform/backend/backend/copilot/tools/_test_data.py index a8f208ebb0..c1d45d00df 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/_test_data.py +++ b/autogpt_platform/backend/backend/copilot/tools/_test_data.py @@ -6,11 +6,11 @@ import pytest from prisma.types import ProfileCreateInput from pydantic import SecretStr -from backend.api.features.chat.model import ChatSession from backend.api.features.store import db as store_db from backend.blocks.firecrawl.scrape import FirecrawlScrapeBlock from backend.blocks.io import AgentInputBlock, AgentOutputBlock from backend.blocks.llm import AITextGeneratorBlock +from backend.copilot.model import ChatSession from backend.data.db import prisma from backend.data.graph import Graph, Link, Node, create_graph from backend.data.model import APIKeyCredentials diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py b/autogpt_platform/backend/backend/copilot/tools/add_understanding.py similarity index 98% rename from autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py rename to autogpt_platform/backend/backend/copilot/tools/add_understanding.py index fe3d5e8984..6da759d3cf 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py +++ b/autogpt_platform/backend/backend/copilot/tools/add_understanding.py @@ -3,7 +3,7 @@ import logging from typing import Any -from backend.api.features.chat.model import ChatSession +from backend.copilot.model import ChatSession from backend.data.understanding import ( BusinessUnderstandingInput, upsert_business_understanding, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/agent_generator/__init__.py b/autogpt_platform/backend/backend/copilot/tools/agent_generator/__init__.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tools/agent_generator/__init__.py rename to autogpt_platform/backend/backend/copilot/tools/agent_generator/__init__.py diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/agent_generator/core.py b/autogpt_platform/backend/backend/copilot/tools/agent_generator/core.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tools/agent_generator/core.py rename to autogpt_platform/backend/backend/copilot/tools/agent_generator/core.py diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/agent_generator/errors.py b/autogpt_platform/backend/backend/copilot/tools/agent_generator/errors.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tools/agent_generator/errors.py rename to autogpt_platform/backend/backend/copilot/tools/agent_generator/errors.py diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/agent_generator/service.py b/autogpt_platform/backend/backend/copilot/tools/agent_generator/service.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tools/agent_generator/service.py rename to autogpt_platform/backend/backend/copilot/tools/agent_generator/service.py diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py b/autogpt_platform/backend/backend/copilot/tools/agent_output.py similarity index 99% rename from autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py rename to autogpt_platform/backend/backend/copilot/tools/agent_output.py index 457e4a4f9b..96491b749a 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py +++ b/autogpt_platform/backend/backend/copilot/tools/agent_output.py @@ -7,9 +7,9 @@ from typing import Any from pydantic import BaseModel, field_validator -from backend.api.features.chat.model import ChatSession from backend.api.features.library import db as library_db from backend.api.features.library.model import LibraryAgent +from backend.copilot.model import ChatSession from backend.data import execution as execution_db from backend.data.execution import ExecutionStatus, GraphExecution, GraphExecutionMeta diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/agent_search.py b/autogpt_platform/backend/backend/copilot/tools/agent_search.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tools/agent_search.py rename to autogpt_platform/backend/backend/copilot/tools/agent_search.py diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/base.py b/autogpt_platform/backend/backend/copilot/tools/base.py similarity index 96% rename from autogpt_platform/backend/backend/api/features/chat/tools/base.py rename to autogpt_platform/backend/backend/copilot/tools/base.py index 809e06632b..e821b1844f 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/base.py +++ b/autogpt_platform/backend/backend/copilot/tools/base.py @@ -5,8 +5,8 @@ from typing import Any from openai.types.chat import ChatCompletionToolParam -from backend.api.features.chat.model import ChatSession -from backend.api.features.chat.response_model import StreamToolOutputAvailable +from backend.copilot.model import ChatSession +from backend.copilot.response_model import StreamToolOutputAvailable from .models import ErrorResponse, NeedLoginResponse, ToolResponseBase diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py b/autogpt_platform/backend/backend/copilot/tools/create_agent.py similarity index 99% rename from autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py rename to autogpt_platform/backend/backend/copilot/tools/create_agent.py index 7333851a5b..74d6227b09 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py +++ b/autogpt_platform/backend/backend/copilot/tools/create_agent.py @@ -3,7 +3,7 @@ import logging from typing import Any -from backend.api.features.chat.model import ChatSession +from backend.copilot.model import ChatSession from .agent_generator import ( AgentGeneratorNotConfiguredError, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/customize_agent.py b/autogpt_platform/backend/backend/copilot/tools/customize_agent.py similarity index 99% rename from autogpt_platform/backend/backend/api/features/chat/tools/customize_agent.py rename to autogpt_platform/backend/backend/copilot/tools/customize_agent.py index c0568bd936..a85a69196d 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/customize_agent.py +++ b/autogpt_platform/backend/backend/copilot/tools/customize_agent.py @@ -3,9 +3,9 @@ import logging from typing import Any -from backend.api.features.chat.model import ChatSession from backend.api.features.store import db as store_db from backend.api.features.store.exceptions import AgentNotFoundError +from backend.copilot.model import ChatSession from .agent_generator import ( AgentGeneratorNotConfiguredError, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py b/autogpt_platform/backend/backend/copilot/tools/edit_agent.py similarity index 99% rename from autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py rename to autogpt_platform/backend/backend/copilot/tools/edit_agent.py index 3ae56407a7..14d3a8d8f9 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py +++ b/autogpt_platform/backend/backend/copilot/tools/edit_agent.py @@ -3,7 +3,7 @@ import logging from typing import Any -from backend.api.features.chat.model import ChatSession +from backend.copilot.model import ChatSession from .agent_generator import ( AgentGeneratorNotConfiguredError, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py b/autogpt_platform/backend/backend/copilot/tools/find_agent.py similarity index 95% rename from autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py rename to autogpt_platform/backend/backend/copilot/tools/find_agent.py index 477522757d..32e5bce454 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py +++ b/autogpt_platform/backend/backend/copilot/tools/find_agent.py @@ -2,7 +2,7 @@ from typing import Any -from backend.api.features.chat.model import ChatSession +from backend.copilot.model import ChatSession from .agent_search import search_agents from .base import BaseTool diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/find_block.py b/autogpt_platform/backend/backend/copilot/tools/find_block.py similarity index 97% rename from autogpt_platform/backend/backend/api/features/chat/tools/find_block.py rename to autogpt_platform/backend/backend/copilot/tools/find_block.py index f55cd567e8..c26f76fc52 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/find_block.py +++ b/autogpt_platform/backend/backend/copilot/tools/find_block.py @@ -3,16 +3,16 @@ from typing import Any from prisma.enums import ContentType -from backend.api.features.chat.model import ChatSession -from backend.api.features.chat.tools.base import BaseTool, ToolResponseBase -from backend.api.features.chat.tools.models import ( +from backend.api.features.store.hybrid_search import unified_hybrid_search +from backend.copilot.model import ChatSession +from backend.copilot.tools.base import BaseTool, ToolResponseBase +from backend.copilot.tools.models import ( BlockInfoSummary, BlockInputFieldInfo, BlockListResponse, ErrorResponse, NoResultsResponse, ) -from backend.api.features.store.hybrid_search import unified_hybrid_search from backend.data.block import BlockType, get_block logger = logging.getLogger(__name__) diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/find_block_test.py b/autogpt_platform/backend/backend/copilot/tools/find_block_test.py similarity index 97% rename from autogpt_platform/backend/backend/api/features/chat/tools/find_block_test.py rename to autogpt_platform/backend/backend/copilot/tools/find_block_test.py index 0f3d4cbfa5..f172605374 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/find_block_test.py +++ b/autogpt_platform/backend/backend/copilot/tools/find_block_test.py @@ -4,12 +4,12 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from backend.api.features.chat.tools.find_block import ( +from backend.copilot.tools.find_block import ( COPILOT_EXCLUDED_BLOCK_IDS, COPILOT_EXCLUDED_BLOCK_TYPES, FindBlockTool, ) -from backend.api.features.chat.tools.models import BlockListResponse +from backend.copilot.tools.models import BlockListResponse from backend.data.block import BlockType from ._test_data import make_session diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py b/autogpt_platform/backend/backend/copilot/tools/find_library_agent.py similarity index 96% rename from autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py rename to autogpt_platform/backend/backend/copilot/tools/find_library_agent.py index 108fba75ae..16ae90e40b 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py +++ b/autogpt_platform/backend/backend/copilot/tools/find_library_agent.py @@ -2,7 +2,7 @@ from typing import Any -from backend.api.features.chat.model import ChatSession +from backend.copilot.model import ChatSession from .agent_search import search_agents from .base import BaseTool diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py b/autogpt_platform/backend/backend/copilot/tools/get_doc_page.py similarity index 96% rename from autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py rename to autogpt_platform/backend/backend/copilot/tools/get_doc_page.py index 7040cd7db5..c923a133c5 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py +++ b/autogpt_platform/backend/backend/copilot/tools/get_doc_page.py @@ -4,9 +4,9 @@ import logging from pathlib import Path from typing import Any -from backend.api.features.chat.model import ChatSession -from backend.api.features.chat.tools.base import BaseTool -from backend.api.features.chat.tools.models import ( +from backend.copilot.model import ChatSession +from backend.copilot.tools.base import BaseTool +from backend.copilot.tools.models import ( DocPageResponse, ErrorResponse, ToolResponseBase, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/helpers.py b/autogpt_platform/backend/backend/copilot/tools/helpers.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tools/helpers.py rename to autogpt_platform/backend/backend/copilot/tools/helpers.py diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/models.py b/autogpt_platform/backend/backend/copilot/tools/models.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tools/models.py rename to autogpt_platform/backend/backend/copilot/tools/models.py diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py b/autogpt_platform/backend/backend/copilot/tools/run_agent.py similarity index 98% rename from autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py rename to autogpt_platform/backend/backend/copilot/tools/run_agent.py index a9f19bcf62..8f19ab86cc 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py +++ b/autogpt_platform/backend/backend/copilot/tools/run_agent.py @@ -5,13 +5,10 @@ from typing import Any from pydantic import BaseModel, Field, field_validator -from backend.api.features.chat.config import ChatConfig -from backend.api.features.chat.model import ChatSession -from backend.api.features.chat.tracking import ( - track_agent_run_success, - track_agent_scheduled, -) from backend.api.features.library import db as library_db +from backend.copilot.config import ChatConfig +from backend.copilot.model import ChatSession +from backend.copilot.tracking import track_agent_run_success, track_agent_scheduled from backend.data.graph import GraphModel from backend.data.model import CredentialsMetaInput from backend.data.user import get_user_by_id diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/run_agent_test.py b/autogpt_platform/backend/backend/copilot/tools/run_agent_test.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tools/run_agent_test.py rename to autogpt_platform/backend/backend/copilot/tools/run_agent_test.py diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/run_block.py b/autogpt_platform/backend/backend/copilot/tools/run_block.py similarity index 99% rename from autogpt_platform/backend/backend/api/features/chat/tools/run_block.py rename to autogpt_platform/backend/backend/copilot/tools/run_block.py index fc4a470fdd..dd813398de 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/run_block.py +++ b/autogpt_platform/backend/backend/copilot/tools/run_block.py @@ -7,8 +7,8 @@ from typing import Any from pydantic_core import PydanticUndefined -from backend.api.features.chat.model import ChatSession -from backend.api.features.chat.tools.find_block import ( +from backend.copilot.model import ChatSession +from backend.copilot.tools.find_block import ( COPILOT_EXCLUDED_BLOCK_IDS, COPILOT_EXCLUDED_BLOCK_TYPES, ) diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/run_block_test.py b/autogpt_platform/backend/backend/copilot/tools/run_block_test.py similarity index 96% rename from autogpt_platform/backend/backend/api/features/chat/tools/run_block_test.py rename to autogpt_platform/backend/backend/copilot/tools/run_block_test.py index 2aae45e875..1a860e24d6 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/run_block_test.py +++ b/autogpt_platform/backend/backend/copilot/tools/run_block_test.py @@ -4,8 +4,8 @@ from unittest.mock import MagicMock, patch import pytest -from backend.api.features.chat.tools.models import ErrorResponse -from backend.api.features.chat.tools.run_block import RunBlockTool +from backend.copilot.tools.models import ErrorResponse +from backend.copilot.tools.run_block import RunBlockTool from backend.data.block import BlockType from ._test_data import make_session diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py b/autogpt_platform/backend/backend/copilot/tools/search_docs.py similarity index 97% rename from autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py rename to autogpt_platform/backend/backend/copilot/tools/search_docs.py index edb0c0de1e..056b86d4af 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py +++ b/autogpt_platform/backend/backend/copilot/tools/search_docs.py @@ -5,16 +5,16 @@ from typing import Any from prisma.enums import ContentType -from backend.api.features.chat.model import ChatSession -from backend.api.features.chat.tools.base import BaseTool -from backend.api.features.chat.tools.models import ( +from backend.api.features.store.hybrid_search import unified_hybrid_search +from backend.copilot.model import ChatSession +from backend.copilot.tools.base import BaseTool +from backend.copilot.tools.models import ( DocSearchResult, DocSearchResultsResponse, ErrorResponse, NoResultsResponse, ToolResponseBase, ) -from backend.api.features.store.hybrid_search import unified_hybrid_search logger = logging.getLogger(__name__) diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/utils.py b/autogpt_platform/backend/backend/copilot/tools/utils.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tools/utils.py rename to autogpt_platform/backend/backend/copilot/tools/utils.py diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/workspace_files.py b/autogpt_platform/backend/backend/copilot/tools/workspace_files.py similarity index 99% rename from autogpt_platform/backend/backend/api/features/chat/tools/workspace_files.py rename to autogpt_platform/backend/backend/copilot/tools/workspace_files.py index 03532c8fee..50df556f03 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/workspace_files.py +++ b/autogpt_platform/backend/backend/copilot/tools/workspace_files.py @@ -6,7 +6,7 @@ from typing import Any, Optional from pydantic import BaseModel -from backend.api.features.chat.model import ChatSession +from backend.copilot.model import ChatSession from backend.data.workspace import get_or_create_workspace from backend.util.settings import Config from backend.util.virus_scanner import scan_content_safe diff --git a/autogpt_platform/backend/backend/api/features/chat/tracking.py b/autogpt_platform/backend/backend/copilot/tracking.py similarity index 100% rename from autogpt_platform/backend/backend/api/features/chat/tracking.py rename to autogpt_platform/backend/backend/copilot/tracking.py diff --git a/autogpt_platform/backend/backend/util/clients.py b/autogpt_platform/backend/backend/util/clients.py index 570e9fa3de..1cf2c6e49d 100644 --- a/autogpt_platform/backend/backend/util/clients.py +++ b/autogpt_platform/backend/backend/util/clients.py @@ -11,7 +11,6 @@ settings = Settings() if TYPE_CHECKING: from openai import AsyncOpenAI - from supabase import AClient, Client from backend.data.execution import ( AsyncRedisExecutionEventBus, @@ -22,6 +21,7 @@ if TYPE_CHECKING: from backend.executor.scheduler import SchedulerClient from backend.integrations.credentials_store import IntegrationCredentialsStore from backend.notifications.notifications import NotificationManagerClient + from supabase import AClient, Client @thread_cached @@ -106,6 +106,20 @@ async def get_async_execution_queue() -> "AsyncRabbitMQ": return client +# ============ CoPilot Queue Helpers ============ # + + +@thread_cached +async def get_async_copilot_queue() -> "AsyncRabbitMQ": + """Get a thread-cached AsyncRabbitMQ CoPilot queue client.""" + from backend.copilot.executor.utils import create_copilot_queue_config + from backend.data.rabbitmq import AsyncRabbitMQ + + client = AsyncRabbitMQ(create_copilot_queue_config()) + await client.connect() + return client + + # ============ Integration Credentials Store ============ # diff --git a/autogpt_platform/backend/backend/util/settings.py b/autogpt_platform/backend/backend/util/settings.py index 50b7428160..f021b44d4e 100644 --- a/autogpt_platform/backend/backend/util/settings.py +++ b/autogpt_platform/backend/backend/util/settings.py @@ -211,16 +211,23 @@ class Config(UpdateTrackingModel["Config"], BaseSettings): description="The port for execution manager daemon to run on", ) + num_copilot_workers: int = Field( + default=10, + ge=1, + le=100, + description="Number of concurrent CoPilot executor workers", + ) + + copilot_executor_port: int = Field( + default=8008, + description="The port for CoPilot executor daemon to run on", + ) + execution_scheduler_port: int = Field( default=8003, description="The port for execution scheduler daemon to run on", ) - agent_server_port: int = Field( - default=8004, - description="The port for agent server daemon to run on", - ) - database_api_port: int = Field( default=8005, description="The port for database server API to run on", diff --git a/autogpt_platform/backend/test/agent_generator/test_core_integration.py b/autogpt_platform/backend/test/agent_generator/test_core_integration.py index 528763e751..74cb890b37 100644 --- a/autogpt_platform/backend/test/agent_generator/test_core_integration.py +++ b/autogpt_platform/backend/test/agent_generator/test_core_integration.py @@ -9,10 +9,8 @@ from unittest.mock import AsyncMock, patch import pytest -from backend.api.features.chat.tools.agent_generator import core -from backend.api.features.chat.tools.agent_generator.core import ( - AgentGeneratorNotConfiguredError, -) +from backend.copilot.tools.agent_generator import core +from backend.copilot.tools.agent_generator.core import AgentGeneratorNotConfiguredError class TestServiceNotConfigured: diff --git a/autogpt_platform/backend/test/agent_generator/test_library_agents.py b/autogpt_platform/backend/test/agent_generator/test_library_agents.py index 8387339582..2007708325 100644 --- a/autogpt_platform/backend/test/agent_generator/test_library_agents.py +++ b/autogpt_platform/backend/test/agent_generator/test_library_agents.py @@ -9,7 +9,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from backend.api.features.chat.tools.agent_generator import core +from backend.copilot.tools.agent_generator import core class TestGetLibraryAgentsForGeneration: diff --git a/autogpt_platform/backend/test/agent_generator/test_service.py b/autogpt_platform/backend/test/agent_generator/test_service.py index cc37c428c0..79ad9aba9c 100644 --- a/autogpt_platform/backend/test/agent_generator/test_service.py +++ b/autogpt_platform/backend/test/agent_generator/test_service.py @@ -10,7 +10,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest -from backend.api.features.chat.tools.agent_generator import service +from backend.copilot.tools.agent_generator import service class TestServiceConfiguration: