#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Gradio front end for the OpenAI Assistants API.

The UI lets a user create / list / delete assistants and files, manage the
python scripts that implement function-calling tools (stored in the
``functions`` directory of the project), and chat with an assistant inside a
thread.

Reference:
https://huggingface.co/spaces/fffiloni/langchain-chat-with-pdf-openai
"""
import argparse
import httpx
import importlib
import json
import logging
import os
import platform
import shutil
import time
from typing import List, Tuple

# Logging is configured before the UI / SDK libraries are imported, as in the
# original module layout.
logging.basicConfig(
    level=logging.INFO if platform.system() == "Windows" else logging.DEBUG,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

import gradio as gr
import openai
from openai import OpenAI

import project_settings as settings
from project_settings import project_path

logger = logging.getLogger(__name__)


def get_args():
    """Parse the command line arguments of the application.

    Returns:
        The ``argparse`` namespace with ``examples_json_file``,
        ``description_md_file`` and ``openai_api_key``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--examples_json_file",
        default="examples.json",
        type=str
    )
    parser.add_argument(
        "--description_md_file",
        default="description.md",
        type=str
    )
    parser.add_argument(
        "--openai_api_key",
        default=settings.environment.get("openai_api_key", default=None, dtype=str),
        type=str
    )
    args = parser.parse_args()
    return args


def dynamic_import_function(package_name: str, function_name: str):
    """Return attribute ``function_name`` of module ``functions.<package_name>``.

    NOTE(review): ``package_name`` comes from the tool call issued by the
    model and the module may have been uploaded through the UI, so this
    executes user-supplied code by design.

    Raises:
        ModuleNotFoundError: if ``functions.<package_name>`` does not exist.
        AttributeError: if the module has no attribute ``function_name``.
    """
    lib = importlib.import_module("functions.{}".format(package_name))
    return getattr(lib, function_name)


def _split_nonblank_lines(text) -> List[str]:
    """Split a textbox value into stripped, non-empty lines (``None`` -> [])."""
    if text is None:
        return []
    return [line.strip() for line in str(text).split("\n") if line.strip()]


def _upload_files(client: OpenAI, files) -> list:
    """Upload every path in ``files`` with purpose ``assistants``.

    Each file is opened in a ``with`` block so the handle is closed as soon
    as the upload call returns.
    """
    uploaded = []
    for path in files or []:
        with open(path, "rb") as handle:
            uploaded.append(
                client.files.create(file=handle, purpose="assistants")
            )
    return uploaded


def _function_script_path(name: str):
    """Map a user supplied script name to a path inside ``functions``.

    Only bare file names are accepted, so input such as ``../main.py`` cannot
    reach outside of the functions directory.

    Returns:
        The path object, or ``None`` when the name is empty or contains a
        directory component.
    """
    cleaned = (name or "").strip()
    if not cleaned or cleaned in (".", "..") or cleaned != os.path.basename(cleaned):
        return None
    return project_path / "functions" / cleaned


def click_create_assistant(openai_api_key: str,
                           name: str,
                           instructions: str,
                           description: str,
                           tools: str,
                           files: List[str],
                           file_ids: str,
                           model: str,
                           ):
    """Create an assistant from the values entered in the UI.

    Args:
        openai_api_key: key used to build the OpenAI client.
        name, instructions, description, model: forwarded to the API as is.
        tools: one JSON object per line, each describing a tool.
        files: local files to upload and attach to the assistant.
        file_ids: one already uploaded file id per line.

    Returns:
        ``(assistant_id, None)``; the UI binds the second value to the
        ``thread_id`` textbox, which is therefore cleared.

    Raises:
        json.JSONDecodeError: if a non-empty line of ``tools`` is not JSON.
    """
    logger.info("click create assistant, name: {}".format(name))
    client = OpenAI(
        api_key=openai_api_key,
    )

    # tools: None / blank input means "no tools" (str(None) used to be parsed
    # as the JSON text "None" and crash).
    tool_list = [json.loads(line) for line in _split_nonblank_lines(tools)]

    # files
    uploaded = _upload_files(client, files)

    # file_ids: same None handling as for tools.
    file_id_list = _split_nonblank_lines(file_ids)

    # assistant
    assistant = client.beta.assistants.create(
        name=name,
        instructions=instructions,
        description=description,
        tools=tool_list,
        file_ids=file_id_list + [file.id for file in uploaded],
        model=model,
    )
    return assistant.id, None


