|
import torch |
|
from torch import nn |
|
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss |
|
from transformers.models.mamba.modeling_mamba import ( |
|
MambaPreTrainedModel, |
|
MambaModel, |
|
MambaCache, |
|
MAMBA_INPUTS_DOCSTRING, |
|
MAMBA_START_DOCSTRING, |
|
) |
|
from typing import List, Optional, Tuple, Union |
|
from transformers.utils import ( |
|
ModelOutput, |
|
add_start_docstrings, |
|
add_start_docstrings_to_model_forward, |
|
add_code_sample_docstrings, |
|
) |
|
from dataclasses import dataclass |
|
|
|
|
|
# Checkpoint identifier passed as `checkpoint=` to `add_code_sample_docstrings`
# on `MambaForSequenceClassification.forward`.
_CHECKPOINT_FOR_DOC = "state-spaces/mamba-130m-hf"

# Config class name passed as `config_class=` to the same decorator.
_CONFIG_FOR_DOC = "MambaConfig"
|
|
|
|
|
@dataclass
class MambaSequenceClassifierOutput(ModelOutput):
    """
    Output container for Mamba sequence classification / regression models.

    Args:
        loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided):
            Classification (or regression if config.num_labels==1) loss.
        logits (`torch.FloatTensor` of shape `(batch_size, config.num_labels)`):
            Classification (or regression if config.num_labels==1) scores (before SoftMax).
        cache_params (`MambaCache`, *optional*):
            The state of the model at the last time step, as returned by the backbone. Can be used in a forward
            method with the next `input_ids` to avoid providing the old `input_ids`.
        hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
            Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, +
            one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`.

            Hidden-states of the model at the output of each layer plus the optional initial embedding outputs.
    """

    # Every field defaults to None so that absent values can simply be omitted.
    loss: Optional[torch.FloatTensor] = None
    logits: Optional[torch.FloatTensor] = None
    # The backbone hands back its cache object, not a list of tensors.
    cache_params: Optional[MambaCache] = None
    hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
|
|
|
|
|
class MambaClassificationHead(nn.Module):
    """Bias-free linear projection from hidden features to per-label scores."""

    def __init__(self, config):
        """
        Build the projection layer.

        Args:
            config: Object exposing `hidden_size`, `num_labels` and
                `initializer_range`; it is also kept on the module as
                `self.config`.
        """
        super().__init__()
        self.config = config

        projection = nn.Linear(config.hidden_size, config.num_labels, bias=False)
        # NOTE(review): this is deliberately a direct in-place tensor op rather
        # than an `nn.init.*` helper; presumably so the draw always happens even
        # where init helpers are disabled -- confirm before changing.
        with torch.no_grad():
            projection.weight.normal_(mean=0.0, std=config.initializer_range)
        self.out_proj = projection

    def forward(self, features, **kwargs):
        """
        Project `features` through the linear layer.

        Args:
            features: Tensor whose last dimension is `config.hidden_size`.
            **kwargs: Accepted and ignored.

        Returns:
            Tensor with the last dimension replaced by `config.num_labels`.
        """
        return self.out_proj(features)
