|
|
|
""" |
|
|
|
easyocr.py - helpers to convert PDFs to text via OCR (implemented with docTR models, despite the easyocr filename)
|
""" |
|
|
|
import logging |
|
from pathlib import Path |
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format="%(asctime)s %(levelname)s %(message)s", |
|
datefmt="%m/%d/%Y %I:%M:%S", |
|
) |
|
|
|
|
|
import gc |
|
import os |
|
import pprint as pp |
|
import re |
|
import shutil |
|
import time |
|
from datetime import datetime |
|
from os.path import basename, isfile, join |
|
from pathlib import Path |
|
import re |
|
import pandas as pd |
|
import wordninja |
|
from cleantext import clean |
|
from natsort import natsorted |
|
from tqdm.auto import tqdm |
|
|
|
from doctr.io import DocumentFile |
|
from doctr.models import ocr_predictor |
|
|
|
|
|
def fast_scandir(dirname):
    """Recursively collect the paths of every subdirectory under *dirname*.

    Returns a list of path strings: direct children first, then each
    child's subtree appended in turn.
    """
    found = [entry.path for entry in os.scandir(dirname) if entry.is_dir()]
    for child in tuple(found):
        found.extend(fast_scandir(child))
    return found
|
|
|
|
|
def create_folder(directory):
    """Create *directory* (including parents); a no-op if it already exists."""
    os.makedirs(directory, exist_ok=True)
|
|
|
|
|
def simple_rename(filepath, target_ext=".txt"):
    """
    simple_rename - build the output filename for the OCR'd version of a file.

    Args:
        filepath (str or Path): original file path
        target_ext (str, optional): extension for the new name. Defaults to ".txt".

    Returns:
        str: "OCR_<original stem><target_ext>"
    """
    _fp = Path(filepath)
    # fix: the original inserted a stray underscore before the extension,
    # producing names like "OCR_report_.txt"
    return f"OCR_{_fp.stem}{target_ext}"
|
|
|
|
|
def load_dir_files(directory, req_extension=".txt", return_type="list", verbose=False):
    """Recursively gather files under *directory* whose names end with
    *req_extension*, in natural-sort order.

    Args:
        directory: root directory to walk
        req_extension (str): required filename suffix ("" matches everything)
        return_type (str): "list" -> list of full paths; anything else ->
            dict mapping basename -> full path
        verbose (bool): print a preview of the matches
    """
    matches = []
    for root, _dirs, files in os.walk(directory):
        matches.extend(
            os.path.join(root, name)
            for name in files
            if name.endswith(req_extension)
        )

    matches = natsorted(matches)

    if verbose:
        print("A list of files in the {} directory are: \n".format(directory))
        if len(matches) < 10:
            pp.pprint(matches)
        else:
            pp.pprint(matches[:10])
            print("\n and more. There are a total of {} files".format(len(matches)))

    if return_type.lower() == "list":
        return matches

    if verbose:
        print("returning dictionary")
    return {basename(path): path for path in matches}
|
|
|
|
|
def corr(
    s: str,
    add_space_when_numerics=False,
    exceptions=["e.g.", "i.e.", "etc.", "cf.", "vs.", "p."],
) -> str:
    """Correct spacing artifacts in a string.

    Args:
        s (str): the string to correct
        add_space_when_numerics (bool, optional): add a space when a period
            sits between two digits, e.g. "5.73" -> "5. 73". Defaults to False.
        exceptions (list, optional): substrings restored after the spacing
            passes. Defaults to ['e.g.', 'i.e.', 'etc.', 'cf.', 'vs.', 'p.'].

    Returns:
        str: the corrected string
    """
    if add_space_when_numerics:
        s = re.sub(r"(\d)\.(\d)", r"\1. \2", s)

    # ordered spacing fixes: collapse whitespace runs, then remove stray
    # spaces around sentence punctuation, apostrophes, and commas
    spacing_fixes = (
        (r"\s+", " "),
        (r'\s([?.!"](?:\s|$))', r"\1"),
        (r"\s\'", "'"),
        (r"'\s", "'"),
        (r"\s,", ","),
    )
    for pattern, repl in spacing_fixes:
        s = re.sub(pattern, repl, s)

    # map each exception's whitespace-free form back to its canonical form
    for exc in exceptions:
        s = s.replace(re.sub(r"\s", "", exc), exc)

    return s
