Spaces:
Paused
Paused
import os | |
os.environ["PYTORCH_JIT"] = "0v" | |
from random import shuffle | |
import gradio as gr | |
import zipfile | |
import tempfile | |
import shutil | |
from glob import glob | |
from infer.modules.train.preprocess import PreProcess | |
from infer.modules.train.extract.extract_f0_rmvpe import FeatureInput | |
from infer.modules.train.extract_feature_print import HubertFeatureExtractor | |
from infer.modules.train.train import train | |
from infer.lib.train.process_ckpt import extract_small_model | |
from zero import zero | |
# Patch for jit script:
# if fairseq's model_utils.py still declares `def expand_2d_or_3d_tensor(x, ...)`
# without a type on `x`, rewrite that line as
# `def expand_2d_or_3d_tensor(x: Tensor, ...) -> Tensor:`.
FAIRSEQ_CODE = "/usr/local/lib/python3.10/site-packages/fairseq/models/model_utils.py"


def patch_expand_tensor_signature(path: str) -> bool:
    """Add the ``Tensor`` annotations to ``expand_2d_or_3d_tensor`` in *path*.

    Every line containing the un-annotated signature is replaced by the
    annotated one. The file is only rewritten when at least one line actually
    changes, so an already-patched (or unrelated) file is never touched.

    Args:
        path: Location of the fairseq ``model_utils.py`` source file.

    Returns:
        True if the file was modified, False if it does not exist or needed
        no change.
    """
    old_signature = "def expand_2d_or_3d_tensor(x, trg_dim: int, padding_idx: int):"
    new_line = (
        "def expand_2d_or_3d_tensor(x: Tensor, trg_dim: int, padding_idx: int) -> Tensor:\n"
    )
    if not os.path.exists(path):
        return False
    with open(path, "r") as f:
        lines = f.readlines()
    patched = [new_line if old_signature in line else line for line in lines]
    if patched == lines:
        # Nothing to do: avoid truncating and rewriting an installed package file.
        return False
    with open(path, "w") as f:
        f.writelines(patched)
    return True


patch_expand_tensor_signature(FAIRSEQ_CODE)
def extract_audio_files(zip_file: str, target_dir: str) -> list[str]:
    """Unpack *zip_file* into *target_dir* and list the audio files found there.

    Only entries directly inside *target_dir* are considered (sub-folders of
    the archive are extracted but not searched). Extensions are matched
    case-insensitively so that e.g. ``TAKE1.WAV`` is recognised.

    Args:
        zip_file: Path of the zip archive to extract.
        target_dir: Directory that receives the extracted content.

    Returns:
        Sorted list of paths (``target_dir`` joined with the file name) whose
        extension is ``.wav``, ``.mp3`` or ``.ogg``.

    Raises:
        gr.Error: If no matching file exists at the top level of the archive.
    """
    audio_extensions = (".wav", ".mp3", ".ogg")
    with zipfile.ZipFile(zip_file, "r") as zip_ref:
        zip_ref.extractall(target_dir)
    # os.listdir order is arbitrary; sort so the result is deterministic.
    audio_files = sorted(
        os.path.join(target_dir, entry)
        for entry in os.listdir(target_dir)
        if entry.lower().endswith(audio_extensions)
    )
    if not audio_files:
        raise gr.Error("No audio files found at the top level of the zip file")
    return audio_files
def train_rvc_model(audio_files: list[str]) -> str:
    """Placeholder: ignores *audio_files* and returns the literal ``"model_path"``."""
    placeholder_path = "model_path"
    return placeholder_path
def preprocess(zip_file: str) -> tuple[str, str]:
    """Create a fresh experiment directory and preprocess the uploaded audio.

    The archive is extracted into ``<exp_dir>/_data`` and handed to
    ``PreProcess``; the preprocessing log is then read back for display.

    Args:
        zip_file: Path of the uploaded zip archive containing audio files.

    Returns:
        A ``(exp_dir, message)`` tuple: the new experiment directory and a
        summary line followed by the content of the preprocessing log.

    Raises:
        gr.Error: If nothing was uploaded, or the archive holds no audio files.
    """
    if not zip_file:
        # Without this guard the failure surfaces as an opaque zipfile error.
        raise gr.Error("Please upload a zip file containing audio files first")

    temp_dir = tempfile.mkdtemp()
    print(f"Using exp dir: {temp_dir}")
    try:
        data_dir = os.path.join(temp_dir, "_data")
        os.makedirs(data_dir)
        audio_files = extract_audio_files(zip_file, data_dir)
        # NOTE(review): positional args are presumably (sample rate, exp dir,
        # slice length in seconds, no-parallel flag) and 4 the worker count --
        # confirm against PreProcess. Other steps in this file use the "40k"
        # tag while 48000 is passed here; verify this is intended.
        pp = PreProcess(48000, temp_dir, 3.0, False)
        pp.pipeline_mp_inp_dir(data_dir, 4)
    except Exception:
        # The directory is never reported to the UI on failure, so remove it
        # instead of leaking a half-populated temp dir.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    pp.logfile.seek(0)
    log = pp.logfile.read()
    return temp_dir, f"Preprocessed {len(audio_files)} audio files.\n{log}"
