# SNT-700M / model.py
# saintyboy's picture
# Create model.py
# dc49c0d verified
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from torch.utils.checkpoint import checkpoint
class RMSNorm(nn.Module):
    """Root-mean-square normalisation with a learnable per-feature gain.

    Every vector along the last dimension is divided by
    ``sqrt(mean(x ** 2) + eps)`` and then multiplied element-wise by
    ``weight``.  No mean is subtracted and there is no bias term.

    Args:
        dim: Length of the ``weight`` vector (initialised to ones); it is
            multiplied against the last dimension of the input.
        eps: Constant added to the mean square before the square root so the
            division stays finite for all-zero vectors.
    """

    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))
        self.eps = eps

    def forward(self, x):
        """Return ``x`` normalised over its last dimension, scaled by ``weight``."""
        mean_sq = x.pow(2).mean(dim=-1, keepdim=True)
        rms = (mean_sq + self.eps).sqrt()
        return self.weight * (x / rms)
class RotaryPositionalEmbedding(nn.Module):
    """Add a fixed, interleaved sine/cosine position signal to the input.

    Despite its name this module does not rotate query/key pairs.  It builds
    a ``(seq_len, dim)`` table whose columns alternate ``sin`` and ``cos`` of
    ``position * inv_freq`` and *adds* that table to ``x``.  The module has no
    parameters; the table is recomputed on every call.

    Args:
        dim: Size of the last dimension of the tensors passed to ``forward``.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        """Return ``x`` plus the positional table, in ``x``'s own dtype.

        Args:
            x: Tensor indexed as ``(batch, seq_len, dim)``; the sequence
                length is read from ``x.size(1)``.

        Returns:
            Tensor with the same shape and dtype as ``x``.
        """
        seq_len = x.size(1)
        # Round up so an odd ``dim`` still yields at least ``dim`` columns
        # (the surplus column is sliced off below).  For even ``dim`` this is
        # exactly ``dim // 2``, so the table is unchanged in that case.
        num_pairs = (self.dim + 1) // 2
        # Build directly on the target device instead of CPU followed by a copy.
        pair_index = torch.arange(num_pairs, dtype=torch.float32, device=x.device)
        inv_freq = 1.0 / (10000 ** (pair_index / num_pairs))
        positions = torch.arange(seq_len, dtype=torch.float32, device=x.device)
        angles = torch.outer(positions, inv_freq)
        # (seq_len, num_pairs, 2) -> (seq_len, 2 * num_pairs): sin0, cos0, sin1, ...
        table = torch.stack((angles.sin(), angles.cos()), dim=-1).flatten(1)
        # Cast to x's dtype so half-precision activations are not silently
        # promoted to float32 by the addition.
        table = table[:, : self.dim].to(dtype=x.dtype)
        return x + table.unsqueeze(0)
def apply_rotary_emb(xq, xk, freqs_cis):
    """Multiply query and key features by complex position factors.

    The last dimension of ``xq`` and ``xk`` is read as consecutive
    (real, imaginary) pairs, multiplied by ``freqs_cis`` in the complex
    domain, and unpacked back into real pairs.

    Args:
        xq: Query tensor whose last dimension has even length.  The
            computation runs in float32 and the result is cast back to
            ``xq``'s type.
        xk: Key tensor, handled the same way as ``xq``.
        freqs_cis: Factors reshaped by ``reshape_for_broadcast`` so that they
            broadcast against the complex view of ``xq``.

    Returns:
        Tuple ``(xq_out, xk_out)``.  ``flatten(3)`` merges every dimension
        from index 3 onward, which restores the input layout when the inputs
        are 4-D.
    """

    def _as_complex(tensor):
        # Pair up the last dimension and reinterpret each pair as one complex number.
        paired = tensor.float().reshape(*tensor.shape[:-1], -1, 2)
        return torch.view_as_complex(paired)

    q_complex = _as_complex(xq)
    k_complex = _as_complex(xk)
    rotation = reshape_for_broadcast(freqs_cis, q_complex)
    rotated_q = torch.view_as_real(q_complex * rotation).flatten(3)
    rotated_k = torch.view_as_real(k_complex * rotation).flatten(3)
    return rotated_q.type_as(xq), rotated_k.type_as(xk)
def reshape_for_broadcast(freqs_cis, x):
ndim = x.ndim
assert 0 <= 1 < ndim
assert freqs_cis.shape == (x.shape[1], x.shape[-1])
shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]
return freqs_cis.view(*shape)
class SwiGLU(nn.Module):
    """Position-wise feed-forward block: expand, activate, drop out, project back.

    Note: the activation applied here is ``silu(h) * h`` on a *single*
    projection ``h = fc1(x)``; it is not the two-projection gated SwiGLU.
    The computation is kept as-is so the parameter names and shapes
    (``fc1``, ``fc2``) stay unchanged.

    Args:
        embed_size: Input and output feature size.
        expansion_factor: The hidden layer has
            ``expansion_factor * embed_size`` features.
        dropout: Dropout probability applied to the hidden activations.
            Defaults to ``0.1``, the value that was previously hard-coded.
    """

    def __init__(self, embed_size, expansion_factor=4, dropout=0.1):
        super().__init__()
        hidden_size = expansion_factor * embed_size
        self.fc1 = nn.Linear(embed_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, embed_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return ``fc2(dropout(silu(fc1(x)) * fc1(x)))``."""
        hidden = self.fc1(x)
        hidden = F.silu(hidden) * hidden
        hidden = self.dropout(hidden)
        return self.fc2(hidden)
class SelfAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        embed_size: Feature size of the inputs and of the output.
        heads: Number of attention heads; must divide ``embed_size``.

    Raises:
        AssertionError: If ``embed_size`` is not divisible by ``heads``.
    """

    def __init__(self, embed_size, heads):
        super().__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads
        assert embed_size % heads == 0, "Embed size must be divisible by heads"
        self.values = nn.Linear(embed_size, embed_size, bias=False)
        self.keys = nn.Linear(embed_size, embed_size, bias=False)
        self.queries = nn.Linear(embed_size, embed_size, bias=False)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def _split_heads(self, projected, length):
        """Reshape ``(batch, length, embed)`` to ``(batch, heads, length, head_dim)``."""
        batch = projected.shape[0]
        return projected.view(batch, length, self.heads, self.head_dim).transpose(1, 2)

    def forward(self, values, keys, queries, mask=None):
        """Attend from every query position to every key position, per head.

        Args:
            values: Tensor ``(batch, value_len, embed_size)``.
            keys: Tensor ``(batch, key_len, embed_size)``; ``key_len`` must
                equal ``value_len``.
            queries: Tensor ``(batch, query_len, embed_size)``.
            mask: Optional tensor broadcastable to
                ``(batch, heads, query_len, key_len)``.  Positions where
                ``mask == 0`` are excluded from attention.  A query row whose
                keys are all masked produces NaN after the softmax.

        Returns:
            Tensor ``(batch, query_len, embed_size)``.
        """
        batch = queries.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], queries.shape[1]

        v = self._split_heads(self.values(values), value_len)
        k = self._split_heads(self.keys(keys), key_len)
        q = self._split_heads(self.queries(queries), query_len)

        # Scores are taken between *positions* within each head:
        # (batch, heads, query_len, key_len).  The previous einsum subscripts
        # assumed an un-transposed layout and therefore scored heads against
        # heads at a single position, so tokens never exchanged information.
        energy = torch.matmul(q, k.transpose(-2, -1))
        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-inf"))
        attention = torch.softmax(energy / (self.head_dim ** 0.5), dim=-1)

        # (batch, heads, query_len, head_dim) -> (batch, query_len, embed_size)
        context = torch.matmul(attention, v)
        context = context.transpose(1, 2).reshape(batch, query_len, self.embed_size)
        return self.fc_out(context)
class TransformerBlock(nn.Module):
    """One post-norm transformer layer: attention, then feed-forward.

    Each call adds the sinusoidal position table to the input, runs
    self-attention with a residual connection and ``norm1``, then the
    feed-forward network with a residual connection and ``norm2``.

    Args:
        embed_size: Feature size of the layer's input and output.
        heads: Number of attention heads.
        expansion_factor: Hidden-size multiplier of the feed-forward network.
        dropout: Dropout probability used inside the feed-forward network.
        checkpoint: When true, activations are recomputed during the backward
            pass (gradient checkpointing) instead of being stored.
    """

    def __init__(self, embed_size, heads, expansion_factor=4, dropout=0.1, checkpoint=False):
        super().__init__()
        self.attention = SelfAttention(embed_size, heads)
        self.feed_forward = SwiGLU(embed_size, expansion_factor)
        # ``dropout`` used to be accepted but ignored; apply it to the
        # feed-forward network's dropout layer.  With the default of 0.1 this
        # matches the previous behaviour.
        self.feed_forward.dropout = nn.Dropout(dropout)
        self.norm1 = RMSNorm(embed_size)
        self.norm2 = RMSNorm(embed_size)
        self.rotary_pos_emb = RotaryPositionalEmbedding(embed_size)
        self.checkpoint = checkpoint

    def _forward_impl(self, value, mask):
        """Run the layer without checkpointing."""
        value = self.rotary_pos_emb(value)
        attended = self.attention(value, value, value, mask)
        hidden = self.norm1(attended + value)
        fed_forward = self.feed_forward(hidden)
        return self.norm2(fed_forward + hidden)

    def forward(self, value, mask=None):
        """Apply the layer to ``value``.

        Args:
            value: Input tensor; it is used as values, keys and queries of
                the attention.
            mask: Optional attention mask forwarded unchanged to the
                attention module.

        Returns:
            Tensor with the same shape as ``value``.
        """
        # Checkpointing only pays off when a backward pass will follow, so
        # skip it when autograd is disabled (e.g. under ``torch.no_grad()``).
        if self.checkpoint and torch.is_grad_enabled():
            # ``use_reentrant`` is passed explicitly, as the PyTorch
            # documentation asks; the non-reentrant variant is the one it
            # recommends.
            return checkpoint(self._forward_impl, value, mask, use_reentrant=False)
        return self._forward_impl(value, mask)
class GPT(nn.Module):
    """Token-level language model: embeddings, a stack of blocks, output head.

    Args:
        vocab_size: Number of token ids; also the size of the output logits.
        embed_size: Feature size used throughout the model.
        num_layers: Number of ``TransformerBlock`` layers.
        heads: Number of attention heads per layer.
        max_length: Number of rows of the learned position embedding, i.e.
            the longest sequence ``forward`` accepts.
        expansion_factor: Passed through to every ``TransformerBlock``.
        dropout: Passed through to every ``TransformerBlock``.
        checkpoint: Passed through to every ``TransformerBlock``.
    """

    def __init__(self, vocab_size, embed_size, num_layers, heads, max_length, expansion_factor=4, dropout=0.1, checkpoint=False):
        super().__init__()
        self.word_embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)
        self.src_vocab_size = vocab_size
        self.layers = nn.ModuleList(
            [
                TransformerBlock(embed_size, heads, expansion_factor, dropout, checkpoint)
                for _ in range(num_layers)
            ]
        )
        self.norm = RMSNorm(embed_size)
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, x, mask=None):
        """Return next-token logits for a batch of token ids.

        Args:
            x: Integer tensor of token ids indexed as ``(batch, seq_len)``.
            mask: Optional attention mask forwarded unchanged to every layer.
                No mask is created when this is ``None``.

        Returns:
            Tensor ``(batch, seq_len, vocab_size)`` of unnormalised logits.

        Raises:
            IndexError: If ``seq_len`` exceeds the ``max_length`` the model
                was built with.
        """
        seq_len = x.size(1)
        max_length = self.position_embedding.num_embeddings
        if seq_len > max_length:
            # Fail early with a readable message; otherwise the embedding
            # lookup fails with an opaque out-of-range error (a device-side
            # assert on CUDA).
            raise IndexError(
                f"sequence length {seq_len} exceeds max_length {max_length}"
            )
        # Create the position index directly on the input's device.
        positions = torch.arange(seq_len, device=x.device).unsqueeze(0)
        hidden = self.word_embedding(x) + self.position_embedding(positions)
        for layer in self.layers:
            hidden = layer(hidden, mask)
        hidden = self.norm(hidden)
        return self.fc_out(hidden)
# Smoke test: build a model and push one batch of random token ids through it.
# NOTE(review): these statements run every time this module is imported;
# consider moving them under ``if __name__ == "__main__":`` once it is
# confirmed that nothing imports ``model``/``device`` from here.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = GPT(
    vocab_size=10000,
    embed_size=768,
    num_layers=20,
    heads=16,
    max_length=512,
    checkpoint=True,
)
model.to(device)
inputs = torch.randint(low=0, high=10000, size=(1, 100), device=device)
outputs = model(inputs)
print(outputs.shape)  # Expected: torch.Size([1, 100, 10000])