# whispertube_backend / media_download.py
# uzi007's picture
# Added UUID & Media Metadata
# 6459994
# raw
# history blame
# 21.3 kB
import os
import re
import json
import time
import locale
import subprocess
from yt_dlp import YoutubeDL
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from pytube import YouTube
class MediaDownloader(ABC):
    '''
    Abstract base class for media downloaders.

    Stores the download parameters, makes sure the output directory exists
    and declares the hooks a concrete downloader has to implement.
    '''

    def __init__(self, url, output_path, start_time=None, end_time=None):
        '''
        Stores the download parameters & creates the output directory

        url: address of the media to download
        output_path: output directory; it is joined onto the current working
                     directory, so an absolute path is used unchanged
        start_time: optional start of the chunk to download
        end_time: optional end of the chunk to download

        Raises FileExistsError if output_path exists and is not a directory
        '''
        self.url = url
        self.output_path = os.path.join(os.getcwd(), output_path)
        self.start_time = start_time
        self.end_time = end_time
        self.__create_output_dir()

    def __create_output_dir(self):
        '''
        Creates the output directory (and missing parents) if required
        '''
        # exist_ok avoids the check-then-create race of testing os.path.exists
        # first: another process may create the directory in between
        os.makedirs(self.output_path, exist_ok=True)

    @abstractmethod
    def _get_supported_media_formats(self):
        '''
        Hook: returns the media formats supported for the URL
        '''

    @abstractmethod
    def download(self, media_type, media_format, media_quality):
        '''
        Hook: public entry point for downloading the requested media
        '''

    @abstractmethod
    def _download_media(self, media_type, media_format, media_quality):
        '''
        Hook: downloads the requested media
        '''

    @abstractmethod
    def _download_audio(self, audio_format, audio_quality):
        '''
        Hook: downloads audio
        '''

    @abstractmethod
    def _download_video(self, video_format, video_quality):
        '''
        Hook: downloads video
        '''

    @abstractmethod
    def _download_audio_and_video(self, media_format, media_quality):
        '''
        Hook: downloads audio & video
        '''

    @abstractmethod
    def _download_media_chunk(self, media_type, media_format, media_quality):
        '''
        Hook: downloads a chunk of the requested media
        '''

    @abstractmethod
    def _download_audio_chunk(self, audio_format, audio_quality):
        '''
        Hook: downloads a chunk of audio
        '''

    @abstractmethod
    def _download_video_chunk(self, video_format, video_quality):
        '''
        Hook: downloads a chunk of video
        '''
