"""Financial ML pipeline utilities.
Provides chronology-aware pipeline wrappers, walk-forward backtesting with
PnL tracking, and SHAP-based feature importance -- all designed to prevent
data leakage that is rampant in naive ML-for-finance workflows.
"""
from __future__ import annotations
from typing import Any, Sequence
import numpy as np
import pandas as pd
from wraquant.core.decorators import requires_extra
__all__ = [
"FinancialPipeline",
"walk_forward_backtest",
"feature_importance_shap",
]
# ---------------------------------------------------------------------------
# FinancialPipeline
# ---------------------------------------------------------------------------
[docs]
class FinancialPipeline:
"""Sklearn Pipeline wrapper that enforces chronological splitting.
Standard sklearn ``Pipeline`` + ``cross_val_score`` uses random K-Fold
which leaks future information into the training set.
``FinancialPipeline`` wraps an sklearn ``Pipeline`` and replaces all
cross-validation with purged K-fold that respects time ordering and
applies an embargo window to prevent information leakage through
overlapping labels.
Parameters
----------
steps : list[tuple[str, estimator]]
List of ``(name, transform)`` tuples defining the pipeline,
identical to the ``steps`` parameter of ``sklearn.pipeline.Pipeline``.
n_splits : int
Number of folds for purged K-fold cross-validation.
embargo_pct : float
Fraction of total samples to embargo after each test fold,
preventing label leakage from overlapping targets.
Example
-------
>>> from sklearn.preprocessing import StandardScaler
>>> from sklearn.linear_model import Ridge
>>> import numpy as np
>>> X = np.random.randn(500, 5)
>>> y = X @ np.array([1, 0.5, 0, 0, 0]) + np.random.randn(500) * 0.1
>>> pipe = FinancialPipeline(
... steps=[('scaler', StandardScaler()), ('ridge', Ridge())],
... n_splits=5,
... )
>>> result = pipe.fit_evaluate(X, y)
>>> len(result['fold_scores']) == 5
True
References
----------
- Lopez de Prado (2018), "Advances in Financial Machine Learning", Ch. 7
"""
[docs]
def __init__(
self,
steps: list[tuple[str, Any]],
n_splits: int = 5,
embargo_pct: float = 0.01,
) -> None:
self.steps = steps
self.n_splits = n_splits
self.embargo_pct = embargo_pct
self._pipeline: Any = None
def _build_pipeline(self) -> Any:
"""Build the underlying sklearn Pipeline."""
from sklearn.pipeline import Pipeline
return Pipeline(self.steps)
[docs]
def fit(
self,
X: pd.DataFrame | np.ndarray,
y: pd.Series | np.ndarray,
) -> FinancialPipeline:
"""Fit the pipeline on the full dataset.
Parameters
----------
X : pd.DataFrame or np.ndarray
Feature matrix.
y : pd.Series or np.ndarray
Target vector.
Returns
-------
FinancialPipeline
Self, for method chaining.
"""
self._pipeline = self._build_pipeline()
self._pipeline.fit(np.asarray(X), np.asarray(y))
return self
[docs]
def predict(self, X: pd.DataFrame | np.ndarray) -> np.ndarray:
"""Generate predictions using the fitted pipeline.
Parameters
----------
X : pd.DataFrame or np.ndarray
Feature matrix.
Returns
-------
np.ndarray
Predictions.
"""
if self._pipeline is None:
raise RuntimeError("Pipeline has not been fitted. Call fit() first.")
return np.asarray(self._pipeline.predict(np.asarray(X)))
[docs]
def fit_evaluate(
self,
X: pd.DataFrame | np.ndarray,
y: pd.Series | np.ndarray,
) -> dict[str, Any]:
"""Fit with purged K-fold cross-validation and return results.
Uses purged K-fold splitting to evaluate the pipeline without data
leakage. After cross-validation, fits the pipeline on the full
dataset.
Parameters
----------
X : pd.DataFrame or np.ndarray
Feature matrix.
y : pd.Series or np.ndarray
Target vector.
Returns
-------
dict
``fold_scores``: list of per-fold R-squared scores,
``mean_score``: float mean of fold scores,
``std_score``: float std of fold scores,
``pipeline``: the fitted sklearn Pipeline.
"""
from sklearn.base import clone
from sklearn.metrics import r2_score
from wraquant.ml.preprocessing import purged_kfold
X_arr = np.asarray(X)
y_arr = np.asarray(y)
fold_scores: list[float] = []
for train_idx, test_idx in purged_kfold(
X_arr, y_arr, n_splits=self.n_splits, embargo_pct=self.embargo_pct
):
pipe = clone(self._build_pipeline())
pipe.fit(X_arr[train_idx], y_arr[train_idx])
preds = pipe.predict(X_arr[test_idx])
score = float(r2_score(y_arr[test_idx], preds))
fold_scores.append(score)
# Fit on full data
self._pipeline = self._build_pipeline()
self._pipeline.fit(X_arr, y_arr)
return {
"fold_scores": fold_scores,
"mean_score": float(np.mean(fold_scores)),
"std_score": float(np.std(fold_scores)),
"pipeline": self._pipeline,
}
# ---------------------------------------------------------------------------
# Walk-forward backtest
# ---------------------------------------------------------------------------
[docs]
def walk_forward_backtest(
model: Any,
X: pd.DataFrame | np.ndarray,
y: pd.Series | np.ndarray,
train_size: int = 252,
test_size: int = 21,
step_size: int = 21,
expanding: bool = True,
) -> dict[str, Any]:
"""Full walk-forward ML backtest with PnL tracking.
Walk-forward validation is the gold standard for evaluating ML models in
finance because it mirrors real trading: train on historical data,
predict the next period, observe actual outcome, then advance.
Why walk-forward instead of standard cross-validation?
Standard K-Fold CV randomly shuffles observations, allowing the model
to "peek" at future data during training. In finance, this creates
massive upward bias in performance estimates. Walk-forward enforces
strict temporal ordering: the model only ever trains on data that
would have been available at the time of prediction.
The function supports both expanding windows (training set grows over
time, using all available history) and rolling windows (fixed-size
training window that slides forward). Expanding windows are preferred
when you believe the data-generating process is stable; rolling windows
are better when you expect structural breaks or regime changes.
Parameters
----------
model : estimator
A scikit-learn-compatible estimator with ``fit`` and ``predict``.
X : pd.DataFrame or np.ndarray
Feature matrix.
y : pd.Series or np.ndarray
Target vector (typically forward returns for PnL calculation).
train_size : int
Number of training observations in the initial window.
test_size : int
Number of test observations per fold.
step_size : int
Number of observations to advance between folds.
expanding : bool
If True, the training window expands over time. If False, a
rolling window of fixed ``train_size`` is used.
Returns
-------
dict
``predictions``: np.ndarray of concatenated out-of-sample predictions,
``actuals``: np.ndarray of corresponding true values,
``pnl``: np.ndarray of per-period PnL (prediction * actual, assuming
long when prediction > 0),
``sharpe``: float annualised Sharpe ratio of the PnL series
(assuming 252 trading days),
``hit_rate``: float fraction of periods where prediction sign
matches actual sign,
``equity_curve``: np.ndarray cumulative PnL.
Example
-------
>>> from sklearn.linear_model import Ridge
>>> import numpy as np
>>> np.random.seed(42)
>>> X = np.random.randn(600, 5)
>>> y = X @ np.array([0.5, 0.3, 0, 0, 0]) + np.random.randn(600) * 0.5
>>> result = walk_forward_backtest(Ridge(), X, y, train_size=200, test_size=20)
>>> len(result['predictions']) > 0
True
>>> 'sharpe' in result
True
References
----------
- Lopez de Prado (2018), "Advances in Financial Machine Learning", Ch. 12
- Bailey et al. (2014), "The Deflated Sharpe Ratio"
"""
from sklearn.base import clone
X_arr = np.asarray(X)
y_arr = np.asarray(y)
n = len(X_arr)
all_preds: list[np.ndarray] = []
all_actuals: list[np.ndarray] = []
start = 0
while start + train_size + test_size <= n:
train_start = 0 if expanding else start
train_end = start + train_size
test_end = min(train_end + test_size, n)
X_train = X_arr[train_start:train_end]
y_train = y_arr[train_start:train_end]
X_test = X_arr[train_end:test_end]
y_test = y_arr[train_end:test_end]
m = clone(model)
m.fit(X_train, y_train)
preds = np.asarray(m.predict(X_test))
all_preds.append(preds)
all_actuals.append(np.asarray(y_test))
start += step_size
if not all_preds:
return {
"predictions": np.array([]),
"actuals": np.array([]),
"pnl": np.array([]),
"sharpe": 0.0,
"hit_rate": 0.0,
"equity_curve": np.array([]),
}
predictions = np.concatenate(all_preds)
actuals = np.concatenate(all_actuals)
# PnL: long when prediction > 0, short when prediction < 0
pnl = np.sign(predictions) * actuals
# Sharpe ratio — delegate to canonical implementation
from wraquant.risk.metrics import sharpe_ratio as _sharpe_ratio
sharpe = _sharpe_ratio(pd.Series(pnl))
# Hit rate: fraction of correct sign predictions
hit_rate = float(np.mean(np.sign(predictions) == np.sign(actuals)))
# Equity curve
equity_curve = np.cumsum(pnl)
return {
"predictions": predictions,
"actuals": actuals,
"pnl": pnl,
"sharpe": sharpe,
"hit_rate": hit_rate,
"equity_curve": equity_curve,
}
# ---------------------------------------------------------------------------
# SHAP feature importance
# ---------------------------------------------------------------------------
[docs]
@requires_extra("ml")
def feature_importance_shap(
model: Any,
X: pd.DataFrame | np.ndarray,
feature_names: Sequence[str] | None = None,
max_samples: int = 500,
) -> dict[str, Any]:
"""Compute SHAP-based feature importance for any sklearn model.
SHAP (SHapley Additive exPlanations) values provide a theoretically
grounded decomposition of each prediction into per-feature contributions.
Unlike impurity-based importance (MDI), SHAP values are consistent and
account for feature interactions.
Parameters
----------
model : estimator
A fitted scikit-learn-compatible estimator.
X : pd.DataFrame or np.ndarray
Feature matrix to explain (typically the test set).
feature_names : Sequence[str] or None
Feature names. If None and X is a DataFrame, column names are used.
max_samples : int
Maximum number of samples to use for computing SHAP values.
Subsampled if X has more rows than this.
Returns
-------
dict
``shap_values``: np.ndarray of shape ``(n_samples, n_features)``
containing per-sample SHAP values,
``feature_importance``: np.ndarray of shape ``(n_features,)``
giving mean absolute SHAP value per feature (sorted descending),
``feature_names``: list of feature names ordered by importance.
Raises
------
MissingDependencyError
If shap is not installed.
Example
-------
>>> from sklearn.ensemble import RandomForestRegressor
>>> import numpy as np
>>> np.random.seed(42)
>>> X = np.random.randn(200, 5)
>>> y = X[:, 0] * 2 + X[:, 1] + np.random.randn(200) * 0.1
>>> model = RandomForestRegressor(n_estimators=50, random_state=42)
>>> model.fit(X, y)
RandomForestRegressor(n_estimators=50, random_state=42)
>>> result = feature_importance_shap(model, X)
>>> result["shap_values"].shape[1] == 5
True
References
----------
- Lundberg & Lee (2017), "A Unified Approach to Interpreting Model
Predictions"
"""
try:
import shap
except ImportError:
raise ImportError(
"shap is required for SHAP feature importance but is not installed. "
"Install it with: pip install shap"
)
X_arr = np.asarray(X)
if feature_names is None:
if isinstance(X, pd.DataFrame):
feature_names = list(X.columns)
else:
feature_names = [f"feature_{i}" for i in range(X_arr.shape[1])]
# Subsample if needed
if len(X_arr) > max_samples:
rng = np.random.default_rng(42)
idx = rng.choice(len(X_arr), size=max_samples, replace=False)
X_sample = X_arr[idx]
else:
X_sample = X_arr
# Use KernelExplainer for model-agnostic SHAP values
# Use a small background dataset for efficiency
bg_size = min(100, len(X_sample))
background = shap.kmeans(X_sample, bg_size)
explainer = shap.KernelExplainer(model.predict, background)
shap_values = explainer.shap_values(X_sample)
shap_values = np.asarray(shap_values)
# Mean absolute SHAP value per feature
mean_abs_shap = np.mean(np.abs(shap_values), axis=0)
# Sort by importance
sort_idx = np.argsort(mean_abs_shap)[::-1]
sorted_names = [feature_names[i] for i in sort_idx]
sorted_importance = mean_abs_shap[sort_idx]
return {
"shap_values": shap_values,
"feature_importance": sorted_importance,
"feature_names": sorted_names,
}