"""Spectral analysis / FFT tools for financial data."""
from __future__ import annotations
import numpy as np
from numpy.typing import ArrayLike
from scipy import signal as sp_signal
from wraquant.core._coerce import coerce_array
__all__ = [
"fft_decompose",
"dominant_frequencies",
"spectral_density",
"bandpass_filter",
"spectral_entropy",
]
[docs]
def fft_decompose(
data: ArrayLike,
n_components: int | None = None,
) -> dict[str, np.ndarray]:
"""Perform FFT decomposition of a 1-D signal.
Parameters
----------
data : array_like
Input time series (real-valued, 1-D).
n_components : int or None, optional
If given, retain only the first *n_components* positive-frequency
components (sorted by amplitude descending). ``None`` keeps all.
Returns
-------
dict
``frequencies`` – normalised frequencies (cycles / sample).
``amplitudes`` – amplitude of each frequency component.
``phases`` – phase angle (radians) of each component.
``power`` – power spectrum (amplitude squared).
Example
-------
>>> import numpy as np
>>> from wraquant.math.spectral import fft_decompose
>>> t = np.arange(256)
>>> signal = np.sin(2 * np.pi * t / 20) # period=20 bars
>>> result = fft_decompose(signal, n_components=3)
>>> result['frequencies'].shape[0]
3
See Also
--------
dominant_frequencies : Extract the strongest cyclical components.
spectral_density : Power spectral density estimation.
bandpass_filter : Isolate a frequency band from the signal.
"""
data = coerce_array(data, name="data")
n = len(data)
fft_vals = np.fft.rfft(data)
freqs = np.fft.rfftfreq(n)
amplitudes = np.abs(fft_vals) * 2.0 / n
phases = np.angle(fft_vals)
power = amplitudes**2
if n_components is not None:
idx = np.argsort(amplitudes)[::-1][:n_components]
idx = np.sort(idx) # keep frequency order
freqs = freqs[idx]
amplitudes = amplitudes[idx]
phases = phases[idx]
power = power[idx]
return {
"frequencies": freqs,
"amplitudes": amplitudes,
"phases": phases,
"power": power,
}
[docs]
def dominant_frequencies(
data: ArrayLike,
n_top: int = 5,
) -> dict[str, np.ndarray]:
"""Identify the dominant cyclical frequencies in *data*.
Parameters
----------
data : array_like
Input time series.
n_top : int, optional
Number of top frequencies to return (default 5).
Returns
-------
dict
``frequency`` – normalised frequencies of the top components.
``period`` – corresponding period in bars (1 / frequency).
Periods near common trading horizons (5, 21, 63 bars) may
indicate weekly, monthly, or quarterly cycles.
``amplitude`` – amplitude of each component.
Example
-------
>>> import numpy as np
>>> from wraquant.math.spectral import dominant_frequencies
>>> t = np.arange(500)
>>> signal = np.sin(2 * np.pi * t / 21) + 0.5 * np.sin(2 * np.pi * t / 63)
>>> result = dominant_frequencies(signal, n_top=2)
>>> # The two dominant periods should be near 21 and 63
>>> sorted(result['period'])[:2] # doctest: +SKIP
[21.0, 63.0]
See Also
--------
fft_decompose : Full FFT decomposition.
spectral_entropy : Measure spectral complexity.
"""
data = coerce_array(data, name="data")
n = len(data)
fft_vals = np.fft.rfft(data)
freqs = np.fft.rfftfreq(n)
amplitudes = np.abs(fft_vals) * 2.0 / n
# Exclude DC component (index 0)
if len(freqs) > 1:
freqs = freqs[1:]
amplitudes = amplitudes[1:]
n_top = min(n_top, len(freqs))
idx = np.argsort(amplitudes)[::-1][:n_top]
top_freqs = freqs[idx]
top_amps = amplitudes[idx]
periods = np.where(top_freqs > 0, 1.0 / top_freqs, np.inf)
return {
"frequency": top_freqs,
"period": periods,
"amplitude": top_amps,
}
[docs]
def spectral_density(
data: ArrayLike,
method: str = "periodogram",
) -> dict[str, np.ndarray]:
"""Estimate the power spectral density (PSD) of *data*.
Parameters
----------
data : array_like
Input time series.
method : {'periodogram', 'welch'}, optional
Estimation method (default ``'periodogram'``).
Returns
-------
dict
``frequencies`` – frequency axis.
``psd`` – power spectral density values. Higher PSD at
low frequencies indicates trending behaviour; flat PSD
indicates white noise.
Raises
------
ValueError
If *method* is not recognised.
Example
-------
>>> import numpy as np
>>> from wraquant.math.spectral import spectral_density
>>> data = np.random.randn(500)
>>> result = spectral_density(data, method='welch')
>>> len(result['frequencies']) == len(result['psd'])
True
See Also
--------
fft_decompose : Raw FFT decomposition.
spectral_entropy : Single-number summary of spectral shape.
"""
data = coerce_array(data, name="data")
if method == "periodogram":
freqs, psd = sp_signal.periodogram(data)
elif method == "welch":
freqs, psd = sp_signal.welch(data)
else:
raise ValueError(f"Unknown method {method!r}; use 'periodogram' or 'welch'.")
return {"frequencies": freqs, "psd": psd}
[docs]
def bandpass_filter(
data: ArrayLike,
low_freq: float,
high_freq: float,
sampling_rate: float = 1.0,
) -> np.ndarray:
"""Apply a bandpass filter to isolate a frequency band.
Uses a 5th-order Butterworth filter applied forward-backward
(zero phase shift) via :func:`scipy.signal.sosfiltfilt`.
Parameters
----------
data : array_like
Input time series.
low_freq : float
Lower cut-off frequency (Hz or cycles/sample when
``sampling_rate=1.0``).
high_freq : float
Upper cut-off frequency.
sampling_rate : float, optional
Sampling rate of the data (default 1.0).
Returns
-------
np.ndarray
Filtered signal.
Example
-------
>>> import numpy as np
>>> from wraquant.math.spectral import bandpass_filter
>>> t = np.arange(500)
>>> # Signal with a 21-bar cycle plus high-frequency noise
>>> signal = np.sin(2 * np.pi * t / 21) + 0.5 * np.random.randn(500)
>>> filtered = bandpass_filter(signal, low_freq=0.04, high_freq=0.06)
>>> filtered.shape
(500,)
See Also
--------
wraquant.math.signals.savitzky_golay : Polynomial smoothing filter.
dominant_frequencies : Identify which frequencies to isolate.
"""
data = coerce_array(data, name="data")
nyquist = sampling_rate / 2.0
low = low_freq / nyquist
high = high_freq / nyquist
# Clip to valid Butterworth range (0, 1)
low = max(low, 1e-10)
high = min(high, 1.0 - 1e-10)
sos = sp_signal.butter(5, [low, high], btype="band", output="sos")
return sp_signal.sosfiltfilt(sos, data)
[docs]
def spectral_entropy(data: ArrayLike) -> float:
"""Compute the spectral entropy of *data*.
Spectral entropy measures the "flatness" of the power spectrum.
A perfectly periodic signal has low spectral entropy; white noise
has high spectral entropy (close to ``log2(N/2)``).
Parameters
----------
data : array_like
Input time series.
Returns
-------
float
Normalised spectral entropy in [0, 1]. Values near 0 indicate
a strongly periodic signal; values near 1 indicate noise-like
behaviour with no dominant frequency.
Example
-------
>>> import numpy as np
>>> from wraquant.math.spectral import spectral_entropy
>>> # Pure sine wave has low spectral entropy
>>> t = np.arange(256)
>>> se_sine = spectral_entropy(np.sin(2 * np.pi * t / 20))
>>> se_noise = spectral_entropy(np.random.randn(256))
>>> se_sine < se_noise
True
See Also
--------
wraquant.math.information.entropy : Shannon entropy of a data distribution.
spectral_density : Full PSD estimation.
"""
data = coerce_array(data, name="data")
_, psd = sp_signal.periodogram(data)
# Normalise PSD to a probability distribution
psd = psd[psd > 0]
if len(psd) == 0:
return 0.0
psd_norm = psd / psd.sum()
# Shannon entropy normalised by log2(N)
ent = -np.sum(psd_norm * np.log2(psd_norm))
max_ent = np.log2(len(psd_norm))
if max_ent == 0:
return 0.0
return float(ent / max_ent)