# LCELRag/app.py
import os
from typing import List
from operator import itemgetter
from Chunking import ChunkingStrategy, TextLoaderAndSplitterWrapper
from langchain.schema.runnable import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.vectorstores import Qdrant
from langchain_core.documents import Document
import chainlit as cl
from chainlit.types import AskFileResponse
from chainlit.cli import run_chainlit
from uuid import uuid4
import tempfile
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
GPT_MODEL = "gpt-4o-mini"
# Used for LangSmith tracing
unique_id = uuid4().hex[0:8]
os.environ["LANGCHAIN_TRACING_V2"] = "true"
if os.environ.get("LANGCHAIN_PROJECT") is None:
os.environ["LANGCHAIN_PROJECT"] = f"LangSmith LCEL RAG - {unique_id}"
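# Note: LANGCHAIN_API_KEY must also be present in the environment for LangSmith to record traces.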
# Deployment flags: AZURE_DEPLOYMENT marks an Azure deployment; AZURE_QDRANT_INMEM
# switches the Azure deployment to an in-memory Qdrant store instead of the Docker-hosted service.
is_azure = os.environ.get("AZURE_DEPLOYMENT") is not None
is_azure_qdrant_inmem = bool(os.environ.get("AZURE_QDRANT_INMEM"))
# Utility functions
def save_file(file: AskFileResponse, file_ext: str, is_azure: bool) -> str:
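    """Persist an uploaded file to a temporary file and return its path.

    `file_ext` is the MIME type reported by Chainlit and is mapped to a real
    file extension below. The temp directory is chosen from the module-level
    `is_azure_qdrant_inmem` flag rather than the `is_azure` argument.
    """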
if file_ext == "application/pdf":
file_ext = ".pdf"
elif file_ext == "text/plain":
file_ext = ".txt"
else:
raise ValueError(f"Unknown file type: {file_ext}")
    temp_dir = "/tmp" if is_azure_qdrant_inmem else None
    with tempfile.NamedTemporaryFile(
        mode="wb", delete=False, suffix=file_ext, dir=temp_dir
    ) as temp_file:
temp_file_path = temp_file.name
temp_file.write(file.content)
return temp_file_path
def setup_vectorstore(documents: List[Document], embedding_model: OpenAIEmbeddings, is_azure: bool) -> Qdrant:
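    """Create a Qdrant vector store from the chunked documents.

    Azure deployments use an in-memory store when AZURE_QDRANT_INMEM is set and
    otherwise the Qdrant service from the Docker compose setup; non-Azure runs
    always use the in-memory store.
    """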
if is_azure:
if is_azure_qdrant_inmem:
qdrant_vectorstore = Qdrant.from_documents(
documents=documents,
embedding=embedding_model,
location=":memory:"
)
else:
qdrant_vectorstore = Qdrant.from_documents(
documents=documents,
embedding=embedding_model,
url="http://qdrant:6333", # Docker compose setup
)
else:
qdrant_vectorstore = Qdrant.from_documents(
documents=documents,
embedding=embedding_model,
location=":memory:"
)
return qdrant_vectorstore
# Prepare the components that will form the chain
## Step 1: Create a prompt template
base_rag_prompt_template = """\
You are a helpful assistant that can answer questions related to the provided context. Respond with "I don't have that information." if the question is outside the provided context.
Context:
{context}
Question:
{question}
"""
base_rag_prompt = ChatPromptTemplate.from_template(base_rag_prompt_template)
## Step 2: Create the embeddings model instance
embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")
## Step 3: Create the OpenAI chat model
base_llm = ChatOpenAI(model=GPT_MODEL, tags=["base_llm"])
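# The "base_llm" tag, together with the per-run tags passed at invoke time, shows up in
# LangSmith traces and makes these calls easy to filter.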
@cl.on_chat_start
async def on_chat_start():
msg = cl.Message(content="Welcome to the Chat with Files app powered by LCEL and OpenAI - RAG!")
await msg.send()
files = None
documents = None
# Wait for the user to upload a file
    while files is None:
files = await cl.AskFileMessage(
content="Please upload a text or a pdf file to begin!",
accept=["text/plain", "application/pdf"],
max_size_mb=10,
max_files=1,
timeout=180,
).send()
## Load file and split into chunks
await cl.Message(content=f"Processing `{files[0].name}`...").send()
    current_file_path = save_file(files[0], files[0].type, is_azure)
loader_splitter = TextLoaderAndSplitterWrapper(ChunkingStrategy.RECURSIVE_CHARACTER_CHAR_SPLITTER, current_file_path)
documents = loader_splitter.load_documents()
await cl.Message(content=" Data Chunked...").send()
## Vectorising the documents
    qdrant_vectorstore = setup_vectorstore(documents, embedding_model, is_azure)
qdrant_retriever = qdrant_vectorstore.as_retriever()
await cl.Message(content=" Created Vector store").send()
    # Create the chain for the new chat session
retrieval_augmented_qa_chain = (
# INVOKE CHAIN WITH: {"question" : "<<SOME USER QUESTION>>"}
# "question" : populated by getting the value of the "question" key
# "context" : populated by getting the value of the "question" key and chaining it into the base_retriever
{"context": itemgetter("question") | qdrant_retriever, "question": itemgetter("question")}
# "context" : is assigned to a RunnablePassthrough object (will not be called or considered in the next step)
# by getting the value of the "context" key from the previous step
| RunnablePassthrough.assign(context=itemgetter("context"))
# "response" : the "context" and "question" values are used to format our prompt object and then piped
# into the LLM and stored in a key called "response"
# "context" : populated by getting the value of the "context" key from the previous step
| {"response": base_rag_prompt | base_llm, "context": itemgetter("context")}
)
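    # Invoking the chain with {"question": "<<SOME USER QUESTION>>"} yields a dict of roughly
    # the shape {"response": <AIMessage from the LLM>, "context": [<retrieved Documents>]}.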
# Let the user know that the system is ready
msg = cl.Message(content=f"Processing `{files[0].name}` done. You can now ask questions!")
await msg.send()
cl.user_session.set("chain", retrieval_augmented_qa_chain)
@cl.on_message
async def main(message: cl.Message):
chain = cl.user_session.get("chain")
msg = cl.Message(content="")
    response = await chain.ainvoke({"question": message.content}, {"tags": ["Demo Run"]})
    msg.content = response["response"].content
await msg.send()
cl.user_session.set("chain", chain)
if __name__ == "__main__":
run_chainlit(__file__)