import gradio as gr
import random
import time

from langchain import PromptTemplate
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import HuggingFaceEmbeddings, HuggingFaceInstructEmbeddings, OpenAIEmbeddings
from langchain.vectorstores import Pinecone
from langchain.chains import LLMChain
from langchain.chains.question_answering import load_qa_chain

import pinecone
import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"

#OPENAI_API_KEY = ""
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_TEMP = 1
OPENAI_API_LINK = "[OpenAI API Key](https://platform.openai.com/account/api-keys)"
OPENAI_LINK = "[OpenAI](https://openai.com)"

PINECONE_KEY = os.environ.get("PINECONE_KEY", "")
PINECONE_ENV = os.environ.get("PINECONE_ENV", "asia-northeast1-gcp")
PINECONE_INDEX = os.environ.get("PINECONE_INDEX", '3gpp-r16')
PINECONE_LINK = "[Pinecone](https://www.pinecone.io)"

LANGCHAIN_LINK = "[LangChain](https://python.langchain.com/en/latest/index.html)"

EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "hkunlp/instructor-large")
EMBEDDING_LOADER = os.environ.get("EMBEDDING_LOADER", "HuggingFaceInstructEmbeddings")
EMBEDDING_LIST = ["HuggingFaceInstructEmbeddings", "HuggingFaceEmbeddings", "OpenAIEmbeddings"]

# Return top-k text chunks from the vector store
TOP_K_DEFAULT = 15
TOP_K_MAX = 30
SCORE_DEFAULT = 0.33

BUTTON_MIN_WIDTH = 215

# shields.io badge specs in "<label>-<message>-<color>" form
LLM_NULL = "LLM-UNLOAD-critical"
LLM_DONE = "LLM-LOADED-9cf"
DB_NULL = "DB-UNLOAD-critical"
DB_DONE = "DB-LOADED-9cf"
FORK_BADGE = "Fork-HuggingFace Space-9cf"


def get_logo(inputs, logo) -> str:
    return f"""https://img.shields.io/badge/{inputs}?style=flat&logo={logo}&logoColor=white"""


def get_status(inputs, logo, pos) -> str:
    # Render the badge as inline HTML for gr.HTML(); the exact attributes are
    # an assumption (align used to float the badge to `pos`).
    return f"""<img src="{get_logo(inputs, logo)}" alt="{inputs}" align="{pos}">"""


KEY_INIT = "Initialize Model"
KEY_SUBMIT = "Submit"
KEY_CLEAR = "Clear"

MODEL_NULL = get_status(LLM_NULL, "openai", "right")
MODEL_DONE = get_status(LLM_DONE, "openai", "right")
DOCS_NULL = get_status(DB_NULL, "processingfoundation", "right")
DOCS_DONE = get_status(DB_DONE, "processingfoundation", "right")

TAB_1 = "Chatbot"
TAB_2 = "Details"
TAB_3 = "Database"
TAB_4 = "TODO"

FAVICON = './icon.svg'

LLM_LIST = ["gpt-3.5-turbo", "text-davinci-003"]

DOC_1 = '3GPP'
DOC_2 = 'HTTP2'
DOC_SUPPORTED = [DOC_1]
DOC_DEFAULT = [DOC_1]
DOC_LABEL = "Reference Docs"

MODEL_WARNING = f"Please paste your **{OPENAI_API_LINK}** and then **{KEY_INIT}**"

DOCS_WARNING = f"""Database Unloaded
Please check your **{TAB_3}** config and then **{KEY_INIT}**
Or you could uncheck **{DOC_LABEL}** to ask the LLM directly"""

webui_title = """
# OpenAI Chatbot Based on Vector Database
"""

# "Fork this Space" badge (image only; no link target given)
dup_link = f'''<img src="{get_logo(FORK_BADGE, "huggingface")}" alt="Duplicate Space">'''

init_message = f"""This demonstration website is based on \
**{OPENAI_LINK}** with **{LANGCHAIN_LINK}** and **{PINECONE_LINK}**

1. Insert your **{OPENAI_API_LINK}** and click `{KEY_INIT}`
2. Insert your **Question** and click `{KEY_SUBMIT}`
"""
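# Illustrative example of the badge URL the helpers above produce:
#   get_logo(LLM_DONE, "openai")
#   -> "https://img.shields.io/badge/LLM-LOADED-9cf?style=flat&logo=openai&logoColor=white"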
PROMPT_DOC = PromptTemplate(
    input_variables=["context", "chat_history", "question"],
    template="""Context:
##
{context}
##

Chat History:
##
{chat_history}
##

Question: {question}
Answer:"""
)

PROMPT_BASE = PromptTemplate(
    input_variables=['question', "chat_history"],
    template="""Chat History:
##
{chat_history}
##

Question:
##
{question}
##

Answer:"""
)

#----------------------------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------------------------


def init_rwkv():
    # Optional local LLM backend; only probe for availability here.
    try:
        import rwkv
        return True
    except Exception:
        print("RWKV not found, skip local llm")
        return False


def init_model(api_key, emb_name, emb_loader, db_api_key, db_env, db_index):
    init_rwkv()
    try:
        if not (api_key and api_key.startswith("sk-") and len(api_key) > 50):
            return None, MODEL_NULL + DOCS_NULL, None, None, None, None

        llm_dict = {}
        for llm_name in LLM_LIST:
            if llm_name == "gpt-3.5-turbo":
                llm_dict[llm_name] = ChatOpenAI(model_name=llm_name,
                                                temperature=OPENAI_TEMP,
                                                openai_api_key=api_key)
            else:
                llm_dict[llm_name] = OpenAI(model_name=llm_name,
                                            temperature=OPENAI_TEMP,
                                            openai_api_key=api_key)

        if not (emb_name and db_api_key and db_env and db_index):
            return api_key, MODEL_DONE + DOCS_NULL, llm_dict, None, None, None

        if emb_loader == "OpenAIEmbeddings":
            embeddings = eval(emb_loader)(openai_api_key=api_key)
        else:
            embeddings = eval(emb_loader)(model_name=emb_name)

        pinecone.init(api_key=db_api_key, environment=db_env)
        db = Pinecone.from_existing_index(index_name=db_index,
                                          embedding=embeddings)

        return api_key, MODEL_DONE + DOCS_DONE, llm_dict, None, db, None

    except Exception as e:
        print(e)
        return None, MODEL_NULL + DOCS_NULL, None, None, None, None


def get_chat_history(inputs) -> str:
    res = []
    for human, ai in inputs:
        res.append(f"Q: {human}\nA: {ai}")
    return "\n".join(res)


def remove_duplicates(documents, score_min):
    seen_content = set()
    unique_documents = []
    for (doc, score) in documents:
        if (doc.page_content not in seen_content) and (score >= score_min):
            seen_content.add(doc.page_content)
            unique_documents.append(doc)
    return unique_documents


def doc_similarity(query, db, top_k, score):
    docs = db.similarity_search_with_score(query=query, k=top_k)
    #docsearch = db.as_retriever(search_kwargs={'k': top_k})
    #docs = docsearch.get_relevant_documents(query)
    udocs = remove_duplicates(docs, score)
    return udocs


def user(user_message, history):
    return "", history + [[user_message, None]]


def bot(box_message, ref_message,
        llm_dropdown, llm_dict, doc_list, db, top_k, score):
    # bot_message = random.choice(["Yes", "No"])
    # 0 is user question, 1 is bot response
    question = box_message[-1][0]
    history = box_message[:-1]

    if (not llm_dict):
        box_message[-1][1] = MODEL_WARNING
        return box_message, "", ""

    if not ref_message:
        ref_message = question
        details = f"Q: {question}"
    else:
        details = f"Q: {question}\nR: {ref_message}"

    llm = llm_dict[llm_dropdown]

    if DOC_1 in doc_list:
        if (not db):
            box_message[-1][1] = DOCS_WARNING
            return box_message, "", ""
        docs = doc_similarity(ref_message, db, top_k, score)
        delta_top_k = top_k - len(docs)
        if delta_top_k > 0:
            # Dedup/score filtering dropped some chunks; widen the search to refill
            docs = doc_similarity(ref_message, db, top_k + delta_top_k, score)
        prompt = PROMPT_DOC
        #chain = load_qa_chain(llm, chain_type="stuff")
    else:
        prompt = PROMPT_BASE
        docs = []

    chain = LLMChain(llm=llm,
                     prompt=prompt,
                     output_key='output_text')
    all_output = chain({"question": question,
                        "context": docs,
                        "chat_history": get_chat_history(history)})
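    # LLMChain returns a dict of the inputs plus the generated text stored
    # under `output_key`, so the answer lives in all_output['output_text'].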
    bot_message = all_output['output_text']

    # List each retrieved chunk with its source for the Details tab
    source = "\n".join([f"""[{i + 1}] {doc.metadata["source"]}
{doc.page_content}
""" for i, doc in enumerate(docs)])
    #print(source)

    box_message[-1][1] = bot_message
    return box_message, "", [[details, bot_message + '\n\nMetadata:\n' + source]]

#----------------------------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------------------------


with gr.Blocks(title=TAB_1,
               theme="Base",
               css=""".bigbox {
                   min-height: 250px;
               }""") as demo:

    llm = gr.State()
    chain_2 = gr.State()  # not in use
    vector_db = gr.State()

    gr.Markdown(webui_title)
    gr.Markdown(dup_link)
    gr.Markdown(init_message)

    with gr.Row():
        with gr.Column(scale=10):
            llm_api_textbox = gr.Textbox(
                label="OpenAI API Key",
                # show_label = False,
                value=OPENAI_API_KEY,
                placeholder="Paste Your OpenAI API Key (sk-...) and Hit ENTER",
                lines=1,
                type='password')
        with gr.Column(scale=1, min_width=BUTTON_MIN_WIDTH):
            init = gr.Button(KEY_INIT)  #.style(full_width=False)

    model_statusbox = gr.HTML(MODEL_NULL + DOCS_NULL)

    with gr.Tab(TAB_1):
        with gr.Row():
            with gr.Column(scale=10):
                chatbot = gr.Chatbot(elem_classes="bigbox")
            #with gr.Column(scale=1):
            with gr.Column(scale=1, min_width=BUTTON_MIN_WIDTH):
                doc_check = gr.CheckboxGroup(choices=DOC_SUPPORTED,
                                             value=DOC_DEFAULT,
                                             label=DOC_LABEL,
                                             interactive=True)
                llm_dropdown = gr.Dropdown(LLM_LIST,
                                           value=LLM_LIST[0],
                                           multiselect=False,
                                           interactive=True,
                                           label="LLM Selection")
        with gr.Row():
            with gr.Column(scale=10):
                query = gr.Textbox(label="Question:",
                                   lines=2)
                ref = gr.Textbox(label="Reference (optional):")
            with gr.Column(scale=1, min_width=BUTTON_MIN_WIDTH):
                clear = gr.Button(KEY_CLEAR)
                submit = gr.Button(KEY_SUBMIT, variant="primary")

    with gr.Tab(TAB_2):
        with gr.Row():
            with gr.Column():
                top_k = gr.Slider(1,
                                  TOP_K_MAX,
                                  value=TOP_K_DEFAULT,
                                  step=1,
                                  label="Vector similarity top_k",
                                  interactive=True)
            with gr.Column():
                score = gr.Slider(0.01,
                                  0.99,
                                  value=SCORE_DEFAULT,
                                  step=0.01,
                                  label="Vector similarity score",
                                  interactive=True)
        detail_panel = gr.Chatbot(label="Related Docs")

    with gr.Tab(TAB_3):
        with gr.Row():
            with gr.Column():
                emb_textbox = gr.Textbox(
                    label="Embedding Model",
                    # show_label = False,
                    value=EMBEDDING_MODEL,
                    placeholder="Paste Your Embedding Model Repo on HuggingFace",
                    lines=1,
                    interactive=True,
                    type='email')
            with gr.Column():
                emb_dropdown = gr.Dropdown(
                    EMBEDDING_LIST,
                    value=EMBEDDING_LOADER,
                    multiselect=False,
                    interactive=True,
                    label="Embedding Loader")

        with gr.Accordion("Pinecone Database for " + DOC_1):
            with gr.Row():
                db_api_textbox = gr.Textbox(
                    label="Pinecone API Key",
                    # show_label = False,
                    value=PINECONE_KEY,
                    placeholder="Paste Your Pinecone API Key (xx-xx-xx-xx-xx) and Hit ENTER",
                    lines=1,
                    interactive=True,
                    type='password')
            with gr.Row():
                db_env_textbox = gr.Textbox(
                    label="Pinecone Environment",
                    # show_label = False,
                    value=PINECONE_ENV,
                    placeholder="Paste Your Pinecone Environment (xx-xx-xx) and Hit ENTER",
                    lines=1,
                    interactive=True,
                    type='email')
                db_index_textbox = gr.Textbox(
                    label="Pinecone Index",
                    # show_label = False,
                    value=PINECONE_INDEX,
                    placeholder="Paste Your Pinecone Index (xxxx) and Hit ENTER",
                    lines=1,
                    interactive=True,
                    type='email')

    with gr.Tab(TAB_4):
        gr.Markdown("TODO")  # a bare "TODO" string expression renders nothing

    init_input = [llm_api_textbox, emb_textbox, emb_dropdown,
                  db_api_textbox, db_env_textbox, db_index_textbox]
    init_output = [llm_api_textbox, model_statusbox,
                   llm, chain_2, vector_db, chatbot]

    llm_api_textbox.submit(init_model, init_input, init_output)
    init.click(init_model, init_input, init_output)
    submit.click(user,
                 [query, chatbot],
                 [query, chatbot],
                 queue=False).then(
        bot,
        [chatbot, ref, llm_dropdown, llm, doc_check, vector_db, top_k, score],
        [chatbot, ref, detail_panel]
    )

    clear.click(lambda: (None, None, None),
                None,
                [query, ref, chatbot],
                queue=False)

#----------------------------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------------------------

if __name__ == "__main__":
    demo.launch(share=False,
                inbrowser=True,
                favicon_path=FAVICON)
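# Assumed environment (versions are not pinned in the source): gradio, openai,
# pinecone-client 2.x, and a pre-0.1 langchain release exposing
# langchain.llms / langchain.chat_models / langchain.vectorstores.Pinecone.
# HuggingFaceInstructEmbeddings additionally requires the InstructorEmbedding
# and sentence-transformers packages; rwkv is optional.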