Source code for pymoo.algorithms.moo.nsga3

import warnings

import numpy as np
from numpy.linalg import LinAlgError

from pymoo.algorithms.base.genetic import GeneticAlgorithm
from pymoo.core.survival import Survival
from pymoo.docs import parse_doc_string
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.mutation.pm import PM
from pymoo.operators.sampling.rnd import FloatRandomSampling
from pymoo.operators.selection.tournament import TournamentSelection, compare
from pymoo.util.display.multi import MultiObjectiveOutput
from pymoo.functions import load_function
from pymoo.util.misc import intersect, has_feasible
from pymoo.util.nds.non_dominated_sorting import NonDominatedSorting
from pymoo.util import default_random_state


# =========================================================================================================
# Implementation
# =========================================================================================================

@default_random_state
def comp_by_cv_then_random(pop, P, random_state=None, **kwargs):
    S = np.full(P.shape[0], np.nan)

    for i in range(P.shape[0]):
        a, b = P[i, 0], P[i, 1]

        # if at least one solution is infeasible
        if pop[a].CV > 0.0 or pop[b].CV > 0.0:
            S[i] = compare(a, pop[a].CV, b, pop[b].CV, method='smaller_is_better', return_random_if_equal=True)

        # both solutions are feasible just set random
        else:
            S[i] = random_state.choice([a, b])

    return S[:, None].astype(int)


