# entity_extraction/services/ocr_service.py
import os
import re
import time
import requests
import docx
import pytesseract
from nltk.tokenize import sent_tokenize, word_tokenize
from PyPDF2 import PdfReader
from pdf2image import convert_from_path


class OCRService:
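    """Text extraction utilities for PDF, Word, and plain-text documents."""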
def __init__(self, LLAMAPARSE_API_KEY):
self.llama_parse_key = LLAMAPARSE_API_KEY
def extract_ocrless_pdf(self, filepath):
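        """Extract embedded text from a PDF with PyPDF2 (no OCR pass)."""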
reader = PdfReader(filepath)
extracted_text = ""
for page in reader.pages:
            text = page.extract_text() or ""  # extract_text() can return None on image-only pages
extracted_text += " "
extracted_text += text
return extracted_text
def extract_text_from_pdf(self, filepath):
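        """OCR a PDF by rendering each page to an image and running Tesseract; returns one string per page."""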
images = convert_from_path(filepath, thread_count=4)
full_text = []
        # config = r"--oem 2 --psm 7"  # optional Tesseract OEM/PSM tuning
        for image in images:
            text = pytesseract.image_to_string(image)
            # text = pytesseract.image_to_string(image, config=config)
            full_text.append(text)
return full_text
def extract_text_from_document(self, filepath):
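        """Dispatch on file extension (.pdf, .doc/.docx, .txt) and return the extracted text as one string."""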
        file_ext = os.path.splitext(filepath)[-1].lower()
        if file_ext == ".pdf":
            text_to_process = self.extract_text_from_pdf(filepath)
            text_joined = " ".join(text_to_process)
            #with open(f"{os.path.splitext(filepath)[0]}.txt", "w") as file:
            #    file.writelines(text_to_process)
        elif file_ext in [".doc", ".docx"]:
            doc_content = docx.Document(filepath)
            text_to_process = [i.text for i in doc_content.paragraphs]
            text_joined = " \n ".join(text_to_process)
            #with open(f"{os.path.splitext(filepath)[0]}.txt", "w") as file:
            #    file.write(text_joined)
        elif file_ext == ".txt":
            with open(filepath, encoding="utf8") as file:
                text_joined = file.read()
        else:
            raise ValueError(f"Unsupported file extension: {file_ext}")
        return text_joined
def preprocess_document(self, document):
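        """Normalise newlines and quote characters in extracted text."""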
        document = re.sub(r"\n+", "\n", document)  # collapse repeated newlines (needs re.sub, not str.replace)
        #document = re.sub(r"\s+", " ", document)
        document = re.sub("“", "\"", document)
        document = re.sub("”", "\"", document)
        document = re.sub(r"\\\"", "\"", document)
        return document
def chunk_document(self, text, k=1500):
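        """Split text into sentence-aligned chunks of at most k words (requires NLTK's punkt data)."""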
        sentences = sent_tokenize(text)
chunks = []
current_chunk = []
current_word_count = 0
for sentence in sentences:
sentence_words = word_tokenize(sentence)
if current_word_count + len(sentence_words) <= k:
current_chunk.append(sentence)
current_word_count += len(sentence_words)
else:
chunks.append(" ".join(current_chunk))
current_chunk = [sentence]
current_word_count = len(sentence_words)
if current_chunk:
chunks.append(" ".join(current_chunk))
        # Drop near-empty chunks; building a new list avoids the index-skipping
        # bug of deleting items from a list while iterating over it.
        chunks = [chunk for chunk in chunks if len(chunk.split()) >= 2]
return chunks
def llama_parse_ocr(self, file_path):
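        """Upload a PDF to the LlamaParse API and poll until the markdown result is ready."""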
llamaparse_url = 'https://api.cloud.llamaindex.ai/api/parsing/upload'
headers = {
'accept': 'application/json',
'Authorization': f'Bearer {self.llama_parse_key}'
}
        with open(file_path, 'rb') as pdf_file:
            files = {
                'file': (file_path, pdf_file, 'application/pdf')
            }
            response = requests.post(llamaparse_url, headers=headers, files=files)
        print(response.json())  # upload response; includes the parsing job id
job_id = response.json()["id"]
result_type = "markdown"
llamaparse_result_url = f"https://api.cloud.llamaindex.ai/api/parsing/job/{job_id}/result/{result_type}"
        # Poll until the parsed result is ready
        while True:
            response = requests.get(llamaparse_result_url, headers=headers)
            if response.status_code == 200:
                break
            time.sleep(2)  # short pause between polls so we do not hammer the API
return response.json()['markdown']
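

# Example usage (a minimal sketch, not part of the original service). It assumes a
# LlamaParse key in the LLAMAPARSE_API_KEY environment variable, a local
# "sample.pdf", and that NLTK's "punkt" tokenizer data has been downloaded.
if __name__ == "__main__":
    service = OCRService(os.environ.get("LLAMAPARSE_API_KEY", ""))
    raw_text = service.extract_text_from_document("sample.pdf")
    cleaned = service.preprocess_document(raw_text)
    chunks = service.chunk_document(cleaned, k=1500)
    print(f"Produced {len(chunks)} chunk(s)")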