Spaces:
Runtime error
Runtime error
import logging | |
from functools import reduce | |
from operator import __or__ | |
from typing import Iterator, Tuple, Optional, List, Mapping | |
from hbutils.string import plural_word | |
from .anime_pictures import AnimePicturesSource | |
from .base import BaseDataSource | |
from .danbooru import ATFBooruSource, DanbooruSource, DanbooruLikeSource | |
from .konachan import KonachanSource, KonachanNetSource, HypnoHubSource, LolibooruSource, XbooruSource, YandeSource, \ | |
Rule34Source, KonachanLikeSource | |
from .pixiv import PixivSearchSource | |
from .sankaku import SankakuSource | |
from .wallhaven import WallHavenSource | |
from .zerochan import ZerochanSource | |
from ..model import ImageItem | |
_PRESET_SITES = ('zerochan', 'danbooru') | |
_REGISTERED_SITE_SOURCES = { | |
'anime_pictures': AnimePicturesSource, | |
'atfbooru': ATFBooruSource, | |
# 'sankaku': SankakuSource, # still something wrong with sankaku source | |
'danbooru': DanbooruSource, | |
'hypnohub': HypnoHubSource, | |
'konachan': KonachanSource, | |
'konachan_net': KonachanNetSource, | |
'lolibooru': LolibooruSource, | |
'rule34': Rule34Source, | |
# 'safebooru': SafebooruSource, | |
'xbooru': XbooruSource, | |
'yande': YandeSource, | |
'zerochan': ZerochanSource, | |
'wallhaven': WallHavenSource, | |
'pixiv': PixivSearchSource, | |
} | |
class GcharAutoSource(BaseDataSource): | |
def __init__(self, ch, allow_fuzzy: bool = False, fuzzy_threshold: int = 80, contains_extra: bool = True, | |
sure_only: bool = True, preset_sites: Tuple[str, ...] = _PRESET_SITES, | |
max_preset_limit: Optional[int] = None, main_sources_count: int = 3, | |
blacklist_sites: Tuple[str, ...] = (), pixiv_refresh_token: Optional[str] = None, | |
extra_cfg: Optional[Mapping[str, dict]] = None): | |
from gchar.games import get_character | |
from gchar.games.base import Character | |
if isinstance(ch, Character): | |
self.ch = ch | |
else: | |
self.ch = get_character(ch, allow_fuzzy, fuzzy_threshold, contains_extra) | |
if not self.ch: | |
raise ValueError(f'Character {ch!r} not found.') | |
logging.info(f'Character {self.ch!r} found in gchar.') | |
self.sure_only = sure_only | |
self.pixiv_refresh_token = pixiv_refresh_token | |
self.extra_cfg = dict(extra_cfg or {}) | |
for site in preset_sites: | |
assert site in _REGISTERED_SITE_SOURCES, f'Preset site {site!r} not available.' | |
self.preset_sites = sorted(preset_sites) | |
self.max_preset_limit = max_preset_limit | |
if 'pixiv' in self.preset_sites and not self.pixiv_refresh_token: | |
raise ValueError('Pixiv refresh token not given for presetting pixiv source!') | |
self.main_sources_count = main_sources_count | |
self.blacklist_sites = blacklist_sites | |
def _select_keyword_for_site(self, site) -> Tuple[Optional[str], Optional[int]]: | |
from gchar.resources.sites import list_site_tags | |
from gchar.resources.pixiv import get_pixiv_keywords, get_pixiv_posts | |
if site == 'pixiv': | |
keyword = get_pixiv_keywords(self.ch) | |
cnt = get_pixiv_posts(self.ch) | |
count = 0 if cnt is None else cnt[0] | |
return keyword, count | |
else: | |
tags: List[Tuple[str, int]] = list_site_tags(self.ch, site, sure_only=self.sure_only, with_posts=True) | |
tags = sorted(tags, key=lambda x: (-x[1], x[0])) | |
if tags: | |
return tags[0] | |
else: | |
return None, None | |
def _build_source_on_site(self, site) -> Optional[BaseDataSource]: | |
site_class = _REGISTERED_SITE_SOURCES[site] | |
keyword, count = self._select_keyword_for_site(site) | |
if keyword is not None: | |
extra_cfg = dict(self.extra_cfg.get(site, None) or {}) | |
logging.info(f'Recommended keyword for site {site!r} is {keyword!r}, ' | |
f'with {plural_word(count, "known post")}.') | |
if issubclass(site_class, (DanbooruLikeSource, AnimePicturesSource)): | |
return site_class([keyword, 'solo'], **extra_cfg) | |
elif issubclass(site_class, (KonachanLikeSource, SankakuSource)): | |
return site_class([keyword], **extra_cfg) | |
elif issubclass(site_class, ZerochanSource): | |
return ZerochanSource(keyword, strict=True, **extra_cfg) | |
elif issubclass(site_class, WallHavenSource): | |
return site_class(keyword, **extra_cfg) | |
elif issubclass(site_class, (PixivSearchSource,)): | |
return site_class(keyword, refresh_token=self.pixiv_refresh_token, **extra_cfg) | |
else: | |
raise TypeError(f'Unknown class {site_class!r} for keyword {keyword!r}.') # pragma: no cover | |
else: | |
logging.info(f'No keyword recommendation for site {site!r}.') | |
return None | |
def _build_preset_source(self) -> Optional[BaseDataSource]: | |
logging.info('Building preset sites sources ...') | |
sources = [ | |
self._build_source_on_site(site) | |
for site in self.preset_sites | |
] | |
sources = [source for source in sources if source is not None] | |
if sources: | |
retval = reduce(__or__, sources) | |
if self.max_preset_limit is not None: | |
retval = retval[:self.max_preset_limit] | |
return retval | |
else: | |
return None | |
def _build_main_source(self) -> Optional[BaseDataSource]: | |
_all_sites = set(_REGISTERED_SITE_SOURCES.keys()) | |
if not self.pixiv_refresh_token: | |
_all_sites.remove('pixiv') | |
_all_sites = sorted(_all_sites - set(self.preset_sites) - set(self.blacklist_sites)) | |
logging.info(f'Available sites for main sources: {_all_sites!r}.') | |
site_pairs = [] | |
for site in _all_sites: | |
keyword, count = self._select_keyword_for_site(site) | |
if keyword is not None: | |
site_pairs.append((site, keyword, count)) | |
site_pairs = sorted(site_pairs, key=lambda x: -x[2])[:self.main_sources_count] | |
logging.info(f'Selected main sites: {site_pairs!r}') | |
sources = [ | |
self._build_source_on_site(site) | |
for site, _, _ in site_pairs | |
] | |
sources = [source for source in sources if source is not None] | |
if sources: | |
return reduce(__or__, sources) | |
else: | |
return None | |
def _build_source(self) -> Optional[BaseDataSource]: | |
preset_source = self._build_preset_source() | |
main_source = self._build_main_source() | |
if preset_source and main_source: | |
return preset_source + main_source | |
elif preset_source: | |
return preset_source | |
elif main_source: | |
return main_source | |
else: | |
return None | |
def _iter(self) -> Iterator[ImageItem]: | |
source = self._build_source() | |
if source is not None: | |
yield from source._iter() | |