import io
import os
import tempfile
import hashlib
import json
import logging
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from PyPDF2 import PdfReader
from docx import Document
# from transformers import pipeline
# Load environment variables
load_dotenv()
open_api_key_token = os.getenv('OPENAI_API_KEY')


class FileHandler:
    def __init__(self, vector_db_path):
        self.vector_db_path = vector_db_path
        self.embeddings = OpenAIEmbeddings(api_key=open_api_key_token)
        # self.summarizer = pipeline("summarization")

    def prepare_metadata_string(self, document_name, document_description, department, version, last_updated):
        metadata_string = (
            f"\nDocument Name: {document_name}"
            f"\nDocument Description: {document_description}"
            f"\nDepartment: {department}"
            f"\nVersion: {version}"
            f"\nLast Updated: {last_updated}"
        )
        return metadata_string

    async def handle_file_upload(self, file, document_name, document_description, department, version, last_updated):
        content = await file.read()
        file_hash = hashlib.md5(content).hexdigest()
        file_key = f"{file.filename}_{file_hash}"
        vector_store_path = os.path.join(self.vector_db_path, f"{file_key}.vectorstore")
        metadata_path = os.path.join(self.vector_db_path, f"{file_key}.metadata.json")
        metadata_string = self.prepare_metadata_string(document_name, document_description, department, version,
                                                       last_updated)

        # Skip re-processing if this exact file (same name and content hash) was already indexed
        if os.path.exists(vector_store_path) and os.path.exists(metadata_path):
            with open(metadata_path, 'r') as md_file:
                metadata = json.load(md_file)
            return {'path': vector_store_path, 'metadata': metadata, 'status': 'skipped - duplicate'}

        # Tabular files go through pandas; everything else goes through the text loaders
        if file.filename.endswith('.csv') or file.filename.endswith('.xlsx'):
            texts = self.load_and_split_table(content, file.filename, metadata_string)
        else:
            texts = await self.load_and_split_text(content, file.filename, metadata_string)

        vector_store = self.create_vector_store(texts)
        vector_store.save_local(vector_store_path)

        metadata = {
            'filename': file.filename,
            'document_name': document_name,
            'document_description': document_description,
            'department': department,
            'version': version,
            'last_updated': last_updated,
            'hash': file_hash,
            'upload_date': datetime.now().isoformat(),
            'file_path': vector_store_path,
            'file_size': len(content),
            'content_type': file.content_type
        }
        with open(metadata_path, 'w') as md_file:
            json.dump(metadata, md_file)
        return {"message": "File processed and vector store created successfully", "file_metadata": metadata}

    def summarize_text(self, text):
        # Note: requires the summarization pipeline to be re-enabled in __init__
        # (the `self.summarizer = pipeline("summarization")` line is currently commented out).
        try:
            summary = self.summarizer(text, max_length=150, min_length=10, do_sample=False)
            logging.info("Text summarization successful")
            return summary[0]['summary_text']
        except Exception as e:
            logging.error(f"Error in summarization: {str(e)}")
            return text  # Return original text if summarization is not possible

    def load_and_split_table(self, content, filename, metadata_string):
        # Handle CSV and Excel file reading
        if filename.endswith('.csv'):
            df = pd.read_csv(io.StringIO(content.decode('utf-8')))
        else:  # Excel
            df = pd.read_excel(io.BytesIO(content))
        text = df.to_string(index=False)  # Convert DataFrame to string
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_text(self, content, filename, metadata_string):
        with tempfile.NamedTemporaryFile(delete=False, mode='w+b', suffix=f"_{filename}") as temp_file:
            temp_file.write(content)
            temp_file.flush()
            temp_file_path = temp_file.name

        # The temp file is closed once the `with` block exits, so it can be read safely here
        if filename.endswith('.pdf'):
            texts = await self.load_and_split_pdf(temp_file_path, metadata_string)
        elif filename.endswith('.docx'):
            texts = await self.load_and_split_docx(temp_file_path, metadata_string)
        elif filename.endswith('.txt'):
            texts = await self.load_and_split_txt(temp_file_path, metadata_string)
        else:
            os.unlink(temp_file_path)
            raise ValueError(f"Unsupported file type: {filename}")
        # Apply summarization here to each text segment
        # summarized_texts = [self.summarize_text(text) for text in texts]
        # return summarized_texts
        os.unlink(temp_file_path)  # Explicitly remove the temporary file
        return texts

    async def load_and_split_pdf(self, pdf_path, metadata_string):
        reader = PdfReader(pdf_path)
        text = ''
        for page in reader.pages:
            text += page.extract_text() or ""
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_docx(self, docx_path, metadata_string):
        doc = Document(docx_path)
        text = '\n'.join([paragraph.text for paragraph in doc.paragraphs if paragraph.text])
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_txt(self, txt_path, metadata_string):
        with open(txt_path, 'r', encoding='utf-8') as file:
            text = file.read()
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    def split_text(self, text):
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        return text_splitter.split_text(text)

    def create_vector_store(self, texts):
        return FAISS.from_texts(texts, self.embeddings)
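

# Usage sketch (an assumption, not part of the original module): `handle_file_upload`
# awaits `file.read()` and reads `file.filename` / `file.content_type`, which matches
# FastAPI's UploadFile interface. The hypothetical endpoint below illustrates how the
# class might be wired into a FastAPI app; the app object, route path, and form field
# names are illustrative assumptions.
#
# from fastapi import FastAPI, File, Form, UploadFile
#
# app = FastAPI()
# file_handler = FileHandler(vector_db_path="vector_dbs")
#
# @app.post("/upload")
# async def upload_document(
#     file: UploadFile = File(...),
#     document_name: str = Form(...),
#     document_description: str = Form(...),
#     department: str = Form(...),
#     version: str = Form(...),
#     last_updated: str = Form(...),
# ):
#     return await file_handler.handle_file_upload(
#         file, document_name, document_description, department, version, last_updated
#     )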