# Spaces: Runtime error (page-status banner captured along with the file; not part of the program)
import os | |
import re | |
import json | |
import time | |
import locale | |
import subprocess | |
from yt_dlp import YoutubeDL | |
import numpy as np | |
import pandas as pd | |
from abc import ABC, abstractmethod | |
from pytube import YouTube | |
class MediaDownloader(ABC):
    '''
    Base class for media downloaders.

    Stores the source URL, the output directory and an optional time range,
    and creates the output directory on construction. Every other method is
    a no-op hook returning None that subclasses are expected to override.
    The hooks are intentionally not marked abstract, so the class itself and
    partial subclasses remain instantiable.
    '''

    def __init__(self, url, output_path, start_time=None, end_time=None):
        '''
        Args:
            url: Address of the media to download.
            output_path: Output directory; joined onto the current working
                directory (an absolute path is used as given, because
                os.path.join discards the earlier component).
            start_time: Optional start of the range to download.
            end_time: Optional end of the range to download.

        Raises:
            OSError: If the output directory cannot be created, e.g. the
                path already exists but is not a directory.
        '''
        self.url = url
        self.output_path = os.path.join(os.getcwd(), output_path)
        self.start_time = start_time
        self.end_time = end_time
        self.__create_output_dir()

    def __create_output_dir(self):
        '''
        Creates the output directory (and missing parents) if needed
        '''
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test when two downloaders start at the same time.
        os.makedirs(self.output_path, exist_ok=True)

    def _get_supported_media_formats(self):
        '''
        Hook: report the media formats available for the URL
        '''

    def download(self, media_type, media_format, media_quality):
        '''
        Hook: public entry point for downloading the media
        '''

    def _download_media(self, media_type, media_format, media_quality):
        '''
        Hook: download the complete media
        '''

    def _download_audio(self, audio_format, audio_quality):
        '''
        Hook: download the complete audio
        '''

    def _download_video(self, video_format, video_quality):
        '''
        Hook: download the complete video
        '''

    def _download_audio_and_video(self, media_format, media_quality):
        '''
        Hook: download audio and video, then combine them
        '''

    def _download_media_chunk(self, media_type, media_format, media_quality):
        '''
        Hook: download only the configured time range of the media
        '''

    def _download_audio_chunk(self, audio_format, audio_quality):
        '''
        Hook: download only the configured time range of the audio
        '''

    def _download_video_chunk(self, video_format, video_quality):
        '''
        Hook: download only the configured time range of the video
        '''
