Files
MIND/frontend/api.py
CasVT b68284530b Changed generation and hashing of API keys
Moved from generating API keys using os.urandom to secrets.token_hex. Moved from hashing the keys using stdlib.hash to hashlib.sha256.
2025-08-26 00:50:46 +02:00

618 lines
19 KiB
Python

# -*- coding: utf-8 -*-
from datetime import datetime
from hashlib import sha256
from io import BytesIO, StringIO
from os import remove
from os.path import basename, exists
from secrets import token_hex
from time import time as epoch_time
from typing import Any, Dict
from flask import g, request, send_file
from backend.base.custom_exceptions import (APIKeyExpired, APIKeyInvalid,
LogFileNotFound)
from backend.base.definitions import (ApiKeyEntry, Constants,
SendResult, StartType)
from backend.base.helpers import folder_path, return_api
from backend.base.logging import LOGGER, get_log_filepath
from backend.implementations.apprise_parser import get_apprise_services
from backend.implementations.notification_services import NotificationServices
from backend.implementations.reminders import Reminders
from backend.implementations.static_reminders import StaticReminders
from backend.implementations.templates import Templates
from backend.implementations.users import Users
from backend.internals.db_backup_import import (create_database_copy,
get_backup, get_backups,
import_db, import_db_backup)
from backend.internals.server import Server, StartTypeHandlers
from backend.internals.settings import Settings, get_about_data
from frontend.input_validation import (AboutData, AuthLoginData,
AuthLogoutData, AuthStatusData,
AvailableNotificationServicesData,
BackupData, BackupsData, DatabaseData,
LogfileData, NotificationServiceData,
NotificationServicesData,
PublicSettingsData, ReminderData,
RemindersData, RestartData,
SearchRemindersData,
SearchStaticRemindersData,
SearchTemplatesData, SettingsData,
ShutdownData, StaticReminderData,
StaticRemindersData, TemplateData,
TemplatesData,
TestNotificationServiceURLData,
TestRemindersData, UserManagementData,
UsersAddData, UsersData,
UsersManagementData, admin_api, api,
get_api_docs, input_validation)
# region Auth and input
users = Users()
api_key_map: Dict[str, ApiKeyEntry] = {}
def auth() -> None:
"""Checks if the client is logged in.
Raises:
APIKeyInvalid: The api key supplied is invalid.
APIKeyExpired: The api key supplied has expired.
"""
api_key = request.values.get('api_key', '')
hashed_api_key = sha256(api_key.encode('utf-8')).hexdigest()
if hashed_api_key not in api_key_map:
raise APIKeyInvalid(api_key)
map_entry = api_key_map[hashed_api_key]
user_data = map_entry.user_data.get()
if (
user_data.admin
and not request.path.startswith(
(Constants.ADMIN_PREFIX, Constants.API_PREFIX + '/auth')
)
):
raise APIKeyInvalid(api_key)
if (
not user_data.admin
and
request.path.startswith(Constants.ADMIN_PREFIX)
):
raise APIKeyInvalid(api_key)
if map_entry.exp <= epoch_time():
raise APIKeyExpired(api_key)
# Api key valid
sv = Settings().get_settings()
if sv.login_time_reset:
g.exp = map_entry.exp = (
int(epoch_time()) + sv.login_time
)
else:
g.exp = map_entry.exp
g.hashed_api_key = hashed_api_key
g.user_data = map_entry.user_data
return
@api.before_request
@admin_api.before_request
def api_auth_and_input_validation() -> None:
requires_auth = get_api_docs(request).requires_auth
if requires_auth:
auth()
g.inputs = input_validation()
return
# region Auth
@api.route('/auth/login', AuthLoginData)
def api_login():
inputs: Dict[str, Any] = g.inputs
user = users.login(inputs['username'], inputs['password'])
# Login successful
StartTypeHandlers.diffuse_timer(StartType.RESTART_DB_CHANGES)
StartTypeHandlers.diffuse_timer(StartType.RESTART_HOSTING_CHANGES)
# Generate an API key until one is generated that isn't used already
while True:
# Each byte is represented by two hexadecimal characters, so halve
# the desired amount of bytes.
api_key = token_hex(Constants.API_KEY_LENGTH // 2)
hashed_api_key = sha256(api_key.encode('utf-8')).hexdigest()
if hashed_api_key not in api_key_map:
break
login_time = Settings().sv.login_time
exp = int(epoch_time()) + login_time
api_key_map[hashed_api_key] = ApiKeyEntry(exp, user)
result = {
'api_key': api_key,
'expires': exp,
'admin': user.get().admin
}
return return_api(result, code=201)
@api.route('/auth/logout', AuthLogoutData)
def api_logout():
api_key_map.pop(g.hashed_api_key)
return return_api({}, code=201)
@api.route('/auth/status', AuthStatusData)
def api_status():
map_entry = api_key_map[g.hashed_api_key]
user_data = map_entry.user_data.get()
result = {
'expires': map_entry.exp,
'username': user_data.username,
'admin': user_data.admin
}
return return_api(result)
# region User
@api.route('/user/add', UsersAddData)
def api_add_user():
inputs: Dict[str, Any] = g.inputs
users.add(inputs['username'], inputs['password'])
return return_api({}, code=201)
@api.route('/user', UsersData)
def api_manage_user():
user = api_key_map[g.hashed_api_key].user_data
if request.method == 'PUT':
inputs: Dict[str, Any] = g.inputs
if inputs['new_username']:
user.update_username(inputs['new_username'])
if inputs['new_password']:
user.update_password(inputs['new_password'])
return return_api({})
elif request.method == 'DELETE':
user.delete()
api_key_map.pop(g.hashed_api_key)
return return_api({})
# region Notification Service
@api.route('/notificationservices', NotificationServicesData)
def api_notification_services_list():
services = NotificationServices(
api_key_map[g.hashed_api_key].user_data.user_id
)
if request.method == 'GET':
result = services.get_all()
return return_api(result=[r.todict() for r in result])
elif request.method == 'POST':
inputs: Dict[str, Any] = g.inputs
result = services.add(
title=inputs['title'],
url=inputs['url']
).get()
return return_api(result.todict(), code=201)
@api.route('/notificationservices/available', AvailableNotificationServicesData)
def api_notification_service_available():
result = get_apprise_services()
return return_api(result) # type: ignore
@api.route('/notificationservices/test', TestNotificationServiceURLData)
def api_test_service():
success = NotificationServices.test(g.inputs['url'])
return return_api(
{
'success': success == SendResult.SUCCESS,
'description': success.value
},
code=201
)
@api.route('/notificationservices/<int:n_id>', NotificationServiceData)
def api_notification_service(n_id: int):
inputs: Dict[str, Any] = g.inputs
user_id = api_key_map[g.hashed_api_key].user_data.user_id
service = NotificationServices(user_id).get_one(n_id)
if request.method == 'GET':
result = service.get()
return return_api(result.todict())
elif request.method == 'PUT':
result = service.update(
title=inputs['title'],
url=inputs['url']
)
return return_api(result.todict())
elif request.method == 'DELETE':
service.delete(
inputs['delete_reminders_using']
)
return return_api({})
# region Library
@api.route('/reminders', RemindersData)
def api_reminders_list():
inputs: Dict[str, Any] = g.inputs
reminders = Reminders(api_key_map[g.hashed_api_key].user_data.user_id)
if request.method == 'GET':
result = reminders.get_all(inputs['sort_by'])
return return_api([r.todict() for r in result])
elif request.method == 'POST':
result = reminders.add(
title=inputs['title'],
time=inputs['time'],
notification_services=inputs['notification_services'],
text=inputs['text'],
repeat_quantity=inputs['repeat_quantity'],
repeat_interval=inputs['repeat_interval'],
weekdays=inputs['weekdays'],
cron_schedule=inputs['cron_schedule'],
color=inputs['color'],
enabled=inputs['enabled']
)
return return_api(result.get().todict(), code=201)
@api.route('/reminders/search', SearchRemindersData)
def api_reminders_query():
inputs: Dict[str, Any] = g.inputs
reminders = Reminders(api_key_map[g.hashed_api_key].user_data.user_id)
result = reminders.search(inputs['query'], inputs['sort_by'])
return return_api([r.todict() for r in result])
@api.route('/reminders/test', TestRemindersData)
def api_test_reminder():
inputs: Dict[str, Any] = g.inputs
Reminders(
api_key_map[g.hashed_api_key].user_data.user_id
).test_reminder(
inputs['title'],
inputs['notification_services'],
inputs['text']
)
return return_api({}, code=201)
@api.route('/reminders/<int:r_id>', ReminderData)
def api_get_reminder(r_id: int):
reminders = Reminders(
api_key_map[g.hashed_api_key].user_data.user_id
)
if request.method == 'GET':
result = reminders.get_one(r_id).get()
return return_api(result.todict())
elif request.method == 'PUT':
inputs: Dict[str, Any] = g.inputs
result = reminders.get_one(r_id).update(
title=inputs['title'],
time=inputs['time'],
notification_services=inputs['notification_services'],
text=inputs['text'],
repeat_quantity=inputs['repeat_quantity'],
repeat_interval=inputs['repeat_interval'],
weekdays=inputs['weekdays'],
cron_schedule=inputs['cron_schedule'],
color=inputs['color'],
enabled=inputs['enabled']
)
return return_api(result.todict())
elif request.method == 'DELETE':
reminders.get_one(r_id).delete()
return return_api({})
# region Template
@api.route('/templates', TemplatesData)
def api_get_templates():
inputs: Dict[str, Any] = g.inputs
templates = Templates(
api_key_map[g.hashed_api_key].user_data.user_id
)
if request.method == 'GET':
result = templates.get_all(inputs['sort_by'])
return return_api([r.todict() for r in result])
elif request.method == 'POST':
result = templates.add(
title=inputs['title'],
notification_services=inputs['notification_services'],
text=inputs['text'],
color=inputs['color']
)
return return_api(result.get().todict(), code=201)
@api.route('/templates/search', SearchTemplatesData)
def api_templates_query():
inputs: Dict[str, Any] = g.inputs
templates = Templates(
api_key_map[g.hashed_api_key].user_data.user_id
)
result = templates.search(inputs['query'], inputs['sort_by'])
return return_api([r.todict() for r in result])
@api.route('/templates/<int:t_id>', TemplateData)
def api_get_template(t_id: int):
template = Templates(
api_key_map[g.hashed_api_key].user_data.user_id
).get_one(t_id)
if request.method == 'GET':
result = template.get()
return return_api(result.todict())
elif request.method == 'PUT':
inputs: Dict[str, Any] = g.inputs
result = template.update(
title=inputs['title'],
notification_services=inputs['notification_services'],
text=inputs['text'],
color=inputs['color']
)
return return_api(result.todict())
elif request.method == 'DELETE':
template.delete()
return return_api({})
# region Static Reminder
@api.route('/staticreminders', StaticRemindersData)
def api_static_reminders_list():
inputs: Dict[str, Any] = g.inputs
reminders = StaticReminders(
api_key_map[g.hashed_api_key].user_data.user_id
)
if request.method == 'GET':
result = reminders.get_all(inputs['sort_by'])
return return_api([r.todict() for r in result])
elif request.method == 'POST':
result = reminders.add(
title=inputs['title'],
notification_services=inputs['notification_services'],
text=inputs['text'],
color=inputs['color']
)
return return_api(result.get().todict(), code=201)
@api.route('/staticreminders/search', SearchStaticRemindersData)
def api_static_reminders_query():
inputs: Dict[str, Any] = g.inputs
result = StaticReminders(
api_key_map[g.hashed_api_key].user_data.user_id
).search(inputs['query'], inputs['sort_by'])
return return_api([r.todict() for r in result])
@api.route('/staticreminders/<int:s_id>', StaticReminderData)
def api_get_static_reminder(s_id: int):
reminders = StaticReminders(
api_key_map[g.hashed_api_key].user_data.user_id
)
if request.method == 'GET':
result = reminders.get_one(s_id).get()
return return_api(result.todict())
elif request.method == 'POST':
reminders.get_one(s_id).trigger_reminder()
return return_api({}, code=201)
elif request.method == 'PUT':
inputs: Dict[str, Any] = g.inputs
result = reminders.get_one(s_id).update(
title=inputs['title'],
notification_services=inputs['notification_services'],
text=inputs['text'],
color=inputs['color']
)
return return_api(result.todict())
elif request.method == 'DELETE':
reminders.get_one(s_id).delete()
return return_api({})
# region Admin Panel
@admin_api.route('/shutdown', ShutdownData)
def api_shutdown():
Server().shutdown()
return return_api({})
@admin_api.route('/restart', RestartData)
def api_restart():
Server().restart()
return return_api({})
@api.route('/settings', PublicSettingsData)
def api_settings():
return return_api(Settings().get_public_settings().todict())
@api.route('/about', AboutData)
def api_about():
return return_api(get_about_data())
@admin_api.route('/settings', SettingsData)
def api_admin_settings():
inputs: Dict[str, Any] = g.inputs
settings = Settings()
if request.method == 'GET':
return return_api(settings.get_public_settings().todict())
elif request.method == 'PUT':
LOGGER.info(f'Submitting admin settings: {inputs}')
hosting_changes = any(
inputs[s] is not None
and inputs[s] != getattr(settings.sv, s)
for s in ('host', 'port', 'url_prefix')
)
if hosting_changes:
settings.backup_hosting_settings()
settings.update(
{
k: v
for k, v in inputs.items()
if v is not None
},
from_public=True
)
if hosting_changes:
Server().restart(StartType.RESTART_HOSTING_CHANGES)
return return_api({})
elif request.method == 'DELETE':
hosting_changes = any(
s in inputs["setting_keys"]
and settings.get_default_value(s) != getattr(settings.sv, s)
for s in ('host', 'port', 'url_prefix')
)
if hosting_changes:
settings.backup_hosting_settings()
for setting in inputs["setting_keys"]:
settings.reset(setting, from_public=True)
if hosting_changes:
Server().restart(StartType.RESTART_HOSTING_CHANGES)
return return_api({})
@admin_api.route('/logs', LogfileData)
def api_admin_logs():
file = get_log_filepath()
if not exists(file):
raise LogFileNotFound(file)
sio = StringIO()
for ext in ('.1', ''):
lf = file + ext
if not exists(lf):
continue
with open(lf, 'r') as f:
sio.writelines(f)
return send_file(
BytesIO(sio.getvalue().encode('utf-8')),
mimetype="application/octet-stream",
download_name=f'MIND_log_{datetime.now().strftime("%Y_%m_%d_%H_%M")}.txt'
), 200
@admin_api.route('/users', UsersManagementData)
def api_admin_users():
if request.method == 'GET':
result = users.get_all()
return return_api([r.todict() for r in result])
elif request.method == 'POST':
inputs: Dict[str, Any] = g.inputs
users.add(inputs['username'], inputs['password'], True)
return return_api({}, code=201)
@admin_api.route('/users/<int:u_id>', UserManagementData)
def api_admin_user(u_id: int):
user = users.get_one(u_id)
if request.method == 'PUT':
inputs: Dict[str, Any] = g.inputs
if inputs['new_username']:
user.update_username(inputs['new_username'])
if inputs['new_password']:
user.update_password(inputs['new_password'])
return return_api({})
elif request.method == 'DELETE':
user.delete()
for key, value in api_key_map.items():
if value.user_data.user_id == u_id:
del api_key_map[key]
break
return return_api({})
@admin_api.route('/database', DatabaseData)
def api_admin_database():
if request.method == "GET":
filename = create_database_copy(folder_path('db'))
# We cannot simply pass the filename, ass we have to
# delete the file, but we cannot do that if we send it.
with open(filename, 'rb') as database_file:
bi = BytesIO(database_file.read())
remove(filename)
return send_file(
bi,
mimetype="application/x-sqlite3",
download_name=basename(filename)
), 200
elif request.method == "POST":
inputs: Dict[str, Any] = g.inputs
import_db(inputs['file'], inputs['copy_hosting_settings'])
return return_api({})
@admin_api.route('/database/backups', BackupsData)
def api_admin_backups():
return return_api(get_backups())
@admin_api.route('/database/backups/<int:b_idx>', BackupData)
def api_admin_backup(b_idx: int):
if request.method == "GET":
return send_file(
get_backup(b_idx)['filepath'],
mimetype="application/x-sqlite3"
), 200
elif request.method == "POST":
inputs: Dict[str, Any] = g.inputs
import_db_backup(b_idx, inputs['copy_hosting_settings'])
return return_api({})