nbaldwin committed on
Commit 5fbaf2a
1 Parent(s): bad377a

working version coflows

Files changed (4)
  1. ControllerExecutorFlow.py +30 -26
  2. ControllerExecutorFlow.yaml +4 -30
  3. demo.yaml +27 -25
  4. run.py +56 -38
ControllerExecutorFlow.py CHANGED
@@ -1,15 +1,15 @@
 from typing import Dict, Any
 
-from aiflows.base_flows import CircularFlow
+from aiflows.base_flows import CompositeFlow
 from aiflows.utils import logging
 
 from .ControllerAtomicFlow import ControllerAtomicFlow
-
+from aiflows.interfaces import KeyInterface
 logging.set_verbosity_debug()
 log = logging.get_logger(__name__)
 
 
-class ControllerExecutorFlow(CircularFlow):
+class ControllerExecutorFlow(CompositeFlow):
     """ This class implements a ControllerExecutorFlow. It's composed of a ControllerAtomicFlow and an ExecutorFlow,
     where typically the ControllerAtomicFlow uses an LLM to decide which command to call and the ExecutorFlow (a branching flow) is used to execute the command.
 
@@ -64,29 +64,33 @@ class ControllerExecutorFlow(CircularFlow):
     :param flow_config: The configuration of the flow (see Configuration Parameters).
     :param subflows: A list of subflows. Required when instantiating the subflow programmatically (it replaces subflows_config from flow_config).
     """
-    def _on_reach_max_round(self):
-        """ This method is called when the flow reaches the maximum amount of rounds. It updates the state of the flow and starts the process of terminating the flow."""
-        self._state_update_dict({
-            "answer": "The maximum amount of rounds was reached before the model found an answer.",
-            "status": "unfinished"
-        })
-
-    @CircularFlow.output_msg_payload_processor
-    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[str, Any]:
-        """ This method is called when the ExecutorAtomicFlow receives a message from the ControllerAtomicFlow. It checks if the flow should finish or continue.
-
-        :param output_payload: The output payload of the ControllerAtomicFlow.
-        :type output_payload: Dict[str, Any]
-        :param src_flow: The ControllerAtomicFlow.
-        :type src_flow: ControllerAtomicFlow
-        :return: The output payload of the ControllerAtomicFlow.
-        """
-        command = output_payload["command"]
-        if command == "finish":
-            return {
-                "EARLY_EXIT": True,
-                "answer": output_payload["command_args"]["answer"],
-                "status": "finished"
-            }
-        else:
-            return output_payload
+    def __init__(self, flow_config: Dict[str, Any], subflows: Dict[str, Any] = None):
+        super().__init__(flow_config, subflows)
+
+    def set_up_flow_state(self):
+        super().set_up_flow_state()
+
+    def run(self, input_data):
+        executor_reply = input_data
+
+        for round in range(self.flow_config["max_rounds"]):
+            controller_reply = self.ask_subflow("Controller", executor_reply).get_data()
+            if controller_reply["command"] == "finish":
+                return {
+                    "EARLY_EXIT": True,
+                    "answer": controller_reply["command_args"]["answer"],
+                    "status": "finished"
+                }
+
+            executor_reply = {
+                "observation": self.ask_subflow(controller_reply["command"], controller_reply["command_args"]).get_data()
+            }
+
+        return {
+            "answer": "The maximum amount of rounds was reached before the model found an answer.",
+            "status": "unfinished"
+        }
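
A note for readers of this diff: the old declarative topology (two CircularFlow stages wired through interfaces) becomes an explicit Python loop in run(). The following dependency-free sketch restates that loop's message contract; controller_step and execute_command are hypothetical stand-ins for the Controller subflow and the tool subflows, and only the payload keys mirror the code above.

from typing import Any, Dict

def controller_step(payload: Dict[str, Any]) -> Dict[str, Any]:
    # Stand-in for ControllerAtomicFlow: a real controller would call an LLM.
    if "observation" in payload:
        return {"command": "finish", "command_args": {"answer": payload["observation"]}}
    return {"command": "wiki_search", "command_args": {"search_term": "Michael Jordan"}}

def execute_command(command: str, command_args: Dict[str, Any]) -> str:
    # Stand-in for a tool subflow such as WikiSearchAtomicFlow.
    return f"stub result for {command}({command_args})"

def run(input_data: Dict[str, Any], max_rounds: int = 30) -> Dict[str, Any]:
    executor_reply = input_data
    for _ in range(max_rounds):
        controller_reply = controller_step(executor_reply)
        if controller_reply["command"] == "finish":
            return {"EARLY_EXIT": True,
                    "answer": controller_reply["command_args"]["answer"],
                    "status": "finished"}
        executor_reply = {"observation": execute_command(controller_reply["command"],
                                                         controller_reply["command_args"])}
    return {"answer": "The maximum amount of rounds was reached before the model found an answer.",
            "status": "unfinished"}

print(run({"goal": "demo"}))  # finishes on the second round with these stubs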
ControllerExecutorFlow.yaml CHANGED
@@ -12,6 +12,8 @@ output_interface:
 ### Subflows specification
 subflows_config:
   Controller:
+    name: "ControllerAtomicFlow"
+    description: "A flow that calls other flows to solve a problem."
     _target_: flow_modules.aiflows.ControllerAtomicFlow.instantiate_from_default_config
     finish:
       description: "Signal that the objective has been satisfied, and returns the answer to the user."
@@ -23,38 +25,10 @@ subflows_config:
   # input_args: ["search_term"]
 
 
-  Executor:
-    _target_: aiflows.base_flows.BranchingFlow.instantiate_from_default_config
   # E.g.,
-  # subflows_config:
-  #   wiki_search:
-  #     _target_: .WikiSearchAtomicFlow.instantiate_from_default_config
+  # wiki_search:
+  #   _target_: .WikiSearchAtomicFlow.instantiate_from_default_config
 
 
 early_exit_key: "EARLY_EXIT"
 
-topology:
-  - goal: "Select the next action and prepare the input for the executor."
-    input_interface:
-      _target_: aiflows.interfaces.KeyInterface
-      additional_transformations:
-        - _target_: aiflows.data_transformations.KeyMatchInput
-    flow: Controller
-    output_interface:
-      _target_: ControllerExecutorFlow.detect_finish_or_continue
-    reset: false
-
-  - goal: "Execute the action specified by the Controller."
-    input_interface:
-      _target_: aiflows.interfaces.KeyInterface
-      keys_to_rename:
-        command: branch
-        command_args: branch_input_data
-      keys_to_select: ["branch", "branch_input_data"]
-    flow: Executor
-    output_interface:
-      _target_: aiflows.interfaces.KeyInterface
-      keys_to_rename:
-        branch_output_data: observation
-      keys_to_select: ["observation"]
-    reset: false
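Since run() now resolves tools by calling ask_subflow(controller_reply["command"], ...), every command name the controller can emit (other than "finish") must match a key under subflows_config. A small sketch of checking that invariant against the demo config below; it assumes only read_yaml_file (imported in run.py) and a local demo.yaml, and that it returns a plain dict:

from aiflows.utils.general_helpers import read_yaml_file

cfg = read_yaml_file("demo.yaml")
commands = set(cfg["subflows_config"]["Controller"]["commands"]) - {"finish"}
tools = set(cfg["subflows_config"]) - {"Controller"}
# Any command without a matching subflow would make ask_subflow fail at runtime.
print("commands missing a tool subflow:", commands - tools)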
demo.yaml CHANGED
@@ -1,27 +1,29 @@
-flow:
-  _target_: flow_modules.aiflows.ControllerExecutorFlowModule.ControllerExecutorFlow.instantiate_from_default_config
-  max_rounds: 30
-
-  ### Subflows specification
-  subflows_config:
-    Controller:
-      _target_: flow_modules.aiflows.ControllerExecutorFlowModule.ControllerAtomicFlow.instantiate_from_default_config
-      commands:
-        wiki_search:
-          description: "Performs a search on Wikipedia."
-          input_args: [ "search_term" ]
-        finish:
-          description: "Signal that the objective has been satisfied, and returns the answer to the user."
-          input_args: [ "answer" ]
-      backend:
-        _target_: aiflows.backends.llm_lite.LiteLLMBackend
-        api_infos: ???
-        model_name:
-          openai: "gpt-3.5-turbo"
-          azure: "azure/gpt-4"
-
-    Executor:
-      _target_: aiflows.base_flows.BranchingFlow.instantiate_from_default_config
-      subflows_config:
-        wiki_search:
-          _target_: flow_modules.aiflows.ControllerExecutorFlowModule.WikiSearchAtomicFlow.instantiate_from_default_config
+_target_: flow_modules.aiflows.ControllerExecutorFlowModule.ControllerExecutorFlow.instantiate_from_default_config
+max_rounds: 30
+
+### Subflows specification
+subflows_config:
+  Controller:
+    _target_: flow_modules.aiflows.ControllerExecutorFlowModule.ControllerAtomicFlow.instantiate_from_default_config
+    commands:
+      wiki_search:
+        description: "Performs a search on Wikipedia."
+        input_args: ["search_term"]
+      ddg_search:
+        description: "Query the search engine DuckDuckGo."
+        input_args: ["query"]
+      finish:
+        description: "Signal that the objective has been satisfied, and returns the answer to the user."
+        input_args: ["answer"]
+    backend:
+      _target_: aiflows.backends.llm_lite.LiteLLMBackend
+      api_infos: ???
+      model_name:
+        openai: "gpt-3.5-turbo"
+        azure: "azure/gpt-4"
+
+  wiki_search:
+    _target_: flow_modules.aiflows.ControllerExecutorFlowModule.WikiSearchAtomicFlow.instantiate_from_default_config
+    name: "WikiSearchAtomicFlow"
+    description: "A flow that searches Wikipedia for information."
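
The backend's api_infos is left as the mandatory ??? placeholder here and filled in at runtime. A minimal sketch of that step, using only names imported in run.py below (the exact matching behavior of quick_load_api_keys is an assumption beyond what this diff shows):

import os
from aiflows.backends.api_info import ApiInfo
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys

cfg = read_yaml_file("demo.yaml")
api_information = [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))]
# Fills every "api_infos" entry in the config (the ??? markers) with the list above.
quick_load_api_keys(cfg, api_information, key="api_infos")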
run.py CHANGED
@@ -5,73 +5,91 @@ import hydra
 import aiflows
 from aiflows.flow_launchers import FlowLauncher
 from aiflows.backends.api_info import ApiInfo
-from aiflows.utils.general_helpers import read_yaml_file
+from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
 
 from aiflows import logging
 from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
+from aiflows.utils import serve_utils
+from aiflows.workers import run_dispatch_worker_thread
+from aiflows.messages import FlowMessage
+from aiflows.interfaces import KeyInterface
 
 CACHING_PARAMETERS.do_caching = False  # Set to True in order to disable caching
 # clear_cache() # Uncomment this line to clear the cache
 
 logging.set_verbosity_debug()
 
-dependencies = [
-    {"url": "aiflows/ControllerExecutorFlowModule", "revision": os.getcwd()},
-]
-from aiflows import flow_verse
+from aiflows import flow_verse
+# ~~~ Load Flow dependencies from FlowVerse ~~~
+dependencies = [
+    {"url": "aiflows/ControllerExecutorFlowModule", "revision": os.getcwd()},
+]
 
 flow_verse.sync_dependencies(dependencies)
 
 if __name__ == "__main__":
     # ~~~ Set the API information ~~~
     # OpenAI backend
-    api_information = [ApiInfo(backend_used="openai",
-                               api_key=os.getenv("OPENAI_API_KEY"))]
+    api_information = [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))]
     # Azure backend
-    # api_information = ApiInfo(backend_used = "azure",
+    # api_information = [ApiInfo(backend_used = "azure",
     #                           api_base = os.getenv("AZURE_API_BASE"),
     #                           api_key = os.getenv("AZURE_OPENAI_KEY"),
-    #                           api_version = os.getenv("AZURE_API_VERSION") )
+    #                           api_version = os.getenv("AZURE_API_VERSION") )]
 
-    path_to_output_file = None
-    # path_to_output_file = "output.jsonl"  # Uncomment this line to save the output to disk
+    FLOW_MODULES_PATH = "./"
+
+    jwt = os.getenv("COLINK_JWT")
+    addr = os.getenv("LOCAL_COLINK_ADDRESS")
+
+    cl = serve_utils.start_colink_component(
+        "Reverse Number Demo",
+        {"jwt": jwt, "addr": addr}
+    )
+    # path_to_output_file = "output.jsonl"  # Uncomment this line to save the output to disk
 
     root_dir = "."
     cfg_path = os.path.join(root_dir, "demo.yaml")
     cfg = read_yaml_file(cfg_path)
-    # print(cfg["flow"].keys())
-    cfg["flow"]["subflows_config"]["Controller"]["backend"]["api_infos"] = api_information
-
-    # ~~~ Instantiate the Flow ~~~
-    flow_with_interfaces = {
-        "flow": hydra.utils.instantiate(cfg['flow'], _recursive_=False, _convert_="partial"),
-        "input_interface": (
-            None
-            if cfg.get("input_interface", None) is None
-            else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False)
-        ),
-        "output_interface": (
-            None
-            if cfg.get("output_interface", None) is None
-            else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False)
-        ),
-    }
+    serve_utils.recursive_serve_flow(
+        cl=cl,
+        flow_type="ReAct_served",
+        default_config=cfg,
+        default_state=None,
+        default_dispatch_point="coflows_dispatch",
+    )
+
+    # In case you haven't started the dispatch worker thread yet, uncomment:
+    # run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
+
+    # Put the API information in the config
+    quick_load_api_keys(cfg, api_information, key="api_infos")
 
     # ~~~ Get the data ~~~
     # This can be a list of samples
     # data = {"id": 0, "goal": "Answer the following question: What is the population of Canada?"}  # Uses wikipedia
-    data = {"id": 0, "goal": "Answer the following question: Who was the NBA champion in 2023?"}
+    # data = {"id": 0, "goal": "Answer the following question: Who was the NBA champion in 2023?"}
+    data = {
+        "id": 0,
+        "goal": "Answer the following question: What is the profession and date of birth of Michael Jordan?",
+    }
 
     # ~~~ Run inference ~~~
-    path_to_output_file = None
-    # path_to_output_file = "output.jsonl"  # Uncomment this line to save the output to disk
-
-    _, outputs = FlowLauncher.launch(
-        flow_with_interfaces=flow_with_interfaces,
-        data=data,
-        path_to_output_file=path_to_output_file
-    )
+    proxy_flow = serve_utils.recursive_mount(
+        cl=cl,
+        client_id="local",
+        flow_type="ReAct_served",
+        config_overrides=cfg,
+        initial_state=None,
+        dispatch_point_override=None,
+    )
+
+    input_message = FlowMessage(
+        data=data,
+        src_flow="Coflows team",
+        dst_flow=proxy_flow,
+        is_input_msg=True
+    )
+
+    future = proxy_flow.ask(input_message)
 
     # ~~~ Print the output ~~~
-    flow_output_data = outputs[0]
-    print(flow_output_data)
+    print(future.get_data())
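
One operational note: recursive_mount only creates the proxy; a dispatch worker must be listening on "coflows_dispatch" for proxy_flow.ask to ever resolve. A hedged sketch of running that worker on its own, reusing only calls that already appear in this diff (the standalone-worker pattern itself is an assumption):

import os
from aiflows.utils import serve_utils
from aiflows.workers import run_dispatch_worker_thread

# Connect to the same CoLink server as run.py, then serve the dispatch point.
cl = serve_utils.start_colink_component(
    "ReAct worker",
    {"jwt": os.getenv("COLINK_JWT"), "addr": os.getenv("LOCAL_COLINK_ADDRESS")}
)
run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path="./")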