|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
import math |
|
from torch.utils.checkpoint import checkpoint |
|
|
|
class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension with a learnable gain.

    Args:
        dim: Number of entries in the gain vector ``weight`` (initialised to ones).
        eps: Constant added to the mean square before taking the square root.
    """

    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))
        self.eps = eps

    def forward(self, x):
        """Divide ``x`` by the RMS of its last dimension and scale by ``weight``."""
        rms = torch.sqrt(x.pow(2).mean(dim=-1, keepdim=True) + self.eps)
        return self.weight * (x / rms)
|
|
|
class RotaryPositionalEmbedding(nn.Module):
    """Add a fixed sinusoidal position signal to the input.

    Despite the class name, no rotation is performed: ``forward`` builds an
    interleaved sin/cos table and *adds* it to ``x``.

    Args:
        dim: Feature width used to size the table. The table has
            ``2 * (dim // 2)`` columns, so ``dim`` should be even for the
            addition to broadcast against inputs whose last dimension is ``dim``.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        """Return ``x`` plus the position table for its sequence length.

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
                dimension matches the table width.

        Returns:
            Tensor with the same dtype as ``x``.
        """
        seq_len = x.size(1)
        half_dim = self.dim // 2

        # Build the table directly on x's device (no CPU allocation + copy),
        # in float32 so the trigonometry is accurate regardless of x's dtype.
        index = torch.arange(half_dim, dtype=torch.float32, device=x.device)
        inv_freq = 1.0 / (10000 ** (index / half_dim))
        steps = torch.arange(seq_len, dtype=torch.float32, device=x.device)
        angles = torch.outer(steps, inv_freq)

        # Interleave per position as [sin_0, cos_0, sin_1, cos_1, ...].
        table = torch.stack((angles.sin(), angles.cos()), dim=-1)
        table = table.view(seq_len, 2 * half_dim)

        # Cast to x's dtype before adding: otherwise half/bfloat16 activations
        # are promoted to float32 and no longer match the layers that follow.
        return x + table.to(dtype=x.dtype).unsqueeze(0)
|
|
|
def apply_rotary_emb(xq, xk, freqs_cis):
    """Rotate query and key features by complex multiplication with ``freqs_cis``.

    Adjacent pairs along the last dimension of ``xq`` and ``xk`` are viewed as
    complex numbers (computed in float32), multiplied by ``freqs_cis`` once it
    has been reshaped by ``reshape_for_broadcast``, and converted back to real
    pairs flattened from dimension 3 onwards.

    Args:
        xq: Query tensor with an even-sized last dimension.
        xk: Key tensor with an even-sized last dimension.
        freqs_cis: Tensor multiplied into the complex views; its shape must be
            ``(xq.shape[1], xq.shape[-1] // 2)`` as asserted by
            ``reshape_for_broadcast``.

    Returns:
        Tuple ``(xq_out, xk_out)`` cast back to the types of ``xq`` and ``xk``.
    """

    def _as_complex(tensor):
        # Pair up the last dimension: (..., d) -> (..., d // 2, 2) -> complex.
        pairs = tensor.float().reshape(*tensor.shape[:-1], -1, 2)
        return torch.view_as_complex(pairs)

    q_complex = _as_complex(xq)
    k_complex = _as_complex(xk)
    rotation = reshape_for_broadcast(freqs_cis, q_complex)

    q_rotated = torch.view_as_real(q_complex * rotation).flatten(3)
    k_rotated = torch.view_as_real(k_complex * rotation).flatten(3)
    return q_rotated.type_as(xq), k_rotated.type_as(xk)
|
|
|
def reshape_for_broadcast(freqs_cis, x): |
|
ndim = x.ndim |
|
assert 0 <= 1 < ndim |
|
assert freqs_cis.shape == (x.shape[1], x.shape[-1]) |
|
shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)] |
|
return freqs_cis.view(*shape) |
|
|
|
class SwiGLU(nn.Module):
    """Position-wise feed-forward block with a ``silu(h) * h`` activation.

    Note: this uses a single up-projection and multiplies its SiLU by itself;
    there is no separate gate projection as in the gated SwiGLU formulation.

    Args:
        embed_size: Input and output feature width.
        expansion_factor: Hidden width is ``expansion_factor * embed_size``.
        dropout: Dropout probability applied after the activation. Defaults to
            0.1, the value that used to be hard-coded.
    """

    def __init__(self, embed_size, expansion_factor=4, dropout=0.1):
        super().__init__()
        hidden_size = expansion_factor * embed_size
        self.fc1 = nn.Linear(embed_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, embed_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Project up, apply ``silu(h) * h`` and dropout, then project back down."""
        hidden = self.fc1(x)
        hidden = F.silu(hidden) * hidden
        return self.fc2(self.dropout(hidden))
|
|
|
class SelfAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        embed_size: Model width; must be divisible by ``heads``.
        heads: Number of attention heads.
    """

    def __init__(self, embed_size, heads):
        super().__init__()
        assert embed_size % heads == 0, "Embed size must be divisible by heads"

        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        self.values = nn.Linear(embed_size, embed_size, bias=False)
        self.keys = nn.Linear(embed_size, embed_size, bias=False)
        self.queries = nn.Linear(embed_size, embed_size, bias=False)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, values, keys, queries, mask=None):
        """Attend from ``queries`` to ``keys`` and aggregate ``values``.

        Args:
            values: Tensor of shape ``(batch, value_len, embed_size)``.
            keys: Tensor of shape ``(batch, key_len, embed_size)``.
            queries: Tensor of shape ``(batch, query_len, embed_size)``.
            mask: Optional tensor broadcastable to
                ``(batch, heads, query_len, key_len)``; entries equal to 0
                are excluded from attention.

        Returns:
            Tensor of shape ``(batch, query_len, embed_size)``.
        """
        batch = queries.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], queries.shape[1]

        # Project, then split into heads: (batch, heads, seq_len, head_dim).
        v = self.values(values).view(batch, value_len, self.heads, self.head_dim).transpose(1, 2)
        k = self.keys(keys).view(batch, key_len, self.heads, self.head_dim).transpose(1, 2)
        q = self.queries(queries).view(batch, query_len, self.heads, self.head_dim).transpose(1, 2)

        # The subscripts must follow the (batch, head, position, feature) layout
        # produced above so that scores compare positions within each head.
        # Labelling axis 1 as the position axis would instead mix heads at a
        # single position and never let tokens attend to each other.
        scores = torch.einsum("bhqd,bhkd->bhqk", q, k)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, float("-inf"))

        weights = torch.softmax(scores / (self.head_dim ** 0.5), dim=-1)

        # Weighted sum of values per head, then merge heads back into embed_size.
        context = torch.einsum("bhqk,bhkd->bhqd", weights, v)
        merged = context.transpose(1, 2).reshape(batch, query_len, self.embed_size)
        return self.fc_out(merged)
