# Spaces:
# Running
# Running
# -*- coding: utf-8 -*- | |
""" | |
easyocr.py - A wrapper for easyocr to convert pdf to images to text | |
""" | |
import logging | |
from pathlib import Path | |
# Configure root logging early, before the heavyweight imports below run,
# so their import-time messages are formatted consistently.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%m/%d/%Y %I:%M:%S",
)
import gc | |
import os | |
import pprint as pp | |
import re | |
import shutil | |
import time | |
from datetime import datetime | |
from os.path import basename, isfile, join | |
from pathlib import Path | |
import re | |
import pandas as pd | |
import wordninja | |
from cleantext import clean | |
from natsort import natsorted | |
from tqdm.auto import tqdm | |
from doctr.io import DocumentFile | |
from doctr.models import ocr_predictor | |
def fast_scandir(dirname):
    """Recursively collect the paths of all subdirectories under *dirname*.

    Returns a flat list of directory paths (depth-first order).
    """
    found = [entry.path for entry in os.scandir(dirname) if entry.is_dir()]
    # iterate over a snapshot so we can extend the same list while walking
    for sub in list(found):
        found.extend(fast_scandir(sub))
    return found
def create_folder(directory):
    """Ensure *directory* exists, creating parent directories as needed."""
    Path(directory).mkdir(parents=True, exist_ok=True)
def simple_rename(filepath, target_ext=".txt"):
    """Build an OCR output filename from *filepath*'s stem.

    Note: the stem is followed by an underscore and then *target_ext*
    (e.g. "report.pdf" -> "OCR_report_.txt").
    """
    stem = Path(filepath).stem
    return "OCR_" + stem + "_" + target_ext
def load_dir_files(directory, req_extension=".txt", return_type="list", verbose=False):
    """Collect files under *directory* whose names end with *req_extension*.

    Results are naturally sorted. Returns a list of full paths, or a
    {basename: full_path} dict when return_type is not "list".
    """
    matches = []
    for root, _dirs, files in os.walk(directory):
        matches.extend(
            os.path.join(root, name) for name in files if name.endswith(req_extension)
        )
    matches = natsorted(matches)
    if verbose:
        print("A list of files in the {} directory are: \n".format(directory))
        if len(matches) < 10:
            pp.pprint(matches)
        else:
            pp.pprint(matches[:10])
            print("\n and more. There are a total of {} files".format(len(matches)))
    if return_type.lower() == "list":
        return matches
    if verbose:
        print("returning dictionary")
    return {basename(path): path for path in matches}
def corr(
    s: str,
    add_space_when_numerics=False,
    exceptions=["e.g.", "i.e.", "etc.", "cf.", "vs.", "p."],
) -> str:
    """Normalize spacing in a string.

    Args:
        s (str): the string to correct
        add_space_when_numerics (bool, optional): insert a space after a period
            that sits between two digits (e.g. "5.73" -> "5. 73"). Defaults to False.
        exceptions (list, optional): substrings to restore untouched.
            Defaults to ['e.g.', 'i.e.', 'etc.', 'cf.', 'vs.', 'p.'].

    Returns:
        str: the corrected string
    """
    if add_space_when_numerics:
        s = re.sub(r"(\d)\.(\d)", r"\1. \2", s)
    # applied in order: collapse whitespace, drop spaces before sentence
    # punctuation, tighten apostrophes, drop spaces before commas
    substitutions = (
        (r"\s+", " "),
        (r'\s([?.!"](?:\s|$))', r"\1"),
        (r"\s\'", "'"),
        (r"'\s", "'"),
        (r"\s,", ","),
    )
    for pattern, repl in substitutions:
        s = re.sub(pattern, repl, s)
    # re-insert protected abbreviations in their canonical spelling
    for exc in exceptions:
        s = s.replace(re.sub(r"\s", "", exc), exc)
    return s
def is_this_needed_in_output(in_string):
    """Return True for tokens worth keeping in OCR output.

    Keeps alphanumeric strings and the literal characters '.', ' ',
    newline, and '-'; everything else is rejected.
    """
    return in_string.isalnum() or in_string in {".", " ", "\n", "-"}
