Source code for wraquant.ts.seasonality

"""Seasonality detection and feature engineering."""

from __future__ import annotations

import numpy as np
import pandas as pd
from scipy.signal import find_peaks, periodogram


[docs] def detect_seasonality( data: pd.Series, max_period: int = 365, ) -> list[int]: """Detect dominant seasonal periods via spectral analysis. Uses Welch's periodogram to identify significant frequency peaks. Parameters: data: Time series to analyse. max_period: Maximum period to consider. Returns: List of detected seasonal periods (in number of observations), sorted by spectral power (strongest first). """ from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna().values freqs, power = periodogram(clean) # Ignore the DC component (index 0) and frequencies below 1/max_period min_freq = 1.0 / max_period if max_period > 0 else 0.0 mask = freqs > min_freq freqs = freqs[mask] power = power[mask] if len(power) == 0: return [] peak_indices, _ = find_peaks(power, height=np.median(power) * 3) if len(peak_indices) == 0: return [] # Sort peaks by power (descending) sorted_peaks = sorted(peak_indices, key=lambda i: power[i], reverse=True) periods = [] for idx in sorted_peaks: if freqs[idx] > 0: period = int(round(1.0 / freqs[idx])) if 2 <= period <= max_period and period not in periods: periods.append(period) return periods
[docs] def fourier_features( index: pd.DatetimeIndex, period: int, n_harmonics: int, ) -> pd.DataFrame: """Generate Fourier sine/cosine features for a datetime index. Useful for encoding seasonality as regression features. Parameters: index: Datetime index. period: Seasonal period (in the same time unit as the index). n_harmonics: Number of Fourier harmonics to generate. Returns: DataFrame with ``sin_k`` and ``cos_k`` columns for each harmonic. """ t = np.arange(len(index), dtype=np.float64) columns: dict[str, np.ndarray] = {} for k in range(1, n_harmonics + 1): angle = 2 * np.pi * k * t / period columns[f"sin_{k}"] = np.sin(angle) columns[f"cos_{k}"] = np.cos(angle) return pd.DataFrame(columns, index=index)
# --------------------------------------------------------------------------- # Multi-period Fourier Features # ---------------------------------------------------------------------------
[docs] def multi_fourier_features( index: pd.DatetimeIndex, periods: list[int], n_harmonics: int | list[int] = 3, ) -> pd.DataFrame: """Generate Fourier terms for multiple seasonal periods. Creates sin/cos pairs for each period-harmonic combination, producing a feature matrix suitable for regression-based seasonal modelling (e.g., linear regression, Prophet-style models, or as exogenous regressors for ARIMA). For a period P and harmonic k, the features are: ``sin(2 * pi * k * t / P)`` and ``cos(2 * pi * k * t / P)`` This captures up to k-th order seasonal variation within each period. Parameters: index: Datetime index of the time series. periods: List of seasonal periods (in the same units as the index). For example, ``[7, 365]`` for daily data with weekly and yearly seasonality. n_harmonics: Number of harmonics per period. Can be a single int (applied to all periods) or a list of ints (one per period). Default 3. Returns: DataFrame with columns named ``sin_P{period}_H{harmonic}`` and ``cos_P{period}_H{harmonic}`` for each period-harmonic pair. Shape is ``(len(index), 2 * sum(harmonics))``. Example: >>> import pandas as pd >>> idx = pd.date_range("2020-01-01", periods=365, freq="D") >>> df = multi_fourier_features(idx, periods=[7, 365], n_harmonics=3) >>> df.shape[1] 12 """ if isinstance(n_harmonics, int): harmonics_per_period = [n_harmonics] * len(periods) else: if len(n_harmonics) != len(periods): msg = "n_harmonics list must have same length as periods" raise ValueError(msg) harmonics_per_period = n_harmonics t = np.arange(len(index), dtype=np.float64) columns: dict[str, np.ndarray] = {} for period, n_h in zip(periods, harmonics_per_period): for k in range(1, n_h + 1): angle = 2 * np.pi * k * t / period columns[f"sin_P{period}_H{k}"] = np.sin(angle) columns[f"cos_P{period}_H{k}"] = np.cos(angle) return pd.DataFrame(columns, index=index)
# --------------------------------------------------------------------------- # Seasonal Strength # ---------------------------------------------------------------------------
[docs] def seasonal_strength( data: pd.Series, period: int | None = None, ) -> float: """Quantify the strength of seasonality in a time series. Uses the Wang, Smith, and Hyndman (2006) measure: ``F_s = max(0, 1 - Var(R_t) / Var(S_t + R_t))`` where S_t is the seasonal component and R_t is the remainder from an STL decomposition. Returns a float in [0, 1]: - 0.0: no detectable seasonality (remainder dominates). - 1.0: perfectly seasonal (no noise). - > 0.9 is typically considered "strong" seasonality. - < 0.4 is typically "weak". Parameters: data: Time series to evaluate. NaN values are dropped. period: Seasonal period for the STL decomposition. If ``None``, defaults to 7. Returns: Strength of seasonality as a float in [0, 1]. Example: >>> import numpy as np, pandas as pd >>> t = np.arange(200, dtype=float) >>> pure_seasonal = pd.Series(10 * np.sin(2 * np.pi * t / 20)) >>> strength = seasonal_strength(pure_seasonal, period=20) >>> strength > 0.9 True References: - Wang, X., Smith, K. & Hyndman, R. (2006), "Characteristic- Based Clustering for Time Series Data", Data Mining and Knowledge Discovery. """ from statsmodels.tsa.seasonal import STL as _STL from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna() if period is None: period = 7 if len(clean) < 2 * period: return 0.0 result = _STL(clean, period=period).fit() seasonal_component = result.seasonal remainder = result.resid var_remainder = np.var(remainder) var_deseasonalised = np.var(seasonal_component + remainder) if var_deseasonalised < 1e-15: return 0.0 strength = max(0.0, 1.0 - var_remainder / var_deseasonalised) return float(strength)
# --------------------------------------------------------------------------- # Multi-Seasonal Decomposition # ---------------------------------------------------------------------------
[docs] def multi_seasonal_decompose( data: pd.Series, periods: list[int], ) -> dict: """Decompose a time series with multiple seasonal periods. Handles complex seasonality (e.g., daily data with both weekly and yearly patterns) by using MSTL (Multiple Seasonal-Trend decomposition using Loess) from statsmodels when available, or an iterative STL approach as fallback. Parameters: data: Time series to decompose. NaN values are dropped. periods: List of seasonal periods, ordered from shortest to longest. For example, ``[7, 365]`` for weekly and yearly seasonality in daily data. Returns: Dictionary with: - ``trend``: pd.Series of the trend component. - ``seasonal``: dict mapping each period to its pd.Series seasonal component. - ``residual``: pd.Series of the residual after removing trend and all seasonal components. Example: >>> import numpy as np, pandas as pd >>> t = np.arange(730, dtype=float) >>> weekly = 3 * np.sin(2 * np.pi * t / 7) >>> yearly = 5 * np.sin(2 * np.pi * t / 365) >>> trend = 0.01 * t >>> data = pd.Series(trend + weekly + yearly) >>> result = multi_seasonal_decompose(data, periods=[7, 365]) >>> sorted(result['seasonal'].keys()) [7, 365] References: - Bandara, K. et al. (2021), "MSTL: A Seasonal-Trend Decomposition Algorithm for Time Series with Multiple Seasonal Patterns", arXiv:2107.13462. """ from statsmodels.tsa.seasonal import STL as _STL from wraquant.core._coerce import coerce_series data = coerce_series(data, name="data") clean = data.dropna() # Try MSTL first (available in statsmodels >= 0.14) try: from statsmodels.tsa.seasonal import MSTL mstl = MSTL(clean, periods=periods) result = mstl.fit() seasonal_dict: dict[int, pd.Series] = {} # MSTL returns seasonal as a DataFrame with one column per period if hasattr(result.seasonal, "columns"): for i, period in enumerate(periods): col = result.seasonal.iloc[:, i] seasonal_dict[period] = pd.Series( col.values, index=clean.index, name=f"seasonal_{period}", ) else: # Single seasonal component seasonal_dict[periods[0]] = pd.Series( result.seasonal.values if hasattr(result.seasonal, "values") else result.seasonal, index=clean.index, name=f"seasonal_{periods[0]}", ) return { "trend": pd.Series(result.trend, index=clean.index, name="trend"), "seasonal": seasonal_dict, "residual": pd.Series(result.resid, index=clean.index, name="residual"), } except (ImportError, TypeError): pass # Fallback: iterative STL # Process periods from shortest to longest sorted_periods = sorted(periods) remainder = clean.copy() seasonal_dict = {} for period in sorted_periods: if len(remainder) < 2 * period: seasonal_dict[period] = pd.Series( 0.0, index=clean.index, name=f"seasonal_{period}", ) continue stl_result = _STL(remainder, period=period).fit() seasonal_dict[period] = pd.Series( stl_result.seasonal.values, index=clean.index, name=f"seasonal_{period}", ) remainder = pd.Series( stl_result.trend.values + stl_result.resid.values, index=clean.index, ) # Final STL on remainder to get trend if len(sorted_periods) > 0: final_period = sorted_periods[0] if len(remainder) >= 2 * final_period: final_stl = _STL(remainder, period=final_period).fit() trend = pd.Series( final_stl.trend.values, index=clean.index, name="trend", ) residual = pd.Series( final_stl.resid.values, index=clean.index, name="residual", ) else: trend = remainder residual = pd.Series(0.0, index=clean.index, name="residual") else: trend = remainder residual = pd.Series(0.0, index=clean.index, name="residual") return { "trend": trend, "seasonal": seasonal_dict, "residual": residual, }