nbaldwin committed on
Commit
6211d74
·
1 Parent(s): 035821c

coflow compatible

Browse files
Files changed (5) hide show
  1. ChatHumanFlowModule.py +101 -3
  2. ChatHumanFlowModule.yaml +15 -49
  3. __init__.py +2 -2
  4. demo.yaml +28 -30
  5. run.py +78 -41
ChatHumanFlowModule.py CHANGED
@@ -1,10 +1,12 @@
1
- from aiflows.base_flows import CircularFlow
2
  from aiflows.utils import logging
3
-
 
 
4
  log = logging.get_logger(f"aiflows.{__name__}")
5
 
6
 
7
- class ChatHumanFlowModule(CircularFlow):
8
  """ This class implements a Chat Human Flow Module. It is a flow that consists of two sub-flows that are executed circularly. It contains the following subflows:
9
 
10
  - A User Flow: A flow that makes queries to the Assistant Flow. E.g. The user asks the assistant (LLM) a question.
@@ -48,8 +50,104 @@ class ChatHumanFlowModule(CircularFlow):
48
 
49
  def __init__(self, **kwargs):
50
  super().__init__(**kwargs)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
 
52
  @classmethod
53
  def type(cls):
54
  """ This method returns the type of the flow."""
55
  return "OpenAIChatHumanFlowModule"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from aiflows.base_flows import CompositeFlow
2
  from aiflows.utils import logging
3
+ from aiflows.messages import FlowMessage
4
+ from aiflows.interfaces import KeyInterface
5
+ from aiflows.data_transformations import RegexFirstOccurrenceExtractor,EndOfInteraction
6
  log = logging.get_logger(f"aiflows.{__name__}")
7
 
8
 
9
+ class ChatHumanFlowModule(CompositeFlow):
10
  """ This class implements a Chat Human Flow Module. It is a flow that consists of two sub-flows that are executed circularly. It contains the following subflows:
11
 
12
  - A User Flow: A flow that makes queries to the Assistant Flow. E.g. The user asks the assistant (LLM) a question.
 
50
 
51
  def __init__(self, **kwargs):
52
  super().__init__(**kwargs)
53
+
54
+
55
+ self.regex_extractor = RegexFirstOccurrenceExtractor(**self.flow_config["regex_first_occurrence_extractor"])
56
+
57
+
58
+ self.end_of_interaction = EndOfInteraction(**self.flow_config["end_of_interaction"])
59
+
60
+ self.input_interface_assistant = KeyInterface(
61
+ keys_to_rename = {"human_input": "query"},
62
+ additional_transformations = [self.regex_extractor, self.end_of_interaction]
63
+ )
64
+
65
+ def set_up_flow_state(self):
66
+ """ This method sets up the flow state. It is called when the flow is executed."""
67
+ super().set_up_flow_state()
68
+ self.flow_state["last_flow_called"] = None
69
+ self.flow_state["current_round"] = 0
70
+ self.flow_state["user_inputs"] = []
71
+ self.flow_state["assistant_outputs"] = []
72
+ self.flow_state["input_message"] = None
73
+ self.flow_state["end_of_interaction"] = False
74
 
75
  @classmethod
76
  def type(cls):
77
  """ This method returns the type of the flow."""
78
  return "OpenAIChatHumanFlowModule"
