diff --git a/autogpt_platform/backend/backend/util/cloud_storage.py b/autogpt_platform/backend/backend/util/cloud_storage.py
index 27acc605bb..1cb38d2be6 100644
--- a/autogpt_platform/backend/backend/util/cloud_storage.py
+++ b/autogpt_platform/backend/backend/util/cloud_storage.py
@@ -9,6 +9,7 @@
 import uuid
 from datetime import datetime, timedelta, timezone
 from typing import Tuple

+import aiohttp
 from gcloud.aio import storage as async_gcs_storage
 from google.cloud import storage as gcs_storage
@@ -38,20 +39,59 @@ class CloudStorageHandler:
         self.config = config
         self._async_gcs_client = None
         self._sync_gcs_client = None  # Only for signed URLs
+        self._session = None
+
+    async def _get_async_gcs_client(self):
+        """Get or create the async GCS client, ensuring it's created in a proper async context."""
+        # Check if we already have a client
+        if self._async_gcs_client is not None:
+            return self._async_gcs_client
+
+        current_task = asyncio.current_task()
+        if not current_task:
+            # If we're not in a task, create a temporary client
+            logger.warning(
+                "[CloudStorage] Creating GCS client outside of task context - using temporary client"
+            )
+            timeout = aiohttp.ClientTimeout(total=300)
+            session = aiohttp.ClientSession(
+                timeout=timeout,
+                connector=aiohttp.TCPConnector(limit=100, force_close=False),
+            )
+            return async_gcs_storage.Storage(session=session)
+
+        # Create a reusable session with proper configuration.
+        # Key fix: don't set a timeout on the session; let gcloud-aio handle it.
+        self._session = aiohttp.ClientSession(
+            connector=aiohttp.TCPConnector(
+                limit=100,  # Connection pool limit
+                force_close=False,  # Reuse connections
+                enable_cleanup_closed=True,
+            )
+        )
+
+        # Create the GCS client with our session
+        self._async_gcs_client = async_gcs_storage.Storage(session=self._session)

-    def _get_async_gcs_client(self):
-        """Lazy initialization of async GCS client."""
-        if self._async_gcs_client is None:
-            # Use Application Default Credentials (ADC)
-            self._async_gcs_client = async_gcs_storage.Storage()
         return self._async_gcs_client

     async def close(self):
         """Close all client connections properly."""
         if self._async_gcs_client is not None:
-            await self._async_gcs_client.close()
+            try:
+                await self._async_gcs_client.close()
+            except Exception as e:
+                logger.warning(f"[CloudStorage] Error closing GCS client: {e}")
             self._async_gcs_client = None
+        if self._session is not None:
+            try:
+                await self._session.close()
+            except Exception as e:
+                logger.warning(f"[CloudStorage] Error closing session: {e}")
+            self._session = None
+
+    async def __aenter__(self):
+        """Async context manager entry."""
+        return self
@@ -141,7 +181,7 @@ class CloudStorageHandler:
         if user_id and graph_exec_id:
             raise ValueError("Provide either user_id OR graph_exec_id, not both")

-        async_client = self._get_async_gcs_client()
+        async_client = await self._get_async_gcs_client()

         # Generate unique path with appropriate scope
         unique_id = str(uuid.uuid4())
@@ -203,6 +243,15 @@
         self, path: str, user_id: str | None = None, graph_exec_id: str | None = None
     ) -> bytes:
         """Retrieve file from Google Cloud Storage with authorization."""
+        # Log context for debugging
+        current_task = asyncio.current_task()
+        logger.info(
+            f"[CloudStorage] "
+            f"_retrieve_file_gcs called - "
+            f"current_task: {current_task}, "
+            f"in_task: {current_task is not None}"
+        )
+
         # Parse bucket and blob name from path
         parts = path.split("/", 1)
         if len(parts) != 2:
@@ -213,13 +262,65 @@ class CloudStorageHandler:
         # Authorization check
         self._validate_file_access(blob_name, user_id, graph_exec_id)

-        async_client = self._get_async_gcs_client()
+        # Use a fresh client for each download to avoid session issues
+        # This is less efficient but more reliable with the executor's event loop
+        logger.info("[CloudStorage] Creating fresh GCS client for download")
+
+        # Create a new session specifically for this download
+        session = aiohttp.ClientSession(
+            connector=aiohttp.TCPConnector(limit=10, force_close=True)
+        )
+
+        async_client = None
         try:
-            # Download content using pure async client
+            # Create a new GCS client with the fresh session
+            async_client = async_gcs_storage.Storage(session=session)
+
+            logger.info(
+                f"[CloudStorage] About to download from GCS - bucket: {bucket_name}, blob: {blob_name}"
+            )
+
+            # Download content using the fresh client
             content = await async_client.download(bucket_name, blob_name)
