# PDF_Ingestion_Lib.py ######################################### # Library to hold functions for ingesting PDF files.# # #################### # Function List # # 1. convert_pdf_to_markdown(pdf_path) # 2. ingest_pdf_file(file_path, title=None, author=None, keywords=None): # 3. # # #################### import re # Import necessary libraries # Import Local ####################################################################################################################### # Function Definitions # # Ingest a text file into the database with Title/Author/Keywords # Constants MAX_FILE_SIZE_MB = 50 CONVERSION_TIMEOUT_SECONDS = 300 # Marker PDF solution # def convert_pdf_to_markdown(pdf_path): # """ # Convert a PDF file to Markdown by calling a script in another virtual environment. # """ # # logging.debug(f"Marker: Converting PDF file to Markdown: {pdf_path}") # # Check if the file size exceeds the maximum allowed size # file_size_mb = os.path.getsize(pdf_path) / (1024 * 1024) # if file_size_mb > MAX_FILE_SIZE_MB: # raise ValueError(f"File size ({file_size_mb:.2f} MB) exceeds the maximum allowed size of {MAX_FILE_SIZE_MB} MB") # # logging.debug("Marker: Converting PDF file to Markdown using Marker virtual environment") # # Path to the Python interpreter in the other virtual environment # other_venv_python = "Helper_Scripts/marker_venv/bin/python" # # # Path to the conversion script # converter_script = "Helper_Scripts/PDF_Converter.py" # # logging.debug("Marker: Attempting to convert PDF file to Markdown...") # try: # result = subprocess.run( # [other_venv_python, converter_script, pdf_path], # capture_output=True, # text=True, # timeout=CONVERSION_TIMEOUT_SECONDS # ) # if result.returncode != 0: # raise Exception(f"Conversion failed: {result.stderr}") # return result.stdout # except subprocess.TimeoutExpired: # raise Exception(f"PDF conversion timed out after {CONVERSION_TIMEOUT_SECONDS} seconds") # # # def process_and_ingest_pdf(file, title, author, keywords): # if 
file is None: # return "Please select a PDF file to upload." # # try: # # Create a temporary directory # with tempfile.TemporaryDirectory() as temp_dir: # # Create a path for the temporary PDF file # temp_path = os.path.join(temp_dir, "temp.pdf") # # # Copy the contents of the uploaded file to the temporary file # shutil.copy(file.name, temp_path) # # # Call the ingest_pdf_file function with the temporary file path # result = ingest_pdf_file(temp_path, title, author, keywords) # # return result # except Exception as e: # return f"Error processing PDF: {str(e)}" # # # def ingest_pdf_file(file_path, title=None, author=None, keywords=None): # try: # # Convert PDF to Markdown # markdown_content = convert_pdf_to_markdown(file_path) # # # If title is not provided, use the filename without extension # if not title: # title = os.path.splitext(os.path.basename(file_path))[0] # # # If author is not provided, set it to 'Unknown' # if not author: # author = 'Unknown' # # # If keywords are not provided, use a default keyword # if not keywords: # keywords = 'pdf_file,markdown_converted' # else: # keywords = f'pdf_file,markdown_converted,{keywords}' # # # Add the markdown content to the database # add_media_with_keywords( # url=file_path, # title=title, # media_type='document', # content=markdown_content, # keywords=keywords, # prompt='No prompt for PDF files', # summary='No summary for PDF files', # transcription_model='None', # author=author, # ingestion_date=datetime.now().strftime('%Y-%m-%d') # ) # # return f"PDF file '{title}' converted to Markdown and ingested successfully.", file_path # except ValueError as e: # logging.error(f"File size error: {str(e)}") # return f"Error: {str(e)}", file_path # except Exception as e: # logging.error(f"Error ingesting PDF file: {str(e)}") # return f"Error ingesting PDF file: {str(e)}", file_path # # # def process_and_cleanup_pdf(file, title, author, keywords): # # FIXME - Update to validate file upload/filetype is pdf.... 
#     if file is None:
#         return "No file uploaded. Please upload a PDF file."
#
#     temp_dir = tempfile.mkdtemp()
#     temp_file_path = os.path.join(temp_dir, "temp.pdf")
#
#     try:
#         # Copy the uploaded file to a temporary location
#         shutil.copy2(file.name, temp_file_path)
#
#         # Process the file
#         result, _ = ingest_pdf_file(temp_file_path, title, author, keywords)
#
#         return result
#     except Exception as e:
#         logging.error(f"Error in processing and cleanup: {str(e)}")
#         return f"Error: {str(e)}"
#     finally:
#         # Clean up the temporary directory and its contents
#         try:
#             shutil.rmtree(temp_dir)
#             logging.info(f"Removed temporary directory: {temp_dir}")
#         except Exception as cleanup_error:
#             logging.error(f"Error during cleanup: {str(cleanup_error)}")
#             result += f"\nWarning: Could not remove temporary files: {str(cleanup_error)}"
import logging
#
#######################################################################################################################
#
# Non-Marker implementation
import os
import re
import shutil
import tempfile
from datetime import datetime