# @title clean filenames | |
def cleantxt_wrap(ugly_text, txt_lan="en"):
    """clean-text wrapper with aggressive normalization for filenames.

    Lowercases, transliterates to ASCII, strips line breaks and punctuation,
    and tokenizes URLs/emails/phone numbers/currency symbols.
    See https://pypi.org/project/clean-text/ for option semantics.
    """
    options = dict(
        fix_unicode=True,
        to_ascii=True,
        lower=True,
        no_line_breaks=True,
        no_urls=True,
        no_emails=True,
        no_phone_numbers=True,
        no_numbers=False,
        no_digits=False,
        no_currency_symbols=True,
        no_punct=True,
        replace_with_punct="",
        replace_with_url="<URL>",
        replace_with_email="<EMAIL>",
        replace_with_phone_number="<PHONE>",
        replace_with_number="<NUM>",
        replace_with_digit="0",
        replace_with_currency_symbol="<CUR>",
        lang=txt_lan,  # set to 'de' for German special handling
    )
    return clean(ugly_text, **options)
def beautify_filename(
    filename, num_words=25, start_reverse=False, word_separator="_"
) -> str:
    """
    beautify_filename - clean a filename and rebuild it from its likely words.

    Args:
        filename (str): the filename to beautify (extension is dropped)
        num_words (int, optional): max number of words to keep. Defaults to 25.
        start_reverse (bool, optional): keep the LAST num_words instead of the
            first. Defaults to False.
        word_separator (str, optional): joiner between words. Defaults to "_".

    Returns:
        str: the beautified filename, WITHOUT the extension
    """
    filename = str(filename)
    index_file_Ext = filename.rfind(".")
    current_name = filename[:index_file_Ext]  # get rid of extension
    # guard against an empty stem before indexing [-1] (was an IndexError)
    if current_name and current_name[-1].isnumeric():
        # pluralize a trailing digit so wordninja treats it as a word
        current_name = current_name + "s"
    clean_name = cleantxt_wrap(current_name)
    # splits concatenated text into a list of words based on common word freq
    file_words = wordninja.split(clean_name)
    num_words = min(num_words, len(file_words))
    if start_reverse:
        t_file_words = file_words[-num_words:]
    else:
        t_file_words = file_words[:num_words]
    pretty_name = word_separator.join(t_file_words)
    # FIX: the original sliced off the final character assuming a trailing
    # space, but str.join never produces one, so a real letter was dropped
    # ("my_file" -> "my_fil"). Strip trailing whitespace instead.
    return pretty_name.rstrip()
def fix_punct_spaces(string):
    """
    fix_punct_spaces - normalize spaces around punctuation.

    For example, "hello , there" -> "hello, there".

    Parameters
    ----------
    string : str, required, input string to be corrected

    Returns
    -------
    str, corrected string
    """
    punct_run = re.compile(r"\s*([?!.,]+(?:\s+[?!.,]+)*)\s*")

    def _collapse(match):
        # squeeze the spaces out of the punctuation run, keep one trailing space
        return match.group(1).replace(" ", "") + " "

    string = punct_run.sub(_collapse, string)
    string = string.replace(" ' ", "'").replace(' " ', '"')
    return string.strip()
def clean_OCR(ugly_text: str):
    """
    clean_OCR - normalize whitespace and hyphenation artifacts in OCR text.

    Parameters
    ----------
    ugly_text : str, required, input string to be cleaned

    Returns
    -------
    str, cleaned string
    """
    # flatten newlines and tabs to spaces
    text = ugly_text.replace("\n", " ").replace("\t", " ")
    # collapse double spaces (single pass) and drop leading whitespace
    text = text.replace("  ", " ").lstrip()
    # remove dangling hyphenation artifacts "- " and " -"
    text = text.replace("- ", "").replace(" -", "")
    return fix_punct_spaces(text)
import os | |
import shutil | |
from os.path import join | |
# @markdown move2completed | |
def move2completed(from_dir, filename, new_folder="completed", verbose=False):
    """
    Move *filename* out of *from_dir* into a *new_folder* subdirectory.

    The subdirectory is created on first use. Failures are logged rather
    than raised (best-effort semantics, preserved from the original).

    Parameters
    ----------
    from_dir : str, directory currently containing the file
    filename : str, name of the file to move
    new_folder : str, optional, subfolder name. Defaults to "completed".
    verbose : bool, optional, print when the subfolder is created
    """
    old_filepath = join(from_dir, filename)
    new_filedirectory = join(from_dir, new_folder)
    if not os.path.isdir(new_filedirectory):
        # exist_ok closes the race between the isdir check and creation
        os.makedirs(new_filedirectory, exist_ok=True)
        if verbose:
            print("created new directory for files at: \n", new_filedirectory)
    new_filepath = join(new_filedirectory, filename)
    try:
        shutil.move(old_filepath, new_filepath)
        logging.info("successfully moved the file {} to */completed.".format(filename))
    except (OSError, shutil.Error):
        # was a bare `except:` which also swallowed KeyboardInterrupt;
        # logging.exception records the traceback for investigation
        logging.exception(
            "ERROR! unable to move file to \n{}. Please investigate".format(
                new_filepath
            )
        )
