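"""File ingestion utilities for a FAISS-backed vector store.

FileHandler splits uploaded PDF, DOCX, TXT, XLSX, and CSV files into text
chunks, embeds them with OpenAI embeddings, and persists one FAISS index
per unique (filename, content-hash) pair under ``vector_db_path``.
"""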
import hashlib
import io
import json
import os

import pandas as pd
from docx import Document
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from PyPDF2 import PdfReader
class FileHandler:
    def __init__(self, vector_db_path, open_api_key, grok_api_key):
        self.vector_db_path = vector_db_path
        self.openai_embeddings = OpenAIEmbeddings(api_key=open_api_key)
        # Stored for callers that query Grok later; not used for embeddings.
        self.grok_api_key = grok_api_key
    def handle_file_upload(self, file_name, file_content):
        try:
            # Normalize the file name: strip directories, replace spaces, lowercase.
            base_file_name = os.path.basename(file_name)
            formatted_file_name = base_file_name.replace(" ", "_").lower()

            # Hash the raw bytes so re-uploads of identical content are detected.
            if isinstance(file_content, str):
                file_content = file_content.encode("utf-8")
            file_hash = hashlib.md5(file_content).hexdigest()
            file_key = f"{formatted_file_name}_{file_hash}"

            vector_store_dir = os.path.join(self.vector_db_path, file_key)
            os.makedirs(vector_store_dir, exist_ok=True)
            vector_store_path = os.path.join(vector_store_dir, "index.faiss")
            if os.path.exists(vector_store_path):
                return {"message": "File already processed."}

            # Split the file into text chunks based on its extension.
            if file_name.endswith(".pdf"):
                texts, metadatas = self.load_and_split_pdf(file_content)
            elif file_name.endswith(".docx"):
                texts, metadatas = self.load_and_split_docx(file_content)
            elif file_name.endswith(".txt"):
                texts, metadatas = self.load_and_split_txt(file_content)
            elif file_name.endswith(".xlsx"):
                texts, metadatas = self.load_and_split_table(file_content)
            elif file_name.endswith(".csv"):
                texts, metadatas = self.load_and_split_csv(file_content)
            else:
                raise ValueError("Unsupported file format.")

            if not texts:
                return {"message": "No text extracted from the file. Check the file content."}

            # Embed the chunks with OpenAI and persist the FAISS index locally.
            vector_store = FAISS.from_texts(texts, self.openai_embeddings, metadatas=metadatas)
            vector_store.save_local(vector_store_dir)

            metadata = {
                "filename": file_name,
                "file_size": len(file_content),
            }
            metadata_path = os.path.join(vector_store_dir, "metadata.json")
            with open(metadata_path, "w") as md_file:
                json.dump(metadata, md_file)

            return {"message": "File processed successfully."}
        except Exception as e:
            return {"message": f"Error processing file: {str(e)}"}
    def load_and_split_pdf(self, file):
        # PdfReader expects a file-like object, so wrap the raw bytes.
        reader = PdfReader(io.BytesIO(file))
        texts = []
        metadatas = []
        for page_num, page in enumerate(reader.pages):
            text = page.extract_text()
            if text:
                texts.append(text)
                metadatas.append({"page_number": page_num + 1})
        return texts, metadatas
    def load_and_split_docx(self, file):
        # python-docx also expects a file-like object rather than raw bytes.
        doc = Document(io.BytesIO(file))
        texts = []
        metadatas = []
        for para_num, paragraph in enumerate(doc.paragraphs):
            if paragraph.text:
                texts.append(paragraph.text)
                metadatas.append({"paragraph_number": para_num + 1})
        return texts, metadatas
    def load_and_split_txt(self, content):
        # Treat each non-empty line as one chunk; no per-chunk metadata is available.
        text = content.decode("utf-8")
        lines = text.split("\n")
        texts = [line for line in lines if line.strip()]
        metadatas = [{}] * len(texts)
        return texts, metadatas
    def load_and_split_table(self, content):
        # Read every sheet of the workbook; pandas accepts a file-like object.
        excel_data = pd.read_excel(io.BytesIO(content), sheet_name=None)
        texts = []
        metadatas = []
        for sheet_name, df in excel_data.items():
            # Drop fully empty rows/columns, then serialize each row as "col: value" pairs.
            df = df.dropna(how="all", axis=0).dropna(how="all", axis=1)
            df = df.fillna("N/A")
            for _, row in df.iterrows():
                row_text = ", ".join(f"{key}: {value}" for key, value in row.to_dict().items())
                texts.append(row_text)
                metadatas.append({"sheet_name": sheet_name})
        return texts, metadatas
    def load_and_split_csv(self, content):
        csv_data = pd.read_csv(io.BytesIO(content))
        texts = []
        metadatas = []
        # Drop fully empty rows/columns, then serialize each row as "col: value" pairs.
        csv_data = csv_data.dropna(how="all", axis=0).dropna(how="all", axis=1)
        csv_data = csv_data.fillna("N/A")
        for row_index, row in csv_data.iterrows():
            row_text = ", ".join(f"{key}: {value}" for key, value in row.to_dict().items())
            texts.append(row_text)
            metadatas.append({"row_index": row_index})
        return texts, metadatas
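

# --- Usage sketch (not part of the original module) ---
# A minimal, illustrative driver, assuming the OPENAI_API_KEY / GROK_API_KEY
# environment variables and the "./vector_dbs" and "sample.txt" paths are
# placeholders; adapt them to your deployment. Saved indexes can later be
# reloaded with FAISS.load_local(vector_store_dir, embeddings).
if __name__ == "__main__":
    handler = FileHandler(
        vector_db_path="./vector_dbs",
        open_api_key=os.environ["OPENAI_API_KEY"],
        grok_api_key=os.environ.get("GROK_API_KEY", ""),
    )
    # Pass raw bytes, as handle_file_upload expects.
    with open("sample.txt", "rb") as f:
        result = handler.handle_file_upload("sample.txt", f.read())
    print(result["message"])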