Source code for wraquant.ml.models

"""Model wrappers for financial machine-learning workflows.

Functions that require scikit-learn are guarded by the
``@requires_extra('ml')`` decorator so that the rest of the package can
be imported without it.
"""

from __future__ import annotations

from typing import Any, Literal, Sequence

import numpy as np
import pandas as pd

from wraquant.core.decorators import requires_extra

__all__ = [
    "walk_forward_train",
    "ensemble_predict",
    "feature_importance_mdi",
    "feature_importance_mda",
    "sequential_feature_selection",
]


# ---------------------------------------------------------------------------
# Walk-forward analysis
# ---------------------------------------------------------------------------


@requires_extra("ml")
def walk_forward_train(
    model: Any,
    X: pd.DataFrame | np.ndarray,
    y: pd.Series | np.ndarray,
    train_size: int = 252,
    test_size: int = 21,
    step_size: int = 21,
    *,
    expanding: bool = False,
) -> dict[str, Any]:
    """Walk-forward (rolling or expanding window) analysis.

    Use walk-forward analysis to evaluate a model under realistic
    conditions where only past data is available for training at each
    step, avoiding the look-ahead bias inherent in random K-fold splits.

    At each step the model is cloned (via scikit-learn's ``clone``),
    fitted on the training window, and used to predict the test window
    that immediately follows it.

    Parameters
    ----------
    model : estimator
        A scikit-learn-compatible estimator that implements ``fit`` and
        ``predict``.  It is cloned for every fold, so the instance passed
        in is never fitted.
    X : pd.DataFrame or np.ndarray
        Feature matrix.  Converted with ``np.asarray`` before slicing.
    y : pd.Series or np.ndarray
        Target vector, same length as *X*.
    train_size : int
        Number of training observations in the first window (default
        252).  With ``expanding=False`` every window has this length.
    test_size : int
        Number of test observations per fold (default 21).
    step_size : int
        Number of observations to step forward between folds.  When it is
        smaller than *test_size* consecutive test windows overlap and the
        same row appears more than once in the output.
    expanding : bool, keyword-only
        If ``False`` (default) the training window is *rolling*: it always
        holds the ``train_size`` observations preceding the test window.
        If ``True`` the training window is anchored at row 0 and grows by
        ``step_size`` observations each fold.

    Returns
    -------
    dict
        ``predictions`` : np.ndarray
            Concatenated out-of-sample predictions across all folds.
        ``actuals`` : np.ndarray
            Corresponding true values.
        ``test_indices`` : np.ndarray
            Positional (0-based) row indices of each prediction, useful
            for aligning results back to the index of *X*.
        ``n_folds`` : int
            Number of walk-forward folds executed.

        When the data is too short for a single fold the arrays are empty
        and ``n_folds`` is 0.

    Raises
    ------
    ValueError
        If ``train_size``, ``test_size`` or ``step_size`` is smaller than
        1, or if *X* and *y* have different lengths.

    Example
    -------
    >>> from sklearn.linear_model import Ridge
    >>> import numpy as np, pandas as pd
    >>> np.random.seed(42)
    >>> X = pd.DataFrame(np.random.randn(500, 3), columns=['mom', 'vol', 'size'])
    >>> y = X['mom'] * 0.5 + np.random.randn(500) * 0.1
    >>> result = walk_forward_train(Ridge(), X, y, train_size=252, test_size=21)
    >>> result['n_folds'] > 0
    True
    >>> len(result['predictions']) == len(result['actuals'])
    True

    See Also
    --------
    wraquant.ml.pipeline.walk_forward_backtest : Full walk-forward backtest with PnL.
    wraquant.ml.preprocessing.purged_kfold : Purged K-fold cross-validation.
    """
    from sklearn.base import clone

    # A non-positive step never advances ``start`` and would loop forever;
    # non-positive window sizes would produce empty train/test slices.
    for label, value in (
        ("train_size", train_size),
        ("test_size", test_size),
        ("step_size", step_size),
    ):
        if value < 1:
            raise ValueError(f"{label} must be a positive integer, got {value!r}.")

    X_arr = np.asarray(X)
    y_arr = np.asarray(y)
    n = len(X_arr)
    if len(y_arr) != n:
        raise ValueError(
            f"X and y must have the same length, got {n} and {len(y_arr)}."
        )

    all_preds: list[np.ndarray] = []
    all_actuals: list[np.ndarray] = []
    all_indices: list[np.ndarray] = []
    n_folds = 0

    start = 0
    # The guard guarantees a full test window, so no clipping to ``n`` is needed.
    while start + train_size + test_size <= n:
        train_end = start + train_size
        test_end = train_end + test_size
        train_start = 0 if expanding else start

        # Fit a fresh clone so state never leaks between folds.
        fold_model = clone(model)
        fold_model.fit(X_arr[train_start:train_end], y_arr[train_start:train_end])
        preds = fold_model.predict(X_arr[train_end:test_end])

        all_preds.append(np.asarray(preds))
        all_actuals.append(np.asarray(y_arr[train_end:test_end]))
        all_indices.append(np.arange(train_end, test_end))
        n_folds += 1

        start += step_size

    return {
        "predictions": np.concatenate(all_preds) if all_preds else np.array([]),
        "actuals": np.concatenate(all_actuals) if all_actuals else np.array([]),
        "test_indices": (
            np.concatenate(all_indices) if all_indices else np.array([], dtype=int)
        ),
        "n_folds": n_folds,
    }


