Jon Taylor commited on
Commit
4d3af55
·
1 Parent(s): 0b1146e

fixed bot thread loop

Browse files
Files changed (4) hide show
  1. app/auth.py +3 -0
  2. app/bot.py +125 -119
  3. requirements.txt +2 -1
  4. server.py +11 -13
app/auth.py CHANGED
@@ -8,6 +8,7 @@ import os
8
 
9
  load_dotenv()
10
 
 
11
  def get_meeting_token(room_name, daily_api_key, token_expiry):
12
  api_path = os.getenv('DAILY_API_PATH') or 'https://api.daily.co/v1'
13
 
@@ -18,6 +19,8 @@ def get_meeting_token(room_name, daily_api_key, token_expiry):
18
  json={'properties': {'room_name': room_name, 'is_owner': True, 'exp': token_expiry}})
19
  if res.status_code != 200:
20
  return json.dumps({'error': 'Unable to create meeting token', 'detail': res.text}), 500
 
 
21
  meeting_token = res.json()['token']
22
  return meeting_token
23
 
 
8
 
9
  load_dotenv()
10
 
11
+
12
  def get_meeting_token(room_name, daily_api_key, token_expiry):
13
  api_path = os.getenv('DAILY_API_PATH') or 'https://api.daily.co/v1'
14
 
 
19
  json={'properties': {'room_name': room_name, 'is_owner': True, 'exp': token_expiry}})
20
  if res.status_code != 200:
21
  return json.dumps({'error': 'Unable to create meeting token', 'detail': res.text}), 500
22
+
23
+ #@TODO handle errors here
24
  meeting_token = res.json()['token']
25
  return meeting_token
26
 
app/bot.py CHANGED
@@ -1,8 +1,11 @@
1
  import argparse
 
 
 
2
  import logging
3
  import os
4
- import time
5
- from threading import Thread
6
  from typing import Any, Mapping
7
 
8
  from daily import EventHandler, CallClient, Daily
@@ -13,131 +16,112 @@ from auth import get_meeting_token, get_room_name
13
 
14
  load_dotenv()
15
 
16
- class DailyLLM(EventHandler):
17
  def __init__(
18
  self,
19
- room_url=os.getenv("DAILY_URL"),
20
- token=os.getenv("DAILY_TOKEN"),
21
- bot_name="TestBot",
 
22
  ):
23
- duration = os.getenv("BOT_MAX_DURATION")
24
- if not duration:
25
- duration = 300
26
- else:
27
- duration = int(duration)
28
- self.expiration = time.time() + duration
29
-
30
- # room + bot details
31
- self.room_url = room_url
32
- room_name = get_room_name(room_url)
33
- if token:
34
- self.token = token
35
- else:
36
- self.token = get_meeting_token(
37
- room_name, os.getenv("DAILY_API_KEY"), self.expiration
38
- )
39
- self.bot_name = bot_name
40
-
41
- self.finished_talking_at = None
42
-
43
- FORMAT = f"%(asctime)s {room_name} %(message)s"
44
  logging.basicConfig(format=FORMAT)
45
  self.logger = logging.getLogger("bot-instance")
46
  self.logger.setLevel(logging.DEBUG)
47
 
48
- self.logger.info(f"Joining as {self.bot_name}")
49
- self.logger.info(f"Joining room {self.room_url}")
50
- self.logger.info(
51
- f"expiration: {datetime.utcfromtimestamp(self.expiration).strftime('%Y-%m-%d %H:%M:%S')}"
52
- )
53
- self.logger.info(
54
- f"now: {datetime.utcfromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')}"
55
- )
56
-
57
- self.my_participant_id = None
58
-
59
- self.logger.info("configuring daily")
60
- self.configure_daily()
61
-
62
- self.stop_threads = False
63
- self.image = None
64
 
65
- #self.logger.info("starting camera thread")
66
- #self.camera_thread = Thread(target=self.run_camera)
67
- #self.camera_thread.start()
68
 
69
- self.participant_left = False
70
- self.last_fragment_at = None
 
 
 
71
 
72
- try:
73
- participant_count = len(self.client.participants())
74
- self.logger.info(f"{participant_count} participants in room")
75
- while time.time() < self.expiration and not self.participant_left:
76
- time.sleep(1)
77
- except Exception as e:
78
- self.logger.error(f"Exception {e}")
79
- finally:
80
- self.client.leave()
81
 
82
- self.stop_threads = True
83
- self.logger.info("Shutting down")
84
- #self.camera_thread.join()
85
- #self.logger.info("camera thread stopped")
86
- #self.logger.info("Services closed.")
 
 
 
