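# NOTE: the following is residue from the web file viewer this source was
# copied from, kept as comments so the module remains valid Python.
#   uploader avatar: narugo's picture
#   last commit: "dev(narugo): quick webp" (4737e31)
#   viewer links: raw / history / blame
#   file size: 4.75 kB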
import json
import os
from collections import defaultdict
from functools import lru_cache
from typing import List, Dict
import faiss
import gradio as gr
import numpy as np
from PIL import Image
from cheesechaser.datapool import YandeWebpDataPool, ZerochanWebpDataPool, GelbooruWebpDataPool, \
KonachanWebpDataPool, AnimePicturesWebpDataPool, DanbooruNewestWebpDataPool, Rule34WebpDataPool
from hfutils.operate import get_hf_fs, get_hf_client
from hfutils.utils import TemporaryDirectory
from imgutils.tagging import wd14
from pools import quick_webp_pool
# Repository that holds one sub-directory per published index.
_REPO_ID = 'deepghs/index_experiments'
# Index pre-selected in the UI dropdown.
_DEFAULT_MODEL_NAME = 'SwinV2_v3_dgzyka_23325111_8GB'

# Shared Hugging Face handles: filesystem view for listing, client for downloads.
hf_fs = get_hf_fs()
hf_client = get_hf_client()

# Every directory directly below the repository root that contains a
# ``knn.index`` file, expressed relative to the repository.
_ALL_MODEL_NAMES = [
    os.path.dirname(os.path.relpath(index_path, _REPO_ID))
    for index_path in hf_fs.glob(f'{_REPO_ID}/*/knn.index')
]

# Sites that have a dedicated datapool class. Sites missing from this table
# are resolved through ``quick_webp_pool`` in ``_get_from_ids``.
_SITE_CLS = {
    'danbooru': DanbooruNewestWebpDataPool,
    'yandere': YandeWebpDataPool,
    'zerochan': ZerochanWebpDataPool,
    'gelbooru': GelbooruWebpDataPool,
    'konachan': KonachanWebpDataPool,
    'anime_pictures': AnimePicturesWebpDataPool,
    'rule34': Rule34WebpDataPool,
}
def _get_from_ids(site_name: str, ids: List[int]) -> Dict[int, Image.Image]:
    """
    Download the images with the given numeric ids from one site.

    The files are fetched into a temporary directory and fully decoded into
    memory, so the returned images stay usable after the directory is removed.

    :param site_name: Site key; looked up in ``_SITE_CLS`` first, otherwise
        handed to ``quick_webp_pool(site_name, 3)``.
    :param ids: Numeric resource ids to download.
    :return: Mapping from numeric id (taken from each downloaded file's name
        without extension) to the loaded image.
    """
    loaded: Dict[int, Image.Image] = {}
    with TemporaryDirectory() as workdir:
        pool_factory = _SITE_CLS.get(site_name)
        if not pool_factory:
            # NOTE(review): the meaning of the literal 3 is defined by
            # ``quick_webp_pool`` and is not visible here.
            pool_factory = quick_webp_pool(site_name, 3)

        pool_factory().batch_download_to_directory(resource_ids=ids, dst_dir=workdir)

        for filename in os.listdir(workdir):
            stem, _ = os.path.splitext(filename)
            resource_id = int(stem)
            picture = Image.open(os.path.join(workdir, filename))
            # Force decoding now; the backing file disappears with the directory.
            picture.load()
            loaded[resource_id] = picture

    return loaded
def _get_from_raw_ids(ids: List[str]) -> Dict[str, Image.Image]:
    """
    Download images identified by ``"<site>_<number>"`` strings.

    Ids are grouped by site so each site is downloaded in a single batch via
    ``_get_from_ids``.

    :param ids: Raw ids; the part after the last underscore must be an integer.
    :return: Mapping from ``"<site>_<number>"`` to the loaded image, containing
        only the ids that ``_get_from_ids`` returned.
    """
    ids_by_site = defaultdict(list)
    for site, number in (raw_id.rsplit('_', maxsplit=1) for raw_id in ids):
        ids_by_site[site].append(int(number))

    images = {}
    for site, numeric_ids in ids_by_site.items():
        for numeric_id, picture in _get_from_ids(site, numeric_ids).items():
            images[f'{site}_{numeric_id}'] = picture
    return images
@lru_cache(maxsize=3)
def _get_index_info(repo_id: str, model_name: str):
    """
    Load the id table and the faiss index of one published model.

    Three files are downloaded from ``<model_name>/`` in the given model
    repository: ``ids.npy``, ``knn.index`` and ``infos.json``. The
    ``"index_param"`` entry of ``infos.json`` is applied to the index through
    ``faiss.ParameterSpace().set_index_parameters``.

    Results are memoised for the 3 most recently used argument pairs.

    :param repo_id: Hugging Face model repository holding the index files.
    :param model_name: Sub-directory of the repository to load.
    :return: Tuple ``(image_ids, knn_index)``.
    """

    def _download(filename: str) -> str:
        # Fetch one file of this model and return its local path.
        return hf_client.hf_hub_download(
            repo_id=repo_id,
            repo_type='model',
            filename=f'{model_name}/{filename}',
        )

    image_ids = np.load(_download('ids.npy'))
    knn_index = faiss.read_index(_download('knn.index'))

    # Close the handle deterministically and decode as UTF-8 (the JSON default)
    # instead of depending on the platform locale.
    with open(_download('infos.json'), encoding='utf-8') as info_file:
        config = json.load(info_file)["index_param"]

    faiss.ParameterSpace().set_index_parameters(knn_index, config)
    return image_ids, knn_index
def search(model_name: str, img_input, n_neighbours: int):
    """
    Find the indexed images most similar to ``img_input``.

    The query image is embedded with ``wd14.get_wd14_tags`` (model
    ``SwinV2_v3``, ``fmt="embedding"``), L2-normalised and searched in the
    faiss index stored under ``model_name`` in ``_REPO_ID``. The hits are then
    downloaded and paired with a ``"<id>/<score>"`` caption.

    :param model_name: Sub-directory of the index repository to search in.
    :param img_input: Query image, or ``None`` when no image was supplied.
    :param n_neighbours: Maximum number of neighbours to retrieve.
    :return: List of ``(image, caption)`` tuples in the order returned by
        faiss. Hits whose image could not be downloaded are left out.
    :raises gr.Error: If ``img_input`` is ``None``.
    """
    if img_input is None:
        # Surface a readable message in the UI instead of failing inside the embedder.
        raise gr.Error('Please provide an input image first.')

    images_ids, knn_index = _get_index_info(_REPO_ID, model_name)
    embeddings = wd14.get_wd14_tags(
        img_input,
        model_name="SwinV2_v3",
        fmt="embedding",
    )
    embeddings = np.expand_dims(embeddings, 0)
    faiss.normalize_L2(embeddings)

    # Cast defensively in case the UI delivers the slider value as a float.
    dists, indexes = knn_index.search(embeddings, k=int(n_neighbours))

    # faiss pads the result with label -1 when it finds fewer than k
    # neighbours; indexing the id table with -1 would return the last id.
    found = indexes[0] >= 0
    neighbours_ids = images_ids[indexes[0][found]]
    neighbour_dists = dists[0][found]

    ids_to_images = _get_from_raw_ids(neighbours_ids)
    return [
        (ids_to_images[image_id], f"{image_id}/{dist:.2f}")
        for image_id, dist in zip(neighbours_ids, neighbour_dists)
        if image_id in ids_to_images
    ]
# Script entry point: build the Gradio UI and serve it.
# NOTE(review): the nesting of the layout containers below was reconstructed
# from a copy of the file that had lost its indentation — confirm it matches
# the intended layout.
if __name__ == "__main__":
    with gr.Blocks() as demo:
        with gr.Row():
            with gr.Column():
                # The query image is handed to ``search`` as a PIL image.
                img_input = gr.Image(type="pil", label="Input")
            with gr.Column():
                with gr.Row():
                    # Choice of index, populated from the repository listing.
                    n_model = gr.Dropdown(
                        choices=_ALL_MODEL_NAMES,
                        value=_DEFAULT_MODEL_NAME,
                        label='Index to Use',
                    )
                with gr.Row():
                    # Number of neighbours to request (1 to 50, default 20).
                    n_neighbours = gr.Slider(
                        minimum=1,
                        maximum=50,
                        value=20,
                        step=1,
                        label="# of images",
                    )
                find_btn = gr.Button("Find similar images")
        with gr.Row():
            similar_images = gr.Gallery(label="Similar images", columns=[5])
        # The input order must match the parameter order of ``search``.
        find_btn.click(
            fn=search,
            inputs=[
                n_model,
                img_input,
                n_neighbours,
            ],
            outputs=[similar_images],
        )
    demo.queue().launch()