# Import the required libraries
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from huggingface_hub import hf_hub_download
# ---------------------------------------------------------------------------
# Runtime setup: compute device, Hugging Face identifiers, output directory,
# and the CIFAR-10 test split wrapped in a DataLoader.
# ---------------------------------------------------------------------------

# Use CUDA when it is available, otherwise fall back to the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)
print(f'Using device: {device}')

# Hub account and repository name; joined later into "<username>/<model_name_best>".
username = "Vijayendra"
model_name_best = "QST-CIFAR10-BestModel"

# Directory that receives artefacts written by this script (created if missing).
save_dir = './hf_models'
os.makedirs(save_dir, exist_ok=True)

# Per-channel mean and standard deviation handed to Normalize.
_NORM_MEAN = (0.4914, 0.4822, 0.4465)
_NORM_STD = (0.2470, 0.2435, 0.2616)

# Convert images to tensors, then normalise each channel.
_to_tensor = transforms.ToTensor()
_normalize = transforms.Normalize(_NORM_MEAN, _NORM_STD)
transform = transforms.Compose([_to_tensor, _normalize])

# Test split only (train=False); the data is downloaded into ./data on demand.
cifar10_test = datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)

# shuffle=False keeps a fixed sample order during evaluation.
_loader_options = {"batch_size": 128, "shuffle": False, "num_workers": 4}
test_loader = DataLoader(cifar10_test, **_loader_options)
class PatchEmbedding(nn.Module):
    """Turn an image batch into a sequence of patch embeddings.

    The input first passes through two 3x3 convolution / batch-norm / ReLU
    stages (stride 1, padding 1, so the spatial size is kept), and is then
    cut into non-overlapping patches by a convolution whose kernel size and
    stride both equal ``patch_size``.

    Args:
        img_size: Side length used to compute ``num_patches``.
        patch_size: Side length of each square patch.
        in_channels: Number of channels of the input images.
        embed_dim: Channel width of the produced embeddings.
    """

    def __init__(self, img_size=32, patch_size=4, in_channels=3, embed_dim=256):
        super().__init__()
        half_dim = embed_dim // 2
        patches_per_side = img_size // patch_size

        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = patches_per_side ** 2
        self.embed_dim = embed_dim

        # Convolutional stem. The layer order fixes the Sequential indices
        # (0..5) that appear in the state-dict keys.
        stem_layers = [
            nn.Conv2d(in_channels, half_dim, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(half_dim),
            nn.ReLU(),
            nn.Conv2d(half_dim, embed_dim, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(embed_dim),
            nn.ReLU(),
        ]
        self.conv_layers = nn.Sequential(*stem_layers)

        # kernel_size == stride, so every output position covers one patch.
        self.proj = nn.Conv2d(
            embed_dim,
            embed_dim,
            kernel_size=patch_size,
            stride=patch_size,
        )

    def forward(self, x):
        """Embed ``x`` and return the patches as a sequence.

        For a batched 4-D image input the result has the patch axis at
        dim 1 and the embedding axis at dim 2.
        """
        features = self.conv_layers(x)
        patch_grid = self.proj(features)
        # Merge the two spatial axes into one, then move it ahead of the
        # channel axis.
        return patch_grid.flatten(2).transpose(1, 2)
class SequentialAttentionBlock(nn.Module):
    """Causally masked self-attention followed by a residual LayerNorm.

    The attention layer is created with ``nn.MultiheadAttention``'s default
    layout (``batch_first=False``), so dim 0 of the input is treated as the
    sequence axis.

    Args:
        embed_dim: Embedding width of the input and output.
        num_heads: Number of attention heads.
        dropout: Probability used for both the attention-weight dropout and
            the dropout applied to the block output.
    """

    def __init__(self, embed_dim, num_heads, dropout=0.1):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout)
        self.norm = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply masked self-attention to ``x`` and return the normalised sum.

        Each position may only attend to itself and to earlier positions
        along dim 0.
        """
        seq_length = x.size(0)
        # Strictly upper-triangular boolean mask: True marks (query, key)
        # pairs with key index > query index, which attention must ignore.
        # The mask is allocated on x's device directly, which avoids a CPU
        # allocation and a host-to-device copy on every forward call.
        attn_mask = torch.triu(
            torch.ones(seq_length, seq_length, device=x.device),
            diagonal=1,
        ).bool()
        attn_output, _ = self.attention(x, x, x, attn_mask=attn_mask)
        x = self.norm(x + attn_output)
        # Dropout is applied after the normalisation, as in the original.
        return self.dropout(x)
class CyclicAttentionBlockCRF(nn.Module):
    """Self-attention block with an extra, cyclically shifted residual term.

    After the usual attention + residual + LayerNorm step, the raw attention
    output is passed through a bias-free linear map, rolled by one position
    along dim 0, added to the result and normalised again. The same
    ``LayerNorm`` instance is used for both normalisation steps.

    Args:
        embed_dim: Embedding width of the input and output.
        num_heads: Number of attention heads.
        dropout: Probability used for both the attention-weight dropout and
            the dropout applied to the block output.
    """

    def __init__(self, embed_dim, num_heads, dropout=0.1):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout)
        self.norm = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)
        self.cyclic_operator = nn.Linear(embed_dim, embed_dim, bias=False)

    def cyclic_alignment(self, attn_output):
        """Project ``attn_output`` linearly and roll it by one step on dim 0."""
        projected = self.cyclic_operator(attn_output)
        return torch.roll(projected, shifts=1, dims=0)

    def forward(self, x):
        """Return ``x`` after attention, the cyclic residual and dropout."""
        attended, _ = self.attention(x, x, x)
        normalised = self.norm(x + attended)
        # The cyclic term is derived from the raw attention output, not from
        # the normalised residual.
        shifted = self.cyclic_alignment(attended)
        return self.dropout(self.norm(normalised + shifted))
class CombinedTransformerBlock(nn.Module):
    """Encoder block that chains several attention variants.

    Processing order in ``forward``:

    1. self-attention + residual + LayerNorm (``initial_attention``/``norm0``)
    2. self-attention + residual + LayerNorm (``attention1``/``norm1``)
    3. dropout with probability ``dropconnect_p``
    4. ``CyclicAttentionBlockCRF``
    5. ``SequentialAttentionBlock``
    6. self-attention + residual + LayerNorm (``attention2``/``norm2``)
    7. feed-forward network + dropout + residual + LayerNorm (``norm3``)

    Args:
        embed_dim: Embedding width of the input and output.
        num_heads: Number of attention heads in every attention layer.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Probability passed to the cyclic and sequential sub-blocks
            and used for the feed-forward dropout.
        dropconnect_p: Probability used as the attention-weight dropout of
            the three plain attention layers and for the ``nn.Dropout``
            stored as ``dropconnect``.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1, dropconnect_p=0.5):
        super().__init__()

        def build_attention():
            # All three plain attention layers share one configuration.
            return nn.MultiheadAttention(embed_dim, num_heads, dropout=dropconnect_p)

        # Modules are created in the original order so that parameter
        # initialisation consumes random numbers in the same sequence.
        self.initial_attention = build_attention()
        self.norm0 = nn.LayerNorm(embed_dim)
        self.attention1 = build_attention()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.dropconnect = nn.Dropout(dropconnect_p)
        self.cyclic_attention = CyclicAttentionBlockCRF(embed_dim, num_heads, dropout)
        self.sequential_attention = SequentialAttentionBlock(embed_dim, num_heads, dropout)
        self.attention2 = build_attention()
        self.norm2 = nn.LayerNorm(embed_dim)
        self.ff = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
        )
        self.norm3 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def _attend(attention, norm, x):
        """Self-attend over ``x``, add the result back to ``x`` and normalise."""
        attended, _ = attention(x, x, x)
        return norm(x + attended)

    def forward(self, x):
        """Run ``x`` through every stage of the block and return the result."""
        x = self._attend(self.initial_attention, self.norm0, x)
        x = self._attend(self.attention1, self.norm1, x)
        x = self.dropconnect(x)
        x = self.sequential_attention(self.cyclic_attention(x))
        x = self._attend(self.attention2, self.norm2, x)
        # Feed-forward sub-layer with dropout on the branch only.
        return self.norm3(x + self.dropout(self.ff(x)))
