meta2023 / semantic.py
philmui's picture
no LLM check
efc4be1
import logging
from pathlib import Path
_logger = logging.getLogger("semantic")
from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.base import RunnableSequence
from langchain_core.vectorstores import VectorStore
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_community.vectorstores import Qdrant
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_experimental.text_splitter import SemanticChunker
from globals import (
embeddings,
gpt35_model,
gpt4_model,
META_10K_FILE_PATH,
META_SEMANTIC_COLLECTION,
VECTOR_STORE_PATH
)
USE_MEMORY = True
from qdrant_client import QdrantClient
qclient: QdrantClient
if USE_MEMORY == True:
qclient = QdrantClient(":memory:")
else:
qclient = QdrantClient(path=VECTOR_STORE_PATH)
RAG_PROMPT = """
Reply the user's query thoughtfully and clearly.
You should only respond to user's query if the context is related to the query.
If you are not sure how to answer, please reply "I don't know".
Respond with structure in markdown.
CONTEXT:
{context}
QUERY:
{question}
YOUR REPLY: """
rag_prompt = ChatPromptTemplate.from_template(RAG_PROMPT)
class SemanticStoreFactory:
_semantic_vectorstore: VectorStore = None
@classmethod
def __load_semantic_store(
cls
) -> VectorStore:
path = Path(VECTOR_STORE_PATH)
store = None
# check if path exists and if it is not empty
if path.exists() and path.is_dir() and any(path.iterdir()):
_logger.info(f"\tQdrant loading ...")
store = Qdrant(
client=qclient,
embeddings=embeddings,
collection_name=META_SEMANTIC_COLLECTION,
)
else:
_logger.info(f"\tQdrant creating ...")
store = cls.__create_semantic_store()
return store
@classmethod
def __create_semantic_store(
cls
) -> VectorStore:
if USE_MEMORY == True:
_logger.info(f"creating semantic vector store: {USE_MEMORY}")
else:
_logger.info(f"creating semantic vector store: {VECTOR_STORE_PATH}")
path = Path(VECTOR_STORE_PATH)
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
_logger.info(f"Directory '{path}' created.")
_logger.info(f"loading {META_10K_FILE_PATH}")
documents = PyMuPDFLoader(META_10K_FILE_PATH).load()
_logger.info(f"\tload { len(documents) } docs")
semantic_chunker = SemanticChunker(
embeddings=embeddings,
breakpoint_threshold_type="percentile"
)
semantic_chunks = semantic_chunker.create_documents([d.page_content for d in documents])
_logger.info(f"created semantic_chunks: {len(semantic_chunks)}")
if USE_MEMORY == True:
_logger.info(f"\t==> creating memory vectorstore ...")
semantic_chunk_vectorstore = Qdrant.from_documents(
semantic_chunks,
embeddings,
location=":memory:",
collection_name=META_SEMANTIC_COLLECTION,
force_recreate=True
)
_logger.info(f"\t==> finished constructing vectorstore")
else:
semantic_chunk_vectorstore = Qdrant.from_documents(
semantic_chunks,
embeddings,
path=VECTOR_STORE_PATH,
collection_name=META_SEMANTIC_COLLECTION,
force_recreate=True
)
_logger.info(f"\t==> return vectorstore")
return semantic_chunk_vectorstore
@classmethod
def get_semantic_store(
cls
) -> VectorStore:
_logger.info(f"get_semantic_store")
if cls._semantic_vectorstore is None:
if USE_MEMORY == True:
cls._semantic_vectorstore = cls.__create_semantic_store()
_logger.info(f"received semantic_vectorstore")
else:
print(f"Loading semantic vectorstore {META_SEMANTIC_COLLECTION} from: {VECTOR_STORE_PATH}")
try:
# first try to load the store
cls._semantic_vectorstore = cls.__load_semantic_store()
except Exception as e:
_logger.warning(f"cannot load: {e}")
cls._semantic_vectorstore = cls.__create_semantic_store()
_logger.info(f"RETURNING get_semantic_store")
return cls._semantic_vectorstore
class SemanticRAGChainFactory:
_chain: RunnableSequence = None
@classmethod
def get_semantic_rag_chain(
cls
) -> RunnableSequence:
if cls._chain is None:
_logger.info(f"creating SemanticRAGChainFactory")
semantic_store = SemanticStoreFactory.get_semantic_store()
_logger.info(f"\treceived semantic_store")
if semantic_store is not None:
semantic_chunk_retriever = semantic_store.as_retriever()
semantic_mquery_retriever = MultiQueryRetriever.from_llm(
retriever=semantic_chunk_retriever,
llm=gpt4_model
)
cls._chain = (
# INVOKE CHAIN WITH: {"question" : "<<SOME USER QUESTION>>"}
# "question" : populated by getting the value of the "question" key
# "context" : populated by getting the value of the "question" key and chaining it into the base_retriever
{"context": itemgetter("question") | semantic_mquery_retriever, "question": itemgetter("question")}
# "context" : is assigned to a RunnablePassthrough object (will not be called or considered in the next step)
# by getting the value of the "context" key from the previous step
| RunnablePassthrough.assign(context=itemgetter("context"))
# "response" : the "context" and "question" values are used to format our prompt object and then piped
# into the LLM and stored in a key called "response"
# "context" : populated by getting the value of the "context" key from the previous step
| {"response": rag_prompt | gpt4_model, "context": itemgetter("context")}
)
_logger.info(f"\t_chain constructed")
return cls._chain