test_mllama_11B_v3 / mllama_audio_model.py
AlexHung29629's picture
Update mllama_audio_model.py
9ff5fb4 verified
raw
history blame
4.23 kB
from typing import Optional, Tuple, Union
import torch
from torch import nn
from transformers.modeling_outputs import BaseModelOutput
from transformers import Wav2Vec2BertModel, Wav2Vec2BertConfig, MllamaPreTrainedModel
from transformers.models.wav2vec2_bert.modeling_wav2vec2_bert import Wav2Vec2BertAdapterLayer
from .configuration_llama3 import Llama3Config
class AudioAdapter(nn.Module):
    """Optionally project audio encoder states, then run them through adapter layers.

    When ``config.output_hidden_size`` differs from ``config.hidden_size`` a
    linear projection between the two sizes is applied first.  The states are
    then passed through ``config.num_adapter_layers`` instances of
    ``Wav2Vec2BertAdapterLayer``; during training each layer may be skipped
    with probability ``config.layerdrop``.
    """

    def __init__(self, config: "Wav2Vec2BertConfig"):
        super().__init__()
        # feature dim might need to be down-projected
        if config.output_hidden_size != config.hidden_size:
            self.proj = nn.Linear(config.hidden_size, config.output_hidden_size)
        else:
            self.proj = None
        self.layers = nn.ModuleList(
            Wav2Vec2BertAdapterLayer(config) for _ in range(config.num_adapter_layers)
        )
        self.layerdrop = config.layerdrop
        self.kernel_size = config.adapter_kernel_size
        self.stride = config.adapter_stride

    def _compute_sub_sample_lengths_from_attention_mask(
        self, seq_lens: Optional[torch.Tensor]
    ) -> Optional[torch.Tensor]:
        """Return the per-sample lengths after one adapter layer.

        Applies ``floor((len + 2 * pad - kernel_size) / stride) + 1`` with
        ``pad = stride // 2``.  ``None`` is passed through unchanged.  The
        result is a floating-point tensor because true division is used
        before flooring.
        """
        if seq_lens is None:
            return seq_lens
        pad = self.stride // 2
        seq_lens = ((seq_lens + 2 * pad - self.kernel_size) / self.stride) + 1
        return seq_lens.floor()

    def forward(
        self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None
    ) -> torch.Tensor:
        """Run the projection (if any) and the adapter layers.

        Args:
            hidden_states: Encoder output; the last dimension must match the
                projection input size when a projection is configured.
            attention_mask: Optional mask whose second dimension is the
                sequence axis and whose non-zero entries mark valid positions.
                It is forwarded unchanged to every layer together with the
                current per-sample lengths.

        Returns:
            The hidden states produced by the last applied layer (or the
            projected input if every layer was dropped).
        """
        # down project hidden_states if necessary
        if self.proj is not None:
            hidden_states = self.proj(hidden_states)

        sub_sampled_lengths = None
        if attention_mask is not None:
            # Number of valid positions per sample = total minus masked-out ones.
            masked_out = (1 - attention_mask.int()).sum(1)
            sub_sampled_lengths = (attention_mask.size(1) - masked_out).to(hidden_states.device)

        for layer in self.layers:
            # Drawn for every layer (also in eval mode) so the random stream
            # is consumed at a rate that does not depend on the mode.
            layerdrop_prob = torch.rand([])
            if self.training and layerdrop_prob <= self.layerdrop:
                # LayerDrop: this layer is skipped, so hidden_states keeps its
                # length and the tracked lengths must not advance either.
                continue
            # Advance the lengths only for layers that actually run, keeping
            # them in step with the sequence the layer is about to produce.
            sub_sampled_lengths = self._compute_sub_sample_lengths_from_attention_mask(
                sub_sampled_lengths
            )
            hidden_states = layer(
                hidden_states, attention_mask=attention_mask, sub_sampled_lengths=sub_sampled_lengths
            )
        return hidden_states
