|
import contextlib
import copy
import datetime
import itertools
import json
import os
import pickle
import sys
import time
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple, Union

import datasets
import evaluate
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.distributed as dist
from datasets import Dataset
from matplotlib.collections import PatchCollection
from matplotlib.patches import Polygon
from packaging import version
from tqdm import tqdm
|
|
|
__author__ = 'tsungyi'
|
try:
    # The C helpers come from the optional pycocotools extension; this import
    # is assumed from the original pycocotools mask API.
    import pycocotools._mask as _mask

    iou = _mask.iou
    merge = _mask.merge
    frPyObjects = _mask.frPyObjects
except ImportError:
    pass
|
|
|
|
|
def encode(bimask):
    # Encode a binary mask (2-D or 3-D Fortran-order uint8 array) into RLE.
    if bimask.ndim == 3:
        return _mask.encode(bimask)
    elif bimask.ndim == 2:
        h, w = bimask.shape
        return _mask.encode(bimask.reshape((h, w, 1), order='F'))[0]


def decode(rleObjs):
    # Decode RLE(s) back into a binary mask array.
    if isinstance(rleObjs, list):
        return _mask.decode(rleObjs)
    else:
        return _mask.decode([rleObjs])[:, :, 0]


def area(rleObjs):
    # Compute the area (foreground pixel count) of the encoded mask(s).
    if isinstance(rleObjs, list):
        return _mask.area(rleObjs)
    else:
        return _mask.area([rleObjs])[0]


def toBbox(rleObjs):
    # Convert encoded mask(s) to bounding box(es) in [x, y, width, height] format.
    if isinstance(rleObjs, list):
        return _mask.toBbox(rleObjs)
    else:
        return _mask.toBbox([rleObjs])[0]
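
# Usage sketch for the RLE helpers above (a minimal sketch, assuming the
# optional pycocotools C extension imported successfully; values are easy to
# verify by hand):
#
#   m = np.asfortranarray(np.zeros((4, 4), dtype=np.uint8))
#   m[1:3, 1:3] = 1                 # a 2x2 square of foreground pixels
#   rle = encode(m)                 # RLE dict with "size" and "counts"
#   assert area(rle) == 4
#   assert decode(rle).sum() == 4
#   print(toBbox(rle))              # -> [1. 1. 2. 2.] as [x, y, width, height]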
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class maskUtils(object): |
|
@staticmethod |
|
def iou( |
|
dt: List[List[float]], gt: List[List[float]], iscrowd: List[bool] |
|
) -> np.ndarray: |
|
""" |
|
Calculate the intersection over union (IoU) between detection bounding boxes (dt) and \ |
|
ground truth bounding boxes (gt). |
|
Reference: https://github.com/rafaelpadilla/review_object_detection_metrics |
|
|
|
Args: |
|
dt (List[List[float]]): List of detection bounding boxes in the \ |
|
format [x, y, width, height]. |
|
gt (List[List[float]]): List of ground-truth bounding boxes in the \ |
|
format [x, y, width, height]. |
|
iscrowd (List[bool]): List indicating if each ground-truth bounding box \ |
|
is a crowd region or not. |
|
|
|
        Returns:
            np.ndarray: Array of IoU values of shape (len(dt), len(gt)); an
            empty list is returned if either input is empty.
        """
|
assert len(iscrowd) == len(gt), "iou(iscrowd=) must have the same length as gt" |
|
if len(dt) == 0 or len(gt) == 0: |
|
return [] |
|
ious = np.zeros((len(dt), len(gt)), dtype=np.float64) |
|
for g_idx, g in enumerate(gt): |
|
for d_idx, d in enumerate(dt): |
|
ious[d_idx, g_idx] = _jaccard(d, g, iscrowd[g_idx]) |
|
return ious |
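
# A minimal sketch of maskUtils.iou: one detection against two ground truths,
# the second marked as a crowd region (all boxes are [x, y, width, height]):
#
#   dt = [[0.0, 0.0, 10.0, 10.0]]
#   gt = [[0.0, 0.0, 10.0, 10.0], [0.0, 0.0, 20.0, 20.0]]
#   maskUtils.iou(dt, gt, iscrowd=[False, True])
#   # -> approximately [[1.0, 1.0]]; the crowd column divides the
#   #    intersection by the detection area instead of the union.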
|
|
|
|
|
def _jaccard(a: List[float], b: List[float], iscrowd: bool) -> float: |
|
""" |
|
Calculate the Jaccard index (intersection over union) between two bounding boxes. |
|
For "crowd" regions, we use a modified criteria. If a gt object is |
|
marked as "iscrowd", we allow a dt to match any subregion of the gt. |
|
Choosing gt' in the crowd gt that best matches the dt can be done using |
|
gt'=intersect(dt,gt). Since by definition union(gt',dt)=dt, computing |
|
iou(gt,dt,iscrowd) = iou(gt',dt) = area(intersect(gt,dt)) / area(dt) |
|
For crowd gt regions we use this modified criteria above for the iou. |
|
|
|
Args: |
|
a (List[float]): Bounding box coordinates in the format [x, y, width, height]. |
|
b (List[float]): Bounding box coordinates in the format [x, y, width, height]. |
|
iscrowd (bool): Flag indicating if the second bounding box is a crowd region or not. |
|
|
|
Returns: |
|
float: Jaccard index between the two bounding boxes. |
|
""" |
|
eps = 4e-12 |
|
xa, ya, x2a, y2a = a[0], a[1], a[0] + a[2], a[1] + a[3] |
|
xb, yb, x2b, y2b = b[0], b[1], b[0] + b[2], b[1] + b[3] |
|
|
|
|
|
xi = max(xa, xb) |
|
|
|
x2i = min(x2a, x2b) |
|
|
|
yi = max(ya, yb) |
|
y2i = min(y2a, y2b) |
|
|
|
|
|
Aa = max(x2a - xa, 0.) * max(y2a - ya, 0.) |
|
Ab = max(x2b - xb, 0.) * max(y2b - yb, 0.) |
|
Ai = max(x2i - xi, 0.) * max(y2i - yi, 0.) |
|
|
|
if iscrowd: |
|
return Ai / (Aa + eps) |
|
|
|
return Ai / (Aa + Ab - Ai + eps) |
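
# Worked example for _jaccard: a = [0, 0, 4, 4] and b = [2, 0, 4, 4] (xywh)
# overlap in a 2x4 strip, so iou = 8 / (16 + 16 - 8) = 1/3; with iscrowd=True
# the score is intersection over the detection area instead: 8 / 16 = 0.5.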
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
__author__ = "tsungyi" |
|
|
|
import copy |
|
import datetime |
|
import time |
|
from collections import defaultdict |
|
from packaging import version |
|
|
|
import numpy as np |
|
|
|
# np.float was an alias for the builtin float (i.e. float64) and was removed
# in NumPy 1.24; use float64 either way so results match across versions.
if version.parse(np.__version__) < version.parse("1.24"):
    dtype_float = np.float
else:
    dtype_float = np.float64
|
|
|
|
|
class COCOeval:
    """
    Interface for evaluating detections on the COCO dataset.

    Typical usage:
        E = COCOeval(cocoGt, cocoDt, iouType)  # initialize with gt and dt COCO objects
        E.evaluate()                           # run per-image evaluation
        E.accumulate()                         # accumulate per-image results
        E.summarize()                          # display summary metrics
    """
|
def __init__(self, cocoGt=None, cocoDt=None, iouType="segm"): |
|
""" |
|
Initialize CocoEval using coco APIs for gt and dt |
|
:param cocoGt: coco object with ground truth annotations |
|
:param cocoDt: coco object with detection results |
|
:return: None |
|
""" |
|
if not iouType: |
|
print("iouType not specified. use default iouType segm") |
|
self.cocoGt = cocoGt |
|
self.cocoDt = cocoDt |
|
self.evalImgs = defaultdict( |
|
list |
|
) |
|
self.eval = {} |
|
self._gts = defaultdict(list) |
|
self._dts = defaultdict(list) |
|
self.params = Params(iouType=iouType) |
|
self._paramsEval = {} |
|
self.stats = [] |
|
self.ious = {} |
|
        if cocoGt is not None:
|
self.params.imgIds = sorted(cocoGt.getImgIds()) |
|
self.params.catIds = sorted(cocoGt.getCatIds()) |
|
|
|
def _prepare(self): |
|
""" |
|
Prepare ._gts and ._dts for evaluation based on params |
|
:return: None |
|
""" |
|
|
|
def _toMask(anns, coco): |
|
|
|
for ann in anns: |
|
rle = coco.annToRLE(ann) |
|
ann["segmentation"] = rle |
|
|
|
p = self.params |
|
if p.useCats: |
|
gts = self.cocoGt.loadAnns( |
|
self.cocoGt.getAnnIds(imgIds=p.imgIds, catIds=p.catIds) |
|
) |
|
dts = self.cocoDt.loadAnns( |
|
self.cocoDt.getAnnIds(imgIds=p.imgIds, catIds=p.catIds) |
|
) |
|
else: |
|
gts = self.cocoGt.loadAnns(self.cocoGt.getAnnIds(imgIds=p.imgIds)) |
|
dts = self.cocoDt.loadAnns(self.cocoDt.getAnnIds(imgIds=p.imgIds)) |
|
|
|
|
|
if p.iouType == "segm": |
|
_toMask(gts, self.cocoGt) |
|
_toMask(dts, self.cocoDt) |
|
|
|
        for gt in gts:
            gt["ignore"] = gt["ignore"] if "ignore" in gt else 0
            # Crowd ground truths are always marked ignore, matching the
            # reference pycocotools implementation.
            gt["ignore"] = "iscrowd" in gt and gt["iscrowd"]
            if p.iouType == "keypoints":
                gt["ignore"] = (gt["num_keypoints"] == 0) or gt["ignore"]
|
self._gts = defaultdict(list) |
|
self._dts = defaultdict(list) |
|
for gt in gts: |
|
self._gts[gt["image_id"], gt["category_id"]].append(gt) |
|
for dt in dts: |
|
self._dts[dt["image_id"], dt["category_id"]].append(dt) |
|
self.evalImgs = defaultdict(list) |
|
self.eval = {} |
|
|
|
def evaluate(self): |
|
""" |
|
Run per image evaluation on given images and store results (a list of dict) in self.evalImgs |
|
:return: None |
|
""" |
|
|
|
|
|
p = self.params |
|
|
|
        if p.useSegm is not None:
            p.iouType = "segm" if p.useSegm == 1 else "bbox"
|
|
|
|
|
|
|
|
|
|
|
|
|
p.imgIds = list(np.unique(p.imgIds)) |
|
if p.useCats: |
|
p.catIds = list(np.unique(p.catIds)) |
|
p.maxDets = sorted(p.maxDets) |
|
self.params = p |
|
|
|
self._prepare() |
|
|
|
catIds = p.catIds if p.useCats else [-1] |
|
|
|
if p.iouType == "segm" or p.iouType == "bbox": |
|
computeIoU = self.computeIoU |
|
elif p.iouType == "keypoints": |
|
computeIoU = self.computeOks |
|
self.ious = { |
|
(imgId, catId): computeIoU(imgId, catId) |
|
for imgId in p.imgIds |
|
for catId in catIds |
|
} |
|
|
|
evaluateImg = self.evaluateImg |
|
maxDet = p.maxDets[-1] |
|
self.evalImgs = [ |
|
evaluateImg(imgId, catId, areaRng, maxDet) |
|
for catId in catIds |
|
for areaRng in p.areaRng |
|
for imgId in p.imgIds |
|
] |
|
self._paramsEval = copy.deepcopy(self.params) |
|
ret_evalImgs = np.asarray(self.evalImgs).reshape( |
|
len(catIds), len(p.areaRng), len(p.imgIds) |
|
) |
|
|
|
|
|
return ret_evalImgs |
|
|
|
def computeIoU(self, imgId, catId): |
|
p = self.params |
|
if p.useCats: |
|
gt = self._gts[imgId, catId] |
|
dt = self._dts[imgId, catId] |
|
else: |
|
gt = [_ for cId in p.catIds for _ in self._gts[imgId, cId]] |
|
dt = [_ for cId in p.catIds for _ in self._dts[imgId, cId]] |
|
if len(gt) == 0 and len(dt) == 0: |
|
return [] |
|
inds = np.argsort([-d["score"] for d in dt], kind="mergesort") |
|
dt = [dt[i] for i in inds] |
|
if len(dt) > p.maxDets[-1]: |
|
dt = dt[0: p.maxDets[-1]] |
|
|
|
if p.iouType == "segm": |
|
g = [g["segmentation"] for g in gt] |
|
d = [d["segmentation"] for d in dt] |
|
elif p.iouType == "bbox": |
|
g = [g["bbox"] for g in gt] |
|
d = [d["bbox"] for d in dt] |
|
else: |
|
raise Exception("unknown iouType for iou computation") |
|
|
|
|
|
iscrowd = [int(o["iscrowd"]) for o in gt] |
|
ious = maskUtils.iou(d, g, iscrowd) |
|
return ious |
|
|
|
def computeOks(self, imgId, catId): |
|
p = self.params |
|
|
|
gts = self._gts[imgId, catId] |
|
dts = self._dts[imgId, catId] |
|
inds = np.argsort([-d["score"] for d in dts], kind="mergesort") |
|
dts = [dts[i] for i in inds] |
|
if len(dts) > p.maxDets[-1]: |
|
dts = dts[0: p.maxDets[-1]] |
|
|
|
if len(gts) == 0 or len(dts) == 0: |
|
return [] |
|
ious = np.zeros((len(dts), len(gts))) |
|
sigmas = p.kpt_oks_sigmas |
|
        variances = (sigmas * 2) ** 2
|
k = len(sigmas) |
|
|
|
for j, gt in enumerate(gts): |
|
|
|
g = np.array(gt["keypoints"]) |
|
xg = g[0::3] |
|
yg = g[1::3] |
|
vg = g[2::3] |
|
k1 = np.count_nonzero(vg > 0) |
|
bb = gt["bbox"] |
|
x0 = bb[0] - bb[2] |
|
x1 = bb[0] + bb[2] * 2 |
|
y0 = bb[1] - bb[3] |
|
y1 = bb[1] + bb[3] * 2 |
|
for i, dt in enumerate(dts): |
|
d = np.array(dt["keypoints"]) |
|
xd = d[0::3] |
|
yd = d[1::3] |
|
if k1 > 0: |
|
|
|
dx = xd - xg |
|
dy = yd - yg |
|
else: |
|
|
|
z = np.zeros((k)) |
|
dx = np.max((z, x0 - xd), axis=0) + np.max((z, xd - x1), axis=0) |
|
dy = np.max((z, y0 - yd), axis=0) + np.max((z, yd - y1), axis=0) |
|
                e = (dx ** 2 + dy ** 2) / variances / (gt["area"] + np.spacing(1)) / 2
|
if k1 > 0: |
|
e = e[vg > 0] |
|
ious[i, j] = np.sum(np.exp(-e)) / e.shape[0] |
|
return ious |
|
|
|
def evaluateImg(self, imgId, catId, aRng, maxDet): |
|
""" |
|
perform evaluation for single category and image |
|
:return: dict (single image results) |
|
""" |
|
p = self.params |
|
if p.useCats: |
|
gt = self._gts[imgId, catId] |
|
dt = self._dts[imgId, catId] |
|
else: |
|
gt = [_ for cId in p.catIds for _ in self._gts[imgId, cId]] |
|
dt = [_ for cId in p.catIds for _ in self._dts[imgId, cId]] |
|
if len(gt) == 0 and len(dt) == 0: |
|
return None |
|
|
|
for g in gt: |
|
if g["ignore"] or (g["area"] < aRng[0] or g["area"] > aRng[1]): |
|
g["_ignore"] = 1 |
|
else: |
|
g["_ignore"] = 0 |
|
|
|
|
|
gtind = np.argsort([g["_ignore"] for g in gt], kind="mergesort") |
|
gt = [gt[i] for i in gtind] |
|
dtind = np.argsort([-d["score"] for d in dt], kind="mergesort") |
|
dt = [dt[i] for i in dtind[0:maxDet]] |
|
iscrowd = [int(o["iscrowd"]) for o in gt] |
|
|
|
ious = ( |
|
self.ious[imgId, catId][:, gtind] |
|
if len(self.ious[imgId, catId]) > 0 |
|
else self.ious[imgId, catId] |
|
) |
|
|
|
T = len(p.iouThrs) |
|
G = len(gt) |
|
D = len(dt) |
|
gtm = np.zeros((T, G)) |
|
dtm = np.zeros((T, D)) |
|
gtIg = np.array([g["_ignore"] for g in gt]) |
|
dtIg = np.zeros((T, D)) |
|
        if len(ious) != 0:
|
for tind, t in enumerate(p.iouThrs): |
|
for dind, d in enumerate(dt): |
|
|
|
iou = min([t, 1 - 1e-10]) |
|
m = -1 |
|
for gind, g in enumerate(gt): |
|
|
|
if gtm[tind, gind] > 0 and not iscrowd[gind]: |
|
continue |
|
|
|
if m > -1 and gtIg[m] == 0 and gtIg[gind] == 1: |
|
break |
|
|
|
if ious[dind, gind] < iou: |
|
continue |
|
|
|
iou = ious[dind, gind] |
|
m = gind |
|
|
|
if m == -1: |
|
continue |
|
dtIg[tind, dind] = gtIg[m] |
|
dtm[tind, dind] = gt[m]["id"] |
|
gtm[tind, m] = d["id"] |
|
|
|
a = np.array([d["area"] < aRng[0] or d["area"] > aRng[1] for d in dt]).reshape( |
|
(1, len(dt)) |
|
) |
|
dtIg = np.logical_or(dtIg, np.logical_and(dtm == 0, np.repeat(a, T, 0))) |
|
|
|
return { |
|
"image_id": imgId, |
|
"category_id": catId, |
|
"aRng": aRng, |
|
"maxDet": maxDet, |
|
"dtIds": [d["id"] for d in dt], |
|
"gtIds": [g["id"] for g in gt], |
|
"dtMatches": dtm, |
|
"gtMatches": gtm, |
|
"dtScores": [d["score"] for d in dt], |
|
"gtIgnore": gtIg, |
|
"dtIgnore": dtIg, |
|
} |
|
|
|
def accumulate(self, p=None): |
|
""" |
|
Accumulate per image evaluation results and store the result in self.eval |
|
:param p: input params for evaluation |
|
:return: None |
|
""" |
|
print("Accumulating evaluation results...") |
|
tic = time.time() |
|
if not self.evalImgs: |
|
print("Please run evaluate() first") |
|
|
|
if p is None: |
|
p = self.params |
|
p.catIds = p.catIds if p.useCats == 1 else [-1] |
|
T = len(p.iouThrs) |
|
R = len(p.recThrs) |
|
K = len(p.catIds) if p.useCats else 1 |
|
A = len(p.areaRng) |
|
M = len(p.maxDets) |
|
precision = -np.ones( |
|
(T, R, K, A, M) |
|
) |
|
recall = -np.ones((T, K, A, M)) |
|
scores = -np.ones((T, R, K, A, M)) |
|
|
|
|
|
_pe = self._paramsEval |
|
catIds = _pe.catIds if _pe.useCats else [-1] |
|
setK = set(catIds) |
|
setA = set(map(tuple, _pe.areaRng)) |
|
setM = set(_pe.maxDets) |
|
setI = set(_pe.imgIds) |
|
|
|
k_list = [n for n, k in enumerate(p.catIds) if k in setK] |
|
m_list = [m for n, m in enumerate(p.maxDets) if m in setM] |
|
a_list = [ |
|
n for n, a in enumerate(map(lambda x: tuple(x), p.areaRng)) if a in setA |
|
] |
|
i_list = [n for n, i in enumerate(p.imgIds) if i in setI] |
|
I0 = len(_pe.imgIds) |
|
A0 = len(_pe.areaRng) |
|
|
|
for k, k0 in enumerate(k_list): |
|
Nk = k0 * A0 * I0 |
|
for a, a0 in enumerate(a_list): |
|
Na = a0 * I0 |
|
for m, maxDet in enumerate(m_list): |
|
E = [self.evalImgs[Nk + Na + i] for i in i_list] |
|
E = [e for e in E if not e is None] |
|
if len(E) == 0: |
|
continue |
|
dtScores = np.concatenate([e["dtScores"][0:maxDet] for e in E]) |
|
|
|
|
|
|
|
inds = np.argsort(-dtScores, kind="mergesort") |
|
dtScoresSorted = dtScores[inds] |
|
|
|
dtm = np.concatenate( |
|
[e["dtMatches"][:, 0:maxDet] for e in E], axis=1 |
|
)[:, inds] |
|
dtIg = np.concatenate( |
|
[e["dtIgnore"][:, 0:maxDet] for e in E], axis=1 |
|
)[:, inds] |
|
gtIg = np.concatenate([e["gtIgnore"] for e in E]) |
|
npig = np.count_nonzero(gtIg == 0) |
|
if npig == 0: |
|
continue |
|
tps = np.logical_and(dtm, np.logical_not(dtIg)) |
|
fps = np.logical_and(np.logical_not(dtm), np.logical_not(dtIg)) |
|
|
|
tp_sum = np.cumsum(tps, axis=1).astype(dtype=dtype_float) |
|
fp_sum = np.cumsum(fps, axis=1).astype(dtype=dtype_float) |
|
for t, (tp, fp) in enumerate(zip(tp_sum, fp_sum)): |
|
tp = np.array(tp) |
|
fp = np.array(fp) |
|
nd = len(tp) |
|
rc = tp / npig |
|
pr = tp / (fp + tp + np.spacing(1)) |
|
q = np.zeros((R,)) |
|
ss = np.zeros((R,)) |
|
|
|
if nd: |
|
recall[t, k, a, m] = rc[-1] |
|
else: |
|
recall[t, k, a, m] = 0 |
|
|
|
|
|
|
|
pr = pr.tolist() |
|
q = q.tolist() |
|
|
|
for i in range(nd - 1, 0, -1): |
|
if pr[i] > pr[i - 1]: |
|
pr[i - 1] = pr[i] |
|
|
|
                        inds = np.searchsorted(rc, p.recThrs, side="left")
                        try:
                            # inds may contain len(pr) where recall never reaches
                            # a threshold; the IndexError then leaves the remaining
                            # entries at their defaults (as in pycocotools).
                            for ri, pi in enumerate(inds):
                                q[ri] = pr[pi]
                                ss[ri] = dtScoresSorted[pi]
                        except IndexError:
                            pass
|
precision[t, :, k, a, m] = np.array(q) |
|
scores[t, :, k, a, m] = np.array(ss) |
|
self.eval = { |
|
"params": p, |
|
"counts": [T, R, K, A, M], |
|
"date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), |
|
"precision": precision, |
|
"recall": recall, |
|
"scores": scores, |
|
} |
|
toc = time.time() |
|
print("DONE (t={:0.2f}s).".format(toc - tic)) |
|
|
|
def summarize(self): |
|
""" |
|
Compute and display summary metrics for evaluation results. |
|
        Note this function can *only* be applied on the default parameter setting
|
""" |
|
|
|
def _summarize(ap=1, iouThr=None, areaRng="all", maxDets=100): |
|
p = self.params |
|
iStr = " {:<18} {} @[ IoU={:<9} | area={:>6s} | maxDets={:>3d} ] = {:0.3f}" |
|
titleStr = "Average Precision" if ap == 1 else "Average Recall" |
|
typeStr = "(AP)" if ap == 1 else "(AR)" |
|
iouStr = ( |
|
"{:0.2f}:{:0.2f}".format(p.iouThrs[0], p.iouThrs[-1]) |
|
if iouThr is None |
|
else "{:0.2f}".format(iouThr) |
|
) |
|
|
|
aind = [i for i, aRng in enumerate(p.areaRngLbl) if aRng == areaRng] |
|
mind = [i for i, mDet in enumerate(p.maxDets) if mDet == maxDets] |
|
if ap == 1: |
|
|
|
s = self.eval["precision"] |
|
|
|
if iouThr is not None: |
|
t = np.where(iouThr == p.iouThrs)[0] |
|
s = s[t] |
|
s = s[:, :, :, aind, mind] |
|
else: |
|
|
|
s = self.eval["recall"] |
|
if iouThr is not None: |
|
t = np.where(iouThr == p.iouThrs)[0] |
|
s = s[t] |
|
s = s[:, :, aind, mind] |
|
if len(s[s > -1]) == 0: |
|
mean_s = -1 |
|
else: |
|
mean_s = np.mean(s[s > -1]) |
|
print(iStr.format(titleStr, typeStr, iouStr, areaRng, maxDets, mean_s)) |
|
return mean_s |
|
|
|
def _summarizeDets(): |
|
stats = np.zeros((12,)) |
|
stats[0] = _summarize(1) |
|
stats[1] = _summarize(1, iouThr=0.5, maxDets=self.params.maxDets[2]) |
|
stats[2] = _summarize(1, iouThr=0.75, maxDets=self.params.maxDets[2]) |
|
stats[3] = _summarize(1, areaRng="small", maxDets=self.params.maxDets[2]) |
|
stats[4] = _summarize(1, areaRng="medium", maxDets=self.params.maxDets[2]) |
|
stats[5] = _summarize(1, areaRng="large", maxDets=self.params.maxDets[2]) |
|
stats[6] = _summarize(0, maxDets=self.params.maxDets[0]) |
|
stats[7] = _summarize(0, maxDets=self.params.maxDets[1]) |
|
stats[8] = _summarize(0, maxDets=self.params.maxDets[2]) |
|
stats[9] = _summarize(0, areaRng="small", maxDets=self.params.maxDets[2]) |
|
stats[10] = _summarize(0, areaRng="medium", maxDets=self.params.maxDets[2]) |
|
stats[11] = _summarize(0, areaRng="large", maxDets=self.params.maxDets[2]) |
|
return stats |
|
|
|
def _summarizeKps(): |
|
stats = np.zeros((10,)) |
|
stats[0] = _summarize(1, maxDets=20) |
|
stats[1] = _summarize(1, maxDets=20, iouThr=0.5) |
|
stats[2] = _summarize(1, maxDets=20, iouThr=0.75) |
|
stats[3] = _summarize(1, maxDets=20, areaRng="medium") |
|
stats[4] = _summarize(1, maxDets=20, areaRng="large") |
|
stats[5] = _summarize(0, maxDets=20) |
|
stats[6] = _summarize(0, maxDets=20, iouThr=0.5) |
|
stats[7] = _summarize(0, maxDets=20, iouThr=0.75) |
|
stats[8] = _summarize(0, maxDets=20, areaRng="medium") |
|
stats[9] = _summarize(0, maxDets=20, areaRng="large") |
|
return stats |
|
|
|
if not self.eval: |
|
raise Exception("Please run accumulate() first") |
|
        iouType = self.params.iouType
        if iouType == "segm" or iouType == "bbox":
            summarize = _summarizeDets
        elif iouType == "keypoints":
            summarize = _summarizeKps
        else:
            raise Exception("iouType {} not supported".format(iouType))
        self.stats = summarize()
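        # For bbox evaluation self.stats then holds the 12 metrics in the order
        # printed above, e.g. stats[0] = AP@[0.50:0.95 | all | 100] and
        # stats[1] = AP@0.50.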
|
|
|
    def __str__(self):
        # __str__ must return a string; summarize() only prints.
        self.summarize()
        return ""
|
|
|
|
|
class Params: |
|
""" |
|
Params for coco evaluation api |
|
""" |
|
|
|
def setDetParams(self): |
|
self.imgIds = [] |
|
self.catIds = [] |
|
|
|
self.iouThrs = np.linspace( |
|
0.5, 0.95, int(np.round((0.95 - 0.5) / 0.05)) + 1, endpoint=True |
|
) |
|
self.recThrs = np.linspace( |
|
0.0, 1.00, int(np.round((1.00 - 0.0) / 0.01)) + 1, endpoint=True |
|
) |
|
self.maxDets = [1, 10, 100] |
|
self.areaRng = [ |
|
[0 ** 2, 1e5 ** 2], |
|
[0 ** 2, 32 ** 2], |
|
[32 ** 2, 96 ** 2], |
|
[96 ** 2, 1e5 ** 2], |
|
] |
|
self.areaRngLbl = ["all", "small", "medium", "large"] |
|
self.useCats = 1 |
|
|
|
def setKpParams(self): |
|
self.imgIds = [] |
|
self.catIds = [] |
|
|
|
self.iouThrs = np.linspace( |
|
0.5, 0.95, int(np.round((0.95 - 0.5) / 0.05)) + 1, endpoint=True |
|
) |
|
self.recThrs = np.linspace( |
|
0.0, 1.00, int(np.round((1.00 - 0.0) / 0.01)) + 1, endpoint=True |
|
) |
|
self.maxDets = [20] |
|
self.areaRng = [[0 ** 2, 1e5 ** 2], [32 ** 2, 96 ** 2], [96 ** 2, 1e5 ** 2]] |
|
self.areaRngLbl = ["all", "medium", "large"] |
|
self.useCats = 1 |
|
self.kpt_oks_sigmas = ( |
|
np.array( |
|
[ |
|
0.26, |
|
0.25, |
|
0.25, |
|
0.35, |
|
0.35, |
|
0.79, |
|
0.79, |
|
0.72, |
|
0.72, |
|
0.62, |
|
0.62, |
|
1.07, |
|
1.07, |
|
0.87, |
|
0.87, |
|
0.89, |
|
0.89, |
|
] |
|
) |
|
/ 10.0 |
|
) |
|
|
|
    def __init__(self, iouType="segm"):
        if iouType == "segm" or iouType == "bbox":
            self.setDetParams()
        elif iouType == "keypoints":
            self.setKpParams()
        else:
            raise Exception("iouType not supported")
        self.iouType = iouType
        # useSegm is deprecated
        self.useSegm = None
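        # With the detection params, iouThrs spans 0.50:0.05:0.95 (10
        # thresholds) and recThrs spans 0.00:0.01:1.00 (101 thresholds).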
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
__author__ = "tylin" |
|
__version__ = "2.0" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import copy |
|
import itertools |
|
import json |
|
|
|
import os |
|
import sys |
|
import time |
|
from collections import defaultdict |
|
|
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
from matplotlib.collections import PatchCollection |
|
from matplotlib.patches import Polygon |
|
|
|
PYTHON_VERSION = sys.version_info[0] |
|
if PYTHON_VERSION == 2: |
|
from urllib import urlretrieve |
|
elif PYTHON_VERSION == 3: |
|
from urllib.request import urlretrieve |
|
|
|
|
|
def _isArrayLike(obj): |
|
return hasattr(obj, "__iter__") and hasattr(obj, "__len__") |
|
|
|
|
|
class COCO: |
|
    def __init__(self, annotations=None):
        """
        Constructor of Microsoft COCO helper class for reading and visualizing annotations.
        :param annotations (dict): dataset dictionary in COCO annotation format
        :return:
        """
|
|
|
self.dataset, self.anns, self.cats, self.imgs = dict(), dict(), dict(), dict() |
|
self.imgToAnns, self.catToImgs = defaultdict(list), defaultdict(list) |
|
|
|
if annotations: |
|
assert ( |
|
type(annotations) == dict |
|
), f"annotation file format {type(annotations)} not supported." |
|
self.dataset = annotations |
|
self.createIndex() |
|
|
|
def createIndex(self): |
|
|
|
print("creating index...") |
|
anns, cats, imgs = {}, {}, {} |
|
imgToAnns, catToImgs = defaultdict(list), defaultdict(list) |
|
if "annotations" in self.dataset: |
|
for ann in self.dataset["annotations"]: |
|
imgToAnns[ann["image_id"]].append(ann) |
|
anns[ann["id"]] = ann |
|
|
|
if "images" in self.dataset: |
|
for img in self.dataset["images"]: |
|
imgs[img["id"]] = img |
|
|
|
if "categories" in self.dataset: |
|
for cat in self.dataset["categories"]: |
|
cats[cat["id"]] = cat |
|
|
|
if "annotations" in self.dataset and "categories" in self.dataset: |
|
for ann in self.dataset["annotations"]: |
|
catToImgs[ann["category_id"]].append(ann["image_id"]) |
|
|
|
print("index created!") |
|
|
|
|
|
self.anns = anns |
|
self.imgToAnns = imgToAnns |
|
self.catToImgs = catToImgs |
|
self.imgs = imgs |
|
self.cats = cats |
|
|
|
def info(self): |
|
""" |
|
Print information about the annotation file. |
|
:return: |
|
""" |
|
for key, value in self.dataset["info"].items(): |
|
print("{}: {}".format(key, value)) |
|
|
|
def getAnnIds(self, imgIds=[], catIds=[], areaRng=[], iscrowd=None): |
|
""" |
|
Get ann ids that satisfy given filter conditions. default skips that filter |
|
:param imgIds (int array) : get anns for given imgs |
|
catIds (int array) : get anns for given cats |
|
areaRng (float array) : get anns for given area range (e.g. [0 inf]) |
|
iscrowd (boolean) : get anns for given crowd label (False or True) |
|
:return: ids (int array) : integer array of ann ids |
|
""" |
|
imgIds = imgIds if _isArrayLike(imgIds) else [imgIds] |
|
catIds = catIds if _isArrayLike(catIds) else [catIds] |
|
|
|
if len(imgIds) == len(catIds) == len(areaRng) == 0: |
|
anns = self.dataset["annotations"] |
|
else: |
|
if not len(imgIds) == 0: |
|
lists = [ |
|
self.imgToAnns[imgId] for imgId in imgIds if imgId in self.imgToAnns |
|
] |
|
anns = list(itertools.chain.from_iterable(lists)) |
|
else: |
|
anns = self.dataset["annotations"] |
|
anns = ( |
|
anns |
|
if len(catIds) == 0 |
|
else [ann for ann in anns if ann["category_id"] in catIds] |
|
) |
|
anns = ( |
|
anns |
|
if len(areaRng) == 0 |
|
else [ |
|
ann |
|
for ann in anns |
|
if ann["area"] > areaRng[0] and ann["area"] < areaRng[1] |
|
] |
|
) |
|
        if iscrowd is not None:
|
ids = [ann["id"] for ann in anns if ann["iscrowd"] == iscrowd] |
|
else: |
|
ids = [ann["id"] for ann in anns] |
|
return ids |
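
    # Example:
    #   coco.getAnnIds(imgIds=[1], iscrowd=False) returns the ids of all
    #   non-crowd annotations attached to image 1.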
|
|
|
def getCatIds(self, catNms=[], supNms=[], catIds=[]): |
|
""" |
|
        Get cat ids that satisfy given filter conditions. default skips that filter.
|
:param catNms (str array) : get cats for given cat names |
|
:param supNms (str array) : get cats for given supercategory names |
|
:param catIds (int array) : get cats for given cat ids |
|
:return: ids (int array) : integer array of cat ids |
|
""" |
|
catNms = catNms if _isArrayLike(catNms) else [catNms] |
|
supNms = supNms if _isArrayLike(supNms) else [supNms] |
|
catIds = catIds if _isArrayLike(catIds) else [catIds] |
|
|
|
if len(catNms) == len(supNms) == len(catIds) == 0: |
|
cats = self.dataset["categories"] |
|
else: |
|
cats = self.dataset["categories"] |
|
cats = ( |
|
cats |
|
if len(catNms) == 0 |
|
else [cat for cat in cats if cat["name"] in catNms] |
|
) |
|
cats = ( |
|
cats |
|
if len(supNms) == 0 |
|
else [cat for cat in cats if cat["supercategory"] in supNms] |
|
) |
|
cats = ( |
|
cats |
|
if len(catIds) == 0 |
|
else [cat for cat in cats if cat["id"] in catIds] |
|
) |
|
ids = [cat["id"] for cat in cats] |
|
return ids |
|
|
|
def getImgIds(self, imgIds=[], catIds=[]): |
|
""" |
|
Get img ids that satisfy given filter conditions. |
|
:param imgIds (int array) : get imgs for given ids |
|
:param catIds (int array) : get imgs with all given cats |
|
:return: ids (int array) : integer array of img ids |
|
""" |
|
imgIds = imgIds if _isArrayLike(imgIds) else [imgIds] |
|
catIds = catIds if _isArrayLike(catIds) else [catIds] |
|
|
|
if len(imgIds) == len(catIds) == 0: |
|
ids = self.imgs.keys() |
|
else: |
|
ids = set(imgIds) |
|
for i, catId in enumerate(catIds): |
|
if i == 0 and len(ids) == 0: |
|
ids = set(self.catToImgs[catId]) |
|
else: |
|
ids &= set(self.catToImgs[catId]) |
|
return list(ids) |
|
|
|
def loadAnns(self, ids=[]): |
|
""" |
|
Load anns with the specified ids. |
|
:param ids (int array) : integer ids specifying anns |
|
:return: anns (object array) : loaded ann objects |
|
""" |
|
if _isArrayLike(ids): |
|
return [self.anns[id] for id in ids] |
|
elif type(ids) == int: |
|
return [self.anns[ids]] |
|
|
|
def loadCats(self, ids=[]): |
|
""" |
|
Load cats with the specified ids. |
|
:param ids (int array) : integer ids specifying cats |
|
:return: cats (object array) : loaded cat objects |
|
""" |
|
if _isArrayLike(ids): |
|
return [self.cats[id] for id in ids] |
|
elif type(ids) == int: |
|
return [self.cats[ids]] |
|
|
|
def loadImgs(self, ids=[]): |
|
""" |
|
        Load imgs with the specified ids.
|
:param ids (int array) : integer ids specifying img |
|
:return: imgs (object array) : loaded img objects |
|
""" |
|
if _isArrayLike(ids): |
|
return [self.imgs[id] for id in ids] |
|
elif type(ids) == int: |
|
return [self.imgs[ids]] |
|
|
|
def showAnns(self, anns, draw_bbox=False): |
|
""" |
|
Display the specified annotations. |
|
:param anns (array of object): annotations to display |
|
:return: None |
|
""" |
|
if len(anns) == 0: |
|
return 0 |
|
if "segmentation" in anns[0] or "keypoints" in anns[0]: |
|
datasetType = "instances" |
|
elif "caption" in anns[0]: |
|
datasetType = "captions" |
|
else: |
|
raise Exception("datasetType not supported") |
|
if datasetType == "instances": |
|
ax = plt.gca() |
|
ax.set_autoscale_on(False) |
|
polygons = [] |
|
color = [] |
|
for ann in anns: |
|
c = (np.random.random((1, 3)) * 0.6 + 0.4).tolist()[0] |
|
if "segmentation" in ann: |
|
if type(ann["segmentation"]) == list: |
|
|
|
for seg in ann["segmentation"]: |
|
poly = np.array(seg).reshape((int(len(seg) / 2), 2)) |
|
polygons.append(Polygon(poly)) |
|
color.append(c) |
|
else: |
|
                    raise NotImplementedError(
                        "This type is not supported yet."
                    )
|
if "keypoints" in ann and type(ann["keypoints"]) == list: |
|
|
|
sks = np.array(self.loadCats(ann["category_id"])[0]["skeleton"]) - 1 |
|
kp = np.array(ann["keypoints"]) |
|
x = kp[0::3] |
|
y = kp[1::3] |
|
v = kp[2::3] |
|
for sk in sks: |
|
if np.all(v[sk] > 0): |
|
plt.plot(x[sk], y[sk], linewidth=3, color=c) |
|
plt.plot( |
|
x[v > 0], |
|
y[v > 0], |
|
"o", |
|
markersize=8, |
|
markerfacecolor=c, |
|
markeredgecolor="k", |
|
markeredgewidth=2, |
|
) |
|
plt.plot( |
|
x[v > 1], |
|
y[v > 1], |
|
"o", |
|
markersize=8, |
|
markerfacecolor=c, |
|
markeredgecolor=c, |
|
markeredgewidth=2, |
|
) |
|
|
|
if draw_bbox: |
|
[bbox_x, bbox_y, bbox_w, bbox_h] = ann["bbox"] |
|
poly = [ |
|
[bbox_x, bbox_y], |
|
[bbox_x, bbox_y + bbox_h], |
|
[bbox_x + bbox_w, bbox_y + bbox_h], |
|
[bbox_x + bbox_w, bbox_y], |
|
] |
|
np_poly = np.array(poly).reshape((4, 2)) |
|
polygons.append(Polygon(np_poly)) |
|
color.append(c) |
|
|
|
p = PatchCollection(polygons, facecolor=color, linewidths=0, alpha=0.4) |
|
ax.add_collection(p) |
|
p = PatchCollection( |
|
polygons, facecolor="none", edgecolors=color, linewidths=2 |
|
) |
|
ax.add_collection(p) |
|
elif datasetType == "captions": |
|
for ann in anns: |
|
print(ann["caption"]) |
|
|
|
def loadRes(self, resFile): |
|
""" |
|
Load result file and return a result api object. |
|
:param resFile (str) : file name of result file |
|
:return: res (obj) : result api object |
|
""" |
|
res = COCO() |
|
res.dataset["images"] = [img for img in self.dataset["images"]] |
|
|
|
print("Loading and preparing results...") |
|
tic = time.time() |
|
if type(resFile) == str or (PYTHON_VERSION == 2 and type(resFile) == unicode): |
|
anns = json.load(open(resFile)) |
|
elif type(resFile) == np.ndarray: |
|
anns = self.loadNumpyAnnotations(resFile) |
|
else: |
|
anns = resFile |
|
        assert type(anns) == list, "results is not an array of objects"
|
annsImgIds = [ann["image_id"] for ann in anns] |
|
assert set(annsImgIds) == ( |
|
set(annsImgIds) & set(self.getImgIds()) |
|
), "Results do not correspond to current coco set" |
|
if "caption" in anns[0]: |
|
raise NotImplementedError("Evaluating caption is not supported yet.") |
|
elif "bbox" in anns[0] and not anns[0]["bbox"] == []: |
|
res.dataset["categories"] = copy.deepcopy(self.dataset["categories"]) |
|
for id, ann in enumerate(anns): |
|
bb = ann["bbox"] |
|
x1, x2, y1, y2 = [bb[0], bb[0] + bb[2], bb[1], bb[1] + bb[3]] |
|
if not "segmentation" in ann: |
|
ann["segmentation"] = [[x1, y1, x1, y2, x2, y2, x2, y1]] |
|
ann["area"] = bb[2] * bb[3] |
|
ann["id"] = id + 1 |
|
ann["iscrowd"] = 0 |
|
elif "segmentation" in anns[0]: |
|
raise NotImplementedError("Evaluating caption is not supported yet.") |
|
elif "keypoints" in anns[0]: |
|
raise NotImplementedError("Evaluating caption is not supported yet.") |
|
print("DONE (t={:0.2f}s)".format(time.time() - tic)) |
|
|
|
res.dataset["annotations"] = anns |
|
res.createIndex() |
|
return res |
|
|
|
def download(self, tarDir=None, imgIds=[]): |
|
""" |
|
Download COCO images from mscoco.org server. |
|
:param tarDir (str): COCO results directory name |
|
imgIds (list): images to be downloaded |
|
:return: |
|
""" |
|
if tarDir is None: |
|
print("Please specify target directory") |
|
return -1 |
|
if len(imgIds) == 0: |
|
imgs = self.imgs.values() |
|
else: |
|
imgs = self.loadImgs(imgIds) |
|
N = len(imgs) |
|
if not os.path.exists(tarDir): |
|
os.makedirs(tarDir) |
|
for i, img in enumerate(imgs): |
|
tic = time.time() |
|
fname = os.path.join(tarDir, img["file_name"]) |
|
if not os.path.exists(fname): |
|
urlretrieve(img["coco_url"], fname) |
|
print( |
|
"downloaded {}/{} images (t={:0.1f}s)".format(i, N, time.time() - tic) |
|
) |
|
|
|
def loadNumpyAnnotations(self, data): |
|
""" |
|
Convert result data from a numpy array [Nx7] where each row contains {imageID,x1,y1,w,h,score,class} |
|
:param data (numpy.ndarray) |
|
:return: annotations (python nested list) |
|
""" |
|
print("Converting ndarray to lists...") |
|
assert type(data) == np.ndarray |
|
print(data.shape) |
|
assert data.shape[1] == 7 |
|
N = data.shape[0] |
|
ann = [] |
|
for i in range(N): |
|
if i % 1000000 == 0: |
|
print("{}/{}".format(i, N)) |
|
ann += [ |
|
{ |
|
"image_id": int(data[i, 0]), |
|
"bbox": [data[i, 1], data[i, 2], data[i, 3], data[i, 4]], |
|
"score": data[i, 5], |
|
"category_id": int(data[i, 6]), |
|
} |
|
] |
|
return ann |
|
|
|
    def annToRLE(self, ann):
        """
        Convert annotation which can be polygons, uncompressed RLE to RLE.
        :return: RLE (run-length encoded segmentation)
        """
|
t = self.imgs[ann["image_id"]] |
|
h, w = t["height"], t["width"] |
|
segm = ann["segmentation"] |
|
if type(segm) == list: |
|
raise NotImplementedError("This type is not is not supported yet.") |
|
|
|
|
|
|
|
|
|
elif type(segm["counts"]) == list: |
|
raise NotImplementedError("This type is not is not supported yet.") |
|
|
|
|
|
else: |
|
|
|
rle = ann["segmentation"] |
|
return rle |
|
|
|
def annToMask(self, ann): |
|
""" |
|
Convert annotation which can be polygons, uncompressed RLE, or RLE to binary mask. |
|
:return: binary mask (numpy 2D array) |
|
""" |
|
        rle = self.annToRLE(ann)
        # Decoding RLE to a binary mask is not implemented in this port.
        raise NotImplementedError("This type is not supported yet.")
|
|
|
|
|
|
|
_TYPING_BOX = Tuple[float, float, float, float] |
|
_TYPING_SCORES = List[float] |
|
_TYPING_LABELS = List[int] |
|
_TYPING_BOXES = List[_TYPING_BOX] |
|
_TYPING_PRED_REF = Union[_TYPING_SCORES, _TYPING_LABELS, _TYPING_BOXES] |
|
_TYPING_PREDICTION = Dict[str, _TYPING_PRED_REF] |
|
_TYPING_REFERENCE = Dict[str, _TYPING_PRED_REF] |
|
_TYPING_PREDICTIONS = Dict[int, _TYPING_PREDICTION] |
|
|
|
|
|
def convert_to_xywh(boxes: torch.Tensor) -> torch.Tensor: |
|
""" |
|
Convert bounding boxes from (xmin, ymin, xmax, ymax) format to (x, y, width, height) format. |
|
|
|
Args: |
|
boxes (torch.Tensor): Tensor of shape (N, 4) representing bounding boxes in \ |
|
(xmin, ymin, xmax, ymax) format. |
|
|
|
Returns: |
|
torch.Tensor: Tensor of shape (N, 4) representing bounding boxes in (x, y, width, height) \ |
|
format. |
|
""" |
|
xmin, ymin, xmax, ymax = boxes.unbind(1) |
|
return torch.stack((xmin, ymin, xmax - xmin, ymax - ymin), dim=1) |
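
# Example:
#   convert_to_xywh(torch.tensor([[10.0, 20.0, 30.0, 60.0]]))
#   # -> tensor([[10., 20., 20., 40.]])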
|
|
|
|
|
def create_common_coco_eval( |
|
coco_eval: COCOeval, img_ids: List[int], eval_imgs: np.ndarray |
|
) -> None: |
|
""" |
|
Create a common COCO evaluation by merging image IDs and evaluation images into the \ |
|
coco_eval object. |
|
|
|
    Args:
        coco_eval: COCOeval evaluation object.
        img_ids (List[int]): List of image IDs.
        eval_imgs (np.ndarray): Array of per-image evaluation results.
|
""" |
|
img_ids, eval_imgs = merge(img_ids, eval_imgs) |
|
img_ids = list(img_ids) |
|
eval_imgs = list(eval_imgs.flatten()) |
|
|
|
coco_eval.evalImgs = eval_imgs |
|
coco_eval.params.imgIds = img_ids |
|
coco_eval._paramsEval = copy.deepcopy(coco_eval.params) |
|
|
|
|
|
def merge(img_ids: List[int], eval_imgs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
|
""" |
|
Merge image IDs and evaluation images from different processes. |
|
|
|
Args: |
|
img_ids (List[int]): List of image ID arrays from different processes. |
|
eval_imgs (np.ndarray): Evaluation images from different processes. |
|
|
|
Returns: |
|
Tuple[np.ndarray, np.ndarray]: Merged image IDs and evaluation images. |
|
""" |
|
all_img_ids = all_gather(img_ids) |
|
all_eval_imgs = all_gather(eval_imgs) |
|
|
|
merged_img_ids = [] |
|
for p in all_img_ids: |
|
merged_img_ids.extend(p) |
|
|
|
merged_eval_imgs = [] |
|
for p in all_eval_imgs: |
|
merged_eval_imgs.append(p) |
|
|
|
merged_img_ids = np.array(merged_img_ids) |
|
merged_eval_imgs = np.concatenate(merged_eval_imgs, 2) |
|
|
|
|
|
merged_img_ids, idx = np.unique(merged_img_ids, return_index=True) |
|
merged_eval_imgs = merged_eval_imgs[..., idx] |
|
|
|
return merged_img_ids, merged_eval_imgs |
|
|
|
|
|
def all_gather(data: List[int]) -> List[List[int]]: |
|
""" |
|
Run all_gather on arbitrary picklable data (not necessarily tensors). |
|
|
|
Args: |
|
data (List[int]): any picklable object |
|
Returns: |
|
List[List[int]]: list of data gathered from each rank |
|
""" |
|
world_size = get_world_size() |
|
if world_size == 1: |
|
return [data] |
|
|
|
|
|
buffer = pickle.dumps(data) |
|
storage = torch.ByteStorage.from_buffer(buffer) |
|
tensor = torch.ByteTensor(storage).to("cuda") |
|
|
|
|
|
local_size = torch.tensor([tensor.numel()], device="cuda") |
|
size_list = [torch.tensor([0], device="cuda") for _ in range(world_size)] |
|
dist.all_gather(size_list, local_size) |
|
size_list = [int(size.item()) for size in size_list] |
|
max_size = max(size_list) |
|
|
|
|
|
|
|
|
|
tensor_list = [] |
|
for _ in size_list: |
|
tensor_list.append(torch.empty((max_size,), dtype=torch.uint8, device="cuda")) |
|
if local_size != max_size: |
|
padding = torch.empty( |
|
size=(max_size - local_size,), dtype=torch.uint8, device="cuda" |
|
) |
|
tensor = torch.cat((tensor, padding), dim=0) |
|
dist.all_gather(tensor_list, tensor) |
|
|
|
data_list = [] |
|
for size, tensor in zip(size_list, tensor_list): |
|
buffer = tensor.cpu().numpy().tobytes()[:size] |
|
data_list.append(pickle.loads(buffer)) |
|
|
|
return data_list |
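
# With world_size == 1 (no distributed init), all_gather(x) simply returns
# [x]; under torch.distributed each rank receives every rank's object.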
|
|
|
|
|
def get_world_size() -> int: |
|
""" |
|
Get the number of processes in the distributed environment. |
|
|
|
Returns: |
|
int: Number of processes. |
|
""" |
|
if not is_dist_avail_and_initialized(): |
|
return 1 |
|
return dist.get_world_size() |
|
|
|
|
|
def is_dist_avail_and_initialized() -> bool: |
|
""" |
|
Check if distributed environment is available and initialized. |
|
|
|
Returns: |
|
bool: True if distributed environment is available and initialized, False otherwise. |
|
""" |
|
return dist.is_available() and dist.is_initialized() |
|
|
|
|
|
|
|
|
_SUPPORTED_TYPES = ["bbox"] |
|
|
|
|
|
class COCOEvaluator(object): |
|
""" |
|
Class to perform evaluation for the COCO dataset. |
|
""" |
|
|
|
def __init__(self, coco_gt: COCO, iou_types: List[str] = ["bbox"]): |
|
""" |
|
Initializes COCOEvaluator with the ground truth COCO dataset and IoU types. |
|
|
|
Args: |
|
coco_gt: The ground truth COCO dataset. |
|
iou_types: Intersection over Union (IoU) types for evaluation (Supported: "bbox"). |
|
""" |
|
self.coco_gt = copy.deepcopy(coco_gt) |
|
|
|
self.coco_eval = {} |
|
        for iou_type in iou_types:
            if iou_type not in _SUPPORTED_TYPES:
                raise ValueError(f"IoU type not supported {iou_type}")
|
self.coco_eval[iou_type] = COCOeval(self.coco_gt, iouType=iou_type) |
|
|
|
self.iou_types = iou_types |
|
self.img_ids = [] |
|
self.eval_imgs = {k: [] for k in iou_types} |
|
|
|
def update(self, predictions: _TYPING_PREDICTIONS) -> None: |
|
""" |
|
Update the evaluator with new predictions. |
|
|
|
Args: |
|
predictions: The predictions to update. |
|
""" |
|
img_ids = list(np.unique(list(predictions.keys()))) |
|
self.img_ids.extend(img_ids) |
|
|
|
for iou_type in self.iou_types: |
|
results = self.prepare(predictions, iou_type) |
|
|
|
|
|
with open(os.devnull, "w") as devnull: |
|
with contextlib.redirect_stdout(devnull): |
|
coco_dt = COCO.loadRes(self.coco_gt, results) if results else COCO() |
|
coco_eval = self.coco_eval[iou_type] |
|
|
|
coco_eval.cocoDt = coco_dt |
|
coco_eval.params.imgIds = list(img_ids) |
|
eval_imgs = coco_eval.evaluate() |
|
self.eval_imgs[iou_type].append(eval_imgs) |
|
|
|
def synchronize_between_processes(self) -> None: |
|
""" |
|
Synchronizes evaluation images between processes. |
|
""" |
|
for iou_type in self.iou_types: |
|
self.eval_imgs[iou_type] = np.concatenate(self.eval_imgs[iou_type], 2) |
|
create_common_coco_eval( |
|
self.coco_eval[iou_type], self.img_ids, self.eval_imgs[iou_type] |
|
) |
|
|
|
def accumulate(self) -> None: |
|
""" |
|
Accumulates the evaluation results. |
|
""" |
|
for coco_eval in self.coco_eval.values(): |
|
coco_eval.accumulate() |
|
|
|
def summarize(self) -> None: |
|
""" |
|
Prints the IoU metric and summarizes the evaluation results. |
|
""" |
|
for iou_type, coco_eval in self.coco_eval.items(): |
|
print("IoU metric: {}".format(iou_type)) |
|
coco_eval.summarize() |
|
|
|
def prepare( |
|
self, predictions: _TYPING_PREDICTIONS, iou_type: str |
|
) -> List[Dict[str, Union[int, _TYPING_BOX, float]]]: |
|
""" |
|
Prepares the predictions for COCO detection. |
|
|
|
Args: |
|
predictions: The predictions to prepare. |
|
iou_type: The Intersection over Union (IoU) type for evaluation. |
|
|
|
Returns: |
|
A dictionary with the prepared predictions. |
|
""" |
|
if iou_type == "bbox": |
|
return self.prepare_for_coco_detection(predictions) |
|
else: |
|
raise ValueError(f"IoU type not supported {iou_type}") |
|
|
|
def _post_process_stats( |
|
self, stats, coco_eval_object, iou_type="bbox" |
|
) -> Dict[str, float]: |
|
""" |
|
Prepares the predictions for COCO detection. |
|
|
|
Args: |
|
predictions: The predictions to prepare. |
|
iou_type: The Intersection over Union (IoU) type for evaluation. |
|
|
|
Returns: |
|
A dictionary with the prepared predictions. |
|
""" |
|
if iou_type not in _SUPPORTED_TYPES: |
|
raise ValueError(f"iou_type '{iou_type}' not supported") |
|
|
|
current_max_dets = coco_eval_object.params.maxDets |
|
|
|
index_to_title = { |
|
"bbox": { |
|
0: f"AP-IoU=0.50:0.95-area=all-maxDets={current_max_dets[2]}", |
|
1: f"AP-IoU=0.50-area=all-maxDets={current_max_dets[2]}", |
|
2: f"AP-IoU=0.75-area=all-maxDets={current_max_dets[2]}", |
|
3: f"AP-IoU=0.50:0.95-area=small-maxDets={current_max_dets[2]}", |
|
4: f"AP-IoU=0.50:0.95-area=medium-maxDets={current_max_dets[2]}", |
|
5: f"AP-IoU=0.50:0.95-area=large-maxDets={current_max_dets[2]}", |
|
6: f"AR-IoU=0.50:0.95-area=all-maxDets={current_max_dets[0]}", |
|
7: f"AR-IoU=0.50:0.95-area=all-maxDets={current_max_dets[1]}", |
|
8: f"AR-IoU=0.50:0.95-area=all-maxDets={current_max_dets[2]}", |
|
9: f"AR-IoU=0.50:0.95-area=small-maxDets={current_max_dets[2]}", |
|
10: f"AR-IoU=0.50:0.95-area=medium-maxDets={current_max_dets[2]}", |
|
11: f"AR-IoU=0.50:0.95-area=large-maxDets={current_max_dets[2]}", |
|
}, |
|
"keypoints": { |
|
0: "AP-IoU=0.50:0.95-area=all-maxDets=20", |
|
1: "AP-IoU=0.50-area=all-maxDets=20", |
|
2: "AP-IoU=0.75-area=all-maxDets=20", |
|
3: "AP-IoU=0.50:0.95-area=medium-maxDets=20", |
|
4: "AP-IoU=0.50:0.95-area=large-maxDets=20", |
|
5: "AR-IoU=0.50:0.95-area=all-maxDets=20", |
|
6: "AR-IoU=0.50-area=all-maxDets=20", |
|
7: "AR-IoU=0.75-area=all-maxDets=20", |
|
8: "AR-IoU=0.50:0.95-area=medium-maxDets=20", |
|
9: "AR-IoU=0.50:0.95-area=large-maxDets=20", |
|
}, |
|
} |
|
|
|
output_dict: Dict[str, float] = {} |
|
for index, stat in enumerate(stats): |
|
output_dict[index_to_title[iou_type][index]] = stat |
|
|
|
return output_dict |
|
|
|
def get_results(self) -> Dict[str, Dict[str, float]]: |
|
""" |
|
Gets the results of the COCO evaluation. |
|
|
|
Returns: |
|
A dictionary with the results of the COCO evaluation. |
|
""" |
|
output_dict = {} |
|
|
|
for iou_type, coco_eval in self.coco_eval.items(): |
|
if iou_type == "segm": |
|
iou_type = "bbox" |
|
output_dict[f"iou_{iou_type}"] = self._post_process_stats( |
|
coco_eval.stats, coco_eval, iou_type |
|
) |
|
return output_dict |
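
    # Example shape of the returned dict (values are illustrative):
    #   {"iou_bbox": {"AP-IoU=0.50:0.95-area=all-maxDets=100": 0.42, ...}}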
|
|
|
def prepare_for_coco_detection( |
|
self, predictions: _TYPING_PREDICTIONS |
|
) -> List[Dict[str, Union[int, _TYPING_BOX, float]]]: |
|
""" |
|
Prepares the predictions for COCO detection. |
|
|
|
Args: |
|
predictions: The predictions to prepare. |
|
|
|
Returns: |
|
A list of dictionaries with the prepared predictions. |
|
""" |
|
coco_results = [] |
|
for original_id, prediction in predictions.items(): |
|
if len(prediction) == 0: |
|
continue |
|
|
|
boxes = prediction["boxes"] |
|
if len(boxes) == 0: |
|
continue |
|
|
|
if not isinstance(boxes, torch.Tensor): |
|
boxes = torch.as_tensor(boxes) |
|
boxes = boxes.tolist() |
|
|
|
scores = prediction["scores"] |
|
if not isinstance(scores, list): |
|
scores = scores.tolist() |
|
|
|
labels = prediction["labels"] |
|
if not isinstance(labels, list): |
|
labels = prediction["labels"].tolist() |
|
|
|
coco_results.extend( |
|
[ |
|
{ |
|
"image_id": original_id, |
|
"category_id": labels[k], |
|
"bbox": box, |
|
"score": scores[k], |
|
} |
|
for k, box in enumerate(boxes) |
|
] |
|
) |
|
return coco_results |
|
|
|
|
|
_DESCRIPTION = "This class evaluates object detection models using the COCO dataset \ |
|
and its evaluation metrics." |
|
_HOMEPAGE = "https://cocodataset.org" |
|
_CITATION = """ |
|
@misc{lin2015microsoft, \ |
|
title={Microsoft COCO: Common Objects in Context}, |
|
author={Tsung-Yi Lin and Michael Maire and Serge Belongie and Lubomir Bourdev and \ |
|
Ross Girshick and James Hays and Pietro Perona and Deva Ramanan and C. Lawrence Zitnick \ |
|
and Piotr Dollár}, |
|
year={2015}, |
|
eprint={1405.0312}, |
|
archivePrefix={arXiv}, |
|
primaryClass={cs.CV} |
|
} |
|
""" |
|
_REFERENCE_URLS = [ |
|
"https://ieeexplore.ieee.org/abstract/document/9145130", |
|
"https://www.mdpi.com/2079-9292/10/3/279", |
|
"https://cocodataset.org/#detection-eval", |
|
] |
|
_KWARGS_DESCRIPTION = """\ |
|
Computes COCO metrics for object detection: AP(mAP) and its variants. |
|
|
|
Args: |
|
coco (COCO): COCO Evaluator object for evaluating predictions. |
|
    **kwargs: Additional keyword arguments forwarded to evaluate.Metric.
|
""" |
|
|
|
|
|
class EvaluateObjectDetection(evaluate.Metric): |
|
""" |
|
Class for evaluating object detection models. |
|
""" |
|
|
|
    def __init__(self, json_gt: Union[Path, Dict], iou_type: str = "bbox", **kwargs):
        """
        Initializes the EvaluateObjectDetection class.

        Args:
            json_gt: Path to a JSON file, or a dict, with ground-truth annotations in COCO format.
            iou_type: IoU type to evaluate (only "bbox" is supported).
            **kwargs: Additional keyword arguments forwarded to evaluate.Metric.
        """
        super().__init__(**kwargs)

        # Build the ground-truth COCO object
        if isinstance(json_gt, Path):
            assert json_gt.exists(), f"Path {json_gt} does not exist."
            with open(json_gt) as f:
                json_data = json.load(f)
        elif isinstance(json_gt, dict):
            json_data = json_gt
        else:
            raise ValueError(f"Unsupported type for json_gt: {type(json_gt)}")
        coco = COCO(json_data)

        self.coco_evaluator = COCOEvaluator(coco, [iou_type])
|
|
|
def remove_classes(self, classes_to_remove: List[str]): |
|
to_remove = [c.upper() for c in classes_to_remove] |
|
cats = {} |
|
for id, cat in self.coco_evaluator.coco_eval["bbox"].cocoGt.cats.items(): |
|
if cat["name"].upper() not in to_remove: |
|
cats[id] = cat |
|
self.coco_evaluator.coco_eval["bbox"].cocoGt.cats = cats |
|
self.coco_evaluator.coco_gt.cats = cats |
|
self.coco_evaluator.coco_gt.dataset["categories"] = list(cats.values()) |
|
self.coco_evaluator.coco_eval["bbox"].params.catIds = [c["id"] for c in cats.values()] |
|
|
|
def _info(self): |
|
""" |
|
Returns the MetricInfo object with information about the module. |
|
|
|
Returns: |
|
evaluate.MetricInfo: Metric information object. |
|
""" |
|
return evaluate.MetricInfo( |
|
module_type="metric", |
|
description=_DESCRIPTION, |
|
citation=_CITATION, |
|
inputs_description=_KWARGS_DESCRIPTION, |
|
|
|
features=datasets.Features( |
|
{ |
|
"predictions": [ |
|
datasets.Features( |
|
{ |
|
"scores": datasets.Sequence(datasets.Value("float")), |
|
"labels": datasets.Sequence(datasets.Value("int64")), |
|
"boxes": datasets.Sequence( |
|
datasets.Sequence(datasets.Value("float")) |
|
), |
|
} |
|
) |
|
], |
|
"references": [ |
|
datasets.Features( |
|
{ |
|
"image_id": datasets.Sequence(datasets.Value("int64")), |
|
} |
|
) |
|
], |
|
} |
|
), |
|
|
|
homepage=_HOMEPAGE, |
|
|
|
reference_urls=_REFERENCE_URLS, |
|
) |
|
|
|
def _preprocess( |
|
self, predictions: List[Dict[str, torch.Tensor]] |
|
) -> List[_TYPING_PREDICTION]: |
|
""" |
|
Preprocesses the predictions before computing the scores. |
|
|
|
Args: |
|
predictions (List[Dict[str, torch.Tensor]]): A list of prediction dicts. |
|
|
|
Returns: |
|
List[_TYPING_PREDICTION]: A list of preprocessed prediction dicts. |
|
""" |
|
processed_predictions = [] |
|
for pred in predictions: |
|
processed_pred: _TYPING_PREDICTION = {} |
|
for k, val in pred.items(): |
|
if isinstance(val, torch.Tensor): |
|
val = val.detach().cpu().tolist() |
|
if k == "labels": |
|
val = list(map(int, val)) |
|
processed_pred[k] = val |
|
processed_predictions.append(processed_pred) |
|
return processed_predictions |
|
|
|
    def _clear_predictions(self, predictions):
        # Keep only the keys required by the COCO evaluator.
        required = ["scores", "labels", "boxes"]
        ret = []
        for prediction in predictions:
            ret.append({k: v for k, v in prediction.items() if k in required})
        return ret
|
|
|
    def _clear_references(self, references):
        # NOTE: as written this keeps no reference keys; add() reads
        # "image_id" from the raw references instead.
        required = [""]
|
ret = [] |
|
for ref in references: |
|
ret.append({k: v for k, v in ref.items() if k in required}) |
|
return ret |
|
|
|
def add(self, *, prediction=None, reference=None, **kwargs): |
|
""" |
|
Preprocesses the predictions and references and calls the parent class function. |
|
|
|
Args: |
|
prediction: A list of prediction dicts. |
|
reference: A list of reference dicts. |
|
**kwargs: Additional keyword arguments. |
|
""" |
|
if prediction is not None: |
|
prediction = self._clear_predictions(prediction) |
|
prediction = self._preprocess(prediction) |
|
|
|
res = {} |
|
for output, target in zip(prediction, reference): |
|
res[target["image_id"][0]] = output |
|
self.coco_evaluator.update(res) |
|
|
|
super(evaluate.Metric, self).add(prediction=prediction, references=reference, **kwargs) |
|
|
|
def _compute( |
|
self, |
|
predictions: List[List[_TYPING_PREDICTION]], |
|
references: List[List[_TYPING_REFERENCE]], |
|
) -> Dict[str, Dict[str, float]]: |
|
""" |
|
Returns the evaluation scores. |
|
|
|
Args: |
|
predictions (List[List[_TYPING_PREDICTION]]): A list of predictions. |
|
references (List[List[_TYPING_REFERENCE]]): A list of references. |
|
|
|
Returns: |
|
Dict: A dictionary containing evaluation scores. |
|
""" |
|
print("Synchronizing processes") |
|
self.coco_evaluator.synchronize_between_processes() |
|
|
|
print("Accumulating values") |
|
self.coco_evaluator.accumulate() |
|
|
|
print("Summarizing results") |
|
self.coco_evaluator.summarize() |
|
|
|
stats = self.coco_evaluator.get_results() |
|
return stats |
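
# End-to-end usage sketch (hedged; the tiny ground-truth dict below is
# illustrative, with boxes in COCO [x, y, width, height] format):
#
#   gt = {
#       "images": [{"id": 1, "height": 100, "width": 100}],
#       "annotations": [
#           {"id": 1, "image_id": 1, "category_id": 1,
#            "bbox": [10, 10, 20, 20], "area": 400, "iscrowd": 0}
#       ],
#       "categories": [{"id": 1, "name": "object"}],
#   }
#   metric = EvaluateObjectDetection(json_gt=gt, iou_type="bbox")
#   metric.add(
#       prediction=[{"scores": [0.9], "labels": [1], "boxes": [[10, 10, 20, 20]]}],
#       reference=[{"image_id": [1]}],
#   )
#   results = metric.compute()   # nested dict of AP/AR values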
|
|