Docker_v / dataset.py
XDHDD's picture
Upload 12 files
687e655
import glob
import os
import random
import librosa
import numpy as np
import soundfile as sf
import torch
from numpy.random import default_rng
from pydtmc import MarkovChain
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
from config import CONFIG
np.random.seed(0)
rng = default_rng()
def load_audio(
path,
sample_rate: int = 16000,
chunk_len=None,
):
with sf.SoundFile(path) as f:
sr = f.samplerate
audio_len = f.frames
if chunk_len is not None and chunk_len < audio_len:
start_index = torch.randint(0, audio_len - chunk_len, (1,))[0]
frames = f._prepare_read(start_index, start_index + chunk_len, -1)
audio = f.read(frames, always_2d=True, dtype="float32")
else:
audio = f.read(always_2d=True, dtype="float32")
if sr != sample_rate:
audio = librosa.resample(np.squeeze(audio), sr, sample_rate)[:, np.newaxis]
return audio.T
def pad(sig, length):
if sig.shape[1] < length:
pad_len = length - sig.shape[1]
sig = torch.hstack((sig, torch.zeros((sig.shape[0], pad_len))))
else:
start = random.randint(0, sig.shape[1] - length)
sig = sig[:, start:start + length]
return sig
class MaskGenerator:
def __init__(self, is_train=True, probs=((0.9, 0.1), (0.5, 0.1), (0.5, 0.5))):
'''
is_train: if True, mask generator for training otherwise for evaluation
probs: a list of transition probability (p_N, p_L) for Markov Chain. Only allow 1 tuple if 'is_train=False'
'''
self.is_train = is_train
self.probs = probs
self.mcs = []
if self.is_train:
for prob in probs:
self.mcs.append(MarkovChain([[prob[0], 1 - prob[0]], [1 - prob[1], prob[1]]], ['1', '0']))
else:
assert len(probs) == 1
prob = self.probs[0]
self.mcs.append(MarkovChain([[prob[0], 1 - prob[0]], [1 - prob[1], prob[1]]], ['1', '0']))
def gen_mask(self, length, seed=0):
if self.is_train:
mc = random.choice(self.mcs)
else:
mc = self.mcs[0]
mask = mc.walk(length - 1, seed=seed)
mask = np.array(list(map(int, mask)))
return mask
class TestLoader(Dataset):
def __init__(self):
dataset_name = CONFIG.DATA.dataset
self.mask = CONFIG.DATA.EVAL.masking
self.target_root = CONFIG.DATA.data_dir[dataset_name]['root']
txt_list = CONFIG.DATA.data_dir[dataset_name]['test']
self.data_list = self.load_txt(txt_list)
if self.mask == 'real':
trace_txt = glob.glob(os.path.join(CONFIG.DATA.EVAL.trace_path, '*.txt'))
trace_txt.sort()
self.trace_list = [1 - np.array(list(map(int, open(txt, 'r').read().strip('\n').split('\n')))) for txt in
trace_txt]
else:
self.mask_generator = MaskGenerator(is_train=False, probs=CONFIG.DATA.EVAL.transition_probs)
self.sr = CONFIG.DATA.sr
self.stride = CONFIG.DATA.stride
self.window_size = CONFIG.DATA.window_size
self.audio_chunk_len = CONFIG.DATA.audio_chunk_len
self.p_size = CONFIG.DATA.EVAL.packet_size # 20ms
self.hann = torch.sqrt(torch.hann_window(self.window_size))
def __len__(self):
return len(self.data_list)
def load_txt(self, txt_list):
target = []
with open(txt_list) as f:
for line in f:
target.append(os.path.join(self.target_root, line.strip('\n')))
target = list(set(target))
target.sort()
return target
def __getitem__(self, index):
target = load_audio(self.data_list[index], sample_rate=self.sr)
target = target[:, :(target.shape[1] // self.p_size) * self.p_size]
sig = np.reshape(target, (-1, self.p_size)).copy()
if self.mask == 'real':
mask = self.trace_list[index % len(self.trace_list)]
mask = np.repeat(mask, np.ceil(len(sig) / len(mask)), 0)[:len(sig)][:, np.newaxis]
else:
mask = self.mask_generator.gen_mask(len(sig), seed=index)[:, np.newaxis]
sig *= mask
sig = torch.tensor(sig).reshape(-1)
target = torch.tensor(target).squeeze(0)
sig_wav = sig.clone()
target_wav = target.clone()
target = torch.stft(target, self.window_size, self.stride, window=self.hann,
return_complex=False).permute(2, 0, 1)
sig = torch.stft(sig, self.window_size, self.stride, window=self.hann, return_complex=False).permute(2, 0, 1)
return sig.float(), target.float(), sig_wav, target_wav
class BlindTestLoader(Dataset):
def __init__(self, test_dir):
self.data_list = glob.glob(os.path.join(test_dir, '*.wav'))
self.sr = CONFIG.DATA.sr
self.stride = CONFIG.DATA.stride
self.chunk_len = CONFIG.DATA.window_size
self.hann = torch.sqrt(torch.hann_window(self.chunk_len))
def __len__(self):
return len(self.data_list)
def __getitem__(self, index):
sig = load_audio(self.data_list[index], sample_rate=self.sr)
sig = torch.from_numpy(sig).squeeze(0)
sig = torch.stft(sig, self.chunk_len, self.stride, window=self.hann, return_complex=False).permute(2, 0, 1)
return sig.float()
class TrainDataset(Dataset):
def __init__(self, mode='train'):
dataset_name = CONFIG.DATA.dataset
self.target_root = CONFIG.DATA.data_dir[dataset_name]['root']
txt_list = CONFIG.DATA.data_dir[dataset_name]['train']
self.data_list = self.load_txt(txt_list)
if mode == 'train':
self.data_list, _ = train_test_split(self.data_list, test_size=CONFIG.TRAIN.val_split, random_state=0)
elif mode == 'val':
_, self.data_list = train_test_split(self.data_list, test_size=CONFIG.TRAIN.val_split, random_state=0)
self.p_sizes = CONFIG.DATA.TRAIN.packet_sizes
self.mode = mode
self.sr = CONFIG.DATA.sr
self.window = CONFIG.DATA.audio_chunk_len
self.stride = CONFIG.DATA.stride
self.chunk_len = CONFIG.DATA.window_size
self.hann = torch.sqrt(torch.hann_window(self.chunk_len))
self.mask_generator = MaskGenerator(is_train=True, probs=CONFIG.DATA.TRAIN.transition_probs)
def __len__(self):
return len(self.data_list)
def load_txt(self, txt_list):
target = []
with open(txt_list) as f:
for line in f:
target.append(os.path.join(self.target_root, line.strip('\n')))
target = list(set(target))
target.sort()
return target
def fetch_audio(self, index):
sig = load_audio(self.data_list[index], sample_rate=self.sr, chunk_len=self.window)
while sig.shape[1] < self.window:
idx = torch.randint(0, len(self.data_list), (1,))[0]
pad_len = self.window - sig.shape[1]
if pad_len < 0.02 * self.sr:
padding = np.zeros((1, pad_len), dtype=np.float)
else:
padding = load_audio(self.data_list[idx], sample_rate=self.sr, chunk_len=pad_len)
sig = np.hstack((sig, padding))
return sig
def __getitem__(self, index):
sig = self.fetch_audio(index)
sig = sig.reshape(-1).astype(np.float32)
target = torch.tensor(sig.copy())
p_size = random.choice(self.p_sizes)
sig = np.reshape(sig, (-1, p_size))
mask = self.mask_generator.gen_mask(len(sig), seed=index)[:, np.newaxis]
sig *= mask
sig = torch.tensor(sig.copy()).reshape(-1)
target = torch.stft(target, self.chunk_len, self.stride, window=self.hann,
return_complex=False).permute(2, 0, 1).float()
sig = torch.stft(sig, self.chunk_len, self.stride, window=self.hann, return_complex=False)
sig = sig.permute(2, 0, 1).float()
return sig, target