Added 2FA in the backend

This commit is contained in:
CasVT
2025-08-30 23:12:50 +02:00
parent 7456561a3a
commit aef883e0da
9 changed files with 212 additions and 78 deletions

View File

@@ -158,6 +158,18 @@ class APIKeyExpired(LogUnauthMindException):
}
class MFACodeRequired(MindException):
"An MFA code is sent and now expected to be supplied"
@property
def api_response(self) -> ApiResponse:
return {
'code': 200,
'error': self.__class__.__name__,
'result': {}
}
# region Admin Operations
class OperationNotAllowed(MindException):
"What was requested to be done is not allowed"

View File

@@ -9,18 +9,15 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import (TYPE_CHECKING, Any, Callable, Dict, List, Literal,
Sequence, Tuple, TypedDict, TypeVar, Union, cast)
from typing import (Any, Callable, Dict, List, Literal, Sequence,
Tuple, TypedDict, TypeVar, Union, cast)
from flask import Response
if TYPE_CHECKING:
from backend.implementations.users import User
# region Types
T = TypeVar('T')
U = TypeVar('U')
MISSING = object()
WEEKDAY_NUMBER = Literal[0, 1, 2, 3, 4, 5, 6]
BaseJSONSerialisable = Union[
@@ -56,6 +53,7 @@ class Constants:
ADMIN_PREFIX = API_PREFIX + ADMIN_API_EXTENSION
API_KEY_LENGTH = 32 # hexadecimal characters
API_KEY_CLEANUP_INTERVAL = 86400 # seconds
MFA_CODE_TIMEOUT = 300 # seconds
DB_FOLDER = ("db",)
DB_NAME = "MIND.db"
@@ -238,12 +236,6 @@ class StartTypeHandler(ABC):
# region Dataclasses
@dataclass
class ApiKeyEntry:
exp: int
user_data: UserData
@dataclass(frozen=True, order=True)
class NotificationServiceData:
id: int
@@ -261,12 +253,13 @@ class UserData:
admin: bool
salt: bytes
hash: bytes
mfa_apprise_url: Union[str, None]
def todict(self) -> Dict[str, Any]:
return {
k: v
for k, v in self.__dict__.items()
if k in ('id', 'username', 'admin')
if k in ('id', 'username', 'admin', 'mfa_apprise_url')
}

View File

@@ -262,6 +262,15 @@ def generate_api_key() -> str:
return token_hex(Constants.API_KEY_LENGTH // 2)
def generate_mfa_code() -> str:
"""Generate a 6-digit MFA code.
Returns:
str: The code.
"""
return str(int.from_bytes(token_bytes(3), 'big') % 1_000_000).zfill(6)
# region Apprise
def send_apprise_notification(
urls: List[str],

View File

@@ -82,10 +82,13 @@ class User:
if self.user_db.taken(new_username):
raise UsernameTaken(new_username)
user_data = self.get()
self.user_db.update(
self.user_id,
new_username,
self.get().hash
user_data.hash,
user_data.mfa_apprise_url
)
LOGGER.info(
@@ -106,7 +109,8 @@ class User:
self.user_db.update(
self.user_id,
user_data.username,
hash_password
hash_password,
user_data.mfa_apprise_url
)
LOGGER.info(
@@ -114,6 +118,26 @@ class User:
)
return
def update_mfa_apprise_url(
self,
new_mfa_apprise_url: Union[str, None]
) -> None:
"""Change the MFA Apprise URL of the account.
Args:
new_mfa_apprise_url (Union[str, None]): The new MFA Apprise URL.
"""
user_data = self.get()
self.user_db.update(
self.user_id,
user_data.username,
user_data.hash,
new_mfa_apprise_url
)
return
def delete(self) -> None:
"""Delete the user. The instance should not be used after calling this
method.

View File

@@ -387,7 +387,8 @@ DB_SCHEMA = """
username VARCHAR(255) UNIQUE NOT NULL,
salt VARCHAR(40) NOT NULL,
hash VARCHAR(100) NOT NULL,
admin BOOL NOT NULL DEFAULT 0
admin BOOL NOT NULL DEFAULT 0,
mfa_apprise_url TEXT
);
CREATE TABLE IF NOT EXISTS notification_services(
id INTEGER PRIMARY KEY,

View File

@@ -382,3 +382,16 @@ class MigrateAddCronScheduleColumn(DBMigrator):
COMMIT;
PRAGMA foreign_keys = ON;
""")
class MigrateAddMFAColumn(DBMigrator):
start_version = 13
def run(self) -> None:
# V13 -> V14
get_db().executescript("""
ALTER TABLE users
ADD mfa_apprise_url TEXT;
""")
return

View File

@@ -275,7 +275,7 @@ class UsersDB:
result = get_db().execute(f"""
SELECT
id, username, admin, salt, hash
id, username, admin, salt, hash, mfa_apprise_url
FROM users
{id_filter}
ORDER BY admin DESC, LOWER(username);
@@ -310,16 +310,20 @@ class UsersDB:
self,
user_id: int,
username: str,
hash: bytes
hash: bytes,
mfa_apprise_url: Union[str, None]
) -> None:
get_db().execute("""
UPDATE users
SET username = :username, hash = :hash
SET username = :username,
hash = :hash,
mfa_apprise_url = :mfa_apprise_url
WHERE id = :user_id;
""",
{
"username": username,
"hash": hash,
"mfa_apprise_url": mfa_apprise_url or None,
"user_id": user_id
}
)