import os
import gradio as gr
import zipfile
import tempfile

from zerorvc import prepare
from datasets import load_dataset, load_from_disk

from .constants import ROOT_EXP_DIR, BATCH_SIZE
from .zero import zero
from .model import accelerator


def extract_audio_files(zip_file: str, target_dir: str) -> list[str]:
    """Extract a zip archive and return the audio files found at its top level."""
    with zipfile.ZipFile(zip_file, "r") as zip_ref:
        zip_ref.extractall(target_dir)

    audio_files = [
        os.path.join(target_dir, f)
        for f in os.listdir(target_dir)
        if f.endswith((".wav", ".mp3", ".ogg"))
    ]
    if not audio_files:
        raise gr.Error("No audio files found at the top level of the zip file")

    return audio_files
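
# Minimal usage sketch (hypothetical paths, not executed by the app):
#
#   files = extract_audio_files("voice_samples.zip", "/tmp/exp0/raw_data")
#   # -> ["/tmp/exp0/raw_data/take_01.wav", "/tmp/exp0/raw_data/take_02.mp3"]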


def make_dataset_from_zip(exp_dir: str, zip_file: str):
    """Stage 1: extract the uploaded zip into the experiment dir and run the first prepare() pass."""
    if not exp_dir:
        exp_dir = tempfile.mkdtemp(dir=ROOT_EXP_DIR)
    print(f"Using exp dir: {exp_dir}")

    data_dir = os.path.join(exp_dir, "raw_data")
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)
    extract_audio_files(zip_file, data_dir)

    ds = prepare(
        data_dir,
        accelerator=accelerator,
        batch_size=BATCH_SIZE,
        stage=1,
    )
    return exp_dir, str(ds)


def make_dataset_from_zip_stage_2(exp_dir: str):
    """Stage 2 of prepare() on the previously extracted audio."""
    data_dir = os.path.join(exp_dir, "raw_data")
    ds = prepare(
        data_dir,
        accelerator=accelerator,
        batch_size=BATCH_SIZE,
        stage=2,
    )
    return exp_dir, str(ds)


def make_dataset_from_zip_stage_3(exp_dir: str):
    """Stage 3 of prepare(); the finished dataset is saved to <exp_dir>/dataset."""
    data_dir = os.path.join(exp_dir, "raw_data")
    ds = prepare(
        data_dir,
        accelerator=accelerator,
        batch_size=BATCH_SIZE,
        stage=3,
    )
    dataset = os.path.join(exp_dir, "dataset")
    ds.save_to_disk(dataset)
    return exp_dir, str(ds)


def make_dataset_from_repo(repo: str, hf_token: str):
    """Stage 1: load an 'audio' dataset from the Hub and run the first prepare() pass."""
    ds = load_dataset(repo, token=hf_token)
    ds = prepare(
        ds,
        accelerator=accelerator,
        batch_size=BATCH_SIZE,
        stage=1,
    )
    return str(ds)


def make_dataset_from_repo_stage_2(repo: str, hf_token: str):
    """Stage 2 of prepare() for a Hub-sourced dataset."""
    ds = load_dataset(repo, token=hf_token)
    ds = prepare(
        ds,
        accelerator=accelerator,
        batch_size=BATCH_SIZE,
        stage=2,
    )
    return str(ds)


def make_dataset_from_repo_stage_3(exp_dir: str, repo: str, hf_token: str):
    """Stage 3 of prepare(); the finished dataset is saved to <exp_dir>/dataset."""
    ds = load_dataset(repo, token=hf_token)
    ds = prepare(
        ds,
        accelerator=accelerator,
        batch_size=BATCH_SIZE,
        stage=3,
    )

    if not exp_dir:
        exp_dir = tempfile.mkdtemp(dir=ROOT_EXP_DIR)
    print(f"Using exp dir: {exp_dir}")

    dataset = os.path.join(exp_dir, "dataset")
    ds.save_to_disk(dataset)
    return exp_dir, str(ds)


def use_dataset(exp_dir: str, repo: str, hf_token: str):
    """Fetch an already-preprocessed dataset from the Hub and store it in the experiment dir."""
    gr.Info("Fetching dataset")
    ds = load_dataset(repo, token=hf_token)

    if not exp_dir:
        exp_dir = tempfile.mkdtemp(dir=ROOT_EXP_DIR)
    print(f"Using exp dir: {exp_dir}")

    dataset = os.path.join(exp_dir, "dataset")
    ds.save_to_disk(dataset)
    return exp_dir, str(ds)


def upload_dataset(exp_dir: str, repo: str, hf_token: str):
    """Push the preprocessed dataset in <exp_dir>/dataset to a private Hub repository."""
    dataset = os.path.join(exp_dir, "dataset")
    if not os.path.exists(dataset):
        raise gr.Error("Dataset not found")

    gr.Info("Uploading dataset")
    ds = load_from_disk(dataset)
    ds.push_to_hub(repo, token=hf_token, private=True)
    gr.Info("Dataset uploaded successfully")


class DatasetTab:
    def __init__(self):
        pass

    def ui(self):
        gr.Markdown("# Dataset")
        gr.Markdown("The suggested dataset size is > 5 minutes of audio.")

        gr.Markdown("## Create Dataset from ZIP")
        gr.Markdown(
            "Create a dataset by simply uploading a zip file containing audio files. "
            "The audio files should be at the top level of the zip file."
        )
        with gr.Row():
            self.zip_file = gr.File(
                label="Upload a zip file containing audio files",
                file_types=["zip"],
            )
            self.make_ds_from_dir = gr.Button(
                value="Create Dataset from ZIP", variant="primary"
            )

        gr.Markdown("## Create Dataset from Dataset Repository")
        gr.Markdown(
            "You can also create a dataset from any Hugging Face dataset repository that has an 'audio' column."
        )
        with gr.Row():
            self.repo = gr.Textbox(
                label="Hugging Face Dataset Repository",
                placeholder="username/dataset-name",
            )
            self.make_ds_from_repo = gr.Button(
                value="Create Dataset from Repo", variant="primary"
            )

        gr.Markdown("## Sync Preprocessed Dataset")
        gr.Markdown(
            "After you have preprocessed the dataset, you can upload it to Hugging Face and fetch it back directly later."
        )
        with gr.Row():
            self.preprocessed_repo = gr.Textbox(
                label="Hugging Face Dataset Repository",
                placeholder="username/dataset-name",
            )
            self.fetch_ds = gr.Button(value="Fetch Dataset", variant="primary")
            self.upload_ds = gr.Button(value="Upload Dataset", variant="primary")

        self.ds_state = gr.Textbox(label="Dataset Info", lines=5)

    def build(self, exp_dir: gr.Textbox, hf_token: gr.Textbox):
        # Chain the three preprocessing stages so each one only starts after
        # the previous stage has finished successfully.
        self.make_ds_from_dir.click(
            fn=make_dataset_from_zip,
            inputs=[exp_dir, self.zip_file],
            outputs=[exp_dir, self.ds_state],
        ).success(
            fn=make_dataset_from_zip_stage_2,
            inputs=[exp_dir],
            outputs=[exp_dir, self.ds_state],
        ).success(
            fn=make_dataset_from_zip_stage_3,
            inputs=[exp_dir],
            outputs=[exp_dir, self.ds_state],
        )

        self.make_ds_from_repo.click(
            fn=make_dataset_from_repo,
            inputs=[self.repo, hf_token],
            outputs=[self.ds_state],
        ).success(
            fn=make_dataset_from_repo_stage_2,
            inputs=[self.repo, hf_token],
            outputs=[self.ds_state],
        ).success(
            fn=make_dataset_from_repo_stage_3,
            inputs=[exp_dir, self.repo, hf_token],
            outputs=[exp_dir, self.ds_state],
        )

        self.fetch_ds.click(
            fn=use_dataset,
            inputs=[exp_dir, self.preprocessed_repo, hf_token],
            outputs=[exp_dir, self.ds_state],
        )

        self.upload_ds.click(
            fn=upload_dataset,
            inputs=[exp_dir, self.preprocessed_repo, hf_token],
            outputs=[],
        )
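

# Wiring sketch (assumed app layout; the surrounding gr.Blocks app is not part
# of this file): the tab is expected to be composed with shared exp_dir and
# hf_token components, roughly like:
#
#   with gr.Blocks() as app:
#       exp_dir = gr.Textbox(label="Experiment Directory")
#       hf_token = gr.Textbox(label="Hugging Face Token", type="password")
#       dataset_tab = DatasetTab()
#       with gr.Tab("Dataset"):
#           dataset_tab.ui()
#       dataset_tab.build(exp_dir, hf_token)
#   app.launch()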