# ---------------------------------------------------------------------------
# Ensemble prediction
# ---------------------------------------------------------------------------


def ensemble_predict(
    models: Sequence[Any],
    X: pd.DataFrame | np.ndarray,
    method: Literal["mean", "median", "vote"] = "mean",
) -> np.ndarray:
    """Generate ensemble predictions from multiple fitted models.

    Every model predicts on *X*; the per-model predictions are stacked as
    columns and reduced row-wise with the chosen aggregation.

    Parameters
    ----------
    models : Sequence
        Fitted scikit-learn-compatible estimators.  Each must implement
        ``predict(X)`` and return one value per row of *X*.
    X : pd.DataFrame or np.ndarray
        Feature matrix.  Converted with ``np.asarray`` before being passed
        to each model.
    method : {'mean', 'median', 'vote'}
        Aggregation method.  ``'mean'`` and ``'median'`` combine the raw
        predictions (best for regression); ``'vote'`` takes the most
        frequent value per row (majority vote, best for classification).

    Returns
    -------
    np.ndarray
        One aggregated prediction per row of *X*.  For ``'vote'`` the
        values are class labels; ties go to the smallest label in sort
        order.

    Raises
    ------
    ValueError
        If *method* is not one of the supported names, or if *models* is
        empty.

    Example
    -------
    >>> from sklearn.linear_model import Ridge, Lasso
    >>> import numpy as np
    >>> np.random.seed(0)
    >>> X_train = np.random.randn(200, 3)
    >>> y_train = X_train @ [1, 0.5, 0] + np.random.randn(200) * 0.1
    >>> m1 = Ridge().fit(X_train, y_train)
    >>> m2 = Lasso(alpha=0.01).fit(X_train, y_train)
    >>> X_test = np.random.randn(50, 3)
    >>> preds = ensemble_predict([m1, m2], X_test, method='mean')
    >>> preds.shape
    (50,)

    See Also
    --------
    walk_forward_train : Walk-forward evaluation for individual models.
    """
    # Reject bad arguments before paying for any model inference.
    if method not in ("mean", "median", "vote"):
        raise ValueError(f"Unknown method '{method}'; use 'mean', 'median', or 'vote'.")

    models = list(models)
    if not models:
        raise ValueError("ensemble_predict requires at least one fitted model.")

    X_arr = np.asarray(X)
    # Shape (n_samples, n_models): one column per model.
    preds = np.column_stack([np.asarray(m.predict(X_arr)) for m in models])

    if method == "mean":
        return preds.mean(axis=1)
    if method == "median":
        return np.median(preds, axis=1)

    # method == "vote"
    if np.issubdtype(preds.dtype, np.number):
        from scipy.stats import mode as _mode

        result = _mode(preds, axis=1, keepdims=False)
        return np.asarray(result.mode).ravel()

    # scipy.stats.mode rejects non-numeric input (string / bool labels) in
    # recent SciPy releases, so count labels per row with np.unique instead.
    # np.unique returns sorted labels and argmax picks the first maximum, so
    # ties resolve to the smallest label, matching scipy's convention.
    winners = []
    for row in preds:
        labels, counts = np.unique(row, return_counts=True)
        winners.append(labels[np.argmax(counts)])
    return np.asarray(winners, dtype=preds.dtype)


