hbmartin committed on
Commit
099e410
·
unverified ·
2 Parent(s): 2ff0295 c45419e

Merge pull request #35 from hbmartin/martin.playlist-pagination

Browse files
.flake8 CHANGED
@@ -1,3 +1,3 @@
1
  [flake8]
2
  ignore = E231,E203,W503
3
- max-line-length = 88
 
1
  [flake8]
2
  ignore = E231,E203,W503
3
+ max-line-length = 89
pytube/__main__.py CHANGED
@@ -17,11 +17,11 @@ from html import unescape
17
  from pytube import Caption
18
  from pytube import CaptionQuery
19
  from pytube import extract
20
- from pytube import mixins
21
  from pytube import request
22
  from pytube import Stream
23
  from pytube import StreamQuery
24
- from pytube.mixins import install_proxy
 
25
  from pytube.exceptions import VideoUnavailable
26
  from pytube.monostate import OnProgress, OnComplete, Monostate
27
 
@@ -135,11 +135,11 @@ class YouTube:
135
  # unscramble the progressive and adaptive stream manifests.
136
  for fmt in stream_maps:
137
  if not self.age_restricted and fmt in self.vid_info:
138
- mixins.apply_descrambler(self.vid_info, fmt)
139
- mixins.apply_descrambler(self.player_config_args, fmt)
140
 
141
  try:
142
- mixins.apply_signature(
143
  self.player_config_args, fmt, self.js # type: ignore
144
  )
145
  except TypeError:
@@ -147,7 +147,7 @@ class YouTube:
147
  self.js_url = extract.js_url(self.embed_html, self.age_restricted)
148
  self.js = request.get(self.js_url)
149
  assert self.js is not None
150
- mixins.apply_signature(self.player_config_args, fmt, self.js)
151
 
152
  # build instances of :class:`Stream <Stream>`
153
  self.initialize_stream_objects(fmt)
 
17
  from pytube import Caption
18
  from pytube import CaptionQuery
19
  from pytube import extract
 
20
  from pytube import request
21
  from pytube import Stream
22
  from pytube import StreamQuery
23
+ from pytube.extract import apply_descrambler, apply_signature
24
+ from pytube.helpers import install_proxy
25
  from pytube.exceptions import VideoUnavailable
26
  from pytube.monostate import OnProgress, OnComplete, Monostate
27
 
 
135
  # unscramble the progressive and adaptive stream manifests.
136
  for fmt in stream_maps:
137
  if not self.age_restricted and fmt in self.vid_info:
138
+ apply_descrambler(self.vid_info, fmt)
139
+ apply_descrambler(self.player_config_args, fmt)
140
 
141
  try:
142
+ apply_signature(
143
  self.player_config_args, fmt, self.js # type: ignore
144
  )
145
  except TypeError:
 
147
  self.js_url = extract.js_url(self.embed_html, self.age_restricted)
148
  self.js = request.get(self.js_url)
149
  assert self.js is not None
150
+ apply_signature(self.player_config_args, fmt, self.js)
151
 
152
  # build instances of :class:`Stream <Stream>`
153
  self.initialize_stream_objects(fmt)
pytube/cipher.py CHANGED
@@ -54,7 +54,7 @@ def get_initial_function_name(js: str) -> str:
54
  regex = re.compile(pattern)
55
  results = regex.search(js)
56
  if results:
57
- logger.debug(f"finished regex search, matched: {pattern}")
58
  return results.group(1)
59
 
60
  raise RegexMatchError(caller="get_initial_function_name", pattern="multiple")
 
54
  regex = re.compile(pattern)
55
  results = regex.search(js)
56
  if results:
57
+ logger.debug("finished regex search, matched: %s", pattern)
58
  return results.group(1)
59
 
60
  raise RegexMatchError(caller="get_initial_function_name", pattern="multiple")
pytube/contrib/playlist.py CHANGED
@@ -4,14 +4,12 @@
4
  import json
5
  import logging
6
  import re
