mirror of
https://github.com/DrewThomasson/ebook2audiobook.git
synced 2026-01-13 07:48:02 -05:00
2527 lines
121 KiB
Python
2527 lines
121 KiB
Python
# NOTE!!NOTE!!!NOTE!!NOTE!!!NOTE!!NOTE!!!NOTE!!NOTE!!!
|
|
# THE WORD "CHAPTER" IN THE CODE DOES NOT MEAN
|
|
# IT'S THE REAL CHAPTER OF THE EBOOK SINCE NO STANDARDS
|
|
# ARE DEFINING A CHAPTER ON .EPUB FORMAT. THE WORD "PART"
|
|
# IS USED TO PRINT TO THE TERMINAL, AND "CHAPTER" TO THE CODE
|
|
# WHICH IS LESS GENERIC FOR THE DEVELOPERS
|
|
|
|
import argparse
|
|
import asyncio
|
|
import csv
|
|
import docker
|
|
import ebooklib
|
|
import fnmatch
|
|
import gc
|
|
import gradio as gr
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import platform
|
|
import psutil
|
|
import random
|
|
import regex as re
|
|
import requests
|
|
import shutil
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import torch
|
|
import urllib.request
|
|
import uuid
|
|
import zipfile
|
|
import traceback
|
|
|
|
import lib.conf as conf
|
|
import lib.lang as lang
|
|
import lib.models as mod
|
|
|
|
from bs4 import BeautifulSoup
|
|
from collections import Counter
|
|
from collections.abc import Mapping
|
|
from collections.abc import MutableMapping
|
|
from datetime import datetime
|
|
from ebooklib import epub
|
|
from fastapi import FastAPI
|
|
from glob import glob
|
|
from iso639 import languages
|
|
from multiprocessing import Manager, Event
|
|
from multiprocessing.managers import DictProxy, ListProxy
|
|
from num2words import num2words
|
|
from pathlib import Path
|
|
from pydub import AudioSegment
|
|
from queue import Queue, Empty
|
|
from starlette.requests import ClientDisconnect
|
|
from tqdm import tqdm
|
|
from types import MappingProxyType
|
|
from urllib.parse import urlparse
|
|
|
|
from lib.classes.redirect_console import RedirectConsole
|
|
from lib.classes.voice_extractor import VoiceExtractor
|
|
from lib.classes.tts_manager import TTSManager
|
|
|
|
def inject_configs(target_namespace):
|
|
# Extract variables from both modules and inject them into the target namespace
|
|
for module in (conf, lang, mod):
|
|
target_namespace.update({k: v for k, v in vars(module).items() if not k.startswith('__')})
|
|
|
|
# Inject configurations into the global namespace of this module
|
|
inject_configs(globals())
|
|
|
|
class DependencyError(Exception):
|
|
def __init__(self, message=None):
|
|
super().__init__(message)
|
|
print(message)
|
|
# Automatically handle the exception when it's raised
|
|
self.handle_exception()
|
|
|
|
def handle_exception(self):
|
|
# Print the full traceback of the exception
|
|
traceback.print_exc()
|
|
# Print the exception message
|
|
print(f'Caught DependencyError: {self}')
|
|
# Exit the script if it's not a web process
|
|
if not is_gui_process:
|
|
sys.exit(1)
|
|
|
|
def recursive_proxy(data, manager=None):
|
|
if manager is None:
|
|
manager = Manager()
|
|
if isinstance(data, dict):
|
|
proxy_dict = manager.dict()
|
|
for key, value in data.items():
|
|
proxy_dict[key] = recursive_proxy(value, manager)
|
|
return proxy_dict
|
|
elif isinstance(data, list):
|
|
proxy_list = manager.list()
|
|
for item in data:
|
|
proxy_list.append(recursive_proxy(item, manager))
|
|
return proxy_list
|
|
elif isinstance(data, (str, int, float, bool, type(None))):
|
|
return data
|
|
else:
|
|
error = f"Unsupported data type: {type(data)}"
|
|
print(error)
|
|
return
|
|
|
|
class SessionContext:
|
|
def __init__(self):
|
|
self.manager = Manager()
|
|
self.sessions = self.manager.dict() # Store all session-specific contexts
|
|
self.cancellation_events = {} # Store multiprocessing.Event for each session
|
|
|
|
def get_session(self, id):
|
|
if id not in self.sessions:
|
|
self.sessions[id] = recursive_proxy({
|
|
"script_mode": NATIVE,
|
|
"id": id,
|
|
"process_id": None,
|
|
"device": default_device,
|
|
"system": None,
|
|
"client": None,
|
|
"language": default_language_code,
|
|
"language_iso1": None,
|
|
"audiobook": None,
|
|
"audiobooks_dir": None,
|
|
"process_dir": None,
|
|
"ebook": None,
|
|
"ebook_list": None,
|
|
"ebook_mode": "single",
|
|
"chapters_dir": None,
|
|
"chapters_dir_sentences": None,
|
|
"epub_path": None,
|
|
"filename_noext": None,
|
|
"tts_engine": default_tts_engine,
|
|
"fine_tuned": default_fine_tuned,
|
|
"voice": None,
|
|
"voice_dir": None,
|
|
"custom_model": None,
|
|
"custom_model_dir": None,
|
|
"chapters": None,
|
|
"cover": None,
|
|
"status": None,
|
|
"progress": 0,
|
|
"time": None,
|
|
"cancellation_requested": False,
|
|
"temperature": tts_default_settings['temperature'],
|
|
"length_penalty": tts_default_settings['length_penalty'],
|
|
"num_beams": tts_default_settings['num_beams'],
|
|
"length_scale": tts_default_settings['length_scale'],
|
|
"noise_scale": tts_default_settings['noise_scale'],
|
|
"repetition_penalty": tts_default_settings['repetition_penalty'],
|
|
"top_k": tts_default_settings['top_k'],
|
|
"top_p": tts_default_settings['top_k'],
|
|
"speed": tts_default_settings['speed'],
|
|
"enable_text_splitting": tts_default_settings['enable_text_splitting'],
|
|
"event": None,
|
|
"output_format": default_output_format,
|
|
"metadata": {
|
|
"title": None,
|
|
"creator": None,
|
|
"contributor": None,
|
|
"language": None,
|
|
"identifier": None,
|
|
"publisher": None,
|
|
"date": None,
|
|
"description": None,
|
|
"subject": None,
|
|
"rights": None,
|
|
"format": None,
|
|
"type": None,
|
|
"coverage": None,
|
|
"relation": None,
|
|
"Source": None,
|
|
"Modified": None,
|
|
}
|
|
}, manager=self.manager)
|
|
return self.sessions[id]
|
|
|
|
app = FastAPI()
|
|
lock = threading.Lock()
|
|
context = SessionContext()
|
|
is_gui_process = False
|
|
|
|
def prepare_dirs(src, session):
|
|
try:
|
|
resume = False
|
|
os.makedirs(os.path.join(models_dir,'tts'), exist_ok=True)
|
|
os.makedirs(session['session_dir'], exist_ok=True)
|
|
os.makedirs(session['process_dir'], exist_ok=True)
|
|
os.makedirs(session['custom_model_dir'], exist_ok=True)
|
|
os.makedirs(session['voice_dir'], exist_ok=True)
|
|
os.makedirs(session['audiobooks_dir'], exist_ok=True)
|
|
session['ebook'] = os.path.join(session['process_dir'], os.path.basename(src))
|
|
if os.path.exists(session['ebook']):
|
|
if compare_files_by_hash(session['ebook'], src):
|
|
resume = True
|
|
if not resume:
|
|
shutil.rmtree(session['chapters_dir'], ignore_errors=True)
|
|
os.makedirs(session['chapters_dir'], exist_ok=True)
|
|
os.makedirs(session['chapters_dir_sentences'], exist_ok=True)
|
|
shutil.copy(src, session['ebook'])
|
|
return True
|
|
except Exception as e:
|
|
DependencyError(e)
|
|
return False
|
|
|
|
def check_programs(prog_name, command, options):
|
|
try:
|
|
subprocess.run(
|
|
[command, options],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
check=True,
|
|
text=True,
|
|
universal_newlines=True,
|
|
encoding='utf-8'
|
|
)
|
|
return True, None
|
|
except FileNotFoundError:
|
|
e = f'''********** Error: {prog_name} is not installed! if your OS calibre package version
|
|
is not compatible you still can run ebook2audiobook.sh (linux/mac) or ebook2audiobook.cmd (windows) **********'''
|
|
DependencyError(e)
|
|
return False, None
|
|
except subprocess.CalledProcessError:
|
|
e = f'Error: There was an issue running {prog_name}.'
|
|
DependencyError(e)
|
|
return False, None
|
|
|
|
def analyze_uploaded_file(zip_path, required_files=None):
|
|
try:
|
|
if not os.path.exists(zip_path):
|
|
error = f"The file does not exist: {os.path.basename(zip_path)}"
|
|
print(error)
|
|
return False
|
|
if required_files is None:
|
|
required_files = default_xtts_files
|
|
files_in_zip = {}
|
|
empty_files = set()
|
|
with zipfile.ZipFile(zip_path, 'r') as zf:
|
|
for file_info in zf.infolist():
|
|
file_name = file_info.filename
|
|
if file_info.is_dir():
|
|
continue
|
|
base_name = os.path.basename(file_name)
|
|
files_in_zip[base_name.lower()] = file_info.file_size
|
|
if file_info.file_size == 0:
|
|
empty_files.add(base_name.lower())
|
|
required_files = [file.lower() for file in required_files]
|
|
missing_files = [f for f in required_files if f not in files_in_zip]
|
|
required_empty_files = [f for f in required_files if f in empty_files]
|
|
if missing_files:
|
|
print(f"Missing required files: {missing_files}")
|
|
if required_empty_files:
|
|
print(f"Required files with 0 KB: {required_empty_files}")
|
|
return not missing_files and not required_empty_files
|
|
except zipfile.BadZipFile:
|
|
error = "The file is not a valid ZIP archive."
|
|
raise ValueError(error)
|
|
except Exception as e:
|
|
error = f"An error occurred: {e}"
|
|
raise RuntimeError(error)
|
|
|
|
def extract_custom_model(file_src, session, required_files=None):
|
|
try:
|
|
model_path = None
|
|
if required_files is None:
|
|
required_files = models[session['tts_engine']][default_fine_tuned]['files']
|
|
model_name = re.sub('.zip', '', os.path.basename(file_src), flags=re.IGNORECASE)
|
|
model_name = get_sanitized(model_name)
|
|
with zipfile.ZipFile(file_src, 'r') as zip_ref:
|
|
files = zip_ref.namelist()
|
|
files_length = len(files)
|
|
tts_dir = session['tts_engine']
|
|
model_path = os.path.join(session['custom_model_dir'], tts_dir, model_name)
|
|
if os.path.exists(model_path):
|
|
print(f'{model_path} already exists, bypassing files extraction')
|
|
return model_path
|
|
os.makedirs(model_path, exist_ok=True)
|
|
with tqdm(total=files_length, unit='files') as t:
|
|
for f in files:
|
|
if f in required_files:
|
|
zip_ref.extract(f, model_path)
|
|
t.update(1)
|
|
os.remove(file_src)
|
|
if model_path is not None:
|
|
msg = f'Extracted files to {model_path}'
|
|
print(msg)
|
|
return model_name
|
|
else:
|
|
error = f'An error occured when unzip {file_src}'
|
|
return None
|
|
except asyncio.exceptions.CancelledError:
|
|
DependencyError(e)
|
|
os.remove(file_src)
|
|
return None
|
|
except Exception as e:
|
|
DependencyError(e)
|
|
os.remove(file_src)
|
|
return None
|
|
|
|
def hash_proxy_dict(proxy_dict):
|
|
return hashlib.md5(str(proxy_dict).encode('utf-8')).hexdigest()
|
|
|
|
def calculate_hash(filepath, hash_algorithm='sha256'):
|
|
hash_func = hashlib.new(hash_algorithm)
|
|
with open(filepath, 'rb') as f:
|
|
while chunk := f.read(8192): # Read in chunks to handle large files
|
|
hash_func.update(chunk)
|
|
return hash_func.hexdigest()
|
|
|
|
def compare_files_by_hash(file1, file2, hash_algorithm='sha256'):
|
|
return calculate_hash(file1, hash_algorithm) == calculate_hash(file2, hash_algorithm)
|
|
|
|
def compare_dict_keys(d1, d2):
|
|
if not isinstance(d1, Mapping) or not isinstance(d2, Mapping):
|
|
return d1 == d2
|
|
d1_keys = set(d1.keys())
|
|
d2_keys = set(d2.keys())
|
|
missing_in_d2 = d1_keys - d2_keys
|
|
missing_in_d1 = d2_keys - d1_keys
|
|
if missing_in_d2 or missing_in_d1:
|
|
return {
|
|
"missing_in_d2": missing_in_d2,
|
|
"missing_in_d1": missing_in_d1,
|
|
}
|
|
for key in d1_keys.intersection(d2_keys):
|
|
nested_result = compare_keys(d1[key], d2[key])
|
|
if nested_result:
|
|
return {key: nested_result}
|
|
return None
|
|
|
|
def proxy_to_dict(proxy_obj):
|
|
def recursive_copy(source, visited):
|
|
# Handle circular references by tracking visited objects
|
|
if id(source) in visited:
|
|
return None # Stop processing circular references
|
|
visited.add(id(source)) # Mark as visited
|
|
if isinstance(source, dict):
|
|
result = {}
|
|
for key, value in source.items():
|
|
result[key] = recursive_copy(value, visited)
|
|
return result
|
|
elif isinstance(source, list):
|
|
return [recursive_copy(item, visited) for item in source]
|
|
elif isinstance(source, set):
|
|
return list(source)
|
|
elif isinstance(source, (int, float, str, bool, type(None))):
|
|
return source
|
|
elif isinstance(source, DictProxy):
|
|
# Explicitly handle DictProxy objects
|
|
return recursive_copy(dict(source), visited) # Convert DictProxy to dict
|
|
else:
|
|
return str(source) # Convert non-serializable types to strings
|
|
return recursive_copy(proxy_obj, set())
|
|
|
|
def maths_to_words(text, lang, lang_iso1, tts_engine):
|
|
def check_compat():
|
|
try:
|
|
num2words(1, lang=lang_iso1)
|
|
return True
|
|
except NotImplementedError:
|
|
return False
|
|
except Exception as e:
|
|
return False
|
|
def rep_num(match):
|
|
number = match.group().replace(",", "") # Remove commas for proper conversion
|
|
try:
|
|
if "." in number or "e" in number or "E" in number:
|
|
return num2words(float(number), lang=lang_iso1)
|
|
return num2words(int(number), lang=lang_iso1)
|
|
except ValueError:
|
|
return number # If conversion fails, return original number
|
|
is_num2words_compat = check_compat()
|
|
phonemes_list = language_math_phonemes.get(lang, language_math_phonemes[default_language_code])
|
|
# Separate ambiguous and non-ambiguous symbols
|
|
ambiguous_symbols = {"-", "/", "*", "x"}
|
|
replacements = {k: v for k, v in phonemes_list.items() if not k.isdigit()} # Keep only math symbols
|
|
normal_replacements = {k: v for k, v in replacements.items() if k not in ambiguous_symbols}
|
|
ambiguous_replacements = {k: v for k, v in replacements.items() if k in ambiguous_symbols}
|
|
# Replace unambiguous math symbols normally
|
|
if normal_replacements:
|
|
math_pattern = r'(' + '|'.join(map(re.escape, normal_replacements.keys())) + r')'
|
|
text = re.sub(math_pattern, lambda m: f" {normal_replacements[m.group(0)]} ", text)
|
|
# Regex pattern for ambiguous symbols (match only valid equations)
|
|
ambiguous_pattern = (
|
|
r'(?<!\S)(\d+)\s*([-/*x])\s*(\d+)(?!\S)|' # Matches "num SYMBOL num" (e.g., "3 + 5", "7-2", "8 * 4")
|
|
r'(?<!\S)([-/*x])\s*(\d+)(?!\S)' # Matches "SYMBOL num" (e.g., "-4", "/ 9")
|
|
)
|
|
def replace_ambiguous(match):
|
|
if match.group(2): # "num SYMBOL num" case
|
|
return f"{match.group(1)} {ambiguous_replacements[str(match.group(2))]} {match.group(3)}"
|
|
elif match.group(3): # "SYMBOL num" case
|
|
return f"{ambiguous_replacements[str(match.group(3))]} {match.group(4)}"
|
|
return match.group(0)
|
|
if ambiguous_replacements:
|
|
text = re.sub(ambiguous_pattern, replace_ambiguous, text)
|
|
# Regex pattern for detecting numbers (handles negatives, commas, decimals, scientific notation)
|
|
number_pattern = r'(?<!\S)(-?\d{1,3}(?:,\d{3})*(?:\.\d+)?(?:[eE][-+]?\d+)?)(?!\S)'
|
|
if tts_engine != XTTSv2:
|
|
if is_num2words_compat:
|
|
text = re.sub(number_pattern, rep_num, text)
|
|
else:
|
|
# Fallback: Replace numbers using phonemes dictionary
|
|
sorted_numbers = sorted((k for k in phonemes_list if k.isdigit()), key=len, reverse=True)
|
|
if sorted_numbers:
|
|
number_pattern = r'\b(' + '|'.join(map(re.escape, sorted_numbers)) + r')\b'
|
|
text = re.sub(number_pattern, lambda match: phonemes_list[match.group(0)], text)
|
|
return text.strip()
|
|
|
|
def normalize_text(text, lang, lang_iso1, tts_engine):
|
|
# Replace NBSP with a normal space
|
|
text = text.replace("\xa0", " ")
|
|
if lang in abbreviations_mapping:
|
|
pattern = r'\b(' + '|'.join(re.escape(k) for k in abbreviations_mapping[lang]) + r')\b'
|
|
text = re.sub(pattern, lambda match: abbreviations_mapping[lang].get(match.group(0), match.group(0)), text)
|
|
# Replace multiple newlines ("\n\n", "\r\r", "\n\r") with " . " as many times as they occur
|
|
text = re.sub('(\r\n|\n\n|\r\r|\n\r)+', lambda m: ' . ' * (m.group().count("\n") // 2 + m.group().count("\r") // 2), text)
|
|
# Replace single newlines ("\n" or "\r") with spaces
|
|
text = re.sub(r'[\r\n]', ' ', text)
|
|
# Replace tabs ("\t") with equivalent spaces
|
|
text = re.sub(r'\t+', lambda m: ' ' * len(m.group()), text)
|
|
# replace roman numbers by digits
|
|
text = replace_roman_numbers(text)
|
|
# Pattern 1: Add a space between UTF-8 characters and numbers
|
|
text = re.sub(r'(?<=[\p{L}])(?=\d)|(?<=\d)(?=[\p{L}])', ' ', text)
|
|
# Pattern 2: Split big numbers into groups of 2
|
|
text = re.sub(r'(\d{2})(?=\d{2}(?!\.\d))', r'\1 ', text)
|
|
# Replace math symbols with words
|
|
text = maths_to_words(text, lang, lang_iso1, tts_engine)
|
|
return text
|
|
|
|
def convert_to_epub(session):
|
|
if session['cancellation_requested']:
|
|
print('Cancel requested')
|
|
return False
|
|
try:
|
|
util_app = shutil.which('ebook-convert')
|
|
if not util_app:
|
|
error = "The 'ebook-convert' utility is not installed or not found."
|
|
print(error)
|
|
return False
|
|
print(f"Running command: {util_app} {session['ebook']} {session['epub_path']} --input-encoding=utf-8 --output-profile=generic_eink --verbose")
|
|
result = subprocess.run(
|
|
[util_app, session['ebook'], session['epub_path'], '--input-encoding=utf-8', '--output-profile=generic_eink', '--verbose'],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
universal_newlines=True,
|
|
encoding='utf-8'
|
|
)
|
|
print(result.stdout)
|
|
return True
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Subprocess error: {e.stderr}")
|
|
DependencyError(e)
|
|
return False
|
|
except FileNotFoundError as e:
|
|
print(f"Utility not found: {e}")
|
|
DependencyError(e)
|
|
return False
|
|
|
|
def get_cover(epubBook, session):
|
|
try:
|
|
if session['cancellation_requested']:
|
|
print('Cancel requested')
|
|
return False
|
|
cover_image = False
|
|
cover_path = os.path.join(session['process_dir'], session['filename_noext'] + '.jpg')
|
|
for item in epubBook.get_items_of_type(ebooklib.ITEM_COVER):
|
|
cover_image = item.get_content()
|
|
break
|
|
if not cover_image:
|
|
for item in epubBook.get_items_of_type(ebooklib.ITEM_IMAGE):
|
|
if 'cover' in item.file_name.lower() or 'cover' in item.get_id().lower():
|
|
cover_image = item.get_content()
|
|
break
|
|
if cover_image:
|
|
with open(cover_path, 'wb') as cover_file:
|
|
cover_file.write(cover_image)
|
|
return cover_path
|
|
return True
|
|
except Exception as e:
|
|
DependencyError(e)
|
|
return False
|
|
|
|
def get_chapters(epubBook, session):
|
|
try:
|
|
if session['cancellation_requested']:
|
|
print('Cancel requested')
|
|
return False
|
|
# Step 1: Get all documents and their patterns
|
|
all_docs = list(epubBook.get_items_of_type(ebooklib.ITEM_DOCUMENT))
|
|
if not all_docs:
|
|
return False
|
|
all_docs = all_docs[1:] # Exclude the first document if needed
|
|
# Cache filtered parts to avoid redundant calls
|
|
doc_cache = {}
|
|
msg = '******* NOTE: YOU CAN SAFELY IGNORE "Character xx not found in the vocabulary." *******'
|
|
print(msg)
|
|
for doc in all_docs:
|
|
doc_cache[doc] = filter_chapter(doc, session['language'], session['language_iso1'], session['tts_engine'])
|
|
# Step 2: Determine the most common pattern
|
|
doc_patterns = [filter_pattern(str(doc)) for doc in all_docs if filter_pattern(str(doc))]
|
|
most_common_pattern = filter_doc(doc_patterns)
|
|
# Step 3: Calculate average character length of selected docs
|
|
char_length = [len(content) for content in doc_cache.values()]
|
|
average_char_length = sum(char_length) / len(char_length) if char_length else 0
|
|
# Step 4: Filter docs based on average character length or repetitive pattern
|
|
final_selected_docs = [
|
|
doc for doc in all_docs
|
|
if len(doc_cache[doc]) >= average_char_length or filter_pattern(str(doc)) == most_common_pattern
|
|
]
|
|
# Step 5: Extract parts from the final selected docs
|
|
chapters = [doc_cache[doc] for doc in final_selected_docs]
|
|
# Add introductory metadata if available
|
|
#creator = session['metadata'].get('creator') or ''
|
|
#title = session['metadata']['title'] or ''
|
|
#intro = f"{creator} , \n\n{title} , \n\n"
|
|
#intro = normalize_text(intro, session['language'], session['language_iso1'], session['tts_engine'])
|
|
#if chapters:
|
|
# chapters[0] = [intro] + chapters[0]
|
|
return chapters
|
|
except Exception as e:
|
|
error = f'Error extracting main content pages: {e}'
|
|
DependencyError(error)
|
|
return None
|
|
|
|
def filter_chapter(doc, lang, lang_iso1, tts_engine):
|
|
soup = BeautifulSoup(doc.get_body_content(), 'html.parser')
|
|
# Remove scripts and styles
|
|
for script in soup(["script", "style"]):
|
|
script.decompose()
|
|
# Rule 1: Ensure spaces before & after punctuation (EXCLUDING `,` and `.`)
|
|
escaped_punctuation = re.escape(''.join(punctuation_list))
|
|
punctuation_pattern_spaces = r'\s*([{}])\s*'.format(escaped_punctuation.replace(',', '').replace('.', ''))
|
|
# Rule 2: Ensure spaces before & after `,` and `.` ONLY when NOT between numbers
|
|
comma_dot_pattern = r'(?<!\d)\s*([,.])\s*(?!\d)'
|
|
# Step 1: Ensure space before and after punctuation (excluding `,` and `.`)
|
|
text = re.sub(punctuation_pattern_spaces, r' \1 ', soup.get_text().strip())
|
|
# Step 2: Ensure space before and after `,` and `.` only when NOT between numbers
|
|
text = re.sub(comma_dot_pattern, r' \1 ', text)
|
|
# Normalize lines and remove unnecessary spaces
|
|
text = normalize_text(text, lang, lang_iso1, tts_engine)
|
|
# Create regex pattern from punctuation list to split the phoneme_list
|
|
escaped_punctuation = re.escape(''.join(punctuation_list))
|
|
punctuation_pattern_split = rf'([^{"".join(escaped_punctuation)}]+|[{escaped_punctuation}])'
|
|
# Split by punctuation marks while keeping the punctuation at the end of each word
|
|
tmp_list = re.findall(punctuation_pattern_split, text)
|
|
phoneme_list = [phoneme.strip() for phoneme in tmp_list if phoneme.strip()]
|
|
# get the final sentence array according to the max_tokens limitation
|
|
max_tokens = language_mapping[lang]['max_tokens']
|
|
chapter_sentences = get_sentences(phoneme_list, max_tokens)
|
|
return chapter_sentences
|
|
|
|
def filter_doc(doc_patterns):
|
|
pattern_counter = Counter(doc_patterns)
|
|
# Returns a list with one tuple: [(pattern, count)]
|
|
most_common = pattern_counter.most_common(1)
|
|
return most_common[0][0] if most_common else None
|
|
|
|
def filter_pattern(doc_identifier):
|
|
docs = doc_identifier.split(':')
|
|
if len(docs) > 2:
|
|
segment = docs[1]
|
|
if re.search(r'[a-zA-Z]', segment) and re.search(r'\d', segment):
|
|
return ''.join([char for char in segment if char.isalpha()])
|
|
elif re.match(r'^[a-zA-Z]+$', segment):
|
|
return segment
|
|
elif re.match(r'^\d+$', segment):
|
|
return 'numbers'
|
|
return None
|
|
|
|
def get_sentences(phoneme_list, max_tokens):
|
|
sentences = []
|
|
current_sentence = ""
|
|
current_phoneme_count = 0
|
|
for phoneme in phoneme_list:
|
|
part_phoneme_count = len(phoneme.split())
|
|
# If adding this phoneme exceeds max tokens, finalize the current sentence
|
|
if current_phoneme_count + part_phoneme_count > max_tokens:
|
|
sentences.append(current_sentence.strip())
|
|
current_sentence = phoneme
|
|
current_phoneme_count = part_phoneme_count
|
|
else:
|
|
# Add the phoneme to the current sentence
|
|
current_sentence += (" " if current_sentence else "") + phoneme
|
|
current_phoneme_count += part_phoneme_count
|
|
# Handle overly long rows by splitting them in the middle
|
|
if current_phoneme_count > max_tokens * 2:
|
|
new_phoneme_list = current_sentence.split()
|
|
mid_point = len(new_phoneme_list) // 2
|
|
sentences.append(" ".join(new_phoneme_list[:mid_point]).strip())
|
|
current_sentence = " ".join(new_phoneme_list[mid_point:]).strip()
|
|
current_word_count = len(current_sentence.split())
|
|
# Add the final sentence if any content remains
|
|
if current_sentence:
|
|
sentences.append(current_sentence.strip())
|
|
return sentences
|
|
|
|
def get_batch_size(list, session):
|
|
total_size = 0
|
|
print(list)
|
|
for file in list:
|
|
try:
|
|
total_size += os.path.getsize(os.path.join(session['chapters_dir'], file))
|
|
except Exception as e:
|
|
error = f'Something wrong trying to get the size of {file}'
|
|
print(error)
|
|
pass
|
|
avg_size = total_size / len(list) if list else 0
|
|
if avg_size > 0:
|
|
avail_memory = psutil.virtual_memory().available * 0.6
|
|
return int(avail_memory // avg_size)
|
|
return 16
|
|
|
|
def get_sanitized(str, replacement="_"):
|
|
sanitized = re.sub(r'\s+', replacement, str)
|
|
sanitized = re.sub(r'[<>:"/\\|?*]', replacement, sanitized)
|
|
sanitized = sanitized.strip()
|
|
return sanitized
|
|
|
|
def convert_chapters_to_audio(session):
|
|
try:
|
|
if session['cancellation_requested']:
|
|
print('Cancel requested')
|
|
return False
|
|
progress_bar = None
|
|
if is_gui_process:
|
|
progress_bar = gr.Progress(track_tqdm=True)
|
|
|
|
tts_manager = TTSManager(session)
|
|
if not tts_manager:
|
|
return False
|
|
|
|
resume_chapter = 0
|
|
resume_sentence = 0
|
|
|
|
# Check existing files to resume the process if it was interrupted
|
|
existing_chapters = sorted([f for f in os.listdir(session['chapters_dir']) if f.endswith(f'.{default_audio_proc_format}')])
|
|
existing_sentences = sorted([f for f in os.listdir(session['chapters_dir_sentences']) if f.endswith(f'.{default_audio_proc_format}')])
|
|
|
|
if existing_chapters:
|
|
count_chapter_files = len(existing_chapters)
|
|
resume_chapter = count_chapter_files - 1 if count_chapter_files > 0 else 0
|
|
print(f'Resuming from part {count_chapter_files}')
|
|
if existing_sentences:
|
|
resume_sentence = len(existing_sentences)
|
|
print(f'Resuming from sentence {resume_sentence}')
|
|
|
|
total_chapters = len(session['chapters'])
|
|
total_sentences = sum(len(array) for array in session['chapters'])
|
|
sentence_number = 0
|
|
|
|
with tqdm(total=total_sentences, desc='convertsion 0.00%', bar_format='{desc}: {n_fmt}/{total_fmt} ', unit='step', initial=resume_sentence) as t:
|
|
t.n = resume_sentence
|
|
for x in range(resume_chapter, total_chapters):
|
|
chapter_num = x + 1
|
|
chapter_audio_file = f'chapter_{chapter_num}.{default_audio_proc_format}'
|
|
sentences = session['chapters'][x]
|
|
sentences_count = len(sentences)
|
|
# Mark the starting sentence of the chapter
|
|
start = sentence_number
|
|
msg = f'Part {chapter_num} containing {sentences_count} sentences...'
|
|
print(msg)
|
|
for i, sentence in enumerate(sentences):
|
|
if session['cancellation_requested']:
|
|
msg = 'Cancel requested'
|
|
print(msg)
|
|
return False
|
|
if sentence_number >= resume_sentence:
|
|
tts_manager.params['sentence_audio_file'] = os.path.join(session['chapters_dir_sentences'], f'{sentence_number}.{default_audio_proc_format}')
|
|
tts_manager.params['sentence'] = sentence
|
|
if tts_manager.convert_sentence_to_audio():
|
|
percentage = (sentence_number / total_sentences) * 100
|
|
t.set_description(f'Converting {percentage:.2f}%')
|
|
msg = f'\nSentence: {sentence}'
|
|
print(msg)
|
|
else:
|
|
return False
|
|
t.update(1)
|
|
if progress_bar is not None:
|
|
progress_bar(sentence_number / total_sentences)
|
|
sentence_number += 1
|
|
end = sentence_number - 1 if sentence_number > 1 else sentence_number
|
|
msg = f"\nEnd of Part {chapter_num}"
|
|
print(msg)
|
|
if sentence_number >= resume_sentence:
|
|
if combine_audio_sentences(chapter_audio_file, start, end, session):
|
|
msg = f'Combining part {chapter_num} to audio, sentence {start} to {end}'
|
|
print(msg)
|
|
else:
|
|
msg = 'combine_audio_sentences() failed!'
|
|
print(msg)
|
|
return False
|
|
return True
|
|
except Exception as e:
|
|
DependencyError(e)
|
|
return False
|
|
|
|
def combine_audio_sentences(chapter_audio_file, start, end, session):
|
|
try:
|
|
chapter_audio_file = os.path.join(session['chapters_dir'], chapter_audio_file)
|
|
combined_audio = AudioSegment.empty()
|
|
# Get all audio sentence files sorted by their numeric indices
|
|
sentence_files = [f for f in os.listdir(session['chapters_dir_sentences']) if f.endswith(f'.{default_audio_proc_format}')]
|
|
sentences_dir_ordered = sorted(sentence_files, key=lambda x: int(re.search(r'\d+', x).group()))
|
|
# Filter the files in the range [start, end]
|
|
selected_files = [
|
|
f for f in sentences_dir_ordered
|
|
if start <= int(''.join(filter(str.isdigit, os.path.basename(f)))) <= end
|
|
]
|
|
for f in selected_files:
|
|
if session['cancellation_requested']:
|
|
msg = 'Cancel requested'
|
|
print(msg)
|
|
return False
|
|
audio_segment = AudioSegment.from_file(os.path.join(session['chapters_dir_sentences'], f), format=default_audio_proc_format)
|
|
combined_audio += audio_segment
|
|
combined_audio.export(chapter_audio_file, format=default_audio_proc_format)
|
|
msg = f'********* Combined part audio file saved to {chapter_audio_file}'
|
|
print(msg)
|
|
return True
|
|
except Exception as e:
|
|
DependencyError(e)
|
|
return False
|
|
|
|
def combine_audio_chapters(session):
|
|
def assemble_audio():
|
|
try:
|
|
combined_audio = AudioSegment.empty()
|
|
batch_size = get_batch_size(chapter_files, session)
|
|
print(f'************ DYNAMIC BATCH SIZE SET TO {batch_size} ******************')
|
|
# Process the part files in batches
|
|
for i in range(0, len(chapter_files), batch_size):
|
|
batch_files = chapter_files[i:i + batch_size]
|
|
batch_audio = AudioSegment.empty() # Initialize an empty AudioSegment for the batch
|
|
# Sequentially append each file in the current batch to the batch_audio
|
|
for chapter_file in batch_files:
|
|
if session['cancellation_requested']:
|
|
print('Cancel requested')
|
|
return False
|
|
audio_segment = AudioSegment.from_file(os.path.join(session['chapters_dir'], chapter_file), format=default_audio_proc_format)
|
|
batch_audio += audio_segment
|
|
combined_audio += batch_audio
|
|
combined_audio.export(combined_chapters_file, format=default_audio_proc_format)
|
|
msg = f'********* total audio parts saved to {combined_chapters_file}'
|
|
print(msg)
|
|
return True
|
|
except Exception as e:
|
|
DependencyError(e)
|
|
return False
|
|
|
|
def generate_ffmpeg_metadata():
|
|
try:
|
|
if session['cancellation_requested']:
|
|
print('Cancel requested')
|
|
return False
|
|
ffmpeg_metadata = ';FFMETADATA1\n'
|
|
if session['metadata'].get('title'):
|
|
ffmpeg_metadata += f"title={session['metadata']['title']}\n"
|
|
if session['metadata'].get('creator'):
|
|
ffmpeg_metadata += f"artist={session['metadata']['creator']}\n"
|
|
if session['metadata'].get('language'):
|
|
ffmpeg_metadata += f"language={session['metadata']['language']}\n\n"
|
|
if session['metadata'].get('publisher'):
|
|
ffmpeg_metadata += f"publisher={session['metadata']['publisher']}\n"
|
|
if session['metadata'].get('description'):
|
|
ffmpeg_metadata += f"description={session['metadata']['description']}\n"
|
|
if session['metadata'].get('published'):
|
|
# Check if the timestamp contains fractional seconds
|
|
if '.' in session['metadata']['published']:
|
|
# Parse with fractional seconds
|
|
year = datetime.strptime(session['metadata']['published'], '%Y-%m-%dT%H:%M:%S.%f%z').year
|
|
else:
|
|
# Parse without fractional seconds
|
|
year = datetime.strptime(session['metadata']['published'], '%Y-%m-%dT%H:%M:%S%z').year
|
|
else:
|
|
# If published is not provided, use the current year
|
|
year = datetime.now().year
|
|
ffmpeg_metadata += f'year={year}\n'
|
|
if session['metadata'].get('identifiers') and isinstance(session['metadata'].get('identifiers'), dict):
|
|
isbn = session['metadata']['identifiers'].get('isbn', None)
|
|
if isbn:
|
|
ffmpeg_metadata += f'isbn={isbn}\n' # ISBN
|
|
mobi_asin = session['metadata']['identifiers'].get('mobi-asin', None)
|
|
if mobi_asin:
|
|
ffmpeg_metadata += f'asin={mobi_asin}\n' # ASIN
|
|
start_time = 0
|
|
for index, chapter_file in enumerate(chapter_files):
|
|
if session['cancellation_requested']:
|
|
msg = 'Cancel requested'
|
|
print(msg)
|
|
return False
|
|
duration_ms = len(AudioSegment.from_file(os.path.join(session['chapters_dir'],chapter_file), format=default_audio_proc_format))
|
|
ffmpeg_metadata += f'[CHAPTER]\nTIMEBASE=1/1000\nSTART={start_time}\n'
|
|
ffmpeg_metadata += f'END={start_time + duration_ms}\ntitle=Part {index + 1}\n'
|
|
start_time += duration_ms
|
|
# Write the metadata to the file
|
|
with open(metadata_file, 'w', encoding='utf-8') as f:
|
|
f.write(ffmpeg_metadata)
|
|
return True
|
|
except Exception as e:
|
|
DependencyError(e)
|
|
return False
|
|
|
|
def export_audio():
|
|
try:
|
|
if session['cancellation_requested']:
|
|
print('Cancel requested')
|
|
return False
|
|
ffmpeg_cover = None
|
|
ffmpeg_combined_audio = combined_chapters_file
|
|
ffmpeg_metadata_file = metadata_file
|
|
ffmpeg_final_file = final_file
|
|
if session['cover'] is not None:
|
|
ffmpeg_cover = session['cover']
|
|
ffmpeg_cmd = [shutil.which('ffmpeg'), '-i', ffmpeg_combined_audio, '-i', ffmpeg_metadata_file]
|
|
if session['output_format'] == 'wav':
|
|
ffmpeg_cmd += ['-map', '0:a']
|
|
elif session['output_format'] == 'aac':
|
|
ffmpeg_cmd += ['-c:a', 'aac', '-b:a', '128k', '-ar', '44100']
|
|
else:
|
|
if ffmpeg_cover is not None:
|
|
if session['output_format'] == 'mp3' or session['output_format'] == 'm4a' or session['output_format'] == 'm4b' or session['output_format'] == 'mp4' or session['output_format'] == 'flac':
|
|
ffmpeg_cmd += ['-i', ffmpeg_cover]
|
|
ffmpeg_cmd += ['-map', '0:a', '-map', '2:v']
|
|
if ffmpeg_cover.endswith('.png'):
|
|
ffmpeg_cmd += ['-c:v', 'png', '-disposition:v', 'attached_pic'] # PNG cover
|
|
else:
|
|
ffmpeg_cmd += ['-c:v', 'copy', '-disposition:v', 'attached_pic'] # JPEG cover (no re-encoding needed)
|
|
elif session['output_format'] == 'mov':
|
|
ffmpeg_cmd += ['-framerate', '1', '-loop', '1', '-i', ffmpeg_cover]
|
|
ffmpeg_cmd += ['-map', '0:a', '-map', '2:v', '-shortest']
|
|
elif session['output_format'] == 'webm':
|
|
ffmpeg_cmd += ['-framerate', '1', '-loop', '1', '-i', ffmpeg_cover]
|
|
ffmpeg_cmd += ['-map', '0:a', '-map', '2:v']
|
|
ffmpeg_cmd += ['-c:v', 'libvpx-vp9', '-crf', '40', '-speed', '8', '-shortest']
|
|
elif session['output_format'] == 'ogg':
|
|
ffmpeg_cmd += ['-framerate', '1', '-loop', '1', '-i', ffmpeg_cover]
|
|
ffmpeg_cmd += ['-filter_complex', '[2:v:0][0:a:0]concat=n=1:v=1:a=1[outv][rawa];[rawa]afftdn=nf=-70[outa]', '-map', '[outv]', '-map', '[outa]', '-shortest']
|
|
if ffmpeg_cover.endswith('.png'):
|
|
ffmpeg_cmd += ['-pix_fmt', 'yuv420p']
|
|
else:
|
|
ffmpeg_cmd += ['-map', '0:a']
|
|
if session['output_format'] == 'm4a' or session['output_format'] == 'm4b' or session['output_format'] == 'mp4':
|
|
ffmpeg_cmd += ['-c:a', 'aac', '-b:a', '128k', '-ar', '44100']
|
|
ffmpeg_cmd += ['-movflags', '+faststart']
|
|
elif session['output_format'] == 'webm':
|
|
ffmpeg_cmd += ['-c:a', 'libopus', '-b:a', '64k']
|
|
elif session['output_format'] == 'ogg':
|
|
ffmpeg_cmd += ['-c:a', 'libopus', '-b:a', '128k', '-compression_level', '0']
|
|
elif session['output_format'] == 'flac':
|
|
ffmpeg_cmd += ['-c:a', 'flac', '-compression_level', '4']
|
|
elif session['output_format'] == 'mp3':
|
|
ffmpeg_cmd += ['-c:a', 'libmp3lame', '-b:a', '128k', '-ar', '44100']
|
|
if session['output_format'] != 'ogg':
|
|
ffmpeg_cmd += ['-af', 'afftdn=nf=-70']
|
|
ffmpeg_cmd += ['-strict', 'experimental', '-map_metadata', '1']
|
|
ffmpeg_cmd += ['-threads', '8', '-y', ffmpeg_final_file]
|
|
try:
|
|
process = subprocess.Popen(
|
|
ffmpeg_cmd,
|
|
env={},
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
encoding='utf-8'
|
|
)
|
|
for line in process.stdout:
|
|
print(line, end='') # Print each line of stdout
|
|
process.wait()
|
|
if process.returncode == 0:
|
|
return True
|
|
else:
|
|
error = process.returncode
|
|
print(error, ffmpeg_cmd)
|
|
return False
|
|
except subprocess.CalledProcessError as e:
|
|
DependencyError(e)
|
|
return False
|
|
|
|
except Exception as e:
|
|
DependencyError(e)
|
|
return False
|
|
try:
|
|
chapter_files = [f for f in os.listdir(session['chapters_dir']) if f.endswith(f'.{default_audio_proc_format}')]
|
|
chapter_files = sorted(chapter_files, key=lambda x: int(re.search(r'\d+', x).group()))
|
|
if len(chapter_files) > 0:
|
|
combined_chapters_file = os.path.join(session['process_dir'], session['metadata']['title'] + '.' + default_audio_proc_format)
|
|
metadata_file = os.path.join(session['process_dir'], 'metadata.txt')
|
|
if assemble_audio():
|
|
if generate_ffmpeg_metadata():
|
|
final_name = get_sanitized(session['metadata']['title'] + '.' + session['output_format'])
|
|
docker_final_file = os.path.join(session['process_dir'], final_name)
|
|
final_file = os.path.join(session['audiobooks_dir'], final_name)
|
|
if export_audio():
|
|
return final_file
|
|
else:
|
|
error = 'No part files exists!'
|
|
print(error)
|
|
return None
|
|
except Exception as e:
|
|
DependencyError(e)
|
|
return False
|
|
|
|
def replace_roman_numbers(text):
|
|
def roman_to_int(s):
|
|
try:
|
|
roman = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000,
|
|
'IV': 4, 'IX': 9, 'XL': 40, 'XC': 90, 'CD': 400, 'CM': 900}
|
|
i = 0
|
|
num = 0
|
|
# Iterate over the string to calculate the integer value
|
|
while i < len(s):
|
|
# Check for two-character numerals (subtractive combinations)
|
|
if i + 1 < len(s) and s[i:i+2] in roman:
|
|
num += roman[s[i:i+2]]
|
|
i += 2
|
|
else:
|
|
# Add the value of the single character
|
|
num += roman[s[i]]
|
|
i += 1
|
|
return num
|
|
except Exception as e:
|
|
return s
|
|
|
|
roman_chapter_pattern = re.compile(
|
|
r'\b(chapter|volume|chapitre|tome|capitolo|capítulo|volumen|Kapitel|глава|том|κεφάλαιο|τόμος|capitul|poglavlje)\s'
|
|
r'(M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})|[IVXLCDM]+)\b',
|
|
re.IGNORECASE
|
|
)
|
|
|
|
roman_numerals_with_period = re.compile(
|
|
r'^(M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})|[IVXLCDM])\.+'
|
|
)
|
|
|
|
def replace_chapter_match(match):
|
|
chapter_word = match.group(1)
|
|
roman_numeral = match.group(2)
|
|
integer_value = roman_to_int(roman_numeral.upper())
|
|
return f'{chapter_word.capitalize()} {integer_value}'
|
|
|
|
def replace_numeral_with_period(match):
|
|
roman_numeral = match.group(1)
|
|
integer_value = roman_to_int(roman_numeral)
|
|
return f'{integer_value}.'
|
|
|
|
text = roman_chapter_pattern.sub(replace_chapter_match, text)
|
|
text = roman_numerals_with_period.sub(replace_numeral_with_period, text)
|
|
return text
|
|
|
|
def delete_unused_tmp_dirs(web_dir, days, session):
|
|
dir_array = [
|
|
tmp_dir,
|
|
web_dir,
|
|
os.path.join(models_dir, '__sessions'),
|
|
os.path.join(voices_dir, '__sessions')
|
|
]
|
|
current_user_dirs = {
|
|
f"ebook-{session['id']}",
|
|
f"web-{session['id']}",
|
|
f"voice-{session['id']}",
|
|
f"model-{session['id']}"
|
|
}
|
|
current_time = time.time()
|
|
threshold_time = current_time - (days * 24 * 60 * 60) # Convert days to seconds
|
|
for dir_path in dir_array:
|
|
if not os.path.exists(dir_path) or not os.path.isdir(dir_path):
|
|
continue
|
|
for dir in os.listdir(dir_path):
|
|
if dir in current_user_dirs:
|
|
continue
|
|
full_dir_path = os.path.join(dir_path, dir) # Use a new variable
|
|
if not os.path.isdir(full_dir_path):
|
|
continue
|
|
try:
|
|
dir_mtime = os.path.getmtime(full_dir_path)
|
|
dir_ctime = os.path.getctime(full_dir_path)
|
|
if dir_mtime < threshold_time and dir_ctime < threshold_time:
|
|
shutil.rmtree(full_dir_path)
|
|
print(f"Deleted unmatched or old directory: {full_dir_path}")
|
|
except Exception as e:
|
|
print(f"Error deleting {full_dir_path}: {e}")
|
|
|
|
def compare_file_metadata(f1, f2):
|
|
if os.path.getsize(f1) != os.path.getsize(f2):
|
|
return False
|
|
if os.path.getmtime(f1) != os.path.getmtime(f2):
|
|
return False
|
|
return True
|
|
|
|
def get_compatible_tts_engines(language):
|
|
compatible_engines = [
|
|
tts for tts in models.keys()
|
|
if language in language_tts.get(tts, {})
|
|
]
|
|
return compatible_engines
|
|
|
|
def convert_ebook_batch(args):
|
|
if isinstance(args['ebook_list'], list):
|
|
ebook_list = args['ebook_list'][:]
|
|
for file in ebook_list: # Use a shallow copy
|
|
if any(file.endswith(ext) for ext in ebook_formats):
|
|
args['ebook'] = file
|
|
print(f'Processing eBook file: {os.path.basename(file)}')
|
|
progress_status, audiobook_file = convert_ebook(args)
|
|
if audiobook_file is None:
|
|
print(f'Conversion failed: {progress_status}')
|
|
sys.exit(1)
|
|
args['ebook_list'].remove(file)
|
|
reset_ebook_session(args['session'])
|
|
return progress_status, audiobook_file
|
|
else:
|
|
print(f'the ebooks source is not a list!')
|
|
sys.exit(1)
|
|
|
|
def convert_ebook(args):
|
|
try:
|
|
global is_gui_process, context
|
|
error = None
|
|
id = None
|
|
info_session = None
|
|
if args['language'] is not None:
|
|
|
|
if not os.path.splitext(args['ebook'])[1]:
|
|
error = 'The selected ebook file has no extension. Please select a valid file.'
|
|
print(error)
|
|
return error, None
|
|
|
|
if not os.path.exists(args['ebook']):
|
|
error = 'The ebook path you provided does not exist'
|
|
print(error)
|
|
return error, None
|
|
|
|
try:
|
|
if len(args['language']) == 2:
|
|
lang_array = languages.get(part1=args['language'])
|
|
if lang_array:
|
|
args['language'] = lang_array.part3
|
|
args['language_iso1'] = lang_array.part1
|
|
elif len(args['language']) == 3:
|
|
lang_array = languages.get(part3=args['language'])
|
|
if lang_array:
|
|
args['language'] = lang_array.part3
|
|
args['language_iso1'] = lang_array.part1
|
|
else:
|
|
args['language_iso1'] = None
|
|
except Exception as e:
|
|
pass
|
|
|
|
if args['language'] not in language_mapping.keys():
|
|
error = 'The language you provided is not (yet) supported'
|
|
print(error)
|
|
return error, None
|
|
|
|
is_gui_process = args['is_gui_process']
|
|
id = args['session'] if args['session'] is not None else str(uuid.uuid4())
|
|
session = context.get_session(id)
|
|
session['script_mode'] = args['script_mode'] if args['script_mode'] is not None else NATIVE
|
|
session['ebook'] = args['ebook']
|
|
session['ebook_list'] = args['ebook_list']
|
|
session['device'] = args['device']
|
|
session['language'] = args['language']
|
|
session['language_iso1'] = args['language_iso1']
|
|
session['tts_engine'] = args['tts_engine'] if args['tts_engine'] is not None else get_compatible_tts_engines(args['language'])[0]
|
|
session['custom_model'] = args['custom_model'] if not is_gui_process or args['custom_model'] is None else os.path.join(session['custom_model_dir'], args['custom_model'])
|
|
session['fine_tuned'] = args['fine_tuned']
|
|
session['output_format'] = args['output_format']
|
|
session['temperature'] = args['temperature']
|
|
session['length_penalty'] = args['length_penalty']
|
|
session['num_beams'] = args['num_beams']
|
|
session['repetition_penalty'] = args['repetition_penalty']
|
|
session['top_k'] = args['top_k']
|
|
session['top_p'] = args['top_p']
|
|
session['speed'] = args['speed']
|
|
session['enable_text_splitting'] = args['enable_text_splitting']
|
|
session['audiobooks_dir'] = args['audiobooks_dir']
|
|
session['voice'] = args['voice']
|
|
|
|
info_session = f"\n*********** Session: {id} **************\nStore it in case of interruption, crash, reuse of custom model or custom voice,\nyou can resume the conversion with --session option"
|
|
|
|
if not is_gui_process:
|
|
session['voice_dir'] = os.path.join(voices_dir, '__sessions',f"voice-{session['id']}")
|
|
session['custom_model_dir'] = os.path.join(models_dir, '__sessions',f"model-{session['id']}")
|
|
if session['custom_model'] is not None:
|
|
if not os.path.exists(session['custom_model_dir']):
|
|
os.makedirs(session['custom_model_dir'], exist_ok=True)
|
|
src_path = Path(session['custom_model'])
|
|
src_name = src_path.stem
|
|
if not os.path.exists(os.path.join(session['custom_model_dir'], src_name)):
|
|
if analyze_uploaded_file(session['custom_model']):
|
|
model = extract_custom_model(session['custom_model'], session)
|
|
if model is not None:
|
|
session['custom_model'] = model
|
|
else:
|
|
error = f"{model} could not be extracted or mandatory files are missing"
|
|
else:
|
|
error = f'{os.path.basename(f)} is not a valid model or some required files are missing'
|
|
if session['voice'] is not None:
|
|
os.makedirs(session['voice_dir'], exist_ok=True)
|
|
voice_name = os.path.splitext(os.path.basename(session['voice']))[0].replace('&', 'And').replace(' ', '_')
|
|
voice_name = get_sanitized(voice_name)
|
|
final_voice_file = os.path.join(session['voice_dir'],f'{voice_name}_24000.wav')
|
|
if not os.path.exists(final_voice_file):
|
|
extractor = VoiceExtractor(session, models_dir, session['voice'], voice_name)
|
|
status, msg = extractor.extract_voice()
|
|
if status:
|
|
session['voice'] = os.path.join(session['voice_dir'], f'{voice_name}_24000.wav')
|
|
else:
|
|
error = 'extractor.extract_voice()() failed! Check if you audio file is compatible.'
|
|
print(error)
|
|
if error is None:
|
|
if session['script_mode'] == NATIVE:
|
|
bool, e = check_programs('Calibre', 'ebook-convert', '--version')
|
|
if not bool:
|
|
error = f'check_programs() Calibre failed: {e}'
|
|
bool, e = check_programs('FFmpeg', 'ffmpeg', '-version')
|
|
if not bool:
|
|
error = f'check_programs() FFMPEG failed: {e}'
|
|
if error is None:
|
|
session['session_dir'] = os.path.join(tmp_dir, f"ebook-{session['id']}")
|
|
session['process_dir'] = os.path.join(session['session_dir'], f"{hashlib.md5(session['ebook'].encode()).hexdigest()}")
|
|
session['chapters_dir'] = os.path.join(session['process_dir'], "chapters")
|
|
session['chapters_dir_sentences'] = os.path.join(session['chapters_dir'], 'sentences')
|
|
if prepare_dirs(args['ebook'], session):
|
|
session['filename_noext'] = os.path.splitext(os.path.basename(session['ebook']))[0]
|
|
if session['device'] == 'cuda':
|
|
session['device'] = session['device'] if torch.cuda.is_available() else 'cpu'
|
|
if session['device'] == 'cpu':
|
|
print('GPU is not available on your device!')
|
|
elif session['device'] == 'mps':
|
|
session['device'] = session['device'] if torch.backends.mps.is_available() else 'cpu'
|
|
if session['device'] == 'cpu':
|
|
print('MPS is not available on your device!')
|
|
print(f"Available Processor Unit: {session['device']}")
|
|
session['epub_path'] = os.path.join(session['process_dir'], '__' + session['filename_noext'] + '.epub')
|
|
if convert_to_epub(session):
|
|
epubBook = epub.read_epub(session['epub_path'], {'ignore_ncx': True})
|
|
metadata = dict(session['metadata'])
|
|
for key, value in metadata.items():
|
|
data = epubBook.get_metadata('DC', key)
|
|
if data:
|
|
for value, attributes in data:
|
|
metadata[key] = value
|
|
metadata['language'] = session['language']
|
|
metadata['title'] = os.path.splitext(os.path.basename(session['ebook']))[0].replace('_',' ') if not metadata['title'] else metadata['title']
|
|
metadata['creator'] = False if not metadata['creator'] or metadata['creator'] == 'Unknown' else metadata['creator']
|
|
session['metadata'] = metadata
|
|
|
|
try:
|
|
if len(session['metadata']['language']) == 2:
|
|
lang_array = languages.get(part1=session['language'])
|
|
if lang_array:
|
|
session['metadata']['language'] = lang_array.part3
|
|
except Exception as e:
|
|
pass
|
|
|
|
if session['metadata']['language'] != session['language']:
|
|
error = f"WARNING!!! language selected {session['language']} differs from the EPUB file language {session['metadata']['language']}"
|
|
print(error)
|
|
session['cover'] = get_cover(epubBook, session)
|
|
if session['cover']:
|
|
session['chapters'] = get_chapters(epubBook, session)
|
|
if session['chapters'] is not None:
|
|
if convert_chapters_to_audio(session):
|
|
final_file = combine_audio_chapters(session)
|
|
if final_file is not None:
|
|
chapters_dirs = [
|
|
dir_name for dir_name in os.listdir(session['process_dir'])
|
|
if fnmatch.fnmatch(dir_name, "chapters_*") and os.path.isdir(os.path.join(session['process_dir'], dir_name))
|
|
]
|
|
if is_gui_process:
|
|
if len(chapters_dirs) > 1:
|
|
if os.path.exists(session['chapters_dir']):
|
|
shutil.rmtree(session['chapters_dir'])
|
|
if os.path.exists(session['epub_path']):
|
|
os.remove(session['epub_path'])
|
|
if os.path.exists(session['cover']):
|
|
os.remove(session['cover'])
|
|
else:
|
|
if os.path.exists(session['process_dir']):
|
|
shutil.rmtree(session['process_dir'])
|
|
else:
|
|
if os.path.exists(session['voice_dir']):
|
|
if not any(os.scandir(session['voice_dir'])):
|
|
shutil.rmtree(session['voice_dir'])
|
|
if os.path.exists(session['custom_model_dir']):
|
|
if not any(os.scandir(session['custom_model_dir'])):
|
|
shutil.rmtree(session['custom_model_dir'])
|
|
if os.path.exists(session['session_dir']):
|
|
shutil.rmtree(session['session_dir'])
|
|
progress_status = f'Audiobook {os.path.basename(final_file)} created!'
|
|
print(info_session)
|
|
return progress_status, final_file
|
|
else:
|
|
error = 'combine_audio_chapters() error: final_file not created!'
|
|
else:
|
|
error = 'convert_chapters_to_audio() failed!'
|
|
else:
|
|
error = 'get_chapters() failed!'
|
|
else:
|
|
error = 'get_cover() failed!'
|
|
else:
|
|
error = 'convert_to_epub() failed!'
|
|
else:
|
|
error = f"Temporary directory {session['process_dir']} not removed due to failure."
|
|
else:
|
|
error = f"Language {args['language']} is not supported."
|
|
if session['cancellation_requested']:
|
|
error = 'Cancelled'
|
|
else:
|
|
if not is_gui_process and id is not None:
|
|
error += info_session
|
|
print(error)
|
|
return error, None
|
|
except Exception as e:
|
|
print(f'convert_ebook() Exception: {e}')
|
|
return e, None
|
|
|
|
def restore_session_from_data(data, session):
|
|
try:
|
|
for key, value in data.items():
|
|
if key in session: # Check if the key exists in session
|
|
if isinstance(value, dict) and isinstance(session[key], dict):
|
|
restore_session_from_data(value, session[key])
|
|
else:
|
|
session[key] = value
|
|
except Exception as e:
|
|
alert_exception(e)
|
|
|
|
def reset_ebook_session(id):
|
|
session = context.get_session(id)
|
|
data = {
|
|
"ebook": None,
|
|
"chapters_dir": None,
|
|
"chapters_dir_sentences": None,
|
|
"epub_path": None,
|
|
"filename_noext": None,
|
|
"chapters": None,
|
|
"cover": None,
|
|
"status": None,
|
|
"progress": 0,
|
|
"time": None,
|
|
"cancellation_requested": False,
|
|
"event": None,
|
|
"metadata": {
|
|
"title": None,
|
|
"creator": None,
|
|
"contributor": None,
|
|
"language": None,
|
|
"identifier": None,
|
|
"publisher": None,
|
|
"date": None,
|
|
"description": None,
|
|
"subject": None,
|
|
"rights": None,
|
|
"format": None,
|
|
"type": None,
|
|
"coverage": None,
|
|
"relation": None,
|
|
"Source": None,
|
|
"Modified": None
|
|
}
|
|
}
|
|
restore_session_from_data(data, session)
|
|
|
|
def get_all_ip_addresses():
|
|
ip_addresses = []
|
|
for interface, addresses in psutil.net_if_addrs().items():
|
|
for address in addresses:
|
|
if address.family == socket.AF_INET:
|
|
ip_addresses.append(address.address)
|
|
elif address.family == socket.AF_INET6:
|
|
ip_addresses.append(address.address)
|
|
return ip_addresses
|
|
|
|
def web_interface(args):
|
|
script_mode = args['script_mode']
|
|
is_gui_process = args['is_gui_process']
|
|
is_gui_shared = args['share']
|
|
audiobooks_dir = None
|
|
ebook_src = None
|
|
audiobook_file = None
|
|
language_options = [
|
|
(
|
|
f"{details['name']} - {details['native_name']}" if details['name'] != details['native_name'] else details['name'],
|
|
lang
|
|
)
|
|
for lang, details in language_mapping.items()
|
|
]
|
|
voice_options = []
|
|
tts_engine_options = []
|
|
custom_model_options = []
|
|
fine_tuned_options = []
|
|
audiobook_options = []
|
|
|
|
src_label_file = f'Select a File {ebook_formats}'
|
|
src_label_dir = 'Select a Directory'
|
|
|
|
visible_gr_tab_preferences = interface_component_options['gr_tab_preferences']
|
|
visible_gr_group_custom_model = interface_component_options['gr_group_custom_model']
|
|
visible_gr_group_voice_file = interface_component_options['gr_group_voice_file']
|
|
|
|
# Buffer for real-time log streaming
|
|
log_buffer = Queue()
|
|
|
|
# Event to signal when the process should stop
|
|
thread = None
|
|
stop_event = threading.Event()
|
|
|
|
theme = gr.themes.Origin(
|
|
primary_hue='amber',
|
|
secondary_hue='green',
|
|
neutral_hue='gray',
|
|
radius_size='lg',
|
|
font_mono=['JetBrains Mono', 'monospace', 'Consolas', 'Menlo', 'Liberation Mono']
|
|
)
|
|
"""
|
|
def process_cleanup(state):
|
|
try:
|
|
print('***************PROCESS CLEANING REQUESTED*****************')
|
|
if state['id'] in context.sessions:
|
|
del context.sessions[state['id']]
|
|
except Exception as e:
|
|
error = f'process_cleanup(): {e}'
|
|
alert_exception(error)
|
|
"""
|
|
with gr.Blocks(theme=theme, delete_cache=(86400, 86400)) as interface:
|
|
main_html = gr.HTML(
|
|
'''
|
|
<style>
|
|
/* Global Scrollbar Customization */
|
|
/* The entire scrollbar */
|
|
::-webkit-scrollbar {
|
|
width: 6px !important;
|
|
height: 6px !important;
|
|
cursor: pointer !important;;
|
|
}
|
|
/* The scrollbar track (background) */
|
|
::-webkit-scrollbar-track {
|
|
background: none transparent !important;
|
|
border-radius: 6px !important;
|
|
}
|
|
/* The scrollbar thumb (scroll handle) */
|
|
::-webkit-scrollbar-thumb {
|
|
background: #c09340 !important;
|
|
border-radius: 6px !important;
|
|
}
|
|
/* The scrollbar thumb on hover */
|
|
::-webkit-scrollbar-thumb:hover {
|
|
background: #ff8c00 !important;
|
|
}
|
|
/* Firefox scrollbar styling */
|
|
html {
|
|
scrollbar-width: thin !important;
|
|
scrollbar-color: #c09340 none !important;
|
|
}
|
|
.svelte-1xyfx7i.center.boundedheight.flex{
|
|
height: 120px !important;
|
|
}
|
|
.block.svelte-5y6bt2 {
|
|
padding: 10px !important;
|
|
margin: 0 !important;
|
|
height: auto !important;
|
|
font-size: 16px !important;
|
|
}
|
|
.wrap.svelte-12ioyct {
|
|
padding: 0 !important;
|
|
margin: 0 !important;
|
|
font-size: 12px !important;
|
|
}
|
|
.block.svelte-5y6bt2.padded {
|
|
height: auto !important;
|
|
padding: 10px !important;
|
|
}
|
|
.block.svelte-5y6bt2.padded.hide-container {
|
|
height: auto !important;
|
|
padding: 0 !important;
|
|
}
|
|
.waveform-container.svelte-19usgod {
|
|
height: 58px !important;
|
|
overflow: hidden !important;
|
|
padding: 0 !important;
|
|
margin: 0 !important;
|
|
}
|
|
.component-wrapper.svelte-19usgod {
|
|
height: 110px !important;
|
|
}
|
|
.timestamps.svelte-19usgod {
|
|
display: none !important;
|
|
}
|
|
.controls.svelte-ije4bl {
|
|
padding: 0 !important;
|
|
margin: 0 !important;
|
|
}
|
|
.icon-btn {
|
|
font-size: 30px !important;
|
|
}
|
|
.small-btn {
|
|
font-size: 22px !important;
|
|
width: 60px !important;
|
|
height: 60px !important;
|
|
margin: 0 !important;
|
|
padding: 0 !important;
|
|
}
|
|
.file-preview-holder {
|
|
height: 116px !important;
|
|
overflow: auto !important;
|
|
}
|
|
#component-8, #component-31, #component-15 {
|
|
height: 140px !important;
|
|
}
|
|
#component-27, #component-28 {
|
|
height: 95px !important;
|
|
}
|
|
#component-56 {
|
|
height: 80px !important;
|
|
}
|
|
#component-64 {
|
|
height: 60px !important;
|
|
}
|
|
#component-9 span[data-testid="block-info"], #component-14 span[data-testid="block-info"],
|
|
#component-33 span[data-testid="block-info"], #component-61 span[data-testid="block-info"] {
|
|
display: none !important;
|
|
}
|
|
#voice_player {
|
|
display: block !important;
|
|
margin: 0 !important;
|
|
padding: 0 !important;
|
|
width: 60px !important;
|
|
height: 60px !important;
|
|
}
|
|
#voice_player :is(#waveform, .rewind, .skip, .playback, label, .volume, .empty) {
|
|
display: none !important;
|
|
}
|
|
#voice_player .controls {
|
|
display: block !important;
|
|
position: absolute !important;
|
|
left: 15px !important;
|
|
top: 0 !important;
|
|
}
|
|
#audiobook_player :is(.volume, .empty, .source-selection, .control-wrapper, .settings-wrapper) {
|
|
display: none !important;
|
|
}
|
|
</style>
|
|
<script>
|
|
setInterval(() => {
|
|
const data = window.localStorage.getItem('data');
|
|
if(data){
|
|
const obj = JSON.parse(data);
|
|
if(typeof(obj.id) != 'undefined'){
|
|
if(obj.id != null && obj.id != ""){
|
|
fetch('/api/heartbeat', {
|
|
method: 'POST',
|
|
headers: { 'Content-Type': 'application/json' },
|
|
body: JSON.stringify({ 'id': obj.id })
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}, 5000);
|
|
</script>
|
|
'''
|
|
)
|
|
main_markdown = gr.Markdown(
|
|
f'''
|
|
<h1 style="line-height: 0.7">Ebook2Audiobook v{version}</h1>
|
|
<a href="https://github.com/DrewThomasson/ebook2audiobook" target="_blank" style="line-height:0">https://github.com/DrewThomasson/ebook2audiobook</a>
|
|
<div style="line-height: 1.3;">
|
|
Multiuser, multiprocessing tasks on a geo cluster to share the conversion to the Grid<br/>
|
|
Convert eBooks into immersive audiobooks with realistic TTS model voices.<br/>
|
|
</div>
|
|
'''
|
|
)
|
|
with gr.Tabs():
|
|
gr_tab_main = gr.TabItem('Main Parameters')
|
|
|
|
with gr_tab_main:
|
|
with gr.Row():
|
|
with gr.Column(scale=3):
|
|
with gr.Group():
|
|
gr_ebook_file = gr.File(label=src_label_file, file_types=ebook_formats, file_count='single', allow_reordering=True, height=140)
|
|
gr_ebook_mode = gr.Radio(label='', choices=[('File','single'), ('Directory','directory')], value='single', interactive=True)
|
|
with gr.Group():
|
|
gr_language = gr.Dropdown(label='Language', choices=language_options, value=default_language_code, type='value', interactive=True)
|
|
gr_group_voice_file = gr.Group(visible=visible_gr_group_voice_file)
|
|
with gr_group_voice_file:
|
|
gr_voice_file = gr.File(label='*Cloning Voice Audio Fiie', file_types=voice_formats, value=None, height=140)
|
|
with gr.Row():
|
|
gr_voice_player = gr.Audio(elem_id='voice_player', type='filepath', interactive=False, show_download_button=False, container=False, visible=False, show_share_button=False, show_label=False, waveform_options=gr.WaveformOptions(show_controls=False), scale=0, min_width=60)
|
|
gr_voice_list = gr.Dropdown(label='', choices=voice_options, type='value', interactive=True, scale=2)
|
|
gr_voice_del_btn = gr.Button('🗑', elem_classes=['small-btn'], variant='secondary', interactive=True, visible=False, scale=0, min_width=60)
|
|
gr.Markdown('<p> * Optional</p>')
|
|
with gr.Group():
|
|
gr_device = gr.Radio(label='Processor Unit', choices=[('CPU','cpu'), ('GPU','cuda'), ('MPS','mps')], value=default_device)
|
|
with gr.Column(scale=3):
|
|
with gr.Group():
|
|
gr_tts_engine_list = gr.Dropdown(label='TTS Base', choices=tts_engine_options, type='value', interactive=True)
|
|
gr_fine_tuned_list = gr.Dropdown(label='Fine Tuned Models', choices=fine_tuned_options, type='value', interactive=True)
|
|
gr_group_custom_model = gr.Group(visible=visible_gr_group_custom_model)
|
|
with gr_group_custom_model:
|
|
gr_custom_model_file = gr.File(label=f"*Custom Model Zip File", value=None, file_types=['.zip'], height=140)
|
|
with gr.Row():
|
|
gr_custom_model_list = gr.Dropdown(label='', choices=custom_model_options, type='value', interactive=True, scale=2)
|
|
gr_custom_model_del_btn = gr.Button('🗑', elem_classes=['small-btn'], variant='secondary', interactive=True, visible=False, scale=0, min_width=60)
|
|
gr.Markdown('<p> * Optional</p>')
|
|
with gr.Group():
|
|
gr_session = gr.Textbox(label='Session')
|
|
gr_output_format_list = gr.Dropdown(label='Output format', choices=output_formats, type='value', value=default_output_format, interactive=True)
|
|
gr_tab_preferences = gr.TabItem('Fine Tuned Parameters', visible=visible_gr_tab_preferences)
|
|
|
|
with gr_tab_preferences:
|
|
gr.Markdown(
|
|
'''
|
|
### Customize Audio Generation Parameters
|
|
Adjust the settings below to influence how the audio is generated. You can control the creativity, speed, repetition, and more.
|
|
'''
|
|
)
|
|
gr_temperature = gr.Slider(
|
|
label='Temperature',
|
|
minimum=0.1,
|
|
maximum=10.0,
|
|
step=0.1,
|
|
value=tts_default_settings['temperature'],
|
|
info='Higher values lead to more creative, unpredictable outputs. Lower values make it more monotone.'
|
|
)
|
|
gr_length_penalty = gr.Slider(
|
|
label='Length Penalty',
|
|
minimum=0.3,
|
|
maximum=5.0,
|
|
step=0.1,
|
|
value=tts_default_settings['length_penalty'],
|
|
info='Adjusts how much longer sequences are preferred. Higher values encourage the model to produce longer and more natural speech.'
|
|
)
|
|
gr_num_beams = gr.Slider(
|
|
label='Number Beams',
|
|
minimum=1,
|
|
maximum=10,
|
|
step=1,
|
|
value=tts_default_settings['num_beams'],
|
|
info='Controls how many alternative sequences the model explores. Higher values improve speech coherence and pronunciation but increase inference time.'
|
|
)
|
|
gr_repetition_penalty = gr.Slider(
|
|
label='Repetition Penalty',
|
|
minimum=1.0,
|
|
maximum=10.0,
|
|
step=0.1,
|
|
value=tts_default_settings['repetition_penalty'],
|
|
info='Penalizes repeated phrases. Higher values reduce repetition.'
|
|
)
|
|
gr_top_k = gr.Slider(
|
|
label='Top-k Sampling',
|
|
minimum=10,
|
|
maximum=100,
|
|
step=1,
|
|
value=tts_default_settings['top_k'],
|
|
info='Lower values restrict outputs to more likely words and increase speed at which audio generates.'
|
|
)
|
|
gr_top_p = gr.Slider(
|
|
label='Top-p Sampling',
|
|
minimum=0.1,
|
|
maximum=1.0,
|
|
step=0.01,
|
|
value=tts_default_settings['top_p'],
|
|
info='Controls cumulative probability for word selection. Lower values make the output more predictable and increase speed at which audio generates.'
|
|
)
|
|
gr_speed = gr.Slider(
|
|
label='Speed',
|
|
minimum=0.5,
|
|
maximum=3.0,
|
|
step=0.1,
|
|
value=tts_default_settings['speed'],
|
|
info='Adjusts how fast the narrator will speak.'
|
|
)
|
|
gr_enable_text_splitting = gr.Checkbox(
|
|
label='Enable Text Splitting',
|
|
value=tts_default_settings['enable_text_splitting'],
|
|
info='Coqui-tts builtin text splitting. Can help against hallucinations bu can also be worse.'
|
|
)
|
|
|
|
gr_state = gr.State(value={"hash": None})
|
|
gr_state_alert = gr.State(value={"type": None,"msg": None})
|
|
gr_read_data = gr.JSON(visible=False)
|
|
gr_write_data = gr.JSON(visible=False)
|
|
gr_conversion_progress = gr.Textbox(label='Progress')
|
|
gr_group_audiobook_list = gr.Group(visible=False)
|
|
with gr_group_audiobook_list:
|
|
gr_audiobook_player = gr.Audio(label='Audiobook', elem_id='audiobook_player', type='filepath', show_download_button=False, show_share_button=False, container=True, interactive=False, visible=True)
|
|
with gr.Row():
|
|
gr_audiobook_download_btn = gr.DownloadButton('↧', elem_classes=['small-btn'], variant='secondary', interactive=True, visible=True, scale=0, min_width=60)
|
|
gr_audiobook_list = gr.Dropdown(label='', choices=audiobook_options, type='value', interactive=True, scale=2)
|
|
gr_audiobook_del_btn = gr.Button('🗑', elem_classes=['small-btn'], variant='secondary', interactive=True, visible=True, scale=0, min_width=60)
|
|
gr_convert_btn = gr.Button('📚', elem_classes='icon-btn', variant='primary', interactive=False)
|
|
|
|
gr_modal = gr.HTML(visible=False)
|
|
gr_confirm_field_hidden = gr.Textbox(elem_id='confirm_hidden', visible=False)
|
|
gr_confirm_yes_btn_hidden = gr.Button('', elem_id='confirm_yes_btn_hidden', visible=False)
|
|
gr_confirm_no_btn_hidden = gr.Button('', elem_id='confirm_no_btn_hidden', visible=False)
|
|
|
|
def show_alert(state):
|
|
if isinstance(state, dict):
|
|
if state['type'] is not None:
|
|
if state['type'] == 'error':
|
|
gr.Error(state['msg'])
|
|
elif state['type'] == 'warning':
|
|
gr.Warning(state['msg'])
|
|
elif state['type'] == 'info':
|
|
gr.Info(state['msg'])
|
|
elif state['type'] == 'success':
|
|
gr.Success(state['msg'])
|
|
|
|
def show_modal(type, msg):
|
|
return f'''
|
|
<style>
|
|
.modal {{
|
|
display: none; /* Hidden by default */
|
|
position: fixed;
|
|
top: 0;
|
|
left: 0;
|
|
width: 100%;
|
|
height: 100%;
|
|
background-color: rgba(0, 0, 0, 0.5);
|
|
z-index: 9999;
|
|
display: flex;
|
|
justify-content: center;
|
|
align-items: center;
|
|
}}
|
|
.modal-content {{
|
|
background-color: #333;
|
|
padding: 20px;
|
|
border-radius: 8px;
|
|
text-align: center;
|
|
max-width: 300px;
|
|
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.5);
|
|
border: 2px solid #FFA500;
|
|
color: white;
|
|
font-family: Arial, sans-serif;
|
|
position: relative;
|
|
}}
|
|
.modal-content p {{
|
|
margin: 10px 0;
|
|
}}
|
|
.confirm-buttons {{
|
|
display: flex;
|
|
justify-content: space-evenly;
|
|
margin-top: 20px;
|
|
}}
|
|
.confirm-buttons button {{
|
|
padding: 10px 20px;
|
|
border: none;
|
|
border-radius: 5px;
|
|
font-size: 16px;
|
|
cursor: pointer;
|
|
font-weight: bold;
|
|
}}
|
|
.confirm-buttons .confirm_yes_btn {{
|
|
background-color: #28a745;
|
|
color: white;
|
|
}}
|
|
.confirm-buttons .confirm_no_btn {{
|
|
background-color: #dc3545;
|
|
color: white;
|
|
}}
|
|
.confirm-buttons .confirm_yes_btn:hover {{
|
|
background-color: #34d058;
|
|
}}
|
|
.confirm-buttons .confirm_no_btn:hover {{
|
|
background-color: #ff6f71;
|
|
}}
|
|
/* Spinner */
|
|
.spinner {{
|
|
margin: 15px auto;
|
|
border: 4px solid rgba(255, 255, 255, 0.2);
|
|
border-top: 4px solid #FFA500;
|
|
border-radius: 50%;
|
|
width: 30px;
|
|
height: 30px;
|
|
animation: spin 1s linear infinite;
|
|
}}
|
|
@keyframes spin {{
|
|
0% {{ transform: rotate(0deg); }}
|
|
100% {{ transform: rotate(360deg); }}
|
|
}}
|
|
</style>
|
|
<div id="custom-modal" class="modal">
|
|
<div class="modal-content">
|
|
<p style="color:#ffffff">{msg}</p>
|
|
{show_confirm() if type == 'confirm' else '<div class="spinner"></div>'}
|
|
</div>
|
|
</div>
|
|
'''
|
|
|
|
def show_confirm():
|
|
return '''
|
|
<div class="confirm-buttons">
|
|
<button class="confirm_yes_btn" onclick="document.querySelector('#confirm_yes_btn_hidden').click()">✔</button>
|
|
<button class="confirm_no_btn" onclick="document.querySelector('#confirm_no_btn_hidden').click()">⨉</button>
|
|
</div>
|
|
'''
|
|
|
|
def alert_exception(error):
|
|
gr.Error(error)
|
|
DependencyError(error)
|
|
|
|
def restore_interface(id):
|
|
session = context.get_session(id)
|
|
ebook_data = None
|
|
file_count = session['ebook_mode']
|
|
if isinstance(session['ebook_list'], list) and file_count == 'directory':
|
|
#ebook_data = session['ebook_list']
|
|
ebook_data = None
|
|
elif isinstance(session['ebook'], str) and file_count == 'single':
|
|
ebook_data = session['ebook']
|
|
else:
|
|
ebook_data = None
|
|
return (
|
|
gr.update(value=ebook_data), gr.update(value=session['ebook_mode']), gr.update(value=session['device']),
|
|
gr.update(value=session['language']), update_gr_voice_list(id), update_gr_tts_engine_list(id), update_gr_custom_model_list(id),
|
|
update_gr_fine_tuned_list(id), gr.update(value=session['output_format']), update_gr_audiobook_list(id),
|
|
gr.update(value=session['temperature']), gr.update(value=session['length_penalty']), gr.update(value=session['num_beams']),
|
|
gr.update(value=session['repetition_penalty']), gr.update(value=session['top_k']), gr.update(value=session['top_p']), gr.update(value=session['speed']),
|
|
gr.update(value=session['enable_text_splitting']), gr.update(active=True)
|
|
)
|
|
|
|
def refresh_interface(id):
|
|
session = context.get_session(id)
|
|
session['status'] = None
|
|
return gr.update(interactive=False), gr.update(value=None), update_gr_audiobook_list(id), gr.update(value=session['audiobook']), gr.update(visible=False)
|
|
|
|
def change_gr_audiobook_list(selected, id):
|
|
session = context.get_session(id)
|
|
session['audiobook'] = selected
|
|
visible = True if session['audiobook'] is not None else False
|
|
return gr.update(value=selected), gr.update(value=selected), gr.update(visible=visible)
|
|
|
|
def update_convert_btn(upload_file=None, upload_file_mode=None, custom_model_file=None, session=None):
|
|
if session is None:
|
|
return gr.update(variant='primary', interactive=False)
|
|
else:
|
|
if hasattr(upload_file, 'name') and not hasattr(custom_model_file, 'name'):
|
|
return gr.update(variant='primary', interactive=True)
|
|
elif isinstance(upload_file, list) and len(upload_file) > 0 and upload_file_mode == 'directory' and not hasattr(custom_model_file, 'name'):
|
|
return gr.update(variant='primary', interactive=True)
|
|
else:
|
|
return gr.update(variant='primary', interactive=False)
|
|
|
|
def change_gr_ebook_file(data, id):
|
|
try:
|
|
session = context.get_session(id)
|
|
session['ebook'] = None
|
|
session['ebook_list'] = None
|
|
if data is None:
|
|
if session['status'] == 'converting':
|
|
session['cancellation_requested'] = True
|
|
msg = 'Cancellation requested, please wait...'
|
|
yield gr.update(value=show_modal('wait', msg),visible=True)
|
|
return
|
|
elif isinstance(data, list):
|
|
session['ebook_list'] = data
|
|
else:
|
|
session['ebook'] = data
|
|
session['cancellation_requested'] = False
|
|
except Exception as e:
|
|
error = f'change_gr_ebook_file(): {e}'
|
|
alert_exception(error)
|
|
return gr.update(visible=False)
|
|
|
|
def change_gr_ebook_mode(val, id):
|
|
session = context.get_session(id)
|
|
session['ebook_mode'] = val
|
|
if val == 'single':
|
|
return gr.update(label=src_label_file, value=None, file_count='single')
|
|
else:
|
|
return gr.update(label=src_label_dir, value=None, file_count='directory')
|
|
|
|
def change_gr_voice_file(f, id):
|
|
if f is not None:
|
|
state = {}
|
|
if len(voice_options) > max_custom_voices:
|
|
error = f'You are allowed to upload a max of {max_custom_voices} voices'
|
|
state['type'] = 'warning'
|
|
state['msg'] = error
|
|
elif os.path.splitext(f.name)[1] not in voice_formats:
|
|
error = f'The audio file format selected is not valid.'
|
|
state['type'] = 'warning'
|
|
state['msg'] = error
|
|
else:
|
|
session = context.get_session(id)
|
|
voice_name = os.path.splitext(os.path.basename(f))[0].replace('&', 'And').replace(' ', '_')
|
|
voice_name = get_sanitized(voice_name)
|
|
extractor = VoiceExtractor(session, models_dir, f, voice_name)
|
|
status, msg = extractor.extract_voice()
|
|
if status:
|
|
session['voice'] = os.path.join(session['voice_dir'], f'{voice_name}_24000.wav')
|
|
msg = f"Voice {voice_name} added to the voices list"
|
|
state['type'] = 'success'
|
|
state['msg'] = msg
|
|
else:
|
|
error = 'failed! Check if you audio file is compatible.'
|
|
state['type'] = 'warning'
|
|
state['msg'] = error
|
|
show_alert(state)
|
|
return gr.update(value=None)
|
|
return gr.update()
|
|
|
|
def change_gr_voice_list(selected, id):
|
|
session = context.get_session(id)
|
|
session['voice'] = next((value for label, value in voice_options if value == selected), None)
|
|
visible = True if session['voice'] is not None else False
|
|
min_width = 60 if session['voice'] is not None else 0
|
|
return gr.update(value=session['voice'], visible=visible, min_width=min_width), gr.update(visible=visible)
|
|
|
|
def click_gr_voice_del_btn(selected, id):
|
|
try:
|
|
if selected is not None:
|
|
voice_name = os.path.basename(selected).replace('_24000.wav','').replace('_16000.wav','')
|
|
if voice_name in builtin_xtts_voices.keys() or voice_name in builtin_yourtts_voices.keys():
|
|
error = f'Voice file {voice_name} is a builtin voice and cannot be deleted.'
|
|
show_alert({"type": "warning", "msg": error})
|
|
else:
|
|
try:
|
|
session = context.get_session(id)
|
|
if selected.find(session['voice_dir']) > -1:
|
|
msg = f'Are you sure to delete {voice_name}...'
|
|
return gr.update(value='confirm_voice_del'), gr.update(value=show_modal('confirm', msg),visible=True)
|
|
else:
|
|
error = f'{voice_name} is part of the global voices directorye. Only your own custom uploaded voices can be deleted!'
|
|
show_alert({"type": "warning", "msg": error})
|
|
except Exception as e:
|
|
error = f'Could not delete the voice file {voice_name}!'
|
|
alert_exception(error)
|
|
return gr.update(), gr.update(visible=False)
|
|
except Exception as e:
|
|
error = f'click_gr_voice_del_btn(): {e}'
|
|
alert_exception(error)
|
|
return gr.update(), gr.update(visible=False)
|
|
|
|
def click_gr_custom_model_del_btn(selected, id):
|
|
try:
|
|
if selected is not None:
|
|
session = context.get_session(id)
|
|
selected_name = os.path.basename(selected)
|
|
msg = f'Are you sure to delete {selected_name}...'
|
|
return gr.update(value='confirm_custom_model_del'), gr.update(value=show_modal('confirm', msg),visible=True)
|
|
except Exception as e:
|
|
error = f'Could not delete the custom model {selected_name}!'
|
|
alert_exception(error)
|
|
return gr.update(), gr.update(visible=False)
|
|
|
|
def click_gr_audiobook_del_btn(selected, id):
|
|
try:
|
|
if selected is not None:
|
|
session = context.get_session(id)
|
|
selected_name = os.path.basename(selected)
|
|
msg = f'Are you sure to delete {selected_name}...'
|
|
return gr.update(value='confirm_audiobook_del'), gr.update(value=show_modal('confirm', msg),visible=True)
|
|
except Exception as e:
|
|
error = f'Could not delete the audiobook {selected_name}!'
|
|
alert_exception(error)
|
|
return gr.update(), gr.update(visible=False)
|
|
|
|
def confirm_deletion(voice, custom_model, audiobook, id, method=None):
|
|
try:
|
|
if method is not None:
|
|
session = context.get_session(id)
|
|
if method == 'confirm_voice_del':
|
|
selected_name = os.path.basename(voice)
|
|
pattern = voice.replace('_16000.wav', '_*.wav').replace('_24000.wav', '_*.wav')
|
|
files_to_remove = glob(pattern)
|
|
for file in files_to_remove:
|
|
os.remove(file)
|
|
msg = f'Voice file {selected_name.replace('_16000.wav', '').replace('_24000.wav', '')} deleted!'
|
|
session['voice'] = None
|
|
show_alert({"type": "warning", "msg": msg})
|
|
return update_gr_voice_list(id), gr.update(), gr.update(), gr.update(visible=False)
|
|
elif method == 'confirm_custom_model_del':
|
|
selected_name = os.path.basename(custom_model)
|
|
shutil.rmtree(custom_model, ignore_errors=True)
|
|
msg = f'Custom model {selected_name} deleted!'
|
|
session['custom_model'] = None
|
|
show_alert({"type": "warning", "msg": msg})
|
|
return gr.update(), update_gr_custom_model_list(id), gr.update(), gr.update(visible=False)
|
|
elif method == 'confirm_audiobook_del':
|
|
selected_name = os.path.basename(audiobook)
|
|
if os.path.isdir(audiobook):
|
|
shutil.rmtree(selected, ignore_errors=True)
|
|
else:
|
|
os.remove(audiobook)
|
|
msg = f'Audiobook {selected_name} deleted!'
|
|
session['audiobook'] = None
|
|
show_alert({"type": "warning", "msg": msg})
|
|
return gr.update(), gr.update(), update_gr_audiobook_list(id), gr.update(visible=False)
|
|
except Exception as e:
|
|
error = f'confirm_deletion(): {e}!'
|
|
alert_exception(error)
|
|
return gr.update(), gr.update(), gr.update(), gr.update(visible=False)
|
|
|
|
def prepare_audiobook_download(selected):
|
|
if os.path.exists(selected):
|
|
return selected
|
|
return None
|
|
|
|
def update_gr_voice_list(id):
|
|
try:
|
|
nonlocal voice_options
|
|
session = context.get_session(id)
|
|
voice_lang_dir = session['language'] if session['language'] != 'con' else 'con-' # Bypass Windows CON reserved name
|
|
voice_file_pattern = f"*_{models[session['tts_engine']][session['fine_tuned']]['samplerate']}.wav"
|
|
voice_options = [
|
|
(os.path.splitext(f.name)[0].replace('_16000', '').replace('_24000', '').replace('_22050',''), str(f))
|
|
for f in Path(session['voice_dir']).rglob(voice_file_pattern)
|
|
]
|
|
voice_options += [
|
|
(os.path.splitext(f.name)[0].replace('_16000', '').replace('_24000', '').replace('_22050',''), str(f))
|
|
for f in Path(os.path.join(voices_dir, voice_lang_dir)).rglob(voice_file_pattern)
|
|
]
|
|
voice_options = [('None', None)] + sorted(voice_options, key=lambda x: x[0].lower())
|
|
session['voice'] = session['voice'] if session['voice'] in [option[1] for option in voice_options] else voice_options[0][1]
|
|
return gr.update(choices=voice_options, value=session['voice'])
|
|
except Exception as e:
|
|
error = f'update_gr_voice_list(): {e}!'
|
|
alert_exception(error)
|
|
return gr.update()
|
|
|
|
def update_gr_tts_engine_list(id):
|
|
try:
|
|
nonlocal tts_engine_options
|
|
session = context.get_session(id)
|
|
tts_engine_options = get_compatible_tts_engines(session['language'])
|
|
session['tts_engine'] = session['tts_engine'] if session['tts_engine'] in tts_engine_options else tts_engine_options[0]
|
|
return gr.update(choices=tts_engine_options, value=session['tts_engine'])
|
|
except Exception as e:
|
|
error = f'update_gr_tts_engine_list(): {e}!'
|
|
alert_exception(error)
|
|
return gr.update()
|
|
|
|
def update_gr_custom_model_list(id):
|
|
try:
|
|
nonlocal custom_model_options
|
|
session = context.get_session(id)
|
|
custom_model_tts_dir = check_custom_model_tts(session['custom_model_dir'], session['tts_engine'])
|
|
custom_model_options = [('None', None)] + [
|
|
(os.path.basename(os.path.join(custom_model_tts_dir, dir)), os.path.join(custom_model_tts_dir, dir))
|
|
for dir in os.listdir(custom_model_tts_dir)
|
|
if os.path.isdir(os.path.join(custom_model_tts_dir, dir))
|
|
]
|
|
session['custom_model'] = session['custom_model'] if session['custom_model'] in [option[1] for option in custom_model_options] else custom_model_options[0][1]
|
|
return gr.update(choices=custom_model_options, value=session['custom_model'])
|
|
except Exception as e:
|
|
error = f'update_gr_custom_model_list(): {e}!'
|
|
alert_exception(error)
|
|
return gr.update()
|
|
|
|
def update_gr_fine_tuned_list(id):
|
|
try:
|
|
nonlocal fine_tuned_options
|
|
session = context.get_session(id)
|
|
fine_tuned_options = [
|
|
name for name, details in models.get(session['tts_engine'],{}).items()
|
|
if details.get('lang') == 'multi' or details.get('lang') == session['language']
|
|
]
|
|
session['fine_tuned'] = session['fine_tuned'] if session['fine_tuned'] in fine_tuned_options else default_fine_tuned
|
|
return gr.update(choices=fine_tuned_options, value=session['fine_tuned'])
|
|
except Exception as e:
|
|
error = f'update_gr_fine_tuned_list(): {e}!'
|
|
alert_exception(error)
|
|
return gr.update()
|
|
|
|
def change_gr_device(device, id):
|
|
session = context.get_session(id)
|
|
session['device'] = device
|
|
|
|
def change_gr_language(selected, id):
|
|
session = context.get_session(id)
|
|
if selected == 'zzz':
|
|
new_language_code = default_language_code
|
|
else:
|
|
new_language_code = selected
|
|
session['language'] = new_language_code
|
|
return[
|
|
gr.update(value=session['language']),
|
|
update_gr_voice_list(id),
|
|
update_gr_tts_engine_list(id),
|
|
update_gr_custom_model_list(id),
|
|
update_gr_fine_tuned_list(id)
|
|
]
|
|
|
|
def check_custom_model_tts(custom_model_dir, tts_engine):
|
|
dir_path = os.path.join(custom_model_dir, tts_engine)
|
|
if not os.path.isdir(dir_path):
|
|
os.makedirs(dir_path, exist_ok=True)
|
|
return dir_path
|
|
|
|
def change_gr_custom_model_file(f, t, id):
|
|
if f is not None:
|
|
state = {}
|
|
try:
|
|
if len(custom_model_options) > max_custom_model:
|
|
error = f'You are allowed to upload a max of {max_custom_models} models'
|
|
state['type'] = 'warning'
|
|
state['msg'] = error
|
|
else:
|
|
session = context.get_session(id)
|
|
session['tts_engine'] = t
|
|
if analyze_uploaded_file(f):
|
|
model = extract_custom_model(f, session)
|
|
if model is None:
|
|
error = f'Cannot extract custom model zip file {os.path.basename(f)}'
|
|
state['type'] = 'warning'
|
|
state['msg'] = error
|
|
else:
|
|
session['custom_model'] = model
|
|
msg = f'{os.path.basename(model)} added to the custom models list'
|
|
state['type'] = 'success'
|
|
state['msg'] = msg
|
|
else:
|
|
error = f'{os.path.basename(f)} is not a valid model or some required files are missing'
|
|
state['type'] = 'warning'
|
|
state['msg'] = error
|
|
except ClientDisconnect:
|
|
error = 'Client disconnected during upload. Operation aborted.'
|
|
state['type'] = 'error'
|
|
state['msg'] = error
|
|
except Exception as e:
|
|
error = f'change_gr_custom_model_file() exception: {str(e)}'
|
|
state['type'] = 'error'
|
|
state['msg'] = error
|
|
show_alert(state)
|
|
return gr.update(value=None)
|
|
return gr.update()
|
|
|
|
def change_gr_tts_engine_list(engine, id):
|
|
session = context.get_session(id)
|
|
session['tts_engine'] = engine
|
|
if session['tts_engine'] == XTTSv2:
|
|
visible = True
|
|
if session['fine_tuned'] != default_fine_tuned:
|
|
visible = visible_gr_group_custom_model
|
|
return gr.update(visible=visible_gr_tab_preferences), gr.update(visible=visible), update_gr_fine_tuned_list(id), gr.update(label=f"*Custom Model Zip File (Mandatory files {models[session['tts_engine']][default_fine_tuned]['files']})")
|
|
else:
|
|
return gr.update(visible=False), gr.update(visible=False), update_gr_fine_tuned_list(id), gr.update(label=f"*Custom Model Zip File (Mandatory files {models[session['tts_engine']][default_fine_tuned]['files']})")
|
|
|
|
def change_gr_fine_tuned_list(selected, id):
|
|
session = context.get_session(id)
|
|
visible = False
|
|
if selected == default_fine_tuned and session['tts_engine'] == XTTSv2:
|
|
visible = visible_gr_group_custom_model
|
|
session['fine_tuned'] = selected
|
|
return gr.update(visible=visible)
|
|
|
|
def change_gr_custom_model_list(selected, id):
|
|
session = context.get_session(id)
|
|
session['custom_model'] = next((value for label, value in custom_model_options if value == selected), None)
|
|
visible = True if session['custom_model'] is not None else False
|
|
return gr.update(visible=not visible), gr.update(visible=visible)
|
|
|
|
def change_gr_output_format_list(val, id):
|
|
session = context.get_session(id)
|
|
session['output_format'] = val
|
|
return
|
|
|
|
def change_param(key, val, id, val2=None):
|
|
session = context.get_session(id)
|
|
session[key] = val
|
|
state = {}
|
|
if key == 'length_penalty':
|
|
if val2 is not None:
|
|
if float(val) > float(val2):
|
|
error = 'Length penalty must be always lower than num beams if greater than 1.0 or equal if 1.0'
|
|
state['type'] = 'warning'
|
|
state['msg'] = error
|
|
show_alert(state)
|
|
elif key == 'num_beams':
|
|
if val2 is not None:
|
|
if float(val) < float(val2):
|
|
error = 'Num beams must be always higher than length penalty or equal if its value is 1.0'
|
|
state['type'] = 'warning'
|
|
state['msg'] = error
|
|
show_alert(state)
|
|
return
|
|
|
|
def submit_convert_btn(id, device, ebook_file, tts_engine, voice, language, custom_model, fine_tuned, output_format, temperature, length_penalty, num_beams, repetition_penalty, top_k, top_p, speed, enable_text_splitting):
|
|
try:
|
|
args = {
|
|
"is_gui_process": is_gui_process,
|
|
"session": id,
|
|
"script_mode": script_mode,
|
|
"device": device.lower(),
|
|
"tts_engine": tts_engine,
|
|
"ebook": ebook_file if isinstance(ebook_file, str) else None,
|
|
"ebook_list": ebook_file if isinstance(ebook_file, list) else None,
|
|
"audiobooks_dir": audiobooks_dir,
|
|
"voice": voice,
|
|
"language": language,
|
|
"custom_model": custom_model,
|
|
"output_format": output_format,
|
|
"temperature": float(temperature),
|
|
"length_penalty": float(length_penalty),
|
|
"num_beams": int(num_beams),
|
|
"repetition_penalty": float(repetition_penalty),
|
|
"top_k": int(top_k),
|
|
"top_p": float(top_p),
|
|
"speed": float(speed),
|
|
"enable_text_splitting": enable_text_splitting,
|
|
"fine_tuned": fine_tuned
|
|
}
|
|
error = None
|
|
if args["ebook"] is None and args['ebook_list'] is None:
|
|
error = 'Error: a file or directory is required.'
|
|
show_alert({"type": "warning", "msg": error})
|
|
elif args['num_beams'] < args['length_penalty']:
|
|
error = 'Error: num beams must be greater or equal than length penalty.'
|
|
show_alert({"type": "warning", "msg": error})
|
|
else:
|
|
session = context.get_session(id)
|
|
session['status'] = 'converting'
|
|
session['progress'] = len(audiobook_options)
|
|
if isinstance(args['ebook_list'], list):
|
|
ebook_list = args['ebook_list'][:]
|
|
for file in ebook_list:
|
|
if any(file.endswith(ext) for ext in ebook_formats):
|
|
print(f'Processing eBook file: {os.path.basename(file)}')
|
|
args['ebook'] = file
|
|
progress_status, audiobook_file = convert_ebook(args)
|
|
if audiobook_file is None:
|
|
if session['status'] == 'converting':
|
|
error = 'Conversion cancelled.'
|
|
session['status'] = None
|
|
break
|
|
else:
|
|
error = 'Conversion failed.'
|
|
session['status'] = None
|
|
break
|
|
else:
|
|
show_alert({"type": "success", "msg": progress_status})
|
|
args['ebook_list'].remove(file)
|
|
reset_ebook_session(args['session'])
|
|
count_file = len(args['ebook_list'])
|
|
if count_file > 0:
|
|
msg = f"{len(args['ebook_list'])} remaining..."
|
|
else:
|
|
msg = 'Conversion successful!'
|
|
yield gr.update(value=msg), update_gr_audiobook_list(id)
|
|
session['status'] = None
|
|
else:
|
|
print(f"Processing eBook file: {os.path.basename(args['ebook'])}")
|
|
progress_status, audiobook_file = convert_ebook(args)
|
|
if audiobook_file is None:
|
|
if session['status'] == 'converting':
|
|
session['status'] = None
|
|
error = 'Conversion cancelled.'
|
|
else:
|
|
error = 'Conversion failed.'
|
|
session['status'] = None
|
|
else:
|
|
show_alert({"type": "success", "msg": progress_status})
|
|
reset_ebook_session(args['session'])
|
|
msg = 'Conversion successful!'
|
|
return gr.update(value=msg), gr.update()
|
|
if error is not None:
|
|
show_alert({"type": "warning", "msg": error})
|
|
except Exception as e:
|
|
error = f'submit_convert_btn(): {e}'
|
|
alert_exception(error)
|
|
return gr.update(value=''), gr.update()
|
|
|
|
def update_gr_audiobook_list(id):
|
|
try:
|
|
nonlocal audiobook_options
|
|
session = context.get_session(id)
|
|
audiobook_options = [
|
|
(f, os.path.join(session['audiobooks_dir'], f))
|
|
for f in os.listdir(session['audiobooks_dir'])
|
|
]
|
|
audiobook_options.sort(
|
|
key=lambda x: os.path.getmtime(x[1]),
|
|
reverse=True
|
|
)
|
|
session['audiobook'] = session['audiobook'] if session['audiobook'] in [option[1] for option in audiobook_options] else None
|
|
if len(audiobook_options) > 0:
|
|
if not session['audiobook']:
|
|
session['audiobook'] = audiobook_options[0][1]
|
|
return gr.update(choices=audiobook_options, value=session['audiobook'])
|
|
except Exception as e:
|
|
error = f'update_gr_audiobook_list(): {e}!'
|
|
alert_exception(error)
|
|
return gr.update()
|
|
|
|
def change_gr_read_data(data, state):
|
|
nonlocal audiobooks_dir
|
|
msg = 'Error while loading saved session. Please try to delete your cookies and refresh the page'
|
|
try:
|
|
if data is None:
|
|
session = context.get_session(str(uuid.uuid4()))
|
|
else:
|
|
try:
|
|
if 'id' not in data:
|
|
data['id'] = str(uuid.uuid4())
|
|
session = context.get_session(data['id'])
|
|
restore_session_from_data(data, session)
|
|
session['cancellation_requested'] = False
|
|
if isinstance(session['ebook'], str):
|
|
if not os.path.exists(session['ebook']):
|
|
session['ebook'] = None
|
|
if session['voice'] is not None:
|
|
if not os.path.exists(session['voice']):
|
|
session['voice'] = None
|
|
if session['custom_model'] is not None:
|
|
if not os.path.exists(session['custom_model_dir']):
|
|
session['custom_model'] = None
|
|
if session['fine_tuned'] is not None:
|
|
if session['tts_engine'] is not None:
|
|
if session['tts_engine'] in models.keys():
|
|
if session['fine_tuned'] not in models[session['tts_engine']].keys():
|
|
session['fine_tuned'] = default_fine_tuned
|
|
else:
|
|
session['tts_engine'] = default_tts_engine
|
|
session['fine_tuned'] = default_fine_tuned
|
|
if session['audiobook'] is not None:
|
|
if not os.path.exists(session['audiobook']):
|
|
session['audiobook'] = None
|
|
if session['status'] == 'converting':
|
|
session['status'] = None
|
|
except Exception as e:
|
|
error = f'change_gr_read_data(): {e}'
|
|
alert_exception(error)
|
|
return gr.update(), gr.update(), gr.update()
|
|
session['system'] = (f"{platform.system()}-{platform.release()}").lower()
|
|
session['custom_model_dir'] = os.path.join(models_dir, '__sessions', f"model-{session['id']}")
|
|
session['voice_dir'] = os.path.join(voices_dir, '__sessions', f"voice-{session['id']}")
|
|
os.makedirs(session['custom_model_dir'], exist_ok=True)
|
|
os.makedirs(session['voice_dir'], exist_ok=True)
|
|
if is_gui_shared:
|
|
msg = f' Note: access limit time: {interface_shared_tmp_expire} days'
|
|
audiobooks_dir = os.path.join(audiobooks_gradio_dir, f"web-{session['id']}")
|
|
session['audiobooks_dir'] = audiobooks_dir
|
|
delete_unused_tmp_dirs(audiobooks_gradio_dir, interface_shared_tmp_expire, session)
|
|
else:
|
|
msg = f' Note: if no activity is detected after {tmp_expire} days, your session will be cleaned up.'
|
|
audiobooks_dir = os.path.join(audiobooks_host_dir, f"web-{session['id']}")
|
|
session['audiobooks_dir'] = audiobooks_dir
|
|
delete_unused_tmp_dirs(audiobooks_host_dir, tmp_expire, session)
|
|
if not os.path.exists(session['audiobooks_dir']):
|
|
os.makedirs(session['audiobooks_dir'], exist_ok=True)
|
|
previous_hash = state['hash']
|
|
new_hash = hash_proxy_dict(MappingProxyType(session))
|
|
state['hash'] = new_hash
|
|
session_dict = proxy_to_dict(session)
|
|
show_alert({"type": "info", "msg": msg})
|
|
return gr.update(value=session_dict), gr.update(value=state), gr.update(value=session['id'])
|
|
except Exception as e:
|
|
error = f'change_gr_read_data(): {e}'
|
|
alert_exception(error)
|
|
return gr.update(), gr.update(), gr.update()
|
|
|
|
def save_session(id, state):
|
|
try:
|
|
if id:
|
|
if id in context.sessions:
|
|
session = context.get_session(id)
|
|
if session:
|
|
if session['event'] == 'clear':
|
|
session_dict = session
|
|
else:
|
|
previous_hash = state['hash']
|
|
new_hash = hash_proxy_dict(MappingProxyType(session))
|
|
if previous_hash == new_hash:
|
|
return gr.update(), gr.update(), gr.update()
|
|
else:
|
|
state['hash'] = new_hash
|
|
session_dict = proxy_to_dict(session)
|
|
if session['status'] == 'converting':
|
|
if session['progress'] != len(audiobook_options):
|
|
session['progress'] = len(audiobook_options)
|
|
return gr.update(value=json.dumps(session_dict, indent=4)), gr.update(value=state), update_gr_audiobook_list(id)
|
|
return gr.update(value=json.dumps(session_dict, indent=4)), gr.update(value=state), gr.update()
|
|
return gr.update(), gr.update(), gr.update()
|
|
except Exception as e:
|
|
error = f'save_session(): {e}!'
|
|
alert_exception(error)
|
|
return gr.update(), gr.update(value=e), gr.update()
|
|
|
|
def clear_event(id):
|
|
session = context.get_session(id)
|
|
if session['event'] is not None:
|
|
session['event'] = None
|
|
|
|
gr_ebook_file.change(
|
|
fn=update_convert_btn,
|
|
inputs=[gr_ebook_file, gr_ebook_mode, gr_custom_model_file, gr_session],
|
|
outputs=[gr_convert_btn]
|
|
).then(
|
|
fn=change_gr_ebook_file,
|
|
inputs=[gr_ebook_file, gr_session],
|
|
outputs=[gr_modal]
|
|
)
|
|
gr_ebook_mode.change(
|
|
fn=change_gr_ebook_mode,
|
|
inputs=[gr_ebook_mode, gr_session],
|
|
outputs=[gr_ebook_file]
|
|
)
|
|
gr_voice_file.upload(
|
|
fn=change_gr_voice_file,
|
|
inputs=[gr_voice_file, gr_session],
|
|
outputs=[gr_voice_file]
|
|
).then(
|
|
fn=update_gr_voice_list,
|
|
inputs=[gr_session],
|
|
outputs=[gr_voice_list]
|
|
)
|
|
gr_voice_list.change(
|
|
fn=change_gr_voice_list,
|
|
inputs=[gr_voice_list, gr_session],
|
|
outputs=[gr_voice_player, gr_voice_del_btn]
|
|
)
|
|
gr_voice_del_btn.click(
|
|
fn=click_gr_voice_del_btn,
|
|
inputs=[gr_voice_list, gr_session],
|
|
outputs=[gr_confirm_field_hidden, gr_modal]
|
|
)
|
|
gr_device.change(
|
|
fn=change_gr_device,
|
|
inputs=[gr_device, gr_session],
|
|
outputs=None
|
|
)
|
|
gr_language.change(
|
|
fn=change_gr_language,
|
|
inputs=[gr_language, gr_session],
|
|
outputs=[gr_language, gr_voice_list, gr_tts_engine_list, gr_custom_model_list, gr_fine_tuned_list]
|
|
)
|
|
gr_tts_engine_list.change(
|
|
fn=change_gr_tts_engine_list,
|
|
inputs=[gr_tts_engine_list, gr_session],
|
|
outputs=[gr_tab_preferences, gr_group_custom_model, gr_fine_tuned_list, gr_custom_model_file]
|
|
)
|
|
gr_fine_tuned_list.change(
|
|
fn=change_gr_fine_tuned_list,
|
|
inputs=[gr_fine_tuned_list, gr_session],
|
|
outputs=[gr_group_custom_model]
|
|
)
|
|
gr_custom_model_file.upload(
|
|
fn=change_gr_custom_model_file,
|
|
inputs=[gr_custom_model_file, gr_tts_engine_list, gr_session],
|
|
outputs=[gr_custom_model_file]
|
|
).then(
|
|
fn=update_gr_custom_model_list,
|
|
inputs=[gr_session],
|
|
outputs=[gr_custom_model_list]
|
|
)
|
|
gr_custom_model_list.change(
|
|
fn=change_gr_custom_model_list,
|
|
inputs=[gr_custom_model_list, gr_session],
|
|
outputs=[gr_fine_tuned_list, gr_custom_model_del_btn]
|
|
)
|
|
gr_custom_model_del_btn.click(
|
|
fn=click_gr_custom_model_del_btn,
|
|
inputs=[gr_custom_model_list, gr_session],
|
|
outputs=[gr_confirm_field_hidden, gr_modal]
|
|
)
|
|
gr_output_format_list.change(
|
|
fn=change_gr_output_format_list,
|
|
inputs=[gr_output_format_list, gr_session],
|
|
outputs=None
|
|
)
|
|
gr_audiobook_download_btn.click(
|
|
fn=lambda audiobook: show_alert({"type": "info", "msg": f'Downloading {os.path.basename(audiobook)}'}),
|
|
inputs=[gr_audiobook_list],
|
|
outputs=None,
|
|
show_progress='minimal'
|
|
)
|
|
gr_audiobook_list.change(
|
|
fn=change_gr_audiobook_list,
|
|
inputs=[gr_audiobook_list, gr_session],
|
|
outputs=[gr_audiobook_download_btn, gr_audiobook_player, gr_group_audiobook_list]
|
|
)
|
|
gr_audiobook_del_btn.click(
|
|
fn=click_gr_audiobook_del_btn,
|
|
inputs=[gr_audiobook_list, gr_session],
|
|
outputs=[gr_confirm_field_hidden, gr_modal]
|
|
)
|
|
########## Parameters
|
|
gr_temperature.change(
|
|
fn=lambda val, id: change_param('temperature', val, id),
|
|
inputs=[gr_temperature, gr_session],
|
|
outputs=None
|
|
)
|
|
gr_length_penalty.change(
|
|
fn=lambda val, id, val2: change_param('length_penalty', val, id, val2),
|
|
inputs=[gr_length_penalty, gr_session, gr_num_beams],
|
|
outputs=None,
|
|
)
|
|
gr_num_beams.change(
|
|
fn=lambda val, id, val2: change_param('num_beams', val, id, val2),
|
|
inputs=[gr_num_beams, gr_session, gr_length_penalty],
|
|
outputs=None,
|
|
)
|
|
gr_repetition_penalty.change(
|
|
fn=lambda val, id: change_param('repetition_penalty', val, id),
|
|
inputs=[gr_repetition_penalty, gr_session],
|
|
outputs=None
|
|
)
|
|
gr_top_k.change(
|
|
fn=lambda val, id: change_param('top_k', val, id),
|
|
inputs=[gr_top_k, gr_session],
|
|
outputs=None
|
|
)
|
|
gr_top_p.change(
|
|
fn=lambda val, id: change_param('top_p', val, id),
|
|
inputs=[gr_top_p, gr_session],
|
|
outputs=None
|
|
)
|
|
gr_speed.change(
|
|
fn=lambda val, id: change_param('speed', val, id),
|
|
inputs=[gr_speed, gr_session],
|
|
outputs=None
|
|
)
|
|
gr_enable_text_splitting.change(
|
|
fn=lambda val, id: change_param('enable_text_splitting', val, id),
|
|
inputs=[gr_enable_text_splitting, gr_session],
|
|
outputs=None
|
|
)
|
|
##########
|
|
# Timer to save session to localStorage
|
|
gr_timer = gr.Timer(10, active=False)
|
|
gr_timer.tick(
|
|
fn=save_session,
|
|
inputs=[gr_session, gr_state],
|
|
outputs=[gr_write_data, gr_state, gr_audiobook_list],
|
|
).then(
|
|
fn=clear_event,
|
|
inputs=[gr_session],
|
|
outputs=None
|
|
)
|
|
gr_convert_btn.click(
|
|
fn=update_convert_btn,
|
|
inputs=None,
|
|
outputs=[gr_convert_btn]
|
|
).then(
|
|
fn=submit_convert_btn,
|
|
inputs=[
|
|
gr_session, gr_device, gr_ebook_file, gr_tts_engine_list, gr_voice_list, gr_language,
|
|
gr_custom_model_list, gr_fine_tuned_list, gr_output_format_list, gr_temperature, gr_length_penalty,
|
|
gr_num_beams, gr_repetition_penalty, gr_top_k, gr_top_p, gr_speed, gr_enable_text_splitting
|
|
],
|
|
outputs=[gr_conversion_progress, gr_audiobook_list]
|
|
).then(
|
|
fn=refresh_interface,
|
|
inputs=[gr_session],
|
|
outputs=[gr_convert_btn, gr_ebook_file, gr_audiobook_list, gr_audiobook_player, gr_modal]
|
|
)
|
|
gr_write_data.change(
|
|
fn=None,
|
|
inputs=[gr_write_data],
|
|
js='''
|
|
(data)=>{
|
|
if(data){
|
|
localStorage.clear();
|
|
if(data['event'] != 'clear'){
|
|
console.log('save: ', data);
|
|
window.localStorage.setItem("data", JSON.stringify(data));
|
|
}
|
|
}
|
|
}
|
|
'''
|
|
)
|
|
gr_read_data.change(
|
|
fn=change_gr_read_data,
|
|
inputs=[gr_read_data, gr_state],
|
|
outputs=[gr_write_data, gr_state, gr_session]
|
|
).then(
|
|
fn=restore_interface,
|
|
inputs=[gr_session],
|
|
outputs=[
|
|
gr_ebook_file, gr_ebook_mode, gr_device, gr_language, gr_voice_list,
|
|
gr_tts_engine_list, gr_custom_model_list, gr_fine_tuned_list,
|
|
gr_output_format_list, gr_audiobook_list,
|
|
gr_temperature, gr_length_penalty, gr_num_beams, gr_repetition_penalty,
|
|
gr_top_k, gr_top_p, gr_speed, gr_enable_text_splitting, gr_timer
|
|
]
|
|
)
|
|
gr_confirm_yes_btn_hidden.click(
|
|
fn=confirm_deletion,
|
|
inputs=[gr_voice_list, gr_custom_model_list, gr_audiobook_list, gr_session, gr_confirm_field_hidden],
|
|
outputs=[gr_voice_list, gr_custom_model_list, gr_audiobook_list, gr_modal]
|
|
)
|
|
gr_confirm_no_btn_hidden.click(
|
|
fn=confirm_deletion,
|
|
inputs=[gr_voice_list, gr_custom_model_list, gr_audiobook_list, gr_session],
|
|
outputs=[gr_voice_list, gr_custom_model_list, gr_audiobook_list, gr_modal]
|
|
)
|
|
interface.load(
|
|
fn=None,
|
|
js='''
|
|
() => {
|
|
try{
|
|
const data = window.localStorage.getItem('data');
|
|
if(data){
|
|
const obj = JSON.parse(data);
|
|
console.log(obj);
|
|
return obj;
|
|
}
|
|
}catch(e){
|
|
console.log('error: ',e)
|
|
}
|
|
return null;
|
|
}
|
|
''',
|
|
outputs=[gr_read_data]
|
|
)
|
|
try:
|
|
all_ips = get_all_ip_addresses()
|
|
msg = f'IPs available for connection:\n{all_ips}\nNote: 0.0.0.0 is not the IP to connect. Instead use an IP above to connect.'
|
|
show_alert({"type": "info", "msg": msg})
|
|
interface.queue(default_concurrency_limit=interface_concurrency_limit).launch(show_error=debug_mode, server_name=interface_host, server_port=interface_port, share=is_gui_shared, max_file_size=max_upload_size)
|
|
except OSError as e:
|
|
error = f'Connection error: {e}'
|
|
alert_exception(error)
|
|
except socket.error as e:
|
|
error = f'Socket error: {e}'
|
|
alert_exception(error)
|
|
except KeyboardInterrupt:
|
|
error = 'Server interrupted by user. Shutting down...'
|
|
alert_exception(error)
|
|
except Exception as e:
|
|
error = f'An unexpected error occurred: {e}'
|
|
alert_exception(error) |