import csv
import math
import os

import progressbar
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader


# Device name used by this script wherever modules or tensors call .to(device).
device = "cpu"


def CreateBar():
    """Create the module-level progress bar ``bar`` and start it.

    The bar counts up to 100 and is drawn as a '='-filled bar between '['
    and ']', a space, and a percentage. Nothing is returned: the bar is
    published through the global name ``bar``.
    """
    global bar

    layout = [
        progressbar.Bar('=', '[', ']'),
        ' ',
        progressbar.Percentage(),
    ]
    bar = progressbar.ProgressBar(maxval=100, widgets=layout)
    bar.start()


# Character vocabulary: a character's position in this list is its token id.
tokens = list("azertyuiopqsdfghjklmwxcvbnäüöß—– ")

# One-hot vector per character: len(tokens) entries, all 0 except a single 1
# at the character's own token id. (The previous construction omitted the 1,
# which produced all-zero vectors that were one element too short.)
tokensdict = {
    char: [0] * idx + [1] + [0] * (len(tokens) - idx - 1)
    for idx, char in enumerate(tokens)
}


# Read the verb table into memory.
# newline="" is what the csv module asks for, so that line breaks inside
# quoted fields are handed to the parser untouched.
with open("C:\\Users\\marc2\\Downloads\\7eaaf0e22461b505c749e268c0b72bc4-12ebe211a929f039791dfeaa1a019b64cadddaf1\\7eaaf0e22461b505c749e268c0b72bc4-12ebe211a929f039791dfeaa1a019b64cadddaf1\\top-german-verbs.csv", 'r', encoding="utf-8", newline="") as file:
    rows = csv.reader(file)
    # Drop the first row (treated as the header); the default keeps an empty
    # file from raising StopIteration.
    next(rows, None)
    # Materialise the remaining rows so they stay usable after the file closes.
    reader = list(rows)


class CSVDataset(Dataset):
    """Map-style dataset that pairs each feature with the label at the same index."""

    def __init__(self, features, labels):
        """Keep references to the two parallel sequences (nothing is copied)."""
        self.features, self.labels = features, labels

    def __len__(self):
        """Return the number of stored features."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the ``(feature, label)`` tuple found at position ``idx``."""
        return self.features[idx], self.labels[idx]


features = []
labels = []
# Id used for padding: one past the last real token id.
padding = len(tokens)

# Character -> token id table, built once so that encoding a word does not
# rescan ``tokens`` for every character.
_token_ids = {char: idx for idx, char in enumerate(tokens)}


def _encode(word):
    """Return ``word`` as a 1-D ``torch.long`` tensor of token ids.

    Raises:
        ValueError: if ``word`` contains a character that is not in ``tokens``.
    """
    try:
        ids = [_token_ids[char] for char in word]
    except KeyError as err:
        raise ValueError(
            f"character {err.args[0]!r} in {word!r} is not in the vocabulary"
        ) from None
    # Token ids are indices, so store them as integers instead of going
    # through the legacy float constructor torch.Tensor(...).
    return torch.tensor(ids, dtype=torch.long)


# Column 2 of every CSV row is the model input, column 8 the expected output.
for row in reader:
    features.append(_encode(row[2]))
    labels.append(_encode(row[8]))

MyDataset = CSVDataset(features=features, labels=labels)


