# -*- coding: utf-8 -*- # Copyright (c) Facebook, Inc. and its affiliates. import contextlib import copy import io import itertools import logging import numpy as np import os from collections import OrderedDict from typing import Dict, Iterable, List, Optional import pycocotools.mask as mask_utils import torch from pycocotools.coco import COCO from tabulate import tabulate from detectron2.config import CfgNode from detectron2.data import MetadataCatalog from detectron2.evaluation import DatasetEvaluator from detectron2.structures import BoxMode from detectron2.utils.comm import gather, get_rank, is_main_process, synchronize from detectron2.utils.file_io import PathManager from detectron2.utils.logger import create_small_table from densepose.converters import ToChartResultConverter, ToMaskConverter from densepose.data.datasets.coco import maybe_filter_and_map_categories_cocoapi from densepose.structures import ( DensePoseChartPredictorOutput, DensePoseEmbeddingPredictorOutput, quantize_densepose_chart_result, ) from .densepose_coco_evaluation import DensePoseCocoEval, DensePoseEvalMode from .mesh_alignment_evaluator import MeshAlignmentEvaluator from .tensor_storage import ( SingleProcessFileTensorStorage, SingleProcessRamTensorStorage, SingleProcessTensorStorage, SizeData, storage_gather, ) class DensePoseCOCOEvaluator(DatasetEvaluator): def __init__( self, dataset_name, distributed, output_dir=None, evaluator_type: str = "iuv", min_iou_threshold: float = 0.5, storage: Optional[SingleProcessTensorStorage] = None, embedder=None, should_evaluate_mesh_alignment: bool = False, mesh_alignment_mesh_names: Optional[List[str]] = None, ): self._embedder = embedder self._distributed = distributed self._output_dir = output_dir self._evaluator_type = evaluator_type self._storage = storage self._should_evaluate_mesh_alignment = should_evaluate_mesh_alignment assert not ( should_evaluate_mesh_alignment and embedder is None ), "Mesh alignment evaluation is activated, but no vertex embedder provided!" if should_evaluate_mesh_alignment: self._mesh_alignment_evaluator = MeshAlignmentEvaluator( embedder, mesh_alignment_mesh_names, ) self._cpu_device = torch.device("cpu") self._logger = logging.getLogger(__name__) self._metadata = MetadataCatalog.get(dataset_name) self._min_threshold = min_iou_threshold json_file = PathManager.get_local_path(self._metadata.json_file) with contextlib.redirect_stdout(io.StringIO()): self._coco_api = COCO(json_file) maybe_filter_and_map_categories_cocoapi(dataset_name, self._coco_api) def reset(self): self._predictions = [] def process(self, inputs, outputs): """ Args: inputs: the inputs to a COCO model (e.g., GeneralizedRCNN). It is a list of dict. Each dict corresponds to an image and contains keys like "height", "width", "file_name", "image_id". outputs: the outputs of a COCO model. It is a list of dicts with key "instances" that contains :class:`Instances`. The :class:`Instances` object needs to have `densepose` field. """ for input, output in zip(inputs, outputs): instances = output["instances"].to(self._cpu_device) if not instances.has("pred_densepose"): continue prediction_list = prediction_to_dict( instances, input["image_id"], self._embedder, self._metadata.class_to_mesh_name, self._storage is not None, ) if self._storage is not None: for prediction_dict in prediction_list: dict_to_store = {} for field_name in self._storage.data_schema: dict_to_store[field_name] = prediction_dict[field_name] record_id = self._storage.put(dict_to_store) prediction_dict["record_id"] = record_id prediction_dict["rank"] = get_rank() for field_name in self._storage.data_schema: del prediction_dict[field_name] self._predictions.extend(prediction_list) def evaluate(self, img_ids=None): if self._distributed: synchronize() predictions = gather(self._predictions) predictions = list(itertools.chain(*predictions)) else: predictions = self._predictions multi_storage = storage_gather(self._storage) if self._storage is not None else None if not is_main_process(): return return copy.deepcopy(self._eval_predictions(predictions, multi_storage, img_ids)) def _eval_predictions(self, predictions, multi_storage=None, img_ids=None): """ Evaluate predictions on densepose. Return results with the metrics of the tasks. """ self._logger.info("Preparing results for COCO format ...") if self._output_dir: PathManager.mkdirs(self._output_dir) file_path = os.path.join(self._output_dir, "coco_densepose_predictions.pth") with PathManager.open(file_path, "wb") as f: torch.save(predictions, f) self._logger.info("Evaluating predictions ...") res = OrderedDict() results_gps, results_gpsm, results_segm = _evaluate_predictions_on_coco( self._coco_api, predictions, multi_storage, self._embedder, class_names=self._metadata.get("thing_classes"), min_threshold=self._min_threshold, img_ids=img_ids, ) res["densepose_gps"] = results_gps res["densepose_gpsm"] = results_gpsm res["densepose_segm"] = results_segm if self._should_evaluate_mesh_alignment: res["densepose_mesh_alignment"] = self._evaluate_mesh_alignment() return res def _evaluate_mesh_alignment(self): self._logger.info("Mesh alignment evaluation ...") mean_ge, mean_gps, per_mesh_metrics = self._mesh_alignment_evaluator.evaluate() results = { "GE": mean_ge * 100, "GPS": mean_gps * 100, } mesh_names = set() for metric_name in per_mesh_metrics: for mesh_name, value in per_mesh_metrics[metric_name].items(): results[f"{metric_name}-{mesh_name}"] = value * 100 mesh_names.add(mesh_name) self._print_mesh_alignment_results(results, mesh_names) return results def _print_mesh_alignment_results(self, results: Dict[str, float], mesh_names: Iterable[str]): self._logger.info("Evaluation results for densepose, mesh alignment:") self._logger.info(f'| {"Mesh":13s} | {"GErr":7s} | {"GPS":7s} |') self._logger.info("| :-----------: | :-----: | :-----: |") for mesh_name in mesh_names: ge_key = f"GE-{mesh_name}" ge_str = f"{results[ge_key]:.4f}" if ge_key in results else " " gps_key = f"GPS-{mesh_name}" gps_str = f"{results[gps_key]:.4f}" if gps_key in results else " " self._logger.info(f"| {mesh_name:13s} | {ge_str:7s} | {gps_str:7s} |") self._logger.info("| :-------------------------------: |") ge_key = "GE" ge_str = f"{results[ge_key]:.4f}" if ge_key in results else " " gps_key = "GPS" gps_str = f"{results[gps_key]:.4f}" if gps_key in results else " " self._logger.info(f'| {"MEAN":13s} | {ge_str:7s} | {gps_str:7s} |') def prediction_to_dict(instances, img_id, embedder, class_to_mesh_name, use_storage): """ Args: instances (Instances): the output of the model img_id (str): the image id in COCO Returns: list[dict]: the results in densepose evaluation format """ scores = instances.scores.tolist() classes = instances.pred_classes.tolist() raw_boxes_xywh = BoxMode.convert( instances.pred_boxes.tensor.clone(), BoxMode.XYXY_ABS, BoxMode.XYWH_ABS ) if isinstance(instances.pred_densepose, DensePoseEmbeddingPredictorOutput): results_densepose = densepose_cse_predictions_to_dict( instances, embedder, class_to_mesh_name, use_storage ) elif isinstance(instances.pred_densepose, DensePoseChartPredictorOutput): if not use_storage: results_densepose = densepose_chart_predictions_to_dict(instances) else: results_densepose = densepose_chart_predictions_to_storage_dict(instances) results = [] for k in range(len(instances)): result = { "image_id": img_id, "category_id": classes[k], "bbox": raw_boxes_xywh[k].tolist(), "score": scores[k], } results.append({**result, **results_densepose[k]}) return results def densepose_chart_predictions_to_dict(instances): segmentations = ToMaskConverter.convert( instances.pred_densepose, instances.pred_boxes, instances.image_size ) results = [] for k in range(len(instances)): densepose_results_quantized = quantize_densepose_chart_result( ToChartResultConverter.convert(instances.pred_densepose[k], instances.pred_boxes[k]) ) densepose_results_quantized.labels_uv_uint8 = ( densepose_results_quantized.labels_uv_uint8.cpu() ) segmentation = segmentations.tensor[k] segmentation_encoded = mask_utils.encode( np.require(segmentation.numpy(), dtype=np.uint8, requirements=["F"]) ) segmentation_encoded["counts"] = segmentation_encoded["counts"].decode("utf-8") result = { "densepose": densepose_results_quantized, "segmentation": segmentation_encoded, } results.append(result) return results def densepose_chart_predictions_to_storage_dict(instances): results = [] for k in range(len(instances)): densepose_predictor_output = instances.pred_densepose[k] result = { "coarse_segm": densepose_predictor_output.coarse_segm.squeeze(0).cpu(), "fine_segm": densepose_predictor_output.fine_segm.squeeze(0).cpu(), "u": densepose_predictor_output.u.squeeze(0).cpu(), "v": densepose_predictor_output.v.squeeze(0).cpu(), } results.append(result) return results def densepose_cse_predictions_to_dict(instances, embedder, class_to_mesh_name, use_storage): results = [] for k in range(len(instances)): cse = instances.pred_densepose[k] results.append( { "coarse_segm": cse.coarse_segm[0].cpu(), "embedding": cse.embedding[0].cpu(), } ) return results def _evaluate_predictions_on_coco( coco_gt, coco_results, multi_storage=None, embedder=None, class_names=None, min_threshold: float = 0.5, img_ids=None, ): logger = logging.getLogger(__name__) densepose_metrics = _get_densepose_metrics(min_threshold) if len(coco_results) == 0: # cocoapi does not handle empty results very well logger.warn("No predictions from the model! Set scores to -1") results_gps = {metric: -1 for metric in densepose_metrics} results_gpsm = {metric: -1 for metric in densepose_metrics} results_segm = {metric: -1 for metric in densepose_metrics} return results_gps, results_gpsm, results_segm coco_dt = coco_gt.loadRes(coco_results) results = [] for eval_mode_name in ["GPS", "GPSM", "IOU"]: eval_mode = getattr(DensePoseEvalMode, eval_mode_name) coco_eval = DensePoseCocoEval( coco_gt, coco_dt, "densepose", multi_storage, embedder, dpEvalMode=eval_mode ) result = _derive_results_from_coco_eval( coco_eval, eval_mode_name, densepose_metrics, class_names, min_threshold, img_ids ) results.append(result) return results def _get_densepose_metrics(min_threshold: float = 0.5): metrics = ["AP"] if min_threshold <= 0.201: metrics += ["AP20"] if min_threshold <= 0.301: metrics += ["AP30"] if min_threshold <= 0.401: metrics += ["AP40"] metrics.extend(["AP50", "AP75", "APm", "APl", "AR", "AR50", "AR75", "ARm", "ARl"]) return metrics def _derive_results_from_coco_eval( coco_eval, eval_mode_name, metrics, class_names, min_threshold: float, img_ids ): if img_ids is not None: coco_eval.params.imgIds = img_ids coco_eval.params.iouThrs = np.linspace( min_threshold, 0.95, int(np.round((0.95 - min_threshold) / 0.05)) + 1, endpoint=True ) coco_eval.evaluate() coco_eval.accumulate() coco_eval.summarize() results = {metric: float(coco_eval.stats[idx] * 100) for idx, metric in enumerate(metrics)} logger = logging.getLogger(__name__) logger.info( f"Evaluation results for densepose, {eval_mode_name} metric: \n" + create_small_table(results) ) if class_names is None or len(class_names) <= 1: return results # Compute per-category AP, the same way as it is done in D2 # (see detectron2/evaluation/coco_evaluation.py): precisions = coco_eval.eval["precision"] # precision has dims (iou, recall, cls, area range, max dets) assert len(class_names) == precisions.shape[2] results_per_category = [] for idx, name in enumerate(class_names): # area range index 0: all area ranges # max dets index -1: typically 100 per image precision = precisions[:, :, idx, 0, -1] precision = precision[precision > -1] ap = np.mean(precision) if precision.size else float("nan") results_per_category.append((f"{name}", float(ap * 100))) # tabulate it n_cols = min(6, len(results_per_category) * 2) results_flatten = list(itertools.chain(*results_per_category)) results_2d = itertools.zip_longest(*[results_flatten[i::n_cols] for i in range(n_cols)]) table = tabulate( results_2d, tablefmt="pipe", floatfmt=".3f", headers=["category", "AP"] * (n_cols // 2), numalign="left", ) logger.info(f"Per-category {eval_mode_name} AP: \n" + table) results.update({"AP-" + name: ap for name, ap in results_per_category}) return results def build_densepose_evaluator_storage(cfg: CfgNode, output_folder: str): storage_spec = cfg.DENSEPOSE_EVALUATION.STORAGE if storage_spec == "none": return None evaluator_type = cfg.DENSEPOSE_EVALUATION.TYPE # common output tensor sizes hout = cfg.MODEL.ROI_DENSEPOSE_HEAD.HEATMAP_SIZE wout = cfg.MODEL.ROI_DENSEPOSE_HEAD.HEATMAP_SIZE n_csc = cfg.MODEL.ROI_DENSEPOSE_HEAD.NUM_COARSE_SEGM_CHANNELS # specific output tensors if evaluator_type == "iuv": n_fsc = cfg.MODEL.ROI_DENSEPOSE_HEAD.NUM_PATCHES + 1 schema = { "coarse_segm": SizeData(dtype="float32", shape=(n_csc, hout, wout)), "fine_segm": SizeData(dtype="float32", shape=(n_fsc, hout, wout)), "u": SizeData(dtype="float32", shape=(n_fsc, hout, wout)), "v": SizeData(dtype="float32", shape=(n_fsc, hout, wout)), } elif evaluator_type == "cse": embed_size = cfg.MODEL.ROI_DENSEPOSE_HEAD.CSE.EMBED_SIZE schema = { "coarse_segm": SizeData(dtype="float32", shape=(n_csc, hout, wout)), "embedding": SizeData(dtype="float32", shape=(embed_size, hout, wout)), } else: raise ValueError(f"Unknown evaluator type: {evaluator_type}") # storage types if storage_spec == "ram": storage = SingleProcessRamTensorStorage(schema, io.BytesIO()) elif storage_spec == "file": fpath = os.path.join(output_folder, f"DensePoseEvaluatorStorage.{get_rank()}.bin") PathManager.mkdirs(output_folder) storage = SingleProcessFileTensorStorage(schema, fpath, "wb") else: raise ValueError(f"Unknown storage specification: {storage_spec}") return storage