"""Download YouTube media (whole or a time-bounded chunk) via pytube, yt-dlp and ffmpeg."""

import json
import os
import re
import subprocess
import time
from abc import ABC, abstractmethod

import numpy as np
import pandas as pd

try:
    from pytube import YouTube
except ImportError:
    # The ffmpeg-only static helpers below do not need pytube, so keep the
    # module importable and fail with a clear message at construction time.
    YouTube = None


class MediaDownloader(ABC):
    """Abstract base class for downloaders that save media from a URL.

    Args:
        url: Address of the media to download.
        output_path: Directory (relative to the current working directory)
            where downloaded files are written. It is created if missing.
        start_time: Optional start of the chunk to download.
        end_time: Optional end of the chunk to download.
    """

    def __init__(self, url, output_path, start_time=None, end_time=None):
        self.url = url
        self.output_path = os.path.join(os.getcwd(), output_path)
        self.start_time = start_time
        self.end_time = end_time
        self.__create_output_dir()

    def __create_output_dir(self):
        """Create the output directory if it does not exist yet."""
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(self.output_path, exist_ok=True)

    @abstractmethod
    def _get_supported_media_formats(self):
        pass

    @abstractmethod
    def download(self, media_type, media_format, media_quality):
        pass

    @abstractmethod
    def _download_media(self, media_type, media_format, media_quality):
        pass

    @abstractmethod
    def _download_audio(self, audio_format, audio_quality):
        pass

    @abstractmethod
    def _download_video(self, video_format, video_quality):
        pass

    @abstractmethod
    def _download_audio_and_video(self, media_format, media_quality):
        pass

    @abstractmethod
    def _download_media_chunk(self, media_type, media_format, media_quality):
        pass

    @abstractmethod
    def _download_audio_chunk(self, audio_format, audio_quality):
        pass

    @abstractmethod
    def _download_video_chunk(self, video_format, video_quality):
        pass