class TransformerModel(nn.Module):
    """Encoder-decoder transformer that scores the token following a target prefix.

    Source and target token ids share one embedding table and one positional
    encoding. ``forward`` projects only the last decoder position, so it
    returns one row of logits per batch element.
    """

    def __init__(self, vocab_size, emb_dim, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout=0.1):
        """Build the sub-modules and move them to the module-level ``device``.

        Args:
            vocab_size: rows of the embedding table and number of output
                logits; must be larger than the module-level ``padding`` id,
                which is used as the embedding's ``padding_idx``.
            emb_dim: embedding size, also the width of every transformer layer.
            nhead: attention heads per layer.
            num_encoder_layers: number of stacked encoder layers.
            num_decoder_layers: number of stacked decoder layers.
            dim_feedforward: hidden size of each layer's feed-forward network.
            dropout: dropout probability for the positional encoding and the
                transformer layers.
        """
        super().__init__()

        # The attribute names below are the state_dict keys of saved
        # checkpoints; renaming them would break loading.
        self.custom_embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=padding)
        self.pos_encoder = PositionalEncoding(emb_dim, dropout)

        encoder_layer = nn.TransformerEncoderLayer(emb_dim, nhead, dim_feedforward, dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_encoder_layers)

        decoder_layer = nn.TransformerDecoderLayer(emb_dim, nhead, dim_feedforward, dropout, batch_first=True)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_decoder_layers)

        self.output_layer = nn.Linear(emb_dim, vocab_size)

        # One move for the whole module instead of one per sub-module.
        self.to(device)

    def _padding_mask(self, ids):
        """Return a boolean mask that is True where ``ids`` equals the padding id.

        Rows made only of padding are left unmasked: hiding every key of a
        row would give attention nothing to attend to and produce NaNs.
        """
        is_pad = ids == self.custom_embedding.padding_idx
        return is_pad & ~is_pad.all(dim=1, keepdim=True)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None, src_key_padding_mask=None, tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Return next-token logits of shape ``(batch, vocab_size)``.

        Args:
            src: ``(batch, src_len)`` source token ids (cast to long).
            tgt: ``(batch, tgt_len)`` target prefix token ids (cast to long).
            src_mask, tgt_mask, memory_mask: optional attention masks handed
                to the encoder / decoder unchanged.
            src_key_padding_mask: optional ``(batch, src_len)`` mask, True at
                positions to ignore. When omitted it is derived from the
                padding id so padded source positions are not attended to.
            tgt_key_padding_mask: optional mask for the target prefix, handed
                to the decoder unchanged.
            memory_key_padding_mask: optional mask for the encoder output as
                seen by the decoder; defaults to ``src_key_padding_mask``.
        """
        src = src.long()
        tgt = tgt.long()

        # Without these defaults the padding added by batching takes part in
        # attention during training, while unpadded inference never sees it.
        if src_key_padding_mask is None:
            src_key_padding_mask = self._padding_mask(src)
        if memory_key_padding_mask is None:
            memory_key_padding_mask = src_key_padding_mask

        src_emb = self.pos_encoder(self.custom_embedding(src))
        tgt_emb = self.pos_encoder(self.custom_embedding(tgt))

        # Keyword arguments keep each mask bound to the right parameter.
        encoder_output = self.transformer_encoder(
            src_emb,
            mask=src_mask,
            src_key_padding_mask=src_key_padding_mask,
        )
        decoder_output = self.transformer_decoder(
            tgt_emb,
            encoder_output,
            tgt_mask=tgt_mask,
            memory_mask=memory_mask,
            tgt_key_padding_mask=tgt_key_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask,
        )

        # Only the last target position is projected: it predicts the token
        # that follows the given prefix.
        return self.output_layer(decoder_output[:, -1, :])


class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch-first input, then apply dropout."""

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        """Precompute the encoding table for positions ``0 .. max_len - 1``.

        Args:
            d_model: size of the last input dimension; even and odd values
                are both supported.
            dropout: dropout probability applied after the addition.
            max_len: number of positions in the precomputed table.
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # An odd d_model has one fewer odd-indexed column than even-indexed
        # ones, so the cosine part uses only the first d_model // 2 frequencies
        # (the unsliced assignment failed with a shape mismatch).
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Stored as a buffer with a leading batch axis of size 1 so it
        # broadcasts over the batch and follows the module across devices.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``dropout(x + pe)`` using the first ``x.size(1)`` positions of the table."""
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)


def collate_fn(batch):
    """Merge ``(feature, label)`` samples into two padded, batch-first tensors.

    Every tensor is moved to the module-level ``device``; sequences shorter
    than the longest one in their group are filled up with the module-level
    ``padding`` id.

    Returns:
        Tuple ``(inputs, targets)`` of padded tensors.
    """
    sources = []
    references = []
    for item in batch:
        sources.append(item[0].to(device))
        references.append(item[1].to(device))

    padded_inputs = pad_sequence(sources, batch_first=True, padding_value=padding)
    padded_targets = pad_sequence(references, batch_first=True, padding_value=padding)
    return padded_inputs, padded_targets


