NCTCMumbai's picture
Upload 2571 files
0b8359d
raw
history blame
14.4 kB
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
"""Genetic algorithm for BF tasks.
Inspired by https://github.com/primaryobjects/AI-Programmer.
GA function code borrowed from https://github.com/DEAP/deap.
"""
from collections import namedtuple
import random
from absl import flags
from absl import logging
import numpy as np
from six.moves import xrange
from common import bf # brain coder
from common import utils # brain coder
from single_task import misc # brain coder
FLAGS = flags.FLAGS

# Saving reward of previous programs saves computation if a program appears
# again.
USE_REWARD_CACHE = True  # Disable this if GA is using up too much memory.

# Alphabet of program characters. Mutations and randomly initialized
# individuals draw their genes from this sequence.
GENES = bf.CHARS

# NOTE(review): neither constant below is referenced in the visible part of
# this file; presumably they configure program execution / reward shaping
# elsewhere -- confirm before changing or removing them.
MAX_PROGRAM_STEPS = 500
STEP_BONUS = True
# Lookup table from token index to printable character: index 0 is '_',
# followed by 'a'-'z', then 'A'-'Z', then '0'-'9' (63 entries in total).
ALPHANUM_CHARS = list(
    '_'
    'abcdefghijklmnopqrstuvwxyz'
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    '0123456789')
# Outcome of scoring one program: the scalar reward, the test-case inputs, the
# program's outputs alongside the expected outputs, the representation types
# used to pretty-print them, the numeric base, and whether the program was
# judged correct.
Result = namedtuple(
    'Result',
    'reward inputs code_outputs target_outputs type_in type_out base correct')
class IOType(object):
  """Names of the built-in IO representations used when printing test cases."""

  # Tokens are printed as a plain list of integers.
  integer = 'integer'
  # Tokens are mapped to characters and joined into a string.
  string = 'string'
class CustomType(object):
  """Callable wrapper around a user-supplied "object -> string" function.

  Instances can be used as a representation type: calling the instance simply
  forwards the argument to the wrapped function and returns its result.
  """

  def __init__(self, to_str_fn):
    # Callable that renders an object (e.g. a token list) as text.
    self.to_str_fn = to_str_fn

  def __call__(self, obj):
    render = self.to_str_fn
    return render(obj)
def tokens_list_repr(tokens, repr_type, base):
  """Make human readable representation of program IO.

  Args:
    tokens: Sequence of integer tokens to render.
    repr_type: How to render `tokens`. Either a `CustomType` instance (its
        wrapped function is applied to `tokens`), `IOType.string` or
        `IOType.integer`.
    base: Numeric base of the task. For `IOType.string`, tokens index into
        `ALPHANUM_CHARS` when `base` is smaller than that table; otherwise
        each token is converted with `chr`.

  Returns:
    A string representation of `tokens`.

  Raises:
    ValueError: If `repr_type` is not a recognized representation type.
  """
  if isinstance(repr_type, CustomType):
    return repr_type(tokens)
  elif repr_type == IOType.string:
    chars = (
        [ALPHANUM_CHARS[t] for t in tokens] if base < len(ALPHANUM_CHARS)
        else [chr(t) for t in tokens])
    return ''.join(chars)
  elif repr_type == IOType.integer:
    return str(tokens)
  # Interpolate explicitly: exceptions (unlike logging calls) do not apply
  # %-formatting to extra positional arguments.
  raise ValueError('No such representation type "%s"' % (repr_type,))
def io_repr(result):
  """Render the test cases stored in a `Result` as comma-separated strings.

  Args:
    result: A `Result` namedtuple.

  Returns:
    A 3-tuple of strings `(inputs, target_outputs, code_outputs)`. Note that
    the expected outputs come before the program's actual outputs.
  """
  def render(cases, repr_type):
    # One rendered entry per test case, joined with commas.
    return ','.join(
        [tokens_list_repr(case, repr_type, result.base) for case in cases])

  inputs = render(result.inputs, result.type_in)
  code_outputs = render(result.code_outputs, result.type_out)
  target_outputs = render(result.target_outputs, result.type_out)
  return inputs, target_outputs, code_outputs
