Spaces:
Running
Running
import inspect | |
from pathlib import Path | |
from tempfile import TemporaryDirectory | |
from typing import List, Optional, Tuple, Union, Dict, Any, Callable, OrderedDict | |
import numpy as np | |
import openvino | |
import torch | |
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput | |
from optimum.intel.openvino.modeling_diffusion import OVStableDiffusionPipeline, OVModelUnet, OVModelVaeDecoder, OVModelTextEncoder, OVModelVaeEncoder, VaeImageProcessor | |
from optimum.utils import ( | |
DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER, | |
DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER, | |
DIFFUSION_MODEL_UNET_SUBFOLDER, | |
DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER, | |
DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER, | |
) | |
from diffusers import logging | |
logger = logging.get_logger(__name__) # pylint: disable=invalid-name | |
class LCMOVModelUnet(OVModelUnet): | |
def __call__( | |
self, | |
sample: np.ndarray, | |
timestep: np.ndarray, | |
encoder_hidden_states: np.ndarray, | |
timestep_cond: Optional[np.ndarray] = None, | |
text_embeds: Optional[np.ndarray] = None, | |
time_ids: Optional[np.ndarray] = None, | |
): | |
self._compile() | |
inputs = { | |
"sample": sample, | |
"timestep": timestep, | |
"encoder_hidden_states": encoder_hidden_states, | |
} | |
if timestep_cond is not None: | |
inputs["timestep_cond"] = timestep_cond | |
if text_embeds is not None: | |
inputs["text_embeds"] = text_embeds | |
if time_ids is not None: | |
inputs["time_ids"] = time_ids | |
outputs = self.request(inputs, shared_memory=True) | |
return list(outputs.values()) | |
class OVLatentConsistencyModelPipeline(OVStableDiffusionPipeline): | |
def __init__( | |
self, | |
vae_decoder: openvino.runtime.Model, | |
text_encoder: openvino.runtime.Model, | |
unet: openvino.runtime.Model, | |
config: Dict[str, Any], | |
tokenizer: "CLIPTokenizer", | |
scheduler: Union["DDIMScheduler", "PNDMScheduler", "LMSDiscreteScheduler"], | |
feature_extractor: Optional["CLIPFeatureExtractor"] = None, | |
vae_encoder: Optional[openvino.runtime.Model] = None, | |
text_encoder_2: Optional[openvino.runtime.Model] = None, | |
tokenizer_2: Optional["CLIPTokenizer"] = None, | |
device: str = "CPU", | |
dynamic_shapes: bool = True, | |
compile: bool = True, | |
ov_config: Optional[Dict[str, str]] = None, | |
model_save_dir: Optional[Union[str, Path, TemporaryDirectory]] = None, | |
**kwargs, | |
): | |
self._internal_dict = config | |
self._device = device.upper() | |
self.is_dynamic = dynamic_shapes | |
self.ov_config = ov_config if ov_config is not None else {} | |
self._model_save_dir = ( | |
Path(model_save_dir.name) if isinstance(model_save_dir, TemporaryDirectory) else model_save_dir | |
) | |
self.vae_decoder = OVModelVaeDecoder(vae_decoder, self) | |
self.unet = LCMOVModelUnet(unet, self) | |
self.text_encoder = OVModelTextEncoder(text_encoder, self) if text_encoder is not None else None | |
self.text_encoder_2 = ( | |
OVModelTextEncoder(text_encoder_2, self, model_name=DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER) | |
if text_encoder_2 is not None | |
else None | |
) | |
self.vae_encoder = OVModelVaeEncoder(vae_encoder, self) if vae_encoder is not None else None | |
if "block_out_channels" in self.vae_decoder.config: | |
self.vae_scale_factor = 2 ** (len(self.vae_decoder.config["block_out_channels"]) - 1) | |
else: | |
self.vae_scale_factor = 8 | |
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor) | |
self.tokenizer = tokenizer | |
self.tokenizer_2 = tokenizer_2 | |
self.scheduler = scheduler | |
self.feature_extractor = feature_extractor | |
self.safety_checker = None | |
self.preprocessors = [] | |
if self.is_dynamic: | |
self.reshape(batch_size=-1, height=-1, width=-1, num_images_per_prompt=-1) | |
if compile: | |
self.compile() | |
sub_models = { | |
DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER: self.text_encoder, | |
DIFFUSION_MODEL_UNET_SUBFOLDER: self.unet, | |
DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER: self.vae_decoder, | |
DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER: self.vae_encoder, | |
DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER: self.text_encoder_2, | |
} | |
for name in sub_models.keys(): | |
self._internal_dict[name] = ( | |
("optimum", sub_models[name].__class__.__name__) if sub_models[name] is not None else (None, None) | |
) | |
self._internal_dict.pop("vae", None) | |
def _reshape_unet( | |
self, | |
model: openvino.runtime.Model, | |
batch_size: int = -1, | |
height: int = -1, | |
width: int = -1, | |
num_images_per_prompt: int = -1, | |
tokenizer_max_length: int = -1, | |
): | |
if batch_size == -1 or num_images_per_prompt == -1: | |
batch_size = -1 | |
else: | |
batch_size = batch_size * num_images_per_prompt | |
height = height // self.vae_scale_factor if height > 0 else height | |
width = width // self.vae_scale_factor if width > 0 else width | |
shapes = {} | |
for inputs in model.inputs: | |
shapes[inputs] = inputs.get_partial_shape() | |
if inputs.get_any_name() == "timestep": | |
shapes[inputs][0] = 1 | |
elif inputs.get_any_name() == "sample": | |
in_channels = self.unet.config.get("in_channels", None) | |
if in_channels is None: | |
in_channels = shapes[inputs][1] | |
if in_channels.is_dynamic: | |
logger.warning( | |
"Could not identify `in_channels` from the unet configuration, to statically reshape the unet please provide a configuration." | |
) | |
self.is_dynamic = True | |
shapes[inputs] = [batch_size, in_channels, height, width] | |
elif inputs.get_any_name() == "timestep_cond": | |
shapes[inputs] = [batch_size, inputs.get_partial_shape()[1]] | |
elif inputs.get_any_name() == "text_embeds": | |
shapes[inputs] = [batch_size, self.text_encoder_2.config["projection_dim"]] | |
elif inputs.get_any_name() == "time_ids": | |
shapes[inputs] = [batch_size, inputs.get_partial_shape()[1]] | |
else: | |
shapes[inputs][0] = batch_size | |
shapes[inputs][1] = tokenizer_max_length | |
model.reshape(shapes) | |
return model | |
def get_guidance_scale_embedding(self, w, embedding_dim=512, dtype=np.float32): | |
""" | |
see https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298 | |
Args: | |
timesteps: np.array: generate embedding vectors at these timesteps | |
embedding_dim: int: dimension of the embeddings to generate | |
dtype: data type of the generated embeddings | |
Returns: | |
embedding vectors with shape `(len(timesteps), embedding_dim)` | |
""" | |
assert len(w.shape) == 1 | |
w = w * 1000. | |
half_dim = embedding_dim // 2 | |
emb = np.log(np.array(10000.)) / (half_dim - 1) | |
emb = np.exp(np.arange(half_dim, dtype=dtype) * -emb) | |
emb = w.astype(dtype)[:, None] * emb[None, :] | |
emb = np.concatenate([np.sin(emb), np.cos(emb)], axis=1) | |
if embedding_dim % 2 == 1: # zero pad | |
emb = np.pad(emb, (0, 1)) | |
assert emb.shape == (w.shape[0], embedding_dim) | |
return emb | |
# Adapted from https://github.com/huggingface/optimum/blob/15b8d1eed4d83c5004d3b60f6b6f13744b358f01/optimum/pipelines/diffusers/pipeline_stable_diffusion.py#L201 | |
def __call__( | |
self, | |
prompt: Optional[Union[str, List[str]]] = None, | |
height: Optional[int] = None, | |
width: Optional[int] = None, | |
num_inference_steps: int = 4, | |
original_inference_steps: int = None, | |
guidance_scale: float = 7.5, | |
num_images_per_prompt: int = 1, | |
eta: float = 0.0, | |
generator: Optional[np.random.RandomState] = None, | |
latents: Optional[np.ndarray] = None, | |
prompt_embeds: Optional[np.ndarray] = None, | |
output_type: str = "pil", | |
return_dict: bool = True, | |
callback: Optional[Callable[[int, int, np.ndarray], None]] = None, | |
callback_steps: int = 1, | |
guidance_rescale: float = 0.0, | |
): | |
r""" | |
Function invoked when calling the pipeline for generation. | |
Args: | |
prompt (`Optional[Union[str, List[str]]]`, defaults to None): | |
The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`. | |
instead. | |
height (`Optional[int]`, defaults to None): | |
The height in pixels of the generated image. | |
width (`Optional[int]`, defaults to None): | |
The width in pixels of the generated image. | |
num_inference_steps (`int`, defaults to 4): | |
The number of denoising steps. More denoising steps usually lead to a higher quality image at the | |
expense of slower inference. | |
original_inference_steps (`int`, *optional*): | |
The original number of inference steps use to generate a linearly-spaced timestep schedule, from which | |
we will draw `num_inference_steps` evenly spaced timesteps from as our final timestep schedule, | |
following the Skipping-Step method in the paper (see Section 4.3). If not set this will default to the | |
scheduler's `original_inference_steps` attribute. | |
guidance_scale (`float`, defaults to 7.5): | |
Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598). | |
`guidance_scale` is defined as `w` of equation 2. of [Imagen | |
Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale > | |
1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`, | |
usually at the expense of lower image quality. | |
num_images_per_prompt (`int`, defaults to 1): | |
The number of images to generate per prompt. | |
eta (`float`, defaults to 0.0): | |
Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to | |
[`schedulers.DDIMScheduler`], will be ignored for others. | |
generator (`Optional[np.random.RandomState]`, defaults to `None`):: | |
A np.random.RandomState to make generation deterministic. | |
latents (`Optional[np.ndarray]`, defaults to `None`): | |
Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image | |
generation. Can be used to tweak the same generation with different prompts. If not provided, a latents | |
tensor will ge generated by sampling using the supplied random `generator`. | |
prompt_embeds (`Optional[np.ndarray]`, defaults to `None`): | |
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not | |
provided, text embeddings will be generated from `prompt` input argument. | |
output_type (`str`, defaults to `"pil"`): | |
The output format of the generate image. Choose between | |
[PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`. | |
return_dict (`bool`, defaults to `True`): | |
Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a | |
plain tuple. | |
callback (Optional[Callable], defaults to `None`): | |
A function that will be called every `callback_steps` steps during inference. The function will be | |
called with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`. | |
callback_steps (`int`, defaults to 1): | |
The frequency at which the `callback` function will be called. If not specified, the callback will be | |
called at every step. | |
guidance_rescale (`float`, defaults to 0.0): | |
Guidance rescale factor proposed by [Common Diffusion Noise Schedules and Sample Steps are | |
Flawed](https://arxiv.org/pdf/2305.08891.pdf) `guidance_scale` is defined as `φ` in equation 16. of | |
[Common Diffusion Noise Schedules and Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf). | |
Guidance rescale factor should fix overexposure when using zero terminal SNR. | |
Returns: | |
[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`: | |
[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple. | |
When returning a tuple, the first element is a list with the generated images, and the second element is a | |
list of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work" | |
(nsfw) content, according to the `safety_checker`. | |
""" | |
height = height or self.unet.config.get("sample_size", 64) * self.vae_scale_factor | |
width = width or self.unet.config.get("sample_size", 64) * self.vae_scale_factor | |
# check inputs. Raise error if not correct | |
self.check_inputs( | |
prompt, height, width, callback_steps, None, prompt_embeds, None | |
) | |
# define call parameters | |
if isinstance(prompt, str): | |
batch_size = 1 | |
elif isinstance(prompt, list): | |
batch_size = len(prompt) | |
else: | |
batch_size = prompt_embeds.shape[0] | |
if generator is None: | |
generator = np.random | |
# Create torch.Generator instance with same state as np.random.RandomState | |
torch_generator = torch.Generator().manual_seed(int(generator.get_state()[1][0])) | |
#do_classifier_free_guidance = guidance_scale > 1.0 | |
# NOTE: when a LCM is distilled from an LDM via latent consistency distillation (Algorithm 1) with guided | |
# distillation, the forward pass of the LCM learns to approximate sampling from the LDM using CFG with the | |
# unconditional prompt "" (the empty string). Due to this, LCMs currently do not support negative prompts. | |
prompt_embeds = self._encode_prompt( | |
prompt, | |
num_images_per_prompt, | |
False, | |
negative_prompt=None, | |
prompt_embeds=prompt_embeds, | |
negative_prompt_embeds=None, | |
) | |
# set timesteps | |
self.scheduler.set_timesteps(num_inference_steps, "cpu", original_inference_steps=original_inference_steps) | |
timesteps = self.scheduler.timesteps | |
latents = self.prepare_latents( | |
batch_size * num_images_per_prompt, | |
self.unet.config.get("in_channels", 4), | |
height, | |
width, | |
prompt_embeds.dtype, | |
generator, | |
latents, | |
) | |
# Get Guidance Scale Embedding | |
w = np.tile(guidance_scale - 1, batch_size * num_images_per_prompt) | |
w_embedding = self.get_guidance_scale_embedding(w, embedding_dim=self.unet.config.get("time_cond_proj_dim", 256)) | |
# Adapted from diffusers to extend it for other runtimes than ORT | |
timestep_dtype = self.unet.input_dtype.get("timestep", np.float32) | |
# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature | |
# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers. | |
# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502 | |
# and should be between [0, 1] | |
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys()) | |
extra_step_kwargs = {} | |
if accepts_eta: | |
extra_step_kwargs["eta"] = eta | |
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys()) | |
if accepts_generator: | |
extra_step_kwargs["generator"] = torch_generator | |
num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order | |
for i, t in enumerate(self.progress_bar(timesteps)): | |
# predict the noise residual | |
timestep = np.array([t], dtype=timestep_dtype) | |
noise_pred = self.unet(sample=latents, timestep=timestep, timestep_cond = w_embedding, encoder_hidden_states=prompt_embeds)[0] | |
# compute the previous noisy sample x_t -> x_t-1 | |
latents, denoised = self.scheduler.step( | |
torch.from_numpy(noise_pred), t, torch.from_numpy(latents), **extra_step_kwargs, return_dict = False | |
) | |
latents, denoised = latents.numpy(), denoised.numpy() | |
# call the callback, if provided | |
if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0): | |
if callback is not None and i % callback_steps == 0: | |
callback(i, t, latents) | |
if output_type == "latent": | |
image = latents | |
has_nsfw_concept = None | |
else: | |
denoised /= self.vae_decoder.config.get("scaling_factor", 0.18215) | |
# it seems likes there is a strange result for using half-precision vae decoder if batchsize>1 | |
image = np.concatenate( | |
[self.vae_decoder(latent_sample=denoised[i : i + 1])[0] for i in range(latents.shape[0])] | |
) | |
image, has_nsfw_concept = self.run_safety_checker(image) | |
if has_nsfw_concept is None: | |
do_denormalize = [True] * image.shape[0] | |
else: | |
do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept] | |
image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize) | |
if not return_dict: | |
return (image, has_nsfw_concept) | |
return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept) | |