from abc import abstractmethod
from typing import Any, Tuple

import numpy as np
import torch
import torch.nn.functional as F
from torch import nn


class DiagonalGaussianDistribution:
    """Diagonal Gaussian whose mean and log-variance are stored concatenated
    along the channel dimension of `parameters`."""

    def __init__(self, parameters, deterministic=False):
        self.parameters = parameters
        self.mean, self.logvar = torch.chunk(parameters, 2, dim=1)
        # Clamp log-variance for numerical stability.
        self.logvar = torch.clamp(self.logvar, -30.0, 20.0)
        self.deterministic = deterministic
        self.std = torch.exp(0.5 * self.logvar)
        self.var = torch.exp(self.logvar)
        if self.deterministic:
            # A deterministic "distribution" collapses onto its mean.
            self.var = self.std = torch.zeros_like(self.mean)

    def sample(self):
        # Reparameterization trick; randn_like keeps the device and dtype of the mean.
        return self.mean + self.std * torch.randn_like(self.mean)

    def kl(self, other=None):
        if self.deterministic:
            return torch.tensor([0.0], device=self.parameters.device)
        if other is None:
            # KL divergence against a standard normal prior.
            return 0.5 * torch.sum(
                torch.pow(self.mean, 2) + self.var - 1.0 - self.logvar,
                dim=[1, 2, 3],
            )
        # KL divergence against another diagonal Gaussian.
        return 0.5 * torch.sum(
            torch.pow(self.mean - other.mean, 2) / other.var
            + self.var / other.var
            - 1.0
            - self.logvar
            + other.logvar,
            dim=[1, 2, 3],
        )

    def nll(self, sample, dims=(1, 2, 3)):
        if self.deterministic:
            return torch.tensor([0.0], device=self.parameters.device)
        logtwopi = np.log(2.0 * np.pi)
        return 0.5 * torch.sum(
            logtwopi + self.logvar + torch.pow(sample - self.mean, 2) / self.var,
            dim=dims,
        )

    def mode(self):
        return self.mean


class AbstractRegularizer(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, z: torch.Tensor) -> Tuple[torch.Tensor, dict]:
        raise NotImplementedError()

    @abstractmethod
    def get_trainable_parameters(self) -> Any:
        raise NotImplementedError()


class IdentityRegularizer(AbstractRegularizer):
    def forward(self, z: torch.Tensor) -> Tuple[torch.Tensor, dict]:
        return z, dict()

    def get_trainable_parameters(self) -> Any:
        yield from ()


def measure_perplexity(
    predicted_indices: torch.Tensor, num_centroids: int
) -> Tuple[torch.Tensor, torch.Tensor]:
    # src: https://github.com/karpathy/deep-vector-quantization/blob/main/model.py
    # eval cluster perplexity. when perplexity == num_embeddings then all clusters
    # are used exactly equally
    encodings = (
        F.one_hot(predicted_indices, num_centroids).float().reshape(-1, num_centroids)
    )
    avg_probs = encodings.mean(0)
    perplexity = (-(avg_probs * torch.log(avg_probs + 1e-10)).sum()).exp()
    cluster_use = torch.sum(avg_probs > 0)
    return perplexity, cluster_use


class DiagonalGaussianRegularizer(AbstractRegularizer):
    """Interprets the latent as parameters of a diagonal Gaussian, samples (or takes
    the mode), and reports the KL divergence to a standard normal prior."""

    def __init__(self, sample: bool = True):
        super().__init__()
        self.sample = sample

    def get_trainable_parameters(self) -> Any:
        yield from ()

    def forward(self, z: torch.Tensor) -> Tuple[torch.Tensor, dict]:
        log = dict()
        posterior = DiagonalGaussianDistribution(z)
        if self.sample:
            z = posterior.sample()
        else:
            z = posterior.mode()
        kl_loss = posterior.kl()
        # Sum the per-sample KL terms and average over the batch.
        kl_loss = torch.sum(kl_loss) / kl_loss.shape[0]
        log["kl_loss"] = kl_loss
        return z, log
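

# --- Usage sketch (illustrative only, not part of the original module) ---
# A minimal sketch of how DiagonalGaussianRegularizer is typically applied to an
# encoder output whose channel dimension carries concatenated mean and log-variance.
# The tensor shapes below are assumptions chosen for illustration.
if __name__ == "__main__":
    regularizer = DiagonalGaussianRegularizer(sample=True)
    # Hypothetical encoder output: batch of 2, 8 channels (4 mean + 4 logvar), 16x16 spatial.
    enc_out = torch.randn(2, 8, 16, 16)
    z, log = regularizer(enc_out)
    # After chunking, the latent has half the input channels: torch.Size([2, 4, 16, 16])
    print(z.shape)
    # Scalar KL term, summed over channel/spatial dims and averaged over the batch.
    print(log["kl_loss"])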