|
import os |
|
|
|
os.environ["TOKENIZERS_PARALLELISM"] = "false" |
|
|
|
from PIL import Image, ImageDraw |
|
import traceback |
|
|
|
import gradio as gr |
|
|
|
import torch |
|
from docquery.pipeline import get_pipeline |
|
from docquery.document import load_bytes, load_document, ImageDocument |
|
|
|
|
|
def ensure_list(x): |
|
if isinstance(x, list): |
|
return x |
|
else: |
|
return [x] |
|
|
|
|
|
CHECKPOINTS = { |
|
"LayoutLMv1 🦉": "impira/layoutlm-document-qa", |
|
"Donut 🍩": "naver-clova-ix/donut-base-finetuned-docvqa", |
|
} |
|
|
|
PIPELINES = {} |
|
|
|
|
|
def construct_pipeline(model): |
|
global PIPELINES |
|
if model in PIPELINES: |
|
return PIPELINES[model] |
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
ret = get_pipeline(checkpoint=CHECKPOINTS[model], device=device) |
|
PIPELINES[model] = ret |
|
return ret |
|
|
|
|
|
def run_pipeline(model, question, document, top_k): |
|
pipeline = construct_pipeline(model) |
|
return pipeline(question=question, **document.context, top_k=top_k) |
|
|
|
|
|
|
|
|
|
def lift_word_boxes(document, page): |
|
return document.context["image"][page][1] |
|
|
|
|
|
def expand_bbox(word_boxes): |
|
if len(word_boxes) == 0: |
|
return None |
|
|
|
min_x, min_y, max_x, max_y = zip(*[x[1] for x in word_boxes]) |
|
min_x, min_y, max_x, max_y = [min(min_x), min(min_y), max(max_x), max(max_y)] |
|
return [min_x, min_y, max_x, max_y] |
|
|
|
|
|
|
|
def normalize_bbox(box, width, height, padding=0.005): |
|
min_x, min_y, max_x, max_y = [c / 1000 for c in box] |
|
if padding != 0: |
|
min_x = max(0, min_x - padding) |
|
min_y = max(0, min_y - padding) |
|
max_x = min(max_x + padding, 1) |
|
max_y = min(max_y + padding, 1) |
|
return [min_x * width, min_y * height, max_x * width, max_y * height] |
|
|
|
|
|
examples = [ |
|
[ |
|
"invoice.png", |
|
"What is the invoice number?", |
|
], |
|
[ |
|
"contract.jpeg", |
|
"What is the purchase amount?", |
|
], |
|
[ |
|
"statement.png", |
|
"What are net sales for 2020?", |
|
], |
|
] |
|
|
|
|
|
def process_path(path): |
|
if path: |
|
try: |
|
document = load_document(path) |
|
return ( |
|
document, |
|
gr.update(visible=True, value=document.preview), |
|
gr.update(visible=True), |
|
gr.update(visible=False, value=None), |
|
gr.update(visible=False, value=None), |
|
) |
|
except Exception: |
|
traceback.print_exc() |
|
return ( |
|
None, |
|
gr.update(visible=False, value=None), |
|
gr.update(visible=False), |
|
gr.update(visible=False, value=None), |
|
gr.update(visible=False, value=None), |
|
) |
|
|
|
|
|
def process_upload(file): |
|
if file: |
|
return process_path(file.name) |
|
else: |
|
return ( |
|
None, |
|
gr.update(visible=False, value=None), |
|
gr.update(visible=False), |
|
gr.update(visible=False, value=None), |
|
gr.update(visible=False, value=None), |
|
) |
|
|
|
|
|
colors = ["#64A087", "green", "black"] |
|
|
|
|
|
def process_question(question, document, model=list(CHECKPOINTS.keys())[0]): |
|
if document is None: |
|
return None, None, None |
|
|
|
text_value = None |
|
predictions = run_pipeline(model, question, document, 3) |
|
pages = [x.copy().convert("RGB") for x in document.preview] |
|
for i, p in enumerate(ensure_list(predictions)): |
|
if i == 0: |
|
text_value = p["answer"] |
|
else: |
|
|
|
|
|
break |
|
|
|
if "start" in p and "end" in p: |
|
image = pages[p["page"]] |
|
draw = ImageDraw.Draw(image, "RGBA") |
|
x1, y1, x2, y2 = normalize_bbox( |
|
expand_bbox( |
|
lift_word_boxes(document, p["page"])[p["start"] : p["end"] + 1] |
|
), |
|
image.width, |
|
image.height, |
|
) |
|
draw.rectangle(((x1, y1), (x2, y2)), fill=(0, 255, 0, int(0.4 * 255))) |
|
|
|
return ( |
|
gr.update(visible=True, value=pages), |
|
gr.update(visible=True, value=predictions), |
|
gr.update( |
|
visible=True, |
|
value=text_value, |
|
), |
|
) |
|
|
|
|
|
def load_example_document(img, question, model): |
|
if img is not None: |
|
document = ImageDocument(Image.fromarray(img)) |
|
preview, answer, answer_text = process_question(question, document, model) |
|
return document, question, preview, gr.update(visible=True), answer, answer_text |
|
else: |
|
return None, None, None, gr.update(visible=False), None |
|
|
|
|
|
CSS = """ |
|
#question input { |
|
font-size: 16px; |
|
} |
|
#url-textbox { |
|
padding: 0 !important; |
|
} |
|
#short-upload-box .w-full { |
|
min-height: 10rem !important; |
|
} |
|
/* I think something like this can be used to re-shape |
|
* the table |
|
*/ |
|
/* |
|
.gr-samples-table tr { |
|
display: inline; |
|
} |
|
.gr-samples-table .p-2 { |
|
width: 100px; |
|
} |
|
*/ |
|
#select-a-file { |
|
width: 100%; |
|
} |
|
#file-clear { |
|
padding-top: 2px !important; |
|
padding-bottom: 2px !important; |
|
padding-left: 8px !important; |
|
padding-right: 8px !important; |
|
margin-top: 10px; |
|
} |
|
.gradio-container .gr-button-primary { |
|
background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%); |
|
border: 1px solid #B0DCCC; |
|
border-radius: 8px; |
|
color: #1B8700; |
|
} |
|
.gradio-container.dark button#submit-button { |
|
background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%); |
|
border: 1px solid #B0DCCC; |
|
border-radius: 8px; |
|
color: #1B8700 |
|
} |
|
|
|
table.gr-samples-table tr td { |
|
border: none; |
|
outline: none; |
|
} |
|
|
|
table.gr-samples-table tr td:first-of-type { |
|
width: 0%; |
|
} |
|
|
|
div#short-upload-box div.absolute { |
|
display: none !important; |
|
} |
|
|
|
gradio-app > div > div > div > div.w-full > div, .gradio-app > div > div > div > div.w-full > div { |
|
gap: 0px 2%; |
|
} |
|
|
|
gradio-app div div div div.w-full, .gradio-app div div div div.w-full { |
|
gap: 0px; |
|
} |
|
|
|
gradio-app h2, .gradio-app h2 { |
|
padding-top: 10px; |
|
} |
|
|
|
#answer { |
|
overflow-y: scroll; |
|
color: white; |
|
background: #666; |
|
border-color: #666; |
|
font-size: 20px; |
|
font-weight: bold; |
|
} |
|
|
|
#answer span { |
|
color: white; |
|
} |
|
|
|
#answer textarea { |
|
color:white; |
|
background: #777; |
|
border-color: #777; |
|
font-size: 18px; |
|
} |
|
""" |
|
|
|
with gr.Blocks(css=CSS) as demo: |
|
gr.Markdown("# DocQuery: Document Query Engine") |
|
gr.Markdown( |
|
"DocQuery (created by [Impira](https://impira.com)) uses LayoutLMv1 fine-tuned on DocVQA, a document visual question" |
|
" answering dataset, as well as SQuAD, which boosts its English-language comprehension." |
|
" To use it, simply upload an image or PDF, type a question, and click 'submit', or " |
|
" click one of the examples to load them." |
|
" DocQuery is MIT-licensed and available on [Github](https://github.com/impira/docquery)." |
|
) |
|
|
|
document = gr.Variable() |
|
example_question = gr.Textbox(visible=False) |
|
example_image = gr.Image(visible=False) |
|
|
|
with gr.Row(equal_height=True): |
|
with gr.Column(): |
|
with gr.Row(): |
|
gr.Markdown("## 1. Select a file", elem_id="select-a-file") |
|
img_clear_button = gr.Button( |
|
"Clear", variant="secondary", elem_id="file-clear", visible=False |
|
) |
|
image = gr.Gallery(visible=False) |
|
with gr.Row(equal_height=True): |
|
url = gr.Textbox( |
|
show_label=False, |
|
placeholder="URL", |
|
lines=1, |
|
max_lines=1, |
|
elem_id="url-textbox", |
|
) |
|
submit = gr.Button("Get") |
|
gr.Markdown("— or —") |
|
upload = gr.File( |
|
label=None, interactive=True, elem_id="short-upload-box" |
|
) |
|
gr.Examples( |
|
examples=examples, |
|
inputs=[example_image, example_question], |
|
) |
|
|
|
with gr.Column() as col: |
|
gr.Markdown("## 2. Ask a question") |
|
question = gr.Textbox( |
|
label="Question", |
|
placeholder="e.g. What is the invoice number?", |
|
lines=1, |
|
max_lines=1, |
|
) |
|
model = gr.Radio( |
|
choices=list(CHECKPOINTS.keys()), |
|
value=list(CHECKPOINTS.keys())[0], |
|
label="Model", |
|
) |
|
|
|
with gr.Row(): |
|
clear_button = gr.Button("Clear", variant="secondary") |
|
submit_button = gr.Button( |
|
"Submit", variant="primary", elem_id="submit-button" |
|
) |
|
with gr.Column(): |
|
output_text = gr.Textbox(label="Top Answer", visible=False, elem_id="answer") |
|
output = gr.JSON(label="Output", visible=False) |
|
|
|
img_clear_button.click( |
|
lambda _: ( |
|
gr.update(visible=False, value=None), |
|
None, |
|
None, |
|
gr.update(visible=False, value=None), |
|
gr.update(visible=False, value=None), |
|
gr.update(visible=False), |
|
None, |
|
None, |
|
None, |
|
None, |
|
), |
|
inputs=img_clear_button, |
|
outputs=[ |
|
image, |
|
document, |
|
output, |
|
output_text, |
|
img_clear_button, |
|
example_image, |
|
upload, |
|
url, |
|
question, |
|
], |
|
) |
|
clear_button.click( |
|
lambda _: ( |
|
gr.update(visible=False, value=None), |
|
None, |
|
None, |
|
gr.update(visible=False, value=None), |
|
gr.update(visible=False, value=None), |
|
None, |
|
None, |
|
None, |
|
None, |
|
), |
|
inputs=clear_button, |
|
outputs=[ |
|
image, |
|
document, |
|
question, |
|
output, |
|
output_text, |
|
example_image, |
|
upload, |
|
url, |
|
question, |
|
], |
|
) |
|
|
|
upload.change( |
|
fn=process_upload, |
|
inputs=[upload], |
|
outputs=[document, image, img_clear_button, output, output_text], |
|
) |
|
submit.click( |
|
fn=process_path, |
|
inputs=[url], |
|
outputs=[document, image, img_clear_button, output, output_text], |
|
) |
|
|
|
question.submit( |
|
fn=process_question, |
|
inputs=[question, document, model], |
|
outputs=[image, output, output_text], |
|
) |
|
|
|
submit_button.click( |
|
process_question, |
|
inputs=[question, document, model], |
|
outputs=[image, output, output_text], |
|
) |
|
|
|
model.change( |
|
process_question, |
|
inputs=[question, document, model], |
|
outputs=[image, output, output_text], |
|
) |
|
|
|
example_image.change( |
|
fn=load_example_document, |
|
inputs=[example_image, example_question, model], |
|
outputs=[document, question, image, img_clear_button, output, output_text], |
|
) |
|
|
|
if __name__ == "__main__": |
|
demo.launch(enable_queue=False) |
|
|