diff --git a/MIND.py b/MIND.py index 1c7f3f3..0081be8 100644 --- a/MIND.py +++ b/MIND.py @@ -11,7 +11,7 @@ from typing import NoReturn, Union from backend.base.custom_exceptions import InvalidKeyValue from backend.base.definitions import Constants, StartType -from backend.base.helpers import check_python_version, get_python_exe +from backend.base.helpers import check_min_python_version, get_python_exe from backend.base.logging import LOGGER, setup_logging from backend.features.reminder_handler import ReminderHandler from backend.features.tz_shifter import TimezoneChangeHandler @@ -57,7 +57,7 @@ def _main( setup_logging(log_folder) LOGGER.info('Starting up MIND') - if not check_python_version(): + if not check_min_python_version(*Constants.MIN_PYTHON_VERSION): exit(1) set_db_location(db_folder) @@ -163,7 +163,12 @@ def _run_sub_process( "MIND_START_TYPE": str(start_type.value) } - comm = [get_python_exe(), "-u", __file__] + argv[1:] + py_exe = get_python_exe() + if not py_exe: + print("ERROR: Python executable not found") + return 1 + + comm = [py_exe, "-u", __file__] + argv[1:] proc = Popen( comm, env=env diff --git a/backend/base/definitions.py b/backend/base/definitions.py index ae637a5..dd600f8 100644 --- a/backend/base/definitions.py +++ b/backend/base/definitions.py @@ -36,6 +36,8 @@ Serialisable = Union[ # region Constants class Constants: + MIN_PYTHON_VERSION = (3, 8, 0) + SUB_PROCESS_TIMEOUT = 20.0 # seconds HOSTING_THREADS = 10 diff --git a/backend/base/helpers.py b/backend/base/helpers.py index 537bd19..53a5a69 100644 --- a/backend/base/helpers.py +++ b/backend/base/helpers.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ -General "helper" function and classes +General "helper" functions and classes """ from base64 import urlsafe_b64encode @@ -9,12 +9,11 @@ from datetime import datetime from hashlib import pbkdf2_hmac from logging import WARNING from os import makedirs, scandir, symlink -from os.path import abspath, dirname, exists, join, splitext +from os.path import abspath, dirname, exists, isfile, join, splitext from secrets import token_bytes from shutil import copy2, move from sys import base_exec_prefix, executable, platform, version_info -from typing import (Any, Callable, Generator, Iterable, - List, Sequence, Set, Tuple, Union, cast) +from typing import Callable, Iterable, List, Sequence, Set, Tuple, Union, cast from apprise import Apprise, LogCapture from cron_converter import Cron @@ -22,44 +21,75 @@ from dateutil.relativedelta import relativedelta from backend.base.definitions import (WEEKDAY_NUMBER, GeneralReminderData, RepeatQuantity, SendResult, T, U) +from backend.base.logging import LOGGER # region Python def get_python_version() -> str: - """Get python version as string + """Get the Python version as a string. E.g. `"3.8.10.final.0"`. Returns: - str: The python version + str: The Python version. """ return ".".join( str(i) for i in list(version_info) ) -def check_python_version() -> bool: - """Check if the python version that is used is a minimum version. +def check_min_python_version( + min_major: int, + min_minor: int, + min_micro: int +) -> bool: + """Check whether the version of Python that is used is equal or higher than + the version given. Will log a critical error if not. + + ``` + # On Python3.9.1 + >>> check_min_python_version(3, 8, 2) + True + >>> check_min_python_version(3, 10, 0) + False + ``` + + Args: + min_major (int): The minimum major version. + min_minor (int): The minimum minor version. + min_micro (int): The miminum micro version. Returns: - bool: Whether or not the python version is version 3.8 or above or not. + bool: Whether it's equal or higher than the version given or below it. """ - if not (version_info.major == 3 and version_info.minor >= 8): - from backend.base.logging import LOGGER + min_version = ( + min_major, + min_minor, + min_micro + ) + current_version = ( + version_info.major, + version_info.minor, + version_info.micro + ) + + if current_version < min_version: LOGGER.critical( - 'The minimum python version required is python3.8 ' - '(currently ' + str(version_info.major) + '.' + - str(version_info.minor) + '.' + str(version_info.micro) + ').' + "The minimum python version required is python" + + ".".join(map(str, min_version)) + + " (currently " + ".".join(map(str, current_version)) + ")." ) return False + return True -def get_python_exe() -> str: - """Get the path to the python executable. +def get_python_exe() -> Union[str, None]: + """Get the absolute filepath to the python executable. Returns: - str: The python executable path. + Union[str, None]: The python executable path, or `None` if not found. """ if platform.startswith('darwin'): + filepath = None bundle_path = join( base_exec_prefix, "Resources", @@ -70,43 +100,35 @@ def get_python_exe() -> str: ) if exists(bundle_path): from tempfile import mkdtemp - python_path = join(mkdtemp(), "python") - symlink(bundle_path, python_path) + filepath = join(mkdtemp(), "python") + symlink(bundle_path, filepath) + else: + filepath = executable or None - return python_path + if filepath and not isfile(filepath): + filepath = None - return executable + return filepath # region Generic -def reversed_tuples( - i: Iterable[Tuple[T, U]] -) -> Generator[Tuple[U, T], Any, Any]: - """Yield sub-tuples in reversed order. - - Args: - i (Iterable[Tuple[T, U]]): Iterator. - - Yields: - Generator[Tuple[U, T], Any, Any]: Sub-tuple with reversed order. - """ - for entry_1, entry_2 in i: - yield entry_2, entry_1 - - -def first_of_column( - columns: Iterable[Sequence[T]] +def first_of_subarrays( + subarrays: Iterable[Sequence[T]] ) -> List[T]: """Get the first element of each sub-array. + ``` + >>> first_of_subarrays([[1, 2], [3, 4]]) + [1, 3] + ``` + Args: - columns (Iterable[Sequence[T]]): List of - sub-arrays. + subarrays (Iterable[Sequence[T]]): List of sub-arrays. Returns: List[T]: List with first value of each sub-array. """ - return [e[0] for e in columns] + return [e[0] for e in subarrays] def when_not_none( @@ -137,7 +159,7 @@ def search_filter(query: str, result: GeneralReminderData) -> bool: result (GeneralReminderData): The library result to check. Returns: - bool: Whether or not the result passes the filter. + bool: Whether the result passes the filter. """ query = query.lower().replace(' ', '') return ( @@ -149,14 +171,14 @@ def search_filter(query: str, result: GeneralReminderData) -> bool: # region Security def get_hash(salt: bytes, data: str) -> bytes: - """Hash a string using the supplied salt + """Hash a string using the supplied salt. Args: - salt (bytes): The salt to use when hashing - data (str): The data to hash + salt (bytes): The salt to use when hashing. + data (str): The data to hash. Returns: - bytes: The b64 encoded hash of the supplied string + bytes: The b64 encoded hash of the supplied string. """ return urlsafe_b64encode( pbkdf2_hmac('sha256', data.encode(), salt, 100_000) @@ -164,13 +186,13 @@ def get_hash(salt: bytes, data: str) -> bytes: def generate_salt_hash(password: str) -> Tuple[bytes, bytes]: - """Generate a salt and get the hash of the password + """Generate a salt and get the hash of the password. Args: - password (str): The password to generate for + password (str): The password to generate for. Returns: - Tuple[bytes, bytes]: The salt (1) and hashed_password (2) + Tuple[bytes, bytes]: The salt (1) and hashed_password (2). """ salt = token_bytes() hashed_password = get_hash(salt, password) @@ -183,19 +205,19 @@ def send_apprise_notification( title: str, text: Union[str, None] = None ) -> SendResult: - """Send a notification to all Apprise URL's given. + """Send a notification to all Apprise URLs given. Args: - urls (List[str]): The Apprise URL's to send the notification to. + urls (List[str]): The Apprise URLs to send the notification to. title (str): The title of the notification. text (Union[str, None], optional): The optional body of the - notification. + notification. Defaults to None. Returns: - SendResult: Whether or not it was successful. + SendResult: Whether Apprise was successful. """ a = Apprise() @@ -219,23 +241,30 @@ def send_apprise_notification( # region Time def next_selected_day( - weekdays: List[WEEKDAY_NUMBER], - weekday: WEEKDAY_NUMBER + allowed_weekdays: List[WEEKDAY_NUMBER], + current_weekday: WEEKDAY_NUMBER ) -> WEEKDAY_NUMBER: """Find the next allowed day in the week. + ``` + >>> next_selected_day([0, 4, 6], 4) + 6 + >>> next_selected_day([0, 4, 6], 6) + 0 + ``` + Args: - weekdays (List[WEEKDAY_NUMBER]): The days of the week that are allowed. - Monday is 0, Sunday is 6. - weekday (WEEKDAY_NUMBER): The current weekday. + allowed_weekdays (List[WEEKDAY_NUMBER]): The days of the week that are + allowed. Monday is 0, Sunday is 6. + current_weekday (WEEKDAY_NUMBER): The current weekday. Returns: WEEKDAY_NUMBER: The next allowed weekday. """ - for d in weekdays: - if weekday < d: + for d in allowed_weekdays: + if current_weekday < d: return d - return weekdays[0] + return allowed_weekdays[0] def find_next_time( @@ -245,19 +274,20 @@ def find_next_time( weekdays: Union[List[WEEKDAY_NUMBER], None], cron_schedule: Union[str, None] ) -> int: - """Calculate the next timestep based on original time and repeat/interval + """Calculate the next timestamp based on original time and repeat/interval values. Args: original_time (int): The original time of the repeating timestamp. repeat_quantity (Union[RepeatQuantity, None]): If set, what the quantity - is of the repetition. + is of the time interval. - repeat_interval (Union[int, None]): If set, the value of the repetition. + repeat_interval (Union[int, None]): If set, the value of the time + interval. weekdays (Union[List[WEEKDAY_NUMBER], None]): If set, on which days the - time can continue. Monday is 0, Sunday is 6. + timestamp can be. Monday is 0, Sunday is 6. cron_schedule (Union[str, None]): If set, the cron schedule to follow. @@ -326,12 +356,12 @@ def find_next_time( ) result = int(new_time.timestamp()) - # LOGGER.debug( - # f'{original_datetime=}, {current_time=} ' + - # f'and interval of {repeat_interval} {repeat_quantity} ' + - # f'and weekdays {weekdays} ' + - # f'leads to {result}' - # ) + LOGGER.debug( + f'{original_datetime=}, {current_time=} ' + + f'and interval of {repeat_interval} {repeat_quantity} ' + + f'and weekdays {weekdays} ' + + f'leads to {result}' + ) return result @@ -356,7 +386,7 @@ def list_files(folder: str, ext: Iterable[str] = []) -> List[str]: folder (str): The base folder to search through. ext (Iterable[str], optional): File extensions to only include. - Dot-prefix not necessary. + Dot-prefix not necessary. Let empty to allow all extensions. Defaults to []. Returns: @@ -391,10 +421,9 @@ def list_files(folder: str, ext: Iterable[str] = []) -> List[str]: return files -def create_folder( - folder: str -) -> None: - """Create a folder, if it doesn't exist already. +def create_folder(folder: str) -> None: + """Create a folder. Also creates any parent folders if they don't exist + already. Allows folder to already exist. Args: folder (str): The path to the folder to create. @@ -462,11 +491,17 @@ def rename_file( # region Classes class Singleton(type): + """ + Make each initialisation of a class return the same instance by setting + this as the metaclass. Works across threads, but not spawned subprocesses. + """ + _instances = {} - def __call__(cls, *args: Any, **kwargs: Any): - c = str(cls) - if c not in cls._instances: - cls._instances[c] = super().__call__(*args, **kwargs) + def __call__(cls, *args, **kwargs): + c_term = cls.__module__ + '.' + cls.__name__ - return cls._instances[c] + if c_term not in cls._instances: + cls._instances[c_term] = super().__call__(*args, **kwargs) + + return cls._instances[c_term] diff --git a/backend/base/logging.py b/backend/base/logging.py index ff5a6a9..4aa80ca 100644 --- a/backend/base/logging.py +++ b/backend/base/logging.py @@ -6,7 +6,6 @@ from os.path import exists, isdir, join from typing import Any, Union from backend.base.definitions import Constants -from backend.base.helpers import create_folder, folder_path class UpToInfoFilter(logging.Filter): @@ -92,6 +91,8 @@ def setup_logging(log_folder: Union[str, None]) -> None: Raises: ValueError: The given log folder is not a folder. """ + from backend.base.helpers import create_folder, folder_path + if log_folder: if exists(log_folder) and not isdir(log_folder): raise ValueError("Logging folder is not a folder") @@ -137,7 +138,11 @@ def setup_logging(log_folder: Union[str, None]) -> None: def get_log_filepath() -> str: - "Get the filepath to the logging file" + """Get the filepath to the logging file. + + Returns: + str: The filepath. + """ return LOGGING_CONFIG["handlers"]["file"]["filename"] diff --git a/backend/internals/db_models.py b/backend/internals/db_models.py index d9ae1ca..43c9cce 100644 --- a/backend/internals/db_models.py +++ b/backend/internals/db_models.py @@ -5,7 +5,7 @@ from typing import List, Union from backend.base.definitions import (NotificationServiceData, ReminderData, ReminderType, StaticReminderData, TemplateData, UserData) -from backend.base.helpers import first_of_column +from backend.base.helpers import first_of_subarrays from backend.internals.db import REMINDER_TO_KEY, get_db @@ -117,7 +117,7 @@ class ReminderServicesDB: List[int]: A list of the notification service ID's that are linked to the given reminder, static reminder or template. """ - result = first_of_column(get_db().execute( + result = first_of_subarrays(get_db().execute( f""" SELECT notification_service_id FROM reminder_services @@ -183,7 +183,7 @@ class ReminderServicesDB: List[int]: The ID's of the reminders (only of the given type) that use the notification service. """ - return first_of_column(get_db().execute( + return first_of_subarrays(get_db().execute( f""" SELECT {self.key} FROM reminder_services diff --git a/backend/internals/settings.py b/backend/internals/settings.py index a79c75c..545da3e 100644 --- a/backend/internals/settings.py +++ b/backend/internals/settings.py @@ -9,8 +9,7 @@ from typing import Any, Dict, Mapping from backend.base.custom_exceptions import InvalidKeyValue, KeyNotFound from backend.base.definitions import Constants -from backend.base.helpers import (Singleton, folder_path, - get_python_version, reversed_tuples) +from backend.base.helpers import Singleton, folder_path, get_python_version from backend.base.logging import LOGGER, set_log_level from backend.internals.db import DBConnection, commit, get_db from backend.internals.db_migration import get_latest_db_version @@ -149,7 +148,7 @@ class Settings(metaclass=Singleton): get_db().executemany( "UPDATE config SET value = ? WHERE key = ?;", - reversed_tuples(formatted_data.items()) + ((v, k) for k, v in formatted_data.items()) ) old_settings = self.get_settings()