From 5fbd9052d303ce86eee4b503873561d7b43b5908 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vit=C3=B3ria=20Silva?= Date: Thu, 4 Dec 2025 11:51:13 +0000 Subject: [PATCH] Refactor bulk import to use thread pool executor Replaces FastAPI BackgroundTasks with a ThreadPoolExecutor for bulk activity import, queuing all files for processing in a single thread pool task. Adds process_all_files_sync to handle sequential file processing and database session management, improving scalability and resource usage. #399 --- backend/app/activities/activity/router.py | 34 +++++++++++-------- backend/app/activities/activity/utils.py | 41 +++++++++++++++++++++++ 2 files changed, 61 insertions(+), 14 deletions(-) diff --git a/backend/app/activities/activity/router.py b/backend/app/activities/activity/router.py index 76426dddb..9d3971812 100644 --- a/backend/app/activities/activity/router.py +++ b/backend/app/activities/activity/router.py @@ -1,8 +1,11 @@ import calendar import glob import os +import asyncio from datetime import date, datetime, timedelta, timezone from typing import Annotated, Callable +from concurrent.futures import ThreadPoolExecutor +from functools import partial import activities.activity.crud as activities_crud import activities.activity.dependencies as activities_dependencies @@ -20,7 +23,6 @@ import strava.activity_utils as strava_activity_utils import websocket.schema as websocket_schema from fastapi import ( APIRouter, - BackgroundTasks, Depends, HTTPException, Security, @@ -33,6 +35,9 @@ from sqlalchemy.orm import Session # Define the API router router = APIRouter() +# Define the thread pool executor with 2 workers +executor = ThreadPoolExecutor(max_workers=2) + @router.get( "/user/{user_id}/week/{week_number}", @@ -646,15 +651,10 @@ async def create_activity_with_bulk_import( _check_scopes: Annotated[ Callable, Security(auth_security.check_scopes, scopes=["activities:write"]) ], - db: Annotated[ - Session, - Depends(core_database.get_db), - ], 
websocket_manager: Annotated[ websocket_schema.WebSocketManager, Depends(websocket_schema.get_websocket_manager), ], - background_tasks: BackgroundTasks, ): try: core_logger.print_to_log_and_console("Bulk import initiated.") @@ -667,6 +667,7 @@ async def create_activity_with_bulk_import( supported_file_formats = core_config.SUPPORTED_FILE_FORMATS # Iterate over each file in the 'bulk_import' directory + files_to_process = [] for filename in os.listdir(bulk_import_dir): file_path = os.path.join(bulk_import_dir, filename) @@ -680,18 +681,23 @@ async def create_activity_with_bulk_import( continue if os.path.isfile(file_path): + files_to_process.append(file_path) # Log the file being processed core_logger.print_to_log_and_console( f"Queuing file for processing: {file_path}" ) - # Parse and store the activity - background_tasks.add_task( - activities_utils.parse_and_store_activity_from_file, - token_user_id, - file_path, - websocket_manager, - db, - ) + + # Submit ONE task that processes all files + loop = asyncio.get_running_loop() + loop.run_in_executor( + executor, + partial( + activities_utils.process_all_files_sync, + token_user_id, + files_to_process, + websocket_manager, + ), + ) # Log a success message that explains processing will continue elsewhere. 
core_logger.print_to_log_and_console( diff --git a/backend/app/activities/activity/utils.py b/backend/app/activities/activity/utils.py index 8f80dab41..e2d6ff597 100644 --- a/backend/app/activities/activity/utils.py +++ b/backend/app/activities/activity/utils.py @@ -1,6 +1,8 @@ import gzip import os import shutil +import asyncio +import time from pathlib import Path from tempfile import NamedTemporaryFile @@ -44,6 +46,7 @@ import fit.utils as fit_utils import core.logger as core_logger import core.config as core_config +import core.database as core_database # Global Activity Type Mappings (ID to Name) ACTIVITY_ID_TO_NAME = { @@ -1171,3 +1174,42 @@ def set_activity_name_based_on_activity_type(activity_type_id: int) -> str: # If type is not 10 (Workout), return the mapping with " workout" suffix return mapping + " workout" if mapping != "Workout" else mapping + + +def process_all_files_sync( + user_id: int, + file_paths: list[str], + websocket_manager: websocket_schema.WebSocketManager, +): + """ + Process all files sequentially in single thread. + + Args: + user_id: User ID. + file_paths: List of file paths to process. + websocket_manager: WebSocket manager instance. + """ + db = next(core_database.get_db()) + try: + total_files = len(file_paths) + for idx, file_path in enumerate(file_paths, 1): + core_logger.print_to_log_and_console( + f"Processing file {idx}/{total_files}: " f"{file_path}" + ) + asyncio.run( + parse_and_store_activity_from_file( + user_id, + file_path, + websocket_manager, + db, + ) + ) + # Small delay between files + time.sleep(0.1) + + core_logger.print_to_log_and_console( + f"Bulk import completed: {total_files} files " + f"processed for user {user_id}" + ) + finally: + db.close()