import json
import locale
import os
import re
import subprocess
import time
from abc import ABC, abstractmethod

import numpy as np
import pandas as pd
from pytube import YouTube
from yt_dlp import YoutubeDL


class MediaDownloader(ABC):
    '''
    Abstract interface for downloading media from a URL into a directory.

    Stores the source URL, the output directory (resolved against the current
    working directory and created on construction) and an optional start/end
    time describing the chunk of the media to download.
    '''

    def __init__(self, url, output_path, start_time=None, end_time=None):
        self.url = url
        self.output_path = os.path.join(os.getcwd(), output_path)
        self.start_time = start_time
        self.end_time = end_time
        self.__create_output_dir()

    def __create_output_dir(self):
        ''' Creates the output directory (and parents) if it does not exist '''
        # exist_ok avoids the race between checking for the directory and creating it.
        os.makedirs(self.output_path, exist_ok=True)

    @abstractmethod
    def _get_supported_media_formats(self):
        ''' Returns the media formats the source offers '''

    @abstractmethod
    def download(self, media_type, media_format, media_quality):
        ''' Downloads the media (or the configured chunk) & returns its path '''

    @abstractmethod
    def _download_media(self, media_type, media_format, media_quality):
        ''' Downloads the complete media of the given type '''

    @abstractmethod
    def _download_audio(self, audio_format, audio_quality):
        ''' Downloads the complete audio '''

    @abstractmethod
    def _download_video(self, video_format, video_quality):
        ''' Downloads the complete video '''

    @abstractmethod
    def _download_audio_and_video(self, media_format, media_quality):
        ''' Downloads audio & video separately and combines them '''

    @abstractmethod
    def _download_media_chunk(self, media_type, media_format, media_quality):
        ''' Downloads a chunk of the media of the given type '''

    @abstractmethod
    def _download_audio_chunk(self, audio_format, audio_quality):
        ''' Downloads a chunk of the audio '''

    @abstractmethod
    def _download_video_chunk(self, video_format, video_quality):
        ''' Downloads a chunk of the video '''


