Source code for wraquant.regimes.kalman

"""Kalman filter, smoother, and time-varying regression.

This module provides pure-numpy implementations of the Kalman filter,
Rauch-Tung-Striebel smoother, time-varying coefficient regression
(dynamic linear model), and the Unscented Kalman Filter for nonlinear
state estimation.

**When to use each:**

- **kalman_filter**: Linear state-space models where you want filtered
  (causal) state estimates. Core building block.
- **kalman_smoother**: When you have all data and want the best possible
  state estimates (non-causal, uses future data).
- **kalman_regression**: Estimate time-varying betas (e.g., hedge ratios,
  factor exposures) that evolve as random walks.
- **unscented_kalman**: Nonlinear dynamics or observation models where
  the standard Kalman filter's linearity assumption breaks down.

References:
    Durbin, J. & Koopman, S. J. (2012). *Time Series Analysis by State
    Space Methods*. Oxford University Press.

    Haykin, S. (2001). *Kalman Filtering and Neural Networks*. Wiley.

    Julier, S. J. & Uhlmann, J. K. (2004). "Unscented Filtering and
    Nonlinear Estimation." *Proceedings of the IEEE*, 92(3).
"""

from __future__ import annotations

from typing import Any, Callable

import numpy as np
import pandas as pd

from wraquant.core._coerce import coerce_array


# ---------------------------------------------------------------------------
# Kalman Filter
# ---------------------------------------------------------------------------


