"""Trend indicators.
This module provides indicators that identify and measure the direction
and strength of price trends. All functions accept ``pd.Series`` inputs
and return ``pd.Series`` (or ``dict[str, pd.Series]`` for multi-output
indicators).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
__all__ = [
"adx",
"aroon",
"psar",
"vortex",
"trix",
"linear_regression",
"linear_regression_slope",
"zigzag",
"heikin_ashi",
"mcginley_dynamic",
"schaff_trend_cycle",
"guppy_mma",
"rainbow_ma",
"hull_ma",
"zero_lag_ema",
"vidya",
"tilson_t3",
"fractal_adaptive_ma",
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
from wraquant.ta._validators import validate_period as _validate_period
from wraquant.ta._validators import validate_series as _validate_series
def _ema(data: pd.Series, period: int) -> pd.Series:
    """Return the recursive (``adjust=False``) EMA of *data* with span *period*.

    Output stays NaN until *period* observations are available, because
    ``min_periods`` is set to *period*.
    """
    smoother = data.ewm(span=period, min_periods=period, adjust=False)
    return smoother.mean()
def _wma(data: pd.Series, period: int) -> pd.Series:
    """Return the linearly weighted moving average of *data*.

    Within each window the oldest bar gets weight 1 and the newest bar gets
    weight *period*. Output is NaN until a full window is available.
    """
    ramp = np.arange(1, period + 1, dtype=float)
    ramp_total = ramp.sum()

    def _weighted_mean(chunk: np.ndarray) -> float:
        # ``chunk`` arrives as a raw ndarray in chronological order.
        return np.dot(chunk, ramp) / ramp_total

    roller = data.rolling(window=period, min_periods=period)
    return roller.apply(_weighted_mean, raw=True)
def _wilder_smooth(data: pd.Series, period: int) -> pd.Series:
    """Apply Wilder smoothing, i.e. a recursive EWM with ``alpha = 1 / period``.

    Output stays NaN until *period* observations are available.
    """
    wilder_alpha = 1.0 / period
    smoother = data.ewm(alpha=wilder_alpha, min_periods=period, adjust=False)
    return smoother.mean()
# ---------------------------------------------------------------------------
# ADX (Average Directional Index)
# ---------------------------------------------------------------------------
[docs]
def adx(
    high: pd.Series,
    low: pd.Series,
    close: pd.Series,
    period: int = 14,
) -> dict[str, pd.Series]:
    """Average Directional Index (ADX) with +DI and -DI.

    ADX measures the strength of a trend regardless of its direction;
    +DI and -DI show the direction.

    Interpretation:
        - **ADX > 25**: Trend is strong enough to trade.
        - **ADX > 50**: Very strong trend.
        - **ADX < 20**: No trend (range-bound / choppy).
        - **ADX rising**: Trend is strengthening.
        - **ADX falling**: Trend is weakening (not necessarily reversing).
        - **+DI > -DI**: Uptrend (buyers dominate).
        - **-DI > +DI**: Downtrend (sellers dominate).
        - **+DI/-DI crossover**: Trend direction change.

    Trading rules:
        - Buy when +DI crosses above -DI AND ADX > 25 (trending up).
        - Sell when -DI crosses above +DI AND ADX > 25 (trending down).
        - Avoid trading when ADX < 20 (no trend).
        - Use ADX as a filter: only apply trend-following strategies
          when ADX > 25; use mean-reversion when ADX < 20.

    Parameters
    ----------
    high : pd.Series
        High prices.
    low : pd.Series
        Low prices.
    close : pd.Series
        Close prices.
    period : int, default 14
        Smoothing period.

    Returns
    -------
    dict[str, pd.Series]
        ``adx``, ``plus_di``, ``minus_di`` -- all in [0, 100].

    Notes
    -----
    When the smoothed true range, or the sum ``+DI + -DI``, is exactly
    zero (a perfectly flat stretch with no directional movement) the raw
    ratios would be 0/0. Those bars are reported as 0 instead of NaN.
    Warm-up bars remain NaN.
    """
    high = _validate_series(high, "high")
    low = _validate_series(low, "low")
    close = _validate_series(close, "close")
    _validate_period(period)

    # True range: the largest of the bar range and the two gaps to the
    # previous close (row-wise max skips the NaN gaps on the first bar).
    prev_close = close.shift(1)
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    # Directional movement: only the larger, positive move counts.
    up_move = high - high.shift(1)
    down_move = low.shift(1) - low
    plus_dm = pd.Series(
        np.where((up_move > down_move) & (up_move > 0), up_move, 0.0),
        index=high.index,
    )
    minus_dm = pd.Series(
        np.where((down_move > up_move) & (down_move > 0), down_move, 0.0),
        index=high.index,
    )

    # Wilder smoothing
    smoothed_tr = _wilder_smooth(tr, period)
    smoothed_plus_dm = _wilder_smooth(plus_dm, period)
    smoothed_minus_dm = _wilder_smooth(minus_dm, period)

    # ``where`` keeps warm-up NaNs (NaN != 0 is True) and only replaces the
    # division-by-zero case with 0.
    has_range = smoothed_tr != 0
    plus_di = (100.0 * smoothed_plus_dm / smoothed_tr).where(has_range, 0.0)
    minus_di = (100.0 * smoothed_minus_dm / smoothed_tr).where(has_range, 0.0)

    di_sum = plus_di + minus_di
    dx = (100.0 * (plus_di - minus_di).abs() / di_sum).where(di_sum != 0, 0.0)
    adx_val = _wilder_smooth(dx, period)

    return {
        "adx": adx_val.rename("adx"),
        "plus_di": plus_di.rename("plus_di"),
        "minus_di": minus_di.rename("minus_di"),
    }
# ---------------------------------------------------------------------------
# Aroon
# ---------------------------------------------------------------------------
[docs]
def aroon(
    high: pd.Series,
    low: pd.Series,
    period: int = 25,
) -> dict[str, pd.Series]:
    """Aroon Indicator.

    Measures how recently the highest high / lowest low occurred inside a
    window of ``period + 1`` bars to gauge trend direction and strength.

    Interpretation:
        - **Aroon Up > 70**: Strong uptrend (recent new highs).
        - **Aroon Down > 70**: Strong downtrend (recent new lows).
        - **Aroon Up < 30**: Weak uptrend (no recent new highs).
        - **Aroon Down < 30**: Weak downtrend (no recent new lows).
        - **Aroon Up crosses above Aroon Down**: New uptrend starting.
        - **Aroon Down crosses above Aroon Up**: New downtrend starting.
        - **Both below 50**: Consolidation / no trend.
        - **Oscillator**: Positive = uptrend, negative = downtrend.

    Trading rules:
        - Buy when Aroon Up crosses above Aroon Down.
        - Sell when Aroon Down crosses above Aroon Up.
        - Strongest signal when one Aroon is above 70 and the other
          is below 30.

    Parameters
    ----------
    high : pd.Series
        High prices.
    low : pd.Series
        Low prices.
    period : int, default 25
        Look-back period.

    Returns
    -------
    dict[str, pd.Series]
        ``aroon_up``, ``aroon_down``, ``oscillator`` -- up/down in [0, 100],
        oscillator in [-100, 100].

    Notes
    -----
    When the extreme value occurs more than once in a window, the earliest
    occurrence is used (``np.argmax`` / ``np.argmin`` return the first hit).
    """
    high = _validate_series(high, "high")
    low = _validate_series(low, "low")
    _validate_period(period)

    span = period + 1

    def _extreme_position(series: pd.Series, locate) -> pd.Series:
        # Position of the extreme inside the window: 0 is the oldest bar,
        # ``period`` is the current bar. This equals
        # ``period - bars_since_extreme``.
        return series.rolling(window=span, min_periods=span).apply(
            lambda chunk: float(locate(chunk)), raw=True
        )

    up_line = (_extreme_position(high, np.argmax) / period) * 100.0
    down_line = (_extreme_position(low, np.argmin) / period) * 100.0
    spread = up_line - down_line

    return {
        "aroon_up": up_line.rename("aroon_up"),
        "aroon_down": down_line.rename("aroon_down"),
        "oscillator": spread.rename("aroon_oscillator"),
    }
# ---------------------------------------------------------------------------
# Parabolic SAR
# ---------------------------------------------------------------------------
[docs]
def psar(
    high: pd.Series,
    low: pd.Series,
    close: pd.Series,
    af_start: float = 0.02,
    af_step: float = 0.02,
    af_max: float = 0.2,
) -> pd.Series:
    """Parabolic SAR (Stop and Reverse).

    A trend-following indicator that provides entry/exit points and
    trailing stop levels.

    Interpretation:
        - **SAR below price**: Uptrend; the SAR value is a trailing
          stop / support level.
        - **SAR above price**: Downtrend; the SAR value is a trailing
          stop / resistance level.
        - **SAR flip (below to above)**: Sell signal -- bullish to bearish.
        - **SAR flip (above to below)**: Buy signal -- bearish to bullish.

    Trading rules:
        - Buy when SAR flips below price; sell when it flips above.
        - Use SAR values as trailing stop-loss levels.
        - Works best in trending markets; generates many false signals
          in ranging markets. Combine with ADX to filter.

    Parameters
    ----------
    high : pd.Series
        High prices.
    low : pd.Series
        Low prices.
    close : pd.Series
        Close prices. Only its length and index are used.
    af_start : float, default 0.02
        Initial acceleration factor.
    af_step : float, default 0.02
        Acceleration factor increment.
    af_max : float, default 0.2
        Maximum acceleration factor.

    Returns
    -------
    pd.Series
        Parabolic SAR values. Values above price indicate downtrend;
        values below price indicate uptrend. All NaN when fewer than two
        bars are supplied.
    """
    high = _validate_series(high, "high")
    low = _validate_series(low, "low")
    close = _validate_series(close, "close")

    highs = high.values.astype(float)
    lows = low.values.astype(float)
    size = len(close)
    out = np.full(size, np.nan)

    if size >= 2:
        # The run starts as an uptrend: SAR at the first low, extreme point
        # at the first high.
        rising = True
        accel = af_start
        extreme = highs[0]
        out[0] = lows[0]

        for i in range(1, size):
            before = out[i - 1]
            candidate = before + accel * (extreme - before)

            if rising:
                # SAR must not be above the prior two lows.
                candidate = min(candidate, lows[i - 1])
                if i >= 2:
                    candidate = min(candidate, lows[i - 2])

                if lows[i] < candidate:
                    # Price pierced the stop: flip to a downtrend and
                    # restart from the old extreme point.
                    rising = False
                    candidate = extreme
                    extreme = lows[i]
                    accel = af_start
                elif highs[i] > extreme:
                    extreme = highs[i]
                    accel = min(accel + af_step, af_max)
            else:
                # SAR must not be below the prior two highs.
                candidate = max(candidate, highs[i - 1])
                if i >= 2:
                    candidate = max(candidate, highs[i - 2])

                if highs[i] > candidate:
                    # Price pierced the stop: flip to an uptrend.
                    rising = True
                    candidate = extreme
                    extreme = highs[i]
                    accel = af_start
                elif lows[i] < extreme:
                    extreme = lows[i]
                    accel = min(accel + af_step, af_max)

            out[i] = candidate

    return pd.Series(out, index=close.index, name="psar")
# ---------------------------------------------------------------------------
# Vortex Indicator
# ---------------------------------------------------------------------------
[docs]
def vortex(
    high: pd.Series,
    low: pd.Series,
    close: pd.Series,
    period: int = 14,
) -> dict[str, pd.Series]:
    """Vortex Indicator.

    Captures positive and negative trend movement by comparing the
    distance between the current high/low and the previous low/high,
    each normalised by the summed true range over *period* bars.

    Interpretation:
        - **+VI > -VI**: Uptrend -- positive vortex movement dominates.
        - **-VI > +VI**: Downtrend -- negative vortex movement dominates.
        - **+VI crosses above -VI**: Bullish trend change signal.
        - **-VI crosses above +VI**: Bearish trend change signal.
        - **Both near 1.0**: Trend is neutral or transitioning.

    Trading rules:
        - Buy when +VI crosses above -VI.
        - Sell when -VI crosses above +VI.
        - Use a threshold (e.g. 1.1) to filter weak crossovers.

    Parameters
    ----------
    high : pd.Series
        High prices.
    low : pd.Series
        Low prices.
    close : pd.Series
        Close prices.
    period : int, default 14
        Look-back period.

    Returns
    -------
    dict[str, pd.Series]
        ``plus_vi`` and ``minus_vi``.
    """
    high = _validate_series(high, "high")
    low = _validate_series(low, "low")
    close = _validate_series(close, "close")
    _validate_period(period)

    def _window_total(series: pd.Series) -> pd.Series:
        return series.rolling(window=period, min_periods=period).sum()

    prior_close = close.shift(1)
    range_candidates = pd.concat(
        [high - low, (high - prior_close).abs(), (low - prior_close).abs()],
        axis=1,
    )
    true_range_total = _window_total(range_candidates.max(axis=1))

    upward = (high - low.shift(1)).abs()
    downward = (low - high.shift(1)).abs()

    positive = _window_total(upward) / true_range_total
    negative = _window_total(downward) / true_range_total

    return {
        "plus_vi": positive.rename("plus_vi"),
        "minus_vi": negative.rename("minus_vi"),
    }
# ---------------------------------------------------------------------------
# TRIX
# ---------------------------------------------------------------------------
[docs]
def trix(data: pd.Series, period: int = 15) -> pd.Series:
    """TRIX -- Triple-smoothed EMA rate of change.

    ``TRIX = 100 * ROC(EMA(EMA(EMA(data))))``

    Triple smoothing filters out insignificant price movements before
    the one-bar rate of change is taken.

    Interpretation:
        - **Above zero**: Bullish momentum (triple-smoothed trend up).
        - **Below zero**: Bearish momentum.
        - **Zero-line crossover**: Buy signal when crossing above zero;
          sell when crossing below.
        - **Signal line**: Often paired with a signal EMA for crossover
          signals.
        - Extremely smooth; good for identifying major trend changes
          but lags significantly.

    Trading rules:
        - Buy when TRIX crosses above zero.
        - Sell when TRIX crosses below zero.
        - Use for long-term trend identification, not short-term timing.

    Parameters
    ----------
    data : pd.Series
        Price series.
    period : int, default 15
        EMA period for each smoothing pass.

    Returns
    -------
    pd.Series
        TRIX values.
    """
    smoothed = _validate_series(data)
    _validate_period(period)

    # Three consecutive EMA passes with the same period.
    for _ in range(3):
        smoothed = _ema(smoothed, period)

    return (smoothed.pct_change() * 100.0).rename("trix")
# ---------------------------------------------------------------------------
# Linear Regression
# ---------------------------------------------------------------------------
[docs]
def linear_regression(
    data: pd.Series,
    period: int = 14,
) -> dict[str, pd.Series]:
    """Rolling Linear Regression.

    Fits an OLS line to the last *period* values at each bar, using
    ``x = 0 .. period - 1`` as the regressor.

    Interpretation:
        - **Positive slope**: Price trend is up over the window.
        - **Negative slope**: Price trend is down.
        - **R-squared near 1**: Price is moving in a clean line
          (strong trend with low noise).
        - **R-squared near 0**: No linear trend (choppy/range-bound).
        - **Value**: The regression endpoint acts as a smoothed trend
          line with less lag than a moving average.

    Trading rules:
        - Trade in the direction of the slope when R-squared > 0.5.
        - Avoid trend-following trades when R-squared < 0.2.

    Parameters
    ----------
    data : pd.Series
        Price series.
    period : int, default 14
        Look-back window.

    Returns
    -------
    dict[str, pd.Series]
        ``value`` (end-of-line prediction), ``slope``, ``intercept``,
        ``r_squared``. Bars whose window is incomplete or contains NaN
        are NaN. A window with zero variance reports ``r_squared = 1.0``.
    """
    data = _validate_series(data)
    _validate_period(period)

    prices = data.values.astype(float)
    size = len(data)
    slopes = np.full(size, np.nan)
    intercepts = np.full(size, np.nan)
    fits = np.full(size, np.nan)
    endpoints = np.full(size, np.nan)

    # The regressor is the same for every window, so centre it once.
    x = np.arange(period, dtype=float)
    x_mean = x.mean()
    x_dev = x - x_mean
    ss_x = np.sum(x_dev**2)

    for end in range(period - 1, size):
        y = prices[end - period + 1 : end + 1]
        if np.isnan(y).any():
            continue

        y_mean = y.mean()
        y_dev = y - y_mean
        ss_xy = np.sum(x_dev * y_dev)
        ss_y = np.sum(y_dev**2)

        slope = ss_xy / ss_x
        intercept = y_mean - slope * x_mean
        slopes[end] = slope
        intercepts[end] = intercept
        endpoints[end] = intercept + slope * (period - 1)
        # A flat window has no variance to explain; report a perfect fit.
        fits[end] = 1.0 if ss_y == 0 else (ss_xy**2) / (ss_x * ss_y)

    idx = data.index
    return {
        "value": pd.Series(endpoints, index=idx, name="linreg_value"),
        "slope": pd.Series(slopes, index=idx, name="linreg_slope"),
        "intercept": pd.Series(intercepts, index=idx, name="linreg_intercept"),
        "r_squared": pd.Series(fits, index=idx, name="linreg_r_squared"),
    }
[docs]
def linear_regression_slope(data: pd.Series, period: int = 14) -> pd.Series:
    """Rolling Linear Regression Slope.

    Convenience wrapper around :func:`linear_regression` that returns
    only its ``slope`` component.

    Interpretation:
        - **Positive**: Uptrend over the look-back period.
        - **Negative**: Downtrend.
        - **Magnitude**: Steeper slope = stronger trend.
        - **Zero crossover**: Trend direction change.

    Parameters
    ----------
    data : pd.Series
        Price series.
    period : int, default 14
        Look-back window.

    Returns
    -------
    pd.Series
        Slope values.
    """
    fit = linear_regression(data, period)
    return fit["slope"]
# ---------------------------------------------------------------------------
# ZigZag
# ---------------------------------------------------------------------------
[docs]
def zigzag(
    close: pd.Series,
    pct_change: float = 5.0,
) -> pd.Series:
    """ZigZag indicator -- connects swing highs and lows.

    Identifies pivots where price reverses by at least *pct_change* percent
    (measured from the latest pivot), then linearly interpolates between
    them.

    Interpretation:
        - Filters out noise to reveal the swing structure of price.
        - **Not a trading signal**: The last segment repaints as new
          data arrives. Use only for historical analysis.
        - **Swing counting**: Measure the distance between pivots for
          wave analysis (Elliott Wave, harmonic patterns).
        - **Support/resistance**: Pivot levels often act as future S/R.
        - Useful for backtesting swing-trading strategies.

    Parameters
    ----------
    close : pd.Series
        Close prices.
    pct_change : float, default 5.0
        Minimum percentage change to register a new pivot.

    Returns
    -------
    pd.Series
        ZigZag line (interpolated between pivots). Non-pivot bars are
        filled via linear interpolation; leading/trailing NaNs remain where
        no pivot has been established.

    Raises
    ------
    ValueError
        If *pct_change* is not strictly positive.

    Example
    -------
    >>> import pandas as pd
    >>> zz = zigzag(pd.Series([100, 110, 105, 95, 100, 90]), pct_change=5.0)
    """
    close = _validate_series(close, "close")
    if pct_change <= 0:
        raise ValueError(f"pct_change must be > 0, got {pct_change}")

    values = close.values.astype(float)
    n = len(values)
    result = np.full(n, np.nan)

    def _finish() -> pd.Series:
        return pd.Series(result, index=close.index, name="zigzag")

    if n < 2:
        return _finish()

    threshold = pct_change / 100.0

    # The first non-NaN bar anchors the line; at least one bar must follow.
    observed = np.flatnonzero(~np.isnan(values))
    start = int(observed[0]) if observed.size else n
    if start >= n - 1:
        return _finish()

    pivots: list[tuple[int, float]] = [(start, values[start])]
    trend = 0  # 0 = unknown, 1 = up, -1 = down

    for i in range(start + 1, n):
        price = values[i]
        if np.isnan(price):
            continue

        anchor = pivots[-1][1]
        move = (price - anchor) / abs(anchor)

        if (trend == 1 and price > anchor) or (trend == -1 and price < anchor):
            # The current leg extends: slide its end pivot to this bar.
            pivots[-1] = (i, price)
        elif trend != 1 and move >= threshold:
            # Unknown or falling leg reverses upward.
            trend = 1
            pivots.append((i, price))
        elif trend != -1 and move <= -threshold:
            # Unknown or rising leg reverses downward.
            trend = -1
            pivots.append((i, price))

    # Place pivot values
    for idx, val in pivots:
        result[idx] = val

    # Interpolate between consecutive pivots (end points included).
    for (i0, v0), (i1, v1) in zip(pivots, pivots[1:]):
        steps = np.arange(i0, i1 + 1)
        result[i0 : i1 + 1] = v0 + (v1 - v0) * (steps - i0) / (i1 - i0)

    return _finish()
# ---------------------------------------------------------------------------
# Heikin-Ashi
# ---------------------------------------------------------------------------
[docs]
def heikin_ashi(
    open_: pd.Series,
    high: pd.Series,
    low: pd.Series,
    close: pd.Series,
) -> dict[str, pd.Series]:
    """Heikin-Ashi modified OHLC candles.

    Modified candlesticks that smooth price action and make trends
    easier to identify:

    - ``ha_close = (open + high + low + close) / 4``
    - ``ha_open[0] = (open[0] + close[0]) / 2``, afterwards
      ``ha_open[i] = (ha_open[i-1] + ha_close[i-1]) / 2``
    - ``ha_high = max(high, ha_open, ha_close)``
    - ``ha_low = min(low, ha_open, ha_close)``

    Interpretation:
        - **Green HA candles with no lower shadow**: Strong uptrend.
        - **Red HA candles with no upper shadow**: Strong downtrend.
        - **Small body with long shadows**: Indecision / potential
          reversal.
        - **Color change**: Possible trend reversal (green to red or
          vice versa).
        - HA candles smooth noise, making it easier to stay in trends
          and avoid premature exits.

    Trading rules:
        - Stay long as long as HA candles remain green.
        - Stay short as long as HA candles remain red.
        - Watch for doji-like HA candles as reversal warnings.
        - Note: HA prices are synthetic -- do not use them for actual
          order placement. Use regular prices for entries/exits.

    Parameters
    ----------
    open_ : pd.Series
        Open prices.
    high : pd.Series
        High prices.
    low : pd.Series
        Low prices.
    close : pd.Series
        Close prices.

    Returns
    -------
    dict[str, pd.Series]
        ``ha_open``, ``ha_high``, ``ha_low``, ``ha_close``. Empty input
        yields four empty series.

    Example
    -------
    >>> import pandas as pd
    >>> ha = heikin_ashi(
    ...     pd.Series([100, 102]),
    ...     pd.Series([105, 106]),
    ...     pd.Series([99, 101]),
    ...     pd.Series([104, 103]),
    ... )
    """
    open_ = _validate_series(open_, "open_")
    high = _validate_series(high, "high")
    low = _validate_series(low, "low")
    close = _validate_series(close, "close")

    o = open_.values.astype(float)
    h = high.values.astype(float)
    l = low.values.astype(float)  # noqa: E741
    c = close.values.astype(float)
    n = len(close)

    ha_close_arr = (o + h + l + c) / 4.0

    ha_open_arr = np.empty(n)
    if n:
        # Guarded so that empty input returns empty series instead of
        # raising IndexError on the seed assignment.
        ha_open_arr[0] = (o[0] + c[0]) / 2.0
        # Each open depends on the previous HA candle, so this recursion
        # cannot be expressed as a simple shift.
        for i in range(1, n):
            ha_open_arr[i] = (ha_open_arr[i - 1] + ha_close_arr[i - 1]) / 2.0

    ha_high_arr = np.maximum(h, np.maximum(ha_open_arr, ha_close_arr))
    ha_low_arr = np.minimum(l, np.minimum(ha_open_arr, ha_close_arr))

    idx = close.index
    return {
        "ha_open": pd.Series(ha_open_arr, index=idx, name="ha_open"),
        "ha_high": pd.Series(ha_high_arr, index=idx, name="ha_high"),
        "ha_low": pd.Series(ha_low_arr, index=idx, name="ha_low"),
        "ha_close": pd.Series(ha_close_arr, index=idx, name="ha_close"),
    }
# ---------------------------------------------------------------------------
# McGinley Dynamic
# ---------------------------------------------------------------------------
[docs]
def mcginley_dynamic(
    data: pd.Series,
    period: int = 14,
) -> pd.Series:
    """McGinley Dynamic -- adaptive moving average.

    Adjusts its speed based on market velocity, reducing whipsaws compared
    to a standard EMA.

    ``MD_t = MD_{t-1} + (price - MD_{t-1}) / (N * (price / MD_{t-1})^4)``

    Interpretation:
        - **Price above McGinley**: Bullish trend.
        - **Price below McGinley**: Bearish trend.
        - Automatically speeds up in fast markets and slows down in
          slow markets, reducing false crossovers.
        - Acts as a more reliable dynamic support/resistance than
          traditional moving averages.

    Parameters
    ----------
    data : pd.Series
        Price series (typically close).
    period : int, default 14
        Smoothing period.

    Returns
    -------
    pd.Series
        McGinley Dynamic values. The line is seeded with the first
        non-NaN price; bars before it are NaN. A NaN price repeats the
        previous output, and a previous output of exactly zero resets the
        line to the current price.

    Example
    -------
    >>> import pandas as pd
    >>> md = mcginley_dynamic(pd.Series([100, 102, 101, 103, 105]))
    """
    data = _validate_series(data)
    _validate_period(period)

    prices = data.values.astype(float)
    size = len(prices)
    line = np.full(size, np.nan)

    # Seed with the first non-NaN value, if there is one.
    observed = np.flatnonzero(~np.isnan(prices))
    if observed.size:
        first = int(observed[0])
        line[first] = prices[first]

        for i in range(first + 1, size):
            price = prices[i]
            prev = line[i - 1]
            if np.isnan(price):
                line[i] = prev
            elif prev == 0:
                # price / prev is undefined; restart from the price.
                line[i] = price
            else:
                ratio = price / prev
                line[i] = prev + (price - prev) / (period * (ratio**4))

    return pd.Series(line, index=data.index, name="mcginley_dynamic")
# ---------------------------------------------------------------------------
# Schaff Trend Cycle
# ---------------------------------------------------------------------------
[docs]
def schaff_trend_cycle(
    close: pd.Series,
    period: int = 10,
    fast: int = 23,
    slow: int = 50,
) -> pd.Series:
    """Schaff Trend Cycle -- MACD passed through a double stochastic.

    The MACD line (fast EMA minus slow EMA) is run through a stochastic,
    EMA-smoothed, run through a second stochastic and EMA-smoothed again,
    giving an oscillator bounded between 0 and 100.

    Interpretation:
        - **> 75**: Bullish trend established.
        - **< 25**: Bearish trend established.
        - **Crosses above 25**: Buy signal (bearish-to-bullish shift).
        - **Crosses below 75**: Sell signal (bullish-to-bearish shift).
        - Faster than MACD because of the double stochastic processing.
        - Spends most of its time at extremes (near 0 or 100), with
          quick transitions between them.

    Parameters
    ----------
    close : pd.Series
        Close prices.
    period : int, default 10
        Stochastic look-back period (also the EMA smoothing period).
    fast : int, default 23
        Fast EMA period for MACD.
    slow : int, default 50
        Slow EMA period for MACD.

    Returns
    -------
    pd.Series
        STC values in [0, 100].

    Example
    -------
    >>> import pandas as pd, numpy as np
    >>> stc = schaff_trend_cycle(pd.Series(np.random.randn(100).cumsum() + 100))
    """
    close = _validate_series(close, "close")
    _validate_period(period)
    _validate_period(fast, "fast")
    _validate_period(slow, "slow")

    def _stochastic(series: pd.Series) -> pd.Series:
        # Position of ``series`` inside its rolling range, scaled to 0-100.
        # A window with zero range maps to the midpoint (50).
        floor = series.rolling(window=period, min_periods=1).min()
        ceiling = series.rolling(window=period, min_periods=1).max()
        spread = ceiling - floor
        scaled = np.where(spread != 0, (series - floor) / spread * 100.0, 50.0)
        return pd.Series(scaled, index=close.index)

    macd_line = _ema(close, fast) - _ema(close, slow)
    first_pass = _ema(_stochastic(macd_line), period)
    second_pass = _ema(_stochastic(first_pass), period)

    return second_pass.rename("schaff_trend_cycle")
# ---------------------------------------------------------------------------
# Guppy Multiple Moving Average
# ---------------------------------------------------------------------------
[docs]
def guppy_mma(
    data: pd.Series,
) -> dict[str, pd.Series]:
    """Guppy Multiple Moving Average (GMMA).

    Developed by Daryl Guppy. Uses two groups of EMAs to visualize
    the behavior of both traders (short-term) and investors (long-term):

    - **Short-term group**: 3, 5, 8, 10, 12, 15
    - **Long-term group**: 30, 35, 40, 45, 50, 60

    Interpretation:
        - **Short-term group fanning out above long-term group**:
          Strong uptrend with trader and investor agreement.
        - **Short-term group fanning out below long-term group**:
          Strong downtrend.
        - **Groups compressing**: Trend weakening, consolidation.
        - **Short-term crossing long-term**: Trend change signal.
        - **Wide separation between groups**: Strong conviction.
        - **Groups intertwined/overlapping**: Indecision, no trend.

    Parameters
    ----------
    data : pd.Series
        Price series (typically close).

    Returns
    -------
    dict[str, pd.Series]
        Keys ``short_3``, ``short_5``, ..., ``short_15``,
        ``long_30``, ``long_35``, ..., ``long_60`` (in that order).

    Example
    -------
    >>> import pandas as pd, numpy as np
    >>> g = guppy_mma(pd.Series(np.random.randn(100).cumsum() + 100))
    """
    data = _validate_series(data)

    groups = (
        ("short", (3, 5, 8, 10, 12, 15)),
        ("long", (30, 35, 40, 45, 50, 60)),
    )

    ribbons: dict[str, pd.Series] = {}
    for label, spans in groups:
        for span in spans:
            name = f"{label}_{span}"
            ribbons[name] = _ema(data, span).rename(name)
    return ribbons
# ---------------------------------------------------------------------------
# Rainbow Moving Average
# ---------------------------------------------------------------------------
[docs]
def rainbow_ma(
    data: pd.Series,
    period: int = 10,
    levels: int = 10,
) -> dict[str, pd.Series]:
    """Rainbow Moving Average -- recursive SMAs.

    Each level is an SMA of the previous level. Level 1 is SMA of *data*,
    level 2 is SMA of level 1, and so on.

    Interpretation:
        - **Price above all bands**: Strong uptrend.
        - **Price below all bands**: Strong downtrend.
        - **Bands fanning out**: Trend strengthening.
        - **Bands compressing**: Trend weakening or consolidation.
        - **Price touching inner bands then bouncing**: Pullback
          buy/sell opportunity in the direction of the trend.

    Parameters
    ----------
    data : pd.Series
        Price series.
    period : int, default 10
        SMA period applied at each level.
    levels : int, default 10
        Number of SMA recursions (typically 10).

    Returns
    -------
    dict[str, pd.Series]
        Keys ``sma_1`` through ``sma_{levels}``.

    Raises
    ------
    ValueError
        If *levels* is less than 1.

    Example
    -------
    >>> import pandas as pd, numpy as np
    >>> rb = rainbow_ma(pd.Series(np.random.randn(100).cumsum() + 100))
    """
    data = _validate_series(data)
    _validate_period(period)
    if levels < 1:
        raise ValueError(f"levels must be >= 1, got {levels}")

    bands: dict[str, pd.Series] = {}
    layer = data
    for depth in range(1, levels + 1):
        label = f"sma_{depth}"
        # Every band smooths the band directly beneath it.
        layer = layer.rolling(window=period, min_periods=period).mean()
        bands[label] = layer.rename(label)
    return bands
# ---------------------------------------------------------------------------
# Hull Moving Average
# ---------------------------------------------------------------------------
[docs]
def hull_ma(data: pd.Series, period: int = 16) -> pd.Series:
    """Hull Moving Average (HMA).

    ``HMA = WMA(2 * WMA(n/2) - WMA(n), sqrt(n))``

    Provides a fast, smooth moving average with reduced lag. ``n/2`` and
    ``sqrt(n)`` are truncated to integers and floored at 1.

    Interpretation:
        - **Price above HMA**: Bullish.
        - **Price below HMA**: Bearish.
        - **HMA slope change**: Early trend change signal.
          HMA turning up = bullish; turning down = bearish.
        - Extremely responsive due to the sqrt(n) final smoothing;
          excellent for short-term trend following.

    Parameters
    ----------
    data : pd.Series
        Price series.
    period : int, default 16
        HMA period.

    Returns
    -------
    pd.Series
        Hull Moving Average values.

    Example
    -------
    >>> import pandas as pd, numpy as np
    >>> hma = hull_ma(pd.Series(np.random.randn(100).cumsum() + 100), period=16)
    """
    data = _validate_series(data)
    _validate_period(period)

    half_span = max(int(period / 2), 1)
    root_span = max(int(np.sqrt(period)), 1)

    # Overweighting the fast average cancels most of the slow one's lag;
    # the final short WMA removes the noise this introduces.
    de_lagged = 2.0 * _wma(data, half_span) - _wma(data, period)
    return _wma(de_lagged, root_span).rename("hull_ma")
# ---------------------------------------------------------------------------
# Zero-Lag EMA
# ---------------------------------------------------------------------------
[docs]
def zero_lag_ema(data: pd.Series, period: int = 21) -> pd.Series:
    """Zero-Lag Exponential Moving Average (ZLEMA).

    Compensates for inherent EMA lag by applying the EMA to a
    de-lagged series: ``zlema_input = data + (data - data.shift(lag))``
    where ``lag = (period - 1) / 2`` truncated to an integer.

    Interpretation:
        - Same signals as EMA but with near-zero lag.
        - More responsive to recent price changes, giving earlier
          crossover signals.
        - Can overshoot in choppy markets due to the de-lagging
          adjustment.

    Parameters
    ----------
    data : pd.Series
        Price series.
    period : int, default 21
        EMA period.

    Returns
    -------
    pd.Series
        ZLEMA values.

    Example
    -------
    >>> import pandas as pd, numpy as np
    >>> zl = zero_lag_ema(pd.Series(np.random.randn(100).cumsum() + 100))
    """
    data = _validate_series(data)
    _validate_period(period)

    lag = int((period - 1) / 2)
    # Momentum over the lag window, added back onto the price.
    momentum = data - data.shift(lag)
    return _ema(data + momentum, period).rename("zero_lag_ema")
# ---------------------------------------------------------------------------
# VIDYA (Variable Index Dynamic Average)
# ---------------------------------------------------------------------------
[docs]
def vidya(
    data: pd.Series,
    period: int = 14,
    smooth: int = 5,
) -> pd.Series:
    """Variable Index Dynamic Average (VIDYA).

    Uses the Chande Momentum Oscillator (CMO) as a volatility index to
    dynamically adjust the smoothing constant of an EMA.

    ``VIDYA_t = alpha * |CMO_t| * price_t + (1 - alpha * |CMO_t|) * VIDYA_{t-1}``

    Interpretation:
        - Behaves like a fast EMA in trending markets (high CMO) and
          a slow EMA in ranging markets (low CMO).
        - **Price above VIDYA**: Bullish trend.
        - **Price below VIDYA**: Bearish trend.
        - Less prone to whipsaws than standard EMA in choppy markets.

    Parameters
    ----------
    data : pd.Series
        Price series.
    period : int, default 14
        CMO look-back period.
    smooth : int, default 5
        Smoothing period to derive the base alpha (``2 / (smooth + 1)``).

    Returns
    -------
    pd.Series
        VIDYA values. The line is seeded *period* bars after the first
        non-NaN price (skipping forward if that bar is itself NaN), so
        leading NaNs in *data* only delay the output. Bars before the seed
        are NaN; a NaN price after the seed repeats the previous output.

    Example
    -------
    >>> import pandas as pd, numpy as np
    >>> v = vidya(pd.Series(np.random.randn(100).cumsum() + 100))
    """
    data = _validate_series(data)
    _validate_period(period)
    _validate_period(smooth, "smooth")

    values = data.values.astype(float)
    n = len(values)
    result = np.full(n, np.nan)

    observed = np.flatnonzero(~np.isnan(values))
    if observed.size == 0:
        return pd.Series(result, index=data.index, name="vidya")

    # Compute CMO. Differences that involve a NaN price compare False on
    # both sides and therefore count as zero gain and zero loss.
    diff = np.diff(values, prepend=np.nan)
    gains = np.where(diff > 0, diff, 0.0)
    losses = np.where(diff < 0, -diff, 0.0)
    gain_sum = pd.Series(gains).rolling(window=period, min_periods=period).sum().values
    loss_sum = pd.Series(losses).rolling(window=period, min_periods=period).sum().values
    total = gain_sum + loss_sum
    # np.where evaluates both branches; silence the 0/0 warning for the
    # entries that are replaced by 0.0 anyway.
    with np.errstate(divide="ignore", invalid="ignore"):
        cmo = np.where(total != 0, (gain_sum - loss_sum) / total, 0.0)
    abs_cmo = np.abs(cmo)

    alpha = 2.0 / (smooth + 1)

    # Seed on the first bar whose CMO window holds ``period`` real price
    # changes, i.e. ``period`` bars after the first valid price. A NaN seed
    # would propagate through the recursion and blank the whole output.
    start = int(observed[0]) + period
    while start < n and np.isnan(values[start]):
        start += 1
    if start >= n:
        return pd.Series(result, index=data.index, name="vidya")

    result[start] = values[start]
    for i in range(start + 1, n):
        if np.isnan(values[i]):
            result[i] = result[i - 1]
            continue
        sc = alpha * abs_cmo[i]
        result[i] = sc * values[i] + (1.0 - sc) * result[i - 1]

    return pd.Series(result, index=data.index, name="vidya")
# ---------------------------------------------------------------------------
# Tilson T3
# ---------------------------------------------------------------------------
[docs]
def tilson_t3(
    data: pd.Series,
    period: int = 5,
    volume_factor: float = 0.7,
) -> pd.Series:
    """Tilson T3 -- triple-smoothed exponential moving average.

    Applies six sequential EMAs and combines the last four with Tilson
    coefficients derived from *volume_factor* to produce an ultra-smooth,
    low-lag average.

    Interpretation:
        - Extremely smooth trend line with less lag than TEMA.
        - **Price above T3**: Bullish.
        - **Price below T3**: Bearish.
        - **T3 slope change**: Very reliable trend change signal
          due to the heavy smoothing.
        - Best for medium to long-term trend identification.

    Parameters
    ----------
    data : pd.Series
        Price series.
    period : int, default 5
        EMA period for each pass.
    volume_factor : float, default 0.7
        Volume factor (commonly 0.7). Controls the overshoot reduction.

    Returns
    -------
    pd.Series
        Tilson T3 values.

    Example
    -------
    >>> import pandas as pd, numpy as np
    >>> t3 = tilson_t3(pd.Series(np.random.randn(100).cumsum() + 100))
    """
    data = _validate_series(data)
    _validate_period(period)

    # Build the cascade EMA1 .. EMA6, each smoothing the one before it.
    cascade: list[pd.Series] = []
    stage = data
    for _ in range(6):
        stage = _ema(stage, period)
        cascade.append(stage)
    e3, e4, e5, e6 = cascade[2:]

    vf = volume_factor
    vf_sq = vf**2
    vf_cu = vf**3
    c1 = -vf_cu
    c2 = 3 * vf_sq + 3 * vf_cu
    c3 = -6 * vf_sq - 3 * vf - 3 * vf_cu
    c4 = 1 + 3 * vf + vf_cu + 3 * vf_sq

    blended = c1 * e6 + c2 * e5 + c3 * e4 + c4 * e3
    return blended.rename("tilson_t3")
# ---------------------------------------------------------------------------
# Fractal Adaptive Moving Average (FRAMA)
# ---------------------------------------------------------------------------
[docs]
def fractal_adaptive_ma(
    data: pd.Series,
    period: int = 16,
) -> pd.Series:
    """Fractal Adaptive Moving Average (FRAMA).

    Uses the fractal dimension of the price series to dynamically adjust
    the EMA smoothing factor. More responsive in trending markets and
    slower during consolidation.

    Interpretation:
        - **Price above FRAMA**: Bullish trend.
        - **Price below FRAMA**: Bearish trend.
        - Adapts automatically: becomes responsive (like short EMA) in
          trending markets and sluggish (like long EMA) during
          consolidation.
        - Excellent for trend following with automatic noise filtering.

    Parameters
    ----------
    data : pd.Series
        Price series.
    period : int, default 16
        Look-back period (should be even; if odd, it is rounded up).

    Returns
    -------
    pd.Series
        FRAMA values. The line is seeded with the price on the last bar of
        the first full window that follows the first non-NaN price
        (skipping forward if that bar is itself NaN), so leading NaNs in
        *data* only delay the output. Bars before the seed are NaN; a NaN
        price after the seed repeats the previous output.

    Example
    -------
    >>> import pandas as pd, numpy as np
    >>> f = fractal_adaptive_ma(pd.Series(np.random.randn(200).cumsum() + 100))
    """
    data = _validate_series(data)
    _validate_period(period)

    # Ensure period is even so the window splits into two equal halves.
    if period % 2 != 0:
        period += 1
    half = period // 2

    values = data.values.astype(float)
    n = len(values)
    result = np.full(n, np.nan)

    observed = np.flatnonzero(~np.isnan(values))
    if observed.size == 0:
        return pd.Series(result, index=data.index, name="frama")

    # Seed at the end of the first full window after the first valid
    # price. A NaN seed would propagate through the recursion and blank
    # the whole output.
    seed = int(observed[0]) + period - 1
    while seed < n and np.isnan(values[seed]):
        seed += 1
    if seed >= n:
        return pd.Series(result, index=data.index, name="frama")
    result[seed] = values[seed]

    log_two = np.log(2)
    for i in range(seed + 1, n):
        if np.isnan(values[i]):
            # No new information: hold the previous value.
            result[i] = result[i - 1]
            continue

        # High-low range of first half, second half, and full window,
        # each expressed per bar.
        window = values[i - period + 1 : i + 1]
        first_half = window[:half]
        second_half = window[half:]
        n1 = (np.max(first_half) - np.min(first_half)) / half
        n2 = (np.max(second_half) - np.min(second_half)) / half
        n3 = (np.max(window) - np.min(window)) / period

        if n1 + n2 > 0 and n3 > 0:
            d = (np.log(n1 + n2) - np.log(n3)) / log_two
        else:
            # Flat window, or one containing NaN (the comparisons are
            # False): fall back to dimension 1, which gives alpha = 1.
            d = 1.0

        alpha = np.exp(-4.6 * (d - 1.0))
        alpha = max(0.01, min(alpha, 1.0))
        result[i] = alpha * values[i] + (1.0 - alpha) * result[i - 1]

    return pd.Series(result, index=data.index, name="frama")