Spaces:
Runtime error
Runtime error
import torch | |
import numpy as np | |
import math | |
import base64 | |
import collections | |
import pycocotools.mask as mask_utils | |
from maskrcnn_benchmark.structures.bounding_box import BoxList | |
from maskrcnn_benchmark.structures.segmentation_mask import SegmentationMask | |
class LabelLoader(object):
    """Build a ``BoxList`` target (plus optional fields) from annotation dicts.

    Each annotation is indexed with ``obj["rect"]`` and, depending on the
    requested fields, ``"class"``, ``"conf"``, ``"attributes"``,
    ``"scores_all"``, ``"boxes_all"``, ``"feature"`` and ``"mask"``.
    The label map is indexed with ``'class_to_ind'``, ``'attribute_to_ind'``
    and ``'relation_to_ind'``.
    """

    def __init__(self, labelmap, extra_fields=(), filter_duplicate_relations=False,
                 ignore_attr=None, ignore_rel=None, mask_mode="poly"):
        """Store the configuration.

        Args:
            labelmap: mapping holding the name-to-index tables used by the
                ``add_*`` methods and ``relation_loader``.
            extra_fields: fields loaded by ``__call__`` when ``load_fields``
                is not given.
            filter_duplicate_relations: when true, ``relation_loader`` keeps a
                single randomly chosen relation per (subject, object) pair.
            ignore_attr: iterable of attribute names, stored as a set.
                NOTE(review): not consulted anywhere in this class.
            ignore_rel: iterable of relation class names skipped by
                ``relation_loader``.
            mask_mode: ``"poly"`` or ``"mask"``.
        """
        self.labelmap = labelmap
        self.extra_fields = extra_fields
        self.supported_fields = ["class", "conf", "attributes", 'scores_all', 'boxes_all', 'feature', "mask"]
        self.filter_duplicate_relations = filter_duplicate_relations
        # Identity test: `!= None` is evaluated elementwise for numpy arrays
        # and then fails when used as a condition.
        self.ignore_attr = set(ignore_attr) if ignore_attr is not None else set()
        self.ignore_rel = set(ignore_rel) if ignore_rel is not None else set()
        assert mask_mode == "poly" or mask_mode == "mask"
        self.mask_mode = mask_mode

    def __call__(self, annotations, img_size, remove_empty=False, load_fields=None):
        """Convert ``annotations`` into a ``BoxList`` clipped to the image.

        Args:
            annotations: sequence of annotation dicts, each with a ``"rect"``
                entry (reshaped to rows of 4 values, used in "xyxy" mode).
            img_size: image size forwarded to ``BoxList``; indexed as
                ``img_size[0]`` / ``img_size[1]`` when box masks are built.
            remove_empty: forwarded to ``BoxList.clip_to_image``.
            load_fields: fields to attach; defaults to ``self.extra_fields``.

        Returns:
            The ``BoxList`` with the requested fields attached.
        """
        boxes = [obj["rect"] for obj in annotations]
        boxes = torch.as_tensor(boxes).reshape(-1, 4)
        target = BoxList(boxes, img_size, mode="xyxy")

        if load_fields is None:
            load_fields = self.extra_fields

        for field in load_fields:
            assert field in self.supported_fields, "Unsupported field {}".format(field)
            if field == "class":
                target.add_field("labels", self.add_classes(annotations))
            elif field == "conf":
                target.add_field("scores", self.add_confidences(annotations))
            elif field == "attributes":
                target.add_field("attributes", self.add_attributes(annotations))
            elif field == "scores_all":
                target.add_field("scores_all", self.add_scores_all(annotations))
            elif field == "boxes_all":
                target.add_field("boxes_all", self.add_boxes_all(annotations))
            elif field == "feature":
                target.add_field("box_features", self.add_features(annotations))
            elif field == "mask":
                masks, is_box_mask = self.add_masks(annotations, img_size)
                target.add_field("masks", masks)
                target.add_field("is_box_mask", is_box_mask)

        target = target.clip_to_image(remove_empty=remove_empty)
        return target

    def get_box_mask(self, rect, img_size):
        """Return a mask covering the rectangle ``rect`` (x1, y1, x2, y2).

        In "poly" mode the result is a one-polygon list with the four corners;
        in "mask" mode it is an RLE dict from ``pycocotools`` whose ``counts``
        is decoded to ``str``.
        """
        x1, y1, x2, y2 = rect[0], rect[1], rect[2], rect[3]
        if self.mask_mode == "poly":
            return [[x1, y1, x1, y2, x2, y2, x2, y1]]
        elif self.mask_mode == "mask":
            # note the order of height/width order in mask is opposite to image
            mask = np.zeros([img_size[1], img_size[0]], dtype=np.uint8)
            # Clamp at 0: a negative slice index would wrap around and fill
            # the wrong region for boxes reaching outside the image.
            top = max(math.floor(y1), 0)
            left = max(math.floor(x1), 0)
            bottom = max(math.ceil(y2), 0)
            right = max(math.ceil(x2), 0)
            mask[top:bottom, left:right] = 255
            encoded_mask = mask_utils.encode(np.asfortranarray(mask))
            encoded_mask["counts"] = encoded_mask["counts"].decode("utf-8")
            return encoded_mask

    def add_masks(self, annotations, img_size):
        """Collect one mask per annotation, falling back to the box itself.

        Returns:
            ``(masks, is_box_mask)``: a ``SegmentationMask`` and an int64
            tensor holding 1 where the mask was derived from ``"rect"`` and 0
            where the annotation supplied its own ``"mask"``.
        """
        masks = []
        is_box_mask = []
        for obj in annotations:
            if "mask" in obj:
                masks.append(obj["mask"])
                is_box_mask.append(0)
            else:
                masks.append(self.get_box_mask(obj["rect"], img_size))
                is_box_mask.append(1)
        masks = SegmentationMask(masks, img_size, mode=self.mask_mode)
        # Explicit dtype so an empty annotation list does not yield float32.
        is_box_mask = torch.tensor(is_box_mask, dtype=torch.int64)
        return masks, is_box_mask

    def add_classes(self, annotations):
        """Map each annotation's ``"class"`` through ``labelmap['class_to_ind']``.

        Returns an int64 tensor with one entry per annotation.
        """
        classes = [self.labelmap['class_to_ind'][obj["class"]] for obj in annotations]
        # Explicit dtype: torch.tensor([]) would otherwise be float32.
        return torch.tensor(classes, dtype=torch.int64)

    def add_confidences(self, annotations):
        """Return each annotation's ``"conf"`` value, defaulting to 1.0."""
        confidences = [obj["conf"] if "conf" in obj else 1.0 for obj in annotations]
        return torch.tensor(confidences)

    def add_attributes(self, annotations):
        """Encode ``"attributes"`` via ``labelmap['attribute_to_ind']``.

        Returns an int64 tensor of shape ``(len(annotations), 16)``; unused
        slots are 0. An annotation with more than 16 attributes raises
        ``IndexError``.
        """
        # the maximal number of attributes per object is 16
        attributes = [[0] * 16 for _ in range(len(annotations))]
        for row, obj in zip(attributes, annotations):
            for j, attr in enumerate(obj["attributes"]):
                row[j] = self.labelmap['attribute_to_ind'][attr]
        # reshape keeps the 16-wide layout even when there are no annotations.
        return torch.tensor(attributes, dtype=torch.int64).reshape(-1, 16)

    @staticmethod
    def _decode_float32(annotations, key, shape=None):
        """Decode base64 float32 buffers stored under ``key`` into one tensor.

        ``shape``, when given, is applied to every decoded array before the
        arrays are stacked.
        """
        decoded = []
        for obj in annotations:
            values = np.frombuffer(base64.b64decode(obj[key]), np.float32)
            decoded.append(values if shape is None else values.reshape(shape))
        # Stack in numpy first: torch.tensor on a list of ndarrays is slow and
        # warns. np.array also makes a writable copy of the read-only buffers.
        return torch.as_tensor(np.array(decoded, dtype=np.float32))

    def add_features(self, annotations):
        """Decode the base64 float32 ``'feature'`` buffer of every annotation."""
        return self._decode_float32(annotations, 'feature')

    def add_scores_all(self, annotations):
        """Decode the base64 float32 ``'scores_all'`` buffer of every annotation."""
        return self._decode_float32(annotations, 'scores_all')

    def add_boxes_all(self, annotations):
        """Decode ``'boxes_all'`` buffers, each reshaped to rows of 4 values."""
        return self._decode_float32(annotations, 'boxes_all', shape=(-1, 4))

    def relation_loader(self, relation_annos, target):
        """Attach relation labels to ``target``.

        Adds two fields: ``"relation_labels"``, an int64 tensor of
        ``[subj_id, obj_id, predicate]`` rows, and ``"pred_labels"``, a
        ``len(target) x len(target)`` int64 matrix holding the predicate index
        at ``[subj_id, obj_id]`` (0 elsewhere). Relations whose ``'class'`` is
        in ``self.ignore_rel`` are skipped.

        Returns:
            ``target`` itself.
        """
        if self.filter_duplicate_relations:
            # Filter out dupes!
            all_rel_sets = collections.defaultdict(list)
            for triplet in relation_annos:
                all_rel_sets[(triplet['subj_id'], triplet['obj_id'])].append(triplet)
            relation_annos = [np.random.choice(v) for v in all_rel_sets.values()]

        # get M*M pred_labels
        relation_triplets = []
        relations = torch.zeros([len(target), len(target)], dtype=torch.int64)
        for rel in relation_annos:
            if len(self.ignore_rel) != 0 and rel['class'] in self.ignore_rel:
                continue
            subj_id = rel['subj_id']
            obj_id = rel['obj_id']
            predicate = self.labelmap['relation_to_ind'][rel['class']]
            relations[subj_id, obj_id] = predicate
            relation_triplets.append([subj_id, obj_id, predicate])

        # Keep a (0, 3) int64 result when no relation survives, instead of a
        # float32 tensor of shape (0,).
        relation_triplets = torch.tensor(relation_triplets, dtype=torch.int64).reshape(-1, 3)
        target.add_field("relation_labels", relation_triplets)
        target.add_field("pred_labels", relations)
        return target
