|
import jittor as jt |
|
from jittor import init |
|
from jittor import nn |
|
from jittor.dataset.mnist import MNIST |
|
import jittor.transform as transform |
|
import argparse |
|
import os |
|
import numpy as np |
|
import math |
|
import time |
|
import cv2 |
|
|
|
# Run on GPU; Jittor falls back to CPU when CUDA is unavailable.
jt.flags.use_cuda = 1

# BUG FIX: samples are written to 'images/GAN_images/%d.png' below, but only
# 'images' was created, so cv2.imwrite failed silently (it returns False
# instead of raising). os.makedirs creates intermediate directories, so the
# parent 'images' directory is still created as before.
os.makedirs('images/GAN_images', exist_ok=True)
os.makedirs("saved_models", exist_ok=True)

# Command-line hyper-parameters (help texts kept in the original Chinese).
parser = argparse.ArgumentParser()
parser.add_argument('--n_epochs', type=int, default=200, help='训练的时期数')
parser.add_argument('--batch_size', type=int, default=64, help='批次大小')
parser.add_argument('--lr', type=float, default=0.0002, help='学习率')
parser.add_argument('--b1', type=float, default=0.5, help='梯度的一阶动量衰减')
# BUG FIX: help text was copy-pasted from --b1 ("一阶" / first moment);
# b2 is Adam's *second*-moment (二阶) decay rate.
parser.add_argument('--b2', type=float, default=0.999, help='梯度的二阶动量衰减')
parser.add_argument('--n_cpu', type=int, default=8, help='批处理生成期间要使用的 cpu 线程数')
parser.add_argument('--latent_dim', type=int, default=100, help='潜在空间的维度')
parser.add_argument('--img_size', type=int, default=28, help='每个图像尺寸的大小')
parser.add_argument('--channels', type=int, default=1, help='图像通道数')
parser.add_argument('--sample_interval', type=int, default=400, help='图像样本之间的间隔')

opt = parser.parse_args()
print(opt)

# (channels, height, width) of a single real/generated image.
img_shape = (opt.channels, opt.img_size, opt.img_size)
|
|
|
|
|
def _tile_grid(img, nrow):
    """Arrange a (N, C, H, W) batch with N == nrow*nrow into one image grid.

    Returns a numpy array of shape (H*nrow, W*nrow, C) with pixel values
    mapped from the generator's tanh range [-1, 1] to [0, 255].
    """
    n, c, h, w = img.shape
    # Flatten the batch into one tall (C, H*nrow*nrow, W) strip of stacked
    # rows, then cut the strip into `nrow` slabs of H*nrow rows each and
    # concatenate the slabs side by side along the width axis.
    strip = img.reshape([-1, h * nrow * nrow, w])
    slabs = [strip[:, h * nrow * k: h * nrow * (k + 1), :] for k in range(nrow)]
    grid = np.concatenate(slabs, axis=2)
    # [-1, 1] tanh output -> [0, 255] pixel intensities.
    grid = (grid + 1.0) / 2.0 * 255
    # (C, H*nrow, W*nrow) -> (H*nrow, W*nrow, C), the layout cv2.imwrite expects.
    return grid.transpose((1, 2, 0))


