Source code for wraquant.regimes.hmm

"""Hidden Markov Model and mixture-model regime detection.

This module provides production-quality regime detection for financial
time series using Gaussian Hidden Markov Models (HMMs), Markov-switching
models, and Gaussian mixture models (GMMs).

**When to use each model:**

- **Gaussian HMM** (``fit_gaussian_hmm``): The workhorse. Models returns as
  emissions from a latent Markov chain. Captures temporal persistence of
  regimes (markets stay in bull/bear for extended periods).
- **Markov-Switching Regression** (``fit_ms_regression``): When you want
  regime-dependent regression coefficients or intercepts. Useful for
  modeling time-varying factor exposures.
- **Gaussian Mixture** (``gaussian_mixture_regimes``): Quick classification
  without temporal structure. Good for labeling return distributions when
  you do not care about transition dynamics.

References:
    Hamilton, J. D. (1989). "A New Approach to the Economic Analysis of
    Nonstationary Time Series and the Business Cycle." *Econometrica*, 57(2).

    Ang, A. & Bekaert, G. (2002). "Regime Switches in Interest Rates."
    *Journal of Business & Economic Statistics*, 20(2).
"""

from __future__ import annotations

from typing import Any

import numpy as np
import pandas as pd

from wraquant.core._coerce import coerce_array
from wraquant.core.decorators import requires_extra

# ---------------------------------------------------------------------------
# Gaussian HMM
# ---------------------------------------------------------------------------


