"""Time series forecasting wrappers.
Provides classical univariate forecasting models (ARIMA, exponential
smoothing), automated model selection, and residual diagnostics.
The two primary approaches are:
1. **Exponential Smoothing (ETS / Holt-Winters)** -- weighted averages
of past observations with exponentially decaying weights. Best for
series with clear trend and/or seasonal patterns and relatively
stable structure. Fast to fit, interpretable, and often competitive
with more complex models for short-horizon forecasts.
2. **ARIMA / auto_arima** -- Box-Jenkins methodology that models the
series as a linear combination of its own lags (AR), differences
(I), and past forecast errors (MA). More flexible than ETS for
capturing complex autocorrelation patterns, but requires careful
order selection (handled automatically by ``auto_arima``).
When to use each:
- **ETS**: strong seasonality, smooth trend, short horizon (1-12
steps). Typical for retail sales, energy demand, macro indicators.
- **ARIMA**: irregular patterns, no obvious seasonality, or when
you need confidence intervals that account for parameter
uncertainty. ``auto_arima`` handles order selection via AIC
minimisation.
- **Neither**: for high-frequency financial returns (near-random-
walk), consider GARCH (``wraquant.vol``) for volatility
forecasting or ML models (``wraquant.ml``).
References:
- Hyndman & Athanasopoulos (2021), "Forecasting: Principles and
Practice" (3rd ed.)
- Box, Jenkins & Reinsel (2015), "Time Series Analysis"
"""
from __future__ import annotations
import itertools
import warnings
from typing import Any, Callable, Sequence
import numpy as np
import pandas as pd
from scipy import stats as sp_stats
from statsmodels.stats.diagnostic import acorr_ljungbox, het_arch
from statsmodels.stats.stattools import durbin_watson, jarque_bera
from statsmodels.tsa.holtwinters import ExponentialSmoothing, SimpleExpSmoothing
from statsmodels.tsa.stattools import acf, pacf
from wraquant.core.decorators import requires_extra
[docs]
def exponential_smoothing(data: pd.Series, **kwargs: Any) -> Any:
"""Fit a Holt-Winters exponential smoothing model.
Holt-Winters decomposes the series into level, trend, and seasonal
components, updating each with exponentially decaying weights. It
is one of the most widely used forecasting methods for business
and macro time series.
When to use:
Use exponential smoothing when:
- The series has a clear trend and/or seasonal pattern.
- You need fast, interpretable forecasts.
- The forecast horizon is short (1-2 seasonal cycles).
Prefer ARIMA (``auto_arima``) when the autocorrelation structure
is more complex or when you need differencing to achieve
stationarity. Prefer ML models (``wraquant.ml``) for nonlinear
patterns.
Mathematical formulation:
Simple: l_t = alpha * y_t + (1 - alpha) * l_{t-1}
Double: adds trend b_t = beta * (l_t - l_{t-1}) + (1 - beta) * b_{t-1}
Triple: adds seasonal s_t (additive or multiplicative)
How to interpret:
Call ``.forecast(h)`` on the returned result for h-step ahead
forecasts. Use ``.summary()`` to inspect smoothing parameters
(alpha, beta, gamma). Low alpha => model relies on history;
high alpha => model follows recent data closely.
Parameters:
data: Time series to fit. Should have a ``DatetimeIndex`` with
a set frequency for seasonal models.
**kwargs: Keyword arguments forwarded to
``statsmodels.tsa.holtwinters.ExponentialSmoothing``.
Key arguments: ``trend`` (``"add"`` or ``"mul"``),
``seasonal`` (``"add"`` or ``"mul"``),
``seasonal_periods`` (int).
Returns:
Fitted ``HoltWintersResultsWrapper``. Call ``.forecast(h)``
for predictions.
Example:
>>> import pandas as pd, numpy as np
>>> idx = pd.date_range("2020-01-01", periods=120, freq="MS")
>>> data = pd.Series(np.arange(120.0) + np.random.randn(120) * 5, index=idx)
>>> result = exponential_smoothing(data, trend="add")
>>> forecast = result.forecast(12)
>>> len(forecast)
12
See Also:
auto_arima: Automatic ARIMA model selection.
wraquant.ts.decomposition.stl_decompose: Decompose before
forecasting.
References:
- Holt (1957), "Forecasting seasonals and trends by
exponentially weighted moving averages"
- Winters (1960), "Forecasting Sales by Exponentially Weighted
Moving Averages"
"""
model = ExponentialSmoothing(data, **kwargs)
return model.fit()
[docs]
@requires_extra("timeseries")
def auto_arima(data: pd.Series, **kwargs: Any) -> Any:
"""Automatically select and fit the best ARIMA model.
Performs a stepwise or grid search over ARIMA(p,d,q) orders,
selecting the model that minimises AIC (by default). This
automates the Box-Jenkins methodology: differencing to achieve
stationarity, then selecting AR and MA orders.
When to use:
Use ``auto_arima`` when:
- You want ARIMA but are unsure of the correct order.
- The series has complex autocorrelation (beyond what ETS
handles).
- You need forecast confidence intervals.
Prefer ``exponential_smoothing`` for simple trend/seasonal
series where speed matters. Prefer ``arima_model_selection``
when you want to compare all candidate orders explicitly.
How to interpret:
The returned model object has:
- ``.predict(n_periods)`` for point forecasts.
- ``.predict_in_sample()`` for fitted values.
- ``.summary()`` for model diagnostics (coefficients,
AIC/BIC, residual tests).
- ``.order()`` for the selected (p,d,q) order.
Use ``arima_diagnostics`` to validate the residuals.
Parameters:
data: Time series to fit. Should have >50 observations for
reliable order selection.
**kwargs: Keyword arguments forwarded to
``pmdarima.auto_arima``. Key arguments include
``seasonal`` (bool), ``m`` (seasonal period),
``stepwise`` (bool, default True for speed),
``information_criterion`` (``"aic"`` or ``"bic"``).
Returns:
Fitted ARIMA model from pmdarima with ``.predict()``,
``.summary()``, and ``.order()`` methods.
Raises:
ImportError: If the ``timeseries`` optional dependency group
(``pmdarima``) is not installed.
Example:
>>> import pandas as pd, numpy as np
>>> data = pd.Series(np.cumsum(np.random.randn(200)))
>>> model = auto_arima(data) # doctest: +SKIP
>>> model.order() # doctest: +SKIP
(1, 1, 0)
See Also:
exponential_smoothing: Holt-Winters ETS models.
arima_diagnostics: Residual checks for fitted ARIMA models.
arima_model_selection: Manual grid search over ARIMA orders.
References:
- Hyndman & Khandakar (2008), "Automatic Time Series
Forecasting: the forecast Package for R"
"""
import pmdarima as pm
return pm.auto_arima(data, **kwargs)
[docs]
def arima_diagnostics(
model_result: Any,
nlags: int = 10,
alpha: float = 0.05,
) -> dict:
"""Run comprehensive residual diagnostics on a fitted ARIMA model.
After fitting an ARIMA model, it is critical to check that the
residuals behave like white noise. This function runs:
- **Ljung-Box test**: no remaining autocorrelation in residuals.
- **Jarque-Bera test**: normality of residuals.
- **ARCH-LM test**: no remaining heteroskedasticity (ARCH effects).
- **Durbin-Watson statistic**: first-order autocorrelation check.
- **ACF/PACF values**: for visual inspection of residual structure.
Parameters:
model_result: Fitted model result from ``statsmodels`` ARIMA/SARIMAX
(must have a ``.resid`` attribute) or ``pmdarima`` ARIMA
(must have a ``.resid()`` method).
nlags: Number of lags for autocorrelation tests (default 10).
alpha: Significance level for pass/fail decisions (default 0.05).
Returns:
Dictionary with:
- ``ljung_box``: dict with ``statistic``, ``p_value``, ``pass``.
- ``jarque_bera``: dict with ``statistic``, ``p_value``, ``pass``.
- ``arch_lm``: dict with ``statistic``, ``p_value``, ``pass``.
- ``durbin_watson``: Durbin-Watson statistic (near 2 = no autocorrelation).
- ``acf_values``: autocorrelation function values.
- ``pacf_values``: partial autocorrelation function values.
- ``model_adequate``: ``True`` if all diagnostic tests pass.
Example:
>>> from statsmodels.tsa.arima.model import ARIMA
>>> import pandas as pd, numpy as np
>>> data = pd.Series(np.cumsum(np.random.randn(200)))
>>> fit = ARIMA(data, order=(1, 1, 1)).fit()
>>> arima_diagnostics(fit) # doctest: +SKIP
"""
# Extract residuals (handle both statsmodels and pmdarima)
if hasattr(model_result, "resid") and callable(model_result.resid):
resid = model_result.resid()
elif hasattr(model_result, "resid"):
resid = model_result.resid
else:
msg = "Model result must have a 'resid' attribute or method."
raise AttributeError(msg)
resid = np.asarray(resid, dtype=float)
resid = resid[~np.isnan(resid)]
# Ljung-Box test for autocorrelation
lb_result = acorr_ljungbox(resid, lags=nlags, return_df=True)
lb_last = lb_result.iloc[-1]
lb_stat = float(lb_last["lb_stat"])
lb_pval = float(lb_last["lb_pvalue"])
lb_pass = bool(lb_pval > alpha)
# Jarque-Bera normality test
jb_stat, jb_pval, jb_skew, jb_kurt = jarque_bera(resid)
jb_pass = bool(jb_pval > alpha)
# ARCH-LM test for heteroskedasticity
arch_stat, arch_pval, _, _ = het_arch(resid, nlags=nlags)
arch_pass = bool(arch_pval > alpha)
# Durbin-Watson statistic
dw = float(durbin_watson(resid))
# ACF / PACF values
max_acf_lags = min(nlags, len(resid) // 2 - 1)
if max_acf_lags < 1:
max_acf_lags = 1
acf_vals = acf(resid, nlags=max_acf_lags, fft=True)
pacf_vals = pacf(resid, nlags=max_acf_lags)
model_adequate = lb_pass and jb_pass and arch_pass
return {
"ljung_box": {
"statistic": lb_stat,
"p_value": lb_pval,
"pass": lb_pass,
},
"jarque_bera": {
"statistic": float(jb_stat),
"p_value": float(jb_pval),
"pass": jb_pass,
},
"arch_lm": {
"statistic": float(arch_stat),
"p_value": float(arch_pval),
"pass": arch_pass,
},
"durbin_watson": dw,
"acf_values": acf_vals,
"pacf_values": pacf_vals,
"model_adequate": model_adequate,
}
[docs]
def arima_model_selection(
data: pd.Series,
p_range: range | list[int] = range(0, 4),
d_range: range | list[int] = range(0, 3),
q_range: range | list[int] = range(0, 4),
criterion: str = "aic",
) -> pd.DataFrame:
"""Compare ARIMA(p,d,q) combinations and rank by information criterion.
Performs a grid search over combinations of ARIMA orders and ranks
models by AIC or BIC. Use this to systematically find the best
ARIMA specification for a time series.
Parameters:
data: Time series to model.
p_range: Range of AR orders to test (default 0-3).
d_range: Range of differencing orders to test (default 0-2).
q_range: Range of MA orders to test (default 0-3).
criterion: Ranking criterion, ``"aic"`` (default) or ``"bic"``.
Returns:
DataFrame with columns: ``order`` (tuple), ``aic``, ``bic``,
``converged``, sorted by the chosen criterion (ascending).
Example:
>>> import pandas as pd, numpy as np
>>> data = pd.Series(np.cumsum(np.random.randn(200)))
>>> arima_model_selection(data, p_range=range(0, 3), d_range=range(0, 2), q_range=range(0, 3)) # doctest: +SKIP
"""
from statsmodels.tsa.arima.model import ARIMA
rows: list[dict] = []
for p, d, q in itertools.product(p_range, d_range, q_range):
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
model = ARIMA(data, order=(p, d, q))
fit = model.fit()
rows.append(
{
"order": (p, d, q),
"aic": float(fit.aic),
"bic": float(fit.bic),
"converged": bool(fit.mle_retvals.get("converged", True)),
}
)
except Exception: # noqa: BLE001
rows.append(
{
"order": (p, d, q),
"aic": float("inf"),
"bic": float("inf"),
"converged": False,
}
)
df = pd.DataFrame(rows)
if not df.empty:
df = df.sort_values(criterion).reset_index(drop=True)
return df
# ---------------------------------------------------------------------------
# Helpers (private)
# ---------------------------------------------------------------------------
def _detect_seasonal_period(data: pd.Series) -> int:
"""Auto-detect seasonal period from a time series index or ACF.
Uses the index frequency if available, otherwise picks the first
significant ACF peak beyond lag 1.
Returns:
Detected seasonal period (minimum 2).
"""
freq_map: dict[str, int] = {
"B": 5,
"D": 7,
"W": 52,
"M": 12,
"MS": 12,
"Q": 4,
"QS": 4,
"Y": 1,
"YS": 1,
"h": 24,
"H": 24,
"min": 60,
"T": 60,
}
if hasattr(data.index, "freqstr") and data.index.freqstr is not None:
base = data.index.freqstr.split("-")[0]
if base in freq_map:
return freq_map[base]
# Fall back to ACF peak detection
n = len(data)
max_lag = min(n // 2 - 1, 120)
if max_lag < 3:
return 2
acf_vals = acf(data.values, nlags=max_lag, fft=True)
# Skip lag 0 and 1, find first local max
for i in range(2, len(acf_vals) - 1):
if acf_vals[i] > acf_vals[i - 1] and acf_vals[i] > acf_vals[i + 1]:
return max(i, 2)
return 2
def _naive_seasonal_forecast(
data: pd.Series, h: int, seasonal_period: int
) -> np.ndarray:
"""Seasonal naive forecast: repeat last full season."""
vals = data.values
forecasts = np.empty(h)
for i in range(h):
idx = len(vals) - seasonal_period + (i % seasonal_period)
if idx < 0:
idx = len(vals) - 1
forecasts[i] = vals[idx]
return forecasts
def _drift_forecast(data: pd.Series, h: int) -> np.ndarray:
"""Random walk with drift forecast."""
vals = data.values
n = len(vals)
drift = (vals[-1] - vals[0]) / (n - 1) if n > 1 else 0.0
return np.array([vals[-1] + drift * (i + 1) for i in range(h)])
def _make_forecast_index(data: pd.Series, h: int) -> pd.Index:
"""Create a forecast index extending the data's index by *h* periods."""
if isinstance(data.index, pd.DatetimeIndex) and data.index.freq is not None:
return pd.date_range(
start=data.index[-1] + data.index.freq,
periods=h,
freq=data.index.freq,
)
if isinstance(data.index, pd.DatetimeIndex) and len(data.index) > 1:
freq = pd.infer_freq(data.index)
if freq is not None:
return pd.date_range(
start=data.index[-1] + pd.tseries.frequencies.to_offset(freq),
periods=h,
freq=freq,
)
# fallback: integer index
last = data.index[-1]
if isinstance(last, (int, np.integer)):
return pd.RangeIndex(start=last + 1, stop=last + 1 + h)
return pd.RangeIndex(h)
def _safe_hw(data: pd.Series, h: int, seasonal_period: int) -> np.ndarray:
"""Holt-Winters wrapper that falls back to NaN on failure."""
try:
res = holt_winters_forecast(data, h=h, seasonal_periods=seasonal_period)
return res["forecast"].values
except Exception: # noqa: BLE001
return np.full(h, np.nan)
# ---------------------------------------------------------------------------
# theta_forecast
# ---------------------------------------------------------------------------
[docs]
def theta_forecast(
data: pd.Series,
h: int = 10,
theta: float = 2.0,
) -> dict:
"""Theta method forecast (Assimakopoulos & Nikolopoulos, 2000).
The Theta method decomposes the series into two "theta lines":
- **Theta-0**: the linear regression line (long-run trend).
- **Theta-2**: an amplified version of the original curvature,
forecast with Simple Exponential Smoothing (SES).
The final forecast is the average of these two components. Despite
its simplicity, Theta consistently ranks among the best methods in
forecasting competitions (M3, M4).
When to use:
- Quick, competitive baseline for non-seasonal or deseasonalised
series.
- When you need a robust method that does not require order
selection (unlike ARIMA).
Math:
Given a series *y_t*, define the second differences
``D^2(y_t) = y_t - 2*y_{t-1} + y_{t-2}``. The theta line
``Z_theta(t)`` satisfies ``D^2(Z) = theta * D^2(y)``. For
``theta=0`` the solution is a straight line; for ``theta=2``
the curvature is doubled.
Parameters:
data: Time series (at least 6 observations).
h: Forecast horizon (default 10).
theta: Theta parameter controlling curvature amplification
(default 2.0).
Returns:
Dictionary with:
- ``forecast``: pd.Series of point forecasts.
- ``fitted_values``: pd.Series of in-sample fitted values.
- ``theta_params``: dict with ``theta``, ``ses_alpha``,
``drift``.
References:
Assimakopoulos, V., & Nikolopoulos, K. (2000). The Theta
model: a decomposition approach to forecasting.
*International Journal of Forecasting*, 16(4), 521-530.
Example:
>>> import pandas as pd, numpy as np
>>> data = pd.Series(np.cumsum(np.random.randn(100)) + 50)
>>> result = theta_forecast(data, h=12)
>>> result['forecast'].shape
(12,)
"""
y = data.values.astype(float)
n = len(y)
t = np.arange(1, n + 1, dtype=float)
# Theta-0: linear regression line
slope, intercept = np.polyfit(t, y, 1)
theta0_in = intercept + slope * t
theta0_fcast = np.array(
[intercept + slope * (n + i) for i in range(1, h + 1)]
)
# Theta-2: amplified curvature via SES
theta2_in = theta * y - (theta - 1) * theta0_in
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ses_model = SimpleExpSmoothing(theta2_in).fit(optimized=True)
ses_alpha = float(ses_model.params.get("smoothing_level", 0.5))
theta2_fcast = ses_model.forecast(h)
# Average the two theta lines
fcast_vals = 0.5 * (theta0_fcast + theta2_fcast)
fitted_vals = 0.5 * (theta0_in + ses_model.fittedvalues)
idx = _make_forecast_index(data, h)
from wraquant.core.results import ForecastResult
return ForecastResult(
forecast=pd.Series(fcast_vals, index=idx, name="theta_forecast"),
fitted_values=fitted_vals,
method="theta",
metrics={
"theta": theta,
"ses_alpha": ses_alpha,
"drift": float(slope),
},
)
# ---------------------------------------------------------------------------
# ses_forecast
# ---------------------------------------------------------------------------
[docs]
def ses_forecast(
data: pd.Series,
h: int = 10,
alpha: float | None = None,
) -> dict:
"""Simple Exponential Smoothing (SES) with optimal alpha.
SES is the simplest exponential smoothing method. It assigns
exponentially decreasing weights to past observations. The single
smoothing parameter ``alpha`` controls how fast old observations
are discounted:
- ``alpha`` near 1 => recent data dominates (series is volatile).
- ``alpha`` near 0 => old data persists (series is stable).
When to use:
- Stationary series with no clear trend or seasonality.
- Quick baseline forecast.
- As a building block inside ensemble or Theta methods.
Math:
``l_t = alpha * y_t + (1 - alpha) * l_{t-1}``
Forecast: ``y_{t+h|t} = l_t`` (flat forecast for all horizons).
Parameters:
data: Time series to forecast.
h: Forecast horizon (default 10).
alpha: Smoothing parameter in (0, 1). If ``None`` (default),
the optimal alpha is estimated by minimising SSE.
Returns:
Dictionary with:
- ``forecast``: pd.Series of point forecasts.
- ``alpha``: optimised smoothing parameter.
- ``fitted_values``: pd.Series of in-sample fitted values.
- ``residuals``: pd.Series of in-sample residuals.
Example:
>>> import pandas as pd, numpy as np
>>> data = pd.Series(np.random.randn(100).cumsum() + 50)
>>> result = ses_forecast(data, h=5)
>>> len(result['forecast'])
5
"""
with warnings.catch_warnings():
warnings.simplefilter("ignore")
if alpha is not None:
fit = SimpleExpSmoothing(data).fit(
smoothing_level=alpha, optimized=False
)
else:
fit = SimpleExpSmoothing(data).fit(optimized=True)
opt_alpha = float(fit.params.get("smoothing_level", alpha or 0.5))
fcast = fit.forecast(h)
idx = _make_forecast_index(data, h)
from wraquant.core.results import ForecastResult
return ForecastResult(
forecast=pd.Series(
fcast.values, index=idx, name="ses_forecast"
),
fitted_values=fit.fittedvalues.values,
residuals=data.values - fit.fittedvalues.values,
method="ses",
metrics={"alpha": opt_alpha},
model=fit,
)
# ---------------------------------------------------------------------------
# holt_winters_forecast
# ---------------------------------------------------------------------------
[docs]
def holt_winters_forecast(
data: pd.Series,
h: int = 10,
seasonal: str = "add",
seasonal_periods: int | None = None,
trend: str | None = "add",
) -> dict:
"""Holt-Winters exponential smoothing with trend and seasonality.
This is the full triple exponential smoothing method. It extends
SES by adding a trend component (Holt) and a seasonal component
(Winters), supporting both additive and multiplicative seasonality.
When to use:
- Series with clear trend **and** seasonality (e.g., monthly
sales, quarterly GDP).
- Choose ``seasonal='mul'`` when seasonal fluctuations grow
with the level of the series.
Math (additive):
``l_t = alpha * (y_t - s_{t-m}) + (1 - alpha) * (l_{t-1} + b_{t-1})``
``b_t = beta * (l_t - l_{t-1}) + (1 - beta) * b_{t-1}``
``s_t = gamma * (y_t - l_t) + (1 - gamma) * s_{t-m}``
Parameters:
data: Time series (must have at least 2 full seasonal cycles).
h: Forecast horizon (default 10).
seasonal: ``"add"`` (default) or ``"mul"`` for additive or
multiplicative seasonality.
seasonal_periods: Number of periods in a season. Auto-detected
from the index frequency if ``None``.
trend: Trend component type: ``"add"`` (default), ``"mul"``,
or ``None`` for no trend.
Returns:
Dictionary with:
- ``forecast``: pd.Series of point forecasts.
- ``params``: dict with ``alpha``, ``beta``, ``gamma``.
- ``fitted_values``: pd.Series of in-sample fitted values.
- ``seasonal_components``: pd.Series of estimated seasonal
factors.
Example:
>>> import pandas as pd, numpy as np
>>> idx = pd.date_range('2020-01-01', periods=120, freq='MS')
>>> seasonal = 10 * np.sin(2 * np.pi * np.arange(120) / 12)
>>> data = pd.Series(
... np.arange(120) * 0.5 + seasonal + np.random.randn(120),
... index=idx,
... )
>>> result = holt_winters_forecast(data, h=12, seasonal_periods=12)
>>> len(result['forecast'])
12
"""
if seasonal_periods is None:
seasonal_periods = _detect_seasonal_period(data)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
model = ExponentialSmoothing(
data,
trend=trend,
seasonal=seasonal,
seasonal_periods=seasonal_periods,
)
fit = model.fit(optimized=True)
fcast = fit.forecast(h)
idx = _make_forecast_index(data, h)
params = {
"alpha": float(fit.params.get("smoothing_level", 0.0)),
"beta": float(fit.params.get("smoothing_trend", 0.0)),
"gamma": float(fit.params.get("smoothing_seasonal", 0.0)),
}
# Extract seasonal component
season_vals = fit.season if hasattr(fit, "season") else np.zeros(len(data))
if season_vals is None:
season_vals = np.zeros(len(data))
from wraquant.core.results import ForecastResult
return ForecastResult(
forecast=pd.Series(
fcast.values, index=idx, name="hw_forecast"
),
fitted_values=fit.fittedvalues.values,
method="holt_winters",
metrics=params,
model=fit,
)
# ---------------------------------------------------------------------------
# forecast_evaluation
# ---------------------------------------------------------------------------
[docs]
def forecast_evaluation(
actual: pd.Series | np.ndarray,
forecast: pd.Series | np.ndarray,
naive_forecast: pd.Series | np.ndarray | None = None,
benchmark_forecast: pd.Series | np.ndarray | None = None,
) -> dict:
"""Comprehensive forecast accuracy metrics.
Computes a wide set of point-forecast accuracy measures used in
the forecasting literature. When a *benchmark_forecast* is
provided the Diebold-Mariano test is run to determine whether the
two forecasts have significantly different accuracy.
Metrics included:
- **RMSE** (Root Mean Squared Error): penalises large errors.
- **MAE** (Mean Absolute Error): robust to outliers.
- **MAPE** (Mean Absolute Percentage Error): scale-independent
but undefined when actuals are zero.
- **SMAPE** (Symmetric MAPE): bounded version of MAPE.
- **MdAPE** (Median Absolute Percentage Error): robust MAPE.
- **MASE** (Mean Absolute Scaled Error): scaled by in-sample
naive error; needs *naive_forecast* or falls back to
``MAE / mean(|diff(actual)|)``.
- **Directional accuracy**: fraction of periods where the
forecast correctly predicts the sign of the change.
- **Diebold-Mariano test**: compares this forecast against
*benchmark_forecast* (two-sided, squared-error loss).
Parameters:
actual: Observed values.
forecast: Predicted values (same length as *actual*).
naive_forecast: Optional naive forecast for MASE scaling.
If ``None``, MASE is computed using the mean absolute
first difference of *actual*.
benchmark_forecast: Optional second forecast for
Diebold-Mariano comparison.
Returns:
Dictionary with:
- ``rmse``, ``mae``, ``mape``, ``smape``, ``mdape``, ``mase``,
``directional_accuracy``.
- ``diebold_mariano``: dict with ``statistic`` and ``p_value``
(only present when *benchmark_forecast* is given).
References:
Diebold, F.X. & Mariano, R.S. (1995). Comparing Predictive
Accuracy. *Journal of Business & Economic Statistics*, 13(3).
Example:
>>> import numpy as np
>>> actual = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
>>> pred = np.array([1.1, 2.2, 2.8, 4.3, 4.7])
>>> metrics = forecast_evaluation(actual, pred)
>>> 'rmse' in metrics
True
"""
a = np.asarray(actual, dtype=float)
f = np.asarray(forecast, dtype=float)
e = a - f
# RMSE
rmse = float(np.sqrt(np.mean(e**2)))
# MAE
mae = float(np.mean(np.abs(e)))
# MAPE (exclude zeros in actual)
nonzero = a != 0
if np.any(nonzero):
mape = float(np.mean(np.abs(e[nonzero] / a[nonzero])) * 100)
else:
mape = float("inf")
# SMAPE
denom = np.abs(a) + np.abs(f)
nonzero_denom = denom != 0
if np.any(nonzero_denom):
smape = float(
np.mean(
2.0 * np.abs(e[nonzero_denom]) / denom[nonzero_denom]
)
* 100
)
else:
smape = 0.0
# MdAPE
if np.any(nonzero):
mdape = float(
np.median(np.abs(e[nonzero] / a[nonzero])) * 100
)
else:
mdape = float("inf")
# MASE
if naive_forecast is not None:
naive_err = np.mean(
np.abs(a - np.asarray(naive_forecast, dtype=float))
)
else:
naive_err = (
np.mean(np.abs(np.diff(a))) if len(a) > 1 else 1.0
)
mase = float(mae / naive_err) if naive_err > 0 else float("inf")
# Directional accuracy
if len(a) > 1:
actual_dir = np.sign(np.diff(a))
fcast_dir = np.sign(np.diff(f))
dir_acc = float(np.mean(actual_dir == fcast_dir) * 100)
else:
dir_acc = float("nan")
result: dict[str, Any] = {
"rmse": rmse,
"mae": mae,
"mape": mape,
"smape": smape,
"mdape": mdape,
"mase": mase,
"directional_accuracy": dir_acc,
}
# Diebold-Mariano test
if benchmark_forecast is not None:
b = np.asarray(benchmark_forecast, dtype=float)
e_bench = a - b
d = e**2 - e_bench**2
d_mean = np.mean(d)
n_dm = len(d)
# Newey-West style variance (lag 0 only for simplicity)
d_var = np.var(d, ddof=1) / n_dm if n_dm > 1 else 1.0
dm_stat = (
float(d_mean / np.sqrt(d_var)) if d_var > 0 else 0.0
)
dm_pval = float(2 * (1 - sp_stats.norm.cdf(abs(dm_stat))))
result["diebold_mariano"] = {
"statistic": dm_stat,
"p_value": dm_pval,
}
return result
# ---------------------------------------------------------------------------
# auto_forecast
# ---------------------------------------------------------------------------
[docs]
def auto_forecast(
data: pd.Series,
h: int = 10,
holdout_pct: float = 0.2,
seasonal_period: int | None = None,
) -> dict:
"""Unified auto-forecasting: try multiple models, pick the best.
Fits several forecasting methods on a training subset, evaluates
them on a held-out test set, and returns the forecast from the
best model re-fitted on the full data. This is the recommended
starting point when you have no prior knowledge about the series
dynamics.
Models tried:
- **SES**: Simple Exponential Smoothing.
- **Holt-Winters (additive)**: trend + additive seasonality.
- **Theta**: Theta method.
- **Seasonal naive**: repeat the last seasonal cycle.
- **Drift**: random walk with drift.
When to use auto_forecast vs manual model selection:
Use ``auto_forecast`` for rapid prototyping, exploratory work,
or when you need a robust default. Switch to manual model
selection (``arima_model_selection``, ``auto_arima``) when you
need fine-grained control, interpretability, or domain-specific
constraints.
Parameters:
data: Time series (at least 30 observations recommended).
h: Forecast horizon (default 10).
holdout_pct: Fraction of data reserved for model evaluation
(default 0.20).
seasonal_period: Seasonal period for Holt-Winters and seasonal
naive. Auto-detected if ``None``.
Returns:
Dictionary with:
- ``best_model``: name of the winning model.
- ``forecast``: pd.Series of point forecasts from the best
model (re-fitted on full data).
- ``confidence_intervals``: dict with ``lower`` and ``upper``
pd.Series (approximate, based on residual std).
- ``model_comparison``: pd.DataFrame with RMSE, MAE, MAPE per
model.
- ``residual_diagnostics``: dict with ``mean``, ``std``,
``ljung_box_pvalue`` of residuals from the best model.
Example:
>>> import pandas as pd, numpy as np
>>> data = pd.Series(np.cumsum(np.random.randn(200)) + 100)
>>> result = auto_forecast(data, h=10)
>>> result['best_model'] in [
... 'ses', 'holt_winters', 'theta', 'seasonal_naive', 'drift',
... ]
True
"""
n = len(data)
split = int(n * (1 - holdout_pct))
train, test = data.iloc[:split], data.iloc[split:]
test_h = len(test)
if seasonal_period is None:
seasonal_period = _detect_seasonal_period(data)
# ------------------------------------------------------------------
# Candidate model functions
# ------------------------------------------------------------------
def _try_ses(tr: pd.Series, fh: int) -> np.ndarray:
return ses_forecast(tr, h=fh)["forecast"].values
def _try_hw(tr: pd.Series, fh: int) -> np.ndarray:
try:
return holt_winters_forecast(
tr, h=fh, seasonal_periods=seasonal_period
)["forecast"].values
except Exception: # noqa: BLE001
return np.full(fh, np.nan)
def _try_theta(tr: pd.Series, fh: int) -> np.ndarray:
return theta_forecast(tr, h=fh)["forecast"].values
def _try_snaive(tr: pd.Series, fh: int) -> np.ndarray:
return _naive_seasonal_forecast(tr, fh, seasonal_period)
candidates: dict[str, Callable] = {
"ses": _try_ses,
"holt_winters": _try_hw,
"theta": _try_theta,
"seasonal_naive": _try_snaive,
"drift": _drift_forecast,
}
# Evaluate on hold-out
rows: list[dict] = []
for name, fn in candidates.items():
with warnings.catch_warnings():
warnings.simplefilter("ignore")
pred = fn(train, test_h)
if np.any(np.isnan(pred)):
rows.append(
{
"model": name,
"rmse": float("inf"),
"mae": float("inf"),
"mape": float("inf"),
}
)
continue
metrics = forecast_evaluation(test.values, pred)
rows.append(
{
"model": name,
"rmse": metrics["rmse"],
"mae": metrics["mae"],
"mape": metrics["mape"],
}
)
comparison = pd.DataFrame(rows).sort_values("rmse").reset_index(
drop=True
)
best_name = str(comparison.iloc[0]["model"])
# Re-fit best model on full data
with warnings.catch_warnings():
warnings.simplefilter("ignore")
best_fcast = candidates[best_name](data, h)
idx = _make_forecast_index(data, h)
fcast_series = pd.Series(
best_fcast, index=idx, name="auto_forecast"
)
# Approximate confidence intervals from first-difference std
resid_std = float(np.std(data.values[1:] - data.values[:-1]))
z = 1.96
ci_lower = pd.Series(
best_fcast - z * resid_std * np.sqrt(np.arange(1, h + 1)),
index=idx,
name="lower",
)
ci_upper = pd.Series(
best_fcast + z * resid_std * np.sqrt(np.arange(1, h + 1)),
index=idx,
name="upper",
)
# Residual diagnostics for best model
with warnings.catch_warnings():
warnings.simplefilter("ignore")
in_sample_pred = candidates[best_name](data, len(data))
residuals = data.values - in_sample_pred[: len(data)]
residuals = residuals[~np.isnan(residuals)]
lb_pval = float("nan")
if len(residuals) > 10:
try:
lb = acorr_ljungbox(residuals, lags=10, return_df=True)
lb_pval = float(lb.iloc[-1]["lb_pvalue"])
except Exception: # noqa: BLE001
pass
from wraquant.core.results import ForecastResult
return ForecastResult(
forecast=fcast_series,
confidence_lower=ci_lower.values,
confidence_upper=ci_upper.values,
method=best_name,
fitted_values=in_sample_pred[: len(data)],
residuals=residuals,
metrics={
"best_model": best_name,
"residual_mean": (
float(np.mean(residuals))
if len(residuals) > 0
else float("nan")
),
"residual_std": (
float(np.std(residuals))
if len(residuals) > 0
else float("nan")
),
"ljung_box_pvalue": lb_pval,
},
model=comparison,
)
# ---------------------------------------------------------------------------
# ensemble_forecast
# ---------------------------------------------------------------------------
[docs]
def ensemble_forecast(
data: pd.Series,
h: int = 10,
methods: Sequence[str] | None = None,
holdout_pct: float = 0.2,
seasonal_period: int | None = None,
) -> dict:
"""Combine multiple forecasting models via inverse-RMSE weighting.
Ensemble forecasting often outperforms any single model because it
diversifies model risk. Weights are proportional to the inverse
RMSE on a held-out validation set, so better-performing models
contribute more.
When to use:
- When no single model clearly dominates.
- For production pipelines where robustness matters more than
interpretability.
Parameters:
data: Time series to forecast.
h: Forecast horizon (default 10).
methods: List of method names to include. Supported:
``"ses"``, ``"theta"``, ``"holt_winters"``,
``"drift"``. If ``None``, uses all four.
holdout_pct: Fraction of data for validation (default 0.20).
seasonal_period: Seasonal period for Holt-Winters.
Auto-detected if ``None``.
Returns:
Dictionary with:
- ``forecast``: pd.Series of weighted-average forecasts.
- ``weights``: dict mapping method name to its weight.
- ``individual_forecasts``: dict mapping method name to
pd.Series forecast.
- ``rmse_per_model``: dict mapping method name to its
hold-out RMSE.
Example:
>>> import pandas as pd, numpy as np
>>> data = pd.Series(np.cumsum(np.random.randn(200)) + 100)
>>> result = ensemble_forecast(data, h=10)
>>> abs(sum(result['weights'].values()) - 1.0) < 1e-6
True
"""
if methods is None:
methods = ["ses", "theta", "holt_winters", "drift"]
if seasonal_period is None:
seasonal_period = _detect_seasonal_period(data)
n = len(data)
split = int(n * (1 - holdout_pct))
train, test = data.iloc[:split], data.iloc[split:]
test_h = len(test)
method_fns: dict[str, Callable] = {
"ses": lambda tr, fh: ses_forecast(tr, h=fh)["forecast"].values,
"theta": lambda tr, fh: theta_forecast(tr, h=fh)[
"forecast"
].values,
"holt_winters": lambda tr, fh: _safe_hw(
tr, fh, seasonal_period
),
"drift": _drift_forecast,
}
rmses: dict[str, float] = {}
for name in methods:
if name not in method_fns:
continue
with warnings.catch_warnings():
warnings.simplefilter("ignore")
try:
pred = method_fns[name](train, test_h)
if np.any(np.isnan(pred)):
rmses[name] = float("inf")
else:
rmses[name] = float(
np.sqrt(np.mean((test.values - pred) ** 2))
)
except Exception: # noqa: BLE001
rmses[name] = float("inf")
# Inverse-RMSE weights (exclude inf)
finite_rmses = {
k: v for k, v in rmses.items() if np.isfinite(v) and v > 0
}
if not finite_rmses:
finite_rmses = {k: 1.0 for k in rmses}
inv = {k: 1.0 / v for k, v in finite_rmses.items()}
total_inv = sum(inv.values())
weights = {k: v / total_inv for k, v in inv.items()}
# Generate full-data forecasts
idx = _make_forecast_index(data, h)
individual: dict[str, pd.Series] = {}
combined = np.zeros(h)
for name, w in weights.items():
with warnings.catch_warnings():
warnings.simplefilter("ignore")
pred = method_fns[name](data, h)
s = pd.Series(pred, index=idx, name=name)
individual[name] = s
combined += w * pred
return {
"forecast": pd.Series(
combined, index=idx, name="ensemble_forecast"
),
"weights": weights,
"individual_forecasts": individual,
"rmse_per_model": rmses,
}
# ---------------------------------------------------------------------------
# rolling_forecast
# ---------------------------------------------------------------------------
[docs]
def rolling_forecast(
data: pd.Series,
forecast_fn: Callable[[pd.Series, int], np.ndarray | pd.Series],
h: int = 1,
initial_window: int | None = None,
step: int = 1,
) -> dict:
"""Walk-forward (expanding window) out-of-sample forecasting.
At each step the model is re-fitted on all data up to time *t*,
then forecasts *h* steps ahead. The result is a time series of
true out-of-sample predictions that can be compared against
actuals.
This is the gold standard for evaluating forecasting models
because it mimics how the model would be used in production.
Parameters:
data: Full time series.
forecast_fn: Callable ``(train_series, h) -> array-like``
that returns *h* point forecasts.
h: Forecast horizon at each step (default 1).
initial_window: Minimum training window size. Defaults to
``max(30, len(data) // 3)``.
step: Number of observations to advance between re-fits
(default 1).
Returns:
Dictionary with:
- ``forecasts``: pd.Series of out-of-sample predictions.
- ``actuals``: pd.Series of corresponding actual values.
- ``errors``: pd.Series of forecast errors.
- ``cumulative_metrics``: dict with cumulative RMSE, MAE.
Example:
>>> import pandas as pd, numpy as np
>>> data = pd.Series(np.cumsum(np.random.randn(100)) + 50)
>>> def my_model(train, h):
... return np.full(h, train.iloc[-1])
>>> result = rolling_forecast(
... data, my_model, h=1, initial_window=50,
... )
>>> len(result['forecasts']) > 0
True
"""
n = len(data)
if initial_window is None:
initial_window = max(30, n // 3)
fcast_vals: list[float] = []
actual_vals: list[float] = []
fcast_idx: list = []
t = initial_window
while t + h <= n:
train = data.iloc[:t]
with warnings.catch_warnings():
warnings.simplefilter("ignore")
try:
pred = forecast_fn(train, h)
pred = np.asarray(pred).ravel()
fcast_vals.append(float(pred[-1])) # h-step-ahead
except Exception: # noqa: BLE001
fcast_vals.append(float("nan"))
actual_vals.append(float(data.iloc[t + h - 1]))
fcast_idx.append(data.index[t + h - 1])
t += step
forecasts = pd.Series(
fcast_vals, index=fcast_idx, name="rolling_forecast"
)
actuals = pd.Series(
actual_vals, index=fcast_idx, name="actuals"
)
errors = actuals - forecasts
valid = ~np.isnan(errors.values)
cum_rmse = (
float(np.sqrt(np.mean(errors.values[valid] ** 2)))
if np.any(valid)
else float("nan")
)
cum_mae = (
float(np.mean(np.abs(errors.values[valid])))
if np.any(valid)
else float("nan")
)
return {
"forecasts": forecasts,
"actuals": actuals,
"errors": errors,
"cumulative_metrics": {"rmse": cum_rmse, "mae": cum_mae},
}
# ---------------------------------------------------------------------------
# garch_residual_forecast — ts/ ↔ vol/ integration
# ---------------------------------------------------------------------------
[docs]
def garch_residual_forecast(
returns: pd.Series,
vol_model: str = "GARCH",
forecast_method: str = "ses",
horizon: int = 10,
garch_p: int = 1,
garch_q: int = 1,
) -> dict:
"""Forecast returns using GARCH-filtered residuals.
This function chains the ``vol`` and ``ts`` modules together:
1. Fit a GARCH model (from ``wraquant.vol.models``) to extract
conditional volatility and standardised residuals.
2. Forecast the standardised residuals (which should be approximately
iid) using a classical time-series method (SES, Theta, or Holt-
Winters from ``wraquant.ts.forecasting``).
3. Forecast conditional volatility forward using the GARCH model.
4. Recombine: ``return_forecast = residual_forecast * vol_forecast``.
The key insight is that raw financial returns are not iid (they
exhibit volatility clustering). GARCH filtering removes the
heteroskedasticity, producing residuals that classical forecasting
methods can handle. The recombination step re-introduces the
time-varying volatility into the forecast.
Chains: ``vol.garch_fit`` -> ``ts.ses_forecast`` (or other) ->
``vol.garch_forecast`` -> recombine.
Parameters:
returns: Return series (daily or intraday).
vol_model: GARCH variant to use for filtering. Options:
``"GARCH"``, ``"EGARCH"``, ``"GJR"``.
forecast_method: Method for forecasting the standardised
residuals. Options: ``"ses"`` (default), ``"theta"``,
``"holt_winters"``.
horizon: Number of periods to forecast ahead (default 10).
garch_p: GARCH lag order (default 1).
garch_q: ARCH lag order (default 1).
Returns:
Dictionary containing:
- ``'return_forecast'`` (*np.ndarray*) -- Forecasted returns
(residual_forecast * vol_forecast), shape ``(horizon,)``.
- ``'vol_forecast'`` (*np.ndarray*) -- GARCH conditional
volatility forecast, shape ``(horizon,)``.
- ``'residual_forecast'`` (*np.ndarray*) -- Forecasted
standardised residuals, shape ``(horizon,)``.
- ``'standardized_residuals'`` (*np.ndarray*) -- In-sample
standardised residuals from the GARCH fit.
- ``'garch_params'`` (*dict*) -- Fitted GARCH parameters.
Example:
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> returns = pd.Series(np.random.randn(500) * 0.01)
>>> result = garch_residual_forecast(returns, horizon=5)
>>> len(result['return_forecast'])
5
>>> len(result['vol_forecast'])
5
Notes:
This approach is related to the "GARCH-in-mean" literature but
separates the volatility and level dynamics for modularity.
See Also:
wraquant.vol.models.garch_fit: GARCH model fitting.
wraquant.vol.models.garch_forecast: GARCH volatility forecasting.
ses_forecast: SES for residual forecasting.
theta_forecast: Theta method for residual forecasting.
"""
from wraquant.vol.models import garch_fit, garch_forecast
# Step 1: Fit GARCH to get conditional vol and standardised residuals
fit_result = garch_fit(returns, p=garch_p, q=garch_q)
std_resid = fit_result["standardized_residuals"]
garch_params = fit_result["params"]
# Step 2: Forecast standardised residuals with a classical method
# Clean residuals for forecasting
resid_clean = std_resid[np.isfinite(std_resid)]
resid_series = pd.Series(resid_clean)
forecast_fns = {
"ses": lambda s, h: ses_forecast(s, h=h)["forecast"].values,
"theta": lambda s, h: theta_forecast(s, h=h)["forecast"].values,
"holt_winters": lambda s, h: holt_winters_forecast(s, h=h)[
"forecast"
].values,
}
fn = forecast_fns.get(forecast_method)
if fn is None:
raise ValueError(
f"forecast_method must be one of {list(forecast_fns)}, "
f"got {forecast_method!r}"
)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
residual_fcast = fn(resid_series, horizon)
# Step 3: Forecast conditional volatility using GARCH
vol_fcast_result = garch_forecast(returns, horizon=horizon, p=garch_p, q=garch_q)
vol_fcast = np.asarray(vol_fcast_result["forecast_volatility"])[:horizon]
# Step 4: Recombine — return = residual * conditional_vol
return_fcast = residual_fcast[:horizon] * vol_fcast[:horizon]
return {
"return_forecast": return_fcast,
"vol_forecast": vol_fcast,
"residual_forecast": residual_fcast[:horizon],
"standardized_residuals": std_resid,
"garch_params": garch_params,
}