#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Gradio chat UI backed by the OpenAI Assistants (beta threads) API.

Reference layout: https://huggingface.co/spaces/fffiloni/langchain-chat-with-pdf-openai
"""
import argparse
import json
import time
# NOTE: use the public `queue` module names; the original imported the
# private CPython implementation module `_queue` for `Empty`.
from queue import Empty, Full, Queue
from threading import Thread
from typing import List, Tuple

import gradio as gr
from openai import OpenAI

import project_settings as settings


def get_args():
    """Parse CLI args; `--openai_api_key` defaults to the project settings value."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--openai_api_key",
        default=settings.environment.get("openai_api_key", default=None, dtype=str),
        type=str,
    )
    args = parser.parse_args()
    return args


def greet(question: str, history: List[Tuple[str, str]]):
    """Toy echo handler: append a greeting turn to the chat history."""
    answer = "Hello " + question + "!"
    result = history + [(question, answer)]
    return result


def get_message_list(client: OpenAI, thread_id: str):
    """
    Fetch all messages of a thread and flatten them into plain dicts.

    Only `text`-type content parts carry a `text` payload; other types keep
    just their `type` tag. The result is sorted oldest-first by `created_at`.
    """
    messages = client.beta.threads.messages.list(
        thread_id=thread_id
    )

    result = list()
    for message in messages.data:
        content = list()
        for msg in message.content:
            content.append({
                "text": {
                    "annotations": msg.text.annotations,
                    "value": msg.text.value,
                },
                "type": msg.type,
            })
        result.append({
            "id": message.id,
            "assistant_id": message.assistant_id,
            "content": content,
            "created_at": message.created_at,
            "file_ids": message.file_ids,
            "metadata": message.metadata,
            "object": message.object,
            "role": message.role,
            "run_id": message.run_id,
            "thread_id": message.thread_id,
        })

    result = list(sorted(result, key=lambda x: x["created_at"]))
    return result


def convert_message_list_to_response(message_list: List[dict]) -> str:
    """Render a flattened message list (see `get_message_list`) as one
    readable transcript string, one `role: text` section per message."""
    response = ""
    for message in message_list:
        role = message["role"]
        content = message["content"]
        for c in content:
            if c["type"] != "text":
                continue
            text: dict = c["text"]
            msg = "{}: \n{}\n".format(role, text["value"])
            response += msg
        response += "-" * 80
        response += "\n"
    return response


def streaming_refresh(openai_api_key: str,
                      thread_id: str,
                      queue: Queue,
                      ):
    """
    Producer thread body: poll the thread transcript every 0.3 s and push each
    snapshot into `queue`; stop after `max_no_updates_count` *consecutive*
    unchanged polls (i.e. the run has settled).
    """
    delta_time = 0.3

    last_response = None
    no_updates_count = 0
    max_no_updates_count = 5
    while True:
        time.sleep(delta_time)
        this_response = refresh(openai_api_key, thread_id)
        if this_response == last_response:
            no_updates_count += 1
            if no_updates_count >= max_no_updates_count:
                break
        else:
            # BUGFIX: reset on change so only consecutive duplicates end
            # polling; the original counter accumulated across the whole run
            # and could cut streaming off while output was still arriving.
            no_updates_count = 0
        last_response = this_response
        try:
            queue.put(this_response, block=True, timeout=2)
        except Full:
            # BUGFIX: if the consumer has stopped draining, exit gracefully
            # instead of dying in the thread with an unhandled queue.Full.
            break

    return last_response


def refresh(openai_api_key: str,
            thread_id: str,
            ):
    """Fetch the current transcript of `thread_id` as a display string."""
    client = OpenAI(
        api_key=openai_api_key,
    )
    message_list = get_message_list(client, thread_id=thread_id)
    response = convert_message_list_to_response(message_list)
    return response


