hbmartin committed on
Commit
bfacd7d
·
1 Parent(s): 4af667e

create Monostate class with callback protocols, add typing to __main__ and playlist

Browse files
Makefile CHANGED
@@ -10,8 +10,8 @@ pipenv:
10
 
11
  test:
12
  pipenv run flake8
13
- pipenv run black pytube --diff
14
- pipenv run black tests --diff
15
  pipenv run mypy pytube
16
  pipenv run pytest --cov-report term-missing --cov=pytube
17
 
 
10
 
11
  test:
12
  pipenv run flake8
13
+ pipenv run black pytube --check
14
+ pipenv run black tests --check
15
  pipenv run mypy pytube
16
  pipenv run pytest --cov-report term-missing --cov=pytube
17
 
pytube/__main__.py CHANGED
@@ -10,6 +10,7 @@ smaller peripheral modules and functions.
10
 
11
  import json
12
  import logging
 
13
  from urllib.parse import parse_qsl
14
  from html import unescape
15
 
@@ -23,6 +24,7 @@ from pytube import StreamQuery
23
  from pytube.mixins import install_proxy
24
  from pytube.exceptions import VideoUnavailable
25
  from pytube.helpers import apply_mixin
 
26
 
27
  logger = logging.getLogger(__name__)
28
 
@@ -32,11 +34,11 @@ class YouTube(object):
32
 
33
  def __init__(
34
  self,
35
- url,
36
- defer_prefetch_init=False,
37
- on_progress_callback=None,
38
- on_complete_callback=None,
39
- proxies=None,
40
  ):
41
  """Construct a :class:`YouTube <YouTube>`.
42
 
@@ -52,23 +54,25 @@ class YouTube(object):
52
  complete events.
53
 
54
  """
55
- self.js = None # js fetched by js_url
56
- self.js_url = None # the url to the js, parsed from watch html
57
 
58
  # note: vid_info may eventually be removed. It sounds like it once had
59
  # additional formats, but that doesn't appear to still be the case.
60
 
61
- self.vid_info = None # content fetched by vid_info_url
62
- self.vid_info_url = None # the url to vid info, parsed from watch html
 
 
63
 
64
- self.watch_html = None # the html of /watch?v=<video_id>
65
- self.embed_html = None
66
- self.player_config_args = None # inline js in the html containing
67
  # streams
68
- self.age_restricted = None
69
 
70
- self.fmt_streams = [] # list of :class:`Stream <Stream>` instances
71
- self.caption_tracks = []
72
 
73
  # video_id part of /watch?v=<video_id>
74
  self.video_id = extract.video_id(url)
@@ -79,11 +83,9 @@ class YouTube(object):
79
  self.embed_url = extract.embed_url(self.video_id)
80
  # A dictionary shared between all instances of :class:`Stream <Stream>`
81
  # (Borg pattern).
82
- self.stream_monostate = {
83
- # user defined callback functions.
84
- "on_progress": on_progress_callback,
85
- "on_complete": on_complete_callback,
86
- }
87
 
88
  if proxies:
89
  install_proxy(proxies)
@@ -113,16 +115,17 @@ class YouTube(object):
113
  """
114
  logger.info("init started")
115
 
116
- self.vid_info = {k: v for k, v in parse_qsl(self.vid_info)}
117
  if self.age_restricted:
118
  self.player_config_args = self.vid_info
119
  else:
 
120
  self.player_config_args = extract.get_ytplayer_config(self.watch_html,)[
121
  "args"
122
  ]
123
 
124
  # Fix for KeyError: 'title' issue #434
125
- if "title" not in self.player_config_args:
126
  i_start = self.watch_html.lower().index("<title>") + len("<title>")
127
  i_end = self.watch_html.lower().index("</title>")
128
  title = self.watch_html[i_start:i_end].strip()
@@ -130,7 +133,8 @@ class YouTube(object):
130
  title = title[:index] if index > 0 else title
131
  self.player_config_args["title"] = unescape(title)
132
 
133
- self.vid_descr = extract.get_vid_descr(self.watch_html)
 
134
  # https://github.com/nficano/pytube/issues/165
135
  stream_maps = ["url_encoded_fmt_stream_map"]
136
  if "adaptive_fmts" in self.player_config_args:
@@ -143,10 +147,14 @@ class YouTube(object):
143
  mixins.apply_descrambler(self.player_config_args, fmt)
144
 
145
  try:
146
- mixins.apply_signature(self.player_config_args, fmt, self.js)
 
 
147
  except TypeError:
148
- self.js_url = extract.js_url(self.embed_html, self.age_restricted,)
 
149
  self.js = request.get(self.js_url)
 
150
  mixins.apply_signature(self.player_config_args, fmt, self.js)
151
 
152
  # build instances of :class:`Stream <Stream>`
@@ -169,18 +177,21 @@ class YouTube(object):
169
 
170
  """
