nbaldwin commited on
Commit
3f7f8cc
1 Parent(s): 94f0f9e

coflows compatible

Browse files
Files changed (3) hide show
  1. __init__.py +1 -1
  2. demo.yaml +17 -18
  3. run.py +78 -38
__init__.py CHANGED
@@ -1,6 +1,6 @@
1
  # ~~~ Specify the dependencies ~~
2
  dependencies = [
3
- {"url": "aiflows/ChatFlowModule", "revision": "main"}
4
  ]
5
  from aiflows import flow_verse
6
  flow_verse.sync_dependencies(dependencies)
 
1
  # ~~~ Specify the dependencies ~~
2
  dependencies = [
3
+ {"url": "aiflows/ChatFlowModule", "revision": "coflows"}
4
  ]
5
  from aiflows import flow_verse
6
  flow_verse.sync_dependencies(dependencies)
demo.yaml CHANGED
@@ -1,20 +1,19 @@
1
- flow:
2
- _target_: flow_modules.aiflows.VisionFlowModule.VisionAtomicFlow.instantiate_from_default_config
3
- name: "Demo Vision Flow"
4
- description: "A flow that, given a textual input, and a set of images and/or videos, generates a textual output."
5
- backend:
6
- api_infos: ???
7
-
8
- system_message_prompt_template:
9
- template: |2-
10
- You are a helpful chatbot that truthfully answers questions.
11
- input_variables: []
12
- partial_variables: {}
13
-
14
- init_human_message_prompt_template:
15
- template: |2-
16
- {{query}}
17
- input_variables: ["query"]
18
- partial_variables: {}
19
 
20
 
 
1
+ _target_: flow_modules.aiflows.VisionFlowModule.VisionAtomicFlow.instantiate_from_default_config
2
+ name: "Demo Vision Flow"
3
+ description: "A flow that, given a textual input, and a set of images and/or videos, generates a textual output."
4
+ backend:
5
+ api_infos: ???
6
+
7
+ system_message_prompt_template:
8
+ template: |2-
9
+ You are a helpful chatbot that truthfully answers questions.
10
+ input_variables: []
11
+ partial_variables: {}
12
+
13
+ init_human_message_prompt_template:
14
+ template: |2-
15
+ {{query}}
16
+ input_variables: ["query"]
17
+ partial_variables: {}
 
18
 
19
 
run.py CHANGED
@@ -2,59 +2,83 @@ import os
2
 
3
  import hydra
4
 
 
5
  from aiflows.flow_launchers import FlowLauncher
6
  from aiflows.backends.api_info import ApiInfo
7
- from aiflows.utils.general_helpers import read_yaml_file
8
 
9
  from aiflows import logging
10
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
11
 
 
 
 
 
 
 
 
12
  CACHING_PARAMETERS.do_caching = False # Set to True in order to disable caching
13
  # clear_cache() # Uncomment this line to clear the cache
14
 
15
- logging.set_verbosity_debug() # Uncomment this line to see verbose logs
16
 
17
- from aiflows import flow_verse
18
 
19
  dependencies = [
20
- {"url": "aiflows/VisionFlowModule", "revision": os.getcwd()},
21
  ]
22
- flow_verse.sync_dependencies(dependencies)
23
 
 
 
24
  if __name__ == "__main__":
25
- # ~~~ Set the API information ~~~
26
- # OpenAI backend
 
 
 
 
27
 
 
 
 
 
 
 
 
28
  api_information = [ApiInfo(backend_used="openai",
29
  api_key = os.getenv("OPENAI_API_KEY"))]
30
-
31
-
32
  # # Azure backend
33
  # api_information = ApiInfo(backend_used = "azure",
34
  # api_base = os.getenv("AZURE_API_BASE"),
35
  # api_key = os.getenv("AZURE_OPENAI_KEY"),
36
  # api_version = os.getenv("AZURE_API_VERSION") )
 
 
 
37
 
38
- root_dir = "."
39
- cfg_path = os.path.join(root_dir, "demo.yaml")
40
- cfg = read_yaml_file(cfg_path)
41
 
42
- cfg["flow"]["backend"]["api_infos"] = api_information
43
-
44
- # ~~~ Instantiate the Flow ~~~
45
- flow_with_interfaces = {
46
- "flow": hydra.utils.instantiate(cfg['flow'], _recursive_=False, _convert_="partial"),
47
- "input_interface": (
48
- None
49
- if cfg.get( "input_interface", None) is None
50
- else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False)
51
- ),
52
- "output_interface": (
53
- None
54
- if cfg.get( "output_interface", None) is None
55
- else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False)
56
- ),
57
- }
 
 
 
 
 
 
 
