|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os
|
|
import re
|
|
import tempfile
|
|
import zipfile
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
|
|
import ebooklib
|
|
from bs4 import BeautifulSoup
|
|
from ebooklib import epub
|
|
|
|
|
|
from App_Function_Libraries.DB.DB_Manager import add_media_with_keywords, add_media_to_database
|
|
from App_Function_Libraries.Summarization.Summarization_General_Lib import perform_summarization
|
|
from App_Function_Libraries.Chunk_Lib import chunk_ebook_by_chapters
|
|
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram
|
|
|
|
|
|
|
|
|
|
|
|
def import_epub(file_path,
                title=None,
                author=None,
                keywords=None,
                custom_prompt=None,
                system_prompt=None,
                summary=None,
                auto_summarize=False,
                api_name=None,
                api_key=None,
                chunk_options=None,
                custom_chapter_pattern=None
                ):
    """
    Imports an EPUB file, extracts its content, chunks it, optionally summarizes it, and adds it to the database.

    Parameters:
        - file_path (str): Path to the EPUB file.
        - title (str, optional): Title of the book. Falls back to metadata found in the converted
          text, then to the file name without its extension.
        - author (str, optional): Author of the book. Falls back to extracted metadata, then "Unknown".
        - keywords (str, optional): Comma-separated keywords for the book.
        - custom_prompt (str, optional): Custom user prompt for summarization.
        - system_prompt (str, optional): System message forwarded to the summarizer.
        - summary (str, optional): Predefined summary of the book.
        - auto_summarize (bool, optional): Whether to auto-summarize the chunks. Only takes effect
          when both api_name and api_key are also provided.
        - api_name (str, optional): API name for summarization.
        - api_key (str, optional): API key for summarization.
        - chunk_options (dict, optional): Options for chunking. The caller's dict is not modified.
        - custom_chapter_pattern (str, optional): Custom regex pattern for chapter detection. Used only
          when chunk_options does not already carry a 'custom_chapter_pattern' key.

    Returns:
        - str: Status message indicating success or failure. Exceptions are logged and reported
          in the returned message rather than propagated.
    """
    try:
        logging.info(f"Importing EPUB file from {file_path}")
        log_counter("epub_import_attempt", labels={"file_path": file_path})

        start_time = datetime.now()

        # Convert the EPUB to Markdown
        markdown_content = epub_to_markdown(file_path)
        logging.debug("Converted EPUB to Markdown.")

        # Fill in missing metadata from the converted text, then from the file name
        if not title or not author:
            extracted_title, extracted_author = extract_epub_metadata(markdown_content)
            title = title or extracted_title or os.path.splitext(os.path.basename(file_path))[0]
            author = author or extracted_author or "Unknown"
            logging.debug(f"Extracted metadata - Title: {title}, Author: {author}")

        # Drop empty entries so inputs such as "a,,b" or "a, " do not yield blank keywords
        keyword_list = [kw.strip() for kw in keywords.split(',') if kw.strip()] if keywords else []
        logging.debug(f"Keywords: {keyword_list}")

        if chunk_options is None:
            chunk_options = {
                'method': 'chapter',
                'max_size': 500,
                'overlap': 200,
                'custom_chapter_pattern': custom_chapter_pattern
            }
        else:
            # Work on a copy: the same dict may be reused by the caller for other files
            chunk_options = dict(chunk_options)
            chunk_options.setdefault('method', 'chapter')
            chunk_options.setdefault('custom_chapter_pattern', custom_chapter_pattern)

        # Chunk the content by chapters
        chunks = chunk_ebook_by_chapters(markdown_content, chunk_options)
        logging.info(f"Total chunks created: {len(chunks)}")
        log_histogram("epub_chunks_created", len(chunks), labels={"file_path": file_path})

        if chunks:
            logging.debug(f"Structure of first chunk: {chunks[0].keys()}")

        if auto_summarize and api_name and api_key:
            logging.info("Auto-summarization is enabled.")
            summarized_chunks = []
            for chunk in chunks:
                chunk_text = chunk.get('text', '')
                if chunk_text:
                    summary_text = perform_summarization(api_name, chunk_text, custom_prompt, api_key,
                                                         recursive_summarization=False, temp=None,
                                                         system_message=system_prompt
                                                         )
                    # Tolerate chunks that were produced without a 'metadata' dict
                    chunk.setdefault('metadata', {})['summary'] = summary_text
                    # Chunks with no text are intentionally dropped in this mode
                    summarized_chunks.append(chunk)
            chunks = summarized_chunks
            logging.info("Summarization of chunks completed.")
            log_counter("epub_chunks_summarized", value=len(chunks), labels={"file_path": file_path})
            # NOTE(review): in this branch `summary` keeps whatever the caller passed (possibly None);
            # per-chunk summaries live only in chunk metadata - confirm the DB layer accepts None.
        else:
            if summary:
                logging.debug("Using provided summary.")
            else:
                summary = "No summary provided."

        info_dict = {
            'title': title,
            'uploader': author,
            'ingestion_date': datetime.now().strftime('%Y-%m-%d')
        }

        # Prepare segments for the database
        segments = [{'Text': chunk.get('text', chunk.get('content', ''))} for chunk in chunks]
        logging.debug(f"Prepared segments for database. Number of segments: {len(segments)}")

        result = add_media_to_database(
            url=file_path,
            info_dict=info_dict,
            segments=segments,
            summary=summary,
            keywords=keyword_list,
            custom_prompt_input=custom_prompt,
            whisper_model="Imported",
            media_type="ebook",
            overwrite=False
        )

        end_time = datetime.now()
        processing_time = (end_time - start_time).total_seconds()
        log_histogram("epub_import_duration", processing_time, labels={"file_path": file_path})

        logging.info(f"Ebook '{title}' by {author} imported successfully. Database result: {result}")
        log_counter("epub ingested into the DB successfully", labels={"file_path": file_path})
        return f"Ebook '{title}' by {author} imported successfully. Database result: {result}"

    except Exception as e:
        logging.exception(f"Error importing ebook: {str(e)}")
        log_counter("epub_import_error", labels={"file_path": file_path, "error": str(e)})
        return f"Error importing ebook: {str(e)}"
