"""Fine-tune a sequence-classification model on a CSV dataset.

The input CSV must provide the columns ``text``, ``label`` and ``split``;
rows are routed to the train / validation / test sets by the ``split``
values ``'train'``, ``'dev'`` and ``'test'``.
"""
import os
from argparse import ArgumentParser

import numpy as np
import pandas as pd
import torch
from sklearn.metrics import classification_report, f1_score
from str2bool import str2bool
from torch import nn
from torch.utils.data import Dataset, DataLoader
# NOTE(review): AdamW is not used below (torch.optim.AdamW is used instead);
# the import is kept in case other code relies on it.
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AdamW, get_linear_schedule_with_warmup

parser = ArgumentParser()
parser.add_argument("-dataframe", required=True, help="Path to dataframe with columns ['text', 'label', 'split']")  # e.g. 'data/small_dataset.csv'
parser.add_argument("-model", required=True, help='Pre-traied model from huggingface or path to local folder with config.json')  # e.g. '../norbert3-x-small/'
parser.add_argument("-custom_wrapper", default=False, type=lambda x: bool(str2bool(x)), help='Boolean argument - True if use custom wrapper, False if use AutoModelForSequenceClassification')  # e.g. True
# Numeric options are converted by argparse so invalid values are rejected
# at parse time instead of failing later in the script.
parser.add_argument("-lr", default=1e-05, type=float, help='Learning rate.')
parser.add_argument("-max_length", default=512, type=int, help='Max lenght of the sequence in tokens.')
parser.add_argument("-warmup", default=2, type=int, help='The number of steps for the warmup phase.')
parser.add_argument("-batch_size", default=4, type=int, help='Batch size.')
parser.add_argument("-epochs", default=20, type=int, help='Number of epochs for training.')
args = parser.parse_args()

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


