# Page-header residue from the web view this file was copied from ("Spaces: Running"); not part of the program.
import base64
import os
import sqlite3
import uuid
from contextlib import closing
from io import BytesIO

import chromadb
import nest_asyncio
import pandas as pd
import streamlit as st
import yaml
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import (
    Document,
    Settings,
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
)
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools import QueryEngineTool
from llama_index.embeddings.huggingface.base import HuggingFaceEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.chroma.base import ChromaVectorStore
from streamlit_feedback import streamlit_feedback

from prompt_engineering import (
    introduction_line,
    system_content,
    textbook_content,
    winnie_the_pooh_prompt,
)
from qna_prompting import evaluate_qna_answer_tool, get_qna_question_tool
from vision_api import get_transcribed_text

nest_asyncio.apply()
# App title
st.set_page_config(page_title="π»π Study Bear π―")

# Prefer a key supplied through the environment; the sidebar code further
# down falls back to Streamlit secrets or manual entry when this is empty.
openai_api = os.getenv("OPENAI_API_KEY")

# Input data, embedding, vector-store and questionnaire settings are read
# from a YAML file.
with open("./config/model_config_advanced.yml", "r", encoding="utf-8") as file_reader:
    model_config = yaml.safe_load(file_reader)

input_files = model_config["input_data"]["source"]
embedding_model = model_config["embeddings"]["embedding_base_model"]
fine_tuned_path = model_config["embeddings"]["fine_tuned_embedding_model"]
persisted_vector_db = model_config["vector_store"]["persisted_path"]
questionaire_db_path = model_config["questionaire_data"]["db_path"]

# Per-chapter completion figures shown as progress bars by st.data_editor.
# NOTE(review): the values are hard-coded - presumably placeholder data; confirm.
data_df = pd.DataFrame(
    {"Completion": [30, 40, 100, 10]},
    index=["Chapter 1", "Chapter 2", "Chapter 3", "Chapter 4"],
)

# Chat avatars: the bear is used for the assistant, the piglet for the user.
bear_img_path = "./resource/disney-cuties-little-winnie-the-pooh-emoticon.png"
piglet_img_path = "./resource/disney-cuties-piglet-emoticon.png"
# Sidebar: branding, persona toggle, OpenAI credentials, model parameters and
# the chapter-progress table.
# NOTE(review): the emoji inside the string literals look mis-encoded in this
# copy of the file; they are reproduced unchanged - confirm against the
# original source.
with st.sidebar:
    st.title("π―π Study Bear π»π")
    st.write("Just like Pooh needs honey, success requires hard work β no shortcuts allowed!")

    # Optionally append the Winnie-the-Pooh persona to the base system prompt.
    wtp_mode = st.toggle('Winnie-the-Pooh mode', value=False)
    if wtp_mode:
        system_content = system_content + winnie_the_pooh_prompt
    # NOTE(review): assumed to run regardless of the toggle so the agent
    # prompt always starts with the system prompt - the original nesting was
    # lost in this copy of the file; confirm.
    textbook_content = system_content + textbook_content

    # Resolve the API key: environment first, then Streamlit secrets, then
    # manual entry in the sidebar.
    if not openai_api:
        if "OPENAI_API_KEY" in st.secrets:
            st.success("API key already provided!", icon="β ")
            openai_api = st.secrets["OPENAI_API_KEY"]
        else:
            openai_api = st.text_input("Enter OpenAI API token:", type="password")
            # NOTE(review): the exact-length check assumes 51-character keys;
            # confirm this still matches the key formats in use.
            if not (openai_api.startswith("sk-") and len(openai_api) == 51):
                st.warning("Please enter your credentials!", icon="β οΈ")
            else:
                st.success("Proceed to entering your prompt message!", icon="π")

    # Export the resolved key through the environment; get_llm_object builds
    # the OpenAI client without passing a key explicitly, so it presumably
    # reads it from here.
    os.environ["OPENAI_API_KEY"] = openai_api

    st.subheader("Models and parameters")
    selected_model = st.sidebar.selectbox(
        label="Choose an OpenAI model",
        options=["gpt-3.5-turbo-0125", "gpt-4-0125-preview"],
        index=1,
        key="selected_model",
    )
    temperature = st.sidebar.slider(
        "temperature", min_value=0.0, max_value=2.0, value=0.0, step=0.01
    )

    # Chapter completion rendered as progress bars.
    st.data_editor(
        data_df,
        column_config={
            "Completion": st.column_config.ProgressColumn(
                "Completion %",
                help="Percentage of content covered",
                format="%.1f%%",
                min_value=0,
                max_value=100,
            ),
        },
        hide_index=False,
    )
    st.markdown("π Reach out to SakiMilo to learn how to create this app!")