79
+
80
+ def max_rounds_reached(self):
81
+ return self.flow_config["max_rounds"] is not None and self.flow_state["current_round"] >= self.flow_config["max_rounds"]
82
+
83
+ def generate_reply(self):
84
+
85
+ reply = self._package_output_message(
86
+ input_message = self.flow_state["input_message"],
87
+ response = {
88
+ "user_inputs": self.flow_state["user_inputs"],
89
+ "assistant_outputs": self.flow_state["assistant_outputs"],
90
+ "end_of_interaction": self.flow_state["end_of_interaction"]
91
+ },
92
+ )
93
+
94
+ self.reply_to_message(
95
+ reply = reply,
96
+ to = self.flow_state["input_message"]
97
+ )
98
+
99
+
100
+
101
+ def call_to_user(self,input_message):
102
+
103
+ self.flow_state["assistant_outputs"].append(input_message.data["api_output"])
104
+
105
+ if self.max_rounds_reached():
106
+ self.generate_reply()
107
+ else:
108
+ self.subflows["User"].send_message_async(input_message,pipe_to=self.flow_config["flow_ref"])
109
+ self.flow_state["last_flow_called"] = "User"
110
+
111
+ self.flow_state["current_round"] += 1
112
+
113
+
114
+
115
+ def call_to_assistant(self,input_message):
116
+ message = self.input_interface_assistant(input_message)
117
+
118
+ if self.flow_state["last_flow_called"] is None:
119
+ self.flow_state["input_message"] = input_message
120
+
121
+ else:
122
+ self.flow_state["user_inputs"].append(input_message.data["query"])
123
+
124
+ if message.data["end_of_interaction"]:
125
+ self.flow_state["end_of_interaction"] = True
126
+ self.generate_reply()
127
+
128
+ else:
129
+ self.subflows["Assistant"].send_message_async(message,pipe_to=self.flow_config["flow_ref"])
130
+ self.flow_state["last_flow_called"] = "Assistant"
131
+
132
+ def run(self,input_message: FlowMessage):
133
+ """ This method runs the flow. It is the main method of the flow and it is called when the flow is executed.
134
+
135
+ :param input_message: The input message to the flow.
136
+ :type input_message: FlowMessage
137
+ """
138
+ last_flow_called = self.flow_state["last_flow_called"]
139
+
140
+ if last_flow_called is None or last_flow_called == "User":
141
+ self.call_to_assistant(input_message=input_message)
142
+
143
+
144
+ elif last_flow_called == "Assistant":
145
+ self.call_to_user(input_message=input_message)
146
+
147
+
148
+
149
+
150
+
151
+
152
+
153
+
ChatHumanFlowModule.yaml CHANGED
@@ -4,11 +4,24 @@ description: "Flow that enables chatting between a ChatAtomicFlow and a user pro
4
  max_rounds: null # Run until early exit is detected
5
 
6
  input_interface:
7
- _target_: aiflows.interfaces.KeyInterface
8
 
9
  output_interface:
10
  - "end_of_interaction"
11
- - "answer"
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
  subflows_config:
14
  Assistant:
@@ -16,52 +29,5 @@ subflows_config:
16
  User:
17
  _target_: flow_modules.aiflows.HumanStandardInputFlowModule.HumanStandardInputFlow.instantiate_from_default_config
18
 
19
- topology:
20
- - goal: "Query the assistant"
21
-
22
- ### Input Interface
23
- input_interface:
24
- _target_: aiflows.interfaces.KeyInterface
25
- additional_transformations:
26
- - _target_: aiflows.data_transformations.KeyMatchInput
27
-
28
- ### Flow Specification
29
- flow: Assistant
30
- reset_every_round: false
31
-
32
- ### Output Interface
33
- output_interface:
34
- _target_: aiflows.interfaces.KeyInterface
35
- additional_transformations:
36
- - _target_: aiflows.data_transformations.PrintPreviousMessages
37
-
38
- - goal: "Ask the user for input"
39
-
40
- ### Input Interface
41
- input_interface:
42
- _target_: aiflows.interfaces.KeyInterface
43
- additional_transformations:
44
- - _target_: aiflows.data_transformations.KeyMatchInput
45
-
46
- ### Flow Specification
47
- flow: User
48
- reset_every_round: true
49
 
50
- ### Output Interface
51
- output_interface:
52
- _target_: aiflows.interfaces.KeyInterface
53
- keys_to_rename:
54
- human_input: query
55
- additional_transformations:
56
- - _target_: aiflows.data_transformations.RegexFirstOccurrenceExtractor
57
- regex: '(?<=```answer)([\s\S]*?)(?=```)'
58
- input_key: "query"
59
- output_key: "answer"
60
- strip: True
61
- assert_unique: True
62
- - _target_: aiflows.data_transformations.EndOfInteraction
63
- end_of_interaction_string: "<END>"
64
- input_key: "query"
65
- output_key: "end_of_interaction"
66
 
