Spaces:
Running
Running
import json | |
import logging | |
import os | |
import subprocess | |
import time | |
import gradio as gr | |
import pandas as pd | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from gradio_leaderboard import Leaderboard, SelectColumns | |
from gradio_space_ci import enable_space_ci | |
from huggingface_hub import snapshot_download | |
from src.display.about import ( | |
FAQ_TEXT, | |
INTRODUCTION_TEXT, | |
LLM_BENCHMARKS_TEXT, | |
TITLE, | |
) | |
from src.display.css_html_js import custom_css | |
from src.display.utils import ( | |
# BENCHMARK_COLS, | |
AutoEvalColumn, | |
fields, | |
) | |
from src.envs import ( | |
API, | |
EVAL_RESULTS_PATH, | |
H4_TOKEN, | |
REPO_ID, | |
RESET_JUDGEMENT_ENV, | |
) | |
os.environ['GRADIO_ANALYTICS_ENABLED']='false' | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
# Start ephemeral Spaces on PRs (see config in README.md) | |
enable_space_ci() | |
def restart_space(): | |
API.restart_space(repo_id=REPO_ID, token=H4_TOKEN) | |
def time_diff_wrapper(func): | |
def wrapper(*args, **kwargs): | |
start_time = time.time() | |
result = func(*args, **kwargs) | |
end_time = time.time() | |
diff = end_time - start_time | |
logging.info(f"Time taken for {func.__name__}: {diff} seconds") | |
return result | |
return wrapper | |
def download_dataset(repo_id, local_dir, repo_type="dataset", max_attempts=3, backoff_factor=1.5): | |
"""Download dataset with exponential backoff retries.""" | |
attempt = 0 | |
while attempt < max_attempts: | |
try: | |
logging.info(f"Downloading {repo_id} to {local_dir}") | |
snapshot_download( | |
repo_id=repo_id, | |
local_dir=local_dir, | |
repo_type=repo_type, | |
tqdm_class=None, | |
etag_timeout=30, | |
max_workers=8, | |
) | |
logging.info("Download successful") | |
return | |
except Exception as e: | |
wait_time = backoff_factor ** attempt | |
logging.error(f"Error downloading {repo_id}: {e}, retrying in {wait_time}s") | |
time.sleep(wait_time) | |
attempt += 1 | |
raise Exception(f"Failed to download {repo_id} after {max_attempts} attempts") | |
def init_space(full_init: bool = True): | |
"""Initializes the application space, loading only necessary data.""" | |
if full_init: | |
# These downloads only occur on full initialization | |
# try: | |
# download_dataset(QUEUE_REPO, EVAL_REQUESTS_PATH) | |
# download_dataset(DYNAMIC_INFO_REPO, DYNAMIC_INFO_PATH) | |
download_dataset("Vikhrmodels/openbench-eval", EVAL_RESULTS_PATH) | |
# print(subprocess.Popen('ls src')) | |
subprocess.run(['rsync', '-avzP', '--ignore-existing', f'{EVAL_RESULTS_PATH[2:]}/external/*', 'src/gen/data/arena-hard-v0.1/model_answer/']) | |
subprocess.run(['rsync', '-avzP', '--ignore-existing', f'{EVAL_RESULTS_PATH[2:]}/model_judgment/*', 'src/gen/data/arena-hard-v0.1/model_judgement/']) | |
# except Exception: | |
# restart_space() | |
# Always retrieve the leaderboard DataFrame | |
original_df = pd.DataFrame.from_records(json.load(open('eval-results/evals/upd.json','r'))) | |
leaderboard_df = original_df.copy() | |
return leaderboard_df | |
# Convert the environment variable "LEADERBOARD_FULL_INIT" to a boolean value, defaulting to True if the variable is not set. | |
# This controls whether a full initialization should be performed. | |
do_full_init = os.getenv("LEADERBOARD_FULL_INIT", "True") == "True" | |
# Calls the init_space function with the `full_init` parameter determined by the `do_full_init` variable. | |
# This initializes various DataFrames used throughout the application, with the level of initialization detail controlled by the `do_full_init` flag. | |
leaderboard_df = init_space(full_init=do_full_init) | |
demo = gr.Blocks(css=custom_css) | |
with demo: | |
gr.HTML(TITLE) | |
gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text") | |
with gr.Tabs(elem_classes="tab-buttons") as tabs: | |
with gr.TabItem("🏅 LLM Benchmark", elem_id="llm-benchmark-tab-table", id=0): | |
pass | |
leaderboard = Leaderboard( | |
value=leaderboard_df, | |
datatype=[c.type for c in fields(AutoEvalColumn)], | |
select_columns=SelectColumns( | |
default_selection=[ | |
c.name | |
for c in fields(AutoEvalColumn) | |
if c.displayed_by_default | |
], | |
cant_deselect=[c.name for c in fields(AutoEvalColumn) if c.never_hidden or c.dummy], | |
label="Select Columns to Display:", | |
), | |
search_columns=[ | |
AutoEvalColumn.model.name, | |
# AutoEvalColumn.fullname.name, | |
# AutoEvalColumn.license.name | |
], | |
) | |
with gr.TabItem("📝 About", elem_id="llm-benchmark-tab-table", id=3): | |
gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text") | |
with gr.TabItem("❗FAQ", elem_id="llm-benchmark-tab-table", id=4): | |
gr.Markdown(FAQ_TEXT, elem_classes="markdown-text") | |
with gr.TabItem("🚀 Submit ", elem_id="llm-benchmark-tab-table", id=5): | |
with gr.Row(): | |
gr.Markdown("# ✨ Submit your model here!", elem_classes="markdown-text") | |
with gr.Column(): | |
model_name_textbox = gr.Textbox(label="Model name") | |
def upload_file(file): | |
print(file.name) | |
file_path = file.name.split('/')[-1] if '/' in file.name else file.name | |
print(file_path) | |
API.upload_file(path_or_fileobj=file.name,path_in_repo='./external/'+file_path,repo_id='Vikhrmodels/openbench-eval',repo_type='dataset') | |
os.environ[RESET_JUDGEMENT_ENV] = '1' | |
return file.name | |
if model_name_textbox: | |
file_output = gr.File() | |
upload_button = gr.UploadButton("Click to Upload & Submit Answers", file_types=['*'], file_count="single") | |
upload_button.upload(upload_file, upload_button, file_output) | |
# print(os.system('cd src/gen && ../../.venv/bin/python gen_judgment.py')) | |
# print(os.system('cd src/gen/ && python show_result.py --output')) | |
def update_board(): | |
need_reset = os.environ.get(RESET_JUDGEMENT_ENV) | |
if need_reset != '1': | |
return | |
os.environ[RESET_JUDGEMENT_ENV] = '0' | |
subprocess.run(['python','../gen/gen_judgement.py']) | |
subprocess.Popen('python3 ../gen/show_result.py --output') | |
if __name__ == "__main__": | |
os.environ[RESET_JUDGEMENT_ENV] = '1' | |
scheduler = BackgroundScheduler() | |
scheduler.add_job(update_board, "interval", minutes=10) | |
scheduler.start() | |
demo.queue(default_concurrency_limit=40).launch(debug=True) | |