import itertools
import os
from pathlib import Path
from typing import List, Optional, Tuple, Union

import librosa
import numpy as np
import pandas as pd
import soundfile as sf
from joblib import Parallel, delayed
from sklearn.model_selection import StratifiedGroupKFold
from tqdm.autonotebook import tqdm

from modeling.transforms import LabelsFromTxt, ParentMultilabel
from modeling.utils import get_file_info, sync_bpm, sync_onset, sync_pitch


def generate_metadata(
    data_dir: Union[str, Path],
    save_path: str = ".",
    subset: str = "train",
    extract_music_features: bool = False,
    n_jobs: int = -2,
):
    """
    Generate a metadata CSV file containing information about the audio files in a directory.

    :param data_dir: Directory containing audio files.
    :type data_dir: Union[str, Path]
    :param save_path: Directory path to save the metadata CSV file.
    :type save_path: str
    :param subset: Subset of the dataset (train or test), defaults to 'train'.
    :type subset: str
    :param extract_music_features: Whether to extract music features, defaults to False.
    :type extract_music_features: bool
    :param n_jobs: Number of parallel jobs to run, defaults to -2.
    :type n_jobs: int
    :raises FileNotFoundError: If the provided data directory does not exist.
    :return: DataFrame containing the metadata information.
    :rtype: pandas.DataFrame
    """

    data_dir = Path(data_dir) if isinstance(data_dir, str) else data_dir

    if subset == "train":
        pattern = r"(.*)__[\d]+$"
        label_extractor = ParentMultilabel()
    else:
        pattern = r"(.*)-[\d]+$"
        label_extractor = LabelsFromTxt()

    sound_files = list(data_dir.glob("**/*.wav"))
    output = Parallel(n_jobs=n_jobs)(
        delayed(get_file_info)(path, extract_music_features) for path in tqdm(sound_files)
    )

    df = pd.DataFrame(data=output)

    df["fname"] = df.path.map(lambda x: Path(x).stem)
    df["song_name"] = df.fname.str.extract(pattern)
    df["inst"] = df.path.map(lambda x: "-".join(sorted(list(label_extractor(x)))))
    df["label_count"] = df.inst.map(lambda x: len(x.split("-")))

    df.to_csv(f"{save_path}/metadata_{subset}.csv", index=False)

    return df
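
# Example usage of generate_metadata (a minimal sketch with hypothetical paths; the IRMAS
# training data is assumed to sit in per-instrument subfolders of .wav files):
#
#   df = generate_metadata(
#       "data/raw/IRMAS_Training_Data", save_path="data", subset="train", extract_music_features=True
#   )
#
# Each row of the returned DataFrame describes one .wav file and includes the columns
# added above: fname, song_name, inst and label_count.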


def create_test_split(metadata_path: str, txt_save_path: str, random_state: Optional[int] = None):
    """Create test split by generating a list of test songs and saving them to a text file.

    :param metadata_path: Path to the CSV file containing metadata of all songs
    :type metadata_path: str
    :param txt_save_path: Path to the directory where the text file containing test songs will be saved
    :type txt_save_path: str
    :param random_state: Seed value for the random number generator, defaults to None
    :type random_state: int, optional
    :raises TypeError: If metadata_path or txt_save_path is not a string or if random_state is not an integer or None
    :raises FileNotFoundError: If metadata_path does not exist
    :raises PermissionError: If the program does not have permission to write to txt_save_path
    :return: None
    :rtype: None
    """

    df = pd.read_csv(metadata_path)
    kf = StratifiedGroupKFold(n_splits=2, shuffle=True, random_state=random_state)
    splits = kf.split(df.fname, df.inst, groups=df.song_name)
    _, test = list(splits)[0]

    test_songs = df.iloc[test].fname.sort_values().to_numpy()

    with open(f"{txt_save_path}/test_songs.txt", "w") as f:
        for song in test_songs:
            f.write(song + "\n")
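
# Example usage of create_test_split (hypothetical paths and seed): the split stratifies by
# the instrument label and groups by song_name, so excerpts of the same song never end up
# in both folds:
#
#   create_test_split("data/metadata_train.csv", txt_save_path="data", random_state=42)
#
# This writes the selected file names to data/test_songs.txt, one per line.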


class IRMASPreprocessor:
    """
    A class to preprocess IRMAS dataset metadata and create a mapping between
    file paths and their corresponding instrument labels.

    :param metadata: A pandas DataFrame or a path to a CSV file containing metadata, defaults to None
    :type metadata: Union[pd.DataFrame, str], optional
    :param data_dir: Path to the directory containing the IRMAS dataset, defaults to None
    :type data_dir: Union[str, Path], optional
    :param sample_rate: Sample rate of the audio files, defaults to 16000
    :type sample_rate: int, optional

    :raises AssertionError: Raised when metadata is None and data_dir is also None.

    :return: An instance of IRMASPreprocessor
    :rtype: IRMASPreprocessor
    """

    def __init__(
        self,
        metadata: Optional[Union[pd.DataFrame, str]] = None,
        data_dir: Optional[Union[str, Path]] = None,
        sample_rate: int = 16000,
    ):
        if metadata is not None:
            self.metadata = pd.read_csv(metadata) if isinstance(metadata, str) else metadata
            if data_dir is not None:
                self.metadata["path"] = self.metadata.apply(lambda x: f"{data_dir}/{x.inst}/{x.fname}.wav", axis=1)
        else:
            assert data_dir is not None, "No metadata found. Need to provide data directory"
            self.metadata = generate_metadata(data_dir=data_dir, subset="train", extract_music_features=True)

        self.instruments = self.metadata.inst.unique()
        self.sample_rate = sample_rate
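
    # Construction sketch (hypothetical paths): either pass a previously generated metadata CSV
    # (optionally with data_dir so absolute file paths can be rebuilt), or pass only data_dir
    # and let the class generate the metadata itself:
    #
    #   pre = IRMASPreprocessor(metadata="data/metadata_train.csv",
    #                           data_dir="data/raw/IRMAS_Training_Data")
    #   pre = IRMASPreprocessor(data_dir="data/raw/IRMAS_Training_Data")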

    def preprocess_and_mix(self, save_dir: str, sync: str, ordered: bool, num_track_to_mix: int, n_jobs: int = -2):
        """
        A method to preprocess and mix audio tracks from the IRMAS dataset.

        :param save_dir: The directory to save the preprocessed and mixed tracks
        :type save_dir: str
        :param sync: The column name used to synchronize the audio tracks during mixing
        :type sync: str
        :param ordered: Whether to order the metadata by the sync column before mixing the tracks
        :type ordered: bool
        :param num_track_to_mix: The number of tracks to mix together
        :type num_track_to_mix: int
        :param n_jobs: The number of parallel jobs to run, defaults to -2
        :type n_jobs: int, optional

        :return: None
        :rtype: None
        """

        combs = itertools.combinations(self.instruments, r=num_track_to_mix)

        if ordered:
            self.metadata = self.metadata.sort_values(by=sync)
        else:
            self.metadata = self.metadata.sample(frac=1)

        Parallel(n_jobs=n_jobs)(delayed(self._mix)(insts, save_dir, sync) for insts in tqdm(combs))
        print("Parallel preprocessing done!")
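
    # Example call (hypothetical save_dir): mix every 3-way combination of the dataset's
    # instrument labels, synchronizing the tracks by pitch before mixing:
    #
    #   preprocessor.preprocess_and_mix(save_dir="data/mixed", sync="pitch", ordered=False, num_track_to_mix=3)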

    def _mix(self, insts: Tuple[str, ...], save_dir: str, sync: str):
        """
        A private method to mix audio tracks and save them to disk.

        :param insts: A tuple of instrument labels to mix
        :type insts: Tuple[str, ...]
        :param save_dir: The directory to save the mixed tracks
        :type save_dir: str
        :param sync: The column name used to synchronize the audio tracks during mixing
        :type sync: str

        :return: None
        :rtype: None
        """

        save_dir = self._create_save_dir(insts, save_dir)

        insts_files_list = [self._get_filepaths(inst) for inst in insts]

        # Pad the shorter file lists by symmetric repetition so that every instrument
        # contributes the same number of tracks to the mix.
        max_length = max([inst_files.shape[0] for inst_files in insts_files_list])
        for i, inst_files in enumerate(insts_files_list):
            if inst_files.shape[0] < max_length:
                diff = max_length - inst_files.shape[0]
                inst_files = np.pad(inst_files, (0, diff), mode="symmetric")
            insts_files_list[i] = [Path(x) for x in inst_files]

        self._mix_files_and_save(insts_files_list, save_dir, sync)

    def _get_filepaths(self, inst: str):
        """
        A private method to retrieve file paths of audio tracks for a given instrument label.

        :param inst: The label of the instrument for which to retrieve the file paths
        :type inst: str

        :raises KeyError: Raised when the instrument label is not found in the metadata.

        :return: A numpy array of file paths corresponding to the instrument label.
        :rtype: numpy.ndarray
        """

        metadata = self.metadata.loc[self.metadata.inst == inst]

        if metadata.empty:
            raise KeyError("Instrument not found. Please regenerate metadata!")

        files = metadata.path.to_numpy()

        return files

    def _mix_files_and_save(self, insts_files_list: List[List[Path]], save_dir: str, sync: str):
        """
        A private method to mix audio files, synchronize them using a given column name in the metadata,
        and save the mixed file to disk.

        :param insts_files_list: A list of lists of file paths corresponding to each instrument label
        :type insts_files_list: List[List[Path]]
        :param save_dir: The directory to save the mixed tracks
        :type save_dir: str
        :param sync: The column name used to synchronize the audio tracks during mixing
        :type sync: str

        :return: None
        :rtype: None
        """

        for i in range(len(insts_files_list[0])):
            files_to_sync = [inst_files[i] for inst_files in insts_files_list]
            new_name = f"{'-'.join([file.stem for file in files_to_sync])}.wav"

            synced_file = self._sync_and_mix(files_to_sync, sync)
            sf.write(os.path.join(save_dir, new_name), synced_file, samplerate=self.sample_rate)

    def _sync_and_mix(self, files_to_sync: List[Path], sync: str):
        """
        Synchronize and mix audio files.

        :param files_to_sync: A list of file paths to synchronize and mix.
        :type files_to_sync: List[Path]
        :param sync: The type of synchronization to use. One of ['bpm', 'pitch', None].
        :type sync: str, optional
        :raises KeyError: If any file in files_to_sync is not found in the metadata.
        :return: The synchronized and mixed audio signal.
        :rtype: numpy.ndarray
        """

        cols = ["pitch", "bpm", "onset"]
        files_metadata_df = self.metadata.loc[
            self.metadata.path.isin([str(file_path) for file_path in files_to_sync])
        ].set_index("path")

        num_files = files_metadata_df.shape[0]
        if num_files != len(files_to_sync):
            raise KeyError("File not found in metadata. Please regenerate")

        if sync is not None:
            # Tracks are synced towards the average pitch/bpm/onset of the files being mixed.
            mean_features = files_metadata_df[cols].mean().to_dict()

        metadata_dict = files_metadata_df.to_dict("index")

        for i, (file_to_sync_path, features) in enumerate(metadata_dict.items()):
            file_to_sync, sr_sync = librosa.load(file_to_sync_path, sr=None)

            # Bring any non-44.1 kHz file to 44.1 kHz so all tracks share the same rate
            # before mixing; the final resample below assumes a 44.1 kHz mix.
            if sr_sync != 44100:
                file_to_sync = librosa.resample(y=file_to_sync, orig_sr=sr_sync, target_sr=44100)
                sr_sync = 44100

            if sync == "bpm":
                file_to_sync = sync_bpm(file_to_sync, sr_sync, bpm_base=mean_features["bpm"], bpm=features["bpm"])

            if sync == "pitch":
                file_to_sync = sync_pitch(
                    file_to_sync, sr_sync, pitch_base=mean_features["pitch"], pitch=features["pitch"]
                )

            if sync is not None:
                file_to_sync = sync_onset(
                    file_to_sync, sr_sync, onset_base=mean_features["onset"], onset=features["onset"]
                )

            file_to_sync = librosa.util.normalize(file_to_sync)

            if i == 0:
                mixed_sound = np.zeros_like(file_to_sync)

            # Match lengths before summing: np.resize tiles or truncates to the target shape.
            if mixed_sound.shape[0] > file_to_sync.shape[0]:
                file_to_sync = np.resize(file_to_sync, mixed_sound.shape)
            else:
                mixed_sound = np.resize(mixed_sound, file_to_sync.shape)

            mixed_sound += file_to_sync

        # The mix is the arithmetic mean of the normalized tracks.
        mixed_sound /= num_files

        return librosa.resample(y=mixed_sound, orig_sr=44100, target_sr=self.sample_rate)

    def _create_save_dir(self, insts: Union[Tuple[str, ...], List[str]], save_dir: str):
        """
        Create and return a directory to save instrument-specific files.

        :param insts: A tuple or list of instrument names.
        :type insts: Union[Tuple[str, ...], List[str]]
        :param save_dir: The path to the directory where the new directory will be created.
        :type save_dir: str
        :return: The path to the newly created directory.
        :rtype: str
        """

        new_dir_name = "-".join(insts)
        new_dir_path = os.path.join(save_dir, new_dir_name)
        os.makedirs(new_dir_path, exist_ok=True)
        return new_dir_path

    @classmethod
    def from_metadata(cls, metadata_path: str, **kwargs):
        """
        Create a new instance of the class from a metadata file.

        :param metadata_path: The path to the metadata file.
        :type metadata_path: str
        :param kwargs: Additional keyword arguments to pass to the class constructor.
        :return: A new instance of the class.
        :rtype: IRMASPreprocessor
        """

        metadata = pd.read_csv(metadata_path)
        return cls(metadata, **kwargs)
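
    # Example (hypothetical path): build a preprocessor directly from a metadata CSV,
    # forwarding any extra keyword arguments (e.g. sample_rate) to __init__:
    #
    #   pre = IRMASPreprocessor.from_metadata("data/metadata_train.csv", sample_rate=16000)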


if __name__ == "__main__":
    data_dir = "/home/kpintaric/lumen-irmas/data/raw/IRMAS_Training_Data"
    metadata_path = "/home/kpintaric/lumen-irmas/data/metadata_train.csv"
    preprocess = IRMASPreprocessor(metadata=metadata_path, data_dir=data_dir)
    preprocess.preprocess_and_mix(save_dir="data", sync="pitch", ordered=False, num_track_to_mix=3)