[docs] class NSGA3(GeneticAlgorithm): def __init__(self, ref_dirs, pop_size=None, sampling=FloatRandomSampling(), selection=TournamentSelection(func_comp=comp_by_cv_then_random), crossover=SBX(eta=30, prob=1.0), mutation=PM(eta=20), eliminate_duplicates=True, n_offsprings=None, output=MultiObjectiveOutput(), **kwargs): """ Parameters ---------- ref_dirs : {ref_dirs} pop_size : int (default = None) By default the population size is set to None which means that it will be equal to the number of reference line. However, if desired this can be overwritten by providing a positive number. sampling : {sampling} selection : {selection} crossover : {crossover} mutation : {mutation} eliminate_duplicates : {eliminate_duplicates} n_offsprings : {n_offsprings} """ self.ref_dirs = ref_dirs # in case of R-NSGA-3 they will be None - otherwise this will be executed if self.ref_dirs is not None: if pop_size is None: pop_size = len(self.ref_dirs) if pop_size < len(self.ref_dirs): print( f"WARNING: pop_size={pop_size} is less than the number of reference directions ref_dirs={len(self.ref_dirs)}.\n" "This might cause unwanted behavior of the algorithm. \n" "Please make sure pop_size is equal or larger than the number of reference directions. ") if 'survival' in kwargs: survival = kwargs['survival'] del kwargs['survival'] else: survival = ReferenceDirectionSurvival(ref_dirs) super().__init__(pop_size=pop_size, sampling=sampling, selection=selection, crossover=crossover, mutation=mutation, survival=survival, eliminate_duplicates=eliminate_duplicates, n_offsprings=n_offsprings, output=output, advance_after_initial_infill=True, **kwargs) def _setup(self, problem, **kwargs): if self.ref_dirs is not None: if self.ref_dirs.shape[1] != problem.n_obj: raise Exception( "Dimensionality of reference points must be equal to the number of objectives: %s != %s" % (self.ref_dirs.shape[1], problem.n_obj)) def _set_optimum(self, **kwargs): if not has_feasible(self.pop): self.opt = self.pop[[np.argmin(self.pop.get("CV"))]] else: if len(self.survival.opt): self.opt = self.survival.opt
# ========================================================================================================= # Survival # ========================================================================================================= class ReferenceDirectionSurvival(Survival): def __init__(self, ref_dirs): super().__init__(filter_infeasible=True) self.ref_dirs = ref_dirs self.opt = None self.norm = HyperplaneNormalization(ref_dirs.shape[1]) def _do(self, problem, pop, n_survive, D=None, random_state=None, **kwargs): # attributes to be set after the survival F = pop.get("F") # calculate the fronts of the population fronts, rank = NonDominatedSorting().do(F, return_rank=True, n_stop_if_ranked=n_survive) non_dominated, last_front = fronts[0], fronts[-1] # update the hyperplane based boundary estimation hyp_norm = self.norm hyp_norm.update(F, nds=non_dominated) ideal, nadir = hyp_norm.ideal_point, hyp_norm.nadir_point # consider only the population until we come to the splitting front I = np.concatenate(fronts) pop, rank, F = pop[I], rank[I], F[I] # update the front indices for the current population counter = 0 for i in range(len(fronts)): for j in range(len(fronts[i])): fronts[i][j] = counter counter += 1 last_front = fronts[-1] # associate individuals to niches niche_of_individuals, dist_to_niche, dist_matrix = \ associate_to_niches(F, self.ref_dirs, ideal, nadir) # attributes of a population pop.set('rank', rank, 'niche', niche_of_individuals, 'dist_to_niche', dist_to_niche) # set the optimum, first front and closest to all reference directions closest = np.unique(dist_matrix[:, np.unique(niche_of_individuals)].argmin(axis=0)) self.opt = pop[intersect(fronts[0], closest)] if len(self.opt) == 0: self.opt = pop[fronts[0]] # if we need to select individuals to survive if len(pop) > n_survive: # if there is only one front if len(fronts) == 1: n_remaining = n_survive until_last_front = np.array([], dtype=int) niche_count = np.zeros(len(self.ref_dirs), dtype=int) # if some individuals already survived else: until_last_front = np.concatenate(fronts[:-1]) niche_count = calc_niche_count(len(self.ref_dirs), niche_of_individuals[until_last_front]) n_remaining = n_survive - len(until_last_front) S = niching(pop[last_front], n_remaining, niche_count, niche_of_individuals[last_front], dist_to_niche[last_front], random_state=random_state) survivors = np.concatenate((until_last_front, last_front[S].tolist())) pop = pop[survivors] return pop @default_random_state def niching(pop, n_remaining, niche_count, niche_of_individuals, dist_to_niche, random_state=None): survivors = [] # boolean array of elements that are considered for each iteration mask = np.full(len(pop), True) while len(survivors) < n_remaining: # number of individuals to select in this iteration n_select = n_remaining - len(survivors) # all niches where new individuals can be assigned to and the corresponding niche count next_niches_list = np.unique(niche_of_individuals[mask]) next_niche_count = niche_count[next_niches_list] # the minimum niche count min_niche_count = next_niche_count.min() # all niches with the minimum niche count (truncate randomly if there are more niches than remaining individuals) next_niches = next_niches_list[np.where(next_niche_count == min_niche_count)[0]] next_niches = next_niches[random_state.permutation(len(next_niches))[:n_select]] for next_niche in next_niches: # indices of individuals that are considered and assign to next_niche next_ind = np.where(np.logical_and(niche_of_individuals == next_niche, mask))[0] # shuffle to break random tie (equal perp. dist) or select randomly random_state.shuffle(next_ind) if niche_count[next_niche] == 0: next_ind = next_ind[np.argmin(dist_to_niche[next_ind])] else: # already randomized through shuffling next_ind = next_ind[0] # add the selected individual to the survivors mask[next_ind] = False survivors.append(int(next_ind)) # increase the corresponding niche count niche_count[next_niche] += 1 return survivors def associate_to_niches(F, niches, ideal_point, nadir_point, utopian_epsilon=0.0): utopian_point = ideal_point - utopian_epsilon denom = nadir_point - utopian_point denom[denom == 0] = 1e-12 # normalize by ideal point and intercepts N = (F - utopian_point) / denom dist_matrix = load_function("calc_perpendicular_distance")(N, niches) niche_of_individuals = np.argmin(dist_matrix, axis=1) dist_to_niche = dist_matrix[np.arange(F.shape[0]), niche_of_individuals] return niche_of_individuals, dist_to_niche, dist_matrix def calc_niche_count(n_niches, niche_of_individuals): niche_count = np.zeros(n_niches, dtype=int) index, count = np.unique(niche_of_individuals, return_counts=True) niche_count[index] = count return niche_count # ========================================================================================================= # Normalization # ========================================================================================================= class HyperplaneNormalization: def __init__(self, n_dim) -> None: super().__init__() self.ideal_point = np.full(n_dim, np.inf) self.worst_point = np.full(n_dim, -np.inf) self.nadir_point = None self.extreme_points = None def update(self, F, nds=None): # find or usually update the new ideal point - from feasible solutions self.ideal_point = np.min(np.vstack((self.ideal_point, F)), axis=0) self.worst_point = np.max(np.vstack((self.worst_point, F)), axis=0) # this decides whether only non-dominated points or all points are used to determine the extreme points if nds is None: nds = np.arange(len(F)) # find the extreme points for normalization self.extreme_points = get_extreme_points_c(F[nds, :], self.ideal_point, extreme_points=self.extreme_points) # find the intercepts for normalization and do backup if gaussian elimination fails worst_of_population = np.max(F, axis=0) worst_of_front = np.max(F[nds, :], axis=0) self.nadir_point = get_nadir_point(self.extreme_points, self.ideal_point, self.worst_point, worst_of_front, worst_of_population) def get_extreme_points_c(F, ideal_point, extreme_points=None): # calculate the asf which is used for the extreme point decomposition weights = np.eye(F.shape[1]) weights[weights == 0] = 1e6 # add the old extreme points to never lose them for normalization _F = F if extreme_points is not None: _F = np.concatenate([extreme_points, _F], axis=0) # use __F because we substitute small values to be 0 __F = _F - ideal_point __F[__F < 1e-3] = 0 # update the extreme points for the normalization having the highest asf value each F_asf = np.max(__F * weights[:, None, :], axis=2) I = np.argmin(F_asf, axis=1) extreme_points = _F[I, :] return extreme_points def get_nadir_point(extreme_points, ideal_point, worst_point, worst_of_front, worst_of_population): try: # find the intercepts using gaussian elimination M = extreme_points - ideal_point b = np.ones(extreme_points.shape[1]) plane = np.linalg.solve(M, b) warnings.simplefilter("ignore") intercepts = 1 / plane nadir_point = ideal_point + intercepts # check if the hyperplane makes sense if not np.allclose(np.dot(M, plane), b) or np.any(intercepts <= 1e-6): raise LinAlgError() # if the nadir point should be larger than any value discovered so far set it to that value # NOTE: different to the proposed version in the paper b = nadir_point > worst_point nadir_point[b] = worst_point[b] except LinAlgError: # fall back to worst of front otherwise nadir_point = worst_of_front # if the range is too small set it to worst of population b = nadir_point - ideal_point <= 1e-6 nadir_point[b] = worst_of_population[b] return nadir_point parse_doc_string(NSGA3.__init__)