Source code for pymoo.algorithms.moo.mopso_cd

import numpy as np

from pymoo.core.algorithm import Algorithm
from pymoo.core.population import Population
from pymoo.docs import parse_doc_string
from pymoo.operators.repair.to_bound import set_to_bounds_if_outside
from pymoo.operators.sampling.rnd import FloatRandomSampling
from pymoo.operators.survival.rank_and_crowding.metrics import get_crowding_function
from pymoo.termination.default import DefaultMultiObjectiveTermination
from pymoo.util import default_random_state
from pymoo.util.archive import MultiObjectiveArchive
from pymoo.util.display.multi import MultiObjectiveOutput
from pymoo.util.nds.non_dominated_sorting import NonDominatedSorting


class MOPSO_CD(Algorithm):
    """
    Multi-Objective Particle Swarm Optimization with Crowding Distance (MOPSO-CD) algorithm.

    This implementation extends MOPSO with a crowding distance mechanism for
    leader selection and archive management to ensure a well-distributed
    Pareto front, suitable for problems like MO-HalfCheetah in
    multi-objective reinforcement learning.

    Parameters
    ----------
    pop_size : int
        The population size (number of particles)
    w : float
        Inertia weight
    c1 : float
        Cognitive parameter (personal best influence)
    c2 : float
        Social parameter (global best influence)
    max_velocity_rate : float
        Maximum velocity rate relative to the variable range
    archive_size : int
        Maximum size of the external archive
    sampling : Sampling
        Sampling strategy for initialization
    output : Output
        Output display
    """

    def __init__(
        self,
        pop_size=100,
        w=0.6,  # Increased for better exploration
        c1=2.0,
        c2=2.0,
        max_velocity_rate=0.5,  # Increased for better exploration
        archive_size=200,  # Increased for better diversity
        sampling=FloatRandomSampling(),
        output=MultiObjectiveOutput(),
        **kwargs,
    ):
        # Parameters are documented in the class docstring.
        super().__init__(output=output, **kwargs)

        self.pop_size = pop_size
        self.w = w
        self.c1 = c1
        self.c2 = c2
        self.max_velocity_rate = max_velocity_rate
        self.archive_size = archive_size
        self.sampling = sampling

        # Use the default multi-objective termination criterion.
        # NOTE(review): this assignment is unconditional, so any termination
        # the base class may have stored from **kwargs is replaced here --
        # confirm that this is intended.
        self.termination = DefaultMultiObjectiveTermination()

        # Helper components and state that is filled in during setup / initialization
        self.nds = NonDominatedSorting()
        self.cd = get_crowding_function("cd")
        self.archive = None
        self.velocities = None
        self.pbest = None
        self.pbest_f = None
        # NOTE(review): the rest of the class uses random_state like a NumPy
        # Generator (uniform / integers / random / choice) -- confirm that
        # default_random_state() returns such an object when given a seed.
        self.random_state = default_random_state(kwargs.get("seed"))

    def _setup(self, problem, **kwargs):
        """Setup the algorithm for the given problem"""
        super()._setup(problem, **kwargs)

        # Start with an empty external archive
        self.archive = MultiObjectiveArchive(max_size=self.archive_size)

        # Maximum velocity per variable, relative to the variable range
        xl, xu = problem.bounds()
        self.v_max = self.max_velocity_rate * (xu - xl)

        # The swarm is deliberately NOT sampled or evaluated here:
        # _initialize_infill() samples the particles and velocities, and
        # _initialize_advance() records the evaluated population, archive and
        # personal bests. Doing it here as well would spend pop_size extra
        # evaluations on individuals that are immediately overwritten.

    def _initialize_infill(self):
        """Sample the initial particles and their velocities"""
        pop = self.sampling.do(
            self.problem, self.pop_size, random_state=self.random_state
        )

        # Random initial velocities within [-v_max, v_max]
        self.velocities = self.random_state.uniform(
            -self.v_max, self.v_max, size=(self.pop_size, self.problem.n_var)
        )

        # Personal bests start at the initial positions
        self.pbest = pop.copy()

        return pop

    def _initialize_advance(self, infills=None, **kwargs):
        """Record the evaluated initial population, archive and personal bests"""
        self.pop = infills

        # Seed the archive with the non-dominated members of the initial population
        self.archive = self._update_archive(infills)

        # Personal bests and their objective values
        self.pbest = infills.copy()
        self.pbest_f = infills.get("F").copy()

    def _infill(self):
        """Generate new solutions using PSO operators"""
        n_var = self.problem.n_var
        X_new = np.zeros((self.pop_size, n_var))

        # One leader per particle, chosen up front
        leaders = self._select_diverse_leaders()

        for i, leader in enumerate(leaders):
            particle = self.pop[i]
            if leader is None:
                # No leader available: the social term vanishes
                leader = particle

            r1 = self.random_state.random(n_var)
            r2 = self.random_state.random(n_var)

            x = particle.X
            cognitive = self.c1 * r1 * (self.pbest[i].X - x)
            social = self.c2 * r2 * (leader.X - x)
            velocity = self.w * self.velocities[i] + cognitive + social

            # Clamp the velocity before moving the particle
            self.velocities[i] = set_to_bounds_if_outside(
                velocity, -self.v_max, self.v_max
            )
            X_new[i] = x + self.velocities[i]

        # Keep the new positions inside the variable bounds
        xl, xu = self.problem.bounds()
        X_new = set_to_bounds_if_outside(X_new, xl, xu)

        return Population.new("X", X_new)

    def _advance(self, infills=None, **kwargs):
        """Advance the algorithm state"""
        if infills is None:
            return

        # Update archive with crowding distance-based pruning
        combined_pop = Population.merge(self.pop, infills)
        self.archive = self._update_archive(combined_pop)

        # Update personal bests, then make the infills the current swarm
        self._update_pbest(infills)
        self.pop = infills

    def _update_archive(self, pop):
        """
        Build a new archive from the current archive plus ``pop``.

        Only the first non-dominated front of the merged set is kept. If that
        front exceeds ``archive_size`` it is thinned by repeated random
        tournaments (size up to 3) won by the member with the largest
        crowding distance.

        Returns the current archive unchanged when ``pop`` is empty, otherwise
        a new ``MultiObjectiveArchive``.
        """
        if len(pop) == 0:
            return self.archive

        # The archive is None until _setup() has run
        if self.archive is not None and len(self.archive) > 0:
            combined = Population.merge(self.archive, pop)
        else:
            combined = pop

        # Keep only the non-dominated solutions
        F = combined.get("F")
        front = self.nds.do(F, only_non_dominated_front=True)
        non_dominated = combined[front]

        if len(non_dominated) > self.archive_size:
            # Crowding distance is computed once on the full front and reused
            # for every tournament
            crowding = self.cd.do(non_dominated.get("F"))

            selected = []
            remaining = list(range(len(non_dominated)))

            while len(selected) < self.archive_size and remaining:
                tournament_size = min(3, len(remaining))
                candidates = self.random_state.choice(
                    remaining, size=tournament_size, replace=False
                )
                # The most isolated candidate wins and leaves the pool
                winner = int(candidates[np.argmax(crowding[candidates])])
                selected.append(winner)
                remaining.remove(winner)

            non_dominated = non_dominated[selected]

        return MultiObjectiveArchive(
            individuals=non_dominated, max_size=self.archive_size
        )

    def _select_diverse_leaders(self):
        """
        Select one leader per particle.

        Returns a list of length ``pop_size``. Leaders come from the archive
        via binary tournament on crowding distance; with an empty archive they
        are drawn at random from the population, and entries are ``None`` when
        the population is empty as well.
        """
        n_archive = 0 if self.archive is None else len(self.archive)

        if n_archive == 0:
            # No archive yet: fall back to random members of the population
            n_pop = len(self.pop)
            if n_pop == 0:
                return [None] * self.pop_size
            return [
                self.pop[self.random_state.integers(0, n_pop)]
                for _ in range(self.pop_size)
            ]

        if n_archive == 1:
            return [self.archive[0]] * self.pop_size

        # The archive does not change while leaders are selected, so its
        # crowding distance is computed once instead of once per particle.
        try:
            crowding = self.cd.do(self.archive.get("F"))
        except Exception:
            # Deliberate best effort: without crowding information the
            # tournament degrades to a uniform random pick.
            crowding = None

        leaders = []
        for _ in range(self.pop_size):
            idx1 = self.random_state.integers(0, n_archive)
            idx2 = self.random_state.integers(0, n_archive)

            # Prefer the less crowded candidate; ties (including idx1 == idx2)
            # go to idx2, and idx1 is used when no crowding is available.
            if crowding is None or crowding[idx1] > crowding[idx2]:
                winner = idx1
            else:
                winner = idx2
            leaders.append(self.archive[winner])

        return leaders

    def _update_pbest(self, new_pop):
        """Update personal best positions"""
        for i, candidate in enumerate(new_pop):
            f_new = candidate.F
            f_old = self.pbest_f[i]

            if self._dominates(f_new, f_old):
                replace = True
            elif self._dominates(f_old, f_new):
                replace = False
            else:
                # Neither dominates the other
                replace = self._prefer_new_pbest(f_old, f_new)

            if replace:
                self.pbest[i] = candidate.copy()
                self.pbest_f[i] = f_new.copy()

    def _prefer_new_pbest(self, f_old, f_new):
        """
        Decide whether a mutually non-dominated position replaces the personal best.

        The position with the larger crowding distance wins. When the two
        distances are equal the choice is a fair coin flip. A strict
        ``new > old`` comparison alone would never replace the personal best
        on a tie, which is presumably the usual outcome for a two-point set
        (both points are boundary points) -- hence the explicit tie-break.
        """
        try:
            crowding = self.cd.do(np.vstack([f_old, f_new]))
            old_cd, new_cd = crowding[0], crowding[1]
        except Exception:
            # Deliberate best effort: random selection if crowding distance fails
            return self.random_state.random() < 0.5

        if new_cd > old_cd:
            return True
        if new_cd < old_cd:
            return False

        # Tie (or incomparable values): choose at random
        return self.random_state.random() < 0.5

    def _dominates(self, f1, f2):
        """Check if f1 dominates f2 (no worse in all objectives, better in at least one)"""
        return bool(np.all(f1 <= f2) and np.any(f1 < f2))

    def _set_optimum(self, **kwargs):
        """Set the optimum solutions from the archive"""
        if self.archive is not None and len(self.archive) > 0:
            self.opt = self.archive.copy()
        else:
            self.opt = Population.empty()
# Parse docstring for documentation
parse_doc_string(MOPSO_CD.__init__)