"""### download files | |
**old versions** | |
""" | |
import re | |
def URL_string_filter(text):
    """Keep only ASCII letters, digits, '.' and '_' from *text*."""
    allowed = set(
        "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._"
    )
    return "".join(ch for ch in text if ch in allowed)
import shutil # zipfile formats | |
from datetime import datetime | |
from os.path import getsize | |
import requests | |
# @markdown old download MAIN | |
def get_zip_URL(
    URLtoget, extract_loc=None, file_header="dropboxexport_", verbose=False
):
    """
    Download a zip archive from *URLtoget*, extract it, and remove the archive.

    Parameters
    ----------
    URLtoget : str, URL of the archive
    extract_loc : str, optional, folder (under cwd) to extract into.
        Defaults to "dropbox_dl".
    file_header : str, optional, prefix for the saved archive filename
    verbose : bool, optional, print progress details

    Returns
    -------
    Path, directory the archive was extracted into
    """
    r = requests.get(URLtoget, allow_redirects=True)
    names = "my_file.zip"
    fixed_fnames = names.split(";")  # split the multiple results
    this_filename = file_header + URL_string_filter(fixed_fnames[0])
    # define paths and save the zip file
    if extract_loc is None:
        extract_loc = "dropbox_dl"
    dl_place = Path.cwd() / extract_loc
    create_folder(dl_place)
    save_loc = Path.cwd() / this_filename
    # FIX: context manager guarantees the handle is closed (and the bytes
    # flushed) before unpack_archive reads the file back
    with open(save_loc, "wb") as f:
        f.write(r.content)
    if verbose:
        print("downloaded file size was {} MB".format(getsize(save_loc) / 1000000))
    # unpack the archive
    shutil.unpack_archive(save_loc, extract_dir=dl_place)
    if verbose:
        print("extracted zip file - ", datetime.now())
    load_dir_files(dl_place, req_extension="", verbose=verbose)
    # remove original archive; best-effort
    try:
        os.remove(save_loc)
    except OSError:
        # was a bare `except:` plus a malformed logging call that passed
        # datetime.now() as a %-format argument with no placeholder
        logging.info(
            "unable to delete original zipfile at %s - check if exists", save_loc
        )
    print("finished extracting zip - ", datetime.now())
    return dl_place
"""--- | |
**new versions** | |
""" | |
# @markdown downloading URL files with python | |
def clean_file_name(file_path):
    """helper to clean filenames: drop punctuation, map spaces to underscores."""
    path_obj = Path(file_path)
    stem = re.sub(r"[^\w\s]", "", path_obj.stem)  # strip non-alphanumerics
    stem = re.sub(r"\s", "_", stem)               # spaces -> underscores
    return stem + path_obj.suffix
def download_URL(url: str, file=None, dlpath=None, verbose=False):
    """
    download_URL - download a file from a URL and show progress bar

    Parameters
    ----------
    url : str, URL to download
    file : str, optional, default None, name of file to save to. If None, will use the filename from the URL
    dlpath : str, optional, default None, path to save the file to. If None, will save to the current working directory
    verbose : bool, optional, default False, print progress bar

    Returns
    -------
    str - path to the downloaded file
    """
    if file is None:
        if "?dl=" in url:
            # dropbox links carry a ?dl= suffix that is not part of the filename
            prefile = url.split("/")[-1]
            filename = str(prefile).split("?dl=")[0]
        else:
            filename = url.split("/")[-1]
        file = clean_file_name(filename)
    if dlpath is None:
        dlpath = Path.cwd()  # save to current working directory
    else:
        dlpath = Path(dlpath)  # make a path object
    r = requests.get(url, stream=True, allow_redirects=True)
    # FIX: servers may omit content-length; int(None) crashed here before.
    # tqdm accepts total=0 and simply shows an unbounded counter.
    total_size = int(r.headers.get("content-length", 0))
    initial_pos = 0
    dl_loc = dlpath / file
    try:
        with open(str(dl_loc.resolve()), "wb") as f:
            with tqdm(
                total=total_size,
                unit="B",
                unit_scale=True,
                desc=file,
                initial=initial_pos,
                ascii=True,
            ) as pbar:
                for ch in r.iter_content(chunk_size=1024):
                    if ch:
                        f.write(ch)
                        pbar.update(len(ch))
    finally:
        # release the streaming connection back to the pool
        r.close()
    if verbose:
        print(f"\ndownloaded {file} to {dlpath}\n")
    return str(dl_loc.resolve())