|
|
|
|
|
|
|
|
def process_zip_file(zip_file,
                     title,
                     author,
                     keywords,
                     custom_prompt,
                     system_prompt,
                     summary,
                     auto_summarize,
                     api_name,
                     api_key,
                     chunk_options
                     ):
    """
    Processes a ZIP file containing multiple EPUB files and imports each one.

    Parameters:
        - zip_file (file-like object): The ZIP file to process; its `name` (or, failing that,
          `path`) attribute is used as the archive location on disk.
        - title (str): Title passed unchanged to every imported book.
        - author (str): Author name passed unchanged to every imported book.
        - keywords (str): Comma-separated keywords.
        - custom_prompt (str): Custom user prompt for summarization.
        - system_prompt (str): System message for summarization.
        - summary (str): Predefined summary, forwarded to every import.
        - auto_summarize (bool): Whether to auto-summarize the chunks.
        - api_name (str): API name for summarization.
        - api_key (str): API key for summarization.
        - chunk_options (dict): Options for chunking.

    Returns:
        - str: Combined status messages for all EPUB files in the ZIP, one per line, or a
          single error message if the archive itself could not be processed.
    """
    results = []
    # Bound before the try block so the error handler can always reference it
    zip_path = None
    try:
        zip_path = zip_file.name if hasattr(zip_file, 'name') else zip_file.path
        with tempfile.TemporaryDirectory() as temp_dir:
            logging.info(f"Extracting ZIP file {zip_path} to temporary directory {temp_dir}")
            log_counter("zip_processing_attempt", labels={"zip_path": zip_path})

            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)

            # Walk the whole extraction tree: archives often wrap their files in a folder
            epub_files = []
            for root, _dirs, files in os.walk(temp_dir):
                for name in files:
                    if name.lower().endswith('.epub'):
                        epub_files.append(os.path.relpath(os.path.join(root, name), temp_dir))
            epub_files.sort()
            log_histogram("epub_files_in_zip", len(epub_files), labels={"zip_path": zip_path})

            for filename in epub_files:
                file_path = os.path.join(temp_dir, filename)
                logging.info(f"Processing EPUB file {filename} from ZIP.")
                result = import_epub(
                    file_path=file_path,
                    title=title,
                    author=author,
                    keywords=keywords,
                    custom_prompt=custom_prompt,
                    system_prompt=system_prompt,
                    summary=summary,
                    auto_summarize=auto_summarize,
                    api_name=api_name,
                    api_key=api_key,
                    chunk_options=chunk_options,
                    custom_chapter_pattern=chunk_options.get('custom_chapter_pattern') if chunk_options else None
                )
                results.append(f"File: {filename} - {result}")

            logging.info("Completed processing all EPUB files in the ZIP.")
            log_counter("zip_processing_success", labels={"zip_path": zip_path})
    except Exception as e:
        logging.exception(f"Error processing ZIP file: {str(e)}")
        log_counter("zip_processing_error", labels={"zip_path": str(zip_path), "error": str(e)})
        return f"Error processing ZIP file: {str(e)}"

    return "\n".join(results)
