Spaces:
Running
Running
import json | |
import logging | |
import os | |
import subprocess | |
import threading | |
import time | |
from pathlib import Path | |
import pipe | |
from app_env import ( | |
HF_GSK_HUB_HF_TOKEN, | |
HF_GSK_HUB_KEY, | |
HF_GSK_HUB_PROJECT_KEY, | |
HF_GSK_HUB_UNLOCK_TOKEN, | |
HF_GSK_HUB_URL, | |
HF_REPO_ID, | |
HF_SPACE_ID, | |
HF_WRITE_TOKEN, | |
) | |
from io_utils import LOG_FILE, get_submitted_yaml_path, write_log_to_user_file | |
from isolated_env import prepare_venv | |
from leaderboard import LEADERBOARD | |
# Module-level logger; note that it is named after this file's path.
logger = logging.getLogger(__file__)

# Flag polled by the background worker loop; it is set when the worker is
# started and cleared when it is asked to stop.
is_running = False
def start_process_run_job():
    """Start the background worker thread that executes ``run_job``.

    The thread is created as a daemon and stored in the module-level
    ``thread`` global. ``is_running`` is set to ``True`` before the thread is
    started so the worker loop sees the flag as soon as it runs.

    If the thread cannot be created or started, the failure is logged with
    its traceback, ``is_running`` is restored to the value it had on entry,
    and the exception is not propagated to the caller.
    """
    global thread, is_running
    logging.debug("Running jobs in thread")
    previous_state = is_running
    try:
        thread = threading.Thread(target=run_job)
        thread.daemon = True
        is_running = True
        thread.start()
    except Exception:
        # Do not leave the flag claiming a worker is alive when none started.
        is_running = previous_state
        logger.exception("Failed to start thread")
def stop_thread():
    """Ask the background worker to stop by clearing ``is_running``.

    This only flips the module-level flag; it does not join or otherwise
    wait for the worker thread.
    """
    global is_running
    logging.debug("Stop thread")
    is_running = False
def prepare_env_and_get_command(
    m_id,
    d_id,
    config,
    split,
    inference_token,
    uid,
    label_mapping,
    feature_mapping,
    verbose,
):
    """Build the scanner command line for one evaluation job.

    Args:
        m_id: Value passed to ``--model``.
        d_id: Value passed to ``--dataset``.
        config: Value passed to ``--dataset_config``.
        split: Value passed to ``--dataset_split``.
        inference_token: Value passed to ``--inference_api_token``.
        uid: Job identifier; used to prepare the virtual environment, to
            locate the submitted scan config and to write the user log.
        label_mapping: JSON-serialised and passed to ``--label_mapping``.
        feature_mapping: JSON-serialised and passed to ``--feature_mapping``.
        verbose: When truthy, ``--verbose`` is appended.

    Returns:
        The command as a list of arguments, executable first.

    Side effects:
        Calls ``prepare_venv`` with the contents of ``requirements.txt`` and
        writes a "Start local evaluation" line to the user's log file.
    """
    # Leaderboard publishing is enabled only when SPACE_ID matches this Space.
    running_in_evaluator_space = (
        os.environ.get("SPACE_ID") == "giskardai/giskard-evaluator"
    )
    leaderboard_dataset = LEADERBOARD if running_in_evaluator_space else None

    executable = "giskard_scanner"
    try:
        # Copy the current requirements (might be changed)
        with open("requirements.txt", "r") as requirements_file:
            executable = prepare_venv(
                uid,
                "\n".join(requirements_file.readlines()),
            )
            logger.info(f"Using {executable} as executable")
    except Exception as e:
        # Best effort: any failure falls back to the scanner on the PATH.
        logger.warning(f"Create env failed due to {e}, using the current env as fallback.")
        executable = "giskard_scanner"

    def env_option(flag, env_name):
        """Return ``[flag, value]`` when the variable is set and non-empty."""
        value = os.environ.get(env_name)
        return [flag, value] if value else []

    command = [
        executable,
        "--loader", "huggingface",
        "--model", m_id,
        "--dataset", d_id,
        "--dataset_config", config,
        "--dataset_split", split,
        "--output_format", "markdown",
        "--output_portal", "huggingface",
        "--feature_mapping", json.dumps(feature_mapping),
        "--label_mapping", json.dumps(label_mapping),
        "--scan_config", get_submitted_yaml_path(uid),
        "--inference_type", "hf_inference_api",
        "--inference_api_token", inference_token,
        "--persist_scan",
    ]

    # The token to publish post
    command += env_option("--hf_token", HF_WRITE_TOKEN)

    # The repo to publish for ranking
    if leaderboard_dataset:
        command += ["--leaderboard_dataset", leaderboard_dataset]

    # The info to upload to Giskard hub
    hub_options = (
        ("--giskard_hub_api_key", HF_GSK_HUB_KEY),
        ("--giskard_hub_url", HF_GSK_HUB_URL),
        ("--giskard_hub_project_key", HF_GSK_HUB_PROJECT_KEY),
        ("--giskard_hub_hf_token", HF_GSK_HUB_HF_TOKEN),
        ("--giskard_hub_unlock_token", HF_GSK_HUB_UNLOCK_TOKEN),
    )
    for flag, env_name in hub_options:
        command += env_option(flag, env_name)

    if verbose:
        command.append("--verbose")

    eval_str = f"[{m_id}]<{d_id}({config}, {split} set)>"
    write_log_to_user_file(
        uid,
        f"Start local evaluation on {eval_str}. Please wait for your job to start...\n",
    )
    return command
