Refactored users.py

This commit is contained in:
CasVT
2025-08-18 17:05:35 +02:00
parent 6da1d3b2f6
commit bafd8abe6a
6 changed files with 86 additions and 87 deletions

View File

@@ -59,7 +59,7 @@ class Constants:
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "admin"
INVALID_USERNAMES = ("reminders", "api")
INVALID_USERNAMES = ("", "reminders", "api")
USERNAME_CHARACTERS = 'abcedfghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.!@$'
CONNECTION_ERROR_TIMEOUT = 120 # seconds

View File

@@ -8,54 +8,52 @@ from backend.base.custom_exceptions import (AccessUnauthorized,
UsernameInvalid, UsernameTaken,
UserNotFound)
from backend.base.definitions import Constants, InvalidUsernameReason, UserData
from backend.base.helpers import Singleton, generate_salt_hash, get_hash
from backend.base.helpers import generate_salt_hash, get_hash
from backend.base.logging import LOGGER
from backend.internals.db_models import UsersDB
from backend.internals.settings import Settings
def is_valid_username(username: str) -> None:
def is_valid_username(username: str) -> Union[None, InvalidUsernameReason]:
"""Check if username is valid.
Args:
username (str): The username to check.
Raises:
UsernameInvalid: The username is not valid.
Returns:
Union[None, InvalidUsernameReason]: `None` if username is valid,
`InvalidUsernameReason` if it is invalid.
"""
if username in Constants.INVALID_USERNAMES:
raise UsernameInvalid(username, InvalidUsernameReason.NOT_ALLOWED)
return InvalidUsernameReason.NOT_ALLOWED
if username.isdigit():
raise UsernameInvalid(username, InvalidUsernameReason.ONLY_NUMBERS)
return InvalidUsernameReason.ONLY_NUMBERS
if any(
c not in Constants.USERNAME_CHARACTERS
for c in username
):
raise UsernameInvalid(
username,
InvalidUsernameReason.INVALID_CHARACTER
)
return InvalidUsernameReason.INVALID_CHARACTER
return
return None
class User:
def __init__(self, id: int) -> None:
"""Create a representation of a user.
def __init__(self, user_id: int) -> None:
"""Create an instance.
Args:
id (int): The ID of the user.
user_id (int): The ID of the user.
Raises:
UserNotFound: The user does not exist.
"""
self.user_db = UsersDB()
self.user_id = id
self.user_id = user_id
if not self.user_db.exists(self.user_id):
raise UserNotFound(None, id)
raise UserNotFound(None, user_id)
return
@@ -67,63 +65,58 @@ class User:
"""
return self.user_db.fetch(self.user_id)[0]
def update(
self,
new_username: Union[str, None],
new_password: Union[str, None]
) -> None:
"""Change the username and/or password of the account.
def update_username(self, new_username: str) -> None:
"""Change the username of the account.
Args:
new_username (Union[str, None]): The new username, or None if it
should not be changed.
new_password (Union[str, None]): The new password, or None if it
should not be changed.
new_username (str): The new username.
Raises:
UsernameInvalid: The new username is not valid.
UsernameTaken: The new username is already taken.
"""
if not (new_username or new_password):
return
reason = is_valid_username(new_username)
if reason is not None:
raise UsernameInvalid(new_username, reason)
if self.user_db.taken(new_username):
raise UsernameTaken(new_username)
self.user_db.update(
self.user_id,
new_username,
self.get().hash
)
LOGGER.info(
f"The user with ID {self.user_id} has a new username: {new_username}"
)
return
def update_password(self, new_password: str) -> None:
"""Change the password of the account.
Args:
new_password (str): The new password.
"""
user_data = self.get()
if new_username is not None:
is_valid_username(new_username)
hash_password = get_hash(user_data.salt, new_password)
if self.user_db.taken(new_username):
raise UsernameTaken(new_username)
self.user_db.update(
self.user_id,
new_username,
user_data.hash
)
LOGGER.info(
f"The user with ID {self.user_id} has a changed username: {new_username}"
)
user_data = self.get()
if new_password is not None:
hash_password = get_hash(user_data.salt, new_password)
self.user_db.update(
self.user_id,
user_data.username,
hash_password
)
LOGGER.info(
f'The user with ID {self.user_id} changed their password'
)
self.user_db.update(
self.user_id,
user_data.username,
hash_password
)
LOGGER.info(
f'The user with ID {self.user_id} changed their password'
)
return
def delete(self) -> None:
"""Delete the user.
"""Delete the user. The instance should not be used after calling this
method.
Raises:
OperationNotAllowed: The admin account cannot be deleted.
@@ -141,19 +134,18 @@ class User:
return
class Users(metaclass=Singleton):
class Users:
def __init__(self) -> None:
self.user_db = UsersDB()
return
def get_all(self) -> List[UserData]:
"""Get all user info for the admin
"""Get all user info for the admin.
Returns:
List[UserData]: The info about all users
List[UserData]: The info about all users.
"""
result = self.user_db.fetch()
return result
return self.user_db.fetch()
def get_one(self, id: int) -> User:
"""Get a user instance based on the ID.
@@ -179,7 +171,7 @@ class Users(metaclass=Singleton):
username (str): The username to check.
Returns:
bool: True if the username is taken, False otherwise.
bool: Whether the username is already taken.
"""
return self.user_db.taken(username)
@@ -190,7 +182,7 @@ class Users(metaclass=Singleton):
id (int): The user ID to check.
Returns:
bool: True if the user ID is taken, False otherwise.
bool: Whether the user ID is already taken.
"""
return self.user_db.exists(id)
@@ -212,7 +204,7 @@ class Users(metaclass=Singleton):
Returns:
User: The user that was logged into.
"""
if not self.user_db.taken(username):
if not self.username_taken(username):
raise UserNotFound(username, None)
user_data = self.user_db.fetch(
@@ -242,7 +234,7 @@ class Users(metaclass=Singleton):
password (str): The password of the new user.
force (bool, optional): Skip check for whether new accounts are
allowed.
allowed.
Defaults to False.
is_admin (bool, optional): The account is the admin account.
@@ -252,26 +244,27 @@ class Users(metaclass=Singleton):
UsernameInvalid: Username not allowed or contains invalid characters.
UsernameTaken: Username is already taken; usernames must be unique.
NewAccountsNotAllowed: In the admin panel, new accounts are set to be
not allowed.
not allowed.
Returns:
int: The ID of the new user. User registered successfully.
"""
LOGGER.info(f'Registering user with username {username}')
if not force and not Settings().get_settings().allow_new_accounts:
if not force and not Settings().sv.allow_new_accounts:
raise NewAccountsNotAllowed
is_valid_username(username)
reason = is_valid_username(username)
if reason is not None:
raise UsernameInvalid(username, reason)
if self.user_db.taken(username):
raise UsernameTaken(username)
if is_admin:
if self.user_db.taken(Constants.ADMIN_USERNAME):
# Attempted to add admin account (only done internally),
# but admin account already exists
raise RuntimeError("Admin account already exists")
if is_admin and self.user_db.admin_id() is not None:
# Attempted to add admin account (only done internally),
# but admin account already exists
raise RuntimeError("Admin account already exists")
# Generate salt and key exclusive for user
salt, hashed_password = generate_salt_hash(password)

View File

@@ -273,10 +273,7 @@ class MigrateAddAdmin(DBMigrator):
if 'admin' in users:
users.get_one(
users.user_db.username_to_id('admin')
).update(
new_username='admin_old',
new_password=None
)
).update_username('admin_old')
users.add(
Constants.ADMIN_USERNAME, Constants.ADMIN_PASSWORD,

View File

@@ -260,6 +260,11 @@ class UsersDB:
(username,)
).fetchone()[0]
def admin_id(self) -> Union[int, None]:
return get_db().execute(
"SELECT id FROM users WHERE admin = 1 LIMIT 1;"
).exists()
def fetch(
self,
user_id: Union[int, None] = None

View File

@@ -189,7 +189,10 @@ def api_add_user(inputs: Dict[str, str]):
def api_manage_user(inputs: Dict[str, Any]):
user = api_key_map[g.hashed_api_key].user_data
if request.method == 'PUT':
user.update(inputs['new_username'], inputs['new_password'])
if inputs['new_username']:
user.update_username(inputs['new_username'])
if inputs['new_password']:
user.update_password(inputs['new_password'])
return return_api({})
elif request.method == 'DELETE':
@@ -588,7 +591,10 @@ def api_admin_users(inputs: Dict[str, Any]):
def api_admin_user(inputs: Dict[str, Any], u_id: int):
user = users.get_one(u_id)
if request.method == 'PUT':
user.update(inputs['new_username'], inputs['new_password'])
if inputs['new_username']:
user.update_username(inputs['new_username'])
if inputs['new_password']:
user.update_password(inputs['new_password'])
return return_api({})
elif request.method == 'DELETE':

View File

@@ -1,15 +1,13 @@
import unittest
from backend.base.custom_exceptions import UsernameInvalid
from backend.base.definitions import Constants
from backend.implementations.users import is_valid_username
class Test_Users(unittest.TestCase):
def test_username_check(self):
for test_case in ('', 'test'):
is_valid_username(test_case)
for test_case in ('User1', 'test'):
self.assertIsNone(is_valid_username(test_case))
for test_case in (' ', ' ', '0', 'api', *Constants.INVALID_USERNAMES):
with self.assertRaises(UsernameInvalid):
is_valid_username(test_case)
self.assertIsNotNone(is_valid_username(test_case))