|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import inspect |
|
import logging |
|
import os |
|
from typing import Callable, Dict, List, Optional, Tuple |
|
|
|
import torch |
|
from torch import Tensor, device, dtype, nn |
|
from torch.nn import CrossEntropyLoss |
|
from torch.nn import functional as F |
|
|
|
from .activations import get_activation |
|
from .configuration_utils import PretrainedConfig |
|
from .file_utils import ( |
|
DUMMY_INPUTS, |
|
TF2_WEIGHTS_NAME, |
|
TF_WEIGHTS_NAME, |
|
WEIGHTS_NAME, |
|
cached_path, |
|
hf_bucket_url, |
|
is_remote_url, |
|
) |
|
from .generation_utils import GenerationMixin |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
try: |
|
from torch.nn import Identity |
|
except ImportError: |
|
|
|
class Identity(nn.Module): |
|
r"""A placeholder identity operator that is argument-insensitive. |
|
""" |
|
|
|
def __init__(self, *args, **kwargs): |
|
super().__init__() |
|
|
|
def forward(self, input): |
|
return input |
|
|
|
|
|
def find_pruneable_heads_and_indices( |
|
heads: List, n_heads: int, head_size: int, already_pruned_heads: set |
|
) -> Tuple[set, "torch.LongTensor"]: |
|
mask = torch.ones(n_heads, head_size) |
|
heads = set(heads) - already_pruned_heads |
|
for head in heads: |
|
|
|
head = head - sum(1 if h < head else 0 for h in already_pruned_heads) |
|
mask[head] = 0 |
|
mask = mask.view(-1).contiguous().eq(1) |
|
index: torch.LongTensor = torch.arange(len(mask))[mask].long() |
|
return heads, index |
|
|
|
|
|
class ModuleUtilsMixin: |
|
""" |
|
A few utilities for torch.nn.Modules, to be used as a mixin. |
|
""" |
|
|
|
def num_parameters(self, only_trainable: bool = False) -> int: |
|
""" |
|
Get number of (optionally, trainable) parameters in the module. |
|
""" |
|
params = filter(lambda x: x.requires_grad, self.parameters()) if only_trainable else self.parameters() |
|
return sum(p.numel() for p in params) |
|
|
|
@staticmethod |
|
def _hook_rss_memory_pre_forward(module, *args, **kwargs): |
|
try: |
|
import psutil |
|
except (ImportError): |
|
raise ImportError("You need to install psutil (pip install psutil) to use memory tracing.") |
|
|
|
process = psutil.Process(os.getpid()) |
|
mem = process.memory_info() |
|
module.mem_rss_pre_forward = mem.rss |
|
return None |
|
|
|
@staticmethod |
|
def _hook_rss_memory_post_forward(module, *args, **kwargs): |
|
try: |
|
import psutil |
|
except (ImportError): |
|
raise ImportError("You need to install psutil (pip install psutil) to use memory tracing.") |
|
|
|
process = psutil.Process(os.getpid()) |
|
mem = process.memory_info() |
|
module.mem_rss_post_forward = mem.rss |
|
mem_rss_diff = module.mem_rss_post_forward - module.mem_rss_pre_forward |
|
module.mem_rss_diff = mem_rss_diff + (module.mem_rss_diff if hasattr(module, "mem_rss_diff") else 0) |
|
return None |
|
|
|
def add_memory_hooks(self): |
|
""" Add a memory hook before and after each sub-module forward pass to record increase in memory consumption. |
|
Increase in memory consumption is stored in a `mem_rss_diff` attribute for each module and can be reset to zero with `model.reset_memory_hooks_state()` |
|
""" |
|
for module in self.modules(): |
|
module.register_forward_pre_hook(self._hook_rss_memory_pre_forward) |
|
module.register_forward_hook(self._hook_rss_memory_post_forward) |
|
self.reset_memory_hooks_state() |
|
|
|
def reset_memory_hooks_state(self): |
|
for module in self.modules(): |
|
module.mem_rss_diff = 0 |
|
module.mem_rss_post_forward = 0 |
|
module.mem_rss_pre_forward = 0 |
|
|
|
@property |
|
def device(self) -> device: |
|
""" |
|
Get torch.device from module, assuming that the whole module has one device. |
|
""" |
|
try: |
|
return next(self.parameters()).device |
|
except StopIteration: |
|
|
|
|
|
def find_tensor_attributes(module: nn.Module) -> List[Tuple[str, Tensor]]: |
|
tuples = [(k, v) for k, v in module.__dict__.items() if torch.is_tensor(v)] |
|
return tuples |
|
|
|
gen = self._named_members(get_members_fn=find_tensor_attributes) |
|
first_tuple = next(gen) |
|
return first_tuple[1].device |
|
|
|
@property |
|
def dtype(self) -> dtype: |
|
""" |
|
Get torch.dtype from module, assuming that the whole module has one dtype. |
|
""" |
|
try: |
|
return next(self.parameters()).dtype |
|
except StopIteration: |
|
|
|
|
|
def find_tensor_attributes(module: nn.Module) -> List[Tuple[str, Tensor]]: |
|
tuples = [(k, v) for k, v in module.__dict__.items() if torch.is_tensor(v)] |
|
return tuples |
|
|
|
gen = self._named_members(get_members_fn=find_tensor_attributes) |
|
first_tuple = next(gen) |
|
return first_tuple[1].dtype |
|
|
|
def invert_attention_mask(self, encoder_attention_mask: Tensor) -> Tensor: |
|
"""type: torch.Tensor -> torch.Tensor""" |
|
if encoder_attention_mask.dim() == 3: |
|
encoder_extended_attention_mask = encoder_attention_mask[:, None, :, :] |
|
if encoder_attention_mask.dim() == 2: |
|
encoder_extended_attention_mask = encoder_attention_mask[:, None, None, :] |
|
|
|
|
|
|
|
|
|
|
|
encoder_extended_attention_mask = encoder_extended_attention_mask.to(dtype=self.dtype) |
|
|
|
if self.dtype == torch.float16: |
|
encoder_extended_attention_mask = (1.0 - encoder_extended_attention_mask) * -1e4 |
|
elif self.dtype == torch.float32: |
|
encoder_extended_attention_mask = (1.0 - encoder_extended_attention_mask) * -1e9 |
|
else: |
|
raise ValueError( |
|
"{} not recognized. `dtype` should be set to either `torch.float32` or `torch.float16`".format( |
|
self.dtype |
|
) |
|
) |
|
|
|
return encoder_extended_attention_mask |
|
|
|
def get_extended_attention_mask(self, attention_mask: Tensor, input_shape: Tuple, device: device) -> Tensor: |
|
"""Makes broadcastable attention mask and causal mask so that future and maked tokens are ignored. |
|
|
|
Arguments: |
|
attention_mask: torch.Tensor with 1 indicating tokens to ATTEND to |
|
input_shape: tuple, shape of input_ids |
|
device: torch.Device, usually self.device |
|
|
|
Returns: |
|
torch.Tensor with dtype of attention_mask.dtype |
|
""" |
|
|
|
|
|
if attention_mask.dim() == 3: |
|
extended_attention_mask = attention_mask[:, None, :, :] |
|
elif attention_mask.dim() == 2: |
|
|
|
|
|
|
|
if self.config.is_decoder: |
|
batch_size, seq_length = input_shape |
|
seq_ids = torch.arange(seq_length, device=device) |
|
causal_mask = seq_ids[None, None, :].repeat(batch_size, seq_length, 1) <= seq_ids[None, :, None] |
|
|
|
causal_mask = causal_mask.to(attention_mask.dtype) |
|
extended_attention_mask = causal_mask[:, None, :, :] * attention_mask[:, None, None, :] |
|
else: |
|
extended_attention_mask = attention_mask[:, None, None, :] |
|
else: |
|
raise ValueError( |
|
"Wrong shape for input_ids (shape {}) or attention_mask (shape {})".format( |
|
input_shape, attention_mask.shape |
|
) |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
extended_attention_mask = extended_attention_mask.to(dtype=self.dtype) |
|
extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0 |
|
return extended_attention_mask |
|
|
|
def get_head_mask(self, head_mask: Tensor, num_hidden_layers: int, is_attention_chunked: bool = False) -> Tensor: |
|
""" |
|
# Prepare head mask if needed |
|
# 1.0 in head_mask indicate we keep the head |
|
attention_probs has shape bsz x n_heads x N x N |
|
Arguments: |
|
head_mask: torch.Tensor or None: has shape [num_heads] or [num_hidden_layers x num_heads] |
|
num_hidden_layers: int |
|
Returns: |
|
Tensor of shape shape [num_hidden_layers x batch x num_heads x seq_length x seq_length] |
|
or list with [None] for each layer |
|
""" |
|
if head_mask is not None: |
|
head_mask = self._convert_head_mask_to_5d(head_mask, num_hidden_layers) |
|
if is_attention_chunked is True: |
|
head_mask = head_mask.unsqueeze(-1) |
|
else: |
|
head_mask = [None] * num_hidden_layers |
|
|
|
return head_mask |
|
|
|
def _convert_head_mask_to_5d(self, head_mask, num_hidden_layers): |
|
"""-> [num_hidden_layers x batch x num_heads x seq_length x seq_length]""" |
|
if head_mask.dim() == 1: |
|
head_mask = head_mask.unsqueeze(0).unsqueeze(0).unsqueeze(-1).unsqueeze(-1) |
|
head_mask = head_mask.expand(num_hidden_layers, -1, -1, -1, -1) |
|
elif head_mask.dim() == 2: |
|
head_mask = head_mask.unsqueeze(1).unsqueeze(-1).unsqueeze(-1) |
|
assert head_mask.dim() == 5, f"head_mask.dim != 5, instead {head_mask.dim()}" |
|
head_mask = head_mask.to(dtype=self.dtype) |
|
return head_mask |
|
|
|
|
|
class PreTrainedModel(nn.Module, ModuleUtilsMixin, GenerationMixin): |
|
r""" Base class for all models. |
|
|
|
:class:`~transformers.PreTrainedModel` takes care of storing the configuration of the models and handles methods for loading/downloading/saving models |
|
as well as a few methods common to all models to (i) resize the input embeddings and (ii) prune heads in the self-attention heads. |
|
|
|
Class attributes (overridden by derived classes): |
|
- ``config_class``: a class derived from :class:`~transformers.PretrainedConfig` to use as configuration class for this model architecture. |
|
- ``load_tf_weights``: a python ``method`` for loading a TensorFlow checkpoint in a PyTorch model, taking as arguments: |
|
|
|
- ``model``: an instance of the relevant subclass of :class:`~transformers.PreTrainedModel`, |
|
- ``config``: an instance of the relevant subclass of :class:`~transformers.PretrainedConfig`, |
|
- ``path``: a path (string) to the TensorFlow checkpoint. |
|
|
|
- ``base_model_prefix``: a string indicating the attribute associated to the base model in derived classes of the same architecture adding modules on top of the base model. |
|
""" |
|
config_class = None |
|
base_model_prefix = "" |
|
|
|
@property |
|
def dummy_inputs(self): |
|
""" Dummy inputs to do a forward pass in the network. |
|
|
|
Returns: |
|
torch.Tensor with dummy inputs |
|
""" |
|
return {"input_ids": torch.tensor(DUMMY_INPUTS)} |
|
|
|
def __init__(self, config, *inputs, **kwargs): |
|
super().__init__() |
|
if not isinstance(config, PretrainedConfig): |
|
raise ValueError( |
|
"Parameter config in `{}(config)` should be an instance of class `PretrainedConfig`. " |
|
"To create a model from a pretrained model use " |
|
"`model = {}.from_pretrained(PRETRAINED_MODEL_NAME)`".format( |
|
self.__class__.__name__, self.__class__.__name__ |
|
) |
|
) |
|
|
|
self.config = config |
|
|
|
@property |
|
def base_model(self): |
|
return getattr(self, self.base_model_prefix, self) |
|
|
|
def get_input_embeddings(self): |
|
""" |
|
Returns the model's input embeddings. |
|
|
|
Returns: |
|
:obj:`nn.Module`: |
|
A torch module mapping vocabulary to hidden states. |
|
""" |
|
base_model = getattr(self, self.base_model_prefix, self) |
|
if base_model is not self: |
|
return base_model.get_input_embeddings() |
|
else: |
|
raise NotImplementedError |
|
|
|
def set_input_embeddings(self, value: nn.Module): |
|
""" |
|
Set model's input embeddings |
|
|
|
Args: |
|
value (:obj:`nn.Module`): |
|
A module mapping vocabulary to hidden states. |
|
""" |
|
base_model = getattr(self, self.base_model_prefix, self) |
|
if base_model is not self: |
|
base_model.set_input_embeddings(value) |
|
else: |
|
raise NotImplementedError |
|
|
|
def get_output_embeddings(self): |
|
""" |
|
Returns the model's output embeddings. |
|
|
|
Returns: |
|
:obj:`nn.Module`: |
|
A torch module mapping hidden states to vocabulary. |
|
""" |
|
return None |
|
|
|
def tie_weights(self): |
|
""" |
|
Tie the weights between the input embeddings and the output embeddings. |
|
If the `torchscript` flag is set in the configuration, can't handle parameter sharing so we are cloning |
|
the weights instead. |
|
""" |
|
output_embeddings = self.get_output_embeddings() |
|
if output_embeddings is not None: |
|
self._tie_or_clone_weights(output_embeddings, self.get_input_embeddings()) |
|
|
|
def _tie_or_clone_weights(self, output_embeddings, input_embeddings): |
|
""" Tie or clone module weights depending of whether we are using TorchScript or not |
|
""" |
|
if self.config.torchscript: |
|
output_embeddings.weight = nn.Parameter(input_embeddings.weight.clone()) |
|
else: |
|
output_embeddings.weight = input_embeddings.weight |
|
|
|
if getattr(output_embeddings, "bias", None) is not None: |
|
output_embeddings.bias.data = torch.nn.functional.pad( |
|
output_embeddings.bias.data, |
|
(0, output_embeddings.weight.shape[0] - output_embeddings.bias.shape[0],), |
|
"constant", |
|
0, |
|
) |
|
if hasattr(output_embeddings, "out_features") and hasattr(input_embeddings, "num_embeddings"): |
|
output_embeddings.out_features = input_embeddings.num_embeddings |
|
|
|
def resize_token_embeddings(self, new_num_tokens: Optional[int] = None): |
|
""" Resize input token embeddings matrix of the model if new_num_tokens != config.vocab_size. |
|
Take care of tying weights embeddings afterwards if the model class has a `tie_weights()` method. |
|
|
|
Arguments: |
|
|
|
new_num_tokens: (`optional`) int: |
|
New number of tokens in the embedding matrix. Increasing the size will add newly initialized vectors at the end. Reducing the size will remove vectors from the end. |
|
If not provided or None: does nothing and just returns a pointer to the input tokens ``torch.nn.Embeddings`` Module of the model. |
|
|
|
Return: ``torch.nn.Embeddings`` |
|
Pointer to the input tokens Embeddings Module of the model |
|
""" |
|
base_model = getattr(self, self.base_model_prefix, self) |
|
model_embeds = base_model._resize_token_embeddings(new_num_tokens) |
|
if new_num_tokens is None: |
|
return model_embeds |
|
|
|
|
|
self.config.vocab_size = new_num_tokens |
|
base_model.vocab_size = new_num_tokens |
|
|
|
|
|
self.tie_weights() |
|
|
|
return model_embeds |
|
|
|
def _resize_token_embeddings(self, new_num_tokens): |
|
old_embeddings = self.get_input_embeddings() |
|
new_embeddings = self._get_resized_embeddings(old_embeddings, new_num_tokens) |
|
self.set_input_embeddings(new_embeddings) |
|
return self.get_input_embeddings() |
|
|
|
def _get_resized_embeddings( |
|
self, old_embeddings: torch.nn.Embedding, new_num_tokens: Optional[int] = None |
|
) -> torch.nn.Embedding: |
|
""" Build a resized Embedding Module from a provided token Embedding Module. |
|
Increasing the size will add newly initialized vectors at the end |
|
Reducing the size will remove vectors from the end |
|
|
|
Args: |
|
old_embeddings: ``torch.nn.Embedding`` |
|
Old embeddings to be resized. |
|
new_num_tokens: (`optional`) int |
|
New number of tokens in the embedding matrix. |
|
Increasing the size will add newly initialized vectors at the end |
|
Reducing the size will remove vectors from the end |
|
If not provided or None: return the provided token Embedding Module. |
|
Return: ``torch.nn.Embedding`` |
|
Pointer to the resized Embedding Module or the old Embedding Module if new_num_tokens is None |
|
""" |
|
if new_num_tokens is None: |
|
return old_embeddings |
|
|
|
old_num_tokens, old_embedding_dim = old_embeddings.weight.size() |
|
if old_num_tokens == new_num_tokens: |
|
return old_embeddings |
|
|
|
|
|
new_embeddings = nn.Embedding(new_num_tokens, old_embedding_dim) |
|
new_embeddings.to(old_embeddings.weight.device) |
|
|
|
|
|
self._init_weights(new_embeddings) |
|
|
|
|
|
num_tokens_to_copy = min(old_num_tokens, new_num_tokens) |
|
new_embeddings.weight.data[:num_tokens_to_copy, :] = old_embeddings.weight.data[:num_tokens_to_copy, :] |
|
|
|
return new_embeddings |
|
|
|
def init_weights(self): |
|
""" Initialize and prunes weights if needed. """ |
|
|
|
self.apply(self._init_weights) |
|
|
|
|
|
if self.config.pruned_heads: |
|
self.prune_heads(self.config.pruned_heads) |
|
|
|
|
|
self.tie_weights() |
|
|
|
def prune_heads(self, heads_to_prune: Dict): |
|
""" Prunes heads of the base model. |
|
|
|
Arguments: |
|
|
|
heads_to_prune: dict with keys being selected layer indices (`int`) and associated values being the list of heads to prune in said layer (list of `int`). |
|
E.g. {1: [0, 2], 2: [2, 3]} will prune heads 0 and 2 on layer 1 and heads 2 and 3 on layer 2. |
|
""" |
|
|
|
for layer, heads in heads_to_prune.items(): |
|
union_heads = set(self.config.pruned_heads.get(layer, [])) | set(heads) |
|
self.config.pruned_heads[layer] = list(union_heads) |
|
|
|
self.base_model._prune_heads(heads_to_prune) |
|
|
|
def save_pretrained(self, save_directory): |
|
""" Save a model and its configuration file to a directory, so that it |
|
can be re-loaded using the `:func:`~transformers.PreTrainedModel.from_pretrained`` class method. |
|
|
|
Arguments: |
|
save_directory: directory to which to save. |
|
""" |
|
if os.path.isfile(save_directory): |
|
logger.error("Provided path ({}) should be a directory, not a file".format(save_directory)) |
|
return |
|
os.makedirs(save_directory, exist_ok=True) |
|
|
|
|
|
model_to_save = self.module if hasattr(self, "module") else self |
|
|
|
|
|
model_to_save.config.architectures = [model_to_save.__class__.__name__] |
|
|
|
|
|
output_model_file = os.path.join(save_directory, WEIGHTS_NAME) |
|
|
|
if getattr(self.config, "xla_device", False): |
|
import torch_xla.core.xla_model as xm |
|
|
|
if xm.is_master_ordinal(): |
|
|
|
model_to_save.config.save_pretrained(save_directory) |
|
|
|
xm.save(model_to_save.state_dict(), output_model_file) |
|
else: |
|
model_to_save.config.save_pretrained(save_directory) |
|
torch.save(model_to_save.state_dict(), output_model_file) |
|
|
|
logger.info("Model weights saved in {}".format(output_model_file)) |
|
|
|
@classmethod |
|
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs): |
|
r"""Instantiate a pretrained pytorch model from a pre-trained model configuration. |
|
|
|
The model is set in evaluation mode by default using ``model.eval()`` (Dropout modules are deactivated) |
|
To train the model, you should first set it back in training mode with ``model.train()`` |
|
|
|
The warning ``Weights from XXX not initialized from pretrained model`` means that the weights of XXX do not come pre-trained with the rest of the model. |
|
It is up to you to train those weights with a downstream fine-tuning task. |
|
|
|
The warning ``Weights from XXX not used in YYY`` means that the layer XXX is not used by YYY, therefore those weights are discarded. |
|
|
|
Parameters: |
|
pretrained_model_name_or_path: either: |
|
- a string with the `shortcut name` of a pre-trained model to load from cache or download, e.g.: ``bert-base-uncased``. |
|
- a string with the `identifier name` of a pre-trained model that was user-uploaded to our S3, e.g.: ``dbmdz/bert-base-german-cased``. |
|
- a path to a `directory` containing model weights saved using :func:`~transformers.PreTrainedModel.save_pretrained`, e.g.: ``./my_model_directory/``. |
|
- a path or url to a `tensorflow index checkpoint file` (e.g. `./tf_model/model.ckpt.index`). In this case, ``from_tf`` should be set to True and a configuration object should be provided as ``config`` argument. This loading path is slower than converting the TensorFlow checkpoint in a PyTorch model using the provided conversion scripts and loading the PyTorch model afterwards. |
|
- None if you are both providing the configuration and state dictionary (resp. with keyword arguments ``config`` and ``state_dict``) |
|
|
|
model_args: (`optional`) Sequence of positional arguments: |
|
All remaning positional arguments will be passed to the underlying model's ``__init__`` method |
|
|
|
config: (`optional`) one of: |
|
- an instance of a class derived from :class:`~transformers.PretrainedConfig`, or |
|
- a string valid as input to :func:`~transformers.PretrainedConfig.from_pretrained()` |
|
|
|
Configuration for the model to use instead of an automatically loaded configuation. Configuration can be automatically loaded when: |
|
- the model is a model provided by the library (loaded with the ``shortcut-name`` string of a pretrained model), or |
|
- the model was saved using :func:`~transformers.PreTrainedModel.save_pretrained` and is reloaded by suppling the save directory. |
|
- the model is loaded by suppling a local directory as ``pretrained_model_name_or_path`` and a configuration JSON file named `config.json` is found in the directory. |
|
|
|
state_dict: (`optional`) dict: |
|
an optional state dictionnary for the model to use instead of a state dictionary loaded from saved weights file. |
|
This option can be used if you want to create a model from a pretrained configuration but load your own weights. |
|
In this case though, you should check if using :func:`~transformers.PreTrainedModel.save_pretrained` and :func:`~transformers.PreTrainedModel.from_pretrained` is not a simpler option. |
|
|
|
cache_dir: (`optional`) string: |
|
Path to a directory in which a downloaded pre-trained model |
|
configuration should be cached if the standard cache should not be used. |
|
|
|
force_download: (`optional`) boolean, default False: |
|
Force to (re-)download the model weights and configuration files and override the cached versions if they exists. |
|
|
|
resume_download: (`optional`) boolean, default False: |
|
Do not delete incompletely recieved file. Attempt to resume the download if such a file exists. |
|
|
|
proxies: (`optional`) dict, default None: |
|
A dictionary of proxy servers to use by protocol or endpoint, e.g.: {'http': 'foo.bar:3128', 'http://hostname': 'foo.bar:4012'}. |
|
The proxies are used on each request. |
|
|
|
output_loading_info: (`optional`) boolean: |
|
Set to ``True`` to also return a dictionnary containing missing keys, unexpected keys and error messages. |
|
|
|
kwargs: (`optional`) Remaining dictionary of keyword arguments: |
|
Can be used to update the configuration object (after it being loaded) and initiate the model. (e.g. ``output_attention=True``). Behave differently depending on whether a `config` is provided or automatically loaded: |
|
|
|
- If a configuration is provided with ``config``, ``**kwargs`` will be directly passed to the underlying model's ``__init__`` method (we assume all relevant updates to the configuration have already been done) |
|
- If a configuration is not provided, ``kwargs`` will be first passed to the configuration class initialization function (:func:`~transformers.PretrainedConfig.from_pretrained`). Each key of ``kwargs`` that corresponds to a configuration attribute will be used to override said attribute with the supplied ``kwargs`` value. Remaining keys that do not correspond to any configuration attribute will be passed to the underlying model's ``__init__`` function. |
|
|
|
Examples:: |
|
|
|
# For example purposes. Not runnable. |
|
model = BertModel.from_pretrained('bert-base-uncased') # Download model and configuration from S3 and cache. |
|
model = BertModel.from_pretrained('./test/saved_model/') # E.g. model was saved using `save_pretrained('./test/saved_model/')` |
|
model = BertModel.from_pretrained('bert-base-uncased', output_attention=True) # Update configuration during loading |
|
assert model.config.output_attention == True |
|
# Loading from a TF checkpoint file instead of a PyTorch model (slower) |
|
config = BertConfig.from_json_file('./tf_model/my_tf_model_config.json') |
|
model = BertModel.from_pretrained('./tf_model/my_tf_checkpoint.ckpt.index', from_tf=True, config=config) |
|
|
|
""" |
|
config = kwargs.pop("config", None) |
|
state_dict = kwargs.pop("state_dict", None) |
|
cache_dir = kwargs.pop("cache_dir", None) |
|
from_tf = kwargs.pop("from_tf", False) |
|
force_download = kwargs.pop("force_download", False) |
|
resume_download = kwargs.pop("resume_download", False) |
|
proxies = kwargs.pop("proxies", None) |
|
output_loading_info = kwargs.pop("output_loading_info", False) |
|
local_files_only = kwargs.pop("local_files_only", False) |
|
use_cdn = kwargs.pop("use_cdn", True) |
|
|
|
|
|
if not isinstance(config, PretrainedConfig): |
|
config_path = config if config is not None else pretrained_model_name_or_path |
|
config, model_kwargs = cls.config_class.from_pretrained( |
|
config_path, |
|
*model_args, |
|
cache_dir=cache_dir, |
|
return_unused_kwargs=True, |
|
force_download=force_download, |
|
resume_download=resume_download, |
|
proxies=proxies, |
|
local_files_only=local_files_only, |
|
**kwargs, |
|
) |
|
else: |
|
model_kwargs = kwargs |
|
|
|
|
|
if pretrained_model_name_or_path is not None: |
|
if os.path.isdir(pretrained_model_name_or_path): |
|
if from_tf and os.path.isfile(os.path.join(pretrained_model_name_or_path, TF_WEIGHTS_NAME + ".index")): |
|
|
|
archive_file = os.path.join(pretrained_model_name_or_path, TF_WEIGHTS_NAME + ".index") |
|
elif from_tf and os.path.isfile(os.path.join(pretrained_model_name_or_path, TF2_WEIGHTS_NAME)): |
|
|
|
archive_file = os.path.join(pretrained_model_name_or_path, TF2_WEIGHTS_NAME) |
|
elif os.path.isfile(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME)): |
|
|
|
archive_file = os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME) |
|
else: |
|
raise EnvironmentError( |
|
"Error no file named {} found in directory {} or `from_tf` set to False".format( |
|
[WEIGHTS_NAME, TF2_WEIGHTS_NAME, TF_WEIGHTS_NAME + ".index"], |
|
pretrained_model_name_or_path, |
|
) |
|
) |
|
elif os.path.isfile(pretrained_model_name_or_path) or is_remote_url(pretrained_model_name_or_path): |
|
archive_file = pretrained_model_name_or_path |
|
elif os.path.isfile(pretrained_model_name_or_path + ".index"): |
|
assert ( |
|
from_tf |
|
), "We found a TensorFlow checkpoint at {}, please set from_tf to True to load from this checkpoint".format( |
|
pretrained_model_name_or_path + ".index" |
|
) |
|
archive_file = pretrained_model_name_or_path + ".index" |
|
else: |
|
archive_file = hf_bucket_url( |
|
pretrained_model_name_or_path, |
|
filename=(TF2_WEIGHTS_NAME if from_tf else WEIGHTS_NAME), |
|
use_cdn=use_cdn, |
|
) |
|
|
|
try: |
|
|
|
resolved_archive_file = cached_path( |
|
archive_file, |
|
cache_dir=cache_dir, |
|
force_download=force_download, |
|
proxies=proxies, |
|
resume_download=resume_download, |
|
local_files_only=local_files_only, |
|
) |
|
if resolved_archive_file is None: |
|
raise EnvironmentError |
|
except EnvironmentError: |
|
msg = ( |
|
f"Can't load weights for '{pretrained_model_name_or_path}'. Make sure that:\n\n" |
|
f"- '{pretrained_model_name_or_path}' is a correct model identifier listed on 'https://huggingface.co/models'\n\n" |
|
f"- or '{pretrained_model_name_or_path}' is the correct path to a directory containing a file named one of {WEIGHTS_NAME}, {TF2_WEIGHTS_NAME}, {TF_WEIGHTS_NAME}.\n\n" |
|
) |
|
raise EnvironmentError(msg) |
|
|
|
if resolved_archive_file == archive_file: |
|
logger.info("loading weights file {}".format(archive_file)) |
|
else: |
|
logger.info("loading weights file {} from cache at {}".format(archive_file, resolved_archive_file)) |
|
else: |
|
resolved_archive_file = None |
|
|
|
|
|
model = cls(config, *model_args, **model_kwargs) |
|
|
|
if state_dict is None and not from_tf: |
|
try: |
|
state_dict = torch.load(resolved_archive_file, map_location="cpu") |
|
except Exception: |
|
raise OSError( |
|
"Unable to load weights from pytorch checkpoint file. " |
|
"If you tried to load a PyTorch model from a TF 2.0 checkpoint, please set from_tf=True. " |
|
) |
|
|
|
missing_keys = [] |
|
unexpected_keys = [] |
|
error_msgs = [] |
|
|
|
if from_tf: |
|
if resolved_archive_file.endswith(".index"): |
|
|
|
model = cls.load_tf_weights(model, config, resolved_archive_file[:-6]) |
|
else: |
|
|
|
try: |
|
from transformers import load_tf2_checkpoint_in_pytorch_model |
|
|
|
model = load_tf2_checkpoint_in_pytorch_model(model, resolved_archive_file, allow_missing_keys=True) |
|
except ImportError: |
|
logger.error( |
|
"Loading a TensorFlow model in PyTorch, requires both PyTorch and TensorFlow to be installed. Please see " |
|
"https://pytorch.org/ and https://www.tensorflow.org/install/ for installation instructions." |
|
) |
|
raise |
|
else: |
|
|
|
old_keys = [] |
|
new_keys = [] |
|
for key in state_dict.keys(): |
|
new_key = None |
|
if "gamma" in key: |
|
new_key = key.replace("gamma", "weight") |
|
if "beta" in key: |
|
new_key = key.replace("beta", "bias") |
|
if new_key: |
|
old_keys.append(key) |
|
new_keys.append(new_key) |
|
for old_key, new_key in zip(old_keys, new_keys): |
|
state_dict[new_key] = state_dict.pop(old_key) |
|
|
|
|
|
metadata = getattr(state_dict, "_metadata", None) |
|
state_dict = state_dict.copy() |
|
if metadata is not None: |
|
state_dict._metadata = metadata |
|
|
|
|
|
|
|
''' |
|
for key, _ in state_dict.items(): |
|
print(key) |
|
''' |
|
|
|
|
|
|
|
|
|
|
|
def load(module: nn.Module, prefix=""): |
|
local_metadata = {} if metadata is None else metadata.get(prefix[:-1], {}) |
|
module._load_from_state_dict( |
|
state_dict, prefix, local_metadata, True, missing_keys, unexpected_keys, error_msgs, |
|
) |
|
for name, child in module._modules.items(): |
|
if child is not None: |
|
load(child, prefix + name + ".") |
|
|
|
|
|
start_prefix = "" |
|
model_to_load = model |
|
has_prefix_module = any(s.startswith(cls.base_model_prefix) for s in state_dict.keys()) |
|
if not hasattr(model, cls.base_model_prefix) and has_prefix_module: |
|
start_prefix = cls.base_model_prefix + "." |
|
if hasattr(model, cls.base_model_prefix) and not has_prefix_module: |
|
model_to_load = getattr(model, cls.base_model_prefix) |
|
|
|
load(model_to_load, prefix=start_prefix) |
|
|
|
if model.__class__.__name__ != model_to_load.__class__.__name__: |
|
base_model_state_dict = model_to_load.state_dict().keys() |
|
head_model_state_dict_without_base_prefix = [ |
|
key.split(cls.base_model_prefix + ".")[-1] for key in model.state_dict().keys() |
|
] |
|
|
|
missing_keys.extend(head_model_state_dict_without_base_prefix - base_model_state_dict) |
|
|
|
if len(unexpected_keys) > 0: |
|
logger.warning( |
|
f"Some weights of the model checkpoint at {pretrained_model_name_or_path} were not used when " |
|
f"initializing {model.__class__.__name__}: {unexpected_keys}\n" |
|
f"- This IS expected if you are initializing {model.__class__.__name__} from the checkpoint of a model trained on another task " |
|
f"or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPretraining model).\n" |
|
f"- This IS NOT expected if you are initializing {model.__class__.__name__} from the checkpoint of a model that you expect " |
|
f"to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model)." |
|
) |
|
else: |
|
logger.info(f"All model checkpoint weights were used when initializing {model.__class__.__name__}.\n") |
|
if len(missing_keys) > 0: |
|
logger.warning( |
|
f"Some weights of {model.__class__.__name__} were not initialized from the model checkpoint at {pretrained_model_name_or_path} " |
|
f"and are newly initialized: {missing_keys}\n" |
|
f"You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference." |
|
) |
|
else: |
|
logger.info( |
|
f"All the weights of {model.__class__.__name__} were initialized from the model checkpoint at {pretrained_model_name_or_path}.\n" |
|
f"If your task is similar to the task the model of the ckeckpoint was trained on, " |
|
f"you can already use {model.__class__.__name__} for predictions without further training." |
|
) |
|
if len(error_msgs) > 0: |
|
raise RuntimeError( |
|
"Error(s) in loading state_dict for {}:\n\t{}".format( |
|
model.__class__.__name__, "\n\t".join(error_msgs) |
|
) |
|
) |
|
model.tie_weights() |
|
|
|
|
|
model.eval() |
|
|
|
if output_loading_info: |
|
loading_info = { |
|
"missing_keys": missing_keys, |
|
"unexpected_keys": unexpected_keys, |
|
"error_msgs": error_msgs, |
|
} |
|
return model, loading_info |
|
|
|
if hasattr(config, "xla_device") and config.xla_device: |
|
import torch_xla.core.xla_model as xm |
|
|
|
model = xm.send_cpu_data_to_device(model, xm.xla_device()) |
|
model.to(xm.xla_device()) |
|
|
|
return model |
|
|
|
|
|
class Conv1D(nn.Module): |
|
def __init__(self, nf, nx): |
|
""" Conv1D layer as defined by Radford et al. for OpenAI GPT (and also used in GPT-2) |
|
Basically works like a Linear layer but the weights are transposed |
|
""" |
|
super().__init__() |
|
self.nf = nf |
|
w = torch.empty(nx, nf) |
|
nn.init.normal_(w, std=0.02) |
|
self.weight = nn.Parameter(w) |
|
self.bias = nn.Parameter(torch.zeros(nf)) |
|
|
|
def forward(self, x): |
|
size_out = x.size()[:-1] + (self.nf,) |
|
x = torch.addmm(self.bias, x.view(-1, x.size(-1)), self.weight) |
|
x = x.view(*size_out) |
|
return x |
|
|
|
|
|
class PoolerStartLogits(nn.Module): |
|
""" Compute SQuAD start_logits from sequence hidden states. """ |
|
|
|
def __init__(self, config): |
|
super().__init__() |
|
self.dense = nn.Linear(config.hidden_size, 1) |
|
|
|
def forward(self, hidden_states, p_mask=None): |
|
""" Args: |
|
**p_mask**: (`optional`) ``torch.FloatTensor`` of shape `(batch_size, seq_len)` |
|
invalid position mask such as query and special symbols (PAD, SEP, CLS) |
|
1.0 means token should be masked. |
|
""" |
|
x = self.dense(hidden_states).squeeze(-1) |
|
|
|
if p_mask is not None: |
|
if next(self.parameters()).dtype == torch.float16: |
|
x = x * (1 - p_mask) - 65500 * p_mask |
|
else: |
|
x = x * (1 - p_mask) - 1e30 * p_mask |
|
|
|
return x |
|
|
|
|
|
class PoolerEndLogits(nn.Module): |
|
""" Compute SQuAD end_logits from sequence hidden states and start token hidden state. |
|
""" |
|
|
|
def __init__(self, config): |
|
super().__init__() |
|
self.dense_0 = nn.Linear(config.hidden_size * 2, config.hidden_size) |
|
self.activation = nn.Tanh() |
|
self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) |
|
self.dense_1 = nn.Linear(config.hidden_size, 1) |
|
|
|
def forward(self, hidden_states, start_states=None, start_positions=None, p_mask=None): |
|
""" Args: |
|
One of ``start_states``, ``start_positions`` should be not None. |
|
If both are set, ``start_positions`` overrides ``start_states``. |
|
|
|
**start_states**: ``torch.LongTensor`` of shape identical to hidden_states |
|
hidden states of the first tokens for the labeled span. |
|
**start_positions**: ``torch.LongTensor`` of shape ``(batch_size,)`` |
|
position of the first token for the labeled span: |
|
**p_mask**: (`optional`) ``torch.FloatTensor`` of shape ``(batch_size, seq_len)`` |
|
Mask of invalid position such as query and special symbols (PAD, SEP, CLS) |
|
1.0 means token should be masked. |
|
""" |
|
assert ( |
|
start_states is not None or start_positions is not None |
|
), "One of start_states, start_positions should be not None" |
|
if start_positions is not None: |
|
slen, hsz = hidden_states.shape[-2:] |
|
start_positions = start_positions[:, None, None].expand(-1, -1, hsz) |
|
start_states = hidden_states.gather(-2, start_positions) |
|
start_states = start_states.expand(-1, slen, -1) |
|
|
|
x = self.dense_0(torch.cat([hidden_states, start_states], dim=-1)) |
|
x = self.activation(x) |
|
x = self.LayerNorm(x) |
|
x = self.dense_1(x).squeeze(-1) |
|
|
|
if p_mask is not None: |
|
if next(self.parameters()).dtype == torch.float16: |
|
x = x * (1 - p_mask) - 65500 * p_mask |
|
else: |
|
x = x * (1 - p_mask) - 1e30 * p_mask |
|
|
|
return x |
|
|
|
|
|
class PoolerAnswerClass(nn.Module): |
|
""" Compute SQuAD 2.0 answer class from classification and start tokens hidden states. """ |
|
|
|
def __init__(self, config): |
|
super().__init__() |
|
self.dense_0 = nn.Linear(config.hidden_size * 2, config.hidden_size) |
|
self.activation = nn.Tanh() |
|
self.dense_1 = nn.Linear(config.hidden_size, 1, bias=False) |
|
|
|
def forward(self, hidden_states, start_states=None, start_positions=None, cls_index=None): |
|
""" |
|
Args: |
|
One of ``start_states``, ``start_positions`` should be not None. |
|
If both are set, ``start_positions`` overrides ``start_states``. |
|
|
|
**start_states**: ``torch.LongTensor`` of shape identical to ``hidden_states``. |
|
hidden states of the first tokens for the labeled span. |
|
**start_positions**: ``torch.LongTensor`` of shape ``(batch_size,)`` |
|
position of the first token for the labeled span. |
|
**cls_index**: torch.LongTensor of shape ``(batch_size,)`` |
|
position of the CLS token. If None, take the last token. |
|
|
|
note(Original repo): |
|
no dependency on end_feature so that we can obtain one single `cls_logits` |
|
for each sample |
|
""" |
|
hsz = hidden_states.shape[-1] |
|
assert ( |
|
start_states is not None or start_positions is not None |
|
), "One of start_states, start_positions should be not None" |
|
if start_positions is not None: |
|
start_positions = start_positions[:, None, None].expand(-1, -1, hsz) |
|
start_states = hidden_states.gather(-2, start_positions).squeeze(-2) |
|
|
|
if cls_index is not None: |
|
cls_index = cls_index[:, None, None].expand(-1, -1, hsz) |
|
cls_token_state = hidden_states.gather(-2, cls_index).squeeze(-2) |
|
else: |
|
cls_token_state = hidden_states[:, -1, :] |
|
|
|
x = self.dense_0(torch.cat([start_states, cls_token_state], dim=-1)) |
|
x = self.activation(x) |
|
x = self.dense_1(x).squeeze(-1) |
|
|
|
return x |
|
|
|
|
|
class SQuADHead(nn.Module): |
|
r""" A SQuAD head inspired by XLNet. |
|
|
|
Parameters: |
|
config (:class:`~transformers.XLNetConfig`): Model configuration class with all the parameters of the model. |
|
|
|
Inputs: |
|
**hidden_states**: ``torch.FloatTensor`` of shape ``(batch_size, seq_len, hidden_size)`` |
|
hidden states of sequence tokens |
|
**start_positions**: ``torch.LongTensor`` of shape ``(batch_size,)`` |
|
position of the first token for the labeled span. |
|
**end_positions**: ``torch.LongTensor`` of shape ``(batch_size,)`` |
|
position of the last token for the labeled span. |
|
**cls_index**: torch.LongTensor of shape ``(batch_size,)`` |
|
position of the CLS token. If None, take the last token. |
|
**is_impossible**: ``torch.LongTensor`` of shape ``(batch_size,)`` |
|
Whether the question has a possible answer in the paragraph or not. |
|
**p_mask**: (`optional`) ``torch.FloatTensor`` of shape ``(batch_size, seq_len)`` |
|
Mask of invalid position such as query and special symbols (PAD, SEP, CLS) |
|
1.0 means token should be masked. |
|
|
|
Outputs: `Tuple` comprising various elements depending on the configuration (config) and inputs: |
|
**loss**: (`optional`, returned if both ``start_positions`` and ``end_positions`` are provided) ``torch.FloatTensor`` of shape ``(1,)``: |
|
Classification loss as the sum of start token, end token (and is_impossible if provided) classification losses. |
|
**start_top_log_probs**: (`optional`, returned if ``start_positions`` or ``end_positions`` is not provided) |
|
``torch.FloatTensor`` of shape ``(batch_size, config.start_n_top)`` |
|
Log probabilities for the top config.start_n_top start token possibilities (beam-search). |
|
**start_top_index**: (`optional`, returned if ``start_positions`` or ``end_positions`` is not provided) |
|
``torch.LongTensor`` of shape ``(batch_size, config.start_n_top)`` |
|
Indices for the top config.start_n_top start token possibilities (beam-search). |
|
**end_top_log_probs**: (`optional`, returned if ``start_positions`` or ``end_positions`` is not provided) |
|
``torch.FloatTensor`` of shape ``(batch_size, config.start_n_top * config.end_n_top)`` |
|
Log probabilities for the top ``config.start_n_top * config.end_n_top`` end token possibilities (beam-search). |
|
**end_top_index**: (`optional`, returned if ``start_positions`` or ``end_positions`` is not provided) |
|
``torch.LongTensor`` of shape ``(batch_size, config.start_n_top * config.end_n_top)`` |
|
Indices for the top ``config.start_n_top * config.end_n_top`` end token possibilities (beam-search). |
|
**cls_logits**: (`optional`, returned if ``start_positions`` or ``end_positions`` is not provided) |
|
``torch.FloatTensor`` of shape ``(batch_size,)`` |
|
Log probabilities for the ``is_impossible`` label of the answers. |
|
""" |
|
|
|
def __init__(self, config): |
|
super().__init__() |
|
self.start_n_top = config.start_n_top |
|
self.end_n_top = config.end_n_top |
|
|
|
self.start_logits = PoolerStartLogits(config) |
|
self.end_logits = PoolerEndLogits(config) |
|
self.answer_class = PoolerAnswerClass(config) |
|
|
|
def forward( |
|
self, hidden_states, start_positions=None, end_positions=None, cls_index=None, is_impossible=None, p_mask=None, |
|
): |
|
outputs = () |
|
|
|
start_logits = self.start_logits(hidden_states, p_mask=p_mask) |
|
|
|
if start_positions is not None and end_positions is not None: |
|
|
|
for x in (start_positions, end_positions, cls_index, is_impossible): |
|
if x is not None and x.dim() > 1: |
|
x.squeeze_(-1) |
|
|
|
|
|
end_logits = self.end_logits(hidden_states, start_positions=start_positions, p_mask=p_mask) |
|
|
|
loss_fct = CrossEntropyLoss() |
|
start_loss = loss_fct(start_logits, start_positions) |
|
end_loss = loss_fct(end_logits, end_positions) |
|
total_loss = (start_loss + end_loss) / 2 |
|
|
|
if cls_index is not None and is_impossible is not None: |
|
|
|
cls_logits = self.answer_class(hidden_states, start_positions=start_positions, cls_index=cls_index) |
|
loss_fct_cls = nn.BCEWithLogitsLoss() |
|
cls_loss = loss_fct_cls(cls_logits, is_impossible) |
|
|
|
|
|
total_loss += cls_loss * 0.5 |
|
|
|
outputs = (total_loss,) + outputs |
|
|
|
else: |
|
|
|
bsz, slen, hsz = hidden_states.size() |
|
start_log_probs = F.softmax(start_logits, dim=-1) |
|
|
|
start_top_log_probs, start_top_index = torch.topk( |
|
start_log_probs, self.start_n_top, dim=-1 |
|
) |
|
start_top_index_exp = start_top_index.unsqueeze(-1).expand(-1, -1, hsz) |
|
start_states = torch.gather(hidden_states, -2, start_top_index_exp) |
|
start_states = start_states.unsqueeze(1).expand(-1, slen, -1, -1) |
|
|
|
hidden_states_expanded = hidden_states.unsqueeze(2).expand_as( |
|
start_states |
|
) |
|
p_mask = p_mask.unsqueeze(-1) if p_mask is not None else None |
|
end_logits = self.end_logits(hidden_states_expanded, start_states=start_states, p_mask=p_mask) |
|
end_log_probs = F.softmax(end_logits, dim=1) |
|
|
|
end_top_log_probs, end_top_index = torch.topk( |
|
end_log_probs, self.end_n_top, dim=1 |
|
) |
|
end_top_log_probs = end_top_log_probs.view(-1, self.start_n_top * self.end_n_top) |
|
end_top_index = end_top_index.view(-1, self.start_n_top * self.end_n_top) |
|
|
|
start_states = torch.einsum("blh,bl->bh", hidden_states, start_log_probs) |
|
cls_logits = self.answer_class(hidden_states, start_states=start_states, cls_index=cls_index) |
|
|
|
outputs = (start_top_log_probs, start_top_index, end_top_log_probs, end_top_index, cls_logits,) + outputs |
|
|
|
|
|
|
|
return outputs |
|
|
|
|
|
class SequenceSummary(nn.Module): |
|
r""" Compute a single vector summary of a sequence hidden states according to various possibilities: |
|
Args of the config class: |
|
summary_type: |
|
- 'last' => [default] take the last token hidden state (like XLNet) |
|
- 'first' => take the first token hidden state (like Bert) |
|
- 'mean' => take the mean of all tokens hidden states |
|
- 'cls_index' => supply a Tensor of classification token position (GPT/GPT-2) |
|
- 'attn' => Not implemented now, use multi-head attention |
|
summary_use_proj: Add a projection after the vector extraction |
|
summary_proj_to_labels: If True, the projection outputs to config.num_labels classes (otherwise to hidden_size). Default: False. |
|
summary_activation: 'tanh' or another string => add an activation to the output, Other => no activation. Default |
|
summary_first_dropout: Add a dropout before the projection and activation |
|
summary_last_dropout: Add a dropout after the projection and activation |
|
""" |
|
|
|
def __init__(self, config: PretrainedConfig): |
|
super().__init__() |
|
|
|
self.summary_type = getattr(config, "summary_type", "last") |
|
if self.summary_type == "attn": |
|
|
|
|
|
|
|
raise NotImplementedError |
|
|
|
self.summary = Identity() |
|
if hasattr(config, "summary_use_proj") and config.summary_use_proj: |
|
if hasattr(config, "summary_proj_to_labels") and config.summary_proj_to_labels and config.num_labels > 0: |
|
num_classes = config.num_labels |
|
else: |
|
num_classes = config.hidden_size |
|
self.summary = nn.Linear(config.hidden_size, num_classes) |
|
|
|
activation_string = getattr(config, "summary_activation", None) |
|
self.activation: Callable = (get_activation(activation_string) if activation_string else Identity()) |
|
|
|
self.first_dropout = Identity() |
|
if hasattr(config, "summary_first_dropout") and config.summary_first_dropout > 0: |
|
self.first_dropout = nn.Dropout(config.summary_first_dropout) |
|
|
|
self.last_dropout = Identity() |
|
if hasattr(config, "summary_last_dropout") and config.summary_last_dropout > 0: |
|
self.last_dropout = nn.Dropout(config.summary_last_dropout) |
|
|
|
def forward(self, hidden_states, cls_index=None): |
|
""" hidden_states: float Tensor in shape [bsz, ..., seq_len, hidden_size], the hidden-states of the last layer. |
|
cls_index: [optional] position of the classification token if summary_type == 'cls_index', |
|
shape (bsz,) or more generally (bsz, ...) where ... are optional leading dimensions of hidden_states. |
|
if summary_type == 'cls_index' and cls_index is None: |
|
we take the last token of the sequence as classification token |
|
""" |
|
if self.summary_type == "last": |
|
output = hidden_states[:, -1] |
|
elif self.summary_type == "first": |
|
output = hidden_states[:, 0] |
|
elif self.summary_type == "mean": |
|
output = hidden_states.mean(dim=1) |
|
elif self.summary_type == "cls_index": |
|
if cls_index is None: |
|
cls_index = torch.full_like(hidden_states[..., :1, :], hidden_states.shape[-2] - 1, dtype=torch.long,) |
|
else: |
|
cls_index = cls_index.unsqueeze(-1).unsqueeze(-1) |
|
cls_index = cls_index.expand((-1,) * (cls_index.dim() - 1) + (hidden_states.size(-1),)) |
|
|
|
output = hidden_states.gather(-2, cls_index).squeeze(-2) |
|
elif self.summary_type == "attn": |
|
raise NotImplementedError |
|
|
|
output = self.first_dropout(output) |
|
output = self.summary(output) |
|
output = self.activation(output) |
|
output = self.last_dropout(output) |
|
|
|
return output |
|
|
|
|
|
def prune_linear_layer(layer, index, dim=0): |
|
""" Prune a linear layer (a model parameters) to keep only entries in index. |
|
Return the pruned layer as a new layer with requires_grad=True. |
|
Used to remove heads. |
|
""" |
|
index = index.to(layer.weight.device) |
|
W = layer.weight.index_select(dim, index).clone().detach() |
|
if layer.bias is not None: |
|
if dim == 1: |
|
b = layer.bias.clone().detach() |
|
else: |
|
b = layer.bias[index].clone().detach() |
|
new_size = list(layer.weight.size()) |
|
new_size[dim] = len(index) |
|
new_layer = nn.Linear(new_size[1], new_size[0], bias=layer.bias is not None).to(layer.weight.device) |
|
new_layer.weight.requires_grad = False |
|
new_layer.weight.copy_(W.contiguous()) |
|
new_layer.weight.requires_grad = True |
|
if layer.bias is not None: |
|
new_layer.bias.requires_grad = False |
|
new_layer.bias.copy_(b.contiguous()) |
|
new_layer.bias.requires_grad = True |
|
return new_layer |
|
|
|
|
|
def prune_conv1d_layer(layer, index, dim=1): |
|
""" Prune a Conv1D layer (a model parameters) to keep only entries in index. |
|
A Conv1D work as a Linear layer (see e.g. BERT) but the weights are transposed. |
|
Return the pruned layer as a new layer with requires_grad=True. |
|
Used to remove heads. |
|
""" |
|
index = index.to(layer.weight.device) |
|
W = layer.weight.index_select(dim, index).clone().detach() |
|
if dim == 0: |
|
b = layer.bias.clone().detach() |
|
else: |
|
b = layer.bias[index].clone().detach() |
|
new_size = list(layer.weight.size()) |
|
new_size[dim] = len(index) |
|
new_layer = Conv1D(new_size[1], new_size[0]).to(layer.weight.device) |
|
new_layer.weight.requires_grad = False |
|
new_layer.weight.copy_(W.contiguous()) |
|
new_layer.weight.requires_grad = True |
|
new_layer.bias.requires_grad = False |
|
new_layer.bias.copy_(b.contiguous()) |
|
new_layer.bias.requires_grad = True |
|
return new_layer |
|
|
|
|
|
def prune_layer(layer, index, dim=None): |
|
""" Prune a Conv1D or nn.Linear layer (a model parameters) to keep only entries in index. |
|
Return the pruned layer as a new layer with requires_grad=True. |
|
Used to remove heads. |
|
""" |
|
if isinstance(layer, nn.Linear): |
|
return prune_linear_layer(layer, index, dim=0 if dim is None else dim) |
|
elif isinstance(layer, Conv1D): |
|
return prune_conv1d_layer(layer, index, dim=1 if dim is None else dim) |
|
else: |
|
raise ValueError("Can't prune layer of class {}".format(layer.__class__)) |
|
|
|
|
|
def apply_chunking_to_forward( |
|
chunk_size: int, chunk_dim: int, forward_fn: Callable[..., torch.Tensor], *input_tensors |
|
) -> torch.Tensor: |
|
""" |
|
This function chunks the `input_tensors` into smaller input tensor parts of size `chunk_size` over the dimension `chunk_dim`. |
|
It then applies a layer `forward_fn` to each chunk independently to save memory. |
|
If the `forward_fn` is independent across the `chunk_dim` this function will yield the |
|
same result as not applying it. |
|
|
|
Args: |
|
chunk_size: int - the chunk size of a chunked tensor. `num_chunks` = `len(input_tensors[0]) / chunk_size` |
|
chunk_dim: int - the dimension over which the input_tensors should be chunked |
|
forward_fn: fn - the forward fn of the model |
|
input_tensors: tuple(torch.Tensor) - the input tensors of `forward_fn` which are chunked |
|
Returns: |
|
a Tensor with the same shape the foward_fn would have given if applied |
|
|
|
|
|
Examples:: |
|
|
|
# rename the usual forward() fn to forward_chunk() |
|
def forward_chunk(self, hidden_states): |
|
hidden_states = self.decoder(hidden_states) |
|
return hidden_states |
|
|
|
# implement a chunked forward function |
|
def forward(self, hidden_states): |
|
return apply_chunking_to_forward(self.chunk_size_lm_head, self.seq_len_dim, self.forward_chunk, hidden_states) |
|
""" |
|
|
|
assert len(input_tensors) > 0, "{} has to be a tuple/list of tensors".format(input_tensors) |
|
tensor_shape = input_tensors[0].shape |
|
assert all( |
|
input_tensor.shape == tensor_shape for input_tensor in input_tensors |
|
), "All input tenors have to be of the same shape" |
|
|
|
|
|
num_args_in_forward_chunk_fn = len(inspect.signature(forward_fn).parameters) |
|
assert num_args_in_forward_chunk_fn == len( |
|
input_tensors |
|
), "forward_chunk_fn expects {} arguments, but only {} input tensors are given".format( |
|
num_args_in_forward_chunk_fn, len(input_tensors) |
|
) |
|
|
|
if chunk_size > 0: |
|
assert ( |
|
input_tensors[0].shape[chunk_dim] % chunk_size == 0 |
|
), "The dimension to be chunked {} has to be a multiple of the chunk size {}".format( |
|
input_tensors[0].shape[chunk_dim], chunk_size |
|
) |
|
|
|
num_chunks = input_tensors[0].shape[chunk_dim] // chunk_size |
|
|
|
|
|
input_tensors_chunks = tuple(input_tensor.chunk(num_chunks, dim=chunk_dim) for input_tensor in input_tensors) |
|
|
|
output_chunks = tuple(forward_fn(*input_tensors_chunk) for input_tensors_chunk in zip(*input_tensors_chunks)) |
|
|
|
return torch.cat(output_chunks, dim=chunk_dim) |
|
|
|
return forward_fn(*input_tensors) |
|
|