# audio_processing_mllama.py
import math
from typing import Dict, List, Optional, Union
import numpy as np
import torch
from transformers.tokenization_utils_base import AudioInput
from transformers.models.seamless_m4t.feature_extraction_seamless_m4t import SeamlessM4TFeatureExtractor
from transformers.utils import TensorType
from transformers.feature_extraction_utils import BatchFeature
def make_list_of_audio_clips(audio: AudioInput) -> List[List[Optional[np.ndarray]]]:
"""
Convert a single audio clip or a list of audio clips to a list of numpy arrays.
Args:
audio (`AudioInput`):
A single audio or a list of audio clips.
Returns:
A list of numpy arrays.
"""
    # If it's a single audio clip, wrap it as a one-clip batch
if not isinstance(audio, (list, tuple)):
output = [[audio]]
else:
if all(isinstance(audio_i, (list, tuple)) for audio_i in audio):
# If it's a list of batches, it's already in the right format
output = audio
else:
# If it's a list of audio clips, it's a single batch, so convert it to a list of lists
output = [audio]
return output
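# A minimal usage sketch (hypothetical clip shapes) showing how the three
# accepted input forms are normalized:
#
#   clip = np.zeros(16000, dtype=np.float32)         # one clip of raw samples
#   make_list_of_audio_clips(clip)                   # -> [[clip]]        single clip
#   make_list_of_audio_clips([clip, clip])           # -> [[clip, clip]]  one sample, two clips
#   make_list_of_audio_clips([[clip], [clip, clip]]) # -> unchanged, already a batch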
def build_audio_tokens(encoding: Dict, audio_features: List[List[np.ndarray]], audio_token_id: int) -> Dict:
    """
    Expand each audio placeholder token in `input_ids` into a run of clip-specific
    placeholder ids (-1 for the first clip of a sample, -2 for the second, ...),
    one id per embedding the audio adapter will produce for that clip.
    """
    bs = len(audio_features)
    for i in range(bs):
        for j in range(len(audio_features[i])):
            token_id = -1 - j
            # Find the next not-yet-expanded audio placeholder in this sample.
            pos = encoding['input_ids'][i].index(audio_token_id)
            encoding['input_ids'][i] = encoding['input_ids'][i][:pos] \
                + [token_id] * get_num_embeddings(audio_features[i][j].shape[0]) \
                + encoding['input_ids'][i][pos+1:]
        # Rebuild the attention mask to match the expanded sequence length.
        encoding['attention_mask'][i] = [1] * len(encoding['input_ids'][i])
    return encoding
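# Sketch of the expansion (audio_token_id is model-specific; 128011 below is a
# hypothetical value): a prompt tokenized as [..., 128011, ...] with one clip
# becomes [..., -1, -1, ..., -1, ...] with get_num_embeddings(num_frames)
# copies of -1, and attention_mask is rebuilt as all ones over the new length.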
def get_num_embeddings(num_frames, adapter_kernel_size=7, adapter_stride=4) -> int:
    # Output length of the adapter's strided convolution over `num_frames`
    # feature frames, plus 2 for <|begin_of_audio|> and <|end_of_audio|>.
    return math.ceil((num_frames - adapter_kernel_size) / adapter_stride) + 1 + 2
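# Worked example with the default kernel/stride: a clip of 100 feature frames
# yields ceil((100 - 7) / 4) + 1 = 25 adapter outputs, plus the two boundary
# tokens, for 27 placeholder positions in total.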
class MllamaAudioFeatureExtractor(SeamlessM4TFeatureExtractor):
def __call__(
self,
batch_audio_clips: List[List[AudioInput]],
return_tensors: Optional[Union[str, TensorType]] = None,
) -> BatchFeature:
        # Zero-argument `super()` does not work inside a comprehension, so bind
        # the parent feature extractor's __call__ first.
        extract = super().__call__
        audio_features = [
            [extract(audio_j, return_attention_mask=False)['input_features'][0] for audio_j in audio_i]
            for audio_i in batch_audio_clips
        ]
packed_audio_features = self.pack_audio_clips(audio_features)
encoded_audio_inputs = BatchFeature(
data={
"audio_features": packed_audio_features,
},
tensor_type=return_tensors,
)
return encoded_audio_inputs
    @staticmethod
    def pack_audio_clips(batch_audio_clips: List[List[np.ndarray]]) -> np.ndarray:
        # Each clip is (sequence_length, feature_dim).
        assert batch_audio_clips[0][0].ndim == 2
        # Output shape: (batch_size, max_num_clips, max_frames, feature_dim),
        # zero-padded over the clip and frame axes.
        batch_size = len(batch_audio_clips)
        max_num_clips = max(len(clips) for clips in batch_audio_clips)
        max_frames = max(clip.shape[0] for clips in batch_audio_clips for clip in clips)
        feature_dim = batch_audio_clips[0][0].shape[1]
        stacked_audio_clips = np.zeros((batch_size, max_num_clips, max_frames, feature_dim), dtype=np.float32)
for i, clips in enumerate(batch_audio_clips):
for j, clip in enumerate(clips):
stacked_audio_clips[i, j, :clip.shape[0], :] = clip
return stacked_audio_clips
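# End-to-end usage sketch (illustrative only; the feature dimension depends on
# how the underlying SeamlessM4TFeatureExtractor is configured):
#
#   extractor = MllamaAudioFeatureExtractor()
#   clips = make_list_of_audio_clips(np.random.randn(16000).astype(np.float32))
#   batch = extractor(clips, return_tensors="np")
#   batch["audio_features"]  # (batch_size, max_num_clips, max_frames, feature_dim)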