Spanicin committed on
Commit
352f55d
·
verified ·
1 Parent(s): 1fa315c

Update app_parallel.py

Browse files
Files changed (1) hide show
  1. app_parallel.py +20 -18
app_parallel.py CHANGED
@@ -26,7 +26,6 @@ import pickle
26
  from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor
27
  from stream_server import add_video,HLS_DIR, generate_m3u8
28
  import math
29
- import re
30
 
31
  # Load environment variables from .env file
32
  # load_dotenv()
@@ -287,35 +286,38 @@ def split_audio(audio_path, TEMP_DIR, chunk_duration):
287
 
288
  audio_clip.close() # Close the audio clip to release resources
289
  return audio_chunks, total_duration
290
def extract_order_from_path(temp_file_path):
    """Return the integer order embedded in a 'videostream<N>' file path.

    Returns -1 when the path carries no 'videostream<digits>' marker, so the
    caller can detect and handle the missing-order case explicitly.
    """
    found = re.search(r'videostream(\d+)', temp_file_path)
    if found is None:
        return -1
    return int(found.group(1))
 
293
  # Generator function to yield chunk results as they are processed
 
294
def generate_chunks(audio_chunks, preprocessed_data, args, m3u8_path, audio_duration):
    """Process audio chunks in parallel and yield per-chunk status, in order.

    Submits every chunk to the (module-level) executor, then, as futures
    complete, publishes finished chunks strictly in ascending order number:
    a completed chunk is buffered in ``completed_chunks`` until all chunks
    with lower order numbers have been published via ``add_video``.

    Yields one JSON-encoded bytes record per published chunk, or a plain
    error string when a chunk's task raised.
    """
    global TEMP_DIR
    # Next order number we are allowed to publish; orders appear to start at 1.
    next_expected_order = 1
    # Buffer of finished-but-not-yet-published chunks: order number -> temp path.
    completed_chunks = {}
    # chunk is (order/start_time, payload); map each future back to its order key.
    future_to_chunk = {executor.submit(process_chunk, chunk[1], preprocessed_data, args): chunk[0] for chunk in audio_chunks}

    try:
        for chunk_idx, future in enumerate(as_completed(future_to_chunk)):
            idx = future_to_chunk[future] # Get the original chunk that was processed
            try:
                base64_video, temp_file_path = future.result() # Get the result of the completed task
                # Extract the order number from the temp_file_path
                # (the worker encodes it as 'videostream<N>' in the file name).
                order_number = extract_order_from_path(temp_file_path)
                completed_chunks[order_number] = (temp_file_path)
                # add video for streaming
                # Drain every consecutive finished chunk starting at the
                # next expected order, so playback order is preserved.
                while next_expected_order in completed_chunks:
                    temp_file_path = completed_chunks.pop(next_expected_order)
                    # add_video is a coroutine; run it to completion here.
                    asyncio.run(add_video(temp_file_path, m3u8_path, audio_duration))
                    yield json.dumps({'start_time': next_expected_order, 'video_index': next_expected_order}).encode('utf-8')
                    next_expected_order += 1
            except Exception as e:
                # Report the failed chunk but keep streaming the others.
                yield f"Task for chunk {idx} failed: {e}\n"
    finally:
        # Always remove temp artifacts, even if the consumer stops early.
        if TEMP_DIR:
            custom_cleanup(TEMP_DIR.name)
 
319
 
320
  @app.route("/run", methods=['POST'])
321
  def parallel_processing():
 
26
  from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor
27
  from stream_server import add_video,HLS_DIR, generate_m3u8
28
  import math
 
29
 
30
  # Load environment variables from .env file
31
  # load_dotenv()
 
286
 
287
  audio_clip.close() # Close the audio clip to release resources
288
  return audio_chunks, total_duration
289
+
290
+ # def extract_order_from_path(temp_file_path):
291
+ # match = re.search(r'videostream(\d+)', temp_file_path)
292
+ # return int(match.group(1)) if match else -1 # Return -1 if no match is found, handle appropriately.
293
  # Generator function to yield chunk results as they are processed
294
+
295
  def generate_chunks(audio_chunks, preprocessed_data, args, m3u8_path, audio_duration):
296
  global TEMP_DIR
 
 
297
  future_to_chunk = {executor.submit(process_chunk, chunk[1], preprocessed_data, args): chunk[0] for chunk in audio_chunks}
298
+ processed_chunks = {chunk[0]: None for chunk in audio_chunks}
299
+ print("processed_chunks:",processed_chunks)
300
+ yielded_count = 1
301
  try:
302
  for chunk_idx, future in enumerate(as_completed(future_to_chunk)):
303
+ idx = future_to_chunk[future]
304
  try:
305
+ base64_video, temp_file_path = future.result()
306
+ processed_chunks[idx] = temp_file_path
307
+ for expected_start_time in sorted(processed_chunks.keys()):
308
+ if processed_chunks[expected_start_time] is not None:
309
+ asyncio.run(add_video(processed_chunks[expected_start_time], m3u8_path, audio_duration))
310
+ yield json.dumps({'start_time': expected_start_time, 'video_index': yielded_count}).encode('utf-8')
311
+ processed_chunks[expected_start_time] = None
312
+ yielded_count += 1
313
+ else:
314
+ break
315
  except Exception as e:
316
  yield f"Task for chunk {idx} failed: {e}\n"
317
  finally:
318
  if TEMP_DIR:
319
  custom_cleanup(TEMP_DIR.name)
320
+
321
 
322
  @app.route("/run", methods=['POST'])
323
  def parallel_processing():