import math
from typing import Optional

import torch
from torch import nn
from transformers import Wav2Vec2BertConfig, Wav2Vec2BertModel, Wav2Vec2BertPreTrainedModel
from transformers.models.mllama.configuration_mllama import MllamaTextConfig
from transformers.models.wav2vec2_bert.modeling_wav2vec2_bert import (
    Wav2Vec2BertFeatureProjection,
    Wav2Vec2BertSelfAttention,
)


class Llama3Embedding(Wav2Vec2BertPreTrainedModel):
    """Combined embedding layer: text tokens go through a standard embedding table,
    while audio clips are encoded with Wav2Vec2-BERT and spliced into the sequence."""

    base_model_prefix = "audio_model"

    def __init__(self, config: Wav2Vec2BertConfig, text_config: MllamaTextConfig):
        super().__init__(config)
        assert config.add_adapter is True, f"{type(self).__name__} requires config.add_adapter to be True."
        assert config.output_hidden_size == text_config.hidden_size, (
            "Audio encoder output size must match the text model hidden size."
        )
        self.text_embeddings = nn.Embedding(
            text_config.vocab_size, text_config.hidden_size, text_config.pad_token_id
        )
        self.audio_embedding = Wav2Vec2BertModel(config)
        # Learned delimiter embeddings placed around every audio segment.
        self.start_of_audio = nn.Parameter(data=torch.zeros((1, config.output_hidden_size)), requires_grad=True)
        self.end_of_audio = nn.Parameter(data=torch.zeros((1, config.output_hidden_size)), requires_grad=True)
        self.text_config = text_config

    def _init_weights(self, module):
        """Initialize the weights."""
        std = self.text_config.initializer_range
        if isinstance(module, Wav2Vec2BertSelfAttention):
            if hasattr(module, "pos_bias_u"):
                nn.init.xavier_uniform_(module.pos_bias_u)
            if hasattr(module, "pos_bias_v"):
                nn.init.xavier_uniform_(module.pos_bias_v)
        elif isinstance(module, Wav2Vec2BertFeatureProjection):
            k = math.sqrt(1 / module.projection.in_features)
            nn.init.uniform_(module.projection.weight, a=-k, b=k)
            nn.init.uniform_(module.projection.bias, a=-k, b=k)
        elif isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, (nn.LayerNorm, nn.GroupNorm)):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)
        elif isinstance(module, nn.Conv1d):
            nn.init.kaiming_normal_(module.weight)
            if module.bias is not None:
                k = math.sqrt(module.groups / (module.in_channels * module.kernel_size[0]))
                nn.init.uniform_(module.bias, a=-k, b=k)
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.Parameter):
            module.data.normal_(mean=0.0, std=std)

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        audio_features: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        # Negative token IDs are audio placeholders; clamp them to 0 so the text
        # embedding lookup stays in range (those positions are overwritten below).
        input_embeddings = self.text_embeddings(torch.clamp(input_ids, min=0))
        if audio_features is None:
            return input_embeddings

        # audio_features: (batch, max_num_audio_clips, num_frames, feature_dim)
        bs, max_num_audio, num_frames, dim = audio_features.shape
        audio_embeddings = self.audio_embedding(
            input_features=audio_features.view((bs * max_num_audio, num_frames, dim))
        )["last_hidden_state"]
        audio_embeddings = audio_embeddings.view((bs, max_num_audio, -1, self.start_of_audio.shape[-1]))

        # Splice each encoded clip into its placeholder span: clip j of sample i is
        # marked by token ID (-1 - j), repeated once per position it should fill.
        for i in range(bs):
            for j in range(max_num_audio):
                audio_id = -1 - j
                if torch.any(input_ids[i] == audio_id):
                    positions = torch.nonzero(input_ids[i] == audio_id, as_tuple=True)
                    # Reserve two of the placeholder positions for the start/end embeddings.
                    seq_len = input_embeddings[i][positions].shape[0] - 2
                    input_embeddings[i] = input_embeddings[i].index_put(
                        positions,
                        torch.concat([self.start_of_audio, audio_embeddings[i, j, :seq_len, :], self.end_of_audio]),
                        accumulate=False,
                    )
        return input_embeddings
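

# --- Usage sketch (not part of the original module) --------------------------
# A minimal smoke test, assuming tiny, hypothetical config values chosen only to
# keep the example fast; a real setup would load pretrained configs instead.
# It illustrates the placeholder convention the forward pass expects: audio clip
# j in a sample is marked by token ID (-1 - j), repeated once per position it
# should occupy, including two extra positions for the start/end embeddings.
if __name__ == "__main__":
    audio_cfg = Wav2Vec2BertConfig(
        hidden_size=64,
        num_hidden_layers=2,
        num_attention_heads=2,
        intermediate_size=128,
        output_hidden_size=64,  # must equal text_cfg.hidden_size
        add_adapter=True,       # required by Llama3Embedding
    )
    text_cfg = MllamaTextConfig(vocab_size=1000, hidden_size=64, pad_token_id=0)
    model = Llama3Embedding(audio_cfg, text_cfg)

    # One sample with one audio clip of 50 feature frames.
    bs, num_clips, num_frames = 1, 1, 50
    feat_dim = audio_cfg.feature_projection_input_dim
    audio_features = torch.randn(bs, num_clips, num_frames, feat_dim)

    # 10 placeholder positions for clip -1: start embedding + 8 audio frames + end embedding.
    placeholder = torch.full((10,), -1, dtype=torch.long)
    input_ids = torch.cat([torch.tensor([1, 2, 3]), placeholder, torch.tensor([4])]).unsqueeze(0)

    embeddings = model(input_ids=input_ids, audio_features=audio_features)
    print(embeddings.shape)  # torch.Size([1, 14, 64])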