|
import json |
|
import logging |
|
import os |
|
import subprocess |
|
import threading |
|
import time |
|
from pathlib import Path |
|
|
|
import pipe |
|
from app_env import ( |
|
HF_GSK_HUB_HF_TOKEN, |
|
HF_GSK_HUB_KEY, |
|
HF_GSK_HUB_PROJECT_KEY, |
|
HF_GSK_HUB_UNLOCK_TOKEN, |
|
HF_GSK_HUB_URL, |
|
HF_REPO_ID, |
|
HF_SPACE_ID, |
|
HF_WRITE_TOKEN, |
|
) |
|
from io_utils import LOG_FILE, get_submitted_yaml_path, write_log_to_user_file |
|
from isolated_env import prepare_venv |
|
from leaderboard import LEADERBOARD |
|
|
|
# Flag checked by the worker loop in run_job(); clearing it makes the loop exit.
is_running: bool = False

# Module logger, named after this file's path.
logger: logging.Logger = logging.getLogger(__file__)
|
|
|
|
|
def start_process_run_job():
    """Start the background worker thread that processes queued jobs.

    Creates a daemon thread targeting ``run_job``, stores it in the
    module-level ``thread`` global and sets ``is_running`` so the worker loop
    keeps polling. Failures are logged instead of raised; in that case
    ``is_running`` is reset to ``False`` so the module never reports a
    running worker when none was started.
    """
    global thread, is_running
    try:
        logging.debug("Running jobs in thread")
        # Daemon thread: it must not keep the interpreter alive at shutdown.
        thread = threading.Thread(target=run_job, daemon=True)
        # Set the flag before start(): run_job checks it on its first iteration.
        is_running = True
        thread.start()
    except Exception:
        # Nothing is polling the queue, so do not leave the flag set.
        is_running = False
        logging.exception("Failed to start thread")
|
|
|
|
|
def stop_thread():
    """Clear ``is_running`` so the worker loop exits at its next check."""
    global is_running
    logging.debug("Stop thread")
    is_running = False
|
|
|
|
|
def prepare_env_and_get_command(
    m_id,
    d_id,
    config,
    split,
    inference_token,
    uid,
    label_mapping,
    feature_mapping,
):
    """Build the scanner command line for one evaluation job.

    Tries to prepare an isolated environment from ``requirements.txt`` via
    ``prepare_venv`` and use the executable it returns; on any failure the
    plain ``giskard_scanner`` executable is used instead. Optional flags are
    appended only when their value (mostly environment variables) is set.
    A "start" message is also written to the user's log file.

    Args:
        m_id: Model identifier passed to ``--model``.
        d_id: Dataset identifier passed to ``--dataset``.
        config: Value for ``--dataset_config``.
        split: Value for ``--dataset_split``.
        inference_token: Value for ``--inference_api_token``.
        uid: Job identifier; used for the venv, the submitted scan config
            path and the user log file.
        label_mapping: JSON-serialisable value for ``--label_mapping``.
        feature_mapping: JSON-serialisable value for ``--feature_mapping``.

    Returns:
        The argument list (executable first) to hand to ``subprocess``.
    """
    # The leaderboard dataset is only used when running in the official Space.
    leaderboard_dataset = None
    if os.environ.get("SPACE_ID") == "giskardai/giskard-evaluator":
        leaderboard_dataset = LEADERBOARD

    executable = "giskard_scanner"
    try:
        with open("requirements.txt", "r") as f:
            # NOTE(review): readlines() keeps line endings, so the joined text
            # has a blank line between entries. Kept as-is because
            # prepare_venv's handling of this text is not visible here.
            executable = prepare_venv(
                uid,
                "\n".join(f.readlines()),
            )
        logger.info(f"Using {executable} as executable")
    except Exception as e:
        # Best effort: any failure falls back to the scanner on the current env.
        # logger.warn is deprecated; logger.warning is the supported spelling.
        logger.warning(
            f"Create env failed due to {e}, using the current env as fallback."
        )
        executable = "giskard_scanner"

    command = [
        executable,
        "--loader",
        "huggingface",
        "--model",
        m_id,
        "--dataset",
        d_id,
        "--dataset_config",
        config,
        "--dataset_split",
        split,
        "--output_format",
        "markdown",
        "--output_portal",
        "huggingface",
        "--feature_mapping",
        json.dumps(feature_mapping),
        "--label_mapping",
        json.dumps(label_mapping),
        "--scan_config",
        get_submitted_yaml_path(uid),
        "--inference_type",
        "hf_inference_api",
        "--inference_api_token",
        inference_token,
    ]

    # (flag, value) pairs in the order they must appear on the command line;
    # a pair is emitted only when its value is truthy.
    optional_flags = [
        ("--hf_token", os.environ.get(HF_WRITE_TOKEN)),
        (
            "--discussion_repo",
            os.environ.get(HF_REPO_ID) or os.environ.get(HF_SPACE_ID),
        ),
        ("--leaderboard_dataset", leaderboard_dataset),
        ("--giskard_hub_api_key", os.environ.get(HF_GSK_HUB_KEY)),
        ("--giskard_hub_url", os.environ.get(HF_GSK_HUB_URL)),
        ("--giskard_hub_project_key", os.environ.get(HF_GSK_HUB_PROJECT_KEY)),
        ("--giskard_hub_hf_token", os.environ.get(HF_GSK_HUB_HF_TOKEN)),
        ("--giskard_hub_unlock_token", os.environ.get(HF_GSK_HUB_UNLOCK_TOKEN)),
    ]
    for flag, value in optional_flags:
        if value:
            command.extend((flag, value))

    eval_str = f"[{m_id}]<{d_id}({config}, {split} set)>"

    write_log_to_user_file(
        uid,
        f"Start local evaluation on {eval_str}. Please wait for your job to start...\n",
    )

    return command
