Spaces:
Runtime error
Runtime error
from diffusers import StableDiffusionPipeline | |
import gc | |
import gradio as gr | |
pipe = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4").to("cpu") | |
text_encoder = pipe.text_encoder | |
text_encoder.eval() | |
unet = pipe.unet | |
unet.eval() | |
vae = pipe.vae | |
vae.eval() | |
del pipe | |
gc.collect() | |
from pathlib import Path | |
import torch | |
import openvino as pv | |
text_encoder_path=Path("text_encoder.xml") | |
def cleanup_cache(): | |
torch._C._jit_clear_class_registry() | |
torch.jit._recursive.concrete_type_store=torch.jit._recursive.ConcreteTypeStore() | |
torch.jit._state._clear_class_state() | |
def convert_encoder(text_encoder:torch.nn.Module,ir_path:Path): | |
""" | |
Convert Text Encoder mode. | |
Function accepts text encoder model, and prepares example inputs for conversion, | |
Parameters: | |
text_encoder (torch.nn.Module): text_encoder model from Stable Diffusion pipeline | |
ir_path (Path): File for storing model | |
Returns: | |
None | |
""" | |
input_ids=torch.ones((1,77),dtype=torch.long) | |
text_encoder.eval() | |
with torch.no_grad(): | |
ov_model=pv.convert_model(text_encoder,example_input=input_ids,input=[(1,77),]) | |
pv.save_model(ov_model,ir_path) | |
del ov_model | |
cleanup_cache() | |
print(f"Text Encoder successfully converted to TR and saved to {ir_path}") | |
if not text_encoder_path.exists(): | |
convert_encoder(text_encoder,text_encoder_path) | |
else: | |
print(f"Text encoder will be loaded from {text_encoder_path}") | |
del text_encoder | |
gc.collect() | |
import numpy as np | |
unet_path=Path("unet.xml") | |
dtype_mapping={ | |
torch.float32: pv.Type.f32, | |
torch.float64: pv.Type.f64 | |
} | |
def convert_unet(unet:torch.nn.Module,ir_path:Path): | |
""" | |
Convert U-net model to IR format. | |
Function accepts unet model, prepares example inputs for conversion, | |
Parameters: | |
unet (StableDiffusionPipeline): unet from Stable Diffusion pipeline | |
ir_path (Path): File for storing model | |
Returns: | |
None | |
""" | |
encoder_hidden_state=torch.ones((2,77,768)) | |
latents_shape=(2,4,512 // 8,512 // 8) | |
latents=torch.randn(latents_shape) | |
t=torch.from_numpy(np.array(1,dtype=float)) | |
dummy_inputs=(latents,t,encoder_hidden_state) | |
input_info=[] | |
for input_tensor in dummy_inputs: | |
shape=pv.PartialShape(tuple(input_tensor.shape)) | |
element_type=dtype_mapping[input_tensor.dtype] | |
input_info.append((shape,element_type)) | |
unet.eval() | |
with torch.no_grad(): | |
pv_model=pv.convert_model(unet,example_input=dummy_inputs,input=input_info) | |
pv.save_model(pv_model,ir_path) | |
del pv_model | |
cleanup_cache() | |
print(f"Unet successfully converted to IR and saved to {ir_path}") | |
if not unet_path.exists(): | |
convert_unet(unet,unet_path) | |
gc.collect() | |
else: | |
print(f"unet will be loaded from {unet_path}") | |
del unet | |
gc.collect() | |
VAE_ENCODER_PATH = Path("vae_encoder.xml") | |
def convert_vae_encoder(vae: torch.nn.Module, ir_path: Path): | |
class VAEEncoder(torch.nn.Module): | |
def __init__(self, vae): | |
super().__init__() | |
self.vae = vae | |
def forward(self, image): | |
return self.vae.encode(x=image)["latent_dist"].sample() | |
vae_encoder = VAEEncoder(vae) | |
vae_encoder.eval() | |
image = torch.zeros((1, 3, 512, 512)) | |
with torch.no_grad(): | |
ov_model = pv.convert_model(vae_encoder, example_input=image, input=[((1,3,512,512),)]) | |
pv.save_model(ov_model, ir_path) | |
del ov_model | |
cleanup_cache() | |
print(f'VAE encoder successfully converted to IR and saved to {ir_path}') | |
if not VAE_ENCODER_PATH.exists(): | |
convert_vae_encoder(vae, VAE_ENCODER_PATH) | |
else: | |
print(f"VAE encoder will be loaded from {VAE_ENCODER_PATH}") | |
VAE_DECODER_PATH = Path('vae_decoder.xml') | |
def convert_vae_decoder(vae: torch.nn.Module, ir_path: Path): | |
class VAEDecoder(torch.nn.Module): | |
def __init__(self, vae): | |
super().__init__() | |
self.vae = vae | |
def forward(self, latents): | |
return self.vae.decode(latents) | |
vae_decoder = VAEDecoder(vae) | |
latents = torch.zeros((1, 4, 64, 64)) | |
vae_decoder.eval() | |
with torch.no_grad(): | |
ov_model = pv.convert_model(vae_decoder, example_input=latents, input=[((1,4,64,64),)]) | |
pv.save_model(ov_model, ir_path) | |
del ov_model | |
cleanup_cache() | |
print(f'VAE decoder successfully converted to IR and saved to {ir_path}') | |
if not VAE_DECODER_PATH.exists(): | |
convert_vae_decoder(vae, VAE_DECODER_PATH) | |
else: | |
print(f"VAE decoder will be loaded from {VAE_DECODER_PATH}") | |
del vae | |
gc.collect() | |
import inspect | |
from typing import List,Optional,Union,Dict | |
import PIL | |
import cv2 | |
from transformers import CLIPTokenizer | |
from diffusers.pipelines.pipeline_utils import DiffusionPipeline | |
from diffusers.schedulers import DDIMScheduler,LMSDiscreteScheduler,PNDMScheduler | |
from openvino.runtime import Model | |
def scale_window(dst_width:int,dst_height:int,image_width:int,image_height:int): | |
im_scale=min(dst_height / image_height,dst_width / image_width) | |
return int(im_scale * image_width), int(im_scale * image_height) | |
def preprocess(image:PIL.Image.Image): | |
src_width,src_height=image.size | |
dst_width,dst_height=scale_window(512,512,src_width,src_height) | |
image=np.array(image.resize((dst_width,dst_height),resample=PIL.Image.Resampling.LANCZOS))[None,:] | |
pad_width=512-dst_width | |
pad_height=512-dst_height | |
pad=((0,0),(0,pad_height),(0,pad_width),(0,0)) | |
image=np.pad(image,pad,mode="constant") | |
image=image.astype(np.float32) / 255.0 | |
image=2.0* image - 1.0 | |
image=image.transpose(0,3,1,2) | |
return image, {"padding":pad,"src_width":src_width,"src_height":src_height} | |
class OVStableDiffusionPipeline(DiffusionPipeline): | |
def __init__( | |
self, | |
vae_decoder: Model, | |
text_encoder: Model, | |
tokenizer: CLIPTokenizer, | |
unet: Model, | |
scheduler: Union[DDIMScheduler, PNDMScheduler, LMSDiscreteScheduler], | |
vae_encoder: Model = None, | |
): | |
super().__init__() | |
self.scheduler = scheduler | |
self.vae_decoder = vae_decoder | |
self.vae_encoder = vae_encoder | |
self.text_encoder = text_encoder | |
self.unet = unet | |
self._text_encoder_output = text_encoder.output(0) | |
self._unet_output = unet.output(0) | |
self._vae_d_output = vae_decoder.output(0) | |
self._vae_e_output = vae_encoder.output(0) if vae_encoder is not None else None | |
self.height = 512 | |
self.width = 512 | |
self.tokenizer = tokenizer | |
def __call__( | |
self, | |
prompt: Union[str, List[str]], | |
image: PIL.Image.Image = None, | |
num_inference_steps: Optional[int] = 50, | |
negative_prompt: Union[str, List[str]] = None, | |
guidance_scale: Optional[float] = 7.5, | |
eta: Optional[float] = 0.0, | |
output_type: Optional[str] = "pil", | |
seed: Optional[int] = None, | |
strength: float = 1.0, | |
gif: Optional[bool] = False, | |
**kwargs, | |
): | |
if seed is not None: | |
np.random.seed(seed) | |
img_buffer = [] | |
do_classifier_free_guidance = guidance_scale > 1.0 | |
# get prompt text embeddings | |
text_embeddings = self._encode_prompt(prompt, do_classifier_free_guidance=do_classifier_free_guidance, negative_prompt=negative_prompt) | |
# set timesteps | |
accepts_offset = "offset" in set(inspect.signature(self.scheduler.set_timesteps).parameters.keys()) | |
extra_set_kwargs = {} | |
if accepts_offset: | |
extra_set_kwargs["offset"] = 1 | |
self.scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs) | |
timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength) | |
latent_timestep = timesteps[:1] | |
# get the initial random noise unless the user supplied it | |
latents, meta = self.prepare_latents(image, latent_timestep) | |
# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature | |
# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers. | |
# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502 | |
# and should be between [0, 1] | |
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys()) | |
extra_step_kwargs = {} | |
if accepts_eta: | |
extra_step_kwargs["eta"] = eta | |
for i, t in enumerate(self.progress_bar(timesteps)): | |
# expand the latents if you are doing classifier free guidance | |
latent_model_input = np.concatenate([latents] * 2) if do_classifier_free_guidance else latents | |
latent_model_input = self.scheduler.scale_model_input(latent_model_input, t) | |
# predict the noise residual | |
noise_pred = self.unet([latent_model_input, t, text_embeddings])[self._unet_output] | |
# perform guidance | |
if do_classifier_free_guidance: | |
noise_pred_uncond, noise_pred_text = noise_pred[0], noise_pred[1] | |
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) | |
# compute the previous noisy sample x_t -> x_t-1 | |
latents = self.scheduler.step(torch.from_numpy(noise_pred), t, torch.from_numpy(latents), **extra_step_kwargs)["prev_sample"].numpy() | |
if gif: | |
image = self.vae_decoder(latents * (1 / 0.18215))[self._vae_d_output] | |
image = self.postprocess_image(image, meta, output_type) | |
img_buffer.extend(image) | |
# scale and decode the image latents with vae | |
image = self.vae_decoder(latents * (1 / 0.18215))[self._vae_d_output] | |
image = self.postprocess_image(image, meta, output_type) | |
return {"sample": image, 'iterations': img_buffer} | |
def _encode_prompt(self, prompt:Union[str, List[str]], num_images_per_prompt:int = 1, do_classifier_free_guidance:bool = True, negative_prompt:Union[str, List[str]] = None): | |
""" | |
Encodes the prompt into text encoder hidden states. | |
Parameters: | |
prompt (str or list(str)): prompt to be encoded | |
num_images_per_prompt (int): number of images that should be generated per prompt | |
do_classifier_free_guidance (bool): whether to use classifier free guidance or not | |
negative_prompt (str or list(str)): negative prompt to be encoded | |
Returns: | |
text_embeddings (np.ndarray): text encoder hidden states | |
""" | |
batch_size = len(prompt) if isinstance(prompt, list) else 1 | |
# tokenize input prompts | |
text_inputs = self.tokenizer( | |
prompt, | |
padding="max_length", | |
max_length=self.tokenizer.model_max_length, | |
truncation=True, | |
return_tensors="np", | |
) | |
text_input_ids = text_inputs.input_ids | |
text_embeddings = self.text_encoder( | |
text_input_ids)[self._text_encoder_output] | |
# duplicate text embeddings for each generation per prompt | |
if num_images_per_prompt != 1: | |
bs_embed, seq_len, _ = text_embeddings.shape | |
text_embeddings = np.tile( | |
text_embeddings, (1, num_images_per_prompt, 1)) | |
text_embeddings = np.reshape( | |
text_embeddings, (bs_embed * num_images_per_prompt, seq_len, -1)) | |
# get unconditional embeddings for classifier free guidance | |
if do_classifier_free_guidance: | |
uncond_tokens: List[str] | |
max_length = text_input_ids.shape[-1] | |
if negative_prompt is None: | |
uncond_tokens = [""] * batch_size | |
elif isinstance(negative_prompt, str): | |
uncond_tokens = [negative_prompt] | |
else: | |
uncond_tokens = negative_prompt | |
uncond_input = self.tokenizer( | |
uncond_tokens, | |
padding="max_length", | |
max_length=max_length, | |
truncation=True, | |
return_tensors="np", | |
) | |
uncond_embeddings = self.text_encoder(uncond_input.input_ids)[self._text_encoder_output] | |
# duplicate unconditional embeddings for each generation per prompt, using mps friendly method | |
seq_len = uncond_embeddings.shape[1] | |
uncond_embeddings = np.tile(uncond_embeddings, (1, num_images_per_prompt, 1)) | |
uncond_embeddings = np.reshape(uncond_embeddings, (batch_size * num_images_per_prompt, seq_len, -1)) | |
# For classifier free guidance, we need to do two forward passes. | |
# Here we concatenate the unconditional and text embeddings into a single batch | |
# to avoid doing two forward passes | |
text_embeddings = np.concatenate([uncond_embeddings, text_embeddings]) | |
return text_embeddings | |
def prepare_latents(self, image:PIL.Image.Image = None, latent_timestep:torch.Tensor = None): | |
""" | |
Function for getting initial latents for starting generation | |
Parameters: | |
image (PIL.Image.Image, *optional*, None): | |
Input image for generation, if not provided randon noise will be used as starting point | |
latent_timestep (torch.Tensor, *optional*, None): | |
Predicted by scheduler initial step for image generation, required for latent image mixing with nosie | |
Returns: | |
latents (np.ndarray): | |
Image encoded in latent space | |
""" | |
latents_shape = (1, 4, self.height // 8, self.width // 8) | |
noise = np.random.randn(*latents_shape).astype(np.float32) | |
if image is None: | |
# if you use LMSDiscreteScheduler, let's make sure latents are multiplied by sigmas | |
if isinstance(self.scheduler, LMSDiscreteScheduler): | |
noise = noise * self.scheduler.sigmas[0].numpy() | |
return noise, {} | |
input_image, meta = preprocess(image) | |
latents = self.vae_encoder(input_image)[self._vae_e_output] * 0.18215 | |
latents = self.scheduler.add_noise(torch.from_numpy(latents), torch.from_numpy(noise), latent_timestep).numpy() | |
return latents, meta | |
def postprocess_image(self, image:np.ndarray, meta:Dict, output_type:str = "pil"): | |
""" | |
Postprocessing for decoded image. Takes generated image decoded by VAE decoder, unpad it to initila image size (if required), | |
normalize and convert to [0, 255] pixels range. Optionally, convertes it from np.ndarray to PIL.Image format | |
Parameters: | |
image (np.ndarray): | |
Generated image | |
meta (Dict): | |
Metadata obtained on latents preparing step, can be empty | |
output_type (str, *optional*, pil): | |
Output format for result, can be pil or numpy | |
Returns: | |
image (List of np.ndarray or PIL.Image.Image): | |
Postprocessed images | |
""" | |
if "padding" in meta: | |
pad = meta["padding"] | |
(_, end_h), (_, end_w) = pad[1:3] | |
h, w = image.shape[2:] | |
unpad_h = h - end_h | |
unpad_w = w - end_w | |
image = image[:, :, :unpad_h, :unpad_w] | |
image = np.clip(image / 2 + 0.5, 0, 1) | |
image = np.transpose(image, (0, 2, 3, 1)) | |
# 9. Convert to PIL | |
if output_type == "pil": | |
image = self.numpy_to_pil(image) | |
if "src_height" in meta: | |
orig_height, orig_width = meta["src_height"], meta["src_width"] | |
image = [img.resize((orig_width, orig_height), | |
PIL.Image.Resampling.LANCZOS) for img in image] | |
else: | |
if "src_height" in meta: | |
orig_height, orig_width = meta["src_height"], meta["src_width"] | |
image = [cv2.resize(img, (orig_width, orig_width)) | |
for img in image] | |
return image | |
def get_timesteps(self, num_inference_steps:int, strength:float): | |
""" | |
Helper function for getting scheduler timesteps for generation | |
In case of image-to-image generation, it updates number of steps according to strength | |
Parameters: | |
num_inference_steps (int): | |
number of inference steps for generation | |
strength (float): | |
value between 0.0 and 1.0, that controls the amount of noise that is added to the input image. | |
Values that approach 1.0 enable lots of variations but will also produce images that are not semantically consistent with the input. | |
""" | |
# get the original timestep using init_timestep | |
init_timestep = min(int(num_inference_steps * strength), num_inference_steps) | |
t_start = max(num_inference_steps - init_timestep, 0) | |
timesteps = self.scheduler.timesteps[t_start:] | |
return timesteps, num_inference_steps - t_start | |
core=pv.Core() | |
import ipywidgets as widgets | |
device=widgets.Dropdown( | |
options=core.available_devices+["AUTO"], | |
value="CPU", | |
desciption="Device:", | |
disabled=False, | |
) | |
device | |
text_enc=core.compile_model(text_encoder_path,device.value) | |
unet_model=core.compile_model(unet_path,device.value) | |
pv_config={"INFERENCE_PRECISION_HINT":"f32"}if device.value !="CPU" else {} | |
vae_decoder=core.compile_model(VAE_DECODER_PATH,device.value,pv_config) | |
vae_encoder=core.compile_model(VAE_ENCODER_PATH,device.value,pv_config) | |
from transformers import CLIPTokenizer | |
from diffusers.schedulers import LMSDiscreteScheduler | |
lms=LMSDiscreteScheduler( | |
beta_start=0.00085, | |
beta_end=0.012, | |
beta_schedule="scaled_linear") | |
tokenizer=CLIPTokenizer.from_pretrained("openai/clip-vit-large-patch14") | |
pv_pipe=OVStableDiffusionPipeline( | |
tokenizer=tokenizer, | |
text_encoder=text_enc, | |
unet=unet_model, | |
vae_encoder=vae_encoder, | |
vae_decoder=vae_decoder, | |
scheduler=lms) | |
from ipywidgets import widgets | |
sample_text=("A Dog wearing golden rich mens necklace") | |
text_prompt=widgets.Text(value=sample_text,description="A Dog wearing golden rich mens necklace ") | |
num_steps=widgets.IntSlider(min=1,max=50,value=20,description="steps:") | |
seed=widgets.IntSlider(min=0,max=10000000,description="seed:",value=54) | |
widgets.VBox([text_prompt,seed,num_steps]) | |
result=pv_pipe(text_prompt.value,num_inference_steps=num_steps.value,seed=seed.value) | |
def generate_text(text,seed,num_steps,strength,_=gr.Progress(track_tqdm=True)): | |
result=pv_pipe(text,num_inference_steps=num_steps,seed=seed) | |
return result["sample"][0] | |
def generate_image(img,text,seed,num_steps,strength,_=gr.Progress(track_tqdm=True)): | |
result=pv_pipe(text,img,num_inference_steps=num_steps,seed=seed,strength=strength) | |
return result["sample"][0] | |
with gr.Blocks() as demo: | |
with gr.Tab("Zero-shot Text-to-Image Generation"): | |
with gr.Row(): | |
with gr.Column(): | |
text_input=gr.Textbox(lines=3,label="Text") | |
seed_input=gr.Slider(0,10000000,value=42,label="seed") | |
steps_input=gr.Slider(1,50,value=20,step=1,label="steps") | |
out=gr.Image(label="Result",type="pil") | |
btn=gr.Button() | |
btn.click(generate_text,[text_input,seed_input,steps_input],out) | |
gr.Examples([[sample_text,42,20]],[text_input,seed_input,steps_input]) | |
try: | |
demo.queue().launch(debug=True) | |
except Exception: | |
demo.queue().launch(share=True,debug=True) | |