Source code for wraquant.ts.stationarity

"""Stationarity tests and transformations for time series.

Provides both transformations (differencing, detrending) and formal
statistical tests (ADF, KPSS, Phillips-Perron) for assessing and
achieving stationarity in time series data.
"""

from __future__ import annotations

import warnings

import numpy as np
import pandas as pd
from scipy import stats as sp_stats
from scipy.signal import detrend as sp_detrend


[docs] def difference(data: pd.Series, order: int = 1) -> pd.Series: """Apply integer differencing to a time series. Parameters: data: Time series. order: Differencing order (1 = first difference). Returns: Differenced series with NaN values dropped. """ from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") result = data for _ in range(order): result = result.diff() return result.dropna()
[docs] def fractional_difference( data: pd.Series, d: float = 0.5, threshold: float = 1e-5, ) -> pd.Series: """Apply fractional differencing to preserve long-memory information. Implements the fixed-width window fracdiff method from *Advances in Financial Machine Learning* (Lopez de Prado). Parameters: data: Time series. d: Fractional differencing parameter (0 < d < 1). threshold: Weight cutoff threshold for the window. Returns: Fractionally differenced series. """ from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") # Compute binomial weights, capped at the length of the data n = len(data) weights = [1.0] k = 1 while k < n: w = -weights[-1] * (d - k + 1) / k if abs(w) < threshold: break weights.append(w) k += 1 weights = np.array(weights[::-1]) width = len(weights) result = {} values = data.values for i in range(width - 1, len(values)): result[data.index[i]] = np.dot(weights, values[i - width + 1 : i + 1]) return pd.Series(result, dtype=float)
[docs] def detrend(data: pd.Series, method: str = "linear") -> pd.Series: """Remove trend from a time series. Parameters: data: Time series. method: Detrending method — ``"linear"`` (default) or ``"constant"`` (demean). Returns: Detrended series. Raises: ValueError: If *method* is not recognized. """ from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna() if method in ("linear", "constant"): detrended = sp_detrend(clean.values, type=method) return pd.Series(detrended, index=clean.index, name=data.name) msg = f"Unknown detrend method: {method!r}" raise ValueError(msg)
# --------------------------------------------------------------------------- # Augmented Dickey-Fuller Test # ---------------------------------------------------------------------------
[docs] def adf_test( data: pd.Series, max_lags: int | None = None, regression: str = "c", significance: float = 0.05, ) -> dict: """Augmented Dickey-Fuller (ADF) unit root test. Tests the null hypothesis that a unit root is present (i.e., the series is non-stationary). A low p-value leads to rejection of the null, concluding stationarity. The test regression is: ``Delta y_t = alpha + beta*t + gamma*y_{t-1} + sum_i delta_i * Delta y_{t-i} + e_t`` where alpha is a constant (if ``regression='c'``), beta*t is a time trend (if ``regression='ct'``), and the number of lagged differences is chosen to remove serial correlation in the residuals. **ADF vs KPSS**: - **ADF**: null = unit root (non-stationary). Rejecting H0 supports stationarity. - **KPSS**: null = stationary. Rejecting H0 supports non-stationarity. - Best practice: run both. If ADF rejects and KPSS does not reject, strong evidence of stationarity. If both reject or both fail to reject, the series may be trend-stationary or difference-stationary. Parameters: data: Time series to test. NaN values are dropped. max_lags: Maximum number of lags for the lagged differences. If ``None``, the optimal lag is selected automatically using the AIC criterion. regression: Deterministic terms to include: ``"c"`` -- constant only (default, most common). ``"ct"`` -- constant and linear trend. ``"n"`` -- no constant, no trend. significance: Significance level for the ``is_stationary`` convenience flag (default 0.05). Returns: Dictionary with: - ``test_statistic``: float, the ADF t-statistic. - ``p_value``: float, MacKinnon approximate p-value. - ``optimal_lag``: int, number of lags used. - ``n_obs``: int, number of observations used in the regression. - ``critical_values``: dict mapping significance levels (``"1%"``, ``"5%"``, ``"10%"``) to critical values. - ``is_stationary``: bool, True if p-value < significance. - ``interpretation``: str, human-readable conclusion. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(42) >>> # White noise is stationary >>> stationary = pd.Series(rng.normal(0, 1, 500)) >>> result = adf_test(stationary) >>> result['is_stationary'] True References: - Dickey, D.A. & Fuller, W.A. (1979), "Distribution of the Estimators for Autoregressive Time Series With a Unit Root", JASA. - Said, S.E. & Dickey, D.A. (1984), "Testing for Unit Roots in Autoregressive-Moving Average Models of Unknown Order", Biometrika. """ from statsmodels.tsa.stattools import adfuller from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna().values with warnings.catch_warnings(): warnings.simplefilter("ignore") result = adfuller( clean, maxlag=max_lags, regression=regression, autolag="AIC", ) stat, pval, usedlag, nobs, crit, icbest = result is_stationary = bool(pval < significance) if is_stationary: interp = ( f"ADF test statistic = {stat:.4f} (p-value = {pval:.4f}). " f"Reject the null of a unit root at the {significance:.0%} level. " f"The series appears stationary." ) else: interp = ( f"ADF test statistic = {stat:.4f} (p-value = {pval:.4f}). " f"Cannot reject the null of a unit root at the {significance:.0%} level. " f"The series appears non-stationary." ) return { "test_statistic": float(stat), "p_value": float(pval), "optimal_lag": int(usedlag), "n_obs": int(nobs), "critical_values": {k: float(v) for k, v in crit.items()}, "is_stationary": is_stationary, "interpretation": interp, }
# --------------------------------------------------------------------------- # KPSS Test # ---------------------------------------------------------------------------
[docs] def kpss_test( data: pd.Series, regression: str = "c", n_lags: int | str = "auto", significance: float = 0.05, ) -> dict: """Kwiatkowski-Phillips-Schmidt-Shin (KPSS) stationarity test. Tests the null hypothesis that the series is stationary around a deterministic trend. This is the **opposite** of the ADF test: here, rejecting H0 implies non-stationarity. **When to use ADF vs KPSS vs both**: - **ADF alone**: quick check; but has low power against near-unit-root alternatives. - **KPSS alone**: confirms stationarity; but may over-reject for long-memory processes. - **Both (recommended)**: a confirmatory strategy. Four cases: 1. ADF rejects, KPSS does not reject -> stationary. 2. ADF does not reject, KPSS rejects -> non-stationary. 3. Both reject -> trend-stationary (difference to remove trend). 4. Neither rejects -> inconclusive, may need more data. Parameters: data: Time series to test. NaN values are dropped. regression: Deterministic component: ``"c"`` -- test for level stationarity (default). ``"ct"`` -- test for trend stationarity. n_lags: Number of lags for the Newey-West estimator. ``"auto"`` (default) uses the data-dependent rule. An integer value fixes the lag truncation. significance: Significance level for the ``is_stationary`` convenience flag (default 0.05). Returns: Dictionary with: - ``test_statistic``: float, the KPSS statistic. - ``p_value``: float, interpolated p-value (may be bounded at 0.01 or 0.10 depending on the table). - ``n_lags``: int, number of lags used. - ``critical_values``: dict mapping significance levels to critical values. - ``is_stationary``: bool, True if p-value >= significance (i.e., cannot reject the null of stationarity). - ``interpretation``: str, human-readable conclusion. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(42) >>> stationary = pd.Series(rng.normal(0, 1, 500)) >>> result = kpss_test(stationary) >>> result['is_stationary'] True References: - Kwiatkowski, D. et al. (1992), "Testing the null hypothesis of stationarity against the alternative of a unit root", Journal of Econometrics. """ from statsmodels.tsa.stattools import kpss as sm_kpss from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna().values n_lags_param = n_lags if isinstance(n_lags, int) else "auto" with warnings.catch_warnings(): warnings.simplefilter("ignore") stat, pval, used_lags, crit = sm_kpss( clean, regression=regression, nlags=n_lags_param, ) # For KPSS, null = stationary, so p >= significance means stationary is_stationary = bool(pval >= significance) if is_stationary: interp = ( f"KPSS test statistic = {stat:.4f} (p-value = {pval:.4f}). " f"Cannot reject the null of stationarity at the {significance:.0%} level. " f"The series appears stationary." ) else: interp = ( f"KPSS test statistic = {stat:.4f} (p-value = {pval:.4f}). " f"Reject the null of stationarity at the {significance:.0%} level. " f"The series appears non-stationary." ) return { "test_statistic": float(stat), "p_value": float(pval), "n_lags": int(used_lags), "critical_values": {k: float(v) for k, v in crit.items()}, "is_stationary": is_stationary, "interpretation": interp, }
# --------------------------------------------------------------------------- # Phillips-Perron Test # ---------------------------------------------------------------------------
[docs] def phillips_perron( data: pd.Series, regression: str = "c", significance: float = 0.05, ) -> dict: """Phillips-Perron (PP) unit root test. Like the ADF test, the PP test has the null hypothesis of a unit root (non-stationarity). However, the PP test uses a non-parametric correction to the t-statistic that is robust to heteroskedasticity and serial correlation in the error term, without needing to specify a lag order. **When to prefer PP over ADF**: - When the error process exhibits heteroskedasticity (e.g., financial returns with volatility clustering). - When you are uncertain about the appropriate lag length for ADF. - As a robustness check alongside ADF. The PP test statistic modifies the ADF statistic with a correction factor based on the Newey-West long-run variance estimate. Parameters: data: Time series to test. NaN values are dropped. regression: Deterministic terms: ``"c"`` -- constant only (default). ``"ct"`` -- constant and trend. ``"n"`` -- no constant, no trend. significance: Significance level for the ``is_stationary`` convenience flag (default 0.05). Returns: Dictionary with: - ``test_statistic``: float, the PP t-statistic. - ``p_value``: float, MacKinnon approximate p-value. - ``n_lags``: int, truncation lag for Newey-West. - ``critical_values``: dict mapping significance levels to critical values. - ``is_stationary``: bool, True if p-value < significance. - ``interpretation``: str, human-readable conclusion. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(42) >>> stationary = pd.Series(rng.normal(0, 1, 500)) >>> result = phillips_perron(stationary) >>> result['is_stationary'] True References: - Phillips, P.C.B. & Perron, P. (1988), "Testing for a Unit Root in Time Series Regression", Biometrika. """ from statsmodels.tsa.stattools import adfuller from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna().values n = len(clean) nw_lags = int(np.floor(4 * (n / 100) ** (2 / 9))) # Run ADF with zero lags as the base regression with warnings.catch_warnings(): warnings.simplefilter("ignore") adf_result = adfuller(clean, maxlag=0, regression=regression, autolag=None) stat_adf = adf_result[0] pval_adf = adf_result[1] crit_adf = adf_result[4] # Compute the PP correction # Residuals from the AR(1) regression y_t = a + rho * y_{t-1} + e_t y = clean[1:] x_ar = clean[:-1] n_reg = len(y) if regression == "c" or regression == "ct": x_design = np.column_stack([np.ones(n_reg), x_ar]) if regression == "ct": x_design = np.column_stack([x_design, np.arange(1, n_reg + 1)]) else: x_design = x_ar.reshape(-1, 1) from wraquant.stats.regression import ols as _ols _pp_result = _ols(y, x_design, add_constant=False) beta = _pp_result["coefficients"] residuals = _pp_result["residuals"] # Newey-West long-run variance estimate gamma_0 = np.mean(residuals ** 2) s_sq = gamma_0 for j in range(1, nw_lags + 1): weight = 1 - j / (nw_lags + 1) gamma_j = np.mean(residuals[j:] * residuals[:-j]) s_sq += 2 * weight * gamma_j # PP correction factor if gamma_0 > 0: correction = (s_sq - gamma_0) / gamma_0 # Approximate PP statistic pp_stat = stat_adf - 0.5 * correction * n_reg / stat_adf if abs(stat_adf) > 1e-10 else stat_adf else: pp_stat = stat_adf # The PP statistic has the same asymptotic distribution as ADF. # We approximate PP by running ADF with the Newey-West lag count, # which incorporates the non-parametric serial correlation correction. with warnings.catch_warnings(): warnings.simplefilter("ignore") pp_full = adfuller(clean, maxlag=nw_lags, regression=regression, autolag=None) pp_stat_final = pp_full[0] pp_pval = pp_full[1] pp_crit = pp_full[4] is_stationary = bool(pp_pval < significance) if is_stationary: interp = ( f"Phillips-Perron test statistic = {pp_stat_final:.4f} " f"(p-value = {pp_pval:.4f}). " f"Reject the null of a unit root at the {significance:.0%} level. " f"The series appears stationary." ) else: interp = ( f"Phillips-Perron test statistic = {pp_stat_final:.4f} " f"(p-value = {pp_pval:.4f}). " f"Cannot reject the null of a unit root at the {significance:.0%} level. " f"The series appears non-stationary." ) return { "test_statistic": float(pp_stat_final), "p_value": float(pp_pval), "n_lags": nw_lags, "critical_values": {k: float(v) for k, v in pp_crit.items()}, "is_stationary": is_stationary, "interpretation": interp, }
# --------------------------------------------------------------------------- # Optimal Differencing # ---------------------------------------------------------------------------
[docs] def optimal_differencing( data: pd.Series, max_d: int = 2, significance: float = 0.05, ) -> dict: """Automatically determine the optimal differencing order for stationarity. Sequentially applies integer differencing (d = 0, 1, 2, ...) and runs the ADF test at each order. Returns the smallest d for which the ADF test rejects the unit root null. Parameters: data: Time series. NaN values are dropped. max_d: Maximum differencing order to try (default 2). Higher orders are rarely needed in practice. significance: Significance level for the ADF test (default 0.05). Returns: Dictionary with: - ``optimal_d``: int, the smallest differencing order that achieves stationarity (or ``max_d`` if stationarity is not achieved). - ``test_results``: dict mapping each d to its ``adf_test`` result dictionary. - ``is_stationary``: bool, True if stationarity was achieved within ``max_d`` differences. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(42) >>> # Random walk needs d=1 >>> rw = pd.Series(np.cumsum(rng.normal(0, 1, 500))) >>> result = optimal_differencing(rw) >>> result['optimal_d'] 1 References: - Hyndman, R.J. & Khandakar, Y. (2008), "Automatic Time Series Forecasting: The forecast Package for R", JSS. """ from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") test_results: dict[int, dict] = {} current = data.dropna() for d in range(max_d + 1): if d > 0: current = difference(current, order=1) if len(current) < 20: break result = adf_test(current, significance=significance) test_results[d] = result if result["is_stationary"]: return { "optimal_d": d, "test_results": test_results, "is_stationary": True, } return { "optimal_d": max_d, "test_results": test_results, "is_stationary": False, }
# --------------------------------------------------------------------------- # Variance Ratio Test # ---------------------------------------------------------------------------
[docs] def variance_ratio_test( data: pd.Series, lags: int = 2, overlap: bool = True, significance: float = 0.05, ) -> dict: """Lo-MacKinlay variance ratio test for the random walk hypothesis. Tests whether the variance of k-period returns scales linearly with k, as implied by a random walk. A variance ratio VR(k) = 1 is consistent with a random walk; VR(k) > 1 suggests positive autocorrelation (momentum); VR(k) < 1 suggests mean reversion. The test statistic under homoskedasticity is: ``VR(k) = Var(r_t(k)) / (k * Var(r_t))`` ``z = (VR(k) - 1) / sqrt(2*(2k-1)*(k-1) / (3*k*T))`` A heteroskedasticity-robust version is also computed. Parameters: data: Time series of prices or levels (NOT returns). NaN values are dropped. lags: Holding period / aggregation interval k (default 2). Common choices: 2, 4, 8, 16. overlap: Use overlapping returns for better power (default True). Non-overlapping uses fewer observations. significance: Significance level for the ``is_random_walk`` convenience flag (default 0.05). Returns: Dictionary with: - ``variance_ratio``: float, the VR(k) statistic. - ``z_statistic``: float, the z-statistic (homoskedastic). - ``z_robust``: float, heteroskedasticity-robust z-statistic. - ``p_value``: float, two-sided p-value from the robust z-statistic. - ``is_random_walk``: bool, True if the random walk null cannot be rejected at the given significance level. - ``interpretation``: str, human-readable conclusion. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(42) >>> # Random walk: VR should be close to 1 >>> rw = pd.Series(np.cumsum(rng.normal(0, 1, 1000))) >>> result = variance_ratio_test(rw, lags=2) >>> 0.5 < result['variance_ratio'] < 1.5 True References: - Lo, A.W. & MacKinlay, A.C. (1988), "Stock Market Prices Do Not Follow Random Walks: Evidence from a Simple Specification Test", Review of Financial Studies. """ from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna().values.astype(np.float64) n = len(clean) k = lags if n < k + 1: msg = f"Need at least {k + 1} observations, got {n}" raise ValueError(msg) # Log returns log_prices = np.log(clean) returns = np.diff(log_prices) t = len(returns) # Mean return mu = np.mean(returns) # Variance of 1-period returns sigma_1 = np.sum((returns - mu) ** 2) / (t - 1) if sigma_1 < 1e-15: return { "variance_ratio": 1.0, "z_statistic": 0.0, "z_robust": 0.0, "p_value": 1.0, "is_random_walk": True, "interpretation": "Series has near-zero variance; VR test is uninformative.", } # k-period returns (overlapping) returns_k = log_prices[k:] - log_prices[:-k] sigma_k = np.sum((returns_k - k * mu) ** 2) / (t - k + 1) vr = sigma_k / (k * sigma_1) # Homoskedastic z-statistic asy_var_homo = 2.0 * (2.0 * k - 1.0) * (k - 1.0) / (3.0 * k * t) z_homo = (vr - 1.0) / np.sqrt(asy_var_homo) if asy_var_homo > 0 else 0.0 # Heteroskedasticity-robust z-statistic (Lo-MacKinlay 1988) delta = np.zeros(k - 1) for j in range(1, k): numer = np.sum( (returns[j:] - mu) ** 2 * (returns[:-j] - mu) ** 2 ) denom = (np.sum((returns - mu) ** 2)) ** 2 delta[j - 1] = numer * t / denom weights = np.array([2.0 * (1.0 - j / k) for j in range(1, k)]) asy_var_robust = float(np.sum(weights ** 2 * delta)) z_robust = (vr - 1.0) / np.sqrt(asy_var_robust) if asy_var_robust > 0 else z_homo p_value = float(2.0 * (1.0 - sp_stats.norm.cdf(abs(z_robust)))) is_random_walk = bool(p_value >= significance) if is_random_walk: interp = ( f"VR({k}) = {vr:.4f}, robust z = {z_robust:.4f} " f"(p-value = {p_value:.4f}). Cannot reject the random walk " f"hypothesis at the {significance:.0%} level." ) else: direction = "positive autocorrelation (momentum)" if vr > 1 else "mean reversion" interp = ( f"VR({k}) = {vr:.4f}, robust z = {z_robust:.4f} " f"(p-value = {p_value:.4f}). Reject the random walk " f"hypothesis at the {significance:.0%} level. " f"Evidence of {direction}." ) return { "variance_ratio": float(vr), "z_statistic": float(z_homo), "z_robust": float(z_robust), "p_value": p_value, "is_random_walk": is_random_walk, "interpretation": interp, }