nbaldwin commited on
Commit
1beb8a2
·
1 Parent(s): 7ba1689

coflows compatible+

Browse files
ControllerAtomicFlow.py CHANGED
@@ -2,7 +2,7 @@ import json
2
  from copy import deepcopy
3
  from typing import Any, Dict, List
4
  from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow
5
-
6
  from dataclasses import dataclass
7
 
8
 
@@ -117,14 +117,28 @@ class ControllerAtomicFlow(ChatAtomicFlow):
117
  # ~~~ Instantiate flow ~~~
118
  return cls(**kwargs)
119
 
120
- def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
121
  """ This method runs the flow. Note that the response of the LLM is in the JSON format, but it's not a hard constraint (it can hallucinate and return an invalid JSON)
122
 
123
- :param input_data: The input data of the flow.
124
- :type input_data: Dict[str, Any]
125
- :return: The output data of the flow (thought, reasoning, criticism, command, command_args)
126
- :rtype: Dict[str, Any]
127
  """
128
- api_output = super().run(input_data)["api_output"].strip()
 
 
 
 
 
 
 
 
 
 
129
  response = json.loads(api_output)
130
- return response
 
 
 
 
 
 
 
2
  from copy import deepcopy
3
  from typing import Any, Dict, List
4
  from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow
5
+ from aiflows.messages import FlowMessage
6
  from dataclasses import dataclass
7
 
8
 
 
117
  # ~~~ Instantiate flow ~~~
118
  return cls(**kwargs)
119
 
120
+ def run(self, input_message: FlowMessage):
121
  """ This method runs the flow. Note that the response of the LLM is in the JSON format, but it's not a hard constraint (it can hallucinate and return an invalid JSON)
122
 
123
+ :param input_message: The input data of the flow.
124
+ :type input_message: FlowMessage
 
 
125
  """
126
+
127
+ input_data = input_message.data
128
+
129
+ if "goal" in input_data:
130
+ self.flow_state["goal"] = input_data["goal"]
131
+
132
+ else:
133
+ input_data["goal"] = self.flow_state["goal"]
134
+
135
+ api_output = self.query_llm(input_data)
136
+
137
  response = json.loads(api_output)
138
+
139
+ reply = self._package_output_message(
140
+ input_message=input_message,
141
+ response=response
142
+ )
143
+
144
+ self.reply_to_message(reply = reply, to = input_message)
ControllerAtomicFlow.yaml CHANGED
@@ -11,6 +11,7 @@ input_interface_non_initialized: # initial input keys
11
 
12
  input_interface_initialized: # input_keys
13
  - "observation"
 
14
 
15
  #######################################################
16
  # Output keys
@@ -67,10 +68,13 @@ system_message_prompt_template:
67
  human_message_prompt_template:
68
  _target_: aiflows.prompt_template.JinjaPrompt
69
  template: |2-
 
 
70
  Here is the response to your last action:
71
  {{observation}}
72
  input_variables:
73
  - "observation"
 
74
 
75
  init_human_message_prompt_template:
76
  _target_: aiflows.prompt_template.JinjaPrompt
 
11
 
12
  input_interface_initialized: # input_keys
13
  - "observation"
14
+ - "goal"
15
 
16
  #######################################################
17
  # Output keys
 
68
  human_message_prompt_template:
69
  _target_: aiflows.prompt_template.JinjaPrompt
70
  template: |2-
71
+ Here is the goal you need to achieve:
72
+ {{goal}}
73
  Here is the response to your last action:
74
  {{observation}}
75
  input_variables:
76
  - "observation"
77
+ - "goal"
78
 
79
  init_human_message_prompt_template:
80
  _target_: aiflows.prompt_template.JinjaPrompt
ControllerExecutorFlow.py CHANGED
@@ -4,6 +4,7 @@ from aiflows.base_flows import CompositeFlow
4
  from aiflows.utils import logging
5
 
6
  from .ControllerAtomicFlow import ControllerAtomicFlow
 
7
  from aiflows.interfaces import KeyInterface
8
  logging.set_verbosity_debug()
9
  log = logging.get_logger(__name__)
