Source code for pymoo.algorithms.soo.nonconvex.pso

import numpy as np

from pymoo.algorithms.soo.nonconvex.ga import FitnessSurvival
from pymoo.core.algorithm import Algorithm
from pymoo.core.individual import Individual
from pymoo.core.initialization import Initialization
from pymoo.core.population import Population
from pymoo.core.repair import NoRepair
from pymoo.core.replacement import ImprovementReplacement
from pymoo.docs import parse_doc_string
from pymoo.operators.crossover.dex import repair_random_init
from pymoo.operators.mutation.pm import PM
from pymoo.operators.repair.bounds_repair import is_out_of_bounds_by_problem
from pymoo.operators.repair.to_bound import set_to_bounds_if_outside
from pymoo.operators.sampling.lhs import LHS
from pymoo.util.display.column import Column
from pymoo.util.display.single import SingleObjectiveOutput
from pymoo.util.misc import norm_eucl_dist
from pymoo.util import default_random_state


# =========================================================================================================
# Display
# =========================================================================================================

class PSOFuzzyOutput(SingleObjectiveOutput):

    def __init__(self):
        super().__init__()

        self.f = Column(name="f", width=8)
        self.S = Column(name="S", width=7)
        self.w = Column(name="w", width=7)
        self.c1 = Column(name="c1", width=8)
        self.c2 = Column(name="c2", width=8)

        self.columns += [self.f, self.S, self.w, self.c1, self.c2]

    def update(self, algorithm):
        super().update(algorithm)

        self.f.set(algorithm.f)
        self.S.set(algorithm.strategy)
        self.w.set(algorithm.w)
        self.c1.set(algorithm.c1)
        self.c2.set(algorithm.c2)


# =========================================================================================================
# Adaptation Constants
# =========================================================================================================


def S1_exploration(f):
    if f <= 0.4:
        return 0
    elif 0.4 < f <= 0.6:
        return 5 * f - 2
    elif 0.6 < f <= 0.7:
        return 1
    elif 0.7 < f <= 0.8:
        return -10 * f + 8
    elif 0.8 < f:
        return 0


def S2_exploitation(f):
    if f <= 0.2:
        return 0
    elif 0.2 < f <= 0.3:
        return 10 * f - 2
    elif 0.3 < f <= 0.4:
        return 1
    elif 0.4 < f <= 0.6:
        return -5 * f + 3
    elif 0.6 < f:
        return 0


def S3_convergence(f):
    if f <= 0.1:
        return 1
    elif 0.1 < f <= 0.3:
        return -5 * f + 1.5
    elif 0.3 < f:
        return 0


def S4_jumping_out(f):
    if f <= 0.7:
        return 0
    elif 0.7 < f <= 0.9:
        return 5 * f - 3.5
    elif 0.9 < f:
        return 1


# =========================================================================================================
# Equation
# =========================================================================================================

@default_random_state
def pso_equation(X, P_X, S_X, V, V_max, w, c1, c2, r1=None, r2=None, random_state=None):
    n_particles, n_var = X.shape

    if r1 is None:
        r1 = random_state.random((n_particles, n_var))

    if r2 is None:
        r2 = random_state.random((n_particles, n_var))

    inerta = w * V
    cognitive = c1 * r1 * (P_X - X)
    social = c2 * r2 * (S_X - X)

    # calculate the velocity vector
    Vp = inerta + cognitive + social
    Vp = set_to_bounds_if_outside(Vp, - V_max, V_max)

    Xp = X + Vp

    return Xp, Vp


# =========================================================================================================
# Implementation
# =========================================================================================================