171
  self.watch_html = request.get(url=self.watch_url)
172
- if '<img class="icon meh" src="/yts/img' not in self.watch_html:
 
 
 
 
173
  raise VideoUnavailable("This video is unavailable.")
174
  self.embed_html = request.get(url=self.embed_url)
175
  self.age_restricted = extract.is_age_restricted(self.watch_html)
176
  self.vid_info_url = extract.video_info_url(
177
  video_id=self.video_id,
178
  watch_url=self.watch_url,
179
- watch_html=self.watch_html,
180
  embed_html=self.embed_html,
181
  age_restricted=self.age_restricted,
182
  )
183
- self.vid_info = request.get(self.vid_info_url)
184
  if not self.age_restricted:
185
  self.js_url = extract.js_url(self.watch_html, self.age_restricted)
186
  self.js = request.get(self.js_url)
@@ -302,9 +313,11 @@ class YouTube(object):
302
  :rtype: str
303
 
304
  """
305
- return self.player_config_args["player_response"]["videoDetails"][
306
- "lengthSeconds"
307
- ]
 
 
308
 
309
  @property
310
  def views(self) -> str:
@@ -330,7 +343,7 @@ class YouTube(object):
330
  .get("author", "unknown")
331
  )
332
 
333
- def register_on_progress_callback(self, func):
334
  """Register a download progress callback function post initialization.
335
 
336
  :param callable func:
@@ -340,9 +353,9 @@ class YouTube(object):
340
  :rtype: None
341
 
342
  """
343
- self.stream_monostate["on_progress"] = func
344
 
345
- def register_on_complete_callback(self, func):
346
  """Register a download complete callback function post initialization.
347
 
348
  :param callable func:
@@ -351,4 +364,4 @@ class YouTube(object):
351
  :rtype: None
352
 
353
  """
354
- self.stream_monostate["on_complete"] = func
 
10
 
11
  import json
12
  import logging
13
+ from typing import Optional, Dict, List
14
  from urllib.parse import parse_qsl
15
  from html import unescape
16
 
 
24
  from pytube.mixins import install_proxy
25
  from pytube.exceptions import VideoUnavailable
26
  from pytube.helpers import apply_mixin
27
+ from pytube.monostate import OnProgress, OnComplete, Monostate
28
 
29
  logger = logging.getLogger(__name__)
30
 
 
34
 
35
  def __init__(
36
  self,
37
+ url: str,
38
+ defer_prefetch_init: bool = False,
39
+ on_progress_callback: Optional[OnProgress] = None,
40
+ on_complete_callback: Optional[OnComplete] = None,
41
+ proxies: Dict[str, str] = None,
42
  ):
43
  """Construct a :class:`YouTube <YouTube>`.
44
 
 
54
  complete events.
55
 
56
  """
57
+ self.js: Optional[str] = None # js fetched by js_url
58
+ self.js_url: Optional[str] = None # the url to the js, parsed from watch html
59
 
60
  # note: vid_info may eventually be removed. It sounds like it once had
61
  # additional formats, but that doesn't appear to still be the case.
62
 
63
+ # the url to vid info, parsed from watch html
64
+ self.vid_info_url: Optional[str] = None
65
+ self.vid_info_raw = None # content fetched by vid_info_url
66
+ self.vid_info: Optional[Dict] = None # parsed content of vid_info_raw
67
 
68
+ self.watch_html: Optional[str] = None # the html of /watch?v=<video_id>
69
+ self.embed_html: Optional[str] = None
70
+ self.player_config_args: Dict = {} # inline js in the html containing
71
  # streams
72
+ self.age_restricted: Optional[bool] = None
73
 
74
+ self.fmt_streams: List[Stream] = []
75
+ self.caption_tracks: List[Caption] = []
76
 
77
  # video_id part of /watch?v=<video_id>
78
  self.video_id = extract.video_id(url)
 
83
  self.embed_url = extract.embed_url(self.video_id)
84
  # A dictionary shared between all instances of :class:`Stream <Stream>`
85
  # (Borg pattern).
86
+ self.stream_monostate = Monostate(
87
+ on_progress=on_progress_callback, on_complete=on_complete_callback
88
+ )
 
 
89
 
90
  if proxies:
91
  install_proxy(proxies)
 
115
  """
