import datetime
import glob
import json
import os.path
import zipfile
from typing import Union, Tuple, List, Optional

import pandas as pd
from ditk import logging
from gchar.games import get_character
from gchar.games.base import Character
from hbutils.string import plural_word
from hbutils.system import TemporaryDirectory
from huggingface_hub import CommitOperationAdd, hf_hub_url
from waifuc.action import NoMonochromeAction, FilterSimilarAction, \
    TaggingAction, PersonSplitAction, FaceCountAction, CCIPAction, ModeConvertAction, ClassFilterAction, \
    FileOrderAction, RatingFilterAction, BaseAction, RandomFilenameAction, PaddingAlignAction, ThreeStageSplitAction, \
    AlignMinSizeAction, MinSizeFilterAction, FilterAction
from waifuc.action.filter import MinAreaFilterAction
from waifuc.export import SaveExporter, TextualInversionExporter
from waifuc.model import ImageItem
from waifuc.source import GcharAutoSource, BaseDataSource, LocalSource
from waifuc.utils import task_ctx

from ..utils import number_to_tag, get_ch_name, get_alphabet_name, get_hf_client, download_file, get_hf_fs


def get_source(source) -> BaseDataSource:
    """
    Normalize ``source`` into a data source object.

    :param source: A character name (``str``), a ``Character`` or an existing ``BaseDataSource``.
    :return: ``GcharAutoSource(source, main_sources_count=5)`` for a name/character,
        or the given ``BaseDataSource`` unchanged.
    :raises TypeError: If ``source`` is none of the supported types.
    """
    if isinstance(source, (str, Character)):
        source = GcharAutoSource(source, main_sources_count=5)
    elif isinstance(source, BaseDataSource):
        pass
    else:
        raise TypeError(f'Unknown source type - {source!r}.')

    return source


def get_main_source(source, no_r18: bool = False, bg_color: str = 'white',
                    no_monochrome_check: bool = False,
                    drop_multi: bool = True, skip: bool = False) -> BaseDataSource:
    """
    Build the data source with the preprocessing actions attached.

    :param source: Anything accepted by :func:`get_source`.
    :param no_r18: When true, a ``RatingFilterAction(['safe', 'r15'])`` is added.
    :param bg_color: Background color passed to ``ModeConvertAction('RGB', ...)``.
    :param no_monochrome_check: When true, ``NoMonochromeAction`` is not added.
    :param drop_multi: When true, a ``FaceCountAction(count=1)`` is applied before person splitting.
    :param skip: When true, no preprocessing action is attached at all.
    :return: The source with the selected actions attached.
    """
    source: BaseDataSource = get_source(source)
    if not skip:
        actions = [ModeConvertAction('RGB', bg_color)]
        if not no_monochrome_check:
            actions.append(NoMonochromeAction())  # no monochrome, greyscale or sketch
        actions.append(ClassFilterAction(['illustration', 'bangumi']))  # no comic or 3d
        if no_r18:
            actions.append(RatingFilterAction(['safe', 'r15']))

        actions.append(FilterSimilarAction('all'))  # filter duplicated images
        if drop_multi:
            actions.append(FaceCountAction(count=1, level='n'))  # drop images with 0 or >1 faces
        actions.extend([
            PersonSplitAction(level='n'),  # crop for each person
            FaceCountAction(count=1, level='n'),
            FileOrderAction(),  # Rename files in order
            # CCIPAction(min_val_count=15),  # CCIP, filter the character you may not want to see in dataset
            FilterSimilarAction('all'),  # filter duplicated images
            MinSizeFilterAction(320),
            TaggingAction(force=True, character_threshold=1.01),
        ])
        actions.append(RandomFilenameAction(ext='.png'))
    else:
        actions = []

    return source.attach(*actions)


def actions_parse(actions: Union[int, Tuple[int, int], List[BaseAction]], bg_color: str = 'white'):
    """
    Turn a post-processing specification into a list of actions.

    :param actions: A list of actions (returned as is), a ``(width, height)`` tuple
        (becomes one ``PaddingAlignAction``), or an ``int`` (becomes one ``AlignMinSizeAction``).
    :param bg_color: Background color passed to ``PaddingAlignAction`` for the tuple form.
    :return: List of actions.
    :raises TypeError: If ``actions`` is none of the supported types.
    """
    if isinstance(actions, list):
        return actions
    elif isinstance(actions, tuple):
        width, height = actions
        return [PaddingAlignAction((width, height), bg_color)]
    elif isinstance(actions, int):
        return [AlignMinSizeAction(actions)]
    else:
        raise TypeError(f'Unknown post action type - {actions!r}.')


