import os

# Set environment variable for better memory management
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:32"

import json
from datetime import datetime, timezone
import random

import torch
import pandas as pd
import numpy as np
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain.prompts import PromptTemplate

from src.display.formatting import styled_error, styled_message, styled_warning
from src.envs import API, EVAL_REQUESTS_PATH, TOKEN, QUEUE_REPO, EVAL_RESULTS_PATH, RESULTS_REPO
from src.submission.check_validity import (
    already_submitted_models,
    check_model_card,
    get_model_size,
    is_model_on_hub,
)
import spaces

REQUESTED_MODELS = None
USERS_TO_SUBMISSION_DATES = None

# List of subjects to exclude from evaluation
excluded_subjects = [
    "human_sexuality",
    "professional_psychology",
    "moral_disputes",
    "public_relations",
    "jurisprudence",
    "human_aging",
    "world_religions",
]


def get_top_prediction(batch_texts, tokenizer, model):
    inputs = tokenizer(batch_texts, return_tensors='pt', padding=True, truncation=True)
    if torch.cuda.is_available():
        model = model.cuda()
        inputs = {k: v.cuda() for k, v in inputs.items()}
    else:
        model = model.cpu()
        inputs = {k: v.cpu() for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits[:, -1, :]  # Logits of the last token for each input in the batch

    options = [' A', ' B', ' C', ' D']
    predictions = []
    for i in range(len(batch_texts)):
        option_logits = []
        for option in options:
            option_ids = tokenizer(option).input_ids
            if option_ids and option_ids[-1] < logits.size(1):
                option_logit = logits[i, option_ids[-1]].item()
                option_logits.append((option_logit, option.strip()))
            else:
                print(f"Skipping option '{option}' due to index out of range for input {i}.")

        if not option_logits:
            predictions.append("No valid options")
        else:
            # Pick the option whose last token has the highest logit
            top_option = max(option_logits, key=lambda x: x[0])[1]
            predictions.append(top_option)

    return predictions


@spaces.GPU(duration=120)
def evaluate_model_accuracy_by_subject(model_name, num_questions_per_subject=100, batch_size=32):
    try:
        # Load the model and tokenizer
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        tokenizer.pad_token = tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            trust_remote_code=True
        )

        # Convert model to FP16 (half precision) to reduce memory usage
        model = model.half()

        if torch.cuda.is_available():
            model = model.cuda()  # Move model to GPU if available
        else:
            model = model.cpu()

        # Load the Arabic OpenAI MMMLU dataset from the Hugging Face Hub
        dataset = load_dataset("Omartificial-Intelligence-Space/Arabic_Openai_MMMLU")
        dataset = dataset['test']

        # Filter out excluded subjects
        dataset = dataset.filter(lambda x: x['Subject'] not in excluded_subjects)

        # Define prompt template
        template = """Answer the following multiple choice question by giving the most appropriate response. Answer should be one among [A, B, C, D].
Question: {Question}
A) {A}
B) {B}
C) {C}
D) {D}
Answer:"""

        prompt_template = PromptTemplate(template=template, input_variables=['Question', 'A', 'B', 'C', 'D'])

        # Initialize results storage
        subject_results = {}
        overall_correct_predictions = 0
        overall_total_questions = 0

        subjects = dataset.unique('Subject')

        for subject in subjects:
            subject_data = dataset.filter(lambda x: x['Subject'] == subject)

            # Sample num_questions_per_subject from each subject
            if num_questions_per_subject > 0:
                if len(subject_data) < num_questions_per_subject:
                    print(f"Warning: Not enough questions for subject '{subject}'. Using all available questions.")
                    selected_indices = range(len(subject_data))
                else:
                    selected_indices = random.sample(range(len(subject_data)), num_questions_per_subject)
                subject_data = subject_data.select(selected_indices)

            correct_predictions = 0
            total_questions = 0
            results = []

            model.eval()

            # Batch processing
            for i in range(0, len(subject_data), batch_size):
                batch_data = subject_data[i:i + batch_size]

                # Generate batch texts
                batch_texts = [
                    prompt_template.format(
                        Question=batch_data['Question'][j],
                        A=batch_data['A'][j],
                        B=batch_data['B'][j],
                        C=batch_data['C'][j],
                        D=batch_data['D'][j]
                    )
                    for j in range(len(batch_data['Question']))
                ]

                # Get the top predictions for the batch
                batch_predictions = get_top_prediction(batch_texts, tokenizer, model)

                for j in range(len(batch_data['Question'])):
                    top_prediction = batch_predictions[j]
                    is_correct = (top_prediction == batch_data['Answer'][j])
                    correct_predictions += int(is_correct)
                    total_questions += 1
                    overall_correct_predictions += int(is_correct)
                    overall_total_questions += 1

                    results.append({
                        'Question': batch_data['Question'][j],
                        'Answer': batch_data['Answer'][j],
                        'Prediction': top_prediction,
                        'Correct': is_correct
                    })

            # Clear GPU memory after processing each subject
            torch.cuda.empty_cache()

            accuracy = correct_predictions / total_questions if total_questions > 0 else 0

            # Store results for this subject
            subject_results[subject] = {
                'Correct Predictions': correct_predictions,
                'Total Questions': total_questions,
                'Accuracy': accuracy * 100,
                'Results DataFrame': pd.DataFrame(results)
            }

        overall_accuracy = (overall_correct_predictions / overall_total_questions) * 100 if overall_total_questions > 0 else 0

        return overall_accuracy, subject_results

    except Exception as e:
        import traceback
        tb = traceback.format_exc()
        print(f"Error in evaluate_model_accuracy_by_subject: {e}\n{tb}")
        return f"Error: {str(e)}", {}


def add_new_eval(
    model: str,
    base_model: str,
    revision: str,
    precision: str,
    weight_type: str,
    model_type: str
):
    global REQUESTED_MODELS
    global USERS_TO_SUBMISSION_DATES
    if not REQUESTED_MODELS:
        REQUESTED_MODELS, USERS_TO_SUBMISSION_DATES = already_submitted_models(EVAL_REQUESTS_PATH)

    user_name = ""
    model_path = model
    if "/" in model:
        user_name = model.split("/")[0]
        model_path = model.split("/")[1]

    precision = precision.split(" ")[0]
    current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    if model_type is None or model_type == "":
        return styled_error("Please select a model type.")

    # Does the model actually exist?
    if revision == "":
        revision = "main"

    # Is the model on the Hub?
    if weight_type in ["Delta", "Adapter"]:
        base_model_on_hub, error, _ = is_model_on_hub(model_name=base_model, revision=revision, token=TOKEN, test_tokenizer=True)
        if not base_model_on_hub:
            return styled_error(f'Base model "{base_model}" {error}')

    if weight_type != "Adapter":
        model_on_hub, error, _ = is_model_on_hub(model_name=model, revision=revision, token=TOKEN, test_tokenizer=True)
        if not model_on_hub:
            return styled_error(f'Model "{model}" {error}')

    # Is the model info correctly filled?
    try:
        model_info = API.model_info(repo_id=model, revision=revision)
    except Exception:
        return styled_error("Could not get your model information. Please fill it up properly.")

    model_size = get_model_size(model_info=model_info, precision=precision)

    # Were the model card and license filled?
    try:
        license = model_info.cardData["license"]
    except Exception:
        return styled_error("Please select a license for your model")

    modelcard_OK, error_msg = check_model_card(model)
    if not modelcard_OK:
        return styled_error(error_msg)

    # Check for duplicate submission
    if f"{model}_{revision}_{precision}" in REQUESTED_MODELS:
        return styled_warning("This model has already been submitted.")

    # Now, perform the evaluation
    try:
        overall_accuracy, subject_results = evaluate_model_accuracy_by_subject(model, num_questions_per_subject=100, batch_size=32)
        if isinstance(overall_accuracy, str) and overall_accuracy.startswith("Error"):
            return styled_error(overall_accuracy)
    except Exception as e:
        return styled_error(f"An error occurred during evaluation: {str(e)}")

    # Prepare results for storage
    results_dict = {
        "config": {
            "model_name": model,
            "base_model": base_model,
            "revision": revision,
            "precision": precision,
            "weight_type": weight_type,
            "model_type": model_type,
            "submitted_time": current_time,
            "license": license,
            "likes": model_info.likes,
            "params": model_size,
            "still_on_hub": True,
        },
        "results": {
            "average": overall_accuracy,
        },
    }

    # Include per-subject accuracies
    for subject, data in subject_results.items():
        accuracy = data['Accuracy']
        results_dict['results'][subject] = accuracy

    # Save results to a JSON file
    results_file_path = f"{EVAL_RESULTS_PATH}/{model.replace('/', '_')}_results.json"
    with open(results_file_path, "w") as f:
        json.dump(results_dict, f, indent=4)

    # Upload the results file
    API.upload_file(
        path_or_fileobj=results_file_path,
        path_in_repo=results_file_path.split(f"{EVAL_RESULTS_PATH}/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"Add results for {model}"
    )

    # Remove the local results file
    os.remove(results_file_path)

    return styled_message("Your model has been evaluated and the results are now on the leaderboard!")
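

# Minimal usage sketch (not part of the original module): it assumes this file can be run
# directly inside the Space environment (the @spaces.GPU decorator may not work elsewhere)
# and uses a placeholder model id. The small num_questions_per_subject and batch_size are
# illustrative smoke-test values, not the leaderboard settings (100 and 32).
if __name__ == "__main__":
    overall_accuracy, subject_results = evaluate_model_accuracy_by_subject(
        "your-username/your-model",    # placeholder repo id (assumption)
        num_questions_per_subject=5,   # keep small for a quick local check
        batch_size=8,
    )
    print(f"Overall accuracy: {overall_accuracy}")
    for subject, data in subject_results.items():
        print(f"{subject}: {data['Accuracy']:.2f}% "
              f"({data['Correct Predictions']}/{data['Total Questions']})")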