Added initial multithreading logic for strava activities processing

This commit is contained in:
João Silva
2023-11-20 22:59:16 +00:00
parent 4f684bfd15
commit 7fb36e386f
3 changed files with 232 additions and 17 deletions

View File

@@ -112,9 +112,9 @@ async def read_activities_useractivities_thisweek_distances(
for activity in activity_records:
if activity.activity_type in [1, 2, 3]:
run += activity.distance
elif activity.activity_type in [4, 5, 6, 7]:
elif activity.activity_type in [4, 5, 6, 7, 8]:
bike += activity.distance
elif activity.activity_type == 8:
elif activity.activity_type == 9:
swim += activity.distance
# Prepare the result as JSON

View File

@@ -9,6 +9,7 @@ from jose import jwt, JWTError
from sqlalchemy import func
from stravalib.client import Client
from pint import Quantity
from concurrent.futures import ThreadPoolExecutor
import logging
import requests
@@ -239,7 +240,25 @@ def get_strava_activities(start_date: datetime):
# response.raise_for_status() # Raise an error for bad responses
# store_strava_activities_stravaLib(response.json(), user.id, user.strava_token)
store_strava_activities_stravaLib(user.id, user.strava_token)
#store_strava_activities_stravaLib(user.id, user.strava_token, start_date)
stravaClient = Client(access_token=user.strava_token)
strava_activities = list(stravaClient.get_activities(after=start_date))
chunk_size = len(strava_activities) // 4 # Adjust the number of threads as needed
activity_chunks = [strava_activities[i:i + chunk_size] for i in range(0, len(strava_activities), chunk_size)]
with ThreadPoolExecutor() as executor:
# Process each chunk of activities using threads
results = list(executor.map(lambda chunk: process_activities(chunk, user.id, stravaClient), activity_chunks))
# Flatten the list of results
activities_to_insert = [activity for sublist in results for activity in sublist]
# Bulk insert all activities
with get_db_session() as db_session:
db_session.bulk_save_objects(activities_to_insert)
db_session.commit()
# except requests.exceptions.RequestException as req_err:
# Handle request errors
@@ -250,10 +269,200 @@ def get_strava_activities(start_date: datetime):
except NameError as db_err:
logger.error(f"Database error: {db_err}")
def process_activities(strava_activities, user_id, stravaClient):
activities_to_insert = []
def store_strava_activities_stravaLib(user_id, strava_token):
for activity in strava_activities:
with get_db_session() as db_session:
activity_record = (
db_session.query(Activity)
.filter(Activity.strava_activity_id == activity.id)
.first()
)
if activity_record:
continue # Skip existing activities
# Process the activity and append to the list
processed_activity = process_activity(activity, user_id, stravaClient)
activities_to_insert.append(processed_activity)
return activities_to_insert
def process_activity(activity, user_id, stravaClient):
start_date_parsed = activity.start_date
# Ensure activity.elapsed_time is a numerical value
elapsed_time_seconds = (
activity.elapsed_time.total_seconds()
if isinstance(activity.elapsed_time, timedelta)
else activity.elapsed_time
)
end_date_parsed = start_date_parsed + timedelta(
seconds=elapsed_time_seconds
)
latitude = 0
longitude = 0
if hasattr(activity, "start_latlng") and activity.start_latlng is not None:
latitude = activity.start_latlng.lat
longitude = activity.start_latlng.lon
city = None
town = None
country = None
if latitude != 0 and longitude != 0:
url = f"https://geocode.maps.co/reverse?lat={latitude}&lon={longitude}"
try:
# Make a GET request
response = requests.get(url)
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Parse the JSON response
data = response.json()
# Extract the town and country from the address components
city = data.get("address", {}).get("city", None)
town = data.get("address", {}).get("town", None)
country = data.get("address", {}).get("country", None)
else:
print(f"Error location: {response.status_code}")
print(f"Error location: {url}")
except Exception as e:
print(f"An error occurred: {e}")
# List to store constructed waypoints
waypoints = []
# Initialize variables for elevation gain and loss
elevation_gain = 0
elevation_loss = 0
previous_elevation = None
# Get streams for the activity
streams = stravaClient.get_activity_streams(
activity.id,
types=[
"latlng",
"altitude",
"time",
"heartrate",
"cadence",
"watts",
"velocity_smooth",
],
)
# Extract data from streams
latitudes = streams["latlng"].data if "latlng" in streams else []
longitudes = streams["latlng"].data if "latlng" in streams else []
elevations = streams["altitude"].data if "altitude" in streams else []
times = streams["time"].data if "time" in streams else []
heart_rates = streams["heartrate"].data if "heartrate" in streams else []
cadences = streams["cadence"].data if "cadence" in streams else []
powers = streams["watts"].data if "watts" in streams else []
velocities = (
streams["velocity_smooth"].data if "velocity_smooth" in streams else []
)
for i in range(len(heart_rates)):
waypoint = {
"lat": latitudes[i] if i < len(latitudes) else None,
"lon": longitudes[i] if i < len(longitudes) else None,
"ele": elevations[i] if i < len(elevations) else None,
"time": times[i] if i < len(times) else None,
"hr": heart_rates[i] if i < len(heart_rates) else None,
"cad": cadences[i] if i < len(cadences) else None,
"power": powers[i] if i < len(powers) else None,
"vel": velocities[i] if i < len(velocities) else None,
# Add other relevant fields based on your requirements
}
# Calculate elevation gain and loss on-the-fly
current_elevation = elevations[i] if i < len(elevations) else None
if current_elevation is not None:
if previous_elevation is not None:
elevation_change = current_elevation - previous_elevation
if elevation_change > 0:
elevation_gain += elevation_change
else:
elevation_loss += abs(elevation_change)
previous_elevation = current_elevation
# Append the constructed waypoint to the waypoints list
waypoints.append(waypoint)
average_speed = 0
if activity.average_speed is not None:
average_speed = (
float(activity.average_speed.magnitude)
if isinstance(activity.average_speed, Quantity)
else activity.average_speed
)
average_pace = 1 / average_speed if average_speed != 0 else 0
average_watts = 0
if activity.average_watts is not None:
average_watts = activity.average_watts
auxType = 10 # Default value
type_mapping = {
"running": 1,
"Run": 1,
"trail running": 2,
"TrailRun": 2,
"VirtualRun": 3,
"cycling": 4,
"Ride": 4,
"GravelRide": 5,
"EBikeRide": 6,
"EMountainBikeRide": 6,
"VirtualRide": 7,
"virtual_ride": 7,
"MountainBikeRide": 8,
"swimming": 9,
"Swim": 9,
"open_water_swimming": 9,
"Workout": 10,
}
auxType = type_mapping.get(activity.sport_type, 10)
# Create a new Activity record
newActivity = Activity(
user_id=user_id,
name=activity.name,
distance=round(float(activity.distance))
if isinstance(activity.distance, Quantity)
else round(activity.distance),
activity_type=auxType,
start_time=start_date_parsed,
end_time=end_date_parsed,
city=city,
town=town,
country=country,
#created_at=func.now(), # Use func.now() to set 'created_at' to the current timestamp
created_at=datetime.utcnow(),
waypoints=waypoints,
elevation_gain=elevation_gain,
elevation_loss=elevation_loss,
pace=average_pace,
average_speed=average_speed,
average_power=average_watts,
strava_activity_id=activity.id,
)
return newActivity
def store_strava_activities_stravaLib(user_id, strava_token, start_date):
stravaClient = Client(access_token=strava_token)
start_date = (datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
#start_date = (datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
for activity in stravaClient.get_activities(after=start_date):
with get_db_session() as db_session:
# Use SQLAlchemy to query the gear record by ID
@@ -281,9 +490,9 @@ def store_strava_activities_stravaLib(user_id, strava_token):
latitude = 0
longitude = 0
if hasattr(activity, "latlng") and activity.latlng is not None:
latitude = activity.latlng[0]
longitude = activity.latlng[1]
if hasattr(activity, "start_latlng") and activity.start_latlng is not None:
latitude = activity.start_latlng.lat
longitude = activity.start_latlng.lon
city = None
town = None
@@ -343,7 +552,7 @@ def store_strava_activities_stravaLib(user_id, strava_token):
streams["velocity_smooth"].data if "velocity_smooth" in streams else []
)
for i in range(len(latitudes)):
for i in range(len(heart_rates)):
waypoint = {
"lat": latitudes[i] if i < len(latitudes) else None,
"lon": longitudes[i] if i < len(longitudes) else None,
@@ -401,11 +610,13 @@ def store_strava_activities_stravaLib(user_id, strava_token):
"EMountainBikeRide": 6,
"VirtualRide": 7,
"virtual_ride": 7,
"swimming": 8,
"open_water_swimming": 8,
"MountainBikeRide": 8,
"swimming": 9,
"Swim": 9,
"open_water_swimming": 9,
"Workout": 10,
}
auxType = type_mapping.get(activity.workout_type, 10)
auxType = type_mapping.get(activity.sport_type, 10)
# Create a new Activity record
activity = Activity(
@@ -647,11 +858,15 @@ def store_strava_activities(strava_activities, user_id, strava_token):
"EMountainBikeRide": 6,
"VirtualRide": 7,
"virtual_ride": 7,
"swimming": 8,
"open_water_swimming": 8,
"MountainBikeRide": 8,
"swimming": 9,
"open_water_swimming": 9,
"Workout": 10,
}
auxType = type_mapping.get(activity.get("type", "Workout"), 10)
auxType = type_mapping.get(activity.get("sport_type", "Workout"), 10)
aux = activity.get("sport_type")
logger.info(f"No Strava activities returned {aux}")
# Create a new Activity record
activity = Activity(

View File

@@ -39,10 +39,10 @@ scheduler.add_job(sessionController.remove_expired_tokens, "interval", minutes=1
scheduler.add_job(stravaController.refresh_strava_token, "interval", minutes=30)
scheduler.add_job(
lambda: stravaController.get_strava_activities(
(datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
(datetime.utcnow() - timedelta(days=14)).strftime("%Y-%m-%dT%H:%M:%SZ")
),
"interval",
minutes=60,
minutes=1,
)