|
# Path to the fine-tuned Whisper checkpoint evaluated at the bottom of this script.
fine_tuning_dir = "fine_tuned/SSD/model/Negel_79_AVA_script_conv_train_conv_dev/checkpoint-50"
|
|
|
from typing import Any, Dict, List, Union |
|
from dataclasses import dataclass |
|
from transformers import Seq2SeqTrainer |
|
from transformers import WhisperProcessor, WhisperForConditionalGeneration, WhisperTokenizer, WhisperFeatureExtractor, Seq2SeqTrainingArguments, Seq2SeqTrainer, WhisperModel |
|
import evaluate |
|
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC |
|
from random import sample |
|
from sys import flags |
|
import gradio as gr |
|
import torchaudio |
|
import torch.nn as nn |
|
import jiwer |
|
import numpy as np |
|
from rich import print as rprint |
|
from rich.progress import track |
|
from transformers import pipeline |
|
import argparse |
|
import yaml |
|
import torch |
|
from pathlib import Path |
|
from transformers import AutoTokenizer, AutoFeatureExtractor, AutoModelForCTC, AutoProcessor |
|
from datasets import load_dataset, concatenate_datasets |
|
from datasets import Dataset, Audio |
|
import pdb |
|
import string |
|
import librosa |
|
|
|
import sys |
|
|
|
# Make the local "src" directory importable (lightning_module lives there).
sys.path.append("src")

import lightning_module

# Pin CUDA work in this script to GPU 0.
torch.cuda.set_device("cuda:0")
|
|
|
# Data roots for the various corpora; only `negel_79` is consumed below.
audio_dir = "./data/Patient_sil_trim_16k_normed_5_snr_40"

healthy_dir = "./data/Healthy"

Fary_PAL_30 = "./data/Fary_PAL_p326_20230110_30"

John_p326 = "./data/John_p326/output"

John_video = "./data/20230103_video"

# The Negel-79 recordings used for the train/dev/test split in this script.
negel_79 = "./data/4_negel_79"


patient_T = "data/Patient_T/Patient_T"

patient_L = "data/Patient_L/Patient_L"


"""

TODO:

[DONE]: Automatic generating Config

"""


# NOTE(review): redundant — "src" was already appended to sys.path above.
sys.path.append("./src")


# NOTE(review): this handle appears unused; `WER` (loaded further down) is the
# metric actually referenced by compute_metrics and the final evaluation.
wer = evaluate.load("wer")
|
|
|
|
|
|
|
|
|
class ChangeSampleRate(nn.Module):
    """Linear-interpolation resampler.

    Converts a batch of waveforms from ``input_rate`` to ``output_rate`` by
    reading the signal at fractional source positions and linearly
    interpolating between the two neighbouring samples.
    """

    def __init__(self, input_rate: int, output_rate: int):
        super().__init__()
        self.output_rate = output_rate
        self.input_rate = input_rate

    # FIX: annotations were `torch.tensor` (the factory function); the type is
    # `torch.Tensor`.
    def forward(self, wav: torch.Tensor) -> torch.Tensor:
        """Resample ``wav`` to the output rate.

        Args:
            wav: tensor whose first dimension is the batch; all remaining
                dimensions are flattened into a single time axis.

        Returns:
            ``(batch, new_length)`` tensor where
            ``new_length = time_len * output_rate // input_rate``.
        """
        # Flatten everything after the batch dimension into one time axis.
        wav = wav.view(wav.size(0), -1)
        new_length = wav.size(-1) * self.output_rate // self.input_rate
        # Fractional source position of every output sample.
        indices = torch.arange(new_length) * (
            self.input_rate / self.output_rate
        )
        round_down = wav[:, indices.long()]
        # Clamp the upper neighbour so it never reads past the last sample.
        round_up = wav[:, (indices.long() + 1).clamp(max=wav.size(-1) - 1)]
        # Blend neighbours by the fractional part of the source position.
        output = round_down * (1.0 - indices.fmod(1.0)).unsqueeze(
            0
        ) + round_up * indices.fmod(1.0).unsqueeze(0)
        return output
|
|
|
|
|
|
|
|
|
def dataclean(example):
    """Normalise one audiofolder example.

    Resamples audio to 16 kHz when needed, and upper-cases the transcription
    with all punctuation stripped.
    """
    cleaned_text = example["transcription"].upper().translate(
        str.maketrans('', '', string.punctuation))

    sampling_rate = example['audio']['sampling_rate']
    if sampling_rate == 16000:
        # Audio already at the target rate: only the transcription changes.
        return {"transcription": cleaned_text}

    resampled = librosa.resample(y=example['audio']['array'],
                                 orig_sr=sampling_rate,
                                 target_sr=16000)
    return {"audio": {"path": example['audio']['path'],
                      "array": resampled,
                      "sampling_rate": 16000},
            "transcription": cleaned_text}