import pymupdf

from App_Function_Libraries.DB.DB_Manager import add_media_with_keywords

# Bits of the PyMuPDF span "flags" integer that this module maps to Markdown.
# Bit 0 is superscript, NOT bold; bold is bit 4.
_SPAN_FLAG_ITALIC = 1 << 1
_SPAN_FLAG_BOLD = 1 << 4


def _format_span(span):
    """Return the text of one PyMuPDF span dict with Markdown markup applied.

    Emphasis is applied first and the heading prefix last, so the heading
    marker stays at the front of the span instead of being wrapped inside
    ``**...**``. Leading/trailing whitespace is kept outside the emphasis
    markers because Markdown does not treat ``**text **`` as bold.
    """
    text = span["text"]
    core = text.strip()
    if not core:
        # Nothing visible to style; wrapping blanks would emit stray '****'.
        return text

    flags = span["flags"]
    if flags & _SPAN_FLAG_BOLD:
        core = f"**{core}**"
    if flags & _SPAN_FLAG_ITALIC:
        core = f"*{core}*"

    # Font-size thresholds used to guess heading levels.
    size = span["size"]
    if size > 20:
        core = f"# {core}"
    elif size > 16:
        core = f"## {core}"
    elif size > 14:
        core = f"### {core}"

    leading = text[:len(text) - len(text.lstrip())]
    trailing = text[len(text.rstrip()):]
    return f"{leading}{core}{trailing}"


def _block_to_paragraph(block):
    """Join the lines of one text block into a whitespace-normalised paragraph."""
    pieces = []
    for line in block["lines"]:
        line_text = " ".join(_format_span(span) for span in line["spans"]).rstrip()
        if line_text.endswith('-'):
            # Line ends in a hyphen: drop it and glue directly onto the next line.
            pieces.append(line_text[:-1])
        else:
            pieces.append(line_text + " ")
    return re.sub(r'\s+', ' ', "".join(pieces)).strip()


def extract_text_and_format_from_pdf(pdf_path):
    """
    Extract text from a PDF file and convert it to Markdown, preserving formatting.

    Each page is emitted under a ``## Page N`` heading and followed by a
    ``---`` separator. Text blocks become one paragraph each; image blocks
    become an ``[Image]`` placeholder.

    Args:
        pdf_path: Path of the PDF file to open with PyMuPDF.

    Returns:
        The Markdown text for the whole document.

    Raises:
        Exception: Any error raised while opening or reading the PDF is
            logged and re-raised unchanged.
    """
    try:
        parts = []
        with pymupdf.open(pdf_path) as doc:
            for page_num, page in enumerate(doc, 1):
                parts.append(f"## Page {page_num}\n\n")

                for block in page.get_text("dict")["blocks"]:
                    if block["type"] == 0:  # Text block
                        paragraph = _block_to_paragraph(block)
                        if paragraph:
                            parts.append(paragraph + "\n\n")
                    elif block["type"] == 1:  # Image block
                        parts.append("[Image]\n\n")

                parts.append("\n---\n\n")  # Page separator

        # Clean up hyphenated words
        return re.sub(r'(\w+)-\s*\n(\w+)', r'\1\2', "".join(parts))
    except Exception as e:
        logging.error(f"Error extracting text and formatting from PDF: {str(e)}")
        raise


