Leaderboard / app.py
MCK-02's picture
Update app.py
4aba0dc
import streamlit as st
import pandas as pd
import numpy as np
import torch
import evaluate
from datasets import load_dataset
from evaluate import load as load_metric
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
from sklearn.metrics import accuracy_score, f1_score
from tqdm.auto import tqdm
from torch.utils.data import DataLoader
st.set_page_config(layout="wide")
select = st.selectbox('Which model would you like to evaluate?',
('Bart', 'mBart'))
def get_datasets():
if select == 'Bart':
all_datasets = ["Communication Networks: unseen questions", "Communication Networks: unseen answers"]
if select == 'mBart':
all_datasets = ["Micro Job: unseen questions", "Micro Job: unseen answers", "Legal Domain: unseen questions", "Legal Domain: unseen answers"]
return all_datasets
all_datasets = get_datasets()
#def get_split(dataset_name):
# if dataset_name == "Communication Networks: unseen questions":
# split = load_dataset("Short-Answer-Feedback/saf_communication_networks_english", split="test_unseen_questions")
# if dataset_name == "Communication Networks: unseen answers":
# split = load_dataset("Short-Answer-Feedback/saf_communication_networks_english", split="test_unseen_answers")
# if dataset_name == "Micro Job: unseen questions":
# split = load_dataset("Short-Answer-Feedback/saf_micro_job_german", split="test_unseen_questions")
# if dataset_name == "Micro Job: unseen answers":
# split = load_dataset("Short-Answer-Feedback/saf_micro_job_german", split="test_unseen_answers")
# if dataset_name == "Legal Domain: unseen questions":
# split = load_dataset("Short-Answer-Feedback/saf_legal_domain_german", split="test_unseen_questions")
# if dataset_name == "Legal Domain: unseen answers":
# split = load_dataset("Short-Answer-Feedback/saf_legal_domain_german", split="test_unseen_answers")
# return split
def get_model(datasetname):
if datasetname == "Communication Networks: unseen questions" or datasetname == "Communication Networks: unseen answers":
model = "Short-Answer-Feedback/bart-finetuned-saf-communication-networks"
if datasetname == "Micro Job: unseen questions" or datasetname == "Micro Job: unseen answers":
model = "Short-Answer-Feedback/mbart-finetuned-saf-micro-job"
if datasetname == "Legal Domain: unseen questions" or datasetname == "Legal Domain: unseen answers":
model = "Short-Answer-Feedback/mbart-finetuned-saf-legal-domain"
return model
# def get_tokenizer(datasetname):
# if datasetname == "Communication Networks: unseen questions" or datasetname == "Communication Networks: unseen answers":
# tokenizer = "Short-Answer-Feedback/bart-finetuned-saf-communication-networks"
# if datasetname == "Micro Job: unseen questions" or datasetname == "Micro Job: unseen answers":
# tokenizer = "Short-Answer-Feedback/mbart-finetuned-saf-micro-job"
# if datasetname == "Legal Domain: unseen questions" or datasetname == "Legal Domain: unseen answers":
# tokenizer = "Short-Answer-Feedback/mbart-finetuned-saf-legal-domain"
# return tokenizer
# sacrebleu = load_metric('sacrebleu')
# rouge = load_metric('rouge')
# meteor = load_metric('meteor')
# bertscore = load_metric('bertscore')
# # use gpu if it's available
# device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# MAX_INPUT_LENGTH = 256
# MAX_TARGET_LENGTH = 128
# def preprocess_function(examples, **kwargs):
# """
# Preprocess entries of the given dataset
# Params:
# examples (Dataset): dataset to be preprocessed
# Returns:
# model_inputs (BatchEncoding): tokenized dataset entries
# """
# inputs, targets = [], []
# for i in range(len(examples['question'])):
# inputs.append(f"Antwort: {examples['provided_answer'][i]} Lösung: {examples['reference_answer'][i]} Frage: {examples['question'][i]}")
# targets.append(f"{examples['verification_feedback'][i]} Feedback: {examples['answer_feedback'][i]}")
# # apply tokenization to inputs and labels
# tokenizer = kwargs["tokenizer"]
# model_inputs = tokenizer(inputs, max_length=MAX_INPUT_LENGTH, padding='max_length', truncation=True)
# labels = tokenizer(text_target=targets, max_length=MAX_TARGET_LENGTH, padding='max_length', truncation=True)
# model_inputs['labels'] = labels['input_ids']
# return model_inputs
# def flatten_list(l):
# """
# Utility function to convert a list of lists into a flattened list
# Params:
# l (list of lists): list to be flattened
# Returns:
# A flattened list with the elements of the original list
# """
# return [item for sublist in l for item in sublist]
# def extract_feedback(predictions):
# """
# Utility function to extract the feedback from the predictions of the model
# Params:
# predictions (list): complete model predictions
# Returns:
# feedback (list): extracted feedback from the model's predictions
# """
# feedback = []
# # iterate through predictions and try to extract predicted feedback
# for pred in predictions:
# try:
# fb = pred.split(':', 1)[1]
# except IndexError:
# try:
# if pred.lower().startswith('partially correct'):
# fb = pred.split(' ', 1)[2]
# else:
# fb = pred.split(' ', 1)[1]
# except IndexError:
# fb = pred
# feedback.append(fb.strip())
# return feedback
# def extract_labels(predictions):
# """
# Utility function to extract the labels from the predictions of the model
# Params:
# predictions (list): complete model predictions
# Returns:
# feedback (list): extracted labels from the model's predictions
# """
# labels = []
# for pred in predictions:
# if pred.lower().startswith('correct'):
# label = 'Correct'
# elif pred.lower().startswith('partially correct'):
# label = 'Partially correct'
# elif pred.lower().startswith('incorrect'):
# label = 'Incorrect'
# else:
# label = 'Unknown label'
# labels.append(label)
# return labels
# def get_predictions_labels(model, dataloader, tokenizer):
# """
# Evaluate model on the given dataset
# Params:
# model (PreTrainedModel): seq2seq model
# dataloader (torch Dataloader): dataloader of the dataset to be used for evaluation
# Returns:
# results (dict): dictionary with the computed evaluation metrics
# predictions (list): list of the decoded predictions of the model
# """
# decoded_preds, decoded_labels = [], []
# model.eval()
# # iterate through batchs in the dataloader
# for batch in tqdm(dataloader):
# with torch.no_grad():
# batch = {k: v.to(device) for k, v in batch.items()}
# # generate tokens from batch
# generated_tokens = model.generate(
# batch['input_ids'],
# attention_mask=batch['attention_mask'],
# max_length=MAX_TARGET_LENGTH
# )
# # get golden labels from batch
# labels_batch = batch['labels']
# # decode model predictions and golden labels
# decoded_preds_batch = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
# decoded_labels_batch = tokenizer.batch_decode(labels_batch, skip_special_tokens=True)
# decoded_preds.append(decoded_preds_batch)
# decoded_labels.append(decoded_labels_batch)
# # convert predictions and golden labels into flattened lists
# predictions = flatten_list(decoded_preds)
# labels = flatten_list(decoded_labels)
# return predictions, labels
# def load_data():
# df = pd.DataFrame(columns=['Model', 'Dataset', 'SacreBLEU', 'ROUGE-2', 'METEOR', 'BERTScore', 'Accuracy', 'Weighted F1', 'Macro F1'])
# for ds in all_datasets:
# split = get_split(ds)
# model = AutoModelForSeq2SeqLM.from_pretrained(get_model(ds))
# tokenizer = AutoTokenizer.from_pretrained(get_tokenizer(ds))
# processed_dataset = split.map(
# preprocess_function,
# batched=True,
# remove_columns=split.column_names,
# fn_kwargs={"tokenizer": tokenizer}
# )
# processed_dataset.set_format('torch')
# dataloader = DataLoader(processed_dataset, batch_size=4)
# predictions, labels = get_predictions_labels(model, dataloader, tokenizer)
# predicted_feedback = extract_feedback(predictions)
# predicted_labels = extract_labels(predictions)
# reference_feedback = [x.split('Feedback:', 1)[1].strip() for x in labels]
# reference_labels = [x.split('Feedback:', 1)[0].strip() for x in labels]
# rouge_score = rouge.compute(predictions=predicted_feedback, references=reference_feedback)['rouge2']
# bleu_score = sacrebleu.compute(predictions=predicted_feedback, references=[[x] for x in reference_feedback])['score']
# meteor_score = meteor.compute(predictions=predicted_feedback, references=reference_feedback)['meteor']
# bert_score = bertscore.compute(predictions=predicted_feedback, references=reference_feedback, lang='de', model_type='bert-base-multilingual-cased', rescale_with_baseline=True)
# reference_labels_np = np.array(reference_labels)
# accuracy_value = accuracy_score(reference_labels_np, predicted_labels)
# f1_weighted_value = f1_score(reference_labels_np, predicted_labels, average='weighted')
# f1_macro_value = f1_score(reference_labels_np, predicted_labels, average='macro', labels=['Incorrect', 'Partially correct', 'Correct'])
# new_row_data = {"Model": get_model(ds), "Dataset": ds, "SacreBLEU": bleu_score, "ROUGE-2": rouge_score, "METEOR": meteor_score, "BERTScore": bert_score, "Accuracy": accuracy_value, "Weighted F1": f1_weighted_value, "Macro F1": f1_macro_value}
# new_row = pd.DataFrame(new_row_data)
# df = pd.concat([df, new_row])
# return df
def get_rows(datasetname):
if datasetname == "Communication Networks: unseen questions":
row = pd.DataFrame(
{
'Model': get_model(datasetname),
'Dataset': datasetname,
'SacreBLEU': [2.4],
'ROUGE-2': [20.1],
'METEOR': [28.5],
'BERTScore': [36.6],
'Accuracy': [51.6],
'Weighted F1': [41.0],
'Macro F1': [27.9],
}
)
if datasetname == "Communication Networks: unseen answers":
row = pd.DataFrame(
{
'Model': get_model(datasetname),
'Dataset': datasetname,
'SacreBLEU': [36.0],
'ROUGE-2': [49.1],
'METEOR': [60.8],
'BERTScore': [69.5],
'Accuracy': [76.0],
'Weighted F1': [73.0],
'Macro F1': [53.4],
}
)
if datasetname == "Micro Job: unseen questions":
row = pd.DataFrame(
{
'Model': get_model(datasetname),
'Dataset': datasetname,
'SacreBLEU': [0.3],
'ROUGE-2': [0.5],
'METEOR': [33.8],
'BERTScore': [31.3],
'Accuracy': [48.7],
'Weighted F1': [46.5],
'Macro F1': [40.6],
}
)
if datasetname == "Micro Job: unseen answers":
row = pd.DataFrame(
{
'Model': get_model(datasetname),
'Dataset': datasetname,
'SacreBLEU': [39.5],
'ROUGE-2': [29.8],
'METEOR': [63.3],
'BERTScore': [63.1],
'Accuracy': [80.1],
'Weighted F1': [80.3],
'Macro F1': [80.7],
}
)
if datasetname == "Legal Domain: unseen questions":
row = pd.DataFrame(
{
'Model': get_model(datasetname),
'Dataset': datasetname,
'SacreBLEU': [3.2],
'ROUGE-2': [5.0],
'METEOR': [20.0],
'BERTScore': [14.8],
'Accuracy': [60.7],
'Weighted F1': [55.3],
'Macro F1': [55.4],
}
)
if datasetname == "Legal Domain: unseen answers":
row = pd.DataFrame(
{
'Model': get_model(datasetname),
'Dataset': datasetname,
'SacreBLEU': [42.8],
'ROUGE-2': [43.7],
'METEOR': [58.2],
'BERTScore': [57.5],
'Accuracy': [81.0],
'Weighted F1': [80.1],
'Macro F1': [74.6],
}
)
return row
def load_data():
df = pd.DataFrame(columns=['Model', 'Dataset', 'SacreBLEU', 'ROUGE-2', 'METEOR', 'BERTScore', 'Accuracy', 'Weighted F1', 'Macro F1'])
for ds in all_datasets:
new_row = get_rows(ds)
df = pd.concat([df, new_row], ignore_index=True)
return df
dataframe = load_data()
st.dataframe(dataframe)