# llama-3-remake-interpretation / llama_interpretation.py
# Uploaded by gate369 with huggingface_hub (commit a7b0685, verified).
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import torch.optim as optim
from transformers import AutoModelForCausalLM
from transformers.modeling_utils import PreTrainedModel
from transformers.configuration_utils import PretrainedConfig
# Decoder layer built on the grouped MultiHeadAttention defined below.
class DecoderLayer(nn.Module):
    """One pre-norm decoder block: self-attention, then a feed-forward network.

    Each sub-layer normalises its input with its own LayerNorm, and its
    output is passed through dropout before being added back to the
    residual stream.

    Args:
        d_model: Width of the residual stream.
        n_heads: Forwarded to ``MultiHeadAttention``.
        dim_feedforward: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability shared by both sub-layers.
        group_size: Forwarded to ``MultiHeadAttention``.
    """

    def __init__(self, d_model, n_heads, dim_feedforward, dropout=0.1, group_size=16):
        super().__init__()
        # Sub-layers are created in a fixed order so parameter initialisation
        # consumes the random stream the same way every time.
        self.self_attn = MultiHeadAttention(d_model, n_heads, dropout, group_size)
        self.feed_forward = PositionwiseFeedForward(d_model, dim_feedforward, dropout)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        # Attention sub-layer: the normalised input is query, key and value.
        attn_input = self.layer_norm1(x)
        attn_output = self.self_attn(attn_input, attn_input, attn_input)
        x = x + self.dropout(attn_output)

        # Feed-forward sub-layer on the updated residual stream.
        ffn_output = self.feed_forward(self.layer_norm2(x))
        return x + self.dropout(ffn_output)
class MultiHeadAttention(nn.Module):
    """Chunked ("grouped") scaled dot-product attention.

    The projected query/key/value tensors are split along ``dim=1`` into at
    most ``group_size`` contiguous chunks (``torch.chunk`` semantics, so
    ``group_size`` is the *number* of chunks, not the chunk length).
    Attention is computed independently inside each chunk and the chunk
    outputs are concatenated back along ``dim=1``.

    NOTE: ``n_heads`` is stored but not used; scores are computed as a single
    head over the full ``d_model`` width and scaled by ``sqrt(d_model)``.
    This is kept as is so existing weights keep producing the same
    projections.

    Args:
        d_model: Input and output feature width of the three projections.
        n_heads: Stored for reference only (see note above).
        dropout: Dropout probability applied to the attention weights.
        group_size: Number of chunks the sequence axis is split into.
        causal: When True (default), a position may only attend to itself
            and to earlier positions of its chunk. Without this mask a
            language model can read the tokens it is asked to predict.
            Pass False for bidirectional attention.
    """

    def __init__(self, d_model, n_heads, dropout=0.1, group_size=16, causal=True):
        super().__init__()
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.n_heads = n_heads
        self.d_model = d_model
        self.group_size = group_size
        self.causal = causal

    def forward(self, query, key, value):
        """Attend within each chunk and return the concatenated result.

        Assumes batch-first inputs, i.e. ``dim=1`` is the sequence axis, and
        that query and key cover the same positions when ``causal`` is set
        (TODO confirm for any caller outside this file).
        """
        query = self.query_linear(query)
        key = self.key_linear(key)
        value = self.value_linear(value)

        scale = math.sqrt(self.d_model)  # loop-invariant
        chunks = zip(
            query.chunk(self.group_size, dim=1),
            key.chunk(self.group_size, dim=1),
            value.chunk(self.group_size, dim=1),
        )

        outputs = []
        for q, k, v in chunks:
            scores = torch.matmul(q, k.transpose(-1, -2)) / scale
            if self.causal:
                # Strict upper triangle marks "future" keys. Column 0 is never
                # masked, so every row keeps at least one finite score and the
                # softmax cannot produce NaN.
                future = torch.triu(
                    torch.ones(scores.shape[-2:], device=scores.device),
                    diagonal=1,
                ).bool()
                scores = scores.masked_fill(future, float("-inf"))
            weights = self.dropout(F.softmax(scores, dim=-1))
            outputs.append(torch.matmul(weights, v))

        # Stitch the per-chunk outputs back together along the sequence axis.
        return torch.cat(outputs, dim=1)
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied along the last dimension.

    Computes ``linear2(dropout(relu(linear1(x))))``.

    Args:
        d_model: Input and output feature width.
        dim_feedforward: Width of the hidden layer.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, dim_feedforward, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

    def forward(self, x):
        """Expand to the hidden width, apply ReLU and dropout, project back."""
        hidden = self.dropout(F.relu(self.linear1(x)))
        return self.linear2(hidden)
