|
import math |
|
import os |
|
import warnings |
|
from dataclasses import dataclass |
|
from functools import lru_cache, partial |
|
from pathlib import Path |
|
from typing import Dict, List, Optional, Tuple, Union |
|
|
|
import torch |
|
import torch.nn.functional as F |
|
from torch import Tensor, nn |
|
from torch.autograd import Function |
|
from torch.autograd.function import once_differentiable |
|
|
|
from transformers.activations import ACT2CLS, ACT2FN |
|
from transformers.image_transforms import center_to_corners_format, corners_to_center_format |
|
from transformers.modeling_outputs import BaseModelOutput |
|
from transformers.modeling_utils import PreTrainedModel |
|
from transformers.utils import ( |
|
ModelOutput, |
|
add_start_docstrings, |
|
add_start_docstrings_to_model_forward, |
|
is_ninja_available, |
|
is_scipy_available, |
|
is_torch_cuda_available, |
|
logging, |
|
replace_return_docstrings, |
|
requires_backends, |
|
) |
|
|
|
from transformers.models.rt_detr.configuration_rt_detr_resnet import RTDetrResNetConfig |
|
from transformers.models.rt_detr.modeling_rt_detr import ( |
|
RTDetrConfig, |
|
RTDetrDecoderOutput, |
|
RTDetrModelOutput, |
|
RTDetrObjectDetectionOutput, |
|
RTDetrFrozenBatchNorm2d, |
|
RTDetrConvEncoder, |
|
RTDetrConvNormLayer, |
|
RTDetrEncoderLayer, |
|
RTDetrRepVggBlock, |
|
RTDetrCSPRepLayer, |
|
RTDetrMultiscaleDeformableAttention, |
|
RTDetrMultiheadAttention, |
|
RTDetrDecoderLayer, |
|
RTDetrPreTrainedModel, |
|
RTDetrEncoder, |
|
RTDetrHybridEncoder, |
|
RTDetrDecoder, |
|
RTDetrModel, |
|
RTDetrMLPPredictionHead, |
|
RTDetrForObjectDetection |
|
) |
|
from transformers.loss.loss_rt_detr import (RTDetrLoss, RTDetrHungarianMatcher) |
|
from transformers.utils.backbone_utils import load_backbone |
|
|
|
|
|
|
|
class RTDetrV2Config(RTDetrConfig): |
|
model_type = "rt_detr_v2" |
|
def __init__( |
|
self, |
|
decoder_n_levels=3, |
|
decoder_offset_scale=0.5, |
|
**kwargs |
|
): |
|
super().__init__(**kwargs) |
|
self.decoder_n_levels = decoder_n_levels |
|
self.decoder_offset_scale = decoder_offset_scale |
|
|
|
class RTDetrV2ResNetConfig(RTDetrResNetConfig): |
|
model_type = "rt_detr_v2_resnet" |
|
|
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
|
|
class RTDetrV2DecoderOutput(RTDetrDecoderOutput): |
|
pass |
|
|
|
class RTDetrV2ModelOutput(RTDetrModelOutput): |
|
pass |
|
|
|
class RTDetrV2ObjectDetectionOutput(RTDetrObjectDetectionOutput): |
|
pass |
|
|
|
class RTDetrV2FrozenBatchNorm2d(RTDetrFrozenBatchNorm2d): |
|
pass |
|
|
|
|
|
class RTDetrV2ConvEncoder(RTDetrConvEncoder): |
|
pass |
|
|
|
class RTDetrV2ConvNormLayer(RTDetrConvNormLayer): |
|
pass |
|
|
|
class RTDetrV2EncoderLayer(RTDetrEncoderLayer): |
|
pass |
|
|
|
class RTDetrV2RepVggBlock(RTDetrRepVggBlock): |
|
pass |
|
|
|
class RTDetrV2CSPRepLayer(RTDetrCSPRepLayer): |
|
pass |
|
|
|
|
|
|
|
def multi_scale_deformable_attention_v2( |
|
value: Tensor, |
|
value_spatial_shapes: Tensor, |
|
sampling_locations: Tensor, |
|
attention_weights: Tensor, |
|
num_points_list: List[int], |
|
method="default", |
|
) -> Tensor: |
|
batch_size, _, num_heads, hidden_dim = value.shape |
|
_, num_queries, num_heads, num_levels, num_points = sampling_locations.shape |
|
value_list = ( |
|
value.permute(0, 2, 3, 1) |
|
.flatten(0, 1) |
|
.split([height.item() * width.item() for height, width in value_spatial_shapes], dim=-1) |
|
) |
|
|
|
if method == "default": |
|
sampling_grids = 2 * sampling_locations - 1 |
|
elif method == "discrete": |
|
sampling_grids = sampling_locations |
|
sampling_grids = sampling_grids.permute(0, 2, 1, 3, 4).flatten(0, 1) |
|
sampling_grids = sampling_grids.split(num_points_list, dim=-2) |
|
sampling_value_list = [] |
|
for level_id, (height, width) in enumerate(value_spatial_shapes): |
|
|
|
|
|
|
|
|
|
value_l_ = value_list[level_id].reshape(batch_size * num_heads, hidden_dim, height, width) |
|
|
|
|
|
|
|
sampling_grid_l_ = sampling_grids[level_id] |
|
|
|
if method == "default": |
|
sampling_value_l_ = nn.functional.grid_sample( |
|
value_l_, sampling_grid_l_, mode="bilinear", padding_mode="zeros", align_corners=False |
|
) |
|
elif method == "discrete": |
|
sampling_coord = (sampling_grid_l_ * torch.tensor([[width, height]], device=value.device) + 0.5).to( |
|
torch.int64 |
|
) |
|
|
|
|
|
sampling_coord_x = sampling_coord[..., 0].clamp(0, width - 1) |
|
sampling_coord_y = sampling_coord[..., 1].clamp(0, height - 1) |
|
|
|
|
|
sampling_coord = torch.stack([sampling_coord_x, sampling_coord_y], dim=-1) |
|
sampling_coord = sampling_coord.reshape(batch_size * num_heads, num_queries * num_points_list[level_id], 2) |
|
sampling_idx = ( |
|
torch.arange(sampling_coord.shape[0], device=value.device) |
|
.unsqueeze(-1) |
|
.repeat(1, sampling_coord.shape[1]) |
|
) |
|
sampling_value_l_ = value_l_[sampling_idx, :, sampling_coord[..., 1], sampling_coord[..., 0]] |
|
sampling_value_l_ = sampling_value_l_.permute(0, 2, 1).reshape( |
|
batch_size * num_heads, hidden_dim, num_queries, num_points_list[level_id] |
|
) |
|
sampling_value_list.append(sampling_value_l_) |
|
|
|
|
|
|
|
attention_weights = attention_weights.permute(0, 2, 1, 3).reshape( |
|
batch_size * num_heads, 1, num_queries, sum(num_points_list) |
|
) |
|
output = ( |
|
(torch.concat(sampling_value_list, dim=-1) * attention_weights) |
|
.sum(-1) |
|
.view(batch_size, num_heads * hidden_dim, num_queries) |
|
) |
|
return output.transpose(1, 2).contiguous() |
|
|
|
|
|
def __init__(self, config: RTDetrV2Config): |
|
super().__init__(config, config.decoder_attention_heads, config.decoder_n_points) |
|
self.n_levels = config.decoder_n_levels |
|
self.offset_scale = config.decoder_offset_scale |
|
|
|
class RTDetrV2MultiscaleDeformableAttention(RTDetrMultiscaleDeformableAttention): |
|
|
|
def __init__(self, config: RTDetrV2Config): |
|
super().__init__(config, config.decoder_attention_heads, config.decoder_n_points) |
|
self.n_levels = config.decoder_n_levels |
|
self.offset_scale = config.decoder_offset_scale |
|
n_points_list = [self.n_points for _ in range(self.n_levels)] |
|
self.n_points_list = n_points_list |
|
n_points_scale = [1 / n for n in n_points_list for _ in range(n)] |
|
self.register_buffer("n_points_scale", torch.tensor(n_points_scale, dtype=torch.float32)) |
|
|
|
self._reset_parameters() |
|
|
|
def _reset_parameters(self): |
|
nn.init.constant_(self.sampling_offsets.weight.data, 0.0) |
|
default_dtype = torch.get_default_dtype() |
|
thetas = torch.arange(self.n_heads, dtype=torch.int64).to(default_dtype) * (2.0 * math.pi / self.n_heads) |
|
grid_init = torch.stack([thetas.cos(), thetas.sin()], -1) |
|
grid_init = ( |
|
(grid_init / grid_init.abs().max(-1, keepdim=True)[0]) |
|
.view(self.n_heads, 1, 1, 2) |
|
.repeat(1, self.n_levels, self.n_points, 1) |
|
) |
|
for i in range(self.n_points): |
|
grid_init[:, :, i, :] *= i + 1 |
|
with torch.no_grad(): |
|
self.sampling_offsets.bias = nn.Parameter(grid_init.view(-1)) |
|
nn.init.constant_(self.attention_weights.weight.data, 0.0) |
|
nn.init.constant_(self.attention_weights.bias.data, 0.0) |
|
nn.init.xavier_uniform_(self.value_proj.weight.data) |
|
nn.init.constant_(self.value_proj.bias.data, 0.0) |
|
nn.init.xavier_uniform_(self.output_proj.weight.data) |
|
nn.init.constant_(self.output_proj.bias.data, 0.0) |
|
|
|
|
|
def forward( |
|
self, |
|
hidden_states: torch.Tensor, |
|
attention_mask: Optional[torch.Tensor] = None, |
|
encoder_hidden_states=None, |
|
encoder_attention_mask=None, |
|
position_embeddings: Optional[torch.Tensor] = None, |
|
reference_points=None, |
|
spatial_shapes=None, |
|
level_start_index=None, |
|
output_attentions: bool = False, |
|
): |
|
|
|
if position_embeddings is not None: |
|
hidden_states = self.with_pos_embed(hidden_states, position_embeddings) |
|
|
|
batch_size, num_queries, _ = hidden_states.shape |
|
batch_size, sequence_length, _ = encoder_hidden_states.shape |
|
if (spatial_shapes[:, 0] * spatial_shapes[:, 1]).sum() != sequence_length: |
|
raise ValueError( |
|
"Make sure to align the spatial shapes with the sequence length of the encoder hidden states" |
|
) |
|
|
|
value = self.value_proj(encoder_hidden_states) |
|
if attention_mask is not None: |
|
|
|
value = value.masked_fill(~attention_mask[..., None], float(0)) |
|
value = value.view(batch_size, sequence_length, self.n_heads, self.d_model // self.n_heads) |
|
sampling_offsets = self.sampling_offsets(hidden_states).view( |
|
batch_size, num_queries, self.n_heads, self.n_levels * self.n_points, 2 |
|
) |
|
attention_weights = self.attention_weights(hidden_states).view( |
|
batch_size, num_queries, self.n_heads, self.n_levels * self.n_points |
|
) |
|
attention_weights = F.softmax(attention_weights, -1).view( |
|
batch_size, num_queries, self.n_heads, self.n_levels * self.n_points |
|
) |
|
|
|
num_coordinates = reference_points.shape[-1] |
|
if num_coordinates == 2: |
|
offset_normalizer = torch.stack([spatial_shapes[..., 1], spatial_shapes[..., 0]], -1) |
|
sampling_locations = ( |
|
reference_points[:, :, None, :, None, :] |
|
+ sampling_offsets / offset_normalizer[None, None, None, :, None, :] |
|
) |
|
elif num_coordinates == 4: |
|
n_points_scale = self.n_points_scale.to(dtype=hidden_states.dtype).unsqueeze(-1) |
|
offset = sampling_offsets * n_points_scale * reference_points[:, :, None, :, 2:] * self.offset_scale |
|
sampling_locations = reference_points[:, :, None, :, :2] + offset |
|
else: |
|
raise ValueError(f"Last dim of reference_points must be 2 or 4, but got {reference_points.shape[-1]}") |
|
|
|
if self.disable_custom_kernels: |
|
|
|
output = multi_scale_deformable_attention_v2( |
|
value, spatial_shapes, sampling_locations, attention_weights, self.n_points_list |
|
) |
|
else: |
|
try: |
|
|
|
output = MultiScaleDeformableAttentionFunction.apply( |
|
value, |
|
spatial_shapes, |
|
level_start_index, |
|
sampling_locations, |
|
attention_weights, |
|
self.im2col_step, |
|
) |
|
except Exception: |
|
|
|
output = multi_scale_deformable_attention_v2( |
|
value, spatial_shapes, sampling_locations, attention_weights, self.n_points_list |
|
) |
|
output = self.output_proj(output) |
|
|
|
return output, attention_weights |
|
|
|
class RTDetrV2MultiheadAttention(RTDetrMultiheadAttention): |
|
pass |
|
|
|
class RTDetrV2DecoderLayer(RTDetrDecoderLayer): |
|
pass |
|
|
|
|
|
class RTDetrV2PreTrainedModel(RTDetrPreTrainedModel): |
|
config_class = RTDetrV2Config |
|
base_model_prefix = "rt_detr_v2" |
|
main_input_name = "pixel_values" |
|
_no_split_modules = [r"RTDetrV2ConvEncoder", r"RTDetrV2EncoderLayer", r"RTDetrV2DecoderLayer"] |
|
|
|
|
|
class RTDetrV2Encoder(RTDetrEncoder): |
|
pass |
|
|
|
class RTDetrV2HybridEncoder(RTDetrHybridEncoder): |
|
pass |
|
|
|
class RTDetrV2Decoder(RTDetrDecoder): |
|
pass |
|
|
|
|
|
class RTDetrV2Model(RTDetrModel): |
|
pass |
|
|
|
class RTDetrV2Loss(RTDetrLoss): |
|
pass |
|
|
|
|
|
class RTDetrV2MLPPredictionHead(RTDetrMLPPredictionHead): |
|
pass |
|
|
|
class RTDetrV2HungarianMatcher(RTDetrHungarianMatcher): |
|
pass |
|
|
|
|
|
|
|
class RTDetrV2ForObjectDetection(RTDetrForObjectDetection): |
|
pass |
|
|
|
|