|
|
|
# Wav2Vec2 feature extractor. NOTE(review): this `processor` name is re-bound
# to a WhisperProcessor further down, and `prepare_dataset` below is never
# invoked — this wav2vec2 path looks like dead code; confirm before removing.
processor = AutoFeatureExtractor.from_pretrained(
    "facebook/wav2vec2-base-960h"
)
|
|
|
def prepare_dataset(batch):
    """Encode one example with the wav2vec2 feature extractor.

    NOTE(review): relies on the module-level `processor`, which is later
    reassigned to a WhisperProcessor; calling this function after that
    reassignment would change its behaviour. It is not called in this script.
    """
    audio = batch["audio"]
    # The extractor consumes the waveform, its rate, and the target text at once.
    batch = processor(
        audio["array"], sampling_rate=audio["sampling_rate"], text=batch['transcription'])
    batch["input_length"] = len(batch["input_values"][0])
    return batch
|
|
|
|
|
# Load the Negel-79 recordings (audiofolder layout) and normalise every
# example to 16 kHz audio with a cleaned transcription.
negel_79_dataset = load_dataset("audiofolder", data_dir=negel_79, split="train")
negel_79_dataset = negel_79_dataset.map(dataclean)
|
|
|
def train_dev_test_split(dataset: Dataset, dev_rate=0.1, test_rate=0.1, seed=1):
    """Split ``dataset`` into train / dev / test subsets.

    The test split (``test_rate`` of the full set) is carved out first; the
    dev split (``dev_rate`` of the *full* set) is then taken from what
    remains. If the remainder is no larger than the dev target, all of it
    becomes dev and train is left empty.

    Returns:
        (train, dev, test) datasets.
    """
    first_split = dataset.train_test_split(test_size=test_rate, seed=seed)
    test = first_split["test"]
    remainder = first_split['train']

    dev_size = int(len(dataset) * dev_rate)
    if len(remainder) <= dev_size:
        # Too few examples left: everything remaining becomes the dev set.
        train = Dataset.from_dict({"audio": [], "transcription": []})
        dev = remainder
    else:
        second_split = remainder.train_test_split(test_size=dev_size, seed=seed)
        train = second_split['train']
        dev = second_split['test']
    return train, dev, test
