KlaudiaTH
Refactorings and fixes for tab handling and few-shot selection
625e239
raw
history blame
5.88 kB
import itertools
import os
import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
from datasets import load_dataset
import style
ZERO_SHOT_ONLY = ["BELEBELE"]
FEW_SHOT_ONLY = ["GSM8K", "TruthfulQA"]
def init():
global repo_id, config_name, split_name, hidden_df, task_group_names_list, task_group_type_dict, task_groups_shots_dict, languages_list, model_type_dict
repo_id = os.getenv("OGX_LEADERBOARD_DATASET_NAME")
config_name = os.getenv("OGX_LEADERBOARD_DATASET_CONFIG")
split_name = os.getenv("OGX_LEADERBOARD_DATASET_SPLIT")
dataset = load_dataset(repo_id, config_name, split=split_name)
hidden_df = dataset.to_pandas()
task_group_names_list = hidden_df["Task_Group"].unique().tolist()
task_group_type_df = hidden_df[["Task_Group", "Task_Type"]].drop_duplicates()
task_group_type_dict = task_group_type_df.set_index("Task_Group")["Task_Type"].to_dict()
task_groups_shots_df = hidden_df[hidden_df["Few_Shot"] == True][["Task_Group", "Number_Shots"]].drop_duplicates()
task_groups_shots_dict = task_groups_shots_df.set_index("Task_Group")["Number_Shots"].to_dict()
languages_list = hidden_df["Language"].drop_duplicates().str.upper().tolist()
model_type_df = hidden_df[["Model_Name", "Model_Type"]].drop_duplicates()
model_type_dict = model_type_df.set_index("Model_Name")["Model_Type"].to_dict()
hidden_df = hidden_df.pivot_table(
columns=["Task_Group", "Few_Shot", "Language"],
index=["Model_Name"],
values="Value",
dropna=False,
).reset_index(inplace=False)
hidden_df["Type"] = hidden_df["Model_Name"].apply(lambda x: style.T_SYMBOLS[model_type_dict[x]])
def sort_cols(df: pd.DataFrame, fewshot: bool = False) -> pd.DataFrame:
task_cols = get_task_columns(df)
return df.reindex(["Type", "Model_Name", "Average"] + sorted(task_cols), axis=1)
def get_task_columns(df: pd.DataFrame) -> pd.DataFrame:
l = list(df.columns)
l.remove("Model_Name")
l.remove("Average")
l.remove("Type")
return l
def get_models(df: pd.DataFrame) -> pd.DataFrame:
return df["Model_Name"].unique()
def filter_type(df: pd.DataFrame, model_types: list[str]) -> pd.DataFrame:
"""Keep only rows for which model type is in list of types"""
return df[df["Type"].isin(model_types)]
def search_model(df: pd.DataFrame, query: str) -> pd.DataFrame:
"""Keep only rows for which model name matches search query"""
query = query.replace(";", "|")
return df[df["Model_Name"].str.contains(query, case=False)]
def aggregate_langs(df: pd.DataFrame, tasks: list, langs: list):
"""Aggregates results over langs for each task in tasks.
If a language does not exist for a task, the aggregate for
that task will be shown as NaN.
"""
langs_lower = [item.lower() for item in langs]
df.columns = ["_".join(filter(None, col)) for col in df.columns]
colset = set(df.columns)
for t in tasks:
cols = [(f"{a}_{b}") for a, b in itertools.product([t], langs_lower)]
if set(cols).issubset(colset):
df.loc[:, t] = df[cols].mean(axis=1, skipna=False)
else:
df.loc[:, t] = np.nan
df.loc[:, "Average"] = df[tasks].mean(axis=1)
return df[["Type", "Model_Name", "Average"] + tasks]
def select_shots(df: pd.DataFrame, fewshot: bool = False):
cols = [col for col in df.columns if col[1] == fewshot] + []
# Move model name and type icon to the end
cols.append(("Model_Name", "", ""))
cols.append(("Type", "", ""))
return df[cols].droplevel(level=1, axis="columns")
def update_df(
tasks: list[str],
model_query: str,
langs: list[str],
model_types: list[str],
fewshot: bool = False,
format: bool = True,
) -> pd.DataFrame:
"""Return a filtered dataframe according to selected models, tasks and
languages. The format flag controls whether the output dataframe should
be formatted to tw significant figures.
"""
# keep only selected shots
df = select_shots(hidden_df, fewshot)
# aggregate results over languages per task
df = aggregate_langs(df, tasks, langs)
# filter models by search bar and model type
df = search_model(df, model_query)
df = filter_type(df, model_types)
if format:
return sort_cols(df, fewshot).style.format(precision=2, decimal=".", na_rep="N/A")
else:
return sort_cols(df, fewshot)
def update_task_groups_and_fewshot(current_selected_tab: int, is_fewshot_current: bool = False):
selected_task_type = get_selected_task_type(current_selected_tab)
available_tasks = get_available_task_groups(selected_task_type, is_fewshot_current)
new_selected_tasks = available_tasks.copy()
tasks_checkbox_group_update = gr.CheckboxGroup(
choices=available_tasks,
value=new_selected_tasks,
)
if current_selected_tab == 0:
is_fewshot_new = is_fewshot_current
fewshot_available = True
elif current_selected_tab == 1:
is_fewshot_new = False
fewshot_available = False
fewshot_radio_update = gr.Radio(
value=is_fewshot_new,
interactive=fewshot_available,
)
return [tasks_checkbox_group_update, fewshot_radio_update, current_selected_tab]
def get_selected_task_type(task_type_id):
task_types = {0: "accuracy", 1: "misc"}
selected_task_type = task_types[task_type_id]
return selected_task_type
def get_available_task_groups(selected_task_type, fewshot):
task_groups = [task_group_name for task_group_name, task_type in task_group_type_dict.items() if task_type == selected_task_type]
if fewshot:
available_tasks = [c for c in task_groups if c not in ZERO_SHOT_ONLY]
else:
available_tasks = [c for c in task_groups if c not in FEW_SHOT_ONLY]
return available_tasks
init()