benjolo commited on
Commit
cec7050
1 Parent(s): b909d22

Update backend/main.py

Browse files
Files changed (1) hide show
  1. backend/main.py +91 -91
backend/main.py CHANGED
@@ -7,33 +7,12 @@ from urllib import parse
7
  from uuid import uuid4
8
  import logging
9
  from fastapi.logger import logger as fastapi_logger
10
-
11
- ###############################################
12
- # Configure logger
13
-
14
- logging.basicConfig(filename="output.log",
15
- filemode='w',
16
- format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
17
- datefmt='%H:%M:%S',
18
- level=logging.DEBUG)
19
-
20
- gunicorn_error_logger = logging.getLogger("gunicorn.error")
21
- gunicorn_logger = logging.getLogger("gunicorn")
22
- uvicorn_access_logger = logging.getLogger("uvicorn.access")
23
- uvicorn_access_logger.handlers = gunicorn_error_logger.handlers
24
-
25
- fastapi_logger.handlers = gunicorn_error_logger.handlers
26
-
27
- logger = logging.getLogger("socketio_server_pubsub")
28
- logger.propagate = True
29
-
30
- ###############################################
31
-
32
  import sys
33
  # sys.path.append('/Users/benolojo/DCU/CA4/ca400_FinalYearProject/2024-ca400-olojob2-majdap2/src/backend/')
34
 
35
  from fastapi import FastAPI
36
  from fastapi.middleware.cors import CORSMiddleware
 
37
  from pymongo import MongoClient
38
  from dotenv import dotenv_values
39
  from routes import router as api_router
@@ -58,35 +37,41 @@ import torch
58
  # ---------------------------------
59
  import socketio
60
 
61
- DEBUG = True
 
62
 
63
- ESCAPE_HATCH_SERVER_LOCK_RELEASE_NAME = "remove_server_lock"
 
 
 
 
64
 
65
- TARGET_SAMPLING_RATE = 16000
66
- MAX_BYTES_BUFFER = 480_000
67
 
68
- print("")
69
- print("")
70
- print("=" * 20 + " ⭐️ Starting Server... ⭐️ " + "=" * 20)
 
 
 
 
 
 
 
71
 
72
- ###############################################
73
- # Configure socketio server
74
  ###############################################
75
 
76
- # TODO PM - change this to the actual path
77
- # seamless remnant code
78
- CLIENT_BUILD_PATH = "../streaming-react-app/dist/"
79
- static_files = {
80
- "/": CLIENT_BUILD_PATH,
81
- "/assets/seamless-db6a2555.svg": {
82
- "filename": CLIENT_BUILD_PATH + "assets/seamless-db6a2555.svg",
83
- "content_type": "image/svg+xml",
84
- },
85
- }
86
- device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
87
- processor = AutoProcessor.from_pretrained("facebook/seamless-m4t-v2-large")
88
- # PM - hardcoding temporarily as my GPU doesnt have enough vram
89
- model = SeamlessM4Tv2Model.from_pretrained("facebook/seamless-m4t-v2-large").to("cpu")
90
 
91
  config = dotenv_values(".env")
92
 
@@ -114,7 +99,7 @@ async def lifespan(app: FastAPI):
114
  print("Closing MongoDB Connection...")
115
  app.mongodb_client.close()
116
 
117
- app = FastAPI(lifespan=lifespan, logger=logger)
118
 
119
  # New CORS funcitonality