|
|
|
|
|
@add_start_docstrings(
    """Mamba Model backbone with a sequence classification/regression head on top (a linear layer on top of
    the pooled output) e.g. for GLUE tasks.""",
    MAMBA_START_DOCSTRING,
)
class MambaForSequenceClassification(MambaPreTrainedModel):
    # No class docstring on purpose: `add_start_docstrings` builds it from the
    # strings passed to the decorator above.

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.backbone = MambaModel(config)
        self.classifier = MambaClassificationHead(config)

        self.post_init()

    def _locate_last_tokens(
        self,
        input_ids: Optional[torch.LongTensor],
        device: torch.device,
    ) -> Union[int, torch.Tensor]:
        """Return, per row, the position whose logits are used for classification.

        Returns `-1` (the final position) when no padding token is configured or
        when only `inputs_embeds` were given; otherwise a tensor on `device`
        holding, for each row, the index just before the first padding token.
        """
        pad_token_id = self.config.pad_token_id
        if pad_token_id is None:
            return -1

        if input_ids is None:
            # Padding cannot be recognised in embeddings, so fall back to the
            # final position. Reported through `warnings` instead of `print`
            # so callers can filter or capture it.
            import warnings

            warnings.warn(
                f"{self.__class__.__name__} will not detect padding tokens in `inputs_embeds`. Results may be "
                "unexpected if using padding tokens in conjunction with `inputs_embeds.`"
            )
            return -1

        # `argmax` over the pad mask gives the first pad position (0 when the
        # row has no pad); subtracting 1 and wrapping with the modulo maps the
        # "no pad" case (-1) onto the last index of the row.
        positions = torch.eq(input_ids, pad_token_id).int().argmax(-1) - 1
        positions = positions % input_ids.shape[-1]
        return positions.to(device)

    def _classification_loss(
        self,
        pooled_logits: torch.Tensor,
        labels: torch.Tensor,
    ) -> Optional[torch.Tensor]:
        """Compute the loss selected by `config.problem_type`.

        When `config.problem_type` is unset it is inferred from `num_labels`
        and the dtype of `labels`, and stored back on the config. Returns
        `None` if the problem type is not one of the three handled values.
        """
        if self.config.problem_type is None:
            if self.num_labels == 1:
                self.config.problem_type = "regression"
            elif self.num_labels > 1 and labels.dtype in (torch.long, torch.int):
                self.config.problem_type = "single_label_classification"
            else:
                self.config.problem_type = "multi_label_classification"

        problem_type = self.config.problem_type
        if problem_type == "regression":
            loss_fct = MSELoss()
            if self.num_labels == 1:
                return loss_fct(pooled_logits.squeeze(), labels.squeeze())
            return loss_fct(pooled_logits, labels)
        if problem_type == "single_label_classification":
            loss_fct = CrossEntropyLoss()
            return loss_fct(pooled_logits.view(-1, self.num_labels), labels.view(-1))
        if problem_type == "multi_label_classification":
            loss_fct = BCEWithLogitsLoss()
            return loss_fct(pooled_logits, labels)
        return None

    @add_start_docstrings_to_model_forward(
        MAMBA_INPUTS_DOCSTRING.format("batch_size, sequence_length")
    )
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_DOC,
        output_type=MambaSequenceClassifierOutput,
        config_class=_CONFIG_FOR_DOC,
    )
    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        cache_params: Optional[MambaCache] = None,
        use_cache: Optional[bool] = None,
        labels: Optional[torch.LongTensor] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        **kwargs,
    ) -> Union[Tuple, MambaSequenceClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss.
            Indices should be in `[0, ..., config.num_labels - 1]`.
            If `config.num_labels == 1` a regression loss is computed (Mean-Square loss),
            If `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        if return_dict is None:
            return_dict = self.config.use_return_dict

        mamba_outputs = self.backbone(
            input_ids,
            cache_params=cache_params,
            use_cache=use_cache,
            inputs_embeds=inputs_embeds,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # Index 0 is the backbone's first output, for both tuple and
        # `ModelOutput` returns; the head scores every position of it.
        hidden_states = mamba_outputs[0]
        logits = self.classifier(hidden_states)

        batch_source = input_ids if input_ids is not None else inputs_embeds
        batch_size = batch_source.shape[0]

        # Without a padding token there is no way to tell where each row of a
        # batch ends. Raised explicitly (not `assert`) so the check survives
        # `python -O`.
        if self.config.pad_token_id is None and batch_size != 1:
            raise ValueError(
                "Cannot handle batch sizes > 1 if no padding token is defined."
            )

        sequence_lengths = self._locate_last_tokens(input_ids, logits.device)

        # Keep one row of logits per sequence: the one at the selected position.
        pooled_logits = logits[
            torch.arange(batch_size, device=logits.device), sequence_lengths
        ]

        loss = None
        if labels is not None:
            loss = self._classification_loss(pooled_logits, labels)

        if not return_dict:
            output = (pooled_logits,) + mamba_outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return MambaSequenceClassifierOutput(
            loss=loss,
            logits=pooled_logits,
            cache_params=mamba_outputs.cache_params,
            hidden_states=mamba_outputs.hidden_states,
        )
|
|