# Shuffled mini-batches of 32 samples, padded per batch by collate_fn.
train_loader = DataLoader(
    MyDataset,
    batch_size=32,
    shuffle=True,
    collate_fn=collate_fn,
)

# The vocabulary holds one entry per character plus one for the padding id.
model = TransformerModel(
    vocab_size=len(tokens) + 1,
    emb_dim=16,
    nhead=4,
    num_encoder_layers=2,
    num_decoder_layers=2,
    dim_feedforward=256,
)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Number of passes over the training data.
epochs = 100


# Resume from an earlier run when a checkpoint exists; otherwise keep the
# current weights. Loading stays best-effort, but failures are now reported
# instead of being swallowed, and KeyboardInterrupt / SystemExit are no
# longer caught.
# NOTE(review): torch.load unpickles the file - only load checkpoints that
# come from a trusted source.
try:
    # map_location places the stored tensors on this script's device no
    # matter which device they were saved from.
    model.load_state_dict(torch.load("data/PrateritumGPT.pth", map_location=device))
except FileNotFoundError:
    print("No checkpoint found; continuing without loading one.")
except Exception as err:
    print(f"Could not load checkpoint, continuing without it: {err}")
else:
    print("Sucessfully loaded model.")


def _generate(verb, max_steps=100):
    """Greedily decode the model's output for ``verb``, one character per step.

    The decoder is seeded with the first character of ``verb`` (this assumes
    the answer starts with the same character as the input). Decoding stops
    after ``max_steps`` steps or as soon as the model picks the id
    ``len(tokens)``, which is not a character.

    Raises:
        ValueError: if ``verb`` is empty or contains a character that is not
            in ``tokens``.
    """
    if not verb:
        raise ValueError("no verb was entered")
    unknown = sorted({char for char in verb if char not in tokens})
    if unknown:
        raise ValueError(f"unsupported character(s) {unknown} in {verb!r}")

    source_ids = [tokens.index(char) for char in verb]
    src = torch.tensor([source_ids], dtype=torch.long, device=device)
    generated = [source_ids[0]]

    # Dropout must be off while predicting, otherwise the same verb can give
    # different answers; the previous mode is restored for the code that follows.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for _ in range(max_steps):
                tgt = torch.tensor([generated], dtype=torch.long, device=device)
                logits = model(src, tgt)[0]
                # Plain argmax: the earlier hand-written search started from
                # a best score of 0 and therefore fell back to the space
                # token whenever every logit was negative.
                best = int(torch.argmax(logits))
                if best == len(tokens):
                    break
                generated.append(best)
    finally:
        model.train(was_training)

    return "".join(tokens[idx] for idx in generated)


inp = input("Which verb? ")
str_ = _generate(inp)

print(str_)


# torch.save does not create missing directories; make sure the target
# folder exists before any training time is spent.
os.makedirs("data", exist_ok=True)

for epoch in range(epochs):
    total_loss = 0.0
    # Make sure dropout is active even if earlier code switched to eval mode.
    model.train()

    # CreateBar() builds the global ``bar`` and already starts it, so no
    # second bar.start() is needed here.
    CreateBar()

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        # Tensor.to returns the moved tensor; the result has to be re-bound.
        targets = targets.to(device)
        inputs = inputs.to(device)

        # Teacher forcing: predict position i from the prefix [0, i).
        for i in range(1, targets.shape[1]):
            optimizer.zero_grad()
            output = model(inputs, targets[:, :i])

            loss = loss_fn(output, targets[:, i].long())
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # Rows whose target at position i is the padding id have just
            # been trained on it and are dropped from the remaining steps.
            mask = targets[:, i] != len(tokens)
            targets = targets[mask]
            inputs = inputs[mask]

        bar.update((batch_idx + 1) / len(train_loader) * 100)

    bar.finish()

    # total_loss adds one loss per decoding position, so this prints the mean
    # per-batch sum rather than a per-step average.
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(train_loader)}")

    # Checkpoint after every epoch so an interrupted run keeps its progress.
    torch.save(model.state_dict(), "data/PrateritumGPT.pth")