@@ -65,51 +66,133 @@ class ControllerExecutorFlow(CompositeFlow):
65
  :param subflows: A list of subflows. Required when instantiating the subflow programmatically (it replaces subflows_config from flow_config).
66
  """
67
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  def _on_reach_max_round(self):
69
  """ This method is called when the flow reaches the maximum amount of rounds. It updates the state of the flow and starts the process of terminating the flow."""
70
  self._state_update_dict({
 
71
  "answer": "The maximum amount of rounds was reached before the model found an answer.",
72
  "status": "unfinished"
73
  })
74
 
75
- def _single_round_controller_executor(self, reply: Dict[str,Any]) -> Dict[str,Any]:
 
 
 
76
 
77
- in_data = reply["executor_reply"]
 
 
 
 
 
 
 
 
 
 
 
78
 
79
- controller_reply = self.ask_subflow("Controller", in_data).get_data()
80
 
81
- if controller_reply["command"] == "finish":
82
- return {
83
- "EARLY_EXIT": True,
84
- "answer": controller_reply["command_args"]["answer"],
85
- "status": "finished"
86
- }
87
 
88
- executor_reply = {
89
- "observation": self.ask_subflow(controller_reply["command"], controller_reply["command_args"]).get_data()
90
- }
91
 
92
- return {
93
- "controller_reply": controller_reply,
94
- "executor_reply": executor_reply
95
- }
96
-
97
- def run(self,input_data):
98
-
99
- reply = {
100
- "executor_reply": input_data,
101
- "controller_reply": None
102
- }
103
-
104
- for round in range(self.flow_config["max_rounds"]):
105
- reply = self._single_round_controller_executor(reply)
106
 
107
- if reply.get("EARLY_EXIT",False):
108
- return reply
109
-
110
- self._on_reach_max_round()
111
- return {
112
- "EARLY_EXIT": False,
113
- "answer": reply["executor_reply"]["observation"],
114
- "status": "unfinished"
115
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
  from aiflows.utils import logging
5
 
6
  from .ControllerAtomicFlow import ControllerAtomicFlow
7
+ from aiflows.messages import FlowMessage
8
  from aiflows.interfaces import KeyInterface
9
  logging.set_verbosity_debug()
10
  log = logging.get_logger(__name__)
 
66
  :param subflows: A list of subflows. Required when instantiating the subflow programmatically (it replaces subflows_config from flow_config).
67
  """
68
 
69
+ def __init__(self,**kwargs):
70
+ super().__init__(**kwargs)
71
+
72
+ self.input_interface_controller = KeyInterface(
73
+ keys_to_select = ["goal","observation"],
74
+ )
75
+ self.input_interface_first_round_controller = KeyInterface(
76
+ keys_to_select = ["goal"],
77
+ )
78
+
79
+ self.reply_interface = KeyInterface(
80
+ keys_to_select = ["answer","status", "EARLY_EXIT"],
81
+ )
82
+
83
+ self.next_flow_to_call = {
84
+ None: "Controller",
85
+ "Controller": "Executor",
86
+ "Executor": "Controller"
87
+ }
88
+
89
+ def generate_reply(self):
90
+ """ This method generates the reply of the flow. It's called when the flow is finished. """
91
+
92
+ reply = self._package_output_message(
93
+ input_message = self.flow_state["input_message"],
94
+ response = self.reply_interface(self.flow_state)
95
+ )
96
+ self.reply_to_message(reply,to=self.flow_state["input_message"])
97
+
98
+ def get_next_flow_to_call(self):
99
+
100
+ if self.flow_config["max_rounds"] is not None and self.flow_state["current_round"] >= self.flow_config["max_rounds"]:
101
+ return None
102
+
103
+ return self.next_flow_to_call[self.flow_state["last_called"]]
104
+
105
  def _on_reach_max_round(self):
106
  """ This method is called when the flow reaches the maximum amount of rounds. It updates the state of the flow and starts the process of terminating the flow."""
107
  self._state_update_dict({
108
+ "EARLY_EXIT": False,
109
  "answer": "The maximum amount of rounds was reached before the model found an answer.",
110
  "status": "unfinished"
111
  })
112
 