class YoutubeDownloader(MediaDownloader):
    '''
    YouTube implementation of MediaDownloader.

    Uses pytube first for listing and downloading video streams and falls
    back to the yt-dlp command line tool when pytube raises. Audio is always
    extracted through yt-dlp.
    '''

    # Column layout shared by every row of ``streams_df``.
    _STREAM_COLUMNS = ['media_type', 'media_format', 'media_quality', 'progressive']

    # Audio options always offered to the user; they are produced by yt-dlp
    # audio extraction rather than read from the stream listing.
    _CUSTOM_AUDIO_STREAMS = (
        ('audio', 'mp3', '128kbps', False),
        ('audio', 'mp3', '256kbps', False),
        ('audio', 'wav', '1411kbps', False),
    )

    # Container format -> (video codec, audio codec) handed to yt-dlp's -S option.
    _CODECS_BY_FORMAT = {
        'mp4': ('h264', 'm4a'),
        'webm': ('vp9', 'opus'),
    }

    def __init__(self, url, output_path, start_time=None, end_time=None):
        super().__init__(url, output_path, start_time, end_time)
        self.youtube = YouTube(url)
        self.title = self.youtube.title
        self.media_length = self.youtube.length
        self.thumbnail_url = self.youtube.thumbnail_url
        self.streams = self.youtube.streams
        self.streams_df, self.media_formats_dict = self._get_supported_media_formats()
        self.num_likes, self.num_views = self._get_num_likes_views()

    def get_media_formats(self):
        '''
        Returns a dictionary for supported media formats
        '''
        return self.media_formats_dict

    def _get_num_likes_views(self):
        '''
        Returns the number of likes & views in the video
        Either value is None when yt-dlp does not report it
        '''
        with YoutubeDL() as ydl:
            info = ydl.extract_info(self.url, download=False)
        return info.get('like_count', None), info.get('view_count', None)

    def get_media_metadata(self):
        '''
        Returns a dictionary for media metadata
        '''
        return {
            'title': self.title,
            'num_likes': self.__format_number(self.num_likes),
            'num_views': self.__format_number(self.num_views),
            'media_length': self.media_length,
            'thumbnail_url': self.thumbnail_url,
        }

    @staticmethod
    def __format_number(num):
        '''
        Returns the formatted number
        E.g: Given input 123456789, it returns 123,456,789
        Returns None when the number is unknown (None)
        '''
        # Likes / views may be missing from the metadata; "%d" % None would raise.
        if num is None:
            return None

        try:
            # Setting the Locale
            locale.setlocale(locale.LC_ALL, '')
        except locale.Error:
            # The environment's locale is not usable; group with plain commas.
            return f"{int(num):,}"

        # Formatting the Number with the Locale's Grouping
        return locale.format_string("%d", num, grouping=True)

    @staticmethod
    def __get_quality_int(media_quality):
        '''
        Returns the Quality in Integer
        E.g: Given input 1080p, it returns 1080
        Returns None when the quality does not start with digits
        '''
        match = re.search(r'^\d+', media_quality)
        return int(match.group()) if match else None

    def _list_streams_with_pytube(self):
        ''' Returns [type, format, quality, progressive] rows for pytube's video-only streams '''
        return [
            [stream.type, stream.mime_type.split('/')[1], stream.resolution, stream.is_progressive]
            for stream in self.streams.filter(only_video=True)
        ]

    def _list_streams_with_yt_dlp(self):
        '''
        Returns [type, format, quality, progressive] rows parsed from the
        output of the yt-dlp format listing command

        Raises RuntimeError when yt-dlp exits with a non-zero return code
        '''
        # Listing Command
        command = ["yt-dlp", "--list-formats", self.url, "--get-filename",
                   "--format", "best[ext=mp4]/best[ext=webm]"]
        print(' '.join(command))

        # stderr is captured as well so that a failure can be reported.
        completed_process = subprocess.run(command, text=True,
                                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if completed_process.returncode != 0:
            raise RuntimeError(completed_process.stderr)

        output_lines = [line for line in completed_process.stdout.split('\n') if line.strip()]

        streams_details = []
        for line in output_lines[2:]:  # Skip the header lines
            fields = line.split()
            if len(fields) < 2:
                # Too short to hold both a format and a quality column.
                continue
            media_format = fields[1]
            media_quality = fields[-2]
            # Only rows whose quality field looks like "<digits>p," are kept as videos.
            if media_format in ('mp4', 'webm') and 'p,' in media_quality:
                streams_details.append(['video', media_format, media_quality[:-1], False])
        return streams_details

    def _build_media_formats(self, streams_details, drop_duplicates=False):
        '''
        Builds the streams dataframe & the nested options dictionary
        {media_type: {media_format: [media_quality, ...]}} from stream rows
        '''
        streams_df = pd.DataFrame(streams_details, columns=self._STREAM_COLUMNS)
        if drop_duplicates:
            streams_df = streams_df.drop_duplicates().reset_index(drop=True)

        # Adding Custom Audio Streams
        for audio_stream in self._CUSTOM_AUDIO_STREAMS:
            streams_df.loc[len(streams_df)] = list(audio_stream)

        # Converting to Dictionary for Unique User Options
        media_formats_dict = {}
        for media_type in sorted(streams_df['media_type'].unique()):
            media_type_df = streams_df[streams_df['media_type'] == media_type]
            formats_for_type = {}
            for media_format in sorted(media_type_df['media_format'].unique()):
                media_format_df = media_type_df[media_type_df['media_format'] == media_format]
                formats_for_type[media_format] = sorted(
                    media_format_df['media_quality'].unique(),
                    key=self.__get_quality_int,
                )
            media_formats_dict[media_type] = formats_for_type

        return streams_df, media_formats_dict

    def _get_supported_media_formats(self):
        '''
        Returns all supported media formats for both audio & video
        as a tuple (streams dataframe, options dictionary)

        Tries pytube first and falls back to yt-dlp.
        Raises RuntimeError when both fail, so that the caller never
        receives None in place of the tuple.
        '''
        try:
            streams_details = self._list_streams_with_pytube()
            return self._build_media_formats(streams_details)
        except Exception as pytube_error:
            print(f"PyTube Error in _get_supported_media_formats: \n{pytube_error}\n")
            print('Trying with yt-dlp...')

        try:
            streams_details = self._list_streams_with_yt_dlp()
            return self._build_media_formats(streams_details, drop_duplicates=True)
        except Exception as yt_dlp_error:
            print(f"yt-dlp error in _get_supported_media_formats: \n{yt_dlp_error}\n")
            raise RuntimeError(
                f"Could not determine the supported media formats for {self.url}"
            ) from yt_dlp_error

    @staticmethod
    def _prompt_choice(label, options):
        '''
        Asks the user to pick one of the options & returns the choice
        Raises AssertionError when the answer is not one of the options
        '''
        choice = input(f'Select a {label} from {options}: ')
        if choice not in options:
            # Raised explicitly (instead of `assert`) so that the validation
            # still runs under `python -O`; the exception type is unchanged.
            raise AssertionError(f'{choice!r} is not one of {options}')
        return choice

    def select_media_format(self):
        '''
        For selecting media format to download
        Returns a tuple (media_type, media_format, media_quality)
        '''
        print(json.dumps(self.media_formats_dict, indent=12))

        # Getting Media Type
        media_types = list(self.media_formats_dict.keys())
        media_type = self._prompt_choice('Media Type', media_types)

        # Getting Media Format
        media_formats = list(self.media_formats_dict[media_type].keys())
        media_format = self._prompt_choice('Media Format', media_formats)

        # Getting Media Quality
        media_qualities = self.media_formats_dict[media_type][media_format]
        media_quality = self._prompt_choice('Media Quality', media_qualities)

        return media_type, media_format, media_quality

    def download(self, media_type, media_format, media_quality):
        '''
        Download Handler Function: Handles all types of media download
        Downloads a chunk when a start or end time is set, the complete media otherwise
        '''
        if (self.start_time) or (self.end_time):
            output_path = self._download_media_chunk(media_type, media_format, media_quality)
        else:
            output_path = self._download_media(media_type, media_format, media_quality)
        return output_path

    def _download_media(self, media_type, media_format, media_quality):
        '''
        Media Download Handler Function:
        Checks which type of media download is required & passes it onto the relevant method

        Raises ValueError for a media type other than 'audio' or 'video'
        '''
        if media_type == 'audio':
            return self._download_audio(media_format, media_quality)

        if media_type == 'video':
            # Checking for the Media in Dataframe
            media_mask = (self.streams_df['media_type'] == media_type) & \
                         (self.streams_df['media_format'] == media_format) & \
                         (self.streams_df['media_quality'] == media_quality)
            media_df = self.streams_df[media_mask]

            # Checking if Progressive Video is Available
            if bool(media_df['progressive'].any()):
                return self._download_video(media_format, media_quality)
            return self._download_audio_and_video(media_format, media_quality)

        raise ValueError(f"Unsupported media type: {media_type!r}")

    def _ffmpeg_chunk_args(self):
        ''' Returns the "-ss <start> -to <end>" string for whichever chunk bounds are set '''
        parts = []
        if self.start_time:
            parts += ['-ss', str(self.start_time)]
        if self.end_time:
            parts += ['-to', str(self.end_time)]
        return ' '.join(parts)

    def _download_section_spec(self):
        ''' Returns the "*<start>-<end>" time range for yt-dlp's --download-sections '''
        # An unset bound must not be rendered as the text "None":
        # 0 stands for the beginning and "inf" for the end of the media.
        start = self.start_time if self.start_time else 0
        end = self.end_time if self.end_time else 'inf'
        return f"*{start}-{end}"

    def _run_yt_dlp_audio_download(self, audio_format, audio_quality, chunk_args=None):
        '''
        Extracts the audio with yt-dlp & returns the output path

        chunk_args, when given, is passed to ffmpeg as external downloader arguments.
        Raises ValueError for an unparsable quality and
        subprocess.CalledProcessError when yt-dlp fails.
        '''
        # Getting Quality Command String
        bitrate = self.__get_quality_int(audio_quality)
        if bitrate is None:
            raise ValueError(f"Unrecognised audio quality: {audio_quality!r}")
        quality = str(bitrate) + 'K'

        # Getting Output Path
        output_path = os.path.join(self.output_path, f"{self.title}.{audio_format}")

        # Download Command
        command = ["yt-dlp", "-x", "--audio-format", audio_format, "--audio-quality", quality]
        if chunk_args:
            command += ["--external-downloader", "ffmpeg",
                        "--external-downloader-args", chunk_args]
        command += ["-o", output_path, self.url, "-q"]

        # check=True so that a failed download is not reported as a valid path
        subprocess.run(command, check=True)
        return output_path

    def _run_yt_dlp_video_download(self, media_format, media_quality, sections=None):
        '''
        Downloads the video (with audio) through yt-dlp & returns the output path

        sections, when given, is passed to --download-sections.
        Raises ValueError for an unsupported format or unparsable quality and
        subprocess.CalledProcessError when yt-dlp fails.
        '''
        # Setting Formats
        if media_format not in self._CODECS_BY_FORMAT:
            raise ValueError(f"Unexpected Video Format Encountered: {media_format}")
        video_codec, audio_codec = self._CODECS_BY_FORMAT[media_format]

        # Getting Video Quality Integer
        resolution = self.__get_quality_int(media_quality)
        if resolution is None:
            raise ValueError(f"Unrecognised video quality: {media_quality!r}")

        # Getting Output Path
        output_path = os.path.join(self.output_path, f"{self.title}.{media_format}")

        # Download Command
        command = [
            "yt-dlp", self.url,
            "-S", f"res:{resolution},vcodec:{video_codec},acodec:{audio_codec}",
            "--merge-output-format", media_format,
        ]
        if sections:
            command += ["--download-sections", sections]
        command += ["-o", f"{output_path}", "-q"]
        print(' '.join(command))

        # Running the command using Subprocess
        subprocess.run(command, check=True)
        return output_path

    def _download_audio(self, audio_format, audio_quality):
        '''
        Filters the required audio stream & downloads it
        Returns the output path, or None when the download fails
        '''
        try:
            return self._run_yt_dlp_audio_download(audio_format, audio_quality)
        except Exception as yt_dlp_error:
            print(f"Error in _download_audio: \n{yt_dlp_error}\n")
            return None

    def _download_video(self, video_format, video_quality):
        '''
        Filters the required video stream & downloads it
        Only for Progressive media i.e containing both audio & video streams
        Returns the output path, or None when both pytube & yt-dlp fail
        '''
        try:
            stream = self.streams.filter(progressive=True, file_extension=video_format,
                                         resolution=video_quality).first()
            print(stream)
            return stream.download(output_path=self.output_path,
                                   filename=f"{self.title}.{video_format}")
        except Exception as pytube_error:
            print(f"PyTube error in _download_video: \n{pytube_error}\n")
            print('Trying with yt-dlp...')

        try:
            return self._run_yt_dlp_video_download(video_format, video_quality)
        except Exception as yt_dlp_error:
            print(f"yt-dlp error in _download_video: \n{yt_dlp_error}\n")
            return None

    def _download_audio_and_video(self, media_format, media_quality):
        '''
        Filters the required video stream & downloads it
        Filters the best quality audio stream of the same format & downloads it
        Combines both with ffmpeg
        Returns the output path, or None when both pytube & yt-dlp fail
        '''
        temp_paths = []
        try:
            # Downloading Audio
            audio_stream = self.streams.filter(file_extension=media_format,
                                               only_audio=True).order_by('abr').desc().first()
            print(audio_stream)
            audio_path = audio_stream.download(output_path=self.output_path,
                                               filename=f"{self.title} - Audio.{media_format}")
            temp_paths.append(audio_path)

            # Downloading Video
            video_stream = self.streams.filter(file_extension=media_format,
                                               resolution=media_quality).first()
            print(video_stream)
            video_path = video_stream.download(output_path=self.output_path,
                                               filename=f"{self.title} - Video.{media_format}")
            temp_paths.append(video_path)

            # Combining the Audio & Video Files using FFMPEG Command
            output_path = os.path.join(self.output_path, f"{self.title}.{media_format}")
            command = ['ffmpeg', '-i', video_path, '-i', audio_path,
                       '-c:v', 'copy', '-c:a', 'copy', output_path, '-loglevel', 'quiet']
            subprocess.run(command, check=True)
            return output_path
        except Exception as pytube_error:
            print(f"PyTube error in _download_audio_and_video: \n{pytube_error}\n")
            print('Trying with yt-dlp...')
        finally:
            # The separate audio / video files are only intermediates; remove
            # them whether or not the merge succeeded.
            for temp_path in temp_paths:
                try:
                    os.remove(temp_path)
                except OSError:
                    pass

        try:
            return self._run_yt_dlp_video_download(media_format, media_quality)
        except Exception as yt_dlp_error:
            print(f"yt-dlp error in _download_audio_and_video: \n{yt_dlp_error}\n")
            return None

    def _download_media_chunk(self, media_type, media_format, media_quality):
        '''
        Media Download Handler Function:
        Checks which type of media download is required for particular chunk
        & passes it onto the relevant method

        Raises ValueError for a media type other than 'audio' or 'video'
        '''
        if media_type == 'audio':
            return self._download_audio_chunk(media_format, media_quality)
        if media_type == 'video':
            return self._download_video_chunk(media_format, media_quality)
        raise ValueError(f"Unsupported media type: {media_type!r}")

    def _download_audio_chunk(self, audio_format, audio_quality):
        '''
        Filters the required audio stream & downloads it for particular chunk
        Returns the output path, or None when the download fails
        '''
        try:
            # Empty when neither bound is set; the chunk options are then omitted.
            chunk_string = self._ffmpeg_chunk_args()
            return self._run_yt_dlp_audio_download(audio_format, audio_quality,
                                                   chunk_args=chunk_string)
        except Exception as e:
            print(f"Error in _download_audio_chunk: {e}")
            return None

    def _download_video_chunk(self, video_format, video_quality):
        '''
        Filters the required video stream & downloads it for particular chunk
        Returns the output path, or None when the download fails
        '''
        try:
            return self._run_yt_dlp_video_download(video_format, video_quality,
                                                   sections=self._download_section_spec())
        except Exception as e:
            print(f"Error in _download_video_chunk: {e}")
            return None