pymoo
Latest Version: pymoo==0.3.1

Parallelization

Most algorithms in our framework are population based, so the evaluation function receives not one but multiple solutions at a time. The number of individuals depends on the algorithm logic and the population size. The function's task is to write the objective and constraint values into a matrix, and the evaluation itself can be done independently of pymoo. The following sections discuss different implementations of the evaluation function.
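
To make the matrix layout concrete, here is a minimal sketch that does not use pymoo at all. The function name evaluate_population and the constraint are hypothetical and chosen only for illustration; the assumption is one row per individual in the input and one row per individual in each output matrix.

```python
import numpy as np

def evaluate_population(X, out):
    """Hypothetical stand-alone evaluation: one row of X per individual."""
    # One objective value per individual: sum of squares of its variables.
    out["F"] = np.sum(X ** 2, axis=1).reshape(-1, 1)
    # One constraint value per individual (illustrative only): sum(x) - 1 <= 0.
    out["G"] = (np.sum(X, axis=1) - 1.0).reshape(-1, 1)

# 2 individuals with 3 variables each
X = np.array([[0.0, 0.0, 0.0],
              [1.0, 2.0, 3.0]])
out = {}
evaluate_population(X, out)
print(out["F"].shape, out["G"].shape)
```

Both output matrices have as many rows as X, so the caller can match every objective and constraint value to the individual that produced it.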

Vectorized

By default, matrix- and vector-based operations are used to speed up calculations. An example of a vectorized evaluation is already provided in our Getting Started Guide.

[1]:
import autograd.numpy as anp

from pymoo.model.problem import Problem

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        out["F"] = anp.sum(x ** 2, axis=1)

By default, no parallelization is applied and the evaluation code is simply executed as written.

[2]:
problem = MyProblem(parallelization = None)

Create some sample input:

[3]:
import numpy as np

X = np.random.random((10000, problem.n_var))
[4]:
%time F = problem.evaluate(X)
CPU times: user 6.56 ms, sys: 1.41 ms, total: 7.96 ms
Wall time: 1.97 ms

Multiple Threads

Another way of defining a problem is to set the elementwise_evaluation flag and then execute each evaluation in a different thread. This can be achieved automatically with the following code:

[5]:
class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5,
                         elementwise_evaluation=True, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        out["F"] = (x ** 2).sum()
[6]:
problem = MyProblem(parallelization = ("threads", 4))
[7]:
%time F = problem.evaluate(X)
CPU times: user 127 ms, sys: 27.7 ms, total: 154 ms
Wall time: 171 ms

Please note that for small examples such as the one shown here, vectorization is much faster (about 2 ms versus 171 ms wall time in the runs above). However, evaluation functions that are based on simulations often cannot be vectorized easily, because each solution requires its own independent simulation run.
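
The idea of running each elementwise evaluation in its own thread can be sketched with the standard library alone. This is not pymoo's internal implementation; evaluate_one and evaluate_threaded are hypothetical names, and evaluate_one stands in for a simulation that cannot be vectorized.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np

def evaluate_one(x):
    # Hypothetical stand-in for an expensive, non-vectorizable simulation.
    return float(np.sum(x ** 2))

def evaluate_threaded(X, n_threads=4):
    # Each row of X is evaluated by a worker thread; map preserves row order.
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        return np.array(list(pool.map(evaluate_one, X)))

X = np.random.random((100, 10))
F = evaluate_threaded(X)
```

Threads tend to pay off only when a single evaluation spends most of its time waiting on something outside the Python interpreter, such as an external simulation process or I/O. For a cheap function like this one, the thread overhead dominates.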

Dask

A more advanced approach is to distribute the evaluation function across several workers. Several frameworks support distributing code in this way. For our framework, we recommend using Dask.

Documentation on how to set up the cluster is available in the Dask documentation. Basically, you first start a scheduler somewhere and then connect workers to it. A client object then connects to the scheduler and distributes the jobs automatically for you.

[8]:
import numpy as np
from dask.distributed import Client
from pymoo.model.problem import Problem


# connect to the dask scheduler
client = Client(address="host-94108.dhcp.egr.msu.edu:8786")

# define the evaluation function that directly returns the objective and/or
# constraint values. Note that it evaluates a single solution, so the
# problem must set elementwise_evaluation=True.
def fun(x):
    return {
        "F": np.sum(x ** 2)
    }

# define a problem without any evaluation function - everything is done by parallelization
class MyProblem(Problem):

    def __init__(self, *args, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5,
                         elementwise_evaluation=True, *args, **kwargs)

# create the problem and set the parallelization to dask
problem = MyProblem(parallelization=("dask", client, fun))

X = np.random.random((10, problem.n_var))
[9]:
%timeit problem.evaluate(X)
111 ms ± 46.4 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Manually use Dask

To have full control over how the work is distributed, you can use Dask manually in the _evaluate function. The parallelization option does this internally, but handling it yourself can be useful when you need that control.

[10]:
class MyProblem(Problem):

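    def __init__(self, *args, **kwargs):
        # no elementwise_evaluation here: _evaluate receives the whole
        # population X and distributes the rows itself
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5,
                         *args, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):

        # evaluates a single solution on a worker
        def fun(x):
            return np.sum(x ** 2)

        # submit one job per row of X to the Dask client created above
        jobs = [client.submit(fun, x) for x in X]
        # collect the results into an (n, 1) column of objective values
        out["F"] = np.vstack([job.result() for job in jobs])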


problem = MyProblem()
[11]:
%timeit problem.evaluate(X)
1.11 s ± 41 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
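
One reason to manage the jobs yourself is to control their granularity. The sketch below submits one job per block of individuals instead of one job per individual, which reduces the per-job overhead. It uses ThreadPoolExecutor from the standard library as a local stand-in for the Dask client, since both expose the submit and result calls used above; evaluate_chunk and evaluate_in_chunks are hypothetical names, and the chunk count of 4 is an arbitrary choice.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np

def evaluate_chunk(X_chunk):
    # Evaluate a whole block of individuals in one job (vectorized inside the job).
    return np.sum(X_chunk ** 2, axis=1)

def evaluate_in_chunks(executor, X, n_chunks=4):
    # Split the population into row blocks and submit one job per block.
    chunks = np.array_split(X, n_chunks)
    jobs = [executor.submit(evaluate_chunk, chunk) for chunk in chunks]
    # Collect results in submission order and restore the (n, 1) column layout.
    return np.concatenate([job.result() for job in jobs]).reshape(-1, 1)

X = np.random.random((10, 10))
with ThreadPoolExecutor(max_workers=4) as executor:
    F = evaluate_in_chunks(executor, X)
```

Whether chunking helps in practice depends on how expensive a single evaluation is compared with the cost of scheduling a job and moving data to a worker.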