113
+ def set_up_flow_state(self):
114
+ super().set_up_flow_state()
115
+ self.flow_state["last_called"] = None
116
+ self.flow_state["current_round"] = 0
117
 
118
+
119
+ def call_controller(self):
120
+ #first_round
121
+ if self.flow_state["last_called"] is None:
122
+ input_interface = self.input_interface_first_round_controller
123
+ else:
124
+ input_interface = self.input_interface_controller
125
+
126
+ message = self._package_input_message(
127
+ data = input_interface(self.flow_state),
128
+ dst_flow = "Controller"
129
+ )
130
 
131
+ self.subflows["Controller"].send_message_async(message, pipe_to= self.flow_config["flow_ref"])
132
 
133
+ def call_executor(self):
 
 
 
 
 
134
 
135
+ #detect and early exit
136
+ if self.flow_state["command"] == "finish":
 
137
 
138
+ self._state_update_dict(
139
+ {
140
+ "EARLY_EXIT": True,
141
+ "answer": self.flow_state["command_args"]["answer"],
142
+ "status": "finished"
143
+ }
144
+ )
145
+ self.generate_reply()
 
 
 
 
 
 
146
 
147
+ #call executor
148
+ else:
149
+ executor_branch_to_call = self.flow_state["command"]
150
+ message = self._package_input_message(
151
+ data = self.flow_state["command_args"],
152
+ dst_flow = executor_branch_to_call
153
+ )
154
+
155
+ self.subflows[executor_branch_to_call].send_message_async(message, pipe_to= self.flow_config["flow_ref"])
156
+
157
+
158
+ def register_data_to_state(self, input_message):
159
+ last_called = self.flow_state["last_called"]
160
+
161
+ if last_called is None:
162
+ self.flow_state["input_message"] = input_message
163
+ self.flow_state["goal"] = input_message.data["goal"]
164
+
165
+ elif last_called == "Executor":
166
+ self.flow_state["observation"] = input_message.data
167
+
168
+ else:
169
+ self._state_update_dict(
170
+ {
171
+ "command": input_message.data["command"],
172
+ "command_args": input_message.data["command_args"]
173
+ }
174
+ )
175
+
176
+ def run(self,input_message: FlowMessage):
177
+ """ Runs the WikiSearch Atomic Flow. It's used to execute a Wikipedia search and get page summaries.
178
+
179
+ :param input_message: The input message
180
+ :type input_message: FlowMessage
181
+ """
182
+
183
+ self.register_data_to_state(input_message)
184
+
185
+ flow_to_call = self.get_next_flow_to_call()
186
+
187
+ if flow_to_call == "Controller":
188
+ self.flow_state["observation"] = input_message.data
189
+ self.call_controller()
190
+ elif flow_to_call == "Executor":
191
+ self.call_executor()
192
+ self.flow_state["current_round"] += 1
193
+ else:
194
+ self._on_reach_max_round()
195
+ self.generate_reply()
196
+
197
+ self.flow_state["last_called"] = flow_to_call
198
+
ControllerExecutorFlow.yaml CHANGED
@@ -15,9 +15,10 @@ subflows_config:
15
  name: "ControllerAtomicFlow"
16
  description: "A flow that calls other flows to solve a problem."
17
  _target_: flow_modules.aiflows.ControllerAtomicFlow.instantiate_from_default_config
18
- finish:
19
- description: "Signal that the objective has been satisfied, and returns the answer to the user."
20
- input_args: ["answer"]
 
21
  # E.g.,
22
  # commands:
23
  # wiki_search:
@@ -29,6 +30,3 @@ subflows_config:
29
  # wiki_search:
30
  # _target_: .WikiSearchAtomicFlow.instantiate_from_default_config
31
 
32
-
33
- early_exit_key: "EARLY_EXIT"
34
-
 
15
  name: "ControllerAtomicFlow"
16
  description: "A flow that calls other flows to solve a problem."
17
  _target_: flow_modules.aiflows.ControllerAtomicFlow.instantiate_from_default_config
18
+ commands:
19
+ finish:
20
+ description: "Signal that the objective has been satisfied, and returns the answer to the user."
21
+ input_args: ["answer"]
22
  # E.g.,
