# -*- coding: utf-8 -*- # @Time    : 2023/3/11 8:02 上午 # @Author  : NuoChen # @File    : code_classification.py ## ======== Roberta ======== import torch from torch import nn from torch.nn import CrossEntropyLoss, MSELoss, BCEWithLogitsLoss from transformers import RobertaModel from transformers.activations import ACT2FN from transformers.models.electra import ElectraModel from transformers.models.roformer import RoFormerModel from transformers.models.albert import AlbertModel from transformers.models.bert import BertModel, BertPreTrainedModel from transformers.models.deberta_v2 import DebertaV2Model, DebertaV2PreTrainedModel from transformers.modeling_outputs import SequenceClassifierOutput from transformers.models.roberta import RobertaPreTrainedModel from transformers.models.bert.modeling_bert import BertForSequenceClassification from transformers.models.megatron_bert import MegatronBertPreTrainedModel, MegatronBertModel import logging from typing import Optional, List, Union, Tuple import torch from torch._C import NoopLogger from torch.autograd import Variable import copy import torch.nn import torch.nn.functional as F from torch import Tensor from torch.nn import CrossEntropyLoss, MSELoss, BCEWithLogitsLoss from transformers import RobertaModel, RobertaPreTrainedModel from transformers.models.plbart.modeling_plbart import PLBartPreTrainedModel, PLBartClassificationHead, PLBartModel from transformers.models.plbart.configuration_plbart import PLBartConfig from transformers.models.t5.modeling_t5 import T5PreTrainedModel#, T5ClassificationHead, T5Model from transformers.models.t5.modeling_t5 import T5Config,T5Stack from transformers.modeling_outputs import SequenceClassifierOutput, Seq2SeqSequenceClassifierOutput, SequenceClassifierOutputWithPast from models.basic_modules.prefix_encoder import PrefixEncoder from models.basic_modules.adapter import BertAdaModel, RobertaAdaModel, init_adapter from tools.model_utils.parameter_freeze import ParameterFreeze freezer = ParameterFreeze() ## ======== Roberta ======== # Vanilla Fine-tuning For Roberta class RobertaForCodeClassification(RobertaPreTrainedModel): def __init__(self, config): super().__init__(config) self.num_labels = config.num_labels self.config = config self.roberta = RobertaModel(config) if self.config.use_freezing: self.roberta = freezer.freeze_lm(self.roberta) self.dropout = torch.nn.Dropout(config.hidden_dropout_prob) self.classifier = torch.nn.Linear(config.hidden_size, config.num_labels) self.init_weights() def freeze_backbone(self, use_freezing: bool=True): if use_freezing: self.roberta = freezer.freeze_lm(self.roberta) else: self.roberta = freezer.unfreeze_lm(self.roberta) def forward( self, input_ids=None, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None, ): r""" labels (:obj:`torch.LongTensor` of shape :obj:`(batch_size,)`, `optional`): Labels for computing the sequence classification/regression loss. Indices should be in :obj:`[0, ..., config.num_labels - 1]`. If :obj:`config.num_labels == 1` a regression loss is computed (Mean-Square loss), If :obj:`config.num_labels > 1` a classification loss is computed (Cross-Entropy). """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.roberta( input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids, position_ids=position_ids, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) pooled_output = outputs[1] pooled_output = self.dropout(pooled_output) logits = self.classifier(pooled_output) loss = None if labels is not None: loss_fct = CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) if not return_dict: output = (logits,) + outputs[2:] return ((loss,) + output) if loss is not None else output return SequenceClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) ## ======== CodeBERT ======== # Vanilla Fine-tuning For CodeBERT class CodeBERTForCodeClassification(RobertaPreTrainedModel): def __init__(self, config): super().__init__(config) self.num_labels = config.num_labels self.config = config self.roberta = RobertaModel(config) if self.config.use_freezing: self.roberta = freezer.freeze_lm(self.roberta) self.dropout = torch.nn.Dropout(config.hidden_dropout_prob) self.classifier = torch.nn.Linear(config.hidden_size, config.num_labels) self.init_weights() def freeze_backbone(self, use_freezing: bool=True): if use_freezing: self.roberta = freezer.freeze_lm(self.roberta) else: self.roberta = freezer.unfreeze_lm(self.roberta) def forward( self, input_ids=None, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None, ): r""" labels (:obj:`torch.LongTensor` of shape :obj:`(batch_size,)`, `optional`): Labels for computing the sequence classification/regression loss. Indices should be in :obj:`[0, ..., config.num_labels - 1]`. If :obj:`config.num_labels == 1` a regression loss is computed (Mean-Square loss), If :obj:`config.num_labels > 1` a classification loss is computed (Cross-Entropy). """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.roberta( input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids, position_ids=position_ids, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) pooled_output = outputs[1] pooled_output = self.dropout(pooled_output) logits = self.classifier(pooled_output) loss = None if labels is not None: if self.config.problem_type is None: if self.num_labels == 1: self.config.problem_type = "regression" elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int): self.config.problem_type = "single_label_classification" else: self.config.problem_type = "multi_label_classification" if self.config.problem_type == "regression": loss_fct = MSELoss() if self.num_labels == 1: loss = loss_fct(logits.squeeze(), labels.squeeze()) else: loss = loss_fct(logits, labels) elif self.config.problem_type == "single_label_classification": loss_fct = CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) elif self.config.problem_type == "multi_label_classification": loss_fct = BCEWithLogitsLoss() loss = loss_fct(logits, labels) if not return_dict: output = (logits,) + outputs[2:] return ((loss,) + output) if loss is not None else output return SequenceClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) ## ======== GraphCodeBERT ======== # Vanilla Fine-tuning For GraphCodeBERT class GraphCodeBERTForCodeClassification(RobertaPreTrainedModel): def __init__(self, config): super().__init__(config) self.num_labels = config.num_labels self.config = config self.roberta = RobertaModel(config) if self.config.use_freezing: self.roberta = freezer.freeze_lm(self.roberta) self.dropout = torch.nn.Dropout(config.hidden_dropout_prob) self.classifier = torch.nn.Linear(config.hidden_size, config.num_labels) self.init_weights() def freeze_backbone(self, use_freezing: bool=True): if use_freezing: self.roberta = freezer.freeze_lm(self.roberta) else: self.roberta = freezer.unfreeze_lm(self.roberta) def forward( self, input_ids=None, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None, ): r""" labels (:obj:`torch.LongTensor` of shape :obj:`(batch_size,)`, `optional`): Labels for computing the sequence classification/regression loss. Indices should be in :obj:`[0, ..., config.num_labels - 1]`. If :obj:`config.num_labels == 1` a regression loss is computed (Mean-Square loss), If :obj:`config.num_labels > 1` a classification loss is computed (Cross-Entropy). """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.roberta( input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids, position_ids=position_ids, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) pooled_output = outputs[1] pooled_output = self.dropout(pooled_output) logits = self.classifier(pooled_output) loss = None if labels is not None: loss_fct = CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) if not return_dict: output = (logits,) + outputs[2:] return ((loss,) + output) if loss is not None else output return SequenceClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) ## ======== PLBART ======== # Vanilla Fine-tuning For PLBART class PLBARTForCodeClassification(PLBartPreTrainedModel): _keys_to_ignore_on_load_missing = ["encoder.embed_tokens.weight", "decoder.embed_tokens.weight"] def __init__(self, config: PLBartConfig, **kwargs): super().__init__(config, **kwargs) self.model = PLBartModel(config) self.classification_head = PLBartClassificationHead( config.d_model, config.d_model, config.num_labels, config.classifier_dropout, ) self.model._init_weights(self.classification_head.dense) self.model._init_weights(self.classification_head.out_proj) # Copied from transformers.models.bart.modeling_bart.BartForSequenceClassification.forward def forward( self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, decoder_input_ids: Optional[torch.LongTensor] = None, decoder_attention_mask: Optional[torch.LongTensor] = None, head_mask: Optional[torch.Tensor] = None, decoder_head_mask: Optional[torch.Tensor] = None, cross_attn_head_mask: Optional[torch.Tensor] = None, encoder_outputs: Optional[List[torch.FloatTensor]] = None, inputs_embeds: Optional[torch.FloatTensor] = None, decoder_inputs_embeds: Optional[torch.FloatTensor] = None, labels: Optional[torch.LongTensor] = None, use_cache: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[Tuple, Seq2SeqSequenceClassifierOutput]: r""" labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*): Labels for computing the sequence classification/regression loss. Indices should be in `[0, ..., config.num_labels - 1]`. If `config.num_labels > 1` a classification loss is computed (Cross-Entropy). """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict if labels is not None: use_cache = False if input_ids is None and inputs_embeds is not None: raise NotImplementedError( f"Passing input embeddings is currently not supported for {self.__class__.__name__}" ) outputs = self.model( input_ids, attention_mask=attention_mask, decoder_input_ids=decoder_input_ids, decoder_attention_mask=decoder_attention_mask, head_mask=head_mask, decoder_head_mask=decoder_head_mask, cross_attn_head_mask=cross_attn_head_mask, encoder_outputs=encoder_outputs, inputs_embeds=inputs_embeds, decoder_inputs_embeds=decoder_inputs_embeds, use_cache=use_cache, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) hidden_states = outputs[0] # last hidden state eos_mask = input_ids.eq(self.config.eos_token_id).to(hidden_states.device) if len(torch.unique_consecutive(eos_mask.sum(1))) > 1: raise ValueError("All examples must have the same number of tokens.") sentence_representation = hidden_states[eos_mask, :].view(hidden_states.size(0), -1, hidden_states.size(-1))[ :, -1, : ] logits = self.classification_head(sentence_representation) loss = None if labels is not None: if self.config.problem_type is None: if self.config.num_labels == 1: self.config.problem_type = "regression" elif self.config.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int): self.config.problem_type = "single_label_classification" else: self.config.problem_type = "multi_label_classification" if self.config.problem_type == "regression": loss_fct = MSELoss() if self.config.num_labels == 1: loss = loss_fct(logits.squeeze(), labels.squeeze()) else: loss = loss_fct(logits, labels) elif self.config.problem_type == "single_label_classification": loss_fct = CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.config.num_labels), labels.view(-1)) elif self.config.problem_type == "multi_label_classification": loss_fct = BCEWithLogitsLoss() loss = loss_fct(logits, labels) if not return_dict: output = (logits,) + outputs[1:] return ((loss,) + output) if loss is not None else output return Seq2SeqSequenceClassifierOutput( loss=loss, logits=logits, past_key_values=outputs.past_key_values, decoder_hidden_states=outputs.decoder_hidden_states, decoder_attentions=outputs.decoder_attentions, cross_attentions=outputs.cross_attentions, encoder_last_hidden_state=outputs.encoder_last_hidden_state, encoder_hidden_states=outputs.encoder_hidden_states, encoder_attentions=outputs.encoder_attentions, ) ## ======== CodeT5 ======== # Vanilla Fine-tuning For CodeT5 class CodeT5ForCodeClassification(T5PreTrainedModel): _keys_to_ignore_on_load_missing = [r"encoder.embed_tokens.weight"] def __init__(self, config: T5Config): super().__init__(config) self.model_dim = config.d_model self.config.problem_type = None self.config.is_encoder_decoder = False self.shared = nn.Embedding(config.vocab_size, config.d_model) encoder_config = copy.deepcopy(config) encoder_config.is_decoder = False encoder_config.is_encoder_decoder = False encoder_config.use_cache = False self.encoder = T5Stack(encoder_config, self.shared) classifier_dropout = ( config.classifier_dropout if hasattr(config, 'classifier_dropout') else config.dropout_rate ) self.dropout = nn.Dropout(classifier_dropout) self.classifier = nn.Linear(config.d_model, config.num_labels) # Initialize weights and apply final processing self.post_init() # Model parallel self.model_parallel = False self.device_map = None def parallelize(self, device_map=None): self.device_map = ( get_device_map(len(self.encoder.block), range(torch.cuda.device_count())) if device_map is None else device_map ) assert_device_map(self.device_map, len(self.encoder.block)) self.encoder.parallelize(self.device_map) self.classifier.to(self.encoder.first_device) self.model_parallel = True def deparallelize(self): self.encoder.deparallelize() self.encoder = self.encoder.to("cpu") self.classifier = self.classifier.to("cpu") self.model_parallel = False self.device_map = None torch.cuda.empty_cache() def get_input_embeddings(self): return self.shared def set_input_embeddings(self, new_embeddings): self.shared = new_embeddings self.encoder.set_input_embeddings(new_embeddings) def get_encoder(self): return self.encoder def _prune_heads(self, heads_to_prune): """ Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer} See base class PreTrainedModel """ for layer, heads in heads_to_prune.items(): self.encoder.block[layer].layer[0].SelfAttention.prune_heads(heads) def forward( self, input_ids: Optional[torch.LongTensor] = None, attention_mask: Optional[torch.FloatTensor] = None, head_mask: Optional[torch.FloatTensor] = None, inputs_embeds: Optional[torch.FloatTensor] = None, labels: Optional[torch.LongTensor] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[Tuple[torch.FloatTensor], SequenceClassifierOutput]: r""" labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`. Returns: """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.encoder( input_ids=input_ids, attention_mask=attention_mask, inputs_embeds=inputs_embeds, head_mask=head_mask, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) # Get last hidden indices # (batch_size) -> (batch_size, 1) -> (batch_size, hidden_size) -> (batch_size, 1, hidden_size) last_hidden_indices = ( (input_ids != self.config.pad_token_id).sum(dim=-1) - 1 ).unsqueeze(dim=-1).repeat(1, outputs[0].size(-1)).unsqueeze(1) sequence_output = outputs[0].gather(dim=1, index=last_hidden_indices).squeeze(1) sequence_output = self.dropout(sequence_output) logits = self.classifier(sequence_output) loss = None if labels is not None: if self.config.problem_type is None: if self.config.num_labels == 1: self.config.problem_type = "regression" elif self.config.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int): self.config.problem_type = "single_label_classification" else: self.config.problem_type = "multi_label_classification" if self.config.problem_type == "regression": loss_fct = nn.MSELoss() if self.config.num_labels == 1: loss = loss_fct(logits.squeeze(), labels.squeeze()) else: loss = loss_fct(logits, labels) elif self.config.problem_type == "single_label_classification": loss_fct = nn.CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.config.num_labels), labels.view(-1)) elif self.config.problem_type == "multi_label_classification": loss_fct = nn.BCEWithLogitsLoss() loss = loss_fct(logits, labels) if not return_dict: output = (logits,) + outputs[2:] return ((loss,) + output) if loss is not None else output return SequenceClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions )