|
|
|
|
|
""" |
|
Created by Shengbo.Zhang on 2021/10/08 |
|
""" |
|
|
|
|
|
import os |
|
import re |
|
import logging |
|
import pdfplumber |
|
from docx import Document |
|
from Pdf2Txt.config import * |
|
from pdf2docx import Converter |
|
from collections import Counter |
|
from Pdf2Txt.config import _check_ann_title_processable |
|
|
|
|
|
|
|
# Suppress every log record of severity WARNING and below for the whole
# process. logging.disable(level) already covers all lower levels, so a
# separate call for INFO is not needed.
logging.disable(logging.WARNING)
|
|
|
|
|
def get_string_and_abscissa_list_from_pdf(pdf_path):
    '''
    Read the body text of a PDF line by line, skipping everything that lies
    inside a detected table, and record the left x-coordinate of each line.

    :param pdf_path: str, path of the PDF file
    :return: list of [text, x] pairs, one per visual text line. ``text`` is the
             concatenation of all words on the line (whitespace and commas
             removed) and ``x`` is the integer x0 of the first word on the line
    '''
    # Each entry is [text, x0, top] for one extracted word.
    words = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            bboxes = [table.bbox for table in page.find_tables()]

            def _not_within_bboxes(obj):
                # Keep an object only if its centre point is outside every table box.
                def _obj_in_bbox(_bbox):
                    v_mid = (obj["top"] + obj["bottom"]) / 2
                    h_mid = (obj["x0"] + obj["x1"]) / 2
                    x0, top, x1, bottom = _bbox
                    return (x0 <= h_mid < x1) and (top <= v_mid < bottom)
                return not any(_obj_in_bbox(__bbox) for __bbox in bboxes)

            for item in page.filter(_not_within_bboxes).extract_words():
                # NOTE(review): the two space literals below look identical here;
                # presumably one of them is a non-ASCII space - confirm.
                text = item['text'].replace('\n', '').replace('\t', '').replace(' ', '').replace(' ', '').replace(',', '')
                if text == '':
                    continue
                # int() truncates toward zero exactly like cutting the string at
                # the decimal point, but also copes with values that print in
                # scientific notation (e.g. 5e-05).
                words.append([text, int(item['x0']), int(item['top'])])

    # Words whose vertical position differs by at most 3 units from the previous
    # word are treated as sitting on the same line: snap them to the same "top".
    # The snapped value propagates, so a slowly drifting baseline stays one line.
    for idx in range(len(words) - 1):
        if abs(words[idx + 1][2] - words[idx][2]) <= 3:
            words[idx + 1][2] = words[idx][2]

    # Merge runs of consecutive words sharing the same "top" into one line,
    # keeping the x0 of the first word of the run.
    lines = []
    for text, x0, top in words:
        if lines and lines[-1][2] == top:
            lines[-1][0] += text
        else:
            lines.append([text, x0, top])

    return [[text, x0] for text, x0, _ in lines]
|
|
|
|
|
def get_ann_info_from_pdf(pdf_path):
    '''
    Fetch the raw header lines of a PDF announcement.

    The first 10 text lines of the first page are returned; they may contain
    lines that are not part of the header, which refine_txt_list() filters
    later.

    :param pdf_path: str, path of the PDF file
    :return: list of str with at most 10 lines (e.g. security code, short name,
             announcement number); an empty list if the PDF cannot be read or
             its first page yields no text
    '''
    try:
        with pdfplumber.open(pdf_path) as pdf:
            first_page_text = pdf.pages[0].extract_text()
    except Exception:
        # Best effort: an unreadable PDF simply has no header information.
        return []
    if first_page_text is None:
        return []
    return first_page_text.split('\n')[:10]
