try:
    import detectron2
except ImportError:
    import os
    os.system('pip install git+https://github.com/facebookresearch/detectron2.git')

import torch
from detectron2.utils.logger import setup_logger
setup_logger()
from detectron2.config import get_cfg
import detectron2.data.transforms as T
from detectron2.checkpoint import DetectionCheckpointer
from detectron2.modeling import build_model
from detectron2.data.detection_utils import read_image
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog

import numpy as np
import pandas as pd
import cv2
import os
import time
import pickle
import gradio as gr
import tqdm
import matplotlib.pyplot as plt
import io
from PIL import Image

torch.manual_seed(0)
np.random.seed(0)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

from models.regnet import build_regnet_fpn_backbone
import models.metadata as metadata
from utils_clustering import *
from base_cam import EigenCAM
from pytorch_grad_cam.utils.model_targets import FasterRCNNBoxScoreTarget

fullName2ab_dict = {'PASCAL-VOC': "voc", 'BDD100K': "bdd", 'KITTI': "kitti", 'Speed signs': "speed", 'NuScenes': "nu"}
ab2FullName_dict = {'voc': "PASCAL-VOC", 'bdd': "BDD100K", 'kitti': "KITTI", 'speed': "Speed signs", 'nu': "NuScenes"}
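
# The monitors unpickled below come from `utils_clustering` (wildcard import
# above); their implementation is not shown in this file. From the call sites,
# each monitor exposes `make_verdicts(feats)`, which maps an (N, D) array of
# box features to N booleans (True = accept as in-distribution). The class
# below is a minimal illustrative sketch of that assumed interface -- its name
# and internals are hypothetical, not the actual `utils_clustering` code.
class _ExampleBoxMonitor:
    def __init__(self, boxes):
        # One (lo, hi) pair of D-dimensional corners per feature cluster
        # (hypothetical box-abstraction representation).
        self.boxes = boxes

    def make_verdicts(self, feats):
        # Accept a feature vector iff it falls inside some cluster box.
        return np.array([
            any((lo <= f).all() and (f <= hi).all() for lo, hi in self.boxes)
            for f in feats
        ])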

class Detectron2Monitor():
    def __init__(self, id, backbone, confidence_threshold=0.05):
        self.id, self.label_list = self._get_label_list(id)
        self.backbone = backbone
        self.confidence_threshold = confidence_threshold
        self.cfg, self.device, self.model = self._get_model()
        self.label_dict = {i: label for i, label in enumerate(self.label_list)}
        self.eval_list = (
            ["ID-voc-OOD-coco", "OOD-open", "voc-val"]
            if self.id == "voc"
            else ["ID-bdd-OOD-coco", "OOD-open", "voc-ood", f"{self.id}-val"]
        )
        MetadataCatalog.get("custom_dataset").set(thing_classes=self.label_list)

    def _get_label_list(self, id):
        id = fullName2ab_dict[id]
        if id == 'voc':
            label_list = metadata.VOC_THING_CLASSES
        elif id == 'bdd':
            label_list = metadata.BDD_THING_CLASSES
        elif id == 'kitti':
            label_list = metadata.KITTI_THING_CLASSES
        elif id == 'speed' or id == 'prescan':
            label_list = metadata.SPEED_THING_CLASSES
        else:
            label_list = metadata.NU_THING_CLASSES
        return id, label_list

    def _get_model(self):
        cfg = get_cfg()
        cfg.merge_from_file(f"models/configs/vanilla_{self.backbone}.yaml")
        cfg.MODEL.WEIGHTS = f"models/weights/model_final_{self.backbone}_{self.id}.pth"
        cfg.MODEL.DEVICE = 'cpu'
        cfg.MODEL.ROI_HEADS.NUM_CLASSES = len(self.label_list)
        cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = self.confidence_threshold
        model = build_model(cfg)
        model.eval()
        checkpointer = DetectionCheckpointer(model)
        checkpointer.load(cfg.MODEL.WEIGHTS)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = model.to(device)
        return cfg, device, model

    def _inference(self, model, inputs):
        with torch.no_grad():
            images = model.preprocess_image(inputs)
            features = model.backbone(images.tensor)
            proposals, _ = model.proposal_generator(images, features, None)  # RPN
            features_ = [features[f] for f in model.roi_heads.box_in_features]
            box_features = model.roi_heads.box_pooler(features_, [x.proposal_boxes for x in proposals])
            box_features = model.roi_heads.box_head(box_features)  # features of all 1k candidates
            predictions = model.roi_heads.box_predictor(box_features)
            pred_instances, pred_inds = model.roi_heads.box_predictor.inference(predictions, proposals)
            pred_instances = model.roi_heads.forward_with_given_boxes(features, pred_instances)  # output boxes, masks, scores, etc.
            pred_instances = model._postprocess(pred_instances, inputs, images.image_sizes)  # scale boxes to original size
            # features of the proposed boxes
            feats = box_features[pred_inds].cpu().numpy()
        return pred_instances, feats

    def _load_monitors(self, clustering_algo, nb_clusters, eps=5, min_samples=10):
        if clustering_algo == "dbscan":
            with open(f"monitors/{self.id}/{self.backbone}/{clustering_algo}/eps{eps}_min_samples{min_samples}.pkl", 'rb') as f:
                monitors_dict = pickle.load(f)
        else:
            with open(f"monitors/{self.id}/{self.backbone}/{clustering_algo}/{nb_clusters}.pkl", 'rb') as f:
                monitors_dict = pickle.load(f)
        return monitors_dict

    def _evaluate(self, clustering_algo, nb_clusters, eps, min_samples):
        dataset_name = f"{self.id}-val"
        with open(f'val_feats/{self.id}/{self.backbone}/{dataset_name}_feats_tp_dict.pickle', 'rb') as f:
            feats_tp_dict = pickle.load(f)
        with open(f'val_feats/{self.id}/{self.backbone}/{dataset_name}_feats_fp_dict.pickle', 'rb') as f:
            feats_fp_dict = pickle.load(f)
        monitors_dict = self._load_monitors(clustering_algo, nb_clusters, eps, min_samples)
        # make verdicts on ID data
        data_tp = []
        data_fp = []
        accept_sum = {"tp": 0, "fp": 0}
        reject_sum = {"tp": 0, "fp": 0}
        for label in tqdm.tqdm(self.label_list, desc="Evaluation on ID data"):
            if label in monitors_dict:
                verdict = monitors_dict[label].make_verdicts(feats_tp_dict[label])
                data_tp.append([label, len(verdict), np.sum(verdict)/len(verdict)])
                accept_sum["tp"] += np.sum(verdict)
                reject_sum["tp"] += len(verdict) - np.sum(verdict)
                verdict = monitors_dict[label].make_verdicts(feats_fp_dict[label])
                data_fp.append([label, len(verdict), (len(verdict)-np.sum(verdict))/len(verdict)])
                accept_sum["fp"] += np.sum(verdict)
                reject_sum["fp"] += len(verdict) - np.sum(verdict)
        TPR = round((accept_sum['tp'] / (reject_sum['tp'] + accept_sum['tp'])*100), 2)
        FPR = round((accept_sum['fp'] / (reject_sum['fp'] + accept_sum['fp'])*100), 2)
        id_name = ab2FullName_dict[self.id]
        df_id = pd.DataFrame([[id_name, f"{TPR}%", f"{FPR}%"]], columns=["Dataset", "TPR", "FPR"])
        data_ood = []
        # evaluate on the OOD datasets only; iterate over a filtered copy so
        # self.eval_list stays intact and repeated calls do not fail
        eval_list = [d for d in self.eval_list if d != dataset_name]
        for dataset_name in tqdm.tqdm(eval_list, desc="Evaluation on OOD data"):
            accept_sum = {"tp": 0, "fp": 0}
            reject_sum = {"tp": 0, "fp": 0}
            with open(f'val_feats/{self.id}/{self.backbone}/{dataset_name}_feats_fp_dict.pickle', 'rb') as f:
                feats_fp_dict = pickle.load(f)
            for label in self.label_list:
                if label in monitors_dict:
                    verdict = monitors_dict[label].make_verdicts(feats_fp_dict[label])
                    accept_sum["fp"] += np.sum(verdict)
                    reject_sum["fp"] += len(verdict) - np.sum(verdict)
            FPR = round((accept_sum['fp'] / (reject_sum['fp'] + accept_sum['fp'])*100), 2)
            data_ood.append([dataset_name, f"{FPR}%"])
        # prepare dataframes
        df_ood = pd.DataFrame(data_ood, columns=["Dataset", "FPR"])
        df_ood["Dataset"] = ["COCO", "Open Images"] if self.id == "voc" else ["COCO", "Open Images", "VOC-OOD"]
        return df_id, df_ood

    def _postprocess_cam(self, raw_cam, img_width, img_height):
        cam_orig = np.sum(raw_cam, axis=0)  # [H, W]
        cam_orig = np.maximum(cam_orig, 0)  # ReLU
        cam_orig -= np.min(cam_orig)
        cam_orig /= np.max(cam_orig)
        cam = cv2.resize(cam_orig, (img_width, img_height))
        return cam

    def _fasterrcnn_reshape_transform(self, x):
        # Upsample every FPN level to the spatial size of p6 and concatenate
        # along the channel axis so EigenCAM sees a single activation map.
        target_size = x['p6'].size()[-2:]
        activations = []
        for key, value in x.items():
            activations.append(torch.nn.functional.interpolate(torch.abs(value), target_size, mode='bilinear'))
        activations = torch.cat(activations, dim=1)
        return activations

    def _get_input_dict(self, original_image):
        height, width = original_image.shape[:2]
        transform_gen = T.ResizeShortestEdge(
            [self.cfg.INPUT.MIN_SIZE_TEST, self.cfg.INPUT.MIN_SIZE_TEST], self.cfg.INPUT.MAX_SIZE_TEST
        )
        image = transform_gen.get_transform(original_image).apply_image(original_image)
        image = torch.as_tensor(image.astype("float32").transpose(2, 0, 1))
        inputs = {"image": image, "height": height, "width": width}
        return inputs
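
    # `get_output` ties the pieces together for the demo: run the detector,
    # query the per-class monitors on the pooled box features, draw the
    # rejected boxes as "OOD", and overlay an EigenCAM heatmap on the
    # detection image.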
    def get_output(self, monitors_dict, img):
        image = read_image(img, format="BGR")
        input_image_dict = [self._get_input_dict(image)]
        pred_instances, feats = self._inference(self.model, input_image_dict)
        detections = pred_instances[0]["instances"].to("cpu")
        cls_idxs = detections.pred_classes.detach().numpy()
        # get labels from class indices
        labels = [self.label_dict[i] for i in cls_idxs]
        # count the occurrences of each label into a dictionary
        labels_count_dict = dict((i, labels.count(i)) for i in labels)
        v = Visualizer(image[..., ::-1], MetadataCatalog.get("custom_dataset"), scale=1)
        v = v.draw_instance_predictions(detections)
        img_detection = v.get_image()
        df = pd.DataFrame(list(labels_count_dict.items()), columns=['Object', 'Count'])
        verdicts = []
        for label, feat in zip(labels, feats):
            verdict = monitors_dict[label].make_verdicts(feat[np.newaxis, :])[0]
            verdicts.append(verdict)
        detections_ood = detections[[i for i, x in enumerate(verdicts) if not x]]
        # assign a fixed class index to the rejected detections for visualization
        detections_ood.pred_classes = torch.tensor([5]*len(detections_ood.pred_classes))
        labels_ood = [label for label, verdict in zip(labels, verdicts) if not verdict]
        verdicts_ood = ["Rejected"]*len(labels_ood)
        df_verdict = pd.DataFrame(list(zip(labels_ood, verdicts_ood)), columns=['Object', 'Verdict'])
        v = Visualizer(image[..., ::-1], MetadataCatalog.get("custom_dataset"), scale=1)
        for box in detections_ood.pred_boxes.to('cpu'):
            v.draw_box(box)
            v.draw_text("OOD", tuple(box[:2].numpy()))
        v = v.get_output()
        img_ood = v.get_image()
        pred_bboxes = detections.pred_boxes.tensor.numpy().astype(np.int32)
        target_layers = [self.model.backbone]
        targets = [FasterRCNNBoxScoreTarget(labels=labels, bounding_boxes=pred_bboxes)]
        cam = EigenCAM(self.model, target_layers, use_cuda=False, reshape_transform=self._fasterrcnn_reshape_transform)
        grayscale_cam = cam(input_image_dict, targets)
        cam = self._postprocess_cam(grayscale_cam, input_image_dict[0]["width"], input_image_dict[0]["height"])
        plt.rcParams["figure.figsize"] = (30, 10)
        plt.imshow(img_detection[..., ::-1], interpolation='none')
        plt.imshow(cam, cmap='jet', alpha=0.5)
        plt.axis("off")
        img_buff = io.BytesIO()
        plt.savefig(img_buff, format='png', bbox_inches='tight', pad_inches=0)
        plt.close()  # release the figure so repeated calls do not leak memory
        img_cam = Image.open(img_buff)
        image_dict = {}
        image_dict["image"] = image
        image_dict["cam"] = img_cam
        image_dict["detection"] = img_detection
        image_dict["verdict"] = img_ood
        return image_dict, df, df_verdict
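
# A minimal usage sketch, not part of the app itself: the backbone name,
# clustering algorithm, cluster count, and image path below are placeholder
# values, and the corresponding config, weight, and monitor pickle files must
# exist at the paths assembled in the class above.
if __name__ == "__main__":
    monitor = Detectron2Monitor(id="PASCAL-VOC", backbone="regnet")
    monitors_dict = monitor._load_monitors(clustering_algo="kmeans", nb_clusters=10)
    image_dict, df_count, df_verdict = monitor.get_output(monitors_dict, "sample.jpg")
    print(df_count)
    print(df_verdict)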