|
import gradio as gr |
|
import json |
|
import pandas as pd |
|
from collections import defaultdict |
|
import copy as cp |
|
from urllib.request import urlopen, URLError |
|
import re |
|
from datetime import datetime |
|
import time |
|
|
|
|
|
# BibTeX entry offered in the "Citation" accordion. A raw string keeps the
# backslash in `\url` literal. The entry is closed by exactly one `}` so the
# braces balance and the snippet is valid BibTeX.
CITATION_BUTTON_TEXT = r"""@misc{2023opencompass,
    title={OpenCompass: A Universal Evaluation Platform for Foundation Models},
    author={OpenCompass Contributors},
    howpublished = {\url{https://github.com/open-compass/opencompass}},
    year={2023}
}"""

# Label displayed above the citation text box.
CITATION_BUTTON_LABEL = "Copy the following snippet to cite these results"

# Locations inside the OpenCompass GitHub repository.
OPENCOMPASS_README = (
    'https://raw.githubusercontent.com/open-compass/opencompass/main/README.md'
)
GITHUB_REPO = 'https://github.com/open-compass/opencompass'
GITHUB_RAW = 'https://raw.githubusercontent.com/open-compass/opencompass'
GITHUB_BLOB = 'https://github.com/open-compass/opencompass/blob'
|
|
|
|
|
# Common prefix of the dated result files; a "<YYYYMMDD>.json" suffix is
# appended to it when probing for the most recent file.
DATA_URL_BASE = (
    "http://opencompass.oss-cn-shanghai.aliyuncs.com"
    "/assets/research-rank/research-data.REALTIME."
)
|
|
|
def find_latest_data_url(max_days_back=365, timeout=10):
    """Find the most recent dated data file that can be opened.

    Starting from today and walking backwards one calendar day at a time,
    builds ``DATA_URL_BASE + YYYYMMDD + ".json"`` and returns the first URL
    that opens successfully.

    Args:
        max_days_back: Number of consecutive days (today included) to probe.
        timeout: Per-request timeout in seconds passed to ``urlopen``.

    Returns:
        ``(url, date_str)`` for the first reachable file, where ``date_str``
        is formatted ``YYYYMMDD``; ``(None, None)`` if no probe succeeded.
    """
    # Imported locally because the module header only imports `datetime`.
    from datetime import timedelta

    today = datetime.now()

    for days_back in range(max_days_back):
        # timedelta arithmetic rolls over month and year boundaries;
        # adjusting the `day` field directly fails as soon as it drops below 1.
        date_str = (today - timedelta(days=days_back)).strftime("%Y%m%d")
        url = f"{DATA_URL_BASE}{date_str}.json"
        try:
            # Only reachability matters here; the context manager closes the
            # response without reading the body.
            with urlopen(url, timeout=timeout):
                return url, date_str
        except OSError:
            # URLError/HTTPError and socket timeouts are all OSError
            # subclasses: treat any of them as "no file for this date".
            continue

    return None, None
|
|
|
def get_latest_data():
    """Return the latest data URL and its date rendered as ``YYYY-MM-DD``.

    Returns:
        ``(data_url, formatted_update_time)``.

    Raises:
        Exception: If ``find_latest_data_url`` reports no usable URL.
    """
    latest_url, stamp = find_latest_data_url()
    if latest_url:
        # `stamp` is the compact YYYYMMDD form; re-render it with dashes.
        parsed = datetime.strptime(stamp, "%Y%m%d")
        return latest_url, parsed.strftime("%Y-%m-%d")
    raise Exception("Could not find valid data URL")
|
|
|
|
|
def get_leaderboard_title(update_time):
    """Return the Markdown H1 heading that shows the last-update date."""
    heading_template = "# CompassAcademic Leaderboard (Last Updated: {})"
    return heading_template.format(update_time)
