Spaces:
Sleeping
Sleeping
import json | |
import os | |
import subprocess | |
import time | |
import sys | |
from datetime import datetime | |
from ..core.utils.system_debug_info import system_info | |
from .utils.count_tokens import count_messages_tokens | |
from .utils.display_markdown_message import display_markdown_message | |
def handle_undo(self, arguments):
    """Remove the most recent user message and everything that follows it.

    Lets the user jump back to the point just before their latest entry.
    A short preview of every removed message is displayed.

    Args:
        self: Object holding the conversation in ``self.messages`` (a list
            of dicts).
        arguments: Unused; accepted so all magic-command handlers share
            one call signature.
    """
    if not self.messages:
        return

    # Walk backwards so the search stops at the latest 'role': 'user' entry
    # instead of visiting the whole history.
    last_user_index = next(
        (
            index
            for index in range(len(self.messages) - 1, -1, -1)
            if self.messages[index].get("role") == "user"
        ),
        None,
    )

    removed_messages = []
    # Drop the last user entry and everything after it.
    if last_user_index is not None:
        removed_messages = self.messages[last_user_index:]
        self.messages = self.messages[:last_user_index]

    print("")  # Aesthetics.

    # Print out a preview of what messages were removed.
    for message in removed_messages:
        if message.get("content") is not None:
            display_markdown_message(
                f"**Removed message:** `\"{message['content'][:30]}...\"`"
            )
        elif "function_call" in message:
            display_markdown_message(
                "**Removed codeblock**"
            )  # TODO: Could add preview of code removed here.

    print("")  # Aesthetics.
def handle_help(self, arguments):
    """Display the list of available magic commands as one markdown message.

    Args:
        self: Unused by this handler.
        arguments: Unused; accepted so all magic-command handlers share
            one call signature.
    """
    help_entries = {
        "%% [commands]": "Run commands in system shell",
        "%verbose [true/false]": "Toggle verbose mode. Without arguments or with 'true', it enters verbose mode. With 'false', it exits verbose mode.",
        "%reset": "Resets the current session.",
        "%undo": "Remove previous messages and its response from the message history.",
        "%save_message [path]": "Saves messages to a specified JSON path. If no path is provided, it defaults to 'messages.json'.",
        "%load_message [path]": "Loads messages from a specified JSON path. If no path is provided, it defaults to 'messages.json'.",
        "%tokens [prompt]": "EXPERIMENTAL: Calculate the tokens used by the next request based on the current conversation's messages and estimate the cost of that request; optionally provide a prompt to also calculate the tokens used by that prompt and the total amount of tokens that will be sent with the next request",
        "%help": "Show this help message.",
        "%info": "Show system and interpreter information",
        "%jupyter": "Export the conversation to a Jupyter notebook file",
    }

    header = "> **Available Commands:**\n\n"
    # One markdown bullet per command, in dictionary order.
    bullets = "".join(
        f"- `{name}`: {text}\n" for name, text in help_entries.items()
    )
    footer = "\n\nFor further assistance, please join our community Discord or consider contributing to the project's development."

    display_markdown_message(header + bullets + footer)
def handle_verbose(self, arguments=None):
    """Enter or exit verbose mode.

    Args:
        self: Object whose ``verbose`` flag is set and whose ``messages``
            are dumped when verbose mode is entered.
        arguments: ``""``, ``"true"`` or ``None`` (no argument) enters
            verbose mode; ``"false"`` exits it; anything else reports an
            unknown argument and leaves the flag untouched.
    """
    # None is the declared default, so treat it like "no argument given".
    if arguments is None or arguments in ("", "true"):
        display_markdown_message("> Entered verbose mode")
        print("\n\nCurrent messages:\n")
        for message in self.messages:
            # Work on a copy so shortening the preview never alters history.
            message = message.copy()
            content = message.get("content")
            # Inline image payloads are abbreviated to their first and last
            # 30 characters. `.get` keeps messages without a "type" key from
            # raising, and the length check avoids a "preview" whose halves
            # overlap for short payloads.
            if (
                message.get("type") == "image"
                and message.get("format") != "path"
                and isinstance(content, str)
                and len(content) > 60
            ):
                message["content"] = content[:30] + "..." + content[-30:]
            print(message, "\n")
        print("\n")
        self.verbose = True
    elif arguments == "false":
        display_markdown_message("> Exited verbose mode")
        self.verbose = False
    else:
        display_markdown_message("> Unknown argument to verbose command.")
def handle_info(self, arguments):
    """Delegate to ``system_info``, passing this interpreter object.

    Args:
        self: Forwarded unchanged to ``system_info``.
        arguments: Unused; accepted so all magic-command handlers share
            one call signature.
    """
    system_info(self)
