Spaces:
Sleeping
Sleeping
import os | |
import subprocess | |
import random | |
from huggingface_hub import InferenceClient | |
import gradio as gr | |
from safe_search import safe_search | |
from i_search import google | |
from i_search import i_search as i_s | |
from datetime import datetime | |
import logging | |
import json | |
now = datetime.now() | |
date_time_str = now.strftime("%Y-%m-%d %H:%M:%S") | |
client = InferenceClient( | |
"mistralai/Mixtral-8x7B-Instruct-v0.1" | |
) | |
# --- Set up logging --- | |
logging.basicConfig( | |
filename="app.log", # Name of the log file | |
level=logging.INFO, # Set the logging level (INFO, DEBUG, etc.) | |
format="%(asctime)s - %(levelname)s - %(message)s", | |
) | |
agents = [ | |
"WEB_DEV", | |
"AI_SYSTEM_PROMPT", | |
"PYTHON_CODE_DEV" | |
] | |
############################################ | |
VERBOSE = True | |
MAX_HISTORY = 5 | |
# MODEL = "gpt-3.5-turbo" # "gpt-4" | |
PREFIX = """ | |
{date_time_str} | |
Purpose: {purpose} | |
Safe Search: {safe_search} | |
""" | |
LOG_PROMPT = """ | |
PROMPT: {content} | |
""" | |
LOG_RESPONSE = """ | |
RESPONSE: {resp} | |
""" | |
COMPRESS_HISTORY_PROMPT = """ | |
You are a helpful AI assistant. Your task is to compress the following history into a summary that is no longer than 512 tokens. | |
History: | |
{history} | |
""" | |
ACTION_PROMPT = """ | |
You are a helpful AI assistant. You are working on the task: {task} | |
Your current history is: | |
{history} | |
What is your next thought? | |
thought: | |
What is your next action? | |
action: | |
""" | |
TASK_PROMPT = """ | |
You are a helpful AI assistant. Your current history is: | |
{history} | |
What is the next task? | |
task: | |
""" | |
UNDERSTAND_TEST_RESULTS_PROMPT = """ | |
You are a helpful AI assistant. The test results are: | |
{test_results} | |
What do you want to know about the test results? | |
thought: | |
""" | |
def format_prompt(message, history, max_history_turns=2): | |
prompt = "<s>" | |
# Keep only the last 'max_history_turns' turns | |
for user_prompt, bot_response in history[-max_history_turns:]: | |
prompt += f"[INST] {user_prompt} [/INST]" | |
prompt += f" {bot_response}</s> " | |
prompt += f"[INST] {message} [/INST]" | |
return prompt | |
def run_gpt( | |
prompt_template, | |
stop_tokens, | |
max_tokens, | |
purpose, | |
**prompt_kwargs, | |
): | |
seed = random.randint(1,1111111111111111) | |
logging.info(f"Seed: {seed}") # Log the seed | |
content = PREFIX.format( | |
date_time_str=date_time_str, | |
purpose=purpose, | |
safe_search=safe_search, | |
) + prompt_template.format(**prompt_kwargs) | |
if VERBOSE: | |
logging.info(LOG_PROMPT.format(content)) # Log the prompt | |
resp = client.text_generation(content, max_new_tokens=max_tokens, stop_sequences=stop_tokens, temperature=0.7, top_p=0.8, repetition_penalty=1.5) | |
if VERBOSE: | |
logging.info(LOG_RESPONSE.format(resp)) # Log the response | |
return resp | |
def generate(prompt, history, agent_name=agents[0], sys_prompt="", temperature=0.7, max_new_tokens=2048, top_p=0.8, repetition_penalty=1.5, model="mistralai/Mixtral-8x7B-Instruct-v0.1"): | |
content = PREFIX.format( | |
date_time_str=date_time_str, | |
purpose=purpose, | |
safe_search=safe_search, | |
) + prompt_template.format(**prompt_kwargs) | |
if VERBOSE: | |
logging.info(LOG_PROMPT.format(content)) # Log the prompt | |
stream = client.text_generation(content, **generate_kwargs, stream=True, details=True, return_full_text=False) | |
resp = "" | |
for response in stream: | |
resp += response.token.text | |
if VERBOSE: | |
logging.info(LOG_RESPONSE.format(resp)) # Log the response | |
return resp | |
def compress_history(purpose, task, history, directory): | |
resp = run_gpt( | |
COMPRESS_HISTORY_PROMPT, | |
stop_tokens=["observation:", "task:", "action:", "thought:"], | |
max_tokens=512, | |
purpose=purpose, | |
task=task, | |
history=history, | |
) | |
history = "observation: {}\n".format(resp) | |
return history | |
def call_search(purpose, task, history, directory, action_input): | |
logging.info(f"CALLING SEARCH: {action_input}") | |
try: | |
if "http" in action_input: | |
if "<" in action_input: | |
action_input = action_input.strip("<") | |
if ">" in action_input: | |
action_input = action_input.strip(">") | |
response = i_s(action_input) | |
#response = google(search_return) | |
logging.info(f"Search Result: {response}") | |
history += "observation: search result is: {}\n".format(response) | |
else: | |
history += "observation: I need to provide a valid URL to 'action: SEARCH action_input=https://URL'\n" | |
except Exception as e: | |
history += "observation: {}'\n".format(e) | |
return "MAIN", None, history, task | |
def call_main(purpose, task, history, directory, action_input): | |
logging.info(f"CALLING MAIN: {action_input}") | |
resp = run_gpt( | |
ACTION_PROMPT, | |
stop_tokens=["observation:", "task:", "action:","thought:"], | |
max_tokens=32000, | |
purpose=purpose, | |
task=task, | |
history=history, | |
) | |
lines = resp.strip().strip("\n").split("\n") | |
for line in lines: | |
if line == "": | |
continue | |
if line.startswith("thought: "): | |
history += "{}\n".format(line) | |
logging.info(f"Thought: {line}") | |
elif line.startswith("action: "): | |
action_name, action_input = parse_action(line) | |
logging.info(f"Action: {action_name} - {action_input}") | |
history += "{}\n".format(line) | |
if "COMPLETE" in action_name or "COMPLETE" in action_input: | |
task = "END" | |
return action_name, action_input, history, task | |
else: | |
return action_name, action_input, history, task | |
else: | |
history += "{}\n".format(line) | |
logging.info(f"Other Output: {line}") | |
#history += "observation: the following command did not produce any useful output: '{}', I need to check the commands syntax, or use a different command\n".format(line) | |
#return action_name, action_input, history, task | |
#assert False, "unknown action: {}".format(line) | |
return "MAIN", None, history, task | |
def call_set_task(purpose, task, history, directory, action_input): | |
logging.info(f"CALLING SET_TASK: {action_input}") | |
task = run_gpt( | |
TASK_PROMPT, | |
stop_tokens=[], | |
max_tokens=64, | |
purpose=purpose, | |
task=task, | |
history=history, | |
).strip("\n") | |
history += "observation: task has been updated to: {}\n".format(task) | |
return "MAIN", None, history, task | |
def end_fn(purpose, task, history, directory, action_input): | |
logging.info(f"CALLING END_FN: {action_input}") | |
task = "END" | |
return "COMPLETE", "COMPLETE", history, task | |
NAME_TO_FUNC = { | |
"MAIN": call_main, | |
"UPDATE-TASK": call_set_task, | |
"SEARCH": call_search, | |
"COMPLETE": end_fn, | |
} | |
def run_action(purpose, task, history, directory, action_name, action_input): | |
logging.info(f"RUNNING ACTION: {action_name} - {action_input}") | |
try: | |
if "RESPONSE" in action_name or "COMPLETE" in action_name: | |
action_name="COMPLETE" | |
task="END" | |
return action_name, "COMPLETE", history, task | |
# compress the history when it is long | |
if len(history.split("\n")) > MAX_HISTORY: | |
logging.info("COMPRESSING HISTORY") | |
history = compress_history(purpose, task, history, directory) | |
if not action_name in NAME_TO_FUNC: | |
action_name="MAIN" | |
if action_name == "" or action_name == None: | |
action_name="MAIN" | |
assert action_name in NAME_TO_FUNC | |
logging.info(f"RUN: {action_name} - {action_input}") | |
return NAME_TO_FUNC[action_name](purpose, task, history, directory, action_input) | |
except Exception as e: | |
history += "observation: the previous command did not produce any useful output, I need to check the commands syntax, or use a different command\n" | |
logging.error(f"Error in run_action: {e}") | |
return "MAIN", None, history, task | |
def run(purpose,history): | |
#print(purpose) | |
#print(hist) | |
task=None | |
directory="./" | |
if history: | |
history=str(history).strip("[]") | |
if not history: | |
history = "" | |
action_name = "UPDATE-TASK" if task is None else "MAIN" | |
action_input = None | |
while True: | |
logging.info(f"---") | |
logging.info(f"Purpose: {purpose}") | |
logging.info(f"Task: {task}") | |
logging.info(f"---") | |
logging.info(f"History: {history}") | |
logging.info(f"---") | |
action_name, action_input, history, task = run_action( | |
purpose, | |
task, | |
history, | |
directory, | |
action_name, | |
action_input, | |
) | |
yield (history) | |
#yield ("",[(purpose,history)]) | |
if task == "END": | |
return (history) | |
#return ("", [(purpose,history)]) | |
################################################ | |
def format_prompt(message, history, max_history_turns=5): | |
prompt = "<s>" | |
# Keep only the last 'max_history_turns' turns | |
for user_prompt, bot_response in history[-max_history_turns:]: | |
prompt += f"[INST] {user_prompt} [/INST]" | |
prompt += f" {bot_response}</s> " | |
prompt += f"[INST] {message} [/INST]" | |
return prompt | |
agents =[ | |
"WEB_DEV", | |
"AI_SYSTEM_PROMPT", | |
"PYTHON_CODE_DEV" | |
] | |
def generate( | |
prompt, history, agent_name=agents[0], sys_prompt="", temperature=0.9, max_new_tokens=2048, top_p=0.95, repetition_penalty=1.0, model="mistralai/Mixtral-8x7B-Instruct-v0.1" | |
): | |
seed = random.randint(1,1111111111111111) | |
# Correct the line: | |
if agent_name == "WEB_DEV": | |
agent = "You are a helpful AI assistant. You are a web developer." | |
if agent_name == "AI_SYSTEM_PROMPT": | |
agent = "You are a helpful AI assistant. You are an AI system." | |
if agent_name == "PYTHON_CODE_DEV": | |
agent = "You are a helpful AI assistant. You are a Python code developer." | |
system_prompt = agent | |
temperature = float(temperature) | |
if temperature < 1e-2: | |
temperature = 1e-2 | |
top_p = float(top_p) | |
def generate_text_chunked(input_text, model, generation_parameters, max_tokens_to_generate): | |
"""Generates text in chunks to avoid token limit errors.""" | |
sentences = nltk.sent_tokenize(input_text) | |
generated_text = [] | |
generator = pipeline('text-generation', model=model) | |
for sentence in sentences: | |
# Tokenize the sentence and check if it's within the limit | |
tokens = generator.tokenizer(sentence).input_ids | |
if len(tokens) + max_tokens_to_generate <= 32768: | |
# Generate text for this chunk | |
response = generator(sentence, max_length=max_tokens_to_generate, **generation_parameters) | |
generated_text.append(response[0]['generated_text']) | |
else: | |
# Handle cases where the sentence is too long | |
# You could split the sentence further or skip it | |
print(f"Sentence too long: {sentence}") | |
return ''.join(generated_text) | |
formatted_prompt = format_prompt(prompt, history, max_history_turns=5) # Truncated history | |
logging.info(f"Formatted Prompt: {formatted_prompt}") | |
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) | |
output = "" | |
for response in stream: | |
output += response.token.text | |
yield output | |
return output | |
additional_inputs=[ | |
gr.Dropdown( | |
label="Agents", | |
choices=[s for s in agents], | |
value=agents[0], | |
interactive=True, | |
), | |
gr.Textbox( | |
label="System Prompt", | |
max_lines=1, | |
interactive=True, | |
), | |
gr.Slider( | |
label="Temperature", | |
value=0.9, | |
minimum=0.0, | |
maximum=1.0, | |
step=0.05, | |
interactive=True, | |
info="Higher values produce more diverse outputs", | |
), | |
gr.Slider( | |
label="Max new tokens", | |
value=1048*10, | |
minimum=0, | |
maximum=1048*10, | |
step=64, | |
interactive=True, | |
info="The maximum numbers of new tokens", | |
), | |
gr.Slider( | |
label="Top-p (nucleus sampling)", | |
value=0.90, | |
minimum=0.0, | |
maximum=1, | |
step=0.05, | |
interactive=True, | |
info="Higher values sample more low-probability tokens", | |
), | |
gr.Slider( | |
label="Repetition penalty", | |
value=1.2, | |
minimum=1.0, | |
maximum=2.0, | |
step=0.05, | |
interactive=True, | |
info="Penalize repeated tokens", | |
), | |
] | |
examples = [ | |
["Help me set up TypeScript configurations and integrate ts-loader in my existing React project.", | |
"Update Webpack Configurations", | |
"Install Dependencies", | |
"Configure Ts-Loader", | |
"TypeChecking Rules Setup", | |
"React Specific Settings", | |
"Compilation Options", | |
"Test Runner Configuration"], | |
["Guide me through building a serverless microservice using AWS Lambda and API Gateway, connecting to DynamoDB for storage.", | |
"Set Up AWS Account", | |
"Create Lambda Function", | |
"APIGateway Integration", | |
"Define DynamoDB Table Scheme", | |
"Connect Service To DB", | |
"Add Authentication Layers", | |
"Monitor Metrics and Set Alarms"], | |
["Migrate our current monolithic PHP application towards containerized services using Docker and Kubernetes for scalability.", | |
"Architectural Restructuring Plan", | |
"Containerisation Process With Docker", | |
"Service Orchestration With Kubernetes", | |
"Load Balancing Strategies", | |
"Persistent Storage Solutions", | |
"Network Policies Enforcement", | |
"Continuous Integration / Continuous Delivery"], | |
["Provide guidance on integrating WebAssembly modules compiled from C++ source files into an ongoing web project.", | |
"Toolchain Selection (Emscripten vs. LLVM)", | |
"Setting Up Compiler Environment", | |
".cpp Source Preparation", | |
"Module Building Approach", | |
"Memory Management Considerations", | |
"Performance Tradeoffs", | |
"Seamless Web Assembly Embedding"] | |
] | |
def parse_action(line): | |
action_name, action_input = line.strip("action: ").split("=") | |
action_input = action_input.strip() | |
return action_name, action_input | |
def get_file_tree(path): | |
""" | |
Recursively explores a directory and returns a nested dictionary representing its file tree. | |
""" | |
tree = {} | |
for item in os.listdir(path): | |
item_path = os.path.join(path, item) | |
if os.path.isdir(item_path): | |
tree[item] = get_file_tree(item_path) | |
else: | |
tree[item] = None | |
return tree | |
def display_file_tree(tree, indent=0): | |
""" | |
Prints a formatted representation of the file tree. | |
""" | |
for name, subtree in tree.items(): | |
print(f"{' ' * indent}{name}") | |
if subtree is not None: | |
display_file_tree(subtree, indent + 1) | |
def project_explorer(path): | |
""" | |
Displays the file tree of a given path in a Streamlit app. | |
""" | |
tree = get_file_tree(path) | |
display_file_tree(tree) | |
def chat_app_logic(message, history, purpose, agent_name, sys_prompt, temperature, max_new_tokens, top_p, repetition_penalty, model): | |
# Your existing code here | |
try: | |
# Attempt to join the generator output | |
response = ''.join(generate( | |
model=model, | |
message=message, | |
stream=True, | |
temperature=0.7, | |
max_tokens=1500 | |
)) | |
except TypeError: | |
# If joining fails, collect the output in a list | |
response_parts = [] | |
for part in generate( | |
model=model, | |
message=message, | |
stream=True, | |
temperature=0.7, | |
max_tokens=1500 | |
): | |
if isinstance(part, str): | |
response_parts.append(part) | |
elif isinstance(part, dict) and 'content' in part: | |
response_parts.append(part['content']), | |
response = ''.join(response_parts, | |
# Run the model and get the response (convert generator to string) | |
prompt=message, | |
history=history, | |
agent_name=agent_name, | |
sys_prompt=sys_prompt, | |
temperature=temperature, | |
max_new_tokens=max_new_tokens, | |
top_p=top_p, | |
repetition_penalty=repetition_penalty, | |
model=model # Pass the model argument here | |
) | |
history.append((message, response)) | |
return history | |
return history | |
def main(): | |
with gr.Blocks() as demo: | |
gr.Markdown("## FragMixt") | |
gr.Markdown("### Agents w/ Agents") | |
# Chat Interface | |
chatbot = gr.Chatbot(show_label=False, show_share_button=False, show_copy_button=True, likeable=True, layout="panel") | |
#chatbot.load(examples) | |
# Input Components | |
message = gr.Textbox(label="Enter your message", placeholder="Ask me anything!") | |
purpose = gr.Textbox(label="Purpose", placeholder="What is the purpose of this interaction?") | |
agent_name = gr.Dropdown(label="Agents", choices=[s for s in agents], value=agents[0], interactive=True) | |
sys_prompt = gr.Textbox(label="System Prompt", max_lines=1, interactive=True) | |
temperature = gr.Slider(label="Temperature", value=0.9, minimum=0.0, maximum=1.0, step=0.05, interactive=True, info="Higher values produce more diverse outputs") | |
max_new_tokens = gr.Slider(label="Max new tokens", value=1048*10, minimum=0, maximum=1048*10, step=64, interactive=True, info="The maximum numbers of new tokens") | |
top_p = gr.Slider(label="Top-p (nucleus sampling)", value=0.90, minimum=0.0, maximum=1, step=0.05, interactive=True, info="Higher values sample more low-probability tokens") | |
repetition_penalty = gr.Slider(label="Repetition penalty", value=1.2, minimum=1.0, maximum=2.0, step=0.05, interactive=True, info="Penalize repeated tokens") | |
model_input = gr.Textbox(label="Model", value="mistralai/Mixtral-8x7B-Instruct-v0.1", visible=False) | |
# Button to submit the message | |
submit_button = gr.Button(value="Send") | |
# Project Explorer Tab | |
with gr.Tab("Project Explorer"): | |
project_path = gr.Textbox(label="Project Path", placeholder="/home/user/app/current_project") | |
explore_button = gr.Button(value="Explore") | |
project_output = gr.Textbox(label="File Tree", lines=20) | |
# Chat App Logic Tab | |
with gr.Tab("Chat App"): | |
history = gr.State([]) | |
for example in examples: | |
gr.Button(value=example[0]).click(lambda: chat_app_logic(example[0], history, purpose, agent_name, sys_prompt, temperature, max_new_tokens, top_p, repetition_penalty, model=model_input), outputs=chatbot) | |
# Connect components to the chat app logic | |
submit_button.click(chat_app_logic, inputs=[message, history, purpose, agent_name, sys_prompt, temperature, max_new_tokens, top_p, repetition_penalty, model_input], outputs=chatbot) | |
message.submit(chat_app_logic, inputs=[message, history, purpose, agent_name, sys_prompt, temperature, max_new_tokens, top_p, repetition_penalty, model_input], outputs=chatbot) | |
# Connect components to the project explorer | |
explore_button.click(project_explorer, inputs=project_path, outputs=project_output) | |
demo.launch(show_api=True) | |
if __name__ == "__main__": | |
main() |