# -*- coding: utf-8 -*- import os import sys import torch import logging import speechbrain as sb from speechbrain.utils.distributed import run_on_main from hyperpyyaml import load_hyperpyyaml from pathlib import Path import torchaudio.transforms as T from cv_train import ASRCV import torchaudio import numpy as np import kenlm from pyctcdecode import build_ctcdecoder import re from torch.nn.utils.rnn import pad_sequence import torch.optim as optim import torch.nn as nn # Commented out IPython magic to ensure Python compatibility. hparams_file, run_opts, overrides = sb.parse_arguments(["hparams/train_semi.yaml"]) # If distributed_launch=True then # create ddp_group with the right communication protocol sb.utils.distributed.ddp_init_group(run_opts) with open(hparams_file) as fin: hparams = load_hyperpyyaml(fin, overrides) # Create experiment directory sb.create_experiment_directory( experiment_directory=hparams["output_folder"], hyperparams_to_save=hparams_file, overrides=overrides, ) # Dataset prep (parsing Librispeech) def dataio_prepare(hparams): """This function prepares the datasets to be used in the brain class. It also defines the data processing pipeline through user-defined functions.""" # 1. Define datasets data_folder = hparams["data_folder"] train_data = sb.dataio.dataset.DynamicItemDataset.from_csv( csv_path=hparams["train_csv"], replacements={"data_root": data_folder}, ) if hparams["sorting"] == "ascending": # we sort training data to speed up training and get better results. train_data = train_data.filtered_sorted( sort_key="duration", key_max_value={"duration": hparams["avoid_if_longer_than"]}, ) # when sorting do not shuffle in dataloader ! otherwise is pointless hparams["dataloader_options"]["shuffle"] = False elif hparams["sorting"] == "descending": train_data = train_data.filtered_sorted( sort_key="duration", reverse=True, key_max_value={"duration": hparams["avoid_if_longer_than"]}, ) # when sorting do not shuffle in dataloader ! otherwise is pointless hparams["dataloader_options"]["shuffle"] = False elif hparams["sorting"] == "random": pass else: raise NotImplementedError( "sorting must be random, ascending or descending" ) valid_data = sb.dataio.dataset.DynamicItemDataset.from_csv( csv_path=hparams["valid_csv"], replacements={"data_root": data_folder}, ) # We also sort the validation data so it is faster to validate valid_data = valid_data.filtered_sorted(sort_key="duration") test_datasets = {} for csv_file in hparams["test_csv"]: name = Path(csv_file).stem test_datasets[name] = sb.dataio.dataset.DynamicItemDataset.from_csv( csv_path=csv_file, replacements={"data_root": data_folder} ) test_datasets[name] = test_datasets[name].filtered_sorted( sort_key="duration" ) datasets = [train_data, valid_data] + [i for k, i in test_datasets.items()] # 2. Define audio pipeline: @sb.utils.data_pipeline.takes("wav") @sb.utils.data_pipeline.provides("sig") def audio_pipeline(wav): info = torchaudio.info(wav) sig = sb.dataio.dataio.read_audio(wav) if len(sig.shape)>1 : sig = torch.mean(sig, dim=1) resampled = torchaudio.transforms.Resample( info.sample_rate, hparams["sample_rate"], )(sig) return resampled sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline) label_encoder = sb.dataio.encoder.CTCTextEncoder() # 3. Define text pipeline: @sb.utils.data_pipeline.takes("wrd") @sb.utils.data_pipeline.provides( "wrd", "char_list", "tokens_list", "tokens" ) def text_pipeline(wrd): yield wrd char_list = list(wrd) yield char_list tokens_list = label_encoder.encode_sequence(char_list) yield tokens_list tokens = torch.LongTensor(tokens_list) yield tokens sb.dataio.dataset.add_dynamic_item(datasets, text_pipeline) lab_enc_file = os.path.join(hparams["save_folder"], "label_encoder.txt") special_labels = { "blank_label": hparams["blank_index"], "unk_label": hparams["unk_index"] } label_encoder.load_or_create( path=lab_enc_file, from_didatasets=[train_data], output_key="char_list", special_labels=special_labels, sequence_input=True, ) # 4. Set output: sb.dataio.dataset.set_output_keys( datasets, ["id", "sig", "wrd", "char_list", "tokens"], ) return train_data, valid_data,test_datasets, label_encoder class ASR(sb.core.Brain): def compute_forward(self, batch, stage): """Forward computations from the waveform batches to the output probabilities.""" batch = batch.to(self.device) wavs, wav_lens = batch.sig wavs, wav_lens = wavs.to(self.device), wav_lens.to(self.device) if stage == sb.Stage.TRAIN: if hasattr(self.hparams, "augmentation"): wavs = self.hparams.augmentation(wavs, wav_lens) # Forward pass feats = self.modules.wav2vec2(wavs, wav_lens) x = self.modules.enc(feats) logits = self.modules.ctc_lin(x) p_ctc = self.hparams.log_softmax(logits) return p_ctc, wav_lens def custom_encode(self,wavs,wav_lens) : wavs = wavs.to(self.device) if(wav_lens is not None): wav_lens.to(self.device) feats = self.modules.wav2vec2(wavs, wav_lens) x = self.modules.enc(feats) logits = self.modules.ctc_lin(x) p_ctc = self.hparams.log_softmax(logits) return feats,p_ctc def compute_objectives(self, predictions, batch, stage): """Computes the loss (CTC) given predictions and targets.""" p_ctc, wav_lens = predictions ids = batch.id tokens, tokens_lens = batch.tokens loss = self.hparams.ctc_cost(p_ctc, tokens, wav_lens, tokens_lens) if stage != sb.Stage.TRAIN: predicted_tokens = sb.decoders.ctc_greedy_decode( p_ctc, wav_lens, blank_id=self.hparams.blank_index ) # Decode token terms to words if self.hparams.use_language_modelling: predicted_words = [] for logs in p_ctc: text = decoder.decode(logs.detach().cpu().numpy()) predicted_words.append(text.split(" ")) else: predicted_words = [ "".join(self.tokenizer.decode_ndim(utt_seq)).split(" ") for utt_seq in predicted_tokens ] # Convert indices to words target_words = [wrd.split(" ") for wrd in batch.wrd] self.wer_metric.append(ids, predicted_words, target_words) self.cer_metric.append(ids, predicted_words, target_words) return loss def fit_batch(self, batch): """Train the parameters given a single batch in input""" should_step = self.step % self.grad_accumulation_factor == 0 # Managing automatic mixed precision # TOFIX: CTC fine-tuning currently is unstable # This is certainly due to CTC being done in fp16 instead of fp32 if self.auto_mix_prec: with torch.cuda.amp.autocast(): with self.no_sync(): outputs = self.compute_forward(batch, sb.Stage.TRAIN) loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN) with self.no_sync(not should_step): self.scaler.scale( loss / self.grad_accumulation_factor ).backward() if should_step: if not self.hparams.wav2vec2.freeze: self.scaler.unscale_(self.wav2vec_optimizer) self.scaler.unscale_(self.model_optimizer) if self.check_gradients(loss): if not self.hparams.wav2vec2.freeze: if self.optimizer_step >= self.hparams.warmup_steps: self.scaler.step(self.wav2vec_optimizer) self.scaler.step(self.model_optimizer) self.scaler.update() self.zero_grad() self.optimizer_step += 1 else: # This is mandatory because HF models have a weird behavior with DDP # on the forward pass with self.no_sync(): outputs = self.compute_forward(batch, sb.Stage.TRAIN) loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN) with self.no_sync(not should_step): (loss / self.grad_accumulation_factor).backward() if should_step: if self.check_gradients(loss): if not self.hparams.wav2vec2.freeze: if self.optimizer_step >= self.hparams.warmup_steps: self.wav2vec_optimizer.step() self.model_optimizer.step() self.zero_grad() self.optimizer_step += 1 self.on_fit_batch_end(batch, outputs, loss, should_step) return loss.detach().cpu() def evaluate_batch(self, batch, stage): """Computations needed for validation/test batches""" predictions = self.compute_forward(batch, stage=stage) with torch.no_grad(): loss = self.compute_objectives(predictions, batch, stage=stage) return loss.detach() def on_stage_start(self, stage, epoch): """Gets called at the beginning of each epoch""" if stage != sb.Stage.TRAIN: self.cer_metric = self.hparams.cer_computer() self.wer_metric = self.hparams.error_rate_computer() def on_stage_end(self, stage, stage_loss, epoch): """Gets called at the end of an epoch.""" # Compute/store important stats stage_stats = {"loss": stage_loss} if stage == sb.Stage.TRAIN: self.train_stats = stage_stats else: stage_stats["CER"] = self.cer_metric.summarize("error_rate") stage_stats["WER"] = self.wer_metric.summarize("error_rate") # Perform end-of-iteration things, like annealing, logging, etc. if stage == sb.Stage.VALID: old_lr_model, new_lr_model = self.hparams.lr_annealing_model( stage_stats["loss"] ) old_lr_wav2vec, new_lr_wav2vec = self.hparams.lr_annealing_wav2vec( stage_stats["loss"] ) sb.nnet.schedulers.update_learning_rate( self.model_optimizer, new_lr_model ) if not self.hparams.wav2vec2.freeze: sb.nnet.schedulers.update_learning_rate( self.wav2vec_optimizer, new_lr_wav2vec ) self.hparams.train_logger.log_stats( stats_meta={ "epoch": epoch, "lr_model": old_lr_model, "lr_wav2vec": old_lr_wav2vec, }, train_stats=self.train_stats, valid_stats=stage_stats, ) self.checkpointer.save_and_keep_only( meta={"WER": stage_stats["WER"]}, min_keys=["WER"], ) elif stage == sb.Stage.TEST: self.hparams.train_logger.log_stats( stats_meta={"Epoch loaded": self.hparams.epoch_counter.current}, test_stats=stage_stats, ) with open(self.hparams.wer_file, "w") as w: self.wer_metric.write_stats(w) def init_optimizers(self): "Initializes the wav2vec2 optimizer and model optimizer" # If the wav2vec encoder is unfrozen, we create the optimizer if not self.hparams.wav2vec2.freeze: self.wav2vec_optimizer = self.hparams.wav2vec_opt_class( self.modules.wav2vec2.parameters() ) if self.checkpointer is not None: self.checkpointer.add_recoverable( "wav2vec_opt", self.wav2vec_optimizer ) self.model_optimizer = self.hparams.model_opt_class( self.hparams.model.parameters() ) if self.checkpointer is not None: self.checkpointer.add_recoverable("modelopt", self.model_optimizer) def zero_grad(self, set_to_none=False): if not self.hparams.wav2vec2.freeze: self.wav2vec_optimizer.zero_grad(set_to_none) self.model_optimizer.zero_grad(set_to_none) from speechbrain.pretrained import EncoderASR,EncoderDecoderASR french_asr_model = EncoderASR.from_hparams(source="speechbrain/asr-wav2vec2-commonvoice-fr", savedir="pretrained_models/asr-wav2vec2-commonvoice-fr").cuda() cvhparams_file, cvrun_opts, cvoverrides = sb.parse_arguments(["en_cv.yaml"]) with open(cvhparams_file) as cvfin: cvhparams = load_hyperpyyaml(cvfin, cvoverrides) english_asr_model = ASRCV( modules=cvhparams["modules"], hparams=cvhparams, run_opts=cvrun_opts, checkpointer=cvhparams["checkpointer"], ) english_asr_model.checkpointer.recover_if_possible() asr_brain = ASR( modules=hparams["modules"], hparams=hparams, run_opts=run_opts, checkpointer=hparams["checkpointer"], ) asr_brain.checkpointer.recover_if_possible() asr_brain.modules.eval() english_asr_model.modules.eval() french_asr_model.mods.eval() # Commented out IPython magic to ensure Python compatibility. # %ls #UTILS FUNCTIOJNS def get_size_dimensions(arr): size_dimensions = [] while isinstance(arr, list): size_dimensions.append(len(arr)) arr = arr[0] return size_dimensions def scale_array(batch,n): scaled_batch = [] for array in batch: if(n < len(array)): raise ValueError("Cannot scale Array down") repeat = round(n/len(array))+1 scaled_length_array= [] for i in array: for j in range(repeat) : if(len(scaled_length_array) == n): break scaled_length_array.append(i) scaled_batch.append(scaled_length_array) return torch.tensor(scaled_batch) def load_paths(wavs_path): waveforms = [] for path in wavs_path : waveform, _ = torchaudio.load(path) waveforms.append(waveform.squeeze(0)) # normalize array length to the bigger arrays by pading with 0's padded_arrays = pad_sequence(waveforms, batch_first=True) return torch.tensor(padded_arrays) device = 'cuda' verbose = 0 #FLOW LEVEL FUNCTIONS def merge_strategy(embeddings1, embeddings2, embeddings3,post1, post2,post3): post1 = post1.to(device) post2 = post2.to(device) post3 = post3.to(device) embeddings1 = embeddings1.to(device) embeddings2 = embeddings2.to(device) embeddings3 = embeddings3.to(device) posteriograms_merged = torch.cat((post1,post2,post3),dim=2) embeddings_merged = torch.cat((embeddings1,embeddings2,embeddings3),dim=2) if(verbose !=0): print('MERGED POST ',posteriograms_merged.shape) print('MERGED emb ',embeddings_merged.shape) return torch.cat((posteriograms_merged,embeddings_merged),dim=2).to(device) def decode(model,wavs,wav_lens): with torch.no_grad(): wav_lens = wav_lens.to(model.device) encoder_out = model.encode_batch(wavs, wav_lens) predictions = model.decoding_function(encoder_out, wav_lens) return predictions def middle_layer(batch, lens): tn_embeddings, tn_posteriogram = asr_brain.custom_encode(batch,None) fr_embeddings = french_asr_model.mods.encoder.wav2vec2(batch) fr_posteriogram =french_asr_model.encode_batch(batch,lens) en_embeddings = english_asr_model.modules.wav2vec2(batch, lens) x = english_asr_model.modules.enc(en_embeddings) en_posteriogram = english_asr_model.modules.ctc_lin(x) #scores, en_posteriogram = english_asr_model.mods.decoder(en_embeddings ,lens) if(verbose !=0): print('[EMBEDDINGS] FR:',fr_embeddings.shape, "EN:",en_embeddings.shape, "TN:", tn_embeddings.shape) print('[POSTERIOGRAM] FR:',fr_posteriogram.shape, "EN:",en_posteriogram.shape,"TN:",tn_posteriogram.shape) bilangual_sample = merge_strategy(fr_embeddings,en_embeddings,tn_embeddings,fr_posteriogram,en_posteriogram,tn_posteriogram) return bilangual_sample class Mixer(sb.core.Brain): def compute_forward(self, batch, stage): """Forward computations from the waveform batches to the output probabilities.""" wavs, wav_lens = batch.sig wavs, wav_lens = wavs.to(self.device), wav_lens.to(self.device) if stage == sb.Stage.TRAIN: if hasattr(self.hparams, "augmentation"): wavs = self.hparams.augmentation(wavs, wav_lens) multi_langual_feats = middle_layer(wavs, wav_lens) multi_langual_feats= multi_langual_feats.to(device) feats, _ = self.modules.enc(multi_langual_feats) logits = self.modules.ctc_lin(feats) p_ctc = self.hparams.log_softmax(logits) if stage!= sb.Stage.TRAIN: p_tokens = sb.decoders.ctc_greedy_decode( p_ctc, wav_lens, blank_id=self.hparams.blank_index ) else : p_tokens = None return p_ctc, wav_lens, p_tokens def compute_objectives(self, predictions, batch, stage): """Computes the loss (CTC) given predictions and targets.""" p_ctc, wav_lens , predicted_tokens= predictions ids = batch.id tokens, tokens_lens = batch.tokens loss = self.hparams.ctc_cost(p_ctc, tokens, wav_lens, tokens_lens) if stage == sb.Stage.VALID: predicted_words = [ "".join(self.tokenizer.decode_ndim(utt_seq)).split(" ") for utt_seq in predicted_tokens ] target_words = [wrd.split(" ") for wrd in batch.wrd] self.wer_metric.append(ids, predicted_words, target_words) self.cer_metric.append(ids, predicted_words, target_words) if stage ==sb.Stage.TEST : if self.hparams.language_modelling: predicted_words = [] for logs in p_ctc: text = decoder.decode(logs.detach().cpu().numpy()) predicted_words.append(text.split(" ")) else : predicted_words = [ "".join(self.tokenizer.decode_ndim(utt_seq)).split(" ") for utt_seq in predicted_tokens ] target_words = [wrd.split(" ") for wrd in batch.wrd] self.wer_metric.append(ids, predicted_words, target_words) self.cer_metric.append(ids, predicted_words, target_words) return loss def fit_batch(self, batch): """Train the parameters given a single batch in input""" should_step = self.step % self.grad_accumulation_factor == 0 # Managing automatic mixed precision # TOFIX: CTC fine-tuning currently is unstable # This is certainly due to CTC being done in fp16 instead of fp32 if self.auto_mix_prec: with torch.cuda.amp.autocast(): with self.no_sync(): outputs = self.compute_forward(batch, sb.Stage.TRAIN) loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN) with self.no_sync(not should_step): self.scaler.scale( loss / self.grad_accumulation_factor ).backward() if should_step: self.scaler.unscale_(self.model_optimizer) if self.check_gradients(loss): self.scaler.step(self.model_optimizer) self.scaler.update() self.zero_grad() self.optimizer_step += 1 else: # This is mandatory because HF models have a weird behavior with DDP # on the forward pass with self.no_sync(): outputs = self.compute_forward(batch, sb.Stage.TRAIN) loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN) with self.no_sync(not should_step): (loss / self.grad_accumulation_factor).backward() if should_step: if self.check_gradients(loss): self.model_optimizer.step() self.zero_grad() self.optimizer_step += 1 self.on_fit_batch_end(batch, outputs, loss, should_step) return loss.detach().cpu() def evaluate_batch(self, batch, stage): """Computations needed for validation/test batches""" predictions = self.compute_forward(batch, stage=stage) with torch.no_grad(): loss = self.compute_objectives(predictions, batch, stage=stage) return loss.detach() def on_stage_start(self, stage, epoch): """Gets called at the beginning of each epoch""" if stage != sb.Stage.TRAIN: self.cer_metric = self.hparams.cer_computer() self.wer_metric = self.hparams.error_rate_computer() def on_stage_end(self, stage, stage_loss, epoch): """Gets called at the end of an epoch.""" # Compute/store important stats stage_stats = {"loss": stage_loss} if stage == sb.Stage.TRAIN: self.train_stats = stage_stats else: stage_stats["CER"] = self.cer_metric.summarize("error_rate") stage_stats["WER"] = self.wer_metric.summarize("error_rate") # Perform end-of-iteration things, like annealing, logging, etc. if stage == sb.Stage.VALID: old_lr_model, new_lr_model = self.hparams.lr_annealing_model( stage_stats["loss"] ) sb.nnet.schedulers.update_learning_rate( self.model_optimizer, new_lr_model ) self.hparams.train_logger.log_stats( stats_meta={ "epoch": epoch, "lr_model": old_lr_model, }, train_stats=self.train_stats, valid_stats=stage_stats, ) self.checkpointer.save_and_keep_only( meta={"WER": stage_stats["WER"]}, min_keys=["WER"], ) elif stage == sb.Stage.TEST: self.hparams.train_logger.log_stats( stats_meta={"Epoch loaded": self.hparams.epoch_counter.current}, test_stats=stage_stats, ) with open(self.hparams.wer_file, "w") as w: self.wer_metric.write_stats(w) def init_optimizers(self): self.model_optimizer = self.hparams.model_opt_class( self.hparams.model.parameters() ) if self.checkpointer is not None: self.checkpointer.add_recoverable("modelopt", self.model_optimizer) def zero_grad(self, set_to_none=False): self.model_optimizer.zero_grad(set_to_none) hparams_file, run_opts, overrides = sb.parse_arguments(sys.argv[1:]) # If distributed_launch=True then # create ddp_group with the right communication protocol sb.utils.distributed.ddp_init_group(run_opts) with open(hparams_file) as fin: hparams = load_hyperpyyaml(fin, overrides) # Create experiment directory sb.create_experiment_directory( experiment_directory=hparams["output_folder"], hyperparams_to_save=hparams_file, overrides=overrides, ) def read_labels_file(labels_file): with open(labels_file, "r",encoding="utf-8") as lf: lines = lf.read().splitlines() division = "===" numbers = {} for line in lines : if division in line : break string, number = line.split("=>") number = int(number) string = string[1:-2] numbers[number] = string return [numbers[x] for x in range(len(numbers))] train_data, valid_data, test_datasets, label_encoder = dataio_prepare( hparams ) labels = read_labels_file(os.path.join(hparams["save_folder"], "label_encoder.txt")) labels = [""] + labels[1:-1] + ["1"] if hparams["language_modelling"]: decoder = build_ctcdecoder( labels, kenlm_model_path=hparams["ngram_lm_path"], # either .arpa or .bin file alpha=0.5, # tuned on a val set beta=1, # tuned on a val set ) mixer = Mixer( modules=hparams["modules"], hparams=hparams, run_opts=run_opts, checkpointer=hparams["checkpointer"], ) mixer.tokenizer = label_encoder mixer.fit( mixer.hparams.epoch_counter, train_data, valid_data, train_loader_kwargs=hparams["dataloader_options"], valid_loader_kwargs=hparams["test_dataloader_options"], ) print(test_datasets.keys()) for k in test_datasets.keys(): # keys are test_clean, test_other etc mixer.hparams.wer_file = os.path.join( hparams["output_folder"], "wer_{}.txt".format(k) ) mixer.evaluate( test_datasets[k], test_loader_kwargs=hparams["test_dataloader_options"] )