def handle_reset(self, arguments):
    """Call ``self.reset()`` and then confirm the reset to the user.

    Args:
        self: Object exposing a ``reset()`` method.
        arguments: Unused; accepted so all magic-command handlers share
            one call signature.
    """
    self.reset()
    confirmation = "> Reset Done"
    display_markdown_message(confirmation)
def default_handle(self, arguments):
    """Fallback handler: report an unknown command, then show the help text.

    Args:
        self: Forwarded to ``handle_help``.
        arguments: Forwarded to ``handle_help``.
    """
    notice = "> Unknown command"
    display_markdown_message(notice)
    # Follow the notice with the full command list so the user can recover.
    handle_help(self, arguments)
def handle_save_message(self, json_path):
    """Write ``self.messages`` to a JSON file and report where it went.

    Args:
        self: Object holding the conversation in ``self.messages``.
        json_path: Destination path. An empty string means
            ``messages.json``; a ``.json`` suffix is appended when missing.

    Raises:
        OSError: If the file cannot be opened for writing.
        TypeError: If ``self.messages`` contains values ``json`` cannot
            serialize.
    """
    if json_path == "":
        json_path = "messages.json"
    if not json_path.endswith(".json"):
        json_path += ".json"

    # Explicit UTF-8 keeps the file independent of the platform's locale
    # encoding, matching how the notebook export in this file opens files.
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(self.messages, f, indent=2)

    display_markdown_message(f"> messages json export to {os.path.abspath(json_path)}")
def handle_load_message(self, json_path):
    """Replace ``self.messages`` with the contents of a JSON file.

    Args:
        self: Object whose ``messages`` attribute is overwritten.
        json_path: Source path. An empty string means ``messages.json``;
            a ``.json`` suffix is appended when missing.

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
        json.JSONDecodeError: If the file is not valid JSON.
    """
    if json_path == "":
        json_path = "messages.json"
    if not json_path.endswith(".json"):
        json_path += ".json"

    # Read as UTF-8 rather than the locale default: JSON files are
    # conventionally UTF-8, and a non-UTF-8 locale would otherwise fail to
    # decode (or mis-decode) non-ASCII content.
    with open(json_path, "r", encoding="utf-8") as f:
        self.messages = json.load(f)

    display_markdown_message(
        f"> messages json loaded from {os.path.abspath(json_path)}"
    )
def handle_count_tokens(self, prompt):
    """Display token counts and estimated cost for the next request.

    Args:
        self: Object providing ``system_message``, ``messages`` and
            ``llm.model``.
        prompt: Optional extra prompt text. When truthy, its own token
            count and the combined total are reported as well.
    """
    # NOTE(review): the system entry uses the key "message" rather than
    # "content" — confirm this is the key count_messages_tokens reads.
    messages = [{"role": "system", "message": self.system_message}] + self.messages

    # The original branched on whether the history was empty but ran the
    # same call in both branches; a single call is equivalent.
    conversation_tokens, conversation_cost = count_messages_tokens(
        messages=messages, model=self.llm.model
    )

    outputs = [
        f"> Tokens sent with next request as context: {conversation_tokens} (Estimated Cost: ${conversation_cost})"
    ]

    if prompt:
        prompt_tokens, prompt_cost = count_messages_tokens(
            messages=[prompt], model=self.llm.model
        )
        outputs.append(
            f"> Tokens used by this prompt: {prompt_tokens} (Estimated Cost: ${prompt_cost})"
        )

        total_tokens = conversation_tokens + prompt_tokens
        total_cost = conversation_cost + prompt_cost
        outputs.append(
            f"> Total tokens for next request with this prompt: {total_tokens} (Estimated Cost: ${total_cost})"
        )

    outputs.append(
        "**Note**: This functionality is currently experimental and may not be accurate. Please report any issues you find to the [Open Interpreter GitHub repository](https://github.com/KillianLucas/open-interpreter)."
    )

    display_markdown_message("\n".join(outputs))
def get_downloads_path():
    """Return the path of the current user's ``Downloads`` directory.

    The path is built by convention only; this function does not check
    that the directory exists.

    Returns:
        ``<home>/Downloads``, where ``<home>`` is ``%USERPROFILE%`` on
        Windows (when set) and ``os.path.expanduser('~')`` otherwise.
    """
    home = None
    if os.name == 'nt':
        # For Windows. `.get` avoids a KeyError when USERPROFILE is unset.
        home = os.environ.get('USERPROFILE')
    if not home:
        # For MacOS and Linux, and as the Windows fallback.
        home = os.path.expanduser('~')
    return os.path.join(home, 'Downloads')