def make_task_eval_fn(task_manager):
  """Returns a wrapper that converts an RL task into a GA task.

  Args:
    task_manager: Is a task manager object from code_tasks.py

  Returns:
    A function that takes as input a single list of a code chars, and outputs
    a Result namedtuple instance containing the reward and information about
    code execution.
  """
  def _as_list(value):
    # An IOTuple holds several values; anything else is a single value.
    if not isinstance(value, misc.IOTuple):
      return [value]
    return list(value)

  def _ga_type(rl_type):
    # Everything that is not a string type is treated as integer.
    return IOType.string if rl_type == misc.IOType.string else IOType.integer

  def evalbf(bf_chars):
    code = ''.join(bf_chars)
    scored = task_manager._score_code(code)
    total_reward = sum(scored.episode_rewards)
    is_correct = scored.reason == 'correct'
    return Result(
        reward=total_reward,
        inputs=_as_list(scored.input_case),
        code_outputs=_as_list(scored.code_output),
        target_outputs=_as_list(scored.correct_output),
        type_in=_ga_type(scored.input_type),
        type_out=_ga_type(scored.output_type),
        correct=is_correct,
        base=task_manager.task.base)

  return evalbf
def debug_str(individual, task_eval_fn):
  """Build a one-line summary of an individual for logging.

  Args:
    individual: Sequence of program characters.
    task_eval_fn: Function mapping an individual to a `Result`.

  Returns:
    A string of ' | '-separated fields: code, inputs, target outputs, code
    outputs, reward and correctness.
  """
  res = task_eval_fn(individual)
  input_str, target_output_str, code_output_str = io_repr(res)
  fields = [
      ''.join(individual),
      input_str,
      target_output_str,
      code_output_str,
      str(res.reward),
      str(res.correct),
  ]
  return ' | '.join(fields)
def mutate_single(code_tokens, mutation_rate):
  """Mutate a single code string.

  Every position is independently selected for mutation with probability
  `mutation_rate`. A selected position receives one of four length-preserving
  edits (insertion, deletion, rotation, replacement), each chosen with
  probability 1/4, except that insertion is never applied at the first or last
  position (deletion is used there instead).

  Args:
    code_tokens: A string/list/Individual of BF code chars. Must NOT end with
        the EOS symbol '_'; a trailing '_' is treated as a sign that the code
        string has been corrupted.
    mutation_rate: Float between 0 and 1 which sets the probability of each char
        being mutated.

  Returns:
    An Individual instance containing the mutated code string, of the same
    length as `code_tokens`. Inputs with at most one token are returned
    unchanged (the very same object, not wrapped in an Individual).

  Raises:
    ValueError: If `code_tokens` ends with the EOS symbol '_'.
  """
  if len(code_tokens) <= 1:
    return code_tokens
  if code_tokens[-1] == '_':
    # Do this check to ensure that the code strings have not been corrupted.
    raise ValueError('`code_tokens` must not end with EOS symbol.')
  cs = Individual(code_tokens)
  mutated = False
  for pos in range(len(cs)):
    if random.random() < mutation_rate:
      mutated = True
      new_char = GENES[random.randrange(len(GENES))]
      x = random.random()
      if x < 0.25 and pos != 0 and pos != len(cs) - 1:
        # Insertion mutation.
        if random.random() < 0.50:
          # Shift up: insert at `pos` and drop the last char.
          cs = cs[:pos] + [new_char] + cs[pos:-1]
        else:
          # Shift down: drop the first char and insert before `pos`.
          cs = cs[1:pos] + [new_char] + cs[pos:]
      elif x < 0.50:
        # Deletion mutation.
        if random.random() < 0.50:
          # Shift down: remove char at `pos`, pad at the end.
          cs = cs[:pos] + cs[pos + 1:] + [new_char]
        else:
          # Shift up: remove char at `pos`, pad at the front.
          cs = [new_char] + cs[:pos] + cs[pos + 1:]
      elif x < 0.75:
        # Shift rotate mutation (position invariant).
        if random.random() < 0.50:
          # Shift down.
          cs = cs[1:] + [cs[0]]
        else:
          # Shift up.
          cs = [cs[-1]] + cs[:-1]
      else:
        # Replacement mutation.
        cs = cs[:pos] + [new_char] + cs[pos + 1:]
  # Every edit above preserves the length of the code string.
  assert len(cs) == len(code_tokens)
  if mutated:
    return Individual(cs)
  else:
    return Individual(code_tokens)