|
|
|
class TransformerBlock(nn.Module):
    """Attention + feed-forward block with post-residual RMS normalisation.

    Each call first adds the positional table from ``rotary_pos_emb`` to the
    input, then applies self-attention and the feed-forward network, each
    followed by a residual sum and an ``RMSNorm``.

    Args:
        embed_size: Model width.
        heads: Number of attention heads.
        expansion_factor: Hidden-width multiplier for the feed-forward network.
        dropout: Dropout probability used inside the feed-forward network.
        checkpoint: If true, run the block under activation checkpointing
            whenever autograd is enabled.
    """

    def __init__(self, embed_size, heads, expansion_factor=4, dropout=0.1, checkpoint=False):
        super().__init__()
        self.attention = SelfAttention(embed_size, heads)
        self.feed_forward = SwiGLU(embed_size, expansion_factor)
        # Honour the ``dropout`` argument, which was previously accepted but
        # never used: swap in a dropout layer with the requested probability.
        self.feed_forward.dropout = nn.Dropout(dropout)
        self.norm1 = RMSNorm(embed_size)
        self.norm2 = RMSNorm(embed_size)
        self.rotary_pos_emb = RotaryPositionalEmbedding(embed_size)
        # Boolean flag. The parameter name shadows the imported ``checkpoint``
        # function only inside ``__init__``; ``forward`` sees the function.
        self.checkpoint = checkpoint

    def _forward_impl(self, value, mask):
        """Run the block once without any checkpointing logic."""
        value = self.rotary_pos_emb(value)
        attended = self.attention(value, value, value, mask)
        x = self.norm1(attended + value)
        ff_out = self.feed_forward(x)
        return self.norm2(ff_out + x)

    def forward(self, value, mask=None):
        """Apply the block to ``value``.

        Args:
            value: Input activations passed to the positional table, attention
                and feed-forward sub-layers.
            mask: Optional attention mask forwarded to ``SelfAttention``.

        Returns:
            The block output after the second normalisation.
        """
        # Checkpointing only pays off when a backward pass can happen; under
        # ``torch.no_grad()`` it would add overhead for no memory benefit.
        if self.checkpoint and torch.is_grad_enabled():
            # Pass ``use_reentrant`` explicitly, as PyTorch asks callers to do;
            # the non-reentrant variant is the recommended one.
            return checkpoint(self._forward_impl, value, mask, use_reentrant=False)
        return self._forward_impl(value, mask)
