CLI / ffmpeg tests
Browse files- pytube/cli.py +42 -51
- tests/test_cli.py +28 -19
pytube/cli.py
CHANGED
@@ -1,4 +1,5 @@
|
|
1 |
#!/usr/bin/env python3
|
|
|
2 |
"""A simple command line application to download youtube videos."""
|
3 |
|
4 |
import argparse
|
@@ -226,9 +227,7 @@ def _download(
|
|
226 |
sys.stdout.write("\n")
|
227 |
|
228 |
|
229 |
-
def
|
230 |
-
base: str, subtype: Optional[str], video_audio: str, target: str
|
231 |
-
) -> str:
|
232 |
"""
|
233 |
Given a base name, the file format, and the target directory, will generate
|
234 |
a filename unique for that directory and file format.
|
@@ -236,15 +235,17 @@ def unique_name(
|
|
236 |
The given base-name.
|
237 |
:param str subtype:
|
238 |
The filetype of the video which will be downloaded.
|
|
|
|
|
239 |
:param Path target:
|
240 |
Target directory for download.
|
241 |
"""
|
242 |
counter = 0
|
243 |
while True:
|
244 |
-
|
245 |
-
|
246 |
-
if not os.path.exists(
|
247 |
-
return
|
248 |
counter += 1
|
249 |
|
250 |
|
@@ -252,7 +253,7 @@ def ffmpeg_process(
|
|
252 |
youtube: YouTube, resolution: str, target: Optional[str] = None
|
253 |
) -> None:
|
254 |
"""
|
255 |
-
Decides the correct video stream to download, then calls
|
256 |
|
257 |
:param YouTube youtube:
|
258 |
A valid YouTube object.
|
@@ -262,79 +263,69 @@ def ffmpeg_process(
|
|
262 |
Target directory for download
|
263 |
"""
|
264 |
youtube.register_on_progress_callback(on_progress)
|
265 |
-
|
266 |
-
target = os.getcwd()
|
267 |
|
268 |
if resolution == "best":
|
269 |
-
highest_quality = (
|
270 |
-
youtube.streams.filter(progressive=False)
|
271 |
-
.order_by("resolution")
|
272 |
-
.desc()
|
273 |
-
.first()
|
274 |
-
)
|
275 |
-
|
276 |
video_stream = (
|
277 |
-
youtube.streams.filter(progressive=False
|
278 |
.order_by("resolution")
|
279 |
.desc()
|
280 |
.first()
|
281 |
)
|
282 |
-
|
283 |
-
if highest_quality.resolution == video_stream.resolution:
|
284 |
-
ffmpeg_downloader(youtube=youtube, stream=video_stream, target=target)
|
285 |
-
else:
|
286 |
-
ffmpeg_downloader(youtube=youtube, stream=highest_quality, target=target)
|
287 |
else:
|
288 |
video_stream = youtube.streams.filter(
|
289 |
progressive=False, resolution=resolution, subtype="mp4"
|
290 |
).first()
|
291 |
-
if
|
292 |
-
ffmpeg_downloader(youtube=youtube, stream=video_stream, target=target)
|
293 |
-
else:
|
294 |
video_stream = youtube.streams.filter(
|
295 |
progressive=False, resolution=resolution
|
296 |
).first()
|
297 |
-
|
298 |
-
|
299 |
-
|
300 |
-
|
301 |
-
|
302 |
-
ffmpeg_downloader(youtube=youtube, stream=video_stream, target=target)
|
303 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
304 |
|
305 |
-
|
|
|
306 |
"""
|
307 |
Given a YouTube Stream object, finds the correct audio stream, downloads them both
|
308 |
giving them a unique name, them uses ffmpeg to create a new file with the audio
|
309 |
and video from the previously downloaded files. Then deletes the original adaptive
|
310 |
streams, leaving the combination.
|
311 |
|
312 |
-
:param
|
313 |
-
A valid
|
314 |
-
:param Stream
|
315 |
-
A valid Stream object
|
316 |
:param Path target:
|
317 |
A valid Path object
|
318 |
"""
|
319 |
-
audio_stream = (
|
320 |
-
youtube.streams.filter(only_audio=True, subtype=stream.subtype)
|
321 |
-
.order_by("abr")
|
322 |
-
.desc()
|
323 |
-
.first()
|
324 |
-
)
|
325 |
|
326 |
-
video_unique_name =
|
327 |
-
safe_filename(
|
328 |
)
|
329 |
-
audio_unique_name =
|
330 |
-
safe_filename(
|
331 |
)
|
332 |
-
_download(stream=
|
333 |
_download(stream=audio_stream, target=target, filename=audio_unique_name)
|
334 |
|
335 |
-
video_path = os.path.join(target,
|
336 |
-
audio_path = os.path.join(target,
|
337 |
-
final_path = os.path.join(
|
|
|
|
|
338 |
|
339 |
subprocess.run( # nosec
|
340 |
[
|
|
|
1 |
#!/usr/bin/env python3
|
2 |
+
# -*- coding: utf-8 -*-
|
3 |
"""A simple command line application to download youtube videos."""
|
4 |
|
5 |
import argparse
|
|
|
227 |
sys.stdout.write("\n")
|
228 |
|
229 |
|
230 |
+
def _unique_name(base: str, subtype: str, media_type: str, target: str) -> str:
|
|
|
|
|
231 |
"""
|
232 |
Given a base name, the file format, and the target directory, will generate
|
233 |
a filename unique for that directory and file format.
|
|
|
235 |
The given base-name.
|
236 |
:param str subtype:
|
237 |
The filetype of the video which will be downloaded.
|
238 |
+
:param str media_type:
|
239 |
+
The media_type of the file, ie. "audio" or "video"
|
240 |
:param Path target:
|
241 |
Target directory for download.
|
242 |
"""
|
243 |
counter = 0
|
244 |
while True:
|
245 |
+
file_name = f"{base}_{media_type}_{counter}.{subtype}"
|
246 |
+
file_path = os.path.join(target, file_name)
|
247 |
+
if not os.path.exists(file_path):
|
248 |
+
return file_path
|
249 |
counter += 1
|
250 |
|
251 |
|
|
|
253 |
youtube: YouTube, resolution: str, target: Optional[str] = None
|
254 |
) -> None:
|
255 |
"""
|
256 |
+
Decides the correct video stream to download, then calls _ffmpeg_downloader.
|
257 |
|
258 |
:param YouTube youtube:
|
259 |
A valid YouTube object.
|
|
|
263 |
Target directory for download
|
264 |
"""
|
265 |
youtube.register_on_progress_callback(on_progress)
|
266 |
+
target = target or os.getcwd()
|
|
|
267 |
|
268 |
if resolution == "best":
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
269 |
video_stream = (
|
270 |
+
youtube.streams.filter(progressive=False)
|
271 |
.order_by("resolution")
|
272 |
.desc()
|
273 |
.first()
|
274 |
)
|
|
|
|
|
|
|
|
|
|
|
275 |
else:
|
276 |
video_stream = youtube.streams.filter(
|
277 |
progressive=False, resolution=resolution, subtype="mp4"
|
278 |
).first()
|
279 |
+
if not video_stream:
|
|
|
|
|
280 |
video_stream = youtube.streams.filter(
|
281 |
progressive=False, resolution=resolution
|
282 |
).first()
|
283 |
+
if video_stream is None:
|
284 |
+
print(f"Could not find a stream with resolution: {resolution}")
|
285 |
+
print("Try one of these:")
|
286 |
+
display_streams(youtube)
|
287 |
+
sys.exit()
|
|
|
288 |
|
289 |
+
audio_stream = youtube.streams.get_audio_only(video_stream.subtype)
|
290 |
+
if not audio_stream:
|
291 |
+
audio_stream = youtube.streams.filter(only_audio=True).order_by("abr").last()
|
292 |
+
if not audio_stream:
|
293 |
+
print("Could not find an audio only stream")
|
294 |
+
sys.exit()
|
295 |
+
_ffmpeg_downloader(
|
296 |
+
audio_stream=audio_stream, video_stream=video_stream, target=target
|
297 |
+
)
|
298 |
|
299 |
+
|
300 |
+
def _ffmpeg_downloader(audio_stream: Stream, video_stream: Stream, target: str) -> None:
|
301 |
"""
|
302 |
Given a YouTube Stream object, finds the correct audio stream, downloads them both
|
303 |
giving them a unique name, them uses ffmpeg to create a new file with the audio
|
304 |
and video from the previously downloaded files. Then deletes the original adaptive
|
305 |
streams, leaving the combination.
|
306 |
|
307 |
+
:param Stream audio_stream:
|
308 |
+
A valid Stream object representing the audio to download
|
309 |
+
:param Stream video_stream:
|
310 |
+
A valid Stream object representing the video to download
|
311 |
:param Path target:
|
312 |
A valid Path object
|
313 |
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
314 |
|
315 |
+
video_unique_name = _unique_name(
|
316 |
+
safe_filename(video_stream.title), video_stream.subtype, "video", target=target
|
317 |
)
|
318 |
+
audio_unique_name = _unique_name(
|
319 |
+
safe_filename(video_stream.title), audio_stream.subtype, "audio", target=target
|
320 |
)
|
321 |
+
_download(stream=video_stream, target=target, filename=video_unique_name)
|
322 |
_download(stream=audio_stream, target=target, filename=audio_unique_name)
|
323 |
|
324 |
+
video_path = os.path.join(target, video_unique_name)
|
325 |
+
audio_path = os.path.join(target, audio_unique_name)
|
326 |
+
final_path = os.path.join(
|
327 |
+
target, f"{safe_filename(video_stream.title)}.{video_stream.subtype}"
|
328 |
+
)
|
329 |
|
330 |
subprocess.run( # nosec
|
331 |
[
|
tests/test_cli.py
CHANGED
@@ -220,17 +220,37 @@ def test_download_by_resolution_not_exists(youtube, stream_query):
|
|
220 |
|
221 |
|
222 |
@mock.patch("pytube.cli.YouTube")
|
223 |
-
@mock.patch("pytube.cli.
|
224 |
-
def
|
|
|
225 |
parser = argparse.ArgumentParser()
|
226 |
args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-f", "best"])
|
227 |
cli._parse_args = MagicMock(return_value=args)
|
228 |
-
|
229 |
-
cli.
|
230 |
-
|
231 |
-
|
232 |
-
|
233 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
234 |
|
235 |
|
236 |
@mock.patch("pytube.cli.YouTube.__init__", return_value=None)
|
@@ -244,17 +264,6 @@ def test_download_audio(youtube):
|
|
244 |
cli.download_audio.assert_called()
|
245 |
|
246 |
|
247 |
-
@mock.patch("pytube.cli.YouTube.__init__", return_value=None)
|
248 |
-
def test_ffmpeg_process(youtube):
|
249 |
-
parser = argparse.ArgumentParser()
|
250 |
-
args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-f", "2160p"])
|
251 |
-
cli._parse_args = MagicMock(return_value=args)
|
252 |
-
cli.ffmpeg_process = MagicMock()
|
253 |
-
cli.main()
|
254 |
-
youtube.assert_called()
|
255 |
-
cli.ffmpeg_process.assert_called()
|
256 |
-
|
257 |
-
|
258 |
@mock.patch("pytube.cli.YouTube.__init__", return_value=None)
|
259 |
def test_perform_args_on_youtube(youtube):
|
260 |
parser = argparse.ArgumentParser()
|
|
|
220 |
|
221 |
|
222 |
@mock.patch("pytube.cli.YouTube")
|
223 |
+
@mock.patch("pytube.cli.ffmpeg_process")
|
224 |
+
def test_perform_args_should_ffmpeg_process(ffmpeg_process, youtube):
|
225 |
+
# Given
|
226 |
parser = argparse.ArgumentParser()
|
227 |
args = parse_args(parser, ["http://youtube.com/watch?v=9bZkp7q19f0", "-f", "best"])
|
228 |
cli._parse_args = MagicMock(return_value=args)
|
229 |
+
# When
|
230 |
+
cli._perform_args_on_youtube(youtube, args)
|
231 |
+
# Then
|
232 |
+
ffmpeg_process.assert_called_with(youtube=youtube, resolution="best", target=None)
|
233 |
+
|
234 |
+
|
235 |
+
@mock.patch("pytube.cli.YouTube")
|
236 |
+
@mock.patch("pytube.cli._ffmpeg_downloader")
|
237 |
+
def test_ffmpeg_process_best_should_download(_ffmpeg_downloader, youtube):
|
238 |
+
# Given
|
239 |
+
target = "/target"
|
240 |
+
streams = MagicMock()
|
241 |
+
youtube.streams = streams
|
242 |
+
video_stream = MagicMock()
|
243 |
+
streams.filter.return_value.order_by.return_value.desc.return_value.first.return_value = (
|
244 |
+
video_stream
|
245 |
+
)
|
246 |
+
audio_stream = MagicMock()
|
247 |
+
streams.get_audio_only.return_value = audio_stream
|
248 |
+
# When
|
249 |
+
cli.ffmpeg_process(youtube, "best", target)
|
250 |
+
# Then
|
251 |
+
_ffmpeg_downloader.assert_called_with(
|
252 |
+
audio_stream=audio_stream, video_stream=video_stream, target=target
|
253 |
+
)
|
254 |
|
255 |
|
256 |
@mock.patch("pytube.cli.YouTube.__init__", return_value=None)
|
|
|
264 |
cli.download_audio.assert_called()
|
265 |
|
266 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
267 |
@mock.patch("pytube.cli.YouTube.__init__", return_value=None)
|
268 |
def test_perform_args_on_youtube(youtube):
|
269 |
parser = argparse.ArgumentParser()
|