"""
Adapted from https://github.com/dwromero/ckconv/blob/dc84dceb490cab2f2ddf609c380083367af21890/datasets/speech_commands.py
which is
adapted from https://github.com/patrick-kidger/NeuralCDE/blob/758d3a7134e3a691013e5cc6b7f68f277e9e6b69/experiments/datasets/speech_commands.py
"""
import os
import pathlib
import tarfile
import urllib.request
import sklearn.model_selection
import torch
import torch.nn.functional as F
import torchaudio
def pad(channel, maxlen):
channel = torch.tensor(channel)
out = torch.full((maxlen,), channel[-1])
out[: channel.size(0)] = channel
return out
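# A small worked example of `pad` (illustrative values, not from the original
# file): a length-3 input padded to maxlen=5 repeats its final element,
#   pad([1.0, 2.0, 3.0], 5)  ->  tensor([1., 2., 3., 3., 3.])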
def subsample(X, y, subsample_rate):
    # Keep every `subsample_rate`-th time step along dim 1; labels pass through.
    if subsample_rate != 1:
        X = X[:, ::subsample_rate, :]
    return X, y
def save_data(dir, **tensors):
for tensor_name, tensor_value in tensors.items():
torch.save(tensor_value, str(dir / tensor_name) + ".pt")
def load_data(dir):
tensors = {}
for filename in os.listdir(dir):
if filename.endswith(".pt"):
tensor_name = filename.split(".")[0]
tensor_value = torch.load(str(dir / filename))
tensors[tensor_name] = tensor_value
return tensors
def normalise_data(X, y):
train_X, _, _ = split_data(X, y)
out = []
for Xi, train_Xi in zip(X.unbind(dim=-1), train_X.unbind(dim=-1)):
train_Xi_nonan = train_Xi.masked_select(~torch.isnan(train_Xi))
mean = train_Xi_nonan.mean() # compute statistics using only training data.
std = train_Xi_nonan.std()
out.append((Xi - mean) / (std + 1e-5))
out = torch.stack(out, dim=-1)
return out
def normalize_all_data(X_train, X_val, X_test):
for i in range(X_train.shape[-1]):
mean = X_train[:, :, i].mean()
std = X_train[:, :, i].std()
X_train[:, :, i] = (X_train[:, :, i] - mean) / (std + 1e-5)
X_val[:, :, i] = (X_val[:, :, i] - mean) / (std + 1e-5)
X_test[:, :, i] = (X_test[:, :, i] - mean) / (std + 1e-5)
return X_train, X_val, X_test
def minmax_scale(tensor):
min_val = torch.amin(tensor, dim=(1, 2), keepdim=True)
max_val = torch.amax(tensor, dim=(1, 2), keepdim=True)
return (tensor - min_val) / (max_val - min_val)
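# Note: `amin`/`amax` reduce over dims (1, 2) with keepdim, so each example in
# the batch is scaled to [0, 1] independently. A constant signal would make
# max_val == min_val and divide by zero here; the audio is assumed non-constant.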
def mu_law_encode(audio, bits=8):
"""
Perform mu-law companding transformation.
"""
mu = torch.tensor(2**bits - 1)
# Audio must be min-max scaled between -1 and 1
audio = 2 * minmax_scale(audio) - 1
# Perform mu-law companding transformation.
numerator = torch.log1p(mu * torch.abs(audio))
denominator = torch.log1p(mu)
encoded = torch.sign(audio) * (numerator / denominator)
# Quantize signal to the specified number of levels.
return ((encoded + 1) / 2 * mu + 0.5).to(torch.int32)
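# A worked example of the encoding (illustrative numbers): with bits=8, mu is
# 255, and an input sample x = 0.5 maps to
#   sign(0.5) * log1p(255 * 0.5) / log1p(255) ≈ 0.876,
# which the final line quantizes to int((0.876 + 1) / 2 * 255 + 0.5) = 239,
# one of the 256 levels 0..255.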
def mu_law_decode(encoded, bits=8):
"""
Perform inverse mu-law transformation.
"""
mu = 2**bits - 1
# Invert the quantization
x = (encoded / mu) * 2 - 1
# Invert the mu-law transformation
x = torch.sign(x) * ((1 + mu)**(torch.abs(x)) - 1) / mu
return x
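# Round-tripping is exact only up to quantization error: decoding the encoded
# value 239 from the example above returns roughly 0.497 rather than 0.5.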
def split_data(tensor, stratify):
# 0.7/0.15/0.15 train/val/test split
(
train_tensor,
testval_tensor,
train_stratify,
testval_stratify,
) = sklearn.model_selection.train_test_split(
tensor,
stratify,
train_size=0.7,
random_state=0,
shuffle=True,
stratify=stratify,
)
val_tensor, test_tensor = sklearn.model_selection.train_test_split(
testval_tensor,
train_size=0.5,
random_state=1,
shuffle=True,
stratify=testval_stratify,
)
return train_tensor, val_tensor, test_tensor
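# Because both calls use fixed `random_state` values, running split_data twice
# on same-length tensors with the same `stratify` labels selects identical
# indices; this is what lets the code below call split_data(X, y) and
# split_data(y, y) separately and still keep features and labels aligned.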
class _SpeechCommands(torch.utils.data.TensorDataset):
SUBSET_CLASSES = [
"yes",
"no",
"up",
"down",
"left",
"right",
"on",
"off",
"stop",
"go",
]
ALL_CLASSES = [
"bed",
"cat",
"down",
"five",
"forward",
"go",
"house",
"left",
"marvin",
"no",
"on",
"right",
"sheila",
"tree",
"up",
"visual",
"yes",
"backward",
"bird",
"dog",
"eight",
"follow",
"four",
"happy",
"learn",
"nine",
"off",
"one",
"seven",
"six",
"stop",
"three",
"two",
"wow",
"zero",
]
def __init__(
self,
partition: str, # `train`, `val`, `test`
length: int, # sequence length
mfcc: bool, # whether to use MFCC features (`True`) or raw features
sr: int, # subsampling rate: default should be 1 (no subsampling); keeps every kth sample
dropped_rate: float, # rate at which samples are dropped, lies in [0, 100.]
path: str,
all_classes: bool = False,
gen: bool = False, # whether we are doing speech generation
discrete_input: bool = False, # whether we are using discrete inputs
):
self.dropped_rate = dropped_rate
self.all_classes = all_classes
self.gen = gen
self.discrete_input = discrete_input
self.root = pathlib.Path(path) # pathlib.Path("./data")
base_loc = self.root / "SpeechCommands" / "processed_data"
if mfcc:
data_loc = base_loc / "mfcc"
elif gen:
data_loc = base_loc / "gen"
else:
data_loc = base_loc / "raw"
if self.dropped_rate != 0:
data_loc = pathlib.Path(
str(data_loc) + "_dropped{}".format(self.dropped_rate)
)
if self.all_classes:
data_loc = pathlib.Path(str(data_loc) + "_all_classes")
if self.discrete_input:
data_loc = pathlib.Path(str(data_loc) + "_discrete")
        if not os.path.exists(data_loc):
            self.download()
            if not self.all_classes:
                train_X, val_X, test_X, train_y, val_y, test_y = self._process_data(mfcc)
            else:
                train_X, val_X, test_X, train_y, val_y, test_y = self._process_all(mfcc)
            if not os.path.exists(base_loc):
                os.mkdir(base_loc)
            if not os.path.exists(data_loc):
                os.mkdir(data_loc)
            save_data(
                data_loc,
                train_X=train_X,
                val_X=val_X,
                test_X=test_X,
                train_y=train_y,
                val_y=val_y,
                test_y=test_y,
            )
X, y = self.load_data(data_loc, partition) # (batch, length, 1)
        if self.gen:
            y = y.transpose(1, 2)
        if not mfcc and not self.gen:
            X = F.pad(X, (0, 0, 0, length - 16000))
# Subsample
if not mfcc:
X, y = subsample(X, y, sr)
if self.discrete_input:
X = X.long().squeeze()
super(_SpeechCommands, self).__init__(X, y)
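        # The resulting TensorDataset yields (X, y) pairs: with the default
        # sr=1, X is (length, 1) for raw audio or (161, 20) for MFCC features;
        # in generation mode y is the mu-law encoded next-sample target with
        # the same time length as X.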
def download(self):
root = self.root
base_loc = root / "SpeechCommands"
loc = base_loc / "speech_commands.tar.gz"
if os.path.exists(loc):
return
        if not os.path.exists(root):
            os.makedirs(root)
if not os.path.exists(base_loc):
os.mkdir(base_loc)
urllib.request.urlretrieve(
"http://download.tensorflow.org/data/speech_commands_v0.02.tar.gz", loc
) # TODO: Add progress bar
with tarfile.open(loc, "r") as f:
f.extractall(base_loc)
def _process_all(self, mfcc):
assert self.dropped_rate == 0, "Dropped rate must be 0 for all classes"
base_loc = self.root / "SpeechCommands"
with open(base_loc / "validation_list.txt", "r") as f:
validation_list = set([line.rstrip() for line in f])
with open(base_loc / "testing_list.txt", "r") as f:
testing_list = set([line.rstrip() for line in f])
train_X, val_X, test_X = [], [], []
train_y, val_y, test_y = [], [], []
batch_index = 0
y_index = 0
for foldername in self.ALL_CLASSES:
print(foldername)
loc = base_loc / foldername
for filename in os.listdir(loc):
audio, _ = torchaudio.load(
loc / filename, channels_first=False,
)
                # Scale 16-bit PCM manually; see the normalization note in _process_data.
                audio = audio / 2 ** 15
# Pad: A few samples are shorter than the full length
audio = F.pad(audio, (0, 0, 0, 16000 - audio.shape[0]))
                key = foldername + "/" + filename
                if key in validation_list:
                    val_X.append(audio)
                    val_y.append(y_index)
                elif key in testing_list:
                    test_X.append(audio)
                    test_y.append(y_index)
                else:
                    train_X.append(audio)
                    train_y.append(y_index)
batch_index += 1
y_index += 1
train_X = torch.stack(train_X)
val_X = torch.stack(val_X)
test_X = torch.stack(test_X)
train_y = torch.tensor(train_y, dtype=torch.long)
val_y = torch.tensor(val_y, dtype=torch.long)
test_y = torch.tensor(test_y, dtype=torch.long)
# If MFCC, then we compute these coefficients.
        if mfcc:
            mfcc_transform = torchaudio.transforms.MFCC(
                log_mels=True, n_mfcc=20, melkwargs=dict(n_fft=200, n_mels=64)
            )
            train_X = mfcc_transform(train_X.squeeze(-1)).detach()
            val_X = mfcc_transform(val_X.squeeze(-1)).detach()
            test_X = mfcc_transform(test_X.squeeze(-1)).detach()
# X is of shape (batch, channels=20, length=161)
else:
train_X = train_X.unsqueeze(1).squeeze(-1)
val_X = val_X.unsqueeze(1).squeeze(-1)
test_X = test_X.unsqueeze(1).squeeze(-1)
# X is of shape (batch, channels=1, length=16000)
# Normalize data
        if mfcc:
            train_X, val_X, test_X = normalize_all_data(
                train_X.transpose(1, 2), val_X.transpose(1, 2), test_X.transpose(1, 2)
            )
            train_X = train_X.transpose(1, 2)
            val_X = val_X.transpose(1, 2)
            test_X = test_X.transpose(1, 2)
else:
train_X, val_X, test_X = normalize_all_data(train_X, val_X, test_X)
# Print the shape of all tensors in one line
print(
"Train: {}, Val: {}, Test: {}".format(
train_X.shape, val_X.shape, test_X.shape
)
)
return (
train_X,
val_X,
test_X,
train_y,
val_y,
test_y,
)
def _process_data(self, mfcc):
base_loc = self.root / "SpeechCommands"
if self.gen:
X = torch.empty(35628, 16000, 1)
y = torch.empty(35628, dtype=torch.long)
else:
X = torch.empty(34975, 16000, 1)
y = torch.empty(34975, dtype=torch.long)
batch_index = 0
y_index = 0
for foldername in self.SUBSET_CLASSES:
loc = base_loc / foldername
for filename in os.listdir(loc):
audio, _ = torchaudio.load(
loc / filename, channels_first=False,
)
                # audio, _ = torchaudio.load_wav(
                #     loc / filename, channels_first=False, normalization=False
                # )  # for forward compatibility if they fix it
audio = (
audio / 2 ** 15
) # Normalization argument doesn't seem to work so we do it manually.
# A few samples are shorter than the full length; for simplicity we discard them.
if len(audio) != 16000:
continue
X[batch_index] = audio
y[batch_index] = y_index
batch_index += 1
y_index += 1
if self.gen:
assert batch_index == 35628, "batch_index is {}".format(batch_index)
else:
assert batch_index == 34975, "batch_index is {}".format(batch_index)
# If MFCC, then we compute these coefficients.
if mfcc:
X = torchaudio.transforms.MFCC(
log_mels=True, n_mfcc=20, melkwargs=dict(n_fft=200, n_mels=64)
)(X.squeeze(-1)).detach()
# X is of shape (batch=34975, channels=20, length=161)
else:
X = X.unsqueeze(1).squeeze(-1)
# X is of shape (batch=34975, channels=1, length=16000)
        # If dropped_rate is nonzero, randomly drop that percentage of time steps from each sample (marked as NaN).
if self.dropped_rate != 0:
generator = torch.Generator().manual_seed(56789)
X_removed = []
for Xi in X:
removed_points = (
torch.randperm(X.shape[-1], generator=generator)[
: int(X.shape[-1] * float(self.dropped_rate) / 100.0)
]
.sort()
.values
)
Xi_removed = Xi.clone()
Xi_removed[:, removed_points] = float("nan")
X_removed.append(Xi_removed)
X = torch.stack(X_removed, dim=0)
# Normalize data
if mfcc:
X = normalise_data(X.transpose(1, 2), y).transpose(1, 2)
else:
X = normalise_data(X, y)
        # Once the data is normalized, append a mask channel if required.
        if self.dropped_rate != 0:
            # Mask of positions that were kept (1) vs. dropped (0).
            mask_exists = (~torch.isnan(X[:, :1, :])).float()
            X = torch.where(~torch.isnan(X), X, torch.Tensor([0.0]))
            X = torch.cat([X, mask_exists], dim=1)
train_X, val_X, test_X = split_data(X, y)
train_y, val_y, test_y = split_data(y, y)
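        # For generation, the task becomes next-sample prediction: targets are
        # the mu-law encoded audio, and inputs are the same signal rolled right
        # by one step along time (dim 2) with the first time step zeroed, so
        # the model only conditions on past samples.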
if self.gen:
train_y, val_y, test_y = train_X, val_X, test_X
train_y, val_y, test_y = mu_law_encode(train_y), mu_law_encode(val_y), mu_law_encode(test_y)
# train_X, val_X, test_X = train_X[..., :-1], val_X[..., :-1], test_X[..., :-1]
# # Prepend zero to train_X, val_X, test_X
# train_X = torch.cat([torch.zeros(train_X.shape[0], 1, train_X.shape[2]), train_X], dim=1)
# train_X, val_X, test_X = torch.roll(train_X, 1, 2), torch.roll(val_X, 1, 2), torch.roll(test_X, 1, 2)
if not self.discrete_input:
train_X, val_X, test_X = torch.roll(mu_law_decode(train_y), 1, 2), torch.roll(mu_law_decode(val_y), 1, 2), torch.roll(mu_law_decode(test_y), 1, 2)
else:
train_X, val_X, test_X = torch.roll(train_y, 1, 2), torch.roll(val_y, 1, 2), torch.roll(test_y, 1, 2)
train_X[..., 0], val_X[..., 0], test_X[..., 0] = 0, 0, 0
        assert train_y.shape == train_X.shape
return (
train_X,
val_X,
test_X,
train_y,
val_y,
test_y,
)
@staticmethod
def load_data(data_loc, partition):
tensors = load_data(data_loc)
if partition == "train":
X = tensors["train_X"]
y = tensors["train_y"]
elif partition == "val":
X = tensors["val_X"]
y = tensors["val_y"]
elif partition == "test":
X = tensors["test_X"]
y = tensors["test_y"]
else:
            raise NotImplementedError("the partition {} is not implemented.".format(partition))
return X.transpose(1, 2), y
class _SpeechCommandsGeneration(_SpeechCommands):
SUBSET_CLASSES = [
"zero",
"one",
"two",
"three",
"four",
"five",
"six",
"seven",
"eight",
"nine",
]
def __init__(
self,
partition: str, # `train`, `val`, `test`
length: int, # sequence length
mfcc: bool, # whether to use MFCC features (`True`) or raw features
sr: int, # subsampling rate: default should be 1 (no subsampling); keeps every kth sample
dropped_rate: float, # rate at which samples are dropped, lies in [0, 100.]
path: str,
all_classes: bool = False,
discrete_input: bool = False,
):
        super(_SpeechCommandsGeneration, self).__init__(
            partition=partition,
            length=length,
            mfcc=mfcc,
            sr=sr,
            dropped_rate=dropped_rate,
            path=path,
            all_classes=all_classes,
            gen=True,
            discrete_input=discrete_input,
        )
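
# A minimal usage sketch (not part of the original file). The path and
# parameter values below are assumptions for illustration; download() fetches
# the Speech Commands archive into the given root on first use.
if __name__ == "__main__":
    dataset = _SpeechCommands(
        partition="train",
        length=16000,       # raw sequence length; no extra padding applied
        mfcc=False,         # raw waveform features
        sr=1,               # no subsampling
        dropped_rate=0.0,   # keep all time steps
        path="./data",      # assumed data root
    )
    loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
    X, y = next(iter(loader))
    print(X.shape, y.shape)  # expected: (32, 16000, 1) and (32,)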