[docs] @requires_extra("regimes") def fit_gaussian_hmm( returns: pd.Series | np.ndarray, n_states: int = 2, covariance_type: str = "full", n_init: int = 10, n_iter: int = 200, tol: float = 1e-4, random_state: int = 42, ) -> dict[str, Any]: """Fit a Gaussian Hidden Markov Model to financial return data. A Gaussian HMM assumes that observed returns are generated by a latent (hidden) Markov chain with *K* states. Each state *k* has its own Gaussian emission distribution N(mu_k, sigma_k). The model learns: 1. **Transition matrix** A[i,j] = P(s_t = j | s_{t-1} = i) 2. **Emission means** mu_k for each state 3. **Emission covariances** Sigma_k for each state 4. **Initial state distribution** pi_k = P(s_0 = k) The EM algorithm (Baum-Welch) is used for parameter estimation. Because EM can converge to local optima, multiple random restarts (``n_init``) are performed and the model with the highest log-likelihood is retained. **Typical interpretation for 2-state model on equity returns:** - **Low-volatility state** ("risk-on" / bull): Positive mean return, low standard deviation, high Sharpe ratio. Markets are calm. - **High-volatility state** ("risk-off" / crisis): Near-zero or negative mean return, high standard deviation. Markets are stressed. States are automatically re-ordered so that state 0 has the lowest volatility and state K-1 has the highest volatility. This ensures consistent labeling across runs. Parameters: returns: Return series (simple or log returns). If a pandas Series, NaN values are dropped and the index is preserved. n_states: Number of hidden states. Common choices are 2 (bull/bear) or 3 (bull/neutral/bear). Values above 5 risk overfitting. covariance_type: Type of covariance matrix. One of ``'full'``, ``'diag'``, ``'spherical'``, ``'tied'``. n_init: Number of random restarts for EM. The model with the highest log-likelihood is selected. More restarts reduce the chance of a poor local optimum. n_iter: Maximum number of EM iterations per restart. tol: Convergence threshold for EM. random_state: Base random seed for reproducibility. Returns: Dictionary with the following keys: - ``states`` (np.ndarray): Most likely state sequence (Viterbi), shape ``(T,)``, integers in ``[0, K-1]``. - ``state_probs`` (np.ndarray): Smoothed state probabilities, shape ``(T, K)``. Each row sums to 1. - ``transition_matrix`` (np.ndarray): Transition probability matrix, shape ``(K, K)``. Row *i* sums to 1. - ``means`` (np.ndarray): Emission means, shape ``(K,)``. - ``covariances`` (np.ndarray): Emission covariances, shape ``(K,)`` (variance per state for univariate data). - ``startprob`` (np.ndarray): Initial state distribution, shape ``(K,)``. - ``log_likelihood`` (float): Total log-likelihood of the data. - ``aic`` (float): Akaike Information Criterion. - ``bic`` (float): Bayesian Information Criterion. - ``n_states`` (int): Number of states. - ``steady_state`` (np.ndarray): Ergodic (stationary) distribution of the Markov chain, shape ``(K,)``. - ``avg_duration`` (np.ndarray): Expected duration in each state, ``1 / (1 - A[i,i])``, shape ``(K,)``. - ``model`` (GaussianHMM): The fitted hmmlearn model object. - ``index`` (pd.Index | None): Original index if input was a pandas Series. Example: >>> import numpy as np >>> import pandas as pd >>> rng = np.random.default_rng(42) >>> # Simulate 2-regime returns >>> low_vol = rng.normal(0.001, 0.008, 300) >>> high_vol = rng.normal(-0.002, 0.025, 200) >>> returns = pd.Series(np.concatenate([low_vol, high_vol])) >>> result = fit_gaussian_hmm(returns, n_states=2) >>> print(result['means']) # Per-state means >>> print(result['transition_matrix']) # Transition probabilities >>> print(result['steady_state']) # Long-run regime probabilities Notes: The model uses hmmlearn's ``GaussianHMM``. For very long series (>10,000 observations), consider downsampling or using the variational Bayes variant. See Also: fit_ms_regression: Markov-switching regression with statsmodels. gaussian_mixture_regimes: GMM-based regime classification. regime_statistics: Compute per-regime financial statistics. regime_transition_analysis: Analyze regime transition dynamics. wraquant.regimes.base.detect_regimes: Unified high-level interface. wraquant.backtest.position.regime_signal_filter: Gate signals by regime. wraquant.viz.dashboard.regime_dashboard: Visualize regime analysis. """ from hmmlearn.hmm import GaussianHMM # Prepare data -- preserve index if input is a Series index = returns.index if isinstance(returns, pd.Series) else None arr = coerce_array(returns, "returns") mask = ~np.isnan(arr) arr = arr[mask] if index is not None: # Align index with non-NaN entries full_idx = returns.dropna().index if len(full_idx) == len(arr): index = full_idx X = arr.reshape(-1, 1) T = len(X) # Fit with multiple restarts and keep the best best_model = None best_score = -np.inf for i in range(n_init): model = GaussianHMM( n_components=n_states, covariance_type=covariance_type, n_iter=n_iter, tol=tol, random_state=random_state + i, ) try: model.fit(X) score = model.score(X) if score > best_score: best_score = score best_model = model except Exception: # EM can fail on degenerate initialisations; skip and try again continue if best_model is None: raise RuntimeError( f"All {n_init} EM restarts failed. Try different n_states or " "check that the data has sufficient variation." ) model = best_model # Decode: Viterbi for most likely states, posterior for probabilities states = model.predict(X) state_probs = model.predict_proba(X) # Re-order states by ascending volatility for consistent labeling variances = _extract_variances(model.covars_, n_states, covariance_type) order = np.argsort(variances) # Remap states state_map = {int(old): new for new, old in enumerate(order)} states = np.array([state_map[int(s)] for s in states]) state_probs = state_probs[:, order] means = model.means_.flatten()[order] covariances = variances[order] startprob = model.startprob_[order] # Reorder transition matrix transmat = model.transmat_[np.ix_(order, order)] # Information criteria # Number of free parameters for Gaussian HMM n_params = _count_hmm_params(n_states, covariance_type) log_likelihood = float(best_score) aic = float(2 * n_params - 2 * log_likelihood) bic = float(n_params * np.log(T) - 2 * log_likelihood) # Steady-state (ergodic) distribution from transition matrix steady_state = _compute_steady_state(transmat) # Expected duration in each state: 1 / (1 - p_ii) self_transition = np.diag(transmat) avg_duration = 1.0 / np.maximum(1.0 - self_transition, 1e-10) return { "states": states, "state_probs": state_probs, "transition_matrix": transmat, "means": means, "covariances": covariances, "startprob": startprob, "log_likelihood": log_likelihood, "aic": aic, "bic": bic, "n_states": n_states, "steady_state": steady_state, "avg_duration": avg_duration, "model": model, "index": index, }
# --------------------------------------------------------------------------- # Legacy wrapper (backward compatibility) # ---------------------------------------------------------------------------
[docs] @requires_extra("regimes") def fit_hmm(returns: pd.Series, n_states: int = 2) -> Any: """Fit a Gaussian HMM to return data (legacy interface). This is a thin compatibility wrapper around ``fit_gaussian_hmm``. New code should use ``fit_gaussian_hmm`` directly for richer output. Parameters: returns: Simple return series. n_states: Number of hidden states. Returns: Fitted ``hmmlearn.hmm.GaussianHMM`` model. See Also: fit_gaussian_hmm: Full-featured HMM fitting with rich output. """ result = fit_gaussian_hmm(returns, n_states=n_states, n_init=5) return result["model"]
# --------------------------------------------------------------------------- # Predict regime # ---------------------------------------------------------------------------
[docs] def predict_regime( model: Any, returns: pd.Series | np.ndarray, ) -> dict[str, Any]: """Predict regime states and probabilities for new data. Given a fitted HMM (from ``fit_gaussian_hmm`` or hmmlearn directly), decode the most likely hidden state sequence and compute posterior state probabilities for new observations. Parameters: model: A fitted HMM model with ``predict`` and ``predict_proba`` methods (e.g., from hmmlearn), **or** the dict returned by ``fit_gaussian_hmm`` (in which case the ``'model'`` key is used). returns: Return series to classify. NaN values are dropped. Returns: Dictionary with: - ``states`` (np.ndarray): Most likely state at each time step. - ``state_probs`` (np.ndarray): Posterior probabilities, shape ``(T, K)``. - ``index`` (pd.Index | None): Preserved index if input was a pandas Series. Example: >>> result = fit_gaussian_hmm(train_returns, n_states=2) >>> pred = predict_regime(result, test_returns) >>> print(pred['states']) >>> print(pred['state_probs'][:5]) """ # Accept dict output from fit_gaussian_hmm if isinstance(model, dict) and "model" in model: model = model["model"] index = returns.index if isinstance(returns, pd.Series) else None arr = coerce_array(returns, "returns") mask = ~np.isnan(arr) arr = arr[mask] if index is not None: full_idx = returns.dropna().index if len(full_idx) == len(arr): index = full_idx X = arr.reshape(-1, 1) states = model.predict(X) state_probs = model.predict_proba(X) return { "states": states, "state_probs": state_probs, "index": index, }
# --------------------------------------------------------------------------- # Markov-Switching Regression (statsmodels) # ---------------------------------------------------------------------------
[docs] def fit_ms_regression( endog: pd.Series | np.ndarray, k_regimes: int = 2, trend: str = "c", switching_variance: bool = True, exog: pd.DataFrame | np.ndarray | None = None, n_iter: int = 200, ) -> dict[str, Any]: """Fit a Markov-switching regression model using statsmodels. This wraps ``statsmodels.tsa.regime_switching.MarkovRegression``, which estimates a linear regression where the intercept (and optionally variance) can switch between regimes according to a hidden Markov chain. **When to use:** When you want to model how the data-generating process changes across regimes, including regime-dependent means and volatilities. More interpretable than a raw HMM for economic modeling because you get regression coefficients per regime. The model: .. math:: y_t = \\mu_{s_t} + x_t' \\beta_{s_t} + \\sigma_{s_t} \\epsilon_t where :math:`s_t \\in \\{0, 1, \\ldots, K-1\\}` follows a first-order Markov chain with transition matrix :math:`P`. Parameters: endog: Endogenous (dependent) variable. Typically a return series. k_regimes: Number of regimes (hidden states). trend: Trend specification for the mean equation. ``'c'`` (constant), ``'t'`` (linear trend), ``'ct'`` (both), ``'n'`` (no trend). switching_variance: If True, the variance switches across regimes. Almost always desirable for financial data. exog: Optional exogenous regressors, shape ``(T, p)``. The coefficients on these regressors are **not** switching by default (the intercept and variance switch). n_iter: Maximum number of EM iterations. Returns: Dictionary with: - ``smoothed_probs`` (np.ndarray): Smoothed regime probabilities, shape ``(T, K)``. ``smoothed_probs[:, k]`` is the probability of being in regime *k* at each time step given all data. - ``filtered_probs`` (np.ndarray): Filtered regime probabilities (using only past data), shape ``(T, K)``. - ``states`` (np.ndarray): Most likely state sequence (argmax of smoothed probabilities), shape ``(T,)``. - ``transition_matrix`` (np.ndarray): Transition matrix, shape ``(K, K)``. - ``regime_params`` (dict): Regime-specific parameters. Keys are ``'mean_0'``, ``'mean_1'``, ..., ``'sigma2_0'``, etc. - ``log_likelihood`` (float): Log-likelihood of the fitted model. - ``aic`` (float): Akaike Information Criterion. - ``bic`` (float): Bayesian Information Criterion. - ``summary`` (str): Text summary from statsmodels. - ``model_result`` (MarkovRegressionResults): The full statsmodels result object. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(0) >>> returns = pd.Series(np.concatenate([ ... rng.normal(0.001, 0.01, 200), ... rng.normal(-0.002, 0.03, 200), ... ])) >>> result = fit_ms_regression(returns, k_regimes=2) >>> print(result['transition_matrix']) >>> print(result['regime_params']) Notes: Requires ``statsmodels >= 0.13``. The model uses Hamilton's (1989) filter for inference. Convergence can be sensitive to starting values. See Also: fit_gaussian_hmm: HMM using hmmlearn (more flexible emission distributions). regime_statistics: Compute financial statistics per regime. """ from statsmodels.tsa.regime_switching.markov_regression import ( MarkovRegression, ) y = coerce_array(endog, "endog") model = MarkovRegression( y, k_regimes=k_regimes, trend=trend, switching_variance=switching_variance, exog=exog, ) result = model.fit(maxiter=n_iter, disp=False) # Extract regime-specific parameters regime_params: dict[str, float] = {} for k in range(k_regimes): regime_params[f"mean_{k}"] = float(result.params[k]) if switching_variance: for k in range(k_regimes): regime_params[f"sigma2_{k}"] = float(result.params[k_regimes + k]) # Transition matrix from result transmat = np.array(result.regime_transition).T # statsmodels returns (K, T) shaped arrays; transpose to (T, K) raw_smoothed = np.array(result.smoothed_marginal_probabilities) raw_filtered = np.array(result.filtered_marginal_probabilities) # Ensure shape is (T, K) regardless of statsmodels version if ( raw_smoothed.shape[0] == k_regimes and raw_smoothed.shape[0] != raw_smoothed.shape[1] ): smoothed_probs = raw_smoothed.T else: smoothed_probs = raw_smoothed if ( raw_filtered.shape[0] == k_regimes and raw_filtered.shape[0] != raw_filtered.shape[1] ): filtered_probs = raw_filtered.T else: filtered_probs = raw_filtered states = np.argmax(smoothed_probs, axis=1) return { "smoothed_probs": smoothed_probs, "filtered_probs": filtered_probs, "states": states, "transition_matrix": transmat, "regime_params": regime_params, "log_likelihood": float(result.llf), "aic": float(result.aic), "bic": float(result.bic), "summary": str(result.summary()), "model_result": result, }
# --------------------------------------------------------------------------- # Regime statistics # ---------------------------------------------------------------------------
[docs] def regime_statistics( returns: pd.Series | np.ndarray, states: np.ndarray, ) -> pd.DataFrame: """Compute descriptive statistics for each detected regime. After fitting an HMM or GMM, this function summarizes the financial characteristics of each regime. Typical interpretation for a 2-state model on equity returns: - **Regime 0** (low vol): "risk-on" or "bull market" -- positive mean return, low volatility, high Sharpe ratio. - **Regime 1** (high vol): "risk-off" or "crisis" -- negative or zero mean, high volatility, negative Sharpe. Parameters: returns: Return series aligned with ``states``. Can be a pandas Series or a numpy array. states: Integer regime labels (0, 1, ..., K-1), same length as ``returns``. Returns: pd.DataFrame with one row per regime and columns: ``mean``, ``std``, ``sharpe``, ``sortino_ratio``, ``min``, ``max``, ``skewness``, ``kurtosis``, ``max_drawdown``, ``VaR_95``, ``CVaR_95``, ``n_observations``, ``pct_time``, ``avg_duration``. - ``sharpe``: Annualized Sharpe ratio assuming 252 trading days. - ``sortino_ratio``: Annualized Sortino ratio (downside deviation only, 252 trading days). - ``max_drawdown``: Maximum peak-to-trough drawdown within the regime's return segments. - ``VaR_95``: 5th percentile of returns (historical Value at Risk at the 95 % confidence level). - ``CVaR_95``: Mean of returns below the VaR_95 threshold (Conditional Value at Risk / Expected Shortfall). - ``avg_duration``: Mean consecutive time steps spent in the regime. Example: >>> result = fit_gaussian_hmm(returns, n_states=2) >>> stats = regime_statistics(returns, result['states']) >>> print(stats) mean std sharpe ... avg_duration regime 0 0.000523 0.008123 1.0223 ... 15.3 1 -0.001842 0.024561 -1.1917 ... 7.8 See Also: fit_gaussian_hmm: Fit HMM to obtain state assignments. wraquant.regimes.base.regime_report: Comprehensive regime analysis. wraquant.backtest.position.regime_conditional_sizing: Adjust positions by regime. regime_transition_analysis: Analyze transition dynamics. """ r = coerce_array(returns, "returns") s = np.asarray(states, dtype=int).flatten() if len(r) != len(s): raise ValueError( f"returns and states must have the same length, " f"got {len(r)} and {len(s)}." ) total = len(r) unique_states = sorted(np.unique(s)) records = [] for state in unique_states: mask = s == state regime_r = r[mask] n_obs = int(mask.sum()) mean_ret = float(np.mean(regime_r)) std_ret = float(np.std(regime_r, ddof=1)) if n_obs > 1 else 0.0 # Use canonical Sharpe implementation from wraquant.risk.metrics import sharpe_ratio as _sharpe_ratio sharpe = ( _sharpe_ratio(pd.Series(regime_r)) if n_obs > 2 else 0.0 ) # Sortino ratio (downside deviation only) downside = regime_r[regime_r < 0] if len(downside) > 1: downside_std = float(np.sqrt(np.mean(downside**2))) sortino = ( float(mean_ret / downside_std * np.sqrt(252)) if downside_std > 1e-12 else 0.0 ) else: sortino = 0.0 # Max drawdown within the regime's contiguous segments max_dd = _max_drawdown_from_returns(regime_r) # VaR and CVaR at 95 % confidence var_95 = float(np.percentile(regime_r, 5)) cvar_mask = regime_r <= var_95 cvar_95 = float(np.mean(regime_r[cvar_mask])) if cvar_mask.any() else var_95 # Average consecutive duration in this regime avg_dur = _avg_regime_duration(s, state) records.append( { "regime": state, "mean": mean_ret, "std": std_ret, "sharpe": sharpe, "sortino_ratio": sortino, "min": float(np.min(regime_r)), "max": float(np.max(regime_r)), "skewness": float(_skewness(regime_r)), "kurtosis": float(_kurtosis(regime_r)), "max_drawdown": max_dd, "VaR_95": var_95, "CVaR_95": cvar_95, "n_observations": n_obs, "pct_time": float(n_obs / total) if total > 0 else 0.0, "avg_duration": avg_dur, } ) return pd.DataFrame(records).set_index("regime")
# --------------------------------------------------------------------------- # Regime transition analysis # ---------------------------------------------------------------------------
[docs] def regime_transition_analysis( states: np.ndarray, transition_matrix: np.ndarray | None = None, ) -> dict[str, Any]: """Analyze regime transition dynamics. Computes empirical and model-based statistics about how regimes switch. Useful for understanding regime persistence and expected timing of regime changes. Parameters: states: Integer regime labels, shape ``(T,)``. transition_matrix: Model-estimated transition matrix, shape ``(K, K)``. If None, the empirical transition matrix is computed from ``states``. Returns: Dictionary with: - ``empirical_transition_matrix`` (np.ndarray): Transition counts normalised to probabilities, shape ``(K, K)``. - ``transition_matrix`` (np.ndarray): Model or empirical transition matrix used for analysis. - ``avg_duration`` (np.ndarray): Expected duration in each regime (time steps), ``1 / (1 - P[i,i])``. - ``steady_state`` (np.ndarray): Ergodic distribution from the transition matrix. - ``regime_counts`` (dict): Number of distinct "visits" to each regime. - ``regime_durations`` (dict): List of consecutive durations for each regime. - ``expected_return_time`` (np.ndarray): Expected number of steps to return to each regime from stationarity. Example: >>> result = fit_gaussian_hmm(returns, n_states=2) >>> analysis = regime_transition_analysis( ... result['states'], result['transition_matrix'] ... ) >>> print(f"Bull market lasts ~{analysis['avg_duration'][0]:.0f} days") >>> print(f"Steady state: {analysis['steady_state']}") See Also: fit_gaussian_hmm: Fit HMM to obtain states and transition matrix. regime_statistics: Financial statistics per regime. """ s = np.asarray(states, dtype=int).flatten() unique_states = sorted(np.unique(s)) K = len(unique_states) # Empirical transition matrix emp_transmat = np.zeros((K, K)) state_to_idx = {state: i for i, state in enumerate(unique_states)} for t in range(len(s) - 1): i = state_to_idx[s[t]] j = state_to_idx[s[t + 1]] emp_transmat[i, j] += 1 # Normalise rows row_sums = emp_transmat.sum(axis=1, keepdims=True) row_sums = np.maximum(row_sums, 1e-10) emp_transmat = emp_transmat / row_sums # Use model transition matrix if provided, else empirical transmat = transition_matrix if transition_matrix is not None else emp_transmat # Expected duration: 1 / (1 - p_ii) self_transition = np.diag(transmat) avg_duration = 1.0 / np.maximum(1.0 - self_transition, 1e-10) # Steady state steady_state = _compute_steady_state(transmat) # Regime visits and durations regime_counts: dict[int, int] = {st: 0 for st in unique_states} regime_durations: dict[int, list[int]] = {st: [] for st in unique_states} if len(s) > 0: current_state = s[0] current_duration = 1 regime_counts[current_state] = 1 for t in range(1, len(s)): if s[t] == current_state: current_duration += 1 else: regime_durations[current_state].append(current_duration) current_state = s[t] current_duration = 1 regime_counts[current_state] += 1 regime_durations[current_state].append(current_duration) # Expected return time: 1 / pi_i expected_return = np.zeros(K) for i in range(K): expected_return[i] = ( 1.0 / steady_state[i] if steady_state[i] > 1e-10 else np.inf ) return { "empirical_transition_matrix": emp_transmat, "transition_matrix": transmat, "avg_duration": avg_duration, "steady_state": steady_state, "regime_counts": regime_counts, "regime_durations": regime_durations, "expected_return_time": expected_return, }
# --------------------------------------------------------------------------- # Gaussian Mixture Regimes # ---------------------------------------------------------------------------
[docs] def gaussian_mixture_regimes( returns: pd.Series | np.ndarray, n_components: int = 2, covariance_type: str = "full", n_init: int = 10, random_state: int = 42, ) -> dict[str, Any]: """Classify returns into regimes using a Gaussian Mixture Model. Unlike HMMs, GMMs do not model temporal dependence between states. Each observation is independently assigned to the most likely Gaussian component. This makes GMMs: - **Faster** to fit than HMMs. - **Simpler** to interpret (no transition dynamics). - **Less powerful** for capturing regime persistence. Good for quick regime labeling or as a baseline before fitting an HMM. The model: .. math:: p(r_t) = \\sum_{k=1}^{K} \\pi_k \\, \\mathcal{N}(r_t; \\mu_k, \\Sigma_k) Parameters: returns: Return series. NaN values are dropped. n_components: Number of mixture components (regimes). covariance_type: Covariance type (``'full'``, ``'tied'``, ``'diag'``, ``'spherical'``). n_init: Number of random restarts. random_state: Random seed. Returns: Dictionary with: - ``states`` (np.ndarray): Component assignments, shape ``(T,)``. - ``state_probs`` (np.ndarray): Posterior probabilities, shape ``(T, K)``. - ``means`` (np.ndarray): Component means, shape ``(K,)``. - ``covariances`` (np.ndarray): Component variances, shape ``(K,)``. - ``weights`` (np.ndarray): Mixing proportions, shape ``(K,)``. - ``aic`` (float): AIC. - ``bic`` (float): BIC. - ``model`` (GaussianMixture): Fitted sklearn model. - ``index`` (pd.Index | None): Preserved index. Example: >>> result = gaussian_mixture_regimes(returns, n_components=2) >>> print(result['means']) # Component means >>> print(result['weights']) # Mixing proportions >>> # Combine with regime_statistics for financial interpretation >>> stats = regime_statistics(returns, result['states']) See Also: fit_gaussian_hmm: HMM with temporal regime dynamics. regime_statistics: Per-regime financial statistics. """ from sklearn.mixture import GaussianMixture index = returns.index if isinstance(returns, pd.Series) else None arr = coerce_array(returns, "returns") mask = ~np.isnan(arr) arr = arr[mask] if index is not None: full_idx = returns.dropna().index if len(full_idx) == len(arr): index = full_idx X = arr.reshape(-1, 1) gmm = GaussianMixture( n_components=n_components, covariance_type=covariance_type, n_init=n_init, random_state=random_state, ) gmm.fit(X) states = gmm.predict(X) state_probs = gmm.predict_proba(X) # Re-order by ascending variance variances = gmm.covariances_.flatten()[:n_components] order = np.argsort(variances) state_map = {int(old): new for new, old in enumerate(order)} states = np.array([state_map[int(s)] for s in states]) state_probs = state_probs[:, order] means = gmm.means_.flatten()[order] covariances = variances[order] weights = gmm.weights_[order] return { "states": states, "state_probs": state_probs, "means": means, "covariances": covariances, "weights": weights, "aic": float(gmm.aic(X)), "bic": float(gmm.bic(X)), "model": gmm, "index": index, }
# --------------------------------------------------------------------------- # Rolling Regime Probability # ---------------------------------------------------------------------------
[docs] @requires_extra("regimes") def rolling_regime_probability( returns: pd.Series, n_states: int = 2, window: int | None = None, min_window: int = 60, n_init: int = 5, ) -> pd.DataFrame: """Compute time-varying regime probabilities using rolling/expanding HMM. Fits a Gaussian HMM at each time step using either a rolling or expanding window, producing a time series of regime probabilities that adapts to new data. Useful for real-time regime monitoring. Parameters: returns: Return series. n_states: Number of hidden states. window: Rolling window size. If None, uses an expanding window (all data up to time *t*). min_window: Minimum number of observations required before fitting the first model. n_init: Number of EM restarts per window. Returns: pd.DataFrame with columns ``prob_0``, ``prob_1``, ..., ``prob_{K-1}`` indexed by the return series index. Rows before ``min_window`` are NaN. Example: >>> probs = rolling_regime_probability(returns, n_states=2, window=252) >>> probs.plot(title="Regime Probabilities Over Time") Notes: This is computationally expensive. For 1000 observations with ``window=252``, approximately 750 HMM fits are performed. Consider using a large ``window`` or expanding mode for efficiency. See Also: fit_gaussian_hmm: Single-shot HMM fitting. """ clean = returns.dropna() T = len(clean) values = clean.values.reshape(-1, 1) prob_cols = [f"prob_{k}" for k in range(n_states)] result = pd.DataFrame(np.nan, index=clean.index, columns=prob_cols, dtype=float) for t in range(min_window, T): if window is not None: start = max(0, t - window) else: start = 0 X_window = values[start : t + 1] try: hmm_result = fit_gaussian_hmm( X_window.flatten(), n_states=n_states, n_init=n_init, n_iter=100, ) # Take the probability at the last time step last_probs = hmm_result["state_probs"][-1] result.iloc[t] = last_probs except Exception: # If EM fails for this window, leave as NaN pass return result
# --------------------------------------------------------------------------- # Regime-Aware Portfolio # ---------------------------------------------------------------------------
[docs] def regime_aware_portfolio( regime_probs: np.ndarray, regime_weights: np.ndarray, ) -> np.ndarray: """Compute blended portfolio weights using regime probabilities. Given the current probability of being in each regime and the optimal portfolio weights for each regime, compute a probability-weighted blend. This bridges regime detection (``regimes/``) with portfolio optimisation (``opt/``). The blended weight for asset *j* is: .. math:: w_j = \\sum_{k=1}^{K} P(s_t = k) \\cdot w_j^{(k)} Parameters: regime_probs: Current regime probabilities, shape ``(K,)``. Must sum to 1. regime_weights: Optimal weights for each regime, shape ``(K, n_assets)``. Row *k* contains the weights for regime *k*. Returns: np.ndarray: Blended portfolio weights, shape ``(n_assets,)``. Sums to 1 if each row of ``regime_weights`` sums to 1. Example: >>> # 60% probability of bull, 40% probability of bear >>> probs = np.array([0.6, 0.4]) >>> # Bull: 80% equity / 20% bonds; Bear: 30% equity / 70% bonds >>> weights = np.array([[0.8, 0.2], [0.3, 0.7]]) >>> blended = regime_aware_portfolio(probs, weights) >>> print(blended) # [0.6*0.8 + 0.4*0.3, 0.6*0.2 + 0.4*0.7] [0.6 0.4] See Also: fit_gaussian_hmm: Obtain regime probabilities. rolling_regime_probability: Time-varying regime probabilities. """ probs = np.asarray(regime_probs, dtype=np.float64) weights = np.asarray(regime_weights, dtype=np.float64) if probs.ndim != 1: raise ValueError(f"regime_probs must be 1-D, got shape {probs.shape}") if weights.ndim != 2: raise ValueError(f"regime_weights must be 2-D, got shape {weights.shape}") if probs.shape[0] != weights.shape[0]: raise ValueError( f"Number of regimes in probs ({probs.shape[0]}) and weights " f"({weights.shape[0]}) must match." ) return probs @ weights
# --------------------------------------------------------------------------- # Markov-Switching Autoregression # ---------------------------------------------------------------------------
[docs] def fit_ms_autoregression( endog: pd.Series | np.ndarray, k_regimes: int = 2, order: int = 1, switching_ar: bool = True, switching_variance: bool = True, exog_tvtp: np.ndarray | pd.DataFrame | None = None, ) -> dict[str, Any]: """Fit a Markov-switching autoregression (MS-AR) model. This wraps ``statsmodels.tsa.regime_switching.MarkovAutoregression``, which extends the standard Markov-switching regression to include autoregressive lags whose coefficients can also switch across regimes. **Hamilton (1989) framework:** The seminal Hamilton (1989) paper introduced the Markov-switching autoregressive model to study business-cycle dynamics. The key insight is that the data-generating process can shift between distinct regimes (e.g., expansion vs. recession) governed by a latent Markov chain, and within each regime the series follows an AR(p) process with regime-specific parameters: .. math:: y_t = \\mu_{s_t} + \\phi_{1,s_t}\\,(y_{t-1} - \\mu_{s_{t-1}}) + \\cdots + \\phi_{p,s_t}\\,(y_{t-p} - \\mu_{s_{t-p}}) + \\sigma_{s_t}\\,\\epsilon_t **When to use MS-AR vs. plain HMM:** - Use **MS-AR** when the series exhibits serial correlation *within* each regime (e.g., GDP growth, interest rates, momentum returns). The autoregressive terms capture local dynamics that a plain HMM ignores. - Use a **plain Gaussian HMM** (``fit_gaussian_hmm``) when returns are approximately i.i.d. within each regime and only the mean and variance shift. This is typical for daily equity returns. - Use **MS-Regression** (``fit_ms_regression``) when you want regime-dependent intercepts/exogenous coefficients but no AR lags. Parameters: endog: Endogenous (dependent) variable. Typically a return or growth-rate series. k_regimes: Number of regimes (hidden states). 2 is the most common choice (e.g., expansion/recession). order: Autoregressive lag order *p*. Higher orders capture longer memory within each regime. switching_ar: If True, the AR coefficients switch across regimes. If False, only the intercept and/or variance switch. switching_variance: If True, the innovation variance switches across regimes. Almost always desirable for financial data. exog_tvtp: Optional exogenous variables for time-varying transition probabilities (TVTP). Shape ``(T, q)``. When provided, the transition probabilities are modelled as logistic functions of these variables, allowing macro indicators to influence regime-switching dynamics. Returns: Dictionary with: - ``smoothed_probs`` (np.ndarray): Smoothed regime probabilities, shape ``(T, K)``. - ``filtered_probs`` (np.ndarray): Filtered regime probabilities, shape ``(T, K)``. - ``states`` (np.ndarray): Most likely state sequence (argmax of smoothed probabilities), shape ``(T,)``. - ``transition_matrix`` (np.ndarray): Transition matrix, shape ``(K, K)``. - ``expected_durations`` (np.ndarray): Expected duration in each regime, ``1 / (1 - P[k,k])``, shape ``(K,)``. - ``regime_params`` (dict): Per-regime parameters including ``'mean_k'``, ``'sigma2_k'``, and ``'ar_k_j'`` (AR coefficient *j* for regime *k*). - ``aic`` (float): Akaike Information Criterion. - ``bic`` (float): Bayesian Information Criterion. - ``summary`` (str): Text summary from statsmodels. - ``model_result``: The fitted statsmodels result object. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(0) >>> # Simulate AR(1) with switching mean and persistence >>> returns = pd.Series(np.concatenate([ ... rng.normal(0.002, 0.008, 250), ... rng.normal(-0.001, 0.020, 250), ... ])) >>> result = fit_ms_autoregression(returns, k_regimes=2, order=1) >>> print(result['transition_matrix']) >>> print(result['regime_params']) Notes: Requires ``statsmodels >= 0.13``. Convergence can be slow for higher-order AR models or many regimes. The Hamilton filter is used for inference. References: Hamilton, J. D. (1989). "A New Approach to the Economic Analysis of Nonstationary Time Series and the Business Cycle." *Econometrica*, 57(2), 357-384. See Also: fit_ms_regression: Markov-switching regression (no AR terms). fit_gaussian_hmm: HMM without autoregressive dynamics. select_n_states: BIC-based state selection. """ from statsmodels.tsa.regime_switching.markov_autoregression import ( MarkovAutoregression, ) y = coerce_array(endog, "endog") model = MarkovAutoregression( y, k_regimes=k_regimes, order=order, switching_ar=switching_ar, switching_variance=switching_variance, exog_tvtp=exog_tvtp, ) result = model.fit(disp=False) # Extract regime-specific parameters regime_params: dict[str, float] = {} for k in range(k_regimes): regime_params[f"mean_{k}"] = float(result.params[k]) if switching_variance: for k in range(k_regimes): regime_params[f"sigma2_{k}"] = float(result.params[k_regimes + k]) # AR coefficients if switching_ar: for k in range(k_regimes): for j in range(order): key = f"ar_{k}_{j + 1}" # statsmodels stores AR params after mean + variance params base_idx = k_regimes + (k_regimes if switching_variance else 1) ar_idx = base_idx + k * order + j if ar_idx < len(result.params): regime_params[key] = float(result.params[ar_idx]) else: for j in range(order): key = f"ar_{j + 1}" base_idx = k_regimes + (k_regimes if switching_variance else 1) ar_idx = base_idx + j if ar_idx < len(result.params): regime_params[key] = float(result.params[ar_idx]) # Transition matrix -- statsmodels may return (K, K) or (K, K, T) raw_transmat = np.array(result.regime_transition) if raw_transmat.ndim == 3: # Time-varying transition probs: average across time and transpose transmat = raw_transmat.mean(axis=-1).T elif raw_transmat.ndim == 2: transmat = raw_transmat.T else: transmat = raw_transmat # Ensure 2-D (K, K) transmat = np.atleast_2d(transmat) # Probabilities -- statsmodels returns (K, T); transpose to (T, K) raw_smoothed = np.array(result.smoothed_marginal_probabilities) raw_filtered = np.array(result.filtered_marginal_probabilities) if ( raw_smoothed.shape[0] == k_regimes and raw_smoothed.shape[0] != raw_smoothed.shape[1] ): smoothed_probs = raw_smoothed.T else: smoothed_probs = raw_smoothed if ( raw_filtered.shape[0] == k_regimes and raw_filtered.shape[0] != raw_filtered.shape[1] ): filtered_probs = raw_filtered.T else: filtered_probs = raw_filtered states = np.argmax(smoothed_probs, axis=1) # Expected durations diag = np.diag(transmat).copy() expected_durations = 1.0 / np.maximum(1.0 - diag, 1e-10) return { "smoothed_probs": smoothed_probs, "filtered_probs": filtered_probs, "states": states, "transition_matrix": transmat, "expected_durations": expected_durations, "regime_params": regime_params, "aic": float(result.aic), "bic": float(result.bic), "summary": str(result.summary()), "model_result": result, }
# --------------------------------------------------------------------------- # BIC-Based State Selection # ---------------------------------------------------------------------------
[docs] @requires_extra("regimes") def select_n_states( returns: pd.Series | np.ndarray, max_states: int = 5, n_init: int = 10, n_iter: int = 200, random_state: int = 42, ) -> dict[str, Any]: """Select the optimal number of HMM states using the BIC criterion. Fits Gaussian HMMs with ``n_states`` from 2 to ``max_states`` and selects the model that minimises the Bayesian Information Criterion (BIC). **Why BIC over AIC for HMM state selection:** The BIC imposes a stronger penalty on model complexity than the AIC, which is critical for HMM selection because: 1. **HMM parameter count grows quadratically** with the number of states (transition matrix has K*(K-1) free parameters), so without a strong penalty the AIC tends to overfit by selecting too many states. 2. **BIC is consistent**: as sample size grows, BIC selects the true model (if it is in the candidate set). AIC tends to overfit even asymptotically. 3. In practice, financial regime models with 2-3 states are usually adequate; BIC correctly penalises 4+ state models that capture noise rather than true regimes. The BIC is defined as: .. math:: \\text{BIC} = k \\ln(T) - 2 \\ln(\\hat{L}) where *k* is the number of free parameters, *T* is the sample size, and :math:`\\hat{L}` is the maximised likelihood. Parameters: returns: Return series. NaN values are dropped. max_states: Maximum number of states to consider (inclusive). The search covers ``[2, 3, ..., max_states]``. n_init: Number of random restarts per model fit. n_iter: Maximum EM iterations per restart. random_state: Base random seed. Returns: Dictionary with: - ``optimal_n_states`` (int): The number of states with the lowest BIC. - ``scores`` (dict[int, float]): Mapping from *n_states* to its BIC value. - ``best_model`` (dict): The full result dict from ``fit_gaussian_hmm`` for the optimal model. Example: >>> result = select_n_states(returns, max_states=5) >>> print(f"Optimal states: {result['optimal_n_states']}") >>> print(result['scores']) {2: -2345.6, 3: -2310.1, 4: -2290.3, 5: -2275.0} See Also: fit_gaussian_hmm: Fit a single HMM with a fixed number of states. """ scores: dict[int, float] = {} best_bic = np.inf best_n = 2 best_result: dict[str, Any] | None = None for n in range(2, max_states + 1): try: result = fit_gaussian_hmm( returns, n_states=n, n_init=n_init, n_iter=n_iter, random_state=random_state, ) bic_val = result["bic"] scores[n] = bic_val if bic_val < best_bic: best_bic = bic_val best_n = n best_result = result except Exception: # If fitting fails for this n, skip scores[n] = np.inf if best_result is None: raise RuntimeError( "All HMM fits failed. Check that the data has sufficient " "variation for regime detection." ) return { "optimal_n_states": best_n, "scores": scores, "best_model": best_result, }
# --------------------------------------------------------------------------- # Multivariate HMM # ---------------------------------------------------------------------------
[docs] @requires_extra("regimes") def fit_multivariate_hmm( returns: pd.DataFrame, n_states: int = 2, covariance_type: str = "full", n_init: int = 10, n_iter: int = 200, random_state: int = 42, ) -> dict[str, Any]: """Fit a multivariate Gaussian HMM for multi-asset regime detection. Extends the univariate HMM to a vector of asset returns, enabling detection of regimes that manifest as shifts in *cross-asset correlations*, not just individual-asset volatility. **Why multivariate regime detection matters:** Single-asset HMMs detect volatility regimes but miss a crucial phenomenon: **correlation regime shifts**. During crises, asset correlations spike toward 1, destroying diversification precisely when it is needed most. A multivariate HMM captures this by estimating a full covariance matrix per regime. Typical findings on equity/bond data: - **Low-vol regime**: Low equity vol, low bond vol, moderate negative equity-bond correlation (diversification works). - **High-vol regime**: High equity vol, elevated bond vol, correlations shift (equities become more correlated with each other, equity-bond correlation may flip sign). The model estimates per-regime: - Mean return vector :math:`\\mu_k \\in \\mathbb{R}^d` - Full covariance matrix :math:`\\Sigma_k \\in \\mathbb{R}^{d \\times d}` - From which per-regime correlation matrices are derived. Parameters: returns: DataFrame of multi-asset returns, shape ``(T, d)`` where *d* is the number of assets. Column names are preserved for labeling. NaN rows are dropped. n_states: Number of hidden states (regimes). covariance_type: Covariance parameterisation. ``'full'`` is strongly recommended for capturing correlation shifts. n_init: Number of random EM restarts. n_iter: Maximum EM iterations per restart. random_state: Random seed. Returns: Dictionary with: - ``states`` (np.ndarray): Most likely state sequence (Viterbi), shape ``(T,)``. - ``state_probs`` (np.ndarray): Smoothed state probabilities, shape ``(T, K)``. - ``means`` (np.ndarray): Per-regime mean vectors, shape ``(K, d)``. - ``covariances`` (np.ndarray): Per-regime covariance matrices, shape ``(K, d, d)``. - ``per_regime_correlations`` (dict[int, np.ndarray]): Per-regime correlation matrices, keyed by regime index. - ``transition_matrix`` (np.ndarray): Transition matrix ``(K, K)``. - ``log_likelihood`` (float): Total log-likelihood. - ``aic`` (float): AIC. - ``bic`` (float): BIC. - ``model``: The fitted hmmlearn model. - ``index`` (pd.Index | None): Preserved DatetimeIndex. - ``columns`` (list[str]): Asset names. Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(42) >>> # Low-vol regime: low correlation >>> cov_low = np.array([[0.0001, 0.00002], [0.00002, 0.00015]]) >>> low = rng.multivariate_normal([0.001, 0.0008], cov_low, 300) >>> # High-vol regime: high correlation >>> cov_high = np.array([[0.0009, 0.0006], [0.0006, 0.0008]]) >>> high = rng.multivariate_normal([-0.001, -0.0005], cov_high, 200) >>> df = pd.DataFrame(np.vstack([low, high]), columns=['SPY', 'QQQ']) >>> result = fit_multivariate_hmm(df, n_states=2) >>> print(result['per_regime_correlations'][0]) >>> print(result['per_regime_correlations'][1]) See Also: fit_gaussian_hmm: Univariate HMM. regime_conditional_moments: Per-regime moments from states. """ from hmmlearn.hmm import GaussianHMM # Prepare data if not isinstance(returns, pd.DataFrame): raise TypeError("returns must be a pd.DataFrame for multivariate HMM") clean = returns.dropna() index = clean.index columns = list(clean.columns) X = clean.values.astype(np.float64) T, d = X.shape # Fit with multiple restarts best_model = None best_score = -np.inf for i in range(n_init): model = GaussianHMM( n_components=n_states, covariance_type=covariance_type, n_iter=n_iter, random_state=random_state + i, ) try: model.fit(X) score = model.score(X) if score > best_score: best_score = score best_model = model except Exception: continue if best_model is None: raise RuntimeError(f"All {n_init} EM restarts failed for multivariate HMM.") model = best_model states = model.predict(X) state_probs = model.predict_proba(X) # Extract means and covariances means = model.means_ # (K, d) if covariance_type == "full": covariances = model.covars_ # (K, d, d) elif covariance_type == "diag": covariances = np.array([np.diag(model.covars_[k]) for k in range(n_states)]) elif covariance_type == "spherical": covariances = np.array([np.eye(d) * model.covars_[k] for k in range(n_states)]) elif covariance_type == "tied": covariances = np.array([model.covars_ for _ in range(n_states)]) else: covariances = model.covars_ # Re-order states by ascending total variance (trace of cov) total_var = np.array([np.trace(covariances[k]) for k in range(n_states)]) order = np.argsort(total_var) state_map = {int(old): new for new, old in enumerate(order)} states = np.array([state_map[int(s)] for s in states]) state_probs = state_probs[:, order] means = means[order] covariances = covariances[order] transmat = model.transmat_[np.ix_(order, order)] # Per-regime correlation matrices per_regime_correlations: dict[int, np.ndarray] = {} for k in range(n_states): cov_k = covariances[k] std_k = np.sqrt(np.diag(cov_k)) outer = np.outer(std_k, std_k) outer = np.maximum(outer, 1e-12) per_regime_correlations[k] = cov_k / outer # Information criteria if covariance_type == "full": n_cov = n_states * d * (d + 1) // 2 elif covariance_type == "diag": n_cov = n_states * d elif covariance_type == "spherical": n_cov = n_states elif covariance_type == "tied": n_cov = d * (d + 1) // 2 else: n_cov = n_states * d * (d + 1) // 2 n_params = ( (n_states - 1) # initial state + n_states * (n_states - 1) # transition matrix + n_states * d # means + n_cov # covariances ) log_likelihood = float(best_score) aic = float(2 * n_params - 2 * log_likelihood) bic = float(n_params * np.log(T) - 2 * log_likelihood) return { "states": states, "state_probs": state_probs, "means": means, "covariances": covariances, "per_regime_correlations": per_regime_correlations, "transition_matrix": transmat, "log_likelihood": log_likelihood, "aic": aic, "bic": bic, "model": model, "index": index, "columns": columns, }
# --------------------------------------------------------------------------- # Regime-Conditional Moments # ---------------------------------------------------------------------------
[docs] def regime_conditional_moments( returns: pd.DataFrame, states: np.ndarray, ) -> dict[int, dict[str, np.ndarray]]: """Compute per-regime mean vector and covariance/correlation matrices. After fitting a multivariate HMM (or assigning regimes by any method), this function extracts the sample moments for each regime. These moments are the building blocks for **regime-aware portfolio construction**: the mean vector feeds into the expected-return input and the covariance matrix replaces the unconditional covariance in a mean-variance optimiser. Parameters: returns: DataFrame of multi-asset returns, shape ``(T, d)``. states: Integer regime labels, shape ``(T,)``, aligned with ``returns``. Returns: Dictionary keyed by regime index (int), where each value is a dict with: - ``mean`` (np.ndarray): Sample mean vector, shape ``(d,)``. - ``cov`` (np.ndarray): Sample covariance matrix, shape ``(d, d)``. - ``corr`` (np.ndarray): Sample correlation matrix, shape ``(d, d)``. Example: >>> result = fit_multivariate_hmm(returns_df, n_states=2) >>> moments = regime_conditional_moments( ... returns_df, result['states'] ... ) >>> # Use regime-0 covariance for portfolio optimisation >>> cov_bull = moments[0]['cov'] >>> mu_bull = moments[0]['mean'] See Also: fit_multivariate_hmm: Multi-asset regime detection. regime_aware_portfolio: Blend weights across regimes. """ if not isinstance(returns, pd.DataFrame): raise TypeError("returns must be a pd.DataFrame") R = returns.values.astype(np.float64) s = np.asarray(states, dtype=int).flatten() if len(R) != len(s): raise ValueError( f"returns and states must have the same length, " f"got {len(R)} and {len(s)}." ) unique_states = sorted(np.unique(s)) result: dict[int, dict[str, np.ndarray]] = {} for state in unique_states: mask = s == state regime_R = R[mask] mean_vec = np.mean(regime_R, axis=0) cov_mat = np.cov(regime_R, rowvar=False, ddof=1) # Ensure 2-D even for single-asset edge case if cov_mat.ndim == 0: cov_mat = np.array([[float(cov_mat)]]) # Correlation from covariance std_vec = np.sqrt(np.diag(cov_mat)) outer = np.outer(std_vec, std_vec) outer = np.maximum(outer, 1e-12) corr_mat = cov_mat / outer result[state] = { "mean": mean_vec, "cov": cov_mat, "corr": corr_mat, } return result
# --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _max_drawdown_from_returns(returns: np.ndarray) -> float: """Compute maximum drawdown from a return array.""" if len(returns) < 2: return 0.0 cumulative = np.cumprod(1.0 + returns) running_max = np.maximum.accumulate(cumulative) drawdowns = (cumulative - running_max) / np.maximum(running_max, 1e-12) return float(np.min(drawdowns)) def _count_hmm_params(n_states: int, covariance_type: str) -> int: """Count free parameters in a univariate Gaussian HMM.""" # Initial state: K - 1 # Transition matrix: K * (K - 1) # Means: K # Covariances depend on type (univariate case) n_init_params = n_states - 1 n_trans_params = n_states * (n_states - 1) n_mean_params = n_states if covariance_type in ("full", "diag", "diagonal", "spherical"): n_cov_params = n_states elif covariance_type == "tied": n_cov_params = 1 else: n_cov_params = n_states return n_init_params + n_trans_params + n_mean_params + n_cov_params def _extract_variances( covars: np.ndarray, n_states: int, covariance_type: str, ) -> np.ndarray: """Extract per-state variance from hmmlearn covars_ for univariate data. hmmlearn stores covariances differently depending on covariance_type: - full: (n_states, n_features, n_features) - diag: (n_states, n_features) - spherical: (n_states,) - tied: (n_features, n_features) -- same for all states Returns a flat 1-D float array of shape (n_states,). """ if covariance_type == "spherical": result = np.asarray(covars, dtype=np.float64).flatten()[:n_states] elif covariance_type in ("diag", "diagonal"): # Shape (n_states, n_features) -- take first feature result = np.array([float(covars[k].flatten()[0]) for k in range(n_states)]) elif covariance_type == "tied": # Same covariance for all states var = float(np.asarray(covars).flatten()[0]) result = np.full(n_states, var) else: # full: (n_states, n_features, n_features) -- extract [0,0] result = np.array( [float(np.asarray(covars[k]).flatten()[0]) for k in range(n_states)] ) return result def _compute_steady_state(transmat: np.ndarray) -> np.ndarray: """Compute the ergodic (stationary) distribution of a transition matrix. Solves pi @ P = pi, sum(pi) = 1 using the eigenvalue method. """ K = transmat.shape[0] # Find left eigenvector with eigenvalue 1 eigenvalues, eigenvectors = np.linalg.eig(transmat.T) # Find the index of eigenvalue closest to 1 idx = np.argmin(np.abs(eigenvalues - 1.0)) steady = np.real(eigenvectors[:, idx]) steady = np.abs(steady) # Ensure non-negative total = steady.sum() if total > 0: steady /= total else: steady = np.ones(K) / K return steady def _avg_regime_duration(states: np.ndarray, target_state: int) -> float: """Compute average consecutive duration in a particular regime.""" durations: list[int] = [] current_duration = 0 for s in states: if s == target_state: current_duration += 1 else: if current_duration > 0: durations.append(current_duration) current_duration = 0 if current_duration > 0: durations.append(current_duration) return float(np.mean(durations)) if durations else 0.0 def _skewness(x: np.ndarray) -> float: """Compute sample skewness.""" n = len(x) if n < 3: return 0.0 m = np.mean(x) s = np.std(x, ddof=1) if s < 1e-12: return 0.0 return float(np.mean(((x - m) / s) ** 3) * n / ((n - 1) * (n - 2) / n)) def _kurtosis(x: np.ndarray) -> float: """Compute sample excess kurtosis.""" n = len(x) if n < 4: return 0.0 m = np.mean(x) s = np.std(x, ddof=1) if s < 1e-12: return 0.0 return float(np.mean(((x - m) / s) ** 4) - 3.0)