|
|
|
|
|
def is_this_needed_in_output(in_string):
    """Return True if *in_string* should be kept in the OCR output.

    Keeps alphanumeric text plus a small whitelist of characters
    (period, space, newline, hyphen); everything else is dropped.

    Args:
        in_string (str): candidate token/character

    Returns:
        bool: whether to keep it
    """
    # membership test replaces the original five-branch elif chain
    return in_string.isalnum() or in_string in {".", " ", "\n", "-"}
|
|
|
|
|
|
|
def cleantxt_wrap(ugly_text, txt_lan="en"):
    """
    cleantxt_wrap - thin wrapper around cleantext.clean() with aggressive
    normalization: lowercases, collapses line breaks, strips punctuation and
    currency symbols, and masks URLs/emails/phone numbers with placeholders.

    Args:
        ugly_text (str): raw text to normalize
        txt_lan (str, optional): language code passed to cleantext. Defaults to "en".

    Returns:
        str: the normalized text
    """
    cleaned_text = clean(
        ugly_text,
        fix_unicode=True,  # repair mojibake / broken unicode
        to_ascii=True,  # transliterate to closest ASCII
        lower=True,  # lowercase everything
        no_line_breaks=True,  # collapse text onto one line
        no_urls=True,
        no_emails=True,
        no_phone_numbers=True,
        no_numbers=False,  # keep numbers themselves
        no_digits=False,
        no_currency_symbols=True,
        no_punct=True,  # strip punctuation entirely
        replace_with_punct="",
        replace_with_url="<URL>",
        replace_with_email="<EMAIL>",
        replace_with_phone_number="<PHONE>",
        replace_with_number="<NUM>",
        replace_with_digit="0",
        replace_with_currency_symbol="<CUR>",
        lang=txt_lan,
    )

    return cleaned_text
|
|
|
|
|
def beautify_filename(
    filename, num_words=25, start_reverse=False, word_separator="_"
) -> str:
    """
    beautify_filename - produce a human-readable name from a filename by
    cleaning it and re-splitting it into words with wordninja.

    Args:
        filename (str): the filename to beautify
        num_words (int, optional): max number of words kept. Defaults to 25.
        start_reverse (bool, optional): keep the LAST num_words words
            instead of the first. Defaults to False.
        word_separator (str, optional): joiner between words. Defaults to "_".

    Returns:
        str: the beautified filename (no extension)
    """
    filename = str(filename)
    ext_idx = filename.rfind(".")
    # fix: the original used filename[:rfind(".")] unconditionally, which
    # silently chopped the last character when there was no extension
    current_name = filename[:ext_idx] if ext_idx != -1 else filename
    # guard against an empty stem before indexing [-1] (original crashed)
    if current_name and current_name[-1].isnumeric():
        # trailing digit: append "s" so wordninja treats it as a word boundary
        current_name = current_name + "s"
    clean_name = cleantxt_wrap(current_name)
    file_words = wordninja.split(clean_name)

    num_words = min(num_words, len(file_words))

    if start_reverse:
        t_file_words = file_words[-num_words:]
    else:
        t_file_words = file_words[:num_words]

    # fix: the original returned pretty_name[:-1], unconditionally truncating
    # the final character of the last word; only strip a trailing separator
    return word_separator.join(t_file_words).rstrip(word_separator)
|
|
|
|
|
def fix_punct_spaces(string):
    """
    fix_punct_spaces - normalize spacing around ?!., punctuation and remove
    spaces wrapped inside quote characters. E.g. "hello , there" -> "hello, there".

    Parameters
    ----------
    string : str, required, input string to be corrected

    Returns
    -------
    str, corrected string
    """
    punct_run = re.compile(r"\s*([?!.,]+(?:\s+[?!.,]+)*)\s*")

    def _collapse(match):
        # squeeze internal spaces out of the punctuation run, add one trailing space
        return match.group(1).replace(" ", "") + " "

    string = punct_run.sub(_collapse, string)
    string = string.replace(" ' ", "'").replace(' " ', '"')
    return string.strip()
