import torch
import torch.nn as nn
from torch.nn import init
import functools
from torch.optim import lr_scheduler
import numpy as np
import torch.nn.functional as F
from torch.nn.modules.normalization import LayerNorm
import os
from torch.nn.utils import spectral_norm
from torchvision import models
###############################################################################
# Helper functions
###############################################################################
def init_weights(net, init_type='normal', init_gain=0.02):
    """Initialize network weights.
    Parameters:
        net (network)     -- network to be initialized
        init_type (str)   -- the name of an initialization method: normal | xavier | kaiming | orthogonal
        init_gain (float) -- scaling factor for normal, xavier and orthogonal.
    We use 'normal' in the original pix2pix and CycleGAN paper. But xavier and kaiming might
    work better for some applications. Feel free to try yourself.
    """
    def init_func(m):  # define the initialization function
        classname = m.__class__.__name__
        if hasattr(m, 'weight') and (classname.find('Conv') != -1 or classname.find('Linear') != -1):
            if init_type == 'normal':
                init.normal_(m.weight.data, 0.0, init_gain)
            elif init_type == 'xavier':
                init.xavier_normal_(m.weight.data, gain=init_gain)
            elif init_type == 'kaiming':
                # init.kaiming_normal_(m.weight.data, a=0, mode='fan_in')
                init.kaiming_normal_(m.weight.data, a=0.2, mode='fan_in', nonlinearity='leaky_relu')
            elif init_type == 'orthogonal':
                init.orthogonal_(m.weight.data, gain=init_gain)
            else:
                raise NotImplementedError('initialization method [%s] is not implemented' % init_type)
            if hasattr(m, 'bias') and m.bias is not None:
                init.constant_(m.bias.data, 0.0)
        elif classname.find('BatchNorm2d') != -1:  # BatchNorm's weight is not a matrix; only normal distribution applies.
            init.normal_(m.weight.data, 1.0, init_gain)
            init.constant_(m.bias.data, 0.0)
    print('initialize network with %s' % init_type)
    net.apply(init_func)  # apply the initialization function <init_func>
def init_net(net, init_type='normal', init_gain=0.02, gpu_ids=[], init=True):
    """Initialize a network: 1. move it to the first GPU in gpu_ids (if any); 2. optionally initialize the network weights
    Parameters:
        net (network)      -- the network to be initialized
        init_type (str)    -- the name of an initialization method: normal | xavier | kaiming | orthogonal
        init_gain (float)  -- scaling factor for normal, xavier and orthogonal.
        gpu_ids (int list) -- which GPUs the network runs on: e.g., 0,1,2
        init (bool)        -- whether to (re-)initialize the weights; set False to keep pretrained weights
    Return an initialized network.
    """
    # Note: the 'init' flag here is a bool and is unrelated to the torch.nn.init module imported above.
    if len(gpu_ids) > 0:
        assert(torch.cuda.is_available())
        net.to(gpu_ids[0])
    if init:
        init_weights(net, init_type, init_gain=init_gain)
    return net
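# Illustrative usage sketch (not part of the original training code): how init_net is
# typically applied to a freshly built module. The toy module and hyper-parameters
# below are placeholder assumptions.
def _example_init_net():
    toy = nn.Sequential(nn.Conv2d(3, 8, kernel_size=3, padding=1), nn.ReLU(True))
    # With gpu_ids=[] the module stays on CPU; conv weights are redrawn from N(0, 0.02).
    return init_net(toy, init_type='normal', init_gain=0.02, gpu_ids=[])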
def get_scheduler(optimizer, opt):
    """Return a learning rate scheduler
    Parameters:
        optimizer          -- the optimizer of the network
        opt (option class) -- stores all the experiment flags; needs to be a subclass of BaseOptions.
                              opt.lr_policy is the name of the learning rate policy: linear | step | plateau | cosine
    For 'linear', we keep the same learning rate for the first <opt.niter> epochs
    and linearly decay the rate to zero over the next <opt.niter_decay> epochs.
    For the other schedulers (step, plateau, and cosine), we use the default PyTorch schedulers.
    See https://pytorch.org/docs/stable/optim.html for more details.
    """
    if opt.lr_policy == 'linear':
        def lambda_rule(epoch):
            lr_l = 1.0 - max(0, epoch + opt.epoch_count - opt.niter) / float(opt.niter_decay + 1)
            return lr_l
        scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda_rule)
    elif opt.lr_policy == 'step':
        scheduler = lr_scheduler.StepLR(optimizer, step_size=opt.lr_decay_iters, gamma=0.1)
    elif opt.lr_policy == 'plateau':
        scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.2, threshold=0.01, patience=5)
    elif opt.lr_policy == 'cosine':
        scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=opt.niter, eta_min=0)
    else:
        raise NotImplementedError('learning rate policy [%s] is not implemented' % opt.lr_policy)
    return scheduler
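# Illustrative sketch (not from the original repo) of stepping the scheduler once per
# epoch; the opt fields used here (lr_policy, epoch_count, niter, niter_decay,
# lr_decay_iters) mirror the pix2pix-style option object and are assumptions.
def _example_scheduler_step(optimizer, opt, epochs=3):
    scheduler = get_scheduler(optimizer, opt)
    for _ in range(epochs):
        # ... run one training epoch here ...
        if opt.lr_policy == 'plateau':
            scheduler.step(0.0)  # ReduceLROnPlateau expects a monitored metric
        else:
            scheduler.step()
    return optimizer.param_groups[0]['lr']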
class LayerNormWarpper(nn.Module):
    def __init__(self, num_features):
        super(LayerNormWarpper, self).__init__()
        self.num_features = int(num_features)
    def forward(self, x):
        # Functional layer norm over (C, H, W) without affine parameters; equivalent to the
        # original per-call nn.LayerNorm(...) construction, but avoids rebuilding the module
        # (and pinning it to CUDA) on every forward pass.
        return F.layer_norm(x, [self.num_features, x.size(2), x.size(3)])
def get_norm_layer(norm_type='instance'):
    """Return a normalization layer
    Parameters:
        norm_type (str) -- the name of the normalization layer: batch | instance | layer | none
    For BatchNorm, we use learnable affine parameters and track running statistics (mean/stddev).
    For InstanceNorm, we do not use learnable affine parameters and do not track running statistics.
    For LayerNorm, we normalize over (C, H, W) without affine parameters (see LayerNormWarpper).
    """
    if norm_type == 'batch':
        norm_layer = functools.partial(nn.BatchNorm2d, affine=True, track_running_stats=True)
    elif norm_type == 'instance':
        norm_layer = functools.partial(nn.InstanceNorm2d, affine=False, track_running_stats=False)
    elif norm_type == 'layer':
        norm_layer = functools.partial(LayerNormWarpper)
    elif norm_type == 'none':
        norm_layer = None
    else:
        raise NotImplementedError('normalization layer [%s] is not found' % norm_type)
    return norm_layer
def get_non_linearity(layer_type='relu'):
    if layer_type == 'relu':
        nl_layer = functools.partial(nn.ReLU, inplace=True)
    elif layer_type == 'lrelu':
        nl_layer = functools.partial(nn.LeakyReLU, negative_slope=0.2, inplace=True)
    elif layer_type == 'elu':
        nl_layer = functools.partial(nn.ELU, inplace=True)
    elif layer_type == 'selu':
        nl_layer = functools.partial(nn.SELU, inplace=True)
    elif layer_type == 'prelu':
        nl_layer = functools.partial(nn.PReLU)
    else:
        raise NotImplementedError('nonlinearity activation [%s] is not found' % layer_type)
    return nl_layer
def define_G(input_nc, output_nc, nz, ngf, netG='unet_128', norm='batch', nl='relu', use_noise=False, | |
use_dropout=False, init_type='xavier', init_gain=0.02, gpu_ids=[], where_add='input', upsample='bilinear'): | |
net = None | |
norm_layer = get_norm_layer(norm_type=norm) | |
nl_layer = get_non_linearity(layer_type=nl) | |
# print(norm, norm_layer) | |
if nz == 0: | |
where_add = 'input' | |
if netG == 'unet_128' and where_add == 'input': | |
net = G_Unet_add_input(input_nc, output_nc, nz, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, | |
use_dropout=use_dropout, upsample=upsample, device=gpu_ids) | |
elif netG == 'unet_128_G' and where_add == 'input': | |
net = G_Unet_add_input_G(input_nc, output_nc, nz, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, | |
use_dropout=use_dropout, upsample=upsample, device=gpu_ids) | |
elif netG == 'unet_256' and where_add == 'input': | |
net = G_Unet_add_input(input_nc, output_nc, nz, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, | |
use_dropout=use_dropout, upsample=upsample, device=gpu_ids) | |
elif netG == 'unet_256_G' and where_add == 'input': | |
net = G_Unet_add_input_G(input_nc, output_nc, nz, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, | |
use_dropout=use_dropout, upsample=upsample, device=gpu_ids) | |
elif netG == 'unet_128' and where_add == 'all': | |
net = G_Unet_add_all(input_nc, output_nc, nz, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, | |
use_dropout=use_dropout, upsample=upsample) | |
elif netG == 'unet_256' and where_add == 'all': | |
net = G_Unet_add_all(input_nc, output_nc, nz, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, | |
use_dropout=use_dropout, upsample=upsample) | |
else: | |
        raise NotImplementedError('Generator model name [%s] is not recognized' % netG)
# print(net) | |
return init_net(net, init_type, init_gain, gpu_ids) | |
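# Illustrative sketch (assumed shapes/flags): builds the 256x256 UNet generator with an
# 8-dimensional latent code appended at the input and runs a dummy batch through it.
def _example_define_G():
    G = define_G(input_nc=3, output_nc=3, nz=8, ngf=64, netG='unet_256',
                 norm='instance', nl='relu', where_add='input', gpu_ids=[])
    x = torch.randn(1, 3, 256, 256)
    z = torch.randn(1, 8)
    with torch.no_grad():
        y = G(x, z)  # (1, 3, 256, 256), squashed into (-1, 1) by tanh
    return y.shape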
def define_C(input_nc, output_nc, nz, ngf, netC='unet_128', norm='instance', nl='relu', | |
use_dropout=False, init_type='normal', init_gain=0.02, gpu_ids=[], upsample='basic'): | |
net = None | |
norm_layer = get_norm_layer(norm_type=norm) | |
nl_layer = get_non_linearity(layer_type=nl) | |
if netC == 'resnet_9blocks': | |
net = ResnetGenerator(input_nc, output_nc, ngf, norm_layer=norm_layer, use_dropout=use_dropout, n_blocks=9) | |
elif netC == 'resnet_6blocks': | |
net = ResnetGenerator(input_nc, output_nc, ngf, norm_layer=norm_layer, use_dropout=use_dropout, n_blocks=6) | |
elif netC == 'unet_128': | |
net = G_Unet_add_input_C(input_nc, output_nc, 0, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, | |
use_dropout=use_dropout, upsample=upsample) | |
elif netC == 'unet_256': | |
net = G_Unet_add_input(input_nc, output_nc, 0, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, | |
use_dropout=use_dropout, upsample=upsample) | |
elif netC == 'unet_32': | |
net = G_Unet_add_input(input_nc, output_nc, 0, 5, ngf, norm_layer=norm_layer, nl_layer=nl_layer, | |
use_dropout=use_dropout, upsample=upsample) | |
else: | |
        raise NotImplementedError('Generator model name [%s] is not recognized' % netC)
return init_net(net, init_type, init_gain, gpu_ids) | |
def define_D(input_nc, ndf, netD, norm='batch', nl='lrelu', init_type='xavier', init_gain=0.02, num_Ds=1, gpu_ids=[]): | |
net = None | |
norm_layer = get_norm_layer(norm_type=norm) | |
nl = 'lrelu' # use leaky relu for D | |
nl_layer = get_non_linearity(layer_type=nl) | |
if netD == 'basic_128': | |
net = D_NLayers(input_nc, ndf, n_layers=2, norm_layer=norm_layer, nl_layer=nl_layer) | |
elif netD == 'basic_256': | |
net = D_NLayers(input_nc, ndf, n_layers=3, norm_layer=norm_layer, nl_layer=nl_layer) | |
elif netD == 'basic_128_multi': | |
net = D_NLayersMulti(input_nc=input_nc, ndf=ndf, n_layers=2, norm_layer=norm_layer, num_D=num_Ds, nl_layer=nl_layer) | |
elif netD == 'basic_256_multi': | |
net = D_NLayersMulti(input_nc=input_nc, ndf=ndf, n_layers=3, norm_layer=norm_layer, num_D=num_Ds, nl_layer=nl_layer) | |
else: | |
        raise NotImplementedError('Discriminator model name [%s] is not recognized' % netD)
return init_net(net, init_type, init_gain, gpu_ids) | |
def define_E(input_nc, output_nc, ndf, netE, norm='batch', nl='lrelu', | |
init_type='xavier', init_gain=0.02, gpu_ids=[], vaeLike=False): | |
net = None | |
norm_layer = get_norm_layer(norm_type=norm) | |
nl = 'lrelu' # use leaky relu for E | |
nl_layer = get_non_linearity(layer_type=nl) | |
if netE == 'resnet_128': | |
net = E_ResNet(input_nc, output_nc, ndf, n_blocks=4, norm_layer=norm_layer, | |
nl_layer=nl_layer, vaeLike=vaeLike) | |
elif netE == 'resnet_256': | |
net = E_ResNet(input_nc, output_nc, ndf, n_blocks=5, norm_layer=norm_layer, | |
nl_layer=nl_layer, vaeLike=vaeLike) | |
elif netE == 'conv_128': | |
net = E_NLayers(input_nc, output_nc, ndf, n_layers=4, norm_layer=norm_layer, | |
nl_layer=nl_layer, vaeLike=vaeLike) | |
elif netE == 'conv_256': | |
net = E_NLayers(input_nc, output_nc, ndf, n_layers=5, norm_layer=norm_layer, | |
nl_layer=nl_layer, vaeLike=vaeLike) | |
else: | |
        raise NotImplementedError('Encoder model name [%s] is not recognized' % netE)
return init_net(net, init_type, init_gain, gpu_ids, False) | |
class ResnetGenerator(nn.Module): | |
def __init__(self, input_nc, output_nc, ngf=64, n_downsampling=3, norm_layer=None, use_dropout=False, n_blocks=6, padding_type='replicate'): | |
assert(n_blocks >= 0) | |
super(ResnetGenerator, self).__init__() | |
self.input_nc = input_nc | |
self.output_nc = output_nc | |
self.ngf = ngf | |
if type(norm_layer) == functools.partial: # no need to use bias as BatchNorm2d has affine parameters | |
use_bias = norm_layer.func != nn.BatchNorm2d | |
else: | |
use_bias = norm_layer != nn.BatchNorm2d | |
model = [nn.ReplicationPad2d(3), | |
nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0, | |
bias=use_bias)] | |
if norm_layer is not None: | |
model += [norm_layer(ngf)] | |
model += [nn.ReLU(True)] | |
# n_downsampling = 2 | |
for i in range(n_downsampling): | |
mult = 2**i | |
model += [nn.ReplicationPad2d(1),nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3, | |
stride=2, padding=0, bias=use_bias)] | |
# model += [nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3, | |
# stride=2, padding=1, bias=use_bias)] | |
if norm_layer is not None: | |
model += [norm_layer(ngf * mult * 2)] | |
model += [nn.ReLU(True)] | |
mult = 2**n_downsampling | |
for i in range(n_blocks): | |
model += [ResnetBlock(ngf * mult, padding_type=padding_type, norm_layer=norm_layer, use_dropout=use_dropout, use_bias=use_bias)] | |
for i in range(n_downsampling): | |
mult = 2**(n_downsampling - i) | |
# model += [nn.ConvTranspose2d(ngf * mult, int(ngf * mult / 2), | |
# kernel_size=3, stride=2, | |
# padding=1, output_padding=1, | |
# bias=use_bias)] | |
# if norm_layer is not None: | |
# model += [norm_layer(ngf * mult / 2)] | |
# model += [nn.ReLU(True)] | |
model += upsampleLayer(ngf * mult, int(ngf * mult / 2), upsample='bilinear', padding_type=padding_type) | |
if norm_layer is not None: | |
model += [norm_layer(int(ngf * mult / 2))] | |
model += [nn.ReLU(True)] | |
model +=[nn.ReplicationPad2d(1), | |
nn.Conv2d(int(ngf * mult / 2), int(ngf * mult / 2), kernel_size=3, padding=0)] | |
if norm_layer is not None: | |
                model += [norm_layer(int(ngf * mult / 2))]
model += [nn.ReLU(True)] | |
model += [nn.ReplicationPad2d(3)] | |
model += [nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0)] | |
#model += [nn.Tanh()] | |
self.model = nn.Sequential(*model) | |
def forward(self, input): | |
return self.model(input) | |
# Define a resnet block | |
class ResnetBlock(nn.Module): | |
def __init__(self, dim, padding_type, norm_layer, use_dropout, use_bias): | |
super(ResnetBlock, self).__init__() | |
self.conv_block = self.build_conv_block(dim, padding_type, norm_layer, use_dropout, use_bias) | |
def build_conv_block(self, dim, padding_type, norm_layer, use_dropout, use_bias): | |
conv_block = [] | |
p = 0 | |
if padding_type == 'reflect': | |
conv_block += [nn.ReflectionPad2d(1)] | |
elif padding_type == 'replicate': | |
conv_block += [nn.ReplicationPad2d(1)] | |
elif padding_type == 'zero': | |
p = 1 | |
else: | |
raise NotImplementedError('padding [%s] is not implemented' % padding_type) | |
conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias)] | |
if norm_layer is not None: | |
conv_block += [norm_layer(dim)] | |
conv_block += [nn.ReLU(True)] | |
# if use_dropout: | |
# conv_block += [nn.Dropout(0.5)] | |
p = 0 | |
if padding_type == 'reflect': | |
conv_block += [nn.ReflectionPad2d(1)] | |
elif padding_type == 'replicate': | |
conv_block += [nn.ReplicationPad2d(1)] | |
elif padding_type == 'zero': | |
p = 1 | |
else: | |
raise NotImplementedError('padding [%s] is not implemented' % padding_type) | |
conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias)] | |
if norm_layer is not None: | |
conv_block += [norm_layer(dim)] | |
return nn.Sequential(*conv_block) | |
def forward(self, x): | |
out = x + self.conv_block(x) | |
return out | |
class D_NLayersMulti(nn.Module): | |
def __init__(self, input_nc, ndf=64, n_layers=3, | |
norm_layer=nn.BatchNorm2d, num_D=1, nl_layer=None): | |
super(D_NLayersMulti, self).__init__() | |
# st() | |
self.num_D = num_D | |
self.nl_layer=nl_layer | |
if num_D == 1: | |
layers = self.get_layers(input_nc, ndf, n_layers, norm_layer) | |
self.model = nn.Sequential(*layers) | |
else: | |
layers = self.get_layers(input_nc, ndf, n_layers, norm_layer) | |
self.add_module("model_0", nn.Sequential(*layers)) | |
self.down = nn.functional.interpolate | |
for i in range(1, num_D): | |
ndf_i = int(round(ndf / (2**i))) | |
layers = self.get_layers(input_nc, ndf_i, n_layers, norm_layer) | |
self.add_module("model_%d" % i, nn.Sequential(*layers)) | |
def get_layers(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d): | |
kw = 3 | |
padw = 1 | |
sequence = [spectral_norm(nn.Conv2d(input_nc, ndf, kernel_size=kw, | |
stride=2, padding=padw)), nn.LeakyReLU(0.2, True)] | |
nf_mult = 1 | |
nf_mult_prev = 1 | |
for n in range(1, n_layers): | |
nf_mult_prev = nf_mult | |
nf_mult = min(2**n, 8) | |
sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, | |
kernel_size=kw, stride=2, padding=padw))] | |
if norm_layer: | |
sequence += [norm_layer(ndf * nf_mult)] | |
sequence += [self.nl_layer()] | |
nf_mult_prev = nf_mult | |
nf_mult = min(2**n_layers, 8) | |
sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, | |
kernel_size=kw, stride=1, padding=padw))] | |
if norm_layer: | |
sequence += [norm_layer(ndf * nf_mult)] | |
sequence += [self.nl_layer()] | |
sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult, 1, | |
kernel_size=kw, stride=1, padding=padw))] | |
return sequence | |
def forward(self, input): | |
if self.num_D == 1: | |
return self.model(input) | |
result = [] | |
down = input | |
for i in range(self.num_D): | |
model = getattr(self, "model_%d" % i) | |
result.append(model(down)) | |
if i != self.num_D - 1: | |
down = self.down(down, scale_factor=0.5, mode='bilinear') | |
return result | |
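# Illustrative sketch (assumed sizes): with num_D=2 the discriminator returns one patch
# prediction map per scale, the second computed on a bilinearly halved copy of the input.
def _example_multiscale_D():
    D = D_NLayersMulti(input_nc=3, ndf=64, n_layers=2,
                       norm_layer=get_norm_layer('instance'), num_D=2,
                       nl_layer=get_non_linearity('lrelu'))
    preds = D(torch.randn(1, 3, 128, 128))
    return [p.shape for p in preds]  # the second map has half the spatial resolution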
class D_NLayers(nn.Module): | |
"""Defines a PatchGAN discriminator""" | |
    def __init__(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d, nl_layer=None):
        """Construct a PatchGAN discriminator
        Parameters:
            input_nc (int) -- the number of channels in input images
            ndf (int)      -- the number of filters in the last conv layer
            n_layers (int) -- the number of conv layers in the discriminator
            norm_layer     -- normalization layer
            nl_layer       -- nonlinearity factory; accepted for interface compatibility
                              with define_D (LeakyReLU is used internally)
        """
super(D_NLayers, self).__init__() | |
if type(norm_layer) == functools.partial: # no need to use bias as BatchNorm2d has affine parameters | |
use_bias = norm_layer.func != nn.BatchNorm2d | |
else: | |
use_bias = norm_layer != nn.BatchNorm2d | |
kw = 3 | |
padw = 1 | |
sequence = [nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw), nn.LeakyReLU(0.2, True)] | |
nf_mult = 1 | |
nf_mult_prev = 1 | |
for n in range(1, n_layers): # gradually increase the number of filters | |
nf_mult_prev = nf_mult | |
nf_mult = min(2 ** n, 8) | |
sequence += [ | |
nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=2, padding=padw, bias=use_bias), | |
norm_layer(ndf * nf_mult), | |
nn.LeakyReLU(0.2, True) | |
] | |
nf_mult_prev = nf_mult | |
nf_mult = min(2 ** n_layers, 8) | |
sequence += [ | |
nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=1, padding=padw, bias=use_bias), | |
norm_layer(ndf * nf_mult), | |
nn.LeakyReLU(0.2, True) | |
] | |
sequence += [nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)] # output 1 channel prediction map | |
self.model = nn.Sequential(*sequence) | |
def forward(self, input): | |
"""Standard forward.""" | |
return self.model(input) | |
class G_Unet_add_input(nn.Module): | |
def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64, | |
norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False, | |
upsample='basic', device=0): | |
super(G_Unet_add_input, self).__init__() | |
self.nz = nz | |
max_nchn = 8 | |
noise = [] | |
for i in range(num_downs+1): | |
if use_noise: | |
noise.append(True) | |
else: | |
noise.append(False) | |
# construct unet structure | |
#print(num_downs) | |
unet_block = UnetBlock_A(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, noise=noise[num_downs-1], | |
innermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
for i in range(num_downs - 5): | |
unet_block = UnetBlock_A(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, unet_block, noise[num_downs-i-3], | |
norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample) | |
unet_block = UnetBlock_A(ngf * 4, ngf * 4, ngf * max_nchn, unet_block, noise[2], | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
unet_block = UnetBlock_A(ngf * 2, ngf * 2, ngf * 4, unet_block, noise[1], | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
unet_block = UnetBlock_A(ngf, ngf, ngf * 2, unet_block, noise[0], | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
unet_block = UnetBlock_A(input_nc + nz, output_nc, ngf, unet_block, None, | |
outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
self.model = unet_block | |
def forward(self, x, z=None): | |
if self.nz > 0: | |
z_img = z.view(z.size(0), z.size(1), 1, 1).expand( | |
z.size(0), z.size(1), x.size(2), x.size(3)) | |
x_with_z = torch.cat([x, z_img], 1) | |
else: | |
x_with_z = x # no z | |
return torch.tanh(self.model(x_with_z)) | |
# return self.model(x_with_z) | |
class G_Unet_add_input_G(nn.Module): | |
def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64, | |
norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False, | |
upsample='basic', device=0): | |
super(G_Unet_add_input_G, self).__init__() | |
self.nz = nz | |
max_nchn = 8 | |
noise = [] | |
for i in range(num_downs+1): | |
if use_noise: | |
noise.append(True) | |
else: | |
noise.append(False) | |
# construct unet structure | |
#print(num_downs) | |
unet_block = UnetBlock_G(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, noise=False, | |
innermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
for i in range(num_downs - 5): | |
unet_block = UnetBlock_G(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, unet_block, noise=False, | |
norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample) | |
unet_block = UnetBlock_G(ngf * 4, ngf * 4, ngf * max_nchn, unet_block, noise[2], | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic') | |
unet_block = UnetBlock_G(ngf * 2, ngf * 2, ngf * 4, unet_block, noise[1], | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic') | |
unet_block = UnetBlock_G(ngf, ngf, ngf * 2, unet_block, noise[0], | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic') | |
unet_block = UnetBlock_G(input_nc + nz, output_nc, ngf, unet_block, None, | |
outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic') | |
self.model = unet_block | |
def forward(self, x, z=None): | |
if self.nz > 0: | |
z_img = z.view(z.size(0), z.size(1), 1, 1).expand( | |
z.size(0), z.size(1), x.size(2), x.size(3)) | |
x_with_z = torch.cat([x, z_img], 1) | |
else: | |
x_with_z = x # no z | |
# return F.tanh(self.model(x_with_z)) | |
return self.model(x_with_z) | |
class G_Unet_add_input_C(nn.Module): | |
def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64, | |
norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False, | |
upsample='basic', device=0): | |
super(G_Unet_add_input_C, self).__init__() | |
self.nz = nz | |
max_nchn = 8 | |
# construct unet structure | |
#print(num_downs) | |
unet_block = UnetBlock(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, noise=False, | |
innermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
for i in range(num_downs - 5): | |
unet_block = UnetBlock(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, unet_block, noise=False, | |
norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample) | |
unet_block = UnetBlock(ngf * 4, ngf * 4, ngf * max_nchn, unet_block, noise=False, | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
unet_block = UnetBlock(ngf * 2, ngf * 2, ngf * 4, unet_block, noise=False, | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
unet_block = UnetBlock(ngf, ngf, ngf * 2, unet_block, noise=False, | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
unet_block = UnetBlock(input_nc + nz, output_nc, ngf, unet_block, noise=False, | |
outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
self.model = unet_block | |
def forward(self, x, z=None): | |
if self.nz > 0: | |
z_img = z.view(z.size(0), z.size(1), 1, 1).expand( | |
z.size(0), z.size(1), x.size(2), x.size(3)) | |
x_with_z = torch.cat([x, z_img], 1) | |
else: | |
x_with_z = x # no z | |
# return torch.tanh(self.model(x_with_z)) | |
return self.model(x_with_z) | |
def upsampleLayer(inplanes, outplanes, kw=1, upsample='basic', padding_type='replicate'):
    # padding_type = 'zero'
    if upsample == 'basic':
        upconv = [nn.ConvTranspose2d(inplanes, outplanes, kernel_size=4, stride=2, padding=1)]  # , padding_mode='replicate'
    elif upsample == 'bilinear' or upsample == 'nearest' or upsample == 'linear':
        # align_corners is only valid for the interpolating modes, so drop it for 'nearest'
        align = None if upsample == 'nearest' else True
        upconv = [nn.Upsample(scale_factor=2, mode=upsample, align_corners=align),
                  # nn.ReplicationPad2d(1),
                  nn.Conv2d(inplanes, outplanes, kernel_size=1, stride=1, padding=0)]
        # p = kw//2
        # upconv = [nn.Upsample(scale_factor=2, mode=upsample, align_corners=True),
        #           nn.Conv2d(inplanes, outplanes, kernel_size=kw, stride=1, padding=p, padding_mode='replicate')]
    else:
        raise NotImplementedError('upsample layer [%s] not implemented' % upsample)
    return upconv
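# Illustrative sketch: both branches above double the spatial resolution; 'basic' uses a
# stride-2 transposed convolution, the interpolation modes use Upsample + 1x1 conv.
# Tensor sizes are assumptions for demonstration.
def _example_upsample_layer():
    feat = torch.randn(1, 64, 16, 16)
    deconv = nn.Sequential(*upsampleLayer(64, 32, upsample='basic'))
    interp = nn.Sequential(*upsampleLayer(64, 32, upsample='bilinear'))
    return deconv(feat).shape, interp(feat).shape  # both (1, 32, 32, 32)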
class UnetBlock_G(nn.Module): | |
def __init__(self, input_nc, outer_nc, inner_nc, | |
submodule=None, noise=None, outermost=False, innermost=False, | |
norm_layer=None, nl_layer=None, use_dropout=False, upsample='basic', padding_type='replicate'): | |
super(UnetBlock_G, self).__init__() | |
self.outermost = outermost | |
p = 0 | |
downconv = [] | |
if padding_type == 'reflect': | |
downconv += [nn.ReflectionPad2d(1)] | |
elif padding_type == 'replicate': | |
downconv += [nn.ReplicationPad2d(1)] | |
elif padding_type == 'zero': | |
p = 1 | |
else: | |
raise NotImplementedError( | |
'padding [%s] is not implemented' % padding_type) | |
downconv += [nn.Conv2d(input_nc, inner_nc, | |
kernel_size=3, stride=2, padding=p)] | |
# downsample is different from upsample | |
downrelu = nn.LeakyReLU(0.2, True) | |
downnorm = norm_layer(inner_nc) if norm_layer is not None else None | |
uprelu = nl_layer() | |
uprelu2 = nl_layer() | |
uppad = nn.ReplicationPad2d(1) | |
upnorm = norm_layer(outer_nc) if norm_layer is not None else None | |
upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None | |
self.noiseblock = ApplyNoise(outer_nc) | |
self.noise = noise | |
if outermost: | |
upconv = upsampleLayer(inner_nc * 2, inner_nc, upsample=upsample, padding_type=padding_type) | |
uppad = nn.ReplicationPad2d(3) | |
upconv2 = nn.Conv2d(inner_nc, outer_nc, kernel_size=7, padding=0) | |
down = downconv | |
up = [uprelu] + upconv | |
if upnorm is not None: | |
up += [norm_layer(inner_nc)] | |
# upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type) | |
# upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=0) | |
# down = downconv | |
# up = [uprelu] + upconv | |
# if upnorm is not None: | |
# up += [norm_layer(outer_nc)] | |
up +=[uprelu2, uppad, upconv2] #+ [nn.Tanh()] | |
model = down + [submodule] + up | |
elif innermost: | |
upconv = upsampleLayer(inner_nc, outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p) | |
down = [downrelu] + downconv | |
up = [uprelu] + upconv | |
if upnorm is not None: | |
up += [upnorm] | |
up += [uprelu2, uppad, upconv2] | |
if upnorm2 is not None: | |
up += [upnorm2] | |
model = down + up | |
else: | |
upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p) | |
down = [downrelu] + downconv | |
if downnorm is not None: | |
down += [downnorm] | |
up = [uprelu] + upconv | |
if upnorm is not None: | |
up += [upnorm] | |
up += [uprelu2, uppad, upconv2] | |
if upnorm2 is not None: | |
up += [upnorm2] | |
if use_dropout: | |
model = down + [submodule] + up + [nn.Dropout(0.5)] | |
else: | |
model = down + [submodule] + up | |
self.model = nn.Sequential(*model) | |
def forward(self, x): | |
if self.outermost: | |
return self.model(x) | |
else: | |
x2 = self.model(x) | |
if self.noise: | |
x2 = self.noiseblock(x2, self.noise) | |
return torch.cat([x2, x], 1) | |
class UnetBlock(nn.Module): | |
def __init__(self, input_nc, outer_nc, inner_nc, | |
submodule=None, noise=None, outermost=False, innermost=False, | |
norm_layer=None, nl_layer=None, use_dropout=False, upsample='basic', padding_type='replicate'): | |
super(UnetBlock, self).__init__() | |
self.outermost = outermost | |
p = 0 | |
downconv = [] | |
if padding_type == 'reflect': | |
downconv += [nn.ReflectionPad2d(1)] | |
elif padding_type == 'replicate': | |
downconv += [nn.ReplicationPad2d(1)] | |
elif padding_type == 'zero': | |
p = 1 | |
else: | |
raise NotImplementedError( | |
'padding [%s] is not implemented' % padding_type) | |
downconv += [nn.Conv2d(input_nc, inner_nc, | |
kernel_size=3, stride=2, padding=p)] | |
# downsample is different from upsample | |
downrelu = nn.LeakyReLU(0.2, True) | |
downnorm = norm_layer(inner_nc) if norm_layer is not None else None | |
uprelu = nl_layer() | |
uprelu2 = nl_layer() | |
uppad = nn.ReplicationPad2d(1) | |
upnorm = norm_layer(outer_nc) if norm_layer is not None else None | |
upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None | |
self.noiseblock = ApplyNoise(outer_nc) | |
self.noise = noise | |
if outermost: | |
upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p) | |
down = downconv | |
up = [uprelu] + upconv | |
if upnorm is not None: | |
up += [upnorm] | |
up +=[uprelu2, uppad, upconv2] #+ [nn.Tanh()] | |
model = down + [submodule] + up | |
elif innermost: | |
upconv = upsampleLayer(inner_nc, outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p) | |
down = [downrelu] + downconv | |
up = [uprelu] + upconv | |
if upnorm is not None: | |
up += [upnorm] | |
up += [uprelu2, uppad, upconv2] | |
if upnorm2 is not None: | |
up += [upnorm2] | |
model = down + up | |
else: | |
upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p) | |
down = [downrelu] + downconv | |
if downnorm is not None: | |
down += [downnorm] | |
up = [uprelu] + upconv | |
if upnorm is not None: | |
up += [upnorm] | |
up += [uprelu2, uppad, upconv2] | |
if upnorm2 is not None: | |
up += [upnorm2] | |
if use_dropout: | |
model = down + [submodule] + up + [nn.Dropout(0.5)] | |
else: | |
model = down + [submodule] + up | |
self.model = nn.Sequential(*model) | |
def forward(self, x): | |
if self.outermost: | |
return self.model(x) | |
else: | |
x2 = self.model(x) | |
if self.noise: | |
x2 = self.noiseblock(x2, self.noise) | |
return torch.cat([x2, x], 1) | |
# Defines the submodule with skip connection. | |
# X -------------------identity---------------------- X | |
# |-- downsampling -- |submodule| -- upsampling --| | |
class UnetBlock_A(nn.Module): | |
def __init__(self, input_nc, outer_nc, inner_nc, | |
submodule=None, noise=None, outermost=False, innermost=False, | |
norm_layer=None, nl_layer=None, use_dropout=False, upsample='basic', padding_type='replicate'): | |
super(UnetBlock_A, self).__init__() | |
self.outermost = outermost | |
p = 0 | |
downconv = [] | |
if padding_type == 'reflect': | |
downconv += [nn.ReflectionPad2d(1)] | |
elif padding_type == 'replicate': | |
downconv += [nn.ReplicationPad2d(1)] | |
elif padding_type == 'zero': | |
p = 1 | |
else: | |
raise NotImplementedError( | |
'padding [%s] is not implemented' % padding_type) | |
downconv += [spectral_norm(nn.Conv2d(input_nc, inner_nc, | |
kernel_size=3, stride=2, padding=p))] | |
# downsample is different from upsample | |
downrelu = nn.LeakyReLU(0.2, True) | |
downnorm = norm_layer(inner_nc) if norm_layer is not None else None | |
uprelu = nl_layer() | |
uprelu2 = nl_layer() | |
uppad = nn.ReplicationPad2d(1) | |
upnorm = norm_layer(outer_nc) if norm_layer is not None else None | |
upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None | |
self.noiseblock = ApplyNoise(outer_nc) | |
self.noise = noise | |
if outermost: | |
upconv = upsampleLayer(inner_nc * 1, outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) | |
down = downconv | |
up = [uprelu] + upconv | |
if upnorm is not None: | |
up += [upnorm] | |
up +=[uprelu2, uppad, upconv2] #+ [nn.Tanh()] | |
model = down + [submodule] + up | |
elif innermost: | |
upconv = upsampleLayer(inner_nc, outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) | |
down = [downrelu] + downconv | |
up = [uprelu] + upconv | |
if upnorm is not None: | |
up += [upnorm] | |
up += [uprelu2, uppad, upconv2] | |
if upnorm2 is not None: | |
up += [upnorm2] | |
model = down + up | |
else: | |
upconv = upsampleLayer(inner_nc * 1, outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) | |
down = [downrelu] + downconv | |
if downnorm is not None: | |
down += [downnorm] | |
up = [uprelu] + upconv | |
if upnorm is not None: | |
up += [upnorm] | |
up += [uprelu2, uppad, upconv2] | |
if upnorm2 is not None: | |
up += [upnorm2] | |
if use_dropout: | |
model = down + [submodule] + up + [nn.Dropout(0.5)] | |
else: | |
model = down + [submodule] + up | |
self.model = nn.Sequential(*model) | |
def forward(self, x): | |
if self.outermost: | |
return self.model(x) | |
else: | |
x2 = self.model(x) | |
if self.noise: | |
x2 = self.noiseblock(x2, self.noise) | |
if x2.shape[-1]==x.shape[-1]: | |
return x2 + x | |
else: | |
x2 = F.interpolate(x2, x.shape[2:]) | |
return x2 + x | |
class E_ResNet(nn.Module): | |
def __init__(self, input_nc=3, output_nc=1, ndf=64, n_blocks=4, | |
norm_layer=None, nl_layer=None, vaeLike=False): | |
super(E_ResNet, self).__init__() | |
self.vaeLike = vaeLike | |
max_ndf = 4 | |
conv_layers = [ | |
nn.Conv2d(input_nc, ndf, kernel_size=3, stride=2, padding=1, bias=True)] | |
for n in range(1, n_blocks): | |
input_ndf = ndf * min(max_ndf, n) | |
output_ndf = ndf * min(max_ndf, n + 1) | |
conv_layers += [BasicBlock(input_ndf, | |
output_ndf, norm_layer, nl_layer)] | |
conv_layers += [nl_layer(), nn.AdaptiveAvgPool2d(4)] | |
if vaeLike: | |
self.fc = nn.Sequential(*[nn.Linear(output_ndf * 16, output_nc)]) | |
self.fcVar = nn.Sequential(*[nn.Linear(output_ndf * 16, output_nc)]) | |
else: | |
self.fc = nn.Sequential(*[nn.Linear(output_ndf * 16, output_nc)]) | |
self.conv = nn.Sequential(*conv_layers) | |
    def forward(self, x):
        x_conv = self.conv(x)
        conv_flat = x_conv.view(x.size(0), -1)
        output = self.fc(conv_flat)
        if self.vaeLike:
            outputVar = self.fcVar(conv_flat)
            return output, outputVar
        return output
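# Illustrative sketch (assumed input size): with vaeLike=True the encoder returns a
# (mu, logvar) pair that can be reparameterized into a latent sample.
def _example_vae_encoder():
    E = E_ResNet(input_nc=3, output_nc=8, ndf=64, n_blocks=4, norm_layer=None,
                 nl_layer=get_non_linearity('lrelu'), vaeLike=True)
    mu, logvar = E(torch.randn(1, 3, 128, 128))
    z = mu + torch.randn_like(mu) * torch.exp(0.5 * logvar)  # reparameterization trick
    return z.shape  # (1, 8)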
# Defines the Unet generator. | |
# |num_downs|: number of downsamplings in UNet. For example, | |
# if |num_downs| == 7, image of size 128x128 will become of size 1x1 | |
# at the bottleneck | |
class G_Unet_add_all(nn.Module): | |
def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64, | |
norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False, upsample='basic'): | |
super(G_Unet_add_all, self).__init__() | |
self.nz = nz | |
self.mapping = G_mapping(self.nz, self.nz, 512, normalize_latents=False, lrmul=1) | |
self.truncation_psi = 0 | |
self.truncation_cutoff = 0 | |
# - 2 means we start from feature map with height and width equals 4. | |
# as this example, we get num_layers = 18. | |
num_layers = int(np.log2(512)) * 2 - 2 | |
# Noise inputs. | |
self.noise_inputs = [] | |
for layer_idx in range(num_layers): | |
res = layer_idx // 2 + 2 | |
shape = [1, 1, 2 ** res, 2 ** res] | |
self.noise_inputs.append(torch.randn(*shape).to("cuda")) | |
# construct unet structure | |
unet_block = UnetBlock_with_z(ngf * 8, ngf * 8, ngf * 8, nz, submodule=None, innermost=True, | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
unet_block = UnetBlock_with_z(ngf * 8, ngf * 8, ngf * 8, nz, submodule=unet_block, | |
norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample) | |
for i in range(num_downs - 6): | |
unet_block = UnetBlock_with_z(ngf * 8, ngf * 8, ngf * 8, nz, submodule=unet_block, | |
norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample) | |
unet_block = UnetBlock_with_z(ngf * 4, ngf * 4, ngf * 8, nz, submodule=unet_block, | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
unet_block = UnetBlock_with_z(ngf * 2, ngf * 2, ngf * 4, nz, submodule=unet_block, | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
unet_block = UnetBlock_with_z(ngf, ngf, ngf * 2, nz, submodule=unet_block, | |
norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
unet_block = UnetBlock_with_z(input_nc, output_nc, ngf, nz, submodule=unet_block, | |
outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) | |
self.model = unet_block | |
def forward(self, x, z): | |
dlatents1, num_layers = self.mapping(z) | |
dlatents1 = dlatents1.unsqueeze(1) | |
dlatents1 = dlatents1.expand(-1, int(num_layers), -1) | |
# Apply truncation trick. | |
if self.truncation_psi and self.truncation_cutoff: | |
coefs = np.ones([1, num_layers, 1], dtype=np.float32) | |
for i in range(num_layers): | |
if i < self.truncation_cutoff: | |
coefs[:, i, :] *= self.truncation_psi | |
"""Linear interpolation. | |
a + (b - a) * t (a = 0) | |
reduce to | |
b * t | |
""" | |
dlatents1 = dlatents1 * torch.Tensor(coefs).to(dlatents1.device) | |
return torch.tanh(self.model(x, dlatents1, self.noise_inputs)) | |
class ApplyNoise(nn.Module): | |
def __init__(self, channels): | |
super().__init__() | |
self.channels = channels | |
self.weight = nn.Parameter(torch.randn(channels), requires_grad=True) | |
self.bias = nn.Parameter(torch.zeros(channels), requires_grad=True) | |
def forward(self, x, noise): | |
W,_ = torch.split(self.weight.view(1, -1, 1, 1), self.channels // 2, dim=1) | |
B,_ = torch.split(self.bias.view(1, -1, 1, 1), self.channels // 2, dim=1) | |
Z = torch.zeros_like(W) | |
w = torch.cat([W,Z], dim=1).to(x.device) | |
b = torch.cat([B,Z], dim=1).to(x.device) | |
adds = w * torch.randn_like(x) + b | |
return x + adds.type_as(x) | |
class FC(nn.Module): | |
def __init__(self, | |
in_channels, | |
out_channels, | |
gain=2**(0.5), | |
use_wscale=False, | |
lrmul=1.0, | |
bias=True): | |
""" | |
The complete conversion of Dense/FC/Linear Layer of original Tensorflow version. | |
""" | |
super(FC, self).__init__() | |
he_std = gain * in_channels ** (-0.5) # He init | |
if use_wscale: | |
init_std = 1.0 / lrmul | |
self.w_lrmul = he_std * lrmul | |
else: | |
init_std = he_std / lrmul | |
self.w_lrmul = lrmul | |
self.weight = torch.nn.Parameter(torch.randn(out_channels, in_channels) * init_std) | |
if bias: | |
self.bias = torch.nn.Parameter(torch.zeros(out_channels)) | |
self.b_lrmul = lrmul | |
else: | |
self.bias = None | |
def forward(self, x): | |
if self.bias is not None: | |
out = F.linear(x, self.weight * self.w_lrmul, self.bias * self.b_lrmul) | |
else: | |
out = F.linear(x, self.weight * self.w_lrmul) | |
out = F.leaky_relu(out, 0.2, inplace=True) | |
return out | |
class ApplyStyle(nn.Module): | |
""" | |
@ref: https://github.com/lernapparat/lernapparat/blob/master/style_gan/pytorch_style_gan.ipynb | |
""" | |
def __init__(self, latent_size, channels, use_wscale, nl_layer): | |
super(ApplyStyle, self).__init__() | |
modules = [nn.Linear(latent_size, channels*2)] | |
if nl_layer: | |
modules += [nl_layer()] | |
self.linear = nn.Sequential(*modules) | |
def forward(self, x, latent): | |
style = self.linear(latent) # style => [batch_size, n_channels*2] | |
shape = [-1, 2, x.size(1), 1, 1] | |
style = style.view(shape) # [batch_size, 2, n_channels, ...] | |
x = x * (style[:, 0] + 1.) + style[:, 1] | |
return x | |
class PixelNorm(nn.Module): | |
def __init__(self, epsilon=1e-8): | |
""" | |
@notice: avoid in-place ops. | |
https://discuss.pytorch.org/t/encounter-the-runtimeerror-one-of-the-variables-needed-for-gradient-computation-has-been-modified-by-an-inplace-operation/836/3 | |
""" | |
super(PixelNorm, self).__init__() | |
self.epsilon = epsilon | |
def forward(self, x): | |
tmp = torch.mul(x, x) # or x ** 2 | |
tmp1 = torch.rsqrt(torch.mean(tmp, dim=1, keepdim=True) + self.epsilon) | |
return x * tmp1 | |
class InstanceNorm(nn.Module): | |
def __init__(self, epsilon=1e-8): | |
""" | |
@notice: avoid in-place ops. | |
https://discuss.pytorch.org/t/encounter-the-runtimeerror-one-of-the-variables-needed-for-gradient-computation-has-been-modified-by-an-inplace-operation/836/3 | |
""" | |
super(InstanceNorm, self).__init__() | |
self.epsilon = epsilon | |
def forward(self, x): | |
x = x - torch.mean(x, (2, 3), True) | |
tmp = torch.mul(x, x) # or x ** 2 | |
tmp = torch.rsqrt(torch.mean(tmp, (2, 3), True) + self.epsilon) | |
return x * tmp | |
class LayerEpilogue(nn.Module): | |
def __init__(self, channels, dlatent_size, use_wscale, use_noise, | |
use_pixel_norm, use_instance_norm, use_styles, nl_layer=None): | |
super(LayerEpilogue, self).__init__() | |
self.use_noise = use_noise | |
if use_noise: | |
self.noise = ApplyNoise(channels) | |
self.act = nn.LeakyReLU(negative_slope=0.2) | |
if use_pixel_norm: | |
self.pixel_norm = PixelNorm() | |
else: | |
self.pixel_norm = None | |
if use_instance_norm: | |
self.instance_norm = InstanceNorm() | |
else: | |
self.instance_norm = None | |
if use_styles: | |
self.style_mod = ApplyStyle(dlatent_size, channels, use_wscale=use_wscale, nl_layer=nl_layer) | |
else: | |
self.style_mod = None | |
def forward(self, x, noise, dlatents_in_slice=None): | |
# if noise is not None: | |
if self.use_noise: | |
x = self.noise(x, noise) | |
x = self.act(x) | |
if self.pixel_norm is not None: | |
x = self.pixel_norm(x) | |
if self.instance_norm is not None: | |
x = self.instance_norm(x) | |
if self.style_mod is not None: | |
x = self.style_mod(x, dlatents_in_slice) | |
return x | |
class G_mapping(nn.Module): | |
def __init__(self, | |
mapping_fmaps=512, | |
dlatent_size=512, | |
resolution=512, | |
normalize_latents=True, # Normalize latent vectors (Z) before feeding them to the mapping layers? | |
use_wscale=True, # Enable equalized learning rate? | |
lrmul=0.01, # Learning rate multiplier for the mapping layers. | |
gain=2**(0.5), # original gain in tensorflow. | |
nl_layer=None | |
): | |
super(G_mapping, self).__init__() | |
self.mapping_fmaps = mapping_fmaps | |
func = [ | |
nn.Linear(self.mapping_fmaps, dlatent_size) | |
] | |
if nl_layer: | |
func += [nl_layer()] | |
for j in range(0,4): | |
func += [ | |
nn.Linear(dlatent_size, dlatent_size) | |
] | |
if nl_layer: | |
func += [nl_layer()] | |
self.func = nn.Sequential(*func) | |
#FC(self.mapping_fmaps, dlatent_size, gain, lrmul=lrmul, use_wscale=use_wscale), | |
#FC(dlatent_size, dlatent_size, gain, lrmul=lrmul, use_wscale=use_wscale), | |
self.normalize_latents = normalize_latents | |
self.resolution_log2 = int(np.log2(resolution)) | |
self.num_layers = self.resolution_log2 * 2 - 2 | |
self.pixel_norm = PixelNorm() | |
# - 2 means we start from feature map with height and width equals 4. | |
# as this example, we get num_layers = 18. | |
def forward(self, x): | |
if self.normalize_latents: | |
x = self.pixel_norm(x) | |
out = self.func(x) | |
return out, self.num_layers | |
class UnetBlock_with_z(nn.Module): | |
def __init__(self, input_nc, outer_nc, inner_nc, nz=0, | |
submodule=None, outermost=False, innermost=False, | |
norm_layer=None, nl_layer=None, use_dropout=False, | |
upsample='basic', padding_type='replicate'): | |
super(UnetBlock_with_z, self).__init__() | |
p = 0 | |
downconv = [] | |
if padding_type == 'reflect': | |
downconv += [nn.ReflectionPad2d(1)] | |
elif padding_type == 'replicate': | |
downconv += [nn.ReplicationPad2d(1)] | |
elif padding_type == 'zero': | |
p = 1 | |
else: | |
raise NotImplementedError( | |
'padding [%s] is not implemented' % padding_type) | |
self.outermost = outermost | |
self.innermost = innermost | |
self.nz = nz | |
# input_nc = input_nc + nz | |
downconv += [spectral_norm(nn.Conv2d(input_nc, inner_nc, | |
kernel_size=3, stride=2, padding=p))] | |
# downsample is different from upsample | |
downrelu = nn.LeakyReLU(0.2, True) | |
downnorm = norm_layer(inner_nc) if norm_layer is not None else None | |
uprelu = nl_layer() | |
uprelu2 = nl_layer() | |
uppad = nn.ReplicationPad2d(1) | |
upnorm = norm_layer(outer_nc) if norm_layer is not None else None | |
upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None | |
use_styles=False | |
uprelu = nl_layer() | |
if self.nz >0: | |
use_styles=True | |
if outermost: | |
self.adaIn = LayerEpilogue(inner_nc, self.nz, use_wscale=True, use_noise=False, | |
use_pixel_norm=True, use_instance_norm=True, use_styles=use_styles, nl_layer=nl_layer) | |
upconv = upsampleLayer( | |
inner_nc , outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) | |
down = downconv | |
up = [uprelu] + upconv | |
if upnorm is not None: | |
up += [upnorm] | |
up +=[uprelu2, uppad, upconv2] #+ [nn.Tanh()] | |
elif innermost: | |
self.adaIn = LayerEpilogue(inner_nc, self.nz, use_wscale=True, use_noise=True, | |
use_pixel_norm=True, use_instance_norm=True, use_styles=use_styles, nl_layer=nl_layer) | |
upconv = upsampleLayer( | |
inner_nc, outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) | |
down = [downrelu] + downconv | |
up = [uprelu] + upconv | |
if norm_layer is not None: | |
up += [norm_layer(outer_nc)] | |
up += [uprelu2, uppad, upconv2] | |
if upnorm2 is not None: | |
up += [upnorm2] | |
else: | |
self.adaIn = LayerEpilogue(inner_nc, self.nz, use_wscale=True, use_noise=False, | |
use_pixel_norm=True, use_instance_norm=True, use_styles=use_styles, nl_layer=nl_layer) | |
upconv = upsampleLayer( | |
inner_nc , outer_nc, upsample=upsample, padding_type=padding_type) | |
upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) | |
down = [downrelu] + downconv | |
if norm_layer is not None: | |
down += [norm_layer(inner_nc)] | |
up = [uprelu] + upconv | |
if norm_layer is not None: | |
up += [norm_layer(outer_nc)] | |
up += [uprelu2, uppad, upconv2] | |
if upnorm2 is not None: | |
up += [upnorm2] | |
if use_dropout: | |
up += [nn.Dropout(0.5)] | |
self.down = nn.Sequential(*down) | |
self.submodule = submodule | |
self.up = nn.Sequential(*up) | |
def forward(self, x, z, noise): | |
if self.outermost: | |
x1 = self.down(x) | |
x2 = self.submodule(x1, z[:,2:], noise[2:]) | |
return self.up(x2) | |
elif self.innermost: | |
x1 = self.down(x) | |
x_and_z = self.adaIn(x1, noise[0], z[:,0]) | |
x2 = self.up(x_and_z) | |
x2 = F.interpolate(x2, x.shape[2:]) | |
return x2 + x | |
else: | |
x1 = self.down(x) | |
x2 = self.submodule(x1, z[:,2:], noise[2:]) | |
x_and_z = self.adaIn(x2, noise[0], z[:,0]) | |
return self.up(x_and_z) + x | |
class E_NLayers(nn.Module): | |
def __init__(self, input_nc, output_nc=1, ndf=64, n_layers=4, | |
norm_layer=None, nl_layer=None, vaeLike=False): | |
super(E_NLayers, self).__init__() | |
self.vaeLike = vaeLike | |
kw, padw = 3, 1 | |
sequence = [spectral_norm(nn.Conv2d(input_nc, ndf, kernel_size=kw, | |
stride=2, padding=padw, padding_mode='replicate')), nl_layer()] | |
nf_mult = 1 | |
nf_mult_prev = 1 | |
for n in range(1, n_layers): | |
nf_mult_prev = nf_mult | |
nf_mult = min(2**n, 8) | |
sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, | |
kernel_size=kw, stride=2, padding=padw, padding_mode='replicate'))] | |
if norm_layer is not None: | |
sequence += [norm_layer(ndf * nf_mult)] | |
sequence += [nl_layer()] | |
sequence += [nn.AdaptiveAvgPool2d(4)] | |
self.conv = nn.Sequential(*sequence) | |
self.fc = nn.Sequential(*[spectral_norm(nn.Linear(ndf * nf_mult * 16, output_nc))]) | |
if vaeLike: | |
self.fcVar = nn.Sequential(*[spectral_norm(nn.Linear(ndf * nf_mult * 16, output_nc))]) | |
def forward(self, x): | |
x_conv = self.conv(x) | |
conv_flat = x_conv.view(x.size(0), -1) | |
output = self.fc(conv_flat) | |
if self.vaeLike: | |
outputVar = self.fcVar(conv_flat) | |
return output, outputVar | |
return output | |
class BasicBlock(nn.Module):
    def __init__(self, inplanes, outplanes, norm_layer=None, nl_layer=None):
        # norm_layer / nl_layer are accepted for interface compatibility with callers
        # such as E_ResNet, but this block keeps its own layer norm + ReLU as before.
        super(BasicBlock, self).__init__()
        layers = []
        norm_layer = get_norm_layer(norm_type='layer')  # functools.partial(LayerNorm)
        # norm_layer = None
        nl_layer = nn.ReLU()
        if norm_layer is not None:
            layers += [norm_layer(inplanes)]
        layers += [nl_layer]
        layers += [nn.ReplicationPad2d(1),
                   nn.Conv2d(inplanes, outplanes, kernel_size=3, stride=1,
                             padding=0, bias=True)]
        self.conv = nn.Sequential(*layers)
    def forward(self, x):
        return self.conv(x)
def define_SVAE(inc=96, outc=3, outplanes=64, blocks=1, netVAE='SVAE', model_name='', load_ext=True, save_dir='',
                init_type="normal", init_gain=0.02, gpu_ids=[]):
    if netVAE == 'SVAE':
        net = ScreenVAE(inc=inc, outc=outc, outplanes=outplanes, blocks=blocks, save_dir=save_dir,
                        init_type=init_type, init_gain=init_gain, gpu_ids=gpu_ids)
    else:
        raise NotImplementedError('Encoder model name [%s] is not recognized' % netVAE)
    init_net(net, init_type=init_type, init_gain=init_gain, gpu_ids=gpu_ids)
    net.load_networks('latest')
    return net
class ScreenVAE(nn.Module): | |
def __init__(self,inc=1,outc=4, outplanes=64, downs=5, blocks=2,load_ext=True, save_dir='',init_type="normal", init_gain=0.02, gpu_ids=[]): | |
super(ScreenVAE, self).__init__() | |
self.inc = inc | |
self.outc = outc | |
self.save_dir = save_dir | |
norm_layer=functools.partial(LayerNormWarpper) | |
nl_layer=nn.LeakyReLU | |
self.model_names=['enc','dec'] | |
self.enc=define_C(inc+1, outc*2, 0, 24, netC='resnet_6blocks', | |
norm='layer', nl='lrelu', use_dropout=True, init_type='kaiming', | |
gpu_ids=gpu_ids, upsample='bilinear') | |
self.dec=define_G(outc, inc, 0, 48, netG='unet_128_G', | |
norm='layer', nl='lrelu', use_dropout=True, init_type='kaiming', | |
gpu_ids=gpu_ids, where_add='input', upsample='bilinear', use_noise=True) | |
for param in self.parameters(): | |
param.requires_grad = False | |
def load_networks(self, epoch): | |
"""Load all the networks from the disk. | |
Parameters: | |
epoch (int) -- current epoch; used in the file name '%s_net_%s.pth' % (epoch, name) | |
""" | |
for name in self.model_names: | |
if isinstance(name, str): | |
load_filename = '%s_net_%s.pth' % (epoch, name) | |
load_path = os.path.join(self.save_dir, load_filename) | |
net = getattr(self, name) | |
if isinstance(net, torch.nn.DataParallel): | |
net = net.module | |
print('loading the model from %s' % load_path) | |
state_dict = torch.load( | |
load_path, map_location=lambda storage, loc: storage.cuda()) | |
if hasattr(state_dict, '_metadata'): | |
del state_dict._metadata | |
net.load_state_dict(state_dict) | |
del state_dict | |
def npad(self, im, pad=128): | |
h,w = im.shape[-2:] | |
hp = h //pad*pad+pad | |
wp = w //pad*pad+pad | |
return F.pad(im, (0, wp-w, 0, hp-h), mode='replicate') | |
def forward(self, x, line=None, img_input=True, output_screen_only=True): | |
if img_input: | |
if line is None: | |
line = torch.ones_like(x) | |
else: | |
line = torch.sign(line) | |
x = torch.clamp(x + (1-line),-1,1) | |
h,w = x.shape[-2:] | |
input = torch.cat([x, line], 1) | |
input = self.npad(input) | |
inter = self.enc(input)[:,:,:h,:w] | |
scr, logvar = torch.split(inter, (self.outc, self.outc), dim=1) | |
if output_screen_only: | |
return scr | |
recons = self.dec(scr) | |
return recons, scr, logvar | |
else: | |
h,w = x.shape[-2:] | |
x = self.npad(x) | |
recons = self.dec(x)[:,:,:h,:w] | |
recons = (recons+1)*(line+1)/2-1 | |
return torch.clamp(recons,-1,1) | |
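# Illustrative sketch (assumptions: a CUDA device is available and pretrained
# 'latest_net_enc.pth' / 'latest_net_dec.pth' checkpoints exist under save_dir;
# the checkpoint path below is a placeholder, not a file shipped with this module).
def _example_screenvae(save_dir='./checkpoints/ScreenVAE'):
    svae = define_SVAE(inc=1, outc=4, save_dir=save_dir, gpu_ids=[0])
    manga = torch.rand(1, 1, 256, 256, device='cuda') * 2 - 1  # manga page in [-1, 1]
    line = torch.ones_like(manga)                              # blank line drawing
    with torch.no_grad():
        scr = svae(manga, line)  # (1, 4, 256, 256) screentone representation
    return scr.shape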