# NOTE: this class deliberately keeps its original name, which shadows the
# imported torch ``Dataset`` base class after the definition.
class Dataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access."""

    def __init__(self, texts, targets, tokenizer, max_len):
        """Store the raw texts, their targets, the tokenizer and the pad/truncate length."""
        self.texts = texts
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, item):
        """Tokenize the text at index ``item``.

        Returns:
            dict with the original ``text``, flattened ``input_ids`` and
            ``attention_mask`` tensors, and ``targets`` as a long tensor.
        """
        text = str(self.texts[item])
        target = self.targets[item]

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            # `padding='max_length'` is the supported spelling of the
            # deprecated `pad_to_max_length=True`.
            padding='max_length',
            return_attention_mask=True,
            truncation=True,
            return_tensors='pt',
        )

        return {
            'text': text,
            # return_tensors='pt' adds a leading batch axis; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'targets': torch.tensor(target, dtype=torch.long),
        }


def create_data_loader(df, tokenizer, max_len, batch_size):
    """Build a ``DataLoader`` over the ``text`` and ``label`` columns of ``df``.

    Only ``batch_size`` is passed to the loader, so every other option
    (including ordering) is left at the ``DataLoader`` default.
    """
    ds = Dataset(
        texts=df.text.to_numpy(),
        targets=df.label.to_numpy(),
        tokenizer=tokenizer,
        max_len=max_len,
    )
    return DataLoader(ds, batch_size=batch_size)


class SentimentClassifier(nn.Module):
    """Thin wrapper that returns only the logits of the underlying classifier.

    The backbone is chosen by the global ``args.custom_wrapper`` flag and
    loaded from ``args.model``.
    """

    def __init__(self, n_classes):
        """Load the backbone with a classification head of ``n_classes`` outputs."""
        super().__init__()
        if args.custom_wrapper:
            # Imported lazily: the module is only needed for the custom wrapper.
            from modeling_norbert import NorbertForSequenceClassification
            self.bert = NorbertForSequenceClassification.from_pretrained(
                args.model, num_labels=n_classes, ignore_mismatched_sizes=True
            )
        else:
            self.bert = AutoModelForSequenceClassification.from_pretrained(
                args.model, num_labels=n_classes, ignore_mismatched_sizes=True
            )

    def forward(self, input_ids, attention_mask):
        """Run the backbone and return its ``logits``."""
        bert_output = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=True,
        )
        return bert_output.logits


def train_epoch(
    model,
    data_loader,
    loss_fn,
    optimizer,
    device,
    scheduler,
    n_examples
):
    """Train ``model`` for one pass over ``data_loader``.

    Returns:
        Tuple ``(accuracy, mean_loss, macro_f1)`` where accuracy is the number
        of correct predictions divided by ``n_examples`` (a double tensor).
    """
    y_true, y_pred = [], []
    model = model.train()
    losses = []
    correct_predictions = 0

    for d in data_loader:
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device)
        y_true += targets.tolist()

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        preds_idxs = torch.max(outputs, dim=1).indices
        # Tensor.tolist() works for tensors on any device; the previous
        # `.numpy()` call raised for tensors living on a CUDA device.
        y_pred += preds_idxs.tolist()

        loss = loss_fn(outputs, targets)
        correct_predictions += torch.sum(preds_idxs == targets)
        losses.append(loss.item())

        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

    f1 = f1_score(y_true, y_pred, average='macro')
    return correct_predictions.double() / n_examples, np.mean(losses), f1


def eval_model(model, data_loader, loss_fn, device, n_examples):
    """Evaluate ``model`` on ``data_loader`` without gradient tracking.

    Returns:
        Tuple ``(accuracy, mean_loss, macro_f1, report)`` where ``report`` is
        the text produced by ``classification_report``.
    """
    model = model.eval()
    losses = []
    correct_predictions = 0
    y_true, y_pred = [], []

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)
            y_true += targets.tolist()

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            _, preds = torch.max(outputs, dim=1)
            y_pred += preds.tolist()

            loss = loss_fn(outputs, targets)
            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())

    f1 = f1_score(y_true, y_pred, average='macro')
    report = classification_report(y_true, y_pred)
    return correct_predictions.double() / n_examples, np.mean(losses), f1, report


df = pd.read_csv(args.dataframe)
df_train = df[df['split'] == 'train']
df_val = df[df['split'] == 'dev']
df_test = df[df['split'] == 'test']

print(f'Train samples: {len(df_train)}')
print(f'Validation samples: {len(df_val)}')
print(f'Test samples: {len(df_test)}')

tokenizer = AutoTokenizer.from_pretrained(args.model)

max_length = args.max_length
batch_size = args.batch_size
epochs = args.epochs

train_data_loader = create_data_loader(df_train, tokenizer, max_length, batch_size)
val_data_loader = create_data_loader(df_val, tokenizer, max_length, batch_size)
test_data_loader = create_data_loader(df_test, tokenizer, max_length, batch_size)

# The number of output classes is the number of distinct labels over all splits.
# NOTE(review): the loss takes the labels as long class indices, so they are
# presumably integers in [0, n_classes) — confirm against the dataset.
class_names = df.label.unique()
model = SentimentClassifier(len(class_names))
model = model.to(device)

loss_fn = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=args.lr)
total_steps = len(train_data_loader) * epochs
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=args.warmup,
    num_training_steps=total_steps
)

# Use the last non-empty path component of the model argument as the
# checkpoint name (handles a trailing '/').
model_path_parts = args.model.split('/')
model_name = model_path_parts[-1] if model_path_parts[-1] != '' else model_path_parts[-2]

# Create the checkpoint directory up front so the first save cannot fail
# after a full epoch of training has already been spent.
os.makedirs('saved_models', exist_ok=True)

for epoch in range(epochs):
    print(f'Epoch {epoch + 1}/{epochs}')
    print('-' * 10)

    train_acc, train_loss, train_f1 = train_epoch(
        model,
        train_data_loader,
        loss_fn,
        optimizer,
        device,
        scheduler,
        len(df_train)
    )
    print()
    print(f'Train loss -- {train_loss} -- accuracy {train_acc} -- f1 {train_f1}')

    # Save the model.
    # NOTE(review): the file name uses the total `epochs`, not the current
    # `epoch`, so each epoch overwrites the same checkpoint — confirm intent.
    torch.save(model.state_dict(), f'saved_models/{model_name}_epoch_{epochs}.bin')

    val_acc, val_loss, val_f1, report = eval_model(
        model,
        val_data_loader,
        loss_fn,
        device,
        len(df_val)
    )
    print()
    print(f'Val loss {val_loss} -- accuracy -- {val_acc} -- f1 {val_f1}')
    print(report)

test_acc, test_loss, test_f1, test_report = eval_model(
    model,
    test_data_loader,
    loss_fn,
    device,
    len(df_test)
)
print()
print('-------------TESTINGS-----------------')
print()
print(f'Test accuracy {test_acc}, f1 {test_f1}')
print(test_report)