def save_image(img, path, nrow=None):
    """Tile a batch of images into a square grid and write it to `path`.

    img:  numpy array of shape (N, C, H, W), values in [-1, 1] (tanh output).
    path: output file; missing parent directories are created.
    nrow: images per grid row/column; N must equal nrow * nrow.
          BUG FIX: the old None default crashed with a TypeError in
          `W*nrow*nrow`; it now defaults to round(sqrt(N)).
    """
    if nrow is None:
        nrow = int(round(math.sqrt(img.shape[0])))
    # BUG FIX: cv2.imwrite fails *silently* (returns False) when the target
    # directory does not exist, so make sure it does.
    out_dir = os.path.dirname(path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    cv2.imwrite(path, _tile_grid(img, nrow))
|
|
|
|
|
class Generator(nn.Module):
    """MLP generator: maps a latent vector z to an image of shape img_shape."""

    def __init__(self):
        super(Generator, self).__init__()

        def fc_stage(n_in, n_out, normalize=True):
            # One fully-connected stage: Linear (+ optional BatchNorm) + LeakyReLU.
            # NOTE(review): 0.8 is passed positionally to BatchNorm1d; in the
            # reference GAN code this value was meant as momentum but lands on
            # the second positional parameter — kept as-is to preserve behavior.
            stage = [nn.Linear(n_in, n_out)]
            if normalize:
                stage.append(nn.BatchNorm1d(n_out, 0.8))
            stage.append(nn.LeakyReLU(scale=0.2))
            return stage

        # Widen progressively from the latent dimension up to 1024 features,
        # then project to a flat image and squash into [-1, 1] with Tanh.
        widths = [128, 256, 512, 1024]
        layers = fc_stage(opt.latent_dim, widths[0], normalize=False)
        for n_in, n_out in zip(widths[:-1], widths[1:]):
            layers += fc_stage(n_in, n_out)
        layers += [nn.Linear(widths[-1], int(np.prod(img_shape))), nn.Tanh()]
        self.model = nn.Sequential(*layers)

    def execute(self, z):
        """Map a latent batch (B, latent_dim) to images of shape (B, *img_shape)."""
        flat = self.model(z)
        return flat.view((flat.shape[0], *img_shape))
|
|
|
|
|
class Discriminator(nn.Module):
    """MLP discriminator: scores an image with a probability of being real."""

    def __init__(self):
        super(Discriminator, self).__init__()
        n_features = int(np.prod(img_shape))
        # Narrow progressively down to a single sigmoid unit in (0, 1).
        self.model = nn.Sequential(
            nn.Linear(n_features, 512),
            nn.LeakyReLU(scale=0.2),
            nn.Linear(512, 256),
            nn.LeakyReLU(scale=0.2),
            nn.Linear(256, 1),
            nn.Sigmoid(),
        )

    def execute(self, img):
        """Flatten a (B, C, H, W) batch and return a (B, 1) realness score."""
        flattened = img.view((img.shape[0], (- 1)))
        return self.model(flattened)
|
|
|
|
|
''' |
|
源码: |
|
class BCELoss(Module): |
|
def __init__(self, weight=None, size_average=True): |
|
self.weight = weight |
|
self.size_average = size_average |
|
def execute(self, output, target): |
|
return bce_loss(output, target, self.weight, self.size_average) |
|
|
|
# weight:表示对loss中每个元素的加权权值,默认为None |
|
# size_average:指定输出的格式,包括'mean','sum' |
|
# output:判别器对生成的数据的判别结果(64*1) |
|
# target:判别器对真实的数据的判别结果(64*1) |
|
def bce_loss(output, target, weight=None, size_average=True): |
|
# jt.maximum(x,y):返回x和y的元素最大值 |
|
# 公式:损失值 = -权重*[ 理想结果*log(判别结果) + (1-理想结果)*log(1-判别结果) ] |
|
loss = - ( |
|
target * jt.log(jt.maximum(output, 1e-20)) |
|
+ |
|
(1 - target) * jt.log(jt.maximum(1 - output, 1e-20)) |
|
) |
|
if weight is not None: |
|
loss *= weight |
|
if size_average: |
|
return loss.mean()# 求均值 |
|
else: |
|
return loss.sum()# 求和 |
|
''' |
|
|
|
# Binary cross-entropy between the discriminator's output and 0/1 targets
# (see the quoted bce_loss source above for the exact formula).
adversarial_loss = nn.BCELoss()

generator = Generator()
discriminator = Discriminator()

# FIX: the pipeline used to be bound to the name `transform`, shadowing the
# `jittor.transform` module imported at the top of the file. Renamed so the
# module stays accessible.
transform_pipeline = transform.Compose([
    transform.Resize(size=opt.img_size),
    transform.Gray(),
    # Map pixels from [0, 1] to [-1, 1] to match the generator's Tanh range.
    transform.ImageNormalize(mean=[0.5], std=[0.5]),
])
dataloader = MNIST(train=True, transform=transform_pipeline).set_attrs(batch_size=opt.batch_size, shuffle=True)

# Separate Adam optimizers so each step updates only one network's parameters.
optimizer_G = jt.optim.Adam(generator.parameters(), lr=opt.lr, betas=(opt.b1, opt.b2))
optimizer_D = jt.optim.Adam(discriminator.parameters(), lr=opt.lr, betas=(opt.b1, opt.b2))

# Benchmark controls for the timing branch of the training loop:
# warmup_times == -1 disables timing and enables normal logging/sampling.
warmup_times = -1
run_times = 3000
total_time = 0.
cnt = 0
|
|
|
|
|
|
|
|
|
|
|
# Main adversarial training loop: on each batch, update the generator first,
# then the discriminator.
for epoch in range(opt.n_epochs):
    for (i, (real_imgs, _)) in enumerate(dataloader):

        '''
        Per-batch target tensors for the BCE loss, shape (batch_size, 1):
        `valid` is all ones (label "real"), `fake` all zeros (label "generated").
        stop_grad() keeps gradients from flowing into the label tensors.
        '''
        valid = jt.ones([real_imgs.shape[0], 1]).stop_grad()
        fake = jt.zeros([real_imgs.shape[0], 1]).stop_grad()

        '''
        Sample the latent input z from a standard normal distribution via
        numpy.random.normal(loc=0.0, scale=1.0, size=None):
        loc is the mean (0 centers the distribution on the Y axis),
        scale is the standard deviation (larger -> flatter curve).
        Shape: (batch_size, latent_dim) == (64, 100) with the defaults.
        '''
        z = jt.array(np.random.normal(0, 1, (real_imgs.shape[0], opt.latent_dim)).astype(np.float32))

        '''
        gen_imgs shape: (batch_size, channels, img_size, img_size)
        == (64, 1, 28, 28) with the defaults.
        '''
        gen_imgs = generator(z)

        '''
        Generator update: score the generated images with the discriminator
        against the "real" targets (valid).
        BCE: loss(x, y) = -w * [y*log(x) + (1-y)*log(1-x)], so the generator
        is pushed to make discriminator(gen_imgs) approach 1
        (ideal case: loss(1, 1) = 0).
        '''
        g_loss = adversarial_loss(discriminator(gen_imgs), valid)

        # Jittor's optimizer.step(loss) differentiates the loss w.r.t. this
        # optimizer's own parameters only, so only the generator is updated.
        optimizer_G.step(g_loss)

        '''
        Discriminator update, part 1: real training images (real_imgs) scored
        against the "real" targets. Ideally discriminator(real_imgs) = 1 and
        loss(1, 1) = 0.
        '''
        real_loss = adversarial_loss(discriminator(real_imgs), valid)

        '''
        Discriminator update, part 2: the generated images scored against the
        "fake" targets. Ideally discriminator(gen_imgs) = 0 and loss(0, 0) = 0.
        (optimizer_D.step only touches discriminator parameters, so gradients
        flowing through gen_imgs do not update the generator here.)
        '''
        fake_loss = adversarial_loss(discriminator(gen_imgs), fake)

        # Average of the real and fake terms, as in the standard GAN objective.
        d_loss = ((real_loss + fake_loss) / 2)

        optimizer_D.step(d_loss)

        if warmup_times==-1:
            '''
            Normal training mode (timing disabled): log both losses and
            periodically dump a 5x5 grid of generated samples.
            numpy() materializes the Jittor Var as a numpy array.
            '''
            print(('[Epoch %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f]' % (epoch, opt.n_epochs, i, len(dataloader), d_loss.numpy()[0], g_loss.numpy()[0])))

            # Global batch counter across epochs, used for the sample interval.
            batches_done = ((epoch * len(dataloader)) + i)

            if ((batches_done % opt.sample_interval) == 0):

                # Save the first 25 generated images as one 5x5 grid.
                save_image(gen_imgs.data[:25], ('images/GAN_images/%d.png' % batches_done), nrow=5)
        else:
            # Benchmark mode: time `run_times` iterations after `warmup_times`
            # warm-up iterations. sync_all() flushes Jittor's lazy execution so
            # the wall-clock measurement covers the actual computation.
            jt.sync_all()
            cnt += 1
            print(cnt)
            if cnt == warmup_times:
                # Warm-up finished: block until all queued ops complete, then
                # start the clock.
                jt.sync_all(True)
                sta = time.time()
            if cnt > warmup_times + run_times:
                jt.sync_all(True)
                total_time = time.time() - sta
                print(f"run {run_times} iters cost {total_time} seconds, and avg {total_time / run_times} one iter.")
                exit(0)

    # Checkpoint both networks every 10 epochs (overwrites the previous save).
    if (epoch+1) % 10 == 0:
        generator.save("saved_models/generator_last.pkl")
        discriminator.save("saved_models/discriminator_last.pkl")
|
|