import os import subprocess import random from huggingface_hub import InferenceClient import gradio as gr from safe_search import safe_search from i_search import google from i_search import i_search as i_s from agent import ( ACTION_PROMPT, ADD_PROMPT, COMPRESS_HISTORY_PROMPT, LOG_PROMPT, LOG_RESPONSE, MODIFY_PROMPT, PREFIX, SEARCH_QUERY, READ_PROMPT, TASK_PROMPT, UNDERSTAND_TEST_RESULTS_PROMPT, ) from utils import parse_action, parse_file_content, read_python_module_structure from datetime import datetime now = datetime.now() date_time_str = now.strftime("%Y-%m-%d %H:%M:%S") client = InferenceClient( "mistralai/Mixtral-8x7B-Instruct-v0.1" ) ############################################ VERBOSE = True MAX_HISTORY = 100 #MODEL = "gpt-3.5-turbo" # "gpt-4" def format_prompt(message, history): prompt = "" for user_prompt, bot_response in history: prompt += f"[INST] {user_prompt} [/INST]" prompt += f" {bot_response} " prompt += f"[INST] {message} [/INST]" return prompt def run_gpt( prompt_template, stop_tokens, max_tokens, module_summary, purpose, **prompt_kwargs, ): seed = random.randint(1,1111111111111111) generate_kwargs = dict( temperature=0.9, max_new_tokens=1048, top_p=0.95, repetition_penalty=1.0, do_sample=True, seed=seed, ) content = PREFIX.format( date_time_str=date_time_str, purpose=purpose, safe_search=safe_search, ) + prompt_template.format(**prompt_kwargs) if VERBOSE: print(LOG_PROMPT.format(content)) #formatted_prompt = format_prompt(f"{system_prompt}, {prompt}", history) #formatted_prompt = format_prompt(f'{content}', history) stream = client.text_generation(content, **generate_kwargs, stream=True, details=True, return_full_text=False) resp = "" for response in stream: resp += response.token.text if VERBOSE: print(LOG_RESPONSE.format(resp)) return resp def compress_history(purpose, task, history, directory): module_summary, _, _ = read_python_module_structure(directory) resp = run_gpt( COMPRESS_HISTORY_PROMPT, stop_tokens=["observation:", "task:", "action:", "thought:"], max_tokens=512, module_summary=module_summary, purpose=purpose, task=task, history=history, ) history = "observation: {}\n".format(resp) return history def call_search(purpose, task, history, directory, action_input): print("CALLING SEARCH") try: if "http" in action_input: if "<" in action_input: action_input = action_input.strip("<") if ">" in action_input: action_input = action_input.strip(">") response = i_s(action_input) #response = google(search_return) print(response) history += "observation: search result is: {}\n".format(response) else: history += "observation: I need to provide a valid URL to 'action: SEARCH action_input=URL'\n" except Exception as e: history += "observation: {}'\n".format(e) return "MAIN", None, history, task def call_main(purpose, task, history, directory, action_input): module_summary, _, _ = read_python_module_structure(directory) resp = run_gpt( ACTION_PROMPT, stop_tokens=["observation:", "task:"], max_tokens=256, module_summary=module_summary, purpose=purpose, task=task, history=history, ) lines = resp.strip().strip("\n").split("\n") for line in lines: if line == "": continue if line.startswith("thought: "): history += "{}\n".format(line) elif line.startswith("action: "): action_name, action_input = parse_action(line) print (f'ACTION_NAME :: {action_name}') print (f'ACTION_INPUT :: {action_input}') history += "{}\n".format(line) if "COMPLETE" in action_name or "COMPLETE" in action_input: task = "END" return action_name, action_input, history, task else: return action_name, action_input, history, task else: history += "{}\n".format(line) #history += "observation: the following command did not produce any useful output: '{}', I need to check the commands syntax, or use a different command\n".format(line) #return action_name, action_input, history, task #assert False, "unknown action: {}".format(line) return "MAIN", None, history, task def call_test(purpose, task, history, directory, action_input): result = subprocess.run( ["python", "-m", "pytest", "--collect-only", directory], capture_output=True, text=True, ) if result.returncode != 0: history += "observation: there are no tests! Test should be written in a test folder under {}\n".format( directory ) return "MAIN", None, history, task result = subprocess.run( ["python", "-m", "pytest", directory], capture_output=True, text=True ) if result.returncode == 0: history += "observation: tests pass\n" return "MAIN", None, history, task module_summary, content, _ = read_python_module_structure(directory) resp = run_gpt( UNDERSTAND_TEST_RESULTS_PROMPT, stop_tokens=[], max_tokens=256, module_summary=module_summary, purpose=purpose, task=task, history=history, stdout=result.stdout[:5000], # limit amount of text stderr=result.stderr[:5000], # limit amount of text ) history += "observation: tests failed: {}\n".format(resp) return "MAIN", None, history, task def call_set_task(purpose, task, history, directory, action_input): module_summary, content, _ = read_python_module_structure(directory) task = run_gpt( TASK_PROMPT, stop_tokens=[], max_tokens=64, module_summary=module_summary, purpose=purpose, task=task, history=history, ).strip("\n") history += "observation: task has been updated to: {}\n".format(task) return "MAIN", None, history, task def call_read(purpose, task, history, directory, action_input): if not os.path.exists(action_input): history += "observation: file does not exist\n" return "MAIN", None, history, task module_summary, content, _ = read_python_module_structure(directory) f_content = ( content[action_input] if content[action_input] else "< document is empty >" ) resp = run_gpt( READ_PROMPT, stop_tokens=[], max_tokens=256, module_summary=module_summary, purpose=purpose, task=task, history=history, file_path=action_input, file_contents=f_content, ).strip("\n") history += "observation: {}\n".format(resp) return "MAIN", None, history, task def call_modify(purpose, task, history, directory, action_input): if not os.path.exists(action_input): history += "observation: file does not exist\n" return "MAIN", None, history, task ( module_summary, content, _, ) = read_python_module_structure(directory) f_content = ( content[action_input] if content[action_input] else "< document is empty >" ) resp = run_gpt( MODIFY_PROMPT, stop_tokens=["action:", "thought:", "observation:"], max_tokens=2048, module_summary=module_summary, purpose=purpose, task=task, history=history, file_path=action_input, file_contents=f_content, ) new_contents, description = parse_file_content(resp) if new_contents is None: history += "observation: failed to modify file\n" return "MAIN", None, history, task with open(action_input, "w") as f: f.write(new_contents) history += "observation: file successfully modified\n" history += "observation: {}\n".format(description) return "MAIN", None, history, task def call_add(purpose, task, history, directory, action_input): d = os.path.dirname(action_input) if not d.startswith(directory): history += "observation: files must be under directory {}\n".format(directory) elif not action_input.endswith(".py"): history += "observation: can only write .py files\n" else: if d and not os.path.exists(d): os.makedirs(d) if not os.path.exists(action_input): module_summary, _, _ = read_python_module_structure(directory) resp = run_gpt( ADD_PROMPT, stop_tokens=["action:", "thought:", "observation:"], max_tokens=2048, module_summary=module_summary, purpose=purpose, task=task, history=history, file_path=action_input, ) new_contents, description = parse_file_content(resp) if new_contents is None: history += "observation: failed to write file\n" return "MAIN", None, history, task with open(action_input, "w") as f: f.write(new_contents) history += "observation: file successfully written\n" history += "obsertation: {}\n".format(description) else: history += "observation: file already exists\n" return "MAIN", None, history, task def end_fn(purpose, task, history, directory, action_input): task = "END" return "COMPLETE", None, history, task NAME_TO_FUNC = { "MAIN": call_main, "UPDATE-TASK": call_set_task, "SEARCH": call_search, "COMPLETE": end_fn, } def run_action(purpose, task, history, directory, action_name, action_input): if "RESPONSE" in action_name: task="END" return action_name, action_input, history, task # compress the history when it is long if len(history.split("\n")) > MAX_HISTORY: if VERBOSE: print("COMPRESSING HISTORY") history = compress_history(purpose, task, history, directory) assert action_name in NAME_TO_FUNC print("RUN: ", action_name, action_input) return NAME_TO_FUNC[action_name](purpose, task, history, directory, action_input) def run(purpose,hist): print(purpose) print(hist) task=None directory="./" history = "" action_name = "UPDATE-TASK" if task is None else "MAIN" action_input = None while True: print("") print("") print("---") print("purpose:", purpose) print("task:", task) print("---") print(history) print("---") action_name, action_input, history, task = run_action( purpose, task, history, directory, action_name, action_input, ) if task == "END": return history ################################################ def format_prompt(message, history): prompt = "" for user_prompt, bot_response in history: prompt += f"[INST] {user_prompt} [/INST]" prompt += f" {bot_response} " prompt += f"[INST] {message} [/INST]" return prompt agents =[ "WEB_DEV", "AI_SYSTEM_PROMPT", "PYTHON_CODE_DEV" ] def generate( prompt, history, agent_name=agents[0], sys_prompt="", temperature=0.9, max_new_tokens=256, top_p=0.95, repetition_penalty=1.0, ): seed = random.randint(1,1111111111111111) agent=prompts.WEB_DEV if agent_name == "WEB_DEV": agent = prompts.WEB_DEV if agent_name == "AI_SYSTEM_PROMPT": agent = prompts.AI_SYSTEM_PROMPT if agent_name == "PYTHON_CODE_DEV": agent = prompts.PYTHON_CODE_DEV system_prompt=agent temperature = float(temperature) if temperature < 1e-2: temperature = 1e-2 top_p = float(top_p) generate_kwargs = dict( temperature=temperature, max_new_tokens=max_new_tokens, top_p=top_p, repetition_penalty=repetition_penalty, do_sample=True, seed=seed, ) formatted_prompt = format_prompt(f"{system_prompt}, {prompt}", history) stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) output = "" for response in stream: output += response.token.text yield output return output additional_inputs=[ gr.Dropdown( label="Agents", choices=[s for s in agents], value=agents[0], interactive=True, ), gr.Textbox( label="System Prompt", max_lines=1, interactive=True, ), gr.Slider( label="Temperature", value=0.9, minimum=0.0, maximum=1.0, step=0.05, interactive=True, info="Higher values produce more diverse outputs", ), gr.Slider( label="Max new tokens", value=1048*10, minimum=0, maximum=1048*10, step=64, interactive=True, info="The maximum numbers of new tokens", ), gr.Slider( label="Top-p (nucleus sampling)", value=0.90, minimum=0.0, maximum=1, step=0.05, interactive=True, info="Higher values sample more low-probability tokens", ), gr.Slider( label="Repetition penalty", value=1.2, minimum=1.0, maximum=2.0, step=0.05, interactive=True, info="Penalize repeated tokens", ), ] examples=[["I'm planning a vacation to Japan. Can you suggest a one-week itinerary including must-visit places and local cuisines to try?", None, None, None, None, None, ], ["Can you write a short story about a time-traveling detective who solves historical mysteries?", None, None, None, None, None,], ["I'm trying to learn French. Can you provide some common phrases that would be useful for a beginner, along with their pronunciations?", None, None, None, None, None,], ["I have chicken, rice, and bell peppers in my kitchen. Can you suggest an easy recipe I can make with these ingredients?", None, None, None, None, None,], ["Can you explain how the QuickSort algorithm works and provide a Python implementation?", None, None, None, None, None,], ["What are some unique features of Rust that make it stand out compared to other systems programming languages like C++?", None, None, None, None, None,], ] gr.ChatInterface( fn=run, chatbot=gr.Chatbot(show_label=False, show_share_button=False, show_copy_button=True, likeable=True, layout="panel"), title="Mixtral 46.7B\nMicro-Agent\nInternet Search", examples=examples, concurrency_limit=20, ).launch(show_api=False)