Spanicin commited on
Commit
579a758
·
verified ·
1 Parent(s): 96dad10

Update app_parallel.py

Browse files
Files changed (1) hide show
  1. app_parallel.py +53 -26
app_parallel.py CHANGED
@@ -1,4 +1,5 @@
1
- from flask import Flask, request, jsonify, stream_with_context
 
2
  import torch
3
  import shutil
4
  import os
@@ -23,6 +24,7 @@ import json
23
  import pickle
24
  # from dotenv import load_dotenv
25
  from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor
 
26
 
27
  # Load environment variables from .env file
28
  # load_dotenv()
@@ -174,6 +176,12 @@ def custom_cleanup(temp_dir):
174
  import gc
175
  gc.collect()
176
 
 
 
 
 
 
 
177
 
178
  def generate_audio(voice_cloning, voice_gender, text_prompt):
179
  print("generate_audio")
@@ -219,8 +227,10 @@ def generate_audio(voice_cloning, voice_gender, text_prompt):
219
  temp_file.write(chunk)
220
  driven_audio_path = temp_file.name
221
  print('driven_audio_path',driven_audio_path)
 
 
222
 
223
- return driven_audio_path
224
 
225
  def run_preprocessing(args):
226
  global path_of_lm_croper, path_of_net_recon_model, dir_of_BFM_fitting
@@ -273,17 +283,22 @@ def generate_chunks(audio_chunks, preprocessed_data, args):
273
  future_to_chunk = {executor.submit(process_chunk, chunk[1], preprocessed_data, args): chunk[0] for chunk in audio_chunks}
274
 
275
  try:
276
- for future in as_completed(future_to_chunk):
277
  idx = future_to_chunk[future] # Get the original chunk that was processed
278
  try:
279
  base64_video, temp_file_path = future.result() # Get the result of the completed task
280
- yield json.dumps({'start_time': idx, 'base64_video': base64_video}).encode('utf-8')
 
 
 
 
 
281
  except Exception as e:
282
  yield f"Task for chunk {idx} failed: {e}\n"
283
  finally:
284
  if TEMP_DIR:
285
  custom_cleanup(TEMP_DIR.name)
286
-
287
  @app.route("/run", methods=['POST'])
288
  def parallel_processing():
289
  global start_time
@@ -291,6 +306,7 @@ def parallel_processing():
291
  start_time = time.time()
292
  global TEMP_DIR
293
  TEMP_DIR = create_temp_dir()
 
294
  print('request:',request.method)
295
  try:
296
  if request.method == 'POST':
@@ -328,37 +344,30 @@ def parallel_processing():
328
  source_image_path = save_uploaded_file(source_image, 'source_image.png',TEMP_DIR)
329
  print(source_image_path)
330
 
331
- driven_audio_path = generate_audio(voice_cloning, voice_gender, text_prompt)
332
 
333
  save_dir = tempfile.mkdtemp(dir=TEMP_DIR.name)
334
  result_folder = os.path.join(save_dir, "results")
335
  os.makedirs(result_folder, exist_ok=True)
336
 
337
  ref_pose_video_path = None
338
- # if ref_pose_video:
339
- # with tempfile.NamedTemporaryFile(suffix=".mp4", prefix="ref_pose_",dir=TEMP_DIR.name, delete=False) as temp_file:
340
- # ref_pose_video_path = temp_file.name
341
- # ref_pose_video.save(ref_pose_video_path)
342
- # print('ref_pose_video_path',ref_pose_video_path)
 
 
 
 
 
 
 
343
 
344
  except Exception as e:
345
  app.logger.error(f"An error occurred: {e}")
346
  return jsonify({'status': 'error', 'message': str(e)}), 500
347
-
348
- args = AnimationConfig(driven_audio_path=driven_audio_path, source_image_path=source_image_path, result_folder=result_folder, pose_style=pose_style, expression_scale=expression_scale,enhancer=enhancer,still=still,preprocess=preprocess,ref_pose_video_path=ref_pose_video_path, image_hardcoded=image_hardcoded)
349
-
350
- preprocessed_data = run_preprocessing(args)
351
- chunk_duration = 3
352
- print(f"Splitting the audio into {chunk_duration}-second chunks...")
353
- audio_chunks = split_audio(driven_audio_path, chunk_duration=chunk_duration)
354
- print(f"Audio has been split into {len(audio_chunks)} chunks: {audio_chunks}")
355
- return jsonify({"status": "processing started, use /stream to get video chunks."}), 200
356
-
357
- # try:
358
- # return stream_with_context(generate_chunks(audio_chunks, preprocessed_data, args))
359
- # # base64_video, temp_file_path, duration = process_chunk(driven_audio_path, preprocessed_data, args)
360
- # except Exception as e:
361
- # return jsonify({'status': 'error', 'message': str(e)}), 500
362
 
363
  @app.route("/stream", methods=["GET"])
364
  def stream_results():
@@ -371,6 +380,24 @@ def stream_results():
371
  except Exception as e:
372
  return jsonify({'status': 'error', 'message': str(e)}), 500
373
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
374
  @app.route("/health", methods=["GET"])
375
  def health_status():
376
  response = {"online": "true"}
 
1
+ from flask import Flask, request, jsonify, stream_with_context, send_file, send_from_directory
2
+ import asyncio
3
  import torch
4
  import shutil