def click_list_assistant(openai_api_key: str) -> str:
    """Return a text listing (id, name, description) of the assistants."""
    client = OpenAI(
        api_key=openai_api_key,
    )
    assistant_list = client.beta.assistants.list().model_dump(mode="json")

    return "".join(
        "id: \n{}\nname: \n{}\ndescription: \n{}\n\n".format(
            a["id"], a["name"], a["description"]
        )
        for a in assistant_list["data"]
    )


def click_delete_assistant(openai_api_key: str,
                           assistant_id: str) -> str:
    """Delete one assistant.

    Returns:
        ``"success"`` / ``"failed"`` depending on the API answer, or the API
        error message when the assistant does not exist.
    """
    assistant_id = assistant_id.strip()
    logger.info("click delete assistant, assistant_id: {}".format(assistant_id))

    client = OpenAI(
        api_key=openai_api_key,
    )

    try:
        assistant_deleted = client.beta.assistants.delete(assistant_id=assistant_id)
        result = "success" if assistant_deleted.deleted else "failed"
    except openai.NotFoundError as e:
        result = e.message
    return result


def click_delete_all_assistant(openai_api_key: str):
    """Delete every assistant contained in the listing returned by the API.

    NOTE(review): only the ``data`` of a single ``list()`` call is processed;
    confirm whether further pages need to be handled.

    Returns:
        ``None`` (clears the bound output component).
    """
    client = OpenAI(
        api_key=openai_api_key,
    )
    assistant_list = client.beta.assistants.list()

    for a in assistant_list.data:
        client.beta.assistants.delete(a.id)
    return None


def click_list_file(openai_api_key: str):
    """Return a text listing (id, filename, bytes, status) of uploaded files."""
    client = OpenAI(
        api_key=openai_api_key,
    )
    file_list = client.files.list().model_dump(mode="json")

    return "".join(
        "id: \n{}\nfilename: \n{}\nbytes: \n{}\nstatus: \n{}\n\n".format(
            f["id"], f["filename"], f["bytes"], f["status"]
        )
        for f in file_list["data"]
    )


def click_delete_file(openai_api_key: str,
                      file_id: str) -> str:
    """Delete one uploaded file.

    Returns:
        ``"success"`` / ``"failed"`` depending on the API answer, or the error
        text when the id is unknown or cannot be used to build a URL.
    """
    file_id = file_id.strip()
    logger.info("click delete file, file_id: {}".format(file_id))

    client = OpenAI(
        api_key=openai_api_key,
    )

    try:
        file_deleted = client.files.delete(file_id=file_id)
        result = "success" if file_deleted.deleted else "failed"
    except openai.NotFoundError as e:
        result = e.message
    except httpx.InvalidURL as e:
        result = str(e)
    return result


def click_upload_files(openai_api_key: str,
                       files: List[str],
                       ):
    """Upload local files with purpose ``assistants``.

    Returns:
        The list of ids of the uploaded files (empty when ``files`` is
        ``None`` or empty).
    """
    logger.info("click upload files, files: {}".format(files))

    client = OpenAI(
        api_key=openai_api_key,
    )

    return [file.id for file in _upload_files(client, files)]


def click_list_function_python_script():
    """Return the names of the ``*.py`` files of the functions directory.

    ``__init__.py`` is skipped; every name is followed by a newline.
    """
    function_script_dir = project_path / "functions"
    return "".join(
        script.name + "\n"
        for script in function_script_dir.glob("*.py")
        if script.name != "__init__.py"
    )


def click_upload_function_python_script(files: List[str]):
    """Copy the given files into the functions directory.

    NOTE(review): the copied scripts can later be imported and executed by
    ``dynamic_import_function``; only expose this UI to trusted users.

    Returns:
        ``None`` (clears the bound file component).
    """
    tgt = project_path / "functions"
    if files is None:
        return None
    for file in files:
        shutil.copy(file, tgt.as_posix())
    return None


def click_delete_function_python_script(filename: str):
    """Delete one script of the functions directory.

    Returns:
        ``"success"``, or a text describing why the file was not deleted.
    """
    script_path = _function_script_path(filename)
    if script_path is None:
        # Empty names or names with a directory part are refused instead of
        # being joined onto the functions directory.
        return "Invalid filename: {}".format((filename or "").strip())

    try:
        os.remove(script_path.as_posix())
        result = "success"
    except OSError as e:
        # Covers FileNotFoundError as well as permission / directory errors.
        result = str(e)
    return result


