Spaces:
Running
on
A10G
Running
on
A10G
# Copyright 2023 Peter Willemsen <peter@codebuffet.co>. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import math | |
from typing import Callable, List, Optional, Union | |
import numpy as np | |
import PIL | |
import torch | |
from PIL import Image | |
from transformers import CLIPTextModel, CLIPTokenizer | |
from diffusers.models import AutoencoderKL, UNet2DConditionModel | |
from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_upscale import StableDiffusionUpscalePipeline | |
from diffusers.schedulers import DDIMScheduler, DDPMScheduler, LMSDiscreteScheduler, PNDMScheduler | |
def make_transparency_mask(size, overlap_pixels, remove_borders=[]): | |
size_x = size[0] - overlap_pixels * 2 | |
size_y = size[1] - overlap_pixels * 2 | |
for letter in ["l", "r"]: | |
if letter in remove_borders: | |
size_x += overlap_pixels | |
for letter in ["t", "b"]: | |
if letter in remove_borders: | |
size_y += overlap_pixels | |
mask = np.ones((size_y, size_x), dtype=np.uint8) * 255 | |
mask = np.pad(mask, mode="linear_ramp", pad_width=overlap_pixels, end_values=0) | |
if "l" in remove_borders: | |
mask = mask[:, overlap_pixels : mask.shape[1]] | |
if "r" in remove_borders: | |
mask = mask[:, 0 : mask.shape[1] - overlap_pixels] | |
if "t" in remove_borders: | |
mask = mask[overlap_pixels : mask.shape[0], :] | |
if "b" in remove_borders: | |
mask = mask[0 : mask.shape[0] - overlap_pixels, :] | |
return mask | |
def clamp(n, smallest, largest): | |
return max(smallest, min(n, largest)) | |
def clamp_rect(rect: [int], min: [int], max: [int]): | |
return ( | |
clamp(rect[0], min[0], max[0]), | |
clamp(rect[1], min[1], max[1]), | |
clamp(rect[2], min[0], max[0]), | |
clamp(rect[3], min[1], max[1]), | |
) | |
def add_overlap_rect(rect: [int], overlap: int, image_size: [int]): | |
rect = list(rect) | |
rect[0] -= overlap | |
rect[1] -= overlap | |
rect[2] += overlap | |
rect[3] += overlap | |
rect = clamp_rect(rect, [0, 0], [image_size[0], image_size[1]]) | |
return rect | |
def squeeze_tile(tile, original_image, original_slice, slice_x): | |
result = Image.new("RGB", (tile.size[0] + original_slice, tile.size[1])) | |
result.paste( | |
original_image.resize((tile.size[0], tile.size[1]), Image.BICUBIC).crop( | |
(slice_x, 0, slice_x + original_slice, tile.size[1]) | |
), | |
(0, 0), | |
) | |
result.paste(tile, (original_slice, 0)) | |
return result | |
def unsqueeze_tile(tile, original_image_slice): | |
crop_rect = (original_image_slice * 4, 0, tile.size[0], tile.size[1]) | |
tile = tile.crop(crop_rect) | |
return tile | |
def next_divisible(n, d): | |
divisor = n % d | |
return n - divisor | |
class StableDiffusionTiledUpscalePipeline(StableDiffusionUpscalePipeline): | |
r""" | |
Pipeline for tile-based text-guided image super-resolution using Stable Diffusion 2, trading memory for compute | |
to create gigantic images. | |
This model inherits from [`StableDiffusionUpscalePipeline`]. Check the superclass documentation for the generic methods the | |
library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.) | |
Args: | |
vae ([`AutoencoderKL`]): | |
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations. | |
text_encoder ([`CLIPTextModel`]): | |
Frozen text-encoder. Stable Diffusion uses the text portion of | |
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically | |
the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant. | |
tokenizer (`CLIPTokenizer`): | |
Tokenizer of class | |
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer). | |
unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents. | |
low_res_scheduler ([`SchedulerMixin`]): | |
A scheduler used to add initial noise to the low res conditioning image. It must be an instance of | |
[`DDPMScheduler`]. | |
scheduler ([`SchedulerMixin`]): | |
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of | |
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`]. | |
""" | |
def __init__( | |
self, | |
vae: AutoencoderKL, | |
text_encoder: CLIPTextModel, | |
tokenizer: CLIPTokenizer, | |
unet: UNet2DConditionModel, | |
low_res_scheduler: DDPMScheduler, | |
scheduler: Union[DDIMScheduler, PNDMScheduler, LMSDiscreteScheduler], | |
max_noise_level: int = 350, | |
): | |
super().__init__( | |
vae=vae, | |
text_encoder=text_encoder, | |
tokenizer=tokenizer, | |
unet=unet, | |
low_res_scheduler=low_res_scheduler, | |
scheduler=scheduler, | |
max_noise_level=max_noise_level, | |
) | |
def _process_tile(self, original_image_slice, x, y, tile_size, tile_border, image, final_image, **kwargs): | |
torch.manual_seed(0) | |
crop_rect = ( | |
min(image.size[0] - (tile_size + original_image_slice), x * tile_size), | |
min(image.size[1] - (tile_size + original_image_slice), y * tile_size), | |
min(image.size[0], (x + 1) * tile_size), | |
min(image.size[1], (y + 1) * tile_size), | |
) | |
crop_rect_with_overlap = add_overlap_rect(crop_rect, tile_border, image.size) | |
tile = image.crop(crop_rect_with_overlap) | |
translated_slice_x = ((crop_rect[0] + ((crop_rect[2] - crop_rect[0]) / 2)) / image.size[0]) * tile.size[0] | |
translated_slice_x = translated_slice_x - (original_image_slice / 2) | |
translated_slice_x = max(0, translated_slice_x) | |
to_input = squeeze_tile(tile, image, original_image_slice, translated_slice_x) | |
orig_input_size = to_input.size | |
to_input = to_input.resize((tile_size, tile_size), Image.BICUBIC) | |
upscaled_tile = super(StableDiffusionTiledUpscalePipeline, self).__call__(image=to_input, **kwargs).images[0] | |
upscaled_tile = upscaled_tile.resize((orig_input_size[0] * 4, orig_input_size[1] * 4), Image.BICUBIC) | |
upscaled_tile = unsqueeze_tile(upscaled_tile, original_image_slice) | |
upscaled_tile = upscaled_tile.resize((tile.size[0] * 4, tile.size[1] * 4), Image.BICUBIC) | |
remove_borders = [] | |
if x == 0: | |
remove_borders.append("l") | |
elif crop_rect[2] == image.size[0]: | |
remove_borders.append("r") | |
if y == 0: | |
remove_borders.append("t") | |
elif crop_rect[3] == image.size[1]: | |
remove_borders.append("b") | |
transparency_mask = Image.fromarray( | |
make_transparency_mask( | |
(upscaled_tile.size[0], upscaled_tile.size[1]), tile_border * 4, remove_borders=remove_borders | |
), | |
mode="L", | |
) | |
final_image.paste( | |
upscaled_tile, (crop_rect_with_overlap[0] * 4, crop_rect_with_overlap[1] * 4), transparency_mask | |
) | |
def __call__( | |
self, | |
prompt: Union[str, List[str]], | |
image: Union[PIL.Image.Image, List[PIL.Image.Image]], | |
num_inference_steps: int = 75, | |
guidance_scale: float = 9.0, | |
noise_level: int = 50, | |
negative_prompt: Optional[Union[str, List[str]]] = None, | |
num_images_per_prompt: Optional[int] = 1, | |
eta: float = 0.0, | |
generator: Optional[torch.Generator] = None, | |
latents: Optional[torch.FloatTensor] = None, | |
callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None, | |
callback_steps: int = 1, | |
tile_size: int = 128, | |
tile_border: int = 32, | |
original_image_slice: int = 32, | |
): | |
r""" | |
Function invoked when calling the pipeline for generation. | |
Args: | |
prompt (`str` or `List[str]`): | |
The prompt or prompts to guide the image generation. | |
image (`PIL.Image.Image` or List[`PIL.Image.Image`] or `torch.FloatTensor`): | |
`Image`, or tensor representing an image batch which will be upscaled. * | |
num_inference_steps (`int`, *optional*, defaults to 50): | |
The number of denoising steps. More denoising steps usually lead to a higher quality image at the | |
expense of slower inference. | |
guidance_scale (`float`, *optional*, defaults to 7.5): | |
Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598). | |
`guidance_scale` is defined as `w` of equation 2. of [Imagen | |
Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale > | |
1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`, | |
usually at the expense of lower image quality. | |
negative_prompt (`str` or `List[str]`, *optional*): | |
The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e., ignored | |
if `guidance_scale` is less than `1`). | |
num_images_per_prompt (`int`, *optional*, defaults to 1): | |
The number of images to generate per prompt. | |
eta (`float`, *optional*, defaults to 0.0): | |
Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to | |
[`schedulers.DDIMScheduler`], will be ignored for others. | |
generator (`torch.Generator`, *optional*): | |
A [torch generator](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make generation | |
deterministic. | |
latents (`torch.FloatTensor`, *optional*): | |
Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image | |
generation. Can be used to tweak the same generation with different prompts. If not provided, a latents | |
tensor will ge generated by sampling using the supplied random `generator`. | |
tile_size (`int`, *optional*): | |
The size of the tiles. Too big can result in an OOM-error. | |
tile_border (`int`, *optional*): | |
The number of pixels around a tile to consider (bigger means less seams, too big can lead to an OOM-error). | |
original_image_slice (`int`, *optional*): | |
The amount of pixels of the original image to calculate with the current tile (bigger means more depth | |
is preserved, less blur occurs in the final image, too big can lead to an OOM-error or loss in detail). | |
callback (`Callable`, *optional*): | |
A function that take a callback function with a single argument, a dict, | |
that contains the (partially) processed image under "image", | |
as well as the progress (0 to 1, where 1 is completed) under "progress". | |
Returns: A PIL.Image that is 4 times larger than the original input image. | |
""" | |
final_image = Image.new("RGB", (image.size[0] * 4, image.size[1] * 4)) | |
tcx = math.ceil(image.size[0] / tile_size) | |
tcy = math.ceil(image.size[1] / tile_size) | |
total_tile_count = tcx * tcy | |
current_count = 0 | |
for y in range(tcy): | |
for x in range(tcx): | |
self._process_tile( | |
original_image_slice, | |
x, | |
y, | |
tile_size, | |
tile_border, | |
image, | |
final_image, | |
prompt=prompt, | |
num_inference_steps=num_inference_steps, | |
guidance_scale=guidance_scale, | |
noise_level=noise_level, | |
negative_prompt=negative_prompt, | |
num_images_per_prompt=num_images_per_prompt, | |
eta=eta, | |
generator=generator, | |
latents=latents, | |
) | |
current_count += 1 | |
if callback is not None: | |
callback({"progress": current_count / total_tile_count, "image": final_image}) | |
return final_image | |
def main(): | |
# Run a demo | |
model_id = "stabilityai/stable-diffusion-x4-upscaler" | |
pipe = StableDiffusionTiledUpscalePipeline.from_pretrained(model_id, revision="fp16", torch_dtype=torch.float16) | |
pipe = pipe.to("cuda") | |
image = Image.open("../../docs/source/imgs/diffusers_library.jpg") | |
def callback(obj): | |
print(f"progress: {obj['progress']:.4f}") | |
obj["image"].save("diffusers_library_progress.jpg") | |
final_image = pipe(image=image, prompt="Black font, white background, vector", noise_level=40, callback=callback) | |
final_image.save("diffusers_library.jpg") | |
if __name__ == "__main__": | |
main() | |