class CustomMinSizeAction(FilterAction):
    """
    Size filter with a separate threshold for eye crops.

    ``check`` compares the shorter image side with ``min_eye_size`` when the item's
    ``meta['crop']['type']`` is ``'eye'``, and with ``main_size`` otherwise.
    """

    def __init__(self, main_size: int = 280, min_eye_size: int = 180):
        self.main_size = main_size
        self.min_eye_size = min_eye_size

    def check(self, item: ImageItem) -> bool:
        min_size = min(item.image.width, item.image.height)
        if 'crop' in item.meta and item.meta['crop']['type'] == 'eye':
            return min_size >= self.min_eye_size
        else:
            return min_size >= self.main_size


# Intermediate sources derived from the preprocessed origin images: name -> actions.
_SOURCES = {
    'native': [
        TaggingAction(force=False, character_threshold=1.01),
    ],
    'stage3': [
        ThreeStageSplitAction(split_person=False),
        FilterSimilarAction(),
        MinSizeFilterAction(280),
        TaggingAction(force=False, character_threshold=1.01),
    ],
    'stage3-eyes': [
        ThreeStageSplitAction(split_person=False, split_eyes=True),
        FilterSimilarAction(),
        CustomMinSizeAction(280, 180),
        TaggingAction(force=False, character_threshold=1.01),
    ]
}

# Published packages: name -> (key in _SOURCES, post actions spec for actions_parse, description).
_DEFAULT_RESOLUTIONS = {
    'raw': ('native', [], 'Raw data with meta information.'),
    'raw-stage3': ('stage3', [], '3-stage cropped raw data with meta information.'),
    'raw-stage3-eyes': ('stage3-eyes', [], '3-stage cropped (with eye-focus) raw data with meta information.'),
    '384x512': ('native', (384, 512), '384x512 aligned dataset.'),
    # '512x512': ('native', (512, 512), '512x512 aligned dataset.'),
    '512x704': ('native', (512, 704), '512x704 aligned dataset.'),
    # '640x640': ('native', (640, 640), '640x640 aligned dataset.'),
    '640x880': ('native', (640, 880), '640x880 aligned dataset.'),
    'stage3-640': ('stage3', 640, '3-stage cropped dataset with the shorter side not exceeding 640 pixels.'),
    'stage3-800': ('stage3', 800, '3-stage cropped dataset with the shorter side not exceeding 800 pixels.'),
    'stage3-p512-640': ('stage3', [MinAreaFilterAction(512), AlignMinSizeAction(640)],
                        '3-stage cropped dataset with the area not less than 512x512 pixels.'),
    # 'stage3-1200': ('stage3', 1200, '3-stage cropped dataset with the shorter side not exceeding 1200 pixels.'),
    'stage3-eyes-640': ('stage3-eyes', 640, '3-stage cropped (with eye-focus) dataset '
                                            'with the shorter side not exceeding 640 pixels.'),
    'stage3-eyes-800': ('stage3-eyes', 800, '3-stage cropped (with eye-focus) dataset '
                                            'with the shorter side not exceeding 800 pixels.'),
}

DATASET_PVERSION = 'v1.4'


def _zip_directory(directory: str, zip_file: str) -> None:
    """Pack every file under ``directory`` into ``zip_file``, using '/'-separated relative names."""
    with zipfile.ZipFile(zip_file, mode='w') as zf:
        for root, _, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                rel_file_path = os.path.relpath(file_path, directory)
                # archive member names always use forward slashes, whatever the local separator is
                zf.write(file_path, '/'.join(rel_file_path.split(os.sep)))


def _write_readme(readme_file: str, name: str, img_count: int, columns: list, rows: list) -> None:
    """Write the dataset card (YAML front matter, description and package table) to ``readme_file``."""
    with open(readme_file, 'w', encoding='utf-8') as rf:
        print('---', file=rf)
        print('license: mit', file=rf)
        print('task_categories:', file=rf)
        print('- text-to-image', file=rf)
        print('tags:', file=rf)
        print('- art', file=rf)
        print('- not-for-all-audiences', file=rf)
        print('size_categories:', file=rf)
        print(f'- {number_to_tag(img_count)}', file=rf)
        print('---', file=rf)
        print('', file=rf)

        print(f'# Dataset of {name}', file=rf)
        print('', file=rf)

        print(f'This is the dataset of {name}, '
              f'containing {plural_word(img_count, "images")} and their tags.', file=rf)
        print('', file=rf)

        print('Images are crawled from many sites (e.g. danbooru, pixiv, zerochan ...), '
              'the auto-crawling system is powered by [DeepGHS Team](https://github.com/deepghs)'
              '([huggingface organization](https://huggingface.co/deepghs)). ', file=rf)
        print('This is a WebUI contains crawlers and other thing: '
              '([LittleAppleWebUI](https://github.com/LittleApple-fp16/LittleAppleWebUI))', file=rf)
        print('', file=rf)

        df = pd.DataFrame(columns=columns, data=rows)
        print(df.to_markdown(index=False), file=rf)
        print('', file=rf)


