# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import logging
from abc import ABCMeta, abstractmethod
from typing import Any, List, Optional, Sequence, Union

from torch import Tensor
from mmengine.dist import (broadcast_object_list, collect_results,
is_main_process)
from mmengine.fileio import dump
from mmengine.logging import print_log
from mmengine.registry import METRICS
from mmengine.structures import BaseDataElement


class BaseMetric(metaclass=ABCMeta):
"""Base class for a metric.
The metric first processes each batch of data_samples and predictions,
and appends the processed results to the results list. Then it
collects all results together from all ranks if distributed training
is used. Finally, it computes the metrics of the entire dataset.
A subclass of class:`BaseMetric` should assign a meaningful value to the
class attribute `default_prefix`. See the argument `prefix` for details.
Args:
collect_device (str): Device name used for collecting results from
different ranks during distributed training. Must be 'cpu' or
'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added to the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, ``self.default_prefix``
            will be used instead. Defaults to None.
        collect_dir (str, optional): Synchronize directory for collecting data
            from different ranks. This argument should only be configured when
            ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
"""
default_prefix: Optional[str] = None

    def __init__(self,
collect_device: str = 'cpu',
prefix: Optional[str] = None,
collect_dir: Optional[str] = None) -> None:
        if collect_dir is not None and collect_device != 'cpu':
            raise ValueError('`collect_dir` can only be configured when '
                             "`collect_device='cpu'`")
self._dataset_meta: Union[None, dict] = None
self.collect_device = collect_device
self.results: List[Any] = []
self.prefix = prefix or self.default_prefix
self.collect_dir = collect_dir
if self.prefix is None:
print_log(
'The prefix is not set in metric class '
f'{self.__class__.__name__}.',
logger='current',
level=logging.WARNING)

    @property
def dataset_meta(self) -> Optional[dict]:
"""Optional[dict]: Meta info of the dataset."""
return self._dataset_meta

    @dataset_meta.setter
def dataset_meta(self, dataset_meta: dict) -> None:
"""Set the dataset meta info to the metric."""
self._dataset_meta = dataset_meta

    @abstractmethod
def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
"""Process one batch of data samples and predictions. The processed
results should be stored in ``self.results``, which will be used to
compute the metrics when all batches have been processed.
Args:
data_batch (Any): A batch of data from the dataloader.
data_samples (Sequence[dict]): A batch of outputs from
the model.
"""

    @abstractmethod
def compute_metrics(self, results: list) -> dict:
"""Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
dict: The computed metrics. The keys are the names of the metrics,
and the values are corresponding results.
"""

    def evaluate(self, size: int) -> dict:
        """Evaluate the model performance on the whole dataset after
        processing all batches.

        Args:
size (int): Length of the entire validation dataset. When batch
size > 1, the dataloader may pad some data samples to make
sure all ranks have the same length of dataset slice. The
``collect_results`` function will drop the padded data based on
this size.

        Returns:
dict: Evaluation metrics dict on the val dataset. The keys are the
names of the metrics, and the values are corresponding results.
"""
if len(self.results) == 0:
print_log(
f'{self.__class__.__name__} got empty `self.results`. Please '
'ensure that the processed results are properly added into '
'`self.results` in `process` method.',
logger='current',
level=logging.WARNING)
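        # ``collect_results`` gathers the partial results from every rank:
        # in 'cpu' mode it serializes them through a shared directory
        # (``collect_dir`` or a temporary one), while in 'gpu' mode it uses
        # collective communication.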
if self.collect_device == 'cpu':
results = collect_results(
self.results,
size,
self.collect_device,
tmpdir=self.collect_dir)
else:
results = collect_results(self.results, size, self.collect_device)
if is_main_process():
# cast all tensors in results list to cpu
results = _to_cpu(results)
_metrics = self.compute_metrics(results) # type: ignore
# Add prefix to metric names
if self.prefix:
_metrics = {
'/'.join((self.prefix, k)): v
for k, v in _metrics.items()
}
metrics = [_metrics]
else:
metrics = [None] # type: ignore
broadcast_object_list(metrics)
# reset the results list
self.results.clear()
return metrics[0]
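

# A minimal sketch of a concrete metric, kept here purely as a usage
# illustration of the ``BaseMetric`` lifecycle described above: ``process``
# accumulates per-batch results in ``self.results``, and ``evaluate`` later
# gathers them across ranks and feeds them to ``compute_metrics``. The class
# name and the 'pred_label'/'gt_label' sample keys are hypothetical, not
# part of this library.
class ToyAccuracy(BaseMetric):
    """Toy accuracy metric used only to illustrate the ``BaseMetric`` API."""

    default_prefix = 'toy'

    def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
        for sample in data_samples:
            # Assumed sample layout: {'pred_label': int, 'gt_label': int}.
            self.results.append(
                int(sample['pred_label'] == sample['gt_label']))

    def compute_metrics(self, results: list) -> dict:
        # ``results`` holds the concatenated per-sample correctness flags
        # gathered from all ranks.
        return {'accuracy': sum(results) / max(len(results), 1)}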


@METRICS.register_module()
class DumpResults(BaseMetric):
"""Dump model predictions to a pickle file for offline evaluation.
Args:
out_file_path (str): Path of the dumped file. Must end with '.pkl'
or '.pickle'.
collect_device (str): Device name used for collecting results from
different ranks during distributed training. Must be 'cpu' or
'gpu'. Defaults to 'cpu'.
        collect_dir (str, optional): Synchronize directory for collecting data
            from different ranks. This argument should only be configured when
            ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
"""

    def __init__(self,
out_file_path: str,
collect_device: str = 'cpu',
collect_dir: Optional[str] = None) -> None:
super().__init__(
collect_device=collect_device, collect_dir=collect_dir)
        if not out_file_path.endswith(('.pkl', '.pickle')):
            raise ValueError(
                "The output file must end with '.pkl' or '.pickle'.")
self.out_file_path = out_file_path

    def process(self, data_batch: Any, predictions: Sequence[dict]) -> None:
        """Transfer tensors in predictions to CPU."""
self.results.extend(_to_cpu(predictions))

    def compute_metrics(self, results: list) -> dict:
        """Dump the prediction results to a pickle file."""
dump(results, self.out_file_path)
        print_log(
            f'Results have been saved to {self.out_file_path}.',
            logger='current')
return {}


def _to_cpu(data: Any) -> Any:
    """Recursively transfer all tensors and ``BaseDataElement``s to CPU."""
if isinstance(data, (Tensor, BaseDataElement)):
return data.to('cpu')
elif isinstance(data, list):
return [_to_cpu(d) for d in data]
elif isinstance(data, tuple):
return tuple(_to_cpu(d) for d in data)
elif isinstance(data, dict):
return {k: _to_cpu(v) for k, v in data.items()}
else:
return data
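

# A minimal, single-process usage sketch of ``DumpResults``. The file name
# 'predictions.pkl' and the prediction dicts are arbitrary examples, not an
# API defined by this module.
if __name__ == '__main__':
    import torch

    dumper = DumpResults(out_file_path='predictions.pkl')
    fake_predictions = [
        {'pred_label': torch.tensor([1]), 'score': torch.tensor([0.9])},
        {'pred_label': torch.tensor([0]), 'score': torch.tensor([0.4])},
    ]
    # ``process`` moves the tensors to CPU and appends them to the results
    # list; ``evaluate`` then dumps everything to the pickle file and
    # returns an empty metrics dict.
    dumper.process(data_batch=None, predictions=fake_predictions)
    metrics = dumper.evaluate(size=len(fake_predictions))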