|
import json |
|
import regex |
|
import traceback |
|
|
|
from colorama import Fore, Style |
|
|
|
from autogpt.chat import chat_with_ai, create_chat_message |
|
import autogpt.commands as cmd |
|
from autogpt.config import Config |
|
from autogpt.json_parser import fix_and_parse_json |
|
from autogpt.logger import logger |
|
from autogpt.speak import say_text |
|
from autogpt.spinner import Spinner |
|
from autogpt.utils import clean_input |
|
|
|
|
|
class Agent: |
|
"""Agent class for interacting with Auto-GPT. |
|
|
|
Attributes: |
|
ai_name: The name of the agent. |
|
memory: The memory object to use. |
|
full_message_history: The full message history. |
|
next_action_count: The number of actions to execute. |
|
prompt: The prompt to use. |
|
user_input: The user input. |
|
|
|
""" |
|
|
|
def __init__( |
|
self, |
|
ai_name, |
|
memory, |
|
full_message_history, |
|
next_action_count, |
|
prompt, |
|
user_input, |
|
): |
|
self.ai_name = ai_name |
|
self.memory = memory |
|
self.full_message_history = full_message_history |
|
self.next_action_count = next_action_count |
|
self.prompt = prompt |
|
self.user_input = user_input |
|
|
|
def start_interaction_loop(self): |
|
|
|
cfg = Config() |
|
loop_count = 0 |
|
command_name = None |
|
arguments = None |
|
while True: |
|
|
|
loop_count += 1 |
|
if ( |
|
cfg.continuous_mode |
|
and cfg.continuous_limit > 0 |
|
and loop_count > cfg.continuous_limit |
|
): |
|
logger.typewriter_log( |
|
"Continuous Limit Reached: ", Fore.YELLOW, f"{cfg.continuous_limit}" |
|
) |
|
break |
|
|
|
|
|
with Spinner("Thinking... "): |
|
assistant_reply = chat_with_ai( |
|
self.prompt, |
|
self.user_input, |
|
self.full_message_history, |
|
self.memory, |
|
cfg.fast_token_limit, |
|
) |
|
|
|
|
|
print_assistant_thoughts(self.ai_name, assistant_reply) |
|
|
|
|
|
try: |
|
command_name, arguments = cmd.get_command( |
|
attempt_to_fix_json_by_finding_outermost_brackets(assistant_reply) |
|
) |
|
if cfg.speak_mode: |
|
say_text(f"I want to execute {command_name}") |
|
except Exception as e: |
|
logger.error("Error: \n", str(e)) |
|
|
|
if not cfg.continuous_mode and self.next_action_count == 0: |
|
|
|
|
|
|
|
self.user_input = "" |
|
logger.typewriter_log( |
|
"NEXT ACTION: ", |
|
Fore.CYAN, |
|
f"COMMAND = {Fore.CYAN}{command_name}{Style.RESET_ALL} " |
|
f"ARGUMENTS = {Fore.CYAN}{arguments}{Style.RESET_ALL}", |
|
) |
|
print( |
|
"Enter 'y' to authorise command, 'y -N' to run N continuous " |
|
"commands, 'n' to exit program, or enter feedback for " |
|
f"{self.ai_name}...", |
|
flush=True, |
|
) |
|
while True: |
|
console_input = clean_input( |
|
Fore.MAGENTA + "Input:" + Style.RESET_ALL |
|
) |
|
if console_input.lower().rstrip() == "y": |
|
self.user_input = "GENERATE NEXT COMMAND JSON" |
|
break |
|
elif console_input.lower().startswith("y -"): |
|
try: |
|
self.next_action_count = abs( |
|
int(console_input.split(" ")[1]) |
|
) |
|
self.user_input = "GENERATE NEXT COMMAND JSON" |
|
except ValueError: |
|
print( |
|
"Invalid input format. Please enter 'y -n' where n is" |
|
" the number of continuous tasks." |
|
) |
|
continue |
|
break |
|
elif console_input.lower() == "n": |
|
self.user_input = "EXIT" |
|
break |
|
else: |
|
self.user_input = console_input |
|
command_name = "human_feedback" |
|
break |
|
|
|
if self.user_input == "GENERATE NEXT COMMAND JSON": |
|
logger.typewriter_log( |
|
"-=-=-=-=-=-=-= COMMAND AUTHORISED BY USER -=-=-=-=-=-=-=", |
|
Fore.MAGENTA, |
|
"", |
|
) |
|
elif self.user_input == "EXIT": |
|
print("Exiting...", flush=True) |
|
break |
|
else: |
|
|
|
logger.typewriter_log( |
|
"NEXT ACTION: ", |
|
Fore.CYAN, |
|
f"COMMAND = {Fore.CYAN}{command_name}{Style.RESET_ALL}" |
|
f" ARGUMENTS = {Fore.CYAN}{arguments}{Style.RESET_ALL}", |
|
) |
|
|
|
|
|
if command_name is not None and command_name.lower().startswith("error"): |
|
result = ( |
|
f"Command {command_name} threw the following error: {arguments}" |
|
) |
|
elif command_name == "human_feedback": |
|
result = f"Human feedback: {self.user_input}" |
|
else: |
|
result = ( |
|
f"Command {command_name} returned: " |
|
f"{cmd.execute_command(command_name, arguments)}" |
|
) |
|
if self.next_action_count > 0: |
|
self.next_action_count -= 1 |
|
|
|
memory_to_add = ( |
|
f"Assistant Reply: {assistant_reply} " |
|
f"\nResult: {result} " |
|
f"\nHuman Feedback: {self.user_input} " |
|
) |
|
|
|
self.memory.add(memory_to_add) |
|
|
|
|
|
|
|
if result is not None: |
|
self.full_message_history.append(create_chat_message("system", result)) |
|
logger.typewriter_log("SYSTEM: ", Fore.YELLOW, result) |
|
else: |
|
self.full_message_history.append( |
|
create_chat_message("system", "Unable to execute command") |
|
) |
|
logger.typewriter_log( |
|
"SYSTEM: ", Fore.YELLOW, "Unable to execute command" |
|
) |
|
|
|
|
|
def attempt_to_fix_json_by_finding_outermost_brackets(json_string): |
|
cfg = Config() |
|
if cfg.speak_mode and cfg.debug_mode: |
|
say_text( |
|
"I have received an invalid JSON response from the OpenAI API. " |
|
"Trying to fix it now." |
|
) |
|
logger.typewriter_log("Attempting to fix JSON by finding outermost brackets\n") |
|
|
|
try: |
|
json_pattern = regex.compile(r"\{(?:[^{}]|(?R))*\}") |
|
json_match = json_pattern.search(json_string) |
|
|
|
if json_match: |
|
|
|
json_string = json_match.group(0) |
|
logger.typewriter_log( |
|
title="Apparently json was fixed.", title_color=Fore.GREEN |
|
) |
|
if cfg.speak_mode and cfg.debug_mode: |
|
say_text("Apparently json was fixed.") |
|
else: |
|
raise ValueError("No valid JSON object found") |
|
|
|
except (json.JSONDecodeError, ValueError): |
|
if cfg.speak_mode: |
|
say_text("Didn't work. I will have to ignore this response then.") |
|
logger.error("Error: Invalid JSON, setting it to empty JSON now.\n") |
|
json_string = {} |
|
|
|
return json_string |
|
|
|
|
|
def print_assistant_thoughts(ai_name, assistant_reply): |
|
"""Prints the assistant's thoughts to the console""" |
|
cfg = Config() |
|
try: |
|
try: |
|
|
|
assistant_reply_json = fix_and_parse_json(assistant_reply) |
|
except json.JSONDecodeError: |
|
logger.error("Error: Invalid JSON in assistant thoughts\n", assistant_reply) |
|
assistant_reply_json = attempt_to_fix_json_by_finding_outermost_brackets( |
|
assistant_reply |
|
) |
|
if isinstance(assistant_reply_json, str): |
|
assistant_reply_json = fix_and_parse_json(assistant_reply_json) |
|
|
|
|
|
|
|
if isinstance(assistant_reply_json, str): |
|
try: |
|
assistant_reply_json = json.loads(assistant_reply_json) |
|
except json.JSONDecodeError: |
|
logger.error("Error: Invalid JSON\n", assistant_reply) |
|
assistant_reply_json = ( |
|
attempt_to_fix_json_by_finding_outermost_brackets( |
|
assistant_reply_json |
|
) |
|
) |
|
|
|
assistant_thoughts_reasoning = None |
|
assistant_thoughts_plan = None |
|
assistant_thoughts_speak = None |
|
assistant_thoughts_criticism = None |
|
if not isinstance(assistant_reply_json, dict): |
|
assistant_reply_json = {} |
|
assistant_thoughts = assistant_reply_json.get("thoughts", {}) |
|
assistant_thoughts_text = assistant_thoughts.get("text") |
|
|
|
if assistant_thoughts: |
|
assistant_thoughts_reasoning = assistant_thoughts.get("reasoning") |
|
assistant_thoughts_plan = assistant_thoughts.get("plan") |
|
assistant_thoughts_criticism = assistant_thoughts.get("criticism") |
|
assistant_thoughts_speak = assistant_thoughts.get("speak") |
|
|
|
logger.typewriter_log( |
|
f"{ai_name.upper()} THOUGHTS:", Fore.YELLOW, f"{assistant_thoughts_text}" |
|
) |
|
logger.typewriter_log( |
|
"REASONING:", Fore.YELLOW, f"{assistant_thoughts_reasoning}" |
|
) |
|
|
|
if assistant_thoughts_plan: |
|
logger.typewriter_log("PLAN:", Fore.YELLOW, "") |
|
|
|
if isinstance(assistant_thoughts_plan, list): |
|
assistant_thoughts_plan = "\n".join(assistant_thoughts_plan) |
|
elif isinstance(assistant_thoughts_plan, dict): |
|
assistant_thoughts_plan = str(assistant_thoughts_plan) |
|
|
|
|
|
lines = assistant_thoughts_plan.split("\n") |
|
for line in lines: |
|
line = line.lstrip("- ") |
|
logger.typewriter_log("- ", Fore.GREEN, line.strip()) |
|
|
|
logger.typewriter_log( |
|
"CRITICISM:", Fore.YELLOW, f"{assistant_thoughts_criticism}" |
|
) |
|
|
|
if cfg.speak_mode and assistant_thoughts_speak: |
|
say_text(assistant_thoughts_speak) |
|
|
|
return assistant_reply_json |
|
except json.decoder.JSONDecodeError: |
|
logger.error("Error: Invalid JSON\n", assistant_reply) |
|
if cfg.speak_mode: |
|
say_text( |
|
"I have received an invalid JSON response from the OpenAI API." |
|
" I cannot ignore this response." |
|
) |
|
|
|
|
|
except Exception: |
|
call_stack = traceback.format_exc() |
|
logger.error("Error: \n", call_stack) |
|
|