File size: 5,171 Bytes
ed3fe33
58c39e0
ed3fe33
 
8f809e2
3573a39
ed3fe33
58c39e0
ed3fe33
 
 
 
 
 
 
 
 
 
 
 
 
 
3573a39
92e2a79
 
ed3fe33
 
8f809e2
 
 
1c00552
92e2a79
8f809e2
 
92e2a79
8f809e2
 
 
 
3573a39
 
8f809e2
1c00552
92e2a79
 
8f809e2
3573a39
ed3fe33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3573a39
92e2a79
 
8f809e2
 
 
 
1c00552
92e2a79
3573a39
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import json
import logging
import os
import subprocess
import threading
import time
from pathlib import Path

import pipe
from app_env import (
    HF_GSK_HUB_HF_TOKEN,
    HF_GSK_HUB_KEY,
    HF_GSK_HUB_PROJECT_KEY,
    HF_GSK_HUB_UNLOCK_TOKEN,
    HF_GSK_HUB_URL,
    HF_REPO_ID,
    HF_SPACE_ID,
    HF_WRITE_TOKEN,
)
from io_utils import LOG_FILE, get_yaml_path, write_log_to_user_file
from isolated_env import prepare_venv
from leaderboard import LEADERBOARD

# Flag read by the run_job loop on every iteration; set to True by
# start_process_run_job and cleared by stop_thread.
is_running = False

# Module logger; note that it is named after this file's path (__file__).
logger = logging.getLogger(__file__)


def start_process_run_job():
    """Start the background daemon thread that executes queued jobs.

    Sets the module-level ``is_running`` flag and launches ``run_job`` in a
    daemon thread stored in the module-level ``thread``. Failures are logged
    instead of raised; in that case ``is_running`` is restored to its previous
    value so the module does not claim a worker that never started.
    """
    global thread, is_running
    logging.debug("Running jobs in thread")
    was_running = is_running
    try:
        thread = threading.Thread(target=run_job)
        thread.daemon = True
        # The flag must be True before start(): run_job leaves its loop as
        # soon as it reads False.
        is_running = True
        thread.start()
    except Exception as e:
        is_running = was_running
        logger.exception("Failed to start thread: %s", e)


def stop_thread():
    """Request the background worker to stop.

    Clears the module-level ``is_running`` flag; the ``run_job`` loop checks
    this flag at the top of each iteration.
    """
    global is_running
    logging.debug("Stop thread")
    is_running = False


def _append_env_option(command, flag, env_name):
    """Append ``flag`` and the value of env var ``env_name`` when it is set and non-empty.

    Returns True when the option was appended, False otherwise.
    """
    value = os.environ.get(env_name)
    if not value:
        return False
    command.extend((flag, value))
    return True


