Spaces:
Sleeping
Sleeping
import os | |
import logging | |
from concurrent.futures import ThreadPoolExecutor | |
from pypdf import PdfReader | |
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter | |
from langchain_community.vectorstores import FAISS | |
#from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain_huggingface import HuggingFaceEmbeddings | |
import time | |
import torch | |
from dotenv import load_dotenv | |
# Configure the root logger: DEBUG threshold, with each record rendered as
# "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Logger for this module, shared by the functions defined below.
logger = logging.getLogger(__name__)

# Matplotlib emits a lot of DEBUG records; only let WARNING and above through.
matplotlib_logger = logging.getLogger('matplotlib')
matplotlib_logger.setLevel(logging.WARNING)
del matplotlib_logger  # keep the module namespace identical to the original

# Load variables from a .env file (python-dotenv) before anything reads them.
load_dotenv()
logger.debug("Environment variables loaded.")
def load_single_document(filepath):
    """Read one ``.pdf`` or ``.txt`` file and return its text with its path.

    The extension check is case-insensitive, so ``REPORT.PDF`` and
    ``notes.TXT`` are handled like their lowercase counterparts.

    Args:
        filepath: Path of the file to read.

    Returns:
        A dict ``{"content": <text>, "source": filepath}``. ``content`` is an
        empty string when the extension is neither ``.pdf`` nor ``.txt``
        (a warning is logged in that case).

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If a ``.txt`` file is not valid UTF-8.
        Errors raised by the PDF reader while parsing are not caught here.
    """
    lowered_path = filepath.lower()

    if lowered_path.endswith('.pdf'):
        with open(filepath, 'rb') as file:
            pdf_reader = PdfReader(file)
            # Extraction happens while the file is still open because the
            # reader works from the open stream. ``or ""`` keeps the join
            # safe if a page yields no text (e.g. a scanned image page).
            text = " ".join(
                page.extract_text() or "" for page in pdf_reader.pages
            )
    elif lowered_path.endswith('.txt'):
        with open(filepath, 'r', encoding='utf-8') as file:
            text = file.read()
    else:
        logger.warning("Unsupported file type: %s", filepath)
        return {"content": "", "source": filepath}

    return {"content": text, "source": filepath}
def load_documents(directory):
    """Load every ``.pdf`` and ``.txt`` file found directly in *directory*.

    Files are read concurrently on a thread pool via
    ``load_single_document``; the returned list follows the sorted file-name
    order, so repeated runs over the same directory give the same order.

    Args:
        directory: Path of the directory to scan (not recursive).

    Returns:
        A list of ``{"content": ..., "source": ...}`` dicts, one per matching
        file. The list is empty when no matching file exists (an error is
        logged in that case).

    Raises:
        OSError: If *directory* cannot be listed.
        Any exception raised while reading an individual file propagates.
    """
    logger.debug("Loading documents from directory: %s", directory)
    start_time = time.perf_counter()

    filepaths = []
    # os.listdir gives names in arbitrary order; sorting makes the document
    # order (and everything built from it) reproducible.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(('.pdf', '.txt')):
            continue
        candidate = os.path.join(directory, filename)
        # A sub-directory named e.g. "archive.txt" would make open() fail and
        # abort the whole batch, so only regular files are kept.
        if os.path.isfile(candidate):
            filepaths.append(candidate)

    if not filepaths:
        logger.error("No documents found in the directory.")
    else:
        logger.debug("Found %d documents", len(filepaths))

    # executor.map preserves input order and re-raises worker exceptions
    # when its results are consumed by list().
    with ThreadPoolExecutor() as executor:
        documents = list(executor.map(load_single_document, filepaths))

    elapsed = time.perf_counter() - start_time
    logger.debug("Loaded %d documents in %.2f seconds.", len(documents), elapsed)
    return documents
# Target device for the embedding model: the first CUDA GPU when torch
# reports CUDA as available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
def prepare_documents(documents):
    """Split documents into chunks, embed them and build a FAISS index.

    Args:
        documents: List of dicts with a ``"content"`` string and a
            ``"source"`` path, in the shape returned by
            ``load_single_document``.

    Returns:
        The FAISS vector store built from the chunks, or ``None`` when
        *documents* is empty, when splitting produces no chunks, or when
        building the index fails (each case is logged).

    Notes:
        Each chunk's metadata carries only the base name of its source path.
        Embeddings are computed with the module-level ``device``.
    """
    logger.debug("Preparing documents for embedding.")
    start_time = time.perf_counter()

    if not documents:
        logger.error("No documents to prepare.")
        return None

    # CharacterTextSplitter only cuts on its single separator ("\n\n" by
    # default), so text without blank lines (typical for PDF pages joined
    # with spaces) ends up in chunks far larger than chunk_size. The
    # recursive splitter falls back to finer separators, so the size limit
    # is actually honoured.
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    contents = [doc["content"] for doc in documents]
    metadatas = [{"source": os.path.basename(doc["source"])} for doc in documents]
    texts = text_splitter.create_documents(contents, metadatas=metadatas)

    if not texts:
        logger.error("No texts to embed after splitting.")
        return None
    logger.debug("Created %d text chunks.", len(texts))

    model_name = "sentence-transformers/all-MiniLM-l6-v2"
    embeddings = HuggingFaceEmbeddings(
        model_name=model_name,
        model_kwargs={'device': device},
        encode_kwargs={'normalize_embeddings': False},
    )

    try:
        db = FAISS.from_documents(texts, embeddings)
    except Exception as e:
        # Boundary handler: callers expect None on failure. logger.exception
        # keeps the traceback that a plain error message would drop.
        logger.exception("Error creating FAISS index: %s", e)
        return None
    logger.debug("FAISS index created successfully.")

    elapsed = time.perf_counter() - start_time
    logger.debug("Documents prepared in %.2f seconds.", elapsed)
    return db
def get_context_sources(question, db):
    """Retrieve the chunks most similar to *question* and name their sources.

    Args:
        question: Query text passed to the vector store.
        db: Object exposing ``similarity_search(query, k=...)`` that returns
            items with ``page_content`` and ``metadata``; may be ``None``.

    Returns:
        A ``(context, sources)`` tuple of strings. ``context`` is the text of
        the top 3 chunks joined by single spaces. ``sources`` is the
        comma-separated list of distinct ``metadata['source']`` values,
        in retrieval order. Both are empty strings when *db* is ``None`` or
        when the search fails (the failure is logged, never raised).
    """
    start_time = time.perf_counter()

    if db is None:
        logger.error("Database is None. Cannot perform similarity search.")
        return "", ""

    try:
        docs = db.similarity_search(question, k=3)
        context = " ".join(doc.page_content for doc in docs)
        # dict.fromkeys removes duplicates while keeping first-seen order, so
        # the string is stable across runs (iterating a set of strings is
        # not). Chunks without a 'source' entry are skipped instead of
        # discarding the whole result.
        found_sources = (doc.metadata.get('source') for doc in docs)
        unique_sources = dict.fromkeys(src for src in found_sources if src)
        sources = ", ".join(unique_sources)
    except Exception as e:
        # Boundary handler: this function reports failure via empty strings.
        logger.exception("Error during similarity search: %s", e)
        return "", ""

    elapsed = time.perf_counter() - start_time
    logger.debug("Similarity search done in %.2f seconds.", elapsed)
    return context, sources