from os import listdir from os.path import isfile, join import html import gradio as gr import pipe from io_utils import get_logs_file LOG_PATH = "./tmp" CONFIG_PATH = "./cicd/configs/" MAX_FILES_NUM = 20 def get_accordions_of_files(path, files): components = [None for _ in range(0, MAX_FILES_NUM)] for i in range(0, len(files)): if i >= MAX_FILES_NUM: break with open(join(path, files[i]), "r") as f: components[i] = f.read() return components def get_accordions_of_log_files(): log_files = [ f for f in listdir(LOG_PATH) if isfile(join(LOG_PATH, f)) and f.endswith("_log") ] return get_accordions_of_files(LOG_PATH, log_files) def get_accordions_of_config_files(): config_files = [ f for f in listdir(CONFIG_PATH) if isfile(join(CONFIG_PATH, f)) and f.endswith(".yaml") ] return get_accordions_of_files(CONFIG_PATH, config_files) def get_config_files(): config_files = [ join(CONFIG_PATH, f) for f in listdir(CONFIG_PATH) if isfile(join(CONFIG_PATH, f)) and f.endswith(".yaml") ] return config_files def get_log_files(): return [ join(LOG_PATH, f) for f in listdir(LOG_PATH) if isfile(join(LOG_PATH, f)) and f.endswith("log") ] def get_jobs_info_in_queue(): return [f"⌛️job id {html.escape(job[0])}: {html.escape(job[2])}
" for job in pipe.jobs] def get_queue_status(): if len(pipe.jobs) > 0 or pipe.current is not None: current = pipe.current if current is None: current = "None" return f'
Current job: {html.escape(current)}
Jobs in queue:
{"".join(get_jobs_info_in_queue())}
' else: return '
No jobs in queue, please submit an evaluation task from another tab.
' def get_demo(): with gr.Row(): gr.HTML( value=get_queue_status, every=5, ) with gr.Accordion(label="Log Files", open=False): with gr.Row(): gr.Files(value=get_log_files, label="Log Files", every=10) with gr.Row(): gr.Textbox( value=get_logs_file, every=0.5, lines=10, visible=True, label="Current Log File" ) with gr.Accordion(label="Config Files", open=False): gr.Files(value=get_config_files, label="Config Files", every=10)