Spaces:
Sleeping
Sleeping
import os | |
import cv2 | |
import torch | |
import numpy as np | |
from omegaconf import OmegaConf | |
from random import randint | |
import json | |
# import codecs | |
from . import dongba_sd_helper as sd_helper | |
from PIL import Image | |
from .dongda_gpt_helper import GPTHelper | |
from .omegaconf_utils import load_from_config | |
import gradio as gr | |
def _overlay_composition(img, x, y, w, h, alpha, clr, composition_image):
    """Blend a binarised glyph mask onto ``composition_image`` in place.

    The glyph is reduced to a hard 0/1 mask (1 where the channel-averaged
    value of ``img`` is below 75 % of 255), resized to the target rectangle
    and blended with the flat colour ``clr``.

    Args:
        img: glyph array indexed as [row, col, channel] with 0-255 values.
        x, y: top-left corner of the target rectangle, as fractions of the
            canvas width / height.
        w, h: size of the target rectangle, as fractions of the canvas
            width / height.
        alpha: blend strength multiplied into the mask.
        clr: 1-D colour array, broadcast over the canvas channel axis.
        composition_image: canvas array indexed as [row, col, channel];
            it is modified in place.

    Returns:
        The same ``composition_image`` object that was passed in. It is
        returned unchanged when the clamped target rectangle is empty.
    """
    canvas_h, canvas_w = composition_image.shape[:2]

    # Clamp the rectangle to the canvas. The top/left edge is clamped one
    # pixel tighter than the bottom/right edge, so a glyph pushed past the
    # bottom or right border still gets a strip at least one pixel wide.
    top = int(min(max(canvas_h * y, 0), canvas_h - 2))
    bottom = int(min(max(canvas_h * (y + h), 0), canvas_h - 1))
    left = int(min(max(canvas_w * x, 0), canvas_w - 2))
    right = int(min(max(canvas_w * (x + w), 0), canvas_w - 1))
    to_w = right - left
    to_h = bottom - top

    # A glyph that lies entirely above or left of the canvas (or has a
    # non-positive size) clamps to an empty rectangle; cv2.resize rejects a
    # non-positive target size, so there is simply nothing to paint.
    if to_w <= 0 or to_h <= 0:
        return composition_image

    # Image to weights: dark pixels give weights near 1, bright ones near 0,
    # then threshold to a hard mask.
    paint = 1.0 - (img.astype(float) / 255.0).mean(axis=2)
    paint = (paint > 0.25).astype(float)
    paint = cv2.resize(paint, (to_w, to_h))[:, :, None]

    # Blending
    block = composition_image[top:bottom, left:right, :]
    block = block * (1.0 - paint * alpha) + (clr[None, None, :] * paint * alpha)
    composition_image[top:bottom, left:right, :] = block
    return composition_image
