|
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
import math
|
|
import torch.optim as optim
|
|
from transformers import AutoModelForCausalLM
|
|
from transformers.modeling_utils import PreTrainedModel
|
|
from transformers.configuration_utils import PretrainedConfig
|
|
|
|
class DecoderLayer(nn.Module):
|
|
def __init__(self, d_model, n_heads, dim_feedforward, dropout=0.1, group_size=16):
|
|
super(DecoderLayer, self).__init__()
|
|
self.self_attn = MultiHeadAttention(d_model, n_heads, dropout, group_size)
|
|
self.feed_forward = PositionwiseFeedForward(d_model, dim_feedforward, dropout)
|
|
self.layer_norm1 = nn.LayerNorm(d_model)
|
|
self.layer_norm2 = nn.LayerNorm(d_model)
|
|
self.dropout = nn.Dropout(dropout)
|
|
|
|
def forward(self, x):
|
|
|
|
norm_x = self.layer_norm1(x)
|
|
x = x + self.dropout(self.self_attn(norm_x, norm_x, norm_x))
|
|
|
|
norm_x = self.layer_norm2(x)
|
|
x = x + self.dropout(self.feed_forward(norm_x))
|
|
return x
|
|
class MultiHeadAttention(nn.Module):
|
|
def __init__(self, d_model, n_heads, dropout=0.1, group_size=16):
|
|
super(MultiHeadAttention, self).__init__()
|
|
self.query_linear = nn.Linear(d_model, d_model)
|
|
self.key_linear = nn.Linear(d_model, d_model)
|
|
self.value_linear = nn.Linear(d_model, d_model)
|
|
self.dropout = nn.Dropout(dropout)
|
|
self.n_heads = n_heads
|
|
self.d_model = d_model
|
|
self.group_size = group_size
|
|
|
|
def forward(self, query, key, value):
|
|
|
|
query = self.query_linear(query)
|
|
key = self.key_linear(key)
|
|
value = self.value_linear(value)
|
|
|
|
|
|
query_groups = query.chunk(self.group_size, dim=1)
|
|
key_groups = key.chunk(self.group_size, dim=1)
|
|
value_groups = value.chunk(self.group_size, dim=1)
|
|
|
|
attention_scores = []
|
|
for q, k, v in zip(query_groups, key_groups, value_groups):
|
|
scores = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(self.d_model)
|
|
scores = F.softmax(scores, dim=-1)
|
|
scores = self.dropout(scores)
|
|
attention_scores.append(torch.matmul(scores, v))
|
|
|
|
|
|
output = torch.cat(attention_scores, dim=1)
|
|
return output
|
|
|
|
class PositionwiseFeedForward(nn.Module):
|
|
def __init__(self, d_model, dim_feedforward, dropout=0.1):
|
|
super(PositionwiseFeedForward, self).__init__()
|
|
self.linear1 = nn.Linear(d_model, dim_feedforward)
|
|
self.dropout = nn.Dropout(dropout)
|
|
self.linear2 = nn.Linear(dim_feedforward, d_model)
|
|
|
|
def forward(self, x):
|
|
x = F.relu(self.linear1(x))
|
|
x = self.dropout(x)
|
|
x = self.linear2(x)
|
|
return x
|
|
|
|
|
|
class Decoder(nn.Module):
|
|
def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, group_size=16):
|
|
super(Decoder, self).__init__()
|
|
self.layers = nn.ModuleList([
|
|
DecoderLayer(d_model, n_heads, dim_feedforward, dropout, group_size)
|
|
for _ in range(num_layers)
|
|
])
|
|
self.layer_norm = nn.LayerNorm(d_model)
|
|
|
|
def forward(self, x):
|
|
for layer in self.layers:
|
|
x = layer(x)
|
|
x = self.layer_norm(x)
|
|
return x
|
|
|
|
class Embeddings(nn.Module):
|
|
def __init__(self, d_model, vocab_size):
|
|
super(Embeddings, self).__init__()
|
|
self.lut = nn.Embedding(vocab_size, d_model)
|
|
self.d_model = d_model
|
|
|
|
def forward(self, x):
|
|
return self.lut(x) * math.sqrt(self.d_model)
|
|
|
|
class PositionalEncoding(nn.Module):
|
|
def __init__(self, d_model, dropout=0.1, max_len=5000):
|
|
super(PositionalEncoding, self).__init__()
|
|
self.dropout = nn.Dropout(dropout)
|
|
|
|
pe = torch.zeros(max_len, d_model)
|
|
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
|
|
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
|
|
pe[:, 0::2] = torch.sin(position * div_term)
|
|
pe[:, 1::2] = torch.cos(position * div_term)
|
|
pe = pe.unsqueeze(0).transpose(0, 1)
|
|
self.register_buffer('pe', pe)
|
|
|
|
def forward(self, x):
|
|
x = x + self.pe[:x.size(0), :]
|
|
return self.dropout(x)
|
|
class RMSNorm(nn.Module):
|
|
def __init__(self, dim, epsilon=1e-6, scale=True):
|
|
super(RMSNorm, self).__init__()
|
|
self.epsilon = epsilon
|
|
self.scale = scale
|
|
self.weight = nn.Parameter(torch.ones(dim))
|
|
|
|
def forward(self, x):
|
|
rms = torch.sqrt(torch.mean(torch.square(x), dim=-1, keepdim=True))
|
|
if self.scale:
|
|
weight = self.weight / (rms + self.epsilon)
|
|
return weight * x
|
|
else:
|
|
return x / (rms + self.epsilon)
|
|
class TransformerDecoder(nn.Module):
|
|
def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, vocab_size=10000, group_size=16):
|
|
super(TransformerDecoder, self).__init__()
|
|
self.embeddings = Embeddings(d_model, vocab_size)
|
|
self.positional_encoding = PositionalEncoding(d_model, dropout)
|
|
self.decoder = Decoder(num_layers, d_model, n_heads, dim_feedforward, dropout)
|
|
self.rms_norm = RMSNorm(d_model)
|
|
self.group_size = group_size
|
|
|
|
def forward(self, x):
|
|
x = self.embeddings(x)
|
|
x = self.positional_encoding(x)
|
|
x = self.decoder(x)
|
|
x = self.rms_norm(x)
|
|
return x
|
|
class TransformerDecoderLM(nn.Module):
|
|
def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, vocab_size=10000, group_size=16):
|
|
super(TransformerDecoderLM, self).__init__()
|
|
self.transformer = TransformerDecoder(num_layers, d_model, n_heads, dim_feedforward, dropout, vocab_size, group_size)
|
|
self.lm_head = nn.Linear(d_model, vocab_size)
|
|
|
|
def forward(self, input_ids):
|
|
transformer_output = self.transformer(input_ids)
|
|
lm_logits = self.lm_head(transformer_output)
|
|
return lm_logits
|
|
class CustomConfig(PretrainedConfig):
|
|
model_type = "custom_transformer"
|
|
def __init__(self, num_layers=6, d_model=512, n_heads=8, dim_feedforward=2048, dropout=0.1, vocab_size=10000, group_size=16, **kwargs):
|
|
self.num_layers = num_layers
|
|
self.d_model = d_model
|
|
self.n_heads = n_heads
|
|
self.dim_feedforward = dim_feedforward
|
|
self.dropout = dropout
|
|
self.vocab_size = vocab_size
|
|
self.group_size = group_size
|
|
super().__init__(**kwargs)
|
|
|
|
class CustomTransformerForCausalLM(PreTrainedModel):
|
|
config_class = CustomConfig
|
|
def __init__(self, config):
|
|
super().__init__(config)
|
|
self.transformer = TransformerDecoderLM(
|
|
num_layers=config.num_layers,
|
|
d_model=config.d_model,
|
|
n_heads=config.n_heads,
|
|
dim_feedforward=config.dim_feedforward,
|
|
dropout=config.dropout,
|
|
vocab_size=config.vocab_size,
|
|
group_size=config.group_size
|
|
)
|
|
|
|
def forward(self, input_ids, labels=None):
|
|
logits = self.transformer(input_ids)
|
|
|
|
loss = None
|
|
if labels is not None:
|
|
loss_fct = nn.CrossEntropyLoss()
|
|
loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
|
|
|
|
return {"loss": loss, "logits": logits}
|
|
|
|
|