|
import copy |
|
from dataclasses import dataclass |
|
import math |
|
from safetensors.torch import load_model |
|
import torch |
|
from torch import nn |
|
from transformers.modeling_utils import ModuleUtilsMixin, PretrainedConfig, PreTrainedModel |
|
from transformers.modeling_outputs import ModelOutput |
|
from transformers.models.t5.configuration_t5 import T5Config |
|
from transformers.models.t5.modeling_t5 import ( |
|
T5LayerNorm, |
|
T5DenseGatedActDense, |
|
) |
|
from typing import Optional |
|
|
|
|
|
|
|
|
|
@dataclass
class EncoderOutput(ModelOutput):
    """Result container returned by ``T5Stack.forward``."""

    # Hidden states after the stack's final layer norm and dropout.
    hidden_states: torch.FloatTensor = None
    # The attention mask the stack worked with (the caller's mask, or the
    # all-ones mask the stack creates when none is supplied).
    attention_mask: torch.FloatTensor = None
|
|
|
|
|
@dataclass
class Seq2SeqLMOutput(ModelOutput):
    """Container with ``loss``, ``logits`` and ``encoder_outputs`` fields.

    NOTE(review): nothing in the visible part of this file constructs or
    consumes this class - presumably kept for a seq2seq head defined
    elsewhere; confirm before removing.
    """

    loss: torch.FloatTensor = None
    logits: torch.FloatTensor = None
    # Expected to hold the ``EncoderOutput`` produced by the encoder stack.
    encoder_outputs: EncoderOutput = None
|
|
|
|
|
class T5LayerFF(nn.Module):
    """Feed-forward sub-layer: layer norm -> gated dense -> dropout -> residual add."""

    def __init__(self, config: T5Config):
        super().__init__()
        # Only the gated feed-forward variant is implemented here.
        assert config.is_gated_act
        self.DenseReluDense = T5DenseGatedActDense(config)
        self.layer_norm = T5LayerNorm(config.d_model, eps=config.layer_norm_epsilon)
        self.dropout = nn.Dropout(config.dropout_rate)

    def forward(self, hidden_states):
        """Return ``hidden_states`` plus the dropped-out feed-forward branch."""
        residual = hidden_states
        # Cast the normalised tensor back to the dtype of the input.
        normed = self.layer_norm(residual).type_as(residual)
        branch = self.dropout(self.DenseReluDense(normed))
        return residual + branch
