ERNIE-Layout / app.py
linjieccc
update
05f882f
raw
history blame
14.7 kB
#-*- coding: UTF-8 -*-
# Copyright 2022 The Impira Team and the HuggingFace Team.
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import json
import base64
from io import BytesIO
from PIL import Image
import traceback
import requests
import numpy as np
import gradio as gr
import pdf2image
import fitz
import cv2
fitz_tools = fitz.Tools()
def pdf2img(stream, pagenos, dpi=300, thread_count=3, height=1600):
images = []
cimages = pdf2image.convert_from_bytes(
stream, dpi=dpi, thread_count=thread_count, first_page=pagenos[0] + 1, last_page=pagenos[-1] + 1,
size=height)
for _image in cimages:
image = np.array(_image)
image = image[..., ::-1]
images.append(image)
return images
class PdfReader(object):
"""pdf reader"""
def __init__(self,
stream: bytes,
image_height: int = 1600):
self.stream = stream
self._image_height = image_height
self._dpi = 200
self._inpdf = self.load_file(stream)
@staticmethod
def load_file(stream):
"""load document"""
try:
inpdf = fitz.Document(stream=stream, filetype="pdf")
except Exception as e:
print(f"[PDF_READER]-[Failed to load the file]-[{repr(e)}]")
return inpdf
@staticmethod
def _convert_page_obj_to_image(page_obj, image_height: int = None):
"""fitz convert pdf to image
Args:
page_obj ([type]): [description]
ratio ([type]): [description]
Returns:
[type]: [description]
"""
if image_height:
_, page_height = page_obj.rect.x1 - \
page_obj.rect.x0, page_obj.rect.y1 - page_obj.rect.y0
ratio = image_height / page_height
else:
ratio = 1.0
trans = fitz.Matrix(ratio, ratio)
pixmap = page_obj.get_pixmap(matrix=trans, alpha=False)
image = cv2.imdecode(np.frombuffer(pixmap.tobytes(), np.uint8), -1)
fitz_tools.store_shrink(100)
return image
def get_page_image(self,
pageno):
"""get page image
Args:
pageno ([type]): [description]
Returns:
[type]: [description]
"""
try:
page_obj = self._inpdf[pageno]
return self._convert_page_obj_to_image(page_obj, self._image_height)
except Exception as e:
print(f"[Failed to convert the PDF to images]-[{repr(e)}]")
try:
return pdf2img(stream=self.stream,
pagenos=[pageno],
height=self._image_height,
dpi=self._dpi)[0]
except Exception as e:
print(f"[Failed to convert the PDF to images]-[{repr(e)}]")
return None
examples = [
[
"budget_form.png",
"What is the total actual and/or obligated expenses of ECG Center?"
],
[
"medical_bill_2.jpg",
"患者さんは何でお金を払いますか。"
],
[
"receipt.png",
"เบอร์โทรร้านอะไรคะ"
],
[
"poster.png",
"Which gift idea needs a printer?"
],
[
"resume.png",
"五百丁本次想要担任的是什么职位?",
],
[
"custom_declaration_form.png",
"在哪个口岸进口?"
],
[
"invoice.jpg",
"发票号码是多少?",
],
[
"medical_bill_1.png",
"票据的具体名称是什么?"
],
[
"website_design_guide.jpeg",
"Which quality component has the icon of a pen in it?"
],
]
prompt_files = {
"发票号码是多少?": "invoice.jpg",
"五百丁本次想要担任的是什么职位?": "resume.png",
"在哪个口岸进口?": "custom_declaration_form.png",
"票据的具体名称是什么?": "medical_bill_1.png",
"What is the total actual and/or obligated expenses of ECG Center?": "budget_form.png",
"Which quality component has the icon of a pen in it?": "website_design_guide.jpeg",
"Which gift idea needs a printer?": "poster.png",
"患者さんは何でお金を払いますか。": "medical_bill_2.jpg",
"เบอร์โทรร้านอะไรคะ": "receipt.png"
}
def load_document(path):
if path.startswith("http://") or path.startswith("https://"):
resp = requests.get(path, allow_redirects=True, stream=True)
b = resp.raw
else:
b = open(path, "rb")
if path.endswith(".pdf"):
images_list = []
pdfreader = PdfReader(stream=b.read())
for p_no in range(0, pdfreader._inpdf.page_count):
img_np = pdfreader.get_page_image(pageno=p_no)
images_list.append(img_np)
else:
image = Image.open(b)
images_list = [image.convert("RGB")]
return images_list
def process_path(path):
error = None
if path:
try:
img = load_document(path)
return (
path,
gr.update(visible=True, value=img),
gr.update(visible=True),
gr.update(visible=False, value=None),
gr.update(visible=False, value=None),
None,
)
except Exception as e:
traceback.print_exc()
error = str(e)
return (
None,
gr.update(visible=False, value=None),
gr.update(visible=False),
gr.update(visible=False, value=None),
gr.update(visible=False, value=None),
gr.update(visible=True, value=error) if error is not None else None,
None,
)
def process_upload(file):
if file:
return process_path(file.name)
else:
return (
None,
gr.update(visible=False, value=None),
gr.update(visible=False),
gr.update(visible=False, value=None),
gr.update(visible=False, value=None),
None,
)
def img_base64(img_path):
with open(img_path, "rb") as f:
base64_str = base64.b64encode(f.read())
return base64_str
def process_prompt(prompt, document, lang="ch"):
if not prompt:
prompt = "What is the total actual and/or obligated expenses of ECG Center?"
if document is None:
return None, None, None
access_token = os.environ['token']
url = f"https://aip.baidubce.com/rpc/2.0/nlp-itec/poc/docprompt?access_token={access_token}"
r = requests.post(url, json={"doc": img_base64(document), "prompt": [prompt], "lang": lang})
response = r.json()
predictions = response['result']
img_list = response['image']
pages = [Image.open(BytesIO(base64.b64decode(img))) for img in img_list]
text_value = predictions[0]['result'][0]['value']
return (
gr.update(visible=True, value=pages),
gr.update(visible=True, value=predictions),
gr.update(
visible=True,
value=text_value,
),
)
def load_example_document(img, prompt, lang="ch"):
if img is not None:
document = prompt_files[prompt]
preview, answer, answer_text = process_prompt(prompt, document, lang)
return document, prompt, preview, gr.update(visible=True), answer, answer_text
else:
return None, None, None, gr.update(visible=False), None, None
def read_content(file_path: str) -> str:
"""read the content of target file
"""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return content
CSS = """
#prompt input {
font-size: 16px;
}
#url-textbox {
padding: 0 !important;
}
#short-upload-box .w-full {
min-height: 10rem !important;
}
/* I think something like this can be used to re-shape
* the table
*/
/*
.gr-samples-table tr {
display: inline;
}
.gr-samples-table .p-2 {
width: 100px;
}
*/
#select-a-file {
width: 100%;
}
#file-clear {
padding-top: 2px !important;
padding-bottom: 2px !important;
padding-left: 8px !important;
padding-right: 8px !important;
margin-top: 10px;
}
.gradio-container .gr-button-primary {
background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%);
border: 1px solid #B0DCCC;
border-radius: 8px;
color: #1B8700;
}
.gradio-container.dark button#submit-button {
background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%);
border: 1px solid #B0DCCC;
border-radius: 8px;
color: #1B8700
}
table.gr-samples-table tr td {
border: none;
outline: none;
}
table.gr-samples-table tr td:first-of-type {
width: 0%;
}
div#short-upload-box div.absolute {
display: none !important;
}
gradio-app > div > div > div > div.w-full > div, .gradio-app > div > div > div > div.w-full > div {
gap: 0px 2%;
}
gradio-app div div div div.w-full, .gradio-app div div div div.w-full {
gap: 0px;
}
gradio-app h2, .gradio-app h2 {
padding-top: 10px;
}
#answer {
overflow-y: scroll;
color: white;
background: #666;
border-color: #666;
font-size: 20px;
font-weight: bold;
}
#answer span {
color: white;
}
#answer textarea {
color:white;
background: #777;
border-color: #777;
font-size: 18px;
}
#url-error input {
color: red;
}
"""
with gr.Blocks(css=CSS) as demo:
gr.HTML(read_content("header.html"))
gr.Markdown(
f" ⚡DocPrompt⚡ is a Document Prompt Engine uses ERNIE-Layout as the backbone model.\n"
f" The engine is powered by Baidu Wenxin Document Intelligence Team 🚀 and is ability for multilingual documents information extraction and question ansering.\n"
)
document = gr.Variable()
example_prompt = gr.Textbox(visible=False)
example_image = gr.Image(visible=False)
with gr.Row(equal_height=True):
with gr.Column():
with gr.Row():
gr.Markdown("## 1. Select a file", elem_id="select-a-file")
img_clear_button = gr.Button(
"Clear", variant="secondary", elem_id="file-clear", visible=False
)
image = gr.Gallery(visible=False)
with gr.Row(equal_height=True):
with gr.Column():
with gr.Row():
url = gr.Textbox(
show_label=False,
placeholder="URL",
lines=1,
max_lines=1,
elem_id="url-textbox",
)
submit = gr.Button("Get")
url_error = gr.Textbox(
visible=False,
elem_id="url-error",
max_lines=1,
interactive=False,
label="Error",
)
gr.Markdown("— or —")
upload = gr.File(label=None, interactive=True, elem_id="short-upload-box")
gr.Examples(
examples=examples,
inputs=[example_image, example_prompt],
)
with gr.Column() as col:
gr.Markdown("## 2. Make a request")
prompt = gr.Textbox(
label="Prompt",
placeholder="What is the total actual and/or obligated expenses of ECG Center?",
lines=1,
max_lines=1,
)
ocr_lang = gr.Radio(
choices=["ch", "en"],
value="ch",
label="OCR Language",
)
with gr.Row():
clear_button = gr.Button("Clear", variant="secondary")
submit_button = gr.Button(
"Submit", variant="primary", elem_id="submit-button"
)
with gr.Column():
output_text = gr.Textbox(
label="Top Answer", visible=False, elem_id="answer"
)
output = gr.JSON(label="Output", visible=False)
for cb in [img_clear_button, clear_button]:
cb.click(
lambda _: (
gr.update(visible=False, value=None),
None,
gr.update(visible=False, value=None),
gr.update(visible=False, value=None),
gr.update(visible=False),
None,
None,
None,
gr.update(visible=False, value=None),
None,
),
inputs=clear_button,
outputs=[
image,
document,
output,
output_text,
img_clear_button,
example_image,
upload,
url,
url_error,
prompt,
],
)
upload.change(
fn=process_upload,
inputs=[upload],
outputs=[document, image, img_clear_button, output, output_text, url_error],
)
submit.click(
fn=process_path,
inputs=[url],
outputs=[document, image, img_clear_button, output, output_text, url_error],
)
prompt.submit(
fn=process_prompt,
inputs=[prompt, document, ocr_lang],
outputs=[image, output, output_text],
)
submit_button.click(
fn=process_prompt,
inputs=[prompt, document, ocr_lang],
outputs=[image, output, output_text],
)
ocr_lang.change(
fn=process_prompt,
inputs=[prompt, document, ocr_lang],
outputs=[image, output, output_text],
)
example_image.change(
fn=load_example_document,
inputs=[example_image, example_prompt, ocr_lang],
outputs=[document, prompt, image, img_clear_button, output, output_text],
)
gr.Markdown("[![Stargazers repo roster for @PaddlePaddle/PaddleNLP](https://reporoster.com/stars/PaddlePaddle/PaddleNLP)](https://github.com/PaddlePaddle/PaddleNLP)")
gr.HTML(read_content("footer.html"))
if __name__ == "__main__":
demo.launch(enable_queue=False)