# ZeroRVC / app / dataset.py
# github-actions[bot]
# Sync to HuggingFace Spaces
# b4d60ce
import os
import gradio as gr
import zipfile
import tempfile
from zerorvc import prepare
from datasets import load_dataset, load_from_disk
from .constants import ROOT_EXP_DIR, BATCH_SIZE
from .zero import zero
from .model import accelerator
def extract_audio_files(zip_file: str, target_dir: str) -> list[str]:
    """Extract a zip archive and return the audio files at its top level.

    The whole archive is extracted into ``target_dir``. Afterwards every
    regular file directly inside ``target_dir`` whose extension is ``.wav``,
    ``.mp3`` or ``.ogg`` (compared case-insensitively) is returned. Files that
    were already present in ``target_dir`` before the extraction are listed
    as well.

    Args:
        zip_file: Path to the zip archive to extract.
        target_dir: Directory the archive is extracted into.

    Returns:
        Paths (``target_dir`` joined with the file name) of the audio files
        found, sorted by file name.

    Raises:
        gr.Error: If no audio file is found at the top level of ``target_dir``.
    """
    audio_extensions = (".wav", ".mp3", ".ogg")

    with zipfile.ZipFile(zip_file, "r") as zip_ref:
        zip_ref.extractall(target_dir)

    audio_files = []
    # Sorted so the result does not depend on the file system's listing order.
    for name in sorted(os.listdir(target_dir)):
        path = os.path.join(target_dir, name)
        # Lower-case the name so "CLIP.WAV" is accepted too, and skip
        # directories that merely happen to be named like an audio file.
        if name.lower().endswith(audio_extensions) and os.path.isfile(path):
            audio_files.append(path)

    if not audio_files:
        raise gr.Error("No audio files found at the top level of the zip file")
    return audio_files
def make_dataset_from_zip(exp_dir: str, zip_file: str):
    """Extract a zip of audio files into the experiment and run ``prepare`` stage 1.

    Args:
        exp_dir: Experiment directory. When empty, a new directory is created
            under ``ROOT_EXP_DIR``.
        zip_file: Path of the zip archive holding the audio files.

    Returns:
        Tuple of the experiment directory in use and ``str()`` of the value
        returned by ``prepare``.

    Raises:
        gr.Error: If no zip file was supplied, or if the archive has no audio
            files at its top level.
    """
    # Fail with a readable message, and before any directory is created, when
    # the handler is triggered without a file.
    if not zip_file:
        raise gr.Error("Please upload a zip file first")

    if not exp_dir:
        exp_dir = tempfile.mkdtemp(dir=ROOT_EXP_DIR)
    print(f"Using exp dir: {exp_dir}")

    data_dir = os.path.join(exp_dir, "raw_data")
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(data_dir, exist_ok=True)

    extract_audio_files(zip_file, data_dir)

    ds = prepare(
        data_dir,
        accelerator=accelerator,
        batch_size=BATCH_SIZE,
        stage=1,
    )
    return exp_dir, str(ds)
@zero(duration=120)
def make_dataset_from_zip_stage_2(exp_dir: str):
    """Run stage 2 of ``prepare`` on the experiment's ``raw_data`` directory.

    Args:
        exp_dir: Experiment directory that contains ``raw_data``.

    Returns:
        Tuple of the unchanged ``exp_dir`` and ``str()`` of the value
        returned by ``prepare``.
    """
    raw_dir = os.path.join(exp_dir, "raw_data")
    prepare_options = {
        "accelerator": accelerator,
        "batch_size": BATCH_SIZE,
        "stage": 2,
    }
    prepared = prepare(raw_dir, **prepare_options)
    return exp_dir, str(prepared)
def make_dataset_from_zip_stage_3(exp_dir: str):
    """Run stage 3 of ``prepare`` on ``raw_data`` and save the result to disk.

    The prepared dataset is written to the ``dataset`` directory inside
    ``exp_dir``.

    Args:
        exp_dir: Experiment directory that contains ``raw_data``.

    Returns:
        Tuple of the unchanged ``exp_dir`` and ``str()`` of the prepared
        dataset.
    """
    prepared = prepare(
        os.path.join(exp_dir, "raw_data"),
        accelerator=accelerator,
        batch_size=BATCH_SIZE,
        stage=3,
    )
    prepared.save_to_disk(os.path.join(exp_dir, "dataset"))
    return exp_dir, str(prepared)
