Spaces:
Sleeping
Sleeping
""" | |
The terminal interface is just a view. Just handles the very top layer. | |
If you were to build a frontend this would be a way to do it. | |
""" | |
try: | |
import readline | |
except ImportError: | |
pass | |
import os | |
import platform | |
import random | |
import re | |
import subprocess | |
import time | |
from ..core.utils.scan_code import scan_code | |
from ..core.utils.system_debug_info import system_info | |
from ..core.utils.truncate_output import truncate_output | |
from .components.code_block import CodeBlock | |
from .components.message_block import MessageBlock | |
from .magic_commands import handle_magic_command | |
from .utils.check_for_package import check_for_package | |
from .utils.display_markdown_message import display_markdown_message | |
from .utils.display_output import display_output | |
from .utils.find_image_path import find_image_path | |
from .utils.cli_input import cli_input | |
# Seed the readline history with example prompts, so pressing the up arrow at
# the first "> " prompt offers something to try.
examples = [
    "How many files are on my desktop?",
    "What time is it in Seattle?",
    "Make me a simple Pomodoro app.",
    "Open Chrome and go to YouTube.",
    "Can you set my system to light mode?",
]
# Shuffle so the history order differs from one session to the next.
random.shuffle(examples)
try:
    for example in examples:
        readline.add_history(example)
except Exception:
    # readline is optional: when its import failed at the top of this file the
    # name is undefined and this raises NameError, which is fine to ignore.
    # `Exception` (rather than a bare `except:`) keeps this best-effort while
    # no longer swallowing KeyboardInterrupt / SystemExit during import.
    pass
# Matches a markdown list item: optional indent, then "-", "*" or "1." style
# marker, then whitespace.
_MARKDOWN_LIST_ITEM = re.compile(r"^\s*([-*]|\d+\.)\s")

# Action prefixes that mean "the code is looking at the screen".
_SCREEN_VIEW_PREFIXES = (
    "computer.screenshot",
    "computer.display.screenshot",
    "computer.display.view",
    "computer.view",
)


def _notification_text(last_message):
    """Condense an assistant message into one line safe to embed in double quotes.

    Markdown list items are dropped together with the line directly above
    them, newlines become spaces, and backslashes / double quotes are escaped.
    """
    lines = last_message.split("\n")
    i = 0
    while i < len(lines):
        if _MARKDOWN_LIST_ITEM.match(lines[i]):
            # Drop the list item ...
            del lines[i]
            if i > 0:
                # ... and the line above it, then step back so the line that
                # slid into this position is examined next.
                del lines[i - 1]
                i -= 1
        else:
            i += 1
    condensed = "\n".join(lines)
    # Backslashes must be escaped first so the escapes added for quotes survive.
    return condensed.replace("\\", "\\\\").replace("\n", " ").replace('"', '\\"')


def _describe_computer_action(action, code):
    """Return a short notification for a `computer...` source line, or None.

    `action` is the stripped line currently executing; `code` is the whole
    code block it belongs to.
    """
    # Extract the text between the first "(" and the last ")", if both exist.
    open_paren = action.find("(")
    close_paren = action.rfind(")")
    if open_paren != -1 and close_paren != -1:
        arguments = action[open_paren + 1 : close_paren]
    else:
        arguments = None

    # `arguments` is None when the call spans several lines (no closing
    # parenthesis on this one); treat that as "no icon=" instead of raising
    # TypeError on `"icon=" in None`.
    text_or_icon = "icon" if "icon=" in (arguments or "") else "text"

    # NOTE: Do not put the text you're clicking on screen
    # (unless we figure out how to do this AFTER taking the screenshot)
    # otherwise it will try to click this notification!
    if action.startswith(_SCREEN_VIEW_PREFIXES):
        return "Viewing screen..."
    if action == "computer.mouse.click()":
        return "Clicking..."
    if action.startswith("computer.mouse.click("):
        return f"Clicking {text_or_icon}..."
    if action.startswith("computer.mouse.move("):
        if "click" in code:  # This could be better
            return f"Clicking {text_or_icon}..."
        return f"Mousing over {text_or_icon}..."
    if action.startswith("computer.keyboard.write("):
        return f"Typing {arguments}."
    if action.startswith(("computer.keyboard.hotkey(", "computer.keyboard.press(")):
        return f"Pressing {arguments}."
    if action == "computer.os.get_selected_text()":
        return "Getting selected text."
    return None