|
|
|
|
|
def clean_OCR(ugly_text: str):
    """
    clean_OCR - clean raw OCR text: flatten whitespace, remove hyphenation
    artifacts, and normalize punctuation spacing.

    Parameters
    ----------
    ugly_text : str, required, input string to be cleaned

    Returns
    -------
    str, cleaned string
    """
    # flatten newlines and tabs to spaces, squeeze double spaces once
    flattened = ugly_text.replace("\n", " ").replace("\t", " ").replace("  ", " ")
    flattened = flattened.lstrip()
    # drop hyphenation leftovers from line wrapping ("exam- ple" -> "example")
    for artifact in ("- ", " -"):
        flattened = flattened.replace(artifact, "")
    return fix_punct_spaces(flattened)
|
|
|
|
|
import os |
|
import shutil |
|
from os.path import join |
|
|
|
|
|
|
|
|
|
def move2completed(from_dir, filename, new_folder="completed", verbose=False):
    """
    move2completed - move *filename* out of *from_dir* into a sub-folder
    (created on demand), i.e. <from_dir>/<new_folder>/<filename>.

    Args:
        from_dir (str): directory currently containing the file
        filename (str): name of the file to move
        new_folder (str, optional): destination sub-folder name. Defaults to "completed".
        verbose (bool, optional): print when the sub-folder is created.
    """
    old_filepath = join(from_dir, filename)

    new_filedirectory = join(from_dir, new_folder)

    if not os.path.isdir(new_filedirectory):
        # exist_ok avoids the check/create race of the original mkdir
        os.makedirs(new_filedirectory, exist_ok=True)
        if verbose:
            print("created new directory for files at: \n", new_filedirectory)
    new_filepath = join(new_filedirectory, filename)

    try:
        shutil.move(old_filepath, new_filepath)
        # lazy %-style args instead of eager .format()
        logging.info("successfully moved the file %s to */completed.", filename)
    except (OSError, shutil.Error):
        # narrow except: the original bare `except:` swallowed everything,
        # including KeyboardInterrupt; log the traceback for diagnosis
        logging.exception(
            "ERROR! unable to move file to \n%s. Please investigate", new_filepath
        )
|
|
|
|
|
"""### download files |
|
|
|
**old versions** |
|
""" |
|
|
|
import re |
|
|
|
|
|
def URL_string_filter(text):
    """Keep only filename-safe characters (ASCII letters, digits, '.' and '_')
    from *text*; everything else is dropped."""
    allowed = set(
        "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._"
    )
    return "".join(ch for ch in text if ch in allowed)
|
|
|
|
|
import shutil |
|
from datetime import datetime |
|
from os.path import getsize |
|
|
|
import requests |
|
|
|
|
|
|
|
|
|
def get_zip_URL(
    URLtoget, extract_loc=None, file_header="dropboxexport_", verbose=False
):
    """
    get_zip_URL - download a zip archive from *URLtoget*, extract it under the
    current working directory, then delete the downloaded archive.

    Args:
        URLtoget (str): URL of the zip file
        extract_loc (str, optional): folder name (under CWD) to extract into.
            Defaults to "dropbox_dl".
        file_header (str, optional): prefix for the saved archive filename.
        verbose (bool, optional): print size/progress details.

    Returns:
        Path: directory the archive was extracted into
    """
    r = requests.get(URLtoget, allow_redirects=True)
    # NOTE(review): the archive name is hard-coded rather than derived from
    # response headers — presumably a leftover; confirm against callers
    names = "my_file.zip"
    fixed_fnames = names.split(";")
    this_filename = file_header + URL_string_filter(fixed_fnames[0])

    if extract_loc is None:
        extract_loc = "dropbox_dl"
    dl_place = Path.cwd() / extract_loc
    create_folder(dl_place)
    save_loc = Path.cwd() / this_filename
    # context manager: the original `open(...).write(...)` leaked the handle
    with open(save_loc, "wb") as f:
        f.write(r.content)
    if verbose:
        print("downloaded file size was {} MB".format(getsize(save_loc) / 1000000))

    shutil.unpack_archive(save_loc, extract_dir=dl_place)
    if verbose:
        print("extracted zip file - ", datetime.now())
    # walk once so verbose mode lists the extracted contents
    load_dir_files(dl_place, req_extension="", verbose=verbose)

    try:
        os.remove(save_loc)
    except OSError:
        # narrow except; the original also passed datetime.now() as a stray
        # %-format argument, which broke the logging call entirely
        logging.info("unable to delete original zipfile - check if exists")
    print("finished extracting zip - ", datetime.now())

    return dl_place
