mirror of
https://github.com/Casvt/MIND.git
synced 2026-02-19 11:54:46 -05:00
699 lines
20 KiB
Python
699 lines
20 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from datetime import datetime
|
|
from io import BytesIO
|
|
from os import remove
|
|
from os.path import basename
|
|
from time import time as epoch_time
|
|
from typing import TYPE_CHECKING, Any, Dict, Tuple, cast
|
|
|
|
from flask import after_this_request, g as flask_g, request, send_file
|
|
|
|
from backend.base.custom_exceptions import (AccessUnauthorized, APIKeyExpired,
|
|
APIKeyInvalid, MFACodeRequired)
|
|
from backend.base.definitions import (MISSING, Constants, Interval,
|
|
SendResult, StartType, UserData)
|
|
from backend.base.helpers import (folder_path, generate_api_key,
|
|
generate_mfa_code, hash_api_key, return_api,
|
|
send_apprise_notification)
|
|
from backend.base.logging import LOGGER, get_log_file_contents
|
|
from backend.implementations.apprise_parser import get_apprise_services
|
|
from backend.implementations.notification_services import NotificationServices
|
|
from backend.implementations.reminders import Reminders
|
|
from backend.implementations.static_reminders import StaticReminders
|
|
from backend.implementations.templates import Templates
|
|
from backend.implementations.users import Users
|
|
from backend.internals.db_backup_import import (create_database_copy,
|
|
get_backup, get_backups,
|
|
import_db, import_db_backup)
|
|
from backend.internals.server import Server, StartTypeHandlers
|
|
from backend.internals.settings import Settings, get_about_data
|
|
from frontend.input_validation import (AboutData, AuthLoginData,
|
|
AuthLogoutData, AuthStatusData,
|
|
AvailableNotificationServicesData,
|
|
BackupData, BackupsData, DatabaseData,
|
|
LogfileData, NotificationServiceData,
|
|
NotificationServicesData,
|
|
PublicSettingsData, ReminderData,
|
|
RemindersData, RestartData,
|
|
SearchRemindersData,
|
|
SearchStaticRemindersData,
|
|
SearchTemplatesData, SettingsData,
|
|
ShutdownData, StaticReminderData,
|
|
StaticRemindersData, TemplateData,
|
|
TemplatesData,
|
|
TestNotificationServiceURLData,
|
|
TestRemindersData, UserManagementData,
|
|
UsersAddData, UsersData,
|
|
UsersManagementData, admin_api, api,
|
|
get_api_docs, input_validation)
|
|
|
|
# region Auth Management and Input
|
|
# Users instance shared by the auth and user endpoints in this file.
users = Users()

if not TYPE_CHECKING:
    # At runtime `g` is Flask's own `g` object, unchanged.
    g = flask_g

else:
    class TypedAppCtxGlobals:
        # Attributes that the code in this file stores on and reads from `g`;
        # declared only so type checkers know about them.
        hashed_api_key: str
        exp: int
        user_data: UserData
        inputs: Dict[str, Any]

    g = cast(TypedAppCtxGlobals, flask_g)
|
|
|
|
|
|
class AuthManager:
    """Bookkeeping of issued API keys and pending MFA codes.

    All state lives in class attributes, so it is shared by every user of
    the class.
    """

    # Epoch time before which `cleanup` returns without doing any work.
    _next_run: int = 0
    # Hashed API key -> (data of the user it belongs to, expiry epoch time).
    api_key_map: Dict[str, Tuple[UserData, int]] = {}
    # User ID -> (MFA code sent to that user, expiry epoch time of the code).
    mfa_code_map: Dict[int, Tuple[str, int]] = {}

    @classmethod
    def cleanup(cls) -> None:
        """Cleans up expired API keys and MFA codes.

        Does nothing when the previous run was less than
        `Constants.API_KEY_CLEANUP_INTERVAL` ago.
        """
        now = int(epoch_time())
        if now < cls._next_run:
            return

        cls._next_run = now + Constants.API_KEY_CLEANUP_INTERVAL

        # Collect first, delete afterwards: a dict must not change size
        # while it is being iterated.
        expired_api_keys = [
            k
            for k, (_, exp) in cls.api_key_map.items()
            # Allow one day to respond with expired key
            # instead of invalid key
            if exp + Interval.ONE_DAY.value <= now
        ]
        for k in expired_api_keys:
            del cls.api_key_map[k]

        expired_mfa_codes = [
            k
            for k, (_, exp) in cls.mfa_code_map.items()
            if exp <= now
        ]
        for k in expired_mfa_codes:
            del cls.mfa_code_map[k]

        return

    @classmethod
    def remove_user(cls, user_id: int) -> None:
        """Removes every API key and any pending MFA code of a user.

        A user can hold several API keys at once (one per login), so all
        matching entries are removed instead of only the first one found.

        Args:
            user_id (int): The ID of the user whose entries to remove.
        """
        keys_of_user = [
            key
            for key, (user_data, _) in cls.api_key_map.items()
            if user_data.id == user_id
        ]
        for key in keys_of_user:
            del cls.api_key_map[key]

        # A code that was sent but not used yet must not outlive the user.
        cls.mfa_code_map.pop(user_id, None)
        return