7
- from collections import OrderedDict
8
  from datetime import date, datetime
9
  from typing import List, Optional, Iterable, Dict
10
  from urllib.parse import parse_qs
11
 
12
  from pytube import request, YouTube
13
- from pytube.helpers import cache, deprecated
14
- from pytube.mixins import install_proxy
15
 
16
  logger = logging.getLogger(__name__)
17
 
@@ -46,6 +44,8 @@ class Playlist:
46
  f"{month} {day:0>2} {year}", "%b %d %Y"
47
  ).date()
48
 
 
 
49
  @staticmethod
50
  def _find_load_more_url(req: str) -> Optional[str]:
51
  """Given an html page or a fragment thereof, looks for
@@ -60,41 +60,58 @@ class Playlist:
60
 
61
  return None
62
 
63
- def parse_links(self, until_watch_id: Optional[str] = None) -> List[str]:
 
 
 
 
64
  """Parse the video links from the page source, extracts and
65
  returns the /watch?v= part from video link href
66
  """
67
  req = self.html
68
-
69
- # split the page source by line and process each line
70
- content = [x for x in req.split("\n") if "pl-video-title-link" in x]
71
- link_list = [x.split('href="', 1)[1].split("&", 1)[0] for x in content]
 
 
 
 
 
72
 
73
  # The above only returns 100 or fewer links
74
  # Simulating a browser request for the load more link
75
  load_more_url = self._find_load_more_url(req)
 
76
  while load_more_url: # there is an url found
 
 
 
 
 
 
 
 
 
77
  if until_watch_id:
78
  try:
79
- trim_index = link_list.index(f"/watch?v={until_watch_id}")
80
- return link_list[:trim_index]
 
81
  except ValueError:
82
  pass
83
- logger.debug("load more url: %s", load_more_url)
84
- req = request.get(load_more_url)
85
- load_more = json.loads(req)
86
- videos = re.findall(
87
- r"href=\"(/watch\?v=[\w-]*)", load_more["content_html"],
88
- )
89
- # remove duplicates
90
- link_list.extend(list(OrderedDict.fromkeys(videos)))
91
  load_more_url = self._find_load_more_url(
92
  load_more["load_more_widget_html"],
93
  )
94
 
95
- return link_list
 
 
 
96
 
97
- def trimmed(self, video_id: str) -> List[str]:
98
  """Retrieve a list of YouTube video URLs trimmed at the given video ID
99
  i.e. if the playlist has video IDs 1,2,3,4 calling trimmed(3) returns [1,2]
100
  :type video_id: str
@@ -103,8 +120,9 @@ class Playlist:
103
  :returns:
104
  List of video URLs from the playlist trimmed at the given ID
105
  """
106
- trimmed_watch = self.parse_links(until_watch_id=video_id)
107
- return [self._video_url(watch_path) for watch_path in trimmed_watch]
 
108
 
109
  @property # type: ignore
110
  @cache
@@ -114,10 +132,15 @@ class Playlist:
114
  :returns:
115
  List of video URLs
116
  """
117
- return [self._video_url(watch_path) for watch_path in self.parse_links()]
 
 
118
 
119
  @property
120
  def videos(self) -> Iterable[YouTube]:
 
 
 
121
  for url in self.video_urls:
122
  yield YouTube(url)
123
 
 
4
  import json
5
  import logging
6
  import re
 
7
  from datetime import date, datetime
8
  from typing import List, Optional, Iterable, Dict
9
  from urllib.parse import parse_qs
10
 
11
  from pytube import request, YouTube
12
+ from pytube.helpers import cache, deprecated, install_proxy, uniqueify
 
13
 
14
  logger = logging.getLogger(__name__)
15
 
 
44
  f"{month} {day:0>2} {year}", "%b %d %Y"
45
  ).date()
46
 
47
+ self._video_regex = re.compile(r"href=\"(/watch\?v=[\w-]*)")
48
+
49
  @staticmethod
50
  def _find_load_more_url(req: str) -> Optional[str]:
