JustinLin610
update
10b0761
raw
history blame
No virus
4.74 kB
import collections
import io
import json
import librosa
import numpy as np
import soundfile as sf
import time
import torch
from scipy.io.wavfile import read
from .text import SOS_TOK, EOS_TOK
def get_mask_from_lengths(lengths):
    """Build a boolean padding mask from a 1-D tensor of sequence lengths.

    ARGS:
        lengths (torch.Tensor) : 1-D integer tensor with one length per
            sequence; must be non-empty (``torch.max`` is taken over it).

    RETURNS:
        mask (torch.BoolTensor) : shape ``(len(lengths), max(lengths))``;
            ``mask[i, j]`` is True when ``j < lengths[i]``.
    """
    max_len = torch.max(lengths).item()
    # Allocate the position index on the same device as `lengths` so the
    # function works on CPU-only hosts and never mixes devices in the
    # comparison below.
    ids = torch.arange(0, max_len, dtype=torch.long, device=lengths.device)
    mask = ids < lengths.unsqueeze(1)
    return mask
def load_wav_to_torch(full_path, sr=None):
    """Load an audio file through librosa and return it as a float tensor.

    ARGS:
        full_path : path of the audio file, forwarded to ``librosa.load``
        sr (int, optional) : sample rate forwarded to ``librosa.load``

    RETURNS:
        data (torch.FloatTensor) : samples clipped to [-1, 1] and then
            multiplied by 32768.0
        sr : sample rate reported by ``librosa.load``
    """
    samples, sample_rate = librosa.load(full_path, sr=sr)
    # Resampling can push values slightly outside [-1, 1]; clamp them first.
    clipped = np.clip(samples, -1, 1)
    # Scale to the 16-bit range so values match what scipy's reader yields.
    scaled = (clipped * 32768.0).astype(np.float32)
    return torch.FloatTensor(scaled), sample_rate
def read_binary_audio(bin_data, tar_sr=None):
    """
    read binary audio (`bytes` or `uint8` `numpy.ndarray`) to a `float32`
    `torch.FloatTensor`

    ARGS:
        bin_data (bytes or np.ndarray) : encoded audio file contents
        tar_sr (int, optional) : target sample rate; when given and different
            from the rate stored in the audio, the signal is resampled

    RETURNS:
        data (torch.FloatTensor) : audio of shape (n,) or (2, n), clipped to
            [-1, 1] and then multiplied by 32768.0
        tar_sr (int) : sample rate of the returned audio
    """
    data, ori_sr = sf.read(io.BytesIO(bin_data), dtype='float32')
    # Transpose so that samples run along the last axis.
    data = data.T
    if (tar_sr is not None) and (ori_sr != tar_sr):
        # Pass the rates by keyword: recent librosa releases made them
        # keyword-only, and older releases accept the same names.
        data = librosa.resample(data, orig_sr=ori_sr, target_sr=tar_sr)
    else:
        tar_sr = ori_sr
    # Resampling may overshoot [-1, 1]; clamp before scaling.
    data = np.clip(data, -1, 1)
    data = data * 32768.0
    return torch.FloatTensor(data.astype(np.float32)), tar_sr
def load_filepaths_and_text(filename):
    """Read a UTF-8 JSON-lines file and return its decoded entries.

    ARGS:
        filename : path of a text file holding one JSON document per line

    RETURNS:
        list : the decoded JSON value of every line, in file order
    """
    records = []
    with open(filename, encoding='utf-8') as manifest:
        for raw_line in manifest:
            # Trailing whitespace (including the newline) is dropped first.
            records.append(json.loads(raw_line.rstrip()))
    return records
def to_gpu(x):
    """Return a contiguous copy/view of `x`, moved to CUDA when available.

    ARGS:
        x (torch.Tensor) : tensor to transfer

    RETURNS:
        the contiguous tensor wrapped in ``torch.autograd.Variable``; it is
        moved to the GPU with a non-blocking copy only when
        ``torch.cuda.is_available()`` is true, otherwise it stays where it is
    """
    tensor = x.contiguous()
    cuda_ready = torch.cuda.is_available()
    if cuda_ready:
        tensor = tensor.cuda(non_blocking=True)
    wrapped = torch.autograd.Variable(tensor)
    return wrapped
def load_code_dict(path, add_sos=False, add_eos=False):
    """Load a code vocabulary file into a ``{code: index}`` dictionary.

    The file holds one code per line. Index 0 is reserved for the pad symbol
    ``'_'``; the codes follow in file order starting at index 1.

    ARGS:
        path (str) : vocabulary file; a falsy value yields an empty dict
        add_sos (bool) : append ``SOS_TOK`` as the next free index
        add_eos (bool) : append ``EOS_TOK`` as the next free index

    RETURNS:
        dict : mapping from code string to integer index

    RAISES:
        AssertionError : if the indices are not exactly ``0..len-1``, which
            happens when the file contains duplicate entries or an entry that
            collides with the pad / SOS / EOS symbols
    """
    if not path:
        return {}

    # Read as UTF-8 explicitly so the result does not depend on the locale,
    # consistent with how the JSON manifest is read in this module.
    with open(path, 'r', encoding='utf-8') as f:
        codes = ['_'] + [line.rstrip() for line in f]  # '_' for pad
    code_dict = {c: i for i, c in enumerate(codes)}

    if add_sos:
        code_dict[SOS_TOK] = len(code_dict)
    if add_eos:
        code_dict[EOS_TOK] = len(code_dict)

    # Raised explicitly (rather than via `assert`) so the check still runs
    # under `python -O`; the exception type is unchanged for callers.
    if set(code_dict.values()) != set(range(len(code_dict))):
        raise AssertionError(
            "code dictionary indices are not contiguous; "
            "the code file probably contains duplicate entries"
        )
    return code_dict
def load_obs_label_dict(path):
    """Load a label file into a ``{label: index}`` dictionary.

    ARGS:
        path (str) : text file with one label per line; a falsy value yields
            an empty dict

    RETURNS:
        dict : mapping from each label (trailing whitespace stripped) to its
            zero-based line index; if a label repeats, the last index wins
    """
    if not path:
        return {}
    # Read as UTF-8 explicitly so the result does not depend on the locale,
    # consistent with how the JSON manifest is read in this module.
    with open(path, 'r', encoding='utf-8') as f:
        obs_labels = [line.rstrip() for line in f]
    return {c: i for i, c in enumerate(obs_labels)}
# A simple timer class inspired by `tnt.TimeMeter`
class CudaTimer:
    """Average GPU timings per key, measured with CUDA events.

    Call ``start(key)`` / ``stop(key)`` around the region to time; both only
    record events. ``value()`` synchronizes the device, folds all pending
    event pairs into running totals and returns the mean duration per key.
    """

    def __init__(self, keys):
        # Keys reported by `value()`; events for other keys are discarded.
        self.keys = keys
        self.reset()

    def start(self, key):
        """Record a start event for `key` and return self for chaining."""
        s = torch.cuda.Event(enable_timing=True)
        s.record()
        self.start_events[key].append(s)
        return self

    def stop(self, key):
        """Record an end event for `key` and return self for chaining."""
        e = torch.cuda.Event(enable_timing=True)
        e.record()
        self.end_events[key].append(e)
        return self

    def reset(self):
        """Drop all pending events and accumulated totals; return self."""
        self.start_events = collections.defaultdict(list)
        self.end_events = collections.defaultdict(list)
        self.running_times = collections.defaultdict(float)
        self.n = collections.defaultdict(int)
        return self

    def value(self):
        """Return ``{key: mean duration}`` for every key in ``self.keys``.

        Raises ValueError if a key has never been measured or if it has a
        different number of start and stop events.
        """
        self._synchronize()
        return {k: self.running_times[k] / self.n[k] for k in self.keys}

    def _synchronize(self):
        """Fold all pending event pairs into the running totals."""
        torch.cuda.synchronize()

        # Validate every key before touching the accumulators, so a failure
        # cannot leave some keys accumulated while their events are still
        # pending (they would be counted again on the next call).
        for k in self.keys:
            starts = self.start_events[k]
            ends = self.end_events[k]
            # Only a key with no measurement at all is an error; a key whose
            # events were already folded in by an earlier call is fine.
            if len(starts) == 0 and self.n[k] == 0:
                raise ValueError("Trying to divide by zero in TimeMeter")
            if len(ends) != len(starts):
                raise ValueError("Call stop before checking value!")

        for k in self.keys:
            starts = self.start_events[k]
            ends = self.end_events[k]
            elapsed_ms = 0
            for start, end in zip(starts, ends):
                elapsed_ms += start.elapsed_time(end)
            # `elapsed_time` reports milliseconds (per the PyTorch docs);
            # totals are kept in seconds.
            self.running_times[k] += elapsed_ms * 1e-3
            self.n[k] += len(starts)

        self.start_events = collections.defaultdict(list)
        self.end_events = collections.defaultdict(list)
# Used to measure the time taken for multiple events
class Timer:
    """Wall-clock timer that averages the duration of repeated events per key.

    Call ``start(key)`` then ``stop(key)`` around each event; ``value()``
    returns the mean duration in seconds for every key in ``keys``.
    """

    def __init__(self, keys):
        self.keys = keys
        self.n = {}             # number of completed start/stop pairs per key
        self.running_time = {}  # start timestamp of the event in progress, or None
        self.total_time = {}    # summed duration of completed events per key
        self.reset()

    def start(self, key):
        """Remember the current time as the start of `key`; return self."""
        self.running_time[key] = time.time()
        return self

    def stop(self, key):
        """Close the event started for `key` and add its duration; return self.

        `start(key)` must have been called first, and `key` must be one of
        the keys given at construction.
        """
        # Accumulate (not overwrite): `value()` divides by the event count,
        # so the total must cover every completed event.
        self.total_time[key] += time.time() - self.running_time[key]
        self.n[key] += 1
        self.running_time[key] = None
        return self

    def reset(self):
        """Zero the totals and counts of every key; return self."""
        for k in self.keys:
            self.total_time[k] = 0
            self.running_time[k] = None
            self.n[k] = 0
        return self

    def value(self):
        """Return ``{key: mean duration}`` for every key.

        Raises ValueError if any key has no completed event.
        """
        vals = {}
        for k in self.keys:
            if self.n[k] == 0:
                raise ValueError("Trying to divide by zero in TimeMeter")
            vals[k] = self.total_time[k] / self.n[k]
        return vals