import itertools from typing import List, Optional, Tuple, Union import torch from torch import nn from transformers.modeling_outputs import BaseModelOutput from transformers import Wav2Vec2BertModel, MllamaPreTrainedModel from .configuration_llama3 import MllamaAudioConfig class MllamaAudioModel(MllamaPreTrainedModel): config_class = MllamaAudioConfig base_model_prefix = "audio_model" def __init__(self, config: MllamaAudioConfig, text_embedding: nn.Embedding): super().__init__(config) assert config.add_adapter is True, f'{type(self).__name__} requires add adapter to be true.' assert config.output_hidden_size == text_embedding.weight.shape[1], f'Output hidden size({config.output_hidden_size}) of audio model and text embedding({text_embedding.weight.shape[1]}) must match!' self.text_embedding = text_embedding self.audio_embedding = Wav2Vec2BertModel(config) self.start_of_audio = nn.Parameter(data=torch.mean(text_embedding.weight, dim=0).unsqueeze(0), requires_grad=True) self.end_of_audio = nn.Parameter(data=torch.mean(text_embedding.weight, dim=0).unsqueeze(0), requires_grad=True) self.filler_token_id = config.filler_token_id def forward( self, audio_features: torch.Tensor = None, input_ids: torch.LongTensor = None, return_dict: Optional[bool] = None, ) -> Union[BaseModelOutput, Tuple[torch.Tensor, ...]]: input_embeddings = self.text_embedding(torch.clamp(input_ids, min=0)) bs, max_num_img, _, _ = audio_features.shape for i in range(bs): for j in range(max_num_img): audio_id = -1 - j idx = torch.where(input_ids[i] == audio_id) if idx.numel() > 0: input_embeddings[i][idx] = torch.concat([self.start_of_audio, audio_features[i, j][idx], self.end_of_audio]) idx = torch.where(input_ids < 0 and input_ids >= -max_num_img) input_ids[idx].fill_(self.filler_token_id) if return_dict: return dict(input_embeddings=input_embeddings) return input_embeddings