|
|
|
|
|
|
|
|
|
|
|
|
|
# 80/10/10 split of the Negel-79 data (seeded for reproducibility).
Negel_79_train, Negel_79_dev, Negel_79_test = train_dev_test_split(negel_79_dataset, dev_rate=0.1, test_rate=0.1, seed=1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Word-error-rate metric used by compute_metrics and the final evaluation.
WER = evaluate.load("wer")

# Base (not yet fine-tuned) Whisper model and helpers. NOTE(review): this
# rebinds the `processor` name from the earlier wav2vec2 feature extractor.
processor = WhisperProcessor.from_pretrained("openai/whisper-medium")
model = WhisperForConditionalGeneration.from_pretrained(
    "openai/whisper-medium").to("cuda:0")
tokenizer = WhisperTokenizer.from_pretrained(
    "openai/whisper-medium", language="English", task="transcribe")

# Log-mel feature extractor used by whisper_prepare_dataset below.
feature_extractor = WhisperFeatureExtractor.from_pretrained(
    "openai/whisper-medium")
|
|
|
|
|
def whisper_prepare_dataset(batch):
    """Encode one example for Whisper: log-mel input features + label ids."""
    audio = batch["audio"]

    # Log-mel spectrogram of the waveform ([0] strips the batch dimension).
    encoded = feature_extractor(
        audio["array"], sampling_rate=audio["sampling_rate"])
    batch["input_features"] = encoded.input_features[0]

    # Target token ids for the transcription text.
    batch["labels"] = tokenizer(batch["transcription"]).input_ids
    return batch
|
|
|
|
|
# Free cached GPU memory before constructing the training objects.
torch.cuda.empty_cache()

training_args = Seq2SeqTrainingArguments(
    # NOTE(review): these args are never handed to a Trainer in this script —
    # only `testing_args` below is used; confirm whether this is dead config.
    output_dir="./whisper-medium-PAL128-25step",
    per_device_train_batch_size=8,
    gradient_accumulation_steps=1,
    learning_rate=1e-5,
    warmup_steps=100,
    max_steps=1000,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=512,
    save_steps=100,
    eval_steps=25,
    logging_steps=100,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    # Lower WER is better, so checkpoint selection minimises it.
    metric_for_best_model="wer",
    greater_is_better=False,
    push_to_hub=True,
)
|
|
|
|
|
def my_map_to_pred(batch):
    """Transcribe one example with the base Whisper model.

    Adds Whisper-normalised "reference" (ground truth) and "prediction"
    (model output) fields so WER can be computed over the mapped dataset.
    """
    audio = batch["audio"]
    input_features = processor(
        audio["array"], sampling_rate=audio["sampling_rate"], return_tensors="pt").input_features

    batch["reference"] = processor.tokenizer._normalize(batch['transcription'])

    with torch.no_grad():
        predicted_ids = model.generate(input_features.to("cuda"))[0]
    # BUG FIX: the model object has no `decode` or `tokenizer` attribute —
    # decode and normalise through the tokenizer/processor, exactly as
    # fine_tuned_map_to_pred does below.
    transcription = tokenizer.decode(predicted_ids)
    batch["prediction"] = processor.tokenizer._normalize(transcription)
    return batch
|
|
|
|
|
@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Pads audio features and label ids independently for seq2seq training.

    Padded label positions are replaced with -100 so the loss ignores them;
    a uniform leading BOS token is dropped because the model re-appends it
    itself during training.
    """

    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Pad the log-mel input features into a uniform tensor batch.
        batch = self.processor.feature_extractor.pad(
            [{"input_features": f["input_features"]} for f in features],
            return_tensors="pt")

        # Labels are padded separately, with the tokenizer's own padding.
        labels_batch = self.processor.tokenizer.pad(
            [{"input_ids": f["labels"]} for f in features],
            return_tensors="pt")

        # Mask padded positions out of the loss.
        labels = labels_batch["input_ids"].masked_fill(
            labels_batch.attention_mask.ne(1), -100)

        # Strip the BOS token when every row starts with one.
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels
        return batch
|
|
|
|
|
# Collator instance bound to the Whisper processor (used by predict_trainer).
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)
|
|
|
|
|
def compute_metrics(pred):
    """Compute word error rate (in percent) for a Seq2Seq prediction batch.

    Restores padded label positions (-100) to the pad token so they decode
    cleanly, then compares decoded predictions against decoded references.
    """
    # FIX: removed a stray pdb.set_trace() that halted every metric call.
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # -100 marks loss-ignored padding; map it back to a decodable token.
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * WER.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}
|
|
|
# Encode all three splits for Whisper (parallelised over 4 worker processes).
encode_negel_79_train = Negel_79_train.map(whisper_prepare_dataset, num_proc=4)
encode_negel_79_dev = Negel_79_dev.map(whisper_prepare_dataset, num_proc=4)
encode_negel_79_test = Negel_79_test.map(whisper_prepare_dataset, num_proc=4)

# FIX: removed a stray pdb.set_trace() (blocked unattended runs) and a
# duplicated torch.cuda.empty_cache() call.
torch.cuda.empty_cache()
|
|
|
# Load the fine-tuned checkpoint declared at the top of the file.
fine_tuned_model = WhisperForConditionalGeneration.from_pretrained(
    fine_tuning_dir
).to("cuda")
|
|
|
|
|
|
|
|
|
|
|
def fine_tuned_map_to_pred(batch):
    """Transcribe one example with the fine-tuned Whisper checkpoint.

    Stores Whisper-normalised "reference" and "prediction" fields on the
    example for downstream WER computation.
    """
    waveform = batch["audio"]
    input_features = processor(
        waveform["array"], sampling_rate=waveform["sampling_rate"], return_tensors="pt").input_features

    batch["reference"] = processor.tokenizer._normalize(batch['transcription'])

    # Greedy generation; gradients are never needed at inference time.
    with torch.no_grad():
        predicted_ids = fine_tuned_model.generate(input_features.to("cuda"))[0]
    decoded = tokenizer.decode(predicted_ids)
    batch["prediction"] = tokenizer._normalize(decoded)
    return batch
|
|
|
|
|
|
|
# Arguments for the evaluation-only trainer below; the training-specific
# hyperparameters are carried over but only the eval/predict settings matter.
testing_args = Seq2SeqTrainingArguments(
    output_dir="fine_tuned/SSD/model/whipser_medium_TEP_patient_TL_TL",
    per_device_train_batch_size=8,
    gradient_accumulation_steps=1,
    learning_rate=1e-5,
    warmup_steps=100,
    max_steps=1000,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=512,
    save_steps=100,
    eval_steps=25,
    logging_steps=100,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    # Lower WER is better, so checkpoint selection minimises it.
    metric_for_best_model="wer",
    greater_is_better=False,
    # Evaluation runs stay local (unlike the training config above).
    push_to_hub=False,
)
|
|
|
# Trainer wrapper for prediction/evaluation of the fine-tuned model.
# NOTE(review): no train/eval datasets are attached and `predict_trainer` is
# never invoked below — the final WER is computed via dataset.map instead.
predict_trainer = Seq2SeqTrainer(
    args=testing_args,
    model=fine_tuned_model,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)
|
|
|
|
|
|
|
|
|
# FIX: removed a stray pdb.set_trace() breakpoint. Run the fine-tuned model
# over the held-out test split and report its word error rate; previously the
# metric was computed but never surfaced outside the debugger.
z_result = encode_negel_79_test.map(fine_tuned_map_to_pred)

z = WER.compute(references=z_result['reference'], predictions=z_result['prediction'])
rprint(f"Fine-tuned test WER: {z}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# FIX: removed a trailing pdb.set_trace() debugger breakpoint that blocked
# unattended script runs.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|