def terminal_interface(interpreter, message):
    """Run the terminal view on top of `interpreter.chat`, yielding every chunk.

    This is a generator: each chunk streamed by
    `interpreter.chat(message, display=False, stream=True)` is yielded to the
    caller and, unless it is addressed to a recipient other than "user",
    rendered in the terminal.

    Args:
        interpreter: The interpreter object whose settings (`auto_run`,
            `offline`, `safe_mode`, `os`, `verbose`, ...), `messages` list and
            `chat` method drive this view.
        message: The message to send. When falsy, the interface is interactive
            and keeps prompting with "> " until interrupted; otherwise the
            message is sent once and the generator finishes.

    Raises:
        Exception: Anything raised while chatting is re-raised, after printing
            system info when `interpreter.debug` is set.
    """
    # Auto run and offline (this.. this isn't right) don't display messages.
    # Probably worth abstracting this to something like "debug_cli" at some point.
    if not interpreter.auto_run and not interpreter.offline:
        interpreter_intro_message = [
            "**Open Interpreter** will require approval before running code."
        ]

        if interpreter.safe_mode == "ask" or interpreter.safe_mode == "auto":
            if not check_for_package("semgrep"):
                interpreter_intro_message.append(
                    f"**Safe Mode**: {interpreter.safe_mode}\n\n>Note: **Safe Mode** requires `semgrep` (`pip install semgrep`)"
                )
        else:
            interpreter_intro_message.append("Use `interpreter -y` to bypass this.")

        interpreter_intro_message.append("Press `CTRL-C` to exit.")

        display_markdown_message("\n\n".join(interpreter_intro_message) + "\n")

    # No initial message means we prompt the user ourselves, in a loop.
    interactive = not message

    active_block = None
    voice_subprocess = None
    # Give the cursor flag a value up front so the refresh at the bottom of the
    # chunk loop can never hit an unbound local.
    render_cursor = True

    while True:
        if interactive:
            ### This is the primary input for Open Interpreter.
            message = (
                cli_input("> ").strip()
                if interpreter.multi_line
                else input("> ").strip()
            )

            try:
                # This lets users hit the up arrow key for past messages
                readline.add_history(message)
            except Exception:
                # If the user doesn't have readline (may be the case on windows),
                # that's fine. Not a bare `except:` so CTRL-C still propagates.
                pass

        if isinstance(message, str):
            # This is for the terminal interface being used as a CLI — messages are strings.
            # This won't fire if they're in the python package, display=True, and they
            # passed in an array of messages (for example).

            if message == "":
                # Ignore empty messages when user presses enter without typing anything
                continue

            if message.startswith("%") and interactive:
                handle_magic_command(interpreter, message)
                continue

            # Many users do this
            if message.strip() == "interpreter --local":
                print("Please exit this conversation, then run `interpreter --local`.")
                continue
            if message.strip() == "pip install --upgrade open-interpreter":
                print(
                    "Please exit this conversation, then run `pip install --upgrade open-interpreter`."
                )
                continue

            if (
                interpreter.llm.supports_vision
                or interpreter.llm.vision_renderer is not None
            ):
                # Is the input a path to an image? Like they just dragged it into the terminal?
                image_path = find_image_path(message)

                ## If we found an image, add it to the message
                if image_path:
                    # Add the text interpreter's message history
                    interpreter.messages.append(
                        {
                            "role": "user",
                            "type": "message",
                            "content": message,
                        }
                    )

                    # Pass in the image to interpreter in a moment
                    message = {
                        "role": "user",
                        "type": "image",
                        "format": "path",
                        "content": image_path,
                    }

        try:
            for chunk in interpreter.chat(message, display=False, stream=True):
                yield chunk

                # Is this for thine eyes?
                if "recipient" in chunk and chunk["recipient"] != "user":
                    continue

                if interpreter.verbose:
                    print("Chunk in `terminal_interface`:", chunk)

                # Comply with PyAutoGUI fail-safe for OS mode
                # so people can turn it off by moving their mouse to a corner
                if interpreter.os:
                    if (
                        chunk.get("format") == "output"
                        and "failsafeexception" in chunk["content"].lower()
                    ):
                        print("Fail-safe triggered (mouse in one of the four corners).")
                        break

                if "end" in chunk and active_block:
                    active_block.refresh(cursor=False)

                    # We don't stop on code's end — code + console output are actually one block.
                    if chunk["type"] in ["message", "console"]:
                        active_block.end()
                        active_block = None

                # Assistant message blocks
                if chunk["type"] == "message":
                    if "start" in chunk:
                        active_block = MessageBlock()
                        render_cursor = True

                    if "content" in chunk:
                        active_block.message += chunk["content"]

                    if "end" in chunk and interpreter.os:
                        # Built into a fresh local so the `message` being sent
                        # is not overwritten by its own reply.
                        sanitized_message = _notification_text(
                            interpreter.messages[-1]["content"]
                        )

                        # Display notification (this branch only runs in OS mode)
                        interpreter.computer.os.notify(sanitized_message)

                        # Speak message aloud
                        if platform.system() == "Darwin" and interpreter.speak_messages:
                            if voice_subprocess:
                                voice_subprocess.terminate()
                            voice_subprocess = subprocess.Popen(
                                [
                                    "osascript",
                                    "-e",
                                    f'say "{sanitized_message}" using "Fred"',
                                ]
                            )
                        # Otherwise the user isn't on a Mac, so we can't do this. You should
                        # tell them something about that when they first set this up.
                        # Or use a universal TTS library.

                # Assistant code blocks
                elif chunk["role"] == "assistant" and chunk["type"] == "code":
                    if "start" in chunk:
                        active_block = CodeBlock()
                        active_block.language = chunk["format"]
                        render_cursor = True

                    if "content" in chunk:
                        active_block.code += chunk["content"]

                # Execution notice
                if chunk["type"] == "confirmation":
                    if not interpreter.auto_run:
                        # OI is about to execute code. The user wants to approve this

                        # End the active code block so you can run input() below it
                        if active_block:
                            active_block.refresh(cursor=False)
                            active_block.end()
                            active_block = None

                        code_to_run = chunk["content"]
                        language = code_to_run["format"]
                        code = code_to_run["content"]

                        should_scan_code = False

                        if not interpreter.safe_mode == "off":
                            if interpreter.safe_mode == "auto":
                                should_scan_code = True
                            elif interpreter.safe_mode == "ask":
                                response = input(
                                    " Would you like to scan this code? (y/n)\n\n "
                                )
                                print("")  # <- Aesthetic choice

                                if response.strip().lower() == "y":
                                    should_scan_code = True

                        if should_scan_code:
                            scan_code(code, language, interpreter)

                        response = input(
                            " Would you like to run this code? (y/n)\n\n "
                        )
                        print("")  # <- Aesthetic choice

                        if response.strip().lower() == "y":
                            # Create a new, identical block where the code will actually be run
                            # Conveniently, the chunk includes everything we need to do this:
                            active_block = CodeBlock()
                            active_block.margin_top = False  # <- Aesthetic choice
                            active_block.language = language
                            active_block.code = code
                        else:
                            # User declined to run code.
                            interpreter.messages.append(
                                {
                                    "role": "user",
                                    "type": "message",
                                    "content": "I have declined to run this code.",
                                }
                            )
                            break

                # Computer can display visual types to user,
                # Which sometimes creates more computer output (e.g. HTML errors, eventually)
                if (
                    chunk["role"] == "computer"
                    and "content" in chunk
                    and (
                        chunk["type"] == "image"
                        or ("format" in chunk and chunk["format"] == "html")
                        or ("format" in chunk and chunk["format"] == "javascript")
                    )
                ):
                    # `== False` is kept on purpose: an unset (None) verbose flag
                    # must not be treated the same as an explicit False here.
                    if interpreter.os and interpreter.verbose == False:
                        # We don't display things to the user in OS control mode, since we use
                        # vision to communicate the screen to the LLM so much.
                        # But if verbose is true, we do display it!
                        continue

                    # Display and give extra output back to the LLM
                    extra_computer_output = display_output(chunk)

                    # We're going to just add it to the messages directly, not changing `recipient` here.
                    # Mind you, the way we're doing this, this would make it appear to the user if they
                    # look at their conversation history, because we're not adding "recipient: assistant"
                    # to this block. But this is a good simple solution IMO.
                    # we just might want to change it in the future, once we're sure that a bunch of
                    # adjacent type:console blocks will be rendered normally to text-only LLMs
                    # and that if we made a new block here with "recipient: assistant" it wouldn't add
                    # new console outputs to that block (thus hiding them from the user)
                    if (
                        interpreter.messages[-1].get("format") != "output"
                        or interpreter.messages[-1]["role"] != "computer"
                        or interpreter.messages[-1]["type"] != "console"
                    ):
                        # If the last message isn't a console output, make a new block
                        interpreter.messages.append(
                            {
                                "role": "computer",
                                "type": "console",
                                "format": "output",
                                "content": extra_computer_output,
                            }
                        )
                    else:
                        # If the last message is a console output, simply append the extra output to it
                        interpreter.messages[-1]["content"] += (
                            "\n" + extra_computer_output
                        )
                        interpreter.messages[-1]["content"] = interpreter.messages[-1][
                            "content"
                        ].strip()

                # Console
                if chunk["type"] == "console":
                    render_cursor = False
                    if "format" in chunk and chunk["format"] == "output":
                        active_block.output += "\n" + chunk["content"]
                        active_block.output = (
                            active_block.output.strip()
                        )  # ^ Aesthetic choice

                        # Truncate output
                        active_block.output = truncate_output(
                            active_block.output, interpreter.max_output
                        )
                    if "format" in chunk and chunk["format"] == "active_line":
                        active_block.active_line = chunk["content"]

                        # Display action notifications if we're in OS mode
                        if interpreter.os and active_block.active_line is not None:
                            action = ""

                            code_lines = active_block.code.split("\n")
                            if active_block.active_line < len(code_lines):
                                action = code_lines[active_block.active_line].strip()

                            if action.startswith("computer"):
                                description = _describe_computer_action(
                                    action, active_block.code
                                )
                                if description:
                                    interpreter.computer.os.notify(description)

                    if "start" in chunk:
                        # We need to make a code block if we pushed out an HTML block first,
                        # which would have closed our code block.
                        if not isinstance(active_block, CodeBlock):
                            if active_block:
                                active_block.end()
                            active_block = CodeBlock()

                if active_block:
                    active_block.refresh(cursor=render_cursor)

            # (Sometimes -- like if they CTRL-C quickly -- active_block is still None here)
            if active_block:
                active_block.end()
                active_block = None
                time.sleep(0.1)

            if not interactive:
                # Don't loop
                break

        except KeyboardInterrupt:
            # Exit gracefully
            if active_block:
                active_block.end()
                active_block = None

            if interactive:
                # (this cancels LLM, returns to the interactive "> " input)
                continue
            else:
                break
        except Exception:
            # Real failures only: closing the generator (GeneratorExit) or
            # exiting the process should not dump debug info.
            if interpreter.debug:
                system_info(interpreter)
            raise