51
  """Given an html page or a fragment thereof, looks for
 
60
 
61
  return None
62
 
63
@deprecated("This function will be removed in the future, please use .video_urls")
def parse_links(self) -> List[str]:  # pragma: no cover
    """Deprecated alias that returns the value of ``video_urls``.

    :rtype: List[str]
    """
    all_urls = self.video_urls
    return all_urls
66
+
67
def _paginate(self, until_watch_id: Optional[str] = None) -> Iterable[List[str]]:
    """Yield the playlist's ``/watch?v=`` paths one page at a time.

    The first page is parsed from ``self.html``. Each following page is only
    requested after the previous one has been yielded, so a consumer that
    stops iterating early triggers no further requests.

    :param str until_watch_id:
        Optional video ID to stop at. When it is found on a page, only the
        paths that precede it on that page are yielded and iteration ends.
    :rtype: Iterable[List[str]]
    :returns:
        Lists of watch paths, one list per page.
    """
    stop_path = f"/watch?v={until_watch_id}" if until_watch_id else None
    page_html = self.html
    # The first page carries its own "load more" link; later pages deliver it
    # in the separate ``load_more_widget_html`` field of the JSON payload.
    load_more_html: Optional[str] = page_html

    while True:
        videos_urls = self._extract_videos(page_html)
        if stop_path is not None and stop_path in videos_urls:
            yield videos_urls[: videos_urls.index(stop_path)]
            return
        yield videos_urls

        if load_more_html is None:
            # Continuation payload had no widget: nothing left to follow.
            logger.debug("Could not find load_more_widget_html")
            return
        # The page only contains 100 or fewer links; simulate the browser
        # request behind the "load more" button to get the next batch.
        load_more_url = self._find_load_more_url(load_more_html)
        if not load_more_url:
            return

        logger.debug("load more url: %s", load_more_url)
        load_more = json.loads(request.get(load_more_url))
        try:
            page_html = load_more["content_html"]
        except KeyError:
            logger.debug("Could not find content_html")
            return
        # ``.get`` so a payload without the widget ends pagination cleanly
        # (after its videos are yielded) instead of raising KeyError.
        load_more_html = load_more.get("load_more_widget_html")
110
+
111
def _extract_videos(self, html: str) -> List[str]:
    """Return the watch paths matched in ``html``, with duplicates removed.

    :param str html:
        HTML text to search with ``self._video_regex``.
    :rtype: List[str]
    """
    watch_paths = self._video_regex.findall(html)
    return uniqueify(watch_paths)
113
 
114
+ def trimmed(self, video_id: str) -> Iterable[str]:
115
  """Retrieve a list of YouTube video URLs trimmed at the given video ID
116
  i.e. if the playlist has video IDs 1,2,3,4 calling trimmed(3) returns [1,2]
117
  :type video_id: str
 
120
  :returns:
121
  List of video URLs from the playlist trimmed at the given ID
122
  """
123
+ for page in self._paginate(until_watch_id=video_id):
124
+ for watch_path in page:
125
+ yield self._video_url(watch_path)
126
 
127
  @property # type: ignore
128
  @cache
 
132
  :returns:
133
  List of video URLs
134
  """
135
+ return [
136
+ self._video_url(video) for page in list(self._paginate()) for video in page
137
+ ]
138
 
139
@property
def videos(self) -> Iterable[YouTube]:
    """Lazily yield one YouTube object per entry of ``video_urls``.

    :rtype: Iterable[YouTube]
    """
    yield from map(YouTube, self.video_urls)
146
 
pytube/extract.py CHANGED
@@ -1,15 +1,18 @@
1
  # -*- coding: utf-8 -*-
2
  """This module contains all non-cipher related data extraction logic."""
3
  import json
 
4
  import re
5
  from collections import OrderedDict
6
 
7
  from html.parser import HTMLParser
8
- from typing import Any, Optional, Tuple, List
9
- from urllib.parse import quote
10
  from urllib.parse import urlencode
