# app.py
from typing import List, Union, Optional
from dotenv import load_dotenv, find_dotenv
from langchain.callbacks import get_openai_callback
from langchain.chat_models import ChatOpenAI
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.schema import (SystemMessage, HumanMessage, AIMessage)
from langchain.llms import LlamaCpp
from langchain.embeddings import LlamaCppEmbeddings
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.text_splitter import TokenTextSplitter
from langchain.prompts import PromptTemplate
from langchain.vectorstores import Qdrant
from PyPDF2 import PdfReader
import streamlit as st
from ctransformers import AutoModelForCausalLM
from dl_hf_model import dl_hf_model
from loguru import logger
PROMPT_TEMPLATE = """
Use the following pieces of context enclosed by triple backquotes to answer the question at the end.
\n\n
Context:
```
{context}
```
\n\n
Question: [][][][]{question}[][][][]
\n
Answer:"""
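# Note: the "[][][][]" markers act as sentinels around the raw user question so
# that extract_user_question_part_only() (defined below) can recover just the
# question when the chat history is redisplayed. Illustrative rendered prompt
# (the context list and question are placeholders):
#   Context:
#   ```
#   ['chunk one of the PDF ...', 'chunk two ...']
#   ```
#   Question: [][][][]What does the document say about X?[][][][]
#   Answer: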
def init_page() -> None:
    st.set_page_config(
        page_title="Personal ChatGPT"
    )
    st.sidebar.title("Options")
def init_messages() -> None:
    clear_button = st.sidebar.button("Clear Conversation", key="clear")
    if clear_button or "messages" not in st.session_state:
        st.session_state.messages = [
            SystemMessage(
                content=(
                    "You are a helpful AI QA assistant. "
                    "When answering questions, use the context enclosed by triple backquotes if it is relevant. "
                    "If you don't know the answer, just say that you don't know, "
                    "don't try to make up an answer. "
                    "Reply with your answer in markdown format.")
            )
        ]
        st.session_state.costs = []
def get_pdf_text() -> Optional[List[str]]:
    """
    Load the uploaded PDF and split its text into token-sized chunks.
    """
    st.header("Document Upload")
    uploaded_file = st.file_uploader(
        label="Upload the PDF file you want ChatGPT to use to answer your questions",
        type="pdf"
    )
    if uploaded_file:
        pdf_reader = PdfReader(uploaded_file)
        text = "\n\n".join([page.extract_text() for page in pdf_reader.pages])
        text_splitter = TokenTextSplitter(chunk_size=100, chunk_overlap=0)
        return text_splitter.split_text(text)
    else:
        return None
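# Note: TokenTextSplitter(chunk_size=100, chunk_overlap=0) splits on model
# tokens (via tiktoken) rather than characters, so each returned chunk is
# roughly 100 tokens of PDF text with no overlap between consecutive chunks.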
def build_vector_store(
        texts: Optional[List[str]],
        embeddings: Union[OpenAIEmbeddings, LlamaCppEmbeddings]) \
        -> Optional[Qdrant]:
    """
    Store the embedding vectors of the text chunks in a vector store (Qdrant).
    """
    if texts:
        with st.spinner("Loading PDF ..."):
            qdrant = Qdrant.from_texts(
                texts,
                embeddings,
                location=":memory:",
                collection_name="my_collection",
                force_recreate=True
            )
        st.success("File Loaded Successfully!!")
    else:
        qdrant = None
    return qdrant
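# Note: location=":memory:" keeps the Qdrant collection in RAM, so the index
# only lives for the current Streamlit session and is rebuilt on every upload.
# Illustrative usage (names are placeholders):
#   chunks = get_pdf_text()
#   store = build_vector_store(chunks, OpenAIEmbeddings())
#   docs = store.similarity_search("What is this document about?", k=10)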
def select_llm() -> tuple[str, float]:
    """
    Read the user's selection of model and parameters from the Streamlit sidebar.
    """
    model_name = st.sidebar.radio(
        "Choose LLM:",
        ("llama-2-7b-chat.ggmlv3.q2_K",  # "Wizard-Vicuna-7B-Uncensored.ggmlv3.q4_K_M"
         "gpt-3.5-turbo-0613",
         "gpt-3.5-turbo-16k-0613",
         "gpt-4"))
    temperature = st.sidebar.slider("Temperature:", min_value=0.0,
                                    max_value=1.0, value=0.0, step=0.01)
    return model_name, temperature
def load_llm(model_name: str, temperature: float) -> Union[ChatOpenAI, LlamaCpp]:
    """
    Load the selected LLM.
    """
    if model_name.startswith("gpt-"):
        return ChatOpenAI(temperature=temperature, model_name=model_name)
    elif model_name.startswith("llama-2-"):
        callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])
        return LlamaCpp(
            model_path=f"./models/{model_name}.bin",
            temperature=temperature,
            max_tokens=2048,
            top_p=1,
            n_ctx=2048,
            callback_manager=callback_manager,
            verbose=False,
        )
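# Assumption: the llama-2 branch expects a GGML weights file to already exist at
# ./models/<model_name>.bin (e.g. ./models/llama-2-7b-chat.ggmlv3.q2_K.bin),
# while the gpt-* branch relies on OPENAI_API_KEY being set in the environment
# (loaded via load_dotenv() in main()).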
def load_embeddings(model_name: str) -> Union[OpenAIEmbeddings, LlamaCppEmbeddings]:
    """
    Load embedding model.
    """
    if model_name.startswith("gpt-"):
        return OpenAIEmbeddings()
    elif model_name.startswith("llama-2-"):
        return LlamaCppEmbeddings(model_path=f"./models/{model_name}.bin")
def get_answer(llm, messages) -> tuple[str, float]:
    """
    Get the AI answer to the user's question, together with the API cost.
    """
    if isinstance(llm, ChatOpenAI):
        with get_openai_callback() as cb:
            answer = llm(messages)
        return answer.content, cb.total_cost
    # Local models (LlamaCpp or the ctransformers model loaded in main) take a
    # single Llama 2 formatted prompt string and incur no API cost.
    return llm(llama_v2_prompt(convert_langchainschema_to_dict(messages))), 0.0
def find_role(message: Union[SystemMessage, HumanMessage, AIMessage]) -> str:
    """
    Identify the role name from a langchain.schema message object.
    """
    if isinstance(message, SystemMessage):
        return "system"
    if isinstance(message, HumanMessage):
        return "user"
    if isinstance(message, AIMessage):
        return "assistant"
    raise TypeError("Unknown message type.")
def convert_langchainschema_to_dict(
        messages: List[Union[SystemMessage, HumanMessage, AIMessage]]) \
        -> List[dict]:
    """
    Convert a list of langchain.schema chat messages into a list of
    {"role": ..., "content": ...} dictionaries.
    """
    return [{"role": find_role(message),
             "content": message.content
             } for message in messages]
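# Example:
#   convert_langchainschema_to_dict(
#       [SystemMessage(content="Be brief."), HumanMessage(content="Hi")])
#   -> [{"role": "system", "content": "Be brief."},
#       {"role": "user", "content": "Hi"}]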
def llama_v2_prompt(messages: List[dict]) -> str:
    """
    Convert messages in list-of-dictionary format into a Llama 2 compliant
    prompt string.
    """
    B_INST, E_INST = "[INST]", "[/INST]"
    B_SYS, E_SYS = "<<SYS>>\n", "\n<</SYS>>\n\n"
    BOS, EOS = "<s>", "</s>"
    DEFAULT_SYSTEM_PROMPT = """You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe. Please ensure that your responses are socially unbiased and positive in nature. If a question does not make any sense, or is not factually coherent, explain why instead of answering something not correct. If you don't know the answer to a question, please don't share false information."""
    if messages[0]["role"] != "system":
        messages = [
            {
                "role": "system",
                "content": DEFAULT_SYSTEM_PROMPT,
            }
        ] + messages
    # Fold the system prompt into the first user message, as Llama 2 expects.
    messages = [
        {
            "role": messages[1]["role"],
            "content": B_SYS + messages[0]["content"] + E_SYS + messages[1]["content"],
        }
    ] + messages[2:]
    messages_list = [
        f"{BOS}{B_INST} {(prompt['content']).strip()} {E_INST} {(answer['content']).strip()} {EOS}"
        for prompt, answer in zip(messages[::2], messages[1::2])
    ]
    messages_list.append(
        f"{BOS}{B_INST} {(messages[-1]['content']).strip()} {E_INST}")
    return "".join(messages_list)
def extract_user_question_part_only(content):
    """
    Extract only the user question part from content that combines the user
    question with the PDF context.
    """
    content_split = content.split("[][][][]")
    if len(content_split) == 3:
        return content_split[1]
    return content
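# Example:
#   extract_user_question_part_only(
#       "Context:\n```\n...\n```\nQuestion: [][][][]What is X?[][][][]\nAnswer:")
#   -> "What is X?"
# Content without exactly two "[][][][]" markers is returned unchanged.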
def main() -> None:
    _ = load_dotenv(find_dotenv())
    init_page()
    model_name, temperature = select_llm()
    # llm = load_llm(model_name, temperature)
    url = "https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGML/blob/main/llama-2-7b-chat.ggmlv3.q2_K.bin"
    # url = "https://huggingface.co/TheBloke/Wizard-Vicuna-7B-Uncensored-GGML/raw/main/Wizard-Vicuna-7B-Uncensored.ggmlv3.q4_K_M.bin"
    try:
        model_loc, file_size = dl_hf_model(url)
    except Exception as exc_:
        logger.error(exc_)
        raise SystemExit(1) from exc_
    llm = AutoModelForCausalLM.from_pretrained(
        model_loc,
        model_type="llama",
        # threads=cpu_count,
    )
    logger.info(f"done loading llm {model_loc=} {file_size=}G")
    embeddings = load_embeddings(model_name)
    texts = get_pdf_text()
    qdrant = build_vector_store(texts, embeddings)
    init_messages()
    st.header("Personal ChatGPT")
    # Handle user input
    if user_input := st.chat_input("Input your question!"):
        if qdrant:
            context = [c.page_content for c in qdrant.similarity_search(
                user_input, k=10)]
            user_input_w_context = PromptTemplate(
                template=PROMPT_TEMPLATE,
                input_variables=["context", "question"]) \
                .format(
                    context=context, question=user_input)
        else:
            user_input_w_context = user_input
        st.session_state.messages.append(
            HumanMessage(content=user_input_w_context))
        with st.spinner("ChatGPT is typing ..."):
            answer, cost = get_answer(llm, st.session_state.messages)
        st.session_state.messages.append(AIMessage(content=answer))
        st.session_state.costs.append(cost)
    # Display chat history
    messages = st.session_state.get("messages", [])
    for message in messages:
        if isinstance(message, AIMessage):
            with st.chat_message("assistant"):
                st.markdown(message.content)
        elif isinstance(message, HumanMessage):
            with st.chat_message("user"):
                st.markdown(extract_user_question_part_only(message.content))
    costs = st.session_state.get("costs", [])
    st.sidebar.markdown("## Costs")
    st.sidebar.markdown(f"**Total cost: ${sum(costs):.5f}**")
    for cost in costs:
        st.sidebar.markdown(f"- ${cost:.5f}")

# streamlit run app.py
if __name__ == "__main__":
    main()