"""## pdf2text functions | |
- now uses **easyocr** | |
- link to [docs](https://www.jaided.ai/easyocr/documentation/) | |
- the [tutorial](https://www.jaided.ai/easyocr/tutorial/) | |
- a list of available languages is [here](https://www.jaided.ai/easyocr/) | |
""" | |
# need to run only once to load model into memory
# literal OCR-artifact fixups applied verbatim by postprocess()
custom_replace_list = {
    "t0": "to",
    "'$": "'s",
    ",,": ", ",
    "_ ": " ",
    " '": "'",
}
# undo spacing that corr() introduces inside known abbreviations
replace_corr_exceptions = {
    "i. e.": "i.e.",
    "e. g.": "e.g.",
    "e. g": "e.g.",
    " ,": ",",
}
# TODO: add logic to 'corr' function to not add space after period when surrounded
# by numbers, example 5.6
from spellchecker import SpellChecker

# module-level spellchecker shared by check_word_spelling() / eval_and_replace()
spell = SpellChecker()
def check_word_spelling(word: str) -> bool:
    """
    check_word_spelling - check the spelling of a word

    Args:
        word (str): word to check

    Returns:
        bool: True if word is spelled correctly, False if not
    """
    # spell.unknown returns the subset of words it does not recognize
    return not spell.unknown([word])
def eval_and_replace(text: str, match_token: str = "- ") -> str:
    """
    eval_and_replace - resolve hyphenation artifacts left by OCR line breaks.

    For each occurrence of *match_token*, the surrounding word fragments are
    joined; if the joined result is a correctly spelled word the token is
    removed, otherwise it is replaced by a single space.

    Args:
        text (str): text to evaluate
        match_token (str, optional): token to replace. Defaults to "- ".

    Returns:
        str: text with tokens resolved
    """
    while match_token in text:
        before, after = text.split(match_token, maxsplit=1)
        # FIX: guard empty neighbor fragments — the original indexed
        # split()[-1] / split()[0] and raised IndexError whenever the
        # token sat at the very start or end of the text
        before_words = before.split()
        after_words = after.split()
        before_frag = (
            "".join(c for c in before_words[-1] if c.isalpha()) if before_words else ""
        )
        after_frag = (
            "".join(c for c in after_words[0] if c.isalpha()) if after_words else ""
        )
        if check_word_spelling(before_frag + after_frag):
            text = before + after
        else:
            text = before + " " + after
    return text
def cleantxt_ocr(ugly_text):
    """clean-text wrapper tuned for OCR output: preserve case and punctuation.

    Fixes unicode, transliterates to ASCII, strips line breaks, and tokenizes
    URLs/emails. See https://pypi.org/project/clean-text/ for option semantics.
    """
    options = dict(
        fix_unicode=True,
        to_ascii=True,
        lower=False,
        no_line_breaks=True,
        no_urls=True,
        no_emails=True,
        no_phone_numbers=False,
        no_numbers=False,
        no_digits=False,
        no_currency_symbols=False,
        no_punct=False,
        replace_with_punct="",
        replace_with_url="<URL>",
        replace_with_email="<EMAIL>",
        replace_with_phone_number="<PHONE>",
        replace_with_number="<NUM>",
        replace_with_digit="0",
        replace_with_currency_symbol="<CUR>",
        lang="en",  # set to 'de' for German special handling
    )
    return clean(ugly_text, **options)
def format_ocr_out(OCR_data):
    """Join OCR output (list of strings or anything str()-able) into one
    cleaned, spacing-corrected string."""
    text = " ".join(OCR_data) if isinstance(OCR_data, list) else str(OCR_data)
    return corr(cleantxt_ocr(text))
def postprocess(text: str) -> str:
    """to be used after recombining the lines

    Cleans the text, applies the literal fixups from custom_replace_list,
    re-corrects spacing, restores abbreviations from replace_corr_exceptions,
    then resolves hyphenation artifacts.
    """
    proc = corr(cleantxt_ocr(text))
    for bad, good in custom_replace_list.items():
        proc = proc.replace(str(bad), str(good))
    proc = corr(proc)
    # TODO: upgrade corr() function to handle commas
    for bad, good in replace_corr_exceptions.items():
        proc = proc.replace(str(bad), str(good))
    return eval_and_replace(proc)
