import os
import traceback
import numpy as np
from sklearn.cluster import MiniBatchKMeans

# NOTE: this must run before the torch-dependent imports below so the setting
# is visible when torch is first imported. Presumably "0v" is the verbose form
# of disabling the TorchScript JIT -- confirm against the PyTorch version used.
os.environ["PYTORCH_JIT"] = "0v"

from random import shuffle
import gradio as gr
import zipfile
import tempfile
import shutil
import faiss
from glob import glob

from infer.modules.train.preprocess import PreProcess
from infer.modules.train.extract.extract_f0_rmvpe import FeatureInput
from infer.modules.train.extract_feature_print import HubertFeatureExtractor
from infer.modules.train.train import train
from infer.lib.train.process_ckpt import extract_small_model
from zero import zero

# Patch for jit script:
# if `def expand_2d_or_3d_tensor(x,` is found in fairseq's model_utils.py,
# replace that line with the fully annotated `def expand_2d_or_3d_tensor(x: Tensor,`.
FAIRSEQ_CODE = "/usr/local/lib/python3.10/site-packages/fairseq/models/model_utils.py"
_FAIRSEQ_UNTYPED_DEF = "def expand_2d_or_3d_tensor(x, trg_dim: int, padding_idx: int):"
_FAIRSEQ_TYPED_DEF = (
    "def expand_2d_or_3d_tensor(x: Tensor, trg_dim: int, padding_idx: int) -> Tensor:\n"
)

if os.path.exists(FAIRSEQ_CODE):
    with open(FAIRSEQ_CODE, "r") as f:
        lines = f.readlines()
    patched_lines = [
        _FAIRSEQ_TYPED_DEF if _FAIRSEQ_UNTYPED_DEF in line else line
        for line in lines
    ]
    # Only touch the installed package when something actually changed, so an
    # already-patched (or read-only) installation is left alone.
    if patched_lines != lines:
        with open(FAIRSEQ_CODE, "w") as f:
            f.writelines(patched_lines)


def extract_audio_files(zip_file: str, target_dir: str) -> list[str]:
    """Extract *zip_file* into *target_dir* and list the audio files found.

    Only files located directly in *target_dir* (the top level of the
    archive) are considered. The extension check is case-insensitive.

    Args:
        zip_file: Path to the zip archive to extract.
        target_dir: Directory that receives the archive contents.

    Returns:
        Paths (joined with *target_dir*) of every ``.wav``, ``.mp3`` or
        ``.ogg`` file at the top level.

    Raises:
        gr.Error: If no audio file is found at the top level.
    """
    audio_suffixes = (".wav", ".mp3", ".ogg")

    with zipfile.ZipFile(zip_file, "r") as zip_ref:
        zip_ref.extractall(target_dir)

    audio_files = [
        os.path.join(target_dir, f)
        for f in os.listdir(target_dir)
        if f.lower().endswith(audio_suffixes)
    ]
    if not audio_files:
        raise gr.Error("No audio files found at the top level of the zip file")

    return audio_files


def preprocess(zip_file: str) -> tuple[str, str]:
    """Create a new experiment directory and preprocess the uploaded audio.

    A fresh temporary directory becomes the experiment directory; the archive
    is extracted into its ``_data`` sub-directory and handed to ``PreProcess``.

    Args:
        zip_file: Path to the uploaded zip archive of audio files.

    Returns:
        A ``(experiment_dir, message)`` pair, where *message* reports the
        number of audio files found followed by the preprocessing log.

    Raises:
        gr.Error: If no archive was supplied or it holds no top-level audio.
    """
    if not zip_file:
        raise gr.Error("Please upload a zip file containing audio files first.")

    temp_dir = tempfile.mkdtemp()
    print(f"Using exp dir: {temp_dir}")

    data_dir = os.path.join(temp_dir, "_data")
    os.makedirs(data_dir)

    try:
        audio_files = extract_audio_files(zip_file, data_dir)
    except Exception:
        # Do not leave an orphaned experiment directory behind on failure.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    # NOTE(review): 40000 presumably is the target sample rate and 4 the
    # number of worker processes -- confirm against PreProcess.
    pp = PreProcess(40000, temp_dir, 3.0, False)
    pp.pipeline_mp_inp_dir(data_dir, 4)

    pp.logfile.seek(0)
    log = pp.logfile.read()

    return temp_dir, f"Preprocessed {len(audio_files)} audio files.\n{log}"
