# -*- coding: utf-8 -*-
"""HabibiTranslator.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1lYP3XxUCWdiihU0mIejW_KCqTvy7-tz6

Word-level Transformer translation demo (Arabic -> English) built on the
Helsinki-NLP/tatoeba_mt dataset, with a small Gradio front end.
"""

import math
from collections import Counter

import gradio as gr
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from datasets import load_dataset

# Setting random seed for reproducibility
torch.manual_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

dataset = load_dataset('Helsinki-NLP/tatoeba_mt', 'ara-eng', trust_remote_code=True)

# Special vocabulary tokens. They MUST be distinct strings: if they collapse
# into one dictionary key, padding/unknown/start/end all share a single index
# and len(word2idx) becomes smaller than the largest index, which overflows
# the embedding table.
# NOTE(review): the order below decides the indices 0..3 and therefore has to
# match the order used when "habibi.pth" was trained - confirm before loading
# an existing checkpoint.
PAD_TOKEN = '<pad>'
UNK_TOKEN = '<unk>'
SOS_TOKEN = '<sos>'
EOS_TOKEN = '<eos>'
SPECIAL_TOKENS = [PAD_TOKEN, UNK_TOKEN, SOS_TOKEN, EOS_TOKEN]


# tokenization (word-level)
def tokenize(text):
    """Split ``text`` on whitespace and return the list of tokens."""
    return text.split()


# Building vocabulary from dataset
def build_vocab(data, tokenizer, min_freq=2):
    """Build a shared source/target vocabulary.

    Args:
        data: Iterable of examples with 'sourceString' and 'targetString' keys.
        tokenizer: Callable mapping a string to a list of tokens.
        min_freq: Minimum number of occurrences for a token to be kept.

    Returns:
        A ``(word2idx, idx2word)`` pair of dictionaries. The special tokens
        occupy the first indices, in the order of ``SPECIAL_TOKENS``.
    """
    counter = Counter()
    for example in data:
        counter.update(tokenizer(example['sourceString']))
        counter.update(tokenizer(example['targetString']))

    # Adding special tokens. Corpus words that happen to equal a special token
    # are skipped so that every vocabulary entry keeps a unique index.
    reserved = set(SPECIAL_TOKENS)
    vocab = SPECIAL_TOKENS + [
        word for word, freq in counter.items()
        if freq >= min_freq and word not in reserved
    ]
    word2idx = {word: idx for idx, word in enumerate(vocab)}
    idx2word = {idx: word for word, idx in word2idx.items()}
    return word2idx, idx2word


# Converting text to tensor (adjusted to fit special tokens within max_len)
def text_to_tensor(text, vocab, tokenizer, max_len=52):
    """Encode ``text`` as a 1-D ``torch.long`` tensor of vocabulary indices.

    The token sequence is truncated so that, together with the leading
    ``<sos>`` and trailing ``<eos>``, it holds at most ``max_len`` entries
    (for ``max_len >= 2``). Tokens missing from ``vocab`` map to ``<unk>``.
    """
    # Reserving space for <sos> and <eos>; clamp so a tiny max_len cannot turn
    # into a negative slice bound.
    tokens = tokenizer(text)[:max(max_len - 2, 0)]
    tokens = [SOS_TOKEN] + tokens + [EOS_TOKEN]
    unk_idx = vocab[UNK_TOKEN]
    indices = [vocab.get(token, unk_idx) for token in tokens]
    return torch.tensor(indices, dtype=torch.long)


train_data = dataset['validation']  # Using validation as training data for demo
test_data = dataset['test']

# Building shared vocabulary (for simplicity, using both languages in one vocab)
word2idx, idx2word = build_vocab(train_data, tokenize)

# Hyperparameters for data
max_len = 52  # Increased to account for <sos> and <eos>
batch_size = 32

train_data_list = list(train_data)  # Convert Dataset to list once
print(f"Length of train_data_list: {len(train_data_list)}")


def _pad_or_truncate(batch, length, pad_idx):
    """Return ``batch`` ([seq_len, batch]) with exactly ``length`` rows."""
    current = batch.size(0)
    if current > length:
        return batch[:length, :]
    if current < length:
        padding = torch.full(
            (length - current, batch.size(1)),
            pad_idx,
            dtype=torch.long,
            device=batch.device,
        )
        return torch.cat([batch, padding], dim=0)
    return batch


