|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
from torchvision import datasets, transforms, models |
|
from torch.utils.data import DataLoader |
|
import os |
|
import copy |
|
from torch.optim.lr_scheduler import ReduceLROnPlateau |
|
from torchvision.models import resnet50, ResNet50_Weights |
|
import ssl |
|
ssl._create_default_https_context = ssl._create_unverified_context |
|
|
|
|
|
train_transform = transforms.Compose([ |
|
transforms.RandomResizedCrop(224), |
|
transforms.RandomHorizontalFlip(), |
|
transforms.RandomRotation(10), |
|
transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2), |
|
transforms.ToTensor(), |
|
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) |
|
]) |
|
|
|
val_test_transform = transforms.Compose([ |
|
transforms.Resize(256), |
|
transforms.CenterCrop(224), |
|
transforms.ToTensor(), |
|
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) |
|
]) |
|
|
|
|
|
class ResNetLungCancer(nn.Module): |
|
def __init__(self, num_classes, use_pretrained=True): |
|
super(ResNetLungCancer, self).__init__() |
|
if use_pretrained: |
|
weights = ResNet50_Weights.IMAGENET1K_V1 |
|
else: |
|
weights = None |
|
self.resnet = resnet50(weights=weights) |
|
num_ftrs = self.resnet.fc.in_features |
|
self.resnet.fc = nn.Identity() |
|
self.fc = nn.Sequential( |
|
nn.Linear(num_ftrs, 256), |
|
nn.ReLU(), |
|
nn.Dropout(0.5), |
|
nn.Linear(256, num_classes) |
|
) |
|
|
|
def forward(self, x): |
|
x = self.resnet(x) |
|
return self.fc(x) |
|
|
|
|
|
|
|
def train_model(model, train_loader, valid_loader, criterion, optimizer, scheduler, num_epochs=50, device='cuda'): |
|
best_model_wts = copy.deepcopy(model.state_dict()) |
|
best_acc = 0.0 |
|
|
|
for epoch in range(num_epochs): |
|
print(f'Epoch {epoch}/{num_epochs - 1}') |
|
print('-' * 10) |
|
|
|
for phase in ['train', 'valid']: |
|
if phase == 'train': |
|
model.train() |
|
dataloader = train_loader |
|
else: |
|
model.eval() |
|
dataloader = valid_loader |
|
|
|
running_loss = 0.0 |
|
running_corrects = 0 |
|
|
|
for inputs, labels in dataloader: |
|
inputs = inputs.to(device) |
|
labels = labels.to(device) |
|
|
|
optimizer.zero_grad() |
|
|
|
with torch.set_grad_enabled(phase == 'train'): |
|
outputs = model(inputs) |
|
_, preds = torch.max(outputs, 1) |
|
loss = criterion(outputs, labels) |
|
|
|
if phase == 'train': |
|
loss.backward() |
|
optimizer.step() |
|
|
|
running_loss += loss.item() * inputs.size(0) |
|
running_corrects += torch.sum(preds == labels.data) |
|
|
|
epoch_loss = running_loss / len(dataloader.dataset) |
|
epoch_acc = running_corrects.double() / len(dataloader.dataset) |
|
|
|
print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}') |
|
|
|
if phase == 'valid': |
|
scheduler.step(epoch_acc) |
|
current_lr = optimizer.param_groups[0]['lr'] |
|
print(f'Learning rate: {current_lr}') |
|
if epoch_acc > best_acc: |
|
best_acc = epoch_acc |
|
best_model_wts = copy.deepcopy(model.state_dict()) |
|
|
|
print() |
|
|
|
print(f'Best val Acc: {best_acc:.4f}') |
|
model.load_state_dict(best_model_wts) |
|
return model |
|
|
|
|
|
def evaluate_model(model, test_loader, device='cuda'): |
|
model.eval() |
|
running_corrects = 0 |
|
|
|
with torch.no_grad(): |
|
for inputs, labels in test_loader: |
|
inputs = inputs.to(device) |
|
labels = labels.to(device) |
|
|
|
outputs = model(inputs) |
|
_, preds = torch.max(outputs, 1) |
|
running_corrects += torch.sum(preds == labels.data) |
|
|
|
test_acc = running_corrects.double() / len(test_loader.dataset) |
|
print(f'Test Acc: {test_acc:.4f}') |
|
|
|
if __name__ == "__main__": |
|
|
|
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") |
|
print(f"Using device: {device}") |
|
|
|
|
|
data_dir = 'Processed_Data' |
|
train_dataset = datasets.ImageFolder(os.path.join(data_dir, 'train'), transform=train_transform) |
|
valid_dataset = datasets.ImageFolder(os.path.join(data_dir, 'valid'), transform=val_test_transform) |
|
test_dataset = datasets.ImageFolder(os.path.join(data_dir, 'test'), transform=val_test_transform) |
|
|
|
|
|
batch_size = 32 |
|
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4) |
|
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, num_workers=4) |
|
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4) |
|
|
|
print(f"Number of training images: {len(train_dataset)}") |
|
print(f"Number of validation images: {len(valid_dataset)}") |
|
print(f"Number of test images: {len(test_dataset)}") |
|
|
|
|
|
num_classes = len(train_dataset.classes) |
|
model = ResNetLungCancer(num_classes) |
|
model = model.to(device) |
|
|
|
criterion = nn.CrossEntropyLoss() |
|
|
|
pretrained_params = list(model.resnet.parameters()) |
|
new_params = list(model.fc.parameters()) |
|
|
|
optimizer = optim.Adam([ |
|
{'params': pretrained_params, 'lr': 1e-5}, |
|
{'params': new_params, 'lr': 1e-4} |
|
], weight_decay=1e-6) |
|
|
|
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=7) |
|
|
|
|
|
trained_model = train_model(model, train_loader, valid_loader, criterion, optimizer, scheduler, num_epochs=50, device=device) |
|
|
|
|
|
evaluate_model(trained_model, test_loader, device=device) |
|
|
|
|
|
torch.save(trained_model.state_dict(), 'lung_cancer_detection_model.pth') |
|
|
|
|
|
dummy_input = torch.randn(1, 3, 224, 224).to(device) |
|
torch.onnx.export(trained_model, dummy_input, "lung_cancer_detection_model.onnx", input_names=['input'], output_names=['output']) |
|
|
|
print("Training completed. Model saved.") |