|
import gradio as gr |
|
from response_parser import * |
|
|
|
|
|
def initialization(state_dict: Dict) -> None:
    """Prepare per-session state when the page is loaded.

    Makes sure the local ``cache`` directory exists and, when the session has
    no backend yet, creates a ``BotBackend`` and then removes
    ``OPENAI_API_KEY`` from the process environment.

    Args:
        state_dict: Session state; its ``"bot_backend"`` entry is filled in
            place when it is ``None``.
    """
    # Create-and-ignore instead of exists()-then-mkdir(): this callback runs
    # on every page load, so two sessions can reach this point concurrently.
    try:
        os.mkdir('cache')
    except FileExistsError:
        pass

    if state_dict["bot_backend"] is None:
        state_dict["bot_backend"] = BotBackend()
        # pop() with a default cannot fail if the key is already gone.
        os.environ.pop('OPENAI_API_KEY', None)
|
|
|
|
|
def get_bot_backend(state_dict: Dict) -> BotBackend:
    """Return the backend stored under ``"bot_backend"`` in the session state."""
    backend = state_dict["bot_backend"]
    return backend
|
|
|
|
|
def switch_to_gpt4(state_dict: Dict, whether_switch: bool) -> None:
    """Set the session's model choice from the "Use GPT-4" checkbox value.

    Args:
        state_dict: Session state holding the bot backend.
        whether_switch: Truthy selects ``"GPT-4"``; falsy selects ``"GPT-3.5"``.
    """
    model_choice = "GPT-4" if whether_switch else "GPT-3.5"
    get_bot_backend(state_dict).update_gpt_model_choice(model_choice)
|
|
|
|
|
def add_text(state_dict: Dict, history: List, text: str) -> Tuple[List, Dict]:
    """Register a submitted user message and show it in the chat.

    Args:
        state_dict: Session state holding the bot backend.
        history: Current chat history.
        text: The text the user submitted.

    Returns:
        A new history list ending with ``(text, None)``, and a Gradio update
        that empties the text box and makes it non-interactive.
    """
    get_bot_backend(state_dict).add_text_message(user_text=text)

    new_row = (text, None)
    textbox_update = gr.update(value="", interactive=False)
    return history + [new_row], textbox_update
|
|
|
|
|
def add_file(state_dict: Dict, history: List, files) -> List:
    """Add uploaded files to the chat and register them with the backend.

    For every uploaded file a ``[text, None]`` row is appended to ``history``
    and the same list object is handed to the backend via
    ``add_file_message``. For image files an ``<img>`` tag pointing at
    ``{jupyter_work_dir}/{filename}`` is appended to the row text; images
    that are 800 px wide or more are displayed at a width of 800 px.

    Args:
        state_dict: Session state holding the bot backend.
        history: Chat history; mutated in place.
        files: Iterable of uploaded file objects exposing a ``name`` path.

    Returns:
        The same ``history`` list, with one row added per file.
    """
    bot_backend = get_bot_backend(state_dict)
    image_suffixes = {'.jpg', '.jpeg', '.png', '.bmp', '.webp'}

    for file in files:
        path = file.name
        filename = os.path.basename(path)

        # The row is shared with the backend, so the in-place edit below is
        # visible through both references.
        bot_msg = [f'📁[{filename}]', None]
        history.append(bot_msg)

        bot_backend.add_file_message(path=path, bot_msg=bot_msg)

        _, suffix = os.path.splitext(filename)
        # Compare case-insensitively so that e.g. "photo.JPG" is previewed too.
        if suffix.lower() in image_suffixes:
            copied_file_path = f'{bot_backend.jupyter_work_dir}/{filename}'
            width, _ = get_image_size(copied_file_path)
            width_style = "" if width < 800 else "width: 800px;"
            bot_msg[0] += (
                f'\n<img src="file={copied_file_path}" style=\'{width_style} max-width'
                f':none; max-height:none\'> '
            )

    return history
