File size: 13,803 Bytes
81a5d0a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
from typing import Iterable, Callable, Tuple, List, Union, Dict
import numpy as np
from copy import deepcopy as copy
from .utils import *
from itertools import chain
from abc import abstractproperty, abstractmethod
from .hw_nats_fast_interface import HW_NATS_FastInterface


class Individual:
    """
    Base Class for all individuals in the population.
    Base class attributes are the genotype identifying the individual (and, therefore, the network) and its
    index within the search space it is drawn from.
    """
    def __init__(self, genotype:List[str], index:int):
        self._genotype = genotype
        self.index=index
        self._fitness = None
    
    @abstractproperty
    def genotype(self): 
        """This class is used to define the network architecture."""
        raise NotImplementedError("Implement this property in child classes!")
    
    @abstractproperty
    def fitness(self):
        """This class is used to define the fitness of the individual."""
        raise NotImplementedError("Implement this property in child classes!")

    @abstractmethod
    def update_idx(self):
        """Update the index of the individual in the population"""
        raise NotImplementedError("Implement this method in child classes!")
    
    @abstractmethod
    def update_genotype(self, new_genotype:List[str]): 
        """Update current genotype with new one. When doing so, also the network field is updated"""
        raise NotImplementedError("Implement this method in child classes!")
    
    @abstractmethod
    def update_fitness(self, metric:Callable, attribute:str="net"): 
        """Update the current value of fitness using provided metric"""
        raise NotImplementedError("Implement this method in child classes!")
    

class FastIndividual(Individual): 
    """
    Fast individuals are used in the context of age-regularized genetic algorithms and, therefore, are
    characterized by an additional field, i.e. age.
    """
    def __init__(
        self,
        genotype:List[str],
        index:int, 
        genotype_to_idx:Dict[str, int],
        age:int=0):
        
        # init parent class
        super().__init__(genotype, index)

        self.age = age
        self.genotype_to_idx = genotype_to_idx
        
    @property
    def genotype(self): 
        return self._genotype
    
    @property
    def fitness(self): 
        return self._fitness
    
    def update_idx(self):
        self.index = self.genotype_to_idx["/".join(self._genotype)]

    def update_genotype(self, new_genotype:List[str]): 
        """Update current genotype with new one. When doing so, also the network field is updated"""
        self._genotype = new_genotype
        self.update_idx()
    
    def update_fitness(self, metric:Callable, attribute:str="net"): 
        """Update the current value of fitness using provided metric"""
        self._fitness = metric(getattr(self, attribute))