def add_and_run(openai_api_key: str,
                assistant_id: str,
                thread_id: str,
                name: str,
                instructions: str,
                model: str,
                query: str,
                ):
    """
    Gradio handler (generator): create the assistant/thread on first use, post
    `query`, start a run, then stream the growing transcript back to the UI.

    Yields `[assistant_id, thread_id, transcript]` so the id textboxes are
    filled in for subsequent turns on the same thread.
    """
    client = OpenAI(
        api_key=openai_api_key,
    )

    # Lazily create the assistant and thread so repeated turns reuse the ids
    # already shown in the UI.
    if assistant_id is None or len(assistant_id.strip()) == 0:
        assistant = client.beta.assistants.create(
            name=name,
            instructions=instructions,
            # tools=[{"type": "code_interpreter"}],
            model=model,
        )
        assistant_id = assistant.id

    if thread_id is None or len(thread_id.strip()) == 0:
        thread = client.beta.threads.create()
        thread_id = thread.id

    message = client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=query
    )

    run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
    )
    run = client.beta.threads.runs.retrieve(
        thread_id=thread_id,
        run_id=run.id
    )

    # Background producer polls the transcript; this generator consumes it.
    response_queue = Queue(maxsize=10)
    refresh_kwargs = dict(
        openai_api_key=openai_api_key,
        thread_id=thread_id,
        queue=response_queue,
    )
    thread = Thread(target=streaming_refresh, kwargs=refresh_kwargs)
    thread.start()

    delta_time = 0.1

    last_response = None
    no_updates_count = 0
    max_no_updates_count = 10
    while True:
        time.sleep(delta_time)
        try:
            this_response = response_queue.get(block=True, timeout=2)
        except Empty:
            # Producer stopped feeding us: the run has settled.
            break
        if this_response == last_response:
            no_updates_count += 1
            if no_updates_count >= max_no_updates_count:
                break
        else:
            # BUGFIX: count only *consecutive* duplicates (see streaming_refresh).
            no_updates_count = 0
        last_response = this_response

        result = [
            assistant_id, thread_id, last_response
        ]
        yield result


def main():
    """Build and launch the Gradio Blocks UI."""
    args = get_args()

    description = """
chat llm
    """

    # ui
    with gr.Blocks() as blocks:
        gr.Markdown(value=description)

        with gr.Row():
            # settings
            with gr.Column(scale=3):
                with gr.Tabs():
                    with gr.TabItem("create assistant"):
                        openai_api_key = gr.Text(
                            value=args.openai_api_key,
                            label="openai_api_key",
                            placeholder="Fill with your `openai_api_key`"
                        )
                        name = gr.Textbox(label="name")
                        instructions = gr.Textbox(label="instructions")
                        model = gr.Dropdown(["gpt-4-1106-preview"],
                                            value="gpt-4-1106-preview",
                                            label="model")

                        # functions
                        functions = gr.TextArea(label="functions")

                        # upload files
                        retrieval_files = gr.Files(label="retrieval files")

            # chat
            with gr.Column(scale=5):
                response = gr.Textbox(lines=5, max_lines=80, label="response")
                query = gr.Textbox(lines=2, label="query")

                # chat_bot = gr.Chatbot([], elem_id="context", height=400)
                # text_box = gr.Textbox(show_label=False, placeholder="Enter text and press enter", container=False)

                with gr.Row():
                    with gr.Column(scale=1):
                        add_and_run_button = gr.Button("Add and run")
                    with gr.Column(scale=1):
                        refresh_button = gr.Button("Refresh")

            # states
            with gr.Column(scale=2):
                # upload files
                assistant_id = gr.Textbox(value=None, label="assistant_id")
                thread_id = gr.Textbox(value=None, label="thread_id")

        # examples
        with gr.Row():
            gr.Examples(
                examples=[
                    [
                        "Math Tutor",
                        "You are a personal math tutor. Write and run code to answer math questions.",
                        "gpt-4-1106-preview",
                        "123 * 524 等于多少?"
                    ]
                ],
                inputs=[
                    name, instructions, model, query,
                ],
                examples_per_page=5
            )

        # add and run
        add_and_run_button.click(
            add_and_run,
            inputs=[
                openai_api_key,
                assistant_id, thread_id,
                name, instructions, model,
                query,
            ],
            outputs=[
                assistant_id, thread_id,
                response,
            ],
        )

        # refresh
        refresh_button.click(
            refresh,
            inputs=[
                openai_api_key,
                thread_id,
            ],
            outputs=[
                response
            ]
        )

    blocks.queue().launch()
    return


if __name__ == '__main__':
    main()