Lumina-Next-T2I / transport /integrators.py
PommesPeter's picture
Update transport/integrators.py
8236505 verified
raw
history blame
No virus
3.91 kB
import numpy as np
import torch as th
import torch.nn as nn
from torchdiffeq import odeint
from functools import partial
from tqdm import tqdm
class sde:
"""SDE solver class"""
def __init__(
self,
drift,
diffusion,
*,
t0,
t1,
num_steps,
sampler_type,
):
assert t0 < t1, "SDE sampler has to be in forward time"
self.num_timesteps = num_steps
self.t = th.linspace(t0, t1, num_steps)
self.dt = self.t[1] - self.t[0]
self.drift = drift
self.diffusion = diffusion
self.sampler_type = sampler_type
def __Euler_Maruyama_step(self, x, mean_x, t, model, **model_kwargs):
w_cur = th.randn(x.size()).to(x)
t = th.ones(x.size(0)).to(x) * t
dw = w_cur * th.sqrt(self.dt)
drift = self.drift(x, t, model, **model_kwargs)
diffusion = self.diffusion(x, t)
mean_x = x + drift * self.dt
x = mean_x + th.sqrt(2 * diffusion) * dw
return x, mean_x
def __Heun_step(self, x, _, t, model, **model_kwargs):
w_cur = th.randn(x.size()).to(x)
dw = w_cur * th.sqrt(self.dt)
t_cur = th.ones(x.size(0)).to(x) * t
diffusion = self.diffusion(x, t_cur)
xhat = x + th.sqrt(2 * diffusion) * dw
K1 = self.drift(xhat, t_cur, model, **model_kwargs)
xp = xhat + self.dt * K1
K2 = self.drift(xp, t_cur + self.dt, model, **model_kwargs)
return (
xhat + 0.5 * self.dt * (K1 + K2),
xhat,
) # at last time point we do not perform the heun step
def __forward_fn(self):
"""TODO: generalize here by adding all private functions ending with steps to it"""
sampler_dict = {
"Euler": self.__Euler_Maruyama_step,
"Heun": self.__Heun_step,
}
try:
sampler = sampler_dict[self.sampler_type]
except:
raise NotImplementedError("Smapler type not implemented.")
return sampler
def sample(self, init, model, **model_kwargs):
"""forward loop of sde"""
x = init
mean_x = init
samples = []
sampler = self.__forward_fn()
for ti in self.t[:-1]:
with th.no_grad():
x, mean_x = sampler(x, mean_x, ti, model, **model_kwargs)
samples.append(x)
return samples
class ode:
"""ODE solver class"""
def __init__(
self,
drift,
*,
t0,
t1,
sampler_type,
num_steps,
atol,
rtol,
time_shifting_factor=None,
):
assert t0 < t1, "ODE sampler has to be in forward time"
self.drift = drift
self.t = th.linspace(t0, t1, num_steps)
if time_shifting_factor == 0:
t_1 = 1 / (1 + th.exp(-6 * (self.t - 0.6)))
t_2 = 1 - 1 / (1 + th.exp(20 * (self.t - 0.6)))
self.t = th.where(self.t < 0.6, t_1, t_2)
else:
self.t = self.t / (self.t + time_shifting_factor - time_shifting_factor * self.t)
self.atol = atol
self.rtol = rtol
self.sampler_type = sampler_type
def sample(self, x, model, **model_kwargs):
device = x[0].device if isinstance(x, tuple) else x.device
def _fn(t, x):
t = (
th.ones(x[0].size(0)).to(device) * t
if isinstance(x, tuple)
else th.ones(x.size(0)).to(device) * t
)
model_output = self.drift(x, t, model, **model_kwargs)
return model_output
t = self.t.to(device)
atol = [self.atol] * len(x) if isinstance(x, tuple) else [self.atol]
rtol = [self.rtol] * len(x) if isinstance(x, tuple) else [self.rtol]
samples = odeint(_fn, x, t, method=self.sampler_type, atol=atol, rtol=rtol)
return samples