common / vectore_store /PineconeConnector.py
cd@bziiit.com
Remove debug print statements from PineconeConnector
b82230b
raw
history blame
3.18 kB
import os
from dotenv import load_dotenv
from .ConnectorStrategy import ConnectorStrategy
from pinecone import Pinecone, ServerlessSpec
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_core.documents import Document
import unicodedata
import time
class PineconeConnector(ConnectorStrategy):
def __init__(self):
load_dotenv()
pinecone_api_key = os.environ.get("PINECONE_API_KEY")
self.index_name = os.environ.get("PINECONE_INDEX_NAME")
self.namespace = os.environ.get("PINECONE_NAMESPACE")
pc = Pinecone(api_key=pinecone_api_key)
existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]
if self.index_name not in existing_indexes:
pc.create_index(
name=self.index_name,
dimension=3072,
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)
while not pc.describe_index(self.index_name).status["ready"]:
time.sleep(1)
self.index = pc.Index(self.index_name)
def getDocs(self):
# Simulate getting docs from Pinecone
docs_names = []
for ids in self.index.list(namespace=self.namespace):
for id in ids:
name_doc = "_".join(id.split("_")[:-1])
if name_doc not in docs_names:
docs_names.append(name_doc)
return docs_names
def addDoc(self, filename, text_chunks, embedding):
try:
vector_store = PineconeVectorStore(index=self.index, embedding=embedding,namespace=self.namespace)
file_name = filename.split(".")[0].replace(" ","_").replace("-","_").replace(".","_").replace("/","_").replace("\\","_").strip()
documents = []
uuids = []
for i, chunk in enumerate(text_chunks):
clean_filename = remove_non_standard_ascii(file_name)
uuid = f"{clean_filename}_{i}"
document = Document(
page_content=chunk,
metadata={ "filename":filename, "chunk_id":uuid },
)
uuids.append(uuid)
documents.append(document)
vector_store.add_documents(documents=documents, ids=uuids)
return {"filename_id":clean_filename}
except Exception as e:
print(e)
return False
def retriever(self, query, embedding):
vector_store = PineconeVectorStore(index=self.index, embedding=embedding,namespace=self.namespace)
retriever = vector_store.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"k": 3, "score_threshold": 0.6},
)
return retriever.invoke(query)
def remove_non_standard_ascii(input_string: str) -> str:
normalized_string = unicodedata.normalize('NFKD', input_string)
return ''.join(char for char in normalized_string if 'a' <= char <= 'z' or 'A' <= char <= 'Z' or char.isdigit() or char in ' .,!?')