11
- from pytube.exceptions import RegexMatchError, HTMLParseError
12
- from pytube.helpers import regex_search
 
 
13
 
14
 
15
  class PytubeHTMLParser(HTMLParser):
@@ -206,3 +209,117 @@ def get_vid_descr(html: str) -> str:
206
  html_parser = PytubeHTMLParser()
207
  html_parser.feed(html)
208
  return html_parser.vid_descr
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # -*- coding: utf-8 -*-
2
  """This module contains all non-cipher related data extraction logic."""
3
  import json
4
+ import pprint
5
  import re
6
  from collections import OrderedDict
7
 
8
  from html.parser import HTMLParser
9
+ from typing import Any, Optional, Tuple, List, Dict
10
+ from urllib.parse import quote, parse_qs, unquote, parse_qsl
11
  from urllib.parse import urlencode
12
+
13
+ from pytube import cipher
14
+ from pytube.exceptions import RegexMatchError, HTMLParseError, LiveStreamError
15
+ from pytube.helpers import regex_search, logger
16
 
17
 
18
  class PytubeHTMLParser(HTMLParser):
 
209
  html_parser = PytubeHTMLParser()
210
  html_parser.feed(html)
211
  return html_parser.vid_descr
212
+
213
+
214
def apply_signature(config_args: Dict, fmt: str, js: str) -> None:
    """Apply the decrypted signature to the stream manifest.

    :param dict config_args:
        Details of the media streams available.
    :param str fmt:
        Key in stream manifests (``ytplayer_config``) containing progressive
        download or adaptive streams (e.g.: ``url_encoded_fmt_stream_map`` or
        ``adaptive_fmts``).
    :param str js:
        The contents of the base.js asset file.
    :raises LiveStreamError:
        If a stream has no ``url`` and the player response reports
        ``liveStreamability``.
    :raises KeyError:
        If a stream has no ``url`` and the video is not a live stream.
    :raises TypeError:
        If a stream needs descrambling but ``js`` is ``None``.

    """
    stream_manifest = config_args[fmt]
    live_stream = (
        json.loads(config_args["player_response"])
        .get("playabilityStatus", {})
        .get("liveStreamability")
    )
    for stream in stream_manifest:
        try:
            url: str = stream["url"]
        except KeyError:
            if live_stream:
                raise LiveStreamError("Video is currently being streamed live")
            # Re-raise instead of falling through: otherwise ``url`` is either
            # unbound (first stream) or still holds the previous stream's URL,
            # which would then be signed and stored on the wrong entry.
            raise

        # 403 Forbidden fix.
        if "signature" in url or (
            "s" not in stream and ("&sig=" in url or "&lsig=" in url)
        ):
            # For certain videos, YouTube will just provide them pre-signed, in
            # which case there's no real magic to download them and we can skip
            # the whole signature descrambling entirely.
            logger.debug("signature found, skip decipher")
            continue

        if js is None:
            # The url carries no signature, so js is needed to descramble it.
            # The TypeError is caught in __main__.
            raise TypeError("JS is None")
        signature = cipher.get_signature(js, stream["s"])

        logger.debug(
            "finished descrambling signature for itag=%s\n%s",
            stream["itag"],
            pprint.pformat({"s": stream["s"], "signature": signature}, indent=2),
        )
        # 403 forbidden fix
        stream["url"] = url + "&sig=" + signature
