"""Hidden Markov Model and mixture-model regime detection.
This module provides production-quality regime detection for financial
time series using Gaussian Hidden Markov Models (HMMs), Markov-switching
models, and Gaussian mixture models (GMMs).
**When to use each model:**
- **Gaussian HMM** (``fit_gaussian_hmm``): The workhorse. Models returns as
emissions from a latent Markov chain. Captures temporal persistence of
regimes (markets stay in bull/bear for extended periods).
- **Markov-Switching Regression** (``fit_ms_regression``): When you want
regime-dependent regression coefficients or intercepts. Useful for
modeling time-varying factor exposures.
- **Gaussian Mixture** (``gaussian_mixture_regimes``): Quick classification
without temporal structure. Good for labeling return distributions when
you do not care about transition dynamics.
References:
Hamilton, J. D. (1989). "A New Approach to the Economic Analysis of
Nonstationary Time Series and the Business Cycle." *Econometrica*, 57(2).
Ang, A. & Bekaert, G. (2002). "Regime Switches in Interest Rates."
*Journal of Business & Economic Statistics*, 20(2).
"""
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
from wraquant.core._coerce import coerce_array
from wraquant.core.decorators import requires_extra
# ---------------------------------------------------------------------------
# Gaussian HMM
# ---------------------------------------------------------------------------
[docs]
@requires_extra("regimes")
def fit_gaussian_hmm(
returns: pd.Series | np.ndarray,
n_states: int = 2,
covariance_type: str = "full",
n_init: int = 10,
n_iter: int = 200,
tol: float = 1e-4,
random_state: int = 42,
) -> dict[str, Any]:
"""Fit a Gaussian Hidden Markov Model to financial return data.
A Gaussian HMM assumes that observed returns are generated by a latent
(hidden) Markov chain with *K* states. Each state *k* has its own
Gaussian emission distribution N(mu_k, sigma_k). The model learns:
1. **Transition matrix** A[i,j] = P(s_t = j | s_{t-1} = i)
2. **Emission means** mu_k for each state
3. **Emission covariances** Sigma_k for each state
4. **Initial state distribution** pi_k = P(s_0 = k)
The EM algorithm (Baum-Welch) is used for parameter estimation. Because
EM can converge to local optima, multiple random restarts (``n_init``)
are performed and the model with the highest log-likelihood is retained.
**Typical interpretation for 2-state model on equity returns:**
- **Low-volatility state** ("risk-on" / bull): Positive mean return,
low standard deviation, high Sharpe ratio. Markets are calm.
- **High-volatility state** ("risk-off" / crisis): Near-zero or negative
mean return, high standard deviation. Markets are stressed.
States are automatically re-ordered so that state 0 has the lowest
volatility and state K-1 has the highest volatility. This ensures
consistent labeling across runs.
Parameters:
returns: Return series (simple or log returns). If a pandas Series,
NaN values are dropped and the index is preserved.
n_states: Number of hidden states. Common choices are 2 (bull/bear)
or 3 (bull/neutral/bear). Values above 5 risk overfitting.
covariance_type: Type of covariance matrix. One of ``'full'``,
``'diag'``, ``'spherical'``, ``'tied'``.
n_init: Number of random restarts for EM. The model with the
highest log-likelihood is selected. More restarts reduce the
chance of a poor local optimum.
n_iter: Maximum number of EM iterations per restart.
tol: Convergence threshold for EM.
random_state: Base random seed for reproducibility.
Returns:
Dictionary with the following keys:
- ``states`` (np.ndarray): Most likely state sequence (Viterbi),
shape ``(T,)``, integers in ``[0, K-1]``.
- ``state_probs`` (np.ndarray): Smoothed state probabilities,
shape ``(T, K)``. Each row sums to 1.
- ``transition_matrix`` (np.ndarray): Transition probability
matrix, shape ``(K, K)``. Row *i* sums to 1.
- ``means`` (np.ndarray): Emission means, shape ``(K,)``.
- ``covariances`` (np.ndarray): Emission covariances,
shape ``(K,)`` (variance per state for univariate data).
- ``startprob`` (np.ndarray): Initial state distribution,
shape ``(K,)``.
- ``log_likelihood`` (float): Total log-likelihood of the data.
- ``aic`` (float): Akaike Information Criterion.
- ``bic`` (float): Bayesian Information Criterion.
- ``n_states`` (int): Number of states.
- ``steady_state`` (np.ndarray): Ergodic (stationary) distribution
of the Markov chain, shape ``(K,)``.
- ``avg_duration`` (np.ndarray): Expected duration in each state,
``1 / (1 - A[i,i])``, shape ``(K,)``.
- ``model`` (GaussianHMM): The fitted hmmlearn model object.
- ``index`` (pd.Index | None): Original index if input was a
pandas Series.
Example:
>>> import numpy as np
>>> import pandas as pd
>>> rng = np.random.default_rng(42)
>>> # Simulate 2-regime returns
>>> low_vol = rng.normal(0.001, 0.008, 300)
>>> high_vol = rng.normal(-0.002, 0.025, 200)
>>> returns = pd.Series(np.concatenate([low_vol, high_vol]))
>>> result = fit_gaussian_hmm(returns, n_states=2)
>>> print(result['means']) # Per-state means
>>> print(result['transition_matrix']) # Transition probabilities
>>> print(result['steady_state']) # Long-run regime probabilities
Notes:
The model uses hmmlearn's ``GaussianHMM``. For very long series
(>10,000 observations), consider downsampling or using the
variational Bayes variant.
See Also:
fit_ms_regression: Markov-switching regression with statsmodels.
gaussian_mixture_regimes: GMM-based regime classification.
regime_statistics: Compute per-regime financial statistics.
regime_transition_analysis: Analyze regime transition dynamics.
wraquant.regimes.base.detect_regimes: Unified high-level interface.
wraquant.backtest.position.regime_signal_filter: Gate signals by regime.
wraquant.viz.dashboard.regime_dashboard: Visualize regime analysis.
"""
from hmmlearn.hmm import GaussianHMM
# Prepare data -- preserve index if input is a Series
index = returns.index if isinstance(returns, pd.Series) else None
arr = coerce_array(returns, "returns")
mask = ~np.isnan(arr)
arr = arr[mask]
if index is not None:
# Align index with non-NaN entries
full_idx = returns.dropna().index
if len(full_idx) == len(arr):
index = full_idx
X = arr.reshape(-1, 1)
T = len(X)
# Fit with multiple restarts and keep the best
best_model = None
best_score = -np.inf
for i in range(n_init):
model = GaussianHMM(
n_components=n_states,
covariance_type=covariance_type,
n_iter=n_iter,
tol=tol,
random_state=random_state + i,
)
try:
model.fit(X)
score = model.score(X)
if score > best_score:
best_score = score
best_model = model
except Exception:
# EM can fail on degenerate initialisations; skip and try again
continue
if best_model is None:
raise RuntimeError(
f"All {n_init} EM restarts failed. Try different n_states or "
"check that the data has sufficient variation."
)
model = best_model
# Decode: Viterbi for most likely states, posterior for probabilities
states = model.predict(X)
state_probs = model.predict_proba(X)
# Re-order states by ascending volatility for consistent labeling
variances = _extract_variances(model.covars_, n_states, covariance_type)
order = np.argsort(variances)
# Remap states
state_map = {int(old): new for new, old in enumerate(order)}
states = np.array([state_map[int(s)] for s in states])
state_probs = state_probs[:, order]
means = model.means_.flatten()[order]
covariances = variances[order]
startprob = model.startprob_[order]
# Reorder transition matrix
transmat = model.transmat_[np.ix_(order, order)]
# Information criteria
# Number of free parameters for Gaussian HMM
n_params = _count_hmm_params(n_states, covariance_type)
log_likelihood = float(best_score)
aic = float(2 * n_params - 2 * log_likelihood)
bic = float(n_params * np.log(T) - 2 * log_likelihood)
# Steady-state (ergodic) distribution from transition matrix
steady_state = _compute_steady_state(transmat)
# Expected duration in each state: 1 / (1 - p_ii)
self_transition = np.diag(transmat)
avg_duration = 1.0 / np.maximum(1.0 - self_transition, 1e-10)
return {
"states": states,
"state_probs": state_probs,
"transition_matrix": transmat,
"means": means,
"covariances": covariances,
"startprob": startprob,
"log_likelihood": log_likelihood,
"aic": aic,
"bic": bic,
"n_states": n_states,
"steady_state": steady_state,
"avg_duration": avg_duration,
"model": model,
"index": index,
}
# ---------------------------------------------------------------------------
# Legacy wrapper (backward compatibility)
# ---------------------------------------------------------------------------
[docs]
@requires_extra("regimes")
def fit_hmm(returns: pd.Series, n_states: int = 2) -> Any:
"""Fit a Gaussian HMM to return data (legacy interface).
This is a thin compatibility wrapper around ``fit_gaussian_hmm``.
New code should use ``fit_gaussian_hmm`` directly for richer output.
Parameters:
returns: Simple return series.
n_states: Number of hidden states.
Returns:
Fitted ``hmmlearn.hmm.GaussianHMM`` model.
See Also:
fit_gaussian_hmm: Full-featured HMM fitting with rich output.
"""
result = fit_gaussian_hmm(returns, n_states=n_states, n_init=5)
return result["model"]
# ---------------------------------------------------------------------------
# Predict regime
# ---------------------------------------------------------------------------
[docs]
def predict_regime(
model: Any,
returns: pd.Series | np.ndarray,
) -> dict[str, Any]:
"""Predict regime states and probabilities for new data.
Given a fitted HMM (from ``fit_gaussian_hmm`` or hmmlearn directly),
decode the most likely hidden state sequence and compute posterior
state probabilities for new observations.
Parameters:
model: A fitted HMM model with ``predict`` and ``predict_proba``
methods (e.g., from hmmlearn), **or** the dict returned by
``fit_gaussian_hmm`` (in which case the ``'model'`` key is
used).
returns: Return series to classify. NaN values are dropped.
Returns:
Dictionary with:
- ``states`` (np.ndarray): Most likely state at each time step.
- ``state_probs`` (np.ndarray): Posterior probabilities, shape
``(T, K)``.
- ``index`` (pd.Index | None): Preserved index if input was a
pandas Series.
Example:
>>> result = fit_gaussian_hmm(train_returns, n_states=2)
>>> pred = predict_regime(result, test_returns)
>>> print(pred['states'])
>>> print(pred['state_probs'][:5])
"""
# Accept dict output from fit_gaussian_hmm
if isinstance(model, dict) and "model" in model:
model = model["model"]
index = returns.index if isinstance(returns, pd.Series) else None
arr = coerce_array(returns, "returns")
mask = ~np.isnan(arr)
arr = arr[mask]
if index is not None:
full_idx = returns.dropna().index
if len(full_idx) == len(arr):
index = full_idx
X = arr.reshape(-1, 1)
states = model.predict(X)
state_probs = model.predict_proba(X)
return {
"states": states,
"state_probs": state_probs,
"index": index,
}
# ---------------------------------------------------------------------------
# Markov-Switching Regression (statsmodels)
# ---------------------------------------------------------------------------
[docs]
def fit_ms_regression(
endog: pd.Series | np.ndarray,
k_regimes: int = 2,
trend: str = "c",
switching_variance: bool = True,
exog: pd.DataFrame | np.ndarray | None = None,
n_iter: int = 200,
) -> dict[str, Any]:
"""Fit a Markov-switching regression model using statsmodels.
This wraps ``statsmodels.tsa.regime_switching.MarkovRegression``,
which estimates a linear regression where the intercept (and
optionally variance) can switch between regimes according to a
hidden Markov chain.
**When to use:** When you want to model how the data-generating
process changes across regimes, including regime-dependent means
and volatilities. More interpretable than a raw HMM for economic
modeling because you get regression coefficients per regime.
The model:
.. math::
y_t = \\mu_{s_t} + x_t' \\beta_{s_t} + \\sigma_{s_t} \\epsilon_t
where :math:`s_t \\in \\{0, 1, \\ldots, K-1\\}` follows a first-order
Markov chain with transition matrix :math:`P`.
Parameters:
endog: Endogenous (dependent) variable. Typically a return series.
k_regimes: Number of regimes (hidden states).
trend: Trend specification for the mean equation.
``'c'`` (constant), ``'t'`` (linear trend), ``'ct'`` (both),
``'n'`` (no trend).
switching_variance: If True, the variance switches across regimes.
Almost always desirable for financial data.
exog: Optional exogenous regressors, shape ``(T, p)``. The
coefficients on these regressors are **not** switching by
default (the intercept and variance switch).
n_iter: Maximum number of EM iterations.
Returns:
Dictionary with:
- ``smoothed_probs`` (np.ndarray): Smoothed regime probabilities,
shape ``(T, K)``. ``smoothed_probs[:, k]`` is the probability
of being in regime *k* at each time step given all data.
- ``filtered_probs`` (np.ndarray): Filtered regime probabilities
(using only past data), shape ``(T, K)``.
- ``states`` (np.ndarray): Most likely state sequence (argmax
of smoothed probabilities), shape ``(T,)``.
- ``transition_matrix`` (np.ndarray): Transition matrix,
shape ``(K, K)``.
- ``regime_params`` (dict): Regime-specific parameters. Keys
are ``'mean_0'``, ``'mean_1'``, ..., ``'sigma2_0'``, etc.
- ``log_likelihood`` (float): Log-likelihood of the fitted model.
- ``aic`` (float): Akaike Information Criterion.
- ``bic`` (float): Bayesian Information Criterion.
- ``summary`` (str): Text summary from statsmodels.
- ``model_result`` (MarkovRegressionResults): The full
statsmodels result object.
Example:
>>> import numpy as np, pandas as pd
>>> rng = np.random.default_rng(0)
>>> returns = pd.Series(np.concatenate([
... rng.normal(0.001, 0.01, 200),
... rng.normal(-0.002, 0.03, 200),
... ]))
>>> result = fit_ms_regression(returns, k_regimes=2)
>>> print(result['transition_matrix'])
>>> print(result['regime_params'])
Notes:
Requires ``statsmodels >= 0.13``. The model uses Hamilton's
(1989) filter for inference. Convergence can be sensitive to
starting values.
See Also:
fit_gaussian_hmm: HMM using hmmlearn (more flexible emission
distributions).
regime_statistics: Compute financial statistics per regime.
"""
from statsmodels.tsa.regime_switching.markov_regression import (
MarkovRegression,
)
y = coerce_array(endog, "endog")
model = MarkovRegression(
y,
k_regimes=k_regimes,
trend=trend,
switching_variance=switching_variance,
exog=exog,
)
result = model.fit(maxiter=n_iter, disp=False)
# Extract regime-specific parameters
regime_params: dict[str, float] = {}
for k in range(k_regimes):
regime_params[f"mean_{k}"] = float(result.params[k])
if switching_variance:
for k in range(k_regimes):
regime_params[f"sigma2_{k}"] = float(result.params[k_regimes + k])
# Transition matrix from result
transmat = np.array(result.regime_transition).T
# statsmodels returns (K, T) shaped arrays; transpose to (T, K)
raw_smoothed = np.array(result.smoothed_marginal_probabilities)
raw_filtered = np.array(result.filtered_marginal_probabilities)
# Ensure shape is (T, K) regardless of statsmodels version
if (
raw_smoothed.shape[0] == k_regimes
and raw_smoothed.shape[0] != raw_smoothed.shape[1]
):
smoothed_probs = raw_smoothed.T
else:
smoothed_probs = raw_smoothed
if (
raw_filtered.shape[0] == k_regimes
and raw_filtered.shape[0] != raw_filtered.shape[1]
):
filtered_probs = raw_filtered.T
else:
filtered_probs = raw_filtered
states = np.argmax(smoothed_probs, axis=1)
return {
"smoothed_probs": smoothed_probs,
"filtered_probs": filtered_probs,
"states": states,
"transition_matrix": transmat,
"regime_params": regime_params,
"log_likelihood": float(result.llf),
"aic": float(result.aic),
"bic": float(result.bic),
"summary": str(result.summary()),
"model_result": result,
}
# ---------------------------------------------------------------------------
# Regime statistics
# ---------------------------------------------------------------------------
[docs]
def regime_statistics(
returns: pd.Series | np.ndarray,
states: np.ndarray,
) -> pd.DataFrame:
"""Compute descriptive statistics for each detected regime.
After fitting an HMM or GMM, this function summarizes the financial
characteristics of each regime. Typical interpretation for a
2-state model on equity returns:
- **Regime 0** (low vol): "risk-on" or "bull market" -- positive
mean return, low volatility, high Sharpe ratio.
- **Regime 1** (high vol): "risk-off" or "crisis" -- negative
or zero mean, high volatility, negative Sharpe.
Parameters:
returns: Return series aligned with ``states``. Can be a pandas
Series or a numpy array.
states: Integer regime labels (0, 1, ..., K-1), same length
as ``returns``.
Returns:
pd.DataFrame with one row per regime and columns:
``mean``, ``std``, ``sharpe``, ``sortino_ratio``, ``min``, ``max``,
``skewness``, ``kurtosis``, ``max_drawdown``, ``VaR_95``,
``CVaR_95``, ``n_observations``, ``pct_time``, ``avg_duration``.
- ``sharpe``: Annualized Sharpe ratio assuming 252 trading days.
- ``sortino_ratio``: Annualized Sortino ratio (downside deviation
only, 252 trading days).
- ``max_drawdown``: Maximum peak-to-trough drawdown within the
regime's return segments.
- ``VaR_95``: 5th percentile of returns (historical Value at Risk
at the 95 % confidence level).
- ``CVaR_95``: Mean of returns below the VaR_95 threshold
(Conditional Value at Risk / Expected Shortfall).
- ``avg_duration``: Mean consecutive time steps spent in the regime.
Example:
>>> result = fit_gaussian_hmm(returns, n_states=2)
>>> stats = regime_statistics(returns, result['states'])
>>> print(stats)
mean std sharpe ... avg_duration
regime
0 0.000523 0.008123 1.0223 ... 15.3
1 -0.001842 0.024561 -1.1917 ... 7.8
See Also:
fit_gaussian_hmm: Fit HMM to obtain state assignments.
wraquant.regimes.base.regime_report: Comprehensive regime analysis.
wraquant.backtest.position.regime_conditional_sizing: Adjust positions by regime.
regime_transition_analysis: Analyze transition dynamics.
"""
r = coerce_array(returns, "returns")
s = np.asarray(states, dtype=int).flatten()
if len(r) != len(s):
raise ValueError(
f"returns and states must have the same length, "
f"got {len(r)} and {len(s)}."
)
total = len(r)
unique_states = sorted(np.unique(s))
records = []
for state in unique_states:
mask = s == state
regime_r = r[mask]
n_obs = int(mask.sum())
mean_ret = float(np.mean(regime_r))
std_ret = float(np.std(regime_r, ddof=1)) if n_obs > 1 else 0.0
# Use canonical Sharpe implementation
from wraquant.risk.metrics import sharpe_ratio as _sharpe_ratio
sharpe = (
_sharpe_ratio(pd.Series(regime_r))
if n_obs > 2
else 0.0
)
# Sortino ratio (downside deviation only)
downside = regime_r[regime_r < 0]
if len(downside) > 1:
downside_std = float(np.sqrt(np.mean(downside**2)))
sortino = (
float(mean_ret / downside_std * np.sqrt(252))
if downside_std > 1e-12
else 0.0
)
else:
sortino = 0.0
# Max drawdown within the regime's contiguous segments
max_dd = _max_drawdown_from_returns(regime_r)
# VaR and CVaR at 95 % confidence
var_95 = float(np.percentile(regime_r, 5))
cvar_mask = regime_r <= var_95
cvar_95 = float(np.mean(regime_r[cvar_mask])) if cvar_mask.any() else var_95
# Average consecutive duration in this regime
avg_dur = _avg_regime_duration(s, state)
records.append(
{
"regime": state,
"mean": mean_ret,
"std": std_ret,
"sharpe": sharpe,
"sortino_ratio": sortino,
"min": float(np.min(regime_r)),
"max": float(np.max(regime_r)),
"skewness": float(_skewness(regime_r)),
"kurtosis": float(_kurtosis(regime_r)),
"max_drawdown": max_dd,
"VaR_95": var_95,
"CVaR_95": cvar_95,
"n_observations": n_obs,
"pct_time": float(n_obs / total) if total > 0 else 0.0,
"avg_duration": avg_dur,
}
)
return pd.DataFrame(records).set_index("regime")
# ---------------------------------------------------------------------------
# Regime transition analysis
# ---------------------------------------------------------------------------
[docs]
def regime_transition_analysis(
states: np.ndarray,
transition_matrix: np.ndarray | None = None,
) -> dict[str, Any]:
"""Analyze regime transition dynamics.
Computes empirical and model-based statistics about how regimes
switch. Useful for understanding regime persistence and expected
timing of regime changes.
Parameters:
states: Integer regime labels, shape ``(T,)``.
transition_matrix: Model-estimated transition matrix, shape
``(K, K)``. If None, the empirical transition matrix is
computed from ``states``.
Returns:
Dictionary with:
- ``empirical_transition_matrix`` (np.ndarray): Transition
counts normalised to probabilities, shape ``(K, K)``.
- ``transition_matrix`` (np.ndarray): Model or empirical
transition matrix used for analysis.
- ``avg_duration`` (np.ndarray): Expected duration in each
regime (time steps), ``1 / (1 - P[i,i])``.
- ``steady_state`` (np.ndarray): Ergodic distribution from
the transition matrix.
- ``regime_counts`` (dict): Number of distinct "visits" to
each regime.
- ``regime_durations`` (dict): List of consecutive durations
for each regime.
- ``expected_return_time`` (np.ndarray): Expected number of
steps to return to each regime from stationarity.
Example:
>>> result = fit_gaussian_hmm(returns, n_states=2)
>>> analysis = regime_transition_analysis(
... result['states'], result['transition_matrix']
... )
>>> print(f"Bull market lasts ~{analysis['avg_duration'][0]:.0f} days")
>>> print(f"Steady state: {analysis['steady_state']}")
See Also:
fit_gaussian_hmm: Fit HMM to obtain states and transition matrix.
regime_statistics: Financial statistics per regime.
"""
s = np.asarray(states, dtype=int).flatten()
unique_states = sorted(np.unique(s))
K = len(unique_states)
# Empirical transition matrix
emp_transmat = np.zeros((K, K))
state_to_idx = {state: i for i, state in enumerate(unique_states)}
for t in range(len(s) - 1):
i = state_to_idx[s[t]]
j = state_to_idx[s[t + 1]]
emp_transmat[i, j] += 1
# Normalise rows
row_sums = emp_transmat.sum(axis=1, keepdims=True)
row_sums = np.maximum(row_sums, 1e-10)
emp_transmat = emp_transmat / row_sums
# Use model transition matrix if provided, else empirical
transmat = transition_matrix if transition_matrix is not None else emp_transmat
# Expected duration: 1 / (1 - p_ii)
self_transition = np.diag(transmat)
avg_duration = 1.0 / np.maximum(1.0 - self_transition, 1e-10)
# Steady state
steady_state = _compute_steady_state(transmat)
# Regime visits and durations
regime_counts: dict[int, int] = {st: 0 for st in unique_states}
regime_durations: dict[int, list[int]] = {st: [] for st in unique_states}
if len(s) > 0:
current_state = s[0]
current_duration = 1
regime_counts[current_state] = 1
for t in range(1, len(s)):
if s[t] == current_state:
current_duration += 1
else:
regime_durations[current_state].append(current_duration)
current_state = s[t]
current_duration = 1
regime_counts[current_state] += 1
regime_durations[current_state].append(current_duration)
# Expected return time: 1 / pi_i
expected_return = np.zeros(K)
for i in range(K):
expected_return[i] = (
1.0 / steady_state[i] if steady_state[i] > 1e-10 else np.inf
)
return {
"empirical_transition_matrix": emp_transmat,
"transition_matrix": transmat,
"avg_duration": avg_duration,
"steady_state": steady_state,
"regime_counts": regime_counts,
"regime_durations": regime_durations,
"expected_return_time": expected_return,
}
# ---------------------------------------------------------------------------
# Gaussian Mixture Regimes
# ---------------------------------------------------------------------------
[docs]
def gaussian_mixture_regimes(
returns: pd.Series | np.ndarray,
n_components: int = 2,
covariance_type: str = "full",
n_init: int = 10,
random_state: int = 42,
) -> dict[str, Any]:
"""Classify returns into regimes using a Gaussian Mixture Model.
Unlike HMMs, GMMs do not model temporal dependence between states.
Each observation is independently assigned to the most likely
Gaussian component. This makes GMMs:
- **Faster** to fit than HMMs.
- **Simpler** to interpret (no transition dynamics).
- **Less powerful** for capturing regime persistence.
Good for quick regime labeling or as a baseline before fitting an HMM.
The model:
.. math::
p(r_t) = \\sum_{k=1}^{K} \\pi_k \\, \\mathcal{N}(r_t; \\mu_k, \\Sigma_k)
Parameters:
returns: Return series. NaN values are dropped.
n_components: Number of mixture components (regimes).
covariance_type: Covariance type (``'full'``, ``'tied'``,
``'diag'``, ``'spherical'``).
n_init: Number of random restarts.
random_state: Random seed.
Returns:
Dictionary with:
- ``states`` (np.ndarray): Component assignments, shape ``(T,)``.
- ``state_probs`` (np.ndarray): Posterior probabilities, shape
``(T, K)``.
- ``means`` (np.ndarray): Component means, shape ``(K,)``.
- ``covariances`` (np.ndarray): Component variances, shape
``(K,)``.
- ``weights`` (np.ndarray): Mixing proportions, shape ``(K,)``.
- ``aic`` (float): AIC.
- ``bic`` (float): BIC.
- ``model`` (GaussianMixture): Fitted sklearn model.
- ``index`` (pd.Index | None): Preserved index.
Example:
>>> result = gaussian_mixture_regimes(returns, n_components=2)
>>> print(result['means']) # Component means
>>> print(result['weights']) # Mixing proportions
>>> # Combine with regime_statistics for financial interpretation
>>> stats = regime_statistics(returns, result['states'])
See Also:
fit_gaussian_hmm: HMM with temporal regime dynamics.
regime_statistics: Per-regime financial statistics.
"""
from sklearn.mixture import GaussianMixture
index = returns.index if isinstance(returns, pd.Series) else None
arr = coerce_array(returns, "returns")
mask = ~np.isnan(arr)
arr = arr[mask]
if index is not None:
full_idx = returns.dropna().index
if len(full_idx) == len(arr):
index = full_idx
X = arr.reshape(-1, 1)
gmm = GaussianMixture(
n_components=n_components,
covariance_type=covariance_type,
n_init=n_init,
random_state=random_state,
)
gmm.fit(X)
states = gmm.predict(X)
state_probs = gmm.predict_proba(X)
# Re-order by ascending variance
variances = gmm.covariances_.flatten()[:n_components]
order = np.argsort(variances)
state_map = {int(old): new for new, old in enumerate(order)}
states = np.array([state_map[int(s)] for s in states])
state_probs = state_probs[:, order]
means = gmm.means_.flatten()[order]
covariances = variances[order]
weights = gmm.weights_[order]
return {
"states": states,
"state_probs": state_probs,
"means": means,
"covariances": covariances,
"weights": weights,
"aic": float(gmm.aic(X)),
"bic": float(gmm.bic(X)),
"model": gmm,
"index": index,
}
# ---------------------------------------------------------------------------
# Rolling Regime Probability
# ---------------------------------------------------------------------------
[docs]
@requires_extra("regimes")
def rolling_regime_probability(
returns: pd.Series,
n_states: int = 2,
window: int | None = None,
min_window: int = 60,
n_init: int = 5,
) -> pd.DataFrame:
"""Compute time-varying regime probabilities using rolling/expanding HMM.
Fits a Gaussian HMM at each time step using either a rolling or
expanding window, producing a time series of regime probabilities
that adapts to new data. Useful for real-time regime monitoring.
Parameters:
returns: Return series.
n_states: Number of hidden states.
window: Rolling window size. If None, uses an expanding window
(all data up to time *t*).
min_window: Minimum number of observations required before
fitting the first model.
n_init: Number of EM restarts per window.
Returns:
pd.DataFrame with columns ``prob_0``, ``prob_1``, ..., ``prob_{K-1}``
indexed by the return series index. Rows before ``min_window``
are NaN.
Example:
>>> probs = rolling_regime_probability(returns, n_states=2, window=252)
>>> probs.plot(title="Regime Probabilities Over Time")
Notes:
This is computationally expensive. For 1000 observations with
``window=252``, approximately 750 HMM fits are performed. Consider
using a large ``window`` or expanding mode for efficiency.
See Also:
fit_gaussian_hmm: Single-shot HMM fitting.
"""
clean = returns.dropna()
T = len(clean)
values = clean.values.reshape(-1, 1)
prob_cols = [f"prob_{k}" for k in range(n_states)]
result = pd.DataFrame(np.nan, index=clean.index, columns=prob_cols, dtype=float)
for t in range(min_window, T):
if window is not None:
start = max(0, t - window)
else:
start = 0
X_window = values[start : t + 1]
try:
hmm_result = fit_gaussian_hmm(
X_window.flatten(),
n_states=n_states,
n_init=n_init,
n_iter=100,
)
# Take the probability at the last time step
last_probs = hmm_result["state_probs"][-1]
result.iloc[t] = last_probs
except Exception:
# If EM fails for this window, leave as NaN
pass
return result
# ---------------------------------------------------------------------------
# Regime-Aware Portfolio
# ---------------------------------------------------------------------------
[docs]
def regime_aware_portfolio(
regime_probs: np.ndarray,
regime_weights: np.ndarray,
) -> np.ndarray:
"""Compute blended portfolio weights using regime probabilities.
Given the current probability of being in each regime and the
optimal portfolio weights for each regime, compute a
probability-weighted blend. This bridges regime detection
(``regimes/``) with portfolio optimisation (``opt/``).
The blended weight for asset *j* is:
.. math::
w_j = \\sum_{k=1}^{K} P(s_t = k) \\cdot w_j^{(k)}
Parameters:
regime_probs: Current regime probabilities, shape ``(K,)``.
Must sum to 1.
regime_weights: Optimal weights for each regime, shape
``(K, n_assets)``. Row *k* contains the weights for
regime *k*.
Returns:
np.ndarray: Blended portfolio weights, shape ``(n_assets,)``.
Sums to 1 if each row of ``regime_weights`` sums to 1.
Example:
>>> # 60% probability of bull, 40% probability of bear
>>> probs = np.array([0.6, 0.4])
>>> # Bull: 80% equity / 20% bonds; Bear: 30% equity / 70% bonds
>>> weights = np.array([[0.8, 0.2], [0.3, 0.7]])
>>> blended = regime_aware_portfolio(probs, weights)
>>> print(blended) # [0.6*0.8 + 0.4*0.3, 0.6*0.2 + 0.4*0.7]
[0.6 0.4]
See Also:
fit_gaussian_hmm: Obtain regime probabilities.
rolling_regime_probability: Time-varying regime probabilities.
"""
probs = np.asarray(regime_probs, dtype=np.float64)
weights = np.asarray(regime_weights, dtype=np.float64)
if probs.ndim != 1:
raise ValueError(f"regime_probs must be 1-D, got shape {probs.shape}")
if weights.ndim != 2:
raise ValueError(f"regime_weights must be 2-D, got shape {weights.shape}")
if probs.shape[0] != weights.shape[0]:
raise ValueError(
f"Number of regimes in probs ({probs.shape[0]}) and weights "
f"({weights.shape[0]}) must match."
)
return probs @ weights
# ---------------------------------------------------------------------------
# Markov-Switching Autoregression
# ---------------------------------------------------------------------------
[docs]
def fit_ms_autoregression(
endog: pd.Series | np.ndarray,
k_regimes: int = 2,
order: int = 1,
switching_ar: bool = True,
switching_variance: bool = True,
exog_tvtp: np.ndarray | pd.DataFrame | None = None,
) -> dict[str, Any]:
"""Fit a Markov-switching autoregression (MS-AR) model.
This wraps ``statsmodels.tsa.regime_switching.MarkovAutoregression``,
which extends the standard Markov-switching regression to include
autoregressive lags whose coefficients can also switch across regimes.
**Hamilton (1989) framework:**
The seminal Hamilton (1989) paper introduced the Markov-switching
autoregressive model to study business-cycle dynamics. The key insight
is that the data-generating process can shift between distinct
regimes (e.g., expansion vs. recession) governed by a latent Markov
chain, and within each regime the series follows an AR(p) process
with regime-specific parameters:
.. math::
y_t = \\mu_{s_t} + \\phi_{1,s_t}\\,(y_{t-1} - \\mu_{s_{t-1}})
+ \\cdots + \\phi_{p,s_t}\\,(y_{t-p} - \\mu_{s_{t-p}})
+ \\sigma_{s_t}\\,\\epsilon_t
**When to use MS-AR vs. plain HMM:**
- Use **MS-AR** when the series exhibits serial correlation *within*
each regime (e.g., GDP growth, interest rates, momentum returns).
The autoregressive terms capture local dynamics that a plain HMM
ignores.
- Use a **plain Gaussian HMM** (``fit_gaussian_hmm``) when returns
are approximately i.i.d. within each regime and only the mean and
variance shift. This is typical for daily equity returns.
- Use **MS-Regression** (``fit_ms_regression``) when you want
regime-dependent intercepts/exogenous coefficients but no AR lags.
Parameters:
endog: Endogenous (dependent) variable. Typically a return or
growth-rate series.
k_regimes: Number of regimes (hidden states). 2 is the most
common choice (e.g., expansion/recession).
order: Autoregressive lag order *p*. Higher orders capture
longer memory within each regime.
switching_ar: If True, the AR coefficients switch across
regimes. If False, only the intercept and/or variance
switch.
switching_variance: If True, the innovation variance switches
across regimes. Almost always desirable for financial data.
exog_tvtp: Optional exogenous variables for time-varying
transition probabilities (TVTP). Shape ``(T, q)``. When
provided, the transition probabilities are modelled as
logistic functions of these variables, allowing macro
indicators to influence regime-switching dynamics.
Returns:
Dictionary with:
- ``smoothed_probs`` (np.ndarray): Smoothed regime probabilities,
shape ``(T, K)``.
- ``filtered_probs`` (np.ndarray): Filtered regime probabilities,
shape ``(T, K)``.
- ``states`` (np.ndarray): Most likely state sequence (argmax of
smoothed probabilities), shape ``(T,)``.
- ``transition_matrix`` (np.ndarray): Transition matrix,
shape ``(K, K)``.
- ``expected_durations`` (np.ndarray): Expected duration in each
regime, ``1 / (1 - P[k,k])``, shape ``(K,)``.
- ``regime_params`` (dict): Per-regime parameters including
``'mean_k'``, ``'sigma2_k'``, and ``'ar_k_j'`` (AR
coefficient *j* for regime *k*).
- ``aic`` (float): Akaike Information Criterion.
- ``bic`` (float): Bayesian Information Criterion.
- ``summary`` (str): Text summary from statsmodels.
- ``model_result``: The fitted statsmodels result object.
Example:
>>> import numpy as np, pandas as pd
>>> rng = np.random.default_rng(0)
>>> # Simulate AR(1) with switching mean and persistence
>>> returns = pd.Series(np.concatenate([
... rng.normal(0.002, 0.008, 250),
... rng.normal(-0.001, 0.020, 250),
... ]))
>>> result = fit_ms_autoregression(returns, k_regimes=2, order=1)
>>> print(result['transition_matrix'])
>>> print(result['regime_params'])
Notes:
Requires ``statsmodels >= 0.13``. Convergence can be slow for
higher-order AR models or many regimes. The Hamilton filter is
used for inference.
References:
Hamilton, J. D. (1989). "A New Approach to the Economic Analysis
of Nonstationary Time Series and the Business Cycle."
*Econometrica*, 57(2), 357-384.
See Also:
fit_ms_regression: Markov-switching regression (no AR terms).
fit_gaussian_hmm: HMM without autoregressive dynamics.
select_n_states: BIC-based state selection.
"""
from statsmodels.tsa.regime_switching.markov_autoregression import (
MarkovAutoregression,
)
y = coerce_array(endog, "endog")
model = MarkovAutoregression(
y,
k_regimes=k_regimes,
order=order,
switching_ar=switching_ar,
switching_variance=switching_variance,
exog_tvtp=exog_tvtp,
)
result = model.fit(disp=False)
# Extract regime-specific parameters
regime_params: dict[str, float] = {}
for k in range(k_regimes):
regime_params[f"mean_{k}"] = float(result.params[k])
if switching_variance:
for k in range(k_regimes):
regime_params[f"sigma2_{k}"] = float(result.params[k_regimes + k])
# AR coefficients
if switching_ar:
for k in range(k_regimes):
for j in range(order):
key = f"ar_{k}_{j + 1}"
# statsmodels stores AR params after mean + variance params
base_idx = k_regimes + (k_regimes if switching_variance else 1)
ar_idx = base_idx + k * order + j
if ar_idx < len(result.params):
regime_params[key] = float(result.params[ar_idx])
else:
for j in range(order):
key = f"ar_{j + 1}"
base_idx = k_regimes + (k_regimes if switching_variance else 1)
ar_idx = base_idx + j
if ar_idx < len(result.params):
regime_params[key] = float(result.params[ar_idx])
# Transition matrix -- statsmodels may return (K, K) or (K, K, T)
raw_transmat = np.array(result.regime_transition)
if raw_transmat.ndim == 3:
# Time-varying transition probs: average across time and transpose
transmat = raw_transmat.mean(axis=-1).T
elif raw_transmat.ndim == 2:
transmat = raw_transmat.T
else:
transmat = raw_transmat
# Ensure 2-D (K, K)
transmat = np.atleast_2d(transmat)
# Probabilities -- statsmodels returns (K, T); transpose to (T, K)
raw_smoothed = np.array(result.smoothed_marginal_probabilities)
raw_filtered = np.array(result.filtered_marginal_probabilities)
if (
raw_smoothed.shape[0] == k_regimes
and raw_smoothed.shape[0] != raw_smoothed.shape[1]
):
smoothed_probs = raw_smoothed.T
else:
smoothed_probs = raw_smoothed
if (
raw_filtered.shape[0] == k_regimes
and raw_filtered.shape[0] != raw_filtered.shape[1]
):
filtered_probs = raw_filtered.T
else:
filtered_probs = raw_filtered
states = np.argmax(smoothed_probs, axis=1)
# Expected durations
diag = np.diag(transmat).copy()
expected_durations = 1.0 / np.maximum(1.0 - diag, 1e-10)
return {
"smoothed_probs": smoothed_probs,
"filtered_probs": filtered_probs,
"states": states,
"transition_matrix": transmat,
"expected_durations": expected_durations,
"regime_params": regime_params,
"aic": float(result.aic),
"bic": float(result.bic),
"summary": str(result.summary()),
"model_result": result,
}
# ---------------------------------------------------------------------------
# BIC-Based State Selection
# ---------------------------------------------------------------------------
[docs]
@requires_extra("regimes")
def select_n_states(
returns: pd.Series | np.ndarray,
max_states: int = 5,
n_init: int = 10,
n_iter: int = 200,
random_state: int = 42,
) -> dict[str, Any]:
"""Select the optimal number of HMM states using the BIC criterion.
Fits Gaussian HMMs with ``n_states`` from 2 to ``max_states`` and
selects the model that minimises the Bayesian Information Criterion
(BIC).
**Why BIC over AIC for HMM state selection:**
The BIC imposes a stronger penalty on model complexity than the AIC,
which is critical for HMM selection because:
1. **HMM parameter count grows quadratically** with the number of
states (transition matrix has K*(K-1) free parameters), so without
a strong penalty the AIC tends to overfit by selecting too many
states.
2. **BIC is consistent**: as sample size grows, BIC selects the true
model (if it is in the candidate set). AIC tends to overfit even
asymptotically.
3. In practice, financial regime models with 2-3 states are usually
adequate; BIC correctly penalises 4+ state models that capture
noise rather than true regimes.
The BIC is defined as:
.. math::
\\text{BIC} = k \\ln(T) - 2 \\ln(\\hat{L})
where *k* is the number of free parameters, *T* is the sample size,
and :math:`\\hat{L}` is the maximised likelihood.
Parameters:
returns: Return series. NaN values are dropped.
max_states: Maximum number of states to consider (inclusive).
The search covers ``[2, 3, ..., max_states]``.
n_init: Number of random restarts per model fit.
n_iter: Maximum EM iterations per restart.
random_state: Base random seed.
Returns:
Dictionary with:
- ``optimal_n_states`` (int): The number of states with the
lowest BIC.
- ``scores`` (dict[int, float]): Mapping from *n_states* to
its BIC value.
- ``best_model`` (dict): The full result dict from
``fit_gaussian_hmm`` for the optimal model.
Example:
>>> result = select_n_states(returns, max_states=5)
>>> print(f"Optimal states: {result['optimal_n_states']}")
>>> print(result['scores'])
{2: -2345.6, 3: -2310.1, 4: -2290.3, 5: -2275.0}
See Also:
fit_gaussian_hmm: Fit a single HMM with a fixed number of states.
"""
scores: dict[int, float] = {}
best_bic = np.inf
best_n = 2
best_result: dict[str, Any] | None = None
for n in range(2, max_states + 1):
try:
result = fit_gaussian_hmm(
returns,
n_states=n,
n_init=n_init,
n_iter=n_iter,
random_state=random_state,
)
bic_val = result["bic"]
scores[n] = bic_val
if bic_val < best_bic:
best_bic = bic_val
best_n = n
best_result = result
except Exception:
# If fitting fails for this n, skip
scores[n] = np.inf
if best_result is None:
raise RuntimeError(
"All HMM fits failed. Check that the data has sufficient "
"variation for regime detection."
)
return {
"optimal_n_states": best_n,
"scores": scores,
"best_model": best_result,
}
# ---------------------------------------------------------------------------
# Multivariate HMM
# ---------------------------------------------------------------------------
[docs]
@requires_extra("regimes")
def fit_multivariate_hmm(
returns: pd.DataFrame,
n_states: int = 2,
covariance_type: str = "full",
n_init: int = 10,
n_iter: int = 200,
random_state: int = 42,
) -> dict[str, Any]:
"""Fit a multivariate Gaussian HMM for multi-asset regime detection.
Extends the univariate HMM to a vector of asset returns, enabling
detection of regimes that manifest as shifts in *cross-asset
correlations*, not just individual-asset volatility.
**Why multivariate regime detection matters:**
Single-asset HMMs detect volatility regimes but miss a crucial
phenomenon: **correlation regime shifts**. During crises, asset
correlations spike toward 1, destroying diversification precisely
when it is needed most. A multivariate HMM captures this by
estimating a full covariance matrix per regime.
Typical findings on equity/bond data:
- **Low-vol regime**: Low equity vol, low bond vol, moderate
negative equity-bond correlation (diversification works).
- **High-vol regime**: High equity vol, elevated bond vol,
correlations shift (equities become more correlated with each
other, equity-bond correlation may flip sign).
The model estimates per-regime:
- Mean return vector :math:`\\mu_k \\in \\mathbb{R}^d`
- Full covariance matrix :math:`\\Sigma_k \\in \\mathbb{R}^{d \\times d}`
- From which per-regime correlation matrices are derived.
Parameters:
returns: DataFrame of multi-asset returns, shape ``(T, d)``
where *d* is the number of assets. Column names are
preserved for labeling. NaN rows are dropped.
n_states: Number of hidden states (regimes).
covariance_type: Covariance parameterisation. ``'full'`` is
strongly recommended for capturing correlation shifts.
n_init: Number of random EM restarts.
n_iter: Maximum EM iterations per restart.
random_state: Random seed.
Returns:
Dictionary with:
- ``states`` (np.ndarray): Most likely state sequence (Viterbi),
shape ``(T,)``.
- ``state_probs`` (np.ndarray): Smoothed state probabilities,
shape ``(T, K)``.
- ``means`` (np.ndarray): Per-regime mean vectors,
shape ``(K, d)``.
- ``covariances`` (np.ndarray): Per-regime covariance matrices,
shape ``(K, d, d)``.
- ``per_regime_correlations`` (dict[int, np.ndarray]): Per-regime
correlation matrices, keyed by regime index.
- ``transition_matrix`` (np.ndarray): Transition matrix ``(K, K)``.
- ``log_likelihood`` (float): Total log-likelihood.
- ``aic`` (float): AIC.
- ``bic`` (float): BIC.
- ``model``: The fitted hmmlearn model.
- ``index`` (pd.Index | None): Preserved DatetimeIndex.
- ``columns`` (list[str]): Asset names.
Example:
>>> import pandas as pd, numpy as np
>>> rng = np.random.default_rng(42)
>>> # Low-vol regime: low correlation
>>> cov_low = np.array([[0.0001, 0.00002], [0.00002, 0.00015]])
>>> low = rng.multivariate_normal([0.001, 0.0008], cov_low, 300)
>>> # High-vol regime: high correlation
>>> cov_high = np.array([[0.0009, 0.0006], [0.0006, 0.0008]])
>>> high = rng.multivariate_normal([-0.001, -0.0005], cov_high, 200)
>>> df = pd.DataFrame(np.vstack([low, high]), columns=['SPY', 'QQQ'])
>>> result = fit_multivariate_hmm(df, n_states=2)
>>> print(result['per_regime_correlations'][0])
>>> print(result['per_regime_correlations'][1])
See Also:
fit_gaussian_hmm: Univariate HMM.
regime_conditional_moments: Per-regime moments from states.
"""
from hmmlearn.hmm import GaussianHMM
# Prepare data
if not isinstance(returns, pd.DataFrame):
raise TypeError("returns must be a pd.DataFrame for multivariate HMM")
clean = returns.dropna()
index = clean.index
columns = list(clean.columns)
X = clean.values.astype(np.float64)
T, d = X.shape
# Fit with multiple restarts
best_model = None
best_score = -np.inf
for i in range(n_init):
model = GaussianHMM(
n_components=n_states,
covariance_type=covariance_type,
n_iter=n_iter,
random_state=random_state + i,
)
try:
model.fit(X)
score = model.score(X)
if score > best_score:
best_score = score
best_model = model
except Exception:
continue
if best_model is None:
raise RuntimeError(f"All {n_init} EM restarts failed for multivariate HMM.")
model = best_model
states = model.predict(X)
state_probs = model.predict_proba(X)
# Extract means and covariances
means = model.means_ # (K, d)
if covariance_type == "full":
covariances = model.covars_ # (K, d, d)
elif covariance_type == "diag":
covariances = np.array([np.diag(model.covars_[k]) for k in range(n_states)])
elif covariance_type == "spherical":
covariances = np.array([np.eye(d) * model.covars_[k] for k in range(n_states)])
elif covariance_type == "tied":
covariances = np.array([model.covars_ for _ in range(n_states)])
else:
covariances = model.covars_
# Re-order states by ascending total variance (trace of cov)
total_var = np.array([np.trace(covariances[k]) for k in range(n_states)])
order = np.argsort(total_var)
state_map = {int(old): new for new, old in enumerate(order)}
states = np.array([state_map[int(s)] for s in states])
state_probs = state_probs[:, order]
means = means[order]
covariances = covariances[order]
transmat = model.transmat_[np.ix_(order, order)]
# Per-regime correlation matrices
per_regime_correlations: dict[int, np.ndarray] = {}
for k in range(n_states):
cov_k = covariances[k]
std_k = np.sqrt(np.diag(cov_k))
outer = np.outer(std_k, std_k)
outer = np.maximum(outer, 1e-12)
per_regime_correlations[k] = cov_k / outer
# Information criteria
if covariance_type == "full":
n_cov = n_states * d * (d + 1) // 2
elif covariance_type == "diag":
n_cov = n_states * d
elif covariance_type == "spherical":
n_cov = n_states
elif covariance_type == "tied":
n_cov = d * (d + 1) // 2
else:
n_cov = n_states * d * (d + 1) // 2
n_params = (
(n_states - 1) # initial state
+ n_states * (n_states - 1) # transition matrix
+ n_states * d # means
+ n_cov # covariances
)
log_likelihood = float(best_score)
aic = float(2 * n_params - 2 * log_likelihood)
bic = float(n_params * np.log(T) - 2 * log_likelihood)
return {
"states": states,
"state_probs": state_probs,
"means": means,
"covariances": covariances,
"per_regime_correlations": per_regime_correlations,
"transition_matrix": transmat,
"log_likelihood": log_likelihood,
"aic": aic,
"bic": bic,
"model": model,
"index": index,
"columns": columns,
}
# ---------------------------------------------------------------------------
# Regime-Conditional Moments
# ---------------------------------------------------------------------------
[docs]
def regime_conditional_moments(
returns: pd.DataFrame,
states: np.ndarray,
) -> dict[int, dict[str, np.ndarray]]:
"""Compute per-regime mean vector and covariance/correlation matrices.
After fitting a multivariate HMM (or assigning regimes by any
method), this function extracts the sample moments for each regime.
These moments are the building blocks for **regime-aware portfolio
construction**: the mean vector feeds into the expected-return
input and the covariance matrix replaces the unconditional covariance
in a mean-variance optimiser.
Parameters:
returns: DataFrame of multi-asset returns, shape ``(T, d)``.
states: Integer regime labels, shape ``(T,)``, aligned with
``returns``.
Returns:
Dictionary keyed by regime index (int), where each value is
a dict with:
- ``mean`` (np.ndarray): Sample mean vector, shape ``(d,)``.
- ``cov`` (np.ndarray): Sample covariance matrix,
shape ``(d, d)``.
- ``corr`` (np.ndarray): Sample correlation matrix,
shape ``(d, d)``.
Example:
>>> result = fit_multivariate_hmm(returns_df, n_states=2)
>>> moments = regime_conditional_moments(
... returns_df, result['states']
... )
>>> # Use regime-0 covariance for portfolio optimisation
>>> cov_bull = moments[0]['cov']
>>> mu_bull = moments[0]['mean']
See Also:
fit_multivariate_hmm: Multi-asset regime detection.
regime_aware_portfolio: Blend weights across regimes.
"""
if not isinstance(returns, pd.DataFrame):
raise TypeError("returns must be a pd.DataFrame")
R = returns.values.astype(np.float64)
s = np.asarray(states, dtype=int).flatten()
if len(R) != len(s):
raise ValueError(
f"returns and states must have the same length, "
f"got {len(R)} and {len(s)}."
)
unique_states = sorted(np.unique(s))
result: dict[int, dict[str, np.ndarray]] = {}
for state in unique_states:
mask = s == state
regime_R = R[mask]
mean_vec = np.mean(regime_R, axis=0)
cov_mat = np.cov(regime_R, rowvar=False, ddof=1)
# Ensure 2-D even for single-asset edge case
if cov_mat.ndim == 0:
cov_mat = np.array([[float(cov_mat)]])
# Correlation from covariance
std_vec = np.sqrt(np.diag(cov_mat))
outer = np.outer(std_vec, std_vec)
outer = np.maximum(outer, 1e-12)
corr_mat = cov_mat / outer
result[state] = {
"mean": mean_vec,
"cov": cov_mat,
"corr": corr_mat,
}
return result
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _max_drawdown_from_returns(returns: np.ndarray) -> float:
"""Compute maximum drawdown from a return array."""
if len(returns) < 2:
return 0.0
cumulative = np.cumprod(1.0 + returns)
running_max = np.maximum.accumulate(cumulative)
drawdowns = (cumulative - running_max) / np.maximum(running_max, 1e-12)
return float(np.min(drawdowns))
def _count_hmm_params(n_states: int, covariance_type: str) -> int:
"""Count free parameters in a univariate Gaussian HMM."""
# Initial state: K - 1
# Transition matrix: K * (K - 1)
# Means: K
# Covariances depend on type (univariate case)
n_init_params = n_states - 1
n_trans_params = n_states * (n_states - 1)
n_mean_params = n_states
if covariance_type in ("full", "diag", "diagonal", "spherical"):
n_cov_params = n_states
elif covariance_type == "tied":
n_cov_params = 1
else:
n_cov_params = n_states
return n_init_params + n_trans_params + n_mean_params + n_cov_params
def _extract_variances(
covars: np.ndarray,
n_states: int,
covariance_type: str,
) -> np.ndarray:
"""Extract per-state variance from hmmlearn covars_ for univariate data.
hmmlearn stores covariances differently depending on covariance_type:
- full: (n_states, n_features, n_features)
- diag: (n_states, n_features)
- spherical: (n_states,)
- tied: (n_features, n_features) -- same for all states
Returns a flat 1-D float array of shape (n_states,).
"""
if covariance_type == "spherical":
result = np.asarray(covars, dtype=np.float64).flatten()[:n_states]
elif covariance_type in ("diag", "diagonal"):
# Shape (n_states, n_features) -- take first feature
result = np.array([float(covars[k].flatten()[0]) for k in range(n_states)])
elif covariance_type == "tied":
# Same covariance for all states
var = float(np.asarray(covars).flatten()[0])
result = np.full(n_states, var)
else:
# full: (n_states, n_features, n_features) -- extract [0,0]
result = np.array(
[float(np.asarray(covars[k]).flatten()[0]) for k in range(n_states)]
)
return result
def _compute_steady_state(transmat: np.ndarray) -> np.ndarray:
"""Compute the ergodic (stationary) distribution of a transition matrix.
Solves pi @ P = pi, sum(pi) = 1 using the eigenvalue method.
"""
K = transmat.shape[0]
# Find left eigenvector with eigenvalue 1
eigenvalues, eigenvectors = np.linalg.eig(transmat.T)
# Find the index of eigenvalue closest to 1
idx = np.argmin(np.abs(eigenvalues - 1.0))
steady = np.real(eigenvectors[:, idx])
steady = np.abs(steady) # Ensure non-negative
total = steady.sum()
if total > 0:
steady /= total
else:
steady = np.ones(K) / K
return steady
def _avg_regime_duration(states: np.ndarray, target_state: int) -> float:
"""Compute average consecutive duration in a particular regime."""
durations: list[int] = []
current_duration = 0
for s in states:
if s == target_state:
current_duration += 1
else:
if current_duration > 0:
durations.append(current_duration)
current_duration = 0
if current_duration > 0:
durations.append(current_duration)
return float(np.mean(durations)) if durations else 0.0
def _skewness(x: np.ndarray) -> float:
"""Compute sample skewness."""
n = len(x)
if n < 3:
return 0.0
m = np.mean(x)
s = np.std(x, ddof=1)
if s < 1e-12:
return 0.0
return float(np.mean(((x - m) / s) ** 3) * n / ((n - 1) * (n - 2) / n))
def _kurtosis(x: np.ndarray) -> float:
"""Compute sample excess kurtosis."""
n = len(x)
if n < 4:
return 0.0
m = np.mean(x)
s = np.std(x, ddof=1)
if s < 1e-12:
return 0.0
return float(np.mean(((x - m) / s) ** 4) - 3.0)