class YoutubeDownloader(MediaDownloader):
    '''
    YouTube implementation of MediaDownloader.

    Video downloads are attempted with pytube first & fall back to the
    yt-dlp command line tool; audio & chunk downloads always use yt-dlp.
    Download methods return the output path, or None after printing the
    error when the download failed.
    '''

    # Columns of the streams table built by _build_media_formats
    _STREAM_COLUMNS = ['media_type', 'media_format', 'media_quality', 'progressive']

    # Audio options that are always offered; they are produced by yt-dlp's
    # audio extraction, not read from the stream listing
    _CUSTOM_AUDIO_STREAMS = (
        ('audio', 'mp3', '128kbps', False),
        ('audio', 'mp3', '256kbps', False),
        ('audio', 'wav', '1411kbps', False),
    )

    # (video codec, audio codec) requested from yt-dlp for each container
    _CODECS_BY_FORMAT = {
        'mp4': ('h264', 'm4a'),
        'webm': ('vp9', 'opus'),
    }

    def __init__(self, url, output_path, start_time=None, end_time=None):
        '''
        Loads the video through pytube & collects its metadata and formats
        '''
        super().__init__(url, output_path, start_time, end_time)
        self.youtube = YouTube(url)
        self.title = self.youtube.title
        self.media_length = self.youtube.length
        self.thumbnail_url = self.youtube.thumbnail_url
        self.streams = self.youtube.streams
        self.streams_df, self.media_formats_dict = self._get_supported_media_formats()
        self.num_likes, self.num_views = self._get_num_likes_views()

    def get_media_formats(self):
        '''
        Returns a dictionary for supported media formats
        '''
        return self.media_formats_dict

    def _get_num_likes_views(self):
        '''
        Returns the number of likes & views in the video
        Either value is None when yt-dlp does not report it
        '''
        with YoutubeDL() as ydl:
            info = ydl.extract_info(self.url, download=False)
            num_likes = info.get('like_count', None)
            num_views = info.get('view_count', None)
        return num_likes, num_views

    def get_media_metadata(self):
        '''
        Returns a dictionary for media metadata
        '''
        media_info = {
            'title': self.title,
            'num_likes': self.__format_number(self.num_likes),
            'num_views': self.__format_number(self.num_views),
            'media_length': self.media_length,
            'thumbnail_url': self.thumbnail_url
        }
        return media_info

    @staticmethod
    def __format_number(num):
        '''
        Returns the formatted number, or None when the number is unknown
        E.g: Given input 123456789, it returns 123,456,789
        '''
        # like_count / view_count may be missing from the yt-dlp info
        if num is None:
            return None
        # The format spec gives the documented comma grouping without calling
        # locale.setlocale, which changes process-wide state on every call
        return f"{int(num):,}"

    @staticmethod
    def __get_quality_int(media_quality):
        '''
        Returns the Quality in Integer, or None if it has no leading digits
        E.g: Given input 1080p, it returns 1080
        '''
        # A missing quality (e.g. None) has no integer value
        if not isinstance(media_quality, str):
            return None
        match = re.search(r'^\d+', media_quality)
        return int(match.group()) if match else None

    def _quality_sort_key(self, media_quality):
        '''
        Sort key for quality labels; unparseable labels sort first
        '''
        quality = self.__get_quality_int(media_quality)
        # None cannot be compared with integers while sorting
        return -1 if quality is None else quality

    def _file_name(self, extension, label=''):
        '''
        Returns the file name "<title><label>.<extension>"
        '''
        # A path separator in the title would be read as a directory
        safe_title = re.sub(r'[\\/]', '_', self.title)
        return f"{safe_title}{label}.{extension}"

    def _output_file(self, extension, label=''):
        '''
        Returns the path of "<title><label>.<extension>" in the output directory
        '''
        return os.path.join(self.output_path, self._file_name(extension, label))

    def _codecs_for(self, media_format):
        '''
        Returns the (video codec, audio codec) pair for a container format
        Raises ValueError for formats other than mp4 & webm
        '''
        try:
            return self._CODECS_BY_FORMAT[media_format]
        except KeyError:
            raise ValueError(f'Unexpected Video Format Encountered: {media_format}') from None

    def _build_media_formats(self, streams_details, drop_duplicates=False):
        '''
        Builds the streams dataframe & the nested options dictionary

        streams_details: rows of [media_type, media_format, media_quality, progressive]
        drop_duplicates: whether repeated rows are removed from the dataframe

        Returns (streams_df, media_formats_dict) where the dictionary maps
        media type -> media format -> qualities sorted by their integer value
        '''
        # Adding Custom Audio Streams after the listed streams
        rows = [list(row) for row in streams_details]
        rows.extend(list(row) for row in self._CUSTOM_AUDIO_STREAMS)
        streams_df = pd.DataFrame(rows, columns=self._STREAM_COLUMNS)
        if drop_duplicates:
            streams_df = streams_df.drop_duplicates().reset_index(drop=True)

        # Converting to Dictionary for Unique User Options
        media_formats_dict = dict()
        for media_type in sorted(streams_df['media_type'].unique()):
            media_type_df = streams_df[streams_df['media_type'] == media_type]
            media_formats_dict[media_type] = dict()
            for media_format in sorted(media_type_df['media_format'].unique()):
                media_format_df = media_type_df[media_type_df['media_format'] == media_format]
                media_qualities = sorted(media_format_df['media_quality'].unique(),
                                         key=self._quality_sort_key)
                media_formats_dict[media_type][media_format] = media_qualities
        return streams_df, media_formats_dict

    def _list_video_formats_with_yt_dlp(self):
        '''
        Returns the video stream rows parsed from "yt-dlp --list-formats"
        Raises RuntimeError when yt-dlp exits with a non-zero status
        '''
        # Download Command
        command = ["yt-dlp", "--list-formats", self.url,
                   "--get-filename", "--format", "best[ext=mp4]/best[ext=webm]"]
        print(' '.join(command))

        # stderr is captured as well so that it can be reported on failure
        completed_process = subprocess.run(command, text=True,
                                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if completed_process.returncode != 0:
            raise RuntimeError(completed_process.stderr)

        output_lines = [line for line in completed_process.stdout.split('\n') if line.strip()]
        streams_details = []
        for line in output_lines[2:]:  # Skip the header lines
            fields = line.split()
            if len(fields) < 2:
                continue
            media_format = fields[1]
            media_quality = fields[-2]
            # NOTE(review): relies on the table layout printed by yt-dlp, where the
            # second to last field looks like "1080p," for video rows - confirm
            if media_format in ('mp4', 'webm') and 'p,' in media_quality:
                streams_details.append(['video', media_format, media_quality[:-1], False])
        return streams_details

    def _get_supported_media_formats(self):
        '''
        Returns all supported media formats for both audio & video
        as (streams_df, media_formats_dict)

        Video streams are listed with pytube, then with yt-dlp if that fails.
        If both fail only the custom audio options are returned, so the
        caller can always unpack the result.
        '''
        try:
            # Collecting Video Streams' Details
            streams_details = [
                [stream.type, stream.mime_type.split('/')[1], stream.resolution, stream.is_progressive]
                for stream in self.streams.filter(only_video=True)
            ]
            return self._build_media_formats(streams_details)
        except Exception as pytube_error:
            print(f"PyTube Error in _get_supported_media_formats: \n{pytube_error}\n")
            print('Trying with yt-dlp...')

        try:
            streams_details = self._list_video_formats_with_yt_dlp()
        except Exception as yt_dlp_error:
            print(f"yt-dlp error in _get_supported_media_formats: \n{yt_dlp_error}\n")
            streams_details = []
        return self._build_media_formats(streams_details, drop_duplicates=True)

    @staticmethod
    def _prompt_choice(label, options):
        '''
        Asks the user to pick one of the options & returns the choice
        Raises ValueError if the answer is not one of the options
        '''
        choice = input(f'Select a {label} from {options}: ')
        # Explicit check: an assert would be stripped when running with -O
        if choice not in options:
            raise ValueError(f'{choice!r} is not one of {options}')
        return choice

    def select_media_format(self):
        '''
        For selecting media format to download
        Returns (media_type, media_format, media_quality) as typed by the user
        Raises ValueError if an answer is not one of the offered options
        '''
        print(json.dumps(self.media_formats_dict, indent=12))

        # Getting Media Type
        media_types = list(self.media_formats_dict.keys())
        media_type = self._prompt_choice('Media Type', media_types)

        # Getting Media Format
        media_formats = list(self.media_formats_dict[media_type].keys())
        media_format = self._prompt_choice('Media Format', media_formats)

        # Getting Media Quality
        media_qualities = self.media_formats_dict[media_type][media_format]
        media_quality = self._prompt_choice('Media Quality', media_qualities)

        return media_type, media_format, media_quality

    def download(self, media_type, media_format, media_quality):
        '''
        Download Handler Function:
        Handles all types of media download
        A chunk is downloaded when a start or end time was given
        '''
        if (self.start_time) or (self.end_time):
            output_path = self._download_media_chunk(media_type, media_format, media_quality)
        else:
            output_path = self._download_media(media_type, media_format, media_quality)
        return output_path

    def _download_media(self, media_type, media_format, media_quality):
        '''
        Media Download Handler Function:
        Checks which type of media download is required & passes it onto the relevant method
        Raises ValueError if media_type is neither 'audio' nor 'video'
        '''
        if media_type == 'audio':
            return self._download_audio(media_format, media_quality)
        if media_type != 'video':
            raise ValueError(f'Unexpected Media Type Encountered: {media_type}')

        # Checking if Progressive Video is Available in the Dataframe
        media_mask = (self.streams_df['media_type'] == media_type) & \
                     (self.streams_df['media_format'] == media_format) & \
                     (self.streams_df['media_quality'] == media_quality)
        is_progressive = bool(self.streams_df[media_mask]['progressive'].any())

        if is_progressive:
            return self._download_video(media_format, media_quality)
        return self._download_audio_and_video(media_format, media_quality)

    def _extract_audio_with_yt_dlp(self, audio_format, audio_quality, chunk_args=''):
        '''
        Extracts the audio with yt-dlp & returns the output path

        chunk_args: ffmpeg arguments limiting the download to a chunk, if any

        Raises ValueError for a quality without a leading number &
        subprocess.CalledProcessError when yt-dlp fails
        '''
        # Getting Quality Command String
        bitrate = self.__get_quality_int(audio_quality)
        if bitrate is None:
            raise ValueError(f'Unexpected Audio Quality Encountered: {audio_quality}')
        quality = f"{bitrate}K"

        # Getting Output Path
        output_path = self._output_file(audio_format)

        # Download Command
        command = [
            "yt-dlp",
            "-x", "--audio-format", audio_format,
            "--audio-quality", quality,
        ]
        if chunk_args:
            command += [
                "--external-downloader", "ffmpeg",
                "--external-downloader-args", chunk_args,
            ]
        command += ["-o", output_path, self.url, "-q"]

        # check=True turns a failed download into an exception instead of
        # returning the path of a file that was never written
        subprocess.run(command, check=True)
        return output_path

    def _download_with_yt_dlp(self, media_format, media_quality, extra_args=()):
        '''
        Downloads the video with its audio using yt-dlp & returns the output path

        extra_args: additional yt-dlp options placed before the output option

        Raises ValueError for an unsupported format or quality &
        subprocess.CalledProcessError when yt-dlp fails
        '''
        # Getting Output Path
        output_path = self._output_file(media_format)

        # Getting Video Quality Integer
        resolution = self.__get_quality_int(media_quality)
        if resolution is None:
            raise ValueError(f'Unexpected Video Quality Encountered: {media_quality}')

        # Setting Formats
        video_codec, audio_codec = self._codecs_for(media_format)

        # Download Command
        command = [
            "yt-dlp",
            self.url,
            "-S", f"res:{resolution},vcodec:{video_codec},acodec:{audio_codec}",
            "--merge-output-format", media_format,
            *extra_args,
            "-o", f"{output_path}",
            "-q"
        ]
        print(' '.join(command))

        # Running the command using Subprocess
        subprocess.run(command, check=True)
        return output_path

    def _download_audio(self, audio_format, audio_quality):
        '''
        Downloads the audio in the required format & quality using yt-dlp
        '''
        try:
            return self._extract_audio_with_yt_dlp(audio_format, audio_quality)
        except Exception as yt_dlp_error:
            print(f"Error in _download_audio: \n{yt_dlp_error}\n")
            return None

    def _download_video(self, video_format, video_quality):
        '''
        Filters the required video stream & downloads it
        Only for Progressive media i.e containing both audio & video streams
        '''
        try:
            stream = self.streams.filter(progressive=True, file_extension=video_format,
                                         resolution=video_quality).first()
            print(stream)
            return stream.download(output_path=self.output_path,
                                   filename=self._file_name(video_format))
        except Exception as pytube_error:
            print(f"PyTube error in _download_video: \n{pytube_error}\n")
            print('Trying with yt-dlp...')

        try:
            return self._download_with_yt_dlp(video_format, video_quality)
        except Exception as yt_dlp_error:
            print(f"yt-dlp error in _download_video: \n{yt_dlp_error}\n")
            return None

    def _download_and_merge_with_pytube(self, media_format, media_quality):
        '''
        Downloads the audio & video streams with pytube, merges them with
        ffmpeg & returns the output path
        The separate audio & video files are removed afterwards, even on failure
        '''
        part_paths = []
        try:
            # Downloading Audio
            audio_stream = self.streams.filter(file_extension=media_format,
                                               only_audio=True).order_by('abr').desc().first()
            print(audio_stream)
            audio_path = audio_stream.download(output_path=self.output_path,
                                               filename=self._file_name(media_format, ' - Audio'))
            part_paths.append(audio_path)

            # Downloading Video
            video_stream = self.streams.filter(file_extension=media_format,
                                               resolution=media_quality).first()
            print(video_stream)
            video_path = video_stream.download(output_path=self.output_path,
                                               filename=self._file_name(media_format, ' - Video'))
            part_paths.append(video_path)

            # Combining the Audio & Video Files using FFMPEG Command
            # -loglevel goes before the inputs (options after the output file
            # are trailing options) & -y avoids waiting on an overwrite prompt
            # that is not even visible with quiet logging
            output_path = self._output_file(media_format)
            command = ['ffmpeg', '-y', '-loglevel', 'quiet',
                       '-i', video_path, '-i', audio_path,
                       '-c:v', 'copy', '-c:a', 'copy', output_path]
            subprocess.run(command, check=True)
            return output_path
        finally:
            for part_path in part_paths:
                if os.path.exists(part_path):
                    os.remove(part_path)

    def _download_audio_and_video(self, media_format, media_quality):
        '''
        Filters the required video stream & downloads it
        Filters the best quality audio stream of the same format & downloads it
        Both are merged into a single file
        '''
        try:
            return self._download_and_merge_with_pytube(media_format, media_quality)
        except Exception as pytube_error:
            print(f"PyTube error in _download_audio_and_video: \n{pytube_error}\n")
            print('Trying with yt-dlp...')

        try:
            return self._download_with_yt_dlp(media_format, media_quality)
        except Exception as yt_dlp_error:
            print(f"yt-dlp error in _download_audio_and_video: \n{yt_dlp_error}\n")
            return None

    def _download_media_chunk(self, media_type, media_format, media_quality):
        '''
        Media Download Handler Function:
        Checks which type of media download is required for particular chunk & passes it onto the relevant method
        Raises ValueError if media_type is neither 'audio' nor 'video'
        '''
        if media_type == 'audio':
            return self._download_audio_chunk(media_format, media_quality)
        if media_type == 'video':
            return self._download_video_chunk(media_format, media_quality)
        raise ValueError(f'Unexpected Media Type Encountered: {media_type}')

    def _ffmpeg_chunk_args(self):
        '''
        Returns the ffmpeg "-ss"/"-to" arguments for the chunk
        Empty when neither a start nor an end time is set
        '''
        chunk_args = []
        if self.start_time:
            chunk_args.append(f"-ss {self.start_time}")
        if self.end_time:
            chunk_args.append(f"-to {self.end_time}")
        return ' '.join(chunk_args)

    def _download_section(self):
        '''
        Returns the chunk as a yt-dlp "--download-sections" time range
        '''
        # A missing bound must not be rendered as the text "None"
        start_time = self.start_time if self.start_time else 0
        end_time = self.end_time if self.end_time else 'inf'
        return f"*{start_time}-{end_time}"

    def _download_audio_chunk(self, audio_format, audio_quality):
        '''
        Downloads the audio for particular chunk using yt-dlp
        '''
        try:
            return self._extract_audio_with_yt_dlp(audio_format, audio_quality,
                                                   chunk_args=self._ffmpeg_chunk_args())
        except Exception as e:
            print(f"Error in _download_audio_chunk: {e}")
            return None

    def _download_video_chunk(self, video_format, video_quality):
        '''
        Downloads the video for particular chunk using yt-dlp
        '''
        try:
            section_args = ["--download-sections", self._download_section()]
            return self._download_with_yt_dlp(video_format, video_quality,
                                              extra_args=section_args)
        except Exception as e:
            print(f"Error in _download_video_chunk: {e}")
            return None