import json
import os
from datetime import datetime, timezone

import torch
import pandas as pd
import numpy as np
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain.prompts import PromptTemplate

from src.display.formatting import styled_error, styled_message, styled_warning
from src.envs import API, EVAL_REQUESTS_PATH, TOKEN, QUEUE_REPO, EVAL_RESULTS_PATH, RESULTS_REPO
from src.submission.check_validity import (
    already_submitted_models,
    check_model_card,
    get_model_size,
    is_model_on_hub,
)

REQUESTED_MODELS = None
USERS_TO_SUBMISSION_DATES = None


def get_top_prediction(text, tokenizer, model):
    inputs = tokenizer(text, return_tensors='pt')
    if torch.cuda.is_available():
        model = model.cuda()
        inputs = {k: v.cuda() for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits[0, -1]

    options = [' A', ' B', ' C', ' D']
    option_logits = []
    for option in options:
        option_id = tokenizer(option).input_ids[-1]
        option_logit = logits[option_id]
        option_logits.append((option_logit.item(), option.strip()))

    # Get the option with the highest logit
    top_option = max(option_logits, key=lambda x: x[0])[1]
    return top_option


def evaluate_model_accuracy(model_name, num_examples):
    try:
        # Load the model and tokenizer
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        tokenizer.pad_token = tokenizer.eos_token
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            trust_remote_code=True
        )
        if torch.cuda.is_available():
            model = model.cuda()  # Move model to GPU if available

        # Load the evaluation dataset
        dataset = load_dataset("Omartificial-Intelligence-Space/Arabic_Openai_MMMLU")
        dataset = dataset['test']

        # Convert the dataset to a pandas DataFrame for easier manipulation
        df_dataset = dataset.to_pandas()

        # Get the list of unique subjects
        subjects = df_dataset['Subject'].unique()

        # Define the prompt template
        template = """Answer the following multiple choice question by giving the most appropriate response. Answer should be one among [A, B, C, D].

Question: {Question}
A) {A}
B) {B}
C) {C}
D) {D}
Answer:"""
        prompt_template = PromptTemplate(template=template, input_variables=['Question', 'A', 'B', 'C', 'D'])

        # Initialize counters and results
        overall_correct_predictions = 0
        overall_total_questions = 0
        per_subject_results = []
        detailed_results = []

        for subject in subjects:
            # Filter the dataset for the current subject
            subject_df = df_dataset[df_dataset['Subject'] == subject]

            # Select up to num_examples questions
            subject_df = subject_df.sample(n=min(num_examples, len(subject_df)), random_state=42)

            # Initialize counters for this subject
            correct_predictions = 0
            total_questions = 0

            for idx, data in subject_df.iterrows():
                # Prepare the text input
                text = prompt_template.format(
                    Question=data['Question'],
                    A=data['A'],
                    B=data['B'],
                    C=data['C'],
                    D=data['D']
                )

                # Get the top prediction
                top_prediction = get_top_prediction(text, tokenizer, model)
                is_correct = (top_prediction == data['Answer'])
                correct_predictions += int(is_correct)
                total_questions += 1
                overall_correct_predictions += int(is_correct)
                overall_total_questions += 1

                detailed_results.append({
                    'Subject': subject,
                    'Question': data['Question'],
                    'Answer': data['Answer'],
                    'Prediction': top_prediction,
                    'Correct': is_correct
                })

            # Compute accuracy for this subject
            subject_accuracy = correct_predictions / total_questions if total_questions > 0 else 0

            per_subject_results.append({
                'Subject': subject,
                'Total Score': correct_predictions,
                'Total Questions': total_questions,
                'Accuracy (%)': subject_accuracy * 100
            })

        # Compute overall accuracy
        overall_accuracy = overall_correct_predictions / overall_total_questions if overall_total_questions > 0 else 0

        # Convert per_subject_results to a DataFrame
        df_per_subject = pd.DataFrame(per_subject_results)

        # Convert detailed_results to a DataFrame
        df_detailed_results = pd.DataFrame(detailed_results)

        return overall_accuracy, df_per_subject, df_detailed_results

    except Exception as e:
        return f"Error: {str(e)}", pd.DataFrame(), pd.DataFrame()


def add_new_eval(
    model: str,
    base_model: str,
    revision: str,
    precision: str,
    weight_type: str,
    model_type: str,
    num_examples: int  # New parameter
):
    global REQUESTED_MODELS
    global USERS_TO_SUBMISSION_DATES
    if not REQUESTED_MODELS:
        REQUESTED_MODELS, USERS_TO_SUBMISSION_DATES = already_submitted_models(EVAL_REQUESTS_PATH)

    user_name = ""
    model_path = model
    if "/" in model:
        user_name = model.split("/")[0]
        model_path = model.split("/")[1]

    precision = precision.split(" ")[0]
    current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    if model_type is None or model_type == "":
        return styled_error("Please select a model type.")

    # Does the model actually exist?
    if revision == "":
        revision = "main"

    # Is the model on the hub?
    if weight_type in ["Delta", "Adapter"]:
        base_model_on_hub, error, _ = is_model_on_hub(model_name=base_model, revision=revision, token=TOKEN, test_tokenizer=True)
        if not base_model_on_hub:
            return styled_error(f'Base model "{base_model}" {error}')

    if weight_type != "Adapter":
        model_on_hub, error, _ = is_model_on_hub(model_name=model, revision=revision, token=TOKEN, test_tokenizer=True)
        if not model_on_hub:
            return styled_error(f'Model "{model}" {error}')

    # Is the model info correctly filled?
    try:
        model_info = API.model_info(repo_id=model, revision=revision)
    except Exception:
        return styled_error("Could not get your model information. Please fill it up properly.")

    model_size = get_model_size(model_info=model_info, precision=precision)

    # Were the model card and license filled?
    try:
        license = model_info.cardData["license"]
    except Exception:
        return styled_error("Please select a license for your model")

    modelcard_OK, error_msg = check_model_card(model)
    if not modelcard_OK:
        return styled_error(error_msg)

    # Check for duplicate submission
    if f"{model}_{revision}_{precision}" in REQUESTED_MODELS:
        return styled_warning("This model has been already submitted.")

    # Now, perform the evaluation
    try:
        overall_accuracy, df_per_subject, df_detailed_results = evaluate_model_accuracy(model, int(num_examples))
        if isinstance(overall_accuracy, str) and overall_accuracy.startswith("Error"):
            return styled_error(overall_accuracy)
    except Exception as e:
        return styled_error(f"An error occurred during evaluation: {str(e)}")

    # Prepare results for storage
    results_dict = {
        "config": {
            "model_name": model,
            "model_sha": revision,
            "model_dtype": precision,
            "submitted_time": current_time,
            "model_type": model_type,
            "weight_type": weight_type,
            "license": license,
            "likes": model_info.likes,
            "params": model_size,
            "still_on_hub": True,
            "precision": precision,
        },
        "results": {
            "average": overall_accuracy * 100,
        },
    }

    # Include per-subject accuracies
    for idx, row in df_per_subject.iterrows():
        subject_name = row['Subject']
        accuracy = row['Accuracy (%)']
        results_dict['results'][subject_name] = accuracy

    # Save results to a JSON file
    results_file_path = f"{EVAL_RESULTS_PATH}/{model.replace('/', '_')}_results.json"
    with open(results_file_path, "w") as f:
        json.dump(results_dict, f)

    # Upload the results file
    API.upload_file(
        path_or_fileobj=results_file_path,
        path_in_repo=results_file_path.split(f"{EVAL_RESULTS_PATH}/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"Add results for {model}"
    )

    # Remove the local results file
    os.remove(results_file_path)

    return styled_message("Your model has been evaluated and the results are now on the leaderboard!")
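

# --- Usage sketch (not part of the leaderboard submission flow) ---
# A minimal local smoke test for evaluate_model_accuracy, assuming a small
# causal LM that is publicly available on the Hugging Face Hub. The model id
# below is an illustrative placeholder, not something this module requires.
# Running it downloads the model and the Arabic_Openai_MMMLU test split, so
# it is intended only for manual debugging.
if __name__ == "__main__":
    demo_model_id = "Qwen/Qwen2.5-0.5B"  # hypothetical choice for a quick local run
    accuracy, per_subject, detailed = evaluate_model_accuracy(demo_model_id, num_examples=2)
    if isinstance(accuracy, str):
        # evaluate_model_accuracy returns an "Error: ..." string on failure
        print(accuracy)
    else:
        print(f"Overall accuracy: {accuracy * 100:.2f}%")
        print(f"Questions evaluated: {len(detailed)}")
        print(per_subject.to_string(index=False))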