[docs] class PSO(Algorithm): def __init__(self, pop_size=25, sampling=LHS(), w=0.9, c1=2.0, c2=2.0, adaptive=True, initial_velocity="random", max_velocity_rate=0.20, pertube_best=True, repair=NoRepair(), output=PSOFuzzyOutput(), **kwargs): """ Parameters ---------- pop_size : The size of the swarm being used. sampling : {sampling} adaptive : bool Whether w, c1, and c2 are changed dynamically over time. The update uses the spread from the global optimum to determine suitable values. w : float The inertia F to be used in each iteration for the velocity update. This can be interpreted as the momentum term regarding the velocity. If `adaptive=True` this is only the initially used value. c1 : float The cognitive impact (personal best) during the velocity update. If `adaptive=True` this is only the initially used value. c2 : float The social impact (global best) during the velocity update. If `adaptive=True` this is only the initially used value. initial_velocity : str - ('random', or 'zero') How the initial velocity of each particle should be assigned. Either 'random' which creates a random velocity vector or 'zero' which makes the particles start to find the direction through the velocity update equation. max_velocity_rate : float The maximum velocity rate. It is determined variable (and not vector) wise. We consider the rate here since the value is normalized regarding the `xl` and `xu` defined in the problem. pertube_best : bool Some studies have proposed to mutate the global best because it has been found to converge better. Which means the population size is reduced by one particle and one function evaluation is spend additionally to permute the best found solution so far. """ super().__init__(output=output, **kwargs) self.initialization = Initialization(sampling) self.pop_size = pop_size self.adaptive = adaptive self.pertube_best = pertube_best self.V_max = None self.initial_velocity = initial_velocity self.max_velocity_rate = max_velocity_rate self.repair = repair self.w = w self.c1 = c1 self.c2 = c2 self.particles = None self.sbest = None def _setup(self, problem, **kwargs): self.V_max = self.max_velocity_rate * (problem.xu - problem.xl) self.f, self.strategy = None, None def _initialize_infill(self): return self.initialization.do(self.problem, self.pop_size, algorithm=self, random_state=self.random_state) def _initialize_advance(self, infills=None, **kwargs): particles = self.pop if self.initial_velocity == "random": init_V = self.random_state.random((len(particles), self.problem.n_var)) * self.V_max[None, :] elif self.initial_velocity == "zero": init_V = np.zeros((len(particles), self.problem.n_var)) else: raise Exception("Unknown velocity initialization.") particles.set("V", init_V) self.particles = particles super()._initialize_advance(infills=infills, **kwargs) def _infill(self): problem, particles, pbest = self.problem, self.particles, self.pop (X, V) = particles.get("X", "V") P_X = pbest.get("X") sbest = self._social_best() S_X = sbest.get("X") Xp, Vp = pso_equation(X, P_X, S_X, V, self.V_max, self.w, self.c1, self.c2, random_state=self.random_state) # if the problem has boundaries to be considered if problem.has_bounds(): for k in range(20): # find the individuals which are still infeasible m = is_out_of_bounds_by_problem(problem, Xp) if len(m) == 0: break # actually execute the differential equation Xp[m], Vp[m] = pso_equation(X[m], P_X[m], S_X[m], V[m], self.V_max, self.w, self.c1, self.c2, random_state=self.random_state) # if still infeasible do a random initialization Xp = repair_random_init(Xp, X, *problem.bounds(), random_state=self.random_state) # create the offspring population off = Population.new(X=Xp, V=Vp) # try to improve the current best with a pertubation if self.pertube_best: k = FitnessSurvival().do(problem, pbest, n_survive=1, return_indices=True)[0] mut = PM(prob=0.9, eta=self.random_state.uniform(5, 30), at_least_once=False) mutant = mut(problem, Population(Individual(X=pbest[k].X)), random_state=self.random_state)[0] off[k].set("X", mutant.X) self.repair(problem, off) self.sbest = sbest return off def _advance(self, infills=None, **kwargs): assert infills is not None, "This algorithms uses the AskAndTell interface thus 'infills' must to be provided." # set the new population to be equal to the offsprings self.particles = infills # if an offspring has improved the personal store that index has_improved = ImprovementReplacement().do(self.problem, self.pop, infills, return_indices=True) # set the personal best which have been improved self.pop[has_improved] = infills[has_improved] if self.adaptive: self._adapt() def _social_best(self): return Population([self.opt[0]] * len(self.pop)) def _adapt(self): pop = self.pop X, F = pop.get("X", "F") sbest = self.sbest w, c1, c2, = self.w, self.c1, self.c2 # get the average distance from one to another for normalization D = norm_eucl_dist(self.problem, X, X) mD = D.sum(axis=1) / (len(pop) - 1) _min, _max = mD.min(), mD.max() # get the average distance to the best g_D = norm_eucl_dist(self.problem, sbest.get("X"), X).mean() f = (g_D - _min) / (_max - _min + 1e-32) S = np.array([S1_exploration(f), S2_exploitation(f), S3_convergence(f), S4_jumping_out(f)]) strategy = S.argmax() + 1 delta = 0.05 + (self.random_state.random() * 0.05) if strategy == 1: c1 += delta c2 -= delta elif strategy == 2: c1 += 0.5 * delta c2 -= 0.5 * delta elif strategy == 3: c1 += 0.5 * delta c2 += 0.5 * delta elif strategy == 4: c1 -= delta c2 += delta c1 = max(1.5, min(2.5, c1)) c2 = max(1.5, min(2.5, c2)) if c1 + c2 > 4.0: c1 = 4.0 * (c1 / (c1 + c2)) c2 = 4.0 * (c2 / (c1 + c2)) w = 1 / (1 + 1.5 * np.exp(-2.6 * f)) self.f = f self.strategy = strategy self.c1 = c1 self.c2 = c2 self.w = w
parse_doc_string(PSO.__init__)