|
|
|
|
|
class T5Attention(nn.Module):
    """Multi-head attention with an optional learned relative position bias.

    Acts as self-attention when ``forward`` receives no ``key_value_states``
    and as attention over ``key_value_states`` otherwise.
    """

    def __init__(self, config: T5Config, has_relative_attention_bias=False):
        super().__init__()
        self.is_decoder = config.is_decoder
        self.has_relative_attention_bias = has_relative_attention_bias
        self.relative_attention_num_buckets = config.relative_attention_num_buckets
        self.relative_attention_max_distance = config.relative_attention_max_distance
        self.d_model = config.d_model
        self.key_value_proj_dim = config.d_kv
        self.n_heads = config.num_heads
        # Stored as a plain probability; applied functionally in ``forward``.
        self.dropout = config.dropout_rate
        self.inner_dim = self.n_heads * self.key_value_proj_dim

        # Bias-free query / key / value / output projections.
        self.q = nn.Linear(self.d_model, self.inner_dim, bias=False)
        self.k = nn.Linear(self.d_model, self.inner_dim, bias=False)
        self.v = nn.Linear(self.d_model, self.inner_dim, bias=False)
        self.o = nn.Linear(self.inner_dim, self.d_model, bias=False)

        if self.has_relative_attention_bias:
            # One learned scalar per (bucket, head).
            self.relative_attention_bias = nn.Embedding(self.relative_attention_num_buckets, self.n_heads)

    @staticmethod
    def _relative_position_bucket(relative_position, bidirectional=True, num_buckets=32, max_distance=128):
        """Map relative positions to bucket indices for the relative attention bias.

        Adapted from Mesh Tensorflow:
        https://github.com/tensorflow/mesh/blob/0cb87fe07da627bf0b7e60475d59f95ed6b5be3d/mesh_tensorflow/transformer/transformer_layers.py#L593

        ``relative_position`` is ``memory_position - query_position``. Small
        absolute offsets each get their own bucket; larger offsets share
        logarithmically growing buckets, and every offset at or beyond
        ``max_distance`` falls into the last bucket. With
        ``bidirectional=False`` positive offsets are treated as distance 0.

        Args:
            relative_position: integer tensor of relative positions.
            bidirectional: whether positive and non-positive offsets get
                separate halves of the bucket range.
            num_buckets: total number of buckets.
            max_distance: distance from which all offsets share one bucket.

        Returns:
            Tensor shaped like ``relative_position`` with values in
            ``[0, num_buckets)``.
        """
        if bidirectional:
            # Upper half of the bucket range is reserved for positive offsets.
            num_buckets //= 2
            bucket_offset = (relative_position > 0).to(torch.long) * num_buckets
            distance = torch.abs(relative_position)
        else:
            bucket_offset = 0
            distance = -torch.min(relative_position, torch.zeros_like(relative_position))

        # Distances below ``max_exact`` map one-to-one onto buckets.
        max_exact = num_buckets // 2

        # Remaining buckets cover [max_exact, max_distance) on a log scale.
        # (distance == 0 yields log(0) here, but that entry is discarded by
        # the ``torch.where`` below because 0 < max_exact.)
        log_ratio = torch.log(distance.float() / max_exact) / math.log(max_distance / max_exact)
        log_bucket = max_exact + (log_ratio * (num_buckets - max_exact)).to(torch.long)
        log_bucket = torch.min(log_bucket, torch.full_like(log_bucket, num_buckets - 1))

        return bucket_offset + torch.where(distance < max_exact, distance, log_bucket)

    def compute_bias(self, query_length, key_length, device=None):
        """Compute binned relative position bias"""
        device = self.relative_attention_bias.weight.device if device is None else device
        query_positions = torch.arange(query_length, dtype=torch.long, device=device).unsqueeze(1)
        key_positions = torch.arange(key_length, dtype=torch.long, device=device).unsqueeze(0)
        buckets = self._relative_position_bucket(
            key_positions - query_positions,
            bidirectional=not self.is_decoder,
            num_buckets=self.relative_attention_num_buckets,
            max_distance=self.relative_attention_max_distance,
        )
        # (query_length, key_length, n_heads) -> (1, n_heads, query_length, key_length)
        return self.relative_attention_bias(buckets).permute(2, 0, 1).unsqueeze(0)

    def forward(
        self,
        hidden_states,
        mask=None,
        key_value_states=None,
        position_bias=None,
    ):
        """
        Self-attention (if key_value_states is None) or attention over source sentence (provided by key_value_states).

        Returns a tuple ``(attn_output, position_bias)``. When ``position_bias``
        is computed here it already includes ``mask``; a bias passed in by the
        caller is used and returned as-is.
        """
        batch_size, query_length = hidden_states.shape[:2]
        # Keys and values come from the memory if given, else from the queries' own input.
        kv_source = hidden_states if key_value_states is None else key_value_states
        key_length = kv_source.shape[1]

        def split_heads(states):
            """(batch, seq, inner_dim) -> (batch, n_heads, seq, d_kv)"""
            return states.view(batch_size, -1, self.n_heads, self.key_value_proj_dim).transpose(1, 2)

        query_states = split_heads(self.q(hidden_states))
        key_states = split_heads(self.k(kv_source))
        value_states = split_heads(self.v(kv_source))

        scores = torch.matmul(query_states, key_states.transpose(3, 2))

        if position_bias is None:
            if self.has_relative_attention_bias:
                position_bias = self.compute_bias(query_length, key_length, device=scores.device)
            else:
                position_bias = torch.zeros(
                    (1, self.n_heads, query_length, key_length), device=scores.device, dtype=scores.dtype
                )
            if mask is not None:
                # Fold the additive mask into the bias so it is applied with it.
                position_bias = position_bias + mask

        # In-place on purpose: the sum keeps the dtype of ``scores``.
        scores += position_bias
        # Softmax is evaluated in float32, then cast back.
        attn_weights = nn.functional.softmax(scores.float(), dim=-1).type_as(scores)
        attn_weights = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training)

        context = torch.matmul(attn_weights, value_states)
        # (batch, n_heads, seq, d_kv) -> (batch, seq, inner_dim)
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.inner_dim)

        return (self.o(context), position_bias)
