# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import PIL.Image
import torch
import torch.nn.functional as F
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection

from diffusers.image_processor import PipelineImageInput, VaeImageProcessor
from diffusers.loaders import FromSingleFileMixin, IPAdapterMixin, LoraLoaderMixin, TextualInversionLoaderMixin
from diffusers.models import AutoencoderKL, ControlNetModel, ImageProjection, UNet2DConditionModel
from controlnet_sync import ControlNetModelSync
from diffusers.models.lora import adjust_lora_scale_text_encoder
from diffusers.schedulers import KarrasDiffusionSchedulers
from diffusers.utils import (
    USE_PEFT_BACKEND,
    deprecate,
    logging,
    replace_example_docstring,
    scale_lora_layers,
    unscale_lora_layers,
)
from diffusers.utils.torch_utils import is_compiled_module, is_torch_version, randn_tensor

# from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from pipeline_utils_sync import DiffusionPipeline
from diffusers.pipelines.stable_diffusion.pipeline_output import StableDiffusionPipelineOutput
from diffusers.pipelines.stable_diffusion.safety_checker import StableDiffusionSafetyChecker
from diffusers.pipelines.controlnet.multicontrolnet import MultiControlNetModel

from SyncDreamer.ldm.models.diffusion.sync_dreamer import SyncMultiviewDiffusion, SyncDDIMSampler
from SyncDreamer.ldm.util import prepare_inputs
from tqdm import tqdm


logger = logging.get_logger(__name__)  # pylint: disable=invalid-name
EXAMPLE_DOC_STRING = """
    Examples:
        ```py
        >>> # !pip install opencv-python transformers accelerate
        >>> from diffusers import StableDiffusionControlNetPipeline, ControlNetModel, UniPCMultistepScheduler
        >>> from diffusers.utils import load_image
        >>> import numpy as np
        >>> import torch
        >>> import cv2
        >>> from PIL import Image

        >>> # download an image
        >>> image = load_image(
        ...     "https://hf.co/datasets/huggingface/documentation-images/resolve/main/diffusers/input_image_vermeer.png"
        ... )
        >>> image = np.array(image)

        >>> # get canny image
        >>> image = cv2.Canny(image, 100, 200)
        >>> image = image[:, :, None]
        >>> image = np.concatenate([image, image, image], axis=2)
        >>> canny_image = Image.fromarray(image)

        >>> # load control net and stable diffusion v1-5
        >>> controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16)
        >>> pipe = StableDiffusionControlNetPipeline.from_pretrained(
        ...     "runwayml/stable-diffusion-v1-5", controlnet=controlnet, torch_dtype=torch.float16
        ... )

        >>> # speed up diffusion process with faster scheduler and memory optimization
        >>> pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config)
        >>> # remove following line if xformers is not installed
        >>> pipe.enable_xformers_memory_efficient_attention()
        >>> pipe.enable_model_cpu_offload()

        >>> # generate image
        >>> generator = torch.manual_seed(0)
        >>> image = pipe(
        ...     "futuristic-looking woman", num_inference_steps=20, generator=generator, image=canny_image
        ... ).images[0]
        ```
"""
class StableDiffusionControlNetPipeline(
    DiffusionPipeline, TextualInversionLoaderMixin, LoraLoaderMixin, IPAdapterMixin, FromSingleFileMixin
):
    r"""
    Pipeline for generating multiview images from a single input image using SyncDreamer with ControlNet guidance.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
    implemented for all pipelines (downloading, saving, running on a particular device, etc.).

    The pipeline also inherits the following loading methods:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
        - [`~loaders.LoraLoaderMixin.load_lora_weights`] for loading LoRA weights
        - [`~loaders.LoraLoaderMixin.save_lora_weights`] for saving LoRA weights
        - [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
        - [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters
    Args:
        controlnet ([`ControlNetModel`], `List[ControlNetModel]`, or [`MultiControlNetModel`]):
            Provides additional conditioning to the denoising model. If multiple ControlNets are set as a list, the
            outputs from each ControlNet are added together to create one combined additional conditioning.
        dreamer ([`SyncMultiviewDiffusion`]):
            A pretrained SyncDreamer multiview diffusion model that denoises the target-view latents.
        requires_safety_checker (`bool`, *optional*, defaults to `True`):
            Whether the pipeline is expected to require a safety checker component.
    """
    model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
    _optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
    _exclude_from_cpu_offload = ["safety_checker"]
    _callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]

    def __init__(
        self,
        controlnet: Union[ControlNetModel, List[ControlNetModel], Tuple[ControlNetModel], MultiControlNetModel],
        dreamer: SyncMultiviewDiffusion,
        requires_safety_checker: bool = True,
    ):
        super().__init__()
        self.register_modules(
            controlnet=controlnet,
            dreamer=dreamer,
        )
    def __call__(
        self,
        conditioning_image=None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        num_inference_steps: int = 50,
        timesteps: List[int] = None,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.FloatTensor] = None,
        prompt_embeds: Optional[torch.FloatTensor] = None,
        negative_prompt_embeds: Optional[torch.FloatTensor] = None,
        ip_adapter_image: Optional[PipelineImageInput] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        controlnet_conditioning_scale: Union[float, List[float]] = 1.0,
        guess_mode: bool = False,
        control_guidance_start: Union[float, List[float]] = 0.0,
        control_guidance_end: Union[float, List[float]] = 1.0,
        clip_skip: Optional[int] = None,
        callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        **kwargs,
    ):
r""" | |
The call function to the pipeline for generation. | |
Args: | |
prompt (`str` or `List[str]`, *optional*): | |
The prompt or prompts to guide image generation. If not defined, you need to pass `prompt_embeds`. | |
image (`torch.FloatTensor`, `PIL.Image.Image`, `np.ndarray`, `List[torch.FloatTensor]`, `List[PIL.Image.Image]`, `List[np.ndarray]`,: | |
`List[List[torch.FloatTensor]]`, `List[List[np.ndarray]]` or `List[List[PIL.Image.Image]]`): | |
The ControlNet input condition to provide guidance to the `unet` for generation. If the type is | |
specified as `torch.FloatTensor`, it is passed to ControlNet as is. `PIL.Image.Image` can also be | |
accepted as an image. The dimensions of the output image defaults to `image`'s dimensions. If height | |
and/or width are passed, `image` is resized accordingly. If multiple ControlNets are specified in | |
`init`, images must be passed as a list such that each element of the list can be correctly batched for | |
input to a single ControlNet. | |
            height (`int`, *optional*, defaults to `self.unet.config.sample_size * self.vae_scale_factor`):
                The height in pixels of the generated image.
            width (`int`, *optional*, defaults to `self.unet.config.sample_size * self.vae_scale_factor`):
                The width in pixels of the generated image.
            num_inference_steps (`int`, *optional*, defaults to 50):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            timesteps (`List[int]`, *optional*):
                Custom timesteps to use for the denoising process with schedulers which support a `timesteps` argument
                in their `set_timesteps` method. If not defined, the default behavior when `num_inference_steps` is
                passed will be used. Must be in descending order.
            guidance_scale (`float`, *optional*, defaults to 7.5):
                A higher guidance scale value encourages the model to generate images closely linked to the text
                `prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`.
            negative_prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts to guide what to not include in image generation. If not defined, you need to
                pass `negative_prompt_embeds` instead. Ignored when not using guidance (`guidance_scale < 1`).
            num_images_per_prompt (`int`, *optional*, defaults to 1):
                The number of images to generate per prompt.
            eta (`float`, *optional*, defaults to 0.0):
                Corresponds to parameter eta (η) from the [DDIM](https://arxiv.org/abs/2010.02502) paper. Only applies
                to the [`~schedulers.DDIMScheduler`], and is ignored in other schedulers.
            generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
                A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make
                generation deterministic.
            latents (`torch.FloatTensor`, *optional*):
                Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for image
                generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
                tensor is generated by sampling using the supplied random `generator`.
            prompt_embeds (`torch.FloatTensor`, *optional*):
                Pre-generated text embeddings. Can be used to easily tweak text inputs (prompt weighting). If not
                provided, text embeddings are generated from the `prompt` input argument.
            negative_prompt_embeds (`torch.FloatTensor`, *optional*):
                Pre-generated negative text embeddings. Can be used to easily tweak text inputs (prompt weighting). If
                not provided, `negative_prompt_embeds` are generated from the `negative_prompt` input argument.
            ip_adapter_image: (`PipelineImageInput`, *optional*): Optional image input to work with IP Adapters.
            output_type (`str`, *optional*, defaults to `"pil"`):
                The output format of the generated image. Choose between `PIL.Image` or `np.array`.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
                plain tuple.
            callback (`Callable`, *optional*):
                A function that calls every `callback_steps` steps during inference. The function is called with the
                following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.
            callback_steps (`int`, *optional*, defaults to 1):
                The frequency at which the `callback` function is called. If not specified, the callback is called at
                every step.
            cross_attention_kwargs (`dict`, *optional*):
                A kwargs dictionary that if specified is passed along to the [`AttentionProcessor`] as defined in
                [`self.processor`](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py).
            controlnet_conditioning_scale (`float` or `List[float]`, *optional*, defaults to 1.0):
                The outputs of the ControlNet are multiplied by `controlnet_conditioning_scale` before they are added
                to the residual in the original `unet`. If multiple ControlNets are specified in `init`, you can set
                the corresponding scale as a list.
            guess_mode (`bool`, *optional*, defaults to `False`):
                The ControlNet encoder tries to recognize the content of the input image even if you remove all
                prompts. A `guidance_scale` value between 3.0 and 5.0 is recommended.
            control_guidance_start (`float` or `List[float]`, *optional*, defaults to 0.0):
                The percentage of total steps at which the ControlNet starts applying.
            control_guidance_end (`float` or `List[float]`, *optional*, defaults to 1.0):
                The percentage of total steps at which the ControlNet stops applying.
            clip_skip (`int`, *optional*):
                Number of layers to be skipped from CLIP while computing the prompt embeddings. A value of 1 means that
                the output of the pre-final layer will be used for computing the prompt embeddings.
            callback_on_step_end (`Callable`, *optional*):
                A function that is called at the end of each denoising step during inference. The function is called
                with the following arguments: `callback_on_step_end(self: DiffusionPipeline, step: int, timestep: int,
                callback_kwargs: Dict)`. `callback_kwargs` will include a list of all tensors as specified by
                `callback_on_step_end_tensor_inputs`.
            callback_on_step_end_tensor_inputs (`List`, *optional*):
                The list of tensor inputs for the `callback_on_step_end` function. The tensors specified in the list
                will be passed as `callback_kwargs` argument. You will only be able to include variables listed in the
                `._callback_tensor_inputs` attribute of your pipeline class.

        Examples:

        Returns:
            `np.ndarray`:
                A `uint8` NumPy array of shape `(N, H, W, 3)` containing the `N` generated views, with pixel values
                in `[0, 255]`.
        """
        callback = kwargs.pop("callback", None)
        callback_steps = kwargs.pop("callback_steps", None)

        if callback is not None:
            deprecate(
                "callback",
                "1.0.0",
                "Passing `callback` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )
        if callback_steps is not None:
            deprecate(
                "callback_steps",
                "1.0.0",
                "Passing `callback_steps` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )

        controlnet = self.controlnet._orig_mod if is_compiled_module(self.controlnet) else self.controlnet

        # align format for control guidance
        if not isinstance(control_guidance_start, list) and isinstance(control_guidance_end, list):
            control_guidance_start = len(control_guidance_end) * [control_guidance_start]
        elif not isinstance(control_guidance_end, list) and isinstance(control_guidance_start, list):
            control_guidance_end = len(control_guidance_start) * [control_guidance_end]
        elif not isinstance(control_guidance_start, list) and not isinstance(control_guidance_end, list):
            mult = len(controlnet.nets) if isinstance(controlnet, MultiControlNetModel) else 1
            control_guidance_start, control_guidance_end = (
                mult * [control_guidance_start],
                mult * [control_guidance_end],
            )
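
        # The block above only normalizes `control_guidance_start` / `control_guidance_end` into per-ControlNet
        # lists (e.g. a scalar 0.0 with two ControlNets becomes [0.0, 0.0]); the SyncDreamer sampling loop below
        # currently applies the ControlNet at every step and does not consume these lists.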
        def drop(cond, mask):
            shape = cond.shape
            B = shape[0]
            cond = mask.view(B, *[1 for _ in range(len(shape) - 1)]) * cond
            return cond

        def get_drop_scheme(B, device):
            drop_scheme = "default"
            if drop_scheme == "default":
                random = torch.rand(B, dtype=torch.float32, device=device)
                drop_clip = (random > 0.15) & (random <= 0.2)
                drop_volume = (random > 0.1) & (random <= 0.15)
                drop_concat = (random > 0.05) & (random <= 0.1)
                drop_all = random <= 0.05
            else:
                raise NotImplementedError
            return drop_clip, drop_volume, drop_concat, drop_all
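
        # `drop` zeroes a condition for the samples selected by `mask`, and `get_drop_scheme` draws disjoint 5%
        # probability bands that decide which condition (CLIP embedding, spatial volume, concatenated latent, or
        # all of them) is dropped. This mirrors SyncDreamer's training-time condition dropout for classifier-free
        # guidance and stays inactive here because `unet_wrapper_forward` is called with `is_train=False`.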
        def unet_wrapper_forward(x, t, clip_embed, volume_feats, x_concat, is_train=False):
            drop_conditions = False
            if drop_conditions and is_train:
                B = x.shape[0]
                drop_clip, drop_volume, drop_concat, drop_all = get_drop_scheme(B, x.device)

                clip_mask = 1.0 - (drop_clip | drop_all).float()
                clip_embed = drop(clip_embed, clip_mask)

                volume_mask = 1.0 - (drop_volume | drop_all).float()
                for k, v in volume_feats.items():
                    volume_feats[k] = drop(v, mask=volume_mask)

                concat_mask = 1.0 - (drop_concat | drop_all).float()
                x_concat = drop(x_concat, concat_mask)

            use_zero_123 = True
            if use_zero_123:
                # zero123 does not multiply this when encoding, maybe a bug for zero123
                first_stage_scale_factor = 0.18215
                x_concat_ = x_concat * 1.0
                x_concat_[:, :4] = x_concat_[:, :4] / first_stage_scale_factor
            else:
                x_concat_ = x_concat
            x = torch.cat([x, x_concat_], 1)
            return x, t, clip_embed, volume_feats
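
        # The unconditional variant below duplicates the batch for classifier-free guidance: the first half keeps
        # the real CLIP embedding, spatial-volume features and concatenated input latent, while the second half
        # replaces them with zeros, so a single forward pass yields both the conditional and unconditional noise
        # predictions.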
        def unet_wrapper_forward_unconditional(x, t, clip_embed, volume_feats, x_concat):
            """
            @param x: B,4,H,W
            @param t: B,
            @param clip_embed: B,M,768
            @param volume_feats: B,C,D,H,W
            @param x_concat: B,C,H,W
            @return:
            """
            x_ = torch.cat([x] * 2, 0)
            t_ = torch.cat([t] * 2, 0)
            clip_embed_ = torch.cat([clip_embed, torch.zeros_like(clip_embed)], 0)
            v_ = {}
            for k, v in volume_feats.items():
                v_[k] = torch.cat([v, torch.zeros_like(v)], 0)
            x_concat_ = torch.cat([x_concat, torch.zeros_like(x_concat)], 0)

            use_zero_123 = True
            if use_zero_123:
                # zero123 does not multiply this when encoding, maybe a bug for zero123
                first_stage_scale_factor = 0.18215
                x_concat_[:, :4] = x_concat_[:, :4] / first_stage_scale_factor
            x_ = torch.cat([x_, x_concat_], 1)
            return x_, t_, clip_embed_, v_
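
        # `repeat_to_batch` tiles a per-scene tensor across the VN views processed in one chunk, e.g. a timestep
        # tensor of shape (B,) becomes (B * VN,) so it lines up with view-flattened latents of shape (B * VN, C, H, W).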
        def repeat_to_batch(tensor, B, VN):
            t_shape = tensor.shape
            ones = [1 for _ in range(len(t_shape) - 1)]
            tensor_new = tensor.view(B, 1, *t_shape[1:]).repeat(1, VN, *ones).view(B * VN, *t_shape[1:])
            return tensor_new
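
        # Sampling proceeds in four stages: (1) SyncDreamer's `prepare_inputs` / `prepare` turn the conditioning
        # image into CLIP embeddings and input latents, (2) a SyncDDIMSampler provides the DDIM timestep schedule,
        # (3) each DDIM step denoises the N=16 target views in chunks of `batch_view_num`, mixing the ControlNet
        # residuals into SyncDreamer's UNet with classifier-free guidance, and (4) the final latents are decoded
        # back to images with `decode_first_stage`.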
        flags_input = conditioning_image
        # NOTE: the sampling hyper-parameters below (step count, elevation, image size, view count, CFG scale) are
        # currently fixed here rather than taken from the call arguments.
        flags_sample_steps = 50
        weight_dtype = torch.float32

        data = prepare_inputs(flags_input, 30, -1)
        for k, v in data.items():
            data[k] = v.unsqueeze(0).cuda()
            data[k] = torch.repeat_interleave(data[k], repeats=1, dim=0)
        sampler = SyncDDIMSampler(self.dreamer, flags_sample_steps)

        data["conditioning_pixel_values"] = data["input_image"]
        _, clip_embed, input_info = self.dreamer.prepare(data)
        controlnet_image = data["conditioning_pixel_values"].to(dtype=weight_dtype)
        controlnet_image = controlnet_image.permute(0, 3, 1, 2)  # B, c, h, w

        image_size = 256
        latent_size = image_size // 8
        C, H, W = 4, latent_size, latent_size
        B = clip_embed.shape[0]
        N = 16
        device = "cuda"
        x_target_noisy = torch.randn([B, N, C, H, W], device=device)

        timesteps = sampler.ddim_timesteps
        time_range = np.flip(timesteps)
        total_steps = timesteps.shape[0]
        iterator = tqdm(time_range, desc="DDIM Sampler", total=total_steps)
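
        # Each DDIM step walks the timesteps from high to low noise (`index` is the position in the DDIM schedule,
        # with `index == 0` as the final step). The spatial volume is rebuilt from the current noisy target views,
        # and the views are denoised in chunks of `batch_view_num` to bound memory use.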
        for i, step in enumerate(iterator):
            index = total_steps - i - 1  # index in ddim state
            is_step0 = index == 0
            time_steps = torch.full((B,), step, device=device, dtype=torch.long)

            x_input, elevation_input = input_info["x"], input_info["elevation"]
            B, N, C, H, W = x_target_noisy.shape

            # construct source data
            v_embed = self.dreamer.get_viewpoint_embedding(B, elevation_input)  # B,N,v_dim
            t_embed = self.dreamer.embed_time(time_steps)  # B,t_dim
            spatial_volume = self.dreamer.spatial_volume.construct_spatial_volume(
                x_target_noisy, t_embed, v_embed, self.dreamer.poses, self.dreamer.Ks
            )

            cfg_scale = 2.0
            unconditional_scale = cfg_scale
            batch_view_num = 4

            e_t = []
            target_indices = torch.arange(N)  # N
            for ni in range(0, N, batch_view_num):
                x_target_noisy_ = x_target_noisy[:, ni : ni + batch_view_num]
                VN = x_target_noisy_.shape[1]
                x_target_noisy_ = x_target_noisy_.reshape(B * VN, C, H, W)

                time_steps_ = repeat_to_batch(time_steps, B, VN)
                target_indices_ = target_indices[ni : ni + batch_view_num].unsqueeze(0).repeat(B, 1)
                clip_embed_, volume_feats_, x_concat_ = self.dreamer.get_target_view_feats(
                    x_input, spatial_volume, clip_embed, t_embed, v_embed, target_indices_
                )

                if unconditional_scale != 1.0:
                    x_, t_, clip_embed_, volume_feats_ = unet_wrapper_forward_unconditional(
                        x_target_noisy_, time_steps_, clip_embed_, volume_feats_, x_concat_
                    )
                    down_block_res_samples, mid_block_res_sample = controlnet(
                        x=x_,
                        timesteps=t_,
                        controlnet_cond=controlnet_image,
                        conditioning_scale=1.0,
                        context=clip_embed_,
                        return_dict=False,
                        source_dict=volume_feats_,
                    )
                    noise, s_uc = self.dreamer.model.diffusion_model(
                        x_, t_, clip_embed_, down_block_res_samples, mid_block_res_sample, source_dict=volume_feats_
                    ).chunk(2)
                    noise = s_uc + unconditional_scale * (noise - s_uc)
                else:
                    # Use distinct names for the wrapper outputs so the loop-level `timesteps` and `clip_embed`
                    # variables, which are reused on later iterations, are not overwritten.
                    x_noisy_, t_cond, clip_embed_cond, volume_feats_cond = unet_wrapper_forward(
                        x_target_noisy_, time_steps_, clip_embed_, volume_feats_, x_concat_, is_train=False
                    )
                    down_block_res_samples, mid_block_res_sample = controlnet(
                        x=x_noisy_,
                        timesteps=t_cond,
                        controlnet_cond=controlnet_image,
                        conditioning_scale=1.0,
                        context=clip_embed_cond,
                        return_dict=False,
                        source_dict=volume_feats_cond,
                    )
                    noise = self.dreamer.model.diffusion_model(
                        x_noisy_, t_cond, clip_embed_cond, down_block_res_samples, mid_block_res_sample,
                        source_dict=volume_feats_cond,
                    )
                e_t.append(noise.view(B, VN, 4, H, W))

            e_t = torch.cat(e_t, 1)
            x_target_noisy = sampler.denoise_apply_impl(x_target_noisy, index, e_t, is_step0)
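
        # Decode each of the N view latents back to image space and map the [-1, 1] outputs to uint8 RGB in
        # [0, 255]; the batch dimension is dropped so the caller receives an (N, H, W, 3) array.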
        N = x_target_noisy.shape[1]
        x_sample = torch.stack([self.dreamer.decode_first_stage(x_target_noisy[:, ni]) for ni in range(N)], 1)

        B, N, _, H, W = x_sample.shape
        x_sample = (torch.clamp(x_sample, max=1.0, min=-1.0) + 1) * 0.5
        x_sample = x_sample.permute(0, 1, 3, 4, 2).cpu().numpy() * 255
        x_sample = x_sample.astype(np.uint8)

        return x_sample[0, :, :, :, :]