# Decoder stack built from DecoderLayer blocks (grouped MultiHeadAttention).
class Decoder(nn.Module):
    """A stack of ``DecoderLayer`` blocks followed by a final LayerNorm.

    Args:
        num_layers: Number of ``DecoderLayer`` blocks to stack.
        d_model: Width of the residual stream.
        n_heads: Forwarded to every layer.
        dim_feedforward: Forwarded to every layer.
        dropout: Forwarded to every layer.
        group_size: Forwarded to every layer.
    """

    def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, group_size=16):
        super().__init__()
        blocks = [
            DecoderLayer(d_model, n_heads, dim_feedforward, dropout, group_size)
            for _ in range(num_layers)
        ]
        self.layers = nn.ModuleList(blocks)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x):
        """Run ``x`` through every layer in order, then apply the final norm."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        return self.layer_norm(hidden)
class Embeddings(nn.Module):
    """Token embedding lookup scaled by ``sqrt(d_model)``.

    Args:
        d_model: Width of each embedding vector.
        vocab_size: Number of rows in the embedding table.
    """

    def __init__(self, d_model, vocab_size):
        super().__init__()
        self.d_model = d_model
        self.lut = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Look up the ids in ``x`` and multiply the vectors by ``sqrt(d_model)``."""
        scale = math.sqrt(self.d_model)
        vectors = self.lut(x)
        return vectors * scale
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence, then dropout.

    The table is stored in the buffer ``pe`` with shape
    ``(max_len, 1, d_model)``.

    Args:
        d_model: Feature width of the inputs. Odd values are supported.
        dropout: Dropout probability applied to the sum.
        max_len: Longest sequence the precomputed table can serve.
        batch_first: When True (default) inputs are read as
            ``(batch, seq, d_model)``, which is what an embedding of
            ``(batch, seq)`` token ids produces. When False they are read
            as ``(seq, batch, d_model)``. Slicing the table by ``x.size(0)``
            on a batch-first tensor would index the encoding by batch row
            instead of token position.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000, batch_first=True):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.batch_first = batch_first

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer cosine column than sine columns,
        # so trim the frequencies to match.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Keep the (max_len, 1, d_model) buffer layout so saved state dicts
        # still load.
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Return ``dropout(x + positional_table)``.

        Raises:
            ValueError: If the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1) if self.batch_first else x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}"
            )
        table = self.pe[:seq_len]  # (seq, 1, d_model)
        if self.batch_first:
            table = table.transpose(0, 1)  # (1, seq, d_model), broadcast over batch
        return self.dropout(x + table)
class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension.

    Computes ``x / (rms(x) + epsilon)`` and, when ``scale`` is True,
    multiplies by the learnable per-feature ``weight``.

    Args:
        dim: Size of the last dimension; length of ``weight``.
        epsilon: Added to the RMS to avoid division by zero.
        scale: Whether to apply ``weight``. The parameter is created either
            way, so state dicts have the same keys in both modes.
    """

    def __init__(self, dim, epsilon=1e-6, scale=True):
        super().__init__()
        self.epsilon = epsilon
        self.scale = scale
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        """Normalise ``x`` by its root mean square along the last dimension."""
        # rms = ||x||_2 / sqrt(n). Going through vector_norm instead of
        # sqrt(mean(x**2)) keeps the backward pass finite for all-zero rows:
        # sqrt has an infinite derivative at 0 and turns the gradient into
        # NaN, while the norm's gradient at zero is defined as zero.
        rms = torch.linalg.vector_norm(x, dim=-1, keepdim=True) / math.sqrt(x.size(-1))
        denom = rms + self.epsilon
        if self.scale:
            return (self.weight / denom) * x
        return x / denom
class TransformerDecoder(nn.Module):
    """Token ids to hidden states: embed, add positions, decode, RMS-normalise.

    Args:
        num_layers: Number of decoder layers.
        d_model: Width of the hidden states.
        n_heads: Forwarded to the decoder stack.
        dim_feedforward: Forwarded to the decoder stack.
        dropout: Dropout probability for the positional encoding and the
            decoder stack.
        vocab_size: Size of the embedding table.
        group_size: Forwarded to the decoder stack.
    """

    def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, vocab_size=10000, group_size=16):
        super().__init__()
        self.embeddings = Embeddings(d_model, vocab_size)
        self.positional_encoding = PositionalEncoding(d_model, dropout)
        # group_size must be passed on here; otherwise the decoder silently
        # falls back to its own default and the configured value is ignored.
        self.decoder = Decoder(num_layers, d_model, n_heads, dim_feedforward, dropout, group_size)
        self.rms_norm = RMSNorm(d_model)
        self.group_size = group_size

    def forward(self, x):
        """Return the normalised hidden states for the token ids in ``x``."""
        x = self.embeddings(x)
        x = self.positional_encoding(x)
        x = self.decoder(x)
        return self.rms_norm(x)
class TransformerDecoderLM(nn.Module):
    """``TransformerDecoder`` with a linear head that maps to vocabulary logits.

    Args:
        num_layers: Forwarded to ``TransformerDecoder``.
        d_model: Hidden width; also the input width of the head.
        n_heads: Forwarded to ``TransformerDecoder``.
        dim_feedforward: Forwarded to ``TransformerDecoder``.
        dropout: Forwarded to ``TransformerDecoder``.
        vocab_size: Embedding table size and output width of the head.
        group_size: Forwarded to ``TransformerDecoder``.
    """

    def __init__(self, num_layers, d_model, n_heads, dim_feedforward, dropout=0.1, vocab_size=10000, group_size=16):
        super().__init__()
        self.transformer = TransformerDecoder(
            num_layers=num_layers,
            d_model=d_model,
            n_heads=n_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            vocab_size=vocab_size,
            group_size=group_size,
        )
        self.lm_head = nn.Linear(d_model, vocab_size)

    def forward(self, input_ids):
        """Return the logits the head produces from the decoder's hidden states."""
        return self.lm_head(self.transformer(input_ids))