@zero(duration=300)
def extract_features(exp_dir: str) -> str:
    """Run both feature-extraction stages for *exp_dir* and return their logs.

    ``FeatureInput`` runs first; if it raises, its log (prefixed with the
    error) is returned immediately and ``HubertFeatureExtractor`` is skipped.
    Otherwise the second stage runs and its log is appended.

    Args:
        exp_dir: Experiment directory produced by preprocessing.

    Returns:
        The concatenated log text, prefixed with ``Error: ...`` when a stage
        failed.
    """
    err = None

    fi = FeatureInput(exp_dir)
    try:
        fi.run()
    except Exception as e:
        err = e

    fi.logfile.seek(0)
    log = fi.logfile.read()
    if err is not None:
        return f"Error: {err}\n{log}"

    hfe = HubertFeatureExtractor(exp_dir)
    try:
        hfe.run()
    except Exception as e:
        err = e

    hfe.logfile.seek(0)
    log += hfe.logfile.read()
    if err is not None:
        log = f"Error: {err}\n{log}"

    return log


def write_filelist(exp_dir: str) -> None:
    """Write ``filelist.txt`` in *exp_dir* listing every complete sample.

    A sample is included only when a file with the same stem exists in every
    required sub-directory. Two "mute" entries pointing under
    ``<cwd>/logs/mute`` are appended, the list is shuffled, and the result is
    written one entry per line.

    Args:
        exp_dir: Experiment directory holding the extracted data.
    """
    use_f0 = True
    speaker_id = 0
    fea_dim = 768
    sr_tag = "40k"
    now_dir = os.getcwd()

    gt_wavs_dir = "%s/0_gt_wavs" % (exp_dir)
    feature_dir = "%s/3_feature768" % (exp_dir)
    f0_dir = "%s/2a_f0" % (exp_dir)
    f0nsf_dir = "%s/2b-f0nsf" % (exp_dir)

    required_dirs = [gt_wavs_dir, feature_dir]
    if use_f0:
        required_dirs += [f0_dir, f0nsf_dir]

    # Keep only stems that are present in every required directory.
    stem_sets = [
        {name.split(".")[0] for name in os.listdir(directory)}
        for directory in required_dirs
    ]
    names = set.intersection(*stem_sets)

    def escaped(path: str) -> str:
        # Double any backslashes so Windows-style paths survive in the list.
        return path.replace("\\", "\\\\")

    opt = []
    for name in names:
        if use_f0:
            opt.append(
                "%s/%s.wav|%s/%s.npy|%s/%s.wav.npy|%s/%s.wav.npy|%s"
                % (
                    escaped(gt_wavs_dir),
                    name,
                    escaped(feature_dir),
                    name,
                    escaped(f0_dir),
                    name,
                    escaped(f0nsf_dir),
                    name,
                    speaker_id,
                )
            )
        else:
            opt.append(
                "%s/%s.wav|%s/%s.npy|%s"
                % (
                    escaped(gt_wavs_dir),
                    name,
                    escaped(feature_dir),
                    name,
                    speaker_id,
                )
            )

    if use_f0:
        mute_entry = (
            "%s/logs/mute/0_gt_wavs/mute%s.wav|%s/logs/mute/3_feature%s/mute.npy|%s/logs/mute/2a_f0/mute.wav.npy|%s/logs/mute/2b-f0nsf/mute.wav.npy|%s"
            % (now_dir, sr_tag, now_dir, fea_dim, now_dir, now_dir, speaker_id)
        )
    else:
        mute_entry = (
            "%s/logs/mute/0_gt_wavs/mute%s.wav|%s/logs/mute/3_feature%s/mute.npy|%s"
            % (now_dir, sr_tag, now_dir, fea_dim, speaker_id)
        )
    opt.extend([mute_entry] * 2)

    shuffle(opt)

    with open("%s/filelist.txt" % exp_dir, "w") as f:
        f.write("\n".join(opt))