class YoutubeDownloader(MediaDownloader):
    '''
    YouTube downloader that tries pytube first and falls back to the
    yt-dlp command line tool when pytube fails.

    Audio downloads and all time-range ("chunk") downloads always go
    through yt-dlp.
    '''

    # Column layout shared by every streams dataframe built in this class
    __STREAM_COLUMNS = ['media_type', 'media_format', 'media_quality', 'progressive']

    # Audio options always offered to the user, in addition to the listed video streams
    __CUSTOM_AUDIO_STREAMS = (
        ('audio', 'mp3', '128kbps', False),
        ('audio', 'mp3', '256kbps', False),
        ('audio', 'wav', '1411kbps', False),
    )

    # Container format -> (video codec, audio codec) preference handed to yt-dlp "-S"
    __YT_DLP_CODECS = {
        'mp4': ('h264', 'm4a'),
        'webm': ('vp9', 'opus'),
    }

    def __init__(self, url, output_path, start_time=None, end_time=None):
        '''
        Reads the video metadata and the supported formats for the URL.

        Raises:
            RuntimeError: If neither pytube nor yt-dlp can list the formats.
        '''
        super().__init__(url, output_path, start_time, end_time)
        self.youtube = YouTube(url)
        self.title = self.youtube.title
        self.media_length = self.youtube.length
        self.thumbnail_url = self.youtube.thumbnail_url
        self.streams = self.youtube.streams
        self.streams_df, self.media_formats_dict = self._get_supported_media_formats()
        self.num_likes, self.num_views = self._get_num_likes_views()

    def get_media_formats(self):
        '''
        Returns a dictionary for supported media formats
        ({media_type: {media_format: [media_quality, ...]}})
        '''
        return self.media_formats_dict

    def _get_num_likes_views(self):
        '''
        Returns the number of likes & views in the video
        (either value is None when yt-dlp does not report it)
        '''
        with YoutubeDL() as ydl:
            info = ydl.extract_info(self.url, download=False)
            num_likes = info.get('like_count', None)
            num_views = info.get('view_count', None)
        return num_likes, num_views

    def get_media_metadata(self):
        '''
        Returns a dictionary for media metadata
        '''
        media_info = {
            'title': self.title,
            'num_likes': self.__format_number(self.num_likes),
            'num_views': self.__format_number(self.num_views),
            'media_length': self.media_length,
            'thumbnail_url': self.thumbnail_url,
        }
        return media_info

    @staticmethod
    def __format_number(num):
        '''
        Returns the formatted number
        E.g: Given input 123456789, it returns 123,456,789
        Returns None when the number is unknown (None)
        '''
        if num is None:
            return None
        # The "," format spec always groups with commas. It neither depends on
        # nor changes the process-wide locale, unlike locale.setlocale().
        return f"{int(num):,}"

    @staticmethod
    def __get_quality_int(media_quality):
        '''
        Returns the Quality in Integer
        E.g: Given input 1080p, it returns 1080
        Returns None when the value does not start with digits
        '''
        # str() lets a missing quality (None) fall through to the None result
        # instead of raising inside re.search.
        match = re.search(r'^\d+', str(media_quality))
        return int(match.group()) if match else None

    def __add_custom_audio_streams(self, streams_df):
        '''
        Appends the fixed audio options as extra rows of streams_df (in place)
        '''
        for audio_stream in self.__CUSTOM_AUDIO_STREAMS:
            streams_df.loc[len(streams_df)] = list(audio_stream)
        return streams_df

    def __build_media_formats_dict(self, streams_df):
        '''
        Converts the streams dataframe into
        {media_type: {media_format: [qualities sorted ascending]}}
        '''
        media_formats_dict = dict()
        for media_type in sorted(streams_df['media_type'].unique()):
            media_type_df = streams_df[streams_df['media_type'] == media_type]
            media_formats_dict[media_type] = dict()
            for media_format in sorted(media_type_df['media_format'].unique()):
                media_format_df = media_type_df[media_type_df['media_format'] == media_format]
                # Qualities without a leading number sort first instead of
                # breaking the sort with a None key.
                media_qualities = sorted(
                    media_format_df['media_quality'].unique(),
                    key=lambda quality: self.__get_quality_int(quality) or 0,
                )
                media_formats_dict[media_type][media_format] = media_qualities
        return media_formats_dict

    def __formats_from_pytube(self):
        '''
        Builds the streams dataframe & formats dictionary from pytube's stream list
        '''
        streams_details = [
            [stream.type, stream.mime_type.split('/')[1], stream.resolution, stream.is_progressive]
            for stream in self.streams.filter(only_video=True)
        ]
        streams_df = pd.DataFrame(streams_details, columns=self.__STREAM_COLUMNS)
        self.__add_custom_audio_streams(streams_df)
        return streams_df, self.__build_media_formats_dict(streams_df)

    def __formats_from_yt_dlp(self):
        '''
        Builds the streams dataframe & formats dictionary from the
        "yt-dlp --list-formats" table

        Raises:
            RuntimeError: If yt-dlp exits with a non-zero status.
        '''
        command = ["yt-dlp", "--list-formats", self.url,
                   "--get-filename", "--format", "best[ext=mp4]/best[ext=webm]"]
        print(' '.join(command))
        # stderr is captured as well so a failure can be reported with its cause
        completed_process = subprocess.run(
            command, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if completed_process.returncode != 0:
            raise RuntimeError(
                f"yt-dlp exited with status {completed_process.returncode}:\n"
                f"{completed_process.stderr}")

        output_lines = [line for line in completed_process.stdout.split('\n') if line.strip()]
        streams_details = []
        for line in output_lines[2:]:  # Skip the header lines
            fields = line.split()
            if len(fields) < 2:
                continue
            media_format = fields[1]
            media_quality = fields[-2]
            # Only rows whose second-to-last field looks like "1080p," are kept as video
            if media_format in ('mp4', 'webm') and 'p,' in media_quality:
                streams_details.append(['video', media_format, media_quality[:-1], False])

        streams_df = pd.DataFrame(streams_details, columns=self.__STREAM_COLUMNS)
        streams_df = streams_df.drop_duplicates().reset_index(drop=True)
        self.__add_custom_audio_streams(streams_df)
        return streams_df, self.__build_media_formats_dict(streams_df)

    def _get_supported_media_formats(self):
        '''
        Returns all supported media formats for both audio & video
        as a (streams dataframe, formats dictionary) pair

        Raises:
            RuntimeError: If both pytube and yt-dlp fail, since the caller
                unpacks the result and cannot continue without it.
        '''
        try:
            return self.__formats_from_pytube()
        except Exception as pytube_error:
            print(f"PyTube Error in _get_supported_media_formats: \n{pytube_error}\n")
            print('Trying with yt-dlp...')
        try:
            return self.__formats_from_yt_dlp()
        except Exception as yt_dlp_error:
            print(f"yt-dlp error in _get_supported_media_formats: \n{yt_dlp_error}\n")
            raise RuntimeError(
                f"Could not determine the supported media formats for {self.url}"
            ) from yt_dlp_error

    @staticmethod
    def __prompt_choice(label, options):
        '''
        Asks the user to pick one of the options and returns the pick
        '''
        choice = input(f'Select a Media {label} from {options}: ')
        if choice not in options:
            # Raised explicitly (instead of an assert statement) so the check
            # is not stripped under "python -O"; the exception type is kept.
            raise AssertionError(f"{choice!r} is not one of {options}")
        return choice

    def select_media_format(self):
        '''
        For selecting media format to download

        Raises:
            AssertionError: If an answer is not one of the offered options.
        '''
        print(json.dumps(self.media_formats_dict, indent=12))
        # Getting Media Type
        media_types = list(self.media_formats_dict.keys())
        media_type = self.__prompt_choice('Type', media_types)
        # Getting Media Format
        media_formats = list(self.media_formats_dict[media_type].keys())
        media_format = self.__prompt_choice('Format', media_formats)
        # Getting Media Quality
        media_qualities = self.media_formats_dict[media_type][media_format]
        media_quality = self.__prompt_choice('Quality', media_qualities)
        return media_type, media_format, media_quality

    def download(self, media_type, media_format, media_quality):
        '''
        Download Handler Function:
        Handles all types of media download
        Returns the output path, or None when the download failed
        '''
        if self.start_time or self.end_time:
            return self._download_media_chunk(media_type, media_format, media_quality)
        return self._download_media(media_type, media_format, media_quality)

    def _download_media(self, media_type, media_format, media_quality):
        '''
        Media Download Handler Function:
        Checks which type of media download is required & passes it onto the relevant method

        Raises:
            ValueError: If media_type is neither 'audio' nor 'video'.
        '''
        if media_type == 'audio':
            return self._download_audio(media_format, media_quality)
        if media_type == 'video':
            # Checking if Progressive Video is Available
            media_mask = (self.streams_df['media_type'] == media_type) & \
                         (self.streams_df['media_format'] == media_format) & \
                         (self.streams_df['media_quality'] == media_quality)
            is_progressive = bool(self.streams_df.loc[media_mask, 'progressive'].any())
            if is_progressive:
                return self._download_video(media_format, media_quality)
            return self._download_audio_and_video(media_format, media_quality)
        raise ValueError(f"Unexpected Media Type Encountered: {media_type}")

    def _download_audio(self, audio_format, audio_quality):
        '''
        Extracts the audio with yt-dlp in the requested format & quality
        Returns the output path, or None when the download failed
        '''
        try:
            # Getting Quality Command String
            quality = str(self.__get_quality_int(audio_quality)) + 'K'
            # Getting Output Path
            output_path = os.path.join(self.output_path, f"{self.title}.{audio_format}")
            # Download Command
            command = [
                "yt-dlp",
                "-x", "--audio-format", audio_format,
                "--audio-quality", quality,
                "-o", output_path,
                self.url, "-q"
            ]
            # check=True turns a failed yt-dlp run into an error instead of
            # returning a path to a file that was never written
            subprocess.run(command, check=True)
            return output_path
        except Exception as yt_dlp_error:
            print(f"Error in _download_audio: \n{yt_dlp_error}\n")
            return None

    def __yt_dlp_video_download(self, video_format, video_quality, extra_args=()):
        '''
        Downloads video (with audio) through yt-dlp and returns the output path

        Args:
            video_format: 'mp4' or 'webm'.
            video_quality: Quality such as '1080p'.
            extra_args: Additional yt-dlp arguments inserted before "-o".

        Raises:
            ValueError: If video_format is not supported.
            subprocess.CalledProcessError: If yt-dlp exits with a non-zero status.
        '''
        if video_format not in self.__YT_DLP_CODECS:
            raise ValueError(f"Unexpected Video Format Encountered: {video_format}")
        video_codec, audio_codec = self.__YT_DLP_CODECS[video_format]
        output_path = os.path.join(self.output_path, f"{self.title}.{video_format}")
        resolution = self.__get_quality_int(video_quality)
        command = [
            "yt-dlp",
            self.url,
            "-S", f"res:{resolution},vcodec:{video_codec},acodec:{audio_codec}",
            "--merge-output-format", video_format,
            *extra_args,
            "-o", output_path,
            "-q"
        ]
        print(' '.join(command))
        subprocess.run(command, check=True)
        return output_path

    def _download_video(self, video_format, video_quality):
        '''
        Filters the required video stream & downloads it
        Only for Progressive media i.e containing both audio & video streams
        Returns the output path, or None when both pytube and yt-dlp failed
        '''
        try:
            stream = self.streams.filter(
                progressive=True, file_extension=video_format, resolution=video_quality).first()
            print(stream)
            video_path = stream.download(
                output_path=self.output_path, filename=f"{self.title}.{video_format}")
            return video_path
        except Exception as pytube_error:
            print(f"PyTube error in _download_video: \n{pytube_error}\n")
            print('Trying with yt-dlp...')
        try:
            return self.__yt_dlp_video_download(video_format, video_quality)
        except Exception as yt_dlp_error:
            print(f"yt-dlp error in _download_video: \n{yt_dlp_error}\n")
            return None

    def __pytube_audio_and_video(self, media_format, media_quality):
        '''
        Downloads separate audio & video streams with pytube and merges them with ffmpeg
        '''
        audio_path = None
        video_path = None
        try:
            # Downloading Audio (highest bitrate first)
            stream = self.streams.filter(
                file_extension=media_format, only_audio=True).order_by('abr').desc().first()
            print(stream)
            audio_path = stream.download(
                output_path=self.output_path,
                filename=f"{self.title} - Audio.{media_format}")
            # Downloading Video
            stream = self.streams.filter(
                file_extension=media_format, resolution=media_quality).first()
            print(stream)
            video_path = stream.download(
                output_path=self.output_path,
                filename=f"{self.title} - Video.{media_format}")
            # Combining the Audio & Video Files using FFMPEG Command.
            # Global options must precede the output file to take effect;
            # "-y" stops ffmpeg from waiting on an overwrite prompt.
            output_path = os.path.join(self.output_path, f"{self.title}.{media_format}")
            command = ['ffmpeg', '-y', '-loglevel', 'quiet',
                       '-i', video_path, '-i', audio_path,
                       '-c:v', 'copy', '-c:a', 'copy', output_path]
            subprocess.run(command, check=True)
            return output_path
        finally:
            # The intermediate files are removed whether or not the merge worked
            for temp_path in (audio_path, video_path):
                if temp_path and os.path.exists(temp_path):
                    os.remove(temp_path)

    def _download_audio_and_video(self, media_format, media_quality):
        '''
        Filters the required video stream & downloads it
        Filters the best quality audio stream of the same format & downloads it
        Returns the merged output path, or None when both pytube and yt-dlp failed
        '''
        try:
            return self.__pytube_audio_and_video(media_format, media_quality)
        except Exception as pytube_error:
            print(f"PyTube error in _download_audio_and_video: \n{pytube_error}\n")
            print('Trying with yt-dlp...')
        try:
            return self.__yt_dlp_video_download(media_format, media_quality)
        except Exception as yt_dlp_error:
            print(f"yt-dlp error in _download_audio_and_video: \n{yt_dlp_error}\n")
            return None

    def _download_media_chunk(self, media_type, media_format, media_quality):
        '''
        Media Download Handler Function:
        Checks which type of media download is required for particular chunk & passes it onto the relevant method

        Raises:
            ValueError: If media_type is neither 'audio' nor 'video'.
        '''
        if media_type == 'audio':
            return self._download_audio_chunk(media_format, media_quality)
        if media_type == 'video':
            return self._download_video_chunk(media_format, media_quality)
        raise ValueError(f"Unexpected Media Type Encountered: {media_type}")

    def _download_audio_chunk(self, audio_format, audio_quality):
        '''
        Extracts the audio with yt-dlp for the configured time range only
        Returns the output path, or None when the download failed
        '''
        try:
            # Getting Chunk Command String
            chunk_args = []
            if self.start_time:
                chunk_args.append(f"-ss {self.start_time}")
            if self.end_time:
                chunk_args.append(f"-to {self.end_time}")
            if not chunk_args:
                # No range configured: same as a full audio download
                return self._download_audio(audio_format, audio_quality)
            chunk_string = ' '.join(chunk_args)
            # Getting Quality Command String
            quality = str(self.__get_quality_int(audio_quality)) + 'K'
            # Getting Output Path
            output_path = os.path.join(self.output_path, f"{self.title}.{audio_format}")
            # Download Command
            command = [
                "yt-dlp",
                "-x", "--audio-format", audio_format,
                "--audio-quality", quality,
                "--external-downloader", "ffmpeg",
                "--external-downloader-args", chunk_string,
                "-o", output_path,
                self.url, "-q"
            ]
            subprocess.run(command, check=True)
            return output_path
        except Exception as e:
            print(f"Error in _download_audio_chunk: {e}")
            return None

    def _download_video_chunk(self, video_format, video_quality):
        '''
        Downloads the video with yt-dlp for the configured time range only
        Returns the output path, or None when the download failed
        '''
        try:
            # A missing bound becomes the start (0) or the end ("inf") of the
            # media, so the section spec never contains the text "None"
            section_start = self.start_time if self.start_time else 0
            section_end = self.end_time if self.end_time else 'inf'
            extra_args = ["--download-sections", f"*{section_start}-{section_end}"]
            return self.__yt_dlp_video_download(video_format, video_quality, extra_args)
        except Exception as e:
            print(f"Error in _download_video_chunk: {e}")
            return None