Source code for wraquant.ts.anomaly

"""Time series anomaly detection.

Provides methods for detecting anomalous observations in time series
data using statistical and machine learning approaches:

1. **Isolation Forest** on rolling features -- detects contextual
   anomalies by featurising the local window (mean, std, skew) and
   running an isolation forest on the feature space.

2. **Forecast-based anomaly detection** -- fits a time series model
   and flags observations that deviate from the forecast by more than
   k standard deviations. Does NOT require fbprophet.

3. **Rolling Grubbs test** -- applies the classical Grubbs outlier test
   in a rolling window to detect local outliers.
"""

from __future__ import annotations

import warnings
from typing import Any

import numpy as np
import pandas as pd
from scipy import stats as sp_stats

from wraquant.core.decorators import requires_extra


# ---------------------------------------------------------------------------
# Isolation Forest on Time Series Features
# ---------------------------------------------------------------------------


[docs] @requires_extra("ml") def isolation_forest_ts( data: pd.Series, window: int = 20, contamination: float = 0.05, features: list[str] | None = None, random_state: int = 42, ) -> dict: """Anomaly detection using Isolation Forest on rolling features. Rather than applying Isolation Forest directly to the raw values, this function first computes rolling window features (mean, std, skew, min, max) to capture the *context* of each observation. This approach detects **contextual anomalies** -- values that are unusual given their local context, even if they are not globally extreme. Parameters: data: Time series. NaN values are dropped. window: Rolling window size for feature computation (default 20). contamination: Expected proportion of anomalies in the data (default 0.05). Controls the threshold for the Isolation Forest decision function. features: List of rolling features to compute. If ``None``, uses ``["mean", "std", "skew", "min", "max"]``. Supported values: ``"mean"``, ``"std"``, ``"skew"``, ``"min"``, ``"max"``, ``"median"``, ``"range"``. random_state: Random seed for reproducibility (default 42). Returns: Dictionary with: - ``anomaly_scores``: pd.Series of anomaly scores from the Isolation Forest decision function (lower = more anomalous). - ``anomaly_labels``: pd.Series of int labels (1 = normal, -1 = anomaly). - ``threshold``: float, the decision function threshold. - ``anomaly_indices``: list of index values flagged as anomalies. - ``n_anomalies``: int, number of detected anomalies. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(42) >>> x = rng.normal(0, 1, 500) >>> x[100] = 20.0 # inject anomaly >>> x[300] = -15.0 # inject anomaly >>> data = pd.Series(x) >>> result = isolation_forest_ts(data, window=20) >>> result['n_anomalies'] > 0 True References: - Liu, F.T. et al. (2008), "Isolation Forest", ICDM. """ from sklearn.ensemble import IsolationForest clean = data.dropna() if features is None: features = ["mean", "std", "skew", "min", "max"] # Compute rolling features rolling = clean.rolling(window=window, min_periods=max(window // 2, 2)) feature_cols: dict[str, pd.Series] = {} for feat in features: if feat == "mean": feature_cols["roll_mean"] = rolling.mean() elif feat == "std": feature_cols["roll_std"] = rolling.std() elif feat == "skew": feature_cols["roll_skew"] = rolling.skew() elif feat == "min": feature_cols["roll_min"] = rolling.min() elif feat == "max": feature_cols["roll_max"] = rolling.max() elif feat == "median": feature_cols["roll_median"] = rolling.median() elif feat == "range": feature_cols["roll_range"] = rolling.max() - rolling.min() # Also include the raw value and its deviation from rolling mean feature_cols["value"] = clean if "roll_mean" in feature_cols: feature_cols["deviation"] = clean - feature_cols["roll_mean"] feature_df = pd.DataFrame(feature_cols, index=clean.index).dropna() # Fit Isolation Forest iso = IsolationForest( contamination=contamination, random_state=random_state, n_estimators=100, ) feature_matrix = feature_df.values labels = iso.fit_predict(feature_matrix) scores = iso.decision_function(feature_matrix) # Build result series aligned to the feature DataFrame index anomaly_scores = pd.Series(scores, index=feature_df.index, name="anomaly_score") anomaly_labels = pd.Series(labels, index=feature_df.index, name="anomaly_label") anomaly_mask = labels == -1 anomaly_indices = list(feature_df.index[anomaly_mask]) threshold = float(iso.offset_) return { "anomaly_scores": anomaly_scores, "anomaly_labels": anomaly_labels, "threshold": threshold, "anomaly_indices": anomaly_indices, "n_anomalies": int(anomaly_mask.sum()), }
# --------------------------------------------------------------------------- # Forecast-Based Anomaly Detection # ---------------------------------------------------------------------------
[docs] def prophet_anomaly( data: pd.Series, k: float = 3.0, seasonal_period: int | None = None, trend: bool = True, ) -> dict: """Forecast-based anomaly detection using statsmodels. Fits an Unobserved Components Model (local level + optional seasonal) and flags observations where the actual value deviates from the smoothed state by more than ``k`` standard deviations of the residuals. This approach does NOT require fbprophet or Prophet. It uses pure statsmodels for a fully open-source, lightweight solution. Parameters: data: Time series. NaN values are dropped. k: Number of standard deviations for the anomaly threshold (default 3.0). Lower values flag more anomalies. seasonal_period: Seasonal period for the model. If ``None``, no seasonal component is included. trend: Include a trend component (default True). Returns: Dictionary with: - ``forecast``: pd.Series of smoothed / fitted values. - ``residuals``: pd.Series of (actual - forecast). - ``anomaly_mask``: pd.Series of bool, True for anomalies. - ``anomaly_indices``: list of index values flagged as anomalies. - ``upper_bound``: pd.Series, forecast + k * sigma. - ``lower_bound``: pd.Series, forecast - k * sigma. - ``sigma``: float, standard deviation of residuals. - ``n_anomalies``: int, number of detected anomalies. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(42) >>> x = rng.normal(0, 1, 300) >>> x[50] = 15.0 # anomaly >>> x[200] = -12.0 # anomaly >>> data = pd.Series(x) >>> result = prophet_anomaly(data, k=3.0) >>> result['n_anomalies'] >= 2 True References: - Harvey, A.C. (1989), *Forecasting, Structural Time Series Models and the Kalman Filter*. Cambridge University Press. """ from statsmodels.tsa.statespace.structural import UnobservedComponents clean = data.dropna() level_type = "local linear trend" if trend else "local level" with warnings.catch_warnings(): warnings.simplefilter("ignore") model = UnobservedComponents( clean, level=level_type, seasonal=seasonal_period, stochastic_seasonal=True if seasonal_period else False, ) fit = model.fit(disp=False, maxiter=500) # Smoothed state = fitted values fitted = pd.Series(fit.fittedvalues, index=clean.index, name="fitted") residuals = clean - fitted sigma = float(np.std(residuals.dropna())) if sigma < 1e-10: sigma = 1e-10 upper = fitted + k * sigma lower = fitted - k * sigma anomaly_mask = (clean > upper) | (clean < lower) anomaly_indices = list(clean.index[anomaly_mask]) return { "forecast": fitted, "residuals": residuals, "anomaly_mask": anomaly_mask, "anomaly_indices": anomaly_indices, "upper_bound": pd.Series(upper, index=clean.index, name="upper"), "lower_bound": pd.Series(lower, index=clean.index, name="lower"), "sigma": sigma, "n_anomalies": int(anomaly_mask.sum()), }
# --------------------------------------------------------------------------- # Rolling Grubbs Test # ---------------------------------------------------------------------------
[docs] def grubbs_test_ts( data: pd.Series, window: int = 50, significance: float = 0.05, ) -> dict: """Rolling Grubbs test for outlier detection in time series. Applies the Grubbs test (maximum normed residual test) within a rolling window to detect observations that are statistically unlikely given the local distribution. The Grubbs test statistic for a sample of size n is: ``G = max|x_i - mean| / std`` and the critical value is derived from the t-distribution: ``G_crit = ((n-1) / sqrt(n)) * sqrt(t^2 / (n - 2 + t^2))`` where ``t`` is the critical value of the t-distribution with ``n-2`` degrees of freedom at significance ``alpha / (2n)``. Parameters: data: Time series. NaN values are dropped. window: Rolling window size (default 50). Should be large enough for the Grubbs test to be meaningful (>= 7). significance: Significance level for the Grubbs test (default 0.05). Returns: Dictionary with: - ``outlier_mask``: pd.Series of bool, True for detected outliers. - ``test_statistics``: pd.Series of Grubbs G statistics at each position. - ``outlier_indices``: list of index values flagged as outliers. - ``n_outliers``: int, number of detected outliers. Example: >>> import numpy as np, pandas as pd >>> rng = np.random.default_rng(42) >>> x = rng.normal(0, 1, 200) >>> x[75] = 20.0 # obvious outlier >>> data = pd.Series(x) >>> result = grubbs_test_ts(data, window=50) >>> 75 in result['outlier_indices'] True References: - Grubbs, F.E. (1950), "Sample Criteria for Testing Outlying Observations", Annals of Mathematical Statistics. """ clean = data.dropna() n_obs = len(clean) values = clean.values.astype(np.float64) outlier_flags = np.zeros(n_obs, dtype=bool) g_stats = np.full(n_obs, np.nan) half_w = window // 2 for i in range(n_obs): start = max(0, i - half_w) end = min(n_obs, i + half_w + 1) w = values[start:end] n_w = len(w) if n_w < 7: continue mean_w = np.mean(w) std_w = np.std(w, ddof=1) if std_w < 1e-15: continue # Grubbs statistic for the center point g = abs(values[i] - mean_w) / std_w g_stats[i] = g # Critical value t_crit = sp_stats.t.ppf(1 - significance / (2 * n_w), n_w - 2) g_crit = ((n_w - 1) / np.sqrt(n_w)) * np.sqrt( t_crit ** 2 / (n_w - 2 + t_crit ** 2) ) if g > g_crit: outlier_flags[i] = True outlier_mask = pd.Series(outlier_flags, index=clean.index, name="outlier") test_statistics = pd.Series(g_stats, index=clean.index, name="grubbs_G") outlier_indices = list(clean.index[outlier_flags]) return { "outlier_mask": outlier_mask, "test_statistics": test_statistics, "outlier_indices": outlier_indices, "n_outliers": int(outlier_flags.sum()), }