|
|
|
|
|
|
def auth() -> None:
    """Checks if the client is logged in.

    The API key is read from the request values. On success, the hashed
    key, the user data and the expiry time are stored on `g`.

    Raises:
        APIKeyInvalid: The api key supplied is invalid, or it may not be
        used for the requested path.
        APIKeyExpired: The api key supplied has expired.
    """
    api_key = request.values.get('api_key', '')
    hashed_api_key = hash_api_key(api_key)

    if hashed_api_key not in AuthManager.api_key_map:
        raise APIKeyInvalid(api_key)

    user_data, exp = AuthManager.api_key_map[hashed_api_key]

    # Keys of admins are only accepted on the admin and auth endpoints
    if (
        user_data.admin
        and not request.path.startswith(
            (Constants.ADMIN_PREFIX, Constants.API_PREFIX + '/auth')
        )
    ):
        raise APIKeyInvalid(api_key)

    # Keys of normal users are never accepted on the admin endpoints
    if (
        not user_data.admin
        and request.path.startswith(Constants.ADMIN_PREFIX)
    ):
        raise APIKeyInvalid(api_key)

    if exp <= epoch_time():
        del AuthManager.api_key_map[hashed_api_key]
        raise APIKeyExpired(api_key)

    # Api key valid
    sv = Settings().get_settings()
    if sv.login_time_reset:
        exp = int(epoch_time()) + sv.login_time
        # Store the new expiry. Without this the extension would only be
        # reported via `g.exp`, while the key would still expire at the
        # time that was set when it was issued.
        AuthManager.api_key_map[hashed_api_key] = (user_data, exp)

    g.hashed_api_key = hashed_api_key
    g.user_data = user_data
    g.exp = exp

    return
|
|
|
|
|
|
@api.before_request
@admin_api.before_request
def api_auth_and_input_validation() -> None:
    """Authenticate the request if its endpoint requires it, then store the
    result of `input_validation()` in `g.inputs`.
    """
    endpoint_docs = get_api_docs(request)
    if endpoint_docs.requires_auth:
        auth()

    g.inputs = input_validation()
    return
|
|
|
|
|
|
# region Auth
|
|
@api.route('/auth/login', AuthLoginData)
def api_login():
    """Logs a user in and issues a new API key.

    When the user has an MFA Apprise URL set, a login without a code sends
    a code to that URL, and the login has to be repeated with the code.

    Raises:
        MFACodeRequired: A code was sent and has to be supplied.
        AccessUnauthorized: The supplied MFA code is wrong or has expired.
    """
    user_data = users.login(g.inputs['username'], g.inputs['password']).get()

    # Credentials valid
    if user_data.mfa_apprise_url:
        if g.inputs['mfa_code'] is not None:
            # Validate code
            mfa_code, exp = AuthManager.mfa_code_map.get(
                user_data.id, ('', 0)
            )
            if not (
                g.inputs['mfa_code'] == mfa_code
                and exp > epoch_time()
            ):
                raise AccessUnauthorized()

            # A code is single-use: drop it so that it can't be replayed
            # for another login while it hasn't timed out yet.
            AuthManager.mfa_code_map.pop(user_data.id, None)

        else:
            mfa_code = generate_mfa_code()

            AuthManager.mfa_code_map[user_data.id] = (
                mfa_code, int(epoch_time()) + Constants.MFA_CODE_TIMEOUT
            )

            send_apprise_notification(
                [user_data.mfa_apprise_url],
                "MIND MFA Login Code",
                f"Your login code is: {mfa_code}"
            )

            raise MFACodeRequired()

    # Login successful

    StartTypeHandlers.diffuse_timer(StartType.RESTART_DB_CHANGES)
    StartTypeHandlers.diffuse_timer(StartType.RESTART_HOSTING_CHANGES)
    AuthManager.cleanup()

    # Generate an API key until one is generated that isn't used already
    while True:
        api_key = generate_api_key()
        hashed_api_key = hash_api_key(api_key)
        if hashed_api_key not in AuthManager.api_key_map:
            break

    login_time = Settings().sv.login_time
    exp = int(epoch_time()) + login_time
    # Only the hash of the key is kept; the key itself goes to the client
    AuthManager.api_key_map[hashed_api_key] = (user_data, exp)

    result = {
        'api_key': api_key,
        'expires': exp,
        'admin': user_data.admin
    }
    return return_api(result, code=201)