|
|
|
|
|
|
def import_file_handler(file,
                        title,
                        author,
                        keywords,
                        system_prompt,
                        custom_prompt,
                        auto_summarize,
                        api_name,
                        api_key,
                        max_chunk_size,
                        chunk_overlap,
                        custom_chapter_pattern
                        ):
    """
    Dispatches an uploaded file to the matching importer based on its extension.

    Parameters:
        - file: Uploaded file object exposing a `name` attribute holding its path, or None.
        - title (str): Title forwarded to the importer.
        - author (str): Author forwarded to the importer.
        - keywords (str): Comma-separated keywords.
        - system_prompt (str): System message for summarization.
        - custom_prompt (str): Custom user prompt for summarization.
        - auto_summarize (bool): Whether to auto-summarize the chunks.
        - api_name (str): API name for summarization.
        - api_key (str): API key for summarization.
        - max_chunk_size (int | str): Maximum chunk size; a blank string or a non-int,
          non-str value falls back to 4000.
        - chunk_overlap (int | str): Chunk overlap; a blank string or a non-int,
          non-str value falls back to 0.
        - custom_chapter_pattern (str): Custom regex pattern for chapter detection.

    Returns:
        - str: User-facing status message.
    """
    # Validate the upload before anything touches `file.name`; otherwise a missing
    # upload raises AttributeError here and again inside the error handlers.
    if file is None:
        log_counter("file_import_error", labels={"error": "No file uploaded"})
        return "No file uploaded."

    file_name = file.name

    try:
        log_counter("file_import_attempt", labels={"file_name": file_name})

        # Textbox inputs arrive as strings; int() raising ValueError is handled below
        if isinstance(max_chunk_size, str):
            max_chunk_size = int(max_chunk_size) if max_chunk_size.strip() else 4000
        elif not isinstance(max_chunk_size, int):
            max_chunk_size = 4000

        if isinstance(chunk_overlap, str):
            chunk_overlap = int(chunk_overlap) if chunk_overlap.strip() else 0
        elif not isinstance(chunk_overlap, int):
            chunk_overlap = 0

        chunk_options = {
            'method': 'chapter',
            'max_size': max_chunk_size,
            'overlap': chunk_overlap,
            'custom_chapter_pattern': custom_chapter_pattern if custom_chapter_pattern else None
        }

        file_path = file_name
        if not os.path.exists(file_path):
            log_counter("file_import_error", labels={"error": "File not found", "file_name": file_name})
            return "Uploaded file not found."

        start_time = datetime.now()

        # NOTE(review): the importers below report failures through their returned string,
        # so the "success" counters and messages are emitted even when the status is an error.
        if file_path.lower().endswith('.epub'):
            status = import_epub(
                file_path,
                title,
                author,
                keywords,
                custom_prompt=custom_prompt,
                system_prompt=system_prompt,
                summary=None,
                auto_summarize=auto_summarize,
                api_name=api_name,
                api_key=api_key,
                chunk_options=chunk_options,
                custom_chapter_pattern=custom_chapter_pattern
            )
            log_counter("epub_import_success", labels={"file_name": file_name})
            result = f"📚 EPUB Imported Successfully:\n{status}"
        elif file_name.lower().endswith('.zip'):
            status = process_zip_file(
                zip_file=file,
                title=title,
                author=author,
                keywords=keywords,
                custom_prompt=custom_prompt,
                system_prompt=system_prompt,
                summary=None,
                auto_summarize=auto_summarize,
                api_name=api_name,
                api_key=api_key,
                chunk_options=chunk_options
            )
            log_counter("zip_import_success", labels={"file_name": file_name})
            result = f"📦 ZIP Processed Successfully:\n{status}"
        elif file_name.lower().endswith(('.chm', '.html', '.pdf', '.xml', '.opml')):
            file_type = file_name.split('.')[-1].upper()
            log_counter("unsupported_file_type", labels={"file_type": file_type})
            result = f"{file_type} file import is not yet supported."
        else:
            log_counter("unsupported_file_type", labels={"file_type": file_name.split('.')[-1]})
            result = "❌ Unsupported file type. Please upload an `.epub` file or a `.zip` file containing `.epub` files."

        end_time = datetime.now()
        processing_time = (end_time - start_time).total_seconds()
        log_histogram("file_import_duration", processing_time, labels={"file_name": file_name})

        return result

    except ValueError as ve:
        logging.exception(f"Error parsing input values: {str(ve)}")
        log_counter("file_import_error", labels={"error": "Invalid input", "file_name": file_name})
        return f"❌ Error: Invalid input for chunk size or overlap. Please enter valid numbers."
    except Exception as e:
        logging.exception(f"Error during file import: {str(e)}")
        log_counter("file_import_error", labels={"error": str(e), "file_name": file_name})
        return f"❌ Error during import: {str(e)}"
