import json
import os
from datetime import datetime, timezone

import torch
import pandas as pd
import numpy as np
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain.prompts import PromptTemplate

from src.display.formatting import styled_error, styled_message, styled_warning
from src.envs import API, EVAL_REQUESTS_PATH, TOKEN, QUEUE_REPO, EVAL_RESULTS_PATH, RESULTS_REPO
from src.submission.check_validity import (
    already_submitted_models,
    check_model_card,
    get_model_size,
    is_model_on_hub,
)

REQUESTED_MODELS = None
USERS_TO_SUBMISSION_DATES = None


def get_top_prediction(text, tokenizer, model):
    """Return the option letter (A/B/C/D) whose answer token gets the highest next-token logit."""
    inputs = tokenizer(text, return_tensors='pt')
    if torch.cuda.is_available():
        model = model.cuda()
        inputs = {k: v.cuda() for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits[0, -1]

    # Score each option by the logit of its token (with a leading space) at the final position.
    options = [' A', ' B', ' C', ' D']
    option_logits = []
    for option in options:
        option_id = tokenizer(option).input_ids[-1]
        option_logit = logits[option_id]
        option_logits.append((option_logit.item(), option.strip()))

    top_option = max(option_logits, key=lambda x: x[0])[1]
    return top_option


def evaluate_model_accuracy(model_name, num_examples):
    """Evaluate a causal LM on the Arabic OpenAI MMMLU test split.

    Samples up to `num_examples` questions per subject and returns the overall
    accuracy, a per-subject summary DataFrame, and a per-question results DataFrame.
    On failure, returns an error string and two empty DataFrames.
    """
    try:
        # Load the tokenizer and model; use the EOS token as the pad token.
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        tokenizer.pad_token = tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            trust_remote_code=True
        )
        if torch.cuda.is_available():
            model = model.cuda()

        # Load the evaluation data (test split only).
        dataset = load_dataset("Omartificial-Intelligence-Space/Arabic_Openai_MMMLU")
        dataset = dataset['test']

        df_dataset = dataset.to_pandas()
        subjects = df_dataset['Subject'].unique()

        template = """Answer the following multiple choice question by giving the most appropriate response. Answer should be one among [A, B, C, D].

Question: {Question}
A) {A}
B) {B}
C) {C}
D) {D}

Answer:"""

        prompt_template = PromptTemplate(template=template, input_variables=['Question', 'A', 'B', 'C', 'D'])

        overall_correct_predictions = 0
        overall_total_questions = 0
        per_subject_results = []
        detailed_results = []

        for subject in subjects:
            subject_df = df_dataset[df_dataset['Subject'] == subject]

            # Sample a fixed number of questions per subject (seeded for reproducibility).
            subject_df = subject_df.sample(n=min(num_examples, len(subject_df)), random_state=42)

            correct_predictions = 0
            total_questions = 0

            for idx, data in subject_df.iterrows():
                text = prompt_template.format(
                    Question=data['Question'],
                    A=data['A'],
                    B=data['B'],
                    C=data['C'],
                    D=data['D']
                )

                top_prediction = get_top_prediction(text, tokenizer, model)
                is_correct = (top_prediction == data['Answer'])
                correct_predictions += int(is_correct)
                total_questions += 1
                overall_correct_predictions += int(is_correct)
                overall_total_questions += 1

                detailed_results.append({
                    'Subject': subject,
                    'Question': data['Question'],
                    'Answer': data['Answer'],
                    'Prediction': top_prediction,
                    'Correct': is_correct
                })

            subject_accuracy = correct_predictions / total_questions if total_questions > 0 else 0

            per_subject_results.append({
                'Subject': subject,
                'Total Score': correct_predictions,
                'Total Questions': total_questions,
                'Accuracy (%)': subject_accuracy * 100
            })

        overall_accuracy = overall_correct_predictions / overall_total_questions if overall_total_questions > 0 else 0

        df_per_subject = pd.DataFrame(per_subject_results)
        df_detailed_results = pd.DataFrame(detailed_results)

        return overall_accuracy, df_per_subject, df_detailed_results

    except Exception as e:
        return f"Error: {str(e)}", pd.DataFrame(), pd.DataFrame()


def add_new_eval(
    model: str,
    base_model: str,
    revision: str,
    precision: str,
    weight_type: str,
    model_type: str,
    num_examples: int
):
    global REQUESTED_MODELS
    global USERS_TO_SUBMISSION_DATES
    if not REQUESTED_MODELS:
        REQUESTED_MODELS, USERS_TO_SUBMISSION_DATES = already_submitted_models(EVAL_REQUESTS_PATH)

    user_name = ""
    model_path = model
    if "/" in model:
        user_name = model.split("/")[0]
        model_path = model.split("/")[1]

    precision = precision.split(" ")[0]
    current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    if model_type is None or model_type == "":
        return styled_error("Please select a model type.")

    # Default to the main branch when no revision is given.
    if revision == "":
        revision = "main"

    # For delta or adapter weights, the base model must also be available on the Hub.
    if weight_type in ["Delta", "Adapter"]:
        base_model_on_hub, error, _ = is_model_on_hub(model_name=base_model, revision=revision, token=TOKEN, test_tokenizer=True)
        if not base_model_on_hub:
            return styled_error(f'Base model "{base_model}" {error}')

    if weight_type != "Adapter":
        model_on_hub, error, _ = is_model_on_hub(model_name=model, revision=revision, token=TOKEN, test_tokenizer=True)
        if not model_on_hub:
            return styled_error(f'Model "{model}" {error}')

    try:
        model_info = API.model_info(repo_id=model, revision=revision)
    except Exception:
        return styled_error("Could not get your model information. Please fill it up properly.")

    model_size = get_model_size(model_info=model_info, precision=precision)

    try:
        license = model_info.cardData["license"]
    except Exception:
        return styled_error("Please select a license for your model")

    modelcard_OK, error_msg = check_model_card(model)
    if not modelcard_OK:
        return styled_error(error_msg)

    if f"{model}_{revision}_{precision}" in REQUESTED_MODELS:
        return styled_warning("This model has already been submitted.")

    # Run the evaluation; evaluate_model_accuracy returns an error string on failure.
    try:
        overall_accuracy, df_per_subject, df_detailed_results = evaluate_model_accuracy(model, int(num_examples))
        if isinstance(overall_accuracy, str) and overall_accuracy.startswith("Error"):
            return styled_error(overall_accuracy)
    except Exception as e:
        return styled_error(f"An error occurred during evaluation: {str(e)}")

    results_dict = {
        "config": {
            "model_name": model,
            "model_sha": revision,
            "model_dtype": precision,
            "submitted_time": current_time,
            "model_type": model_type,
            "weight_type": weight_type,
            "license": license,
            "likes": model_info.likes,
            "params": model_size,
            "still_on_hub": True,
            "precision": precision,
        },
        "results": {
            "average": overall_accuracy * 100,
        },
    }

    # Record each subject's accuracy alongside the overall average.
    for idx, row in df_per_subject.iterrows():
        subject_name = row['Subject']
        accuracy = row['Accuracy (%)']
        results_dict['results'][subject_name] = accuracy

    # Write the results locally, push them to the results repo, then clean up the local file.
    os.makedirs(EVAL_RESULTS_PATH, exist_ok=True)
    results_file_path = f"{EVAL_RESULTS_PATH}/{model.replace('/', '_')}_results.json"
    with open(results_file_path, "w") as f:
        json.dump(results_dict, f)

    API.upload_file(
        path_or_fileobj=results_file_path,
        path_in_repo=results_file_path.split(f"{EVAL_RESULTS_PATH}/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"Add results for {model}"
    )

    os.remove(results_file_path)

    return styled_message("Your model has been evaluated and the results are now on the leaderboard!")
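

# --- Optional local smoke test (illustrative sketch only, not part of the leaderboard flow) ---
# The model name below is just a placeholder for any small causal LM on the Hub;
# running this will download that model and the evaluation dataset.
if __name__ == "__main__":
    acc, per_subject, detailed = evaluate_model_accuracy("sshleifer/tiny-gpt2", num_examples=1)
    if isinstance(acc, str):
        # evaluate_model_accuracy returns an error string on failure.
        print(acc)
    else:
        print(f"Overall accuracy: {acc * 100:.2f}%")
        print(per_subject)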