|
|
|
|
|
def save_job_to_pipe(task_id, job, description, lock):
    """Append a job entry to ``pipe.jobs`` while holding ``lock``.

    The entry is the tuple ``(task_id, job, description)``.
    """
    entry = (task_id, job, description)
    with lock:
        pipe.jobs.append(entry)
|
|
|
|
|
def pop_job_from_pipe():
    """Take one queued job, if any, and run it to completion.

    Does nothing when ``pipe.jobs`` is empty. Otherwise it publishes the
    job description in ``pipe.current``, re-points ``LOG_FILE`` at the job's
    log file, builds the command with ``prepare_env_and_get_command`` and
    runs it as a subprocess whose stdout and stderr are appended to that log
    file. ``pipe.current`` is always reset to ``None`` afterwards, even when
    a step raises; the exception itself still propagates to the caller.
    """
    if not pipe.jobs:
        return

    # NOTE(review): pop() removes from the end, i.e. the most recently queued
    # entry if pipe.jobs is a list -- confirm LIFO order is intended.
    job_info = pipe.jobs.pop()
    description = job_info[2]
    task_id = job_info[0]
    job_args = job_info[1]

    pipe.current = description
    try:
        log_file_path = Path(LOG_FILE)
        # exists() follows symlinks and is False for a dangling link, which
        # would make os.symlink() below fail with FileExistsError; check
        # is_symlink() as well so a stale link is always removed.
        if log_file_path.is_symlink() or log_file_path.exists():
            log_file_path.unlink()
        os.symlink(f"./tmp/{task_id}.log", LOG_FILE)

        write_log_to_user_file(task_id, f"Running job id {task_id}\n")
        command = prepare_env_and_get_command(*job_args)

        with open(f"./tmp/{task_id}.log", "a") as log_file:
            p = subprocess.Popen(command, stdout=log_file, stderr=subprocess.STDOUT)
            p.wait()
    finally:
        # Never leave a stale "current job" behind, whatever happened above.
        pipe.current = None
|
|
|
|
|
def run_job():
    """Worker loop: run queued jobs until ``is_running`` is cleared.

    Each iteration calls ``pop_job_from_pipe`` and then sleeps for 10
    seconds. A failure while processing one job is logged and the loop
    continues, so a single bad job cannot silently kill the worker while
    ``is_running`` still reports it as alive. ``KeyboardInterrupt`` clears
    ``is_running`` and ends the loop.
    """
    global is_running
    while is_running:
        try:
            try:
                pop_job_from_pipe()
            except Exception:
                # Keep the worker alive; the traceback goes to the log.
                logging.exception("Job failed in background thread")
            # Sleep after failures too, so repeated errors cannot spin the loop.
            time.sleep(10)
        except KeyboardInterrupt:
            logging.debug("KeyboardInterrupt stop background thread")
            is_running = False
            break
|
|