def crossover(parent1, parent2):
  """Performs single-point crossover mating between two code strings.

  A cut position is drawn uniformly over the longer parent. If the cut lies
  within the shorter parent, the tails after the cut are swapped. Otherwise
  the longer parent is truncated at the cut and its tail is appended to the
  shorter parent.

  Args:
    parent1: First code string.
    parent2: Second code string.

  Returns:
    A 2-tuple of children (Individual instances), i.e. the resulting code
    strings after swapping.
  """
  if len(parent1) > len(parent2):
    longer, shorter = parent1, parent2
  else:
    longer, shorter = parent2, parent1

  cut = random.randrange(len(longer))
  if cut < len(shorter):
    first = longer[:cut] + shorter[cut:]
    second = shorter[:cut] + longer[cut:]
  else:
    first = longer[:cut]
    second = shorter + longer[cut:]
  return Individual(first), Individual(second)
def _make_even(n):
"""Return largest even integer less than or equal to `n`."""
return (n >> 1) << 1
def mutate_and_crossover(population, mutation_rate, crossover_rate):
  """Take a generational step over a population.

  Transforms population of parents into population of children (of the same
  size) via crossover mating and then mutation on the resulting children.
  Parents are paired up in order (0 with 1, 2 with 3, ...). With an odd number
  of parents, the last one has no mate and is carried over unchanged.

  Args:
    population: Parent population. A list of Individual objects. May be empty.
    mutation_rate: Probability of mutation. See `mutate_single`.
    crossover_rate: Probability that two parents will mate.

  Returns:
    Child population. A list of Individual objects.
  """
  if len(population) == 0:
    # Nothing to breed. Returning early also avoids indexing `children[-1]`
    # on an empty list below.
    return []

  children = [None] * len(population)
  for i in xrange(0, _make_even(len(population)), 2):
    p1 = population[i]
    p2 = population[i + 1]
    if random.random() < crossover_rate:
      p1, p2 = crossover(p1, p2)
    children[i] = mutate_single(p1, mutation_rate)
    children[i + 1] = mutate_single(p2, mutation_rate)
  if children[-1] is None:
    # Odd population size: the unpaired last parent passes through as is.
    children[-1] = population[-1]
  return children
def _best_code_string(population, halloffame):
  """Return the best program found so far as a string.

  Uses the hall of fame when one is available; otherwise falls back to the
  fittest member of the current population.
  """
  if halloffame is not None:
    return ''.join(halloffame.get_max()[1])
  return ''.join(best(population))


