Source code for wraquant.ts.stochastic

"""Stochastic process forecasting for financial time series.

Provides parameter estimation and forecasting for continuous-time
stochastic processes commonly used in quantitative finance:

- **Ornstein-Uhlenbeck (OU)**: mean-reverting diffusion for spreads,
  rates, and volatility.
- **Merton Jump-Diffusion**: GBM with Poisson jumps for fat-tailed
  asset returns.
- **Regime-Switching**: separate models per hidden regime, blended by
  regime probabilities.
- **Vector Autoregression (VAR)**: multi-asset linear forecasting with
  impulse response and variance decomposition.
"""

from __future__ import annotations

import warnings
from typing import Any, Callable, Sequence

import numpy as np
import pandas as pd
from scipy import optimize, stats as sp_stats

from wraquant.core.decorators import requires_extra


# ---------------------------------------------------------------------------
# ornstein_uhlenbeck_forecast
# ---------------------------------------------------------------------------


[docs] def ornstein_uhlenbeck_forecast( data: pd.Series, h: int = 10, dt: float | None = None, n_simulations: int = 1000, confidence_level: float = 0.95, ) -> dict: """Forecast using an Ornstein-Uhlenbeck (OU) mean-reverting process. The OU process is the continuous-time analogue of an AR(1) process and is the workhorse model for mean-reverting financial quantities. Use for mean-reverting assets like spreads, interest rates, volatility (VIX), pairs-trading residuals, or any series that fluctuates around a long-run equilibrium. SDE: ``dX_t = theta * (mu - X_t) * dt + sigma * dW_t`` where: - ``theta`` > 0: speed of mean reversion (higher = faster). - ``mu``: long-run mean level. - ``sigma``: volatility of the diffusion. - ``W_t``: standard Brownian motion. The **half-life** of mean reversion is ``ln(2) / theta``, giving the expected time for a deviation to halve. Parameter estimation: Parameters are estimated via Maximum Likelihood on the discrete-time transition density, which is Gaussian: ``X_{t+dt} | X_t ~ N(mu + (X_t - mu) * exp(-theta*dt), sigma^2 / (2*theta) * (1 - exp(-2*theta*dt)))`` Parameters: data: Time series of observed values. h: Number of steps to forecast (default 10). dt: Time step between observations. If ``None``, inferred from the index (1/252 for business days, 1.0 otherwise). n_simulations: Number of Monte Carlo paths for confidence bands (default 1000). confidence_level: Confidence level for intervals (default 0.95). Returns: Dictionary with: - ``params``: dict with ``theta``, ``mu``, ``sigma``. - ``half_life``: half-life of mean reversion (in observation units). - ``forecast``: pd.Series of expected path (conditional mean). - ``confidence_intervals``: dict with ``lower`` and ``upper`` pd.Series. Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(42) >>> # Simulate OU process >>> n, theta, mu, sigma = 500, 5.0, 100.0, 2.0 >>> x = np.zeros(n); x[0] = 100.0 >>> dt = 1 / 252 >>> for i in range(1, n): ... x[i] = x[i-1] + theta * (mu - x[i-1]) * dt + sigma * np.sqrt(dt) * rng.normal() >>> data = pd.Series(x) >>> result = ornstein_uhlenbeck_forecast(data, h=20) >>> result['half_life'] > 0 True References: Uhlenbeck, G.E. & Ornstein, L.S. (1930). On the theory of Brownian motion. *Physical Review*, 36(5), 823. """ y = data.values.astype(float) n = len(y) # Infer dt if dt is None: if isinstance(data.index, pd.DatetimeIndex) and len(data.index) > 1: freq = pd.infer_freq(data.index) if freq is not None and freq in ("B", "D"): dt = 1.0 / 252.0 else: median_diff = np.median( np.diff(data.index.astype(np.int64)) / 1e9 / 86400 ) dt = float(median_diff) / 252.0 if median_diff > 0 else 1.0 else: dt = 1.0 # MLE estimation via discrete-time transition density # X_{t+1} | X_t ~ N(X_t * exp(-theta*dt) + mu*(1-exp(-theta*dt)), # sigma^2/(2*theta)*(1-exp(-2*theta*dt))) x_prev = y[:-1] x_next = y[1:] def neg_log_lik(params: np.ndarray) -> float: th, m, s = params if th <= 0 or s <= 0: return 1e12 exp_neg = np.exp(-th * dt) mean = x_prev * exp_neg + m * (1 - exp_neg) var = (s**2) / (2 * th) * (1 - np.exp(-2 * th * dt)) if var <= 0: return 1e12 residuals = x_next - mean nll = 0.5 * np.sum( np.log(2 * np.pi * var) + residuals**2 / var ) return float(nll) # Initial guesses from AR(1) regression slope, intercept = np.polyfit(x_prev, x_next, 1) phi = max(min(slope, 0.9999), 0.0001) theta0 = -np.log(phi) / dt mu0 = intercept / (1 - phi) resid_var = np.var(x_next - (slope * x_prev + intercept)) sigma0 = np.sqrt( max(resid_var * 2 * theta0 / (1 - np.exp(-2 * theta0 * dt)), 1e-8) ) with warnings.catch_warnings(): warnings.simplefilter("ignore") result = optimize.minimize( neg_log_lik, x0=[theta0, mu0, sigma0], method="Nelder-Mead", options={"maxiter": 5000, "xatol": 1e-8, "fatol": 1e-8}, ) theta_hat, mu_hat, sigma_hat = result.x theta_hat = max(theta_hat, 1e-6) sigma_hat = max(sigma_hat, 1e-8) half_life = np.log(2) / theta_hat # Conditional mean forecast x_last = y[-1] steps = np.arange(1, h + 1) exp_decay = np.exp(-theta_hat * dt * steps) mean_fcast = mu_hat + (x_last - mu_hat) * exp_decay # Conditional variance var_fcast = ( sigma_hat**2 / (2 * theta_hat) * (1 - np.exp(-2 * theta_hat * dt * steps)) ) std_fcast = np.sqrt(np.maximum(var_fcast, 0)) z = sp_stats.norm.ppf(0.5 + confidence_level / 2) # Build forecast index if isinstance(data.index, pd.DatetimeIndex) and data.index.freq is not None: idx = pd.date_range( start=data.index[-1] + data.index.freq, periods=h, freq=data.index.freq, ) elif isinstance(data.index, pd.DatetimeIndex) and len(data.index) > 1: freq = pd.infer_freq(data.index) if freq is not None: idx = pd.date_range( start=data.index[-1] + pd.tseries.frequencies.to_offset(freq), periods=h, freq=freq, ) else: idx = pd.RangeIndex(h) else: last = data.index[-1] if isinstance(last, (int, np.integer)): idx = pd.RangeIndex(start=last + 1, stop=last + 1 + h) else: idx = pd.RangeIndex(h) return { "params": { "theta": float(theta_hat), "mu": float(mu_hat), "sigma": float(sigma_hat), }, "half_life": float(half_life), "forecast": pd.Series(mean_fcast, index=idx, name="ou_forecast"), "confidence_intervals": { "lower": pd.Series( mean_fcast - z * std_fcast, index=idx, name="lower" ), "upper": pd.Series( mean_fcast + z * std_fcast, index=idx, name="upper" ), }, }
# --------------------------------------------------------------------------- # jump_diffusion_forecast # ---------------------------------------------------------------------------
[docs] def jump_diffusion_forecast( data: pd.Series, h: int = 10, dt: float | None = None, n_paths: int = 1000, confidence_level: float = 0.95, seed: int | None = None, ) -> dict: """Merton jump-diffusion model forecast via Monte Carlo simulation. Extends geometric Brownian motion (GBM) by adding Poisson-driven jumps, capturing the fat tails and sudden moves observed in real asset returns. SDE: ``dS/S = (mu - lambda*k) * dt + sigma * dW + J * dN`` where: - ``mu``: drift of the diffusion component. - ``sigma``: diffusion volatility. - ``lambda``: jump intensity (expected jumps per unit time). - ``J``: jump size, ``ln(1+J) ~ N(mu_j, sigma_j^2)``. - ``N_t``: Poisson process with intensity ``lambda``. - ``k = exp(mu_j + sigma_j^2/2) - 1``. Parameter estimation: A moment-matching approach is used on log-returns: - Jump intensity estimated from excess kurtosis. - Jump mean and variance from the non-Gaussian tail structure. Parameters: data: Price series (positive values). h: Forecast horizon in steps (default 10). dt: Time step between observations (default 1/252 for daily). n_paths: Number of Monte Carlo simulation paths (default 1000). confidence_level: Confidence level for intervals (default 0.95). seed: Random seed for reproducibility. Returns: Dictionary with: - ``params``: dict with ``mu``, ``sigma``, ``lambda_``, ``mu_j``, ``sigma_j``. - ``forecast_paths``: np.ndarray of shape ``(n_paths, h)`` with simulated price paths. - ``mean_forecast``: pd.Series of mean across paths. - ``confidence_intervals``: dict with ``lower`` and ``upper`` pd.Series. Example: >>> import pandas as pd, numpy as np >>> prices = pd.Series(100 * np.exp(np.cumsum(np.random.randn(200) * 0.01))) >>> result = jump_diffusion_forecast(prices, h=10, n_paths=500, seed=42) >>> result['forecast_paths'].shape (500, 10) References: Merton, R.C. (1976). Option pricing when underlying stock returns are discontinuous. *Journal of Financial Economics*, 3(1-2), 125-144. """ rng = np.random.default_rng(seed) prices = data.values.astype(float) log_returns = np.diff(np.log(prices)) n = len(log_returns) if dt is None: dt = 1.0 / 252.0 # Moment-matching estimation mu_total = float(np.mean(log_returns) / dt) sigma_total = float(np.std(log_returns) / np.sqrt(dt)) kurt = float(sp_stats.kurtosis(log_returns, fisher=True)) skew_val = float(sp_stats.skew(log_returns)) # Estimate jump parameters from excess kurtosis # If kurtosis > 0, attribute excess to jumps if kurt > 0.5: lambda_ = max(kurt / (dt * 10), 0.5) # jump intensity sigma_j = max(abs(skew_val) * 0.1, 0.01) mu_j = -sigma_j**2 / 2 # risk-neutral adjustment sigma_diff = max( np.sqrt(max(sigma_total**2 - lambda_ * sigma_j**2, 1e-8)), sigma_total * 0.5, ) else: lambda_ = 0.5 mu_j = 0.0 sigma_j = 0.01 sigma_diff = sigma_total k = np.exp(mu_j + sigma_j**2 / 2) - 1 mu_diff = mu_total + lambda_ * k # Monte Carlo simulation s0 = prices[-1] paths = np.zeros((n_paths, h)) for i in range(n_paths): s = s0 for t_step in range(h): # Diffusion dw = rng.normal(0, np.sqrt(dt)) # Jumps n_jumps = rng.poisson(lambda_ * dt) jump = 0.0 if n_jumps > 0: jump = np.sum(rng.normal(mu_j, sigma_j, n_jumps)) log_return = ( (mu_diff - 0.5 * sigma_diff**2 - lambda_ * k) * dt + sigma_diff * dw + jump ) s = s * np.exp(log_return) paths[i, t_step] = s # Summary statistics mean_path = np.mean(paths, axis=0) alpha_q = (1 - confidence_level) / 2 lower = np.quantile(paths, alpha_q, axis=0) upper = np.quantile(paths, 1 - alpha_q, axis=0) # Build forecast index if isinstance(data.index, pd.DatetimeIndex) and data.index.freq is not None: idx = pd.date_range( start=data.index[-1] + data.index.freq, periods=h, freq=data.index.freq, ) elif isinstance(data.index, pd.DatetimeIndex) and len(data.index) > 1: freq = pd.infer_freq(data.index) if freq is not None: idx = pd.date_range( start=data.index[-1] + pd.tseries.frequencies.to_offset(freq), periods=h, freq=freq, ) else: idx = pd.RangeIndex(h) else: last = data.index[-1] if isinstance(last, (int, np.integer)): idx = pd.RangeIndex(start=last + 1, stop=last + 1 + h) else: idx = pd.RangeIndex(h) return { "params": { "mu": float(mu_diff), "sigma": float(sigma_diff), "lambda_": float(lambda_), "mu_j": float(mu_j), "sigma_j": float(sigma_j), }, "forecast_paths": paths, "mean_forecast": pd.Series( mean_path, index=idx, name="jd_mean_forecast" ), "confidence_intervals": { "lower": pd.Series(lower, index=idx, name="lower"), "upper": pd.Series(upper, index=idx, name="upper"), }, }
# --------------------------------------------------------------------------- # regime_switching_forecast # ---------------------------------------------------------------------------
[docs] def regime_switching_forecast( data: pd.Series, n_regimes: int = 2, h: int = 10, ) -> dict: """Forecast using a Markov regime-switching model. Fits a Markov-switching autoregression where the series parameters (mean, variance) switch between discrete hidden regimes. Forecasts are blended across regimes weighted by the predicted regime probabilities. When to use: - Series that alternate between distinct states (e.g., bull/bear markets, high/low volatility regimes). - When a single-model forecast is inadequate because the data generation process changes over time. Parameters: data: Time series to forecast. n_regimes: Number of hidden regimes (default 2). h: Forecast horizon (default 10). Returns: Dictionary with: - ``forecast``: pd.Series of blended point forecasts. - ``regime_forecasts``: dict mapping regime index to its pd.Series forecast. - ``regime_probs``: np.ndarray of shape ``(h, n_regimes)`` with predicted regime probabilities. - ``blended``: same as ``forecast`` (for clarity). Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(42) >>> # Bull/bear regime data >>> regimes = np.concatenate([rng.normal(0.1, 0.5, 150), ... rng.normal(-0.05, 1.0, 150)]) >>> data = pd.Series(np.cumsum(regimes)) >>> result = regime_switching_forecast(data, n_regimes=2, h=10) >>> result['regime_probs'].shape[1] 2 References: Hamilton, J.D. (1989). A New Approach to the Economic Analysis of Nonstationary Time Series and the Business Cycle. *Econometrica*, 57(2), 357-384. """ from statsmodels.tsa.regime_switching.markov_autoregression import ( MarkovAutoregression, ) y = data.values.astype(float) with warnings.catch_warnings(): warnings.simplefilter("ignore") model = MarkovAutoregression( data, k_regimes=n_regimes, order=1, switching_ar=False ) fit = model.fit(maxiter=200, disp=False) # Smoothed regime probabilities at the last observation smoothed = fit.smoothed_marginal_probabilities last_probs = smoothed.iloc[-1].values # shape (n_regimes,) # Transition matrix trans_mat = fit.regime_transition # shape (n_regimes, n_regimes) # In statsmodels, regime_transition[i, j] = P(regime j at t+1 | regime i at t) # We need P(regime at t+1) = trans_mat.T @ P(regime at t) # Regime-specific parameters regime_means = np.array( [fit.params[f"const[{k}]"] for k in range(n_regimes)] ) # AR coefficient (non-switching) ar_coef = fit.params.get("x1", 0.0) if isinstance(ar_coef, (pd.Series, np.ndarray)): ar_coef = float(ar_coef) # Forecast regime_probs = np.zeros((h, n_regimes)) current_probs = last_probs.copy() last_val = y[-1] regime_fcasts = np.zeros((n_regimes, h)) for step in range(h): # Update regime probabilities current_probs = trans_mat @ current_probs current_probs = current_probs / current_probs.sum() regime_probs[step] = current_probs for k in range(n_regimes): if step == 0: regime_fcasts[k, step] = regime_means[k] + ar_coef * last_val else: regime_fcasts[k, step] = ( regime_means[k] + ar_coef * regime_fcasts[k, step - 1] ) # Blended forecast blended = np.sum(regime_fcasts * regime_probs.T, axis=0) # Build index if isinstance(data.index, pd.DatetimeIndex) and data.index.freq is not None: idx = pd.date_range( start=data.index[-1] + data.index.freq, periods=h, freq=data.index.freq, ) elif isinstance(data.index, pd.DatetimeIndex) and len(data.index) > 1: freq = pd.infer_freq(data.index) if freq is not None: idx = pd.date_range( start=data.index[-1] + pd.tseries.frequencies.to_offset(freq), periods=h, freq=freq, ) else: idx = pd.RangeIndex(h) else: last_idx = data.index[-1] if isinstance(last_idx, (int, np.integer)): idx = pd.RangeIndex(start=last_idx + 1, stop=last_idx + 1 + h) else: idx = pd.RangeIndex(h) regime_forecast_dict = { k: pd.Series( regime_fcasts[k], index=idx, name=f"regime_{k}_forecast" ) for k in range(n_regimes) } fcast_series = pd.Series(blended, index=idx, name="regime_forecast") return { "forecast": fcast_series, "regime_forecasts": regime_forecast_dict, "regime_probs": regime_probs, "blended": fcast_series, }
# --------------------------------------------------------------------------- # var_forecast # ---------------------------------------------------------------------------
[docs] def var_forecast( data: pd.DataFrame, h: int = 10, maxlags: int | None = None, ic: str = "aic", ) -> dict: """Vector Autoregression (VAR) for multi-asset forecasting. VAR models each variable as a linear function of its own lags and the lags of all other variables. This captures lead-lag relationships (e.g., one asset predicting another) and provides impulse response functions (IRF) and forecast error variance decomposition (FEVD). When to use: - Forecasting multiple related time series simultaneously (e.g., a portfolio of assets, yield curve factors). - Analysing dynamic interactions: which variables Granger-cause which. - Computing impulse responses: how a shock to one variable propagates to others. Math: ``Y_t = c + A_1 * Y_{t-1} + ... + A_p * Y_{t-p} + e_t`` where ``Y_t`` is a (k x 1) vector, ``A_i`` are (k x k) coefficient matrices, and ``e_t ~ N(0, Sigma)``. Parameters: data: DataFrame with multiple columns (one per variable). Should be stationary (difference if needed). h: Forecast horizon (default 10). maxlags: Maximum lag order to consider. If ``None``, uses ``12 * (nobs/100)^(1/4)`` rule of thumb. ic: Information criterion for lag selection: ``"aic"``, ``"bic"``, ``"hqic"``, ``"fpe"`` (default ``"aic"``). Returns: Dictionary with: - ``forecast``: pd.DataFrame of point forecasts. - ``irf``: impulse response function results (dict of DataFrames keyed by shocked variable). - ``fevd``: forecast error variance decomposition (dict of DataFrames keyed by target variable). - ``granger_causality``: dict of p-values for Granger causality tests. - ``lag_order``: selected lag order. Example: >>> import pandas as pd, numpy as np >>> rng = np.random.default_rng(42) >>> n = 200 >>> x1 = np.cumsum(rng.normal(0, 1, n)) >>> x2 = 0.3 * np.roll(x1, 1) + rng.normal(0, 1, n) >>> data = pd.DataFrame({'x1': np.diff(x1), 'x2': np.diff(x2)}) >>> result = var_forecast(data, h=5) >>> result['forecast'].shape[0] 5 References: Lutkepohl, H. (2005). *New Introduction to Multiple Time Series Analysis*. Springer. """ from statsmodels.tsa.api import VAR as StatsVAR var_model = StatsVAR(data) if maxlags is None: maxlags = max(int(12 * (len(data) / 100) ** 0.25), 1) with warnings.catch_warnings(): warnings.simplefilter("ignore") fit = var_model.fit(maxlags=maxlags, ic=ic) lag_order = fit.k_ar columns = list(data.columns) # Forecast fcast = fit.forecast(data.values[-lag_order:], steps=h) if isinstance(data.index, pd.DatetimeIndex) and data.index.freq is not None: idx = pd.date_range( start=data.index[-1] + data.index.freq, periods=h, freq=data.index.freq, ) else: idx = pd.RangeIndex(h) forecast_df = pd.DataFrame(fcast, columns=columns, index=idx) # Impulse Response Functions irf_result = fit.irf(periods=h) irf_dict: dict[str, pd.DataFrame] = {} for i, col in enumerate(columns): irf_dict[col] = pd.DataFrame( irf_result.irfs[:, :, i], columns=columns, index=np.arange(h + 1), ) # Forecast Error Variance Decomposition fevd_result = fit.fevd(periods=h) fevd_dict: dict[str, pd.DataFrame] = {} decomp = fevd_result.decomp n_periods_fevd = decomp.shape[0] for i, col in enumerate(columns): fevd_dict[col] = pd.DataFrame( decomp[:, i, :], columns=columns, index=np.arange(1, n_periods_fevd + 1), ) # Granger causality tests gc_dict: dict[str, float] = {} for caused in columns: for causing in columns: if caused == causing: continue key = f"{causing} -> {caused}" try: with warnings.catch_warnings(): warnings.simplefilter("ignore") gc_test = fit.test_causality( caused, [causing], kind="f" ) gc_dict[key] = float(gc_test.pvalue) except Exception: # noqa: BLE001 gc_dict[key] = float("nan") return { "forecast": forecast_df, "irf": irf_dict, "fevd": fevd_dict, "granger_causality": gc_dict, "lag_order": lag_order, }