import os
import glob
import json
import traceback
import logging
import gradio as gr
import numpy as np
import librosa
import torch
import asyncio
import edge_tts
import yt_dlp
import ffmpeg
import subprocess
import sys
import io
import wave
from datetime import datetime
from fairseq import checkpoint_utils
from lib.infer_pack.models import (
    SynthesizerTrnMs256NSFsid,
    SynthesizerTrnMs256NSFsid_nono,
    SynthesizerTrnMs768NSFsid,
    SynthesizerTrnMs768NSFsid_nono,
)
from vc_infer_pipeline import VC
from config import Config

config = Config()
logging.getLogger("numba").setLevel(logging.WARNING)
spaces = os.getenv("SYSTEM") == "spaces"

force_support = None
if config.unsupported is False:
    if config.device == "mps" or config.device == "cpu":
        force_support = False
    else:
        force_support = True

audio_mode = []
f0method_mode = []
f0method_info = ""

if force_support is False or spaces is True:
    if spaces is True:
        audio_mode = ["Upload audio", "TTS Audio"]
    else:
        audio_mode = ["Input path", "Upload audio", "TTS Audio"]
    f0method_mode = ["pm", "harvest"]
    f0method_info = "PM is fast; Harvest is good but extremely slow; RMVPE is an alternative to Harvest (might be better). (Default: PM)"
else:
    audio_mode = ["Input path", "Upload audio", "Youtube", "TTS Audio"]
    f0method_mode = ["pm", "harvest", "crepe"]
    f0method_info = "PM is fast; Harvest is good but extremely slow; RMVPE is an alternative to Harvest (might be better); Crepe is good but requires a GPU. (Default: PM)"

if os.path.isfile("rmvpe.pt"):
    f0method_mode.insert(2, "rmvpe")


def create_vc_fn(model_name, tgt_sr, net_g, vc, if_f0, version, file_index):
    def vc_fn(
        vc_audio_mode,
        vc_input,
        vc_upload,
        tts_text,
        tts_voice,
        f0_up_key,
        f0_method,
        index_rate,
        filter_radius,
        resample_sr,
        rms_mix_rate,
        protect,
    ):
        try:
            logs = []
            print(f"Converting using {model_name}...")
            logs.append(f"Converting using {model_name}...")
            yield "\n".join(logs), None
            if vc_audio_mode in ("Input path", "Youtube") and vc_input != "":
                audio, sr = librosa.load(vc_input, sr=16000, mono=True)
            elif vc_audio_mode == "Upload audio":
                if vc_upload is None:
                    yield "You need to upload an audio", None
                    return
                sampling_rate, audio = vc_upload
                duration = audio.shape[0] / sampling_rate
                if duration > 20 and spaces:
                    yield (
                        "Please upload an audio file that is less than 20 seconds. "
                        "If you need to generate a longer audio file, please use Colab.",
                        None,
                    )
                    return
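                # The pipeline below consumes 16 kHz mono float32 audio (what the
                # HuBERT feature extractor expects), so the upload is normalized,
                # downmixed, and resampled before inference.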
                audio = (audio / np.iinfo(audio.dtype).max).astype(np.float32)
                if len(audio.shape) > 1:
                    audio = librosa.to_mono(audio.transpose(1, 0))
                if sampling_rate != 16000:
                    audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=16000)
            elif vc_audio_mode == "TTS Audio":
                if len(tts_text) > 100 and spaces:
                    yield "Text is too long", None
                    return
                if tts_text is None or tts_voice is None:
                    yield "You need to enter text and select a voice", None
                    return
                asyncio.run(edge_tts.Communicate(tts_text, "-".join(tts_voice.split('-')[:-1])).save("tts.mp3"))
                audio, sr = librosa.load("tts.mp3", sr=16000, mono=True)
                vc_input = "tts.mp3"
            times = [0, 0, 0]
            f0_up_key = int(f0_up_key)
            audio_opt = vc.pipeline(
                hubert_model,
                net_g,
                0,
                audio,
                vc_input,
                times,
                f0_up_key,
                f0_method,
                file_index,
                # file_big_npy,
                index_rate,
                if_f0,
                filter_radius,
                tgt_sr,
                resample_sr,
                rms_mix_rate,
                version,
                protect,
                f0_file=None,
            )
            info = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}]: npy: {times[0]}s, f0: {times[1]}s, infer: {times[2]}s"
            print(f"{model_name} | {info}")
            logs.append(f"Successfully converted using {model_name}\n{info}")
            yield "\n".join(logs), (tgt_sr, audio_opt)
        except Exception:
            info = traceback.format_exc()
            print(info)
            yield info, None
    return vc_fn


def load_model():
    categories = []
    if os.path.isfile("weights/folder_info.json"):
        for _, w_dirs, _ in os.walk("weights"):
            category_count_total = len(w_dirs)
            category_count = 1
            break  # only the top-level directory listing is needed for the count
        with open("weights/folder_info.json", "r", encoding="utf-8") as f:
            folder_info = json.load(f)
        for category_name, category_info in folder_info.items():
            if not category_info['enable']:
                continue
            category_title = category_info['title']
            category_folder = category_info['folder_path']
            description = category_info['description']
            print(f"Load {category_title} [{category_count}/{category_count_total}]")
            models = []
            for _, m_dirs, _ in os.walk(f"weights/{category_folder}"):
                model_count_total = len(m_dirs)
                model_count = 1
                break  # top-level model folders only
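            # Illustrative sketch of the metadata files consumed here. The field
            # names are taken from the lookups in this function; the keys and
            # values are placeholders, not real entries:
            #
            # weights/folder_info.json:
            #   {"category_a": {"enable": true, "title": "Category A",
            #                   "folder_path": "category_a", "description": "..."}}
            # weights/{folder_path}/model_info.json:
            #   {"character_a": {"enable": true, "title": "Character A",
            #                    "author": "...", "model_path": "character_a.pth",
            #                    "cover": "cover.png",
            #                    "feature_retrieval_library": "added.index"}}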
            with open(f"weights/{category_folder}/model_info.json", "r", encoding="utf-8") as f:
                models_info = json.load(f)
            for character_name, info in models_info.items():
                if not info['enable']:
                    continue
                model_title = info['title']
                model_name = info['model_path']
                model_author = info.get("author", None)
                model_cover = f"weights/{category_folder}/{character_name}/{info['cover']}"
                model_index = f"weights/{category_folder}/{character_name}/{info['feature_retrieval_library']}"
                cpt = torch.load(f"weights/{category_folder}/{character_name}/{model_name}", map_location="cpu")
                tgt_sr = cpt["config"][-1]
                cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]  # n_spk
                if_f0 = cpt.get("f0", 1)
                version = cpt.get("version", "v1")
                if version == "v1":
                    if if_f0 == 1:
                        net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half)
                    else:
                        net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
                    model_version = "V1"
                elif version == "v2":
                    if if_f0 == 1:
                        net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half)
                    else:
                        net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
                    model_version = "V2"
                del net_g.enc_q
                print(net_g.load_state_dict(cpt["weight"], strict=False))
                net_g.eval().to(config.device)
                if config.is_half:
                    net_g = net_g.half()
                else:
                    net_g = net_g.float()
                vc = VC(tgt_sr, config)
                print(f"Model loaded [{model_count}/{model_count_total}]: {character_name} / {info['feature_retrieval_library']} | ({model_version})")
                model_count += 1
                models.append((character_name, model_title, model_author, model_cover, model_version, create_vc_fn(model_name, tgt_sr, net_g, vc, if_f0, version, model_index)))
            category_count += 1
            categories.append([category_title, description, models])
    elif os.path.exists("weights"):
        models = []
        for w_root, w_dirs, _ in os.walk("weights"):
            model_count = 1
            for sub_dir in w_dirs:
                pth_files = glob.glob(f"weights/{sub_dir}/*.pth")
                index_files = glob.glob(f"weights/{sub_dir}/*.index")
                if pth_files == []:
                    print(f"Model [{model_count}/{len(w_dirs)}]: No model file detected, skipping...")
                    continue
                cpt = torch.load(pth_files[0], map_location="cpu")
                tgt_sr = cpt["config"][-1]
                cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]  # n_spk
                if_f0 = cpt.get("f0", 1)
                version = cpt.get("version", "v1")
                if version == "v1":
                    if if_f0 == 1:
                        net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half)
                    else:
                        net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
                    model_version = "V1"
                elif version == "v2":
                    if if_f0 == 1:
                        net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half)
                    else:
                        net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
                    model_version = "V2"
                del net_g.enc_q
                print(net_g.load_state_dict(cpt["weight"], strict=False))
                net_g.eval().to(config.device)
                if config.is_half:
                    net_g = net_g.half()
                else:
                    net_g = net_g.float()
                vc = VC(tgt_sr, config)
                if index_files == []:
                    print("Warning: No index file detected!")
                    index_info = "None"
                    model_index = ""
                else:
                    index_info = index_files[0]
                    model_index = index_files[0]
                # Name the model after its .pth file; index_files may be empty here.
                print(f"Model loaded [{model_count}/{len(w_dirs)}]: {pth_files[0]} / {index_info} | ({model_version})")
                model_count += 1
                models.append((pth_files[0][:-4], pth_files[0][:-4], "", "", model_version, create_vc_fn(pth_files[0], tgt_sr, net_g, vc, if_f0, version, model_index)))
            break  # only scan the immediate subfolders of weights/
        categories.append(["Models", "", models])
    else:
        categories = []
    return categories


def download_audio(url, audio_provider):
    logs = []
    if url == "":
        logs.append("URL required!")
        yield None, "\n".join(logs)
        return
    if not os.path.exists("dl_audio"):
        os.mkdir("dl_audio")
    if audio_provider == "Youtube":
        logs.append("Downloading the audio...")
        yield None, "\n".join(logs)
        ydl_opts = {
            'noplaylist': True,
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            "outtmpl": 'dl_audio/audio',
        }
        audio_path = "dl_audio/audio.wav"
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        logs.append("Download complete.")
        yield audio_path, "\n".join(logs)


def cut_vocal_and_inst(split_model):
    logs = []
    logs.append("Starting the audio splitting process...")
    yield "\n".join(logs), None, None, None
    command = f"demucs --two-stems=vocals -n {split_model} dl_audio/audio.wav -o output"
    result = subprocess.Popen(command.split(), stdout=subprocess.PIPE, text=True)
    for line in result.stdout:
        logs.append(line)
        yield "\n".join(logs), None, None, None
    result.wait()
    vocal = f"output/{split_model}/audio/vocals.wav"
    inst = f"output/{split_model}/audio/no_vocals.wav"
    logs.append("Audio splitting complete.")
    yield "\n".join(logs), vocal, inst, vocal


def combine_vocal_and_inst(audio_data, vocal_volume, inst_volume, split_model):
    if not os.path.exists("output/result"):
        os.mkdir("output/result")
    vocal_path = "output/result/output.wav"
    output_path = "output/result/combine.mp3"
    inst_path = f"output/{split_model}/audio/no_vocals.wav"
    with wave.open(vocal_path, "w") as wave_file:
        wave_file.setnchannels(1)
        wave_file.setsampwidth(2)
        wave_file.setframerate(audio_data[0])
        wave_file.writeframes(audio_data[1].tobytes())
    command = f'ffmpeg -y -i {inst_path} -i {vocal_path} -filter_complex [0:a]volume={inst_volume}[i];[1:a]volume={vocal_volume}[v];[i][v]amix=inputs=2:duration=longest[a] -map [a] -b:a 320k -c:a libmp3lame {output_path}'
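    # Filter graph: [0:a]/[1:a] scale the instrumental and vocal levels
    # independently, then amix merges them with duration=longest keeping the
    # longer stream's length. Splitting the command on whitespace is safe here
    # only because none of the generated paths or filter tokens contain spaces.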
    result = subprocess.run(command.split(), stdout=subprocess.PIPE)
    print(result.stdout.decode())
    return output_path


def load_hubert():
    global hubert_model
    models, _, _ = checkpoint_utils.load_model_ensemble_and_task(
        ["hubert_base.pt"],
        suffix="",
    )
    hubert_model = models[0]
    hubert_model = hubert_model.to(config.device)
    if config.is_half:
        hubert_model = hubert_model.half()
    else:
        hubert_model = hubert_model.float()
    hubert_model.eval()


def change_audio_mode(vc_audio_mode):
    # Each mode maps one-to-one onto a visibility flag; "Youtube" is the only
    # mode that also reveals the splitter controls.
    input_visible = vc_audio_mode == "Input path"
    upload_visible = vc_audio_mode == "Upload audio"
    youtube_visible = vc_audio_mode == "Youtube"
    tts_visible = vc_audio_mode == "TTS Audio"
    return (
        # Input & Upload
        gr.Textbox.update(visible=input_visible),
        gr.Checkbox.update(visible=upload_visible),
        gr.Audio.update(visible=upload_visible),
        # Youtube
        gr.Dropdown.update(visible=youtube_visible),
        gr.Textbox.update(visible=youtube_visible),
        gr.Textbox.update(visible=youtube_visible),
        gr.Button.update(visible=youtube_visible),
        # Splitter
        gr.Dropdown.update(visible=youtube_visible),
        gr.Textbox.update(visible=youtube_visible),
        gr.Button.update(visible=youtube_visible),
        gr.Audio.update(visible=youtube_visible),
        gr.Audio.update(visible=youtube_visible),
        gr.Audio.update(visible=youtube_visible),
        gr.Slider.update(visible=youtube_visible),
        gr.Slider.update(visible=youtube_visible),
        gr.Audio.update(visible=youtube_visible),
        gr.Button.update(visible=youtube_visible),
        # TTS
        gr.Textbox.update(visible=tts_visible),
        gr.Dropdown.update(visible=tts_visible),
    )


def use_microphone(microphone):
    if microphone:
        return gr.Audio.update(source="microphone")
    else:
        return gr.Audio.update(source="upload")
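# Note: the per-component update(...) calls and the Audio "source" parameter
# above are Gradio 3.x APIs; on Gradio 4+ these would need gr.update(...) and
# sources=[...] instead.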

if __name__ == '__main__':
    load_hubert()
    categories = load_model()
    tts_voice_list = asyncio.new_event_loop().run_until_complete(edge_tts.list_voices())
    voices = [f"{v['ShortName']}-{v['Gender']}" for v in tts_voice_list]
    with gr.Blocks() as app:
        gr.Markdown(
            "