# -*- coding: utf-8 -*- # Copyright (c) Alibaba, Inc. and its affiliates. import copy import math import random import numpy as np import torch import torch.nn as nn import torch.nn.functional as F import torchvision.transforms.functional as TF from PIL import Image import torchvision.transforms as T from scepter.modules.model.registry import DIFFUSIONS from scepter.modules.model.utils.basic_utils import check_list_of_list from scepter.modules.model.utils.basic_utils import \ pack_imagelist_into_tensor_v2 as pack_imagelist_into_tensor from scepter.modules.model.utils.basic_utils import ( to_device, unpack_tensor_into_imagelist) from scepter.modules.utils.distribute import we from scepter.modules.utils.logger import get_logger from scepter.modules.inference.diffusion_inference import DiffusionInference, get_model def process_edit_image(images, masks, tasks, max_seq_len=1024, max_aspect_ratio=4, d=16, **kwargs): if not isinstance(images, list): images = [images] if not isinstance(masks, list): masks = [masks] if not isinstance(tasks, list): tasks = [tasks] img_tensors = [] mask_tensors = [] for img, mask, task in zip(images, masks, tasks): if mask is None or mask == '': mask = Image.new('L', img.size, 0) W, H = img.size if H / W > max_aspect_ratio: img = TF.center_crop(img, [int(max_aspect_ratio * W), W]) mask = TF.center_crop(mask, [int(max_aspect_ratio * W), W]) elif W / H > max_aspect_ratio: img = TF.center_crop(img, [H, int(max_aspect_ratio * H)]) mask = TF.center_crop(mask, [H, int(max_aspect_ratio * H)]) H, W = img.height, img.width scale = min(1.0, math.sqrt(max_seq_len / ((H / d) * (W / d)))) rH = int(H * scale) // d * d # ensure divisible by self.d rW = int(W * scale) // d * d img = TF.resize(img, (rH, rW), interpolation=TF.InterpolationMode.BICUBIC) mask = TF.resize(mask, (rH, rW), interpolation=TF.InterpolationMode.NEAREST_EXACT) mask = np.asarray(mask) mask = np.where(mask > 128, 1, 0) mask = mask.astype( np.float32) if np.any(mask) else np.ones_like(mask).astype( np.float32) img_tensor = TF.to_tensor(img).to(we.device_id) img_tensor = TF.normalize(img_tensor, mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]) mask_tensor = TF.to_tensor(mask).to(we.device_id) if task in ['inpainting', 'Try On', 'Inpainting']: mask_indicator = mask_tensor.repeat(3, 1, 1) img_tensor[mask_indicator == 1] = -1.0 img_tensors.append(img_tensor) mask_tensors.append(mask_tensor) return img_tensors, mask_tensors class TextEmbedding(nn.Module): def __init__(self, embedding_shape): super().__init__() self.pos = nn.Parameter(data=torch.zeros(embedding_shape)) class RefinerInference(DiffusionInference): def init_from_cfg(self, cfg): super().init_from_cfg(cfg) self.diffusion = DIFFUSIONS.build(cfg.MODEL.DIFFUSION, logger=self.logger) \ if cfg.MODEL.have('DIFFUSION') else None self.max_seq_length = cfg.MODEL.get("MAX_SEQ_LENGTH", 4096) assert self.diffusion is not None self.dynamic_load(self.cond_stage_model, 'cond_stage_model') self.dynamic_load(self.diffusion_model, 'diffusion_model') self.dynamic_load(self.first_stage_model, 'first_stage_model') @torch.no_grad() def encode_first_stage(self, x, **kwargs): _, dtype = self.get_function_info(self.first_stage_model, 'encode') with torch.autocast('cuda', enabled=dtype in ('float16', 'bfloat16'), dtype=getattr(torch, dtype)): def run_one_image(u): zu = get_model(self.first_stage_model).encode(u) if isinstance(zu, (tuple, list)): zu = zu[0] return zu z = [run_one_image(u.unsqueeze(0) if u.dim == 3 else u) for u in x] return z def upscale_resize(self, image, interpolation=T.InterpolationMode.BILINEAR): c, H, W = image.shape scale = max(1.0, math.sqrt(self.max_seq_length / ((H / 16) * (W / 16)))) rH = int(H * scale) // 16 * 16 # ensure divisible by self.d rW = int(W * scale) // 16 * 16 image = T.Resize((rH, rW), interpolation=interpolation, antialias=True)(image) return image @torch.no_grad() def decode_first_stage(self, z): _, dtype = self.get_function_info(self.first_stage_model, 'decode') with torch.autocast('cuda', enabled=dtype in ('float16', 'bfloat16'), dtype=getattr(torch, dtype)): return [get_model(self.first_stage_model).decode(zu) for zu in z] def noise_sample(self, num_samples, h, w, seed, device = None, dtype = torch.bfloat16): noise = torch.randn( num_samples, 16, # allow for packing 2 * math.ceil(h / 16), 2 * math.ceil(w / 16), device=device, dtype=dtype, generator=torch.Generator(device=device).manual_seed(seed), ) return noise def refine(self, x_samples=None, prompt=None, reverse_scale=-1., seed = 2024, use_dynamic_model = False, **kwargs ): print(prompt) value_input = copy.deepcopy(self.input) x_samples = [self.upscale_resize(x) for x in x_samples] noise = [] for i, x in enumerate(x_samples): noise_ = self.noise_sample(1, x.shape[1], x.shape[2], seed, device = x.device) noise.append(noise_) noise, x_shapes = pack_imagelist_into_tensor(noise) if reverse_scale > 0: if use_dynamic_model: self.dynamic_load(self.first_stage_model, 'first_stage_model') x_samples = [x.unsqueeze(0) for x in x_samples] x_start = self.encode_first_stage(x_samples, **kwargs) if use_dynamic_model: self.dynamic_unload(self.first_stage_model, 'first_stage_model', skip_loaded=True) x_start, _ = pack_imagelist_into_tensor(x_start) else: x_start = None # cond stage if use_dynamic_model: self.dynamic_load(self.cond_stage_model, 'cond_stage_model') function_name, dtype = self.get_function_info(self.cond_stage_model) with torch.autocast('cuda', enabled=dtype == 'float16', dtype=getattr(torch, dtype)): ctx = getattr(get_model(self.cond_stage_model), function_name)(prompt) ctx["x_shapes"] = x_shapes if use_dynamic_model: self.dynamic_unload(self.cond_stage_model, 'cond_stage_model', skip_loaded=True) if use_dynamic_model: self.dynamic_load(self.diffusion_model, 'diffusion_model') # UNet use input n_prompt function_name, dtype = self.get_function_info( self.diffusion_model) with torch.autocast('cuda', enabled=dtype in ('float16', 'bfloat16'), dtype=getattr(torch, dtype)): solver_sample = value_input.get('sample', 'flow_euler') sample_steps = value_input.get('sample_steps', 20) guide_scale = value_input.get('guide_scale', 3.5) if guide_scale is not None: guide_scale = torch.full((noise.shape[0],), guide_scale, device=noise.device, dtype=noise.dtype) else: guide_scale = None latent = self.diffusion.sample( noise=noise, sampler=solver_sample, model=get_model(self.diffusion_model), model_kwargs={"cond": ctx, "guidance": guide_scale}, steps=sample_steps, show_progress=True, guide_scale=guide_scale, return_intermediate=None, reverse_scale=reverse_scale, x=x_start, **kwargs).float() latent = unpack_tensor_into_imagelist(latent, x_shapes) if use_dynamic_model: self.dynamic_unload(self.diffusion_model, 'diffusion_model', skip_loaded=True) if use_dynamic_model: self.dynamic_load(self.first_stage_model, 'first_stage_model') x_samples = self.decode_first_stage(latent) if use_dynamic_model: self.dynamic_unload(self.first_stage_model, 'first_stage_model', skip_loaded=True) return x_samples class ACEInference(DiffusionInference): def __init__(self, logger=None): if logger is None: logger = get_logger(name='scepter') self.logger = logger self.loaded_model = {} self.loaded_model_name = [ 'diffusion_model', 'first_stage_model', 'cond_stage_model' ] def init_from_cfg(self, cfg): self.name = cfg.NAME self.is_default = cfg.get('IS_DEFAULT', False) self.use_dynamic_model = cfg.get('USE_DYNAMIC_MODEL', True) module_paras = self.load_default(cfg.get('DEFAULT_PARAS', None)) assert cfg.have('MODEL') self.diffusion_model = self.infer_model( cfg.MODEL.DIFFUSION_MODEL, module_paras.get( 'DIFFUSION_MODEL', None)) if cfg.MODEL.have('DIFFUSION_MODEL') else None self.first_stage_model = self.infer_model( cfg.MODEL.FIRST_STAGE_MODEL, module_paras.get( 'FIRST_STAGE_MODEL', None)) if cfg.MODEL.have('FIRST_STAGE_MODEL') else None self.cond_stage_model = self.infer_model( cfg.MODEL.COND_STAGE_MODEL, module_paras.get( 'COND_STAGE_MODEL', None)) if cfg.MODEL.have('COND_STAGE_MODEL') else None self.refiner_model_cfg = cfg.get('REFINER_MODEL', None) # self.refiner_scale = cfg.get('REFINER_SCALE', 0.) # self.refiner_prompt = cfg.get('REFINER_PROMPT', "") self.ace_prompt = cfg.get("ACE_PROMPT", []) if self.refiner_model_cfg: self.refiner_module = RefinerInference(self.logger) self.refiner_module.init_from_cfg(self.refiner_model_cfg) else: self.refiner_module = None self.diffusion = DIFFUSIONS.build(cfg.MODEL.DIFFUSION, logger=self.logger) self.interpolate_func = lambda x: (F.interpolate( x.unsqueeze(0), scale_factor=1 / self.size_factor, mode='nearest-exact') if x is not None else None) self.text_indentifers = cfg.MODEL.get('TEXT_IDENTIFIER', []) self.use_text_pos_embeddings = cfg.MODEL.get('USE_TEXT_POS_EMBEDDINGS', False) if self.use_text_pos_embeddings: self.text_position_embeddings = TextEmbedding( (10, 4096)).eval().requires_grad_(False).to(we.device_id) else: self.text_position_embeddings = None self.max_seq_len = cfg.MODEL.DIFFUSION_MODEL.MAX_SEQ_LEN self.scale_factor = cfg.get('SCALE_FACTOR', 0.18215) self.size_factor = cfg.get('SIZE_FACTOR', 8) self.decoder_bias = cfg.get('DECODER_BIAS', 0) self.default_n_prompt = cfg.get('DEFAULT_N_PROMPT', '') self.dynamic_load(self.cond_stage_model, 'cond_stage_model') self.dynamic_load(self.diffusion_model, 'diffusion_model') self.dynamic_load(self.first_stage_model, 'first_stage_model') @torch.no_grad() def encode_first_stage(self, x, **kwargs): _, dtype = self.get_function_info(self.first_stage_model, 'encode') with torch.autocast('cuda', enabled=(dtype != 'float32'), dtype=getattr(torch, dtype)): z = [ self.scale_factor * get_model(self.first_stage_model)._encode( i.unsqueeze(0).to(getattr(torch, dtype))) for i in x ] return z @torch.no_grad() def decode_first_stage(self, z): _, dtype = self.get_function_info(self.first_stage_model, 'decode') with torch.autocast('cuda', enabled=(dtype != 'float32'), dtype=getattr(torch, dtype)): x = [ get_model(self.first_stage_model)._decode( 1. / self.scale_factor * i.to(getattr(torch, dtype))) for i in z ] return x @torch.no_grad() def __call__(self, image=None, mask=None, prompt='', task=None, negative_prompt='', output_height=512, output_width=512, sampler='ddim', sample_steps=20, guide_scale=4.5, guide_rescale=0.5, seed=-1, history_io=None, tar_index=0, **kwargs): input_image, input_mask = image, mask g = torch.Generator(device=we.device_id) seed = seed if seed >= 0 else random.randint(0, 2**32 - 1) g.manual_seed(int(seed)) if input_image is not None: # assert isinstance(input_image, list) and isinstance(input_mask, list) if task is None: task = [''] * len(input_image) if not isinstance(prompt, list): prompt = [prompt] * len(input_image) if history_io is not None and len(history_io) > 0: his_image, his_maks, his_prompt, his_task = history_io[ 'image'], history_io['mask'], history_io[ 'prompt'], history_io['task'] assert len(his_image) == len(his_maks) == len( his_prompt) == len(his_task) input_image = his_image + input_image input_mask = his_maks + input_mask task = his_task + task prompt = his_prompt + [prompt[-1]] prompt = [ pp.replace('{image}', f'{{image{i}}}') if i > 0 else pp for i, pp in enumerate(prompt) ] edit_image, edit_image_mask = process_edit_image( input_image, input_mask, task, max_seq_len=self.max_seq_len) image, image_mask = edit_image[tar_index], edit_image_mask[ tar_index] edit_image, edit_image_mask = [edit_image], [edit_image_mask] else: edit_image = edit_image_mask = [[]] image = torch.zeros( size=[3, int(output_height), int(output_width)]) image_mask = torch.ones( size=[1, int(output_height), int(output_width)]) if not isinstance(prompt, list): prompt = [prompt] image, image_mask, prompt = [image], [image_mask], [prompt] assert check_list_of_list(prompt) and check_list_of_list( edit_image) and check_list_of_list(edit_image_mask) # Assign Negative Prompt if isinstance(negative_prompt, list): negative_prompt = negative_prompt[0] assert isinstance(negative_prompt, str) n_prompt = copy.deepcopy(prompt) for nn_p_id, nn_p in enumerate(n_prompt): assert isinstance(nn_p, list) n_prompt[nn_p_id][-1] = negative_prompt is_txt_image = sum([len(e_i) for e_i in edit_image]) < 1 image = to_device(image) refiner_scale = kwargs.pop("refiner_scale", 0.0) refiner_prompt = kwargs.pop("refiner_prompt", "") use_ace = kwargs.pop("use_ace", True) # <= 0 use ace as the txt2img generator. if use_ace and (not is_txt_image or refiner_scale <= 0): ctx, null_ctx = {}, {} # Get Noise Shape self.dynamic_load(self.first_stage_model, 'first_stage_model') x = self.encode_first_stage(image) self.dynamic_unload(self.first_stage_model, 'first_stage_model', skip_loaded=True) noise = [ torch.empty(*i.shape, device=we.device_id).normal_(generator=g) for i in x ] noise, x_shapes = pack_imagelist_into_tensor(noise) ctx['x_shapes'] = null_ctx['x_shapes'] = x_shapes image_mask = to_device(image_mask, strict=False) cond_mask = [self.interpolate_func(i) for i in image_mask ] if image_mask is not None else [None] * len(image) ctx['x_mask'] = null_ctx['x_mask'] = cond_mask # Encode Prompt self.dynamic_load(self.cond_stage_model, 'cond_stage_model') function_name, dtype = self.get_function_info(self.cond_stage_model) cont, cont_mask = getattr(get_model(self.cond_stage_model), function_name)(prompt) cont, cont_mask = self.cond_stage_embeddings(prompt, edit_image, cont, cont_mask) null_cont, null_cont_mask = getattr(get_model(self.cond_stage_model), function_name)(n_prompt) null_cont, null_cont_mask = self.cond_stage_embeddings( prompt, edit_image, null_cont, null_cont_mask) self.dynamic_unload(self.cond_stage_model, 'cond_stage_model', skip_loaded=False) ctx['crossattn'] = cont null_ctx['crossattn'] = null_cont # Encode Edit Images self.dynamic_load(self.first_stage_model, 'first_stage_model') edit_image = [to_device(i, strict=False) for i in edit_image] edit_image_mask = [to_device(i, strict=False) for i in edit_image_mask] e_img, e_mask = [], [] for u, m in zip(edit_image, edit_image_mask): if u is None: continue if m is None: m = [None] * len(u) e_img.append(self.encode_first_stage(u, **kwargs)) e_mask.append([self.interpolate_func(i) for i in m]) self.dynamic_unload(self.first_stage_model, 'first_stage_model', skip_loaded=True) null_ctx['edit'] = ctx['edit'] = e_img null_ctx['edit_mask'] = ctx['edit_mask'] = e_mask # Diffusion Process self.dynamic_load(self.diffusion_model, 'diffusion_model') function_name, dtype = self.get_function_info(self.diffusion_model) with torch.autocast('cuda', enabled=dtype in ('float16', 'bfloat16'), dtype=getattr(torch, dtype)): latent = self.diffusion.sample( noise=noise, sampler=sampler, model=get_model(self.diffusion_model), model_kwargs=[{ 'cond': ctx, 'mask': cont_mask, 'text_position_embeddings': self.text_position_embeddings.pos if hasattr( self.text_position_embeddings, 'pos') else None }, { 'cond': null_ctx, 'mask': null_cont_mask, 'text_position_embeddings': self.text_position_embeddings.pos if hasattr( self.text_position_embeddings, 'pos') else None }] if guide_scale is not None and guide_scale > 1 else { 'cond': null_ctx, 'mask': cont_mask, 'text_position_embeddings': self.text_position_embeddings.pos if hasattr( self.text_position_embeddings, 'pos') else None }, steps=sample_steps, show_progress=True, seed=seed, guide_scale=guide_scale, guide_rescale=guide_rescale, return_intermediate=None, **kwargs) self.dynamic_unload(self.diffusion_model, 'diffusion_model', skip_loaded=False) # Decode to Pixel Space self.dynamic_load(self.first_stage_model, 'first_stage_model') samples = unpack_tensor_into_imagelist(latent, x_shapes) x_samples = self.decode_first_stage(samples) self.dynamic_unload(self.first_stage_model, 'first_stage_model', skip_loaded=False) x_samples = [x.squeeze(0) for x in x_samples] else: x_samples = image if self.refiner_module and refiner_scale > 0: if is_txt_image: random.shuffle(self.ace_prompt) input_refine_prompt = [self.ace_prompt[0] + refiner_prompt if p[0] == "" else p[0] for p in prompt] input_refine_scale = -1. else: input_refine_prompt = [p[0].replace("{image}", "") + " " + refiner_prompt for p in prompt] input_refine_scale = refiner_scale print(input_refine_prompt) x_samples = self.refiner_module.refine(x_samples, reverse_scale = input_refine_scale, prompt= input_refine_prompt, seed=seed, use_dynamic_model=self.use_dynamic_model) imgs = [ torch.clamp((x_i.float() + 1.0) / 2.0 + self.decoder_bias / 255, min=0.0, max=1.0).squeeze(0).permute(1, 2, 0).cpu().numpy() for x_i in x_samples ] imgs = [Image.fromarray((img * 255).astype(np.uint8)) for img in imgs] return imgs def cond_stage_embeddings(self, prompt, edit_image, cont, cont_mask): if self.use_text_pos_embeddings and not torch.sum( self.text_position_embeddings.pos) > 0: identifier_cont, _ = getattr(get_model(self.cond_stage_model), 'encode')(self.text_indentifers, return_mask=True) self.text_position_embeddings.load_state_dict( {'pos': identifier_cont[:, 0, :]}) cont_, cont_mask_ = [], [] for pp, edit, c, cm in zip(prompt, edit_image, cont, cont_mask): if isinstance(pp, list): cont_.append([c[-1], *c] if len(edit) > 0 else [c[-1]]) cont_mask_.append([cm[-1], *cm] if len(edit) > 0 else [cm[-1]]) else: raise NotImplementedError return cont_, cont_mask_