|
|
|
|
|
class T5LayerSelfAttention(nn.Module):
    """Self-attention sub-layer: layer norm -> attention -> dropout -> residual add."""

    def __init__(self, config, has_relative_attention_bias=False):
        super().__init__()
        self.SelfAttention = T5Attention(config, has_relative_attention_bias=has_relative_attention_bias)
        self.layer_norm = T5LayerNorm(config.d_model, eps=config.layer_norm_epsilon)
        self.dropout = nn.Dropout(config.dropout_rate)

    def forward(
        self,
        hidden_states,
        attention_mask=None,
        position_bias=None,
    ):
        """Return ``(new_hidden_states, *extras)``; extras are whatever the
        attention module returns after its first element (the position bias)."""
        # Cast the normalised tensor back to the dtype of the input.
        normed = self.layer_norm(hidden_states).type_as(hidden_states)
        attn_out, *extras = self.SelfAttention(
            normed,
            mask=attention_mask,
            position_bias=position_bias,
        )
        return (hidden_states + self.dropout(attn_out), *extras)
|
|
|
|
|
class T5LayerCrossAttention(nn.Module):
    """Cross-attention sub-layer: layer norm -> attention over ``key_value_states``
    -> dropout -> residual add. Its attention module has no relative position bias."""

    def __init__(self, config):
        super().__init__()
        self.EncDecAttention = T5Attention(config, has_relative_attention_bias=False)
        self.layer_norm = T5LayerNorm(config.d_model, eps=config.layer_norm_epsilon)
        self.dropout = nn.Dropout(config.dropout_rate)

    def forward(
        self,
        hidden_states,
        key_value_states,
        attention_mask=None,
        position_bias=None,
    ):
        """Return ``(new_hidden_states, *extras)``; extras are whatever the
        attention module returns after its first element (the position bias)."""
        normed = self.layer_norm(hidden_states)
        attn_out, *extras = self.EncDecAttention(
            normed,
            mask=attention_mask,
            key_value_states=key_value_states,
            position_bias=position_bias,
        )
        return (hidden_states + self.dropout(attn_out), *extras)
|
|
|
|
|
class T5Block(nn.Module):
    """One transformer block: self-attention, optional cross-attention
    (decoder configs only), then the feed-forward sub-layer."""

    def __init__(self, config, has_relative_attention_bias=False):
        super().__init__()
        self.is_decoder = config.is_decoder

        sublayers = [T5LayerSelfAttention(config, has_relative_attention_bias=has_relative_attention_bias)]
        if self.is_decoder:
            sublayers.append(T5LayerCrossAttention(config))
        sublayers.append(T5LayerFF(config))
        self.layer = nn.ModuleList(sublayers)

    def forward(
        self,
        hidden_states,
        attention_mask=None,
        position_bias=None,
        encoder_hidden_states=None,
        encoder_attention_mask=None,
        encoder_decoder_position_bias=None,
    ):
        """Run the block.

        Returns a tuple: the new hidden states, followed by the self-attention
        position bias and, when cross-attention ran, the cross-attention
        position bias.
        """
        hidden_states, *self_extras = self.layer[0](
            hidden_states,
            attention_mask=attention_mask,
            position_bias=position_bias,
        )
        biases = tuple(self_extras)

        # Cross-attention only runs for decoder blocks that were given encoder states.
        if self.is_decoder and encoder_hidden_states is not None:
            hidden_states, *cross_extras = self.layer[1](
                hidden_states,
                key_value_states=encoder_hidden_states,
                attention_mask=encoder_attention_mask,
                position_bias=encoder_decoder_position_bias,
            )
            biases = biases + tuple(cross_extras)

        # The feed-forward sub-layer is always the last entry.
        hidden_states = self.layer[-1](hidden_states)

        return (hidden_states,) + biases
