import os
import cv2
import torch
import numpy as np
from omegaconf import OmegaConf
from random import randint
import json
# import codecs

from . import dongba_sd_helper as sd_helper
from PIL import Image
from .dongda_gpt_helper import GPTHelper
from .omegaconf_utils import load_from_config
import gradio as gr


def _overlay_composition(img, x, y, w, h, alpha, clr, composition_image):
    # Blend one glyph image onto the composition canvas at the normalized
    # position (x, y) with normalized size (w, h).
    # img to weights: dark strokes -> high paint weight
    paint = 1.0 - (img.astype(float) / 255.0).mean(axis=2)
    # clip to a binary stroke mask
    mask = (paint > 0.25)
    paint[mask] = 1.0
    paint[~mask] = 0.0
    # # gamma before resize
    # paint = np.power(paint, 1.0 / 2.2)

    # target block in canvas coordinates
    t = int(min(max(composition_image.shape[0] * y, 0), composition_image.shape[0] - 2))
    b = int(min(max(composition_image.shape[0] * (y + h), 0), composition_image.shape[0] - 1))
    l = int(min(max(composition_image.shape[1] * x, 0), composition_image.shape[1] - 2))
    r = int(min(max(composition_image.shape[1] * (x + w), 0), composition_image.shape[1] - 1))
    to_w = int(r - l)
    to_h = int(b - t)
    paint = cv2.resize(paint, (to_w, to_h))[:, :, None]
    # # gamma after resize
    # paint = np.power(paint, 2.2)

    # alpha-blend the strokes over the canvas block
    block = composition_image[t:b, l:r, :]
    block = block * (1.0 - paint * alpha) + (clr[None, None, :] * paint * alpha)
    composition_image[t:b, l:r, :] = block
    return composition_image


class DongbaDreamer():

    def __init__(self, config) -> None:
        self.setup_config(config)
        self.reset_logs()

    def reset_logs(self):
        self.logs = 'Dongba Dreamer\n'

    def setup_config(self, config):
        self.config = config
        config.gpt.yaml_dir = config.yaml_dir

        # gpt
        self.gpt_helper = GPTHelper(config.gpt)
        self.gpt_helper.log = self.log

        # sd
        if config.get('generator_sd', None):
            self.generator_sd = sd_helper.SDHelper(config.generator_sd)
        if config.get('controlnet_sd', None):
            self.controlnet_sd = sd_helper.SDHelper(config.controlnet_sd)

    def get_logs(self):
        return self.logs

    def log(self, text, end='\n'):
        print(f'{text}')
        self.logs += text + end

    ##################################################

    def composite_image(self, canvas_width, canvas_height):
        # Render the selected Dongba glyphs onto a blank canvas according to
        # the composition stored in self.info.
        dat_dir = self.config.dat_dir
        composition_image = np.zeros((canvas_height, canvas_width, 3))

        x_max = max([item['x'] for item in self.info['composition']])
        y_max = max([item['y'] for item in self.info['composition']])
        x_max = x_max * 1.2
        y_max = y_max * 1.2

        for item in self.info['composition']:
            idx = item['idx']
            img_dir = os.path.join(dat_dir, f'{idx+1:04d}')
            if not os.path.exists(img_dir):
                self.log(f'{img_dir} does not exist!')
                continue
            img_fns = [fn for fn in os.listdir(img_dir) if fn.endswith('.png')]
            if len(img_fns) == 0:
                self.log(f'{img_dir} is empty!')
                continue

            # normalized position and size of the glyph on the canvas
            x = item['x'] / x_max
            y = item['y'] / y_max
            w = 180 / canvas_width
            h = 180 / canvas_height
            x = x - w / 2
            y = y - h / 2

            select_i = randint(0, len(img_fns) - 1)
            img_fn = os.path.join(img_dir, img_fns[select_i])
            img = cv2.imread(img_fn, cv2.IMREAD_UNCHANGED)
            img, alpha = img[:, :, :3], img[:, :, 3:]

            clr = np.array([1.0, 1.0, 1.0])
            img = 255 - img
            img[img < 250] = 0
            alpha = 1.0

            composition_image = _overlay_composition(img, x, y, w, h, alpha, clr, composition_image)

        composition_image = (composition_image.clip(0, 1) * 255).astype(np.uint8)
        return composition_image

    ##################################################

    def process_words(self, image_topic, canvas_width, canvas_height, num_words=0):
        # Query GPT for a prompt, keywords and a layout, match the keywords
        # against the Dongba glyph database, and build the composition image.
        if image_topic is None or len(image_topic) == 0:
            return []

        self.log('----------------------------------------')
        self.log('The topic of this painting is:')
        self.log(f'{image_topic}')
        self.log(f'Resolution: {canvas_width}x{canvas_height}')
        self.log('----------------------------------------')

        self.log(' Generating stable diffusion prompt: ')
        image_prompt = self.gpt_helper.query_image_prompt(image_topic)
        image_prompt = 'Joan Miro style abstract painting. ' + image_prompt
        self.log(f'{image_prompt}')
        self.log('----------------------------------------')

        keywords = self.gpt_helper.query_keywords(image_topic)
        self.log(f' Image keywords: {keywords}')
        self.log('----------------------------------------')

        ##################################################
        self.log('Querying the Dongba script database:')
        words_to_use = []
        for keyword in keywords:
            words = self.gpt_helper.query_in_faiss_db(keyword)
            if len(words) > 0:
                select_i = 0
                self.log(f"- {keyword}: {[w['word'] for w in words]}, selected: {words[select_i]['word']}")
                words_to_use.append(words[select_i])
        # keywords_to_query = ', '.join([w['word'] for w in words_to_use])
        keywords_to_query = ', '.join([f"{w['idx']}:{w['word']}" for w in words_to_use])
        self.log('----------------------------------------')

        ##################################################
        self.info = {
            'image_topic': image_topic,
            'keywords': keywords,
            'words_to_use': words_to_use,
            'keywords_to_query': keywords_to_query,
        }
        # info_fn = os.path.join(self.config.work_dir, 'info.json')
        # with codecs.open(info_fn, 'w', encoding='utf-8') as fp:
        #     json.dump(self.info, fp, indent=4, ensure_ascii=False)

        ##################################################
        self.log(' Generating composition...', end='')
        composition_txt = self.gpt_helper.query_composition(
            image_topic, keywords_to_query, canvas_width, canvas_height, num_words=num_words)
        self.log(composition_txt)
        # strip markdown code fences from the GPT answer before parsing it as JSON
        composition_txt = composition_txt.replace('```', '')
        composition_txt = composition_txt.replace('json', '')
        # with open(os.path.join(self.config.work_dir, 'composition.json'), 'w') as fp:
        #     fp.write(composition_txt)
        composition = json.loads(composition_txt.replace('/n', ''))
        self.info.update({
            'composition': composition,
        })
        # with codecs.open(info_fn, 'w', encoding='utf-8') as fp:
        #     json.dump(self.info, fp, indent=4, ensure_ascii=False)

        ##################################################
        word_images = []
        self.log('Composition elements, from far to near:')
        for word_i, item in enumerate(composition):
            idx = item['idx']
            img_dir = os.path.join(self.config.dat_dir, f'{idx+1:04d}')
            if not os.path.exists(img_dir):
                self.log(f'{img_dir} does not exist!')
                continue
            self.log(f'- {item["name"]}')
            img_fns = [fn for fn in os.listdir(img_dir) if fn.endswith('.png')]
            # select_i = randint(0, len(img_fns) - 1)
            select_i = 0
            img_fn = os.path.join(img_dir, img_fns[select_i])
            img = cv2.imread(img_fn)
            self.info['composition'][word_i]['select_i'] = select_i
            word_images.append(Image.fromarray(img))
        self.log('----------------------------------------')
        self.word_images = word_images

        # composite the selected glyphs into a single layout image
        composition_image = self.composite_image(canvas_width, canvas_height)

        return {
            'word_images': word_images,
            'image_prompt': image_prompt,
            'composition_image': composition_image,
        }

    ##################################################

    def process_sd(self, image_prompt, composition_image):
        # Run Stable Diffusion conditioned on the glyph composition; with
        # 'mixed_canny' the composition edges are merged with edges from a
        # freely generated reference image before feeding ControlNet.
        self.log(f' Generating image: {image_prompt}')
        if self.config.condition_type == 'mixed_canny':
            reference_image = self.generator_sd.forward(image_prompt)[0]
            control_canny = sd_helper.make_merged_canny(composition_image, reference_image, 0.5)
            control_image = Image.fromarray(control_canny)
            # control_image.save(os.path.join(work_dir, 'control_canny.jpg'))

            self.log(' Injecting the soul...')
            rlt_image = self.controlnet_sd.forward(image_prompt, control_image=control_image)
            # image.save(os.path.join(work_dir, "moon-v1.5-canny.png"))

            images = [*rlt_image, control_image, reference_image, Image.fromarray(composition_image)]
        else:
            control_image = Image.fromarray(composition_image)
            rlt_image = self.controlnet_sd.forward(image_prompt, control_image=control_image)
            images = [*rlt_image, control_image]

        return images
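
##################################################

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original pipeline. It assumes an
    # OmegaConf YAML config providing the fields referenced above (yaml_dir,
    # dat_dir, condition_type, gpt, generator_sd, controlnet_sd); the config
    # path, topic and canvas size below are hypothetical. Because of the
    # relative imports, run it as a module, e.g. `python -m <package>.<this_module>`.
    config = OmegaConf.load('configs/dongba_dreamer.yaml')  # hypothetical path
    dreamer = DongbaDreamer(config)

    result = dreamer.process_words('a moonlit lake with birds', canvas_width=768, canvas_height=768)
    if result:
        images = dreamer.process_sd(result['image_prompt'], result['composition_image'])
        # assumes the SD helpers return PIL images, as the commented-out .save() calls suggest
        images[0].save('dongba_dreamer_result.png')
    print(dreamer.get_logs())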