# Redmind_GPT_API_Aug1/services/file_upload_service.py
import io
import os
import tempfile
import hashlib
import json
import logging
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from PyPDF2 import PdfReader
from docx import Document
# from transformers import pipeline
# Load environment variables
load_dotenv()
open_api_key_token = os.getenv('OPENAI_API_KEY')
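# The OpenAI key is expected in the environment or a .env file, e.g.:
#   OPENAI_API_KEY=sk-...   (illustrative value)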


class FileHandler:
    """Turns uploaded documents into FAISS vector stores with JSON metadata sidecars."""

    def __init__(self, vector_db_path):
        self.vector_db_path = vector_db_path
        self.embeddings = OpenAIEmbeddings(api_key=open_api_key_token)
        # self.summarizer = pipeline("summarization")

    def prepare_metadata_string(self, document_name, document_description, department, version, last_updated):
        # Human-readable metadata block that is appended to the document text before embedding.
        metadata_string = f"\nDocument Name: {document_name}\nDocument Description: {document_description}\nDepartment: {department}\nVersion: {version}\nLast Updated: {last_updated}"
        return metadata_string

    async def handle_file_upload(self, file, document_name, document_description, department, version, last_updated):
content = await file.read()
file_hash = hashlib.md5(content).hexdigest()
file_key = f"{file.filename}_{file_hash}"
vector_store_path = os.path.join(self.vector_db_path, f"{file_key}.vectorstore")
metadata_path = os.path.join(self.vector_db_path, f"{file_key}.metadata.json")
metadata_string = self.prepare_metadata_string(document_name, document_description, department, version,
last_updated)
if os.path.exists(vector_store_path) and os.path.exists(metadata_path):
with open(metadata_path, 'r') as md_file:
metadata = json.load(md_file)
return {'path': vector_store_path, 'metadata': metadata, 'status': 'skipped - duplicate'}
        if file.filename.endswith('.csv') or file.filename.endswith('.xlsx'):
            texts = self.load_and_split_table(content, file.filename, metadata_string)
        else:
            texts = await self.load_and_split_text(content, file.filename, metadata_string)
vector_store = self.create_vector_store(texts)
vector_store.save_local(vector_store_path)
metadata = {
'filename': file.filename,
'document_name': document_name,
'document_description': document_description,
'department': department,
'version': version,
'last_updated': last_updated,
'hash': file_hash,
'upload_date': datetime.now().isoformat(),
'file_path': vector_store_path,
'file_size': len(content),
'content_type': file.content_type
}
with open(metadata_path, 'w') as md_file:
json.dump(metadata, md_file)
return {"message": "File processed and vector store created successfully", "file_metadata": metadata}

    def summarize_text(self, text):
        # Relies on the transformers summarization pipeline that is currently commented out
        # in __init__; until it is re-enabled, the except branch returns the original text.
        try:
            summary = self.summarizer(text, max_length=150, min_length=10, do_sample=False)
            logging.info("Text summarization successful")
            return summary[0]['summary_text']
        except Exception as e:
            logging.error(f"Error in summarization: {str(e)}")
            return text  # Return original text if summarization is not possible

    def load_and_split_table(self, content, filename, metadata_string):
        # Handle CSV and Excel file reading
        if filename.endswith('.csv'):
            df = pd.read_csv(io.StringIO(content.decode('utf-8')))
        else:  # Excel
            df = pd.read_excel(io.BytesIO(content))
        text = df.to_string(index=False)  # Convert DataFrame to string
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_text(self, content, filename, metadata_string):
        with tempfile.NamedTemporaryFile(delete=False, mode='w+b', suffix=f"_{filename}") as temp_file:
            temp_file.write(content)
            temp_file.flush()
            temp_file_path = temp_file.name
        # Ensure the temp file is closed before reading from it
        try:
            if filename.endswith('.pdf'):
                texts = await self.load_and_split_pdf(temp_file_path, metadata_string)
            elif filename.endswith('.docx'):
                texts = await self.load_and_split_docx(temp_file_path, metadata_string)
            elif filename.endswith('.txt'):
                texts = await self.load_and_split_txt(temp_file_path, metadata_string)
            else:
                raise ValueError(f"Unsupported text file type: {filename}")
            # Apply summarization here to each text segment
            # summarized_texts = [self.summarize_text(text) for text in texts]
            # return summarized_texts
            return texts
        finally:
            os.unlink(temp_file_path)  # Explicitly remove the temporary file

    async def load_and_split_pdf(self, pdf_path, metadata_string):
        reader = PdfReader(pdf_path)
        text = ''
        for page in reader.pages:
            text += page.extract_text() or ""
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_docx(self, docx_path, metadata_string):
        doc = Document(docx_path)
        text = '\n'.join([paragraph.text for paragraph in doc.paragraphs if paragraph.text])
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_txt(self, txt_path, metadata_string):
        with open(txt_path, 'r', encoding='utf-8') as file:
            text = file.read()
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    def split_text(self, text):
        # Chunk size and overlap are measured in characters for CharacterTextSplitter.
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        return text_splitter.split_text(text)

    def create_vector_store(self, texts):
        # Embed the chunks with OpenAI embeddings and build an in-memory FAISS index.
        return FAISS.from_texts(texts, self.embeddings)
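

# --- Usage sketch (not part of the original service) ---
# The awaited file.read() plus file.filename / file.content_type suggest a FastAPI
# UploadFile; the commented route below is a minimal sketch under that assumption,
# with hypothetical paths and form field names.
#
# from fastapi import FastAPI, File, Form, UploadFile
#
# app = FastAPI()
# file_handler = FileHandler(vector_db_path="vector_stores")
#
# @app.post("/upload")
# async def upload_document(file: UploadFile = File(...),
#                           document_name: str = Form(...),
#                           document_description: str = Form(...),
#                           department: str = Form(...),
#                           version: str = Form(...),
#                           last_updated: str = Form(...)):
#     return await file_handler.handle_file_upload(
#         file, document_name, document_description, department, version, last_updated)
#
# A saved store can later be reloaded for querying with
# FAISS.load_local(vector_store_path, file_handler.embeddings); depending on the
# langchain_community version, allow_dangerous_deserialization=True may also be required.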