"""
Based on code from Marcin Andrychowicz
"""
import numpy as np | |
class Normalizer(object):
    """Normalize vectors with a running estimate of per-dimension mean/std.

    Samples are fed in through :meth:`update`, which accumulates the running
    sum, sum of squares and sample count.  ``mean`` and ``std`` are refreshed
    lazily from those accumulators (see :meth:`synchronize`) the next time
    :meth:`normalize` or :meth:`denormalize` is called.

    Args:
        size: Number of dimensions of the vectors being normalized.
        eps: Lower bound applied to the estimated standard deviation.
        default_clip_range: Clip range used by :meth:`normalize` when no
            explicit ``clip_range`` is given.
        mean: Initial mean, used until the first :meth:`update`.
        std: Initial standard deviation, used until the first :meth:`update`.
    """

    def __init__(
            self,
            size,
            eps=1e-8,
            default_clip_range=np.inf,
            mean=0,
            std=1,
    ):
        self.size = size
        self.eps = eps
        self.default_clip_range = default_clip_range
        # The accumulators are float64: a float32 counter stops increasing
        # once it reaches 2**24, and float32 running sums lose precision long
        # before that.
        self.sum = np.zeros(self.size, np.float64)
        self.sumsq = np.zeros(self.size, np.float64)
        # The count starts at one, so synchronize() never divides by zero.
        # As a consequence the statistics are computed as if a single
        # all-zero sample had already been seen.
        self.count = np.ones(1, np.float64)
        self.mean = mean + np.zeros(self.size, np.float32)
        self.std = std * np.ones(self.size, np.float32)
        self.synchronized = True

    def update(self, v):
        """Add one sample, or a batch of samples, to the running statistics.

        Args:
            v: Array-like of shape ``(size,)`` or ``(batch, size)``.

        Raises:
            AssertionError: If ``v`` is not 1-D/2-D or its last dimension
                differs from ``size``.
        """
        v = np.asanyarray(v)
        if v.ndim == 1:
            v = np.expand_dims(v, 0)
        assert v.ndim == 2
        assert v.shape[1] == self.size
        # Reduce in float64 so the batch is not rounded before accumulation.
        self.sum += v.sum(axis=0, dtype=np.float64)
        self.sumsq += np.square(v).sum(axis=0, dtype=np.float64)
        self.count[0] += v.shape[0]
        # mean/std are now stale; they are recomputed on the next use.
        self.synchronized = False

    def normalize(self, v, clip_range=None):
        """Return ``(v - mean) / std`` clipped to ``[-clip_range, clip_range]``.

        Args:
            v: Array-like whose last dimension broadcasts against ``size``.
            clip_range: Symmetric clip bound; ``default_clip_range`` is used
                when this is ``None``.
        """
        v = np.asanyarray(v)
        if not self.synchronized:
            self.synchronize()
        if clip_range is None:
            clip_range = self.default_clip_range
        # mean/std have shape (size,), so broadcasting covers batched input
        # without an explicit reshape.
        return np.clip((v - self.mean) / self.std, -clip_range, clip_range)

    def denormalize(self, v):
        """Invert :meth:`normalize` (without clipping): ``mean + v * std``."""
        v = np.asanyarray(v)
        if not self.synchronized:
            self.synchronize()
        return self.mean + v * self.std

    def synchronize(self):
        """Recompute ``mean`` and ``std`` from the accumulated statistics."""
        count = self.count[0]
        mean = self.sum / count
        # Var[x] = E[x^2] - E[x]^2, computed from the float64 mean so the
        # subtraction is not performed on values already rounded to float32.
        variance = self.sumsq / count - np.square(mean)
        # In-place assignment keeps the existing mean/std arrays (and dtype).
        self.mean[...] = mean
        # Flooring the variance at eps**2 keeps std >= eps.
        self.std[...] = np.sqrt(np.maximum(np.square(self.eps), variance))
        self.synchronized = True
class IdentityNormalizer(object):
    """Normalizer stand-in that leaves every value untouched.

    It offers ``update``, ``normalize`` and ``denormalize`` methods that
    accept the arguments shown below but keep no statistics.
    """

    def __init__(self, *args, **kwargs):
        """Accept and discard any constructor arguments."""

    def update(self, v):
        """Do nothing with ``v``; there are no statistics to maintain."""
        return None

    def normalize(self, v, clip_range=None):
        """Return ``v`` as-is; ``clip_range`` is ignored."""
        return v

    def denormalize(self, v):
        """Return ``v`` as-is."""
        return v
class FixedNormalizer(object):
    """Normalize vectors with a fixed, explicitly supplied mean and std.

    The statistics only change through :meth:`set_mean`, :meth:`set_std` or
    :meth:`copy_stats`.

    Args:
        size: Number of dimensions of the vectors being normalized.
        default_clip_range: Clip range used by :meth:`normalize` when no
            explicit ``clip_range`` is given.
        mean: Scalar or per-dimension mean.
        std: Scalar or per-dimension standard deviation; every entry must be
            strictly positive.
        eps: Constant added to ``std`` whenever it is set.

    Raises:
        AssertionError: If any entry of ``std`` is not strictly positive.
    """

    def __init__(
            self,
            size,
            default_clip_range=np.inf,
            mean=0,
            std=1,
            eps=1e-8,
    ):
        # Element-wise check: a bare `std > 0` is ambiguous (ValueError) when
        # std is an array with more than one element.
        assert np.all(np.asanyarray(std) > 0)
        std = std + eps
        self.size = size
        self.default_clip_range = default_clip_range
        # Adding to a zeros vector broadcasts scalars to shape (size,).
        self.mean = mean + np.zeros(self.size, np.float32)
        self.std = std + np.zeros(self.size, np.float32)
        self.eps = eps

    def set_mean(self, mean):
        """Replace the mean with ``mean`` (scalar or per-dimension)."""
        self.mean = mean + np.zeros(self.size, np.float32)

    def set_std(self, std):
        """Replace the std with ``std + eps`` (scalar or per-dimension)."""
        std = std + self.eps
        self.std = std + np.zeros(self.size, np.float32)

    def normalize(self, v, clip_range=None):
        """Return ``(v - mean) / std`` clipped to ``[-clip_range, clip_range]``.

        Args:
            v: Array-like whose last dimension broadcasts against ``size``.
            clip_range: Symmetric clip bound; ``default_clip_range`` is used
                when this is ``None``.
        """
        v = np.asanyarray(v)
        if clip_range is None:
            clip_range = self.default_clip_range
        # mean/std have shape (size,), so broadcasting covers batched input
        # without an explicit reshape.
        return np.clip((v - self.mean) / self.std, -clip_range, clip_range)

    def denormalize(self, v):
        """Invert :meth:`normalize` (without clipping): ``mean + v * std``."""
        v = np.asanyarray(v)
        return self.mean + v * self.std

    def copy_stats(self, other):
        """Copy ``mean`` and ``std`` from ``other``.

        The copy goes through :meth:`set_std`, so ``eps`` is added to
        ``other.std``.
        """
        self.set_mean(other.mean)
        self.set_std(other.std)