Spaces:
Runtime error
Runtime error
import os | |
import re | |
import json | |
import time | |
import subprocess | |
import numpy as np | |
import pandas as pd | |
from abc import ABC, abstractmethod | |
from pytube import YouTube | |
class MediaDownloader(ABC):
    """Base class for media downloaders.

    Stores the source URL, the (absolute) output directory and an optional
    start/end time range, and makes sure the output directory exists.
    Subclasses override the download hooks below; the base implementations
    do nothing and return ``None``.
    """

    def __init__(self, url, output_path, start_time=None, end_time=None):
        """Record the download settings and create the output directory.

        Args:
            url: Location of the media to download.
            output_path: Output directory. It is joined onto the current
                working directory, so an absolute path is used unchanged.
            start_time: Optional start of the range to download.
            end_time: Optional end of the range to download.
        """
        self.url = url
        self.output_path = os.path.join(os.getcwd(), output_path)
        self.start_time = start_time
        self.end_time = end_time
        self.__create_output_dir()

    def __create_output_dir(self):
        """Create the output directory (and any missing parents)."""
        # exist_ok avoids the check-then-create race of testing
        # os.path.exists() first; it raises FileExistsError only when the
        # path exists and is not a directory.
        os.makedirs(self.output_path, exist_ok=True)

    def _get_supported_media_formats(self):
        """Hook: return the media formats supported for this source."""

    def download(self, media_type, media_format, media_quality):
        """Hook: download the requested media and return its output path."""

    def _download_media(self, media_type, media_format, media_quality):
        """Hook: download the complete media."""

    def _download_audio(self, audio_format, audio_quality):
        """Hook: download the complete audio."""

    def _download_video(self, video_format, video_quality):
        """Hook: download the complete video."""

    def _download_audio_and_video(self, media_format, media_quality):
        """Hook: download audio and video separately and combine them."""

    def _download_media_chunk(self, media_type, media_format, media_quality):
        """Hook: download only the configured time range of the media."""

    def _download_audio_chunk(self, audio_format, audio_quality):
        """Hook: download only the configured time range of the audio."""

    def _download_video_chunk(self, video_format, video_quality):
        """Hook: download only the configured time range of the video."""
