|
import io
|
|
import os
|
|
import tempfile
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
from langchain_community.vectorstores import FAISS
|
|
from langchain_openai import OpenAIEmbeddings
|
|
from langchain.text_splitter import CharacterTextSplitter
|
|
from PyPDF2 import PdfReader
|
|
from docx import Document
|
|
|
|
|
|
|
|
# Load environment variables through python-dotenv before anything reads them.
load_dotenv()

# OpenAI API key taken from the environment; None when the variable is unset.
open_api_key_token = os.environ.get('OPENAI_API_KEY')
|
|
|
|
|
|
class FileHandler:
    """Turn uploaded documents into FAISS vector stores saved on disk.

    Each upload is keyed by ``<filename>_<md5 of content>``. A vector store
    and a JSON metadata file are written under ``vector_db_path`` using that
    key, and an upload whose two artifacts already exist is skipped.
    """

    # Optional summarization callable used by ``summarize_text``. Nothing in
    # this class sets it; callers or subclasses may assign one. It is invoked
    # as ``summarizer(text, max_length=..., min_length=..., do_sample=...)``
    # and must return a sequence whose first item has a 'summary_text' key.
    summarizer = None

    # File extensions routed to the tabular (pandas) loader.
    _TABLE_EXTENSIONS = ('.csv', '.xlsx')

    def __init__(self, vector_db_path):
        """Store the output directory and build the embeddings client.

        Args:
            vector_db_path: Directory under which vector stores and metadata
                files are written.
        """
        self.vector_db_path = vector_db_path
        self.embeddings = OpenAIEmbeddings(api_key=open_api_key_token)

    def prepare_metadata_string(self, document_name, document_description, department, version, last_updated):
        """Return the descriptive fields as a newline-separated text block.

        The block starts with a newline so it can be appended directly to
        extracted document text before splitting.
        """
        metadata_string = f"\nDocument Name: {document_name}\nDocument Description: {document_description}\nDepartment: {department}\nVersion: {version}\nLast Updated: {last_updated}"
        return metadata_string

    async def handle_file_upload(self, file, document_name, document_description, department, version, last_updated):
        """Index an uploaded file and persist its vector store and metadata.

        Args:
            file: Upload object exposing an awaitable ``read()`` plus
                ``filename`` and ``content_type`` attributes (presumably a
                FastAPI/Starlette ``UploadFile`` - confirm against caller).
            document_name, document_description, department, version,
            last_updated: Descriptive fields appended to the indexed text and
                stored in the metadata file.

        Returns:
            For a file that was already indexed:
            ``{'path', 'metadata', 'status': 'skipped - duplicate'}``.
            Otherwise: ``{'message', 'file_metadata'}``.

        Raises:
            ValueError: If the upload has no usable filename or its extension
                is not supported.
        """
        content = await file.read()
        file_hash = hashlib.md5(content).hexdigest()

        # The filename comes from the client: keep only its final path
        # component so the derived paths cannot escape vector_db_path.
        safe_name = os.path.basename(file.filename or '')
        if not safe_name:
            raise ValueError("Uploaded file has no usable filename")

        file_key = f"{safe_name}_{file_hash}"
        vector_store_path = os.path.join(self.vector_db_path, f"{file_key}.vectorstore")
        metadata_path = os.path.join(self.vector_db_path, f"{file_key}.metadata.json")

        # Same name and same content already indexed: return stored metadata.
        if os.path.exists(vector_store_path) and os.path.exists(metadata_path):
            with open(metadata_path, 'r', encoding='utf-8') as md_file:
                metadata = json.load(md_file)
            return {'path': vector_store_path, 'metadata': metadata, 'status': 'skipped - duplicate'}

        metadata_string = self.prepare_metadata_string(
            document_name, document_description, department, version, last_updated
        )

        if file.filename.lower().endswith(self._TABLE_EXTENSIONS):
            texts = self.load_and_split_table(content, file.filename, metadata_string)
        else:
            texts = await self.load_and_split_text(content, file.filename, metadata_string)

        vector_store = self.create_vector_store(texts)
        vector_store.save_local(vector_store_path)

        metadata = {
            'filename': file.filename,
            'document_name': document_name,
            'document_description': document_description,
            'department': department,
            'version': version,
            'last_updated': last_updated,
            'hash': file_hash,
            'upload_date': datetime.now().isoformat(),
            'file_path': vector_store_path,
            'file_size': len(content),
            'content_type': file.content_type,
        }

        # Written after the vector store: the duplicate check above requires
        # both artifacts, so an interrupted upload is simply re-processed.
        with open(metadata_path, 'w', encoding='utf-8') as md_file:
            json.dump(metadata, md_file)

        return {"message": "File processed and vector store created successfully", "file_metadata": metadata}

    def summarize_text(self, text):
        """Return a summary of ``text``, or ``text`` itself as a fallback.

        The original text is returned unchanged when no summarizer is
        configured or when the summarizer raises.
        """
        if self.summarizer is None:
            logging.warning("No summarizer configured; returning original text")
            return text

        try:
            summary = self.summarizer(text, max_length=150, min_length=10, do_sample=False)
            logging.info("Text summarization successful")
            return summary[0]['summary_text']
        except Exception as e:
            # Deliberate best-effort: a failed summary must not fail the caller.
            logging.error("Error in summarization: %s", e)

        return text

    def load_and_split_table(self, content, filename, metadata_string):
        """Render a CSV or Excel payload as text and split it into chunks.

        Args:
            content: Raw file bytes.
            filename: Original filename; a ``.csv`` suffix (any case) selects
                the CSV parser, anything else is read as Excel.
            metadata_string: Text appended after the rendered table.
        """
        if filename.lower().endswith('.csv'):
            df = pd.read_csv(io.StringIO(content.decode('utf-8')))
        else:
            df = pd.read_excel(io.BytesIO(content))

        text = df.to_string(index=False)
        text += metadata_string
        return self.split_text(text)

    async def load_and_split_text(self, content, filename, metadata_string):
        """Extract text from a PDF, DOCX or TXT payload and split it.

        The bytes are written to a temporary file because the loaders take a
        path; the file is removed even when extraction fails.

        Raises:
            ValueError: If ``filename`` has none of the supported extensions.
        """
        loaders = {
            '.pdf': self.load_and_split_pdf,
            '.docx': self.load_and_split_docx,
            '.txt': self.load_and_split_txt,
        }
        lowered = filename.lower()
        extension = next((ext for ext in loaders if lowered.endswith(ext)), None)
        if extension is None:
            # Fail before touching the filesystem instead of hitting an
            # unbound result variable after the temp file was created.
            raise ValueError(f"Unsupported file type: {filename!r}")

        # Only the matched extension goes into the temp name; the
        # client-supplied filename may contain path separators.
        with tempfile.NamedTemporaryFile(delete=False, mode='w+b', suffix=extension) as temp_file:
            temp_file.write(content)
            temp_file_path = temp_file.name

        try:
            return await loaders[extension](temp_file_path, metadata_string)
        finally:
            os.unlink(temp_file_path)

    async def load_and_split_pdf(self, pdf_path, metadata_string):
        """Concatenate the text of every PDF page, append metadata, split."""
        reader = PdfReader(pdf_path)
        # extract_text() may yield nothing for a page; treat that as empty.
        text = ''.join(page.extract_text() or "" for page in reader.pages)
        text += metadata_string
        return self.split_text(text)

    async def load_and_split_docx(self, docx_path, metadata_string):
        """Join the non-empty DOCX paragraphs, append metadata, split."""
        doc = Document(docx_path)
        text = '\n'.join(paragraph.text for paragraph in doc.paragraphs if paragraph.text)
        text += metadata_string
        return self.split_text(text)

    async def load_and_split_txt(self, txt_path, metadata_string):
        """Read a UTF-8 text file, append metadata, split."""
        with open(txt_path, 'r', encoding='utf-8') as file:
            text = file.read()
        text += metadata_string
        return self.split_text(text)

    def split_text(self, text):
        """Split ``text`` with ``CharacterTextSplitter`` (chunk_size=1000, chunk_overlap=200)."""
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        return text_splitter.split_text(text)

    def create_vector_store(self, texts):
        """Build a FAISS vector store from ``texts`` using the configured embeddings."""
        return FAISS.from_texts(texts, self.embeddings)
|
|
|