wav2vec2-aed-macedonian-asr / custom_interface_app_streaming.py
Porjaz's picture
Rename custom_interface_app.py to custom_interface_app_streaming.py
2ca05be verified
import torch
from speechbrain.inference.interfaces import Pretrained
import librosa
import numpy as np
class ASR(Pretrained):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def encode_batch_w2v2(self, device, wavs, wav_lens=None, normalize=False):
wavs = wavs.to(device)
wav_lens = wav_lens.to(device)
# Forward pass
encoded_outputs = self.mods.encoder_w2v2(wavs.detach())
# append
tokens_bos = torch.zeros((wavs.size(0), 1), dtype=torch.long).to(device)
embedded_tokens = self.mods.embedding(tokens_bos)
decoder_outputs, _ = self.mods.decoder(embedded_tokens, encoded_outputs, wav_lens)
# Output layer for seq2seq log-probabilities
predictions = self.hparams.test_search(encoded_outputs, wav_lens)[0]
# predicted_words = [self.hparams.tokenizer.decode_ids(prediction).split(" ") for prediction in predictions]
predicted_words = []
for prediction in predictions:
prediction = [token for token in prediction if token != 0]
predicted_words.append(self.hparams.tokenizer.decode_ids(prediction).split(" "))
prediction = []
for sent in predicted_words:
sent = self.filter_repetitions(sent, 3)
prediction.append(sent)
predicted_words = prediction
return predicted_words
def encode_batch_whisper(self, device, wavs, wav_lens=None, normalize=False):
wavs = wavs.to(device)
wav_lens = wav_lens.to(device)
# Forward encoder + decoder
tokens = torch.tensor([[1, 1]]) * self.mods.whisper.config.decoder_start_token_id
tokens = tokens.to(device)
enc_out, logits, _ = self.mods.whisper(wavs.detach(), tokens.detach())
log_probs = self.hparams.log_softmax(logits)
hyps, _, _, _ = self.hparams.test_search(enc_out.detach(), wav_lens)
predicted_words = [self.mods.whisper.tokenizer.decode(token, skip_special_tokens=True).strip() for token in hyps]
return predicted_words
def filter_repetitions(self, seq, max_repetition_length):
seq = list(seq)
output = []
max_n = len(seq) // 2
for n in range(max_n, 0, -1):
max_repetitions = max(max_repetition_length // n, 1)
# Don't need to iterate over impossible n values:
# len(seq) can change a lot during iteration
if (len(seq) <= n*2) or (len(seq) <= max_repetition_length):
continue
iterator = enumerate(seq)
# Fill first buffers:
buffers = [[next(iterator)[1]] for _ in range(n)]
for seq_index, token in iterator:
current_buffer = seq_index % n
if token != buffers[current_buffer][-1]:
# No repeat, we can flush some tokens
buf_len = sum(map(len, buffers))
flush_start = (current_buffer-buf_len) % n
# Keep n-1 tokens, but possibly mark some for removal
for flush_index in range(buf_len - buf_len%n):
if (buf_len - flush_index) > n-1:
to_flush = buffers[(flush_index + flush_start) % n].pop(0)
else:
to_flush = None
# Here, repetitions get removed:
if (flush_index // n < max_repetitions) and to_flush is not None:
output.append(to_flush)
elif (flush_index // n >= max_repetitions) and to_flush is None:
output.append(to_flush)
buffers[current_buffer].append(token)
# At the end, final flush
current_buffer += 1
buf_len = sum(map(len, buffers))
flush_start = (current_buffer-buf_len) % n
for flush_index in range(buf_len):
to_flush = buffers[(flush_index + flush_start) % n].pop(0)
# Here, repetitions just get removed:
if flush_index // n < max_repetitions:
output.append(to_flush)
seq = []
to_delete = 0
for token in output:
if token is None:
to_delete += 1
elif to_delete > 0:
to_delete -= 1
else:
seq.append(token)
output = []
return seq
def increase_volume(self, waveform, threshold_db=-25):
# Measure loudness using RMS
loudness_vector = librosa.feature.rms(y=waveform)
average_loudness = np.mean(loudness_vector)
average_loudness_db = librosa.amplitude_to_db(average_loudness)
print(f"Average Loudness: {average_loudness_db} dB")
# Check if loudness is below threshold and apply gain if needed
if average_loudness_db < threshold_db:
# Calculate gain needed
gain_db = threshold_db - average_loudness_db
gain = librosa.db_to_amplitude(gain_db) # Convert dB to amplitude factor
# Apply gain to the audio signal
waveform = waveform * gain
loudness_vector = librosa.feature.rms(y=waveform)
average_loudness = np.mean(loudness_vector)
average_loudness_db = librosa.amplitude_to_db(average_loudness)
print(f"Average Loudness: {average_loudness_db} dB")
return waveform
def classify_file_w2v2(self, waveform, device):
# Load the audio file
# waveform, sr = librosa.load(path, sr=16000)
# Get audio length in seconds
audio_length = len(waveform) / 16000
if audio_length >= 20:
# split audio every 20 seconds
segments = []
max_duration = 20 * 16000 # Maximum segment duration in samples (20 seconds)
num_segments = int(np.ceil(len(waveform) / max_duration))
start = 0
for i in range(num_segments):
end = start + max_duration
if end > len(waveform):
end = len(waveform)
segment_part = waveform[start:end]
segment_len = len(segment_part) / 16000
if segment_len < 1:
continue
segments.append(segment_part)
start = end
for segment in segments:
segment_tensor = torch.tensor(segment).to(device)
# Fake a batch for the segment
batch = segment_tensor.unsqueeze(0).to(device)
rel_length = torch.tensor([1.0]).to(device) # Adjust if necessary
# Pass the segment through the ASR model
segment_output = self.encode_batch_w2v2(device, batch, rel_length)
yield segment_output
else:
waveform = torch.tensor(waveform).to(device)
waveform = waveform.to(device)
# Fake a batch:
batch = waveform.unsqueeze(0)
rel_length = torch.tensor([1.0]).to(device)
outputs = self.encode_batch_w2v2(device, batch, rel_length)
yield outputs
def classify_file_whisper_mkd(self, path, device):
# Load the audio file
waveform, sr = librosa.load(path, sr=16000)
# Get audio length in seconds
audio_length = len(waveform) / sr
if audio_length >= 20:
# split audio every 20 seconds
segments = []
max_duration = 20 * sr # Maximum segment duration in samples (20 seconds)
num_segments = int(np.ceil(len(waveform) / max_duration))
start = 0
for i in range(num_segments):
end = start + max_duration
if end > len(waveform):
end = len(waveform)
segment_part = waveform[start:end]
segment_len = len(segment_part) / sr
if segment_len < 1:
continue
segments.append(segment_part)
start = end
for segment in segments:
segment_tensor = torch.tensor(segment).to(device)
# Fake a batch for the segment
batch = segment_tensor.unsqueeze(0).to(device)
batch = batch.to(torch.float16)
rel_length = torch.tensor([1.0], dtype=torch.float16).to(device)
# Pass the segment through the ASR model
segment_output = self.encode_batch_whisper(device, batch, rel_length)
yield segment_output
else:
waveform = torch.tensor(waveform).to(device)
waveform = waveform.to(device)
batch = waveform.unsqueeze(0)
batch = batch.to(torch.float16)
rel_length = torch.tensor([1.0], dtype=torch.float16).to(device)
outputs = self.encode_batch_whisper(device, batch, rel_length)
yield outputs
def classify_file_whisper(self, path, pipe, device):
waveform, sr = librosa.load(path, sr=16000)
transcription = pipe(waveform, generate_kwargs={"language": "macedonian"})["text"]
return transcription
def classify_file_mms(self, path, processor, model, device):
# Load the audio file
waveform, sr = librosa.load(path, sr=16000)
# Get audio length in seconds
audio_length = len(waveform) / sr
if audio_length >= 20:
# split audio every 20 seconds
segments = []
max_duration = 20 * sr # Maximum segment duration in samples (20 seconds)
num_segments = int(np.ceil(len(waveform) / max_duration))
start = 0
for i in range(num_segments):
end = start + max_duration
if end > len(waveform):
end = len(waveform)
segment_part = waveform[start:end]
segment_len = len(segment_part) / sr
if segment_len < 1:
continue
segments.append(segment_part)
start = end
for segment in segments:
segment_tensor = torch.tensor(segment).to(device)
# Pass the segment through the ASR model
inputs = processor(segment_tensor, sampling_rate=16_000, return_tensors="pt").to(device)
inputs['input_values'] = inputs['input_values']
outputs = model(**inputs).logits
ids = torch.argmax(outputs, dim=-1)[0]
segment_output = processor.decode(ids)
yield segment_output
else:
waveform = torch.tensor(waveform).to(device)
inputs = processor(waveform, sampling_rate=16_000, return_tensors="pt").to(device)
inputs['input_values'] = inputs['input_values']
outputs = model(**inputs).logits
ids = torch.argmax(outputs, dim=-1)[0]
transcription = processor.decode(ids)
yield transcription