Migration logic and v0.5.0 migration

[backend] added new log specific for migrations
[backend] added database logic for migration handling
[backend] added logic for v0.5.0 migration
[backend] added additional crud function in activities necessary for v0.5.0 migration
[README] updated README file with new logs section detailing the several log types available
This commit is contained in:
João Vitória Silva
2024-11-03 22:27:16 +00:00
parent 42b6b75f68
commit 099835d98e
8 changed files with 331 additions and 14 deletions

View File

@@ -30,6 +30,7 @@
- [Volumes](#volumes)
- [Bulk import and file upload](#bulk-import-and-file-upload)
- [Strava Integration](#strava-integration)
- [Log files](#log-files)
- [Sponsors](#sponsors)
- [Contributing](#contributing)
- [License](#license)
@@ -186,6 +187,15 @@ Some notes:
To enable Strava integration, ensure your API endpoint is accessible from the internet and follow Strava's [API setup guide](https://developers.strava.com/docs/getting-started/).
## Log files
The table bellow details the log files available. These logs are available in the /app/logs folder:
| Log | Description |
| --- | --- |
| app.log | General log |
| migrations.log | File detailing migrations operations |
## Sponsors
A huge thank you to our sponsors! Your support helps keep this project going.

View File

@@ -6,6 +6,7 @@ from datetime import datetime
from sqlalchemy import func, desc
from sqlalchemy.orm import Session, joinedload
from urllib.parse import unquote
from pydantic import BaseModel
import models
@@ -15,6 +16,36 @@ import activities.schema as activities_schema
logger = logging.getLogger("myLogger")
def get_all_activities(db: Session):
try:
# Get the activities from the database
activities = (
db.query(models.Activity)
.all()
)
# Check if there are activities if not return None
if not activities:
return None
for activity in activities:
activity.start_time = activity.start_time.strftime("%Y-%m-%d %H:%M:%S")
activity.end_time = activity.end_time.strftime("%Y-%m-%d %H:%M:%S")
activity.created_at = activity.created_at.strftime("%Y-%m-%d %H:%M:%S")
# Return the activities
return activities
except Exception as err:
# Log the exception
logger.error(f"Error in get_all_activities: {err}", exc_info=True)
# Raise an HTTPException with a 500 Internal Server Error status code
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal Server Error",
) from err
def get_user_activities(
user_id: int,
db: Session,
@@ -508,7 +539,7 @@ def create_activity(activity: activities_schema.Activity, db: Session):
) from err
def edit_activity(user_id: int, activity: activities_schema.ActivityEdit, db: Session):
def edit_activity(user_id: int, activity: activities_schema.Activity, db: Session):
try:
# Get the activity from the database
db_activity = (
@@ -526,16 +557,16 @@ def edit_activity(user_id: int, activity: activities_schema.ActivityEdit, db: Se
detail="Activity not found",
headers={"WWW-Authenticate": "Bearer"},
)
# Check if 'activity' is a Pydantic model instance and convert it to a dictionary
if isinstance(activity, BaseModel):
activity_data = activity.dict(exclude_unset=True)
else:
activity_data = {key: value for key, value in vars(activity).items() if value is not None}
# Update the activity
if activity.description is not None:
db_activity.description = activity.description
if activity.name is not None:
db_activity.name = activity.name
if activity.activity_type is not None:
db_activity.activity_type = activity.activity_type
if activity.visibility is not None:
db_activity.visibility = activity.visibility
# Iterate over the fields and update the db_activity dynamically
for key, value in activity_data.items():
setattr(db_activity, key, value)
# Commit the transaction
db.commit()

View File

@@ -70,6 +70,17 @@ def upgrade() -> None:
nullable=True,
existing_comment='Average power (watts)')
op.add_column('users', sa.Column('height', sa.Integer(), nullable=True, comment='User height in centimeters'))
op.create_table('migrations',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=250), nullable=False, comment='Migration name'),
sa.Column('description', sa.String(length=2500), nullable=False, comment='Migration description'),
sa.Column('executed', sa.Boolean(), nullable=False, comment='Whether the migration was executed or not'),
sa.PrimaryKeyConstraint('id')
)
op.execute("""
INSERT INTO migrations (id, name, description, executed) VALUES
(1, 'v0.5.0', 'Process additional activity fields for existing activities', false);
""")
# ### end Alembic commands ###
@@ -111,4 +122,5 @@ def downgrade() -> None:
op.drop_table('health_targets')
op.drop_index(op.f('ix_health_data_user_id'), table_name='health_data')
op.drop_table('health_data')
op.drop_table('migrations')
# ### end Alembic commands ###

View File

@@ -20,6 +20,7 @@ from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
import strava.utils as strava_utils
import strava.activity_utils as strava_activity_utils
import migrations.utils as migrations_utils
from config import API_VERSION
from database import SessionLocal
@@ -36,6 +37,9 @@ def startup_event():
alembic_cfg.attributes["configure_logger"] = False
command.upgrade(alembic_cfg, "head")
# Migration check
check_migrations()
# Create a scheduler to run background jobs
scheduler.start()
@@ -58,12 +62,25 @@ def shutdown_event():
scheduler.shutdown()
def check_migrations():
logger.info("Checking for migrations not executed")
# Create a new database session
db = SessionLocal()
try:
# Check migrations not executed
migrations_utils.check_migrations_not_executed(db)
finally:
# Ensure the session is closed after use
db.close()
logger.info("Migration check completed")
def refresh_strava_tokens_job():
# Create a new database session
db = SessionLocal()
try:
# Refresh Strava tokens
strava_utils.refresh_strava_tokens(db=db)
strava_utils.refresh_strava_tokens(db)
finally:
# Ensure the session is closed after use
db.close()
@@ -140,9 +157,7 @@ origins = [
"http://localhost",
"http://localhost:8080",
"http://localhost:5173",
os.environ.get("FRONTEND_PROTOCOL")
+ "://"
+ os.environ.get("FRONTEND_HOST"),
os.environ.get("FRONTEND_PROTOCOL") + "://" + os.environ.get("FRONTEND_HOST"),
]
app.add_middleware(

View File

View File

@@ -0,0 +1,71 @@
import logging
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
import models
# Define a loggger created on main.py
logger = logging.getLogger("myLogger")
def get_migrations_not_executed(
db: Session
):
try:
# Get the migrations from the database
db_migrations = (
db.query(models.Migration)
.filter(models.Migration.executed == False)
.all()
)
# Check if there are not migrations if not return None
if not db_migrations:
return None
# Return the migrations
return db_migrations
except Exception as err:
# Log the exception
logger.error(f"Error in get_migrations_not_executed: {err}", exc_info=True)
# Raise an HTTPException with a 500 Internal Server Error status code
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal Server Error",
) from err
def set_migration_as_executed(migration_id: int, db: Session):
try:
# Get the migration from the database
db_migration = (
db.query(models.Migration)
.filter(models.Migration.id == migration_id)
.first()
)
if db_migration is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Migration not found",
headers={"WWW-Authenticate": "Bearer"},
)
# Update the migration
db_migration.executed = True
# Commit the transaction
db.commit()
except Exception as err:
# Rollback the transaction
db.rollback()
# Log the exception
logger.error(f"Error in set_migration_as_executed: {err}", exc_info=True)
# Raise an HTTPException with a 500 Internal Server Error status code
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal Server Error",
) from err

View File

@@ -0,0 +1,164 @@
import logging
from enum import Enum
from datetime import datetime
from sqlalchemy.orm import Session
import activities.crud as activities_crud
import activities.utils as activities_utils
import activity_streams.crud as activity_streams_crud
import migrations.crud as migrations_crud
# Create loggger
logger = logging.getLogger("migration_logger")
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler("logs/migrations.log")
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
class StreamType(Enum):
HEART_RATE = 1
POWER = 2
CADENCE = 3
ELEVATION = 4
SPEED = 5
PACE = 6
LATLONG = 7
def check_migrations_not_executed(db: Session):
migrations_not_executed = migrations_crud.get_migrations_not_executed(db)
for migration in migrations_not_executed:
# Log the migration not executed
logger.info(
f"Migration not executed: {migration.name} - Migration will be executed"
)
if migration.id == 1:
# Execute the migration
process_migration_1(db)
def process_migration_1(db: Session):
logger.info("Started migration 1")
activities_processed_with_no_errors = True
try:
activities = activities_crud.get_all_activities(db)
except Exception as e:
logger.error(f"Error fetching activities: {e}")
return
for activity in activities:
try:
# Ensure start_time and end_time are datetime objects
if isinstance(activity.start_time, str):
activity.start_time = datetime.strptime(activity.start_time, "%Y-%m-%d %H:%M:%S")
if isinstance(activity.end_time, str):
activity.end_time = datetime.strptime(activity.end_time, "%Y-%m-%d %H:%M:%S")
# Initialize additional fields
metrics = {
"avg_hr": None,
"max_hr": None,
"avg_power": None,
"max_power": None,
"np": None,
"avg_cadence": None,
"max_cadence": None,
"avg_speed": None,
"max_speed": None,
}
# Get activity streams
try:
activity_streams = activity_streams_crud.get_activity_streams(
activity.id, db
)
except Exception as e:
logger.warning(
f"Failed to fetch streams for activity {activity.id}: {e}"
)
activities_processed_with_no_errors = False
continue
# Map stream processing functions
stream_processing = {
StreamType.HEART_RATE: ("avg_hr", "max_hr", "hr"),
StreamType.POWER: ("avg_power", "max_power", "power", "np"),
StreamType.CADENCE: ("avg_cadence", "max_cadence", "cad"),
StreamType.ELEVATION: None,
StreamType.SPEED: ("avg_speed", "max_speed", "vel"),
StreamType.PACE: None,
StreamType.LATLONG: None,
}
for stream in activity_streams:
stream_type = StreamType(stream.stream_type)
if (
stream_type in stream_processing
and stream_processing[stream_type] is not None
):
attr_avg, attr_max, stream_key = stream_processing[stream_type][:3]
metrics[attr_avg], metrics[attr_max] = (
activities_utils.calculate_avg_and_max(
stream.stream_waypoints, stream_key
)
)
# Special handling for normalized power
if stream_type == StreamType.POWER:
metrics["np"] = activities_utils.calculate_np(
stream.stream_waypoints
)
# Calculate elapsed time once
elapsed_time_seconds = (
activity.end_time - activity.start_time
).total_seconds()
# Set fields on the activity object
activity.total_elapsed_time = elapsed_time_seconds
activity.total_timer_time = elapsed_time_seconds
activity.max_speed = metrics["max_speed"]
activity.max_power = metrics["max_power"]
activity.normalized_power = metrics["np"]
activity.average_hr = metrics["avg_hr"]
activity.max_hr = metrics["max_hr"]
activity.average_cad = metrics["avg_cadence"]
activity.max_cad = metrics["max_cadence"]
# Update the activity in the database
activities_crud.edit_activity(activity.user_id, activity, db)
logger.info(f"Processed activity: {activity.id} - {activity.name}")
except Exception as e:
print(
f"Failed to process activity {activity.id}. Please check migrations log for more details."
)
activities_processed_with_no_errors = False
logger.error(f"Failed to process activity {activity.id}: {e}")
# Mark migration as executed
if activities_processed_with_no_errors:
try:
migrations_crud.set_migration_as_executed(1, db)
except Exception as e:
logger.error(f"Failed to set migration as executed: {e}")
return
else:
logger.error(
"Migration 1 failed to process all activities. Will try again later."
)
logger.info("Finished migration 1")

View File

@@ -14,6 +14,20 @@ from sqlalchemy.dialects.mysql import JSON
from database import Base
class Migration(Base):
__tablename__ = "migrations"
id = Column(Integer, primary_key=True)
name = Column(String(length=250), nullable=False, comment="Migration name")
description = Column(String(length=2500), nullable=False, comment="Migration description")
executed = Column(
Boolean,
nullable=False,
default=False,
comment="Whether the migration was executed or not",
)
# Data model for followers table using SQLAlchemy's ORM
class Follower(Base):
__tablename__ = "followers"