|
|
|
# Markdown shown at the top of the main leaderboard tab.
# NOTE(review): the character after the OpenCompass link was mis-decoded
# (it appeared as a lone Greek "pi"); it is restored here as the trophy emoji,
# which is an assumption - confirm against the upstream text.
MAIN_LEADERBOARD_DESCRIPTION = """## Main Evaluation Results
The CompassAcademic currently focuses on the comprehensive reasoning abilities of LLMs.
- The datasets selected so far include General Knowledge Reasoning (MMLU-Pro/GPQA-Diamond), Logical Reasoning (BBH), Mathematical Reasoning (MATH-500, AIME), Code Completion (LiveCodeBench, HumanEval), and Instruction Following (IFEval).
- Currently, the evaluation primarily targets chat models, with updates featuring the latest community models at irregular intervals.
- Prompts and reproduction scripts can be found in [**OpenCompass**: A Toolkit for Evaluation of LLMs](https://github.com/open-compass/opencompass)🏆.
"""
|
|
|
def fix_image_urls(content):
    """Rewrite repository-relative image references to absolute raw URLs.

    Two rewrites are applied, in order:

    1. Every occurrence of the logo path ``docs/en/_static/image/logo.svg``
       (wherever it appears, not only in Markdown image syntax) is prefixed
       with the raw.githubusercontent.com base URL.
    2. Every Markdown image ``![alt](target)`` whose target does not start
       with ``http`` gets the same base URL prepended; the alt text is kept.

    Applying the function to its own output leaves the text unchanged.

    Args:
        content: Markdown text.

    Returns:
        The text with image references rewritten.
    """
    raw_base = 'https://raw.githubusercontent.com/open-compass/opencompass/main/'
    logo_path = 'docs/en/_static/image/logo.svg'

    # The lookbehind skips logo paths that already sit directly behind
    # ".../main/", so an already-absolute URL is not prefixed a second time.
    content = re.sub(
        r'(?<!/main/)' + re.escape(logo_path),
        lambda _m: raw_base + logo_path,
        content,
    )

    # Group 1 is the alt text and group 2 the relative target. (Using the
    # whole match as alt text would nest the original image inside itself.)
    content = re.sub(
        r'!\[([^\]]*)\]\((?!http)([^)]+)\)',
        lambda m: f'![{m.group(1)}]({raw_base}{m.group(2)})',
        content,
    )

    return content
|
|
|
|
|
# Choices offered by the "Model Size" checkbox group.
MODEL_SIZE = [
    '<10B',
    '10B-70B',
    '>70B',
    'Unknown',
]

# Choices offered by the "Model Type" checkbox group.
MODEL_TYPE = [
    'API',
    'OpenSource',
]
|
|
|
|
|
def load_data(data_url, timeout=30):
    """Download a UTF-8 encoded JSON document and return the parsed object.

    Args:
        data_url: URL understood by ``urllib.request.urlopen``.
        timeout: Timeout in seconds passed to ``urlopen`` so a stalled
            connection cannot block the caller indefinitely.

    Returns:
        The object produced by ``json.loads`` for the response body.
    """
    # The context manager closes the response even if reading or parsing fails.
    with urlopen(data_url, timeout=timeout) as response:
        return json.loads(response.read().decode('utf-8'))
|
|
|
|
|
def build_main_table(data):
    """Build the leaderboard DataFrame from the downloaded JSON payload.

    Reads ``data['globalData']['OverallTable']`` (one record per model) and
    ``data['models']`` (per-model metadata indexed by model name), then adds:

    * ``OpenSource``: ``'Yes'`` when the model's ``release`` field equals
      ``'OpenSource'``, otherwise ``'No'``.
    * ``Rank``: 1-based rank by ``Average``, highest first; ties share the
      smallest rank and rows with a missing average are ranked last.

    Args:
        data: Parsed JSON payload.

    Returns:
        DataFrame restricted to the displayed columns, renamed to their
        display labels.

    Raises:
        KeyError: If an expected key or column is missing, or a model in the
            table has no entry in ``data['models']``.
    """
    df = pd.DataFrame(data['globalData']['OverallTable'])

    models_data = data['models']
    df['OpenSource'] = df['model'].apply(
        lambda name: 'Yes' if models_data[name]['release'] == 'OpenSource' else 'No'
    )

    # na_option='bottom' gives NaN averages the last ranks; without it they
    # stay NaN and the integer cast below raises.
    df['Rank'] = (
        df['Average']
        .rank(ascending=False, method='min', na_option='bottom')
        .astype(int)
    )

    # source column -> display label, in display order
    columns = {
        'Rank': 'Rank',
        'model': 'Model',
        'org': 'Organization',
        'num': 'Parameters',
        'OpenSource': 'OpenSource',
        'Average': 'Average Score',
        'BBH': 'BBH',
        'Math-500': 'Math-500',
        'AIME': 'AIME',
        'MMLU-Pro': 'MMLU-Pro',
        'LiveCodeBench': 'LiveCodeBench',
        'HumanEval': 'HumanEval',
        # The source key is kept exactly as this code has always read it
        # ('GQPA'); only the label shown to users is corrected to 'GPQA'.
        'GQPA-Diamond': 'GPQA-Diamond',
        'IFEval': 'IFEval',
    }
    return df[list(columns)].rename(columns=columns)
