haotiz's picture
initial commit
708dec4
# Copyright (c) Aishwarya Kamath & Nicolas Carion. Licensed under the Apache License 2.0. All Rights Reserved
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import json
import os
import time
from collections import defaultdict
import pycocotools.mask as mask_utils
import torchvision
from PIL import Image
# from .coco import ConvertCocoPolysToMask, make_coco_transforms
from .modulated_coco import ConvertCocoPolysToMask
def _isArrayLike(obj):
return hasattr(obj, "__iter__") and hasattr(obj, "__len__")
class LVIS:
def __init__(self, annotation_path=None):
"""Class for reading and visualizing annotations.
Args:
annotation_path (str): location of annotation file
"""
self.anns = {}
self.cats = {}
self.imgs = {}
self.img_ann_map = defaultdict(list)
self.cat_img_map = defaultdict(list)
self.dataset = {}
if annotation_path is not None:
print("Loading annotations.")
tic = time.time()
self.dataset = self._load_json(annotation_path)
print("Done (t={:0.2f}s)".format(time.time() - tic))
assert type(self.dataset) == dict, "Annotation file format {} not supported.".format(type(self.dataset))
self._create_index()
def _load_json(self, path):
with open(path, "r") as f:
return json.load(f)
def _create_index(self):
print("Creating index.")
self.img_ann_map = defaultdict(list)
self.cat_img_map = defaultdict(list)
self.anns = {}
self.cats = {}
self.imgs = {}
for ann in self.dataset["annotations"]:
self.img_ann_map[ann["image_id"]].append(ann)
self.anns[ann["id"]] = ann
for img in self.dataset["images"]:
self.imgs[img["id"]] = img
for cat in self.dataset["categories"]:
self.cats[cat["id"]] = cat
for ann in self.dataset["annotations"]:
self.cat_img_map[ann["category_id"]].append(ann["image_id"])
print("Index created.")
def get_ann_ids(self, img_ids=None, cat_ids=None, area_rng=None):
"""Get ann ids that satisfy given filter conditions.
Args:
img_ids (int array): get anns for given imgs
cat_ids (int array): get anns for given cats
area_rng (float array): get anns for a given area range. e.g [0, inf]
Returns:
ids (int array): integer array of ann ids
"""
if img_ids is not None:
img_ids = img_ids if _isArrayLike(img_ids) else [img_ids]
if cat_ids is not None:
cat_ids = cat_ids if _isArrayLike(cat_ids) else [cat_ids]
anns = []
if img_ids is not None:
for img_id in img_ids:
anns.extend(self.img_ann_map[img_id])
else:
anns = self.dataset["annotations"]
# return early if no more filtering required
if cat_ids is None and area_rng is None:
return [_ann["id"] for _ann in anns]
cat_ids = set(cat_ids)
if area_rng is None:
area_rng = [0, float("inf")]
ann_ids = [
_ann["id"]
for _ann in anns
if _ann["category_id"] in cat_ids and _ann["area"] > area_rng[0] and _ann["area"] < area_rng[1]
]
return ann_ids
def get_cat_ids(self):
"""Get all category ids.
Returns:
ids (int array): integer array of category ids
"""
return list(self.cats.keys())
def get_img_ids(self):
"""Get all img ids.
Returns:
ids (int array): integer array of image ids
"""
return list(self.imgs.keys())
def _load_helper(self, _dict, ids):
if ids is None:
return list(_dict.values())
elif _isArrayLike(ids):
return [_dict[id] for id in ids]
else:
return [_dict[ids]]
def load_anns(self, ids=None):
"""Load anns with the specified ids. If ids=None load all anns.
Args:
ids (int array): integer array of annotation ids
Returns:
anns (dict array) : loaded annotation objects
"""
return self._load_helper(self.anns, ids)
def load_cats(self, ids):
"""Load categories with the specified ids. If ids=None load all
categories.
Args:
ids (int array): integer array of category ids
Returns:
cats (dict array) : loaded category dicts
"""
return self._load_helper(self.cats, ids)
def load_imgs(self, ids):
"""Load categories with the specified ids. If ids=None load all images.
Args:
ids (int array): integer array of image ids
Returns:
imgs (dict array) : loaded image dicts
"""
return self._load_helper(self.imgs, ids)
def download(self, save_dir, img_ids=None):
"""Download images from mscoco.org server.
Args:
save_dir (str): dir to save downloaded images
img_ids (int array): img ids of images to download
"""
imgs = self.load_imgs(img_ids)
if not os.path.exists(save_dir):
os.makedirs(save_dir)
for img in imgs:
file_name = os.path.join(save_dir, img["file_name"])
if not os.path.exists(file_name):
from urllib.request import urlretrieve
urlretrieve(img["coco_url"], file_name)
def ann_to_rle(self, ann):
"""Convert annotation which can be polygons, uncompressed RLE to RLE.
Args:
ann (dict) : annotation object
Returns:
ann (rle)
"""
img_data = self.imgs[ann["image_id"]]
h, w = img_data["height"], img_data["width"]
segm = ann["segmentation"]
if isinstance(segm, list):
# polygon -- a single object might consist of multiple parts
# we merge all parts into one mask rle code
rles = mask_utils.frPyObjects(segm, h, w)
rle = mask_utils.merge(rles)
elif isinstance(segm["counts"], list):
# uncompressed RLE
rle = mask_utils.frPyObjects(segm, h, w)
else:
# rle
rle = ann["segmentation"]
return rle
def ann_to_mask(self, ann):
"""Convert annotation which can be polygons, uncompressed RLE, or RLE
to binary mask.
Args:
ann (dict) : annotation object
Returns:
binary mask (numpy 2D array)
"""
rle = self.ann_to_rle(ann)
return mask_utils.decode(rle)
class LvisDetectionBase(torchvision.datasets.VisionDataset):
def __init__(self, root, annFile, transform=None, target_transform=None, transforms=None):
super(LvisDetectionBase, self).__init__(root, transforms, transform, target_transform)
self.lvis = LVIS(annFile)
self.ids = list(sorted(self.lvis.imgs.keys()))
def __getitem__(self, index):
"""
Args:
index (int): Index
Returns:
tuple: Tuple (image, target). target is the object returned by ``coco.loadAnns``.
"""
lvis = self.lvis
img_id = self.ids[index]
ann_ids = lvis.get_ann_ids(img_ids=img_id)
target = lvis.load_anns(ann_ids)
path = "/".join(self.lvis.load_imgs(img_id)[0]["coco_url"].split("/")[-2:])
img = Image.open(os.path.join(self.root, path)).convert("RGB")
if self.transforms is not None:
img, target = self.transforms(img, target)
return img, target
def __len__(self):
return len(self.ids)
class LvisDetection(LvisDetectionBase):
def __init__(self, img_folder, ann_file, transforms, return_masks=False, **kwargs):
super(LvisDetection, self).__init__(img_folder, ann_file)
self.ann_file = ann_file
self._transforms = transforms
self.prepare = ConvertCocoPolysToMask(return_masks)
def __getitem__(self, idx):
img, target = super(LvisDetection, self).__getitem__(idx)
image_id = self.ids[idx]
target = {"image_id": image_id, "annotations": target}
img, target = self.prepare(img, target)
if self._transforms is not None:
img = self._transforms(img)
return img, target, idx
def get_raw_image(self, idx):
img, target = super(LvisDetection, self).__getitem__(idx)
return img
def categories(self):
id2cat = {c["id"]: c for c in self.lvis.dataset["categories"]}
all_cats = sorted(list(id2cat.keys()))
categories = {}
for l in list(all_cats):
categories[l] = id2cat[l]['name']
return categories