|
|
|
class GPT(nn.Module):
    """Stack of ``TransformerBlock`` layers over token and learned position embeddings.

    Args:
        vocab_size: Number of token ids; also the width of the output logits.
        embed_size: Model width.
        num_layers: Number of ``TransformerBlock`` layers.
        heads: Number of attention heads per layer.
        max_length: Size of the learned position-embedding table, i.e. the
            longest sequence ``forward`` accepts.
        expansion_factor: Forwarded to every ``TransformerBlock``.
        dropout: Forwarded to every ``TransformerBlock``.
        checkpoint: Forwarded to every ``TransformerBlock``.
    """

    def __init__(self, vocab_size, embed_size, num_layers, heads, max_length, expansion_factor=4, dropout=0.1, checkpoint=False):
        super().__init__()
        self.word_embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)

        self.src_vocab_size = vocab_size

        self.layers = nn.ModuleList(
            [
                TransformerBlock(embed_size, heads, expansion_factor, dropout, checkpoint)
                for _ in range(num_layers)
            ]
        )
        self.norm = RMSNorm(embed_size)
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, x, mask=None):
        """Compute logits for a batch of token ids.

        Args:
            x: Integer tensor of token ids; dimension 1 is the sequence axis.
            mask: Optional attention mask passed unchanged to every layer.
                No mask (causal or otherwise) is built here when it is None.

        Returns:
            Tensor of logits whose last dimension is ``vocab_size``.

        Raises:
            IndexError: If the sequence is longer than the position table.
        """
        seq_len = x.size(1)
        max_length = self.position_embedding.num_embeddings
        # Fail early with a readable message instead of an opaque embedding
        # lookup error (on CUDA this would surface as a device-side assert).
        if seq_len > max_length:
            raise IndexError(
                f"sequence length {seq_len} exceeds max_length {max_length}"
            )

        # Create the indices directly on the input's device.
        positions = torch.arange(seq_len, device=x.device).unsqueeze(0)
        x = self.word_embedding(x) + self.position_embedding(positions)

        for layer in self.layers:
            x = layer(x, mask)

        return self.fc_out(self.norm(x))
|
|
|
# Smoke test: build the model and run a single forward pass on random token ids.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# ``Module.to`` returns the module itself, so construction and placement chain.
model = GPT(
    vocab_size=10000,
    embed_size=768,
    num_layers=20,
    heads=16,
    max_length=512,
    checkpoint=True,
).to(device)

inputs = torch.randint(low=0, high=10000, size=(1, 100), device=device)
outputs = model(inputs)
print(outputs.shape)
|
|