|
|
|
|
|
def _parse_size_in_b(param):
    """Convert a ``Parameters`` cell such as ``'7B'`` to a float, or ``None``."""
    if param == 'N/A':
        return None
    try:
        return float(param.replace('B', ''))
    except (AttributeError, TypeError, ValueError):
        # Non-string cells and strings that are not numeric once 'B' is
        # removed count as unknown size.
        return None


def filter_table(df, size_ranges, model_types):
    """Return the rows of ``df`` matching the selected size and type filters.

    Args:
        df: Leaderboard table with ``Parameters`` and ``OpenSource`` columns.
        size_ranges: Iterable of labels among ``'<10B'``, ``'10B-70B'``,
            ``'>70B'`` and ``'Unknown'``. A row is kept if it matches any
            selected label. Empty or ``None`` disables size filtering.
            The ``'>70B'`` bucket includes exactly 70.
        model_types: Iterable of labels among ``'API'`` (``OpenSource`` is
            ``'No'``) and ``'OpenSource'`` (``OpenSource`` is ``'Yes'``).
            Empty or ``None`` disables type filtering.

    Returns:
        A new DataFrame with the same columns as ``df``; ``df`` itself is
        not modified. Unrecognised labels match nothing.
    """
    filtered_df = df.copy()

    if size_ranges:
        # Coerce to a float Series so unknown sizes are NaN even when every
        # row is unknown; NaN compares False against every numeric bound.
        sizes = pd.to_numeric(
            filtered_df['Parameters'].apply(_parse_size_in_b), errors='coerce'
        )
        size_masks = {
            '<10B': sizes < 10,
            '10B-70B': (sizes >= 10) & (sizes < 70),
            '>70B': sizes >= 70,
            'Unknown': sizes.isna(),
        }
        mask = pd.Series(False, index=filtered_df.index)
        for size_range in size_ranges:
            if size_range in size_masks:
                mask = mask | size_masks[size_range]
        filtered_df = filtered_df[mask]

    if model_types:
        flag_for_type = {'API': 'No', 'OpenSource': 'Yes'}
        wanted_flags = {
            flag_for_type[model_type]
            for model_type in model_types
            if model_type in flag_for_type
        }
        filtered_df = filtered_df[filtered_df['OpenSource'].isin(wanted_flags)]

    return filtered_df
|
|
|
|
|
def calculate_column_widths(df, min_width=160, max_width=400):
    """Compute one display width per column from header and content length.

    For every column the raw width is
    ``max(10 * len(header), 8 * longest_cell_text) + 20``, then clamped to
    ``[min_width, max_width]``. The result is passed to ``gr.DataFrame`` as
    ``column_widths`` (units are whatever that parameter expects -
    presumably pixels).

    Args:
        df: Table whose columns are measured; cells are measured via ``str()``.
        min_width: Lower bound applied to every column.
        max_width: Upper bound applied to every column.

    Returns:
        List of plain Python ``int`` widths, one per column in ``df.columns``.
    """
    column_widths = []

    for column in df.columns:
        header_length = len(str(column))

        content_lengths = df[column].astype(str).map(len)
        # An empty column has no longest cell; the header alone decides.
        # int() also unwraps the numpy integer returned by Series.max().
        max_content_length = int(content_lengths.max()) if len(content_lengths) else 0

        width = max(header_length * 10, max_content_length * 8) + 20
        column_widths.append(int(min(max_width, max(min_width, width))))

    return column_widths