5
  import os
 
24
  import pickle
25
  # from dotenv import load_dotenv
26
  from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor
27
+ from stream_server import add_video,HLS_DIR, generate_m3u8
28
 
29
  # Load environment variables from .env file
30
  # load_dotenv()
 
176
  import gc
177
  gc.collect()
178
 
179
+ def get_audio_duration(audio_path):
180
+ audio_clip = mp.AudioFileClip(audio_path)
181
+ duration_in_seconds = audio_clip.duration
182
+ audio_clip.close() # Don't forget to close the clip
183
+ return duration_in_seconds
184
+
185
 
186
  def generate_audio(voice_cloning, voice_gender, text_prompt):
187
  print("generate_audio")
 
227
  temp_file.write(chunk)
228
  driven_audio_path = temp_file.name
229
  print('driven_audio_path',driven_audio_path)
230
+ audio_duration = get_audio_duration(driven_audio_path)
231
+ print('Total Audio Duration in seconds',audio_duration)
232
 
233
+ return driven_audio_path, audio_duration
234
 
235
  def run_preprocessing(args):
236
  global path_of_lm_croper, path_of_net_recon_model, dir_of_BFM_fitting
 
283
  future_to_chunk = {executor.submit(process_chunk, chunk[1], preprocessed_data, args): chunk[0] for chunk in audio_chunks}
284
 
285
  try:
286
+ for chunk_idx, future in enumerate(as_completed(future_to_chunk)):
287
  idx = future_to_chunk[future] # Get the original chunk that was processed
288
  try:
289
  base64_video, temp_file_path = future.result() # Get the result of the completed task
290
+ # add video for streaming
291
+ loop = asyncio.new_event_loop()
292
+ asyncio.set_event_loop(loop)
293
+ loop.run_until_complete(add_video(temp_file_path))
294
+ loop.close()
295
+ yield json.dumps({'start_time': idx, 'video_index': chunk_idx}).encode('utf-8')
296
  except Exception as e:
297
  yield f"Task for chunk {idx} failed: {e}\n"
298
  finally:
299
  if TEMP_DIR:
300
  custom_cleanup(TEMP_DIR.name)
301
+
302
  @app.route("/run", methods=['POST'])
303
  def parallel_processing():
304
  global start_time
 
306
  start_time = time.time()
307
  global TEMP_DIR
308
  TEMP_DIR = create_temp_dir()
309
+ unique_id = str(uuid.uuid4())
310
  print('request:',request.method)
311
  try:
312
  if request.method == 'POST':
 
344
  source_image_path = save_uploaded_file(source_image, 'source_image.png',TEMP_DIR)
345
  print(source_image_path)
346
 
347
+ driven_audio_path, audio_duration = generate_audio(voice_cloning, voice_gender, text_prompt)
348
 
349
  save_dir = tempfile.mkdtemp(dir=TEMP_DIR.name)
350
  result_folder = os.path.join(save_dir, "results")
351
  os.makedirs(result_folder, exist_ok=True)
352
 
353
  ref_pose_video_path = None
354
+ args = AnimationConfig(driven_audio_path=driven_audio_path, source_image_path=source_image_path, result_folder=result_folder, pose_style=pose_style, expression_scale=expression_scale,enhancer=enhancer,still=still,preprocess=preprocess,ref_pose_video_path=ref_pose_video_path, image_hardcoded=image_hardcoded)
355
+ preprocessed_data = run_preprocessing(args)
356
+ chunk_duration = 3
357
+ print(f"Splitting the audio into {chunk_duration}-second chunks...")
358
+ audio_chunks = split_audio(driven_audio_path, chunk_duration=chunk_duration)
359
+ print(f"Audio has been split into {len(audio_chunks)} chunks: {audio_chunks}")
360
+
361
+ os.makedirs('lives', exist_ok=True)
362
+ print("Entering generate m3u8")
363
+ generate_m3u8(audio_duration, f'lives/{unique_id}.m3u8')
364
+
365
+ return jsonify({'video_url': f'{unique_id}.m3u8'}), 200
366
 
367
  except Exception as e:
368
  app.logger.error(f"An error occurred: {e}")
369
  return jsonify({'status': 'error', 'message': str(e)}), 500
370
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
371
 
372
  @app.route("/stream", methods=["GET"])
373
  def stream_results():
 
380
  except Exception as e:
381
  return jsonify({'status': 'error', 'message': str(e)}), 500
382
 
383
+ @app.route("/live_stream/<string:playlist>", methods=['GET'])
384
+ async def get_concatenated_playlist(playlist: str):
385
+ """
386
+ Endpoint to serve the concatenated HLS playlist.
387
+
388
+ Returns:
389
+ FileResponse: The concatenated playlist file.
390
+ """
391
+ playlist_path = os.path.join('lives', playlist)
392
+ if not os.path.exists(playlist_path):
393
+ return jsonify({'status': 'error', "msg":"Playlist not found"}), 404
394
+
395
+ return send_file(playlist_path, mimetype='application/vnd.apple.mpegurl')
396
+
397
+ @app.route("/live_stream/<string:filename>", methods=["GET"])
398
+ def live_stream(filename):
399
+ return send_from_directory(directory="hls_videos", filename=filename)
400
+
401
  @app.route("/health", methods=["GET"])
402
  def health_status():
403
  response = {"online": "true"}