"""Data loading and filtering helpers for the leaderboard tables.

Importing this module calls :func:`init`, which downloads the results
dataset and populates the module-level globals used by the helpers below.
"""

import itertools
import os
import re

import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
from datasets import load_dataset

import style

# Task groups that are excluded when the few-shot view is selected.
ZERO_SHOT_ONLY = ["BELEBELE"]
# Task groups that are excluded when the zero-shot view is selected.
FEW_SHOT_ONLY = ["GSM8K", "TruthfulQA"]

# Non-task columns carried alongside the per-task scores.
_META_COLUMNS = ("Type", "Model_Name", "Average")


def init():
    """Load the results dataset and populate the module-level globals.

    The dataset location is read from the environment variables
    ``OGX_LEADERBOARD_DATASET_NAME``, ``OGX_LEADERBOARD_DATASET_CONFIG`` and
    ``OGX_LEADERBOARD_DATASET_SPLIT``.

    After this call ``hidden_df`` is a wide table with one row per model and
    columns keyed by ``(Task_Group, Few_Shot, Language)``, plus the
    ``Model_Name`` and ``Type`` columns.

    Raises:
        RuntimeError: if ``OGX_LEADERBOARD_DATASET_NAME`` is not set.
    """
    global repo_id, config_name, split_name, hidden_df, task_group_names_list, task_group_type_dict, task_groups_shots_dict, languages_list, model_type_dict

    repo_id = os.getenv("OGX_LEADERBOARD_DATASET_NAME")
    config_name = os.getenv("OGX_LEADERBOARD_DATASET_CONFIG")
    split_name = os.getenv("OGX_LEADERBOARD_DATASET_SPLIT")
    if not repo_id:
        # Fail with an actionable message instead of passing None downstream.
        raise RuntimeError(
            "OGX_LEADERBOARD_DATASET_NAME must be set to the leaderboard dataset repository id"
        )

    dataset = load_dataset(repo_id, config_name, split=split_name)
    hidden_df = dataset.to_pandas()

    task_group_names_list = hidden_df["Task_Group"].unique().tolist()

    # Task group -> task type lookup.
    task_group_type_df = hidden_df[["Task_Group", "Task_Type"]].drop_duplicates()
    task_group_type_dict = task_group_type_df.set_index("Task_Group")["Task_Type"].to_dict()

    # Task group -> number of shots, taken from the few-shot rows only.
    # The explicit comparison also works if the column is not of bool dtype.
    few_shot_rows = hidden_df[hidden_df["Few_Shot"] == True]  # noqa: E712
    task_groups_shots_df = few_shot_rows[["Task_Group", "Number_Shots"]].drop_duplicates()
    task_groups_shots_dict = task_groups_shots_df.set_index("Task_Group")["Number_Shots"].to_dict()

    languages_list = hidden_df["Language"].drop_duplicates().str.upper().tolist()

    # Model name -> model type lookup.
    model_type_df = hidden_df[["Model_Name", "Model_Type"]].drop_duplicates()
    model_type_dict = model_type_df.set_index("Model_Name")["Model_Type"].to_dict()

    # Long -> wide: one row per model, one column per (task, shots, language).
    # dropna=False keeps combinations that have no values at all.
    hidden_df = hidden_df.pivot_table(
        columns=["Task_Group", "Few_Shot", "Language"],
        index=["Model_Name"],
        values="Value",
        dropna=False,
    ).reset_index()

    hidden_df["Type"] = hidden_df["Model_Name"].apply(lambda x: style.T_SYMBOLS[model_type_dict[x]])


def sort_cols(df: pd.DataFrame, fewshot: bool = False) -> pd.DataFrame:
    """Return ``df`` with Type, Model_Name and Average first, then the task columns sorted.

    ``fewshot`` is accepted for interface compatibility and is not used.
    """
    task_cols = get_task_columns(df)
    return df.reindex([*_META_COLUMNS, *sorted(task_cols)], axis=1)


def get_task_columns(df: pd.DataFrame) -> list:
    """Return the column labels of ``df`` other than Type, Model_Name and Average."""
    return [col for col in df.columns if col not in _META_COLUMNS]


def get_models(df: pd.DataFrame) -> np.ndarray:
    """Return the unique values of the ``Model_Name`` column."""
    return df["Model_Name"].unique()


def filter_type(df: pd.DataFrame, model_types: list[str]) -> pd.DataFrame:
    """Keep only rows for which model type is in list of types"""
    return df[df["Type"].isin(model_types)]


