"""Financial data preprocessing utilities.
Implements purged cross-validation, fractional differentiation, and
random-matrix-theory denoising -- all central to the *Advances in Financial
Machine Learning* workflow (Lopez de Prado).
"""
from __future__ import annotations
from itertools import combinations
from typing import Generator
import numpy as np
import pandas as pd
from scipy.optimize import minimize_scalar
from wraquant.core._coerce import coerce_dataframe, coerce_series
__all__ = [
"purged_kfold",
"combinatorial_purged_kfold",
"fractional_differentiation",
"denoised_correlation",
"detoned_correlation",
]
# ---------------------------------------------------------------------------
# Purged K-Fold cross-validation
# ---------------------------------------------------------------------------
[docs]
def purged_kfold(
X: pd.DataFrame | np.ndarray,
y: pd.Series | np.ndarray,
n_splits: int = 5,
embargo_pct: float = 0.01,
) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
"""Purged K-Fold cross-validation.
Use purged K-fold instead of standard K-fold whenever your labels
overlap in time (e.g., forward returns computed over a window).
Standard K-fold leaks future information because a training sample's
label may depend on prices that appear in the test set. Purging
removes an embargo zone after each test fold to break this leakage.
Ensures that training observations that immediately follow a test
observation are removed (embargo) so that information cannot leak
through overlapping labels.
Parameters
----------
X : pd.DataFrame or np.ndarray
Feature matrix (only its length is used).
y : pd.Series or np.ndarray
Target vector (only its length is used).
n_splits : int
Number of folds.
embargo_pct : float
Fraction of total samples to embargo after each test fold.
For daily data with 5-day forward labels, ``0.01`` embargoes
~2.5 days on a 252-sample dataset.
Yields
------
tuple[np.ndarray, np.ndarray]
``(train_indices, test_indices)`` for each fold.
Example
-------
>>> import numpy as np
>>> X = np.random.randn(500, 3)
>>> y = np.random.randn(500)
>>> folds = list(purged_kfold(X, y, n_splits=5, embargo_pct=0.02))
>>> len(folds)
5
>>> train_idx, test_idx = folds[0]
>>> len(train_idx) + len(test_idx) < 500 # embargo removes some samples
True
References
----------
- Lopez de Prado (2018), "Advances in Financial Machine Learning", Ch. 7
See Also
--------
combinatorial_purged_kfold : Generates all C(n, k) purged splits.
wraquant.ml.pipeline.FinancialPipeline : Pipeline that uses purged K-fold.
"""
n_samples = len(X)
embargo_size = int(n_samples * embargo_pct)
indices = np.arange(n_samples)
fold_sizes = np.full(n_splits, n_samples // n_splits, dtype=int)
fold_sizes[: n_samples % n_splits] += 1
current = 0
for fold_size in fold_sizes:
test_start = current
test_end = current + fold_size
test_idx = indices[test_start:test_end]
# Embargo zone immediately after the test set
embargo_end = min(test_end + embargo_size, n_samples)
train_mask = np.ones(n_samples, dtype=bool)
train_mask[test_start:embargo_end] = False
train_idx = indices[train_mask]
yield train_idx, test_idx
current = test_end
[docs]
def combinatorial_purged_kfold(
X: pd.DataFrame | np.ndarray,
y: pd.Series | np.ndarray,
n_splits: int = 5,
n_test_splits: int = 2,
embargo_pct: float = 0.01,
) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
"""Combinatorial purged K-Fold cross-validation.
Use combinatorial purged K-fold when you need more backtest paths
than standard purged K-fold provides. By choosing ``n_test_splits``
groups as the test set from ``n_splits`` total groups, this generates
C(n_splits, n_test_splits) distinct train/test splits -- each with
an embargo to prevent information leakage.
Generates all C(n_splits, n_test_splits) train/test combinations,
applying an embargo after each test group to prevent leakage.
Parameters
----------
X : pd.DataFrame or np.ndarray
Feature matrix.
y : pd.Series or np.ndarray
Target vector.
n_splits : int
Total number of groups.
n_test_splits : int
Number of groups held out for testing in each split.
embargo_pct : float
Fraction of total samples to embargo after each test group.
Yields
------
tuple[np.ndarray, np.ndarray]
``(train_indices, test_indices)`` for each combination.
Example
-------
>>> import numpy as np
>>> X = np.random.randn(500, 3)
>>> y = np.random.randn(500)
>>> folds = list(combinatorial_purged_kfold(X, y, n_splits=5, n_test_splits=2))
>>> len(folds) # C(5, 2) = 10
10
References
----------
- Lopez de Prado (2018), "Advances in Financial Machine Learning", Ch. 12
See Also
--------
purged_kfold : Simpler purged K-fold with n_splits folds.
"""
n_samples = len(X)
embargo_size = int(n_samples * embargo_pct)
indices = np.arange(n_samples)
# Build group boundaries
fold_sizes = np.full(n_splits, n_samples // n_splits, dtype=int)
fold_sizes[: n_samples % n_splits] += 1
group_bounds: list[tuple[int, int]] = []
current = 0
for fs in fold_sizes:
group_bounds.append((current, current + fs))
current += fs
for test_groups in combinations(range(n_splits), n_test_splits):
test_mask = np.zeros(n_samples, dtype=bool)
embargo_mask = np.zeros(n_samples, dtype=bool)
for g in test_groups:
start, end = group_bounds[g]
test_mask[start:end] = True
embargo_end = min(end + embargo_size, n_samples)
embargo_mask[end:embargo_end] = True
train_mask = ~(test_mask | embargo_mask)
yield indices[train_mask], indices[test_mask]
# ---------------------------------------------------------------------------
# Fractional differentiation
# ---------------------------------------------------------------------------
def _frac_diff_weights(d: float, threshold: float) -> np.ndarray:
"""Compute the weights for fractional differentiation.
Weights are generated until they fall below *threshold*.
"""
weights = [1.0]
k = 1
while True:
w = -weights[-1] * (d - k + 1) / k
if abs(w) < threshold:
break
weights.append(w)
k += 1
return np.array(weights[::-1], dtype=np.float64)
[docs]
def fractional_differentiation(
series: pd.Series,
d: float = 0.5,
threshold: float = 1e-5,
) -> pd.Series:
"""Fractionally differentiate a time series.
Use fractional differentiation to make a price or factor series
stationary (required by many ML models) while retaining as much
memory (long-range dependence) as possible. Standard first
differencing (d=1) makes the series stationary but destroys all
memory. Fractional differencing with d=0.3-0.5 achieves stationarity
while preserving most of the signal.
Applies the fractional differentiation operator of order *d*
(Hosking, 1981) to obtain a (near-)stationary series while
preserving long-range memory.
The operator is defined as:
(1 - B)^d = sum_{k=0}^{inf} C(d,k) * (-B)^k
where B is the backshift operator and C(d,k) are the binomial-like
weights.
Parameters
----------
series : pd.Series
Input time series (e.g., log prices).
d : float
Fractional differentiation order (0 < d < 1 for partial
differentiation; d = 1 is the standard first difference).
Start with d=0.5 and decrease until the ADF test rejects at
the desired significance level.
threshold : float
Minimum absolute weight to retain. Smaller values use more
lagged observations but increase computational cost.
Returns
-------
pd.Series
Fractionally differentiated series (initial rows where the full
convolution is not available are dropped). Test stationarity
with an ADF test; if non-stationary, increase *d*.
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> prices = pd.Series(100 + np.cumsum(np.random.randn(300) * 0.5),
... name='close')
>>> frac_diff = fractional_differentiation(prices, d=0.4)
>>> len(frac_diff) < len(prices) # initial rows dropped
True
>>> frac_diff.std() > 0 # non-trivial output
True
References
----------
- Hosking (1981), "Fractional Differencing"
- Lopez de Prado (2018), "Advances in Financial Machine Learning", Ch. 5
See Also
--------
denoised_correlation : Random Matrix Theory denoising.
"""
series = coerce_series(series, name="series")
weights = _frac_diff_weights(d, threshold)
width = len(weights)
values = series.values.astype(float)
result = np.full(len(values), np.nan)
for i in range(width - 1, len(values)):
result[i] = np.dot(weights, values[i - width + 1 : i + 1])
out = pd.Series(result, index=series.index, name=series.name)
return out.dropna()
# ---------------------------------------------------------------------------
# Random Matrix Theory denoising
# ---------------------------------------------------------------------------
def _marchenko_pastur_pdf(
x: np.ndarray,
q: float,
sigma2: float = 1.0,
) -> np.ndarray:
"""Marchenko-Pastur probability density function."""
lambda_plus = sigma2 * (1 + np.sqrt(1 / q)) ** 2
lambda_minus = sigma2 * (1 - np.sqrt(1 / q)) ** 2
pdf = np.zeros_like(x)
mask = (x >= lambda_minus) & (x <= lambda_plus)
pdf[mask] = (
q
/ (2 * np.pi * sigma2)
* np.sqrt((lambda_plus - x[mask]) * (x[mask] - lambda_minus))
/ x[mask]
)
return pdf
def _fit_mp_sigma(eigenvalues: np.ndarray, q: float) -> float:
"""Fit sigma^2 to the bulk of eigenvalues via KS-like minimisation."""
def _neg_loglik(sigma2: float) -> float:
if sigma2 <= 0:
return 1e12
lambda_plus = sigma2 * (1 + np.sqrt(1 / q)) ** 2
# Count eigenvalues below lambda+
n_below = int(np.sum(eigenvalues <= lambda_plus))
expected = int(
len(eigenvalues) * min(1.0, 1.0 / q) if q >= 1 else len(eigenvalues)
)
return float((n_below - expected) ** 2)
res = minimize_scalar(_neg_loglik, bounds=(0.01, 5.0), method="bounded")
return float(res.x)
[docs]
def denoised_correlation(
returns: pd.DataFrame,
n_components: int | None = None,
) -> np.ndarray:
"""Denoise a correlation matrix using Random Matrix Theory.
Use denoised correlation before portfolio optimization or clustering
to remove noise eigenvalues that arise from finite-sample estimation.
When T/N (observations/assets) is not large, the sample correlation
matrix contains substantial noise. RMT denoising replaces eigenvalues
consistent with random noise (Marchenko-Pastur distribution) with
their average, producing a cleaner matrix that leads to more stable
portfolio weights.
Eigenvalues that fall within the Marchenko-Pastur distribution are
replaced by their average, shrinking noise while preserving signal.
Parameters
----------
returns : pd.DataFrame
T x N return matrix (rows = observations, columns = assets).
n_components : int or None
Number of signal eigenvalues to keep. If ``None``, they are
determined automatically from the Marchenko-Pastur bound.
Returns
-------
np.ndarray
Denoised correlation matrix of shape ``(N, N)``. The matrix is
symmetric, positive semi-definite, and has unit diagonal. Use
it in place of ``returns.corr()`` for portfolio optimization.
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> returns = pd.DataFrame(np.random.randn(252, 10) * 0.01)
>>> clean_corr = denoised_correlation(returns)
>>> clean_corr.shape
(10, 10)
>>> np.allclose(np.diag(clean_corr), 1.0) # unit diagonal
True
Notes
-----
The Marchenko-Pastur upper bound is:
lambda_+ = sigma^2 * (1 + sqrt(N/T))^2
Eigenvalues above this threshold are retained as "signal"; those
below are replaced.
References
----------
- Laloux et al. (1999), "Noise dressing of financial correlation
matrices"
- Lopez de Prado (2018), "Advances in Financial Machine Learning", Ch. 2
See Also
--------
detoned_correlation : Remove the market mode from a correlation matrix.
"""
returns = coerce_dataframe(returns, name="returns")
corr = np.array(returns.corr())
t, n = returns.shape
q = t / n
eigenvalues, eigenvectors = np.linalg.eigh(corr)
# Sort descending
idx = np.argsort(eigenvalues)[::-1]
eigenvalues = eigenvalues[idx]
eigenvectors = eigenvectors[:, idx]
if n_components is None:
sigma2 = _fit_mp_sigma(eigenvalues, q)
lambda_plus = sigma2 * (1 + np.sqrt(1 / q)) ** 2
n_components = int(np.sum(eigenvalues > lambda_plus))
n_components = max(n_components, 1)
# Replace noise eigenvalues with their mean
noise_eigenvalues = eigenvalues[n_components:]
if len(noise_eigenvalues) > 0:
noise_mean = noise_eigenvalues.mean()
eigenvalues[n_components:] = noise_mean
# Reconstruct
diag = np.diag(eigenvalues)
corr_denoised = eigenvectors @ diag @ eigenvectors.T
# Rescale to unit diagonal
d = np.sqrt(np.diag(corr_denoised))
d[d == 0] = 1.0
corr_denoised = corr_denoised / np.outer(d, d)
np.fill_diagonal(corr_denoised, 1.0)
return corr_denoised
[docs]
def detoned_correlation(
corr: np.ndarray,
n_components: int = 1,
) -> np.ndarray:
"""Remove the first *n_components* eigenvectors (market mode) from a
correlation matrix.
Use detoned correlation when you want to uncover residual co-movement
structure after removing the dominant market factor. The first
eigenvector of asset returns typically represents the "market mode"
(all assets moving together). Removing it reveals sector, style, or
idiosyncratic clustering that is hidden when the market factor
dominates. This is particularly useful before hierarchical
clustering or community detection.
Parameters
----------
corr : np.ndarray
Correlation matrix of shape ``(N, N)``.
n_components : int
Number of leading eigenvalues/vectors to remove (default 1,
which removes only the market factor).
Returns
-------
np.ndarray
De-toned correlation matrix of shape ``(N, N)``. The matrix is
symmetric with unit diagonal but is *not* positive definite
(some eigenvalues are set to zero).
Example
-------
>>> import numpy as np
>>> np.random.seed(42)
>>> corr = np.corrcoef(np.random.randn(5, 252))
>>> detoned = detoned_correlation(corr, n_components=1)
>>> detoned.shape
(5, 5)
>>> np.allclose(np.diag(detoned), 1.0)
True
References
----------
- Lopez de Prado (2020), "Machine Learning for Asset Managers", Ch. 2
See Also
--------
denoised_correlation : Remove noise eigenvalues from a correlation matrix.
wraquant.ml.clustering.correlation_clustering : Cluster assets by correlation.
"""
eigenvalues, eigenvectors = np.linalg.eigh(corr)
idx = np.argsort(eigenvalues)[::-1]
eigenvalues = eigenvalues[idx]
eigenvectors = eigenvectors[:, idx]
# Zero-out the leading components
eigenvalues[:n_components] = 0.0
corr_detoned = eigenvectors @ np.diag(eigenvalues) @ eigenvectors.T
# Rescale to unit diagonal
d = np.sqrt(np.diag(corr_detoned))
d[d == 0] = 1.0
corr_detoned = corr_detoned / np.outer(d, d)
np.fill_diagonal(corr_detoned, 1.0)
return corr_detoned