Spaces:
Sleeping
Sleeping
import torch | |
from detectron2.utils.logger import setup_logger | |
setup_logger() | |
from detectron2.config import get_cfg | |
import detectron2.data.transforms as T | |
from detectron2.checkpoint import DetectionCheckpointer | |
from detectron2.modeling import build_model | |
from detectron2.data.detection_utils import read_image | |
from detectron2.utils.visualizer import Visualizer | |
from detectron2.data import MetadataCatalog | |
import numpy as np | |
import cv2 | |
import os | |
import time | |
import pickle | |
import gradio as gr | |
import tqdm | |
import matplotlib.pyplot as plt | |
import io | |
from PIL import Image | |
torch.manual_seed(0) | |
np.random.seed(0) | |
torch.backends.cudnn.deterministic = True | |
torch.backends.cudnn.benchmark = False | |
from models.regnet import build_regnet_fpn_backbone | |
import models.metadata as metadata | |
from utils_clustering import * | |
from base_cam import EigenCAM | |
from pytorch_grad_cam.utils.model_targets import FasterRCNNBoxScoreTarget | |
fullName2ab_dict = {'PASCAL-VOC':"voc", 'BDD100K':"bdd", 'KITTI':"kitti", 'Speed signs':"speed", 'NuScenes':"nu"} | |
ab2FullName_dict = {'voc':"PASCAL-VOC", 'bdd':"BDD100K", 'kitti':"KITTI", 'speed':"Speed signs", 'nu':"NuScenes"} | |
class Detectron2Monitor(): | |
def __init__(self, id, backbone, confidence_threshold=0.05): | |
self.id, self.label_list = self._get_label_list(id) | |
self.backbone = backbone | |
self.confidence_threshold = confidence_threshold | |
self.cfg, self.device, self.model = self._get_model() | |
self.label_dict = {i:label for i, label in enumerate(self.label_list)} | |
self.eval_list = ["ID-voc-OOD-coco", "OOD-open", "voc-val"] if self.id == "voc" else ["ID-bdd-OOD-coco", "OOD-open", "voc-ood", f"{self.id}-val"] | |
MetadataCatalog.get("custom_dataset").set(thing_classes=self.label_list) | |
def _get_label_list(self, id): | |
id = fullName2ab_dict[id] | |
if id == 'voc': | |
label_list = metadata.VOC_THING_CLASSES | |
elif id == 'bdd': | |
label_list = metadata.BDD_THING_CLASSES | |
elif id == 'kitti': | |
label_list = metadata.KITTI_THING_CLASSES | |
elif id == 'speed' or id == 'prescan': | |
label_list = metadata.SPEED_THING_CLASSES | |
else: | |
label_list = metadata.NU_THING_CLASSES | |
return id, label_list | |
def _get_model(self): | |
cfg = get_cfg() | |
cfg.merge_from_file(f"models/configs/vanilla_{self.backbone}.yaml") | |
cfg.MODEL.WEIGHTS = f"models/weights/model_final_{self.backbone}_{self.id}.pth" | |
cfg.MODEL.DEVICE='cpu' | |
cfg.MODEL.ROI_HEADS.NUM_CLASSES = len(self.label_list) | |
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = self.confidence_threshold | |
model = build_model(cfg) | |
model.eval() | |
checkpointer = DetectionCheckpointer(model) | |
checkpointer.load(cfg.MODEL.WEIGHTS) | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
model = model.to(device) | |
return cfg, device, model | |
def _inference(self, model, inputs): | |
with torch.no_grad(): | |
images = model.preprocess_image(inputs) | |
features = model.backbone(images.tensor) | |
proposals, _ = model.proposal_generator(images, features, None) # RPN | |
features_ = [features[f] for f in model.roi_heads.box_in_features] | |
box_features = model.roi_heads.box_pooler(features_, [x.proposal_boxes for x in proposals]) | |
box_features = model.roi_heads.box_head(box_features) # features of all 1k candidates | |
predictions = model.roi_heads.box_predictor(box_features) | |
pred_instances, pred_inds = model.roi_heads.box_predictor.inference(predictions, proposals) | |
pred_instances = model.roi_heads.forward_with_given_boxes(features, pred_instances) | |
# output boxes, masks, scores, etc | |
pred_instances = model._postprocess(pred_instances, inputs, images.image_sizes) # scale box to orig size | |
# features of the proposed boxes | |
feats = box_features[pred_inds].cpu().numpy() | |
return pred_instances, feats | |
def _load_monitors(self, clustering_algo, nb_clusters, eps=5, min_samples=10): | |
if clustering_algo == "dbscan": | |
with open(f"monitors/{self.id}/{self.backbone}/{clustering_algo}/eps{eps}_min_samples{min_samples}.pkl", 'rb') as f: | |
monitors_dict = pickle.load(f) | |
else: | |
with open(f"monitors/{self.id}/{self.backbone}/{clustering_algo}/{nb_clusters}.pkl", 'rb') as f: | |
monitors_dict = pickle.load(f) | |
return monitors_dict | |
def _evaluate(self, clustering_algo, nb_clusters, eps, min_samples): | |
dataset_name = f"{self.id}-val" | |
with open(f'val_feats/{self.id}/{self.backbone}/{dataset_name}_feats_tp_dict.pickle', 'rb') as f: | |
feats_tp_dict = pickle.load(f) | |
with open(f'val_feats/{self.id}/{self.backbone}/{dataset_name}_feats_fp_dict.pickle', 'rb') as f: | |
feats_fp_dict = pickle.load(f) | |
monitors_dict = self._load_monitors(clustering_algo, nb_clusters, eps, min_samples) | |
# make verdicts on ID data | |
data_tp = [] | |
data_fp = [] | |
accept_sum = {"tp": 0, "fp": 0} | |
reject_sum = {"tp": 0, "fp": 0} | |
for label in tqdm.tqdm(self.label_list, desc="Evaluation on ID data"): | |
if label in monitors_dict: | |
verdict = monitors_dict[label].make_verdicts(feats_tp_dict[label]) | |
data_tp.append([label, len(verdict), np.sum(verdict)/len(verdict)]) | |
accept_sum["tp"] += np.sum(verdict) | |
reject_sum["tp"] += len(verdict) - np.sum(verdict) | |
verdict = monitors_dict[label].make_verdicts(feats_fp_dict[label]) | |
data_fp.append([label, len(verdict), (len(verdict)-np.sum(verdict))/len(verdict)]) | |
accept_sum["fp"] += np.sum(verdict) | |
reject_sum["fp"] += len(verdict) - np.sum(verdict) | |
TPR = round((accept_sum['tp'] / (reject_sum['tp'] + accept_sum['tp'])*100), 2) | |
FPR = round((accept_sum['fp'] / (reject_sum['fp'] + accept_sum['fp'])*100), 2) | |
id_name = ab2FullName_dict[self.id] | |
df_id = pd.DataFrame([[id_name, f"{TPR}%", f"{FPR}%"]], columns=["Dataset", "TPR", "FPR"]) | |
data_ood = [] | |
i = 0 | |
self.eval_list.remove(dataset_name) | |
for dataset_name in tqdm.tqdm(self.eval_list, desc="Evaluation on OOD data"): | |
accept_sum = {"tp": 0, "fp": 0} | |
reject_sum = {"tp": 0, "fp": 0} | |
with open(f'val_feats/{self.id}/{self.backbone}/{dataset_name}_feats_fp_dict.pickle', 'rb') as f: | |
feats_fp_dict = pickle.load(f) | |
for label in self.label_list: | |
if label in monitors_dict: | |
verdict = monitors_dict[label].make_verdicts(feats_fp_dict[label]) | |
accept_sum["fp"] += np.sum(verdict) | |
reject_sum["fp"] += len(verdict) - np.sum(verdict) | |
FPR = round((accept_sum['fp'] / (reject_sum['fp'] + accept_sum['fp'])*100), 2) | |
data_ood.append([dataset_name, str(FPR)+"%"]) | |
i += 1 | |
# prepare dataframes | |
df_ood = pd.DataFrame(data_ood, columns=["Dataset", "FPR"]) | |
df_ood["Dataset"] = ["COCO", "Open Images"] if self.id == "voc" else ["COCO", "Open Images", "VOC-OOD"] | |
return df_id, df_ood | |
def _postprocess_cam(self, raw_cam, img_width, img_height): | |
cam_orig = np.sum(raw_cam, axis=0) # [H,W] | |
cam_orig = np.maximum(cam_orig, 0) # ReLU | |
cam_orig -= np.min(cam_orig) | |
cam_orig /= np.max(cam_orig) | |
cam = cv2.resize(cam_orig, (img_width, img_height)) | |
return cam | |
def _fasterrcnn_reshape_transform(self, x): | |
target_size = x['p6'].size()[-2 : ] | |
activations = [] | |
for key, value in x.items(): | |
activations.append(torch.nn.functional.interpolate(torch.abs(value), target_size, mode='bilinear')) | |
activations = torch.cat(activations, axis=1) | |
return activations | |
def _get_input_dict(self, original_image): | |
height, width = original_image.shape[:2] | |
transform_gen = T.ResizeShortestEdge( | |
[self.cfg.INPUT.MIN_SIZE_TEST, self.cfg.INPUT.MIN_SIZE_TEST], self.cfg.INPUT.MAX_SIZE_TEST | |
) | |
image = transform_gen.get_transform(original_image).apply_image(original_image) | |
image = torch.as_tensor(image.astype("float32").transpose(2, 0, 1)) | |
inputs = {"image": image, "height": height, "width": width} | |
return inputs | |
def get_output(self, monitors_dict, img): | |
image = read_image(img, format="BGR") | |
input_image_dict = [self._get_input_dict(image)] | |
pred_instances, feats = self._inference(self.model, input_image_dict) | |
detections = pred_instances[0]["instances"].to("cpu") | |
cls_idxs = detections.pred_classes.detach().numpy() | |
# get labels from class indices | |
labels = [self.label_dict[i] for i in cls_idxs] | |
# count values in labels, and return a dictionary | |
labels_count_dict = dict((i, labels.count(i)) for i in labels) | |
v = Visualizer(image[..., ::-1], MetadataCatalog.get("custom_dataset"), scale=1) | |
v = v.draw_instance_predictions(detections) | |
img_detection = v.get_image() | |
df = pd.DataFrame(list(labels_count_dict.items()), columns=['Object', 'Count']) | |
verdicts = [] | |
for label, feat in zip(labels, feats): | |
verdict = monitors_dict[label].make_verdicts(feat[np.newaxis,:])[0] | |
verdicts.append(verdict) | |
detections_ood = detections[[i for i, x in enumerate(verdicts) if not x]] | |
detections_ood.pred_classes = torch.tensor([5]*len(detections_ood.pred_classes)) | |
labels_ood = [label for label, verdict in zip(labels, verdicts) if not verdict] | |
verdicts_ood = ["Rejected"]*len(labels_ood) | |
df_verdict = pd.DataFrame(list(zip(labels_ood, verdicts_ood)), columns=['Object', 'Verdict']) | |
v = Visualizer(image[..., ::-1], MetadataCatalog.get("custom_dataset"), scale=1) | |
for box in detections_ood.pred_boxes.to('cpu'): | |
v.draw_box(box) | |
v.draw_text("OOD", tuple(box[:2].numpy())) | |
v = v.get_output() | |
img_ood = v.get_image() | |
pred_bboxes = detections.pred_boxes.tensor.numpy().astype(np.int32) | |
target_layers = [self.model.backbone] | |
targets = [FasterRCNNBoxScoreTarget(labels=labels, bounding_boxes=pred_bboxes)] | |
cam = EigenCAM(self.model, | |
target_layers, | |
use_cuda=False, | |
reshape_transform=self._fasterrcnn_reshape_transform) | |
grayscale_cam = cam(input_image_dict, targets) | |
cam = self._postprocess_cam(grayscale_cam, input_image_dict[0]["width"], input_image_dict[0]["height"]) | |
plt.rcParams["figure.figsize"] = (30,10) | |
plt.imshow(img_detection[..., ::-1], interpolation='none') | |
plt.imshow(cam, cmap='jet', alpha=0.5) | |
plt.axis("off") | |
img_buff = io.BytesIO() | |
plt.savefig(img_buff, format='png', bbox_inches='tight', pad_inches=0) | |
img_cam = Image.open(img_buff) | |
image_dict = {} | |
image_dict["image"] = image | |
image_dict["cam"] = img_cam | |
image_dict["detection"] = img_detection | |
image_dict["verdict"] = img_ood | |
return image_dict, df, df_verdict |