"""Online (streaming) machine learning for quantitative finance.
Provides recursive and weighted regression algorithms that update
incrementally with each new observation, enabling real-time tracking of
time-varying relationships in financial data.
These algorithms require only numpy and pandas -- no optional dependencies.
"""
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
__all__ = [
"online_linear_regression",
"exponential_weighted_regression",
]
# ---------------------------------------------------------------------------
# Online Linear Regression (Recursive Least Squares)
# ---------------------------------------------------------------------------
[docs]
def online_linear_regression(
X: pd.DataFrame | np.ndarray,
y: pd.Series | np.ndarray,
forgetting_factor: float = 1.0,
initial_covariance: float = 100.0,
) -> dict[str, Any]:
"""Recursive Least Squares (RLS) online linear regression.
Processes observations one at a time, updating regression coefficients
with each new data point. This is the online analogue of ordinary least
squares and is fundamental to adaptive signal processing in finance:
tracking time-varying betas, hedge ratios, and factor loadings.
When to use:
Use online regression when you need to:
- Track a hedge ratio that drifts over time (pairs trading).
- Estimate time-varying factor exposures (rolling beta).
- Build adaptive trading signals that respond to regime changes.
- Process streaming tick data without re-estimating from scratch.
Mathematical background:
Recursive Least Squares maintains:
P_t = (1/lambda) * (P_{t-1} - K_t x_t^T P_{t-1})
K_t = P_{t-1} x_t / (lambda + x_t^T P_{t-1} x_t)
w_t = w_{t-1} + K_t (y_t - x_t^T w_{t-1})
where:
- w_t is the coefficient vector at time t
- P_t is the inverse covariance matrix (precision)
- K_t is the Kalman gain
- lambda is the forgetting factor (1 = no forgetting, <1 = down-weight old data)
With lambda = 1 and infinite data, RLS converges to OLS. With
lambda < 1, the effective window length is approximately
1 / (1 - lambda) observations.
Parameters
----------
X : pd.DataFrame or np.ndarray
Feature matrix of shape ``(T, p)`` where T is the number of
observations and p is the number of features.
y : pd.Series or np.ndarray
Target vector of length T.
forgetting_factor : float
Forgetting factor lambda in (0, 1]. Values close to 1 give long
memory; values like 0.99 give an effective window of ~100
observations. Use 0.95-0.99 for fast-adapting signals.
initial_covariance : float
Scalar multiplier for the initial covariance matrix P_0 = c * I.
Larger values make the filter more responsive early on.
Returns
-------
dict
``coefficients``: np.ndarray of shape ``(T, p)`` -- the
time-varying coefficient vector at each step,
``predictions``: np.ndarray of shape ``(T,)`` -- one-step-ahead
predictions (each y_hat_t uses coefficients estimated from data
up to t-1),
``residuals``: np.ndarray of shape ``(T,)`` -- prediction errors,
``final_coefficients``: np.ndarray of shape ``(p,)`` -- the
coefficients at the last time step.
Example
-------
>>> import numpy as np
>>> np.random.seed(42)
>>> T = 500
>>> X = np.random.randn(T, 2)
>>> # True coefficients shift halfway through
>>> beta_true = np.where(np.arange(T)[:, None] < 250,
... [1.0, 0.5], [0.5, 1.0])
>>> y = np.sum(X * beta_true, axis=1) + np.random.randn(T) * 0.1
>>> result = online_linear_regression(X, y, forgetting_factor=0.98)
>>> result["coefficients"].shape
(500, 2)
>>> # After convergence, coefficients should track the true values
>>> np.abs(result["final_coefficients"][0] - 0.5) < 0.3
True
Caveats
-------
- The forgetting factor is critical: too low causes noisy estimates,
too high causes slow adaptation to regime changes.
- RLS assumes the noise variance is constant; for heteroskedastic
data, consider the exponential weighted variant or Kalman filters.
- Initial predictions (before the filter converges) should be
discarded in any evaluation.
References
----------
- Haykin (2002), "Adaptive Filter Theory", Ch. 13 (RLS)
- Montana et al. (2009), "Flexible least squares for temporal data
mining and statistical arbitrage"
"""
X_arr = np.asarray(X, dtype=np.float64)
y_arr = np.asarray(y, dtype=np.float64).ravel()
if X_arr.ndim == 1:
X_arr = X_arr.reshape(-1, 1)
T, p = X_arr.shape
lam = forgetting_factor
# Initialise
w = np.zeros(p, dtype=np.float64)
P = np.eye(p, dtype=np.float64) * initial_covariance
coefficients = np.zeros((T, p), dtype=np.float64)
predictions = np.zeros(T, dtype=np.float64)
residuals = np.zeros(T, dtype=np.float64)
for t in range(T):
x_t = X_arr[t] # (p,)
# One-step-ahead prediction using current coefficients
y_hat = x_t @ w
predictions[t] = y_hat
residuals[t] = y_arr[t] - y_hat
# Kalman gain
Px = P @ x_t # (p,)
denom = lam + x_t @ Px
K = Px / denom # (p,)
# Update coefficients
w = w + K * residuals[t]
# Update inverse covariance
P = (P - np.outer(K, x_t @ P)) / lam
coefficients[t] = w.copy()
return {
"coefficients": coefficients,
"predictions": predictions,
"residuals": residuals,
"final_coefficients": coefficients[-1].copy(),
}
# ---------------------------------------------------------------------------
# Exponential Weighted Regression
# ---------------------------------------------------------------------------
[docs]
def exponential_weighted_regression(
X: pd.DataFrame | np.ndarray,
y: pd.Series | np.ndarray,
halflife: float = 63.0,
min_periods: int = 30,
) -> dict[str, Any]:
"""Exponentially weighted linear regression favouring recent data.
At each time step t, fits a weighted least squares regression where
observation weights decay exponentially into the past. This produces
smooth, adaptive coefficient estimates that naturally respond to
regime changes without the abrupt sensitivity of rolling-window OLS.
When to use:
Use exponential weighted regression when:
- You want smoother coefficient paths than RLS.
- The halflife of predictive relationships is approximately known
(e.g., 63 trading days ~ 3 months).
- You need an interpretable "recency bias" in your factor model.
Mathematical background:
At time t, the weight for observation s (where s <= t) is:
w_s = exp(-ln(2) * (t - s) / halflife)
The weighted regression solves:
beta_t = (X_t^T W_t X_t)^{-1} X_t^T W_t y_t
where W_t = diag(w_0, w_1, ..., w_t). This is equivalent to
EWMA smoothing of the sufficient statistics X^T X and X^T y.
Parameters
----------
X : pd.DataFrame or np.ndarray
Feature matrix of shape ``(T, p)``.
y : pd.Series or np.ndarray
Target vector of length T.
halflife : float
Halflife in observations. After ``halflife`` observations, the
weight of a past data point has decayed to 50%. Common financial
values: 21 (1 month), 63 (1 quarter), 252 (1 year).
min_periods : int
Minimum number of observations before producing a coefficient
estimate. Earlier entries are filled with NaN.
Returns
-------
dict
``coefficients``: np.ndarray of shape ``(T, p)`` -- time-varying
coefficients (NaN for the first ``min_periods - 1`` rows),
``predictions``: np.ndarray of shape ``(T,)`` -- fitted values
using contemporaneous coefficients,
``residuals``: np.ndarray of shape ``(T,)`` -- prediction errors,
``final_coefficients``: np.ndarray of shape ``(p,)`` -- last
estimated coefficients.
Example
-------
>>> import numpy as np
>>> np.random.seed(0)
>>> T = 300
>>> X = np.random.randn(T, 2)
>>> beta_true = np.column_stack([
... np.linspace(1, 0, T), # drifting coefficient
... np.full(T, 0.5), # constant coefficient
... ])
>>> y = np.sum(X * beta_true, axis=1) + np.random.randn(T) * 0.1
>>> result = exponential_weighted_regression(X, y, halflife=60)
>>> result["coefficients"].shape
(300, 2)
Caveats
-------
- Halflife selection is subjective; cross-validate if possible.
- For very short halflives (<10), the effective sample size is small
and estimates become noisy.
- Assumes homoskedastic errors; for heteroskedastic data, consider
EWMA-weighted robust regression.
- Numerically less stable than RLS for ill-conditioned problems.
References
----------
- Pozzi et al. (2012), "Exponentially weighted moving average charts
for detecting concept drift"
- de Prado (2018), "Advances in Financial Machine Learning", Ch. 17
"""
X_arr = np.asarray(X, dtype=np.float64)
y_arr = np.asarray(y, dtype=np.float64).ravel()
if X_arr.ndim == 1:
X_arr = X_arr.reshape(-1, 1)
T, p = X_arr.shape
decay = np.log(2.0) / halflife
coefficients = np.full((T, p), np.nan, dtype=np.float64)
predictions = np.full(T, np.nan, dtype=np.float64)
residuals = np.full(T, np.nan, dtype=np.float64)
for t in range(min_periods - 1, T):
# Weights for observations 0..t
ages = np.arange(t, -1, -1, dtype=np.float64) # t, t-1, ..., 0
weights = np.exp(-decay * ages)
X_t = X_arr[: t + 1]
y_t = y_arr[: t + 1]
# Weighted normal equations: (X^T W X) beta = X^T W y
W_sqrt = np.sqrt(weights)
Xw = X_t * W_sqrt[:, None]
yw = y_t * W_sqrt
XtX = Xw.T @ Xw
Xty = Xw.T @ yw
try:
beta = np.linalg.solve(XtX, Xty)
except np.linalg.LinAlgError:
# Singular matrix -- use pseudoinverse
beta = np.linalg.lstsq(XtX, Xty, rcond=None)[0]
coefficients[t] = beta
predictions[t] = X_arr[t] @ beta
residuals[t] = y_arr[t] - predictions[t]
last_valid = coefficients[~np.isnan(coefficients[:, 0])]
final = last_valid[-1] if len(last_valid) > 0 else np.full(p, np.nan)
return {
"coefficients": coefficients,
"predictions": predictions,
"residuals": residuals,
"final_coefficients": final,
}