|
import os
|
|
import torch
|
|
from torch import nn
|
|
from torch.nn import functional as F
|
|
from transformers import PreTrainedModel
|
|
from .configuration_MyResnet import MyResnetConfig
|
|
|
|
|
|
# Debugging aid: ask CUDA to launch kernels synchronously so that a device-side
# error is reported at the call that caused it (this slows down GPU execution).
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
|
|
|
|
"""
|
|
Define the custom model.
|
|
"""
|
|
|
|
|
|
|
|
class Residual(nn.Module):
    """Residual unit: two 3x3 conv + batch-norm layers plus a skip connection.

    Args:
        input_channels: Number of channels of the incoming tensor.
        num_channels: Number of channels produced by the unit.
        use_1x1conv: When true, the skip path is passed through a 1x1
            convolution (with stride ``strides``) before being added to the
            main path.
        strides: Stride of the first 3x3 convolution and of the optional
            1x1 convolution.
    """

    def __init__(self, input_channels, num_channels,
                 use_1x1conv=False, strides=1):
        super().__init__()

        # Main path: only the first convolution applies the stride.
        self.conv1 = nn.Conv2d(
            input_channels, num_channels,
            kernel_size=3, stride=strides, padding=1,
        )
        self.conv2 = nn.Conv2d(
            num_channels, num_channels,
            kernel_size=3, padding=1,
        )

        # Skip path: optional 1x1 projection, otherwise the input is added
        # unchanged.
        self.conv3 = (
            nn.Conv2d(input_channels, num_channels,
                      kernel_size=1, stride=strides)
            if use_1x1conv
            else None
        )

        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        """Return ``relu(main_path(X) + skip_path(X))``."""
        out = self.conv1(X)
        out = F.relu(self.bn1(out))
        out = self.bn2(self.conv2(out))

        shortcut = X if self.conv3 is None else self.conv3(X)

        # In-place accumulation, as in the original implementation.
        out += shortcut
        return F.relu(out)
|
|
|
|
|
|
|
|
def resnet_block(input_channels, num_channels, num_residuals,
                 first_block=False):
    """Build the list of residual units that make up one stage.

    :param input_channels: number of channels entering the stage
    :param num_channels: number of channels produced by every unit of the stage
    :param num_residuals: number of residual units in the stage
    :param first_block: whether this is the first stage; the first stage does
        not downsample, every other stage opens with a stride-2 unit that uses
        a 1x1 convolution on its skip path
    :return: list of ``Residual`` modules (wrap it in ``nn.Sequential``)
    """
    blk = []
    for i in range(num_residuals):
        if i > 0:
            # Later units always map num_channels -> num_channels.
            blk.append(Residual(num_channels, num_channels))
        elif not first_block:
            # Opening unit of a later stage: halve the resolution and change
            # the channel count, so the skip path needs a 1x1 projection.
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=True, strides=2))
        elif input_channels != num_channels:
            # First stage whose input width differs from its output width:
            # keep the resolution but project the skip path so the addition
            # is shape-compatible (previously input_channels was ignored
            # here and the unit failed at forward time).
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=True))
        else:
            blk.append(Residual(num_channels, num_channels))
    return blk
|
|
|
|
|
|
|
|
def net(in_channels, num_channels, num_residuals, num_classes):
    """Assemble the residual network.

    :param in_channels: number of channels of the input images
    :param num_channels: number of output channels of the first convolution;
        the four stages produce ``num_channels``, ``2x``, ``4x`` and ``8x``
        that many channels
    :param num_residuals: sequence whose first four entries give the number
        of residual units in each of the four stages
    :param num_classes: number of classes predicted by the final linear layer
    :return: the network as an ``nn.Sequential``
    """
    # Stem. The batch norm must match the width of the convolution before it;
    # it used to be hard-coded to 64, which only worked for num_channels == 64.
    b1 = nn.Sequential(
        nn.Conv2d(in_channels, num_channels, kernel_size=7, stride=2, padding=3),
        nn.BatchNorm2d(num_channels), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

    # The first stage receives the stem output, i.e. num_channels channels
    # (previously hard-coded to 64 as well).
    b2 = nn.Sequential(*resnet_block(num_channels, num_channels, num_residuals[0], first_block=True))
    b3 = nn.Sequential(*resnet_block(num_channels, num_channels * 2, num_residuals[1]))
    b4 = nn.Sequential(*resnet_block(num_channels * 2, num_channels * 4, num_residuals[2]))
    b5 = nn.Sequential(*resnet_block(num_channels * 4, num_channels * 8, num_residuals[3]))

    # Head: global average pooling, flatten, linear classifier.
    resnet = nn.Sequential(b1, b2, b3, b4, b5,
                           nn.AdaptiveAvgPool2d((1, 1)),
                           nn.Flatten(), nn.Linear(num_channels * 8, num_classes))
    return resnet
|
|
|
|
|
|
"""
|
|
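Wrap the model as a Hugging Face model so that it can be trained and used
for inference with the transformers library.

Two model classes are defined here: one that extracts hidden features from a
batch of images (similar to BertModel),
and one suited to image classification (similar to
BertForSequenceClassification).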
|
|
"""
|
|
|
|
|
|
class MyResnetModel(PreTrainedModel):
    """Feature-extraction wrapper around the network built by ``net``.

    The constructor reads ``in_channels``, ``num_channels``, ``num_residuals``
    and ``num_classes`` from the config and builds the network with them.
    """

    config_class = MyResnetConfig

    def __init__(self, config):
        super().__init__(config)

        self.model = net(
            in_channels=config.in_channels,
            num_channels=config.num_channels,
            num_residuals=config.num_residuals,
            num_classes=config.num_classes
        )

    def forward(self, tensor, labels=None):
        """Return the features computed for ``tensor``.

        ``self.model`` is a plain ``nn.Sequential`` and has no
        ``forward_features`` method (calling it raised ``AttributeError``),
        so the features are obtained by running every layer except the last
        one, which is the linear classification layer added by ``net``.

        :param tensor: batch of input images
        :param labels: accepted for interface compatibility; not used
        :return: output of the network without its final linear layer
        """
        # Slicing an nn.Sequential yields a Sequential over the same
        # sub-modules; no parameters are copied or newly registered.
        return self.model[:-1](tensor)
|
|
|
|
|
|
class MyResnetModelForImageClassification(PreTrainedModel):
    """Image-classification wrapper around the network built by ``net``.

    The constructor reads ``in_channels``, ``num_channels``, ``num_residuals``
    and ``num_classes`` from the config and builds the network with them.
    """

    config_class = MyResnetConfig

    def __init__(self, config):
        super().__init__(config)

        self.model = net(
            in_channels=config.in_channels,
            num_channels=config.num_channels,
            num_residuals=config.num_residuals,
            num_classes=config.num_classes
        )

    def forward(self, X, y=None):
        """Compute the logits for ``X`` and, when ``y`` is given, the loss.

        You can have the model return anything you like, but returning a
        dictionary like this one, with the loss included whenever labels are
        passed, is what lets the model be used directly with the Trainer
        class. Any other output format is fine as long as you plan to use
        your own training loop or another library for training.

        :param X: batch of input images
        :param y: optional target labels; defaults to ``None`` so the model
            can be called for inference without labels
        :return: ``{"logits": ...}``, plus a ``"loss"`` entry (cross-entropy
            between the logits and ``y``) when ``y`` is provided
        """
        y_hat = self.model(X)
        if y is None:
            return {"logits": y_hat}

        loss = F.cross_entropy(y_hat, y)
        return {"loss": loss, "logits": y_hat}

    def forward_features(self, X):
        """Run ``X`` through every layer, printing each layer's output shape.

        :param X: batch of input images
        :return: the output of the last layer (previously nothing was
            returned, so the computed activations were lost)
        """
        for layer in self.model:
            X = layer(X)
            print(layer.__class__.__name__, 'output shape:\t', X.shape)
        return X
|
|
|