def prepare_env_and_get_command(
    m_id,
    d_id,
    config,
    split,
    inference,
    inference_token,
    uid,
    label_mapping,
    feature_mapping,
):
    """Build the scanner command line for one evaluation job.

    Tries to create an isolated environment via ``prepare_venv`` from the
    current ``requirements.txt``; when that fails for any reason the plain
    ``giskard_scanner`` executable is used instead. Also writes a "start"
    message to the user's log through ``write_log_to_user_file``.

    Args:
        m_id: Value passed to ``--model``.
        d_id: Value passed to ``--dataset``.
        config: Value passed to ``--dataset_config``.
        split: Value passed to ``--dataset_split``.
        inference: When truthy together with ``inference_token``, the
            inference type is ``hf_inference_api``; otherwise ``hf_pipeline``.
        inference_token: Value passed to ``--inference_api_token``.
        uid: Job identifier handed to ``prepare_venv``, ``get_yaml_path`` and
            ``write_log_to_user_file``.
        label_mapping: JSON-serialized into ``--label_mapping``.
        feature_mapping: JSON-serialized into ``--feature_mapping``.

    Returns:
        The command as a list of arguments, executable first.
    """
    leaderboard_dataset = None
    if os.environ.get("SPACE_ID") == "giskardai/giskard-evaluator":
        leaderboard_dataset = LEADERBOARD

    inference_type = "hf_pipeline"
    if inference and inference_token:
        inference_type = "hf_inference_api"

    executable = "giskard_scanner"
    try:
        # Copy the current requirements (might be changed)
        # NOTE(review): readlines() keeps each line's trailing newline, so the
        # join yields blank lines between entries — kept as-is since the
        # expectations of prepare_venv are not visible here.
        with open("requirements.txt", "r") as f:
            executable = prepare_venv(
                uid,
                "\n".join(f.readlines()),
            )
        logger.info(f"Using {executable} as executable")
    except Exception as e:
        # Best-effort: fall back to the executable of the current environment.
        logger.warning(
            f"Create env failed due to {e}, using the current env as fallback."
        )
        executable = "giskard_scanner"

    # NOTE(review): a None inference_token would be rejected by subprocess when
    # the command is launched — confirm callers always pass a string.
    command = [
        executable,
        "--loader",
        "huggingface",
        "--model",
        m_id,
        "--dataset",
        d_id,
        "--dataset_config",
        config,
        "--dataset_split",
        split,
        "--output_format",
        "markdown",
        "--output_portal",
        "huggingface",
        "--feature_mapping",
        json.dumps(feature_mapping),
        "--label_mapping",
        json.dumps(label_mapping),
        "--scan_config",
        get_yaml_path(uid),
        "--inference_type",
        inference_type,
        "--inference_api_token",
        inference_token,
    ]

    # The token to publish post
    _append_env_option(command, "--hf_token", HF_WRITE_TOKEN)

    # The repo to publish post
    # TODO: Replace by the model id
    discussion_repo = os.environ.get(HF_REPO_ID) or os.environ.get(HF_SPACE_ID)
    if discussion_repo:
        command.extend(("--discussion_repo", discussion_repo))

    # The repo to publish for ranking
    if leaderboard_dataset:
        command.extend(("--leaderboard_dataset", leaderboard_dataset))

    # The info to upload to Giskard hub; the extra hub options are only
    # considered when the API key itself is configured.
    if _append_env_option(command, "--giskard_hub_api_key", HF_GSK_HUB_KEY):
        hub_options = (
            ("--giskard_hub_url", HF_GSK_HUB_URL),
            ("--giskard_hub_project_key", HF_GSK_HUB_PROJECT_KEY),
            ("--giskard_hub_hf_token", HF_GSK_HUB_HF_TOKEN),
            ("--giskard_hub_unlock_token", HF_GSK_HUB_UNLOCK_TOKEN),
        )
        for flag, env_name in hub_options:
            _append_env_option(command, flag, env_name)

    eval_str = f"[{m_id}]<{d_id}({config}, {split} set)>"

    write_log_to_user_file(
        uid,
        f"Start local evaluation on {eval_str}. Please wait for your job to start...\n",
    )

    return command


def save_job_to_pipe(task_id, job, description, lock):
    """Queue a job by appending ``(task_id, job, description)`` to ``pipe.jobs``.

    The append happens while ``lock`` is held.
    """
    entry = (task_id, job, description)
    with lock:
        pipe.jobs.append(entry)


def pop_job_from_pipe():
    """Take one job from ``pipe.jobs`` and run it to completion in a subprocess.

    Does nothing when the queue is empty. Otherwise the job's description is
    published in ``pipe.current`` while it runs, ``LOG_FILE`` is re-pointed
    (as a symlink) at the job's own log under ``./tmp``, and the command built
    by ``prepare_env_and_get_command`` is executed with stdout and stderr
    appended to that log. ``pipe.current`` is always reset to ``None``
    afterwards, even when preparing or launching the job raises; the
    exception itself still propagates to the caller.
    """
    if len(pipe.jobs) == 0:
        return
    # NOTE(review): list.pop() takes the most recently appended job (LIFO) —
    # confirm this ordering is intended for the queue.
    job_info = pipe.jobs.pop()
    pipe.current = job_info[2]
    try:
        task_id = job_info[0]
        write_log_to_user_file(task_id, f"Running job id {task_id}\n")
        command = prepare_env_and_get_command(*job_info[1])

        # Link to LOG_FILE
        log_file_path = Path(LOG_FILE)
        # exists() follows symlinks and is False for a dangling link, so also
        # test is_symlink(); otherwise os.symlink fails with FileExistsError.
        if log_file_path.is_symlink() or log_file_path.exists():
            log_file_path.unlink()
        os.symlink(f"./tmp/{task_id}.log", LOG_FILE)

        with open(f"./tmp/{task_id}.log", "a") as log_file:
            p = subprocess.Popen(command, stdout=log_file, stderr=subprocess.STDOUT)
            p.wait()
    finally:
        # Never leave a stale "current job" behind after a failure.
        pipe.current = None


def run_job():
    """Worker loop: run queued jobs one at a time until ``is_running`` is cleared.

    Each iteration calls ``pop_job_from_pipe`` and then sleeps 10 seconds.
    A failing job is logged and the loop continues, so one bad job does not
    kill the worker while ``is_running`` still reports it as alive. A
    ``KeyboardInterrupt`` clears ``is_running`` and ends the loop.
    """
    global is_running
    while is_running:
        try:
            try:
                pop_job_from_pipe()
            except Exception:
                # Keep the worker alive; the next queued job gets its turn
                # after the usual pause.
                logger.exception("Job failed in background thread")
            time.sleep(10)
        except KeyboardInterrupt:
            logging.debug("KeyboardInterrupt stop background thread")
            is_running = False
            break