giskard-evaluator / text_classification_ui_helpers.py
inoki-giskard's picture
Add more Giskard hub info for uploading
e163df8
raw
history blame
9.94 kB
import collections
import json
import logging
import os
import threading
import datasets
import gradio as gr
from transformers.pipelines import TextClassificationPipeline
from wordings import get_styled_input
from io_utils import (get_yaml_path, read_column_mapping, save_job_to_pipe,
write_column_mapping, write_inference_type,
write_log_to_user_file)
from text_classification import (check_model, get_example_prediction,
get_labels_and_features_from_dataset)
from wordings import (CHECK_CONFIG_OR_SPLIT_RAW,
CONFIRM_MAPPING_DETAILS_FAIL_RAW,
MAPPING_STYLED_ERROR_WARNING)
MAX_LABELS = 20
MAX_FEATURES = 20
HF_REPO_ID = "HF_REPO_ID"
HF_SPACE_ID = "SPACE_ID"
HF_WRITE_TOKEN = "HF_WRITE_TOKEN"
HF_GSK_HUB_URL = "GSK_HUB_URL"
HF_GSK_HUB_PROJECT_KEY = "GSK_HUB_PROJECT_KEY"
HF_GSK_HUB_KEY = "GSK_API_KEY"
HF_GSK_HUB_HF_TOKEN = "GSK_HF_TOKEN"
HF_GSK_HUB_UNLOCK_TOKEN = "GSK_HUB_UNLOCK_TOKEN"
def check_dataset_and_get_config(dataset_id):
try:
# write_column_mapping(None, uid) # reset column mapping
configs = datasets.get_dataset_config_names(dataset_id)
return gr.Dropdown(configs, value=configs[0], visible=True)
except Exception:
# Dataset may not exist
pass
def check_dataset_and_get_split(dataset_id, dataset_config):
try:
splits = list(datasets.load_dataset(dataset_id, dataset_config).keys())
return gr.Dropdown(splits, value=splits[0], visible=True)
except Exception:
# Dataset may not exist
# gr.Warning(f"Failed to load dataset {dataset_id} with config {dataset_config}: {e}")
pass
def select_run_mode(run_inf, inf_token, uid):
if run_inf:
if len(inf_token) > 0:
write_inference_type(run_inf, inf_token, uid)
return (gr.update(visible=True), gr.update(value=False))
else:
return (gr.update(visible=False), gr.update(value=True))
def deselect_run_inference(run_local):
if run_local:
return (gr.update(visible=False), gr.update(value=False))
else:
return (gr.update(visible=True), gr.update(value=True))
def write_column_mapping_to_config(
dataset_id, dataset_config, dataset_split, uid, *labels
):
# TODO: Substitute 'text' with more features for zero-shot
# we are not using ds features because we only support "text" for now
ds_labels, _ = get_labels_and_features_from_dataset(
dataset_id, dataset_config, dataset_split
)
if labels is None:
return
all_mappings = dict()
if "labels" not in all_mappings.keys():
all_mappings["labels"] = dict()
for i, label in enumerate(labels[:MAX_LABELS]):
if label:
all_mappings["labels"][label] = ds_labels[i % len(ds_labels)]
if "features" not in all_mappings.keys():
all_mappings["features"] = dict()
for _, feat in enumerate(labels[MAX_LABELS : (MAX_LABELS + MAX_FEATURES)]):
if feat:
# TODO: Substitute 'text' with more features for zero-shot
all_mappings["features"]["text"] = feat
write_column_mapping(all_mappings, uid)
def list_labels_and_features_from_dataset(ds_labels, ds_features, model_id2label):
model_labels = list(model_id2label.values())
len_model_labels = len(model_labels)
lables = [
gr.Dropdown(
label=f"{label}",
choices=model_labels,
value=model_id2label[i % len_model_labels],
interactive=True,
visible=True,
)
for i, label in enumerate(ds_labels[:MAX_LABELS])
]
lables += [gr.Dropdown(visible=False) for _ in range(MAX_LABELS - len(lables))]
# TODO: Substitute 'text' with more features for zero-shot
features = [
gr.Dropdown(
label=f"{feature}",
choices=ds_features,
value=ds_features[0],
interactive=True,
visible=True,
)
for feature in ["text"]
]
features += [
gr.Dropdown(visible=False) for _ in range(MAX_FEATURES - len(features))
]
return lables + features
def check_model_and_show_prediction(
model_id, dataset_id, dataset_config, dataset_split
):
ppl = check_model(model_id)
if ppl is None or not isinstance(ppl, TextClassificationPipeline):
gr.Warning("Please check your model.")
return (
gr.update(visible=False),
gr.update(visible=False),
*[gr.update(visible=False) for _ in range(MAX_LABELS + MAX_FEATURES)],
)
dropdown_placement = [
gr.Dropdown(visible=False) for _ in range(MAX_LABELS + MAX_FEATURES)
]
if ppl is None: # pipeline not found
gr.Warning("Model not found")
return (
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False, open=False),
*dropdown_placement,
)
model_id2label = ppl.model.config.id2label
ds_labels, ds_features = get_labels_and_features_from_dataset(
dataset_id, dataset_config, dataset_split
)
# when dataset does not have labels or features
if not isinstance(ds_labels, list) or not isinstance(ds_features, list):
gr.Warning(CHECK_CONFIG_OR_SPLIT_RAW)
return (
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False, open=False),
*dropdown_placement,
)
column_mappings = list_labels_and_features_from_dataset(
ds_labels,
ds_features,
model_id2label,
)
# when labels or features are not aligned
# show manually column mapping
if (
collections.Counter(model_id2label.values()) != collections.Counter(ds_labels)
or ds_features[0] != "text"
):
return (
gr.update(value=MAPPING_STYLED_ERROR_WARNING, visible=True),
gr.update(visible=False),
gr.update(visible=True, open=True),
*column_mappings,
)
prediction_input, prediction_output = get_example_prediction(
ppl, dataset_id, dataset_config, dataset_split
)
return (
gr.update(value=get_styled_input(prediction_input), visible=True),
gr.update(value=prediction_output, visible=True),
gr.update(visible=True, open=False),
*column_mappings,
)
def try_submit(m_id, d_id, config, split, local, uid):
all_mappings = read_column_mapping(uid)
if all_mappings is None:
gr.Warning(CONFIRM_MAPPING_DETAILS_FAIL_RAW)
return (gr.update(interactive=True), gr.update(visible=False))
if "labels" not in all_mappings.keys():
gr.Warning(CONFIRM_MAPPING_DETAILS_FAIL_RAW)
return (gr.update(interactive=True), gr.update(visible=False))
label_mapping = {}
for i, label in zip(
range(len(all_mappings["labels"].keys())), all_mappings["labels"].keys()
):
label_mapping.update({str(i): label})
if "features" not in all_mappings.keys():
gr.Warning(CONFIRM_MAPPING_DETAILS_FAIL_RAW)
return (gr.update(interactive=True), gr.update(visible=False))
feature_mapping = all_mappings["features"]
leaderboard_dataset = None
if os.environ.get("SPACE_ID") == "giskardai/giskard-evaluator":
leaderboard_dataset = "ZeroCommand/test-giskard-report"
# TODO: Set column mapping for some dataset such as `amazon_polarity`
if local:
command = [
"giskard_scanner",
"--loader",
"huggingface",
"--model",
m_id,
"--dataset",
d_id,
"--dataset_config",
config,
"--dataset_split",
split,
"--hf_token",
os.environ.get(HF_WRITE_TOKEN),
"--discussion_repo",
os.environ.get(HF_REPO_ID) or os.environ.get(HF_SPACE_ID),
"--output_format",
"markdown",
"--output_portal",
"huggingface",
"--feature_mapping",
json.dumps(feature_mapping),
"--label_mapping",
json.dumps(label_mapping),
"--scan_config",
get_yaml_path(uid),
"--leaderboard_dataset",
leaderboard_dataset,
]
if os.environ.get(HF_GSK_HUB_KEY):
command.append("--giskard_hub_api_key")
command.append(os.environ.get(HF_GSK_HUB_KEY))
if os.environ.get(HF_GSK_HUB_URL):
command.append("--giskard_hub_url")
command.append(os.environ.get(HF_GSK_HUB_URL))
if os.environ.get(HF_GSK_HUB_PROJECT_KEY):
command.append("--giskard_hub_project_key")
command.append(os.environ.get(HF_GSK_HUB_PROJECT_KEY))
if os.environ.get(HF_GSK_HUB_HF_TOKEN):
command.append("--giskard_hub_hf_token")
command.append(os.environ.get(HF_GSK_HUB_HF_TOKEN))
if os.environ.get(HF_GSK_HUB_UNLOCK_TOKEN):
command.append("--giskard_hub_unlock_token")
command.append(os.environ.get(HF_GSK_HUB_UNLOCK_TOKEN))
eval_str = f"[{m_id}]<{d_id}({config}, {split} set)>"
logging.info(f"Start local evaluation on {eval_str}")
save_job_to_pipe(uid, command, eval_str, threading.Lock())
write_log_to_user_file(
uid,
f"Start local evaluation on {eval_str}. Please wait for your job to start...\n",
)
gr.Info(f"Start local evaluation on {eval_str}")
return (
gr.update(interactive=False),
gr.update(lines=5, visible=True, interactive=False),
)
else:
gr.Info("TODO: Submit task to an endpoint")
return (gr.update(interactive=True), gr.update(visible=False)) # Submit button