def ga_loop(population, cxpb, mutpb, ngen, task_eval_fn, halloffame=None,
            checkpoint_writer=None):
  """A bare bones genetic algorithm.

  Similar to chapter 7 of Back, Fogel and Michalewicz, "Evolutionary
  Computation 1 : Basic Algorithms and Operators", 2000.

  Each generation selects parents by roulette selection, applies crossover and
  mutation, evaluates the children, and (when a hall of fame is given) re-adds
  the hall-of-fame members to the population (elitism). Every 100 generations
  progress is logged, a checkpoint is written (if `checkpoint_writer` is given)
  and the best program is checked for correctness; the loop stops early once
  it is correct.

  Args:
    population: A list of individuals.
    cxpb: The probability of mating two individuals.
    mutpb: The probability of mutating a gene.
    ngen: The number of generation. Unlimited if zero.
    task_eval_fn: A python function which maps an Individual to a Result
        namedtuple.
    halloffame: (optional) a utils.MaxUniquePriorityQueue object that will be
        used to aggregate the best individuals found during search. If None,
        no elitism is applied and the best program is taken from the current
        population instead.
    checkpoint_writer: (optional) an object that can save and load populations.
        Needs to have `write`, `load`, and `has_checkpoint` methods. Used to
        periodically save progress. In event of a restart, the population will
        be loaded from disk.

  Returns:
    GaResult namedtuple instance. This contains information about the GA run,
    including the resulting population, best reward (fitness) obtained, and
    the best code string found.
  """
  has_checkpoint = False
  if checkpoint_writer and checkpoint_writer.has_checkpoint():
    try:
      gen, population, halloffame = checkpoint_writer.load()
    except EOFError:  # Data was corrupted. Start over.
      logging.warning('Checkpoint could not be read (EOFError). Starting over.')
    else:
      has_checkpoint = True
      logging.info(
          'Loaded population from checkpoint. Starting at generation %d', gen)

      # Evaluate the individuals with an invalid fitness
      invalid_ind = [ind for ind in population if not ind.fitness.valid]
      for ind in invalid_ind:
        ind.fitness.values = (task_eval_fn(ind).reward,)
      if halloffame is not None:
        for _, ind in halloffame.iter_in_order():
          ind.fitness.values = (task_eval_fn(ind).reward,)

  if not has_checkpoint:
    # Evaluate the individuals with an invalid fitness
    invalid_ind = [ind for ind in population if not ind.fitness.valid]
    for ind in invalid_ind:
      ind.fitness.values = (task_eval_fn(ind).reward,)

    if halloffame is not None:
      for ind in population:
        halloffame.push(ind.fitness.values, tuple(ind), ind)

    logging.info('Initialized new population.')

    gen = 1

  pop_size = len(population)
  program_reward_cache = {} if USE_REWARD_CACHE else None

  # Begin the generational process
  while ngen == 0 or gen <= ngen:
    # Select the next generation individuals. Slots are left free for the
    # hall-of-fame members that are re-added below (elitism).
    num_elites = len(halloffame) if halloffame is not None else 0
    offspring = roulette_selection(population, pop_size - num_elites)

    # Vary the pool of individuals
    offspring = mutate_and_crossover(
        offspring, mutation_rate=mutpb, crossover_rate=cxpb)

    # Evaluate the individuals with an invalid fitness
    invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
    for ind in invalid_ind:
      str_repr = ''.join(ind)
      if program_reward_cache is not None and str_repr in program_reward_cache:
        ind.fitness.values = (program_reward_cache[str_repr],)
      else:
        eval_result = task_eval_fn(ind)
        ind.fitness.values = (eval_result.reward,)
        if program_reward_cache is not None:
          program_reward_cache[str_repr] = eval_result.reward

    # Replace the current population by the offspring
    population = list(offspring)

    if halloffame is not None:
      # Update the hall of fame with the generated individuals
      for ind in population:
        halloffame.push(ind.fitness.values, tuple(ind), ind)

      # elitism
      population.extend([ind for _, ind in halloffame.iter_in_order()])

    if gen % 100 == 0:
      top_code = '\n'.join([debug_str(ind, task_eval_fn)
                            for ind in topk(population, k=4)])
      logging.info('gen: %d\nNPE: %d\n%s\n\n', gen, gen * pop_size, top_code)

      best_code = _best_code_string(population, halloffame)
      res = task_eval_fn(best_code)

      # Write population and hall-of-fame to disk.
      if checkpoint_writer:
        checkpoint_writer.write(gen, population, halloffame)

      if res.correct:
        logging.info('Solution found:\n%s\nreward = %s\n',
                     best_code, res.reward)
        break

    gen += 1

  best_code = _best_code_string(population, halloffame)
  res = task_eval_fn(best_code)

  return GaResult(
      population=population, best_code=best_code, reward=res.reward,
      solution_found=res.correct, generations=gen,
      num_programs=gen * len(population),
      max_generations=ngen, max_num_programs=ngen * len(population))
