Source code for pymoo.algorithms.moo.rnsga3

import numpy as np

from pymoo.algorithms.moo.nsga3 import calc_niche_count, niching, comp_by_cv_then_random, associate_to_niches, NSGA3
from pymoo.core.survival import Survival
from pymoo.docs import parse_doc_string
from pymoo.operators.sampling.rnd import FloatRandomSampling
from pymoo.operators.selection.tournament import TournamentSelection
from pymoo.util.misc import intersect
from pymoo.util.nds.non_dominated_sorting import NonDominatedSorting
from pymoo.util.normalization import denormalize, get_extreme_points_c, get_nadir_point
from pymoo.util.reference_direction import UniformReferenceDirectionFactory


# =========================================================================================================
# Implementation
# =========================================================================================================


[docs] class RNSGA3(NSGA3): def __init__(self, ref_points, pop_per_ref_point, mu=0.05, sampling=FloatRandomSampling(), selection=TournamentSelection(func_comp=comp_by_cv_then_random), eliminate_duplicates=True, n_offsprings=None, **kwargs): """ Parameters ---------- ref_points : {ref_points} pop_per_ref_point : int Size of the population used for each reference point. mu : float Defines the init_simplex_scale of the reference lines used during survival selection. Increasing mu will result having solutions with a larger spread. n_offsprings : {n_offsprings} sampling : {sampling} selection : {selection} crossover : {crossover} mutation : {mutation} eliminate_duplicates : {eliminate_duplicates} """ # number of objectives the reference lines have n_obj = ref_points.shape[1] # add the aspiration point lines aspiration_ref_dirs = UniformReferenceDirectionFactory(n_dim=n_obj, n_points=pop_per_ref_point).do() survival = AspirationPointSurvival(ref_points, aspiration_ref_dirs, mu=mu) pop_size = ref_points.shape[0] * aspiration_ref_dirs.shape[0] + aspiration_ref_dirs.shape[1] ref_dirs = None super().__init__(ref_dirs, pop_size=pop_size, sampling=sampling, selection=selection, survival=survival, eliminate_duplicates=eliminate_duplicates, n_offsprings=n_offsprings, **kwargs) def _setup(self, problem, **kwargs): if self.survival.ref_points.shape[1] != problem.n_obj: raise Exception("Dimensionality of reference points must be equal to the number of objectives: %s != %s" % (self.survival.ref_points.shape[1], problem.n_obj)) def _finalize(self): pass
class AspirationPointSurvival(Survival): def __init__(self, ref_points, aspiration_ref_dirs, mu=0.1): super().__init__() self.ref_points = ref_points self.aspiration_ref_dirs = aspiration_ref_dirs self.mu = mu self.ref_dirs = aspiration_ref_dirs self.extreme_points = None self.intercepts = None self.nadir_point = None self.opt = None self.ideal_point = np.full(ref_points.shape[1], np.inf) self.worst_point = np.full(ref_points.shape[1], -np.inf) def _do(self, problem, pop, n_survive, D=None, random_state=None, **kwargs): # attributes to be set after the survival F = pop.get("F") # find or usually update the new ideal point - from feasible solutions self.ideal_point = np.min(np.vstack((self.ideal_point, F, self.ref_points)), axis=0) self.worst_point = np.max(np.vstack((self.worst_point, F, self.ref_points)), axis=0) # calculate the fronts of the population fronts, rank = NonDominatedSorting().do(F, return_rank=True, n_stop_if_ranked=n_survive) non_dominated, last_front = fronts[0], fronts[-1] # find the extreme points for normalization self.extreme_points = get_extreme_points_c( np.vstack([F[non_dominated], self.ref_points]) , self.ideal_point, extreme_points=self.extreme_points) # find the intercepts for normalization and do backup if gaussian elimination fails worst_of_population = np.max(F, axis=0) worst_of_front = np.max(F[non_dominated, :], axis=0) self.nadir_point = get_nadir_point(self.extreme_points, self.ideal_point, self.worst_point, worst_of_population, worst_of_front) # consider only the population until we come to the splitting front I = np.concatenate(fronts) pop, rank, F = pop[I], rank[I], F[I] # update the front indices for the current population counter = 0 for i in range(len(fronts)): for j in range(len(fronts[i])): fronts[i][j] = counter counter += 1 last_front = fronts[-1] unit_ref_points = (self.ref_points - self.ideal_point) / (self.nadir_point - self.ideal_point) ref_dirs = get_ref_dirs_from_points(unit_ref_points, self.aspiration_ref_dirs, mu=self.mu) self.ref_dirs = denormalize(ref_dirs, self.ideal_point, self.nadir_point) # associate individuals to niches niche_of_individuals, dist_to_niche, dist_matrix = associate_to_niches(F, ref_dirs, self.ideal_point, self.nadir_point) pop.set('rank', rank, 'niche', niche_of_individuals, 'dist_to_niche', dist_to_niche) # set the optimum, first front and closest to all reference directions closest = np.unique(dist_matrix[:, np.unique(niche_of_individuals)].argmin(axis=0)) self.opt = pop[intersect(fronts[0], closest)] # if we need to select individuals to survive if len(pop) > n_survive: # if there is only one front if len(fronts) == 1: n_remaining = n_survive until_last_front = np.array([], dtype=int) niche_count = np.zeros(len(ref_dirs), dtype=int) # if some individuals already survived else: until_last_front = np.concatenate(fronts[:-1]) niche_count = calc_niche_count(len(ref_dirs), niche_of_individuals[until_last_front]) n_remaining = n_survive - len(until_last_front) S = niching(pop[last_front], n_remaining, niche_count, niche_of_individuals[last_front], dist_to_niche[last_front], random_state=random_state) survivors = np.concatenate((until_last_front, last_front[S].tolist())) pop = pop[survivors] return pop def get_ref_dirs_from_points(ref_point, ref_dirs, mu=0.1): """ This function takes user specified reference points, and creates smaller sets of equidistant Das-Dennis points around the projection of user points on the Das-Dennis hyperplane :param ref_point: List of user specified reference points :param n_obj: Number of objectives to consider :param mu: Shrinkage factor (0-1), Smaller = tighter convergence, Larger= larger convergence :return: Set of reference points """ n_obj = ref_point.shape[1] val = [] n_vector = np.ones(n_obj) / np.sqrt(n_obj) # Normal vector of Das Dennis plane point_on_plane = np.eye(n_obj)[0] # Point on Das-Dennis for point in ref_point: ref_dir_for_aspiration_point = np.copy(ref_dirs) # Copy of computed reference directions ref_dir_for_aspiration_point = mu * ref_dir_for_aspiration_point cent = np.mean(ref_dir_for_aspiration_point, axis=0) # Find centroid of shrunken reference points # Project shrunken Das-Dennis points back onto original Das-Dennis hyperplane intercept = line_plane_intersection(np.zeros(n_obj), point, point_on_plane, n_vector) shift = intercept - cent # shift vector ref_dir_for_aspiration_point += shift # If reference directions are located outside of first octant, redefine points onto the border if not (ref_dir_for_aspiration_point > 0).min(): ref_dir_for_aspiration_point[ref_dir_for_aspiration_point < 0] = 0 ref_dir_for_aspiration_point = ref_dir_for_aspiration_point / np.sum(ref_dir_for_aspiration_point, axis=1)[ :, None] val.extend(ref_dir_for_aspiration_point) val.extend(np.eye(n_obj)) # Add extreme points return np.array(val) # intersection function def line_plane_intersection(l0, l1, p0, p_no, epsilon=1e-6): """ l0, l1: define the line p0, p_no: define the plane: p0 is a point on the plane (plane coordinate). p_no is a normal vector defining the plane direction; (does not need to be normalized). reference: https://en.wikipedia.org/wiki/Line%E2%80%93plane_intersection return a Vector or None (when the intersection can't be found). """ l = l1 - l0 dot = np.dot(l, p_no) if abs(dot) > epsilon: # the factor of the point between p0 -> p1 (0 - 1) # if 'fac' is between (0 - 1) the point intersects with the segment. # otherwise: # < 0.0: behind p0. # > 1.0: in front of p1. w = p0 - l0 d = np.dot(w, p_no) / dot l = l * d return l0 + l else: # The segment is parallel to plane then return the perpendicular projection ref_proj = l1 - (np.dot(l1 - p0, p_no) * p_no) return ref_proj parse_doc_string(RNSGA3.__init__)