"""Statistical technical analysis indicators.
This module provides rolling statistical measures commonly used in
quantitative analysis and systematic trading. All functions accept
``pd.Series`` inputs and return ``pd.Series``.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
__all__ = [
"zscore",
"percentile_rank",
"mean_deviation",
"median",
"skewness",
"kurtosis",
"entropy",
"hurst_exponent",
"correlation",
"beta",
"r_squared",
"information_coefficient",
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
from wraquant.ta._validators import validate_period as _validate_period
from wraquant.ta._validators import validate_series as _validate_series
# ---------------------------------------------------------------------------
# Z-Score
# ---------------------------------------------------------------------------
[docs]
def zscore(data: pd.Series, period: int = 20) -> pd.Series:
"""Rolling z-score of price.
Measures how many standard deviations the current value is from the
rolling mean.
Interpretation:
- **> +2**: Price is 2+ standard deviations above mean --
statistically unusual (overbought).
- **< -2**: Price is 2+ standard deviations below mean --
statistically unusual (oversold).
- **Near 0**: Price is near its rolling average (fair value).
- Mean-reversion traders buy at z < -2 and sell at z > +2.
- In trending markets, z-score can stay extreme for extended
periods -- use with a trend filter.
Parameters
----------
data : pd.Series
Price series.
period : int, default 20
Rolling window length.
Returns
-------
pd.Series
Z-score values (unbounded).
Example
-------
>>> import pandas as pd
>>> close = pd.Series([10, 11, 12, 11, 10, 9, 10, 11, 12, 13])
>>> zscore(close, period=5) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
rolling_mean = data.rolling(window=period, min_periods=period).mean()
rolling_std = data.rolling(window=period, min_periods=period).std(ddof=1)
result = (data - rolling_mean) / rolling_std
result.name = "zscore"
return result
# ---------------------------------------------------------------------------
# Percentile Rank
# ---------------------------------------------------------------------------
[docs]
def percentile_rank(data: pd.Series, period: int = 20) -> pd.Series:
"""Rolling percentile rank.
Computes the percentage of values within the rolling window that are
less than or equal to the current value.
Parameters
----------
data : pd.Series
Price series.
period : int, default 20
Rolling window length.
Returns
-------
pd.Series
Percentile rank in [0, 100].
Example
-------
>>> import pandas as pd
>>> close = pd.Series([10, 11, 12, 11, 10, 9, 10, 11, 12, 13])
>>> percentile_rank(close, period=5) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
def _pct_rank(window: np.ndarray) -> float:
current = window[-1]
return np.sum(window <= current) / len(window) * 100.0
result = data.rolling(window=period, min_periods=period).apply(_pct_rank, raw=True)
result.name = "percentile_rank"
return result
# ---------------------------------------------------------------------------
# Mean Deviation
# ---------------------------------------------------------------------------
[docs]
def mean_deviation(data: pd.Series, period: int = 20) -> pd.Series:
"""Rolling mean absolute deviation.
Computes the average of absolute deviations from the rolling mean.
Parameters
----------
data : pd.Series
Price series.
period : int, default 20
Rolling window length.
Returns
-------
pd.Series
Mean absolute deviation values (>= 0).
Example
-------
>>> import pandas as pd
>>> close = pd.Series([10, 11, 12, 11, 10, 9, 10, 11, 12, 13])
>>> mean_deviation(close, period=5) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
result = data.rolling(window=period, min_periods=period).apply(
lambda x: np.mean(np.abs(x - np.mean(x))), raw=True
)
result.name = "mean_deviation"
return result
# ---------------------------------------------------------------------------
# Median
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Skewness
# ---------------------------------------------------------------------------
[docs]
def skewness(data: pd.Series, period: int = 20) -> pd.Series:
"""Rolling skewness.
Measures the asymmetry of the distribution of values within the
rolling window. Uses Fisher's definition (bias-corrected).
Interpretation:
- **Positive skew**: Distribution has a long right tail --
occasional large positive moves (typical of call option payoffs).
- **Negative skew**: Distribution has a long left tail --
occasional large negative moves (crash risk).
- **Near 0**: Symmetric distribution (normal-like).
- Negative skew in returns is common for equities -- tail
risk is to the downside.
Parameters
----------
data : pd.Series
Price series.
period : int, default 20
Rolling window length (must be >= 3).
Returns
-------
pd.Series
Skewness values (unbounded).
Example
-------
>>> import pandas as pd
>>> close = pd.Series(range(30), dtype=float)
>>> skewness(close, period=20) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
result = data.rolling(window=period, min_periods=period).skew()
result.name = "skewness"
return result
# ---------------------------------------------------------------------------
# Kurtosis
# ---------------------------------------------------------------------------
[docs]
def kurtosis(data: pd.Series, period: int = 20) -> pd.Series:
"""Rolling kurtosis (excess kurtosis, Fisher's definition).
Measures the tailedness of the distribution of values within the
rolling window. Normal distribution has excess kurtosis of 0.
Interpretation:
- **> 0 (leptokurtic)**: Fatter tails than normal -- more
extreme moves than a Gaussian would predict. Common in
financial returns.
- **< 0 (platykurtic)**: Thinner tails than normal -- fewer
extreme moves.
- **Near 0 (mesokurtic)**: Normal-like tail behavior.
- High kurtosis warns of tail risk -- standard VaR and
Gaussian-based risk models will underestimate risk.
Parameters
----------
data : pd.Series
Price series.
period : int, default 20
Rolling window length (must be >= 4).
Returns
-------
pd.Series
Excess kurtosis values (unbounded).
Example
-------
>>> import pandas as pd
>>> close = pd.Series(range(30), dtype=float)
>>> kurtosis(close, period=20) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
result = data.rolling(window=period, min_periods=period).kurt()
result.name = "kurtosis"
return result
# ---------------------------------------------------------------------------
# Entropy
# ---------------------------------------------------------------------------
[docs]
def entropy(data: pd.Series, period: int = 20, bins: int = 10) -> pd.Series:
"""Rolling Shannon entropy of binned price changes.
Discretises the price changes within the rolling window into *bins*
equal-width bins and computes Shannon entropy in nats (natural log).
Parameters
----------
data : pd.Series
Price series.
period : int, default 20
Rolling window length.
bins : int, default 10
Number of histogram bins for discretisation.
Returns
-------
pd.Series
Shannon entropy values (>= 0).
Example
-------
>>> import pandas as pd
>>> close = pd.Series(range(30), dtype=float)
>>> entropy(close, period=20, bins=5) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
changes = data.diff()
def _shannon(window: np.ndarray) -> float:
counts, _ = np.histogram(window, bins=bins)
probs = counts / counts.sum()
# Filter out zero-probability bins to avoid log(0)
probs = probs[probs > 0]
return float(-np.sum(probs * np.log(probs)))
result = changes.rolling(window=period, min_periods=period).apply(
_shannon, raw=True
)
result.name = "entropy"
return result
# ---------------------------------------------------------------------------
# Hurst Exponent
# ---------------------------------------------------------------------------
[docs]
def hurst_exponent(data: pd.Series, period: int = 100) -> pd.Series:
"""Rolling Hurst exponent via the rescaled range (R/S) method.
* H < 0.5: mean-reverting
* H = 0.5: random walk
* H > 0.5: trending
Parameters
----------
data : pd.Series
Price series.
period : int, default 100
Rolling window length (should be >= 20 for reliable estimates).
Returns
-------
pd.Series
Hurst exponent estimates in roughly [0, 1].
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> close = pd.Series(100 + np.cumsum(np.random.randn(200)))
>>> hurst_exponent(close, period=100) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
def _rs_hurst(window: np.ndarray) -> float:
n = len(window)
if n < 20:
return np.nan
# Use log returns for stationarity
returns = np.diff(np.log(window))
if len(returns) == 0:
return np.nan
mean_ret = np.mean(returns)
deviations = returns - mean_ret
cumdev = np.cumsum(deviations)
r = np.max(cumdev) - np.min(cumdev)
s = np.std(returns, ddof=1)
if s == 0 or r == 0:
return np.nan
# Use multiple sub-ranges for a more robust estimate
rs_values = []
lengths = []
m = n - 1 # number of returns
for div in [2, 4, 8]:
sub_len = m // div
if sub_len < 4:
continue
rs_sub = []
for i in range(div):
sub = returns[i * sub_len : (i + 1) * sub_len]
sub_mean = np.mean(sub)
sub_dev = np.cumsum(sub - sub_mean)
sub_r = np.max(sub_dev) - np.min(sub_dev)
sub_s = np.std(sub, ddof=1)
if sub_s > 0:
rs_sub.append(sub_r / sub_s)
if rs_sub:
rs_values.append(np.log(np.mean(rs_sub)))
lengths.append(np.log(sub_len))
# Add full-range R/S
rs_values.append(np.log(r / s))
lengths.append(np.log(m))
if len(rs_values) < 2:
# Fallback: simple R/S estimate
return float(np.log(r / s) / np.log(m))
# Linear regression of log(R/S) vs log(n)
coeffs = np.polyfit(lengths, rs_values, 1)
return float(coeffs[0])
result = data.rolling(window=period, min_periods=period).apply(_rs_hurst, raw=True)
result.name = "hurst_exponent"
return result
# ---------------------------------------------------------------------------
# Correlation
# ---------------------------------------------------------------------------
[docs]
def correlation(
data: pd.Series,
other: pd.Series,
period: int = 20,
) -> pd.Series:
"""Rolling Pearson correlation between two series.
Parameters
----------
data : pd.Series
First price series.
other : pd.Series
Second price series.
period : int, default 20
Rolling window length.
Returns
-------
pd.Series
Correlation values in [-1, 1].
Example
-------
>>> import pandas as pd
>>> x = pd.Series([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], dtype=float)
>>> y = pd.Series([2, 4, 6, 8, 10, 12, 14, 16, 18, 20], dtype=float)
>>> correlation(x, y, period=5) # doctest: +SKIP
"""
data = _validate_series(data, "data")
other = _validate_series(other, "other")
_validate_period(period)
result = data.rolling(window=period, min_periods=period).corr(other)
result.name = "correlation"
return result
# ---------------------------------------------------------------------------
# Beta
# ---------------------------------------------------------------------------
[docs]
def beta(
data: pd.Series,
benchmark: pd.Series,
period: int = 60,
) -> pd.Series:
"""Rolling beta (OLS slope of data returns vs benchmark returns).
Interpretation:
- **Beta = 1**: Asset moves in line with the benchmark.
- **Beta > 1**: Asset is more volatile than the benchmark
(amplifies market moves). E.g. beta = 1.5 means the stock
moves 1.5% for every 1% benchmark move.
- **Beta < 1**: Asset is less volatile than the benchmark.
- **Beta < 0**: Asset moves inversely to the benchmark (rare).
- **Rising beta**: Asset becoming more correlated/volatile
relative to benchmark.
- Beta is the cornerstone of CAPM and factor models.
Parameters
----------
data : pd.Series
Asset price series.
benchmark : pd.Series
Benchmark price series.
period : int, default 60
Rolling window length.
Returns
-------
pd.Series
Beta values (unbounded).
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> stock = pd.Series(100 + np.cumsum(np.random.randn(100)))
>>> market = pd.Series(100 + np.cumsum(np.random.randn(100)))
>>> beta(stock, market, period=30) # doctest: +SKIP
"""
data = _validate_series(data, "data")
benchmark = _validate_series(benchmark, "benchmark")
_validate_period(period)
data_ret = data.pct_change()
bench_ret = benchmark.pct_change()
covar = data_ret.rolling(window=period, min_periods=period).cov(bench_ret)
var_bench = bench_ret.rolling(window=period, min_periods=period).var(ddof=1)
result = covar / var_bench
result.name = "beta"
return result
# ---------------------------------------------------------------------------
# R-Squared
# ---------------------------------------------------------------------------
[docs]
def r_squared(
data: pd.Series,
benchmark: pd.Series,
period: int = 60,
) -> pd.Series:
"""Rolling R-squared (coefficient of determination).
Computed as the square of the rolling Pearson correlation of returns.
Parameters
----------
data : pd.Series
Asset price series.
benchmark : pd.Series
Benchmark price series.
period : int, default 60
Rolling window length.
Returns
-------
pd.Series
R-squared values in [0, 1].
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> stock = pd.Series(100 + np.cumsum(np.random.randn(100)))
>>> market = pd.Series(100 + np.cumsum(np.random.randn(100)))
>>> r_squared(stock, market, period=30) # doctest: +SKIP
"""
data = _validate_series(data, "data")
benchmark = _validate_series(benchmark, "benchmark")
_validate_period(period)
data_ret = data.pct_change()
bench_ret = benchmark.pct_change()
corr = data_ret.rolling(window=period, min_periods=period).corr(bench_ret)
result = corr**2
result.name = "r_squared"
return result
# ---------------------------------------------------------------------------
# Information Coefficient
# ---------------------------------------------------------------------------
def _rankdata(arr: np.ndarray) -> np.ndarray:
"""Assign ranks to data (average method for ties)."""
n = len(arr)
order = np.argsort(arr)
ranks = np.empty(n, dtype=float)
ranks[order] = np.arange(1, n + 1, dtype=float)
# Handle ties with average rank
i = 0
while i < n:
j = i
while j < n - 1 and arr[order[j + 1]] == arr[order[j]]:
j += 1
if j > i:
avg_rank = np.mean(ranks[order[i : j + 1]])
ranks[order[i : j + 1]] = avg_rank
i = j + 1
return ranks