|
import os |
|
from dotenv import load_dotenv |
|
|
|
from .ConnectorStrategy import ConnectorStrategy |
|
|
|
from pinecone import Pinecone, ServerlessSpec |
|
from langchain_openai import OpenAIEmbeddings |
|
from langchain_pinecone import PineconeVectorStore |
|
from langchain_core.documents import Document |
|
|
|
import unicodedata |
|
import time |
|
|
|
class PineconeConnector(ConnectorStrategy):
    """Connector strategy that stores and retrieves document chunks in Pinecone.

    Configuration is read from the environment (after ``load_dotenv()``):
    ``PINECONE_API_KEY``, ``PINECONE_INDEX_NAME`` and ``PINECONE_NAMESPACE``.

    Chunk vectors are stored under ids of the form ``<clean_filename>_<n>``;
    ``getDocs`` relies on that layout to recover document names.
    """

    def __init__(self):
        """Connect to Pinecone and make sure the configured index exists.

        When the index named by ``PINECONE_INDEX_NAME`` is not among the
        existing indexes, it is created as a serverless index (3072
        dimensions, cosine metric, AWS ``us-east-1``) and the constructor
        blocks, polling once per second, until the index reports ready.
        """
        load_dotenv()

        pinecone_api_key = os.environ.get("PINECONE_API_KEY")
        self.index_name = os.environ.get("PINECONE_INDEX_NAME")
        self.namespace = os.environ.get("PINECONE_NAMESPACE")

        pc = Pinecone(api_key=pinecone_api_key)

        existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]

        if self.index_name not in existing_indexes:
            pc.create_index(
                name=self.index_name,
                # NOTE(review): 3072 must match the output size of the
                # embedding model passed to addDoc/retriever -- confirm.
                dimension=3072,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            # Index creation is asynchronous; wait until it is usable.
            while not pc.describe_index(self.index_name).status["ready"]:
                time.sleep(1)

        self.index = pc.Index(self.index_name)

    def getDocs(self):
        """Return the distinct document names stored in the namespace.

        Every vector id is expected to look like ``<name>_<chunk index>``;
        the name is everything before the last underscore. An id without
        any underscore therefore contributes an empty name.

        Returns:
            list[str]: Document names in first-seen order, without duplicates.
        """
        # A dict gives O(1) de-duplication while preserving insertion order.
        doc_names = {}
        for id_batch in self.index.list(namespace=self.namespace):
            for vector_id in id_batch:
                # rpartition yields "" as head when no "_" is present, which
                # matches "_".join(vector_id.split("_")[:-1]).
                doc_names.setdefault(vector_id.rpartition("_")[0], None)

        return list(doc_names)

    def addDoc(self, filename, text_chunks, embedding):
        """Embed and upsert the chunks of one document.

        Args:
            filename: Original file name; stored verbatim in each chunk's
                metadata and used (sanitised) as the id prefix.
            text_chunks: Iterable of chunk texts, in document order.
            embedding: Embedding object handed to ``PineconeVectorStore``.

        Returns:
            ``{"filename_id": <clean name>}`` on success, or ``False`` when
            there is nothing to index or any step raises (the error is
            printed, not propagated).
        """
        try:
            vector_store = PineconeVectorStore(
                index=self.index,
                embedding=embedding,
                namespace=self.namespace,
            )

            # Keep only the part before the first "." and turn separators
            # into "_" in one pass (the stem cannot contain "." any more).
            separators = str.maketrans(" -/\\", "____")
            file_name = filename.split(".")[0].translate(separators).strip()

            # Loop-invariant: compute the id prefix once, and have it bound
            # even when there are no chunks.
            clean_filename = remove_non_standard_ascii(file_name)

            documents = []
            uuids = []
            for i, chunk in enumerate(text_chunks):
                uuid = f"{clean_filename}_{i}"
                documents.append(
                    Document(
                        page_content=chunk,
                        metadata={"filename": filename, "chunk_id": uuid},
                    )
                )
                uuids.append(uuid)

            if not documents:
                # Previously this case only returned False by accident (an
                # unbound-variable error caught below); make it explicit and
                # skip the pointless upload.
                print(f"No text chunks to index for {filename!r}")
                return False

            vector_store.add_documents(documents=documents, ids=uuids)

            return {"filename_id": clean_filename}

        except Exception as e:
            # Best-effort contract: callers receive False instead of an exception.
            print(e)
            return False

    def retriever(self, query, embedding):
        """Return the stored chunks most similar to ``query``.

        Args:
            query: Query text passed to the retriever.
            embedding: Embedding object handed to ``PineconeVectorStore``.

        Returns:
            The result of invoking a ``similarity_score_threshold`` retriever
            configured with ``k=3`` and ``score_threshold=0.6``.
        """
        vector_store = PineconeVectorStore(
            index=self.index,
            embedding=embedding,
            namespace=self.namespace,
        )

        retriever = vector_store.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"k": 3, "score_threshold": 0.6},
        )

        return retriever.invoke(query)
|
|
|
|
|
def remove_non_standard_ascii(input_string: str) -> str:
    """Return ``input_string`` reduced to ASCII letters, digits and `` .,!?``.

    The text is first NFKD-normalised so that accented letters decompose
    into a base letter plus combining marks; the marks are then dropped
    (``"é"`` becomes ``"e"``). Every other character, including ``_`` and
    ``-``, is removed.

    Args:
        input_string: Text to sanitise.

    Returns:
        The sanitised, ASCII-only string (possibly empty).
    """
    allowed_punctuation = " .,!?"
    normalized_string = unicodedata.normalize("NFKD", input_string)
    # str.isdigit() alone is True for non-ASCII digits (e.g. U+0663), which
    # would leak non-ASCII characters into the result; require ASCII first.
    return "".join(
        char
        for char in normalized_string
        if (char.isascii() and char.isalnum()) or char in allowed_punctuation
    )
|
|
|
|