"""Streamlit front-end for a small LLM-driven code assistant.

The sidebar collects a purpose, task, working directory and action input;
the selected action handler is run once and the resulting history text is
displayed.
"""

import json
import os
import random
import subprocess
import sys
import threading
import time

import gradio as gr
import streamlit as st
from huggingface_hub import InferenceClient
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer

from agent.prompts import (
    AI_SYSTEM_PROMPT,
    CODE_REVIEW_ASSISTANT,
    CONTENT_WRITER_EDITOR,
    PYTHON_CODE_DEV,
    WEB_DEV,
    QUESTION_GENERATOR,
    HUGGINGFACE_FILE_DEV,
)
from agent.utils import parse_action, parse_file_content, read_python_module_structure

# NOTE(review): PREFIX, LOG_PROMPT, LOG_RESPONSE, COMPRESS_HISTORY_PROMPT,
# ACTION_PROMPT, UNDERSTAND_TEST_RESULTS_PROMPT, TASK_PROMPT, READ_PROMPT,
# MODIFY_PROMPT and ADD_PROMPT are used below but are not among the names
# imported in this file. Confirm where they are defined; until then every
# handler that reaches run_gpt() raises NameError.

# Initialize the session state
if 'current_state' not in st.session_state:
    st.session_state.current_state = None

# Initialize the InferenceClient for Mixtral-8x7B-Instruct-v0.1
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")

# Hugging Face model and tokenizer setup.
# Only GPT-2 is loaded: loading a second, larger checkpoint first and then
# rebinding model/tokenizer to GPT-2 only costs download time and memory.
model_name = "gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
generator = pipeline('text-generation', model=model, tokenizer=tokenizer)

VERBOSE = False
MAX_HISTORY = 100


def _truncate_at_stop(text, stop_tokens):
    """Return ``text`` cut just before the earliest occurrence of a stop token.

    Empty or missing stop tokens are ignored; if none occurs, ``text`` is
    returned unchanged.
    """
    cut = len(text)
    for token in stop_tokens or ():
        if not token:
            continue
        position = text.find(token)
        if position != -1:
            cut = min(cut, position)
    return text[:cut]


def run_gpt(prompt_template, stop_tokens, max_tokens, module_summary, purpose, **prompt_kwargs):
    """Format a prompt, run the local text-generation pipeline and return the completion.

    Args:
        prompt_template: Format string filled with ``prompt_kwargs``.
        stop_tokens: Strings at which the completion is cut off.
        max_tokens: Maximum number of newly generated tokens.
        module_summary: Inserted into the ``PREFIX`` template.
        purpose: Inserted into the ``PREFIX`` template.
        **prompt_kwargs: Values substituted into ``prompt_template``.

    Returns:
        The generated continuation (without the prompt), truncated at the
        first stop token.
    """
    content = PREFIX.format(
        module_summary=module_summary,
        purpose=purpose,
    ) + prompt_template.format(**prompt_kwargs)
    if VERBOSE:
        st.write(LOG_PROMPT.format(content))

    # The pipeline has no ``stop`` parameter (unknown kwargs are forwarded to
    # generate(), which rejects them), so stop handling is done on the output.
    # ``max_new_tokens`` bounds the completion only, and
    # ``return_full_text=False`` keeps the prompt out of the parsed response.
    outputs = generator(content, max_new_tokens=max_tokens, return_full_text=False)
    resp = _truncate_at_stop(outputs[0]["generated_text"], stop_tokens)

    if VERBOSE:
        st.write(LOG_RESPONSE.format(resp))
    return resp


def compress_history(purpose, task, history, directory):
    """Replace ``history`` with a single model-written observation line."""
    module_summary, _, _ = read_python_module_structure(directory)
    summary = run_gpt(
        COMPRESS_HISTORY_PROMPT,
        stop_tokens=["observation:", "task:", "action:", "thought:"],
        max_tokens=512,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
    )
    return "observation: {}\n".format(summary)


def call_main(purpose, task, history, directory, action_input):
    """Ask the model for the next step and parse its thought/action lines.

    Returns:
        ``(action_name, action_input, history, task)``. Falls back to
        ``("MAIN", None, history, task)`` when no ``action:`` line appears.

    Raises:
        AssertionError: If a non-empty line is neither a thought nor an action.
    """
    module_summary, _, _ = read_python_module_structure(directory)
    resp = run_gpt(
        ACTION_PROMPT,
        stop_tokens=["observation:", "task:"],
        max_tokens=256,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
    )
    for line in resp.strip().split("\n"):
        if line == "":
            continue
        if line.startswith("thought: "):
            history += "{}\n".format(line)
        elif line.startswith("action: "):
            action_name, action_input = parse_action(line)
            history += "{}\n".format(line)
            return action_name, action_input, history, task
        else:
            # Raised explicitly: an ``assert`` statement would be stripped
            # when Python runs with -O.
            raise AssertionError("unknown action: {}".format(line))
    return "MAIN", None, history, task


def _run_pytest(*args):
    """Run pytest with ``args`` under the current interpreter, capturing text output."""
    return subprocess.run(
        [sys.executable, "-m", "pytest", *args],
        capture_output=True,
        text=True,
    )