class DecoderBlock(nn.Module):
    """Decoder block: cross-attention, cyclic attention, then feed-forward.

    Args:
        embed_dim: Embedding width of the input and output.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Probability used for the attention-weight dropout, the
            cyclic sub-block and the feed-forward dropout.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.cyclic_attention = CyclicAttentionBlockCRF(embed_dim, num_heads, dropout)
        feed_forward_layers = (
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
        )
        self.ff = nn.Sequential(*feed_forward_layers)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_output):
        """Decode ``x`` against ``encoder_output``.

        ``x`` supplies the attention queries; ``encoder_output`` supplies
        both the keys and the values.
        """
        cross_attended, _ = self.attention(x, encoder_output, encoder_output)
        hidden = self.norm1(x + cross_attended)
        hidden = self.cyclic_attention(hidden)
        # Feed-forward sub-layer with dropout on the branch only.
        ff_branch = self.dropout(self.ff(hidden))
        return self.norm2(hidden + ff_branch)
class CustomTransformer(nn.Module):
    """Image classifier built from patch embeddings and encoder/decoder stacks.

    Args:
        embed_dim: Embedding width used throughout the model.
        num_heads: Number of attention heads in every attention layer.
        ff_dim: Hidden width of the feed-forward networks.
        num_classes: Number of output logits.
        num_layers: Number of encoder blocks and, separately, of decoder
            blocks.
        dropconnect_p: Forwarded to every ``CombinedTransformerBlock``.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, num_classes, num_layers=6, dropconnect_p=0.5):
        super().__init__()
        self.embed_dim = embed_dim
        self.patch_embedding = PatchEmbedding(embed_dim=embed_dim)
        # Take the sequence length from the embedding module instead of
        # repeating its (img_size // patch_size) ** 2 computation, so the
        # positional table cannot drift out of sync with the patch grid.
        # With PatchEmbedding's defaults this is still (32 // 4) ** 2 == 64.
        self.num_patches = self.patch_embedding.num_patches

        # Learned positional table; the leading 1 broadcasts over the batch.
        self.positional_encoding = nn.Parameter(torch.zeros(1, self.num_patches, embed_dim))
        nn.init.trunc_normal_(self.positional_encoding, std=0.02)

        self.encoder_blocks = nn.ModuleList([
            CombinedTransformerBlock(embed_dim, num_heads, ff_dim, dropconnect_p=dropconnect_p)
            for _ in range(num_layers)
        ])
        self.decoder_blocks = nn.ModuleList([
            DecoderBlock(embed_dim, num_heads, ff_dim)
            for _ in range(num_layers)
        ])
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        """Return class logits for the image batch ``x``."""
        # Out-of-place add: the patch-embedding output is not mutated.
        x = self.patch_embedding(x) + self.positional_encoding
        # Swap the first two axes so the patch axis comes first, which is
        # the layout the attention layers are configured for.
        x = x.transpose(0, 1)

        encoder_output = x
        for encoder in self.encoder_blocks:
            encoder_output = encoder(encoder_output)

        # The decoder stack starts from the final encoder output, and every
        # decoder block attends to that same final encoder output.
        decoder_output = encoder_output
        for decoder in self.decoder_blocks:
            decoder_output = decoder(decoder_output, encoder_output)

        # Average over dim 0 (the patch axis after the transpose) before
        # the classification head.
        pooled = decoder_output.mean(dim=0)
        return self.fc(pooled)