+
+            logger.info(
+                f"[CloudStorage] GCS download successful - size: {len(content)} bytes"
+            )
+
+            # Clean up
+            await async_client.close()
+            await session.close()
+
             return content
+
         except Exception as e:
+            # Always try to clean up
+            if async_client is not None:
+                try:
+                    await async_client.close()
+                except Exception as cleanup_error:
+                    logger.warning(
+                        f"[CloudStorage] Error closing GCS client: {cleanup_error}"
+                    )
+            try:
+                await session.close()
+            except Exception as cleanup_error:
+                logger.warning(f"[CloudStorage] Error closing session: {cleanup_error}")
+
+            # Log the specific error for debugging
+            logger.error(
+                f"[CloudStorage] GCS download failed - error: {str(e)}, "
+                f"error_type: {type(e).__name__}, "
+                f"bucket: {bucket_name}, blob: redacted for privacy"
+            )
+
+            # Special handling for timeout error
+            if "Timeout context manager" in str(e):
+                logger.critical(
+                    f"[CloudStorage] TIMEOUT ERROR in GCS download! "
+                    f"current_task: {current_task}, "
+                    f"bucket: {bucket_name}, blob: redacted for privacy"
+                )
+
             # Convert gcloud-aio exceptions to standard ones
             if "404" in str(e) or "Not Found" in str(e):
                 raise FileNotFoundError(f"File not found: gcs://{path}")
" + f"current_task: {current_task}, " + f"bucket: {bucket_name}, blob: redacted for privacy" + ) + # Convert gcloud-aio exceptions to standard ones if "404" in str(e) or "Not Found" in str(e): raise FileNotFoundError(f"File not found: gcs://{path}") @@ -303,7 +404,7 @@ class CloudStorageHandler: # Legacy uploads directory (uploads/*) - allow for backwards compatibility with warning # Note: We already validated it starts with "uploads/" above, so this is guaranteed to match - logger.warning(f"Accessing legacy upload path: {blob_name}") + logger.warning(f"[CloudStorage] Accessing legacy upload path: {blob_name}") return async def generate_signed_url( @@ -391,7 +492,7 @@ class CloudStorageHandler: if not self.config.gcs_bucket_name: raise ValueError("GCS_BUCKET_NAME not configured") - async_client = self._get_async_gcs_client() + async_client = await self._get_async_gcs_client() current_time = datetime.now(timezone.utc) try: @@ -431,7 +532,7 @@ class CloudStorageHandler: except Exception as e: # Log specific errors for debugging logger.warning( - f"Failed to process file {blob_name} during cleanup: {e}" + f"[CloudStorage] Failed to process file {blob_name} during cleanup: {e}" ) # Skip files with invalid metadata or delete errors pass @@ -447,7 +548,7 @@ class CloudStorageHandler: except Exception as e: # Log the error for debugging but continue operation - logger.error(f"Cleanup operation failed: {e}") + logger.error(f"[CloudStorage] Cleanup operation failed: {e}") # Return 0 - we'll try again next cleanup cycle return 0 @@ -476,7 +577,7 @@ class CloudStorageHandler: bucket_name, blob_name = parts - async_client = self._get_async_gcs_client() + async_client = await self._get_async_gcs_client() try: # Get object metadata using pure async client @@ -490,11 +591,15 @@ class CloudStorageHandler: except Exception as e: # If file doesn't exist or we can't read metadata if "404" in str(e) or "Not Found" in str(e): - logger.debug(f"File not found during expiration check: {blob_name}") + logger.warning( + f"[CloudStorage] File not found during expiration check: {blob_name}" + ) return True # File doesn't exist, consider it expired # Log other types of errors for debugging - logger.warning(f"Failed to check expiration for {blob_name}: {e}") + logger.warning( + f"[CloudStorage] Failed to check expiration for {blob_name}: {e}" + ) # If we can't read metadata for other reasons, assume not expired return False @@ -544,11 +649,15 @@ async def cleanup_expired_files_async() -> int: # Use cleanup lock to prevent concurrent cleanup operations async with _cleanup_lock: try: - logger.info("Starting cleanup of expired cloud storage files") + logger.info( + "[CloudStorage] Starting cleanup of expired cloud storage files" + ) handler = await get_cloud_storage_handler() deleted_count = await handler.delete_expired_files() - logger.info(f"Cleaned up {deleted_count} expired files from cloud storage") + logger.info( + f"[CloudStorage] Cleaned up {deleted_count} expired files from cloud storage" + ) return deleted_count except Exception as e: - logger.error(f"Error during cloud storage cleanup: {e}") + logger.error(f"[CloudStorage] Error during cloud storage cleanup: {e}") return 0 diff --git a/autogpt_platform/backend/backend/util/cloud_storage_test.py b/autogpt_platform/backend/backend/util/cloud_storage_test.py index 4dd0d79d20..d81187a879 100644 --- a/autogpt_platform/backend/backend/util/cloud_storage_test.py +++ b/autogpt_platform/backend/backend/util/cloud_storage_test.py @@ -72,16 +72,17 @@ class 
diff --git a/autogpt_platform/backend/backend/util/cloud_storage_test.py b/autogpt_platform/backend/backend/util/cloud_storage_test.py
index 4dd0d79d20..d81187a879 100644
--- a/autogpt_platform/backend/backend/util/cloud_storage_test.py
+++ b/autogpt_platform/backend/backend/util/cloud_storage_test.py
@@ -72,16 +72,17 @@ class TestCloudStorageHandler:
         assert call_args[0][2] == content  # file content
         assert "metadata" in call_args[1]  # metadata argument