263
+
264
+
265
def apply_descrambler(stream_data: Dict, key: str) -> None:
    """Apply various in-place transforms to YouTube's media stream data.

    Creates a ``list`` of dictionaries by string splitting on commas, then
    taking each list item, parsing it as a query string, converting it to a
    ``dict`` and unquoting the value.

    When ``key`` is ``url_encoded_fmt_stream_map`` and that entry is missing
    or empty, the list is instead built from the ``formats`` and
    ``adaptiveFormats`` of the JSON ``player_response``. Each format
    contributes its plain ``url`` when it has one; otherwise its ``cipher``
    query string is parsed for the ``url`` and scrambled signature ``s``.

    :param dict stream_data:
        Dictionary containing query string encoded values.
    :param str key:
        Name of the key in dictionary.
    :raises KeyError:
        If ``player_response`` lacks the expected fields, or a format has
        neither ``url`` nor ``cipher``.

    **Example**:

    >>> d = {'foo': 'bar=1&var=test,em=5&t=url%20encoded'}
    >>> apply_descrambler(d, 'foo')
    >>> print(d)
    {'foo': [{'bar': '1', 'var': 'test'}, {'em': '5', 't': 'url encoded'}]}

    """
    if key == "url_encoded_fmt_stream_map" and not stream_data.get(
        "url_encoded_fmt_stream_map"
    ):
        # Parse the player response once and reuse it for both format lists.
        streaming_data = json.loads(stream_data["player_response"])["streamingData"]
        formats = streaming_data["formats"] + streaming_data["adaptiveFormats"]

        descrambled = []
        for format_item in formats:
            entry = {}
            # Decide per format, so a manifest that mixes plain and ciphered
            # entries is handled instead of failing as a whole.
            if "url" in format_item:
                entry["url"] = format_item["url"]
            else:
                cipher_url = parse_qs(format_item["cipher"])
                entry["url"] = cipher_url["url"][0]
                entry["s"] = cipher_url["s"][0]
            entry["type"] = format_item["mimeType"]
            entry["quality"] = format_item["quality"]
            entry["itag"] = format_item["itag"]
            descrambled.append(entry)
        stream_data[key] = descrambled
    else:
        stream_data[key] = [
            {k: unquote(v) for k, v in parse_qsl(i)}
            for i in stream_data[key].split(",")
        ]
    logger.debug(
        "applying descrambler\n%s", pprint.pformat(stream_data[key], indent=2),
    )
pytube/helpers.py CHANGED
@@ -6,7 +6,8 @@ import os
6
  import pprint
7
  import re
8
  import warnings
9
- from typing import TypeVar, Callable, Optional
 
10
 
11
  from pytube.exceptions import RegexMatchError
12
 
@@ -156,3 +157,20 @@ def target_directory(output_path: Optional[str] = None) -> str:
156
  output_path = os.getcwd()
157
  os.makedirs(output_path, exist_ok=True)
158
  return output_path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
  import pprint
7
  import re
8
  import warnings
9
+ from typing import TypeVar, Callable, Optional, Dict, List, Any
10
+ from urllib import request
11
 
12
  from pytube.exceptions import RegexMatchError
13
 
 
157
  output_path = os.getcwd()
158
  os.makedirs(output_path, exist_ok=True)
159
  return output_path
160
+
161
+
162
def install_proxy(proxy_handler: Dict[str, str]) -> None:
    """Install a global urllib opener built around the given proxy mapping.

    :param dict proxy_handler:
        Mapping handed unchanged to ``urllib.request.ProxyHandler``.
    """
    request.install_opener(
        request.build_opener(request.ProxyHandler(proxy_handler))
    )
166
+
167
+
168
def uniqueify(duped_list: List) -> List:
    """Return a new list with duplicates removed, keeping first occurrences.

    :param list duped_list:
        List of hashable items, possibly containing duplicates.
    :rtype: list
    :returns:
        The items of ``duped_list`` in their original order, each once.
    """
    # dict keys are unique and keep insertion order, so this is the
    # standard-library form of the manual "seen" bookkeeping.
    return list(dict.fromkeys(duped_list))
