"""
Copyright (C) 2022-2023 Intel Corporation
SPDX-License-Identifier: Apache-2.0
"""
import inspect
from typing import Union, Optional, Any, List, Dict
import numpy as np
# openvino
from openvino.runtime import Core
# tokenizer
from transformers import CLIPTokenizer
import torch
import random
from diffusers import DiffusionPipeline
from diffusers.schedulers import (DDIMScheduler,
LMSDiscreteScheduler,
PNDMScheduler,
EulerDiscreteScheduler,
EulerAncestralDiscreteScheduler)
from diffusers.image_processor import VaeImageProcessor
from diffusers.utils.torch_utils import randn_tensor
from diffusers.utils import PIL_INTERPOLATION
import cv2
import os
import sys
# for multithreading
import concurrent.futures
#For GIF
import PIL
from PIL import Image
import glob
import json
import time
import logging

# logger is used by the prompt-truncation warnings below but was never defined
logger = logging.getLogger(__name__)
def scale_fit_to_window(dst_width:int, dst_height:int, image_width:int, image_height:int):
"""
Preprocessing helper function for calculating the image size for a resize that preserves the original aspect ratio
and fits the image to a specific window size
Parameters:
dst_width (int): destination window width
dst_height (int): destination window height
image_width (int): source image width
image_height (int): source image height
Returns:
result_width (int): calculated width for resize
result_height (int): calculated height for resize
"""
im_scale = min(dst_height / image_height, dst_width / image_width)
return int(im_scale * image_width), int(im_scale * image_height)
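# Example (sketch): scale_fit_to_window(512, 512, 1920, 1080) returns (512, 288),
# shrinking a 16:9 frame until it fits inside the 512x512 model window.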
def preprocess(image: PIL.Image.Image, ht=512, wt=512):
"""
Image preprocessing function. Takes an image in PIL.Image format, resizes it to preserve the aspect ratio and fit the model input window (512x512 by default),
then converts it to np.ndarray and pads it with zeros on the right or bottom side (depending on the aspect ratio), after that
converts the data to float32, rescales values from [0, 255] to [-1, 1], and finally changes the data layout from NHWC to NCHW.
The function returns preprocessed input tensor and padding size, which can be used in postprocessing.
Parameters:
image (PIL.Image.Image): input image
ht (int, *optional*, 512): target window height
wt (int, *optional*, 512): target window width
Returns:
image (np.ndarray): preprocessed image tensor
meta (Dict): dictionary with preprocessing metadata info
"""
src_width, src_height = image.size
image = image.convert('RGB')
dst_width, dst_height = scale_fit_to_window(
wt, ht, src_width, src_height)
image = np.array(image.resize((dst_width, dst_height),
resample=PIL.Image.Resampling.LANCZOS))[None, :]
pad_width = wt - dst_width
pad_height = ht - dst_height
pad = ((0, 0), (0, pad_height), (0, pad_width), (0, 0))
image = np.pad(image, pad, mode="constant")
image = image.astype(np.float32) / 255.0
image = 2.0 * image - 1.0
image = image.transpose(0, 3, 1, 2)
return image, {"padding": pad, "src_width": src_width, "src_height": src_height}
def try_enable_npu_turbo(device, core):
import platform
if "windows" in platform.system().lower():
if "NPU" in device and "3720" not in core.get_property('NPU', 'DEVICE_ARCHITECTURE'):
try:
core.set_property(properties={'NPU_TURBO': 'YES'},device_name='NPU')
except Exception:
print(f"Failed to enable NPU_TURBO for device {device}. Skipping...")
else:
print_npu_turbo_art()
else:
print(f"Skipping NPU_TURBO for device {device}")
elif "linux" in platform.system().lower():
if os.path.isfile('/sys/module/intel_vpu/parameters/test_mode'):
with open('/sys/module/intel_vpu/version', 'r') as f:
version = f.readline().split()[0]
if tuple(map(int, version.split('.'))) < tuple(map(int, '1.9.0'.split('.'))):
print(f"The driver intel_vpu-1.9.0 (or later) needs to be loaded for NPU Turbo (currently {version}). Skipping...")
else:
with open('/sys/module/intel_vpu/parameters/test_mode', 'r') as tm_file:
test_mode = int(tm_file.readline().split()[0])
if test_mode == 512:
print_npu_turbo_art()
else:
print("The driver >=intel_vpu-1.9.0 was must be loaded with "
"\"modprobe intel_vpu test_mode=512\" to enable NPU_TURBO "
f"(currently test_mode={test_mode}). Skipping...")
else:
print(f"The driver >=intel_vpu-1.9.0 must be loaded with \"modprobe intel_vpu test_mode=512\" to enable NPU_TURBO. Skipping...")
else:
print(f"This platform ({platform.system()}) does not support NPU Turbo")
def result(var):
"""Return the first value from a dict-like inference result."""
return next(iter(var.values()))
class StableDiffusionEngineAdvanced(DiffusionPipeline):
def __init__(self, model="runwayml/stable-diffusion-v1-5",
tokenizer="openai/clip-vit-large-patch14",
device=["CPU", "CPU", "CPU", "CPU"]):
try:
self.tokenizer = CLIPTokenizer.from_pretrained(model, local_files_only=True)
except Exception:
self.tokenizer = CLIPTokenizer.from_pretrained(tokenizer)
self.tokenizer.save_pretrained(model)
self.core = Core()
self.core.set_property({'CACHE_DIR': os.path.join(model, 'cache')})
try_enable_npu_turbo(device, self.core)
print("Loading models... ")
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
futures = {
"unet_time_proj": executor.submit(self.core.compile_model, os.path.join(model, "unet_time_proj.xml"), device[0]),
"text": executor.submit(self.load_model, model, "text_encoder", device[0]),
"unet": executor.submit(self.load_model, model, "unet_int8", device[1]),
"unet_neg": executor.submit(self.load_model, model, "unet_int8", device[2]) if device[1] != device[2] else None,
"vae_decoder": executor.submit(self.load_model, model, "vae_decoder", device[3]),
"vae_encoder": executor.submit(self.load_model, model, "vae_encoder", device[3])
}
self.unet_time_proj = futures["unet_time_proj"].result()
self.text_encoder = futures["text"].result()
self.unet = futures["unet"].result()
self.unet_neg = futures["unet_neg"].result() if futures["unet_neg"] else self.unet
self.vae_decoder = futures["vae_decoder"].result()
self.vae_encoder = futures["vae_encoder"].result()
print("Text Device:", device[0])
print("unet Device:", device[1])
print("unet-neg Device:", device[2])
print("VAE Device:", device[3])
self._text_encoder_output = self.text_encoder.output(0)
self._vae_d_output = self.vae_decoder.output(0)
self._vae_e_output = self.vae_encoder.output(0) if self.vae_encoder else None
self.set_dimensions()
self.infer_request_neg = self.unet_neg.create_infer_request()
self.infer_request = self.unet.create_infer_request()
self.infer_request_time_proj = self.unet_time_proj.create_infer_request()
self.time_proj_constants = np.load(os.path.join(model, "time_proj_constants.npy"))
def load_model(self, model, model_name, device):
if "NPU" in device:
with open(os.path.join(model, f"{model_name}.blob"), "rb") as f:
return self.core.import_model(f.read(), device)
return self.core.compile_model(os.path.join(model, f"{model_name}.xml"), device)
def set_dimensions(self):
latent_shape = self.unet.input("latent_model_input").shape
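# The exported UNet may take NCHW latents (4 channels at dim 1) or NHWC latents
# (channels last); in either layout the VAE scales spatial dims by a factor of 8,
# e.g. a (1, 4, 64, 64) input implies a 512x512 pipeline.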
if latent_shape[1] == 4:
self.height = latent_shape[2] * 8
self.width = latent_shape[3] * 8
else:
self.height = latent_shape[1] * 8
self.width = latent_shape[2] * 8
def __call__(
self,
prompt,
init_image = None,
negative_prompt=None,
scheduler=None,
strength = 0.5,
num_inference_steps = 32,
guidance_scale = 7.5,
eta = 0.0,
create_gif = False,
model = None,
callback = None,
callback_userdata = None
):
# extract condition
text_input = self.tokenizer(
prompt,
padding="max_length",
max_length=self.tokenizer.model_max_length,
truncation=True,
return_tensors="np",
)
text_embeddings = self.text_encoder(text_input.input_ids)[self._text_encoder_output]
# do classifier free guidance
do_classifier_free_guidance = guidance_scale > 1.0
if do_classifier_free_guidance:
if negative_prompt is None:
uncond_tokens = [""]
elif isinstance(negative_prompt, str):
uncond_tokens = [negative_prompt]
else:
uncond_tokens = negative_prompt
tokens_uncond = self.tokenizer(
uncond_tokens,
padding="max_length",
max_length=self.tokenizer.model_max_length, #truncation=True,
return_tensors="np"
)
uncond_embeddings = self.text_encoder(tokens_uncond.input_ids)[self._text_encoder_output]
text_embeddings = np.concatenate([uncond_embeddings, text_embeddings])
# set timesteps
accepts_offset = "offset" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
extra_set_kwargs = {}
if accepts_offset:
extra_set_kwargs["offset"] = 1
scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs)
timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, scheduler)
latent_timestep = timesteps[:1]
# get the initial random noise unless the user supplied it
latents, meta = self.prepare_latents(init_image, latent_timestep, scheduler)
# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
# and should be between [0, 1]
accepts_eta = "eta" in set(inspect.signature(scheduler.step).parameters.keys())
extra_step_kwargs = {}
if accepts_eta:
extra_step_kwargs["eta"] = eta
if create_gif:
frames = []
for i, t in enumerate(self.progress_bar(timesteps)):
if callback:
callback(i, callback_userdata)
# expand the latents if we are doing classifier free guidance
noise_pred = []
latent_model_input = latents
latent_model_input = scheduler.scale_model_input(latent_model_input, t)
latent_model_input_neg = latent_model_input
if self.unet.input("latent_model_input").shape[1] != 4:
#print("In transpose")
try:
latent_model_input = latent_model_input.permute(0,2,3,1)
except Exception:
latent_model_input = latent_model_input.transpose(0,2,3,1)
if self.unet_neg.input("latent_model_input").shape[1] != 4:
#print("In transpose")
try:
latent_model_input_neg = latent_model_input_neg.permute(0,2,3,1)
except Exception:
latent_model_input_neg = latent_model_input_neg.transpose(0,2,3,1)
t_scaled = self.time_proj_constants * np.float32(t)
cosine_t = np.cos(t_scaled)
sine_t = np.sin(t_scaled)
time_proj_dict = {"sine_t" : np.float32(sine_t), "cosine_t" : np.float32(cosine_t)}
self.infer_request_time_proj.start_async(time_proj_dict)
self.infer_request_time_proj.wait()
time_proj = self.infer_request_time_proj.get_output_tensor(0).data.astype(np.float32)
input_tens_neg_dict = {"time_proj": np.float32(time_proj), "latent_model_input":latent_model_input_neg, "encoder_hidden_states": np.expand_dims(text_embeddings[0], axis=0)}
input_tens_dict = {"time_proj": np.float32(time_proj), "latent_model_input":latent_model_input, "encoder_hidden_states": np.expand_dims(text_embeddings[1], axis=0)}
self.infer_request_neg.start_async(input_tens_neg_dict)
self.infer_request.start_async(input_tens_dict)
self.infer_request_neg.wait()
self.infer_request.wait()
noise_pred_neg = self.infer_request_neg.get_output_tensor(0)
noise_pred_pos = self.infer_request.get_output_tensor(0)
noise_pred.append(noise_pred_neg.data.astype(np.float32))
noise_pred.append(noise_pred_pos.data.astype(np.float32))
# perform guidance
if do_classifier_free_guidance:
noise_pred_uncond, noise_pred_text = noise_pred[0], noise_pred[1]
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
# compute the previous noisy sample x_t -> x_t-1
latents = scheduler.step(torch.from_numpy(noise_pred), t, torch.from_numpy(latents), **extra_step_kwargs)["prev_sample"].numpy()
if create_gif:
frames.append(latents)
if callback:
callback(num_inference_steps, callback_userdata)
# scale and decode the image latents with vae
latents = 1 / 0.18215 * latents
start = time.time()
image = self.vae_decoder(latents)[self._vae_d_output]
print("Decoder ended:",time.time() - start)
image = self.postprocess_image(image, meta)
if create_gif:
gif_folder=os.path.join(model,"../../../gif")
print("gif_folder:",gif_folder)
if not os.path.exists(gif_folder):
os.makedirs(gif_folder)
for i in range(len(frames)):
image = self.vae_decoder(frames[i] * (1 / 0.18215))[self._vae_d_output]
image = self.postprocess_image(image, meta)
output = os.path.join(gif_folder, str(i).zfill(3) + ".png")
cv2.imwrite(output, image)
with open(os.path.join(gif_folder, "prompt.json"), "w") as file:
json.dump({"prompt": prompt}, file)
frames_image = [Image.open(f) for f in sorted(glob.glob(f"{gif_folder}/*.png"))]
frame_one = frames_image[0]
gif_file=os.path.join(gif_folder,"stable_diffusion.gif")
frame_one.save(gif_file, format="GIF", append_images=frames_image, save_all=True, duration=100, loop=0)
return image
def prepare_latents(self, image:PIL.Image.Image = None, latent_timestep:torch.Tensor = None, scheduler = LMSDiscreteScheduler):
"""
Function for getting initial latents for starting generation
Parameters:
image (PIL.Image.Image, *optional*, None):
Input image for generation; if not provided, random noise will be used as the starting point
latent_timestep (torch.Tensor, *optional*, None):
Initial timestep predicted by the scheduler, required for mixing the latent image with noise
Returns:
latents (np.ndarray):
Image encoded in latent space
"""
latents_shape = (1, 4, self.height // 8, self.width // 8)
noise = np.random.randn(*latents_shape).astype(np.float32)
if image is None:
##print("Image is NONE")
# if we use LMSDiscreteScheduler, let's make sure latents are multiplied by sigmas
if isinstance(scheduler, LMSDiscreteScheduler):
noise = noise * scheduler.sigmas[0].numpy()
return noise, {}
elif isinstance(scheduler, EulerDiscreteScheduler) or isinstance(scheduler,EulerAncestralDiscreteScheduler):
noise = noise * scheduler.sigmas.max().numpy()
return noise, {}
else:
return noise, {}
input_image, meta = preprocess(image,self.height,self.width)
moments = self.vae_encoder(input_image)[self._vae_e_output]
mean, logvar = np.split(moments, 2, axis=1)
std = np.exp(logvar * 0.5)
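# Reparameterization trick: sample z = mean + std * eps from the VAE posterior,
# then apply the Stable Diffusion v1 latent scaling factor 0.18215.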
latents = (mean + std * np.random.randn(*mean.shape)) * 0.18215
latents = scheduler.add_noise(torch.from_numpy(latents), torch.from_numpy(noise), latent_timestep).numpy()
return latents, meta
def postprocess_image(self, image:np.ndarray, meta:Dict):
"""
Postprocessing for decoded image. Takes the image generated by the VAE decoder, unpads it to the initial image size (if required),
normalizes it, and converts it to the [0, 255] pixel range
Parameters:
image (np.ndarray):
Generated image
meta (Dict):
Metadata obtained on the latents preparation step, can be empty
Returns:
image (np.ndarray):
Postprocessed image in BGR uint8 format
"""
if "padding" in meta:
pad = meta["padding"]
(_, end_h), (_, end_w) = pad[1:3]
h, w = image.shape[2:]
#print("image shape",image.shape[2:])
unpad_h = h - end_h
unpad_w = w - end_w
image = image[:, :, :unpad_h, :unpad_w]
image = np.clip(image / 2 + 0.5, 0, 1)
image = (image[0].transpose(1, 2, 0)[:, :, ::-1] * 255).astype(np.uint8)
if "src_height" in meta:
orig_height, orig_width = meta["src_height"], meta["src_width"]
image = cv2.resize(image, (orig_width, orig_height))
return image
def get_timesteps(self, num_inference_steps:int, strength:float, scheduler):
"""
Helper function for getting scheduler timesteps for generation
In case of image-to-image generation, it updates number of steps according to strength
Parameters:
num_inference_steps (int):
number of inference steps for generation
strength (float):
value between 0.0 and 1.0, that controls the amount of noise that is added to the input image.
Values that approach 1.0 allow for lots of variations but will also produce images that are not semantically consistent with the input.
"""
# get the original timestep using init_timestep
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
t_start = max(num_inference_steps - init_timestep, 0)
timesteps = scheduler.timesteps[t_start:]
return timesteps, num_inference_steps - t_start
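# Example (sketch): with num_inference_steps=32 and strength=0.5, init_timestep=16
# and t_start=16, so only the last 16 scheduler timesteps are used and 16 is
# returned as the effective step count.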
class StableDiffusionEngine(DiffusionPipeline):
def __init__(
self,
model="bes-dev/stable-diffusion-v1-4-openvino",
tokenizer="openai/clip-vit-large-patch14",
device=["CPU","CPU","CPU","CPU"]):
self.core = Core()
self.core.set_property({'CACHE_DIR': os.path.join(model, 'cache')})
self.batch_size = 2 if device[1] == device[2] and device[1] == "GPU" else 1
try_enable_npu_turbo(device, self.core)
try:
self.tokenizer = CLIPTokenizer.from_pretrained(model, local_files_only=True)
except Exception as e:
print("Local tokenizer not found. Attempting to download...")
self.tokenizer = self.download_tokenizer(tokenizer, model)
print("Loading models... ")
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
text_future = executor.submit(self.load_model, model, "text_encoder", device[0])
vae_de_future = executor.submit(self.load_model, model, "vae_decoder", device[3])
vae_en_future = executor.submit(self.load_model, model, "vae_encoder", device[3])
if self.batch_size == 1:
if "int8" not in model:
unet_future = executor.submit(self.load_model, model, "unet_bs1", device[1])
unet_neg_future = executor.submit(self.load_model, model, "unet_bs1", device[2]) if device[1] != device[2] else None
else:
unet_future = executor.submit(self.load_model, model, "unet_int8a16", device[1])
unet_neg_future = executor.submit(self.load_model, model, "unet_int8a16", device[2]) if device[1] != device[2] else None
else:
unet_future = executor.submit(self.load_model, model, "unet", device[1])
unet_neg_future = None
self.unet = unet_future.result()
self.unet_neg = unet_neg_future.result() if unet_neg_future else self.unet
self.text_encoder = text_future.result()
self.vae_decoder = vae_de_future.result()
self.vae_encoder = vae_en_future.result()
print("Text Device:", device[0])
print("unet Device:", device[1])
print("unet-neg Device:", device[2])
print("VAE Device:", device[3])
self._text_encoder_output = self.text_encoder.output(0)
self._unet_output = self.unet.output(0)
self._vae_d_output = self.vae_decoder.output(0)
self._vae_e_output = self.vae_encoder.output(0) if self.vae_encoder else None
self.unet_input_tensor_name = "sample" if 'sample' in self.unet.input(0).names else "latent_model_input"
if self.batch_size == 1:
self.infer_request = self.unet.create_infer_request()
self.infer_request_neg = self.unet_neg.create_infer_request()
self._unet_neg_output = self.unet_neg.output(0)
else:
self.infer_request = None
self.infer_request_neg = None
self._unet_neg_output = None
self.set_dimensions()
def load_model(self, model, model_name, device):
if "NPU" in device:
with open(os.path.join(model, f"{model_name}.blob"), "rb") as f:
return self.core.import_model(f.read(), device)
return self.core.compile_model(os.path.join(model, f"{model_name}.xml"), device)
def set_dimensions(self):
latent_shape = self.unet.input(self.unet_input_tensor_name).shape
if latent_shape[1] == 4:
self.height = latent_shape[2] * 8
self.width = latent_shape[3] * 8
else:
self.height = latent_shape[1] * 8
self.width = latent_shape[2] * 8
def __call__(
self,
prompt,
init_image=None,
negative_prompt=None,
scheduler=None,
strength=0.5,
num_inference_steps=32,
guidance_scale=7.5,
eta=0.0,
create_gif=False,
model=None,
callback=None,
callback_userdata=None
):
# extract condition
text_input = self.tokenizer(
prompt,
padding="max_length",
max_length=self.tokenizer.model_max_length,
truncation=True,
return_tensors="np",
)
text_embeddings = self.text_encoder(text_input.input_ids)[self._text_encoder_output]
# do classifier free guidance
do_classifier_free_guidance = guidance_scale > 1.0
if do_classifier_free_guidance:
if negative_prompt is None:
uncond_tokens = [""]
elif isinstance(negative_prompt, str):
uncond_tokens = [negative_prompt]
else:
uncond_tokens = negative_prompt
tokens_uncond = self.tokenizer(
uncond_tokens,
padding="max_length",
max_length=self.tokenizer.model_max_length, # truncation=True,
return_tensors="np"
)
uncond_embeddings = self.text_encoder(tokens_uncond.input_ids)[self._text_encoder_output]
text_embeddings = np.concatenate([uncond_embeddings, text_embeddings])
# set timesteps
accepts_offset = "offset" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
extra_set_kwargs = {}
if accepts_offset:
extra_set_kwargs["offset"] = 1
scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs)
timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, scheduler)
latent_timestep = timesteps[:1]
# get the initial random noise unless the user supplied it
latents, meta = self.prepare_latents(init_image, latent_timestep, scheduler,model)
# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
# and should be between [0, 1]
accepts_eta = "eta" in set(inspect.signature(scheduler.step).parameters.keys())
extra_step_kwargs = {}
if accepts_eta:
extra_step_kwargs["eta"] = eta
if create_gif:
frames = []
for i, t in enumerate(self.progress_bar(timesteps)):
if callback:
callback(i, callback_userdata)
if self.batch_size == 1:
# expand the latents if we are doing classifier free guidance
noise_pred = []
latent_model_input = latents
#Scales the denoising model input by `(sigma**2 + 1) ** 0.5` to match the Euler algorithm.
latent_model_input = scheduler.scale_model_input(latent_model_input, t)
latent_model_input_pos = latent_model_input
latent_model_input_neg = latent_model_input
if self.unet.input(self.unet_input_tensor_name).shape[1] != 4:
try:
latent_model_input_pos = latent_model_input_pos.permute(0,2,3,1)
except Exception:
latent_model_input_pos = latent_model_input_pos.transpose(0,2,3,1)
if self.unet_neg.input(self.unet_input_tensor_name).shape[1] != 4:
try:
latent_model_input_neg = latent_model_input_neg.permute(0,2,3,1)
except Exception:
latent_model_input_neg = latent_model_input_neg.transpose(0,2,3,1)
if "sample" in self.unet_input_tensor_name:
input_tens_neg_dict = {"sample" : latent_model_input_neg, "encoder_hidden_states": np.expand_dims(text_embeddings[0], axis=0), "timestep": np.expand_dims(np.float32(t), axis=0)}
input_tens_pos_dict = {"sample" : latent_model_input_pos, "encoder_hidden_states": np.expand_dims(text_embeddings[1], axis=0), "timestep": np.expand_dims(np.float32(t), axis=0)}
else:
input_tens_neg_dict = {"latent_model_input" : latent_model_input_neg, "encoder_hidden_states": np.expand_dims(text_embeddings[0], axis=0), "t": np.expand_dims(np.float32(t), axis=0)}
input_tens_pos_dict = {"latent_model_input" : latent_model_input_pos, "encoder_hidden_states": np.expand_dims(text_embeddings[1], axis=0), "t": np.expand_dims(np.float32(t), axis=0)}
self.infer_request_neg.start_async(input_tens_neg_dict)
self.infer_request.start_async(input_tens_pos_dict)
self.infer_request_neg.wait()
self.infer_request.wait()
noise_pred_neg = self.infer_request_neg.get_output_tensor(0)
noise_pred_pos = self.infer_request.get_output_tensor(0)
noise_pred.append(noise_pred_neg.data.astype(np.float32))
noise_pred.append(noise_pred_pos.data.astype(np.float32))
else:
latent_model_input = np.concatenate([latents] * 2) if do_classifier_free_guidance else latents
latent_model_input = scheduler.scale_model_input(latent_model_input, t)
noise_pred = self.unet([latent_model_input, np.array(t, dtype=np.float32), text_embeddings])[self._unet_output]
if do_classifier_free_guidance:
noise_pred_uncond, noise_pred_text = noise_pred[0], noise_pred[1]
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
# compute the previous noisy sample x_t -> x_t-1
latents = scheduler.step(torch.from_numpy(noise_pred), t, torch.from_numpy(latents), **extra_step_kwargs)["prev_sample"].numpy()
if create_gif:
frames.append(latents)
if callback:
callback(num_inference_steps, callback_userdata)
# scale and decode the image latents with vae
latents = 1 / 0.18215 * latents
image = self.vae_decoder(latents)[self._vae_d_output]
image = self.postprocess_image(image, meta)
return image
def prepare_latents(self, image: PIL.Image.Image = None, latent_timestep: torch.Tensor = None,
scheduler=LMSDiscreteScheduler,model=None):
"""
Function for getting initial latents for starting generation
Parameters:
image (PIL.Image.Image, *optional*, None):
Input image for generation; if not provided, random noise will be used as the starting point
latent_timestep (torch.Tensor, *optional*, None):
Initial timestep predicted by the scheduler, required for mixing the latent image with noise
Returns:
latents (np.ndarray):
Image encoded in latent space
"""
latents_shape = (1, 4, self.height // 8, self.width // 8)
noise = np.random.randn(*latents_shape).astype(np.float32)
if image is None:
#print("Image is NONE")
# if we use LMSDiscreteScheduler, let's make sure latents are multiplied by sigmas
if isinstance(scheduler, LMSDiscreteScheduler):
noise = noise * scheduler.sigmas[0].numpy()
return noise, {}
elif isinstance(scheduler, EulerDiscreteScheduler):
noise = noise * scheduler.sigmas.max().numpy()
return noise, {}
else:
return noise, {}
input_image, meta = preprocess(image, self.height, self.width)
moments = self.vae_encoder(input_image)[self._vae_e_output]
if "sd_2.1" in model:
latents = moments * 0.18215
else:
mean, logvar = np.split(moments, 2, axis=1)
std = np.exp(logvar * 0.5)
latents = (mean + std * np.random.randn(*mean.shape)) * 0.18215
latents = scheduler.add_noise(torch.from_numpy(latents), torch.from_numpy(noise), latent_timestep).numpy()
return latents, meta
def postprocess_image(self, image: np.ndarray, meta: Dict):
"""
Postprocessing for decoded image. Takes the image generated by the VAE decoder, unpads it to the initial image size (if required),
normalizes it, and converts it to the [0, 255] pixel range
Parameters:
image (np.ndarray):
Generated image
meta (Dict):
Metadata obtained on the latents preparation step, can be empty
Returns:
image (np.ndarray):
Postprocessed image in BGR uint8 format
"""
if "padding" in meta:
pad = meta["padding"]
(_, end_h), (_, end_w) = pad[1:3]
h, w = image.shape[2:]
# print("image shape",image.shape[2:])
unpad_h = h - end_h
unpad_w = w - end_w
image = image[:, :, :unpad_h, :unpad_w]
image = np.clip(image / 2 + 0.5, 0, 1)
image = (image[0].transpose(1, 2, 0)[:, :, ::-1] * 255).astype(np.uint8)
if "src_height" in meta:
orig_height, orig_width = meta["src_height"], meta["src_width"]
image = cv2.resize(image, (orig_width, orig_height))
return image
def get_timesteps(self, num_inference_steps: int, strength: float, scheduler):
"""
Helper function for getting scheduler timesteps for generation
In case of image-to-image generation, it updates number of steps according to strength
Parameters:
num_inference_steps (int):
number of inference steps for generation
strength (float):
value between 0.0 and 1.0, that controls the amount of noise that is added to the input image.
Values that approach 1.0 allow for lots of variations but will also produce images that are not semantically consistent with the input.
"""
# get the original timestep using init_timestep
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
t_start = max(num_inference_steps - init_timestep, 0)
timesteps = scheduler.timesteps[t_start:]
return timesteps, num_inference_steps - t_start
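# Example usage (a minimal sketch, not a tested configuration; the model directory
# and device list below are illustrative assumptions):
#
#   from diffusers.schedulers import EulerDiscreteScheduler
#
#   engine = StableDiffusionEngine(
#       model="stable-diffusion-1.5",          # hypothetical OpenVINO export dir
#       device=["CPU", "GPU", "GPU", "CPU"],   # text encoder, unet, unet-neg, vae
#   )
#   scheduler = EulerDiscreteScheduler(beta_start=0.00085, beta_end=0.012,
#                                      beta_schedule="scaled_linear")
#   bgr = engine("an astronaut riding a horse", scheduler=scheduler,
#                num_inference_steps=32, guidance_scale=7.5)
#   cv2.imwrite("result.png", bgr)             # postprocess_image returns BGR uint8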
class LatentConsistencyEngine(DiffusionPipeline):
def __init__(
self,
model="SimianLuo/LCM_Dreamshaper_v7",
tokenizer="openai/clip-vit-large-patch14",
device=["CPU", "CPU", "CPU"],
):
super().__init__()
try:
self.tokenizer = CLIPTokenizer.from_pretrained(model, local_files_only=True)
except Exception:
self.tokenizer = CLIPTokenizer.from_pretrained(tokenizer)
self.tokenizer.save_pretrained(model)
self.core = Core()
self.core.set_property({'CACHE_DIR': os.path.join(model, 'cache')}) # adding caching to reduce init time
try_enable_npu_turbo(device, self.core)
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
text_future = executor.submit(self.load_model, model, "text_encoder", device[0])
unet_future = executor.submit(self.load_model, model, "unet", device[1])
vae_de_future = executor.submit(self.load_model, model, "vae_decoder", device[2])
print("Text Device:", device[0])
self.text_encoder = text_future.result()
self._text_encoder_output = self.text_encoder.output(0)
print("Unet Device:", device[1])
self.unet = unet_future.result()
self._unet_output = self.unet.output(0)
self.infer_request = self.unet.create_infer_request()
print(f"VAE Device: {device[2]}")
self.vae_decoder = vae_de_future.result()
self.infer_request_vae = self.vae_decoder.create_infer_request()
self.safety_checker = None #pipe.safety_checker
self.feature_extractor = None #pipe.feature_extractor
self.vae_scale_factor = 2 ** 3
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
def load_model(self, model, model_name, device):
if "NPU" in device:
with open(os.path.join(model, f"{model_name}.blob"), "rb") as f:
return self.core.import_model(f.read(), device)
return self.core.compile_model(os.path.join(model, f"{model_name}.xml"), device)
def _encode_prompt(
self,
prompt,
num_images_per_prompt,
prompt_embeds: Optional[torch.FloatTensor] = None,
):
r"""
Encodes the prompt into text encoder hidden states.
Args:
prompt (`str` or `List[str]`, *optional*):
prompt to be encoded
num_images_per_prompt (`int`):
number of images that should be generated per prompt
prompt_embeds (`torch.FloatTensor`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
provided, text embeddings will be generated from `prompt` input argument.
"""
if prompt_embeds is None:
text_inputs = self.tokenizer(
prompt,
padding="max_length",
max_length=self.tokenizer.model_max_length,
truncation=True,
return_tensors="pt",
)
text_input_ids = text_inputs.input_ids
untruncated_ids = self.tokenizer(
prompt, padding="longest", return_tensors="pt"
).input_ids
if untruncated_ids.shape[-1] >= text_input_ids.shape[
-1
] and not torch.equal(text_input_ids, untruncated_ids):
removed_text = self.tokenizer.batch_decode(
untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1]
)
logger.warning(
"The following part of your input was truncated because CLIP can only handle sequences up to"
f" {self.tokenizer.model_max_length} tokens: {removed_text}"
)
prompt_embeds = self.text_encoder(text_input_ids, share_inputs=True, share_outputs=True)
prompt_embeds = torch.from_numpy(prompt_embeds[0])
bs_embed, seq_len, _ = prompt_embeds.shape
# duplicate text embeddings for each generation per prompt
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
prompt_embeds = prompt_embeds.view(
bs_embed * num_images_per_prompt, seq_len, -1
)
# Don't need to get uncond prompt embedding because of LCM Guided Distillation
return prompt_embeds
def run_safety_checker(self, image, dtype):
if self.safety_checker is None:
has_nsfw_concept = None
else:
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(
image, output_type="pil"
)
else:
feature_extractor_input = self.image_processor.numpy_to_pil(image)
safety_checker_input = self.feature_extractor(
feature_extractor_input, return_tensors="pt"
)
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
return image, has_nsfw_concept
def prepare_latents(
self, batch_size, num_channels_latents, height, width, dtype, latents=None
):
shape = (
batch_size,
num_channels_latents,
height // self.vae_scale_factor,
width // self.vae_scale_factor,
)
if latents is None:
latents = torch.randn(shape, dtype=dtype)
# scale the initial noise by the standard deviation required by the scheduler
return latents
def get_w_embedding(self, w, embedding_dim=512, dtype=torch.float32):
"""
see https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298
Args:
w: torch.Tensor: guidance scale values to embed, with shape (batch,)
embedding_dim: int: dimension of the embeddings to generate
dtype: data type of the generated embeddings
Returns:
embedding vectors with shape `(len(w), embedding_dim)`
"""
assert len(w.shape) == 1
w = w * 1000.0
half_dim = embedding_dim // 2
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
emb = w.to(dtype)[:, None] * emb[None, :]
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
if embedding_dim % 2 == 1: # zero pad
emb = torch.nn.functional.pad(emb, (0, 1))
assert emb.shape == (w.shape[0], embedding_dim)
return emb
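# Example (sketch): get_w_embedding(torch.tensor([7.5]), embedding_dim=256) returns a
# (1, 256) sinusoidal embedding of the guidance scale, analogous to a timestep embedding.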
@torch.no_grad()
def __call__(
self,
prompt: Union[str, List[str]] = None,
height: Optional[int] = 512,
width: Optional[int] = 512,
guidance_scale: float = 7.5,
scheduler = None,
num_images_per_prompt: Optional[int] = 1,
latents: Optional[torch.FloatTensor] = None,
num_inference_steps: int = 4,
lcm_origin_steps: int = 50,
prompt_embeds: Optional[torch.FloatTensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
model: Optional[Dict[str, Any]] = None,
seed: Optional[int] = 1234567,
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
callback = None,
callback_userdata = None
):
# 1. Define call parameters
if prompt is not None and isinstance(prompt, str):
batch_size = 1
elif prompt is not None and isinstance(prompt, list):
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
if seed is not None:
torch.manual_seed(seed)
#print("After Step 1: batch size is ", batch_size)
# do_classifier_free_guidance = guidance_scale > 0.0
# In LCM Implementation: cfg_noise = noise_cond + cfg_scale * (noise_cond - noise_uncond) , (cfg_scale > 0.0 using CFG)
# 2. Encode input prompt
prompt_embeds = self._encode_prompt(
prompt,
num_images_per_prompt,
prompt_embeds=prompt_embeds,
)
#print("After Step 2: prompt embeds is ", prompt_embeds)
#print("After Step 2: scheduler is ", scheduler )
# 3. Prepare timesteps
scheduler.set_timesteps(num_inference_steps, original_inference_steps=lcm_origin_steps)
timesteps = scheduler.timesteps
#print("After Step 3: timesteps is ", timesteps)
# 4. Prepare latent variable
num_channels_latents = 4
latents = self.prepare_latents(
batch_size * num_images_per_prompt,
num_channels_latents,
height,
width,
prompt_embeds.dtype,
latents,
)
latents = latents * scheduler.init_noise_sigma
#print("After Step 4: ")
bs = batch_size * num_images_per_prompt
# 5. Get Guidance Scale Embedding
w = torch.tensor(guidance_scale).repeat(bs)
w_embedding = self.get_w_embedding(w, embedding_dim=256)
#print("After Step 5: ")
# 6. LCM MultiStep Sampling Loop:
with self.progress_bar(total=num_inference_steps) as progress_bar:
for i, t in enumerate(timesteps):
if callback:
callback(i+1, callback_userdata)
ts = torch.full((bs,), t, dtype=torch.long)
# model prediction (v-prediction, eps, x)
model_pred = self.unet([latents, ts, prompt_embeds, w_embedding],share_inputs=True, share_outputs=True)[0]
# compute the previous noisy sample x_t -> x_t-1
latents, denoised = scheduler.step(
torch.from_numpy(model_pred), t, latents, return_dict=False
)
progress_bar.update()
#print("After Step 6: ")
vae_start = time.time()
if not output_type == "latent":
image = torch.from_numpy(self.vae_decoder(denoised / 0.18215, share_inputs=True, share_outputs=True)[0])
else:
image = denoised
print("Decoder Ended: ", time.time() - vae_start)
#post_start = time.time()
#if has_nsfw_concept is None:
do_denormalize = [True] * image.shape[0]
#else:
# do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]
#print ("After do_denormalize: image is ", image)
image = self.image_processor.postprocess(
image, output_type=output_type, do_denormalize=do_denormalize
)
return image[0]
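# Example usage (sketch; the export directory is an illustrative assumption):
#
#   from diffusers.schedulers import LCMScheduler
#
#   lcm = LatentConsistencyEngine(model="lcm_dreamshaper_v7",
#                                 device=["CPU", "GPU", "CPU"])
#   scheduler = LCMScheduler(beta_start=0.00085, beta_end=0.012,
#                            beta_schedule="scaled_linear")
#   image = lcm("a cozy cabin in a snowy forest", scheduler=scheduler,
#               num_inference_steps=4, guidance_scale=8.0, seed=42)
#   image.save("lcm_result.png")  # output_type="pil" returns a PIL image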
class LatentConsistencyEngineAdvanced(DiffusionPipeline):
def __init__(
self,
model="SimianLuo/LCM_Dreamshaper_v7",
tokenizer="openai/clip-vit-large-patch14",
device=["CPU", "CPU", "CPU"],
):
super().__init__()
try:
self.tokenizer = CLIPTokenizer.from_pretrained(model, local_files_only=True)
except Exception:
self.tokenizer = CLIPTokenizer.from_pretrained(tokenizer)
self.tokenizer.save_pretrained(model)
self.core = Core()
self.core.set_property({'CACHE_DIR': os.path.join(model, 'cache')}) # adding caching to reduce init time
#try_enable_npu_turbo(device, self.core)
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
text_future = executor.submit(self.load_model, model, "text_encoder", device[0])
unet_future = executor.submit(self.load_model, model, "unet", device[1])
vae_de_future = executor.submit(self.load_model, model, "vae_decoder", device[2])
vae_encoder_future = executor.submit(self.load_model, model, "vae_encoder", device[2])
print("Text Device:", device[0])
self.text_encoder = text_future.result()
self._text_encoder_output = self.text_encoder.output(0)
print("Unet Device:", device[1])
self.unet = unet_future.result()
self._unet_output = self.unet.output(0)
self.infer_request = self.unet.create_infer_request()
print(f"VAE Device: {device[2]}")
self.vae_decoder = vae_de_future.result()
self.vae_encoder = vae_encoder_future.result()
self._vae_e_output = self.vae_encoder.output(0) if self.vae_encoder else None
self.infer_request_vae = self.vae_decoder.create_infer_request()
self.safety_checker = None #pipe.safety_checker
self.feature_extractor = None #pipe.feature_extractor
self.vae_scale_factor = 2 ** 3
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
def load_model(self, model, model_name, device):
print(f"Compiling the {model_name} to {device} ...")
return self.core.compile_model(os.path.join(model, f"{model_name}.xml"), device)
def get_timesteps(self, num_inference_steps:int, strength:float, scheduler):
"""
Helper function for getting scheduler timesteps for generation
In case of image-to-image generation, it updates number of steps according to strength
Parameters:
num_inference_steps (int):
number of inference steps for generation
strength (float):
value between 0.0 and 1.0, that controls the amount of noise that is added to the input image.
Values that approach 1.0 allow for lots of variations but will also produce images that are not semantically consistent with the input.
"""
# get the original timestep using init_timestep
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
t_start = max(num_inference_steps - init_timestep, 0)
timesteps = scheduler.timesteps[t_start:]
return timesteps, num_inference_steps - t_start
def _encode_prompt(
self,
prompt,
num_images_per_prompt,
prompt_embeds: Optional[torch.FloatTensor] = None,
):
r"""
Encodes the prompt into text encoder hidden states.
Args:
prompt (`str` or `List[str]`, *optional*):
prompt to be encoded
num_images_per_prompt (`int`):
number of images that should be generated per prompt
prompt_embeds (`torch.FloatTensor`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
provided, text embeddings will be generated from `prompt` input argument.
"""
if prompt_embeds is None:
text_inputs = self.tokenizer(
prompt,
padding="max_length",
max_length=self.tokenizer.model_max_length,
truncation=True,
return_tensors="pt",
)
text_input_ids = text_inputs.input_ids
untruncated_ids = self.tokenizer(
prompt, padding="longest", return_tensors="pt"
).input_ids
if untruncated_ids.shape[-1] >= text_input_ids.shape[
-1
] and not torch.equal(text_input_ids, untruncated_ids):
removed_text = self.tokenizer.batch_decode(
untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1]
)
logger.warning(
"The following part of your input was truncated because CLIP can only handle sequences up to"
f" {self.tokenizer.model_max_length} tokens: {removed_text}"
)
prompt_embeds = self.text_encoder(text_input_ids, share_inputs=True, share_outputs=True)
prompt_embeds = torch.from_numpy(prompt_embeds[0])
bs_embed, seq_len, _ = prompt_embeds.shape
# duplicate text embeddings for each generation per prompt
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
prompt_embeds = prompt_embeds.view(
bs_embed * num_images_per_prompt, seq_len, -1
)
# Don't need to get uncond prompt embedding because of LCM Guided Distillation
return prompt_embeds
def run_safety_checker(self, image, dtype):
if self.safety_checker is None:
has_nsfw_concept = None
else:
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(
image, output_type="pil"
)
else:
feature_extractor_input = self.image_processor.numpy_to_pil(image)
safety_checker_input = self.feature_extractor(
feature_extractor_input, return_tensors="pt"
)
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
return image, has_nsfw_concept
def prepare_latents(
self,image,timestep,batch_size, num_channels_latents, height, width, dtype, scheduler,latents=None,
):
shape = (
batch_size,
num_channels_latents,
height // self.vae_scale_factor,
width // self.vae_scale_factor,
)
if image:
input_image, meta = preprocess(image, 512, 512)
moments = self.vae_encoder(input_image)[self._vae_e_output]
mean, logvar = np.split(moments, 2, axis=1)
std = np.exp(logvar * 0.5)
latents = (mean + std * np.random.randn(*mean.shape)) * 0.18215
noise = torch.randn(shape, dtype=dtype)
latents = scheduler.add_noise(torch.from_numpy(latents), noise, timestep)
else:
latents = torch.randn(shape, dtype=dtype)
# scale the initial noise by the standard deviation required by the scheduler
return latents
def get_w_embedding(self, w, embedding_dim=512, dtype=torch.float32):
"""
see https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298
Args:
w: torch.Tensor: guidance scale values to embed, with shape (batch,)
embedding_dim: int: dimension of the embeddings to generate
dtype: data type of the generated embeddings
Returns:
embedding vectors with shape `(len(w), embedding_dim)`
"""
assert len(w.shape) == 1
w = w * 1000.0
half_dim = embedding_dim // 2
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
emb = w.to(dtype)[:, None] * emb[None, :]
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
if embedding_dim % 2 == 1: # zero pad
emb = torch.nn.functional.pad(emb, (0, 1))
assert emb.shape == (w.shape[0], embedding_dim)
return emb
@torch.no_grad()
def __call__(
self,
prompt: Union[str, List[str]] = None,
init_image: Optional[PIL.Image.Image] = None,
strength: Optional[float] = 0.8,
height: Optional[int] = 512,
width: Optional[int] = 512,
guidance_scale: float = 7.5,
scheduler = None,
num_images_per_prompt: Optional[int] = 1,
latents: Optional[torch.FloatTensor] = None,
num_inference_steps: int = 4,
lcm_origin_steps: int = 50,
prompt_embeds: Optional[torch.FloatTensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
model: Optional[Dict[str, Any]] = None,
seed: Optional[int] = 1234567,
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
callback = None,
callback_userdata = None
):
# 1. Define call parameters
if prompt is not None and isinstance(prompt, str):
batch_size = 1
elif prompt is not None and isinstance(prompt, list):
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
if seed is not None:
torch.manual_seed(seed)
#print("After Step 1: batch size is ", batch_size)
# do_classifier_free_guidance = guidance_scale > 0.0
# In LCM Implementation: cfg_noise = noise_cond + cfg_scale * (noise_cond - noise_uncond) , (cfg_scale > 0.0 using CFG)
# 2. Encode input prompt
prompt_embeds = self._encode_prompt(
prompt,
num_images_per_prompt,
prompt_embeds=prompt_embeds,
)
#print("After Step 2: prompt embeds is ", prompt_embeds)
#print("After Step 2: scheduler is ", scheduler )
# 3. Prepare timesteps
#scheduler.set_timesteps(num_inference_steps, original_inference_steps=lcm_origin_steps)
latent_timestep = None
if init_image:
scheduler.set_timesteps(num_inference_steps, original_inference_steps=lcm_origin_steps)
timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, scheduler)
latent_timestep = timesteps[:1]
else:
scheduler.set_timesteps(num_inference_steps, original_inference_steps=lcm_origin_steps)
timesteps = scheduler.timesteps
#timesteps = scheduler.timesteps
#latent_timestep = timesteps[:1].repeat(batch_size * num_images_per_prompt)
#print("timesteps: ", latent_timestep)
#print("After Step 3: timesteps is ", timesteps)
# 4. Prepare latent variable
num_channels_latents = 4
latents = self.prepare_latents(
init_image,
latent_timestep,
batch_size * num_images_per_prompt,
num_channels_latents,
height,
width,
prompt_embeds.dtype,
scheduler,
latents,
)
latents = latents * scheduler.init_noise_sigma
#print("After Step 4: ")
bs = batch_size * num_images_per_prompt
# 5. Get Guidance Scale Embedding
w = torch.tensor(guidance_scale).repeat(bs)
w_embedding = self.get_w_embedding(w, embedding_dim=256)
#print("After Step 5: ")
# 6. LCM MultiStep Sampling Loop:
with self.progress_bar(total=num_inference_steps) as progress_bar:
for i, t in enumerate(timesteps):
if callback:
callback(i+1, callback_userdata)
ts = torch.full((bs,), t, dtype=torch.long)
# model prediction (v-prediction, eps, x)
model_pred = self.unet([latents, ts, prompt_embeds, w_embedding],share_inputs=True, share_outputs=True)[0]
# compute the previous noisy sample x_t -> x_t-1
latents, denoised = scheduler.step(
torch.from_numpy(model_pred), t, latents, return_dict=False
)
progress_bar.update()
#print("After Step 6: ")
vae_start = time.time()
if not output_type == "latent":
image = torch.from_numpy(self.vae_decoder(denoised / 0.18215, share_inputs=True, share_outputs=True)[0])
else:
image = denoised
print("Decoder Ended: ", time.time() - vae_start)
#post_start = time.time()
#if has_nsfw_concept is None:
do_denormalize = [True] * image.shape[0]
#else:
# do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]
#print ("After do_denormalize: image is ", image)
image = self.image_processor.postprocess(
image, output_type=output_type, do_denormalize=do_denormalize
)
return image[0]
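# Image-to-image usage (sketch): identical to the LCM example above, but with an init
# image and strength; the names here are illustrative, not from the source.
#
#   lcm_adv = LatentConsistencyEngineAdvanced(model="lcm_dreamshaper_v7",
#                                             device=["CPU", "GPU", "CPU"])
#   init = Image.open("sketch.png")
#   image = lcm_adv("watercolor landscape", init_image=init, strength=0.6,
#                   scheduler=scheduler, num_inference_steps=4)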
class StableDiffusionEngineReferenceOnly(DiffusionPipeline):
def __init__(
self,
#scheduler: Union[DDIMScheduler, PNDMScheduler, LMSDiscreteScheduler],
model="bes-dev/stable-diffusion-v1-4-openvino",
tokenizer="openai/clip-vit-large-patch14",
device=["CPU","CPU","CPU"]
):
#self.tokenizer = CLIPTokenizer.from_pretrained(tokenizer)
try:
self.tokenizer = CLIPTokenizer.from_pretrained(model,local_files_only=True)
except Exception:
self.tokenizer = CLIPTokenizer.from_pretrained(tokenizer)
self.tokenizer.save_pretrained(model)
#self.scheduler = scheduler
# models
self.core = Core()
self.core.set_property({'CACHE_DIR': os.path.join(model, 'cache')}) #adding caching to reduce init time
# text features
print("Text Device:",device[0])
self.text_encoder = self.core.compile_model(os.path.join(model, "text_encoder.xml"), device[0])
self._text_encoder_output = self.text_encoder.output(0)
# diffusion
print("unet_w Device:",device[1])
self.unet_w = self.core.compile_model(os.path.join(model, "unet_reference_write.xml"), device[1])
self._unet_w_output = self.unet_w.output(0)
self.latent_shape = tuple(self.unet_w.inputs[0].shape)[1:]
print("unet_r Device:",device[1])
self.unet_r = self.core.compile_model(os.path.join(model, "unet_reference_read.xml"), device[1])
self._unet_r_output = self.unet_r.output(0)
# decoder
print("Vae Device:",device[2])
self.vae_decoder = self.core.compile_model(os.path.join(model, "vae_decoder.xml"), device[2])
# encoder
self.vae_encoder = self.core.compile_model(os.path.join(model, "vae_encoder.xml"), device[2])
self.init_image_shape = tuple(self.vae_encoder.inputs[0].shape)[2:]
self._vae_d_output = self.vae_decoder.output(0)
self._vae_e_output = self.vae_encoder.output(0) if self.vae_encoder is not None else None
self.height = self.unet_w.input(0).shape[2] * 8
self.width = self.unet_w.input(0).shape[3] * 8
def __call__(
self,
prompt,
image = None,
negative_prompt=None,
scheduler=None,
strength = 1.0,
num_inference_steps = 32,
guidance_scale = 7.5,
eta = 0.0,
create_gif = False,
model = None,
callback = None,
callback_userdata = None
):
# extract condition
text_input = self.tokenizer(
prompt,
padding="max_length",
max_length=self.tokenizer.model_max_length,
truncation=True,
return_tensors="np",
)
text_embeddings = self.text_encoder(text_input.input_ids)[self._text_encoder_output]
# do classifier free guidance
do_classifier_free_guidance = guidance_scale > 1.0
if do_classifier_free_guidance:
if negative_prompt is None:
uncond_tokens = [""]
elif isinstance(negative_prompt, str):
uncond_tokens = [negative_prompt]
else:
uncond_tokens = negative_prompt
tokens_uncond = self.tokenizer(
uncond_tokens,
padding="max_length",
max_length=self.tokenizer.model_max_length, #truncation=True,
return_tensors="np"
)
uncond_embeddings = self.text_encoder(tokens_uncond.input_ids)[self._text_encoder_output]
text_embeddings = np.concatenate([uncond_embeddings, text_embeddings])
# set timesteps
accepts_offset = "offset" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
extra_set_kwargs = {}
if accepts_offset:
extra_set_kwargs["offset"] = 1
scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs)
timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, scheduler)
latent_timestep = timesteps[:1]
ref_image = self.prepare_image(
image=image,
width=512,
height=512,
)
# get the initial random noise unless the user supplied it
latents, meta = self.prepare_latents(None, latent_timestep, scheduler)
#ref_image_latents, _ = self.prepare_latents(init_image, latent_timestep, scheduler)
ref_image_latents = self.ov_prepare_ref_latents(ref_image)
# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
# and should be between [0, 1]
accepts_eta = "eta" in set(inspect.signature(scheduler.step).parameters.keys())
extra_step_kwargs = {}
if accepts_eta:
extra_step_kwargs["eta"] = eta
if create_gif:
frames = []
for i, t in enumerate(self.progress_bar(timesteps)):
if callback:
callback(i, callback_userdata)
# expand the latents if we are doing classifier free guidance
latent_model_input = np.concatenate([latents] * 2) if do_classifier_free_guidance else latents
latent_model_input = scheduler.scale_model_input(latent_model_input, t)
# ref only part
noise = randn_tensor(
ref_image_latents.shape
)
ref_xt = scheduler.add_noise(
torch.from_numpy(ref_image_latents),
noise,
t.reshape(
1,
),
).numpy()
ref_xt = np.concatenate([ref_xt] * 2) if do_classifier_free_guidance else ref_xt
ref_xt = scheduler.scale_model_input(ref_xt, t)
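# Reference-only control: the "write" UNet pass runs on the noised reference latents
# and exposes each transformer block's self-attention LayerNorm output; the "read"
# pass below consumes those tensors so the denoising UNet can attend to the
# reference features.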
# MODE = "write"
result_w_dict = self.unet_w([
ref_xt,
t,
text_embeddings
])
down_0_attn0 = result_w_dict["/unet/down_blocks.0/attentions.0/transformer_blocks.0/norm1/LayerNormalization_output_0"]
down_0_attn1 = result_w_dict["/unet/down_blocks.0/attentions.1/transformer_blocks.0/norm1/LayerNormalization_output_0"]
down_1_attn0 = result_w_dict["/unet/down_blocks.1/attentions.0/transformer_blocks.0/norm1/LayerNormalization_output_0"]
down_1_attn1 = result_w_dict["/unet/down_blocks.1/attentions.1/transformer_blocks.0/norm1/LayerNormalization_output_0"]
down_2_attn0 = result_w_dict["/unet/down_blocks.2/attentions.0/transformer_blocks.0/norm1/LayerNormalization_output_0"]
down_2_attn1 = result_w_dict["/unet/down_blocks.2/attentions.1/transformer_blocks.0/norm1/LayerNormalization_output_0"]
mid_attn0 = result_w_dict["/unet/mid_block/attentions.0/transformer_blocks.0/norm1/LayerNormalization_output_0"]
up_1_attn0 = result_w_dict["/unet/up_blocks.1/attentions.0/transformer_blocks.0/norm1/LayerNormalization_output_0"]
up_1_attn1 = result_w_dict["/unet/up_blocks.1/attentions.1/transformer_blocks.0/norm1/LayerNormalization_output_0"]
up_1_attn2 = result_w_dict["/unet/up_blocks.1/attentions.2/transformer_blocks.0/norm1/LayerNormalization_output_0"]
up_2_attn0 = result_w_dict["/unet/up_blocks.2/attentions.0/transformer_blocks.0/norm1/LayerNormalization_output_0"]
up_2_attn1 = result_w_dict["/unet/up_blocks.2/attentions.1/transformer_blocks.0/norm1/LayerNormalization_output_0"]
up_2_attn2 = result_w_dict["/unet/up_blocks.2/attentions.2/transformer_blocks.0/norm1/LayerNormalization_output_0"]
up_3_attn0 = result_w_dict["/unet/up_blocks.3/attentions.0/transformer_blocks.0/norm1/LayerNormalization_output_0"]
up_3_attn1 = result_w_dict["/unet/up_blocks.3/attentions.1/transformer_blocks.0/norm1/LayerNormalization_output_0"]
up_3_attn2 = result_w_dict["/unet/up_blocks.3/attentions.2/transformer_blocks.0/norm1/LayerNormalization_output_0"]
# MODE = "read"
noise_pred = self.unet_r([
latent_model_input, t, text_embeddings, down_0_attn0, down_0_attn1, down_1_attn0,
down_1_attn1, down_2_attn0, down_2_attn1, mid_attn0, up_1_attn0, up_1_attn1, up_1_attn2,
up_2_attn0, up_2_attn1, up_2_attn2, up_3_attn0, up_3_attn1, up_3_attn2
])[0]
# perform guidance
if do_classifier_free_guidance:
noise_pred_uncond, noise_pred_text = noise_pred[0], noise_pred[1]
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
# compute the previous noisy sample x_t -> x_t-1
latents = scheduler.step(torch.from_numpy(noise_pred), t, torch.from_numpy(latents), **extra_step_kwargs)["prev_sample"].numpy()
if create_gif:
frames.append(latents)
if callback:
callback(num_inference_steps, callback_userdata)
# scale and decode the image latents with vae
image = self.vae_decoder(latents)[self._vae_d_output]
image = self.postprocess_image(image, meta)
if create_gif:
gif_folder=os.path.join(model,"../../../gif")
if not os.path.exists(gif_folder):
os.makedirs(gif_folder)
for i in range(len(frames)):
image = self.vae_decoder(frames[i])[self._vae_d_output]
image = self.postprocess_image(image, meta)
output = os.path.join(gif_folder, str(i).zfill(3) + ".png")
cv2.imwrite(output, image)
with open(os.path.join(gif_folder, "prompt.json"), "w") as file:
json.dump({"prompt": prompt}, file)
frames_image = [Image.open(f) for f in sorted(glob.glob(f"{gif_folder}/*.png"))]
frame_one = frames_image[0]
gif_file=os.path.join(gif_folder,"stable_diffusion.gif")
frame_one.save(gif_file, format="GIF", append_images=frames_image, save_all=True, duration=100, loop=0)
return image
def ov_prepare_ref_latents(self, refimage, vae_scaling_factor=0.18215):
#refimage = refimage.to(device=device, dtype=dtype)
# encode the mask image into latents space so we can concatenate it to the latents
moments = self.vae_encoder(refimage)[0]
mean, logvar = np.split(moments, 2, axis=1)
std = np.exp(logvar * 0.5)
ref_image_latents = (mean + std * np.random.randn(*mean.shape))
ref_image_latents = vae_scaling_factor * ref_image_latents
#ref_image_latents = scheduler.add_noise(torch.from_numpy(ref_image_latents), torch.from_numpy(noise), latent_timestep).numpy()
# aligning device to prevent device errors when concating it with the latent model input
#ref_image_latents = ref_image_latents.to(device=device, dtype=dtype)
return ref_image_latents
def prepare_latents(self, image:PIL.Image.Image = None, latent_timestep:torch.Tensor = None, scheduler = LMSDiscreteScheduler):
"""
Function for getting initial latents for starting generation
Parameters:
image (PIL.Image.Image, *optional*, None):
Input image for generation; if not provided, random noise will be used as the starting point
latent_timestep (torch.Tensor, *optional*, None):
Initial timestep predicted by the scheduler, required for mixing the latent image with noise
Returns:
latents (np.ndarray):
Image encoded in latent space
"""
latents_shape = (1, 4, self.height // 8, self.width // 8)
noise = np.random.randn(*latents_shape).astype(np.float32)
if image is None:
#print("Image is NONE")
# if we use LMSDiscreteScheduler, let's make sure latents are multiplied by sigmas
if isinstance(scheduler, LMSDiscreteScheduler):
noise = noise * scheduler.sigmas[0].numpy()
return noise, {}
elif isinstance(scheduler, EulerDiscreteScheduler):
noise = noise * scheduler.sigmas.max().numpy()
return noise, {}
else:
return noise, {}
input_image, meta = preprocess(image,self.height,self.width)
moments = self.vae_encoder(input_image)[self._vae_e_output]
mean, logvar = np.split(moments, 2, axis=1)
std = np.exp(logvar * 0.5)
latents = (mean + std * np.random.randn(*mean.shape)) * 0.18215
latents = scheduler.add_noise(torch.from_numpy(latents), torch.from_numpy(noise), latent_timestep).numpy()
return latents, meta
def postprocess_image(self, image:np.ndarray, meta:Dict):
"""
Postprocessing for decoded image. Takes the image generated by the VAE decoder, unpads it to the initial image size (if required),
normalizes it, and converts it to the [0, 255] pixel range
Parameters:
image (np.ndarray):
Generated image
meta (Dict):
Metadata obtained on the latents preparation step, can be empty
Returns:
image (np.ndarray):
Postprocessed image in BGR uint8 format
"""
if "padding" in meta:
pad = meta["padding"]
(_, end_h), (_, end_w) = pad[1:3]
h, w = image.shape[2:]
#print("image shape",image.shape[2:])
unpad_h = h - end_h
unpad_w = w - end_w
image = image[:, :, :unpad_h, :unpad_w]
image = np.clip(image / 2 + 0.5, 0, 1)
image = (image[0].transpose(1, 2, 0)[:, :, ::-1] * 255).astype(np.uint8)
if "src_height" in meta:
orig_height, orig_width = meta["src_height"], meta["src_width"]
image = cv2.resize(image, (orig_width, orig_height))
return image
def get_timesteps(self, num_inference_steps:int, strength:float, scheduler):
"""
Helper function for getting scheduler timesteps for generation
In case of image-to-image generation, it updates number of steps according to strength
Parameters:
num_inference_steps (int):
number of inference steps for generation
strength (float):
value between 0.0 and 1.0, that controls the amount of noise that is added to the input image.
Values that approach 1.0 allow for lots of variations but will also produce images that are not semantically consistent with the input.
"""
# get the original timestep using init_timestep
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
t_start = max(num_inference_steps - init_timestep, 0)
timesteps = scheduler.timesteps[t_start:]
return timesteps, num_inference_steps - t_start
def prepare_image(
self,
image,
width,
height,
do_classifier_free_guidance=False,
guess_mode=False,
):
if not isinstance(image, np.ndarray):
if isinstance(image, PIL.Image.Image):
image = [image]
if isinstance(image[0], PIL.Image.Image):
images = []
for image_ in image:
image_ = image_.convert("RGB")
image_ = image_.resize((width, height), resample=PIL_INTERPOLATION["lanczos"])
image_ = np.array(image_)
image_ = image_[None, :]
images.append(image_)
image = images
image = np.concatenate(image, axis=0)
image = np.array(image).astype(np.float32) / 255.0
image = (image - 0.5) / 0.5
image = image.transpose(0, 3, 1, 2)
elif isinstance(image[0], np.ndarray):
image = np.concatenate(image, axis=0)
if do_classifier_free_guidance and not guess_mode:
image = np.concatenate([image] * 2)
return image
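# Example usage (sketch; the export directory, reference image, and scheduler are
# illustrative assumptions):
#
#   engine = StableDiffusionEngineReferenceOnly(model="sd-1.4-reference-only",
#                                               device=["CPU", "GPU", "CPU"])
#   ref = Image.open("reference.png")
#   image = engine("same character, new pose", image=ref,
#                  scheduler=EulerDiscreteScheduler(beta_start=0.00085,
#                                                   beta_end=0.012,
#                                                   beta_schedule="scaled_linear"),
#                  num_inference_steps=32)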
def print_npu_turbo_art():
random_number = random.randint(1, 3)
if random_number == 1:
print(" ")
print(" ___ ___ ___ ___ ___ ___ ")
print(" /\ \ /\ \ /\ \ /\ \ /\ \ _____ /\ \ ")
print(" \:\ \ /::\ \ \:\ \ ___ \:\ \ /::\ \ /::\ \ /::\ \ ")
print(" \:\ \ /:/\:\__\ \:\ \ /\__\ \:\ \ /:/\:\__\ /:/\:\ \ /:/\:\ \ ")
print(" _____\:\ \ /:/ /:/ / ___ \:\ \ /:/ / ___ \:\ \ /:/ /:/ / /:/ /::\__\ /:/ \:\ \ ")
print(" /::::::::\__\ /:/_/:/ / /\ \ \:\__\ /:/__/ /\ \ \:\__\ /:/_/:/__/___ /:/_/:/\:|__| /:/__/ \:\__\ ")
print(" \:\~~\~~\/__/ \:\/:/ / \:\ \ /:/ / /::\ \ \:\ \ /:/ / \:\/:::::/ / \:\/:/ /:/ / \:\ \ /:/ / ")
print(" \:\ \ \::/__/ \:\ /:/ / /:/\:\ \ \:\ /:/ / \::/~~/~~~~ \::/_/:/ / \:\ /:/ / ")
print(" \:\ \ \:\ \ \:\/:/ / \/__\:\ \ \:\/:/ / \:\~~\ \:\/:/ / \:\/:/ / ")
print(" \:\__\ \:\__\ \::/ / \:\__\ \::/ / \:\__\ \::/ / \::/ / ")
print(" \/__/ \/__/ \/__/ \/__/ \/__/ \/__/ \/__/ \/__/ ")
print(" ")
elif random_number == 2:
print(" _ _ ____ _ _ _____ _ _ ____ ____ ___ ")
print("| \ | | | _ \ | | | | |_ _| | | | | | _ \ | __ ) / _ \ ")
print("| \| | | |_) | | | | | | | | | | | | |_) | | _ \ | | | |")
print("| |\ | | __/ | |_| | | | | |_| | | _ < | |_) | | |_| |")
print("|_| \_| |_| \___/ |_| \___/ |_| \_\ |____/ \___/ ")
print(" ")
else:
print("")
print(" ) ( ( ) ")
print(" ( /( )\ ) * ) )\ ) ( ( /( ")
print(" )\()) (()/( ( ` ) /( ( (()/( ( )\ )\()) ")
print("((_)\ /(_)) )\ ( )(_)) )\ /(_)) )((_) ((_)\ ")
print(" _((_) (_)) _ ((_) (_(_()) _ ((_) (_)) ((_)_ ((_) ")
print("| \| | | _ \ | | | | |_ _| | | | | | _ \ | _ ) / _ \ ")
print("| .` | | _/ | |_| | | | | |_| | | / | _ \ | (_) | ")
print("|_|\_| |_| \___/ |_| \___/ |_|_\ |___/ \___/ ")
print(" ")