Application / pdf2txt_v2.py
FrankWu's picture
Upload 5 files
e2dccf7 verified
# -*- coding: utf-8 -*-
"""
Created by Shengbo.Zhang on 2021/09/20
"""
import os
import re
import logging
import pdfplumber
from docx import Document
from Pdf2Txt.config import *
from Pdf2Txt.config import _check_ann_title_processable
from pdf2docx import Converter
from collections import Counter
from pdfminer.pdfpage import PDFPage
from pdfminer.layout import LAParams, LTTextBox
from pdfminer.converter import PDFPageAggregator
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
# 临时关闭pdf2docx模块中Converter的日志输出
logging.disable(logging.INFO)
logging.disable(logging.WARNING)
def get_string_list_from_pdf(pdf_path):
'''
从一个PDF文件中直接逐行读取文本内容(除表格以外的正文),结果存放在一个列表中
:param pdf_path: 一个字符串,PDF文件的路径地址
:return: 两个列表:string_list,ann_info_list。前者存放PDF的逐行文本内容,后者存放公告的头部信息(例如:证券代码、证券简称、公告编号等)
'''
string_list = []
ann_info_list = []
with pdfplumber.open(pdf_path) as pdf:
for id, page in enumerate(pdf.pages):
bboxes = [table.bbox for table in page.find_tables()]
def _not_within_bboxes(obj):
def _obj_in_bbox(_bbox):
v_mid = (obj["top"] + obj["bottom"]) / 2
h_mid = (obj["x0"] + obj["x1"]) / 2
x0, top, x1, bottom = _bbox
return (h_mid >= x0) and (h_mid < x1) and (v_mid >= top) and (v_mid < bottom)
return not any(_obj_in_bbox(__bbox) for __bbox in bboxes)
new_page = page.filter(_not_within_bboxes)
string = new_page.extract_text()
string_split = string.split('\n')
if id == 0:
ann_info_list = string_split[:10]
string_split = [new_string.replace(' ', '').replace('\n', '').replace('\t', '') + '\n' for new_string in string_split]
string_split = list(filter(lambda x: x != '\n' and x != '', string_split))
string_list.extend(string_split)
return string_list, ann_info_list
def get_ann_info_from_pdf(pdf_path):
'''
获取PDF公告文件的头部信息(此处截取了前5行文本,可能包括非头部数据,将在refine_txt_list()中进一步处理)
:param pdf_path: 一个字符串,PDF文件的路径地址
:return: 一个列表,存放PDF公告文件的头部信息(例如:证券代码、证券简称、公告编号等)
'''
try:
with pdfplumber.open(pdf_path) as pdf:
string = pdf.pages[0].extract_text()
string_split = string.split('\n')
ann_info_list = string_split[:10]
except:
ann_info_list = []
return ann_info_list
def get_string_list_from_pdf_converted_docx(pdf_path, docx_path):
'''
将PDF文件转换为Docx格式,逐行读取Docx文件中的正文内容(除表格以外)
:param pdf_path: 一个字符串,PDF文件的路径地址
:return: 一个列表,string_list,存放PDF的逐行文本内容;一个Document实例对象,存放临时的Docx文件
'''
document = None
string_list = []
if docx_path == '':
output_docx_file_path = f"{os.path.dirname(pdf_path)}//{os.path.basename(pdf_path)[:-4]}_{TEMP_DOCX_SUFFIX}.docx"
else:
output_docx_file_path = docx_path
is_success = get_docx_from_pdf(pdf_path=pdf_path, out_path=output_docx_file_path)
if is_success:
document = Document(output_docx_file_path)
for val in document.paragraphs:
tmp = val.text.strip()
tmp_list = tmp.split('\n')
for s in tmp_list:
s = s.strip()
if s == '': continue
string_list.append(s)
string_list = [string.replace(' ', '').replace('\n', '').replace('\t', '') + '\n' for string in string_list]
ann_headers = []
for i, val in enumerate(string_list):
if i > 10: break
if val.strip()[-4:] == '有限公司': break
ann_headers.append(val)
for i, val1 in enumerate(string_list):
for j, val2 in enumerate(ann_headers):
if val1 == val2: string_list[i] = ''
if os.path.exists(output_docx_file_path):
os.remove(output_docx_file_path)
return string_list, document
def get_abscissa_dict_from_pdf(pdf_path):
'''
从一个PDF文件中逐行读取该行首个文本块字符的横坐标值(以PDF页面左上角为原点),以该行文本内容为键,横坐标值为值,建立一个字典
:param pdf_path: 一个字符串,PDF文件的路径地址
:return: 一个字典:abscissa_dict,存放PDF文件中某一文本块的起始横坐标值
'''
abscissa_dict = {}
fp = open(pdf_path, 'rb')
rsrcmgr = PDFResourceManager()
laparams = LAParams()
device = PDFPageAggregator(rsrcmgr=rsrcmgr, laparams=laparams)
interpreter = PDFPageInterpreter(rsrcmgr=rsrcmgr, device=device)
pages = PDFPage.get_pages(fp)
for i, page in enumerate(pages):
interpreter.process_page(page)
layout = device.get_result()
for lobj in layout:
if isinstance(lobj, LTTextBox):
x, text = int(lobj.bbox[0]), lobj.get_text()
tmp = text.replace(' ', '').replace('\n', '').replace('\t', '') + '\n'
if tmp != '\n' and tmp != '':
abscissa_dict[tmp] = x
fp.close()
return abscissa_dict
def get_min_abscissa_value(abscissa_dict, string_list_length):
'''
计算PDF文本块横坐标的最小值(正文块),这里假设该值至少应大于或等于某一阈值(此处设为文本总行数的1/4)
:param abscissa_dict: 一个字典,存放PDF文件中某一文本块的起始横坐标值
:param string_list_length: 整型,PDF的文本字符串列表
:return: 整型,PDF正文块横坐标的最小值
'''
abscissa_x_list = list(abscissa_dict.values())
abscissa_x_list_counter = list(dict(Counter(abscissa_x_list)).items())
abscissa_x_list_counter.sort()
x_threshold = string_list_length // 4
min_abscissa_value = min(abscissa_x_list)
for item in abscissa_x_list_counter:
if item[1] >= x_threshold:
min_abscissa_value = item[0]
break
return min_abscissa_value
def refine_txt_list(txt, ann_info):
'''
此时PDF文件的文本字符串列表(正文)已经过首轮处理,此处将对它进行最后的格式上的优化
:param txt: PDF的文本列表,包含PDF的正文文本内容
:param ann_info: PDF的公告的头部信息
:return: 一个新的PDF文本列表
'''
# 格式化PDF的【公告头部信息】
if ann_info != []:
new_ann_info_list = []
for i, val in enumerate(ann_info):
if val.strip() == '': continue
if val.strip()[-4:] == '有限公司': break
else: new_ann_info_list.append(' '.join(val.split()) + SEGMENT_SYMBOL)
if new_ann_info_list != []:
new_ann_info_list[-1] = new_ann_info_list[-1].replace(SEGMENT_SYMBOL, '')
if txt[0].strip()[-4:] == '有限公司':
for i in range(len(new_ann_info_list)):
txt.insert(0, '')
for i, val in enumerate(new_ann_info_list):
txt[i] = val
# 格式化PDF的【公告标题】【董事会承诺说明】
for i, val in enumerate(txt):
if i > 10: break
else:
val = val.strip()
if _check_ann_title_processable(val):
if SEGMENT_SYMBOL not in val:
txt[i] = (SEGMENT_SYMBOL + val)
if val[-4:] == '有限公司':
if SEGMENT_SYMBOL not in txt[i]:
txt[i] = (SEGMENT_SYMBOL + val)
if _check_ann_title_processable(txt[i+1]):
txt[i+1] = txt[i+1].replace(SEGMENT_SYMBOL, '')
if txt[i+2].replace(SEGMENT_SYMBOL, '')[:3] == '本公司':
if SEGMENT_SYMBOL not in txt[i+2]:
txt[i+2] = (SEGMENT_SYMBOL + txt[i+2])
txt[i+3] = txt[i+3].replace(SEGMENT_SYMBOL, '')
break
if _check_ann_title_processable(txt[i+2]):
txt[i+1] = txt[i+1].replace(SEGMENT_SYMBOL, '')
txt[i+2] = txt[i+2].replace(SEGMENT_SYMBOL, '')
if txt[i+3].replace(SEGMENT_SYMBOL, '')[:3] == '本公司':
if SEGMENT_SYMBOL not in txt[i+3]:
txt[i+3] = (SEGMENT_SYMBOL + txt[i+3])
txt[i+4] = txt[i+4].replace(SEGMENT_SYMBOL, '')
break
# 次轮遍历PDF的文本字符串列表
for i, _ in enumerate(txt):
# 格式化PDF的【大小节编号】
if (SEGMENT_SYMBOL not in txt[i]):
match_check = [1, 1, 1, 1, 1]
# 形如: '一、'的匹配模式
match_1 = re.match('[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、', txt[i])
# 形如: '1、'的匹配模式
match_2 = re.match('[0-9]{1,2}、', txt[i])
# 形如: '1.'的匹配模式
match_3 = re.match('[0-9]{1,2}\.', txt[i])
# 形如: '(一)'或'(一)'的匹配模式
match_4 = re.match('[\(\(]+[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}[\)\)]+', txt[i])
# 形如: '(1)'或'(1)'的匹配模式
match_5 = re.match('[\(\(]+[0-9]{1,2}[\)\)]+', txt[i])
if match_1: match_check[0] = match_1.start()
if match_2: match_check[1] = match_2.start()
if match_3: match_check[2] = match_3.start()
if match_4: match_check[3] = match_4.start()
if match_5: match_check[4] = match_5.start()
if 0 in match_check:
txt[i] = SEGMENT_SYMBOL + txt[i]
# 修正某些情况下【重要内容提示】字段未自成一行的错误
if ('重要内容提示' in txt[i]) and (SEGMENT_SYMBOL not in txt[i]) and (txt[i].index('重要内容提示') == 0):
txt[i] = SEGMENT_SYMBOL + txt[i]
# 修正某些情况下【单位:元】字段未被删除的错误
if (txt[i] == '单位:元') or (txt[i] == SEGMENT_SYMBOL + '单位:元'):
txt[i] = ''
# 修正某些情况下【特别提示】字段未自成一行的错误
if ('特别提示' in txt[i]) and (SEGMENT_SYMBOL not in txt[i]) and (txt[i].index('特别提示') == 0):
txt[i] = SEGMENT_SYMBOL + txt[i]
# 修正某些情况下【特此公告】字段未自成一行的错误
if ('特此公告' in txt[i]) and (SEGMENT_SYMBOL not in txt[i]) and (txt[i].index('特此公告') == 0):
txt[i] = SEGMENT_SYMBOL + txt[i]
# 修正某些情况下该行文本与下一行文本内容重复的错误(仅保留一行)
if (i+1) < len(txt) and (txt[i] == txt[i+1]):
txt[i] = ''
return txt
def get_docx_from_pdf(pdf_path, out_path):
'''
读入一个PDF文件,将其转换为Docx格式并临时存放于本地
:param pdf_path: 输入的PDF公告文件的完整路径
:param out_path: 输出的中间Docx结果文件的完整路径
:return: 布尔值,是否转换成功
'''
cv = Converter(pdf_path)
try:
cv.convert(out_path)
except Exception:
cv.close()
return False
for p in cv.pages:
if not p.finalized:
cv.close()
return False
cv.close()
return True
def _get_table_row_feat(str):
'''
给定一个空格分割的表格行字符串,计算它的特征(01组成的字符串)
:param str: 字符串
:return: 字符串
'''
s = str.split()
r = ''
for c in s:
try:
_ = float(c)
r += '1'
except Exception:
r += '0'
return r
def append_table_from_docx(doc, txt):
'''
读取Docx文件中每个表格的内容,格式化处理后追加至PDF的文本列表中
:param doc: 一个Document对象实例
:param txt: 一个字符串列表,包含PDF的正文文本内容
:return: 一个新的PDF文本列表
'''
data = []
table_txt = []
table_tag = '-' + TABLE_SYMBOL + '-'
for table in doc.tables[:]:
table_txt.append(f'{table_tag}\n')
for i, row in enumerate(table.rows[:]):
row_content = []
for cell in row.cells[:]:
c = cell.text
new_c = c.replace('\n', '').replace(' ','').replace('\t','').replace(',','')
row_content.append(new_c)
if row_content == []: continue
if '本公司' in row_content[0]:
tmp = SEGMENT_SYMBOL
for line in row_content:
tmp += line.strip()
if '特别提示' in tmp:
tmp = tmp[:tmp.index('特别提示')+4]+SEGMENT_SYMBOL+tmp[tmp.index('特别提示')+4:]
for id, val in enumerate(txt):
if id > 10: break
else:
if _check_ann_title_processable(val):
txt.insert(id+1, tmp)
break
continue
if '证券代码' in row_content[0]: continue
data.append(row_content)
new_row = '^' + TABLE_CELL_SYMBOL.join(row_content) + '$\n'
if new_row.replace(TABLE_CELL_SYMBOL,'') != '^$\n':
table_txt.append(new_row)
data.append(f'{table_tag}\n')
table_txt.append(f'{table_tag}\n')
flag = False
for i, val in enumerate(table_txt):
if val == f'{table_tag}\n':
if not flag:
flag = True
else:
table_txt[i] = '^$\n'
else:
flag = False
table_txt = list(filter(lambda x: x != '^$\n', table_txt))
for i, val in enumerate(table_txt):
if val == f'{table_tag}\n' and (i > 0) and (i < len(table_txt)-1):
feat1 = _get_table_row_feat(table_txt[i-1].replace('\n', ''))
feat2 = _get_table_row_feat(table_txt[i+1].replace('\n', ''))
if feat1 == feat2:
table_txt[i] = '^$\n'
if len(table_txt) == 1 and table_txt[0] == f'{table_tag}\n':
table_txt[0] = '^$\n'
for i, val in enumerate(table_txt):
if val == table_tag:
continue
if val == '^$\n':
table_txt[i] = ''
continue
table_txt[i] = val[1:][:-2] + '\n'
txt.extend(table_txt)
return txt
def output_txt_string(txt_path, txt_string):
'''
将PDF公告的格式化文本字符串写出至一个.txt的纯文本文件
:param txt_path: 纯文本文件的路径
:param txt_string: PDF公告的纯文本字符串
:return: 布尔值,是否写出成功
'''
try:
with open(txt_path, "w", encoding='utf-8') as f:
f.write(txt_string)
# txt_string_split = txt_string.split('\n')
# with open(txt_path, "w", encoding='utf-8') as f:
# for string in txt_string_split:
# if string != '':
# f.write('^' + string + '$\n')
except:
return False
return True
def get_txt_from_pdf(pdf_path, docx_path=''):
'''
给定一个PDF格式的公告文件,将其转化为格式化的TXT文本字符串
:param pdf_path: 一个字符串,PDF文件的路径地址
:return: 一个字符串,PDF经转换后的纯文本(已格式化,前部正文,后部表格)
'''
txt_string = ''
ann_info_list = get_ann_info_from_pdf(pdf_path)
string_list, document = get_string_list_from_pdf_converted_docx(pdf_path, docx_path)
if ann_info_list != [] and string_list != [] and document is not None:
abscissa_dict = get_abscissa_dict_from_pdf(pdf_path)
min_abscissa_value = get_min_abscissa_value(abscissa_dict, len(string_list))
for i, val in enumerate(string_list):
if i > 10: break
if val.replace('\n', '')[-4:] == '有限公司': break
else: abscissa_dict[val] = min_abscissa_value
txt_list = []
for id, string in enumerate(string_list):
new_string = string.replace('\n', '').replace('\t', '').replace(' ', '').replace(' ', '').replace('', '').replace(',', '')
if (not (len(new_string) <= 3 and new_string.isdigit())) and string != '':
try:
if abscissa_dict[string] > min_abscissa_value:
txt_list.append(SEGMENT_SYMBOL + new_string)
else:
txt_list.append(new_string)
except:
txt_list.append(new_string)
txt_list = refine_txt_list(txt_list, ann_info_list)
if document is not None:
txt_list.append(SEGMENT_SYMBOL)
txt_list = append_table_from_docx(doc=document, txt=txt_list)
for val in txt_list:
txt_string += val
return txt_string