Source code for wraquant.vol.realized

"""Realized volatility estimators.

Provides various volatility estimators from OHLCV data, including
classical, range-based, and high-frequency estimators.
"""

from __future__ import annotations

import numpy as np
import pandas as pd

from wraquant.core._coerce import coerce_series


[docs] def realized_volatility( returns: pd.Series, window: int = 20, annualize: bool = True, periods_per_year: int = 252, ) -> pd.Series: """Rolling realized volatility from return series. Parameters: returns: Return series. window: Rolling window size. annualize: Whether to annualize the volatility. periods_per_year: Periods per year for annualization. Returns: Rolling realized volatility series. """ returns = coerce_series(returns, "returns") vol = returns.rolling(window).std() if annualize: vol = vol * np.sqrt(periods_per_year) return vol
[docs] def parkinson( high: pd.Series, low: pd.Series, window: int = 20, annualize: bool = True, periods_per_year: int = 252, ) -> pd.Series: """Parkinson (1980) range-based volatility estimator. More efficient than close-to-close for continuous processes. Parameters: high: High price series. low: Low price series. window: Rolling window size. annualize: Whether to annualize. periods_per_year: Periods per year. Returns: Parkinson volatility series. """ high = coerce_series(high, "high") low = coerce_series(low, "low") log_hl = np.log(high / low) factor = 1.0 / (4.0 * np.log(2)) var = factor * (log_hl**2).rolling(window).mean() vol = np.sqrt(var) if annualize: vol = vol * np.sqrt(periods_per_year) return vol
[docs] def garman_klass( open_: pd.Series, high: pd.Series, low: pd.Series, close: pd.Series, window: int = 20, annualize: bool = True, periods_per_year: int = 252, ) -> pd.Series: """Garman-Klass (1980) volatility estimator. Uses open, high, low, close for higher efficiency than Parkinson or close-to-close. Parameters: open_: Open price series. high: High price series. low: Low price series. close: Close price series. window: Rolling window size. annualize: Whether to annualize. periods_per_year: Periods per year. Returns: Garman-Klass volatility series. """ open_ = coerce_series(open_, "open") high = coerce_series(high, "high") low = coerce_series(low, "low") close = coerce_series(close, "close") log_hl = np.log(high / low) log_co = np.log(close / open_) var = (0.5 * log_hl**2 - (2 * np.log(2) - 1) * log_co**2).rolling(window).mean() vol = np.sqrt(var.clip(lower=0)) if annualize: vol = vol * np.sqrt(periods_per_year) return vol
[docs] def rogers_satchell( open_: pd.Series, high: pd.Series, low: pd.Series, close: pd.Series, window: int = 20, annualize: bool = True, periods_per_year: int = 252, ) -> pd.Series: """Rogers-Satchell (1991) volatility estimator. Handles drift in the price process, unlike Parkinson or Garman-Klass. Parameters: open_: Open price series. high: High price series. low: Low price series. close: Close price series. window: Rolling window size. annualize: Whether to annualize. periods_per_year: Periods per year. Returns: Rogers-Satchell volatility series. """ open_ = coerce_series(open_, "open") high = coerce_series(high, "high") low = coerce_series(low, "low") close = coerce_series(close, "close") log_ho = np.log(high / open_) log_hc = np.log(high / close) log_lo = np.log(low / open_) log_lc = np.log(low / close) var = (log_ho * log_hc + log_lo * log_lc).rolling(window).mean() vol = np.sqrt(var.clip(lower=0)) if annualize: vol = vol * np.sqrt(periods_per_year) return vol
[docs] def yang_zhang( open_: pd.Series, high: pd.Series, low: pd.Series, close: pd.Series, window: int = 20, annualize: bool = True, periods_per_year: int = 252, ) -> pd.Series: """Yang-Zhang (2000) volatility estimator. Combines overnight and Rogers-Satchell estimators for minimum variance under drift and opening jumps. Parameters: open_: Open price series. high: High price series. low: Low price series. close: Close price series. window: Rolling window size. annualize: Whether to annualize. periods_per_year: Periods per year. Returns: Yang-Zhang volatility series. """ open_ = coerce_series(open_, "open") high = coerce_series(high, "high") low = coerce_series(low, "low") close = coerce_series(close, "close") k = 0.34 / (1.34 + (window + 1) / (window - 1)) # Overnight variance log_oc = np.log(open_ / close.shift(1)) overnight_var = log_oc.rolling(window).var() # Open-to-close variance log_co = np.log(close / open_) oc_var = log_co.rolling(window).var() # Rogers-Satchell rs = rogers_satchell(open_, high, low, close, window, annualize=False) rs_var = rs**2 var = overnight_var + k * oc_var + (1 - k) * rs_var vol = np.sqrt(var.clip(lower=0)) if annualize: vol = vol * np.sqrt(periods_per_year) return vol
[docs] def bipower_variation( returns: pd.Series, window: int = 20, annualize: bool = True, periods_per_year: int = 252, ) -> pd.Series: """Bipower variation -- jump-robust realised volatility estimator. The bipower variation (BPV) of Barndorff-Nielsen and Shephard (2004) estimates the *continuous* (diffusive) component of quadratic variation by using the product of adjacent absolute returns instead of squared returns. Because jumps affect only isolated observations, taking the product of *consecutive* absolute returns washes out the jump contribution asymptotically: .. math:: BPV_t = \\frac{\\pi}{2} \\sum_{i=2}^{n} |r_{t,i}| \\cdot |r_{t,i-1}| where the :math:`\\pi / 2` factor corrects for the bias introduced by using absolute values of normal random variables (:math:`E[|z|] = \\sqrt{2/\\pi}` for standard normal *z*). Parameters: returns: Return series (not prices). window: Rolling window size. Each BPV estimate uses *window* observations. annualize: Whether to annualize by ``sqrt(periods_per_year)``. periods_per_year: Trading periods per year (252 for daily). Returns: Rolling bipower variation series (as volatility, i.e. the square root of the bipower variation divided by the window) with the same index as the input. Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(42) >>> returns = pd.Series(rng.normal(0, 0.01, 200)) >>> bpv = bipower_variation(returns, window=20) >>> (bpv.dropna() > 0).all() True Notes: In the absence of jumps, BPV converges to the same value as standard realised variance. When jumps are present, BPV < RV and the difference ``RV - BPV`` estimates the jump component. This is a rolling window version that computes BPV over a trailing window of *window* observations at each point. Reference: Barndorff-Nielsen, O.E. and Shephard, N. (2004). "Power and Bipower Variation with Stochastic Volatility and Jumps." *Journal of Financial Econometrics*, 2(1), 1--37. See Also: realized_volatility: Standard rolling realised volatility. jump_test_bns: Formal jump detection test using BPV. """ returns = coerce_series(returns, "returns") abs_ret = returns.abs() # Product of consecutive absolute returns products = abs_ret * abs_ret.shift(1) # Rolling sum with pi/2 correction, then normalise by window bpv_raw = (np.pi / 2.0) * products.rolling(window).sum() / window vol = np.sqrt(bpv_raw.clip(lower=0)) if annualize: vol = vol * np.sqrt(periods_per_year) return vol
[docs] def jump_test_bns( returns: pd.Series, window: int = 20, alpha: float = 0.05, ) -> dict[str, object]: """Barndorff-Nielsen & Shephard (2004) jump detection test. Tests whether jumps are present in a return series by comparing realised variance (RV) to bipower variation (BPV). Under the null hypothesis of no jumps the two estimators are asymptotically equivalent. A significantly positive difference indicates the presence of jumps. The test statistic is: .. math:: z = \\frac{RV - BPV}{\\sqrt{\\theta \\cdot QP}} where *QP* is the realised quarticity (estimated via the quad-power quarticity estimator) and *theta* is a finite-sample correction factor: .. math:: \\theta = \\left(\\frac{\\pi^2}{4} + \\pi - 5\\right) \\cdot \\frac{1}{n} The statistic is asymptotically standard normal under the null of no jumps. Parameters: returns: Return series (not prices). window: Number of observations to use for the test. The test is applied to the most recent *window* observations. Use the full series length by setting ``window=len(returns)``. alpha: Significance level for the jump detection decision. Default 0.05. Returns: Dictionary containing: - **rv** (*float*) -- Realised variance over the window. - **bpv** (*float*) -- Bipower variation over the window. - **jump_component** (*float*) -- ``max(RV - BPV, 0)``. - **continuous_component** (*float*) -- ``BPV``. - **z_statistic** (*float*) -- Test statistic (standard normal under the null of no jumps). - **p_value** (*float*) -- One-sided p-value. - **jump_detected** (*bool*) -- ``True`` if p-value < alpha. Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(42) >>> returns = pd.Series(rng.normal(0, 0.01, 100)) >>> result = jump_test_bns(returns, window=100) >>> 'z_statistic' in result and 'p_value' in result True Notes: The test has low power against small or infrequent jumps in short samples. For daily data, windows of at least 60--100 observations are recommended. Reference: Barndorff-Nielsen, O.E. and Shephard, N. (2004). "Power and Bipower Variation with Stochastic Volatility and Jumps." *Journal of Financial Econometrics*, 2(1), 1--37. See Also: bipower_variation: The jump-robust volatility estimator. realized_volatility: Standard realised volatility. """ from scipy import stats as sp_stats from wraquant.core._coerce import coerce_array r = coerce_array(returns, "returns") n = min(window, len(r)) r = r[-n:] # Realised variance rv = float(np.sum(r**2)) # Bipower variation abs_r = np.abs(r) bpv = float((np.pi / 2.0) * np.sum(abs_r[1:] * abs_r[:-1])) # Quad-power quarticity for variance of (RV - BPV) mu1 = np.sqrt(2.0 / np.pi) # E[|z|] for standard normal # Tri-power quarticity-based approach: use realised quad-power if n >= 5: qpq = float( n * (np.pi**2 / 4.0) * np.sum( abs_r[3:] ** (4.0 / 3.0) * abs_r[2:-1] ** (4.0 / 3.0) * abs_r[1:-2] ** (4.0 / 3.0) ) / (n - 3) ) else: qpq = rv**2 # fallback theta = (np.pi**2 / 4.0 + np.pi - 5.0) denom = np.sqrt(max(theta * qpq / n, 1e-30)) z_stat = float((rv - bpv) / denom) # One-sided test: jumps cause RV > BPV p_value = float(1.0 - sp_stats.norm.cdf(z_stat)) return { "rv": rv, "bpv": bpv, "jump_component": float(max(rv - bpv, 0.0)), "continuous_component": bpv, "z_statistic": z_stat, "p_value": p_value, "jump_detected": p_value < alpha, }
[docs] def two_scale_realized_variance( returns: pd.Series, window: int = 20, n_slow: int = 5, annualize: bool = True, periods_per_year: int = 252, ) -> pd.Series: """Two-Scale Realised Variance (TSRV) -- noise-robust RV estimator. The TSRV of Zhang, Mykland, and Ait-Sahalia (2005) eliminates the bias caused by market microstructure noise (bid-ask bounce, discrete prices) by combining realised variances computed at two different sampling frequencies. The fast-scale estimator is contaminated by noise; the slow-scale estimator has less noise but more discretisation error. Their optimal linear combination cancels the noise bias: .. math:: TSRV = RV^{(slow)} - \\frac{\\bar{n}}{n} RV^{(all)} where :math:`RV^{(slow)}` is the average of sub-sampled RVs at a slower frequency, :math:`RV^{(all)}` uses all ticks, and :math:`\\bar{n}` is the average sub-sample size. Parameters: returns: Return series. For intraday data, these should be tick-by-tick or high-frequency log returns. window: Rolling window size. n_slow: Sub-sampling factor for the slow scale. The fast scale uses every observation; the slow scale uses every *n_slow*-th. Larger values reduce noise but increase discretisation error. Default 5. annualize: Whether to annualize. periods_per_year: Trading periods per year. Returns: Rolling TSRV series (as volatility, i.e. square root). Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(42) >>> returns = pd.Series(rng.normal(0, 0.01, 200)) >>> tsrv = two_scale_realized_variance(returns, window=20) >>> (tsrv.dropna() > 0).all() True Notes: TSRV is consistent for the integrated variance even in the presence of i.i.d. microstructure noise, unlike standard RV which diverges as the sampling frequency increases. It is particularly useful for intraday data sampled at very high frequencies (seconds or ticks). Reference: Zhang, L., Mykland, P.A., and Ait-Sahalia, Y. (2005). "A Tale of Two Time Scales: Determining Integrated Volatility with Noisy High-Frequency Data." *Journal of the American Statistical Association*, 100(472), 1394--1411. See Also: realized_volatility: Standard RV (no noise correction). realized_kernel: Alternative noise-robust estimator. """ returns = coerce_series(returns, "returns") # Fast scale: standard RV using all returns rv_fast = (returns**2).rolling(window).sum() # Slow scale: sub-sampled RV # Average over n_slow sub-grids for efficiency rv_slow_accum = pd.Series(0.0, index=returns.index) for offset in range(n_slow): sub_returns = returns.iloc[offset::n_slow].reindex(returns.index) rv_sub = (sub_returns**2).rolling( max(window // n_slow, 1) ).sum() rv_slow_accum = rv_slow_accum + rv_sub.fillna(0) rv_slow = rv_slow_accum / n_slow # Noise-bias correction: TSRV = RV_slow - (n_bar / n) * RV_fast n_bar = window / n_slow tsrv_raw = rv_slow - (n_bar / window) * rv_fast # Clip to zero (numerical) tsrv_raw = tsrv_raw.clip(lower=0) vol = np.sqrt(tsrv_raw / window) if annualize: vol = vol * np.sqrt(periods_per_year) return vol
[docs] def realized_kernel( returns: pd.Series, window: int = 20, kernel: str = "parzen", annualize: bool = True, periods_per_year: int = 252, ) -> pd.Series: """Realised kernel estimator -- noise-robust flat-top kernel RV. The realised kernel of Barndorff-Nielsen, Hansen, Lunde, and Shephard (2008) uses a kernel-weighted sum of autocovariances of returns to produce a consistent, non-negative estimator of integrated variance in the presence of market microstructure noise: .. math:: RK = \\sum_{h=-H}^{H} k\\!\\left(\\frac{h}{H+1}\\right) \\hat{\\gamma}(h) where :math:`\\hat{\\gamma}(h) = \\sum_i r_i r_{i-h}` is the sample autocovariance at lag *h*, and :math:`k(\\cdot)` is a kernel function with :math:`k(0) = 1`, :math:`k(x) = 0` for :math:`|x| > 1`. The Parzen kernel is used by default as it guarantees non-negativity: .. math:: k(x) = \\begin{cases} 1 - 6x^2 + 6|x|^3 & \\text{if } |x| \\le 1/2 \\\\ 2(1-|x|)^3 & \\text{if } 1/2 < |x| \\le 1 \\\\ 0 & \\text{otherwise} \\end{cases} Parameters: returns: Return series. window: Rolling window size. kernel: Kernel function. Currently supported: ``"parzen"`` (default). The Parzen kernel guarantees non-negativity. annualize: Whether to annualize. periods_per_year: Trading periods per year. Returns: Rolling realised kernel volatility series (square root of the kernel-weighted variance). Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(42) >>> returns = pd.Series(rng.normal(0, 0.01, 200)) >>> rk = realized_kernel(returns, window=20) >>> (rk.dropna() > 0).all() True Notes: The realised kernel is rate-optimal under i.i.d. noise and achieves a convergence rate of :math:`n^{-1/5}` (vs :math:`n^{-1/4}` for standard RV with optimal sparse sampling). The bandwidth *H* is chosen automatically as :math:`H = \\lceil c \\cdot n^{3/5} \\rceil` where *c* is a constant typically set to 1. Reference: Barndorff-Nielsen, O.E., Hansen, P.R., Lunde, A., and Shephard, N. (2008). "Designing Realized Kernels to Measure the Ex-Post Variation of Equity Prices in the Presence of Noise." *Econometrica*, 76(6), 1481--1536. See Also: two_scale_realized_variance: Alternative noise-robust estimator. realized_volatility: Standard RV without noise correction. bipower_variation: Jump-robust estimator. """ if kernel != "parzen": msg = f"Only 'parzen' kernel is currently supported, got '{kernel}'." raise ValueError(msg) def _parzen(x: np.ndarray) -> np.ndarray: """Parzen kernel function.""" ax = np.abs(x) out = np.zeros_like(ax) mask1 = ax <= 0.5 mask2 = (ax > 0.5) & (ax <= 1.0) out[mask1] = 1.0 - 6.0 * ax[mask1] ** 2 + 6.0 * ax[mask1] ** 3 out[mask2] = 2.0 * (1.0 - ax[mask2]) ** 3 return out r = coerce_series(returns, "returns") r_arr = r.to_numpy(dtype=np.float64) n = len(r_arr) out = np.full(n, np.nan) for t in range(window, n): seg = r_arr[t - window : t] seg_n = len(seg) # Bandwidth: c * n^(3/5), with c = 1 H = max(int(np.ceil(seg_n ** 0.6)), 1) # Compute autocovariances up to lag H rk_val = 0.0 for h in range(-H, H + 1): weight = _parzen(np.array([h / (H + 1)]))[0] if h >= 0: gamma_h = float(np.sum(seg[h:] * seg[: seg_n - h])) else: ah = -h gamma_h = float(np.sum(seg[: seg_n - ah] * seg[ah:])) rk_val += weight * gamma_h out[t] = max(rk_val, 0.0) vol = np.sqrt(out / window) result = pd.Series(vol, index=r.index, name="realized_kernel") if annualize: result = result * np.sqrt(periods_per_year) return result