87
 
88
- def configure_daily(self):
89
- Daily.init()
90
- self.client = CallClient(event_handler=self)
91
-
92
- self.mic = Daily.create_microphone_device("mic", sample_rate=16000, channels=1)
93
- self.speaker = Daily.create_speaker_device(
94
- "speaker", sample_rate=16000, channels=1
95
- )
96
- self.camera = Daily.create_camera_device(
97
- "camera", width=1024, height=1024, color_format="RGB"
98
- )
99
 
100
- Daily.select_speaker_device("speaker")
 
 
101
 
102
- self.client.set_user_name(self.bot_name)
103
- self.client.join(self.room_url, completion=self.call_joined)
104
- #self.client.join(self.room_url, self.token, completion=self.call_joined)
105
 
106
- self.client.update_inputs(
107
- {
 
 
 
 
 
108
  "camera": {
109
  "isEnabled": True,
110
  "settings": {
111
- "deviceId": "camera",
112
- "frameRate": 5,
113
- },
114
- },
115
- "microphone": {"isEnabled": True, "settings": {"deviceId": "mic"}},
116
- }
117
- )
118
-
119
- self.my_participant_id = self.client.participants()["local"]["id"]
120
-
121
- def call_joined(self, join_data, client_error):
122
- self.logger.info(f"call_joined: {join_data}, {client_error}")
123
-
124
- def on_participant_joined(self, participant):
125
- self.logger.info(f"on_participant_joined: {participant}")
126
- #self.client.send_app_message({"event": "story-id", "storyID": self.story_id})
127
- self.wave()
128
- time.sleep(2)
129
-
130
- def on_participant_left(self, participant, reason):
131
- if len(self.client.participants()) < 2:
132
- self.logger.info("participant left")
133
- self.participant_left = True
134
-
135
- def wave(self):
136
- self.client.send_app_message(
 
 
 
 
 
 
 
 
137
  {
138
  "event": "sync-emoji-reaction",
139
  "reaction": {
140
- "emoji": "👋",
141
  "room": "main-room",
142
  "sessionId": "bot",
143
  "id": time.time(),
@@ -145,17 +129,39 @@ class DailyLLM(EventHandler):
145
  }
146
  )
147
 
148
-
149
- if __name__ == "__main__":
150
- parser = argparse.ArgumentParser(description="Daily LLM bot")
151
- parser.add_argument("-u", "--url", type=str, help="URL of the Daily room")
152
- parser.add_argument("-t", "--token", type=str, help="Token for Daily API")
153
- parser.add_argument("-b", "--bot-name", type=str, help="Name of the bot")
154
-
 
 
155
  args = parser.parse_args()
156
-
157
- url = args.url or os.getenv("DAILY_URL")
158
- bot_name = args.bot_name or "TestBot"
159
- token = args.token or None
160
-
161
- app = DailyLLM(url, token, bot_name)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import argparse
2
+ import queue
3
+ import time
4
+ import threading
5
  import logging
6
  import os
7
+
8
+ from PIL import Image
9
  from typing import Any, Mapping
10
 
11
  from daily import EventHandler, CallClient, Daily
 
16
 
17
  load_dotenv()
18
 
19
+ class DailyVision(EventHandler):
20
  def __init__(
21
  self,
22
+ room_url,
23
+ room_name,
24
+ expiration,
25
+ bot_name="Daily Bot",
26
  ):
27
+ self.__client = CallClient(event_handler=self)
28
+ self.__pipeline = None
29
+ self.__camera = None
30
+ self.__time = time.time()
31
+ self.__queue = queue.Queue()
32
+ self.__app_quit = False
33
+ self.__bot_name = bot_name
34
+ self.__room_url = room_url
35
+ self.__room_name = room_name
36
+ self.__expiration = expiration
37
+
38
+ # Configure logger
39
+ FORMAT = f"%(asctime)s {self.__room_url} %(message)s"
 
 
 
 
 
 
 
 
40
  logging.basicConfig(format=FORMAT)
41
  self.logger = logging.getLogger("bot-instance")
42
  self.logger.setLevel(logging.DEBUG)
43
 
44
+ self.logger.info(f"Expiration timer set to: {self.__expiration}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
 
46
+ # Start thread
47
+ self.__thread = threading.Thread(target = self.process_frames)
48
+ self.__thread.start()
49
 
50
+ def run(self, meeting_url, token):
51
+ # Join
52
+ self.logger.info(f"Connecting to room {meeting_url} as {self.__bot_name}")
53
+ self.__client.set_user_name(self.__bot_name)
54
+ self.__client.join(meeting_url, token, completion=self.on_joined)
55
 
56
+ #self.__participant_id = self.client.participants()["local"]["id"]
 
 
 
 
 
 
 
 
57
 
58
+ # Keep-alive on thread
59
+ self.__thread.join()
60
+
61
+ def leave(self):
62
+ self.logger.info(f"Leaving...")
63
+ self.__app_quit = True
64
+ self.__thread.join()
65
+ self.__client.leave()
66
 
67
+ def on_joined(self, join_data, client_error):
68
+ self.logger.info(f"call_joined: {join_data}, {client_error}")
 
 
 
 
 
 
 
 
 
69
 
70
+ def on_participant_joined(self, participant):
71
+ self.logger.info(f"Participant {participant['id']} joined, analyzing frames...")
72
+ self.__client.set_video_renderer(participant["id"], self.on_video_frame)
73
 
74
+ # Say hello
75
+ self.wave()
 
76
 
77
+ def setup_camera(self, video_frame):
78
+ if not self.__camera:
79
+ self.__camera = Daily.create_camera_device("camera",
80
+ width = video_frame.width,
81
+ height = video_frame.height,
82
+ color_format="RGB")
83
+ self.__client.update_inputs({
84
  "camera": {
85
  "isEnabled": True,
86
  "settings": {
87
+ "deviceId": "camera"
88
+ }
89
+ }
90
+ })
91
+
92
+ def process_frames(self):
93
+ while not self.__app_quit:
94
+ # Check expiry timer
95
+ if time.time() > self.__expiration:
96
+ self.logger.info(f"Expiration timer exceeded. Exiting...")
97
+ self.__app_quit = True
98
+ return
99
+ try:
100
+ video_frame = self.__queue.get(timeout=5)
101
+
102
+ if video_frame:
103
+ image = Image.frombytes("RGBA", (video_frame.width, video_frame.height), video_frame.buffer)
104
+ result = self.__pipeline(image)
105
+
106
+ pil = Image.fromarray(result.render()[0], mode="RGB").tobytes()
107
+
108
+ self.__camera.write_frame(pil)
109
+ except queue.Empty:
110
+ pass
111
+
112
+ def on_video_frame(self, participant_id, video_frame):
113
+ # Process ~15 frames per second (considering incoming frames at 30fps).
114
+ if time.time() - self.__time > 0.05:
115
+ self.__time = time.time()
116
+ self.setup_camera(video_frame)
117
+ self.__queue.put(video_frame)
118
+
119
+ def wave(self, emoji="👋"):
120
+ self.__client.send_app_message(
121
  {
122
  "event": "sync-emoji-reaction",
123
  "reaction": {
124
+ "emoji": emoji,
125
  "room": "main-room",
126
  "sessionId": "bot",
127
  "id": time.time(),
 
129
  }
130
  )
131
 
132
+ def main():
133
+ parser = argparse.ArgumentParser(description="Daily Bot")
134
+ # Required args
135
+ parser.add_argument("-u", "--url", required=True, type=str, help="URL of the Daily room")
136
+ parser.add_argument("-k", "--api_key", required=True, type=str, help="Daily API key")
137
+ # Optional args
138
+ parser.add_argument("-t", "--private", type=bool, help="Is this room private?", default=True)
139
+ parser.add_argument("-n", "--bot-name", type=str, help="Name of the bot", default="Daily Bot")
140
+ parser.add_argument("-e", "--expiration", type=int, help="Duration of bot", default=os.getenv("BOT_MAX_DURATION", 300))
141
  args = parser.parse_args()
142
+
143
+ Daily.init()
144
+
145
+ expiration = time.time() + args.expiration
146
+ room_name = get_room_name(args.url)
147
+
148
+ # Retrieve a meeting token, if not provided
149
+ #@TODO do room lookup to check privacy
150
+ if args.private:
151
+ token = get_meeting_token(room_name, args.api_key, expiration)
152
+
153
+ app = DailyVision(args.url, room_name, expiration, args.bot_name)
154
+
155
+ try :
156
+ app.run(args.url, token)
157
+ except KeyboardInterrupt:
158
+ print("Ctrl-C detected. Exiting!")
159
+ finally:
160
+ print("Bot loop completed. Exiting")
161
+ app.leave()
162
+
163
+ # Let leave finish
164
+ time.sleep(2)
165
+
166
+ if __name__ == '__main__':
167
+ main()
requirements.txt CHANGED
@@ -5,4 +5,5 @@ imageio
5
  requests
6
  fastapi
7
  uvicorn[standard]
8
- requests
 
 
5
  requests
6
  fastapi
7
  uvicorn[standard]
8
+ requests
9
+ pillow
server.py CHANGED
@@ -9,7 +9,7 @@ import requests
9
  import subprocess
10
  import time
11
 
12
- from app.auth import get_meeting_token
13
 
14
  load_dotenv()
15
 
@@ -27,13 +27,9 @@ app.add_middleware(
27
  app.mount("/static", StaticFiles(directory="frontend/out", html=True), name="static")
28
 
29
  def _start_bot(bot_path, args=None):
30
- daily_api_key = os.getenv("DAILY_API_KEY") or ""
31
  api_path = os.getenv("DAILY_API_PATH") or "https://api.daily.co/v1"
32
 
33
- timeout = int(os.getenv("ROOM_TIMEOUT") or os.getenv("BOT_MAX_DURATION") or 300)
34
- exp = time.time() + timeout
35
-
36
- '''
37
  res = requests.post(
38
  f"{api_path}/rooms",
39
  headers={"Authorization": f"Bearer {daily_api_key}"},
@@ -59,10 +55,11 @@ def _start_bot(bot_path, args=None):
59
  500,
60
  )
61
  '''
62
- room_url = os.getenv("DAILY_ROOM_URL") #res.json()["url"]
63
- room_name = os.getenv("DAILY_ROOM_NAME") #res.json()["name"]
64
 
65
- meeting_token = get_meeting_token(room_url, daily_api_key, exp)
 
 
 
66
 
67
  if args:
68
  extra_args = " ".join([f'-{x[0]} "{x[1]}"' for x in args])
@@ -71,7 +68,7 @@ def _start_bot(bot_path, args=None):
71
 
72
  proc = subprocess.Popen(
73
  [
74
- f"python3 {bot_path} -u {room_url} -t {meeting_token} {extra_args}"
75
  ],
76
  shell=True,
77
  bufsize=1,
@@ -83,14 +80,15 @@ def _start_bot(bot_path, args=None):
83
  time.sleep(0.1)
84
  attempts += 1
85
  res = requests.get(
86
- f"{api_path}/rooms/{room_name}/get-session-data",
87
  headers={"Authorization": f"Bearer {daily_api_key}"},
88
  )
89
  if res.status_code == 200:
90
  break
91
- print(f"Took {attempts} attempts to join room {room_name}")
 
 
92
 
93
- return JSONResponse({"room_url": room_url, "token": meeting_token})
94
 
95
  @app.post("/start")
96
  async def start():
 
9
  import subprocess
10
  import time
11
 
12
+ from app.auth import get_room_name
13
 
14
  load_dotenv()
15
 
 
27
  app.mount("/static", StaticFiles(directory="frontend/out", html=True), name="static")
28
 
29
  def _start_bot(bot_path, args=None):
30
+ '''
31
  api_path = os.getenv("DAILY_API_PATH") or "https://api.daily.co/v1"
32
 
 
 
 
 
33
  res = requests.post(
34
  f"{api_path}/rooms",
35
  headers={"Authorization": f"Bearer {daily_api_key}"},
 
55
  500,
56
  )
57
  '''
 
 
58
 
59
+ daily_room_url = os.getenv("DAILY_ROOM_URL") #res.json()["url"]
60
+ daily_room_name = get_room_name(daily_room_url)
61
+ daily_api_path = os.get("DAILY_API_PATH")
62
+ daily_api_key = os.getenv("DAILY_API_KEY")
63
 
64
  if args:
65
  extra_args = " ".join([f'-{x[0]} "{x[1]}"' for x in args])
 
68
 
69
  proc = subprocess.Popen(
70
  [
71
+ f"python3 {bot_path} -u {daily_room_url} -k {daily_api_key} {extra_args}"
72
  ],
73
  shell=True,
74
  bufsize=1,
 
80
  time.sleep(0.1)
81
  attempts += 1
82
  res = requests.get(
83
+ f"{daily_api_path}/rooms/{daily_room_name}/get-session-data",
84
  headers={"Authorization": f"Bearer {daily_api_key}"},
85
  )
86
  if res.status_code == 200:
87
  break
88
+ print(f"Took {attempts} attempts to join room {daily_room_name}")
89
+
90
+ return JSONResponse({"room_url": daily_room_url})
91
 
 
92
 
93
  @app.post("/start")
94
  async def start():