58
  url_image = {"type": "url",
59
  "image": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"}
60
 
@@ -75,17 +99,33 @@ if __name__ == "__main__":
75
  # "question": "These are frames from a video that I want to upload. Generate a compelling description that I can upload along with the video.",
76
  # "data": {"video": video}} # This can be a list of samples
77
 
78
-
79
- # ~~~ Run inference ~~~
80
- path_to_output_file = None
81
- # path_to_output_file = "output.jsonl" # Uncomment this line to save the output to disk
82
-
83
- _, outputs = FlowLauncher.launch(
84
- flow_with_interfaces=flow_with_interfaces,
85
  data=data,
86
- path_to_output_file=path_to_output_file
87
  )
88
 
 
 
 
 
 
 
 
 
 
 
89
  # ~~~ Print the output ~~~
90
- flow_output_data = outputs[0]
91
- print(flow_output_data)
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
  import hydra
4
 
5
+ import aiflows
6
  from aiflows.flow_launchers import FlowLauncher
7
  from aiflows.backends.api_info import ApiInfo
8
+ from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
9
 
10
  from aiflows import logging
11
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
12
 
13
+ from aiflows.utils import serve_utils
14
+ from aiflows.workers import run_dispatch_worker_thread
15
+ from aiflows.messages import FlowMessage
16
+ from aiflows.interfaces import KeyInterface
17
+ from aiflows.utils.colink_utils import start_colink_server
18
+ from aiflows.workers import run_dispatch_worker_thread
19
+
20
  CACHING_PARAMETERS.do_caching = False # Set to True in order to disable caching
21
  # clear_cache() # Uncomment this line to clear the cache
22
 
23
+ logging.set_verbosity_debug()
24
 
 
25
 
26
  dependencies = [
27
+ {"url": "aiflows/VisionFlowModule", "revision": os.getcwd()}
28
  ]
 
29
 
30
+ from aiflows import flow_verse
31
+ flow_verse.sync_dependencies(dependencies)
32
  if __name__ == "__main__":
33
+
34
+ #1. ~~~~~ Set up a colink server ~~~~
35
+ FLOW_MODULES_PATH = "./"
36
+
37
+ cl = start_colink_server()
38
+
39
 
40
+ #2. ~~~~~Load flow config~~~~~~
41
+ root_dir = "."
42
+ cfg_path = os.path.join(root_dir, "demo.yaml")
43
+ cfg = read_yaml_file(cfg_path)
44
+
45
+ #2.1 ~~~ Set the API information ~~~
46
+ # OpenAI backend
47
  api_information = [ApiInfo(backend_used="openai",
48
  api_key = os.getenv("OPENAI_API_KEY"))]
 
 
49
  # # Azure backend
50
  # api_information = ApiInfo(backend_used = "azure",
51
  # api_base = os.getenv("AZURE_API_BASE"),
52
  # api_key = os.getenv("AZURE_OPENAI_KEY"),
53
  # api_version = os.getenv("AZURE_API_VERSION") )
54
+
55
+
56
+ quick_load_api_keys(cfg, api_information, key="api_infos")
57
 
 
 
 
58
 
59
+ #3. ~~~~ Serve The Flow ~~~~
60
+ serve_utils.recursive_serve_flow(
61
+ cl = cl,
62
+ flow_type="VisionFlowModule",
63
+ default_config=cfg,
64
+ default_state=None,
65
+ default_dispatch_point="coflows_dispatch"
66
+ )
67
+
68
+ #4. ~~~~~Start A Worker Thread~~~~~
69
+ run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
70
+
71
+ #5. ~~~~~Mount the flow and get its proxy~~~~~~
72
+ proxy_flow = serve_utils.recursive_mount(
73
+ cl=cl,
74
+ client_id="local",
75
+ flow_type="VisionFlowModule",
76
+ config_overrides=None,
77
+ initial_state=None,
78
+ dispatch_point_override=None,
79
+ )
80
+
81
+ #6. ~~~ Get the data ~~~
82
  url_image = {"type": "url",
83
  "image": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"}
84
 
 
99
  # "question": "These are frames from a video that I want to upload. Generate a compelling description that I can upload along with the video.",
100
  # "data": {"video": video}} # This can be a list of samples
101
 
102
+ #option1: use the FlowMessage class
103
+ input_message = FlowMessage(
 
 
 
 
 
104
  data=data,
 
105
  )
106
 
107
+ #option2: use the proxy_flow
108
+ #input_message = proxy_flow._package_input_message(data = data)
109
+
110
+ #7. ~~~ Run inference ~~~
111
+ future = proxy_flow.send_message_blocking(input_message)
112
+
113
+ #uncomment this line if you would like to get the full message back
114
+ #reply_message = future.get_message()
115
+ reply_data = future.get_data()
116
+
117
  # ~~~ Print the output ~~~
118
+ print("~~~~~~Reply~~~~~~")
119
+ print(reply_data)
120
+
121
+
122
+ #8. ~~~~ (Optional) apply output interface on reply ~~~~
123
+ # output_interface = KeyInterface(
124
+ # keys_to_rename={"api_output": "answer"},
125
+ # )
126
+ # print("Output: ", output_interface(reply_data))
127
+
128
+
129
+ #9. ~~~~~Optional: Unserve Flow~~~~~~
130
+ # serve_utils.delete_served_flow(cl, "VisionFlowModule") o_caching = False # Set to True to enable caching
131
+