"""Optimal stopping theory for finance.
Longstaff-Schwartz American option pricing, binomial American options,
optimal exit thresholds, sequential testing, and change-point detection.
"""
from __future__ import annotations
from typing import Callable
import numpy as np
from numpy.typing import ArrayLike
from wraquant.core._coerce import coerce_array
__all__ = [
"longstaff_schwartz",
"binomial_american",
"optimal_exit_threshold",
"sequential_probability_ratio",
"cusum_stopping",
"secretary_problem_threshold",
]
# ---------------------------------------------------------------------------
# Longstaff-Schwartz
# ---------------------------------------------------------------------------
[docs]
def longstaff_schwartz(
paths: ArrayLike,
strike: float,
rf_rate: float,
dt: float,
basis_functions: str = "laguerre",
option_type: str = "put",
) -> float:
"""Longstaff-Schwartz algorithm for American option pricing.
Parameters
----------
paths : array_like
Simulated price paths of shape ``(n_paths, n_steps + 1)``.
Column 0 is the initial price.
strike : float
Strike price.
rf_rate : float
Risk-free rate (annualised, continuously compounded).
dt : float
Time step (in years).
basis_functions : {'laguerre', 'polynomial'}, optional
Basis for the continuation-value regression (default ``'laguerre'``).
option_type : {'put', 'call'}, optional
Option type (default ``'put'``).
Returns
-------
float
Estimated American option price.
Example
-------
>>> import numpy as np
>>> from wraquant.math.optimal_stopping import longstaff_schwartz
>>> rng = np.random.default_rng(42)
>>> S0, K, r, vol, T, n_steps = 100, 100, 0.05, 0.2, 1.0, 50
>>> dt = T / n_steps
>>> # Simulate GBM paths
>>> z = rng.standard_normal((10000, n_steps))
>>> paths = np.zeros((10000, n_steps + 1))
>>> paths[:, 0] = S0
>>> for i in range(n_steps):
... paths[:, i+1] = paths[:, i] * np.exp((r - 0.5*vol**2)*dt + vol*np.sqrt(dt)*z[:, i])
>>> price = longstaff_schwartz(paths, strike=K, rf_rate=r, dt=dt)
>>> price > 0
True
Notes
-----
Reference: Longstaff, F. A. & Schwartz, E. S. (2001). "Valuing
American Options by Simulation." *Review of Financial Studies*, 14(1),
113-147.
See Also
--------
binomial_american : Binomial tree pricing (exact, but slower for many steps).
wraquant.price : Options pricing module with Black-Scholes and SDEs.
"""
paths = np.asarray(paths, dtype=float)
n_paths, n_steps_plus_1 = paths.shape
n_steps = n_steps_plus_1 - 1
discount = np.exp(-rf_rate * dt)
# Payoff at each node
if option_type == "put":
payoff_fn: Callable[[np.ndarray], np.ndarray] = lambda s: np.maximum(strike - s, 0.0)
else:
payoff_fn = lambda s: np.maximum(s - strike, 0.0)
# Cash flow matrix: stores the time at which the option is exercised
# and the corresponding payoff
cashflows = payoff_fn(paths[:, -1])
exercise_time = np.full(n_paths, n_steps, dtype=int)
# Backward induction
for t in range(n_steps - 1, 0, -1):
S_t = paths[:, t]
intrinsic = payoff_fn(S_t)
itm = intrinsic > 0
if itm.sum() == 0:
continue
# Discounted future cash flows for ITM paths
steps_to_cf = exercise_time[itm] - t
Y = cashflows[itm] * discount ** steps_to_cf
X = S_t[itm]
# Build basis
basis = _build_basis(X, basis_functions)
# Regression
try:
from wraquant.stats.regression import ols
result = ols(Y, basis, add_constant=False)
coeffs = result["coefficients"]
continuation = basis @ coeffs
except (np.linalg.LinAlgError, Exception):
continuation = Y
# Exercise decision
exercise_now = intrinsic[itm] >= continuation
itm_indices = np.where(itm)[0]
ex_idx = itm_indices[exercise_now]
cashflows[ex_idx] = intrinsic[itm][exercise_now]
exercise_time[ex_idx] = t
# Discount all cash flows to time 0
discounted = cashflows * discount ** exercise_time
return float(np.mean(discounted))
def _build_basis(
x: np.ndarray,
basis_type: str,
degree: int = 3,
) -> np.ndarray:
"""Build basis functions for the LSM regression."""
if basis_type == "laguerre":
# First few Laguerre polynomials
L0 = np.ones_like(x)
L1 = 1.0 - x
L2 = 0.5 * (x ** 2 - 4 * x + 2)
return np.column_stack([L0, L1, L2])
elif basis_type == "polynomial":
return np.column_stack([x ** i for i in range(degree + 1)])
else:
raise ValueError(
f"Unknown basis {basis_type!r}; use 'laguerre' or 'polynomial'."
)
# ---------------------------------------------------------------------------
# Binomial American option
# ---------------------------------------------------------------------------
[docs]
def binomial_american(
spot: float,
strike: float,
rf_rate: float,
vol: float,
T: float,
n_steps: int,
option_type: str = "put",
) -> float:
"""Price an American option using the CRR binomial tree.
Parameters
----------
spot : float
Current underlying price.
strike : float
Strike price.
rf_rate : float
Risk-free rate (annualised).
vol : float
Volatility (annualised).
T : float
Time to expiration (years).
n_steps : int
Number of tree steps.
option_type : {'put', 'call'}, optional
Option type (default ``'put'``).
Returns
-------
float
American option price.
Example
-------
>>> from wraquant.math.optimal_stopping import binomial_american
>>> price = binomial_american(spot=100, strike=100, rf_rate=0.05,
... vol=0.2, T=1.0, n_steps=200)
>>> price > 0
True
>>> # American put is worth at least European put
>>> price >= 4.0 # approximate lower bound
True
See Also
--------
longstaff_schwartz : Monte Carlo pricing for path-dependent payoffs.
"""
dt = T / n_steps
u = np.exp(vol * np.sqrt(dt))
d = 1.0 / u
p = (np.exp(rf_rate * dt) - d) / (u - d)
disc = np.exp(-rf_rate * dt)
# Terminal asset prices
j = np.arange(n_steps + 1, dtype=float)
prices = spot * u ** (n_steps - j) * d ** j
# Terminal payoff
if option_type == "put":
values = np.maximum(strike - prices, 0.0)
else:
values = np.maximum(prices - strike, 0.0)
# Backward induction with early exercise
for step in range(n_steps - 1, -1, -1):
values = disc * (p * values[:-1] + (1 - p) * values[1:])
prices_step = spot * u ** (step - np.arange(step + 1, dtype=float)) * d ** np.arange(step + 1, dtype=float)
if option_type == "put":
intrinsic = np.maximum(strike - prices_step, 0.0)
else:
intrinsic = np.maximum(prices_step - strike, 0.0)
values = np.maximum(values, intrinsic)
return float(values[0])
# ---------------------------------------------------------------------------
# Optimal exit threshold (Ornstein-Uhlenbeck)
# ---------------------------------------------------------------------------
[docs]
def optimal_exit_threshold(
mu: float,
sigma: float,
transaction_cost: float,
) -> dict[str, float]:
r"""Compute the optimal exit threshold for an OU mean-reverting process.
For a process :math:`dX = -\mu\,X\,dt + \sigma\,dW`, the optimal
exit threshold balances expected profit against the cost of trading.
Uses the analytical approximation:
.. math::
x^* \approx \sigma \sqrt{\frac{2}{\mu}}
\ln\!\left(\frac{\sigma}{\sqrt{2\mu}\,c}\right)
when the threshold is large relative to *transaction_cost*.
Parameters
----------
mu : float
Mean-reversion speed (> 0).
sigma : float
Volatility of the OU process (> 0).
transaction_cost : float
Round-trip transaction cost per unit.
Returns
-------
dict
``entry_threshold`` – optimal entry (in units of process value).
``exit_threshold`` – optimal exit threshold. Higher values
mean waiting for a larger dislocation before exiting.
``expected_profit`` – approximate expected profit per trade.
Example
-------
>>> from wraquant.math.optimal_stopping import optimal_exit_threshold
>>> result = optimal_exit_threshold(mu=5.0, sigma=0.5,
... transaction_cost=0.001)
>>> result['exit_threshold'] > 0
True
>>> result['expected_profit'] > 0
True
See Also
--------
wraquant.stats.cointegration : Test for cointegration before pairs trading.
wraquant.execution.optimal : Optimal execution with Almgren-Chriss.
"""
if mu <= 0 or sigma <= 0:
raise ValueError("mu and sigma must be positive.")
# Approximate optimal threshold
ratio = sigma / (np.sqrt(2.0 * mu) * max(transaction_cost, 1e-12))
if ratio > 1.0:
x_star = sigma * np.sqrt(2.0 / mu) * np.sqrt(np.log(ratio))
else:
x_star = transaction_cost * 2.0 # fallback
# Expected profit: threshold minus costs
expected_profit = x_star - transaction_cost
return {
"entry_threshold": 0.0, # enter when X crosses zero (mean)
"exit_threshold": float(x_star),
"expected_profit": float(max(expected_profit, 0.0)),
}
# ---------------------------------------------------------------------------
# Sequential Probability Ratio Test
# ---------------------------------------------------------------------------
[docs]
def sequential_probability_ratio(
observations: ArrayLike,
h0_dist: tuple[str, dict[str, float]],
h1_dist: tuple[str, dict[str, float]],
alpha: float = 0.05,
beta: float = 0.05,
) -> dict[str, float | int | str]:
"""Sequential Probability Ratio Test (SPRT).
Sequentially tests H0 vs H1 with controlled error probabilities.
Parameters
----------
observations : array_like
Sequentially observed data.
h0_dist : tuple
``(distribution_name, params)`` for H0.
Supported: ``('normal', {'mu': ..., 'sigma': ...})``.
h1_dist : tuple
``(distribution_name, params)`` for H1.
alpha : float, optional
Type I error probability (default 0.05).
beta : float, optional
Type II error probability (default 0.05).
Returns
-------
dict
``decision`` – ``'reject_h0'``, ``'accept_h0'``, or ``'inconclusive'``.
``stopping_time`` – index at which the decision was reached. Fewer
observations needed indicates stronger signal.
``log_ratio`` – final log-likelihood ratio.
Example
-------
>>> import numpy as np
>>> from wraquant.math.optimal_stopping import sequential_probability_ratio
>>> rng = np.random.default_rng(42)
>>> # Generate data from H1 (mu=0.5) and test against H0 (mu=0)
>>> obs = rng.normal(0.5, 1.0, size=100)
>>> result = sequential_probability_ratio(
... obs,
... h0_dist=('normal', {'mu': 0.0, 'sigma': 1.0}),
... h1_dist=('normal', {'mu': 0.5, 'sigma': 1.0}),
... )
>>> result['decision']
'reject_h0'
See Also
--------
cusum_stopping : CUSUM-based change detection (non-parametric alternative).
"""
observations = coerce_array(observations, name="observations")
# Wald boundaries
A = np.log((1.0 - beta) / alpha) # upper boundary (reject H0)
B = np.log(beta / (1.0 - alpha)) # lower boundary (accept H0)
cumulative_lr = 0.0
stopping_time = len(observations)
decision = "inconclusive"
for i, x in enumerate(observations):
lr = _log_likelihood_ratio(x, h0_dist, h1_dist)
cumulative_lr += lr
if cumulative_lr >= A:
decision = "reject_h0"
stopping_time = i + 1
break
elif cumulative_lr <= B:
decision = "accept_h0"
stopping_time = i + 1
break
return {
"decision": decision,
"stopping_time": stopping_time,
"log_ratio": float(cumulative_lr),
}
def _log_likelihood_ratio(
x: float,
h0_dist: tuple[str, dict[str, float]],
h1_dist: tuple[str, dict[str, float]],
) -> float:
"""Log-likelihood ratio for a single observation."""
from scipy.stats import norm as sp_norm
name0, params0 = h0_dist
name1, params1 = h1_dist
if name0 == "normal" and name1 == "normal":
ll0 = sp_norm.logpdf(x, loc=params0["mu"], scale=params0["sigma"])
ll1 = sp_norm.logpdf(x, loc=params1["mu"], scale=params1["sigma"])
return float(ll1 - ll0)
else:
raise ValueError(f"Unsupported distributions: {name0}, {name1}")
# ---------------------------------------------------------------------------
# CUSUM stopping rule
# ---------------------------------------------------------------------------
[docs]
def cusum_stopping(
observations: ArrayLike,
target_mean: float,
threshold: float,
) -> dict[str, float | int | bool]:
"""CUSUM stopping rule for detecting a mean shift.
Monitors cumulative sum of deviations from *target_mean* and triggers
a stop when the CUSUM statistic exceeds *threshold*.
Parameters
----------
observations : array_like
Sequential observations.
target_mean : float
Expected mean under the null (no-change) hypothesis.
threshold : float
CUSUM threshold for triggering a detection.
Returns
-------
dict
``detected`` – whether a change was detected.
``stopping_time`` – index of detection (or length of data if none).
``cusum_pos`` – final positive CUSUM statistic.
``cusum_neg`` – final negative CUSUM statistic.
``cusum_values`` – array of max(cusum_pos, |cusum_neg|) at each step.
Example
-------
>>> import numpy as np
>>> from wraquant.math.optimal_stopping import cusum_stopping
>>> # Returns with a mean shift at index 50
>>> obs = np.concatenate([np.random.randn(50) * 0.01,
... np.random.randn(50) * 0.01 + 0.05])
>>> result = cusum_stopping(obs, target_mean=0.0, threshold=0.5)
>>> result['detected']
True
>>> result['stopping_time'] > 50 # detected after the shift
True
See Also
--------
sequential_probability_ratio : Parametric sequential test.
wraquant.ts.changepoints : Change-point detection for time series.
"""
observations = coerce_array(observations, name="observations")
n = len(observations)
cusum_pos = 0.0
cusum_neg = 0.0
detected = False
stopping_time = n
cusum_values = np.zeros(n)
for i in range(n):
cusum_pos = max(0.0, cusum_pos + (observations[i] - target_mean))
cusum_neg = min(0.0, cusum_neg + (observations[i] - target_mean))
cusum_values[i] = max(cusum_pos, abs(cusum_neg))
if cusum_pos > threshold or abs(cusum_neg) > threshold:
detected = True
stopping_time = i + 1
break
return {
"detected": detected,
"stopping_time": stopping_time,
"cusum_pos": float(cusum_pos),
"cusum_neg": float(cusum_neg),
"cusum_values": cusum_values[: stopping_time],
}
# ---------------------------------------------------------------------------
# Secretary problem
# ---------------------------------------------------------------------------
[docs]
def secretary_problem_threshold(
n_candidates: int,
) -> dict[str, float | int]:
"""Compute the optimal stopping threshold for the secretary problem.
The classical 1/e rule: reject the first *r - 1* candidates, then
hire the first one who is better than all previously seen.
Parameters
----------
n_candidates : int
Total number of candidates.
Returns
-------
dict
``threshold`` – number of candidates to unconditionally reject.
``optimal_fraction`` – threshold / n_candidates. Converges to
1/e (approximately 0.368) as *n_candidates* grows.
``success_probability`` – probability of selecting the best candidate.
Converges to 1/e for large *n_candidates*.
Example
-------
>>> from wraquant.math.optimal_stopping import secretary_problem_threshold
>>> result = secretary_problem_threshold(100)
>>> 30 <= result['threshold'] <= 40 # close to 1/e * 100 ~ 37
True
>>> result['success_probability'] > 0.3
True
See Also
--------
sequential_probability_ratio : Sequential testing framework.
"""
if n_candidates <= 0:
raise ValueError("n_candidates must be positive.")
if n_candidates == 1:
return {
"threshold": 0,
"optimal_fraction": 0.0,
"success_probability": 1.0,
}
# Find r that maximises the probability of selecting the best
best_prob = 0.0
best_r = 1
for r in range(1, n_candidates + 1):
# P(win) = (r-1)/n * sum_{i=r}^{n} 1/(i-1)
prob = 0.0
for i in range(r, n_candidates + 1):
prob += 1.0 / (i - 1) if i > 1 else 1.0
prob *= (r - 1) / n_candidates
if r == 1:
# If r=1, we pick the first candidate: prob = 1/n
prob = 1.0 / n_candidates
if prob > best_prob:
best_prob = prob
best_r = r
return {
"threshold": best_r - 1, # reject first (r-1) candidates
"optimal_fraction": float((best_r - 1) / n_candidates),
"success_probability": float(best_prob),
}