MMEB / utils.py
wenhu's picture
init commit (#1)
2016488 verified
raw
history blame
5.25 kB
import csv
import json
import os
import re
import shutil

import gradio as gr
import pandas as pd
from huggingface_hub import Repository
HF_TOKEN = os.environ.get("HF_TOKEN")
SUBJECTS = ["Classification", "VQA", "Retrieval", "Grounding"]
MODEL_INFO = [
"Models", "Model Size(B)", "Data Source",
"Overall", "IND", "OOD",
"Classification", "VQA", "Retrieval", "Grounding"
]
DATA_TITLE_TYPE = ['markdown', 'str', 'markdown', 'number', 'number', 'number', 'number', 'number', 'number', 'number']
# TODO: submission process not implemented yet
SUBMISSION_NAME = ""
SUBMISSION_URL = ""
CSV_DIR = "results.csv" # TODO: Temporary file, to be updated with the actual file
COLUMN_NAMES = MODEL_INFO
LEADERBOARD_INTRODUCTION = """# MMEB Leaderboard
## Introduction
We introduce MMEB, a benchmark for multimodal evaluation of models. The benchmark consists of four tasks: Classification, VQA, Retrieval, and Grounding. Models are evaluated based on 36 datasets.
"""
TABLE_INTRODUCTION = """"""
LEADERBOARD_INFO = """
## Dataset Summary
"""
CITATION_BUTTON_LABEL = "Copy the following snippet to cite these results"
CITATION_BUTTON_TEXT = """"""
SUBMIT_INTRODUCTION = """# Submit on MMEB Leaderboard Introduction
## ⚠ Please note that you need to submit the JSON file with the following format:
```json
[
{
"question_id": 123,
"question": "abc",
"options": ["abc", "xyz", ...],
"answer": "ABC",
"answer_index": 1,
"category": "abc,
"pred": "B",
"model_outputs": ""
}, ...
]
```
...
"""
def get_df():
    """Load the leaderboard CSV and return it sorted by overall score.

    Reads ``CSV_DIR``, passes every ``'Model Size(B)'`` value through
    ``process_model_size`` and orders the rows by ``'Overall'``, highest first.

    Returns:
        pandas.DataFrame: the normalised, sorted leaderboard table.
    """
    # TODO: Update this after the hf dataset has been created!
    # repo = Repository(local_dir=SUBMISSION_NAME, clone_from=SUBMISSION_URL, use_auth_token=HF_TOKEN)
    # repo.git_pull()
    size_column = 'Model Size(B)'
    leaderboard = pd.read_csv(CSV_DIR)
    leaderboard[size_column] = leaderboard[size_column].apply(process_model_size)
    return leaderboard.sort_values(by=['Overall'], ascending=False)
def add_new_eval(
    input_file,
):
    """Append an uploaded evaluation result to the results CSV and push it.

    Args:
        input_file: JSON text accepted by ``json.loads``, or ``None``. The
            decoded object must contain the keys ``"Model"``, ``"Overall"``
            and one key per entry of ``SUBJECTS``.

    Returns:
        ``"Error! Empty file!"`` when ``input_file`` is ``None``; otherwise
        ``None`` (the outcome is reported with ``print``).

    Raises:
        json.JSONDecodeError: if ``input_file`` is not valid JSON.
        KeyError: if a required key is missing from the decoded object.
    """
    if input_file is None:
        return "Error! Empty file!"

    upload_data = json.loads(input_file)
    print("upload_data:\n", upload_data)

    # NOTE(review): this row holds 2 + len(SUBJECTS) values, while MODEL_INFO
    # lists 10 columns -- confirm the CSV layout before enabling submissions.
    data_row = [f'{upload_data["Model"]}', upload_data['Overall']]
    data_row += [upload_data[subject] for subject in SUBJECTS]
    print("data_row:\n", data_row)

    submission_repo = Repository(local_dir=SUBMISSION_NAME, clone_from=SUBMISSION_URL,
                                 use_auth_token=HF_TOKEN, repo_type="dataset")
    submission_repo.git_pull()

    with open(CSV_DIR, mode='r', newline='') as file:
        # csv.reader yields [] for blank lines; skipping them keeps row[0]
        # from raising IndexError on a stray empty line in the file.
        already_submitted = {
            row[0] for row in csv.reader(file, delimiter=',') if row
        }

    if data_row[0] in already_submitted:
        print('The entry already exists')
        return

    with open(CSV_DIR, mode='a', newline='') as file:
        csv.writer(file).writerow(data_row)

    submission_repo.push_to_hub()
    print('Submission Successful')
def refresh_data():
    """Reload the leaderboard via ``get_df`` and keep only ``COLUMN_NAMES``.

    Returns:
        pandas.DataFrame: the freshly loaded table restricted to the
        leaderboard columns, in ``COLUMN_NAMES`` order.
    """
    return get_df()[COLUMN_NAMES]
def search_and_filter_models(df, query, min_size, max_size):
    """Filter the leaderboard by model name and by model size.

    Args:
        df: table with at least ``'Models'``, ``'Model Size(B)'`` and every
            column named in ``COLUMN_NAMES``.
        query: case-insensitive search text. It is used as a regular
            expression when it compiles as one, otherwise as a literal
            substring. A falsy value disables the name filter.
        min_size: inclusive lower bound on the model size.
        max_size: inclusive upper bound on the model size.

    Returns:
        pandas.DataFrame: the matching rows, restricted to ``COLUMN_NAMES``.
        The input frame is not modified.
    """
    filtered_df = df.copy()

    if query:
        # Text such as "(" or "c++" is not a valid pattern; fall back to a
        # plain substring search instead of surfacing a regex error.
        try:
            re.compile(query)
        except re.error:
            use_regex = False
        else:
            use_regex = True
        name_mask = filtered_df['Models'].str.contains(
            query, case=False, na=False, regex=use_regex
        )
        filtered_df = filtered_df[name_mask]

    def size_in_range(size):
        # Rows whose size is 'unknown' are treated as a 1000.0 model.
        effective = 1000.0 if size == 'unknown' else size
        return min_size <= effective <= max_size

    # apply() on an empty Series returns an empty Series of the input dtype,
    # which DataFrame indexing would read as a list of column labels rather
    # than a row mask; astype(bool) keeps it a boolean mask in that case.
    size_mask = filtered_df['Model Size(B)'].apply(size_in_range).astype(bool)
    return filtered_df[size_mask][COLUMN_NAMES]
# def search_and_filter_models(df, query, min_size, max_size):
# filtered_df = df.copy()
# if query:
# filtered_df = filtered_df[filtered_df['Models'].str.contains(query, case=False, na=False)]
# def size_filter(x):
# if isinstance(x, (int, float)):
# return min_size <= x <= max_size
# return True
# filtered_df = filtered_df[filtered_df['Model Size(B)'].apply(size_filter)]
# return filtered_df[COLUMN_NAMES]
def search_models(df, query):
    """Return the rows of ``df`` whose ``'Models'`` value matches ``query``.

    Args:
        df: table with a string ``'Models'`` column.
        query: case-insensitive search text. It is used as a regular
            expression when it compiles as one, otherwise as a literal
            substring.

    Returns:
        pandas.DataFrame: ``df`` itself when ``query`` is falsy, otherwise
        the matching rows. Missing model names never match.
    """
    if not query:
        return df

    # Text such as "(" or "c++" is not a valid pattern; fall back to a plain
    # substring search instead of surfacing a regex error.
    try:
        re.compile(query)
    except re.error:
        use_regex = False
    else:
        use_regex = True

    name_mask = df['Models'].str.contains(
        query, case=False, na=False, regex=use_regex
    )
    return df[name_mask]
# def get_size_range(df):
# numeric_sizes = df[df['Model Size(B)'].apply(lambda x: isinstance(x, (int, float)))]['Model Size(B)']
# if len(numeric_sizes) > 0:
# return float(numeric_sizes.min()), float(numeric_sizes.max())
# return 0, 1000
def get_size_range(df):
    """Return the smallest and largest model size found in ``df``.

    Entries equal to ``'unknown'`` count as ``1000.0``. When the column has
    no usable value (empty table or only missing entries) the range falls
    back to ``(0.0, 1000.0)`` instead of ``(nan, nan)``.

    Args:
        df: table with a ``'Model Size(B)'`` column holding numbers or the
            string ``'unknown'``.

    Returns:
        tuple[float, float]: ``(minimum, maximum)`` size.
    """
    unknown_size = 1000.0
    sizes = df['Model Size(B)'].apply(
        lambda size: unknown_size if size == 'unknown' else size
    ).dropna()
    if sizes.empty:
        return 0.0, unknown_size
    return float(sizes.min()), float(sizes.max())
def process_model_size(size):
    """Normalise one raw model-size cell.

    Args:
        size: raw value, e.g. a number, a numeric string, ``'unk'`` or a
            missing value.

    Returns:
        The size as a ``float`` when it can be converted; the string
        ``'unknown'`` for missing values, ``'unk'`` and anything ``float()``
        rejects.
    """
    unknown = 'unknown'
    if pd.isna(size) or size == 'unk':
        return unknown
    try:
        parsed = float(size)
    except (ValueError, TypeError):
        return unknown
    else:
        return parsed
def filter_columns_by_subjects(df, selected_subjects=None):
    """Return ``df`` restricted to the base columns plus the chosen subjects.

    Args:
        df: leaderboard table.
        selected_subjects: iterable of subject column names (list, tuple,
            ...). ``None`` or an empty iterable selects ``COLUMN_NAMES``.

    Returns:
        pandas.DataFrame: the base columns ``'Models'``, ``'Model Size(B)'``,
        ``'Data Source'`` and ``'Overall'`` followed by the selected subjects.
        Names absent from ``df`` are skipped and repeated names are kept only
        once, at their first position.
    """
    # Materialise once so tuples and other iterables work like lists.
    selected = [] if selected_subjects is None else list(selected_subjects)
    if not selected:
        return df[COLUMN_NAMES]

    base_columns = ['Models', 'Model Size(B)', 'Data Source', 'Overall']
    # dict.fromkeys drops repeats while preserving first-seen order, so a
    # subject that repeats a base column does not yield duplicate columns.
    wanted = dict.fromkeys([*base_columns, *selected])
    available_columns = [col for col in wanted if col in df.columns]
    return df[available_columns]
def get_subject_choices():
    """Return the module-level ``SUBJECTS`` list (the same object, not a copy)."""
    choices = SUBJECTS
    return choices