MASA_GroundingDINO / masa /apis /masa_inference.py
JohanDL's picture
initial commit
f1dd031
raw
history blame contribute delete
No virus
10.2 kB
import copy
import time
import warnings
from pathlib import Path
from typing import Optional, Sequence, Union
import numpy as np
import torch
import torch.nn as nn
from mmcv.ops import RoIPool
from mmcv.transforms import Compose
from mmdet.evaluation import get_classes
from mmdet.registry import MODELS
from mmdet.structures import DetDataSample, SampleList
from mmdet.utils import ConfigType, get_test_pipeline_cfg
from mmengine.config import Config
from mmengine.dataset import default_collate
from mmengine.model.utils import revert_sync_batchnorm
from mmengine.registry import init_default_scope
from mmengine.runner import autocast, load_checkpoint
ImagesType = Union[str, np.ndarray, Sequence[str], Sequence[np.ndarray]]
def init_masa(
config: Union[str, Path, Config],
checkpoint: Optional[str] = None,
palette: str = "none",
device: str = "cuda:0",
cfg_options: Optional[dict] = None,
) -> nn.Module:
"""Initialize a unified masa detector from config file.
Args:
config (str, :obj:`Path`, or :obj:`mmengine.Config`): Config file path,
:obj:`Path`, or the config object.
checkpoint (str, optional): Checkpoint path. If left as None, the model
will not load any weights.
palette (str): Color palette used for visualization. If palette
is stored in checkpoint, use checkpoint's palette first, otherwise
use externally passed palette. Currently, supports 'coco', 'voc',
'citys' and 'random'. Defaults to none.
device (str): The device where the anchors will be put on.
Defaults to cuda:0.
cfg_options (dict, optional): Options to override some settings in
the used config.
Returns:
nn.Module: The constructed detector.
"""
if isinstance(config, (str, Path)):
config = Config.fromfile(config)
elif not isinstance(config, Config):
raise TypeError(
"config must be a filename or Config object, " f"but got {type(config)}"
)
with_backbone = config.model.get("backbone", False)
if with_backbone:
if cfg_options is not None:
config.merge_from_dict(cfg_options)
elif "init_cfg" in config.model.backbone:
config.model.backbone.init_cfg = None
else:
if cfg_options is not None:
config.merge_from_dict(cfg_options)
elif "init_cfg" in config.model.detector.backbone:
config.model.detector.backbone.init_cfg = None
scope = config.get("default_scope", "mmdet")
if scope is not None:
init_default_scope(config.get("default_scope", "mmdet"))
model = MODELS.build(config.model)
model = revert_sync_batchnorm(model)
if checkpoint is None:
warnings.simplefilter("once")
warnings.warn("checkpoint is None, use COCO classes by default.")
model.dataset_meta = {"classes": get_classes("coco")}
else:
checkpoint = load_checkpoint(model, checkpoint, map_location="cpu")
# Weights converted from elsewhere may not have meta fields.
checkpoint_meta = checkpoint.get("meta", {})
# save the dataset_meta in the model for convenience
if "dataset_meta" in checkpoint_meta:
# mmdet 3.x, all keys should be lowercase
model.dataset_meta = {
k.lower(): v for k, v in checkpoint_meta["dataset_meta"].items()
}
elif "CLASSES" in checkpoint_meta:
# < mmdet 3.x
classes = checkpoint_meta["CLASSES"]
model.dataset_meta = {"classes": classes}
else:
warnings.simplefilter("once")
warnings.warn(
"dataset_meta or class names are not saved in the "
"checkpoint's meta data, use COCO classes by default."
)
model.dataset_meta = {"classes": get_classes("coco")}
# Priority: args.palette -> config -> checkpoint
if palette != "none":
model.dataset_meta["palette"] = palette
else:
if "palette" not in model.dataset_meta:
warnings.warn(
"palette does not exist, random is used by default. "
"You can also set the palette to customize."
)
model.dataset_meta["palette"] = "random"
model.cfg = config # save the config in the model for convenience
model.to(device)
model.eval()
return model
def inference_detector(
model: nn.Module,
imgs: ImagesType,
test_pipeline: Optional[Compose] = None,
text_prompt: Optional[str] = None,
custom_entities: bool = False,
fp16: bool = False,
) -> Union[DetDataSample, SampleList]:
"""Inference image(s) with the detector.
Args:
model (nn.Module): The loaded detector.
imgs (str, ndarray, Sequence[str/ndarray]):
Either image files or loaded images.
test_pipeline (:obj:`Compose`): Test pipeline.
Returns:
:obj:`DetDataSample` or list[:obj:`DetDataSample`]:
If imgs is a list or tuple, the same length list type results
will be returned, otherwise return the detection results directly.
"""
if isinstance(imgs, (list, tuple)):
is_batch = True
else:
imgs = [imgs]
is_batch = False
cfg = model.cfg
if test_pipeline is None:
cfg = cfg.copy()
test_pipeline = get_test_pipeline_cfg(cfg)
if isinstance(imgs[0], np.ndarray):
# Calling this method across libraries will result
# in module unregistered error if not prefixed with mmdet.
test_pipeline[0].type = "mmdet.LoadImageFromNDArray"
test_pipeline = Compose(test_pipeline)
if model.data_preprocessor.device.type == "cpu":
for m in model.modules():
assert not isinstance(
m, RoIPool
), "CPU inference with RoIPool is not supported currently."
result_list = []
for i, img in enumerate(imgs):
# prepare data
if isinstance(img, np.ndarray):
# TODO: remove img_id.
data_ = dict(img=img, img_id=0)
else:
# TODO: remove img_id.
data_ = dict(img_path=img, img_id=0)
if text_prompt:
data_["text"] = text_prompt
data_["custom_entities"] = custom_entities
# build the data pipeline
data_ = test_pipeline(data_)
data_["inputs"] = [data_["inputs"]]
data_["data_samples"] = [data_["data_samples"]]
# forward the model
with torch.no_grad():
with autocast(enabled=fp16):
results = model.test_step(data_)[0]
result_list.append(results)
if not is_batch:
return result_list[0]
else:
return result_list
def inference_masa(
model: nn.Module,
img: np.ndarray,
frame_id: int,
video_len: int,
test_pipeline: Optional[Compose] = None,
text_prompt=None,
custom_entities: bool = False,
det_bboxes=None,
det_labels=None,
fp16=False,
detector_type="mmdet",
show_fps=False,
) -> SampleList:
"""Inference image(s) with the masa model.
Args:
model (nn.Module): The loaded mot model.
img (np.ndarray): Loaded image.
frame_id (int): frame id.
video_len (int): demo video length
Returns:
SampleList: The tracking data samples.
"""
data = dict(
img=[img.astype(np.float32)],
# img=[img.astype(np.uint8)],
frame_id=[frame_id],
ori_shape=[img.shape[:2]],
img_id=[frame_id + 1],
ori_video_length=[video_len],
)
if text_prompt is not None:
if detector_type == "mmdet":
data["text"] = [text_prompt]
data["custom_entities"] = [custom_entities]
elif detector_type == "yolo-world":
data["texts"] = [text_prompt]
data["custom_entities"] = [custom_entities]
data = test_pipeline(data)
# forward the model
with torch.no_grad():
data = default_collate([data])
if det_bboxes is not None:
data["data_samples"][0].video_data_samples[0].det_bboxes = det_bboxes
data["data_samples"][0].video_data_samples[0].det_labels = det_labels
# measure FPS ##
if show_fps:
start = time.time()
with autocast(enabled=fp16):
result = model.test_step(data)[0]
end = time.time()
fps = 1 / (end - start)
return result, fps
else:
with autocast(enabled=fp16):
result = model.test_step(data)[0]
return result
def build_test_pipeline(
cfg: ConfigType, with_text=False, detector_type="mmdet"
) -> ConfigType:
"""Build test_pipeline for mot/vis demo. In mot/vis infer, original
test_pipeline should remove the "LoadImageFromFile" and
"LoadTrackAnnotations".
Args:
cfg (ConfigDict): The loaded config.
Returns:
ConfigType: new test_pipeline
"""
# remove the "LoadImageFromFile" and "LoadTrackAnnotations" in pipeline
transform_broadcaster = cfg.inference_pipeline[0].copy()
if detector_type == "yolo-world":
kept_transform = []
for transform in transform_broadcaster["transforms"]:
if (
transform["type"] == "mmyolo.YOLOv5KeepRatioResize"
or transform["type"] == "mmyolo.LetterResize"
):
kept_transform.append(transform)
transform_broadcaster["transforms"] = kept_transform
pack_track_inputs = cfg.test_dataloader.dataset.pipeline[-1].copy()
test_pipeline = Compose([transform_broadcaster, pack_track_inputs])
else:
for transform in transform_broadcaster["transforms"]:
if "Resize" in transform["type"]:
transform_broadcaster["transforms"] = transform
pack_track_inputs = cfg.inference_pipeline[-1].copy()
if with_text:
pack_track_inputs["meta_keys"] = ("text", "custom_entities")
test_pipeline = Compose([transform_broadcaster, pack_track_inputs])
return test_pipeline