def click_download_function_python_script(name: str):
    """Offer one script of the functions directory for download.

    Returns:
        ``(files, flag)`` where ``files`` is a one-element list with the path
        of the script (``None`` when it is not available) and ``flag`` is the
        message shown to the user.
    """
    script_path = _function_script_path(name)

    if script_path is None or not script_path.exists():
        files = None
        flag = "File Not Found: {}".format((name or "").strip())
    else:
        files = [script_path.as_posix()]
        flag = "You can download it on `upload_python_script_files` now."
    return files, flag


def convert_message_list_to_conversation(message_list: List[dict]) -> List[Tuple[str, str]]:
    """Convert dumped thread messages into chatbot message pairs.

    Every ``text`` content part becomes one pair: ``[None, text]`` for the
    assistant and ``[text, None]`` for any other role. Other content types
    are skipped. Annotations are appended to the text.

    Raises:
        NotImplementedError: for an annotation type other than
            ``file_citation`` and ``file_path``.
    """
    conversation = list()
    for message in message_list:
        role = message["role"]

        for c in message["content"]:
            if c["type"] != "text":
                continue
            text: dict = c["text"]

            msg = text["value"]
            for text_annotation in text["annotations"]:
                a_type = text_annotation["type"]
                if a_type == "file_citation":
                    msg += "\n\n"
                    msg += "\nquote: \n{}\nfile_id: \n{}".format(
                        text_annotation["file_citation"]["quote"],
                        text_annotation["file_citation"]["file_id"],
                    )
                elif a_type == "file_path":
                    # Annotation pointing at a generated file; show its id
                    # instead of failing the whole refresh.
                    msg += "\n\n"
                    msg += "\nfile_id: \n{}".format(
                        text_annotation["file_path"]["file_id"],
                    )
                else:
                    raise NotImplementedError(
                        "unsupported annotation type: {}".format(a_type)
                    )

            if role == "assistant":
                pair = [None, msg]
            else:
                pair = [msg, None]
            conversation.append(pair)

    return conversation


def refresh(openai_api_key: str,
            thread_id: str,
            ):
    """Fetch the messages of a thread, oldest first, as chatbot pairs.

    Returns:
        The conversation, or an empty list when ``thread_id`` is blank (there
        is no thread to query yet).
    """
    if thread_id is None or len(thread_id.strip()) == 0:
        return list()

    client = OpenAI(
        api_key=openai_api_key,
    )

    message_list = client.beta.threads.messages.list(
        thread_id=thread_id
    )
    message_list = message_list.model_dump(mode="json")
    message_list = sorted(message_list["data"], key=lambda x: x["created_at"])

    logger.debug("message_list: {}".format(message_list))
    conversation = convert_message_list_to_conversation(message_list)
    return conversation