|
|
|
|
|
class T5Stack(nn.Module, ModuleUtilsMixin):
    """Stack of ``T5Block`` layers over a token embedding, followed by a
    final layer norm and dropout. Only the first block owns a relative
    position bias; its bias is handed to every later block."""

    def __init__(self, config, embed_tokens):
        super().__init__()
        assert embed_tokens is not None

        self.config = config
        self.embed_tokens = embed_tokens
        self.is_decoder = config.is_decoder

        self.block = nn.ModuleList(
            [T5Block(config, has_relative_attention_bias=(layer_idx == 0)) for layer_idx in range(config.num_layers)]
        )

        self.final_layer_norm = T5LayerNorm(config.d_model, eps=config.layer_norm_epsilon)
        self.dropout = nn.Dropout(config.dropout_rate)

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        encoder_hidden_states=None,
        encoder_attention_mask=None,
    ) -> EncoderOutput:
        """Embed ``input_ids`` and run them through every block.

        ``input_ids`` must be a 2-D tensor (its size is unpacked into batch and
        sequence length). Missing masks default to all ones. Returns an
        ``EncoderOutput`` holding the final hidden states and the attention
        mask that was used.
        """
        input_shape = input_ids.size()
        batch_size, seq_length = input_shape

        inputs_embeds = self.embed_tokens(input_ids)
        # Optional config flag: run the stack on bfloat16 embeddings.
        if getattr(self.config, 'is_bf16', False):
            inputs_embeds = inputs_embeds.to(torch.bfloat16)
        device = inputs_embeds.device

        # Cross-attention is active only for a decoder stack that received encoder states.
        cross_attending = self.is_decoder and encoder_hidden_states is not None

        if attention_mask is None:
            attention_mask = torch.ones(batch_size, seq_length, device=device)

        if cross_attending and encoder_attention_mask is None:
            encoder_attention_mask = torch.ones(
                batch_size, encoder_hidden_states.shape[1], device=device, dtype=torch.long
            )

        # Mask expansion is delegated to ModuleUtilsMixin.
        extended_attention_mask = self.get_extended_attention_mask(attention_mask, input_shape)

        if cross_attending:
            # The unpack requires encoder_hidden_states to be exactly 3-D.
            _enc_batch, _enc_length, _enc_dim = encoder_hidden_states.size()
            encoder_extended_attention_mask = self.invert_attention_mask(encoder_attention_mask)
        else:
            encoder_extended_attention_mask = None

        position_bias = None
        encoder_decoder_position_bias = None

        hidden_states = self.dropout(inputs_embeds)

        for block in self.block:
            block_outputs = block(
                hidden_states,
                attention_mask=extended_attention_mask,
                position_bias=position_bias,
                encoder_hidden_states=encoder_hidden_states,
                encoder_attention_mask=encoder_extended_attention_mask,
                encoder_decoder_position_bias=encoder_decoder_position_bias,
            )
            hidden_states = block_outputs[0]

            # Position biases produced by a block are reused by the following blocks.
            position_bias = block_outputs[1]
            if cross_attending:
                encoder_decoder_position_bias = block_outputs[2]

        hidden_states = self.final_layer_norm(hidden_states).type_as(hidden_states)
        hidden_states = self.dropout(hidden_states)

        return EncoderOutput(
            hidden_states=hidden_states,
            attention_mask=attention_mask,
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
class T5EncoderModel(nn.Module):
    """Encoder-only T5 stack followed by a two-layer MLP head that maps the
    hidden states to 4096 features."""

    def __init__(self, config: T5Config):
        super().__init__()
        # NOTE: this mutates the caller's config object.
        config.is_encoder_decoder = False
        assert not config.tie_word_embeddings
        self.config = config
        self.model_dim = config.d_model
        self.shared = nn.Embedding(config.vocab_size, config.d_model)

        # The stack gets its own config copy, forced into encoder mode.
        encoder_config = copy.deepcopy(config)
        encoder_config.is_decoder = False
        self.encoder = T5Stack(encoder_config, self.shared)

        # Output width of the projection head; the original local was named
        # ``flux_dev_d_model`` - presumably the width the downstream FLUX dev
        # model expects (verify against the consumer).
        head_dim = 4096
        self.last_layer = nn.Sequential(
            nn.Linear(config.d_model, head_dim),
            nn.ReLU(),
            nn.Linear(head_dim, head_dim),
        )
        self.apply(self._init_weights)

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None
    ):
        """Encode ``input_ids`` and return the projected hidden states."""
        encoded = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )
        return self.last_layer(encoded.hidden_states)

    def get_input_embeddings(self):
        """Return the token embedding module."""
        return self.shared

    def _init_weights(self, module):
        """Initialise ``module`` in place; called through ``self.apply``.

        Only T5 layer norms, this model's embedding, gated feed-forward blocks
        and attention modules are handled; every other module (for example the
        linear layers of ``last_layer``) keeps its default initialisation.
        """
        factor = self.config.initializer_factor

        if isinstance(module, T5LayerNorm):
            module.weight.data.fill_(factor * 1.0)
            return

        if isinstance(module, T5EncoderModel):
            module.shared.weight.data.normal_(mean=0.0, std=factor * 1.0)
            if hasattr(module, "lm_head") and not self.config.tie_word_embeddings:
                module.lm_head.weight.data.normal_(mean=0.0, std=factor * 1.0)
            return

        if isinstance(module, T5DenseGatedActDense):
            d_ff, d_model = module.wi_0.weight.data.size()
            input_std = factor * ((d_model) ** -0.5)
            module.wi_0.weight.data.normal_(mean=0.0, std=input_std)
            module.wi_1.weight.data.normal_(mean=0.0, std=input_std)
            module.wo.weight.data.normal_(mean=0.0, std=factor * ((d_ff) ** -0.5))
            return

        if isinstance(module, T5Attention):
            d_model = self.config.d_model
            d_kv = self.config.d_kv
            n_heads = self.config.num_heads
            module.q.weight.data.normal_(mean=0.0, std=factor * ((d_model * d_kv) ** -0.5))
            module.k.weight.data.normal_(mean=0.0, std=factor * (d_model ** -0.5))
            module.v.weight.data.normal_(mean=0.0, std=factor * (d_model ** -0.5))
            module.o.weight.data.normal_(mean=0.0, std=factor * ((n_heads * d_kv) ** -0.5))
            if hasattr(module, "relative_attention_bias"):
                module.relative_attention_bias.weight.data.normal_(mean=0.0, std=factor * ((d_model) ** -0.5))
