import gradio as gr
import numpy as np
import random
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer, Trainer, TrainingArguments, DataCollatorForLanguageModeling
from datasets import Dataset
# import spaces

# Set random seeds for reproducibility
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

def generate_demo_data(num_samples=60):
    subjects = [
        'Artificial intelligence', 'Climate change', 'Renewable energy',
        'Space exploration', 'Quantum computing', 'Genetic engineering',
        'Blockchain technology', 'Virtual reality', 'Cybersecurity',
        'Biotechnology', 'Nanotechnology', 'Astrophysics'
    ]
    verbs = [
        'is transforming', 'is influencing', 'is revolutionizing',
        'is challenging', 'is advancing', 'is reshaping', 'is impacting',
        'is enhancing', 'is disrupting', 'is redefining'
    ]
    objects = [
        'modern science', 'global economies', 'healthcare systems',
        'communication methods', 'educational approaches',
        'environmental policies', 'social interactions', 'the job market',
        'data security', 'the entertainment industry'
    ]
    data = []
    for _ in range(num_samples):
        subject = random.choice(subjects)
        verb = random.choice(verbs)
        obj = random.choice(objects)
        data.append(f"{subject} {verb} {obj}.")
    return data

def load_data(uploaded_file):
    # `uploaded_file` is expected to be raw bytes (see the gr.File component
    # below, configured with type="binary").
    data = uploaded_file.decode("utf-8")
    return data.splitlines()
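
# With the gr.File component below configured as type="binary", `uploaded_file`
# arrives as raw bytes, so b"line one\nline two\n" becomes
# ['line one', 'line two'].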

def prepare_dataset(data, tokenizer, block_size=128):
    def tokenize_function(examples):
        return tokenizer(examples['text'], truncation=True, max_length=block_size, padding='max_length')

    raw_dataset = Dataset.from_dict({'text': data})
    tokenized_dataset = raw_dataset.map(tokenize_function, batched=True, remove_columns=['text'])
    # For causal LM the labels mirror the input ids (the collator used at
    # training time rebuilds them anyway, with padding masked out).
    tokenized_dataset = tokenized_dataset.map(
        lambda examples: {'labels': examples['input_ids']},
        batched=True
    )
    tokenized_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
    return tokenized_dataset
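
# Illustrative smoke test for prepare_dataset (commented out; assumes the
# GPT-2 tokenizer can be downloaded):
# tok = GPT2Tokenizer.from_pretrained('gpt2')
# tok.pad_token = tok.eos_token
# ds = prepare_dataset(["Hello world."], tok, block_size=16)
# print(ds[0]['input_ids'].shape)  # torch.Size([16])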

# @spaces.GPU()
def fitness_function(individual, train_dataset, tokenizer, model_state_dict):
    # Initialize the model inside this function
    model = GPT2LMHeadModel.from_pretrained('gpt2')
    model.load_state_dict(model_state_dict)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    training_args = TrainingArguments(
        output_dir='./results',
        overwrite_output_dir=True,
        num_train_epochs=individual['epochs'],
        per_device_train_batch_size=individual['batch_size'],
        learning_rate=individual['learning_rate'],
        logging_steps=10,
        save_steps=10,
        save_total_limit=2,
        report_to='none',
    )
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer, mlm=False
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        data_collator=data_collator,
        train_dataset=train_dataset,
        eval_dataset=None,
    )
    trainer.train()
    # Use the last logged training loss as the fitness (lower is better)
    logs = [log for log in trainer.state.log_history if 'loss' in log]
    loss = logs[-1]['loss'] if logs else float('inf')
    # Clean up GPU memory
    del model
    torch.cuda.empty_cache()
    return loss
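
# Note: the model is rebuilt from the passed-in state dict inside
# fitness_function rather than captured from the outer scope, so every
# candidate starts from identical pretrained weights; this also matches the
# ZeroGPU pattern suggested by the commented-out @spaces.GPU() decorator,
# where GPU work must happen inside the decorated function.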

def create_population(size, param_bounds):
    population = []
    for _ in range(size):
        individual = {
            'learning_rate': random.uniform(*param_bounds['learning_rate']),
            'epochs': random.randint(*param_bounds['epochs']),
            'batch_size': random.choice(param_bounds['batch_size']),
        }
        population.append(individual)
    return population

def select_mating_pool(population, fitnesses, num_parents):
    # Lower loss is better, so keep the num_parents individuals with the
    # smallest fitness values (elitist selection).
    return [population[i] for i in np.argsort(fitnesses)[:num_parents]]

def crossover(parents, offspring_size):
    offspring = []
    for _ in range(offspring_size):
        parent1 = random.choice(parents)
        parent2 = random.choice(parents)
        child = {
            'learning_rate': random.choice([parent1['learning_rate'], parent2['learning_rate']]),
            'epochs': random.choice([parent1['epochs'], parent2['epochs']]),
            'batch_size': random.choice([parent1['batch_size'], parent2['batch_size']]),
        }
        offspring.append(child)
    return offspring

def mutation(offspring, param_bounds, mutation_rate=0.1):
    for individual in offspring:
        if random.random() < mutation_rate:
            individual['learning_rate'] = random.uniform(*param_bounds['learning_rate'])
        if random.random() < mutation_rate:
            individual['epochs'] = random.randint(*param_bounds['epochs'])
        if random.random() < mutation_rate:
            individual['batch_size'] = random.choice(param_bounds['batch_size'])
    return offspring
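
# Illustrative one-generation run of the GA operators above with a dummy
# fitness, exercising them without any model training (commented out):
# bounds = {'learning_rate': (1e-5, 5e-5), 'epochs': (1, 2), 'batch_size': [2, 4]}
# pop = create_population(4, bounds)
# fits = [ind['learning_rate'] for ind in pop]  # stand-in for the training loss
# parents = select_mating_pool(pop, fits, 2)
# children = mutation(crossover(parents, 2), bounds, mutation_rate=0.1)
# pop = parents + children  # next generation, population size preserved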

def gpt2_fine_tuning(option, uploaded_file, population_size, num_generations, num_parents, mutation_rate):
    # Gradio sliders deliver floats; the GA needs integer counts.
    population_size = int(population_size)
    num_generations = int(num_generations)
    num_parents = int(num_parents)
    if option == 'DEMO':
        data = generate_demo_data()
    else:
        if uploaded_file is None:
            return "Please upload a valid text file."
        data = load_data(uploaded_file)
    # Drop blank lines so we don't train on empty, all-padding examples.
    data = [line for line in data if line.strip()]
    if not data:
        return "The file contains no usable text."
    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
    tokenizer.pad_token = tokenizer.eos_token  # GPT-2 has no pad token by default
    train_dataset = prepare_dataset(data, tokenizer)
    # Save the initial model state dict so every individual trains from the
    # same pretrained weights
    initial_model = GPT2LMHeadModel.from_pretrained('gpt2')
    model_state_dict = initial_model.state_dict()
    del initial_model
    param_bounds = {
        'learning_rate': (1e-5, 5e-5),
        'epochs': (1, 2),  # Adjusted to ensure training fits within 120 seconds
        'batch_size': [2, 4]  # Adjusted for the same reason
    }
    population = create_population(population_size, param_bounds)
    best_individual = None
    best_fitness = float('inf')
    fitness_history = []
    for generation in range(num_generations):
        fitnesses = []
        for individual in population:
            # Call fitness_function for each individual
            fitness = fitness_function(individual, train_dataset, tokenizer, model_state_dict)
            fitnesses.append(fitness)
            if fitness < best_fitness:
                best_fitness = fitness
                # Copy so later in-place mutation cannot alter the recorded best
                best_individual = individual.copy()
        fitness_history.append(min(fitnesses))
        parents = select_mating_pool(population, fitnesses, num_parents)
        offspring_size = population_size - num_parents
        offspring = crossover(parents, offspring_size)
        offspring = mutation(offspring, param_bounds, mutation_rate)
        population = parents + offspring
    return f"Best Hyperparameters: {best_individual}\nBest Fitness (Loss): {best_fitness}"

# Gradio Interface
demo_option = gr.Radio(["DEMO", "Upload Text File"], label="Data Source")
upload_file = gr.File(label="Upload Text File", type="binary")  # raw bytes, as load_data expects
population_size_input = gr.Slider(minimum=4, maximum=10, value=4, step=1, label="Population Size")  # Adjusted for performance
num_generations_input = gr.Slider(minimum=1, maximum=5, value=2, step=1, label="Number of Generations")  # Adjusted for performance
num_parents_input = gr.Slider(minimum=2, maximum=4, value=2, step=1, label="Number of Parents")  # Adjusted for performance
mutation_rate_input = gr.Slider(minimum=0.0, maximum=1.0, value=0.1, label="Mutation Rate")

gr.Interface(
    fn=gpt2_fine_tuning,
    inputs=[demo_option, upload_file, population_size_input, num_generations_input, num_parents_input, mutation_rate_input],
    outputs="text",
    title="GPT-2 Fine-Tuning with Genetic Algorithm",
).launch()
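
# To deploy on a ZeroGPU Space, uncomment `import spaces` at the top and the
# @spaces.GPU() decorator on fitness_function. Run locally with
# `python <this file>`; Gradio serves the UI on its default port (7860).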