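"""Document ingestion and retrieval helpers for the chatbot backend.

Loads PDF/TXT files, splits them into chunks, embeds the chunks with a
sentence-transformers model, and serves similarity search over a FAISS index.
"""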
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor

import torch
from dotenv import load_dotenv
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from pypdf import PdfReader

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
logging.getLogger('matplotlib').setLevel(logging.WARNING)  # Suppress Matplotlib debug messages

load_dotenv()
logger.debug("Environment variables loaded.")

def load_single_document(filepath):
    """Read one PDF or TXT file and return its text together with its source path."""
    if filepath.endswith('.pdf'):
        with open(filepath, 'rb') as file:
            pdf_reader = PdfReader(file)
            # extract_text() may return None for pages without a text layer.
            text = " ".join([page.extract_text() or "" for page in pdf_reader.pages])
    elif filepath.endswith('.txt'):
        with open(filepath, 'r', encoding='utf-8') as file:
            text = file.read()
    else:
        logger.warning("Unsupported file type: %s", filepath)
        return {"content": "", "source": filepath}
    return {"content": text, "source": filepath}

def load_documents(directory):
    """Load all PDF and TXT files in a directory, reading them in parallel."""
    logger.debug("Loading documents from directory: %s", directory)
    start_time = time.time()
    filepaths = [
        os.path.join(directory, filename)
        for filename in os.listdir(directory)
        if filename.endswith(('.pdf', '.txt'))
    ]
    if not filepaths:
        logger.error("No documents found in the directory.")
    else:
        logger.debug("Found %d documents", len(filepaths))
    with ThreadPoolExecutor() as executor:
        documents = list(executor.map(load_single_document, filepaths))
    end_time = time.time()
    logger.debug("Loaded %d documents in %.2f seconds.", len(documents), end_time - start_time)
    return documents

# Run embeddings on the GPU when available, otherwise fall back to CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

def prepare_documents(documents):
    """Split documents into chunks, embed them, and build a FAISS index."""
    logger.debug("Preparing documents for embedding.")
    start_time = time.time()
    if not documents:
        logger.error("No documents to prepare.")
        return None
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    texts = text_splitter.create_documents(
        [doc["content"] for doc in documents],
        metadatas=[{"source": os.path.basename(doc["source"])} for doc in documents],
    )
    if not texts:
        logger.error("No texts to embed after splitting.")
        return None
    logger.debug("Created %d text chunks.", len(texts))
    model_path = "sentence-transformers/all-MiniLM-L6-v2"
    model_kwargs = {'device': device}
    encode_kwargs = {'normalize_embeddings': False}
    embeddings = HuggingFaceEmbeddings(
        model_name=model_path,
        model_kwargs=model_kwargs,
        encode_kwargs=encode_kwargs,
    )
    try:
        db = FAISS.from_documents(texts, embeddings)
        logger.debug("FAISS index created successfully.")
    except Exception as e:
        logger.error("Error creating FAISS index: %s", e)
        return None
    end_time = time.time()
    logger.debug("Documents prepared in %.2f seconds.", end_time - start_time)
    return db

def get_context_sources(question, db):
    """Return the concatenated top-k chunks and their source filenames for a question."""
    start_time = time.time()
    if db is None:
        logger.error("Database is None. Cannot perform similarity search.")
        return "", ""
    try:
        docs = db.similarity_search(question, k=3)
        context = " ".join([doc.page_content for doc in docs])
        sources = ", ".join({doc.metadata['source'] for doc in docs})
    except Exception as e:
        logger.error("Error during similarity search: %s", e)
        return "", ""
    end_time = time.time()
    logger.debug("Similarity search done in %.2f seconds.", end_time - start_time)
    return context, sources
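

# Minimal usage sketch, not part of the original app wiring: the "docs"
# directory and the sample question below are placeholders for illustration,
# assuming the intended pipeline is
# load_documents -> prepare_documents -> get_context_sources.
if __name__ == "__main__":
    docs = load_documents("docs")
    db = prepare_documents(docs)
    context, sources = get_context_sources("What are these documents about?", db)
    print("Sources:", sources)
    print("Context preview:", context[:200])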