|
|
|
|
|
def get_document_from_pdf_converted_docx(pdf_path, docx_path):
    '''
    Convert a PDF to an intermediate Docx file, load it, and delete the file.

    :param pdf_path: str, path of the PDF file
    :param docx_path: str, path for the intermediate Docx file; pass '' to place
                      it next to the PDF as "<pdf name>_<TEMP_DOCX_SUFFIX>.docx"
    :return: a Document instance loaded from the converted file, or None if the
             conversion failed
    '''
    if docx_path == '':
        pdf_stem = os.path.splitext(os.path.basename(pdf_path))[0]
        # os.path.join keeps the path relative when pdf_path has no directory part.
        output_docx_file_path = os.path.join(
            os.path.dirname(pdf_path), f"{pdf_stem}_{TEMP_DOCX_SUFFIX}.docx")
    else:
        output_docx_file_path = docx_path

    document = None
    try:
        if get_docx_from_pdf(pdf_path=pdf_path, out_path=output_docx_file_path):
            document = Document(output_docx_file_path)
    finally:
        # The Docx file is only an intermediate artefact: remove it even when
        # the conversion or the loading step failed.
        if os.path.exists(output_docx_file_path):
            os.remove(output_docx_file_path)
    return document
|
|
|
|
|
def get_min_abscissa_value(abscissa_list, string_list_length):
    '''
    Determine the left margin (x-coordinate) of the body text.

    The result is the smallest x-coordinate that occurs at least
    ``string_list_length // 4`` times; if no coordinate is that frequent, the
    overall minimum is returned.

    :param abscissa_list: list of int, the starting x-coordinate of every text line
    :param string_list_length: int, total number of text lines
    :return: int, the x-coordinate taken as the left margin of the body text
    '''
    occurrences = Counter(abscissa_list)
    required = string_list_length // 4
    fallback = min(abscissa_list)
    frequent_enough = (x for x in sorted(occurrences) if occurrences[x] >= required)
    return next(frequent_enough, fallback)
