|
import math |
|
from typing import Dict, List, Optional, Union |
|
import numpy as np |
|
import torch |
|
from transformers.tokenization_utils_base import AudioInput |
|
from transformers.models.seamless_m4t.feature_extraction_seamless_m4t import SeamlessM4TFeatureExtractor |
|
from transformers.utils import TensorType |
|
from transformers.feature_extraction_utils import BatchFeature |
|
|
|
|
|
|
|
def make_list_of_audio_clips(audio: AudioInput) -> List[List[Optional[np.ndarray]]]: |
|
""" |
|
Convert a single audio clip or a list of audio clips to a list of numpy arrays. |
|
|
|
Args: |
|
audio (`AudioInput`): |
|
A single audio or a list of audio clips. |
|
|
|
Returns: |
|
A list of numpy arrays. |
|
""" |
|
|
|
if not isinstance(audio, (list, tuple)): |
|
output = [[audio]] |
|
|
|
else: |
|
if all(isinstance(audio_i, (list, tuple)) for audio_i in audio): |
|
|
|
output = audio |
|
else: |
|
|
|
output = [audio] |
|
|
|
return output |
|
|
|
def build_audio_tokens(encoding: Dict, audio_features: List[List[np.ndarray]], audio_token_id: int) -> Dict: |
|
bs = len(audio_features) |
|
for i in range(bs): |
|
for j in range(len(audio_features[i])): |
|
token_id = -1 - j |
|
pos = encoding['input_ids'][i].index(audio_token_id) |
|
encoding['input_ids'][i] = encoding['input_ids'][i][:pos] \ |
|
+ [token_id] * get_num_embeddings(audio_features[i][j].size(0)) \ |
|
+ encoding['input_ids'][i][pos+1:] |
|
encoding['attention_mask'][i] = [1] * len(encoding['input_ids'][i]) |
|
return encoding |
|
|
|
def get_num_embeddings(num_framses, adapter_kernel_size=7, adapter_stride=4) -> int: |
|
return math.ceil((num_framses - adapter_kernel_size) / adapter_stride) + 1 + 2 |
|
|
|
|
|
class MllamaAudioFeatureExtractor(SeamlessM4TFeatureExtractor): |
|
|
|
def __call__( |
|
self, |
|
batch_audio_clips: List[List[AudioInput]], |
|
return_tensors: Optional[Union[str, TensorType]] = None, |
|
) -> BatchFeature: |
|
audio_features = [[ super().__call__(audio_j, return_attention_mask=False)['input_features'][0] for audio_j in audio_i ] for audio_i in batch_audio_clips ] |
|
packed_audio_features = self.pack_audio_clips(audio_features) |
|
|
|
encoded_audio_inputs = BatchFeature( |
|
data={ |
|
"audio_features": packed_audio_features, |
|
}, |
|
tensor_type=return_tensors, |
|
) |
|
|
|
return encoded_audio_inputs |
|
|
|
def pack_audio_clips(batch_audio_clips: List[List[np.ndarray]]) -> np.ndarray: |
|
assert batch_audio_clips[0][0].ndim == 2 |
|
|
|
batch_size = len(batch_audio_clips) |
|
max_num_clips = max([len(clips) for clips in batch_audio_clips]) |
|
max_frames = max([clip.size(0) for clips in batch_audio_clips for clip in clips]) |
|
feature_dim = batch_audio_clips[0][0].size(1) |
|
|
|
stacked_audio_clips = np.zeros((batch_size, max_num_clips, max_frames, feature_dim), dtype=np.float32) |
|
for i, clips in enumerate(batch_audio_clips): |
|
for j, clip in enumerate(clips): |
|
stacked_audio_clips[i, j, :clip.shape[0], :] = clip |
|
|
|
return stacked_audio_clips |