|
|
|
|
|
|
def read_epub(file_path):
    """
    Extracts the plain text of every document item in an EPUB file.

    Parameters:
        - file_path (str): Path to the EPUB file.

    Returns:
        - str: Text of all document items, each followed by a blank line.

    Raises:
        - Exception: Any error raised while reading or parsing is logged and re-raised.
    """
    try:
        logging.info(f"Reading EPUB file from {file_path}")
        book = epub.read_epub(file_path)

        # Collect the raw HTML of document items only (images, styles, etc. are skipped)
        html_documents = [
            entry.get_content()
            for entry in book.get_items()
            if entry.get_type() == ebooklib.ITEM_DOCUMENT
        ]

        # Strip the markup from each document and terminate it with a blank line
        pieces = [
            BeautifulSoup(document, 'html.parser').get_text(separator='\n\n') + "\n\n"
            for document in html_documents
        ]
        text = "".join(pieces)

        logging.debug("EPUB content extraction completed.")
        return text
    except Exception as e:
        logging.exception(f"Error reading EPUB file: {str(e)}")
        raise
|
|
|
|
|
|
|
|
def extract_epub_metadata(content):
    """
    Extracts the title and author from "Title:" / "Author:" lines in converted EPUB text.

    The value is the remainder of the line on which the label first appears, with
    surrounding whitespace removed. It is never taken from a following line, and a
    label on the final line is recognized even without a trailing newline.

    Parameters:
        - content (str): Text to search.

    Returns:
        - tuple: (title, author); each element is a str, or None when the label is
          absent or its value is blank.
    """
    def _field(label):
        # [ \t]* instead of \s* so an empty value cannot swallow the next line,
        # and [^\r\n]* so the match ends at the line break or at end of input.
        match = re.search(rf'{label}:[ \t]*([^\r\n]*)', content)
        if not match:
            return None
        return match.group(1).strip() or None

    return _field('Title'), _field('Author')
