import io
import os
import tempfile
import hashlib
import json
import logging
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from PyPDF2 import PdfReader
from docx import Document
# from transformers import pipeline

# Load environment variables
load_dotenv()
open_api_key_token = os.getenv('OPENAI_API_KEY')


class FileHandler:
    def __init__(self, vector_db_path):
        self.vector_db_path = vector_db_path
        self.embeddings = OpenAIEmbeddings(api_key=open_api_key_token)
        # self.summarizer = pipeline("summarization")

    def prepare_metadata_string(self, document_name, document_description, department, version, last_updated):
        metadata_string = f"\nDocument Name: {document_name}\nDocument Description: {document_description}\nDepartment: {department}\nVersion: {version}\nLast Updated: {last_updated}"
        return metadata_string

    async def handle_file_upload(self, file, document_name, document_description, department, version, last_updated):
        content = await file.read()
        file_hash = hashlib.md5(content).hexdigest()
        file_key = f"{file.filename}_{file_hash}"
        vector_store_path = os.path.join(self.vector_db_path, f"{file_key}.vectorstore")
        metadata_path = os.path.join(self.vector_db_path, f"{file_key}.metadata.json")

        metadata_string = self.prepare_metadata_string(document_name, document_description, department, version,
                                                       last_updated)

        if os.path.exists(vector_store_path) and os.path.exists(metadata_path):
            with open(metadata_path, 'r') as md_file:
                metadata = json.load(md_file)
            return {'path': vector_store_path, 'metadata': metadata, 'status': 'skipped - duplicate'}

        if file.filename.endswith(('.csv', '.xlsx')):
            texts = self.load_and_split_table(content, file.filename, metadata_string)
        else:
            texts = await self.load_and_split_text(content, file.filename, metadata_string)

        vector_store = self.create_vector_store(texts)
        vector_store.save_local(vector_store_path)

        metadata = {
            'filename': file.filename,
            'document_name': document_name,
            'document_description': document_description,
            'department': department,
            'version': version,
            'last_updated': last_updated,
            'hash': file_hash,
            'upload_date': datetime.now().isoformat(),
            'file_path': vector_store_path,
            'file_size': len(content),
            'content_type': file.content_type
        }

        with open(metadata_path, 'w') as md_file:
            json.dump(metadata, md_file)

        return {"message": "File processed and vector store created successfully", "file_metadata": metadata}

    def summarize_text(self, text):
        # Depends on the transformers summarization pipeline, which is currently
        # commented out in __init__; without it, the except branch below simply
        # returns the original text.
        try:
            summary = self.summarizer(text, max_length=150, min_length=10, do_sample=False)
            logging.info("Text summarization successful")
            return summary[0]['summary_text']
        except Exception as e:
            logging.error(f"Error in summarization: {str(e)}")
            # Log error or handle exception
            return text  # Return original text if summarization is not possible

    def load_and_split_table(self, content, filename, metadata_string):
        # Handle CSV and Excel file reading
        if filename.endswith('.csv'):
            df = pd.read_csv(io.StringIO(content.decode('utf-8')))
        else:  # Excel
            df = pd.read_excel(io.BytesIO(content))
        text = df.to_string(index=False)  # Convert DataFrame to string
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_text(self, content, filename, metadata_string):
        with tempfile.NamedTemporaryFile(delete=False, mode='w+b', suffix=f"_{filename}") as temp_file:
            temp_file.write(content)
            temp_file.flush()
            temp_file_path = temp_file.name

        # The temp file is closed at this point, so the loaders below can reopen it
        try:
            if filename.endswith('.pdf'):
                texts = await self.load_and_split_pdf(temp_file_path, metadata_string)
            elif filename.endswith('.docx'):
                texts = await self.load_and_split_docx(temp_file_path, metadata_string)
            elif filename.endswith('.txt'):
                texts = await self.load_and_split_txt(temp_file_path, metadata_string)
            else:
                raise ValueError(f"Unsupported text file type: {filename}")

            # Optional per-segment summarization is currently disabled
            # texts = [self.summarize_text(text) for text in texts]
            return texts
        finally:
            os.unlink(temp_file_path)  # Explicitly remove the temporary file

    async def load_and_split_pdf(self, pdf_path, metadata_string):
        reader = PdfReader(pdf_path)
        text = ''
        for page in reader.pages:
            text += page.extract_text() or ""
        text += metadata_string  # Append metadata once to the extracted text
        return self.split_text(text)

    async def load_and_split_docx(self, docx_path, metadata_string):
        doc = Document(docx_path)
        text = '\n'.join([paragraph.text for paragraph in doc.paragraphs if paragraph.text])
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_txt(self, txt_path, metadata_string):
        with open(txt_path, 'r', encoding='utf-8') as file:
            text = file.read()
            text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    def split_text(self, text):
        # Split into ~1000-character chunks with 200 characters of overlap
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        return text_splitter.split_text(text)

    def create_vector_store(self, texts):
        # Embed the chunks with OpenAI embeddings and index them in FAISS
        return FAISS.from_texts(texts, self.embeddings)
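

if __name__ == "__main__":
    # Illustrative usage sketch, not part of the original module. It assumes an
    # UploadFile-like object exposing `filename`, `content_type`, and an async
    # `read()` method (as a FastAPI endpoint would pass in); the `_FakeUpload`
    # class, the "vector_dbs" path, and the sample metadata are hypothetical
    # names chosen for the example.
    import asyncio

    class _FakeUpload:
        """Minimal stand-in for an UploadFile-style object."""

        def __init__(self, filename, data, content_type):
            self.filename = filename
            self.content_type = content_type
            self._data = data

        async def read(self):
            return self._data

    async def _demo():
        os.makedirs("vector_dbs", exist_ok=True)  # hypothetical local store
        handler = FileHandler(vector_db_path="vector_dbs")
        upload = _FakeUpload("example.txt", b"Hello, vector store!", "text/plain")
        result = await handler.handle_file_upload(
            upload, "Example Doc", "A short demo file", "Engineering", "1.0", "2024-01-01"
        )
        print(result)

    asyncio.run(_demo())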