import os import inspect import base64 import yaml import copy import shutil import gradio as gr from data_juicer.ops.base_op import OPERATORS from data_juicer.utils.constant import Fields demo_path = os.path.dirname(os.path.abspath(__file__)) project_path = os.path.dirname(os.path.dirname(demo_path)) # 图片本地路径转换为 base64 格式 def covert_image_to_base64(image_path): # 获得文件后缀名 ext = image_path.split(".")[-1] if ext not in ["gif", "jpeg", "png"]: ext = "jpeg" with open(image_path, "rb") as image_file: # Read the file encoded_string = base64.b64encode(image_file.read()) # Convert bytes to string base64_data = encoded_string.decode("utf-8") # 生成base64编码的地址 base64_url = f"data:image/{ext};base64,{base64_data}" return base64_url def format_cover_html(project_img_path): readme_link = 'https://github.com/alibaba/data-juicer' config = { 'name': "Data-Juicer", 'label': "Op Insight", 'description': f'A One-Stop Data Processing System for Large Language Models.', 'introduction': "This project is being actively updated and maintained, and we will periodically enhance and add more features and data recipes.
" "We welcome you to join us in promoting LLM data development and research!
", 'demo':"You can experience the effect of the operators of Data-Juicer", 'note':'Note: Due to resource limitations, only a subset of operators is available here. see more details in GitHub' } # image_src = covert_image_to_base64(project_img_path) #
#
return f"""
{config.get("name", "")}
{config.get("description", "")}
{config.get("introduction", "")}
{config.get("demo", "")}
{config.get("note", "")}
""" op_text = '' docs_file = os.path.join(project_path, 'docs/Operators.md') if os.path.exists(docs_file): with open(os.path.join(project_path, 'docs/Operators.md'), 'r') as f: op_text = f.read() def extract_op_desc(markdown_text, header): start_index = markdown_text.find(header) end_index = markdown_text.find("\n##", start_index + len(header)) return markdown_text[start_index+ len(header):end_index].strip() op_desc = f"
{extract_op_desc(op_text, '## Overview').split('All the specific ')[0].strip()}
" op_list_desc = { 'mapper':extract_op_desc(op_text, '## Mapper '), 'filter':extract_op_desc(op_text, '## Filter '), 'deduplicator':extract_op_desc(op_text, '## Deduplicator '), 'selector':extract_op_desc(op_text, '## Selector '), } op_types = ['mapper', 'filter',]# 'deduplicator'] , 'selector'] local_ops_dict = {op_type:[] for op_type in op_types} multimodal = os.getenv('MULTI_MODAL', True) multimodal_visible = False text_key = 'text' image_key = 'images' audio_key = 'audios' video_key = 'videos' def get_op_lists(op_type): use_local_op = os.getenv('USE_LOCAL_OP', False) if not use_local_op: all_ops = list(OPERATORS.modules.keys()) options = [ name for name in all_ops if name.endswith(op_type) ] else: options = local_ops_dict.get(op_type, []) for exclude in ['image', 'video', 'audio']: options = [name for name in options if multimodal or exclude not in name] return options def show_code(op_name): op_class = OPERATORS.modules[op_name] text = inspect.getsourcelines(op_class) init_signature = inspect.signature(op_class.__init__) # 输出每个参数的名字和默认值 default_params = dict() for name, parameter in init_signature.parameters.items(): if name in ['self', 'args', 'kwargs']: continue # 跳过 'self' 参数 if parameter.default is not inspect.Parameter.empty: default_params[name] = parameter.default return ''.join(text[0]), yaml.dump(default_params) def change_visible(op_name): text_visible = True video_visible = False audio_visible = False image_visible = False if 'video' in op_name: video_visible = True elif 'audio' in op_name: audio_visible = True elif 'image' in op_name: image_visible = True return gr.update(visible=text_visible), gr.update(visible=image_visible), gr.update(visible=video_visible), gr.update(visible=audio_visible), gr.update(visible=text_visible), gr.update(visible=image_visible), gr.update(visible=video_visible), gr.update(visible=audio_visible) def copy_func(file): filename = None if file: filename= os.path.basename(file) shutil.copyfile(file, filename) return filename def encode_sample(input_text, input_image, input_video, input_audio): sample = dict() sample[text_key]=input_text sample[image_key]= [input_image] if input_image else [] sample[video_key]=[input_video] if input_video else [] sample[audio_key]=[input_audio] if input_audio else [] return sample def decode_sample(output_sample): output_text = output_sample[text_key] output_image = output_sample[image_key][0] if output_sample[image_key] else None output_video = output_sample[video_key][0] if output_sample[video_key] else None output_audio = output_sample[audio_key][0] if output_sample[audio_key] else None image_file = copy_func(output_image) video_file = copy_func(output_video) audio_file = copy_func(output_audio) return output_text, image_file, video_file, audio_file def create_tab_layout(op_tab, op_type, run_op, has_stats=False): with op_tab: options = get_op_lists(op_type) label = f'Select a {op_type} to show details' with gr.Row(): op_selector = gr.Dropdown(value=options[0], label=label, choices=options, interactive=True) with gr.Column(): gr.Markdown(" **Op Parameters**") op_params = gr.Code(label="Yaml",language='yaml', interactive=True) run_button = gr.Button(value="🚀Run") show_code_button = gr.Button(value="🔍Show Code") with gr.Column(): with gr.Group('Inputs'): gr.Markdown(" **Inputs**") with gr.Row(): input_text = gr.TextArea(label="Text",interactive=True,) input_image = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image") input_video = gr.Video(label='Video', visible=multimodal_visible) input_audio = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible) with gr.Group('Outputs'): gr.Markdown(" **Outputs**") with gr.Row(): output_text = gr.TextArea(label="Text",interactive=False,) output_image = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image") output_video = gr.Video(label='Video', visible=multimodal_visible) output_audio = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible) with gr.Row(): if has_stats: output_stats = gr.Json(label='Stats') output_keep = gr.Text(label='Keep or not?', interactive=False) code = gr.Code(label='Source', language='python') inputs = [input_text, input_image, input_video, input_audio, op_selector, op_params] outputs = [output_text, output_image, output_video, output_audio] if has_stats: outputs.append(output_stats) outputs.append(output_keep) def run_func(*args): try: try: args = list(args) op_params = args.pop() params = yaml.safe_load(op_params) except: params = {} if params is None: params = {} return run_op(*args, params) except Exception as e: gr.Error(str(e)) print(e) return outputs show_code_button.click(show_code, inputs=[op_selector], outputs=[code, op_params]) show_code_button.click(change_visible, inputs=[op_selector], outputs=outputs[:4] + inputs[:4]) run_button.click(run_func, inputs=inputs, outputs=outputs) run_button.click(change_visible, inputs=[op_selector], outputs=outputs[:4] + inputs[:4]) op_selector.select(show_code, inputs=[op_selector], outputs=[code, op_params]) op_selector.select(change_visible, inputs=[op_selector], outputs=outputs[:4] + inputs[:4]) op_tab.select(change_visible, inputs=[op_selector], outputs=outputs[:4] + inputs[:4]) def create_mapper_tab(op_type, op_tab): with op_tab: def run_op(input_text, input_image, input_video, input_audio, op_name, op_params): op_class = OPERATORS.modules[op_name] op = op_class(**op_params) sample = encode_sample(input_text, input_image, input_video, input_audio) output_sample = op.process(copy.deepcopy(sample)) return decode_sample(output_sample) create_tab_layout(op_tab, op_type, run_op) def create_filter_tab(op_type, op_tab): def run_op(input_text, input_image, input_video, input_audio, op_name, op_params): op_class = OPERATORS.modules[op_name] op = op_class(**op_params) sample = encode_sample(input_text, input_image, input_video, input_audio) sample[Fields.stats] = dict() output_sample = op.compute_stats(copy.deepcopy(sample)) if op.process(output_sample): output_keep = 'Yes' else: output_keep = 'No' output_stats = output_sample[Fields.stats] return *decode_sample(output_sample), output_stats, output_keep create_tab_layout(op_tab, op_type, run_op, has_stats=True) def create_deduplicator_tab(op_type, op_tab): with op_tab: def run_op( input_text, input_image, input_video, input_audio, op_name, op_params): op_class = OPERATORS.modules[op_name] op = op_class(**op_params) sample = encode_sample(input_text, input_image, input_video, input_audio) output_sample = sample #op.compute_hash(copy.deepcopy(sample)) return decode_sample(output_sample) create_tab_layout(op_tab, op_type, run_op, has_stats=True) def create_tab_double_layout(op_tab, op_type, run_op): with op_tab: options = get_op_lists(op_type) label = f'Select a {op_type} to show details' with gr.Row(): op_selector = gr.Dropdown(value=options[0], label=label, choices=options, interactive=True) with gr.Column(): gr.Markdown(" **Op Parameters**") op_params = gr.Code(label="Yaml",language='yaml', interactive=True) run_button = gr.Button(value="🚀Run") show_code_button = gr.Button(value="🔍Show Code") with gr.Column(): with gr.Group('Inputs'): gr.Markdown(" **Inputs**") with gr.Row(): input_text = gr.TextArea(label="Text",interactive=True,) input_text2 = gr.TextArea(label="Text",interactive=True,) input_image = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image") input_image2 = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image") input_video = gr.Video(label='Video', visible=multimodal_visible) input_video2 = gr.Video(label='Video', visible=multimodal_visible) input_audio = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible) input_audio2 = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible) with gr.Group('Outputs'): gr.Markdown(" **Outputs**") with gr.Row(): output_text = gr.TextArea(label="Text",interactive=False,) output_text2 = gr.TextArea(label="Text",interactive=False,) output_image = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image") output_image2 = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image") output_video = gr.Video(label='Video', visible=multimodal_visible) output_video2 = gr.Video(label='Video', visible=multimodal_visible) output_audio = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible) output_audio2 = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible) code = gr.Code(label='Source', language='python') inputs = [input_text, input_image, input_video, input_audio, input_text2, input_image2, input_video2, input_audio2, op_selector, op_params] outputs = [output_text, output_image, output_video, output_audio, output_text2, output_image2, output_video2, output_audio2] def run_func(*args): try: try: op_params = args[-1] params = yaml.safe_load(op_params) except: params = {} if params is None: params = {} return run_op(input_text, input_image, input_video, input_audio, op_selector, params) except Exception as e: gr.Error(str(e)) return outputs # show_code_button.click(show_code, inputs=[op_selector], outputs=[code, op_params]) # show_code_button.click(change_visible, inputs=[op_selector], outputs=outputs[:4] + inputs[:4]) # run_button.click(run_func, inputs=inputs, outputs=outputs) # op_selector.select(show_code, inputs=[op_selector], outputs=[code, op_params]) # op_selector.select(change_visible, inputs=[op_selector], outputs=outputs[:4] + inputs[:4]) show_code_button.click(change_visible, inputs=[op_selector], outputs=outputs[:4] + inputs[:4]).then(show_code, inputs=[op_selector], outputs=[code, op_params]) run_button.click(change_visible, inputs=[op_selector], outputs=outputs[:4] + inputs[:4]).then(run_func, inputs=[op_selector], outputs=[code, op_params]) op_selector.select(change_visible, inputs=[op_selector], outputs=outputs[:4] + inputs[:4]).then(show_code, inputs=[op_selector], outputs=[code, op_params]) op_tab.select(change_visible, inputs=[op_selector], outputs=outputs[:4] + inputs[:4]) with gr.Blocks(css="./app.css") as demo: dj_image = os.path.join(project_path, 'docs/imgs/data-juicer.jpg') gr.HTML(format_cover_html(dj_image)) with gr.Accordion(label='Op Insight',open=True): tabs = gr.Tabs() with tabs: op_tabs = {op_type: gr.Tab(label=op_type.capitalize() + 's') for op_type in op_types} for op_type, op_tab in op_tabs.items(): create_op_tab_func = globals().get(f'create_{op_type}_tab', None) if callable(create_op_tab_func): create_op_tab_func(op_type, op_tab) else: gr.Error(f'{op_type} not callable') demo.launch()