Spaces:
Running
on
A10G
Running
on
A10G
# Copyright (c) 2023 Amphion. | |
# | |
# This source code is licensed under the MIT license found in the | |
# LICENSE file in the root directory of this source tree. | |
import os | |
import json | |
import pickle | |
import glob | |
from collections import defaultdict | |
from tqdm import tqdm | |
# Train: male 20 hours, female 10 hours | |
TRAIN_MALE_MAX_SECONDS = 20 * 3600 | |
TRAIN_FEMALE_MAX_SECONDS = 10 * 3600 | |
TEST_MAX_NUM_EVERY_PERSON = 5 | |
def select_sample_idxs(): | |
chosen_speakers = get_chosen_speakers() | |
with open(os.path.join(vctk_dir, "train.json"), "r") as f: | |
raw_train = json.load(f) | |
with open(os.path.join(vctk_dir, "test.json"), "r") as f: | |
raw_test = json.load(f) | |
train_idxs, test_idxs = [], [] | |
# =========== Test =========== | |
test_nums = defaultdict(int) | |
for utt in tqdm(raw_train): | |
idx = utt["index"] | |
singer = utt["Singer"] | |
if singer in chosen_speakers and test_nums[singer] < TEST_MAX_NUM_EVERY_PERSON: | |
test_nums[singer] += 1 | |
test_idxs.append("train_{}".format(idx)) | |
for utt in tqdm(raw_test): | |
idx = utt["index"] | |
singer = utt["Singer"] | |
if singer in chosen_speakers and test_nums[singer] < TEST_MAX_NUM_EVERY_PERSON: | |
test_nums[singer] += 1 | |
test_idxs.append("test_{}".format(idx)) | |
# =========== Train =========== | |
for utt in tqdm(raw_train): | |
idx = utt["index"] | |
singer = utt["Singer"] | |
if singer in chosen_speakers and "train_{}".format(idx) not in test_idxs: | |
train_idxs.append("train_{}".format(idx)) | |
for utt in tqdm(raw_test): | |
idx = utt["index"] | |
singer = utt["Singer"] | |
if singer in chosen_speakers and "test_{}".format(idx) not in test_idxs: | |
train_idxs.append("test_{}".format(idx)) | |
train_idxs.sort() | |
test_idxs.sort() | |
return train_idxs, test_idxs, raw_train, raw_test | |
def statistics_of_speakers(): | |
speaker2time = defaultdict(float) | |
sex2time = defaultdict(float) | |
with open(os.path.join(vctk_dir, "train.json"), "r") as f: | |
train = json.load(f) | |
with open(os.path.join(vctk_dir, "test.json"), "r") as f: | |
test = json.load(f) | |
for utt in train + test: | |
# minutes | |
speaker2time[utt["Singer"]] += utt["Duration"] | |
# hours | |
sex2time[utt["Singer"].split("_")[0]] += utt["Duration"] | |
print( | |
"Female: {:.2f} hours, Male: {:.2f} hours.\n".format( | |
sex2time["female"] / 3600, sex2time["male"] / 3600 | |
) | |
) | |
speaker2time = sorted(speaker2time.items(), key=lambda x: x[-1], reverse=True) | |
for singer, seconds in speaker2time: | |
print("{}\t{:.2f} mins".format(singer, seconds / 60)) | |
return speaker2time | |
def get_chosen_speakers(): | |
speaker2time = statistics_of_speakers() | |
chosen_time = defaultdict(float) | |
chosen_speaker = defaultdict(list) | |
train_constrait = { | |
"male": TRAIN_MALE_MAX_SECONDS, | |
"female": TRAIN_FEMALE_MAX_SECONDS, | |
} | |
for speaker, seconds in speaker2time: | |
sex = speaker.split("_")[0] | |
if chosen_time[sex] < train_constrait[sex]: | |
chosen_time[sex] += seconds | |
chosen_speaker[sex].append(speaker) | |
speaker2time = dict(speaker2time) | |
chosen_speaker = chosen_speaker["male"] + chosen_speaker["female"] | |
print("\n#Chosen speakers = {}".format(len(chosen_speaker))) | |
for spk in chosen_speaker: | |
print("{}\t{:.2f} mins".format(spk, speaker2time[spk] / 60)) | |
return chosen_speaker | |
if __name__ == "__main__": | |
root_path = "" | |
vctk_dir = os.path.join(root_path, "vctk") | |
fewspeaker_dir = os.path.join(root_path, "vctkfewspeaker") | |
os.makedirs(fewspeaker_dir, exist_ok=True) | |
train_idxs, test_idxs, raw_train, raw_test = select_sample_idxs() | |
print("#Train = {}, #Test = {}".format(len(train_idxs), len(test_idxs))) | |
# There are no data leakage | |
assert len(set(train_idxs).intersection(set(test_idxs))) == 0 | |
for idx in train_idxs + test_idxs: | |
# No test data of raw vctk | |
assert "test_" not in idx | |
for split, chosen_idxs in zip(["train", "test"], [train_idxs, test_idxs]): | |
print("{}: #chosen idx = {}\n".format(split, len(chosen_idxs))) | |
# Select features | |
feat_files = glob.glob("**/train.pkl", root_dir=vctk_dir, recursive=True) | |
for file in tqdm(feat_files): | |
raw_file = os.path.join(vctk_dir, file) | |
new_file = os.path.join( | |
fewspeaker_dir, file.replace("train.pkl", "{}.pkl".format(split)) | |
) | |
new_dir = "/".join(new_file.split("/")[:-1]) | |
os.makedirs(new_dir, exist_ok=True) | |
if "mel_min" in file or "mel_max" in file: | |
os.system("cp {} {}".format(raw_file, new_file)) | |
continue | |
with open(raw_file, "rb") as f: | |
raw_feats = pickle.load(f) | |
print("file: {}, #raw_feats = {}".format(file, len(raw_feats))) | |
new_feats = [] | |
for idx in chosen_idxs: | |
chosen_split_is_train, raw_idx = idx.split("_") | |
assert chosen_split_is_train == "train" | |
new_feats.append(raw_feats[int(raw_idx)]) | |
with open(new_file, "wb") as f: | |
pickle.dump(new_feats, f) | |
print("New file: {}, #new_feats = {}".format(new_file, len(new_feats))) | |
# Utterance re-index | |
news_utts = [raw_train[int(idx.split("_")[-1])] for idx in chosen_idxs] | |
for i, utt in enumerate(news_utts): | |
utt["Dataset"] = "vctkfewsinger" | |
utt["index"] = i | |
with open(os.path.join(fewspeaker_dir, "{}.json".format(split)), "w") as f: | |
json.dump(news_utts, f, indent=4) | |