def make_dataset_from_repo(repo: str, hf_token: str):
    """Load a dataset repository and run stage 1 of ``prepare`` on it.

    Args:
        repo: Dataset repository identifier passed to ``load_dataset``.
        hf_token: Token passed to ``load_dataset``.

    Returns:
        ``str()`` of the value returned by ``prepare``.
    """
    source = load_dataset(repo, token=hf_token)
    prepared = prepare(
        source, accelerator=accelerator, batch_size=BATCH_SIZE, stage=1
    )
    return str(prepared)
@zero(duration=120)
def make_dataset_from_repo_stage_2(repo: str, hf_token: str):
    """Load a dataset repository and run stage 2 of ``prepare`` on it.

    Args:
        repo: Dataset repository identifier passed to ``load_dataset``.
        hf_token: Token passed to ``load_dataset``.

    Returns:
        ``str()`` of the value returned by ``prepare``.
    """
    source = load_dataset(repo, token=hf_token)
    prepare_options = {
        "accelerator": accelerator,
        "batch_size": BATCH_SIZE,
        "stage": 2,
    }
    return str(prepare(source, **prepare_options))
def make_dataset_from_repo_stage_3(exp_dir: str, repo: str, hf_token: str):
    """Run stage 3 of ``prepare`` on a dataset repository and save the result.

    The prepared dataset is written to the ``dataset`` directory inside the
    experiment directory.

    Args:
        exp_dir: Experiment directory. When empty, a new directory is created
            under ``ROOT_EXP_DIR`` after ``prepare`` has finished.
        repo: Dataset repository identifier passed to ``load_dataset``.
        hf_token: Token passed to ``load_dataset``.

    Returns:
        Tuple of the experiment directory in use and ``str()`` of the
        prepared dataset.
    """
    source = load_dataset(repo, token=hf_token)
    prepared = prepare(
        source, accelerator=accelerator, batch_size=BATCH_SIZE, stage=3
    )

    # Fall back to a fresh directory only when the caller supplied none.
    target_dir = exp_dir if exp_dir else tempfile.mkdtemp(dir=ROOT_EXP_DIR)
    print(f"Using exp dir: {target_dir}")

    prepared.save_to_disk(os.path.join(target_dir, "dataset"))
    return target_dir, str(prepared)
def use_dataset(exp_dir: str, repo: str, hf_token: str):
    """Load a dataset repository and save it into the experiment directory.

    The loaded dataset is written, without going through ``prepare``, to the
    ``dataset`` directory inside the experiment directory.

    Args:
        exp_dir: Experiment directory. When empty, a new directory is created
            under ``ROOT_EXP_DIR``.
        repo: Dataset repository identifier passed to ``load_dataset``.
        hf_token: Token passed to ``load_dataset``.

    Returns:
        Tuple of the experiment directory in use and ``str()`` of the loaded
        dataset.
    """
    gr.Info("Fetching dataset")
    fetched = load_dataset(repo, token=hf_token)

    target_dir = exp_dir or tempfile.mkdtemp(dir=ROOT_EXP_DIR)
    print(f"Using exp dir: {target_dir}")

    save_path = os.path.join(target_dir, "dataset")
    fetched.save_to_disk(save_path)
    return target_dir, str(fetched)
def upload_dataset(exp_dir: str, repo: str, hf_token: str):
    """Push the experiment's saved dataset to a private Hub repository.

    Args:
        exp_dir: Experiment directory expected to contain ``dataset``.
        repo: Repository identifier passed to ``push_to_hub``.
        hf_token: Token passed to ``push_to_hub``.

    Raises:
        gr.Error: If ``exp_dir`` has no ``dataset`` entry.
    """
    dataset_path = os.path.join(exp_dir, "dataset")
    found = os.path.exists(dataset_path)
    if not found:
        raise gr.Error("Dataset not found")

    gr.Info("Uploading dataset")
    load_from_disk(dataset_path).push_to_hub(repo, token=hf_token, private=True)
    gr.Info("Dataset uploaded successfully")
