invoices / app.py
Ankur Goyal
Visual edits
3410d4d
raw
history blame
13.9 kB
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
from PIL import Image, ImageDraw
import traceback
import gradio as gr
from gradio import processing_utils
import torch
from docquery import pipeline
from docquery.document import load_bytes, load_document, ImageDocument
from docquery.ocr_reader import get_ocr_reader
def ensure_list(x):
if isinstance(x, list):
return x
else:
return [x]
CHECKPOINTS = {
"LayoutLMv1 for Invoices 🧾": "impira/layoutlm-invoices",
}
PIPELINES = {}
def construct_pipeline(task, model):
global PIPELINES
if model in PIPELINES:
return PIPELINES[model]
device = "cuda" if torch.cuda.is_available() else "cpu"
ret = pipeline(task=task, model=CHECKPOINTS[model], device=device)
PIPELINES[model] = ret
return ret
def run_pipeline(model, question, document, top_k):
pipeline = construct_pipeline("document-question-answering", model)
return pipeline(question=question, **document.context, top_k=top_k)
# TODO: Move into docquery
# TODO: Support words past the first page (or window?)
def lift_word_boxes(document, page):
return document.context["image"][page][1]
def expand_bbox(word_boxes):
if len(word_boxes) == 0:
return None
min_x, min_y, max_x, max_y = zip(*[x[1] for x in word_boxes])
min_x, min_y, max_x, max_y = [min(min_x), min(min_y), max(max_x), max(max_y)]
return [min_x, min_y, max_x, max_y]
# LayoutLM boxes are normalized to 0, 1000
def normalize_bbox(box, width, height, padding=0.005):
min_x, min_y, max_x, max_y = [c / 1000 for c in box]
if padding != 0:
min_x = max(0, min_x - padding)
min_y = max(0, min_y - padding)
max_x = min(max_x + padding, 1)
max_y = min(max_y + padding, 1)
return [min_x * width, min_y * height, max_x * width, max_y * height]
EXAMPLES = [
[
"invoice.png",
"East Repair Invoice",
],
[
"acze.png",
"ACZE Invoice",
],
[
"north_sea.png",
"North Sea Invoice",
],
]
QUESTION_FILES = {
"North Sea Invoice": "north_sea.pdf",
}
FIELDS = {
"Vendor Name": ["Vendor Name - Logo?", "Vendor Name - Address?"],
"Vendor Address": ["Vendor Address?"],
"Customer Name": ["Customer Name?"],
"Customer Address": ["Customer Address?"],
"Invoice Number": ["Invoice Number?"],
"Invoice Date": ["Invoice Date?"],
"Due Date": ["Due Date?"],
"Subtotal": ["Subtotal?"],
"Total Tax": ["Total Tax?"],
"Invoice Total": ["Invoice Total?"],
"Amount Due": ["Amount Due?"],
"Payment Terms": ["Payment Terms?"],
"Remit To Name": ["Remit To Name?"],
"Remit To Address": ["Remit To Address?"],
}
def empty_table(fields):
return {"value": [[name, None] for name in fields.keys()], "interactive": False}
def process_document(document, fields, model, error=None):
if document is not None and error is None:
preview, json_output, table = process_fields(document, fields, model)
return (
document,
fields,
preview,
gr.update(visible=True),
gr.update(visible=False, value=None),
json_output,
table,
)
else:
return (
None,
fields,
None,
gr.update(visible=False),
gr.update(visible=True, value=error) if error is not None else None,
None,
gr.update(**empty_table(fields)),
)
def process_path(path, fields, model):
error = None
document = None
if path:
try:
document = load_document(path)
except Exception as e:
traceback.print_exc()
error = str(e)
return process_document(document, fields, model, error)
def process_upload(file, fields, model):
return process_path(file.name if file else None, fields, model)
colors = ["#64A087", "green", "black"]
def annotate_page(prediction, pages, document):
if prediction is not None and "word_ids" in prediction:
image = pages[prediction["page"]]
draw = ImageDraw.Draw(image, "RGBA")
word_boxes = lift_word_boxes(document, prediction["page"])
x1, y1, x2, y2 = normalize_bbox(
expand_bbox([word_boxes[i] for i in prediction["word_ids"]]),
image.width,
image.height,
)
draw.rectangle(((x1, y1), (x2, y2)), fill=(0, 255, 0, int(0.4 * 255)))
def process_question(
question, document, img_gallery, model, fields, output, output_table
):
if not question or document is None:
return None, None, None, None
text_value = None
pages = [processing_utils.decode_base64_to_image(p) for p in img_gallery]
prediction = run_pipeline(model, question, document, 1)
annotate_page(prediction, pages, document)
field_name = question.rstrip("?")
fields = {field_name: [question], **fields}
output = {field_name: prediction, **output}
table = [[field_name, prediction.get("answer")]] + output_table.values.tolist()
return (
None,
gr.update(visible=True, value=pages),
fields,
output,
gr.update(value=table, interactive=False),
)
def process_fields(document, fields, model=list(CHECKPOINTS.keys())[0]):
pages = [x.copy().convert("RGB") for x in document.preview]
ret = {}
table = []
for (field_name, questions) in fields.items():
answers = [
a
for q in questions
for a in ensure_list(run_pipeline(model, q, document, top_k=1))
if a.get("score", 1) > 0.5
]
answers.sort(key=lambda x: -x.get("score", 0) if x else 0)
top = answers[0] if len(answers) > 0 else None
annotate_page(top, pages, document)
ret[field_name] = top
table.append([field_name, top.get("answer") if top is not None else None])
return (
gr.update(visible=True, value=pages),
gr.update(visible=True, value=ret),
gr.update(visible=True, value=table),
)
def load_example_document(img, title, fields, model):
document = None
if img is not None:
if title in QUESTION_FILES:
document = load_document(QUESTION_FILES[title])
else:
document = ImageDocument(Image.fromarray(img), ocr_reader=get_ocr_reader())
return process_document(document, fields, model)
CSS = """
#question input {
font-size: 16px;
}
#url-textbox, #question-textbox {
padding: 0 !important;
}
#short-upload-box .w-full {
min-height: 10rem !important;
}
/* I think something like this can be used to re-shape
* the table
*/
/*
.gr-samples-table tr {
display: inline;
}
.gr-samples-table .p-2 {
width: 100px;
}
*/
#select-a-file {
width: 100%;
}
#file-clear {
padding-top: 2px !important;
padding-bottom: 2px !important;
padding-left: 8px !important;
padding-right: 8px !important;
margin-top: 10px;
}
.gradio-container .gr-button-primary {
background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%);
border: 1px solid #B0DCCC;
border-radius: 8px;
color: #1B8700;
}
.gradio-container.dark button#submit-button {
background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%);
border: 1px solid #B0DCCC;
border-radius: 8px;
color: #1B8700
}
table.gr-samples-table tr td {
border: none;
outline: none;
}
table.gr-samples-table tr td:first-of-type {
width: 0%;
}
div#short-upload-box div.absolute {
display: none !important;
}
gradio-app > div > div > div > div.w-full > div, .gradio-app > div > div > div > div.w-full > div {
gap: 0px 2%;
}
gradio-app div div div div.w-full, .gradio-app div div div div.w-full {
gap: 0px;
}
gradio-app h2, .gradio-app h2 {
padding-top: 10px;
}
#answer {
overflow-y: scroll;
color: white;
background: #666;
border-color: #666;
font-size: 20px;
font-weight: bold;
}
#answer span {
color: white;
}
#answer textarea {
color:white;
background: #777;
border-color: #777;
font-size: 18px;
}
#url-error input {
color: red;
}
#results-table {
max-height: 600px;
overflow-y: scroll;
}
"""
with gr.Blocks(css=CSS) as demo:
gr.Markdown("# DocQuery: Document Query Engine")
gr.Markdown(
"DocQuery (created by [Impira](https://impira.com)) uses LayoutLMv1 fine-tuned on an invoice dataset"
" as well as DocVQA and SQuAD, which boot its general comprehension skills. The model is an enhanced"
" QA architecture that supports selecting blocks of text which may be non-consecutive, which is a major"
" issue when dealing with invoice documents (e.g. addresses)."
" To use it, simply upload an image or PDF invoice and the model will predict values for several fields."
" You can also create additional fields by simply typing in a question."
" DocQuery is available on [Github](https://github.com/impira/docquery)."
)
document = gr.Variable()
fields = gr.Variable(value={**FIELDS})
example_question = gr.Textbox(visible=False)
example_image = gr.Image(visible=False)
with gr.Row(equal_height=True):
with gr.Column():
with gr.Row():
gr.Markdown("## Select an invoice", elem_id="select-a-file")
img_clear_button = gr.Button(
"Clear", variant="secondary", elem_id="file-clear", visible=False
)
image = gr.Gallery(visible=False)
with gr.Row(equal_height=True):
with gr.Column():
with gr.Row():
url = gr.Textbox(
show_label=False,
placeholder="URL",
lines=1,
max_lines=1,
elem_id="url-textbox",
)
submit = gr.Button("Get")
url_error = gr.Textbox(
visible=False,
elem_id="url-error",
max_lines=1,
interactive=False,
label="Error",
)
gr.Markdown("— or —")
upload = gr.File(label=None, interactive=True, elem_id="short-upload-box")
gr.Examples(
examples=EXAMPLES,
inputs=[example_image, example_question],
)
with gr.Column() as col:
gr.Markdown("## Results")
with gr.Tabs():
with gr.TabItem("Table"):
output_table = gr.Dataframe(
headers=["Field", "Value"], **empty_table(fields.value),
elem_id="results-table"
)
with gr.TabItem("JSON"):
output = gr.JSON(label="Output", visible=True)
model = gr.Radio(
choices=list(CHECKPOINTS.keys()),
value=list(CHECKPOINTS.keys())[0],
label="Model",
visible=False,
)
gr.Markdown("### Ask a question")
with gr.Row():
question = gr.Textbox(
label="Question",
show_label=False,
placeholder="e.g. What is the invoice number?",
lines=1,
max_lines=1,
elem_id="question-textbox",
)
clear_button = gr.Button("Clear", variant="secondary", visible=False)
submit_button = gr.Button(
"Add", variant="primary", elem_id="submit-button"
)
for cb in [img_clear_button, clear_button]:
cb.click(
lambda _: (
gr.update(visible=False, value=None), # image
None, # document
# {**FIELDS}, # fields
gr.update(value=None), # output
gr.update(**empty_table(fields.value)), # output_table
gr.update(visible=False),
None,
None,
None,
gr.update(visible=False, value=None),
None,
),
inputs=clear_button,
outputs=[
image,
document,
# fields,
output,
output_table,
img_clear_button,
example_image,
upload,
url,
url_error,
question,
],
)
submit_outputs = [
document,
fields,
image,
img_clear_button,
url_error,
output,
output_table,
]
upload.change(
fn=process_upload,
inputs=[upload, fields, model],
outputs=submit_outputs,
)
submit.click(
fn=process_path,
inputs=[url, fields, model],
outputs=submit_outputs,
)
question.submit(
fn=process_question,
inputs=[question, document, image, model, fields, output, output_table],
outputs=[question, image, fields, output, output_table],
)
submit_button.click(
process_question,
inputs=[question, document, model],
outputs=[image, output, output_table],
)
model.change(
process_question,
inputs=[question, document, model],
outputs=[image, output, output_table],
)
example_image.change(
fn=load_example_document,
inputs=[example_image, example_question, fields, model],
outputs=submit_outputs,
)
if __name__ == "__main__":
demo.launch(enable_queue=False)