|
|
|
|
|
def undo_upload_file(state_dict: Dict, history: List) -> Tuple[List, Dict]:
    """Revoke the most recent file upload and drop its row from the chat.

    Args:
        state_dict: Session state holding the bot backend.
        history: Chat history; its last row is removed when a file is revoked.

    Returns:
        The history and a button update. The undo button stays interactive
        only while the backend still reports revocable files.
    """
    bot_backend = get_bot_backend(state_dict)
    revoked_msg = bot_backend.revoke_file()

    # Nothing was revoked: leave the chat untouched and disable the button.
    if revoked_msg is None:
        return history, gr.Button.update(interactive=False)

    # The revoked message is expected to be the last row shown in the chat.
    assert history[-1] == revoked_msg
    history.pop()

    can_undo_again = bool(bot_backend.revocable_files)
    return history, gr.Button.update(interactive=can_undo_again)
|
|
|
|
|
def refresh_file_display(state_dict: Dict) -> List[str]:
    """List the non-directory entries of the backend's Jupyter work directory.

    Args:
        state_dict: Session state holding the bot backend.

    Returns:
        Paths (work directory joined with entry name) of every entry in the
        work directory that is not a directory.
    """
    work_dir = get_bot_backend(state_dict).jupyter_work_dir
    candidates = (os.path.join(work_dir, entry) for entry in os.listdir(work_dir))
    return [candidate for candidate in candidates if not os.path.isdir(candidate)]
|
|
|
|
|
def refresh_token_count(state_dict: Dict) -> Dict:
    """Build the Markdown update for the context-token counter.

    Reads the module-level ``config`` (assigned in the ``__main__`` section)
    to find the context-window size of the currently selected model.

    Args:
        state_dict: Session state holding the bot backend.

    Returns:
        A ``gr.Markdown`` update showing ``used/limit`` tokens, followed by a
        notice on a new line when the backend reports that the context was
        sliced.
    """
    bot_backend = get_bot_backend(state_dict)

    model_name = config['model'][bot_backend.gpt_model_choice]['model_name']
    token_limit = config['model_context_window'][model_name]

    display_text = f"**Context token:** {bot_backend.context_window_tokens}/{token_limit}"
    if bot_backend.sliced:
        # Backslash + newline is a Markdown hard line break.
        display_text += '\\\nToken limit exceeded, conversation has been sliced.'

    return gr.Markdown.update(value=display_text)
|
|
|
|
|
def restart_ui(history: List) -> Tuple[List, Dict, Dict, Dict, Dict, Dict, Dict]:
    """Clear the chat and lock the controls while a restart is in progress.

    Args:
        history: Chat history; emptied in place.

    Returns:
        A 7-tuple: the emptied history, an update that clears and disables
        the text box, four updates that disable buttons, and one update that
        hides a button. The restart button's click handler maps these onto
        its outputs in order.
    """
    del history[:]

    textbox_update = gr.Textbox.update(value="", interactive=False)
    # One independent update object per button that has to be disabled.
    disabled_buttons = [gr.Button.update(interactive=False) for _ in range(4)]
    hidden_button = gr.Button.update(visible=False)

    return (history, textbox_update, *disabled_buttons, hidden_button)
|
|
|
|
|
def restart_bot_backend(state_dict: Dict) -> None:
    """Call ``restart()`` on the session's bot backend."""
    get_bot_backend(state_dict).restart()
|
|
|
|
|
def stop_generating(state_dict: Dict) -> None:
    """Handle a click on the stop button.

    While code is executing, an interrupt signal is sent through the backend;
    otherwise the backend's stop-generating flag is set to ``True``.

    Args:
        state_dict: Session state holding the bot backend.
    """
    bot_backend = get_bot_backend(state_dict)

    if not bot_backend.code_executing:
        bot_backend.update_stop_generating_state(stop_generating=True)
        return

    bot_backend.send_interrupt_signal()
