"""Lesser-known and exotic technical analysis indicators.
This module provides uncommon or specialized indicators that measure
various aspects of market behaviour. All functions accept ``pd.Series``
inputs and return ``pd.Series`` (or ``dict[str, pd.Series]`` for
multi-output indicators).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
__all__ = [
"choppiness_index",
"random_walk_index",
"polarized_fractal_efficiency",
"price_zone_oscillator",
"ergodic_oscillator",
"elder_thermometer",
"market_facilitation_index",
"efficiency_ratio",
"trend_intensity_index",
"directional_movement_index",
"kairi",
"gopalakrishnan_range",
"pretty_good_oscillator",
"connors_tps",
"relative_momentum_index",
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
from wraquant.ta._validators import validate_period as _validate_period
from wraquant.ta._validators import validate_series as _validate_series
def _ema(data: pd.Series, period: int) -> pd.Series:
"""Internal EMA helper to avoid circular import."""
return data.ewm(span=period, adjust=False, min_periods=period).mean()
def _sma(data: pd.Series, period: int) -> pd.Series:
"""Internal SMA helper to avoid circular import."""
return data.rolling(window=period, min_periods=period).mean()
def _true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series:
"""Internal True Range helper."""
prev_close = close.shift(1)
tr = pd.concat(
[high - low, (high - prev_close).abs(), (low - prev_close).abs()],
axis=1,
).max(axis=1)
return tr
def _atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series:
"""Internal ATR helper (SMA of True Range)."""
tr = _true_range(high, low, close)
return tr.rolling(window=period, min_periods=period).mean()
# ---------------------------------------------------------------------------
# Choppiness Index
# ---------------------------------------------------------------------------
[docs]
def choppiness_index(
high: pd.Series,
low: pd.Series,
close: pd.Series,
period: int = 14,
) -> pd.Series:
"""Choppiness Index.
Measures whether the market is trending or range-bound.
Interpretation:
- **> 61.8**: Choppy / range-bound market. Avoid trend-following
strategies; use mean-reversion instead.
- **< 38.2**: Strong trending market. Use trend-following
strategies; avoid mean-reversion.
- **38.2-61.8**: Transitional zone.
- Does NOT indicate trend direction, only whether a trend exists.
- Low choppiness often precedes a breakout.
Trading rules:
- Apply trend strategies when CI < 38.2.
- Apply range strategies when CI > 61.8.
- Wait for CI to drop before entering breakout trades.
``CI = 100 * log10(sum(ATR(1), n) / (highest_high - lowest_low)) / log10(n)``
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
close : pd.Series
Close prices.
period : int, default 14
Look-back period.
Returns
-------
pd.Series
Choppiness Index values, typically in [0, 100].
Example
-------
>>> result = choppiness_index(high, low, close, period=14)
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
close = _validate_series(close, "close")
_validate_period(period)
tr = _true_range(high, low, close)
atr_sum = tr.rolling(window=period, min_periods=period).sum()
highest = high.rolling(window=period, min_periods=period).max()
lowest = low.rolling(window=period, min_periods=period).min()
hl_range = highest - lowest
hl_range = hl_range.replace(0.0, np.nan)
result = 100.0 * np.log10(atr_sum / hl_range) / np.log10(period)
result.name = "choppiness_index"
return result
# ---------------------------------------------------------------------------
# Random Walk Index
# ---------------------------------------------------------------------------
[docs]
def random_walk_index(
high: pd.Series,
low: pd.Series,
close: pd.Series,
period: int = 14,
) -> dict[str, pd.Series]:
"""Random Walk Index (RWI).
Compares the range of directional price moves to the expected range
of a random walk. Values above 1.0 suggest trending behavior.
Interpretation:
- **RWI High > 1**: Upward price movement exceeds what a
random walk would produce = genuine uptrend.
- **RWI Low > 1**: Downward price movement exceeds random
walk = genuine downtrend.
- **Both < 1**: Price movement is consistent with random
noise = no trend.
- **RWI High > RWI Low**: Bullish bias.
- **RWI Low > RWI High**: Bearish bias.
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
close : pd.Series
Close prices.
period : int, default 14
Look-back period.
Returns
-------
dict[str, pd.Series]
``rwi_high`` and ``rwi_low``.
Example
-------
>>> result = random_walk_index(high, low, close, period=14)
>>> result["rwi_high"]
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
close = _validate_series(close, "close")
_validate_period(period)
tr = _true_range(high, low, close)
atr_val = tr.rolling(window=period, min_periods=period).mean()
h = high.values.astype(float)
l = low.values.astype(float)
atr_arr = atr_val.values.astype(float)
n = len(h)
rwi_high = np.full(n, np.nan)
rwi_low = np.full(n, np.nan)
for i in range(period, n):
max_rwi_h = 0.0
max_rwi_l = 0.0
for j in range(1, period + 1):
denom = atr_arr[i] * np.sqrt(j)
if denom > 0 and not np.isnan(denom):
rwi_h = (h[i] - l[i - j]) / denom
rwi_l = (h[i - j] - l[i]) / denom
max_rwi_h = max(max_rwi_h, rwi_h)
max_rwi_l = max(max_rwi_l, rwi_l)
rwi_high[i] = max_rwi_h
rwi_low[i] = max_rwi_l
idx = high.index
return {
"rwi_high": pd.Series(rwi_high, index=idx, name="rwi_high"),
"rwi_low": pd.Series(rwi_low, index=idx, name="rwi_low"),
}
# ---------------------------------------------------------------------------
# Polarized Fractal Efficiency
# ---------------------------------------------------------------------------
[docs]
def polarized_fractal_efficiency(
close: pd.Series,
period: int = 10,
smoothing: int = 5,
) -> pd.Series:
"""Polarized Fractal Efficiency (PFE).
Measures the efficiency of the price path using fractal geometry.
A straight-line move yields +/- 100; a random walk yields ~0.
Interpretation:
- **> 0**: Price is moving efficiently upward (trending up).
- **< 0**: Price is moving efficiently downward (trending down).
- **Near 0**: Choppy, inefficient movement (no trend).
- **> +50**: Strong bullish trend.
- **< -50**: Strong bearish trend.
- Think of it as "how direct is the price path" -- values near
+/-100 mean a straight line, near 0 means a random walk.
``PFE = sign(direction) * sqrt(sum_sq_diff + direction^2) / sum_single_diff * 100``
The raw PFE is then smoothed with an EMA.
Parameters
----------
close : pd.Series
Close prices.
period : int, default 10
Look-back period.
smoothing : int, default 5
EMA smoothing period applied to the raw PFE.
Returns
-------
pd.Series
PFE values, bounded roughly in [-100, 100].
Example
-------
>>> result = polarized_fractal_efficiency(close, period=10)
"""
close = _validate_series(close, "close")
_validate_period(period)
values = close.values.astype(float)
n = len(values)
pfe_raw = np.full(n, np.nan)
for i in range(period, n):
# Total direction (end-to-end)
direction = values[i] - values[i - period]
# Sum of individual bar-to-bar distances (Euclidean with unit x-step)
path_length = 0.0
for j in range(i - period + 1, i + 1):
diff = values[j] - values[j - 1]
path_length += np.sqrt(1.0 + diff * diff)
# Fractal dimension of the path
fractal_length = np.sqrt(period * period + direction * direction)
if path_length > 0:
sign = 1.0 if direction > 0 else -1.0
pfe_raw[i] = sign * (fractal_length / path_length) * 100.0
raw_series = pd.Series(pfe_raw, index=close.index)
result = _ema(raw_series, smoothing)
result.name = "pfe"
return result
# ---------------------------------------------------------------------------
# Price Zone Oscillator
# ---------------------------------------------------------------------------
[docs]
def price_zone_oscillator(
close: pd.Series,
period: int = 14,
) -> pd.Series:
"""Price Zone Oscillator (PZO).
An EMA-based oscillator that classifies price action into zones.
A positive close change gets +close, negative gets -close.
``PZO = 100 * EMA(signed_close, period) / EMA(close, period)``
Parameters
----------
close : pd.Series
Close prices.
period : int, default 14
EMA period.
Returns
-------
pd.Series
PZO values, typically in [-100, 100].
Example
-------
>>> result = price_zone_oscillator(close, period=14)
"""
close = _validate_series(close, "close")
_validate_period(period)
diff = close.diff()
sign = diff.apply(lambda x: 1.0 if x > 0 else (-1.0 if x < 0 else 0.0))
signed_close = close * sign
ema_signed = _ema(signed_close, period)
ema_close = _ema(close, period)
result = 100.0 * ema_signed / ema_close.replace(0.0, np.nan)
result.name = "pzo"
return result
# ---------------------------------------------------------------------------
# Ergodic Oscillator
# ---------------------------------------------------------------------------
[docs]
def ergodic_oscillator(
close: pd.Series,
fast: int = 5,
slow: int = 20,
signal: int = 5,
) -> dict[str, pd.Series]:
"""Ergodic Oscillator.
A True Strength Index variant that produces a histogram (oscillator
minus signal line) for identifying momentum shifts.
Parameters
----------
close : pd.Series
Close prices.
fast : int, default 5
Fast double-smoothing EMA period.
slow : int, default 20
Slow double-smoothing EMA period.
signal : int, default 5
Signal line EMA period.
Returns
-------
dict[str, pd.Series]
``ergodic`` (the TSI line), ``signal``, and ``histogram``.
Example
-------
>>> result = ergodic_oscillator(close)
>>> result["ergodic"]
"""
close = _validate_series(close, "close")
_validate_period(fast, "fast")
_validate_period(slow, "slow")
_validate_period(signal, "signal")
diff = close.diff()
double_smoothed = _ema(_ema(diff, slow), fast)
double_smoothed_abs = _ema(_ema(diff.abs(), slow), fast)
ergodic_line = 100.0 * double_smoothed / double_smoothed_abs.replace(0.0, np.nan)
signal_line = _ema(ergodic_line, signal)
histogram = ergodic_line - signal_line
return {
"ergodic": ergodic_line.rename("ergodic"),
"signal": signal_line.rename("ergodic_signal"),
"histogram": histogram.rename("ergodic_histogram"),
}
# ---------------------------------------------------------------------------
# Elder Thermometer
# ---------------------------------------------------------------------------
[docs]
def elder_thermometer(
high: pd.Series,
low: pd.Series,
period: int = 22,
) -> pd.Series:
"""Elder Thermometer.
Measures the distance of the current bar from the previous bar's
range, indicating how far price has moved beyond the prior bar.
``Thermo = max(high - prev_high, prev_low - low, 0)``
The result is smoothed with an EMA.
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
period : int, default 22
EMA smoothing period.
Returns
-------
pd.Series
Elder Thermometer values (non-negative).
Example
-------
>>> result = elder_thermometer(high, low, period=22)
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
_validate_period(period)
up_move = (high - high.shift(1)).clip(lower=0.0)
down_move = (low.shift(1) - low).clip(lower=0.0)
thermo_raw = pd.concat([up_move, down_move], axis=1).max(axis=1)
result = _ema(thermo_raw, period)
result.name = "elder_thermometer"
return result
# ---------------------------------------------------------------------------
# Market Facilitation Index
# ---------------------------------------------------------------------------
[docs]
def market_facilitation_index(
high: pd.Series,
low: pd.Series,
volume: pd.Series,
) -> pd.Series:
"""Market Facilitation Index (MFI / BW MFI).
Also known as the Bill Williams Market Facilitation Index. Measures
the efficiency of price movement per unit of volume.
``MFI = (high - low) / volume``
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
volume : pd.Series
Volume data.
Returns
-------
pd.Series
Market Facilitation Index values.
Example
-------
>>> result = market_facilitation_index(high, low, volume)
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
volume = _validate_series(volume, "volume")
result = (high - low) / volume.replace(0.0, np.nan)
result.name = "mfi_bw"
return result
# ---------------------------------------------------------------------------
# Efficiency Ratio
# ---------------------------------------------------------------------------
[docs]
def efficiency_ratio(data: pd.Series, period: int = 10) -> pd.Series:
"""Efficiency Ratio (ER).
Measures the efficiency of price movement as the ratio of net
directional change to the total path traveled.
``ER = |close - close[n]| / sum(|close - close[1]|, n)``
Values near 1.0 indicate a strong trend; values near 0.0 indicate
choppy, range-bound price action.
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 10
Look-back period.
Returns
-------
pd.Series
Efficiency ratio values in [0, 1].
Example
-------
>>> result = efficiency_ratio(close, period=10)
"""
data = _validate_series(data)
_validate_period(period)
direction = (data - data.shift(period)).abs()
volatility = data.diff().abs().rolling(window=period, min_periods=period).sum()
result = direction / volatility.replace(0.0, np.nan)
result.name = "efficiency_ratio"
return result
# ---------------------------------------------------------------------------
# Trend Intensity Index
# ---------------------------------------------------------------------------
[docs]
def trend_intensity_index(
data: pd.Series,
period: int = 30,
) -> pd.Series:
"""Trend Intensity Index (TII).
Measures the percentage of closes above or below the SMA over the
look-back period.
``TII = 100 * (count_above - count_below) / period``
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 30
Look-back period.
Returns
-------
pd.Series
TII values in [-100, 100]. Positive indicates bullish bias;
negative indicates bearish bias.
Example
-------
>>> result = trend_intensity_index(close, period=30)
"""
data = _validate_series(data)
_validate_period(period)
sma_val = _sma(data, period)
deviation = data - sma_val
def _tii(window: np.ndarray) -> float:
above = np.sum(window > 0)
below = np.sum(window < 0)
return 100.0 * (above - below) / len(window)
result = deviation.rolling(window=period, min_periods=period).apply(_tii, raw=True)
result.name = "tii"
return result
# ---------------------------------------------------------------------------
# Directional Movement Index (simplified)
# ---------------------------------------------------------------------------
[docs]
def directional_movement_index(
high: pd.Series,
low: pd.Series,
close: pd.Series,
period: int = 14,
) -> dict[str, pd.Series]:
"""Simplified Directional Movement Index (DMI).
Computes +DI and -DI without the full ADX smoothing, providing raw
directional movement readings.
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
close : pd.Series
Close prices.
period : int, default 14
Smoothing period.
Returns
-------
dict[str, pd.Series]
``plus_di`` (+DI) and ``minus_di`` (-DI), both in [0, 100].
Example
-------
>>> result = directional_movement_index(high, low, close, period=14)
>>> result["plus_di"]
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
close = _validate_series(close, "close")
_validate_period(period)
up_move = high - high.shift(1)
down_move = low.shift(1) - low
plus_dm = pd.Series(
np.where((up_move > down_move) & (up_move > 0), up_move, 0.0),
index=high.index,
)
minus_dm = pd.Series(
np.where((down_move > up_move) & (down_move > 0), down_move, 0.0),
index=high.index,
)
tr = _true_range(high, low, close)
atr_val = tr.rolling(window=period, min_periods=period).sum()
plus_dm_sum = plus_dm.rolling(window=period, min_periods=period).sum()
minus_dm_sum = minus_dm.rolling(window=period, min_periods=period).sum()
plus_di = 100.0 * plus_dm_sum / atr_val.replace(0.0, np.nan)
minus_di = 100.0 * minus_dm_sum / atr_val.replace(0.0, np.nan)
return {
"plus_di": plus_di.rename("plus_di"),
"minus_di": minus_di.rename("minus_di"),
}
# ---------------------------------------------------------------------------
# KAIRI — % deviation from SMA
# ---------------------------------------------------------------------------
[docs]
def kairi(data: pd.Series, period: int = 14) -> pd.Series:
"""KAIRI — percentage deviation from the SMA.
``KAIRI = ((close - SMA(close, period)) / SMA(close, period)) * 100``
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 14
SMA period.
Returns
-------
pd.Series
KAIRI values (percentage, unbounded).
Example
-------
>>> result = kairi(close, period=14)
"""
data = _validate_series(data)
_validate_period(period)
sma_val = _sma(data, period)
result = ((data - sma_val) / sma_val.replace(0.0, np.nan)) * 100.0
result.name = "kairi"
return result
# ---------------------------------------------------------------------------
# Gopalakrishnan Range Index
# ---------------------------------------------------------------------------
[docs]
def gopalakrishnan_range(
high: pd.Series,
low: pd.Series,
period: int = 5,
) -> pd.Series:
"""Gopalakrishnan Range Index (GAPO).
``GAPO = log(max_high - min_low) / log(period)``
Measures the log of the high-low range relative to the log of the
look-back period. Higher values indicate larger ranges.
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
period : int, default 5
Look-back period.
Returns
-------
pd.Series
GAPO values (non-negative).
Example
-------
>>> result = gopalakrishnan_range(high, low, period=5)
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
_validate_period(period)
max_high = high.rolling(window=period, min_periods=period).max()
min_low = low.rolling(window=period, min_periods=period).min()
hl_range = (max_high - min_low).clip(lower=1e-10)
result = np.log(hl_range) / np.log(period)
result.name = "gapo"
return result
# ---------------------------------------------------------------------------
# Pretty Good Oscillator
# ---------------------------------------------------------------------------
[docs]
def pretty_good_oscillator(
high: pd.Series,
low: pd.Series,
close: pd.Series,
period: int = 14,
) -> pd.Series:
"""Pretty Good Oscillator (PGO).
``PGO = (close - SMA(close, period)) / ATR(period)``
Normalizes the deviation from the SMA by the ATR, producing a
volatility-adjusted momentum reading.
Parameters
----------
high : pd.Series
High prices.
low : pd.Series
Low prices.
close : pd.Series
Close prices.
period : int, default 14
SMA / ATR period.
Returns
-------
pd.Series
PGO values (unbounded).
Example
-------
>>> result = pretty_good_oscillator(high, low, close, period=14)
"""
high = _validate_series(high, "high")
low = _validate_series(low, "low")
close = _validate_series(close, "close")
_validate_period(period)
sma_val = _sma(close, period)
atr_val = _atr(high, low, close, period)
result = (close - sma_val) / atr_val.replace(0.0, np.nan)
result.name = "pgo"
return result
# ---------------------------------------------------------------------------
# Connors TPS (Trend/Percentile/Streak)
# ---------------------------------------------------------------------------
[docs]
def connors_tps(data: pd.Series, period: int = 2) -> pd.Series:
"""ConnorsRSI TPS component — cumulative streak RSI.
Computes an up/down streak series, then applies RSI to the streak
values. This isolates the streak component of ConnorsRSI.
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 2
RSI period applied to the streak series.
Returns
-------
pd.Series
Streak RSI values in [0, 100].
Example
-------
>>> result = connors_tps(close, period=2)
"""
data = _validate_series(data)
_validate_period(period)
diff = data.diff()
values = diff.values.astype(float)
n = len(values)
streak = np.zeros(n)
for i in range(1, n):
if np.isnan(values[i]):
streak[i] = 0.0
elif values[i] > 0:
streak[i] = max(streak[i - 1], 0) + 1
elif values[i] < 0:
streak[i] = min(streak[i - 1], 0) - 1
else:
streak[i] = 0.0
streak_series = pd.Series(streak, index=data.index)
# Apply RSI to the streak
delta = streak_series.diff()
gain = delta.clip(lower=0.0)
loss = (-delta).clip(lower=0.0)
avg_gain = gain.ewm(alpha=1.0 / period, min_periods=period, adjust=False).mean()
avg_loss = loss.ewm(alpha=1.0 / period, min_periods=period, adjust=False).mean()
rs = avg_gain / avg_loss.replace(0.0, np.nan)
result = 100.0 - (100.0 / (1.0 + rs))
result.name = "connors_tps"
return result
# ---------------------------------------------------------------------------
# Relative Momentum Index
# ---------------------------------------------------------------------------
[docs]
def relative_momentum_index(
data: pd.Series,
period: int = 14,
momentum_period: int = 4,
) -> pd.Series:
"""Relative Momentum Index (RMI).
RSI applied to momentum (``close - close[momentum_period]``) instead
of the simple one-bar change. This produces a smoother oscillator
that reacts to the direction and magnitude of price swings over
*momentum_period* bars.
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 14
RSI smoothing period.
momentum_period : int, default 4
Look-back for the momentum (difference) calculation.
Returns
-------
pd.Series
RMI values in [0, 100].
Example
-------
>>> result = relative_momentum_index(close, period=14, momentum_period=4)
"""
data = _validate_series(data)
_validate_period(period)
_validate_period(momentum_period, "momentum_period")
delta = data - data.shift(momentum_period)
gain = delta.clip(lower=0.0)
loss = (-delta).clip(lower=0.0)
avg_gain = gain.ewm(alpha=1.0 / period, min_periods=period, adjust=False).mean()
avg_loss = loss.ewm(alpha=1.0 / period, min_periods=period, adjust=False).mean()
rs = avg_gain / avg_loss.replace(0.0, np.nan)
result = 100.0 - (100.0 / (1.0 + rs))
result.name = "rmi"
return result