|
|
|
|
|
|
@api.route('/auth/logout', AuthLogoutData)
def api_logout():
    """Log out by removing the API key of this request from the key map."""
    current_key = g.hashed_api_key
    del AuthManager.api_key_map[current_key]
    return return_api({}, code=201)
|
|
|
|
|
|
@api.route('/auth/status', AuthStatusData)
def api_status():
    """Report the expiry time of the API key in use, plus the username and
    admin flag of the user it belongs to.
    """
    current_user = g.user_data
    return return_api({
        'expires': g.exp,
        'username': current_user.username,
        'admin': current_user.admin
    })
|
|
|
|
|
|
# region Users
|
|
@api.route('/user/add', UsersAddData)
def api_add_user():
    """Add a user with the supplied username and password."""
    username = g.inputs['username']
    password = g.inputs['password']
    users.add(username, password)
    return return_api({}, code=201)
|
|
|
|
|
|
@api.route('/user', UsersData)
def api_manage_user():
    """Fetches, updates or deletes the user that the API key belongs to.

    GET responds with the user data. PUT applies the supplied changes and
    responds with the updated user data. DELETE removes the user and every
    API key registered for it.
    """
    user = users.get_one(g.user_data.id)

    if request.method == 'GET':
        result = user.get()
        return return_api(result.todict())

    elif request.method == 'PUT':
        new_username = g.inputs['new_username']
        new_password = g.inputs['new_password']
        new_mfa_apprise_url = g.inputs['new_mfa_apprise_url']

        if new_username:
            user.update_username(new_username)
        if new_password:
            user.update_password(new_password)
        # Compared to MISSING instead of tested for truthiness, so that
        # any value other than MISSING is passed on.
        if new_mfa_apprise_url != MISSING:
            user.update_mfa_apprise_url(new_mfa_apprise_url)

        return return_api(user.get().todict())

    elif request.method == 'DELETE':
        user.delete()

        # The user can be logged in more than once. Remove all of its API
        # keys, not only the one used for this request, so that no key of
        # a deleted account stays registered.
        user_id = g.user_data.id
        keys_of_user = [
            key
            for key, (user_data, _) in AuthManager.api_key_map.items()
            if user_data.id == user_id
        ]
        for key in keys_of_user:
            del AuthManager.api_key_map[key]

        return return_api({})
|
|
|
|
|
|
# region Notification Services
|
|
@api.route('/notificationservices', NotificationServicesData)
def api_notification_services_list():
    """List the notification services of the user (GET) or add one (POST)."""
    services = NotificationServices(g.user_data.id)

    if request.method == 'GET':
        return return_api([
            service.todict()
            for service in services.get_all()
        ])

    if request.method == 'POST':
        inputs = g.inputs
        new_service = services.add(
            title=inputs['title'],
            url=inputs['url']
        )
        return return_api(new_service.get().todict(), code=201)
|
|
|
|
|
|
@api.route('/notificationservices/available', AvailableNotificationServicesData)
def api_notification_service_available():
    """Respond with the output of `get_apprise_services()`."""
    return return_api(get_apprise_services())
|
|
|
|
|
|
@api.route('/notificationservices/test', TestNotificationServiceURLData)
def api_test_service():
    """Test the supplied notification service URL and report the outcome."""
    send_result = NotificationServices.test(g.inputs['url'])
    outcome = {
        'success': send_result == SendResult.SUCCESS,
        'description': send_result.value
    }
    return return_api(outcome, code=201)