|
|
|
|
|
def bot(state_dict: Dict, history: List) -> List:
    """Stream model responses into the chat history.

    This is a generator (despite the ``List`` return annotation): every
    ``yield`` produces a 3-tuple of the chat history, an update for the stop
    button and an update for the retry button. It keeps requesting
    completions while the backend's ``finish_reason`` is ``'new_input'`` or
    ``'function_call'``.

    Args:
        state_dict: Session state holding the bot backend.
        history: Chat history rows of the form ``[user, bot]``.

    Raises:
        openai.OpenAIError: Re-raised after the retry button has been made
            visible.
    """
    bot_backend = get_bot_backend(state_dict)

    while bot_backend.finish_reason in ('new_input', 'function_call'):
        # Start a fresh bot cell: open a new row if the last row already has
        # bot text, otherwise initialise the empty cell of the last row.
        if history[-1][1]:
            history.append([None, ""])
        else:
            history[-1][1] = ""

        try:
            response = chat_completion(bot_backend=bot_backend)
            for chunk in response:
                # The stream reports that a function call was completed.
                if chunk['choices'] and chunk['choices'][0]['finish_reason'] == 'function_call':
                    if bot_backend.function_name in bot_backend.jupyter_kernel.available_functions:
                        # Known function: relabel the stop button for the execution phase.
                        yield history, gr.Button.update(value='⏹️ Interrupt execution'), gr.Button.update(visible=False)
                    else:
                        # Unknown function: disable the stop button.
                        yield history, gr.Button.update(interactive=False), gr.Button.update(visible=False)

                # The user asked to stop: close the stream, store what was
                # produced so far, and leave the chunk loop.
                if bot_backend.stop_generating:
                    response.close()
                    if bot_backend.content:
                        bot_backend.add_gpt_response_content_message()
                    if bot_backend.display_code_block:
                        # Mark the partially generated code as stopped.
                        bot_backend.update_display_code_block(
                            display_code_block="\n⚫Stopped:\n```python\n{}\n```".format(bot_backend.code_str)
                        )
                        # Rebuild the visible history from the backend's copy
                        # and append the stopped code block to its last row.
                        history = copy.deepcopy(bot_backend.bot_history)
                        history[-1][1] += bot_backend.display_code_block
                        bot_backend.add_function_call_response_message(function_response=None)

                    bot_backend.reset_gpt_response_log_values()
                    break

                # Normal path: let the parser fold this chunk into the history.
                history, weather_exit = parse_response(
                    chunk=chunk,
                    history=history,
                    bot_backend=bot_backend
                )

                yield (
                    history,
                    gr.Button.update(
                        interactive=False if bot_backend.stop_generating else True,
                        value='⏹️ Stop generating'
                    ),
                    gr.Button.update(visible=False)
                )
                # The parser signalled that the process should terminate.
                if weather_exit:
                    exit(-1)
        except openai.OpenAIError as openai_error:
            # Reset the response log but keep finish_reason
            # (presumably so that a retry re-enters the loop above; confirm).
            bot_backend.reset_gpt_response_log_values(exclude=['finish_reason'])
            # Disable the stop button and reveal the retry button before re-raising.
            yield history, gr.Button.update(interactive=False), gr.Button.update(visible=True)
            raise openai_error

    # Finished: restore the stop button's label, disabled, and hide the retry button.
    yield history, gr.Button.update(interactive=False, value='⏹️ Stop generating'), gr.Button.update(visible=False)
