Translate_Video_language / voice_main.py
r3gm's picture
v0.5.0
b152010
from soni_translate.logging_setup import logger
import torch
import gc
import numpy as np
import os
import shutil
import warnings
import threading
from tqdm import tqdm
from lib.infer_pack.models import (
SynthesizerTrnMs256NSFsid,
SynthesizerTrnMs256NSFsid_nono,
SynthesizerTrnMs768NSFsid,
SynthesizerTrnMs768NSFsid_nono,
)
from lib.audio import load_audio
import soundfile as sf
import edge_tts
import asyncio
from soni_translate.utils import remove_directory_contents, create_directories
from scipy import signal
from time import time as ttime
import faiss
from vci_pipeline import VC, change_rms, bh, ah
import librosa
warnings.filterwarnings("ignore")
class Config:
def __init__(self, only_cpu=False):
self.device = "cuda:0"
self.is_half = True
self.n_cpu = 0
self.gpu_name = None
self.gpu_mem = None
(
self.x_pad,
self.x_query,
self.x_center,
self.x_max
) = self.device_config(only_cpu)
def device_config(self, only_cpu) -> tuple:
if torch.cuda.is_available() and not only_cpu:
i_device = int(self.device.split(":")[-1])
self.gpu_name = torch.cuda.get_device_name(i_device)
if (
("16" in self.gpu_name and "V100" not in self.gpu_name.upper())
or "P40" in self.gpu_name.upper()
or "1060" in self.gpu_name
or "1070" in self.gpu_name
or "1080" in self.gpu_name
):
logger.info(
"16/10 Series GPUs and P40 excel "
"in single-precision tasks."
)
self.is_half = False
else:
self.gpu_name = None
self.gpu_mem = int(
torch.cuda.get_device_properties(i_device).total_memory
/ 1024
/ 1024
/ 1024
+ 0.4
)
elif torch.backends.mps.is_available() and not only_cpu:
logger.info("Supported N-card not found, using MPS for inference")
self.device = "mps"
else:
logger.info("No supported N-card found, using CPU for inference")
self.device = "cpu"
self.is_half = False
if self.n_cpu == 0:
self.n_cpu = os.cpu_count()
if self.is_half:
# 6GB VRAM configuration
x_pad = 3
x_query = 10
x_center = 60
x_max = 65
else:
# 5GB VRAM configuration
x_pad = 1
x_query = 6
x_center = 38
x_max = 41
if self.gpu_mem is not None and self.gpu_mem <= 4:
x_pad = 1
x_query = 5
x_center = 30
x_max = 32
logger.info(
f"Config: Device is {self.device}, "
f"half precision is {self.is_half}"
)
return x_pad, x_query, x_center, x_max
BASE_DOWNLOAD_LINK = "https://huggingface.co/r3gm/sonitranslate_voice_models/resolve/main/"
BASE_MODELS = [
"hubert_base.pt",
"rmvpe.pt"
]
BASE_DIR = "."
def load_hu_bert(config):
from fairseq import checkpoint_utils
from soni_translate.utils import download_manager
for id_model in BASE_MODELS:
download_manager(
os.path.join(BASE_DOWNLOAD_LINK, id_model), BASE_DIR
)
models, _, _ = checkpoint_utils.load_model_ensemble_and_task(
["hubert_base.pt"],
suffix="",
)
hubert_model = models[0]
hubert_model = hubert_model.to(config.device)
if config.is_half:
hubert_model = hubert_model.half()
else:
hubert_model = hubert_model.float()
hubert_model.eval()
return hubert_model
def load_trained_model(model_path, config):
if not model_path:
raise ValueError("No model found")
logger.info("Loading %s" % model_path)
cpt = torch.load(model_path, map_location="cpu")
tgt_sr = cpt["config"][-1]
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] # n_spk
if_f0 = cpt.get("f0", 1)
if if_f0 == 0:
# protect to 0.5 need?
pass
version = cpt.get("version", "v1")
if version == "v1":
if if_f0 == 1:
net_g = SynthesizerTrnMs256NSFsid(
*cpt["config"], is_half=config.is_half
)
else:
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
elif version == "v2":
if if_f0 == 1:
net_g = SynthesizerTrnMs768NSFsid(
*cpt["config"], is_half=config.is_half
)
else:
net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
del net_g.enc_q
net_g.load_state_dict(cpt["weight"], strict=False)
net_g.eval().to(config.device)
if config.is_half:
net_g = net_g.half()
else:
net_g = net_g.float()
vc = VC(tgt_sr, config)
n_spk = cpt["config"][-3]
return n_spk, tgt_sr, net_g, vc, cpt, version
class ClassVoices:
def __init__(self, only_cpu=False):
self.model_config = {}
self.config = None
self.only_cpu = only_cpu
def apply_conf(
self,
tag="base_model",
file_model="",
pitch_algo="pm",
pitch_lvl=0,
file_index="",
index_influence=0.66,
respiration_median_filtering=3,
envelope_ratio=0.25,
consonant_breath_protection=0.33,
resample_sr=0,
file_pitch_algo="",
):
if not file_model:
raise ValueError("Model not found")
if file_index is None:
file_index = ""
if file_pitch_algo is None:
file_pitch_algo = ""
if not self.config:
self.config = Config(self.only_cpu)
self.hu_bert_model = None
self.model_pitch_estimator = None
self.model_config[tag] = {
"file_model": file_model,
"pitch_algo": pitch_algo,
"pitch_lvl": pitch_lvl, # no decimal
"file_index": file_index,
"index_influence": index_influence,
"respiration_median_filtering": respiration_median_filtering,
"envelope_ratio": envelope_ratio,
"consonant_breath_protection": consonant_breath_protection,
"resample_sr": resample_sr,
"file_pitch_algo": file_pitch_algo,
}
return f"CONFIGURATION APPLIED FOR {tag}: {file_model}"
def infer(
self,
task_id,
params,
# load model
n_spk,
tgt_sr,
net_g,
pipe,
cpt,
version,
if_f0,
# load index
index_rate,
index,
big_npy,
# load f0 file
inp_f0,
# audio file
input_audio_path,
overwrite,
):
f0_method = params["pitch_algo"]
f0_up_key = params["pitch_lvl"]
filter_radius = params["respiration_median_filtering"]
resample_sr = params["resample_sr"]
rms_mix_rate = params["envelope_ratio"]
protect = params["consonant_breath_protection"]
if not os.path.exists(input_audio_path):
raise ValueError(
"The audio file was not found or is not "
f"a valid file: {input_audio_path}"
)
f0_up_key = int(f0_up_key)
audio = load_audio(input_audio_path, 16000)
# Normalize audio
audio_max = np.abs(audio).max() / 0.95
if audio_max > 1:
audio /= audio_max
times = [0, 0, 0]
# filters audio signal, pads it, computes sliding window sums,
# and extracts optimized time indices
audio = signal.filtfilt(bh, ah, audio)
audio_pad = np.pad(
audio, (pipe.window // 2, pipe.window // 2), mode="reflect"
)
opt_ts = []
if audio_pad.shape[0] > pipe.t_max:
audio_sum = np.zeros_like(audio)
for i in range(pipe.window):
audio_sum += audio_pad[i:i - pipe.window]
for t in range(pipe.t_center, audio.shape[0], pipe.t_center):
opt_ts.append(
t
- pipe.t_query
+ np.where(
np.abs(audio_sum[t - pipe.t_query: t + pipe.t_query])
== np.abs(audio_sum[t - pipe.t_query: t + pipe.t_query]).min()
)[0][0]
)
s = 0
audio_opt = []
t = None
t1 = ttime()
sid_value = 0
sid = torch.tensor(sid_value, device=pipe.device).unsqueeze(0).long()
# Pads audio symmetrically, calculates length divided by window size.
audio_pad = np.pad(audio, (pipe.t_pad, pipe.t_pad), mode="reflect")
p_len = audio_pad.shape[0] // pipe.window
# Estimates pitch from audio signal
pitch, pitchf = None, None
if if_f0 == 1:
pitch, pitchf = pipe.get_f0(
input_audio_path,
audio_pad,
p_len,
f0_up_key,
f0_method,
filter_radius,
inp_f0,
)
pitch = pitch[:p_len]
pitchf = pitchf[:p_len]
if pipe.device == "mps":
pitchf = pitchf.astype(np.float32)
pitch = torch.tensor(
pitch, device=pipe.device
).unsqueeze(0).long()
pitchf = torch.tensor(
pitchf, device=pipe.device
).unsqueeze(0).float()
t2 = ttime()
times[1] += t2 - t1
for t in opt_ts:
t = t // pipe.window * pipe.window
if if_f0 == 1:
pitch_slice = pitch[
:, s // pipe.window: (t + pipe.t_pad2) // pipe.window
]
pitchf_slice = pitchf[
:, s // pipe.window: (t + pipe.t_pad2) // pipe.window
]
else:
pitch_slice = None
pitchf_slice = None
audio_slice = audio_pad[s:t + pipe.t_pad2 + pipe.window]
audio_opt.append(
pipe.vc(
self.hu_bert_model,
net_g,
sid,
audio_slice,
pitch_slice,
pitchf_slice,
times,
index,
big_npy,
index_rate,
version,
protect,
)[pipe.t_pad_tgt:-pipe.t_pad_tgt]
)
s = t
pitch_end_slice = pitch[
:, t // pipe.window:
] if t is not None else pitch
pitchf_end_slice = pitchf[
:, t // pipe.window:
] if t is not None else pitchf
audio_opt.append(
pipe.vc(
self.hu_bert_model,
net_g,
sid,
audio_pad[t:],
pitch_end_slice,
pitchf_end_slice,
times,
index,
big_npy,
index_rate,
version,
protect,
)[pipe.t_pad_tgt:-pipe.t_pad_tgt]
)
audio_opt = np.concatenate(audio_opt)
if rms_mix_rate != 1:
audio_opt = change_rms(
audio, 16000, audio_opt, tgt_sr, rms_mix_rate
)
if resample_sr >= 16000 and tgt_sr != resample_sr:
audio_opt = librosa.resample(
audio_opt, orig_sr=tgt_sr, target_sr=resample_sr
)
audio_max = np.abs(audio_opt).max() / 0.99
max_int16 = 32768
if audio_max > 1:
max_int16 /= audio_max
audio_opt = (audio_opt * max_int16).astype(np.int16)
del pitch, pitchf, sid
if torch.cuda.is_available():
torch.cuda.empty_cache()
if tgt_sr != resample_sr >= 16000:
final_sr = resample_sr
else:
final_sr = tgt_sr
"""
"Success.\n %s\nTime:\n npy:%ss, f0:%ss, infer:%ss" % (
times[0],
times[1],
times[2],
), (final_sr, audio_opt)
"""
if overwrite:
output_audio_path = input_audio_path # Overwrite
else:
basename = os.path.basename(input_audio_path)
dirname = os.path.dirname(input_audio_path)
new_basename = basename.split(
'.')[0] + "_edited." + basename.split('.')[-1]
new_path = os.path.join(dirname, new_basename)
logger.info(str(new_path))
output_audio_path = new_path
# Save file
sf.write(
file=output_audio_path,
samplerate=final_sr,
data=audio_opt
)
self.model_config[task_id]["result"].append(output_audio_path)
self.output_list.append(output_audio_path)
def make_test(
self,
tts_text,
tts_voice,
model_path,
index_path,
transpose,
f0_method,
):
folder_test = "test"
tag = "test_edge"
tts_file = "test/test.wav"
tts_edited = "test/test_edited.wav"
create_directories(folder_test)
remove_directory_contents(folder_test)
if "SET_LIMIT" == os.getenv("DEMO"):
if len(tts_text) > 60:
tts_text = tts_text[:60]
logger.warning("DEMO; limit to 60 characters")
try:
asyncio.run(edge_tts.Communicate(
tts_text, "-".join(tts_voice.split('-')[:-1])
).save(tts_file))
except Exception as e:
raise ValueError(
"No audio was received. Please change the "
f"tts voice for {tts_voice}. Error: {str(e)}"
)
shutil.copy(tts_file, tts_edited)
self.apply_conf(
tag=tag,
file_model=model_path,
pitch_algo=f0_method,
pitch_lvl=transpose,
file_index=index_path,
index_influence=0.66,
respiration_median_filtering=3,
envelope_ratio=0.25,
consonant_breath_protection=0.33,
)
self(
audio_files=tts_edited,
tag_list=tag,
overwrite=True
)
return tts_edited, tts_file
def run_threads(self, threads):
# Start threads
for thread in threads:
thread.start()
# Wait for all threads to finish
for thread in threads:
thread.join()
gc.collect()
torch.cuda.empty_cache()
def unload_models(self):
self.hu_bert_model = None
self.model_pitch_estimator = None
gc.collect()
torch.cuda.empty_cache()
def __call__(
self,
audio_files=[],
tag_list=[],
overwrite=False,
parallel_workers=1,
):
logger.info(f"Parallel workers: {str(parallel_workers)}")
self.output_list = []
if not self.model_config:
raise ValueError("No model has been configured for inference")
if isinstance(audio_files, str):
audio_files = [audio_files]
if isinstance(tag_list, str):
tag_list = [tag_list]
if not audio_files:
raise ValueError("No audio found to convert")
if not tag_list:
tag_list = [list(self.model_config.keys())[-1]] * len(audio_files)
if len(audio_files) > len(tag_list):
logger.info("Extend tag list to match audio files")
extend_number = len(audio_files) - len(tag_list)
tag_list.extend([tag_list[0]] * extend_number)
if len(audio_files) < len(tag_list):
logger.info("Cut list tags")
tag_list = tag_list[:len(audio_files)]
tag_file_pairs = list(zip(tag_list, audio_files))
sorted_tag_file = sorted(tag_file_pairs, key=lambda x: x[0])
# Base params
if not self.hu_bert_model:
self.hu_bert_model = load_hu_bert(self.config)
cache_params = None
threads = []
progress_bar = tqdm(total=len(tag_list), desc="Progress")
for i, (id_tag, input_audio_path) in enumerate(sorted_tag_file):
if id_tag not in self.model_config.keys():
logger.info(
f"No configured model for {id_tag} with {input_audio_path}"
)
continue
if (
len(threads) >= parallel_workers
or cache_params != id_tag
and cache_params is not None
):
self.run_threads(threads)
progress_bar.update(len(threads))
threads = []
if cache_params != id_tag:
self.model_config[id_tag]["result"] = []
# Unload previous
(
n_spk,
tgt_sr,
net_g,
pipe,
cpt,
version,
if_f0,
index_rate,
index,
big_npy,
inp_f0,
) = [None] * 11
gc.collect()
torch.cuda.empty_cache()
# Model params
params = self.model_config[id_tag]
model_path = params["file_model"]
f0_method = params["pitch_algo"]
file_index = params["file_index"]
index_rate = params["index_influence"]
f0_file = params["file_pitch_algo"]
# Load model
(
n_spk,
tgt_sr,
net_g,
pipe,
cpt,
version
) = load_trained_model(model_path, self.config)
if_f0 = cpt.get("f0", 1) # pitch data
# Load index
if os.path.exists(file_index) and index_rate != 0:
try:
index = faiss.read_index(file_index)
big_npy = index.reconstruct_n(0, index.ntotal)
except Exception as error:
logger.error(f"Index: {str(error)}")
index_rate = 0
index = big_npy = None
else:
logger.warning("File index not found")
index_rate = 0
index = big_npy = None
# Load f0 file
inp_f0 = None
if os.path.exists(f0_file):
try:
with open(f0_file, "r") as f:
lines = f.read().strip("\n").split("\n")
inp_f0 = []
for line in lines:
inp_f0.append([float(i) for i in line.split(",")])
inp_f0 = np.array(inp_f0, dtype="float32")
except Exception as error:
logger.error(f"f0 file: {str(error)}")
if "rmvpe" in f0_method:
if not self.model_pitch_estimator:
from lib.rmvpe import RMVPE
logger.info("Loading vocal pitch estimator model")
self.model_pitch_estimator = RMVPE(
"rmvpe.pt",
is_half=self.config.is_half,
device=self.config.device
)
pipe.model_rmvpe = self.model_pitch_estimator
cache_params = id_tag
# self.infer(
# id_tag,
# params,
# # load model
# n_spk,
# tgt_sr,
# net_g,
# pipe,
# cpt,
# version,
# if_f0,
# # load index
# index_rate,
# index,
# big_npy,
# # load f0 file
# inp_f0,
# # output file
# input_audio_path,
# overwrite,
# )
thread = threading.Thread(
target=self.infer,
args=(
id_tag,
params,
# loaded model
n_spk,
tgt_sr,
net_g,
pipe,
cpt,
version,
if_f0,
# loaded index
index_rate,
index,
big_npy,
# loaded f0 file
inp_f0,
# audio file
input_audio_path,
overwrite,
)
)
threads.append(thread)
# Run last
if threads:
self.run_threads(threads)
progress_bar.update(len(threads))
progress_bar.close()
final_result = []
valid_tags = set(tag_list)
for tag in valid_tags:
if (
tag in self.model_config.keys()
and "result" in self.model_config[tag].keys()
):
final_result.extend(self.model_config[tag]["result"])
return final_result