#!/usr/bin/env python import os import json import random from datetime import datetime from src.backend.run_eval_suite import run_evaluation from src.backend.manage_requests import check_completed_evals, get_eval_requests, set_eval_request from src.backend.sort_queue import sort_models_by_priority from src.backend.envs import EVAL_REQUESTS_PATH_BACKEND, EVAL_RESULTS_PATH_BACKEND, DEVICE, LIMIT, Tasks, Task, num_fewshots from src.backend.manage_requests import EvalRequest from src.leaderboard.read_evals import EvalResult from src.envs import QUEUE_REPO, RESULTS_REPO, API from src.utils import my_snapshot_download import time import logging import pprint import argparse # def get_subdirectories(path): # subdirectories = [] # # Get all entries in the directory # entries = os.listdir(path) # for entry in entries: # # Check if the entry is a directory # if os.path.isdir(os.path.join(path, entry)): # subdirectories.append(entry) # return subdirectories # parser = argparse.ArgumentParser(description="Get subdirectory names") # parser.add_argument("include_path", help="Path to the directory", nargs='?', default=None) # args = parser.parse_args() # # = get_subdirectories(args.include_path) def my_set_eval_request(api, eval_request, set_to_status, hf_repo, local_dir): for i in range(10): try: set_eval_request(api=api, eval_request=eval_request, set_to_status=set_to_status, hf_repo=hf_repo, local_dir=local_dir) return except Exception: time.sleep(60) return logging.getLogger("openai").setLevel(logging.WARNING) logging.basicConfig(level=logging.ERROR) pp = pprint.PrettyPrinter(width=80) PENDING_STATUS = "PENDING" RUNNING_STATUS = "RUNNING" FINISHED_STATUS = "FINISHED" FAILED_STATUS = "FAILED" TASKS_HARNESS = [task.value for task in Tasks] # starts by downloading results and requests. makes sense since we want to be able to use different backend servers! my_snapshot_download(repo_id=RESULTS_REPO, revision="main", local_dir=EVAL_RESULTS_PATH_BACKEND, repo_type="dataset", max_workers=60) my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60) def sanity_checks(): print(f'Device: {DEVICE}') # pull the eval dataset from the hub and parse any eval requests # check completed evals and set them to finished my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60) check_completed_evals(api=API, checked_status=RUNNING_STATUS, completed_status=FINISHED_STATUS, failed_status=FAILED_STATUS, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND, hf_repo_results=RESULTS_REPO, local_dir_results=EVAL_RESULTS_PATH_BACKEND) return def request_to_result_name(request: EvalRequest) -> str: org_and_model = request.model.split("/", 1) if len(org_and_model) == 1: model = org_and_model[0] res = f"{model}_{request.precision}" else: org = org_and_model[0] model = org_and_model[1] res = f"{org}_{model}_{request.precision}" return res # doesn't make distinctions for tasks since the original code runs eval on ALL tasks. def process_evaluation(task_name: str, eval_request: EvalRequest) -> dict: # batch_size = 1 batch_size = "auto" # might not have to get the benchmark. print(f"task_name parameter in process_evaluation() = {task_name}") #, task_names=[task.benchmark] = {[task.benchmark]}") num_fewshot = num_fewshots[task_name] results = run_evaluation(eval_request=eval_request, task_names=task_name, num_fewshot=num_fewshot, batch_size=batch_size, device=DEVICE, use_cache=None, limit=LIMIT) print('RESULTS', results) dumped = json.dumps(results, indent=2, default=lambda o: '') print(dumped) output_path = os.path.join(EVAL_RESULTS_PATH_BACKEND, *eval_request.model.split("/"), f"results_{task_name}_{datetime.now()}.json") os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, "w") as f: f.write(dumped) my_snapshot_download(repo_id=RESULTS_REPO, revision="main", local_dir=EVAL_RESULTS_PATH_BACKEND, repo_type="dataset", max_workers=60) API.upload_file(path_or_fileobj=output_path, path_in_repo=f"{eval_request.model}/results_{task_name}_{datetime.now()}.json", repo_id=RESULTS_REPO, repo_type="dataset") return results # the rendering is done with files in local repo. def process_pending_requests() -> bool: sanity_checks() current_pending_status = [PENDING_STATUS] # Get all eval request that are PENDING, if you want to run other evals, change this parameter # GETTING REQUESTS FROM THE HUB NOT LOCAL DIR. eval_requests = get_eval_requests(job_status=current_pending_status, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND) # Sort the evals by priority (first submitted first run) eval_requests = sort_models_by_priority(api=API, models=eval_requests) random.shuffle(eval_requests) # this says zero print(f"Found {len(eval_requests)} {','.join(current_pending_status)} eval requests") if len(eval_requests) == 0: return False eval_request = eval_requests[0] pp.pprint(eval_request) my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60) my_set_eval_request(api=API, eval_request=eval_request, set_to_status=RUNNING_STATUS, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND) # task_lst = TASKS_HARNESS.copy() task_lst = eval_request.get_user_requested_task_names() random.shuffle(task_lst) print(f"task_lst in process_pending_requests(): {task_lst}") for task_name in task_lst: results = process_evaluation(task_name, eval_request) my_snapshot_download(repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60) my_set_eval_request(api=API, eval_request=eval_request, set_to_status=FINISHED_STATUS, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND) return True if __name__ == "__main__": # wait = True # import socket # if socket.gethostname() in {'hamburg'} or os.path.isdir("/home/pminervi"): # wait = False # if wait: # time.sleep(60 * random.randint(2, 5)) # pass # res = False res = process_pending_requests() # if res is False: # res = process_finished_requests(100) # if res is False: # res = process_finished_requests(0)