|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import math |
|
from typing import List, Optional, Tuple, Union |
|
|
|
import numpy as np |
|
import paddle |
|
|
|
from ..configuration_utils import ConfigMixin, register_to_config |
|
from .scheduling_utils import SchedulerMixin, SchedulerOutput |
|
|
|
|
|
class IPNDMScheduler(SchedulerMixin, ConfigMixin): |
|
""" |
|
Improved Pseudo numerical methods for diffusion models (iPNDM) ported from @crowsonkb's amazing k-diffusion |
|
[library](https://github.com/crowsonkb/v-diffusion-pytorch/blob/987f8985e38208345c1959b0ea767a625831cc9b/diffusion/sampling.py#L296) |
|
|
|
[`~ConfigMixin`] takes care of storing all config attributes that are passed in the scheduler's `__init__` |
|
function, such as `num_train_timesteps`. They can be accessed via `scheduler.config.num_train_timesteps`. |
|
[`SchedulerMixin`] provides general loading and saving functionality via the [`SchedulerMixin.save_pretrained`] and |
|
[`~SchedulerMixin.from_pretrained`] functions. |
|
|
|
For more details, see the original paper: https://arxiv.org/abs/2202.09778 |
|
|
|
Args: |
|
num_train_timesteps (`int`): number of diffusion steps used to train the model. |
|
trained_betas (`np.ndarray`, optional): |
|
option to pass an array of betas directly to the constructor to bypass `beta_start`, `beta_end` etc. |
|
""" |
|
|
|
order = 1 |
|
|
|
@register_to_config |
|
def __init__( |
|
self, num_train_timesteps: int = 1000, trained_betas: Optional[Union[np.ndarray, List[float]]] = None |
|
): |
|
|
|
self.set_timesteps(num_train_timesteps) |
|
|
|
|
|
self.init_noise_sigma = 1.0 |
|
|
|
|
|
|
|
|
|
self.pndm_order = 4 |
|
|
|
|
|
self.ets = [] |
|
|
|
def set_timesteps(self, num_inference_steps: int): |
|
""" |
|
Sets the discrete timesteps used for the diffusion chain. Supporting function to be run before inference. |
|
|
|
Args: |
|
num_inference_steps (`int`): |
|
the number of diffusion steps used when generating samples with a pre-trained model. |
|
""" |
|
self.num_inference_steps = num_inference_steps |
|
steps = paddle.linspace(1, 0, num_inference_steps + 1)[:-1] |
|
steps = paddle.concat([steps, paddle.to_tensor([0.0])]) |
|
|
|
if self.config.trained_betas is not None: |
|
self.betas = paddle.to_tensor(self.config.trained_betas, dtype="float32") |
|
else: |
|
self.betas = paddle.sin(steps * math.pi / 2) ** 2 |
|
|
|
self.alphas = (1.0 - self.betas**2) ** 0.5 |
|
|
|
self.timesteps = (paddle.atan2(self.betas, self.alphas) / math.pi * 2)[:-1] |
|
|
|
self.ets = [] |
|
|
|
def step( |
|
self, |
|
model_output: paddle.Tensor, |
|
timestep: int, |
|
sample: paddle.Tensor, |
|
return_dict: bool = True, |
|
) -> Union[SchedulerOutput, Tuple]: |
|
""" |
|
Step function propagating the sample with the linear multi-step method. This has one forward pass with multiple |
|
times to approximate the solution. |
|
|
|
Args: |
|
model_output (`paddle.Tensor`): direct output from learned diffusion model. |
|
timestep (`int`): current discrete timestep in the diffusion chain. |
|
sample (`paddle.Tensor`): |
|
current instance of sample being created by diffusion process. |
|
return_dict (`bool`): option for returning tuple rather than SchedulerOutput class |
|
|
|
Returns: |
|
[`~scheduling_utils.SchedulerOutput`] or `tuple`: [`~scheduling_utils.SchedulerOutput`] if `return_dict` is |
|
True, otherwise a `tuple`. When returning a tuple, the first element is the sample tensor. |
|
|
|
""" |
|
if self.num_inference_steps is None: |
|
raise ValueError( |
|
"Number of inference steps is 'None', you need to run 'set_timesteps' after creating the scheduler" |
|
) |
|
|
|
timestep_index = (self.timesteps == timestep).nonzero().item() |
|
prev_timestep_index = timestep_index + 1 |
|
|
|
ets = sample * self.betas[timestep_index] + model_output * self.alphas[timestep_index] |
|
self.ets.append(ets) |
|
|
|
if len(self.ets) == 1: |
|
ets = self.ets[-1] |
|
elif len(self.ets) == 2: |
|
ets = (3 * self.ets[-1] - self.ets[-2]) / 2 |
|
elif len(self.ets) == 3: |
|
ets = (23 * self.ets[-1] - 16 * self.ets[-2] + 5 * self.ets[-3]) / 12 |
|
else: |
|
ets = (1 / 24) * (55 * self.ets[-1] - 59 * self.ets[-2] + 37 * self.ets[-3] - 9 * self.ets[-4]) |
|
|
|
prev_sample = self._get_prev_sample(sample, timestep_index, prev_timestep_index, ets) |
|
|
|
if not return_dict: |
|
return (prev_sample,) |
|
|
|
return SchedulerOutput(prev_sample=prev_sample) |
|
|
|
def scale_model_input(self, sample: paddle.Tensor, *args, **kwargs) -> paddle.Tensor: |
|
""" |
|
Ensures interchangeability with schedulers that need to scale the denoising model input depending on the |
|
current timestep. |
|
|
|
Args: |
|
sample (`paddle.Tensor`): input sample |
|
|
|
Returns: |
|
`paddle.Tensor`: scaled input sample |
|
""" |
|
return sample |
|
|
|
def _get_prev_sample(self, sample, timestep_index, prev_timestep_index, ets): |
|
alpha = self.alphas[timestep_index] |
|
sigma = self.betas[timestep_index] |
|
|
|
next_alpha = self.alphas[prev_timestep_index] |
|
next_sigma = self.betas[prev_timestep_index] |
|
|
|
pred = (sample - sigma * ets) / max(alpha, 1e-8) |
|
prev_sample = next_alpha * pred + ets * next_sigma |
|
|
|
return prev_sample |
|
|
|
def __len__(self): |
|
return self.config.num_train_timesteps |
|
|