def add_and_run(openai_api_key: str,
                assistant_id: str,
                thread_id: str,
                name: str,
                instructions: str,
                description: str,
                tools: str,
                files: List[str],
                file_ids: str,
                model: str,
                query: str,
                ):
    """Add ``query`` to the thread, run the assistant and stream the result.

    An assistant and / or a thread is created first when the corresponding
    id is blank. The run is then polled; requested tool calls are executed
    through ``dynamic_import_function`` and their outputs submitted.

    Yields:
        ``[assistant_id, thread_id, conversation, None]`` after every poll;
        the UI binds the last value to the query textbox, which is cleared.
    """
    client = OpenAI(
        api_key=openai_api_key,
    )
    if assistant_id is None or len(assistant_id.strip()) == 0:
        assistant_id, _ = click_create_assistant(
            openai_api_key,
            name, instructions, description, tools, files, file_ids, model
        )

    if thread_id is None or len(thread_id.strip()) == 0:
        thread = client.beta.threads.create()
        thread_id = thread.id

    logger.info(f"assistant_id: {assistant_id}, thread_id: {thread_id}")

    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=query
    )
    run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
    )

    delta_time = 0.1
    last_conversation = None
    no_updates_count = 0
    max_no_updates_count = 5
    while True:
        time.sleep(delta_time)

        run = client.beta.threads.runs.retrieve(
            thread_id=thread_id,
            run_id=run.id,
        )

        # required action
        if run.required_action is not None:
            if run.required_action.type == "submit_tool_outputs":
                tool_outputs = list()
                for tool_call in run.required_action.submit_tool_outputs.tool_calls:
                    function_name = tool_call.function.name

                    # The module and the function share the same name; the
                    # module also exposes `kwargs()` listing accepted names.
                    function_to_call = dynamic_import_function(function_name, function_name)
                    kwargs_required: List[str] = dynamic_import_function(function_name, "kwargs")()

                    function_args = json.loads(tool_call.function.arguments)
                    kwargs = {k: function_args.get(k) for k in kwargs_required}

                    function_response = function_to_call(**kwargs)
                    tool_outputs.append({
                        "tool_call_id": tool_call.id,
                        "output": function_response,
                    })

                run = client.beta.threads.runs.submit_tool_outputs(
                    thread_id=thread_id,
                    run_id=run.id,
                    tool_outputs=tool_outputs
                )

        # get message
        conversation = refresh(openai_api_key, thread_id)

        # Stop once the conversation stayed unchanged for several polls
        # while one of the run timestamps below is set.
        if conversation == last_conversation:
            if any([run.completed_at is not None,
                    run.cancelled_at is not None,
                    run.failed_at is not None,
                    run.expires_at is not None]):
                no_updates_count += 1
                if no_updates_count >= max_no_updates_count:
                    break
        last_conversation = conversation

        result = [
            assistant_id, thread_id,
            conversation,
            None
        ]
        yield result


