import os import subprocess import pathlib # from fastapi import HTTPException import asyncio # Define base directories BASE_DIR = pathlib.Path('./videos').resolve() HLS_DIR = pathlib.Path('./hls_videos').resolve() HLS_DIR.mkdir(exist_ok=True) # Keep track of video names added to the queue video_names = [] segment_counter = 1 total_processed_duration = 0 segment_lock = asyncio.Lock() def is_valid_path(video_name): """ Validates the video path to prevent directory traversal attacks. Args: video_name (str): Name of the video file. Returns: bool: True if valid, False otherwise. """ video_path = (BASE_DIR / video_name).resolve() return str(video_path).startswith(str(BASE_DIR)) def convert_to_hls(input_path, output_dir, video_name, start_number): """ Converts an MP4 video to HLS format with 3-second segments starting from `start_number`. Args: input_path (str): Path to the input MP4 video. output_dir (str): Directory where HLS files will be stored. video_name (str): Name of the video (used for playlist naming). start_number (int): The starting number for HLS segments. Returns: int: Number of segments generated. """ # FFmpeg command to convert MP4 to HLS with 3-second segments cmd = [ 'ffmpeg', '-y', # Overwrite output files without asking '-i', input_path, # Input file #'-codec', 'copy', # Audio codec-vf scale=1280:720 -r 30 -c:v libx264 -b:v 1500k -c:a aac -b:a 128k '-vf','scale=1280:720', '-r','30', '-c:v','libx264', '-b:v','1500k', '-c:a','aac', '-b:a','128k', '-hls_time', '3', # Segment duration in seconds '-hls_playlist_type', 'vod', # Video on Demand playlist type '-start_number', str(start_number), # Starting segment number '-hls_segment_filename', os.path.join(output_dir, 'video_stream%d.ts'), # Segment file naming os.path.join(output_dir, f'{video_name}.m3u8') # Output playlist ] try: # Execute the FFmpeg command subprocess.run(cmd, check=True, stderr=subprocess.PIPE) except subprocess.CalledProcessError as e: # Log FFmpeg errors and raise an HTTP exception error_message = e.stderr.decode() print(f"FFmpeg error: {error_message}") raise HTTPException(status_code=500, detail="Video conversion failed") async def add_video(video_name, output_path, audio_duration): """ Endpoint to add a video to the streaming queue. Args: video_name (str): Name of the video file to add. request (Request): FastAPI request object to extract base URL. Returns: dict: Success message. """ global total_processed_duration print((HLS_DIR / f'{video_name}.m3u8').exists()) if not (HLS_DIR / f'{video_name}.m3u8').exists(): async with segment_lock: global segment_counter current_start_number = segment_counter num_segments_before = len([f for f in os.listdir(HLS_DIR) if f.startswith('video_stream') and f.endswith('.ts')]) await asyncio.get_event_loop().run_in_executor( None, convert_to_hls, str(video_name), str(HLS_DIR), video_name, current_start_number ) num_segments_after = len([f for f in os.listdir(HLS_DIR) if f.startswith('video_stream') and f.endswith('.ts')]) num_new_segments = num_segments_after - num_segments_before # Update the global segment counter segment_counter += num_new_segments video_names.append(video_name) segment_duration = 3 full_segments = int(audio_duration // segment_duration) remaining_duration = audio_duration % segment_duration existing_segments = set() if output_path: with open(output_path, 'r') as m3u8_file: for line in m3u8_file: if line.startswith('/live_stream/video_stream'): existing_segments.add(line.strip()) new_segments = [] # Generate new segment paths for i in range(1, full_segments + 1): new_segment_path = f"/live_stream/video_stream{i}.ts" if new_segment_path not in existing_segments: new_segments.append(new_segment_path) total_processed_duration += segment_duration with open(output_path, 'a') as m3u8_file: for segment in new_segments: m3u8_file.write(f"#EXTINF:{segment_duration:.3f},\n") m3u8_file.write(f"{segment}\n") if remaining_duration > 0: remaining_segment_path = f"/live_stream/video_stream{full_segments + 1}.ts" if remaining_segment_path not in existing_segments: m3u8_file.write(f"#EXTINF:{remaining_duration:.3f},\n") m3u8_file.write(f"{remaining_segment_path}\n") total_processed_duration += remaining_duration # Add #EXT-X-ENDLIST only once after all segments have been added if total_processed_duration == audio_duration: with open(output_path, 'a') as m3u8_file: m3u8_file.write("#EXT-X-ENDLIST\n") total_processed_duration = 0 return {"message": f'"{video_name}" added to the streaming queue.'} async def concatenate_playlists(video_names, base_dir): """ Concatenates multiple HLS playlists into a single playlist with unique segment numbering. Args: video_names (list): List of video names added to the queue. base_dir (str): Base directory where HLS files are stored. request (Request): FastAPI request object to extract base URL. """ concatenated_playlist_path = os.path.join(base_dir, 'master.m3u8') max_segment_duration = 3 # Since we set hls_time to 3 segment_lines = [] # To store segment lines # Construct base URL from the incoming request for video_name in video_names: video_playlist_path = os.path.join(base_dir, f'{video_name}.m3u8') if os.path.exists(video_playlist_path): with open(video_playlist_path, 'r') as infile: lines = infile.readlines() for line in lines: line = line.strip() if line.startswith('#EXTINF'): # Append EXTINF line segment_lines.append(line) elif line.endswith('.ts'): segment_file = line # Update segment URI to include full URL segment_path = f'{segment_file}' segment_lines.append(segment_path) elif line.startswith('#EXT-X-BYTERANGE'): # Include byte range if present segment_lines.append(line) elif line.startswith('#EXT-X-ENDLIST'): # Do not include this here; we'll add it at the end continue elif line.startswith('#EXTM3U') or line.startswith('#EXT-X-VERSION') or line.startswith('#EXT-X-PLAYLIST-TYPE') or line.startswith('#EXT-X-TARGETDURATION') or line.startswith('#EXT-X-MEDIA-SEQUENCE'): # Skip these tags; they'll be added in the concatenated playlist continue else: # Include any other necessary tags segment_lines.append(line) # Write the concatenated playlist with open(concatenated_playlist_path, 'w') as outfile: outfile.write('#EXTM3U\n') outfile.write('#EXT-X-PLAYLIST-TYPE:VOD\n') outfile.write(f'#EXT-X-TARGETDURATION:{max_segment_duration}\n') outfile.write('#EXT-X-VERSION:4\n') outfile.write(f'#EXT-X-MEDIA-SEQUENCE:{1}\n') # Starting from segment number 1 for line in segment_lines: outfile.write(f'{line}\n') outfile.write('#EXT-X-ENDLIST\n') def generate_m3u8(video_duration, output_path,segment_duration=3): # Initialize playlist content m3u8_content = "#EXTM3U\n" m3u8_content += "#EXT-X-PLAYLIST-TYPE:VOD\n" m3u8_content += f"#EXT-X-TARGETDURATION:{segment_duration}\n" m3u8_content += "#EXT-X-VERSION:4\n" m3u8_content += "#EXT-X-MEDIA-SEQUENCE:1\n" # # Calculate the number of full segments and the remaining duration # segment_duration = int(segment_duration) # full_segments = int(video_duration // segment_duration) # remaining_duration = video_duration % segment_duration # # Add full segments to the playlist # for i in range(full_segments): # m3u8_content += f"#EXTINF:{segment_duration:.6f},\n" # m3u8_content += f"/live_stream/video_stream{i + 1}.ts\n" # # Add the remaining segment if there's any leftover duration # if remaining_duration > 0: # m3u8_content += f"#EXTINF:{remaining_duration:.6f},\n" # m3u8_content += f"/live_stream/video_stream{full_segments + 1}.ts\n" # # End the playlist # m3u8_content += "#EXT-X-ENDLIST\n" with open(output_path, "w") as file: file.write(m3u8_content) print(f"M3U8 playlist saved to {output_path}")