def call_test(purpose, task, history, directory, action_input):
    """Run the pytest suite under ``directory`` and record the outcome in ``history``.

    On failure the (truncated) pytest output is passed to the model and its
    reply is stored as the observation.
    """
    collected = _run_pytest("--collect-only", directory)
    if collected.returncode != 0:
        history += "observation: there are no tests! Test should be written in a test folder under {}\n".format(
            directory
        )
        return "MAIN", None, history, task

    result = _run_pytest(directory)
    if result.returncode == 0:
        history += "observation: tests pass\n"
        return "MAIN", None, history, task

    module_summary, _, _ = read_python_module_structure(directory)
    resp = run_gpt(
        UNDERSTAND_TEST_RESULTS_PROMPT,
        stop_tokens=[],
        max_tokens=256,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
        stdout=result.stdout[:5000],  # limit amount of text
        stderr=result.stderr[:5000],  # limit amount of text
    )
    history += "observation: tests failed: {}\n".format(resp)
    return "MAIN", None, history, task


def call_set_task(purpose, task, history, directory, action_input):
    """Ask the model for a new task and record the update in ``history``."""
    module_summary, _, _ = read_python_module_structure(directory)
    task = run_gpt(
        TASK_PROMPT,
        stop_tokens=[],
        max_tokens=64,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
    ).strip("\n")
    history += "observation: task has been updated to: {}\n".format(task)
    return "MAIN", None, history, task


def call_read(purpose, task, history, directory, action_input):
    """Have the model comment on the file at ``action_input`` and record the reply.

    Records "file does not exist" when the path is missing on disk.
    """
    if not os.path.exists(action_input):
        history += "observation: file does not exist\n"
        return "MAIN", None, history, task

    module_summary, content, _ = read_python_module_structure(directory)
    f_content = content[action_input] or "< document is empty >"
    resp = run_gpt(
        READ_PROMPT,
        stop_tokens=[],
        max_tokens=256,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
        file_path=action_input,
        file_contents=f_content,
    ).strip("\n")
    history += "observation: {}\n".format(resp)
    return "MAIN", None, history, task


def call_modify(purpose, task, history, directory, action_input):
    """Overwrite the existing file at ``action_input`` with model-generated content.

    The file is only written when ``parse_file_content`` yields new contents.
    """
    if not os.path.exists(action_input):
        history += "observation: file does not exist\n"
        return "MAIN", None, history, task

    module_summary, content, _ = read_python_module_structure(directory)
    f_content = content[action_input] or "< document is empty >"
    resp = run_gpt(
        MODIFY_PROMPT,
        stop_tokens=["action:", "thought:", "observation:"],
        max_tokens=2048,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
        file_path=action_input,
        file_contents=f_content,
    )
    new_contents, description = parse_file_content(resp)
    if new_contents is None:
        history += "observation: failed to modify file\n"
        return "MAIN", None, history, task

    with open(action_input, "w") as f:
        f.write(new_contents)

    history += "observation: file successfully modified\n"
    history += "observation: {}\n".format(description)
    return "MAIN", None, history, task


def call_add(purpose, task, history, directory, action_input):
    """Create a new ``.py`` file at ``action_input`` with model-generated content.

    Refuses paths whose directory does not start with ``directory``, non-``.py``
    paths, and files that already exist.
    """
    parent = os.path.dirname(action_input)

    # NOTE(review): this is a plain string-prefix test, so sibling paths such
    # as "<directory>_other/x.py" or paths containing ".." also pass; consider
    # a normalized-path containment check.
    if not parent.startswith(directory):
        history += "observation: files must be under directory {}\n".format(directory)
        return "MAIN", None, history, task
    if not action_input.endswith(".py"):
        history += "observation: can only write .py files\n"
        return "MAIN", None, history, task

    if parent and not os.path.exists(parent):
        os.makedirs(parent)

    if os.path.exists(action_input):
        history += "observation: file already exists\n"
        return "MAIN", None, history, task

    module_summary, _, _ = read_python_module_structure(directory)
    resp = run_gpt(
        ADD_PROMPT,
        stop_tokens=["action:", "thought:", "observation:"],
        max_tokens=2048,
        module_summary=module_summary,
        purpose=purpose,
        task=task,
        history=history,
        file_path=action_input,
    )
    new_contents, description = parse_file_content(resp)
    if new_contents is None:
        history += "observation: failed to write file\n"
        return "MAIN", None, history, task

    with open(action_input, "w") as f:
        f.write(new_contents)

    history += "observation: file successfully written\n"
    history += "observation: {}\n".format(description)
    return "MAIN", None, history, task


# Maps each UI action label to its handler; insertion order is the order
# shown in the select box.
ACTION_HANDLERS = {
    "main": call_main,
    "test": call_test,
    "set_task": call_set_task,
    "read": call_read,
    "modify": call_modify,
    "add": call_add,
}

# Streamlit UI
st.title("AI Powered Code Assistant")

with st.sidebar:
    st.header("Task Configuration")
    purpose = st.text_input("Purpose")
    task = st.text_input("Task")
    directory = st.text_input("Directory")
    action_input = st.text_input("Action Input")
    action = st.selectbox("Action", list(ACTION_HANDLERS))

if st.button("Run Action"):
    # Each click starts from an empty history and runs one handler.
    history = ""
    action_name, action_input, history, task = ACTION_HANDLERS[action](
        purpose, task, history, directory, action_input
    )
    st.subheader("History")
    st.write(history)