"""Advanced regime detection integrations using optional packages.
Provides wrappers around pomegranate, filterpy, river, and dynamax for
Hidden Markov Models, Kalman filtering, online drift detection, and
Linear Gaussian State Space Models.
"""
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
from wraquant.core.decorators import requires_extra
__all__ = [
"pomegranate_hmm",
"filterpy_kalman",
"river_drift_detector",
"dynamax_lgssm",
"pykalman_filter",
]
[docs]
@requires_extra("regimes")
def pomegranate_hmm(
data: pd.Series | np.ndarray,
n_states: int = 2,
) -> dict[str, Any]:
"""Fit a Hidden Markov Model using pomegranate.
Parameters
----------
data : pd.Series or np.ndarray
Univariate observations (e.g. returns).
n_states : int, default 2
Number of hidden states.
Returns
-------
dict
Dictionary containing:
* **states** -- predicted hidden state sequence (1-D int array).
* **means** -- estimated mean of each state's emission distribution.
* **model** -- the fitted pomegranate ``DenseHMM`` object.
* **n_states** -- number of hidden states.
"""
from pomegranate.distributions import Normal
from pomegranate.hmm import DenseHMM
values = np.asarray(data, dtype=np.float64).reshape(-1, 1)
distributions = [Normal() for _ in range(n_states)]
model = DenseHMM(distributions=distributions, verbose=False)
model.fit(values.reshape(1, -1, 1))
states = model.predict(values.reshape(1, -1, 1))
states = np.asarray(states).flatten()
means = []
for dist in model.distributions:
means.append(float(dist.means[0]))
return {
"states": states.astype(int),
"means": means,
"model": model,
"n_states": n_states,
}
[docs]
@requires_extra("regimes")
def filterpy_kalman(
observations: np.ndarray | pd.Series,
F: np.ndarray,
H: np.ndarray,
Q: np.ndarray,
R: np.ndarray,
x0: np.ndarray | None = None,
P0: np.ndarray | None = None,
) -> dict[str, Any]:
"""Run a Kalman filter using filterpy.
Parameters
----------
observations : np.ndarray or pd.Series
Observed measurements. Shape ``(T,)`` for univariate or
``(T, dim_z)`` for multivariate observations.
F : np.ndarray
State transition matrix. Shape ``(dim_x, dim_x)``.
H : np.ndarray
Observation (measurement) matrix. Shape ``(dim_z, dim_x)``.
Q : np.ndarray
Process noise covariance. Shape ``(dim_x, dim_x)``.
R : np.ndarray
Measurement noise covariance. Shape ``(dim_z, dim_z)``.
x0 : np.ndarray or None, default None
Initial state estimate. Shape ``(dim_x,)`` or ``(dim_x, 1)``.
Defaults to zeros.
P0 : np.ndarray or None, default None
Initial covariance estimate. Shape ``(dim_x, dim_x)``.
Defaults to identity.
Returns
-------
dict
Dictionary containing:
* **filtered_states** -- filtered state estimates, shape ``(T, dim_x)``.
* **filtered_covariances** -- filtered covariance matrices,
shape ``(T, dim_x, dim_x)``.
* **log_likelihood** -- total log-likelihood.
* **residuals** -- measurement residuals, shape ``(T, dim_z)``.
"""
from filterpy.kalman import KalmanFilter
obs = np.asarray(observations, dtype=np.float64)
if obs.ndim == 1:
obs = obs.reshape(-1, 1)
dim_z = obs.shape[1]
dim_x = F.shape[0]
kf = KalmanFilter(dim_x=dim_x, dim_z=dim_z)
kf.F = F.copy()
kf.H = H.copy()
kf.Q = Q.copy()
kf.R = R.copy()
if x0 is not None:
kf.x = x0.reshape(dim_x, 1).copy()
if P0 is not None:
kf.P = P0.copy()
T = len(obs)
filtered_states = np.zeros((T, dim_x))
filtered_covs = np.zeros((T, dim_x, dim_x))
residuals = np.zeros((T, dim_z))
log_likelihood = 0.0
for t in range(T):
kf.predict()
kf.update(obs[t])
filtered_states[t] = kf.x.flatten()
filtered_covs[t] = kf.P.copy()
residuals[t] = kf.y.flatten()
log_likelihood += float(kf.log_likelihood)
return {
"filtered_states": filtered_states,
"filtered_covariances": filtered_covs,
"log_likelihood": log_likelihood,
"residuals": residuals,
}
[docs]
@requires_extra("regimes")
def river_drift_detector(
stream: np.ndarray | pd.Series | list[float],
method: str = "adwin",
) -> dict[str, Any]:
"""Detect concept drift in a data stream using river.
Processes each observation sequentially and records indices where
a drift is detected.
Parameters
----------
stream : np.ndarray, pd.Series, or list of float
Sequential stream of numeric observations.
method : str, default 'adwin'
Drift detection method:
* ``'adwin'`` -- Adaptive Windowing (ADWIN)
* ``'ddm'`` -- Drift Detection Method
* ``'eddm'`` -- Early Drift Detection Method
* ``'page_hinkley'`` -- Page-Hinkley test
Returns
-------
dict
Dictionary containing:
* **drift_indices** -- list of indices where drift was detected.
* **n_drifts** -- total number of drifts detected.
* **method** -- drift detection method used.
"""
from river import drift
from river.drift import binary as _binary
detectors: dict[str, Any] = {
"adwin": drift.ADWIN,
"ddm": _binary.DDM,
"eddm": _binary.EDDM,
"page_hinkley": drift.PageHinkley,
}
detector_cls = detectors.get(method)
if detector_cls is None:
raise ValueError(
f"Unknown method: {method!r}. Choose from {list(detectors)}."
)
detector = detector_cls()
values = np.asarray(stream, dtype=np.float64)
drift_indices: list[int] = []
for i, val in enumerate(values):
detector.update(float(val))
if detector.drift_detected:
drift_indices.append(i)
return {
"drift_indices": drift_indices,
"n_drifts": len(drift_indices),
"method": method,
}
# ---------------------------------------------------------------------------
# dynamax Linear Gaussian SSM
# ---------------------------------------------------------------------------
[docs]
@requires_extra("regimes")
def dynamax_lgssm(
observations: np.ndarray | pd.Series,
state_dim: int = 2,
emission_dim: int | None = None,
n_iters: int = 100,
) -> dict[str, Any]:
"""Fit a Linear Gaussian State Space Model using dynamax (JAX-based).
Parameters
----------
observations : np.ndarray or pd.Series
Observations of shape ``(T, obs_dim)`` or ``(T,)`` for univariate.
state_dim : int, default 2
Dimensionality of the latent state.
emission_dim : int or None
Observation dimension. Inferred from *observations* if None.
n_iters : int, default 100
Number of EM iterations.
Returns
-------
dict
Dictionary containing:
* **filtered_means** -- filtered state means, shape ``(T, state_dim)``.
* **filtered_covs** -- filtered state covariances.
* **smoothed_means** -- smoothed state means.
* **params** -- learned model parameters.
* **log_likelihoods** -- EM log-likelihood trace.
"""
from dynamax.linear_gaussian_ssm import LinearGaussianSSM
import jax.numpy as jnp
import jax.random as jr
obs = jnp.array(np.asarray(observations, dtype=np.float64))
if obs.ndim == 1:
obs = obs.reshape(-1, 1)
obs_dim = obs.shape[1]
emission_dim = emission_dim or obs_dim
model = LinearGaussianSSM(state_dim=state_dim, emission_dim=emission_dim)
key = jr.PRNGKey(0)
params, props = model.initialize(key)
params, log_liks = model.fit_em(params, props, obs, num_iters=n_iters)
filtered = model.filter(params, obs)
smoothed = model.smoother(params, obs)
return {
"filtered_means": np.asarray(filtered.filtered_means),
"filtered_covs": np.asarray(filtered.filtered_covariances),
"smoothed_means": np.asarray(smoothed.smoothed_means),
"params": params,
"log_likelihoods": np.asarray(log_liks),
}
# ---------------------------------------------------------------------------
# pykalman Kalman filter with EM learning
# ---------------------------------------------------------------------------
[docs]
@requires_extra("regimes")
def pykalman_filter(
observations: np.ndarray | pd.Series,
transition_matrices: np.ndarray | None = None,
observation_matrices: np.ndarray | None = None,
n_em_iter: int = 10,
) -> dict[str, Any]:
"""Kalman filter with EM parameter learning via pykalman.
Unlike the pure-numpy Kalman in ``regimes/kalman.py``, pykalman's
``KalmanFilter`` can **learn** the transition/observation matrices
and noise covariances from data via Expectation-Maximization. Use
this when you don't know the system parameters.
Parameters
----------
observations : np.ndarray or pd.Series
Observations of shape ``(T, obs_dim)`` or ``(T,)`` for
univariate data.
transition_matrices : np.ndarray or None, default None
Initial guess for the state transition matrix. When *None*,
pykalman learns it from data via EM.
observation_matrices : np.ndarray or None, default None
Initial guess for the observation matrix. When *None*,
pykalman learns it from data via EM.
n_em_iter : int, default 10
Number of EM iterations for parameter learning.
Returns
-------
dict
Dictionary containing:
* **filtered_means** -- filtered state estimates, shape
``(T, state_dim)``.
* **filtered_covs** -- filtered state covariance matrices,
shape ``(T, state_dim, state_dim)``.
* **smoothed_means** -- smoothed (RTS) state estimates,
shape ``(T, state_dim)``.
* **smoothed_covs** -- smoothed state covariance matrices,
shape ``(T, state_dim, state_dim)``.
* **learned_params** -- dict with learned ``transition``,
``observation``, ``transition_cov``, and ``observation_cov``
matrices.
* **log_likelihood** -- total log-likelihood of the data
under the learned model.
Example
-------
>>> import numpy as np
>>> from wraquant.regimes.integrations import pykalman_filter
>>> obs = np.cumsum(np.random.default_rng(0).normal(0, 1, 100))
>>> result = pykalman_filter(obs, n_em_iter=5)
>>> result["smoothed_means"].shape
(100, 1)
Notes
-----
Reference: Shumway & Stoffer (1982). "An Approach to Time Series
Smoothing and Forecasting Using the EM Algorithm." *Journal of
Time Series Analysis*, 3(4), 253-264.
See Also
--------
filterpy_kalman : When system parameters are known a priori.
dynamax_lgssm : JAX-based alternative with GPU support.
"""
from pykalman import KalmanFilter
obs = np.asarray(observations, dtype=np.float64)
if obs.ndim == 1:
obs = obs.reshape(-1, 1)
obs_dim = obs.shape[1]
# Build keyword arguments; only pass matrices if provided
kf_kwargs: dict[str, Any] = {}
if transition_matrices is not None:
kf_kwargs["transition_matrices"] = transition_matrices
if observation_matrices is not None:
kf_kwargs["observation_matrices"] = observation_matrices
# Infer state dimension from provided matrices, default to obs_dim
if transition_matrices is not None:
state_dim = transition_matrices.shape[0]
elif observation_matrices is not None:
state_dim = observation_matrices.shape[1]
else:
state_dim = obs_dim
kf = KalmanFilter(
n_dim_state=state_dim,
n_dim_obs=obs_dim,
**kf_kwargs,
)
# Run EM to learn parameters
kf = kf.em(obs, n_iter=n_em_iter)
# Filter and smooth
filtered_means, filtered_covs = kf.filter(obs)
smoothed_means, smoothed_covs = kf.smooth(obs)
# Log-likelihood
log_likelihood = float(kf.loglikelihood(obs))
return {
"filtered_means": np.asarray(filtered_means),
"filtered_covs": np.asarray(filtered_covs),
"smoothed_means": np.asarray(smoothed_means),
"smoothed_covs": np.asarray(smoothed_covs),
"learned_params": {
"transition": np.asarray(kf.transition_matrices),
"observation": np.asarray(kf.observation_matrices),
"transition_cov": np.asarray(kf.transition_covariance),
"observation_cov": np.asarray(kf.observation_covariance),
},
"log_likelihood": log_likelihood,
}