import torch import torch.nn as nn from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss from transformers.models.wav2vec2.modeling_wav2vec2 import ( Wav2Vec2PreTrainedModel, Wav2Vec2Model ) from transformers.models.hubert.modeling_hubert import ( HubertPreTrainedModel, HubertModel ) from src.modeling_outputs import SpeechClassifierOutput class Wav2Vec2ClassificationHead(nn.Module): """Head for wav2vec classification task.""" def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.hidden_size) self.dropout = nn.Dropout(config.final_dropout) self.out_proj = nn.Linear(config.hidden_size, config.num_labels) def forward(self, features, **kwargs): x = features x = self.dropout(x) x = self.dense(x) x = torch.tanh(x) x = self.dropout(x) x = self.out_proj(x) return x class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel): def __init__(self, config): super().__init__(config) self.num_labels = config.num_labels self.pooling_mode = config.pooling_mode self.config = config self.wav2vec2 = Wav2Vec2Model(config) self.classifier = Wav2Vec2ClassificationHead(config) self.init_weights() def freeze_feature_extractor(self): self.wav2vec2.feature_extractor._freeze_parameters() def merged_strategy( self, hidden_states, mode="mean" ): if mode == "mean": outputs = torch.mean(hidden_states, dim=1) elif mode == "sum": outputs = torch.sum(hidden_states, dim=1) elif mode == "max": outputs = torch.max(hidden_states, dim=1)[0] else: raise Exception( "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']") return outputs def forward( self, input_values, attention_mask=None, output_attentions=None, output_hidden_states=None, return_dict=None, labels=None, ): return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.wav2vec2( input_values, attention_mask=attention_mask, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) hidden_states = outputs[0] hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode) logits = self.classifier(hidden_states) loss = None if labels is not None: if self.config.problem_type is None: if self.num_labels == 1: self.config.problem_type = "regression" elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int): self.config.problem_type = "single_label_classification" else: self.config.problem_type = "multi_label_classification" if self.config.problem_type == "regression": loss_fct = MSELoss() loss = loss_fct(logits.view(-1, self.num_labels), labels) elif self.config.problem_type == "single_label_classification": loss_fct = CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) elif self.config.problem_type == "multi_label_classification": loss_fct = BCEWithLogitsLoss() loss = loss_fct(logits, labels) if not return_dict: output = (logits,) + outputs[2:] return ((loss,) + output) if loss is not None else output return SpeechClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) class HubertClassificationHead(nn.Module): """Head for hubert classification task.""" def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.hidden_size) self.dropout = nn.Dropout(config.final_dropout) self.out_proj = nn.Linear(config.hidden_size, config.num_labels) def forward(self, features, **kwargs): x = features x = self.dropout(x) x = self.dense(x) x = torch.tanh(x) x = self.dropout(x) x = self.out_proj(x) return x class HubertForSpeechClassification(HubertPreTrainedModel): def __init__(self, config): super().__init__(config) self.num_labels = config.num_labels self.pooling_mode = config.pooling_mode self.config = config self.hubert = HubertModel(config) self.classifier = HubertClassificationHead(config) self.init_weights() def freeze_feature_extractor(self): self.hubert.feature_extractor._freeze_parameters() def merged_strategy( self, hidden_states, mode="mean" ): if mode == "mean": outputs = torch.mean(hidden_states, dim=1) elif mode == "sum": outputs = torch.sum(hidden_states, dim=1) elif mode == "max": outputs = torch.max(hidden_states, dim=1)[0] else: raise Exception( "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']") return outputs def forward( self, input_values, attention_mask=None, output_attentions=None, output_hidden_states=None, return_dict=None, labels=None, ): return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.hubert( input_values, attention_mask=attention_mask, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) hidden_states = outputs[0] hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode) logits = self.classifier(hidden_states) loss = None if labels is not None: if self.config.problem_type is None: if self.num_labels == 1: self.config.problem_type = "regression" elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int): self.config.problem_type = "single_label_classification" else: self.config.problem_type = "multi_label_classification" if self.config.problem_type == "regression": loss_fct = MSELoss() loss = loss_fct(logits.view(-1, self.num_labels), labels) elif self.config.problem_type == "single_label_classification": loss_fct = CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) elif self.config.problem_type == "multi_label_classification": loss_fct = BCEWithLogitsLoss() loss = loss_fct(logits, labels) if not return_dict: output = (logits,) + outputs[2:] return ((loss,) + output) if loss is not None else output return SpeechClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, )