Spaces:
Runtime error
Runtime error
import os | |
import json | |
import requests | |
import pandas as pd | |
from huggingface_hub import HfApi, hf_hub_download, snapshot_download | |
from huggingface_hub.repocard import metadata_load | |
from tqdm.contrib.concurrent import thread_map | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from tqdm.contrib.concurrent import thread_map | |
# Dataset repository that receives the per-environment leaderboard CSV files.
DATASET_REPO_ID = "pkalkman/drlc-leaderboard-data"
# Browser URL of the same repository, derived from the id so the two cannot drift.
DATASET_REPO_URL = f"https://huggingface.co/datasets/{DATASET_REPO_ID}"

# Access token taken from the environment; None when HF_TOKEN is not set.
HF_TOKEN = os.environ.get("HF_TOKEN")
api = HfApi(token=HF_TOKEN)

# Read the environments from the JSON file.
# The encoding is given explicitly so the result does not depend on the
# locale default of the machine running the script.
with open("envs.json", "r", encoding="utf-8") as f:
    rl_envs = json.load(f)
def download_leaderboard_dataset():
    """Download the leaderboard dataset repository.

    :return: the value returned by ``snapshot_download`` for
        ``DATASET_REPO_ID`` (used by callers as the local folder path).
    """
    # Download the dataset from the Hugging Face Hub
    return snapshot_download(repo_id=DATASET_REPO_ID, repo_type="dataset")
def get_metadata(model_id):
    """Load the metadata found in the README.md of *model_id*.

    :param model_id: repository id passed to ``hf_hub_download``.
    :return: the result of ``metadata_load`` on the downloaded README, or
        None when an ``requests.exceptions.HTTPError`` is raised.
    """
    try:
        card_path = hf_hub_download(model_id, filename="README.md", etag_timeout=180)
        metadata = metadata_load(card_path)
    except requests.exceptions.HTTPError:
        # 404 README.md not found
        return None
    return metadata
def parse_metrics_accuracy(meta):
    """Return the first metric value of the first "model-index" result.

    :param meta: model card metadata, expected to be a dict.
    :return: ``meta["model-index"][0]["results"][0]["metrics"][0]["value"]``,
        or None when "model-index" is absent or any level of that path is
        missing, empty, or not of the expected container type.
    """
    try:
        return meta["model-index"][0]["results"][0]["metrics"][0]["value"]
    except (KeyError, IndexError, TypeError):
        # Malformed or incomplete metadata is treated like "no metric" so a
        # single bad model card cannot abort a whole leaderboard update.
        return None
# We keep the worst case episode
def parse_rewards(accuracy):
    """Split a ``"<mean> +/- <std>"`` metric value into two floats.

    :param accuracy: metric value (any type; it is converted with ``str``),
        or None when no metric is available.
    :return: ``(mean_reward, std_reward)``. A value without ``+/-`` yields a
        std of 0.0. ``(-1000.0, -1000.0)`` is returned when *accuracy* is
        None or when either part cannot be converted to a float.
    """
    default_reward = -1000.0
    default_std = -1000.0
    if accuracy is None:
        return default_reward, default_std

    # str.split always returns at least one element, so parts[0] exists.
    parts = str(accuracy).split("+/-")
    try:
        mean_reward = float(parts[0].strip())
        std_reward = float(parts[1].strip()) if len(parts) > 1 else 0.0
    except ValueError:
        # Non-numeric text is handled like a missing metric, not a crash.
        return default_reward, default_std
    return mean_reward, std_reward
def get_model_ids(rl_env):
    """Collect the ids of the models returned by the Hub for *rl_env*.

    :param rl_env: value passed as ``filter`` to ``HfApi.list_models``.
    :return: list with the ``modelId`` of every listed model.
    """
    # A separate client is built here with no arguments (no token passed).
    hub_client = HfApi()
    return [model.modelId for model in hub_client.list_models(filter=rl_env)]