|
|
|
|
|
"""--- |
|
|
|
**new versions** |
|
""" |
|
|
|
|
|
|
|
|
|
def clean_file_name(file_path):
    """Sanitize a filename: strip punctuation from the stem and replace each
    whitespace character with an underscore; the extension is kept as-is."""
    path_obj = Path(file_path)

    # drop everything that is neither a word character nor whitespace,
    # then turn the remaining whitespace into underscores
    sanitized_stem = re.sub(r"\s", "_", re.sub(r"[^\w\s]", "", path_obj.stem))
    return sanitized_stem + path_obj.suffix
|
|
|
|
|
def download_URL(url: str, file=None, dlpath=None, verbose=False):
    """
    download_URL - download a file from a URL and show progress bar

    Parameters
    ----------
    url : str, URL to download
    file : str, optional, default None, name of file to save to. If None, will use the filename from the URL
    dlpath : str, optional, default None, path to save the file to. If None, will save to the current working directory
    verbose : bool, optional, default False, print progress bar

    Returns
    -------
    str - path to the downloaded file
    """

    if file is None:
        if "?dl=" in url:
            # dropbox-style share link: strip the ?dl=0 / ?dl=1 suffix
            prefile = url.split("/")[-1]
            filename = str(prefile).split("?dl=")[0]
        else:
            filename = url.split("/")[-1]
        file = clean_file_name(filename)
    if dlpath is None:
        dlpath = Path.cwd()
    else:
        dlpath = Path(dlpath)
    r = requests.get(url, stream=True, allow_redirects=True)
    # fix: servers may omit Content-Length; the original crashed on int(None)
    total_size = int(r.headers.get("content-length", 0))
    initial_pos = 0
    dl_loc = dlpath / file
    with open(str(dl_loc.resolve()), "wb") as f:
        with tqdm(
            total=total_size,
            unit="B",
            unit_scale=True,
            desc=file,
            initial=initial_pos,
            ascii=True,
        ) as pbar:
            for ch in r.iter_content(chunk_size=1024):
                # skip keep-alive chunks, which arrive empty
                if ch:
                    f.write(ch)
                    pbar.update(len(ch))
    if verbose:
        print(f"\ndownloaded {file} to {dlpath}\n")
    return str(dl_loc.resolve())
