|
import logging |
|
from types import SimpleNamespace |
|
|
|
import pdfplumber |
|
from langchain.docstore.document import Document |
|
|
|
|
|
def prepare_table_config(crop_page):
    """Build explicit-line table-finder settings for a page.

    The lines are collected from ``crop_page.root_page`` (the original page),
    not from ``crop_page`` itself.

    From https://github.com/jsvine/pdfplumber/issues/242
    """
    root = crop_page.root_page
    # Each curve and edge is expanded through ``rect_to_edges``; see
    # https://github.com/jsvine/pdfplumber/issues/127
    explicit_lines = [
        line
        for shape in root.curves + root.edges
        for line in pdfplumber.utils.rect_to_edges(shape)
    ]
    # One and the same list object backs both the vertical and horizontal keys.
    return dict(
        vertical_strategy="explicit",
        horizontal_strategy="explicit",
        explicit_vertical_lines=explicit_lines,
        explicit_horizontal_lines=explicit_lines,
        intersection_y_tolerance=10,
    )
|
|
|
|
|
def get_text_outside_table(crop_page):
    """Return ``crop_page`` filtered to the objects that lie outside every table.

    Tables are searched for on ``crop_page.root_page`` with the settings from
    ``prepare_table_config``. If those settings hold no explicit vertical or
    horizontal lines, ``crop_page`` is returned untouched.
    """
    settings = prepare_table_config(crop_page)
    if not settings["explicit_vertical_lines"] or not settings["explicit_horizontal_lines"]:
        return crop_page

    table_boxes = []
    for table in crop_page.root_page.find_tables(table_settings=settings):
        table_boxes.append(table.bbox)

    def centre_inside(obj, box):
        """Tell whether the centre point of ``obj`` falls inside ``box``.

        See https://github.com/jsvine/pdfplumber/blob/stable/pdfplumber/table.py#L404
        """
        mid_y = (obj["top"] + obj["bottom"]) / 2
        mid_x = (obj["x0"] + obj["x1"]) / 2
        left, upper, right, lower = box
        # Left/top bounds are inclusive, right/bottom bounds are exclusive.
        return left <= mid_x < right and upper <= mid_y < lower

    def not_within_bboxes(obj):
        """Return True when ``obj`` is centred in none of the table boxes."""
        for box in table_boxes:
            if centre_inside(obj, box):
                return False
        return True

    return crop_page.filter(not_within_bboxes)
|
|
|
|
|
|
|
|
|
def extract_words(page):
    """Extract the words of ``page`` using this module's fixed settings.

    Calls ``page.extract_words`` keeping blank characters, with a vertical
    tolerance of 0 and a horizontal tolerance of 1, and asks for the
    ``fontname``, ``size`` and ``object_type`` attributes on every word.
    """
    options = {
        "keep_blank_chars": True,
        "y_tolerance": 0,
        "x_tolerance": 1,
        "extra_attrs": ["fontname", "size", "object_type"],
    }
    return page.extract_words(**options)
|
|
|
|
|
def get_title_with_cropped_page(first_page):
    """Split the first page into title words, author info and the remaining body.

    Words whose font size is at least 14 are collected as the title. A word
    whose text is exactly ``"Abstract"`` (and is not title-sized) marks where
    the body starts; if several occur, the last one wins. If none occurs, the
    body starts at the top of the page.

    Args:
        first_page: pdfplumber page (or page-like object) for the first page.

    Returns:
        A ``(title, user_info, body_page)`` tuple:
        ``title`` is the list of title word texts, ``user_info`` is the list of
        word texts found between the bottom of the title and the top of the
        body, and ``body_page`` is ``first_page`` restricted to the area from
        the body top down to the page bottom.
    """
    title = []
    x0, top, x1, bottom = first_page.bbox
    # Fall back to the page top so the name is always bound, even when the
    # page contains no title-sized word at all.
    title_bottom = top

    for raw_word in extract_words(first_page):
        word = SimpleNamespace(**raw_word)

        if word.size >= 14:
            title.append(word.text)
            title_bottom = word.bottom
        elif word.text == "Abstract":
            top = word.top

    if title_bottom < top:
        info_region = first_page.within_bbox((x0, title_bottom, x1, top))
        user_info = [entry["text"] for entry in extract_words(info_region)]
    else:
        # No "Abstract" marker, or it sits at/above the title: the box between
        # them has no height. Skip the crop instead of requesting an empty or
        # inverted box (NOTE(review): recent pdfplumber versions appear to
        # reject such boxes with ValueError - confirm).
        user_info = []

    return title, user_info, first_page.within_bbox((x0, top, x1, bottom))