116
  logger.info("init started")
117
 
118
+ self.vid_info = {k: v for k, v in parse_qsl(self.vid_info_raw)}
119
  if self.age_restricted:
120
  self.player_config_args = self.vid_info
121
  else:
122
+ assert self.watch_html is not None
123
  self.player_config_args = extract.get_ytplayer_config(self.watch_html,)[
124
  "args"
125
  ]
126
 
127
  # Fix for KeyError: 'title' issue #434
128
+ if "title" not in self.player_config_args: # type: ignore
129
  i_start = self.watch_html.lower().index("<title>") + len("<title>")
130
  i_end = self.watch_html.lower().index("</title>")
131
  title = self.watch_html[i_start:i_end].strip()
 
133
  title = title[:index] if index > 0 else title
134
  self.player_config_args["title"] = unescape(title)
135
 
136
+ if self.watch_html:
137
+ self.vid_descr = extract.get_vid_descr(self.watch_html)
138
  # https://github.com/nficano/pytube/issues/165
139
  stream_maps = ["url_encoded_fmt_stream_map"]
140
  if "adaptive_fmts" in self.player_config_args:
 
147
  mixins.apply_descrambler(self.player_config_args, fmt)
148
 
149
  try:
150
+ mixins.apply_signature(
151
+ self.player_config_args, fmt, self.js # type: ignore
152
+ )
153
  except TypeError:
154
+ assert self.embed_html is not None
155
+ self.js_url = extract.js_url(self.embed_html, self.age_restricted)
156
  self.js = request.get(self.js_url)
157
+ assert self.js is not None
158
  mixins.apply_signature(self.player_config_args, fmt, self.js)
159
 
160
  # build instances of :class:`Stream <Stream>`
 
177
 
178
  """
179
  self.watch_html = request.get(url=self.watch_url)
180
+ if (
181
+ self.watch_html is None
182
+ or '<img class="icon meh" src="/yts/img' # noqa: W503
183
+ not in self.watch_html # noqa: W503
184
+ ):
185
  raise VideoUnavailable("This video is unavailable.")
186
  self.embed_html = request.get(url=self.embed_url)
187
  self.age_restricted = extract.is_age_restricted(self.watch_html)
188
  self.vid_info_url = extract.video_info_url(
189
  video_id=self.video_id,
190
  watch_url=self.watch_url,
 
191
  embed_html=self.embed_html,
192
  age_restricted=self.age_restricted,
193
  )
194
+ self.vid_info_raw = request.get(self.vid_info_url)
195
  if not self.age_restricted:
196
  self.js_url = extract.js_url(self.watch_html, self.age_restricted)
197
  self.js = request.get(self.js_url)
 
313
  :rtype: str
314
 
315
  """
316
+ return (
317
+ self.player_config_args.get("player_response", {})
318
+ .get("videoDetails", {})
319
+ .get("lengthSeconds")
320
+ )
321
 
322
  @property
323
  def views(self) -> str:
 
343
  .get("author", "unknown")
344
  )
345
 
346
+ def register_on_progress_callback(self, func: OnProgress):
347
  """Register a download progress callback function post initialization.
348
 
349
  :param callable func:
 
353
  :rtype: None
354
 
355
  """
356
+ self.stream_monostate.on_progress = func
357
 
358
+ def register_on_complete_callback(self, func: OnComplete):
359
  """Register a download complete callback function post initialization.
360
 
361
  :param callable func:
 
364
  :rtype: None
365
 
366
  """