class YoutubeDownloader(MediaDownloader):
    """Downloader for YouTube media.

    Stream metadata comes from ``pytube``; audio downloads and time-range
    ("chunk") downloads are delegated to the ``yt-dlp`` command line tool,
    and separately downloaded audio/video files are merged with ``ffmpeg``.
    """

    def __init__(self, url, output_path, start_time=None, end_time=None):
        """Load the video's metadata and build the table of offered formats.

        Args:
            url: YouTube video URL.
            output_path: Output directory (see ``MediaDownloader``).
            start_time: Optional start of the range to download.
            end_time: Optional end of the range to download.
        """
        super().__init__(url, output_path, start_time, end_time)
        self.youtube = YouTube(url)
        self.title = self.youtube.title
        self.media_length = self.youtube.length
        self.thumbnail_url = self.youtube.thumbnail_url
        self.streams = self.youtube.streams
        self.streams_df, self.media_formats_dict = self._get_supported_media_formats()

    def __get_quality_int(self, media_quality):
        '''
        Returns the Quality in Integer
        E.g: Given input 1080p, it returns 1080
        Returns None when the string does not start with digits
        '''
        match = re.search(r'^\d+', media_quality)
        return int(match.group()) if match else None

    def _get_supported_media_formats(self):
        '''
        Returns all supported media formats for both audio & video

        Returns:
            tuple: (streams_df, media_formats_dict) where streams_df has one
            row per offered stream and media_formats_dict maps
            media_type -> media_format -> list of qualities sorted by their
            leading integer.
        '''
        # Creating Pandas Dataframe for Video Streams' Details
        streams_details = [
            [stream.type, stream.mime_type.split('/')[1], stream.resolution, stream.is_progressive]
            for stream in self.streams.filter(only_video=True)
        ]
        cols = ['media_type', 'media_format', 'media_quality', 'progressive']
        streams_df = pd.DataFrame(streams_details, columns=cols)

        # Adding Custom Audio Streams
        streams_df.loc[len(streams_df)] = ['audio', 'mp3', '128kbps', False]
        streams_df.loc[len(streams_df)] = ['audio', 'mp3', '256kbps', False]
        streams_df.loc[len(streams_df)] = ['audio', 'wav', '1411kbps', False]

        # Converting to Dictionary for Unique User Options
        media_formats_dict = dict()
        for media_type in sorted(streams_df['media_type'].unique()):
            media_formats_dict[media_type] = dict()
            media_type_df = streams_df[streams_df['media_type'] == media_type]
            for media_format in sorted(media_type_df['media_format'].unique()):
                media_format_df = media_type_df[media_type_df['media_format'] == media_format]
                media_qualities = sorted(media_format_df['media_quality'].unique(),
                                         key=self.__get_quality_int)
                media_formats_dict[media_type][media_format] = media_qualities
        return streams_df, media_formats_dict

    def get_media_formats(self):
        '''
        Returns a dictionary for supported media formats
        '''
        return self.media_formats_dict

    def _select_media_format(self):
        '''
        For selecting media format to download (interactive, reads from stdin)

        Returns:
            tuple: (media_type, media_format, media_quality) as typed by the user.
        Raises:
            AssertionError: if an answer is not one of the offered options.
        '''
        print(json.dumps(self.media_formats_dict, indent=12))

        # Getting Media Type
        media_types = list(self.media_formats_dict.keys())
        media_type = input(f'Select a Media Type from {media_types}: ')
        assert(media_type in media_types)

        # Getting Media Format
        media_formats = list(self.media_formats_dict[media_type].keys())
        media_format = input(f'Select a Media Format from {media_formats}: ')
        assert(media_format in media_formats)

        # Getting Media Quality
        media_qualities = self.media_formats_dict[media_type][media_format]
        media_quality = input(f'Select a Media Quality from {media_qualities}: ')
        assert(media_quality in media_qualities)

        return media_type, media_format, media_quality

    def download(self, media_type, media_format, media_quality):
        '''
        Download Handler Function:
        Handles all types of media download

        A chunk download is used when start_time or end_time is set (truthy),
        otherwise the complete media is downloaded.

        Returns:
            str: Path of the downloaded file.
        '''
        if (self.start_time) or (self.end_time):
            return self._download_media_chunk(media_type, media_format, media_quality)
        return self._download_media(media_type, media_format, media_quality)

    def _download_media(self, media_type, media_format, media_quality):
        '''
        Media Download Handler Function:
        Checks which type of media download is required & passes it onto the relevant method

        Raises:
            ValueError: if media_type is neither 'audio' nor 'video'.
        '''
        if media_type == 'audio':
            return self._download_audio(media_format, media_quality)

        if media_type == 'video':
            # Checking for the Media in Dataframe
            media_mask = (self.streams_df['media_type'] == media_type) & \
                         (self.streams_df['media_format'] == media_format) & \
                         (self.streams_df['media_quality'] == media_quality)
            media_df = self.streams_df[media_mask]

            # Checking if Progressive Video is Available
            if bool(media_df['progressive'].any()):
                return self._download_video(media_format, media_quality)
            return self._download_audio_and_video(media_format, media_quality)

        # Previously this fell through to an unbound local variable.
        raise ValueError(f"Unsupported media type: {media_type!r}")

    def _audio_command(self, audio_format, audio_quality, output_path, extra_args=()):
        '''
        Builds the yt-dlp command line for an audio extraction.
        extra_args are inserted before the output option.
        '''
        # Getting Quality Command String, e.g. '128kbps' -> '128K'
        quality = str(self.__get_quality_int(audio_quality)) + 'K'
        return [
            "yt-dlp",
            "-x", "--audio-format", audio_format,
            "--audio-quality", quality,
            *extra_args,
            "-o", output_path,
            self.url, "-q"
        ]

    def _ffmpeg_chunk_args(self):
        '''
        Builds the ffmpeg seek arguments ("-ss ..." / "-to ...") for the
        configured time range.

        Raises:
            ValueError: if neither start_time nor end_time is set.
        '''
        parts = []
        if self.start_time:
            parts.append(f"-ss {self.start_time}")
        if self.end_time:
            parts.append(f"-to {self.end_time}")
        if not parts:
            raise ValueError("A chunk download requires start_time and/or end_time")
        return " ".join(parts)

    def _download_audio(self, audio_format, audio_quality):
        '''
        Downloads the audio in the requested format & quality using yt-dlp

        Returns:
            str: Path passed to yt-dlp as the output file.
        '''
        # Getting Output Path
        output_path = os.path.join(self.output_path, f"{self.title}.{audio_format}")

        # Running the command using Subprocess
        subprocess.run(self._audio_command(audio_format, audio_quality, output_path))
        return output_path

    def _download_video(self, video_format, video_quality):
        '''
        Filters the required video stream & downloads it
        Only for Progressive media i.e containing both audio & video streams

        Raises:
            ValueError: if no matching progressive stream exists.
        '''
        stream = self.streams.filter(progressive=True, file_extension=video_format,
                                     resolution=video_quality).first()
        if stream is None:
            raise ValueError(f"No progressive {video_format} stream at {video_quality}")
        print(stream)
        video_path = stream.download(output_path=self.output_path,
                                     filename=f"{self.title}.{video_format}")
        return video_path

    def _download_audio_and_video(self, media_format, media_quality):
        '''
        Filters the required video stream & downloads it
        Filters the best quality audio stream of the same format & downloads it
        Then merges both with ffmpeg (stream copy) & removes the two partial files

        Raises:
            ValueError: if the audio or the video stream cannot be found.
        '''
        # Downloading Audio
        audio_stream = self.streams.filter(file_extension=media_format,
                                           only_audio=True).order_by('abr').desc().first()
        if audio_stream is None:
            raise ValueError(f"No {media_format} audio stream available")
        print(audio_stream)
        audio_filename = f"{self.title} - Audio.{media_format}"
        audio_path = audio_stream.download(output_path=self.output_path, filename=audio_filename)

        # Downloading Video
        video_stream = self.streams.filter(file_extension=media_format,
                                           resolution=media_quality).first()
        if video_stream is None:
            raise ValueError(f"No {media_format} video stream at {media_quality}")
        print(video_stream)
        video_filename = f"{self.title} - Video.{media_format}"
        video_path = video_stream.download(output_path=self.output_path, filename=video_filename)

        # Combining the Audio & Video Files using FFMPEG Command
        output_path = os.path.join(self.output_path, f"{self.title}.{media_format}")
        command = ['ffmpeg', '-i', video_path, '-i', audio_path,
                   '-c:v', 'copy', '-c:a', 'copy', output_path,
                   '-loglevel', 'quiet']
        # NOTE(review): the exit status is not checked, so the partial files
        # are removed even if the merge failed - confirm this is intended.
        subprocess.run(command)
        os.remove(audio_path)
        os.remove(video_path)
        return output_path

    def _download_media_chunk(self, media_type, media_format, media_quality):
        '''
        Media Download Handler Function:
        Checks which type of media download is required for particular chunk & passes it onto the relevant method

        Raises:
            ValueError: if media_type is neither 'audio' nor 'video'.
        '''
        if media_type == 'audio':
            return self._download_audio_chunk(media_format, media_quality)
        if media_type == 'video':
            return self._download_video_chunk(media_format, media_quality)
        # Previously this fell through to an unbound local variable.
        raise ValueError(f"Unsupported media type: {media_type!r}")

    def _download_audio_chunk(self, audio_format, audio_quality):
        '''
        Downloads the audio for the configured time range using yt-dlp with
        ffmpeg as the external downloader

        Returns:
            str: Path passed to yt-dlp as the output file.
        '''
        # Getting Chunk Command String
        chunk_string = self._ffmpeg_chunk_args()

        # Getting Output Path
        output_path = os.path.join(self.output_path, f"{self.title}.{audio_format}")

        # Download Command (uses self.url; the bare name `url` was undefined here)
        command = self._audio_command(
            audio_format, audio_quality, output_path,
            extra_args=("--external-downloader", "ffmpeg",
                        "--external-downloader-args", chunk_string),
        )

        # Running the command using Subprocess
        subprocess.run(command)
        return output_path

    def _download_video_chunk(self, video_format, video_quality):
        '''
        Downloads the video for the configured time range using yt-dlp

        Returns:
            str: Path passed to yt-dlp as the output file.
        Raises:
            ValueError: if video_format is not 'mp4' or 'webm'.
        '''
        # Preferred codecs per container
        # NOTE(review): 'm4a' looks like a container extension rather than an
        # audio codec name for yt-dlp's "acodec" sort field - confirm.
        if video_format == 'mp4':
            video_codec = "h264"
            audio_codec = "m4a"
        elif video_format == 'webm':
            video_codec = "vp9"
            audio_codec = "opus"
        else:
            # os.exit() does not exist; fail with a clear error instead.
            raise ValueError(f"Unexpected Video Format Encountered: {video_format}")

        # Getting Output Path
        output_path = os.path.join(self.output_path, f"{self.title}.{video_format}")

        # Getting Video Quality Integer
        video_quality = self.__get_quality_int(video_quality)

        # An open-ended range must not be rendered as the literal "None":
        # a missing start becomes 0 and a missing end becomes "inf".
        section_start = self.start_time if self.start_time else 0
        section_end = self.end_time if self.end_time else "inf"

        # Download Command
        command = [
            "yt-dlp",
            self.url,
            "-S", f"res:{video_quality},vcodec:{video_codec},acodec:{audio_codec}",
            "--merge-output-format", video_format,
            "--download-sections", f"*{section_start}-{section_end}",
            "-o", f"{output_path}",
        ]
        print(' '.join(command))

        # Running the command using Subprocess
        subprocess.run(command)
        return output_path

    def get_media_info(self):
        '''
        Returns the title, length, thumbnail URL & supported formats of the media
        '''
        media_info = {
            'title': self.title,
            'media_length': self.media_length,
            'thumbnail_url': self.thumbnail_url,
            'formats': self.media_formats_dict
        }
        return media_info