def crawl_dataset_to_huggingface(
        source: Union[str, Character, BaseDataSource], repository: Optional[str] = None,
        name: Optional[str] = None, limit: Optional[int] = 10000, min_images: int = 3000,
        no_r18: bool = False, bg_color: str = 'white', drop_multi: bool = True, skip_preprocess: bool = False,
        no_monochrome_check: bool = False,
        repo_type: str = 'dataset', revision: str = 'main', path_in_repo: str = '.', private: bool = False,
        skip_if_exists: bool = True,
):
    """
    Crawl images, build all dataset packages and publish them to a HuggingFace repository.

    The origin images are exported to a temporary directory, expanded into every entry
    of ``_SOURCES``, post-processed and zipped for every entry of ``_DEFAULT_RESOLUTIONS``,
    and then committed together with ``meta.json`` and ``README.md``.

    :param source: Character name, ``Character`` or data source to crawl from.
    :param repository: Target repository. Defaults to ``AppleHarem/<name derived from source>``.
    :param name: Display name of the dataset. Required when ``source`` is a data source;
        overwritten with the character's name when ``source`` is a name/character.
    :param limit: Maximum number of items taken from the source, ``None`` for no limit.
    :param min_images: Minimum number of exported ``*.png`` origin images; with fewer,
        nothing is processed or uploaded.
    :param no_r18: Forwarded to :func:`get_main_source`.
    :param bg_color: Background color for mode conversion and padding.
    :param drop_multi: Forwarded to :func:`get_main_source`.
    :param skip_preprocess: Forwarded to :func:`get_main_source` as ``skip``.
    :param no_monochrome_check: Forwarded to :func:`get_main_source`.
    :param repo_type: Repository type used for repository creation and commit.
    :param revision: Revision to commit to.
    :param path_in_repo: Directory inside the repository that receives the files.
    :param private: Whether a newly created repository is private.
    :param skip_if_exists: When true (default), return without doing anything if
        ``datasets/<repository>/.gitattributes`` already exists. Pass ``False`` to
        rebuild and republish into an existing repository.
    :raises ValueError: If ``source`` is a data source and ``name`` is not given.
    """
    if isinstance(source, (str, Character)):
        if isinstance(source, str):
            source = get_character(source)
        name = f'{source.enname} ({source.__official_name__})'

        if not repository:
            repository = f'AppleHarem/{get_ch_name(source)}'

    else:
        if name is None:
            raise ValueError('Name must be specified when source is not str or character.')

        if not repository:
            repository = f'AppleHarem/{get_alphabet_name(name)}'

    if skip_if_exists and get_hf_fs().exists(f'datasets/{repository}/.gitattributes'):
        logging.warn(f'{repository} exists, skipped.')
        return

    origin_source = get_main_source(source, no_r18, bg_color, no_monochrome_check, drop_multi, skip_preprocess)
    with TemporaryDirectory() as td:
        # save origin directory
        origin_dir = os.path.join(td, 'origin')
        os.makedirs(origin_dir, exist_ok=True)
        if limit is not None:
            origin_source = origin_source[:limit]
        with task_ctx('origin'):
            origin_source.export(SaveExporter(origin_dir))

        img_count = len(glob.glob(os.path.join(origin_dir, '*.png')))
        if img_count < min_images:
            logging.warn(f'Only {plural_word(img_count, "image")} found for {name} which is too few, '
                         f'skip post-processing and uploading.')
            return

        # derive every intermediate source from the origin images
        source_dir = os.path.join(td, 'source')
        os.makedirs(source_dir, exist_ok=True)
        for sname, actions in _SOURCES.items():
            with task_ctx(f'source/{sname}'):
                LocalSource(origin_dir).attach(*actions).export(SaveExporter(os.path.join(source_dir, sname)))

        processed_dir = os.path.join(td, 'processed')
        os.makedirs(processed_dir, exist_ok=True)
        archive_dir = os.path.join(td, 'archives')
        os.makedirs(archive_dir, exist_ok=True)

        # (local path, file name in repository)
        files_to_upload: List[Tuple[str, str]] = []
        resolutions = _DEFAULT_RESOLUTIONS

        columns = ['Name', 'Images', 'Download', 'Description']
        rows = []
        for rname, (sname, actions, description) in resolutions.items():
            actions = actions_parse(actions, bg_color)

            ox = LocalSource(os.path.join(source_dir, sname))
            current_processed_dir = os.path.join(processed_dir, rname)
            with task_ctx(f'archive/{rname}'):
                if not rname.startswith('raw'):  # raw is preserved for exporting json data
                    ox.attach(*actions).export(TextualInversionExporter(current_processed_dir))
                else:
                    ox.attach(*actions).export(SaveExporter(current_processed_dir))
            current_img_cnt = len(glob.glob(os.path.join(current_processed_dir, '*.png')))

            zip_file = os.path.join(archive_dir, f'dataset-{rname}.zip')
            _zip_directory(current_processed_dir, zip_file)

            rows.append((
                rname,
                current_img_cnt,
                f'[Download]({os.path.basename(zip_file)})',
                description,
            ))

            files_to_upload.append((zip_file, os.path.basename(zip_file)))

        meta_file = os.path.join(td, 'meta.json')
        with open(meta_file, 'w', encoding='utf-8') as mf:
            json.dump({
                'name': name,
                'version': DATASET_PVERSION,
            }, mf, indent=4, sort_keys=True, ensure_ascii=False)
        files_to_upload.append((meta_file, 'meta.json'))

        readme_file = os.path.join(td, 'README.md')
        _write_readme(readme_file, name, img_count, columns, rows)
        files_to_upload.append((readme_file, 'README.md'))

        hf_client = get_hf_client()
        hf_fs = get_hf_fs()
        logging.info(f'Initialize repository {repository!r}')
        if not hf_fs.exists(f'datasets/{repository}/.gitattributes'):
            hf_client.create_repo(repo_id=repository, repo_type=repo_type, exist_ok=True, private=private)

        current_time = datetime.datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')
        commit_message = f"Publish character {name}, on {current_time}"
        logging.info(f'Publishing character {name!r} to repository {repository!r} ...')
        hf_client.create_commit(
            repository,
            [
                CommitOperationAdd(
                    path_in_repo=f'{path_in_repo}/{filename}',
                    path_or_fileobj=local_file,
                ) for local_file, filename in files_to_upload
            ],
            commit_message=commit_message,
            repo_type=repo_type,
            revision=revision,
            run_as_future=False,
        )


