import sys
from abc import ABC, abstractmethod
import numpy as np
from pymoo.algorithms.base.genetic import GeneticAlgorithm
from pymoo.algorithms.moo.nsga2 import binary_tournament
from pymoo.docs import parse_doc_string
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.mutation.pm import PM
from pymoo.operators.sampling.rnd import FloatRandomSampling
from pymoo.operators.selection.tournament import TournamentSelection
from pymoo.operators.survival.rank_and_crowding import RankAndCrowding
from pymoo.termination.default import DefaultMultiObjectiveTermination
from pymoo.util import value_functions as mvf
from pymoo.util.display.multi import MultiObjectiveOutput
from pymoo.util.nds.non_dominated_sorting import NonDominatedSorting
from pymoo.util.reference_direction import select_points_with_maximum_distance
from pymoo.util.vf_dominator import VFDominator
# =========================================================================================================
# Implementation
# =========================================================================================================
class AutomatedDM(ABC):
def __init__(self, get_pairwise_ranks_func=None):
self.get_pairwise_ranks_func = get_pairwise_ranks_func
@abstractmethod
def makeDecision(self, F):
pass
def makePairwiseDecision(self, F):
dm = lambda F: self.makeDecision(F)
ranks = self.get_pairwise_ranks_func(F, 1, dm=dm)
return ranks
[docs]
class PINSGA2(GeneticAlgorithm):
def __init__(self,
pop_size=100,
sampling=FloatRandomSampling(),
selection=TournamentSelection(func_comp=binary_tournament),
crossover=SBX(eta=15, prob=0.9),
mutation=PM(eta=20),
output=MultiObjectiveOutput(),
tau=10,
eta=4,
opt_method="trust-constr",
vf_type="poly",
eps_max=1000,
ranking_type='pairwise',
presi_signs=None,
automated_dm=None,
verbose=False,
**kwargs):
self.survival = RankAndCrowding(nds=NonDominatedSorting(dominator=VFDominator(self)))
super().__init__(
pop_size=pop_size,
sampling=sampling,
selection=selection,
crossover=crossover,
mutation=mutation,
survival=self.survival,
output=output,
advance_after_initial_infill=True,
**kwargs)
self.termination = DefaultMultiObjectiveTermination()
self.tournament_type = 'comp_by_dom_and_crowding'
self.ranking_type=ranking_type
self.presi_signs=presi_signs
self.vf_type = vf_type
self.opt_method = opt_method
self.tau = tau
self.eta = eta
self.eta_F = []
self.vf_res = None
self.v2 = None
self.vf_plot_flag = False
self.vf_plot = None
self.historical_F = None
self.prev_pop = None
self.fronts = []
self.eps_max = eps_max
self.verbose = verbose
if automated_dm is not None:
automated_dm.get_pairwise_ranks_func = self._get_pairwise_ranks
self.automated_dm = automated_dm
def _warn(self, msg):
if self.verbose:
sys.stderr.write(msg)
@staticmethod
def _prompt_for_ranks(F, presi_signs):
for (e, f) in enumerate(F):
print("Solution %d %s" % (e + 1, f * presi_signs))
dim = F.shape[0]
raw_ranks = input(f"Ranks (e.g., \"3, {dim}, ..., 1\" for 3rd best, {dim}th best, ..., 1st best): ")
if raw_ranks == "":
ranks = []
else:
ranks = [int(raw_rank) for raw_rank in raw_ranks.split() ]
return ranks
@staticmethod
def _present_ranks(F, dm_ranks, presi_signs):
print("Solutions are ranked as:")
for (e, f) in enumerate(F):
print("Solution %d %s: Rank %d" % (e + 1, f * presi_signs, dm_ranks[e]))
@staticmethod
def _get_pairwise_ranks(F, presi_signs, dm=None):
if not dm:
dm = lambda F: input("\nWhich solution do you like best?\n" + \
f"[a] {F[0]}\n" + \
f"[b] {F[1]}\n" + \
"[c] These solutions are equivalent.\n--> " )
# initialize empty ranking
_ranks = []
for i, f in enumerate( F ):
# handle empty case, put first element in first place
if not _ranks:
_ranks.append( [i] )
else:
inserted = False
# for each remaining elements, compare to all currently ranked elements
for j, group in enumerate( _ranks ):
# get pairwise preference from user
while True:
points_to_compare = np.array( [f*presi_signs, F[ group[0] ]*presi_signs] )
preference_raw = dm( points_to_compare )
preference = preference_raw.strip().lower()
if preference in ['a', 'b', 'c']:
break
print("Invalid input. Please enter 'a', 'b', or 'c'.")
# if better than currently ranked element place before that element
if preference == 'a':
_ranks.insert( j, [i] )
inserted = True
break
# if equal to currently ranked element place with that element
elif preference == 'c':
group.append( i )
inserted = True
break
# if found to be worse than all place at the end
if not inserted:
_ranks.append( [i] )
ranks = np.zeros ( len( F ), dtype=int )
for rank, group in enumerate( _ranks ):
for index in group:
ranks[index] = rank
return np.array( ranks ) + 1
@staticmethod
def _get_ranks(F, presi_signs):
ranks_invalid = True
dim = F.shape[0]
print(f"Give each solution a ranking, with 1 being the highest score, and {dim} being the lowest score:")
ranks = PINSGA2._prompt_for_ranks(F, presi_signs)
while ranks_invalid:
fc = F.shape[0]
if len(ranks) > 0 and max(ranks) <= fc and min(ranks) >= 1:
ranks_invalid = False
else:
print("Invalid ranks given. Please try again")
ranks = PINSGA2._prompt_for_ranks(F, presi_signs)
return np.array(ranks);
def _reset_dm_preference(self):
self._warn("Back-tracking and removing DM preference from search.")
self.eta_F = []
self.vf_res = None
self.v2 = None
self.vf_plot_flag = False
self.vf_plot = None
self.pop = self.prev_pop
def _advance(self, infills=None, **kwargs):
super()._advance(infills=infills, **kwargs)
rank, F = self.pop.get("rank", "F")
self.fronts = rank
F = F[rank == 0]
if self.historical_F is not None:
self.historical_F = np.vstack((self.historical_F, F))
else:
self.historical_F = F
to_find = self.eta if F.shape[0] >= self.eta else F.shape[0]
if self.presi_signs is None:
self.presi_signs = np.ones(F.shape[1])
# Eta is the number of solutions displayed to the DM
eta_F_indices = select_points_with_maximum_distance(F, to_find, random_state=self.random_state)
self.eta_F = F[eta_F_indices]
self.eta_F = self.eta_F[self.eta_F[:,0].argsort()]
# Remove duplicate rows
self.eta_F = np.unique(self.eta_F, axis=0)
# A frozen view of the optimization each tau generations
self.paused_F = F
# Record the previous population in case we need to back track
self.prev_pop = self.pop
dm_time = self.n_gen % self.tau == 0
# Check whether we have more than one solution
if dm_time and len(self.eta_F) < 2:
self._warn("Population only contains one non-dominated solution. ")
self._reset_dm_preference()
elif dm_time:
# Check if the DM is a machine or a human
if self.automated_dm is None:
# Human DM
if self.ranking_type == "absolute":
dm_ranks = PINSGA2._get_ranks(self.eta_F, self.presi_signs)
elif self.ranking_type == "pairwise":
dm_ranks = PINSGA2._get_pairwise_ranks(self.eta_F, self.presi_signs)
PINSGA2._present_ranks(self.eta_F, dm_ranks, self.presi_signs)
else:
raise ValueError("Invalid ranking type [%s] given." % self.ranking_type)
else:
# Automated DM
if self.ranking_type == "absolute":
dm_ranks = self.automated_dm.makeDecision(self.eta_F)
elif self.ranking_type == "pairwise":
dm_ranks = self.automated_dm.makePairwiseDecision(self.eta_F)
else:
raise ValueError("Invalid ranking type [%s] given." % self.ranking_type)
if len(set(rank)) == 0:
self._warn("No preference between any two points provided.")
self._reset_dm_preference()
return
eta_F = self.eta_F
while eta_F.shape[0] > 1:
if self.vf_type == "linear":
vf_res = mvf.create_linear_vf(eta_F * -1,
dm_ranks.tolist(),
eps_max=self.eps_max,
method=self.opt_method)
elif self.vf_type == "poly":
vf_res = mvf.create_poly_vf(eta_F * -1,
dm_ranks.tolist(),
eps_max=self.eps_max,
method=self.opt_method)
else:
raise ValueError("Value function %s not supported" % self.vf_type)
# check if we were able to model the VF
if vf_res.fit:
self.vf_res = vf_res
self.vf_plot_flag = True
self.v2 = self.vf_res.vf(eta_F[dm_ranks[1] - 1] * -1).item()
break
else:
# If we didn't the model, try to remove the least preferred point and try to refit
self._warn("Could not fit a function to the DM preference")
if eta_F.shape[0] == 2:
# If not, reset and use normal domination
self._warn("Removing DM preference")
self._reset_dm_preference()
break
else:
self._warn("Removing the second best preferred solution from the fit.")
# ranks start at 1, not zero
rank_to_remove = dm_ranks[1]
eta_F = np.delete(eta_F, rank_to_remove - 1, axis=0)
dm_ranks = np.concatenate(([dm_ranks[0]], dm_ranks[2:]))
# update the ranks, since we just removed one
dm_ranks[dm_ranks > rank_to_remove] = dm_ranks[dm_ranks > rank_to_remove] - 1
parse_doc_string(PINSGA2.__init__)