|
|
|
|
|
def refine_txt_list(txt, ann_info, string_abscissa_dict):
    '''
    Apply the final formatting pass to the body-text lines of a PDF announcement.

    The list ``txt`` is modified in place and is also returned.

    :param txt: list of str, the body-text lines of the PDF (already pre-processed)
    :param ann_info: list of str, the raw header lines of the announcement
    :param string_abscissa_dict: dict mapping a line's text (without SEGMENT_SYMBOL)
                                 to its x-coordinate; read while handling bullet items
    :return: the refined list ``txt``
    '''
    # NOTE(review): the nesting of this function was reconstructed from a copy
    # whose indentation had been lost - confirm against the original layout.

    # Step 1: rebuild the header lines from ann_info and place them at the top of txt.
    if ann_info != []:
        new_ann_info_list = []
        # Skip blank lines; stop at the first line ending in '有限公司' ("Co., Ltd.");
        # otherwise collapse whitespace runs to single spaces and append SEGMENT_SYMBOL.
        for i, val in enumerate(ann_info):
            if val.strip() == '': continue
            if val.strip()[-4:] == '有限公司': break
            else: new_ann_info_list.append(' '.join(val.split()) + SEGMENT_SYMBOL)
        # The last header line carries no SEGMENT_SYMBOL.
        if new_ann_info_list != []:
            new_ann_info_list[-1] = new_ann_info_list[-1].replace(SEGMENT_SYMBOL, '')
        # If txt already starts with the '有限公司' line, make room for the header
        # lines by inserting one empty slot per header line.
        if txt[0].strip()[-4:] == '有限公司':
            for i in range(len(new_ann_info_list)):
                txt.insert(0, '')
        # Overwrite the leading entries of txt with the normalised header lines.
        for i, val in enumerate(new_ann_info_list):
            txt[i] = val

    # Step 2: within the first 11 lines, fix the segmentation around the
    # '有限公司' line, the announcement title and the '本公司' ("the Company") line.
    for i, val in enumerate(txt):
        if i > 10: break
        else:
            val = val.strip()
            # A line accepted by _check_ann_title_processable starts a new segment.
            if _check_ann_title_processable(val):
                if SEGMENT_SYMBOL not in val:
                    txt[i] = (SEGMENT_SYMBOL + val)
            if val[-4:] == '有限公司':
                if SEGMENT_SYMBOL not in txt[i]:
                    txt[i] = (SEGMENT_SYMBOL + val)
                # Case A: the accepted title line directly follows; it loses its
                # SEGMENT_SYMBOL.
                if _check_ann_title_processable(txt[i+1]):
                    txt[i+1] = txt[i+1].replace(SEGMENT_SYMBOL, '')
                    # A following line starting with '本公司' opens a new segment
                    # and the line after it loses its SEGMENT_SYMBOL.
                    if txt[i+2].replace(SEGMENT_SYMBOL, '')[:3] == '本公司':
                        if SEGMENT_SYMBOL not in txt[i+2]:
                            txt[i+2] = (SEGMENT_SYMBOL + txt[i+2])
                        txt[i+3] = txt[i+3].replace(SEGMENT_SYMBOL, '')
                    break
                # Case B: the accepted title line is two lines further down; the
                # same handling applies, shifted by one line.
                if _check_ann_title_processable(txt[i+2]):
                    txt[i+1] = txt[i+1].replace(SEGMENT_SYMBOL, '')
                    txt[i+2] = txt[i+2].replace(SEGMENT_SYMBOL, '')
                    if txt[i+3].replace(SEGMENT_SYMBOL, '')[:3] == '本公司':
                        if SEGMENT_SYMBOL not in txt[i+3]:
                            txt[i+3] = (SEGMENT_SYMBOL + txt[i+3])
                        txt[i+4] = txt[i+4].replace(SEGMENT_SYMBOL, '')
                    break

    # Step 3: line-by-line clean-up of the whole text.
    for i, _ in enumerate(txt):

        # Lines that start with a list number open a new segment.
        if (SEGMENT_SYMBOL not in txt[i]):
            # 1 means "pattern not matched"; re.match anchors at position 0, so a
            # successful match stores 0.
            match_check = [1, 1, 1, 1, 1]

            # Chinese numeral followed by '、'
            match_1 = re.match('[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、', txt[i])

            # Arabic number followed by '、'
            match_2 = re.match('[0-9]{1,2}、', txt[i])

            # Arabic number followed by '.'
            match_3 = re.match('[0-9]{1,2}\.', txt[i])

            # Chinese numeral in parentheses
            match_4 = re.match('[\(\(]+[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}[\)\)]+', txt[i])

            # Arabic number in parentheses
            match_5 = re.match('[\(\(]+[0-9]{1,2}[\)\)]+', txt[i])
            if match_1: match_check[0] = match_1.start()
            if match_2: match_check[1] = match_2.start()
            if match_3: match_check[2] = match_3.start()
            if match_4: match_check[3] = match_4.start()
            if match_5: match_check[4] = match_5.start()
            if 0 in match_check:
                txt[i] = SEGMENT_SYMBOL + txt[i]

        # A line starting with '重要内容提示' ("important notice") becomes its own
        # segment: SEGMENT_SYMBOL before it and, unless it ends in '\n', after it.
        if ('重要内容提示' in txt[i]) and (SEGMENT_SYMBOL not in txt[i]) and (txt[i].index('重要内容提示') == 0):
            txt[i] = SEGMENT_SYMBOL + txt[i]
            if txt[i][-1] != '\n':
                txt[i] += SEGMENT_SYMBOL

        # Drop stand-alone unit captions ("unit: yuan" / "unit: RMB yuan").
        if (txt[i] == '单位:元') or (txt[i] == SEGMENT_SYMBOL + '单位:元'):
            txt[i] = ''
        if (txt[i] == '单位:人民币元') or (txt[i] == SEGMENT_SYMBOL + '单位:人民币元'):
            txt[i] = ''

        # Same treatment as above for '特别提示' ("special notice").
        if ('特别提示' in txt[i]) and (SEGMENT_SYMBOL not in txt[i]) and (txt[i].index('特别提示') == 0):
            txt[i] = SEGMENT_SYMBOL + txt[i]
            if txt[i][-1] != '\n':
                txt[i] += SEGMENT_SYMBOL

        # Same treatment for '特此公告' ("hereby announced").
        if ('特此公告' in txt[i]) and (SEGMENT_SYMBOL not in txt[i]) and (txt[i].index('特此公告') == 0):
            txt[i] = SEGMENT_SYMBOL + txt[i]
            if txt[i][-1] != '\n':
                txt[i] += SEGMENT_SYMBOL

        # A line starting with '附件' ("attachment") plus an optional number and
        # ':' opens a new segment.
        match_6 = re.match('附件[0-9]{0,2}:', txt[i])
        if match_6:
            if match_6.start() == 0:
                txt[i] = SEGMENT_SYMBOL + txt[i]

        # Blank a line that is identical to the line following it.
        if (i+1) < len(txt) and (txt[i] == txt[i+1]):
            txt[i] = ''

        # Bullet items: remove the bullet glyph, then strip SEGMENT_SYMBOL from the
        # following lines until the next bullet line or until the next line has a
        # smaller x-coordinate than the current one.
        # NOTE(review): the first literal in these tests is an empty string as
        # written here, which makes both conditions always true; presumably a
        # special bullet glyph was lost - confirm against the original file.
        if ('' in txt[i]) or ('●' in txt[i]):
            txt[i] = txt[i].replace('', '').replace('●', '')
            for idx in range(i+1, len(txt)-1):
                if ('' in txt[idx]) or ('●' in txt[idx]):
                    break
                txt[idx] = txt[idx].replace(SEGMENT_SYMBOL, '')
                if string_abscissa_dict[txt[idx+1].replace(SEGMENT_SYMBOL, '')] < string_abscissa_dict[txt[idx].replace(SEGMENT_SYMBOL, '')]:
                    break

        # Blank any later line whose text (ignoring SEGMENT_SYMBOL and spaces) is
        # contained in the first line - presumably a repeated page header; confirm.
        if i != 0 and txt[i].replace(SEGMENT_SYMBOL, '').replace(' ', '') in txt[0].replace(SEGMENT_SYMBOL, '').replace(' ', ''):
            txt[i] = ''

        # Blank lines that start like a page number: "3/12", "第3页" or "-3-".
        if (re.match('^[0-9]{1,2}/[0-9]{1,2}', txt[i].strip().replace('', ''))) or \
                (re.match('^第[0-9]{1,2}页', txt[i].strip().replace('', ''))) or \
                (re.match(r'^-[0-9]{1,2}-', txt[i].strip().replace('', ''))):
            txt[i] = ''
    return txt
