File size: 9,530 Bytes
cbb225c
 
5fbaf2a
93c74cc
cbb225c
 
1beb8a2
5fbaf2a
cbb225c
 
 
 
5fbaf2a
40a0d94
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34a9244
40a0d94
 
 
e794544
1beb8a2
 
 
 
 
 
 
 
 
 
 
34a9244
1beb8a2
 
34a9244
1beb8a2
 
 
 
 
 
 
 
a8e421a
1beb8a2
 
 
c4d9a5c
1beb8a2
34a9244
 
1beb8a2
 
 
34a9244
1beb8a2
e794544
 
 
 
 
 
5fbaf2a
1beb8a2
34a9244
1beb8a2
34a9244
1beb8a2
5fbaf2a
1beb8a2
 
34a9244
1beb8a2
41db545
1beb8a2
 
 
 
a8e421a
1beb8a2
 
 
7ba1689
a8e421a
 
 
5fbaf2a
1beb8a2
34a9244
2cec66f
1beb8a2
2cec66f
a8e421a
2cec66f
 
 
 
a8e421a
 
 
1beb8a2
 
 
34a9244
 
 
 
 
 
1beb8a2
34a9244
1beb8a2
 
 
34a9244
1beb8a2
 
 
2cec66f
1beb8a2
 
 
 
 
 
2cec66f
 
 
 
 
 
6bdc009
2cec66f
 
 
 
6bdc009
2cec66f
 
1beb8a2
 
 
 
 
 
 
 
 
 
34a9244
1beb8a2
2cec66f
 
 
34a9244
1beb8a2
2cec66f
34a9244
1beb8a2
 
2cec66f
1beb8a2
 
 
 
6bdc009
 
 
 
 
1beb8a2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
from typing import Dict, Any

from aiflows.base_flows import CompositeFlow
from aiflows.utils import logging

from .ControllerAtomicFlow import ControllerAtomicFlow
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
# NOTE(review): this switches the aiflows logging verbosity to debug as a side
# effect of importing this module — presumably process-wide; confirm this is
# intended outside of development.
logging.set_verbosity_debug()
# Module-level logger named after this module.
log = logging.get_logger(__name__)


