Haofei Yu committed on
Commit
084fe8e
1 Parent(s): cbe01c4

update the deployable ctm (#22)

Browse files
ctm/__init__.py ADDED
File without changes
ctm/configs/__init__.py CHANGED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ from .ctm_config_base import BaseConsciousnessTuringMachineConfig
2
+
3
+ __all__ = [
4
+ "BaseConsciousnessTuringMachineConfig",
5
+ ]
ctm/configs/ctm_config_base.py CHANGED
@@ -1,44 +1,48 @@
1
  import json
 
2
 
3
 
4
- class BaseConsciousnessTuringMachineConfig(object):
5
- # Initialize with default values or those passed to the constructor
6
  def __init__(
7
  self,
8
- ctm_name=None,
9
- max_iter_num=3,
10
- output_threshold=0.5,
11
- groups_of_processors={},
12
- supervisor="gpt4_supervisor",
13
- **kwargs,
14
- ):
15
- self.ctm_name = ctm_name
16
- self.max_iter_num = max_iter_num
17
- self.output_threshold = output_threshold
18
- self.groups_of_processors = groups_of_processors
19
- self.supervisor = supervisor
20
- # This allows for handling additional, possibly unknown configuration parameters
 
 
21
  for key, value in kwargs.items():
22
  setattr(self, key, value)
23
 
24
- def to_json_string(self):
25
  """Serializes this instance to a JSON string."""
26
  return json.dumps(self.__dict__, indent=2) + "\n"
27
 
28
  @classmethod
29
- def from_json_file(cls, json_file):
 
 
30
  """Creates an instance from a JSON file."""
31
  with open(json_file, "r", encoding="utf-8") as reader:
32
  text = reader.read()
33
  return cls(**json.loads(text))
34
 
35
  @classmethod
36
- def from_ctm(cls, ctm_name):
37
  """
38
  Simulate fetching a model configuration from a ctm model repository.
39
  This example assumes the configuration is already downloaded and saved locally.
40
  """
41
  # This path would be generated dynamically based on `model_name_or_path`
42
  # For simplicity, we're directly using it as a path to a local file
43
- config_file = f"../ctm/configs/{ctm_name}_config.json"
44
  return cls.from_json_file(config_file)
 
1
  import json
2
+ from typing import Any, Dict, Optional
3
 
4
 
5
class BaseConsciousnessTuringMachineConfig:
    """Configuration for a Consciousness Turing Machine (CTM).

    Holds the CTM name, iteration/threshold settings, the processor group
    layout, and the supervisor name.  Unknown keyword arguments are stored
    as attributes so configs loaded from JSON round-trip losslessly.
    """

    def __init__(
        self,
        ctm_name: Optional[str] = None,
        max_iter_num: int = 3,
        output_threshold: float = 0.5,
        groups_of_processors: Optional[Dict[str, Any]] = None,
        supervisor: str = "gpt4_supervisor",
        **kwargs: Any,
    ) -> None:
        self.ctm_name: Optional[str] = ctm_name
        self.max_iter_num: int = max_iter_num
        self.output_threshold: float = output_threshold
        # Fix: use a None sentinel instead of the previous mutable `{}`
        # default, which would be shared across every instance constructed
        # without the argument.
        self.groups_of_processors: Dict[str, Any] = (
            {} if groups_of_processors is None else groups_of_processors
        )
        self.supervisor: str = supervisor
        # Handle additional, possibly unknown configuration parameters.
        for key, value in kwargs.items():
            setattr(self, key, value)

    def to_json_string(self) -> str:
        """Serializes this instance to a JSON string."""
        return json.dumps(self.__dict__, indent=2) + "\n"

    @classmethod
    def from_json_file(
        cls, json_file: str
    ) -> "BaseConsciousnessTuringMachineConfig":
        """Creates an instance from a JSON file."""
        with open(json_file, "r", encoding="utf-8") as reader:
            text = reader.read()
        return cls(**json.loads(text))

    @classmethod
    def from_ctm(cls, ctm_name: str) -> "BaseConsciousnessTuringMachineConfig":
        """
        Simulate fetching a model configuration from a ctm model repository.
        This example assumes the configuration is already downloaded and saved locally.
        """
        # This path would be generated dynamically based on `model_name_or_path`
        # For simplicity, we're directly using it as a path to a local file
        config_file = f"../ctm_conf/{ctm_name}_config.json"
        return cls.from_json_file(config_file)
ctm/configs/sarcasm_ctm_config.json DELETED
@@ -1,18 +0,0 @@
1
- {
2
- "ctm_name": "sarcasm_ctm",
3
- "max_iter_num": 3,
4
- "output_threshold": 0.5,
5
- "groups_of_processors": {
6
- "group_1": [
7
- "gpt4v_scene_location_processor",
8
- "gpt4v_cloth_fashion_processor"
9
- ],
10
- "group_2": [
11
- "gpt4v_posture_processor"
12
- ],
13
- "group_3": [
14
- "gpt4v_ocr_processor"
15
- ]
16
- },
17
- "supervisor": "gpt4_supervisor"
18
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ctm/ctms/ctm_base.py CHANGED
@@ -1,33 +1,41 @@
1
  import concurrent.futures
2
  from collections import defaultdict
 
3
 
4
  from sklearn.feature_extraction.text import TfidfVectorizer
5
  from sklearn.metrics.pairwise import cosine_similarity
6
 
7
- from ctm.configs.ctm_config_base import (
8
- BaseConsciousnessTuringMachineConfig,
9
- )
10
- from ctm.processors.processor_base import BaseProcessor
11
- from ctm.supervisors.supervisor_base import BaseSupervisor
12
 
13
 
14
  class BaseConsciousnessTuringMachine(object):
15
- def __call__(self, *args, **kwargs):
16
- return self.forward(*args, **kwargs)
17
-
18
- def __init__(self, ctm_name=None, *args, **kwargs):
19
- super().__init__(*args, **kwargs)
20
  if ctm_name:
21
  self.config = BaseConsciousnessTuringMachineConfig.from_ctm(
22
  ctm_name
23
  )
24
  else:
25
  self.config = BaseConsciousnessTuringMachineConfig()
26
- self.processor_list = []
27
- self.processor_group_map = defaultdict(list)
28
  self.load_ctm()
29
 
30
- def add_processor(self, processor_name, group_name=None):
 
 
 
 
 
 
 
 
 
 
 
 
31
  processor_instance = BaseProcessor(processor_name)
32
  self.processor_list.append(
33
  {
@@ -38,37 +46,52 @@ class BaseConsciousnessTuringMachine(object):
38
  if group_name:
39
  self.processor_group_map[processor_name] = group_name
40
 
41
- def add_supervisor(self, supervisor_name):
42
  supervisor_instance = BaseSupervisor(supervisor_name)
43
- self.supervisor = {
44
  "supervisor_name": supervisor_name,
45
  "supervisor_instance": supervisor_instance,
46
  }
47
 
48
  @staticmethod
49
  def ask_processor(
50
- processor, question, context, image_path, audio_path, video_path
51
- ):
 
 
 
 
 
52
  processor_instance = processor["processor_instance"]
53
  processor_name = processor["processor_name"]
 
54
  gist, score = processor_instance.ask(
55
- question, context, image_path, audio_path, video_path
 
 
 
 
56
  )
57
  return {"name": processor_name, "gist": gist, "score": score}
58
 
59
  def ask_processors(
60
- self, question, context, image_path, audio_path, video_path
61
- ):
 
 
 
 
 
62
  with concurrent.futures.ThreadPoolExecutor() as executor:
63
  futures = [
64
  executor.submit(
65
  self.ask_processor,
66
  processor,
67
- question,
68
- context,
69
- image_path,
70
- audio_path,
71
- video_path,
72
  )
73
  for processor in self.processor_list
74
  ]
@@ -77,7 +100,7 @@ class BaseConsciousnessTuringMachine(object):
77
  for future in concurrent.futures.as_completed(futures)
78
  ]
79
 
80
- output = {}
81
  for result in results:
82
  output[result["name"]] = {
83
  "gist": result["gist"],
@@ -87,49 +110,68 @@ class BaseConsciousnessTuringMachine(object):
87
  assert len(output) == len(self.processor_list)
88
  return output
89
 
90
- def uptree_competition(self, processor_output):
 
 
91
  # Unpack processor outputs into lists for easier processing
92
- gists, scores, names = [], [], []
 
 
 
93
  for name, info in processor_output.items():
94
  gists.append(info["gist"])
95
  scores.append(info["score"])
96
  names.append(name)
97
 
98
  # Determine the unique group for each processor
99
- unique_groups = set(self.processor_group_map.values())
100
 
101
  # Prepare to track the best processor by group
102
- best_processor_by_group = {
103
- group: (None, -1) for group in unique_groups
104
- } # (processor_name, score)
 
 
 
 
105
 
106
  # Iterate through processors to find the best in each group
107
  for name, score in zip(names, scores):
108
- group = self.processor_group_map[name]
109
  if score > best_processor_by_group[group][1]:
110
  best_processor_by_group[group] = (name, score)
111
 
112
  # Select the overall best across groups
113
- best_overall = max(
114
  best_processor_by_group.values(), key=lambda x: x[1]
115
  )
116
- best_name = best_overall[0]
117
- index = names.index(best_name)
 
 
 
 
118
 
119
- winning_info = {
 
 
 
 
120
  "name": best_name,
121
  "gist": gists[index],
122
  "score": scores[index],
123
  }
124
  return winning_info
125
 
126
- def ask_supervisor(self, question, processor_info):
 
 
127
  final_answer, score = self.supervisor["supervisor_instance"].ask(
128
- question, processor_info["gist"]
129
  )
130
  return final_answer, score
131
 
132
- def downtree_broadcast(self, winning_output):
133
  winning_processor_name = winning_output["name"]
134
  winning_processor_gist = winning_output["gist"]
135
  for processor in self.processor_list:
@@ -139,14 +181,16 @@ class BaseConsciousnessTuringMachine(object):
139
  )
140
  return
141
 
142
- def calc_processor_sim(self, processor_output):
 
 
143
  processor_gists = [info["gist"] for info in processor_output.values()]
144
  tfidf_vectorizer = TfidfVectorizer()
145
  tfidf_matrix = tfidf_vectorizer.fit_transform(processor_gists)
146
  cosine_sim = cosine_similarity(tfidf_matrix, tfidf_matrix)
147
  return cosine_sim
148
 
149
- def link_form(self, processor_output):
150
  sim = self.calc_processor_sim(processor_output)
151
  print(sim)
152
  # iterate on each sim pair
@@ -184,34 +228,33 @@ class BaseConsciousnessTuringMachine(object):
184
  self.processor_group_map[processor2_name] = group_name
185
  return
186
 
187
- def processor_fuse(self, infos, scores):
 
 
188
  return infos, scores
189
 
190
  def forward(
191
  self,
192
- question=None,
193
- context=None,
194
- image_path=None,
195
- audio_path=None,
196
- video_path=None,
197
- ):
198
  answer_threshold = 0.5
199
  max_iter = 3
200
 
201
  for i in range(max_iter):
202
  print("start the {}-th iteration".format(i + 1))
203
  processor_output = self.ask_processors(
204
- question=question,
205
- context=context,
206
- image_path=image_path,
207
- audio_path=audio_path,
208
- video_path=video_path,
209
  )
210
- import pdb
211
-
212
- pdb.set_trace()
213
  winning_output = self.uptree_competition(processor_output)
214
- answer, score = self.ask_supervisor(question, winning_output)
215
  if score > answer_threshold:
216
  break
217
  else:
@@ -219,7 +262,7 @@ class BaseConsciousnessTuringMachine(object):
219
  self.link_form(processor_output)
220
  return answer, score
221
 
222
- def load_ctm(self):
223
  for (
224
  group_name,
225
  processor_list,
 
1
  import concurrent.futures
2
  from collections import defaultdict
3
+ from typing import Any, Dict, List, Optional, Set, Tuple
4
 
5
  from sklearn.feature_extraction.text import TfidfVectorizer
6
  from sklearn.metrics.pairwise import cosine_similarity
7
 
8
+ from ctm.configs import BaseConsciousnessTuringMachineConfig
9
+ from ctm.processors import BaseProcessor
10
+ from ctm.supervisors import BaseSupervisor
 
 
11
 
12
 
13
  class BaseConsciousnessTuringMachine(object):
14
+ def __init__(self, ctm_name: Optional[str] = None) -> None:
15
+ super().__init__()
 
 
 
16
  if ctm_name:
17
  self.config = BaseConsciousnessTuringMachineConfig.from_ctm(
18
  ctm_name
19
  )
20
  else:
21
  self.config = BaseConsciousnessTuringMachineConfig()
22
+ self.processor_list: List[Dict[str, Any]] = []
23
+ self.processor_group_map: Dict[str, str] = defaultdict(str)
24
  self.load_ctm()
25
 
26
+ def __call__(
27
+ self,
28
+ query: str,
29
+ text: Optional[str] = None,
30
+ image: Optional[Any] = None,
31
+ audio: Optional[Any] = None,
32
+ video_frames: Optional[Any] = None,
33
+ ) -> Tuple[str, float]:
34
+ return self.forward(query, text, image, audio, video_frames)
35
+
36
+ def add_processor(
37
+ self, processor_name: str, group_name: Optional[str] = 'default_group'
38
+ ) -> None:
39
  processor_instance = BaseProcessor(processor_name)
40
  self.processor_list.append(
41
  {
 
46
  if group_name:
47
  self.processor_group_map[processor_name] = group_name
48
 
49
+ def add_supervisor(self, supervisor_name: str) -> None:
50
  supervisor_instance = BaseSupervisor(supervisor_name)
51
+ self.supervisor: Dict[str, Any] = {
52
  "supervisor_name": supervisor_name,
53
  "supervisor_instance": supervisor_instance,
54
  }
55
 
56
  @staticmethod
57
  def ask_processor(
58
+ processor: Dict[str, Any],
59
+ query: str,
60
+ text: Optional[str] = None,
61
+ image: Optional[Any] = None,
62
+ audio: Optional[Any] = None,
63
+ video_frames: Optional[Any] = None,
64
+ ) -> Dict[str, Any]:
65
  processor_instance = processor["processor_instance"]
66
  processor_name = processor["processor_name"]
67
+ print(processor_name)
68
  gist, score = processor_instance.ask(
69
+ query=query,
70
+ text=text,
71
+ image=image,
72
+ audio=audio,
73
+ video_frames=video_frames,
74
  )
75
  return {"name": processor_name, "gist": gist, "score": score}
76
 
77
  def ask_processors(
78
+ self,
79
+ query: str,
80
+ text: Optional[str] = None,
81
+ image: Optional[Any] = None,
82
+ audio: Optional[Any] = None,
83
+ video_frames: Optional[Any] = None,
84
+ ) -> Dict[str, Dict[str, Any]]:
85
  with concurrent.futures.ThreadPoolExecutor() as executor:
86
  futures = [
87
  executor.submit(
88
  self.ask_processor,
89
  processor,
90
+ query,
91
+ text,
92
+ image,
93
+ audio,
94
+ video_frames,
95
  )
96
  for processor in self.processor_list
97
  ]
 
100
  for future in concurrent.futures.as_completed(futures)
101
  ]
102
 
103
+ output: Dict[str, Dict[str, Any]] = {}
104
  for result in results:
105
  output[result["name"]] = {
106
  "gist": result["gist"],
 
110
  assert len(output) == len(self.processor_list)
111
  return output
112
 
113
+ def uptree_competition(
114
+ self, processor_output: Dict[str, Dict[str, Any]]
115
+ ) -> Dict[str, Any]:
116
  # Unpack processor outputs into lists for easier processing
117
+ gists: List[str] = []
118
+ scores: List[float] = []
119
+ names: List[str] = []
120
+
121
  for name, info in processor_output.items():
122
  gists.append(info["gist"])
123
  scores.append(info["score"])
124
  names.append(name)
125
 
126
  # Determine the unique group for each processor
127
+ unique_groups: Set[str] = set(self.processor_group_map.values())
128
 
129
  # Prepare to track the best processor by group
130
+ best_processor_by_group: Dict[str, Tuple[Optional[str], float]] = {
131
+ group: (
132
+ None,
133
+ float("-inf"),
134
+ ) # Use negative infinity as the initial lowest score
135
+ for group in unique_groups
136
+ }
137
 
138
  # Iterate through processors to find the best in each group
139
  for name, score in zip(names, scores):
140
+ group = self.processor_group_map.get(name, "")
141
  if score > best_processor_by_group[group][1]:
142
  best_processor_by_group[group] = (name, score)
143
 
144
  # Select the overall best across groups
145
+ best_overall: Tuple[Optional[str], float] = max(
146
  best_processor_by_group.values(), key=lambda x: x[1]
147
  )
148
+ best_name: Optional[str] = best_overall[0]
149
+
150
+ if best_name is None:
151
+ raise ValueError(
152
+ "No valid processor found."
153
+ ) # Ensure best_name is not None
154
 
155
+ index: int = names.index(
156
+ best_name
157
+ ) # Now best_name is guaranteed to be not None
158
+
159
+ winning_info: Dict[str, Any] = {
160
  "name": best_name,
161
  "gist": gists[index],
162
  "score": scores[index],
163
  }
164
  return winning_info
165
 
166
+ def ask_supervisor(
167
+ self, query: str, processor_info: Dict[str, Any]
168
+ ) -> Tuple[str, float]:
169
  final_answer, score = self.supervisor["supervisor_instance"].ask(
170
+ query, processor_info["gist"]
171
  )
172
  return final_answer, score
173
 
174
+ def downtree_broadcast(self, winning_output: Dict[str, str]) -> None:
175
  winning_processor_name = winning_output["name"]
176
  winning_processor_gist = winning_output["gist"]
177
  for processor in self.processor_list:
 
181
  )
182
  return
183
 
184
+ def calc_processor_sim(
185
+ self, processor_output: Dict[str, Dict[str, str]]
186
+ ) -> Any:
187
  processor_gists = [info["gist"] for info in processor_output.values()]
188
  tfidf_vectorizer = TfidfVectorizer()
189
  tfidf_matrix = tfidf_vectorizer.fit_transform(processor_gists)
190
  cosine_sim = cosine_similarity(tfidf_matrix, tfidf_matrix)
191
  return cosine_sim
192
 
193
+ def link_form(self, processor_output: Dict[str, Dict[str, str]]) -> None:
194
  sim = self.calc_processor_sim(processor_output)
195
  print(sim)
196
  # iterate on each sim pair
 
228
  self.processor_group_map[processor2_name] = group_name
229
  return
230
 
231
+ def processor_fuse(
232
+ self, infos: List[str], scores: List[float]
233
+ ) -> Tuple[List[str], List[float]]:
234
  return infos, scores
235
 
236
  def forward(
237
  self,
238
+ query: str,
239
+ text: Optional[str] = None,
240
+ image: Optional[Any] = None,
241
+ audio: Optional[Any] = None,
242
+ video_frames: Optional[Any] = None,
243
+ ) -> Tuple[str, float]:
244
  answer_threshold = 0.5
245
  max_iter = 3
246
 
247
  for i in range(max_iter):
248
  print("start the {}-th iteration".format(i + 1))
249
  processor_output = self.ask_processors(
250
+ query=query,
251
+ text=text,
252
+ image=image,
253
+ audio=audio,
254
+ video_frames=video_frames,
255
  )
 
 
 
256
  winning_output = self.uptree_competition(processor_output)
257
+ answer, score = self.ask_supervisor(query, winning_output)
258
  if score > answer_threshold:
259
  break
260
  else:
 
262
  self.link_form(processor_output)
263
  return answer, score
264
 
265
+ def load_ctm(self) -> None:
266
  for (
267
  group_name,
268
  processor_list,
ctm/messengers/messenger_bart_text_summ.py CHANGED
@@ -1,22 +1,35 @@
1
- from typing import Dict, List, Union
2
 
3
- from ctm.messengers.messenger_base import BaseMessenger
4
 
 
5
 
6
- @BaseMessenger.register_messenger("bart_text_summ_messenger") # type: ignore[no-untyped-call] # FIX ME
 
7
  class BartTextSummarizationMessenger(BaseMessenger):
8
- def __init__(self, role=None, content=None, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
 
 
 
9
  self.init_messenger(role, content)
10
 
11
- def init_messenger( # type: ignore[no-untyped-def] # FIX ME
12
- self, role: str = None, content: Union[str, Dict, List] = None # type: ignore[assignment, type-arg] # FIX ME
13
- ):
14
- self.messages = ""
15
- if content and role:
16
- self.update_messages(role, content) # type: ignore[attr-defined] # FIX ME
 
 
17
 
18
- def update_message(self, role: str, content: Union[str, Dict, List]): # type: ignore[no-untyped-def, type-arg] # FIX ME
19
- self.messages += content # type: ignore[operator] # FIX ME
 
 
20
 
21
- def check_iter_round_num(self): # type: ignore[no-untyped-def] # FIX ME
22
- return 1 if len(self.messages) > 0 else 0
 
1
+ from typing import Any, Dict, List, Optional, TypeVar, Union
2
 
3
+ from .messenger_base import BaseMessenger
4
 
5
+ T = TypeVar("T", bound="BaseMessenger")
6
 
7
+
8
@BaseMessenger.register_messenger("bart_text_summ_messenger")
class BartTextSummarizationMessenger(BaseMessenger):
    """Messenger backing the BART summarizer; history is one plain string."""

    def __init__(
        self,
        role: Optional[str] = None,
        content: Optional[Union[str, Dict[str, Any], List[Any]]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.init_messenger(role, content)

    def init_messenger(
        self,
        role: Optional[str] = None,
        content: Optional[Union[str, Dict[str, Any], List[Any]]] = None,
    ) -> None:
        """Reset the buffer, seeding it when both role and content are truthy."""
        self.messages: str = ""
        if role and content:
            self.update_message(role, content)

    def update_message(
        self, role: str, content: Union[str, Dict[str, Any], List[Any]]
    ) -> None:
        # NOTE(review): `+=` assumes `content` is a str despite the wider
        # annotation — confirm callers never pass dict/list here.
        self.messages += content

    def check_iter_round_num(self) -> int:
        """Length of the accumulated text buffer."""
        return len(self.messages)
ctm/messengers/messenger_base.py CHANGED
@@ -1,18 +1,39 @@
1
- from typing import Dict, List, Union
 
 
 
 
 
 
 
 
 
2
 
 
 
3
 
4
- class BaseMessenger(object):
5
- _messenger_registry = {} # type: ignore[var-annotated] # FIX ME
 
6
 
7
  @classmethod
8
- def register_messenger(cls, messenger_name): # type: ignore[no-untyped-def] # FIX ME
9
- def decorator(subclass): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
10
  cls._messenger_registry[messenger_name] = subclass
11
  return subclass
12
 
13
  return decorator
14
 
15
- def __new__(cls, messenger_name, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
 
16
  if messenger_name not in cls._messenger_registry:
17
  raise ValueError(
18
  f"No messenger registered with name '{messenger_name}'"
@@ -21,43 +42,63 @@ class BaseMessenger(object):
21
  cls._messenger_registry[messenger_name]
22
  )
23
 
24
- def __init__(self, role=None, content=None, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
25
  self.init_messenger(role, content)
26
 
27
- def init_messenger( # type: ignore[no-untyped-def] # FIX ME
28
- self, role: str = None, content: Union[str, Dict, List] = None # type: ignore[assignment, type-arg] # FIX ME
29
- ):
30
- pass
 
 
 
 
31
 
32
- def update_message(self, role: str, content: Union[str, Dict, List]): # type: ignore[no-untyped-def, type-arg] # FIX ME
33
- pass
 
 
34
 
35
- def check_iter_round_num(self): # type: ignore[no-untyped-def] # FIX ME
36
- pass
37
 
38
- def add_system_message(self, message: Union[str, Dict, List]): # type: ignore[no-untyped-def, type-arg] # FIX ME
 
 
39
  self.update_message("system", message)
40
 
41
- def add_assistant_message(self, message: Union[str, Dict, List]): # type: ignore[no-untyped-def, type-arg] # FIX ME
 
 
42
  self.update_message("assistant", message)
43
 
44
- def add_user_message(self, message: Union[str, Dict, List]): # type: ignore[no-untyped-def, type-arg] # FIX ME
 
 
45
  self.update_message("user", message)
46
 
47
- def add_user_image(self, image_base64: str): # type: ignore[no-untyped-def] # FIX ME
48
- self.add_message( # type: ignore[attr-defined] # FIX ME
49
  "user",
50
  {
51
- "type": "image_url",
52
  "image_url": f"data:image/jpeg;base64,{image_base64}",
53
  },
54
  )
55
 
56
- def add_feedback(self, feedback: Union[str, Dict, List]): # type: ignore[no-untyped-def, type-arg] # FIX ME
57
- self.add_message("system", feedback) # type: ignore[attr-defined] # FIX ME
 
 
58
 
59
- def clear(self): # type: ignore[no-untyped-def] # FIX ME
60
- self.messages.clear() # type: ignore[attr-defined] # FIX ME
61
 
62
- def get_messages(self): # type: ignore[no-untyped-def] # FIX ME
63
- return self.messages # type: ignore[attr-defined] # FIX ME
 
 
 
1
+ from typing import (
2
+ Any,
3
+ Callable,
4
+ Dict,
5
+ List,
6
+ Optional,
7
+ Type,
8
+ TypeVar,
9
+ Union,
10
+ )
11
 
12
+ # This TypeVar is used for methods that might need to return or work with instances of subclasses of BaseMessenger.
13
+ T = TypeVar("T")
14
 
15
+
16
+ class BaseMessenger:
17
+ _messenger_registry: Dict[str, Type["BaseMessenger"]] = {}
18
 
19
  @classmethod
20
+ def register_messenger(
21
+ cls, messenger_name: str
22
+ ) -> Callable[[Type["BaseMessenger"]], Type["BaseMessenger"]]:
23
+ def decorator(
24
+ subclass: Type["BaseMessenger"],
25
+ ) -> Type["BaseMessenger"]:
26
  cls._messenger_registry[messenger_name] = subclass
27
  return subclass
28
 
29
  return decorator
30
 
31
+ def __new__(
32
+ cls: Type["BaseMessenger"],
33
+ messenger_name: str,
34
+ *args: Any,
35
+ **kwargs: Any,
36
+ ) -> "BaseMessenger":
37
  if messenger_name not in cls._messenger_registry:
38
  raise ValueError(
39
  f"No messenger registered with name '{messenger_name}'"
 
42
  cls._messenger_registry[messenger_name]
43
  )
44
 
45
+ def __init__(
46
+ self,
47
+ role: Optional[str] = None,
48
+ content: Optional[Union[str, Dict[str, Any], List[Any]]] = None,
49
+ ) -> None:
50
  self.init_messenger(role, content)
51
 
52
+ def init_messenger(
53
+ self,
54
+ role: Optional[str] = None,
55
+ content: Optional[Union[str, Dict[str, Any], List[Any]]] = None,
56
+ ) -> None:
57
+ raise NotImplementedError(
58
+ "The 'init_messenger' method must be implemented in derived classes."
59
+ )
60
 
61
+ def update_message(
62
+ self, role: str, content: Union[str, Dict[str, Any], List[Any]]
63
+ ) -> None:
64
+ self.messages.append({"role": role, "content": content})
65
 
66
+ def check_iter_round_num(self) -> int:
67
+ return len(self.messages)
68
 
69
+ def add_system_message(
70
+ self, message: Union[str, Dict[str, Any], List[Any]]
71
+ ) -> None:
72
  self.update_message("system", message)
73
 
74
+ def add_assistant_message(
75
+ self, message: Union[str, Dict[str, Any], List[Any]]
76
+ ) -> None:
77
  self.update_message("assistant", message)
78
 
79
+ def add_user_message(
80
+ self, message: Union[str, Dict[str, Any], List[Any]]
81
+ ) -> None:
82
  self.update_message("user", message)
83
 
84
+ def add_user_image(self, image_base64: str) -> None:
85
+ self.update_message(
86
  "user",
87
  {
88
+ "type": "image",
89
  "image_url": f"data:image/jpeg;base64,{image_base64}",
90
  },
91
  )
92
 
93
+ def add_feedback(
94
+ self, feedback: Union[str, Dict[str, Any], List[Any]]
95
+ ) -> None:
96
+ self.update_message("system", feedback)
97
 
98
+ def clear(self) -> None:
99
+ self.messages.clear()
100
 
101
+ def get_messages(
102
+ self,
103
+ ) -> List[Dict[str, Union[str, Dict[str, Any], List[Any]]]]:
104
+ return self.messages
ctm/messengers/messenger_gpt4.py CHANGED
@@ -1,22 +1,39 @@
1
- from typing import Dict, List, Union
2
 
3
- from ctm.messengers.messenger_base import BaseMessenger
4
 
5
 
6
- @BaseMessenger.register_messenger("gpt4_messenger") # type: ignore[no-untyped-call] # FIX ME
 
7
  class GPT4Messenger(BaseMessenger):
8
- def __init__(self, role=None, content=None, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
 
 
 
9
  self.init_messenger(role, content)
10
 
11
- def init_messenger( # type: ignore[no-untyped-def] # FIX ME
12
- self, role: str = None, content: Union[str, Dict, List] = None # type: ignore[assignment, type-arg] # FIX ME
13
- ):
14
- self.messages = [] # type: ignore[var-annotated] # FIX ME
15
- if content and role:
16
- self.update_messages(role, content) # type: ignore[attr-defined] # FIX ME
 
 
 
 
 
17
 
18
- def update_message(self, role: str, content: Union[str, Dict, List]): # type: ignore[no-untyped-def, type-arg] # FIX ME
 
 
 
19
  self.messages.append({"role": role, "content": content})
20
 
21
- def check_iter_round_num(self): # type: ignore[no-untyped-def] # FIX ME
 
22
  return len(self.messages)
 
1
+ from typing import Any, Dict, List, Optional, Tuple, Union
2
 
3
+ from .messenger_base import BaseMessenger
4
 
5
 
6
+ # Assuming BaseMessenger has a correctly typed decorator:
7
@BaseMessenger.register_messenger("gpt4_messenger")
class GPT4Messenger(BaseMessenger):
    """Messenger that accumulates GPT-4 chat turns as role/content dicts."""

    def __init__(
        self,
        role: Optional[str] = None,
        content: Optional[Union[str, Dict[str, Any], List[Any]]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.init_messenger(role, content)

    def init_messenger(
        self,
        role: Optional[str] = None,
        content: Optional[Union[str, Dict[str, Any], List[Any]]] = None,
    ) -> None:
        """Start with an empty history; seed it when both role and content are given."""
        self.messages: List[
            Dict[str, Union[str, Dict[str, Any], List[Any]]]
        ] = []
        if role is not None and content is not None:
            self.update_message(role, content)

    def update_message(
        self, role: str, content: Union[str, Dict[str, Any], List[Any]]
    ) -> None:
        """Record one chat turn."""
        entry = {"role": role, "content": content}
        self.messages.append(entry)

    def check_iter_round_num(self) -> int:
        """Number of turns recorded so far."""
        return len(self.messages)
ctm/messengers/messenger_gpt4v.py CHANGED
@@ -1,22 +1,41 @@
1
- from typing import Dict, List, Union
2
 
3
- from ctm.messengers.messenger_base import BaseMessenger
4
 
 
5
 
6
- @BaseMessenger.register_messenger("gpt4v_messenger") # type: ignore[no-untyped-call] # FIX ME
 
 
 
7
  class GPT4VMessenger(BaseMessenger):
8
- def __init__(self, role=None, content=None, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
 
 
 
9
  self.init_messenger(role, content)
10
 
11
- def init_messenger( # type: ignore[no-untyped-def] # FIX ME
12
- self, role: str = None, content: Union[str, Dict, List] = None # type: ignore[assignment, type-arg] # FIX ME
13
- ):
14
- self.messages = [] # type: ignore[var-annotated] # FIX ME
15
- if content and role:
16
- self.update_messages(role, content) # type: ignore[attr-defined] # FIX ME
 
 
 
 
17
 
18
- def update_message(self, role: str, content: Union[str, Dict, List]): # type: ignore[no-untyped-def, type-arg] # FIX ME
 
 
 
19
  self.messages.append({"role": role, "content": content})
20
 
21
- def check_iter_round_num(self): # type: ignore[no-untyped-def] # FIX ME
 
22
  return len(self.messages)
 
1
+ from typing import Any, Dict, List, Optional, TypeVar, Union
2
 
3
+ from .messenger_base import BaseMessenger
4
 
5
+ T = TypeVar("T", bound="BaseMessenger")
6
 
7
+
8
+ # If the BaseMessenger has a register_messenger method that is not typed to accept a generic class,
9
+ # you might need to define it properly in BaseMessenger or ensure that the typing is correct there.
10
@BaseMessenger.register_messenger("gpt4v_messenger")
class GPT4VMessenger(BaseMessenger):
    """Messenger that accumulates GPT-4V chat turns as role/content dicts."""

    def __init__(
        self,
        role: Optional[str] = None,
        content: Optional[Union[str, Dict[str, Any], List[Any]]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.init_messenger(role, content)

    def init_messenger(
        self,
        role: Optional[str] = None,
        content: Optional[Union[str, Dict[str, Any], List[Any]]] = None,
    ) -> None:
        """Start with an empty history; seed it when both role and content are given."""
        self.messages: List[
            Dict[str, Union[str, Dict[str, Any], List[Any]]]
        ] = []
        if role is not None and content is not None:
            self.update_message(role, content)

    def update_message(
        self, role: str, content: Union[str, Dict[str, Any], List[Any]]
    ) -> None:
        """Record one chat turn."""
        entry = {"role": role, "content": content}
        self.messages.append(entry)

    def check_iter_round_num(self) -> int:
        """Number of turns recorded so far."""
        return len(self.messages)
ctm/messengers/messenger_roberta_text_sentiment.py CHANGED
@@ -1,23 +1,34 @@
1
- from typing import Dict, List, Union
2
 
3
- from ctm.messengers.messenger_base import BaseMessenger
4
 
 
5
 
6
- @BaseMessenger.register_messenger("roberta_text_sentiment_messenger") # type: ignore[no-untyped-call] # FIX ME
 
7
  class RobertaTextSentimentMessenger(BaseMessenger):
8
- def __init__(self, role=None, content=None, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
9
- self.init_messenger(role, content)
 
 
 
 
 
 
10
 
11
- def init_messenger( # type: ignore[no-untyped-def] # FIX ME
12
- self, role: str = None, content: Union[str, Dict, List] = None # type: ignore[assignment, type-arg] # FIX ME
13
- ):
14
- self.messages = ""
15
- if content and role:
16
- self.update_messages(role, content) # type: ignore[attr-defined] # FIX ME
 
 
17
 
18
- def update_message(self, role: str, content: Union[str, Dict, List]): # type: ignore[no-untyped-def, type-arg] # FIX ME
19
- # should replace with updated message
20
- self.messages = content # type: ignore[assignment] # FIX ME
 
21
 
22
- def check_iter_round_num(self): # type: ignore[no-untyped-def] # FIX ME
23
- return 1 if len(self.messages) > 0 else 0
 
1
+ from typing import Any, Dict, List, Optional, TypeVar, Union
2
 
3
+ from .messenger_base import BaseMessenger
4
 
5
+ T = TypeVar("T", bound="BaseMessenger")
6
 
7
+
8
@BaseMessenger.register_messenger("roberta_text_sentiment_messenger")
class RobertaTextSentimentMessenger(BaseMessenger):
    """Messenger that keeps a single running text buffer for sentiment scoring."""

    def __init__(
        self,
        role: Optional[str] = None,
        content: Optional[Union[str, Dict[str, Any], List[Any]]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # Delegate fully to the base class, which invokes init_messenger.
        super().__init__(role, content, *args, **kwargs)

    def init_messenger(
        self,
        role: Optional[str] = None,
        content: Optional[Union[str, Dict[str, Any], List[Any]]] = None,
    ) -> None:
        """Reset the buffer, seeding it when both role and content are truthy."""
        self.messages: str = ""
        if role and content:
            self.update_message(role, content)

    def update_message(
        self, role: str, content: Union[str, Dict[str, Any], List[Any]]
    ) -> None:
        # NOTE(review): `+=` assumes `content` is a str despite the wider
        # annotation — confirm callers never pass dict/list here.
        self.messages += content

    def check_iter_round_num(self) -> int:
        """Length of the accumulated text buffer."""
        return len(self.messages)
ctm/processors/processor_bart_text_summary.py CHANGED
@@ -1,55 +1,61 @@
 
1
  import os
 
2
 
3
- from huggingface_hub.inference_api import (
4
- InferenceApi, # type: ignore[import] # FIX ME
5
- )
6
 
7
  from ctm.messengers.messenger_base import BaseMessenger
8
  from ctm.processors.processor_base import BaseProcessor
9
 
10
 
11
- @BaseProcessor.register_processor("bart_text_summary_processor") # type: ignore[no-untyped-call] # FIX ME
12
  class BartTextSummaryProcessor(BaseProcessor):
13
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
14
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
 
 
15
 
16
- def init_processor(self): # type: ignore[no-untyped-def] # FIX ME
17
- self.model = InferenceApi(
18
- token=os.environ["HF_TOKEN"], repo_id="facebook/bart-large-cnn"
19
- )
20
- self.messenger = BaseMessenger("bart_text_summ_messenger") # type: ignore[no-untyped-call] # FIX ME
21
- return
22
 
23
- def update_info(self, feedback: str): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
 
 
24
  self.messenger.add_assistant_message(feedback)
25
 
26
- def ask_info( # type: ignore[override] # FIX ME
27
- self,
28
- query: str,
29
- context: str = None, # type: ignore[assignment] # FIX ME
30
- image_path: str = None, # type: ignore[assignment] # FIX ME
31
- audio_path: str = None, # type: ignore[assignment] # FIX ME
32
- video_path: str = None, # type: ignore[assignment] # FIX ME
33
- ) -> str:
34
- if self.messenger.check_iter_round_num() == 0: # type: ignore[no-untyped-call] # FIX ME
35
- self.messenger.add_user_message(context)
36
 
37
- response = self.model(self.messenger.get_messages()) # type: ignore[no-untyped-call] # FIX ME
38
- summary = response[0]["summary_text"]
39
- return summary # type: ignore[no-any-return] # FIX ME
 
 
 
 
40
 
41
 
42
  if __name__ == "__main__":
43
- processor = BaseProcessor("bart_text_summ_processor") # type: ignore[no-untyped-call] # FIX ME
44
  image_path = "../ctmai-test1.png"
45
- text: str = (
46
  "In a shocking turn of events, Hugging Face has released a new version of Transformers "
47
  "that brings several enhancements and bug fixes. Users are thrilled with the improvements "
48
  "and are finding the new version to be significantly better than the previous one. "
49
  "The Hugging Face team is thankful for the community's support and continues to work "
50
  "towards making the library the best it can be."
51
  )
52
- summary: str = processor.ask_info( # type: ignore[no-untyped-call] # FIX ME
53
- query=None, context=text, image_path=image_path
54
- )
55
  print(summary)
 
1
+ import json
2
  import os
3
+ from typing import Any, Dict, Optional
4
 
5
+ from huggingface_hub import InferenceClient
 
 
6
 
7
  from ctm.messengers.messenger_base import BaseMessenger
8
  from ctm.processors.processor_base import BaseProcessor
9
 
10
 
11
+ @BaseProcessor.register_processor("bart_text_summary_processor")
12
  class BartTextSummaryProcessor(BaseProcessor):
13
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
14
+ super().__init__(
15
+ *args, **kwargs
16
+ ) # Ensure base class is properly initialized
17
 
18
+ def init_executor(self) -> None:
19
+ hf_token = os.getenv("HF_TOKEN")
20
+ if not hf_token:
21
+ raise ValueError("HF_TOKEN environment variable is not set")
22
+ self.executor = InferenceClient(token=hf_token)
 
23
 
24
+ def init_messenger(self) -> None:
25
+ self.messenger = BaseMessenger("bart_text_summ_messenger")
26
+
27
+ def init_task_info(self) -> None:
28
+ pass
29
+
30
+ def update_info(self, feedback: str) -> None:
31
  self.messenger.add_assistant_message(feedback)
32
 
33
+ def ask_info(
34
+ self, text: Optional[str] = None, *args: Any, **kwargs: Any
35
+ ) -> str | Any:
36
+ if text is None:
37
+ raise ValueError("Context must not be None")
38
+ if self.messenger.check_iter_round_num() == 0:
39
+ self.messenger.add_user_message(text)
 
 
 
40
 
41
+ response: Dict[str, Any] = json.loads(
42
+ self.executor.post(
43
+ json={"inputs": self.messenger.get_messages()},
44
+ model="facebook/bart-large-cnn",
45
+ )
46
+ )[0]
47
+ return response["summary_text"]
48
 
49
 
50
  if __name__ == "__main__":
51
+ processor = BartTextSummaryProcessor()
52
  image_path = "../ctmai-test1.png"
53
+ text = (
54
  "In a shocking turn of events, Hugging Face has released a new version of Transformers "
55
  "that brings several enhancements and bug fixes. Users are thrilled with the improvements "
56
  "and are finding the new version to be significantly better than the previous one. "
57
  "The Hugging Face team is thankful for the community's support and continues to work "
58
  "towards making the library the best it can be."
59
  )
60
+ summary = processor.ask_info(context=text, image_path=image_path)
 
 
61
  print(summary)
ctm/processors/processor_base.py CHANGED
@@ -1,20 +1,28 @@
1
- import base64
2
 
3
- from ctm.utils.exponential_backoff import exponential_backoff
 
 
4
 
5
 
6
  class BaseProcessor(object):
7
- _processor_registry = {} # type: ignore[var-annotated] # FIX ME
8
 
9
  @classmethod
10
- def register_processor(cls, processor_name): # type: ignore[no-untyped-def] # FIX ME
11
- def decorator(subclass): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
12
  cls._processor_registry[processor_name] = subclass
13
  return subclass
14
 
15
  return decorator
16
 
17
- def __new__(cls, processor_name, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
 
 
18
  if processor_name not in cls._processor_registry:
19
  raise ValueError(
20
  f"No processor registered with name '{processor_name}'"
@@ -23,85 +31,110 @@ class BaseProcessor(object):
23
  cls._processor_registry[processor_name]
24
  )
25
 
26
- def set_model(self): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
 
 
27
  raise NotImplementedError(
28
- "The 'set_model' method must be implemented in derived classes."
29
  )
30
 
31
- @staticmethod
32
- def process_image(image_path): # type: ignore[no-untyped-def] # FIX ME
33
- with open(image_path, "rb") as image_file:
34
- return base64.b64encode(image_file.read()).decode("utf-8")
35
-
36
- @staticmethod
37
- def process_audio(audio_path): # type: ignore[no-untyped-def] # FIX ME
38
- return None
39
 
40
- @staticmethod
41
- def process_video(video_path): # type: ignore[no-untyped-def] # FIX ME
42
- return None
 
43
 
44
- def ask(self, query, context, image_path, audio_path, video_path): # type: ignore[no-untyped-def] # FIX ME
45
- gist = self.ask_info( # type: ignore[no-untyped-call] # FIX ME
46
- query, context, image_path, audio_path, video_path
 
 
 
 
 
 
 
 
 
47
  )
48
- score = self.ask_score(query, gist, verbose=True) # type: ignore[no-untyped-call] # FIX ME
49
  return gist, score
50
 
51
- @exponential_backoff(retries=5, base_wait_time=1) # type: ignore[misc, no-untyped-call] # FIX ME
52
  def ask_relevance(self, query: str, gist: str) -> float:
53
- response = self.model.chat.completions.create( # type: ignore[attr-defined] # FIX ME
54
  model="gpt-4-0125-preview",
55
  messages=[
56
  {
57
  "role": "user",
58
- "content": "How related is the information ({}) with the query ({})? Answer with a number from 0 to 5 and do not add any other thing.".format(
59
- gist, query
60
- ),
61
- },
62
  ],
63
  max_tokens=50,
64
  )
65
- score = int(response.choices[0].message.content.strip()) / 5
 
 
 
 
66
  return score
67
 
68
- @exponential_backoff(retries=5, base_wait_time=1) # type: ignore[misc, no-untyped-call] # FIX ME
69
  def ask_confidence(self, query: str, gist: str) -> float:
70
- response = self.model.chat.completions.create( # type: ignore[attr-defined] # FIX ME
71
  model="gpt-4-0125-preview",
72
  messages=[
73
  {
74
  "role": "user",
75
- "content": "How confidence do you think the information ({}) is a mustk? Answer with a number from 0 to 5 and do not add any other thing.".format( # type: ignore[str-format] # FIX ME
76
- gist, query
77
- ),
78
- },
79
  ],
80
  max_tokens=50,
81
  )
82
- score = int(response.choices[0].message.content.strip()) / 5
 
 
 
 
83
  return score
84
 
85
- @exponential_backoff(retries=5, base_wait_time=1) # type: ignore[misc, no-untyped-call] # FIX ME
86
  def ask_surprise(
87
- self, query: str, gist: str, history_gists: str = None # type: ignore[assignment] # FIX ME
88
  ) -> float:
89
- response = self.model.chat.completions.create( # type: ignore[attr-defined] # FIX ME
90
  model="gpt-4-0125-preview",
91
  messages=[
92
  {
93
  "role": "user",
94
- "content": "How surprise do you think the information ({}) is as an output of the processor? Answer with a number from 0 to 5 and do not add any other thing.".format( # type: ignore[str-format] # FIX ME
95
- gist, query
96
- ),
97
- },
98
  ],
99
  max_tokens=50,
100
  )
101
- score = int(response.choices[0].message.content.strip()) / 5
 
 
 
 
102
  return score
103
 
104
- def ask_score(self, query, gist, verbose=False, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
 
 
 
105
  relevance = self.ask_relevance(query, gist, *args, **kwargs)
106
  confidence = self.ask_confidence(query, gist, *args, **kwargs)
107
  surprise = self.ask_surprise(query, gist, *args, **kwargs)
@@ -109,9 +142,11 @@ class BaseProcessor(object):
109
  print(
110
  f"Relevance: {relevance}, Confidence: {confidence}, Surprise: {surprise}"
111
  )
112
- return relevance * confidence * surprise
113
 
114
- def ask_info(self, query, image_path, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
 
 
 
115
  raise NotImplementedError(
116
- "The 'ask_information' method must be implemented in derived classes."
117
  )
 
1
+ from typing import Any, Callable, Dict, Optional, Tuple, Type
2
 
3
+ from openai import OpenAI
4
+
5
+ from ..utils.decorator import score_exponential_backoff
6
 
7
 
8
  class BaseProcessor(object):
9
+ _processor_registry: Dict[str, Type["BaseProcessor"]] = {}
10
 
11
  @classmethod
12
+ def register_processor(
13
+ cls, processor_name: str
14
+ ) -> Callable[[Type["BaseProcessor"]], Type["BaseProcessor"]]:
15
+ def decorator(
16
+ subclass: Type["BaseProcessor"],
17
+ ) -> Type["BaseProcessor"]:
18
  cls._processor_registry[processor_name] = subclass
19
  return subclass
20
 
21
  return decorator
22
 
23
+ def __new__(
24
+ cls, processor_name: str, *args: Any, **kwargs: Any
25
+ ) -> "BaseProcessor":
26
  if processor_name not in cls._processor_registry:
27
  raise ValueError(
28
  f"No processor registered with name '{processor_name}'"
 
31
  cls._processor_registry[processor_name]
32
  )
33
 
34
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
35
+ self.init_scorer()
36
+ self.init_executor()
37
+ self.init_messenger()
38
+ self.init_task_info()
39
+
40
+ def init_executor(self) -> None:
41
  raise NotImplementedError(
42
+ "The 'init_executor' method must be implemented in derived classes."
43
  )
44
 
45
+ def init_messenger(self) -> None:
46
+ raise NotImplementedError(
47
+ "The 'init_messenger' method must be implemented in derived classes."
48
+ )
 
 
 
 
49
 
50
+ def init_task_info(self) -> None:
51
+ raise NotImplementedError(
52
+ "The 'init_task_info' method must be implemented in derived classes."
53
+ )
54
 
55
+ def init_scorer(self) -> None:
56
+ self.scorer = OpenAI()
57
+
58
+ def ask(
59
+ self, query: str, text: str, image: str, audio: str, video_frames: str
60
+ ) -> Tuple[str, float]:
61
+ gist = self.ask_info(
62
+ query=query,
63
+ text=text,
64
+ image=image,
65
+ audio=audio,
66
+ video_frames=video_frames,
67
  )
68
+ score = self.ask_score(query, gist, verbose=True)
69
  return gist, score
70
 
71
+ @score_exponential_backoff(retries=5, base_wait_time=1)
72
  def ask_relevance(self, query: str, gist: str) -> float:
73
+ response = self.scorer.chat.completions.create(
74
  model="gpt-4-0125-preview",
75
  messages=[
76
  {
77
  "role": "user",
78
+ "content": f"How related is the information ({gist}) with the query ({query})? Answer with a number from 0 to 5 and do not add any other thing.",
79
+ }
 
 
80
  ],
81
  max_tokens=50,
82
  )
83
+ score = (
84
+ float(response.choices[0].message.content.strip()) / 5
85
+ if response.choices[0].message.content
86
+ else 0.0
87
+ )
88
  return score
89
 
90
+ @score_exponential_backoff(retries=5, base_wait_time=1)
91
  def ask_confidence(self, query: str, gist: str) -> float:
92
+ response = self.scorer.chat.completions.create(
93
  model="gpt-4-0125-preview",
94
  messages=[
95
  {
96
  "role": "user",
97
+ "content": f"How confident do you think the information ({gist}) is a must-know? Answer with a number from 0 to 5 and do not add any other thing.",
98
+ }
 
 
99
  ],
100
  max_tokens=50,
101
  )
102
+ score = (
103
+ float(response.choices[0].message.content.strip()) / 5
104
+ if response.choices[0].message.content
105
+ else 0.0
106
+ )
107
  return score
108
 
109
+ @score_exponential_backoff(retries=5, base_wait_time=1)
110
  def ask_surprise(
111
+ self, query: str, gist: str, history_gists: Optional[str] = None
112
  ) -> float:
113
+ response = self.scorer.chat.completions.create(
114
  model="gpt-4-0125-preview",
115
  messages=[
116
  {
117
  "role": "user",
118
+ "content": f"How surprising do you think the information ({gist}) is as an output of the processor? Answer with a number from 0 to 5 and do not add any other thing.",
119
+ }
 
 
120
  ],
121
  max_tokens=50,
122
  )
123
+ score = (
124
+ float(response.choices[0].message.content.strip()) / 5
125
+ if response.choices[0].message.content
126
+ else 0.0
127
+ )
128
  return score
129
 
130
+ def ask_score(
131
+ self,
132
+ query: str,
133
+ gist: str,
134
+ verbose: bool = False,
135
+ *args: Any,
136
+ **kwargs: Any,
137
+ ) -> float:
138
  relevance = self.ask_relevance(query, gist, *args, **kwargs)
139
  confidence = self.ask_confidence(query, gist, *args, **kwargs)
140
  surprise = self.ask_surprise(query, gist, *args, **kwargs)
 
142
  print(
143
  f"Relevance: {relevance}, Confidence: {confidence}, Surprise: {surprise}"
144
  )
 
145
 
146
+ final_score = relevance * confidence * surprise
147
+ return final_score
148
+
149
+ def ask_info(self, *args: Any, **kwargs: Any) -> str:
150
  raise NotImplementedError(
151
+ "The 'ask_info' method must be implemented in derived classes."
152
  )
ctm/processors/processor_gpt4.py CHANGED
@@ -1,59 +1,67 @@
 
 
1
  from openai import OpenAI
2
 
3
  from ctm.messengers.messenger_base import BaseMessenger
4
  from ctm.processors.processor_base import BaseProcessor
5
- from ctm.utils.exponential_backoff import exponential_backoff
6
 
7
 
8
- @BaseProcessor.register_processor("gpt4_processor") # type: ignore[no-untyped-call] # FIX ME
 
9
  class GPT4Processor(BaseProcessor):
10
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
11
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
12
- self.task_instruction = None
 
 
 
 
 
 
 
13
 
14
- def init_processor(self): # type: ignore[no-untyped-def] # FIX ME
15
- self.model = OpenAI()
16
- self.messenger = BaseMessenger("gpt4_messenger") # type: ignore[no-untyped-call] # FIX ME
17
- return
18
 
19
- def process(self, payload: dict) -> dict: # type: ignore[type-arg] # FIX ME
20
- return # type: ignore[return-value] # FIX ME
 
21
 
22
- def update_info(self, feedback: str): # type: ignore[no-untyped-def] # FIX ME
23
  self.messenger.add_assistant_message(feedback)
24
 
25
- @exponential_backoff(retries=5, base_wait_time=1) # type: ignore[no-untyped-call] # FIX ME
26
- def gpt4_requst(self): # type: ignore[no-untyped-def] # FIX ME
27
- response = self.model.chat.completions.create(
28
  model="gpt-4-turbo-preview",
29
- messages=self.messenger.get_messages(), # type: ignore[no-untyped-call] # FIX ME
30
  max_tokens=300,
31
  )
32
- return response
33
-
34
- def ask_info( # type: ignore[override] # FIX ME
35
- self,
36
- query: str,
37
- context: str = None, # type: ignore[assignment] # FIX ME
38
- image_path: str = None, # type: ignore[assignment] # FIX ME
39
- audio_path: str = None, # type: ignore[assignment] # FIX ME
40
- video_path: str = None, # type: ignore[assignment] # FIX ME
41
  ) -> str:
42
- if self.messenger.check_iter_round_num() == 0: # type: ignore[no-untyped-call] # FIX ME
43
- self.messenger.add_user_message(
44
- "The text information for the previously described task is as follows: "
45
- + context
46
- + "Here is what you should do: "
47
- + self.task_instruction # type: ignore[operator] # FIX ME
48
  )
 
 
 
 
49
 
50
- response = self.gpt4_requst()
51
- description = response.choices[0].message.content
52
- return description # type: ignore[no-any-return] # FIX ME
53
 
54
 
55
  if __name__ == "__main__":
56
- processor = BaseProcessor("ocr_processor") # type: ignore[no-untyped-call] # FIX ME
57
- image_path = "../ctmai-test1.png"
58
- summary: str = processor.ask_info(query=None, image_path=image_path) # type: ignore[no-untyped-call] # FIX ME
 
 
59
  print(summary)
 
1
+ from typing import Any, Dict, Optional
2
+
3
  from openai import OpenAI
4
 
5
  from ctm.messengers.messenger_base import BaseMessenger
6
  from ctm.processors.processor_base import BaseProcessor
7
+ from ctm.utils.decorator import info_exponential_backoff
8
 
9
 
10
+ # Assuming the `register_processor` method has been updated to be properly typed:
11
+ @BaseProcessor.register_processor("gpt4_processor")
12
  class GPT4Processor(BaseProcessor):
13
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
14
+ super().__init__(*args, **kwargs)
15
+
16
+ def init_task_info(self) -> None:
17
+ raise NotImplementedError(
18
+ "The 'init_task_info' method must be implemented in derived classes."
19
+ )
20
+
21
+ def init_executor(self) -> None:
22
+ self.executor = OpenAI()
23
 
24
+ def init_messenger(self) -> None:
25
+ self.messenger = BaseMessenger("gpt4_messenger")
 
 
26
 
27
+ def process(self, payload: Dict[str, Any]) -> Dict[str, Any]:
28
+ # Assume process should do something and return a dictionary
29
+ return {}
30
 
31
+ def update_info(self, feedback: str) -> None:
32
  self.messenger.add_assistant_message(feedback)
33
 
34
+ @info_exponential_backoff(retries=5, base_wait_time=1)
35
+ def gpt4_request(self) -> Any:
36
+ response = self.executor.chat.completions.create(
37
  model="gpt-4-turbo-preview",
38
+ messages=self.messenger.get_messages(),
39
  max_tokens=300,
40
  )
41
+ description = response.choices[0].message.content
42
+ return description
43
+
44
+ def ask_info(
45
+ self, query: str, text: Optional[str] = None, *args: Any, **kwargs: Any
 
 
 
 
46
  ) -> str:
47
+ if self.messenger.check_iter_round_num() == 0:
48
+ initial_message = "The text information for the previously described task is as follows: "
49
+ initial_message += (
50
+ text if text is not None else "No text provided."
 
 
51
  )
52
+ initial_message += (
53
+ " Here is what you should do: " + self.task_instruction
54
+ )
55
+ self.messenger.add_user_message(initial_message)
56
 
57
+ description = self.gpt4_request()
58
+ return description
 
59
 
60
 
61
  if __name__ == "__main__":
62
+ processor = GPT4Processor()
63
+ text = "Hugging Face has released a new version of Transformers that brings several enhancements."
64
+ summary: str = processor.ask_info(
65
+ query="Summarize the changes.", text=text
66
+ )
67
  print(summary)
ctm/processors/processor_gpt4_speaker_intent.py CHANGED
@@ -1,15 +1,25 @@
 
 
1
  from ctm.processors.processor_gpt4 import GPT4Processor
2
 
3
 
4
- @GPT4Processor.register_processor("gpt4_speaker_intent_processor") # type: ignore[no-untyped-call] # FIX ME
 
5
  class GPT4SpeakerIntentProcessor(GPT4Processor):
6
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
7
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
 
 
 
 
8
  self.task_instruction = "You are a speaker intent predictor. You can understand the intent of the speaker and describe what is the speaker's intent for saying that. If there is no speaker detected, please answer with None."
9
 
10
 
11
  if __name__ == "__main__":
12
- processor = GPT4Processor("close_fashion_processor") # type: ignore[no-untyped-call] # FIX ME
13
- image_path = "../ctmai-test1.png"
14
- summary: str = processor.ask_info(query=None, image_path=image_path) # type: ignore[arg-type] # FIX ME
 
 
 
15
  print(summary)
 
1
+ from typing import Any
2
+
3
  from ctm.processors.processor_gpt4 import GPT4Processor
4
 
5
 
6
+ # Assuming GPT4Processor has a properly typed `register_processor` method
7
+ @GPT4Processor.register_processor("gpt4_speaker_intent_processor")
8
  class GPT4SpeakerIntentProcessor(GPT4Processor):
9
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
10
+ super().__init__(
11
+ *args, **kwargs
12
+ ) # Ensure the parent constructor is called properly
13
+
14
+ def init_task_info(self) -> None:
15
  self.task_instruction = "You are a speaker intent predictor. You can understand the intent of the speaker and describe what is the speaker's intent for saying that. If there is no speaker detected, please answer with None."
16
 
17
 
18
  if __name__ == "__main__":
19
+ # Instantiate the specific subclass for speaker intent processing
20
+ processor = GPT4SpeakerIntentProcessor()
21
+ text = "I can't wait to see the results of the new project. We've put so much effort into it!"
22
+ summary: str = processor.ask_info(
23
+ query="What is the intent behind the speaker's statement?", text=text
24
+ )
25
  print(summary)
ctm/processors/processor_gpt4_text_emotion.py CHANGED
@@ -1,15 +1,21 @@
 
 
1
  from ctm.processors.processor_gpt4 import GPT4Processor
2
 
3
 
4
- @GPT4Processor.register_processor("gpt4_text_emotion_processor") # type: ignore[no-untyped-call] # FIX ME
 
5
  class GPT4TextEmotionProcessor(GPT4Processor):
6
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
7
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
 
 
8
  self.task_instruction = "You are a text emotion classifier. You can understand the emotion within the text and generate the emotion label. If there is no text detected, please answer with None."
9
 
10
 
11
  if __name__ == "__main__":
12
- processor = GPT4Processor("close_fashion_processor") # type: ignore[no-untyped-call] # FIX ME
13
- image_path = "../ctmai-test1.png"
14
- summary: str = processor.ask_info(query=None, image_path=image_path) # type: ignore[arg-type] # FIX ME
 
15
  print(summary)
 
1
+ from typing import Any
2
+
3
  from ctm.processors.processor_gpt4 import GPT4Processor
4
 
5
 
6
+ # Assuming GPT4Processor has a properly typed `register_processor` method
7
+ @GPT4Processor.register_processor("gpt4_text_emotion_processor")
8
  class GPT4TextEmotionProcessor(GPT4Processor):
9
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
10
+ super().__init__(*args, **kwargs) # Call to parent class constructor
11
+
12
+ def init_task_info(self) -> None:
13
  self.task_instruction = "You are a text emotion classifier. You can understand the emotion within the text and generate the emotion label. If there is no text detected, please answer with None."
14
 
15
 
16
  if __name__ == "__main__":
17
+ # Instantiate the specific subclass for text emotion processing
18
+ processor = GPT4TextEmotionProcessor()
19
+ text = "I am feeling great today! The sun is shining and I've got a lot of work done."
20
+ summary: str = processor.ask_info(query="Identify the emotion.", text=text)
21
  print(summary)
ctm/processors/processor_gpt4_text_summary.py CHANGED
@@ -1,15 +1,23 @@
 
 
1
  from ctm.processors.processor_gpt4 import GPT4Processor
2
 
3
 
4
- @GPT4Processor.register_processor("gpt4_text_summary_processor") # type: ignore[no-untyped-call] # FIX ME
 
5
  class GPT4TextSummaryProcessor(GPT4Processor):
6
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
7
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
 
 
 
 
8
  self.task_instruction = "You are a text summarizer. You can understand the meaning of the text and generate the summary."
9
 
10
 
11
  if __name__ == "__main__":
12
- processor = GPT4Processor("close_fashion_processor") # type: ignore[no-untyped-call] # FIX ME
13
- image_path = "../ctmai-test1.png"
14
- summary: str = processor.ask_info(query=None, image_path=image_path) # type: ignore[arg-type] # FIX ME
 
15
  print(summary)
 
1
+ from typing import Any
2
+
3
  from ctm.processors.processor_gpt4 import GPT4Processor
4
 
5
 
6
+ # Assuming GPT4Processor has a properly typed `register_processor` method
7
+ @GPT4Processor.register_processor("gpt4_text_summary_processor")
8
  class GPT4TextSummaryProcessor(GPT4Processor):
9
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
10
+ super().__init__(
11
+ *args, **kwargs
12
+ ) # Properly initialize the parent class
13
+
14
+ def init_task_info(self) -> None:
15
  self.task_instruction = "You are a text summarizer. You can understand the meaning of the text and generate the summary."
16
 
17
 
18
  if __name__ == "__main__":
19
+ # Instantiate the specific subclass for the text summarization task
20
+ processor = GPT4TextSummaryProcessor()
21
+ text = "The quick brown fox jumps over the lazy dog. This sentence contains every letter of the alphabet."
22
+ summary: str = processor.ask_info(query="Summarize the text.", text=text)
23
  print(summary)
ctm/processors/processor_gpt4v.py CHANGED
@@ -1,64 +1,79 @@
 
 
1
  from openai import OpenAI
2
 
3
  from ctm.messengers.messenger_base import BaseMessenger
4
  from ctm.processors.processor_base import BaseProcessor
5
- from ctm.utils.exponential_backoff import exponential_backoff
6
 
7
 
8
- @BaseProcessor.register_processor("gpt4v_processor") # type: ignore[no-untyped-call] # FIX ME
 
9
  class GPT4VProcessor(BaseProcessor):
10
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
11
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
12
- self.task_instruction = None
 
 
 
 
 
13
 
14
- def init_processor(self): # type: ignore[no-untyped-def] # FIX ME
15
- self.model = OpenAI()
16
- self.messenger = BaseMessenger("gpt4v_messenger") # type: ignore[no-untyped-call] # FIX ME
17
- return
18
 
19
- def process(self, payload: dict) -> dict: # type: ignore[type-arg] # FIX ME
20
- return # type: ignore[return-value] # FIX ME
21
 
22
- def update_info(self, feedback: str): # type: ignore[no-untyped-def] # FIX ME
23
  self.messenger.add_assistant_message(feedback)
24
 
25
- @exponential_backoff(retries=5, base_wait_time=1) # type: ignore[no-untyped-call] # FIX ME
26
- def gpt4v_requst(self): # type: ignore[no-untyped-def] # FIX ME
27
- response = self.model.chat.completions.create(
28
  model="gpt-4-vision-preview",
29
- messages=self.messenger.get_messages(), # type: ignore[no-untyped-call] # FIX ME
30
  max_tokens=300,
31
  )
32
- return response
 
33
 
34
- def ask_info( # type: ignore[override] # FIX ME
35
  self,
36
  query: str,
37
- context: str = None, # type: ignore[assignment] # FIX ME
38
- image_path: str = None, # type: ignore[assignment] # FIX ME
39
- audio_path: str = None, # type: ignore[assignment] # FIX ME
40
- video_path: str = None, # type: ignore[assignment] # FIX ME
 
41
  ) -> str:
42
- if self.messenger.check_iter_round_num() == 0: # type: ignore[no-untyped-call] # FIX ME
43
- image = self.process_image(image_path) # type: ignore[no-untyped-call] # FIX ME
44
- # image = '0'
45
- self.messenger.add_user_message(
46
- [
47
- {"type": "text", "text": self.task_instruction},
 
 
 
 
48
  {
49
  "type": "image_url",
50
  "image_url": f"data:image/jpeg;base64,{image}",
51
- },
52
- ]
53
- )
54
 
55
- response = self.gpt4v_requst()
56
- description = response.choices[0].message.content
57
- return description # type: ignore[no-any-return] # FIX ME
58
 
59
 
60
  if __name__ == "__main__":
61
- processor = BaseProcessor("ocr_processor") # type: ignore[no-untyped-call] # FIX ME
62
  image_path = "../ctmai-test1.png"
63
- summary: str = processor.ask_info(query=None, image_path=image_path) # type: ignore[no-untyped-call] # FIX ME
 
 
64
  print(summary)
 
1
+ from typing import Any, Dict, List, Optional, Union
2
+
3
  from openai import OpenAI
4
 
5
  from ctm.messengers.messenger_base import BaseMessenger
6
  from ctm.processors.processor_base import BaseProcessor
7
+ from ctm.utils.decorator import info_exponential_backoff
8
 
9
 
10
+ # Ensure that BaseProcessor has a properly typed register_processor method:
11
+ @BaseProcessor.register_processor("gpt4v_processor")
12
  class GPT4VProcessor(BaseProcessor):
13
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
14
+ super().__init__(*args, **kwargs) # Properly initialize the base class
15
+
16
+ def init_executor(self) -> None:
17
+ self.executor = OpenAI()
18
+
19
+ def init_messenger(self) -> None:
20
+ self.messenger = BaseMessenger("gpt4v_messenger")
21
 
22
+ def init_task_info(self) -> None:
23
+ raise NotImplementedError(
24
+ "The 'init_task_info' method must be implemented in derived classes."
25
+ )
26
 
27
+ def process(self, payload: Dict[str, Any]) -> Dict[str, Any]:
28
+ return {} # Return an empty dict or a meaningful response as required
29
 
30
+ def update_info(self, feedback: str) -> None:
31
  self.messenger.add_assistant_message(feedback)
32
 
33
+ @info_exponential_backoff(retries=5, base_wait_time=1)
34
+ def gpt4v_request(self) -> str | Any:
35
+ response = self.executor.chat.completions.create(
36
  model="gpt-4-vision-preview",
37
+ messages=self.messenger.get_messages(),
38
  max_tokens=300,
39
  )
40
+ description = response.choices[0].message.content
41
+ return description
42
 
43
+ def ask_info(
44
  self,
45
  query: str,
46
+ text: Optional[str] = None,
47
+ image: Optional[str] = None,
48
+ video_frames: Optional[str] = None,
49
+ *args: Any,
50
+ **kwargs: Any,
51
  ) -> str:
52
+ if self.messenger.check_iter_round_num() == 0:
53
+ messages: List[Dict[str, Union[str, Dict[str, str]]]] = [
54
+ {
55
+ "type": "text",
56
+ "text": self.task_instruction
57
+ or "No instruction provided.",
58
+ },
59
+ ]
60
+ if image:
61
+ messages.append(
62
  {
63
  "type": "image_url",
64
  "image_url": f"data:image/jpeg;base64,{image}",
65
+ }
66
+ )
67
+ self.messenger.add_user_message(messages)
68
 
69
+ description = self.gpt4v_request()
70
+ return description
 
71
 
72
 
73
  if __name__ == "__main__":
74
+ processor = GPT4VProcessor()
75
  image_path = "../ctmai-test1.png"
76
+ summary: str = processor.ask_info(
77
+ query="Describe the image.", image=image_path
78
+ )
79
  print(summary)
ctm/processors/processor_gpt4v_cloth_fashion.py CHANGED
@@ -1,15 +1,24 @@
 
 
1
  from ctm.processors.processor_gpt4v import GPT4VProcessor
2
 
3
 
4
- @GPT4VProcessor.register_processor("gpt4v_cloth_fashion_processor") # type: ignore[no-untyped-call] # FIX ME
 
5
  class GPT4VClothFashionProcessor(GPT4VProcessor):
6
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
7
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
 
 
8
  self.task_instruction = "Focus on the cloth of people in the image, describe the style of the cloth fashion. If there is no people detected, please answer with None."
9
 
10
 
11
  if __name__ == "__main__":
12
- processor = GPT4VProcessor("close_fashion_processor") # type: ignore[no-untyped-call] # FIX ME
 
13
  image_path = "../ctmai-test1.png"
14
- summary: str = processor.ask_info(query=None, image_path=image_path) # type: ignore[arg-type] # FIX ME
 
 
 
15
  print(summary)
 
1
+ from typing import Any
2
+
3
  from ctm.processors.processor_gpt4v import GPT4VProcessor
4
 
5
 
6
+ # Assuming GPT4VProcessor has a properly typed `register_processor` method
7
+ @GPT4VProcessor.register_processor("gpt4v_cloth_fashion_processor")
8
  class GPT4VClothFashionProcessor(GPT4VProcessor):
9
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
10
+ super().__init__(*args, **kwargs) # Call to parent class constructor
11
+
12
+ def init_task_info(self) -> None:
13
  self.task_instruction = "Focus on the cloth of people in the image, describe the style of the cloth fashion. If there is no people detected, please answer with None."
14
 
15
 
16
  if __name__ == "__main__":
17
+ # Instantiate the specific subclass for the cloth fashion task
18
+ processor = GPT4VClothFashionProcessor()
19
  image_path = "../ctmai-test1.png"
20
+ # Providing a valid query and ensuring `ask_info` is correctly implemented in the base class
21
+ summary: str = processor.ask_info(
22
+ query="Describe the fashion style", image=image_path
23
+ )
24
  print(summary)
ctm/processors/processor_gpt4v_face_emotion.py CHANGED
@@ -1,15 +1,26 @@
 
 
1
  from ctm.processors.processor_gpt4v import GPT4VProcessor
2
 
3
 
4
- @GPT4VProcessor.register_processor("gpt4v_face_emotion_processor") # type: ignore[no-untyped-call] # FIX ME
 
5
  class GPT4VFaceEmotionProcessor(GPT4VProcessor):
6
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
7
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
 
 
 
 
8
  self.task_instruction = "Besides the main scene in the image, can you describe the face emotion that is on people's faces within this picture?"
9
 
10
 
11
  if __name__ == "__main__":
12
- processor = GPT4VProcessor("face_emotion_processor") # type: ignore[no-untyped-call] # FIX ME
 
13
  image_path = "../ctmai-test1.png"
14
- summary: str = processor.ask_info(query=None, image_path=image_path) # type: ignore[arg-type] # FIX ME
 
 
 
15
  print(summary)
 
1
+ from typing import Any
2
+
3
  from ctm.processors.processor_gpt4v import GPT4VProcessor
4
 
5
 
6
+ # Assume register_processor method has been properly typed
7
+ @GPT4VProcessor.register_processor("gpt4v_face_emotion_processor")
8
  class GPT4VFaceEmotionProcessor(GPT4VProcessor):
9
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
10
+ super().__init__(
11
+ *args, **kwargs
12
+ ) # Properly initialize the parent class
13
+
14
+ def init_task_info(self) -> None:
15
  self.task_instruction = "Besides the main scene in the image, can you describe the face emotion that is on people's faces within this picture?"
16
 
17
 
18
  if __name__ == "__main__":
19
+ # Instantiate the specific subclass for face emotion processing
20
+ processor = GPT4VFaceEmotionProcessor()
21
  image_path = "../ctmai-test1.png"
22
+ # Providing a valid query and ensuring that the method ask_info accepts the correct parameters
23
+ summary: str = processor.ask_info(
24
+ query="Describe face emotions", image=image_path
25
+ )
26
  print(summary)
ctm/processors/processor_gpt4v_ocr.py CHANGED
@@ -1,15 +1,24 @@
 
 
1
  from ctm.processors.processor_gpt4v import GPT4VProcessor
2
 
3
 
4
- @GPT4VProcessor.register_processor("gpt4v_ocr_processor") # type: ignore[no-untyped-call] # FIX ME
 
5
  class GPT4VOCRProcessor(GPT4VProcessor):
6
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
7
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
 
 
 
 
8
  self.task_instruction = "You should act like an OCR model. Please extract the text from the image. If there is no text detected, please answer with None."
9
 
10
 
11
  if __name__ == "__main__":
12
- processor = GPT4VProcessor("ocr_processor") # type: ignore[no-untyped-call] # FIX ME
 
13
  image_path = "../ctmai-test1.png"
14
- summary: str = processor.ask_info(query=None, image_path=image_path) # type: ignore[arg-type] # FIX ME
 
15
  print(summary)
 
1
+ from typing import Any
2
+
3
  from ctm.processors.processor_gpt4v import GPT4VProcessor
4
 
5
 
6
+ # Correct the registration method to include type checking if possible in the GPT4VProcessor class
7
+ @GPT4VProcessor.register_processor("gpt4v_ocr_processor")
8
  class GPT4VOCRProcessor(GPT4VProcessor):
9
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
10
+ super().__init__(
11
+ *args, **kwargs
12
+ ) # Ensure the parent constructor is called properly
13
+
14
+ def init_task_info(self) -> None:
15
  self.task_instruction = "You should act like an OCR model. Please extract the text from the image. If there is no text detected, please answer with None."
16
 
17
 
18
  if __name__ == "__main__":
19
+ # Ensure that we're instantiating the correct processor for the job
20
+ processor = GPT4VOCRProcessor()
21
  image_path = "../ctmai-test1.png"
22
+ # Provide a valid query string; ensure `ask_info` can handle all provided parameters
23
+ summary: str = processor.ask_info(query="Extract text", image=image_path)
24
  print(summary)
ctm/processors/processor_gpt4v_posture.py CHANGED
@@ -1,15 +1,25 @@
 
 
1
  from ctm.processors.processor_gpt4v import GPT4VProcessor
2
 
3
 
4
- @GPT4VProcessor.register_processor("gpt4v_posture_processor") # type: ignore[no-untyped-call] # FIX ME
 
5
  class GPT4VPostureProcessor(GPT4VProcessor):
6
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
7
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
 
 
 
 
8
  self.task_instruction = "Besides the main scene in the image, can you describe the posture that is going on within this picture?"
9
 
10
 
11
  if __name__ == "__main__":
12
- processor = GPT4VProcessor("posture_processor") # type: ignore[no-untyped-call] # FIX ME
 
13
  image_path = "../ctmai-test1.png"
14
- summary: str = processor.ask_info(query=None, image_path=image_path) # type: ignore[arg-type] # FIX ME
 
 
15
  print(summary)
 
1
+ from typing import Any
2
+
3
  from ctm.processors.processor_gpt4v import GPT4VProcessor
4
 
5
 
6
+ # Assuming the GPT4VProcessor has a properly typed `register_processor` method:
7
+ @GPT4VProcessor.register_processor("gpt4v_posture_processor")
8
  class GPT4VPostureProcessor(GPT4VProcessor):
9
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
10
+ super().__init__(
11
+ *args, **kwargs
12
+ ) # Properly call the parent's constructor
13
+
14
+ def init_task_info(self) -> None:
15
  self.task_instruction = "Besides the main scene in the image, can you describe the posture that is going on within this picture?"
16
 
17
 
18
  if __name__ == "__main__":
19
+ # Instantiate the specific subclass for the posture analysis task
20
+ processor = GPT4VPostureProcessor()
21
  image_path = "../ctmai-test1.png"
22
+ summary: str = processor.ask_info(
23
+ query="Analyze the posture.", image=image_path
24
+ )
25
  print(summary)
ctm/processors/processor_gpt4v_scene_location.py CHANGED
@@ -1,15 +1,24 @@
 
 
1
  from ctm.processors.processor_gpt4v import GPT4VProcessor
2
 
3
 
4
- @GPT4VProcessor.register_processor("gpt4v_scene_location_processor") # type: ignore[no-untyped-call] # FIX ME
 
5
  class GPT4VSceneLocationProcessor(GPT4VProcessor):
6
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
7
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
 
 
8
  self.task_instruction = "Besides the main activity in the image, can you describe the potential location or the event that is going on within this picture?"
9
 
10
 
11
  if __name__ == "__main__":
12
- processor = GPT4VProcessor("scene_location_processor") # type: ignore[no-untyped-call] # FIX ME
 
13
  image_path = "../ctmai-test1.png"
14
- summary: str = processor.ask_info(query=None, image_path=image_path) # type: ignore[arg-type] # FIX ME
 
 
 
15
  print(summary)
 
1
+ from typing import Any
2
+
3
  from ctm.processors.processor_gpt4v import GPT4VProcessor
4
 
5
 
6
+ # Assuming GPT4VProcessor has a properly typed `register_processor` method:
7
+ @GPT4VProcessor.register_processor("gpt4v_scene_location_processor")
8
  class GPT4VSceneLocationProcessor(GPT4VProcessor):
9
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
10
+ super().__init__(*args, **kwargs) # Initialize the parent processor
11
+
12
+ def init_task_info(self) -> None:
13
  self.task_instruction = "Besides the main activity in the image, can you describe the potential location or the event that is going on within this picture?"
14
 
15
 
16
  if __name__ == "__main__":
17
+ # Instantiate the specific subclass for the scene location task
18
+ processor = GPT4VSceneLocationProcessor()
19
  image_path = "../ctmai-test1.png"
20
+ # The `ask_info` method should also be corrected to include all necessary parameters properly typed.
21
+ summary: str = processor.ask_info(
22
+ query="Describe the scene and location.", image=image_path
23
+ )
24
  print(summary)
ctm/processors/processor_roberta_text_sentiment.py CHANGED
@@ -1,46 +1,46 @@
 
1
  import os
 
2
 
3
- from huggingface_hub.inference_api import (
4
- InferenceApi, # type: ignore[import] # FIX ME
5
- )
6
 
7
- from ctm.messengers.messenger_base import BaseMessenger
8
- from ctm.processors.processor_base import BaseProcessor
9
 
10
 
11
- @BaseProcessor.register_processor("roberta_text_sentiment_processor") # type: ignore[no-untyped-call] # FIX ME
12
  class RobertaTextSentimentProcessor(BaseProcessor):
13
- def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
14
- self.init_processor() # type: ignore[no-untyped-call] # FIX ME
15
 
16
- def init_processor(self): # type: ignore[no-untyped-def] # FIX ME
17
- self.model = InferenceApi(
18
- token=os.environ["HF_TOKEN"],
19
- repo_id="cardiffnlp/twitter-roberta-base-sentiment-latest",
20
- )
21
- self.messenger = BaseMessenger("roberta_text_sentiment_messenger") # type: ignore[no-untyped-call] # FIX ME
22
- return
23
 
24
- def update_info(self, feedback: str): # type: ignore[no-untyped-def] # FIX ME
 
 
 
25
  self.messenger.add_assistant_message(feedback)
26
 
27
- def ask_info( # type: ignore[override] # FIX ME
28
  self,
29
- query: str,
30
- context: str = None, # type: ignore[assignment] # FIX ME
31
- image_path: str = None, # type: ignore[assignment] # FIX ME
32
- audio_path: str = None, # type: ignore[assignment] # FIX ME
33
- video_path: str = None, # type: ignore[assignment] # FIX ME
34
  ) -> str:
35
- if self.messenger.check_iter_round_num() == 0: # type: ignore[no-untyped-call] # FIX ME
36
- self.messenger.add_user_message(context)
37
-
38
- response = self.model(self.messenger.get_messages()) # type: ignore[no-untyped-call] # FIX ME
39
- results = response[0]
40
- # choose the label with the highest score
41
- pos_score = 0
42
- neg_score = 0
43
- neutral_score = 0
 
 
 
44
  for result in results:
45
  if result["label"] == "POSITIVE":
46
  pos_score = result["score"]
@@ -48,23 +48,11 @@ class RobertaTextSentimentProcessor(BaseProcessor):
48
  neg_score = result["score"]
49
  else:
50
  neutral_score = result["score"]
51
- if max(pos_score, neg_score, neutral_score) == pos_score:
 
 
 
52
  return "This text is positive."
53
- elif max(pos_score, neg_score, neutral_score) == neg_score:
54
  return "This text is negative."
55
- else:
56
- return "This text is neutral."
57
-
58
-
59
- if __name__ == "__main__":
60
- processor = BaseProcessor("roberta_text_sentiment_processor") # type: ignore[no-untyped-call] # FIX ME
61
- image_path = "../ctmai-test1.png"
62
- text: str = (
63
- "In a shocking turn of events, Hugging Face has released a new version of Transformers "
64
- "that brings several enhancements and bug fixes. Users are thrilled with the improvements "
65
- "and are finding the new version to be significantly better than the previous one. "
66
- "The Hugging Face team is thankful for the community's support and continues to work "
67
- "towards making the library the best it can be."
68
- )
69
- label = processor.ask_info(query=None, context=text, image_path=image_path) # type: ignore[no-untyped-call] # FIX ME
70
- print(label)
 
1
+ import json
2
  import os
3
+ from typing import Any, Dict, Optional
4
 
5
+ from huggingface_hub import InferenceClient
 
 
6
 
7
+ from ..messengers.messenger_base import BaseMessenger
8
+ from .processor_base import BaseProcessor
9
 
10
 
11
+ @BaseProcessor.register_processor("roberta_text_sentiment_processor")
12
  class RobertaTextSentimentProcessor(BaseProcessor):
13
+ def init_executor(self) -> None:
14
+ self.executor = InferenceClient(token=os.environ["HF_TOKEN"])
15
 
16
+ def init_task_info(self) -> None:
17
+ pass
 
 
 
 
 
18
 
19
+ def init_messenger(self) -> None:
20
+ self.messenger = BaseMessenger("roberta_text_sentiment_messenger")
21
+
22
+ def update_info(self, feedback: str) -> None:
23
  self.messenger.add_assistant_message(feedback)
24
 
25
+ def ask_info(
26
  self,
27
+ query: Optional[str],
28
+ text: Optional[str] = None,
29
+ *args: Any,
30
+ **kwargs: Any,
 
31
  ) -> str:
32
+ if text and self.messenger.check_iter_round_num() == 0:
33
+ self.messenger.add_user_message(text)
34
+
35
+ results: Dict[str, Any] = json.loads(
36
+ self.executor.post(
37
+ json={"inputs": self.messenger.get_messages()},
38
+ model="cardiffnlp/twitter-roberta-base-sentiment-latest",
39
+ )
40
+ )[0]
41
+ pos_score = (
42
+ neg_score
43
+ ) = neutral_score = 0.0 # Initialize scores as floats
44
  for result in results:
45
  if result["label"] == "POSITIVE":
46
  pos_score = result["score"]
 
48
  neg_score = result["score"]
49
  else:
50
  neutral_score = result["score"]
51
+
52
+ # Simplified decision structure
53
+ max_score = max(pos_score, neg_score, neutral_score)
54
+ if max_score == pos_score:
55
  return "This text is positive."
56
+ elif max_score == neg_score:
57
  return "This text is negative."
58
+ return "This text is neutral."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ctm/supervisors/__init__.py CHANGED
@@ -1,5 +1,7 @@
1
- from .supervisor_gpt4 import GPT4Supervisior
 
2
 
3
  __all__ = [
4
- "GPT4Supervisior",
 
5
  ]
 
1
+ from ctm.supervisors.supervisor_base import BaseSupervisor
2
+ from ctm.supervisors.supervisor_gpt4 import GPT4Supervisor
3
 
4
  __all__ = [
5
+ "GPT4Supervisor",
6
+ "BaseSupervisor",
7
  ]
ctm/supervisors/supervisor_base.py CHANGED
@@ -1,18 +1,21 @@
1
  import base64
 
2
 
3
 
4
  class BaseSupervisor(object):
5
- _supervisor_registry = {}
6
 
7
  @classmethod
8
- def register_supervisor(cls, supervisor_name):
9
- def decorator(subclass):
 
 
10
  cls._supervisor_registry[supervisor_name] = subclass
11
  return subclass
12
 
13
  return decorator
14
 
15
- def __new__(cls, supervisor_name, *args, **kwargs):
16
  if supervisor_name not in cls._supervisor_registry:
17
  raise ValueError(
18
  f"No supervisor registered with name '{supervisor_name}'"
@@ -21,31 +24,24 @@ class BaseSupervisor(object):
21
  cls._supervisor_registry[supervisor_name]
22
  )
23
 
24
- def set_model(self):
 
 
25
  raise NotImplementedError(
26
  "The 'set_model' method must be implemented in derived classes."
27
  )
28
 
29
- @staticmethod
30
- def process_image(image_path):
31
- with open(image_path, "rb") as image_file:
32
- return base64.b64encode(image_file.read()).decode("utf-8")
33
-
34
- @staticmethod
35
- def process_audio(audio_path):
36
- return None
37
-
38
- @staticmethod
39
- def process_video(video_path):
40
- return None
41
-
42
- def ask(self, query, image_path):
43
  gist = self.ask_info(query, image_path)
44
  score = self.ask_score(query, gist, verbose=True)
45
  return gist, score
46
 
47
- def ask_info(self, query: str, context: str = None) -> str:
48
- return None
 
 
49
 
50
  def ask_score(self, query: str, gist: str, verbose: bool = False) -> float:
51
- return None
 
 
 
1
  import base64
2
+ from typing import Any, Dict, Optional, Tuple, Type
3
 
4
 
5
  class BaseSupervisor(object):
6
+ _supervisor_registry: Dict[str, Type["BaseSupervisor"]] = {}
7
 
8
  @classmethod
9
+ def register_supervisor(cls, supervisor_name: str) -> Any:
10
+ def decorator(
11
+ subclass: Type["BaseSupervisor"],
12
+ ) -> Type["BaseSupervisor"]:
13
  cls._supervisor_registry[supervisor_name] = subclass
14
  return subclass
15
 
16
  return decorator
17
 
18
+ def __new__(cls, supervisor_name: str, *args: Any, **kwargs: Any) -> Any:
19
  if supervisor_name not in cls._supervisor_registry:
20
  raise ValueError(
21
  f"No supervisor registered with name '{supervisor_name}'"
 
24
  cls._supervisor_registry[supervisor_name]
25
  )
26
 
27
+ def set_model(
28
+ self,
29
+ ) -> None:
30
  raise NotImplementedError(
31
  "The 'set_model' method must be implemented in derived classes."
32
  )
33
 
34
+ def ask(self, query: str, image_path: str) -> Tuple[str, float]:
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  gist = self.ask_info(query, image_path)
36
  score = self.ask_score(query, gist, verbose=True)
37
  return gist, score
38
 
39
+ def ask_info(self, query: str, context: Optional[str] = None) -> str:
40
+ raise NotImplementedError(
41
+ "The 'ask_info' method must be implemented in derived classes."
42
+ )
43
 
44
  def ask_score(self, query: str, gist: str, verbose: bool = False) -> float:
45
+ raise NotImplementedError(
46
+ "The 'ask_score' method must be implemented in derived classes."
47
+ )
ctm/supervisors/supervisor_gpt4.py CHANGED
@@ -1,32 +1,49 @@
 
 
1
  from openai import OpenAI
2
 
3
- from ctm.supervisors.supervisor_base import BaseSupervisor
4
- from ctm.utils.exponential_backoff import exponential_backoff
5
 
6
 
7
  @BaseSupervisor.register_supervisor("gpt4_supervisor")
8
- class GPT4Supervisior(BaseSupervisor):
9
- def __init__(self, *args, **kwargs):
10
  self.init_supervisor()
11
 
12
- def init_supervisor(self):
13
  self.model = OpenAI()
14
 
15
- @exponential_backoff(retries=5, base_wait_time=1)
16
- def ask_info(self, query: str, context: str = None) -> str:
17
- prompt = [
18
- {
19
- "role": "user",
20
- "content": f"The following is detailed information on the topic: {context}. Based on this information, answer the question: {query}. Answer with a few words:",
21
- }
22
- ]
23
  responses = self.model.chat.completions.create(
24
- model="gpt-4-turbo-preview", messages=prompt, max_tokens=300, n=1
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  )
26
- answer = responses.choices[0].message.content
27
  return answer
28
 
29
- def ask_score(self, query, gist, verbose=False, *args, **kwargs):
 
 
 
 
 
 
 
 
30
  max_attempts = 5
31
  for attempt in range(max_attempts):
32
  try:
@@ -35,14 +52,16 @@ class GPT4Supervisior(BaseSupervisor):
35
  messages=[
36
  {
37
  "role": "user",
38
- "content": "How related is the information ({}) with the query ({})? We want to make sure that the information includes a person's name as the answer. Answer with a number from 0 to 5 and do not add any other thing.".format(
39
- gist, query
40
- ),
41
  },
42
  ],
43
  max_tokens=50,
44
  )
45
- score = int(response.choices[0].message.content.strip()) / 5
 
 
 
 
46
  return score
47
  except Exception as e:
48
  print(f"Attempt {attempt + 1} failed: {e}")
@@ -50,11 +69,4 @@ class GPT4Supervisior(BaseSupervisor):
50
  print("Retrying...")
51
  else:
52
  print("Max attempts reached. Returning default score.")
53
- return 0
54
-
55
-
56
- if __name__ == "__main__":
57
- supervisor = BaseSupervisor("cloth_fashion_supervisor")
58
- image_path = "../ctmai-test1.png"
59
- summary: str = supervisor.ask_info(query=None, image_path=image_path)
60
- print(summary)
 
1
+ from typing import Any, Optional
2
+
3
  from openai import OpenAI
4
 
5
+ from ..utils import info_exponential_backoff, score_exponential_backoff
6
+ from .supervisor_base import BaseSupervisor
7
 
8
 
9
  @BaseSupervisor.register_supervisor("gpt4_supervisor")
10
+ class GPT4Supervisor(BaseSupervisor):
11
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
12
  self.init_supervisor()
13
 
14
+ def init_supervisor(self) -> None:
15
  self.model = OpenAI()
16
 
17
+ @info_exponential_backoff(retries=5, base_wait_time=1)
18
+ def ask_info(self, query: str, context: Optional[str] = None) -> str | Any:
19
+
 
 
 
 
 
20
  responses = self.model.chat.completions.create(
21
+ model="gpt-4-turbo-preview",
22
+ messages=[
23
+ {
24
+ "role": "user",
25
+ "content": f"The following is detailed information on the topic: {context}. Based on this information, answer the question: {query}. Answer with a few words:",
26
+ }
27
+ ],
28
+ max_tokens=300,
29
+ n=1,
30
+ )
31
+ answer = (
32
+ responses.choices[0].message.content
33
+ if responses.choices[0].message.content
34
+ else "FAILED"
35
  )
 
36
  return answer
37
 
38
+ @score_exponential_backoff(retries=5, base_wait_time=1)
39
+ def ask_score(
40
+ self,
41
+ query: str,
42
+ gist: str,
43
+ verbose: bool = False,
44
+ *args: Any,
45
+ **kwargs: Any,
46
+ ) -> float:
47
  max_attempts = 5
48
  for attempt in range(max_attempts):
49
  try:
 
52
  messages=[
53
  {
54
  "role": "user",
55
+ "content": f"How related is the information ({gist}) with the query ({query})? We want to make sure that the information includes a person's name as the answer. Answer with a number from 0 to 5 and do not add any other thing.",
 
 
56
  },
57
  ],
58
  max_tokens=50,
59
  )
60
+ score = (
61
+ float(response.choices[0].message.content.strip()) / 5
62
+ if response.choices[0].message.content
63
+ else 0.0
64
+ )
65
  return score
66
  except Exception as e:
67
  print(f"Attempt {attempt + 1} failed: {e}")
 
69
  print("Retrying...")
70
  else:
71
  print("Max attempts reached. Returning default score.")
72
+ return 0.0
 
 
 
 
 
 
 
ctm/utils/__init__.py CHANGED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from .decorator import (
2
+ info_exponential_backoff,
3
+ score_exponential_backoff,
4
+ )
5
+ from .loader import load_audio, load_image, load_video
6
+
7
+ __all__ = [
8
+ "score_exponential_backoff",
9
+ "info_exponential_backoff",
10
+ "load_audio",
11
+ "load_image",
12
+ "load_video",
13
+ ]
ctm/utils/{exponential_backoff.py → decorator.py} RENAMED
@@ -1,18 +1,55 @@
1
  import math
2
  import time
3
  from functools import wraps
 
4
 
 
5
 
6
- def exponential_backoff(retries=5, base_wait_time=1): # type: ignore[no-untyped-def] # FIX ME
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7
  """
8
  Decorator for applying exponential backoff to a function.
9
  :param retries: Maximum number of retries.
10
  :param base_wait_time: Base wait time in seconds for the exponential backoff.
11
  """
12
 
13
- def decorator(func): # type: ignore[no-untyped-def] # FIX ME
14
  @wraps(func)
15
- def wrapper(*args, **kwargs): # type: ignore[no-untyped-def] # FIX ME
16
  attempts = 0
17
  while attempts < retries:
18
  try:
@@ -26,7 +63,7 @@ def exponential_backoff(retries=5, base_wait_time=1): # type: ignore[no-untyped
26
  print(
27
  f"Failed to execute '{func.__name__}' after {retries} retries."
28
  )
29
- return None
30
 
31
  return wrapper
32
 
 
1
  import math
2
  import time
3
  from functools import wraps
4
+ from typing import Any, Callable, Optional
5
 
6
+ INF = float(math.inf)
7
 
8
+
9
+ def info_exponential_backoff(
10
+ retries: int = 5, base_wait_time: int = 1
11
+ ) -> Callable[[Callable[..., str]], Callable[..., str]]:
12
+ """
13
+ Decorator for applying exponential backoff to a function.
14
+ :param retries: Maximum number of retries.
15
+ :param base_wait_time: Base wait time in seconds for the exponential backoff.
16
+ """
17
+
18
+ def decorator(func: Callable[..., str]) -> Callable[..., str]:
19
+ @wraps(func)
20
+ def wrapper(*args: Any, **kwargs: Any) -> str:
21
+ attempts = 0
22
+ while attempts < retries:
23
+ try:
24
+ return func(*args, **kwargs)
25
+ except Exception as e:
26
+ wait_time = base_wait_time * (2**attempts)
27
+ print(f"Attempt {attempts + 1} failed: {e}")
28
+ print(f"Waiting {wait_time} seconds before retrying...")
29
+ time.sleep(wait_time)
30
+ attempts += 1
31
+ print(
32
+ f"Failed to execute '{func.__name__}' after {retries} retries."
33
+ )
34
+ return "FAILED"
35
+
36
+ return wrapper
37
+
38
+ return decorator
39
+
40
+
41
+ def score_exponential_backoff(
42
+ retries: int = 5, base_wait_time: int = 1
43
+ ) -> Callable[[Callable[..., float]], Callable[..., float]]:
44
  """
45
  Decorator for applying exponential backoff to a function.
46
  :param retries: Maximum number of retries.
47
  :param base_wait_time: Base wait time in seconds for the exponential backoff.
48
  """
49
 
50
+ def decorator(func: Callable[..., float]) -> Callable[..., float]:
51
  @wraps(func)
52
+ def wrapper(*args: Any, **kwargs: Any) -> float:
53
  attempts = 0
54
  while attempts < retries:
55
  try:
 
63
  print(
64
  f"Failed to execute '{func.__name__}' after {retries} retries."
65
  )
66
+ return -INF
67
 
68
  return wrapper
69
 
ctm/utils/loader.py ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import base64
2
+ from typing import Any, List, Tuple
3
+
4
+ import numpy as np
5
+ from numpy.typing import NDArray
6
+
7
+
8
+ def load_audio(audio_path: str) -> Tuple[NDArray[np.float32], int]:
9
+ import librosa
10
+
11
+ audio, sr = librosa.load(audio_path, sr=None)
12
+ import pdb
13
+
14
+ pdb.set_trace() # Debugging breakpoint
15
+ return (audio.astype(np.float32), int(sr))
16
+
17
+
18
+ def load_image(image_path: str) -> str:
19
+ with open(image_path, "rb") as image_file:
20
+ encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
21
+ return encoded_image
22
+
23
+
24
+ def load_video(video_path: str, frame_num: int = 5) -> List[NDArray[np.uint8]]:
25
+ import cv2
26
+
27
+ cap = cv2.VideoCapture(video_path)
28
+ frames: List[np.ndarray[np.uint8, Any]] = []
29
+ try:
30
+ while True:
31
+ ret, frame = cap.read()
32
+ if not ret:
33
+ break
34
+ frames.append(frame.astype(np.uint8))
35
+ finally:
36
+ cap.release()
37
+
38
+ if len(frames) >= frame_num:
39
+ step = len(frames) // frame_num
40
+ frames = [frames[i] for i in range(0, len(frames), step)]
41
+ return frames