Spaces:
Runtime error
Runtime error
Tristan Thrush
specified version in requirements.txt, added all selections to url, fixed demo message in aggrid
38045c5
import pandas as pd | |
import streamlit as st | |
from huggingface_hub import HfApi, hf_hub_download | |
from huggingface_hub.repocard import metadata_load | |
from utils import ascending_metrics, metric_ranges | |
import numpy as np | |
from st_aggrid import AgGrid, GridOptionsBuilder, JsCode | |
from os.path import exists | |
import threading | |
# Configure the Streamlit page with the "wide" layout before anything is rendered.
st.set_page_config(layout="wide")
def get_model_infos():
    """Return the Hub model listing filtered on "model-index", with card data requested."""
    return HfApi().list_models(filter="model-index", cardData=True)
def parse_metric_value(value):
    """Normalise a raw metric value from a model card to a rounded float.

    Args:
        value: The raw value. Strings are parsed as floats after removing any
            "%" characters (the number is kept as written, it is not divided
            by 100). For a list, its first element is used as-is. Anything
            else is taken as given.

    Returns:
        The value rounded to 4 decimal places when it ends up being a float,
        otherwise None (unparsable strings, empty lists, ints, None, ...).
    """
    if isinstance(value, str):
        try:
            # The stripped string must actually be the one that gets parsed;
            # otherwise every value written like "95%" is silently rejected.
            value = float(value.replace("%", ""))
        except ValueError:
            value = None
    elif isinstance(value, list):
        value = value[0] if len(value) > 0 else None
    return round(value, 4) if isinstance(value, float) else None
def parse_metrics_rows(meta, only_verified=False):
    """Yield one leaderboard row per usable result in a card's "model-index".

    Only the first "model-index" entry is read. Each yielded row is a dict
    with "dataset", "split" and "config" keys (the latter two default to
    "-unspecified-") plus one key per metric name (lower-cased, stripped).
    When a metric appears several times in one result, the best value is
    kept: lower is better for names in ``ascending_metrics``, higher is
    better otherwise.

    Malformed metadata (missing keys, wrong container types, metrics without
    a string "type") is skipped rather than raised, so that one bad model
    card cannot abort the whole data load.

    Args:
        meta: Model card metadata, subscriptable by "model-index".
        only_verified: When True, only metrics whose "verified" field is
            truthy are recorded.

    Yields:
        Row dicts for results that recorded at least one metric and whose
        recorded metrics all fall inside their ``metric_ranges`` bounds.
    """
    # Column names added to each row by the caller or used for dataset
    # identity; a metric carrying one of these names would clobber them.
    reserved_names = ("model_id", "dataset", "split", "config", "pipeline_tag", "only_verified")

    try:
        model_index = meta["model-index"]
    except (KeyError, TypeError):
        return
    if not isinstance(model_index, list) or len(model_index) == 0:
        return
    first_entry = model_index[0]
    if not isinstance(first_entry, dict) or "results" not in first_entry:
        return
    results = first_entry["results"]
    if not isinstance(results, (list, tuple)):
        return

    for result in results:
        if not isinstance(result, dict):
            continue
        dataset_info = result.get("dataset")
        metrics = result.get("metrics")
        if not isinstance(dataset_info, dict) or not isinstance(metrics, (list, tuple)):
            continue
        dataset = dataset_info.get("type")
        # Dataset names are sorted case-insensitively by the UI, so anything
        # that is not a non-empty string cannot be displayed.
        if not isinstance(dataset, str) or dataset == "":
            continue

        row = {"dataset": dataset, "split": "-unspecified-", "config": "-unspecified-"}
        if "split" in dataset_info:
            row["split"] = dataset_info["split"]
        if "config" in dataset_info:
            row["config"] = dataset_info["config"]

        has_results = False
        out_of_range = False
        for metric in metrics:
            if not isinstance(metric, dict) or not isinstance(metric.get("type"), str):
                continue
            name = metric["type"].lower().strip()
            if name in reserved_names:
                continue
            value = parse_metric_value(metric.get("value", None))
            if value is None:
                continue
            if name in row:
                # Only overwrite an already recorded metric with a better value.
                if name in ascending_metrics:
                    new_metric_better = value < row[name]
                else:
                    new_metric_better = value > row[name]
                if not new_metric_better:
                    continue
            if only_verified and not metric.get("verified"):
                continue
            has_results = True
            row[name] = value
            if name in metric_ranges:
                if value < metric_ranges[name][0] or value > metric_ranges[name][1]:
                    out_of_range = True

        if has_results and not out_of_range:
            yield row