def main():
    """Build the Gradio interface, wire the event handlers and launch it."""
    args = get_args()

    brief_description = """
## OpenAI Assistant

基于 [OpenAI platform](https://platform.openai.com/docs/introduction) 开发的 assistant 界面及示例。使用方法等详细介绍在最下面。
"""

    with open(args.description_md_file, "r", encoding="utf-8") as f:
        md_description = f.read()

    # example json
    with open(args.examples_json_file, "r", encoding="utf-8") as f:
        examples = json.load(f)
    for example in examples:
        # Column 4 of an example holds the retrieval files, given relative
        # to the project directory.
        files: List[str] = example[4]
        if files is None:
            continue
        example[4] = [(project_path / file).as_posix() for file in files]

    # ui
    with gr.Blocks() as blocks:
        gr.Markdown(value=brief_description)

        with gr.Row():
            # settings
            with gr.Column(scale=3):
                with gr.Tabs():
                    with gr.TabItem("create assistant"):
                        openai_api_key = gr.Text(
                            value=args.openai_api_key,
                            label="openai_api_key",
                            placeholder="Fill with your `openai_api_key`"
                        )

                        name = gr.Textbox(label="name")
                        instructions = gr.Textbox(label="instructions")
                        description = gr.Textbox(label="description")
                        model = gr.Dropdown(["gpt-4-1106-preview"], value="gpt-4-1106-preview", label="model")

                        # functions
                        tools = gr.TextArea(label="functions")

                        # upload files
                        retrieval_files = gr.Files(label="retrieval_files")
                        retrieval_file_ids = gr.TextArea(label="retrieval_file_ids")

                        # create assistant
                        create_assistant_button = gr.Button("create assistant", variant="secondary")

                    with gr.TabItem("assistants"):
                        list_assistant_button = gr.Button("list assistant")
                        assistant_list = gr.TextArea(label="assistant_list")

                        delete_assistant_id = gr.Textbox(max_lines=1, label="delete_assistant_id")
                        delete_assistant_button = gr.Button("delete assistant")
                        delete_all_assistant_button = gr.Button("delete all assistant")

                    with gr.TabItem("files"):
                        list_file_button = gr.Button("list file")
                        file_list = gr.TextArea(label="file_list")

                        upload_files = gr.Files(label="upload_files")
                        upload_files_button = gr.Button("upload file")

                        delete_file_id = gr.Textbox(max_lines=1, label="delete_file_id")
                        delete_file_button = gr.Button("delete file")

                    with gr.TabItem("function script"):
                        list_function_python_script_button = gr.Button("list python script")
                        list_function_python_script_list = gr.TextArea(label="python_script_list")

                        upload_function_python_script_files = gr.Files(label="upload_python_script_files")
                        upload_function_python_script_button = gr.Button("upload python script")

                        function_python_script_file = gr.Textbox(max_lines=1, label="python_script_file")
                        delete_function_python_script_button = gr.Button("delete python script")
                        download_function_python_script_button = gr.Button("download python script")

            # chat
            with gr.Column(scale=5):
                chat_bot = gr.Chatbot(label="conversation", height=600)
                query = gr.Textbox(lines=1, label="query")

                with gr.Row():
                    with gr.Column(scale=1):
                        add_and_run_button = gr.Button("Add and run", variant="primary")
                    with gr.Column(scale=1):
                        refresh_button = gr.Button("Refresh")

            # states
            with gr.Column(scale=2):
                assistant_id = gr.Textbox(value=None, label="assistant_id")
                thread_id = gr.Textbox(value=None, label="thread_id")

                tips = gr.TextArea(value=None, label="tips")

        # examples
        with gr.Row():
            gr.Examples(
                examples=examples,
                inputs=[
                    name, instructions, description, tools, retrieval_files, model,
                    query,
                    tips
                ],
                examples_per_page=5
            )

        gr.Markdown(value=md_description)

        # create assistant
        create_assistant_button.click(
            click_create_assistant,
            inputs=[
                openai_api_key,
                name, instructions, description, tools, retrieval_files, retrieval_file_ids, model,
            ],
            outputs=[
                assistant_id, thread_id
            ]
        )

        # list assistant
        list_assistant_button.click(
            click_list_assistant,
            inputs=[
                openai_api_key
            ],
            outputs=[
                assistant_list
            ]
        )

        # delete assistant button
        delete_assistant_button.click(
            click_delete_assistant,
            inputs=[
                openai_api_key,
                delete_assistant_id
            ],
            outputs=[
                delete_assistant_id
            ]
        )

        # delete all assistant: the handler returns None, which clears the
        # (now outdated) assistant listing rather than the unrelated file
        # listing.
        delete_all_assistant_button.click(
            click_delete_all_assistant,
            inputs=[
                openai_api_key
            ],
            outputs=[
                assistant_list
            ]
        )

        # list file
        list_file_button.click(
            click_list_file,
            inputs=[
                openai_api_key
            ],
            outputs=[
                file_list
            ],
        )

        # delete file
        delete_file_button.click(
            click_delete_file,
            inputs=[
                openai_api_key,
                delete_file_id
            ],
            outputs=[
                delete_file_id
            ]
        )

        # upload files
        upload_files_button.click(
            click_upload_files,
            inputs=[
                openai_api_key,
                upload_files
            ],
            outputs=[
            ]
        )

        # list python script
        list_function_python_script_button.click(
            click_list_function_python_script,
            inputs=[],
            outputs=[
                list_function_python_script_list
            ]
        )

        # upload function python script
        upload_function_python_script_button.click(
            click_upload_function_python_script,
            inputs=[
                upload_function_python_script_files
            ],
            outputs=[
                upload_function_python_script_files
            ]
        )

        # delete function python script
        delete_function_python_script_button.click(
            click_delete_function_python_script,
            inputs=[
                function_python_script_file
            ],
            outputs=[
                function_python_script_file
            ]
        )

        # download function python script
        download_function_python_script_button.click(
            click_download_function_python_script,
            inputs=[
                function_python_script_file
            ],
            outputs=[
                upload_function_python_script_files,
                function_python_script_file
            ]
        )

        # query submit
        query.submit(
            add_and_run,
            inputs=[
                openai_api_key,
                assistant_id, thread_id,
                name, instructions, description, tools, retrieval_files, retrieval_file_ids, model,
                query,
            ],
            outputs=[
                assistant_id, thread_id,
                chat_bot,
                query
            ],
            api_name="query_submit",
        )

        # add and run
        add_and_run_button.click(
            add_and_run,
            inputs=[
                openai_api_key,
                assistant_id, thread_id,
                name, instructions, description, tools, retrieval_files, retrieval_file_ids, model,
                query,
            ],
            outputs=[
                assistant_id, thread_id,
                chat_bot,
                query
            ],
        )

        # refresh
        refresh_button.click(
            refresh,
            inputs=[
                openai_api_key,
                thread_id,
            ],
            outputs=[
                chat_bot
            ]
        )

    blocks.queue().launch()

    return


if __name__ == '__main__':
    main()