Spaces:
Build error
Build error
import numpy as np | |
import pandas as pd | |
from datasets import load_dataset, DatasetDict | |
from multiprocess import set_start_method | |
import argparse | |
from pathlib import Path | |
import os | |
import matplotlib.pyplot as plt | |
import json | |
# Text labels attached to each numeric bin. Every list has seven entries and is
# indexed by bin position, so entry 0 labels the bin holding the smallest values.
SPEAKER_RATE_BINS = [
    "very slowly",
    "quite slowly",
    "slightly slowly",
    "moderate speed",
    "slightly fast",
    "quite fast",
    "very fast",
]
SNR_BINS = [
    "very noisy",
    "quite noisy",
    "slightly noisy",
    "moderate ambient sound",
    "slightly clear",
    "quite clear",
    "very clear",
]
REVERBERATION_BINS = [
    "very roomy sounding",
    "quite roomy sounding",
    "slightly roomy sounding",
    "moderate reverberation",
    "slightly confined sounding",
    "quite confined sounding",
    "very confined sounding",
]
UTTERANCE_LEVEL_STD = [
    "very monotone",
    "quite monotone",
    "slightly monotone",
    "moderate intonation",
    "slightly expressive",
    "quite expressive",
    "very expressive",
]
# Meant to be applied to the speaker-level mean pitch, relative to the speaker's gender.
SPEAKER_LEVEL_PITCH_BINS = [
    "very low pitch",
    "quite low pitch",
    "slightly low pitch",
    "moderate pitch",
    "slightly high pitch",
    "quite high pitch",
    "very high pitch",
]
def visualize_bins_to_text(values_1, values_2, name_1, name_2, text_bins, save_dir, output_column_name, default_bins=100, lower_range=None):
    '''
    Draw two stacked, log-scaled histograms sharing one x axis and save them as a single PNG.

    On each panel, dashed red vertical lines mark the edges of the `len(text_bins)` equal-width
    bins that `np.histogram` computes for that panel's values.

    Args:
        values_1: numeric values plotted on the top panel (blue).
        values_2: numeric values plotted on the bottom panel (green).
        name_1: title of the top panel.
        name_2: title of the bottom panel.
        text_bins: sequence of text labels; only its length is used (number of bins to outline).
        save_dir: existing directory in which `<output_column_name>.png` is written.
        output_column_name: used as the x-axis label and as the file stem.
        default_bins: number of bars of the displayed histograms (both panels).
        lower_range: if not `None`, lower bound of the range used to compute the bin edges;
            the upper bound is the maximum of the panel's values.
    '''
    fig, axs = plt.subplots(2, figsize=(8, 6), sharex=True)
    try:
        panels = (
            (axs[0], values_1, name_1, "blue"),
            (axs[1], values_2, name_2, "green"),
        )
        for ax, values, title, color in panels:
            values = np.asarray(values)
            ax.hist(values, bins=default_bins, color=color, alpha=0.7)

            # `is not None` so that an explicit lower bound of 0 is honoured.
            hist_range = (lower_range, values.max()) if lower_range is not None else None
            _, bin_edges = np.histogram(values, bins=len(text_bins), range=hist_range)
            for edge in bin_edges:
                ax.axvline(x=edge, color='red', linestyle='--', linewidth=1)

            ax.set_title(title)
            ax.set_yscale('log')
            ax.set_ylabel('Frequency')

        axs[1].set_xlabel(f'{output_column_name}')

        fig.tight_layout()
        filename = f"{output_column_name}.png"
        filepath = os.path.join(save_dir, filename)
        fig.savefig(filepath)
    finally:
        # Release the figure: this function is called several times per run and
        # pyplot keeps every unclosed figure alive.
        plt.close(fig)
    print(f"Plots saved at '{filepath}'!")