# ---------------------------------------------------------------------------
# Feature importance
# ---------------------------------------------------------------------------


def feature_importance_mdi(
    model: Any,
    feature_names: Sequence[str],
) -> pd.Series:
    """Rank features by Mean Decrease Impurity (MDI).

    Reads the ``feature_importances_`` attribute of an already-fitted
    model, labels each value with the matching entry of *feature_names*
    and returns the result ordered from most to least important.

    Parameters
    ----------
    model : estimator
        A fitted estimator exposing ``feature_importances_`` (e.g. a
        tree-based model such as ``RandomForestClassifier``).
    feature_names : Sequence[str]
        Names of the training columns, in the same order as the
        importances.

    Returns
    -------
    pd.Series
        Series named ``"mdi_importance"``, indexed by feature name and
        sorted in descending order of importance.

    Example
    -------
    >>> from sklearn.ensemble import RandomForestClassifier
    >>> import numpy as np
    >>> np.random.seed(42)
    >>> X = np.random.randn(300, 4)
    >>> y = (X[:, 0] > 0).astype(int)
    >>> rf = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)
    >>> imp = feature_importance_mdi(rf, ['momentum', 'vol', 'size', 'value'])
    >>> imp.index[0]  # most important feature
    'momentum'

    Notes
    -----
    MDI is biased toward high-cardinality and continuous features.  For a
    permutation-based alternative, use ``feature_importance_mda``.

    See Also
    --------
    feature_importance_mda : Permutation-based importance.
    wraquant.ml.advanced.random_forest_importance : Combined RF fit + importance.
    """
    raw_scores = np.asarray(model.feature_importances_)
    labels = list(feature_names)
    ranked = pd.Series(raw_scores, index=labels, name="mdi_importance").sort_values(
        ascending=False
    )
    return ranked
@requires_extra("ml")
def feature_importance_mda(
    model: Any,
    X: pd.DataFrame | np.ndarray,
    y: pd.Series | np.ndarray,
    feature_names: Sequence[str],
    n_repeats: int = 10,
    random_state: int | None = 42,
) -> pd.Series:
    """Mean Decrease Accuracy (permutation importance).

    Repeatedly permutes each feature column and measures the decrease in
    the model's score, using scikit-learn's ``permutation_importance``.
    Passing held-out data as *X* / *y* gives an estimate of how much
    out-of-sample predictive power is lost when a feature is shuffled.

    Parameters
    ----------
    model : estimator
        A fitted scikit-learn-compatible estimator.
    X : pd.DataFrame or np.ndarray
        Feature matrix (ideally a test or validation set).  Converted
        with ``np.asarray`` before use.
    y : pd.Series or np.ndarray
        True labels / targets for *X*.
    feature_names : Sequence[str]
        Feature names corresponding to columns of *X*.
    n_repeats : int
        Number of permutation repeats per feature.  More repeats yield
        more stable estimates but increase runtime linearly.
    random_state : int or None
        Seed forwarded to ``permutation_importance``.  Defaults to 42 so
        repeated calls give identical rankings; pass ``None`` for a
        different shuffle on every call.

    Returns
    -------
    pd.Series
        Series named ``"mda_importance"`` holding the mean importance per
        feature, indexed by feature name and sorted descending.  Positive
        values indicate features whose permutation hurts the model score;
        values near zero or negative suggest noise features.

    Example
    -------
    >>> from sklearn.ensemble import RandomForestClassifier
    >>> import numpy as np
    >>> np.random.seed(42)
    >>> X = np.random.randn(300, 4)
    >>> y = (X[:, 0] + 0.3 * X[:, 2] > 0).astype(int)
    >>> rf = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)
    >>> imp = feature_importance_mda(rf, X, y, ['mom', 'vol', 'size', 'val'])
    >>> imp.iloc[0] > 0  # top feature has positive importance
    True

    Notes
    -----
    Correlated features share importance: permuting one leaves its
    correlated partner to compensate, so both appear less important than
    they truly are.

    References
    ----------
    - Breiman (2001), "Random Forests"
    - Lopez de Prado (2018), "Advances in Financial Machine Learning",
      Ch. 8

    See Also
    --------
    feature_importance_mdi : Faster but biased impurity-based importance.
    wraquant.ml.pipeline.feature_importance_shap : SHAP-based importance.
    """
    from sklearn.inspection import permutation_importance

    result = permutation_importance(
        model,
        np.asarray(X),
        np.asarray(y),
        n_repeats=n_repeats,
        random_state=random_state,
    )
    series = pd.Series(
        result.importances_mean,
        index=list(feature_names),
        name="mda_importance",
    )
    return series.sort_values(ascending=False)


