Parallelization

In practice, parallelization is essential and can significantly speed up optimization. Population-based algorithms are particularly easy to parallelize, because the solutions of each generation are evaluated independently of one another.

Vectorized Matrix Operations

One way is to use NumPy's vectorized matrix operations, which are used for almost all test problems implemented in pymoo. By default, a problem derived from Problem is not evaluated elementwise, which means that _evaluate receives a whole set of solutions at once. Thus, x is a matrix where each row is an individual and each column a variable.

[ ]:
import numpy as np
from pymoo.core.problem import Problem

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
         out["F"] = np.sum(x ** 2, axis=1)

problem = MyProblem()

The axis=1 argument sums over each row of the matrix in a single vectorized NumPy operation, so the whole population is evaluated at once.
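As a quick check (a small sketch using the problem defined above), a whole random population can be evaluated in one call, yielding one objective value per row of x:

[ ]:
# evaluate a random population of 100 solutions in a single vectorized call
X = np.random.random((100, problem.n_var)) * 10 - 5
F = problem.evaluate(X)
print(X.shape, F.shape)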

[ ]:
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Vectorized:', res.exec_time)

Starmap Interface

In general, pymoo allows a starmap object to be passed in for parallelization. The starmap interface is defined by the multiprocessing.Pool.starmap function in the Python standard library, which makes this approach very flexible.

IMPORTANT: Please note that the problem needs to be defined as an ElementwiseProblem (elementwise evaluation), which means that each call of _evaluate handles only a single solution.

[ ]:
from pymoo.core.problem import ElementwiseProblem

class MyProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
         out["F"] = (x ** 2).sum()

Then, we can pass a starmap object to be used for parallelization.

Threads

[ ]:
from multiprocessing.pool import ThreadPool
from pymoo.core.problem import StarmapParallelization
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize


# initialize the thread pool and create the runner
n_threads = 4
pool = ThreadPool(n_threads)
runner = StarmapParallelization(pool.starmap)

# define the problem by passing the starmap interface of the thread pool
problem = MyProblem(elementwise_runner=runner)

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Threads:', res.exec_time)

pool.close()

Processes

[ ]:
import multiprocessing
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize


# initialize the process pool and create the runner
n_processes = 8
pool = multiprocessing.Pool(n_processes)
runner = StarmapParallelization(pool.starmap)

# define the problem by passing the starmap interface of the process pool
problem = MyProblem(elementwise_runner=runner)

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Processes:', res.exec_time)

pool.close()

Note: Here, the overhead of serializing and transferring the data between processes is clearly visible.
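If you prefer the pool to be cleaned up automatically, the same run can be wrapped in a context manager; this is just an equivalent sketch of the cell above:

[ ]:
# equivalent to the cell above, but the pool is terminated automatically on exit
with multiprocessing.Pool(n_processes) as pool:
    runner = StarmapParallelization(pool.starmap)
    problem = MyProblem(elementwise_runner=runner)

    res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
    print('Processes:', res.exec_time)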

Dask

A more advanced option is to distribute the evaluation function across several workers. A couple of frameworks support this kind of code distribution; for our framework, we recommend using Dask.

Documentation on how to set up the cluster is available here. You first start a scheduler somewhere and then connect workers to it. A client object then connects to the scheduler and distributes the jobs automatically for you.
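For reference, here is a minimal sketch of connecting to an externally started cluster; the scheduler address is only a placeholder, and the cell that follows simply starts a local cluster instead:

[ ]:
# sketch only: on your cluster, start the scheduler and the workers from a shell, e.g.
#   dask scheduler
#   dask worker tcp://<scheduler-ip>:8786
#
# then connect the client to that scheduler (placeholder address):
# from dask.distributed import Client
# client = Client("tcp://<scheduler-ip>:8786")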

[ ]:
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize
from pymoo.core.problem import DaskParallelization

from dask.distributed import Client
client = Client()
client.restart()
print("DASK STARTED")

# create the runner based on the Dask client
runner = DaskParallelization(client)

# define the problem by passing the Dask runner
problem = MyProblem(elementwise_runner=runner)

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Dask:', res.exec_time)

client.close()
print("DASK SHUTDOWN")

Note: Here, the overhead of transferring data to the Dask workers dominates. However, if evaluating your problem is computationally more expensive, this will no longer be the case.
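To illustrate when distribution pays off, here is a hypothetical sketch in which each evaluation is artificially expensive (simulated with time.sleep); for such a problem the evaluation time, not the data transfer, dominates:

[ ]:
import time

# hypothetical problem whose evaluation is expensive (simulated by a short sleep)
class MyExpensiveProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        time.sleep(0.01)  # stand-in for an expensive simulation
        out["F"] = (x ** 2).sum()

# it can be combined with any of the runners shown above, e.g.
# problem = MyExpensiveProblem(elementwise_runner=runner)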

Custom Parallelization

If you need more control over the parallelization process, we would like to provide an example of fully customized parallelization. Because the problems below derive from Problem (batch evaluation), the _evaluate function receives the whole set of solutions to be evaluated.

Threads

Thus, a thread pool can be initialized beforehand (or in the constructor of the Problem class) and then be used to speed up the evaluation. The code below essentially replicates what happens internally when using pymoo's starmap interface, but with an inline function definition and without some of the overhead, which is why it is slightly faster.

[ ]:
from multiprocessing.pool import ThreadPool
from pymoo.core.problem import Problem

pool = ThreadPool(8)

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):

        # define the function
        def my_eval(x):
            return (x ** 2).sum()

        # prepare the parameters for the pool
        params = [[X[k]] for k in range(len(X))]

        # calculate the function values in a parallelized manner and wait until done
        F = pool.starmap(my_eval, params)

        # store the function values in the output dictionary
        out["F"] = np.array(F)

problem = MyProblem()
[ ]:
res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Threads:', res.exec_time)
[ ]:
pool.close()
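The same custom approach works with other executors as well; below is a sketch using concurrent.futures.ThreadPoolExecutor from the standard library (just an alternative to the thread pool above, not a pymoo API):

[ ]:
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=8)

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        # map the per-solution evaluation over the rows of X and collect the results
        F = list(executor.map(lambda x: (x ** 2).sum(), X))
        out["F"] = np.array(F)

problem = MyProblem()

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Threads:', res.exec_time)

executor.shutdown()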

Dask

[ ]:
import numpy as np
from dask.distributed import Client

from pymoo.core.problem import Problem
from pymoo.optimize import minimize

client = Client(processes=False)

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        def fun(x):
            return np.sum(x ** 2)

        jobs = [client.submit(fun, x) for x in X]
        out["F"] = np.row_stack([job.result() for job in jobs])

[ ]:
problem = MyProblem()

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Dask:', res.exec_time)

client.close()