def remake_dataset_to_huggingface(
        repository: Optional[str] = None, limit: Optional[int] = 200, min_images: int = 10,
        no_r18: bool = False, bg_color: str = 'white', drop_multi: bool = True,
        repo_type: str = 'dataset', revision: str = 'main', path_in_repo: str = '.',
):
    """
    Rebuild all packages of an already published dataset from its ``dataset-raw.zip``.

    The raw archive is downloaded and extracted, then handed to
    :func:`crawl_dataset_to_huggingface` with preprocessing skipped. The dataset name is
    read from the repository's ``meta.json`` when available, otherwise the last path
    segment of ``repository`` is used.

    :param repository: Repository holding the existing dataset; must be given.
    :param limit: Forwarded to :func:`crawl_dataset_to_huggingface`.
    :param min_images: Forwarded to :func:`crawl_dataset_to_huggingface`.
    :param no_r18: Forwarded to :func:`crawl_dataset_to_huggingface`.
    :param bg_color: Forwarded to :func:`crawl_dataset_to_huggingface`.
    :param drop_multi: Forwarded to :func:`crawl_dataset_to_huggingface`.
    :param repo_type: Repository type used for the commit.
    :param revision: Revision to commit to.
    :param path_in_repo: Directory inside the repository that receives the files.
    :raises ValueError: If ``repository`` is empty or ``None``.
    """
    if not repository:
        raise ValueError('Repository must be specified for remaking dataset.')

    hf_fs = get_hf_fs()
    with TemporaryDirectory() as td:
        zip_file = os.path.join(td, 'dataset-raw.zip')
        download_file(hf_hub_url(repository, 'dataset-raw.zip', repo_type='dataset'), zip_file)

        source_dir = os.path.join(td, 'source')
        os.makedirs(source_dir, exist_ok=True)
        with zipfile.ZipFile(zip_file, 'r') as zf:
            zf.extractall(source_dir)

        source = LocalSource(source_dir)
        name = None
        if hf_fs.exists(f'datasets/{repository}/meta.json'):
            meta_json = json.loads(hf_fs.read_text(f'datasets/{repository}/meta.json'))
            if 'name' in meta_json:
                name = meta_json['name']
        name = name or repository.split('/')[-1]

        # Keyword arguments are required here: the callee has ``no_monochrome_check``
        # between ``skip_preprocess`` and ``repo_type``, so a positional call shifts
        # repo_type/revision/path_in_repo into the wrong parameters.
        # ``skip_if_exists=False`` because the target repository necessarily exists already.
        return crawl_dataset_to_huggingface(
            source, repository, name, limit, min_images,
            no_r18=no_r18, bg_color=bg_color, drop_multi=drop_multi,
            skip_preprocess=True,
            repo_type=repo_type, revision=revision, path_in_repo=path_in_repo,
            skip_if_exists=False,
        )