[docs] def kalman_filter( observations: np.ndarray, F: np.ndarray, H: np.ndarray, Q: np.ndarray, R: np.ndarray, x0: np.ndarray, P0: np.ndarray, ) -> dict[str, Any]: """Run a linear Kalman filter over a sequence of observations. The Kalman filter is the optimal recursive estimator for a linear Gaussian state-space model: .. math:: x_t = F \\, x_{t-1} + w_t, \\quad w_t \\sim N(0, Q) y_t = H \\, x_t + v_t, \\quad v_t \\sim N(0, R) At each time step, the filter produces: 1. **Prediction**: :math:`\\hat{x}_{t|t-1}` and :math:`P_{t|t-1}` 2. **Update**: :math:`\\hat{x}_{t|t}` and :math:`P_{t|t}` after incorporating observation :math:`y_t` **Financial applications:** - Estimating latent factors or unobserved states - Noise reduction (e.g., estimating "true" volatility from noisy proxies) - Preprocessing for regime detection Parameters: observations: Observation matrix of shape ``(T, m)`` where *T* is the number of time steps and *m* is the observation dimension. For univariate observations, shape ``(T, 1)``. F: State transition matrix ``(n, n)``. H: Observation matrix ``(m, n)``. Q: Process noise covariance ``(n, n)``. R: Observation noise covariance ``(m, m)``. x0: Initial state estimate ``(n,)`` or ``(n, 1)``. P0: Initial state covariance ``(n, n)``. Returns: Dictionary with: - ``filtered_states`` (np.ndarray): Filtered state estimates, shape ``(T, n)``. - ``filtered_covs`` (np.ndarray): Filtered covariance matrices, shape ``(T, n, n)``. - ``predicted_states`` (np.ndarray): One-step-ahead predicted states, shape ``(T, n)``. - ``predicted_covs`` (np.ndarray): One-step-ahead predicted covariances, shape ``(T, n, n)``. - ``innovations`` (np.ndarray): Innovation (residual) sequence, shape ``(T, m)``. - ``innovation_covs`` (np.ndarray): Innovation covariances, shape ``(T, m, m)``. - ``kalman_gains`` (np.ndarray): Kalman gain matrices, shape ``(T, n, m)``. - ``log_likelihood`` (float): Total log-likelihood computed from the innovation sequence. Example: >>> # Track a random walk with noisy observations >>> F = np.array([[1.0]]) >>> H = np.array([[1.0]]) >>> Q = np.array([[0.01]]) # Process noise >>> R = np.array([[0.1]]) # Observation noise >>> x0 = np.array([0.0]) >>> P0 = np.array([[1.0]]) >>> obs = np.cumsum(np.random.randn(100, 1) * 0.1) + \\ ... np.random.randn(100, 1) * 0.3 >>> result = kalman_filter(obs, F, H, Q, R, x0, P0) >>> print(result['log_likelihood']) >>> print(result['filtered_states'][-5:]) See Also: kalman_smoother: Backward pass for optimal smoothed estimates. kalman_regression: Time-varying coefficient regression. unscented_kalman: Nonlinear state estimation. """ observations = np.asarray(observations, dtype=np.float64) if observations.ndim == 1: observations = observations.reshape(-1, 1) x0 = np.atleast_1d(x0).flatten() n = len(x0) T = len(observations) m = H.shape[0] filtered_states = np.zeros((T, n)) filtered_covs = np.zeros((T, n, n)) predicted_states = np.zeros((T, n)) predicted_covs = np.zeros((T, n, n)) innovations = np.zeros((T, m)) innovation_covs = np.zeros((T, m, m)) kalman_gains = np.zeros((T, n, m)) x = x0.copy() P = P0.copy() log_likelihood = 0.0 for t in range(T): # Predict x_pred = F @ x P_pred = F @ P @ F.T + Q predicted_states[t] = x_pred predicted_covs[t] = P_pred # Innovation y = observations[t] - H @ x_pred S = H @ P_pred @ H.T + R innovations[t] = y innovation_covs[t] = S # Log-likelihood contribution: -0.5 * (log|S| + y' S^-1 y + m*log(2pi)) S_inv = np.linalg.inv(S) sign, logdet = np.linalg.slogdet(S) log_likelihood += -0.5 * ( logdet + y @ S_inv @ y + m * np.log(2 * np.pi) ) # Kalman gain K = P_pred @ H.T @ S_inv kalman_gains[t] = K # Update x = x_pred + K @ y P = (np.eye(n) - K @ H) @ P_pred filtered_states[t] = x filtered_covs[t] = P return { "filtered_states": filtered_states, "filtered_covs": filtered_covs, "predicted_states": predicted_states, "predicted_covs": predicted_covs, "innovations": innovations, "innovation_covs": innovation_covs, "kalman_gains": kalman_gains, "log_likelihood": float(log_likelihood), }
# --------------------------------------------------------------------------- # Rauch-Tung-Striebel Smoother # ---------------------------------------------------------------------------
[docs] def kalman_smoother( observations: np.ndarray, F: np.ndarray, H: np.ndarray, Q: np.ndarray, R: np.ndarray, x0: np.ndarray, P0: np.ndarray, ) -> dict[str, Any]: """Run the Kalman filter followed by the RTS backward smoother. The Rauch-Tung-Striebel (RTS) smoother uses all observations (past and future) to produce the optimal state estimate at each time step. The smoothed estimates have lower variance than the filtered estimates. **When to use:** Offline analysis where you have the complete time series and want the best possible state estimates. Parameters: observations: Observations, shape ``(T, m)``. F: State transition matrix ``(n, n)``. H: Observation matrix ``(m, n)``. Q: Process noise covariance ``(n, n)``. R: Observation noise covariance ``(m, m)``. x0: Initial state ``(n,)``. P0: Initial covariance ``(n, n)``. Returns: Dictionary with everything from ``kalman_filter`` plus: - ``smoothed_states`` (np.ndarray): Smoothed state estimates, shape ``(T, n)``. - ``smoothed_covs`` (np.ndarray): Smoothed covariance matrices, shape ``(T, n, n)``. Example: >>> result = kalman_smoother(obs, F, H, Q, R, x0, P0) >>> # Smoothed states are more accurate than filtered >>> print(result['smoothed_states'][-5:]) >>> print(result['filtered_states'][-5:]) See Also: kalman_filter: Forward pass only. kalman_regression: Time-varying betas using DLM. """ filt = kalman_filter(observations, F, H, Q, R, x0, P0) T = len(observations) n = F.shape[0] smoothed_states = np.zeros((T, n)) smoothed_covs = np.zeros((T, n, n)) # Initialise from last filtered state smoothed_states[-1] = filt["filtered_states"][-1] smoothed_covs[-1] = filt["filtered_covs"][-1] # Backward pass for t in range(T - 2, -1, -1): P_filt = filt["filtered_covs"][t] P_pred = filt["predicted_covs"][t + 1] # Smoother gain P_pred_inv = np.linalg.inv(P_pred) L = P_filt @ F.T @ P_pred_inv x_filt = filt["filtered_states"][t] x_pred = filt["predicted_states"][t + 1] smoothed_states[t] = x_filt + L @ ( smoothed_states[t + 1] - x_pred ) smoothed_covs[t] = P_filt + L @ ( smoothed_covs[t + 1] - P_pred ) @ L.T result = dict(filt) result["smoothed_states"] = smoothed_states result["smoothed_covs"] = smoothed_covs return result
# --------------------------------------------------------------------------- # Kalman Regression (Time-Varying Coefficients) # ---------------------------------------------------------------------------
[docs] def kalman_regression( y: pd.Series | np.ndarray, X: pd.DataFrame | np.ndarray, state_noise: float = 1e-4, obs_noise: float | None = None, x0: np.ndarray | None = None, P0_scale: float = 1.0, smooth: bool = True, ) -> dict[str, Any]: """Estimate time-varying regression coefficients using a Kalman filter. Models the regression coefficients as a random walk: .. math:: \\beta_t = \\beta_{t-1} + w_t, \\quad w_t \\sim N(0, Q) y_t = X_t' \\beta_t + v_t, \\quad v_t \\sim N(0, R) This is a Dynamic Linear Model (DLM) where the state vector is the coefficient vector. Common financial applications: - **Time-varying betas**: Estimate CAPM beta that changes over time - **Dynamic hedge ratios**: For pairs trading or hedging - **Factor exposure monitoring**: Track how factor loadings evolve Parameters: y: Dependent variable, shape ``(T,)``. X: Regressors, shape ``(T, p)``. Include a column of ones for an intercept. state_noise: Process noise variance for each coefficient. Controls how quickly coefficients can change. Larger values allow faster adaptation. obs_noise: Observation noise variance. If None, estimated as the variance of OLS residuals. x0: Initial coefficient estimate, shape ``(p,)``. If None, OLS estimates are used. P0_scale: Scale for initial covariance (``P0 = P0_scale * I``). smooth: If True, apply the RTS smoother for better estimates. Returns: Dictionary with: - ``coefficients`` (np.ndarray): Time-varying coefficients, shape ``(T, p)``. If ``smooth=True``, these are smoothed. - ``coefficient_covs`` (np.ndarray): Covariance of coefficients, shape ``(T, p, p)``. - ``residuals`` (np.ndarray): Observation residuals, shape ``(T,)``. - ``log_likelihood`` (float): Log-likelihood. - ``filtered_coefficients`` (np.ndarray): Filtered (causal) coefficient estimates, shape ``(T, p)``. Example: >>> import numpy as np, pandas as pd >>> # Time-varying beta estimation >>> market = np.random.randn(500) * 0.01 >>> # True beta changes from 1.0 to 1.5 halfway >>> true_beta = np.where(np.arange(500) < 250, 1.0, 1.5) >>> stock = true_beta * market + np.random.randn(500) * 0.005 >>> X = np.column_stack([np.ones(500), market]) >>> result = kalman_regression(stock, X, state_noise=1e-5) >>> betas = result['coefficients'][:, 1] # Time-varying beta >>> print(f"Beta at t=100: {betas[100]:.2f}") # ~1.0 >>> print(f"Beta at t=400: {betas[400]:.2f}") # ~1.5 Notes: The ``state_noise`` parameter is critical. Too large and the estimates are noisy; too small and they adapt too slowly. A good starting point is ``1e-4`` to ``1e-6`` for daily returns. See Also: kalman_filter: General-purpose Kalman filter. kalman_smoother: RTS backward smoother. """ y_arr = coerce_array(y, "y") X_arr = np.asarray(X, dtype=np.float64) if X_arr.ndim == 1: X_arr = X_arr.reshape(-1, 1) T, p = X_arr.shape # State-space formulation F = np.eye(p) # Coefficients follow random walk Q = np.eye(p) * state_noise # Process noise # Estimate observation noise from OLS if not provided if obs_noise is None: try: beta_ols = np.linalg.lstsq(X_arr, y_arr, rcond=None)[0] resid = y_arr - X_arr @ beta_ols obs_noise = float(np.var(resid)) except np.linalg.LinAlgError: obs_noise = float(np.var(y_arr)) R = np.array([[obs_noise]]) # Initial state if x0 is None: try: x0 = np.linalg.lstsq(X_arr[:min(50, T)], y_arr[:min(50, T)], rcond=None)[0] except np.linalg.LinAlgError: x0 = np.zeros(p) else: x0 = np.atleast_1d(x0).flatten() P0 = np.eye(p) * P0_scale # Run filter with time-varying H filtered_states = np.zeros((T, p)) filtered_covs = np.zeros((T, p, p)) predicted_states = np.zeros((T, p)) predicted_covs = np.zeros((T, p, p)) residuals = np.zeros(T) log_likelihood = 0.0 x = x0.copy() P = P0.copy() for t in range(T): # Predict x_pred = F @ x P_pred = F @ P @ F.T + Q predicted_states[t] = x_pred predicted_covs[t] = P_pred # Time-varying observation matrix H_t = X_arr[t : t + 1, :] # shape (1, p) # Innovation y_pred = float((H_t @ x_pred).item()) innovation = y_arr[t] - y_pred S = H_t @ P_pred @ H_t.T + R S_inv = 1.0 / S[0, 0] residuals[t] = innovation # Log-likelihood log_likelihood += -0.5 * ( np.log(S[0, 0]) + innovation**2 * S_inv + np.log(2 * np.pi) ) # Kalman gain K = P_pred @ H_t.T * S_inv # shape (p, 1) # Update x = x_pred + K.flatten() * innovation P = (np.eye(p) - K @ H_t) @ P_pred filtered_states[t] = x filtered_covs[t] = P result: dict[str, Any] = { "filtered_coefficients": filtered_states, "coefficients": filtered_states, "coefficient_covs": filtered_covs, "residuals": residuals, "log_likelihood": float(log_likelihood), } # Apply RTS smoother if requested if smooth and T > 1: smoothed_states = np.zeros((T, p)) smoothed_covs = np.zeros((T, p, p)) smoothed_states[-1] = filtered_states[-1] smoothed_covs[-1] = filtered_covs[-1] for t in range(T - 2, -1, -1): P_filt = filtered_covs[t] P_pred = predicted_covs[t + 1] P_pred_inv = np.linalg.inv(P_pred) L = P_filt @ F.T @ P_pred_inv smoothed_states[t] = filtered_states[t] + L @ ( smoothed_states[t + 1] - predicted_states[t + 1] ) smoothed_covs[t] = P_filt + L @ ( smoothed_covs[t + 1] - P_pred ) @ L.T result["coefficients"] = smoothed_states result["coefficient_covs"] = smoothed_covs return result
# --------------------------------------------------------------------------- # Unscented Kalman Filter # ---------------------------------------------------------------------------
[docs] def unscented_kalman( observations: np.ndarray, state_dim: int, f_func: Callable[[np.ndarray], np.ndarray], h_func: Callable[[np.ndarray], np.ndarray], Q: np.ndarray, R: np.ndarray, x0: np.ndarray, P0: np.ndarray, alpha: float = 1e-3, beta: float = 2.0, kappa: float = 0.0, ) -> dict[str, Any]: """Unscented Kalman Filter for nonlinear state estimation. The UKF uses sigma points to propagate the state distribution through nonlinear dynamics and observation functions, avoiding the need for Jacobian computation (unlike the Extended Kalman Filter). The state-space model: .. math:: x_t = f(x_{t-1}) + w_t, \\quad w_t \\sim N(0, Q) y_t = h(x_t) + v_t, \\quad v_t \\sim N(0, R) **When to use:** When ``f`` or ``h`` are nonlinear. Examples include stochastic volatility models, nonlinear factor models, or target tracking with angular observations. Parameters: observations: Observations, shape ``(T, m)``. state_dim: Dimension of the state vector (``n``). f_func: State transition function. Takes ``x`` of shape ``(n,)`` and returns predicted state of shape ``(n,)``. h_func: Observation function. Takes ``x`` of shape ``(n,)`` and returns predicted observation of shape ``(m,)``. Q: Process noise covariance ``(n, n)``. R: Observation noise covariance ``(m, m)``. x0: Initial state estimate ``(n,)``. P0: Initial covariance ``(n, n)``. alpha: Spread of sigma points (typically ``1e-4`` to ``1``). beta: Prior knowledge of distribution (``2`` is optimal for Gaussian). kappa: Secondary scaling parameter (``0`` or ``3 - n``). Returns: Dictionary with: - ``filtered_states`` (np.ndarray): Filtered state estimates, shape ``(T, n)``. - ``filtered_covs`` (np.ndarray): Filtered covariances, shape ``(T, n, n)``. - ``innovations`` (np.ndarray): Innovation sequence, shape ``(T, m)``. - ``log_likelihood`` (float): Approximate log-likelihood. Example: >>> # Nonlinear observation model: observe angle to a target >>> def f(x): ... return x # Random walk dynamics >>> def h(x): ... return np.array([np.arctan2(x[1], x[0])]) # Angle >>> result = unscented_kalman( ... observations, state_dim=2, f_func=f, h_func=h, ... Q=np.eye(2)*0.01, R=np.array([[0.1]]), ... x0=np.zeros(2), P0=np.eye(2), ... ) Notes: The UKF with the standard unscented transform uses ``2n + 1`` sigma points. Computational cost scales as ``O(n^2)`` per step (vs ``O(n^3)`` for EKF with Jacobian). See Also: kalman_filter: Linear Kalman filter. kalman_smoother: Linear Kalman smoother. """ n = state_dim T = len(observations) obs = np.atleast_2d(observations) if obs.shape[0] == 1 and T > 1: obs = obs.T m = obs.shape[1] # Sigma point weights lam = alpha**2 * (n + kappa) - n gamma = np.sqrt(n + lam) # Weight vectors W_m = np.zeros(2 * n + 1) W_c = np.zeros(2 * n + 1) W_m[0] = lam / (n + lam) W_c[0] = lam / (n + lam) + (1 - alpha**2 + beta) for i in range(1, 2 * n + 1): W_m[i] = 1.0 / (2 * (n + lam)) W_c[i] = 1.0 / (2 * (n + lam)) filtered_states = np.zeros((T, n)) filtered_covs = np.zeros((T, n, n)) innovations_arr = np.zeros((T, m)) log_likelihood = 0.0 x = np.atleast_1d(x0).flatten().copy() P = P0.copy() for t in range(T): # Generate sigma points try: sqrt_P = np.linalg.cholesky(P) except np.linalg.LinAlgError: # If P is not PD, add small diagonal P += np.eye(n) * 1e-8 sqrt_P = np.linalg.cholesky(P) sigma_pts = np.zeros((2 * n + 1, n)) sigma_pts[0] = x for i in range(n): sigma_pts[i + 1] = x + gamma * sqrt_P[:, i] sigma_pts[n + i + 1] = x - gamma * sqrt_P[:, i] # Propagate through state transition sigma_pts_pred = np.array([f_func(sp) for sp in sigma_pts]) # Predicted state mean and covariance x_pred = W_m @ sigma_pts_pred P_pred = Q.copy() for i in range(2 * n + 1): diff = sigma_pts_pred[i] - x_pred P_pred += W_c[i] * np.outer(diff, diff) # Propagate through observation function obs_sigma = np.array([h_func(sp) for sp in sigma_pts_pred]) # Predicted observation y_pred = W_m @ obs_sigma # Innovation covariance and cross-covariance S = R.copy() P_xy = np.zeros((n, m)) for i in range(2 * n + 1): obs_diff = obs_sigma[i] - y_pred state_diff = sigma_pts_pred[i] - x_pred S += W_c[i] * np.outer(obs_diff, obs_diff) P_xy += W_c[i] * np.outer(state_diff, obs_diff) # Innovation innovation = obs[t] - y_pred innovations_arr[t] = innovation # Log-likelihood contribution S_inv = np.linalg.inv(S) sign, logdet = np.linalg.slogdet(S) log_likelihood += -0.5 * ( logdet + innovation @ S_inv @ innovation + m * np.log(2 * np.pi) ) # Kalman gain K = P_xy @ S_inv # Update x = x_pred + K @ innovation P = P_pred - K @ S @ K.T filtered_states[t] = x filtered_covs[t] = P return { "filtered_states": filtered_states, "filtered_covs": filtered_covs, "innovations": innovations_arr, "log_likelihood": float(log_likelihood), }