# Copyright (c) Facebook, Inc. and its affiliates.
# Modified by Bowen Cheng from https://github.com/sukjunhwang/IFC
import copy
import logging
import random
import numpy as np
from typing import List, Union
import torch
from detectron2.config import configurable
from detectron2.structures import (
BitMasks,
Boxes,
BoxMode,
Instances,
)
from detectron2.data import detection_utils as utils
from detectron2.data import transforms as T
from .augmentation import build_augmentation
__all__ = ["YTVISDatasetMapper", "CocoClipDatasetMapper"]
def filter_empty_instances(instances, by_box=True, by_mask=True, box_threshold=1e-5):
"""
Mark empty instances in an `Instances` object. Instances are not removed;
their ``gt_ids`` are set to -1 so they can be ignored downstream.
Args:
instances (Instances):
by_box (bool): whether to filter out instances with empty boxes
by_mask (bool): whether to filter out instances with empty masks
box_threshold (float): minimum width and height to be considered non-empty
Returns:
Instances: the same instances, with empty ones marked by ``gt_ids == -1``.
"""
assert by_box or by_mask
r = []
if by_box:
r.append(instances.gt_boxes.nonempty(threshold=box_threshold))
if instances.has("gt_masks") and by_mask:
r.append(instances.gt_masks.nonempty())
if not r:
return instances
m = r[0]
for x in r[1:]:
m = m & x
instances.gt_ids[~m] = -1
return instances
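# A minimal sketch of the behavior above (illustrative only): an instance with a
# degenerate box is kept, but its entry in `gt_ids` becomes -1, e.g.
#
#   inst = Instances((480, 640))
#   inst.gt_boxes = Boxes(torch.tensor([[10., 10., 50., 50.], [0., 0., 0., 0.]]))
#   inst.gt_ids = torch.tensor([3, 7])
#   inst = filter_empty_instances(inst)
#   # inst.gt_ids -> tensor([ 3, -1])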
def _get_dummy_anno(num_classes):
return {
"iscrowd": 0,
"category_id": num_classes,
"id": -1,
"bbox": np.array([0, 0, 0, 0]),
"bbox_mode": BoxMode.XYXY_ABS,
"segmentation": [np.array([0.0] * 6)]
}
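# The dummy annotation above pads frames in which a tracked object does not appear:
# `category_id == num_classes` acts as a "no object" label, `id == -1` marks the slot
# as empty, and the degenerate box/polygon yields an empty mask for that frame.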
def ytvis_annotations_to_instances(annos, image_size):
"""
Create an :class:`Instances` object used by the models,
from instance annotations in the dataset dict.
Args:
annos (list[dict]): a list of instance annotations in one image, each
element for one instance.
image_size (tuple): height, width
Returns:
Instances:
It will contain fields "gt_boxes", "gt_classes", "gt_ids",
"gt_masks", if they can be obtained from `annos`.
This is the format that builtin models expect.
"""
boxes = [BoxMode.convert(obj["bbox"], obj["bbox_mode"], BoxMode.XYXY_ABS) for obj in annos]
target = Instances(image_size)
target.gt_boxes = Boxes(boxes)
classes = [int(obj["category_id"]) for obj in annos]
classes = torch.tensor(classes, dtype=torch.int64)
target.gt_classes = classes
ids = [int(obj["id"]) for obj in annos]
ids = torch.tensor(ids, dtype=torch.int64)
target.gt_ids = ids
if len(annos) and "segmentation" in annos[0]:
segms = [obj["segmentation"] for obj in annos]
masks = []
for segm in segms:
assert segm.ndim == 2, "Expect segmentation of 2 dimensions, got {}.".format(
segm.ndim
)
# mask array
masks.append(segm)
# torch.from_numpy does not support array with negative stride.
masks = BitMasks(
torch.stack([torch.from_numpy(np.ascontiguousarray(x)) for x in masks])
)
target.gt_masks = masks
return target
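# A hedged usage sketch (field values are illustrative, not taken from the dataset):
# each element of `annos` is expected to carry an already-decoded binary mask under
# "segmentation" (a 2-D numpy array), e.g.
#
#   anno = {
#       "bbox": [10.0, 20.0, 40.0, 60.0],
#       "bbox_mode": BoxMode.XYWH_ABS,
#       "category_id": 2,
#       "id": 5,
#       "segmentation": np.zeros((480, 640), dtype=np.uint8),
#   }
#   instances = ytvis_annotations_to_instances([anno], (480, 640))
#   # instances has gt_boxes, gt_classes, gt_ids and gt_masks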
class YTVISDatasetMapper:
"""
A callable which takes a dataset dict in YouTube-VIS Dataset format,
and maps it into a format used by the model.
"""
@configurable
def __init__(
self,
is_train: bool,
*,
augmentations: List[Union[T.Augmentation, T.Transform]],
image_format: str,
use_instance_mask: bool = False,
sampling_frame_num: int = 2,
sampling_frame_range: int = 5,
sampling_frame_shuffle: bool = False,
num_classes: int = 40,
):
"""
NOTE: this interface is experimental.
Args:
is_train: whether it's used in training or inference
augmentations: a list of augmentations or deterministic transforms to apply
image_format: an image format supported by :func:`detection_utils.read_image`.
use_instance_mask: whether to process instance segmentation annotations, if available
"""
# fmt: off
self.is_train = is_train
self.augmentations = T.AugmentationList(augmentations)
self.image_format = image_format
self.use_instance_mask = use_instance_mask
self.sampling_frame_num = sampling_frame_num
self.sampling_frame_range = sampling_frame_range
self.sampling_frame_shuffle = sampling_frame_shuffle
self.num_classes = num_classes
# fmt: on
logger = logging.getLogger(__name__)
mode = "training" if is_train else "inference"
logger.info(f"[DatasetMapper] Augmentations used in {mode}: {augmentations}")
@classmethod
def from_config(cls, cfg, is_train: bool = True):
augs = build_augmentation(cfg, is_train)
sampling_frame_num = cfg.INPUT.SAMPLING_FRAME_NUM
sampling_frame_range = cfg.INPUT.SAMPLING_FRAME_RANGE
sampling_frame_shuffle = cfg.INPUT.SAMPLING_FRAME_SHUFFLE
ret = {
"is_train": is_train,
"augmentations": augs,
"image_format": cfg.INPUT.FORMAT,
"use_instance_mask": cfg.MODEL.MASK_ON,
"sampling_frame_num": sampling_frame_num,
"sampling_frame_range": sampling_frame_range,
"sampling_frame_shuffle": sampling_frame_shuffle,
"num_classes": cfg.MODEL.SEM_SEG_HEAD.NUM_CLASSES,
}
return ret
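# The config keys consumed above would typically look like the following in a yaml
# config (values mirror the defaults of __init__ and are illustrative):
#
#   INPUT:
#     SAMPLING_FRAME_NUM: 2
#     SAMPLING_FRAME_RANGE: 5
#     SAMPLING_FRAME_SHUFFLE: False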
def __call__(self, dataset_dict):
"""
Args:
dataset_dict (dict): Metadata of one video, in YTVIS Dataset format.
Returns:
dict: a format that builtin models in detectron2 accept
"""
# TODO: consider examining the deepcopy below, as it incurs a significant computational cost.
dataset_dict = copy.deepcopy(dataset_dict) # it will be modified by code below
video_length = dataset_dict["length"]
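# Frame sampling (training): draw a reference frame uniformly at random, then draw
# `sampling_frame_num - 1` additional frames from a window of +/- `sampling_frame_range`
# frames around it; at inference, all frames of the video are used.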
if self.is_train:
ref_frame = random.randrange(video_length)
start_idx = max(0, ref_frame-self.sampling_frame_range)
end_idx = min(video_length, ref_frame+self.sampling_frame_range + 1)
selected_idx = np.random.choice(
np.array(list(range(start_idx, ref_frame)) + list(range(ref_frame+1, end_idx))),
self.sampling_frame_num - 1,
)
selected_idx = selected_idx.tolist() + [ref_frame]
selected_idx = sorted(selected_idx)
if self.sampling_frame_shuffle:
random.shuffle(selected_idx)
else:
selected_idx = range(video_length)
video_annos = dataset_dict.pop("annotations", None)
file_names = dataset_dict.pop("file_names", None)
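# Map the dataset's track ids to contiguous indices so that, across all sampled
# frames, the i-th instance always refers to the same object track; frames where a
# track is absent are padded with a dummy annotation below.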
if self.is_train:
_ids = set()
for frame_idx in selected_idx:
_ids.update([anno["id"] for anno in video_annos[frame_idx]])
ids = dict()
for i, _id in enumerate(_ids):
ids[_id] = i
dataset_dict["image"] = []
dataset_dict["instances"] = []
dataset_dict["file_names"] = []
for frame_idx in selected_idx:
dataset_dict["file_names"].append(file_names[frame_idx])
# Read image
image = utils.read_image(file_names[frame_idx], format=self.image_format)
utils.check_image_size(dataset_dict, image)
aug_input = T.AugInput(image)
transforms = self.augmentations(aug_input)
image = aug_input.image
image_shape = image.shape[:2] # h, w
# Pytorch's dataloader is efficient on torch.Tensor due to shared-memory,
# but not efficient on large generic data structures due to the use of pickle & mp.Queue.
# Therefore it's important to use torch.Tensor.
dataset_dict["image"].append(torch.as_tensor(np.ascontiguousarray(image.transpose(2, 0, 1))))
if (video_annos is None) or (not self.is_train):
continue
# NOTE: copy() prevents the annotations from being modified when the augmentations are applied
_frame_annos = []
for anno in video_annos[frame_idx]:
_anno = {}
for k, v in anno.items():
_anno[k] = copy.deepcopy(v)
_frame_annos.append(_anno)
# USER: Implement additional transformations if you have other types of data
annos = [
utils.transform_instance_annotations(obj, transforms, image_shape)
for obj in _frame_annos
if obj.get("iscrowd", 0) == 0
]
sorted_annos = [_get_dummy_anno(self.num_classes) for _ in range(len(ids))]
for _anno in annos:
idx = ids[_anno["id"]]
sorted_annos[idx] = _anno
_gt_ids = [_anno["id"] for _anno in sorted_annos]
instances = utils.annotations_to_instances(sorted_annos, image_shape, mask_format="bitmask")
instances.gt_ids = torch.tensor(_gt_ids)
if instances.has("gt_masks"):
instances.gt_boxes = instances.gt_masks.get_bounding_boxes()
instances = filter_empty_instances(instances)
else:
instances.gt_masks = BitMasks(torch.empty((0, *image_shape)))
dataset_dict["instances"].append(instances)
return dataset_dict
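# A minimal usage sketch (assumes a registered YTVIS-style dataset and a cfg that
# defines the INPUT.SAMPLING_FRAME_* keys; the loader call is the standard
# detectron2 one and may differ from the project's own training script):
#
#   from detectron2.data import build_detection_train_loader
#   mapper = YTVISDatasetMapper(cfg, is_train=True)
#   loader = build_detection_train_loader(cfg, mapper=mapper)
#   # each returned dict holds "image" (list of CxHxW tensors), "instances"
#   # (one Instances per sampled frame) and "file_names".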
class CocoClipDatasetMapper:
"""
A callable which takes a COCO image, replicates it into multiple frames,
and maps them into a format used by the model.
"""
@configurable
def __init__(
self,
is_train: bool,
*,
augmentations: List[Union[T.Augmentation, T.Transform]],
image_format: str,
use_instance_mask: bool = False,
sampling_frame_num: int = 2,
):
"""
NOTE: this interface is experimental.
Args:
is_train: whether it's used in training or inference
augmentations: a list of augmentations or deterministic transforms to apply
image_format: an image format supported by :func:`detection_utils.read_image`.
use_instance_mask: whether to process instance segmentation annotations, if available
"""
# fmt: off
self.is_train = is_train
self.augmentations = T.AugmentationList(augmentations)
self.image_format = image_format
self.use_instance_mask = use_instance_mask
self.sampling_frame_num = sampling_frame_num
# fmt: on
logger = logging.getLogger(__name__)
mode = "training" if is_train else "inference"
logger.info(f"[DatasetMapper] Augmentations used in {mode}: {augmentations}")
@classmethod
def from_config(cls, cfg, is_train: bool = True):
augs = build_augmentation(cfg, is_train)
sampling_frame_num = cfg.INPUT.SAMPLING_FRAME_NUM
ret = {
"is_train": is_train,
"augmentations": augs,
"image_format": cfg.INPUT.FORMAT,
"use_instance_mask": cfg.MODEL.MASK_ON,
"sampling_frame_num": sampling_frame_num,
}
return ret
def __call__(self, dataset_dict):
"""
Args:
dataset_dict (dict): Metadata of one image, in Detectron2 Dataset format.
Returns:
dict: a format that builtin models in detectron2 accept
"""
dataset_dict = copy.deepcopy(dataset_dict) # it will be modified by code below
img_annos = dataset_dict.pop("annotations", None)
file_name = dataset_dict.pop("file_name", None)
original_image = utils.read_image(file_name, format=self.image_format)
dataset_dict["image"] = []
dataset_dict["instances"] = []
dataset_dict["file_names"] = [file_name] * self.sampling_frame_num
for _ in range(self.sampling_frame_num):
utils.check_image_size(dataset_dict, original_image)
aug_input = T.AugInput(original_image)
transforms = self.augmentations(aug_input)
image = aug_input.image
image_shape = image.shape[:2] # h, w
# Pytorch's dataloader is efficient on torch.Tensor due to shared-memory,
# but not efficient on large generic data structures due to the use of pickle & mp.Queue.
# Therefore it's important to use torch.Tensor.
dataset_dict["image"].append(torch.as_tensor(np.ascontiguousarray(image.transpose(2, 0, 1))))
if (img_annos is None) or (not self.is_train):
continue
_img_annos = []
for anno in img_annos:
_anno = {}
for k, v in anno.items():
_anno[k] = copy.deepcopy(v)
_img_annos.append(_anno)
# USER: Implement additional transformations if you have other types of data
annos = [
utils.transform_instance_annotations(obj, transforms, image_shape)
for obj in _img_annos
if obj.get("iscrowd", 0) == 0
]
_gt_ids = list(range(len(annos)))
for idx in range(len(annos)):
if len(annos[idx]["segmentation"]) == 0:
annos[idx]["segmentation"] = [np.array([0.0] * 6)]
instances = utils.annotations_to_instances(annos, image_shape, mask_format="bitmask")
instances.gt_ids = torch.tensor(_gt_ids)
if instances.has("gt_masks"):
instances.gt_boxes = instances.gt_masks.get_bounding_boxes()
instances = filter_empty_instances(instances)
else:
instances.gt_masks = BitMasks(torch.empty((0, *image_shape)))
dataset_dict["instances"].append(instances)
return dataset_dict
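# A minimal usage sketch (dataset_dict is a standard Detectron2 COCO-format dict;
# joint training on still images is optional and project-specific):
#
#   coco_mapper = CocoClipDatasetMapper(cfg, is_train=True)
#   out = coco_mapper(dataset_dict)
#   # out["image"] holds `sampling_frame_num` independently augmented copies of the
#   # same image, so a still image is treated as a short pseudo-video clip.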