Source code for wraquant.regimes.labels

"""Regime labeling, classification, and duration analysis.

Provides rule-based and statistical approaches to labeling market
regimes without requiring a fitted model.  These functions are useful
for backtesting, for creating training labels for supervised regime
classifiers, and for generating interpretable regime descriptions.
"""

from __future__ import annotations

from typing import Any

import numpy as np
import pandas as pd

from wraquant.core._coerce import coerce_series


[docs] def label_regimes(states: pd.Series, returns: pd.Series) -> pd.Series: """Assign descriptive labels to numeric regime states. States are sorted by mean return: the state with the highest mean return is labeled ``"bull"``, the lowest ``"bear"``, and any intermediate states ``"neutral_1"``, ``"neutral_2"``, etc. Parameters: states: Integer regime state series. returns: Corresponding return series (same index). Returns: Series of string regime labels. """ returns = coerce_series(returns, name="returns") states = coerce_series(states, name="states") aligned_returns, aligned_states = returns.align(states, join="inner") unique_states = sorted(aligned_states.unique()) if len(unique_states) <= 1: return pd.Series("neutral", index=aligned_states.index, name="regime_label") # Rank states by mean return mean_by_state = { s: float(aligned_returns[aligned_states == s].mean()) for s in unique_states } ranked = sorted(mean_by_state, key=lambda s: mean_by_state[s]) label_map: dict[int, str] = {} label_map[ranked[0]] = "bear" label_map[ranked[-1]] = "bull" for i, s in enumerate(ranked[1:-1], start=1): label_map[s] = f"neutral_{i}" return aligned_states.map(label_map).rename("regime_label")
[docs] def regime_statistics( returns: pd.Series, states: pd.Series, ) -> pd.DataFrame: """Compute descriptive statistics for each regime. Parameters: returns: Return series. states: Integer regime state series (same index). Returns: DataFrame indexed by regime state with columns for mean, std, skew, count, and fraction of total observations. """ returns = coerce_series(returns, name="returns") states = coerce_series(states, name="states") aligned_returns, aligned_states = returns.align(states, join="inner") total = len(aligned_returns) records = [] for state in sorted(aligned_states.unique()): mask = aligned_states == state regime_rets = aligned_returns[mask] records.append( { "state": state, "mean": float(regime_rets.mean()), "std": float(regime_rets.std()), "skew": float(regime_rets.skew()), "count": int(mask.sum()), "fraction": float(mask.sum() / total) if total > 0 else 0.0, } ) return pd.DataFrame(records).set_index("state")
# --------------------------------------------------------------------------- # Volatility regime labels # ---------------------------------------------------------------------------
[docs] def volatility_regime_labels( returns: pd.Series | np.ndarray, *, window: int = 21, n_levels: int = 3, quantiles: list[float] | None = None, ) -> pd.Series: """Label regimes based on realised volatility quantiles. A simple, model-free approach that classifies each period by where its rolling volatility falls within the historical distribution. No fitting, no hidden states -- just raw vol percentiles. **Interpretation guidance:** - ``"low_vol"`` periods typically correspond to trending or complacent markets. Strategy-wise, favour momentum and carry. - ``"high_vol"`` periods correspond to stressed or mean-reverting markets. Favour defensive positioning or mean-reversion. - ``"medium_vol"`` is the transition zone. Parameters: returns: Return series. window: Rolling window for realised volatility estimation. Default 21 (roughly one trading month). n_levels: Number of volatility levels. Default 3 produces ``low_vol`` / ``medium_vol`` / ``high_vol``. Use 2 for a binary split or 4+ for finer granularity. quantiles: Explicit quantile boundaries. If provided, overrides ``n_levels``. Must have ``n_levels - 1`` elements, each in (0, 1). Returns: pd.Series of string labels (e.g., ``"low_vol"``, ``"medium_vol"``, ``"high_vol"``). NaN-filled for the warm-up period where rolling volatility is unavailable. Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(0) >>> returns = pd.Series(rng.normal(0, 0.01, 500)) >>> labels = volatility_regime_labels(returns, n_levels=3) >>> print(labels.value_counts()) See Also: trend_regime_labels: Label by trend direction. composite_regime_labels: Combine vol + trend labels. """ r = pd.Series(np.asarray(returns, dtype=np.float64).flatten()) rolling_vol = r.rolling(window=window, min_periods=max(window // 2, 2)).std() # Determine quantile boundaries if quantiles is None: quantiles = [i / n_levels for i in range(1, n_levels)] thresholds = rolling_vol.quantile(quantiles).values # Assign labels level_names = _vol_level_names(n_levels) labels = pd.Series(np.nan, index=r.index, name="vol_regime", dtype=object) valid = rolling_vol.notna() vol_vals = rolling_vol[valid].values label_arr = np.full(len(vol_vals), level_names[-1], dtype=object) for i, thresh in enumerate(thresholds): label_arr[vol_vals <= thresh] = level_names[min(i, len(level_names) - 1)] # Fix: assign from highest threshold down so that each observation # gets the correct bucket label_arr = np.full(len(vol_vals), level_names[0], dtype=object) for i in range(len(thresholds)): label_arr[vol_vals > thresholds[i]] = level_names[i + 1] labels.loc[valid] = label_arr # Propagate the original index if returns was a Series if isinstance(returns, pd.Series): labels.index = returns.index return labels
# --------------------------------------------------------------------------- # Trend regime labels # ---------------------------------------------------------------------------
[docs] def trend_regime_labels( returns: pd.Series | np.ndarray, *, fast_window: int = 10, slow_window: int = 50, hysteresis: float = 0.0005, ) -> pd.Series: """Label regimes based on moving average slope with hysteresis. Uses a dual moving-average crossover system with a hysteresis band to avoid whipsaw signals. The result is a clean, three-state classification: **uptrend**, **downtrend**, or **sideways**. **Interpretation guidance:** - ``"uptrend"``: Fast MA is above slow MA by more than the hysteresis threshold. Bullish bias. - ``"downtrend"``: Fast MA is below slow MA by more than the hysteresis threshold. Bearish bias. - ``"sideways"``: The two MAs are within the hysteresis band. No directional conviction -- favour range-bound strategies. Parameters: returns: Return series. fast_window: Fast moving average window (periods). slow_window: Slow moving average window (periods). hysteresis: Minimum difference between fast and slow MA (in return units) required to declare a trend. Larger values suppress whipsaws but delay signals. Returns: pd.Series of string labels (``"uptrend"``, ``"downtrend"``, ``"sideways"``). NaN-filled during warm-up. Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(0) >>> returns = pd.Series(rng.normal(0.001, 0.01, 500)) >>> labels = trend_regime_labels(returns) >>> print(labels.value_counts()) See Also: volatility_regime_labels: Label by vol level. composite_regime_labels: Combine vol + trend labels. """ r = pd.Series(np.asarray(returns, dtype=np.float64).flatten()) # Cumulative returns (price proxy) cum_price = (1 + r).cumprod() fast_ma = cum_price.rolling(window=fast_window, min_periods=fast_window).mean() slow_ma = cum_price.rolling(window=slow_window, min_periods=slow_window).mean() diff = fast_ma - slow_ma labels = pd.Series(np.nan, index=r.index, name="trend_regime", dtype=object) valid = diff.notna() # Apply hysteresis diff_vals = diff[valid].values label_arr = np.where( diff_vals > hysteresis, "uptrend", np.where(diff_vals < -hysteresis, "downtrend", "sideways"), ) labels.loc[valid] = label_arr if isinstance(returns, pd.Series): labels.index = returns.index return labels
# --------------------------------------------------------------------------- # Composite regime labels # ---------------------------------------------------------------------------
[docs] def composite_regime_labels( returns: pd.Series | np.ndarray, *, vol_window: int = 21, fast_window: int = 10, slow_window: int = 50, hysteresis: float = 0.0005, n_vol_levels: int = 2, ) -> pd.Series: """Combine volatility and trend regimes into composite states. Creates 4-6 composite labels by crossing trend direction (uptrend / downtrend / sideways) with volatility level (low / high or low / medium / high). Common composite states: - **bull_calm**: Uptrend + low vol. The best environment for passive equity holding. - **bull_volatile**: Uptrend + high vol. Often late-cycle or recovery rallies. - **bear_calm**: Downtrend + low vol. Grinding bear markets. - **bear_volatile**: Downtrend + high vol. Crisis periods (2008, March 2020). - **sideways_calm**: Range-bound, quiet. - **sideways_volatile**: Choppy, difficult to trade. **Interpretation guidance:** The composite label captures both *direction* and *turbulence*, which together determine the optimal strategy. For instance, momentum strategies work in ``bull_calm`` but fail in ``bear_volatile``. Parameters: returns: Return series. vol_window: Window for rolling volatility. fast_window: Fast MA window for trend. slow_window: Slow MA window for trend. hysteresis: Trend hysteresis threshold. n_vol_levels: 2 or 3 volatility levels. Returns: pd.Series of string composite labels. NaN-filled during warm-up. Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(0) >>> returns = pd.Series(rng.normal(0.001, 0.01, 500)) >>> labels = composite_regime_labels(returns) >>> print(labels.value_counts()) See Also: volatility_regime_labels: Volatility-only labeling. trend_regime_labels: Trend-only labeling. regime_duration_analysis: Analyse how long each composite state typically lasts. """ vol_labels = volatility_regime_labels( returns, window=vol_window, n_levels=n_vol_levels, ) trend_labels = trend_regime_labels( returns, fast_window=fast_window, slow_window=slow_window, hysteresis=hysteresis, ) # Map trend labels to short names trend_map = { "uptrend": "bull", "downtrend": "bear", "sideways": "sideways", } # Map vol labels to short names vol_map = { "low_vol": "calm", "medium_vol": "moderate", "high_vol": "volatile", } composite = pd.Series( np.nan, index=vol_labels.index, name="composite_regime", dtype=object, ) both_valid = vol_labels.notna() & trend_labels.notna() trend_short = trend_labels[both_valid].map(trend_map) vol_short = vol_labels[both_valid].map(vol_map) composite.loc[both_valid] = trend_short.astype(str) + "_" + vol_short.astype(str) return composite
# --------------------------------------------------------------------------- # Regime duration analysis # ---------------------------------------------------------------------------
[docs] def regime_duration_analysis( states: pd.Series | np.ndarray, ) -> dict[str, Any]: """Analyse how long each regime typically lasts. Computes the survival function, hazard rate, and expected remaining duration for each regime. This helps answer questions like "we've been in a bull regime for 60 days -- how much longer can we expect it to last?" **Interpretation guidance:** - **survival_curve[k]**: Probability that a regime-*k* spell lasts at least *d* periods. A slowly-decaying curve means the regime tends to persist. - **hazard_rate[k]**: Instantaneous probability of exiting regime *k* after having been in it for *d* periods. If the hazard rate is approximately constant, regime duration is memoryless (geometric distribution, consistent with Markov). An *increasing* hazard rate means longer spells are more likely to end soon. - **expected_remaining[k]**: Given that we are currently in regime *k* and have been for *d* periods, how many more periods should we expect? Computed from the empirical survival function. Parameters: states: Integer regime labels, shape ``(T,)``. Returns: Dictionary with: - **durations** (dict[int, list[int]]): List of spell durations for each regime. - **survival_curve** (dict[int, pd.Series]): Kaplan-Meier style survival curve for each regime, indexed by duration. - **hazard_rate** (dict[int, pd.Series]): Empirical hazard rate for each regime, indexed by duration. - **expected_remaining** (dict[int, pd.Series]): Expected remaining duration conditional on having survived *d* periods, indexed by duration. - **summary** (pd.DataFrame): Per-regime summary with ``mean_duration``, ``median_duration``, ``max_duration``, ``n_spells``. Example: >>> states = np.array([0]*50 + [1]*30 + [0]*80 + [1]*40) >>> result = regime_duration_analysis(states) >>> print(result["summary"]) >>> # Survival curve for regime 0 >>> print(result["survival_curve"][0]) See Also: regime_stability_score: Composite stability metric. composite_regime_labels: Generate regime labels to analyse. """ s = np.asarray(states, dtype=int).flatten() T = len(s) unique_states = sorted(np.unique(s)) # Extract spell durations durations: dict[int, list[int]] = {int(k): [] for k in unique_states} current_state = int(s[0]) current_len = 1 for t in range(1, T): if int(s[t]) == current_state: current_len += 1 else: durations[current_state].append(current_len) current_state = int(s[t]) current_len = 1 durations[current_state].append(current_len) # Survival curves, hazard rates, expected remaining duration survival_curves: dict[int, pd.Series] = {} hazard_rates: dict[int, pd.Series] = {} expected_remaining: dict[int, pd.Series] = {} summary_records = [] for k in unique_states: k = int(k) durs = durations[k] if not durs: survival_curves[k] = pd.Series(dtype=float) hazard_rates[k] = pd.Series(dtype=float) expected_remaining[k] = pd.Series(dtype=float) summary_records.append({ "regime": k, "mean_duration": 0.0, "median_duration": 0.0, "max_duration": 0, "n_spells": 0, }) continue max_dur = max(durs) n_spells = len(durs) # Kaplan-Meier style survival: S(d) = P(duration >= d) surv = np.zeros(max_dur + 1) for d in range(max_dur + 1): surv[d] = sum(1 for dur in durs if dur >= d) / n_spells surv_series = pd.Series( surv, index=range(max_dur + 1), name=f"survival_{k}", ) survival_curves[k] = surv_series # Hazard rate: h(d) = P(exit at d | survived to d) # h(d) = (S(d) - S(d+1)) / S(d) hazard = np.zeros(max_dur) for d in range(max_dur): if surv[d] > 0: hazard[d] = (surv[d] - surv[d + 1]) / surv[d] else: hazard[d] = 0.0 hazard_rates[k] = pd.Series( hazard, index=range(max_dur), name=f"hazard_{k}", ) # Expected remaining duration given survival to d: # E[remaining | survived d] = sum_{j=d}^{max} S(j) / S(d) - 1 # (using discrete version) exp_rem = np.zeros(max_dur + 1) for d in range(max_dur + 1): if surv[d] > 0: exp_rem[d] = sum(surv[j] for j in range(d, max_dur + 1)) / surv[d] else: exp_rem[d] = 0.0 expected_remaining[k] = pd.Series( exp_rem, index=range(max_dur + 1), name=f"expected_remaining_{k}", ) summary_records.append({ "regime": k, "mean_duration": float(np.mean(durs)), "median_duration": float(np.median(durs)), "max_duration": int(max_dur), "n_spells": n_spells, }) summary = pd.DataFrame(summary_records).set_index("regime") return { "durations": durations, "survival_curve": survival_curves, "hazard_rate": hazard_rates, "expected_remaining": expected_remaining, "summary": summary, }
# --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _vol_level_names(n: int) -> list[str]: """Generate volatility level names for *n* levels.""" if n == 2: return ["low_vol", "high_vol"] elif n == 3: return ["low_vol", "medium_vol", "high_vol"] else: return [f"vol_level_{i}" for i in range(n)]