import math
import numbers

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.datasets as datasets
import torchvision.transforms as transforms

from backbone.model_irse import IR_50, IR_152
from backbone.model_resnet import ResNet_50, ResNet_152
from util.feature_extraction_utils import feature_extractor

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

tensor_transform = transforms.ToTensor()
pil_transform = transforms.ToPILImage()


class ImageFolderWithPaths(datasets.ImageFolder):
    """Custom dataset that also returns image file paths.

    Extends torchvision.datasets.ImageFolder so that each item is
    (image, label, path) instead of (image, label).
    """

    def __getitem__(self, index):
        # Get the (image, label) tuple from the parent class.
        original_tuple = super().__getitem__(index)
        # self.imgs is a list of (path, class_index) tuples.
        path = self.imgs[index][0]
        return original_tuple + (path,)
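

# Illustrative usage (a sketch; the dataset root and batch size below are
# assumptions for the example, not paths used by this module):
#
#   dataset = ImageFolderWithPaths("data/faces", tensor_transform)
#   loader = torch.utils.data.DataLoader(dataset, batch_size=4)
#   imgs, labels, paths = next(iter(loader))  # paths: list of file path strings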


class GaussianSmoothing(nn.Module):
    """Apply Gaussian smoothing on a 1d, 2d or 3d tensor.

    Filtering is performed separately for each channel in the input
    using a depthwise convolution.

    Arguments:
        channels (int, sequence): Number of channels of the input tensors.
            Output will have this number of channels as well.
        kernel_size (int, sequence): Size of the Gaussian kernel.
        sigma (float, sequence): Standard deviation of the Gaussian kernel.
        dim (int, optional): The number of dimensions of the data.
            Default value is 2 (spatial).
    """

    def __init__(self, channels, kernel_size, sigma, dim=2):
        super().__init__()
        if isinstance(kernel_size, numbers.Number):
            kernel_size = [kernel_size] * dim
        if isinstance(sigma, numbers.Number):
            sigma = [sigma] * dim

        # Build the kernel as the outer product of per-dimension 1d Gaussians.
        # indexing="ij" matches the pre-1.10 meshgrid default and silences the
        # deprecation warning (requires torch >= 1.10).
        kernel = 1
        meshgrids = torch.meshgrid(
            [torch.arange(size, dtype=torch.float32) for size in kernel_size],
            indexing="ij",
        )
        for size, std, mgrid in zip(kernel_size, sigma, meshgrids):
            mean = (size - 1) / 2
            kernel *= (
                1
                / (std * math.sqrt(2 * math.pi))
                * torch.exp(-(((mgrid - mean) / std) ** 2) / 2)
            )

        # Normalize so the kernel sums to one.
        kernel = kernel / torch.sum(kernel)

        # Reshape to depthwise-convolution weight layout:
        # (out_channels, in_channels / groups, *kernel_size).
        kernel = kernel.view(1, 1, *kernel.size())
        kernel = kernel.repeat(channels, *[1] * (kernel.dim() - 1))

        self.register_buffer("weight", kernel)
        self.groups = channels

        if dim == 1:
            self.conv = F.conv1d
        elif dim == 2:
            self.conv = F.conv2d
        elif dim == 3:
            self.conv = F.conv3d
        else:
            raise RuntimeError(
                "Only 1, 2 and 3 dimensions are supported. Received {}.".format(dim)
            )
        self.pad_size = kernel_size[0] // 2

    def forward(self, input):
        """Apply the Gaussian filter to input.

        Arguments:
            input (torch.Tensor): Input to apply the Gaussian filter on.
        Returns:
            filtered (torch.Tensor): Filtered output.
        """
        # Reflect padding (assumes 2d input); odd kernel sizes preserve
        # the spatial size.
        input = F.pad(
            input,
            (self.pad_size, self.pad_size, self.pad_size, self.pad_size),
            mode="reflect",
        )
        return self.conv(input, weight=self.weight, groups=self.groups)
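

# Illustrative usage (a sketch; kernel size, sigma, and the 112x112 input shape
# are assumptions for the example, not values fixed by this module):
#
#   smoother = GaussianSmoothing(channels=3, kernel_size=5, sigma=1.0)
#   imgs = torch.rand(4, 3, 112, 112)
#   smoothed = smoother(imgs)  # reflect padding keeps the shape: (4, 3, 112, 112)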


class dim_reduction(nn.Module):
    """Project feature vectors onto a lower-dimensional subspace via V."""

    def __init__(self, V):
        super().__init__()
        self.V = V

    def forward(self, input):
        # (batch, d) @ (d, k) -> (batch, k)
        return torch.matmul(input, self.V.to(input.device))
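

# Illustrative usage (a sketch; the 512 -> 128 projection is an assumed example,
# not a value fixed by this module):
#
#   V = torch.randn(512, 128)          # e.g. top-k right singular vectors
#   reducer = dim_reduction(V)
#   z = reducer(torch.randn(8, 512))   # -> torch.Size([8, 128])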


def get_ensemble(
    models,
    sigma_gf,
    kernel_size_gf,
    combination,
    V_reduction,
    warp=False,
    theta_warp=None,
):
    """Build the list of feature extractors used by the attack.

    Each backbone contributes one extractor, optionally preceded by Gaussian
    smoothing (when sigma_gf is given) and followed by a subspace projection
    (when V_reduction is given). With smoothing and combination=True, an
    unsmoothed extractor is appended as well, so the ensemble holds two
    extractors per backbone.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    feature_extractor_ensemble = []
    if sigma_gf is not None:
        gaussian_filtering = GaussianSmoothing(3, kernel_size_gf, sigma_gf)

    for i, model in enumerate(models):
        extractor = feature_extractor(model=model, warp=warp, theta_warp=theta_warp)

        # Smoothed (or plain) extractor, with optional subspace projection.
        stages = [extractor]
        if sigma_gf is not None:
            stages.insert(0, gaussian_filtering)
        if V_reduction is not None:
            stages.append(dim_reduction(V_reduction[i]))
        feature_extractor_ensemble.append(
            nn.DataParallel(nn.Sequential(*stages)).to(device)
        )

        # In combination mode, also keep the unsmoothed extractor so the
        # ensemble covers both the filtered and unfiltered views.
        if sigma_gf is not None and combination:
            stages = [extractor]
            if V_reduction is not None:
                stages.append(dim_reduction(V_reduction[i]))
            feature_extractor_ensemble.append(
                nn.DataParallel(nn.Sequential(*stages)).to(device)
            )

    return feature_extractor_ensemble
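

# Illustrative call (a sketch; the sigma and kernel size are assumptions, not
# defaults from this repo). With smoothing and combination=True the ensemble
# holds two extractors per model, smoothed and unsmoothed:
#
#   ensemble = get_ensemble(
#       models_attack, sigma_gf=3.0, kernel_size_gf=7,
#       combination=True, V_reduction=None,
#   )
#   len(ensemble)  # == 2 * len(models_attack)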


def extract_features(imgs, feature_extractor_ensemble, dim):
    """Run every extractor in the ensemble on imgs and stack the embeddings
    into a (batch, n_extractors, dim) tensor."""
    features = torch.zeros(imgs.shape[0], len(feature_extractor_ensemble), dim)
    for i, feature_extractor_model in enumerate(feature_extractor_ensemble):
        features_model = feature_extractor_model(imgs)
        # Slice assignment copies each model's embeddings into place
        # (moving them off the GPU when necessary).
        features[:, i, :] = features_model
    return features
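

# Illustrative shapes (a sketch; the 512-d embedding matches the default dim
# returned by prepare_models below):
#
#   feats = extract_features(imgs, ensemble, dim=512)
#   feats.shape  # (imgs.shape[0], len(ensemble), 512)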


def prepare_models(
    model_backbones,
    input_size,
    model_roots,
    kernel_size_attack,
    sigma_attack,
    combination,
    using_subspace,
    V_reduction_root,
):
    """Load the attack backbones from their checkpoints and, if requested,
    the subspace projection matrices used for dimensionality reduction."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Map backbone names to constructors so that only the requested
    # architectures are instantiated.
    backbone_dict = {
        "IR_50": IR_50,
        "IR_152": IR_152,
        "ResNet_50": ResNet_50,
        "ResNet_152": ResNet_152,
    }

    print("Loading Attack Backbone Checkpoint '{}'".format(model_roots))
    print("=" * 20)

    models_attack = []
    for i in range(len(model_backbones)):
        model = backbone_dict[model_backbones[i]](input_size)
        state_dict = torch.hub.load_state_dict_from_url(
            model_roots[i], map_location=device, progress=True
        )
        model.load_state_dict(state_dict)
        models_attack.append(model)

    if using_subspace:
        # One projection matrix per backbone; all project to the same
        # output dimension.
        V_reduction = [
            torch.tensor(np.load(V_reduction_root[i]))
            for i in range(len(model_backbones))
        ]
        dim = V_reduction[0].shape[1]
    else:
        V_reduction = None
        dim = 512

    return models_attack, V_reduction, dim
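

# Illustrative call (a sketch; the checkpoint URL is a hypothetical placeholder
# and 112x112 is an assumed input size, not values from this repo):
#
#   models_attack, V_reduction, dim = prepare_models(
#       ["IR_50"], [112, 112], ["https://example.com/ir50_checkpoint.pth"],
#       kernel_size_attack=7, sigma_attack=3.0,
#       combination=False, using_subspace=False, V_reduction_root=None,
#   )
#   dim  # 512 when no subspace reduction is used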


def prepare_data(
    query_data_root, target_data_root, freq, batch_size, warp=False, theta_warp=None
):
    """Split the query dataset so every freq-th image is a query and the rest
    form the gallery, then build the corresponding data loaders."""
    data = datasets.ImageFolder(query_data_root, tensor_transform)

    subset_query = list(range(0, len(data), freq))
    # Use a set for O(1) membership tests when building the gallery indices.
    query_indices = set(subset_query)
    subset_gallery = [x for x in range(len(data)) if x not in query_indices]
    query_set = torch.utils.data.Subset(data, subset_query)
    gallery_set = torch.utils.data.Subset(data, subset_gallery)

    if target_data_root is not None:
        target_data = datasets.ImageFolder(target_data_root, tensor_transform)
        target_loader = torch.utils.data.DataLoader(target_data, batch_size=batch_size)
    else:
        target_loader = None

    query_loader = torch.utils.data.DataLoader(query_set, batch_size=batch_size)
    gallery_loader = torch.utils.data.DataLoader(gallery_set, batch_size=batch_size)

    return query_loader, gallery_loader, target_loader
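

# Illustrative split (a sketch; the root path and freq value are assumptions):
# with freq=5, indices 0, 5, 10, ... become queries and the rest the gallery.
#
#   query_loader, gallery_loader, target_loader = prepare_data(
#       "data/faces", None, freq=5, batch_size=8,
#   )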


def prepare_dir_vec(dir_vec_extractor, imgs, dim, combination):
    """Extract the attack's direction vectors; in combination mode each
    vector is duplicated to match the doubled ensemble size."""
    dir_vec = extract_features(imgs, dir_vec_extractor, dim).detach().cpu()
    if combination:
        # get_ensemble appends two extractors per model in combination mode,
        # so repeat each feature along the ensemble dimension.
        dir_vec = torch.repeat_interleave(dir_vec, 2, 1)
    return dir_vec
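

# Illustrative shapes (a sketch; assumes dir_vec_extractor holds one extractor
# per model while the attack ensemble was built with combination=True):
#
#   dir_vec = prepare_dir_vec(dir_vec_extractor, imgs, dim=512, combination=True)
#   dir_vec.shape  # (batch, 2 * len(dir_vec_extractor), 512)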