def extract_features(exp_dir: str) -> str:
    """Run both feature-extraction stages on *exp_dir* and return their logs.

    ``FeatureInput`` runs first, then ``HubertFeatureExtractor``. Each stage's
    log file is read back after it ran. If a stage raises, the text returned
    is ``"Error: <exception>"`` followed by the log collected so far; a failure
    in the first stage skips the second one entirely.

    Args:
        exp_dir: Experiment directory produced by the preprocessing step.

    Returns:
        The concatenated log text, prefixed with an error line on failure.
    """

    def run_stage(stage):
        # Run one extractor, remembering (not propagating) any exception, and
        # read its whole log file from the beginning.
        failure = None
        try:
            stage.run()
        except Exception as exc:
            failure = exc
        stage.logfile.seek(0)
        return failure, stage.logfile.read()

    failure, log = run_stage(FeatureInput(exp_dir))
    if failure:
        return f"Error: {failure}\n{log}"

    failure, hubert_log = run_stage(HubertFeatureExtractor(exp_dir))
    log += hubert_log
    if failure:
        log = f"Error: {failure}\n{log}"
    return log
def write_filelist(exp_dir: str) -> None:
    """Write ``<exp_dir>/filelist.txt`` listing the training samples.

    A sample is listed when the same base name (text before the first ``.``)
    exists in ``0_gt_wavs``, ``3_feature768``, ``2a_f0`` and ``2b-f0nsf``
    under *exp_dir*. Each line joins the per-directory paths and the speaker
    id with ``|``. Two identical "mute" entries pointing below
    ``<cwd>/logs/mute`` are appended, the lines are shuffled, and the result
    is written newline-separated without a trailing newline.

    Args:
        exp_dir: Experiment directory holding the extracted features.
    """
    use_f0 = True  # fixed: the non-f0 layout below is kept but never selected
    speaker_id = 0
    feature_dim = 768
    sample_rate_tag = "40k"

    def stems(directory):
        # Base names (text before the first dot) of everything in `directory`.
        return {entry.split(".")[0] for entry in os.listdir(directory)}

    def escaped(path):
        # Double any backslash so Windows-style paths survive in the list file.
        return path.replace("\\", "\\\\")

    gt_wavs_dir = "%s/0_gt_wavs" % exp_dir
    feature_dir = "%s/3_feature768" % exp_dir
    common = stems(gt_wavs_dir) & stems(feature_dir)

    cwd_independent_entries = []
    if use_f0:
        f0_dir = "%s/2a_f0" % exp_dir
        f0nsf_dir = "%s/2b-f0nsf" % exp_dir
        common = common & stems(f0_dir) & stems(f0nsf_dir)
        gt, feat, f0, f0nsf = (
            escaped(gt_wavs_dir),
            escaped(feature_dir),
            escaped(f0_dir),
            escaped(f0nsf_dir),
        )
        cwd_independent_entries = [
            "%s/%s.wav|%s/%s.npy|%s/%s.wav.npy|%s/%s.wav.npy|%s"
            % (gt, stem, feat, stem, f0, stem, f0nsf, stem, speaker_id)
            for stem in common
        ]
    else:
        gt, feat = escaped(gt_wavs_dir), escaped(feature_dir)
        cwd_independent_entries = [
            "%s/%s.wav|%s/%s.npy|%s" % (gt, stem, feat, stem, speaker_id)
            for stem in common
        ]

    cwd = os.getcwd()
    if use_f0:
        mute_entry = (
            "%s/logs/mute/0_gt_wavs/mute%s.wav|%s/logs/mute/3_feature%s/mute.npy|%s/logs/mute/2a_f0/mute.wav.npy|%s/logs/mute/2b-f0nsf/mute.wav.npy|%s"
            % (cwd, sample_rate_tag, cwd, feature_dim, cwd, cwd, speaker_id)
        )
    else:
        mute_entry = (
            "%s/logs/mute/0_gt_wavs/mute%s.wav|%s/logs/mute/3_feature%s/mute.npy|%s"
            % (cwd, sample_rate_tag, cwd, feature_dim, speaker_id)
        )

    entries = cwd_independent_entries
    entries.extend([mute_entry, mute_entry])
    shuffle(entries)

    with open("%s/filelist.txt" % exp_dir, "w") as out:
        out.write("\n".join(entries))
def train_model(exp_dir: str) -> str:
    """Train on *exp_dir* and return the most recently created checkpoint.

    Copies ``config.json`` from the working directory into *exp_dir*, writes
    the file list, runs ``train`` and then looks for ``G_*.pth`` files in
    *exp_dir*.

    Args:
        exp_dir: Experiment directory holding the extracted features.

    Returns:
        Path of the ``G_*.pth`` file with the greatest ``os.path.getctime``.

    Raises:
        gr.Error: If training left no ``G_*.pth`` file in *exp_dir*.
    """
    shutil.copy("config.json", exp_dir)
    write_filelist(exp_dir)
    train(exp_dir)

    checkpoints = glob(f"{exp_dir}/G_*.pth")
    print(checkpoints)
    if checkpoints:
        return max(checkpoints, key=os.path.getctime)
    raise gr.Error("No model found")
