Source code for wraquant.ts.features

"""Time series feature extraction.

Provides functions to compute summary features from time series data
for use in classification, clustering, or exploratory analysis. Features
fall into three categories:

1. **Autocorrelation features**: ACF, PACF, Ljung-Box test for serial
   dependence structure.
2. **Spectral features**: power spectral density, dominant frequency,
   spectral entropy, spectral flatness.
3. **Complexity features**: sample entropy, approximate entropy,
   permutation entropy for measuring signal regularity/predictability.
"""

from __future__ import annotations

import warnings
from math import factorial

import numpy as np
import pandas as pd
from scipy import stats as sp_stats


# ---------------------------------------------------------------------------
# Autocorrelation Features
# ---------------------------------------------------------------------------


[docs] def autocorrelation_features( data: pd.Series, n_lags: int = 40, significance: float = 0.05, ) -> dict: """Compute autocorrelation and partial autocorrelation features. Extracts the ACF and PACF values up to ``n_lags``, identifies statistically significant lags, and runs the Ljung-Box test for overall serial correlation. **Interpretation**: - **ACF** measures linear dependence between y_t and y_{t-k}. For AR(p) processes, ACF decays exponentially. For MA(q), ACF cuts off after lag q. - **PACF** measures the correlation between y_t and y_{t-k} after removing the effect of intermediate lags. For AR(p), PACF cuts off after lag p. - **Ljung-Box**: tests H0 that the first m autocorrelations are jointly zero. A significant result indicates the series has exploitable temporal structure. Parameters: data: Time series. NaN values are dropped. n_lags: Number of lags to compute (default 40). significance: Significance level for identifying significant lags and Ljung-Box test (default 0.05). Returns: Dictionary with: - ``acf``: 1-D numpy array of ACF values (lags 0 to n_lags). - ``pacf``: 1-D numpy array of PACF values (lags 0 to n_lags). - ``significant_acf_lags``: list of lag indices where ACF exceeds the Bartlett confidence band. - ``significant_pacf_lags``: list of lag indices where PACF exceeds the confidence band. - ``ljung_box``: dict with ``statistic``, ``p_value``, and ``is_significant`` for the Ljung-Box test at the maximum lag. - ``first_significant_lag``: int or None, the first lag with significant ACF (excluding lag 0). Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(42) >>> # AR(1) process >>> x = np.zeros(500) >>> for i in range(1, 500): ... x[i] = 0.7 * x[i-1] + rng.normal() >>> result = autocorrelation_features(pd.Series(x), n_lags=20) >>> result['acf'][1] > 0.5 # strong lag-1 autocorrelation True """ from statsmodels.tsa.stattools import acf as sm_acf from statsmodels.tsa.stattools import pacf as sm_pacf from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna().values n = len(clean) n_lags = min(n_lags, n // 2 - 1) with warnings.catch_warnings(): warnings.simplefilter("ignore") acf_vals = sm_acf(clean, nlags=n_lags, fft=True) try: pacf_vals = sm_pacf(clean, nlags=n_lags, method="ywm") except Exception: # noqa: BLE001 pacf_vals = sm_pacf(clean, nlags=min(n_lags, n // 3), method="ywm") # Pad with zeros if needed if len(pacf_vals) < n_lags + 1: pacf_vals = np.concatenate( [pacf_vals, np.zeros(n_lags + 1 - len(pacf_vals))] ) # Bartlett confidence band: +/- z / sqrt(n) z = sp_stats.norm.ppf(1 - significance / 2) conf_band = z / np.sqrt(n) significant_acf = [ int(i) for i in range(1, len(acf_vals)) if abs(acf_vals[i]) > conf_band ] significant_pacf = [ int(i) for i in range(1, len(pacf_vals)) if abs(pacf_vals[i]) > conf_band ] # Ljung-Box test from statsmodels.stats.diagnostic import acorr_ljungbox with warnings.catch_warnings(): warnings.simplefilter("ignore") lb = acorr_ljungbox(clean, lags=[n_lags], return_df=True) lb_stat = float(lb["lb_stat"].iloc[0]) lb_pval = float(lb["lb_pvalue"].iloc[0]) return { "acf": acf_vals, "pacf": pacf_vals, "significant_acf_lags": significant_acf, "significant_pacf_lags": significant_pacf, "ljung_box": { "statistic": lb_stat, "p_value": lb_pval, "is_significant": lb_pval < significance, }, "first_significant_lag": significant_acf[0] if significant_acf else None, }
# --------------------------------------------------------------------------- # Spectral Features # ---------------------------------------------------------------------------
[docs] def spectral_features( data: pd.Series, fs: float = 1.0, ) -> dict: """Compute power spectral density features. Extracts summary statistics from the periodogram including the dominant frequency, spectral entropy (a measure of spectral complexity), and spectral flatness (how noise-like the signal is). Parameters: data: Time series. NaN values are dropped. fs: Sampling frequency (default 1.0). For daily data with annual cycles, use ``fs=1.0`` and interpret frequencies as cycles per observation. Returns: Dictionary with: - ``dominant_frequency``: float, the frequency with the highest spectral power (excluding DC component). - ``dominant_period``: float, the period corresponding to the dominant frequency (``1 / dominant_frequency``). - ``spectral_entropy``: float, Shannon entropy of the normalised power spectral density. Higher values indicate a more complex, broadband signal; lower values indicate a few dominant frequencies. Range: [0, log2(N/2)]. - ``spectral_flatness``: float in [0, 1]. Ratio of geometric mean to arithmetic mean of the PSD. A value near 1 indicates white noise; near 0 indicates a tonal/periodic signal. Also known as Wiener entropy. - ``spectral_centroid``: float, the weighted mean frequency (centre of mass of the spectrum). - ``psd``: 1-D numpy array of power spectral density values. - ``frequencies``: 1-D numpy array of corresponding frequencies. Example: >>> import numpy as np, pandas as pd >>> t = np.arange(500, dtype=float) >>> pure_tone = pd.Series(np.sin(2 * np.pi * t / 20)) # period=20 >>> result = spectral_features(pure_tone) >>> abs(result['dominant_period'] - 20) < 2 True >>> result['spectral_flatness'] < 0.3 # tonal signal True """ from scipy.signal import periodogram from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna().values.astype(np.float64) freqs, psd = periodogram(clean, fs=fs) # Exclude DC component if len(freqs) > 1: freqs_no_dc = freqs[1:] psd_no_dc = psd[1:] else: freqs_no_dc = freqs psd_no_dc = psd if len(psd_no_dc) == 0 or np.sum(psd_no_dc) < 1e-15: return { "dominant_frequency": 0.0, "dominant_period": float("inf"), "spectral_entropy": 0.0, "spectral_flatness": 0.0, "spectral_centroid": 0.0, "psd": psd, "frequencies": freqs, } # Dominant frequency peak_idx = int(np.argmax(psd_no_dc)) dominant_freq = float(freqs_no_dc[peak_idx]) dominant_period = 1.0 / dominant_freq if dominant_freq > 0 else float("inf") # Spectral entropy psd_norm = psd_no_dc / np.sum(psd_no_dc) psd_norm = psd_norm[psd_norm > 0] # avoid log(0) spectral_entropy = float(-np.sum(psd_norm * np.log2(psd_norm))) # Spectral flatness (Wiener entropy) log_psd = np.log(psd_no_dc[psd_no_dc > 0]) geo_mean = np.exp(np.mean(log_psd)) arith_mean = np.mean(psd_no_dc[psd_no_dc > 0]) spectral_flatness = float(geo_mean / arith_mean) if arith_mean > 0 else 0.0 # Spectral centroid spectral_centroid = float( np.sum(freqs_no_dc * psd_no_dc) / np.sum(psd_no_dc) ) return { "dominant_frequency": dominant_freq, "dominant_period": dominant_period, "spectral_entropy": spectral_entropy, "spectral_flatness": spectral_flatness, "spectral_centroid": spectral_centroid, "psd": psd, "frequencies": freqs, }
# --------------------------------------------------------------------------- # Complexity Features # ---------------------------------------------------------------------------
[docs] def complexity_features( data: pd.Series, m: int = 2, r: float | None = None, order: int = 3, ) -> dict: """Compute entropy-based complexity measures for a time series. Extracts three entropy measures that quantify the regularity and predictability of the signal: 1. **Sample entropy (SampEn)**: probability that patterns similar for m points remain similar for m+1 points. Lower values indicate more regularity (self-similarity); higher values indicate more complexity. Defined as -ln(A/B) where A is the count of m+1 length template matches and B is the count of m length matches, within tolerance r. 2. **Approximate entropy (ApEn)**: similar to SampEn but includes self-matches, making it slightly biased toward regularity. Historically introduced first (Pincus 1991). 3. **Permutation entropy (PeEn)**: based on the frequency of ordinal patterns of length ``order``. Robust to noise and monotonic transformations. Normalised to [0, 1]. Parameters: data: Time series. NaN values are dropped. m: Embedding dimension for SampEn and ApEn (default 2). Typical values: 2 or 3. r: Tolerance threshold for SampEn and ApEn. If ``None``, defaults to ``0.2 * std(data)`` (standard choice). order: Ordinal pattern length for permutation entropy (default 3). Typical values: 3-7. Returns: Dictionary with: - ``sample_entropy``: float, SampEn(m, r). Values typically range from 0 (perfectly regular) to ~2.5 (random). - ``approximate_entropy``: float, ApEn(m, r). - ``permutation_entropy``: float, normalised PeEn in [0, 1]. 0 = perfectly predictable, 1 = maximally complex/random. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(42) >>> regular = pd.Series(np.sin(np.arange(500) * 0.1)) >>> random = pd.Series(rng.normal(0, 1, 500)) >>> reg_feat = complexity_features(regular) >>> rnd_feat = complexity_features(random) >>> reg_feat['sample_entropy'] < rnd_feat['sample_entropy'] True References: - Richman, J.S. & Moorman, J.R. (2000), "Physiological time-series analysis using approximate entropy and sample entropy", AJP. - Pincus, S.M. (1991), "Approximate entropy as a measure of system complexity", PNAS. - Bandt, C. & Pompe, B. (2002), "Permutation Entropy: A Natural Complexity Measure for Time Series", PRL. """ from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna().values.astype(np.float64) n = len(clean) if r is None: r = 0.2 * np.std(clean) if r < 1e-10: r = 0.2 # --- Sample Entropy --- sample_ent = _sample_entropy(clean, m, r) # --- Approximate Entropy --- approx_ent = _approximate_entropy(clean, m, r) # --- Permutation Entropy --- perm_ent = _permutation_entropy(clean, order) return { "sample_entropy": sample_ent, "approximate_entropy": approx_ent, "permutation_entropy": perm_ent, }
def _sample_entropy(x: np.ndarray, m: int, r: float) -> float: """Compute sample entropy SampEn(m, r) for a 1-D array.""" n = len(x) def _count_matches(template_len: int) -> int: count = 0 for i in range(n - template_len): for j in range(i + 1, n - template_len): if np.max(np.abs(x[i : i + template_len] - x[j : j + template_len])) <= r: count += 1 return count b = _count_matches(m) a = _count_matches(m + 1) if b == 0: return float("inf") if a == 0: return float("inf") return float(-np.log(a / b)) def _approximate_entropy(x: np.ndarray, m: int, r: float) -> float: """Compute approximate entropy ApEn(m, r) for a 1-D array.""" n = len(x) def _phi(template_len: int) -> float: templates = np.array([x[i : i + template_len] for i in range(n - template_len + 1)]) counts = np.zeros(len(templates)) for i, t in enumerate(templates): for j, s in enumerate(templates): if np.max(np.abs(t - s)) <= r: counts[i] += 1 counts /= len(templates) return float(np.mean(np.log(counts))) phi_m = _phi(m) phi_m1 = _phi(m + 1) return float(phi_m - phi_m1) def _permutation_entropy(x: np.ndarray, order: int) -> float: """Compute normalised permutation entropy for a 1-D array.""" n = len(x) if n < order: return 0.0 # Count ordinal patterns pattern_counts: dict[tuple[int, ...], int] = {} for i in range(n - order + 1): pattern = tuple(int(j) for j in np.argsort(x[i : i + order])) pattern_counts[pattern] = pattern_counts.get(pattern, 0) + 1 total = sum(pattern_counts.values()) probs = np.array([c / total for c in pattern_counts.values()]) # Shannon entropy normalised by max possible entropy h = float(-np.sum(probs * np.log2(probs))) max_h = np.log2(factorial(order)) if max_h < 1e-15: return 0.0 return float(h / max_h)