"""Signal processing filters for financial time series."""
from __future__ import annotations
import numpy as np
from numpy.typing import ArrayLike
from scipy.ndimage import median_filter as _scipy_median_filter
from scipy.signal import savgol_filter
from wraquant.core._coerce import coerce_array
__all__ = [
"savitzky_golay",
"kalman_smooth",
"wavelet_denoise",
"median_filter",
"exponential_smooth",
"hodrick_prescott",
]
[docs]
def savitzky_golay(
data: ArrayLike,
window: int = 11,
polyorder: int = 3,
) -> np.ndarray:
"""Apply a Savitzky-Golay smoothing filter.
Parameters
----------
data : array_like
Input time series.
window : int, optional
Window length (must be odd and > *polyorder*; default 11).
polyorder : int, optional
Polynomial order for the local fit (default 3).
Returns
-------
np.ndarray
Smoothed signal.
Example
-------
>>> import numpy as np
>>> from wraquant.math.signals import savitzky_golay
>>> noisy = np.sin(np.linspace(0, 4*np.pi, 200)) + np.random.randn(200)*0.3
>>> smooth = savitzky_golay(noisy, window=15, polyorder=3)
>>> smooth.shape
(200,)
>>> np.std(smooth) < np.std(noisy) # smoothing reduces variance
True
See Also
--------
kalman_smooth : Kalman-filter-based smoothing (adaptive).
exponential_smooth : Simple exponential smoothing.
"""
data = coerce_array(data, name="data")
# Ensure window is odd
if window % 2 == 0:
window += 1
return savgol_filter(data, window_length=window, polyorder=polyorder)
[docs]
def kalman_smooth(
data: ArrayLike,
process_var: float = 1e-5,
measurement_var: float = 1e-1,
) -> np.ndarray:
"""Simple 1-D Kalman smoother (forward pass).
Implements a constant-level model with Gaussian noise.
Parameters
----------
data : array_like
Observed (noisy) time series.
process_var : float, optional
Process noise variance (default 1e-5).
measurement_var : float, optional
Measurement noise variance (default 1e-1).
Returns
-------
np.ndarray
Kalman-smoothed estimate (same length as *data*).
Example
-------
>>> import numpy as np
>>> from wraquant.math.signals import kalman_smooth
>>> true_signal = np.cumsum(np.random.randn(200) * 0.01)
>>> noisy = true_signal + np.random.randn(200) * 0.05
>>> smooth = kalman_smooth(noisy, process_var=1e-5, measurement_var=0.01)
>>> smooth.shape
(200,)
Notes
-----
Larger *process_var* makes the filter more responsive to changes;
larger *measurement_var* makes it smoother. For trending markets,
increase process_var; for noisy mean-reverting signals, increase
measurement_var.
See Also
--------
savitzky_golay : Polynomial smoothing (no tuning parameters beyond window).
wraquant.regimes : Kalman filter with state-space models.
"""
data = coerce_array(data, name="data")
n = len(data)
smoothed = np.empty(n, dtype=float)
# Initialise
x_est = data[0]
p_est = 1.0
for i in range(n):
# Predict
x_pred = x_est
p_pred = p_est + process_var
# Update
k = p_pred / (p_pred + measurement_var)
x_est = x_pred + k * (data[i] - x_pred)
p_est = (1.0 - k) * p_pred
smoothed[i] = x_est
return smoothed
[docs]
def wavelet_denoise(
data: ArrayLike,
wavelet: str = "db4",
level: int | None = None,
threshold: str = "soft",
) -> np.ndarray:
"""Wavelet denoising of a time series.
Requires the optional ``pywavelets`` package (install via
``pip install wraquant[timeseries]``).
Parameters
----------
data : array_like
Input time series.
wavelet : str, optional
Wavelet name (default ``'db4'``).
level : int or None, optional
Decomposition level. ``None`` uses the maximum useful level.
threshold : {'soft', 'hard'}, optional
Thresholding mode (default ``'soft'``).
Returns
-------
np.ndarray
Denoised signal.
Raises
------
ImportError
If ``pywt`` is not installed.
Example
-------
>>> import numpy as np
>>> from wraquant.math.signals import wavelet_denoise # doctest: +SKIP
>>> noisy = np.sin(np.linspace(0, 4*np.pi, 256)) + np.random.randn(256)*0.5
>>> clean = wavelet_denoise(noisy, wavelet='db4') # doctest: +SKIP
>>> np.std(clean) < np.std(noisy) # doctest: +SKIP
True
See Also
--------
savitzky_golay : Polynomial smoothing (no optional dependencies).
median_filter : Robust to outlier spikes.
"""
try:
import pywt # type: ignore[import-untyped]
except ImportError as exc:
raise ImportError(
"wavelet_denoise requires PyWavelets. "
"Install it with: pip install wraquant[timeseries]"
) from exc
data = coerce_array(data, name="data")
if level is None:
level = pywt.dwt_max_level(len(data), pywt.Wavelet(wavelet).dec_len)
coeffs = pywt.wavedec(data, wavelet, level=level)
# Universal threshold (VisuShrink)
sigma = np.median(np.abs(coeffs[-1])) / 0.6745
uthresh = sigma * np.sqrt(2.0 * np.log(len(data)))
denoised_coeffs = [coeffs[0]] # keep approximation coefficients
for c in coeffs[1:]:
denoised_coeffs.append(pywt.threshold(c, value=uthresh, mode=threshold))
return pywt.waverec(denoised_coeffs, wavelet)[: len(data)]
[docs]
def exponential_smooth(
data: ArrayLike,
alpha: float = 0.3,
) -> np.ndarray:
r"""Simple exponential smoothing.
.. math::
s_t = \\alpha \\, x_t + (1 - \\alpha) \\, s_{t-1}
Parameters
----------
data : array_like
Input time series.
alpha : float, optional
Smoothing factor in ``(0, 1]`` (default 0.3).
Returns
-------
np.ndarray
Smoothed signal.
Example
-------
>>> import numpy as np
>>> from wraquant.math.signals import exponential_smooth
>>> data = np.array([1.0, 2.0, 3.0, 2.5, 2.0])
>>> smooth = exponential_smooth(data, alpha=0.5)
>>> smooth[0]
1.0
>>> smooth[-1] < data[-2] # smoothed toward the end
True
See Also
--------
kalman_smooth : Adaptive smoothing with noise model.
savitzky_golay : Local polynomial smoothing.
"""
data = coerce_array(data, name="data")
n = len(data)
smoothed = np.empty(n, dtype=float)
smoothed[0] = data[0]
for i in range(1, n):
smoothed[i] = alpha * data[i] + (1.0 - alpha) * smoothed[i - 1]
return smoothed
[docs]
def hodrick_prescott(
data: ArrayLike,
lamb: float = 1600.0,
) -> tuple[np.ndarray, np.ndarray]:
"""Hodrick-Prescott filter decomposition.
Decomposes *data* into a trend and a cyclical component.
Parameters
----------
data : array_like
Input time series.
lamb : float, optional
Smoothing parameter (default 1600, standard for quarterly data).
Use 6.25 for annual data, 129600 for monthly data.
Returns
-------
tuple of np.ndarray
``(trend, cycle)`` where ``trend + cycle == data``.
Example
-------
>>> import numpy as np
>>> from wraquant.math.signals import hodrick_prescott
>>> data = np.cumsum(np.random.randn(100) * 0.01) # random walk
>>> trend, cycle = hodrick_prescott(data, lamb=1600)
>>> trend.shape == cycle.shape == (100,)
True
>>> np.allclose(trend + cycle, data)
True
Notes
-----
Common lambda values: 6.25 (annual), 1600 (quarterly), 129600 (monthly).
The HP filter is controversial for financial data due to end-point bias
and spurious cyclicality.
See Also
--------
wraquant.ts.decomposition : Seasonal decomposition of time series.
wraquant.math.spectral.bandpass_filter : Frequency-domain detrending.
"""
data = coerce_array(data, name="data")
n = len(data)
# Construct the second-difference matrix K (n-2 x n)
# and solve (I + lamb * K'K) * trend = data
# Using sparse tridiagonal solver for efficiency
# The normal equations give a pentadiagonal system.
# We build it directly.
# Diagonal bands of (I + lamb * K'K)
# K'K is a banded matrix; we solve via dense linear algebra for
# moderate n, which is typical for financial time series.
diag_main = np.ones(n)
diag_main[0] += lamb
diag_main[1] += 5.0 * lamb
diag_main[n - 2] += 5.0 * lamb
diag_main[n - 1] += lamb
for i in range(2, n - 2):
diag_main[i] += 6.0 * lamb
diag_1 = np.zeros(n - 1)
diag_1[0] = -2.0 * lamb
diag_1[n - 2] = -2.0 * lamb
for i in range(1, n - 2):
diag_1[i] = -4.0 * lamb
diag_2 = np.full(n - 2, lamb)
# Build the pentadiagonal matrix
from scipy.linalg import solve_banded
# Banded storage for solve_banded:
# ab[0, :] = second super-diagonal
# ab[1, :] = first super-diagonal
# ab[2, :] = main diagonal
# ab[3, :] = first sub-diagonal
# ab[4, :] = second sub-diagonal
ab = np.zeros((5, n))
ab[2, :] = diag_main
# First super/sub diagonals
ab[1, 1:] = diag_1
ab[3, :-1] = diag_1
# Second super/sub diagonals
ab[0, 2:] = diag_2
ab[4, :-2] = diag_2
trend = solve_banded((2, 2), ab, data)
cycle = data - trend
return trend, cycle