File size: 4,606 Bytes
3f6b5c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f5ba9f9
3f6b5c2
a39f32d
3f6b5c2
f5ba9f9
3f6b5c2
 
f5ba9f9
 
 
3f6b5c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import traceback
from copy import deepcopy
from typing import Dict, Any

from .code_interpreters.create_code_interpreter import create_code_interpreter

from aiflows.base_flows import AtomicFlow

def truncate_output(data, max_output_chars=2000):
    needs_truncation = False

    message = f'Output truncated. Showing the last {max_output_chars} characters.\n\n'

    # Remove previous truncation message if it exists
    if data.startswith(message):
        data = data[len(message):]
        needs_truncation = True

    # If data exceeds max length, truncate it and add message
    if len(data) > max_output_chars or needs_truncation:
        data = message + data[-max_output_chars:]

    return data


class InterpreterAtomicFlow(AtomicFlow):
    """This flow is used to run the code passed from the caller.
    *Input Interface*:
    - `code`
    - `language`
    
    *Output Interface*:
    - `interpreter_output`: output of the code interpreter

    *Configuration Parameters*:
    - None

    **Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter)
    for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()**

    I'm extracting the code interpreter part from open-interpreter because the litellm version of open-interpreter
    is not compatible with that of the current version of aiflows(v.0.1.7).
    """
    def __init__(self,
                 max_output=2000,
                 **kwargs):
        super().__init__(**kwargs)
        self.max_output = max_output
        self._code_interpreters = {}

    @classmethod
    def instantiate_from_config(cls, config):
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)


    def set_up_flow_state(self):
        """
        class-specific flow state: language and code,
        which describes the programming language and the code to run.
        """
        super().set_up_flow_state()
        self.flow_state["language"] = None
        self.flow_state["code"] = ""

    def _state_update_add_language_and_code(self,
                                            language: str,
                                            code: str) -> None:
        """
        updates the language and code passed from _process_input_data
        to the flow state
        """
        self.flow_state["language"] = language
        self.flow_state["code"] = code

    def _check_input(self, input_data: Dict[str, Any]):
        """
        Sanity check of input data
        """
         # ~~~ Sanity check of input_data ~~~
        assert "language" in input_data, "attribute 'language' not in input data."
        assert "code" in input_data, "attribute 'code' not in input data."


    def _process_input_data(self, input_data: Dict[str, Any]):
        """
        Allocate interpreter if any, pass input data into flow state
        """
        # code in Jupyter notebook that starts with '!' is actually shell command.
        if input_data["language"] == "python" and input_data["code"].startswith("!"):
            input_data["language"] = "shell"
            input_data["code"] = input_data["code"][1:]

        # ~~~ Allocate interpreter ~~~
        # interpreter existence is checked in create_code_interpreter()
        # TODO: consider: should we put language not supported error into output?
        language = input_data["language"]
        if language not in self._code_interpreters:
            self._code_interpreters[language] = create_code_interpreter(language)

        # ~~~ Pass input data to flow state ~~~
        self._state_update_add_language_and_code(
            language=language,
            code=input_data["code"]
        )

    def _call(self):
        output = ""
        try:
            code_interpreter = self._code_interpreters[self.flow_state["language"]]
            code = self.flow_state["code"]
            for line in code_interpreter.run(code):
                if "output" in line:
                    output += "\n" + line["output"]
                    # Truncate output
                    output = truncate_output(output, self.max_output)
                    output = output.strip()
        except:
            output = traceback.format_exc()
            output = output.strip()
        return output

    def run(
            self,
            input_data: Dict[str, Any]):
        self._check_input(input_data)
        self._process_input_data(input_data)
        response = self._call()
        return {"interpreter_output": response}