Spaces:
Runtime error
Runtime error
File size: 7,003 Bytes
4f8ad24 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
import logging
from functools import reduce
from operator import __or__
from typing import Iterator, Tuple, Optional, List, Mapping
from hbutils.string import plural_word
from .anime_pictures import AnimePicturesSource
from .base import BaseDataSource
from .danbooru import ATFBooruSource, DanbooruSource, DanbooruLikeSource
from .konachan import KonachanSource, KonachanNetSource, HypnoHubSource, LolibooruSource, XbooruSource, YandeSource, \
Rule34Source, KonachanLikeSource
from .pixiv import PixivSearchSource
from .sankaku import SankakuSource
from .wallhaven import WallHavenSource
from .zerochan import ZerochanSource
from ..model import ImageItem
# Site names used as preset sources by default; this tuple is the default
# value of the ``preset_sites`` argument of ``GcharAutoSource.__init__``.
_PRESET_SITES = ('zerochan', 'danbooru')
# Registry mapping a site name to the data source class built for that site.
# ``GcharAutoSource`` validates preset sites against these keys, looks up the
# class to instantiate here, and treats the key set as the pool of candidate
# sites for its main sources. Commented-out entries are currently disabled.
_REGISTERED_SITE_SOURCES = {
    'anime_pictures': AnimePicturesSource,
    'atfbooru': ATFBooruSource,
    # 'sankaku': SankakuSource, # still something wrong with sankaku source
    'danbooru': DanbooruSource,
    'hypnohub': HypnoHubSource,
    'konachan': KonachanSource,
    'konachan_net': KonachanNetSource,
    'lolibooru': LolibooruSource,
    'rule34': Rule34Source,
    # 'safebooru': SafebooruSource,
    'xbooru': XbooruSource,
    'yande': YandeSource,
    'zerochan': ZerochanSource,
    'wallhaven': WallHavenSource,
    'pixiv': PixivSearchSource,
}
class GcharAutoSource(BaseDataSource):
    """
    Data source that automatically picks image sites for a gchar character.

    For every site it uses, the recommended search keyword of the character is
    looked up in gchar's resources and the matching source class from
    ``_REGISTERED_SITE_SOURCES`` is instantiated with it. Two groups of sites
    are built:

    * *preset* sites, which are always attempted, and
    * *main* sites, the remaining registered sites ranked by known post count,
      of which at most ``main_sources_count`` are kept.

    Iterating this source iterates the combination of both groups.
    """

    def __init__(self, ch, allow_fuzzy: bool = False, fuzzy_threshold: int = 80, contains_extra: bool = True,
                 sure_only: bool = True, preset_sites: Tuple[str, ...] = _PRESET_SITES,
                 max_preset_limit: Optional[int] = None, main_sources_count: int = 3,
                 blacklist_sites: Tuple[str, ...] = (), pixiv_refresh_token: Optional[str] = None,
                 extra_cfg: Optional[Mapping[str, dict]] = None):
        """
        :param ch: A ``gchar.games.base.Character`` instance, or a value that is
            resolved to one through ``gchar.games.get_character``.
        :param allow_fuzzy: Forwarded to ``get_character``; only used when
            ``ch`` is not already a ``Character``.
        :param fuzzy_threshold: Forwarded to ``get_character``; only used when
            ``ch`` is not already a ``Character``.
        :param contains_extra: Forwarded to ``get_character``; only used when
            ``ch`` is not already a ``Character``.
        :param sure_only: Forwarded as ``sure_only`` to
            ``gchar.resources.sites.list_site_tags`` when selecting keywords.
        :param preset_sites: Names of the sites that are always attempted. Each
            name must be a key of ``_REGISTERED_SITE_SOURCES``.
        :param max_preset_limit: When not ``None``, the combined preset source
            is sliced with ``[:max_preset_limit]``.
        :param main_sources_count: Maximum number of non-preset sites selected
            as main sources.
        :param blacklist_sites: Site names excluded from the main source
            selection. Preset sites are not filtered by this.
        :param pixiv_refresh_token: Passed as ``refresh_token`` to the pixiv
            source. Required when ``'pixiv'`` is a preset site; when missing,
            pixiv is left out of the main source selection.
        :param extra_cfg: Mapping from site name to extra keyword arguments for
            that site's source constructor.
        :raises ValueError: If the character cannot be found, or if ``'pixiv'``
            is a preset site and no refresh token is given.
        :raises AssertionError: If a preset site is not registered.
        """
        # Imported lazily, as in the original code, so gchar is only needed
        # once an instance is actually created.
        from gchar.games import get_character
        from gchar.games.base import Character

        if isinstance(ch, Character):
            self.ch = ch
        else:
            self.ch = get_character(ch, allow_fuzzy, fuzzy_threshold, contains_extra)
        if not self.ch:
            raise ValueError(f'Character {ch!r} not found.')
        logging.info(f'Character {self.ch!r} found in gchar.')

        self.sure_only = sure_only
        self.pixiv_refresh_token = pixiv_refresh_token
        self.extra_cfg = dict(extra_cfg or {})

        for site in preset_sites:
            # Raised explicitly instead of using ``assert`` so the validation
            # still runs under ``python -O``; the exception type and message
            # are kept for callers that rely on them.
            if site not in _REGISTERED_SITE_SOURCES:
                raise AssertionError(f'Preset site {site!r} not available.')
        self.preset_sites = sorted(preset_sites)
        self.max_preset_limit = max_preset_limit
        if 'pixiv' in self.preset_sites and not self.pixiv_refresh_token:
            raise ValueError('Pixiv refresh token not given for presetting pixiv source!')

        self.main_sources_count = main_sources_count
        self.blacklist_sites = blacklist_sites

    def _select_keyword_for_site(self, site) -> Tuple[Optional[str], Optional[int]]:
        """
        Pick the search keyword of the character for ``site``.

        :param site: Site name.
        :return: ``(keyword, count)``. For pixiv these come from
            ``get_pixiv_keywords`` and the first item of ``get_pixiv_posts``
            (``0`` when the latter returns ``None``). For other sites it is the
            tag entry with the highest post count, ties broken by tag name;
            ``(None, None)`` when the site has no tags for the character.
        """
        from gchar.resources.sites import list_site_tags
        from gchar.resources.pixiv import get_pixiv_keywords, get_pixiv_posts

        if site == 'pixiv':
            keyword = get_pixiv_keywords(self.ch)
            cnt = get_pixiv_posts(self.ch)
            count = 0 if cnt is None else cnt[0]
            return keyword, count

        tags: List[Tuple[str, int]] = list_site_tags(self.ch, site, sure_only=self.sure_only, with_posts=True)
        # ``min`` with this key yields the same entry a full sort would put
        # first (highest count, then smallest name) without sorting everything.
        best = min(tags, key=lambda x: (-x[1], x[0]), default=None)
        if best is None:
            return None, None
        return best

    def _build_source_on_site(self, site) -> Optional[BaseDataSource]:
        """
        Build the data source of ``site`` using its recommended keyword.

        :param site: Site name; must be a key of ``_REGISTERED_SITE_SOURCES``.
        :return: The new source, or ``None`` when no keyword is available.
        :raises TypeError: If the registered class belongs to none of the
            known source families.
        """
        site_class = _REGISTERED_SITE_SOURCES[site]
        keyword, count = self._select_keyword_for_site(site)
        if keyword is None:
            logging.info(f'No keyword recommendation for site {site!r}.')
            return None

        # Copy the per-site config so the stored mapping is never mutated.
        extra_cfg = dict(self.extra_cfg.get(site, None) or {})
        logging.info(f'Recommended keyword for site {site!r} is {keyword!r}, '
                     f'with {plural_word(count, "known post")}.')

        # Each source family takes its keyword in a different form.
        if issubclass(site_class, (DanbooruLikeSource, AnimePicturesSource)):
            return site_class([keyword, 'solo'], **extra_cfg)
        elif issubclass(site_class, (KonachanLikeSource, SankakuSource)):
            return site_class([keyword], **extra_cfg)
        elif issubclass(site_class, ZerochanSource):
            # Use the registered class, like every other branch does.
            return site_class(keyword, strict=True, **extra_cfg)
        elif issubclass(site_class, WallHavenSource):
            return site_class(keyword, **extra_cfg)
        elif issubclass(site_class, (PixivSearchSource,)):
            return site_class(keyword, refresh_token=self.pixiv_refresh_token, **extra_cfg)
        else:
            raise TypeError(f'Unknown class {site_class!r} for keyword {keyword!r}.')  # pragma: no cover

    def _build_preset_source(self) -> Optional[BaseDataSource]:
        """
        Build and combine the sources of all preset sites.

        :return: The preset sources combined with ``|`` and, when
            ``max_preset_limit`` is set, sliced to that limit; ``None`` when no
            preset site yields a source.
        """
        logging.info('Building preset sites sources ...')
        candidates = (self._build_source_on_site(site) for site in self.preset_sites)
        sources = [source for source in candidates if source is not None]
        if not sources:
            return None

        retval = reduce(__or__, sources)
        if self.max_preset_limit is not None:
            retval = retval[:self.max_preset_limit]
        return retval

    def _build_main_source(self) -> Optional[BaseDataSource]:
        """
        Build and combine the sources of the best non-preset sites.

        Candidates are all registered sites minus the preset and blacklisted
        ones (and minus pixiv when no refresh token is set). Those with a
        keyword are ranked by post count in descending order and the first
        ``main_sources_count`` are built.

        :return: The selected sources combined with ``|``, or ``None`` when
            nothing could be built.
        """
        _all_sites = set(_REGISTERED_SITE_SOURCES.keys())
        if not self.pixiv_refresh_token:
            # ``discard`` keeps this working even if pixiv is ever removed
            # from the registry.
            _all_sites.discard('pixiv')
        _all_sites = sorted(_all_sites - set(self.preset_sites) - set(self.blacklist_sites))
        logging.info(f'Available sites for main sources: {_all_sites!r}.')

        site_pairs = []
        for site in _all_sites:
            keyword, count = self._select_keyword_for_site(site)
            if keyword is not None:
                site_pairs.append((site, keyword, count))
        # Stable sort over name-ordered sites, so equal counts keep name order.
        site_pairs = sorted(site_pairs, key=lambda x: -x[2])[:self.main_sources_count]
        logging.info(f'Selected main sites: {site_pairs!r}')

        candidates = (self._build_source_on_site(site) for site, _, _ in site_pairs)
        sources = [source for source in candidates if source is not None]
        if not sources:
            return None
        return reduce(__or__, sources)

    def _build_source(self) -> Optional[BaseDataSource]:
        """
        Build the overall source from the preset and main sources.

        :return: ``preset + main`` when both exist, whichever one exists when
            only one does, or ``None`` when neither could be built.
        """
        preset_source = self._build_preset_source()
        main_source = self._build_main_source()
        # Explicit ``None`` checks, consistent with the rest of the class,
        # so the result never depends on how a source evaluates as a boolean.
        if preset_source is None:
            return main_source
        if main_source is None:
            return preset_source
        return preset_source + main_source

    def _iter(self) -> Iterator[ImageItem]:
        """
        Yield the items of the combined source; yields nothing when no source
        could be built.
        """
        source = self._build_source()
        if source is not None:
            yield from source._iter()
|