import json
import os
from datetime import datetime, timezone

import torch
import pandas as pd
import numpy as np
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain.prompts import PromptTemplate

from src.display.formatting import styled_error, styled_message, styled_warning
from src.envs import API, EVAL_REQUESTS_PATH, TOKEN, QUEUE_REPO, EVAL_RESULTS_PATH, RESULTS_REPO
from src.submission.check_validity import (
    already_submitted_models,
    check_model_card,
    get_model_size,
    is_model_on_hub,
)

REQUESTED_MODELS = None
USERS_TO_SUBMISSION_DATES = None

# List of subjects to exclude from evaluation
excluded_subjects = [
    "human_sexuality",
    "professional_psychology",
    "moral_disputes",
    "public_relations",
    "jurisprudence",
    "human_aging",
    "world_religions",
]


def get_top_prediction(text, tokenizer, model):
    """Return the answer letter ('A'-'D') whose token has the highest next-token logit."""
    inputs = tokenizer(text, return_tensors='pt')
    if torch.cuda.is_available():
        model = model.cuda()
        inputs = {k: v.cuda() for k, v in inputs.items()}
    else:
        model = model.cpu()
        inputs = {k: v.cpu() for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits[0, -1]  # Get logits of the last token

    options = [' A', ' B', ' C', ' D']
    option_logits = []

    # Iterate through each option and collect the logit of its final token
    for option in options:
        option_ids = tokenizer(option).input_ids
        # Ensure option_ids are not empty and within the vocabulary range
        if option_ids and option_ids[-1] < logits.size(0):
            option_id = option_ids[-1]
            option_logit = logits[option_id]
            option_logits.append((option_logit.item(), option.strip()))
        else:
            print(f"Skipping option '{option}' due to index out of range.")

    if not option_logits:
        return "No valid options"

    # Get the option with the highest logit
    top_option = max(option_logits, key=lambda x: x[0])[1]
    return top_option


def evaluate_model_accuracy_by_subject(model_name, num_examples):
    """Evaluate a causal LM on the Arabic MMMLU test split, subject by subject.

    Returns (overall_accuracy_percent, per_subject_results), or an error string
    and an empty dict if evaluation fails.
    """
    try:
        # Load the model and tokenizer
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        tokenizer.pad_token = tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            trust_remote_code=True
        )
        if torch.cuda.is_available():
            model = model.cuda()  # Move model to GPU if available
        else:
            model = model.cpu()

        # Load the Arabic MMMLU dataset
        dataset = load_dataset("Omartificial-Intelligence-Space/Arabic_Openai_MMMLU")
        dataset = dataset['test']

        # Filter out excluded subjects
        dataset = dataset.filter(lambda x: x['Subject'] not in excluded_subjects)

        # Define prompt template
        template = """Answer the following multiple choice question by giving the most appropriate response. Answer should be one among [A, B, C, D].
Question: {Question}
A) {A}
B) {B}
C) {C}
D) {D}
Answer:"""
        prompt_template = PromptTemplate(
            template=template,
            input_variables=['Question', 'A', 'B', 'C', 'D']
        )

        # Initialize results storage
        subject_results = {}
        subjects = dataset.unique('Subject')
        overall_correct_predictions = 0
        overall_total_questions = 0

        for subject in subjects:
            subject_data = dataset.filter(lambda x: x['Subject'] == subject)

            # Sample num_examples from each subject (0 means use all questions)
            if num_examples > 0:
                subject_data = subject_data.shuffle().select(range(min(num_examples, len(subject_data))))

            correct_predictions = 0
            total_questions = 0
            results = []

            for data in subject_data:
                # Prepare text input
                text = prompt_template.format(
                    Question=data['Question'],
                    A=data['A'],
                    B=data['B'],
                    C=data['C'],
                    D=data['D']
                )

                # Get the top prediction
                top_prediction = get_top_prediction(text, tokenizer, model)
                is_correct = (top_prediction == data['Answer'])
                correct_predictions += int(is_correct)
                total_questions += 1
                overall_correct_predictions += int(is_correct)
                overall_total_questions += 1

                results.append({
                    'Question': data['Question'],
                    'Answer': data['Answer'],
                    'Prediction': top_prediction,
                    'Correct': is_correct
                })

            accuracy = correct_predictions / total_questions if total_questions > 0 else 0

            # Store results for this subject
            subject_results[subject] = {
                'Correct Predictions': correct_predictions,
                'Total Questions': total_questions,
                'Accuracy': accuracy * 100,
                'Results DataFrame': pd.DataFrame(results)
            }

        overall_accuracy = (overall_correct_predictions / overall_total_questions) * 100 if overall_total_questions > 0 else 0

        return overall_accuracy, subject_results

    except Exception as e:
        import traceback
        tb = traceback.format_exc()
        print(f"Error in evaluate_model_accuracy_by_subject: {e}\n{tb}")
        return f"Error: {str(e)}", {}


def add_new_eval(
    model: str,
    base_model: str,
    revision: str,
    precision: str,
    weight_type: str,
    model_type: str,
    num_examples: int
):
    """Validate a leaderboard submission, run the evaluation, and upload the results file."""
    global REQUESTED_MODELS
    global USERS_TO_SUBMISSION_DATES
    if not REQUESTED_MODELS:
        REQUESTED_MODELS, USERS_TO_SUBMISSION_DATES = already_submitted_models(EVAL_REQUESTS_PATH)

    user_name = ""
    model_path = model
    if "/" in model:
        user_name = model.split("/")[0]
        model_path = model.split("/")[1]

    precision = precision.split(" ")[0]
    current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    if model_type is None or model_type == "":
        return styled_error("Please select a model type.")

    # Does the model actually exist?
    if revision == "":
        revision = "main"

    # Is the model on the Hub?
    if weight_type in ["Delta", "Adapter"]:
        base_model_on_hub, error, _ = is_model_on_hub(model_name=base_model, revision=revision, token=TOKEN, test_tokenizer=True)
        if not base_model_on_hub:
            return styled_error(f'Base model "{base_model}" {error}')

    if not weight_type == "Adapter":
        model_on_hub, error, _ = is_model_on_hub(model_name=model, revision=revision, token=TOKEN, test_tokenizer=True)
        if not model_on_hub:
            return styled_error(f'Model "{model}" {error}')

    # Is the model info correctly filled?
    try:
        model_info = API.model_info(repo_id=model, revision=revision)
    except Exception:
        return styled_error("Could not get your model information. Please make sure it is filled in properly.")

    model_size = get_model_size(model_info=model_info, precision=precision)

    # Were the model card and license filled?
    try:
        license = model_info.cardData["license"]
    except Exception:
        return styled_error("Please select a license for your model")

    modelcard_OK, error_msg = check_model_card(model)
    if not modelcard_OK:
        return styled_error(error_msg)

    # Check for duplicate submission
    if f"{model}_{revision}_{precision}" in REQUESTED_MODELS:
        return styled_warning("This model has already been submitted.")

    # Now, perform the evaluation
    try:
        overall_accuracy, subject_results = evaluate_model_accuracy_by_subject(model, int(num_examples))
        if isinstance(overall_accuracy, str) and overall_accuracy.startswith("Error"):
            return styled_error(overall_accuracy)
    except Exception as e:
        return styled_error(f"An error occurred during evaluation: {str(e)}")

    # Prepare results for storage
    results_dict = {
        "config": {
            "model_name": model,
            "model_sha": revision,
            "model_dtype": precision,
            "submitted_time": current_time,
            "model_type": model_type,
            "weight_type": weight_type,
            "license": license,
            "likes": model_info.likes,
            "params": model_size,
            "still_on_hub": True,
            "precision": precision,
        },
        "results": {
            "average": overall_accuracy,
        },
    }

    # Include per-subject accuracies
    for subject, data in subject_results.items():
        accuracy = data['Accuracy']
        results_dict['results'][subject] = accuracy

    # Save results to a JSON file
    results_file_path = f"{EVAL_RESULTS_PATH}/{model.replace('/', '_')}_results.json"
    with open(results_file_path, "w") as f:
        json.dump(results_dict, f)

    # Upload the results file
    API.upload_file(
        path_or_fileobj=results_file_path,
        path_in_repo=results_file_path.split(f"{EVAL_RESULTS_PATH}/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"Add results for {model}"
    )

    # Remove the local results file
    os.remove(results_file_path)

    return styled_message("Your model has been evaluated and the results are now on the leaderboard!")
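

# --- Illustrative usage sketch (assumption: not part of the leaderboard submission flow) ---
# A minimal example of calling evaluate_model_accuracy_by_subject directly, e.g. for a
# quick local sanity check. The model id "gpt2" and num_examples=2 are placeholder values
# chosen only for illustration; any causal LM repo id on the Hugging Face Hub could be
# substituted. Running this requires network access to download the model and dataset.
if __name__ == "__main__":
    demo_accuracy, demo_subject_results = evaluate_model_accuracy_by_subject(
        model_name="gpt2",  # placeholder model id, for illustration only
        num_examples=2,     # sample 2 questions per subject to keep the run short
    )
    print(f"Overall accuracy: {demo_accuracy}")
    for demo_subject, demo_data in demo_subject_results.items():
        print(
            f"{demo_subject}: {demo_data['Accuracy']:.1f}% "
            f"({demo_data['Correct Predictions']}/{demo_data['Total Questions']})"
        )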