Files
MIND/backend/db.py

522 lines
13 KiB
Python

#-*- coding: utf-8 -*-
"""
Setting up and interacting with the database.
"""
from datetime import datetime
from os import makedirs, remove
from os.path import dirname, isfile, join
from shutil import move
from sqlite3 import Connection, OperationalError, ProgrammingError, Row
from threading import current_thread, main_thread
from time import time
from typing import Type, Union
from flask import g
from backend.custom_exceptions import (AccessUnauthorized, InvalidDatabaseFile,
UserNotFound)
from backend.helpers import RestartVars, folder_path
from backend.logging import LOGGER, set_log_level
DB_FILENAME = 'db', 'MIND.db'
__DATABASE_VERSION__ = 10
__DATEBASE_NAME_ORIGINAL__ = "MIND_original.db"
class DB_Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
i = f'{cls}{current_thread()}'
if (i not in cls._instances
or cls._instances[i].closed):
cls._instances[i] = super(DB_Singleton, cls).__call__(*args, **kwargs)
return cls._instances[i]
class DBConnection(Connection, metaclass=DB_Singleton):
file = ''
def __init__(self, timeout: float) -> None:
LOGGER.debug(f'Creating connection {self}')
super().__init__(self.file, timeout=timeout)
super().cursor().execute("PRAGMA foreign_keys = ON;")
self.closed = False
return
def close(self) -> None:
LOGGER.debug(f'Closing connection {self}')
self.closed = True
super().close()
return
def __repr__(self) -> str:
return f'<{self.__class__.__name__}; {current_thread().name}; {id(self)}>'
def setup_db_location() -> None:
"""Create folder for database and link file to DBConnection class
"""
if isfile(folder_path('db', 'Noted.db')):
move(folder_path('db', 'Noted.db'), folder_path(*DB_FILENAME))
db_location = folder_path(*DB_FILENAME)
makedirs(dirname(db_location), exist_ok=True)
DBConnection.file = db_location
return
def get_db(output_type: Union[Type[dict], Type[tuple]]=tuple):
"""Get a database cursor instance. Coupled to Flask's g.
Args:
output_type (Union[Type[dict], Type[tuple]], optional):
The type of output: a tuple or dictionary with the row values.
Defaults to tuple.
Returns:
Cursor: The Cursor instance to use
"""
try:
cursor = g.cursor
except AttributeError:
db = DBConnection(timeout=20.0)
cursor = g.cursor = db.cursor()
if output_type is dict:
cursor.row_factory = Row
else:
cursor.row_factory = None
return g.cursor
def close_db(e=None) -> None:
"""Savely closes the database connection
"""
try:
cursor = g.cursor
db: DBConnection = cursor.connection
cursor.close()
delattr(g, 'cursor')
db.commit()
if current_thread() is main_thread():
db.close()
except (AttributeError, ProgrammingError):
pass
return
def migrate_db(current_db_version: int) -> None:
"""
Migrate a MIND database from it's current version
to the newest version supported by the MIND version installed.
"""
LOGGER.info('Migrating database to newer version...')
cursor = get_db()
if current_db_version == 1:
# V1 -> V2
t = time()
utc_offset = datetime.fromtimestamp(t) - datetime.utcfromtimestamp(t)
cursor.execute("SELECT time, id FROM reminders;")
new_reminders = []
new_reminders_append = new_reminders.append
for reminder in cursor:
new_reminders_append([round((datetime.fromtimestamp(reminder[0]) - utc_offset).timestamp()), reminder[1]])
cursor.executemany("UPDATE reminders SET time = ? WHERE id = ?;", new_reminders)
current_db_version = 2
if current_db_version == 2:
# V2 -> V3
cursor.executescript("""
ALTER TABLE reminders
ADD color VARCHAR(7);
ALTER TABLE templates
ADD color VARCHAR(7);
""")
current_db_version = 3
if current_db_version == 3:
# V3 -> V4
cursor.executescript("""
UPDATE reminders
SET repeat_quantity = repeat_quantity || 's'
WHERE repeat_quantity NOT LIKE '%s';
""")
current_db_version = 4
if current_db_version == 4:
# V4 -> V5
cursor.executescript("""
BEGIN TRANSACTION;
PRAGMA defer_foreign_keys = ON;
CREATE TEMPORARY TABLE temp_reminder_services(
reminder_id,
static_reminder_id,
template_id,
notification_service_id
);
-- Reminders
INSERT INTO temp_reminder_services(reminder_id, notification_service_id)
SELECT id, notification_service
FROM reminders;
CREATE TEMPORARY TABLE temp_reminders AS
SELECT id, user_id, title, text, time, repeat_quantity, repeat_interval, original_time, color
FROM reminders;
DROP TABLE reminders;
CREATE TABLE reminders(
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
title VARCHAR(255) NOT NULL,
text TEXT,
time INTEGER NOT NULL,
repeat_quantity VARCHAR(15),
repeat_interval INTEGER,
original_time INTEGER,
color VARCHAR(7),
FOREIGN KEY (user_id) REFERENCES users(id)
);
INSERT INTO reminders
SELECT * FROM temp_reminders;
-- Templates
INSERT INTO temp_reminder_services(template_id, notification_service_id)
SELECT id, notification_service
FROM templates;
CREATE TEMPORARY TABLE temp_templates AS
SELECT id, user_id, title, text, color
FROM templates;
DROP TABLE templates;
CREATE TABLE templates(
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
title VARCHAR(255) NOT NULL,
text TEXT,
color VARCHAR(7),
FOREIGN KEY (user_id) REFERENCES users(id)
);
INSERT INTO templates
SELECT * FROM temp_templates;
INSERT INTO reminder_services
SELECT * FROM temp_reminder_services;
COMMIT;
""")
current_db_version = 5
if current_db_version == 5:
# V5 -> V6
from backend.users import User
try:
User('User1', 'Password1').delete()
except (UserNotFound, AccessUnauthorized):
pass
current_db_version = 6
if current_db_version == 6:
# V6 -> V7
cursor.executescript("""
ALTER TABLE reminders
ADD weekdays VARCHAR(13);
""")
current_db_version = 7
if current_db_version == 7:
# V7 -> V8
from backend.settings import _format_setting, default_settings
from backend.users import Users
cursor.executescript("""
DROP TABLE config;
CREATE TABLE IF NOT EXISTS config(
key VARCHAR(255) PRIMARY KEY,
value BLOB NOT NULL
);
"""
)
cursor.executemany("""
INSERT OR IGNORE INTO config(key, value)
VALUES (?, ?);
""",
map(
lambda kv: (kv[0], _format_setting(*kv)),
default_settings.items()
)
)
cursor.executescript("""
ALTER TABLE users
ADD admin BOOL NOT NULL DEFAULT 0;
UPDATE users
SET username = 'admin_old'
WHERE username = 'admin';
""")
Users().add('admin', 'admin', True)
cursor.execute("""
UPDATE users
SET admin = 1
WHERE username = 'admin';
""")
current_db_version = 8
if current_db_version == 8:
# V8 -> V9
from backend.settings import set_setting
from MIND import HOST, PORT, URL_PREFIX
set_setting('host', HOST)
set_setting('port', int(PORT))
set_setting('url_prefix', URL_PREFIX)
current_db_version = 9
if current_db_version == 9:
# V9 -> V10
# Nothing is changed in the database
# It's just that this code needs to run once
# and the DB migration system does exactly that:
# run pieces of code once.
from backend.settings import update_manifest
url_prefix: str = cursor.execute(
"SELECT value FROM config WHERE key = 'url_prefix' LIMIT 1;"
).fetchone()[0]
update_manifest(url_prefix)
current_db_version = 10
return
def setup_db() -> None:
"""Setup the database
"""
from backend.settings import (_format_setting, default_settings, get_setting,
set_setting, update_manifest)
from backend.users import Users
cursor = get_db()
cursor.execute("PRAGMA journal_mode = wal;")
cursor.executescript("""
CREATE TABLE IF NOT EXISTS users(
id INTEGER PRIMARY KEY,
username VARCHAR(255) UNIQUE NOT NULL,
salt VARCHAR(40) NOT NULL,
hash VARCHAR(100) NOT NULL,
admin BOOL NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS notification_services(
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
title VARCHAR(255),
url TEXT,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS reminders(
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
title VARCHAR(255) NOT NULL,
text TEXT,
time INTEGER NOT NULL,
repeat_quantity VARCHAR(15),
repeat_interval INTEGER,
original_time INTEGER,
weekdays VARCHAR(13),
color VARCHAR(7),
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS templates(
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
title VARCHAR(255) NOT NULL,
text TEXT,
color VARCHAR(7),
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS static_reminders(
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
title VARCHAR(255) NOT NULL,
text TEXT,
color VARCHAR(7),
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS reminder_services(
reminder_id INTEGER,
static_reminder_id INTEGER,
template_id INTEGER,
notification_service_id INTEGER NOT NULL,
FOREIGN KEY (reminder_id) REFERENCES reminders(id)
ON DELETE CASCADE,
FOREIGN KEY (static_reminder_id) REFERENCES static_reminders(id)
ON DELETE CASCADE,
FOREIGN KEY (template_id) REFERENCES templates(id)
ON DELETE CASCADE,
FOREIGN KEY (notification_service_id) REFERENCES notification_services(id)
);
CREATE TABLE IF NOT EXISTS config(
key VARCHAR(255) PRIMARY KEY,
value BLOB NOT NULL
);
""")
cursor.executemany("""
INSERT OR IGNORE INTO config(key, value)
VALUES (?, ?);
""",
map(
lambda kv: (kv[0], _format_setting(*kv)),
default_settings.items()
)
)
set_log_level(get_setting('log_level'), clear_file=False)
update_manifest(get_setting('url_prefix'))
current_db_version = get_setting('database_version')
if current_db_version < __DATABASE_VERSION__:
LOGGER.debug(
f'Database migration: {current_db_version} -> {__DATABASE_VERSION__}'
)
migrate_db(current_db_version)
set_setting('database_version', __DATABASE_VERSION__)
users = Users()
if not 'admin' in users:
users.add('admin', 'admin', True)
cursor.execute("""
UPDATE users
SET admin = 1
WHERE username = 'admin';
""")
return
def revert_db_import(
swap: bool,
imported_db_file: str = ''
) -> None:
"""Revert the database import process. The original_db_file is the file
currently used (`DBConnection.file`).
Args:
swap (bool): Whether or not to keep the imported_db_file or not,
instead of the original_db_file.
imported_db_file (str, optional): The other database file. Keep empty
to use `__DATABASE_NAME_ORIGINAL__`. Defaults to ''.
"""
original_db_file = DBConnection.file
if not imported_db_file:
imported_db_file = join(dirname(DBConnection.file), __DATEBASE_NAME_ORIGINAL__)
if swap:
remove(original_db_file)
move(
imported_db_file,
original_db_file
)
else:
remove(imported_db_file)
return
def import_db(
new_db_file: str,
copy_hosting_settings: bool
) -> None:
"""Replace the current database with a new one.
Args:
new_db_file (str): The path to the new database file.
copy_hosting_settings (bool): Keep the hosting settings from the current
database.
Raises:
InvalidDatabaseFile: The new database file is invalid or unsupported.
"""
LOGGER.info(f'Importing new database; {copy_hosting_settings=}')
try:
cursor = Connection(new_db_file, timeout=20.0).cursor()
database_version = cursor.execute(
"SELECT value FROM config WHERE key = 'database_version' LIMIT 1;"
).fetchone()[0]
if not isinstance(database_version, int):
raise InvalidDatabaseFile
except (OperationalError, InvalidDatabaseFile):
LOGGER.error('Uploaded database is not a MIND database file')
cursor.connection.close()
revert_db_import(
swap=False,
imported_db_file=new_db_file
)
raise InvalidDatabaseFile
if database_version > __DATABASE_VERSION__:
LOGGER.error('Uploaded database is higher version than this MIND installation can support')
revert_db_import(
swap=False,
imported_db_file=new_db_file
)
raise InvalidDatabaseFile
if copy_hosting_settings:
hosting_settings = get_db().execute("""
SELECT key, value, value
FROM config
WHERE key = 'host'
OR key = 'port'
OR key = 'url_prefix'
LIMIT 3;
"""
)
cursor.executemany("""
INSERT INTO config(key, value)
VALUES (?, ?)
ON CONFLICT(key) DO
UPDATE
SET value = ?;
""",
hosting_settings
)
cursor.connection.commit()
cursor.connection.close()
move(
DBConnection.file,
join(dirname(DBConnection.file), __DATEBASE_NAME_ORIGINAL__)
)
move(
new_db_file,
DBConnection.file
)
from backend.server import SERVER
SERVER.restart([RestartVars.DB_IMPORT.value])
return