|
import os |
|
import inspect |
|
import base64 |
|
import yaml |
|
import copy |
|
import shutil |
|
import gradio as gr |
|
from data_juicer.ops.base_op import OPERATORS |
|
from data_juicer.utils.constant import Fields |
|
# Absolute directory that contains this demo script.
demo_path = os.path.split(os.path.abspath(__file__))[0]

# Directory two levels above the demo directory; docs are looked up under it.
project_path = os.path.split(os.path.split(demo_path)[0])[0]
|
|
|
|
|
|
|
def covert_image_to_base64(image_path):
    """Return the image at ``image_path`` as a ``data:image/...;base64`` URL.

    The MIME subtype is taken from the file extension (compared
    case-insensitively). Anything other than ``gif``, ``jpeg`` or ``png``
    (including ``jpg`` and paths without an extension) is labelled ``jpeg``.

    Args:
        image_path: path of the image file to read.

    Returns:
        A string of the form ``data:image/<ext>;base64,<payload>``.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    # Lower-case the extension so that e.g. "PIC.PNG" is not mislabelled.
    ext = image_path.rsplit(".", 1)[-1].lower()
    if ext not in ("gif", "jpeg", "png"):
        ext = "jpeg"

    with open(image_path, "rb") as image_file:
        base64_data = base64.b64encode(image_file.read()).decode("utf-8")

    return f"data:image/{ext};base64,{base64_data}"
|
|
|
|
|
def format_cover_html(project_img_path):
    """Build the HTML header shown at the top of the demo page.

    Args:
        project_img_path: path of the project image. It is accepted for
            interface compatibility but is not used by the markup below.

    Returns:
        An HTML snippet with the project name, description, introduction,
        demo hint and a note linking to the GitHub repository.
    """
    readme_link = 'https://github.com/alibaba/data-juicer'
    config = {
        'name': "Data-Juicer",
        'label': "Op Insight",
        'description': 'A One-Stop Data Processing System for Large Language Models.',
        'introduction':
            "This project is being actively updated and maintained, and we will periodically enhance and add more features and data recipes. <br>"
            "We welcome you to join us in promoting LLM data development and research!<br>",
        'demo': "You can experience the effect of the operators of Data-Juicer",
        # This must be an f-string, otherwise the href is the literal
        # text "{readme_link}" instead of the repository URL.
        'note': f'Note: Due to resource limitations, only a subset of operators is available here. see more details in <a href="{readme_link}">GitHub</a>',
    }

    return f"""
<div>
    <div class="project_name">{config.get('name', '')} </div>
    <div class="project_desc">{config.get('description', '')}</div>
    <div class="project_desc">{config.get('introduction', '')}</div>
    <div class="project_desc">{config.get('demo', '')}</div>
    <div class="project_desc">{config.get('note', '')}</div>