|
|
|
|
|
def get_column_cropped_pages(pages, two_column=True):
    """Return the pages to read in order, optionally split into two columns.

    With ``two_column`` set, each page contributes its left half followed by
    its right half, both obtained through ``within_bbox(..., relative=True)``.
    Otherwise the pages are passed through unchanged in a new list.
    """
    if not two_column:
        return list(pages)

    halves = []
    for page in pages:
        middle = page.width / 2
        halves.extend(
            (
                page.within_bbox((0, 0, middle, page.height), relative=True),
                page.within_bbox((middle, 0, page.width, page.height), relative=True),
            )
        )
    return halves
|
|
|
|
|
def _create_chapter(page_start, name_top, name_bottom):
    """Return an empty chapter record whose heading starts at the given position."""
    return SimpleNamespace(
        name=[],
        name_top=name_top,
        name_bottom=name_bottom,
        record_chapter_name=True,
        page_start=page_start,
        page_stop=None,
        text=[],
    )


def _split_into_chapters(pages):
    """Group the words of ``pages`` (tables removed) into chapter records."""
    chapters = []
    cur_chapter = None
    last_page_number = None

    for page in pages:
        page = get_text_outside_table(page)
        last_page_number = page.page_number

        for raw_word in extract_words(page):
            word = SimpleNamespace(**raw_word)

            if word.size >= 11:
                # Heading-sized word: it either opens a chapter or extends the
                # heading that is still being recorded.
                if cur_chapter is None:
                    cur_chapter = _create_chapter(page.page_number, word.top, word.bottom)
                elif not cur_chapter.record_chapter_name or (
                    # Compare against the *current word*: a heading-sized word
                    # at a different vertical position starts a new chapter
                    # even if no body text was seen in between.
                    cur_chapter.name_bottom != word.bottom
                    and cur_chapter.name_top != word.top
                ):
                    cur_chapter.page_stop = page.page_number
                    chapters.append(cur_chapter)
                    cur_chapter = _create_chapter(page.page_number, word.top, word.bottom)

                cur_chapter.name.append(word.text)
            else:
                if cur_chapter is None:
                    # Body text ahead of any heading: keep it in a chapter
                    # with an empty name instead of failing on None.
                    cur_chapter = _create_chapter(page.page_number, word.top, word.bottom)
                cur_chapter.record_chapter_name = False
                cur_chapter.text.append(word.text)

    # Close the last open chapter; there is none when no word was extracted.
    if cur_chapter is not None:
        cur_chapter.page_stop = last_page_number
        chapters.append(cur_chapter)

    return chapters


def parse_pdf(filename, two_column=True):
    """Parse a PDF into a single ``Document`` organised by chapter.

    The first page yields the title and author information; the remaining
    text (optionally read as two columns, with table contents removed) is
    split into chapters wherever words with font size >= 11 appear.

    Args:
        filename: Path or file object accepted by ``pdfplumber.open``.
        two_column: When true, every page is read as a left half followed by
            a right half.

    Returns:
        A ``Document`` whose ``page_content`` holds the title/information line
        followed by one line per chapter, and whose ``metadata`` carries the
        joined title under ``"title"``.
    """
    root_logger = logging.getLogger()
    level = root_logger.level
    if level == logging.DEBUG:
        # Raise the threshold while the PDF is parsed (presumably to silence
        # debug output from the PDF libraries).
        root_logger.setLevel(logging.INFO)

    try:
        with pdfplumber.open(filename) as pdf:
            title, user_info, first_page = get_title_with_cropped_page(pdf.pages[0])
            new_pages = get_column_cropped_pages([first_page] + pdf.pages[1:], two_column)
            chapters = _split_into_chapters(new_pages)
    finally:
        # Restore the caller's level even when parsing fails.
        root_logger.setLevel(level)

    # Logged after the level is restored so the debug dump is not swallowed
    # by the temporary INFO threshold above.
    for chapter in chapters:
        logging.info(f"section: {chapter.name} pages:{chapter.page_start, chapter.page_stop} word-count:{len(chapter.text)}")
        logging.debug(" ".join(chapter.text))

    title = " ".join(title)
    user_info = " ".join(user_info)
    parts = [f"Article Title: {title}, Information:{user_info}\n"]
    for idx, chapter in enumerate(chapters):
        chapter.name = " ".join(chapter.name)
        parts.append(f"The {idx}th Chapter {chapter.name}: " + " ".join(chapter.text) + "\n")
    text = "".join(parts)

    return Document(page_content=text, metadata={"title": title})
|
|