# Spaces:
# Runtime error
# Runtime error
import datetime | |
import os | |
from enum import Enum | |
from typing import Optional, Iterator, List, Tuple, Union | |
from hbutils.system import urlsplit | |
from .web import NoURL, WebDataSource | |
from ..utils import get_requests_session, srequest | |
class Rating(str, Enum):
    """Content rating used to filter a post search.

    Each member's value is the one-letter code that follows ``rating:`` in
    the generated search tag. Because ``str`` is mixed in, a member also
    compares equal to its code (``Rating.SAFE == "s"``).
    """

    SAFE = "s"
    QUESTIONABLE = "q"
    EXPLICIT = "e"
class PostOrder(Enum):
    """Sort order requested for a post search.

    Each member's value is the text that follows ``order:`` in the generated
    search tag.
    """

    POPULARITY = "popularity"
    DATE = "date"
    QUALITY = "quality"
    RANDOM = "random"
    RECENTLY_FAVORITED = "recently_favorited"
    RECENTLY_VOTED = "recently_voted"
class FileType(Enum):
    """Kind of media file requested for a post search.

    Each member's value is the text that follows ``file_type:`` in the
    generated search tag.
    """

    IMAGE = "image"  # jpeg, png, webp formats
    GIF = "animated_gif"  # gif format
    VIDEO = "video"  # mp4, webm formats