def get_batches(data_list, batch_size, max_len=52):
    """Yield ``(src_batch, tgt_batch)`` index tensors for ``data_list``.

    Both tensors are placed on the module-level ``device`` and have shape
    [batch, max_len]; shorter sequences are filled with the ``<pad>`` index.
    The last batch may contain fewer than ``batch_size`` examples.
    """
    total_batches = (len(data_list) + batch_size - 1) // batch_size
    print(f"Total batches to process: {total_batches}")
    pad_idx = word2idx[PAD_TOKEN]

    for start in range(0, len(data_list), batch_size):
        batch = data_list[start:start + batch_size]
        src_batch = [
            text_to_tensor(example['sourceString'], word2idx, tokenize, max_len)
            for example in batch
        ]
        tgt_batch = [
            text_to_tensor(example['targetString'], word2idx, tokenize, max_len)
            for example in batch
        ]

        # pad_sequence only pads to the longest sequence in the batch, so the
        # result is afterwards forced to the fixed length max_len.
        src_batch = nn.utils.rnn.pad_sequence(
            src_batch, padding_value=pad_idx, batch_first=False
        ).to(device)
        tgt_batch = nn.utils.rnn.pad_sequence(
            tgt_batch, padding_value=pad_idx, batch_first=False
        ).to(device)
        src_batch = _pad_or_truncate(src_batch, max_len, pad_idx)
        tgt_batch = _pad_or_truncate(tgt_batch, max_len, pad_idx)

        # [seq_len, batch_size] -> [batch_size, seq_len]
        yield src_batch.transpose(0, 1), tgt_batch.transpose(0, 1)


print("Revised Chunk 1 (Seventh Iteration) completed: Dataset loaded and preprocessing debugged.")


class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a [batch, seq, d_model] input."""

    def __init__(self, d_model, max_len=52):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one cosine column fewer than sine columns.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(0)  # Shape: (1, max_len, d_model)
        # Buffer: moves with the module across devices but is not trained.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions."""
        return x + self.pe[:, :x.size(1), :]


class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with learned projections."""

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # per-head feature size
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return softmax(Q K^T / sqrt(d_k)) V; positions where ``mask == 0`` are suppressed."""
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # A large negative score gives a near-zero weight after softmax.
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        return torch.matmul(attn, V)

    def forward(self, Q, K, V, mask=None):
        """Attend from ``Q`` to ``K``/``V``; the result has shape [batch, seq_len_q, d_model]."""
        batch_size = Q.size(0)
        seq_len_q = Q.size(1)
        seq_len_k = K.size(1)

        Q = self.W_q(Q)
        K = self.W_k(K)
        V = self.W_v(V)

        # Split the feature axis into heads: [batch, heads, seq, d_k]
        Q = Q.view(batch_size, seq_len_q, self.num_heads, self.d_k).transpose(1, 2)
        K = K.view(batch_size, seq_len_k, self.num_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, seq_len_k, self.num_heads, self.d_k).transpose(1, 2)

        output = self.scaled_dot_product_attention(Q, K, V, mask)
        # Merge the heads back: [batch, seq_len_q, d_model]
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len_q, self.d_model)
        return self.W_o(output)


class FeedForward(nn.Module):
    """Two linear layers with a ReLU in between (d_model -> d_ff -> d_model)."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.linear2(self.relu(self.linear1(x)))


class EncoderLayer(nn.Module):
    """Self-attention followed by a feed-forward block, each with residual + LayerNorm."""

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_output = self.mha(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.ff(x)
        return self.norm2(x + self.dropout(ff_output))


class DecoderLayer(nn.Module):
    """Masked self-attention, attention over the encoder output, then feed-forward."""

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        attn1_output = self.mha1(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn1_output))
        attn2_output = self.mha2(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn2_output))
        ff_output = self.ff(x)
        return self.norm3(x + self.dropout(ff_output))


class Transformer(nn.Module):
    """Encoder-decoder Transformer mapping source indices to target-vocabulary logits."""

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=256, num_heads=8,
                 num_layers=3, d_ff=1024, max_len=52, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build the padding mask for ``src`` and the padding + causal mask for ``tgt``.

        Both inputs are [batch, seq] index tensors; the padding index is read
        from the module-level ``word2idx``.
        """
        pad_idx = word2idx[PAD_TOKEN]
        src_mask = (src != pad_idx).unsqueeze(1).unsqueeze(2)  # [batch, 1, 1, src_len]
        tgt_mask = (tgt != pad_idx).unsqueeze(1).unsqueeze(3)  # [batch, 1, tgt_len, 1]
        seq_len = tgt.size(1)
        # Lower-triangular mask: position i may only attend to positions <= i.
        # Created on the input's own device so it always matches tgt_mask.
        nopeak_mask = torch.tril(
            torch.ones(1, seq_len, seq_len, device=tgt.device)
        ).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        """Return logits of shape [batch, tgt_len, tgt_vocab_size]."""
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        scale = math.sqrt(self.d_model)
        src_embedded = self.dropout(self.pos_encoding(self.src_embedding(src) * scale))
        tgt_embedded = self.dropout(self.pos_encoding(self.tgt_embedding(tgt) * scale))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        return self.fc_out(dec_output)


