Source code for pymoo.algorithms.moo.moead

import numpy as np
from scipy.spatial.distance import cdist

from pymoo.algorithms.base.genetic import GeneticAlgorithm
from pymoo.core.algorithm import LoopwiseAlgorithm
from pymoo.core.duplicate import NoDuplicateElimination
from pymoo.core.population import Population
from pymoo.core.selection import Selection
from pymoo.core.variable import Real, get
from pymoo.docs import parse_doc_string
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.mutation.pm import PM
from pymoo.operators.sampling.rnd import FloatRandomSampling
from pymoo.util.display.multi import MultiObjectiveOutput
from pymoo.util.reference_direction import default_ref_dirs


class NeighborhoodSelection(Selection):

    def __init__(self, prob=1.0) -> None:
        super().__init__()
        self.prob = Real(prob, bounds=(0.0, 1.0))

    def _do(self, problem, pop, n_select, n_parents, neighbors=None, random_state=None, **kwargs):
        assert n_select == len(neighbors)
        P = np.full((n_select, n_parents), -1)

        prob = get(self.prob, size=n_select)

        for k in range(n_select):
            if random_state.random() < prob[k]:
                P[k] = random_state.choice(neighbors[k], n_parents, replace=False)
            else:
                P[k] = random_state.permutation(len(pop))[:n_parents]

        return P


# =========================================================================================================
# Implementation
# =========================================================================================================

[docs] class MOEAD(LoopwiseAlgorithm, GeneticAlgorithm): def __init__(self, ref_dirs=None, n_neighbors=20, decomposition=None, prob_neighbor_mating=0.9, sampling=FloatRandomSampling(), crossover=SBX(prob=1.0, eta=20), mutation=PM(prob_var=None, eta=20), output=MultiObjectiveOutput(), **kwargs): # reference directions used for MOEAD self.ref_dirs = ref_dirs # the decomposition metric used self.decomposition = decomposition # the number of neighbors considered during mating self.n_neighbors = n_neighbors self.neighbors = None self.selection = NeighborhoodSelection(prob=prob_neighbor_mating) super().__init__(pop_size=len(ref_dirs), sampling=sampling, crossover=crossover, mutation=mutation, eliminate_duplicates=NoDuplicateElimination(), output=output, advance_after_initialization=False, **kwargs) def _setup(self, problem, **kwargs): assert not problem.has_constraints(), "This implementation of MOEAD does not support any constraints." # if no reference directions have been provided get them and override the population size and other settings if self.ref_dirs is None: self.ref_dirs = default_ref_dirs(problem.n_obj) self.pop_size = len(self.ref_dirs) # neighbours includes the entry by itself intentionally for the survival method self.neighbors = np.argsort(cdist(self.ref_dirs, self.ref_dirs), axis=1, kind='quicksort')[:, :self.n_neighbors] # if the decomposition is not set yet, set the default if self.decomposition is None: self.decomposition = default_decomp(problem) def _initialize_advance(self, infills=None, **kwargs): super()._initialize_advance(infills, **kwargs) self.ideal = np.min(self.pop.get("F"), axis=0) def _next(self): pop = self.pop # iterate for each member of the population in random order for k in self.random_state.permutation(len(pop)): # get the parents using the neighborhood selection P = self.selection.do(self.problem, pop, 1, self.mating.crossover.n_parents, neighbors=[self.neighbors[k]], random_state=self.random_state) # perform a mating using the default operators - if more than one offspring just pick the first off = self.random_state.choice(self.mating.do(self.problem, pop, 1, parents=P, n_max_iterations=1, random_state=self.random_state)) # evaluate the offspring off = yield off # update the ideal point self.ideal = np.min(np.vstack([self.ideal, off.F]), axis=0) # now actually do the replacement of the individual is better self._replace(k, off) def _replace(self, k, off): pop = self.pop # calculate the decomposed values for each neighbor N = self.neighbors[k] FV = self.decomposition.do(pop[N].get("F"), weights=self.ref_dirs[N, :], ideal_point=self.ideal) off_FV = self.decomposition.do(off.F[None, :], weights=self.ref_dirs[N, :], ideal_point=self.ideal) # this makes the algorithm to support constraints - not originally proposed though and not tested enough # if self.problem.has_constraints(): # CV, off_CV = pop[N].get("CV")[:, 0], np.full(len(off_FV), off.CV) # fmax = max(FV.max(), off_FV.max()) # FV, off_FV = parameter_less(FV, CV, fmax=fmax), parameter_less(off_FV, off_CV, fmax=fmax) # get the absolute index in F where offspring is better than the current F (decomposed space) I = np.where(off_FV < FV)[0] pop[N[I]] = off
class ParallelMOEAD(MOEAD): def __init__(self, ref_dirs, **kwargs): super().__init__(ref_dirs, **kwargs) self.indices = None def _infill(self): pop_size, cross_parents, cross_off = self.pop_size, self.mating.crossover.n_parents, self.mating.crossover.n_offsprings # do the mating in a random order indices = self.random_state.permutation(len(self.pop))[:self.n_offsprings] # get the parents using the neighborhood selection P = self.selection.do(self.problem, self.pop, self.n_offsprings, cross_parents, neighbors=self.neighbors[indices], random_state=self.random_state) # do not any duplicates elimination - thus this results in exactly pop_size * n_offsprings offsprings off = self.mating.do(self.problem, self.pop, 1e12, n_max_iterations=1, parents=P, random_state=self.random_state) # select a random offspring from each mating off = Population.create(*[self.random_state.choice(pool) for pool in np.reshape(off, (self.n_offsprings, -1))]) # store the indices because of the neighborhood matching in advance self.indices = indices return off def _advance(self, infills=None, **kwargs): assert len(self.indices) == len(infills), "Number of infills must be equal to the one created beforehand." # update the ideal point before starting to replace self.ideal = np.min(np.vstack([self.ideal, infills.get("F")]), axis=0) # now do the replacements as in the loop-wise version for k, off in enumerate(infills): self._replace(self.indices[k], off) def default_decomp(problem): if problem.n_obj <= 2: from pymoo.decomposition.tchebicheff import Tchebicheff return Tchebicheff() else: from pymoo.decomposition.pbi import PBI return PBI() parse_doc_string(MOEAD.__init__)