#!/usr/bin/env python
# coding: utf-8

import time

import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm

from .Utilities import LanguageDataset


class Seq2Seq:
    """
    Base class for Seq2Seq (text-generation) models.

    This class is inherited by wrappers around transformers such as GPT2 and T5.

    Attributes:
        device: torch.device the model runs on (CUDA, Apple Silicon MPS, or CPU).
        gpu: index of the GPU, recorded in the results table.
        model, model_name, tokenizer, max_length: model specification.
        train_loader, valid_loader: DataLoaders created by load_data().
        results: DataFrame of per-epoch training/validation metrics.

    Methods:
        load_data(df, batch_size, train_ratio): build train/validation loaders.
        summary(): return the results DataFrame.
        to_pt(path): save the underlying model to disk.
    """
    def __init__(self, gpu=0, max_length=0, model_path=None):
        # Load Seq2Seq to device based on available hardware
        if torch.cuda.is_available():
            self.device = torch.device('cuda')
        elif torch.backends.mps.is_available():
            self.device = torch.device('mps')  # Apple Silicon
        else:
            self.device = torch.device('cpu')

        # GPU that the model will run on
        self.gpu = gpu

        # Model specs
        if model_path:
            self.model = torch.load(model_path, map_location=self.device).to(self.device)
        else:
            self.model = None
        self.model_name = ""
        self.tokenizer = None
        self.max_length = max_length

        # Training specs
        self.train_loader = None
        self.valid_loader = None
        self.results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu',
                                             'training_loss', 'validation_loss', 'epoch_duration_sec'])

    def load_data(self, df, batch_size, train_ratio=0.8):
        """Tokenize df with LanguageDataset and split it into train/validation loaders."""
        self.batch_size = batch_size
        dataset = LanguageDataset(df, self.tokenizer)
        train_size = int(train_ratio * len(dataset))
        valid_size = len(dataset) - train_size
        train_data, valid_data = random_split(dataset, [train_size, valid_size])
        self.max_length = dataset.max_length
        self.train_loader = DataLoader(train_data, batch_size=self.batch_size, shuffle=True)
        self.valid_loader = DataLoader(valid_data, batch_size=self.batch_size)

    def summary(self):
        """Return the per-epoch training results as a DataFrame."""
        return self.results

    def to_pt(self, path):
        """Save the underlying model to path."""
        torch.save(self.model, path)
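
# `load_data` relies on `LanguageDataset` from .Utilities. A minimal sketch of the
# interface this module assumes (the real implementation may differ); it is kept as a
# commented illustration so it does not shadow the actual import:
#
#   class LanguageDataset(torch.utils.data.Dataset):
#       def __init__(self, df, tokenizer): ...   # tokenizes the dataframe rows
#       max_length: int                           # length of the longest tokenized sequence
#       def __len__(self): ...
#       def __getitem__(self, idx):
#           # returns a dict of tensors, e.g. {'input_ids': ..., 'labels': ...}
#           ...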
""" def __init__(self, gpu, model_name, batch_size=16): super().__init__(gpu, max_length=0) from transformers import GPT2Tokenizer, GPT2LMHeadModel self.model_name = model_name self.model = GPT2LMHeadModel.from_pretrained(self.model_name).to(self.device) self.tokenizer = GPT2Tokenizer.from_pretrained(self.model_name) self.tokenizer.pad_token = self.tokenizer.eos_token def train(self, num_epochs=3, train_ratio=0.8): criterion = nn.CrossEntropyLoss(ignore_index=self.tokenizer.pad_token_id) optimizer = optim.Adam(self.model.parameters(), lr=5e-4) # Init a results dataframe results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu', 'training_loss', 'validation_loss', 'epoch_duration_sec']) # The training loop for epoch in range(num_epochs): start_time = time.time() # Start the timer for the epoch # Training ## This line tells the self.model we're in 'learning mode' self.model.train() epoch_training_loss = 0 train_iterator = tqdm(self.train_loader, desc=f"Training Epoch {epoch + 1}/{num_epochs} Batch Size: {self.batch_size}, Transformer: {self.model_name}") for batch in train_iterator: optimizer.zero_grad() inputs = batch['input_ids'].squeeze(1).to(self.device) targets = inputs.clone() outputs = self.model(input_ids=inputs, labels=targets) loss = outputs.loss loss.backward() optimizer.step() train_iterator.set_postfix({'Training Loss': loss.item()}) epoch_training_loss += loss.item() avg_epoch_training_loss = epoch_training_loss / len(train_iterator) # Validation ## This line below tells the self.model to 'stop learning' self.model.eval() epoch_validation_loss = 0 total_loss = 0 valid_iterator = tqdm(self.valid_loader, desc=f"Validation Epoch {epoch + 1}/{num_epochs}") with torch.no_grad(): for batch in valid_iterator: inputs = batch['input_ids'].squeeze(1).to(self.device) targets = inputs.clone() outputs = self.model(input_ids=inputs, labels=targets) loss = outputs.loss total_loss += loss valid_iterator.set_postfix({'Validation Loss': loss.item()}) epoch_validation_loss += loss.item() avg_epoch_validation_loss = epoch_validation_loss / len(self.valid_loader) end_time = time.time() # End the timer for the epoch epoch_duration_sec = end_time - start_time # Calculate the duration in seconds new_row = {'transformer': self.model_name, 'batch_size': self.batch_size, 'gpu': self.gpu, 'epoch': epoch + 1, 'training_loss': avg_epoch_training_loss, 'validation_loss': avg_epoch_validation_loss, 'epoch_duration_sec': epoch_duration_sec} # Add epoch_duration to the dataframe self.results.loc[len(self.results)] = new_row print(f"Epoch: {epoch + 1}, Validation Loss: {total_loss / len(self.valid_loader)}") def generate_text(self, input_str, top_k=16, top_p=0.95, temperature=1.0, repetition_penalty=1.2): # Encode string to tokens input_ids= self.tokenizer.encode(input_str, return_tensors='pt').to(self.device) # Feed tokens to model and get outcome tokens output = self.model.generate( input_ids, max_length=self.max_length, num_return_sequences=1, do_sample=True, top_k=top_k, top_p=top_p, temperature=temperature, repetition_penalty=repetition_penalty ) # Decode tokens to string decoded_output = self.tokenizer.decode(output[0], skip_special_tokens=True) return decoded_output class FlanT5(Seq2Seq): """ This is the T5 implementation of Seq2Seq - it is designed to support T5 models of various sizes. 
""" def __init__(self, gpu, model_name, batch_size=16): super().__init__(gpu, max_length=0) from transformers import T5ForConditionalGeneration, T5Tokenizer self.model_name = model_name self.model = T5ForConditionalGeneration.from_pretrained(self.model_name).to(self.device) self.tokenizer = T5Tokenizer.from_pretrained(self.model_name) self.tokenizer.pad_token = self.tokenizer.eos_token def train(self, num_epochs=3, train_ratio=0.8): criterion = nn.CrossEntropyLoss(ignore_index=self.tokenizer.pad_token_id) optimizer = optim.Adam(self.model.parameters(), lr=5e-4) # Init a results dataframe self.results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu', 'training_loss', 'validation_loss', 'epoch_duration_sec']) # The training loop for epoch in range(num_epochs): start_time = time.time() # Start the timer for the epoch # Training ## This line tells the model we're in 'learning mode' self.model.train() epoch_training_loss = 0 train_iterator = tqdm(self.train_loader, desc=f"Training Epoch {epoch + 1}/{num_epochs} Batch Size: {self.batch_size}, Transformer: {self.model_name}") for batch in train_iterator: optimizer.zero_grad() inputs = batch['input_ids'].squeeze(1).to(self.device) targets = batch['labels'].squeeze(1).to(self.device) outputs = self.model(input_ids=inputs, labels=targets) loss = outputs.loss loss.backward() optimizer.step() train_iterator.set_postfix({'Training Loss': loss.item()}) epoch_training_loss += loss.item() avg_epoch_training_loss = epoch_training_loss / len(train_iterator) # Validation ## This line below tells the model to 'stop learning' self.model.eval() epoch_validation_loss = 0 total_loss = 0 valid_iterator = tqdm(self.valid_loader, desc=f"Validation Epoch {epoch + 1}/{num_epochs}") with torch.no_grad(): for batch in valid_iterator: inputs = batch['input_ids'].squeeze(1).to(self.device) targets = batch['labels'].squeeze(1).to(self.device) outputs = self.model(input_ids=inputs, labels=targets) loss = outputs.loss total_loss += loss valid_iterator.set_postfix({'Validation Loss': loss.item()}) epoch_validation_loss += loss.item() avg_epoch_validation_loss = epoch_validation_loss / len(self.valid_loader) end_time = time.time() # End the timer for the epoch epoch_duration_sec = end_time - start_time # Calculate the duration in seconds new_row = {'transformer': self.model_name, 'batch_size': self.batch_size, 'gpu': self.gpu, 'epoch': epoch + 1, 'training_loss': avg_epoch_training_loss, 'validation_loss': avg_epoch_validation_loss, 'epoch_duration_sec': epoch_duration_sec} # Add epoch_duration to the dataframe self.results.loc[len(self.results)] = new_row print(f"Epoch: {epoch + 1}, Validation Loss: {total_loss / len(self.valid_loader)}") def generate_text(self, input_str, top_k=16, top_p=0.95, temperature=1.0, repetition_penalty=1.2): # Encode input string into tensors via the FlanT5 tokenizer input_ids = self.tokenizer.encode(input_str, return_tensors='pt', max_length=self.max_length, truncation=True).to(self.device) # Run tensors through model to get output tensor values output_ids = self.model.generate(input_ids, max_length=self.max_length, do_sample=True, top_k=top_k, top_p=top_p, temperature=temperature, repetition_penalty=repetition_penalty) # Decode output tensors to text vi output_str = self.tokenizer.decode(output_ids[0], skip_special_tokens=True) return output_str