fix(backend): resolve unclosed HTTP client session errors (#10566)

## Summary

This PR resolves unclosed HTTP client session errors that were occurring
in the backend, particularly during file uploads and service-to-service
communication.

### Key Changes

- **Fixed GCS storage operations**: Convert
`gcloud.aio.storage.Storage()` to use async context managers in
`media.py` and `cloud_storage.py`
- **Enhanced service client cleanup**: Added proper cleanup methods to
`DynamicClient` class in `service.py` with `__del__` fallback and
context manager support
- **Application shutdown cleanup**: Added cloud storage handler cleanup
to FastAPI application lifespan
- **Updated test mocks**: Fixed test fixtures to properly mock async
context manager behavior

### Root Cause Analysis

The "Unclosed client session" and "Unclosed connector" errors were
caused by:

1. **GCS storage clients** not using context managers (agent image
uploads)
2. **Service HTTP clients** (`httpx.Client`/`AsyncClient`) not being
properly cleaned up in the `DynamicClient` class

### Technical Details

- All `gcloud.aio.storage.Storage()` instances now use `async with`
context managers
- `DynamicClient` class now has proper cleanup methods and context
manager support
- Application shutdown hook ensures cloud storage handlers are properly
closed
- Test fixtures updated to mock async context manager protocol

### Testing

-  All media upload tests pass
-  Service client tests pass
-  Linting and formatting pass

## Test plan

- [ ] Deploy to staging environment
- [ ] Monitor logs for "Unclosed client session" errors (should be
eliminated)
- [ ] Verify file upload functionality works correctly
- [ ] Check service-to-service communication operates normally

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Zamil Majdy
2025-08-08 09:41:41 +04:00
committed by GitHub
parent 378d256b58
commit 3131e2e856
5 changed files with 106 additions and 36 deletions

View File

@@ -40,6 +40,7 @@ from backend.integrations.providers import ProviderName
from backend.server.external.api import external_app
from backend.server.middleware.security import SecurityHeadersMiddleware
from backend.util import json
from backend.util.cloud_storage import shutdown_cloud_storage_handler
settings = backend.util.settings.Settings()
logger = logging.getLogger(__name__)
@@ -75,6 +76,12 @@ async def lifespan_context(app: fastapi.FastAPI):
await backend.data.graph.migrate_llm_models(LlmModel.GPT4O)
with launch_darkly_context():
yield
try:
await shutdown_cloud_storage_handler()
except Exception as e:
logger.warning(f"Error shutting down cloud storage handler: {e}")
await backend.data.db.disconnect()

View File

@@ -33,30 +33,30 @@ async def check_media_exists(user_id: str, filename: str) -> str | None:
if not settings.config.media_gcs_bucket_name:
raise MissingConfigError("GCS media bucket is not configured")
async_client = async_storage.Storage()
bucket_name = settings.config.media_gcs_bucket_name
async with async_storage.Storage() as async_client:
bucket_name = settings.config.media_gcs_bucket_name
# Check images
image_path = f"users/{user_id}/images/{filename}"
try:
await async_client.download_metadata(bucket_name, image_path)
# If we get here, the file exists - construct public URL
return f"https://storage.googleapis.com/{bucket_name}/{image_path}"
except Exception:
# File doesn't exist, continue to check videos
pass
# Check images
image_path = f"users/{user_id}/images/{filename}"
try:
await async_client.download_metadata(bucket_name, image_path)
# If we get here, the file exists - construct public URL
return f"https://storage.googleapis.com/{bucket_name}/{image_path}"
except Exception:
# File doesn't exist, continue to check videos
pass
# Check videos
video_path = f"users/{user_id}/videos/{filename}"
try:
await async_client.download_metadata(bucket_name, video_path)
# If we get here, the file exists - construct public URL
return f"https://storage.googleapis.com/{bucket_name}/{video_path}"
except Exception:
# File doesn't exist
pass
# Check videos
video_path = f"users/{user_id}/videos/{filename}"
try:
await async_client.download_metadata(bucket_name, video_path)
# If we get here, the file exists - construct public URL
return f"https://storage.googleapis.com/{bucket_name}/{video_path}"
except Exception:
# File doesn't exist
pass
return None
return None
async def upload_media(
@@ -177,22 +177,24 @@ async def upload_media(
storage_path = f"users/{user_id}/{media_type}/{unique_filename}"
try:
async_client = async_storage.Storage()
bucket_name = settings.config.media_gcs_bucket_name
async with async_storage.Storage() as async_client:
bucket_name = settings.config.media_gcs_bucket_name
file_bytes = await file.read()
await scan_content_safe(file_bytes, filename=unique_filename)
file_bytes = await file.read()
await scan_content_safe(file_bytes, filename=unique_filename)
# Upload using pure async client
await async_client.upload(
bucket_name, storage_path, file_bytes, content_type=content_type
)
# Upload using pure async client
await async_client.upload(
bucket_name, storage_path, file_bytes, content_type=content_type
)
# Construct public URL
public_url = f"https://storage.googleapis.com/{bucket_name}/{storage_path}"
# Construct public URL
public_url = (
f"https://storage.googleapis.com/{bucket_name}/{storage_path}"
)
logger.info(f"Successfully uploaded file to: {storage_path}")
return public_url
logger.info(f"Successfully uploaded file to: {storage_path}")
return public_url
except Exception as e:
logger.error(f"GCS storage error: {str(e)}")

View File

@@ -26,6 +26,10 @@ def mock_storage_client(mocker):
mock_client = AsyncMock()
mock_client.upload = AsyncMock()
# Mock context manager methods
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=None)
# Mock the constructor to return our mock client
mocker.patch(
"backend.server.v2.store.media.async_storage.Storage", return_value=mock_client

View File

@@ -46,6 +46,20 @@ class CloudStorageHandler:
self._async_gcs_client = async_gcs_storage.Storage()
return self._async_gcs_client
async def close(self):
"""Close all client connections properly."""
if self._async_gcs_client is not None:
await self._async_gcs_client.close()
self._async_gcs_client = None
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()
def _get_sync_gcs_client(self):
"""Lazy initialization of sync GCS client (only for signed URLs)."""
if self._sync_gcs_client is None:
@@ -507,6 +521,17 @@ async def get_cloud_storage_handler() -> CloudStorageHandler:
return _cloud_storage_handler
async def shutdown_cloud_storage_handler():
"""Properly shutdown the global cloud storage handler."""
global _cloud_storage_handler
if _cloud_storage_handler is not None:
async with _handler_lock:
if _cloud_storage_handler is not None:
await _cloud_storage_handler.close()
_cloud_storage_handler = None
async def cleanup_expired_files_async() -> int:
"""
Clean up expired files from cloud storage.

View File

@@ -406,11 +406,43 @@ def get_service_client(
raise
async def aclose(self) -> None:
self.sync_client.close()
await self.async_client.aclose()
if hasattr(self, "sync_client"):
self.sync_client.close()
if hasattr(self, "async_client"):
await self.async_client.aclose()
def close(self) -> None:
self.sync_client.close()
if hasattr(self, "sync_client"):
self.sync_client.close()
# Note: Cannot close async client synchronously
def __del__(self):
"""Cleanup HTTP clients on garbage collection to prevent resource leaks."""
try:
if hasattr(self, "sync_client"):
self.sync_client.close()
if hasattr(self, "async_client"):
# Note: Can't await in __del__, so we just close sync
# The async client will be cleaned up by garbage collection
import warnings
warnings.warn(
"DynamicClient async client not explicitly closed. "
"Call aclose() before destroying the client.",
ResourceWarning,
stacklevel=2,
)
except Exception:
# Silently ignore cleanup errors in __del__
pass
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.aclose()
def _get_params(
self, signature: inspect.Signature, *args: Any, **kwargs: Any