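# NOTE: the next three lines are Hugging Face Spaces page-header residue captured
# with the file ("Spaces: Sleeping Sleeping"); they are not part of the program.
# Spaces: Sleeping / Sleeping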
import os | |
import subprocess | |
import random | |
from huggingface_hub import InferenceClient | |
import gradio as gr | |
from safe_search import safe_search | |
from i_search import google | |
from i_search import i_search as i_s | |
from agent import ( | |
ACTION_PROMPT, | |
ADD_PROMPT, | |
COMPRESS_HISTORY_PROMPT, | |
LOG_PROMPT, | |
LOG_RESPONSE, | |
MODIFY_PROMPT, | |
PREFIX, | |
SEARCH_QUERY, | |
READ_PROMPT, | |
TASK_PROMPT, | |
UNDERSTAND_TEST_RESULTS_PROMPT, | |
) | |
from utils import parse_action, parse_file_content, read_python_module_structure | |
from datetime import datetime | |
import yaml | |
import logging | |
# Create a directory for logs if it doesn't exist.
# ``exist_ok=True`` avoids the check-then-create race of
# ``if not os.path.exists(...): os.makedirs(...)``.
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)

# Configure logging: append INFO-and-above records to logs/gradio_log.txt.
logging.basicConfig(
    filename=os.path.join(log_dir, "gradio_log.txt"),
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    filemode="a+",
)

# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)
# Load custom prompts.
# Every key the rest of the file looks up is given an empty default so a
# partial (or missing, or empty) custom_prompts.yaml cannot cause KeyErrors.
_DEFAULT_PROMPTS = {
    "WEB_DEV": "",
    "AI_SYSTEM_PROMPT": "",
    "PYTHON_CODE_DEV": "",
    "CODE_GENERATION": "",
    "CODE_INTERPRETATION": "",
    "CODE_TRANSLATION": "",
    "CODE_IMPLEMENTATION": "",
}
try:
    with open('custom_prompts.yaml', 'r', encoding='utf-8') as fp:
        # safe_load only builds plain Python data; the file holds prompt
        # strings, so the full loader's extra object construction is not needed.
        _loaded_prompts = yaml.safe_load(fp)
except FileNotFoundError:
    _loaded_prompts = None
if not isinstance(_loaded_prompts, dict):
    # An empty file parses to None; anything that is not a mapping is ignored.
    _loaded_prompts = {}
custom_prompts = {**_DEFAULT_PROMPTS, **_loaded_prompts}

# Expose each prompt as a module-level name (update_sys_prompt reads them via
# globals()).  Keys that are not valid identifiers, or that would overwrite a
# non-string global such as an imported module, a function or the inference
# client, are skipped instead of silently clobbering that object.
for key, val in custom_prompts.items():
    _existing = globals().get(key)
    _name_ok = isinstance(key, str) and key.isidentifier()
    _target_ok = _existing is None or isinstance(_existing, str)
    if not (_name_ok and _target_ok):
        logger.warning("Skipping custom prompt %r: not a safe module-level name", key)
        continue
    globals()[key] = val
# Define advanced prompts.
# Each constant below is a system-prompt text for one assistant persona.  They
# are assigned unconditionally, so they replace any same-named value that was
# exported from custom_prompts.yaml earlier in this module.

# Persona: code generation from requirements.
CODE_GENERATION = """
You are an expert AI code generation assistant. Your task is to generate high-quality, production-ready code based on the given requirements. You should be able to generate code in various programming languages, including Python, JavaScript, Java, C++, and more.
When generating code, follow these guidelines:
1. Understand the requirements thoroughly and ask clarifying questions if needed.
2. Write clean, modular, and maintainable code following best practices and industry standards.
3. Implement proper error handling, input validation, and edge case handling.
4. Optimize the code for performance and scalability when necessary.
5. Provide clear and concise comments to explain the code's functionality and logic.
6. If applicable, suggest and implement testing strategies (unit tests, integration tests, etc.).
7. Ensure the generated code is compatible with the target environment (e.g., web, mobile, desktop).
8. Provide examples or usage instructions if required.
Remember to always prioritize code quality, maintainability, and security. Your generated code should be ready for production use or further development.
"""
# Persona: explaining and reviewing existing code.
CODE_INTERPRETATION = """
You are an expert AI code interpretation assistant. Your task is to analyze and explain existing code in various programming languages, including Python, JavaScript, Java, C++, and more.
When interpreting code, follow these guidelines:
1. Read and understand the code thoroughly, including its functionality, logic, and structure.
2. Identify and explain the purpose of each code block, function, or module.
3. Highlight any potential issues, inefficiencies, or areas for improvement.
4. Suggest refactoring or optimization techniques if applicable.
5. Explain the code's input and output, as well as any dependencies or external libraries used.
6. Provide clear and concise explanations, using code comments or separate documentation.
7. If applicable, explain the testing strategies or methodologies used in the code.
8. Ensure your interpretations are accurate, unbiased, and tailored to the target audience's skill level.
Remember to prioritize clarity, accuracy, and completeness in your code interpretations. Your explanations should help developers understand the code's functionality and potential areas for improvement.
"""
# Persona: translating code between programming languages.
CODE_TRANSLATION = """
You are an expert AI code translation assistant. Your task is to translate code from one programming language to another, ensuring the translated code maintains the original functionality and follows best practices in the target language.
When translating code, follow these guidelines:
1. Understand the original code's functionality, logic, and structure thoroughly.
2. Identify and translate all code elements, including variables, functions, classes, and data structures.
3. Ensure the translated code adheres to the coding conventions and best practices of the target language.
4. Optimize the translated code for performance and readability in the target language.
5. Preserve comments and documentation, translating them to the target language if necessary.
6. Handle any language-specific features or constructs appropriately during the translation process.
7. Implement error handling, input validation, and edge case handling in the translated code.
8. Provide clear and concise comments or documentation to explain any necessary changes or deviations from the original code.
Remember to prioritize accuracy, maintainability, and idiomatic usage in the target language. Your translated code should be functionally equivalent to the original code while adhering to the best practices of the target language.
"""
# Persona: integrating and deploying code in a production environment.
CODE_IMPLEMENTATION = """
You are an expert AI code implementation assistant. Your task is to take existing code or requirements and implement them in a production-ready environment, ensuring proper integration, deployment, and maintenance.
When implementing code, follow these guidelines:
1. Understand the code's functionality, dependencies, and requirements thoroughly.
2. Set up the appropriate development environment, including installing necessary tools, libraries, and frameworks.
3. Integrate the code with existing systems, APIs, or databases, if applicable.
4. Implement proper configuration management, version control, and continuous integration/deployment processes.
5. Ensure the code is properly tested, including unit tests, integration tests, and end-to-end tests.
6. Optimize the code for performance, scalability, and security in the production environment.
7. Implement monitoring, logging, and error handling mechanisms for the deployed code.
8. Document the implementation process, including any specific configurations, deployment steps, or maintenance procedures.
Remember to prioritize reliability, maintainability, and scalability in your code implementations. Your implementations should be production-ready, well-documented, and aligned with industry best practices for software development and deployment.
"""
# Register the four built-in persona prompts in custom_prompts, replacing any
# entries of the same name that were loaded earlier.
custom_prompts.update(
    CODE_GENERATION=CODE_GENERATION,
    CODE_INTERPRETATION=CODE_INTERPRETATION,
    CODE_TRANSLATION=CODE_TRANSLATION,
    CODE_IMPLEMENTATION=CODE_IMPLEMENTATION,
)
# Timestamp captured once, when this module is imported.
now = datetime.now()
date_time_str = now.strftime("%Y-%m-%d %H:%M:%S")
# Shared Hugging Face inference client used for every text-generation call below.
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
############################################
# When True, run_gpt prints the full prompt and response; run_action prints
# a notice when it compresses the history.
VERBOSE = True
# Number of newline-separated history lines above which run_action compresses
# the history.
MAX_HISTORY = 125
def format_prompt(message, history):
    """Build the ``[INST]``-tagged prompt string from prior turns plus a new message.

    Args:
        message: The new user message, placed in the final ``[INST]`` block.
        history: Iterable of ``(user_prompt, bot_response)`` pairs, oldest first.

    Returns:
        ``"<s>"`` followed by one ``[INST] … [/INST] …</s> `` segment per past
        turn and a trailing ``[INST] message [/INST]``.
    """
    past_turns = [
        f"[INST] {user_prompt} [/INST] {bot_response}</s> "
        for user_prompt, bot_response in history
    ]
    return "<s>" + "".join(past_turns) + f"[INST] {message} [/INST]"
def run_gpt(prompt_template, stop_tokens, max_tokens, purpose, **prompt_kwargs):
    """Render a prompt, stream a completion from the inference client, return its text.

    Args:
        prompt_template: ``str.format`` template filled with ``prompt_kwargs``.
        stop_tokens: Accepted for interface compatibility; not forwarded to
            the client by this function.
        max_tokens: Accepted for interface compatibility; the request always
            uses ``max_new_tokens=2096``.
        purpose: Inserted into ``PREFIX`` ahead of the rendered template.
        **prompt_kwargs: Values substituted into ``prompt_template``.

    Returns:
        The concatenated text of every streamed token.
    """
    seed = random.randint(1, 1111111111111111)
    print(seed)
    generate_kwargs = dict(
        temperature=1.0,
        max_new_tokens=2096,
        top_p=0.99,
        repetition_penalty=1.7,
        do_sample=True,
        seed=seed,
    )
    # Stamp the prompt with the time of *this* call.  The module-level
    # ``date_time_str`` is computed once at import and would be stale in a
    # long-running process.
    content = PREFIX.format(
        date_time_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        purpose=purpose,
        safe_search=safe_search,
    ) + prompt_template.format(**prompt_kwargs)
    if VERBOSE:
        print(LOG_PROMPT.format(content))
    stream = client.text_generation(
        content, **generate_kwargs, stream=True, details=True, return_full_text=False
    )
    # With stream=True and details=True each item exposes ``.token.text``.
    resp = "".join(chunk.token.text for chunk in stream)
    if VERBOSE:
        print(LOG_RESPONSE.format(resp))
    return resp
def compress_history(purpose, task, history, directory):
    """Replace the history with a single model-written observation line.

    Sends ``history`` through ``COMPRESS_HISTORY_PROMPT`` via ``run_gpt`` and
    returns ``"observation: <model reply>\\n"``.  ``directory`` is unused.
    """
    summary = run_gpt(
        COMPRESS_HISTORY_PROMPT,
        stop_tokens=["observation:", "task:", "action:", "thought:"],
        max_tokens=5096,
        purpose=purpose,
        task=task,
        history=history,
    )
    return "observation: {}\n".format(summary)
def call_search(purpose, task, history, directory, action_input):
    """Run the SEARCH action and append the outcome to the history.

    Args:
        purpose: Unused; kept so all action handlers share one signature.
        task: Current task, returned unchanged.
        history: Running history text; an ``observation:`` line is appended.
        directory: Unused.
        action_input: Expected to contain a URL, optionally wrapped in ``<>``.

    Returns:
        ``("MAIN", None, history, task)`` so that control goes back to the
        main action handler.
    """
    print("CALLING SEARCH")
    try:
        # A missing input is handled like any other non-URL input instead of
        # surfacing as an opaque "NoneType is not iterable" observation.
        if action_input is not None and "http" in action_input:
            # Drop angle brackets from either end in one pass.
            action_input = action_input.strip("<>")
            response = i_s(action_input)
            print(response)
            history += "observation: search result is: {}\n".format(response)
        else:
            history += "observation: I need to provide a valid URL to 'action: SEARCH action_input=https://URL'\n"
    except Exception as e:
        # Best effort: report the failure to the agent rather than crash the
        # loop (the stray trailing quote in the original message is removed).
        history += "observation: {}\n".format(e)
    return "MAIN", None, history, task
def call_main(purpose, task, history, directory, action_input):
    """Ask the model for the next step and return the first action it names.

    The reply to ``ACTION_PROMPT`` is read line by line.  Every non-empty line
    is appended to ``history``.  The first line starting with ``"action: "``
    is parsed with ``parse_action`` and returned immediately; if the parsed
    name or input contains ``"COMPLETE"`` the task becomes ``"END"``.  When
    no action line is found the result is ``("MAIN", None, history, task)``.
    ``directory`` and the incoming ``action_input`` are unused.
    """
    reply = run_gpt(
        ACTION_PROMPT,
        stop_tokens=["observation:", "task:", "action:", "thought:"],
        max_tokens=5096,
        purpose=purpose,
        task=task,
        history=history,
    )
    for entry in reply.strip().strip("\n").split("\n"):
        if entry == "":
            continue
        if entry.startswith("action: "):
            action_name, action_input = parse_action(entry)
            print(f'ACTION_NAME :: {action_name}')
            print(f'ACTION_INPUT :: {action_input}')
            history += "{}\n".format(entry)
            if "COMPLETE" in action_name or "COMPLETE" in action_input:
                task = "END"
            return action_name, action_input, history, task
        # "thought: " lines and any other text are recorded the same way.
        history += "{}\n".format(entry)
    return "MAIN", None, history, task
def call_set_task(purpose, task, history, directory, action_input):
    """Ask the model for an updated task and record it in the history.

    Returns ``("MAIN", None, history, task)`` with the new task text (edge
    newlines stripped) and an observation line appended to ``history``.
    ``directory`` and ``action_input`` are unused.
    """
    model_reply = run_gpt(
        TASK_PROMPT,
        stop_tokens=[],
        max_tokens=2048,
        purpose=purpose,
        task=task,
        history=history,
    )
    task = model_reply.strip("\n")
    history = history + "observation: task has been updated to: {}\n".format(task)
    return "MAIN", None, history, task
def end_fn(purpose, task, history, directory, action_input):
    """Terminal action handler: signal completion and mark the task ``"END"``.

    Only ``history`` is passed through; every other argument is ignored.
    """
    return "COMPLETE", "COMPLETE", history, "END"
# Dispatch table used by run_action: action name -> handler.  Every handler
# takes (purpose, task, history, directory, action_input) and returns
# (next_action_name, next_action_input, history, task).
NAME_TO_FUNC = {
    "MAIN": call_main,
    "UPDATE-TASK": call_set_task,
    "SEARCH": call_search,
    "COMPLETE": end_fn,
}
def run_action(purpose, task, history, directory, action_name, action_input):
    """Dispatch one agent step to the handler registered for ``action_name``.

    Args:
        purpose: Overall goal, forwarded to the handler.
        task: Current task, forwarded to the handler.
        history: Running history text; compressed first when it has more than
            ``MAX_HISTORY`` lines.
        directory: Forwarded to the handler.
        action_name: Name of the action to run.  ``None``, ``""`` and unknown
            names fall back to ``"MAIN"``; names containing ``"RESPONSE"`` or
            ``"COMPLETE"`` end the run.
        action_input: Forwarded to the handler.

    Returns:
        ``(next_action_name, next_action_input, history, task)``.  If the
        handler raises, an observation is appended to ``history`` and
        ``("MAIN", None, history, task)`` is returned.
    """
    print(f'action_name::{action_name}')
    try:
        # Normalise first: the substring tests below raise TypeError on None,
        # which used to be reported to the agent as a failed command.
        if not action_name:
            action_name = "MAIN"
        if "RESPONSE" in action_name or "COMPLETE" in action_name:
            return "COMPLETE", "COMPLETE", history, "END"
        # compress the history when it is long
        if len(history.split("\n")) > MAX_HISTORY:
            if VERBOSE:
                print("COMPRESSING HISTORY")
            history = compress_history(purpose, task, history, directory)
        if action_name not in NAME_TO_FUNC:
            action_name = "MAIN"
        print("RUN: ", action_name, action_input)
        return NAME_TO_FUNC[action_name](purpose, task, history, directory, action_input)
    except Exception:
        # Keep the agent loop alive, but leave a trace of what went wrong.
        logger.exception("Action %r failed", action_name)
        history += "observation: the previous command did not produce any useful output, I need to check the commands syntax, or use a different command\n"
        return "MAIN", None, history, task
def run(purpose, history):
    """Generator driving the agent loop; yields the history after every step.

    Starts with the ``"UPDATE-TASK"`` action and feeds each step's result back
    into ``run_action`` until a step sets the task to ``"END"``.  A truthy
    ``history`` is converted with ``str()`` and stripped of leading/trailing
    square brackets; a falsy one becomes ``""``.
    """
    task = None
    directory = "./"
    history = str(history).strip("[]") if history else ""
    action_name = "UPDATE-TASK" if task is None else "MAIN"
    action_input = None
    while True:
        # Console trace of the current state before each step.
        print("", "", "---", sep="\n")
        print("purpose:", purpose)
        print("task:", task)
        print("---", history, "---", sep="\n")
        step_result = run_action(
            purpose,
            task,
            history,
            directory,
            action_name,
            action_input,
        )
        action_name, action_input, history, task = step_result
        yield history
        if task == "END":
            return history
################################################ | |
# Agent names offered in the UI; each is looked up as a key of custom_prompts.
agents = [
    "WEB_DEV",
    "AI_SYSTEM_PROMPT",
    "PYTHON_CODE_DEV",
]


def generate(
    prompt, history, agent_name=agents[0], sys_prompt="", temperature=0.9,
    max_new_tokens=256, top_p=0.95, repetition_penalty=1.7,
):
    """Generate one chat reply from the inference client.

    Args:
        prompt: The new user message.
        history: Iterable of ``(user, bot)`` pairs from earlier turns; ``None``
            is treated as empty.
        agent_name: Key into ``custom_prompts`` selecting the system prompt.
            Unknown names fall back to the first entry of ``agents``.
        sys_prompt: Explicit system prompt; when non-empty it takes precedence
            over the agent's prompt.
        temperature: Sampling temperature, clamped to at least ``0.01``.
        max_new_tokens: Upper bound on generated tokens (at least 1).
        top_p: Nucleus-sampling probability mass.
        repetition_penalty: Penalty applied to repeated tokens.

    ``None`` for any numeric argument selects that argument's default.

    Returns:
        The generated text, without the prompt echoed back.
    """
    seed = random.randint(1, 1111111111111111)

    # The original body read from an undefined name ``prompts``; the loaded
    # prompt table is ``custom_prompts``.
    system_prompt = (
        sys_prompt
        or custom_prompts.get(agent_name)
        or custom_prompts.get(agents[0], "")
    )

    temperature = max(float(0.9 if temperature is None else temperature), 1e-2)
    top_p = float(0.95 if top_p is None else top_p)
    repetition_penalty = float(1.7 if repetition_penalty is None else repetition_penalty)
    # UI sliders may deliver floats; the token count must be a positive int.
    max_new_tokens = max(int(256 if max_new_tokens is None else max_new_tokens), 1)

    generate_kwargs = dict(
        temperature=temperature,
        max_new_tokens=max_new_tokens,
        top_p=top_p,
        repetition_penalty=repetition_penalty,
        do_sample=True,
        seed=seed,
    )
    # Avoid a dangling ", " prefix when no system prompt is configured.
    message = f"{system_prompt}, {prompt}" if system_prompt else prompt
    formatted_prompt = format_prompt(message, history or [])
    # return_full_text=False (as in run_gpt) so the reply shown in the chat is
    # only the newly generated text, not the whole formatted prompt.
    output = client.text_generation(
        formatted_prompt, **generate_kwargs, stream=False, return_full_text=False
    )
    return output
# Define input and output components
# NOTE(review): this looks like template scaffolding.  ``iface`` is rebound by
# the ``iface = gr.Blocks()`` statement further down this file, and the lists
# below contain a literal ``...`` (Ellipsis) placeholder.  Confirm whether
# anything still relies on these names before removing the block.
with gr.Blocks() as iface:
    # Input components
    input_text = gr.Textbox(label="Input Text")
    # Other input components...
    # Output components
    output_text = gr.Textbox(label="Output Text")
    # Other output components...
    # Specify inputs and events
    inputs = [input_text, ...]  # List of input components
    events = [output_text, ...]  # List of output components
def log_messages(inputs, outputs):
    """Record one input/output pair in the application log at INFO level.

    Args:
        inputs: Value(s) received from the user.
        outputs: Value(s) produced in response.
    """
    # ``logger`` is a ``logging.Logger`` and is not callable; emit through
    # ``.info`` with lazy %-formatting.  (The function was also defined twice
    # with identical bodies; one definition is kept.)
    logger.info("Input: %s, Output: %s", inputs, outputs)
def update_sys_prompt(agent):
    """Select the system prompt for ``agent`` and return it.

    Looks up the module-level string named ``agent``, stores it in the global
    ``SYSTEM_PROMPT`` and returns it, so that a UI component wired as this
    callback's output receives the text.  Names that are missing, or that do
    not refer to a string, yield ``""`` instead of raising ``KeyError``.

    Args:
        agent: Name of a module-level prompt constant (e.g. ``"WEB_DEV"``).

    Returns:
        The selected prompt text, or ``""``.
    """
    global SYSTEM_PROMPT
    candidate = globals().get(agent)
    # Only strings are prompts; never hand back a module, function or client.
    SYSTEM_PROMPT = candidate if isinstance(candidate, str) else ""
    return SYSTEM_PROMPT
def get_helpful_tip(agent):
    """Return the one-line usage hint for ``agent``, or ``None`` if unknown."""
    tips_by_agent = {
        'WEB_DEV': "Provide information related to Web Development tasks.",
        'AI_SYSTEM_PROMPT': "Update the system instructions for the assistant here.",
        'PYTHON_CODE_DEV': "Describe what you want me to help you with regarding Python coding tasks.",
        'CODE_GENERATION': "Provide requirements for the code you want me to generate.",
        'CODE_INTERPRETATION': "Share the code you want me to analyze and explain.",
        'CODE_TRANSLATION': "Specify the source and target programming languages, and provide the code you want me to translate.",
        'CODE_IMPLEMENTATION': "Provide the code or requirements you want me to implement in a production-ready environment.",
    }
    return tips_by_agent.get(agent)
def chat_interface(prompt, history, agent_name, sys_prompt, temperature, max_new_tokens, top_p, repetition_penalty):
    """Handle one chat submission and return the updated conversation.

    Args:
        prompt: Text the user just submitted.
        history: Current chatbot value, a list of ``(user, bot)`` pairs, or
            ``None`` when the conversation is empty.
        agent_name, sys_prompt, temperature, max_new_tokens, top_p,
        repetition_penalty: Forwarded unchanged to ``generate``.

    Returns:
        ``(updated_history, "")``: the conversation with the new turn appended
        and an empty string that clears the message textbox.
    """
    # Work on a copy; previously the return value contained only the latest
    # turn, which wiped every earlier message from the chatbot.
    conversation = list(history or [])
    generated_text = generate(prompt, conversation, agent_name, sys_prompt, temperature, max_new_tokens, top_p, repetition_penalty)
    conversation.append((prompt, generated_text))
    return conversation, ""
# Extra controls passed to gr.ChatInterface in create_interface, in the order
# generate() expects them after (prompt, history).
_agent_selector = gr.Dropdown(
    label="Agents",
    choices=list(agents),
    value=agents[0],
    interactive=True,
)
_system_prompt_box = gr.Textbox(
    label="System Prompt",
    max_lines=1,
    interactive=True,
)
_temperature_slider = gr.Slider(
    label="Temperature",
    value=0.9,
    minimum=0.0,
    maximum=1.0,
    step=0.05,
    interactive=True,
    info="Higher values produce more diverse outputs",
)
_max_tokens_slider = gr.Slider(
    label="Max new tokens",
    value=1048 * 10,
    minimum=0,
    maximum=1048 * 10,
    step=64,
    interactive=True,
    info="The maximum numbers of new tokens",
)
_top_p_slider = gr.Slider(
    label="Top-p (nucleus sampling)",
    value=0.90,
    minimum=0.0,
    maximum=1,
    step=0.05,
    interactive=True,
    info="Higher values sample more low-probability tokens",
)
_repetition_slider = gr.Slider(
    label="Repetition penalty",
    value=1.2,
    minimum=1.0,
    maximum=2.0,
    step=0.05,
    interactive=True,
    info="Penalize repeated tokens",
)
additional_inputs = [
    _agent_selector,
    _system_prompt_box,
    _temperature_slider,
    _max_tokens_slider,
    _top_p_slider,
    _repetition_slider,
]
# Example prompts offered in the UI.  Each row is the message followed by five
# ``None`` placeholders.
_EXAMPLE_MESSAGES = (
    "Based on previous interactions, generate an interactive preview of the user's requested application.",
    "Utilize the relevant code snippets and components from previous interactions.",
    "Assemble a working demo that showcases the core functionality of the application.",
    "Present the demo in an interactive environment within the Gradio interface.",
    "Allow the user to explore and interact with the demo to test its features.",
    "Gather feedback from the user about the demo and potential improvements.",
    "If the user approves of the app's running state, provide a bash script that will automate all aspects of a local run and a docker image for ease-of-launch in addition to the huggingface-ready app.py with all functions and GUI, and the requirements.txt file comprised of all required libraries and packages the application is dependent on, avoiding OpenAI API at all points since we only use Hugging Face transformers, models, agents, libraries, and API.",
)
examples = [[message] + [None] * 5 for message in _EXAMPLE_MESSAGES]
def create_interface():
    """Build and return a Blocks app that wraps ``generate`` in a ChatInterface."""
    demo = gr.Blocks()
    with demo:
        gr.ChatInterface(
            fn=generate,
            title="Fragmixt\nAgents With Agents,\nSurf With a Purpose",
            examples=examples,
            additional_inputs=additional_inputs,
        )
    return demo
def _log_last_turn(chat_history):
    """Write the most recent (user, bot) exchange of the chat to the log."""
    if chat_history:
        user_message, bot_message = chat_history[-1]
        logger.info("Input: %s, Output: %s", user_message, bot_message)


# Main application UI.
iface = gr.Blocks()
with iface:
    gr.Markdown("# Fragmixt\nAgents With Agents,\nSurf With a Purpose")
    chatbot = gr.Chatbot()
    msg = gr.Textbox()
    clear = gr.Button("Clear")
    agent_dropdown = gr.Dropdown(label="Agents", choices=agents, value=agents[0])
    # Target for get_helpful_tip; the original wired it to an undefined
    # ``ui.info``, which raised NameError while the UI was being built.
    helpful_tip = gr.Markdown()
    sys_prompt = gr.Textbox(label="System Prompt", max_lines=1)
    temperature = gr.Slider(label="Temperature", value=0.9, minimum=0.0, maximum=1.0, step=0.05)
    max_new_tokens = gr.Slider(label="Max new tokens", value=1048 * 10, minimum=0, maximum=1048 * 10, step=64)
    top_p = gr.Slider(label="Top-p (nucleus sampling)", value=0.90, minimum=0.0, maximum=1, step=0.05)
    repetition_penalty = gr.Slider(label="Repetition penalty", value=1.2, minimum=1.0, maximum=2.0, step=0.05)

    submit_event = msg.submit(
        chat_interface,
        [msg, chatbot, agent_dropdown, sys_prompt, temperature, max_new_tokens, top_p, repetition_penalty],
        [chatbot, msg],
    )
    # Log each completed exchange.  This replaces
    # ``iface.load(fn=log_messages, input_type='state', event_type='save_model')``,
    # whose keyword arguments are not accepted by ``Blocks.load``.
    submit_event.then(_log_last_turn, chatbot, None)
    clear.click(lambda: None, None, chatbot, queue=False)
    # Only the message column of ``examples`` carries data (the rest is None),
    # so the examples are bound to the message box alone.
    gr.Examples([row[0] for row in examples], msg)
    iface.load(fn=get_helpful_tip, inputs=agent_dropdown, outputs=helpful_tip)
    iface.load(fn=update_sys_prompt, inputs=agent_dropdown, outputs=sys_prompt)

# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    iface.launch()