class Llama3Embedding(MllamaPreTrainedModel):
    """Text token embeddings with audio embeddings spliced into placeholder slots.

    Non-negative ``input_ids`` are looked up in ``text_embeddings``.  Negative
    ids act as placeholders: within one sample, id ``-1 - j`` marks the
    positions that are overwritten with the ``j``-th audio clip of that
    sample, framed by the learned ``start_of_audio`` and ``end_of_audio``
    vectors.
    """

    config_class = Llama3Config
    base_model_prefix = "audio_model"

    def __init__(self, config: Llama3Config):
        super().__init__(config)
        # Audio states are written directly into the text embedding sequence,
        # so both must share the same width.
        assert config.audio_config.output_hidden_size == config.text_config.hidden_size, (
            "audio_config.output_hidden_size must equal text_config.hidden_size"
        )
        self.text_embeddings = nn.Embedding(
            config.text_config.vocab_size,
            config.text_config.hidden_size,
            padding_idx=config.text_config.pad_token_id,
        )
        # The encoder's built-in adapter is disabled; AudioAdapter below is
        # applied to the encoder output instead.
        config.audio_config.add_adapter = False
        self.audio_encoder = Wav2Vec2BertModel(config.audio_config)
        self.audio_adapter = AudioAdapter(config.audio_config)
        self.start_of_audio = nn.Parameter(
            data=torch.zeros((1, config.audio_config.output_hidden_size)), requires_grad=True
        )
        self.end_of_audio = nn.Parameter(
            data=torch.zeros((1, config.audio_config.output_hidden_size)), requires_grad=True
        )
        self.text_config = config.text_config

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        audio_features: Optional[torch.Tensor] = None,
    ) -> Union[BaseModelOutput, Tuple[torch.Tensor, ...]]:
        """Embed ``input_ids`` and overwrite audio placeholder positions.

        Args:
            input_ids: Token ids indexed as ``input_ids[sample]``.  Negative
                values are audio placeholders (see the class docstring); they
                are clamped to ``0`` for the text lookup.
            audio_features: Optional 4-D tensor unpacked as
                ``(batch, clips_per_sample, l, d)``; the last two dimensions
                are fed to the audio encoder as ``input_features``
                (presumably frames x feature size - confirm against the
                feature extractor).

        Returns:
            The embedding tensor for ``input_ids``.  For every sample and clip
            whose placeholder id occurs in that sample, the placeholder
            positions hold ``[start_of_audio, clip states..., end_of_audio]``.
            NOTE(review): the declared return annotation mentions
            ``BaseModelOutput``, but this method returns the tensor itself.

        Raises:
            RuntimeError: Raised by ``index_put`` when the number of
                placeholder positions for a clip does not match the number of
                adapted audio states plus two.
        """
        # Negative placeholder ids are not valid embedding rows; map them to
        # row 0 here, their embeddings are overwritten further down.
        input_embeddings = self.text_embeddings(torch.clamp(input_ids, min=0))
        if audio_features is None:
            return input_embeddings

        batch_size, max_num_clips, l, d = audio_features.shape
        # reshape (rather than view) also accepts non-contiguous inputs.
        flat_features = audio_features.reshape((batch_size * max_num_clips, l, d))
        audio_embeddings = self.audio_encoder(input_features=flat_features)['last_hidden_state']
        audio_embeddings = self.audio_adapter(audio_embeddings)
        audio_embeddings = audio_embeddings.view(
            (batch_size, max_num_clips, -1, self.start_of_audio.shape[-1])
        )

        for sample_idx in range(batch_size):
            sample_ids = input_ids[sample_idx]
            for clip_idx in range(max_num_clips):
                placeholder_mask = sample_ids == -1 - clip_idx
                if not torch.any(placeholder_mask):
                    # This sample has no slots for this clip.
                    continue
                positions = torch.nonzero(placeholder_mask, as_tuple=True)
                audio_block = torch.concat(
                    [self.start_of_audio, audio_embeddings[sample_idx, clip_idx], self.end_of_audio]
                )
                # index_put needs matching dtypes; the cast is a no-op when
                # they already agree.
                audio_block = audio_block.to(input_embeddings.dtype)
                input_embeddings[sample_idx] = input_embeddings[sample_idx].index_put(
                    positions, audio_block, accumulate=False
                )
        return input_embeddings