"""Fine-tune and evaluate an OWSM speech-to-text model from a Gradio UI.

The public entry points are ``baseline_model`` (decode the held-out test
utterances with the pre-trained checkpoint) and ``finetune_model`` (run the
baseline, fine-tune with ESPnet-EZ, decode again and archive the results).
"""
import glob
import logging
import os
import shutil
import sys
import zipfile
from pathlib import Path

from espnet2.tasks.s2t import S2TTask
from espnet2.text.sentencepiece_tokenizer import SentencepiecesTokenizer
from espnet2.text.token_id_converter import TokenIDConverter
from espnet2.s2t.espnet_model import ESPnetS2TModel
from espnet2.bin.s2t_inference import Speech2Text
import espnetez as ez
import torch
import numpy as np
import gradio as gr
import librosa


# Locations of the pre-trained OWSM assets used by every entry point.
_ASSET_DIR = "assets/owsm_ebf_v3.1_base"
_CONFIG_PATH = f"{_ASSET_DIR}/config.yaml"
_CHECKPOINT_PATH = f"{_ASSET_DIR}/owsm_v3.1_base.trained.pth"
_BPE_MODEL_PATH = f"{_ASSET_DIR}/bpe.model"
_TOKEN_LIST_PATH = f"{_ASSET_DIR}/tokens.txt"
_FINETUNE_YAML_PATH = f"{_ASSET_DIR}/owsm_finetune_base.yaml"

# Sampling rate every audio file is resampled to when loaded.
_SAMPLE_RATE = 16000

# Paths (relative to the working directory) collected into ``finetune.zip``.
# Directories are added recursively.
_ARCHIVE_MEMBERS = (
    "exp/stats",
    "exp/finetune/tensorboard",
    "exp/finetune/images",
    "exp/finetune/config.yaml",
    "exp/finetune/valid.acc.ave.pth",
    "output.log",
    "acc.png",
    "loss.png",
    "base.txt",
    "hyp.txt",
    "ref.txt",
)


def log(temp_dir, text):
    """Append ``text`` as a single line to ``{temp_dir}/output.log``.

    The file is written as UTF-8 so that transcripts in any language can be
    logged regardless of the platform's default encoding.
    """
    with open(f"{temp_dir}/output.log", "a", encoding="utf-8") as f:
        f.write(text + "\n")


