# NOTE(review): stray build-log lines found at top of file, commented out so the module parses:
# Spaces:
# Build error
# Build error
# app/crud.py
from langchain_community.document_loaders import (
    PyPDFLoader,
    Docx2txtLoader,
    CSVLoader,
    UnstructuredExcelLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
from markitdown import MarkItDown
import os
import logging
from typing import List, Optional

# from app.db.models.docs import *
# from app.schemas.schemas import DocumentCreate, DocumentUpdate

# Configure logging once for this module; children loggers inherit the level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def load_file_with_markitdown(file_path: str, llm_client=None, model: str = None):
    """
    Convert a file to markdown text using MarkItDown.

    Parameters:
        file_path (str): Path to the file to convert.
        llm_client: Optional LLM client used by MarkItDown for image
            descriptions (e.g. an OpenAI client) — TODO confirm expected type.
        model (str): Model name used together with ``llm_client``.

    Returns:
        The MarkItDown conversion result (exposes ``.title`` and
        ``.text_content``, as used by ``process_uploaded_file``).
    """
    # MarkItDown's constructor takes keyword-only configuration arguments;
    # the previous positional call MarkItDown(llm_client, model) raised
    # TypeError at runtime.
    if llm_client and model:
        markitdown = MarkItDown(llm_client=llm_client, llm_model=model)
    else:
        markitdown = MarkItDown()
    return markitdown.convert(file_path)
async def load_pdf_with_langchain(file_path):
    """
    Load and extract text from a PDF file using LangChain's PyPDFLoader.

    Parameters:
        file_path (str): Path to the PDF file.

    Returns:
        List[Document]: LangChain Document objects with metadata.
    """
    # extract_images=True also pulls text out of embedded images.
    pdf_loader = PyPDFLoader(file_path, extract_images=True)
    return pdf_loader.load()
async def load_file_with_langchain(file_path: str):
    """
    Load a document with the LangChain loader matching its file extension.

    Supported formats: .pdf, .docx, .csv, .xlsx.

    Parameters:
        file_path (str): Path to the file to load.

    Returns:
        List[Document]: LangChain Document objects with metadata.

    Raises:
        ValueError: If the file extension is not one of the supported formats.
    """
    # Normalize the extension once instead of lowercasing it per comparison.
    _, ext = os.path.splitext(file_path)
    ext = ext.lower()

    if ext == '.pdf':
        loader = PyPDFLoader(file_path)
    elif ext == '.docx':
        loader = Docx2txtLoader(file_path)
    elif ext == '.csv':
        loader = CSVLoader(file_path)
    elif ext == '.xlsx':
        loader = UnstructuredExcelLoader(file_path)
    else:
        # Keep this message in sync with the branches above: CSV and XLSX
        # are supported too, not just PDF/DOCX as the old message claimed.
        raise ValueError(
            f"Unsupported file format '{ext}'. "
            "Please provide a PDF, DOCX, CSV, or XLSX file."
        )

    documents = loader.load()
    return documents
async def split_documents(documents, chunk_size=10000, chunk_overlap=1000):
    """
    Break documents into smaller overlapping chunks.

    Parameters:
        documents (List[Document]): LangChain Document objects to split.
        chunk_size (int): Maximum characters per chunk.
        chunk_overlap (int): Characters shared between adjacent chunks.

    Returns:
        List[Document]: The chunked Document objects.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return splitter.split_documents(documents)
async def process_uploaded_file(
    id, file_path,
    rag_system=None,
    llm_client=None,
    llm_model=None
):
    """
    Load an uploaded file with MarkItDown and optionally index it in a RAG system.

    Parameters:
        id: Identifier used to namespace the document id in the RAG store.
        file_path (str): Path to the uploaded file.
        rag_system: Optional RAG backend exposing ``add_document``; when None,
            the file is only loaded, not indexed.
        llm_client: Optional LLM client forwarded to MarkItDown.
        llm_model: Optional model name forwarded to MarkItDown.

    Returns:
        The MarkItDown conversion result (exposes ``.title`` / ``.text_content``).

    Raises:
        RuntimeError: If loading the file or adding it to the RAG system fails.
    """
    try:
        documents = await load_file_with_markitdown(
            file_path, llm_client=llm_client, model=llm_model
        )
        logger.info(f"Loaded document: {file_path}")
    except Exception as e:
        logger.error(f"Failed to load document {file_path}: {e}")
        raise RuntimeError(f"Error loading document: {file_path}") from e

    # TODO(review): an earlier draft generated per-chunk LLM context and
    # prepended it to each chunk before indexing; restore via split_documents
    # + llm.generate_context if contextual retrieval is needed.

    if rag_system:
        try:
            # The whole converted document is indexed as a single entry
            # (not pre-split chunks).
            rag_system.add_document(
                doc_id=f"{id}_{documents.title}",
                text=documents.text_content,
                meta_data={"source": file_path},
            )
            # Debug prints replaced with logger calls so output respects the
            # configured log level.
            logger.debug(f"doc_id: {id}_{documents.title}")
            logger.info(f"Document successfully added to RAG system for file {file_path}")
        except Exception as e:
            logger.error(f"Failed to add document to RAG system for {file_path}: {e}")
            raise RuntimeError(f"Error adding document to RAG system: {file_path}") from e
    else:
        logger.info(f"Loaded document {file_path}, but not added to RAG system")
    return documents