23
  # commands:
24
  # wiki_search:
 
30
  # wiki_search:
31
  # _target_: .WikiSearchAtomicFlow.instantiate_from_default_config
32
 
 
 
 
WikiSearchAtomicFlow.py CHANGED
@@ -3,7 +3,7 @@ from copy import deepcopy
3
  from typing import List, Dict, Optional, Any
4
 
5
  from aiflows.base_flows import AtomicFlow
6
-
7
  from aiflows.utils import logging
8
  from .wikipediaAPI import WikipediaAPIWrapper
9
 
@@ -44,15 +44,13 @@ class WikiSearchAtomicFlow(AtomicFlow):
44
  super().__init__(**kwargs)
45
 
46
  def run(self,
47
- input_data: Dict[str, Any]) -> Dict[str, Any]:
48
  """ Runs the WikiSearch Atomic Flow. It's used to execute a Wikipedia search and get page summaries.
49
 
50
- :param input_data: The input data dictionary
51
- :type input_data: Dict[str, Any]
52
- :return: The output data dictionary
53
- :rtype: Dict[str, Any]
54
  """
55
-
56
  # ~~~ Process input ~~~
57
  term = input_data.get("search_term", None)
58
  api_wrapper = WikipediaAPIWrapper(
@@ -70,4 +68,10 @@ class WikiSearchAtomicFlow(AtomicFlow):
70
 
71
  # Log the update to the flow messages list
72
  observation = search_response["wiki_content"] if search_response["wiki_content"] else search_response["relevant_pages"]
73
- return {"wiki_content": observation}
 
 
 
 
 
 
 
3
  from typing import List, Dict, Optional, Any
4
 
5
  from aiflows.base_flows import AtomicFlow
6
+ from aiflows.messages import FlowMessage
7
  from aiflows.utils import logging
8
  from .wikipediaAPI import WikipediaAPIWrapper
9
 
 
44
  super().__init__(**kwargs)
45
 
46
  def run(self,
47
+ input_message: FlowMessage):
48
  """ Runs the WikiSearch Atomic Flow. It's used to execute a Wikipedia search and get page summaries.
49
 
50
+ :param input_message: The input message
51
+ :type input_message: FlowMessage
 
 
52
  """
53
+ input_data = input_message.data
54
  # ~~~ Process input ~~~
55
  term = input_data.get("search_term", None)
56
  api_wrapper = WikipediaAPIWrapper(
 
68
 
69
  # Log the update to the flow messages list
70
  observation = search_response["wiki_content"] if search_response["wiki_content"] else search_response["relevant_pages"]
71
+
72
+ reply = self._package_output_message(
73
+ input_message = input_message,
74
+ response = {"wiki_content": observation},
75
+ )
76
+
77
+ self.reply_to_message(reply=reply, to=input_message)
__init__.py CHANGED
@@ -1,6 +1,6 @@
1
  # ~~~ Specify the dependencies ~~~
2
  dependencies = [
3
- {"url": "aiflows/ChatFlowModule", "revision": "main"},
4
  ]
5
  from aiflows import flow_verse
6
 
 
1
  # ~~~ Specify the dependencies ~~~
2
  dependencies = [
3
+ {"url": "aiflows/ChatFlowModule", "revision": "coflows"},
4
  ]
5
  from aiflows import flow_verse
6
 
demo.yaml CHANGED
@@ -9,12 +9,10 @@ subflows_config:
9
  wiki_search:
10
  description: "Performs a search on Wikipedia."
11
  input_args: ["search_term"]
12
- ddg_search:
13
- description: "Query the search engine DuckDuckGo."
14
- input_args: ["query"]
15
- finish:
16
- description: "Signal that the objective has been satisfied, and returns the answer to the user."
17
- input_args: ["answer"]
18
  backend:
19
  _target_: aiflows.backends.llm_lite.LiteLLMBackend
20
  api_infos: ???
 
9
  wiki_search:
10
  description: "Performs a search on Wikipedia."
11
  input_args: ["search_term"]
12
+ # ddg_search:
13
+ # description: "Query the search engine DuckDuckGo."
14
+ # input_args: ["query"]
15
+
 
 
16
  backend:
17
  _target_: aiflows.backends.llm_lite.LiteLLMBackend
18
  api_infos: ???
run.py CHANGED
@@ -1,3 +1,5 @@
 
 
1
  import os
2
 
3
  import hydra
@@ -9,87 +11,109 @@ from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
9
 
10
  from aiflows import logging
11
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
 
12
  from aiflows.utils import serve_utils
13
  from aiflows.workers import run_dispatch_worker_thread
14
  from aiflows.messages import FlowMessage
15
  from aiflows.interfaces import KeyInterface
 
 
16
 
17
  CACHING_PARAMETERS.do_caching = False # Set to True in order to disable caching
18
  # clear_cache() # Uncomment this line to clear the cache
19
 
20
  logging.set_verbosity_debug()
21
 
22
- from aiflows import flow_verse
23
- # ~~~ Load Flow dependecies from FlowVerse ~~~
24
  dependencies = [
25
- {"url": "aiflows/ControllerExecutorFlowModule", "revision": os.getcwd()},
26
  ]
27
 
 
28
  flow_verse.sync_dependencies(dependencies)
29
-
30
  if __name__ == "__main__":
31
- # ~~~ Set the API information ~~~
32
- # OpenAI backend
33
- api_information = [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))]
34
- # Azure backend
35
- # api_information = [ApiInfo(backend_used = "azure",
36
- # api_base = os.getenv("AZURE_API_BASE"),
37
- # api_key = os.getenv("AZURE_OPENAI_KEY"),
38
- # api_version = os.getenv("AZURE_API_VERSION") )]
39
-
40
- FLOW_MODULES_PATH = "./"
41
 