# Parallelized version
def update_leaderboard_dataset_parallel(rl_env, path):
    """Rebuild the leaderboard CSV of *rl_env*, processing models via thread_map.

    :param rl_env: environment name; used as the model filter and as the
        CSV file name (``<path>/<rl_env>.csv``).
    :param path: folder in which the CSV file is written.
    :return: the ranked DataFrame that was written to disk. It is empty, but
        still has all leaderboard columns, when no model yields a row.
    """
    # Get model ids associated with rl_env
    model_ids = get_model_ids(rl_env)

    def process_model(model_id):
        """Build the leaderboard row of *model_id*; None when it has no metadata."""
        meta = get_metadata(model_id)
        if meta is None:
            return None
        mean_reward, std_reward = parse_rewards(parse_metrics_accuracy(meta))
        # NaN values are replaced by 0 before the score is computed.
        mean_reward = 0 if pd.isna(mean_reward) else mean_reward
        std_reward = 0 if pd.isna(std_reward) else std_reward
        return {
            "User": model_id.split("/")[0],
            "Model": model_id,
            "Results": mean_reward - std_reward,
            "Mean Reward": mean_reward,
            "Std Reward": std_reward,
        }

    # Filter out None results (models with no metadata)
    rows = [
        row
        for row in thread_map(process_model, model_ids, desc="Processing models")
        if row is not None
    ]

    # Explicit columns keep 'Results', 'User' and 'Model' present even when
    # there are no rows; rank_dataframe sorts by them and would otherwise
    # fail with a KeyError for an environment without any usable model.
    columns = ["User", "Model", "Results", "Mean Reward", "Std Reward"]
    ranked_dataframe = rank_dataframe(pd.DataFrame(rows, columns=columns))

    ranked_dataframe.to_csv(f"{path}/{rl_env}.csv", index=False)
    return ranked_dataframe
def update_leaderboard_dataset(rl_env, path):
    """Rebuild the leaderboard CSV of *rl_env*, processing models one by one.

    :param rl_env: environment name; used as the model filter and as the
        CSV file name (``<path>/<rl_env>.csv``).
    :param path: folder in which the CSV file is written.
    :return: the ranked DataFrame that was written to disk. It is empty, but
        still has all leaderboard columns, when no model yields a row.
    """
    # Get model ids associated with rl_env
    model_ids = get_model_ids(rl_env)

    rows = []
    for model_id in model_ids:
        meta = get_metadata(model_id)
        if meta is None:
            # Models without metadata do not appear on the leaderboard.
            continue
        mean_reward, std_reward = parse_rewards(parse_metrics_accuracy(meta))
        # NaN values are replaced by 0 before the score is computed.
        mean_reward = 0 if pd.isna(mean_reward) else mean_reward
        std_reward = 0 if pd.isna(std_reward) else std_reward
        rows.append(
            {
                "User": model_id.split("/")[0],
                "Model": model_id,
                "Results": mean_reward - std_reward,
                "Mean Reward": mean_reward,
                "Std Reward": std_reward,
            }
        )

    # Explicit columns keep 'Results', 'User' and 'Model' present even when
    # there are no rows; rank_dataframe sorts by them and would otherwise
    # fail with a KeyError for an environment without any usable model.
    columns = ["User", "Model", "Results", "Mean Reward", "Std Reward"]
    ranked_dataframe = rank_dataframe(pd.DataFrame(rows, columns=columns))

    ranked_dataframe.to_csv(f"{path}/{rl_env}.csv", index=False)
    return ranked_dataframe
def get_data_no_html(rl_env, path) -> pd.DataFrame:
    """
    Read the CSV file stored for rl_env inside path
    :return: content of ``<path>/<rl_env>.csv`` as a pandas DataFrame
    """
    return pd.read_csv("/".join((path, rl_env + ".csv")))
def rank_dataframe(dataframe):
    """Sort *dataframe* and number its rows in a 'Ranking' column.

    Rows are ordered by 'Results', then 'User', then 'Model', all descending.
    The 'Ranking' column holds 1..len(dataframe) in that order; it is
    inserted as the first column when missing and overwritten otherwise.

    :param dataframe: frame with 'Results', 'User' and 'Model' columns.
    :return: the sorted frame produced by ``sort_values`` with its ranking.
    """
    ordered = dataframe.sort_values(by=["Results", "User", "Model"], ascending=False)
    positions = list(range(1, len(ordered) + 1))
    if "Ranking" in ordered.columns:
        ordered["Ranking"] = positions
    else:
        ordered.insert(0, "Ranking", positions)
    return ordered
def run_update_dataset():
    """Refresh the leaderboard CSV of every environment and upload the result.

    Downloads the dataset folder, rewrites one CSV per entry of ``rl_envs``
    (using each entry's "rl_env" value), then uploads the whole folder to
    the dataset repository in a single commit.
    """
    path_ = download_leaderboard_dataset()

    for env_config in rl_envs:
        update_leaderboard_dataset_parallel(env_config["rl_env"], path_)

    api.upload_folder(
        folder_path=path_,
        # Same repository the folder was downloaded from.
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        commit_message="Update dataset",
    )
# Schedule run_update_dataset as an 'interval' job with a period of three hours.
scheduler = BackgroundScheduler()
scheduler.add_job(
    run_update_dataset,
    'interval',
    seconds=3 * 60 * 60,  # 10800 seconds
)
scheduler.start()