class DongbaDreamer():
    """Turn a topic into a glyph composition and Stable Diffusion images.

    ``process_words`` queries the GPT helper for a prompt, keywords and a
    composition, loads the matching glyph PNGs from ``config.dat_dir`` and
    rasterises the composition. ``process_sd`` feeds that composition to the
    configured Stable Diffusion helpers. Everything logged along the way is
    accumulated in ``self.logs``.
    """

    # Edge length, in canvas pixels, of the square each glyph is painted into.
    _GLYPH_SIZE_PX = 180
    # The largest x / y in a composition is scaled by this factor before
    # normalising, so the largest coordinate maps to 1 / 1.2 of the canvas.
    _LAYOUT_MARGIN = 1.2
    _RULE = '----------------------------------------'

    def __init__(self, config) -> None:
        self.setup_config(config)
        self.reset_logs()

    def reset_logs(self):
        """Reset the accumulated log text to its header line."""
        self.logs = 'Dongba Dreamer\n'

    def setup_config(self, config):
        """Store ``config`` and build the GPT and Stable Diffusion helpers.

        ``config.gpt.yaml_dir`` is overwritten with ``config.yaml_dir``.
        ``self.generator_sd`` / ``self.controlnet_sd`` are only created when
        the corresponding config section is present, so ``process_sd`` needs
        the sections it uses to be configured.
        """
        self.config = config
        config.gpt.yaml_dir = config.yaml_dir
        # gpt
        self.gpt_helper = GPTHelper(config.gpt)
        self.gpt_helper.log = self.log
        # sd
        if config.get('generator_sd', None):
            self.generator_sd = sd_helper.SDHelper(config.generator_sd)
        if config.get('controlnet_sd', None):
            self.controlnet_sd = sd_helper.SDHelper(config.controlnet_sd)

    def get_logs(self):
        """Return everything logged since the last ``reset_logs``."""
        return self.logs

    def log(self, text, end='\n'):
        """Print ``text`` and append it, followed by ``end``, to the log.

        ``text`` is converted with ``str`` so non-string values can be
        logged, and ``end`` is applied to the console output as well so the
        console and the stored log stay in step.
        """
        text = str(text)
        print(text, end=end)
        self.logs += text + end

    ##################################################
    def _list_glyph_images(self, idx):
        """Return the sorted PNG paths for glyph ``idx`` ([] if none).

        Glyph ``idx`` lives in the directory named ``idx + 1`` zero-padded to
        four digits under ``config.dat_dir``. A missing or PNG-less directory
        is logged and yields an empty list.
        """
        img_dir = os.path.join(self.config.dat_dir, f'{idx+1:04d}')
        if not os.path.exists(img_dir):
            self.log(f'{img_dir} not exist!')
            return []
        # os.listdir order is arbitrary; sort so index 0 is reproducible.
        img_fns = sorted(fn for fn in os.listdir(img_dir) if fn.endswith('.png'))
        if not img_fns:
            self.log(f'{img_dir} is empty!')
        return [os.path.join(img_dir, fn) for fn in img_fns]

    @staticmethod
    def _parse_composition(composition_txt):
        """Decode the composition JSON from a GPT reply.

        Markdown code fences and a leading ``json`` language tag are removed
        before parsing, leaving the JSON payload itself untouched. If that
        strict parse fails, the older lenient cleanup (dropping every
        ``json`` and ``/n`` substring) is tried, so any reply that used to
        parse still does.

        Raises:
            json.JSONDecodeError: if neither form is valid JSON.
        """
        payload = composition_txt.replace('```', '').strip()
        if payload[:4].lower() == 'json':
            payload = payload[4:]
        try:
            return json.loads(payload)
        except json.JSONDecodeError:
            legacy = composition_txt.replace('```', '')
            legacy = legacy.replace('json', '').replace('/n', '')
            return json.loads(legacy)

    ##################################################
    def composite_image(self, canvas_width, canvas_height):
        """Rasterise ``self.info['composition']`` into a uint8 image.

        Each item's ``x`` / ``y`` is normalised by the largest coordinate in
        the composition and used as the centre of a fixed-size square into
        which a randomly chosen PNG of that glyph is painted in white. Items
        whose images are missing or unreadable are logged and skipped.

        Requires ``process_words`` to have populated ``self.info`` first.

        Returns:
            Array of shape (canvas_height, canvas_width, 3), dtype uint8.
            An empty composition gives an all-zero canvas.
        """
        composition = self.info['composition']
        canvas = np.zeros((canvas_height, canvas_width, 3))
        if not composition:
            return (canvas.clip(0, 1) * 255).astype(np.uint8)

        x_max = max(item['x'] for item in composition) * self._LAYOUT_MARGIN
        y_max = max(item['y'] for item in composition) * self._LAYOUT_MARGIN
        # If every coordinate on an axis is 0 the scale would be 0; any
        # non-zero divisor keeps those coordinates at 0 without dividing by 0.
        x_max = x_max or 1.0
        y_max = y_max or 1.0

        glyph_w = self._GLYPH_SIZE_PX / canvas_width
        glyph_h = self._GLYPH_SIZE_PX / canvas_height
        clr = np.array([1.0, 1.0, 1.0])

        for item in composition:
            img_paths = self._list_glyph_images(item['idx'])
            if not img_paths:
                continue
            img_fn = img_paths[randint(0, len(img_paths) - 1)]
            img = cv2.imread(img_fn, cv2.IMREAD_UNCHANGED)
            if img is None:
                self.log(f'{img_fn} could not be read!')
                continue
            if img.ndim == 2:
                # Grayscale PNGs load as 2-D arrays; give them a channel axis.
                img = img[:, :, None]

            # Drop any alpha plane, invert, and zero everything that is not
            # close to full scale after inversion.
            img = 255 - img[:, :, :3]
            img[img < 250] = 0

            # item x / y is the glyph centre; shift to the top-left corner.
            x = item['x'] / x_max - glyph_w / 2
            y = item['y'] / y_max - glyph_h / 2
            canvas = _overlay_composition(img, x, y, glyph_w, glyph_h, 1.0, clr, canvas)

        return (canvas.clip(0, 1) * 255).astype(np.uint8)

    ##################################################
    def process_words(self, image_topic, canvas_width, canvas_height, num_words=0):
        """Build the prompt, glyph list and composition image for a topic.

        Args:
            image_topic: topic text; ``None`` or empty short-circuits.
            canvas_width, canvas_height: canvas size passed to the GPT
                composition query and used for the composition image.
            num_words: forwarded to the GPT composition query.

        Returns:
            ``[]`` when ``image_topic`` is empty, otherwise a dict with keys
            ``'word_images'`` (list of PIL images), ``'image_prompt'`` (str)
            and ``'composition_image'`` (uint8 array).

        Side effects:
            Sets ``self.info`` and ``self.word_images`` and appends to the log.
        """
        if not image_topic:
            return []

        self.log(self._RULE)
        self.log('这幅画的主题是:')
        self.log(f'{image_topic}')
        self.log(f'分辨率是 {canvas_width}x{canvas_height}')
        self.log(self._RULE)

        self.log('<GPT> 生成 stable diffusion prompt: ')
        image_prompt = self.gpt_helper.query_image_prompt(image_topic)
        image_prompt = 'Joan Miro style abstract painting. ' + image_prompt
        self.log(f'{image_prompt}')
        self.log(self._RULE)

        keywords = self.gpt_helper.query_keywords(image_topic)
        self.log(f'<GPT> 画面关键词有:{keywords}')
        self.log(self._RULE)

        ##################################################
        self.log('查询东巴文数据库:')
        words_to_use = []
        for keyword in keywords:
            words = self.gpt_helper.query_in_faiss_db(keyword)
            if len(words) > 0:
                # Always take the first match returned by the lookup.
                chosen = words[0]
                candidates = [w['word'] for w in words]
                self.log(f"- {keyword}: {candidates}, 选择:{chosen['word']}")
                words_to_use.append(chosen)
        keywords_to_query = ', '.join(f"{w['idx']}:{w['word']}" for w in words_to_use)
        self.log(self._RULE)

        ##################################################
        self.info = {
            'image_topic': image_topic,
            'keywords': keywords,
            'words_to_use': words_to_use,
            'keywords_to_query': keywords_to_query,
        }

        ##################################################
        self.log('<gpt> 生成构图...', end='')
        composition_txt = self.gpt_helper.query_composition(
            image_topic,
            keywords_to_query,
            canvas_width,
            canvas_height,
            num_words=num_words,
        )
        self.log(composition_txt)
        composition = self._parse_composition(composition_txt)
        self.info['composition'] = composition

        ##################################################
        word_images = []
        self.log('构图由远及近分别是:')
        for item in composition:
            img_paths = self._list_glyph_images(item['idx'])
            if not img_paths:
                continue
            self.log(f'- {item["name"]}')
            select_i = 0
            img_fn = img_paths[select_i]
            img = cv2.imread(img_fn)
            if img is None:
                self.log(f'{img_fn} could not be read!')
                continue
            # `item` is the same dict object held in self.info['composition'].
            item['select_i'] = select_i
            # cv2 decodes colour images as BGR while PIL expects RGB.
            word_images.append(Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)))
        self.log(self._RULE)
        self.word_images = word_images

        # composite
        composition_image = self.composite_image(canvas_width, canvas_height)
        return {
            'word_images': word_images,
            'image_prompt': image_prompt,
            'composition_image': composition_image,
        }

    ##################################################
    def process_sd(self, image_prompt, composition_image):
        """Generate images with the ControlNet helper, guided by the composition.

        With ``config.condition_type == 'mixed_canny'`` a reference image is
        generated first and merged with the composition into a canny control
        image; otherwise the composition itself is the control image.

        Args:
            image_prompt: prompt passed to the Stable Diffusion helpers.
            composition_image: array accepted by ``Image.fromarray``.

        Returns:
            List starting with the ControlNet results, followed by the
            control image and, in ``'mixed_canny'`` mode, the reference image
            and the composition as a PIL image.
        """
        self.log(f'<sd> 生成图片: {image_prompt}')
        if self.config.condition_type == 'mixed_canny':
            reference_image = self.generator_sd.forward(image_prompt)[0]
            control_canny = sd_helper.make_merged_canny(composition_image, reference_image, 0.5)
            control_image = Image.fromarray(control_canny)
            self.log('<sd> 注入灵魂...')
            rlt_image = self.controlnet_sd.forward(image_prompt, control_image=control_image)
            images = [
                *rlt_image,
                control_image,
                reference_image,
                Image.fromarray(composition_image),
            ]
        else:
            control_image = Image.fromarray(composition_image)
            rlt_image = self.controlnet_sd.forward(image_prompt, control_image=control_image)
            images = [*rlt_image, control_image]
        return images