From 15bcdae4e859c138594127ffd05f5f956132ea6f Mon Sep 17 00:00:00 2001 From: Otto Date: Wed, 18 Feb 2026 11:17:39 +0000 Subject: [PATCH] fix(backend/copilot): Clean up GCSWorkspaceStorage per worker (#12153) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The copilot executor runs each worker in its own thread with a dedicated event loop (`asyncio.new_event_loop()`). `aiohttp.ClientSession` is bound to the event loop where it was created — using it from a different loop causes `asyncio.timeout()` to fail with: ``` RuntimeError: Timeout context manager should be used inside a task ``` This was the root cause of transcript upload failures tracked in SECRT-2009 and [Sentry #7272473694](https://significant-gravitas.sentry.io/issues/7272473694/). ### Fix **One `GCSWorkspaceStorage` instance per event loop** instead of a single shared global. - `get_workspace_storage()` now returns a per-loop GCS instance (keyed by `id(asyncio.get_running_loop())`). Local storage remains shared since it has no async I/O. - `shutdown_workspace_storage()` closes the instance for the **current** loop only, so `session.close()` always runs on the loop that created the session. - `CoPilotProcessor.cleanup()` shuts down workspace storage on the worker's own loop, then stops the loop. - Manager cleanup submits `cleanup_worker` to each thread pool worker before shutting down the executor — replacing the old approach of creating a temporary event loop that couldn't close cross-loop sessions. ### Changes | File | Change | |------|--------| | `util/workspace_storage.py` | `GCSWorkspaceStorage` back to simple single-session class; `get_workspace_storage()` returns per-loop GCS instance; `shutdown_workspace_storage()` scoped to current loop | | `copilot/executor/processor.py` | Added `CoPilotProcessor.cleanup()` and `cleanup_worker()` | | `copilot/executor/manager.py` | Calls `cleanup_worker` on each thread pool worker during shutdown | Fixes SECRT-2009 --------- Co-authored-by: Reinier van der Leer --- .../backend/copilot/executor/manager.py | 24 +++-- .../backend/copilot/executor/processor.py | 34 ++++++ .../backend/backend/util/workspace_storage.py | 102 +++++++++++------- 3 files changed, 108 insertions(+), 52 deletions(-) diff --git a/autogpt_platform/backend/backend/copilot/executor/manager.py b/autogpt_platform/backend/backend/copilot/executor/manager.py index 212634d342..1ec26e2be5 100644 --- a/autogpt_platform/backend/backend/copilot/executor/manager.py +++ b/autogpt_platform/backend/backend/copilot/executor/manager.py @@ -164,21 +164,23 @@ class CoPilotExecutor(AppProcess): self._cancel_thread, self.cancel_client, "[cleanup][cancel]" ) - # Shutdown executor + # Clean up worker threads (closes per-loop workspace storage sessions) if self._executor: + from .processor import cleanup_worker + + logger.info(f"[cleanup {pid}] Cleaning up workers...") + futures = [] + for _ in range(self._executor._max_workers): + futures.append(self._executor.submit(cleanup_worker)) + for f in futures: + try: + f.result(timeout=10) + except Exception as e: + logger.warning(f"[cleanup {pid}] Worker cleanup error: {e}") + logger.info(f"[cleanup {pid}] Shutting down executor...") self._executor.shutdown(wait=False) - # Close async resources (workspace storage aiohttp session, etc.) - try: - from backend.util.workspace_storage import shutdown_workspace_storage - - loop = asyncio.new_event_loop() - loop.run_until_complete(shutdown_workspace_storage()) - loop.close() - except Exception as e: - logger.warning(f"[cleanup {pid}] Error closing workspace storage: {e}") - # Release any remaining locks for task_id, lock in list(self._task_locks.items()): try: diff --git a/autogpt_platform/backend/backend/copilot/executor/processor.py b/autogpt_platform/backend/backend/copilot/executor/processor.py index eb941d5efd..c080f23ef1 100644 --- a/autogpt_platform/backend/backend/copilot/executor/processor.py +++ b/autogpt_platform/backend/backend/copilot/executor/processor.py @@ -60,6 +60,18 @@ def init_worker(): _tls.processor.on_executor_start() +def cleanup_worker(): + """Clean up the processor for the current worker thread. + + Should be called before the worker thread's event loop is destroyed so + that event-loop-bound resources (e.g. ``aiohttp.ClientSession``) are + closed on the correct loop. + """ + processor: CoPilotProcessor | None = getattr(_tls, "processor", None) + if processor is not None: + processor.cleanup() + + # ============ Processor Class ============ # @@ -98,6 +110,28 @@ class CoPilotProcessor: logger.info(f"[CoPilotExecutor] Worker {self.tid} started") + def cleanup(self): + """Clean up event-loop-bound resources before the loop is destroyed. + + Shuts down the workspace storage instance that belongs to this + worker's event loop, ensuring ``aiohttp.ClientSession.close()`` + runs on the same loop that created the session. + """ + from backend.util.workspace_storage import shutdown_workspace_storage + + try: + future = asyncio.run_coroutine_threadsafe( + shutdown_workspace_storage(), self.execution_loop + ) + future.result(timeout=5) + except Exception as e: + logger.warning(f"[CoPilotExecutor] Worker {self.tid} cleanup error: {e}") + + # Stop the event loop + self.execution_loop.call_soon_threadsafe(self.execution_loop.stop) + self.execution_thread.join(timeout=5) + logger.info(f"[CoPilotExecutor] Worker {self.tid} cleaned up") + @error_logged(swallow=False) def execute( self, diff --git a/autogpt_platform/backend/backend/util/workspace_storage.py b/autogpt_platform/backend/backend/util/workspace_storage.py index 2f4c8ae2b5..eca41b6616 100644 --- a/autogpt_platform/backend/backend/util/workspace_storage.py +++ b/autogpt_platform/backend/backend/util/workspace_storage.py @@ -93,7 +93,14 @@ class WorkspaceStorageBackend(ABC): class GCSWorkspaceStorage(WorkspaceStorageBackend): - """Google Cloud Storage implementation for workspace storage.""" + """Google Cloud Storage implementation for workspace storage. + + Each instance owns a single ``aiohttp.ClientSession`` and GCS async + client. Because ``ClientSession`` is bound to the event loop on which it + was created, callers that run on separate loops (e.g. copilot executor + worker threads) **must** obtain their own ``GCSWorkspaceStorage`` instance + via :func:`get_workspace_storage` which is event-loop-aware. + """ def __init__(self, bucket_name: str): self.bucket_name = bucket_name @@ -337,60 +344,73 @@ class LocalWorkspaceStorage(WorkspaceStorageBackend): raise ValueError(f"Invalid storage path format: {storage_path}") -# Global storage backend instance -_workspace_storage: Optional[WorkspaceStorageBackend] = None +# --------------------------------------------------------------------------- +# Storage instance management +# --------------------------------------------------------------------------- +# ``aiohttp.ClientSession`` is bound to the event loop where it is created. +# The copilot executor runs each worker in its own thread with a dedicated +# event loop, so a single global ``GCSWorkspaceStorage`` instance would break. +# +# For **local storage** a single shared instance is fine (no async I/O). +# For **GCS storage** we keep one instance *per event loop* so every loop +# gets its own ``ClientSession``. +# --------------------------------------------------------------------------- + +_local_storage: Optional[LocalWorkspaceStorage] = None +_gcs_storages: dict[int, GCSWorkspaceStorage] = {} _storage_lock = asyncio.Lock() async def get_workspace_storage() -> WorkspaceStorageBackend: + """Return a workspace storage backend for the **current** event loop. + + * Local storage → single shared instance (no event-loop affinity). + * GCS storage → one instance per event loop to avoid cross-loop + ``aiohttp`` errors. """ - Get the workspace storage backend instance. + global _local_storage - Uses GCS if media_gcs_bucket_name is configured, otherwise uses local storage. - """ - global _workspace_storage + config = Config() - if _workspace_storage is None: - async with _storage_lock: - if _workspace_storage is None: - config = Config() + # --- Local storage (shared) --- + if not config.media_gcs_bucket_name: + if _local_storage is None: + storage_dir = ( + config.workspace_storage_dir if config.workspace_storage_dir else None + ) + logger.info(f"Using local workspace storage: {storage_dir or 'default'}") + _local_storage = LocalWorkspaceStorage(storage_dir) + return _local_storage - if config.media_gcs_bucket_name: - logger.info( - f"Using GCS workspace storage: {config.media_gcs_bucket_name}" - ) - _workspace_storage = GCSWorkspaceStorage( - config.media_gcs_bucket_name - ) - else: - storage_dir = ( - config.workspace_storage_dir - if config.workspace_storage_dir - else None - ) - logger.info( - f"Using local workspace storage: {storage_dir or 'default'}" - ) - _workspace_storage = LocalWorkspaceStorage(storage_dir) - - return _workspace_storage + # --- GCS storage (per event loop) --- + loop_id = id(asyncio.get_running_loop()) + if loop_id not in _gcs_storages: + logger.info( + f"Creating GCS workspace storage for loop {loop_id}: " + f"{config.media_gcs_bucket_name}" + ) + _gcs_storages[loop_id] = GCSWorkspaceStorage(config.media_gcs_bucket_name) + return _gcs_storages[loop_id] async def shutdown_workspace_storage() -> None: - """ - Properly shutdown the global workspace storage backend. + """Shut down workspace storage for the **current** event loop. - Closes aiohttp sessions and other resources for GCS backend. - Should be called during application shutdown. + Closes the ``aiohttp`` session owned by the current loop's GCS instance. + Each worker thread should call this on its own loop before the loop is + destroyed. The REST API lifespan hook calls it for the main server loop. """ - global _workspace_storage + global _local_storage - if _workspace_storage is not None: - async with _storage_lock: - if _workspace_storage is not None: - if isinstance(_workspace_storage, GCSWorkspaceStorage): - await _workspace_storage.close() - _workspace_storage = None + loop_id = id(asyncio.get_running_loop()) + storage = _gcs_storages.pop(loop_id, None) + if storage is not None: + await storage.close() + + # Clear local storage only when the last GCS instance is gone + # (i.e. full shutdown, not just a single worker stopping). + if not _gcs_storages: + _local_storage = None def compute_file_checksum(content: bytes) -> str: