Source code for wraquant.risk.monte_carlo

"""Advanced Monte Carlo methods for risk measurement.

Variance reduction techniques, bootstrap methods, and filtered
historical simulation for improved VaR/ES estimation.
"""

from __future__ import annotations

import numpy as np
from scipy.stats import norm as _norm

__all__ = [
    "antithetic_variates",
    "block_bootstrap",
    "filtered_historical_simulation",
    "importance_sampling_var",
    "stationary_bootstrap",
    "stratified_sampling",
]


[docs] def importance_sampling_var( returns: np.ndarray, n_sims: int = 10_000, target_quantile: float = 0.01, shift: float | None = None, seed: int | None = None, ) -> dict[str, float]: """Estimate VaR via importance sampling. Shifts the sampling distribution toward the tail to obtain more accurate estimates of extreme quantiles with fewer simulations. Parameters: returns: 1-D array of historical returns. n_sims: Number of Monte Carlo draws. target_quantile: Quantile level for VaR (e.g., 0.01 for 1%). shift: Mean shift for the importance sampling distribution. If ``None``, automatically set to the empirical quantile. seed: Random seed for reproducibility. Returns: Dictionary with keys: - ``var``: Estimated VaR (positive number = loss). - ``effective_sample_size``: Effective sample size after reweighting, as a fraction of ``n_sims``. """ from wraquant.core._coerce import coerce_array returns = coerce_array(returns, name="returns") mu = np.mean(returns) sigma = np.std(returns) if sigma < 1e-15: return {"var": 0.0, "effective_sample_size": 1.0} if shift is None: shift = np.quantile(returns, target_quantile) - mu rng = np.random.default_rng(seed) # Draw from shifted distribution (importance distribution) samples = rng.normal(loc=mu + shift, scale=sigma, size=n_sims) # Likelihood ratios (original / importance) log_ratios = ( -0.5 * ((samples - mu) / sigma) ** 2 + 0.5 * ((samples - mu - shift) / sigma) ** 2 ) weights = np.exp(log_ratios) # Normalise weights weights /= weights.sum() # Sort samples and compute weighted quantile order = np.argsort(samples) sorted_samples = samples[order] sorted_weights = weights[order] cum_weights = np.cumsum(sorted_weights) idx = np.searchsorted(cum_weights, target_quantile) idx = min(idx, len(sorted_samples) - 1) var_estimate = -sorted_samples[idx] # Effective sample size ess = 1.0 / np.sum(weights**2) / n_sims return { "var": float(var_estimate), "effective_sample_size": float(ess), }
[docs] def antithetic_variates( mu: float | np.ndarray, sigma: float | np.ndarray, n_sims: int, n_assets: int = 1, seed: int | None = None, ) -> np.ndarray: """Generate antithetic variate samples for variance reduction. Produces ``n_sims`` paired samples (original + antithetic) from a normal distribution. The antithetic counterpart mirrors each draw about the mean, reducing variance for monotone functions. Parameters: mu: Mean(s) of the distribution. Scalar or array of length ``n_assets``. sigma: Standard deviation(s). Scalar or array of length ``n_assets``. n_sims: Number of *pairs* to generate (total output = 2 * n_sims rows). n_assets: Number of assets / dimensions. seed: Random seed for reproducibility. Returns: Array of shape ``(2 * n_sims, n_assets)`` containing the original and antithetic draws interleaved. """ from wraquant.core._coerce import coerce_array rng = np.random.default_rng(seed) mu = np.broadcast_to(np.asarray(mu, dtype=float), (n_assets,)) sigma = np.broadcast_to(np.asarray(sigma, dtype=float), (n_assets,)) z = rng.standard_normal((n_sims, n_assets)) original = mu + sigma * z antithetic = mu - sigma * z # Interleave original and antithetic result = np.empty((2 * n_sims, n_assets)) result[0::2] = original result[1::2] = antithetic return result
[docs] def stratified_sampling( returns: np.ndarray, n_strata: int = 10, n_sims: int = 10_000, seed: int | None = None, ) -> np.ndarray: """Stratified sampling for VaR estimation. Divides the probability space into equal strata and draws uniformly within each stratum, ensuring better coverage of the tails. Parameters: returns: 1-D array of historical returns (used to fit a normal distribution). n_strata: Number of strata. n_sims: Total number of draws (distributed evenly across strata). seed: Random seed for reproducibility. Returns: 1-D array of ``n_sims`` stratified samples drawn from the fitted normal distribution. """ from wraquant.core._coerce import coerce_array returns = coerce_array(returns, name="returns") mu = np.mean(returns) sigma = np.std(returns) rng = np.random.default_rng(seed) sims_per_stratum = n_sims // n_strata remainder = n_sims - sims_per_stratum * n_strata samples = [] for i in range(n_strata): n_this = sims_per_stratum + (1 if i < remainder else 0) lo = i / n_strata hi = (i + 1) / n_strata # Uniform within the stratum u = rng.uniform(lo, hi, size=n_this) samples.append(_norm.ppf(u, loc=mu, scale=sigma)) return np.concatenate(samples)
[docs] def block_bootstrap( returns: np.ndarray, block_size: int, n_sims: int = 1_000, seed: int | None = None, ) -> np.ndarray: """Block bootstrap for autocorrelated time series. Resamples contiguous blocks of the input series to preserve serial dependence (Kunsch 1989, Liu & Singh 1992). Parameters: returns: 1-D array of return observations. block_size: Length of each block. n_sims: Number of bootstrap replications. seed: Random seed for reproducibility. Returns: 2-D array of shape ``(n_sims, len(returns))`` where each row is one bootstrap replicate. """ from wraquant.core._coerce import coerce_array returns = coerce_array(returns, name="returns") n = len(returns) if block_size < 1 or block_size > n: raise ValueError("block_size must be between 1 and len(returns)") rng = np.random.default_rng(seed) n_blocks = int(np.ceil(n / block_size)) result = np.empty((n_sims, n)) for i in range(n_sims): # Draw random block starting points starts = rng.integers(0, n - block_size + 1, size=n_blocks) blocks = [returns[s : s + block_size] for s in starts] replicate = np.concatenate(blocks)[:n] result[i] = replicate return result
[docs] def stationary_bootstrap( returns: np.ndarray, avg_block_size: float = 10.0, n_sims: int = 1_000, seed: int | None = None, ) -> np.ndarray: """Stationary bootstrap with random block sizes (Politis & Romano 1994). Block lengths follow a geometric distribution with mean ``avg_block_size``, producing a strictly stationary resampled series. Parameters: returns: 1-D array of return observations. avg_block_size: Expected block size (must be > 1). n_sims: Number of bootstrap replications. seed: Random seed for reproducibility. Returns: 2-D array of shape ``(n_sims, len(returns))`` where each row is one bootstrap replicate. """ from wraquant.core._coerce import coerce_array returns = coerce_array(returns, name="returns") n = len(returns) if avg_block_size < 1: raise ValueError("avg_block_size must be >= 1") rng = np.random.default_rng(seed) p = 1.0 / avg_block_size # probability of starting new block result = np.empty((n_sims, n)) for i in range(n_sims): idx = rng.integers(0, n) # random starting point for j in range(n): result[i, j] = returns[idx % n] # With probability p, jump to a new random position if rng.random() < p: idx = rng.integers(0, n) else: idx += 1 return result
[docs] def filtered_historical_simulation( returns: np.ndarray, vol_model: str = "ewma", decay: float = 0.94, n_sims: int = 1_000, seed: int | None = None, ) -> dict[str, np.ndarray]: """Filtered historical simulation (FHS). Combines a volatility model (EWMA or simple GARCH(1,1)) with historical bootstrap of standardised residuals to produce volatility-adjusted scenario returns. Parameters: returns: 1-D array of historical returns. vol_model: Volatility model — ``"ewma"`` (default) or ``"garch"``. decay: EWMA decay factor (lambda) or, for ``"garch"``, the persistence parameter beta. n_sims: Number of simulated next-period returns. seed: Random seed for reproducibility. Returns: Dictionary with keys: - ``simulated_returns``: 1-D array of ``n_sims`` simulated next-period returns. - ``current_vol``: Estimated current volatility used for rescaling. - ``standardised_residuals``: Standardised residuals from the volatility model. """ from wraquant.core._coerce import coerce_array returns = coerce_array(returns, name="returns") n = len(returns) if n < 3: raise ValueError("Need at least 3 return observations") if vol_model == "ewma": # EWMA variance var_t = np.zeros(n) var_t[0] = returns[0] ** 2 for i in range(1, n): var_t[i] = decay * var_t[i - 1] + (1.0 - decay) * returns[i - 1] ** 2 elif vol_model == "garch": # Simple GARCH(1,1): sigma^2_t = omega + alpha * r_{t-1}^2 + beta * sigma^2_{t-1} # Estimate omega and alpha from the data alpha = 1.0 - decay # alpha = 1 - beta for a simple parameterisation omega = ( np.var(returns) * (1.0 - decay - alpha) if (1.0 - decay - alpha) > 0 else 1e-6 ) omega = max(omega, 1e-10) var_t = np.zeros(n) var_t[0] = np.var(returns) for i in range(1, n): var_t[i] = omega + alpha * returns[i - 1] ** 2 + decay * var_t[i - 1] else: msg = f"Unknown vol_model: {vol_model!r}" raise ValueError(msg) sigma_t = np.sqrt(np.maximum(var_t, 1e-15)) standardised = returns / sigma_t current_vol = sigma_t[-1] # One-step-ahead volatility forecast if vol_model == "ewma": next_var = decay * var_t[-1] + (1.0 - decay) * returns[-1] ** 2 else: next_var = omega + alpha * returns[-1] ** 2 + decay * var_t[-1] forecast_vol = np.sqrt(max(next_var, 1e-15)) # Bootstrap standardised residuals and rescale rng = np.random.default_rng(seed) boot_idx = rng.integers(0, n, size=n_sims) simulated = forecast_vol * standardised[boot_idx] return { "simulated_returns": simulated, "current_vol": float(current_vol), "standardised_residuals": standardised, }