nbaldwin committed
Commit e511f35
1 Parent(s): 30754bf

coflows update

Files changed (3)
  1. AutoGPTFlow.py +163 -117
  2. __init__.py +2 -2
  3. run.py +66 -46
AutoGPTFlow.py CHANGED
@@ -76,128 +76,43 @@ class AutoGPTFlow(ControllerExecutorFlow):
         self.rename_human_output_interface = KeyInterface(
             keys_to_rename={"human_input": "human_feedback"}
         )
-
-    def set_up_flow_state(self):
-        super().set_up_flow_state()
-        self.flow_state["early_exit_flag"] = True
-        self.flow_state["is_first_round"] = True
-
-    def memory_read_step(self):
-        memory_read_input = self.prepare_memory_read_input()
-        output = self.ask_subflow("Memory", memory_read_input).get_data()
-        memory_read_output = self.prepare_memory_read_output(output)
-
-        return memory_read_output
-
-    def memory_write_step(self):
-        memory_write_input = self.prepare_memory_write_input()
-        self.tell_subflow("Memory", memory_write_input)
-
-    def controller_executor_step(self, output_memory_retrieval):
-
-        if self.flow_state["is_first_round"]:
-            additional_input_ctrl_ex = {
-                "goal": self.flow_state["goal"],
-            }
-        else:
-            additional_input_ctrl_ex = {
-                "observation": self.flow_state["observation"],
-                "human_feedback": self.flow_state["human_feedback"],
-            }
-
-        input_ctrl_ex = {"executor_reply": {**output_memory_retrieval, **additional_input_ctrl_ex}}
-
-        output_ctrl_ex = self._single_round_controller_executor(input_ctrl_ex)
-
-        self.flow_state["early_exit_flag"] = output_ctrl_ex.get("EARLY_EXIT", False)
-
-        if self.flow_state["early_exit_flag"]:
-            return output_ctrl_ex
-
-        controller_reply = output_ctrl_ex["controller_reply"]
-        executor_reply = output_ctrl_ex["executor_reply"]
-
-        self._state_update_dict(
-            {
-                "command": controller_reply["command"],
-                "command_args": controller_reply["command_args"],
-                "observation": executor_reply["observation"],
-            }
-        )
-        return output_ctrl_ex
-
-    def human_feedback_step(self):
-
-        human_feedback_input_variables = {
-            "goal": self.flow_state["goal"],
-            "command": self.flow_state["command"],
-            "command_args": self.flow_state["command_args"],
-            "observation": self.flow_state["observation"],
-        }
-
-        human_feedback = self.rename_human_output_interface(
-            self.ask_subflow("HumanFeedback", human_feedback_input_variables).get_data()
-        )
-
-        self.flow_state["human_feedback"] = human_feedback["human_feedback"]
-
-        if human_feedback["human_feedback"].strip().lower() == "q":
-            self.flow_state["early_exit_flag"] = True
-            return {
-                "EARLY_EXIT": True,
-                "answer": "The user has chosen to exit before a final answer was generated.",
-                "status": "unfinished",
-            }
-
-        return human_feedback
-
-    def _single_round_autogpt(self):
-
-        # 1. Memory Retrieval
-        output_memory_retrieval = self.memory_read_step()
-
-        # 2. ControllerExecutor
-        output_ctrl_ex = self.controller_executor_step(output_memory_retrieval)
-
-        if self.flow_state["early_exit_flag"]:
-            return output_ctrl_ex
-
-        # 3. HumanFeedback
-        output_human_feedback = self.human_feedback_step()
-
-        if self.flow_state["early_exit_flag"]:
-            return output_human_feedback
-
-        # 4. Memory Write
-        self.memory_write_step()
-
-        return {**output_ctrl_ex, **output_human_feedback}
-
-    def run(self, input_data):
-        self._state_update_dict({"goal": input_data["goal"]})
-
-        for round in range(self.flow_config["max_rounds"]):
-            output = self._single_round_autogpt()
-
-            self.flow_state["is_first_round"] = False
-
-            if self.flow_state["early_exit_flag"]:
-                return output
-
-        return {
-            "EARLY_EXIT": False,
-            "answer": output,
-            "status": "unfinished"
-        }
+
+        self.input_interface_controller = KeyInterface(
+            keys_to_select = ["goal", "observation", "human_feedback", "memory"],
+        )
+
+        self.input_interface_human_feedback = KeyInterface(
+            keys_to_select = ["goal", "command", "command_args", "observation"],
+        )
+
+        self.memory_read_output_interface = KeyInterface(
+            additional_transformations = [self.prepare_memory_read_output],
+            keys_to_select = ["memory"],
+        )
+
+        self.human_feedback_output_interface = KeyInterface(
+            keys_to_rename={"human_input": "human_feedback"},
+            keys_to_select = ["human_feedback"],
+        )
+
+        self.next_flow_to_call = {
+            None: "MemoryRead",
+            "MemoryRead": "Controller",
+            "Controller": "Executor",
+            "Executor": "HumanFeedback",
+            "HumanFeedback": "MemoryWrite",
+            "MemoryWrite": "MemoryRead",
+        }
+
+    def set_up_flow_state(self):
+        super().set_up_flow_state()
+        self.flow_state["early_exit_flag"] = False
+
+    def prepare_memory_read_output(self, data_dict: Dict[str, Any], **kwargs):
+        retrieved_memories = data_dict["retrieved"][0][1:]
+        return {"memory": "\n".join(retrieved_memories)}
+
     def _get_memory_key(self):
         """ This method returns the memory key that is used to retrieve memories from the ChromaDB model.

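The deleted run drove a blocking loop; its replacement (the next_flow_to_call table added above plus the new run in the last hunk) is re-invoked once per subflow reply and dispatches to the successor of flow_state["last_called"]. The lookup itself is not part of this diff; a minimal sketch of what get_next_flow_to_call presumably does (hypothetical — the real method is inherited from ControllerExecutorFlow):

    # Hypothetical sketch: the real get_next_flow_to_call is inherited from
    # ControllerExecutorFlow and is not shown in this commit.
    def get_next_flow_to_call(self):
        # Out of budget: run() then falls through to _on_reach_max_round().
        if self.flow_state.get("current_round", 0) >= self.flow_config["max_rounds"]:
            return None
        # Successor of whichever subflow replied last; the None key covers
        # the very first input message ("MemoryRead").
        return self.next_flow_to_call[self.flow_state["last_called"]]
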
@@ -246,11 +161,6 @@ class AutoGPTFlow(ControllerExecutorFlow):
             "content": query
         }
 
-    def prepare_memory_read_output(self, data: Dict[str, Any]):
-
-        retrieved_memories = data["retrieved"][0][1:]
-        return {"memory": "\n".join(retrieved_memories)}
-
     def prepare_memory_write_input(self) -> Dict[str, Any]:
 
         query = self._get_memory_key()
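
The method removed here is not dropped from the class: it reappears in the first hunk as prepare_memory_read_output(self, data_dict, **kwargs), wired into a KeyInterface via additional_transformations instead of being called by hand. A minimal, self-contained sketch of that composition (the sample payload is made up; the real one comes from the Memory subflow):

    from aiflows.interfaces import KeyInterface

    def prepare_memory_read_output(data_dict, **kwargs):
        # Drop the first element (the query) and keep the retrieved memories.
        retrieved_memories = data_dict["retrieved"][0][1:]
        return {"memory": "\n".join(retrieved_memories)}

    memory_read_output_interface = KeyInterface(
        additional_transformations=[prepare_memory_read_output],
        keys_to_select=["memory"],
    )

    # Made-up payload: first entry is the query, the rest are retrieved hits.
    sample = {"retrieved": [["query", "fact 1", "fact 2"]]}
    print(memory_read_output_interface(sample))  # {'memory': 'fact 1\nfact 2'}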
@@ -259,3 +169,139 @@ class AutoGPTFlow(ControllerExecutorFlow):
             "operation": "write",
             "content": str(query)
         }
+
+    def call_memory_read(self):
+        memory_read_input = self.prepare_memory_read_input()
+
+        message = self._package_input_message(
+            data = memory_read_input,
+            dst_flow = "Memory",
+        )
+
+        self.subflows["Memory"].send_message_async(
+            message,
+            pipe_to = self.flow_config["flow_ref"]
+        )
+
+    def call_memory_write(self):
+        memory_write_input = self.prepare_memory_write_input()
+
+        message = self._package_input_message(
+            data = memory_write_input,
+            dst_flow = "Memory",
+        )
+
+        self.subflows["Memory"].send_message_async(
+            message,
+            pipe_to = self.flow_config["flow_ref"]
+        )
+
+    def call_human_feedback(self):
+        message = self._package_input_message(
+            data = self.input_interface_human_feedback(self.flow_state),
+            dst_flow = "HumanFeedback",
+        )
+
+        self.subflows["HumanFeedback"].send_message_async(
+            message,
+            pipe_to = self.flow_config["flow_ref"]
+        )
+
+    def register_data_to_state(self, input_message):
+        # Making this explicit so it's easier to understand. It also shows
+        # different ways of writing to the state: either programmatically or
+        # via _state_update_dict and the input and output interface methods.
+
+        last_called = self.flow_state["last_called"]
+
+        if last_called is None:
+            self.flow_state["input_message"] = input_message
+            self.flow_state["goal"] = input_message.data["goal"]
+
+        elif last_called == "Executor":
+            self.flow_state["observation"] = input_message.data
+
+        elif last_called == "Controller":
+            self._state_update_dict(
+                {
+                    "command": input_message.data["command"],
+                    "command_args": input_message.data["command_args"]
+                }
+            )
+
+            # detect an early exit
+            if self.flow_state["command"] == "finish":
+                self._state_update_dict(
+                    {
+                        "EARLY_EXIT": True,
+                        "answer": self.flow_state["command_args"]["answer"],
+                        "status": "finished"
+                    }
+                )
+                self.flow_state["early_exit_flag"] = True
+
+        elif last_called == "MemoryRead":
+            self._state_update_dict(
+                self.memory_read_output_interface(input_message).data
+            )
+
+        elif last_called == "HumanFeedback":
+            self._state_update_dict(
+                self.human_feedback_output_interface(input_message).data
+            )
+
+            # detect an early exit
+            if self.flow_state["human_feedback"].strip().lower() == "q":
+                self._state_update_dict(
+                    {
+                        "EARLY_EXIT": True,
+                        "answer": "The user has chosen to exit before a final answer was generated.",
+                        "status": "unfinished",
+                    }
+                )
+                self.flow_state["early_exit_flag"] = True
+
+    def run(self, input_message):
+        self.register_data_to_state(input_message)
+
+        flow_to_call = self.get_next_flow_to_call()
+
+        if self.flow_state.get("early_exit_flag", False):
+            self.generate_reply()
+
+        elif flow_to_call == "MemoryRead":
+            self.call_memory_read()
+
+        elif flow_to_call == "Controller":
+            self.call_controller()
+
+        elif flow_to_call == "Executor":
+            self.call_executor()
+
+        elif flow_to_call == "HumanFeedback":
+            self.call_human_feedback()
+
+        elif flow_to_call == "MemoryWrite":
+            self.call_memory_write()
+            self.flow_state["current_round"] += 1
+
+        else:
+            self._on_reach_max_round()
+            self.generate_reply()
+
+        self.flow_state["last_called"] = self.get_next_flow_to_call()
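
Taken together, these changes replace blocking orchestration (ask_subflow(...).get_data()) with message passing: each invocation of run handles exactly one incoming reply, registers it in the state, and sends the next request without waiting. The call_* helpers above all share one pattern, shown here with a placeholder payload standing in for whichever prepared input dict is being sent:

    # Common pattern of the call_* helpers added in this commit.
    message = self._package_input_message(data=payload, dst_flow="Memory")
    self.subflows["Memory"].send_message_async(
        message,
        pipe_to=self.flow_config["flow_ref"],  # route the reply back into this flow's run()
    )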
__init__.py CHANGED
@@ -3,9 +3,9 @@ dependencies = [
     {"url": "aiflows/ControllerExecutorFlowModule",
      "revision": "coflows"},
     {"url": "aiflows/HumanStandardInputFlowModule",
-     "revision": "main"},
+     "revision": "coflows"},
     {"url": "aiflows/VectorStoreFlowModule",
-     "revision": "main"},
+     "revision": "coflows"},
 ]
 from aiflows import flow_verse
run.py CHANGED
@@ -1,3 +1,5 @@
+"""A simple script to run a Flow that can be used for development and debugging."""
+
 import os
 
 import hydra
@@ -9,91 +11,109 @@ from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
 
 from aiflows import logging
 from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
+
 from aiflows.utils import serve_utils
 from aiflows.workers import run_dispatch_worker_thread
 from aiflows.messages import FlowMessage
 from aiflows.interfaces import KeyInterface
+from aiflows.utils.colink_utils import start_colink_server
+from aiflows.workers import run_dispatch_worker_thread
 
 CACHING_PARAMETERS.do_caching = False  # Set to True in order to disable caching
 # clear_cache() # Uncomment this line to clear the cache
 
 logging.set_verbosity_debug()
 
-from aiflows import flow_verse
-# ~~~ Load Flow dependecies from FlowVerse ~~~
+
 dependencies = [
     {"url": "aiflows/AutoGPTFlowModule", "revision": os.getcwd()},
-    {"url": "aiflows/LCToolFlowModule", "revision": "80c0c76181d90846ebff1057b8951d9689f93b62"},
+    {"url": "aiflows/LCToolFlowModule", "revision": "coflows"},
 ]
 
+from aiflows import flow_verse
 flow_verse.sync_dependencies(dependencies)
 if __name__ == "__main__":
-    # ~~~ Set the API information ~~~
+
+    # 1. ~~~~~ Set up a colink server ~~~~
+    FLOW_MODULES_PATH = "./"
+
+    cl = start_colink_server()
+
+    # 2. ~~~~~ Load flow config ~~~~~~
+    root_dir = "."
+    cfg_path = os.path.join(root_dir, "demo.yaml")
+    cfg = read_yaml_file(cfg_path)
+
+    # 2.1 ~~~ Set the API information ~~~
     # OpenAI backend
-    api_information = [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))]
-    # Azure backend
+    api_information = [ApiInfo(backend_used="openai",
+                               api_key=os.getenv("OPENAI_API_KEY"))]
+    # # Azure backend
     # api_information = ApiInfo(backend_used = "azure",
     #                           api_base = os.getenv("AZURE_API_BASE"),
     #                           api_key = os.getenv("AZURE_OPENAI_KEY"),
     #                           api_version = os.getenv("AZURE_API_VERSION") )
 
-    FLOW_MODULES_PATH = "./"
-
-    jwt = os.getenv("COLINK_JWT")
-    addr = os.getenv("LOCAL_COLINK_ADDRESS")
-
-    cl = serve_utils.start_colink_component(
-        "Reverse Number Demo",
-        {"jwt": jwt, "addr": addr}
-    )
-
-    root_dir = "."
-    cfg_path = os.path.join(root_dir, "demo.yaml")
-    cfg = read_yaml_file(cfg_path)
+    quick_load_api_keys(cfg, api_information, key="api_infos")
 
+    # 3. ~~~~ Serve The Flow ~~~~
     serve_utils.recursive_serve_flow(
         cl = cl,
-        flow_type="demo_served",
+        flow_type="AutoGPTFlowModule",
         default_config=cfg,
         default_state=None,
-        default_dispatch_point="coflows_dispatch",
+        default_dispatch_point="coflows_dispatch"
     )
 
-    #in case you haven't started the dispatch worker thread, uncomment the 2 lines
-    #run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
-    #run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
-
-    quick_load_api_keys(cfg, api_information, key="api_infos")
-
-    # ~~~ Get the data ~~~
-    # data = {"id": 0, "goal": "Answer the following question: What is the population of Canada?"} # Uses wikipedia
-    # data = {"id": 0, "goal": "Answer the following question: Who was the NBA champion in 2023?"} # Uses duckduckgo
-    data = {
-        "id": 0,
-        "goal": "Answer the following question: What is the profession and date of birth of Michael Jordan?",
-    }
-    # At first, we retrieve information about Michael Jordan the basketball player
-    # If we provide feedback, only in the first round, that we are not interested in the basketball player,
-    # but the statistician, and skip the feedback in the next rounds, we get the correct answer
-
-    # ~~~ Run inference ~~~
+    # 4. ~~~~~ Start A Worker Thread ~~~~~
+    run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
+
+    # 5. ~~~~~ Mount the flow and get its proxy ~~~~~~
     proxy_flow = serve_utils.recursive_mount(
         cl=cl,
         client_id="local",
-        flow_type="demo_served",
-        config_overrides=cfg,
+        flow_type="AutoGPTFlowModule",
+        config_overrides=None,
         initial_state=None,
         dispatch_point_override=None,
     )
-    # ~~~ Print the output ~~~
+
+    # 6. ~~~ Get the data ~~~
+    data = {
+        "id": 0,
+        "goal": "Answer the following question: What is the profession and date of birth of Michael Jordan?",
+    }
+
+    # option 1: use the FlowMessage class
     input_message = FlowMessage(
-        data= data,
-        src_flow="Coflows team",
-        dst_flow=proxy_flow,
-        is_input_msg=True
+        data=data,
     )
 
-    future = proxy_flow.ask(input_message)
+    # option 2: use the proxy_flow
+    # input_message = proxy_flow._package_input_message(data = data)
+
+    # 7. ~~~ Run inference ~~~
+    future = proxy_flow.send_message_blocking(input_message)
 
-    print(future.get_data())
+    # uncomment this line if you would like to get the full message back
+    # reply_message = future.get_message()
+    reply_data = future.get_data()
+
+    # ~~~ Print the output ~~~
+    print("~~~~~~Reply~~~~~~")
+    print(reply_data)
+
+    # 8. ~~~~ (Optional) apply output interface on reply ~~~~
+    # output_interface = KeyInterface(
+    #     keys_to_rename={"api_output": "answer"},
+    # )
+    # print("Output: ", output_interface(reply_data))
+
+    # 9. ~~~~~ Optional: Unserve Flow ~~~~~~
+    # serve_utils.delete_served_flow(cl, "FlowModule")
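
Step 8 is left commented out in the script; a hedged sketch of what applying it would look like, assuming the reply dict carries an "api_output" key (an assumption: the actual keys depend on generate_reply, which this commit does not show):

    from aiflows.interfaces import KeyInterface

    output_interface = KeyInterface(
        keys_to_rename={"api_output": "answer"},
    )

    # Made-up reply payload; the real keys depend on generate_reply().
    reply_data = {"api_output": "example answer"}
    print("Output: ", output_interface(reply_data))  # {'answer': 'example answer'}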