import os
# Set environment variable for better memory management
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:32"
import json
from datetime import datetime, timezone
import random
import torch
import pandas as pd
import numpy as np
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain.prompts import PromptTemplate
from src.display.formatting import styled_error, styled_message, styled_warning
from src.envs import API, EVAL_REQUESTS_PATH, TOKEN, QUEUE_REPO, EVAL_RESULTS_PATH, RESULTS_REPO
from src.submission.check_validity import (
already_submitted_models,
check_model_card,
get_model_size,
is_model_on_hub,
)
import spaces
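# Caches of previously submitted models and their submission dates, filled on first
# use inside add_new_eval() and consulted to reject duplicate submissions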
REQUESTED_MODELS = None
USERS_TO_SUBMISSION_DATES = None
# List of subjects to exclude from evaluation
excluded_subjects = [
"human_sexuality",
"professional_psychology",
"moral_disputes",
"public_relations",
"jurisprudence",
"human_aging",
"world_religions"
]
def get_top_prediction(batch_texts, tokenizer, model):
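    """Return the predicted answer letter ("A"-"D") for each prompt in the batch.

    For every prompt, the logits at its final (non-padding) position are compared
    across the candidate continuations ' A', ' B', ' C', ' D', and the highest-scoring
    letter is returned (or "No valid options" if none could be scored).
    """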
inputs = tokenizer(batch_texts, return_tensors='pt', padding=True, truncation=True)
if torch.cuda.is_available():
model = model.cuda()
inputs = {k: v.cuda() for k, v in inputs.items()}
else:
model = model.cpu()
inputs = {k: v.cpu() for k, v in inputs.items()}
with torch.no_grad():
outputs = model(**inputs)
    # Score at each sequence's last real (non-padding) token; with right padding,
    # position -1 can be a pad token, so locate the correct position via the attention mask
    last_token_indices = inputs['attention_mask'].sum(dim=1) - 1
    batch_indices = torch.arange(last_token_indices.size(0), device=last_token_indices.device)
    logits = outputs.logits[batch_indices, last_token_indices, :]
options = [' A', ' B', ' C', ' D']
predictions = []
for i in range(len(batch_texts)):
option_logits = []
        for option in options:
            # add_special_tokens=False keeps BOS/EOS out of the encoding, so the last id
            # is the answer-letter token itself
            option_ids = tokenizer(option, add_special_tokens=False).input_ids
            if option_ids and option_ids[-1] < logits.size(1):
                option_logit = logits[i, option_ids[-1]].item()
                option_logits.append((option_logit, option.strip()))
else:
print(f"Skipping option '{option}' due to index out of range for input {i}.")
if not option_logits:
predictions.append("No valid options")
else:
top_option = max(option_logits, key=lambda x: x[0])[1]
predictions.append(top_option)
return predictions
@spaces.GPU(duration=120)
def evaluate_model_accuracy_by_subject(model_name, num_questions_per_subject=100, batch_size=32):
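    """Evaluate a causal LM on the Arabic OpenAI MMMLU test split, subject by subject.

    Returns (overall_accuracy_percent, subject_results) on success, where subject_results
    maps each subject to its accuracy and per-question predictions; on failure, returns
    an "Error: ..." string and an empty dict.
    """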
try:
# Load the model and tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        # Padding is required for batched inference; fall back to EOS if no pad token is defined
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
model_name,
trust_remote_code=True
)
        # Use FP16 on GPU to reduce memory usage; keep full precision on CPU, where
        # half-precision inference is often unsupported or very slow
        if torch.cuda.is_available():
            model = model.half().cuda()
        else:
            model = model.cpu()
        # Load the Arabic OpenAI MMMLU dataset from the Hugging Face Hub
dataset = load_dataset("Omartificial-Intelligence-Space/Arabic_Openai_MMMLU")
dataset = dataset['test']
# Filter out excluded subjects
dataset = dataset.filter(lambda x: x['Subject'] not in excluded_subjects)
# Define prompt template
template = """Answer the following multiple choice question by giving the most appropriate response. Answer should be one among [A, B, C, D].
Question: {Question}
A) {A}
B) {B}
C) {C}
D) {D}
Answer:"""
prompt_template = PromptTemplate(template=template, input_variables=['Question', 'A', 'B', 'C', 'D'])
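        # The prompt ends with "Answer:", so the expected next token is one of
        # " A", " B", " C", " D", i.e. the candidates scored in get_top_prediction()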
# Initialize results storage
subject_results = {}
overall_correct_predictions = 0
overall_total_questions = 0
subjects = dataset.unique('Subject')
for subject in subjects:
subject_data = dataset.filter(lambda x: x['Subject'] == subject)
# Sample num_questions_per_subject from each subject
if num_questions_per_subject > 0:
if len(subject_data) < num_questions_per_subject:
print(f"Warning: Not enough questions for subject '{subject}'. Using all available questions.")
selected_indices = range(len(subject_data))
else:
selected_indices = random.sample(range(len(subject_data)), num_questions_per_subject)
subject_data = subject_data.select(selected_indices)
correct_predictions = 0
total_questions = 0
results = []
model.eval()
# Batch processing
for i in range(0, len(subject_data), batch_size):
                # Slicing a datasets.Dataset returns a dict mapping column names to lists of values
                batch_data = subject_data[i:i + batch_size]
# Generate batch texts
batch_texts = [
prompt_template.format(
Question=batch_data['Question'][j],
A=batch_data['A'][j],
B=batch_data['B'][j],
C=batch_data['C'][j],
D=batch_data['D'][j]
) for j in range(len(batch_data['Question']))
]
# Get the top predictions for the batch
batch_predictions = get_top_prediction(batch_texts, tokenizer, model)
for j in range(len(batch_data['Question'])):
top_prediction = batch_predictions[j]
is_correct = (top_prediction == batch_data['Answer'][j])
correct_predictions += int(is_correct)
total_questions += 1
overall_correct_predictions += int(is_correct)
overall_total_questions += 1
results.append({
'Question': batch_data['Question'][j],
'Answer': batch_data['Answer'][j],
'Prediction': top_prediction,
'Correct': is_correct
})
# Clear GPU memory after processing each subject
torch.cuda.empty_cache()
accuracy = correct_predictions / total_questions if total_questions > 0 else 0
# Store results for this subject
subject_results[subject] = {
'Correct Predictions': correct_predictions,
'Total Questions': total_questions,
'Accuracy': accuracy * 100,
'Results DataFrame': pd.DataFrame(results)
}
overall_accuracy = (overall_correct_predictions / overall_total_questions) * 100 if overall_total_questions > 0 else 0
return overall_accuracy, subject_results
except Exception as e:
import traceback
tb = traceback.format_exc()
print(f"Error in evaluate_model_accuracy_by_subject: {e}\n{tb}")
return f"Error: {str(e)}", {}
def add_new_eval(
model: str,
base_model: str,
revision: str,
precision: str,
weight_type: str,
model_type: str
):
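    """Validate a leaderboard submission, run the evaluation, and upload the results.

    Performs the usual leaderboard checks (model on the Hub, model card, license,
    duplicate submissions), then calls evaluate_model_accuracy_by_subject() and pushes
    a JSON results file to the results dataset repository.
    """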
global REQUESTED_MODELS
global USERS_TO_SUBMISSION_DATES
if not REQUESTED_MODELS:
REQUESTED_MODELS, USERS_TO_SUBMISSION_DATES = already_submitted_models(EVAL_REQUESTS_PATH)
user_name = ""
model_path = model
if "/" in model:
user_name = model.split("/")[0]
model_path = model.split("/")[1]
precision = precision.split(" ")[0]
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
if model_type is None or model_type == "":
return styled_error("Please select a model type.")
# Does the model actually exist?
if revision == "":
revision = "main"
# Is the model on the hub?
if weight_type in ["Delta", "Adapter"]:
base_model_on_hub, error, _ = is_model_on_hub(model_name=base_model, revision=revision, token=TOKEN, test_tokenizer=True)
if not base_model_on_hub:
return styled_error(f'Base model "{base_model}" {error}')
    if weight_type != "Adapter":
model_on_hub, error, _ = is_model_on_hub(model_name=model, revision=revision, token=TOKEN, test_tokenizer=True)
if not model_on_hub:
return styled_error(f'Model "{model}" {error}')
# Is the model info correctly filled?
try:
model_info = API.model_info(repo_id=model, revision=revision)
except Exception:
return styled_error("Could not get your model information. Please fill it up properly.")
model_size = get_model_size(model_info=model_info, precision=precision)
# Were the model card and license filled?
try:
license = model_info.cardData["license"]
except Exception:
return styled_error("Please select a license for your model")
modelcard_OK, error_msg = check_model_card(model)
if not modelcard_OK:
return styled_error(error_msg)
# Check for duplicate submission
if f"{model}_{revision}_{precision}" in REQUESTED_MODELS:
return styled_warning("This model has been already submitted.")
# Now, perform the evaluation
try:
overall_accuracy, subject_results = evaluate_model_accuracy_by_subject(model, num_questions_per_subject=100, batch_size=32)
if isinstance(overall_accuracy, str) and overall_accuracy.startswith("Error"):
return styled_error(overall_accuracy)
except Exception as e:
return styled_error(f"An error occurred during evaluation: {str(e)}")
# Prepare results for storage
results_dict = {
"config": {
"model_name": model,
"base_model": base_model,
"revision": revision,
"precision": precision,
"weight_type": weight_type,
"model_type": model_type,
"submitted_time": current_time,
"license": license,
"likes": model_info.likes,
"params": model_size,
"still_on_hub": True,
},
"results": {
"average": overall_accuracy,
},
}
# Include per-subject accuracies
for subject, data in subject_results.items():
accuracy = data['Accuracy']
results_dict['results'][subject] = accuracy
# Save results to a JSON file
    os.makedirs(EVAL_RESULTS_PATH, exist_ok=True)  # Ensure the local results directory exists
    results_file_path = f"{EVAL_RESULTS_PATH}/{model.replace('/', '_')}_results.json"
with open(results_file_path, "w") as f:
json.dump(results_dict, f, indent=4)
# Upload the results file
API.upload_file(
path_or_fileobj=results_file_path,
path_in_repo=results_file_path.split(f"{EVAL_RESULTS_PATH}/")[1],
repo_id=RESULTS_REPO,
repo_type="dataset",
commit_message=f"Add results for {model}"
)
# Remove the local results file
os.remove(results_file_path)
return styled_message("Your model has been evaluated and the results are now on the leaderboard!")
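
# Minimal local smoke test: a sketch, not part of the Space's submission flow.
# Assumptions: network access to the Hugging Face Hub, and "openai-community/gpt2"
# used only as a small placeholder model id to swap for a real target. Outside a
# ZeroGPU Space the @spaces.GPU decorator is expected to behave as a no-op.
if __name__ == "__main__":
    demo_accuracy, demo_subject_results = evaluate_model_accuracy_by_subject(
        "openai-community/gpt2",  # placeholder model id for a quick local check
        num_questions_per_subject=5,
        batch_size=8,
    )
    print(f"Overall accuracy: {demo_accuracy}")
    for demo_subject, demo_data in demo_subject_results.items():
        print(f"{demo_subject}: {demo_data['Accuracy']:.1f}%")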