def count_parameters(model):
    """Return the number of elements in the trainable parameters of ``model``."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


def _device():
    """Return ``"cuda"`` when a CUDA device is available, otherwise ``"cpu"``."""
    return "cuda" if torch.cuda.is_available() else "cpu"


def _require_workdir(tempdir_path):
    """Raise ``gr.Error`` when no working directory has been provided yet."""
    if not tempdir_path:
        raise gr.Error("Please upload a zip file first.")


def _read_transcripts(path):
    """Parse a ``<audio_id> <text>`` file into a list of ``(id, text)`` pairs.

    Blank lines are ignored.

    Raises:
        ValueError: If a non-blank line does not contain both an id and a text.
    """
    entries = []
    with open(path, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue
            fields = line.split(maxsplit=1)
            if len(fields) != 2:
                raise ValueError(
                    f"{path}:{line_no}: expected '<audio_id> <text>', got {line!r}"
                )
            entries.append((fields[0], fields[1].strip()))
    return entries


def get_dataset(data_path, data_info, test_count=10):
    """Load the uploaded corpus and split it into train and test parts.

    ``data_path`` must contain a ``text`` file, a ``text_ctc`` file (both with
    one ``<audio_id> <text>`` entry per line) and an ``audio`` directory whose
    file stems are the audio ids.

    Args:
        data_path: Directory holding ``text``, ``text_ctc`` and ``audio/``.
        data_info: Mapping passed through to ``ez.dataset.ESPnetEZDataset``.
        test_count: Number of leading utterances (in ``text`` order) that are
            held out as the test set.

    Returns:
        A tuple ``(train_dataset, test_dataset, test_list)`` where the first
        two are ``ESPnetEZDataset`` objects and ``test_list`` is the raw list
        of dicts backing ``test_dataset``.

    Raises:
        ValueError: If a transcript line is malformed.
        KeyError: If an utterance listed in ``text`` lacks a ``text_ctc``
            entry or an audio file.
    """
    data = {}
    keys = []
    for audio_id, text in _read_transcripts(f"{data_path}/text"):
        data[audio_id] = {"text": text}
        keys.append(audio_id)

    for audio_id, text in _read_transcripts(f"{data_path}/text_ctc"):
        # Entries without a matching line in ``text`` cannot be used.
        if audio_id in data:
            data[audio_id]["text_ctc"] = text

    for audio_path in glob.glob(f"{data_path}/audio/*"):
        audio_id = Path(audio_path).stem
        # Ignore stray files (e.g. archive metadata) that have no transcript.
        if audio_id in data:
            data[audio_id]["audio_path"] = audio_path

    incomplete = [
        audio_id
        for audio_id in keys
        if "text_ctc" not in data[audio_id] or "audio_path" not in data[audio_id]
    ]
    if incomplete:
        raise KeyError(
            "Missing text_ctc entry or audio file for: " + ", ".join(incomplete)
        )

    # Convert to a list, preserving the order of the ``text`` file.
    data = [
        {
            'id': audio_id,
            'text': data[audio_id]['text'],
            'text_ctc': data[audio_id]['text_ctc'],
            'audio_path': data[audio_id]['audio_path'],
        }
        for audio_id in keys
    ]

    test_list = data[:test_count]
    return (
        ez.dataset.ESPnetEZDataset(data[test_count:], data_info),
        ez.dataset.ESPnetEZDataset(test_list, data_info),
        test_list,
    )


class CustomFinetuneModel(ESPnetS2TModel):
    """``ESPnetS2TModel`` wrapper that periodically logs averaged loss/accuracy.

    The sub-modules of an already built model are reused; every ``log_every``
    forward passes the mean loss and accuracy since the previous report are
    appended to ``{tempdir_path}/output.log``.
    """

    def __init__(self, model, tempdir_path, log_every=500):
        """Wrap ``model`` and set up the running statistics.

        Args:
            model: A built S2T model whose components are reused.
            tempdir_path: Directory that receives ``output.log``.
            log_every: Report interval in forward passes; a falsy value
                disables reporting.
        """
        super().__init__(
            vocab_size=model.vocab_size,
            token_list=model.token_list,
            frontend=model.frontend,
            specaug=model.specaug,
            normalize=model.normalize,
            preencoder=model.preencoder,
            encoder=model.encoder,
            postencoder=model.postencoder,
            decoder=model.decoder,
            ctc=model.ctc,
            ctc_weight=model.ctc_weight,
            interctc_weight=model.interctc_weight,
            ignore_id=model.ignore_id,
            lsm_weight=0.0,
            length_normalized_loss=False,
            report_cer=False,
            report_wer=False,
            # NOTE(review): these symbols were empty strings, which cannot be
            # resolved in the token list; restored to the ESPnet S2T special
            # tokens -- confirm against tokens.txt.
            sym_space="<space>",
            sym_blank="<blank>",
            sym_sos="<sos>",
            sym_eos="<eos>",
            sym_sop="<sop>",  # start of prev
            sym_na="<na>",  # not available
            extract_feats_in_collect_stats=model.extract_feats_in_collect_stats,
        )
        self.iter_count = 0
        self.log_every = log_every
        self.log_stats = {
            'loss': 0.0,
            'acc': 0.0
        }
        self.tempdir_path = tempdir_path

    def forward(self, *args, **kwargs):
        """Run the parent forward pass and update the running statistics.

        Returns:
            Whatever ``ESPnetS2TModel.forward`` returns, unchanged.
        """
        out = super().forward(*args, **kwargs)
        stats = out[1]  # second element carries the 'loss' / 'acc' entries
        self.log_stats['loss'] += stats['loss'].item()
        self.log_stats['acc'] += stats['acc'].item()

        self.iter_count += 1
        # A zero interval would otherwise raise ZeroDivisionError.
        if self.log_every and self.iter_count % self.log_every == 0:
            loss = self.log_stats['loss'] / self.log_every
            acc = self.log_stats['acc'] / self.log_every
            log(self.tempdir_path, f"[{self.iter_count}] - loss: {loss:.3f} - acc: {acc:.3f}")
            self.log_stats['loss'] = 0.0
            self.log_stats['acc'] = 0.0

        return out


def _build_data_info(lang, task):
    """Build the ``data_info`` mapping that turns a corpus entry into tensors."""
    tokenizer = SentencepiecesTokenizer(_BPE_MODEL_PATH)
    converter = TokenIDConverter(_TOKEN_LIST_PATH)

    def tokenize(text):
        return np.array(converter.tokens2ids(tokenizer.text2tokens(text)))

    return {
        "speech": lambda d: librosa.load(d["audio_path"], sr=_SAMPLE_RATE)[0],
        "text": lambda d: tokenize(f"<{lang}><{task}> {d['text']}"),
        "text_ctc": lambda d: tokenize(d["text_ctc"]),
        # NOTE(review): was tokenize(""), which yields an empty float array;
        # restored to the "not available" symbol -- confirm.
        "text_prev": lambda d: tokenize("<na>"),
    }


def _build_speech2text(lang, task, ctc_weight):
    """Create a ``Speech2Text`` decoder from the pre-trained OWSM checkpoint."""
    return Speech2Text(
        _CONFIG_PATH,
        _CHECKPOINT_PATH,
        device=_device(),
        token_type="bpe",
        bpemodel=_BPE_MODEL_PATH,
        beam_size=5,
        ctc_weight=ctc_weight,
        lang_sym=f"<{lang}>",
        task_sym=f"<{task}>",
    )


def _transcribe(model, test_list):
    """Decode every utterance in ``test_list`` and return the output strings."""
    # [0] picks the first returned hypothesis; [3] is presumably its text with
    # special tokens removed -- TODO confirm against Speech2Text.
    return [
        model(librosa.load(item['audio_path'], sr=_SAMPLE_RATE)[0])[0][3]
        for item in test_list
    ]


def _write_lines(path, lines):
    """Write ``lines`` to ``path`` (UTF-8, one per line) and return the text."""
    content = "".join(line + "\n" for line in lines)
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)
    return content


def _archive_results(tempdir_path):
    """Collect the experiment artefacts into ``{tempdir_path}/finetune.zip``.

    Directories are added recursively (``ZipFile.write`` alone only stores the
    directory entry) and members are stored relative to ``tempdir_path``.
    Artefacts that do not exist are logged and skipped so that a missing plot
    does not discard the result of a finished training run.

    Returns:
        The path of the created zip file.
    """
    root = Path(tempdir_path)
    zip_path = f"{tempdir_path}/finetune.zip"

    log(tempdir_path, "Create zip file for the following files into `finetune.zip`:")
    files = []
    for member in _ARCHIVE_MEMBERS:
        target = root / member
        if target.is_dir():
            files.extend(sorted(p for p in target.rglob("*") if p.is_file()))
        elif target.is_file():
            files.append(target)
        else:
            log(tempdir_path, f"{member} (not found, skipped)")
            continue
        log(tempdir_path, member)

    # All log lines are written before output.log itself is archived.
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in files:
            archive.write(path, arcname=path.relative_to(root).as_posix())
    return zip_path


def finetune_model(lang, task, tempdir_path, log_every, max_epoch, scheduler, warmup_steps, optimizer, learning_rate, weight_decay):
    """Main function for finetuning the model.

    Runs the baseline decoding, fine-tunes the pre-trained model on the
    uploaded data with ESPnet-EZ, decodes the test set with the fine-tuned
    weights and archives the experiment outputs.

    Args:
        lang: Language symbol name (used as ``<lang>``).
        task: Task symbol name (used as ``<task>``).
        tempdir_path: Working directory containing the uploaded data.
        log_every: Interval (in forward passes) for loss/accuracy reports.
        max_epoch: Value stored in the ``max_epoch`` training option.
        scheduler: Value stored in the ``scheduler`` training option.
        warmup_steps: Value stored in ``scheduler_conf['warmup_steps']``.
        optimizer: Value stored in the ``optim`` training option.
        learning_rate: Value stored in ``optim_conf['lr']``.
        weight_decay: Value stored in ``optim_conf['weight_decay']``.

    Returns:
        ``([zip_path, ref_path, base_path, hyp_path], ref, base, hyp)`` where
        the last three items are newline-terminated transcript strings.

    Raises:
        gr.Error: If ``tempdir_path`` is empty.
    """
    # Validate first: logging with an empty path would write to "/output.log".
    _require_workdir(tempdir_path)

    log(tempdir_path, "Start generating baseline...")
    gr.Info("Start generating baseline...")
    ref, base = baseline_model(lang, task, tempdir_path)

    log(tempdir_path, "Start generating hypothesis...")
    gr.Info("Start Fine-tuning process...")

    # load dataset and define data_info
    data_info = _build_data_info(lang, task)
    log(tempdir_path, "Loading dataset...")
    train_dataset, test_dataset, test_list = get_dataset(tempdir_path, data_info)
    gr.Info("Loaded dataset.")

    # load and update configuration
    log(tempdir_path, "Setting up the training configuration...")
    pretrain_config = ez.config.from_yaml(
        "s2t",
        _CONFIG_PATH,
    )
    finetune_config = ez.config.update_finetune_config(
        "s2t",
        pretrain_config,
        _FINETUNE_YAML_PATH
    )
    finetune_config['max_epoch'] = max_epoch
    finetune_config['optim'] = optimizer
    finetune_config['optim_conf']['lr'] = learning_rate
    finetune_config['optim_conf']['weight_decay'] = weight_decay
    finetune_config['scheduler'] = scheduler
    finetune_config['scheduler_conf']['warmup_steps'] = warmup_steps
    finetune_config['multiple_iterator'] = False
    finetune_config['num_iters_per_epoch'] = None
    finetune_config['multiprocessing_distributed'] = False

    def build_model_fn(args):
        model, _ = S2TTask.build_model_from_file(
            _CONFIG_PATH,
            _CHECKPOINT_PATH,
            device=_device(),
        )
        model.train()
        log(tempdir_path, f'Trainable parameters: {count_parameters(model)}')
        model = CustomFinetuneModel(model, tempdir_path, log_every=log_every)
        return model

    trainer = ez.Trainer(
        task='s2t',
        train_config=finetune_config,
        train_dataset=train_dataset,
        valid_dataset=test_dataset,
        build_model_fn=build_model_fn,  # provide the pre-trained model
        data_info=data_info,
        output_dir=f"{tempdir_path}/exp/finetune",
        stats_dir=f"{tempdir_path}/exp/stats",
        # Only request a GPU when one exists, matching the device selection
        # used for model loading.
        ngpu=1 if torch.cuda.is_available() else 0,
    )
    gr.Info("start collect stats")
    log(tempdir_path, "Start collect stats process...")
    trainer.collect_stats()

    gr.Info("Finished collect stats, starting training.")
    log(tempdir_path, "Finished collect stats, starting training...")
    trainer.train()
    gr.Info("Finished Fine-tuning!")

    gr.Info("Start generating output for test set!")
    log(tempdir_path, "Start generating output for test set!")
    del trainer

    model = _build_speech2text(lang, task, ctc_weight=0.0)
    model.s2t_model.eval()
    # map_location keeps the load working when the checkpoint was saved from a
    # different device than the one used for decoding.
    state = torch.load(
        f"{tempdir_path}/exp/finetune/valid.acc.ave.pth",
        map_location=_device(),
    )
    model.s2t_model.load_state_dict(state)

    hyp = _write_lines(f"{tempdir_path}/hyp.txt", _transcribe(model, test_list))

    log(tempdir_path, "Finished fine-tuning.")
    log(tempdir_path, "Start archiving experiment files...")
    zip_path = _archive_results(tempdir_path)

    gr.Info("Finished generating result file in zip!")
    log(tempdir_path, "Finished generating result file in zip!")

    return [zip_path, f"{tempdir_path}/ref.txt", f"{tempdir_path}/base.txt", f"{tempdir_path}/hyp.txt"], ref, base, hyp


def baseline_model(lang, task, tempdir_path):
    """Decode the test set with the pre-trained model.

    Writes the reference transcripts to ``{tempdir_path}/ref.txt`` and the
    decoded outputs to ``{tempdir_path}/base.txt``.

    Args:
        lang: Language symbol name (used as ``<lang>``).
        task: Task symbol name (used as ``<task>``).
        tempdir_path: Working directory containing the uploaded data.

    Returns:
        ``(ref, base)``: newline-terminated reference and baseline transcripts.

    Raises:
        gr.Error: If ``tempdir_path`` is empty.
    """
    # Validate before logging: with an empty path the log would be written to
    # "/output.log".
    _require_workdir(tempdir_path)
    log(tempdir_path, "Start loading dataset...")

    # load dataset and define data_info
    data_info = _build_data_info(lang, task)
    _, _, test_list = get_dataset(tempdir_path, data_info)
    log(tempdir_path, "Loaded dataset.")
    gr.Info("Loaded dataset.")

    gr.Info("Loading pretrained model...")
    log(tempdir_path, "Loading pretrained model...")

    model = _build_speech2text(lang, task, ctc_weight=0.3)
    model.s2t_model.eval()

    ref = _write_lines(
        f"{tempdir_path}/ref.txt", [item['text'] for item in test_list]
    )
    base = _write_lines(f"{tempdir_path}/base.txt", _transcribe(model, test_list))

    return ref, base