Files
tinygrad/examples/sovits_helpers/preprocess.py
JaSpa99 2fd7004980 Implementation of SoftVC VITS SVC model (#1371)
* [WIP]: implementation of SoftVC VITS SVC model

* fix typo

* fix whitespace

* Fully implement Generator & Synthesizer

- implement SineGen & SourceHnNSF to reconstruct source signal from F0
- source signal is added during Generator
- fix various typos
- start loading state dict for synthesizer

* Load Synthesizer weights

- Fix typos in Synthesizer
- Slightly modify vits::load_checkpoint to skip a specified layer
- Test with Saul Goodman model because Drake weights are on mega

* start work on ContentVec

- implement ConvFeatureExtractionModel for ContentVec
- start work on TransformerEncoder for ContentVec:
- this transformer probably needs its own MultiheadAttention implementation
- fix various typos in synthesizer
- add helpers to mask behavior of ~ and % operator of torch

* use normal and kaiming_normal

* Implement ContentVec

- load ContentVec weights and config from fairseq hyperparams
- use MultiHeadAttention from whisper.py
- TransformerSentenceEncoderLayer might still need some tweaking, will see during inference testing
- redid tilde()
- some cleanup

* rename the file so it can be imported

* forgot to lint

* use float() instead of cast()

* add contentvec256l9 and cleanup

* Implement SoVITS fully and run it

- Fully run sovits with .wav file
- Drake weights need to be manually downloaded for now
- Fix bugs
- Add examples/sovits_helpers
- Big TODO: INVALID Kernel for recordings > 4.5 secs

* temp fix for longer audio recordings

* Upsample no more torch

* cleanup & detailed inference time measuring

* Completely remove torch(audio)

- Implement sinc resample in tinygrad
- Load audio via Soundfile
- Some cleanups

* move stuff to helper files

* Cleanup

* fix invalid kernel

* Cleanup & add more models

* Metal sounds good after master merge

- But Synthesizer pass became much slower

* drake weights now marked safe

* do load/store in numpy

* no commas needed here

* remove extra newline

* call Tensor::where on object

* use Tensor::cat instead of numpy

* pull out first iteration

* remove Sequential, Dropout, GELU, TransposeLast

* cast during loading

* clean up attention

* remove SamePad

* Major cleanup / line reduction

- Finish implementation of GroupNormMasked
- Simplify parts of TransformerEncoder
- Simplify parts of Generator
- Move all helpers to common section
- Only use repeat_expand_left for interp after SpeechEncoder
- Moved SVC-specific ContentVec impls up (canonically)
- Proper annotations for get_encoder
- Finished all TODOs
- Squashed some whitespace

* clean up preprocess as well

* more straightforward bool expr

* add demo mode
2023-08-13 19:43:23 -07:00

206 lines
11 KiB
Python

import math
from typing import Optional, Tuple
from tinygrad.tensor import Tensor
from tinygrad.helpers import dtypes
import librosa
import soundfile
import numpy as np
import parselmouth
class PMF0Predictor: # from https://github.com/svc-develop-team/so-vits-svc/
  """F0 (pitch) predictor that runs parselmouth's `to_pitch_ac` and interpolates over unvoiced frames.

  Attributes set by the constructor: hop_length, f0_min, f0_max, sampling_rate and name ("pm").
  """

  def __init__(self,hop_length=512,f0_min=50,f0_max=1100,sampling_rate=44100):
    self.hop_length, self.f0_min, self.f0_max, self.sampling_rate, self.name = hop_length, f0_min, f0_max, sampling_rate, "pm"

  def interpolate_f0(self,f0):
    """Replace zero entries of `f0` by linear interpolation between the non-zero ones.

    Args:
      f0: 1-D numpy array of per-frame pitch values; zeros are treated as gaps.

    Returns:
      (f0_filled, vuv_vector): the interpolated contour and a float32 mask that is
      1.0 where the input was > 0 and 0.0 elsewhere.
    """
    vuv_vector = (f0 > 0.0).astype(np.float32)
    nzindex = np.nonzero(f0)[0]
    data = f0[nzindex]
    if data.shape[0] <= 0: return np.zeros(f0.shape[0], dtype=np.float32),vuv_vector
    # With a single voiced frame, hold that frame's value everywhere. It must be read
    # from `data`, because f0[0] is usually one of the zero (unvoiced) frames.
    if data.shape[0] == 1: return np.ones(f0.shape[0], dtype=np.float32) * data[0],vuv_vector
    time_org = self.hop_length / self.sampling_rate * nzindex.astype(np.float32)
    time_frame = np.arange(f0.shape[0]) * self.hop_length / self.sampling_rate
    # Frames before the first / after the last voiced frame are clamped to the edge values.
    f0 = np.interp(time_frame, time_org, data, left=data[0], right=data[-1])
    return f0,vuv_vector

  def compute_f0(self,wav,p_len=None):
    """Same as `compute_f0_uv` but returns only the interpolated F0 contour."""
    return self.compute_f0_uv(wav, p_len)[0]

  def compute_f0_uv(self,wav,p_len=None):
    """Estimate F0 for `wav` and return `(f0, uv)` as produced by `interpolate_f0`.

    Args:
      wav: 1-D array of audio samples handed to `parselmouth.Sound` at `self.sampling_rate`.
      p_len: expected number of frames; defaults to `len(wav) // hop_length`.

    Raises:
      AssertionError: if an explicit `p_len` differs from `len(wav) // hop_length` by 4 or more.
    """
    if p_len is None: p_len = wav.shape[0]//self.hop_length
    else: assert abs(p_len-wav.shape[0]//self.hop_length) < 4, "pad length error"
    time_step = self.hop_length / self.sampling_rate * 1000  # hop duration in milliseconds
    f0 = parselmouth.Sound(wav, self.sampling_rate).to_pitch_ac(
      time_step=time_step / 1000, voicing_threshold=0.6,
      pitch_floor=self.f0_min, pitch_ceiling=self.f0_max).selected_array['frequency']
    # Zero-pad on both sides so the contour has p_len frames (the extra frame goes to the left).
    missing = p_len - len(f0)
    pad_left = (missing + 1) // 2
    pad_right = missing - pad_left
    if pad_left > 0 or pad_right > 0:
      f0 = np.pad(f0, [[pad_left, pad_right]], mode='constant')
    return self.interpolate_f0(f0)
class Slicer: # from https://github.com/svc-develop-team/so-vits-svc/
  """Partition a waveform into silent and non-silent chunks using a frame-wise RMS threshold.

  Constructor arguments `min_length`, `min_interval`, `hop_size` and `max_sil_kept` are given in
  milliseconds and converted with `sr`; after construction `hop_size` and `win_size` are in samples,
  while `min_length`, `min_interval` and `max_sil_kept` are in frames of `hop_size` samples.
  `threshold` is given in dB and stored as a linear amplitude.

  Raises:
    ValueError: unless `min_length >= min_interval >= hop_size` and `max_sil_kept >= hop_size`.
  """

  def __init__(self, sr: int, threshold: float = -40., min_length: int = 5000, min_interval: int = 300, hop_size: int = 20, max_sil_kept: int = 5000):
    if not min_length >= min_interval >= hop_size:
      raise ValueError('The following condition must be satisfied: min_length >= min_interval >= hop_size')
    if not max_sil_kept >= hop_size:
      raise ValueError('The following condition must be satisfied: max_sil_kept >= hop_size')
    min_interval = sr * min_interval / 1000
    self.threshold = 10 ** (threshold / 20.)
    self.hop_size = round(sr * hop_size / 1000)
    self.win_size = min(round(min_interval), 4 * self.hop_size)
    self.min_length = round(sr * min_length / 1000 / self.hop_size)
    self.min_interval = round(min_interval / self.hop_size)
    self.max_sil_kept = round(sr * max_sil_kept / 1000 / self.hop_size)

  def _apply_slice(self, waveform, begin, end):
    """Return the samples of `waveform` between frame indices `begin` and `end` (last axis for 2-D input)."""
    if len(waveform.shape) > 1: return waveform[:, begin * self.hop_size: min(waveform.shape[1], end * self.hop_size)]
    else: return waveform[begin * self.hop_size: min(waveform.shape[0], end * self.hop_size)]

  def slice(self, waveform):
    """Locate silent ranges in `waveform` and describe the resulting chunks.

    Returns:
      A dict keyed by "0", "1", ... in time order. Each value is
      `{"slice": bool, "split_time": "start,end"}` with sample offsets; `slice` is True for the
      ranges recorded as silence and False for everything else.
    """
    samples = librosa.to_mono(waveform) if len(waveform.shape) > 1 else waveform
    # Length in samples. Taken from the mono signal so that multi-channel input does not
    # report its channel count (which is what len(waveform) would give).
    n_samples = samples.shape[0]
    unsliced = {"0": {"slice": False, "split_time": f"0,{n_samples}"}}
    # self.min_length is in frames, so convert the sample count to frames (rounding up) before comparing.
    if (n_samples + self.hop_size - 1) // self.hop_size <= self.min_length: return unsliced
    rms_list = librosa.feature.rms(y=samples, frame_length=self.win_size, hop_length=self.hop_size).squeeze(0)
    sil_tags, silence_start, clip_start = [], None, 0
    for i, rms in enumerate(rms_list):
      if rms < self.threshold: # Keep looping while frame is silent.
        if silence_start is None: # Record start of silent frames.
          silence_start = i
        continue
      if silence_start is None: continue # Keep looping while frame is not silent and silence start has not been recorded.
      # Clear recorded silence start if interval is not enough or clip is too short
      is_leading_silence = silence_start == 0 and i > self.max_sil_kept
      need_slice_middle = i - silence_start >= self.min_interval and i - clip_start >= self.min_length
      if not is_leading_silence and not need_slice_middle:
        silence_start = None
        continue
      if i - silence_start <= self.max_sil_kept: # Need slicing. Record the range of silent frames to be removed.
        pos = rms_list[silence_start: i + 1].argmin() + silence_start
        sil_tags.append((0, pos) if silence_start == 0 else (pos, pos))
        clip_start = pos
      elif i - silence_start <= self.max_sil_kept * 2:
        pos = rms_list[i - self.max_sil_kept: silence_start + self.max_sil_kept + 1].argmin()
        pos += i - self.max_sil_kept
        pos_l = rms_list[silence_start: silence_start + self.max_sil_kept + 1].argmin() + silence_start
        pos_r = rms_list[i - self.max_sil_kept: i + 1].argmin() + i - self.max_sil_kept
        if silence_start == 0:
          sil_tags.append((0, pos_r))
          clip_start = pos_r
        else:
          sil_tags.append((min(pos_l, pos), max(pos_r, pos)))
          clip_start = max(pos_r, pos)
      else:
        pos_l = rms_list[silence_start: silence_start + self.max_sil_kept + 1].argmin() + silence_start
        pos_r = rms_list[i - self.max_sil_kept: i + 1].argmin() + i - self.max_sil_kept
        sil_tags.append((0, pos_r) if silence_start == 0 else (pos_l, pos_r))
        clip_start = pos_r
      silence_start = None
    total_frames = rms_list.shape[0]
    if silence_start is not None and total_frames - silence_start >= self.min_interval: # Deal with trailing silence.
      silence_end = min(total_frames, silence_start + self.max_sil_kept)
      pos = rms_list[silence_start: silence_end + 1].argmin() + silence_start
      sil_tags.append((pos, total_frames + 1))
    if len(sil_tags) == 0: return unsliced
    # Apply and return slices: alternate non-silent gaps (slice=False) with the silent ranges (slice=True).
    chunks = []
    if sil_tags[0][0]:
      chunks.append({"slice": False, "split_time": f"0,{min(n_samples, sil_tags[0][0] * self.hop_size)}"})
    for i in range(0, len(sil_tags)):
      if i: chunks.append({"slice": False, "split_time": f"{sil_tags[i - 1][1] * self.hop_size},{min(n_samples, sil_tags[i][0] * self.hop_size)}"})
      chunks.append({"slice": True, "split_time": f"{sil_tags[i][0] * self.hop_size},{min(n_samples, sil_tags[i][1] * self.hop_size)}"})
    if sil_tags[-1][1] * self.hop_size < n_samples:
      chunks.append({"slice": False, "split_time": f"{sil_tags[-1][1] * self.hop_size},{n_samples}"})
    return {str(i): chunk for i, chunk in enumerate(chunks)}
# sinc_interp_hann audio resampling
class Resample:
  """Sinc-interpolation resampler (cos^2 window) built from tinygrad tensor ops.

  The kernel is computed once in the constructor; calling the instance applies it.
  When `orig_freq == new_freq` no kernel is built and the input is returned unchanged.
  """

  def __init__(self, orig_freq:int=16000, new_freq:int=16000, lowpass_filter_width:int=6, rolloff:float=0.99, beta:Optional[float]=None, dtype:Optional[dtypes]=None):
    self.orig_freq = orig_freq
    self.new_freq = new_freq
    self.lowpass_filter_width = lowpass_filter_width
    self.rolloff = rolloff
    self.beta = beta  # stored only; no method of this class reads it
    self.gcd = math.gcd(int(self.orig_freq), int(self.new_freq))
    if self.orig_freq != self.new_freq:
      self.kernel, self.width = self._get_sinc_resample_kernel(dtype)
    else:
      self.kernel, self.width = None, None

  def __call__(self, waveform:Tensor) -> Tensor:
    """Resample `waveform` along its last axis; identity when both rates are equal."""
    if self.orig_freq != self.new_freq:
      return self._apply_sinc_resample_kernel(waveform)
    return waveform

  def _apply_sinc_resample_kernel(self, waveform:Tensor):
    """Convolve `waveform` with the precomputed kernel and trim to the target length.

    Raises:
      TypeError: if `waveform` is not a floating-point tensor.
    """
    if not waveform.is_floating_point():
      raise TypeError(f"Waveform tensor expected to be of type float, but received {waveform.dtype}.")
    src_rate = int(self.orig_freq) // self.gcd
    dst_rate = int(self.new_freq) // self.gcd
    in_shape = waveform.shape
    flat = waveform.reshape(-1, in_shape[-1]) # pack batch
    n_wavs, n_in = flat.shape
    n_out = int(math.ceil(dst_rate * n_in / src_rate))
    padded = flat.pad2d((self.width, self.width + src_rate))
    out = padded[:, None].conv2d(self.kernel, stride=src_rate)
    # Interleave the dst_rate output phases, then drop samples past the target length.
    out = out.transpose(1, 2).reshape(n_wavs, -1)[..., :n_out]
    return out.reshape(in_shape[:-1] + out.shape[-1:]) # unpack batch

  def _get_sinc_resample_kernel(self, dtype=None):
    """Build the resampling kernel; returns `(kernels, width)`.

    Raises:
      ValueError: if `lowpass_filter_width` is not positive.
    """
    src_rate = int(self.orig_freq) // self.gcd
    dst_rate = int(self.new_freq) // self.gcd
    lp_width = self.lowpass_filter_width
    if lp_width <= 0:
      raise ValueError("Low pass filter width should be positive.")
    cutoff = min(src_rate, dst_rate) * self.rolloff
    width = math.ceil(lp_width * src_rate / cutoff)
    idx_dtype = dtypes.float32 if dtype is None else dtype
    idx = Tensor.arange(-width, width + src_rate, dtype=idx_dtype)[None, None] / src_rate
    t = Tensor.arange(0, -dst_rate, -1, dtype=dtype)[:, None, None] / dst_rate + idx
    t *= cutoff
    t = t.clip(-lp_width, lp_width)
    # The window must be derived from t before t is scaled by pi below.
    window = (t * math.pi / lp_width / 2).cos() ** 2
    t *= math.pi
    scale = cutoff / src_rate
    # sin(t)/t with the t == 0 entries replaced by 1.
    kernels = Tensor.where(t == 0, Tensor(1.0, dtype=t.dtype).to(t.device), t.sin() / t)
    kernels *= window * scale
    if dtype is None:
      kernels = kernels.cast(dtype=dtypes.float32)
    return kernels, width
def sinc_interp_resample(x:Tensor, orig_freq:int=16000, new_freq:int=1600, lowpass_filter_width:int=6, rolloff:float=0.99, beta:Optional[float]=None):
  """Resample `x` from `orig_freq` to `new_freq` with a one-off `Resample` built for `x.dtype`."""
  # NOTE(review): the new_freq default of 1600 looks like it may have been meant as 16000 - confirm.
  return Resample(orig_freq, new_freq, lowpass_filter_width, rolloff, beta, x.dtype)(x)
def cut(audio_path, db_thresh=-30, min_len=5000):
  """Load `audio_path` at its native sample rate and return the chunk dict from `Slicer.slice`."""
  samples, sample_rate = librosa.load(audio_path, sr=None)
  return Slicer(sr=sample_rate, threshold=db_thresh, min_length=min_len).slice(samples)
def chunks2audio(audio_path, chunks):
  """Cut the audio at `audio_path` into the pieces described by `chunks`.

  Returns `(pieces, sample_rate)` where `pieces` is a list of `(slice_flag, samples)` tuples,
  one per chunk whose "split_time" start and end differ.
  """
  chunk_map = dict(chunks)
  waveform, sample_rate = load_audiofile(audio_path)
  if len(waveform.shape) == 2 and waveform.shape[1] >= 2:
    # Average over axis 0 and keep a leading axis of size one.
    waveform = waveform.mean(0).unsqueeze(0)
  samples = waveform.numpy()[0]
  pieces = []
  for entry in chunk_map.values():
    bounds = entry["split_time"].split(",")
    if bounds[0] == bounds[1]:
      continue  # empty range, nothing to extract
    pieces.append((entry["slice"], samples[int(bounds[0]):int(bounds[1])]))
  return pieces, sample_rate
def load_audiofile(filepath:str, frame_offset:int=0, num_frames:int=-1, channels_first:bool=True):
  """Read an audio file with soundfile and return `(waveform, sample_rate)`.

  The data is read as float32 with `always_2d=True` and wrapped in a Tensor; when
  `channels_first` is true the two axes are swapped before returning.
  """
  with soundfile.SoundFile(filepath, "r") as snd:
    # _prepare_read is a private soundfile helper; it is given the offset and the frame count.
    n_frames = snd._prepare_read(frame_offset, None, num_frames)
    data = snd.read(n_frames, "float32", always_2d=True)
    rate = snd.samplerate
  tensor = Tensor(data)
  if channels_first:
    tensor = tensor.transpose(0, 1)
  return tensor, rate
def get_unit_f0(wav:Tensor, tran, hop_length, target_sample, f0_filter=False) -> Tuple[Tensor,Tensor,Tensor]:
  """Compute the 16 kHz waveform, transposed F0 and voiced mask for `wav`.

  Args:
    wav: waveform tensor sampled at `target_sample`.
    tran: pitch shift; F0 is multiplied by `2 ** (tran / 12)`.
    hop_length: hop length passed to `PMF0Predictor`.
    target_sample: sample rate of `wav`.
    f0_filter: when true, raise if the predicted F0 sums to zero.

  Returns:
    `(wav16k, f0, uv)`, all realized; `f0` and `uv` carry a leading axis of size one.

  Raises:
    RuntimeError: if `f0_filter` is set and no non-zero F0 was found.
  """
  predictor = PMF0Predictor(hop_length, sampling_rate=target_sample)
  pitch, voiced = predictor.compute_f0_uv(wav.numpy())
  if f0_filter and sum(pitch) == 0:
    raise RuntimeError("No voice detected")
  shift_ratio = 2 ** (tran / 12)
  f0 = (Tensor(pitch.astype(np.float32)).float() * shift_ratio).unsqueeze(0)
  uv = Tensor(voiced.astype(np.float32)).float().unsqueeze(0)
  wav16k = sinc_interp_resample(wav[None,:], target_sample, 16000)[0]
  return wav16k.realize(), f0.realize(), uv.realize()