def bins_to_text(dataset, text_bins, column_name, output_column_name, leading_split_for_bins="train", batch_size = 4, num_workers = 1, std_tolerance=5, save_dir=None, only_save_plot=False, lower_range=None, bin_edges=None):
    '''
    Compute bins of `column_name` from the splits `leading_split_for_bins` and apply text bins to every split.

    Args:
        dataset: list of split-name -> split mappings; each element must also expose `.map`
            (only called when labels are actually written).
        text_bins: text labels, one per bin, ordered from the lowest bin to the highest.
        column_name: numeric column the bins are computed from.
        output_column_name: column receiving the text label.
        leading_split_for_bins: a string or a list of strings. A split contributes to the bin
            computation if its name contains (one of) the string(s). `None` uses every split.
        batch_size: batch size forwarded to `.map`.
        num_workers: forwarded to `.map` as `num_proc`.
        std_tolerance: values further than `std_tolerance` standard deviations from the mean
            are ignored when computing bins. `None` disables the filtering.
        save_dir: if not `None`, directory where the distribution plots are saved.
        only_save_plot: if `True` (and bins are computed here), return right after computing
            bins/plots, leaving `dataset` untouched.
        lower_range: if not `None`, lower bound of the histogram range (upper bound is the
            maximum of the retained values).
        bin_edges: already computed bin edges. If given, nothing is recomputed or plotted.

    Returns:
        A `(dataset, bin_edges)` tuple.

    Raises:
        ValueError: if bins must be computed but no split matched `leading_split_for_bins`
            (or the matching splits are empty).
    '''
    if bin_edges is None:
        # Normalise the split selector so that both a string and a list are supported.
        if leading_split_for_bins is None:
            split_markers = None
        elif isinstance(leading_split_for_bins, str):
            split_markers = [leading_split_for_bins]
        else:
            split_markers = list(leading_split_for_bins)

        values = []
        for df in dataset:
            for split in df:
                if split_markers is None or any(marker in split for marker in split_markers):
                    values.extend(df[split][column_name])
        values = np.array(values)

        if values.size == 0:
            raise ValueError(
                f"No value found in column `{column_name}` to compute bins for `{output_column_name}`: "
                f"no split matches `leading_split_for_bins={leading_split_for_bins}` or the matching splits are empty."
            )

        # filter out outliers
        if std_tolerance is not None:
            filtered_values = values[np.abs(values - np.mean(values)) < std_tolerance * np.std(values)]
        else:
            filtered_values = values

        if save_dir is not None:
            # `lower_range` must go by keyword: the next positional slot of the helper is `default_bins`.
            visualize_bins_to_text(values, filtered_values, "Before filtering", "After filtering", text_bins, save_dir, output_column_name, lower_range=lower_range)
            # speaking_rate can easily have outliers
            if output_column_name == "speaking_rate":
                visualize_bins_to_text(filtered_values, filtered_values, "After filtering", "After filtering", text_bins, save_dir, f"{output_column_name}_after_filtering", lower_range=lower_range)

        values = filtered_values
        # `is not None` so that a lower bound of 0 is honoured.
        hist_range = (lower_range, values.max()) if lower_range is not None else None
        _, bin_edges = np.histogram(values, bins=len(text_bins), range=hist_range)

        if only_save_plot:
            return dataset, bin_edges
    else:
        print(f"Already computed bin edges have been passed for {output_column_name}. Will use: {bin_edges}.")

    last_bin = len(text_bins) - 1

    def batch_association(batch):
        index_bins = np.searchsorted(bin_edges, batch, side="left")
        # do min(max(...)) when values are outside of the main bins
        # it happens when value = min or max or have been filtered out from bins computation
        batch_bins = [text_bins[min(max(i - 1, 0), last_bin)] for i in index_bins]
        return {
            output_column_name: batch_bins
        }

    dataset = [df.map(batch_association, batched=True, batch_size=batch_size, input_columns=[column_name], num_proc=num_workers) for df in dataset]
    return dataset, bin_edges
