"""Modern and advanced technical analysis indicators.
This module provides contemporary indicators including squeeze detection,
anchored VWAP, market structure analysis, and adaptive oscillators.
All functions accept ``pd.Series`` inputs and return ``pd.Series``
(or ``dict[str, pd.Series]`` for multi-output indicators).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
__all__ = [
"squeeze_momentum",
"anchored_vwap",
"linear_regression_channel",
"pivot_points",
"market_structure",
"swing_points",
"volume_weighted_macd",
"ehlers_fisher",
"adaptive_rsi",
"relative_strength",
"linear_regression_forecast",
"standard_error_bands",
"r_squared_indicator",
"polynomial_regression",
"raff_regression_channel",
"detrended_regression",
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
from wraquant.ta._validators import validate_period as _validate_period
from wraquant.ta._validators import validate_series as _validate_series
def _ema(data: pd.Series, period: int) -> pd.Series:
"""Internal EMA helper to avoid circular import."""
return data.ewm(span=period, adjust=False, min_periods=period).mean()
def _sma(data: pd.Series, period: int) -> pd.Series:
"""Internal SMA helper."""
return data.rolling(window=period, min_periods=period).mean()
# ---------------------------------------------------------------------------
# Squeeze Momentum (TTM Squeeze)
# ---------------------------------------------------------------------------
[docs]
def squeeze_momentum(
high: pd.Series,
low: pd.Series,
close: pd.Series,
bb_period: int = 20,
bb_std: float = 2.0,
kc_period: int = 20,
kc_mult: float = 1.5,
mom_period: int = 12,
) -> dict[str, pd.Series]:
"""TTM Squeeze Momentum indicator.
Detects when Bollinger Bands are inside Keltner Channels (the
"squeeze") and measures momentum via a linear regression of price.
Interpretation:
- **squeeze_on = 1**: Bollinger Bands are inside Keltner
Channels. Volatility is compressed. A breakout is imminent.
- **squeeze_on = 0**: No squeeze. Normal volatility.
- **Momentum positive**: Bullish momentum -- breakout likely up.
- **Momentum negative**: Bearish momentum -- breakout likely down.
- **Momentum growing**: Accelerating.
- **Momentum shrinking**: Decelerating.
Trading rules:
- When squeeze fires (transitions from on to off), enter
in the direction of momentum.
- Exit when momentum starts to decelerate (histogram shrinks).
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
close : pd.Series
Close prices.
bb_period : int, default 20
Bollinger Bands SMA period.
bb_std : float, default 2.0
Bollinger Bands standard deviation multiplier.
kc_period : int, default 20
Keltner Channel EMA period.
kc_mult : float, default 1.5
Keltner Channel ATR multiplier.
mom_period : int, default 12
Momentum linear regression period.
Returns
-------
dict[str, pd.Series]
``squeeze_on`` (bool: 1 = squeeze active), ``momentum`` (momentum
histogram values).
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> n = 100
>>> c = pd.Series(100 + np.cumsum(np.random.randn(n) * 0.5))
>>> h = c + abs(np.random.randn(n) * 0.3)
>>> lo = c - abs(np.random.randn(n) * 0.3)
>>> result = squeeze_momentum(h, lo, c) # doctest: +SKIP
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
close = _validate_series(close, "close")
# Bollinger Bands
bb_mid = _sma(close, bb_period)
bb_rolling_std = close.rolling(window=bb_period, min_periods=bb_period).std(ddof=0)
bb_upper = bb_mid + bb_std * bb_rolling_std
bb_lower = bb_mid - bb_std * bb_rolling_std
# Keltner Channels (using ATR via true range)
prev_close = close.shift(1)
tr = pd.concat(
[
high - low,
(high - prev_close).abs(),
(low - prev_close).abs(),
],
axis=1,
).max(axis=1)
atr_val = tr.ewm(alpha=1.0 / kc_period, min_periods=kc_period, adjust=False).mean()
kc_mid = _ema(close, kc_period)
kc_upper = kc_mid + kc_mult * atr_val
kc_lower = kc_mid - kc_mult * atr_val
# Squeeze: BB inside KC
squeeze_on = ((bb_lower > kc_lower) & (bb_upper < kc_upper)).astype(float)
# Momentum: linear regression value of (close - midline)
midline = (
high.rolling(window=kc_period, min_periods=kc_period).max()
+ low.rolling(window=kc_period, min_periods=kc_period).min()
) / 2
delta = close - (midline + bb_mid) / 2
def _linreg_value(window: np.ndarray) -> float:
n = len(window)
x = np.arange(n, dtype=float)
slope = (n * np.dot(x, window) - x.sum() * window.sum()) / (
n * np.dot(x, x) - x.sum() ** 2
)
intercept = (window.sum() - slope * x.sum()) / n
return float(slope * (n - 1) + intercept)
mom = delta.rolling(window=mom_period, min_periods=mom_period).apply(
_linreg_value, raw=True
)
return {
"squeeze_on": squeeze_on.rename("squeeze_on"),
"momentum": mom.rename("squeeze_momentum"),
}
# ---------------------------------------------------------------------------
# Anchored VWAP
# ---------------------------------------------------------------------------
[docs]
def anchored_vwap(
close: pd.Series,
volume: pd.Series,
anchor_index: int = 0,
) -> pd.Series:
"""VWAP anchored from a specific bar index.
Computes the Volume Weighted Average Price starting from
*anchor_index* onwards. Values before the anchor are ``NaN``.
Parameters
----------
close : pd.Series
Close (or typical) prices.
volume : pd.Series
Volume series.
anchor_index : int, default 0
The integer position index to begin the VWAP calculation from.
Returns
-------
pd.Series
Anchored VWAP values.
Example
-------
>>> import pandas as pd
>>> close = pd.Series([10, 11, 12, 11, 10, 9, 10, 11, 12, 13], dtype=float)
>>> volume = pd.Series([100, 200, 150, 300, 250, 100, 200, 150, 300, 250], dtype=float)
>>> anchored_vwap(close, volume, anchor_index=3) # doctest: +SKIP
"""
close = _validate_series(close, "close")
volume = _validate_series(volume, "volume")
pv = close * volume
result = pd.Series(np.nan, index=close.index, name="anchored_vwap")
cum_pv = pv.iloc[anchor_index:].cumsum()
cum_vol = volume.iloc[anchor_index:].cumsum()
result.iloc[anchor_index:] = (cum_pv / cum_vol).values
return result
# ---------------------------------------------------------------------------
# Linear Regression Channel
# ---------------------------------------------------------------------------
[docs]
def linear_regression_channel(
data: pd.Series,
period: int = 100,
std_dev: float = 2.0,
) -> dict[str, pd.Series]:
"""Linear regression channel with standard deviation bands.
Fits a rolling linear regression and constructs upper/lower
channel lines based on the standard error.
Parameters
----------
data : pd.Series
Price series.
period : int, default 100
Rolling window length for the regression.
std_dev : float, default 2.0
Number of standard deviations for the channel width.
Returns
-------
dict[str, pd.Series]
``middle`` (regression value), ``upper``, ``lower``.
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(100 + np.arange(120, dtype=float) * 0.5)
>>> result = linear_regression_channel(close, period=50) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
values = data.values.astype(float)
n = len(values)
mid = np.full(n, np.nan)
upper = np.full(n, np.nan)
lower = np.full(n, np.nan)
for i in range(period - 1, n):
window = values[i - period + 1 : i + 1]
x = np.arange(period, dtype=float)
slope = (period * np.dot(x, window) - x.sum() * window.sum()) / (
period * np.dot(x, x) - x.sum() ** 2
)
intercept = (window.sum() - slope * x.sum()) / period
reg_val = slope * (period - 1) + intercept
predicted = slope * x + intercept
residuals = window - predicted
std_err = np.std(residuals, ddof=1)
mid[i] = reg_val
upper[i] = reg_val + std_dev * std_err
lower[i] = reg_val - std_dev * std_err
return {
"middle": pd.Series(mid, index=data.index, name="lr_middle"),
"upper": pd.Series(upper, index=data.index, name="lr_upper"),
"lower": pd.Series(lower, index=data.index, name="lr_lower"),
}
# ---------------------------------------------------------------------------
# Pivot Points
# ---------------------------------------------------------------------------
[docs]
def pivot_points(
high: pd.Series,
low: pd.Series,
close: pd.Series,
method: str = "standard",
) -> dict[str, pd.Series]:
"""Pivot points with support and resistance levels.
Computes pivot point and two levels of support/resistance using
the prior bar's high, low, and close.
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
close : pd.Series
Close prices.
method : str, default "standard"
Calculation method: ``"standard"``, ``"fibonacci"``, or ``"woodie"``.
Returns
-------
dict[str, pd.Series]
``pivot``, ``s1``, ``s2``, ``r1``, ``r2``.
Example
-------
>>> import pandas as pd
>>> h = pd.Series([12, 13, 14, 13, 12], dtype=float)
>>> lo = pd.Series([10, 11, 12, 11, 10], dtype=float)
>>> c = pd.Series([11, 12, 13, 12, 11], dtype=float)
>>> result = pivot_points(h, lo, c) # doctest: +SKIP
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
close = _validate_series(close, "close")
h_prev = high.shift(1)
l_prev = low.shift(1)
c_prev = close.shift(1)
if method == "standard":
pp = (h_prev + l_prev + c_prev) / 3.0
r1 = 2 * pp - l_prev
s1 = 2 * pp - h_prev
r2 = pp + (h_prev - l_prev)
s2 = pp - (h_prev - l_prev)
elif method == "fibonacci":
pp = (h_prev + l_prev + c_prev) / 3.0
diff = h_prev - l_prev
r1 = pp + 0.382 * diff
r2 = pp + 0.618 * diff
s1 = pp - 0.382 * diff
s2 = pp - 0.618 * diff
elif method == "woodie":
pp = (h_prev + l_prev + 2 * close) / 4.0
r1 = 2 * pp - l_prev
s1 = 2 * pp - h_prev
r2 = pp + (h_prev - l_prev)
s2 = pp - (h_prev - l_prev)
else:
raise ValueError(
f"method must be 'standard', 'fibonacci', or 'woodie', got {method!r}"
)
return {
"pivot": pp.rename("pivot"),
"r1": r1.rename("r1"),
"r2": r2.rename("r2"),
"s1": s1.rename("s1"),
"s2": s2.rename("s2"),
}
# ---------------------------------------------------------------------------
# Market Structure
# ---------------------------------------------------------------------------
[docs]
def market_structure(
high: pd.Series,
low: pd.Series,
lookback: int = 5,
) -> dict[str, pd.Series]:
"""Higher highs / lower lows market structure detection.
Identifies swing highs and lows using a *lookback* window, then
labels each swing as higher-high (HH), lower-high (LH),
higher-low (HL), or lower-low (LL).
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
lookback : int, default 5
Number of bars on each side to confirm a swing point.
Returns
-------
dict[str, pd.Series]
``swing_high`` (high values at swing highs, else NaN),
``swing_low`` (low values at swing lows, else NaN),
``structure`` (1 = bullish / HH+HL, -1 = bearish / LH+LL, 0 = neutral).
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> h = pd.Series(100 + np.cumsum(np.random.randn(50) * 0.5) + 1)
>>> lo = h - 2
>>> result = market_structure(h, lo, lookback=3) # doctest: +SKIP
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
_validate_period(lookback, "lookback")
n = len(high)
h_vals = high.values.astype(float)
l_vals = low.values.astype(float)
sh = np.full(n, np.nan)
sl = np.full(n, np.nan)
# Detect swing points
for i in range(lookback, n - lookback):
# Swing high: highest in both left and right windows
left_h = h_vals[i - lookback : i]
right_h = h_vals[i + 1 : i + lookback + 1]
if h_vals[i] >= np.max(left_h) and h_vals[i] >= np.max(right_h):
sh[i] = h_vals[i]
# Swing low: lowest in both left and right windows
left_l = l_vals[i - lookback : i]
right_l = l_vals[i + 1 : i + lookback + 1]
if l_vals[i] <= np.min(left_l) and l_vals[i] <= np.min(right_l):
sl[i] = l_vals[i]
# Determine structure
structure = np.zeros(n)
last_sh = np.nan
last_sl = np.nan
for i in range(n):
if not np.isnan(sh[i]):
if not np.isnan(last_sh):
if sh[i] > last_sh:
structure[i] = 1.0 # Higher high
elif sh[i] < last_sh:
structure[i] = -1.0 # Lower high
last_sh = sh[i]
elif not np.isnan(sl[i]):
if not np.isnan(last_sl):
if sl[i] > last_sl:
structure[i] = 1.0 # Higher low
elif sl[i] < last_sl:
structure[i] = -1.0 # Lower low
last_sl = sl[i]
# Forward-fill structure labels
struct_series = pd.Series(structure, index=high.index)
struct_series = struct_series.replace(0.0, np.nan).ffill().fillna(0.0)
return {
"swing_high": pd.Series(sh, index=high.index, name="swing_high"),
"swing_low": pd.Series(sl, index=high.index, name="swing_low"),
"structure": struct_series.rename("structure"),
}
# ---------------------------------------------------------------------------
# Swing Points
# ---------------------------------------------------------------------------
[docs]
def swing_points(
high: pd.Series,
low: pd.Series,
lookback: int = 5,
) -> dict[str, pd.Series]:
"""Swing high and low detection.
A swing high occurs when the high is the maximum of
``2 * lookback + 1`` bars centred on the pivot bar. Symmetrically
for swing lows.
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
lookback : int, default 5
Number of bars on each side of the pivot.
Returns
-------
dict[str, pd.Series]
``swing_high`` (high values at swing highs, else NaN),
``swing_low`` (low values at swing lows, else NaN).
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> h = pd.Series(100 + np.cumsum(np.random.randn(50) * 0.5) + 1)
>>> lo = h - 2
>>> result = swing_points(h, lo, lookback=3) # doctest: +SKIP
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
_validate_period(lookback, "lookback")
n = len(high)
h_vals = high.values.astype(float)
l_vals = low.values.astype(float)
sh = np.full(n, np.nan)
sl = np.full(n, np.nan)
for i in range(lookback, n - lookback):
window_h = h_vals[i - lookback : i + lookback + 1]
if h_vals[i] == np.max(window_h):
sh[i] = h_vals[i]
window_l = l_vals[i - lookback : i + lookback + 1]
if l_vals[i] == np.min(window_l):
sl[i] = l_vals[i]
return {
"swing_high": pd.Series(sh, index=high.index, name="swing_high"),
"swing_low": pd.Series(sl, index=low.index, name="swing_low"),
}
# ---------------------------------------------------------------------------
# Volume-Weighted MACD
# ---------------------------------------------------------------------------
[docs]
def volume_weighted_macd(
close: pd.Series,
volume: pd.Series,
fast: int = 12,
slow: int = 26,
signal: int = 9,
) -> dict[str, pd.Series]:
"""MACD weighted by volume.
Uses volume-weighted moving averages instead of standard EMAs
for the fast and slow lines.
Parameters
----------
close : pd.Series
Close prices.
volume : pd.Series
Volume series.
fast : int, default 12
Fast VWMA period.
slow : int, default 26
Slow VWMA period.
signal : int, default 9
Signal EMA period.
Returns
-------
dict[str, pd.Series]
``macd``, ``signal``, ``histogram``.
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> c = pd.Series(100 + np.cumsum(np.random.randn(100) * 0.5))
>>> v = pd.Series(np.random.randint(1000, 10000, 100), dtype=float)
>>> result = volume_weighted_macd(c, v) # doctest: +SKIP
"""
close = _validate_series(close, "close")
volume = _validate_series(volume, "volume")
def _vwma(price: pd.Series, vol: pd.Series, period: int) -> pd.Series:
pv = price * vol
return (
pv.rolling(window=period, min_periods=period).sum()
/ vol.rolling(window=period, min_periods=period).sum()
)
fast_vwma = _vwma(close, volume, fast)
slow_vwma = _vwma(close, volume, slow)
macd_line = fast_vwma - slow_vwma
signal_line = _ema(macd_line, signal)
hist = macd_line - signal_line
return {
"macd": macd_line.rename("vwmacd"),
"signal": signal_line.rename("vwmacd_signal"),
"histogram": hist.rename("vwmacd_histogram"),
}
# ---------------------------------------------------------------------------
# Ehlers Fisher Transform
# ---------------------------------------------------------------------------
[docs]
def ehlers_fisher(
high: pd.Series,
low: pd.Series,
period: int = 10,
) -> dict[str, pd.Series]:
"""Ehlers Fisher Transform.
Converts prices into a Gaussian normal distribution to create
sharp turning points, making it easier to identify reversals.
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
period : int, default 10
Look-back period for the normalisation.
Returns
-------
dict[str, pd.Series]
``fisher`` and ``trigger`` (one-bar lag of fisher).
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> h = pd.Series(100 + np.cumsum(np.random.randn(100) * 0.5) + 1)
>>> lo = h - 2
>>> result = ehlers_fisher(h, lo) # doctest: +SKIP
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
_validate_period(period)
hl2 = (high + low) / 2.0
n = len(hl2)
highest = hl2.rolling(window=period, min_periods=period).max()
lowest = hl2.rolling(window=period, min_periods=period).min()
# Normalise to [-1, 1]
raw = 2.0 * (hl2 - lowest) / (highest - lowest) - 1.0
# Clamp to avoid atanh(1) = inf
raw = raw.clip(-0.999, 0.999)
# Smooth
val = np.zeros(n)
fisher_out = np.zeros(n)
trigger_out = np.zeros(n)
for i in range(n):
if np.isnan(raw.iloc[i]):
val[i] = 0.0
fisher_out[i] = np.nan
trigger_out[i] = np.nan
continue
val[i] = 0.5 * raw.iloc[i] + 0.5 * val[i - 1] if i > 0 else 0.5 * raw.iloc[i]
val[i] = np.clip(val[i], -0.999, 0.999)
fisher_out[i] = 0.5 * np.log((1 + val[i]) / (1 - val[i])) + 0.5 * (
fisher_out[i - 1] if i > 0 else 0.0
)
trigger_out[i] = fisher_out[i - 1] if i > 0 else np.nan
# Set pre-warmup to NaN
fisher_out[: period - 1] = np.nan
trigger_out[:period] = np.nan
return {
"fisher": pd.Series(fisher_out, index=high.index, name="fisher"),
"trigger": pd.Series(trigger_out, index=high.index, name="fisher_trigger"),
}
# ---------------------------------------------------------------------------
# Adaptive RSI
# ---------------------------------------------------------------------------
[docs]
def adaptive_rsi(
data: pd.Series,
base_period: int = 14,
vol_period: int = 10,
min_period: int = 5,
max_period: int = 50,
) -> pd.Series:
"""RSI with an adaptive period based on volatility.
The look-back period expands in low-volatility regimes and
contracts in high-volatility regimes, improving responsiveness.
Parameters
----------
data : pd.Series
Price series (typically close).
base_period : int, default 14
Base RSI period.
vol_period : int, default 10
Period for the volatility (standard deviation) calculation.
min_period : int, default 5
Minimum allowed RSI period.
max_period : int, default 50
Maximum allowed RSI period.
Returns
-------
pd.Series
Adaptive RSI values in [0, 100].
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> close = pd.Series(100 + np.cumsum(np.random.randn(200) * 0.5))
>>> adaptive_rsi(close) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(base_period)
n = len(data)
values = data.values.astype(float)
returns = np.diff(values, prepend=values[0])
# Rolling volatility
vol = data.pct_change().rolling(window=vol_period, min_periods=vol_period).std()
# Normalise volatility to [0, 1]
vol_min = vol.rolling(window=max_period, min_periods=1).min()
vol_max = vol.rolling(window=max_period, min_periods=1).max()
vol_norm = (vol - vol_min) / (vol_max - vol_min)
vol_norm = vol_norm.fillna(0.5)
# Adaptive period: high vol -> short period, low vol -> long period
adaptive_period = (max_period - vol_norm * (max_period - min_period)).clip(
min_period, max_period
)
result = np.full(n, np.nan)
delta = data.diff()
gain = delta.clip(lower=0.0).values
loss = (-delta).clip(lower=0.0).values
for i in range(max_period, n):
period = int(round(adaptive_period.iloc[i]))
window_gain = gain[i - period + 1 : i + 1]
window_loss = loss[i - period + 1 : i + 1]
avg_gain = np.mean(window_gain)
avg_loss = np.mean(window_loss)
if avg_loss == 0:
result[i] = 100.0
else:
rs = avg_gain / avg_loss
result[i] = 100.0 - 100.0 / (1.0 + rs)
out = pd.Series(result, index=data.index, name="adaptive_rsi")
return out
# ---------------------------------------------------------------------------
# Relative Strength (Ratio)
# ---------------------------------------------------------------------------
[docs]
def relative_strength(
data: pd.Series,
benchmark: pd.Series,
) -> pd.Series:
"""Relative strength ratio of one series to another.
Commonly used for pair analysis or sector rotation. A rising
ratio indicates *data* is outperforming *benchmark*.
Parameters
----------
data : pd.Series
Numerator price series (e.g., individual stock).
benchmark : pd.Series
Denominator price series (e.g., index or sector ETF).
Returns
-------
pd.Series
Ratio of data / benchmark.
Example
-------
>>> import pandas as pd
>>> stock = pd.Series([100, 105, 110, 108, 112], dtype=float)
>>> index = pd.Series([1000, 1010, 1005, 1015, 1020], dtype=float)
>>> relative_strength(stock, index) # doctest: +SKIP
"""
data = _validate_series(data, "data")
benchmark = _validate_series(benchmark, "benchmark")
result = data / benchmark
result.name = "relative_strength"
return result
# ---------------------------------------------------------------------------
# Linear Regression Forecast
# ---------------------------------------------------------------------------
[docs]
def linear_regression_forecast(
data: pd.Series,
period: int = 20,
forecast_bars: int = 1,
) -> pd.Series:
"""Rolling linear regression forecast N bars ahead.
Fits a linear regression over each rolling window and projects the
value *forecast_bars* steps beyond the last observation in the
window.
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 20
Rolling window length for the regression.
forecast_bars : int, default 1
Number of bars ahead to forecast.
Returns
-------
pd.Series
Forecasted values.
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(100 + np.arange(50, dtype=float) * 0.5)
>>> linear_regression_forecast(close, period=20, forecast_bars=1) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
values = data.values.astype(float)
n = len(values)
result = np.full(n, np.nan)
x = np.arange(period, dtype=float)
x_sum = x.sum()
x_sq_sum = np.dot(x, x)
for i in range(period - 1, n):
window = values[i - period + 1 : i + 1]
if np.any(np.isnan(window)):
continue
w_sum = window.sum()
slope = (period * np.dot(x, window) - x_sum * w_sum) / (
period * x_sq_sum - x_sum**2
)
intercept = (w_sum - slope * x_sum) / period
result[i] = slope * (period - 1 + forecast_bars) + intercept
return pd.Series(result, index=data.index, name="lr_forecast")
# ---------------------------------------------------------------------------
# Standard Error Bands
# ---------------------------------------------------------------------------
[docs]
def standard_error_bands(
data: pd.Series,
period: int = 20,
num_bands: int = 3,
) -> dict[str, pd.Series]:
"""Linear regression line with standard error bands.
Fits a rolling linear regression and constructs bands at +-1, +-2,
and +-3 standard errors (configurable via *num_bands*).
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 20
Rolling window length.
num_bands : int, default 3
Number of band levels (1, 2, ... *num_bands* standard errors).
Returns
-------
dict[str, pd.Series]
``middle`` plus ``upper_N`` and ``lower_N`` for each band
level N from 1 to *num_bands*.
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(100 + np.arange(50, dtype=float) * 0.5)
>>> result = standard_error_bands(close, period=20) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
values = data.values.astype(float)
n = len(values)
mid = np.full(n, np.nan)
std_err = np.full(n, np.nan)
x = np.arange(period, dtype=float)
x_sum = x.sum()
x_sq_sum = np.dot(x, x)
for i in range(period - 1, n):
window = values[i - period + 1 : i + 1]
if np.any(np.isnan(window)):
continue
w_sum = window.sum()
slope = (period * np.dot(x, window) - x_sum * w_sum) / (
period * x_sq_sum - x_sum**2
)
intercept = (w_sum - slope * x_sum) / period
reg_val = slope * (period - 1) + intercept
predicted = slope * x + intercept
residuals = window - predicted
se = np.std(residuals, ddof=1)
mid[i] = reg_val
std_err[i] = se
result: dict[str, pd.Series] = {
"middle": pd.Series(mid, index=data.index, name="seb_middle"),
}
for level in range(1, num_bands + 1):
result[f"upper_{level}"] = pd.Series(
mid + level * std_err, index=data.index, name=f"seb_upper_{level}"
)
result[f"lower_{level}"] = pd.Series(
mid - level * std_err, index=data.index, name=f"seb_lower_{level}"
)
return result
# ---------------------------------------------------------------------------
# R-Squared Indicator
# ---------------------------------------------------------------------------
[docs]
def r_squared_indicator(
data: pd.Series,
period: int = 14,
) -> pd.Series:
"""Rolling R-squared of linear regression as a trend strength measure.
An R-squared near 1.0 indicates prices are moving in a strong
linear trend; near 0.0 indicates choppy, non-trending movement.
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 14
Rolling window length for the regression.
Returns
-------
pd.Series
R-squared values in [0, 1].
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(100 + np.arange(50, dtype=float) * 0.5)
>>> r_squared_indicator(close, period=14) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
values = data.values.astype(float)
n = len(values)
result = np.full(n, np.nan)
x = np.arange(period, dtype=float)
x_sum = x.sum()
x_sq_sum = np.dot(x, x)
for i in range(period - 1, n):
window = values[i - period + 1 : i + 1]
if np.any(np.isnan(window)):
continue
w_sum = window.sum()
slope = (period * np.dot(x, window) - x_sum * w_sum) / (
period * x_sq_sum - x_sum**2
)
intercept = (w_sum - slope * x_sum) / period
predicted = slope * x + intercept
ss_res = np.sum((window - predicted) ** 2)
ss_tot = np.sum((window - np.mean(window)) ** 2)
if ss_tot == 0:
result[i] = 1.0
else:
result[i] = 1.0 - ss_res / ss_tot
return pd.Series(result, index=data.index, name="r_squared")
# ---------------------------------------------------------------------------
# Polynomial Regression
# ---------------------------------------------------------------------------
[docs]
def polynomial_regression(
data: pd.Series,
period: int = 20,
degree: int = 2,
) -> pd.Series:
"""Rolling polynomial regression fitted values.
Fits a polynomial of the given degree over each rolling window
and returns the fitted value at the end of the window.
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 20
Rolling window length.
degree : int, default 2
Polynomial degree (2 = quadratic, 3 = cubic).
Returns
-------
pd.Series
Fitted polynomial values at the end of each window.
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(100 + np.arange(50, dtype=float) ** 1.5 * 0.01)
>>> polynomial_regression(close, period=20, degree=2) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
if degree < 1:
raise ValueError(f"degree must be >= 1, got {degree}")
values = data.values.astype(float)
n = len(values)
result = np.full(n, np.nan)
for i in range(period - 1, n):
window = values[i - period + 1 : i + 1]
if np.any(np.isnan(window)):
continue
x = np.arange(period, dtype=float)
coeffs = np.polyfit(x, window, degree)
result[i] = np.polyval(coeffs, period - 1)
return pd.Series(result, index=data.index, name="poly_regression")
# ---------------------------------------------------------------------------
# Raff Regression Channel
# ---------------------------------------------------------------------------
[docs]
def raff_regression_channel(
data: pd.Series,
period: int = 50,
) -> dict[str, pd.Series]:
"""Raff regression channel using maximum deviation.
Fits a rolling linear regression and constructs channel lines
using the maximum absolute deviation of any point in the window
from the regression line.
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 50
Rolling window length.
Returns
-------
dict[str, pd.Series]
``center`` (regression line), ``upper``, ``lower``.
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(100 + np.arange(80, dtype=float) * 0.5)
>>> result = raff_regression_channel(close, period=50) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
values = data.values.astype(float)
n = len(values)
center = np.full(n, np.nan)
upper = np.full(n, np.nan)
lower = np.full(n, np.nan)
x = np.arange(period, dtype=float)
x_sum = x.sum()
x_sq_sum = np.dot(x, x)
for i in range(period - 1, n):
window = values[i - period + 1 : i + 1]
if np.any(np.isnan(window)):
continue
w_sum = window.sum()
slope = (period * np.dot(x, window) - x_sum * w_sum) / (
period * x_sq_sum - x_sum**2
)
intercept = (w_sum - slope * x_sum) / period
reg_val = slope * (period - 1) + intercept
predicted = slope * x + intercept
max_dev = np.max(np.abs(window - predicted))
center[i] = reg_val
upper[i] = reg_val + max_dev
lower[i] = reg_val - max_dev
return {
"center": pd.Series(center, index=data.index, name="raff_center"),
"upper": pd.Series(upper, index=data.index, name="raff_upper"),
"lower": pd.Series(lower, index=data.index, name="raff_lower"),
}
# ---------------------------------------------------------------------------
# Detrended Regression
# ---------------------------------------------------------------------------
[docs]
def detrended_regression(
data: pd.Series,
period: int = 20,
) -> pd.Series:
"""Residuals from rolling linear regression (for mean reversion).
Fits a rolling linear regression and returns the residual
(actual minus predicted) at the end of each window. Positive
values indicate price above trend; negative below trend.
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 20
Rolling window length.
Returns
-------
pd.Series
Detrended (residual) values.
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(100 + np.arange(50, dtype=float) * 0.5)
>>> detrended_regression(close, period=20) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
values = data.values.astype(float)
n = len(values)
result = np.full(n, np.nan)
x = np.arange(period, dtype=float)
x_sum = x.sum()
x_sq_sum = np.dot(x, x)
for i in range(period - 1, n):
window = values[i - period + 1 : i + 1]
if np.any(np.isnan(window)):
continue
w_sum = window.sum()
slope = (period * np.dot(x, window) - x_sum * w_sum) / (
period * x_sq_sum - x_sum**2
)
intercept = (w_sum - slope * x_sum) / period
predicted = slope * (period - 1) + intercept
result[i] = window[-1] - predicted
return pd.Series(result, index=data.index, name="detrended")