"""Module to define the model.""" # Resources # https://lightning.ai/docs/pytorch/stable/starter/introduction.html # https://lightning.ai/docs/pytorch/stable/starter/converting.html # https://lightning.ai/docs/pytorch/stable/notebooks/lightning_examples/cifar10-baseline.html import modules.config as config import pytorch_lightning as pl import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim import torchinfo from torch.optim.lr_scheduler import OneCycleLR from torch_lr_finder import LRFinder from torchmetrics import Accuracy # What is the start LR and weight decay you'd prefer? PREFERRED_START_LR = config.PREFERRED_START_LR PREFERRED_WEIGHT_DECAY = config.PREFERRED_WEIGHT_DECAY def detailed_model_summary(model, input_size): """Define a function to print the model summary.""" # https://github.com/TylerYep/torchinfo torchinfo.summary( model, input_size=input_size, batch_dim=0, col_names=( "input_size", "kernel_size", "output_size", "num_params", "trainable", ), verbose=1, col_width=16, ) ############# Assignment 13 Model ############# # This is for Assignment 13 # Model used from Assignment 11 and converted to lightning model class CustomResNet(pl.LightningModule): """This defines the structure of the NN.""" # Class variable to print shape print_shape = False # Default dropout value dropout_value = 0.02 def __init__(self): super().__init__() # Define loss function # https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html self.loss_function = torch.nn.CrossEntropyLoss() # Define accuracy function # https://torchmetrics.readthedocs.io/en/stable/classification/accuracy.html self.accuracy_function = Accuracy(task="multiclass", num_classes=10) # Add results dictionary self.results = { "train_loss": [], "train_acc": [], "test_loss": [], "test_acc": [], "val_loss": [], "val_acc": [], } # Save misclassified images self.misclassified_image_data = {"images": [], "ground_truths": [], "predicted_vals": []} # LR self.learning_rate = PREFERRED_START_LR # Model Notes # PrepLayer - Conv 3x3 s1, p1) >> BN >> RELU [64k] # 1. Input size: 32x32x3 self.prep = nn.Sequential( nn.Conv2d( in_channels=3, out_channels=64, kernel_size=(3, 3), stride=1, padding=1, dilation=1, bias=False, ), nn.BatchNorm2d(64), nn.ReLU(), nn.Dropout(self.dropout_value), ) # Layer1: X = Conv 3x3 (s1, p1) >> MaxPool2D >> BN >> RELU [128k] self.layer1_x = nn.Sequential( nn.Conv2d( in_channels=64, out_channels=128, kernel_size=(3, 3), stride=1, padding=1, dilation=1, bias=False, ), nn.MaxPool2d(kernel_size=2, stride=2), nn.BatchNorm2d(128), nn.ReLU(), nn.Dropout(self.dropout_value), ) # Layer1: R1 = ResBlock( (Conv-BN-ReLU-Conv-BN-ReLU))(X) [128k] self.layer1_r1 = nn.Sequential( nn.Conv2d( in_channels=128, out_channels=128, kernel_size=(3, 3), stride=1, padding=1, dilation=1, bias=False, ), nn.BatchNorm2d(128), nn.ReLU(), nn.Dropout(self.dropout_value), nn.Conv2d( in_channels=128, out_channels=128, kernel_size=(3, 3), stride=1, padding=1, dilation=1, bias=False, ), nn.BatchNorm2d(128), nn.ReLU(), nn.Dropout(self.dropout_value), ) # Layer 2: Conv 3x3 [256k], MaxPooling2D, BN, ReLU self.layer2 = nn.Sequential( nn.Conv2d( in_channels=128, out_channels=256, kernel_size=(3, 3), stride=1, padding=1, dilation=1, bias=False, ), nn.MaxPool2d(kernel_size=2, stride=2), nn.BatchNorm2d(256), nn.ReLU(), nn.Dropout(self.dropout_value), ) # Layer 3: X = Conv 3x3 (s1, p1) >> MaxPool2D >> BN >> RELU [512k] self.layer3_x = nn.Sequential( nn.Conv2d( in_channels=256, out_channels=512, kernel_size=(3, 3), stride=1, padding=1, dilation=1, bias=False, ), nn.MaxPool2d(kernel_size=2, stride=2), nn.BatchNorm2d(512), nn.ReLU(), nn.Dropout(self.dropout_value), ) # Layer 3: R2 = ResBlock( (Conv-BN-ReLU-Conv-BN-ReLU))(X) [512k] self.layer3_r2 = nn.Sequential( nn.Conv2d( in_channels=512, out_channels=512, kernel_size=(3, 3), stride=1, padding=1, dilation=1, bias=False, ), nn.BatchNorm2d(512), nn.ReLU(), nn.Dropout(self.dropout_value), nn.Conv2d( in_channels=512, out_channels=512, kernel_size=(3, 3), stride=1, padding=1, dilation=1, bias=False, ), nn.BatchNorm2d(512), nn.ReLU(), nn.Dropout(self.dropout_value), ) # MaxPooling with Kernel Size 4 # If stride is None, it is set to kernel_size self.maxpool = nn.MaxPool2d(kernel_size=4, stride=4) # FC Layer self.fc = nn.Linear(512, 10) # Save hyperparameters self.save_hyperparameters() def print_view(self, x, msg=""): """Print shape of the model""" if self.print_shape: if msg != "": print(msg, "\n\t", x.shape, "\n") else: print(x.shape) def forward(self, x): """Forward pass""" # PrepLayer x = self.prep(x) self.print_view(x, "PrepLayer") # Layer 1 x = self.layer1_x(x) self.print_view(x, "Layer 1, X") r1 = self.layer1_r1(x) self.print_view(r1, "Layer 1, R1") x = x + r1 self.print_view(x, "Layer 1, X + R1") # Layer 2 x = self.layer2(x) self.print_view(x, "Layer 2") # Layer 3 x = self.layer3_x(x) self.print_view(x, "Layer 3, X") r2 = self.layer3_r2(x) self.print_view(r2, "Layer 3, R2") x = x + r2 self.print_view(x, "Layer 3, X + R2") # MaxPooling x = self.maxpool(x) self.print_view(x, "Max Pooling") # FC Layer # Reshape before FC such that it becomes 1D x = x.view(x.shape[0], -1) self.print_view(x, "Reshape before FC") x = self.fc(x) self.print_view(x, "After FC") # Softmax return F.log_softmax(x, dim=-1) # Alert: Remove this function later as Tuner is now being used to automatically find the best LR def find_optimal_lr(self, train_loader): """Use LR Finder to find the best starting learning rate""" # https://github.com/davidtvs/pytorch-lr-finder # https://github.com/davidtvs/pytorch-lr-finder#notes # https://github.com/davidtvs/pytorch-lr-finder/blob/master/torch_lr_finder/lr_finder.py # New optimizer with default LR tmp_optimizer = optim.Adam(self.parameters(), lr=PREFERRED_START_LR, weight_decay=PREFERRED_WEIGHT_DECAY) # Create LR finder object lr_finder = LRFinder(self, optimizer=tmp_optimizer, criterion=self.loss_function) lr_finder.range_test(train_loader=train_loader, end_lr=10, num_iter=100) # https://github.com/davidtvs/pytorch-lr-finder/issues/88 _, suggested_lr = lr_finder.plot(suggest_lr=True) lr_finder.reset() # plot.figure.savefig("LRFinder - Suggested Max LR.png") print(f"Suggested Max LR: {suggested_lr}") if suggested_lr is None: suggested_lr = PREFERRED_START_LR return suggested_lr # optimiser function def configure_optimizers(self): """Add ADAM optimizer to the lightning module""" optimizer = optim.Adam(self.parameters(), lr=self.learning_rate, weight_decay=PREFERRED_WEIGHT_DECAY) # Percent start for OneCycleLR # Handles the case where max_epochs is less than 5 percent_start = 5 / int(self.trainer.max_epochs) if percent_start >= 1: percent_start = 0.3 # https://lightning.ai/docs/pytorch/stable/common/optimization.html#total-stepping-batches scheduler_dict = { "scheduler": OneCycleLR( optimizer=optimizer, max_lr=self.learning_rate, total_steps=int(self.trainer.estimated_stepping_batches), pct_start=percent_start, div_factor=100, three_phase=False, anneal_strategy="linear", final_div_factor=100, verbose=False, ), "interval": "step", } return {"optimizer": optimizer, "lr_scheduler": scheduler_dict} # Define loss function def compute_loss(self, prediction, target): """Compute Loss""" # Calculate loss loss = self.loss_function(prediction, target) return loss # Define accuracy function def compute_accuracy(self, prediction, target): """Compute accuracy""" # Calculate accuracy acc = self.accuracy_function(prediction, target) return acc * 100 # Function to compute loss and accuracy for both training and validation def compute_metrics(self, batch): """Function to calculate loss and accuracy""" # Get data and target from batch data, target = batch # Generate predictions using model pred = self(data) # Calculate loss for the batch loss = self.compute_loss(prediction=pred, target=target) # Calculate accuracy for the batch acc = self.compute_accuracy(prediction=pred, target=target) return loss, acc # Get misclassified images based on how many images to return def store_misclassified_images(self): """Get an array of misclassified images""" self.misclassified_image_data = {"images": [], "ground_truths": [], "predicted_vals": []} # Initialize the model to evaluation mode self.eval() # Disable gradient calculation while testing with torch.no_grad(): for batch in self.trainer.test_dataloaders: # Move data and labels to device data, target = batch data, target = data.to(self.device), target.to(self.device) # Predict using model pred = self(data) # Get the index of the max log-probability output = pred.argmax(dim=1) # Save the incorrect predictions incorrect_indices = ~output.eq(target) # Store images incorrectly predicted, generated predictions and the actual value self.misclassified_image_data["images"].extend(data[incorrect_indices]) self.misclassified_image_data["ground_truths"].extend(target[incorrect_indices]) self.misclassified_image_data["predicted_vals"].extend(output[incorrect_indices]) # training function def training_step(self, batch, batch_idx): """Training step""" # Compute loss and accuracy loss, acc = self.compute_metrics(batch) self.log("train_loss", loss, prog_bar=True, on_epoch=True, logger=True) self.log("train_acc", acc, prog_bar=True, on_epoch=True, logger=True) # Return training loss return loss # validation function def validation_step(self, batch, batch_idx): """Validation step""" # Compute loss and accuracy loss, acc = self.compute_metrics(batch) self.log("val_loss", loss, prog_bar=True, on_epoch=True, logger=True) self.log("val_acc", acc, prog_bar=True, on_epoch=True, logger=True) # Return validation loss return loss # test function will just use validation step def test_step(self, batch, batch_idx): """Test step""" # Compute loss and accuracy loss, acc = self.compute_metrics(batch) self.log("test_loss", loss, prog_bar=False, on_epoch=True, logger=True) self.log("test_acc", acc, prog_bar=False, on_epoch=True, logger=True) # Return validation loss return loss # At the end of train epoch append the training loss and accuracy to an instance variable called results def on_train_epoch_end(self): """On train epoch end""" # Append training loss and accuracy to results self.results["train_loss"].append(self.trainer.callback_metrics["train_loss"].detach().item()) self.results["train_acc"].append(self.trainer.callback_metrics["train_acc"].detach().item()) # At the end of validation epoch append the validation loss and accuracy to an instance variable called results def on_validation_epoch_end(self): """On validation epoch end""" # Append validation loss and accuracy to results self.results["test_loss"].append(self.trainer.callback_metrics["val_loss"].detach().item()) self.results["test_acc"].append(self.trainer.callback_metrics["val_acc"].detach().item()) # # At the end of test epoch append the test loss and accuracy to an instance variable called results # def on_test_epoch_end(self): # """On test epoch end""" # # Append test loss and accuracy to results # self.results["test_loss"].append(self.trainer.callback_metrics["test_loss"].detach().item()) # self.results["test_acc"].append(self.trainer.callback_metrics["test_acc"].detach().item()) # At the end of test save misclassified images, the predictions and ground truth in an instance variable called misclassified_image_data def on_test_end(self): """On test end""" print("Test ended! Saving misclassified images") # Get misclassified images self.store_misclassified_images()