nbaldwin committed
Commit 9b0c91a · 1 Parent(s): c9f9fe3

update template for coflows compatibility

Files changed (2):
  1. demo.yaml +3 -8
  2. run.py +89 -34
demo.yaml CHANGED
@@ -1,8 +1,3 @@
- input_interface: # Connector between the "input data" and the Flow
-   # your input interface here
- output_interface: # Connector between the Flow's output and the caller
-   # your output interface here
-
- flow: # Overrides for the flow config
-   # _target_: Your target here
-   # your flow overrides here
+ # Overrides for the flow config
+ # _target_: Your target here
+ # your flow overrides here
 
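For reference, a filled-in config under the new flattened template might look like the sketch below. The target path and every parameter name are illustrative assumptions rather than part of this commit; the one anchored detail is the `api_infos: ???` placeholder, since run.py below injects keys with `quick_load_api_keys(cfg, api_information, key="api_infos")`.

# Hypothetical demo.yaml — illustrative only; use your flow module's real keys
_target_: flow_modules.aiflows.ChatFlowModule.ChatAtomicFlow.instantiate_from_default_config  # assumed target
name: "demo_flow"                        # assumed parameter
description: "A minimal demo chat flow"  # assumed parameter
backend:
  api_infos: ???                         # placeholder filled at runtime by quick_load_api_keys
  model_name:
    openai: "gpt-3.5-turbo"              # assumed backend-to-model mapping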
run.py CHANGED
@@ -1,59 +1,114 @@
- """A simple script to run a Flow that can be used for development and debugging."""
-
  import os

  import hydra

  import aiflows
- from aiflows.flow_launchers import FlowLauncher, ApiInfo
- from aiflows.utils.general_helpers import read_yaml_file
+ from aiflows.flow_launchers import FlowLauncher
+ from aiflows.backends.api_info import ApiInfo
+ from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys

  from aiflows import logging
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache

+ from aiflows.utils import serve_utils
+ from aiflows.workers import run_dispatch_worker_thread
+ from aiflows.messages import FlowMessage
+ from aiflows.interfaces import KeyInterface
+ from aiflows.utils.colink_utils import start_colink_server
+
  CACHING_PARAMETERS.do_caching = False  # Set to True to enable caching
  # clear_cache() # Uncomment this line to clear the cache

  logging.set_verbosity_debug()


+ dependencies = [
+     {"url": "aiflows/FlowModule", "revision": os.getcwd()}
+ ]
+
+ from aiflows import flow_verse
+ flow_verse.sync_dependencies(dependencies)
+
  if __name__ == "__main__":

-     # ~~~ Instantiate the Flow ~~~
+     # 1. ~~~ Set up a colink server ~~~
+     FLOW_MODULES_PATH = "./"
+     cl = start_colink_server()
+
+     # 2. ~~~ Load the flow config ~~~
      root_dir = "."
      cfg_path = os.path.join(root_dir, "demo.yaml")
      cfg = read_yaml_file(cfg_path)

-     # ~~~ Instantiate the Flow ~~~
-     flow_with_interfaces = {
-         "flow": hydra.utils.instantiate(cfg['flow'], _recursive_=False, _convert_="partial"),
-         "input_interface": (
-             None
-             if cfg.get("input_interface", None) is None
-             else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False)
-         ),
-         "output_interface": (
-             None
-             if cfg.get("output_interface", None) is None
-             else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False)
-         ),
-     }
-
-     # ~~~ Get the data ~~~
-     # This can be a list of samples
-     data = {"id": 0}  # Add your data here
-
-     # ~~~ Run inference ~~~
-     path_to_output_file = None
-     # path_to_output_file = "output.jsonl"  # Uncomment this line to save the output to disk
-
-     _, outputs = FlowLauncher.launch(
-         flow_with_interfaces=flow_with_interfaces,
+     # 2.1 ~~~ Set the API information ~~~
+     # OpenAI backend
+     api_information = [ApiInfo(backend_used="openai",
+                                api_key=os.getenv("OPENAI_API_KEY"))]
+     # Azure backend
+     # api_information = [ApiInfo(backend_used="azure",
+     #                            api_base=os.getenv("AZURE_API_BASE"),
+     #                            api_key=os.getenv("AZURE_OPENAI_KEY"),
+     #                            api_version=os.getenv("AZURE_API_VERSION"))]
+
+     quick_load_api_keys(cfg, api_information, key="api_infos")
+
+     # 3. ~~~ Serve the flow ~~~
+     serve_utils.recursive_serve_flow(
+         cl=cl,
+         flow_type="FlowModule",
+         default_config=cfg,
+         default_state=None,
+         default_dispatch_point="coflows_dispatch",
+     )
+
+     # 4. ~~~ Start a worker thread ~~~
+     run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
+
+     # 5. ~~~ Mount the flow and get its proxy ~~~
+     proxy_flow = serve_utils.recursive_mount(
+         cl=cl,
+         client_id="local",
+         flow_type="FlowModule",
+         config_overrides=None,
+         initial_state=None,
+         dispatch_point_override=None,
+     )
+
+     # 6. ~~~ Get the data ~~~
+     data = {"id": 0}  # This can be a list of samples
+     # data = {"id": 0, "question": "Who was the NBA champion in 2023?"}
+
+     # Option 1: use the FlowMessage class
+     input_message = FlowMessage(
          data=data,
-         path_to_output_file=path_to_output_file,
      )

+     # Option 2: let the proxy flow package the message
+     # input_message = proxy_flow._package_input_message(data=data)
+
+     # 7. ~~~ Run inference ~~~
+     future = proxy_flow.send_message_blocking(input_message)
+
+     # Uncomment this line if you would like to get the full message back
+     # reply_message = future.get_message()
+     reply_data = future.get_data()
+
      # ~~~ Print the output ~~~
-     flow_output_data = outputs[0]
-     print(flow_output_data)
+     print("~~~~~~Reply~~~~~~")
+     print(reply_data)
+
+     # 8. ~~~ (Optional) apply an output interface to the reply ~~~
+     # output_interface = KeyInterface(
+     #     keys_to_rename={"api_output": "answer"},
+     # )
+     # print("Output: ", output_interface(reply_data))
+
+     # 9. ~~~ (Optional) unserve the flow ~~~
+     # serve_utils.delete_served_flow(cl, "ChatWithDemonstrationFlowModule")
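As a usage note on step 8: once the reply schema is known, the commented-out output interface can be enabled. A minimal sketch, assuming the served flow returns its generation under the key "api_output" (a key name taken from the template's own commented example, which may differ for your flow module):

from aiflows.interfaces import KeyInterface

# Rename the flow's raw output key to something caller-friendly.
# "api_output" is an assumed key; adjust it to what your flow actually emits.
output_interface = KeyInterface(keys_to_rename={"api_output": "answer"})

reply = output_interface(reply_data)  # reply_data comes from future.get_data() above
print("Answer:", reply["answer"])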