"""
Extraction
@author : Sakshi Tantak
"""

# Imports
import os
import re
import json
from time import time
from datetime import datetime

from invoice_extractor import PROMPTS_DIR
from invoice_extractor.llm import call_openai
from invoice_extractor.ocr import PyMuPDF4LLMOCR, AzureLayoutOCR

# Optional language label that follows an opening Markdown code fence.
_FENCE_LABEL_RE = re.compile(r'^json\s*', re.IGNORECASE)


def _strip_code_fence(text):
    """Remove a Markdown code fence wrapped around *text*, if any.

    Only leading/trailing backticks and a leading ``json`` label are removed.
    The payload in between is left untouched, so values that happen to
    contain backticks or the substring ``json`` are preserved.
    """
    cleaned = text.strip().strip('`').strip()
    return _FENCE_LABEL_RE.sub('', cleaned, count = 1)


class LOB:
    """Base pipeline that runs OCR, extraction and post-processing in order.

    Subclasses provide ``_extract`` and ``_post_process``. Calling an instance
    returns a list of three stage records (``OCR``, ``EXTRACTION``,
    ``POST_PROCESS``), each a dict with ``stage``, ``response`` and ``time``.
    """

    def __init__(self, ocr_engine = 'open-source/pymupdf4llm'):
        """Select the OCR engine and load the system prompt.

        Args:
            ocr_engine: ``'open-source/pymupdf4llm'`` or ``'azure/layout'``.
                Any other value leaves the instance without an engine; such
                an instance can still process plain-text input.
        """
        if ocr_engine == 'open-source/pymupdf4llm':
            self.engine = PyMuPDF4LLMOCR()
        elif ocr_engine == 'azure/layout':
            self.engine = AzureLayoutOCR()
        else:
            # Keep construction permissive (text-only use needs no OCR), but
            # make the attribute exist so failures are explicit later on.
            self.engine = None
        self.file_type = 'pdf'
        with open(os.path.join(PROMPTS_DIR, 'extraction_system_prompt.txt'), 'r') as f:
            self.analysis_prompt = f.read()

    def _to_text(self, file_bytes):
        """Return the text for *file_bytes*, running OCR only for binary input.

        Raises:
            ValueError: binary input was given but no OCR engine is configured.
            TypeError: input is neither ``str`` nor ``bytes``/``bytearray``.
        """
        if isinstance(file_bytes, str):
            return file_bytes
        if isinstance(file_bytes, (bytearray, bytes)):
            engine = getattr(self, 'engine', None)
            if engine is None:
                raise ValueError('no OCR engine configured for binary input')
            # The engine result is unpacked as a pair; only the first item is used.
            text, _ = engine(file_bytes)
            return text
        raise TypeError(
            f'unsupported input type {type(file_bytes).__name__}; '
            'expected str, bytes or bytearray'
        )

    def __call__(self, file_bytes):
        """Run the full pipeline on one document.

        Args:
            file_bytes: document content, either already-extracted text
                (``str``) or raw file content (``bytes``/``bytearray``).

        Returns:
            The list of three stage records. A stage that failed, was skipped
            or produced an empty result keeps ``response == ''`` and
            ``time == 0``; later stages are not run in that case. Exceptions
            raised by a stage are printed, never propagated.
        """
        response = [
            {'stage' : 'OCR', 'response' : '', 'time' : 0},
            {'stage' : 'EXTRACTION', 'response' : '', 'time' : 0},
            {'stage' : 'POST_PROCESS', 'response' : '', 'time' : 0},
        ]

        # Stage 1 : OCR
        try:
            print('OCR Started ...')
            ocr_start = time()
            text = self._to_text(file_bytes)
            ocr_end = time()
            print(f'OCR done [{ocr_end - ocr_start}]')
        except Exception as ocr_e:
            print(f'Exception while OCR : {ocr_e}')
            return response
        if not text:
            return response
        response[0].update({'response' : text, 'time' : ocr_end - ocr_start})

        # Stage 2 : extraction
        try:
            print('Extracting ...')
            extraction_start = time()
            raw_response = self._extract(text = text)
            extraction_end = time()
            print('Extraction : ', raw_response)
            print(f'Extracted [{extraction_end - extraction_start}]')
        except Exception as extraction_e:
            print(f'Exception while extracting : {extraction_e}')
            return response
        if not raw_response:
            return response
        response[1].update({'response' : raw_response, 'time' : extraction_end - extraction_start})

        # Stage 3 : post-processing
        try:
            print('Post processing extraction ...')
            post_process_start = time()
            post_processed = self._post_process(response = raw_response)
            post_process_end = time()
            print(f'Suggested [{post_process_end - post_process_start}]')
        except Exception as pp_e:
            print(f'Exception while post processing : {pp_e}')
            return response
        if post_processed:
            response[2].update({'response' : post_processed, 'time' : post_process_end - post_process_start})
        return response

    def _extract(self, **kwargs):
        """Turn OCR text (keyword ``text``) into a raw extraction; subclass hook."""
        # NotImplemented is a comparison sentinel, not an exception; raising it
        # fails with an unrelated TypeError.
        raise NotImplementedError

    def _post_process(self, **kwargs):
        """Turn a raw extraction (keyword ``response``) into its final form; subclass hook."""
        raise NotImplementedError


