"""Pure numpy/scipy Bayesian methods for quantitative finance.
Includes conjugate Bayesian linear regression, Bayesian Sharpe ratio
estimation, Bayesian portfolio allocation, Bayesian VaR, credible
intervals, Bayes factors, and posterior predictive sampling.
"""
from __future__ import annotations
from dataclasses import dataclass, field
import numpy as np
from scipy import stats
__all__ = [
"bayesian_regression",
"bayesian_sharpe",
"bayesian_portfolio",
"bayesian_var",
"credible_interval",
"bayes_factor",
"posterior_predictive",
"bayesian_linear_regression",
"bayesian_factor_model",
"bayesian_changepoint",
"bayesian_portfolio_bl",
"bayesian_volatility",
"bayesian_cointegration",
"bayesian_regime_inference",
"model_comparison",
]
# ---------------------------------------------------------------------------
# Result dataclasses
# ---------------------------------------------------------------------------
@dataclass
class BayesianRegressionResult:
"""Result container for Bayesian linear regression.
Parameters
----------
posterior_mean : np.ndarray
Posterior mean of the coefficient vector.
posterior_cov : np.ndarray
Posterior covariance matrix of the coefficients.
sigma2 : float
Estimated noise variance.
log_marginal_likelihood : float
Log marginal likelihood of the data under the model.
n_obs : int
Number of observations.
n_features : int
Number of features (including intercept if present).
"""
posterior_mean: np.ndarray
posterior_cov: np.ndarray
sigma2: float
log_marginal_likelihood: float
n_obs: int
n_features: int
@dataclass
class BayesianSharpeResult:
"""Result container for Bayesian Sharpe ratio estimation.
Parameters
----------
posterior_mean : float
Posterior mean of the Sharpe ratio.
posterior_std : float
Posterior standard deviation of the Sharpe ratio.
ci_lower : float
Lower bound of the 95% credible interval.
ci_upper : float
Upper bound of the 95% credible interval.
prob_positive : float
Posterior probability that the Sharpe ratio is positive.
samples : np.ndarray
Posterior samples of the Sharpe ratio.
"""
posterior_mean: float
posterior_std: float
ci_lower: float
ci_upper: float
prob_positive: float
samples: np.ndarray
@dataclass
class BayesianPortfolioResult:
"""Result container for Bayesian portfolio allocation.
Parameters
----------
weights_mean : np.ndarray
Mean of posterior portfolio weights.
weights_std : np.ndarray
Standard deviation of posterior portfolio weights.
expected_return : float
Expected portfolio return under posterior mean weights.
expected_risk : float
Expected portfolio risk under posterior mean weights.
weight_samples : np.ndarray
Posterior samples of portfolio weights (n_samples, n_assets).
"""
weights_mean: np.ndarray
weights_std: np.ndarray
expected_return: float
expected_risk: float
weight_samples: np.ndarray
@dataclass
class BayesianVaRResult:
"""Result container for Bayesian Value-at-Risk.
Parameters
----------
var_mean : float
Mean of the posterior VaR distribution.
var_std : float
Standard deviation of the posterior VaR distribution.
ci_lower : float
Lower bound of the 95% credible interval for VaR.
ci_upper : float
Upper bound of the 95% credible interval for VaR.
var_samples : np.ndarray
Posterior samples of VaR.
"""
var_mean: float
var_std: float
ci_lower: float
ci_upper: float
var_samples: np.ndarray
# ---------------------------------------------------------------------------
# Bayesian linear regression (conjugate prior)
# ---------------------------------------------------------------------------
[docs]
def bayesian_regression(
y: np.ndarray,
X: np.ndarray,
prior_mean: np.ndarray | None = None,
prior_cov: np.ndarray | None = None,
) -> BayesianRegressionResult:
"""Conjugate Bayesian linear regression with known noise variance estimate.
Unlike OLS (which gives a single point estimate of beta), Bayesian
regression gives a full posterior distribution, answering: "Given
this data and my prior beliefs, what range of coefficient values is
plausible?" This is especially valuable with small samples or when
you need to propagate parameter uncertainty into downstream
decisions (portfolio construction, risk budgeting).
The model is y = X @ beta + eps, eps ~ N(0, sigma^2 I), with a
normal prior on beta. Sigma^2 is estimated from OLS residuals
(plug-in approach).
Interpretation:
- **posterior_mean** is the Bayesian point estimate of beta.
With a weakly informative prior, it will be close to OLS.
- **posterior_cov** encodes parameter uncertainty. The diagonal
gives the variance of each coefficient; off-diagonals indicate
which coefficients are correlated (common in multicollinear
factor models).
- **log_marginal_likelihood** is used for model comparison via
Bayes factors. Higher values indicate better fit after
penalising complexity.
When to use:
- Small sample sizes (< 100 obs) where OLS standard errors
are unreliable.
- When you have informative priors from economic theory.
- When you need to compare nested and non-nested models via
Bayes factors (see ``model_comparison``).
Parameters:
y: Response vector (n_obs,).
X: Design matrix (n_obs, n_features). Include an intercept
column if desired.
prior_mean: Prior mean for beta (n_features,). Defaults to
zeros (agnostic prior). Set to economic priors if available
(e.g., CAPM beta prior of 1.0).
prior_cov: Prior covariance for beta (n_features, n_features).
Defaults to 10 * I (weakly informative). Smaller values
express stronger prior conviction.
Returns:
BayesianRegressionResult containing:
- **posterior_mean** (*ndarray*) -- Posterior mean of beta.
- **posterior_cov** (*ndarray*) -- Posterior covariance of beta.
- **sigma2** (*float*) -- Estimated noise variance.
- **log_marginal_likelihood** (*float*) -- For model comparison.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> X = np.column_stack([np.ones(100), rng.normal(size=100)])
>>> y = X @ np.array([0.5, 1.2]) + rng.normal(0, 0.3, 100)
>>> result = bayesian_regression(y, X)
>>> print(f"Beta: {result.posterior_mean}") # close to [0.5, 1.2]
See Also:
bayesian_linear_regression: Full Normal-Inverse-Gamma conjugate
model that also treats sigma^2 as unknown.
model_comparison: Compare multiple regression models via Bayes
factors.
"""
y = np.asarray(y, dtype=float).ravel()
X = np.asarray(X, dtype=float)
if X.ndim == 1:
X = X.reshape(-1, 1)
n, k = X.shape
if prior_mean is None:
prior_mean = np.zeros(k)
else:
prior_mean = np.asarray(prior_mean, dtype=float).ravel()
if prior_cov is None:
prior_cov = 10.0 * np.eye(k)
else:
prior_cov = np.asarray(prior_cov, dtype=float)
# Estimate sigma^2 from OLS (use canonical wraquant regression)
from wraquant.stats.regression import ols as _ols
_ols_result = _ols(y, X, add_constant=False)
beta_ols = _ols_result["coefficients"]
resid = y - X @ beta_ols
sigma2 = float(np.sum(resid**2) / max(n - k, 1))
# Posterior
prior_prec = np.linalg.inv(prior_cov)
data_prec = X.T @ X / sigma2
posterior_prec = prior_prec + data_prec
posterior_cov = np.linalg.inv(posterior_prec)
posterior_mean = posterior_cov @ (prior_prec @ prior_mean + X.T @ y / sigma2)
# Log marginal likelihood (approximate)
sign_prior, logdet_prior = np.linalg.slogdet(prior_cov)
sign_post, logdet_post = np.linalg.slogdet(posterior_cov)
log_ml = (
-0.5 * n * np.log(2 * np.pi * sigma2)
- 0.5 * np.sum(resid**2) / sigma2
+ 0.5 * logdet_post
- 0.5 * logdet_prior
)
return BayesianRegressionResult(
posterior_mean=posterior_mean,
posterior_cov=posterior_cov,
sigma2=sigma2,
log_marginal_likelihood=float(log_ml),
n_obs=n,
n_features=k,
)
# ---------------------------------------------------------------------------
# Bayesian Sharpe ratio
# ---------------------------------------------------------------------------
[docs]
def bayesian_sharpe(
returns: np.ndarray,
prior_mu: float = 0.0,
prior_sigma: float = 1.0,
n_samples: int = 10_000,
rng_seed: int = 42,
) -> BayesianSharpeResult:
"""Estimate the Sharpe ratio with full Bayesian uncertainty.
Unlike the classical Sharpe ratio (a single point estimate), the
Bayesian Sharpe provides a full posterior distribution, answering:
"Given this data, what range of Sharpe ratios is plausible?" This
is critical for short track records where the point estimate is
unreliable -- a 2-year track record has a standard error on the
Sharpe of roughly 1/sqrt(T) ~ 0.7.
Uses a normal-inverse-gamma conjugate prior for the mean and
variance of returns, then computes the posterior distribution of
mu / sigma.
Interpretation:
- If the 95% credible interval includes 0, the strategy may
not have genuine alpha.
- Width of the interval indicates estimation uncertainty.
A Sharpe of 1.0 with CI [0.2, 1.8] is very different from
1.0 with CI [0.9, 1.1].
- **prob_positive** > 0.95 suggests genuine positive
risk-adjusted returns.
- **prob_positive** < 0.8 means you cannot confidently
distinguish the strategy from random noise.
When to use:
- Track records shorter than 3 years.
- When you need confidence in the Sharpe estimate before
allocating capital.
- For comparing strategies with different track lengths.
- When the classical Sharpe looks "too good" on a short sample.
Red flags:
- Very wide CI (> 1.5 units): not enough data to judge.
- prob_positive between 0.5 and 0.8: inconclusive.
- Posterior mean much higher than OLS Sharpe: check the prior.
Parameters:
returns: Return series (daily, weekly, or monthly -- the
Sharpe ratio is in the same frequency as the returns).
prior_mu: Prior mean for the return mean. Default 0 (agnostic).
prior_sigma: Prior std for the return mean. Default 1 (weakly
informative). Set lower to express stronger conviction.
n_samples: Number of posterior samples. 10,000 is usually
sufficient for stable credible intervals.
rng_seed: Random seed for reproducibility.
Returns:
BayesianSharpeResult containing:
- **posterior_mean** (*float*) -- Posterior mean Sharpe ratio.
- **posterior_std** (*float*) -- Posterior standard deviation.
- **ci_lower** (*float*) -- 2.5th percentile (lower bound).
- **ci_upper** (*float*) -- 97.5th percentile (upper bound).
- **prob_positive** (*float*) -- P(Sharpe > 0). Values > 0.95
suggest genuine positive risk-adjusted returns.
- **samples** (*ndarray*) -- Raw posterior samples for custom
analysis (e.g., P(Sharpe > 1)).
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> daily_returns = rng.normal(0.0003, 0.01, 252) # ~1 year
>>> result = bayesian_sharpe(daily_returns)
>>> print(f"Sharpe: {result.posterior_mean:.2f} "
... f"[{result.ci_lower:.2f}, {result.ci_upper:.2f}]")
>>> print(f"P(Sharpe > 0): {result.prob_positive:.1%}")
See Also:
risk.metrics.sharpe_ratio: Classical point estimate.
bayesian_regression: Full Bayesian regression for factor models.
"""
returns = np.asarray(returns, dtype=float).ravel()
n = len(returns)
rng = np.random.default_rng(rng_seed)
# Sufficient statistics
y_bar = np.mean(returns)
s2 = np.var(returns, ddof=1)
# Normal-inverse-gamma posterior parameters
prior_var = prior_sigma**2
kappa_0 = 1.0 / prior_var
kappa_n = kappa_0 + n
mu_n = (kappa_0 * prior_mu + n * y_bar) / kappa_n
# Posterior for sigma^2: Inverse-Gamma
alpha_0 = 1.0 # weakly informative
beta_0 = 1.0
alpha_n = alpha_0 + n / 2.0
beta_n = beta_0 + 0.5 * (n - 1) * s2 + 0.5 * kappa_0 * n * (y_bar - prior_mu) ** 2 / kappa_n
# Sample sigma^2 from Inverse-Gamma (via Gamma for 1/sigma^2)
sigma2_samples = 1.0 / rng.gamma(alpha_n, 1.0 / beta_n, size=n_samples)
sigma2_samples = np.maximum(sigma2_samples, 1e-15)
# Sample mu | sigma^2 from Normal
mu_samples = rng.normal(mu_n, np.sqrt(sigma2_samples / kappa_n))
# Sharpe ratio samples
sharpe_samples = mu_samples / np.sqrt(sigma2_samples)
ci = credible_interval(sharpe_samples, alpha=0.05)
return BayesianSharpeResult(
posterior_mean=float(np.mean(sharpe_samples)),
posterior_std=float(np.std(sharpe_samples)),
ci_lower=float(ci[0]),
ci_upper=float(ci[1]),
prob_positive=float(np.mean(sharpe_samples > 0)),
samples=sharpe_samples,
)
# ---------------------------------------------------------------------------
# Bayesian portfolio allocation
# ---------------------------------------------------------------------------
[docs]
def bayesian_portfolio(
returns: np.ndarray,
prior_cov_scale: float = 1.0,
n_samples: int = 5_000,
rng_seed: int = 42,
) -> BayesianPortfolioResult:
"""Portfolio allocation with full parameter uncertainty.
Classical mean-variance optimization treats the sample mean and
covariance as if they were the true values, leading to
over-concentrated portfolios and poor out-of-sample performance.
The Bayesian approach samples from the posterior of (mu, Sigma)
using a conjugate normal-inverse-Wishart prior, then computes the
optimal portfolio for each posterior draw. The result is a
distribution of optimal weights, not a single fragile answer.
Interpretation:
- **weights_mean** is the "average" optimal portfolio across
all plausible parameter values. It is typically more
diversified than the plug-in MVO solution.
- **weights_std** quantifies allocation uncertainty. Large
std means the data does not strongly support a precise
allocation to that asset.
- If weights_std for an asset is comparable to weights_mean,
the allocation is essentially noise -- consider removing
that asset or using stronger priors.
- **weight_samples** can be used to compute the probability
that a weight exceeds a threshold (e.g., P(w_i > 0.3)).
When to use:
- Short estimation windows (< 5 years of monthly data).
- When MVO gives extreme, unintuitive weights.
- When you want to quantify how much you can trust the
optimal allocation.
Red flags:
- weights_std >> weights_mean: allocation is noise.
- Negative expected_return under posterior mean weights:
the prior is dragging estimates toward zero.
Parameters:
returns: Return matrix (n_periods, n_assets).
prior_cov_scale: Scale factor for the prior covariance.
Larger values give a more diffuse (less informative)
prior, letting the data speak. Default is 1.0.
n_samples: Number of posterior portfolio weight samples.
More samples give smoother weight distributions.
rng_seed: Random seed for reproducibility.
Returns:
BayesianPortfolioResult containing:
- **weights_mean** (*ndarray*) -- Mean posterior weights.
- **weights_std** (*ndarray*) -- Weight uncertainty per asset.
- **expected_return** (*float*) -- Portfolio return under
posterior mean weights.
- **expected_risk** (*float*) -- Portfolio risk under
posterior mean weights.
- **weight_samples** (*ndarray*) -- (n_samples, n_assets)
raw weight draws for custom analysis.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> returns = rng.normal(0.001, 0.02, (252, 4))
>>> result = bayesian_portfolio(returns, n_samples=2000)
>>> print(f"Weights: {result.weights_mean}")
>>> print(f"Weight uncertainty: {result.weights_std}")
See Also:
bayesian_portfolio_bl: Black-Litterman model with subjective
views and Bayesian uncertainty.
opt.mean_variance: Classical plug-in MVO.
"""
returns = np.asarray(returns, dtype=float)
if returns.ndim == 1:
returns = returns.reshape(-1, 1)
n, p = returns.shape
rng = np.random.default_rng(rng_seed)
# Sufficient statistics
y_bar = np.mean(returns, axis=0)
S = np.cov(returns, rowvar=False, ddof=1)
if S.ndim == 0:
S = S.reshape(1, 1)
# Conjugate prior parameters (weakly informative)
mu_0 = np.zeros(p)
kappa_0 = 0.01
nu_0 = p + 2.0
Psi_0 = prior_cov_scale * np.eye(p)
# Posterior parameters
kappa_n = kappa_0 + n
mu_n = (kappa_0 * mu_0 + n * y_bar) / kappa_n
nu_n = nu_0 + n
Psi_n = Psi_0 + (n - 1) * S + (kappa_0 * n / kappa_n) * np.outer(y_bar - mu_0, y_bar - mu_0)
weight_samples = np.zeros((n_samples, p))
for i in range(n_samples):
# Sample Sigma from Inverse-Wishart
# Inverse-Wishart(nu_n, Psi_n) = inv(Wishart(nu_n, inv(Psi_n)))
Psi_n_inv = np.linalg.inv(Psi_n)
# Ensure symmetric positive definite
Psi_n_inv = 0.5 * (Psi_n_inv + Psi_n_inv.T) + 1e-10 * np.eye(p)
W = stats.wishart.rvs(df=int(nu_n), scale=Psi_n_inv, random_state=rng)
if np.ndim(W) == 0:
W = np.array([[W]])
Sigma_sample = np.linalg.inv(W)
Sigma_sample = 0.5 * (Sigma_sample + Sigma_sample.T)
# Sample mu | Sigma from Normal
mu_cov = Sigma_sample / kappa_n
mu_cov = 0.5 * (mu_cov + mu_cov.T) + 1e-12 * np.eye(p)
mu_sample = rng.multivariate_normal(mu_n, mu_cov)
# Compute mean-variance optimal weights (unconstrained)
try:
Sigma_inv = np.linalg.inv(Sigma_sample + 1e-10 * np.eye(p))
w = Sigma_inv @ mu_sample
w_sum = np.sum(w)
if abs(w_sum) > 1e-12:
w = w / w_sum # normalize to sum to 1
else:
w = np.ones(p) / p
except np.linalg.LinAlgError:
w = np.ones(p) / p
weight_samples[i] = w
weights_mean = np.mean(weight_samples, axis=0)
weights_std = np.std(weight_samples, axis=0)
# Portfolio metrics under posterior mean weights
expected_return = float(weights_mean @ y_bar)
expected_risk = float(np.sqrt(weights_mean @ S @ weights_mean))
return BayesianPortfolioResult(
weights_mean=weights_mean,
weights_std=weights_std,
expected_return=expected_return,
expected_risk=expected_risk,
weight_samples=weight_samples,
)
# ---------------------------------------------------------------------------
# Bayesian VaR
# ---------------------------------------------------------------------------
[docs]
def bayesian_var(
returns: np.ndarray,
confidence: float = 0.95,
n_posterior: int = 10_000,
rng_seed: int = 42,
) -> BayesianVaRResult:
"""Value-at-Risk with full Bayesian parameter uncertainty.
Classical VaR computes a single number: "at the 95% level, you
will not lose more than X." But that number itself is estimated
from data and has uncertainty. Bayesian VaR samples from the
posterior of (mu, sigma^2) using conjugate priors, then computes
VaR for each draw, producing a distribution of VaR estimates
that answers: "How uncertain is the VaR number itself?"
This is critical for regulatory capital (where VaR drives reserve
requirements) and for risk budgeting (where VaR determines position
sizing). Under-estimating VaR uncertainty leads to hidden tail risk.
Interpretation:
- **var_mean** is the posterior expected VaR -- your best
single estimate.
- **var_std** quantifies estimation uncertainty. If var_std
is large relative to var_mean, you need more data or a
more informative prior.
- **ci_upper** is the "conservative VaR": use this for
prudent risk management. ci_upper >> var_mean signals high
model risk.
- **var_samples** can be used to compute P(VaR > threshold),
answering "what is the probability that our true VaR
exceeds the regulatory threshold?"
When to use:
- Short return histories (< 2 years daily).
- When you need to report model risk alongside VaR.
- For stress testing: use the upper CI as a conservative VaR.
Red flags:
- var_std > 0.5 * var_mean: VaR estimate is unreliable.
- ci_upper > 2 * var_mean: extreme model uncertainty.
- Negative var_mean: possible with very noisy priors.
Parameters:
returns: Array of observed returns (daily, weekly, etc.).
confidence: Confidence level (e.g., 0.95 for 95% VaR,
0.99 for 99% VaR). Higher confidence = more extreme
losses = more estimation uncertainty.
n_posterior: Number of posterior samples. 10,000 is usually
sufficient.
rng_seed: Random seed for reproducibility.
Returns:
BayesianVaRResult containing:
- **var_mean** (*float*) -- Posterior mean VaR (positive =
loss).
- **var_std** (*float*) -- Posterior std of VaR.
- **ci_lower** (*float*) -- 2.5th percentile (optimistic VaR).
- **ci_upper** (*float*) -- 97.5th percentile (conservative
VaR -- use this for prudent risk management).
- **var_samples** (*ndarray*) -- Raw posterior VaR samples.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> returns = rng.normal(-0.0001, 0.015, 252)
>>> result = bayesian_var(returns, confidence=0.99)
>>> print(f"VaR: {result.var_mean:.4f} "
... f"[{result.ci_lower:.4f}, {result.ci_upper:.4f}]")
See Also:
risk.metrics.value_at_risk: Classical historical VaR.
risk.evt: Extreme value theory VaR for tail estimation.
"""
returns = np.asarray(returns, dtype=float).ravel()
n = len(returns)
rng = np.random.default_rng(rng_seed)
y_bar = np.mean(returns)
s2 = np.var(returns, ddof=1)
# Weakly informative conjugate prior
kappa_0 = 0.01
mu_0 = 0.0
alpha_0 = 1.0
beta_0 = 1.0
kappa_n = kappa_0 + n
mu_n = (kappa_0 * mu_0 + n * y_bar) / kappa_n
alpha_n = alpha_0 + n / 2.0
beta_n = beta_0 + 0.5 * (n - 1) * s2 + 0.5 * kappa_0 * n * (y_bar - mu_0) ** 2 / kappa_n
# Sample sigma^2 from Inverse-Gamma
sigma2_samples = 1.0 / rng.gamma(alpha_n, 1.0 / beta_n, size=n_posterior)
sigma2_samples = np.maximum(sigma2_samples, 1e-15)
# Sample mu | sigma^2
mu_samples = rng.normal(mu_n, np.sqrt(sigma2_samples / kappa_n))
# VaR for each posterior sample
z = stats.norm.ppf(1 - confidence)
var_samples = -(mu_samples + z * np.sqrt(sigma2_samples))
ci = credible_interval(var_samples, alpha=0.05)
return BayesianVaRResult(
var_mean=float(np.mean(var_samples)),
var_std=float(np.std(var_samples)),
ci_lower=float(ci[0]),
ci_upper=float(ci[1]),
var_samples=var_samples,
)
# ---------------------------------------------------------------------------
# Credible interval
# ---------------------------------------------------------------------------
[docs]
def credible_interval(
samples: np.ndarray,
alpha: float = 0.05,
) -> tuple[float, float]:
"""Compute the Highest Posterior Density (HPD) credible interval.
The HPD interval is the shortest interval containing (1 - alpha)
of the posterior mass. Unlike equal-tailed intervals (which just
take the alpha/2 and 1-alpha/2 quantiles), the HPD interval is
always the narrowest possible -- important for skewed posteriors
(e.g., variance or Sharpe ratio posteriors).
Interpretation:
- There is a (1 - alpha) posterior probability that the true
parameter lies in this interval. This is the statement
most practitioners actually want (unlike frequentist
confidence intervals).
- A narrow HPD interval = precise estimate.
- An HPD interval that includes 0 = cannot rule out zero effect.
- For symmetric posteriors, HPD equals the equal-tailed interval.
For skewed posteriors (common for variances, Sharpe ratios),
HPD is always shorter.
Parameters:
samples: Posterior samples (1D array).
alpha: Significance level. Default 0.05 (95% credible interval).
Returns:
(lower, upper) bounds of the HPD interval.
Example:
>>> import numpy as np
>>> samples = np.random.default_rng(0).normal(2.0, 0.5, 10000)
>>> lo, hi = credible_interval(samples, alpha=0.05)
>>> print(f"95% HPD: [{lo:.2f}, {hi:.2f}]") # ~[1.0, 3.0]
"""
samples = np.asarray(samples, dtype=float).ravel()
sorted_samples = np.sort(samples)
n = len(sorted_samples)
n_included = int(np.ceil((1 - alpha) * n))
# Find the shortest interval
widths = sorted_samples[n_included:] - sorted_samples[: n - n_included]
if len(widths) == 0:
return (float(sorted_samples[0]), float(sorted_samples[-1]))
idx = int(np.argmin(widths))
return (float(sorted_samples[idx]), float(sorted_samples[idx + n_included]))
# ---------------------------------------------------------------------------
# Bayes factor
# ---------------------------------------------------------------------------
[docs]
def bayes_factor(
log_likelihood_1: float,
log_likelihood_2: float,
) -> float:
"""Compute the Bayes factor comparing two models.
The Bayes factor is the Bayesian alternative to likelihood ratio
tests and information criteria. It quantifies how much the data
favors one model over another, automatically penalising complexity
(Occam's razor is built in, unlike AIC/BIC which use fixed
penalties).
BF = exp(log_likelihood_1 - log_likelihood_2).
Interpretation (Kass & Raftery, 1995 scale):
- BF < 1/100: decisive evidence for model 2.
- BF 1/100 to 1/10: strong evidence for model 2.
- BF 1/10 to 1/3: moderate evidence for model 2.
- BF 1/3 to 1: weak evidence for model 2.
- BF 1 to 3: weak evidence for model 1.
- BF 3 to 10: moderate evidence for model 1.
- BF 10 to 100: strong evidence for model 1.
- BF > 100: decisive evidence for model 1.
When to use:
- Comparing nested or non-nested regression models.
- Deciding whether an extra factor belongs in a return model.
- Testing whether a regime-switching model is justified vs
a single-regime model.
Parameters:
log_likelihood_1: Log marginal likelihood of model 1.
Obtain from ``bayesian_linear_regression(...).log_marginal_likelihood``.
log_likelihood_2: Log marginal likelihood of model 2.
Returns:
Bayes factor (BF > 1 favors model 1, BF < 1 favors model 2).
Example:
>>> # Compare a 3-factor model to a 1-factor model
>>> bf = bayes_factor(log_ml_3factor, log_ml_1factor)
>>> print(f"BF = {bf:.1f}") # BF > 10 = strong evidence
"""
diff = log_likelihood_1 - log_likelihood_2
# Clip to avoid overflow
diff = np.clip(diff, -500, 500)
return float(np.exp(diff))
# ---------------------------------------------------------------------------
# Posterior predictive
# ---------------------------------------------------------------------------
[docs]
def posterior_predictive(
y: np.ndarray,
X: np.ndarray,
prior_mean: np.ndarray | None = None,
prior_cov: np.ndarray | None = None,
n_samples: int = 1_000,
rng_seed: int = 42,
X_new: np.ndarray | None = None,
) -> np.ndarray:
"""Generate posterior predictive samples for Bayesian linear regression.
The posterior predictive distribution answers: "Given the data I
have observed, what range of outcomes should I expect for new
observations?" Unlike point predictions from OLS, the posterior
predictive integrates over parameter uncertainty AND noise
uncertainty, giving calibrated prediction intervals.
Interpretation:
- Each row of the output is a plausible future outcome vector
under a different posterior draw of beta and sigma.
- The spread across rows at a given prediction point is the
prediction interval. This is always wider than a confidence
interval on the mean because it includes both parameter
uncertainty and noise.
- If prediction intervals are very wide relative to the
response scale, the model has low predictive power.
When to use:
- Forecasting returns or factor exposures with uncertainty.
- Generating scenario paths for stress testing.
- Checking model calibration: are observed values typically
within the 95% predictive interval?
Parameters:
y: Response vector (n_obs,).
X: Design matrix (n_obs, n_features).
prior_mean: Prior mean for beta. Defaults to zeros.
prior_cov: Prior covariance for beta. Defaults to 10 * I.
n_samples: Number of posterior predictive samples.
rng_seed: Random seed for reproducibility.
X_new: New design matrix for predictions. If None, uses X
(in-sample predictions, useful for calibration checks).
Returns:
ndarray of shape (n_samples, n_pred) -- posterior predictive
samples. Take ``np.percentile(result, [2.5, 97.5], axis=0)``
for a 95% prediction interval.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> X = np.column_stack([np.ones(100), rng.normal(size=100)])
>>> y = X @ np.array([1.0, 2.0]) + rng.normal(0, 0.5, 100)
>>> X_new = np.column_stack([np.ones(5), rng.normal(size=5)])
>>> ppc = posterior_predictive(y, X, X_new=X_new)
>>> pi = np.percentile(ppc, [2.5, 97.5], axis=0)
>>> print(f"95% PI for first prediction: [{pi[0, 0]:.2f}, {pi[1, 0]:.2f}]")
"""
rng = np.random.default_rng(rng_seed)
result = bayesian_regression(y, X, prior_mean, prior_cov)
if X_new is None:
X_new = np.asarray(X, dtype=float)
if X_new.ndim == 1:
X_new = X_new.reshape(-1, 1)
else:
X_new = np.asarray(X_new, dtype=float)
if X_new.ndim == 1:
X_new = X_new.reshape(-1, 1)
n_pred = X_new.shape[0]
predictions = np.zeros((n_samples, n_pred))
# Ensure posterior_cov is positive definite
post_cov = result.posterior_cov
post_cov = 0.5 * (post_cov + post_cov.T) + 1e-10 * np.eye(post_cov.shape[0])
for i in range(n_samples):
# Sample beta from posterior
beta_sample = rng.multivariate_normal(result.posterior_mean, post_cov)
# Sample noise
y_hat = X_new @ beta_sample
noise = rng.normal(0, np.sqrt(result.sigma2), size=n_pred)
predictions[i] = y_hat + noise
return predictions
# ---------------------------------------------------------------------------
# Enhanced Bayesian linear regression (Normal-InverseGamma conjugate)
# ---------------------------------------------------------------------------
@dataclass
class BayesianLinearRegressionResult:
"""Result container for the enhanced Bayesian linear regression.
This uses the full Normal-Inverse-Gamma conjugate prior, which means
**both** the regression coefficients and the noise variance are treated
as unknown and given a joint prior. The posterior is available in
closed form, so no MCMC is needed.
Attributes:
posterior_mean: Posterior mean of the coefficient vector beta.
posterior_cov_unscaled: Posterior precision-scaled covariance
(V_n). The actual posterior covariance of beta given sigma^2
is ``sigma^2 * posterior_cov_unscaled``.
a_n: Shape parameter of the posterior Inverse-Gamma for sigma^2.
b_n: Scale parameter of the posterior Inverse-Gamma for sigma^2.
sigma2_mean: Posterior mean of sigma^2 = b_n / (a_n - 1).
log_marginal_likelihood: Log marginal likelihood p(y | model),
used for Bayes factor model comparison.
credible_intervals: (n_features, 2) array of 95 % credible
intervals for each coefficient, marginalised over sigma^2
(Student-t).
n_obs: Number of observations.
n_features: Number of features.
Notes:
**Bayesian vs frequentist**: A 95 % credible interval means
"there is a 95 % posterior probability that the parameter lies in
this interval", which is the statement most practitioners actually
want. A frequentist 95 % confidence interval instead says "if we
repeated the experiment many times, 95 % of the resulting intervals
would contain the true value" -- a subtly different claim.
"""
posterior_mean: np.ndarray
posterior_cov_unscaled: np.ndarray
a_n: float
b_n: float
sigma2_mean: float
log_marginal_likelihood: float
credible_intervals: np.ndarray
n_obs: int
n_features: int
[docs]
def bayesian_linear_regression(
y: np.ndarray,
X: np.ndarray,
prior_mean: np.ndarray | None = None,
prior_cov: np.ndarray | None = None,
a_0: float = 1.0,
b_0: float = 1.0,
alpha: float = 0.05,
) -> BayesianLinearRegressionResult:
"""Enhanced Bayesian linear regression with Normal-Inverse-Gamma prior.
This is the "textbook" conjugate Bayesian regression where both the
coefficients **and** the noise variance are unknown:
y | X, beta, sigma^2 ~ N(X beta, sigma^2 I)
beta | sigma^2 ~ N(m_0, sigma^2 V_0)
sigma^2 ~ InvGamma(a_0, b_0)
The posterior is available in closed form as:
beta | sigma^2, y ~ N(m_n, sigma^2 V_n)
sigma^2 | y ~ InvGamma(a_n, b_n)
beta | y ~ Student-t(2 a_n, m_n, (b_n / a_n) V_n)
**When to use this instead of OLS**: Use this when you want full
uncertainty quantification (credible intervals on coefficients *and*
on sigma^2), when your data set is small and prior information
matters, or when you need the marginal likelihood for model
comparison (Bayes factors).
Args:
y: Response vector of shape ``(n,)``.
X: Design matrix of shape ``(n, k)``. Include an intercept
column yourself if you want one.
prior_mean: Prior mean for beta, shape ``(k,)``. Defaults to
zeros (agnostic prior).
prior_cov: Prior covariance scale for beta, ``V_0`` of shape
``(k, k)``. The actual prior covariance is
``sigma^2 * prior_cov``. Defaults to ``100 * I`` (weakly
informative).
a_0: Shape parameter for the Inverse-Gamma prior on sigma^2.
Default ``1.0``.
b_0: Scale parameter for the Inverse-Gamma prior on sigma^2.
Default ``1.0``.
alpha: Significance level for credible intervals. Default ``0.05``
gives 95 % intervals.
Returns:
BayesianLinearRegressionResult with closed-form posterior
analytics, credible intervals, and log marginal likelihood.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> X = np.column_stack([np.ones(100), rng.normal(size=100)])
>>> y = X @ np.array([1.0, 2.0]) + rng.normal(0, 0.5, 100)
>>> result = bayesian_linear_regression(y, X)
>>> print(result.posterior_mean) # close to [1, 2]
"""
y = np.asarray(y, dtype=float).ravel()
X = np.asarray(X, dtype=float)
if X.ndim == 1:
X = X.reshape(-1, 1)
n, k = X.shape
if prior_mean is None:
prior_mean = np.zeros(k)
else:
prior_mean = np.asarray(prior_mean, dtype=float).ravel()
if prior_cov is None:
prior_cov = 100.0 * np.eye(k)
else:
prior_cov = np.asarray(prior_cov, dtype=float)
# Prior precision
V0_inv = np.linalg.inv(prior_cov)
# Posterior for beta
XtX = X.T @ X
Vn_inv = V0_inv + XtX
V_n = np.linalg.inv(Vn_inv)
m_n = V_n @ (V0_inv @ prior_mean + X.T @ y)
# Posterior for sigma^2
a_n = a_0 + n / 2.0
residual_term = float(
y @ y
+ prior_mean @ V0_inv @ prior_mean
- m_n @ Vn_inv @ m_n
)
b_n = b_0 + 0.5 * residual_term
sigma2_mean = b_n / max(a_n - 1.0, 1e-12)
# Log marginal likelihood p(y | model)
sign0, logdet_V0 = np.linalg.slogdet(prior_cov)
sign_n, logdet_Vn = np.linalg.slogdet(V_n)
from scipy.special import gammaln
log_ml = (
-0.5 * n * np.log(2.0 * np.pi)
+ 0.5 * logdet_Vn
- 0.5 * logdet_V0
+ a_0 * np.log(b_0)
- a_n * np.log(b_n)
+ float(gammaln(a_n) - gammaln(a_0))
)
# Credible intervals via marginal Student-t for each coefficient
df_t = 2.0 * a_n
scale = np.sqrt(np.diag(V_n) * b_n / a_n)
t_crit = stats.t.ppf(1.0 - alpha / 2.0, df=df_t)
ci = np.column_stack([m_n - t_crit * scale, m_n + t_crit * scale])
return BayesianLinearRegressionResult(
posterior_mean=m_n,
posterior_cov_unscaled=V_n,
a_n=float(a_n),
b_n=float(b_n),
sigma2_mean=float(sigma2_mean),
log_marginal_likelihood=float(log_ml),
credible_intervals=ci,
n_obs=n,
n_features=k,
)
# ---------------------------------------------------------------------------
# Bayesian factor model (Bayesian PCA)
# ---------------------------------------------------------------------------
@dataclass
class BayesianFactorModelResult:
"""Result container for the Bayesian factor model.
Attributes:
loadings_mean: Posterior mean of the factor loadings matrix,
shape ``(n_variables, n_factors)``.
loadings_std: Posterior std of the factor loadings, same shape.
scores_mean: Posterior mean of the factor scores,
shape ``(n_obs, n_factors)``.
explained_variance: Fraction of total variance explained by each
factor, shape ``(n_factors,)``.
explained_variance_ci: 95 % credible intervals for explained
variance, shape ``(n_factors, 2)``.
noise_variance: Estimated idiosyncratic noise variance per
variable, shape ``(n_variables,)``.
n_factors: Number of latent factors used.
"""
loadings_mean: np.ndarray
loadings_std: np.ndarray
scores_mean: np.ndarray
explained_variance: np.ndarray
explained_variance_ci: np.ndarray
noise_variance: np.ndarray
n_factors: int
[docs]
def bayesian_factor_model(
X: np.ndarray,
n_factors: int = 2,
n_samples: int = 1_000,
rng_seed: int = 42,
) -> BayesianFactorModelResult:
"""Bayesian factor model via Gibbs sampling (Bayesian PCA).
Estimates a latent factor model of the form:
X = F @ Lambda^T + epsilon
where F is the ``(n_obs, n_factors)`` matrix of latent factor scores
and Lambda is the ``(n_variables, n_factors)`` loading matrix. Both
are given conjugate Gaussian priors and sampled via Gibbs.
**When to use this**: Use a Bayesian factor model instead of standard
PCA when you need uncertainty estimates on the loadings and explained
variance, especially with short financial time series where the
number of observations is not much larger than the number of assets.
Args:
X: Data matrix of shape ``(n_obs, n_variables)``. Columns are
typically asset returns.
n_factors: Number of latent factors to estimate.
n_samples: Number of Gibbs samples to draw (after a built-in
burn-in of ``n_samples // 2``).
rng_seed: Random seed for reproducibility.
Returns:
BayesianFactorModelResult with posterior summaries for loadings,
scores, and explained variance.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> F = rng.normal(size=(200, 2))
>>> L = rng.normal(size=(5, 2))
>>> X = F @ L.T + rng.normal(0, 0.3, (200, 5))
>>> result = bayesian_factor_model(X, n_factors=2)
>>> print(result.loadings_mean.shape) # (5, 2)
"""
rng = np.random.default_rng(rng_seed)
X = np.asarray(X, dtype=float)
if X.ndim == 1:
X = X.reshape(-1, 1)
n, p = X.shape
# Centre the data
X_mean = X.mean(axis=0)
X_c = X - X_mean
burn_in = n_samples // 2
total = n_samples + burn_in
# Initialise from SVD
U, S, Vt = np.linalg.svd(X_c, full_matrices=False)
F = U[:, :n_factors] * S[:n_factors]
Lambda = Vt[:n_factors, :].T
psi = np.var(X_c - F @ Lambda.T, axis=0) + 1e-6
# Storage
Lambda_samples = np.zeros((n_samples, p, n_factors))
F_samples = np.zeros((n_samples, n, n_factors))
psi_samples = np.zeros((n_samples, p))
for it in range(total):
# --- Sample Lambda | F, psi ---
for j in range(p):
prec_L = np.eye(n_factors) + (F.T @ F) / psi[j]
cov_L = np.linalg.inv(prec_L)
mean_L = cov_L @ (F.T @ X_c[:, j]) / psi[j]
Lambda[j, :] = rng.multivariate_normal(mean_L, cov_L)
# --- Sample F | Lambda, psi ---
Psi_inv = np.diag(1.0 / psi)
prec_F = np.eye(n_factors) + Lambda.T @ Psi_inv @ Lambda
cov_F = np.linalg.inv(prec_F)
for i in range(n):
mean_F = cov_F @ (Lambda.T @ Psi_inv @ X_c[i, :])
F[i, :] = rng.multivariate_normal(mean_F, cov_F)
# --- Sample psi | F, Lambda ---
residuals = X_c - F @ Lambda.T
for j in range(p):
a_post = 1.0 + n / 2.0
b_post = 1.0 + 0.5 * np.sum(residuals[:, j] ** 2)
psi[j] = 1.0 / rng.gamma(a_post, 1.0 / b_post)
if it >= burn_in:
idx = it - burn_in
Lambda_samples[idx] = Lambda.copy()
F_samples[idx] = F.copy()
psi_samples[idx] = psi.copy()
# Posterior summaries
loadings_mean = Lambda_samples.mean(axis=0)
loadings_std = Lambda_samples.std(axis=0)
scores_mean = F_samples.mean(axis=0)
noise_variance = psi_samples.mean(axis=0)
# Explained variance with credible intervals
total_var = np.var(X_c, axis=0).sum()
ev_samples = np.zeros((n_samples, n_factors))
for s in range(n_samples):
for f in range(n_factors):
col_var = np.var(F_samples[s, :, f]) * np.sum(Lambda_samples[s, :, f] ** 2)
ev_samples[s, f] = col_var / total_var if total_var > 0 else 0.0
explained_variance = ev_samples.mean(axis=0)
ev_ci = np.column_stack([
np.percentile(ev_samples, 2.5, axis=0),
np.percentile(ev_samples, 97.5, axis=0),
])
return BayesianFactorModelResult(
loadings_mean=loadings_mean,
loadings_std=loadings_std,
scores_mean=scores_mean,
explained_variance=explained_variance,
explained_variance_ci=ev_ci,
noise_variance=noise_variance,
n_factors=n_factors,
)
# ---------------------------------------------------------------------------
# Bayesian online changepoint detection (Adams-MacKay)
# ---------------------------------------------------------------------------
@dataclass
class BayesianChangepointResult:
"""Result container for Bayesian online changepoint detection.
Attributes:
run_length_probs: Matrix of run-length probabilities, shape
``(T, T + 1)`` where entry ``[t, r]`` is the probability
that at time ``t`` the current run length is ``r``.
changepoint_posterior: Posterior probability of a changepoint at
each time step, shape ``(T,)``.
most_likely_changepoints: Indices where the changepoint
posterior exceeds ``threshold``.
hazard_rate: The constant hazard rate (1 / expected run length)
used.
"""
run_length_probs: np.ndarray
changepoint_posterior: np.ndarray
most_likely_changepoints: np.ndarray
hazard_rate: float
[docs]
def bayesian_changepoint(
data: np.ndarray,
hazard: float = 1.0 / 100.0,
mu_0: float = 0.0,
kappa_0: float = 1.0,
alpha_0: float = 1.0,
beta_0: float = 1.0,
threshold: float = 0.3,
) -> BayesianChangepointResult:
"""Bayesian online changepoint detection (Adams & MacKay, 2007).
This algorithm processes observations one at a time and maintains a
probability distribution over the *run length* -- how long since the
last changepoint. At each new observation the run-length
distribution is updated in O(t) time, making the algorithm
efficient for streaming data.
The underlying model assumes each segment has data drawn from a
Normal distribution with unknown mean and variance, using a
Normal-Inverse-Gamma conjugate prior that is re-initialised at each
changepoint.
**When to use this**: Use this for detecting structural breaks in
financial time series (regime changes, volatility shifts, mean
reversions). Unlike classical CUSUM or Bai-Perron tests, this gives
a full posterior probability of a changepoint at each time step
rather than a binary yes/no answer.
Args:
data: Univariate time series of shape ``(T,)``.
hazard: Constant hazard rate = 1 / (expected run length).
``hazard = 0.01`` means changepoints happen roughly every
100 time steps.
mu_0: Prior mean for the Normal component.
kappa_0: Prior precision scaling (how confident in ``mu_0``).
alpha_0: Shape parameter for the Inverse-Gamma noise prior.
beta_0: Scale parameter for the Inverse-Gamma noise prior.
threshold: Minimum posterior probability to flag a changepoint.
Returns:
BayesianChangepointResult with run-length probabilities and
changepoint posterior.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> data = np.concatenate([rng.normal(0, 1, 100),
... rng.normal(5, 1, 100)])
>>> result = bayesian_changepoint(data, hazard=1/50)
>>> cps = result.most_likely_changepoints
>>> # Should find a changepoint near index 100
"""
data = np.asarray(data, dtype=float).ravel()
T = len(data)
# Run-length probabilities: R[t, r] = P(r_t = r | x_{1:t})
R = np.zeros((T + 1, T + 1))
R[0, 0] = 1.0
# Sufficient statistics for each run length
mu = np.full(T + 1, mu_0)
kappa = np.full(T + 1, kappa_0)
alpha = np.full(T + 1, alpha_0)
beta = np.full(T + 1, beta_0)
for t in range(T):
x = data[t]
# Predictive probability under each run length (Student-t)
df = 2.0 * alpha[: t + 1]
loc = mu[: t + 1]
scale = np.sqrt(beta[: t + 1] * (kappa[: t + 1] + 1.0) / (kappa[: t + 1] * alpha[: t + 1]))
pred = stats.t.pdf(x, df=df, loc=loc, scale=scale)
# Growth probability (run length increases by 1)
growth = R[t, : t + 1] * pred * (1.0 - hazard)
# Changepoint probability (run length resets to 0)
cp = np.sum(R[t, : t + 1] * pred * hazard)
# Update run-length distribution
R[t + 1, 1 : t + 2] = growth
R[t + 1, 0] = cp
# Normalise
evidence = R[t + 1, : t + 2].sum()
if evidence > 0:
R[t + 1, : t + 2] /= evidence
# Update sufficient statistics for the new run lengths
new_mu = np.empty(t + 2)
new_kappa = np.empty(t + 2)
new_alpha = np.empty(t + 2)
new_beta = np.empty(t + 2)
# Run length 0: reset to prior
new_mu[0] = mu_0
new_kappa[0] = kappa_0
new_alpha[0] = alpha_0
new_beta[0] = beta_0
# Run lengths 1 .. t+1: Bayesian update
old_mu = mu[: t + 1]
old_kappa = kappa[: t + 1]
old_alpha = alpha[: t + 1]
old_beta = beta[: t + 1]
new_kappa[1 : t + 2] = old_kappa + 1.0
new_mu[1 : t + 2] = (old_kappa * old_mu + x) / new_kappa[1 : t + 2]
new_alpha[1 : t + 2] = old_alpha + 0.5
new_beta[1 : t + 2] = (
old_beta
+ 0.5 * old_kappa * (x - old_mu) ** 2 / new_kappa[1 : t + 2]
)
mu = new_mu
kappa = new_kappa
alpha = new_alpha
beta = new_beta
# Changepoint detection via MAP run-length drops.
#
# The raw P(r_t = 0) is always approximately equal to the hazard rate
# when a single run length dominates (a well-known property of the
# Adams-MacKay algorithm). Instead, we detect changepoints by
# looking at the MAP (most probable) run length: when it drops
# sharply from a large value back toward zero, a changepoint has
# occurred.
map_run_length = np.zeros(T, dtype=int)
for t in range(T):
map_run_length[t] = int(np.argmax(R[t + 1, : t + 2]))
# Build a changepoint posterior from run-length drops:
# P(cp at t) = P(r_t <= short_threshold) where short_threshold
# captures the probability mass at short run lengths.
cp_posterior = np.zeros(T)
for t in range(T):
short_cutoff = max(3, int(0.05 * (t + 1)))
cp_posterior[t] = float(np.sum(R[t + 1, :short_cutoff]))
# Also flag changepoints where the MAP run length drops
# significantly (drops by more than 50% in one step)
cp_from_map = np.zeros(T, dtype=bool)
for t in range(1, T):
if map_run_length[t] < map_run_length[t - 1] * 0.5 and map_run_length[t - 1] > 5:
cp_from_map[t] = True
most_likely = np.where(cp_from_map | (cp_posterior > threshold))[0]
return BayesianChangepointResult(
run_length_probs=R[1:, :],
changepoint_posterior=cp_posterior,
most_likely_changepoints=most_likely,
hazard_rate=hazard,
)
# ---------------------------------------------------------------------------
# Enhanced Bayesian portfolio (Black-Litterman with full posterior)
# ---------------------------------------------------------------------------
@dataclass
class BayesianPortfolioBLResult:
"""Result container for the enhanced Black-Litterman portfolio.
Attributes:
posterior_mean: Posterior mean of expected returns, shape
``(n_assets,)``.
posterior_cov: Posterior covariance of returns, shape
``(n_assets, n_assets)``.
weights_mean: Mean of posterior optimal portfolio weights.
weights_std: Standard deviation of posterior optimal weights.
weights_ci: 95 % credible intervals for each weight, shape
``(n_assets, 2)``.
weight_samples: Raw posterior weight samples, shape
``(n_samples, n_assets)``.
"""
posterior_mean: np.ndarray
posterior_cov: np.ndarray
weights_mean: np.ndarray
weights_std: np.ndarray
weights_ci: np.ndarray
weight_samples: np.ndarray
[docs]
def bayesian_portfolio_bl(
returns: np.ndarray,
views: np.ndarray | None = None,
view_confidences: np.ndarray | None = None,
P: np.ndarray | None = None,
tau: float = 0.05,
risk_aversion: float = 2.5,
n_samples: int = 5_000,
rng_seed: int = 42,
) -> BayesianPortfolioBLResult:
"""Black-Litterman model with full Bayesian posterior sampling.
The Black-Litterman model combines a market equilibrium prior
(implied by CAPM or equal-weighted) with investor views to produce
a posterior distribution of expected returns. This implementation
goes beyond the standard BL point estimate: it samples from the
full posterior to give uncertainty-aware optimal weights.
**When to use this**: Use BL when you have subjective views about
expected returns or relative performance and want to combine them
with market-implied priors. The Bayesian extension is especially
useful for constructing robust portfolios with weight confidence
intervals.
Args:
returns: Historical return matrix, shape ``(n_periods, n_assets)``.
views: View vector ``q``, shape ``(n_views,)``. If None, the
posterior equals the prior (equilibrium returns).
view_confidences: Diagonal of the view uncertainty matrix
``Omega``, shape ``(n_views,)``. Smaller values mean more
confident views. If None, defaults to
``tau * diag(P @ Sigma @ P^T)``.
P: Pick matrix mapping assets to views, shape
``(n_views, n_assets)``. Required if ``views`` is given.
tau: Scaling factor for the uncertainty in the prior mean.
Typical range 0.01 -- 0.10.
risk_aversion: Risk-aversion parameter delta for the quadratic
utility. Default ``2.5``.
n_samples: Number of posterior samples for weight uncertainty.
rng_seed: Random seed for reproducibility.
Returns:
BayesianPortfolioBLResult with posterior mean/covariance of
returns and weight uncertainty.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> returns = rng.normal(0.001, 0.02, (252, 3))
>>> # View: asset 0 will outperform asset 1 by 2 % annualised
>>> P = np.array([[1, -1, 0]])
>>> q = np.array([0.02 / 252])
>>> result = bayesian_portfolio_bl(returns, views=q, P=P)
"""
rng = np.random.default_rng(rng_seed)
returns = np.asarray(returns, dtype=float)
if returns.ndim == 1:
returns = returns.reshape(-1, 1)
n, p = returns.shape
Sigma = np.cov(returns, rowvar=False, ddof=1)
if Sigma.ndim == 0:
Sigma = Sigma.reshape(1, 1)
# Equilibrium returns (reverse optimisation with equal-weighted mkt portfolio)
w_mkt = np.ones(p) / p
pi = risk_aversion * Sigma @ w_mkt # equilibrium excess returns
# Prior for mu: N(pi, tau * Sigma)
tau_Sigma = tau * Sigma
if views is not None:
views = np.asarray(views, dtype=float).ravel()
if P is None:
raise ValueError("Pick matrix P is required when views are provided.")
P = np.asarray(P, dtype=float)
if P.ndim == 1:
P = P.reshape(1, -1)
n_views = len(views)
if view_confidences is None:
Omega = np.diag(np.diag(tau * P @ Sigma @ P.T))
else:
Omega = np.diag(np.asarray(view_confidences, dtype=float).ravel())
# Posterior mean and covariance of mu (Black-Litterman formula)
tau_Sigma_inv = np.linalg.inv(tau_Sigma)
Omega_inv = np.linalg.inv(Omega)
post_prec = tau_Sigma_inv + P.T @ Omega_inv @ P
post_cov_mu = np.linalg.inv(post_prec)
post_mean_mu = post_cov_mu @ (tau_Sigma_inv @ pi + P.T @ Omega_inv @ views)
else:
post_mean_mu = pi
post_cov_mu = tau_Sigma
# Posterior covariance of returns (for portfolio optimisation)
post_cov_returns = Sigma + post_cov_mu
# Sample from posterior of mu, then compute optimal weights
post_cov_mu_sym = 0.5 * (post_cov_mu + post_cov_mu.T) + 1e-10 * np.eye(p)
weight_samples = np.zeros((n_samples, p))
Sigma_reg = Sigma + 1e-10 * np.eye(p)
Sigma_inv = np.linalg.inv(Sigma_reg)
for i in range(n_samples):
mu_sample = rng.multivariate_normal(post_mean_mu, post_cov_mu_sym)
# Optimal weights: w* = (1/delta) Sigma^{-1} mu
w = Sigma_inv @ mu_sample / risk_aversion
w_sum = np.sum(w)
if abs(w_sum) > 1e-12:
w = w / w_sum
else:
w = np.ones(p) / p
weight_samples[i] = w
weights_mean = weight_samples.mean(axis=0)
weights_std = weight_samples.std(axis=0)
weights_ci = np.column_stack([
np.percentile(weight_samples, 2.5, axis=0),
np.percentile(weight_samples, 97.5, axis=0),
])
return BayesianPortfolioBLResult(
posterior_mean=post_mean_mu,
posterior_cov=post_cov_returns,
weights_mean=weights_mean,
weights_std=weights_std,
weights_ci=weights_ci,
weight_samples=weight_samples,
)
# ---------------------------------------------------------------------------
# Bayesian stochastic volatility
# ---------------------------------------------------------------------------
@dataclass
class BayesianVolatilityResult:
"""Result container for the Bayesian stochastic volatility model.
Attributes:
vol_mean: Posterior mean of the time-varying volatility path,
shape ``(T,)``.
vol_ci_lower: Lower 2.5 % quantile of the volatility path.
vol_ci_upper: Upper 97.5 % quantile of the volatility path.
mu_posterior: Posterior samples of the log-vol mean level.
phi_posterior: Posterior samples of the AR(1) persistence
parameter.
sigma_eta_posterior: Posterior samples of the log-vol innovation
std.
"""
vol_mean: np.ndarray
vol_ci_lower: np.ndarray
vol_ci_upper: np.ndarray
mu_posterior: np.ndarray
phi_posterior: np.ndarray
sigma_eta_posterior: np.ndarray
[docs]
def bayesian_volatility(
returns: np.ndarray,
n_samples: int = 2_000,
burn_in: int = 1_000,
rng_seed: int = 42,
) -> BayesianVolatilityResult:
"""Bayesian stochastic volatility model via MCMC.
Estimates a time-varying volatility path using the standard
stochastic volatility (SV) model:
y_t = exp(h_t / 2) * epsilon_t, epsilon_t ~ N(0, 1)
h_t = mu + phi * (h_{t-1} - mu) + eta_t, eta_t ~ N(0, sigma_eta^2)
where ``h_t`` is the log-variance at time ``t``, ``mu`` is the
long-run mean of log-variance, ``phi`` is the persistence
(typically close to 1 for financial data), and ``sigma_eta`` is
the volatility-of-volatility.
The model is estimated using a Metropolis-within-Gibbs sampler:
the log-volatility path ``h`` is sampled block-wise, and the
parameters ``(mu, phi, sigma_eta)`` are sampled from their
conditionals.
**When to use this**: Use this instead of GARCH when you believe
volatility evolves as a latent (unobserved) state rather than
being a deterministic function of past returns. The Bayesian
approach gives full uncertainty bands on the volatility path.
Args:
returns: Return series of shape ``(T,)``. Should be
de-meaned (or close to zero-mean).
n_samples: Number of MCMC samples to keep after burn-in.
burn_in: Number of initial samples to discard.
rng_seed: Random seed.
Returns:
BayesianVolatilityResult with posterior volatility path and
parameter posteriors.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> vol = np.exp(0.5 * np.cumsum(rng.normal(0, 0.1, 200)))
>>> returns = vol * rng.normal(size=200)
>>> result = bayesian_volatility(returns, n_samples=500, burn_in=200)
>>> print(result.vol_mean.shape) # (200,)
"""
rng = np.random.default_rng(rng_seed)
returns = np.asarray(returns, dtype=float).ravel()
T = len(returns)
# Replace exact zeros to avoid log(0)
y = returns.copy()
y[y == 0] = 1e-8
# Initialise log-volatility from log(y^2)
log_y2 = np.log(y ** 2 + 1e-8)
h = log_y2.copy() # initial log-vol
# Initialise parameters
mu = np.mean(h)
phi = 0.95
sigma_eta = 0.2
# Storage
total = n_samples + burn_in
h_samples = np.zeros((n_samples, T))
mu_samples = np.zeros(n_samples)
phi_samples = np.zeros(n_samples)
sigma_eta_samples = np.zeros(n_samples)
for it in range(total):
# --- Sample h (single-site Metropolis) ---
for t in range(T):
# Propose from random walk
h_prop = h[t] + rng.normal(0, 0.5)
# Log-likelihood contribution at t
ll_curr = -0.5 * h[t] - 0.5 * y[t] ** 2 * np.exp(-h[t])
ll_prop = -0.5 * h_prop - 0.5 * y[t] ** 2 * np.exp(-h_prop)
# Log-prior contribution from AR(1) transitions
lp_curr = 0.0
lp_prop = 0.0
if t == 0:
# Stationary distribution prior for h_0
var_stat = sigma_eta ** 2 / max(1.0 - phi ** 2, 1e-8)
lp_curr += -0.5 * (h[t] - mu) ** 2 / var_stat
lp_prop += -0.5 * (h_prop - mu) ** 2 / var_stat
else:
lp_curr += -0.5 * (h[t] - mu - phi * (h[t - 1] - mu)) ** 2 / (sigma_eta ** 2)
lp_prop += -0.5 * (h_prop - mu - phi * (h[t - 1] - mu)) ** 2 / (sigma_eta ** 2)
if t < T - 1:
lp_curr += -0.5 * (h[t + 1] - mu - phi * (h[t] - mu)) ** 2 / (sigma_eta ** 2)
lp_prop += -0.5 * (h[t + 1] - mu - phi * (h_prop - mu)) ** 2 / (sigma_eta ** 2)
log_alpha = (ll_prop + lp_prop) - (ll_curr + lp_curr)
if np.log(rng.uniform()) < log_alpha:
h[t] = h_prop
# --- Sample mu | h, phi, sigma_eta ---
h_diff = h[1:] - phi * h[:-1]
# mu contribution: h_t = mu + phi*(h_{t-1} - mu) + eta
# => h_t = mu*(1-phi) + phi*h_{t-1} + eta
# So h_diff = h_t - phi*h_{t-1} ~ N(mu*(1-phi), sigma_eta^2)
n_trans = T - 1
prior_prec_mu = 1.0 / 10.0 # prior N(0, 10)
data_prec_mu = n_trans * (1 - phi) ** 2 / (sigma_eta ** 2)
post_prec_mu = prior_prec_mu + data_prec_mu
post_mean_mu = (prior_prec_mu * 0.0 + (1 - phi) * np.sum(h_diff) / (sigma_eta ** 2)) / post_prec_mu
mu = rng.normal(post_mean_mu, 1.0 / np.sqrt(post_prec_mu))
# --- Sample phi | h, mu, sigma_eta (truncated to (-1, 1)) ---
x_t = h[:-1] - mu
y_t = h[1:] - mu
x_sum = np.sum(x_t ** 2)
if x_sum > 1e-12:
post_var_phi = sigma_eta ** 2 / x_sum
post_mean_phi = np.sum(x_t * y_t) / x_sum
phi_prop = rng.normal(post_mean_phi, np.sqrt(post_var_phi))
if abs(phi_prop) < 1.0:
phi = phi_prop
# --- Sample sigma_eta^2 | h, mu, phi ---
residuals = y_t - phi * x_t
a_post = 1.0 + n_trans / 2.0
b_post = 1.0 + 0.5 * np.sum(residuals ** 2)
sigma_eta_sq = 1.0 / rng.gamma(a_post, 1.0 / b_post)
sigma_eta = np.sqrt(max(sigma_eta_sq, 1e-10))
if it >= burn_in:
idx = it - burn_in
h_samples[idx] = h.copy()
mu_samples[idx] = mu
phi_samples[idx] = phi
sigma_eta_samples[idx] = sigma_eta
# Posterior volatility path
vol_samples = np.exp(h_samples / 2.0)
vol_mean = vol_samples.mean(axis=0)
vol_ci_lower = np.percentile(vol_samples, 2.5, axis=0)
vol_ci_upper = np.percentile(vol_samples, 97.5, axis=0)
return BayesianVolatilityResult(
vol_mean=vol_mean,
vol_ci_lower=vol_ci_lower,
vol_ci_upper=vol_ci_upper,
mu_posterior=mu_samples,
phi_posterior=phi_samples,
sigma_eta_posterior=sigma_eta_samples,
)
# ---------------------------------------------------------------------------
# Bayesian cointegration test
# ---------------------------------------------------------------------------
@dataclass
class BayesianCointegrationResult:
"""Result container for the Bayesian cointegration test.
Attributes:
prob_cointegrated: Posterior probability that the two series are
cointegrated (based on the residual unit-root test).
cointegrating_vector_mean: Posterior mean of the cointegrating
vector ``[1, -beta]`` (i.e. beta is the slope coefficient).
cointegrating_vector_std: Posterior std of beta.
residual_adf_samples: Posterior samples of the ADF-like
autoregressive coefficient on the residuals.
spread_mean: Posterior mean of the spread ``y - beta * x``.
"""
prob_cointegrated: float
cointegrating_vector_mean: float
cointegrating_vector_std: float
residual_adf_samples: np.ndarray
spread_mean: np.ndarray
[docs]
def bayesian_cointegration(
y: np.ndarray,
x: np.ndarray,
n_samples: int = 5_000,
rng_seed: int = 42,
) -> BayesianCointegrationResult:
"""Bayesian cointegration test between two time series.
Tests whether ``y`` and ``x`` are cointegrated by:
1. Estimating the cointegrating regression ``y = alpha + beta * x + e``
using Bayesian linear regression (to get a posterior for beta).
2. Testing the residuals ``e_t`` for stationarity using a Bayesian
version of the ADF test: ``Delta e_t = rho * e_{t-1} + u_t``.
If ``rho < 0``, the residuals are mean-reverting (cointegrated).
The posterior probability of cointegration is estimated as
``P(rho < 0 | data)``.
**When to use this**: Use this for pairs trading (testing if two
assets share a long-run equilibrium) or for constructing
cointegrated portfolios. The Bayesian approach gives a probability
rather than a p-value, which is more natural for decision-making.
Args:
y: First time series, shape ``(T,)``.
x: Second time series, shape ``(T,)``.
n_samples: Number of posterior samples for the ADF coefficient.
rng_seed: Random seed.
Returns:
BayesianCointegrationResult with the posterior probability
of cointegration and the cointegrating vector.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> x = np.cumsum(rng.normal(size=300))
>>> y = 0.8 * x + rng.normal(0, 0.5, 300) # cointegrated
>>> result = bayesian_cointegration(y, x)
>>> print(f"P(cointegrated) = {result.prob_cointegrated:.2f}")
"""
rng = np.random.default_rng(rng_seed)
y = np.asarray(y, dtype=float).ravel()
x = np.asarray(x, dtype=float).ravel()
T = len(y)
# Step 1: Bayesian regression y = alpha + beta * x + e
X_reg = np.column_stack([np.ones(T), x])
reg_result = bayesian_linear_regression(y, X_reg)
beta_mean = reg_result.posterior_mean[1]
beta_std = np.sqrt(reg_result.posterior_cov_unscaled[1, 1] * reg_result.sigma2_mean)
# Sample beta from posterior (marginal Student-t)
df_beta = 2.0 * reg_result.a_n
scale_beta = np.sqrt(reg_result.posterior_cov_unscaled[1, 1] * reg_result.b_n / reg_result.a_n)
beta_samples = stats.t.rvs(
df=df_beta, loc=beta_mean, scale=scale_beta, size=n_samples, random_state=rng
)
# Step 2: For each beta sample, compute residuals and test for unit root
rho_samples = np.zeros(n_samples)
for i in range(n_samples):
resid = y - reg_result.posterior_mean[0] - beta_samples[i] * x
# ADF-like regression: Delta_e_t = rho * e_{t-1} + u_t
de = np.diff(resid)
e_lag = resid[:-1]
if np.var(e_lag) < 1e-15:
rho_samples[i] = 0.0
continue
# Simple OLS for rho
rho_ols = np.sum(de * e_lag) / np.sum(e_lag ** 2)
resid_u = de - rho_ols * e_lag
sigma2_u = np.var(resid_u, ddof=1) if len(resid_u) > 1 else 1e-6
var_rho = sigma2_u / max(np.sum(e_lag ** 2), 1e-12)
rho_samples[i] = rng.normal(rho_ols, np.sqrt(max(var_rho, 1e-12)))
prob_coint = float(np.mean(rho_samples < 0))
# Spread under posterior mean beta
spread_mean = y - reg_result.posterior_mean[0] - beta_mean * x
return BayesianCointegrationResult(
prob_cointegrated=prob_coint,
cointegrating_vector_mean=float(beta_mean),
cointegrating_vector_std=float(beta_std),
residual_adf_samples=rho_samples,
spread_mean=spread_mean,
)
# ---------------------------------------------------------------------------
# Bayesian model comparison
# ---------------------------------------------------------------------------
[docs]
def model_comparison(
y: np.ndarray,
X_list: list[np.ndarray],
model_names: list[str] | None = None,
) -> "pd.DataFrame":
"""Bayesian model comparison via marginal likelihood and information criteria.
Fits each candidate model using ``bayesian_linear_regression`` and
computes:
- **Log marginal likelihood** (for Bayes factor computation).
- **WAIC** (Widely Applicable Information Criterion) -- a Bayesian
analogue of AIC that accounts for the effective number of
parameters.
- **LOO-CV** (Leave-One-Out Cross-Validation) approximation via
importance sampling.
- **Bayes factor** relative to the best model.
Models are ranked by log marginal likelihood (higher is better).
**When to use this**: Use this when choosing between competing
regression specifications (e.g., which factors belong in a return
model). The marginal likelihood automatically penalises model
complexity (Occam's razor), unlike AIC/BIC which use fixed
penalties.
Args:
y: Response vector of shape ``(n,)``.
X_list: List of design matrices, one per model. Each has shape
``(n, k_m)`` where ``k_m`` can differ.
model_names: Optional names for each model. Defaults to
``['model_0', 'model_1', ...]``.
Returns:
pd.DataFrame with columns ``log_marginal_likelihood``,
``waic``, ``loo_cv``, ``bayes_factor``, ``rank``, sorted by
rank (best first).
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> n = 200
>>> x1 = rng.normal(size=n)
>>> x2 = rng.normal(size=n)
>>> noise = rng.normal(size=n)
>>> y = 1.0 + 2.0 * x1 + rng.normal(0, 0.5, n)
>>> X_good = np.column_stack([np.ones(n), x1])
>>> X_bad = np.column_stack([np.ones(n), x2])
>>> df = model_comparison(y, [X_good, X_bad], ["good", "bad"])
>>> assert df.index[0] == "good" # good model ranked first
"""
import pandas as pd
y = np.asarray(y, dtype=float).ravel()
n_models = len(X_list)
if model_names is None:
model_names = [f"model_{i}" for i in range(n_models)]
results = []
for i, X in enumerate(X_list):
X = np.asarray(X, dtype=float)
if X.ndim == 1:
X = X.reshape(-1, 1)
n = len(y)
res = bayesian_linear_regression(y, X)
# WAIC approximation
# Compute pointwise log-likelihood at posterior mean
y_hat = X @ res.posterior_mean
sigma2_est = res.sigma2_mean
log_lik = -0.5 * np.log(2 * np.pi * sigma2_est) - 0.5 * (y - y_hat) ** 2 / sigma2_est
# p_waic: effective number of parameters (variance of log-lik)
# Approximate using posterior predictive variance
p_waic = float(res.n_features) # simple approximation
lppd = float(np.sum(log_lik))
waic = -2.0 * (lppd - p_waic)
# LOO-CV approximation (Pareto-smoothed importance sampling idea,
# simplified to analytical leave-one-out for linear regression)
H = X @ np.linalg.inv(X.T @ X + 1e-10 * np.eye(X.shape[1])) @ X.T
h_diag = np.diag(H)
resid = y - y_hat
loo_resid = resid / np.maximum(1.0 - h_diag, 1e-8)
loo_cv = float(np.mean(loo_resid ** 2))
results.append({
"log_marginal_likelihood": res.log_marginal_likelihood,
"waic": waic,
"loo_cv": loo_cv,
})
# Bayes factors relative to best model
log_mls = np.array([r["log_marginal_likelihood"] for r in results])
best_log_ml = np.max(log_mls)
for i, r in enumerate(results):
diff = log_mls[i] - best_log_ml
r["bayes_factor"] = float(np.exp(np.clip(diff, -500, 500)))
# Rank by log marginal likelihood (higher is better)
ranking = np.argsort(-log_mls)
for i, r in enumerate(results):
r["rank"] = int(np.where(ranking == i)[0][0]) + 1
df = pd.DataFrame(results, index=model_names)
df = df.sort_values("rank")
return df
# ---------------------------------------------------------------------------
# Bayesian regime inference — bayes/ → regimes/ bridge
# ---------------------------------------------------------------------------
[docs]
def bayesian_regime_inference(
returns: np.ndarray,
n_regimes: int = 2,
n_samples: int = 2000,
seed: int | None = None,
) -> "RegimeResult":
"""Bayesian regime detection using conjugate priors.
Alternative to frequentist HMM (``wraquant.regimes.base.detect_regimes``)
that provides full posterior uncertainty on regime assignments and
transition probabilities. Uses a Gibbs sampler with conjugate
Normal-Inverse-Gamma priors for each regime's mean and variance, and
a Dirichlet prior for transition probabilities.
This bridges ``bayes`` and ``regimes`` by returning a ``RegimeResult``
(the standard container from ``wraquant.regimes.base``), so the output
can be fed directly into downstream regime-aware portfolio or risk
functions.
The generative model is:
s_t | s_{t-1} ~ Categorical(transition_matrix[s_{t-1}])
r_t | s_t ~ N(mu_{s_t}, sigma^2_{s_t})
with priors:
mu_k ~ N(0, prior_var)
sigma^2_k ~ InvGamma(alpha_0, beta_0)
transition[k] ~ Dirichlet(1, ..., 1)
Parameters:
returns: Return series, shape ``(T,)``.
n_regimes: Number of regimes to detect (default 2).
n_samples: Number of Gibbs samples to draw (after a burn-in
of ``n_samples // 2``). More samples give smoother
posterior estimates.
seed: Random seed for reproducibility.
Returns:
RegimeResult with:
- ``states`` -- MAP (most probable) regime at each time step.
- ``probabilities`` -- Posterior regime probabilities (T, K).
- ``transition_matrix`` -- Posterior mean transition matrix.
- ``means`` -- Posterior mean returns per regime.
- ``covariances`` -- Posterior variance per regime.
- ``statistics`` -- Per-regime summary statistics.
Example:
>>> import numpy as np
>>> rng = np.random.default_rng(42)
>>> # Two-regime returns: low vol and high vol
>>> regime = np.concatenate([np.zeros(200), np.ones(200)]).astype(int)
>>> mu = np.array([0.001, -0.002])
>>> sigma = np.array([0.01, 0.03])
>>> returns = rng.normal(mu[regime], sigma[regime])
>>> result = bayesian_regime_inference(returns, n_regimes=2, seed=42)
>>> result.n_regimes
2
>>> result.states.shape
(400,)
Notes:
The Gibbs sampler alternates between:
1. Sampling regime states via forward-filtering backward-sampling.
2. Sampling regime parameters (mu, sigma^2) from conjugate posteriors.
3. Sampling transition probabilities from Dirichlet posteriors.
References:
- Chib (1996), "Calculating posterior distributions and modal
estimates in Markov mixture models"
- Hamilton (1989), "A new approach to the economic analysis of
nonstationary time series"
See Also:
wraquant.regimes.base.detect_regimes: Frequentist regime detection.
wraquant.regimes.base.RegimeResult: Standard result container.
bayesian_changepoint: Changepoint detection (different model).
"""
import pandas as pd
from wraquant.regimes.base import RegimeResult
rng = np.random.default_rng(seed)
returns = np.asarray(returns, dtype=float).ravel()
T = len(returns)
K = n_regimes
burn_in = n_samples // 2
total_iter = n_samples + burn_in
# --- Initialise parameters ---
# K-means-like initialisation: sort returns and split into K groups
sorted_r = np.sort(returns)
chunk = T // K
mu = np.array([np.mean(sorted_r[i * chunk : (i + 1) * chunk]) for i in range(K)])
sigma2 = np.array([np.var(sorted_r[i * chunk : (i + 1) * chunk], ddof=1) + 1e-8 for i in range(K)])
# Transition matrix: high self-persistence
trans = np.full((K, K), 0.05 / max(K - 1, 1))
np.fill_diagonal(trans, 0.95)
trans = trans / trans.sum(axis=1, keepdims=True)
# States: initialise from closest mean
states = np.zeros(T, dtype=int)
for t in range(T):
states[t] = int(np.argmin([abs(returns[t] - mu[k]) for k in range(K)]))
# Prior hyperparameters
alpha_0 = 2.0 # InvGamma shape
beta_0 = 0.001 # InvGamma scale
mu_prior = 0.0
mu_prior_var = 1.0
dirichlet_alpha = np.ones(K) # symmetric Dirichlet
# Storage for posterior samples
state_samples = np.zeros((n_samples, T), dtype=int)
trans_samples = np.zeros((n_samples, K, K))
mu_samples = np.zeros((n_samples, K))
sigma2_samples = np.zeros((n_samples, K))
for it in range(total_iter):
# --- Step 1: Sample states (forward-filtering backward-sampling) ---
# Forward pass: compute filtered probabilities
log_alpha = np.zeros((T, K))
for k in range(K):
log_alpha[0, k] = -0.5 * np.log(2 * np.pi * sigma2[k]) - 0.5 * (returns[0] - mu[k]) ** 2 / sigma2[k]
# Normalise
log_alpha[0] -= np.max(log_alpha[0])
alpha_filt = np.exp(log_alpha[0])
alpha_filt /= alpha_filt.sum()
log_alpha[0] = np.log(alpha_filt + 1e-300)
for t in range(1, T):
for k in range(K):
log_emit = -0.5 * np.log(2 * np.pi * sigma2[k]) - 0.5 * (returns[t] - mu[k]) ** 2 / sigma2[k]
# Sum over previous states
log_trans = np.log(trans[:, k] + 1e-300)
log_alpha[t, k] = log_emit + np.logaddexp.reduce(log_alpha[t - 1] + log_trans)
# Normalise to prevent underflow
max_la = np.max(log_alpha[t])
log_alpha[t] -= max_la
exp_la = np.exp(log_alpha[t])
exp_la /= exp_la.sum()
log_alpha[t] = np.log(exp_la + 1e-300)
# Backward sampling
# Sample last state
prob_T = np.exp(log_alpha[T - 1])
prob_T /= prob_T.sum()
states[T - 1] = rng.choice(K, p=prob_T)
for t in range(T - 2, -1, -1):
log_prob = log_alpha[t] + np.log(trans[:, states[t + 1]] + 1e-300)
log_prob -= np.max(log_prob)
prob = np.exp(log_prob)
prob /= prob.sum()
states[t] = rng.choice(K, p=prob)
# --- Step 2: Sample mu, sigma^2 for each regime ---
for k in range(K):
mask = states == k
n_k = int(mask.sum())
if n_k < 2:
# Not enough data; keep current values
continue
r_k = returns[mask]
y_bar = np.mean(r_k)
# Posterior for mu | sigma^2 (conjugate normal)
prior_prec = 1.0 / mu_prior_var
data_prec = n_k / sigma2[k]
post_prec = prior_prec + data_prec
post_mean = (prior_prec * mu_prior + data_prec * y_bar) / post_prec
mu[k] = rng.normal(post_mean, 1.0 / np.sqrt(post_prec))
# Posterior for sigma^2 (conjugate InvGamma)
a_post = alpha_0 + n_k / 2.0
b_post = beta_0 + 0.5 * np.sum((r_k - mu[k]) ** 2)
sigma2[k] = 1.0 / rng.gamma(a_post, 1.0 / max(b_post, 1e-12))
sigma2[k] = max(sigma2[k], 1e-10)
# --- Step 3: Sample transition matrix (Dirichlet posterior) ---
counts = np.zeros((K, K))
for t in range(T - 1):
counts[states[t], states[t + 1]] += 1
for k in range(K):
post_alpha = dirichlet_alpha + counts[k]
trans[k] = rng.dirichlet(post_alpha)
# Store after burn-in
if it >= burn_in:
idx = it - burn_in
state_samples[idx] = states.copy()
trans_samples[idx] = trans.copy()
mu_samples[idx] = mu.copy()
sigma2_samples[idx] = sigma2.copy()
# --- Compute posterior summaries ---
# Posterior regime probabilities: fraction of samples in each regime
probabilities = np.zeros((T, K))
for k in range(K):
probabilities[:, k] = np.mean(state_samples == k, axis=0)
# MAP states
map_states = np.argmax(probabilities, axis=1)
# Re-order by ascending volatility
post_sigma2_mean = sigma2_samples.mean(axis=0)
order = np.argsort(post_sigma2_mean)
state_map = {int(old): new for new, old in enumerate(order)}
map_states_ordered = np.array([state_map[int(s)] for s in map_states])
probs_ordered = probabilities[:, order]
mu_ordered = mu_samples.mean(axis=0)[order]
sigma2_ordered = post_sigma2_mean[order]
# Reorder transition matrix
trans_mean = trans_samples.mean(axis=0)
trans_ordered = trans_mean[np.ix_(order, order)]
# Compute per-regime statistics
records = []
for k in range(K):
mask = map_states_ordered == k
n_k = int(mask.sum())
r_k = returns[mask]
records.append({
"regime": k,
"mean": float(np.mean(r_k)) if n_k > 0 else 0.0,
"std": float(np.std(r_k, ddof=1)) if n_k > 1 else 0.0,
"pct_time": float(n_k / T),
"n_obs": n_k,
})
statistics = pd.DataFrame(records).set_index("regime")
return RegimeResult(
states=map_states_ordered,
probabilities=probs_ordered,
transition_matrix=trans_ordered,
n_regimes=K,
means=mu_ordered,
covariances=sigma2_ordered,
statistics=statistics,
method="bayesian_gibbs",
model=None,
metadata={
"n_samples": n_samples,
"burn_in": burn_in,
"mu_samples": mu_samples,
"sigma2_samples": sigma2_samples,
"trans_samples": trans_samples,
},
)