import torch
from speechbrain.inference.interfaces import Pretrained
import librosa


class ASR(Pretrained):
    """Speech-recognition inference wrapper built on SpeechBrain's ``Pretrained``.

    The class reads ``self.mods.encoder_w2v2`` as the acoustic encoder,
    ``self.hparams.test_search`` as the search/decoding callable and
    ``self.hparams.tokenizer`` to turn predicted token ids into text. These
    objects are supplied by the hyperparameter file used to load the model.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``Pretrained.__init__``."""
        super().__init__(*args, **kwargs)

    def encode_batch(self, wavs, wav_lens=None, normalize=False):
        """Transcribe a batch of waveforms into lists of words.

        Args:
            wavs: Batch of waveforms; the first dimension is used as the
                batch size. Presumably shaped ``(batch, time)`` -- confirm
                against the encoder in the hyperparameter file.
            wav_lens: Per-item lengths handed to the searcher. When ``None``,
                a tensor of ones (one per batch item) is used. Presumably
                these are SpeechBrain relative lengths in ``[0, 1]``.
            normalize: Unused; kept only so existing callers that pass it
                keep working.

        Returns:
            One list per batch item, each holding the decoded text split on
            single spaces.

        Side effects:
            Stores the device-resident lengths on ``self.wav_lens``.
        """
        wavs = wavs.to(self.device)
        if wav_lens is None:
            # The signature advertises ``None`` as the default; treat it as
            # "every item spans the full padded length" instead of crashing.
            wav_lens = torch.ones(wavs.size(0), device=self.device)
        self.wav_lens = wav_lens.to(self.device)

        # Only decoded strings leave this method, so no autograd graph is
        # needed for the encoder forward pass or the search.
        with torch.no_grad():
            encoded_outputs = self.mods.encoder_w2v2(wavs)
            predictions = self.hparams.test_search(
                encoded_outputs, self.wav_lens
            )[0]

        predicted_words = [
            self.hparams.tokenizer.decode_ids(prediction).split(" ")
            for prediction in predictions
        ]
        # NOTE: repetition filtering is currently disabled. To enable it,
        # pass each word list through ``self.filter_repetitions(sent, 3)``
        # before returning.
        return predicted_words

    def filter_repetitions(self, seq, max_repetition_length):
        """Drop excess consecutive repetitions of n-grams from ``seq``.

        For every n-gram size ``n`` from ``len(seq) // 2`` down to 1, tokens
        are distributed round-robin over ``n`` buffers. While each incoming
        token equals the last token of its buffer, the buffers grow (an
        n-gram is repeating). When a token differs, the buffers are flushed
        and only the first ``max(max_repetition_length // n, 1)`` rounds of
        ``n`` tokens are emitted. The result of one pass is the input of the
        next, smaller ``n``.

        Args:
            seq: Iterable of tokens (e.g. words). Tokens are compared with
                ``!=``. ``None`` is used internally as a deletion marker, so
                input tokens should not be ``None``.
            max_repetition_length: Budget, in tokens, used to derive the
                number of repetition rounds kept for each ``n``.

        Returns:
            A new list with the filtered tokens. Inputs too short for any
            ``n`` to be processed are returned as an unchanged copy.
        """
        seq = list(seq)
        output = []
        max_n = len(seq) // 2
        for n in range(max_n, 0, -1):
            max_repetitions = max(max_repetition_length // n, 1)
            # Don't need to iterate over impossible n values:
            # len(seq) can change a lot during iteration
            if (len(seq) <= n*2) or (len(seq) <= max_repetition_length):
                continue
            iterator = enumerate(seq)
            # Fill first buffers:
            buffers = [[next(iterator)[1]] for _ in range(n)]
            for seq_index, token in iterator:
                current_buffer = seq_index % n
                if token != buffers[current_buffer][-1]:
                    # No repeat, we can flush some tokens
                    buf_len = sum(map(len, buffers))
                    flush_start = (current_buffer-buf_len) % n
                    # Keep n-1 tokens, but possibly mark some for removal
                    for flush_index in range(buf_len - buf_len%n):
                        if (buf_len - flush_index) > n-1:
                            to_flush = buffers[(flush_index + flush_start) % n].pop(0)
                        else:
                            to_flush = None
                        # Here, repetitions get removed:
                        if (flush_index // n < max_repetitions) and to_flush is not None:
                            output.append(to_flush)
                        elif (flush_index // n >= max_repetitions) and to_flush is None:
                            # A ``None`` marker asks the clean-up pass below
                            # to delete one of the tokens that follow it.
                            output.append(to_flush)
                buffers[current_buffer].append(token)
            # At the end, final flush
            current_buffer += 1
            buf_len = sum(map(len, buffers))
            flush_start = (current_buffer-buf_len) % n
            for flush_index in range(buf_len):
                to_flush = buffers[(flush_index + flush_start) % n].pop(0)
                # Here, repetitions just get removed:
                if flush_index // n < max_repetitions:
                    output.append(to_flush)
            # Resolve deletion markers: each ``None`` removes one later token.
            seq = []
            to_delete = 0
            for token in output:
                if token is None:
                    to_delete += 1
                elif to_delete > 0:
                    to_delete -= 1
                else:
                    seq.append(token)
            output = []
        return seq

    def classify_file(self, path):
        """Transcribe a single audio file.

        The file is loaded with ``librosa.load`` at a 16 kHz sampling rate,
        wrapped into a batch of one with a length of ``1.0`` and passed to
        ``encode_batch``.

        Args:
            path: Location of the audio file, in any form ``librosa.load``
                accepts.

        Returns:
            The output of ``encode_batch`` for the one-item batch: a list
            containing a single list of words.
        """
        # waveform = self.load_audio(path)
        waveform, _ = librosa.load(path, sr=16000)
        waveform = torch.tensor(waveform)

        # Fake a batch:
        batch = waveform.unsqueeze(0)
        rel_length = torch.tensor([1.0])
        outputs = self.encode_batch(batch, rel_length)
        return outputs

    # def forward(self, wavs, wav_lens=None):
    #     return self.encode_batch(wavs=wavs, wav_lens=wav_lens)