def install_and_import(package):
    """Import ``package``, installing it with pip first when it is missing.

    On success the module is also stored in this module's globals under
    the name ``package``.

    Args:
        package: Name used both for the import and for ``pip install``
            (so the distribution name must equal the import name).

    Returns:
        The imported module, or ``None`` when installation or the
        follow-up import fails.
    """
    # Local import: importlib is not among this file's top-level imports.
    import importlib

    try:
        module = importlib.import_module(package)
    except ImportError:
        # Install the package silently with pip
        print("")
        print(f"Installing {package}...")
        print("")
        try:
            # The original fell back to `python -m pip3`, but pip3 is a
            # console script, not a runnable module, so that fallback could
            # never succeed and has been dropped.
            subprocess.check_call(
                [sys.executable, "-m", "pip", "install", package],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            # Let the import system see the files pip just wrote.
            importlib.invalidate_caches()
            module = importlib.import_module(package)
        except (subprocess.CalledProcessError, ImportError):
            print(f"Failed to install package {package}.")
            return None

    # Only reached with a bound module; the original did this in a
    # `finally` clause, which raised UnboundLocalError on every failure path.
    globals()[package] = module
    return module
def jupyter(self, arguments):
    """Export the conversation to a Jupyter notebook in the Downloads folder.

    User and assistant messages of type ``message`` become markdown cells
    (user text is block-quoted); entries of type ``code`` become code
    cells tagged with their language. Other entries are skipped.

    Args:
        self: Object holding the conversation in ``self.messages``.
        arguments: Unused; accepted so all magic-command handlers share
            one call signature.
    """
    # Dynamically install nbformat if not already installed
    nbformat = install_and_import('nbformat')
    if nbformat is None:
        display_markdown_message(
            "> Could not export: the `nbformat` package is unavailable."
        )
        return
    from nbformat.v4 import new_notebook, new_code_cell, new_markdown_cell

    downloads = get_downloads_path()
    # The Downloads folder is not guaranteed to exist (e.g. on headless
    # machines); create it rather than failing at open().
    os.makedirs(downloads, exist_ok=True)

    formatted_time = datetime.now().strftime("%m-%d-%y-%I%M%p")
    filename = f"open-interpreter-{formatted_time}.ipynb"
    notebook_path = os.path.join(downloads, filename)

    nb = new_notebook()
    cells = []

    for msg in self.messages:
        # `.get` skips entries lacking these keys instead of raising KeyError.
        role = msg.get('role')
        kind = msg.get('type')
        if role == 'user' and kind == 'message':
            # Prefix user messages with '>' to render them as block quotes, so they stand out
            cells.append(new_markdown_cell(f"> {msg['content']}"))
        elif role == 'assistant' and kind == 'message':
            cells.append(new_markdown_cell(msg['content']))
        elif kind == 'code':
            # Default to Python if no format specified
            language = msg.get('format') or 'python'
            code_cell = new_code_cell(msg['content'])
            code_cell.metadata.update({"language": language})
            cells.append(code_cell)

    nb['cells'] = cells

    with open(notebook_path, 'w', encoding='utf-8') as f:
        nbformat.write(nb, f)

    print("")
    display_markdown_message(f"Jupyter notebook file exported to {os.path.abspath(notebook_path)}")
def handle_magic_command(self, user_input):
    """Dispatch a ``%``-prefixed line of user input to its handler.

    ``%%<code>`` runs ``<code>`` through ``self.computer.run("shell", ...)``.
    Otherwise the text after the leading ``%`` is split at the first run of
    whitespace into a command name and an argument string, and the matching
    handler is called as ``handler(self, arguments)``. Unknown commands go
    to ``default_handle``.

    Args:
        self: Interpreter object passed through to the handlers.
        user_input: Raw input line, starting with ``%`` or ``%%``.
    """
    # Handle shell
    if user_input.startswith("%%"):
        code = user_input[2:].strip()
        self.computer.run("shell", code, stream=False, display=True)
        print("")
        return

    switch = {
        "help": handle_help,
        "verbose": handle_verbose,
        "reset": handle_reset,
        "save_message": handle_save_message,
        "load_message": handle_load_message,
        "undo": handle_undo,
        "tokens": handle_count_tokens,
        "info": handle_info,
        "jupyter": jupyter,
    }

    # Capture the part after the `%`, then split it into the command and the
    # arguments at the first whitespace. Splitting on any whitespace (not
    # just a literal space) keeps a tab-separated argument out of the
    # command name.
    parts = user_input[1:].strip().split(None, 1)
    command = parts[0] if parts else ""
    arguments = parts[1].strip() if len(parts) > 1 else ""

    if command == "debug":
        print(
            "\n`%debug` / `--debug_mode` has been renamed to `%verbose` / `--verbose`.\n"
        )
        time.sleep(1.5)
        command = "verbose"

    # Get the function from the dictionary, or default_handle if not found
    action = switch.get(command, default_handle)
    action(self, arguments)  # Execute the function