|
|
|
|
|
# Script entry point: load the configuration, build the Gradio UI, wire the
# event handlers defined above to the components, then launch the app.
if __name__ == '__main__':
    # Module-level configuration; also read by refresh_token_count().
    config = get_config()
    with gr.Blocks(theme=gr.themes.Base()) as block:
        """
        Reference: https://www.gradio.app/guides/creating-a-chatbot-fast
        """

        # ---- UI components ----
        # Per-session state; the backend is created by initialization() on load.
        state = gr.State(value={"bot_backend": None})
        with gr.Tab("Chat"):
            chatbot = gr.Chatbot([], elem_id="chatbot", label="Local Code Interpreter", height=750)
            # Input row: text box plus file upload button.
            with gr.Row():
                with gr.Column(scale=0.85):
                    text_box = gr.Textbox(
                        show_label=False,
                        placeholder="Enter text and press enter, or upload a file",
                        container=False
                    )
                with gr.Column(scale=0.15, min_width=0):
                    file_upload_button = gr.UploadButton("📁", file_count='multiple', file_types=['file'])
            # Control row: model switch, token counter and action buttons.
            with gr.Row(equal_height=True):
                with gr.Column(scale=0.08, min_width=0):
                    # The checkbox is only interactive when GPT-4 is marked available in the config.
                    check_box = gr.Checkbox(label="Use GPT-4", interactive=config['model']['GPT-4']['available'])
                with gr.Column(scale=0.314, min_width=0):
                    # The initial counter text uses the GPT-3.5 context window.
                    model_token_limit = config['model_context_window'][config['model']['GPT-3.5']['model_name']]
                    token_count_display_text = f"**Context token:** 0/{model_token_limit}"
                    token_monitor = gr.Markdown(value=token_count_display_text)
                with gr.Column(scale=0.15, min_width=0):
                    # Hidden until bot() reveals it after an OpenAI error.
                    retry_button = gr.Button(value='🔂OpenAI Error, click here to retry', visible=False)
                with gr.Column(scale=0.15, min_width=0):
                    stop_generation_button = gr.Button(value='⏹️ Stop generating', interactive=False)
                with gr.Column(scale=0.15, min_width=0):
                    restart_button = gr.Button(value='🔄 Restart')
                with gr.Column(scale=0.15, min_width=0):
                    undo_file_button = gr.Button(value="↩️Undo upload file", interactive=False)
        with gr.Tab("Files"):
            file_output = gr.Files()

        # ---- Event wiring ----
        # Text submit: record the message, disable the undo button, then stream the reply.
        txt_msg = text_box.submit(add_text, [state, chatbot, text_box], [chatbot, text_box], queue=False).then(
            lambda: gr.Button.update(interactive=False), None, [undo_file_button], queue=False
        ).then(
            bot, [state, chatbot], [chatbot, stop_generation_button, retry_button]
        )
        # Follow-ups attached to the reply step: refresh the file list,
        # re-enable the text box, and update the token counter.
        txt_msg.then(fn=refresh_file_display, inputs=[state], outputs=[file_output])
        txt_msg.then(lambda: gr.update(interactive=True), None, [text_box], queue=False)
        txt_msg.then(fn=refresh_token_count, inputs=[state], outputs=[token_monitor])

        # Retry: hide the retry button, run bot() again, then the same follow-ups in sequence.
        retry_button.click(lambda: gr.Button.update(visible=False), None, [retry_button], queue=False).then(
            bot, [state, chatbot], [chatbot, stop_generation_button, retry_button]
        ).then(
            fn=refresh_file_display, inputs=[state], outputs=[file_output]
        ).then(
            lambda: gr.update(interactive=True), None, [text_box], queue=False
        ).then(
            fn=refresh_token_count, inputs=[state], outputs=[token_monitor]
        )

        # Model switch: update the backend, then refresh the counter for the new limit.
        check_box.change(fn=switch_to_gpt4, inputs=[state, check_box]).then(
            fn=refresh_token_count, inputs=[state], outputs=[token_monitor]
        )

        # File upload: add the files to the chat, enable undo, refresh the file list.
        file_msg = file_upload_button.upload(
            add_file, [state, chatbot, file_upload_button], [chatbot], queue=False
        )
        file_msg.then(lambda: gr.Button.update(interactive=True), None, [undo_file_button], queue=False)
        file_msg.then(fn=refresh_file_display, inputs=[state], outputs=[file_output])

        # Undo upload: revoke the last file, then refresh the file list.
        undo_file_button.click(
            fn=undo_upload_file, inputs=[state, chatbot], outputs=[chatbot, undo_file_button]
        ).then(
            fn=refresh_file_display, inputs=[state], outputs=[file_output]
        )

        # Stop: signal the backend, then disable the stop button.
        stop_generation_button.click(fn=stop_generating, inputs=[state], queue=False).then(
            fn=lambda: gr.Button.update(interactive=False), inputs=None, outputs=[stop_generation_button], queue=False
        )

        # Restart: lock the UI, restart the backend, refresh the file list,
        # unlock the text box / restart / upload controls, refresh the counter.
        restart_button.click(
            fn=restart_ui, inputs=[chatbot],
            outputs=[
                chatbot, text_box, restart_button, file_upload_button, undo_file_button, stop_generation_button,
                retry_button
            ]
        ).then(
            fn=restart_bot_backend, inputs=[state], queue=False
        ).then(
            fn=refresh_file_display, inputs=[state], outputs=[file_output]
        ).then(
            fn=lambda: (gr.Textbox.update(interactive=True), gr.Button.update(interactive=True),
                        gr.Button.update(interactive=True)),
            inputs=None, outputs=[text_box, restart_button, file_upload_button], queue=False
        ).then(
            fn=refresh_token_count,
            inputs=[state], outputs=[token_monitor]
        )

        # Run initialization() for each page load.
        block.load(fn=initialization, inputs=[state])

    # Enable the request queue, then start the server and open a browser tab.
    block.queue()
    block.launch(inbrowser=True)
|
|