42
- jwt = os.getenv("COLINK_JWT")
43
- addr = os.getenv("LOCAL_COLINK_ADDRESS")
44
 
45
- cl = serve_utils.start_colink_component(
46
- "Reverse Number Demo",
47
- {"jwt": jwt, "addr": addr}
48
- )
49
- # path_to_output_file = "output.jsonl" # Uncomment this line to save the output to disk
50
 
 
51
  root_dir = "."
52
  cfg_path = os.path.join(root_dir, "demo.yaml")
53
  cfg = read_yaml_file(cfg_path)
54
- # put the API information in the config
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
  serve_utils.recursive_serve_flow(
56
  cl = cl,
57
- flow_type="ReAct_served",
58
  default_config=cfg,
59
  default_state=None,
60
- default_dispatch_point="coflows_dispatch",
61
  )
62
 
63
- #in case you haven't started the dispatch worker thread, uncomment
64
- #run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
65
-
66
- quick_load_api_keys(cfg, api_information, key="api_infos")
67
 
68
- # ~~~ Get the data ~~~
69
- # This can be a list of samples
70
- # data = {"id": 0, "goal": "Answer the following question: What is the population of Canada?"} # Uses wikipedia
71
- # data = {"id": 0, "goal": "Answer the following question: Who was the NBA champion in 2023?"}
72
- data = {
73
- "id": 0,
74
- "goal": "Answer the following question: What is the profession and date of birth of Michael Jordan?",
75
- }
76
- # ~~~ Run inference ~~~
77
  proxy_flow = serve_utils.recursive_mount(
78
  cl=cl,
79
  client_id="local",
80
- flow_type="ReAct_served",
81
- config_overrides=cfg,
82
  initial_state=None,
83
  dispatch_point_override=None,
84
  )
85
- # ~~~ Print the output ~~~
 
 
 
 
 
 
 
 
86
  input_message = FlowMessage(
87
- data= data,
88
- src_flow="Coflows team",
89
- dst_flow=proxy_flow,
90
- is_input_msg=True
91
  )
 
 
 
 
 
 
92
 
93
- future = proxy_flow.ask(input_message)
 
 
94
 
95
- print(future.get_data())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """A simple script to run a Flow that can be used for development and debugging."""
2
+
3
  import os
4
 
5
  import hydra
 
11
 
12
  from aiflows import logging
13
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
14
+
15
  from aiflows.utils import serve_utils
16
  from aiflows.workers import run_dispatch_worker_thread
