"""Cross-sectional econometric methods.
Provides robust OLS, quantile regression, instrumental variables (2SLS),
GMM estimation, and the Sargan overidentification test -- core tools for
empirical asset pricing and cross-sectional return analysis.
"""
from __future__ import annotations
from typing import Any, Callable
import numpy as np
import pandas as pd
from scipy import optimize as sp_optimize
from scipy import stats as sp_stats
from wraquant.core._coerce import coerce_array
[docs]
def robust_ols(
y: np.ndarray | pd.Series,
X: np.ndarray | pd.DataFrame,
cov_type: str = "HC1",
) -> dict[str, Any]:
"""OLS regression with heteroskedasticity-robust standard errors.
Implements the HC0 -- HC3 covariance estimators of White (1980) and
MacKinnon and White (1985). Point estimates are identical to plain
OLS; only the standard errors, t-statistics, and p-values differ.
Parameters:
y: Dependent variable (n,).
X: Design matrix (n, k). A constant is **not** added automatically;
use ``sm.add_constant(X)`` beforehand if you need an intercept.
cov_type: Heteroskedasticity-robust covariance type. One of
``"HC0"``, ``"HC1"``, ``"HC2"``, or ``"HC3"``.
Returns:
Dictionary with ``coefficients``, ``std_errors``, ``t_stats``,
``p_values``, ``r_squared``, ``adj_r_squared``, ``residuals``,
``cov_type``, and ``nobs``.
"""
import statsmodels.api as sm
y_arr = coerce_array(y, name="y")
X_arr = np.asarray(X, dtype=np.float64)
model = sm.OLS(y_arr, X_arr)
result = model.fit(cov_type=cov_type)
return {
"coefficients": result.params,
"std_errors": result.bse,
"t_stats": result.tvalues,
"p_values": result.pvalues,
"r_squared": float(result.rsquared),
"adj_r_squared": float(result.rsquared_adj),
"residuals": result.resid,
"cov_type": cov_type,
"nobs": int(result.nobs),
}
[docs]
def quantile_regression(
y: np.ndarray | pd.Series,
X: np.ndarray | pd.DataFrame,
quantile: float = 0.5,
) -> dict[str, Any]:
"""Quantile regression via statsmodels.
Estimates the conditional quantile function, generalising OLS which
estimates the conditional *mean*. Useful for understanding heterogeneous
effects across the return distribution.
Parameters:
y: Dependent variable (n,).
X: Design matrix (n, k).
quantile: Quantile to estimate (0 < quantile < 1). Defaults to the
median (0.5).
Returns:
Dictionary with ``coefficients``, ``std_errors``, ``t_stats``,
``p_values``, ``quantile``, ``pseudo_r_squared``, and ``nobs``.
"""
import statsmodels.api as sm
y_arr = coerce_array(y, name="y")
X_arr = np.asarray(X, dtype=np.float64)
model = sm.QuantReg(y_arr, X_arr)
result = model.fit(q=quantile)
return {
"coefficients": result.params,
"std_errors": result.bse,
"t_stats": result.tvalues,
"p_values": result.pvalues,
"quantile": quantile,
"pseudo_r_squared": float(result.prsquared),
"nobs": int(result.nobs),
}
[docs]
def two_stage_least_squares(
y: np.ndarray | pd.Series,
X: np.ndarray | pd.DataFrame,
instruments: np.ndarray | pd.DataFrame,
endog_vars: list[int] | np.ndarray,
) -> dict[str, Any]:
"""Two-stage least squares (2SLS) instrumental variables estimator.
Addresses endogeneity by instrumenting the endogenous regressors in *X*
with exogenous *instruments*. The first stage regresses each endogenous
variable on the instruments, and the second stage regresses *y* on the
fitted values plus the exogenous regressors.
Parameters:
y: Dependent variable (n,).
X: Full design matrix (n, k) containing both exogenous and endogenous
regressors.
instruments: Matrix of excluded instruments (n, m) with m >= number
of endogenous variables (order condition).
endog_vars: Column indices (0-based) within *X* that are endogenous.
Returns:
Dictionary with ``coefficients``, ``std_errors``, ``t_stats``,
``p_values``, ``r_squared``, ``residuals``, ``first_stage_f``,
and ``nobs``.
Raises:
ValueError: If the order condition is violated (fewer instruments
than endogenous variables).
"""
y_arr = coerce_array(y, name="y")
X_arr = np.asarray(X, dtype=np.float64)
Z_excl = np.asarray(instruments, dtype=np.float64)
endog_idx = np.asarray(endog_vars).ravel()
n, k = X_arr.shape
n_endog = len(endog_idx)
if Z_excl.ndim == 1:
Z_excl = Z_excl.reshape(-1, 1)
if Z_excl.shape[1] < n_endog:
msg = (
f"Order condition violated: {Z_excl.shape[1]} excluded instruments "
f"for {n_endog} endogenous variables."
)
raise ValueError(msg)
# Identify exogenous columns in X
all_idx = np.arange(k)
exog_idx = np.setdiff1d(all_idx, endog_idx)
X_exog = X_arr[:, exog_idx]
# Full instrument set = exogenous regressors + excluded instruments
Z = np.hstack([X_exog, Z_excl])
# --- First stage: regress each endogenous variable on Z ---
P_z = Z @ np.linalg.lstsq(Z, np.eye(n), rcond=None)[0] # noqa: N806
# More stable: projection matrix via QR
Q, R = np.linalg.qr(Z)
P_z = Q @ Q.T # noqa: N806
X_hat = X_arr.copy()
first_stage_f_stats = []
for idx in endog_idx:
X_hat[:, idx] = P_z @ X_arr[:, idx]
# First-stage F-statistic
fitted = X_hat[:, idx]
resid_first = X_arr[:, idx] - fitted
ss_reg = np.sum((fitted - np.mean(X_arr[:, idx])) ** 2)
ss_res = np.sum(resid_first**2)
df_reg = Z.shape[1]
df_res = n - Z.shape[1]
f_val = (ss_reg / df_reg) / (ss_res / max(df_res, 1))
first_stage_f_stats.append(float(f_val))
# --- Second stage: regress y on X_hat ---
from wraquant.stats.regression import ols as _ols
_2sls_result = _ols(y_arr, X_hat, add_constant=False)
beta_2sls = _2sls_result["coefficients"]
# Residuals use actual X, not fitted X_hat
residuals = y_arr - X_arr @ beta_2sls
# Standard errors
sigma2 = np.sum(residuals**2) / (n - k)
# 2SLS variance: sigma^2 * (X_hat' X_hat)^{-1}
XhXh_inv = np.linalg.inv(X_hat.T @ X_hat)
cov_beta = sigma2 * XhXh_inv
std_errors = np.sqrt(np.diag(cov_beta))
t_stats = beta_2sls / std_errors
p_values = 2.0 * (1.0 - sp_stats.t.cdf(np.abs(t_stats), df=n - k))
# R-squared (may be negative for IV)
ss_tot = np.sum((y_arr - np.mean(y_arr)) ** 2)
ss_res = np.sum(residuals**2)
r_squared = 1.0 - ss_res / ss_tot
return {
"coefficients": beta_2sls,
"std_errors": std_errors,
"t_stats": t_stats,
"p_values": p_values,
"r_squared": float(r_squared),
"residuals": residuals,
"first_stage_f": first_stage_f_stats,
"nobs": n,
}
[docs]
def gmm_estimation(
moment_conditions: Callable[[np.ndarray, np.ndarray], np.ndarray],
params_init: np.ndarray,
W: np.ndarray | None = None,
*,
data: np.ndarray | None = None,
max_iter: int = 2,
) -> dict[str, Any]:
"""Generalized Method of Moments (GMM) estimation.
Minimises the quadratic form g(theta)' W g(theta) where g(theta) is the
sample average of moment conditions. Supports iterated GMM (two-step by
default).
Parameters:
moment_conditions: Callable ``(params, data) -> (n, q)`` returning the
n-by-q matrix of moment conditions evaluated at *params*. Each
row corresponds to an observation, each column to a moment
condition.
params_init: Initial parameter vector (p,).
W: Weighting matrix (q, q). Defaults to the identity matrix
(one-step GMM). After the first step the optimal weighting
matrix is computed from the residual moment conditions.
data: Data array passed as the second argument to
*moment_conditions*. If ``None``, a zero-length array is used.
max_iter: Maximum number of GMM iterations (default 2 = two-step GMM).
Returns:
Dictionary with ``params``, ``objective``, ``moment_conditions_mean``,
``W``, ``nobs``, and ``n_moments``.
"""
theta = np.asarray(params_init, dtype=float).copy()
if data is None:
data = np.empty(0)
data_arr = np.asarray(data, dtype=float)
# Evaluate moment conditions to determine dimensions
g_mat = moment_conditions(theta, data_arr)
n, q = g_mat.shape
if W is None:
W_mat = np.eye(q)
else:
W_mat = np.asarray(W, dtype=float)
for _ in range(max_iter):
def _objective(
params: np.ndarray,
_W: np.ndarray = W_mat,
) -> float:
g = moment_conditions(params, data_arr)
g_bar = g.mean(axis=0)
return float(g_bar @ _W @ g_bar)
result = sp_optimize.minimize(
_objective,
theta,
method="BFGS",
options={"maxiter": 5000, "gtol": 1e-10},
)
theta = result.x
# Update weighting matrix (optimal = inverse of S)
g_mat = moment_conditions(theta, data_arr)
g_bar = g_mat.mean(axis=0)
g_centered = g_mat - g_bar
S = (g_centered.T @ g_centered) / n
try:
W_mat = np.linalg.inv(S)
except np.linalg.LinAlgError:
W_mat = np.linalg.pinv(S)
g_final = moment_conditions(theta, data_arr)
g_bar_final = g_final.mean(axis=0)
obj_val = float(g_bar_final @ W_mat @ g_bar_final)
return {
"params": theta,
"objective": obj_val,
"moment_conditions_mean": g_bar_final,
"W": W_mat,
"nobs": n,
"n_moments": q,
}
[docs]
def sargan_test(
residuals: np.ndarray | pd.Series,
instruments: np.ndarray | pd.DataFrame,
) -> dict[str, Any]:
"""Sargan-Hansen J-test for overidentifying restrictions.
Tests whether excluded instruments are validly uncorrelated with the
structural error. Only applicable when the model is overidentified
(more instruments than endogenous variables).
Parameters:
residuals: 2SLS residuals (n,).
instruments: Full instrument matrix (n, m) including both included
and excluded instruments.
Returns:
Dictionary with ``statistic``, ``p_value``, ``df``, and ``is_valid``
(instruments are valid at 5 % level if not rejected).
"""
resid = coerce_array(residuals, name="residuals")
Z = np.asarray(instruments, dtype=np.float64)
if Z.ndim == 1:
Z = Z.reshape(-1, 1)
n, m = Z.shape
# Regress residuals on instruments
Q, R = np.linalg.qr(Z)
fitted = Q @ (Q.T @ resid)
# J = n * R^2 from regressing residuals on instruments
ss_res = np.sum((resid - fitted) ** 2)
ss_tot = np.sum((resid - resid.mean()) ** 2)
if ss_tot == 0:
r_squared = 0.0
else:
r_squared = 1.0 - ss_res / ss_tot
j_stat = n * r_squared
# Degrees of freedom = number of overidentifying restrictions
# In practice, the user should pass the number of endogenous regressors
# separately. Here we use m - 1 as a common case (one endogenous var).
# The test is chi-squared with df = m - k_endog, but since we do not
# know k_endog here, we report the statistic and let the user set df.
# We default to df = m (equivalent to regressing residuals on all instruments).
df = m
p_value = 1.0 - sp_stats.chi2.cdf(j_stat, df=max(df, 1))
return {
"statistic": float(j_stat),
"p_value": float(p_value),
"df": int(df),
"is_valid": bool(p_value > 0.05),
}