</div>
"""
|
# Raw markdown of the operator documentation; stays empty when the docs file
# is not present next to the project.
op_text = ''
docs_file = os.path.join(project_path, 'docs/Operators.md')
if os.path.exists(docs_file):
    # Read with an explicit encoding so the result does not depend on the
    # platform's default locale.
    with open(docs_file, 'r', encoding='utf-8') as f:
        op_text = f.read()
|
|
|
def extract_op_desc(markdown_text, header):
    """Return the text of the markdown section that follows ``header``.

    The section runs from the end of the first occurrence of ``header`` up to
    the next line starting with ``##``, or to the end of the text when no
    further heading exists.

    Args:
        markdown_text: full markdown document.
        header: exact heading text to look for.

    Returns:
        The stripped section body, or an empty string when ``header`` does
        not occur in ``markdown_text``.
    """
    start_index = markdown_text.find(header)
    if start_index == -1:
        # Without this guard the -1 would be used as a real offset and an
        # unrelated slice of the document would be returned.
        return ''

    body_start = start_index + len(header)
    end_index = markdown_text.find("\n##", body_start)
    if end_index == -1:
        # Last section of the document: keep everything up to the end
        # rather than slicing with -1, which drops the final character.
        end_index = len(markdown_text)

    return markdown_text[body_start:end_index].strip()
|
|
|
# Centered HTML overview: the "## Overview" section of the operator docs, cut
# off before the sentence starting with "All the specific ".
op_desc = f"<div style='text-align: center;'>{extract_op_desc(op_text, '## Overview').split('All the specific ')[0].strip()}</div>"

# Per-type description taken from the matching section of the operator docs.
op_list_desc = {
    'mapper': extract_op_desc(op_text, '## Mapper <a name="mapper"/>'),
    'filter': extract_op_desc(op_text, '## Filter <a name="filter"/>'),
    'deduplicator': extract_op_desc(op_text, '## Deduplicator <a name="deduplicator"/>'),
    'selector': extract_op_desc(op_text, '## Selector <a name="selector"/>'),
}

# Operator types that get a tab in the demo.
op_types = ['mapper', 'filter']
# Operator names offered per type when USE_LOCAL_OP is enabled.
local_ops_dict = {op_type: [] for op_type in op_types}

# Environment variables are strings, so "0" or "false" would be truthy if
# used directly; parse the flag explicitly. Unset means enabled; an empty
# value still means disabled.
multimodal = os.getenv('MULTI_MODAL', 'true').strip().lower() not in ('', '0', 'false', 'no', 'off')
# Initial visibility of the image/video/audio components.
multimodal_visible = False

# Keys used in the sample dictionaries exchanged with the operators.
text_key = 'text'
image_key = 'images'
audio_key = 'audios'
video_key = 'videos'
|
|
|
def get_op_lists(op_type):
    """Return the operator names offered for ``op_type``.

    By default every registered operator whose name ends with ``op_type`` is
    listed. When the ``USE_LOCAL_OP`` environment variable is enabled, the
    names configured in ``local_ops_dict`` are used instead. If ``multimodal``
    is falsy, names containing ``image``, ``video`` or ``audio`` are removed.

    Args:
        op_type: operator type suffix, e.g. ``'mapper'`` or ``'filter'``.

    Returns:
        A new list of operator names.
    """
    # os.getenv returns a string, so values such as "0" or "false" must be
    # parsed explicitly; they would otherwise count as enabled.
    use_local_op = os.getenv('USE_LOCAL_OP', '').strip().lower() not in ('', '0', 'false', 'no', 'off')

    if use_local_op:
        options = list(local_ops_dict.get(op_type, []))
    else:
        options = [name for name in OPERATORS.modules.keys() if name.endswith(op_type)]

    if not multimodal:
        modalities = ('image', 'video', 'audio')
        options = [
            name for name in options
            if not any(modality in name for modality in modalities)
        ]
    return options
|
|
|
def show_code(op_name):
    """Return the source code and default parameters of an operator.

    Args:
        op_name: key of the operator class in ``OPERATORS.modules``.

    Returns:
        A ``(source, yaml_text)`` tuple: the source of the operator class and
        a YAML dump of every ``__init__`` parameter that declares a default
        (``self``, ``args`` and ``kwargs`` are skipped).
    """
    op_class = OPERATORS.modules[op_name]
    source = inspect.getsource(op_class)

    ignored = ('self', 'args', 'kwargs')
    signature = inspect.signature(op_class.__init__)
    defaults = {
        param_name: param.default
        for param_name, param in signature.parameters.items()
        if param_name not in ignored
        and param.default is not inspect.Parameter.empty
    }

    return source, yaml.dump(defaults)
|
|
|
def change_visible(op_name):
    """Compute visibility updates for the media components of a tab.

    Text is always visible. At most one of video, audio and image is made
    visible, chosen by the first of those words (in that order) found in
    ``op_name``.

    Args:
        op_name: name of the selected operator.

    Returns:
        A tuple of eight ``gr.update`` values ordered as
        (text, image, video, audio) twice.
    """
    show_text = True
    show_video = 'video' in op_name
    show_audio = (not show_video) and 'audio' in op_name
    show_image = not (show_video or show_audio) and 'image' in op_name

    flags = (show_text, show_image, show_video, show_audio)
    # The same four flags are applied to two groups of components.
    return tuple(gr.update(visible=flag) for flag in flags + flags)
|
|
|
def copy_func(file):
    """Copy ``file`` into the current working directory.

    Args:
        file: path of the file to copy; may be ``None`` or empty.

    Returns:
        The base name of the copied file (its path relative to the working
        directory), or ``None`` when ``file`` is falsy.

    Raises:
        OSError: if the file cannot be read or the copy cannot be written.
    """
    if not file:
        return None

    filename = os.path.basename(file)
    try:
        shutil.copyfile(file, filename)
    except shutil.SameFileError:
        # The file already lives in the working directory; nothing to copy.
        pass
    return filename
|
|
|
def encode_sample(input_text, input_image, input_video, input_audio):
    """Pack the UI inputs into a sample dictionary.

    Args:
        input_text: text value, stored as-is under ``text_key``.
        input_image: image value; stored as a one-element list, or an empty
            list when falsy.
        input_video: video value; wrapped like ``input_image``.
        input_audio: audio value; wrapped like ``input_image``.

    Returns:
        A dict keyed by ``text_key``, ``image_key``, ``video_key`` and
        ``audio_key``.
    """
    def wrap(item):
        return [item] if item else []

    return {
        text_key: input_text,
        image_key: wrap(input_image),
        video_key: wrap(input_video),
        audio_key: wrap(input_audio),
    }
|
|
|
def decode_sample(output_sample):
    """Unpack a sample dictionary into values for the output components.

    The first image, video and audio entry (if any) is copied with
    ``copy_func`` and the resulting file name is returned in its place.

    Args:
        output_sample: dict with ``text_key``, ``image_key``, ``video_key``
            and ``audio_key`` entries.

    Returns:
        A ``(text, image_file, video_file, audio_file)`` tuple; a media slot
        is ``None`` when its list is empty.
    """
    def first_or_none(key):
        entries = output_sample[key]
        return entries[0] if entries else None

    output_text = output_sample[text_key]
    # Read all three media entries before any file is copied.
    media_paths = [first_or_none(key) for key in (image_key, video_key, audio_key)]
    image_file, video_file, audio_file = [copy_func(path) for path in media_paths]
    return output_text, image_file, video_file, audio_file
|
|
|
def create_tab_layout(op_tab, op_type, run_op, has_stats=False):
    """Build the single-sample operator UI inside ``op_tab`` and wire events.

    The layout has an operator dropdown, an editable YAML parameter box, Run
    and Show Code buttons, input and output components for text, image, video
    and audio, and a source code viewer.

    NOTE(review): the nesting of rows/columns below was reconstructed from a
    source whose indentation was lost; confirm it against the intended
    layout.

    Args:
        op_tab: Gradio tab that hosts the layout; it is entered here.
        op_type: operator type passed to ``get_op_lists``.
        run_op: callable invoked as
            ``run_op(text, image, video, audio, op_name, params)`` where
            ``params`` is the parsed YAML dict. Its return values must match
            the output components (four values, plus stats and keep flag
            when ``has_stats`` is true).
        has_stats: when true, add a Stats JSON box and a "Keep or not?" text
            box to the outputs.
    """
    with op_tab:
        options = get_op_lists(op_type)
        label = f'Select a {op_type} to show details'
        with gr.Row():
            # Guard against an empty operator list instead of failing with
            # an IndexError while the page is being built.
            op_selector = gr.Dropdown(value=options[0] if options else None, label=label, choices=options, interactive=True)
            with gr.Column():
                gr.Markdown(" **Op Parameters**")
                op_params = gr.Code(label="Yaml", language='yaml', interactive=True)
                run_button = gr.Button(value="🚀Run")
                show_code_button = gr.Button(value="🔍Show Code")

            with gr.Column():
                with gr.Group('Inputs'):
                    gr.Markdown(" **Inputs**")
                    with gr.Row():
                        input_text = gr.TextArea(label="Text", interactive=True,)
                        input_image = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image")
                        input_video = gr.Video(label='Video', visible=multimodal_visible)
                        input_audio = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible)

                with gr.Group('Outputs'):
                    gr.Markdown(" **Outputs**")
                    with gr.Row():
                        output_text = gr.TextArea(label="Text", interactive=False,)
                        output_image = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image")
                        output_video = gr.Video(label='Video', visible=multimodal_visible)
                        output_audio = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible)

                    with gr.Row():
                        if has_stats:
                            output_stats = gr.Json(label='Stats')
                            output_keep = gr.Text(label='Keep or not?', interactive=False)

        code = gr.Code(label='Source', language='python')
        inputs = [input_text, input_image, input_video, input_audio, op_selector, op_params]
        outputs = [output_text, output_image, output_video, output_audio]
        if has_stats:
            outputs.append(output_stats)
            outputs.append(output_keep)

        def run_func(*args):
            # The YAML parameter text is the last input component.
            *values, params_text = args
            try:
                params = yaml.safe_load(params_text) if params_text else None
            except yaml.YAMLError as e:
                # Best effort: fall back to the operator defaults, but leave
                # a trace instead of hiding the problem completely.
                print(f'Invalid op parameters, using defaults: {e}')
                params = None
            if params is None:
                params = {}

            try:
                return run_op(*values, params)
            except Exception as e:
                print(e)
                # gr.Error only reaches the UI when it is raised; returning
                # the component list here would not be valid output data.
                raise gr.Error(str(e)) from e

        media_components = outputs[:4] + inputs[:4]
        show_code_button.click(show_code, inputs=[op_selector], outputs=[code, op_params])
        show_code_button.click(change_visible, inputs=[op_selector], outputs=media_components)
        run_button.click(run_func, inputs=inputs, outputs=outputs)
        run_button.click(change_visible, inputs=[op_selector], outputs=media_components)
        op_selector.select(show_code, inputs=[op_selector], outputs=[code, op_params])
        op_selector.select(change_visible, inputs=[op_selector], outputs=media_components)
        op_tab.select(change_visible, inputs=[op_selector], outputs=media_components)
|
|
|
def create_mapper_tab(op_type, op_tab):
    """Populate ``op_tab`` with the UI for mapper operators.

    Args:
        op_type: operator type forwarded to ``create_tab_layout``.
        op_tab: Gradio tab that hosts the layout.
    """
    def run_op(input_text, input_image, input_video, input_audio, op_name, op_params):
        """Apply the selected mapper to one sample and return its outputs."""
        op_class = OPERATORS.modules[op_name]
        op = op_class(**op_params)
        sample = encode_sample(input_text, input_image, input_video, input_audio)
        # The operator works on a copy, so ``sample`` itself stays untouched.
        output_sample = op.process(copy.deepcopy(sample))
        return decode_sample(output_sample)

    # create_tab_layout enters ``op_tab`` itself, so the tab context is not
    # opened a second time here (same as create_filter_tab).
    create_tab_layout(op_tab, op_type, run_op)
|
|
|
|
|
def create_filter_tab(op_type, op_tab):
    """Populate ``op_tab`` with the UI for filter operators.

    Args:
        op_type: operator type forwarded to ``create_tab_layout``.
        op_tab: Gradio tab that hosts the layout.
    """
    def run_op(input_text, input_image, input_video, input_audio, op_name, op_params):
        """Compute stats for one sample and report whether it is kept."""
        filter_op = OPERATORS.modules[op_name](**op_params)

        sample = encode_sample(input_text, input_image, input_video, input_audio)
        sample[Fields.stats] = dict()

        # Stats are computed on a copy of the encoded sample.
        scored = filter_op.compute_stats(copy.deepcopy(sample))
        output_keep = 'Yes' if filter_op.process(scored) else 'No'
        output_stats = scored[Fields.stats]

        text, image, video, audio = decode_sample(scored)
        return text, image, video, audio, output_stats, output_keep

    create_tab_layout(op_tab, op_type, run_op, has_stats=True)
|
|
|
|
|
def create_deduplicator_tab(op_type, op_tab):
    """Populate ``op_tab`` with the UI for deduplicator operators.

    The run callback currently echoes the input sample back without applying
    the operator.

    Args:
        op_type: operator type forwarded to ``create_tab_layout``.
        op_tab: Gradio tab that hosts the layout.
    """
    def run_op(input_text, input_image, input_video, input_audio, op_name, op_params):
        """Instantiate the selected operator and return the sample unchanged."""
        op_class = OPERATORS.modules[op_name]
        # The operator is constructed but not applied; presumably this is
        # kept so that invalid parameters still raise - TODO confirm.
        op_class(**op_params)
        sample = encode_sample(input_text, input_image, input_video, input_audio)
        return decode_sample(sample)

    # run_op returns four values (text, image, video, audio), so the layout
    # must not add the Stats / Keep outputs; with has_stats=True the number
    # of returned values would not match the number of output components.
    # create_tab_layout enters ``op_tab`` itself.
    create_tab_layout(op_tab, op_type, run_op)
|
|
|
def create_tab_double_layout(op_tab, op_type, run_op):
    """Build a two-sample operator UI inside ``op_tab`` and wire events.

    Same structure as the single-sample layout, but every input and output
    component (text, image, video, audio) exists twice.

    NOTE(review): the nesting of rows/columns below was reconstructed from a
    source whose indentation was lost; confirm it against the intended
    layout.

    Args:
        op_tab: Gradio tab that hosts the layout; it is entered here.
        op_type: operator type passed to ``get_op_lists``.
        run_op: callable invoked with the eight submitted input values in
            the order (text, image, video, audio, text2, image2, video2,
            audio2), followed by the operator name and the parsed YAML
            parameter dict. It must return eight values matching the output
            components in the same order.
    """
    with op_tab:
        options = get_op_lists(op_type)
        label = f'Select a {op_type} to show details'
        with gr.Row():
            # Guard against an empty operator list instead of failing with
            # an IndexError while the page is being built.
            op_selector = gr.Dropdown(value=options[0] if options else None, label=label, choices=options, interactive=True)
            with gr.Column():
                gr.Markdown(" **Op Parameters**")
                op_params = gr.Code(label="Yaml", language='yaml', interactive=True)
                run_button = gr.Button(value="🚀Run")
                show_code_button = gr.Button(value="🔍Show Code")

            with gr.Column():
                with gr.Group('Inputs'):
                    gr.Markdown(" **Inputs**")
                    with gr.Row():
                        input_text = gr.TextArea(label="Text", interactive=True,)
                        input_text2 = gr.TextArea(label="Text", interactive=True,)
                        input_image = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image")
                        input_image2 = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image")
                        input_video = gr.Video(label='Video', visible=multimodal_visible)
                        input_video2 = gr.Video(label='Video', visible=multimodal_visible)
                        input_audio = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible)
                        input_audio2 = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible)

                with gr.Group('Outputs'):
                    gr.Markdown(" **Outputs**")
                    with gr.Row():
                        output_text = gr.TextArea(label="Text", interactive=False,)
                        output_text2 = gr.TextArea(label="Text", interactive=False,)
                        output_image = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image")
                        output_image2 = gr.Image(label='Image', type='filepath', visible=multimodal_visible, elem_classes="show_image")
                        output_video = gr.Video(label='Video', visible=multimodal_visible)
                        output_video2 = gr.Video(label='Video', visible=multimodal_visible)
                        output_audio = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible)
                        output_audio2 = gr.Audio(label='Audio', type='filepath', visible=multimodal_visible)

        code = gr.Code(label='Source', language='python')
        inputs = [input_text, input_image, input_video, input_audio, input_text2, input_image2, input_video2, input_audio2, op_selector, op_params]
        outputs = [output_text, output_image, output_video, output_audio, output_text2, output_image2, output_video2, output_audio2]

        # All sixteen media components: both output samples, then both input
        # samples, each ordered (text, image, video, audio).
        media_components = outputs + inputs[:8]

        def change_visible_double(op_name):
            # change_visible yields eight updates ordered (text, image,
            # video, audio) twice; two fresh sets cover all sixteen
            # components, so the second sample is toggled as well.
            return change_visible(op_name) + change_visible(op_name)

        def run_func(*args):
            # The YAML parameter text is the last input component.
            *values, params_text = args
            try:
                params = yaml.safe_load(params_text) if params_text else None
            except yaml.YAMLError as e:
                # Best effort: fall back to the operator defaults, but leave
                # a trace instead of hiding the problem completely.
                print(f'Invalid op parameters, using defaults: {e}')
                params = None
            if params is None:
                params = {}

            try:
                # Pass the submitted values, not the component objects.
                return run_op(*values, params)
            except Exception as e:
                # gr.Error only reaches the UI when it is raised.
                raise gr.Error(str(e)) from e

        show_code_button.click(change_visible_double, inputs=[op_selector], outputs=media_components).then(show_code, inputs=[op_selector], outputs=[code, op_params])
        # The run handler needs every input value and writes to the sample
        # outputs, not to the source viewer / parameter box.
        run_button.click(change_visible_double, inputs=[op_selector], outputs=media_components).then(run_func, inputs=inputs, outputs=outputs)
        op_selector.select(change_visible_double, inputs=[op_selector], outputs=media_components).then(show_code, inputs=[op_selector], outputs=[code, op_params])
        op_tab.select(change_visible_double, inputs=[op_selector], outputs=media_components)
|
# Assemble the demo page: cover header, then one tab per operator type.
with gr.Blocks(css="./app.css") as demo:

    dj_image = os.path.join(project_path, 'docs/imgs/data-juicer.jpg')
    gr.HTML(format_cover_html(dj_image))

    with gr.Accordion(label='Op Insight', open=True):
        tabs = gr.Tabs()
        with tabs:
            # Create every tab first, then fill each one through the
            # module-level builder named ``create_<op_type>_tab``.
            op_tabs = {op_type: gr.Tab(label=op_type.capitalize() + 's') for op_type in op_types}
            for op_type, op_tab in op_tabs.items():
                create_op_tab_func = globals().get(f'create_{op_type}_tab', None)
                if callable(create_op_tab_func):
                    create_op_tab_func(op_type, op_tab)
                else:
                    # A gr.Error that is only constructed has no effect, so
                    # report the missing builder on the console; the tab is
                    # left empty and the app still starts.
                    print(f'{op_type} not callable')

demo.launch()
|
|