# CHM-Corr Classifier
import random
from itertools import product

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm

from common.evaluation import Evaluator
from model import chmnet
from model.base.geometry import Geometry
from Utils import (
    CosineCustomDataset,
    PairedLayer4Extractor,
    compute_spatial_similarity,
    generate_mask,
    normalize_array,
    get_transforms,
    arg_topK,
)

# Set the random seed for reproducibility
random.seed(42)


# Helper: move a tensor to CPU and convert it to a NumPy array
def to_np(x):
    return x.data.to("cpu").numpy()


# CHMNet config
chm_args = {
    "alpha": [0.05, 0.1],
    "img_size": 240,
    "ktype": "psi",
    "load": "pas_psi.pt",
}
class CHMGridTransfer:
    def __init__(
        self,
        query_image,
        support_set,
        support_set_labels,
        train_folder,  # currently unused
        top_N,
        top_K,
        binarization_threshold,
        chm_source_transform,
        chm_target_transform,
        cosine_source_transform,
        cosine_target_transform,
        batch_size=64,
    ):
        self.N = top_N
        self.K = top_K
        self.BS = batch_size

        self.chm_source_transform = chm_source_transform
        self.chm_target_transform = chm_target_transform
        self.cosine_source_transform = cosine_source_transform
        self.cosine_target_transform = cosine_target_transform

        self.source_embeddings = None
        self.target_embeddings = None
        self.correspondence_map = None
        self.similarity_maps = None
        self.reverse_similarity_maps = None
        self.transferred_points = None

        self.binarization_threshold = binarization_threshold  # UPDATE THIS

        self.q = query_image
        self.support_set = support_set
        self.labels_ss = support_set_labels

    def build(self):
        # Correspondence maps via CHMNet
        test_ds = CosineCustomDataset(
            query_image=self.q,
            supporting_set=self.support_set,
            source_transform=self.chm_source_transform,
            target_transform=self.chm_target_transform,
        )
        test_dl = DataLoader(test_ds, batch_size=self.BS, shuffle=False)
        self.find_correspondences(test_dl)

        # Layer-4 embeddings for the cosine similarity maps
        test_ds = CosineCustomDataset(
            query_image=self.q,
            supporting_set=self.support_set,
            source_transform=self.cosine_source_transform,
            target_transform=self.cosine_target_transform,
        )
        test_dl = DataLoader(test_ds, batch_size=self.BS, shuffle=False)
        self.compute_embeddings(test_dl)
        self.compute_similarity_map()

    def find_correspondences(self, test_dl):
        model = chmnet.CHMNet(chm_args["ktype"])
        model.load_state_dict(
            torch.load(chm_args["load"], map_location=torch.device("cpu"))
        )
        Evaluator.initialize(chm_args["alpha"])
        Geometry.initialize(img_size=chm_args["img_size"])

        grid_results = []
        transferred_points = []

        # Fixed, hard-coded 7x7 source grid over the 240x240 image (17px margin)
        fixed_src_grid_points = list(
            product(
                np.linspace(1 + 17, 240 - 17 - 1, 7),
                np.linspace(1 + 17, 240 - 17 - 1, 7),
            )
        )
        fixed_src_grid_points = np.asarray(fixed_src_grid_points, dtype=np.float64).T

        with torch.no_grad():
            model.eval()
            for idx, batch in enumerate(tqdm(test_dl)):
                keypoints = (
                    torch.tensor(fixed_src_grid_points)
                    .unsqueeze(0)
                    .repeat(batch["src_img"].shape[0], 1, 1)
                )
                # 49 keypoints (7x7 grid) per image in the batch
                n_pts = torch.tensor(
                    np.asarray(batch["src_img"].shape[0] * [49]), dtype=torch.long
                )
                corr_matrix = model(batch["src_img"], batch["trg_img"])
                prd_kps = Geometry.transfer_kps(
                    corr_matrix, keypoints, n_pts, normalized=False
                )
                transferred_points.append(prd_kps.cpu().numpy())
                for tgt_points in prd_kps:
                    tgt_grid = []
                    # Map normalized coordinates in [-1, 1] to 7x7 grid cells
                    for x, y in zip(tgt_points[0], tgt_points[1]):
                        tgt_grid.append(
                            [int(((x + 1) / 2.0) * 7), int(((y + 1) / 2.0) * 7)]
                        )
                    grid_results.append(tgt_grid)

        self.correspondence_map = grid_results
        self.transferred_points = np.vstack(transferred_points)

    def compute_embeddings(self, test_dl):
        paired_extractor = PairedLayer4Extractor()

        source_embeddings = []
        target_embeddings = []

        with torch.no_grad():
            for idx, batch in enumerate(test_dl):
                s_e, t_e = paired_extractor((batch["src_img"], batch["trg_img"]))
                source_embeddings.append(s_e)
                target_embeddings.append(t_e)

        # Embeddings
        self.source_embeddings = torch.cat(source_embeddings, dim=0)
        self.target_embeddings = torch.cat(target_embeddings, dim=0)

    def compute_similarity_map(self):
        cos_sim = nn.CosineSimilarity(dim=0, eps=1e-6)

        similarity_maps = []
        rsimilarity_maps = []
        grid = [[i, j] for i in range(7) for j in range(7)]

        # Compute for all image pairs
        for i in range(len(self.correspondence_map)):
            cosine_map = np.zeros((7, 7))
            reverse_cosine_map = np.zeros((7, 7))

            # Cosine similarity between cells matched by the CHM correspondence map
            for S, T in zip(grid, self.correspondence_map[i]):
                v1 = self.source_embeddings[i][:, S[0], S[1]]
                v2 = self.target_embeddings[i][:, T[0], T[1]]
                covalue = cos_sim(v1, v2)
                cosine_map[S[0], S[1]] = covalue
                reverse_cosine_map[T[0], T[1]] = covalue

            similarity_maps.append(cosine_map)
            rsimilarity_maps.append(reverse_cosine_map)

        self.similarity_maps = similarity_maps
        self.reverse_similarity_maps = rsimilarity_maps

    def compute_score_using_cc(self):
        # CC maps
        SIMS_source, SIMS_target = [], []
        for i in range(len(self.source_embeddings)):
            simA, simB = compute_spatial_similarity(
                to_np(self.source_embeddings[i]), to_np(self.target_embeddings[i])
            )
            SIMS_source.append(simA)
            SIMS_target.append(simB)
        SIMS_source = np.stack(SIMS_source, axis=0)
        # SIMS_target = np.stack(SIMS_target, axis=0)

        top_cos_values = []
        for i in range(len(self.similarity_maps)):
            cosine_value = np.multiply(
                self.similarity_maps[i],
                generate_mask(
                    normalize_array(SIMS_source[i]), t=self.binarization_threshold
                ),
            )
            # Average the five largest masked cosine values
            flat = cosine_value.T.reshape(-1)
            top_5_indices = np.argsort(flat)[::-1][:5]
            top_cos_values.append(np.mean([flat[x] for x in top_5_indices]))
        return top_cos_values

    def compute_score_using_custom_points(self, selected_keypoint_masks):
        top_cos_values = []
        for i in range(len(self.similarity_maps)):
            cosine_value = np.multiply(self.similarity_maps[i], selected_keypoint_masks)
            # Mean over all masked cells (sorting does not change the mean)
            top_cos_values.append(np.mean(cosine_value.T.reshape(-1)))
        return top_cos_values

    def export(self):
        storage = {
            "N": self.N,
            "K": self.K,
            "source_embeddings": self.source_embeddings,
            "target_embeddings": self.target_embeddings,
            "correspondence_map": self.correspondence_map,
            "similarity_maps": self.similarity_maps,
            "T": self.binarization_threshold,
            "query": self.q,
            "support_set": self.support_set,
            "labels_for_support_set": self.labels_ss,
            "rsimilarity_maps": self.reverse_similarity_maps,
            "transferred_points": self.transferred_points,
        }
        return ModifiableCHMResults(storage)
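
# Illustrative sketch (not part of the pipeline): how a 7x7 cosine similarity
# map is derived from a correspondence map and a pair of layer-4 embeddings.
# All shapes and values here are made up; `C` stands for the channel dimension,
# and the identity mapping stands in for a real CHM correspondence map.
def _demo_cosine_map():
    cos_sim = nn.CosineSimilarity(dim=0, eps=1e-6)
    C = 512  # hypothetical channel count
    src_emb = torch.randn(C, 7, 7)
    trg_emb = torch.randn(C, 7, 7)
    grid = [(i, j) for i in range(7) for j in range(7)]
    correspondence = grid  # identity mapping, for simplicity
    cosine_map = np.zeros((7, 7))
    for (si, sj), (ti, tj) in zip(grid, correspondence):
        # Each source cell is scored against its matched target cell
        cosine_map[si, sj] = cos_sim(src_emb[:, si, sj], trg_emb[:, ti, tj])
    return cosine_map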
class ModifiableCHMResults:
    def __init__(self, storage):
        self.N = storage["N"]
        self.K = storage["K"]
        self.source_embeddings = storage["source_embeddings"]
        self.target_embeddings = storage["target_embeddings"]
        self.correspondence_map = storage["correspondence_map"]
        self.similarity_maps = storage["similarity_maps"]
        self.T = storage["T"]
        self.q = storage["query"]
        self.support_set = storage["support_set"]
        self.labels_ss = storage["labels_for_support_set"]
        self.rsimilarity_maps = storage["rsimilarity_maps"]
        self.transferred_points = storage["transferred_points"]
        self.similarity_maps_masked = None
        self.SIMS_source = None
        self.SIMS_target = None
        self.masked_sim_values = []
        self.top_cos_values = []

    def compute_score_using_cc(self):
        # CC maps
        SIMS_source, SIMS_target = [], []
        for i in range(len(self.source_embeddings)):
            simA, simB = compute_spatial_similarity(
                to_np(self.source_embeddings[i]), to_np(self.target_embeddings[i])
            )
            SIMS_source.append(simA)
            SIMS_target.append(simB)
        self.SIMS_source = np.stack(SIMS_source, axis=0)
        self.SIMS_target = np.stack(SIMS_target, axis=0)

        top_cos_values = []
        for i in range(len(self.similarity_maps)):
            masked_sim_values = np.multiply(
                self.similarity_maps[i],
                generate_mask(normalize_array(self.SIMS_source[i]), t=self.T),
            )
            self.masked_sim_values.append(masked_sim_values)
            # Average the five largest masked cosine values
            flat = masked_sim_values.T.reshape(-1)
            top_5_indices = np.argsort(flat)[::-1][:5]
            top_cos_values.append(np.mean([flat[x] for x in top_5_indices]))
        self.top_cos_values = top_cos_values
        return top_cos_values

    def compute_score_using_custom_points(self, selected_keypoint_masks):
        top_cos_values = []
        similarity_maps_masked = []
        for i in range(len(self.similarity_maps)):
            cosine_value = np.multiply(self.similarity_maps[i], selected_keypoint_masks)
            similarity_maps_masked.append(cosine_value)
            # Mean over all masked cells (sorting does not change the mean)
            top_cos_values.append(np.mean(cosine_value.T.reshape(-1)))
        self.similarity_maps_masked = similarity_maps_masked
        return top_cos_values

    def predict_using_cc(self):
        top_cos_values = self.compute_score_using_cc()

        # Predict: majority vote over the labels of the top-K reranked neighbors
        ranked_idx = np.argsort(top_cos_values)[::-1]
        topK_votes = np.bincount([self.labels_ss[x] for x in ranked_idx[: self.K]])
        prediction = np.argmax(topK_votes)
        prediction_weight = np.max(topK_votes)

        reranked_nns_files = [self.support_set[x] for x in ranked_idx]

        topK_idx = [x for x in ranked_idx if self.labels_ss[x] == prediction]
        topK_files = [self.support_set[x] for x in topK_idx]
        topK_cmaps = [self.correspondence_map[x] for x in topK_idx]
        topK_similarity_maps = [self.similarity_maps[x] for x in topK_idx]
        topK_rsimilarity_maps = [self.rsimilarity_maps[x] for x in topK_idx]
        topK_transferred_points = [self.transferred_points[x] for x in topK_idx]

        predicted_folder_name = topK_files[0].split("/")[-2]

        return (
            topK_idx,
            prediction,
            predicted_folder_name,
            prediction_weight,
            topK_files[: self.K],
            reranked_nns_files[: self.K],
            topK_cmaps[: self.K],
            topK_similarity_maps[: self.K],
            topK_rsimilarity_maps[: self.K],
            topK_transferred_points[: self.K],
        )

    def predict_custom_pairs(self, selected_keypoint_masks):
        top_cos_values = self.compute_score_using_custom_points(selected_keypoint_masks)

        # Predict: majority vote over the labels of the top-K reranked neighbors
        ranked_idx = np.argsort(top_cos_values)[::-1]
        topK_votes = np.bincount([self.labels_ss[x] for x in ranked_idx[: self.K]])
        prediction = np.argmax(topK_votes)
        prediction_weight = np.max(topK_votes)

        reranked_nns_files = [self.support_set[x] for x in ranked_idx]

        topK_idx = [x for x in ranked_idx if self.labels_ss[x] == prediction]
        topK_files = [self.support_set[x] for x in topK_idx]
        topK_cmaps = [self.correspondence_map[x] for x in topK_idx]
        topK_similarity_maps = [self.similarity_maps[x] for x in topK_idx]
        topK_rsimilarity_maps = [self.rsimilarity_maps[x] for x in topK_idx]
        topK_transferred_points = [self.transferred_points[x] for x in topK_idx]
        # topK_scores = [top_cos_values[x] for x in topK_idx]
        topK_masked_sims = [self.similarity_maps_masked[x] for x in topK_idx]

        predicted_folder_name = topK_files[0].split("/")[-2]
        non_zero_mask = np.count_nonzero(selected_keypoint_masks)

        return (
            topK_idx,
            prediction,
            predicted_folder_name,
            prediction_weight,
            topK_files[: self.K],
            reranked_nns_files[: self.K],
            topK_cmaps[: self.K],
            topK_similarity_maps[: self.K],
            topK_rsimilarity_maps[: self.K],
            topK_transferred_points[: self.K],
            topK_masked_sims[: self.K],
            non_zero_mask,
        )
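
# Illustrative sketch (not part of the pipeline): the majority-vote step used by
# predict_using_cc / predict_custom_pairs. Scores and labels are hypothetical.
def _demo_majority_vote():
    scores = [0.21, 0.87, 0.54, 0.91, 0.33]  # hypothetical reranking scores
    labels = [2, 5, 5, 5, 2]                 # hypothetical support-set labels
    K = 3
    ranked_idx = np.argsort(scores)[::-1]    # best-scoring neighbors first
    votes = np.bincount([labels[x] for x in ranked_idx[:K]])
    # Returns (predicted label, vote count); here (5, 3)
    return np.argmax(votes), np.max(votes)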
def export_visualizations_results(
    reranker_output,
    knn_predicted_label,
    knn_confidence,
    topK_knns,
    K=20,
    N=50,
    T=0.55,
):
    """Export all details for visualization and analysis."""
    non_zero_mask = 5  # default number of connections to draw

    (
        topK_idx,
        p,
        pfn,
        pr,
        rfiles,
        reranked_nns,
        cmaps,
        sims,
        rsims,
        trns_kpts,
    ) = reranker_output.predict_using_cc()

    MASKED_COSINE_VALUES = [
        np.multiply(
            sims[X],
            generate_mask(
                normalize_array(reranker_output.SIMS_source[topK_idx[X]]), t=T
            ),
        )
        for X in range(len(sims))
    ]

    # Uniform 7x7 grid of source points (240x240 image, 17px margin)
    a = np.linspace(1 + 17, 240 - 17 - 1, 7)
    b = np.linspace(1 + 17, 240 - 17 - 1, 7)
    point_list = list(product(a, b))

    list_of_source_points = []
    list_of_target_points = []

    for CK in range(len(sims)):
        target_keypoints = []
        topk_index = arg_topK(MASKED_COSINE_VALUES[CK], topK=non_zero_mask)

        for i in range(non_zero_mask):  # number of connections
            # Psource = point_list[topk_index[i]]
            x, y = trns_kpts[CK].T[topk_index[i]]
            # Map normalized coordinates in [-1, 1] back to 240x240 pixels
            Ptarget = int(((x + 1) / 2.0) * 240), int(((y + 1) / 2.0) * 240)
            target_keypoints.append(Ptarget)

        list_of_source_points.append(np.asarray([point_list[x] for x in topk_index]))
        list_of_target_points.append(np.asarray(target_keypoints))

    # Export output
    detailed_output = {
        "q": reranker_output.q,
        "K": K,
        "N": N,
        "knn-prediction": knn_predicted_label,
        "knn-prediction-confidence": knn_confidence,
        "knn-nearest-neighbors": topK_knns,
        "chm-prediction": pfn,
        "chm-prediction-confidence": pr,
        "chm-nearest-neighbors": rfiles,
        "correspondance_map": cmaps,
        "masked_cos_values": MASKED_COSINE_VALUES,
        "src-keypoints": list_of_source_points,
        "tgt-keypoints": list_of_target_points,
        "non_zero_mask": non_zero_mask,
        "transferred_kpoints": trns_kpts,
    }
    return detailed_output


def chm_classify_and_visualize(
    query_image, kNN_results, support, TRAIN_SET, N=50, K=20, T=0.55, BS=64
):
    chm_src_t, chm_tgt_t, cos_src_t, cos_tgt_t = get_transforms("single", chm_args)
    knn_predicted_label, knn_confidence, topK_knns = kNN_results

    reranker = CHMGridTransfer(
        query_image=query_image,
        support_set=support[0],
        support_set_labels=support[1],
        train_folder=TRAIN_SET,
        top_N=N,
        top_K=K,
        binarization_threshold=T,
        chm_source_transform=chm_src_t,
        chm_target_transform=chm_tgt_t,
        cosine_source_transform=cos_src_t,
        cosine_target_transform=cos_tgt_t,
        batch_size=BS,
    )

    # Build the reranker
    reranker.build()
    # Wrap the results in a ModifiableCHMResults
    exported_reranker = reranker.export()
    # Export all details for visualization
    output = export_visualizations_results(
        exported_reranker,
        knn_predicted_label,
        knn_confidence,
        topK_knns,
        K,
        N,
        T,
    )
    return output
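
# Minimal usage sketch, assuming a kNN front-end has already produced its
# prediction and neighbors, and that `pas_psi.pt` is available. All paths,
# labels, and values below are hypothetical placeholders.
if __name__ == "__main__":
    query = "data/test/query.jpg"              # hypothetical query image path
    support_files = ["data/train/class_a/1.jpg", "data/train/class_b/2.jpg"]
    support_labels = [0, 1]                    # integer labels aligned with the files
    knn_results = (0, 0.8, support_files)      # (label, confidence, top-K neighbors)

    output = chm_classify_and_visualize(
        query_image=query,
        kNN_results=knn_results,
        support=(support_files, support_labels),
        TRAIN_SET="data/train",
        N=50,
        K=20,
        T=0.55,
        BS=64,
    )
    print(output["chm-prediction"], output["chm-prediction-confidence"])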