def get_data_wrapper():
    """Return the leaderboard DataFrame, served from the "cache.pkl" file.

    If the cache file exists it is loaded and returned immediately, and a
    background thread is started to rebuild it, so a later call picks up
    fresher results. If it does not exist yet, the data is fetched
    synchronously first (the user has to wait on first startup).

    Returns:
        A pandas DataFrame with one record per parsed result row. Each record
        also carries "model_id", "pipeline_tag" and "only_verified".
    """
    # Function-scope import: the file only imports ``exists`` from os.path.
    import os

    cache_path = "cache.pkl"

    def get_data():
        """Fetch all model infos, build the rows and rewrite the cache file."""
        data = []
        print("getting model infos")
        model_infos = get_model_infos()
        print("got model infos")
        for model_info in model_infos:
            meta = model_info.cardData
            if meta is None:
                continue
            # Every model contributes rows twice: once with all results and
            # once restricted to verified results, told apart by the
            # "only_verified" column.
            for only_verified in (False, True):
                for row in parse_metrics_rows(meta, only_verified=only_verified):
                    row["model_id"] = model_info.id
                    row["pipeline_tag"] = model_info.pipeline_tag
                    row["only_verified"] = only_verified
                    data.append(row)
        dataframe = pd.DataFrame.from_records(data)

        # Write to a private temporary file and swap it into place, so that a
        # concurrent reader never sees a partially written pickle.
        tmp_path = f"{cache_path}.{os.getpid()}.{threading.get_ident()}.tmp"
        try:
            dataframe.to_pickle(tmp_path)
            os.replace(tmp_path, cache_path)
        finally:
            if exists(tmp_path):
                os.remove(tmp_path)

    # NOTE(review): the cache is a pickle; this is only safe because the file
    # is produced by this app itself. Never point it at untrusted data.
    if exists(cache_path):
        # If we have saved the results previously, refresh them in a
        # background thread. Don't make users wait while we fetch the new
        # results; display the old results for now. The new results should be
        # loaded when this function is called again.
        dataframe = pd.read_pickle(cache_path)
        t = threading.Thread(name="get_data procs", target=get_data)
        t.start()
    else:
        # We have to make the users wait during the first startup of this app.
        get_data()
        dataframe = pd.read_pickle(cache_path)
    return dataframe
# Load the (possibly cached) leaderboard data and render the page title.
dataframe = get_data_wrapper()

st.markdown("# 🤗 Leaderboards")

# Remember the query parameters seen the first time this session ran the
# script; the widget defaults below are taken from that first snapshot rather
# than from the parameters of the current rerun.
query_params = st.experimental_get_query_params()
if "first_query_params" not in st.session_state:
    st.session_state.first_query_params = query_params
first_query_params = st.session_state.first_query_params

default_task = first_query_params.get("task", [None])[0]
try:
    default_only_verified = bool(int(first_query_params.get("only_verified", [0])[0]))
except (TypeError, ValueError):
    # The value comes straight from the URL; a malformed one must not crash
    # the page, so fall back to showing unverified results.
    default_only_verified = False
default_dataset = first_query_params.get("dataset", [None])[0]
default_split = first_query_params.get("split", [None])[0]
default_config = first_query_params.get("config", [None])[0]