class YoutubeDownloader(MediaDownloader):
    """Downloader for YouTube URLs.

    Stream metadata comes from pytube; audio extraction and chunked downloads
    are delegated to the ``yt-dlp`` executable; merging is done by ``ffmpeg``.

    Raises:
        ImportError: If pytube is not installed.
    """

    # Codec preferences passed to yt-dlp's "-S" sorter for chunked video
    # downloads: container format -> (video codec, audio codec).
    _CHUNK_CODECS = {
        'mp4': ('h264', 'm4a'),
        'webm': ('vp9', 'opus'),
    }

    def __init__(self, url, output_path, start_time=None, end_time=None):
        if YouTube is None:
            raise ImportError("pytube is required to use YoutubeDownloader")
        super().__init__(url, output_path, start_time, end_time)
        self.youtube = YouTube(url)
        self.title = self.youtube.title
        self.media_length = self.youtube.length
        self.thumbnail_url = self.youtube.thumbnail_url
        self.streams = self.youtube.streams
        self.streams_df, self.media_formats_dict = self._get_supported_media_formats()

    def __get_quality_int(self, media_quality):
        """Return the leading integer of a quality label, or None.

        E.g. ``'1080p'`` -> ``1080`` and ``'128kbps'`` -> ``128``. Labels
        without leading digits (including ``None``) yield ``None``.
        """
        match = re.search(r'^\d+', str(media_quality))
        return int(match.group()) if match else None

    def _require_quality_int(self, media_quality):
        """Return the integer part of a quality label.

        Raises:
            ValueError: If the label does not start with digits.
        """
        quality = self.__get_quality_int(media_quality)
        if quality is None:
            raise ValueError(f"Cannot parse media quality: {media_quality!r}")
        return quality

    def _get_supported_media_formats(self):
        """Collect the supported media formats for both audio and video.

        Returns:
            tuple: ``(streams_df, media_formats_dict)`` where ``streams_df``
            has one row per offered stream (columns ``media_type``,
            ``media_format``, ``media_quality``, ``progressive``) and
            ``media_formats_dict`` maps type -> format -> sorted qualities.
        """
        # One row per video-only stream reported by pytube.
        streams_details = [
            [
                stream.type,
                stream.mime_type.split('/')[1],
                stream.resolution,
                stream.is_progressive,
            ]
            for stream in self.streams.filter(only_video=True)
        ]

        # Custom audio options; these are produced by yt-dlp conversion
        # rather than taken from pytube's stream list.
        streams_details.extend([
            ['audio', 'mp3', '128kbps', False],
            ['audio', 'mp3', '256kbps', False],
            ['audio', 'wav', '1411kbps', False],
        ])

        cols = ['media_type', 'media_format', 'media_quality', 'progressive']
        streams_df = pd.DataFrame(streams_details, columns=cols)

        # Nested dictionary of the unique options offered to the user.
        media_formats_dict = {}
        for media_type in sorted(streams_df['media_type'].unique()):
            media_type_df = streams_df[streams_df['media_type'] == media_type]
            formats = {}
            for media_format in sorted(media_type_df['media_format'].unique()):
                media_format_df = media_type_df[media_type_df['media_format'] == media_format]
                # Unparsable labels sort first instead of raising TypeError
                # when None is compared with an int.
                formats[media_format] = sorted(
                    media_format_df['media_quality'].unique(),
                    key=lambda quality: self.__get_quality_int(quality) or 0,
                )
            media_formats_dict[media_type] = formats

        return streams_df, media_formats_dict

    def get_media_formats(self):
        """Return the dictionary of supported media formats."""
        return self.media_formats_dict

    @staticmethod
    def _prompt_choice(label, options):
        """Ask the user to pick one of ``options`` and validate the answer.

        Raises:
            ValueError: If the answer is not one of ``options``.
        """
        choice = input(f'Select a {label} from {options}: ')
        # Explicit check instead of assert, which is stripped under -O.
        if choice not in options:
            raise ValueError(f"Invalid {label}: {choice!r}")
        return choice

    def _select_media_format(self):
        """Interactively select the media type, format and quality.

        Returns:
            tuple: ``(media_type, media_format, media_quality)``.

        Raises:
            ValueError: If any answer is not among the offered options.
        """
        print(json.dumps(self.media_formats_dict, indent=12))

        media_types = list(self.media_formats_dict.keys())
        media_type = self._prompt_choice('Media Type', media_types)

        media_formats = list(self.media_formats_dict[media_type].keys())
        media_format = self._prompt_choice('Media Format', media_formats)

        media_qualities = self.media_formats_dict[media_type][media_format]
        media_quality = self._prompt_choice('Media Quality', media_qualities)

        return media_type, media_format, media_quality

    def download(self, media_type, media_format, media_quality):
        """Download the media, as a chunk when a start or end time is set.

        Returns:
            str: Path of the downloaded file.
        """
        if self.start_time or self.end_time:
            return self._download_media_chunk(media_type, media_format, media_quality)
        return self._download_media(media_type, media_format, media_quality)

    def _download_media(self, media_type, media_format, media_quality):
        """Dispatch a full-length download to the method for ``media_type``.

        Raises:
            ValueError: If ``media_type`` is neither ``'audio'`` nor ``'video'``.
        """
        if media_type == 'audio':
            return self._download_audio(media_format, media_quality)
        if media_type != 'video':
            raise ValueError(f"Unsupported media type: {media_type!r}")

        # Look the requested video up in the table of offered streams.
        media_mask = (
            (self.streams_df['media_type'] == media_type)
            & (self.streams_df['media_format'] == media_format)
            & (self.streams_df['media_quality'] == media_quality)
        )
        media_df = self.streams_df[media_mask]

        # Progressive streams already carry audio; otherwise audio and video
        # are fetched separately and merged.
        if bool(media_df['progressive'].any()):
            return self._download_video(media_format, media_quality)
        return self._download_audio_and_video(media_format, media_quality)

    def _download_audio(self, audio_format, audio_quality):
        """Extract the audio track with yt-dlp and return the output path.

        Raises:
            ValueError: If ``audio_quality`` does not start with digits.
        """
        quality = f"{self._require_quality_int(audio_quality)}K"
        output_path = os.path.join(self.output_path, f"{self.title}.{audio_format}")

        command = [
            "yt-dlp",
            "-x",
            "--audio-format", audio_format,
            "--audio-quality", quality,
            "-o", output_path,
            self.url,
            "-q",
        ]
        subprocess.run(command)

        return output_path

    def _download_video(self, video_format, video_quality):
        """Download a progressive stream (audio and video in one file).

        Raises:
            ValueError: If no progressive stream matches the request.
        """
        stream = self.streams.filter(
            progressive=True,
            file_extension=video_format,
            resolution=video_quality,
        ).first()
        if stream is None:
            raise ValueError(
                f"No progressive {video_format} stream at {video_quality}"
            )
        print(stream)
        return stream.download(
            output_path=self.output_path,
            filename=f"{self.title}.{video_format}",
        )

    def _download_audio_and_video(self, media_format, media_quality):
        """Download separate video and audio streams and merge them.

        The highest-bitrate audio stream of the same container format is
        used. The two intermediate files are deleted after merging.

        Raises:
            ValueError: If a matching audio or video stream is not found.
        """
        # Downloading Audio
        audio_stream = (
            self.streams.filter(file_extension=media_format, only_audio=True)
            .order_by('abr')
            .desc()
            .first()
        )
        if audio_stream is None:
            raise ValueError(f"No {media_format} audio stream available")
        print(audio_stream)
        audio_path = audio_stream.download(
            output_path=self.output_path,
            filename=f"{self.title} - Audio.{media_format}",
        )

        # Downloading Video
        video_stream = self.streams.filter(
            file_extension=media_format, resolution=media_quality
        ).first()
        if video_stream is None:
            raise ValueError(f"No {media_format} video stream at {media_quality}")
        print(video_stream)
        video_path = video_stream.download(
            output_path=self.output_path,
            filename=f"{self.title} - Video.{media_format}",
        )

        # Stream-copy both inputs into one container (no re-encoding).
        output_path = os.path.join(self.output_path, f"{self.title}.{media_format}")
        command = [
            'ffmpeg',
            '-i', video_path,
            '-i', audio_path,
            '-c:v', 'copy',
            '-c:a', 'copy',
            output_path,
            '-loglevel', 'quiet',
        ]
        subprocess.run(command)

        os.remove(audio_path)
        os.remove(video_path)

        return output_path

    def _download_media_chunk(self, media_type, media_format, media_quality):
        """Dispatch a chunked download to the method for ``media_type``.

        Raises:
            ValueError: If ``media_type`` is neither ``'audio'`` nor ``'video'``.
        """
        if media_type == 'audio':
            return self._download_audio_chunk(media_format, media_quality)
        if media_type == 'video':
            return self._download_video_chunk(media_format, media_quality)
        raise ValueError(f"Unsupported media type: {media_type!r}")

    def _get_chunk_args(self):
        """Return the ffmpeg ``-ss``/``-to`` arguments for the chunk.

        Raises:
            ValueError: If neither ``start_time`` nor ``end_time`` is set.
        """
        parts = []
        if self.start_time:
            parts.append(f"-ss {self.start_time}")
        if self.end_time:
            parts.append(f"-to {self.end_time}")
        if not parts:
            raise ValueError("A chunk download needs start_time and/or end_time")
        return " ".join(parts)

    def _get_download_section(self):
        """Return the yt-dlp ``--download-sections`` time range.

        A missing start defaults to ``0`` and a missing end to ``inf`` so
        that an unset bound never appears as the literal text ``None``.
        """
        start = self.start_time if self.start_time else 0
        end = self.end_time if self.end_time else "inf"
        return f"*{start}-{end}"

    def _build_audio_chunk_command(self, audio_format, audio_quality, output_path):
        """Build the yt-dlp command that extracts an audio chunk."""
        chunk_string = self._get_chunk_args()
        quality = f"{self._require_quality_int(audio_quality)}K"
        return [
            "yt-dlp",
            "-x",
            "--audio-format", audio_format,
            "--audio-quality", quality,
            "--external-downloader", "ffmpeg",
            "--external-downloader-args", chunk_string,
            "-o", output_path,
            self.url,
            "-q",
        ]

    def _build_video_chunk_command(self, video_format, video_quality, output_path):
        """Build the yt-dlp command that downloads a video chunk.

        Raises:
            ValueError: If ``video_format`` has no known codec pair or
                ``video_quality`` does not start with digits.
        """
        codecs = self._CHUNK_CODECS.get(video_format)
        if codecs is None:
            raise ValueError(f"Unexpected Video Format Encountered: {video_format}")
        video_codec, audio_codec = codecs
        resolution = self._require_quality_int(video_quality)
        return [
            "yt-dlp",
            self.url,
            "-S", f"res:{resolution},vcodec:{video_codec},acodec:{audio_codec}",
            "--merge-output-format", video_format,
            "--download-sections", self._get_download_section(),
            "-o", f"{output_path}",
        ]

    def _download_audio_chunk(self, audio_format, audio_quality):
        """Extract the audio of the configured chunk with yt-dlp.

        Returns:
            str: Path of the output audio file.

        Raises:
            ValueError: If no chunk bound is set or the quality is invalid.
        """
        output_path = os.path.join(self.output_path, f"{self.title}.{audio_format}")
        command = self._build_audio_chunk_command(audio_format, audio_quality, output_path)
        subprocess.run(command)
        return output_path

    def _download_video_chunk(self, video_format, video_quality):
        """Download the video of the configured chunk with yt-dlp.

        Returns:
            str: Path of the output video file.

        Raises:
            ValueError: If the format or the quality is not supported.
        """
        output_path = os.path.join(self.output_path, f"{self.title}.{video_format}")
        command = self._build_video_chunk_command(video_format, video_quality, output_path)
        print(' '.join(command))
        subprocess.run(command)
        return output_path

    def get_media_info(self):
        """Return title, length, thumbnail URL and supported formats."""
        return {
            'title': self.title,
            'media_length': self.media_length,
            'thumbnail_url': self.thumbnail_url,
            'formats': self.media_formats_dict,
        }

    @staticmethod
    def extract_audio(video_path):
        """Extract audio from a video file and save it as MP3 using ffmpeg.

        Args:
            video_path (str): Path to the input video file.

        Returns:
            str: Path of the extracted audio, or None if ffmpeg failed.
        """
        # Path for Extracted Audio File
        filename, extension = os.path.splitext(video_path)
        audio_path = filename + '.mp3'

        # Choosing the Appropriate Codec for the Output Audio Format (MP3)
        audio_codec = "libmp3lame" if extension.lower() in (".mp4", ".webm") else "mp3"

        command = [
            "ffmpeg",
            "-i", video_path,
            "-vn",
            "-acodec", audio_codec,
            audio_path,
            '-loglevel', 'quiet',
        ]
        try:
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Error: {e}")
            return None
        return audio_path

    @staticmethod
    def burn_subtitles(video_file_path, subtitle_file_path):
        """Burn subtitles onto a video using ffmpeg.

        Args:
            video_file_path (str): Path to the input video file.
            subtitle_file_path (str): Path to the subtitle file.

        Returns:
            str: Path of the output video, or None if ffmpeg failed.
        """
        # The output name is the video name plus the subtitle extension,
        # e.g. "clip.mp4" + "subs.srt" -> "clip_srt.mp4".
        video_filename, video_extension = os.path.splitext(video_file_path)
        _, subtitle_extension = os.path.splitext(subtitle_file_path)
        output_file_path = (
            video_filename + subtitle_extension.replace('.', '_') + video_extension
        )

        # NOTE(review): the subtitle path is inserted into the filter string
        # unescaped; paths containing ':' or ',' may need escaping - confirm.
        command = [
            'ffmpeg',
            '-i', video_file_path,
            '-vf', f'subtitles={subtitle_file_path}',
            output_file_path,
            '-loglevel', 'quiet',
        ]
        try:
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Error: {e}")
            return None
        return output_file_path