nbaldwin committed
Commit 210a49b · 1 Parent(s): e1ac826

merge with coflows

ChatWithDemonstrationsFlow.py CHANGED
@@ -1,14 +1,14 @@
 
 
-from aiflows.base_flows import SequentialFlow
+from aiflows.base_flows import CompositeFlow
 from aiflows.utils import logging
-
+from aiflows.interfaces import KeyInterface
 logging.set_verbosity_debug()
-
+from aiflows.messages import FlowMessage
 log = logging.get_logger(__name__)
 
 
-class ChatWithDemonstrationsFlow(SequentialFlow):
+class ChatWithDemonstrationsFlow(CompositeFlow):
     """ A Chat with Demonstrations Flow. It is a flow that consists of multiple sub-flows that are executed sequentially.
-    It's parent class is SequentialFlow.
+    Its parent class is CompositeFlow.
 
@@ -44,4 +44,68 @@ class ChatWithDemonstrationsFlow(CompositeFlow):
     :param \**kwargs: Arguments to be passed to the parent class constructor.
     """
     def __init__(self, **kwargs):
-        super().__init__(**kwargs)
+        super().__init__(**kwargs)
+        self.output_interface = KeyInterface(
+            keys_to_rename={"api_output": "answer"}
+        )
+
+    def set_up_flow_state(self):
+        """ Set up the flow state. It sets the last state of the flow to None.
+        """
+        super().set_up_flow_state()
+        self.flow_state["last_state"] = None
+
+    def run(self, input_message):
+        """ Runs the flow.
+
+        :param input_message: The input message of the flow.
+        :type input_message: FlowMessage
+        """
+        # ~~~~~~~~~~~ Solution 1 - Blocking ~~~~~~~
+        # future = self.subflows["demonstration_flow"].get_reply_future(input_message)
+        # answer = self.subflows["chat_flow"].get_reply_future(future.get_message())
+        # reply = self.package_output_message(input_message, self.output_interface(answer.get_data()))
+        # self.send_message(reply)
+
+        # ~~~~~~~~~~~ Solution 2 - Non-Blocking ~~~~~~~
+        if self.flow_state["last_state"] is None:
+
+            self.flow_state["initial_message"] = input_message
+
+            msg = self.package_input_message(
+                input_message.data
+            )
+
+            self.subflows["demonstration_flow"].get_reply(
+                msg,
+            )
+            self.flow_state["last_state"] = "demonstration_flow"
+
+        elif self.flow_state["last_state"] == "demonstration_flow":
+
+            msg = self.package_input_message(
+                input_message.data
+            )
+
+            self.subflows["chat_flow"].get_reply(
+                msg,
+            )
+
+            self.flow_state["last_state"] = "chat_flow"
+
+        else:
+            self.flow_state["last_state"] = None
+
+            reply = self.package_output_message(
+                self.flow_state["initial_message"],
+                response = self.output_interface(input_message).data
+            )
+
+            self.send_message(
+                reply
+            )
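The new run() replaces the declarative YAML topology with an explicit state machine: each invocation handles one hop (query -> demonstration_flow -> chat_flow -> final answer back to the original caller), with flow_state["last_state"] tracking progress between invocations. The commented-out "Solution 1" is the blocking equivalent; assembled from those comments, it would read roughly as follows (a sketch, not part of the commit, assuming the same coflows API used above):

    # Blocking variant of run(), assembled from the "Solution 1" comments above.
    # Sketch only: it assumes each get_reply_future() call returns a future whose
    # get_message()/get_data() block until the subflow has replied.
    def run(self, input_message):
        # Wait for the demonstrations to be attached to the query
        future = self.subflows["demonstration_flow"].get_reply_future(input_message)
        # Feed the enriched message to the chat flow and wait for its answer
        answer = self.subflows["chat_flow"].get_reply_future(future.get_message())
        # Rename api_output -> answer and reply to the original caller
        reply = self.package_output_message(input_message, self.output_interface(answer.get_data()))
        self.send_message(reply)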
ChatWithDemonstrationsFlow.yaml CHANGED
@@ -1,33 +1,17 @@
+
 name: "ChatAtomic_Flow_with_Demonstrations"
 description: "A sequential flow that answers questions with demonstrations"
-
+_target_: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.ChatWithDemonstrationsFlow.instantiate_from_default_config
 subflows_config:
   demonstration_flow:
-    _target_: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.DemonstrationsAtomicFlow.instantiate_from_default_config
-
+    flow_class_name: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.DemonstrationsAtomicFlow
+    flow_endpoint: DemonstrationsAtomicFlow
+    user_id: local
+    name: "DemonstrationsAtomicFlow"
+    description: "A flow that answers questions with demonstrations"
   chat_flow:
-    _target_: flow_modules.aiflows.ChatFlowModule.ChatAtomicFlow.instantiate_from_default_config
-
-topology:
-  - goal: Get Demonstrations
-    input_interface:
-      _target_: aiflows.interfaces.KeyInterface
-    flow: demonstration_flow
-    output_interface:
-      _target_: aiflows.interfaces.KeyInterface
-
-  - goal: Answer the question
-    input_interface:
-      _target_: aiflows.interfaces.KeyInterface
-    flow: chat_flow
-    output_interface:
-      _target_: aiflows.interfaces.KeyInterface
-      keys_to_rename:
-        api_output: answer # Rename the api_output to answer
-
-
-input_interface_initialized:
-  - "query"
-
-output_interface:
-  - "answer"
+    name: "ChatAtomicFlow"
+    description: "A flow that answers questions"
+    flow_class_name: flow_modules.aiflows.ChatFlowModule.ChatAtomicFlow
+    flow_endpoint: ChatAtomicFlow
+    user_id: local
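Note the change of mechanism here: subflows are no longer instantiated in place via _target_; each entry now names a served flow (flow_class_name, flow_endpoint, user_id) that the CompositeFlow connects to, and the topology section is gone because orchestration now lives in ChatWithDemonstrationsFlow.run(). Overrides supplied at mount time are merged over these defaults; a minimal sketch of overriding one subflow when mounting (the call mirrors get_flow_instance in run.py below, cl is the colink client created there, and the model name is an illustrative value, not something set in this commit):

    # Sketch: mounting the flow with config overrides merged over the YAML
    # defaults above. The override dict mirrors the YAML structure.
    from aiflows.utils import serve_utils

    overrides = {
        "subflows_config": {
            "chat_flow": {
                "backend": {"model_name": {"openai": "gpt-4"}},  # illustrative override
            }
        }
    }
    proxy = serve_utils.get_flow_instance(
        cl=cl,  # colink client, e.g. from start_colink_server() as in run.py
        flow_endpoint="ChatWithDemonstrationsFlow",
        user_id="local",
        config_overrides=overrides,
    )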
DemonstrationsAtomicFlow.py CHANGED
@@ -5,6 +5,7 @@ from aiflows.utils import general_helpers
 from typing import Dict,Any,Optional,List
 from aiflows.prompt_template import JinjaPrompt
 from copy import deepcopy
+from aiflows.messages import FlowMessage
 import os
 import hydra
 log = logging.get_logger(__name__)
@@ -41,7 +42,7 @@ class DemonstrationsAtomicFlow(AtomicFlow):
 
     *Output Interface*:
 
-    - The input interface expected by its successor flow (e.g. typically ChatAtomicFlow so the input interface expected by ChatAtomicFlow))
+    - Whatever data was passed in the input_message (typically the input interface expected by its successor flow, e.g. ChatAtomicFlow)
     - `demonstrations` (List[Dict[str, Any]]): A list of demonstrations. Each demonstration is a dictionary with the following keys:
         - idx (int): The index of the demonstration
         - query (str): The query of the demonstration
@@ -182,12 +183,15 @@ class DemonstrationsAtomicFlow(AtomicFlow):
         log.info("Loaded the demonstrations for %d datapoints from %s", len(self.data), self.params["data_dir"])
 
     def run(self,
-            input_data: Dict[str, Any]) -> Dict[str, Any]:
-        """ This method runs the flow. It returns the input data of the flow with the demonstrations added to it.
+            input_message: FlowMessage):
+        """ This method runs the flow. It sends back the data of the input_message with the demonstrations added to it.
 
-        :param input_data: The input data of the flow.
-        :type input_data: Dict[str, Any]
-        :return: The input data of the flow with the demonstrations added to it.
-        :rtype: Dict[str, Any]
+        :param input_message: The input message of the flow.
+        :type input_message: FlowMessage
         """
-        return {**input_data, **{"demonstrations": self._get_io_pairs(input_data=input_data)}}
+        input_data = input_message.data
+        reply = self.package_output_message(
+            input_message=input_message,
+            response = {**{"demonstrations": self._get_io_pairs(input_data=input_data)}, **input_data}
+        )
+        self.send_message(reply)
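One subtle change: the old run returned {**input_data, **{"demonstrations": ...}}, while the new one packages {**{"demonstrations": ...}, **input_data}. Later keys win in a Python dict merge, so a demonstrations key already present in the input now takes precedence over the generated one. A tiny standalone illustration (plain Python, illustrative values):

    # In a dict merge the rightmost dict wins on key clashes, so putting
    # input_data last lets the caller's own "demonstrations" key (if any)
    # override the generated demonstrations.
    demos = [{"idx": 0, "query": "q", "response": "r"}]
    input_data = {"question": "What is the capital of France?"}

    old_style = {**input_data, **{"demonstrations": demos}}  # generated demos win on clashes
    new_style = {**{"demonstrations": demos}, **input_data}  # input_data wins on clashes
    assert old_style == new_style  # identical here only because there is no key clash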
DemonstrationsAtomicFlow.yaml CHANGED
@@ -1,9 +1,6 @@
 name: "DemonstrationAtomicFlow"
 description: "A flow which returns Demonstrations"
-
-
-
-
+_target_: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.DemonstrationsAtomicFlow.instantiate_from_default_config
 data: ??? # e.g. [{"query_data": {"query": "What is the capital of France?"}, "response_data": {"response": "Paris, my sir."}}]
 params:
   data_dir: ???
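The ??? values are mandatory fields the user must supply. Because the _target_ added above points Hydra at instantiate_from_default_config, sibling config keys are passed to it as keyword overrides; the same can be done directly from Python, roughly as follows (a sketch with illustrative values mirroring the old demo.yaml settings):

    # Sketch: filling in the required ??? fields as keyword overrides, the
    # same way Hydra passes sibling keys to the _target_ above. Values are
    # examples, not part of this commit.
    from flow_modules.aiflows.ChatWithDemonstrationsFlowModule import DemonstrationsAtomicFlow

    flow = DemonstrationsAtomicFlow.instantiate_from_default_config(
        data=[{"query_data": {"query": "What is the capital of France?"},
               "response_data": {"response": "Paris, my sir."}}],
        params={"data_dir": None, "demonstrations_id": "my_sir_demo"},
    )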
__init__.py CHANGED
@@ -1,8 +1,11 @@
 # ~~~ Specify the dependencies ~~
+
+
 dependencies = [
     {"url": "aiflows/ChatFlowModule", "revision": "main"},
 ]
 from aiflows import flow_verse
 flow_verse.sync_dependencies(dependencies)
+
 from .ChatWithDemonstrationsFlow import ChatWithDemonstrationsFlow
-from .DemonstrationsAtomicFlow import DemonstrationsAtomicFlow
+from .DemonstrationsAtomicFlow import DemonstrationsAtomicFlow
demo.yaml CHANGED
@@ -1,76 +1,74 @@
-flow: # Overrides the OpenAIChatAtomicFlow config
-  _target_: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.ChatWithDemonstrationsFlow.instantiate_from_default_config
-  name: "SimpleQA_Flow_with_Demonstrations"
-  description: "A sequential flow that answers questions with demonstrations"
-
-  input_interface: # Connector between the "input data" and the Flow
-    - "questions"
-  output_interface: # Connector between the Flow's output and the caller
-    - "answer"
-
-  subflows_config:
-    demonstration_flow:
-      data:
-        - query_data:
-            query: "What is the capital of Turkey?"
-          response_data:
-            response: "Istambul, my sir."
-        - query_data:
-            query: "what is the capital of Germany?"
-          response_data:
-            response: "Berlin, my sir."
-      params:
-        data_dir: null
-        demonstrations_id: my_sir_demo
-      query_prompt_template:
-        template: |2-
-          Answer the following question: {{query}}
-        input_variables:
-          - "query"
-      response_prompt_template:
-        template: |2-
-          {{response}}
-        input_variables:
-          - response
-
-    chat_flow:
-      name: "SimpleQA_Flow"
-      # ~~~ Input interface specification ~~~
-      input_interface_non_initialized:
-        - "question"
-
-      # ~~~ backend model parameters ~~
-      backend:
-        _target_: aiflows.backends.llm_lite.LiteLLMBackend
-        api_infos: ???
-        model_name:
-          openai: "gpt-3.5-turbo"
-          azure: "azure/gpt-4"
-
-      # ~~~ generation_parameters ~~
-      n: 1
-      max_tokens: 3000
-      temperature: 0.3
-
-      top_p: 0.2
-      frequency_penalty: 0
-      presence_penalty: 0
-
-      n_api_retries: 6
-      wait_time_between_retries: 20
-
-      # ~~~ Prompt specification ~~~
-      system_message_prompt_template:
-        _target_: aiflows.prompt_template.JinjaPrompt
-        template: |2-
-          You are a helpful chatbot that truthfully answers questions. Answer in a similar way to your previous replies.
-        input_variables: []
-        partial_variables: {}
-
-      init_human_message_prompt_template:
-        _target_: aiflows.prompt_template.JinjaPrompt
-        template: |2-
-          Answer the following question: {{question}}
-        input_variables: ["question"]
-        partial_variables: {}
+_target_: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.ChatWithDemonstrationsFlow.instantiate_from_default_config
+name: "SimpleQA_Flow_with_Demonstrations"
+description: "A sequential flow that answers questions with demonstrations"
+
+input_interface: # Connector between the "input data" and the Flow
+  - "questions"
+output_interface: # Connector between the Flow's output and the caller
+  - "answer"
+
+subflows_config:
+  demonstration_flow:
+    name: "proxy of DemonstrationsAtomicFlow"
+    description: "A flow that answers questions with demonstrations"
+    data:
+      - query_data:
+          query: "What is the capital of Turkey?"
+        response_data:
+          response: "Istambul, my sir."
+      - query_data:
+          query: "what is the capital of Germany?"
+        response_data:
+          response: "Berlin, my sir."
+    query_prompt_template:
+      template: |2-
+        Answer the following question: {{query}}
+      input_variables:
+        - "query"
+    response_prompt_template:
+      template: |2-
+        {{response}}
+      input_variables:
+        - response
+
+  chat_flow:
+    name: "proxy SimpleQA_Flow"
+    description: "A flow that answers questions"
+    # ~~~ Input interface specification ~~~
+    input_interface_non_initialized:
+      - "question"
+
+    # ~~~ backend model parameters ~~
+    backend:
+      _target_: aiflows.backends.llm_lite.LiteLLMBackend
+      api_infos: ???
+      model_name:
+        openai: "gpt-3.5-turbo"
+        azure: "azure/gpt-4"
+
+    # ~~~ generation_parameters ~~
+    n: 1
+    max_tokens: 3000
+    temperature: 0.3
+
+    top_p: 0.2
+    frequency_penalty: 0
+    presence_penalty: 0
+
+    n_api_retries: 6
+    wait_time_between_retries: 20
+
+    # ~~~ Prompt specification ~~~
+    system_message_prompt_template:
+      _target_: aiflows.prompt_template.JinjaPrompt
+      template: |2-
+        You are a helpful chatbot that truthfully answers questions. Answer in a similar way to your previous replies.
+      input_variables: []
+      partial_variables: {}
+
+    init_human_message_prompt_template:
+      _target_: aiflows.prompt_template.JinjaPrompt
+      template: |2-
+        Answer the following question: {{question}}
+      input_variables: ["question"]
+      partial_variables: {}
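The two demonstration templates are Jinja2 templates: each data entry's query_data is rendered through query_prompt_template and its response_data through response_prompt_template before being inserted into the chat history. A minimal sketch of that rendering with plain jinja2 (aiflows wraps this in JinjaPrompt; the snippet only illustrates the template semantics):

    # Render one demonstration pair with the two templates defined above.
    from jinja2 import Template

    query_t = Template("Answer the following question: {{query}}")
    response_t = Template("{{response}}")

    demo = {"query_data": {"query": "What is the capital of Turkey?"},
            "response_data": {"response": "Istambul, my sir."}}

    print(query_t.render(**demo["query_data"]))        # Answer the following question: What is the capital of Turkey?
    print(response_t.render(**demo["response_data"]))  # Istambul, my sir.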
run.py CHANGED
@@ -4,72 +4,102 @@ import hydra
 
 import aiflows
 from aiflows.backends.api_info import ApiInfo
-from aiflows.utils.general_helpers import read_yaml_file
+from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
 
 from aiflows import logging
 from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
 
+from aiflows.utils import serve_utils
+from aiflows.workers import run_dispatch_worker_thread
+from aiflows.messages import FlowMessage
+from aiflows.interfaces import KeyInterface
+from aiflows.utils.colink_utils import start_colink_server
+
-CACHING_PARAMETERS.do_caching = False  # Set to True in order to enable caching
+CACHING_PARAMETERS.do_caching = False  # Set to True in order to enable caching
 # clear_cache() # Uncomment this line to clear the cache
 
-logging.set_verbosity_debug()  # Uncomment this line to see verbose logs
-
-from aiflows import flow_verse
+logging.set_verbosity_debug()
 
 
 dependencies = [
-    {"url": "aiflows/ChatWithDemonstrationsFlowModule", "revision": os.getcwd()},
+    {"url": "aiflows/ChatWithDemonstrationsFlowModule", "revision": os.getcwd()}
 ]
 
+from aiflows import flow_verse
 flow_verse.sync_dependencies(dependencies)
-
 if __name__ == "__main__":
-    # ~~~ Set the API information ~~~
-    # OpenAI backend
+
+    # 1. ~~~~~ Set up a colink server ~~~~
+    FLOW_MODULES_PATH = "./"
+
+    cl = start_colink_server()
+
+    # 2. ~~~~~ Load flow config ~~~~~~
+    root_dir = "."
+    cfg_path = os.path.join(root_dir, "demo.yaml")
+    cfg = read_yaml_file(cfg_path)
+
+    # 2.1 ~~~ Set the API information ~~~
+    # OpenAI backend
     api_information = [ApiInfo(backend_used="openai",
                                api_key = os.getenv("OPENAI_API_KEY"))]
-
-
     # # Azure backend
     # api_information = ApiInfo(backend_used = "azure",
     #                           api_base = os.getenv("AZURE_API_BASE"),
     #                           api_key = os.getenv("AZURE_OPENAI_KEY"),
     #                           api_version = os.getenv("AZURE_API_VERSION") )
 
-    root_dir = "."
-    cfg_path = os.path.join(root_dir, "demo.yaml")
-    cfg = read_yaml_file(cfg_path)
-
-    cfg["flow"]["subflows_config"]["chat_flow"]["backend"]["api_infos"] = api_information
-
-    # ~~~ Instantiate the Flow ~~~
-    flow_with_interfaces = {
-        "flow": hydra.utils.instantiate(cfg['flow'], _recursive_=False, _convert_="partial"),
-        "input_interface": (
-            None
-            if cfg.get("input_interface", None) is None
-            else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False)
-        ),
-        "output_interface": (
-            None
-            if cfg.get("output_interface", None) is None
-            else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False)
-        ),
-    }
-    # ~~~ Get the data ~~~
-    data = {"id": 0, "question": "What's the capital of France?"}  # This can be a list of samples
-    # data = {"id": 0, "question": "Who was the NBA champion in 2023?"}  # This can be a list of samples
-    # ~~~ Run inference ~~~
-    path_to_output_file = None
-    # path_to_output_file = "output.jsonl"  # Uncomment this line to save the output to disk
+    quick_load_api_keys(cfg, api_information, key="api_infos")
 
-    _, outputs = FlowLauncher.launch(
-        flow_with_interfaces=flow_with_interfaces,
-        data=data,
-        path_to_output_file=path_to_output_file
-    )
+    # 3. ~~~~ Serve The Flow ~~~~
+    serve_utils.recursive_serve_flow(
+        cl = cl,
+        flow_class_name="flow_modules.aiflows.ChatWithDemonstrationsFlowModule.ChatWithDemonstrationsFlow",
+        flow_endpoint="ChatWithDemonstrationsFlow",
+    )
+
+    # 4. ~~~~~ Start A Worker Thread ~~~~~
+    run_dispatch_worker_thread(cl)
+
+    # 5. ~~~~~ Mount the flow and get its proxy ~~~~~~
+    proxy_flow = serve_utils.get_flow_instance(
+        cl=cl,
+        flow_endpoint="ChatWithDemonstrationsFlow",
+        user_id="local",
+        config_overrides = cfg
+    )
+
+    # 6. ~~~ Get the data ~~~
+    data = {"id": 0, "question": "What is the capital of France?"}  # This can be a list of samples
+    # data = {"id": 0, "question": "Who was the NBA champion in 2023?"}  # This can be a list of samples
+
+    input_message = proxy_flow.package_input_message(data = data)
+
+    # 7. ~~~ Run inference ~~~
+    future = proxy_flow.get_reply_future(input_message)
+
+    # Uncomment this line if you would like to get the full message back
+    # reply_message = future.get_message()
+    reply_data = future.get_data()
 
     # ~~~ Print the output ~~~
-    flow_output_data = outputs[0]
-    print(flow_output_data)
+    print("~~~~~~Reply~~~~~~")
+    print(reply_data)
+
+    # 8. ~~~~ (Optional) apply output interface on reply ~~~~
+    # output_interface = KeyInterface(
+    #     keys_to_rename={"api_output": "answer"},
+    # )
+    # print("Output: ", output_interface(reply_data))
+
+    # 9. ~~~~~ Optional: Unserve Flow ~~~~~~
+    # serve_utils.delete_served_flow(cl, "ChatWithDemonstrationFlowModule")
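Since the comment notes that data "can be a list of samples", several questions can also be pushed through the served flow by collecting one future per input message. A sketch reusing only the proxy_flow calls from the script above (illustrative questions):

    # Sketch: batching several questions through the served flow. Futures are
    # collected first and resolved afterwards, so the dispatch worker can
    # process requests while we keep submitting.
    questions = ["What is the capital of France?", "What is the capital of Germany?"]

    futures = []
    for i, q in enumerate(questions):
        msg = proxy_flow.package_input_message(data={"id": i, "question": q})
        futures.append(proxy_flow.get_reply_future(msg))

    for q, f in zip(questions, futures):
        print(q, "->", f.get_data())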