def search_model(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """Keep only rows for which model name matches search query.

    ``query`` may hold several search terms separated by ``;``. A row is kept
    when its ``Model_Name`` contains any of the terms, ignoring case. Terms
    are matched literally: regex metacharacters typed by the user are escaped
    so they cannot raise ``re.error``. Surrounding whitespace is stripped and
    empty terms are ignored; a query with no terms keeps every row.
    """
    if not query:
        return df

    terms = [term.strip() for term in query.split(";")]
    escaped_terms = [re.escape(term) for term in terms if term]
    if not escaped_terms:
        return df

    pattern = "|".join(escaped_terms)
    matches = df["Model_Name"].str.contains(pattern, case=False, regex=True, na=False)
    return df[matches]


def aggregate_langs(df: pd.DataFrame, tasks: list, langs: list):
    """Aggregates results over langs for each task in tasks.

    If a language does not exist for a task, the aggregate for that task will
    be shown as NaN. An ``Average`` column holding the row-wise mean over the
    aggregated task columns is added as well.

    The input frame is not modified; the result contains the columns
    Type, Model_Name, Average and one column per task.
    """
    # Work on a copy so the caller's frame (and its column index) stays intact.
    df = df.copy()

    langs_lower = [item.lower() for item in langs]

    # Flatten tuple column labels into "<task>_<lang>", dropping empty levels
    # so that e.g. ("Model_Name", "") becomes "Model_Name".
    df.columns = ["_".join(filter(None, col)) for col in df.columns]
    colset = set(df.columns)

    for t in tasks:
        cols = [f"{a}_{b}" for a, b in itertools.product([t], langs_lower)]
        if set(cols).issubset(colset):
            # skipna=False: a missing score for any language makes the aggregate NaN.
            df[t] = df[cols].mean(axis=1, skipna=False)
        else:
            df[t] = np.nan

    df["Average"] = df[list(tasks)].mean(axis=1)

    return df[[*_META_COLUMNS, *tasks]]


def select_shots(df: pd.DataFrame, fewshot: bool = False):
    """Keep the score columns whose second column level equals ``fewshot``.

    The ``Model_Name`` and ``Type`` columns are appended after the score
    columns, and the second column level is dropped from the result.
    """
    cols = [col for col in df.columns if col[1] == fewshot]
    # Move model name and type icon to the end
    cols.append(("Model_Name", "", ""))
    cols.append(("Type", "", ""))
    return df[cols].droplevel(level=1, axis="columns")


def update_df(
    tasks: list[str],
    model_query: str,
    langs: list[str],
    model_types: list[str],
    fewshot: bool = False,
    format: bool = True,
) -> "pd.DataFrame | pd.io.formats.style.Styler":
    """Return a filtered dataframe according to selected models, tasks and languages.

    The format flag controls whether the output is formatted to two decimal
    places. When it is true a pandas ``Styler`` (with missing values shown as
    "N/A") is returned; otherwise the plain dataframe is returned.
    """
    # keep only selected shots
    df = select_shots(hidden_df, fewshot)
    # aggregate results over languages per task
    df = aggregate_langs(df, tasks, langs)
    # filter models by search bar and model type
    df = search_model(df, model_query)
    df = filter_type(df, model_types)

    sorted_df = sort_cols(df, fewshot)
    if format:
        return sorted_df.style.format(precision=2, decimal=".", na_rep="N/A")
    return sorted_df


def update_task_groups_and_fewshot(current_selected_tab: int, is_fewshot_current: bool = False):
    """Build the UI updates for the task checkboxes and the few-shot radio.

    On tab 0 the current few-shot setting is kept and the radio stays
    interactive. On tab 1 few-shot is forced off and the radio is disabled.
    The offered task groups are derived from the resulting few-shot setting,
    so the checkbox choices always agree with the radio value.

    Returns:
        ``[tasks_checkbox_group_update, fewshot_radio_update, current_selected_tab]``

    Raises:
        KeyError: if ``current_selected_tab`` is not a known tab id.
    """
    selected_task_type = get_selected_task_type(current_selected_tab)

    if current_selected_tab == 0:
        is_fewshot_new = is_fewshot_current
        fewshot_available = True
    else:
        is_fewshot_new = False
        fewshot_available = False

    available_tasks = get_available_task_groups(selected_task_type, is_fewshot_new)

    # All available tasks are pre-selected; pass a separate list as the value.
    tasks_checkbox_group_update = gr.CheckboxGroup(
        choices=available_tasks,
        value=list(available_tasks),
    )
    fewshot_radio_update = gr.Radio(
        value=is_fewshot_new,
        interactive=fewshot_available,
    )

    return [tasks_checkbox_group_update, fewshot_radio_update, current_selected_tab]


def get_selected_task_type(task_type_id):
    """Map a tab id (0 or 1) to its task type name; raises KeyError otherwise."""
    task_types = {0: "accuracy", 1: "misc"}
    return task_types[task_type_id]


def get_available_task_groups(selected_task_type, fewshot):
    """Return the task groups of ``selected_task_type`` offered for the shot setting.

    Groups listed in ``ZERO_SHOT_ONLY`` are left out when ``fewshot`` is true,
    and groups listed in ``FEW_SHOT_ONLY`` are left out otherwise.
    """
    task_groups = [
        task_group_name
        for task_group_name, task_type in task_group_type_dict.items()
        if task_type == selected_task_type
    ]
    excluded = ZERO_SHOT_ONLY if fewshot else FEW_SHOT_ONLY
    return [c for c in task_groups if c not in excluded]


init()