|
import os |
|
import glob |
|
import json |
|
import traceback |
|
import logging |
|
import gradio as gr |
|
import numpy as np |
|
import librosa |
|
import torch |
|
import asyncio |
|
import edge_tts |
|
import yt_dlp |
|
import ffmpeg |
|
import subprocess |
|
import sys |
|
import io |
|
import wave |
|
from datetime import datetime |
|
from fairseq import checkpoint_utils |
|
from infer_pack.models import SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono |
|
from vc_infer_pipeline import VC |
|
from config import Config |
|
config = Config() |
|
logging.getLogger("numba").setLevel(logging.WARNING) |
|
limitation = os.getenv("SYSTEM") == "spaces" |
|
|
|
def create_vc_fn(tgt_sr, net_g, vc, if_f0, file_index): |
|
def vc_fn( |
|
input_audio, |
|
upload_audio, |
|
upload_mode, |
|
f0_up_key, |
|
f0_method, |
|
index_rate, |
|
tts_mode, |
|
tts_text, |
|
tts_voice |
|
): |
|
try: |
|
if tts_mode: |
|
if len(tts_text) > 100 and limitation: |
|
return "Text is too long", None |
|
if tts_text is None or tts_voice is None: |
|
return "You need to enter text and select a voice", None |
|
asyncio.run(edge_tts.Communicate(tts_text, "-".join(tts_voice.split('-')[:-1])).save("tts.mp3")) |
|
audio, sr = librosa.load("tts.mp3", sr=16000, mono=True) |
|
else: |
|
if upload_mode: |
|
if input_audio is None: |
|
return "You need to upload an audio", None |
|
sampling_rate, audio = upload_audio |
|
duration = audio.shape[0] / sampling_rate |
|
audio = (audio / np.iinfo(audio.dtype).max).astype(np.float32) |
|
if len(audio.shape) > 1: |
|
audio = librosa.to_mono(audio.transpose(1, 0)) |
|
if sampling_rate != 16000: |
|
audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=16000) |
|
else: |
|
audio, sr = librosa.load(input_audio, sr=16000, mono=True) |
|
times = [0, 0, 0] |
|
f0_up_key = int(f0_up_key) |
|
audio_opt = vc.pipeline( |
|
hubert_model, |
|
net_g, |
|
0, |
|
audio, |
|
times, |
|
f0_up_key, |
|
f0_method, |
|
file_index, |
|
index_rate, |
|
if_f0, |
|
f0_file=None, |
|
) |
|
print( |
|
f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}]: npy: {times[0]}, f0: {times[1]}s, infer: {times[2]}s" |
|
) |
|
return "Success", (tgt_sr, audio_opt) |
|
except: |
|
info = traceback.format_exc() |
|
print(info) |
|
return info, (None, None) |
|
return vc_fn |
|
|
|
def cut_vocal_and_inst(yt_url): |
|
if yt_url != "": |
|
if not os.path.exists("youtube_audio"): |
|
os.mkdir("youtube_audio") |
|
ydl_opts = { |
|
'format': 'bestaudio/best', |
|
'postprocessors': [{ |
|
'key': 'FFmpegExtractAudio', |
|
'preferredcodec': 'wav', |
|
}], |
|
"outtmpl": 'youtube_audio/audio', |
|
} |
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
ydl.download([yt_url]) |
|
yt_audio_path = "youtube_audio/audio.wav" |
|
command = f"demucs --two-stems=vocals {yt_audio_path}" |
|
result = subprocess.run(command.split(), stdout=subprocess.PIPE) |
|
print(result.stdout.decode()) |
|
return ("separated/htdemucs/audio/vocals.wav", "separated/htdemucs/audio/no_vocals.wav", yt_audio_path, "separated/htdemucs/audio/vocals.wav") |
|
|
|
def combine_vocal_and_inst(audio_data, audio_volume): |
|
print(audio_data) |
|
if not os.path.exists("result"): |
|
os.mkdir("result") |
|
vocal_path = "result/output.wav" |
|
inst_path = "separated/htdemucs/audio/no_vocals.wav" |
|
output_path = "result/combine.mp3" |
|
with wave.open(vocal_path, "w") as wave_file: |
|
wave_file.setnchannels(1) |
|
wave_file.setsampwidth(2) |
|
wave_file.setframerate(audio_data[0]) |
|
wave_file.writeframes(audio_data[1].tobytes()) |
|
command = f'ffmpeg -y -i {inst_path} -i {vocal_path} -filter_complex [1:a]volume={audio_volume}dB[v];[0:a][v]amix=inputs=2:duration=longest -b:a 320k -c:a libmp3lame {output_path}' |
|
result = subprocess.run(command.split(), stdout=subprocess.PIPE) |
|
return output_path |
|
|
|
def load_hubert(): |
|
global hubert_model |
|
models, _, _ = checkpoint_utils.load_model_ensemble_and_task( |
|
["hubert_base.pt"], |
|
suffix="", |
|
) |
|
hubert_model = models[0] |
|
hubert_model = hubert_model.to(config.device) |
|
if config.is_half: |
|
hubert_model = hubert_model.half() |
|
else: |
|
hubert_model = hubert_model.float() |
|
hubert_model.eval() |
|
|
|
def change_to_tts_mode(tts_mode, upload_mode): |
|
if tts_mode: |
|
return gr.Textbox.update(visible=False), gr.Audio.update(visible=False), gr.Checkbox.update(visible=False), gr.Textbox.update(visible=True), gr.Dropdown.update(visible=True) |
|
else: |
|
if upload_mode: |
|
return gr.Textbox.update(visible=False), gr.Audio.update(visible=True), gr.Checkbox.update(visible=True), gr.Textbox.update(visible=False), gr.Dropdown.update(visible=False) |
|
else: |
|
return gr.Textbox.update(visible=True), gr.Audio.update(visible=False), gr.Checkbox.update(visible=True), gr.Textbox.update(visible=False), gr.Dropdown.update(visible=False) |
|
|
|
def change_to_upload_mode(upload_mode): |
|
if upload_mode: |
|
return gr.Textbox().update(visible=False), gr.Audio().update(visible=True) |
|
else: |
|
return gr.Textbox().update(visible=True), gr.Audio().update(visible=False) |
|
|
|
if __name__ == '__main__': |
|
load_hubert() |
|
models = [] |
|
tts_voice_list = asyncio.get_event_loop().run_until_complete(edge_tts.list_voices()) |
|
voices = [f"{v['ShortName']}-{v['Gender']}" for v in tts_voice_list] |
|
if config.json: |
|
with open("weights/model_info.json", "r", encoding="utf-8") as f: |
|
models_info = json.load(f) |
|
for name, info in models_info.items(): |
|
if not info['enable']: |
|
continue |
|
title = info['title'] |
|
author = info.get("author", None) |
|
cover = f"weights/{name}/{info['cover']}" |
|
index = f"weights/{name}/{info['feature_retrieval_library']}" |
|
cpt = torch.load(f"weights/{name}/{name}.pth", map_location="cpu") |
|
tgt_sr = cpt["config"][-1] |
|
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] |
|
if_f0 = cpt.get("f0", 1) |
|
if if_f0 == 1: |
|
net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half) |
|
else: |
|
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) |
|
del net_g.enc_q |
|
print(net_g.load_state_dict(cpt["weight"], strict=False)) |
|
net_g.eval().to(config.device) |
|
if config.is_half: |
|
net_g = net_g.half() |
|
else: |
|
net_g = net_g.float() |
|
vc = VC(tgt_sr, config) |
|
models.append((name, title, author, cover, create_vc_fn(tgt_sr, net_g, vc, if_f0, index))) |
|
else: |
|
folder_path = "weights" |
|
for name in os.listdir(folder_path): |
|
print("check folder: " + name) |
|
if name.startswith("."): break |
|
cover_path = glob.glob(f"{folder_path}/{name}/*.png") + glob.glob(f"{folder_path}/{name}/*.jpg") |
|
index_path = glob.glob(f"{folder_path}/{name}/*.index") |
|
checkpoint_path = glob.glob(f"{folder_path}/{name}/*.pth") |
|
title = name |
|
author = "" |
|
if cover_path: |
|
cover = cover_path[0] |
|
else: |
|
cover = "" |
|
index = index_path[0] |
|
cpt = torch.load(checkpoint_path[0], map_location="cpu") |
|
tgt_sr = cpt["config"][-1] |
|
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] |
|
if_f0 = cpt.get("f0", 1) |
|
if if_f0 == 1: |
|
net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half) |
|
else: |
|
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) |
|
del net_g.enc_q |
|
print(net_g.load_state_dict(cpt["weight"], strict=False)) |
|
net_g.eval().to(config.device) |
|
if config.is_half: |
|
net_g = net_g.half() |
|
else: |
|
net_g = net_g.float() |
|
vc = VC(tgt_sr, config) |
|
models.append((name, title, author, cover, create_vc_fn(tgt_sr, net_g, vc, if_f0, index))) |
|
with gr.Blocks() as app: |
|
gr.Markdown( |
|
"# <center> RVC Models [(Latest Update)](https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI/releases/tag/20230428updated)\n" |
|
"## <center> The input audio should be clean and pure voice without background music.\n" |
|
"### <center> More feature will be added soon... \n" |
|
"#### <center> Please regenerate your model to latest RVC to fully applied this new rvc.\n" |
|
"[![image](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/110kiMZTdP6Ri1lY9-NbQf17GVPPhHyeT?usp=sharing)\n\n" |
|
"[![Original Repo](https://badgen.net/badge/icon/github?icon=github&label=Original%20Repo)](https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI)" |
|
) |
|
with gr.Tabs(): |
|
for (name, title, author, cover, vc_fn) in models: |
|
with gr.TabItem(name): |
|
with gr.Row(): |
|
gr.Markdown( |
|
'<div align="center">' |
|
f'<div>{title}</div>\n'+ |
|
(f'<div>Model author: {author}</div>' if author else "")+ |
|
(f'<img style="width:auto;height:300px;" src="file/{cover}">' if cover else "")+ |
|
'</div>' |
|
) |
|
with gr.Row(): |
|
with gr.Column(): |
|
vc_youtube = gr.Textbox(label="Youtube URL") |
|
vc_convert = gr.Button("Convert", variant="primary") |
|
vc_vocal_preview = gr.Audio(label="Vocal Preview") |
|
vc_inst_preview = gr.Audio(label="Instrumental Preview") |
|
vc_audio_preview = gr.Audio(label="Audio Preview") |
|
with gr.Column(): |
|
vc_input = gr.Textbox(label="Input audio path") |
|
vc_upload = gr.Audio(label="Upload audio file", visible=False, interactive=True) |
|
upload_mode = gr.Checkbox(label="Upload mode", value=False) |
|
vc_transpose = gr.Number(label="Transpose", value=0) |
|
vc_f0method = gr.Radio( |
|
label="Pitch extraction algorithm, PM is fast but Harvest is better for low frequencies", |
|
choices=["pm", "harvest"], |
|
value="pm", |
|
interactive=True, |
|
) |
|
vc_index_ratio = gr.Slider( |
|
minimum=0, |
|
maximum=1, |
|
label="Retrieval feature ratio", |
|
value=0.6, |
|
interactive=True, |
|
) |
|
tts_mode = gr.Checkbox(label="tts (use edge-tts as input)", value=False) |
|
tts_text = gr.Textbox(visible=False,label="TTS text (100 words limitation)" if limitation else "TTS text") |
|
tts_voice = gr.Dropdown(label="Edge-tts speaker", choices=voices, visible=False, allow_custom_value=False, value="en-US-AnaNeural-Female") |
|
vc_output1 = gr.Textbox(label="Output Message") |
|
vc_output2 = gr.Audio(label="Output Audio") |
|
vc_submit = gr.Button("Generate", variant="primary") |
|
with gr.Column(): |
|
vc_volume = gr.Slider( |
|
minimum=0, |
|
maximum=10, |
|
label="Vocal volume", |
|
value=4, |
|
interactive=True, |
|
step=1 |
|
) |
|
vc_outputCombine = gr.Audio(label="Output Combined Audio") |
|
vc_combine = gr.Button("Combine",variant="primary") |
|
vc_submit.click(vc_fn, [vc_input, vc_upload, upload_mode, vc_transpose, vc_f0method, vc_index_ratio, tts_mode, tts_text, tts_voice], [vc_output1, vc_output2]) |
|
vc_convert.click(cut_vocal_and_inst, vc_youtube, [vc_vocal_preview, vc_inst_preview, vc_audio_preview, vc_input]) |
|
vc_combine.click(combine_vocal_and_inst, [vc_output2, vc_volume], vc_outputCombine) |
|
tts_mode.change(change_to_tts_mode, [tts_mode, upload_mode], [vc_input, vc_upload, upload_mode, tts_text, tts_voice]) |
|
upload_mode.change(change_to_upload_mode, [upload_mode], [vc_input, vc_upload]) |
|
app.queue(concurrency_count=1, max_size=20, api_open=config.api).launch(share=config.colab) |