""" 视觉编码器 """ #视觉编码器 from transformers import CLIPModel from transformers import CLIPConfig vision_config=CLIPConfig.from_pretrained("openai/clip-vit-base-patch32") clip_model = CLIPModel._from_config(vision_config) vision_model=clip_model.vision_model vision_projection=clip_model.visual_projection #自实现qwen2.5-0.5B """ 语言模型 """ import torch import torch.nn as nn #from torch.nn.attention import SDPBackend, sdpa_kernel #所有decoder层共用一个Qwen2RotaryEmbedding,减少模型体积 #llama系的RoPE实现 def rotate_half(x): """Rotates half the hidden dims of the input.""" x1 = x[..., : x.shape[-1] // 2] x2 = x[..., x.shape[-1] // 2 :] return torch.cat((-x2, x1), dim=-1) class Qwen2RotaryEmbedding(nn.Module): def __init__(self, head_dim, max_position_embeddings=2048, base=10000, device=None): super().__init__() self.dim = head_dim self.max_position_embeddings = max_position_embeddings self.base = base inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2).float().to(device) / self.dim)) self.register_buffer("inv_freq", inv_freq, persistent=False) # Build here to make `torch.jit.trace` work. self._set_cos_sin_cache( # seq_len=max_position_embeddings, device=self.inv_freq.device, dtype=torch.get_default_dtype() seq_len=max_position_embeddings, device=self.inv_freq.device, dtype=torch.float32 ) def _set_cos_sin_cache(self, seq_len, device, dtype): self.max_seq_len_cached = seq_len t = torch.arange(self.max_seq_len_cached, device=device, dtype=self.inv_freq.dtype) freqs = torch.outer(t, self.inv_freq) # Different from paper, but it uses a different permutation in order to obtain the same calculation emb = torch.cat((freqs, freqs), dim=-1) self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False) self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False) def forward(self, q,k,use_cache=False): seq_len = k.size(2) # x: [bs, num_attention_heads, seq_len, head_size] if seq_len > self.max_seq_len_cached: self._set_cos_sin_cache(seq_len=seq_len, device=q.device, dtype=q.dtype) cos_pos=self.cos_cached[:seq_len].to(dtype=q.dtype).unsqueeze(0).unsqueeze(0) sin_pos=self.sin_cached[:seq_len].to(dtype=q.dtype).unsqueeze(0).unsqueeze(0) #print(cos_pos.size()) if use_cache: q_embed=q*cos_pos[:,:,-1,:].unsqueeze(1)+rotate_half(q)*sin_pos[:,:,-1,:].unsqueeze(1) else: q_embed=q*cos_pos+rotate_half(q)*sin_pos k_embed=k*cos_pos+rotate_half(k)*sin_pos #print(q_embed.size()) #print(k_embed.size()) return q_embed,k_embed """ 分组注意力层 """ def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor: """ This is the equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep). The hidden states go from (batch, num_key_value_heads, seqlen, head_dim) to (batch, num_attention_heads, seqlen, head_dim) """ batch, num_key_value_heads, slen, head_dim = hidden_states.shape if n_rep == 1: return hidden_states # 如果 n_rep 为 1,则无需重复,直接返回 # 在 dim=2(即 seqlen 维度之间插入一个新维度),并扩展到 (batch, num_key_value_heads, n_rep, slen, head_dim) hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim) # 将其形状调整为 (batch, num_key_value_heads * n_rep, slen, head_dim) return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim) import math class Qwen2SdpaAttention(nn.Module): def __init__(self,hidden_size,num_attention_heads,num_kv_heads): super(Qwen2SdpaAttention,self).__init__() self.hidden_size=hidden_size self.num_attention_heads=num_attention_heads self.attention_head_size=hidden_size//num_attention_heads self.num_kv_heads=num_kv_heads self.id=id self.q_proj=nn.Linear(hidden_size,hidden_size,bias=True) self.k_proj=nn.Linear(hidden_size,hidden_size//(num_attention_heads//num_kv_heads),bias=True) self.v_proj=nn.Linear(hidden_size,hidden_size//(num_attention_heads//num_kv_heads),bias=True) self.o_proj=nn.Linear(hidden_size,hidden_size,bias=False) self.rotary_emb=nn.Identity() #self.rotary_emb=Qwen2RotaryEmbedding(head_dim=self.attention_head_size,max_position_embeddings=max_position_embeddings,dtype=dtype) def forward(self,input_ids,attention_mask,position_embedding,use_cache=False,past_kv=None,id=None): """ 如果启用kv缓存,输入的是一个单词的embedding,形状为[batch_size,1,hidden_size] q的形状是[batch_size,1,hidden_size] k的形状为[batch_size,seq_len,hidden_size//(num_attention_heads//num_kv_heads)] v的形状为[batch_size,seq_len,hidden_size//(num_attention_heads//num_kv_heads)] 考虑到预启动阶段。 """ batch_size,seq_len,_=input_ids.size() q=self.q_proj(input_ids) k=self.k_proj(input_ids) v=self.v_proj(input_ids) if use_cache: if id not in past_kv.keys(): past_kv[id]=k,v flag=True else: k_cache,v_cache=past_kv[id] k=torch.cat((k_cache,k),dim=1) v=torch.cat((v_cache,v),dim=1) past_kv[id]=(k,v) flag=False #转化成多头 permute是根据当前填入位置选择索引 q=q.view(batch_size,-1,self.num_attention_heads,self.attention_head_size).permute(0,2,1,3) #print(q.size()) k=k.view(batch_size,-1,self.num_kv_heads,self.attention_head_size).permute(0,2,1,3) v=v.view(batch_size,-1,self.num_kv_heads, self.attention_head_size).permute(0, 2, 1, 3) #旋转位置编码 if position_embedding is not None: q,k=position_embedding(q,k,use_cache=use_cache) else: q,k=self.rotary_emb(q,k,use_cache=use_cache) #计算分组注意力层 k=repeat_kv(k,self.num_attention_heads//self.num_kv_heads) v=repeat_kv(v,self.num_attention_heads//self.num_kv_heads) #print(k.size()) #print(v.size()) #casual_mask=torch.tril(torch.ones(1,1,seq_len,seq_len)).to(input_ids.device) #attention_mask=attention_mask.unsqueeze(1).unsqueeze(-1) #att_mask=attention_mask*casual_mask #print(q.dtype) #print(k.dtype) #print(v.dtype) #with sdpa_kernel(SDPBackend.FLASH_ATTENTION): attention_logits=F.scaled_dot_product_attention(q, k, v, is_causal=flag) attention_logits=attention_logits.permute(0,2,1,3).contiguous().view(batch_size,seq_len,self.hidden_size) attention_output=self.o_proj(attention_logits) return attention_output #激活函数 import torch.nn.functional as F class SiLU(nn.Module): def __init__(self): super().__init__() def forward(self, input): return F.silu(input, inplace=False) #前馈层 import torch import torch.nn as nn import torch.nn.functional as F class Qwen2MLP(nn.Module): def __init__(self,input_dim,expand_dim): super(Qwen2MLP,self).__init__() self.gate_proj=nn.Linear(input_dim,expand_dim,bias=False) self.up_proj=nn.Linear(input_dim,expand_dim,bias=False) self.down_proj=nn.Linear(expand_dim,input_dim,bias=False) self.act_fn=SiLU() def forward(self,x): return self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x)) #qwenRMSNorm class Qwen2RMSNorm(nn.Module): def __init__(self,hidden_size,eps=1e-6): super().__init__() self.weight=nn.Parameter(torch.ones(hidden_size)) self.variance_epsilon=eps def forward(self,hidden_states): old_dtype=hidden_states.dtype hidden_states = hidden_states.to(torch.float32) variance=hidden_states.pow(2).mean(-1,keepdim=True) hidden_states=hidden_states*torch.rsqrt(variance+self.variance_epsilon) return self.weight*hidden_states.to(old_dtype) #decoder层 class Qwen2DecoderLayer(nn.Module): def __init__(self,hidden_size,num_attention_heads,num_kv_heads,expand_dim): super(Qwen2DecoderLayer, self).__init__() self.self_attn =Qwen2SdpaAttention(hidden_size=hidden_size,num_attention_heads=num_attention_heads,num_kv_heads=num_kv_heads) self.mlp=Qwen2MLP(input_dim=hidden_size,expand_dim=expand_dim) self.input_layernorm=Qwen2RMSNorm(hidden_size) self.post_attention_layernorm=Qwen2RMSNorm(hidden_size) def forward(self,hidden_states,attention_mask,position_embedding,use_cache=False,past_kv=None,id=None): residual=hidden_states hidden_states=self.input_layernorm(hidden_states) output=self.self_attn(hidden_states,attention_mask,position_embedding,use_cache=use_cache,past_kv=past_kv,id=id) output_=residual+output residual=output_ output_=self.post_attention_layernorm(output_) output_=self.mlp(output_) output_=residual+output_ return output_ #模型主体 class Qwen2Model(nn.Module): def __init__(self,vocab_size,hidden_size,num_layers,num_attention_heads,num_kv_heads,max_position_embeddings,expand_dim): super().__init__() self.embed_tokens=nn.Embedding(vocab_size,hidden_size) self.layers=nn.ModuleList( [Qwen2DecoderLayer(hidden_size=hidden_size,num_attention_heads=num_attention_heads,num_kv_heads=num_kv_heads,expand_dim=expand_dim) for _ in range(num_layers)] ) self.norm=Qwen2RMSNorm(hidden_size) self.rotary_emb=Qwen2RotaryEmbedding(head_dim=hidden_size//num_attention_heads,max_position_embeddings=max_position_embeddings) def forward(self,input_ids,attention_mask,use_cache=False,past_kv=None): token_embed=self.embed_tokens(input_ids) #with sdpa_kernel(SDPBackend.FLASH_ATTENTION): for index,layer in enumerate(self.layers): token_embed=layer(token_embed,attention_mask,self.rotary_emb,use_cache=use_cache,past_kv=past_kv,id=index) token_embed=self.norm(token_embed) return token_embed #文本预测生成模型 class Qwen2ForCausalLM(nn.Module): def __init__(self, config): super().__init__() self.config = config self.model=Qwen2Model(vocab_size=config.vocab_size, hidden_size=config.hidden_size, num_layers=config.num_layers, num_attention_heads=config.num_attention_heads,num_kv_heads=config.num_kv_heads,expand_dim=config.expand_dim,max_position_embeddings=config.max_position_embeddings) self.lm_head=nn.Linear(config.hidden_size,config.vocab_size,bias=False) self.dtype=config.dtype def forward(self,input_ids,attention_mask,use_cache=False,past_kv=None): if use_cache: if past_kv is None: past_kv={} output=self.model(input_ids=input_ids,attention_mask=attention_mask,use_cache=use_cache,past_kv=past_kv) logits=self.lm_head(output) return logits,past_kv else: output=self.model(input_ids=input_ids,attention_mask=attention_mask) logits=self.lm_head(output) return logits class Qwen2config: def __init__(self): self.name = "Qwen2.5-0.5B" self.vocab_size=151936 self.hidden_size=896 self.num_layers=24 self.num_kv_heads=2 self.num_attention_heads=14 self.max_position_embeddings= 32768 self.expand_dim=4864 self.dtype=torch.float16 config=Qwen2config() qwen_model=Qwen2ForCausalLM(config) #qwenva模型主体实现 #对齐层 class AlignLayer(torch.nn.Module): def __init__(self,text1_dim,text2_dim,expand_dim): super(AlignLayer, self).__init__() self.vision_proj=vision_projection.to(dtype=config.dtype) self.expand_proj=torch.nn.Linear(text1_dim,expand_dim) self.text_proj=torch.nn.Linear(expand_dim,text2_dim) self.activate=torch.nn.SiLU() def forward(self,vision_embedding): embed=self.vision_proj(vision_embedding) embed=self.expand_proj(embed) embed=self.activate(embed) embed=self.text_proj(embed) return embed text_model=qwen_model rotary_emb=text_model.model.rotary_emb text_embedding=text_model.model.embed_tokens transformer=text_model.model.layers lm_head=text_model.lm_head from transformers import AutoTokenizer model_name="Qwen/Qwen2.5-0.5B" tokenizer=AutoTokenizer.from_pretrained(model_name) tokenizer.add_special_tokens({"additional_special_tokens": [""]}) from huggingface_hub import PyTorchModelHubMixin class Qwenva(torch.nn.Module,PyTorchModelHubMixin): def __init__(self,text1_dim,text2_dim,expand_dim,dtype=config.dtype): super(Qwenva, self).__init__() self.vision_encoder=vision_model.to(dtype=config.dtype) self.text_embedding=text_embedding self.align_layer=AlignLayer(text1_dim,text2_dim,expand_dim).to(dtype) # 确保 align_layer 的参数梯度可用 self.transformer=transformer self.rotary_emb=rotary_emb #for param in self.rotary_emb.parameters(): #param.requires_grad = False self.lm_head=lm_head self.tokenizer=tokenizer def forward(self,input_ids,attention_mask,pixel_values=None,image_idx=None,use_cache=True,past_kv=None): #print(align_embedding.shape) if past_kv is None and pixel_values is not None: token_embedding=self.text_embedding(input_ids) batch_size=input_ids.shape[0] vision_embedding=self.vision_encoder(pixel_values)[1] #print(vision_embedding.shape,attention_mask.shape) align_embedding=self.align_layer(vision_embedding) #print(align_embedding.shape) #print(vision_embedding.shape,attention_mask.shape) align_embedding=self.align_layer(vision_embedding) mix_embedding=token_embedding.clone() #print(mix_embedding.shape) #print(align_embedding.shape) #print(image_idx.shape) #生成有效的嵌入位置坐标,image_idx的形状为[batch_size,1] valid_indices = image_idx.ne(-100) #print(valid_indices.squeeze()) valid_positions = torch.arange(batch_size).to(input_ids.device) #print(valid_positions) valid_positions = valid_positions[valid_indices.squeeze()].squeeze() #print(valid_positions) valid_image_idx =image_idx[valid_positions] #print(valid_image_idx) mix_embedding[valid_positions,valid_image_idx] = align_embedding[valid_positions] past_kv={} else: mix_embedding=self.text_embedding(input_ids) for index,layer in enumerate(self.transformer): mix_embedding=layer(mix_embedding,attention_mask,position_embedding=self.rotary_emb,use_cache=use_cache,past_kv=past_kv,id=index) #print(mix_embedding.shape) logits=self.lm_head(mix_embedding) if use_cache: return logits,past_kv else: return logits def generate(self,input_ids,attention_mask,pixel_values=None,image_idx=None,temperature=1,top_k=2,repetition_penalty=1.0,max_length=300): import math device=input_ids.device #system_user_len=input_ids.shape[1] token_eos = torch.tensor(tokenizer.encode('<|im_end|>')).to(device) # 终止符,遇到该字符就结束推理 out_token = None #start_token=input_ids temperature=temperature top_k=top_k repetition_penalty =repetition_penalty # 重复惩罚 import torch.nn.functional as F past_kv=None with torch.no_grad(): while out_token != token_eos and len(input_ids[0,:]) 1: for i in input_ids[0]: logits[0,-1,i] /= repetition_penalty #top_k采样 top_k_logits,top_k_indices=torch.topk(logits[0,-1,:],k=top_k) out_token=top_k_indices[torch.multinomial(F.softmax(top_k_logits/temperature,dim=-1),num_samples=1)].unsqueeze(0) #最大采样 #out_token=torch.argmax(logits[0,-1,:]).unsqueeze(0).unsqueeze(0) #start_token=out_token input_ids =torch.cat([input_ids ,out_token], dim=1) # 每次都把之前的所有token与推理得到的新token拼接起来作为下次的输入 attention_mask = torch.cat([attention_mask,torch.ones(1,1).to(device)], dim=1) # 注意力掩码也要跟着变化 #text = self.tokenizer.decode(input_ids[0,:]) return input_ids #processor实现,负责与处理数据 from transformers import CLIPProcessor, AutoTokenizer processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") model_name="Qwen/Qwen2.5-0.5B" tokenizer=AutoTokenizer.from_pretrained(model_name) tokenizer.add_special_tokens({"additional_special_tokens": [""]}) import torch from huggingface_hub import TextGenerationOutputToken from transformers import ProcessorMixin class Proccessor(ProcessorMixin): feature_extractor_class: str = "CLIPProcessor" tokenizer_class: str = "Qwen2TokenizerFast" def __init__(self,feature_extractor,tokenizer): super().__init__(feature_extractor=feature_extractor,tokenizer=tokenizer) self.tokenizer=tokenizer self.feature_extractor=feature_extractor self.image_token=self.tokenizer.encode('')[0] def __call__(self,input_data,input_image=None,device="cuda"): if isinstance(input_data,str): input_=self.tokenizer.apply_chat_template( [{'role':'user','content':'\n{}'.format(input_data)} ], add_generation_prompt=True,) elif isinstance(input_data,list): input_=self.tokenizer.apply_chat_template( input_data, add_generation_prompt=True, ) input_ids=torch.tensor(input_).unsqueeze(0).to(device) attention_mask=torch.ones(1,len(input_ids[0])).to(device) img_idx=input_.index(self.image_token) img_idx=torch.tensor(img_idx).unsqueeze(0).to(device) if input_image is not None: inputs = self.feature_extractor(images=input_image, return_tensors="pt") pixel_values=inputs['pixel_values'].to('cuda') return { "input_ids":input_ids, "attention_mask":attention_mask, "pixel_values":pixel_values, "image_idx":img_idx } else: return { "input_ids":input_ids, "attention_mask":attention_mask} processor=Proccessor(processor,tokenizer) model=Qwenva(512,896,4096,dtype=config.dtype) model.load_state_dict(torch.load("./qwenva.pth",weights_only=True)) model.eval()