367
+ self.stream_monostate.on_complete = func
pytube/cli.py CHANGED
@@ -136,18 +136,6 @@ def display_progress_bar(
136
 
137
 
138
  def on_progress(stream, chunk, file_handle, bytes_remaining):
139
- """On download progress callback function.
140
-
141
- :param object stream:
142
- An instance of :class:`Stream <Stream>` being downloaded.
143
- :param file_handle:
144
- The file handle where the media is being written to.
145
- :type file_handle:
146
- :py:class:`io.BufferedWriter`
147
- :param int bytes_remaining:
148
- How many bytes have been downloaded.
149
-
150
- """
151
  filesize = stream.filesize
152
  bytes_received = filesize - bytes_remaining
153
  display_progress_bar(bytes_received, filesize)
 
136
 
137
 
138
  def on_progress(stream, chunk, file_handle, bytes_remaining):
 
 
 
 
 
 
 
 
 
 
 
 
139
  filesize = stream.filesize
140
  bytes_received = filesize - bytes_remaining
141
  display_progress_bar(bytes_received, filesize)
pytube/contrib/playlist.py CHANGED
@@ -6,6 +6,7 @@ import json
6
  import logging
7
  import re
8
  from collections import OrderedDict
 
9
 
10
  from pytube import request
11
  from pytube.__main__ import YouTube
@@ -18,12 +19,12 @@ class Playlist(object):
18
  playlist
19
  """
20
 
21
- def __init__(self, url, suppress_exception=False):
22
  self.playlist_url = url
23
- self.video_urls = []
24
  self.suppress_exception = suppress_exception
25
 
26
- def construct_playlist_url(self):
27
  """There are two kinds of playlist urls in YouTube. One that contains
28
  watch?v= in URL, another one contains the "playlist?list=" portion. It
29
  is preferable to work with the latter one.
@@ -53,7 +54,7 @@ class Playlist(object):
53
  load_more_url = ""
54
  return load_more_url
55
 
56
- def parse_links(self):
57
  """Parse the video links from the page source, extracts and
58
  returns the /watch?v= part from video link href
59
  It's an alternative for BeautifulSoup
@@ -115,8 +116,11 @@ class Playlist(object):
115
  return (str(i).zfill(digits) for i in range(start, stop, step))
116
 
117
  def download_all(
118
- self, download_path=None, prefix_number=True, reverse_numbering=False,
119
- ):
 
 
 
120
  """Download all the videos in the playlist. Initially, download
121
  resolution is 720p (or highest available), later more option
122
  should be added to download resolution of choice
@@ -172,7 +176,7 @@ class Playlist(object):
172
  dl_stream.download(download_path)
173
  logger.debug("download complete")
174
 
175
- def title(self):
176
  """return playlist title (name)
177
  """
178
  try:
@@ -180,14 +184,19 @@ class Playlist(object):
180
  req = request.get(url)
181
  open_tag = "<title>"
182
  end_tag = "</title>"
183
- match_result = re.compile(open_tag + "(.+?)" + end_tag)
184
- match_result = match_result.search(req).group()
185
- match_result = match_result.replace(open_tag, "")
186
- match_result = match_result.replace(end_tag, "")
187
- match_result = match_result.replace("- YouTube", "")
188
- match_result = match_result.strip()
189
-
190
- return match_result
 
 
 
 
 
191
  except Exception as e:
192
  logger.debug(e)
193
  return None
 
6
  import logging
7
  import re
8
  from collections import OrderedDict
9
+ from typing import List, Optional
10
 
11
  from pytube import request
12
  from pytube.__main__ import YouTube
 
19
  playlist
20
  """
21
 
22
+ def __init__(self, url: str, suppress_exception: bool = False):
23
  self.playlist_url = url
24
+ self.video_urls: List[str] = []
25
  self.suppress_exception = suppress_exception
26
 
27
+ def construct_playlist_url(self) -> str:
28
  """There are two kinds of playlist urls in YouTube. One that contains
29
  watch?v= in URL, another one contains the "playlist?list=" portion. It
30
  is preferable to work with the latter one.
 
54
  load_more_url = ""
55
  return load_more_url
56
 
57
+ def parse_links(self) -> List[str]:
58
  """Parse the video links from the page source, extracts and
59
  returns the /watch?v= part from video link href
60
  It's an alternative for BeautifulSoup
 
116
  return (str(i).zfill(digits) for i in range(start, stop, step))
117
 
118
  def download_all(
119
+ self,
120
+ download_path: Optional[str] = None,
121
+ prefix_number: bool = True,
122
+ reverse_numbering: bool = False,
123
+ ) -> None:
124
  """Download all the videos in the playlist. Initially, download
125
  resolution is 720p (or highest available), later more option
126
  should be added to download resolution of choice
 
176
  dl_stream.download(download_path)
177
  logger.debug("download complete")
178
 
179
+ def title(self) -> Optional[str]:
180
  """return playlist title (name)
181
  """
182
  try:
 
184
  req = request.get(url)
185
  open_tag = "<title>"
186
  end_tag = "</title>"
187
+ pattern = re.compile(open_tag + "(.+?)" + end_tag)
188
+ match = pattern.search(req)
189
+
190
+ if match is None:
191
+ return None
192
+
193
+ return (
194
+ match.group()
195
+ .replace(open_tag, "")
196
+ .replace(end_tag, "")
197
+ .replace("- YouTube", "")
198
+ .strip()
199
+ )
200
  except Exception as e:
201
  logger.debug(e)
202
  return None
pytube/extract.py CHANGED
@@ -96,11 +96,7 @@ def eurl(video_id: str) -> str:
96
 
97
 
98
  def video_info_url(
99
- video_id: str,
100
- watch_url: str,
101
- watch_html: Optional[str],
102
- embed_html: str,
103
- age_restricted: bool,
104
  ) -> str:
105
  """Construct the video_info url.
106
 
@@ -108,8 +104,6 @@ def video_info_url(
108
  A YouTube video identifier.
109
  :param str watch_url:
110
  A YouTube watch url.
111
- :param str watch_html:
112
- (Unused) The html contents of the watch page.
113
  :param str embed_html:
114
  The html contents of the embed page (for age restricted videos).
115
  :param bool age_restricted:
@@ -139,7 +133,7 @@ def video_info_url(
139
  return "https://youtube.com/get_video_info?" + urlencode(params)
140
 
141
 
142
- def js_url(html: str, age_restricted: bool = False) -> str:
143
  """Get the base JavaScript url.
144
 
145
  Construct the base JavaScript url, which contains the decipher
@@ -151,7 +145,7 @@ def js_url(html: str, age_restricted: bool = False) -> str:
151
  Is video age restricted.
152
 
153
  """
154
- ytplayer_config = get_ytplayer_config(html, age_restricted)
155
  base_js = ytplayer_config["assets"]["js"]
156
  return "https://youtube.com" + base_js
157
 
 
96
 
97
 
98
  def video_info_url(
99
+ video_id: str, watch_url: str, embed_html: Optional[str], age_restricted: bool,
 
 
 
 
100
  ) -> str:
101
  """Construct the video_info url.
102
 
 
104
  A YouTube video identifier.
105
  :param str watch_url:
106
  A YouTube watch url.
 
 
107
  :param str embed_html:
108
  The html contents of the embed page (for age restricted videos).
109
  :param bool age_restricted:
 
133
  return "https://youtube.com/get_video_info?" + urlencode(params)
134
 
135
 
136
+ def js_url(html: str, age_restricted: Optional[bool] = False) -> str:
137
  """Get the base JavaScript url.
138
 
139
  Construct the base JavaScript url, which contains the decipher
 
145
  Is video age restricted.
146
 
147
  """
148
+ ytplayer_config = get_ytplayer_config(html, age_restricted or False)
149
  base_js = ytplayer_config["assets"]["js"]
150
  return "https://youtube.com" + base_js
151
 
pytube/monostate.py ADDED
@@ -0,0 +1,59 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import io
2
+ from typing import Any, Optional
3
+ from typing_extensions import Protocol
4
+
5
+
6
+ class OnProgress(Protocol):
7
+ def __call__(
8
+ self,
9
+ stream: Any,
10
+ chunk: Any,
11
+ file_handler: io.BufferedWriter,
12
+ bytes_remaining: int,
13
+ ) -> None:
14
+ ...
15
+
16
+ """On download progress callback function.
17
+
18
+ :param stream:
19
+ An instance of :class:`Stream <Stream>` being downloaded.
20
+ :type stream:
21
+ :py:class:`pytube.Stream`
22
+ :param str chunk:
23
+ Segment of media file binary data, not yet written to disk.
24
+ :param file_handler:
25
+ The file handle where the media is being written to.
26
+ :type file_handler:
27
+ :py:class:`io.BufferedWriter`
28
+ :param int bytes_remaining:
29
+ How many bytes remain to be downloaded.
30
+
31
+ """
32
+
33
+
34
+ class OnComplete(Protocol):
35
+ def __call__(self, stream: Any, file_handler: io.BufferedWriter) -> None:
36
+ ...
37
+
38
+ """On download complete handler function.
39
+
40
+ :param stream:
41
+ An instance of :class:`Stream <Stream>` being downloaded.
42
+ :type stream:
43
+ :py:class:`pytube.Stream`
44
+ :param file_handler:
45
+ The file handle where the media is being written to.
46
+ :type file_handler:
47
+ :py:class:`io.BufferedWriter`
48
+
49
+ :rtype: None
50
+
51
+ """
52
+
53
+
54
+ class Monostate:
55
+ def __init__(
56
+ self, on_progress: Optional[OnProgress], on_complete: Optional[OnComplete]
57
+ ):
58
+ self.on_progress = on_progress
59
+ self.on_complete = on_complete
pytube/streams.py CHANGED
@@ -18,7 +18,7 @@ from pytube import extract
18
  from pytube import request
19
  from pytube.helpers import safe_filename
20
  from pytube.itags import get_format_profile
21
-
22
 
23
  logger = logging.getLogger(__name__)
24
 
@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
26
  class Stream(object):
27
  """Container for stream manifest data."""
28
 
29
- def __init__(self, stream: Dict, player_config_args: Dict, monostate: Dict):
30
  """Construct a :class:`Stream <Stream>`.
31
 
32
  :param dict stream:
@@ -303,7 +303,7 @@ class Stream(object):
303
  indent=2,
304
  ),
305
  )
306
- on_progress = self._monostate["on_progress"]
307
  if on_progress:
308
  logger.debug("calling on_progress callback %s", on_progress)
309
  on_progress(self, chunk, file_handler, bytes_remaining)
@@ -320,7 +320,7 @@ class Stream(object):
320
 
321
  """
322
  logger.debug("download finished")
323
- on_complete = self._monostate["on_complete"]
324
  if on_complete:
325
  logger.debug("calling on_complete callback %s", on_complete)
326
  on_complete(self, file_handle)
 
18
  from pytube import request
19
  from pytube.helpers import safe_filename
20
  from pytube.itags import get_format_profile
21
+ from pytube.monostate import Monostate
22
 
23
  logger = logging.getLogger(__name__)
24
 
 
26
  class Stream(object):
27
  """Container for stream manifest data."""
28
 
29
+ def __init__(self, stream: Dict, player_config_args: Dict, monostate: Monostate):
30
  """Construct a :class:`Stream <Stream>`.
31
 
32
  :param dict stream:
 
303
  indent=2,
304
  ),
305
  )
306
+ on_progress = self._monostate.on_progress
307
  if on_progress:
308
  logger.debug("calling on_progress callback %s", on_progress)
309
  on_progress(self, chunk, file_handler, bytes_remaining)
 
320
 
321
  """
322
  logger.debug("download finished")
323
+ on_complete = self._monostate.on_complete
324
  if on_complete:
325
  logger.debug("calling on_complete callback %s", on_complete)
326
  on_complete(self, file_handle)
tests/test_extract.py CHANGED
@@ -19,7 +19,6 @@ def test_info_url(cipher_signature):
19
  video_info_url = extract.video_info_url(
20
  video_id=cipher_signature.video_id,
21
  watch_url=cipher_signature.watch_url,
22
- watch_html=cipher_signature.watch_html,
23
  embed_html="",
24
  age_restricted=False,
25
  )
 
19
  video_info_url = extract.video_info_url(
20
  video_id=cipher_signature.video_id,
21
  watch_url=cipher_signature.watch_url,
 
22
  embed_html="",
23
  age_restricted=False,
24
  )