120
  app.add_middleware(
@@ -127,19 +112,36 @@ app.add_middleware(
127
 
128
  app.include_router(api_router) # include routers for user, calls and transcripts operations
129
 
 
130
 
131
- # sio is the main socket.io entrypoint
132
- sio = socketio.AsyncServer(
133
- async_mode="asgi",
134
- cors_allowed_origins="*",
135
- logger=logger,
136
- engineio_logger=logger,
137
- )
138
- # sio.logger.setLevel(logging.DEBUG)
139
- socketio_app = socketio.ASGIApp(sio)
140
- # app.mount("/", socketio_app)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
141
 
142
- from fastapi import APIRouter, Body, Request, status
143
 
144
  bytes_data = bytearray()
145
  model_name = "seamlessM4T_v2_large"
@@ -157,18 +159,24 @@ def get_collection_calls():
157
  return app.database["call_test"]
158
 
159
 
160
- @app.get("/test/", response_description="List all existing call records", response_model=List[UserCall])
161
  def test():
162
 
163
- result = list_calls(get_collection_calls(), 100)
164
 
165
- # return {"message": "Welcome to InterpreTalk!"}
166
 
167
- print(result)
168
- return result
 
 
 
169
 
 
170
 
171
- @app.put("/test_put/", response_description="List all existing call records", response_model=UserCall)
 
 
 
172
  def test_put():
173
 
174
  # result = list_calls(get_collection_calls(), 100)
@@ -179,18 +187,6 @@ def test_put():
179
  return result
180
 
181
 
182
- @app.post("/test_post/", response_description="List all existing call records", response_model=UserCall)
183
- def test_post():
184
- request_data = {
185
- "call_id": "TESTID000001"
186
- }
187
-
188
- result = create_calls(get_collection_calls(), request_data)
189
-
190
- # return {"message": "Welcome to InterpreTalk!"}
191
- return result
192
-
193
-
194
  async def send_translated_text(client_id, original_text, translated_text, room_id):
195
  print('SEND_TRANSLATED_TEXT IS WOKRING IN FASTAPI BACKEND...')
196
  print(rooms)
@@ -202,48 +198,48 @@ async def send_translated_text(client_id, original_text, translated_text, room_i
202
  "translated_text": str(translated_text),
203
  "timestamp": str(datetime.now())
204
  }
205
- logger.warning("SENDING TRANSLATED TEXT TO CLIENT")
206
  await sio.emit("translated_text", data, room=room_id)
207
- logger.warning("SUCCESSFULLY SEND AUDIO TO FRONTEND")
208
 
209
  @sio.on("connect")
210
  async def connect(sid, environ):
211
  print(f"📥 [event: connected] sid={sid}")
212
  query_params = dict(parse.parse_qsl(environ["QUERY_STRING"]))
213
  client_id = query_params.get("client_id")
214
- logger.info(f"📥 [event: connected] sid={sid}, client_id={client_id}")
215
  # sid = socketid, client_id = client specific ID ,always the same for same user
216
  clients[sid] = Client(sid, client_id)
217
- logger.warning(f"Client connected: {sid}")
218
- logger.warning(clients)
219
 
220
  @sio.on("disconnect")
221
  async def disconnect(sid): # BO - also pass call id as parameter for updating MongoDB
222
- logger.debug(f"📤 [event: disconnected] sid={sid}")
223
  clients.pop(sid, None)
224
  # BO -> Update Call record with call duration, key terms
225
 
226
  @sio.on("target_language")
227
  async def target_language(sid, target_lang):
228
- logger.info(f"📥 [event: target_language] sid={sid}, target_lang={target_lang}")
229
  clients[sid].target_language = target_lang
230
 
231
  @sio.on("call_user")
232
  async def call_user(sid, call_id):
233
  clients[sid].call_id = call_id
234
- logger.warning(f"CALL {sid}: entering room {call_id}")
235
  rooms[call_id] = rooms.get(call_id, [])
236
  if sid not in rooms[call_id] and len(rooms[call_id]) < 2:
237
  rooms[call_id].append(sid)
238
  sio.enter_room(sid, call_id)
239
  else:
240
- logger.warning(f"CALL {sid}: room {call_id} is full")
241
  # await sio.emit("room_full", room=call_id, to=sid)
242
 
243
  # # BO - Get call id from dictionary created during socketio connection
244
  # client_id = clients[sid].client_id
245
 
246
- # logger.warning(f"NOW TRYING TO CREATE DB RECORD FOR Caller with ID: {client_id} for call: {call_id}")
247
  # # # BO -> Create Call Record with Caller and call_id field (None for callee, duration, terms..)
248
  # request_data = {
249
  # "call_id": str(call_id),
@@ -263,13 +259,13 @@ async def audio_config(sid, sample_rate):
263
  async def answer_call(sid, call_id):
264
 
265
  clients[sid].call_id = call_id
266
- logger.warning(f"ANSWER {sid}: entering room {call_id}")
267
  rooms[call_id] = rooms.get(call_id, [])
268
  if sid not in rooms[call_id] and len(rooms[call_id]) < 2:
269
  rooms[call_id].append(sid)
270
  sio.enter_room(sid, call_id)
271
  else:
272
- logger.warning(f"ANSWER {sid}: room {call_id} is full")
273
  # await sio.emit("room_full", room=call_id, to=sid)
274
 
275
 
@@ -277,7 +273,7 @@ async def answer_call(sid, call_id):
277
  # client_id = clients[sid].client_id
278
 
279
  # # BO -> Update Call Record with Callee field based on call_id
280
- # logger.warning(f"NOW UPDATING MongoDB RECORD FOR Caller with ID: {client_id} for call: {call_id}")
281
  # # # BO -> Create Call Record with callee_id field (None for callee, duration, terms..)
282
  # request_data = {
283
  # "callee_id": client_id
@@ -293,13 +289,13 @@ async def incoming_audio(sid, data, call_id):
293
  clients[sid].add_bytes(data)
294
 
295
  if clients[sid].get_length() >= MAX_BYTES_BUFFER:
296
- logger.warning('Buffer full, now outputting...')
297
  output_path = clients[sid].output_path
298
  vad_result, resampled_audio = clients[sid].resample_and_write_to_file()
299
  # source lang is speakers tgt language 😃
300
  src_lang = clients[sid].target_language
301
  if vad_result:
302
- logger.warning('Speech detected, now processing audio.....')
303
  tgt_sid = next(id for id in rooms[call_id] if id != sid)
304
  tgt_lang = clients[tgt_sid].target_language
305
  # following example from https://github.com/facebookresearch/seamless_communication/blob/main/docs/m4t/README.md#transformers-usage
@@ -326,7 +322,7 @@ async def incoming_audio(sid, data, call_id):
326
  # send_captions(clients[sid].client_id, asr_text, translated_text, call_id)
327
 
328
  except Exception as e:
329
- logger.error(f"Error in incoming_audio: {e.with_traceback()}")
330
 
331
  def send_captions(client_id, original_text, translated_text, call_id):
332
  # BO -> Update Call Record with Callee field based on call_id
@@ -345,6 +341,10 @@ def send_captions(client_id, original_text, translated_text, call_id):
345
  app.mount("/", socketio_app)
346
 
347
  if __name__ == '__main__':
 
 
 
 
348
  fastapi_logger.setLevel(gunicorn_logger.level)
349
  else:
350
  fastapi_logger.setLevel(logging.DEBUG)
 
7
  from uuid import uuid4
8
  import logging
9
  from fastapi.logger import logger as fastapi_logger
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
  import sys
11
  # sys.path.append('/Users/benolojo/DCU/CA4/ca400_FinalYearProject/2024-ca400-olojob2-majdap2/src/backend/')
12
 
13
  from fastapi import FastAPI
14
  from fastapi.middleware.cors import CORSMiddleware
15
+ from fastapi import APIRouter, Body, Request, status
16
  from pymongo import MongoClient
17
  from dotenv import dotenv_values
18
  from routes import router as api_router
 
37
  # ---------------------------------
38
  import socketio
39
 
40
+ ###############################################
41
+ # Configure logger
42
 
43
+ logging.basicConfig(filename="backend.log",
44
+ filemode='w',
45
+ format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
46
+ datefmt='%H:%M:%S',
47
+ level=logging.DEBUG)
48
 
49
+ # logger = logging.getLogger("socketio_server_pubsub")
50
+ # gunicorn_logger.propagate = True
51
 
52
+ gunicorn_error_logger = logging.getLogger("gunicorn.error")
53
+ gunicorn_logger = logging.getLogger("gunicorn")
54
+ uvicorn_access_logger = logging.getLogger("uvicorn.access")
55
+
56
+ gunicorn_error_logger.propagate = True
57
+ gunicorn_logger.propagate = True
58
+ uvicorn_access_logger.propagate = True
59
+
60
+ uvicorn_access_logger.handlers = gunicorn_error_logger.handlers
61
+ fastapi_logger.handlers = gunicorn_error_logger.handlers
62
 
 
 
63
  ###############################################
64
 
65
+ # sio is the main socket.io entrypoint
66
+ sio = socketio.AsyncServer(
67
+ async_mode="asgi",
68
+ cors_allowed_origins="*",
69
+ logger=gunicorn_logger,
70
+ engineio_logger=gunicorn_logger,
71
+ )
72
+ # sio.logger.setLevel(logging.DEBUG)
73
+ socketio_app = socketio.ASGIApp(sio)
74
+ # app.mount("/", socketio_app)
 
 
 
 
75
 
76
  config = dotenv_values(".env")
77
 
 
99
  print("Closing MongoDB Connection...")
100
  app.mongodb_client.close()
101
 
102
+ app = FastAPI(lifespan=lifespan, logger=gunicorn_logger)
103
 
104
  # New CORS funcitonality
105
  app.add_middleware(
 
112
 
113
  app.include_router(api_router) # include routers for user, calls and transcripts operations
114
 
115
+ DEBUG = True
116
 
117
+ ESCAPE_HATCH_SERVER_LOCK_RELEASE_NAME = "remove_server_lock"
118
+
119
+ TARGET_SAMPLING_RATE = 16000
120
+ MAX_BYTES_BUFFER = 480_000
121
+
122
+ print("")
123
+ print("")
124
+ print("=" * 20 + " ⭐️ Starting Server... ⭐️ " + "=" * 20)
125
+
126
+ ###############################################
127
+ # Configure socketio server
128
+ ###############################################
129
+
130
+ # TODO PM - change this to the actual path
131
+ # seamless remnant code
132
+ CLIENT_BUILD_PATH = "../streaming-react-app/dist/"
133
+ static_files = {
134
+ "/": CLIENT_BUILD_PATH,
135
+ "/assets/seamless-db6a2555.svg": {
136
+ "filename": CLIENT_BUILD_PATH + "assets/seamless-db6a2555.svg",
137
+ "content_type": "image/svg+xml",
138
+ },
139
+ }
140
+ device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
141
+ processor = AutoProcessor.from_pretrained("facebook/seamless-m4t-v2-large")
142
+ # PM - hardcoding temporarily as my GPU doesnt have enough vram
143
+ model = SeamlessM4Tv2Model.from_pretrained("facebook/seamless-m4t-v2-large").to("cpu")
144
 
 
145
 
146
  bytes_data = bytearray()
147
  model_name = "seamlessM4T_v2_large"
 
159
  return app.database["call_test"]
160
 
161
 
162
+ @app.get("/test/", response_description="Welcome User")
163
  def test():
164
 
165
+ return {"message": "Welcome to InterpreTalk!"}
166
 
 
167
 
168
+ @app.post("/test_post/", response_description="List more test call records")
169
+ def test_post():
170
+ request_data = {
171
+ "call_id": "TESTID000001"
172
+ }
173
 
174
+ result = create_calls(get_collection_calls(), request_data)
175
 
176
+ # return {"message": "Welcome to InterpreTalk!"}
177
+ return result
178
+
179
+ @app.put("/test_put/", response_description="List test call records")
180
  def test_put():
181
 
182
  # result = list_calls(get_collection_calls(), 100)
 
187
  return result
188
 
189
 
 
 
 
 
 
 
 
 
 
 
 
 
190
  async def send_translated_text(client_id, original_text, translated_text, room_id):
191
  print('SEND_TRANSLATED_TEXT IS WOKRING IN FASTAPI BACKEND...')
192
  print(rooms)
 
198
  "translated_text": str(translated_text),
199
  "timestamp": str(datetime.now())
200
  }
201
+ gunicorn_logger.info("SENDING TRANSLATED TEXT TO CLIENT")
202
  await sio.emit("translated_text", data, room=room_id)
203
+ gunicorn_logger.info("SUCCESSFULLY SEND AUDIO TO FRONTEND")
204
 
205
  @sio.on("connect")
206
  async def connect(sid, environ):
207
  print(f"📥 [event: connected] sid={sid}")
208
  query_params = dict(parse.parse_qsl(environ["QUERY_STRING"]))
209
  client_id = query_params.get("client_id")
210
+ gunicorn_logger.info(f"📥 [event: connected] sid={sid}, client_id={client_id}")
211
  # sid = socketid, client_id = client specific ID ,always the same for same user
212
  clients[sid] = Client(sid, client_id)
213
+ gunicorn_logger.warning(f"Client connected: {sid}")
214
+ gunicorn_logger.warning(clients)
215
 
216
  @sio.on("disconnect")
217
  async def disconnect(sid): # BO - also pass call id as parameter for updating MongoDB
218
+ gunicorn_logger.debug(f"📤 [event: disconnected] sid={sid}")
219
  clients.pop(sid, None)
220
  # BO -> Update Call record with call duration, key terms
221
 
222
  @sio.on("target_language")
223
  async def target_language(sid, target_lang):
224
+ gunicorn_logger.info(f"📥 [event: target_language] sid={sid}, target_lang={target_lang}")
225
  clients[sid].target_language = target_lang
226
 
227
  @sio.on("call_user")
228
  async def call_user(sid, call_id):
229
  clients[sid].call_id = call_id
230
+ gunicorn_logger.info(f"CALL {sid}: entering room {call_id}")
231
  rooms[call_id] = rooms.get(call_id, [])
232
  if sid not in rooms[call_id] and len(rooms[call_id]) < 2:
233
  rooms[call_id].append(sid)
234
  sio.enter_room(sid, call_id)
235
  else:
236
+ gunicorn_logger.info(f"CALL {sid}: room {call_id} is full")
237
  # await sio.emit("room_full", room=call_id, to=sid)
238
 
239
  # # BO - Get call id from dictionary created during socketio connection
240
  # client_id = clients[sid].client_id
241
 
242
+ # gunicorn_logger.warning(f"NOW TRYING TO CREATE DB RECORD FOR Caller with ID: {client_id} for call: {call_id}")
243
  # # # BO -> Create Call Record with Caller and call_id field (None for callee, duration, terms..)
244
  # request_data = {
245
  # "call_id": str(call_id),
 
259
  async def answer_call(sid, call_id):
260
 
261
  clients[sid].call_id = call_id
262
+ gunicorn_logger.info(f"ANSWER {sid}: entering room {call_id}")
263
  rooms[call_id] = rooms.get(call_id, [])
264
  if sid not in rooms[call_id] and len(rooms[call_id]) < 2:
265
  rooms[call_id].append(sid)
266
  sio.enter_room(sid, call_id)
267
  else:
268
+ gunicorn_logger.info(f"ANSWER {sid}: room {call_id} is full")
269
  # await sio.emit("room_full", room=call_id, to=sid)
270
 
271
 
 
273
  # client_id = clients[sid].client_id
274
 
275
  # # BO -> Update Call Record with Callee field based on call_id
276
+ # gunicorn_logger.warning(f"NOW UPDATING MongoDB RECORD FOR Caller with ID: {client_id} for call: {call_id}")
277
  # # # BO -> Create Call Record with callee_id field (None for callee, duration, terms..)
278
  # request_data = {
279
  # "callee_id": client_id
 
289
  clients[sid].add_bytes(data)
290
 
291
  if clients[sid].get_length() >= MAX_BYTES_BUFFER:
292
+ gunicorn_logger.info('Buffer full, now outputting...')
293
  output_path = clients[sid].output_path
294
  vad_result, resampled_audio = clients[sid].resample_and_write_to_file()
295
  # source lang is speakers tgt language 😃
296
  src_lang = clients[sid].target_language
297
  if vad_result:
298
+ gunicorn_logger.info('Speech detected, now processing audio.....')
299
  tgt_sid = next(id for id in rooms[call_id] if id != sid)
300
  tgt_lang = clients[tgt_sid].target_language
301
  # following example from https://github.com/facebookresearch/seamless_communication/blob/main/docs/m4t/README.md#transformers-usage
 
322
  # send_captions(clients[sid].client_id, asr_text, translated_text, call_id)
323
 
324
  except Exception as e:
325
+ gunicorn_logger.error(f"Error in incoming_audio: {e.with_traceback()}")
326
 
327
  def send_captions(client_id, original_text, translated_text, call_id):
328
  # BO -> Update Call Record with Callee field based on call_id
 
341
  app.mount("/", socketio_app)
342
 
343
  if __name__ == '__main__':
344
+ uvicorn.run("main:app", host='127.0.0.1', port=8080, log_level="info")
345
+
346
+ # Running in Docker Container
347
+ if __name__ != "__main__":
348
  fastapi_logger.setLevel(gunicorn_logger.level)
349
  else:
350
  fastapi_logger.setLevel(logging.DEBUG)