Sheng Lei
Add application file
28de1fd
raw
history blame contribute delete
No virus
11 kB
#!/usr/bin/env python
# coding: utf-8
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
import pandas as pd
from tqdm import tqdm
import time
from .Utilities import LanguageDataset
class Seq2Seq():
"""
Base class for Seq2Seq (text-generation models). This class will be inherited by wrappers of transformers like GPT2
and T5.
Attributes:
Methods:
"""
def __init__(self, gpu=0, max_length=0, model_path=None):
# Load Seq2Seq to device based on available hardware
if torch.cuda.is_available():
self.device = torch.device('cuda')
else:
try:
self.device = torch.device('mps') # Apple Silicon
except Exception:
self.device = torch.device('cpu')
# GPU that model will run on
self.gpu = gpu
# Model specs
if model_path: self.model = torch.load(model_path).to(self.device)
else: self.model = None
self.model_name = ""
self.tokenizer = None
self.max_length = max_length
# Training specs
self.train_loader = None
self.valid_loader = None
self.results = pd.DataFrame(columns=['epoch', 'model_arch', 'batch_size', 'gpu', 'training_loss', 'validation_loss', 'epoch_duration_sec'])
def load_data(self, df, batch_size, train_ratio=0.8):
self.batch_size = batch_size
dataset = LanguageDataset(df, self.tokenizer)
train_size = int(0.8*len(dataset))
valid_size = len(dataset) - train_size
train_data, valid_data = random_split(dataset, [train_size, valid_size])
self.max_length = dataset.max_length
self.train_loader = DataLoader(train_data, batch_size=self.batch_size, shuffle=True)
self.valid_loader = DataLoader(valid_data, batch_size=self.batch_size)
""" Return training results """
def summary(self):
return self.results
""" Save model to path """
def to_pt(self, path):
torch.save(self.model, path)
class GPT2(Seq2Seq):
"""
This is the GPT2 implementation of Seq2Seq.
"""
def __init__(self, gpu, model_name, batch_size=16):
super().__init__(gpu, max_length=0)
from transformers import GPT2Tokenizer, GPT2LMHeadModel
self.model_name = model_name
self.model = GPT2LMHeadModel.from_pretrained(self.model_name).to(self.device)
self.tokenizer = GPT2Tokenizer.from_pretrained(self.model_name)
self.tokenizer.pad_token = self.tokenizer.eos_token
def train(self, num_epochs=3, train_ratio=0.8):
criterion = nn.CrossEntropyLoss(ignore_index=self.tokenizer.pad_token_id)
optimizer = optim.Adam(self.model.parameters(), lr=5e-4)
# Init a results dataframe
results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu',
'training_loss', 'validation_loss', 'epoch_duration_sec'])
# The training loop
for epoch in range(num_epochs):
start_time = time.time() # Start the timer for the epoch
# Training
## This line tells the self.model we're in 'learning mode'
self.model.train()
epoch_training_loss = 0
train_iterator = tqdm(self.train_loader,
desc=f"Training Epoch {epoch + 1}/{num_epochs} Batch Size: {self.batch_size}, Transformer: {self.model_name}")
for batch in train_iterator:
optimizer.zero_grad()
inputs = batch['input_ids'].squeeze(1).to(self.device)
targets = inputs.clone()
outputs = self.model(input_ids=inputs, labels=targets)
loss = outputs.loss
loss.backward()
optimizer.step()
train_iterator.set_postfix({'Training Loss': loss.item()})
epoch_training_loss += loss.item()
avg_epoch_training_loss = epoch_training_loss / len(train_iterator)
# Validation
## This line below tells the self.model to 'stop learning'
self.model.eval()
epoch_validation_loss = 0
total_loss = 0
valid_iterator = tqdm(self.valid_loader, desc=f"Validation Epoch {epoch + 1}/{num_epochs}")
with torch.no_grad():
for batch in valid_iterator:
inputs = batch['input_ids'].squeeze(1).to(self.device)
targets = inputs.clone()
outputs = self.model(input_ids=inputs, labels=targets)
loss = outputs.loss
total_loss += loss
valid_iterator.set_postfix({'Validation Loss': loss.item()})
epoch_validation_loss += loss.item()
avg_epoch_validation_loss = epoch_validation_loss / len(self.valid_loader)
end_time = time.time() # End the timer for the epoch
epoch_duration_sec = end_time - start_time # Calculate the duration in seconds
new_row = {'transformer': self.model_name,
'batch_size': self.batch_size,
'gpu': self.gpu,
'epoch': epoch + 1,
'training_loss': avg_epoch_training_loss,
'validation_loss': avg_epoch_validation_loss,
'epoch_duration_sec': epoch_duration_sec} # Add epoch_duration to the dataframe
self.results.loc[len(self.results)] = new_row
print(f"Epoch: {epoch + 1}, Validation Loss: {total_loss / len(self.valid_loader)}")
def generate_text(self, input_str, top_k=16, top_p=0.95, temperature=1.0, repetition_penalty=1.2):
# Encode string to tokens
input_ids= self.tokenizer.encode(input_str, return_tensors='pt').to(self.device)
# Feed tokens to model and get outcome tokens
output = self.model.generate(
input_ids,
max_length=self.max_length,
num_return_sequences=1,
do_sample=True,
top_k=top_k,
top_p=top_p,
temperature=temperature,
repetition_penalty=repetition_penalty
)
# Decode tokens to string
decoded_output = self.tokenizer.decode(output[0], skip_special_tokens=True)
return decoded_output
class FlanT5(Seq2Seq):
"""
This is the T5 implementation of Seq2Seq - it is designed to support T5 models of various sizes.
"""
def __init__(self, gpu, model_name, batch_size=16):
super().__init__(gpu, max_length=0)
from transformers import T5ForConditionalGeneration, T5Tokenizer
self.model_name = model_name
self.model = T5ForConditionalGeneration.from_pretrained(self.model_name).to(self.device)
self.tokenizer = T5Tokenizer.from_pretrained(self.model_name)
self.tokenizer.pad_token = self.tokenizer.eos_token
def train(self, num_epochs=3, train_ratio=0.8):
criterion = nn.CrossEntropyLoss(ignore_index=self.tokenizer.pad_token_id)
optimizer = optim.Adam(self.model.parameters(), lr=5e-4)
# Init a results dataframe
self.results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu',
'training_loss', 'validation_loss', 'epoch_duration_sec'])
# The training loop
for epoch in range(num_epochs):
start_time = time.time() # Start the timer for the epoch
# Training
## This line tells the model we're in 'learning mode'
self.model.train()
epoch_training_loss = 0
train_iterator = tqdm(self.train_loader,
desc=f"Training Epoch {epoch + 1}/{num_epochs} Batch Size: {self.batch_size}, Transformer: {self.model_name}")
for batch in train_iterator:
optimizer.zero_grad()
inputs = batch['input_ids'].squeeze(1).to(self.device)
targets = batch['labels'].squeeze(1).to(self.device)
outputs = self.model(input_ids=inputs, labels=targets)
loss = outputs.loss
loss.backward()
optimizer.step()
train_iterator.set_postfix({'Training Loss': loss.item()})
epoch_training_loss += loss.item()
avg_epoch_training_loss = epoch_training_loss / len(train_iterator)
# Validation
## This line below tells the model to 'stop learning'
self.model.eval()
epoch_validation_loss = 0
total_loss = 0
valid_iterator = tqdm(self.valid_loader, desc=f"Validation Epoch {epoch + 1}/{num_epochs}")
with torch.no_grad():
for batch in valid_iterator:
inputs = batch['input_ids'].squeeze(1).to(self.device)
targets = batch['labels'].squeeze(1).to(self.device)
outputs = self.model(input_ids=inputs, labels=targets)
loss = outputs.loss
total_loss += loss
valid_iterator.set_postfix({'Validation Loss': loss.item()})
epoch_validation_loss += loss.item()
avg_epoch_validation_loss = epoch_validation_loss / len(self.valid_loader)
end_time = time.time() # End the timer for the epoch
epoch_duration_sec = end_time - start_time # Calculate the duration in seconds
new_row = {'transformer': self.model_name,
'batch_size': self.batch_size,
'gpu': self.gpu,
'epoch': epoch + 1,
'training_loss': avg_epoch_training_loss,
'validation_loss': avg_epoch_validation_loss,
'epoch_duration_sec': epoch_duration_sec} # Add epoch_duration to the dataframe
self.results.loc[len(self.results)] = new_row
print(f"Epoch: {epoch + 1}, Validation Loss: {total_loss / len(self.valid_loader)}")
def generate_text(self, input_str, top_k=16, top_p=0.95, temperature=1.0, repetition_penalty=1.2):
# Encode input string into tensors via the FlanT5 tokenizer
input_ids = self.tokenizer.encode(input_str, return_tensors='pt', max_length=self.max_length, truncation=True).to(self.device)
# Run tensors through model to get output tensor values
output_ids = self.model.generate(input_ids,
max_length=self.max_length,
do_sample=True,
top_k=top_k,
top_p=top_p,
temperature=temperature,
repetition_penalty=repetition_penalty)
# Decode output tensors to text vi
output_str = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
return output_str