Spaces:
Running
Running
import json | |
import os | |
from pathlib import Path | |
import gradio as gr | |
import jsonlines | |
from agent.actions import RetrievalQA | |
from agent.llm import ChatAsOAI | |
from agent.memory import Memory | |
from utils import service, cache_file, max_ref_token | |
llm = ChatAsOAI(model="gpt-3.5-turbo") | |
mem = Memory(llm=llm, stream=False) | |
with open('css/main.css', 'r') as f: | |
css = f.read() | |
with open('js/main.js', 'r') as f: | |
js = f.read() | |
def add_text(history, text): | |
history = history + [(text, None)] | |
return history, gr.update(value='', interactive=False) | |
def rm_text(history): | |
if not history: | |
gr.Warning('No input content!') | |
elif not history[-1][1]: | |
return history, gr.update(value='', interactive=False) | |
else: | |
history = history[:-1] + [(history[-1][0], None)] | |
return history, gr.update(value='', interactive=False) | |
def add_file(history, file): | |
history = history + [((file.name,), None)] | |
return history | |
def initialize(request: gr.Request): | |
# print(request.kwargs) | |
access_token = request.query_params["access_token"] | |
url = request.query_params["url"] | |
is_valid = False | |
if access_token: | |
account_info = json.loads(service.get(access_token, "info.json", False)) | |
if account_info and account_info["enabled"]: | |
is_valid = True | |
if not is_valid: | |
gr.Info("The token is not valid, Please reset!") | |
return | |
return access_token, url | |
def bot(history, access_token, page_url): | |
if not history: | |
yield history | |
else: | |
now_page = None | |
_ref = '' | |
if not service.exists(access_token, page_url): | |
gr.Info("Please add this page to LLMBB's Reading List first!") | |
else: | |
now_page = json.loads(service.get(access_token, page_url)) | |
if not now_page: | |
gr.Info( | |
"This page has not yet been added to the LLMBB's reading list!" | |
) | |
elif not now_page['raw']: | |
gr.Info('Please reopen later, LLMBB is analyzing this page...') | |
else: | |
_ref_list = mem.get( | |
history[-1][0], [now_page], | |
max_token=max_ref_token) | |
if _ref_list: | |
_ref = '\n'.join( | |
json.dumps(x, ensure_ascii=False) for x in _ref_list) | |
else: | |
_ref = '' | |
# TODO: considering history for retrieval qa | |
agent = RetrievalQA(stream=True, llm=llm) | |
history[-1][1] = '' | |
response = agent.run(user_request=history[-1][0], ref_doc=_ref) | |
for chunk in response: | |
if chunk is not None: | |
history[-1][1] += chunk | |
yield history | |
# save history | |
if now_page: | |
now_page['session'] = history | |
service.upsert(access_token, page_url, json.dumps(now_page, ensure_ascii=False)) | |
def load_history_session(history, access_token, page_url): | |
now_page = None | |
if not service.exists(access_token, page_url): | |
gr.Info("Please add this page to LLMBB's Reading List first!") | |
return [] | |
now_page = json.loads(service.get(access_token, page_url)) | |
if not now_page: | |
gr.Info("Please add this page to LLMBB's Reading List first!") | |
return [] | |
if not now_page['raw']: | |
gr.Info('Please wait, LLMBB is analyzing this page...') | |
return [] | |
return now_page['session'] | |
def clear_session(access_token, page_url): | |
if not service.exists(access_token, page_url): | |
return None | |
now_page = json.loads(service.get(access_token, page_url)) | |
if not now_page: | |
return None | |
now_page['session'] = [] | |
service.upsert(access_token, page_url, json.dumps(now_page, ensure_ascii=False)) | |
return None | |
with gr.Blocks(css=css, theme='soft') as demo: | |
access_token = gr.State("") | |
page_url = gr.State("") | |
chatbot = gr.Chatbot([], elem_id='chatbot', height=480, avatar_images=(None, 'img/logo.png')) | |
with gr.Row(): | |
with gr.Column(scale=7): | |
txt = gr.Textbox(show_label=False, | |
placeholder='Chat with LLMBB...', | |
container=False) | |
# with gr.Column(scale=0.06, min_width=0): | |
# smt_bt = gr.Button('β') | |
with gr.Column(scale=1, min_width=0): | |
clr_bt = gr.Button('π§Ή', elem_classes='bt_small_font') | |
with gr.Column(scale=1, min_width=0): | |
stop_bt = gr.Button('π«', elem_classes='bt_small_font') | |
with gr.Column(scale=1, min_width=0): | |
re_bt = gr.Button('π', elem_classes='bt_small_font') | |
txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], | |
queue=False).then(bot, [chatbot, access_token, page_url], chatbot) | |
txt_msg.then(lambda: gr.update(interactive=True), None, [txt], queue=False) | |
# txt_msg_bt = smt_bt.click(add_text, [chatbot, txt], [chatbot, txt], | |
# queue=False).then(bot, chatbot, chatbot) | |
# txt_msg_bt.then(lambda: gr.update(interactive=True), | |
# None, [txt], | |
# queue=False) | |
clr_bt.click(clear_session, [access_token, page_url], chatbot, queue=False) | |
re_txt_msg = re_bt.click(rm_text, [chatbot], [chatbot, txt], | |
queue=False).then(bot, [chatbot, access_token, page_url], chatbot) | |
re_txt_msg.then(lambda: gr.update(interactive=True), | |
None, [txt], | |
queue=False) | |
stop_bt.click(None, None, None, cancels=[txt_msg, re_txt_msg], queue=False) | |
demo.load(initialize, [], [access_token, page_url]).then(load_history_session, [chatbot, access_token, page_url], chatbot) | |
demo.queue() | |
# demo.queue().launch(server_name=server_config.server.server_host, server_port=server_config.server.app_in_browser_port) | |