def save_job_to_pipe(task_id, job, description, lock):
    """Append a job to ``pipe.jobs`` while holding ``lock``.

    The job is stored as a ``(task_id, job, description)`` tuple.
    """
    entry = (task_id, job, description)
    with lock:
        pipe.jobs.append(entry)
def pop_job_from_pipe():
    """Take one job from ``pipe.jobs`` and run it as a subprocess.

    Does nothing when the job list is empty. Otherwise the job's description
    is published in ``pipe.current``, ``LOG_FILE`` is re-pointed (as a
    symlink) to the job's own log under ``./tmp``, the command is built with
    ``prepare_env_and_get_command`` and executed with its stdout and stderr
    appended to that log.

    While the subprocess runs, ``pipe.current`` is polled once per second;
    if it becomes falsy the subprocess is killed and the interruption is
    recorded in the log. ``pipe.current`` is always reset to ``None`` on exit,
    including when any step raises.
    """
    if len(pipe.jobs) == 0:
        return

    # NOTE(review): pop() takes the most recently appended job (LIFO) —
    # confirm this ordering is intended.
    job_info = pipe.jobs.pop()
    task_id = job_info[0]
    pipe.current = job_info[2]

    try:
        # Link to LOG_FILE
        log_file_path = Path(LOG_FILE)
        # exists() follows symlinks, so a dangling link would go undetected
        # and make os.symlink() below fail; check is_symlink() as well.
        if log_file_path.is_symlink() or log_file_path.exists():
            log_file_path.unlink()
        os.symlink(f"./tmp/{task_id}.log", LOG_FILE)

        write_log_to_user_file(task_id, f"Running job id {task_id}\n")
        command = prepare_env_and_get_command(*job_info[1])

        with open(f"./tmp/{task_id}.log", "a") as log_file:
            return_code = None
            p = subprocess.Popen(command, stdout=log_file, stderr=subprocess.STDOUT)
            while pipe.current and return_code is None:
                # Wait for finishing
                try:
                    return_code = p.wait(timeout=1)
                except subprocess.TimeoutExpired:
                    return_code = None

            if not pipe.current:
                # Job interrupted before finishing
                p.kill()
                # Reap the killed child so it does not linger as a zombie.
                p.wait()
                log_file.write(f"\nJob interrupted by admin at {time.asctime()}\n")

            # Only a non-zero exit code is recorded here.
            if return_code:
                log_file.write(f"\nJob finished with {return_code} at {time.asctime()}\n")
    finally:
        # Never leave a stale "current job" behind, even on failure.
        pipe.current = None
def run_job():
    """Worker loop: run queued jobs until ``is_running`` becomes false.

    Each iteration calls ``pop_job_from_pipe`` and then sleeps for 10
    seconds. An unexpected exception raised by a job is logged with its
    traceback and the loop keeps polling, so one failing job cannot silently
    end the worker. ``KeyboardInterrupt`` clears ``is_running`` and ends the
    loop.
    """
    global is_running
    while is_running:
        try:
            try:
                pop_job_from_pipe()
            except Exception:
                # Keep the worker alive; otherwise later jobs would never be
                # picked up while is_running still reads True.
                logger.exception("Job failed unexpectedly; worker keeps polling")
            time.sleep(10)
        except KeyboardInterrupt:
            logging.debug("KeyboardInterrupt stop background thread")
            is_running = False
            break