|
|
|
class PretrainedTextEncoder(PreTrainedModel):
    """``PreTrainedModel`` wrapper around an already constructed text-encoder module."""

    def __init__(self, config, model):
        super().__init__(config, model)
        self.model = model

    def load_model(self, filepath):
        """Load weights from ``filepath`` into the wrapped model using
        safetensors' module-level ``load_model``."""
        load_model(self.model, filepath)

    def forward(self, x, output_hidden_states=False):
        """Run the wrapped model on ``x`` and return the result as a 1-tuple.

        ``output_hidden_states`` is accepted but not used - presumably kept so
        callers written against Hugging Face encoders (which index ``[0]``)
        work unchanged; verify against the caller.
        """
        encoded = self.model(x)
        return (encoded,)
|
|
|
# Module-level configuration for the small text encoder: 2 layers, 32 heads,
# model width 2048, gated feed-forward, untied word embeddings.
#
# NOTE(review): `dd_ff` looks like a typo for `d_ff`. Presumably T5Config keeps
# the unknown keyword as a plain attribute and leaves `d_ff` at the library
# default, so the feed-forward width is NOT 5120 - confirm. Do not rename it
# without checking existing checkpoints: changing `d_ff` changes parameter
# shapes and would break loading weights trained with the current value.
#
# NOTE(review): `is_gated_act` and `max_seq_len` are not documented T5Config
# arguments either; `is_gated_act` is normally derived from
# `feed_forward_proj` - verify that passing it directly has the intended effect.
t5_config = T5Config(d_model=4096 // 2,
                     dd_ff=10240 // 2,
                     num_layers=2,
                     num_heads=32,
                     is_gated_act=True,
                     tie_word_embeddings=False,
                     max_seq_len=512)
|
|