File size: 5,950 Bytes
fb22435 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import os
import torch
from torch import nn
from torch.nn import functional as F
from transformers import PreTrainedModel
from .configuration_MyResnet import MyResnetConfig
# 设置CUDA异常阻塞,用于调试CUDA相关问题
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
"""
定义自己的模型
"""
# Residual block definition
class Residual(nn.Module):
    """Basic ResNet residual block.

    Two 3x3 conv + batch-norm layers form the main path; the input is added
    back before the final ReLU. When ``use_1x1conv`` is True, a 1x1
    convolution projects the skip connection so its channel count and spatial
    size match the main path (needed when ``strides > 1`` or the channel
    count changes).
    """

    def __init__(self, input_channels, num_channels,
                 use_1x1conv=False, strides=1):
        super().__init__()
        # Main path: first 3x3 conv (may downsample via strides),
        # then a second 3x3 conv that keeps the shape.
        self.conv1 = nn.Conv2d(input_channels, num_channels,
                               kernel_size=3, padding=1, stride=strides)
        self.conv2 = nn.Conv2d(num_channels, num_channels,
                               kernel_size=3, padding=1)
        # Skip path: optional 1x1 projection, otherwise identity (None).
        self.conv3 = (nn.Conv2d(input_channels, num_channels,
                                kernel_size=1, stride=strides)
                      if use_1x1conv else None)
        # One batch-norm per conv on the main path.
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        # conv -> bn -> relu, then conv -> bn.
        out = F.relu(self.bn1(self.conv1(X)))
        out = self.bn2(self.conv2(out))
        # Project the identity when the shapes differ, then add and activate.
        shortcut = self.conv3(X) if self.conv3 else X
        out = out + shortcut
        return F.relu(out)
# Build a stage of several residual blocks
def resnet_block(input_channels, num_channels, num_residuals,
                 first_block=False):
    """Build one ResNet stage as a list of :class:`Residual` blocks.

    :param input_channels: channel count of the stage's input
    :param num_channels: output channel count of every block in the stage
    :param num_residuals: number of residual blocks in the stage
    :param first_block: True for the stage right after the stem; that stage
        does not halve the spatial resolution
    :return: list of ``Residual`` modules (callers unpack into nn.Sequential)
    """
    blk = []
    for i in range(num_residuals):
        if i == 0 and not first_block:
            # Stage entry: halve spatial size and switch channel count,
            # so the skip path needs the 1x1 projection.
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=True, strides=2))
        elif i == 0:
            # First stage keeps resolution. Bugfix: honor input_channels
            # here (the original hard-wired num_channels -> num_channels,
            # breaking any input_channels != num_channels). Project the
            # skip path only when the channel count actually changes, so
            # the common equal-channel case is built exactly as before.
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=(input_channels != num_channels)))
        else:
            # Subsequent blocks keep both channels and resolution.
            blk.append(Residual(num_channels, num_channels))
    return blk
# Build the full residual network
def net(in_channels, num_channels, num_residuals, num_classes):
    """Assemble a ResNet-style classifier.

    :param in_channels: channel count of the input images
    :param num_channels: output channel count of the stem conv (doubled at
        each later stage)
    :param num_residuals: sequence of 4 ints — residual blocks per stage
    :param num_classes: number of output classes
    :return: nn.Sequential ending in a Linear classification head
    """
    # Stem: 7x7 conv -> batch norm -> ReLU -> 3x3 max pool.
    # Bugfix: BatchNorm2d must track the conv's actual output channels
    # (num_channels), not a hard-coded 64.
    b1 = nn.Sequential(nn.Conv2d(in_channels, num_channels, kernel_size=7, stride=2, padding=3),
                       nn.BatchNorm2d(num_channels), nn.ReLU(),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    # Four residual stages; channels double (and resolution halves) at each
    # stage after the first. Bugfix: the first stage's input is the stem's
    # num_channels output, not a hard-coded 64.
    b2 = nn.Sequential(*resnet_block(num_channels, num_channels, num_residuals[0], first_block=True))
    b3 = nn.Sequential(*resnet_block(num_channels, num_channels * 2, num_residuals[1]))
    b4 = nn.Sequential(*resnet_block(num_channels * 2, num_channels * 4, num_residuals[2]))
    b5 = nn.Sequential(*resnet_block(num_channels * 4, num_channels * 8, num_residuals[3]))
    # Head: global average pool -> flatten -> fully connected classifier.
    resnet = nn.Sequential(b1, b2, b3, b4, b5,
                           nn.AdaptiveAvgPool2d((1, 1)),
                           nn.Flatten(), nn.Linear(num_channels * 8, num_classes))
    return resnet
"""
把模型封装成huggingface的模型,
可以使用transformers库进行训练和推理
这里定义了两个模型类:一个用于从一批图像中提取隐藏特征(类似于 BertModel),
另一个适用于图像分类(类似于 BertForSequenceClassification)。
"""
class MyResnetModel(PreTrainedModel):
    """ResNet backbone wrapped as a HuggingFace model (analogous to BertModel):
    returns hidden features for a batch of images rather than class logits.
    """

    config_class = MyResnetConfig  # config class used by from_pretrained/save_pretrained

    def __init__(self, config):
        super().__init__(config)
        # Build the underlying nn.Sequential network from the config.
        self.model = net(
            in_channels=config.in_channels,
            num_channels=config.num_channels,
            num_residuals=config.num_residuals,
            num_classes=config.num_classes
        )

    def forward(self, tensor, labels=None):
        """Return the pooled, flattened hidden features for ``tensor``.

        Bugfix: ``nn.Sequential`` has no ``forward_features`` method, so the
        original ``self.model.forward_features(tensor)`` always raised
        AttributeError. Instead, run every layer except the final Linear
        classifier, yielding the feature vector that feeds the head.
        ``labels`` is accepted for Trainer compatibility but unused here.
        """
        return self.model[:-1](tensor)
class MyResnetModelForImageClassification(PreTrainedModel):
    """ResNet wrapped for image classification (analogous to
    BertForSequenceClassification).

    Returning a dict that includes ``loss`` whenever labels are supplied lets
    the model plug straight into the HuggingFace ``Trainer``; any other output
    format works for custom training loops.
    """

    config_class = MyResnetConfig  # config class used by from_pretrained/save_pretrained

    def __init__(self, config):
        super().__init__(config)
        # Build the underlying nn.Sequential network from the config.
        self.model = net(
            in_channels=config.in_channels,
            num_channels=config.num_channels,
            num_residuals=config.num_residuals,
            num_classes=config.num_classes
        )

    def forward(self, X, y=None):
        """Classify a batch of images.

        :param X: batch of input images
        :param y: optional integer class labels; when provided, cross-entropy
            loss is computed and included in the output
        :return: ``{"loss", "logits"}`` when labels are given, else
            ``{"logits"}``

        Bugfix: ``y`` now defaults to None — the body already handled the
        None case, but the original signature made labels mandatory, so
        label-free inference crashed with a TypeError.
        """
        y_hat = self.model(X)
        if y is not None:
            loss = torch.nn.functional.cross_entropy(y_hat, y)
            return {"loss": loss, "logits": y_hat}
        return {"logits": y_hat}

    def forward_features(self, X):
        """Debug helper: run each top-level layer, printing its class name and
        output shape, and return the final activation.

        Bugfix: the original printed the shapes but never returned ``X``, so
        callers always received None.
        """
        for layer in self.model:
            X = layer(X)
            print(layer.__class__.__name__, 'output shape:\t', X.shape)
        return X
|