class Genetic: 
    def __init__(
        self, 
        genome:Iterable[str], 
        searchspace:HW_NATS_FastInterface,
        strategy:Tuple[str, str]="comma", 
        tournament_size:int=5):
        
        self.genome = set(genome) if not isinstance(genome, set) else genome
        self.strategy = strategy
        self.tournament_size = tournament_size
        self.searchspace = searchspace

    def tournament(self, population:Iterable[Individual]) -> Iterable[Individual]:
        """
        Return tournament, i.e. a random subset of population of size tournament size. 
        Sampling is done without replacement to ensure diversity inside the actual tournament.
        """
        return np.random.choice(a=population, size=self.tournament_size, replace=False).tolist()
    
    def obtain_parents(self, population:Iterable[Individual], n_parents:int=2) -> Iterable[Individual]:
        """Obtain n_parents from population. Parents are defined as the fittest individuals in n_parents tournaments"""
        tournament = self.tournament(population = population)
        # parents are defined as fittest individuals in tournaments
        parents = sorted(tournament, key = lambda individual: individual.fitness, reverse=True)[:n_parents]
        return parents
    
    def mutate(self, 
               individual:Individual, 
               n_loci:int=1, 
               genes_prob:Tuple[None, List[float]]=None) -> Individual: 
        """Applies mutation to a given individual"""
        for _ in range(n_loci): 
            mutant_genotype = copy(individual.genotype)
            # select a locus in the genotype (that is, where mutation will occurr)
            if genes_prob is None:  # uniform probability over all loci
                mutant_locus = np.random.randint(low=0, high=len(mutant_genotype))
            else:  # custom probability distrubution over which locus to mutate
                mutant_locus = np.random.choice(mutant_genotype, p=genes_prob)
            # mapping the locus to the actual gene that will effectively change
            mutant_gene = mutant_genotype[mutant_locus]
            operation, level = mutant_gene.split("~")  # splits the gene into operation and level
            # mutation changes gene, so the current one must be removed from the pool of candidate genes
            mutations = self.genome.difference([operation])
            
            # overwriting the mutant gene with a new one - probability of chosing how to mutate should be selected as well
            mutant_genotype[mutant_locus] = np.random.choice(a=list(mutations)) + f"~{level}"

        mutant_individual = FastIndividual(genotype=None, genotype_to_idx=self.searchspace.architecture_to_index, index=None)
        mutant_individual.update_genotype(mutant_genotype)

        return mutant_individual
    
    def recombine(self, individuals:Iterable[Individual], P_parent1:float=0.5) -> Individual: 
        """Performs recombination of two given `individuals`"""
        if len(individuals) != 2: 
            raise ValueError("Number of individuals cannot be different from 2!")
        
        individual1, individual2 = individuals
        recombinant_genotype = [None for _ in range(len(individual1.genotype))]
        for locus_idx, (gene_1, gene_2) in enumerate(zip(individual1.genotype, individual2.genotype)):
            # chose genes from parent1 according to P_parent1
            recombinant_genotype[locus_idx] = gene_1 if np.random.random() <= P_parent1 else gene_2

        recombinant = FastIndividual(genotype=None, genotype_to_idx=self.searchspace.architecture_to_index, index=None)
        recombinant.update_genotype(list(recombinant_genotype))

        return recombinant

class Population: 
    def __init__(self,
                 searchspace:object,
                 init_population:Union[bool, Iterable]=True,
                 n_individuals:int=20,
                 normalization:str='dynamic'): 
        self.searchspace = searchspace
        if init_population is True:
            self._population = generate_population(searchspace_interface=searchspace, n_individuals=n_individuals)
        else: 
            self._population = init_population
        
        self.oldest = None
        self.worst_n = None
        self.normalization = normalization.lower()
    
    def __iter__(self): 
        for i in self._population: 
            yield i
    
    @property
    def individuals(self):
        return self._population
    
    def update_population(self, new_population:Iterable[Individual]): 
        """Overwrites current population with new one stored in `new_population`"""
        if all([isinstance(el, Individual) for el in new_population]):
            del self._population
            self._population = new_population
        else:
            raise ValueError("new_population is not an Iterable of `Individual` datatype!")

    def fittest_n(self, n:int=1): 
        """Return first `n` individuals based on fitness value"""
        return sorted(self._population, key=lambda individual: individual.fitness, reverse=True)[:n]
    
    def update_ranking(self): 
        """Updates the ranking in the population in light of fitness value"""
        sorted_individuals = sorted(self._population, key=lambda individual: individual.fitness, reverse=True)
        
        # ranking in light of individuals 
        for ranking, individual in enumerate(sorted_individuals):
            individual.update_ranking(new_rank=ranking)

    def update_fitness(self, fitness_function:Callable): 
        """Updates the fitness value of individuals in the population"""
        for individual in self.individuals: 
            fitness_function(individual)
    
    def apply_on_individuals(self, function:Callable)->Union[Iterable, None]: 
        """Applies a function on each individual in the population
        
        Args: 
            function (Callable): function to apply on each individual. Must return an object of class Individual.
        Returns: 
            Union[Iterable, None]: Iterable when inplace=False represents the individuals with function applied.
                                   None represents the output when inplace=True (hence function is applied on the
                                   actual population.
        """
        self._population = [function(individual) for individual in self._population]

    def set_extremes(self, score:str):
        """Set the maximal&minimal value in the population for the score 'score' (must be a class attribute)"""
        if self.normalization == 'dynamic':
            # accessing to the score of each individual
            scores = [getattr(ind, score) for ind in self.individuals]
            min_value = min(scores)
            max_value = max(scores)
        elif self.normalization == 'minmax':
            # extreme_scores is a 2x`number_of_scores`
            min_value, max_value = self.extreme_scores[:, self.scores_dict[score]]
        elif self.normalization == 'standard':
            # extreme_scores is a 2x`number_of_scores`
            mean_value, std_value = self.extreme_scores[:, self.scores_dict[score]]

        if self.normalization in ['minmax', 'dynamic']:
            setattr(self, f"max_{score}", max_value)
            setattr(self, f"min_{score}", min_value)
        else:
            setattr(self, f"mean_{score}", mean_value)
            setattr(self, f"std_{score}", std_value)

    def age(self): 
        """Embeds ageing into the process"""
        def individuals_ageing(individual): 
            individual.age += 1
            return individual

        self.apply_on_individuals(function=individuals_ageing)
    
    def add_to_population(self, new_individuals:Iterable[Individual]): 
        """Add new_individuals to population"""
        self._population = list(chain(self.individuals, new_individuals))
    
    def remove_from_population(self, attribute:str="fitness", n:int=1, ascending:bool=True): 
        """
        Remove first/last `n` elements from sorted population population in `ascending/descending`
        order based on the value of `attribute`.
        """
        # check that input attribute is an attribute of all the individuals
        if not all([hasattr(el, attribute) for el in self.individuals]):
            raise ValueError(f"Attribute '{attribute}' is not an attribute of all the individuals!")
        
        # sort the population based on the value of attribute
        sorted_population = sorted(self.individuals, key=lambda ind: getattr(ind, attribute), reverse=False if ascending else True)
        # new population is old population minus the `n` worst individuals with respect to `attribute`
        self.update_population(sorted_population[n:])

    def update_oldest(self, candidate:Individual): 
        """Updates oldest individual in the population"""
        if candidate.age >= self.oldest.age: 
            self.oldest = candidate
        else: 
            pass

    def update_worst_n(self, candidate:Individual, attribute:str="fitness", n:int=2): 
        """Updates worst_n elements in the population"""
        if hasattr(candidate, attribute): 
            if any([getattr(candidate, attribute) < getattr(worst, attribute) for worst in self.worst_n]):
                # candidate is worse than one of the worst individuals
                bad_individuals = self.worst_n + candidate
                # sort in increasing values of fitness
                bad_sorted = sorted(bad_individuals, lambda ind: getattr(ind, attribute))
                self.worst_n = bad_sorted[:n]  # return new worst individuals
    
    def set_oldest(self): 
        """Sets oldest individual in population"""
        self.oldest = max(self.individuals, key=lambda ind: ind.age)
    
    def set_worst_n(self, attribute:str="fitness", n:int=2): 
        """Sets worst n elements based on the value of arbitrary attribute"""
        self.worst_n = sorted(self.individuals, key=lambda ind: getattr(ind, attribute))[:n]
        

def generate_population(searchspace_interface:HW_NATS_FastInterface, n_individuals:int=20)->list: 
    """This function generates a population of FastInviduals based on the searchspace interface"""
    # at first generate full cell-structure and unique network indices
    cells, indices = searchspace_interface.generate_random_samples(n_samples=n_individuals)
    
    # mapping strings to list of genes (~genomes)
    genotypes = map(lambda cell: searchspace_interface.architecture_to_list(cell), cells)
    # turn full architecture and cell-structure into genetic population individual
    population = [
        FastIndividual(genotype=genotype, index=index, genotype_to_idx=searchspace_interface.architecture_to_index) 
        for genotype, index in zip(genotypes, indices)
    ]
    
    return population