def extract_metadata_from_pdf(pdf_path):
    """
    Extract metadata from a PDF file using PyMuPDF.

    Args:
        pdf_path: Path of the PDF file to open.

    Returns:
        The document's metadata dictionary, or an empty dict when the
        document exposes no metadata or when reading fails (the error is
        logged, not raised).
    """
    try:
        with pymupdf.open(pdf_path) as doc:
            # Guard against a missing metadata object so callers can always use .get().
            return doc.metadata or {}
    except Exception as e:
        logging.error(f"Error extracting metadata from PDF: {str(e)}")
        return {}


def process_and_ingest_pdf(file, title, author, keywords):
    """
    Convert an uploaded PDF to Markdown and store it via add_media_with_keywords.

    Args:
        file: Uploaded-file object whose ``name`` attribute is the path of the
            PDF on disk, or None when nothing was uploaded.
        title: Title to store; when empty, the PDF metadata title is used and,
            failing that, the file name without its extension.
        author: Author to store; when empty, the PDF metadata author is used
            and, failing that, 'Unknown'.
        keywords: Comma-separated keywords; the default keywords
            'pdf_file,markdown_converted' are always included.

    Returns:
        A human-readable status string (success message or error message).
        Errors are logged and reported in the return value, never raised.
    """
    if file is None:
        return "Please select a PDF file to upload."

    try:
        # Work on a private copy so the upload itself is never touched.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = os.path.join(temp_dir, "temp.pdf")
            shutil.copy(file.name, temp_path)

            markdown_text = extract_text_and_format_from_pdf(temp_path)
            metadata = extract_metadata_from_pdf(temp_path)

        # Metadata keys are normally present but may hold ''/None, so fall
        # back on falsy values rather than only on missing keys.
        if not title:
            title = metadata.get('title') or os.path.splitext(os.path.basename(file.name))[0]
        if not author:
            author = metadata.get('author') or 'Unknown'

        # If keywords are not provided, use a default keyword
        if not keywords:
            keywords = 'pdf_file,markdown_converted'
        else:
            keywords = f'pdf_file,markdown_converted,{keywords}'

        # Add metadata-based keywords (only when the subject is non-empty)
        subject = metadata.get('subject')
        if subject:
            keywords += f",{subject}"

        # Add the PDF content to the database
        add_media_with_keywords(
            url=file.name,
            title=title,
            media_type='document',
            content=markdown_text,
            keywords=keywords,
            prompt='No prompt for PDF files',
            summary='No summary for PDF files',
            transcription_model='None',
            author=author,
            ingestion_date=datetime.now().strftime('%Y-%m-%d')
        )

        return f"PDF file '{title}' by {author} ingested successfully and converted to Markdown."
    except Exception as e:
        logging.error(f"Error ingesting PDF file: {str(e)}")
        return f"Error ingesting PDF file: {str(e)}"


def process_and_cleanup_pdf(file, title, author, keywords):
    """
    Entry point that ingests an uploaded PDF and reports the outcome as text.

    Delegates to process_and_ingest_pdf, which handles its own temporary
    files. Returns that function's status string, or an error string if it
    raises unexpectedly.
    """
    if file is None:
        return "No file uploaded. Please upload a PDF file."

    try:
        return process_and_ingest_pdf(file, title, author, keywords)
    except Exception as e:
        logging.error(f"Error in processing and cleanup: {str(e)}")
        return f"Error: {str(e)}"

#
# End of PDF_Ingestion_Lib.py
#######################################################################################################################