# ---------------------------------------------------------------------------
# Sequential feature selection
# ---------------------------------------------------------------------------


@requires_extra("ml")
def sequential_feature_selection(
    model: Any,
    X: pd.DataFrame | np.ndarray,
    y: pd.Series | np.ndarray,
    n_features: int = 5,
    direction: Literal["forward", "backward"] = "forward",
    cv: int = 5,
) -> list[str | int]:
    """Sequential (forward / backward) feature selection.

    Wraps scikit-learn's ``SequentialFeatureSelector``.  Forward selection
    greedily adds the best feature at each step; backward selection starts
    with all features and removes the least useful.

    Parameters
    ----------
    model : estimator
        A scikit-learn-compatible estimator.
    X : pd.DataFrame or np.ndarray
        Feature matrix.  Converted with ``np.asarray`` before fitting.
    y : pd.Series or np.ndarray
        Target vector.
    n_features : int
        Number of features to select.
    direction : {'forward', 'backward'}
        Selection direction.
    cv : int
        Number of cross-validation folds, forwarded to
        ``SequentialFeatureSelector``.

    Returns
    -------
    list[str | int]
        Selected column labels if *X* is a DataFrame, otherwise the
        selected column positions as plain Python ``int`` values.  Order
        follows the original column order, not the order of selection.

    Example
    -------
    >>> from sklearn.linear_model import Ridge
    >>> import numpy as np, pandas as pd
    >>> np.random.seed(42)
    >>> X = pd.DataFrame(np.random.randn(200, 6),
    ...                  columns=['f1','f2','f3','f4','f5','f6'])
    >>> y = X['f1'] * 2 + X['f3'] + np.random.randn(200) * 0.1
    >>> selected = sequential_feature_selection(Ridge(), X, y, n_features=2)
    >>> len(selected)
    2

    See Also
    --------
    feature_importance_mdi : Impurity-based ranking (faster, less rigorous).
    feature_importance_mda : Permutation-based ranking.
    """
    from sklearn.feature_selection import SequentialFeatureSelector

    sfs = SequentialFeatureSelector(
        model,
        n_features_to_select=n_features,
        direction=direction,
        cv=cv,
    )
    sfs.fit(np.asarray(X), np.asarray(y))

    # Boolean mask over the columns of X marking the selected features.
    support = sfs.get_support()
    if isinstance(X, pd.DataFrame):
        return X.columns[support].tolist()
    # ``.tolist()`` yields built-in ints (matching the annotation) rather
    # than np.int64 scalars, which are not JSON-serialisable.
    return np.flatnonzero(support).tolist()