|
import util |
|
import numpy as np |
|
import librosa |
|
import hashlib |
|
import json |
|
import os |
|
import torch |
|
import logging |
|
from rvc.infer_pack.models import ( |
|
SynthesizerTrnMs768NSFsid, |
|
SynthesizerTrnMs768NSFsid_nono |
|
) |
|
from os import getenv |
|
from typing import Union, Tuple, List |
|
from rvc.vc_infer_pipeline import VC |
|
from speakers.processors import BaseProcessor, ProcessorData |
|
from speakers.common.utils import get_abs_path |
|
from omegaconf import OmegaConf |
|
from speakers.common.registry import registry |
|
from pydantic import Field |
|
|
|
# Module-level logger; host applications can swap it in via set_rvc_speakers_logger().
logger = logging.getLogger('speaker_runner')
|
|
|
|
|
def set_rvc_speakers_logger(l):
    """Install *l* as this module's logger, replacing the default one."""
    global logger
    logger = l
|
|
|
|
|
class RvcProcessorData(ProcessorData):
    """Request payload for the RVC voice-conversion processor.

    Parameter semantics follow the upstream web UI:
    # https://github.com/fumiama/Retrieval-based-Voice-Conversion-WebUI/blob/main/infer-web.py#L118 # noqa
    """
    # Sample rate of `audio_samples`, in Hz.
    sample_rate: int = Field(
        default=0
    )
    # Raw audio samples to convert.
    audio_samples: List[float] = Field(
        default=[]
    )

    # Index into the processor's list of loaded models.
    model_index: int

    # Pitch shift in semitones (integer; +12 = up one octave, -12 = down one octave).
    f0_up_key: int

    # Name of the pitch (F0) extraction method.
    f0_method: str

    # Blend ratio of the retrieval feature index.
    index_rate: float
    # If >= 3, median-filter harvest pitch results with this radius (reduces breathy artifacts).
    filter_radius: int
    # Volume-envelope mix ratio; closer to 1 uses the output envelope.
    rms_mix_rate: float
    # Final output sample rate after post-processing; 0 disables resampling.
    resample_sr: int
    # Protects voiceless consonants and breaths from artifacts such as tearing;
    # 0.5 disables protection, lower values protect more but may reduce index quality.
    protect: float = Field(
        default=0.33
    )
    # Optional F0 curve file (one pitch per line) that replaces the default F0
    # and pitch shift. Annotated Union[str, None]: the default is None, so a
    # plain `str` annotation was incorrect.
    f0_file: Union[str, None] = Field(
        default=None
    )

    @property
    def type(self) -> str:
        """Type of the Message, used for serialization."""
        return "RVC"