# Summary of a finished `ga_loop` run: final population, best program and its
# reward, how many generations / programs were used, whether a solution was
# found, and the configured limits.
GaResult = namedtuple(
    'GaResult',
    'population best_code reward generations num_programs solution_found '
    'max_generations max_num_programs')
def reward_conversion(reward):
  """Map a real-valued reward onto a strictly positive value.

  Non-positive rewards all map to 0.05; positive rewards are shifted up by
  0.05.
  """
  offset = 0.05
  return offset if reward <= 0 else reward + offset
def roulette_selection(population, k):
  """Select `k` individuals with prob proportional to fitness.

  Each of the `k` selections is independent. Fitness values are first passed
  through `reward_conversion` so that every individual has a strictly positive
  selection weight.

  Warning:
    The roulette selection by definition cannot be used for minimization
    or when the fitness can be smaller or equal to 0.

  Args:
    population: A list of Individual objects to select from.
    k: The number of individuals to select.

  Returns:
    A list of selected individuals. Each entry is a fresh Individual copy of
    the selected member.
  """
  fitnesses = np.asarray(
      [reward_conversion(ind.fitness.values[0])
       for ind in population])
  assert np.all(fitnesses > 0)

  sum_fits = fitnesses.sum()
  chosen = [None] * k
  for i in xrange(k):
    u = random.random() * sum_fits
    sum_ = 0
    for ind, fitness in zip(population, fitnesses):
      sum_ += fitness
      if sum_ > u:
        chosen[i] = Individual(ind)
        break
    else:
      # The running sum never exceeded `u` (possible through floating point
      # rounding), so fall back to the last individual. Using for/else rather
      # than a truthiness test keeps a selected *empty* individual intact.
      chosen[i] = Individual(population[-1])

  return chosen
def make_population(make_individual_fn, n):
  """Build a list of `n` individuals by calling `make_individual_fn` n times."""
  population = []
  for _ in xrange(n):
    population.append(make_individual_fn())
  return population
def best(population):
  """Return the individual with the greatest fitness values.

  Ties are resolved in favor of the earliest individual. Returns None for an
  empty population.
  """
  champion = None
  for challenger in population:
    if champion is not None:
      if not champion.fitness.values < challenger.fitness.values:
        # The current champion is at least as good; keep it.
        continue
    champion = challenger
  return champion
def topk(population, k):
  """Return up to `k` top individuals, as ranked by a MaxUniquePriorityQueue.

  Individuals are keyed by their fitness values and de-duplicated on their
  code (as a tuple). The result follows the queue's `iter_in_order()` order.
  """
  queue = utils.MaxUniquePriorityQueue(k)
  for member in population:
    queue.push(member.fitness.values, tuple(member), member)
  ranked = []
  for _, member in queue.iter_in_order():
    ranked.append(member)
  return ranked
class Fitness(object):
  """Holds the fitness of an individual as a tuple of values."""

  def __init__(self):
    # An empty tuple marks the fitness as not yet evaluated.
    self.values = ()

  @property
  def valid(self):
    """Assess if a fitness is valid or not (i.e. `values` is non-empty)."""
    return True if self.values else False
class Individual(list):
  """A genome: a list of program characters carrying its own `Fitness`."""

  def __init__(self, *args):
    list.__init__(self, *args)
    # Every newly constructed (or copied) individual starts out unevaluated.
    self.fitness = Fitness()
def random_individual(genome_size):
  """Return a zero-argument factory producing random individuals.

  Each call of the returned function draws `genome_size` genes from `GENES`
  with `np.random.choice` and wraps them in a new Individual.
  """
  def make_individual():
    genes = np.random.choice(GENES, genome_size)
    return Individual(genes.tolist())

  return make_individual