# ---------------------------------------------------------------------------
# Evaluate the downloaded checkpoint on the CIFAR-10 test split.
# ---------------------------------------------------------------------------

# Architecture hyperparameters. load_state_dict below is strict by default,
# so these must match the architecture the checkpoint was saved from.
embed_dim = 512
num_heads = 32
ff_dim = 1024
num_classes = 10
num_layers = 10

model_best = CustomTransformer(embed_dim, num_heads, ff_dim, num_classes, num_layers=num_layers).to(device)

# Fetch the weights file from the Hugging Face Hub; returns a local path.
model_best_path = hf_hub_download(repo_id=f"{username}/{model_name_best}", filename="model_best.pth")

# NOTE(review): torch.load unpickles the file, and this file comes from a
# remote repository. If the installed PyTorch supports it and the checkpoint
# is a plain state dict, consider passing weights_only=True here.
model_best.load_state_dict(torch.load(model_best_path, map_location=device))
model_best.eval()

# Collect one array per batch and concatenate once at the end, rather than
# extending Python lists row by row and having NumPy rebuild the array.
_label_batches = []
_prob_batches = []
with torch.no_grad():
    for images_test, labels_test in test_loader:
        images_test = images_test.to(device)
        logits_best = model_best(images_test)
        probs_best = F.softmax(logits_best, dim=1).cpu().numpy()
        _prob_batches.append(probs_best)
        _label_batches.append(labels_test.numpy())

# One row of class probabilities per test sample, and one label per sample.
test_preds_best = np.concatenate(_prob_batches, axis=0)
test_labels = np.concatenate(_label_batches, axis=0)
test_preds_best_labels = np.argmax(test_preds_best, axis=1)

test_accuracy = accuracy_score(test_labels, test_preds_best_labels)
print(f'Test Accuracy of Best Model: {test_accuracy * 100:.2f}%')

# Confusion matrix labelled with the dataset's class names.
cm = confusion_matrix(test_labels, test_preds_best_labels)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=cifar10_test.classes)
disp.plot(cmap=plt.cm.Blues)
plt.xticks(rotation=45, ha='right')
plt.title('Confusion Matrix for Best Model on CIFAR-10 Test Set')
plt.savefig(os.path.join(save_dir, 'best_model_confusion_matrix.png'))
plt.show()