only_verified_results = st.sidebar.checkbox(
    "Filter for Verified Results",
    value=default_only_verified,
    help="Select this checkbox if you want to see only results produced by the Hugging Face model evaluator, and no self-reported results."
)
# Task selector: every distinct pipeline tag (except None), sorted
# case-insensitively, with "-any-" as the leading catch-all option.
task_names = set(dataframe.pipeline_tag)
task_names.discard(None)
selectable_tasks = ["-any-"] + sorted(task_names, key=lambda name: name.lower())
default_task_index = selectable_tasks.index(default_task) if default_task in selectable_tasks else 0

task = st.sidebar.selectbox(
    "Task",
    selectable_tasks,
    index=default_task_index,
    help="Filter the selectable datasets by task. Leave as \"-any-\" to see all selectable datasets."
)

if task != "-any-":
    dataframe = dataframe[dataframe.pipeline_tag == task]

# Dataset selector: built from whatever rows survived the task filter.
dataset_names = sorted(set(dataframe.dataset.tolist()), key=lambda name: name.lower())
selectable_datasets = ["-any-"] + [name for name in dataset_names if name != ""]
default_dataset_index = selectable_datasets.index(default_dataset) if default_dataset in selectable_datasets else 0

dataset = st.sidebar.selectbox(
    "Dataset",
    selectable_datasets,
    index=default_dataset_index,
    help="Select a dataset to see the leaderboard!"
)

# Keep only the rows matching the verified / unverified checkbox state.
dataframe = dataframe[dataframe.only_verified == only_verified_results]

# Mirror the current selections into the URL.
st.experimental_set_query_params(
    dataset=[dataset],
    only_verified=[int(only_verified_results)],
    task=[task],
)

dataset_df = dataframe if dataset == "-any-" else dataframe[dataframe.dataset == dataset]

