mirror of
https://github.com/Casvt/MIND.git
synced 2026-02-19 11:54:46 -05:00
306 lines
8.4 KiB
Python
306 lines
8.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from typing import List, Union
|
|
|
|
from backend.base.custom_exceptions import (AccessUnauthorized,
|
|
NewAccountsNotAllowed,
|
|
OperationNotAllowed,
|
|
UsernameInvalid, UsernameTaken,
|
|
UserNotFound)
|
|
from backend.base.definitions import Constants, InvalidUsernameReason, UserData
|
|
from backend.base.helpers import generate_salt_hash, get_hash
|
|
from backend.base.logging import LOGGER
|
|
from backend.internals.db_models import UsersDB
|
|
from backend.internals.settings import Settings
|
|
|
|
|
|
def is_valid_username(username: str) -> Union[None, InvalidUsernameReason]:
|
|
"""Check if username is valid.
|
|
|
|
Args:
|
|
username (str): The username to check.
|
|
|
|
Returns:
|
|
Union[None, InvalidUsernameReason]: `None` if username is valid,
|
|
`InvalidUsernameReason` if it is invalid.
|
|
"""
|
|
if username in Constants.INVALID_USERNAMES:
|
|
return InvalidUsernameReason.NOT_ALLOWED
|
|
|
|
if username.isdigit():
|
|
return InvalidUsernameReason.ONLY_NUMBERS
|
|
|
|
if any(
|
|
c not in Constants.USERNAME_CHARACTERS
|
|
for c in username
|
|
):
|
|
return InvalidUsernameReason.INVALID_CHARACTER
|
|
|
|
return None
|
|
|
|
|
|
class User:
|
|
def __init__(self, user_id: int) -> None:
|
|
"""Create an instance.
|
|
|
|
Args:
|
|
user_id (int): The ID of the user.
|
|
|
|
Raises:
|
|
UserNotFound: The user does not exist.
|
|
"""
|
|
self.user_db = UsersDB()
|
|
self.user_id = user_id
|
|
|
|
if not self.user_db.exists(self.user_id):
|
|
raise UserNotFound(None, user_id)
|
|
|
|
return
|
|
|
|
def get(self) -> UserData:
|
|
"""Get the info about the user.
|
|
|
|
Returns:
|
|
UserData: The info about the user.
|
|
"""
|
|
return self.user_db.fetch(self.user_id)[0]
|
|
|
|
def update_username(self, new_username: str) -> None:
|
|
"""Change the username of the account.
|
|
|
|
Args:
|
|
new_username (str): The new username.
|
|
|
|
Raises:
|
|
UsernameInvalid: The new username is not valid.
|
|
UsernameTaken: The new username is already taken.
|
|
"""
|
|
reason = is_valid_username(new_username)
|
|
if reason is not None:
|
|
raise UsernameInvalid(new_username, reason)
|
|
|
|
if self.user_db.taken(new_username):
|
|
raise UsernameTaken(new_username)
|
|
|
|
user_data = self.get()
|
|
|
|
self.user_db.update(
|
|
self.user_id,
|
|
new_username,
|
|
user_data.hash,
|
|
user_data.mfa_apprise_url
|
|
)
|
|
|
|
LOGGER.info(
|
|
f"The user with ID {self.user_id} has a new username: {new_username}"
|
|
)
|
|
return
|
|
|
|
def update_password(self, new_password: str) -> None:
|
|
"""Change the password of the account.
|
|
|
|
Args:
|
|
new_password (str): The new password.
|
|
"""
|
|
user_data = self.get()
|
|
|
|
hash_password = get_hash(user_data.salt, new_password)
|
|
|
|
self.user_db.update(
|
|
self.user_id,
|
|
user_data.username,
|
|
hash_password,
|
|
user_data.mfa_apprise_url
|
|
)
|
|
|
|
LOGGER.info(
|
|
f'The user with ID {self.user_id} changed their password'
|
|
)
|
|
return
|
|
|
|
def update_mfa_apprise_url(
|
|
self,
|
|
new_mfa_apprise_url: Union[str, None]
|
|
) -> None:
|
|
"""Change the MFA Apprise URL of the account.
|
|
|
|
Args:
|
|
new_mfa_apprise_url (Union[str, None]): The new MFA Apprise URL.
|
|
"""
|
|
user_data = self.get()
|
|
|
|
self.user_db.update(
|
|
self.user_id,
|
|
user_data.username,
|
|
user_data.hash,
|
|
new_mfa_apprise_url
|
|
)
|
|
|
|
return
|
|
|
|
def delete(self) -> None:
|
|
"""Delete the user. The instance should not be used after calling this
|
|
method.
|
|
|
|
Raises:
|
|
OperationNotAllowed: The admin account cannot be deleted.
|
|
"""
|
|
user_data = self.get()
|
|
if user_data.admin:
|
|
raise OperationNotAllowed(
|
|
"The admin account cannot be deleted"
|
|
)
|
|
|
|
LOGGER.info(f'Deleting the user with ID {self.user_id}')
|
|
|
|
self.user_db.delete(self.user_id)
|
|
|
|
return
|
|
|
|
|
|
class Users:
|
|
def __init__(self) -> None:
|
|
self.user_db = UsersDB()
|
|
return
|
|
|
|
def get_all(self) -> List[UserData]:
|
|
"""Get all user info for the admin.
|
|
|
|
Returns:
|
|
List[UserData]: The info about all users.
|
|
"""
|
|
return self.user_db.fetch()
|
|
|
|
def get_one(self, user_id: int) -> User:
|
|
"""Get a user instance based on the ID.
|
|
|
|
Args:
|
|
user_id (int): The ID of the user.
|
|
|
|
Returns:
|
|
User: The user instance.
|
|
"""
|
|
return User(user_id)
|
|
|
|
def __contains__(self, username_or_id: Union[str, int]) -> bool:
|
|
if isinstance(username_or_id, str):
|
|
return self.username_taken(username_or_id)
|
|
else:
|
|
return self.id_taken(username_or_id)
|
|
|
|
def username_taken(self, username: str) -> bool:
|
|
"""Check if a username is taken.
|
|
|
|
Args:
|
|
username (str): The username to check.
|
|
|
|
Returns:
|
|
bool: Whether the username is already taken.
|
|
"""
|
|
return self.user_db.taken(username)
|
|
|
|
def id_taken(self, user_id: int) -> bool:
|
|
"""Check if a user ID is taken.
|
|
|
|
Args:
|
|
user_id (int): The user ID to check.
|
|
|
|
Returns:
|
|
bool: Whether the user ID is already taken.
|
|
"""
|
|
return self.user_db.exists(user_id)
|
|
|
|
def login(
|
|
self,
|
|
username: str,
|
|
password: str
|
|
) -> User:
|
|
"""Login into an user account.
|
|
|
|
Args:
|
|
username (str): The username of the user.
|
|
password (str): The password of the user.
|
|
|
|
Raises:
|
|
UserNotFound: There is no user with the given username.
|
|
AccessUnauthorized: The password is incorrect.
|
|
|
|
Returns:
|
|
User: The user that was logged into.
|
|
"""
|
|
if not self.username_taken(username):
|
|
raise UserNotFound(username, None)
|
|
|
|
user_data = self.user_db.fetch(
|
|
self.user_db.username_to_id(username)
|
|
)[0]
|
|
|
|
hash_password = get_hash(user_data.salt, password)
|
|
# Comparing hashes, not password strings, so no need for
|
|
# constant time comparison
|
|
if not hash_password == user_data.hash:
|
|
raise AccessUnauthorized
|
|
|
|
return User(user_data.id)
|
|
|
|
def add(
|
|
self,
|
|
username: str,
|
|
password: str,
|
|
force: bool = False,
|
|
is_admin: bool = False
|
|
) -> int:
|
|
"""Add a user.
|
|
|
|
Args:
|
|
username (str): The username of the new user.
|
|
|
|
password (str): The password of the new user.
|
|
|
|
force (bool, optional): Skip check for whether new accounts are
|
|
allowed.
|
|
Defaults to False.
|
|
|
|
is_admin (bool, optional): The account is the admin account.
|
|
Defaults to False.
|
|
|
|
Raises:
|
|
UsernameInvalid: Username not allowed or contains invalid characters.
|
|
UsernameTaken: Username is already taken; usernames must be unique.
|
|
NewAccountsNotAllowed: In the admin panel, new accounts are set to be
|
|
not allowed.
|
|
|
|
Returns:
|
|
int: The ID of the new user. User registered successfully.
|
|
"""
|
|
LOGGER.info(f'Registering user with username {username}')
|
|
|
|
if not force and not Settings().sv.allow_new_accounts:
|
|
raise NewAccountsNotAllowed
|
|
|
|
reason = is_valid_username(username)
|
|
if reason is not None:
|
|
raise UsernameInvalid(username, reason)
|
|
|
|
if self.user_db.taken(username):
|
|
raise UsernameTaken(username)
|
|
|
|
if is_admin and self.user_db.admin_id() is not None:
|
|
# Attempted to add admin account (only done internally),
|
|
# but admin account already exists
|
|
raise RuntimeError("Admin account already exists")
|
|
|
|
# Generate salt and key exclusive for user
|
|
salt, hashed_password = generate_salt_hash(password)
|
|
|
|
# Add user to database
|
|
user_id = self.user_db.add(
|
|
username,
|
|
salt,
|
|
hashed_password,
|
|
is_admin
|
|
)
|
|
|
|
LOGGER.debug(f'Newly registered user has id {user_id}')
|
|
return user_id
|