pytube/mixins.py DELETED
@@ -1,137 +0,0 @@
1
- # -*- coding: utf-8 -*-
2
- """Applies in-place data mutations."""
3
-
4
- import json
5
- import logging
6
- import pprint
7
- from typing import Dict
8
-
9
- from pytube import cipher
10
- from urllib import request
11
- from urllib.parse import parse_qsl
12
- from urllib.parse import parse_qs
13
- from urllib.parse import unquote
14
- from pytube.exceptions import LiveStreamError
15
-
16
-
17
- logger = logging.getLogger(__name__)
18
-
19
-
20
- def apply_signature(config_args: Dict, fmt: str, js: str) -> None:
21
- """Apply the decrypted signature to the stream manifest.
22
-
23
- :param dict config_args:
24
- Details of the media streams available.
25
- :param str fmt:
26
- Key in stream manifests (``ytplayer_config``) containing progressive
27
- download or adaptive streams (e.g.: ``url_encoded_fmt_stream_map`` or
28
- ``adaptive_fmts``).
29
- :param str js:
30
- The contents of the base.js asset file.
31
-
32
- """
33
- stream_manifest = config_args[fmt]
34
- live_stream = (
35
- json.loads(config_args["player_response"])
36
- .get("playabilityStatus", {},)
37
- .get("liveStreamability")
38
- )
39
- for i, stream in enumerate(stream_manifest):
40
- try:
41
- url: str = stream["url"]
42
- except KeyError:
43
- if live_stream:
44
- raise LiveStreamError("Video is currently being streamed live")
45
- # 403 Forbidden fix.
46
- if "signature" in url or (
47
- "s" not in stream and ("&sig=" in url or "&lsig=" in url)
48
- ):
49
- # For certain videos, YouTube will just provide them pre-signed, in
50
- # which case there's no real magic to download them and we can skip
51
- # the whole signature descrambling entirely.
52
- logger.debug("signature found, skip decipher")
53
- continue
54
-
55
- if js is not None:
56
- signature = cipher.get_signature(js, stream["s"])
57
- else:
58
- # signature not present in url (line 33), need js to descramble
59
- # TypeError caught in __main__
60
- raise TypeError("JS is None")
61
-
62
- logger.debug(
63
- "finished descrambling signature for itag=%s\n%s",
64
- stream["itag"],
65
- pprint.pformat({"s": stream["s"], "signature": signature,}, indent=2,),
66
- )
67
- # 403 forbidden fix
68
- stream_manifest[i]["url"] = url + "&sig=" + signature
69
-
70
-
71
- def apply_descrambler(stream_data: Dict, key: str) -> None:
72
- """Apply various in-place transforms to YouTube's media stream data.
73
-
74
- Creates a ``list`` of dictionaries by string splitting on commas, then
75
- taking each list item, parsing it as a query string, converting it to a
76
- ``dict`` and unquoting the value.
77
-
78
- :param dict stream_data:
79
- Dictionary containing query string encoded values.
80
- :param str key:
81
- Name of the key in dictionary.
82
-
83
- **Example**:
84
-
85
- >>> d = {'foo': 'bar=1&var=test,em=5&t=url%20encoded'}
86
- >>> apply_descrambler(d, 'foo')
87
- >>> print(d)
88
- {'foo': [{'bar': '1', 'var': 'test'}, {'em': '5', 't': 'url encoded'}]}
89
-
90
- """
91
- if key == "url_encoded_fmt_stream_map" and not stream_data.get(
92
- "url_encoded_fmt_stream_map"
93
- ):
94
- formats = json.loads(stream_data["player_response"])["streamingData"]["formats"]
95
- formats.extend(
96
- json.loads(stream_data["player_response"])["streamingData"][
97
- "adaptiveFormats"
98
- ]
99
- )
100
- try:
101
- stream_data[key] = [
102
- {
103
- "url": format_item["url"],
104
- "type": format_item["mimeType"],
105
- "quality": format_item["quality"],
106
- "itag": format_item["itag"],
107
- }
108
- for format_item in formats
109
- ]
110
- except KeyError:
111
- cipher_url = [
112
- parse_qs(formats[i]["cipher"]) for i, data in enumerate(formats)
113
- ]
114
- stream_data[key] = [
115
- {
116
- "url": cipher_url[i]["url"][0],
117
- "s": cipher_url[i]["s"][0],
118
- "type": format_item["mimeType"],
119
- "quality": format_item["quality"],
120
- "itag": format_item["itag"],
121
- }
122
- for i, format_item in enumerate(formats)
123
- ]
124
- else:
125
- stream_data[key] = [
126
- {k: unquote(v) for k, v in parse_qsl(i)}
127
- for i in stream_data[key].split(",")
128
- ]
129
- logger.debug(
130
- "applying descrambler\n%s", pprint.pformat(stream_data[key], indent=2),
131
- )
132
-
133
-
134
- def install_proxy(proxy_handler: Dict[str, str]) -> None:
135
- proxy_support = request.ProxyHandler(proxy_handler)
136
- opener = request.build_opener(proxy_support)
137
- request.install_opener(opener)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
tests/conftest.py CHANGED
@@ -61,3 +61,14 @@ def playlist_html():
61
  )