def extract_audio(video_path):
    """
    Pull the audio track out of a video file and write it next to the
    input as an MP3, using the ffmpeg command line tool.

    Args:
        video_path (str): Path to the input video file.
    Returns:
        str: Path of the MP3 file (input path with its extension replaced
        by ``.mp3``), or None when ffmpeg exits with a non-zero status
        (the error is printed).
    """
    stem, ext = os.path.splitext(video_path)
    mp3_path = stem + '.mp3'

    # Encoder name handed to ffmpeg's -acodec option
    if ext.lower() in (".mp4", ".webm"):
        encoder = "libmp3lame"
    else:
        encoder = "mp3"

    # -vn drops the video stream so only audio is written
    ffmpeg_args = ["ffmpeg", "-i", video_path, "-vn", "-acodec",
                   encoder, mp3_path, '-loglevel', 'quiet']
    try:
        subprocess.run(ffmpeg_args, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error: {e}")
        return None
    return mp3_path
def burn_subtitles(video_file_path, subtitle_file_path):
    '''
    Renders a subtitle file onto a video with ffmpeg's "subtitles" filter

    The result is written next to the input video; its name is the video
    name followed by the subtitle extension with the dot replaced by an
    underscore (e.g. "movie.mp4" + "subs.srt" -> "movie_srt.mp4").

    Args:
        video_file_path (str): Path to the input video file.
        subtitle_file_path (str): Path to the subtitle file.
    Returns:
        str: Path of output video with subtitles, or None when ffmpeg exits
        with a non-zero status (the error is printed).
    '''
    video_stem, video_ext = os.path.splitext(video_file_path)
    subtitle_ext = os.path.splitext(subtitle_file_path)[1]
    burned_path = f"{video_stem}{subtitle_ext.replace('.', '_')}{video_ext}"

    # NOTE(review): the subtitle path is placed into the filter string
    # as-is - confirm callers never pass paths needing filter escaping.
    ffmpeg_args = [
        'ffmpeg', '-i', video_file_path,
        '-vf', f'subtitles={subtitle_file_path}',
        burned_path, '-loglevel', 'quiet',
    ]
    try:
        subprocess.run(ffmpeg_args, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error: {e}")
        return None
    return burned_path