Refactor bulk import to use thread pool executor

Replaces FastAPI BackgroundTasks with a ThreadPoolExecutor for bulk activity import, queuing all files for processing in a single thread pool task. Adds process_all_files_sync to handle sequential file processing and database session management, improving scalability and resource usage. #399
João Vitória Silva
2025-12-04 11:51:13 +00:00
parent caba01be97
commit 5fbd9052d3
2 changed files with 61 additions and 14 deletions
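The core pattern of the change, shown here as a self-contained sketch rather than the project's actual modules: a module-level ThreadPoolExecutor, an async endpoint that submits one batch job via loop.run_in_executor with functools.partial, and a synchronous worker that walks the files one by one. The bulk_import coroutine, file names, and timings below are illustrative stand-ins, not code from the repository.

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial

# Module-level pool, mirroring the commit's two-worker executor
executor = ThreadPoolExecutor(max_workers=2)

def process_all_files_sync(user_id: int, file_paths: list[str]) -> None:
    # Runs inside a pool thread; files are handled strictly one after another
    for file_path in file_paths:
        time.sleep(0.05)  # stands in for parsing and storing the activity
        print(f"user {user_id}: processed {file_path}")

async def bulk_import(user_id: int, file_paths: list[str]) -> None:
    # Submit ONE job for the whole batch instead of one background task per file
    loop = asyncio.get_event_loop()
    loop.run_in_executor(
        executor,
        partial(process_all_files_sync, user_id, file_paths),
    )
    # The coroutine returns immediately; the pool keeps working

async def main() -> None:
    await bulk_import(1, ["ride.fit", "run.gpx"])
    # Keep the demo loop alive until the worker finishes; a real FastAPI
    # server's loop runs indefinitely, so the endpoint does not need this
    await asyncio.sleep(0.5)

if __name__ == "__main__":
    asyncio.run(main())

With max_workers=2, at most two bulk imports run concurrently and further submissions queue behind them, which bounds memory and database pressure compared to spawning one background task per file; this is presumably the resource-usage improvement the commit message refers to.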

View File

@@ -1,8 +1,11 @@
import calendar
import glob
import os
import asyncio
from datetime import date, datetime, timedelta, timezone
from typing import Annotated, Callable
from concurrent.futures import ThreadPoolExecutor
from functools import partial
import activities.activity.crud as activities_crud
import activities.activity.dependencies as activities_dependencies
@@ -20,7 +23,6 @@ import strava.activity_utils as strava_activity_utils
import websocket.schema as websocket_schema
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
HTTPException,
Security,
@@ -33,6 +35,9 @@ from sqlalchemy.orm import Session
# Define the API router
router = APIRouter()
# Define the thread pool executor with 2 workers
executor = ThreadPoolExecutor(max_workers=2)
@router.get(
"/user/{user_id}/week/{week_number}",
@@ -646,15 +651,10 @@ async def create_activity_with_bulk_import(
_check_scopes: Annotated[
Callable, Security(auth_security.check_scopes, scopes=["activities:write"])
],
db: Annotated[
Session,
Depends(core_database.get_db),
],
websocket_manager: Annotated[
websocket_schema.WebSocketManager,
Depends(websocket_schema.get_websocket_manager),
],
background_tasks: BackgroundTasks,
):
try:
core_logger.print_to_log_and_console("Bulk import initiated.")
@@ -667,6 +667,7 @@ async def create_activity_with_bulk_import(
supported_file_formats = core_config.SUPPORTED_FILE_FORMATS
# Iterate over each file in the 'bulk_import' directory
files_to_process = []
for filename in os.listdir(bulk_import_dir):
file_path = os.path.join(bulk_import_dir, filename)
@@ -680,18 +681,23 @@ async def create_activity_with_bulk_import(
continue
if os.path.isfile(file_path):
files_to_process.append(file_path)
# Log the file being processed
core_logger.print_to_log_and_console(
f"Queuing file for processing: {file_path}"
)
# Parse and store the activity
background_tasks.add_task(
activities_utils.parse_and_store_activity_from_file,
token_user_id,
file_path,
websocket_manager,
db,
)
# Submit ONE task that processes all files
loop = asyncio.get_event_loop()
loop.run_in_executor(
executor,
partial(
activities_utils.process_all_files_sync,
token_user_id,
files_to_process,
websocket_manager,
),
)
# Log a success message that explains processing will continue elsewhere.
core_logger.print_to_log_and_console(

View File

@@ -1,6 +1,7 @@
import gzip
import os
import shutil
import asyncio
from pathlib import Path
from tempfile import NamedTemporaryFile
@@ -44,6 +45,7 @@ import fit.utils as fit_utils
import core.logger as core_logger
import core.config as core_config
import core.database as core_database
# Global Activity Type Mappings (ID to Name)
ACTIVITY_ID_TO_NAME = {
@@ -1171,3 +1173,42 @@ def set_activity_name_based_on_activity_type(activity_type_id: int) -> str:
# If type is not 10 (Workout), return the mapping with " workout" suffix
return mapping + " workout" if mapping != "Workout" else mapping
def process_all_files_sync(
user_id: int,
file_paths: list[str],
websocket_manager: websocket_schema.WebSocketManager,
):
"""
Process all files sequentially in a single thread.
Args:
user_id: User ID.
file_paths: List of file paths to process.
websocket_manager: WebSocket manager instance.
"""
db = next(core_database.get_db())
try:
total_files = len(file_paths)
for idx, file_path in enumerate(file_paths, 1):
core_logger.print_to_log_and_console(
f"Processing file {idx}/{total_files}: " f"{file_path}"
)
asyncio.run(
parse_and_store_activity_from_file(
user_id,
file_path,
websocket_manager,
db,
)
)
# Small delay between files
time.sleep(0.1)
core_logger.print_to_log_and_console(
f"Bulk import completed: {total_files} files "
f"processed for user {user_id}"
)
finally:
db.close()
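Because parse_and_store_activity_from_file is a coroutine, the synchronous worker drives it with asyncio.run, spinning up a short-lived event loop per file, and it acquires one database session for the whole batch by pulling from the get_db generator. A minimal sketch of that shape, using hypothetical stand-ins for the project's session factory, parser, and websocket manager:

import asyncio
import time

class FakeSession:
    # Stand-in for a SQLAlchemy Session
    def close(self) -> None:
        print("session closed")

def get_db():
    # Stand-in for core_database.get_db(): a generator yielding a session
    yield FakeSession()

async def parse_and_store_activity_from_file(
    user_id: int, file_path: str, db: FakeSession
) -> None:
    # Stand-in for the real async parser
    await asyncio.sleep(0)
    print(f"stored {file_path} for user {user_id}")

def process_all_files_sync(user_id: int, file_paths: list[str]) -> None:
    db = next(get_db())  # one session reused for every file in the batch
    try:
        for idx, file_path in enumerate(file_paths, 1):
            # Each call creates and tears down its own event loop in this thread
            asyncio.run(parse_and_store_activity_from_file(user_id, file_path, db))
            time.sleep(0.1)  # brief pause between files, as in the commit
    finally:
        db.close()

process_all_files_sync(7, ["morning_ride.fit", "evening_run.gpx"])

One design implication: asyncio.run raises a RuntimeError if called from a thread that already has a running event loop, which is why the batch is handed to a pool thread rather than executed on the FastAPI server's own loop.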