from transformers import AutoTokenizer, AutoModelForSequenceClassification, get_linear_schedule_with_warmup
from sklearn.metrics import classification_report, f1_score
from torch.utils.data import Dataset, DataLoader
from argparse import ArgumentParser
from str2bool import str2bool
from torch import nn
import pandas as pd
import numpy as np
import torch
import os
parser = ArgumentParser()
parser.add_argument("-dataframe", required=True, help="Path to dataframe with columns ['text', 'label', 'split']")  # e.g. 'data/small_dataset.csv'
parser.add_argument("-model", required=True, help='Pre-trained model from huggingface or path to a local folder with config.json')  # e.g. '../norbert3-x-small/'
parser.add_argument("-custom_wrapper", default=False, type=lambda x: bool(str2bool(x)), help='Boolean argument - True to use the custom wrapper, False to use AutoModelForSequenceClassification')  # e.g. True
parser.add_argument("-lr", default='1e-05', help='Learning rate.')
parser.add_argument("-max_length", default='512', help='Max length of the sequence in tokens.')
parser.add_argument("-warmup", default='2', help='The number of steps for the warmup phase.')
parser.add_argument("-batch_size", default='4', help='Batch size.')
parser.add_argument("-epochs", default='20', help='Number of epochs for training.')
args = parser.parse_args()
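# Example invocation (sketch only; the script name is a placeholder and the dataframe/model
# values are the illustrative examples from the argument comments above):
#   python <this_script.py> -dataframe data/small_dataset.csv -model ../norbert3-x-small/ \
#       -custom_wrapper True -lr 1e-05 -max_length 512 -batch_size 4 -epochs 20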
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class TextDataset(Dataset):
    def __init__(self, texts, targets, tokenizer, max_len):
        self.texts = texts
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, item):
        text = str(self.texts[item])
        target = self.targets[item]
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            truncation=True,
            return_tensors='pt',
        )
        return {
            'text': text,
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'targets': torch.tensor(target, dtype=torch.long)
        }
def create_data_loader(df, tokenizer, max_len, batch_size):
    ds = TextDataset(
        texts=df.text.to_numpy(),
        targets=df.label.to_numpy(),
        tokenizer=tokenizer,
        max_len=max_len
    )
    return DataLoader(
        ds,
        batch_size=batch_size
    )
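# Each batch from the loader is a dict: 'text' (list of raw strings), 'input_ids' and
# 'attention_mask' (LongTensors of shape [batch_size, max_len]), and 'targets'
# (LongTensor of shape [batch_size]).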
class SentimentClassifier(nn.Module):
    def __init__(self, n_classes):
        super(SentimentClassifier, self).__init__()
        if args.custom_wrapper:
            from modeling_norbert import NorbertForSequenceClassification
            self.bert = NorbertForSequenceClassification.from_pretrained(args.model, num_labels=n_classes, ignore_mismatched_sizes=True)
        else:
            self.bert = AutoModelForSequenceClassification.from_pretrained(args.model, num_labels=n_classes, ignore_mismatched_sizes=True)

    def forward(self, input_ids, attention_mask):
        bert_output = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=True
        )
        logits = bert_output.logits
        return logits
def train_epoch(
    model,
    data_loader,
    loss_fn,
    optimizer,
    device,
    scheduler,
    n_examples
):
    y_true, y_pred = [], []
    model = model.train()
    losses = []
    correct_predictions = 0
    for d in data_loader:
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device)
        y_true += targets.tolist()
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        preds_idxs = torch.max(outputs, dim=1).indices
        y_pred += preds_idxs.tolist()
        loss = loss_fn(outputs, targets)
        correct_predictions += torch.sum(preds_idxs == targets)
        losses.append(loss.item())
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()
    f1 = f1_score(y_true, y_pred, average='macro')
    return correct_predictions.double() / n_examples, np.mean(losses), f1
def eval_model(model, data_loader, loss_fn, device, n_examples):
    model = model.eval()
    losses = []
    correct_predictions = 0
    y_true, y_pred = [], []
    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)
            y_true += targets.tolist()
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            _, preds = torch.max(outputs, dim=1)
            y_pred += preds.tolist()
            loss = loss_fn(outputs, targets)
            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())
    f1 = f1_score(y_true, y_pred, average='macro')
    report = classification_report(y_true, y_pred)
    return correct_predictions.double() / n_examples, np.mean(losses), f1, report
df = pd.read_csv(args.dataframe)

df_train = df[df['split'] == 'train']
df_val = df[df['split'] == 'dev']
df_test = df[df['split'] == 'test']

print(f'Train samples: {len(df_train)}')
print(f'Validation samples: {len(df_val)}')
print(f'Test samples: {len(df_test)}')

tokenizer = AutoTokenizer.from_pretrained(args.model)

max_length = int(args.max_length)
batch_size = int(args.batch_size)
epochs = int(args.epochs)

train_data_loader = create_data_loader(df_train, tokenizer, max_length, batch_size)
val_data_loader = create_data_loader(df_val, tokenizer, max_length, batch_size)
test_data_loader = create_data_loader(df_test, tokenizer, max_length, batch_size)
class_names = df.label.unique()

model = SentimentClassifier(len(class_names))
model = model.to(device)

loss_fn = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=float(args.lr))
total_steps = len(train_data_loader) * epochs
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=int(args.warmup),
    num_training_steps=total_steps
)
for epoch in range(epochs):
    print(f'Epoch {epoch + 1}/{epochs}')
    print('-' * 10)
    train_acc, train_loss, train_f1 = train_epoch(
        model,
        train_data_loader,
        loss_fn,
        optimizer,
        device,
        scheduler,
        len(df_train)
    )
    print()
    print(f'Train loss {train_loss} -- accuracy {train_acc} -- f1 {train_f1}')
    # Save a checkpoint after each epoch
    os.makedirs('saved_models', exist_ok=True)
    model_name = args.model.split('/')[-1] if args.model.split('/')[-1] != '' else args.model.split('/')[-2]
    torch.save(model.state_dict(), f'saved_models/{model_name}_epoch_{epoch + 1}.bin')
    val_acc, val_loss, val_f1, report = eval_model(
        model,
        val_data_loader,
        loss_fn,
        device,
        len(df_val)
    )
    print()
    print(f'Val loss {val_loss} -- accuracy {val_acc} -- f1 {val_f1}')
    print(report)
test_acc, test_loss, test_f1, test_report = eval_model(
    model,
    test_data_loader,
    loss_fn,
    device,
    len(df_test)
)
print()
print('-------------TESTING-----------------')
print()
print(f'Test accuracy {test_acc}, f1 {test_f1}')
print(test_report)
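# To reuse a saved checkpoint later (sketch only, assuming the script is run with the same
# -model and the labels are unchanged so the architecture matches; the epoch number in the
# file name below is just an example of what the training loop writes):
#   model = SentimentClassifier(len(class_names))
#   model.load_state_dict(torch.load(f'saved_models/{model_name}_epoch_1.bin', map_location=device))
#   model = model.to(device)
#   model.eval()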