Spaces:
Sleeping
Sleeping
import os | |
from typing import List | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.vectorstores import Chroma | |
from langchain.chains import ( | |
ConversationalRetrievalChain, | |
) | |
from langchain.chat_models import ChatOpenAI | |
from langchain.docstore.document import Document | |
from langchain.memory import ChatMessageHistory, ConversationBufferMemory | |
import chainlit as cl | |
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100) | |
async def on_chat_start(): | |
files = None | |
# Wait for the user to upload a file | |
while files == None: | |
files = await cl.AskFileMessage( | |
content="Please upload a text file to begin!", | |
accept=["text/plain"], | |
max_size_mb=20, | |
timeout=180, | |
).send() | |
file = files[0] | |
msg = cl.Message(content=f"Processing `{file.name}`...") | |
await msg.send() | |
with open(file.path, "r", encoding="utf-8") as f: | |
text = f.read() | |
# Split the text into chunks | |
texts = text_splitter.split_text(text) | |
# Create a metadata for each chunk | |
metadatas = [{"source": f"{i}-pl"} for i in range(len(texts))] | |
# Create a Chroma vector store | |
embeddings = OpenAIEmbeddings() | |
docsearch = await cl.make_async(Chroma.from_texts)( | |
texts, embeddings, metadatas=metadatas | |
) | |
message_history = ChatMessageHistory() | |
memory = ConversationBufferMemory( | |
memory_key="chat_history", | |
output_key="answer", | |
chat_memory=message_history, | |
return_messages=True, | |
) | |
# Create a chain that uses the Chroma vector store | |
chain = ConversationalRetrievalChain.from_llm( | |
ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0, streaming=True), | |
chain_type="stuff", | |
retriever=docsearch.as_retriever(), | |
memory=memory, | |
return_source_documents=True, | |
) | |
# Let the user know that the system is ready | |
msg.content = f"Processing `{file.name}` done. You can now ask questions!" | |
await msg.update() | |
cl.user_session.set("chain", chain) | |
async def main(message: cl.Message): | |
chain = cl.user_session.get("chain") # type: ConversationalRetrievalChain | |
cb = cl.AsyncLangchainCallbackHandler() | |
res = await chain.acall(message.content, callbacks=[cb]) | |
answer = res["answer"] | |
source_documents = res["source_documents"] # type: List[Document] | |
text_elements = [] # type: List[cl.Text] | |
if source_documents: | |
for source_idx, source_doc in enumerate(source_documents): | |
source_name = f"source_{source_idx}" | |
# Create the text element referenced in the message | |
text_elements.append( | |
cl.Text(content=source_doc.page_content, name=source_name, display="side") | |
) | |
source_names = [text_el.name for text_el in text_elements] | |
if source_names: | |
answer += f"\nSources: {', '.join(source_names)}" | |
else: | |
answer += "\nNo sources found" | |
await cl.Message(content=answer, elements=text_elements).send() | |