"""Multi-objective optimization.
Core routines (weighted-sum Pareto approximation and epsilon-constraint)
use pure scipy. NSGA-II is gated behind the ``optimization`` extra
(pymoo).
"""
from __future__ import annotations
from typing import Any, Callable
import numpy as np
import numpy.typing as npt
from scipy import optimize
from wraquant.core._coerce import coerce_array # noqa: F401 — wired for type-system consistency
from wraquant.core.decorators import requires_extra
[docs]
def pareto_front(
objectives: list[Callable[..., float]],
constraints: list[dict[str, Any]] | None = None,
n_points: int = 50,
bounds: list[tuple[float, float]] | None = None,
) -> dict[str, Any]:
"""Approximate the Pareto frontier via the weighted-sum method.
Use the weighted-sum approach for a quick Pareto front approximation
when you have a convex bi-objective problem (e.g., risk vs. return,
cost vs. tracking error). For non-convex problems or more than 2
objectives, use ``nsga2`` instead.
For each of *n_points* evenly-spaced weight vectors the scalar
weighted-sum of the objectives is minimised using SLSQP.
Parameters:
objectives: List of scalar objective functions ``f(x) -> float``
to be minimised simultaneously.
constraints: Scipy-compatible constraint dicts applied to every
sub-problem.
n_points: Number of Pareto-front samples (default 50).
bounds: Variable bounds ``[(lb, ub), ...]``.
Returns:
Dict with keys ``points`` (non-dominated decision vectors, shape
``(k, n)``), ``objectives`` (corresponding objective values,
shape ``(k, m)``), and ``n_points`` (number of non-dominated
points found).
Example:
>>> import numpy as np
>>> f1 = lambda x: float(x[0] ** 2)
>>> f2 = lambda x: float((x[0] - 1) ** 2)
>>> result = pareto_front([f1, f2], bounds=[(0, 1)], n_points=20)
>>> result['n_points'] > 0
True
See Also:
nsga2: Evolutionary multi-objective optimizer (handles non-convex).
epsilon_constraint: Epsilon-constraint Pareto method.
"""
n_obj = len(objectives)
if n_obj < 2:
raise ValueError("Need at least two objectives for a Pareto front.")
# Generate weight vectors (simplex grid for 2 objectives,
# uniform for >2)
if n_obj == 2:
alphas = np.linspace(0.0, 1.0, n_points)
weight_vectors = np.column_stack([alphas, 1.0 - alphas])
else:
rng = np.random.default_rng(0)
raw = rng.dirichlet(np.ones(n_obj), size=n_points)
weight_vectors = raw
if bounds is not None:
n_vars = len(bounds)
else:
# infer from constraints or default
n_vars = 2
points: list[npt.NDArray] = []
obj_values: list[npt.NDArray] = []
for w in weight_vectors:
def weighted_obj(x: npt.NDArray, _w: npt.NDArray = w) -> float:
return float(sum(_w[i] * objectives[i](x) for i in range(n_obj)))
x0 = np.zeros(n_vars)
if bounds is not None:
x0 = np.array([(lb + ub) / 2 for lb, ub in bounds])
result = optimize.minimize(
weighted_obj,
x0,
method="SLSQP",
bounds=bounds,
constraints=constraints,
)
if result.success:
points.append(result.x)
obj_values.append(np.array([objectives[i](result.x) for i in range(n_obj)]))
if not points:
return {
"points": np.empty((0, n_vars)),
"objectives": np.empty((0, n_obj)),
"n_points": 0,
}
pts = np.array(points)
objs = np.array(obj_values)
# Filter dominated solutions
mask = _non_dominated_mask(objs)
pts = pts[mask]
objs = objs[mask]
return {
"points": pts,
"objectives": objs,
"n_points": int(pts.shape[0]),
}
def _non_dominated_mask(objectives: npt.NDArray[np.floating]) -> npt.NDArray[np.bool_]:
"""Return boolean mask of non-dominated rows (all objectives minimised)."""
n = objectives.shape[0]
mask = np.ones(n, dtype=bool)
for i in range(n):
if not mask[i]:
continue
for j in range(i + 1, n):
if not mask[j]:
continue
# Check if j dominates i
if np.all(objectives[j] <= objectives[i]) and np.any(
objectives[j] < objectives[i]
):
mask[i] = False
break
# Check if i dominates j
if np.all(objectives[i] <= objectives[j]) and np.any(
objectives[i] < objectives[j]
):
mask[j] = False
return mask
[docs]
@requires_extra("optimization")
def nsga2(
objectives: list[Callable[..., float]],
bounds: list[tuple[float, float]],
pop_size: int = 100,
n_gen: int = 200,
seed: int | None = None,
) -> dict[str, Any]:
"""NSGA-II multi-objective optimisation via pymoo.
Use NSGA-II when you have two or more competing objectives and want
to find the Pareto-optimal frontier. In finance, this is used for
risk-return trade-offs, cost-tracking trade-offs in execution, and
multi-factor portfolio construction. NSGA-II is an evolutionary
algorithm that maintains diversity across the Pareto front.
Parameters:
objectives: List of scalar objective functions ``f(x) -> float``,
each to be minimised.
bounds: Variable bounds ``[(lb, ub), ...]``.
pop_size: Population size (default 100). Larger populations
improve frontier coverage but increase computation.
n_gen: Number of generations (default 200).
seed: Random seed for reproducibility.
Returns:
Dict with keys:
- ``pareto_front``: np.ndarray of decision vectors on the
Pareto front (shape ``(n_points, n_var)``).
- ``pareto_objectives``: np.ndarray of objective values (shape
``(n_points, n_obj)``).
- ``n_points``: number of Pareto-optimal solutions found.
Example:
>>> import numpy as np
>>> # Minimise x^2 and (x-2)^2 simultaneously
>>> f1 = lambda x: float(x[0] ** 2)
>>> f2 = lambda x: float((x[0] - 2) ** 2)
>>> result = nsga2([f1, f2], bounds=[(0, 3)], pop_size=50, n_gen=100, seed=42)
>>> result['n_points'] > 0
True
References:
- Deb et al. (2002), "A Fast and Elitist Multiobjective Genetic
Algorithm: NSGA-II"
See Also:
pareto_front: Weighted-sum Pareto approximation (no pymoo needed).
epsilon_constraint: Epsilon-constraint method.
"""
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.core.problem import ElementwiseProblem
from pymoo.optimize import minimize as pymoo_minimize
from pymoo.termination import get_termination
n_var = len(bounds)
n_obj = len(objectives)
xl = np.array([b[0] for b in bounds])
xu = np.array([b[1] for b in bounds])
class _Problem(ElementwiseProblem):
def __init__(self) -> None:
super().__init__(n_var=n_var, n_obj=n_obj, xl=xl, xu=xu)
def _evaluate(
self, x: npt.NDArray, out: dict, *args: Any, **kwargs: Any
) -> None:
out["F"] = np.array([obj(x) for obj in objectives])
algorithm = NSGA2(pop_size=pop_size)
termination = get_termination("n_gen", n_gen)
result = pymoo_minimize(
_Problem(),
algorithm,
termination,
seed=seed,
verbose=False,
)
if result.F is not None:
return {
"pareto_front": result.X,
"pareto_objectives": result.F,
"n_points": int(result.F.shape[0]),
}
return {
"pareto_front": np.empty((0, n_var)),
"pareto_objectives": np.empty((0, n_obj)),
"n_points": 0,
}
[docs]
def epsilon_constraint(
primary_obj: Callable[..., float],
secondary_objs: list[Callable[..., float]],
epsilon_values: list[npt.NDArray[np.floating]],
bounds: list[tuple[float, float]] | None = None,
constraints: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
"""Epsilon-constraint method for multi-objective optimization.
Minimises the *primary_obj* while constraining each secondary
objective to be at most the corresponding epsilon value.
Parameters:
primary_obj: Primary objective function to minimise.
secondary_objs: Secondary objective functions.
epsilon_values: List of 1-D arrays. For each secondary objective
``k``, ``epsilon_values[k]`` gives the set of upper-bound
values to iterate over. The Cartesian product of all epsilon
arrays defines the grid of sub-problems.
bounds: Variable bounds ``[(lb, ub), ...]``.
constraints: Additional scipy constraint dicts.
Returns:
Dict with keys ``points`` (decision vectors), ``primary_values``
(primary objective at each point), ``secondary_values`` (secondary
objective values), and ``n_points``.
"""
if len(secondary_objs) != len(epsilon_values):
raise ValueError("Length of secondary_objs and epsilon_values must match.")
if bounds is not None:
n_vars = len(bounds)
else:
n_vars = 2
# Build epsilon grid via Cartesian product
eps_arrays = [np.asarray(ev, dtype=float) for ev in epsilon_values]
grids = np.meshgrid(*eps_arrays, indexing="ij")
eps_grid = np.column_stack([g.ravel() for g in grids])
points: list[npt.NDArray] = []
primary_vals: list[float] = []
secondary_vals: list[npt.NDArray] = []
base_constraints = list(constraints) if constraints else []
for eps_row in eps_grid:
sub_constraints = list(base_constraints)
for k, sec_obj in enumerate(secondary_objs):
eps_k = float(eps_row[k])
sub_constraints.append(
{
"type": "ineq",
"fun": lambda x, _f=sec_obj, _e=eps_k: _e - _f(x),
}
)
x0 = np.zeros(n_vars)
if bounds is not None:
x0 = np.array([(lb + ub) / 2 for lb, ub in bounds])
result = optimize.minimize(
primary_obj,
x0,
method="SLSQP",
bounds=bounds,
constraints=sub_constraints,
)
if result.success:
points.append(result.x)
primary_vals.append(float(result.fun))
secondary_vals.append(np.array([sec(result.x) for sec in secondary_objs]))
if not points:
n_sec = len(secondary_objs)
return {
"points": np.empty((0, n_vars)),
"primary_values": np.empty(0),
"secondary_values": np.empty((0, n_sec)),
"n_points": 0,
}
return {
"points": np.array(points),
"primary_values": np.array(primary_vals),
"secondary_values": np.array(secondary_vals),
"n_points": len(points),
}