"""Advanced smoothing and filtering indicators.
This module provides sophisticated moving averages and digital filters used
in technical analysis. All functions accept ``pd.Series`` inputs and return
``pd.Series``.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
__all__ = [
"alma",
"lsma",
"swma",
"sinema",
"trima",
"jma",
"gaussian_filter",
"butterworth_filter",
"supersmoother",
"hann_window_ma",
"hamming_window_ma",
"kaufman_efficiency_ratio",
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
from wraquant.ta._validators import validate_period as _validate_period
from wraquant.ta._validators import validate_series as _validate_series
# ---------------------------------------------------------------------------
# ALMA — Arnaud Legoux Moving Average
# ---------------------------------------------------------------------------
[docs]
def alma(
data: pd.Series,
period: int = 9,
offset: float = 0.85,
sigma: float = 6.0,
) -> pd.Series:
"""Arnaud Legoux Moving Average (ALMA).
A Gaussian-weighted moving average that allows the user to control the
position of the bell curve along the window via *offset* and the width
via *sigma*.
Interpretation:
- **Price above ALMA**: Bullish.
- **Price below ALMA**: Bearish.
- Combines the responsiveness of EMA with the smoothness of
Gaussian weighting. The offset parameter lets you place
more weight on recent prices (offset near 1) or older
prices (offset near 0).
- Excellent alternative to EMA for crossover systems.
Parameters
----------
data : pd.Series
Price series (typically close).
period : int, default 9
Window length.
offset : float, default 0.85
Controls the position of the Gaussian peak within the window.
0 = far left (oldest), 1 = far right (newest).
sigma : float, default 6.0
Controls the width of the Gaussian bell curve. Higher values
produce a broader, smoother curve.
Returns
-------
pd.Series
ALMA values. The first ``period - 1`` entries are ``NaN``.
Example
-------
>>> result = alma(close, period=9, offset=0.85, sigma=6.0)
"""
data = _validate_series(data)
_validate_period(period)
m = offset * (period - 1)
s = period / sigma
weights = np.array([np.exp(-((i - m) ** 2) / (2.0 * s * s)) for i in range(period)])
weights = weights / weights.sum()
def _alma(window: np.ndarray) -> float:
return np.dot(window, weights)
result = data.rolling(window=period, min_periods=period).apply(_alma, raw=True)
result.name = "alma"
return result
# ---------------------------------------------------------------------------
# LSMA — Least Squares Moving Average
# ---------------------------------------------------------------------------
[docs]
def lsma(data: pd.Series, period: int = 25) -> pd.Series:
"""Least Squares Moving Average (LSMA).
Also known as the Linear Regression Value or End Point Moving Average.
At each bar, a least-squares line is fit over the window and the
endpoint of the line is returned.
Parameters
----------
data : pd.Series
Price series.
period : int, default 25
Window length for the linear regression.
Returns
-------
pd.Series
LSMA values.
Example
-------
>>> result = lsma(close, period=25)
"""
data = _validate_series(data)
_validate_period(period)
def _linreg_endpoint(window: np.ndarray) -> float:
n = len(window)
x = np.arange(n, dtype=float)
slope, intercept = np.polyfit(x, window, 1)
return intercept + slope * (n - 1)
result = data.rolling(window=period, min_periods=period).apply(
_linreg_endpoint, raw=True
)
result.name = "lsma"
return result
# ---------------------------------------------------------------------------
# SWMA — Symmetrically Weighted Moving Average
# ---------------------------------------------------------------------------
[docs]
def swma(data: pd.Series) -> pd.Series:
"""Symmetrically Weighted Moving Average (SWMA).
A 4-bar weighted average using weights ``[1, 2, 2, 1] / 6``.
Parameters
----------
data : pd.Series
Price series.
Returns
-------
pd.Series
SWMA values. The first 3 entries are ``NaN``.
Example
-------
>>> result = swma(close)
"""
data = _validate_series(data)
weights = np.array([1.0, 2.0, 2.0, 1.0]) / 6.0
def _swma(window: np.ndarray) -> float:
return np.dot(window, weights)
result = data.rolling(window=4, min_periods=4).apply(_swma, raw=True)
result.name = "swma"
return result
# ---------------------------------------------------------------------------
# SineMA — Sine-Weighted Moving Average
# ---------------------------------------------------------------------------
[docs]
def sinema(data: pd.Series, period: int = 14) -> pd.Series:
"""Sine-Weighted Moving Average.
Each element in the window is weighted by the sine of its proportional
position within a half-period (pi), giving the most weight to the
center of the window.
Parameters
----------
data : pd.Series
Price series.
period : int, default 14
Window length.
Returns
-------
pd.Series
Sine-weighted moving average values.
Example
-------
>>> result = sinema(close, period=14)
"""
data = _validate_series(data)
_validate_period(period)
weights = np.array([np.sin(np.pi * (i + 1) / (period + 1)) for i in range(period)])
weights = weights / weights.sum()
def _sinema(window: np.ndarray) -> float:
return np.dot(window, weights)
result = data.rolling(window=period, min_periods=period).apply(_sinema, raw=True)
result.name = "sinema"
return result
# ---------------------------------------------------------------------------
# TRIMA — Triangular Moving Average
# ---------------------------------------------------------------------------
[docs]
def trima(data: pd.Series, period: int = 20) -> pd.Series:
"""Triangular Moving Average (TRIMA).
Equivalent to a double SMA: ``SMA(SMA(data, ceil((period+1)/2)),
floor((period+1)/2))``. This produces a smoother curve than a single
SMA by effectively giving the most weight to the center of the window.
Parameters
----------
data : pd.Series
Price series.
period : int, default 20
Overall window length.
Returns
-------
pd.Series
TRIMA values.
Example
-------
>>> result = trima(close, period=20)
"""
data = _validate_series(data)
_validate_period(period)
# Determine the two sub-periods
half1 = int(np.ceil((period + 1) / 2))
half2 = int(np.floor((period + 1) / 2))
sma1 = data.rolling(window=half1, min_periods=half1).mean()
result = sma1.rolling(window=half2, min_periods=half2).mean()
result.name = "trima"
return result
# ---------------------------------------------------------------------------
# JMA — Jurik Moving Average (approximation)
# ---------------------------------------------------------------------------
[docs]
def jma(
data: pd.Series,
period: int = 7,
phase: float = 50.0,
power: int = 2,
) -> pd.Series:
"""Jurik Moving Average approximation (JMA).
An adaptive moving average that attempts to minimize lag and overshoot.
This is an approximation of the proprietary Jurik algorithm using an
adaptive EMA with phase and power controls.
Parameters
----------
data : pd.Series
Price series.
period : int, default 7
Smoothing period.
phase : float, default 50.0
Phase parameter in the range [-100, 100]. Controls the tradeoff
between lag and overshoot. 0 is balanced, positive reduces lag.
power : int, default 2
Power parameter controlling the smoothing curve shape.
Returns
-------
pd.Series
JMA values.
Example
-------
>>> result = jma(close, period=7, phase=50, power=2)
"""
data = _validate_series(data)
_validate_period(period)
# Compute beta from period
beta = 0.45 * (period - 1) / (0.45 * (period - 1) + 2.0)
# Compute phase ratio
if phase < -100:
phase_ratio = 0.5
elif phase > 100:
phase_ratio = 2.5
else:
phase_ratio = phase / 100.0 + 1.5
alpha = beta**power
values = data.values.astype(float)
n = len(values)
result = np.full(n, np.nan)
if n == 0:
return pd.Series(result, index=data.index, name="jma")
# Find first valid index
first_valid = 0
while first_valid < n and np.isnan(values[first_valid]):
first_valid += 1
if first_valid >= n:
return pd.Series(result, index=data.index, name="jma")
# Initialize
e0 = values[first_valid]
e1 = 0.0
e2 = 0.0
jma_val = values[first_valid]
result[first_valid] = jma_val
for i in range(first_valid + 1, n):
if np.isnan(values[i]):
result[i] = np.nan
continue
e0 = (1.0 - alpha) * values[i] + alpha * e0
e1 = (values[i] - e0) * (1.0 - beta) + beta * e1
e2 = (e0 + phase_ratio * e1 - jma_val) * (1.0 - alpha) ** 2 + (alpha**2) * e2
jma_val = jma_val + e2
result[i] = jma_val
return pd.Series(result, index=data.index, name="jma")
# ---------------------------------------------------------------------------
# Gaussian Filter
# ---------------------------------------------------------------------------
[docs]
def gaussian_filter(data: pd.Series, period: int = 14, poles: int = 2) -> pd.Series:
"""Gaussian-weighted rolling filter.
Applies a discrete Gaussian kernel over the rolling window. The
standard deviation of the kernel is set to ``period / 4`` so that
the window captures approximately two standard deviations.
Parameters
----------
data : pd.Series
Price series.
period : int, default 14
Window length.
poles : int, default 2
Number of standard deviations captured within the window. Used
to set ``sigma = period / (2 * poles)``.
Returns
-------
pd.Series
Gaussian-filtered values.
Example
-------
>>> result = gaussian_filter(close, period=14)
"""
data = _validate_series(data)
_validate_period(period)
sigma = period / (2.0 * poles)
center = (period - 1) / 2.0
weights = np.array(
[np.exp(-0.5 * ((i - center) / sigma) ** 2) for i in range(period)]
)
weights = weights / weights.sum()
def _gauss(window: np.ndarray) -> float:
return np.dot(window, weights)
result = data.rolling(window=period, min_periods=period).apply(_gauss, raw=True)
result.name = "gaussian_filter"
return result
# ---------------------------------------------------------------------------
# Butterworth Filter
# ---------------------------------------------------------------------------
[docs]
def butterworth_filter(data: pd.Series, period: int = 14) -> pd.Series:
"""2nd-order Butterworth low-pass filter (IIR).
This implements the classic Ehlers two-pole Butterworth filter, which
provides smooth output with minimal lag relative to its degree of
smoothing.
Parameters
----------
data : pd.Series
Price series.
period : int, default 14
Cut-off period in bars.
Returns
-------
pd.Series
Butterworth-filtered values.
Example
-------
>>> result = butterworth_filter(close, period=14)
"""
data = _validate_series(data)
_validate_period(period)
# Butterworth coefficients (2-pole)
a = np.exp(-np.sqrt(2.0) * np.pi / period)
b = 2.0 * a * np.cos(np.sqrt(2.0) * np.pi / period)
c2 = b
c3 = -(a * a)
c1 = 1.0 - c2 - c3
values = data.values.astype(float)
n = len(values)
result = np.full(n, np.nan)
# Seed first two valid values
first_valid = 0
while first_valid < n and np.isnan(values[first_valid]):
first_valid += 1
if first_valid >= n:
return pd.Series(result, index=data.index, name="butterworth")
result[first_valid] = values[first_valid]
if first_valid + 1 < n:
result[first_valid + 1] = values[first_valid + 1]
for i in range(first_valid + 2, n):
if np.isnan(values[i]):
result[i] = np.nan
continue
result[i] = c1 * values[i] + c2 * result[i - 1] + c3 * result[i - 2]
return pd.Series(result, index=data.index, name="butterworth")
# ---------------------------------------------------------------------------
# Super Smoother
# ---------------------------------------------------------------------------
[docs]
def supersmoother(data: pd.Series, period: int = 14) -> pd.Series:
"""Ehlers Super Smoother (2-pole Butterworth variant).
A modified Butterworth filter by John Ehlers that removes aliasing
noise while retaining a smooth, low-lag response.
Parameters
----------
data : pd.Series
Price series.
period : int, default 14
Cut-off period in bars.
Returns
-------
pd.Series
Super-smoothed values.
Example
-------
>>> result = supersmoother(close, period=14)
"""
data = _validate_series(data)
_validate_period(period)
# Ehlers super smoother coefficients
f = 1.414 * np.pi / period
a1 = np.exp(-f)
b1 = 2.0 * a1 * np.cos(f)
c3 = -(a1 * a1)
c2 = b1
c1 = 1.0 - c2 - c3
values = data.values.astype(float)
n = len(values)
result = np.full(n, np.nan)
first_valid = 0
while first_valid < n and np.isnan(values[first_valid]):
first_valid += 1
if first_valid >= n:
return pd.Series(result, index=data.index, name="supersmoother")
result[first_valid] = values[first_valid]
if first_valid + 1 < n:
result[first_valid + 1] = values[first_valid + 1]
for i in range(first_valid + 2, n):
if np.isnan(values[i]):
result[i] = np.nan
continue
result[i] = c1 * values[i] + c2 * result[i - 1] + c3 * result[i - 2]
return pd.Series(result, index=data.index, name="supersmoother")
# ---------------------------------------------------------------------------
# Hann Window Moving Average
# ---------------------------------------------------------------------------
[docs]
def hann_window_ma(data: pd.Series, period: int = 14) -> pd.Series:
"""Hann (raised cosine) windowed moving average.
Each element in the window is weighted by the Hann function:
``w(i) = 0.5 * (1 - cos(2 * pi * i / (N - 1)))``
Parameters
----------
data : pd.Series
Price series.
period : int, default 14
Window length.
Returns
-------
pd.Series
Hann-windowed moving average values.
Example
-------
>>> result = hann_window_ma(close, period=14)
"""
data = _validate_series(data)
_validate_period(period)
if period == 1:
result = data.copy()
result.name = "hann_ma"
return result
weights = np.array(
[0.5 * (1.0 - np.cos(2.0 * np.pi * i / (period - 1))) for i in range(period)]
)
weights = weights / weights.sum()
def _hann(window: np.ndarray) -> float:
return np.dot(window, weights)
result = data.rolling(window=period, min_periods=period).apply(_hann, raw=True)
result.name = "hann_ma"
return result
# ---------------------------------------------------------------------------
# Hamming Window Moving Average
# ---------------------------------------------------------------------------
[docs]
def hamming_window_ma(data: pd.Series, period: int = 14) -> pd.Series:
"""Hamming windowed moving average.
Each element in the window is weighted by the Hamming function:
``w(i) = 0.54 - 0.46 * cos(2 * pi * i / (N - 1))``
Parameters
----------
data : pd.Series
Price series.
period : int, default 14
Window length.
Returns
-------
pd.Series
Hamming-windowed moving average values.
Example
-------
>>> result = hamming_window_ma(close, period=14)
"""
data = _validate_series(data)
_validate_period(period)
if period == 1:
result = data.copy()
result.name = "hamming_ma"
return result
weights = np.array(
[0.54 - 0.46 * np.cos(2.0 * np.pi * i / (period - 1)) for i in range(period)]
)
weights = weights / weights.sum()
def _hamming(window: np.ndarray) -> float:
return np.dot(window, weights)
result = data.rolling(window=period, min_periods=period).apply(_hamming, raw=True)
result.name = "hamming_ma"
return result
# ---------------------------------------------------------------------------
# Kaufman Efficiency Ratio
# ---------------------------------------------------------------------------
[docs]
def kaufman_efficiency_ratio(data: pd.Series, period: int = 10) -> pd.Series:
"""Kaufman Efficiency Ratio (ER).
Measures the efficiency of price movement as the ratio of directional
change to total path length. This is the core component of the Kaufman
Adaptive Moving Average (KAMA).
``ER = |close - close[period]| / sum(|close - close[1]|, period)``
Values near 1.0 indicate strong trending; values near 0.0 indicate
choppy / mean-reverting markets.
Parameters
----------
data : pd.Series
Price series.
period : int, default 10
Look-back period.
Returns
-------
pd.Series
Efficiency ratio values in [0, 1].
Example
-------
>>> result = kaufman_efficiency_ratio(close, period=10)
"""
data = _validate_series(data)
_validate_period(period)
direction = (data - data.shift(period)).abs()
volatility = data.diff().abs().rolling(window=period, min_periods=period).sum()
result = direction / volatility.replace(0.0, np.nan)
result.name = "efficiency_ratio"
return result