-    @patch.object(CloudStorageHandler, "_get_async_gcs_client")
+    @patch("backend.util.cloud_storage.async_gcs_storage.Storage")
     @pytest.mark.asyncio
-    async def test_retrieve_file_gcs(self, mock_get_async_client, handler):
+    async def test_retrieve_file_gcs(self, mock_storage_class, handler):
         """Test retrieving file from GCS."""
-        # Mock async GCS client
+        # Mock async GCS client instance
         mock_async_client = AsyncMock()
-        mock_get_async_client.return_value = mock_async_client
+        mock_storage_class.return_value = mock_async_client

-        # Mock the download method
+        # Mock the download and close methods
         mock_async_client.download = AsyncMock(return_value=b"test content")
+        mock_async_client.close = AsyncMock()

         result = await handler.retrieve_file(
             "gcs://test-bucket/uploads/system/uuid123/file.txt"
@@ -92,16 +93,17 @@ class TestCloudStorageHandler:
             "test-bucket", "uploads/system/uuid123/file.txt"
         )

-    @patch.object(CloudStorageHandler, "_get_async_gcs_client")
+    @patch("backend.util.cloud_storage.async_gcs_storage.Storage")
     @pytest.mark.asyncio
-    async def test_retrieve_file_not_found(self, mock_get_async_client, handler):
+    async def test_retrieve_file_not_found(self, mock_storage_class, handler):
         """Test retrieving non-existent file from GCS."""
-        # Mock async GCS client
+        # Mock async GCS client instance
         mock_async_client = AsyncMock()
-        mock_get_async_client.return_value = mock_async_client
+        mock_storage_class.return_value = mock_async_client

         # Mock the download method to raise a 404 exception
         mock_async_client.download = AsyncMock(side_effect=Exception("404 Not Found"))
+        mock_async_client.close = AsyncMock()

         with pytest.raises(FileNotFoundError):
             await handler.retrieve_file(
@@ -287,14 +289,15 @@ class TestCloudStorageHandler:
         ):
             handler._validate_file_access("invalid/path/file.txt", "user123")

-    @patch.object(CloudStorageHandler, "_get_async_gcs_client")
+    @patch("backend.util.cloud_storage.async_gcs_storage.Storage")
     @pytest.mark.asyncio
-    async def test_retrieve_file_with_authorization(self, mock_get_client, handler):
+    async def test_retrieve_file_with_authorization(self, mock_storage_class, handler):
         """Test file retrieval with authorization."""
-        # Mock async GCS client
+        # Mock async GCS client instance
         mock_client = AsyncMock()
-        mock_get_client.return_value = mock_client
+        mock_storage_class.return_value = mock_client
         mock_client.download = AsyncMock(return_value=b"test content")
+        mock_client.close = AsyncMock()

         # Test successful retrieval of user's own file
         result = await handler.retrieve_file(
@@ -412,18 +415,19 @@
             "uploads/executions/exec123/uuid456/file.txt", graph_exec_id="exec456"
         )

-    @patch.object(CloudStorageHandler, "_get_async_gcs_client")
+    @patch("backend.util.cloud_storage.async_gcs_storage.Storage")
     @pytest.mark.asyncio
     async def test_retrieve_file_with_exec_authorization(
-        self, mock_get_async_client, handler
+        self, mock_storage_class, handler
     ):
         """Test file retrieval with execution authorization."""
-        # Mock async GCS client
+        # Mock async GCS client instance
         mock_async_client = AsyncMock()
-        mock_get_async_client.return_value = mock_async_client
+        mock_storage_class.return_value = mock_async_client

-        # Mock the download method
+        # Mock the download and close methods
         mock_async_client.download = AsyncMock(return_value=b"test content")
+        mock_async_client.close = AsyncMock()

         # Test successful retrieval of execution's own file
         result = await handler.retrieve_file(