# alignscore-safetensor / modelalign.py
# PANH's picture
# Rename modelalign to modelalign.py
# e8fc77f verified
import math
from typing import Optional, Tuple
from transformers import AdamW, get_linear_schedule_with_warmup, AutoConfig
from transformers import BertForPreTraining, BertModel, RobertaModel, AlbertModel, AlbertForMaskedLM, RobertaForMaskedLM
import torch
import torch.nn as nn
import pytorch_lightning as pl
from sklearn.metrics import f1_score
from dataclasses import dataclass
class BERTAlignModel(pl.LightningModule):
    """Transformer encoder with a masked-LM head and three pooled-output heads.

    The encoder is a BERT, RoBERTa or ALBERT model (or, for ``'electra'``
    model names, a small BERT generator plus a BERT discriminator).  On top
    of the pooled output sit a 2-way head (``bin_layer``), a 3-way head
    (``tri_layer``) and a scalar regression head (``reg_layer``).
    """

    def __init__(self, model='bert-base-uncased', using_pretrained=True, *args, **kwargs) -> None:
        """Build the encoder and heads selected by the ``model`` name.

        Args:
            model: Model name or path handed to ``from_pretrained``.  The
                architecture is picked by substring match, checked in this
                order: ``'muppet'``, ``'roberta'``, ``'albert'``, ``'bert'``,
                ``'electra'``.
            using_pretrained: Load pretrained weights when true; otherwise
                build the networks from their config only.  Ignored by the
                ``'electra'`` branch, which is always built from config.
            *args, **kwargs: Not read here; recorded by
                ``save_hyperparameters()``.  ``configure_optimizers`` reads
                ``weight_decay``, ``learning_rate``, ``adam_epsilon`` and
                ``warmup_steps_portion`` from ``self.hparams`` (presumably
                supplied through these keyword arguments).

        Raises:
            ValueError: If ``model`` matches none of the supported families,
                or a muppet model is requested with ``using_pretrained``
                false.
        """
        super().__init__()
        # Already defined in lightning: self.device
        self.save_hyperparameters()
        self.model = model

        if 'muppet' in model:
            # Explicit raise instead of `assert`, which is stripped under -O.
            if not using_pretrained:
                raise ValueError("Only support pretrained muppet!")
            self.base_model = RobertaModel.from_pretrained(model)
            # The muppet MLM head is built from config, i.e. not pretrained.
            self.mlm_head = RobertaForMaskedLM(AutoConfig.from_pretrained(model)).lm_head
        elif 'roberta' in model:
            if using_pretrained:
                self.base_model = RobertaModel.from_pretrained(model)
                self.mlm_head = RobertaForMaskedLM.from_pretrained(model).lm_head
            else:
                self.base_model = RobertaModel(AutoConfig.from_pretrained(model))
                self.mlm_head = RobertaForMaskedLM(AutoConfig.from_pretrained(model)).lm_head
        elif 'albert' in model:
            if using_pretrained:
                self.base_model = AlbertModel.from_pretrained(model)
                self.mlm_head = AlbertForMaskedLM.from_pretrained(model).predictions
            else:
                self.base_model = AlbertModel(AutoConfig.from_pretrained(model))
                self.mlm_head = AlbertForMaskedLM(AutoConfig.from_pretrained(model)).predictions
        elif 'bert' in model:
            if using_pretrained:
                self.base_model = BertModel.from_pretrained(model)
                self.mlm_head = BertForPreTraining.from_pretrained(model).cls.predictions
            else:
                self.base_model = BertModel(AutoConfig.from_pretrained(model))
                self.mlm_head = BertForPreTraining(AutoConfig.from_pretrained(model)).cls.predictions
        elif 'electra' in model:
            # Generator (small BERT) and discriminator (base BERT) are both
            # constructed from config only.
            self.generator = BertModel(AutoConfig.from_pretrained('prajjwal1/bert-small'))
            self.generator_mlm = BertForPreTraining(AutoConfig.from_pretrained('prajjwal1/bert-small')).cls.predictions
            self.base_model = BertModel(AutoConfig.from_pretrained('bert-base-uncased'))
            self.discriminator_predictor = ElectraDiscriminatorPredictions(self.base_model.config)
        else:
            # Without this, the head construction below would fail with an
            # opaque AttributeError on the missing self.base_model.
            raise ValueError(f"Unsupported model name: {model!r}")

        hidden_size = self.base_model.config.hidden_size
        self.bin_layer = nn.Linear(hidden_size, 2)
        self.tri_layer = nn.Linear(hidden_size, 3)
        self.reg_layer = nn.Linear(hidden_size, 1)
        self.dropout = nn.Dropout(p=0.1)

        self.need_mlm = True         # only read by electra_forward
        self.is_finetune = False     # switches validation from losses to F1
        self.mlm_loss_factor = 0.5   # weight of the MLM term in the total loss
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, batch):
        """Run the encoder and all heads on ``batch``.

        ``batch`` is a mapping with ``input_ids`` and ``attention_mask``
        (``token_type_ids`` optional).  When it also contains ``mlm_label``
        it must contain ``align_label``, ``tri_label`` and ``reg_label``;
        the per-task summed losses and the per-task label counts are then
        returned in ``all_loss`` / ``loss_nums``.  ``loss`` is always
        ``None`` here: the weighted total is formed in
        ``training_step_end`` / ``validation_step_end``.

        Returns:
            ModelOutput with the logits of every head.
        """
        if 'electra' in self.model:
            return self.electra_forward(batch)

        base_model_output = self.base_model(
            input_ids=batch['input_ids'],
            attention_mask=batch['attention_mask'],
            token_type_ids=batch['token_type_ids'] if 'token_type_ids' in batch else None,
        )
        pooled_output = base_model_output.pooler_output

        # Sequence output feeds the MLM head; pooled output feeds the
        # classification / regression heads (no dropout on the regressor).
        prediction_scores = self.mlm_head(base_model_output.last_hidden_state)
        seq_relationship_score = self.bin_layer(self.dropout(pooled_output))
        tri_label_score = self.tri_layer(self.dropout(pooled_output))
        reg_label_score = self.reg_layer(pooled_output)

        total_loss = None
        all_loss = None
        loss_nums = None
        if 'mlm_label' in batch:  # 'mlm_label' and 'align_label' when training
            # Losses are *summed*; they are divided by the label counts later
            # so that the normalisation spans everything gathered per step.
            ce_loss_fct = nn.CrossEntropyLoss(reduction='sum')
            masked_lm_loss = ce_loss_fct(
                prediction_scores.view(-1, self.base_model.config.vocab_size),
                batch['mlm_label'].view(-1),
            )
            # Classification losses are scaled by 1 / ln(number of classes).
            next_sentence_loss = ce_loss_fct(
                seq_relationship_score.view(-1, 2), batch['align_label'].view(-1)
            ) / math.log(2)
            tri_label_loss = ce_loss_fct(
                tri_label_score.view(-1, 3), batch['tri_label'].view(-1)
            ) / math.log(3)
            reg_label_loss = self.mse_loss(
                reg_label_score.view(-1), batch['reg_label'].view(-1), reduction='sum'
            )

            # Number of labels that are not the ignore value (-100).
            masked_lm_loss_num = torch.sum(batch['mlm_label'].view(-1) != -100)
            next_sentence_loss_num = torch.sum(batch['align_label'].view(-1) != -100)
            tri_label_loss_num = torch.sum(batch['tri_label'].view(-1) != -100)
            reg_label_loss_num = torch.sum(batch['reg_label'].view(-1) != -100.0)

            all_loss = [masked_lm_loss, next_sentence_loss, tri_label_loss, reg_label_loss]
            loss_nums = [masked_lm_loss_num, next_sentence_loss_num, tri_label_loss_num, reg_label_loss_num]

        return ModelOutput(
            loss=total_loss,
            all_loss=all_loss,
            loss_nums=loss_nums,
            prediction_logits=prediction_scores,
            seq_relationship_logits=seq_relationship_score,
            tri_label_logits=tri_label_score,
            reg_label_logits=reg_label_score,
            hidden_states=base_model_output.hidden_states,
            attentions=base_model_output.attentions,
        )

    def electra_forward(self, batch):
        """Forward pass for the generator/discriminator ('electra') variant.

        With ``mlm_label`` in ``batch`` the generator fills the masked
        positions, the discriminator runs on the resulting tokens, and a
        weighted total loss is returned in ``loss``.  Without it the
        discriminator runs directly on ``input_ids`` and no loss is computed.

        NOTE(review): ``loss_nums`` is never populated here and ``all_loss``
        has five entries, whereas ``training_step_end`` calls
        ``len(loss_nums)`` -- so that hook cannot consume this output as is.

        Returns:
            ModelOutput whose ``prediction_logits`` holds the per-token
            replaced-token detection logits.
        """
        is_training = 'mlm_label' in batch
        token_type_ids = batch['token_type_ids'] if 'token_type_ids' in batch else None

        if is_training:
            ce_loss_fct = nn.CrossEntropyLoss()
            generator_output = self.generator_mlm(self.generator(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask'],
                token_type_ids=token_type_ids,
            ).last_hidden_state)
            masked_lm_loss = ce_loss_fct(
                generator_output.view(-1, self.generator.config.vocab_size),
                batch['mlm_label'].view(-1),
            )

            # Replace every masked position with the generator's argmax token.
            mlm_mask = batch['mlm_label'] != -100
            hallucinated_tokens = batch['input_ids'].clone()
            hallucinated_tokens[mlm_mask] = torch.argmax(generator_output, dim=-1)[mlm_mask]

            # Label is 1 where the token equals the reference and 0 where it
            # differs; at masked positions the reference is mlm_label
            # (presumably the original token id -- confirm with the collator).
            replaced_token_label = (batch['input_ids'] == hallucinated_tokens).long()
            replaced_token_label[mlm_mask] = (batch['mlm_label'] == hallucinated_tokens)[mlm_mask].long()
            replaced_token_label[batch['input_ids'] == 0] = -100  # ignore paddings (token id 0)

        base_model_output = self.base_model(
            input_ids=hallucinated_tokens if is_training else batch['input_ids'],
            attention_mask=batch['attention_mask'],
            token_type_ids=token_type_ids,
        )
        pooled_output = base_model_output.pooler_output

        hallu_detect_score = self.discriminator_predictor(base_model_output.last_hidden_state)
        seq_relationship_score = self.bin_layer(self.dropout(pooled_output))
        tri_label_score = self.tri_layer(self.dropout(pooled_output))
        reg_label_score = self.reg_layer(pooled_output)

        total_loss = None
        all_loss = None
        if is_training:  # 'mlm_label' and 'align_label' when training
            hallu_detect_loss = ce_loss_fct(hallu_detect_score.view(-1, 2), replaced_token_label.view(-1))
            next_sentence_loss = ce_loss_fct(seq_relationship_score.view(-1, 2), batch['align_label'].view(-1))
            tri_label_loss = ce_loss_fct(tri_label_score.view(-1, 3), batch['tri_label'].view(-1))
            reg_label_loss = self.mse_loss(reg_label_score.view(-1), batch['reg_label'].view(-1))

            # A NaN term (e.g. a task with no valid labels in the batch) is
            # replaced by 0 so it does not poison the total.
            total_loss = []
            total_loss.append(10.0 * hallu_detect_loss if not torch.isnan(hallu_detect_loss).item() else 0.)
            total_loss.append(0.2 * masked_lm_loss if (not torch.isnan(masked_lm_loss).item() and self.need_mlm) else 0.)
            total_loss.append(next_sentence_loss if not torch.isnan(next_sentence_loss).item() else 0.)
            total_loss.append(tri_label_loss if not torch.isnan(tri_label_loss).item() else 0.)
            total_loss.append(reg_label_loss if not torch.isnan(reg_label_loss).item() else 0.)
            total_loss = sum(total_loss)

            all_loss = [masked_lm_loss, next_sentence_loss, tri_label_loss, reg_label_loss, hallu_detect_loss]

        return ModelOutput(
            loss=total_loss,
            all_loss=all_loss,
            prediction_logits=hallu_detect_score,
            seq_relationship_logits=seq_relationship_score,
            tri_label_logits=tri_label_score,
            reg_label_logits=reg_label_score,
            hidden_states=base_model_output.hidden_states,
            attentions=base_model_output.attentions,
        )

    def training_step(self, train_batch, batch_idx):
        """Return the raw per-task losses and label counts for one batch."""
        output = self(train_batch)
        return {'losses': output.all_loss, 'loss_nums': output.loss_nums}

    def _combine_and_log_losses(self, step_output):
        """Normalise each summed task loss by its label count, log, and total.

        ``step_output`` is the ``{'losses', 'loss_nums'}`` dict produced by
        ``training_step`` / ``validation_step``.  A task with zero valid
        labels contributes ``0.``.

        NOTE(review): both the training and the validation hook log under
        these same keys (including 'train_loss'), exactly as the original
        duplicated code did; rename if separate curves are wanted.
        """
        losses = step_output['losses']
        loss_nums = step_output['loss_nums']
        assert len(loss_nums) == len(losses), 'loss_num should be the same length as losses'

        def _normalised(index):
            count = torch.sum(loss_nums[index])
            return torch.sum(losses[index]) / count if count > 0 else 0.

        loss_mlm = _normalised(0)
        loss_bin = _normalised(1)
        loss_tri = _normalised(2)
        loss_reg = _normalised(3)

        total_loss = self.mlm_loss_factor * loss_mlm + loss_bin + loss_tri + loss_reg

        self.log('train_loss', total_loss)  # , sync_dist=True
        self.log('mlm_loss', loss_mlm)
        self.log('bin_label_loss', loss_bin)
        self.log('tri_label_loss', loss_tri)
        self.log('reg_label_loss', loss_reg)

        return total_loss

    def training_step_end(self, step_output):
        """Turn the output of ``training_step`` into the scalar training loss."""
        return self._combine_and_log_losses(step_output)

    def validation_step(self, val_batch, batch_idx):
        """Evaluate one validation batch.

        Returns the same ``{'losses', 'loss_nums'}`` dict as
        ``training_step`` unless ``self.is_finetune`` is set, in which case
        it returns binary predictions (softmax probability of class 1 above
        0.5) together with ``align_label`` as ``{'pred', 'labels'}``.
        """
        with torch.no_grad():
            output = self(val_batch)
            if not self.is_finetune:
                return {'losses': output.all_loss, 'loss_nums': output.loss_nums}

            # ModelOutput is a dataclass: use attribute access, it is not
            # subscriptable by field name.
            align_probs = self.softmax(output.seq_relationship_logits)[:, 1].tolist()

        pred = [int(align_prob > 0.5) for align_prob in align_probs]
        labels = val_batch['align_label'].tolist()
        return {"pred": pred, 'labels': labels}

    def validation_step_end(self, step_output):
        """Reduce a validation step to what ``validation_epoch_end`` expects.

        In finetune mode ``validation_step`` yields ``{'pred', 'labels'}``,
        which carries no losses, so it is passed through untouched;
        otherwise the losses are normalised, logged and summed.
        """
        if self.is_finetune:
            return step_output
        return self._combine_and_log_losses(step_output)

    def validation_epoch_end(self, outputs):
        """Log the mean ``val_loss``, or ``f1`` over all batches when finetuning."""
        if not self.is_finetune:
            total_loss = torch.stack(outputs).mean()
            self.log("val_loss", total_loss, prog_bar=True, sync_dist=True)
        else:
            all_predictions = []
            all_labels = []
            for each_output in outputs:
                all_predictions.extend(each_output['pred'])
                all_labels.extend(each_output['labels'])
            self.log("f1", f1_score(all_labels, all_predictions), prog_bar=True, sync_dist=True)

    def configure_optimizers(self):
        """Prepare optimizer and schedule (linear warmup and decay)"""
        # Parameters whose name contains one of these get no weight decay.
        no_decay = ["bias", "LayerNorm.weight"]
        optimizer_grouped_parameters = [
            {
                "params": [p for n, p in self.named_parameters() if not any(nd in n for nd in no_decay)],
                "weight_decay": self.hparams.weight_decay,
            },
            {
                "params": [p for n, p in self.named_parameters() if any(nd in n for nd in no_decay)],
                "weight_decay": 0.0,
            },
        ]
        optimizer = AdamW(optimizer_grouped_parameters, lr=self.hparams.learning_rate, eps=self.hparams.adam_epsilon)

        # Warmup length is a fraction of the total number of optimizer steps.
        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=int(self.hparams.warmup_steps_portion * self.trainer.estimated_stepping_batches),
            num_training_steps=self.trainer.estimated_stepping_batches,
        )
        scheduler = {"scheduler": scheduler, "interval": "step", "frequency": 1}
        return [optimizer], [scheduler]

    def mse_loss(self, input, target, ignored_index=-100.0, reduction='mean'):
        """Squared error over the entries whose target is not ``ignored_index``.

        Args:
            input: Predictions, same shape as ``target``.
            target: Targets; entries equal to ``ignored_index`` are dropped.
            ignored_index: Target value marking entries to skip.
            reduction: ``'mean'`` or ``'sum'``.

        Returns:
            A scalar tensor.  With ``'mean'`` and no valid entries this is
            NaN (mean of an empty tensor).

        Raises:
            ValueError: If ``reduction`` is neither ``'mean'`` nor ``'sum'``
                (previously this silently returned ``None``).
        """
        mask = (target == ignored_index)
        out = (input[~mask] - target[~mask]) ** 2
        if reduction == "mean":
            return out.mean()
        if reduction == "sum":
            return out.sum()
        raise ValueError(f"Unsupported reduction: {reduction!r}")
