"""Chainlit "Chat with Files" app backed by a LangChain (LCEL) RAG pipeline.

On chat start the user uploads one text or PDF file. The file is written to a
temporary path, chunked, embedded with OpenAI embeddings and indexed in Qdrant.
Each subsequent user message is answered by a retrieval-augmented chain that is
stored in the Chainlit user session.
"""
import contextlib
import os
import tempfile
from operator import itemgetter
from typing import List
from uuid import uuid4

import chainlit as cl
from chainlit.cli import run_chainlit
from chainlit.types import AskFileResponse
from langchain.schema.runnable import RunnablePassthrough
from langchain_community.vectorstores import Qdrant
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings

from Chunking import ChunkingStrategy, TextLoaderAndSplitterWrapper

# Indexing the environment directly makes a missing key fail at import time.
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
GPT_MODEL = "gpt-4o-mini"

# Used for LangSmith: give each process its own project name unless one is set.
unique_id = uuid4().hex[0:8]
os.environ["LANGCHAIN_TRACING_V2"] = "true"
if os.environ.get("LANGCHAIN_PROJECT") is None:
    os.environ["LANGCHAIN_PROJECT"] = f"LangSmith LCEL RAG - {unique_id}"

# Deployment switches, both driven purely by environment variables.
is_azure = os.environ.get("AZURE_DEPLOYMENT") is not None
is_azure_qdrant_inmem = bool(os.environ.get("AZURE_QDRANT_INMEM"))

# Accepted upload MIME types and the file suffix each one is saved with.
_SUFFIX_BY_MIME_TYPE = {
    "application/pdf": ".pdf",
    "text/plain": ".txt",
}


# Utility functions
def save_file(file: AskFileResponse, file_ext: str, is_azure: bool) -> str:
    """Write an uploaded file's content to a temporary file and return its path.

    Args:
        file: The uploaded file; its ``content`` attribute is written as bytes.
        file_ext: MIME type of the upload ("application/pdf" or "text/plain").
        is_azure: Unused; kept so existing callers keep working.

    Returns:
        Path of the temporary file. It is created with ``delete=False``, so the
        caller is responsible for removing it.

    Raises:
        ValueError: If ``file_ext`` is not one of the supported MIME types.
    """
    suffix = _SUFFIX_BY_MIME_TYPE.get(file_ext)
    if suffix is None:
        raise ValueError(f"Unknown file type: {file_ext}")

    # The module-level flag (not the is_azure argument) selects the directory.
    temp_dir = "/tmp" if is_azure_qdrant_inmem else None
    with tempfile.NamedTemporaryFile(
        mode="wb", delete=False, suffix=suffix, dir=temp_dir
    ) as temp_file:
        temp_file_path = temp_file.name
        temp_file.write(file.content)
    return temp_file_path


def setup_vectorstore(
    documents: List[Document], embedding_model: OpenAIEmbeddings, is_azure: bool
) -> Qdrant:
    """Embed ``documents`` and index them in a Qdrant vector store.

    Args:
        documents: Chunked documents to index.
        embedding_model: Embedding model used to vectorise the documents.
        is_azure: When true and the in-memory flag is not set, the Qdrant
            server at ``http://qdrant:6333`` is used.

    Returns:
        The populated Qdrant vector store. In every other configuration than
        the one described above, the store is in-memory.
    """
    if is_azure and not is_azure_qdrant_inmem:
        return Qdrant.from_documents(
            documents=documents,
            embedding=embedding_model,
            url="http://qdrant:6333",  # Docker compose setup
        )
    return Qdrant.from_documents(
        documents=documents,
        embedding=embedding_model,
        location=":memory:",
    )


# Prepare the components that will form the chain

## Step 1: Create a prompt template
base_rag_prompt_template = """\
You are a helpful assistant that can answer questions related to the provided context. Repond I don't have that information if outside context.

Context:
{context}

Question:
{question}
"""

base_rag_prompt = ChatPromptTemplate.from_template(base_rag_prompt_template)

## Step 2: Create Embeddings model instance for creating embeddings
embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")

## Step 3: Create the OpenAI chat model
base_llm = ChatOpenAI(model=GPT_MODEL, tags=["base_llm"])


@cl.on_chat_start
async def on_chat_start():
    """Ask for a file, index it, and store a RAG chain in the user session."""
    msg = cl.Message(
        content="Welcome to the Chat with Files app powered by LCEL and OpenAI - RAG!"
    )
    await msg.send()

    files = None

    # Wait for the user to upload a file; keep asking while nothing comes back.
    while files is None:
        files = await cl.AskFileMessage(
            content="Please upload a text or a pdf file to begin!",
            accept=["text/plain", "application/pdf"],
            max_size_mb=10,
            max_files=1,
            timeout=180,
        ).send()

    uploaded_file = files[0]

    ## Load file and split into chunks
    await cl.Message(content=f"Processing `{uploaded_file.name}`...").send()
    current_file_path = save_file(uploaded_file, uploaded_file.type, is_azure)
    try:
        loader_splitter = TextLoaderAndSplitterWrapper(
            ChunkingStrategy.RECURSIVE_CHARACTER_CHAR_SPLITTER, current_file_path
        )
        # Loading and embedding are blocking calls; run them in a worker thread
        # so the event loop stays responsive.
        documents = await cl.make_async(loader_splitter.load_documents)()
        await cl.Message(content=" Data Chunked...").send()

        ## Vectorising the documents
        qdrant_vectorstore = await cl.make_async(setup_vectorstore)(
            documents, embedding_model, is_azure
        )
    finally:
        # The temp file was created with delete=False. Remove it only after
        # indexing has finished, and never let cleanup mask a real error.
        with contextlib.suppress(OSError):
            os.remove(current_file_path)

    qdrant_retriever = qdrant_vectorstore.as_retriever()
    await cl.Message(content=" Created Vector store").send()

    # create the chain on new chat session
    retrieval_augmented_qa_chain = (
        # INVOKE CHAIN WITH: {"question" : "<>"}
        # "question" : populated by getting the value of the "question" key
        # "context"  : populated by getting the value of the "question" key and
        #              chaining it into the retriever
        {
            "context": itemgetter("question") | qdrant_retriever,
            "question": itemgetter("question"),
        }
        # "context" : re-assigned from the "context" key of the previous step
        | RunnablePassthrough.assign(context=itemgetter("context"))
        # "response" : the "context" and "question" values are used to format
        #              the prompt, which is piped into the LLM
        # "context"  : passed through from the previous step
        | {"response": base_rag_prompt | base_llm, "context": itemgetter("context")}
    )

    # Let the user know that the system is ready
    msg = cl.Message(
        content=f"Processing `{uploaded_file.name}` done. You can now ask questions!"
    )
    await msg.send()

    cl.user_session.set("chain", retrieval_augmented_qa_chain)


@cl.on_message
async def main(message: cl.Message):
    """Answer a user message with the RAG chain stored in the user session."""
    chain = cl.user_session.get("chain")
    if chain is None:
        # No chain is stored until on_chat_start has finished indexing a file.
        await cl.Message(
            content="Please upload a file and wait for processing to finish first."
        ).send()
        return

    # chain.invoke blocks, so run it in a worker thread rather than stalling
    # the event loop for the whole retrieval + LLM round trip.
    response = await cl.make_async(chain.invoke)(
        {"question": message.content}, {"tags": ["Demo Run"]}
    )

    await cl.Message(content=response["response"].content).send()


if __name__ == "__main__":
    run_chainlit(__file__)