|
|
|
|
|
def create_interface():
    """Build the Gradio Blocks app for the CompassAcademic leaderboard.

    Downloads the newest data file once at start-up and renders it as a table
    that can be filtered by model size and model type. Each page load then
    re-renders the title and table through ``update_data``, which first looks
    for a newer data file - at most once per ``refresh_interval`` seconds -
    so a long-running server picks up new results without a restart.

    Returns:
        The assembled ``gr.Blocks`` instance (not yet launched).

    Raises:
        Exception: Propagated from ``get_latest_data`` / ``load_data`` /
            ``build_main_table`` if the initial data cannot be obtained.
    """
    refresh_interval = 300  # minimum seconds between remote checks

    data_url, update_time = get_latest_data()
    df = build_main_table(load_data(data_url))

    # Mutable holder shared by the handlers below, so that a refresh is also
    # seen by the filter callbacks instead of them reading a stale table.
    state = {
        'data_url': data_url,
        'update_time': update_time,
        'df': df,
        'last_check': time.monotonic(),
    }

    def update_table(size_ranges, model_types):
        """Return the current table filtered and sorted by average score."""
        filtered_df = filter_table(state['df'], size_ranges, model_types)
        return filtered_df.sort_values("Average Score", ascending=False)

    def update_data(size_ranges, model_types):
        """Pick up newly published data, then return the title and table.

        Event handlers must return: values are delivered to the browser via
        the handler's outputs, not by assigning ``.value`` on components.
        """
        now = time.monotonic()
        if now - state['last_check'] >= refresh_interval:
            state['last_check'] = now
            try:
                new_data_url, new_update_time = get_latest_data()
                if new_data_url != state['data_url']:
                    new_df = build_main_table(load_data(new_data_url))
                    state.update(
                        data_url=new_data_url,
                        update_time=new_update_time,
                        df=new_df,
                    )
            except Exception as e:
                # Best effort: keep serving the data already loaded.
                print(f"Error updating data: {e}")
        return (
            get_leaderboard_title(state['update_time']),
            update_table(size_ranges, model_types),
        )

    with gr.Blocks() as demo:
        title_comp = gr.Markdown(get_leaderboard_title(update_time))

        with gr.Tabs():
            # NOTE(review): the emoji in this label was mis-decoded (it
            # appeared as "pi" followed by a line break, which fits the
            # bytes of U+1F3C5); confirm against the upstream label.
            with gr.TabItem("🏅 Main Leaderboard", elem_id='main'):
                gr.Markdown(MAIN_LEADERBOARD_DESCRIPTION)

                with gr.Row():
                    with gr.Column():
                        size_filter = gr.CheckboxGroup(
                            choices=MODEL_SIZE,
                            value=MODEL_SIZE,
                            label='Model Size',
                            interactive=True,
                        )
                    with gr.Column():
                        type_filter = gr.CheckboxGroup(
                            choices=MODEL_TYPE,
                            value=MODEL_TYPE,
                            label='Model Type',
                            interactive=True,
                        )

                with gr.Column():
                    table = gr.DataFrame(
                        value=df.sort_values("Average Score", ascending=False),
                        interactive=False,
                        wrap=False,
                        column_widths=calculate_column_widths(df),
                    )

                size_filter.change(
                    fn=update_table,
                    inputs=[size_filter, type_filter],
                    outputs=table,
                )

                type_filter.change(
                    fn=update_table,
                    inputs=[size_filter, type_filter],
                    outputs=table,
                )

        # Runs once per page load and returns promptly.
        demo.load(
            fn=update_data,
            inputs=[size_filter, type_filter],
            outputs=[title_comp, table],
        )

        with gr.Row():
            with gr.Accordion("Citation", open=False):
                gr.Textbox(
                    value=CITATION_BUTTON_TEXT,
                    label=CITATION_BUTTON_LABEL,
                    elem_id='citation-button',
                )

    return demo
|
|
|
|
|
if __name__ == '__main__':
    # '0.0.0.0' makes the server listen on all network interfaces.
    launch_options = {'server_name': '0.0.0.0'}
    demo = create_interface()
    demo.launch(**launch_options)
|
|