Madhavan Iyengar
add more robust error checking
42a6f9c
import subprocess
import gradio as gr
import zipfile
import os
import shutil
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download, Repository, HfFolder
from src.about import (
CITATION_BUTTON_LABEL,
CITATION_BUTTON_TEXT,
EVALUATION_QUEUE_TEXT,
INTRODUCTION_TEXT,
LLM_BENCHMARKS_TEXT,
TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
BENCHMARK_COLS,
COLS,
EVAL_COLS,
EVAL_TYPES,
NUMERIC_INTERVALS,
TYPES,
AutoEvalColumn,
ModelType,
fields,
WeightType,
Precision
)
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, QUEUE_REPO, REPO_ID, RESULTS_REPO, TOKEN, EVAL_REQUESTS_PATH_BACKEND, EVAL_RESULTS_PATH_BACKEND
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval
from src.submission.evaluate import calculate_metrics
import json
def handle_new_eval_submission(model_name, model_zip, model_link=None) -> str:
try:
# Input validation
if not model_name:
return "Please enter a model name."
if not isinstance(model_name, str):
return "Model name must be a string."
if len(model_name.split()) > 1:
return "Model name should be a single word with hyphens."
# Check if the model name is already in the leaderboard
if model_name in leaderboard_df[AutoEvalColumn.model.name].values:
return "Model name already exists in the leaderboard. Please choose a different name."
if model_zip is None:
return "Please provide a zip file."
extraction_path = os.path.join(EVAL_RESULTS_PATH_BACKEND, model_name)
if model_zip is not None:
# Check if the zip file is actually a zip file
if not zipfile.is_zipfile(model_zip):
return "Please upload a valid zip file."
# Create extraction path if it doesn't exist
os.makedirs(extraction_path, exist_ok=True)
# Extract the zip file
try:
with zipfile.ZipFile(model_zip, 'r') as zip_ref:
zip_ref.extractall(extraction_path)
except zipfile.BadZipFile:
return "The uploaded file is not a valid zip file."
except Exception as e:
return f"An error occurred while extracting the zip file: {str(e)}"
print("File unzipped successfully to:", extraction_path)
# Evaluate the model's performance
try:
calculate_metrics(extraction_path, model_name)
except Exception as e:
return f"An error occurred while calculating metrics: {str(e)}"
# Upload results to repo
results_file_path = os.path.join(os.getcwd(), EVAL_RESULTS_PATH, '3d-pope', model_name, 'results.json')
if not os.path.exists(results_file_path):
return f"Results file not found at {results_file_path}"
try:
with open(results_file_path, 'r') as f:
json.load(f) # Validate JSON structure
except json.JSONDecodeError:
return "The results file is not a valid JSON file."
try:
API.upload_file(
path_or_fileobj=results_file_path,
path_in_repo=os.path.join('3d-pope', model_name, 'results.json'),
repo_id=RESULTS_REPO,
repo_type="dataset",
)
except Exception as e:
return f"An error occurred while uploading results: {str(e)}"
# Restart the space
try:
restart_space()
except Exception as e:
return f"An error occurred while restarting the space: {str(e)}"
return "Submission received and results are being processed. Please check the leaderboard for updates."
except Exception as e:
return f"An unexpected error occurred: {str(e)}"
def restart_space():
API.restart_space(repo_id=REPO_ID)
try:
print(EVAL_REQUESTS_PATH)
snapshot_download(
repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN
)
except Exception:
restart_space()
try:
print(EVAL_RESULTS_PATH)
snapshot_download(
repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN
)
except Exception:
restart_space()
raw_data, original_df = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS)
leaderboard_df = original_df.copy()
def custom_format(x):
if pd.isna(x):
return x # Return as is if NaN
try:
float_x = float(x)
if float_x.is_integer():
return f"{int(float_x)}"
else:
return f"{float_x:.2f}".rstrip('0').rstrip('.')
except ValueError:
return x # Return as is if conversion to float fails
numeric_cols = [col for col in leaderboard_df.columns if leaderboard_df[col].dtype in ['float64', 'float32']]
leaderboard_df[numeric_cols] = leaderboard_df[numeric_cols].applymap(custom_format)
(
finished_eval_queue_df,
running_eval_queue_df,
pending_eval_queue_df,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)
# Searching and filtering
def update_table(
hidden_df: pd.DataFrame,
columns: list,
# type_query: list,
# precision_query: str,
# size_query: list,
# show_deleted: bool,
query: str,
):
# filtered_df = filter_models(hidden_df, type_query, size_query, precision_query, show_deleted)
filtered_df = filter_queries(query, hidden_df)
df = select_columns(filtered_df, columns)
return df
def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
return df[(df[AutoEvalColumn.model.name].str.contains(query, case=False))]
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
always_here_cols = [
# AutoEvalColumn.model_type_symbol.name,
AutoEvalColumn.model.name,
]
# We use COLS to maintain sorting
filtered_df = df[
always_here_cols + [c for c in COLS if c in df.columns and c in columns]
]
return filtered_df
def filter_queries(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame:
final_df = []
if query != "":
queries = [q.strip() for q in query.split(";")]
for _q in queries:
_q = _q.strip()
if _q != "":
temp_filtered_df = search_table(filtered_df, _q)
if len(temp_filtered_df) > 0:
final_df.append(temp_filtered_df)
if len(final_df) > 0:
filtered_df = pd.concat(final_df)
existing_columns = [col for col in [AutoEvalColumn.model.name, AutoEvalColumn.precision.name, AutoEvalColumn.revision.name] if col in filtered_df.columns]
filtered_df = filtered_df.drop_duplicates(subset=existing_columns)
return filtered_df
def filter_models(
df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool
) -> pd.DataFrame:
# Show all models
# if show_deleted:
# filtered_df = df
# else: # Show only still on the hub models
# filtered_df = df[df[AutoEvalColumn.still_on_hub.name] == True]
filtered_df = df
type_emoji = [t[0] for t in type_query]
# filtered_df = filtered_df.loc[df[AutoEvalColumn.model_type_symbol.name].isin(type_emoji)]
# filtered_df = filtered_df.loc[df[AutoEvalColumn.precision.name].isin(precision_query + ["None"])]
numeric_interval = pd.IntervalIndex(sorted([NUMERIC_INTERVALS[s] for s in size_query]))
params_column = pd.to_numeric(df[AutoEvalColumn.params.name], errors="coerce")
mask = params_column.apply(lambda x: any(numeric_interval.contains(x)))
filtered_df = filtered_df.loc[mask]
return filtered_df
demo = gr.Blocks(css=custom_css)
with demo:
gr.HTML(TITLE)
gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")
with gr.Tabs(elem_classes="tab-buttons") as tabs:
with gr.TabItem("πŸ… 3D-POPE Benchmark", elem_id="llm-benchmark-tab-table", id=0):
with gr.Row():
with gr.Column():
with gr.Row():
search_bar = gr.Textbox(
placeholder=" πŸ” Search for your model (separate multiple queries with `;`) and press ENTER...",
show_label=False,
elem_id="search-bar",
)
with gr.Row():
shown_columns = gr.CheckboxGroup(
choices=[
c.name
for c in fields(AutoEvalColumn)
if not c.hidden and not c.never_hidden
],
value=[
c.name
for c in fields(AutoEvalColumn)
if c.displayed_by_default and not c.hidden and not c.never_hidden
],
label="Select columns to show",
elem_id="column-select",
interactive=True,
)
# with gr.Row():
# deleted_models_visibility = gr.Checkbox(
# value=False, label="Show gated/private/deleted models", interactive=True
# )
# with gr.Column(min_width=320):
#with gr.Box(elem_id="box-filter"):
leaderboard_table = gr.components.Dataframe(
value=leaderboard_df[
[c.name for c in fields(AutoEvalColumn) if c.never_hidden]
+ shown_columns.value
],
headers=[c.name for c in fields(AutoEvalColumn) if c.never_hidden] + shown_columns.value,
datatype=TYPES,
elem_id="leaderboard-table",
interactive=False,
visible=True,
)
# Dummy leaderboard for handling the case when the user uses backspace key
hidden_leaderboard_table_for_search = gr.components.Dataframe(
value=original_df[COLS],
headers=COLS,
datatype=TYPES,
visible=False,
)
search_bar.submit(
update_table,
[
hidden_leaderboard_table_for_search,
shown_columns,
# deleted_models_visibility,
search_bar,
],
leaderboard_table,
)
for selector in [shown_columns]:
selector.change(
update_table,
[
hidden_leaderboard_table_for_search,
shown_columns,
# deleted_models_visibility,
search_bar,
],
leaderboard_table,
queue=True,
)
with gr.TabItem("πŸ“ About", elem_id="llm-benchmark-tab-table", id=2):
gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text")
with gr.TabItem("πŸš€ Submit here! ", elem_id="llm-benchmark-tab-table", id=3):
with gr.Column():
with gr.Row():
gr.Markdown(EVALUATION_QUEUE_TEXT, elem_classes="markdown-text")
with gr.Row():
gr.Markdown("# πŸ“‹ Submit your results here!", elem_classes="markdown-text")
with gr.Row():
model_name_textbox = gr.Textbox(label="Model name")
model_zip_file = gr.File(label="Upload model prediction result ZIP file")
# model_link_textbox = gr.Textbox(label="Link to model page")
with gr.Row():
gr.Column()
with gr.Column(scale=2):
submit_button = gr.Button("Submit Model")
submission_result = gr.Markdown()
submit_button.click(
handle_new_eval_submission,
[model_name_textbox, model_zip_file],
submission_result
)
gr.Column()
with gr.Row():
with gr.Accordion("πŸ“™ Citation", open=False):
citation_button = gr.Textbox(
value=CITATION_BUTTON_TEXT,
label=CITATION_BUTTON_LABEL,
lines=20,
elem_id="citation-button",
show_copy_button=True,
)
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()
demo.queue(default_concurrency_limit=40).launch()