File size: 5,732 Bytes
ed28876
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# PDF_Ingestion_Lib.py
#########################################
# Library to hold functions for ingesting PDF files.
#
####################
# Function List
#
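# 1. convert_pdf_to_markdown(pdf_path)
# 2. process_and_ingest_pdf(file, title, author, keywords)
# 3. ingest_pdf_file(file_path, title=None, author=None, keywords=None)
# 4. process_and_cleanup_pdf(file, title, author, keywords)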
#
#
####################


# Import necessary libraries
from datetime import datetime
import logging
import subprocess
import os
import shutil
import tempfile


# Import Local
from App_Function_Libraries.SQLite_DB import add_media_with_keywords

#######################################################################################################################
# Function Definitions
#

# Convert PDF files to Markdown and ingest them into the database with Title/Author/Keywords


# Constants
# Largest PDF, in megabytes, that convert_pdf_to_markdown will accept.
MAX_FILE_SIZE_MB = 50
# Longest time, in seconds, the external conversion process may run.
CONVERSION_TIMEOUT_SECONDS = 300


def convert_pdf_to_markdown(pdf_path):
    """
    Convert a PDF file to Markdown by calling a script in another virtual environment.

    Args:
        pdf_path: Path to the PDF file to convert.

    Returns:
        str: The captured standard output of the converter script (expected
        to be the Markdown rendering of the PDF).

    Raises:
        ValueError: If the file is larger than MAX_FILE_SIZE_MB.
        OSError: If the file size cannot be read or the helper interpreter
            cannot be launched (e.g. the path does not exist).
        RuntimeError: If the converter exits with a non-zero status or does
            not finish within CONVERSION_TIMEOUT_SECONDS.
    """

    logging.debug(f"Marker: Converting PDF file to Markdown: {pdf_path}")
    # Check if the file size exceeds the maximum allowed size
    file_size_mb = os.path.getsize(pdf_path) / (1024 * 1024)
    if file_size_mb > MAX_FILE_SIZE_MB:
        raise ValueError(f"File size ({file_size_mb:.2f} MB) exceeds the maximum allowed size of {MAX_FILE_SIZE_MB} MB")

    logging.debug("Marker: Converting PDF file to Markdown using Marker virtual environment")
    # Both paths are relative, so they resolve against the current working
    # directory of the calling process.
    # Path to the Python interpreter in the other virtual environment
    other_venv_python = "Helper_Scripts/marker_venv/bin/python"

    # Path to the conversion script
    converter_script = "Helper_Scripts/PDF_Converter.py"

    logging.debug("Marker: Attempting to convert PDF file to Markdown...")
    try:
        result = subprocess.run(
            [other_venv_python, converter_script, pdf_path],
            capture_output=True,
            text=True,
            timeout=CONVERSION_TIMEOUT_SECONDS
        )
    except subprocess.TimeoutExpired as timeout_error:
        # Chain the original exception so the traceback keeps the subprocess details.
        raise RuntimeError(
            f"PDF conversion timed out after {CONVERSION_TIMEOUT_SECONDS} seconds"
        ) from timeout_error

    if result.returncode != 0:
        # RuntimeError (not ValueError) so callers do not mistake a converter
        # failure for the file-size rejection above.
        raise RuntimeError(f"Conversion failed: {result.stderr}")
    return result.stdout


def process_and_ingest_pdf(file, title, author, keywords):
    """
    Stage an uploaded PDF in a throwaway directory and pass it to ingest_pdf_file.

    Args:
        file: Uploaded-file object whose ``name`` attribute is the path of the
            PDF on disk, or None when nothing was uploaded.
        title: Forwarded unchanged to ingest_pdf_file.
        author: Forwarded unchanged to ingest_pdf_file.
        keywords: Forwarded unchanged to ingest_pdf_file.

    Returns:
        A prompt string when ``file`` is None; an "Error processing PDF: ..."
        string when staging raises; otherwise the value returned by
        ingest_pdf_file, passed through as-is.
    """
    if file is None:
        return "Please select a PDF file to upload."

    try:
        # The directory and the staged copy are removed when the with-block exits.
        with tempfile.TemporaryDirectory() as scratch_dir:
            staged_pdf = os.path.join(scratch_dir, "temp.pdf")

            # Work on a copy so the ingest step always sees a file named temp.pdf.
            shutil.copy(file.name, staged_pdf)

            outcome = ingest_pdf_file(staged_pdf, title, author, keywords)
    except Exception as exc:
        return f"Error processing PDF: {str(exc)}"
    else:
        return outcome


def ingest_pdf_file(file_path, title=None, author=None, keywords=None):
    """
    Convert a PDF to Markdown and store the result in the media database.

    Args:
        file_path: Path of the PDF to convert; also stored as the record's url.
        title: Record title. Falls back to the file name without extension.
        author: Record author. Falls back to 'Unknown'.
        keywords: Extra comma-separated keywords, appended after the two
            default keywords 'pdf_file' and 'markdown_converted'.

    Returns:
        tuple: ``(message, file_path)`` where ``message`` is either a success
        notice or an error description. Exceptions are logged and reported
        through the message instead of being raised.
    """
    try:
        # Conversion runs first so a failure here skips all metadata handling.
        markdown_content = convert_pdf_to_markdown(file_path)

        # Fill in defaults for any metadata the caller left empty.
        title = title or os.path.splitext(os.path.basename(file_path))[0]
        author = author or 'Unknown'
        keywords = (
            f'pdf_file,markdown_converted,{keywords}'
            if keywords
            else 'pdf_file,markdown_converted'
        )

        # Assemble the database record, then store it in one call.
        record = dict(
            url=file_path,
            title=title,
            media_type='document',
            content=markdown_content,
            keywords=keywords,
            prompt='No prompt for PDF files',
            summary='No summary for PDF files',
            transcription_model='None',
            author=author,
            ingestion_date=datetime.now().strftime('%Y-%m-%d'),
        )
        add_media_with_keywords(**record)
    except ValueError as size_error:
        # convert_pdf_to_markdown raises ValueError for oversized files.
        logging.error(f"File size error: {str(size_error)}")
        return f"Error: {str(size_error)}", file_path
    except Exception as failure:
        logging.error(f"Error ingesting PDF file: {str(failure)}")
        return f"Error ingesting PDF file: {str(failure)}", file_path
    else:
        return f"PDF file '{title}' converted to Markdown and ingested successfully.", file_path


def process_and_cleanup_pdf(file, title, author, keywords):
    """
    Copy an uploaded PDF to a temporary directory, ingest it, then clean up.

    Args:
        file: Uploaded-file object whose ``name`` attribute is the path of the
            PDF on disk, or None when nothing was uploaded.
        title: Forwarded unchanged to ingest_pdf_file.
        author: Forwarded unchanged to ingest_pdf_file.
        keywords: Forwarded unchanged to ingest_pdf_file.

    Returns:
        str: A prompt when ``file`` is None; otherwise the status message from
        ingest_pdf_file, or an "Error: ..." message if copying or ingesting
        raised. If the temporary directory cannot be removed, a warning line
        is appended to that message.
    """
    if file is None:
        return "No file uploaded. Please upload a PDF file."

    temp_dir = tempfile.mkdtemp()
    temp_file_path = os.path.join(temp_dir, "temp.pdf")
    # Filled in only when cleanup fails. Kept separate from the result so the
    # warning reaches the caller; text added to a value after its `return`
    # statement has run is never seen by the caller.
    cleanup_warning = ""

    try:
        # Copy the uploaded file to a temporary location
        shutil.copy2(file.name, temp_file_path)

        # Process the file; only the status message is returned to the caller
        result, _ = ingest_pdf_file(temp_file_path, title, author, keywords)
    except Exception as e:
        logging.error(f"Error in processing and cleanup: {str(e)}")
        result = f"Error: {str(e)}"
    finally:
        # Clean up the temporary directory and its contents
        try:
            shutil.rmtree(temp_dir)
            logging.info(f"Removed temporary directory: {temp_dir}")
        except Exception as cleanup_error:
            logging.error(f"Error during cleanup: {str(cleanup_error)}")
            cleanup_warning = f"\nWarning: Could not remove temporary files: {str(cleanup_error)}"

    return result + cleanup_warning


#
#
#######################################################################################################################