def result2text(result) -> str:
    """Convert OCR result to text.

    Pages are joined by newlines; each block within a page is prefixed with
    "\\n\\t" and every word is followed by a single space.
    """
    full_doc = []
    for page in result.pages:  # was enumerate() with an unused index
        # collect fragments and join once — avoids quadratic str += in a loop
        parts = []
        for block in page.blocks:
            parts.append("\n\t")
            for line in block.lines:
                for word in line.words:
                    parts.append(word.value + " ")
        full_doc.append("".join(parts))
    return "\n".join(full_doc)
import warnings | |
from datetime import date | |
from os.path import join | |
# @title define main fn - `convert_PDF_to_Text()` | |
# @markdown `convert_PDF_to_Text(PDF_file, multilang=False, use_page_labels=False, saveloc="")` | |
def convert_PDF_to_Text(
    PDF_file,
    ocr_model=None,
    max_pages: int = 20,
):
    """
    convert_PDF_to_Text - OCR a PDF with doctr and return cleaned text + stats.

    Parameters
    ----------
    PDF_file : path-like, the PDF to convert
    ocr_model : doctr predictor, optional; a pretrained one is created if None
    max_pages : int, optional, pages beyond this count are dropped. Default 20.

    Returns
    -------
    dict with keys: num_pages, runtime, date, converted_text, length
    """
    start = time.perf_counter()
    pdf_path = Path(PDF_file)
    if ocr_model is None:
        ocr_model = ocr_predictor(pretrained=True)
    logging.info(f"starting OCR on {pdf_path.name}")
    doc = DocumentFile.from_pdf(pdf_path)
    if len(doc) > max_pages:
        logging.warning(f"PDF has {len(doc)} pages, which is more than {max_pages}.. truncating")
        doc = doc[:max_pages]
    # Analyze
    logging.info(f"running OCR on {len(doc)} pages")
    ocr_result = ocr_model(doc)
    output_text = postprocess(format_ocr_out(result2text(ocr_result)))
    elapsed = time.perf_counter() - start
    logging.info("OCR complete")
    return {
        "num_pages": len(doc),
        "runtime": round(elapsed, 2),
        "date": str(date.today()),
        "converted_text": output_text,
        "length": len(output_text),
    }
from os.path import basename, dirname, join | |
# @title translation functions
from libretranslatepy import LibreTranslateAPI

# shared LibreTranslate client used by translate_text()
lt = LibreTranslateAPI("https://translate.astian.org/")
def translate_text(text, source_l, target_l="en"):
    """Translate *text* from *source_l* to *target_l* via the module-level
    LibreTranslate client; always returns a str."""
    translated = lt.translate(text, source_l, target_l)
    return str(translated)
def translate_doc(filepath, lang_start, lang_end="en", verbose=False):
    """translate a document from lang_start to lang_end
    {'code': 'en', 'name': 'English'},
    {'code': 'fr', 'name': 'French'},
    {'code': 'de', 'name': 'German'},
    {'code': 'it', 'name': 'Italian'},

    Writes the translated lines to a "translated to <lang_end>" subfolder
    next to the input file and returns the output path."""
    src_folder = dirname(filepath)
    trgt_folder = join(src_folder, "translated to {}".format(lang_end))
    create_folder(trgt_folder)
    with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
        foreign_t = f.readlines()
    in_name = basename(filepath)
    translated_doc = []
    for line in tqdm(
        foreign_t, total=len(foreign_t), desc="translating {}...".format(in_name[:10])
    ):
        translated_line = translate_text(line, lang_start, lang_end)
        translated_doc.append(translated_line)
    # FIX: simple_rename() already appends its target_ext, so appending ".txt"
    # on top produced "..._.txt.txt"; pass an empty ext and add ".txt" once
    t_out_name = (
        "[To {}]".format(lang_end) + simple_rename(in_name, target_ext="") + ".txt"
    )
    out_path = join(trgt_folder, t_out_name)
    with open(out_path, "w", encoding="utf-8", errors="ignore") as f_o:
        f_o.writelines(translated_doc)
    if verbose:
        print("finished translating the document! - ", datetime.now())
    return out_path
"""translation codes | |
``` | |
print(lt.languages()) | |
call ^ | |
``` | |
- link to their github [here](https://github.com/argosopentech/LibreTranslate-py) | |
# Load FIles | |
""" | |