"""Fine-tune a ResNet-50 image classifier and export it.

Running this file as a script trains on ``Processed_Data/{train,valid,test}``
(``ImageFolder`` layout), reports test accuracy, and writes the trained model
both as a PyTorch state dict and as an ONNX graph.
"""

import copy
import os
import ssl

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from torchvision.models import resnet50, ResNet50_Weights

# NOTE(security): this replaces the default HTTPS context factory with the
# unverified one, i.e. certificate verification is disabled for the whole
# process. Presumably a workaround so the pretrained weights can be downloaded
# on machines with a broken certificate store -- confirm, and prefer fixing the
# certificate store over keeping this.
ssl._create_default_https_context = ssl._create_unverified_context

# data transformations with augmentation
# (mean/std are presumably the ImageNet channel statistics expected by the
# pretrained backbone -- TODO confirm)
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# deterministic preprocessing for validation and test (no augmentation)
val_test_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


class ResNetLungCancer(nn.Module):
    """ResNet-50 backbone followed by a small two-layer classification head.

    Args:
        num_classes: Number of output logits.
        use_pretrained: When true, the backbone is initialised with
            ``ResNet50_Weights.IMAGENET1K_V1``; otherwise it starts from
            random weights.
    """

    def __init__(self, num_classes: int, use_pretrained: bool = True):
        super().__init__()
        weights = ResNet50_Weights.IMAGENET1K_V1 if use_pretrained else None
        self.resnet = resnet50(weights=weights)
        num_ftrs = self.resnet.fc.in_features
        # remove the final fully connected layer so the backbone emits features
        self.resnet.fc = nn.Identity()
        self.fc = nn.Sequential(
            nn.Linear(num_ftrs, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits for the input batch ``x``."""
        x = self.resnet(x)
        return self.fc(x)


def _dataset_size(loader, name):
    """Return the number of samples behind ``loader``, rejecting empty datasets."""
    size = len(loader.dataset)
    if size == 0:
        # Averaging over zero samples is undefined; fail with a clear message
        # instead of an obscure error at the end of the first epoch.
        raise ValueError(f'{name} dataset is empty; cannot compute metrics')
    return size


# train function
def train_model(model, train_loader, valid_loader, criterion, optimizer,
                scheduler, num_epochs: int = 50, device='cuda'):
    """Train ``model`` and return it with the best-validation weights loaded.

    Each epoch runs one pass over ``train_loader`` (with gradient updates) and
    one pass over ``valid_loader`` (without). After the validation pass the
    scheduler is stepped with the validation accuracy, and the weights are
    snapshotted whenever that accuracy exceeds the best seen so far.

    Args:
        model: Module mapping an input batch to per-class logits.
        train_loader: DataLoader yielding ``(inputs, labels)`` for training.
        valid_loader: DataLoader yielding ``(inputs, labels)`` for validation.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over the model parameters.
        scheduler: Scheduler whose ``step`` accepts the validation accuracy.
        num_epochs: Number of epochs to run.
        device: Device the batches are moved to.

    Returns:
        The same ``model`` object, loaded with the best-scoring weights.

    Raises:
        ValueError: If either loader's dataset is empty.
    """
    loaders = {'train': train_loader, 'valid': valid_loader}
    sizes = {phase: _dataset_size(loader, phase)
             for phase, loader in loaders.items()}

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ('train', 'valid'):
            is_train = phase == 'train'
            model.train(is_train)  # train(False) is equivalent to eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in loaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                # gradients are only tracked while training
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    preds = outputs.argmax(dim=1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # loss is a batch mean; weight by batch size so the epoch
                # average is per-sample even when the last batch is smaller
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            epoch_loss = running_loss / sizes[phase]
            epoch_acc = running_corrects / sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if not is_train:
                scheduler.step(epoch_acc)
                current_lr = optimizer.param_groups[0]['lr']
                print(f'Learning rate: {current_lr}')

                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())

        print()

    print(f'Best val Acc: {best_acc:.4f}')
    model.load_state_dict(best_model_wts)
    return model


# eval the model
def evaluate_model(model, test_loader, device='cuda'):
    """Print and return the accuracy of ``model`` over ``test_loader``.

    Args:
        model: Module mapping an input batch to per-class logits.
        test_loader: DataLoader yielding ``(inputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        Fraction of correctly classified samples, as a float.

    Raises:
        ValueError: If the loader's dataset is empty.
    """
    num_samples = _dataset_size(test_loader, 'test')

    model.eval()
    running_corrects = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            preds = model(inputs).argmax(dim=1)
            running_corrects += (preds == labels).sum().item()

    test_acc = running_corrects / num_samples
    print(f'Test Acc: {test_acc:.4f}')
    return test_acc


def main():
    """Train, evaluate, and save the classifier using the on-disk dataset."""
    # device
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # data
    data_dir = 'Processed_Data'
    train_dataset = datasets.ImageFolder(os.path.join(data_dir, 'train'),
                                         transform=train_transform)
    valid_dataset = datasets.ImageFolder(os.path.join(data_dir, 'valid'),
                                         transform=val_test_transform)
    test_dataset = datasets.ImageFolder(os.path.join(data_dir, 'test'),
                                        transform=val_test_transform)

    # dataloaders
    batch_size = 32
    train_loader = DataLoader(train_dataset, batch_size=batch_size,
                              shuffle=True, num_workers=4)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size,
                              shuffle=False, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=batch_size,
                             shuffle=False, num_workers=4)

    print(f"Number of training images: {len(train_dataset)}")
    print(f"Number of validation images: {len(valid_dataset)}")
    print(f"Number of test images: {len(test_dataset)}")

    # initialize model, loss, and optimizer
    num_classes = len(train_dataset.classes)
    model = ResNetLungCancer(num_classes)
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()

    # the backbone gets a 10x smaller learning rate than the new head
    pretrained_params = list(model.resnet.parameters())
    new_params = list(model.fc.parameters())
    optimizer = optim.Adam([
        {'params': pretrained_params, 'lr': 1e-5},
        {'params': new_params, 'lr': 1e-4},
    ], weight_decay=1e-6)

    # mode='max' because train_model steps the scheduler with accuracy
    scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=7)

    # train the model
    trained_model = train_model(model, train_loader, valid_loader, criterion,
                                optimizer, scheduler, num_epochs=50,
                                device=device)

    # eval the model
    evaluate_model(trained_model, test_loader, device=device)

    # save the model weights
    torch.save(trained_model.state_dict(), 'lung_cancer_detection_model.pth')

    # save the model in ONNX format
    dummy_input = torch.randn(1, 3, 224, 224).to(device)
    torch.onnx.export(trained_model, dummy_input,
                      "lung_cancer_detection_model.onnx",
                      input_names=['input'], output_names=['output'])

    print("Training completed. Model saved.")


if __name__ == "__main__":
    main()