import io
import os
import tempfile
import hashlib
import json
import logging
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from PyPDF2 import PdfReader
from docx import Document
# from transformers import pipeline

# Load environment variables
load_dotenv()
open_api_key_token = os.getenv('OPENAI_API_KEY')


class FileHandler:
    def __init__(self, vector_db_path):
        self.vector_db_path = vector_db_path
        # Make sure the directory for vector stores and metadata exists
        os.makedirs(self.vector_db_path, exist_ok=True)
        self.embeddings = OpenAIEmbeddings(api_key=open_api_key_token)
        # self.summarizer = pipeline("summarization")

    def prepare_metadata_string(self, document_name, document_description, department, version, last_updated):
        metadata_string = (
            f"\nDocument Name: {document_name}"
            f"\nDocument Description: {document_description}"
            f"\nDepartment: {department}"
            f"\nVersion: {version}"
            f"\nLast Updated: {last_updated}"
        )
        return metadata_string

    async def handle_file_upload(self, file, document_name, document_description, department, version, last_updated):
        content = await file.read()
        file_hash = hashlib.md5(content).hexdigest()
        file_key = f"{file.filename}_{file_hash}"
        vector_store_path = os.path.join(self.vector_db_path, f"{file_key}.vectorstore")
        metadata_path = os.path.join(self.vector_db_path, f"{file_key}.metadata.json")
        metadata_string = self.prepare_metadata_string(document_name, document_description, department, version, last_updated)

        # Skip re-processing if this exact file (same name and content hash) was already indexed
        if os.path.exists(vector_store_path) and os.path.exists(metadata_path):
            with open(metadata_path, 'r') as md_file:
                metadata = json.load(md_file)
            return {'path': vector_store_path, 'metadata': metadata, 'status': 'skipped - duplicate'}

        if file.filename.endswith('.csv') or file.filename.endswith('.xlsx'):
            texts = self.load_and_split_table(content, file.filename, metadata_string)
        else:
            texts = await self.load_and_split_text(content, file.filename, metadata_string)

        vector_store = self.create_vector_store(texts)
        vector_store.save_local(vector_store_path)

        metadata = {
            'filename': file.filename,
            'document_name': document_name,
            'document_description': document_description,
            'department': department,
            'version': version,
            'last_updated': last_updated,
            'hash': file_hash,
            'upload_date': datetime.now().isoformat(),
            'file_path': vector_store_path,
            'file_size': len(content),
            'content_type': file.content_type
        }
        with open(metadata_path, 'w') as md_file:
            json.dump(metadata, md_file)

        return {"message": "File processed and vector store created successfully", "file_metadata": metadata}

    def summarize_text(self, text):
        # Note: requires the transformers summarization pipeline to be enabled in __init__;
        # with it disabled, the except branch returns the text unchanged.
        try:
            summary = self.summarizer(text, max_length=150, min_length=10, do_sample=False)
            logging.info("Text summarization successful")
            return summary[0]['summary_text']
        except Exception as e:
            logging.error(f"Error in summarization: {str(e)}")  # Log error or handle exception
            return text  # Return original text if summarization is not possible

    def load_and_split_table(self, content, filename, metadata_string):
        # Handle CSV and Excel file reading
        if filename.endswith('.csv'):
            df = pd.read_csv(io.StringIO(content.decode('utf-8')))
        else:  # Excel
            df = pd.read_excel(io.BytesIO(content))
        text = df.to_string(index=False)  # Convert DataFrame to string
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_text(self, content, filename, metadata_string):
        with tempfile.NamedTemporaryFile(delete=False, mode='w+b', suffix=f"_{filename}") as temp_file:
            temp_file.write(content)
            temp_file.flush()
            temp_file_path = temp_file.name

        # Ensure the temp file is closed before reading from it
        if filename.endswith('.pdf'):
            texts = await self.load_and_split_pdf(temp_file_path, metadata_string)
        elif filename.endswith('.docx'):
            texts = await self.load_and_split_docx(temp_file_path, metadata_string)
        elif filename.endswith('.txt'):
            texts = await self.load_and_split_txt(temp_file_path, metadata_string)
        else:
            # Without this branch, 'texts' would be unbound for unsupported extensions
            os.unlink(temp_file_path)
            raise ValueError(f"Unsupported file type: {filename}")

        # Apply summarization here to each text segment
        # summarized_texts = [self.summarize_text(text) for text in texts]
        # os.unlink(temp_file_path)  # Explicitly remove the temporary file
        # return summarized_texts

        os.unlink(temp_file_path)  # Explicitly remove the temporary file
        return texts

    async def load_and_split_pdf(self, pdf_path, metadata_string):
        reader = PdfReader(pdf_path)
        text = ''
        for page in reader.pages:
            text += page.extract_text() or ""
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_docx(self, docx_path, metadata_string):
        doc = Document(docx_path)
        text = '\n'.join([paragraph.text for paragraph in doc.paragraphs if paragraph.text])
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_txt(self, txt_path, metadata_string):
        with open(txt_path, 'r', encoding='utf-8') as file:
            text = file.read()
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    def split_text(self, text):
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        return text_splitter.split_text(text)

    def create_vector_store(self, texts):
        return FAISS.from_texts(texts, self.embeddings)
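
# --- Usage sketch (an assumption, not part of the original module) ---
# handle_file_upload expects an object exposing async .read(), .filename, and
# .content_type; FastAPI's UploadFile matches that shape. A minimal endpoint
# wiring might look like this (route name and vector_db_path are illustrative):
#
# from fastapi import FastAPI, File, Form, UploadFile
#
# app = FastAPI()
# file_handler = FileHandler(vector_db_path="vector_dbs")
#
# @app.post("/upload-document")
# async def upload_document(
#     file: UploadFile = File(...),
#     document_name: str = Form(...),
#     document_description: str = Form(...),
#     department: str = Form(...),
#     version: str = Form(...),
#     last_updated: str = Form(...),
# ):
#     return await file_handler.handle_file_upload(
#         file, document_name, document_description,
#         department, version, last_updated,
#     )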