|
|
|
|
|
def get_docx_from_pdf(pdf_path, out_path):
    '''
    Convert a PDF file to Docx format and store the result on disk.

    :param pdf_path: full path of the input PDF announcement file
    :param out_path: full path of the intermediate Docx file to write
    :return: bool, True only if the conversion raised no exception and every
             page of the converter is marked as finalized
    '''
    cv = Converter(pdf_path)
    try:
        cv.convert(out_path)
    except Exception:
        # Any conversion error is reported as a plain failure.
        return False
    else:
        return all(p.finalized for p in cv.pages)
    finally:
        # Release the converter on every path, including unexpected errors
        # while inspecting the pages.
        cv.close()
|
|
|
|
|
def _get_table_row_feat(str):
    '''
    Compute the numeric/non-numeric signature of a whitespace-separated table row.

    :param str: the row as a string, cells separated by whitespace
    :return: a string with one character per cell: '1' if float() can parse the
             cell, '0' otherwise
    '''
    def _cell_flag(cell):
        # '1' for anything float() accepts, '0' for everything else.
        try:
            float(cell)
        except Exception:
            return '0'
        return '1'

    return ''.join(_cell_flag(cell) for cell in str.split())
|
|
|
|
|
def append_table_from_docx(doc, txt):
    '''
    Read every table of a Docx document, format its rows and append them to the
    text list of the PDF.

    ``txt`` is modified in place (rows are appended; a statement row may be
    inserted near the top) and is also returned.

    :param doc: a Document instance
    :param txt: list of str, the body-text lines of the PDF
    :return: the list ``txt`` with the table lines appended
    '''
    # NOTE(review): the nesting of this function was reconstructed from a copy
    # whose indentation had been lost - confirm against the original layout.

    # NOTE(review): `data` is filled below but never read or returned here.
    data = []
    table_txt = []
    # Provisional table marker; the final loop turns it into TABLE_SYMBOL + '\n'.
    table_tag = '-' + TABLE_SYMBOL + '-'
    for table in doc.tables[:]:
        # Every table is framed by a marker line before and after its rows.
        table_txt.append(f'{table_tag}\n')
        for i, row in enumerate(table.rows[:]):
            row_content = []
            for cell in row.cells[:]:
                c = cell.text
                # Remove line breaks, spaces, tabs and commas inside a cell.
                new_c = c.replace('\n', '').replace(' ','').replace('\t','').replace(',','')
                row_content.append(new_c)
            if row_content == []: continue
            # A row whose first cell mentions '本公司' ("the Company") is not kept
            # as a table row.
            if '本公司' in row_content[0]:
                # Only move it into the text if none of the first 10 text lines
                # already contains '本公司'.
                local_flag = True
                for val in txt[:10]:
                    if '本公司' in val:
                        local_flag = False
                        break
                if local_flag:
                    tmp = SEGMENT_SYMBOL
                    for line in row_content:
                        tmp += line.strip()
                    # Put a SEGMENT_SYMBOL right after '特别提示' ("special notice").
                    if '特别提示' in tmp:
                        tmp = tmp[:tmp.index('特别提示')+4]+SEGMENT_SYMBOL+tmp[tmp.index('特别提示')+4:]
                    # Insert the row text after the first line (among the first 11)
                    # accepted by _check_ann_title_processable.
                    for id, val in enumerate(txt):
                        if id > 10: break
                        else:
                            if _check_ann_title_processable(val):
                                txt.insert(id+1, tmp)
                                break
                continue
            # Rows starting with '证券代码' ("security code") are dropped.
            if '证券代码' in row_content[0]:
                continue
            data.append(row_content)
            # Rows are temporarily wrapped as '^...$\n'; a row whose cells are all
            # empty collapses to '^$\n' and is not added.
            new_row = '^' + TABLE_CELL_SYMBOL.join(row_content) + '$\n'
            if new_row.replace(TABLE_CELL_SYMBOL, '') != '^$\n':
                table_txt.append(new_row)
        data.append(f'{table_tag}\n')
        table_txt.append(f'{table_tag}\n')
    # Collapse runs of consecutive marker lines into one: every repeated marker
    # is replaced by the '^$\n' placeholder, which is filtered out right after.
    flag = False
    for i, val in enumerate(table_txt):
        if val == f'{table_tag}\n':
            if not flag:
                flag = True
            else:
                table_txt[i] = '^$\n'
        else:
            flag = False
    table_txt = list(filter(lambda x: x != '^$\n', table_txt))
    # An inner marker is removed when the rows on both sides have the same
    # numeric/non-numeric signature, which joins the two tables.
    for i, val in enumerate(table_txt):
        if val == f'{table_tag}\n' and (i > 0) and (i < len(table_txt)-1):
            feat1 = _get_table_row_feat(table_txt[i-1].replace('\n', ''))
            feat2 = _get_table_row_feat(table_txt[i+1].replace('\n', ''))
            if feat1 == feat2:
                table_txt[i] = '^$\n'
    # A lone marker with no rows is discarded as well.
    if len(table_txt) == 1 and table_txt[0] == f'{table_tag}\n':
        table_txt[0] = '^$\n'
    # Final clean-up of the temporary wrappers.
    for i, val in enumerate(table_txt):
        # Marker entries were stored with a trailing '\n', so they do not compare
        # equal to table_tag here and fall through to the slicing below.
        if val == table_tag:
            continue
        if val == '^$\n':
            table_txt[i] = ''
            continue
        # Strip the first character and the last two: '^row$\n' becomes 'row\n'
        # and the marker '-' + TABLE_SYMBOL + '-\n' becomes TABLE_SYMBOL + '\n'.
        table_txt[i] = val[1:][:-2] + '\n'
    txt.extend(table_txt)
    return txt
