"""Time series econometrics.
Provides Vector Autoregression (VAR), Vector Error Correction Models (VECM),
Granger causality tests, impulse response functions, forecast error variance
decomposition, and structural break tests. These are the core multivariate
time series tools used in macrofinance and empirical asset pricing
(Campbell-Lo-MacKinlay ch. 11; Hamilton ch. 11-19; Lutkepohl, 2005).
"""
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
from scipy import stats as sp_stats
[docs]
def var_model(
data: pd.DataFrame | np.ndarray,
max_lags: int | None = None,
ic: str = "aic",
) -> dict[str, Any]:
"""Fit a Vector Autoregression (VAR) model.
Selects lag order by information criterion and estimates the reduced-form
VAR by equation-by-equation OLS.
Parameters:
data: Multivariate time series (T, k). Columns are treated as
endogenous variables.
max_lags: Maximum lag order to consider. Defaults to
``int(12 * (T / 100) ** (1/4))``.
ic: Information criterion for lag selection -- ``"aic"``, ``"bic"``,
``"hqic"``, or ``"fpe"``.
Returns:
Dictionary with ``coefficients`` (k x k*p + 1 matrix where the last
column is the intercept), ``lag_order``, ``residuals`` (T-p, k),
``sigma_u`` (innovation covariance), ``aic``, ``bic``, ``fittedvalues``,
and a ``forecast`` callable ``(steps) -> np.ndarray``.
"""
from statsmodels.tsa.api import VAR as _VAR
if isinstance(data, np.ndarray):
data = pd.DataFrame(data)
model = _VAR(data)
if max_lags is None:
T = len(data)
max_lags = int(12 * (T / 100) ** 0.25)
max_lags = max(max_lags, 1)
result = model.fit(maxlags=max_lags, ic=ic)
# Build coefficient matrix [A1 | A2 | ... | Ap | c]
p = result.k_ar
k = result.neqs
coef_matrices = []
for lag in range(1, p + 1):
coef_matrices.append(result.coefs[lag - 1]) # (k, k)
intercept = result.intercept.reshape(-1, 1) # (k, 1)
coef_full = np.hstack([*coef_matrices, intercept]) # (k, k*p + 1)
def _forecast(steps: int) -> np.ndarray:
return result.forecast(result.endog[-p:], steps=steps)
return {
"coefficients": coef_full,
"lag_order": p,
"residuals": result.resid,
"sigma_u": result.sigma_u,
"aic": float(result.aic),
"bic": float(result.bic),
"fittedvalues": result.fittedvalues,
"forecast": _forecast,
}
[docs]
def vecm_model(
data: pd.DataFrame | np.ndarray,
k_ar_diff: int = 1,
det_order: int = 0,
) -> dict[str, Any]:
"""Fit a Vector Error Correction Model (VECM) for cointegrated systems.
Estimates the Johansen cointegrating rank and then fits the VECM of the
form: Delta y_t = alpha * beta' * y_{t-1} + Gamma * Delta y_{t-1} + ...
Parameters:
data: Multivariate time series (T, k).
k_ar_diff: Number of lagged difference terms.
det_order: Deterministic term order (-1 = none, 0 = constant inside
the cointegrating relation, 1 = linear trend).
Returns:
Dictionary with ``alpha`` (adjustment coefficients), ``beta``
(cointegrating vectors), ``gamma`` (short-run coefficients),
``det_coef`` (deterministic coefficients), ``coint_rank``,
``residuals``, and ``sigma_u``.
"""
from statsmodels.tsa.vector_ar.vecm import VECM, select_coint_rank
if isinstance(data, np.ndarray):
data = pd.DataFrame(data)
# Select cointegration rank via trace test
rank_test = select_coint_rank(data, det_order=det_order, k_ar_diff=k_ar_diff)
coint_rank = rank_test.rank
# Ensure at least rank 1 for estimation (VECM requires rank >= 1)
if coint_rank == 0:
coint_rank = 1
model = VECM(data, k_ar_diff=k_ar_diff, coint_rank=coint_rank, deterministic="ci")
result = model.fit()
return {
"alpha": result.alpha,
"beta": result.beta,
"gamma": result.gamma,
"det_coef": result.det_coef,
"coint_rank": coint_rank,
"residuals": result.resid,
"sigma_u": result.sigma_u,
}
[docs]
def granger_causality(
data: pd.DataFrame | np.ndarray,
max_lag: int = 10,
) -> dict[str, Any]:
"""Pairwise Granger causality tests for all variable pairs.
For each ordered pair (X, Y), tests whether lagged values of X help
predict Y beyond Y's own lags. Uses a VAR framework.
Parameters:
data: Multivariate time series (T, k).
max_lag: Maximum lag order to test.
Returns:
Dictionary mapping ``"X -> Y"`` to a dict with ``f_statistic``,
``p_value``, ``lag_order``, and ``is_causal`` (at 5 % level) for
each directed pair.
"""
from statsmodels.tsa.stattools import grangercausalitytests
if isinstance(data, np.ndarray):
data = pd.DataFrame(data)
columns = list(data.columns)
k = len(columns)
results: dict[str, Any] = {}
for i in range(k):
for j in range(k):
if i == j:
continue
# grangercausalitytests expects [y, x] column order
pair_data = data[[columns[j], columns[i]]].dropna()
if len(pair_data) < max_lag + 2:
continue
try:
gc = grangercausalitytests(pair_data, maxlag=max_lag, verbose=False)
except Exception:
continue
# Find the best lag by minimum p-value
best_lag = 1
best_p = 1.0
best_f = 0.0
for lag in range(1, max_lag + 1):
if lag in gc:
f_test = gc[lag][0]["ssr_ftest"]
f_val, p_val = f_test[0], f_test[1]
if p_val < best_p:
best_p = p_val
best_f = f_val
best_lag = lag
label = f"{columns[i]} -> {columns[j]}"
results[label] = {
"f_statistic": float(best_f),
"p_value": float(best_p),
"lag_order": best_lag,
"is_causal": bool(best_p < 0.05),
}
return results
[docs]
def impulse_response(
var_coefficients: np.ndarray,
n_periods: int = 20,
shock_var: int = 0,
) -> np.ndarray:
"""Compute impulse response functions from VAR coefficient matrices.
Applies a one-unit shock to *shock_var* at time 0 and traces out the
dynamic response of all variables over *n_periods*.
Parameters:
var_coefficients: VAR coefficient matrix of shape (k, k*p) or
(k, k*p + 1) where the last column is the intercept. The
k-by-k blocks are [A1 | A2 | ... | Ap].
n_periods: Number of periods for the IRF (default 20).
shock_var: Index of the variable receiving the unit shock.
Returns:
Array of shape (n_periods + 1, k) where row *h* is the response
of all k variables at horizon h.
"""
coef = np.asarray(var_coefficients, dtype=float)
k = coef.shape[0]
# Determine number of lags
n_cols = coef.shape[1]
# If the last column is an intercept (n_cols not divisible by k), strip it
if n_cols % k != 0:
coef = coef[:, : -(n_cols % k)]
p = coef.shape[1] // k
A_mats = [coef[:, i * k : (i + 1) * k] for i in range(p)]
# Compute IRFs by recursion
irf = np.zeros((n_periods + 1, k))
# Initial shock: unit impulse to shock_var
irf[0, shock_var] = 1.0
for h in range(1, n_periods + 1):
for lag, A in enumerate(A_mats):
idx = h - lag - 1
if 0 <= idx:
irf[h] += A @ irf[idx]
return irf
[docs]
def variance_decomposition(
var_coefficients: np.ndarray,
n_periods: int = 20,
) -> np.ndarray:
"""Forecast error variance decomposition from VAR coefficients.
Decomposes the forecast error variance of each variable into
contributions from each structural shock (Cholesky identification).
Parameters:
var_coefficients: VAR coefficient matrix (k, k*p) or (k, k*p + 1).
n_periods: Forecast horizon.
Returns:
Array of shape (n_periods + 1, k, k) where entry [h, i, j] is the
fraction of the h-step forecast error variance of variable *i*
attributable to shocks in variable *j*.
"""
coef = np.asarray(var_coefficients, dtype=float)
k = coef.shape[0]
# Compute IRFs for each shock variable
irfs = np.zeros((n_periods + 1, k, k)) # [horizon, response_var, shock_var]
for shock in range(k):
irfs[:, :, shock] = impulse_response(coef, n_periods, shock_var=shock)
# Cumulative squared IRFs
cum_sq = np.cumsum(irfs**2, axis=0) # (n_periods + 1, k, k)
# Total variance for each variable at each horizon
total_var = cum_sq.sum(axis=2, keepdims=True) # (n_periods + 1, k, 1)
# Avoid division by zero at h=0
total_var = np.where(total_var == 0, 1.0, total_var)
fevd = cum_sq / total_var # (n_periods + 1, k, k)
return fevd
[docs]
def structural_break_test(
y: np.ndarray | pd.Series,
X: np.ndarray | pd.DataFrame | None = None,
method: str = "chow",
break_point: int | None = None,
) -> dict[str, Any]:
"""Test for structural breaks in a regression relationship.
Parameters:
y: Dependent variable (T,).
X: Design matrix (T, k). If ``None``, a constant-only model is
used (testing a break in the mean).
method: ``"chow"`` for a known break point, or ``"sup_f"`` for the
supremum-F test (Andrews, 1993) which searches over candidate
break points.
break_point: Observation index of the hypothesised break (required
for ``method="chow"``). For ``"sup_f"`` this is ignored.
Returns:
Dictionary with ``f_statistic``, ``p_value``, ``break_point``, and
``is_break`` (at 5 % level).
Raises:
ValueError: If ``method="chow"`` and *break_point* is ``None``.
"""
import statsmodels.api as sm
y_arr = np.asarray(y, dtype=float).ravel()
T = len(y_arr)
if X is None:
X_arr = np.ones((T, 1))
else:
X_arr = np.asarray(X, dtype=float)
if X_arr.ndim == 1:
X_arr = X_arr.reshape(-1, 1)
k = X_arr.shape[1]
if method == "chow":
if break_point is None:
msg = "break_point must be specified for method='chow'"
raise ValueError(msg)
# Delegate to stats/tests.chow_test (single source of truth)
from wraquant.stats.tests import chow_test as _chow_test
chow_result = _chow_test(y_arr, X_arr, break_point=break_point, add_constant=False)
return {
"f_statistic": chow_result["f_stat"],
"p_value": chow_result["p_value"],
"break_point": break_point,
"is_break": chow_result["break_detected"],
}
elif method == "sup_f":
# Andrews (1993) supremum F-test
# Search over the central 70% of observations
trim = max(int(0.15 * T), k + 1)
candidates = range(trim, T - trim)
best_f = -np.inf
best_bp = trim
best_p = 1.0
# Full-sample SSR
ols_full = sm.OLS(y_arr, X_arr).fit()
ssr_full = np.sum(ols_full.resid**2)
for bp in candidates:
if bp < k + 1 or (T - bp) < k + 1:
continue
y1, X1 = y_arr[:bp], X_arr[:bp]
y2, X2 = y_arr[bp:], X_arr[bp:]
try:
ols1 = sm.OLS(y1, X1).fit()
ols2 = sm.OLS(y2, X2).fit()
except Exception:
continue
ssr_sub = np.sum(ols1.resid**2) + np.sum(ols2.resid**2)
df_num = k
df_denom = T - 2 * k
f_val = ((ssr_full - ssr_sub) / df_num) / (
ssr_sub / max(df_denom, 1)
)
if f_val > best_f:
best_f = f_val
best_bp = bp
# Approximate p-value using F distribution (conservative)
df_num = k
df_denom = T - 2 * k
best_p = 1.0 - sp_stats.f.cdf(best_f, df_num, max(df_denom, 1))
return {
"f_statistic": float(best_f),
"p_value": float(best_p),
"break_point": best_bp,
"is_break": bool(best_p < 0.05),
}
else:
msg = f"Unknown method: {method!r}. Use 'chow' or 'sup_f'."
raise ValueError(msg)