62
  with gzip.open(file_path, "rb") as f:
63
  return f.read().decode("utf-8")
 
 
 
 
 
 
 
 
 
 
 
 
61
  )
62
  with gzip.open(file_path, "rb") as f:
63
  return f.read().decode("utf-8")
64
+
65
+
66
@pytest.fixture
def playlist_long_html():
    """Youtube playlist HTML loaded on 2020-01-25 from
    https://www.youtube.com/playlist?list=PLzMcBGfZo4-mP7qA9cagf68V06sko5otr"""
    tests_dir = os.path.dirname(os.path.realpath(__file__))
    archive_path = os.path.join(tests_dir, "mocks", "playlist_long.html.gz")
    with gzip.open(archive_path, "rb") as archive:
        raw_html = archive.read()
    return raw_html.decode("utf-8")
tests/contrib/test_playlist.py CHANGED
@@ -58,30 +58,6 @@ def test_init_with_watch_id(request_get):
58
  )
59
 
60
 
61
- @mock.patch("pytube.contrib.playlist.request.get")
62
- def test_parse_links(request_get, playlist_html):
63
- url = "https://www.fakeurl.com/playlist?list=whatever"
64
- request_get.return_value = playlist_html
65
- playlist = Playlist(url)
66
- playlist._find_load_more_url = MagicMock(return_value=None)
67
- links = playlist.parse_links()
68
- request_get.assert_called()
69
- assert links == [
70
- "/watch?v=ujTCoH21GlA",
71
- "/watch?v=45ryDIPHdGg",
72
- "/watch?v=1BYu65vLKdA",
73
- "/watch?v=3AQ_74xrch8",
74
- "/watch?v=ddqQUz9mZaM",
75
- "/watch?v=vwLT6bZrHEE",
76
- "/watch?v=TQKI0KE-JYY",
77
- "/watch?v=dNBvQ38MlT8",
78
- "/watch?v=JHxyrMgOUWI",
79
- "/watch?v=l2I8NycJMCY",
80
- "/watch?v=g1Zbuk1gAfk",
81
- "/watch?v=zixd-si9Q-o",
82
- ]
83
-
84
-
85
  @mock.patch("pytube.contrib.playlist.request.get")
86
  def test_video_urls(request_get, playlist_html):
87
  url = "https://www.fakeurl.com/playlist?list=whatever"
@@ -144,9 +120,68 @@ def test_trimmed(request_get, playlist_html):
144
  url = "https://www.fakeurl.com/playlist?list=whatever"
145
  request_get.return_value = playlist_html
146
  playlist = Playlist(url)
147
- playlist._find_load_more_url = MagicMock(return_value="dummy")
148
  assert request_get.call_count == 1
149
- assert playlist.trimmed("1BYu65vLKdA") == [
 
150
  "https://www.youtube.com/watch?v=ujTCoH21GlA",
151
  "https://www.youtube.com/watch?v=45ryDIPHdGg",
152
  ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
  )
59
 
60
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
  @mock.patch("pytube.contrib.playlist.request.get")
62
  def test_video_urls(request_get, playlist_html):
