"""Cycle analysis indicators.
This module provides indicators based on digital signal processing
techniques, primarily those developed by John Ehlers. They detect
dominant cycle periods and separate trend from cycle components.
All functions accept ``pd.Series`` inputs and return ``pd.Series``
(or ``dict[str, pd.Series]`` for multi-output indicators).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
__all__ = [
"hilbert_transform_dominant_period",
"hilbert_transform_trend_mode",
"hilbert_instantaneous_phase",
"sine_wave",
"even_better_sinewave",
"roofing_filter",
"decycler",
"bandpass_filter",
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
from wraquant.ta._validators import validate_period as _validate_period
from wraquant.ta._validators import validate_series as _validate_series
def _highpass_filter(data: np.ndarray, period: int) -> np.ndarray:
"""Two-pole high-pass filter (Ehlers).
Removes components with period longer than *period* bars.
"""
n = len(data)
alpha = (np.cos(2 * np.pi / period) + np.sin(2 * np.pi / period) - 1) / np.cos(
2 * np.pi / period
)
hp = np.zeros(n)
for i in range(2, n):
hp[i] = (
(1 - alpha / 2)
* (1 - alpha / 2)
* (data[i] - 2 * data[i - 1] + data[i - 2])
+ 2 * (1 - alpha) * hp[i - 1]
- (1 - alpha) * (1 - alpha) * hp[i - 2]
)
return hp
def _supersmoother(data: np.ndarray, period: int) -> np.ndarray:
"""Ehlers two-pole super-smoother filter."""
n = len(data)
a1 = np.exp(-np.sqrt(2) * np.pi / period)
b1 = 2 * a1 * np.cos(np.sqrt(2) * np.pi / period)
c2 = b1
c3 = -a1 * a1
c1 = 1 - c2 - c3
ss = np.zeros(n)
for i in range(2, n):
ss[i] = c1 * (data[i] + data[i - 1]) / 2 + c2 * ss[i - 1] + c3 * ss[i - 2]
return ss
# ---------------------------------------------------------------------------
# Hilbert Transform — Dominant Period
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Hilbert Transform — Trend Mode
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Hilbert Instantaneous Phase (Trendline)
# ---------------------------------------------------------------------------
[docs]
def hilbert_instantaneous_phase(data: pd.Series) -> pd.Series:
"""Instantaneous trendline via Hilbert Transform.
Computes a smooth trendline by applying the dominant cycle period
as an adaptive moving average length.
Parameters
----------
data : pd.Series
Price series.
Returns
-------
pd.Series
Instantaneous trendline values.
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(np.sin(np.linspace(0, 8 * np.pi, 200)) * 10 + 100)
>>> hilbert_instantaneous_phase(close) # doctest: +SKIP
"""
data = _validate_series(data)
dc = hilbert_transform_dominant_period(data)
values = data.values.astype(float)
n = len(values)
trendline = np.full(n, np.nan)
for i in range(1, n):
if np.isnan(dc.iloc[i]):
continue
period = max(int(dc.iloc[i]), 1)
start = max(0, i - period + 1)
trendline[i] = np.mean(values[start : i + 1])
result = pd.Series(trendline, index=data.index, name="instantaneous_trendline")
return result
# ---------------------------------------------------------------------------
# Sine Wave
# ---------------------------------------------------------------------------
[docs]
def sine_wave(data: pd.Series) -> dict[str, pd.Series]:
"""Ehlers Sine Wave indicator.
Uses the dominant cycle period to compute the sine and lead-sine
values, generating buy/sell signals on crossovers.
Interpretation:
- **Sine crosses above lead_sine**: Buy signal (cycle turning up).
- **Sine crosses below lead_sine**: Sell signal (cycle turning down).
- When both values are near +/-1, the market is in cycle mode.
- When values are erratic or near zero, the market may be
trending rather than cycling.
Parameters
----------
data : pd.Series
Price series.
Returns
-------
dict[str, pd.Series]
``sine`` and ``lead_sine`` series.
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(np.sin(np.linspace(0, 8 * np.pi, 200)) * 10 + 100)
>>> result = sine_wave(close) # doctest: +SKIP
"""
data = _validate_series(data)
dc = hilbert_transform_dominant_period(data)
n = len(data)
sine_out = np.full(n, np.nan)
lead_out = np.full(n, np.nan)
for i in range(n):
if np.isnan(dc.iloc[i]):
continue
period = dc.iloc[i]
if period > 0:
phase = 2 * np.pi / period
sine_out[i] = np.sin(phase * i)
lead_out[i] = np.sin(phase * i + np.pi / 4)
return {
"sine": pd.Series(sine_out, index=data.index, name="sine"),
"lead_sine": pd.Series(lead_out, index=data.index, name="lead_sine"),
}
# ---------------------------------------------------------------------------
# Even Better Sinewave (EBSW)
# ---------------------------------------------------------------------------
[docs]
def even_better_sinewave(
data: pd.Series,
hp_period: int = 40,
ss_period: int = 10,
) -> pd.Series:
"""Ehlers Even Better Sinewave (EBSW).
Combines a high-pass filter, super-smoother, and autocorrelation
to produce an oscillator that identifies the dominant cycle.
Interpretation:
- **Near +1**: Cycle is at or near a peak.
- **Near -1**: Cycle is at or near a trough.
- **Zero crossover up**: Cycle turning bullish.
- **Zero crossover down**: Cycle turning bearish.
- More reliable than the original Sine Wave indicator because
it better separates cycle from trend components.
Parameters
----------
data : pd.Series
Price series.
hp_period : int, default 40
High-pass filter period.
ss_period : int, default 10
Super-smoother period.
Returns
-------
pd.Series
EBSW oscillator values in approximately [-1, 1].
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(np.sin(np.linspace(0, 8 * np.pi, 200)) * 10 + 100)
>>> even_better_sinewave(close) # doctest: +SKIP
"""
data = _validate_series(data)
values = data.values.astype(float)
n = len(values)
# High-pass filter
hp = _highpass_filter(values, hp_period)
# Super-smoother
filt = _supersmoother(hp, ss_period)
# Wave computation
wave = np.full(n, np.nan)
for i in range(1, n):
rms = 0.0
count = min(i + 1, ss_period)
for j in range(count):
rms += filt[i - j] ** 2
rms = np.sqrt(rms / count) if count > 0 else 0.0
if rms > 0:
wave[i] = filt[i] / rms
else:
wave[i] = 0.0
# Clamp to [-1, 1]
wave = np.clip(wave, -1.0, 1.0)
result = pd.Series(wave, index=data.index, name="ebsw")
return result
# ---------------------------------------------------------------------------
# Roofing Filter
# ---------------------------------------------------------------------------
[docs]
def roofing_filter(
data: pd.Series,
hp_period: int = 48,
lp_period: int = 10,
) -> pd.Series:
"""Ehlers Roofing Filter.
Applies a high-pass filter followed by a super-smoother low-pass
filter to isolate the dominant cycle from both trend and noise.
Interpretation:
- Output oscillates around zero, showing the pure cycle
component of price.
- **Positive**: Cycle is in the up phase.
- **Negative**: Cycle is in the down phase.
- Use to identify cycle turning points without trend or
noise contamination.
Parameters
----------
data : pd.Series
Price series.
hp_period : int, default 48
High-pass filter cutoff period.
lp_period : int, default 10
Low-pass (super-smoother) cutoff period.
Returns
-------
pd.Series
Filtered cycle component.
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(np.sin(np.linspace(0, 8 * np.pi, 200)) * 10 + 100)
>>> roofing_filter(close) # doctest: +SKIP
"""
data = _validate_series(data)
values = data.values.astype(float)
hp = _highpass_filter(values, hp_period)
ss = _supersmoother(hp, lp_period)
result = pd.Series(ss, index=data.index, name="roofing_filter")
return result
# ---------------------------------------------------------------------------
# Decycler
# ---------------------------------------------------------------------------
[docs]
def decycler(data: pd.Series, hp_period: int = 125) -> pd.Series:
"""Ehlers Decycler.
Removes the cycle component from the price series, keeping only
the trend. Computed as ``price - highpass(price)``.
Interpretation:
- Shows the pure trend component of price with cycles removed.
- **Price above decycler**: Bullish trend.
- **Price below decycler**: Bearish trend.
- Extremely smooth with virtually no lag -- one of the best
trend-following overlays available.
Parameters
----------
data : pd.Series
Price series.
hp_period : int, default 125
High-pass filter cutoff period. Components with period shorter
than this are removed (cycles).
Returns
-------
pd.Series
Trend-only (decycled) series.
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(np.sin(np.linspace(0, 8 * np.pi, 200)) * 10 + 100)
>>> decycler(close) # doctest: +SKIP
"""
data = _validate_series(data)
values = data.values.astype(float)
hp = _highpass_filter(values, hp_period)
trend = values - hp
result = pd.Series(trend, index=data.index, name="decycler")
return result
# ---------------------------------------------------------------------------
# Bandpass Filter
# ---------------------------------------------------------------------------
[docs]
def bandpass_filter(
data: pd.Series,
period: int = 20,
bandwidth: float = 0.3,
) -> dict[str, pd.Series]:
"""Ehlers Bandpass Filter.
Isolates the cycle component at the specified period. Returns both
the bandpass filter output and a trigger signal (one-bar lag).
Interpretation:
- **BP crosses above trigger**: Buy signal (cycle turning up).
- **BP crosses below trigger**: Sell signal (cycle turning down).
- **BP at peak**: Cycle high -- potential sell zone.
- **BP at trough**: Cycle low -- potential buy zone.
- Only isolates the cycle at the specified period; other
frequencies are filtered out.
Parameters
----------
data : pd.Series
Price series.
period : int, default 20
Centre period of the bandpass.
bandwidth : float, default 0.3
Bandwidth as a fraction of the centre frequency.
Returns
-------
dict[str, pd.Series]
``bp`` (bandpass) and ``trigger`` (one-bar lag of bp).
Example
-------
>>> import pandas as pd, numpy as np
>>> close = pd.Series(np.sin(np.linspace(0, 8 * np.pi, 200)) * 10 + 100)
>>> result = bandpass_filter(close, period=20) # doctest: +SKIP
"""
data = _validate_series(data)
_validate_period(period)
values = data.values.astype(float)
n = len(values)
beta_val = np.cos(2 * np.pi / period)
gamma_val = 1 / np.cos(2 * np.pi * bandwidth / period)
alpha_val = gamma_val - np.sqrt(gamma_val * gamma_val - 1)
bp = np.zeros(n)
for i in range(2, n):
bp[i] = (
0.5 * (1 - alpha_val) * (values[i] - values[i - 2])
+ beta_val * (1 + alpha_val) * bp[i - 1]
- alpha_val * bp[i - 2]
)
trigger = np.zeros(n)
trigger[1:] = bp[:-1]
return {
"bp": pd.Series(bp, index=data.index, name="bandpass"),
"trigger": pd.Series(trigger, index=data.index, name="bandpass_trigger"),
}