class ControllerExecutorFlow(CompositeFlow):
    """ This class implements a ControllerExecutorFlow. It's composed of a ControllerAtomicFlow and an ExecutorFlow.
    Typically, the ControllerAtomicFlow uses an LLM to decide which command to call and the ExecutorFlow (a branching flow) executes that command.

    It contains the following subflows:

    - A Controller Atomic Flow: A flow that decides which command to call next in order to get closer to accomplishing a given goal.
    - An Executor Flow: A branching flow that executes the command chosen by the ControllerAtomicFlow.

    An illustration of the flow is as follows:

           goal -----|-----> ControllerFlow----->|-----> (answer,status)
                     ^                           |
                     |                           |
                     |                           v
                     |<----- ExecutorFlow <------|

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "CtrlEx"
    - `description` (str): A description of the flow. This description is used to generate the help message of the flow.
    Default: "ControllerExecutor (i.e., MRKL, ReAct) interaction implementation with Flows
    that approaches the problem solving in two phases: one Flow chooses the next step and another Flow executes it.
    This is repeated until the controller Flow concludes on an answer."
    - `max_rounds` (int): The maximum number of rounds the flow can run for.
    Default: 30.
    - `subflows_config` (Dict[str,Any]): A dictionary of the subflows configurations. Default:
        - `Controller`: The configuration of the Controller Flow. By default, it is a ControllerAtomicFlow. Default parameters:
            - `finish` (Dict[str,Any]): The configuration of the finish command. Default parameters:
                - `description` (str): The description of the command.
                Default: "Signal that the objective has been satisfied, and returns the answer to the user."
                - `input_args` (List[str]): The input arguments of the command. Default: ["answer"]
            -  All other parameters are inherited from the default configuration of ControllerAtomicFlow (see ControllerAtomicFlow)
        - `Executor`: The configuration of the Executor Flow. By default, it's a BranchingFlow. There are no default parameters, the flow
        parameter to be defined is:
            - `subflows_config` (Dict[str,Any]): A dictionary of the configuration of the subflows of the branching flow.
            These subflows are typically also the possible commands of the Controller Flow. Default: []
    - `early_exit_key` (str): The key that is used to exit the flow. Default: "EARLY_EXIT"
    - `topology` (str): The topology of the flow which is "circular".
    By default, the topology is the one shown in the illustration above
    (the topology is also described in ControllerExecutorFlow.yaml).


    *Input Interface*:

    - `goal` (str): The goal of the controller. Usually asked by the user/human (e.g. "I want to know the occupation and birth date of Michael Jordan.")

    *Output Interface*:

    - `answer` (str): The answer of the flow to the query (e.g. "Michael Jordan is a basketball player and business man. He was born on February 17, 1963.")
    - `status` (str): The status of the flow. It can be "finished" or "unfinished". If the status is "unfinished", it's usually because the maximum amount of rounds was reached before the model found an answer.

    :param flow_config: The configuration of the flow (see Configuration Parameters).
    :param subflows: A list of subflows. Required when instantiating the subflow programmatically (it replaces subflows_config from flow_config).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Keys of the flow state forwarded to the Controller on every round
        # after the first one (see call_controller).
        self.input_interface_controller = KeyInterface(
            keys_to_select=["goal", "observation"],
        )
        # On the first round there is no observation yet, so only the goal is sent.
        self.input_interface_first_round_controller = KeyInterface(
            keys_to_select=["goal"],
        )

        # Keys of the flow state that make up the final reply (see generate_reply).
        self.reply_interface = KeyInterface(
            keys_to_select=["answer", "status"],
        )

        # Transition table of the state machine, keyed by the last state that
        # ran. `None` means that no subflow has been called yet for the
        # current task.
        self.next_state = {
            None: "Controller",
            "Controller": "Executor",
            "Executor": "Controller",
        }

    def generate_reply(self):
        """ This method generates the reply of the flow. It's called when the flow is finished.

        The reply is built from the `answer` and `status` keys of the flow state and is sent
        in response to the input message that was stored when the task started.
        """
        reply = self.package_output_message(
            input_message=self.flow_state["input_message"],
            response=self.reply_interface(self.flow_state),
        )
        self.send_message(reply)

    def get_next_state(self):
        """ Returns the name of the next state to run, based on the last state stored in the flow state.

        :return: "Controller" or "Executor", or None if `max_rounds` is set and has been reached.
        """
        max_rounds = self.flow_config["max_rounds"]
        if max_rounds is not None and self.flow_state["current_round"] >= max_rounds:
            return None

        return self.next_state[self.flow_state["last_state"]]

    def _on_reach_max_round(self):
        """ This method is called when the flow reaches the maximum amount of rounds. It writes an "unfinished" answer and status to the flow state."""
        self._state_update_dict({
            "answer": "The maximum amount of rounds was reached before the model found an answer.",
            "status": "unfinished",
        })

    def set_up_flow_state(self):
        """ Sets up the flow state: no state has run yet and the round counter starts at 0."""
        super().set_up_flow_state()
        self.flow_state["last_state"] = None
        self.flow_state["current_round"] = 0

    def call_controller(self):
        """ Calls the controller: the flow that decides which command to call next."""
        # On the first round only the goal is available; afterwards the
        # executor's observation is forwarded as well.
        if self.flow_state["current_round"] == 0:
            input_interface = self.input_interface_first_round_controller
        else:
            input_interface = self.input_interface_controller

        message = self.package_input_message(
            data=input_interface(self.flow_state),
            dst_flow="Controller",
        )

        self.subflows["Controller"].get_reply(
            message,
        )

    def call_executor(self):
        """ Calls the flow that executes the command instructed by the ControllerAtomicFlow."""
        # The command name chosen by the controller is used as the key of the
        # subflow to call; its arguments are sent as the message data.
        executor_branch_to_call = self.flow_state["command"]
        message = self.package_input_message(
            data=self.flow_state["command_args"],
            dst_flow=executor_branch_to_call,
        )

        self.subflows[executor_branch_to_call].get_reply(
            message,
        )

    def register_data_to_state(self, input_message):
        """This method registers the input message data to the flow state. It's called every time a new input message is received.

        What gets stored depends on the state that ran last:

        - No state ran yet: the message starts a new task, its `goal` is stored.
        - "Executor": the message data is stored as the `observation`.
        - Otherwise (the controller ran last): the `command` and `command_args` are stored, and if the
          command is "finish", the answer is stored and the flow is flagged for early exit.

        :param input_message: The input message
        :type input_message: FlowMessage
        """
        last_state = self.flow_state["last_state"]

        if last_state is None:
            # A new task is starting. Clear the termination flag and the round
            # counter that a previous task may have left behind; otherwise a
            # reused instance would immediately reply with the previous answer
            # (stale early-exit flag) or report that the maximum number of
            # rounds was reached (stale counter).
            self.flow_state["early_exit_flag"] = False
            self.flow_state["current_round"] = 0

            self.flow_state["input_message"] = input_message
            self.flow_state["goal"] = input_message.data["goal"]

        elif last_state == "Executor":
            self.flow_state["observation"] = input_message.data

        else:
            self._state_update_dict(
                {
                    "command": input_message.data["command"],
                    "command_args": input_message.data["command_args"],
                }
            )

            # Detect the "finish" command and flag the flow for early exit.
            if self.flow_state["command"] == "finish":
                self._state_update_dict(
                    {
                        "early_exit_flag": True,
                        "answer": self.flow_state["command_args"]["answer"],
                        "status": "finished",
                    }
                )

    def run(self, input_message: FlowMessage):
        """ Runs one step of the ControllerExecutorFlow.

        The incoming message is first registered to the flow state. Then, either the reply is generated
        (the controller issued "finish" or the maximum number of rounds was reached), or the next
        subflow (Controller or Executor) is called.

        :param input_message: The input message
        :type input_message: FlowMessage
        """
        self.register_data_to_state(input_message)

        current_state = self.get_next_state()

        if self.flow_state.get("early_exit_flag", False):
            self.generate_reply()

        elif current_state == "Controller":
            self.call_controller()

        elif current_state == "Executor":
            self.call_executor()
            # A round is counted each time a command is handed to the executor.
            self.flow_state["current_round"] += 1

        else:
            # get_next_state returned None: the maximum number of rounds was reached.
            self._on_reach_max_round()
            self.generate_reply()

        # Once the flow has replied, go back to the initial state so that the
        # next incoming message is treated as a new task.
        if self.flow_state.get("early_exit_flag", False) or current_state is None:
            self.flow_state["last_state"] = None
        else:
            self.flow_state["last_state"] = current_state