import torch from torch import nn import torchaudio from transformers import PreTrainedModel, AutoModelForCausalLM, AutoTokenizer, HubertModel, AutoProcessor, AutoConfig, AutoModel from .config import SpeechLLMModelConfig from peft import LoraConfig, get_peft_model class HubertXCNNEnoder(nn.Module): def __init__(self, audio_enc_dim, llm_dim, encoder_name): super().__init__() config = AutoConfig.from_pretrained(encoder_name) self.encoder = AutoModel.from_config(config) self.cnn = nn.Sequential( nn.ReLU(), nn.Conv1d(audio_enc_dim, llm_dim // 2, kernel_size=5, stride=1, padding=0), nn.ReLU(), nn.Conv1d(llm_dim // 2, llm_dim, kernel_size=5, stride=2, padding=0), nn.ReLU(), nn.Conv1d(llm_dim, llm_dim, kernel_size=3, stride=1, padding=0), ) def forward(self, x): x = self.encoder(x).last_hidden_state x = self.cnn(x.transpose(1, 2)).transpose(1, 2) return x def return_device(self): return next(self.parameters()).device class SpeechLLMModel(PreTrainedModel): config_class = SpeechLLMModelConfig def __init__(self, config): super().__init__(config) self.audio_processor = AutoProcessor.from_pretrained(config.audio_processor_name) self.audio_encoder = HubertXCNNEnoder(config.audio_enc_dim, config.llm_dim, config.audio_encoder_name) llm_config = AutoConfig.from_pretrained(config.llm_model_name) self.llm_model = AutoModelForCausalLM.from_config(llm_config) self.llm_tokenizer = AutoTokenizer.from_pretrained(config.llm_model_name) peft_config = LoraConfig( r=4, lora_alpha=8, target_modules=['q_proj', 'k_proj', 'v_proj', 'o_proj', 'up_proj', 'down_proj', 'gate_proj'], lora_dropout=0.05, task_type="CAUSAL_LM", ) self.llm_model = get_peft_model(self.llm_model, peft_config) self.llm_model = self.llm_model.merge_and_unload() def encode(self, mel, pre_tokenized_ids, post_tokenized_ids, output_tokenized_ids): batch_size = mel.shape[0] with torch.no_grad(): speech_embeds = self.audio_encoder(mel) embedder = self.llm_model.model.embed_tokens pre_prompt_embeds = embedder(pre_tokenized_ids) post_prompt_embeds = embedder(post_tokenized_ids) output_prompt_embeds = embedder(output_tokenized_ids) combined_embeds = torch.cat([pre_prompt_embeds, speech_embeds, post_prompt_embeds, output_prompt_embeds], dim=1) atts = torch.ones(combined_embeds.size()[:-1], dtype=torch.long).to(combined_embeds.device) input_token_length = pre_tokenized_ids.shape[1] + speech_embeds.shape[1] + post_tokenized_ids.shape[1] label_ids = torch.cat([ torch.ones([batch_size, input_token_length], device=combined_embeds.device) * -100, output_tokenized_ids ], 1).to(combined_embeds.device).to(torch.int64) return combined_embeds, atts, label_ids def forward(self, wav_tensor, pre_tokenized_ids, post_tokenized_ids, output_tokenized_ids, attention_mask=None): combined_embeds, atts, label_ids = self.encode(wav_tensor, pre_tokenized_ids, post_tokenized_ids, output_tokenized_ids) outputs = self.llm_model(inputs_embeds=combined_embeds, attention_mask=attention_mask) return outputs def generate_meta(self, audio_path, instruction="Give me the following information about the audio [Transcript]", max_new_tokens=2000): device = self.audio_encoder.return_device() pre_speech_prompt = f'''Instruction: {instruction} Input: ''' post_speech_prompt = f''' Output:''' output_prompt = '\n' with torch.no_grad(): wav_tensor, sr = torchaudio.load(audio_path) wav_tensor = self.audio_processor(wav_tensor.squeeze(), return_tensors="pt", sampling_rate=16000).input_values pre_tokenized_ids = self.llm_tokenizer(pre_speech_prompt, padding="do_not_pad", return_tensors='pt', truncation=False, add_special_tokens=False)["input_ids"] post_tokenized_ids = self.llm_tokenizer(post_speech_prompt, padding="do_not_pad", return_tensors='pt', truncation=False, add_special_tokens=False)["input_ids"] output_tokenized_ids = self.llm_tokenizer(output_prompt, padding="do_not_pad", return_tensors='pt', truncation=False, add_special_tokens=False)["input_ids"] combined_embeds, atts, label_ids = self.encode(wav_tensor.to(device), pre_tokenized_ids.to(device), post_tokenized_ids.to(device), output_tokenized_ids.to(device)) out = self.llm_model.generate( inputs_embeds=combined_embeds, max_new_tokens=max_new_tokens, ).cpu().tolist()[0] output_text = self.llm_tokenizer.decode(out, skip_special_tokens=True) return output_text