def speaker_level_relative_to_gender(dataset, text_bins, speaker_column_name, gender_column_name, column_name, output_column_name, batch_size = 4, num_workers=1, std_tolerance=None, save_dir=None, only_save_plot=False, bin_edges=None):
    '''
    Average `column_name` per speaker, bin those averages separately for the "male" and
    "female" values of `gender_column_name`, and write the matching text bin to
    `output_column_name` for every sample of every split.

    Unlike `bins_to_text`, every split of every dataset contributes to the statistics.
    If `bin_edges` (a dict with "male" and "female" entries) is given, it is used as is and
    neither bins nor plots are computed.

    Returns a `(dataset, bin_edges)` tuple; when bins are computed here and `only_save_plot`
    is set, `dataset` is returned unmodified.
    '''
    needed_columns = {speaker_column_name, column_name, gender_column_name}

    # Gather the three useful columns of every split into a single dataframe.
    frames = []
    for df in dataset:
        for split in df:
            split_data = df[split]
            to_drop = [col for col in split_data.column_names if col not in needed_columns]
            frames.append(split_data.remove_columns(to_drop).to_pandas())
    all_rows = pd.concat(frames, ignore_index=True)

    # One row per speaker: mean of the measured column, first gender seen for that speaker.
    per_speaker = all_rows.groupby(speaker_column_name).agg({column_name: "mean", gender_column_name: "first"})

    if bin_edges is None:
        bin_edges = {}
        want_plots = save_dir is not None
        raw_values = {}
        kept_values = {}

        for category in ("male", "female"):
            is_category = per_speaker[gender_column_name] == category
            category_values = np.array(per_speaker[is_category][column_name])
            if want_plots:
                raw_values[category] = category_values

            if std_tolerance is not None:
                # drop speakers whose mean is too far from the category mean
                distance = np.abs(category_values - np.mean(category_values))
                category_values = category_values[distance < std_tolerance * np.std(category_values)]
                if want_plots:
                    kept_values[category] = category_values

            _, edges = np.histogram(category_values, len(text_bins))
            bin_edges[category] = edges

        if want_plots:
            visualize_bins_to_text(raw_values["male"], raw_values["female"], "Male distribution", "Female distribution", text_bins, save_dir, output_column_name)
            if std_tolerance is not None:
                visualize_bins_to_text(kept_values["male"], kept_values["female"], "Male distribution", "Female distribution", text_bins, save_dir, f"{output_column_name}_after_filtering")

        if only_save_plot:
            return dataset, bin_edges

    def _bin_index(row):
        # position of the speaker's mean within the edges of the speaker's gender
        return np.searchsorted(bin_edges[row[gender_column_name]], row[column_name])

    speaker_id_to_bins = per_speaker.apply(_bin_index, axis=1).to_dict()
    highest_bin = len(text_bins) - 1

    def batch_association(batch):
        labels = []
        for speaker in batch:
            position = speaker_id_to_bins[speaker]
            # clamp: positions fall outside the bins when the value equals the min or max,
            # or was filtered out from the bins computation
            labels.append(text_bins[min(max(position - 1, 0), highest_bin)])
        return {output_column_name: labels}

    dataset = [
        df.map(batch_association, batched=True, input_columns=[speaker_column_name], batch_size=batch_size, num_proc=num_workers)
        for df in dataset
    ]
    return dataset, bin_edges
