#!/usr/bin/env python
# coding: utf-8

import time

import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm

from .Utilities import LanguageDataset
class Seq2Seq:
    """
    Base class for Seq2Seq (text-generation) models, inherited by wrappers of
    transformers such as GPT2 and FlanT5.

    Attributes:
        device, gpu: the device the model runs on and the GPU index assigned to it.
        model, model_name, tokenizer, max_length: the underlying transformer and its tokenizer.
        train_loader, valid_loader: batched training and validation DataLoaders.
        results: per-epoch training/validation metrics as a pandas DataFrame.

    Methods:
        load_data, summary, to_pt (see the individual docstrings below).
    """
    def __init__(self, gpu=0, max_length=0, model_path=None):
        # Load Seq2Seq onto the best available hardware: CUDA, then Apple Silicon (MPS), then CPU
        if torch.cuda.is_available():
            self.device = torch.device('cuda')
        elif torch.backends.mps.is_available():
            self.device = torch.device('mps')  # Apple Silicon
        else:
            self.device = torch.device('cpu')
        # GPU that the model will run on
        self.gpu = gpu
        # Model specs
        if model_path:
            self.model = torch.load(model_path).to(self.device)
        else:
            self.model = None
        self.model_name = ""
        self.tokenizer = None
        self.max_length = max_length
        # Training specs
        self.train_loader = None
        self.valid_loader = None
        self.results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu',
                                             'training_loss', 'validation_loss', 'epoch_duration_sec'])
    def load_data(self, df, batch_size, train_ratio=0.8):
        """Split df into train/validation sets and build the corresponding DataLoaders."""
        self.batch_size = batch_size
        dataset = LanguageDataset(df, self.tokenizer)
        train_size = int(train_ratio * len(dataset))
        valid_size = len(dataset) - train_size
        train_data, valid_data = random_split(dataset, [train_size, valid_size])
        self.max_length = dataset.max_length
        self.train_loader = DataLoader(train_data, batch_size=self.batch_size, shuffle=True)
        self.valid_loader = DataLoader(valid_data, batch_size=self.batch_size)
""" Return training results """ | |
def summary(self): | |
return self.results | |
""" Save model to path """ | |
def to_pt(self, path): | |
torch.save(self.model, path) | |
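

# A minimal sketch (not part of the library) of how to_pt and the model_path argument
# are meant to round-trip a trained model. The path name is hypothetical, and the
# tokenizer is not serialized, so the caller has to reattach it.
def _checkpoint_roundtrip_example(trained, path="seq2seq_model.pt"):
    trained.to_pt(path)                       # pickle the full transformer with torch.save
    restored = Seq2Seq(model_path=path)       # reload it onto the detected device
    restored.tokenizer = trained.tokenizer    # reattach the tokenizer for later generation
    restored.max_length = trained.max_length
    return restored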


class GPT2(Seq2Seq):
    """
    This is the GPT2 implementation of Seq2Seq.
    """
    def __init__(self, gpu, model_name, batch_size=16):
        super().__init__(gpu, max_length=0)
        from transformers import GPT2Tokenizer, GPT2LMHeadModel
        self.model_name = model_name
        self.model = GPT2LMHeadModel.from_pretrained(self.model_name).to(self.device)
        self.tokenizer = GPT2Tokenizer.from_pretrained(self.model_name)
        # GPT-2 has no pad token by default, so reuse the end-of-sequence token for padding
        self.tokenizer.pad_token = self.tokenizer.eos_token
    def train(self, num_epochs=3, train_ratio=0.8):
        # Note: the train/validation split is performed in load_data, so train_ratio is unused here,
        # and the model's built-in loss (outputs.loss) is used instead of this explicit criterion
        criterion = nn.CrossEntropyLoss(ignore_index=self.tokenizer.pad_token_id)
        optimizer = optim.Adam(self.model.parameters(), lr=5e-4)
        # Reset the results dataframe for this training run
        self.results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu',
                                             'training_loss', 'validation_loss', 'epoch_duration_sec'])
        # The training loop
        for epoch in range(num_epochs):
            start_time = time.time()  # Start the timer for the epoch

            # Training
            ## Put the model in 'learning mode'
            self.model.train()
            epoch_training_loss = 0
            train_iterator = tqdm(self.train_loader,
                                  desc=f"Training Epoch {epoch + 1}/{num_epochs} Batch Size: {self.batch_size}, Transformer: {self.model_name}")
            for batch in train_iterator:
                optimizer.zero_grad()
                inputs = batch['input_ids'].squeeze(1).to(self.device)
                # GPT-2 is trained with causal language modelling, so the labels are the inputs themselves
                targets = inputs.clone()
                outputs = self.model(input_ids=inputs, labels=targets)
                loss = outputs.loss
                loss.backward()
                optimizer.step()
                train_iterator.set_postfix({'Training Loss': loss.item()})
                epoch_training_loss += loss.item()
            avg_epoch_training_loss = epoch_training_loss / len(train_iterator)

            # Validation
            ## Put the model in evaluation mode ('stop learning')
            self.model.eval()
            epoch_validation_loss = 0
            valid_iterator = tqdm(self.valid_loader, desc=f"Validation Epoch {epoch + 1}/{num_epochs}")
            with torch.no_grad():
                for batch in valid_iterator:
                    inputs = batch['input_ids'].squeeze(1).to(self.device)
                    targets = inputs.clone()
                    outputs = self.model(input_ids=inputs, labels=targets)
                    loss = outputs.loss
                    valid_iterator.set_postfix({'Validation Loss': loss.item()})
                    epoch_validation_loss += loss.item()
            avg_epoch_validation_loss = epoch_validation_loss / len(self.valid_loader)

            end_time = time.time()  # End the timer for the epoch
            epoch_duration_sec = end_time - start_time  # Duration in seconds
            new_row = {'transformer': self.model_name,
                       'batch_size': self.batch_size,
                       'gpu': self.gpu,
                       'epoch': epoch + 1,
                       'training_loss': avg_epoch_training_loss,
                       'validation_loss': avg_epoch_validation_loss,
                       'epoch_duration_sec': epoch_duration_sec}
            self.results.loc[len(self.results)] = new_row
            print(f"Epoch: {epoch + 1}, Validation Loss: {avg_epoch_validation_loss}")
    def generate_text(self, input_str, top_k=16, top_p=0.95, temperature=1.0, repetition_penalty=1.2):
        # Encode the string into tokens
        input_ids = self.tokenizer.encode(input_str, return_tensors='pt').to(self.device)
        # Feed the tokens to the model and sample output tokens
        output = self.model.generate(
            input_ids,
            max_length=self.max_length,
            num_return_sequences=1,
            do_sample=True,
            top_k=top_k,
            top_p=top_p,
            temperature=temperature,
            repetition_penalty=repetition_penalty
        )
        # Decode the tokens back to a string
        decoded_output = self.tokenizer.decode(output[0], skip_special_tokens=True)
        return decoded_output
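

# A minimal usage sketch (not part of the library). It assumes a pandas DataFrame in
# whatever format Utilities.LanguageDataset expects and the hypothetical checkpoint
# name "gpt2"; adjust both to your setup before running.
def _gpt2_usage_example(df):
    model = GPT2(gpu=0, model_name="gpt2")        # load the pretrained checkpoint
    model.load_data(df, batch_size=8)             # build train/validation loaders
    model.train(num_epochs=1)                     # one quick epoch for demonstration
    print(model.summary())                        # per-epoch losses and timings
    model.to_pt("gpt2_finetuned.pt")              # hypothetical output path
    return model.generate_text("Once upon a time")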


class FlanT5(Seq2Seq):
    """
    This is the T5 implementation of Seq2Seq - it is designed to support T5 models of various sizes.
    """
    def __init__(self, gpu, model_name, batch_size=16):
        super().__init__(gpu, max_length=0)
        from transformers import T5ForConditionalGeneration, T5Tokenizer
        self.model_name = model_name
        self.model = T5ForConditionalGeneration.from_pretrained(self.model_name).to(self.device)
        self.tokenizer = T5Tokenizer.from_pretrained(self.model_name)
        # Unlike GPT-2, the T5 tokenizer already defines a dedicated pad token, so no override is needed
    def train(self, num_epochs=3, train_ratio=0.8):
        # Note: the train/validation split is performed in load_data, so train_ratio is unused here,
        # and the model's built-in loss (outputs.loss) is used instead of this explicit criterion
        criterion = nn.CrossEntropyLoss(ignore_index=self.tokenizer.pad_token_id)
        optimizer = optim.Adam(self.model.parameters(), lr=5e-4)
        # Reset the results dataframe for this training run
        self.results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu',
                                             'training_loss', 'validation_loss', 'epoch_duration_sec'])
        # The training loop
        for epoch in range(num_epochs):
            start_time = time.time()  # Start the timer for the epoch

            # Training
            ## Put the model in 'learning mode'
            self.model.train()
            epoch_training_loss = 0
            train_iterator = tqdm(self.train_loader,
                                  desc=f"Training Epoch {epoch + 1}/{num_epochs} Batch Size: {self.batch_size}, Transformer: {self.model_name}")
            for batch in train_iterator:
                optimizer.zero_grad()
                inputs = batch['input_ids'].squeeze(1).to(self.device)
                # T5 is an encoder-decoder model, so the labels come from a separate target sequence
                targets = batch['labels'].squeeze(1).to(self.device)
                outputs = self.model(input_ids=inputs, labels=targets)
                loss = outputs.loss
                loss.backward()
                optimizer.step()
                train_iterator.set_postfix({'Training Loss': loss.item()})
                epoch_training_loss += loss.item()
            avg_epoch_training_loss = epoch_training_loss / len(train_iterator)

            # Validation
            ## Put the model in evaluation mode ('stop learning')
            self.model.eval()
            epoch_validation_loss = 0
            valid_iterator = tqdm(self.valid_loader, desc=f"Validation Epoch {epoch + 1}/{num_epochs}")
            with torch.no_grad():
                for batch in valid_iterator:
                    inputs = batch['input_ids'].squeeze(1).to(self.device)
                    targets = batch['labels'].squeeze(1).to(self.device)
                    outputs = self.model(input_ids=inputs, labels=targets)
                    loss = outputs.loss
                    valid_iterator.set_postfix({'Validation Loss': loss.item()})
                    epoch_validation_loss += loss.item()
            avg_epoch_validation_loss = epoch_validation_loss / len(self.valid_loader)

            end_time = time.time()  # End the timer for the epoch
            epoch_duration_sec = end_time - start_time  # Duration in seconds
            new_row = {'transformer': self.model_name,
                       'batch_size': self.batch_size,
                       'gpu': self.gpu,
                       'epoch': epoch + 1,
                       'training_loss': avg_epoch_training_loss,
                       'validation_loss': avg_epoch_validation_loss,
                       'epoch_duration_sec': epoch_duration_sec}
            self.results.loc[len(self.results)] = new_row
            print(f"Epoch: {epoch + 1}, Validation Loss: {avg_epoch_validation_loss}")
    def generate_text(self, input_str, top_k=16, top_p=0.95, temperature=1.0, repetition_penalty=1.2):
        # Encode the input string into tensors via the FlanT5 tokenizer
        input_ids = self.tokenizer.encode(input_str, return_tensors='pt', max_length=self.max_length, truncation=True).to(self.device)
        # Run the tensors through the model to get the output token ids
        output_ids = self.model.generate(input_ids,
                                         max_length=self.max_length,
                                         do_sample=True,
                                         top_k=top_k,
                                         top_p=top_p,
                                         temperature=temperature,
                                         repetition_penalty=repetition_penalty)
        # Decode the output tokens to text via the tokenizer
        output_str = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
        return output_str
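

# A minimal usage sketch (not part of the library), mirroring the GPT2 example above. The
# checkpoint name "google/flan-t5-small" and the prompt are illustrative; LanguageDataset
# is assumed to supply both 'input_ids' and 'labels' for the encoder-decoder setup.
def _flan_t5_usage_example(df):
    model = FlanT5(gpu=0, model_name="google/flan-t5-small")  # load the pretrained checkpoint
    model.load_data(df, batch_size=8)                         # build train/validation loaders
    model.train(num_epochs=1)                                 # one quick epoch for demonstration
    print(model.summary())                                    # per-epoch losses and timings
    return model.generate_text("Summarize: the quick brown fox jumps over the lazy dog")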