17
  from aiflows.messages import FlowMessage
18
  from aiflows.interfaces import KeyInterface
19
+ from aiflows.utils.colink_utils import start_colink_server
20
+ from aiflows.workers import run_dispatch_worker_thread
21
 
22
  CACHING_PARAMETERS.do_caching = False # Set to True in order to disable caching
23
  # clear_cache() # Uncomment this line to clear the cache
24
 
25
  logging.set_verbosity_debug()
26
 
27
+
 
28
  dependencies = [
29
+ {"url": "aiflows/ControllerExecutorFlowModule", "revision": os.getcwd()}
30
  ]
31
 
32
+ from aiflows import flow_verse
33
  flow_verse.sync_dependencies(dependencies)
 
34
  if __name__ == "__main__":
 
 
 
 
 
 
 
 
 
 
35
 
36
+ #1. ~~~~~ Set up a colink server ~~~~
37
+ FLOW_MODULES_PATH = "./"
38
 
39
+ cl = start_colink_server()
40
+
 
 
 
41
 
42
+ #2. ~~~~~Load flow config~~~~~~
43
  root_dir = "."
44
  cfg_path = os.path.join(root_dir, "demo.yaml")
45
  cfg = read_yaml_file(cfg_path)
46
+
47
+ #2.1 ~~~ Set the API information ~~~
48
+ # OpenAI backend
49
+ api_information = [ApiInfo(backend_used="openai",
50
+ api_key = os.getenv("OPENAI_API_KEY"))]
51
+ # # Azure backend
52
+ # api_information = ApiInfo(backend_used = "azure",
53
+ # api_base = os.getenv("AZURE_API_BASE"),
54
+ # api_key = os.getenv("AZURE_OPENAI_KEY"),
55
+ # api_version = os.getenv("AZURE_API_VERSION") )
56
+
57
+
58
+ quick_load_api_keys(cfg, api_information, key="api_infos")
59
+
60
+
61
+ #3. ~~~~ Serve The Flow ~~~~
62
  serve_utils.recursive_serve_flow(
63
  cl = cl,
64
+ flow_type="ControllerExecutorFlowModule",
65
  default_config=cfg,
66
  default_state=None,
67
+ default_dispatch_point="coflows_dispatch"
68
  )
69
 
70
+ #4. ~~~~~Start A Worker Thread~~~~~
71
+ run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
 
 
72
 
73
+ #5. ~~~~~Mount the flow and get its proxy~~~~~~
 
 
 
 
 
 
 
 
74
  proxy_flow = serve_utils.recursive_mount(
75
  cl=cl,
76
  client_id="local",
77
+ flow_type="ControllerExecutorFlowModule",
78
+ config_overrides=None,
79
  initial_state=None,
80
  dispatch_point_override=None,
81
  )
82
+
83
+ #6. ~~~ Get the data ~~~
84
+ data = {
85
+ "id": 0,
86
+ "goal": "Answer the following question: What is the profession and date of birth of Michael Jordan?",
87
+ }
88
+
89
+
90
+ #option1: use the FlowMessage class
91
  input_message = FlowMessage(
92
+ data=data,
 
 
 
93
  )
94
+
95
+ #option2: use the proxy_flow
96
+ #input_message = proxy_flow._package_input_message(data = data)
97
+
98
+ #7. ~~~ Run inference ~~~
99
+ future = proxy_flow.send_message_blocking(input_message)
100
 
101
+ #uncomment this line if you would like to get the full message back
102
+ #reply_message = future.get_message()
103
+ reply_data = future.get_data()
104
 
105
+ # ~~~ Print the output ~~~
106
+ print("~~~~~~Reply~~~~~~")
107
+ print(reply_data)
108
+
109
+
110
+ #8. ~~~~ (Optional) apply output interface on reply ~~~~
111
+ # output_interface = KeyInterface(
112
+ # keys_to_rename={"api_output": "answer"},
113
+ # )
114
+ # print("Output: ", output_interface(reply_data))
115
+
116
+
117
+ #9. ~~~~~Optional: Unserve Flow~~~~~~
118
+ # serve_utils.delete_served_flow(cl, "FlowModule")
119
+