Removed some unused dependencies (Ruff). Initial code formatting using Black Formatter

This commit is contained in:
João Silva
2023-11-13 14:10:23 +00:00
parent 8863eeba8a
commit 4f684bfd15
8 changed files with 837 additions and 403 deletions

View File

@@ -2,7 +2,7 @@
import os
from dotenv import load_dotenv
load_dotenv('config/.env')
load_dotenv("config/.env")
DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")

View File

@@ -1,14 +1,12 @@
import os
import logging
from fastapi import APIRouter, Depends, HTTPException, Response, File, UploadFile, Request
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from typing import List, Optional
from sqlalchemy import func, DECIMAL, DateTime, desc
from sqlalchemy import func, desc
from db.db import get_db_session, Activity
from jose import jwt, JWTError
from dotenv import load_dotenv
import mysql.connector.errors
from urllib.parse import unquote
from pydantic import BaseModel
from datetime import datetime, timedelta
@@ -17,75 +15,95 @@ router = APIRouter()
logger = logging.getLogger("myLogger")
# Load the environment variables from config/.env
load_dotenv('config/.env')
load_dotenv("config/.env")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@router.get("/activities/all", response_model=List[dict])
async def read_activities_all(token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
# Query the activities records using SQLAlchemy
activity_records = db_session.query(Activity).order_by(desc(Activity.start_time)).all()
activity_records = (
db_session.query(Activity).order_by(desc(Activity.start_time)).all()
)
# Convert the SQLAlchemy objects to dictionaries
results = [activity.__dict__ for activity in activity_records]
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@router.get("/activities/useractivities", response_model=List[dict])
async def read_activities_useractivities(token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the activities records using SQLAlchemy
activity_records = db_session.query(Activity).filter(Activity.user_id == user_id).order_by(desc(Activity.start_time)).all()
activity_records = (
db_session.query(Activity)
.filter(Activity.user_id == user_id)
.order_by(desc(Activity.start_time))
.all()
)
# Convert the SQLAlchemy objects to dictionaries
results = [activity.__dict__ for activity in activity_records]
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@router.get("/activities/useractivities/{user_id}/thisweek/distances")
async def read_activities_useractivities_thisweek_distances(user_id = int, token: str = Depends(oauth2_scheme)):
async def read_activities_useractivities_thisweek_distances(
user_id=int, token: str = Depends(oauth2_scheme)
):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
#payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
#user_id = payload.get("id")
# payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
# user_id = payload.get("id")
# Calculate the start of the current week
today = datetime.utcnow().date()
start_of_week = today - timedelta(days=today.weekday()) # Monday is the first day of the week, which is denoted by 0
start_of_week = today - timedelta(
days=today.weekday()
) # Monday is the first day of the week, which is denoted by 0
end_of_week = start_of_week + timedelta(days=7)
# Query the activities records for the current week
activity_records = db_session.query(Activity)\
activity_records = (
db_session.query(Activity)
.filter(
Activity.user_id == user_id,
func.date(Activity.start_time) >= start_of_week,
func.date(Activity.start_time) < end_of_week
)\
.order_by(desc(Activity.start_time))\
func.date(Activity.start_time) < end_of_week,
)
.order_by(desc(Activity.start_time))
.all()
)
# Initialize distance variables
run = bike = swim = 0
@@ -100,89 +118,107 @@ async def read_activities_useractivities_thisweek_distances(user_id = int, token
swim += activity.distance
# Prepare the result as JSON
results = {
"run": run,
"bike": bike,
"swim": swim
}
results = {"run": run, "bike": bike, "swim": swim}
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
raise HTTPException(status_code=500, detail="Internal Server Error")
return results
@router.get("/activities/gear/{gearID}", response_model=List[dict])
async def read_activities_gearactivities(gearID = int, token: str = Depends(oauth2_scheme)):
async def read_activities_gearactivities(
gearID=int, token: str = Depends(oauth2_scheme)
):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the activities records using SQLAlchemy
activity_records = db_session.query(Activity).filter(Activity.user_id == user_id, Activity.gear_id == gearID).order_by(desc(Activity.start_time)).all()
activity_records = (
db_session.query(Activity)
.filter(Activity.user_id == user_id, Activity.gear_id == gearID)
.order_by(desc(Activity.start_time))
.all()
)
# Convert the SQLAlchemy objects to dictionaries
results = [activity.__dict__ for activity in activity_records]
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@router.get("/activities/all/number")
async def read_activities_all_number(token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
# Query the number of activities records for the user using SQLAlchemy
activities_count = db_session.query(func.count(Activity.id)).scalar()
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return {0: activities_count}
@router.get("/activities/useractivities/number")
async def read_activities_useractivities_number(token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the number of activities records for the user using SQLAlchemy
activities_count = db_session.query(func.count(Activity.id)).filter(Activity.user_id == user_id).scalar()
activities_count = (
db_session.query(func.count(Activity.id))
.filter(Activity.user_id == user_id)
.scalar()
)
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return {0: activities_count}
@router.get("/activities/all/pagenumber/{pageNumber}/numRecords/{numRecords}", response_model=List[dict])
@router.get(
"/activities/all/pagenumber/{pageNumber}/numRecords/{numRecords}",
response_model=List[dict],
)
async def read_activities_all_pagination(
pageNumber: int,
numRecords: int,
token: str = Depends(oauth2_scheme)
pageNumber: int, numRecords: int, token: str = Depends(oauth2_scheme)
):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
# Use SQLAlchemy to query the gear records with pagination
activity_records = (
db_session.query(Activity)
@@ -197,22 +233,27 @@ async def read_activities_all_pagination(
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@router.get("/activities/useractivities/pagenumber/{pageNumber}/numRecords/{numRecords}", response_model=List[dict])
@router.get(
"/activities/useractivities/pagenumber/{pageNumber}/numRecords/{numRecords}",
response_model=List[dict],
)
async def read_activities_useractivities_pagination(
pageNumber: int,
numRecords: int,
token: str = Depends(oauth2_scheme)
pageNumber: int, numRecords: int, token: str = Depends(oauth2_scheme)
):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Use SQLAlchemy to query the gear records with pagination
@@ -230,19 +271,23 @@ async def read_activities_useractivities_pagination(
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
# Get gear from id
@router.get("/activities/{id}/activityfromid", response_model=List[dict])
async def read_activities_activityFromId(id: int, token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Use SQLAlchemy to query the gear record by ID
@@ -260,20 +305,26 @@ async def read_activities_activityFromId(id: int, token: str = Depends(oauth2_sc
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@router.put("/activities/{activity_id}/addgear/{gear_id}")
async def activity_add_gear(activity_id: int, gear_id: int, token: str = Depends(oauth2_scheme)):
async def activity_add_gear(
activity_id: int, gear_id: int, token: str = Depends(oauth2_scheme)
):
from . import sessionController
try:
sessionController.validate_token(token)
# Use SQLAlchemy to query and delete the gear record
with get_db_session() as db_session:
activity_record = db_session.query(Activity).filter(Activity.id == activity_id).first()
activity_record = (
db_session.query(Activity).filter(Activity.id == activity_id).first()
)
if activity_record:
activity_record.gear_id = gear_id
@@ -287,10 +338,12 @@ async def activity_add_gear(activity_id: int, gear_id: int, token: str = Depends
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Exception as err:
print(err)
raise HTTPException(status_code=500, detail="Failed to add gear to activity")
return {"message": f"Gear added to {activity_id}"}
class CreateActivityRequest(BaseModel):
distance: int
name: str
@@ -308,25 +361,28 @@ class CreateActivityRequest(BaseModel):
averagePower: int
strava_id: Optional[int]
@router.post("/activities/create")
async def create_activity(
activity_data: CreateActivityRequest,
token: str = Depends(oauth2_scheme)
activity_data: CreateActivityRequest, token: str = Depends(oauth2_scheme)
):
from . import sessionController
try:
# Validate the user's token
sessionController.validate_token(token)
# get user_id
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Convert the 'starttime' string to a datetime
#starttime = datetime.strptime(activity_data.starttime, "%Y-%m-%dT%H:%M:%SZ")
# starttime = datetime.strptime(activity_data.starttime, "%Y-%m-%dT%H:%M:%SZ")
starttime = parse_timestamp(activity_data.starttime)
# Convert the 'endtime' string to a datetime
#endtime = datetime.strptime(activity_data.endtime, "%Y-%m-%dT%H:%M:%SZ")
# endtime = datetime.strptime(activity_data.endtime, "%Y-%m-%dT%H:%M:%SZ")
endtime = parse_timestamp(activity_data.endtime)
auxType = 10 # Default value
@@ -341,10 +397,10 @@ async def create_activity(
"VirtualRide": 7,
"virtual_ride": 7,
"swimming": 8,
"open_water_swimming": 8
"open_water_swimming": 8,
}
auxType = type_mapping.get(activity_data.type, 10)
# Create a new Activity record
activity = Activity(
user_id=user_id,
@@ -363,16 +419,18 @@ async def create_activity(
pace=activity_data.pace,
average_speed=activity_data.averageSpeed,
average_power=activity_data.averagePower,
strava_activity_id=activity.strava_id
strava_activity_id=activity_data.strava_id,
)
# Store the Activity record in the database
with get_db_session() as db_session:
db_session.add(activity)
db_session.commit()
db_session.refresh(activity) # This will ensure that the activity object is updated with the ID from the database
#return {"message": "Activity stored successfully", "id": activity.id}
db_session.refresh(
activity
) # This will ensure that the activity object is updated with the ID from the database
# return {"message": "Activity stored successfully", "id": activity.id}
return {"message": "Activity stored successfully"}
except JWTError:
@@ -382,7 +440,8 @@ async def create_activity(
logger.error(err)
raise HTTPException(status_code=500, detail="Failed to store activity")
#return {"message": "Activity stored successfully"}
# return {"message": "Activity stored successfully"}
def parse_timestamp(timestamp_string):
try:
@@ -392,25 +451,30 @@ def parse_timestamp(timestamp_string):
# If milliseconds are not present, use a default value of 0
return datetime.strptime(timestamp_string, "%Y-%m-%dT%H:%M:%SZ")
# Define an HTTP PUT route to delete an activity gear
@router.put("/activities/{activity_id}/deletegear")
async def delete_activity_gear(
activity_id: int,
token: str = Depends(oauth2_scheme)
):
async def delete_activity_gear(activity_id: int, token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
# Validate the user's access token using the oauth2_scheme
sessionController.validate_token(token)
with get_db_session() as db_session:
# get user_id
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the database to find the user by their ID
activity = db_session.query(Activity).filter(Activity.id == activity_id,Activity.user_id == user_id).first()
activity = (
db_session.query(Activity)
.filter(Activity.id == activity_id, Activity.user_id == user_id)
.first()
)
# Check if the user with the given ID exists
if not activity:
raise HTTPException(status_code=404, detail="Activity not found")
@@ -431,19 +495,27 @@ async def delete_activity_gear(
# Return a success message
return {"message": f"Activity gear {activity_id} has been deleted"}
@router.delete("/activities/{activity_id}/delete")
async def delete_activity(activity_id: int, token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
# Use SQLAlchemy to query and delete the gear record
with get_db_session() as db_session:
# get user_id
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
activity_record = db_session.query(Activity).filter(Activity.id == activity_id,Activity.user_id == user_id).first()
activity_record = (
db_session.query(Activity)
.filter(Activity.id == activity_id, Activity.user_id == user_id)
.first()
)
if activity_record:
# Check for existing dependencies (uncomment if needed)
@@ -464,6 +536,7 @@ async def delete_activity(activity_id: int, token: str = Depends(oauth2_scheme))
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Exception as err:
print(err)
raise HTTPException(status_code=500, detail="Failed to delete activity")
return {"message": f"Activity {activity_id} has been deleted"}
return {"message": f"Activity {activity_id} has been deleted"}

View File

@@ -1,13 +1,12 @@
import os
import logging
from fastapi import APIRouter, Depends, HTTPException, Form, Response
from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi.security import OAuth2PasswordBearer
from typing import List, Optional
from sqlalchemy import func
from db.db import get_db_session, Gear
from jose import jwt, JWTError
from dotenv import load_dotenv
import mysql.connector.errors
from urllib.parse import unquote
from pydantic import BaseModel
@@ -16,137 +15,177 @@ router = APIRouter()
logger = logging.getLogger("myLogger")
# Load the environment variables from config/.env
load_dotenv('config/.env')
load_dotenv("config/.env")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@router.get("/gear/all", response_model=List[dict])
async def read_gear_all(token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the gear records using SQLAlchemy
gear_records = db_session.query(Gear).filter(Gear.user_id == user_id).order_by(Gear.nickname).all()
gear_records = (
db_session.query(Gear)
.filter(Gear.user_id == user_id)
.order_by(Gear.nickname)
.all()
)
# Convert the SQLAlchemy objects to dictionaries
results = [gear.__dict__ for gear in gear_records]
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@router.get("/gear/all/running", response_model=List[dict])
async def read_gear_all_running(token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the gear records using SQLAlchemy
gear_records = db_session.query(Gear).filter(
Gear.gear_type == 2,
Gear.user_id == user_id
).order_by(Gear.nickname).all()
gear_records = (
db_session.query(Gear)
.filter(Gear.gear_type == 2, Gear.user_id == user_id)
.order_by(Gear.nickname)
.all()
)
# Convert the SQLAlchemy objects to dictionaries
results = [gear.__dict__ for gear in gear_records]
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@router.get("/gear/all/cycling", response_model=List[dict])
async def read_gear_all_cycling(token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the gear records using SQLAlchemy
gear_records = db_session.query(Gear).filter(
Gear.gear_type == 1,
Gear.user_id == user_id
).order_by(Gear.nickname).all()
gear_records = (
db_session.query(Gear)
.filter(Gear.gear_type == 1, Gear.user_id == user_id)
.order_by(Gear.nickname)
.all()
)
# Convert the SQLAlchemy objects to dictionaries
results = [gear.__dict__ for gear in gear_records]
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@router.get("/gear/all/swimming", response_model=List[dict])
async def read_gear_all_swimming(token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the gear records using SQLAlchemy
gear_records = db_session.query(Gear).filter(
Gear.gear_type == 3,
Gear.user_id == user_id
).order_by(Gear.nickname).all()
gear_records = (
db_session.query(Gear)
.filter(Gear.gear_type == 3, Gear.user_id == user_id)
.order_by(Gear.nickname)
.all()
)
# Convert the SQLAlchemy objects to dictionaries
results = [gear.__dict__ for gear in gear_records]
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@router.get("/gear/number")
async def read_gear_number(token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the number of gear records for the user using SQLAlchemy
gear_count = db_session.query(func.count(Gear.id)).filter(Gear.user_id == user_id).scalar()
gear_count = (
db_session.query(func.count(Gear.id))
.filter(Gear.user_id == user_id)
.scalar()
)
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return {0: gear_count}
@router.get("/gear/all/pagenumber/{pageNumber}/numRecords/{numRecords}", response_model=List[dict])
@router.get(
"/gear/all/pagenumber/{pageNumber}/numRecords/{numRecords}",
response_model=List[dict],
)
async def read_gear_all_pagination(
pageNumber: int,
numRecords: int,
token: str = Depends(oauth2_scheme)
pageNumber: int, numRecords: int, token: str = Depends(oauth2_scheme)
):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Use SQLAlchemy to query the gear records with pagination
@@ -164,19 +203,24 @@ async def read_gear_all_pagination(
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@router.get("/gear/{nickname}/gearfromnickname", response_model=List[dict])
async def read_gear_gearFromNickname(nickname: str, token: str = Depends(oauth2_scheme)):
async def read_gear_gearFromNickname(
nickname: str, token: str = Depends(oauth2_scheme)
):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Define a search term
@@ -185,7 +229,9 @@ async def read_gear_gearFromNickname(nickname: str, token: str = Depends(oauth2_
# Use SQLAlchemy to query the gear records by nickname
gear_records = (
db_session.query(Gear)
.filter(Gear.nickname.like(f"%{partial_nickname}%"), Gear.user_id == user_id)
.filter(
Gear.nickname.like(f"%{partial_nickname}%"), Gear.user_id == user_id
)
.all()
)
@@ -194,7 +240,7 @@ async def read_gear_gearFromNickname(nickname: str, token: str = Depends(oauth2_
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
@@ -204,10 +250,13 @@ async def read_gear_gearFromNickname(nickname: str, token: str = Depends(oauth2_
@router.get("/gear/{id}/gearfromid", response_model=List[dict])
async def read_gear_gearFromId(id: int, token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Use SQLAlchemy to query the gear record by ID
@@ -225,11 +274,12 @@ async def read_gear_gearFromId(id: int, token: str = Depends(oauth2_scheme)):
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
return results
class CreateGearRequest(BaseModel):
brand: Optional[str]
model: Optional[str]
@@ -237,16 +287,17 @@ class CreateGearRequest(BaseModel):
gear_type: int
date: str
@router.post("/gear/create")
async def create_gear(
gear: CreateGearRequest,
token: str = Depends(oauth2_scheme)
):
async def create_gear(gear: CreateGearRequest, token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
sessionController.validate_token(token)
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
with get_db_session() as db_session:
@@ -267,12 +318,13 @@ async def create_gear(
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
raise HTTPException(status_code=500, detail="Failed to create gear")
return {"message": "Gear added successfully"}
class EditGearRequest(BaseModel):
brand: Optional[str]
model: Optional[str]
@@ -281,13 +333,13 @@ class EditGearRequest(BaseModel):
date: str
is_active: int
@router.put("/gear/{gear_id}/edit")
async def edit_gear(
gear_id: int,
gear: EditGearRequest,
token: str = Depends(oauth2_scheme)
gear_id: int, gear: EditGearRequest, token: str = Depends(oauth2_scheme)
):
from . import sessionController
try:
sessionController.validate_token(token)
@@ -319,9 +371,13 @@ async def edit_gear(
return {"message": "Gear edited successfully"}
@router.delete("/gear/{gear_id}/delete")
async def delete_gear(gear_id: int, response: Response, token: str = Depends(oauth2_scheme)):
async def delete_gear(
gear_id: int, response: Response, token: str = Depends(oauth2_scheme)
):
from . import sessionController
try:
sessionController.validate_token(token)
@@ -348,6 +404,7 @@ async def delete_gear(gear_id: int, response: Response, token: str = Depends(oau
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Exception as err:
print(err)
raise HTTPException(status_code=500, detail="Failed to delete gear")
return {"message": f"Gear {gear_id} has been deleted"}

View File

@@ -1,10 +1,14 @@
import os
import logging
from fastapi import APIRouter, Depends, HTTPException, Form
from fastapi import APIRouter, Depends, HTTPException
from datetime import datetime, timedelta
from jose import jwt, JWTError
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from db.db import get_db_session, User, AccessToken # Import your SQLAlchemy session management from db.db and models
from fastapi.security import OAuth2PasswordBearer
from db.db import (
get_db_session,
User,
AccessToken,
) # Import your SQLAlchemy session management from db.db and models
from controllers.userController import UserResponse
from pydantic import BaseModel
@@ -14,22 +18,42 @@ logger = logging.getLogger("myLogger")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def authenticate_user(username: str, password: str, neverExpires: bool, db_session = Depends(get_db_session)):
async def authenticate_user(
username: str, password: str, neverExpires: bool, db_session=Depends(get_db_session)
):
try:
with get_db_session() as db_session:
# Use SQLAlchemy ORM to query the database
user = db_session.query(User).filter(User.username == username, User.password == password).first()
user = (
db_session.query(User)
.filter(User.username == username, User.password == password)
.first()
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
raise HTTPException(
status_code=400, detail="Incorrect username or password"
)
# Check if there is an existing access token for the user
access_token = db_session.query(AccessToken).filter(AccessToken.user_id == user.id).first()
access_token = (
db_session.query(AccessToken)
.filter(AccessToken.user_id == user.id)
.first()
)
if access_token:
return access_token.token
# If there is no existing access token, create a new one
access_token_expires = timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")))
access_token = await create_access_token(data={"id": user.id, "access_type": user.access_type}, never_expire=neverExpires, expires_delta=access_token_expires, db_session=db_session)
access_token_expires = timedelta(
minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))
)
access_token = await create_access_token(
data={"id": user.id, "access_type": user.access_type},
never_expire=neverExpires,
expires_delta=access_token_expires,
db_session=db_session,
)
return access_token
@@ -37,7 +61,13 @@ async def authenticate_user(username: str, password: str, neverExpires: bool, db
logger.error(e)
return False
async def create_access_token(data: dict, never_expire: bool, expires_delta: timedelta = None, db_session = Depends(get_db_session)):
async def create_access_token(
data: dict,
never_expire: bool,
expires_delta: timedelta = None,
db_session=Depends(get_db_session),
):
to_encode = data.copy()
if never_expire:
expire = datetime.utcnow() + timedelta(days=90)
@@ -45,44 +75,63 @@ async def create_access_token(data: dict, never_expire: bool, expires_delta: tim
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, os.getenv("SECRET_KEY"), algorithm=os.getenv("ALGORITHM"))
encoded_jwt = jwt.encode(
to_encode, os.getenv("SECRET_KEY"), algorithm=os.getenv("ALGORITHM")
)
# Insert the access token into the database using SQLAlchemy
access_token = AccessToken(token=encoded_jwt, user_id=data['id'], created_at=datetime.utcnow(), expires_at=expire)
access_token = AccessToken(
token=encoded_jwt,
user_id=data["id"],
created_at=datetime.utcnow(),
expires_at=expire,
)
with get_db_session() as db_session:
db_session.add(access_token)
db_session.commit()
return encoded_jwt
def remove_expired_tokens():
try:
# Calculate the expiration time
expiration_time = datetime.utcnow() - timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")))
expiration_time = datetime.utcnow() - timedelta(
minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))
)
# Delete expired access tokens using SQLAlchemy ORM
with get_db_session() as db_session:
rows_deleted = db_session.query(AccessToken).filter(AccessToken.created_at < expiration_time).delete()
rows_deleted = (
db_session.query(AccessToken)
.filter(AccessToken.created_at < expiration_time)
.delete()
)
db_session.commit()
logger.info(f"{rows_deleted} access tokens deleted from the database")
except Exception as e:
logger.error(e)
def get_user_data(token: str):
try:
validate_token(token)
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
raise HTTPException(
status_code=401, detail="Invalid authentication credentials"
)
with get_db_session() as db_session:
# Retrieve the user details from the database using the user ID
user = db_session.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
@@ -112,18 +161,22 @@ def get_user_data(token: str):
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
def validate_token(token: str):
try:
decoded_token = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
decoded_token = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = decoded_token.get("id")
with get_db_session() as db_session:
access_token = db_session.query(AccessToken).filter(
AccessToken.user_id == user_id,
AccessToken.token == token
).first()
access_token = (
db_session.query(AccessToken)
.filter(AccessToken.user_id == user_id, AccessToken.token == token)
.first()
)
if access_token:
expiration_datetime = datetime.fromtimestamp(decoded_token['exp'])
expiration_datetime = datetime.fromtimestamp(decoded_token["exp"])
current_time = datetime.utcnow()
if current_time > expiration_datetime:
raise JWTError("Token expired")
@@ -132,43 +185,50 @@ def validate_token(token: str):
else:
raise JWTError("Token expired")
#if 'exp' not in decoded_token:
# if 'exp' not in decoded_token:
# return {"message": "Token is valid"}
#else:
# else:
except JWTError:
raise JWTError("Invalid token")
def validate_admin_access(token: str):
try:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_access_type = payload.get("access_type")
if user_access_type != 2:
raise HTTPException(status_code=401, detail="Unauthorized")
except JWTError:
raise JWTError("Invalid token")
class CreateTokenRequest(BaseModel):
username: str
password: str
neverExpires: bool
@router.post("/token")
async def login_for_access_token(
token: CreateTokenRequest
):
access_token = await authenticate_user(token.username, token.password, token.neverExpires)
async def login_for_access_token(token: CreateTokenRequest):
access_token = await authenticate_user(
token.username, token.password, token.neverExpires
)
if not access_token:
raise HTTPException(status_code=400, detail="Unable to retrieve access token")
return {"access_token": access_token, "token_type": "bearer"}
@router.delete("/logout/{user_id}")
async def logout(user_id: int, token: str = Depends(oauth2_scheme)):
try:
with get_db_session() as db_session:
access_token = db_session.query(AccessToken).filter(
AccessToken.user_id == user_id,
AccessToken.token == token
).first()
access_token = (
db_session.query(AccessToken)
.filter(AccessToken.user_id == user_id, AccessToken.token == token)
.first()
)
if access_token:
db_session.delete(access_token)
db_session.commit()
@@ -179,6 +239,7 @@ async def logout(user_id: int, token: str = Depends(oauth2_scheme)):
logger.error(e)
raise HTTPException(status_code=500, detail="Failed to invalidate access token")
@router.get("/validate_token")
async def check_validate_token(token: str = Depends(oauth2_scheme)):
try:
@@ -186,6 +247,7 @@ async def check_validate_token(token: str = Depends(oauth2_scheme)):
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
@router.get("/users/me", response_model=UserResponse)
async def read_users_me(token: str = Depends(oauth2_scheme)):
return get_user_data(token)
return get_user_data(token)

View File

@@ -11,43 +11,47 @@ from stravalib.client import Client
from pint import Quantity
import logging
import requests
import asyncio
router = APIRouter()
logger = logging.getLogger("myLogger")
# Load the environment variables from config/.env
load_dotenv('config/.env')
load_dotenv("config/.env")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Strava route to link user Strava account
@router.get("/strava/strava-callback")
async def strava_callback(state: str, code: str):
token_url = 'https://www.strava.com/oauth/token'
token_url = "https://www.strava.com/oauth/token"
payload = {
'client_id': os.getenv("STRAVA_CLIENT_ID"),
'client_secret': os.getenv("STRAVA_CLIENT_SECRET"),
'code': code,
'grant_type': 'authorization_code'
"client_id": os.getenv("STRAVA_CLIENT_ID"),
"client_secret": os.getenv("STRAVA_CLIENT_SECRET"),
"code": code,
"grant_type": "authorization_code",
}
try:
response = requests.post(token_url, data=payload)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Error retrieving tokens from Strava.")
raise HTTPException(
status_code=response.status_code,
detail="Error retrieving tokens from Strava.",
)
tokens = response.json()
with get_db_session() as db_session:
# Query the activities records using SQLAlchemy
db_user = db_session.query(User).filter(User.strava_state == state).first()
if db_user:
db_user.strava_token = tokens['access_token']
db_user.strava_refresh_token = tokens['refresh_token']
db_user.strava_token_expires_at = datetime.fromtimestamp(tokens['expires_at'])
db_user.strava_token = tokens["access_token"]
db_user.strava_refresh_token = tokens["refresh_token"]
db_user.strava_token_expires_at = datetime.fromtimestamp(
tokens["expires_at"]
)
db_session.commit() # Commit the changes to the database
# Redirect to the main page or any other desired page after processing
@@ -58,13 +62,14 @@ async def strava_callback(state: str, code: str):
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
print(err)
# Strava logic to refresh user Strava account refresh account
def refresh_strava_token():
# Strava token refresh endpoint
token_url = 'https://www.strava.com/oauth/token'
token_url = "https://www.strava.com/oauth/token"
try:
with get_db_session() as db_session:
@@ -72,7 +77,7 @@ def refresh_strava_token():
users = db_session.query(User).all()
for user in users:
#expires_at = user.strava_token_expires_at
# expires_at = user.strava_token_expires_at
if user.strava_token_expires_at is not None:
refresh_time = user.strava_token_expires_at - timedelta(minutes=60)
@@ -93,43 +98,56 @@ def refresh_strava_token():
tokens = response.json()
# Update the user in the database
db_user = db_session.query(User).filter(User.id == user.id).first()
db_user = (
db_session.query(User)
.filter(User.id == user.id)
.first()
)
if db_user:
db_user.strava_token = tokens['access_token']
db_user.strava_refresh_token = tokens['refresh_token']
db_user.strava_token_expires_at = datetime.fromtimestamp(tokens['expires_at'])
db_user.strava_token = tokens["access_token"]
db_user.strava_refresh_token = tokens["refresh_token"]
db_user.strava_token_expires_at = (
datetime.fromtimestamp(tokens["expires_at"])
)
db_session.commit() # Commit the changes to the database
logger.info(f"Token refreshed successfully for user {user.id}.")
logger.info(
f"Token refreshed successfully for user {user.id}."
)
else:
logger.error("User not found in the database.")
except requests.exceptions.RequestException as req_err:
logger.error(f"Error refreshing token for user {user.id}: {req_err}")
logger.error(
f"Error refreshing token for user {user.id}: {req_err}"
)
else:
logger.info(f"Token not refreshed for user {user.id}. Will not expire in less than 60min")
logger.info(
f"Token not refreshed for user {user.id}. Will not expire in less than 60min"
)
else:
logger.info(f"User {user.id} does not have strava linked")
except Error as db_err:
except NameError as db_err:
logger.error(f"Database error: {db_err}")
# Define an HTTP PUT route set strava unique state for link logic
@router.put("/strava/set-user-unique-state/{state}")
async def strava_set_user_unique_state(
state: str,
token: str = Depends(oauth2_scheme)
):
async def strava_set_user_unique_state(state: str, token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
# Validate the user's access token using the oauth2_scheme
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the database to find the user by their ID
user = db_session.query(User).filter(User.id == user_id).first()
# Check if the user with the given ID exists
if not user:
raise HTTPException(status_code=404, detail="User not found")
@@ -145,28 +163,32 @@ async def strava_set_user_unique_state(
except Exception as err:
# Handle any other unexpected exceptions
print(err)
raise HTTPException(status_code=500, detail="Failed to update user strava state")
raise HTTPException(
status_code=500, detail="Failed to update user strava state"
)
# Return a success message
return {"message": f"Strava state for user {user_id} has been updated"}
# Define an HTTP PUT route set strava unique state for link logic
@router.put("/strava/unset-user-unique-state")
async def strava_unset_user_unique_state(
token: str = Depends(oauth2_scheme)
):
async def strava_unset_user_unique_state(token: str = Depends(oauth2_scheme)):
from . import sessionController
try:
# Validate the user's access token using the oauth2_scheme
sessionController.validate_token(token)
with get_db_session() as db_session:
payload = jwt.decode(token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")])
payload = jwt.decode(
token, os.getenv("SECRET_KEY"), algorithms=[os.getenv("ALGORITHM")]
)
user_id = payload.get("id")
# Query the database to find the user by their ID
user = db_session.query(User).filter(User.id == user_id).first()
# Check if the user with the given ID exists
if not user:
raise HTTPException(status_code=404, detail="User not found")
@@ -182,15 +204,18 @@ async def strava_unset_user_unique_state(
except Exception as err:
# Handle any other unexpected exceptions
print(err)
raise HTTPException(status_code=500, detail="Failed to update user strava state")
raise HTTPException(
status_code=500, detail="Failed to update user strava state"
)
# Return a success message
return {"message": f"Strava state for user {user_id} has been updated"}
# Strava logic to refresh user Strava account refresh account
def get_strava_activities(start_date: datetime):
# Strava token refresh endpoint
#activities_url = 'https://www.strava.com/api/v3/athlete/activities'
# activities_url = 'https://www.strava.com/api/v3/athlete/activities'
try:
with get_db_session() as db_session:
@@ -203,35 +228,33 @@ def get_strava_activities(start_date: datetime):
# start_date = (datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%SZ')
# Set parameters for the API request
#params = {
# params = {
# 'access_token': user.strava_token,
# 'after': start_date,
#}
# }
#try:
# Make a GET request to retrieve activities
# try:
# Make a GET request to retrieve activities
# response = requests.get(activities_url, params=params)
# response.raise_for_status() # Raise an error for bad responses
#store_strava_activities_stravaLib(response.json(), user.id, user.strava_token)
# store_strava_activities_stravaLib(response.json(), user.id, user.strava_token)
store_strava_activities_stravaLib(user.id, user.strava_token)
#except requests.exceptions.RequestException as req_err:
# Handle request errors
# except requests.exceptions.RequestException as req_err:
# Handle request errors
# logger.error(f"Error retrieving activities for user {user.id}: {req_err}")
# return None
else:
logger.info(f"User {user.id} does not have strava linked")
except Error as db_err:
except NameError as db_err:
logger.error(f"Database error: {db_err}")
def store_strava_activities_stravaLib(user_id, strava_token):
from . import activitiesController
stravaClient = Client(access_token=strava_token)
start_date = (datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%SZ')
for activity in stravaClient.get_activities(after = start_date):
start_date = (datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
for activity in stravaClient.get_activities(after=start_date):
with get_db_session() as db_session:
# Use SQLAlchemy to query the gear record by ID
activity_record = (
@@ -243,18 +266,22 @@ def store_strava_activities_stravaLib(user_id, strava_token):
if activity_record:
# Skip to the next iteration
continue
start_date_parsed = activity.start_date
# Ensure activity.elapsed_time is a numerical value
elapsed_time_seconds = (
activity.elapsed_time.total_seconds() if isinstance(activity.elapsed_time, timedelta) else activity.elapsed_time
activity.elapsed_time.total_seconds()
if isinstance(activity.elapsed_time, timedelta)
else activity.elapsed_time
)
end_date_parsed = start_date_parsed + timedelta(
seconds=elapsed_time_seconds
)
end_date_parsed = start_date_parsed + timedelta(seconds=elapsed_time_seconds)
latitude = 0
longitude = 0
if hasattr(activity, 'latlng') and activity.latlng is not None:
if hasattr(activity, "latlng") and activity.latlng is not None:
latitude = activity.latlng[0]
longitude = activity.latlng[1]
@@ -281,7 +308,7 @@ def store_strava_activities_stravaLib(user_id, strava_token):
print(f"Error location: {url}")
except Exception as e:
print(f"An error occurred: {e}")
# List to store constructed waypoints
waypoints = []
@@ -291,28 +318,41 @@ def store_strava_activities_stravaLib(user_id, strava_token):
previous_elevation = None
# Get streams for the activity
streams = stravaClient.get_activity_streams(activity.id, types=['latlng', 'altitude', 'time', 'heartrate', 'cadence', 'watts', 'velocity_smooth'])
streams = stravaClient.get_activity_streams(
activity.id,
types=[
"latlng",
"altitude",
"time",
"heartrate",
"cadence",
"watts",
"velocity_smooth",
],
)
# Extract data from streams
latitudes = streams['latlng'].data if 'latlng' in streams else []
longitudes = streams['latlng'].data if 'latlng' in streams else []
elevations = streams['altitude'].data if 'altitude' in streams else []
times = streams['time'].data if 'time' in streams else []
heart_rates = streams['heartrate'].data if 'heartrate' in streams else []
cadences = streams['cadence'].data if 'cadence' in streams else []
powers = streams['watts'].data if 'watts' in streams else []
velocities = streams['velocity_smooth'].data if 'velocity_smooth' in streams else []
latitudes = streams["latlng"].data if "latlng" in streams else []
longitudes = streams["latlng"].data if "latlng" in streams else []
elevations = streams["altitude"].data if "altitude" in streams else []
times = streams["time"].data if "time" in streams else []
heart_rates = streams["heartrate"].data if "heartrate" in streams else []
cadences = streams["cadence"].data if "cadence" in streams else []
powers = streams["watts"].data if "watts" in streams else []
velocities = (
streams["velocity_smooth"].data if "velocity_smooth" in streams else []
)
for i in range(len(latitudes)):
waypoint = {
'lat': latitudes[i] if i < len(latitudes) else None,
'lon': longitudes[i] if i < len(longitudes) else None,
'ele': elevations[i] if i < len(elevations) else None,
'time': times[i] if i < len(times) else None,
'hr': heart_rates[i] if i < len(heart_rates) else None,
'cad': cadences[i] if i < len(cadences) else None,
'power': powers[i] if i < len(powers) else None,
'vel': velocities[i] if i < len(velocities) else None,
"lat": latitudes[i] if i < len(latitudes) else None,
"lon": longitudes[i] if i < len(longitudes) else None,
"ele": elevations[i] if i < len(elevations) else None,
"time": times[i] if i < len(times) else None,
"hr": heart_rates[i] if i < len(heart_rates) else None,
"cad": cadences[i] if i < len(cadences) else None,
"power": powers[i] if i < len(powers) else None,
"vel": velocities[i] if i < len(velocities) else None,
# Add other relevant fields based on your requirements
}
@@ -335,7 +375,11 @@ def store_strava_activities_stravaLib(user_id, strava_token):
average_speed = 0
if activity.average_speed is not None:
average_speed = float(activity.average_speed.magnitude) if isinstance(activity.average_speed, Quantity) else activity.average_speed
average_speed = (
float(activity.average_speed.magnitude)
if isinstance(activity.average_speed, Quantity)
else activity.average_speed
)
average_pace = 1 / average_speed if average_speed != 0 else 0
@@ -359,15 +403,17 @@ def store_strava_activities_stravaLib(user_id, strava_token):
"virtual_ride": 7,
"swimming": 8,
"open_water_swimming": 8,
"Workout": 10
"Workout": 10,
}
auxType = type_mapping.get(activity.workout_type, 10)
# Create a new Activity record
activity = Activity(
user_id=user_id,
name=activity.name,
distance=round(float(activity.distance)) if isinstance(activity.distance, Quantity) else round(activity.distance),
distance=round(float(activity.distance))
if isinstance(activity.distance, Quantity)
else round(activity.distance),
activity_type=auxType,
start_time=start_date_parsed,
end_time=end_date_parsed,
@@ -381,7 +427,7 @@ def store_strava_activities_stravaLib(user_id, strava_token):
pace=average_pace,
average_speed=average_speed,
average_power=average_watts,
strava_activity_id=activity.id
strava_activity_id=activity.id,
)
try:
@@ -389,8 +435,10 @@ def store_strava_activities_stravaLib(user_id, strava_token):
with get_db_session() as db_session:
db_session.add(activity)
db_session.commit()
db_session.refresh(activity) # This will ensure that the activity object is updated with the ID from the database
return {"message": f"Activities retrieved"}
db_session.refresh(
activity
) # This will ensure that the activity object is updated with the ID from the database
return {"message": "Activities retrieved"}
except Exception as err:
print(err)
logger.error(err)
@@ -398,13 +446,14 @@ def store_strava_activities_stravaLib(user_id, strava_token):
def store_strava_activities(strava_activities, user_id, strava_token):
from . import activitiesController
try:
params = {
'access_token': strava_token,
"access_token": strava_token,
}
#start_date = (datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%SZ')
#strava_activities = get_strava_activities(start_date)
# start_date = (datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%SZ')
# strava_activities = get_strava_activities(start_date)
if strava_activities:
for activity in strava_activities:
try:
@@ -419,9 +468,13 @@ def store_strava_activities(strava_activities, user_id, strava_token):
if activity_record:
# Skip to the next iteration
continue
start_date_parsed = activitiesController.parse_timestamp(activity.get("start_date"))
end_date_parsed = start_date_parsed + timedelta(seconds=activity.get("elapsed_time"))
start_date_parsed = activitiesController.parse_timestamp(
activity.get("start_date")
)
end_date_parsed = start_date_parsed + timedelta(
seconds=activity.get("elapsed_time")
)
latitude, longitude = activity.get("start_latlng", [0, 0])
city = None
@@ -449,8 +502,8 @@ def store_strava_activities(strava_activities, user_id, strava_token):
print(f"An error occurred: {e}")
activity_id = activity.get("id")
strava_api_url_streams = f'https://www.strava.com/api/v3/activities/{activity_id}/streams'
strava_api_url_streams = f"https://www.strava.com/api/v3/activities/{activity_id}/streams"
# List to store constructed waypoints
waypoints = []
@@ -468,34 +521,82 @@ def store_strava_activities(strava_activities, user_id, strava_token):
activity_streams = response.json()
# Extract relevant streams data
latlng_stream = activity_streams.get("latlng", {}).get("data", [])
time_stream = activity_streams.get("time", {}).get("data", [])
elevation_stream = activity_streams.get("altitude", {}).get("data", [])
heart_rate_stream = activity_streams.get("heartrate", {}).get("data", [])
cadence_stream = activity_streams.get("cadence", {}).get("data", [])
power_stream = activity_streams.get("watts", {}).get("data", [])
velocity_stream = activity_streams.get("velocity_smooth", {}).get("data", [])
latlng_stream = activity_streams.get("latlng", {}).get(
"data", []
)
time_stream = activity_streams.get("time", {}).get(
"data", []
)
elevation_stream = activity_streams.get("altitude", {}).get(
"data", []
)
heart_rate_stream = activity_streams.get(
"heartrate", {}
).get("data", [])
cadence_stream = activity_streams.get("cadence", {}).get(
"data", []
)
power_stream = activity_streams.get("watts", {}).get(
"data", []
)
velocity_stream = activity_streams.get(
"velocity_smooth", {}
).get("data", [])
# Ensure all streams have the same length (adjust as needed)
stream_length = min(len(latlng_stream), len(time_stream), len(elevation_stream),
len(heart_rate_stream), len(cadence_stream), len(power_stream),
len(velocity_stream))
stream_length = min(
len(latlng_stream),
len(time_stream),
len(elevation_stream),
len(heart_rate_stream),
len(cadence_stream),
len(power_stream),
len(velocity_stream),
)
# Iterate over the streams and construct waypoints
for i in range(stream_length):
latitude, longitude = latlng_stream[i] if i < len(latlng_stream) else (0, 0)
latitude, longitude = (
latlng_stream[i]
if i < len(latlng_stream)
else (0, 0)
)
time = time_stream[i] if i < len(time_stream) else ""
elevation = elevation_stream[i] if i < len(elevation_stream) else 0
heart_rate = heart_rate_stream[i] if i < len(heart_rate_stream) else 0
cadence = cadence_stream[i] if i < len(cadence_stream) else 0
elevation = (
elevation_stream[i]
if i < len(elevation_stream)
else 0
)
heart_rate = (
heart_rate_stream[i]
if i < len(heart_rate_stream)
else 0
)
cadence = (
cadence_stream[i] if i < len(cadence_stream) else 0
)
power = power_stream[i] if i < len(power_stream) else 0
velocity = velocity_stream[i] if i < len(velocity_stream) else 0
elevation_current = elevation_stream[i] if i < len(elevation_stream) else 0
elevation_previous = elevation_stream[i - 1] if i - 1 < len(elevation_stream) else 0
velocity = (
velocity_stream[i]
if i < len(velocity_stream)
else 0
)
elevation_current = (
elevation_stream[i]
if i < len(elevation_stream)
else 0
)
elevation_previous = (
elevation_stream[i - 1]
if i - 1 < len(elevation_stream)
else 0
)
# Calculate the difference in elevation
elevation_difference = elevation_current - elevation_previous
elevation_difference = (
elevation_current - elevation_previous
)
# Update elevation gain and loss based on the difference
if elevation_difference > 0:
@@ -505,14 +606,14 @@ def store_strava_activities(strava_activities, user_id, strava_token):
# Construct the waypoint dictionary
waypoint = {
'lat': latitude,
'lon': longitude,
'ele': elevation,
'time': time,
'hr': heart_rate,
'cad': cadence,
'power': power,
'vel': velocity,
"lat": latitude,
"lon": longitude,
"ele": elevation,
"time": time,
"hr": heart_rate,
"cad": cadence,
"power": power,
"vel": velocity,
# Add other relevant fields based on your requirements
}
@@ -524,10 +625,13 @@ def store_strava_activities(strava_activities, user_id, strava_token):
except Exception as e:
print(f"An error occurred: {e}")
average_pace = 1 / activity.get("average_speed", 0) if activity.get("average_speed", 0) != 0 else 0
average_pace = (
1 / activity.get("average_speed", 0)
if activity.get("average_speed", 0) != 0
else 0
)
#activitiesController.create_activity([activity.get("distance", 0), activity.get("name", "Unnamed activity"), activity.get("type", "Workout"), start_date_parsed, end_date_parsed, city, town, country, waypoints, elevation_gain, elevation_loss, average_pace, activity.get("average_speed", 0), activity.get("average_watts", 0), activity.get("id")],token)
# activitiesController.create_activity([activity.get("distance", 0), activity.get("name", "Unnamed activity"), activity.get("type", "Workout"), start_date_parsed, end_date_parsed, city, town, country, waypoints, elevation_gain, elevation_loss, average_pace, activity.get("average_speed", 0), activity.get("average_watts", 0), activity.get("id")],token)
auxType = 10 # Default value
type_mapping = {
@@ -545,10 +649,10 @@ def store_strava_activities(strava_activities, user_id, strava_token):
"virtual_ride": 7,
"swimming": 8,
"open_water_swimming": 8,
"Workout": 10
"Workout": 10,
}
auxType = type_mapping.get(activity.get("type", "Workout"), 10)
# Create a new Activity record
activity = Activity(
user_id=user_id,
@@ -567,19 +671,21 @@ def store_strava_activities(strava_activities, user_id, strava_token):
pace=average_pace,
average_speed=activity.get("average_speed", 0),
average_power=activity.get("average_watts", 0),
strava_activity_id=activity.get("id")
strava_activity_id=activity.get("id"),
)
# Store the Activity record in the database
with get_db_session() as db_session:
db_session.add(activity)
db_session.commit()
db_session.refresh(activity) # This will ensure that the activity object is updated with the ID from the database
return {"message": f"Activities retrieved"}
db_session.refresh(
activity
) # This will ensure that the activity object is updated with the ID from the database
return {"message": "Activities retrieved"}
except Exception as err:
print(err)
logger.error(err)
else:
logger.info("No Strava activities returned")
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
raise HTTPException(status_code=401, detail="Unauthorized")

View File

@@ -1,13 +1,16 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import List, Optional
from jose import jwt, JWTError
from jose import JWTError
import logging
from datetime import date
from . import sessionController # Import your session controller module
from db.db import get_db_session, User, Gear # Import your SQLAlchemy User modelfrom db.db import get_db_session, User, AccessToken
from db.db import (
get_db_session,
User,
Gear,
) # Import your SQLAlchemy User modelfrom db.db import get_db_session, User, AccessToken
from urllib.parse import unquote
# Create an instance of an APIRouter
@@ -15,6 +18,7 @@ router = APIRouter()
logger = logging.getLogger("myLogger")
class UserResponse(BaseModel):
id: int
name: str
@@ -30,8 +34,10 @@ class UserResponse(BaseModel):
is_active: int
is_strava_linked: Optional[int]
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Define an HTTP GET route to retrieve all users
@router.get("/users/all", response_model=list[dict])
async def read_users_all(token: str = Depends(oauth2_scheme)):
@@ -43,8 +49,8 @@ async def read_users_all(token: str = Depends(oauth2_scheme)):
sessionController.validate_admin_access(token)
# Create a database session using the get_db_session context manager
with get_db_session() as db_session:
# Query all users from the database
with get_db_session() as db_session:
# Query all users from the database
users = db_session.query(User).all()
# Convert the SQLAlchemy User objects to dictionaries
@@ -52,10 +58,11 @@ async def read_users_all(token: str = Depends(oauth2_scheme)):
except JWTError:
# Handle JWT (JSON Web Token) authentication error
raise HTTPException(status_code=401, detail="Unauthorized")
# Return the list of user dictionaries as the response
return results
# Define an HTTP GET route to retrieve the number of users
@router.get("/users/number")
async def read_users_number(token: str = Depends(oauth2_scheme)):
@@ -73,16 +80,18 @@ async def read_users_number(token: str = Depends(oauth2_scheme)):
except JWTError:
# Handle JWT (JSON Web Token) authentication error
raise HTTPException(status_code=401, detail="Unauthorized")
# Return the user count as a JSON response
return {0: user_count}
# Define an HTTP GET route to retrieve user records with pagination
@router.get("/users/all/pagenumber/{pageNumber}/numRecords/{numRecords}", response_model=List[dict])
@router.get(
"/users/all/pagenumber/{pageNumber}/numRecords/{numRecords}",
response_model=List[dict],
)
async def read_users_all_pagination(
pageNumber: int,
numRecords: int,
token: str = Depends(oauth2_scheme)
pageNumber: int, numRecords: int, token: str = Depends(oauth2_scheme)
):
try:
# Validate the user's access token using the oauth2_scheme
@@ -108,16 +117,19 @@ async def read_users_all_pagination(
except JWTError:
# Handle JWT (JSON Web Token) authentication error
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
# Handle any other SQLAlchemy or database errors
print(err)
# Return the list of user records as a JSON response
return results
# Define an HTTP GET route to retrieve user records by username
@router.get("/users/{username}/userfromusername", response_model=List[dict])
async def read_users_userFromUsername(username: str, token: str = Depends(oauth2_scheme)):
async def read_users_userFromUsername(
username: str, token: str = Depends(oauth2_scheme)
):
try:
# Validate the user's access token using the oauth2_scheme
sessionController.validate_token(token)
@@ -143,13 +155,14 @@ async def read_users_userFromUsername(username: str, token: str = Depends(oauth2
except JWTError:
# Handle JWT (JSON Web Token) authentication error
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
# Handle any other SQLAlchemy or database errors
print(err)
# Return the list of user records as a JSON response
return results
# Define an HTTP GET route to retrieve user records by user ID
@router.get("/users/{user_id}/userfromid", response_model=List[dict])
async def read_users_userFromId(user_id: int, token: str = Depends(oauth2_scheme)):
@@ -163,11 +176,7 @@ async def read_users_userFromId(user_id: int, token: str = Depends(oauth2_scheme
# Create a database session using the get_db_session context manager
with get_db_session() as db_session:
# Use SQLAlchemy to query the user records by user ID
user_records = (
db_session.query(User)
.filter(User.id == user_id)
.all()
)
user_records = db_session.query(User).filter(User.id == user_id).all()
# Convert the SQLAlchemy results to a list of dictionaries
results = [record.__dict__ for record in user_records]
@@ -175,16 +184,19 @@ async def read_users_userFromId(user_id: int, token: str = Depends(oauth2_scheme
except JWTError:
# Handle JWT (JSON Web Token) authentication error
raise HTTPException(status_code=401, detail="Unauthorized")
except Error as err:
except NameError as err:
# Handle any other SQLAlchemy or database errors
print(err)
# Return the list of user records as a JSON response
return results
# Define an HTTP GET route to retrieve user ID by username
@router.get("/users/{username}/useridfromusername")
async def read_users_userIDFromUsername(username: str, token: str = Depends(oauth2_scheme)):
async def read_users_userIDFromUsername(
username: str, token: str = Depends(oauth2_scheme)
):
try:
# Validate the user's access token using the oauth2_scheme
sessionController.validate_token(token)
@@ -195,7 +207,11 @@ async def read_users_userIDFromUsername(username: str, token: str = Depends(oaut
# Create a database session using the get_db_session context manager
with get_db_session() as db_session:
# Use SQLAlchemy to query the user ID by username
user_id = db_session.query(User.id).filter(User.username == unquote(username).replace("+", " ")).first()
user_id = (
db_session.query(User.id)
.filter(User.username == unquote(username).replace("+", " "))
.first()
)
except JWTError:
# Handle JWT (JSON Web Token) authentication error
raise HTTPException(status_code=401, detail="Unauthorized")
@@ -203,6 +219,7 @@ async def read_users_userIDFromUsername(username: str, token: str = Depends(oaut
# Return the user ID as a JSON response
return {0: user_id}
# Define an HTTP GET route to retrieve user photos by user ID
@router.get("/users/{user_id}/userphotofromid")
async def read_users_userPhotoFromID(user_id: int, token: str = Depends(oauth2_scheme)):
@@ -223,7 +240,9 @@ async def read_users_userPhotoFromID(user_id: int, token: str = Depends(oauth2_s
photo_path = user.photo_path
else:
# Handle the case where the user was not found or doesn't have a photo path
raise HTTPException(status_code=404, detail="User not found or no photo path available")
raise HTTPException(
status_code=404, detail="User not found or no photo path available"
)
except JWTError:
# Handle JWT (JSON Web Token) authentication error
@@ -231,11 +250,13 @@ async def read_users_userPhotoFromID(user_id: int, token: str = Depends(oauth2_s
# Return the user's photo path as a JSON response
return {0: photo_path}
# Define an HTTP GET route to retrieve user photos aux by user ID
@router.get("/users/{user_id}/userphotoauxfromid")
async def read_users_userPhotoAuxFromID(user_id: int, token: str = Depends(oauth2_scheme)):
async def read_users_userPhotoAuxFromID(
user_id: int, token: str = Depends(oauth2_scheme)
):
try:
# Validate the user's access token using the oauth2_scheme
sessionController.validate_token(token)
@@ -246,14 +267,19 @@ async def read_users_userPhotoAuxFromID(user_id: int, token: str = Depends(oauth
# Create a database session using the get_db_session context manager
with get_db_session() as db_session:
# Use SQLAlchemy to query the user's photo path by user ID
user = db_session.query(User.photo_path_aux).filter(User.id == user_id).first()
user = (
db_session.query(User.photo_path_aux).filter(User.id == user_id).first()
)
if user:
# Extract the photo_path_aux attribute from the user object
photo_path_aux = user.photo_path_aux
else:
# Handle the case where the user was not found or doesn't have a photo path
raise HTTPException(status_code=404, detail="User not found or no photo path aux available")
raise HTTPException(
status_code=404,
detail="User not found or no photo path aux available",
)
except JWTError:
# Handle JWT (JSON Web Token) authentication error
@@ -262,6 +288,7 @@ async def read_users_userPhotoAuxFromID(user_id: int, token: str = Depends(oauth
# Return the user's photo path as a JSON response
return {0: photo_path_aux}
class CreateUserRequest(BaseModel):
name: str
username: str
@@ -276,12 +303,10 @@ class CreateUserRequest(BaseModel):
photo_path_aux: Optional[str]
is_active: int
# Define an HTTP POST route to create a new user
@router.post("/users/create")
async def create_user(
user: CreateUserRequest,
token: str = Depends(oauth2_scheme)
):
async def create_user(user: CreateUserRequest, token: str = Depends(oauth2_scheme)):
try:
# Validate the user's access token using the oauth2_scheme
sessionController.validate_token(token)
@@ -302,7 +327,7 @@ async def create_user(
access_type=user.access_type,
photo_path=user.photo_path,
photo_path_aux=user.photo_path_aux,
is_active=user.is_active
is_active=user.is_active,
)
with get_db_session() as db_session:
@@ -311,11 +336,12 @@ async def create_user(
db_session.commit()
return {"message": "User added successfully"}
except Error as err:
except NameError as err:
# Handle any database-related errors
print(err)
raise HTTPException(status_code=500, detail="Internal server error")
class EditUserRequest(BaseModel):
name: str
username: str
@@ -329,12 +355,13 @@ class EditUserRequest(BaseModel):
photo_path_aux: Optional[str]
is_active: int
# Define an HTTP PUT route to edit a user's information
@router.put("/users/{user_id}/edit")
async def edit_user(
user_id: int,
user_attributtes: EditUserRequest,
token: str = Depends(oauth2_scheme) # Add the Request dependency
token: str = Depends(oauth2_scheme), # Add the Request dependency
):
try:
# Validate the user's access token using the oauth2_scheme
@@ -346,7 +373,7 @@ async def edit_user(
with get_db_session() as db_session:
# Query the database to find the user by their ID
user = db_session.query(User).filter(User.id == user_id).first()
# Check if the user with the given ID exists
if not user:
raise HTTPException(status_code=404, detail="User not found")
@@ -387,12 +414,10 @@ async def edit_user(
return {"message": "User edited successfully"}
# Define an HTTP PUT route to delete a user's photo
@router.put("/users/{user_id}/delete-photo")
async def delete_user_photo(
user_id: int,
token: str = Depends(oauth2_scheme)
):
async def delete_user_photo(user_id: int, token: str = Depends(oauth2_scheme)):
try:
# Validate the user's access token using the oauth2_scheme
sessionController.validate_token(token)
@@ -403,7 +428,7 @@ async def delete_user_photo(
with get_db_session() as db_session:
# Query the database to find the user by their ID
user = db_session.query(User).filter(User.id == user_id).first()
# Check if the user with the given ID exists
if not user:
raise HTTPException(status_code=404, detail="User not found")
@@ -425,12 +450,10 @@ async def delete_user_photo(
# Return a success message
return {"message": f"Photo for user {user_id} has been deleted"}
# Define an HTTP DELETE route to delete a user
@router.delete("/users/{user_id}/delete")
async def delete_user(
user_id: int,
token: str = Depends(oauth2_scheme)
):
async def delete_user(user_id: int, token: str = Depends(oauth2_scheme)):
try:
# Validate the user's access token using the oauth2_scheme
sessionController.validate_token(token)
@@ -440,21 +463,25 @@ async def delete_user(
with get_db_session() as db_session:
user = db_session.query(User).filter(User.id == user_id).first()
# Check if the user with the given ID exists
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Check for existing dependencies if needed (e.g., related systems)
count_gear = db_session.query(Gear).filter(Gear.user_id == user_id).count()
if count_gear > 0:
raise HTTPException(status_code=409, detail="Cannot delete user due to existing dependencies")
raise HTTPException(
status_code=409,
detail="Cannot delete user due to existing dependencies",
)
# Delete the user from the database
db_session.delete(user)
db_session.commit()
except JWTError:
raise HTTPException(status_code=401, detail="Unauthorized")
except Exception as err:
print(err)
raise HTTPException(status_code=500, detail="Failed to delete user")
# Return a success message upon successful deletion

226
db/db.py
View File

@@ -3,19 +3,29 @@ import urllib.parse # Import urllib.parse for URL encoding
import logging
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String, Date, DateTime, ForeignKey, LargeBinary, DECIMAL, BigInteger
from sqlalchemy import (
create_engine,
Column,
Integer,
String,
Date,
DateTime,
ForeignKey,
DECIMAL,
BigInteger,
)
from dotenv import load_dotenv
from datetime import datetime
from sqlalchemy.dialects.mysql import JSON
from contextlib import contextmanager
# Load the environment variables from config/.env
load_dotenv('config/.env')
load_dotenv("config/.env")
logger = logging.getLogger("myLogger")
# Define the database connection URL using environment variables
#db_url = f"mysql+mysqlconnector://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_DATABASE')}"
db_password = urllib.parse.quote_plus(os.getenv('DB_PASSWORD'))
# db_url = f"mysql+mysqlconnector://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_DATABASE')}"
db_password = urllib.parse.quote_plus(os.getenv("DB_PASSWORD"))
db_url = f"mysql://{os.getenv('DB_USER')}:{db_password}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_DATABASE')}"
# Create the SQLAlchemy engine
@@ -27,71 +37,126 @@ Session = sessionmaker(bind=engine)
# Create a base class for declarative models
Base = declarative_base()
# Data model for users table using SQLAlchemy's ORM
class User(Base):
__tablename__ = 'users'
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(length=45), nullable=False, comment='User real name (May include spaces)')
username = Column(String(length=45), nullable=False, unique=True, comment='User username (letters, numbers, and dots allowed)')
email = Column(String(length=250), nullable=False, unique=True, comment='User email (max 45 characters)')
password = Column(String(length=100), nullable=False, comment='User password (hash)')
city = Column(String(length=45), nullable=True, comment='User city')
birthdate = Column(Date, nullable=True, comment='User birthdate (date)')
preferred_language = Column(String(length=5), nullable=False, comment='User preferred language (en, pt, others)')
gender = Column(Integer, nullable=False, comment='User gender (one digit)(1 - male, 2 - female)')
access_type = Column(Integer, nullable=False, comment='User type (one digit)(1 - student, 2 - admin)')
photo_path = Column(String(length=250), nullable=True, comment='User photo path')
photo_path_aux = Column(String(length=250), nullable=True, comment='Auxiliary photo path')
is_active = Column(Integer, nullable=False, comment='Is user active (2 - not active, 1 - active)')
name = Column(
String(length=45), nullable=False, comment="User real name (May include spaces)"
)
username = Column(
String(length=45),
nullable=False,
unique=True,
comment="User username (letters, numbers, and dots allowed)",
)
email = Column(
String(length=250),
nullable=False,
unique=True,
comment="User email (max 45 characters)",
)
password = Column(
String(length=100), nullable=False, comment="User password (hash)"
)
city = Column(String(length=45), nullable=True, comment="User city")
birthdate = Column(Date, nullable=True, comment="User birthdate (date)")
preferred_language = Column(
String(length=5),
nullable=False,
comment="User preferred language (en, pt, others)",
)
gender = Column(
Integer, nullable=False, comment="User gender (one digit)(1 - male, 2 - female)"
)
access_type = Column(
Integer, nullable=False, comment="User type (one digit)(1 - student, 2 - admin)"
)
photo_path = Column(String(length=250), nullable=True, comment="User photo path")
photo_path_aux = Column(
String(length=250), nullable=True, comment="Auxiliary photo path"
)
is_active = Column(
Integer, nullable=False, comment="Is user active (2 - not active, 1 - active)"
)
strava_state = Column(String(length=45), nullable=True)
strava_token = Column(String(length=250), nullable=True)
strava_refresh_token = Column(String(length=250), nullable=True)
strava_token_expires_at = Column(DateTime, nullable=True)
# Define a relationship to AccessToken model
access_tokens = relationship('AccessToken', back_populates='user')
access_tokens = relationship("AccessToken", back_populates="user")
# Define a relationship to Gear model
gear = relationship('Gear', back_populates='user')
gear = relationship("Gear", back_populates="user")
# Establish a one-to-many relationship with 'activities'
activities = relationship('Activity', back_populates='user')
activities = relationship("Activity", back_populates="user")
# Establish a one-to-many relationship between User and UserSettings
# user_settings = relationship("UserSettings", back_populates="user")
# user_settings = relationship("UserSettings", back_populates="user")
# Data model for access_tokens table using SQLAlchemy's ORM
class AccessToken(Base):
__tablename__ = 'access_tokens'
__tablename__ = "access_tokens"
id = Column(Integer, primary_key=True)
token = Column(String(length=256), nullable=False, comment='User token')
user_id = Column(Integer, ForeignKey('users.id'), nullable=False, comment='User ID that the token belongs')
created_at = Column(DateTime, nullable=False, comment='Token creation date (date)')
expires_at = Column(DateTime, nullable=False, comment='Token expiration date (date)')
token = Column(String(length=256), nullable=False, comment="User token")
user_id = Column(
Integer,
ForeignKey("users.id"),
nullable=False,
comment="User ID that the token belongs",
)
created_at = Column(DateTime, nullable=False, comment="Token creation date (date)")
expires_at = Column(
DateTime, nullable=False, comment="Token expiration date (date)"
)
# Define a relationship to the User model
user = relationship('User', back_populates='access_tokens')
user = relationship("User", back_populates="access_tokens")
# Data model for gear table using SQLAlchemy's ORM
class Gear(Base):
__tablename__ = 'gear'
__tablename__ = "gear"
id = Column(Integer, primary_key=True)
brand = Column(String(length=45), nullable=True, comment='Gear brand (May include spaces)')
model = Column(String(length=45), nullable=True, comment='Gear model (May include spaces)')
nickname = Column(String(length=45), nullable=False, comment='Gear nickname (May include spaces)')
gear_type = Column(Integer, nullable=False, comment='Gear type (1 - bike, 2 - shoes, 3 - wetsuit)')
user_id = Column(Integer, ForeignKey('users.id'), nullable=False, comment='User ID that the gear belongs to')
created_at = Column(DateTime, nullable=False, comment='Gear creation date (date)')
is_active = Column(Integer, nullable=False, comment='Is gear active (0 - not active, 1 - active)')
brand = Column(
String(length=45), nullable=True, comment="Gear brand (May include spaces)"
)
model = Column(
String(length=45), nullable=True, comment="Gear model (May include spaces)"
)
nickname = Column(
String(length=45), nullable=False, comment="Gear nickname (May include spaces)"
)
gear_type = Column(
Integer, nullable=False, comment="Gear type (1 - bike, 2 - shoes, 3 - wetsuit)"
)
user_id = Column(
Integer,
ForeignKey("users.id"),
nullable=False,
comment="User ID that the gear belongs to",
)
created_at = Column(DateTime, nullable=False, comment="Gear creation date (date)")
is_active = Column(
Integer, nullable=False, comment="Is gear active (0 - not active, 1 - active)"
)
# Define a relationship to the User model
user = relationship('User', back_populates='gear')
user = relationship("User", back_populates="gear")
# Establish a one-to-many relationship with 'activities'
activities = relationship('Activity', back_populates='gear')
activities = relationship("Activity", back_populates="gear")
# Establish a one-to-many relationship between Gear and UserSettings
# user_settings = relationship("UserSettings", back_populates="gear")
#class UserSettings(Base):
# user_settings = relationship("UserSettings", back_populates="gear")
# class UserSettings(Base):
# __tablename__ = 'user_settings'
# id = Column(Integer, primary_key=True, autoincrement=True)
@@ -103,38 +168,73 @@ class Gear(Base):
# user = relationship("User", back_populates="user_settings")
# gear = relationship("Gear", back_populates="user_settings")
# Data model for activities table using SQLAlchemy's ORM
class Activity(Base):
__tablename__ = 'activities'
__tablename__ = "activities"
id = Column(Integer, primary_key=True, autoincrement=True)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False, comment='User ID that the activity belongs')
name = Column(String(length=45), nullable=True, comment='Activity name (May include spaces)')
distance = Column(Integer, nullable=False, comment='Distance in meters')
activity_type = Column(Integer, nullable=False, comment='Gear type (1 - mountain bike, 2 - gravel bike, ...)')
start_time = Column(DateTime, nullable=False, comment='Activity start date (datetime)')
end_time = Column(DateTime, nullable=False, comment='Activity end date (datetime)')
city = Column(String(length=45), nullable=True, comment='Activity city (May include spaces)')
town = Column(String(length=45), nullable=True, comment='Activity town (May include spaces)')
country = Column(String(length=45), nullable=True, comment='Activity country (May include spaces)')
created_at = Column(DateTime, nullable=False, comment='Activity creation date (datetime)')
waypoints = Column(JSON, nullable=True, doc='Store waypoints data')
elevation_gain = Column(Integer, nullable=False, comment='Elevation gain in meters')
elevation_loss = Column(Integer, nullable=False, comment='Elevation loss in meters')
pace = Column(DECIMAL(precision=20, scale=10), nullable=False, comment='Pace seconds per meter (s/m)')
average_speed = Column(DECIMAL(precision=20, scale=10), nullable=False, comment='Average speed seconds per meter (s/m)')
average_power = Column(Integer, nullable=False, comment='Average power (watts)')
gear_id = Column(Integer, ForeignKey('gear.id'), nullable=True, comment='Gear ID associated with this activity')
strava_activity_id = Column(BigInteger, nullable=True, comment='Strava activity ID')
user_id = Column(
Integer,
ForeignKey("users.id"),
nullable=False,
comment="User ID that the activity belongs",
)
name = Column(
String(length=45), nullable=True, comment="Activity name (May include spaces)"
)
distance = Column(Integer, nullable=False, comment="Distance in meters")
activity_type = Column(
Integer,
nullable=False,
comment="Gear type (1 - mountain bike, 2 - gravel bike, ...)",
)
start_time = Column(
DateTime, nullable=False, comment="Activity start date (datetime)"
)
end_time = Column(DateTime, nullable=False, comment="Activity end date (datetime)")
city = Column(
String(length=45), nullable=True, comment="Activity city (May include spaces)"
)
town = Column(
String(length=45), nullable=True, comment="Activity town (May include spaces)"
)
country = Column(
String(length=45),
nullable=True,
comment="Activity country (May include spaces)",
)
created_at = Column(
DateTime, nullable=False, comment="Activity creation date (datetime)"
)
waypoints = Column(JSON, nullable=True, doc="Store waypoints data")
elevation_gain = Column(Integer, nullable=False, comment="Elevation gain in meters")
elevation_loss = Column(Integer, nullable=False, comment="Elevation loss in meters")
pace = Column(
DECIMAL(precision=20, scale=10),
nullable=False,
comment="Pace seconds per meter (s/m)",
)
average_speed = Column(
DECIMAL(precision=20, scale=10),
nullable=False,
comment="Average speed seconds per meter (s/m)",
)
average_power = Column(Integer, nullable=False, comment="Average power (watts)")
gear_id = Column(
Integer,
ForeignKey("gear.id"),
nullable=True,
comment="Gear ID associated with this activity",
)
strava_activity_id = Column(BigInteger, nullable=True, comment="Strava activity ID")
# Define a relationship to the User model
user = relationship('User', back_populates='activities')
user = relationship("User", back_populates="activities")
# Define a relationship to the Gear model
gear = relationship('Gear', back_populates='activities')
gear = relationship("Gear", back_populates="activities")
# Context manager to get a database session
from contextlib import contextmanager
@contextmanager
def get_db_session():

31
main.py
View File

@@ -1,20 +1,24 @@
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from controllers import sessionController, userController, gearController, activitiesController, stravaController
from controllers import (
sessionController,
userController,
gearController,
activitiesController,
stravaController,
)
from datetime import datetime, timedelta
import logging
import asyncio
import threading
app = FastAPI()
logger = logging.getLogger("myLogger")
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('app.log')
file_handler = logging.FileHandler("app.log")
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
@@ -31,13 +35,18 @@ scheduler = BackgroundScheduler()
scheduler.start()
# Remove the leading space
scheduler.add_job(sessionController.remove_expired_tokens, 'interval', minutes=1)
scheduler.add_job(stravaController.refresh_strava_token, 'interval', minutes=30)
#scheduler.add_job(stravaController.get_strava_activities, 'interval', minutes=1, args=((datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%SZ'),))
start_date = (datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%SZ')
scheduler.add_job(lambda: stravaController.get_strava_activities(start_date), 'interval', minutes=1)
scheduler.add_job(sessionController.remove_expired_tokens, "interval", minutes=1)
scheduler.add_job(stravaController.refresh_strava_token, "interval", minutes=30)
scheduler.add_job(
lambda: stravaController.get_strava_activities(
(datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
),
"interval",
minutes=60,
)
# Add the background scheduler to the app's shutdown event
@app.on_event("shutdown")
async def shutdown_event():
scheduler.shutdown()
scheduler.shutdown()