|
|
|
|
|
@registry.register_processor("rvc_speakers")
class RVCSpeakers(BaseProcessor):
    """RVC voice-conversion processor.

    Processors derive from the abstract BaseProcessor and are preloaded from
    a standalone config file; each configured model carries the speaking
    style and tuning parameters of a specific character.
    """

    def __init__(self, hubert_model_path: str, rvc_config_file: str):
        """Load the hubert feature extractor and every configured RVC model.

        :param hubert_model_path: path to the hubert checkpoint.
        :param rvc_config_file: OmegaConf file listing the RVC models.
        """
        # Hugging Face Spaces deployments enforce a 600 s input cap (see vc_func).
        self.in_hf_space = getenv('SYSTEM') == 'spaces'
        self._loaded_models = []
        self._load_hubert(hubert_model_path=hubert_model_path)
        self._load_rvc_mode(rvc_config_file=rvc_config_file)

    def __call__(
            self,
            data: RvcProcessorData
    ):
        """Run voice conversion for one request; returns (sample_rate, audio)."""
        audio_samples_np = np.array(data.audio_samples, dtype=np.float32)
        input_audio = (data.sample_rate, audio_samples_np)

        return self.vc_func(input_audio=input_audio,
                            model_index=data.model_index,
                            f0_up_key=data.f0_up_key,
                            f0_method=data.f0_method,
                            index_rate=data.index_rate,
                            filter_radius=data.filter_radius,
                            rms_mix_rate=data.rms_mix_rate,
                            resample_sr=data.resample_sr,
                            protect=data.protect,
                            f0_file=data.f0_file)

    @classmethod
    def from_config(cls, cfg=None):
        """Alternate constructor: build the processor from a registry config node.

        :raises RuntimeError: if no config is supplied.
        """
        if cfg is None:
            raise RuntimeError("from_config cfg is None.")

        hubert_model_path = cfg.get("hubert_model_path", "")
        rvc_config_file = cfg.get("rvc_config_file", "")

        # Both paths are resolved relative to the registered RVC library root.
        return cls(hubert_model_path=os.path.join(registry.get_path("rvc_library_root"),
                                                  hubert_model_path),
                   rvc_config_file=os.path.join(registry.get_path("rvc_library_root"),
                                                rvc_config_file))

    def match(self, data: ProcessorData):
        """Return True when this processor handles *data* (type contains "RVC")."""
        return "RVC" in data.type

    @property
    def loaded_models(self):
        """Loaded model entries: dicts of name/metadata/vc/net_g/if_f0/target_sr."""
        return self._loaded_models

    def _load_hubert(self, hubert_model_path: str):
        """Load the hubert feature-extraction model onto the registered device."""
        logger.info(f'Load hubert model: {hubert_model_path}')
        self.hubert_model = util.load_hubert_model(registry.get("device"), model_path=hubert_model_path)
        self.hubert_model.eval()
        logger.info('Loaded hubert model')

    def _load_rvc_mode(self, rvc_config_file: str):
        """Load every RVC model listed in the config file.

        :param rvc_config_file: OmegaConf file providing `rmvpe_path` and a
            `models` list of ``{key: {model_name, path}}`` entries.
        :return: None; populated models are appended to ``self._loaded_models``.
        """
        logger.info('Models Load:rvc_speakers')
        multi_cfg = OmegaConf.load(get_abs_path(rvc_config_file))
        rmvpe_path = os.path.join(registry.get_path("rvc_library_root"), multi_cfg.get("rmvpe_path"))
        logger.info(f'rmvpe_path:{rmvpe_path}')
        for item in multi_cfg.get('models'):
            for key, model_info in item.items():

                logger.info(f'Loading model: {key}')
                model_name = model_info.get("model_name")

                model_info_config_file = os.path.join(registry.get_path("rvc_library_root"),
                                                      model_info.get("path"),
                                                      'config.json')

                logger.info(f'Loading model model_info_config_file: {model_info_config_file}')
                # Close the handle deterministically (the original json.load(open(...))
                # leaked it); JSON is UTF-8 by spec.
                with open(model_info_config_file, 'r', encoding='utf-8') as config_fh:
                    model_info_config = json.load(config_fh)

                torch_file = os.path.join(registry.get_path("rvc_library_root"),
                                          model_info.get("path"),
                                          model_info_config['model'])
                cpt = torch.load(
                    torch_file,
                    map_location='cpu'
                )
                # The checkpoint config ends with the target sample rate; slot -3
                # holds the speaker count and must match the embedding table size.
                tgt_sr = cpt['config'][-1]
                cpt['config'][-3] = cpt['weight']['emb_g.weight'].shape[0]

                if_f0 = cpt.get('f0', 1)
                net_g: Union[SynthesizerTrnMs768NSFsid, SynthesizerTrnMs768NSFsid_nono]
                if if_f0 == 1:
                    net_g = SynthesizerTrnMs768NSFsid(
                        *cpt['config'],
                        is_half=util.is_half(registry.get("device"))
                    )
                else:
                    net_g = SynthesizerTrnMs768NSFsid_nono(*cpt['config'])

                # enc_q is only used during training; drop it before inference.
                del net_g.enc_q

                logger.info(net_g.load_state_dict(cpt['weight'], strict=False))

                net_g.eval().to(registry.get("device"))
                net_g = net_g.half() if util.is_half(registry.get("device")) else net_g.float()

                vc = VC(tgt_sr,
                        registry.get("x_pad"),
                        registry.get("x_query"),
                        registry.get("x_center"),
                        registry.get("x_max"),
                        registry.get("is_half"),
                        registry.get("device"),
                        rmvpe_path=rmvpe_path
                        )

                self._loaded_models.append(dict(
                    name=model_name,
                    metadata=model_info_config,
                    vc=vc,
                    net_g=net_g,
                    if_f0=if_f0,
                    target_sr=tgt_sr
                ))

        logger.info(f'Models loaded:rvc_speakers, len:{len(self._loaded_models)}')

    def vc_func(
            self,
            input_audio: Tuple[int, np.ndarray], model_index, f0_up_key, f0_method: str, index_rate,
            filter_radius, rms_mix_rate, resample_sr, protect: float = 0.33,
            f0_file: Union[str, None] = None
    ) -> Tuple[int, np.ndarray]:
        """Run the RVC conversion pipeline on raw audio.

        # https://github.com/fumiama/Retrieval-based-Voice-Conversion-WebUI/blob/main/infer-web.py#L118 # noqa
        :param input_audio: ``(sample_rate, samples)`` tuple.
        :param model_index: index into the loaded models list.
        :param f0_up_key: pitch shift in semitones (+12/-12 = one octave up/down).
        :param f0_method: pitch-extraction method name.
        :param index_rate: blend ratio of the retrieval feature index.
        :param filter_radius: if >= 3, median-filter harvest pitch results with
            this radius (reduces breathy artifacts).
        :param rms_mix_rate: volume-envelope mix ratio; closer to 1 uses the
            output envelope.
        :param resample_sr: final output sample rate; 0 disables resampling.
        :param protect: protects voiceless consonants/breaths from artifacts;
            0.5 disables, lower protects more but may weaken index quality.
        :param f0_file: optional F0 curve file (one pitch per line) replacing
            the default F0 and pitch shift.
        :return: ``(sample_rate, converted_audio)``.
        :raises RuntimeError: on missing input, missing model index, or
            over-long input when hosted on HF Spaces.
        """
        if input_audio is None:
            raise RuntimeError("Please provide input audio.")

        if model_index is None:
            raise RuntimeError("Please select a model.")

        model = self._loaded_models[model_index]

        (audio_samp, audio_npy) = input_audio

        # Hosted (HF Spaces) deployments cap input length at 600 seconds.
        if (audio_npy.shape[0] / audio_samp) > 600 and self.in_hf_space:
            raise RuntimeError("Input audio is longer than 600 secs.")

        # Normalize integer PCM to float32 in roughly [-1, 1].
        if audio_npy.dtype != np.float32:
            audio_npy = (
                audio_npy / np.iinfo(audio_npy.dtype).max
            ).astype(np.float32)

        # Downmix multi-channel audio to mono.
        if len(audio_npy.shape) > 1:
            audio_npy = librosa.to_mono(audio_npy.transpose(1, 0))

        # The hubert feature extractor expects 16 kHz input.
        if audio_samp != 16000:
            audio_npy = librosa.resample(
                audio_npy,
                orig_sr=audio_samp,
                target_sr=16000
            )

        f0_up_key = int(f0_up_key)
        times = [0, 0, 0]  # filled by the pipeline: [feature, f0, infer] seconds

        # Hash of the input audio, passed to the pipeline as a cache key.
        checksum = hashlib.sha512()
        checksum.update(audio_npy.tobytes())
        feat_file_index = ''
        if (
            model['metadata']['feat_index'] != ""
            and os.path.exists(model['metadata']['feat_index'])
            and index_rate != 0
        ):
            feat_file_index = model['metadata']['feat_index']

        output_audio = model['vc'].pipeline(
            self.hubert_model,
            model['net_g'],
            model['metadata'].get('speaker_id', 0),
            audio_npy,
            checksum.hexdigest(),
            times,
            f0_up_key,
            f0_method,
            feat_file_index,
            index_rate,
            model['if_f0'],
            filter_radius,
            model['target_sr'],
            resample_sr,
            rms_mix_rate,
            'v2',
            protect,
            f0_file=f0_file
        )

        # Output rate: the requested resample rate when it is >= 16 kHz and
        # differs from the model rate, otherwise the model's native rate.
        out_sr = (
            resample_sr if 16000 <= resample_sr != model['target_sr']
            else model['target_sr']
        )

        logger.info(f'npy: {times[0]}s, f0: {times[1]}s, infer: {times[2]}s')
        return out_sr, output_audio
|
|