# app/crud/process_file.py
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, CSVLoader, UnstructuredExcelLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
from markitdown import MarkItDown
import os
import logging
from typing import List, Optional
# from app.db.models.docs import *
# from app.schemas.schemas import DocumentCreate, DocumentUpdate
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def load_file_with_markitdown(file_path: str, llm_client=None, model: str = None):
    """
    Converts a file to Markdown with MarkItDown.

    If an LLM client and model name are given, MarkItDown uses them as well
    (e.g. to generate image descriptions). Note that convert() returns a
    single result object exposing .title and .text_content, not a list.
    """
    if llm_client and model:
        markitdown = MarkItDown(llm_client=llm_client, llm_model=model)
    else:
        markitdown = MarkItDown()
    return markitdown.convert(file_path)
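
# Usage sketch for the helper above, assuming the `openai` package is
# installed and OPENAI_API_KEY is set; the file name and model name are
# illustrative assumptions, not requirements of this module.
async def _example_markitdown_usage(file_path: str = "report.pdf"):
    from openai import OpenAI  # local import: only needed for this sketch

    result = await load_file_with_markitdown(
        file_path, llm_client=OpenAI(), model="gpt-4o"
    )
    # convert() returns one result object, not a list of page Documents.
    logger.info(f"Converted {file_path}: title={result.title!r}")
    return result.text_content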

async def load_pdf_with_langchain(file_path: str):
    """
    Loads and extracts text from a PDF file using LangChain's PyPDFLoader.

    Parameters:
        file_path (str): Path to the PDF file.

    Returns:
        List[Document]: A list of LangChain Document objects (one per page)
        with metadata.
    """
    # extract_images=True OCRs embedded images and requires the
    # rapidocr-onnxruntime package to be installed.
    loader = PyPDFLoader(file_path, extract_images=True)
    documents = loader.load()
    return documents
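
# Usage sketch for the PDF loader; the file name is an illustrative assumption.
async def _example_pdf_usage(file_path: str = "paper.pdf"):
    pages = await load_pdf_with_langchain(file_path)
    for page in pages[:2]:
        # Each Document carries metadata such as "source" and "page".
        logger.info(f"page metadata: {page.metadata}")
    return pages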

async def load_file_with_langchain(file_path: str):
    """
    Loads and extracts text from a file using the LangChain loader that
    matches its extension (PDF, DOCX, CSV, or XLSX).

    Parameters:
        file_path (str): Path to the file.

    Returns:
        List[Document]: A list of LangChain Document objects with metadata.
    """
# Determine the file extension
_, file_extension = os.path.splitext(file_path)
# Choose the loader based on file extension
if file_extension.lower() == '.pdf':
loader = PyPDFLoader(file_path)
elif file_extension.lower() == '.docx':
loader = Docx2txtLoader(file_path)
elif file_extension.lower() == '.csv':
loader = CSVLoader(file_path)
elif file_extension.lower() == '.xlsx':
loader = UnstructuredExcelLoader(file_path)
    else:
        raise ValueError("Unsupported file format. Please provide a PDF, DOCX, CSV, or XLSX file.")
# Load the documents
documents = loader.load()
return documents
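
# Usage sketch for the extension dispatch above. The file name is
# illustrative; XLSX support additionally requires the `unstructured`
# package that backs UnstructuredExcelLoader.
async def _example_loader_dispatch(file_path: str = "data.csv"):
    docs = await load_file_with_langchain(file_path)
    logger.info(f"Loaded {len(docs)} document(s) from {file_path}")
    return docs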

async def split_documents(documents, chunk_size=10000, chunk_overlap=1000):
    """
    Splits documents into smaller chunks with overlap.

    Parameters:
        documents (List[Document]): List of LangChain Document objects.
        chunk_size (int): The maximum size of each chunk, in characters.
        chunk_overlap (int): The number of characters to overlap between chunks.

    Returns:
        List[Document]: List of chunked Document objects.
    """
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
split_docs = text_splitter.split_documents(documents)
return split_docs
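
# Usage sketch composing the loader and splitter above. The chunk sizes are
# illustrative assumptions, smaller than the module defaults, to make the
# overlap behaviour visible on a short file.
async def _example_load_and_split(file_path: str = "contract.docx"):
    docs = await load_file_with_langchain(file_path)
    chunks = await split_documents(docs, chunk_size=2000, chunk_overlap=200)
    logger.info(f"Split {len(docs)} document(s) into {len(chunks)} chunk(s)")
    return chunks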

async def process_uploaded_file(
    id,
    file_path: str,
    rag_system=None,
    llm_client=None,
    llm_model=None,
):
    """
    Converts an uploaded file with MarkItDown and, if a RAG system is
    provided, indexes the converted text under a doc_id derived from
    `id` and the document title.
    """
try:
# Load the document using LangChain
documents = await load_file_with_markitdown(file_path, llm_client=llm_client, model=llm_model)
logger.info(f"Loaded document: {file_path}")
# Concatenate all pages to get the full document text for context generation
# whole_document_content = "\n".join([doc.page_content for doc in documents])
except Exception as e:
logger.error(f"Failed to load document {file_path}: {e}")
raise RuntimeError(f"Error loading document: {file_path}") from e
# # Generate context for each chunk if llm is provided
# if llm:
# for doc in split_docs:
# try:
# context = await llm.generate_context(doc, whole_document_content=whole_document_content)
# # Add context to the beginning of the page content
# doc.page_content = f"{context.replace('<|eot_id|>', '')}\n\n{doc.page_content}"
# logger.info(f"Context generated and added for chunk {split_docs.index(doc)}")
# except Exception as e:
# logger.error(f"Failed to generate context for chunk {split_docs.index(doc)}: {e}")
# raise RuntimeError(f"Error generating context for chunk {split_docs.index(doc)}") from e
    # Add the converted document to the RAG system when one is provided
    if rag_system:
        try:
            # MarkItDown yields a single converted document; index it whole.
            rag_system.add_document(
                doc_id=f"{id}_{documents.title}",
                text=documents.text_content,
                meta_data={"source": file_path},
            )
            logger.debug(f"doc_id: {id}_{documents.title}")
            logger.info(f"Document successfully added to RAG system for file {file_path}")
        except Exception as e:
            logger.error(f"Failed to add document to RAG system for {file_path}: {e}")
            raise RuntimeError(f"Error adding document to RAG system: {file_path}") from e
else:
logger.info(f"Loaded document {file_path}, but not added to RAG system")
return documents
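
# Minimal manual test of the pipeline, assuming a local "sample.pdf" and no
# RAG system (the document is converted but not indexed). Illustrative only.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        result = await process_uploaded_file(id=1, file_path="sample.pdf")
        print(result.text_content[:500])

    asyncio.run(_demo())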