# Seed st.session_state with defaults; keys that already exist are left
# untouched.
if "init" not in st.session_state:
    st.session_state.init = {"warm_started": "No"}
    st.session_state.feedback = False

if "image_prompt" not in st.session_state:
    st.session_state.image_prompt = False

# Store LLM generated responses
if "messages" not in st.session_state:
    st.session_state.messages = [
        {"role": "assistant", "content": introduction_line, "type": "text"}
    ]

if "feedback_key" not in st.session_state:
    st.session_state.feedback_key = 0

# Kept as the strings "true"/"false" (not booleans); other code compares
# against these exact values.
if "release_file" not in st.session_state:
    st.session_state.release_file = "false"

if "question_id" not in st.session_state:
    st.session_state.question_id = None

if "qna_answer" not in st.session_state:
    st.session_state.qna_answer = None

if "reasons" not in st.session_state:
    st.session_state.reasons = None

# Random identifier for this session; passed to clear_question_history.
if "user_id" not in st.session_state:
    st.session_state.user_id = str(uuid.uuid4())
def clear_chat_history():
    """Reset the conversation to the introduction message.

    Replaces ``st.session_state.messages`` with the single introduction
    message, builds an agent with ``get_query_engine`` from the module-level
    model settings, calls ``reset()`` on it, and shows a toast.
    """
    st.session_state.messages = [
        {"role": "assistant", "content": introduction_line, "type": "text"}
    ]
    # The agent built here is not kept after the call. Building it still
    # runs the index load/creation inside get_query_engine, which is what
    # the warm start relies on.
    chat_engine = get_query_engine(
        input_files=input_files,
        llm_model=selected_model,
        temperature=temperature,
        embedding_model=embedding_model,
        fine_tuned_path=fine_tuned_path,
        system_content=system_content,
        persisted_vector_db=persisted_vector_db,
    )
    chat_engine.reset()
    st.toast("yumyum, what was I saying again? π»π¬", icon="π―")
def clear_question_history(user_id):
    """Delete every row in ``answer_tbl`` that belongs to *user_id*.

    Opens the SQLite database at ``questionaire_db_path``, removes the rows,
    commits, closes the connection, and shows a toast.

    Args:
        user_id: Identifier whose rows are removed.
    """
    # Bind user_id as a parameter instead of formatting it into the SQL text,
    # so its content can never be interpreted as SQL.
    sql_string = "DELETE FROM answer_tbl WHERE user_id = ?"
    # closing() guarantees the connection is released even if the statement
    # raises.
    with closing(sqlite3.connect(questionaire_db_path)) as con:
        con.execute(sql_string, (user_id,))
        con.commit()
    st.toast("the tale of one thousand and one questions, reset! π§¨π§¨", icon="π")
# Sidebar actions: reset the chat, reset this session's question history, or
# request the feedback widget.
st.sidebar.button("Clear Chat History", on_click=clear_chat_history)
st.sidebar.button(
    "Clear Question History",
    on_click=clear_question_history,
    kwargs={"user_id": st.session_state.user_id},
)
if st.sidebar.button("I want to submit a feedback!"):
    st.session_state.feedback = True
    st.session_state.feedback_key += 1  # overwrite feedback component
def get_document_object(input_files):
    """Load *input_files* and merge their text into a single ``Document``.

    Args:
        input_files: Passed to ``SimpleDirectoryReader(input_files=...)``.

    Returns:
        One ``Document`` whose text is the loaded documents' text joined by
        blank lines.
    """
    documents = SimpleDirectoryReader(input_files=input_files).load_data()
    merged_text = "\n\n".join(doc.text for doc in documents)
    return Document(text=merged_text)
def get_llm_object(selected_model, temperature):
    """Build the OpenAI LLM wrapper for the chosen model and temperature.

    Args:
        selected_model: Model name passed as ``model``.
        temperature: Sampling temperature passed as ``temperature``.

    Returns:
        The configured ``OpenAI`` instance.
    """
    return OpenAI(model=selected_model, temperature=temperature)
def get_embedding_model(model_name, fine_tuned_path=None):
    """Select the embedding model to use.

    Args:
        model_name: Name given to ``HuggingFaceEmbedding`` when no
            fine-tuned path is supplied.
        fine_tuned_path: Optional location of a fine-tuned model.

    Returns:
        A ``HuggingFaceEmbedding`` instance when *fine_tuned_path* is
        ``None``; otherwise *fine_tuned_path* itself, unchanged.
    """
    if fine_tuned_path is None:
        print(f"loading from `{model_name}` from huggingface")
        return HuggingFaceEmbedding(model_name=model_name)

    print(f"loading from local `{fine_tuned_path}`")
    # The path is returned as-is, not as a model object; get_query_engine
    # assigns it to Settings.embed_model.
    # NOTE(review): presumably LlamaIndex resolves the string itself (it may
    # need a "local:" prefix) - verify against the configured value.
    return fine_tuned_path