def _tags_by_kwargs(**kwargs): | |
tags = [] | |
for k, v in kwargs.items(): | |
if v is None: | |
pass | |
elif k in {"order", "rating", "file_type"} and v is not FileType.IMAGE: # noqa | |
tags.append(f"{k}:{v.value}") | |
elif k in {"threshold", "recommended_for", "voted"}: | |
tags.append(f"{k}:{v}") | |
elif k == "date": | |
date = "..".join(d.strftime("%Y-%m-%dT%H:%M") for d in self.date) # type: ignore[union-attr] | |
tags.append(f"date:{date}") | |
elif k == "added_by": | |
for user in self.added_by: # type: ignore[union-attr] | |
tags.append(f"user:{user}") | |
return tags | |
class SankakuSource(WebDataSource):
    """Data source that pages through the Sankaku Complex posts API.

    Posts are searched by tag, filtered to image file types, and yielded as
    ``(post id, download url, metadata)`` triples by :meth:`_iter_data`.

    :param tags: Base search tags.
    :param order: Optional sort order, added as an ``order:`` tag.
    :param rating: Optional rating filter, added as a ``rating:`` tag.
    :param file_type: Optional file type filter, added as a ``file_type:`` tag.
    :param date: Optional pair of datetimes, added as a ``date:`` range tag.
    :param username: Login name, used with ``password`` when no
        ``access_token`` is given.
    :param password: Login password.
    :param access_token: Bearer token; takes precedence over username/password.
    :param min_size: Minimum width and height a file variant must reach to be
        preferred by :meth:`_select_url`; ``None`` always uses ``file_url``.
    :param download_silent: Forwarded to ``WebDataSource.__init__``.
    :param group_name: Forwarded to ``WebDataSource.__init__``; also read back
        as ``self.group_name`` to prefix generated filenames and group ids.
    :param kwargs: Extra search options turned into tags by
        ``_tags_by_kwargs`` (e.g. ``threshold``, ``voted``, ``added_by``).
    """

    def __init__(self, tags: List[str], order: Optional[PostOrder] = None,
                 rating: Optional[Rating] = None, file_type: Optional[FileType] = None,
                 date: Optional[Tuple[datetime.datetime, datetime.datetime]] = None,
                 username: Optional[str] = None, password: Optional[str] = None, access_token: Optional[str] = None,
                 min_size: Optional[int] = 800, download_silent: bool = True, group_name: str = 'sankaku', **kwargs):
        WebDataSource.__init__(self, group_name, get_requests_session(), download_silent)
        self.tags = tags + _tags_by_kwargs(order=order, rating=rating, file_type=file_type, date=date, **kwargs)
        self.username, self.password = username, password
        self.access_token = access_token
        self.min_size = min_size
        # Separate session for API calls; it later carries the Authorization header.
        # NOTE(review): the fixed 'Host' header below is presumably also sent with
        # the login request to login.sankakucomplex.com, which goes through this
        # same session -- confirm this is intended.
        self.auth_session = get_requests_session(headers={
            'Content-Type': 'application/json; charset=utf-8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Host': 'capi-v2.sankakucomplex.com',
            'X-Requested-With': 'com.android.browser',
        })

    # (url key, width key, height key) triples probed by ``_select_url``.
    _FILE_URLS = [
        ('sample_url', 'sample_width', 'sample_height'),
        ('preview_url', 'preview_width', 'preview_height'),
        ('file_url', 'width', 'height'),
    ]

    def _select_url(self, data):
        """Pick the download URL for one post.

        When ``min_size`` is set, the narrowest variant whose width and height
        both reach ``min_size`` is chosen. If no variant qualifies, or
        ``min_size`` is ``None``, the post's ``file_url`` is used.

        :param data: Post dict as returned by the posts API.
        :return: The selected URL.
        :raises NoURL: If no usable URL is present.
        """
        if self.min_size is not None:
            best_url, best_width = None, None
            for url_name, width_name, height_name in self._FILE_URLS:
                url = data.get(url_name)
                width = data.get(width_name)
                height = data.get(height_name)
                # An entry without a URL can never be downloaded, so it must not
                # displace a usable candidate.
                if not (url and width and height):
                    continue
                if width < self.min_size or height < self.min_size:
                    continue
                if best_url is None or width < best_width:
                    best_url, best_width = url, width

            if best_url is not None:
                return best_url

        if data.get('file_url'):
            return data['file_url']
        raise NoURL

    def _login(self):
        """Attach an ``Authorization`` header to the API session.

        A given ``access_token`` is used directly as a bearer token. Otherwise,
        when both username and password are set, a token is requested from the
        login endpoint. With neither, the session is left without the header.
        """
        if self.access_token:
            self.auth_session.headers.update({
                "Authorization": f"Bearer {self.access_token}",
            })
        elif self.username and self.password:
            resp = srequest(self.auth_session, 'POST', 'https://login.sankakucomplex.com/auth/token',
                            json={"login": self.username, "password": self.password})
            resp.raise_for_status()
            login_data = resp.json()
            self.auth_session.headers.update({
                "Authorization": f"{login_data['token_type']} {login_data['access_token']}",
            })

    def _iter_data(self) -> Iterator[Tuple[Union[str, int], str, dict]]:
        """Yield ``(post id, url, metadata)`` for every matching image post.

        Pages of up to 100 posts are requested until an empty page comes back.
        Posts whose ``file_type`` does not contain ``'image'`` and posts
        without a usable URL are skipped.
        """
        self._login()

        page = 1
        while True:
            resp = srequest(self.auth_session, 'GET', 'https://capi-v2.sankakucomplex.com/posts', params={
                'lang': 'en',
                'page': str(page),
                'limit': '100',
                'tags': ' '.join(self.tags),
            })
            resp.raise_for_status()
            posts = resp.json()  # decode the body once per page
            if not posts:
                break

            for data in posts:
                # A missing or null file_type is treated as "not an image".
                if 'image' not in (data.get('file_type') or ''):
                    continue

                try:
                    url = self._select_url(data)
                except NoURL:
                    continue

                _, ext_name = os.path.splitext(urlsplit(url).filename)
                filename = f'{self.group_name}_{data["id"]}{ext_name}'
                meta = {
                    'sankaku': data,
                    'group_id': f'{self.group_name}_{data["id"]}',
                    'filename': filename,
                    'tags': {t_item['name']: 1.0 for t_item in data['tags']},
                }
                yield data["id"], url, meta

            page += 1