print("Revised Chunk 2 (Fourth Iteration) completed: Transformer model fixed with max_len=52.")

vocab_size = len(word2idx)
model = Transformer(
    src_vocab_size=vocab_size,
    tgt_vocab_size=vocab_size,
    d_model=256,
    num_heads=8,
    num_layers=3,
    d_ff=1024,
    max_len=52,
    dropout=0.1
).to(device)

# Loss and optimizer (padding positions do not contribute to the loss)
criterion = nn.CrossEntropyLoss(ignore_index=word2idx[PAD_TOKEN])
optimizer = optim.Adam(model.parameters(), lr=0.0001)


# Training loop with progress feedback
def train(model, data, epochs=20):
    """Train ``model`` on ``data`` for ``epochs`` passes with teacher forcing.

    Uses the module-level ``batch_size``, ``max_len``, ``criterion`` and
    ``optimizer``. Prints progress every 100 batches and the mean batch loss
    after every epoch.
    """
    model.train()
    total_batches = (len(data) + batch_size - 1) // batch_size
    print(f"Total batches per epoch: {total_batches}")

    for epoch in range(epochs):
        total_loss = 0
        batches = get_batches(data, batch_size, max_len=max_len)
        for batch_idx, (src_batch, tgt_batch) in enumerate(batches, 1):
            if batch_idx % 100 == 0:  # Printing every 100 batches for feedback
                print(f"Epoch {epoch + 1}, Batch {batch_idx}/{total_batches} ")

            optimizer.zero_grad()
            # The decoder sees tokens [0, n-1) and is scored against [1, n).
            output = model(src_batch, tgt_batch[:, :-1])
            loss = criterion(
                output.reshape(-1, output.size(-1)),
                tgt_batch[:, 1:].reshape(-1),
            )
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Guard against empty data, which would otherwise divide by zero.
        avg_loss = total_loss / total_batches if total_batches else 0.0
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}")


# Main function
def translate(model, sentence, max_len=52):
    """Greedily decode a translation of ``sentence``.

    Decoding starts from ``<sos>`` and appends the arg-max token until
    ``<eos>`` is predicted or ``max_len`` steps have run. Returns the decoded
    tokens (without ``<sos>``) joined by single spaces.
    """
    model.eval()
    sos_idx = word2idx[SOS_TOKEN]
    eos_idx = word2idx[EOS_TOKEN]

    with torch.no_grad():
        src = text_to_tensor(sentence, word2idx, tokenize, max_len).unsqueeze(0).to(device)
        tgt = torch.tensor([[sos_idx]], dtype=torch.long, device=device)

        for _ in range(max_len):
            output = model(src, tgt)
            next_token = output[:, -1, :].argmax(dim=-1).item()
            if next_token == eos_idx:
                break
            step = torch.tensor([[next_token]], dtype=torch.long, device=device)
            tgt = torch.cat([tgt, step], dim=1)

    translated = [idx2word[idx] for idx in tgt[0].tolist() if idx in idx2word]
    return ' '.join(translated[1:])


# Testing
test_sentence = "عمرك رايح المكسيك؟"
translated = translate(model, test_sentence)
print(f"Input: {test_sentence}")
print(f"Translated: {translated}")

print("Chunk 3 completed: Training and inference implemented.")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate the model (assuming train_dataset is already defined)
model = Transformer(
    src_vocab_size=vocab_size,
    tgt_vocab_size=vocab_size
).to(device)

# Load model checkpoint and set to evaluation mode
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source; consider weights_only=True where the installed torch
# version supports it.
model.load_state_dict(torch.load("habibi.pth", map_location=device))
model.eval()


def gradio_translate(text):
    """Gradio callback: translate ``text`` with the loaded model."""
    return translate(model, text)


interface = gr.Interface(
    fn=gradio_translate,
    inputs=gr.Textbox(lines=2, placeholder="Enter Arabic sentence here..."),
    outputs="text",
    title="Habibi-Translator",
    description="Translate Arabic sentences to English using a Transformer model."
)

interface.launch()

print("Chunk 4 completed: Gradio interface deployed.")