def _latest_checkpoint(exp_dir: str, *, verbose: bool = False) -> str:
    """Return the most recently created ``G_*.pth`` file in *exp_dir*.

    Raises:
        gr.Error: If the directory contains no matching checkpoint.
    """
    models = glob(f"{exp_dir}/G_*.pth")
    if verbose:
        print(models)
    if not models:
        raise gr.Error("No model found")
    return max(models, key=os.path.getctime)


@zero(duration=300)
def train_model(exp_dir: str) -> str:
    """Train on *exp_dir* and return the path of the newest checkpoint.

    Copies ``config.json`` from the working directory into *exp_dir*, writes
    the file list, then calls ``train``.

    Raises:
        gr.Error: If no ``G_*.pth`` checkpoint exists after training.
    """
    shutil.copy("config.json", exp_dir)
    write_filelist(exp_dir)
    train(exp_dir)

    return _latest_checkpoint(exp_dir, verbose=True)


def download_weight(exp_dir: str) -> str:
    """Export the newest checkpoint of *exp_dir* as a small model.

    The model is named after the base name of *exp_dir*.

    Returns:
        ``assets/weights/<name>.pth`` -- presumably where
        ``extract_small_model`` writes its output; verify against that helper.

    Raises:
        gr.Error: If *exp_dir* contains no ``G_*.pth`` checkpoint.
    """
    latest_model = _latest_checkpoint(exp_dir)

    name = os.path.basename(exp_dir)
    extract_small_model(
        latest_model, name, "40k", True, "Model trained by ZeroGPU.", "v2"
    )

    return "assets/weights/%s.pth" % name


