import os import subprocess import shutil import torch from io import BytesIO from pydub import AudioSegment from torchvggish import vggish, vggish_input from lib.conf import voice_formats from lib.models import models class VoiceExtractor: def __init__(self, session, models_dir, voice_file, voice_name): self.wav_file = None self.session = session self.voice_file = voice_file self.voice_name = voice_name self.models_dir = models_dir self.voice_track = 'vocals.wav' self.samplerate = models[session['tts_engine']][session['fine_tuned']]['samplerate'] self.output_dir = self.session['voice_dir'] self.demucs_dir = os.path.join(self.output_dir, 'htdemucs', os.path.splitext(os.path.basename(self.voice_file))[0]) def _validate_format(self): file_extension = os.path.splitext(self.voice_file)[1].lower() if file_extension in voice_formats: msg = 'Input file valid' return True, msg error = f'Unsupported file format: {file_extension}. Supported formats are: {", ".join(voice_formats)}' return False, error def _convert_to_wav(self): try: self.wav_file = os.path.join(self.session['voice_dir'], os.path.basename(self.voice_file).replace(os.path.splitext(self.voice_file)[1], '.wav')) ffmpeg_cmd = [ shutil.which('ffmpeg'), '-i', self.voice_file, '-ac', '1', '-ar', '44100', '-y', self.wav_file ] process = subprocess.Popen( ffmpeg_cmd, env={}, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, universal_newlines=True, encoding='utf-8' ) for line in process.stdout: print(line, end='') # Print each line of stdout process.wait() if process.returncode != 0: error = f'_convert_to_wav(): process.returncode: {process.returncode}' elif not os.path.exists(self.wav_file) or os.path.getsize(self.wav_file) == 0: error = f'_convert_to_wav output error: {self.wav_file} was not created or is empty.' else: msg = 'Conversion to .wav format for processing successful' return True, msg except subprocess.CalledProcessError as e: error = f'convert_to_wav fmpeg.Error: {e.stderr.decode()}' raise ValueError(error) except Exception as e: error = f'_convert_to_wav() error: {e}' raise ValueError(error) return False, error def _detect_background(self): try: torch_home = os.path.join(self.models_dir, 'hub') torch.hub.set_dir(torch_home) os.environ['TORCH_HOME'] = torch_home energy_threshold = 8800 # to tune if not enough accurate (higher = less sensitive) model = vggish() model.eval() # Preprocess audio to log mel spectrogram log_mel_spectrogram = vggish_input.wavfile_to_examples(self.wav_file) audio_tensor = log_mel_spectrogram.clone().detach() embeddings = model(audio_tensor) # Calculate total energy energy_score = torch.norm(embeddings).item() status = energy_score > energy_threshold msg = f'Noise Score: {energy_score:.2f}' if status: msg = f'{msg}\nBackground noise or music detected. Proceeding voice extraction.' else: msg = f'{msg}\nNo background noise or music detected. Skipping separation.' return True, status, msg except Exception as e: error = f'_detect_background() error: {e}' raise ValueError(error) return False, False, error def _demucs_voice(self): try: cmd = [ "demucs", "--verbose", "--two-stems=vocals", "--out", self.output_dir, self.wav_file ] try: torch_home = self.models_dir torch.hub.set_dir(torch_home) os.environ['TORCH_HOME'] = torch_home process = subprocess.run(cmd, check=True) self.voice_track = os.path.join(self.demucs_dir, self.voice_track) msg = 'Voice track isolation successful' return True, msg except subprocess.CalledProcessError as e: error = ( f'_demucs_voice() subprocess CalledProcessError error: {e.returncode}\n\n' f'stdout: {e.output}\n\n' f'stderr: {e.stderr}' ) raise ValueError(error) except FileNotFoundError: error = f'_demucs_voice() subprocess FileNotFoundError error: The "demucs" command was not found. Ensure it is installed and in PATH.' raise ValueError(error) except Exception as e: error = f'_demucs_voice() subprocess Exception error: {str(e)}' raise ValueError(error) except Exception as e: error = f'_demucs_voice() error: {e}' raise ValueError(error) return False, error def _remove_silences(self): try: audio = AudioSegment.from_file(self.voice_track) trimmed_audio = AudioSegment.silent(duration=0) for chunk in audio[::100]: if chunk.dBFS > -50: trimmed_audio += chunk trimmed_audio.export(self.voice_track, format='wav') msg = 'Silences removed' return True, msg except Exception as e: error = f'_remove_silence() error: {e}' raise ValueError(e) return False, error def _normalize_audio(self): try: ffmpeg_final_file = os.path.join(self.session['voice_dir'], f'{self.voice_name}.wav') ffmpeg_cmd = [shutil.which('ffmpeg'), '-i', self.voice_track] filter_complex = ( 'agate=threshold=-25dB:ratio=1.4:attack=10:release=250,' 'afftdn=nf=-70,' 'acompressor=threshold=-20dB:ratio=2:attack=80:release=200:makeup=1dB,' 'loudnorm=I=-14:TP=-3:LRA=7:linear=true,' 'equalizer=f=150:t=q:w=2:g=1,' 'equalizer=f=250:t=q:w=2:g=-3,' 'equalizer=f=3000:t=q:w=2:g=2,' 'equalizer=f=5500:t=q:w=2:g=-4,' 'equalizer=f=9000:t=q:w=2:g=-2,' 'highpass=f=63[audio]' ) ffmpeg_cmd += [ '-filter_complex', filter_complex, '-map', '[audio]', '-ar', 'null', '-y', ffmpeg_final_file ] error = None for rate in ['16000', '24000']: ffmpeg_cmd[-3] = rate ffmpeg_cmd[-1] = ffmpeg_final_file.replace('.wav', f'_{rate}.wav') try: process = subprocess.Popen( ffmpeg_cmd, env={}, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, universal_newlines=True, encoding='utf-8' ) for line in process.stdout: print(line, end='') # Print each line of stdout process.wait() if process.returncode != 0: error = f'normalize_voice_file(): process.returncode: {process.returncode}' break except subprocess.CalledProcessError as e: error = f'_normalize_audio ffmpeg.Error: {e.stderr.decode()}' break if not os.path.exists(ffmpeg_final_file) or os.path.getsize(ffmpeg_final_file) == 0: error = f'_normalize_audio output error: {ffmpeg_final_file} was not created or is empty.' break if error is None: os.remove(ffmpeg_final_file) msg = 'Audio normalization successful!' return True, msg except FileNotFoundError: error = '_normalize_audio() FileNotFoundError: Error: Input file or FFmpeg binary is missing!' raise ValueError(error) except Exception as e: error = f'_normalize_audio() error: {e}' raise ValueError(error) return False, error def extract_voice(self): success = False msg = None try: success, msg = self._validate_format() print(msg) if success: success, msg = self._convert_to_wav() print(msg) if success: success, status, msg = self._detect_background() print(msg) if success: if status: success, msg = self._demucs_voice() print(msg) else: self.voice_track = self.wav_file if success: success, msg = self._remove_silences() print(msg) if success: success, msg = self._normalize_audio() print(msg) except Exception as e: msg = f'extract_voice() error: {e}' raise ValueError(msg) shutil.rmtree(self.demucs_dir, ignore_errors=True) torch.hub.set_dir(self.models_dir) os.environ['TORCH_HOME'] = self.models_dir return success, msg