class BoxLabelLoader(object):
    """Build a ``BoxList`` target from box-level annotation dicts.

    Each annotation is indexed with ``obj["rect"]`` and, depending on the
    requested fields, ``"class"``, ``"mask"``, ``"confidence"`` / ``"conf"``,
    ``"attributes_encode"``, ``"IsGroupOf"`` and ``"IsProposal"``.
    Here ``labelmap`` is indexed directly by class name.
    """

    def __init__(self, labelmap, extra_fields=(), ignore_attrs=(),
                 mask_mode="poly"):
        """Store the configuration.

        Args:
            labelmap: mapping from class name to an index; 1 is added to the
                index when labels are built.
            extra_fields: fields attached by ``__call__``.
            ignore_attrs: annotation keys; an object with a truthy value under
                any of them is labelled -1.
            mask_mode: ``"poly"`` or ``"mask"``.
        """
        self.labelmap = labelmap
        self.extra_fields = extra_fields
        self.ignore_attrs = ignore_attrs
        assert mask_mode == "poly" or mask_mode == "mask"
        self.mask_mode = mask_mode
        self.all_fields = ["class", "mask", "confidence",
                           "attributes_encode", "IsGroupOf", "IsProposal"]

    def __call__(self, annotations, img_size, remove_empty=True):
        """Convert ``annotations`` into a ``BoxList`` clipped to the image.

        Args:
            annotations: sequence of annotation dicts, each with a ``"rect"``
                entry (reshaped to rows of 4 values, used in "xyxy" mode).
            img_size: image size forwarded to ``BoxList``; indexed as
                ``img_size[0]`` / ``img_size[1]`` when box masks are built.
            remove_empty: forwarded to ``BoxList.clip_to_image``.

        Returns:
            The ``BoxList`` with every field of ``self.extra_fields`` attached.
        """
        boxes = [obj["rect"] for obj in annotations]
        boxes = torch.as_tensor(boxes).reshape(-1, 4)
        target = BoxList(boxes, img_size, mode="xyxy")

        for field in self.extra_fields:
            assert field in self.all_fields, "Unsupported field {}".format(field)
            if field == "class":
                target.add_field("labels", self.add_classes_with_ignore(annotations))
            elif field == "mask":
                masks, is_box_mask = self.add_masks(annotations, img_size)
                target.add_field("masks", masks)
                target.add_field("is_box_mask", is_box_mask)
            elif field == "confidence":
                target.add_field("confidences", self.add_confidences(annotations))
            elif field == "attributes_encode":
                target.add_field("attributes", self.add_attributes(annotations))
            elif field == "IsGroupOf":
                is_group = [1 if 'IsGroupOf' in obj and obj['IsGroupOf'] == 1 else 0
                            for obj in annotations]
                # Explicit dtype so an empty list does not become float32.
                target.add_field("IsGroupOf", torch.tensor(is_group, dtype=torch.int64))
            elif field == "IsProposal":
                is_proposal = [1 if "IsProposal" in obj and obj['IsProposal'] == 1 else 0
                               for obj in annotations]
                target.add_field("IsProposal", torch.tensor(is_proposal, dtype=torch.int64))

        target = target.clip_to_image(remove_empty=remove_empty)
        return target

    def add_classes_with_ignore(self, annotations):
        """Return int64 labels: ``labelmap[class] + 1``, or -1 for ignored objects.

        An object is ignored when any key listed in ``self.ignore_attrs`` is
        present in it with a truthy value.
        """
        class_names = [obj["class"] for obj in annotations]
        classes = []
        for name, obj in zip(class_names, annotations):
            # Generator instead of a list so `any` can stop at the first hit.
            if self.ignore_attrs and any(obj[attr] for attr in self.ignore_attrs if attr in obj):
                classes.append(-1)
            else:
                classes.append(self.labelmap[name] + 1)  # 0 is saved for background
        # Explicit dtype: torch.tensor([]) would otherwise be float32.
        return torch.tensor(classes, dtype=torch.int64)

    def add_masks(self, annotations, img_size):
        """Collect one mask per annotation, falling back to the box itself.

        Returns:
            ``(masks, is_box_mask)``: a ``SegmentationMask`` and an int64
            tensor holding 1 where the mask was derived from ``"rect"`` and 0
            where the annotation supplied its own ``"mask"``.
        """
        masks = []
        is_box_mask = []
        for obj in annotations:
            if "mask" in obj:
                masks.append(obj["mask"])
                is_box_mask.append(0)
            else:
                masks.append(self.get_box_mask(obj["rect"], img_size))
                is_box_mask.append(1)
        masks = SegmentationMask(masks, img_size, mode=self.mask_mode)
        is_box_mask = torch.tensor(is_box_mask, dtype=torch.int64)
        return masks, is_box_mask

    def get_box_mask(self, rect, img_size):
        """Return a mask covering the rectangle ``rect`` (x1, y1, x2, y2).

        In "poly" mode the result is a one-polygon list with the four corners;
        in "mask" mode it is an RLE dict from ``pycocotools`` whose ``counts``
        is decoded to ``str``.
        """
        x1, y1, x2, y2 = rect[0], rect[1], rect[2], rect[3]
        if self.mask_mode == "poly":
            return [[x1, y1, x1, y2, x2, y2, x2, y1]]
        elif self.mask_mode == "mask":
            # note the order of height/width order in mask is opposite to image
            mask = np.zeros([img_size[1], img_size[0]], dtype=np.uint8)
            # Clamp at 0: a negative slice index would wrap around and fill
            # the wrong region for boxes reaching outside the image.
            top = max(math.floor(y1), 0)
            left = max(math.floor(x1), 0)
            bottom = max(math.ceil(y2), 0)
            right = max(math.ceil(x2), 0)
            mask[top:bottom, left:right] = 255
            encoded_mask = mask_utils.encode(np.asfortranarray(mask))
            encoded_mask["counts"] = encoded_mask["counts"].decode("utf-8")
            return encoded_mask

    def add_confidences(self, annotations):
        """Return per-object confidence: ``"confidence"``, else ``"conf"``, else 1.0."""
        confidences = []
        for obj in annotations:
            if "confidence" in obj:
                confidences.append(obj["confidence"])
            elif "conf" in obj:
                confidences.append(obj["conf"])
            else:
                confidences.append(1.0)
        return torch.tensor(confidences)

    def add_attributes(self, annotations):
        """Copy each ``"attributes_encode"`` list into a zero-padded 16-slot row.

        NOTE(review): an encoding longer than 16 entries lengthens its row
        rather than being truncated — confirm inputs respect the limit.
        """
        # we know that the maximal number of attributes per object is 16
        attributes = [[0] * 16 for _ in range(len(annotations))]
        for row, obj in zip(attributes, annotations):
            encoded = obj["attributes_encode"]
            row[:len(encoded)] = encoded
        return torch.tensor(attributes)