File size: 5,876 Bytes
2b62c4c 625e239 2b62c4c 65504f2 2b62c4c 625e239 2b62c4c 625e239 2b62c4c 625e239 2b62c4c 625e239 2b62c4c 625e239 2b62c4c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
import itertools
import os
import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
from datasets import load_dataset
import style
ZERO_SHOT_ONLY = ["BELEBELE"]
FEW_SHOT_ONLY = ["GSM8K", "TruthfulQA"]
def init():
global repo_id, config_name, split_name, hidden_df, task_group_names_list, task_group_type_dict, task_groups_shots_dict, languages_list, model_type_dict
repo_id = os.getenv("OGX_LEADERBOARD_DATASET_NAME")
config_name = os.getenv("OGX_LEADERBOARD_DATASET_CONFIG")
split_name = os.getenv("OGX_LEADERBOARD_DATASET_SPLIT")
dataset = load_dataset(repo_id, config_name, split=split_name)
hidden_df = dataset.to_pandas()
task_group_names_list = hidden_df["Task_Group"].unique().tolist()
task_group_type_df = hidden_df[["Task_Group", "Task_Type"]].drop_duplicates()
task_group_type_dict = task_group_type_df.set_index("Task_Group")["Task_Type"].to_dict()
task_groups_shots_df = hidden_df[hidden_df["Few_Shot"] == True][["Task_Group", "Number_Shots"]].drop_duplicates()
task_groups_shots_dict = task_groups_shots_df.set_index("Task_Group")["Number_Shots"].to_dict()
languages_list = hidden_df["Language"].drop_duplicates().str.upper().tolist()
model_type_df = hidden_df[["Model_Name", "Model_Type"]].drop_duplicates()
model_type_dict = model_type_df.set_index("Model_Name")["Model_Type"].to_dict()
hidden_df = hidden_df.pivot_table(
columns=["Task_Group", "Few_Shot", "Language"],
index=["Model_Name"],
values="Value",
dropna=False,
).reset_index(inplace=False)
hidden_df["Type"] = hidden_df["Model_Name"].apply(lambda x: style.T_SYMBOLS[model_type_dict[x]])
def sort_cols(df: pd.DataFrame, fewshot: bool = False) -> pd.DataFrame:
task_cols = get_task_columns(df)
return df.reindex(["Type", "Model_Name", "Average"] + sorted(task_cols), axis=1)
def get_task_columns(df: pd.DataFrame) -> pd.DataFrame:
l = list(df.columns)
l.remove("Model_Name")
l.remove("Average")
l.remove("Type")
return l
def get_models(df: pd.DataFrame) -> pd.DataFrame:
return df["Model_Name"].unique()
def filter_type(df: pd.DataFrame, model_types: list[str]) -> pd.DataFrame:
"""Keep only rows for which model type is in list of types"""
return df[df["Type"].isin(model_types)]
def search_model(df: pd.DataFrame, query: str) -> pd.DataFrame:
"""Keep only rows for which model name matches search query"""
query = query.replace(";", "|")
return df[df["Model_Name"].str.contains(query, case=False)]
def aggregate_langs(df: pd.DataFrame, tasks: list, langs: list):
"""Aggregates results over langs for each task in tasks.
If a language does not exist for a task, the aggregate for
that task will be shown as NaN.
"""
langs_lower = [item.lower() for item in langs]
df.columns = ["_".join(filter(None, col)) for col in df.columns]
colset = set(df.columns)
for t in tasks:
cols = [(f"{a}_{b}") for a, b in itertools.product([t], langs_lower)]
if set(cols).issubset(colset):
df.loc[:, t] = df[cols].mean(axis=1, skipna=False)
else:
df.loc[:, t] = np.nan
df.loc[:, "Average"] = df[tasks].mean(axis=1)
return df[["Type", "Model_Name", "Average"] + tasks]
def select_shots(df: pd.DataFrame, fewshot: bool = False):
cols = [col for col in df.columns if col[1] == fewshot] + []
# Move model name and type icon to the end
cols.append(("Model_Name", "", ""))
cols.append(("Type", "", ""))
return df[cols].droplevel(level=1, axis="columns")
def update_df(
tasks: list[str],
model_query: str,
langs: list[str],
model_types: list[str],
fewshot: bool = False,
format: bool = True,
) -> pd.DataFrame:
"""Return a filtered dataframe according to selected models, tasks and
languages. The format flag controls whether the output dataframe should
be formatted to tw significant figures.
"""
# keep only selected shots
df = select_shots(hidden_df, fewshot)
# aggregate results over languages per task
df = aggregate_langs(df, tasks, langs)
# filter models by search bar and model type
df = search_model(df, model_query)
df = filter_type(df, model_types)
if format:
return sort_cols(df, fewshot).style.format(precision=2, decimal=".", na_rep="N/A")
else:
return sort_cols(df, fewshot)
def update_task_groups_and_fewshot(current_selected_tab: int, is_fewshot_current: bool = False):
selected_task_type = get_selected_task_type(current_selected_tab)
available_tasks = get_available_task_groups(selected_task_type, is_fewshot_current)
new_selected_tasks = available_tasks.copy()
tasks_checkbox_group_update = gr.CheckboxGroup(
choices=available_tasks,
value=new_selected_tasks,
)
if current_selected_tab == 0:
is_fewshot_new = is_fewshot_current
fewshot_available = True
elif current_selected_tab == 1:
is_fewshot_new = False
fewshot_available = False
fewshot_radio_update = gr.Radio(
value=is_fewshot_new,
interactive=fewshot_available,
)
return [tasks_checkbox_group_update, fewshot_radio_update, current_selected_tab]
def get_selected_task_type(task_type_id):
task_types = {0: "accuracy", 1: "misc"}
selected_task_type = task_types[task_type_id]
return selected_task_type
def get_available_task_groups(selected_task_type, fewshot):
task_groups = [task_group_name for task_group_name, task_type in task_group_type_dict.items() if task_type == selected_task_type]
if fewshot:
available_tasks = [c for c in task_groups if c not in ZERO_SHOT_ONLY]
else:
available_tasks = [c for c in task_groups if c not in FEW_SHOT_ONLY]
return available_tasks
init()
|