# Page residue from the hosting site ("Spaces: Sleeping"); not part of the program.
from typing import Dict, Tuple

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from IPython.display import HTML
from matplotlib.animation import FuncAnimation, PillowWriter
from torch.utils.data import DataLoader
from torchvision import models, transforms
from torchvision.utils import save_image, make_grid
from tqdm import tqdm

# Kept last so its star-imported names take precedence exactly as in the original order.
from diffusion_utilities import *
#openai.api_key = os.getenv('OPENAI_API_KEY') | |
class ContextUnet(nn.Module):
    """U-Net that maps an image, a timestep and an optional context vector to an
    output with the same number of channels as the input.

    In this file the network is used as the noise predictor during DDIM sampling.
    The building blocks ``ResidualConvBlock``, ``UnetDown``, ``UnetUp`` and
    ``EmbedFC`` are not defined here (presumably they come from
    ``diffusion_utilities`` -- confirm).
    """

    def __init__(self, in_channels, n_feat=256, n_cfeat=10, height=28):
        """Build the layers.

        Args:
            in_channels: number of channels of the input (and output) image.
            n_feat: number of intermediate feature maps.
            n_cfeat: size of the context feature vector.
            height: image height; assumed equal to the width and divisible by 4
                (e.g. 28, 24, 20, 16).
        """
        super().__init__()

        self.in_channels = in_channels
        self.n_feat = n_feat
        self.n_cfeat = n_cfeat
        self.h = height

        # Initial convolution: in_channels -> n_feat.
        self.init_conv = ResidualConvBlock(in_channels, n_feat, is_res=True)

        # Down-sampling path with two levels: n_feat -> n_feat -> 2 * n_feat.
        self.down1 = UnetDown(n_feat, n_feat)
        self.down2 = UnetDown(n_feat, 2 * n_feat)

        # Collapse the deepest feature map to a vector-like tensor.
        # NOTE(review): the pool window is fixed at 4 (the upstream original used 7);
        # it equals height // 4 only when height == 16 -- confirm for other sizes.
        self.to_vec = nn.Sequential(nn.AvgPool2d((4)), nn.GELU())

        # Embeddings of the timestep (1 input feature) and of the context
        # (n_cfeat input features), one pair per up-sampling level.
        self.timeembed1 = EmbedFC(1, 2 * n_feat)
        self.timeembed2 = EmbedFC(1, 1 * n_feat)
        self.contextembed1 = EmbedFC(n_cfeat, 2 * n_feat)
        self.contextembed2 = EmbedFC(n_cfeat, 1 * n_feat)

        # Up-sampling path with three levels. up0 uses kernel == stride == height // 4.
        self.up0 = nn.Sequential(
            nn.ConvTranspose2d(2 * n_feat, 2 * n_feat, self.h // 4, self.h // 4),
            nn.GroupNorm(8, 2 * n_feat),
            nn.ReLU(),
        )
        self.up1 = UnetUp(4 * n_feat, n_feat)
        self.up2 = UnetUp(2 * n_feat, n_feat)

        # Final convolutions: 2 * n_feat (up3 concatenated with the init_conv
        # output) -> n_feat -> in_channels.
        self.out = nn.Sequential(
            nn.Conv2d(2 * n_feat, n_feat, 3, 1, 1),  # kernel 3, stride 1, padding 1
            nn.GroupNorm(8, n_feat),
            nn.ReLU(),
            nn.Conv2d(n_feat, self.in_channels, 3, 1, 1),
        )

    def forward(self, x, t, c=None):
        """Run the network.

        Args:
            x: input image batch with ``in_channels`` channels
                (presumably shaped (batch, in_channels, h, w) -- confirm).
            t: timestep value(s) fed to the ``EmbedFC(1, ...)`` layers. The
                sampler in this file passes a normalised timestep of shape
                (1, 1, 1, 1).
            c: optional context with ``n_cfeat`` features per sample. When
                ``None`` an all-zero (batch, n_cfeat) tensor is used instead.

        Returns:
            The output of the final convolution stack (``in_channels`` channels).
        """
        # Initial convolution, then the two down-sampling levels.
        x = self.init_conv(x)
        down1 = self.down1(x)
        down2 = self.down2(down1)

        # Pool the deepest feature map and apply the activation.
        hiddenvec = self.to_vec(down2)

        # No context supplied: fall back to zeros matching x's dtype and device.
        if c is None:
            c = torch.zeros(x.shape[0], self.n_cfeat).to(x)

        # Embed context and timestep, reshaped to broadcast over spatial dims.
        cemb1 = self.contextembed1(c).view(-1, self.n_feat * 2, 1, 1)
        temb1 = self.timeembed1(t).view(-1, self.n_feat * 2, 1, 1)
        cemb2 = self.contextembed2(c).view(-1, self.n_feat, 1, 1)
        temb2 = self.timeembed2(t).view(-1, self.n_feat, 1, 1)

        # Up-sampling: the context embedding scales the features, the time
        # embedding is added, and the matching down-path tensor is passed along.
        up1 = self.up0(hiddenvec)
        up2 = self.up1(cemb1 * up1 + temb1, down2)
        up3 = self.up2(cemb2 * up2 + temb2, down1)

        # Concatenate with the init_conv output along channels and project back.
        out = self.out(torch.cat((up3, x), 1))
        return out
# ---------------------------------------------------------------------------
# Hyperparameters
# ---------------------------------------------------------------------------

# Diffusion hyperparameters: number of steps and the endpoints of the beta schedule.
timesteps = 500
beta1 = 1e-4
beta2 = 0.02

# Network hyperparameters.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
n_feat = 64  # hidden feature dimension
n_cfeat = 5  # context vector is of size 5
height = 16  # 16x16 image
save_dir = './weights/'

# Training hyperparameters.
batch_size = 100
n_epoch = 32
lrate = 1e-3

# ---------------------------------------------------------------------------
# DDPM noise schedule (timesteps + 1 entries, indexed 0..timesteps)
# ---------------------------------------------------------------------------
# b_t: betas interpolated linearly from beta1 to beta2.
b_t = (beta2 - beta1) * torch.linspace(0, 1, timesteps + 1, device=device) + beta1
# a_t: per-step alpha = 1 - beta.
a_t = 1 - b_t
# ab_t: cumulative product of a_t, accumulated in log space.
ab_t = torch.cumsum(a_t.log(), dim=0).exp()
# Index 0 is overridden with 1.
ab_t[0] = 1
# Construct the model with the network hyperparameters above and move it to the
# selected device.
nn_model = ContextUnet(
    in_channels=3,
    n_feat=n_feat,
    n_cfeat=n_cfeat,
    height=height,
).to(device)
# DDIM sampling step: removes the predicted noise and moves to an earlier timestep.
def denoise_ddim(x, t, t_prev, pred_noise):
    """Take one DDIM step from timestep ``t`` to timestep ``t_prev``.

    Args:
        x: current noisy sample(s).
        t: index of the current timestep into the module-level ``ab_t`` schedule.
        t_prev: index of the target (earlier) timestep into ``ab_t``.
        pred_noise: noise predicted by the model for ``x`` at ``t``.

    Returns:
        The sample(s) at ``t_prev``: the scaled clean-image estimate plus the
        predicted noise re-scaled for ``t_prev``.
    """
    ab = ab_t[t]
    ab_prev = ab_t[t_prev]

    # Clean-image estimate (x - sqrt(1 - ab) * eps) / sqrt(ab), already scaled
    # by sqrt(ab_prev) for the target timestep.
    scaled_x0 = ab_prev.sqrt() / ab.sqrt() * (x - (1 - ab).sqrt() * pred_noise)
    # Component pointing back towards x_t at the target noise level.
    dir_xt = (1 - ab_prev).sqrt() * pred_noise
    return scaled_x0 + dir_xt
# Load the trained weights onto the selected device and switch to eval mode.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a
# trusted source (consider weights_only=True if the installed torch supports it).
nn_model.load_state_dict(torch.load(f"{save_dir}/model_31.pth", map_location=device))
nn_model.eval()
print("Loaded in Model without context")
# Sample quickly using DDIM.
@torch.no_grad()  # inference only: avoid building an autograd graph across all steps
def sample_ddim(n_sample, n=20):
    """Generate ``n_sample`` images with DDIM using roughly ``n`` denoising steps.

    Args:
        n_sample: number of images to generate.
        n: requested number of sampling steps; the stride through the schedule
            is ``timesteps // n`` (at least 1).

    Returns:
        A tuple ``(samples, intermediate)`` where ``samples`` is the final
        tensor of shape (n_sample, 3, height, height) on ``device`` and
        ``intermediate`` is a NumPy array stacking the samples after every step.

    Raises:
        ValueError: if ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError(f"n must be at least 1, got {n}")

    # x_T ~ N(0, 1), sample initial noise.
    samples = torch.randn(n_sample, 3, height, height).to(device)

    # Snapshots of the samples after each step, for plotting.
    intermediate = []

    # Never let the stride drop to 0 (n > timesteps), which range() rejects.
    step_size = max(timesteps // n, 1)
    for i in range(timesteps, 0, -step_size):
        print(f'sampling timestep {i:3d}', end='\r')

        # Normalised timestep, reshaped so it broadcasts against the image batch.
        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)

        eps = nn_model(samples, t)  # predict noise e_(x_t, t)

        # Clamp at 0: when n does not divide timesteps the last stride would
        # otherwise yield a negative index that wraps around the schedule.
        t_prev = max(i - step_size, 0)
        samples = denoise_ddim(samples, i, t_prev, eps)
        intermediate.append(samples.detach().cpu().numpy())

    intermediate = np.stack(intermediate)
    return samples, intermediate
def greet(input):
    """Gradio callback: sample 32 images and return them as one image grid.

    Args:
        input: text from the UI text box; it is not used by the sampler.

    Returns:
        A uint8 NumPy array of shape (H, W, 3) holding the 32 generated images
        tiled 8 per row, suitable for an image output component.
    """
    samples, _ = sample_ddim(32, n=25)

    # An image output needs a single H x W x C picture, not a (32, 3, 16, 16)
    # batch: tile the batch into one grid and rescale its values into [0, 1].
    grid = make_grid(samples.detach().cpu(), nrow=8, normalize=True)

    # Channels-first float tensor -> channels-last uint8 array.
    return (grid.permute(1, 2, 0).numpy() * 255).round().astype(np.uint8)
# Build the Gradio UI: a single text box whose value is passed to greet, and an
# image output showing what greet returns. Launching starts the web app.
iface = gr.Interface(
    fn=greet,
    inputs=[gr.Textbox(label="Co-Retailing Business")],
    outputs="image",
)
iface.launch()