Source code for wraquant.ts.decomposition

"""Time series decomposition methods.

Decomposition separates a time series into interpretable components --
typically trend, seasonal, and residual -- enabling cleaner analysis and
forecasting. This module provides three approaches:

1. **Classical decomposition** (``seasonal_decompose``) -- simple moving-
   average-based extraction of trend and seasonal components. Fast and
   easy to understand, but uses fixed seasonal patterns and loses data
   at endpoints.

2. **STL decomposition** (``stl_decompose``) -- Seasonal and Trend
   decomposition using Loess. Robust to outliers, allows the seasonal
   component to vary over time, and preserves all data points. This is
   the recommended default for most applications.

3. **Hodrick-Prescott filter** (``trend_filter``) -- extracts a smooth
   trend by penalising acceleration. Popular in macroeconomics but
   controversial due to endpoint instability and spurious cycle
   extraction.

When to use each:
    - **STL**: best general-purpose choice. Handles outliers, time-
      varying seasonality, and does not lose endpoints. Use this unless
      you have a specific reason not to.
    - **Classical decompose**: when you need a simple, fast
      decomposition and are comfortable with fixed seasonality. Good
      for EDA and presentation.
    - **HP filter**: for macroeconomic trend extraction where
      convention dictates its use (GDP, unemployment). Avoid for
      financial time series where it can create spurious cycles.

References:
    - Cleveland et al. (1990), "STL: A Seasonal-Trend Decomposition
      Procedure Based on Loess"
    - Hodrick & Prescott (1997), "Postwar U.S. Business Cycles: An
      Empirical Investigation"
    - Hamilton (2018), "Why You Should Never Use the Hodrick-Prescott
      Filter"
"""

from __future__ import annotations

from typing import Any

import numpy as np
import pandas as pd
from statsmodels.tsa.filters.hp_filter import hpfilter
from statsmodels.tsa.seasonal import STL
from statsmodels.tsa.seasonal import seasonal_decompose as sm_seasonal_decompose

from wraquant.core.decorators import requires_extra


