Source code for wraquant.stats.cointegration

"""Cointegration tests and pairs trading utilities for financial data."""

from __future__ import annotations

from itertools import combinations

import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller

from wraquant.core.decorators import requires_extra


[docs] def engle_granger( y1: pd.Series, y2: pd.Series, max_lag: int | None = None, ) -> dict: """Engle-Granger two-step cointegration test. Regresses *y1* on *y2* via OLS, then tests the residuals for a unit root using the Augmented Dickey-Fuller test. Parameters: y1: First price series. y2: Second price series. max_lag: Maximum number of lags for the ADF test. When *None*, ``adfuller`` selects lags automatically via AIC. Returns: Dictionary with ``statistic`` (ADF test statistic), ``p_value``, ``is_cointegrated`` (at 5 % significance), ``hedge_ratio`` (OLS slope coefficient), and ``residuals`` (pd.Series). """ y1_clean = y1.dropna() y2_clean = y2.dropna() # Align on common index common = y1_clean.index.intersection(y2_clean.index) y1_vals = y1_clean.loc[common].values.astype(float) y2_vals = y2_clean.loc[common].values.astype(float) # Step 1: OLS regression y1 = beta * y2 + alpha + eps — canonical import from wraquant.stats.regression import ols as _ols _eg_result = _ols(y1_vals, y2_vals, add_constant=True) # coefficients[0] = intercept (alpha), coefficients[1] = slope (beta) beta = _eg_result["coefficients"][1] residuals = y1_vals - beta * y2_vals # Step 2: ADF test on residuals adf_result = adfuller(residuals, maxlag=max_lag, autolag="AIC") stat, p_value = adf_result[0], adf_result[1] residual_series = pd.Series(residuals, index=common, name="residuals") return { "statistic": float(stat), "p_value": float(p_value), "is_cointegrated": bool(p_value < 0.05), "hedge_ratio": float(beta), "residuals": residual_series, }
[docs] @requires_extra("timeseries") def johansen( data: pd.DataFrame, det_order: int = 0, k_ar_diff: int = 1, ) -> dict: """Johansen cointegration test for multiple time series. Requires the ``timeseries`` optional dependency group (provides ``statsmodels.tsa.vector_ar.vecm``). Parameters: data: DataFrame of price series (columns = assets). det_order: Deterministic term order. ``-1`` for no deterministic term, ``0`` for constant, ``1`` for linear trend. k_ar_diff: Number of lagged differences in the model. Returns: Dictionary with ``trace_stats`` (array of trace statistics), ``eigenvalues``, ``critical_values_95`` (95 % critical values), and ``n_cointegrating`` (number of cointegrating relationships at the 5 % level). """ from statsmodels.tsa.vector_ar.vecm import coint_johansen clean = data.dropna() result = coint_johansen(clean.values, det_order, k_ar_diff) trace_stats = result.lr1 crit_95 = result.cvt[:, 1] # 95% critical values eigenvalues = result.eig # Count cointegrating relationships (trace stat > critical value) n_coint = int(np.sum(trace_stats > crit_95)) return { "trace_stats": trace_stats, "eigenvalues": eigenvalues, "critical_values_95": crit_95, "n_cointegrating": n_coint, }
[docs] def half_life(spread: pd.Series) -> float: """Estimate the half-life of mean reversion for a spread series. Fits an OLS regression of the change in spread on the lagged spread level: ``delta_spread = phi * spread_lag + eps``. The half-life is ``-log(2) / log(1 + phi)``. Parameters: spread: Spread (or residual) series. Returns: Half-life in the same time units as the spread index. Returns ``float('inf')`` when the spread is not mean-reverting. """ clean = spread.dropna().values.astype(float) lag = clean[:-1] delta = np.diff(clean) # OLS: delta = phi * lag + eps (no intercept needed for half-life) phi = float(np.dot(lag, delta) / np.dot(lag, lag)) if phi >= 0: # Not mean-reverting return float("inf") hl = -np.log(2) / np.log(1 + phi) return float(hl)
[docs] def spread( y1: pd.Series, y2: pd.Series, hedge_ratio: float | None = None, ) -> pd.Series: """Compute the spread between two price series. When *hedge_ratio* is ``None`` it is estimated via OLS. Parameters: y1: First price series (the dependent variable). y2: Second price series (the independent variable). hedge_ratio: Explicit hedge ratio. If ``None``, the ratio is estimated from the data using OLS. Returns: Spread series (``y1 - hedge_ratio * y2``). """ common = y1.dropna().index.intersection(y2.dropna().index) y1_aligned = y1.loc[common] y2_aligned = y2.loc[common] if hedge_ratio is None: y1_vals = y1_aligned.values.astype(float) y2_vals = y2_aligned.values.astype(float) from wraquant.stats.regression import ols as _ols _sp_result = _ols(y1_vals, y2_vals, add_constant=True) beta = _sp_result["coefficients"][1] # slope (hedge ratio) else: beta = hedge_ratio result = y1_aligned - beta * y2_aligned result.name = "spread" return result
[docs] def zscore_signal(spread: pd.Series, window: int = 20) -> pd.Series: """Compute a rolling z-score of the spread for trading signals. Parameters: spread: Spread series. window: Rolling window size for mean and standard deviation. Returns: Rolling z-score series. """ rolling_mean = spread.rolling(window).mean() rolling_std = spread.rolling(window).std(ddof=1) z = (spread - rolling_mean) / rolling_std z.name = "zscore" return z
[docs] def hedge_ratio( y1: pd.Series, y2: pd.Series, method: str = "ols", ) -> float: """Estimate the hedge ratio between two price series. Parameters: y1: First (dependent) price series. y2: Second (independent) price series. method: Estimation method — ``"ols"`` for ordinary least squares or ``"tls"`` for total least squares (orthogonal regression). Returns: Hedge ratio as a float. Raises: ValueError: If *method* is not recognized. """ common = y1.dropna().index.intersection(y2.dropna().index) y1_vals = y1.loc[common].values.astype(float) y2_vals = y2.loc[common].values.astype(float) if method == "ols": from wraquant.stats.regression import ols as _ols _hr_result = _ols(y1_vals, y2_vals, add_constant=True) return float(_hr_result["coefficients"][1]) # slope elif method == "tls": # Total least squares via SVD data = np.column_stack([y2_vals - y2_vals.mean(), y1_vals - y1_vals.mean()]) _u, _s, vt = np.linalg.svd(data, full_matrices=False) # The TLS slope is -V[0,1] / V[1,1] beta = -vt[-1, 0] / vt[-1, 1] return float(beta) else: msg = f"Unknown hedge ratio method: {method!r}" raise ValueError(msg)
[docs] def pairs_backtest_signals( spread: pd.Series, entry_z: float = 2.0, exit_z: float = 0.5, ) -> pd.Series: """Generate pairs trading signals based on z-score thresholds. The strategy goes short the spread when z-score > *entry_z*, goes long when z-score < -*entry_z*, and exits when the z-score crosses back inside ``[-exit_z, exit_z]``. Parameters: spread: Spread series (raw, not z-scored). entry_z: Z-score threshold for entry (absolute value). exit_z: Z-score threshold for exit (absolute value). Returns: Signal series with values in ``{-1, 0, 1}``. ``1`` means long the spread, ``-1`` means short, ``0`` means flat. """ z = zscore_signal(spread) signals = pd.Series(0, index=spread.index, name="signal", dtype=int) position = 0 for i in range(len(z)): if np.isnan(z.iloc[i]): signals.iloc[i] = 0 continue zval = z.iloc[i] if position == 0: if zval > entry_z: position = -1 # short spread elif zval < -entry_z: position = 1 # long spread elif position == 1: if zval > -exit_z: position = 0 elif position == -1: if zval < exit_z: position = 0 signals.iloc[i] = position return signals
[docs] def find_cointegrated_pairs( prices_df: pd.DataFrame, significance: float = 0.05, ) -> list[tuple]: """Scan a DataFrame of price series and find all cointegrated pairs. For each pair of columns, the Engle-Granger cointegration test is applied. Pairs with a p-value below *significance* are returned. Parameters: prices_df: DataFrame of price series (columns = asset names). significance: Significance level for the cointegration test. Returns: List of tuples ``(asset1, asset2, p_value, hedge_ratio)`` for each cointegrated pair, sorted by p-value ascending. """ columns = prices_df.columns.tolist() pairs: list[tuple] = [] for col1, col2 in combinations(columns, 2): y1 = prices_df[col1].dropna() y2 = prices_df[col2].dropna() if len(y1) < 30 or len(y2) < 30: continue result = engle_granger(y1, y2) if result["p_value"] < significance: pairs.append((col1, col2, result["p_value"], result["hedge_ratio"])) pairs.sort(key=lambda x: x[2]) return pairs