Source code for wraquant.ml.clustering

"""Financial clustering methods.

Provides correlation-based asset clustering, market-regime detection, and
optimal-cluster-count selection.
"""

from __future__ import annotations

from typing import Any, Literal

import numpy as np
import pandas as pd
from scipy.cluster.hierarchy import fcluster, linkage
from scipy.spatial.distance import squareform

from wraquant.core._coerce import coerce_dataframe
from wraquant.core.decorators import requires_extra

__all__ = [
    "correlation_clustering",
    "regime_clustering",
    "optimal_clusters",
]


# ---------------------------------------------------------------------------
# Correlation-based clustering
# ---------------------------------------------------------------------------


[docs] def correlation_clustering( returns: pd.DataFrame, n_clusters: int | None = None, method: Literal["hierarchical", "spectral"] = "hierarchical", ) -> dict[str, Any]: """Cluster assets by their return correlations. Use correlation clustering to group assets that move together, which is useful for portfolio diversification (allocate across clusters), risk management (monitor cluster concentration), and statistical arbitrage (trade within-cluster mean-reversion). The correlation-based distance is ``d(i,j) = sqrt(0.5 * (1 - rho_ij))``, which maps perfect correlation to distance 0 and perfect negative correlation to distance 1. Parameters ---------- returns : pd.DataFrame T x N return matrix (rows = observations, columns = assets). n_clusters : int or None Number of clusters. If ``None`` the optimal number is chosen automatically (silhouette score for hierarchical, or defaults to ``3`` for spectral). method : {'hierarchical', 'spectral'} Clustering algorithm. Hierarchical uses Ward linkage and produces a dendrogram-compatible linkage matrix. Spectral uses the correlation matrix as affinity and finds clusters via eigenvalue decomposition. Returns ------- dict ``labels`` : np.ndarray Cluster assignment for each asset (0-indexed, length N). Assets with the same label belong to the same cluster. ``n_clusters`` : int Number of clusters found or specified. ``linkage_matrix`` : np.ndarray or None Linkage matrix (hierarchical only). Pass to ``scipy.cluster.hierarchy.dendrogram`` for visualization. Example ------- >>> import pandas as pd, numpy as np >>> np.random.seed(42) >>> # 3 groups of correlated assets >>> factor = np.random.randn(252, 3) >>> returns = pd.DataFrame( ... np.column_stack([factor[:, i % 3] + np.random.randn(252) * 0.5 ... for i in range(9)]), ... columns=[f'asset_{i}' for i in range(9)] ... ) >>> result = correlation_clustering(returns, n_clusters=3) >>> result['n_clusters'] 3 >>> len(result['labels']) == 9 True See Also -------- regime_clustering : Cluster time periods into regimes. optimal_clusters : Determine optimal cluster count. wraquant.ml.preprocessing.detoned_correlation : Remove market mode before clustering. """ returns = coerce_dataframe(returns, name="returns") corr = returns.corr().values n = corr.shape[0] if method == "hierarchical": # Distance = sqrt(0.5 * (1 - rho)) dist_matrix = np.sqrt(0.5 * (1 - corr)) np.fill_diagonal(dist_matrix, 0.0) # Ensure symmetry dist_matrix = (dist_matrix + dist_matrix.T) / 2.0 condensed = squareform(dist_matrix, checks=False) link = linkage(condensed, method="ward") if n_clusters is None: n_clusters = _optimal_k_from_linkage(dist_matrix, link, max_k=min(10, n)) labels = fcluster(link, t=n_clusters, criterion="maxclust") - 1 return { "labels": labels, "n_clusters": int(n_clusters), "linkage_matrix": link, } if method == "spectral": return _spectral_clustering(corr, n_clusters or 3) raise ValueError(f"Unknown method '{method}'; use 'hierarchical' or 'spectral'.")
def _optimal_k_from_linkage( dist_matrix: np.ndarray, link: np.ndarray, max_k: int = 10, ) -> int: """Find optimal number of clusters via silhouette score.""" from scipy.spatial.distance import squareform as _squareform condensed = _squareform(dist_matrix, checks=False) best_k = 2 best_score = -1.0 for k in range(2, max_k + 1): labels = fcluster(link, t=k, criterion="maxclust") if len(set(labels)) < 2: continue score = _silhouette_score_simple(condensed, labels, dist_matrix.shape[0]) if score > best_score: best_score = score best_k = k return best_k def _silhouette_score_simple( condensed_dist: np.ndarray, labels: np.ndarray, n: int, ) -> float: """Simplified silhouette score from a condensed distance matrix.""" full_dist = squareform(condensed_dist) unique_labels = np.unique(labels) if len(unique_labels) < 2: return -1.0 sil = np.zeros(n) for i in range(n): same = labels == labels[i] same[i] = False if same.sum() == 0: sil[i] = 0.0 continue a_i = full_dist[i, same].mean() b_i = np.inf for lab in unique_labels: if lab == labels[i]: continue other = labels == lab if other.sum() == 0: continue b_i = min(b_i, full_dist[i, other].mean()) sil[i] = (b_i - a_i) / max(a_i, b_i) if max(a_i, b_i) > 0 else 0.0 return float(sil.mean()) @requires_extra("ml") def _spectral_clustering( corr: np.ndarray, n_clusters: int, ) -> dict[str, Any]: """Spectral clustering on the correlation matrix.""" from sklearn.cluster import SpectralClustering # Shift correlation to [0, 1] for affinity affinity = (corr + 1.0) / 2.0 np.fill_diagonal(affinity, 1.0) sc = SpectralClustering( n_clusters=n_clusters, affinity="precomputed", random_state=42, assign_labels="kmeans", ) labels = sc.fit_predict(affinity) return { "labels": labels, "n_clusters": n_clusters, "linkage_matrix": None, } # --------------------------------------------------------------------------- # Regime clustering # ---------------------------------------------------------------------------
[docs] @requires_extra("ml") def regime_clustering( features: pd.DataFrame | np.ndarray, n_regimes: int = 2, method: Literal["gmm", "kmeans"] = "gmm", ) -> dict[str, Any]: """Cluster time periods into market regimes. Use regime clustering when you want to identify distinct market states (e.g., bull/bear, risk-on/risk-off, high/low volatility) from observable features without a pre-defined model. GMM is preferred because it assigns soft probabilities to each regime; KMeans provides hard assignments only. Parameters ---------- features : pd.DataFrame or np.ndarray Feature matrix where each row is a time observation. Common inputs include rolling volatility, returns, spreads, and VIX. n_regimes : int Number of regimes to identify (default 2, typical for risk-on/risk-off). method : {'gmm', 'kmeans'} Clustering algorithm. ``'gmm'`` (Gaussian Mixture Model) provides probabilistic assignments; ``'kmeans'`` provides hard assignments and is faster. Returns ------- dict ``labels`` : np.ndarray Regime assignment for each time period (0-indexed). ``n_regimes`` : int Number of regimes. ``model`` : object Fitted GaussianMixture or KMeans model. For GMM, call ``model.predict_proba(X)`` to get regime probabilities. Example ------- >>> import numpy as np, pandas as pd >>> np.random.seed(42) >>> vol = np.concatenate([np.random.randn(100) * 0.5 + 0.1, ... np.random.randn(100) * 0.5 + 0.3]) >>> features = pd.DataFrame({'vol': vol, 'vol_sq': vol ** 2}) >>> result = regime_clustering(features, n_regimes=2) >>> result['n_regimes'] 2 >>> len(result['labels']) == 200 True See Also -------- correlation_clustering : Cluster assets (cross-sectional). optimal_clusters : Find the optimal number of clusters/regimes. wraquant.regimes : HMM and Markov-switching regime detection. """ X = np.asarray(features) if X.ndim == 1: X = X.reshape(-1, 1) if method == "gmm": from sklearn.mixture import GaussianMixture gm = GaussianMixture(n_components=n_regimes, random_state=42) labels = gm.fit_predict(X) return {"labels": labels, "n_regimes": n_regimes, "model": gm} if method == "kmeans": from sklearn.cluster import KMeans km = KMeans(n_clusters=n_regimes, random_state=42, n_init=10) labels = km.fit_predict(X) return {"labels": labels, "n_regimes": n_regimes, "model": km} raise ValueError(f"Unknown method '{method}'; use 'gmm' or 'kmeans'.")
# --------------------------------------------------------------------------- # Optimal cluster count # ---------------------------------------------------------------------------
[docs] @requires_extra("ml") def optimal_clusters( data: pd.DataFrame | np.ndarray, max_k: int = 10, method: Literal["silhouette", "bic"] = "silhouette", ) -> int: """Determine the optimal number of clusters. Use this function before calling ``correlation_clustering`` or ``regime_clustering`` to select the number of clusters data-adaptively rather than guessing. Parameters ---------- data : pd.DataFrame or np.ndarray Feature matrix. max_k : int Maximum number of clusters to evaluate (default 10). method : {'silhouette', 'bic'} Selection criterion. ``'silhouette'`` uses the silhouette score with KMeans (higher is better, range [-1, 1]); ``'bic'`` uses the Bayesian Information Criterion with a Gaussian Mixture Model (lower is better). Silhouette is faster; BIC is more principled for probabilistic models. Returns ------- int Optimal number of clusters (between 2 and *max_k*). Use this value as ``n_clusters`` in ``correlation_clustering`` or ``n_regimes`` in ``regime_clustering``. Example ------- >>> import numpy as np >>> np.random.seed(42) >>> # Generate data with 3 natural clusters >>> data = np.vstack([np.random.randn(50, 2) + [0, 0], ... np.random.randn(50, 2) + [5, 5], ... np.random.randn(50, 2) + [10, 0]]) >>> k = optimal_clusters(data, max_k=6) >>> 2 <= k <= 6 True See Also -------- correlation_clustering : Cluster assets by correlation. regime_clustering : Cluster time periods into regimes. """ X = np.asarray(data) if X.ndim == 1: X = X.reshape(-1, 1) if method == "silhouette": from sklearn.cluster import KMeans from sklearn.metrics import silhouette_score best_k = 2 best_score = -1.0 for k in range(2, max_k + 1): km = KMeans(n_clusters=k, random_state=42, n_init=10) labels = km.fit_predict(X) if len(set(labels)) < 2: continue score = silhouette_score(X, labels) if score > best_score: best_score = score best_k = k return best_k if method == "bic": from sklearn.mixture import GaussianMixture best_k = 2 best_bic = np.inf for k in range(2, max_k + 1): gm = GaussianMixture(n_components=k, random_state=42) gm.fit(X) bic = gm.bic(X) if bic < best_bic: best_bic = bic best_k = k return best_k raise ValueError(f"Unknown method '{method}'; use 'silhouette' or 'bic'.")