import os
import logging
import time
from concurrent.futures import ThreadPoolExecutor

import torch
from dotenv import load_dotenv
from pypdf import PdfReader
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores import FAISS
# from langchain_community.embeddings import HuggingFaceEmbeddings  # older import path
from langchain_huggingface import HuggingFaceEmbeddings

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG
)
logger = logging.getLogger(__name__)
logging.getLogger('matplotlib').setLevel(logging.WARNING)  # Suppress matplotlib debug messages

load_dotenv()
logger.debug("Environment variables loaded.")


def load_single_document(filepath):
    """Read a single .pdf or .txt file and return its text together with the source path."""
    if filepath.endswith('.pdf'):
        with open(filepath, 'rb') as file:
            pdf_reader = PdfReader(file)
            # extract_text() can return None for pages without a text layer
            text = " ".join(page.extract_text() or "" for page in pdf_reader.pages)
    elif filepath.endswith('.txt'):
        with open(filepath, 'r', encoding='utf-8') as file:
            text = file.read()
    else:
        logger.warning("Unsupported file type: %s", filepath)
        return {"content": "", "source": filepath}
    return {"content": text, "source": filepath}


def load_documents(directory):
    """Load all .pdf and .txt files in a directory concurrently."""
    logger.debug("Loading documents from directory: %s", directory)
    start_time = time.time()
    filepaths = [
        os.path.join(directory, filename)
        for filename in os.listdir(directory)
        if filename.endswith(('.pdf', '.txt'))
    ]
    if not filepaths:
        logger.error("No documents found in the directory.")
    else:
        logger.debug("Found %d documents", len(filepaths))
    with ThreadPoolExecutor() as executor:
        documents = list(executor.map(load_single_document, filepaths))
    end_time = time.time()
    logger.debug("Loaded %d documents in %.2f seconds.", len(documents), end_time - start_time)
    return documents


device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


def prepare_documents(documents):
    """Split documents into chunks, embed them, and build a FAISS index."""
    logger.debug("Preparing documents for embedding.")
    start_time = time.time()
    if not documents:
        logger.error("No documents to prepare.")
        return None

    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    texts = text_splitter.create_documents(
        [doc["content"] for doc in documents],
        metadatas=[{"source": os.path.basename(doc["source"])} for doc in documents]
    )
    if not texts:
        logger.error("No texts to embed after splitting.")
        return None
    logger.debug("Created %d text chunks.", len(texts))

    model_path = "sentence-transformers/all-MiniLM-L6-v2"
    model_kwargs = {'device': str(device)}  # sentence-transformers expects a device string
    encode_kwargs = {'normalize_embeddings': False}
    embeddings = HuggingFaceEmbeddings(
        model_name=model_path,
        model_kwargs=model_kwargs,
        encode_kwargs=encode_kwargs
    )

    try:
        db = FAISS.from_documents(texts, embeddings)
        logger.debug("FAISS index created successfully.")
    except Exception as e:
        logger.error("Error creating FAISS index: %s", e)
        return None

    end_time = time.time()
    logger.debug("Documents prepared in %.2f seconds.", end_time - start_time)
    return db


def get_context_sources(question, db):
    """Return the concatenated top-k chunks and their source filenames for a question."""
    start_time = time.time()
    if db is None:
        logger.error("Database is None. Cannot perform similarity search.")
        return "", ""
    try:
        docs = db.similarity_search(question, k=3)
        context = " ".join(doc.page_content for doc in docs)
        sources = ", ".join({doc.metadata['source'] for doc in docs})
    except Exception as e:
        logger.error("Error during similarity search: %s", e)
        return "", ""
    end_time = time.time()
    logger.debug("Similarity search done in %.2f seconds.", end_time - start_time)
    return context, sources
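

# --- Usage sketch (not part of the original script) --------------------------
# A minimal example of wiring the functions above together. The "docs"
# directory name and the sample question are assumptions for illustration only.
if __name__ == "__main__":
    documents = load_documents("docs")
    vector_db = prepare_documents(documents)
    context, sources = get_context_sources("What are these documents about?", vector_db)
    logger.info("Sources: %s", sources)
    logger.info("Context preview: %s", context[:200])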