67
- early_exit_key: "end_of_interaction"
 
4
  max_rounds: null # Run until early exit is detected
5
 
6
  input_interface:
7
+ - "query"
8
 
9
  output_interface:
10
  - "end_of_interaction"
11
+ - "user_inputs"
12
+ - "assistant_outputs"
13
+
14
+ regex_first_occurrence_extractor:
15
+ regex: "(?<=```answer)([\\s\\S]*?)(?=```)"
16
+ input_key: "query"
17
+ output_key: "answer"
18
+ strip: true
19
+ assert_unique: true
20
+
21
+ end_of_interaction:
22
+ end_of_interaction_string: "<END>"
23
+ input_key: "query"
24
+ output_key: "end_of_interaction"
25
 
26
  subflows_config:
27
  Assistant:
 
29
  User:
30
  _target_: flow_modules.aiflows.HumanStandardInputFlowModule.HumanStandardInputFlow.instantiate_from_default_config
31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
 
__init__.py CHANGED
@@ -1,7 +1,7 @@
1
  # ~~~ Specify the dependencies ~~~
2
  dependencies = [
3
- {"url": "aiflows/ChatFlowModule", "revision": "main"},
4
- {"url": "aiflows/HumanStandardInputFlowModule", "revision": "main"},
5
  ]
6
  from aiflows import flow_verse
7
 
 
1
  # ~~~ Specify the dependencies ~~~
2
  dependencies = [
3
+ {"url": "aiflows/ChatFlowModule", "revision": "coflows"},
4
+ {"url": "aiflows/HumanStandardInputFlowModule", "revision": "coflows"},
5
  ]
6
  from aiflows import flow_verse
7
 
demo.yaml CHANGED
@@ -1,31 +1,29 @@
1
 
2
-
3
- flow:
4
- max_rounds: 5
5
- _target_: flow_modules.aiflows.ChatInteractiveFlowModule.ChatHumanFlowModule.instantiate_from_default_config
6
- subflows_config:
7
- Assistant:
8
- _target_: flow_modules.aiflows.ChatFlowModule.ChatAtomicFlow.instantiate_from_default_config
9
- backend:
10
- _target_: aiflows.backends.llm_lite.LiteLLMBackend
11
- api_infos: ???
12
- model_name:
13
- openai: "gpt-4"
14
- azure: "azure/gpt-4"
15
- input_interface_non_initialized: []
16
-
17
- User:
18
- _target_: flow_modules.aiflows.HumanStandardInputFlowModule.HumanStandardInputFlow.instantiate_from_default_config
19
- request_multi_line_input_flag: False
20
- query_message_prompt_template:
21
- _target_: aiflows.prompt_template.JinjaPrompt
22
- template: |2-
23
- {{api_output}}
24
-
25
- To end an Interaction, type <END> and press enter.
26
-
27
- input_variables: ["api_output"]
28
- input_interface:
29
- - "api_output"
30
- output_interface:
31
- - "human_input"
 
1
 
2
+ max_rounds: 2
3
+ _target_: flow_modules.aiflows.ChatInteractiveFlowModule.ChatHumanFlowModule.instantiate_from_default_config
4
+ subflows_config:
5
+ Assistant:
6
+ _target_: flow_modules.aiflows.ChatFlowModule.ChatAtomicFlow.instantiate_from_default_config
7
+ backend:
8
+ _target_: aiflows.backends.llm_lite.LiteLLMBackend
9
+ api_infos: ???
10
+ model_name:
11
+ openai: "gpt-4"
12
+ azure: "azure/gpt-4"
13
+ input_interface_non_initialized: []
14
+
15
+ User:
16
+ _target_: flow_modules.aiflows.HumanStandardInputFlowModule.HumanStandardInputFlow.instantiate_from_default_config
17
+ request_multi_line_input_flag: False
18
+ query_message_prompt_template:
19
+ _target_: aiflows.prompt_template.JinjaPrompt
20
+ template: |2-
21
+ {{api_output}}
22
+
23
+ To end an Interaction, type <END> and press enter.
24
+
25
+ input_variables: ["api_output"]
26
+ input_interface:
27
+ - "api_output"
28
+ output_interface:
29
+ - "human_input"
 
 
run.py CHANGED
@@ -1,5 +1,3 @@
1
- """A simple script to run a Flow that can be used for development and debugging."""
2
-
3
  import os
