import math

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.checkpoint import checkpoint


class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension with a learned gain.

    Args:
        dim: Size of the last dimension of the inputs (length of the gain vector).
        eps: Constant added to the mean square before the square root.
    """

    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        """Return ``weight * x / sqrt(mean(x**2, dim=-1) + eps)``."""
        mean_square = torch.mean(x ** 2, dim=-1, keepdim=True)
        normalized_x = x / torch.sqrt(mean_square + self.eps)
        return self.weight * normalized_x


class RotaryPositionalEmbedding(nn.Module):
    """Add a fixed sine/cosine position table to the input.

    NOTE: despite the class name, ``forward`` does not rotate anything; it
    *adds* an interleaved ``[sin, cos, sin, cos, ...]`` table to ``x``.

    Args:
        dim: Feature width; ``dim // 2`` frequencies are generated, so the
            table has ``2 * (dim // 2)`` columns.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        """Return ``x`` plus the position table.

        ``x.size(1)`` is used as the number of positions, and the table is
        broadcast over the leading dimension; the last dimension of ``x`` must
        therefore be broadcast-compatible with ``2 * (dim // 2)``.
        """
        seq_len = x.size(1)
        half = self.dim // 2

        # Build everything directly on the target device (no CPU -> device copy).
        freqs = torch.arange(0, half, dtype=torch.float32, device=x.device)
        inv_freq = 1.0 / (10000 ** (freqs / half))
        t = torch.arange(seq_len, dtype=torch.float32, device=x.device)
        sinusoid_inp = torch.outer(t, inv_freq)

        # (seq_len, half, 2) -> (seq_len, 2 * half): sin/cos interleaved per
        # frequency. flatten(1) is also well defined for an empty sequence,
        # where view(seq_len, -1) is ambiguous.
        table = torch.stack((sinusoid_inp.sin(), sinusoid_inp.cos()), dim=-1).flatten(1)
        return x + table.unsqueeze(0)


def apply_rotary_emb(xq, xk, freqs_cis):
    """Multiply query/key feature pairs by ``freqs_cis`` in the complex domain.

    The last dimension of ``xq`` / ``xk`` is grouped into consecutive pairs,
    each pair is viewed as one complex number, multiplied element-wise by
    ``freqs_cis`` (broadcast via :func:`reshape_for_broadcast`), and converted
    back to real pairs.

    Args:
        xq: Query tensor; dimension 1 is matched against ``freqs_cis`` rows and
            the last dimension must be even.
        xk: Key tensor with the same layout as ``xq``.
        freqs_cis: Tensor of shape ``(xq.shape[1], xq.shape[-1] // 2)``;
            presumably complex rotation factors (not checked here).

    Returns:
        Tuple ``(xq_out, xk_out)`` with the shapes and dtypes of the inputs.
    """
    xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
    xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))
    freqs_cis = reshape_for_broadcast(freqs_cis, xq_)
    # flatten(-2) merges the trailing (pairs, 2) axes; for 4-D inputs this is
    # exactly the previous flatten(3), but it also works for other ranks.
    xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(-2)
    xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(-2)
    return xq_out.type_as(xq), xk_out.type_as(xk)


def reshape_for_broadcast(freqs_cis, x):
    """View ``freqs_cis`` so it broadcasts against ``x`` on dims 1 and -1.

    Args:
        freqs_cis: Tensor of shape ``(x.shape[1], x.shape[-1])``.
        x: Tensor with at least two dimensions.

    Returns:
        ``freqs_cis`` viewed with size 1 on every dimension except 1 and -1.

    Raises:
        AssertionError: If ``x`` has fewer than two dimensions or the shape of
            ``freqs_cis`` does not match.
    """
    ndim = x.ndim
    assert 0 <= 1 < ndim
    assert freqs_cis.shape == (x.shape[1], x.shape[-1])
    shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]
    return freqs_cis.view(*shape)


class SwiGLU(nn.Module):
    """Feed-forward layer: expand, self-gate with SiLU, dropout, project back.

    NOTE: the gate and the gated value share a single projection, i.e. the
    activation is ``silu(h) * h`` with ``h = fc1(x)``; there is no separate
    gate projection.

    Args:
        embed_size: Input and output feature width.
        expansion_factor: Hidden width is ``expansion_factor * embed_size``.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(self, embed_size, expansion_factor=4, dropout=0.1):
        super().__init__()
        self.fc1 = nn.Linear(embed_size, expansion_factor * embed_size)
        self.fc2 = nn.Linear(expansion_factor * embed_size, embed_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply the feed-forward transform to the last dimension of ``x``."""
        hidden = self.fc1(x)
        hidden = F.silu(hidden) * hidden
        hidden = self.dropout(hidden)
        return self.fc2(hidden)


class SelfAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        embed_size: Model width; must be divisible by ``heads``.
        heads: Number of attention heads.

    Raises:
        AssertionError: If ``embed_size`` is not divisible by ``heads``.
    """

    def __init__(self, embed_size, heads):
        super().__init__()
        assert embed_size % heads == 0, "Embed size must be divisible by heads"
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        self.values = nn.Linear(embed_size, embed_size, bias=False)
        self.keys = nn.Linear(embed_size, embed_size, bias=False)
        self.queries = nn.Linear(embed_size, embed_size, bias=False)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, values, keys, queries, mask=None):
        """Attend from ``queries`` over ``keys`` / ``values``.

        Args:
            values: Tensor of shape ``(batch, value_len, embed_size)``.
            keys: Tensor of shape ``(batch, key_len, embed_size)``.
            queries: Tensor of shape ``(batch, query_len, embed_size)``.
            mask: Optional tensor broadcastable to
                ``(batch, heads, query_len, key_len)``; positions where it
                equals 0 are excluded from attention.

        Returns:
            Tensor of shape ``(batch, query_len, embed_size)``.
        """
        batch, query_len = queries.shape[0], queries.shape[1]
        key_len, value_len = keys.shape[1], values.shape[1]

        # Keep the (batch, seq, heads, head_dim) layout: the einsum subscripts
        # below name the axes in exactly this order. (Transposing to
        # (batch, heads, seq, head_dim) first would make the same subscripts
        # contract over heads instead of over sequence positions.)
        v = self.values(values).view(batch, value_len, self.heads, self.head_dim)
        k = self.keys(keys).view(batch, key_len, self.heads, self.head_dim)
        q = self.queries(queries).view(batch, query_len, self.heads, self.head_dim)

        # energy[b, h, t, s] = <q[b, t, h, :], k[b, s, h, :]>
        energy = torch.einsum("bthd,bshd->bhts", q, k)
        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-inf"))

        attention = torch.softmax(energy / math.sqrt(self.head_dim), dim=-1)

        # Weighted sum over key positions, heads concatenated back to embed_size.
        out = torch.einsum("bhts,bshd->bthd", attention, v)
        return self.fc_out(out.reshape(batch, query_len, self.embed_size))


class TransformerBlock(nn.Module):
    """Position table + self-attention + feed-forward, each followed by RMSNorm.

    Args:
        embed_size: Model width.
        heads: Number of attention heads.
        expansion_factor: Hidden-width multiplier of the feed-forward layer.
        dropout: Dropout probability used inside the feed-forward layer.
        checkpoint: If true, run the block under activation checkpointing so
            intermediate activations are recomputed during backward.
    """

    def __init__(self, embed_size, heads, expansion_factor=4, dropout=0.1, checkpoint=False):
        super().__init__()
        self.attention = SelfAttention(embed_size, heads)
        self.feed_forward = SwiGLU(embed_size, expansion_factor, dropout)
        self.norm1 = RMSNorm(embed_size)
        self.norm2 = RMSNorm(embed_size)
        self.rotary_pos_emb = RotaryPositionalEmbedding(embed_size)
        # Boolean flag; inside ``forward`` the bare name ``checkpoint`` still
        # refers to torch.utils.checkpoint.checkpoint imported at module level.
        self.checkpoint = checkpoint

    def _forward_impl(self, value, mask):
        """Run the block once (the unit that gets checkpointed)."""
        value = self.rotary_pos_emb(value)
        attended = self.attention(value, value, value, mask)
        x = self.norm1(attended + value)
        fed = self.feed_forward(x)
        return self.norm2(fed + x)

    def forward(self, value, mask=None):
        """Apply the block to ``value`` of shape ``(batch, seq, embed_size)``.

        Args:
            value: Input activations.
            mask: Optional attention mask forwarded to ``SelfAttention``.

        Returns:
            Tensor with the same shape as ``value``.
        """
        if self.checkpoint:
            # Pass use_reentrant explicitly (recent PyTorch warns otherwise);
            # the non-reentrant variant also works when inputs do not require
            # grad.
            return checkpoint(self._forward_impl, value, mask, use_reentrant=False)
        return self._forward_impl(value, mask)
class GPT(nn.Module):
    """Stack of ``TransformerBlock`` layers with token and learned position embeddings.

    Args:
        vocab_size: Number of token ids; also the width of the output logits.
        embed_size: Model width.
        num_layers: Number of ``TransformerBlock`` layers.
        heads: Attention heads per layer.
        max_length: Number of rows in the learned position-embedding table,
            i.e. the longest sequence ``forward`` accepts.
        expansion_factor: Passed through to every ``TransformerBlock``.
        dropout: Passed through to every ``TransformerBlock``.
        checkpoint: Passed through to every ``TransformerBlock``.
    """

    def __init__(self, vocab_size, embed_size, num_layers, heads, max_length,
                 expansion_factor=4, dropout=0.1, checkpoint=False):
        super().__init__()
        self.word_embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)
        self.src_vocab_size = vocab_size
        self.layers = nn.ModuleList(
            [
                TransformerBlock(embed_size, heads, expansion_factor, dropout, checkpoint)
                for _ in range(num_layers)
            ]
        )
        self.norm = RMSNorm(embed_size)
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, x, mask=None):
        """Map token ids to per-position vocabulary logits.

        Args:
            x: Integer tensor of token ids, shape ``(batch, seq_len)``.
            mask: Optional attention mask handed unchanged to every layer.

        Returns:
            Tensor of shape ``(batch, seq_len, vocab_size)``.

        Raises:
            IndexError: If ``seq_len`` exceeds the position-embedding table.
        """
        seq_len = x.size(1)
        max_length = self.position_embedding.num_embeddings
        if seq_len > max_length:
            # Fail early with a readable message; on CUDA an out-of-range
            # embedding index otherwise surfaces as a device-side assert.
            raise IndexError(
                f"sequence length {seq_len} exceeds max_length {max_length}"
            )

        # Create the index tensor on the input's device directly.
        positions = torch.arange(seq_len, device=x.device).unsqueeze(0)
        x = self.word_embedding(x) + self.position_embedding(positions)
        for layer in self.layers:
            x = layer(x, mask)
        x = self.norm(x)
        return self.fc_out(x)


# Smoke test. NOTE: this runs at import time, as in the original script.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = GPT(vocab_size=10000, embed_size=768, num_layers=20, heads=16, max_length=512, checkpoint=True)
model.to(device)
inputs = torch.randint(0, 10000, (1, 100), device=device)
outputs = model(inputs)
print(outputs.shape)  # Expected: torch.Size([1, 100, 10000])