"""Advanced scikit-learn models for quantitative finance.
Provides production-ready wrappers around SVM, Random Forest, Gradient
Boosting, Gaussian Process, Isolation Forest, and PCA -- all with
finance-specific defaults, comprehensive docstrings, and clean return
interfaces.
All functions guard sklearn imports behind ``@requires_extra('ml')`` so the
rest of wraquant works without scikit-learn installed.
"""
from __future__ import annotations
from typing import Any, Literal, Sequence
import numpy as np
import pandas as pd
from wraquant.core.decorators import requires_extra
__all__ = [
"svm_classifier",
"random_forest_importance",
"gradient_boost_forecast",
"gaussian_process_regression",
"isolation_forest_anomaly",
"pca_factor_model",
]
# ---------------------------------------------------------------------------
# SVM classifier
# ---------------------------------------------------------------------------
[docs]
@requires_extra("ml")
def svm_classifier(
X_train: pd.DataFrame | np.ndarray,
y_train: pd.Series | np.ndarray,
X_test: pd.DataFrame | np.ndarray,
y_test: pd.Series | np.ndarray,
kernel: Literal["rbf", "linear", "poly"] = "rbf",
C_range: Sequence[float] = (0.1, 1.0, 10.0),
gamma_range: Sequence[float | str] = ("scale", 0.01, 0.1),
cv: int = 5,
) -> dict[str, Any]:
"""Train an SVM classifier for market regime classification.
Support Vector Machines find the maximum-margin hyperplane separating
classes. With the RBF kernel, SVMs can capture non-linear decision
boundaries in feature space, making them effective for classifying
market regimes (bull/bear/neutral) from derived features like
volatility, momentum, and volume profiles.
When to use:
Use SVM when you have a moderate number of features (5-100),
moderate dataset size (500-50k), and need robust classification
with good generalisation. SVMs handle high-dimensional spaces well
and are resistant to overfitting when C is properly tuned.
Mathematical background:
SVM solves:
min_{w,b} (1/2) ||w||^2 + C * sum_i max(0, 1 - y_i(w.x_i + b))
The RBF kernel maps inputs to infinite-dimensional space:
K(x, x') = exp(-gamma * ||x - x'||^2)
Grid search over C (regularisation) and gamma (kernel width)
selects the best hyperparameters via cross-validation.
Parameters
----------
X_train : pd.DataFrame or np.ndarray
Training feature matrix.
y_train : pd.Series or np.ndarray
Training labels (e.g., 1 = bull, 0 = neutral, -1 = bear).
X_test : pd.DataFrame or np.ndarray
Test feature matrix.
y_test : pd.Series or np.ndarray
Test labels.
kernel : {'rbf', 'linear', 'poly'}
SVM kernel function.
C_range : Sequence[float]
Regularisation parameter values to search.
gamma_range : Sequence[float | str]
Kernel coefficient values to search (ignored for linear kernel).
cv : int
Cross-validation folds for grid search.
Returns
-------
dict
``model``: fitted SVC,
``predictions``: np.ndarray of test predictions,
``accuracy``: float,
``confusion_matrix``: np.ndarray,
``best_params``: dict of best C and gamma,
``cv_score``: float (mean CV accuracy).
Example
-------
>>> import numpy as np
>>> X = np.random.randn(200, 5)
>>> y = (X[:, 0] > 0).astype(int)
>>> result = svm_classifier(X[:150], y[:150], X[150:], y[150:])
>>> result["accuracy"] > 0.5
True
Caveats
-------
- Scale features before training (StandardScaler recommended).
- SVMs are O(n^2) in memory and O(n^3) in time -- avoid for n > 100k.
- For imbalanced classes, set ``class_weight='balanced'`` on the SVC.
References
----------
- Cortes & Vapnik (1995), "Support-Vector Networks"
"""
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
X_tr = np.asarray(X_train)
y_tr = np.asarray(y_train)
X_te = np.asarray(X_test)
y_te = np.asarray(y_test)
param_grid: dict[str, list[Any]] = {"C": list(C_range)}
if kernel != "linear":
param_grid["gamma"] = list(gamma_range)
svc = SVC(kernel=kernel, class_weight="balanced")
grid = GridSearchCV(svc, param_grid, cv=cv, scoring="accuracy", n_jobs=1)
grid.fit(X_tr, y_tr)
best_model = grid.best_estimator_
preds = best_model.predict(X_te)
acc = float(accuracy_score(y_te, preds))
cm = confusion_matrix(y_te, preds)
return {
"model": best_model,
"predictions": preds,
"accuracy": acc,
"confusion_matrix": cm,
"best_params": grid.best_params_,
"cv_score": float(grid.best_score_),
}
# ---------------------------------------------------------------------------
# Random Forest feature importance
# ---------------------------------------------------------------------------
[docs]
@requires_extra("ml")
def random_forest_importance(
X: pd.DataFrame | np.ndarray,
y: pd.Series | np.ndarray,
feature_names: Sequence[str] | None = None,
n_estimators: int = 100,
max_depth: int | None = 5,
random_state: int = 42,
task: Literal["classification", "regression"] = "classification",
) -> dict[str, Any]:
"""Rank features by importance using a Random Forest.
Random Forests aggregate many decorrelated decision trees and measure
each feature's contribution to reducing impurity (Gini for
classification, variance for regression). This produces a natural
feature ranking useful for selecting the most predictive signals from
a large universe of technical indicators, fundamental factors, or
alternative data features.
When to use:
Use as a first-pass feature selector when you have many candidate
features (>20) and want to identify which ones carry signal. Fast,
non-parametric, and handles mixed feature types.
Mathematical background:
Mean Decrease Impurity (MDI) for feature j:
Imp(j) = sum_{t in T_j} p(t) * Delta_i(t)
where T_j is the set of tree nodes splitting on feature j, p(t) is
the fraction of samples reaching node t, and Delta_i(t) is the
impurity decrease. MDI is averaged over all trees in the forest.
Parameters
----------
X : pd.DataFrame or np.ndarray
Feature matrix.
y : pd.Series or np.ndarray
Target vector.
feature_names : Sequence[str] or None
Feature names. If None and X is a DataFrame, column names are used.
n_estimators : int
Number of trees.
max_depth : int or None
Maximum tree depth (None for unlimited).
random_state : int
Random seed for reproducibility.
task : {'classification', 'regression'}
Type of prediction task.
Returns
-------
dict
``importance``: pd.Series of feature importances sorted descending,
``model``: fitted RandomForest estimator,
``oob_score``: float (out-of-bag score if available, else None).
Example
-------
>>> import numpy as np
>>> X = np.random.randn(300, 10)
>>> y = (X[:, 0] + 0.5 * X[:, 3] > 0).astype(int)
>>> result = random_forest_importance(X, y)
>>> result["importance"].index[0] # top feature is likely 0
0
Caveats
-------
- MDI importance is biased toward high-cardinality features; consider
permutation importance (``feature_importance_mda``) as a complement.
- Correlated features share importance, causing both to appear weaker.
References
----------
- Breiman (2001), "Random Forests"
- Lopez de Prado (2018), "Advances in Financial Machine Learning", Ch.8
"""
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
X_arr = np.asarray(X)
y_arr = np.asarray(y)
if feature_names is None:
if isinstance(X, pd.DataFrame):
feature_names = list(X.columns)
else:
feature_names = list(range(X_arr.shape[1]))
if task == "classification":
rf = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=random_state,
oob_score=True,
n_jobs=1,
)
else:
rf = RandomForestRegressor(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=random_state,
oob_score=True,
n_jobs=1,
)
rf.fit(X_arr, y_arr)
importance = pd.Series(
rf.feature_importances_,
index=feature_names,
name="importance",
).sort_values(ascending=False)
oob = float(rf.oob_score_) if hasattr(rf, "oob_score_") else None
return {
"importance": importance,
"model": rf,
"oob_score": oob,
}
# ---------------------------------------------------------------------------
# Gradient Boosting
# ---------------------------------------------------------------------------
[docs]
@requires_extra("ml")
def gradient_boost_forecast(
X_train: pd.DataFrame | np.ndarray,
y_train: pd.Series | np.ndarray,
X_test: pd.DataFrame | np.ndarray,
y_test: pd.Series | np.ndarray | None = None,
task: Literal["classification", "regression"] = "regression",
n_estimators: int = 200,
max_depth: int = 4,
learning_rate: float = 0.1,
subsample: float = 0.8,
cv: int = 5,
feature_names: Sequence[str] | None = None,
) -> dict[str, Any]:
"""Gradient boosting for forecasting or classification.
Gradient Boosting sequentially fits weak learners (shallow trees) to
the residuals of the ensemble, greedily minimising a loss function. It
is the workhorse of tabular ML in quant finance -- used for return
prediction, alpha factor construction, default prediction, and more.
When to use:
Use gradient boosting as your default tabular model. It handles
non-linearities, feature interactions, and missing values naturally.
Preferred over linear models when you have >500 samples and >5
features.
Mathematical background:
At each stage m, the model adds a tree h_m that minimises:
F_m(x) = F_{m-1}(x) + nu * h_m(x)
where h_m fits the negative gradient of the loss:
h_m = argmin_h sum_i L(y_i, F_{m-1}(x_i) + h(x_i))
For regression with squared loss, h_m fits the residuals.
For classification with log-loss, h_m fits the log-odds residuals.
Parameters
----------
X_train : pd.DataFrame or np.ndarray
Training feature matrix.
y_train : pd.Series or np.ndarray
Training target.
X_test : pd.DataFrame or np.ndarray
Test feature matrix.
y_test : pd.Series or np.ndarray or None
Test target (if provided, test metrics are computed).
task : {'regression', 'classification'}
Prediction task.
n_estimators : int
Number of boosting stages.
max_depth : int
Maximum depth of individual trees.
learning_rate : float
Shrinkage applied to each tree's contribution.
subsample : float
Fraction of training samples used per tree (stochastic boosting).
cv : int
Cross-validation folds for reporting training CV score.
feature_names : Sequence[str] or None
Feature names for importance ranking.
Returns
-------
dict
``model``: fitted GradientBoosting estimator,
``predictions``: np.ndarray of test predictions,
``feature_importance``: pd.Series (sorted descending),
``cv_scores``: np.ndarray of cross-validation scores,
``test_score``: float or None (R^2 for regression, accuracy for
classification).
Example
-------
>>> import numpy as np
>>> X = np.random.randn(300, 5)
>>> y = X[:, 0] * 2 + X[:, 1] + np.random.randn(300) * 0.5
>>> result = gradient_boost_forecast(X[:250], y[:250], X[250:], y[250:])
>>> result["test_score"] > 0
True
Caveats
-------
- Overfits if n_estimators is too large; use early stopping or CV.
- Sensitive to learning_rate / n_estimators trade-off.
- For >100k samples, consider XGBoost/LightGBM for speed.
References
----------
- Friedman (2001), "Greedy Function Approximation: A Gradient Boosting
Machine"
"""
from sklearn.ensemble import (
GradientBoostingClassifier,
GradientBoostingRegressor,
)
from sklearn.model_selection import cross_val_score
X_tr = np.asarray(X_train)
y_tr = np.asarray(y_train)
X_te = np.asarray(X_test)
if feature_names is None:
if isinstance(X_train, pd.DataFrame):
feature_names = list(X_train.columns)
else:
feature_names = list(range(X_tr.shape[1]))
if task == "regression":
gb = GradientBoostingRegressor(
n_estimators=n_estimators,
max_depth=max_depth,
learning_rate=learning_rate,
subsample=subsample,
random_state=42,
)
scoring = "r2"
else:
gb = GradientBoostingClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
learning_rate=learning_rate,
subsample=subsample,
random_state=42,
)
scoring = "accuracy"
gb.fit(X_tr, y_tr)
preds = gb.predict(X_te)
cv_scores = cross_val_score(gb, X_tr, y_tr, cv=cv, scoring=scoring)
importance = pd.Series(
gb.feature_importances_,
index=feature_names,
name="importance",
).sort_values(ascending=False)
test_score: float | None = None
if y_test is not None:
y_te = np.asarray(y_test)
if task == "regression":
ss_res = np.sum((y_te - preds) ** 2)
ss_tot = np.sum((y_te - y_te.mean()) ** 2)
test_score = float(1 - ss_res / ss_tot) if ss_tot > 0 else 0.0
else:
test_score = float(np.mean(preds == y_te))
return {
"model": gb,
"predictions": preds,
"feature_importance": importance,
"cv_scores": cv_scores,
"test_score": test_score,
}
# ---------------------------------------------------------------------------
# Gaussian Process regression
# ---------------------------------------------------------------------------
[docs]
@requires_extra("ml")
def gaussian_process_regression(
X_train: pd.DataFrame | np.ndarray,
y_train: pd.Series | np.ndarray,
X_test: pd.DataFrame | np.ndarray,
kernel: str = "rbf",
alpha: float = 1e-2,
n_restarts: int = 5,
) -> dict[str, Any]:
"""Gaussian Process regression with uncertainty quantification.
Gaussian Processes (GPs) define a distribution over functions and
provide both point predictions and calibrated confidence intervals.
In finance, GPs are used for smooth yield-curve fitting,
volatility-surface interpolation, and any setting where uncertainty
matters as much as the prediction.
When to use:
Use GP when you need uncertainty estimates (e.g., confidence bands
on a yield curve) and have a small-to-moderate dataset (<5000
observations). The cubic complexity makes GPs impractical for
large datasets without approximations.
Mathematical background:
A GP assumes f(x) ~ GP(m(x), k(x, x')), where:
m(x) is the mean function (usually 0)
k(x, x') is the kernel (covariance function)
Posterior predictive at test point x*:
mu* = k(x*, X) [K + sigma^2 I]^{-1} y
sigma*^2 = k(x*, x*) - k(x*, X) [K + sigma^2 I]^{-1} k(X, x*)
where K_{ij} = k(x_i, x_j) and sigma^2 is the noise variance.
Parameters
----------
X_train : pd.DataFrame or np.ndarray
Training features.
y_train : pd.Series or np.ndarray
Training target.
X_test : pd.DataFrame or np.ndarray
Test features.
kernel : str
Kernel type: ``'rbf'``, ``'matern'``, or ``'rational_quadratic'``.
alpha : float
Noise level (regularisation diagonal added to the kernel matrix).
n_restarts : int
Number of optimiser restarts for kernel hyperparameters.
Returns
-------
dict
``predictions``: np.ndarray of mean predictions,
``std``: np.ndarray of predictive standard deviations,
``confidence_lower``: np.ndarray (mean - 1.96 * std),
``confidence_upper``: np.ndarray (mean + 1.96 * std),
``model``: fitted GaussianProcessRegressor.
Example
-------
>>> import numpy as np
>>> X_train = np.linspace(0, 10, 50).reshape(-1, 1)
>>> y_train = np.sin(X_train).ravel() + np.random.randn(50) * 0.1
>>> X_test = np.linspace(0, 10, 20).reshape(-1, 1)
>>> result = gaussian_process_regression(X_train, y_train, X_test)
>>> result["predictions"].shape
(20,)
>>> result["std"].shape
(20,)
Caveats
-------
- Complexity is O(n^3) for training and O(n^2) per prediction.
- For large datasets, use sparse GP approximations (not included here).
- Kernel choice strongly affects results; try multiple kernels.
References
----------
- Rasmussen & Williams (2006), "Gaussian Processes for Machine Learning"
"""
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import (
ConstantKernel,
Matern,
RationalQuadratic,
RBF,
)
X_tr = np.asarray(X_train)
y_tr = np.asarray(y_train).ravel()
X_te = np.asarray(X_test)
kernel_map = {
"rbf": ConstantKernel(1.0) * RBF(length_scale=1.0),
"matern": ConstantKernel(1.0) * Matern(length_scale=1.0, nu=1.5),
"rational_quadratic": ConstantKernel(1.0) * RationalQuadratic(
length_scale=1.0, alpha=1.0
),
}
kern = kernel_map.get(kernel)
if kern is None:
raise ValueError(
f"Unknown kernel '{kernel}'; choose from 'rbf', 'matern', "
f"'rational_quadratic'."
)
gp = GaussianProcessRegressor(
kernel=kern,
alpha=alpha,
n_restarts_optimizer=n_restarts,
random_state=42,
)
gp.fit(X_tr, y_tr)
mean, std = gp.predict(X_te, return_std=True)
return {
"predictions": mean,
"std": std,
"confidence_lower": mean - 1.96 * std,
"confidence_upper": mean + 1.96 * std,
"model": gp,
}
# ---------------------------------------------------------------------------
# Isolation Forest anomaly detection
# ---------------------------------------------------------------------------
[docs]
@requires_extra("ml")
def isolation_forest_anomaly(
returns: pd.Series | pd.DataFrame | np.ndarray,
contamination: float = 0.05,
n_estimators: int = 200,
random_state: int = 42,
) -> dict[str, Any]:
"""Detect anomalous days in return data using Isolation Forest.
Isolation Forest detects anomalies by randomly partitioning data and
measuring how quickly each observation is isolated. Anomalous points
(outlier returns, flash crashes, liquidity events) are isolated in
fewer splits because they sit far from the bulk of the distribution.
When to use:
Use for unsupervised anomaly detection in returns, volumes, or
spreads. Works well when you do not have labelled anomalies and
want to flag unusual market days for review. Robust to
high-dimensional feature spaces.
Mathematical background:
For a sample x, the anomaly score is based on the average path
length E[h(x)] across the isolation trees:
s(x, n) = 2^{-E[h(x)] / c(n)}
where c(n) is the average path length in a binary search tree of
n samples. Score close to 1 means anomaly; close to 0.5 means
normal.
Parameters
----------
returns : pd.Series, pd.DataFrame, or np.ndarray
Return data. If 1-D, treated as a single feature; if 2-D, each
column is a feature (e.g., return, volume, spread).
contamination : float
Expected fraction of anomalies in the dataset (0 < c < 0.5).
n_estimators : int
Number of isolation trees.
random_state : int
Random seed.
Returns
-------
dict
``anomaly_labels``: np.ndarray of -1 (anomaly) / 1 (normal),
``anomaly_scores``: np.ndarray of continuous anomaly scores
(lower = more anomalous),
``anomaly_mask``: np.ndarray of bool (True for anomalies),
``n_anomalies``: int,
``model``: fitted IsolationForest.
Example
-------
>>> import numpy as np
>>> rets = np.random.randn(500) * 0.01
>>> rets[100] = 0.15 # inject anomaly
>>> result = isolation_forest_anomaly(rets, contamination=0.02)
>>> result["anomaly_mask"][100]
True
Caveats
-------
- The contamination parameter is a prior; misspecification leads to
over- or under-detection.
- Isolation Forest assumes anomalies are both rare and different;
clustered anomalies may be missed.
- For time-series anomaly detection, consider adding lagged features.
References
----------
- Liu, Ting & Zhou (2008), "Isolation Forest"
"""
from sklearn.ensemble import IsolationForest
X = np.asarray(returns)
if X.ndim == 1:
X = X.reshape(-1, 1)
iso = IsolationForest(
n_estimators=n_estimators,
contamination=contamination,
random_state=random_state,
n_jobs=1,
)
labels = iso.fit_predict(X)
scores = iso.decision_function(X)
anomaly_mask = labels == -1
return {
"anomaly_labels": labels,
"anomaly_scores": scores,
"anomaly_mask": anomaly_mask,
"n_anomalies": int(anomaly_mask.sum()),
"model": iso,
}
# ---------------------------------------------------------------------------
# PCA factor model
# ---------------------------------------------------------------------------
[docs]
@requires_extra("ml")
def pca_factor_model(
returns: pd.DataFrame,
n_components: int | None = None,
explained_variance_threshold: float = 0.90,
) -> dict[str, Any]:
"""Build a PCA-based latent factor model from asset returns.
Principal Component Analysis extracts orthogonal linear combinations
of asset returns that explain the most variance. The first PC
typically captures the market factor, the second often captures a
value/growth or sector rotation, and so on.
When to use:
Use PCA factor models for dimensionality reduction in portfolio
construction, risk decomposition, statistical arbitrage (pairs
trading on residuals), and understanding co-movement structure.
Mathematical background:
Given return matrix R (T x N), PCA decomposes the covariance:
Sigma = V Lambda V^T
where Lambda = diag(lambda_1, ..., lambda_N) are eigenvalues and
V are eigenvectors (loadings). Factor returns are:
F = R @ V[:, :k] (T x k)
The fraction of variance explained by the first k components:
sum(lambda_1..k) / sum(lambda_1..N)
Parameters
----------
returns : pd.DataFrame
T x N return matrix (rows = observations, columns = assets).
n_components : int or None
Number of principal components. If None, selects enough to explain
``explained_variance_threshold`` of total variance.
explained_variance_threshold : float
Minimum cumulative explained variance ratio when ``n_components``
is None.
Returns
-------
dict
``loadings``: pd.DataFrame of shape ``(N, n_components)`` --
asset loadings on each factor,
``factor_returns``: pd.DataFrame of shape ``(T, n_components)`` --
time series of factor returns,
``explained_variance_ratio``: np.ndarray of per-component variance
ratios,
``cumulative_variance``: np.ndarray of cumulative variance ratios,
``n_components``: int,
``model``: fitted PCA object.
Example
-------
>>> import numpy as np, pandas as pd
>>> returns = pd.DataFrame(np.random.randn(252, 20) * 0.01)
>>> result = pca_factor_model(returns, n_components=3)
>>> result["factor_returns"].shape
(252, 3)
Caveats
-------
- PCA is linear; for non-linear dimensionality reduction, use the VAE
in ``wraquant.ml.deep.autoencoder_features``.
- Eigenvalues from small samples are noisy; use Random Matrix Theory
denoising (``wraquant.ml.preprocessing.denoised_correlation``) first.
- Components are not guaranteed to have economic meaning.
References
----------
- Jolliffe (2002), "Principal Component Analysis"
- Avellaneda & Lee (2010), "Statistical arbitrage in the US equities
market"
"""
from sklearn.decomposition import PCA
R = np.asarray(returns, dtype=np.float64)
if n_components is None:
# Fit full PCA to find the right number of components
pca_full = PCA()
pca_full.fit(R)
cumvar = np.cumsum(pca_full.explained_variance_ratio_)
n_components = int(np.searchsorted(cumvar, explained_variance_threshold) + 1)
n_components = min(n_components, R.shape[1])
pca = PCA(n_components=n_components)
factor_returns = pca.fit_transform(R)
# Loadings: eigenvectors scaled by sqrt(eigenvalue)
loadings = pca.components_.T # (N, n_components)
asset_names = returns.columns if isinstance(returns, pd.DataFrame) else None
factor_names = [f"PC{i + 1}" for i in range(n_components)]
loadings_df = pd.DataFrame(
loadings,
index=asset_names if asset_names is not None else range(R.shape[1]),
columns=factor_names,
)
index = returns.index if isinstance(returns, pd.DataFrame) else None
factor_df = pd.DataFrame(
factor_returns,
index=index,
columns=factor_names,
)
return {
"loadings": loadings_df,
"factor_returns": factor_df,
"explained_variance_ratio": pca.explained_variance_ratio_,
"cumulative_variance": np.cumsum(pca.explained_variance_ratio_),
"n_components": n_components,
"model": pca,
}