|
|
|
|
|
|
@api.route('/notificationservices/<int:n_id>', NotificationServiceData)
def api_notification_service(n_id: int):
    """Fetch (GET), update (PUT) or delete (DELETE) one notification service
    of the user.

    Args:
        n_id (int): The ID of the notification service.
    """
    inputs = g.inputs
    service = NotificationServices(g.user_data.id).get_one(n_id)
    method = request.method

    if method == 'GET':
        return return_api(service.get().todict())

    if method == 'PUT':
        updated = service.update(
            title=inputs['title'],
            url=inputs['url']
        )
        return return_api(updated.todict())

    if method == 'DELETE':
        service.delete(inputs['delete_reminders_using'])
        return return_api({})
|
|
|
|
|
|
# region Reminders
|
|
@api.route('/reminders', RemindersData)
def api_reminders_list():
    """List the reminders of the user (GET) or add a reminder (POST)."""
    inputs = g.inputs
    reminders = Reminders(g.user_data.id)

    if request.method == 'GET':
        return return_api([
            reminder.todict()
            for reminder in reminders.get_all(inputs['sort_by'])
        ])

    if request.method == 'POST':
        # Every field is passed on under the name of its input key
        field_names = (
            'title', 'time', 'notification_services', 'text',
            'repeat_quantity', 'repeat_interval', 'weekdays',
            'cron_schedule', 'color', 'enabled'
        )
        new_reminder = reminders.add(
            **{name: inputs[name] for name in field_names}
        )
        return return_api(new_reminder.get().todict(), code=201)
|
|
|
|
|
|
@api.route('/reminders/search', SearchRemindersData)
def api_reminders_query():
    """Search the reminders of the user with the supplied query."""
    inputs = g.inputs
    matches = Reminders(g.user_data.id).search(
        inputs['query'], inputs['sort_by']
    )
    return return_api([match.todict() for match in matches])
|
|
|
|
|
|
@api.route('/reminders/test', TestRemindersData)
def api_test_reminder():
    """Run `test_reminder` with the supplied title, notification services
    and text, and report the outcome.
    """
    inputs = g.inputs
    reminders = Reminders(g.user_data.id)
    send_result = reminders.test_reminder(
        inputs['title'],
        inputs['notification_services'],
        inputs['text']
    )
    outcome = {
        'success': send_result == SendResult.SUCCESS,
        'description': send_result.value
    }
    return return_api(outcome, code=201)
|
|
|
|
|
|
@api.route('/reminders/<int:r_id>', ReminderData)
def api_get_reminder(r_id: int):
    """Fetch (GET), update (PUT) or delete (DELETE) one reminder of the user.

    Args:
        r_id (int): The ID of the reminder.
    """
    reminder = Reminders(g.user_data.id).get_one(r_id)
    method = request.method

    if method == 'GET':
        return return_api(reminder.get().todict())

    if method == 'PUT':
        inputs = g.inputs
        # Every field is passed on under the name of its input key
        field_names = (
            'title', 'time', 'notification_services', 'text',
            'repeat_quantity', 'repeat_interval', 'weekdays',
            'cron_schedule', 'color', 'enabled'
        )
        updated = reminder.update(
            **{name: inputs[name] for name in field_names}
        )
        return return_api(updated.todict())

    if method == 'DELETE':
        reminder.delete()
        return return_api({})
|
|
|
|
|
|
# region Templates
|
|
@api.route('/templates', TemplatesData)
def api_get_templates():
    """List the templates of the user (GET) or add a template (POST)."""
    inputs = g.inputs
    templates = Templates(g.user_data.id)

    if request.method == 'GET':
        return return_api([
            template.todict()
            for template in templates.get_all(inputs['sort_by'])
        ])

    if request.method == 'POST':
        new_template = templates.add(
            title=inputs['title'],
            notification_services=inputs['notification_services'],
            text=inputs['text'],
            color=inputs['color']
        )
        return return_api(new_template.get().todict(), code=201)
|
|
|
|
|
|
@api.route('/templates/search', SearchTemplatesData)
def api_templates_query():
    """Search the templates of the user with the supplied query."""
    inputs = g.inputs
    matches = Templates(g.user_data.id).search(
        inputs['query'], inputs['sort_by']
    )
    return return_api([match.todict() for match in matches])