class CustomConfig(PretrainedConfig):
    """Hyper-parameters for ``CustomTransformerForCausalLM``.

    Args:
        num_layers: Number of decoder layers.
        d_model: Hidden width.
        n_heads: Attention head count passed to the model.
        dim_feedforward: Hidden width of the feed-forward sub-layers.
        dropout: Dropout probability.
        vocab_size: Vocabulary size.
        group_size: Attention group count passed to the model.
        **kwargs: Passed to ``PretrainedConfig.__init__``.
    """

    model_type = "custom_transformer"

    def __init__(self, num_layers=6, d_model=512, n_heads=8, dim_feedforward=2048, dropout=0.1, vocab_size=10000, group_size=16, **kwargs):
        # Model dimensions.
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.dim_feedforward = dim_feedforward
        self.num_layers = num_layers
        # Attention settings.
        self.n_heads = n_heads
        self.group_size = group_size
        # Regularisation.
        self.dropout = dropout
        # The base class is initialised last, after the model fields are set.
        super().__init__(**kwargs)
class CustomTransformerForCausalLM(PreTrainedModel):
    """Hugging Face wrapper around ``TransformerDecoderLM``.

    The wrapped model is built from the fields of a ``CustomConfig``.
    """

    config_class = CustomConfig

    def __init__(self, config):
        super().__init__(config)
        self.transformer = TransformerDecoderLM(
            num_layers=config.num_layers,
            d_model=config.d_model,
            n_heads=config.n_heads,
            dim_feedforward=config.dim_feedforward,
            dropout=config.dropout,
            vocab_size=config.vocab_size,
            group_size=config.group_size,
        )

    def forward(self, input_ids, labels=None):
        """Compute logits and, when ``labels`` is given, the next-token loss.

        Args:
            input_ids: Token ids; assumed ``(batch, seq)`` (TODO confirm).
            labels: Optional target ids with the same shape as ``input_ids``.
                Following the Hugging Face causal-LM convention they are
                shifted here, so callers can pass ``labels=input_ids``.
                Positions equal to -100 are ignored (``CrossEntropyLoss``
                default ``ignore_index``).

        Returns:
            Dict with ``"loss"`` (``None`` without labels) and ``"logits"``.
        """
        logits = self.transformer(input_ids)
        loss = None
        if labels is not None:
            # Position t predicts token t + 1: drop the last logit and the
            # first label. Without the shift, labels=input_ids would train
            # the model to copy its input.
            shift_logits = logits[..., :-1, :]
            shift_labels = labels[..., 1:]
            loss_fct = nn.CrossEntropyLoss()
            # reshape (not view): the sliced tensors are not contiguous.
            loss = loss_fct(
                shift_logits.reshape(-1, shift_logits.size(-1)),
                shift_labels.reshape(-1),
            )
        return {"loss": loss, "logits": logits}