|
import numbers |
|
import random |
|
import warnings |
|
from dataclasses import asdict, dataclass |
|
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union |
|
|
|
import torch |
|
import torchvision.transforms.functional as F |
|
from torchvision.transforms import ( |
|
CenterCrop, |
|
ColorJitter, |
|
Compose, |
|
Grayscale, |
|
InterpolationMode, |
|
Normalize, |
|
RandomResizedCrop, |
|
Resize, |
|
ToTensor, |
|
) |
|
from transformers.image_utils import OPENAI_CLIP_MEAN, OPENAI_CLIP_STD |
|
|
|
OPENAI_DATASET_MEAN = tuple(OPENAI_CLIP_MEAN) |
|
OPENAI_DATASET_STD = tuple(OPENAI_CLIP_STD) |
|
|
|
|
|
@dataclass |
|
class PreprocessCfg: |
|
size: Union[int, Tuple[int, int]] = 224 |
|
mode: str = 'RGB' |
|
mean: Tuple[float, ...] = OPENAI_DATASET_MEAN |
|
std: Tuple[float, ...] = OPENAI_DATASET_STD |
|
interpolation: str = 'bicubic' |
|
resize_mode: str = 'shortest' |
|
fill_color: int = 0 |
|
|
|
def __post_init__(self): |
|
assert self.mode in ('RGB',) |
|
|
|
@property |
|
def num_channels(self): |
|
return 3 |
|
|
|
@property |
|
    def input_size(self):
        size = (self.size, self.size) if isinstance(self.size, int) else self.size
        return (self.num_channels,) + tuple(size)
|
|
|
|
|
_PREPROCESS_KEYS = set(asdict(PreprocessCfg()).keys()) |
|
|
|
|
|
def merge_preprocess_dict( |
|
base: Union[PreprocessCfg, Dict], |
|
overlay: Dict, |
|
): |
|
"""Merge overlay key-value pairs on top of base preprocess cfg or dict. |
|
Input dicts are filtered based on PreprocessCfg fields. |
|
""" |
|
if isinstance(base, PreprocessCfg): |
|
base_clean = asdict(base) |
|
else: |
|
base_clean = {k: v for k, v in base.items() if k in _PREPROCESS_KEYS} |
|
if overlay: |
|
overlay_clean = { |
|
k: v for k, v in overlay.items() if k in _PREPROCESS_KEYS and v is not None |
|
} |
|
base_clean.update(overlay_clean) |
|
return base_clean |
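
# Illustrative usage (a minimal sketch; the overlay values are made up).
# Unknown keys and None values in the overlay are dropped:
#
#   >>> merged = merge_preprocess_dict(PreprocessCfg(), {'size': 256, 'foo': 1, 'std': None})
#   >>> merged['size'], 'foo' in merged
#   (256, False)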
|
|
|
|
|
def merge_preprocess_kwargs(base: Union[PreprocessCfg, Dict], **kwargs): |
|
return merge_preprocess_dict(base, kwargs) |
|
|
|
|
|
@dataclass |
|
class AugmentationCfg: |
|
scale: Tuple[float, float] = (0.9, 1.0) |
|
ratio: Optional[Tuple[float, float]] = None |
|
color_jitter: Optional[ |
|
Union[float, Tuple[float, float, float], Tuple[float, float, float, float]] |
|
] = None |
|
re_prob: Optional[float] = None |
|
re_count: Optional[int] = None |
|
use_timm: bool = False |
|
|
|
|
|
    # params for SimCLR-style color jitter / grayscale (non-timm path only)
    color_jitter_prob: Optional[float] = None
    gray_scale_prob: Optional[float] = None
|
|
|
|
|
def _setup_size(size, error_msg): |
|
if isinstance(size, numbers.Number): |
|
return int(size), int(size) |
|
|
|
if isinstance(size, Sequence) and len(size) == 1: |
|
return size[0], size[0] |
|
|
|
if len(size) != 2: |
|
raise ValueError(error_msg) |
|
|
|
return size |
|
|
|
|
|
class ResizeKeepRatio: |
|
"""Resize and Keep Ratio |
|
|
|
Copy & paste from `timm` |
|
""" |
|
|
|
def __init__( |
|
self, |
|
size, |
|
longest=0.0, |
|
interpolation=InterpolationMode.BICUBIC, |
|
random_scale_prob=0.0, |
|
random_scale_range=(0.85, 1.05), |
|
random_aspect_prob=0.0, |
|
random_aspect_range=(0.9, 1.11), |
|
): |
|
if isinstance(size, (list, tuple)): |
|
self.size = tuple(size) |
|
else: |
|
self.size = (size, size) |
|
self.interpolation = interpolation |
|
self.longest = float(longest) |
|
self.random_scale_prob = random_scale_prob |
|
self.random_scale_range = random_scale_range |
|
self.random_aspect_prob = random_aspect_prob |
|
self.random_aspect_range = random_aspect_range |
|
|
|
@staticmethod |
|
def get_params( |
|
img, |
|
target_size, |
|
longest, |
|
random_scale_prob=0.0, |
|
random_scale_range=(0.85, 1.05), |
|
random_aspect_prob=0.0, |
|
random_aspect_range=(0.9, 1.11), |
|
): |
|
"""Get parameters""" |
|
source_size = img.size[::-1] |
|
h, w = source_size |
|
target_h, target_w = target_size |
|
ratio_h = h / target_h |
|
ratio_w = w / target_w |
|
ratio = max(ratio_h, ratio_w) * longest + min(ratio_h, ratio_w) * ( |
|
1.0 - longest |
|
) |
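        # longest=0.0 picks the smaller ratio (shortest-edge resize),
        # longest=1.0 the larger (longest-edge resize); values in between
        # interpolate linearly between the two.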
|
if random_scale_prob > 0 and random.random() < random_scale_prob: |
|
ratio_factor = random.uniform(random_scale_range[0], random_scale_range[1]) |
|
ratio_factor = (ratio_factor, ratio_factor) |
|
else: |
|
ratio_factor = (1.0, 1.0) |
|
if random_aspect_prob > 0 and random.random() < random_aspect_prob: |
|
aspect_factor = random.uniform( |
|
random_aspect_range[0], random_aspect_range[1] |
|
) |
|
ratio_factor = ( |
|
ratio_factor[0] / aspect_factor, |
|
ratio_factor[1] * aspect_factor, |
|
) |
|
size = [round(x * f / ratio) for x, f in zip(source_size, ratio_factor)] |
|
return size |
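
        # Worked example (made-up numbers): a 640x480 image (h=480, w=640)
        # with target (224, 224) and longest=0 gives ratio = 480/224, so the
        # returned size is [224, 299] (shortest edge matched to the target).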
|
|
|
    def __call__(self, img):
        """
        Args:
            img (PIL Image): Image to be resized.

        Returns:
            PIL Image: Resized image. Aspect ratio is preserved; a follow-up
            crop or pad is needed to reach an exact target size.
        """
|
size = self.get_params( |
|
img, |
|
self.size, |
|
self.longest, |
|
self.random_scale_prob, |
|
self.random_scale_range, |
|
self.random_aspect_prob, |
|
self.random_aspect_range, |
|
) |
|
img = F.resize(img, size, self.interpolation) |
|
return img |
|
|
|
    def __repr__(self):
        format_string = self.__class__.__name__ + f'(size={self.size}'
        format_string += f', interpolation={self.interpolation}'
        format_string += f', longest={self.longest:.3f})'
        return format_string
|
|
|
|
|
def center_crop_or_pad( |
|
img: torch.Tensor, output_size: List[int], fill=0 |
|
) -> torch.Tensor: |
|
"""Center crops and/or pads the given image. |
|
If the image is torch Tensor, it is expected |
|
to have [..., H, W] shape, where ... means an arbitrary number of leading |
|
dimensions. If image size is smaller than output size along any edge, image is |
|
    padded with the `fill` value (default 0) and then center cropped.
|
|
|
Args: |
|
img (PIL Image or Tensor): Image to be cropped. |
|
output_size (sequence or int): (height, width) of the crop box. If int or |
|
sequence with single int, it is used for both directions. |
|
fill (int, Tuple[int]): Padding color |
|
|
|
Returns: |
|
PIL Image or Tensor: Cropped image. |
|
""" |
|
if isinstance(output_size, numbers.Number): |
|
output_size = (int(output_size), int(output_size)) |
|
elif isinstance(output_size, (tuple, list)) and len(output_size) == 1: |
|
output_size = (output_size[0], output_size[0]) |
|
|
|
_, image_height, image_width = F.get_dimensions(img) |
|
crop_height, crop_width = output_size |
|
|
|
if crop_width > image_width or crop_height > image_height: |
|
padding_ltrb = [ |
|
(crop_width - image_width) // 2 if crop_width > image_width else 0, |
|
(crop_height - image_height) // 2 if crop_height > image_height else 0, |
|
(crop_width - image_width + 1) // 2 if crop_width > image_width else 0, |
|
(crop_height - image_height + 1) // 2 if crop_height > image_height else 0, |
|
] |
|
img = F.pad(img, padding_ltrb, fill=fill) |
|
_, image_height, image_width = F.get_dimensions(img) |
|
if crop_width == image_width and crop_height == image_height: |
|
return img |
|
|
|
crop_top = int(round((image_height - crop_height) / 2.0)) |
|
crop_left = int(round((image_width - crop_width) / 2.0)) |
|
return F.crop(img, crop_top, crop_left, crop_height, crop_width) |
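
# Example (made-up sizes): a 3x200x300 tensor with output_size (224, 224) is
# first padded to 224x300 (height was short), then center-cropped to 224x224.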
|
|
|
|
|
class CenterCropOrPad(torch.nn.Module): |
|
"""Crops the given image at the center. |
|
If the image is torch Tensor, it is expected |
|
to have [..., H, W] shape, where ... means an arbitrary number of leading |
|
dimensions. If image size is smaller than output size along any edge, image is |
|
    padded with the `fill` value and then center cropped.
|
|
|
Args: |
|
size (sequence or int): Desired output size of the crop. If size is an |
|
int instead of sequence like (h, w), a square crop (size, size) is |
|
made. If provided a sequence of length 1, it will be interpreted as |
|
            (size[0], size[0]).
        fill (int or tuple): Pixel fill value used when padding. Default: 0.
    """
|
|
|
def __init__(self, size, fill=0): |
|
super().__init__() |
|
self.size = _setup_size( |
|
size, error_msg='Please provide only two dimensions (h, w) for size.' |
|
) |
|
self.fill = fill |
|
|
|
def forward(self, img): |
|
""" |
|
Args: |
|
img (PIL Image or Tensor): Image to be cropped. |
|
|
|
Returns: |
|
PIL Image or Tensor: Cropped image. |
|
""" |
|
return center_crop_or_pad(img, self.size, fill=self.fill) |
|
|
|
def __repr__(self) -> str: |
|
return f'{self.__class__.__name__}(size={self.size})' |
|
|
|
|
|
def _convert_to_rgb(image): |
|
return image.convert('RGB') |
|
|
|
|
|
class _ColorJitter:
|
""" |
|
Apply Color Jitter to the PIL image with a specified probability. |
|
""" |
|
|
|
def __init__(self, brightness=0.0, contrast=0.0, saturation=0.0, hue=0.0, p=0.8): |
|
assert 0.0 <= p <= 1.0 |
|
self.p = p |
|
self.transf = ColorJitter( |
|
brightness=brightness, contrast=contrast, saturation=saturation, hue=hue |
|
) |
|
|
|
def __call__(self, img): |
|
if random.random() < self.p: |
|
return self.transf(img) |
|
else: |
|
return img |
|
|
|
|
|
class _GrayScale:
|
""" |
|
    Apply grayscale conversion to the PIL image with a specified probability.
|
""" |
|
|
|
def __init__(self, p=0.2): |
|
assert 0.0 <= p <= 1.0 |
|
self.p = p |
|
self.transf = Grayscale(num_output_channels=3) |
|
|
|
def __call__(self, img): |
|
if random.random() < self.p: |
|
return self.transf(img) |
|
else: |
|
return img |
|
|
|
|
|
def image_transform( |
|
image_size: Union[int, Tuple[int, int]], |
|
is_train: bool, |
|
mean: Optional[Tuple[float, ...]] = None, |
|
std: Optional[Tuple[float, ...]] = None, |
|
resize_mode: Optional[str] = None, |
|
interpolation: Optional[str] = None, |
|
fill_color: int = 0, |
|
aug_cfg: Optional[Union[Dict[str, Any], AugmentationCfg]] = None, |
|
): |
|
mean = mean or OPENAI_DATASET_MEAN |
|
if not isinstance(mean, (list, tuple)): |
|
mean = (mean,) * 3 |
|
|
|
std = std or OPENAI_DATASET_STD |
|
if not isinstance(std, (list, tuple)): |
|
std = (std,) * 3 |
|
|
|
interpolation = interpolation or 'bicubic' |
|
assert interpolation in ['bicubic', 'bilinear', 'random'] |
|
|
|
|
|
    # NOTE: 'random' interpolation is only honored by the timm train path;
    # here it falls back to BICUBIC.
    interpolation_mode = (
        InterpolationMode.BILINEAR
        if interpolation == 'bilinear'
        else InterpolationMode.BICUBIC
    )
|
|
|
resize_mode = resize_mode or 'shortest' |
|
assert resize_mode in ('shortest', 'longest', 'squash') |
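
    # resize_mode semantics (see the eval branches below):
    #   'shortest': resize shortest edge to target, then center-crop
    #   'longest':  resize longest edge to target, then pad/crop to target
    #   'squash':   resize directly to (h, w), distorting aspect ratio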
|
|
|
if isinstance(aug_cfg, dict): |
|
aug_cfg = AugmentationCfg(**aug_cfg) |
|
else: |
|
aug_cfg = aug_cfg or AugmentationCfg() |
|
|
|
normalize = Normalize(mean=mean, std=std) |
|
|
|
if is_train: |
|
aug_cfg_dict = {k: v for k, v in asdict(aug_cfg).items() if v is not None} |
|
use_timm = aug_cfg_dict.pop('use_timm', False) |
|
if use_timm: |
|
from timm.data import create_transform |
|
|
|
            if isinstance(image_size, (tuple, list)):
                assert len(image_size) >= 2
                input_size = (3,) + tuple(image_size[-2:])
            else:
                input_size = (3, image_size, image_size)
|
|
|
            aug_cfg_dict.setdefault('color_jitter', None)  # disable by default
            # drop non-timm items so they are not passed to timm's create_transform
            aug_cfg_dict.pop('color_jitter_prob', None)
            aug_cfg_dict.pop('gray_scale_prob', None)
|
|
|
train_transform = create_transform( |
|
input_size=input_size, |
|
is_training=True, |
|
hflip=0.0, |
|
mean=mean, |
|
std=std, |
|
re_mode='pixel', |
|
interpolation=interpolation, |
|
**aug_cfg_dict, |
|
) |
|
else: |
|
train_transform = [ |
|
RandomResizedCrop( |
|
image_size, |
|
scale=aug_cfg_dict.pop('scale'), |
|
interpolation=InterpolationMode.BICUBIC, |
|
), |
|
_convert_to_rgb, |
|
] |
|
            if aug_cfg.color_jitter_prob:
                assert (
                    aug_cfg.color_jitter is not None and len(aug_cfg.color_jitter) == 4
                )
                train_transform.extend(
                    [_ColorJitter(*aug_cfg.color_jitter, p=aug_cfg.color_jitter_prob)]
                )
                # mark as consumed so they don't trip the unused-items warning below
                aug_cfg_dict.pop('color_jitter', None)
                aug_cfg_dict.pop('color_jitter_prob', None)
            if aug_cfg.gray_scale_prob:
                train_transform.extend([_GrayScale(p=aug_cfg.gray_scale_prob)])
                aug_cfg_dict.pop('gray_scale_prob', None)
|
train_transform.extend( |
|
[ |
|
ToTensor(), |
|
normalize, |
|
] |
|
) |
|
train_transform = Compose(train_transform) |
|
            if aug_cfg_dict:
                warnings.warn(
                    f'Unused augmentation cfg items: {list(aug_cfg_dict.keys())}; '
                    f'set `use_timm=True` to enable them.'
                )
|
return train_transform |
|
else: |
|
if resize_mode == 'longest': |
|
transforms = [ |
|
ResizeKeepRatio( |
|
image_size, interpolation=interpolation_mode, longest=1 |
|
), |
|
CenterCropOrPad(image_size, fill=fill_color), |
|
] |
|
elif resize_mode == 'squash': |
|
if isinstance(image_size, int): |
|
image_size = (image_size, image_size) |
|
transforms = [ |
|
Resize(image_size, interpolation=interpolation_mode), |
|
] |
|
else: |
|
assert resize_mode == 'shortest' |
|
if not isinstance(image_size, (tuple, list)): |
|
image_size = (image_size, image_size) |
|
            if image_size[0] == image_size[1]:
                # simple square case: torchvision Resize with a scalar size
                # resizes the shortest edge to that value
                transforms = [Resize(image_size[0], interpolation=interpolation_mode)]
            else:
                # non-square target: resize so the shortest edge matches the target dims
                transforms = [ResizeKeepRatio(image_size)]
            transforms += [CenterCrop(image_size)]
|
|
|
transforms.extend( |
|
[ |
|
_convert_to_rgb, |
|
ToTensor(), |
|
normalize, |
|
] |
|
) |
|
return Compose(transforms) |
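
# Illustrative train-mode call (a sketch; the augmentation values are made up):
#
#   >>> train_tf = image_transform(
#   ...     224, is_train=True,
#   ...     aug_cfg={'color_jitter': (0.4, 0.4, 0.4, 0.1),
#   ...              'color_jitter_prob': 0.8, 'gray_scale_prob': 0.2})
#
# which composes RandomResizedCrop -> _convert_to_rgb -> _ColorJitter ->
# _GrayScale -> ToTensor -> Normalize.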
|
|
|
|
|
def image_transform_v2( |
|
cfg: PreprocessCfg, |
|
is_train: bool, |
|
aug_cfg: Optional[Union[Dict[str, Any], AugmentationCfg]] = None, |
|
): |
|
return image_transform( |
|
image_size=cfg.size, |
|
is_train=is_train, |
|
mean=cfg.mean, |
|
std=cfg.std, |
|
interpolation=cfg.interpolation, |
|
resize_mode=cfg.resize_mode, |
|
fill_color=cfg.fill_color, |
|
aug_cfg=aug_cfg, |
|
) |
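

if __name__ == '__main__':
    # Minimal smoke test (a sketch, assuming Pillow is available; it is not
    # imported at module level above).
    from PIL import Image

    cfg = PreprocessCfg(size=224)
    preprocess = image_transform_v2(cfg, is_train=False)
    out = preprocess(Image.new('RGB', (320, 240)))
    print(out.shape)  # expected: torch.Size([3, 224, 224])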
|
|