|
|
|
|
|
|
@api.route('/templates/<int:t_id>', TemplateData)
def api_get_template(t_id: int):
    """Fetch (GET), update (PUT) or delete (DELETE) one template of the user.

    Args:
        t_id (int): The ID of the template.
    """
    template = Templates(g.user_data.id).get_one(t_id)
    method = request.method

    if method == 'GET':
        return return_api(template.get().todict())

    if method == 'PUT':
        inputs = g.inputs
        updated = template.update(
            title=inputs['title'],
            notification_services=inputs['notification_services'],
            text=inputs['text'],
            color=inputs['color']
        )
        return return_api(updated.todict())

    if method == 'DELETE':
        template.delete()
        return return_api({})
|
|
|
|
|
|
# region Static Reminders
|
|
@api.route('/staticreminders', StaticRemindersData)
def api_static_reminders_list():
    """List the static reminders of the user (GET) or add one (POST)."""
    inputs = g.inputs
    static_reminders = StaticReminders(g.user_data.id)

    if request.method == 'GET':
        return return_api([
            reminder.todict()
            for reminder in static_reminders.get_all(inputs['sort_by'])
        ])

    if request.method == 'POST':
        new_reminder = static_reminders.add(
            title=inputs['title'],
            notification_services=inputs['notification_services'],
            text=inputs['text'],
            color=inputs['color']
        )
        return return_api(new_reminder.get().todict(), code=201)
|
|
|
|
|
|
@api.route('/staticreminders/search', SearchStaticRemindersData)
def api_static_reminders_query():
    """Search the static reminders of the user with the supplied query."""
    inputs = g.inputs
    static_reminders = StaticReminders(g.user_data.id)
    matches = static_reminders.search(inputs['query'], inputs['sort_by'])
    return return_api([match.todict() for match in matches])
|
|
|
|
|
|
@api.route('/staticreminders/<int:s_id>', StaticReminderData)
def api_get_static_reminder(s_id: int):
    """Fetch (GET), trigger (POST), update (PUT) or delete (DELETE) one
    static reminder of the user.

    Args:
        s_id (int): The ID of the static reminder.
    """
    reminder = StaticReminders(g.user_data.id).get_one(s_id)
    method = request.method

    if method == 'GET':
        return return_api(reminder.get().todict())

    if method == 'POST':
        send_result = reminder.trigger_reminder()
        outcome = {
            'success': send_result == SendResult.SUCCESS,
            'description': send_result.value
        }
        return return_api(outcome, code=201)

    if method == 'PUT':
        inputs = g.inputs
        updated = reminder.update(
            title=inputs['title'],
            notification_services=inputs['notification_services'],
            text=inputs['text'],
            color=inputs['color']
        )
        return return_api(updated.todict())

    if method == 'DELETE':
        reminder.delete()
        return return_api({})
|
|
|
|
|
|
# region Admin Settings
|
|
@admin_api.route('/shutdown', ShutdownData)
def api_shutdown():
    """Call `Server().shutdown()` and respond with an empty result."""
    server = Server()
    server.shutdown()
    return return_api({})
|
|
|
|
|
|
@admin_api.route('/restart', RestartData)
def api_restart():
    """Call `Server().restart()` and respond with an empty result."""
    server = Server()
    server.restart()
    return return_api({})
|
|
|
|
|
|
@api.route('/settings', PublicSettingsData)
def api_settings():
    """Respond with the public settings."""
    public_settings = Settings().get_public_settings()
    return return_api(public_settings.todict())
|
|
|
|
|
|
@api.route('/about', AboutData)
def api_about():
    """Respond with the output of `get_about_data()`."""
    about = get_about_data()
    return return_api(about)
|
|
|
|
|
|
@admin_api.route('/settings', SettingsData)
def api_admin_settings():
    """Fetch (GET), change (PUT) or reset (DELETE) settings.

    When a PUT or DELETE changes the value of 'host', 'port' or
    'url_prefix', the hosting settings are backed up before the change and
    the server is restarted with `StartType.RESTART_HOSTING_CHANGES` after.
    """
    inputs = g.inputs
    settings = Settings()
    hosting_keys = ('host', 'port', 'url_prefix')

    if request.method == 'GET':
        return return_api(settings.get_public_settings().todict())

    if request.method == 'PUT':
        LOGGER.info(f'Submitting admin settings: {inputs}')

        # A hosting setting changes when a value is supplied for it
        # that differs from the current one
        hosting_changes = False
        for key in hosting_keys:
            if inputs[key] is None:
                continue
            if inputs[key] != getattr(settings.sv, key):
                hosting_changes = True
                break

        if hosting_changes:
            settings.backup_hosting_settings()

        # Inputs that are None are not passed on
        supplied = {
            key: value
            for key, value in inputs.items()
            if value is not None
        }
        settings.update(supplied, from_public=True)

        if hosting_changes:
            Server().restart(StartType.RESTART_HOSTING_CHANGES)

        return return_api({})

    if request.method == 'DELETE':
        # A hosting setting changes when it is reset
        # while its current value is not the default
        hosting_changes = False
        for key in hosting_keys:
            if key not in inputs["setting_keys"]:
                continue
            if settings.get_default_value(key) != getattr(settings.sv, key):
                hosting_changes = True
                break

        if hosting_changes:
            settings.backup_hosting_settings()

        for setting in inputs["setting_keys"]:
            settings.reset(setting, from_public=True)

        if hosting_changes:
            Server().restart(StartType.RESTART_HOSTING_CHANGES)

        return return_api({})
