import warnings
warnings.simplefilter('ignore')

import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn import metrics
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import DistilBertTokenizer, DistilBertModel
import logging

logging.basicConfig(level=logging.ERROR)

# Setting up the device for GPU usage
from torch import cuda
device = 'cuda' if cuda.is_available() else 'cpu'


def hamming_score(y_true, y_pred, normalize=True, sample_weight=None):
    """Label-based accuracy for multi-label classification:
    mean over samples of |true ∩ pred| / |true ∪ pred|."""
    acc_list = []
    for i in range(y_true.shape[0]):
        set_true = set(np.where(y_true[i])[0])
        set_pred = set(np.where(y_pred[i])[0])
        if len(set_true) == 0 and len(set_pred) == 0:
            tmp_a = 1
        else:
            tmp_a = len(set_true.intersection(set_pred)) / \
                    float(len(set_true.union(set_pred)))
        acc_list.append(tmp_a)
    return np.mean(acc_list)


data = pd.read_csv('Vulnerable code dataset 15_12_22 - Training.csv')
# data.drop(['source_name'], inplace=True, axis=1)

new_df = pd.DataFrame()
new_df['text'] = data['text']
new_df['labels'] = data['label']
new_df.head()

# Config: key variables used later in training
MAX_LEN = 128
TRAIN_BATCH_SIZE = 4
VALID_BATCH_SIZE = 4
EPOCHS = 1
LEARNING_RATE = 1e-05

tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased', do_lower_case=True)


class MultiLabelDataset(Dataset):
    """Wraps the dataframe and tokenizes one example per __getitem__ call."""

    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.text = dataframe.text
        self.targets = self.data.labels
        self.max_len = max_len

    def __len__(self):
        return len(self.text)

    def __getitem__(self, index):
        text = str(self.text[index])
        text = " ".join(text.split())

        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',   # replaces the deprecated pad_to_max_length=True
            truncation=True,
            return_token_type_ids=True
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs["token_type_ids"]

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            # BCEWithLogitsLoss below expects float targets; each entry of `labels`
            # is assumed to be a length-6 multi-hot vector (see the encoding note below).
            'targets': torch.tensor(self.targets[index], dtype=torch.float)
        }


train_size = 0.8
train_data = new_df.sample(frac=train_size, random_state=200)
test_data = new_df.drop(train_data.index).reset_index(drop=True)
train_data = train_data.reset_index(drop=True)

print("FULL Dataset: {}".format(new_df.shape))
print("TRAIN Dataset: {}".format(train_data.shape))
print("TEST Dataset: {}".format(test_data.shape))

training_set = MultiLabelDataset(train_data, tokenizer, MAX_LEN)
testing_set = MultiLabelDataset(test_data, tokenizer, MAX_LEN)

train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0}

test_params = {'batch_size': VALID_BATCH_SIZE,
               'shuffle': True,
               'num_workers': 0}

training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)
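# --- Label-encoding note (not part of the original script) ---
# The 'targets' entry above and the BCEWithLogitsLoss used later assume that each
# value in new_df['labels'] is a length-6 multi-hot float vector, one slot per
# class. The raw format of the CSV's 'label' column is not shown here, so the
# helper below is only a minimal sketch, assuming the column stores a single
# integer class id in [0, 5]; if that assumption holds, it would be applied to
# new_df['labels'] *before* the train/test split above.
NUM_LABELS = 6  # matches the 6-way classifier head defined below

def to_multi_hot(label_id, num_labels=NUM_LABELS):
    """Convert one integer class id into a one-hot float vector."""
    vec = np.zeros(num_labels, dtype=np.float32)
    vec[int(label_id)] = 1.0
    return vec

# Example usage (run before the split), if the assumption about the column holds:
# new_df['labels'] = new_df['labels'].apply(to_multi_hot)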
# Creating the customized model by adding a dropout and a dense layer on top of
# DistilBERT to get the final output for the model.
class DistilBERTClass(torch.nn.Module):
    def __init__(self):
        super(DistilBERTClass, self).__init__()
        self.l1 = DistilBertModel.from_pretrained("distilbert-base-uncased")
        self.pre_classifier = torch.nn.Linear(768, 768)
        self.dropout = torch.nn.Dropout(0.1)
        self.classifier = torch.nn.Linear(768, 6)   # 6 output labels

    def forward(self, input_ids, attention_mask, token_type_ids):
        # DistilBERT does not use token_type_ids; the argument is accepted only to
        # keep the calling convention uniform with the data loader output.
        output_1 = self.l1(input_ids=input_ids, attention_mask=attention_mask)
        hidden_state = output_1[0]
        pooler = hidden_state[:, 0]          # embedding at the [CLS] position
        pooler = self.pre_classifier(pooler)
        pooler = torch.nn.Tanh()(pooler)
        pooler = self.dropout(pooler)
        output = self.classifier(pooler)     # raw logits, one per label
        return output


model = DistilBERTClass()
model.to(device)


def loss_fn(outputs, targets):
    # Multi-label objective: independent sigmoid + binary cross-entropy per label.
    return torch.nn.BCEWithLogitsLoss()(outputs, targets)


optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)


def train(epoch):
    model.train()
    for step, data in tqdm(enumerate(training_loader, 0)):
        ids = data['ids'].to(device, dtype=torch.long)
        mask = data['mask'].to(device, dtype=torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
        targets = data['targets'].to(device, dtype=torch.float)

        outputs = model(ids, mask, token_type_ids)

        optimizer.zero_grad()
        loss = loss_fn(outputs, targets)
        if step % 5000 == 0:
            print(f'Epoch: {epoch}, Loss: {loss.item()}')
        loss.backward()
        optimizer.step()


for epoch in range(EPOCHS):
    train(epoch)


def validation(testing_loader):
    model.eval()
    fin_targets = []
    fin_outputs = []
    with torch.no_grad():
        for _, data in tqdm(enumerate(testing_loader, 0)):
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            targets = data['targets'].to(device, dtype=torch.float)
            outputs = model(ids, mask, token_type_ids)
            fin_targets.extend(targets.cpu().detach().numpy().tolist())
            fin_outputs.extend(torch.sigmoid(outputs).cpu().detach().numpy().tolist())
    return fin_outputs, fin_targets


outputs, targets = validation(testing_loader)

# Threshold the sigmoid probabilities at 0.5 to get hard label predictions.
final_outputs = np.array(outputs) >= 0.5
val_hamming_loss = metrics.hamming_loss(np.array(targets), final_outputs)
val_hamming_score = hamming_score(np.array(targets), np.array(final_outputs))
print(f"Hamming Score = {val_hamming_score}")
print(f"Hamming Loss = {val_hamming_loss}")
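# --- Prediction sketch (not part of the original script) ---
# A minimal example of how the fine-tuned model could be used on a new code
# snippet. The sample text and the 0.5 threshold are illustrative choices, and
# the mapping from label indices back to vulnerability names is not defined in
# the original, so only the predicted indices are printed.
model.eval()
sample_code = "strcpy(buffer, user_input);"  # hypothetical input example
encoded = tokenizer.encode_plus(
    sample_code,
    add_special_tokens=True,
    max_length=MAX_LEN,
    padding='max_length',
    truncation=True,
    return_token_type_ids=True,
    return_tensors='pt'
)
with torch.no_grad():
    logits = model(
        encoded['input_ids'].to(device),
        encoded['attention_mask'].to(device),
        encoded['token_type_ids'].to(device)
    )
probs = torch.sigmoid(logits).cpu().numpy()[0]
predicted_labels = np.where(probs >= 0.5)[0]
print(f"Per-label probabilities: {probs}")
print(f"Predicted label indices: {predicted_labels}")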