[docs] def seasonal_decompose( data: pd.Series, period: int | None = None, model: str = "additive", ) -> Any: """Decompose a time series into trend, seasonal, and residual components. Classical decomposition extracts the trend via a centred moving average of width equal to the seasonal period, then computes the seasonal component as the average deviation from the trend for each season. When to use: Use classical decomposition for quick EDA and when the seasonal pattern is approximately constant over time. For production forecasting or when outliers are present, prefer ``stl_decompose`` which is more robust. Mathematical formulation: Additive: y_t = T_t + S_t + R_t Multiplicative: y_t = T_t * S_t * R_t where T is trend, S is seasonal, and R is residual. How to interpret: - ``result.trend``: long-term direction of the series. NaN at endpoints (half the period on each side). - ``result.seasonal``: periodic pattern that repeats every ``period`` observations. Constant across years. - ``result.resid``: what remains after removing trend and seasonal. Should look like white noise if the decomposition is adequate. Parameters: data: Time series to decompose. period: Seasonal period (e.g., 12 for monthly data with annual seasonality, 252 for daily with annual). Inferred from the index frequency when *None*. model: ``"additive"`` (default) -- use when seasonal amplitude is roughly constant. ``"multiplicative"`` -- use when seasonal amplitude scales with the level. Returns: ``DecomposeResult`` with ``trend``, ``seasonal``, and ``resid`` attributes, each a ``pd.Series``. Example: >>> import pandas as pd, numpy as np >>> idx = pd.date_range("2020-01-01", periods=120, freq="MS") >>> data = pd.Series(np.arange(120.0) + 5 * np.sin(np.arange(120) * 2 * np.pi / 12), index=idx) >>> result = seasonal_decompose(data, period=12) >>> result.seasonal.iloc[0] != 0 True See Also: stl_decompose: Robust decomposition with time-varying seasonality. trend_filter: Hodrick-Prescott trend extraction. """ return sm_seasonal_decompose(data, model=model, period=period)
[docs] def stl_decompose( data: pd.Series, period: int | None = None, ) -> Any: """STL (Seasonal and Trend decomposition using Loess) decomposition. STL uses locally weighted regression (Loess) to extract trend and seasonal components. Unlike classical decomposition, STL allows the seasonal pattern to evolve over time and is robust to outliers via iterative re-weighting. When to use: STL is the recommended default decomposition method. Use it when: - The seasonal pattern changes strength or shape over time. - The data contains outliers or level shifts. - You need values at the endpoints (no NaN padding). Prefer classical ``seasonal_decompose`` only for quick EDA where simplicity matters more than accuracy. Mathematical background: STL applies two nested Loess smoothers iteratively: 1. Inner loop: extract seasonal component via Loess on sub-series (one per season), then extract trend via Loess on the deseasonalised series. 2. Outer loop: compute robustness weights based on residual magnitude, then re-run the inner loop. How to interpret: Same as classical decomposition: ``trend``, ``seasonal``, ``resid``. The key difference is that ``seasonal`` can vary over time (plot it to see evolution). The ``resid`` should be approximately stationary with no remaining seasonal pattern. Parameters: data: Time series to decompose. period: Seasonal period (e.g., 7 for daily data with weekly seasonality). Uses 7 when *None* and no index frequency is available. Returns: ``STLForecast`` result with ``trend``, ``seasonal``, and ``resid`` attributes, each a ``pd.Series``. Example: >>> import pandas as pd, numpy as np >>> idx = pd.date_range("2020-01-01", periods=365, freq="D") >>> data = pd.Series(np.arange(365.0) + 3 * np.sin(np.arange(365) * 2 * np.pi / 7), index=idx) >>> result = stl_decompose(data, period=7) >>> hasattr(result, "trend") True See Also: seasonal_decompose: Simpler classical decomposition. trend_filter: Hodrick-Prescott trend extraction. References: - Cleveland et al. (1990), "STL: A Seasonal-Trend Decomposition Procedure Based on Loess" """ if period is None: period = 7 return STL(data, period=period).fit()
[docs] def trend_filter( data: pd.Series, method: str = "hp", lamb: float = 1600, ) -> pd.Series: """Extract the trend component from a time series. Currently supports the Hodrick-Prescott (HP) filter, which separates a time series into a smooth trend and a cyclical component by minimising the sum of squared deviations from the trend subject to a penalty on the trend's second difference (acceleration). When to use: The HP filter is standard in macroeconomic analysis (GDP, unemployment, inflation) where convention dictates its use. For financial time series, prefer STL or moving averages -- the HP filter can create spurious cycles and has well-documented endpoint instability (Hamilton 2018). Mathematical formulation: min_tau sum_{t=1}^T (y_t - tau_t)^2 + lambda * sum_{t=3}^T ((tau_t - tau_{t-1}) - (tau_{t-1} - tau_{t-2}))^2 where tau is the trend and lambda controls smoothness. Higher lambda => smoother trend. How to interpret: The returned series is the trend component. The cyclical component is ``data - trend``. Common lambda values: - Monthly data: lambda = 129600 (Ravn & Uhlig) - Quarterly data: lambda = 1600 (the Hodrick-Prescott default) - Annual data: lambda = 6.25 Parameters: data: Time series. method: Filter method -- ``"hp"`` (Hodrick-Prescott, default). lamb: Smoothing parameter. Default 1600 (appropriate for quarterly data). Returns: Trend component as a Series. Raises: ValueError: If *method* is not recognized. Example: >>> import pandas as pd, numpy as np >>> data = pd.Series(np.cumsum(np.random.randn(100)) + np.arange(100) * 0.5) >>> trend = trend_filter(data, lamb=1600) >>> len(trend) == len(data) True See Also: stl_decompose: Preferred for most decomposition tasks. seasonal_decompose: Classical moving-average decomposition. References: - Hodrick & Prescott (1997), "Postwar U.S. Business Cycles" - Hamilton (2018), "Why You Should Never Use the HP Filter" """ if method == "hp": _cycle, trend = hpfilter(data.dropna(), lamb=lamb) return trend msg = f"Unknown trend filter method: {method!r}" raise ValueError(msg)
# --------------------------------------------------------------------------- # Singular Spectrum Analysis # ---------------------------------------------------------------------------
[docs] def ssa_decompose( data: pd.Series, window: int | None = None, n_components: int | None = None, groups: dict[str, list[int]] | None = None, ) -> dict: """Singular Spectrum Analysis (SSA) decomposition. SSA embeds a time series into a trajectory (Hankel) matrix, applies SVD to extract orthogonal components, and reconstructs interpretable signal components (trend, oscillatory modes, noise). **SSA vs STL**: - **STL** assumes a fixed seasonal period that you specify up front. It decomposes into exactly three components: trend, seasonal, residual. Works best when the periodicity is known and stable. - **SSA** is data-adaptive and makes no assumption about periodicity. It discovers the dominant oscillatory modes directly from the data via singular values. Better for non-stationary signals, signals with multiple or changing periodicities, or when you want to separate trend from multiple oscillatory components. Algorithm: 1. **Embedding**: construct the L x K trajectory matrix (Hankel) where L = window length, K = N - L + 1. 2. **SVD**: decompose the trajectory matrix into singular triplets. 3. **Grouping**: assign singular triplets to interpretable groups (trend, oscillatory, noise) either automatically or via user specification. 4. **Reconstruction**: diagonal averaging (Hankelisation) of each group's partial matrix to recover the time-domain component. Parameters: data: Time series to decompose. NaN values are dropped. window: Embedding window length L. Should be large enough to capture the longest period of interest but not exceed N/2. If ``None``, defaults to ``N // 2``. n_components: Number of leading singular triplets to retain. If ``None``, all are retained. groups: Optional grouping of singular triplet indices into named components. For example ``{"trend": [0], "seasonal": [1, 2]}``. Each key maps to a list of 0-based component indices. If ``None``, each singular triplet is returned as its own component (``"component_0"``, ``"component_1"``, ...). Returns: Dictionary with: - ``components``: dict mapping component name to reconstructed ``pd.Series``. - ``singular_values``: 1-D numpy array of singular values. - ``explained_variance``: 1-D numpy array of explained variance ratio per singular value (sums to 1.0 over all values). - ``window``: embedding window length used. Example: >>> import numpy as np, pandas as pd >>> t = np.arange(200, dtype=float) >>> trend = 0.05 * t >>> osc = 3 * np.sin(2 * np.pi * t / 20) >>> noise = np.random.default_rng(42).normal(0, 0.3, 200) >>> data = pd.Series(trend + osc + noise) >>> result = ssa_decompose(data, window=40, n_components=5) >>> # Components sum back to approximately the original >>> recon = sum(result['components'].values()) >>> np.allclose(recon.values, data.values, atol=1e-8) True References: - Golyandina, N. & Zhigljavsky, A. (2013), *Singular Spectrum Analysis for Time Series*. Springer. - Vautard, R. & Ghil, M. (1989), "Singular spectrum analysis in nonlinear dynamics, with applications to paleoclimatic time series", Physica D. """ clean = data.dropna() x = clean.values.astype(np.float64) n = len(x) if window is None: window = n // 2 if window < 2: window = 2 if window > n // 2: window = n // 2 k = n - window + 1 # Step 1: Build trajectory (Hankel) matrix traj = np.empty((window, k), dtype=np.float64) for i in range(window): traj[i, :] = x[i : i + k] # Step 2: SVD u, s, vt = np.linalg.svd(traj, full_matrices=False) # Explained variance s_sq = s ** 2 explained_variance = s_sq / s_sq.sum() # Determine how many components to keep if n_components is not None: n_components = min(n_components, len(s)) else: n_components = len(s) # Step 3 & 4: Reconstruction via diagonal averaging def _reconstruct_component(indices: list[int]) -> np.ndarray: """Reconstruct a time-domain signal from selected singular triplets.""" # Build partial trajectory matrix partial = np.zeros_like(traj) for idx in indices: if idx < len(s): partial += s[idx] * np.outer(u[:, idx], vt[idx, :]) # Diagonal averaging (Hankelisation) result = np.zeros(n, dtype=np.float64) counts = np.zeros(n, dtype=np.float64) l_star = min(window, k) k_star = max(window, k) for i in range(n): start = max(0, i - k_star + 1) end = min(i, l_star - 1) + 1 for j in range(start, end): if window <= k: result[i] += partial[j, i - j] else: result[i] += partial[i - j, j] counts[i] += 1.0 result /= counts return result # Build component dictionary components: dict[str, pd.Series] = {} if groups is not None: for name, indices in groups.items(): valid = [i for i in indices if i < n_components] if valid: recon = _reconstruct_component(valid) components[name] = pd.Series( recon, index=clean.index, name=name, ) else: for i in range(n_components): name = f"component_{i}" recon = _reconstruct_component([i]) components[name] = pd.Series( recon, index=clean.index, name=name, ) return { "components": components, "singular_values": s[:n_components], "explained_variance": explained_variance[:n_components], "window": window, }
# --------------------------------------------------------------------------- # Empirical Mode Decomposition # ---------------------------------------------------------------------------
[docs] def emd_decompose( data: pd.Series, max_imfs: int = 10, max_siftings: int = 100, sifting_threshold: float = 0.05, ) -> dict: """Empirical Mode Decomposition (EMD) via the sifting algorithm. EMD adaptively decomposes a non-stationary, non-linear signal into a finite set of Intrinsic Mode Functions (IMFs) and a monotonic residual. Each IMF captures a narrow-band oscillatory mode whose frequency and amplitude can vary over time. Unlike Fourier or wavelet analysis, EMD makes no assumption about basis functions -- the decomposition is entirely data-driven. Algorithm (sifting): 1. Identify local maxima and minima of the signal. 2. Construct upper and lower envelopes by cubic spline interpolation through the extrema. 3. Compute the mean of the envelopes. 4. Subtract the mean from the signal to obtain a candidate IMF. 5. Repeat steps 1-4 until the candidate satisfies the IMF criteria (symmetric envelopes, near-zero mean). 6. Subtract the extracted IMF from the signal and repeat from step 1 to extract the next IMF. Parameters: data: Time series to decompose. NaN values are dropped. max_imfs: Maximum number of IMFs to extract (default 10). max_siftings: Maximum sifting iterations per IMF (default 100). sifting_threshold: Convergence threshold for the normalised squared difference between successive sifting iterations (default 0.05). Returns: Dictionary with: - ``imfs``: 2-D numpy array of shape ``(n_imfs, N)`` where each row is an IMF ordered from highest to lowest frequency. - ``residual``: 1-D numpy array of the monotonic residual. - ``n_imfs``: number of IMFs extracted. Example: >>> import numpy as np, pandas as pd >>> t = np.linspace(0, 1, 500) >>> sig = np.sin(2 * np.pi * 5 * t) + np.sin(2 * np.pi * 20 * t) + t >>> data = pd.Series(sig) >>> result = emd_decompose(data, max_imfs=5) >>> result['imfs'].shape[0] >= 1 True >>> np.all(np.isfinite(result['imfs'])) True References: - Huang, N.E. et al. (1998), "The empirical mode decomposition and the Hilbert spectrum for nonlinear and non-stationary time series analysis", Proceedings of the Royal Society A. """ from scipy.interpolate import CubicSpline clean = data.dropna() x = clean.values.astype(np.float64).copy() n = len(x) imfs: list[np.ndarray] = [] residual = x.copy() for _ in range(max_imfs): h = residual.copy() prev_h = np.zeros_like(h) for _ in range(max_siftings): # Find local extrema maxima = [] minima = [] for i in range(1, n - 1): if h[i] > h[i - 1] and h[i] > h[i + 1]: maxima.append(i) elif h[i] < h[i - 1] and h[i] < h[i + 1]: minima.append(i) # Need at least 2 maxima and 2 minima for splines if len(maxima) < 2 or len(minima) < 2: break # Extend endpoints for better boundary behavior max_idx = np.array([0] + maxima + [n - 1]) min_idx = np.array([0] + minima + [n - 1]) max_val = h[max_idx] min_val = h[min_idx] # Cubic spline envelopes t_axis = np.arange(n) upper_env = CubicSpline(max_idx, max_val)(t_axis) lower_env = CubicSpline(min_idx, min_val)(t_axis) mean_env = (upper_env + lower_env) / 2.0 prev_h = h.copy() h = h - mean_env # Check convergence (normalised squared difference) denom = np.sum(prev_h ** 2) if denom > 0: sd = np.sum((h - prev_h) ** 2) / denom if sd < sifting_threshold: break imfs.append(h) residual = residual - h # Stop if the residual is monotonic or nearly constant diffs = np.diff(residual) if np.all(diffs >= 0) or np.all(diffs <= 0) or np.std(residual) < 1e-10: break imfs_array = np.array(imfs) if imfs else np.empty((0, n)) return { "imfs": imfs_array, "residual": residual, "n_imfs": len(imfs), }
# --------------------------------------------------------------------------- # Wavelet Decomposition # ---------------------------------------------------------------------------
[docs] @requires_extra("timeseries") def wavelet_decompose( data: pd.Series, wavelet: str = "db4", level: int | None = None, ) -> dict: """Multi-level discrete wavelet decomposition. Decomposes a time series into approximation and detail coefficients at multiple resolution levels using the Discrete Wavelet Transform (DWT). At each level, the signal is split into a low-frequency approximation and a high-frequency detail component. **When to use**: - Analysing time-frequency structure at multiple scales. - Denoising (threshold the detail coefficients). - Feature extraction for ML (energy at each scale). - When the signal has transient or localised frequency content that Fourier analysis would miss. Parameters: data: Time series to decompose. NaN values are dropped. wavelet: Wavelet family and order. Common choices: ``"db4"`` (Daubechies-4, good general purpose), ``"haar"`` (simplest, piecewise constant), ``"sym5"`` (near-symmetric Daubechies), ``"coif3"`` (Coiflet, near-symmetric with vanishing moments). level: Decomposition level. If ``None``, the maximum useful level is computed automatically via ``pywt.dwt_max_level``. Returns: Dictionary with: - ``approximation``: 1-D numpy array of the final-level approximation coefficients (lowest frequency content). - ``details``: list of 1-D numpy arrays, one per level, ordered from the finest (highest frequency, level 1) to the coarsest (level ``level``). ``details[0]`` is level 1 (finest detail). - ``wavelet``: wavelet name used. - ``level``: decomposition level used. - ``coeffs``: raw coefficient list ``[cA_n, cD_n, ..., cD_1]`` as returned by ``pywt.wavedec`` for advanced use. Example: >>> import numpy as np, pandas as pd >>> data = pd.Series(np.random.default_rng(42).normal(0, 1, 256)) >>> result = wavelet_decompose(data, wavelet="db4") # doctest: +SKIP >>> len(result['details']) == result['level'] # doctest: +SKIP True References: - Mallat, S. (2009), *A Wavelet Tour of Signal Processing*. Academic Press. """ import pywt clean = data.dropna() values = clean.values.astype(np.float64) if level is None: level = pywt.dwt_max_level(len(values), wavelet) coeffs = pywt.wavedec(values, wavelet, level=level) # coeffs = [cA_n, cD_n, cD_{n-1}, ..., cD_1] approximation = coeffs[0] # Reverse detail order so details[0] = level 1 (finest) details = list(reversed(coeffs[1:])) return { "approximation": approximation, "details": details, "wavelet": wavelet, "level": level, "coeffs": coeffs, }
# --------------------------------------------------------------------------- # Unobserved Components Model # ---------------------------------------------------------------------------
[docs] def unobserved_components( data: pd.Series, level: bool = True, trend: bool = False, cycle: bool = False, seasonal: int | None = None, stochastic_level: bool = True, stochastic_trend: bool = True, stochastic_cycle: bool = True, stochastic_seasonal: bool = True, ) -> dict: """Unobserved Components Model (UCM) decomposition. Fits a structural time series model (Harvey 1989) that decomposes the series into interpretable latent components estimated via the Kalman filter and smoother: - **Level** (local level / random walk): stochastic intercept. - **Trend** (local linear trend): level + stochastic slope. - **Cycle**: damped stochastic trigonometric cycle. - **Seasonal**: time-varying seasonal pattern. - **Irregular**: white noise observation error. The model is: ``y_t = mu_t + gamma_t + c_t + epsilon_t`` where mu is the trend component (level + slope), gamma is seasonal, c is cycle, and epsilon is irregular. **When to use**: - Macro-economic or business time series where you want interpretable, probabilistic decomposition. - When you need confidence intervals on components (STL does not provide these). - Structural break analysis: sudden changes show up as jumps in the stochastic level. Parameters: data: Time series to decompose. Should have a DatetimeIndex for best results, but integer index also works. level: Include a stochastic level component (default True). trend: Include a stochastic trend / slope component (default False). When True, the model is a local linear trend. cycle: Include a damped stochastic cycle (default False). seasonal: Seasonal period. If ``None``, no seasonal component is included. stochastic_level: Allow the level to vary over time (default True). stochastic_trend: Allow the slope to vary over time (default True). stochastic_cycle: Allow the cycle amplitude/phase to vary (default True). stochastic_seasonal: Allow the seasonal pattern to evolve (default True). Returns: Dictionary with: - ``trend``: pd.Series of the estimated trend (level + slope). - ``trend_ci``: pd.DataFrame with ``lower`` and ``upper`` columns (95% confidence interval for the trend). - ``seasonal``: pd.Series of the seasonal component (or None if no seasonal was specified). - ``seasonal_ci``: pd.DataFrame or None. - ``cycle``: pd.Series of the cycle component (or None). - ``cycle_ci``: pd.DataFrame or None. - ``irregular``: pd.Series of the irregular / residual component. - ``model``: the fitted statsmodels ``UnobservedComponentsResults`` object for further inspection. Example: >>> import numpy as np, pandas as pd >>> idx = pd.date_range("2015-01-01", periods=120, freq="MS") >>> t = np.arange(120, dtype=float) >>> y = 0.5 * t + 3 * np.sin(2 * np.pi * t / 12) + np.random.default_rng(42).normal(0, 1, 120) >>> data = pd.Series(y, index=idx) >>> result = unobserved_components(data, level=True, trend=True, seasonal=12) >>> result['trend'] is not None True References: - Harvey, A.C. (1989), *Forecasting, Structural Time Series Models and the Kalman Filter*. Cambridge University Press. - Durbin, J. & Koopman, S.J. (2012), *Time Series Analysis by State Space Methods*. Oxford University Press. """ import warnings from statsmodels.tsa.statespace.structural import UnobservedComponents with warnings.catch_warnings(): warnings.simplefilter("ignore") model = UnobservedComponents( data, level=("local linear trend" if trend else "local level") if level else False, cycle=cycle, seasonal=seasonal, stochastic_level=stochastic_level, stochastic_trend=stochastic_trend, stochastic_cycle=stochastic_cycle, stochastic_seasonal=stochastic_seasonal, ) fit = model.fit(disp=False, maxiter=500) idx = data.index # Extract trend component trend_vals = fit.level["smoothed"] if hasattr(fit, "trend") and fit.trend is not None: trend_slope = fit.trend["smoothed"] trend_vals = trend_vals # level already includes the trend effect trend_series = pd.Series(trend_vals, index=idx, name="trend") # Confidence intervals for trend (approximate via +-2 * std) trend_ci = None try: trend_var = fit.level["smoothed_cov"].flatten() trend_std = np.sqrt(np.maximum(trend_var, 0)) trend_ci = pd.DataFrame( { "lower": trend_vals - 1.96 * trend_std, "upper": trend_vals + 1.96 * trend_std, }, index=idx, ) except Exception: # noqa: BLE001 pass # Seasonal seasonal_series = None seasonal_ci = None if seasonal is not None: try: seasonal_vals = fit.seasonal["smoothed"] seasonal_series = pd.Series(seasonal_vals, index=idx, name="seasonal") seasonal_var = fit.seasonal["smoothed_cov"].flatten() seasonal_std = np.sqrt(np.maximum(seasonal_var, 0)) seasonal_ci = pd.DataFrame( { "lower": seasonal_vals - 1.96 * seasonal_std, "upper": seasonal_vals + 1.96 * seasonal_std, }, index=idx, ) except Exception: # noqa: BLE001 pass # Cycle cycle_series = None cycle_ci = None if cycle: try: cycle_vals = fit.cycle["smoothed"] cycle_series = pd.Series(cycle_vals, index=idx, name="cycle") cycle_var = fit.cycle["smoothed_cov"].flatten() cycle_std = np.sqrt(np.maximum(cycle_var, 0)) cycle_ci = pd.DataFrame( { "lower": cycle_vals - 1.96 * cycle_std, "upper": cycle_vals + 1.96 * cycle_std, }, index=idx, ) except Exception: # noqa: BLE001 pass # Irregular irregular = pd.Series(fit.resid, index=idx, name="irregular") return { "trend": trend_series, "trend_ci": trend_ci, "seasonal": seasonal_series, "seasonal_ci": seasonal_ci, "cycle": cycle_series, "cycle_ci": cycle_ci, "irregular": irregular, "model": fit, }