class DatasetTab:
    """Gradio tab for creating, fetching and uploading datasets.

    ``ui`` creates the components and stores them on the instance; ``build``
    attaches the click handlers to those components, so ``ui`` has to run
    before ``build``.
    """

    def __init__(self):
        """Create the tab object; components are added later by ``ui``."""

    def ui(self):
        """Create the tab's Markdown, input and button components."""
        gr.Markdown("# Dataset")
        gr.Markdown("The suggested dataset size is > 5 minutes of audio.")

        # --- Section: dataset from an uploaded zip archive ---
        gr.Markdown("## Create Dataset from ZIP")
        gr.Markdown(
            "Create a dataset by simply upload a zip file containing audio files. The audio files should be at the top level of the zip file."
        )
        with gr.Row():
            self.zip_file = gr.File(
                label="Upload a zip file containing audio files", file_types=["zip"]
            )
            self.make_ds_from_dir = gr.Button(
                value="Create Dataset from ZIP",
                variant="primary",
            )

        # --- Section: dataset from a Hugging Face dataset repository ---
        gr.Markdown("## Create Dataset from Dataset Repository")
        gr.Markdown(
            "You can also create a dataset from any Hugging Face dataset repository that has 'audio' column."
        )
        with gr.Row():
            self.repo = gr.Textbox(
                label="Hugging Face Dataset Repository", placeholder="username/dataset-name"
            )
            self.make_ds_from_repo = gr.Button(
                value="Create Dataset from Repo",
                variant="primary",
            )

        # --- Section: upload / fetch an already preprocessed dataset ---
        gr.Markdown("## Sync Preprocessed Dataset")
        gr.Markdown(
            "After you have preprocessed the dataset, you can upload the dataset to Hugging Face. And fetch it back later directly."
        )
        with gr.Row():
            self.preprocessed_repo = gr.Textbox(
                label="Hugging Face Dataset Repository", placeholder="username/dataset-name"
            )
            self.fetch_ds = gr.Button(value="Fetch Dataset", variant="primary")
            self.upload_ds = gr.Button(value="Upload Dataset", variant="primary")

        self.ds_state = gr.Textbox(label="Dataset Info", lines=5)

    def build(self, exp_dir: gr.Textbox, hf_token: gr.Textbox):
        """Attach the click handlers to the components created by ``ui``.

        Args:
            exp_dir: Textbox holding the experiment directory; handlers read
                it as an input and most of them write it back as an output.
            hf_token: Textbox whose value is passed to the handlers as the
                Hugging Face token.
        """
        # ZIP flow: three stages chained with ``.success``.
        zip_stage_1 = self.make_ds_from_dir.click(
            fn=make_dataset_from_zip,
            inputs=[exp_dir, self.zip_file],
            outputs=[exp_dir, self.ds_state],
        )
        zip_stage_2 = zip_stage_1.success(
            fn=make_dataset_from_zip_stage_2,
            inputs=[exp_dir],
            outputs=[exp_dir, self.ds_state],
        )
        zip_stage_2.success(
            fn=make_dataset_from_zip_stage_3,
            inputs=[exp_dir],
            outputs=[exp_dir, self.ds_state],
        )

        # Repository flow: same three-stage chain; only the last stage
        # receives and updates ``exp_dir``.
        repo_stage_1 = self.make_ds_from_repo.click(
            fn=make_dataset_from_repo,
            inputs=[self.repo, hf_token],
            outputs=[self.ds_state],
        )
        repo_stage_2 = repo_stage_1.success(
            fn=make_dataset_from_repo_stage_2,
            inputs=[self.repo, hf_token],
            outputs=[self.ds_state],
        )
        repo_stage_2.success(
            fn=make_dataset_from_repo_stage_3,
            inputs=[exp_dir, self.repo, hf_token],
            outputs=[exp_dir, self.ds_state],
        )

        # Sync buttons for a preprocessed dataset.
        self.fetch_ds.click(
            fn=use_dataset,
            inputs=[exp_dir, self.preprocessed_repo, hf_token],
            outputs=[exp_dir, self.ds_state],
        )
        self.upload_ds.click(
            fn=upload_dataset,
            inputs=[exp_dir, self.preprocessed_repo, hf_token],
            outputs=[],
        )