|
|
|
|
|
|
def ingest_text_file(file_path, title=None, author=None, keywords=None):
    """
    Reads a UTF-8 text file and stores it in the database as a 'document'.

    Parameters:
        - file_path (str): Path to the text file.
        - title (str, optional): Title of the document; defaults to the file name without extension.
        - author (str, optional): Author of the document; defaults to 'Unknown'.
        - keywords (str, optional): Comma-separated keywords. 'text_file,epub_converted' is
          always prepended to the stored keyword string.

    Returns:
        - str: Status message indicating success or failure; errors are logged, not raised.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            content = handle.read()

        # Only texts tagged as converted EPUBs are searched for embedded metadata
        if 'epub_converted' in (keywords or '').lower():
            found_title, found_author = extract_epub_metadata(content)
            title = title or found_title
            author = author or found_author
            logging.debug(f"Extracted metadata for converted EPUB - Title: {title}, Author: {author}")

        # Remaining gaps are filled with the file name and a placeholder author
        title = title or os.path.splitext(os.path.basename(file_path))[0]
        author = author or 'Unknown'

        keywords = f'text_file,epub_converted,{keywords}' if keywords else 'text_file,epub_converted'

        add_media_with_keywords(
            url=file_path,
            title=title,
            media_type='document',
            content=content,
            keywords=keywords,
            prompt='No prompt for text files',
            summary='No summary for text files',
            transcription_model='None',
            author=author,
            ingestion_date=datetime.now().strftime('%Y-%m-%d')
        )

        message = f"Text file '{title}' by {author} ingested successfully."
        logging.info(message)
        return message
    except Exception as e:
        failure = f"Error ingesting text file: {str(e)}"
        logging.error(failure)
        return failure
|
|
|
|
|
|
def ingest_folder(folder_path, keywords=None):
    """
    Ingests every `.txt` file located directly inside a folder.

    Parameters:
        - folder_path (str): Path to the folder containing text files.
        - keywords (str, optional): Comma-separated keywords passed to each file's ingestion.

    Returns:
        - str: Status messages of all ingested files joined by newlines, or a single
          error message if the folder could not be processed.
    """
    try:
        logging.info(f"Ingesting all text files from folder {folder_path}")
        # The extension check is case-insensitive; sub-folders are not descended into
        text_files = (name for name in os.listdir(folder_path) if name.lower().endswith('.txt'))
        outcomes = [
            ingest_text_file(os.path.join(folder_path, name), keywords=keywords)
            for name in text_files
        ]
        logging.info("Completed ingestion of all text files in the folder.")
    except Exception as e:
        logging.exception(f"Error ingesting folder: {str(e)}")
        return f"Error ingesting folder: {str(e)}"

    return "\n".join(outcomes)
|
|
|
|
|
|
def epub_to_markdown(epub_path):
    """
    Converts an EPUB file to Markdown format, including the table of contents and chapter contents.

    The output starts with a "# Table of Contents" list built from the book's TOC,
    followed by each document item rendered as headings, paragraphs and lists, with a
    "---" rule after the TOC and after every document.

    Parameters:
        - epub_path (str): Path to the EPUB file.

    Returns:
        - str: Markdown-formatted content of the EPUB.

    Raises:
        - Exception: Any error raised during conversion is logged and re-raised.
    """
    def _render_toc(entries, level):
        # A (section, children) tuple is a nested TOC node; recurse so nesting of any
        # depth is rendered instead of passing the raw tuple to format_toc_item.
        lines = []
        for entry in entries:
            if isinstance(entry, tuple):
                section, children = entry
                lines.append(format_toc_item(section, level))
                lines.extend(_render_toc(children, level + 1))
            else:
                lines.append(format_toc_item(entry, level))
        return lines

    try:
        logging.info(f"Converting EPUB to Markdown from {epub_path}")
        book = epub.read_epub(epub_path)

        # Collect fragments and join once at the end instead of repeated string +=
        parts = ["# Table of Contents\n\n"]
        parts.extend(_render_toc(book.toc, 1))
        parts.append("\n---\n\n")

        for item in book.get_items():
            if item.get_type() == ebooklib.ITEM_DOCUMENT:
                chapter_content = item.get_content().decode('utf-8')
                soup = BeautifulSoup(chapter_content, 'html.parser')

                # Promote the first h1-h3 to a top-level chapter heading. The same element
                # is emitted again at its own level by the loop below.
                title = soup.find(['h1', 'h2', 'h3'])
                if title:
                    chapter_title = title.get_text()
                    parts.append(f"# {chapter_title}\n\n")

                for elem in soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'ul', 'ol']):
                    if elem.name.startswith('h'):
                        # The digit in the tag name (h1..h6) gives the Markdown heading depth
                        level = int(elem.name[1])
                        parts.append(f"{'#' * level} {elem.get_text()}\n\n")
                    elif elem.name == 'p':
                        parts.append(f"{elem.get_text()}\n\n")
                    elif elem.name in ['ul', 'ol']:
                        prefix = '-' if elem.name == 'ul' else '1.'
                        for li in elem.find_all('li'):
                            parts.append(f"{prefix} {li.get_text()}\n")
                        parts.append("\n")

                parts.append("---\n\n")

        markdown_content = "".join(parts)
        logging.debug("EPUB to Markdown conversion completed.")
        return markdown_content

    except Exception as e:
        logging.exception(f"Error converting EPUB to Markdown: {str(e)}")
        raise
|
|
|
|
|
|
def format_toc_item(item, level):
    """
    Renders one table-of-contents entry as a Markdown list line linking to its slug.

    Parameters:
        - item (epub.Link or epub.Section): TOC item; any other object is rendered via str().
        - level (int): Nesting level; each level past the first adds one leading space.

    Returns:
        - str: Markdown-formatted TOC line ending in a newline, or "" if formatting
          failed (the error is logged).
    """
    try:
        # Links and sections both expose their label as `.title`
        label = item.title if isinstance(item, (epub.Link, epub.Section)) else str(item)
        indent = ' ' * (level - 1)
        return f"{indent}- [{label}](#{slugify(label)})\n"
    except Exception as e:
        logging.exception(f"Error formatting TOC item: {str(e)}")
        return ""
|
|
|
|
|
|
def slugify(text):
    """
    Builds a lowercase, hyphen-separated slug for use as a Markdown anchor.

    Each run of non-word characters and/or underscores becomes a single hyphen, and
    leading and trailing hyphens are removed.

    Parameters:
        - text (str): The text to slugify.

    Returns:
        - str: Slugified text.
    """
    lowered = text.lower()
    hyphenated = re.sub(r'[\W_]+', '-', lowered)
    return hyphenated.strip('-')
|
|
|
|
|
|
|
|
|
|
|