class ElectraDiscriminatorPredictions(nn.Module):
    """Token-level discriminator head: dense -> GELU -> 2-way projection."""

    def __init__(self, config):
        """Create the layers; only ``config.hidden_size`` is read here."""
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        self.dense_prediction = nn.Linear(width, 2)
        self.config = config
        self.gelu = nn.GELU()

    def forward(self, discriminator_hidden_states):
        """Map hidden states of size ``hidden_size`` to 2 logits each."""
        activated = self.gelu(self.dense(discriminator_hidden_states))
        # squeeze(-1) only drops a trailing dimension of size 1; the
        # projection emits 2 features, so the shape is left unchanged.
        return self.dense_prediction(activated).squeeze(-1)
@dataclass
class ModelOutput:
    """Container for the losses and logits returned by the model's forward pass.

    Every field defaults to ``None``.  Fields can be read either as
    attributes (``out.loss``) or by name (``out['loss']``).
    """

    # Combined scalar loss, when the forward pass computes one.
    loss: Optional[torch.FloatTensor] = None
    # Individual task losses, in the order the forward pass lists them.
    all_loss: Optional[list] = None
    # Label counts matching ``all_loss`` entry for entry.
    loss_nums: Optional[list] = None
    # Logits of the token-level head.
    prediction_logits: Optional[torch.FloatTensor] = None
    # Logits of the 2-way head.
    seq_relationship_logits: Optional[torch.FloatTensor] = None
    # Logits of the 3-way head.
    tri_label_logits: Optional[torch.FloatTensor] = None
    # Output of the regression head.
    reg_label_logits: Optional[torch.FloatTensor] = None
    # Passed through from the encoder output.
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None

    def __getitem__(self, key):
        """Return the field called ``key``.

        Only declared dataclass fields are reachable this way.

        Raises:
            KeyError: If ``key`` is not the name of a field.
        """
        if key in self.__dataclass_fields__:
            return getattr(self, key)
        raise KeyError(key)