Spaces:
Runtime error
Runtime error
File size: 4,741 Bytes
10b0761 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
import collections
import io
import json
import librosa
import numpy as np
import soundfile as sf
import time
import torch
from scipy.io.wavfile import read
from .text import SOS_TOK, EOS_TOK
def get_mask_from_lengths(lengths):
max_len = torch.max(lengths).item()
ids = torch.arange(0, max_len, out=torch.cuda.LongTensor(max_len))
mask = (ids < lengths.unsqueeze(1))
return mask
def load_wav_to_torch(full_path, sr=None):
data, sr = librosa.load(full_path, sr=sr)
data = np.clip(data, -1, 1) # potentially out of [-1, 1] due to resampling
data = data * 32768.0 # match values loaded by scipy
return torch.FloatTensor(data.astype(np.float32)), sr
def read_binary_audio(bin_data, tar_sr=None):
"""
read binary audio (`bytes` or `uint8` `numpy.ndarray`) to `float32`
`numpy.ndarray`
RETURNS:
data (np.ndarray) : audio of shape (n,) or (2, n)
tar_sr (int) : sample rate
"""
data, ori_sr = sf.read(io.BytesIO(bin_data), dtype='float32')
data = data.T
if (tar_sr is not None) and (ori_sr != tar_sr):
data = librosa.resample(data, ori_sr, tar_sr)
else:
tar_sr = ori_sr
data = np.clip(data, -1, 1)
data = data * 32768.0
return torch.FloatTensor(data.astype(np.float32)), tar_sr
def load_filepaths_and_text(filename):
with open(filename, encoding='utf-8') as f:
data = [json.loads(line.rstrip()) for line in f]
return data
def to_gpu(x):
x = x.contiguous()
if torch.cuda.is_available():
x = x.cuda(non_blocking=True)
return torch.autograd.Variable(x)
def load_code_dict(path, add_sos=False, add_eos=False):
if not path:
return {}
with open(path, 'r') as f:
codes = ['_'] + [line.rstrip() for line in f] # '_' for pad
code_dict = {c: i for i, c in enumerate(codes)}
if add_sos:
code_dict[SOS_TOK] = len(code_dict)
if add_eos:
code_dict[EOS_TOK] = len(code_dict)
assert(set(code_dict.values()) == set(range(len(code_dict))))
return code_dict
def load_obs_label_dict(path):
if not path:
return {}
with open(path, 'r') as f:
obs_labels = [line.rstrip() for line in f]
return {c: i for i, c in enumerate(obs_labels)}
# A simple timer class inspired from `tnt.TimeMeter`
class CudaTimer:
def __init__(self, keys):
self.keys = keys
self.reset()
def start(self, key):
s = torch.cuda.Event(enable_timing=True)
s.record()
self.start_events[key].append(s)
return self
def stop(self, key):
e = torch.cuda.Event(enable_timing=True)
e.record()
self.end_events[key].append(e)
return self
def reset(self):
self.start_events = collections.defaultdict(list)
self.end_events = collections.defaultdict(list)
self.running_times = collections.defaultdict(float)
self.n = collections.defaultdict(int)
return self
def value(self):
self._synchronize()
return {k: self.running_times[k] / self.n[k] for k in self.keys}
def _synchronize(self):
torch.cuda.synchronize()
for k in self.keys:
starts = self.start_events[k]
ends = self.end_events[k]
if len(starts) == 0:
raise ValueError("Trying to divide by zero in TimeMeter")
if len(ends) != len(starts):
raise ValueError("Call stop before checking value!")
time = 0
for start, end in zip(starts, ends):
time += start.elapsed_time(end)
self.running_times[k] += time * 1e-3
self.n[k] += len(starts)
self.start_events = collections.defaultdict(list)
self.end_events = collections.defaultdict(list)
# Used to measure the time taken for multiple events
class Timer:
def __init__(self, keys):
self.keys = keys
self.n = {}
self.running_time = {}
self.total_time = {}
self.reset()
def start(self, key):
self.running_time[key] = time.time()
return self
def stop(self, key):
self.total_time[key] = time.time() - self.running_time[key]
self.n[key] += 1
self.running_time[key] = None
return self
def reset(self):
for k in self.keys:
self.total_time[k] = 0
self.running_time[k] = None
self.n[k] = 0
return self
def value(self):
vals = {}
for k in self.keys:
if self.n[k] == 0:
raise ValueError("Trying to divide by zero in TimeMeter")
else:
vals[k] = self.total_time[k] / self.n[k]
return vals
|