class Auto(LOB):
    """Pipeline for the ``auto`` line of business.

    Uses the entity definitions in ``<PROMPTS_DIR>/auto/entities.json``; each
    definition is read as a dict with at least ``entityName`` and
    ``expectedOutputFormat`` keys.
    """

    def __init__(self, ocr_engine = 'open-source/pymupdf4llm'):
        """Load the extraction prompt and the entity definitions."""
        super().__init__(ocr_engine)
        with open(os.path.join(PROMPTS_DIR, 'extraction_system_prompt.txt'), 'r') as f:
            self.extraction_prompt = f.read()
        with open(os.path.join(PROMPTS_DIR, 'auto', 'entities.json'), 'r') as f:
            self.entities = json.load(f)
        # Add empty raw-name/raw-value slots to every entity definition.
        for entity in self.entities:
            entity.update({'entityNameRaw' : '', 'entityValueRaw' : ''})

    def _extract(self, **kwargs):
        """Build the prompt for the invoice text and send it to the LLM.

        Keyword Args:
            text: invoice text.

        Returns:
            The response of ``call_openai``, or ``''`` when the text or the
            response is empty.
        """
        text = kwargs.get('text') or ''
        if not text:
            return ''
        # Read the clock once so day/month/year cannot straddle midnight.
        today = datetime.today()
        prompt = self.extraction_prompt.replace("{{date}}", f'{today.day}/{today.month}/{today.year}') + str(self.entities)
        prompt += '\nInvoice : ' + text
        response = call_openai(prompt)
        return response or ''

    def _post_process(self, **kwargs):
        """Parse the raw LLM response into a list of entities.

        The response is parsed as JSON after removing a surrounding Markdown
        code fence. For entities whose definition expects JSON output, a
        string ``entityValue`` is decoded as JSON as well; values that cannot
        be decoded are kept unchanged.

        Keyword Args:
            response: raw LLM response text.

        Returns:
            The parsed JSON document (expected to be a list of dicts carrying
            ``entityName`` / ``entityValue``), or ``[]`` when the response is
            empty or cannot be parsed or processed.
        """
        response = kwargs.get('response', '')
        if not response:
            return []
        jsonified_str_list = [
            e['entityName'] for e in self.entities
            if 'json' in e['expectedOutputFormat'].lower()
        ]
        payload = _strip_code_fence(response)
        if not payload:
            return []
        try:
            entities = json.loads(payload)
            for entity in entities:
                if entity['entityName'] in jsonified_str_list:
                    try:
                        entity['entityValue'] = json.loads(entity['entityValue'])
                    except (KeyError, TypeError, ValueError):
                        # Best effort: the value is missing, already decoded,
                        # or not valid JSON, so it is left as received.
                        pass
            return entities
        except (KeyError, TypeError, ValueError) as jsonify_exc:
            print(f'Error JSONifying {jsonify_exc}')
        return []


if __name__ == '__main__':
    import sys
    from tqdm import tqdm

    filepaths = sys.argv[1:]
    auto = Auto()
    for filepath in tqdm(filepaths):
        print(filepath)
        lowered = filepath.lower()
        if lowered.endswith('.pdf'):
            with open(filepath, 'rb') as f:
                file_bytes = f.read()
        elif lowered.endswith(('.txt', '.md')):
            with open(filepath) as f:
                file_bytes = f.read()
        else:
            # Without this branch the previous file's content would be reused.
            print(f'Skipping unsupported file type : {filepath}')
            continue
        extraction = auto(file_bytes)
        print(extraction)
        basepath = os.path.splitext(filepath)[0]
        with open(basepath + '.json', 'w') as f:
            json.dump(extraction, f, indent = 4)
        with open(basepath + '.entities.json', 'w') as f:
            json.dump(extraction[-1]['response'], f, indent = 4)