|
|
|
|
|
def output_txt_string(txt_path, txt_string):
    '''
    Write the formatted text of a PDF announcement to a UTF-8 plain-text file.

    :param txt_path: path of the text file to write (an existing file is overwritten)
    :param txt_string: the plain-text string of the PDF announcement
    :return: bool, True if the file was written, False if opening or writing failed
    '''
    try:
        with open(txt_path, "w", encoding='utf-8') as f:
            f.write(txt_string)
    except Exception:
        # Best effort: report the failure through the return value. Unlike a
        # bare "except:", this lets KeyboardInterrupt/SystemExit propagate.
        return False
    return True
|
|
|
|
|
def _unique_cells(row_text):
    '''Split a table row into cells and return the distinct non-empty cells in first-seen order.'''
    return [cell for cell in dict.fromkeys(row_text.split(TABLE_CELL_SYMBOL)) if cell]


def refine_table_txt(txt):
    '''
    Post-process the table part of the text list: remove pseudo tables and merge
    table headers that are spread over several rows.

    The table part starts at the first entry equal to TABLE_SYMBOL + '\\n'; all
    entries before it are copied to the result unchanged. If there is no such
    entry, the result is a plain copy of ``txt``.

    :param txt: list of str, body-text lines followed by the appended table lines
    :return: a new list of str
    '''
    table_marker = f'{TABLE_SYMBOL}\n'

    # Split txt at the first table marker. table_txt stays empty when there is
    # no marker, so no body line is taken a second time as "table" content.
    new_txt_list = []
    table_txt = []
    for i, line in enumerate(txt):
        if line == table_marker:
            table_txt = txt[i:]
            break
        new_txt_list.append(line)

    # Pass 1: clear rows that are not real table content.
    table_txt = list(filter(None, table_txt))
    for i, _ in enumerate(table_txt):
        if table_txt[i] == table_marker and i + 2 < len(table_txt):
            pre_cut = table_txt[i + 1].split(TABLE_CELL_SYMBOL)
            # A first row with a single cell (or an empty cell plus one more).
            if (len(pre_cut) == 1) or (len(pre_cut) == 2 and pre_cut[0] == ''):
                table_txt[i + 1] = ''
            # A first row containing '公司及董事会' ("the Company and the Board").
            if '公司及董事会' in table_txt[i + 1]:
                table_txt[i + 1] = ''
            # The table has a single row: drop this marker and move the row
            # behind the next marker so that it becomes part of the next table.
            if table_txt[i + 2] == table_marker:
                table_txt[i] = ''
                table_txt[i + 2] = table_txt[i + 1]
                table_txt[i + 1] = table_marker

    # Pass 2: merge multi-row headers.
    table_txt = list(filter(None, table_txt))
    for i, _ in enumerate(table_txt):
        if table_txt[i] == table_marker and i + 2 < len(table_txt):
            # Voting tables: a header in which '同意' (for) occurs twice gets the
            # suffixes '票数' (votes) / '比例' (ratio) on each pair of cells for
            # '同意', '反对' (against) and '弃权' (abstain); the second header row
            # is dropped.
            if '同意' in table_txt[i + 1] and table_txt[i + 1].count('同意') == 2:
                cut = table_txt[i + 1].split(TABLE_CELL_SYMBOL)
                for k, val in enumerate(cut):
                    if val in ('同意', '反对', '弃权'):
                        cut[k] += '票数'
                        # Guard against a label sitting in the last cell.
                        if k + 1 < len(cut):
                            cut[k + 1] += '比例'
                table_txt[i + 1] = TABLE_CELL_SYMBOL.join(cut).replace(SEGMENT_SYMBOL, '')+SEGMENT_SYMBOL
                table_txt[i + 2] = ''
                continue

            set_cut1 = _unique_cells(table_txt[i + 1])
            set_cut2 = _unique_cells(table_txt[i + 2])

            # Two-row header: once a pair of cells at the same position is equal,
            # the following iteration picks the longer of the two rows as header.
            head_cut = []
            counter = 0
            for val1, val2 in zip(set_cut1, set_cut2):
                if counter:
                    if len(set_cut1) > len(set_cut2):
                        head_cut = set_cut1
                    else:
                        head_cut = set_cut2
                    break
                if val1 == val2:
                    counter += 1
            if counter and head_cut:
                table_txt[i + 1] = TABLE_CELL_SYMBOL.join(head_cut)
                table_txt[i + 2] = ''

            # Four-row header: only tried when rows 1 and 2 shared a cell and
            # row 3 contains no cell that looks like a number.
            if counter:
                if i+4 < len(table_txt):
                    set_cut3 = _unique_cells(table_txt[i + 3])

                    flag = False
                    for val3 in set_cut3:
                        if re.match(r'^[0-9]+(|.)[0-9]+(|%)$', val3):
                            flag = True
                            break

                    if not flag:
                        set_cut4 = _unique_cells(table_txt[i + 4])

                        counter_2 = 0
                        for val3, val4 in zip(set_cut3, set_cut4):
                            if counter_2:
                                if len(set_cut4) > len(set_cut3):
                                    head_cut = set_cut4
                                else:
                                    head_cut = set_cut3
                                break
                            if val3 == val4:
                                counter_2 += 1
                        if counter_2 and head_cut:
                            table_txt[i + 1] = TABLE_CELL_SYMBOL.join(head_cut)
                            table_txt[i + 2] = ''
                            table_txt[i + 3] = ''
                            table_txt[i + 4] = ''

    new_txt_list.extend(table_txt)
    return new_txt_list