# Drop metric columns that hold no value at all for the current selection.
dataset_df = dataset_df.dropna(axis="columns", how="all")
# Render the leaderboard for the current selection, or a hint when no rows are left.
if len(dataset_df) > 0:
    # Options are sorted case-insensitively, like the task and dataset lists,
    # so their order (and the index-0 default) does not depend on set ordering.
    # str() guards against non-string config / split values.
    selectable_configs = sorted(set(dataset_df["config"]), key=lambda name: str(name).lower())

    if dataset != "-any-":
        config = st.sidebar.selectbox(
            "Config",
            selectable_configs,
            index=selectable_configs.index(default_config) if default_config in selectable_configs else 0,
            help="Filter the results on the current leaderboard by the dataset config. Self-reported results might not report the config, which is why \"-unspecified-\" is an option."
        )
        dataset_df = dataset_df[dataset_df["config"] == config]

        selectable_splits = sorted(set(dataset_df["split"]), key=lambda name: str(name).lower())
        split = st.sidebar.selectbox(
            "Split",
            selectable_splits,
            index=selectable_splits.index(default_split) if default_split in selectable_splits else 0,
            help="Filter the results on the current leaderboard by the dataset split. Self-reported results might not report the split, which is why \"-unspecified-\" is an option."
        )

        # Mirror every selection, now including config and split, into the URL.
        st.experimental_set_query_params(**{"dataset": [dataset], "only_verified": [int(only_verified_results)], "task": [task], "config": [config], "split": [split]})

        dataset_df = dataset_df[dataset_df["split"] == split]

    not_selectable_metrics = ["model_id", "dataset", "split", "config", "pipeline_tag", "only_verified"]
    selectable_metrics = [column for column in dataset_df.columns if column not in not_selectable_metrics]

    dataset_df = dataset_df.filter(["model_id"] + (["dataset"] if dataset == "-any-" else []) + selectable_metrics)
    dataset_df = dataset_df.dropna(thresh=2)  # Want at least two non-na values (one for model_id and one for a metric).

    sorting_metric = st.sidebar.radio(
        "Sorting Metric",
        selectable_metrics,
        help="Select the metric to sort the leaderboard by. Click on the metric name in the leaderboard to reverse the sorting order."
    )

    st.markdown(
        "Please click on the model's name to be redirected to its model card."
    )

    st.markdown(
        "Want to beat the leaderboard? Don't see your model here? Simply request an automatic evaluation [here](https://huggingface.co/spaces/autoevaluate/model-evaluator)."
    )

    st.markdown(
        "If you do not see your self-reported results here, ensure that your results are in the expected range for all metrics. E.g., accuracy is 0-1, not 0-100."
    )

    if dataset == "-any-":
        st.info(
            "Note: you haven't chosen a dataset, so the leaderboard is showing the best scoring model for each dataset."
        )

    # Make the default metric appear right after model names and dataset names
    cols = dataset_df.columns.tolist()
    cols.remove(sorting_metric)
    sorting_metric_index = 1 if dataset != "-any-" else 2
    cols = cols[:sorting_metric_index] + [sorting_metric] + cols[sorting_metric_index:]
    dataset_df = dataset_df[cols]

    # Sort the leaderboard, giving the sorting metric highest priority and then ordering by other metrics in the case of equal values.
    dataset_df = dataset_df.sort_values(by=cols[sorting_metric_index:], ascending=[metric in ascending_metrics for metric in cols[sorting_metric_index:]])
    dataset_df = dataset_df.replace(np.nan, '-')

    # If dataset is "-any-", only show the best model for each dataset. Otherwise
    # the leaderboard is way too long and doesn't give the users a feel for all of
    # the datasets available for a task.
    if dataset == "-any-":
        filtered_dataset_df_dict = {column: [] for column in dataset_df.columns}
        seen_datasets = set()
        # Rows are already sorted best-first, so the first row seen for a
        # dataset is the one to keep.
        for _, row in dataset_df.iterrows():
            if row["dataset"] not in seen_datasets:
                for column in dataset_df.columns:
                    filtered_dataset_df_dict[column].append(row[column])
                seen_datasets.add(row["dataset"])
        dataset_df = pd.DataFrame(filtered_dataset_df_dict)

    # Make the leaderboard
    gb = GridOptionsBuilder.from_dataframe(dataset_df)
    gb.configure_default_column(sortable=False)

    # Cell values (dataset names in particular) originate from self-reported
    # model card metadata, so they are HTML-escaped before being placed in the
    # link text and URL-encoded before being placed in the href. The JavaScript
    # is deliberately kept on a single line, like the renderers it replaces.
    escape_value_js = '''var raw = String(params.value); var label = raw.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;').replace(/"/g, '&quot;');'''

    gb.configure_column(
        "model_id",
        cellRenderer=JsCode('''function(params) {''' + escape_value_js + ''' return '<a target="_blank" href="https://huggingface.co/' + encodeURI(raw) + '">' + label + '</a>'}'''),
    )
    if dataset == "-any-":
        gb.configure_column(
            "dataset",
            cellRenderer=JsCode('''function(params) {''' + escape_value_js + ''' return '<a target="_blank" href="https://huggingface.co/spaces/autoevaluate/leaderboards?dataset=' + encodeURIComponent(raw) + '">' + label + '</a>'}'''),
        )

    for name in selectable_metrics:
        gb.configure_column(name, type=["numericColumn","numberColumnFilter","customNumericFormat"], precision=4, aggFunc='sum')

    # Only the chosen sorting metric is sortable; it is also highlighted.
    gb.configure_column(
        sorting_metric,
        sortable=True,
        cellStyle=JsCode('''function(params) { return {'backgroundColor': '#FFD21E'}}''')
    )

    go = gb.build()
    fit_columns = len(dataset_df.columns) < 10
    # The grid height is derived from the row count (28 per row plus a fixed
    # allowance that depends on whether the columns are fitted to the grid).
    AgGrid(dataset_df, gridOptions=go, height=28*len(dataset_df) + (35 if fit_columns else 41), allow_unsafe_jscode=True, fit_columns_on_grid_load=fit_columns, enable_enterprise_modules=False)

else:
    st.markdown(
        "No " + ("verified" if only_verified_results else "unverified") + " results to display. Try toggling the verified results filter."
    )