def download_weight(exp_dir: str) -> str:
    """Export the newest checkpoint of *exp_dir* and return the weight path.

    The ``G_*.pth`` file with the greatest ``os.path.getctime`` is passed to
    ``extract_small_model`` under the name of the experiment directory.

    Args:
        exp_dir: Experiment directory containing ``G_*.pth`` checkpoints.

    Returns:
        ``assets/weights/<name>.pth`` where ``<name>`` is the base name of
        *exp_dir* (presumably where ``extract_small_model`` writes its output
        -- confirm against that helper).

    Raises:
        gr.Error: If *exp_dir* contains no ``G_*.pth`` file.
    """
    checkpoints = glob(f"{exp_dir}/G_*.pth")
    if len(checkpoints) == 0:
        raise gr.Error("No model found")

    newest_checkpoint = max(checkpoints, key=os.path.getctime)
    model_name = os.path.basename(exp_dir)
    extract_small_model(
        newest_checkpoint,
        model_name,
        "40k",
        True,
        "Model trained by ZeroGPU.",
        "v2",
    )
    return "assets/weights/%s.pth" % model_name
def download_expdir(exp_dir: str) -> str:
    """Zip the content of *exp_dir* and return the archive path.

    Args:
        exp_dir: Experiment directory to archive.

    Returns:
        ``<exp_dir>.zip``, the archive created next to the directory.
    """
    archive_path = f"{exp_dir}.zip"
    shutil.make_archive(base_name=exp_dir, format="zip", root_dir=exp_dir)
    return archive_path
def restore_expdir(zip: str) -> str:
    """Unpack a previously downloaded experiment archive into a new temp dir.

    Args:
        zip: Path of the archive to unpack.

    Returns:
        The freshly created directory that now holds the archive content.
    """
    restored_dir = tempfile.mkdtemp()
    shutil.unpack_archive(filename=zip, extract_dir=restored_dir)
    return restored_dir
# Gradio UI: one row per pipeline step (preprocess, restore, feature
# extraction, training, weight download, experiment download). The
# "Experiment directory" textbox carries the working directory between steps.
with gr.Blocks() as app:
    with gr.Row():
        with gr.Column():
            zip_file = gr.File(
                label="Upload a zip file containing audio files for training",
                # Extensions need a leading dot; bare words are file categories.
                file_types=[".zip"],
            )
            exp_dir = gr.Textbox(label="Experiment directory", visible=True)
            preprocess_btn = gr.Button(value="Preprocess", variant="primary")
        with gr.Column():
            preprocess_output = gr.Textbox(label="Preprocessing output", lines=5)

    with gr.Row():
        restore_zip_file = gr.File(
            label="Upload the experiment directory zip file",
            file_types=[".zip"],
        )
        # This button restores an experiment directory; it was mislabelled
        # "Preprocess", duplicating the label of the button above.
        restore_btn = gr.Button(value="Restore")

    with gr.Row():
        with gr.Column():
            extract_features_btn = gr.Button(
                value="Extract features", variant="primary"
            )
        with gr.Column():
            extract_features_output = gr.Textbox(
                label="Feature extraction output", lines=5
            )

    with gr.Row():
        with gr.Column():
            train_btn = gr.Button(value="Train", variant="primary")
        with gr.Column():
            latest_model = gr.File(label="Latest checkpoint")

    with gr.Row():
        with gr.Column():
            download_weight_btn = gr.Button(
                value="Download latest model", variant="primary"
            )
        with gr.Column():
            download_weight_output = gr.File(label="Download latest model")

    with gr.Row():
        with gr.Column():
            download_expdir_btn = gr.Button(
                value="Download experiment directory", variant="primary"
            )
        with gr.Column():
            download_expdir_output = gr.File(label="Download experiment directory")

    # Event wiring: every step reads the experiment directory textbox, and the
    # preprocess / restore steps are the ones that fill it in.
    preprocess_btn.click(
        fn=preprocess,
        inputs=[zip_file],
        outputs=[exp_dir, preprocess_output],
    )
    extract_features_btn.click(
        fn=extract_features,
        inputs=[exp_dir],
        outputs=[extract_features_output],
    )
    train_btn.click(
        fn=train_model,
        inputs=[exp_dir],
        outputs=[latest_model],
    )
    download_weight_btn.click(
        fn=download_weight,
        inputs=[exp_dir],
        outputs=[download_weight_output],
    )
    download_expdir_btn.click(
        fn=download_expdir,
        inputs=[exp_dir],
        outputs=[download_expdir_output],
    )
    restore_btn.click(
        fn=restore_expdir,
        inputs=[restore_zip_file],
        outputs=[exp_dir],
    )

app.launch()