|
|
|
|
|
|
@admin_api.route('/logs', LogfileData)
def api_admin_logs():
    """Send the log file contents as a download, with the current date and
    time in the filename.
    """
    log_contents = get_log_file_contents()
    payload = BytesIO(log_contents.getvalue().encode('utf-8'))
    timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M")

    response = send_file(
        payload,
        mimetype="application/octet-stream",
        download_name=f'MIND_log_{timestamp}.txt'
    )
    return response, 200
|
|
|
|
|
|
# region Admin Users
|
|
@admin_api.route('/users', UsersManagementData)
def api_admin_users():
    """List all users (GET) or add a user with `force=True` (POST)."""
    method = request.method

    if method == 'GET':
        return return_api([user.todict() for user in users.get_all()])

    if method == 'POST':
        username = g.inputs['username']
        password = g.inputs['password']
        users.add(username, password, force=True)
        return return_api({}, code=201)
|
|
|
|
|
|
@admin_api.route('/users/<int:u_id>', UserManagementData)
def api_admin_user(u_id: int):
    """Update (PUT) or delete (DELETE) a user.

    Args:
        u_id (int): The ID of the user.
    """
    user = users.get_one(u_id)
    method = request.method

    if method == 'PUT':
        inputs = g.inputs
        username = inputs['new_username']
        password = inputs['new_password']
        # NOTE(review): this key has no 'new_' prefix, unlike the other two
        # and unlike the '/user' endpoint - confirm against the input
        # definition of this endpoint.
        mfa_apprise_url = inputs['mfa_apprise_url']

        if username:
            user.update_username(username)
        if password:
            user.update_password(password)
        if mfa_apprise_url != MISSING:
            user.update_mfa_apprise_url(mfa_apprise_url)

        return return_api({})

    if method == 'DELETE':
        user.delete()
        AuthManager.remove_user(u_id)
        return return_api({})
|
|
|
|
|
|
# region Admin Database
|
|
@admin_api.route('/database', DatabaseData)
def api_admin_database():
    """Download a copy of the database (GET) or import an uploaded
    database file (POST).
    """
    method = request.method

    if method == "GET":
        copy_path = create_database_copy(folder_path('db'))

        @after_this_request
        def discard_copy(response):
            # The copy was only made for this download
            remove(copy_path)
            return response

        download = send_file(
            copy_path,
            mimetype="application/x-sqlite3",
            download_name=basename(copy_path)
        )
        return download, 200

    if method == "POST":
        upload = g.inputs['file']
        copy_hosting_settings = g.inputs['copy_hosting_settings']
        import_db(upload, copy_hosting_settings)
        return return_api({})
|
|
|
|
|
|
@admin_api.route('/database/backups', BackupsData)
def api_admin_backups():
    """Respond with the output of `get_backups()`."""
    backups = get_backups()
    return return_api(backups)
|
|
|
|
|
|
@admin_api.route('/database/backups/<int:b_idx>', BackupData)
def api_admin_backup(b_idx: int):
    """Download a database backup (GET) or import it (POST).

    Args:
        b_idx (int): The index of the backup.
    """
    method = request.method

    if method == "GET":
        backup_path = get_backup(b_idx)['filepath']
        download = send_file(
            backup_path,
            mimetype="application/x-sqlite3",
            download_name=basename(backup_path)
        )
        return download, 200

    if method == "POST":
        copy_hosting_settings = g.inputs['copy_hosting_settings']
        import_db_backup(b_idx, copy_hosting_settings)
        return return_api({})
|