def train_index(exp_dir: str) -> str:
    """Build a faiss IVF-Flat index from the features stored in *exp_dir*.

    Every file in ``3_feature768`` is loaded, the rows are concatenated and
    shuffled, and -- above 200k rows -- reduced to 10k k-means centres. The
    resulting matrix is saved as ``total_fea.npy``; a trained-only index and
    the fully populated index are both written into *exp_dir*.

    Args:
        exp_dir: Experiment directory containing ``3_feature768``.

    Returns:
        Path of the populated ``added_IVF..._Flat_nprobe_....index`` file.

    Raises:
        gr.Error: If no features have been extracted yet, or k-means fails.
    """
    feature_dir = "%s/3_feature768" % (exp_dir)
    if not os.path.exists(feature_dir):
        raise gr.Error("Please extract features first.")

    feature_files = sorted(os.listdir(feature_dir))
    if not feature_files:
        raise gr.Error("Please extract features first.")

    big_npy = np.concatenate(
        [np.load("%s/%s" % (feature_dir, name)) for name in feature_files], 0
    )

    # Shuffle rows through an explicit index array.
    big_npy_idx = np.arange(big_npy.shape[0])
    np.random.shuffle(big_npy_idx)
    big_npy = big_npy[big_npy_idx]

    if big_npy.shape[0] > 2e5:
        print("Trying doing kmeans %s shape to 10k centers." % big_npy.shape[0])
        try:
            big_npy = (
                MiniBatchKMeans(
                    n_clusters=10000,
                    verbose=True,
                    batch_size=256 * 8,
                    compute_labels=False,
                    init="random",
                )
                .fit(big_npy)
                .cluster_centers_
            )
        except Exception as err:
            # Surface the full traceback in the UI instead of failing silently.
            info = traceback.format_exc()
            print(info)
            raise gr.Error(info) from err

    np.save("%s/total_fea.npy" % exp_dir, big_npy)

    # Clamp to at least one list: with fewer than 39 rows the integer division
    # yields 0, which would produce the factory string "IVF0,Flat".
    n_ivf = max(
        1, min(int(16 * np.sqrt(big_npy.shape[0])), big_npy.shape[0] // 39)
    )
    print("%s,%s" % (big_npy.shape, n_ivf))

    index = faiss.index_factory(768, "IVF%s,Flat" % n_ivf)
    print("training")
    index_ivf = faiss.extract_index_ivf(index)
    index.train(big_npy)
    faiss.write_index(
        index,
        "%s/trained_IVF%s_Flat_nprobe_%s.index" % (exp_dir, n_ivf, index_ivf.nprobe),
    )

    print("adding")
    batch_size_add = 8192
    for i in range(0, big_npy.shape[0], batch_size_add):
        index.add(big_npy[i : i + batch_size_add])

    added_index_path = "%s/added_IVF%s_Flat_nprobe_%s.index" % (
        exp_dir,
        n_ivf,
        index_ivf.nprobe,
    )
    faiss.write_index(index, added_index_path)
    print("built added_IVF%s_Flat_nprobe_%s.index" % (n_ivf, index_ivf.nprobe))

    return added_index_path


def download_expdir(exp_dir: str) -> str:
    """Zip the whole experiment directory and return the archive path.

    The archive is created next to the directory as ``<exp_dir>.zip``.
    """
    shutil.make_archive(exp_dir, "zip", exp_dir)
    return f"{exp_dir}.zip"


def restore_expdir(zip: str) -> str:
    """Unpack an experiment archive into a fresh temporary directory.

    Args:
        zip: Path to the uploaded archive. The name shadows the builtin but is
            kept so the function's signature stays unchanged.

    Returns:
        Path of the newly created experiment directory.
    """
    exp_dir = tempfile.mkdtemp()
    shutil.unpack_archive(zip, exp_dir)
    return exp_dir


with gr.Blocks() as app:
    # Allow the user to manually select the experiment directory.
    exp_dir = gr.Textbox(label="Experiment directory (don't touch it unless you know what you are doing)", visible=True, interactive=True)

    with gr.Tabs():
        with gr.Tab(label="New / Restore"):
            with gr.Row():
                with gr.Column():
                    zip_file = gr.File(
                        label="Upload a zip file containing audio files for training",
                        file_types=["zip"],
                    )
                    preprocess_output = gr.Textbox(
                        label="Preprocessing output", lines=5
                    )
                with gr.Column():
                    preprocess_btn = gr.Button(
                        value="Start New Experiment", variant="primary"
                    )

            with gr.Row():
                restore_zip_file = gr.File(
                    label="Upload the experiment directory zip file",
                    file_types=["zip"],
                )
                restore_btn = gr.Button(value="Restore Experiment", variant="primary")

        with gr.Tab(label="Extract features"):
            with gr.Row():
                extract_features_btn = gr.Button(
                    value="Extract features", variant="primary"
                )
            with gr.Row():
                extract_features_output = gr.Textbox(
                    label="Feature extraction output", lines=10
                )

        with gr.Tab(label="Train"):
            with gr.Row():
                train_btn = gr.Button(value="Train", variant="primary")
                latest_model = gr.File(label="Latest checkpoint")
            with gr.Row():
                train_index_btn = gr.Button(value="Train index", variant="primary")
                trained_index = gr.File(label="Trained index")

        with gr.Tab(label="Download"):
            with gr.Row():
                download_weight_btn = gr.Button(
                    value="Download latest model", variant="primary"
                )
                download_weight_output = gr.File(label="Download latest model")
            with gr.Row():
                download_expdir_btn = gr.Button(
                    value="Download experiment directory", variant="primary"
                )
                download_expdir_output = gr.File(label="Download experiment directory")

    # Every handler reads the experiment directory from the shared textbox;
    # the two handlers that create a directory write it back there.
    preprocess_btn.click(
        fn=preprocess,
        inputs=[zip_file],
        outputs=[exp_dir, preprocess_output],
    )

    extract_features_btn.click(
        fn=extract_features,
        inputs=[exp_dir],
        outputs=[extract_features_output],
    )

    train_btn.click(
        fn=train_model,
        inputs=[exp_dir],
        outputs=[latest_model],
    )

    train_index_btn.click(
        fn=train_index,
        inputs=[exp_dir],
        outputs=[trained_index],
    )

    download_weight_btn.click(
        fn=download_weight,
        inputs=[exp_dir],
        outputs=[download_weight_output],
    )

    download_expdir_btn.click(
        fn=download_expdir,
        inputs=[exp_dir],
        outputs=[download_expdir_output],
    )

    restore_btn.click(
        fn=restore_expdir,
        inputs=[restore_zip_file],
        outputs=[exp_dir],
    )

app.launch()