# Imports
import os
#from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from pymilvus import connections, utility
from langchain_openai import OpenAIEmbeddings
from langchain_milvus.vectorstores import Milvus
from langchain.chains import create_retrieval_chain
from langchain.chains import create_history_aware_retriever
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.chains.combine_documents import create_stuff_documents_chain

# Environment settings
#load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
cloud_api_key = os.getenv("CLOUD_API_KEY")
cloud_uri = os.getenv("URI")


# Database connection
class DatabaseManagement:
    """
    Connects to the Milvus database.
    """

    def __init__(self):
        """
        Connects to the Milvus server.
        """
        # Connect to the Milvus server
        connections.connect(alias="default", uri=cloud_uri, token=cloud_api_key, timeout=120)
        print("Connected to the Milvus Server")


# Manages the vectorstore
class VectorStoreManagement:
    """
    Creates or loads a Milvus vectorstore.

    Methods
    -------
    create_vectorstore()
        Checks whether the collection exists in Milvus. If it does not, splits the
        documents into smaller chunks and creates a new vectorstore; otherwise
        loads the existing collection.
    """

    def __init__(self, document):
        """
        Initializes the document and vectorstore attributes and calls create_vectorstore.

        Parameters
        ----------
        document: list
            List of Document objects from langchain_core.documents
        """
        self.document = document
        self.vectorstore = None
        self.create_vectorstore()

    def create_vectorstore(self):
        """
        Checks whether the collection exists in Milvus. If it does not, splits the
        documents into smaller chunks and creates a new vectorstore; otherwise
        loads the existing collection.
        """
        # Define the collection name
        collection_name = "RAG_Milvus"

        # Create the collection under the ChatRAG database if it does not exist yet
        if collection_name not in utility.list_collections():
            print("RAG_Milvus collection does not exist under the ChatRAG database")
            # Split the documents into smaller chunks
            textsplitter = RecursiveCharacterTextSplitter(chunk_size=1000,
                                                          chunk_overlap=200,
                                                          length_function=len)
            chunks_data = textsplitter.split_documents(documents=self.document)
            # Create the vectorstore (and the collection) from the chunks
            self.vectorstore = Milvus.from_documents(documents=chunks_data,
                                                     embedding=OpenAIEmbeddings(openai_api_key=openai_api_key),
                                                     collection_name=collection_name,
                                                     connection_args={"uri": cloud_uri, "token": cloud_api_key})
            print("RAG_Milvus collection is created under the ChatRAG database")
        else:
            print("RAG_Milvus collection already exists")
            # Load the existing collection instead of re-embedding the documents
            self.vectorstore = Milvus(embedding_function=OpenAIEmbeddings(openai_api_key=openai_api_key),
                                      collection_name=collection_name,
                                      connection_args={"uri": cloud_uri, "token": cloud_api_key})
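
# A minimal sketch (not part of the pipeline itself) of how the vectorstore built
# above can be queried directly. The query string and the expectation of a
# "pdf_url" metadata key are illustrative assumptions based on how this module
# uses the store further below.
def _example_similarity_search(manager: VectorStoreManagement):
    # Return the 3 chunks most similar to the query, with their source URLs
    for doc in manager.vectorstore.similarity_search("What is retrieval augmented generation?", k=3):
        print(doc.metadata.get("pdf_url"), doc.page_content[:100])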

# RAG class to retrieve the AI response for a given user query
class RAG:
    """
    ChatRAG: a Retrieval-Augmented Generation pipeline built with LangChain.

    Methods
    -------
    model():
        Creates the llm from OpenAI, using the model gpt-3.5-turbo-0125 with temperature=0.
        Creates the retriever from the vectorstore.
        Defines contextualize_q_prompt and combines it with the llm and the retriever
        in a history-aware retriever.
        Defines qa_prompt (question/answer) and combines it with the llm via
        create_stuff_documents_chain into a question_answer_chain.
        Defines the rag chain by combining the history-aware retriever and the
        question_answer_chain.
    get_session_history(session_id):
        Stores the chat history per session_id in a dictionary.
    conversational_rag_chain(input):
        Creates the conversational rag chain and invokes it for the AI response.
    """

    def __init__(self, document):
        """
        Initializes the document and the store for the chat history.

        Parameters
        ----------
        document: list
            List of Document objects from langchain_core.documents
        """
        self.document = document
        self.database_manager = DatabaseManagement()
        self.vectorstore_manager = VectorStoreManagement(self.document)
        self.store = {}
        # Build the RAG chain up front so conversational_rag_chain can be called directly
        self.model()

    # RAG model
    def model(self):
        """
        Creates the llm from OpenAI, using the model gpt-3.5-turbo-0125 with temperature=0.
        Creates the retriever from the vectorstore.
        Defines contextualize_q_prompt and combines it with the llm and the retriever
        in a history-aware retriever.
        Defines qa_prompt (question/answer) and combines it with the llm via
        create_stuff_documents_chain into a question_answer_chain.
        Defines the rag chain by combining the history-aware retriever and the
        question_answer_chain.
        """
        # Create the llm from ChatOpenAI
        llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)

        # Create the retriever. It returns the documents most similar to the user input.
        retriever = self.vectorstore_manager.vectorstore.as_retriever(search_type="similarity",
                                                                      search_kwargs={"k": 6})

        # System prompt that tells the language model how to handle the latest user query in
        # the context of the entire conversation history: take the chat history and the latest
        # user question and rephrase the question so it can be understood on its own.
        contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""

        # Create a customized chat prompt template with the system prompt above
        contextualize_q_prompt = ChatPromptTemplate.from_messages([
            ("system", contextualize_q_system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ])

        # Create the history-aware retriever. It combines the current user query with the
        # chat history so that the retrieved documents stay relevant to previous turns.
        history_aware_retriever = create_history_aware_retriever(llm, retriever, contextualize_q_prompt)

        # Custom question/answer system prompt
        qa_system_prompt = """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \
Use three sentences maximum and keep the answer concise. \
{context}"""

        # Create the custom question/answer chat prompt
        qa_prompt = ChatPromptTemplate.from_messages([
            ("system", qa_system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ])

        # Create the question/answer chain. It combines the llm and qa_prompt,
        # and uses the llm with the retrieved context to answer the question.
        question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)

        # RAG chain that combines the history-aware retriever and the question/answer chain.
        # It makes sure that the retrieved documents are related to the chat history and the user query.
        self.rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
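
    # A minimal sketch (not part of the original pipeline) of how the rag chain built in
    # model() can be invoked directly, without message history. The question string is an
    # illustrative assumption; an empty chat_history is a valid input for the chain.
    def _example_invoke_rag_chain(self, question: str) -> str:
        # The chain returns a dict with "input", "context" (retrieved documents) and "answer"
        result = self.rag_chain.invoke({"input": question, "chat_history": []})
        return result["answer"]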

    # Method to store the chat history
    def get_session_history(self, session_id: str) -> BaseChatMessageHistory:
        """
        Stores the chat history per session_id in a dictionary.

        Parameters
        ----------
        session_id: str
            Session id in string format

        Returns
        -------
        BaseChatMessageHistory
            The chat history stored under session_id
        """
        if session_id not in self.store:
            self.store[session_id] = ChatMessageHistory()
        return self.store[session_id]

    # Create the conversational RAG chain
    def conversational_rag_chain(self, input):
        """
        Creates the conversational rag chain and invokes it.

        Parameters
        ----------
        input: str
            User's query

        Returns
        -------
        tuple
            The AI response and the list of pdf_urls of the retrieved documents
        """
        conversational_rag_chain = RunnableWithMessageHistory(
            self.rag_chain,
            self.get_session_history,
            input_messages_key="input",
            history_messages_key="chat_history",
            output_messages_key="answer")

        result = conversational_rag_chain.invoke({"input": str(input)},
                                                 config={"configurable": {"session_id": "6161"}})

        # Collect the source urls of the retrieved documents
        sources = []
        for doc in result["context"]:
            sources.append(doc.metadata["pdf_url"])
        return result["answer"], sources
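
# A minimal usage sketch, assuming a single hypothetical Document; the document
# content, its pdf_url and the query are illustrative, and OPENAI_API_KEY,
# CLOUD_API_KEY and URI must be set in the environment for this to run.
if __name__ == "__main__":
    from langchain_core.documents import Document

    doc = Document(page_content="Retrieval-Augmented Generation combines retrieval with generation.",
                   metadata={"pdf_url": "https://example.com/rag.pdf"})
    rag = RAG([doc])
    answer, sources = rag.conversational_rag_chain("What is RAG?")
    print(answer)
    print(sources)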