Spaces:
Runtime error
Runtime error
File size: 3,154 Bytes
4f8ad24 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
import os
import re
from typing import Optional, List, Iterator, Tuple, Union
import requests
import xmltodict
from hbutils.system import urlsplit
from .web import WebDataSource, NoURL
from ..utils import get_requests_session
class PahealSource(WebDataSource):
    """Data source that pages through the rule34.paheal.net Danbooru-style XML API.

    Posts matching ``tags`` are fetched 100 per page and yielded one by one
    as ``(id, url, meta)`` tuples by :meth:`_iter_data`.
    """

    def __init__(self, tags: List[str], user_id: Optional[str] = None, api_key: Optional[str] = None,
                 min_size: Optional[int] = 800, download_silent: bool = True, group_name: str = 'paheal'):
        """Create the source.

        :param tags: Tags to search for; they are joined with spaces into one query.
        :param user_id: Optional account id; sent only together with ``api_key``.
        :param api_key: Optional API key; sent only together with ``user_id``.
        :param min_size: Minimum width and height a candidate image must have to be
            preferred by :meth:`_select_url`. ``None`` disables size-based selection.
        :param download_silent: Forwarded unchanged to ``WebDataSource.__init__``.
        :param group_name: Prefix used for generated group ids and filenames.
        """
        WebDataSource.__init__(self, group_name, get_requests_session(), download_silent)
        self.tags = tags
        self.min_size = min_size
        self.user_id, self.api_key = user_id, api_key

    def _params(self, page):
        """Build the query parameters for result page ``page``.

        :param page: Page number to request; it is stringified for the query.
        :return: Dict of query parameters, with credentials included only when
            both ``user_id`` and ``api_key`` are set.
        """
        params = {
            'tags': ' '.join(self.tags),
            'limit': '100',
            'page': str(page),
        }
        if self.user_id and self.api_key:
            params['user_id'] = self.user_id
            params['api_key'] = self.api_key

        return params

    def _select_url(self, data):
        """Pick the download URL for one post.

        When ``min_size`` is set, every ``<prefix>_url`` key that has matching
        ``<prefix>_width`` / ``<prefix>_height`` keys is a candidate, as is
        ``file_url`` with the plain ``width`` / ``height`` keys. Among the
        candidates whose width and height both reach ``min_size``, the one with
        the smallest width wins. If none qualifies (or ``min_size`` is ``None``),
        ``file_url`` is used as the fallback.

        :param data: Post attributes with the leading ``@`` already stripped.
        :return: The selected URL.
        :raises NoURL: If no candidate qualifies and ``file_url`` is absent.
        """
        if self.min_size is not None:
            url_names = [key for key in data.keys() if key.endswith('_url')]
            name_pairs = [
                *(
                    (name, f'{name[:-4]}_width', f'{name[:-4]}_height')
                    for name in url_names
                ),
                ('file_url', 'width', 'height'),
            ]

            f_url, f_width, f_height = None, None, None
            for url_name, width_name, height_name in name_pairs:
                if url_name in data and width_name in data and height_name in data:
                    url, width, height = data[url_name], int(data[width_name]), int(data[height_name])
                    if width >= self.min_size and height >= self.min_size:
                        # Keep the narrowest image that still satisfies min_size.
                        if f_url is None or width < f_width:
                            f_url, f_width, f_height = url, width, height

            if f_url is not None:
                return f_url

        if 'file_url' in data:
            return data['file_url']
        else:
            raise NoURL

    def _iter_data(self) -> Iterator[Tuple[Union[str, int], str, dict]]:
        """Yield ``(post_id, url, meta)`` for every post, page by page.

        Iteration stops at the first page that contains no posts. Posts for
        which :meth:`_select_url` raises ``NoURL`` are skipped.

        :raises requests.HTTPError: If a page request returns an error status.
        """
        page = 1
        while True:
            resp = requests.get('https://rule34.paheal.net/api/danbooru/find_posts/index.xml',
                                params=self._params(page))
            resp.raise_for_status()

            # An element without attributes or children parses to None.
            root = xmltodict.parse(resp.text)['posts'] or {}
            posts = root.get('tag')
            if not posts:
                # Empty page: we ran past the last result, so stop instead of
                # failing on the missing 'tag' key.
                return
            if isinstance(posts, dict):
                # xmltodict returns a lone child element as a dict, not a list.
                posts = [posts]

            for data in posts:
                # xmltodict prefixes XML attributes with '@'; drop the prefix.
                data = {key.lstrip('@'): value for key, value in data.items()}

                try:
                    url = self._select_url(data)
                except NoURL:
                    continue

                _, ext_name = os.path.splitext(urlsplit(url).filename)
                filename = f'{self.group_name}_{data["id"]}{ext_name}'
                meta = {
                    'paheal': data,
                    'group_id': f'{self.group_name}_{data["id"]}',
                    'filename': filename,
                    # Drop the empty strings re.split emits for leading/trailing
                    # whitespace so no bogus '' tag is recorded.
                    'tags': {key: 1.0 for key in re.split(r'\s+', data['tags']) if key}
                }
                yield data["id"], url, meta

            page += 1
|