63
  url = "https://www.fakeurl.com/playlist?list=whatever"
 
120
  url = "https://www.fakeurl.com/playlist?list=whatever"
121
  request_get.return_value = playlist_html
122
  playlist = Playlist(url)
123
+ playlist._find_load_more_url = MagicMock(return_value=None)
124
  assert request_get.call_count == 1
125
+ trimmed = list(playlist.trimmed("1BYu65vLKdA"))
126
+ assert trimmed == [
127
  "https://www.youtube.com/watch?v=ujTCoH21GlA",
128
  "https://www.youtube.com/watch?v=45ryDIPHdGg",
129
  ]
130
+
131
+
132
@mock.patch("pytube.contrib.playlist.request.get")
def test_playlist_failed_pagination(request_get, playlist_long_html):
    """An empty continuation payload leaves only the first page's videos."""
    # First response is the playlist page, second the continuation request.
    request_get.side_effect = [playlist_long_html, "{}"]
    playlist = Playlist("https://www.fakeurl.com/playlist?list=whatever")
    found_urls = playlist.video_urls
    assert len(found_urls) == 100
    assert request_get.call_count == 2
    request_get.assert_called_with(
        "https://www.youtube.com/browse_ajax?action_continuation=1&amp;continuation"
        "=4qmFsgIsEhpWTFVVYS12aW9HaGUyYnRCY1puZWFQb25LQRoOZWdaUVZEcERSMUUlM0Q%253D"
    )
147
+
148
+
149
@mock.patch("pytube.contrib.playlist.request.get")
def test_playlist_pagination(request_get, playlist_html, playlist_long_html):
    """The video on the continuation page is added to the first page's 100."""
    continuation_page = (
        '{"content_html":"<a href=\\"/watch?v=BcWz41-4cDk&amp;feature=plpp_video&amp;ved'
        '=CCYQxjQYACITCO33n5-pn-cCFUG3xAodLogN2yj6LA\\">}", "load_more_widget_html":""}'
    )
    request_get.side_effect = [playlist_long_html, continuation_page, "{}"]
    playlist = Playlist("https://www.fakeurl.com/playlist?list=whatever")
    assert len(playlist.video_urls) == 101
    assert request_get.call_count == 2
161
+
162
+
163
@mock.patch("pytube.contrib.playlist.request.get")
def test_trimmed_pagination(request_get, playlist_html, playlist_long_html):
    """Trimming at a video on the first page needs no continuation request."""
    continuation_page = (
        '{"content_html":"<a href=\\"/watch?v=BcWz41-4cDk&amp;feature=plpp_video&amp;ved'
        '=CCYQxjQYACITCO33n5-pn-cCFUG3xAodLogN2yj6LA\\">}", "load_more_widget_html":""}'
    )
    request_get.side_effect = [playlist_long_html, continuation_page, "{}"]
    playlist = Playlist("https://www.fakeurl.com/playlist?list=whatever")
    trimmed_urls = list(playlist.trimmed("FN9vC8aR7Yk"))
    assert len(trimmed_urls) == 3
    assert request_get.call_count == 1
175
+
176
+
177
@mock.patch("pytube.contrib.playlist.request.get")
def test_trimmed_pagination_not_found(request_get, playlist_html, playlist_long_html):
    """Trimming at an ID that never appears yields every video of both pages."""
    continuation_page = (
        '{"content_html":"<a href=\\"/watch?v=BcWz41-4cDk&amp;feature=plpp_video&amp;ved'
        '=CCYQxjQYACITCO33n5-pn-cCFUG3xAodLogN2yj6LA\\">}", "load_more_widget_html":""}'
    )
    request_get.side_effect = [playlist_long_html, continuation_page, "{}"]
    playlist = Playlist("https://www.fakeurl.com/playlist?list=whatever")
    trimmed_urls = list(playlist.trimmed("wont-be-found"))
    assert len(trimmed_urls) == 101
tests/mocks/playlist_long.html.gz ADDED
Binary file (47.7 kB). View file