4
 
5
  import hydra
@@ -7,71 +5,110 @@ import hydra
7
  import aiflows
8
  from aiflows.flow_launchers import FlowLauncher
9
  from aiflows.backends.api_info import ApiInfo
10
- from aiflows.utils.general_helpers import read_yaml_file
11
 
12
  from aiflows import logging
13
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
14
 
15
- CACHING_PARAMETERS.do_caching = False # Set to True to enable caching
 
 
 
 
 
 
 
16
  # clear_cache() # Uncomment this line to clear the cache
17
 
18
  logging.set_verbosity_debug()
19
 
 
20
  dependencies = [
21
- {"url": "aiflows/ChatInteractiveFlowModule", "revision": os.getcwd()},
22
  ]
 
23
  from aiflows import flow_verse
24
  flow_verse.sync_dependencies(dependencies)
25
-
26
  if __name__ == "__main__":
27
- # ~~~ Set the API information ~~~
28
- # OpenAI backend
 
 
 
 
29
 
 
 
 
 
 
 
 
30
  api_information = [ApiInfo(backend_used="openai",
31
  api_key = os.getenv("OPENAI_API_KEY"))]
32
-
33
-
34
  # # Azure backend
35
  # api_information = ApiInfo(backend_used = "azure",
36
  # api_base = os.getenv("AZURE_API_BASE"),
37
  # api_key = os.getenv("AZURE_OPENAI_KEY"),
38
  # api_version = os.getenv("AZURE_API_VERSION") )
39
-
40
- root_dir = "."
41
- cfg_path = os.path.join(root_dir, "demo.yaml")
42
- cfg = read_yaml_file(cfg_path)
43
 
44
- cfg["flow"]["subflows_config"]["Assistant"]["backend"]["api_infos"] = api_information
45
-
46
- # ~~~ Instantiate the Flow ~~~
47
- flow_with_interfaces = {
48
- "flow": hydra.utils.instantiate(cfg['flow'], _recursive_=False, _convert_="partial"),
49
- "input_interface": (
50
- None
51
- if cfg.get( "input_interface", None) is None
52
- else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False)
53
- ),
54
- "output_interface": (
55
- None
56
- if cfg.get( "output_interface", None) is None
57
- else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False)
58
- ),
59
- }
60
- # ~~~ Get the data ~~~
61
- data = {"id": 0} # This can be a list of samples
62
- # data = {"id": 0, "question": "Who was the NBA champion in 2023?"} # This can be a list of samples
63
 
64
- # ~~~ Run inference ~~~
65
- path_to_output_file = None
66
- # path_to_output_file = "output.jsonl" # Uncomment this line to save the output to disk
 
 
 
 
 
 
 
 
 
67
 
68
- _, outputs = FlowLauncher.launch(
69
- flow_with_interfaces=flow_with_interfaces,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
70
  data=data,
71
- path_to_output_file=path_to_output_file
72
  )
73
 
 
 
 
 
 
 
 
 
 
 
74
  # ~~~ Print the output ~~~
75
- flow_output_data = outputs[0]
76
- print(flow_output_data)
 
 
 
 
 
 
 
 
 
 
 
77
 
 
 
 
1
  import os
2
 
3
  import hydra
 
5
  import aiflows
6
  from aiflows.flow_launchers import FlowLauncher
7
  from aiflows.backends.api_info import ApiInfo
8
+ from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
9
 
10
  from aiflows import logging
11
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
12
 