|
|
|
|
|
def get_txt_from_pdf(pdf_path, docx_path=''):
    '''
    Convert a PDF announcement file into one formatted plain-text string.

    :param pdf_path: str, path of the PDF file
    :param docx_path: str, path for the intermediate Docx file ('' selects the default)
    :return: str, the formatted text (body text first, tables afterwards); an
             empty string if the header lines, the body lines or the converted
             document could not be obtained
    '''
    ann_info_list = get_ann_info_from_pdf(pdf_path)
    string_abscissa_list = get_string_and_abscissa_list_from_pdf(pdf_path)
    document = get_document_from_pdf_converted_docx(pdf_path, docx_path)

    # All three inputs are required; otherwise there is nothing to format.
    if ann_info_list == [] or string_abscissa_list == [] or document is None:
        return ''

    left_margin = get_min_abscissa_value(
        [entry[1] for entry in string_abscissa_list], len(string_abscissa_list))

    # Among the first 11 lines, every line before the one ending in '有限公司'
    # is aligned to the left margin.
    for entry in string_abscissa_list[:11]:
        if entry[0].replace('\n', '')[-4:] == '有限公司':
            break
        entry[1] = left_margin

    line_to_abscissa = {}
    body_lines = []
    for entry in string_abscissa_list:
        text, abscissa = entry[0], entry[1]
        # Digit-only lines of at most three characters are skipped.
        if len(text) <= 3 and text.isdigit():
            continue
        line_to_abscissa[text] = abscissa
        # A line starting more than 8 units right of the margin opens a new segment.
        starts_segment = abscissa > left_margin and abs(abscissa - left_margin) > 8
        body_lines.append(SEGMENT_SYMBOL + text if starts_segment else text)

    body_lines = refine_txt_list(body_lines, ann_info_list, line_to_abscissa)

    # Separate the body text from the tables that follow it.
    body_lines.append(SEGMENT_SYMBOL)
    body_lines = append_table_from_docx(doc=document, txt=body_lines)

    return ''.join(refine_table_txt(body_lines))