"""Evaluation and submission backend: scores causal language models on the
Arabic OpenAI MMMLU benchmark and records leaderboard submissions."""

import os

# Limit the CUDA allocator's split size; set before torch is imported so the
# allocator picks it up for the whole process.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:32"

import json
import random
from datetime import datetime, timezone

import torch
import pandas as pd
import numpy as np
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain.prompts import PromptTemplate

from src.display.formatting import styled_error, styled_message, styled_warning
from src.envs import API, EVAL_REQUESTS_PATH, TOKEN, QUEUE_REPO, EVAL_RESULTS_PATH, RESULTS_REPO
from src.submission.check_validity import (
    already_submitted_models,
    check_model_card,
    get_model_size,
    is_model_on_hub,
)

import spaces

# Caches of previously submitted models, populated on the first submission.
REQUESTED_MODELS = None
USERS_TO_SUBMISSION_DATES = None

# Subjects excluded from the evaluation.
excluded_subjects = [
    "human_sexuality",
    "professional_psychology",
    "moral_disputes",
    "public_relations",
    "jurisprudence",
    "human_aging",
    "world_religions",
]

def get_top_prediction(batch_texts, tokenizer, model):
    """Return the predicted answer letter (A/B/C/D) for each prompt in the batch.

    The prediction is the option whose answer-letter token receives the highest
    next-token logit after the prompt. Note that this scores the *last* token of
    ' A', ' B', ' C', ' D', which assumes the letter ends the tokenized option.
    """
    inputs = tokenizer(batch_texts, return_tensors='pt', padding=True, truncation=True)
    if torch.cuda.is_available():
        model = model.cuda()
        inputs = {k: v.cuda() for k, v in inputs.items()}
    else:
        model = model.cpu()
        inputs = {k: v.cpu() for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        # Logits for the token that would follow "Answer:" in each prompt.
        logits = outputs.logits[:, -1, :]

    options = [' A', ' B', ' C', ' D']
    predictions = []

    for i in range(len(batch_texts)):
        option_logits = []
        for option in options:
            option_ids = tokenizer(option).input_ids
            if option_ids and option_ids[-1] < logits.size(1):
                option_logit = logits[i, option_ids[-1]].item()
                option_logits.append((option_logit, option.strip()))
            else:
                print(f"Skipping option '{option}' due to index out of range for input {i}.")

        if not option_logits:
            predictions.append("No valid options")
        else:
            # Pick the option with the highest logit.
            top_option = max(option_logits, key=lambda x: x[0])[1]
            predictions.append(top_option)

    return predictions

@spaces.GPU(duration=120)
def evaluate_model_accuracy_by_subject(model_name, num_questions_per_subject=100, batch_size=32):
    """Evaluate a model on the Arabic OpenAI MMMLU test set, subject by subject.

    Returns (overall_accuracy, subject_results) on success, or an error string
    and an empty dict on failure.
    """
    try:
        # Load the tokenizer and model from the Hub.
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        tokenizer.pad_token = tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            trust_remote_code=True
        )

        # Run in half precision to reduce memory usage.
        model = model.half()

        if torch.cuda.is_available():
            model = model.cuda()
        else:
            model = model.cpu()

        # Load the test split and drop the excluded subjects.
        dataset = load_dataset("Omartificial-Intelligence-Space/Arabic_Openai_MMMLU")
        dataset = dataset['test']
        dataset = dataset.filter(lambda x: x['Subject'] not in excluded_subjects)

template = """Answer the following multiple choice question by giving the most appropriate response. Answer should be one among [A, B, C, D]. |
|
Question: {Question} |
|
A) {A} |
|
B) {B} |
|
C) {C} |
|
D) {D} |
|
Answer:""" |
|
|
|
|
|
|
|
|
|
prompt_template = PromptTemplate(template=template, input_variables=['Question', 'A', 'B', 'C', 'D']) |
|
|
|
|
|
|
|
|
|
|
|
        subject_results = {}
        overall_correct_predictions = 0
        overall_total_questions = 0

        subjects = dataset.unique('Subject')
        for subject in subjects:
            subject_data = dataset.filter(lambda x: x['Subject'] == subject)

            if num_questions_per_subject > 0:
                if len(subject_data) < num_questions_per_subject:
                    print(f"Warning: Not enough questions for subject '{subject}'. Using all available questions.")
                    selected_indices = range(len(subject_data))
                else:
                    selected_indices = random.sample(range(len(subject_data)), num_questions_per_subject)
                subject_data = subject_data.select(selected_indices)

            correct_predictions = 0
            total_questions = 0
            results = []

            model.eval()

            for i in range(0, len(subject_data), batch_size):
                batch_data = subject_data[i:i + batch_size]

                batch_texts = [
                    prompt_template.format(
                        Question=batch_data['Question'][j],
                        A=batch_data['A'][j],
                        B=batch_data['B'][j],
                        C=batch_data['C'][j],
                        D=batch_data['D'][j]
                    ) for j in range(len(batch_data['Question']))
                ]

                batch_predictions = get_top_prediction(batch_texts, tokenizer, model)

                for j in range(len(batch_data['Question'])):
                    top_prediction = batch_predictions[j]
                    is_correct = (top_prediction == batch_data['Answer'][j])
                    correct_predictions += int(is_correct)
                    total_questions += 1
                    overall_correct_predictions += int(is_correct)
                    overall_total_questions += 1

                    results.append({
                        'Question': batch_data['Question'][j],
                        'Answer': batch_data['Answer'][j],
                        'Prediction': top_prediction,
                        'Correct': is_correct
                    })

                # Free cached GPU memory between batches.
                torch.cuda.empty_cache()

            accuracy = correct_predictions / total_questions if total_questions > 0 else 0

            subject_results[subject] = {
                'Correct Predictions': correct_predictions,
                'Total Questions': total_questions,
                'Accuracy': accuracy * 100,
                'Results DataFrame': pd.DataFrame(results)
            }

        overall_accuracy = (overall_correct_predictions / overall_total_questions) * 100 if overall_total_questions > 0 else 0

        return overall_accuracy, subject_results

    except Exception as e:
        import traceback
        tb = traceback.format_exc()
        print(f"Error in evaluate_model_accuracy_by_subject: {e}\n{tb}")
        return f"Error: {str(e)}", {}

def add_new_eval(
    model: str,
    base_model: str,
    revision: str,
    precision: str,
    weight_type: str,
    model_type: str
):
    """Validate a submission, evaluate the model, and upload its results."""
    global REQUESTED_MODELS
    global USERS_TO_SUBMISSION_DATES
    if not REQUESTED_MODELS:
        REQUESTED_MODELS, USERS_TO_SUBMISSION_DATES = already_submitted_models(EVAL_REQUESTS_PATH)

user_name = "" |
|
model_path = model |
|
if "/" in model: |
|
user_name = model.split("/")[0] |
|
model_path = model.split("/")[1] |
|
|
|
|
|
|
|
|
|
precision = precision.split(" ")[0] |
|
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") |
|
|
|
|
|
|
|
|
|
    if model_type is None or model_type == "":
        return styled_error("Please select a model type.")

    if revision == "":
        revision = "main"

    # For delta weights or adapters, the base model must also exist on the Hub.
    if weight_type in ["Delta", "Adapter"]:
        base_model_on_hub, error, _ = is_model_on_hub(model_name=base_model, revision=revision, token=TOKEN, test_tokenizer=True)
        if not base_model_on_hub:
            return styled_error(f'Base model "{base_model}" {error}')

    if weight_type != "Adapter":
        model_on_hub, error, _ = is_model_on_hub(model_name=model, revision=revision, token=TOKEN, test_tokenizer=True)
        if not model_on_hub:
            return styled_error(f'Model "{model}" {error}')

    try:
        model_info = API.model_info(repo_id=model, revision=revision)
    except Exception:
        return styled_error("Could not retrieve your model information. Please check that the model name and revision are correct.")

    model_size = get_model_size(model_info=model_info, precision=precision)

    try:
        license = model_info.cardData["license"]
    except Exception:
        return styled_error("Please select a license for your model.")

    modelcard_OK, error_msg = check_model_card(model)
    if not modelcard_OK:
        return styled_error(error_msg)

    if f"{model}_{revision}_{precision}" in REQUESTED_MODELS:
        return styled_warning("This model has already been submitted.")

    try:
        overall_accuracy, subject_results = evaluate_model_accuracy_by_subject(model, num_questions_per_subject=100, batch_size=32)
        if isinstance(overall_accuracy, str) and overall_accuracy.startswith("Error"):
            return styled_error(overall_accuracy)
    except Exception as e:
        return styled_error(f"An error occurred during evaluation: {str(e)}")

    results_dict = {
        "config": {
            "model_name": model,
            "base_model": base_model,
            "revision": revision,
            "precision": precision,
            "weight_type": weight_type,
            "model_type": model_type,
            "submitted_time": current_time,
            "license": license,
            "likes": model_info.likes,
            "params": model_size,
            "still_on_hub": True,
        },
        "results": {
            "average": overall_accuracy,
        },
    }

    for subject, data in subject_results.items():
        accuracy = data['Accuracy']
        results_dict['results'][subject] = accuracy

results_file_path = f"{EVAL_RESULTS_PATH}/{model.replace('/', '_')}_results.json" |
|
with open(results_file_path, "w") as f: |
|
json.dump(results_dict, f, indent=4) |
|
|
|
|
|
|
|
|
|
|
|
API.upload_file( |
|
path_or_fileobj=results_file_path, |
|
path_in_repo=results_file_path.split(f"{EVAL_RESULTS_PATH}/")[1], |
|
repo_id=RESULTS_REPO, |
|
repo_type="dataset", |
|
commit_message=f"Add results for {model}" |
|
) |
|
|
|
|
|
os.remove(results_file_path) |
|
|
|
return styled_message("Your model has been evaluated and the results are now on the leaderboard!") |
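

# The block below is a minimal usage sketch, not part of the Space itself: it
# shows how evaluate_model_accuracy_by_subject might be run directly. The model
# name and the small question/batch counts are placeholders for illustration;
# outside a Space the @spaces.GPU decorator acts as a no-op.
if __name__ == "__main__":
    example_model = "gpt2"  # hypothetical choice; substitute any causal LM on the Hub
    acc, per_subject = evaluate_model_accuracy_by_subject(
        example_model, num_questions_per_subject=5, batch_size=4
    )
    if isinstance(acc, str):
        print(acc)  # an "Error: ..." string is returned on failure
    else:
        print(f"Overall accuracy: {acc:.2f}%")
        for subject, data in per_subject.items():
            print(f"{subject}: {data['Accuracy']:.2f}%")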