# InterpreterFlowModule/InterpreterAtomicFlow.py
import traceback
from copy import deepcopy
from typing import Dict, Any
from .code_interpreters.create_code_interpreter import create_code_interpreter
from aiflows.base_flows import AtomicFlow
def truncate_output(data, max_output_chars=2000):
    """Truncates `data` to its last `max_output_chars` characters, prepending a truncation notice."""
    needs_truncation = False
message = f'Output truncated. Showing the last {max_output_chars} characters.\n\n'
# Remove previous truncation message if it exists
if data.startswith(message):
data = data[len(message):]
needs_truncation = True
# If data exceeds max length, truncate it and add message
if len(data) > max_output_chars or needs_truncation:
data = message + data[-max_output_chars:]
return data
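# Illustrative example (not from the original module): truncate_output keeps only the last
# `max_output_chars` characters of a long string and prepends a truncation notice.
#
#   >>> long_text = "x" * 3000
#   >>> truncated = truncate_output(long_text, max_output_chars=2000)
#   >>> truncated.startswith("Output truncated. Showing the last 2000 characters.")
#   True
#   >>> truncated.endswith("x" * 2000)
#   True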
class InterpreterAtomicFlow(AtomicFlow):
"""This flow is used to run the code passed from the caller.
*Input Interface*:
- `code`
- `language`
*Output Interface*:
- `interpreter_output`: output of the code interpreter
*Configuration Parameters*:
- None
**Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter)
for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()**
I'm extracting the code interpreter part from open-interpreter because the litellm version of open-interpreter
is not compatible with that of the current version of aiflows(v.0.1.7).
"""
def __init__(self,
max_output=2000,
**kwargs):
super().__init__(**kwargs)
self.max_output = max_output
self._code_interpreters = {}
@classmethod
def instantiate_from_config(cls, config):
flow_config = deepcopy(config)
kwargs = {"flow_config": flow_config}
# ~~~ Instantiate flow ~~~
return cls(**kwargs)
def set_up_flow_state(self):
"""
class-specific flow state: language and code,
which describes the programming language and the code to run.
"""
super().set_up_flow_state()
self.flow_state["language"] = None
self.flow_state["code"] = ""
def _state_update_add_language_and_code(self,
language: str,
code: str) -> None:
"""
updates the language and code passed from _process_input_data
to the flow state
"""
self.flow_state["language"] = language
self.flow_state["code"] = code
def _check_input(self, input_data: Dict[str, Any]):
"""
Sanity check of input data
"""
# ~~~ Sanity check of input_data ~~~
assert "language" in input_data, "attribute 'language' not in input data."
assert "code" in input_data, "attribute 'code' not in input data."
def _process_input_data(self, input_data: Dict[str, Any]):
"""
Allocate interpreter if any, pass input data into flow state
"""
# code in Jupyter notebook that starts with '!' is actually shell command.
if input_data["language"] == "python" and input_data["code"].startswith("!"):
input_data["language"] = "shell"
input_data["code"] = input_data["code"][1:]
# ~~~ Allocate interpreter ~~~
# interpreter existence is checked in create_code_interpreter()
# TODO: consider: should we put language not supported error into output?
language = input_data["language"]
if language not in self._code_interpreters:
self._code_interpreters[language] = create_code_interpreter(language)
# ~~~ Pass input data to flow state ~~~
self._state_update_add_language_and_code(
language=language,
code=input_data["code"]
)
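    # Illustrative sketch of the '!'-to-shell dispatch in _process_input_data above
    # (not from the original module; values shown are assumed):
    #   input_data = {"language": "python", "code": "!ls"}
    #   After _process_input_data(input_data):
    #       flow_state["language"] == "shell"
    #       flow_state["code"] == "ls"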
    def _call(self):
        """Runs the code stored in the flow state and returns the (truncated) interpreter output."""
        output = ""
        try:
            code_interpreter = self._code_interpreters[self.flow_state["language"]]
            code = self.flow_state["code"]
            for line in code_interpreter.run(code):
                if "output" in line:
                    output += "\n" + line["output"]
            # Truncate output
            output = truncate_output(output, self.max_output)
            output = output.strip()
        except Exception:
            # Return the traceback as the interpreter output so the caller can see what went wrong.
            output = traceback.format_exc()
            output = output.strip()
        return output
    def run(
            self,
            input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Checks the input data, runs the code, and returns the interpreter output."""
        self._check_input(input_data)
        self._process_input_data(input_data)
        response = self._call()
        return {"interpreter_output": response}
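# Minimal usage sketch (illustrative; assumes a valid aiFlows flow_config for this flow and an
# available interpreter for the requested language):
#
#   flow = InterpreterAtomicFlow.instantiate_from_config({"name": "Interpreter", ...})
#   result = flow.run({"language": "python", "code": "print(1 + 1)"})
#   print(result["interpreter_output"])  # expected: "2"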