def _open_chroma_storage(persisted_vector_db):
    """Open the ``"quickstart"`` Chroma collection stored at *persisted_vector_db*.

    Returns a ``(vector_store, storage_context)`` pair backed by that
    collection; the collection is created if it does not exist.
    """
    db = chromadb.PersistentClient(path=persisted_vector_db)
    chroma_collection = db.get_or_create_collection("quickstart")
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    return vector_store, storage_context


def get_query_engine(input_files, llm_model, temperature,
                     embedding_model, fine_tuned_path,
                     system_content, persisted_vector_db):
    """Build the OpenAI agent that answers from the textbook index.

    Configures the global LlamaIndex ``Settings`` (LLM, chunk size, embedding
    model), loads the vector index from *persisted_vector_db* when that path
    exists or builds it from *input_files* otherwise, wraps the index in a
    query-engine tool, and returns an agent holding that tool plus the two
    Q&A tools.

    Args:
        input_files: Files to ingest when no persisted index exists.
        llm_model: OpenAI model name.
        temperature: Sampling temperature for the LLM.
        embedding_model: Base embedding model name.
        fine_tuned_path: Optional fine-tuned embedding model location.
        system_content: System prompt given to the query engine.
        persisted_vector_db: Directory of the persistent Chroma database.

    Returns:
        The ``OpenAIAgent``. Its system prompt is the module-level
        ``textbook_content``, not the *system_content* argument.
    """
    llm = get_llm_object(llm_model, temperature)
    embedded_model = get_embedding_model(
        model_name=embedding_model,
        fine_tuned_path=fine_tuned_path
    )
    Settings.llm = llm
    Settings.chunk_size = 1024
    Settings.embed_model = embedded_model

    # The existence check must run before the Chroma client is opened, and
    # the documents are loaded before it too, matching the original order.
    if os.path.exists(persisted_vector_db):
        print("loading from vector database - chroma")
        vector_store, storage_context = _open_chroma_storage(persisted_vector_db)
        index = VectorStoreIndex.from_vector_store(
            vector_store=vector_store,
            storage_context=storage_context
        )
    else:
        print("create new chroma vector database..")
        documents = SimpleDirectoryReader(input_files=input_files).load_data()
        vector_store, storage_context = _open_chroma_storage(persisted_vector_db)
        nodes = Settings.node_parser.get_nodes_from_documents(documents)
        storage_context.docstore.add_documents(nodes)
        index = VectorStoreIndex(nodes, storage_context=storage_context)

    # NOTE(review): the memory buffer is handed to the query engine, not to
    # the agent - confirm that is the intended owner.
    memory = ChatMemoryBuffer.from_defaults(token_limit=100_000)
    hi_content_engine = index.as_query_engine(
        memory=memory,
        system_prompt=system_content,
        similarity_top_k=10,
        verbose=True,
        streaming=True
    )

    hi_textbook_query_description = """
    Use this tool to extract content from the query engine,
    which is built by ingesting textbook content from `Health Insurance 7th Edition`,
    that has 15 chapters in total. When user wants to learn more about a
    particular chapter, this tool will help to assist user to get better
    understanding of the content of the textbook.
    """
    hi_query_tool = QueryEngineTool.from_defaults(
        query_engine=hi_content_engine,
        name="health_insurance_textbook_query_engine",
        description=hi_textbook_query_description
    )

    agent = OpenAIAgent.from_tools(
        tools=[
            hi_query_tool,
            get_qna_question_tool,
            evaluate_qna_answer_tool
        ],
        max_function_calls=1,
        llm=llm,
        verbose=True,
        system_prompt=textbook_content
    )
    print("loaded AI agent, let's begin the chat!")
    print("="*50)
    print("")
    return agent
def generate_llm_response(prompt_input, tool_choice="auto"):
    """Stream the agent's reply to *prompt_input*.

    A new agent is built with ``get_query_engine`` on every call, using the
    module-level model settings.

    Args:
        prompt_input: Message sent to the agent.
        tool_choice: Forwarded to ``stream_chat``; ``"auto"`` by default, or
            a tool name.

    Returns:
        The object returned by the agent's ``stream_chat``.
    """
    chat_agent = get_query_engine(
        input_files=input_files,
        llm_model=selected_model,
        temperature=temperature,
        embedding_model=embedding_model,
        fine_tuned_path=fine_tuned_path,
        system_content=system_content,
        persisted_vector_db=persisted_vector_db,
    )
    return chat_agent.stream_chat(prompt_input, tool_choice=tool_choice)
def handle_feedback(user_response):
    """Acknowledge submitted feedback and hide the feedback widget.

    Args:
        user_response: Payload passed by the feedback component; not used.
    """
    st.toast("βοΈ Feedback received!")
    st.session_state.feedback = False
