import clip
import clip.model
import faiss
import io
from langchain_text_splitters import CharacterTextSplitter
import numpy as np
import os
import pandas as pd
from PyPDF2 import PdfReader
from PIL import Image
from sentence_transformers import SentenceTransformer
import streamlit as st
import torch
import time
import whisper
device = "cuda" if torch.cuda.is_available() else "cpu"
os.makedirs("./vectorstore", exist_ok=True)
def update_vectordb(index_path: str, embedding: torch.Tensor, image_path: str = None, text_content: str = None, audio_path: str = None):
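    """Append an embedding to the FAISS index stored at ./vectorstore/{index_path},
    creating the index with the appropriate dimensionality if it does not yet exist,
    and record the matching image path, audio path, or text chunk in a CSV metadata
    file. Returns the updated index."""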
if not image_path and not text_content:
raise ValueError("Either image_path or text_content must be provided.")
if audio_path and not text_content:
raise ValueError("text_content must be provided when audio_path is provided.")
if not os.path.exists(f"./vectorstore/{index_path}"):
if image_path:
index = faiss.IndexFlatL2(512)
else:
index = faiss.IndexFlatL2(384)
else:
index = faiss.read_index(f"./vectorstore/{index_path}")
try:
index.add(embedding.cpu().numpy())
except:
if len(embedding.shape) == 1:
embedding = torch.Tensor([embedding])
index.add(embedding)
faiss.write_index(index, f'./vectorstore/{index_path}')
if image_path:
if not os.path.exists("./vectorstore/image_data.csv"):
df = pd.DataFrame([{"path": image_path, "index": 0}]).reset_index(drop=True)
df.to_csv("./vectorstore/image_data.csv", index=False)
else:
df = pd.read_csv("./vectorstore/image_data.csv").reset_index(drop=True)
new_entry_df = pd.DataFrame({"path": image_path, "index": len(df)}, index=[0])
df = pd.concat([df, new_entry_df], ignore_index=True)
df.to_csv("./vectorstore/image_data.csv", index=False)
elif audio_path:
if not os.path.exists("./vectorstore/audio_data.csv"):
df = pd.DataFrame([{"path": audio_path, "content": text_content, "index": 0}]).reset_index(drop=True)
df.to_csv("./vectorstore/audio_data.csv", index=False)
else:
df = pd.read_csv("./vectorstore/audio_data.csv").reset_index(drop=True)
new_entry_df = pd.DataFrame({"path": audio_path, "content": text_content, "index": len(df)}, index=[0])
df = pd.concat([df, new_entry_df], ignore_index=True)
df.to_csv("./vectorstore/audio_data.csv", index=False)
elif text_content:
if not os.path.exists("./vectorstore/text_data.csv"):
df = pd.DataFrame([{"content": text_content, "index": 0}]).reset_index(drop=True)
df.to_csv("./vectorstore/text_data.csv", index=False)
else:
df = pd.read_csv("./vectorstore/text_data.csv").reset_index(drop=True)
new_entry_df = pd.DataFrame({"content": text_content, "index": len(df)}, index=[0])
df = pd.concat([df, new_entry_df], ignore_index=True)
df.to_csv("./vectorstore/text_data.csv", index=False)
else:
raise ValueError("Either image_path or text_content must be provided.")
return index
def add_image_to_index(image, model: clip.model.CLIP, preprocess):
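    """Save an uploaded or PDF-extracted image under ./images, encode it with CLIP,
    and add the embedding to the image FAISS index. Returns the updated index."""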
if hasattr(image, "name"):
image_name = image.name
else:
image_name = f"{time.time()}.png"
image_name = image_name.replace(" ", "_")
os.makedirs("./images", exist_ok=True)
os.makedirs("./vectorstore", exist_ok=True)
with open(f"./images/{image_name}", "wb") as f:
try:
f.write(image.read())
except:
if hasattr(image, "data"):
image = io.BytesIO(image.data)
else:
image = io.BytesIO(image)
f.write(image.read())
image = Image.open(f"./images/{image_name}")
with torch.no_grad():
image = preprocess(image).unsqueeze(0).to(device)
image_features = model.encode_image(image)
index = update_vectordb(index_path="image_index.index", embedding=image_features, image_path=f"./images/{image_name}")
return index
def add_pdf_to_index(pdf, clip_model: clip.model.CLIP, preprocess, text_embedding_model: SentenceTransformer):
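    """Extract images and text from an uploaded PDF, index images with CLIP and text
    chunks with the sentence-transformer, and return per-page content records."""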
if not os.path.exists("./vectorstore/"):
os.makedirs("./vectorstore")
pdf_name = pdf.name
pdf_name = pdf_name.replace(" ", "_")
pdf_reader = PdfReader(pdf)
pdf_pages_data = []
pdf_texts = []
text_splitter = CharacterTextSplitter(
separator="\n",
chunk_size=1000,
chunk_overlap=200,
length_function=len,
is_separator_regex=False,
)
progress_bar = st.progress(0)
    for page_num, page in enumerate(pdf_reader.pages):
        try:
            page_images = page.images
        except Exception:
            page_images = []
            st.error("Some images in the PDF are not readable. Please try another PDF.")
        for image in page_images:
            image.name = f"{time.time()}.png"
            add_image_to_index(image, clip_model, preprocess)
            pdf_pages_data.append({"page_number": page_num, "content": image, "type": "image"})
        page_text = page.extract_text()
        pdf_texts.append(page_text)
        if page_text and page_text.strip():
            chunks = text_splitter.split_text(page_text)
            text_embeddings = text_embedding_model.encode(chunks)
            for i, chunk in enumerate(chunks):
                update_vectordb(index_path="text_index.index", embedding=text_embeddings[i], text_content=chunk)
                pdf_pages_data.append({"page_number": page_num, "content": chunk, "type": "text"})
percent_complete = ((page_num + 1) / len(pdf_reader.pages))
progress_bar.progress(percent_complete, f"Processing Page {page_num + 1}/{len(pdf_reader.pages)}")
return pdf_pages_data
def add_audio_to_index(audio, whisper_model: whisper.Whisper, text_embedding_model: SentenceTransformer):
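    """Save an uploaded audio file under ./audio, transcribe it with Whisper, index the
    transcript chunks with the sentence-transformer, and return the transcript."""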
if not os.path.exists("./vectorstore/"):
os.makedirs("./vectorstore")
if not os.path.exists("./audio"):
os.makedirs("./audio")
if hasattr(audio, "name"):
audio_name = audio.name
else:
audio_name = f"{time.time()}.wav"
audio_name = audio_name.replace(" ", "_")
with open(f"./audio/{audio_name}", "wb") as f:
try:
f.write(audio.read())
except:
if hasattr(audio, "data"):
audio = io.BytesIO(audio.data)
else:
audio = io.BytesIO(audio)
f.write(audio.read())
audio_transcript: str = whisper_model.transcribe(f"./audio/{audio_name}")["text"]
text_splitter = CharacterTextSplitter(
separator="\n",
chunk_size=1000,
chunk_overlap=200,
length_function=len,
is_separator_regex=False,
)
chunks = text_splitter.split_text(audio_transcript)
text_embeddings = text_embedding_model.encode(chunks)
for i, chunk in enumerate(chunks):
update_vectordb(index_path="audio_index.index", embedding=text_embeddings[i], text_content=chunk, audio_path=f"./audio/{audio_name}")
return audio_transcript
def search_image_index_with_image(image_features, index: faiss.IndexFlatL2, clip_model: clip.model.CLIP, k: int = 3):
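    """Return indices of the k stored images closest to the given CLIP image features."""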
with torch.no_grad():
distances, indices = index.search(image_features.cpu().numpy(), k)
return indices
def search_text_index_with_image(text_embeddings, index: faiss.IndexFlatL2, text_embedding_model: SentenceTransformer, k: int = 3):
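    """Return indices of the k stored text chunks closest to the given text embeddings."""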
distances, indices = index.search(text_embeddings, k)
return indices
def search_image_index(text_input: str, index: faiss.IndexFlatL2, clip_model: clip.model.CLIP, k: int = 3):
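    """Encode a text query with CLIP and return indices of the k closest stored images."""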
with torch.no_grad():
text = clip.tokenize([text_input]).to(device)
text_features = clip_model.encode_text(text)
distances, indices = index.search(text_features.cpu().numpy(), k)
return indices
def search_text_index(text_input: str, index: faiss.IndexFlatL2, text_embedding_model: SentenceTransformer, k: int = 3):
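    """Encode a text query with the sentence-transformer and return indices of the k closest text chunks."""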
text_embeddings = text_embedding_model.encode([text_input])
distances, indices = index.search(text_embeddings, k)
return indices
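
# Example wiring (a minimal sketch, not part of this module). The model names below
# ("ViT-B/32", "all-MiniLM-L6-v2", Whisper "base") are assumptions chosen to match the
# 512-d image index and 384-d text index used above.
#
#   clip_model, preprocess = clip.load("ViT-B/32", device=device)
#   text_embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
#   whisper_model = whisper.load_model("base")
#
#   uploaded = st.file_uploader("Upload an image", type=["png", "jpg", "jpeg"])
#   if uploaded is not None:
#       add_image_to_index(uploaded, clip_model, preprocess)
#
#   image_index = faiss.read_index("./vectorstore/image_index.index")
#   matches = search_image_index("a diagram of a neural network", image_index, clip_model, k=3)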