# Command-line entry point: load one or several datasets, turn their continuous speech
# annotations into text bins, optionally save plots / bin edges, then save or push the result.
if __name__ == "__main__":
    set_start_method("spawn")
    parser = argparse.ArgumentParser()

    parser.add_argument("dataset_name", type=str, help="Path or name of the dataset(s). If multiple datasets, names have to be separated by `+`.")
    parser.add_argument("--configuration", default=None, type=str, help="Dataset configuration(s) to use (or configuration separated by +).")
    parser.add_argument("--output_dir", default=None, type=str, help="If specified, save the dataset(s) on disk. If multiple datasets, paths have to be separated by `+`.")
    parser.add_argument("--repo_id", default=None, type=str, help="If specified, push the dataset(s) to the hub. If multiple datasets, names have to be separated by `+`.")
    parser.add_argument("--path_to_text_bins", default=None, type=str, help="If specified, points to a JSON file which contains the text bins that will be associated to each bins. Will use default bins.")
    parser.add_argument("--path_to_bin_edges", default=None, type=str, help="If specified, points to a JSON file which contains the bin edges. Useful if you want to apply already computed bins to new datasets. If not specified, will recompute bin edges from scratch.")
    parser.add_argument("--save_bin_edges", default=None, type=str, help="If specified, it's the name of the JSON file which will contains the edge bins that have been computed. Useful if you want to reuse those bin eges on new datasets. By default, it won't save those edges..")
    parser.add_argument("--avoid_pitch_computation", default=False, action="store_true", help="If `True`, will not compute `pitch`. Note that `pitch` is computed on a speaker-level, relative to gender, so you don't need it in a mono-speaker setting.")
    parser.add_argument("--cpu_num_workers", default=1, type=int, help="Number of CPU workers.")
    parser.add_argument("--batch_size", default=16, type=int, help="Batch size in `Dataset.map` operations. https://huggingface.co/docs/datasets/v2.17.0/en/package_reference/main_classes#datasets.Dataset.map")
    parser.add_argument("--speaker_id_column_name", default="speaker_id", type=str, help="Speaker id column name. Only used if `avoid_pitch_computation=False`")
    parser.add_argument("--gender_column_name", default="gender", type=str, help="Gender column name. .Only used if `avoid_pitch_computation=False`")
    parser.add_argument("--pitch_std_tolerance", default=2., type=float, help="Standard deviation tolerance for pitch estimation. Any value that is outside mean ± std * tolerance is discared. Only used if `avoid_pitch_computation=False`.")
    parser.add_argument("--speaking_rate_std_tolerance", default=4., type=float, help="Standard deviation tolerance for speaking rate estimation. Any value that is outside mean ± std * tolerance is discared. Only used if `path_to_bin_edges=False`.")
    parser.add_argument("--snr_std_tolerance", default=3.5, type=float, help="Standard deviation tolerance for SNR estimation. Any value that is outside mean ± std * tolerance is discared. Only used if `path_to_bin_edges=False`.")
    parser.add_argument("--reverberation_std_tolerance", default=4, type=float, help="Standard deviation tolerance for reverberation estimation. Any value that is outside mean ± std * tolerance is discared. Only used if `path_to_bin_edges=False`.")
    parser.add_argument("--speech_monotony_std_tolerance", default=4, type=float, help="Standard deviation tolerance for speech monotony estimation. Any value that is outside mean ± std * tolerance is discared. Only used if `path_to_bin_edges=False`.")
    parser.add_argument("--leading_split_for_bins", default=None, type=str, help="If specified, will use every split that contains this string to compute statistics. If not specified, will use every split. Only used if `path_to_bin_edges=False`.")
    parser.add_argument("--plot_directory", default=None, type=str, help="If specified, will save visualizing plots to this directory. Only used if `path_to_bin_edges=False`.")
    parser.add_argument("--only_save_plot", default=False, action="store_true", help="If `True` and `--plot_directory` is specified, will only compute plot. Only used if `path_to_bin_edges=False`.")
    parser.add_argument("--snr_lower_range", default=50, type=float, help="The lower range of the SNR bins")

    args = parser.parse_args()

    # Reject incompatible option combinations before loading anything.
    if args.plot_directory is None and args.only_save_plot:
        raise ValueError("`only_save_plot=true` but `plot_directory` is not specified. Please give a path to the directory where you want the plot to be saved.")
    if args.only_save_plot and args.path_to_bin_edges:
        raise ValueError("`only_save_plot=true` but `path_to_bin_edges` is specified. Since the latter is specified, we won't redo computations that would have been used for plotting. Chose one ar another. Note that if you use this script to label a new dataset for fine-tuning, I'd recommend avoiding plotting and set `only_save_plot=false`")

    # Optional user-provided text labels and pre-computed bin edges.
    text_bins_dict = {}
    if args.path_to_text_bins:
        with open(args.path_to_text_bins) as json_file:
            text_bins_dict = json.load(json_file)

    bin_edges_dict = {}
    if args.path_to_bin_edges:
        with open(args.path_to_bin_edges) as json_file:
            bin_edges_dict = json.load(json_file)

    speaker_level_pitch_bins = text_bins_dict.get("speaker_level_pitch_bins", SPEAKER_LEVEL_PITCH_BINS)
    speaker_rate_bins = text_bins_dict.get("speaker_rate_bins", SPEAKER_RATE_BINS)
    snr_bins = text_bins_dict.get("snr_bins", SNR_BINS)
    reverberation_bins = text_bins_dict.get("reverberation_bins", REVERBERATION_BINS)
    utterance_level_std = text_bins_dict.get("utterance_level_std", UTTERANCE_LEVEL_STD)

    # Single-dataset defaults; overwritten below when several datasets are given with `+`.
    output_dirs = [args.output_dir] if args.output_dir is not None else None
    repo_ids = [args.repo_id] if args.repo_id is not None else None
    dataset_configs = [args.configuration] if args.configuration else None

    if "+" in args.dataset_name:
        dataset_names = args.dataset_name.split("+")

        if args.configuration:
            dataset_configs = args.configuration.split("+")
            if len(dataset_names) != len(dataset_configs):
                raise ValueError(f"There are {len(dataset_names)} datasets spotted but {len(dataset_configs)} configuration spotted")

        if args.repo_id is not None:
            repo_ids = args.repo_id.split("+")
            if len(dataset_names) != len(repo_ids):
                raise ValueError(f"There are {len(dataset_names)} datasets spotted but {len(repo_ids)} repository ids spotted")

        if args.output_dir is not None:
            output_dirs = args.output_dir.split("+")
            if len(dataset_names) != len(output_dirs):
                raise ValueError(f"There are {len(dataset_names)} datasets spotted but {len(output_dirs)} local paths on which to save the datasets spotted")

        if dataset_configs is not None:
            dataset = [load_dataset(dataset_name, dataset_config) for dataset_name, dataset_config in zip(dataset_names, dataset_configs)]
        else:
            # Iterate the names directly: zipping a single list yields 1-tuples, which cannot
            # be unpacked into a (name, config) pair.
            dataset = [load_dataset(dataset_name) for dataset_name in dataset_names]
    elif args.configuration:
        dataset = [load_dataset(args.dataset_name, args.configuration)]
    else:
        dataset = [load_dataset(args.dataset_name)]

    if args.plot_directory:
        Path(args.plot_directory).mkdir(parents=True, exist_ok=True)

    if not args.avoid_pitch_computation:
        bin_edges = None
        if "pitch_bins_male" in bin_edges_dict and "pitch_bins_female" in bin_edges_dict:
            bin_edges = {"male": bin_edges_dict["pitch_bins_male"], "female": bin_edges_dict["pitch_bins_female"]}

        dataset, pitch_bin_edges = speaker_level_relative_to_gender(dataset, speaker_level_pitch_bins, args.speaker_id_column_name, args.gender_column_name, "utterance_pitch_mean", "pitch", batch_size=args.batch_size, num_workers=args.cpu_num_workers, std_tolerance=args.pitch_std_tolerance, save_dir=args.plot_directory, only_save_plot=args.only_save_plot, bin_edges=bin_edges)

    dataset, speaking_rate_bin_edges = bins_to_text(dataset, speaker_rate_bins, "speaking_rate", "speaking_rate", batch_size=args.batch_size, num_workers=args.cpu_num_workers, leading_split_for_bins=args.leading_split_for_bins, std_tolerance=args.speaking_rate_std_tolerance, save_dir=args.plot_directory, only_save_plot=args.only_save_plot, bin_edges=bin_edges_dict.get("speaking_rate",None))
    dataset, noise_bin_edges = bins_to_text(dataset, snr_bins, "snr", "noise", batch_size=args.batch_size, num_workers=args.cpu_num_workers, leading_split_for_bins=args.leading_split_for_bins, std_tolerance=args.snr_std_tolerance, save_dir=args.plot_directory, only_save_plot=args.only_save_plot, bin_edges=bin_edges_dict.get("noise",None), lower_range=args.snr_lower_range)
    dataset, reverberation_bin_edges = bins_to_text(dataset, reverberation_bins, "c50", "reverberation", batch_size=args.batch_size, num_workers=args.cpu_num_workers, leading_split_for_bins=args.leading_split_for_bins, std_tolerance=args.reverberation_std_tolerance, save_dir=args.plot_directory, only_save_plot=args.only_save_plot, bin_edges=bin_edges_dict.get("reverberation",None))
    dataset, speech_monotony_bin_edges = bins_to_text(dataset, utterance_level_std, "utterance_pitch_std", "speech_monotony", batch_size=args.batch_size, num_workers=args.cpu_num_workers, leading_split_for_bins=args.leading_split_for_bins, std_tolerance=args.speech_monotony_std_tolerance, save_dir=args.plot_directory, only_save_plot=args.only_save_plot, bin_edges=bin_edges_dict.get("speech_monotony",None))

    if args.save_bin_edges:
        # Edges are NumPy arrays when freshly computed but plain lists when they were loaded
        # from `--path_to_bin_edges`; going through `np.asarray` makes `.tolist()` valid for both.
        bin_edges = {
            "speaking_rate": np.asarray(speaking_rate_bin_edges).tolist(),
            "noise": np.asarray(noise_bin_edges).tolist(),
            "reverberation": np.asarray(reverberation_bin_edges).tolist(),
            "speech_monotony": np.asarray(speech_monotony_bin_edges).tolist(),
        }
        if not args.avoid_pitch_computation:
            bin_edges["pitch_bins_male"] = np.asarray(pitch_bin_edges["male"]).tolist()
            bin_edges["pitch_bins_female"] = np.asarray(pitch_bin_edges["female"]).tolist()

        with open(args.save_bin_edges, "w") as outfile:
            json.dump(bin_edges, outfile)

    if not args.only_save_plot:
        if args.output_dir:
            for output_dir, df in zip(output_dirs, dataset):
                df.save_to_disk(output_dir)
        if args.repo_id:
            for i, (repo_id, df) in enumerate(zip(repo_ids, dataset)):
                if args.configuration:
                    df.push_to_hub(repo_id, dataset_configs[i])
                else:
                    df.push_to_hub(repo_id)