def handle_image_upload():
    """Mark that the uploader changed so the image is processed on this run."""
    # Stored as the string "true" (not a boolean); the upload handling
    # compares against this exact value.
    st.session_state.release_file = "true"
# Warm start: run the chat reset once per session, which also builds the
# agent (and therefore the index) before the first prompt.
if st.session_state.init["warm_started"] == "No":
    clear_chat_history()
    st.session_state.init["warm_started"] = "Yes"

# Image upload option
# NOTE(review): the upload handling is assumed to sit inside the sidebar
# block - the original nesting was lost in this copy of the file; confirm.
with st.sidebar:
    image_file = st.file_uploader(
        "Upload your image here...",
        type=["png", "jpeg", "jpg"],
        on_change=handle_image_upload,
    )
    if st.session_state.release_file == "true" and image_file:
        with st.spinner("Uploading..."):
            b64string = base64.b64encode(image_file.read()).decode('utf-8')
            # The image itself is recorded as a user message...
            message = {
                "role": "user",
                "content": b64string,
                "type": "image"}
            st.session_state.messages.append(message)
            # ...and its transcription as a hidden "admin" message, which is
            # later picked up as the prompt.
            transcribed_msg = get_transcribed_text(b64string)
            message = {
                "role": "admin",
                "content": transcribed_msg,
                "type": "text"}
            st.session_state.messages.append(message)
            # Clear the flag so the same upload is not processed again.
            st.session_state.release_file = "false"
# Display or clear chat messages
# "admin" messages (transcribed image text) are skipped and never rendered.
avatar_by_role = {"user": piglet_img_path, "assistant": bear_img_path}
for message in st.session_state.messages:
    if message["role"] == "admin":
        continue
    # A lookup instead of an if/elif chain: an unexpected role gets no avatar
    # (None) rather than an unbound or stale `avatar` value.
    avatar = avatar_by_role.get(message["role"])
    with st.chat_message(message["role"], avatar=avatar):
        if message["type"] == "text":
            st.write(message["content"])
        elif message["type"] == "image":
            # Image messages hold base64 text; decode back to bytes to show.
            img_io = BytesIO(base64.b64decode(message["content"].encode("utf-8")))
            st.image(img_io)
def _to_streamlit_markdown(text):
    """Prepare raw model text for ``st.markdown``.

    Newlines become Markdown hard breaks (two trailing spaces), literal ``$``
    is escaped, and a LaTeX ``\\[`` opener is turned into ``$$``.
    """
    return (
        text.replace("\n", "  \n")
        .replace("$", "\\$")
        .replace("\\[", "$$")
    )


# User-provided prompt
if prompt := st.chat_input(disabled=not openai_api):
    st.session_state.messages.append(
        {"role": "user", "content": prompt, "type": "text"}
    )
    with st.chat_message("user", avatar=piglet_img_path):
        st.write(prompt)

# Retrieve text prompt from image submission
if prompt is None and st.session_state.messages[-1]["role"] == "admin":
    st.session_state.image_prompt = True
    prompt = st.session_state.messages[-1]["content"]

# Generate a new response if last message is not from assistant.
# The `prompt is not None` guard keeps a rerun without any new input (for
# example after an earlier failed turn) from sending None to the agent.
if prompt is not None and st.session_state.messages[-1]["role"] != "assistant":
    with st.chat_message("assistant", avatar=bear_img_path):
        with st.spinner("π§Έπ€ Thinking... π»π"):
            if st.session_state.image_prompt:
                # Prompts that came from an image are sent with the textbook
                # query tool named as the tool choice.
                response = generate_llm_response(
                    prompt,
                    tool_choice="health_insurance_textbook_query_engine"
                )
                st.session_state.image_prompt = False
            else:
                response = generate_llm_response(prompt, tool_choice="auto")

            placeholder = st.empty()
            raw_response = ""
            full_response = ""
            for token in response.response_gen:
                raw_response += token
                # Format the accumulated text rather than each token, so a
                # sequence split across two tokens is still converted.
                full_response = _to_streamlit_markdown(raw_response)
                placeholder.markdown(full_response)
            placeholder.markdown(full_response)

    message = {"role": "assistant",
               "content": full_response,
               "type": "text"}
    st.session_state.messages.append(message)
# Trigger feedback
# Shown only while the feedback flag is set; handle_feedback clears the flag
# on submit.
if st.session_state.feedback:
    result = streamlit_feedback(
        feedback_type="thumbs",
        optional_text_label="[Optional] Please provide an explanation",
        on_submit=handle_feedback,
        # feedback_key is incremented on each request, giving the component a
        # new key every time.
        key=f"feedback_{st.session_state.feedback_key}"
    )