13
+ from aiflows.utils import serve_utils
14
+ from aiflows.workers import run_dispatch_worker_thread
15
+ from aiflows.messages import FlowMessage
16
+ from aiflows.interfaces import KeyInterface
17
+ from aiflows.utils.colink_utils import start_colink_server
18
+ from aiflows.workers import run_dispatch_worker_thread
19
+
20
+ CACHING_PARAMETERS.do_caching = False # Set to True in order to enable caching
21
  # clear_cache() # Uncomment this line to clear the cache
22
 
23
  logging.set_verbosity_debug()
24
 
25
+
26
  dependencies = [
27
+ {"url": "aiflows/ChatInteractiveFlowModule", "revision": os.getcwd()}
28
  ]
29
+
30
  from aiflows import flow_verse
31
  flow_verse.sync_dependencies(dependencies)
 
32
  if __name__ == "__main__":
33
+
34
+ #1. ~~~~~ Set up a colink server ~~~~
35
+ FLOW_MODULES_PATH = "./"
36
+
37
+ cl = start_colink_server()
38
+
39
 
40
+ #2. ~~~~~Load flow config~~~~~~
41
+ root_dir = "."
42
+ cfg_path = os.path.join(root_dir, "demo.yaml")
43
+ cfg = read_yaml_file(cfg_path)
44
+
45
+ #2.1 ~~~ Set the API information ~~~
46
+ # OpenAI backend
47
  api_information = [ApiInfo(backend_used="openai",
48
  api_key = os.getenv("OPENAI_API_KEY"))]
 
 
49
  # # Azure backend
50
  # api_information = ApiInfo(backend_used = "azure",
51
  # api_base = os.getenv("AZURE_API_BASE"),
52
  # api_key = os.getenv("AZURE_OPENAI_KEY"),
53
  # api_version = os.getenv("AZURE_API_VERSION") )
 
 
 
 
54
 
55
+
56
+ quick_load_api_keys(cfg, api_information, key="api_infos")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
+
59
+ #3. ~~~~ Serve The Flow ~~~~
60
+ serve_utils.recursive_serve_flow(
61
+ cl = cl,
62
+ flow_type="ChatInteractiveFlowModule",
63
+ default_config=cfg,
64
+ default_state=None,
65
+ default_dispatch_point="coflows_dispatch"
66
+ )
67
+
68
+ #4. ~~~~~Start A Worker Thread~~~~~
69
+ run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
70
 
71
+ #5. ~~~~~Mount the flow and get its proxy~~~~~~
72
+ proxy_flow = serve_utils.recursive_mount(
73
+ cl=cl,
74
+ client_id="local",
75
+ flow_type="ChatInteractiveFlowModule",
76
+ config_overrides=None,
77
+ initial_state=None,
78
+ dispatch_point_override=None,
79
+ )
80
+
81
+ #6. ~~~ Get the data ~~~
82
+ data = {"id": 0, "query": "I want to ask you a few questions"} # This can be a list of samples
83
+ # data = {"id": 0, "question": "Who was the NBA champion in 2023?"} # This can be a list of samples
84
+
85
+ #option1: use the FlowMessage class
86
+ input_message = FlowMessage(
87
  data=data,
 
88
  )
89
 
90
+ #option2: use the proxy_flow
91
+ #input_message = proxy_flow._package_input_message(data = data)
92
+
93
+ #7. ~~~ Run inference ~~~
94
+ future = proxy_flow.send_message_blocking(input_message)
95
+
96
+ #uncomment this line if you would like to get the full message back
97
+ #reply_message = future.get_message()
98
+ reply_data = future.get_data()
99
+
100
  # ~~~ Print the output ~~~
101
+ print("~~~~~~Reply~~~~~~")
102
+ print(reply_data)
103
+
104
+
105
+ #8. ~~~~ (Optional) apply output interface on reply ~~~~
106
+ # output_interface = KeyInterface(
107
+ # keys_to_rename={"api_output": "answer"},
108
+ # )
109
+ # print("Output: ", output_interface(reply_data))
110
+
111
+
112
+ #9. ~~~~~Optional: Unserve Flow~~~~~~
113
+ # serve_utils.delete_served_flow(cl, "ChatWithDemonstrationFlowModule")
114