|
|
|
|
|
"""## pdf2text functions |
|
|
|
- now uses **easyocr** |
|
- link to [docs](https://www.jaided.ai/easyocr/documentation/) |
|
- the [tutorial](https://www.jaided.ai/easyocr/tutorial/) |
|
- a list of available languages is [here](https://www.jaided.ai/easyocr/) |
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# literal substring -> replacement pairs for common OCR confusions
# (applied verbatim, in order, by postprocess())
custom_replace_list = {
    "t0": "to",
    "'$": "'s",
    ",,": ", ",
    "_ ": " ",
    " '": "'",
}

# spacing fixes for abbreviations that the corr() passes tend to mangle
# (also applied by postprocess())
replace_corr_exceptions = {
    "i. e.": "i.e.",
    "e. g.": "e.g.",
    "e. g": "e.g.",
    " ,": ",",
}
|
|
|
|
|
|
|
|
|
from spellchecker import SpellChecker |
|
|
|
# module-level SpellChecker instance shared by check_word_spelling()
spell = SpellChecker()
|
|
|
|
|
def check_word_spelling(word: str) -> bool:
    """
    check_word_spelling - check whether *word* is spelled correctly.

    Args:
        word (str): word to check

    Returns:
        bool: True if the spellchecker recognizes the word, False otherwise
    """
    # spell.unknown returns the subset of the given words it cannot find;
    # an empty result means the word is known
    return not spell.unknown([word])
|
|
|
|
|
def eval_and_replace(text: str, match_token: str = "- ") -> str:
    """
    eval_and_replace - conditionally remove every occurrence of *match_token*:
    the token is deleted outright when merging the surrounding word fragments
    yields a valid dictionary word (de-hyphenation), otherwise it is replaced
    by a single space.

    Args:
        text (str): text to evaluate
        match_token (str, optional): token to replace. Defaults to "- ".

    Returns:
        str: text with replaced tokens
    """
    while match_token in text:
        full_before_text, full_after_text = text.split(match_token, maxsplit=1)
        before_words = full_before_text.split()
        after_words = full_after_text.split()
        # fix: a token at the very start/end of the text has no word on one
        # side — the original raised IndexError here; just drop the token
        if not before_words or not after_words:
            text = full_before_text + " " + full_after_text
            continue
        # alphabetic cores of the fragments on each side of the token
        before_text = "".join(ch for ch in before_words[-1] if ch.isalpha())
        after_text = "".join(ch for ch in after_words[0] if ch.isalpha())
        if check_word_spelling(before_text + after_text):
            # merged fragments form a real word: join without a space
            text = full_before_text + full_after_text
        else:
            text = full_before_text + " " + full_after_text
    return text
|
|
|
|
|
def cleantxt_ocr(ugly_text):
    """
    cleantxt_ocr - normalize OCR output with cleantext.clean(). Unlike
    cleantxt_wrap, this keeps the original casing, punctuation, phone numbers
    and currency symbols so the text stays readable; only URLs and emails are
    masked with placeholders.

    Args:
        ugly_text (str): raw OCR text

    Returns:
        str: the normalized text
    """
    cleaned_text = clean(
        ugly_text,
        fix_unicode=True,  # repair mojibake / broken unicode
        to_ascii=True,  # transliterate to closest ASCII
        lower=False,  # keep original casing
        no_line_breaks=True,  # collapse text onto one line
        no_urls=True,
        no_emails=True,
        no_phone_numbers=False,
        no_numbers=False,
        no_digits=False,
        no_currency_symbols=False,
        no_punct=False,  # punctuation is preserved
        replace_with_punct="",
        replace_with_url="<URL>",
        replace_with_email="<EMAIL>",
        replace_with_phone_number="<PHONE>",
        replace_with_number="<NUM>",
        replace_with_digit="0",
        replace_with_currency_symbol="<CUR>",
        lang="en",
    )

    return cleaned_text
|
|
|
|
|
def format_ocr_out(OCR_data):
    """Coerce raw OCR output (a list of strings or anything str()-able) into
    one string, normalize it, and fix its spacing."""
    text = " ".join(OCR_data) if isinstance(OCR_data, list) else str(OCR_data)
    return corr(cleantxt_ocr(text))
|
|
|
|
|
def postprocess(text: str) -> str:
    """Final cleanup pass, to be used after the page texts are recombined:
    normalize, apply the OCR confusion fixes, re-normalize, restore mangled
    abbreviations, and resolve hyphenation tokens."""
    proc = corr(cleantxt_ocr(text))

    # literal OCR confusion fixes (e.g. "t0" -> "to")
    for wrong, right in custom_replace_list.items():
        proc = proc.replace(str(wrong), str(right))

    proc = corr(proc)

    # undo spacing damage to common abbreviations (e.g. "e. g." -> "e.g.")
    for wrong, right in replace_corr_exceptions.items():
        proc = proc.replace(str(wrong), str(right))

    return eval_and_replace(proc)
|
|
|
|
|
def result2text(result, as_text=False) -> "str | list":
    """Convert a docTR OCR result object to plain text.

    Args:
        result: OCR result exposing .pages -> .blocks -> .lines -> .words,
            where each word has a .value string
        as_text (bool, optional): if True return one newline-joined string,
            otherwise a list with one string per page. Defaults to False.

    Returns:
        str | list: newline-joined document text, or a list of page texts
    """
    # note: the original annotation `-> str or list` evaluated to just `str`
    full_doc = []
    for page in result.pages:  # the original enumerate index was never used
        text = ""
        for block in page.blocks:
            text += "\n\t"  # new block -> newline + indent
            for line in block.lines:
                for word in line.words:
                    text += word.value + " "
        full_doc.append(text)

    return "\n".join(full_doc) if as_text else full_doc
|
|
|
|
|
import warnings |
|
from datetime import date |
|
from os.path import join |
|
|
|
|
|
|
|
|
|
def convert_PDF_to_Text(
    PDF_file,
    ocr_model=None,
    max_pages: int = 20,
) -> dict:
    """
    convert_PDF_to_Text - run the full OCR pipeline on a PDF file.

    Args:
        PDF_file (str or Path): path to the PDF to convert
        ocr_model: a docTR OCR predictor; when None, a pretrained
            ocr_predictor is instantiated (slow on first call — downloads weights)
        max_pages (int, optional): pages beyond this are dropped. Defaults to 20.

    Returns:
        dict: {"num_pages", "runtime" (seconds), "date", "converted_text",
               "truncated" (bool), "length" (chars of converted text)}
    """
    st = time.perf_counter()
    PDF_file = Path(PDF_file)
    ocr_model = ocr_predictor(pretrained=True) if ocr_model is None else ocr_model
    logging.info(f"starting OCR on {PDF_file.name}")
    doc = DocumentFile.from_pdf(PDF_file)
    truncated = False
    # cap the page count to keep runtime bounded
    if len(doc) > max_pages:
        logging.warning(
            f"PDF has {len(doc)} pages, which is more than {max_pages}.. truncating"
        )
        doc = doc[:max_pages]
        truncated = True

    logging.info(f"running OCR on {len(doc)} pages")
    result = ocr_model(doc)
    # OCR result -> per-page raw text -> cleaned -> postprocessed
    raw_text = result2text(result)
    proc_text = [format_ocr_out(r) for r in raw_text]
    fin_text = [postprocess(t) for t in proc_text]

    ocr_results = "\n\n".join(fin_text)

    fn_rt = time.perf_counter() - st

    logging.info("OCR complete")

    results_dict = {
        "num_pages": len(doc),
        "runtime": round(fn_rt, 2),
        "date": str(date.today()),
        "converted_text": ocr_results,
        "truncated": truncated,
        "length": len(ocr_results),
    }

    return results_dict
|
|
|
|
|
from os.path import basename, dirname, join |
|
|
|
|
|
from libretranslatepy import LibreTranslateAPI |
|
|
|
# shared LibreTranslate client pointed at a public instance (network required)
lt = LibreTranslateAPI("https://translate.astian.org/")
|
|
|
|
|
def translate_text(text, source_l, target_l="en"):
    """Translate *text* from *source_l* to *target_l* via the shared
    LibreTranslate client; the result is coerced to str."""
    translated = lt.translate(text, source_l, target_l)
    return str(translated)
|
|
|
|
|
def translate_doc(filepath, lang_start, lang_end="en", verbose=False):
    """translate a document from lang_start to lang_end, line by line, and
    write the result into a "translated to <lang>" sub-folder.

    Example language codes:
    {'code': 'en', 'name': 'English'},
    {'code': 'fr', 'name': 'French'},
    {'code': 'de', 'name': 'German'},
    {'code': 'it', 'name': 'Italian'},

    Args:
        filepath (str): path of the text file to translate
        lang_start (str): source language code
        lang_end (str, optional): target language code. Defaults to "en".
        verbose (bool, optional): print a completion message.

    Returns:
        str: path of the translated output file
    """

    src_folder = dirname(filepath)
    trgt_folder = join(src_folder, "translated to {}".format(lang_end))
    create_folder(trgt_folder)
    with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
        foreign_t = f.readlines()
    in_name = basename(filepath)
    translated_doc = []
    for line in tqdm(
        foreign_t, total=len(foreign_t), desc="translating {}...".format(in_name[:10])
    ):
        translated_line = translate_text(line, lang_start, lang_end)
        translated_doc.append(translated_line)
    # fix: simple_rename already appends the target extension; the original
    # concatenated a second ".txt", producing doubled extensions
    t_out_name = "[To {}]".format(lang_end) + simple_rename(in_name)
    out_path = join(trgt_folder, t_out_name)
    with open(out_path, "w", encoding="utf-8", errors="ignore") as f_o:
        f_o.writelines(translated_doc)
    if verbose:
        print("finished translating the document! - ", datetime.now())
    return out_path
|
|
|
|
|
"""translation codes |
|
|
|
|
|
``` |
|
|
|
|
|
print(lt.languages()) |
|
call ^ |
|
``` |
|
|
|
- link to their github [here](https://github.com/argosopentech/LibreTranslate-py) |
|
|
|
# Load Files
|
""" |
|
|