Source code for wraquant.regimes.scoring

"""Regime quality assessment and method comparison.

Evaluates how *good* a set of detected regimes actually is by measuring
stability, separability, and predictability.  These scores help you
decide whether your regime model is capturing meaningful market structure
or just fitting noise.

Three orthogonal quality dimensions
------------------------------------

1. **Stability** (:func:`regime_stability_score`): Are the regimes
   persistent?  Rapidly flickering regimes are hard to trade.
2. **Separation** (:func:`regime_separation_score`): Are the regimes
   statistically distinct?  Overlapping distributions offer no
   actionable information.
3. **Predictability** (:func:`regime_predictability`): Can regime
   transitions be forecast?  A model is useful only if it gives you
   a head-start on the next shift.

Use :func:`compare_regime_methods` to run all three metrics across
multiple detection algorithms on the same data.
"""

from __future__ import annotations

from typing import Any, Callable

import numpy as np
import pandas as pd


# ---------------------------------------------------------------------------
# Regime Stability Score
# ---------------------------------------------------------------------------


[docs] def regime_stability_score( states: np.ndarray | pd.Series, transition_matrix: np.ndarray | None = None, ) -> dict[str, Any]: """Measure how stable the detected regimes are. Stability quantifies the temporal persistence of regimes. Stable regimes last many periods and transitions are infrequent. A stability score close to 1 indicates regimes that are highly persistent; a score near 0 indicates rapid, noisy switching that is likely untradable. **How it works:** Three sub-metrics are combined (equal-weighted average): 1. **Average duration** (normalised): longer average stays = more stable. Normalised as ``1 - 1/avg_duration`` so it maps to [0, 1]. 2. **Transition frequency** (inverse): fewer transitions per unit time = more stable. Computed as ``1 - n_transitions / (T-1)``. 3. **Probability entropy** (inverse): when the transition matrix rows have low entropy (i.e., dominated by the diagonal), the chain is persistent. Normalised by ``log(K)`` where *K* is the number of regimes. **Interpretation guidance:** - **> 0.8**: Highly stable regimes, suitable for discrete portfolio switches. - **0.5 -- 0.8**: Moderate stability, consider probability-weighted blending rather than hard switches. - **< 0.5**: Unstable, likely overfitting or too many regimes. Parameters: states: Integer regime labels, shape ``(T,)``. transition_matrix: Optional ``(K, K)`` transition matrix. If ``None``, an empirical matrix is computed from *states*. Returns: Dictionary with: - **stability_score** (float): Composite stability score in [0, 1]. - **avg_duration** (float): Average regime duration in periods. - **transition_frequency** (float): Fraction of time steps with a regime change. - **probability_entropy** (float): Mean row entropy of the transition matrix (nats). - **n_transitions** (int): Total number of regime transitions. - **per_regime_duration** (dict[int, float]): Average duration per regime. Example: >>> states = np.array([0]*100 + [1]*100 + [0]*100) >>> result = regime_stability_score(states) >>> print(f"Stability: {result['stability_score']:.2f}") 0.99 See Also: regime_separation_score: Distribution distance between regimes. compare_regime_methods: Multi-method comparison. """ s = np.asarray(states, dtype=int).flatten() T = len(s) unique_states = np.unique(s) K = len(unique_states) # Transition count transitions = np.sum(s[1:] != s[:-1]) transition_freq = transitions / max(T - 1, 1) # Per-regime durations durations: dict[int, list[int]] = {int(k): [] for k in unique_states} current_state = int(s[0]) current_len = 1 for t in range(1, T): if int(s[t]) == current_state: current_len += 1 else: durations[current_state].append(current_len) current_state = int(s[t]) current_len = 1 durations[current_state].append(current_len) per_regime_duration = { k: float(np.mean(v)) if v else 0.0 for k, v in durations.items() } all_durations = [d for dlist in durations.values() for d in dlist] avg_duration = float(np.mean(all_durations)) if all_durations else 1.0 # Transition matrix if transition_matrix is None: transition_matrix = _empirical_transmat(s, K) # Probability entropy: mean row entropy eps = 1e-12 row_entropies = [] for row in transition_matrix: p = row[row > eps] entropy = -float(np.sum(p * np.log(p))) row_entropies.append(entropy) mean_entropy = float(np.mean(row_entropies)) max_entropy = np.log(K) if K > 1 else 1.0 # Sub-scores duration_score = 1.0 - 1.0 / max(avg_duration, 1.0) freq_score = 1.0 - transition_freq entropy_score = 1.0 - mean_entropy / max(max_entropy, eps) stability = float(np.mean([duration_score, freq_score, entropy_score])) return { "stability_score": np.clip(stability, 0.0, 1.0), "avg_duration": avg_duration, "transition_frequency": transition_freq, "probability_entropy": mean_entropy, "n_transitions": int(transitions), "per_regime_duration": per_regime_duration, }
# --------------------------------------------------------------------------- # Regime Separation Score # ---------------------------------------------------------------------------
[docs] def regime_separation_score( returns: np.ndarray | pd.Series, states: np.ndarray | pd.Series, ) -> dict[str, Any]: """Measure how well-separated the regime distributions are. Separation quantifies whether the regimes have meaningfully distinct statistical properties. High separation means the regimes carry actionable information; low separation means the classification is arbitrary. **How it works:** For each pair of regimes *(i, j)*: 1. **Bhattacharyya distance**: measures divergence between two Gaussian distributions. Larger = more separated. 2. **Overlap coefficient**: fraction of probability mass that overlaps when the two regime distributions are superimposed. Smaller = more separated. The composite score is the average pairwise Bhattacharyya distance, normalised to [0, 1] via a sigmoid transform. **Interpretation guidance:** - **> 0.7**: Well-separated regimes with distinct return distributions. - **0.3 -- 0.7**: Moderate separation. Regimes differ but there is meaningful overlap. - **< 0.3**: Poor separation. Consider reducing the number of regimes. Parameters: returns: Return series, shape ``(T,)``. states: Integer regime labels, shape ``(T,)``. Returns: Dictionary with: - **separation_score** (float): Composite separation in [0, 1]. - **pairwise_bhattacharyya** (dict[tuple, float]): Bhattacharyya distance for each regime pair ``(i, j)``. - **pairwise_overlap** (dict[tuple, float]): Overlap coefficient for each pair. - **per_regime_stats** (dict[int, dict]): Mean and std per regime. Example: >>> rng = np.random.default_rng(0) >>> returns = np.concatenate([ ... rng.normal(0.01, 0.01, 200), ... rng.normal(-0.02, 0.03, 200), ... ]) >>> states = np.array([0]*200 + [1]*200) >>> result = regime_separation_score(returns, states) >>> print(f"Separation: {result['separation_score']:.2f}") See Also: regime_stability_score: Temporal persistence of regimes. compare_regime_methods: Multi-method comparison. """ r = np.asarray(returns, dtype=np.float64).flatten() s = np.asarray(states, dtype=int).flatten() if len(r) != len(s): raise ValueError( f"returns and states must have same length, got {len(r)} vs {len(s)}" ) unique_states = sorted(np.unique(s)) K = len(unique_states) # Per-regime statistics per_regime_stats: dict[int, dict[str, float]] = {} for k in unique_states: mask = s == k rk = r[mask] per_regime_stats[int(k)] = { "mean": float(np.mean(rk)), "std": float(np.std(rk, ddof=1)) if len(rk) > 1 else 0.0, "n": int(len(rk)), } # Pairwise metrics pairwise_bhattacharyya: dict[tuple[int, int], float] = {} pairwise_overlap: dict[tuple[int, int], float] = {} for i_idx in range(K): for j_idx in range(i_idx + 1, K): ki, kj = unique_states[i_idx], unique_states[j_idx] mu_i = per_regime_stats[int(ki)]["mean"] mu_j = per_regime_stats[int(kj)]["mean"] s_i = max(per_regime_stats[int(ki)]["std"], 1e-12) s_j = max(per_regime_stats[int(kj)]["std"], 1e-12) # Bhattacharyya distance for Gaussians var_i, var_j = s_i ** 2, s_j ** 2 avg_var = (var_i + var_j) / 2.0 db = 0.25 * (mu_i - mu_j) ** 2 / avg_var + 0.5 * np.log( avg_var / np.sqrt(var_i * var_j) ) pairwise_bhattacharyya[(int(ki), int(kj))] = float(db) # Overlap coefficient (Bhattacharyya coefficient = exp(-db)) bc = float(np.exp(-db)) pairwise_overlap[(int(ki), int(kj))] = bc # Composite score: sigmoid of average Bhattacharyya distance if pairwise_bhattacharyya: avg_db = float(np.mean(list(pairwise_bhattacharyya.values()))) # Sigmoid: maps (0, inf) -> (0, 1), with db=2 -> ~0.76 separation = float(1.0 - np.exp(-avg_db)) else: separation = 0.0 return { "separation_score": np.clip(separation, 0.0, 1.0), "pairwise_bhattacharyya": pairwise_bhattacharyya, "pairwise_overlap": pairwise_overlap, "per_regime_stats": per_regime_stats, }
# --------------------------------------------------------------------------- # Regime Predictability # ---------------------------------------------------------------------------
[docs] def regime_predictability( states: np.ndarray | pd.Series, transition_matrix: np.ndarray | None = None, test_fraction: float = 0.3, ) -> dict[str, Any]: """Assess whether regime transitions can be predicted. A regime model is useful for trading only if you can anticipate transitions before they happen (or at least classify the current regime in real time). This function evaluates predictability using transition-probability-based forecasting. **How it works:** 1. Split the state sequence into train / test at ``(1 - test_fraction) * T``. 2. Estimate the transition matrix from the training set. 3. For each test observation, predict the next state as the most probable successor under the estimated transition matrix. 4. Compute out-of-sample accuracy and compare to a naive baseline (always predict the most common state). **Interpretation guidance:** - **accuracy > baseline + 0.1**: The transition structure adds predictive value. Use regime probabilities to adjust portfolio weights. - **accuracy ~ baseline**: No predictive value from transitions. Consider whether the regime labeling is too noisy or the number of states is wrong. - **economic_value**: A rough estimate of the annualised Sharpe improvement from perfect regime prediction vs buy-and-hold. This uses per-regime Sharpe ratios and is indicative only. Parameters: states: Integer regime labels, shape ``(T,)``. transition_matrix: If provided, used instead of the training-set estimate. test_fraction: Fraction of data reserved for testing. Returns: Dictionary with: - **predictability_score** (float): Composite predictability in [0, 1]. Combines accuracy lift and transition regularity. - **accuracy** (float): Out-of-sample one-step prediction accuracy. - **baseline_accuracy** (float): Accuracy of always predicting the most common state. - **accuracy_lift** (float): ``accuracy - baseline_accuracy``. - **transition_matrix_train** (np.ndarray): Transition matrix estimated from training data. Example: >>> # Highly predictable two-state chain >>> rng = np.random.default_rng(0) >>> states = np.zeros(500, dtype=int) >>> for t in range(1, 500): ... if states[t-1] == 0: ... states[t] = 0 if rng.random() < 0.95 else 1 ... else: ... states[t] = 1 if rng.random() < 0.90 else 0 >>> result = regime_predictability(states) >>> print(f"Accuracy: {result['accuracy']:.2f}") See Also: regime_stability_score: Regime persistence. regime_separation_score: Distribution distinctness. compare_regime_methods: Multi-method comparison. """ s = np.asarray(states, dtype=int).flatten() T = len(s) split = int(T * (1.0 - test_fraction)) split = max(split, 2) split = min(split, T - 2) s_train = s[:split] s_test = s[split:] T_test = len(s_test) unique_states = np.unique(s) K = len(unique_states) # Estimate transition matrix from training data if transition_matrix is None: tm = _empirical_transmat(s_train, K) else: tm = np.array(transition_matrix, dtype=np.float64) # Out-of-sample one-step prediction correct = 0 for t in range(T_test - 1): current = int(s_test[t]) actual_next = int(s_test[t + 1]) if current < K: predicted = int(np.argmax(tm[current])) else: predicted = 0 if predicted == actual_next: correct += 1 accuracy = correct / max(T_test - 1, 1) # Baseline: always predict most common state in test from collections import Counter counts = Counter(s_test.tolist()) most_common = counts.most_common(1)[0][0] baseline_correct = sum( 1 for t in range(T_test - 1) if int(s_test[t + 1]) == most_common ) baseline_accuracy = baseline_correct / max(T_test - 1, 1) accuracy_lift = accuracy - baseline_accuracy # Composite predictability score # Combines accuracy lift (clipped to [0, 0.5], scaled to [0, 1]) # and diagonal dominance of the transition matrix diag_strength = float(np.mean(np.diag(tm))) if K > 1 else 0.5 lift_score = np.clip(accuracy_lift * 2.0, 0.0, 1.0) predictability = float(np.mean([lift_score, diag_strength])) return { "predictability_score": np.clip(predictability, 0.0, 1.0), "accuracy": accuracy, "baseline_accuracy": baseline_accuracy, "accuracy_lift": accuracy_lift, "transition_matrix_train": tm, }
# --------------------------------------------------------------------------- # Compare Regime Methods # ---------------------------------------------------------------------------
[docs] def compare_regime_methods( returns: pd.Series | np.ndarray, methods: dict[str, Callable[..., Any]] | None = None, method_kwargs: dict[str, dict[str, Any]] | None = None, ) -> pd.DataFrame: """Compare multiple regime detection methods on the same data. Runs each method on the provided return series and evaluates stability, separation, and predictability. Returns a DataFrame for easy comparison. **Interpretation guidance:** - Use this to pick the best regime model for your asset or strategy. The "best" model depends on your use-case: high **stability** for discrete allocation switches, high **separation** for regime-conditional signals, high **predictability** for transition timing. Parameters: returns: Return series, shape ``(T,)``. methods: Dictionary mapping method names to callables. Each callable must accept ``returns`` and return a tuple ``(states, transition_matrix)`` where *states* is an integer array and *transition_matrix* is ``(K, K)`` or ``None``. If ``None``, a default set is used (requires ``detect_regimes`` from ``wraquant.regimes.base``). method_kwargs: Per-method keyword arguments. Keys should match *methods* keys. Returns: pd.DataFrame indexed by method name with columns: - ``stability``, ``separation``, ``predictability`` - ``avg_duration``, ``n_transitions``, ``n_regimes`` - ``composite`` (equal-weighted average of the three scores) Example: >>> import numpy as np >>> rng = np.random.default_rng(0) >>> returns = np.concatenate([ ... rng.normal(0.01, 0.01, 200), ... rng.normal(-0.02, 0.03, 200), ... ]) >>> df = compare_regime_methods(pd.Series(returns)) >>> print(df.sort_values("composite", ascending=False)) See Also: regime_stability_score: Stability metric details. regime_separation_score: Separation metric details. regime_predictability: Predictability metric details. """ r = np.asarray(returns, dtype=np.float64).flatten() method_kwargs = method_kwargs or {} if methods is None: methods = _default_methods() records = [] for name, method_fn in methods.items(): kwargs = method_kwargs.get(name, {}) try: result = method_fn(returns, **kwargs) if isinstance(result, tuple): states_arr, tm = result elif isinstance(result, dict): states_arr = result.get("states", np.zeros(len(r), dtype=int)) tm = result.get("transition_matrix", None) else: # Assume it's a RegimeResult states_arr = result.states tm = result.transition_matrix except Exception as exc: # Record failure without crashing the comparison records.append({ "method": name, "stability": float("nan"), "separation": float("nan"), "predictability": float("nan"), "avg_duration": float("nan"), "n_transitions": 0, "n_regimes": 0, "composite": float("nan"), "error": str(exc), }) continue states_arr = np.asarray(states_arr, dtype=int).flatten() K = len(np.unique(states_arr)) stab = regime_stability_score(states_arr, tm) sep = regime_separation_score(r, states_arr) pred = regime_predictability(states_arr, tm) composite = float(np.mean([ stab["stability_score"], sep["separation_score"], pred["predictability_score"], ])) records.append({ "method": name, "stability": stab["stability_score"], "separation": sep["separation_score"], "predictability": pred["predictability_score"], "avg_duration": stab["avg_duration"], "n_transitions": stab["n_transitions"], "n_regimes": K, "composite": composite, }) df = pd.DataFrame(records) if "method" in df.columns: df = df.set_index("method") return df
# --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _empirical_transmat(states: np.ndarray, n_regimes: int) -> np.ndarray: """Compute empirical transition matrix from a state sequence.""" tm = np.zeros((n_regimes, n_regimes)) s = np.asarray(states, dtype=int).flatten() for t in range(len(s) - 1): i, j = int(s[t]), int(s[t + 1]) if i < n_regimes and j < n_regimes: tm[i, j] += 1 row_sums = tm.sum(axis=1, keepdims=True) row_sums = np.maximum(row_sums, 1e-10) return tm / row_sums def _default_methods() -> dict[str, Callable[..., Any]]: """Build default method dictionary using detect_regimes.""" from wraquant.regimes.base import detect_regimes def _make_detector(method: str) -> Callable[..., Any]: def _detect(returns: Any, **kwargs: Any) -> Any: return detect_regimes(returns, method=method, **kwargs) return _detect return { "hmm": _make_detector("hmm"), "gmm": _make_detector("gmm"), "kmeans": _make_detector("kmeans"), }