|
|
|
|
|
import os |
|
import json |
|
|
|
import random |
|
from datetime import datetime |
|
|
|
from huggingface_hub import snapshot_download |
|
|
|
from src.backend.run_eval_suite import run_evaluation |
|
from src.backend.manage_requests import check_completed_evals, get_eval_requests, set_eval_request |
|
from src.backend.sort_queue import sort_models_by_priority |
|
from src.backend.envs import Tasks, EVAL_REQUESTS_PATH_BACKEND, EVAL_RESULTS_PATH_BACKEND, DEVICE, LIMIT, Task |
|
|
|
from src.backend.manage_requests import EvalRequest |
|
from src.leaderboard.read_evals import EvalResult |
|
|
|
from src.envs import QUEUE_REPO, RESULTS_REPO, API |
|
|
|
import logging |
|
import pprint |
|
|
|
logging.getLogger("openai").setLevel(logging.WARNING) |
|
|
|
logging.basicConfig(level=logging.ERROR) |
|
pp = pprint.PrettyPrinter(width=80) |
|
|
|
PENDING_STATUS = "PENDING" |
|
RUNNING_STATUS = "RUNNING" |
|
FINISHED_STATUS = "FINISHED" |
|
FAILED_STATUS = "FAILED" |
|
|
|
TASKS_HARNESS = [task.value for task in Tasks] |
|
|
|
|
|
def my_snapshot_download(repo_id, revision, local_dir, repo_type, max_workers): |
|
for i in range(10): |
|
try: |
|
snapshot_download(repo_id=repo_id, revision=revision, local_dir=local_dir, repo_type=repo_type, max_workers=max_workers) |
|
return |
|
except Exception: |
|
import time |
|
time.sleep(10) |
|
return |
|
|
|
|
|
my_snapshot_download(repo_id=RESULTS_REPO, revision="main", local_dir=EVAL_RESULTS_PATH_BACKEND, repo_type="dataset", max_workers=60) |
|
my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60) |
|
|
|
|
|
def sanity_checks(): |
|
print(f'Device: {DEVICE}') |
|
|
|
|
|
|
|
my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60) |
|
check_completed_evals(api=API, checked_status=RUNNING_STATUS, completed_status=FINISHED_STATUS, |
|
failed_status=FAILED_STATUS, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
hf_repo_results=RESULTS_REPO, local_dir_results=EVAL_RESULTS_PATH_BACKEND) |
|
return |
|
|
|
|
|
def request_to_result_name(request: EvalRequest) -> str: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
org_and_model = request.model.split("/", 1) |
|
if len(org_and_model) == 1: |
|
model = org_and_model[0] |
|
res = f"{model}_{request.precision}" |
|
else: |
|
org = org_and_model[0] |
|
model = org_and_model[1] |
|
res = f"{org}_{model}_{request.precision}" |
|
return res |
|
|
|
|
|
def process_evaluation(task: Task, eval_request: EvalRequest) -> dict: |
|
results = run_evaluation(eval_request=eval_request, task_names=[task.benchmark], num_fewshot=task.num_fewshot, |
|
batch_size=1, device=DEVICE, use_cache=None, limit=LIMIT) |
|
|
|
dumped = json.dumps(results, indent=2) |
|
print(dumped) |
|
|
|
output_path = os.path.join(EVAL_RESULTS_PATH_BACKEND, *eval_request.model.split("/"), f"results_{datetime.now()}.json") |
|
os.makedirs(os.path.dirname(output_path), exist_ok=True) |
|
with open(output_path, "w") as f: |
|
f.write(dumped) |
|
|
|
my_snapshot_download(repo_id=RESULTS_REPO, revision="main", local_dir=EVAL_RESULTS_PATH_BACKEND, repo_type="dataset", max_workers=60) |
|
API.upload_file(path_or_fileobj=output_path, path_in_repo=f"{eval_request.model}/results_{datetime.now()}.json", |
|
repo_id=RESULTS_REPO, repo_type="dataset") |
|
return results |
|
|
|
|
|
def process_finished_requests() -> bool: |
|
sanity_checks() |
|
|
|
current_finished_status = [FINISHED_STATUS] |
|
|
|
|
|
eval_requests: list[EvalRequest] = get_eval_requests(job_status=current_finished_status, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND) |
|
|
|
eval_requests: list[EvalRequest] = sort_models_by_priority(api=API, models=eval_requests) |
|
|
|
|
|
|
|
|
|
random.shuffle(eval_requests) |
|
|
|
from src.leaderboard.read_evals import get_raw_eval_results |
|
eval_results: list[EvalResult] = get_raw_eval_results(EVAL_RESULTS_PATH_BACKEND, EVAL_REQUESTS_PATH_BACKEND, True) |
|
|
|
result_name_to_request = {request_to_result_name(r): r for r in eval_requests} |
|
result_name_to_result = {r.eval_name: r for r in eval_results} |
|
|
|
for eval_request in eval_requests: |
|
result_name: str = request_to_result_name(eval_request) |
|
|
|
|
|
from typing import Optional |
|
eval_result: Optional[EvalResult] = result_name_to_result[result_name] if result_name in result_name_to_result else None |
|
|
|
task_lst = TASKS_HARNESS.copy() |
|
random.shuffle(task_lst) |
|
|
|
|
|
for task in task_lst: |
|
task_name = task.benchmark |
|
|
|
if eval_result is None or task_name not in eval_result.results: |
|
eval_request: EvalRequest = result_name_to_request[result_name] |
|
|
|
my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60) |
|
set_eval_request(api=API, eval_request=eval_request, set_to_status=RUNNING_STATUS, hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND) |
|
|
|
results = process_evaluation(task, eval_request) |
|
|
|
my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60) |
|
set_eval_request(api=API, eval_request=eval_request, set_to_status=FINISHED_STATUS, hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND) |
|
|
|
return True |
|
|
|
return False |
|
|
|
|
|
def process_pending_requests() -> bool: |
|
sanity_checks() |
|
|
|
current_pending_status = [PENDING_STATUS] |
|
|
|
|
|
eval_requests = get_eval_requests(job_status=current_pending_status, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND) |
|
|
|
eval_requests = sort_models_by_priority(api=API, models=eval_requests) |
|
|
|
random.shuffle(eval_requests) |
|
|
|
print(f"Found {len(eval_requests)} {','.join(current_pending_status)} eval requests") |
|
|
|
if len(eval_requests) == 0: |
|
return False |
|
|
|
eval_request = eval_requests[0] |
|
pp.pprint(eval_request) |
|
|
|
my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60) |
|
set_eval_request(api=API, eval_request=eval_request, set_to_status=RUNNING_STATUS, hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND) |
|
|
|
task_lst = TASKS_HARNESS.copy() |
|
random.shuffle(task_lst) |
|
|
|
for task in task_lst: |
|
results = process_evaluation(task, eval_request) |
|
|
|
my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60) |
|
set_eval_request(api=API, eval_request=eval_request, set_to_status=FINISHED_STATUS, hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND) |
|
|
|
return True |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
res = process_pending_requests() |
|
|
|
if res is False: |
|
res = process_finished_requests() |
|
|