"""Panel data econometrics.
Provides pooled OLS, fixed effects (within estimator), random effects (GLS),
between effects, first-difference estimator, and the Hausman specification
test. These are the workhorses of empirical asset pricing with panel data
(e.g. Fama-MacBeth regressions, firm-level regressions with firm and time
fixed effects).
"""
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
from scipy import stats as sp_stats
from wraquant.core._coerce import coerce_array
[docs]
def pooled_ols(
y: np.ndarray | pd.Series,
X: np.ndarray | pd.DataFrame,
) -> dict[str, Any]:
"""Pooled OLS regression ignoring panel structure.
Treats all observations as independent cross-sectional data. This is
generally inconsistent when unobserved heterogeneity is present but
serves as a baseline for comparison with panel estimators.
Parameters:
y: Dependent variable (n,).
X: Design matrix (n, k). Should include a constant column if an
intercept is desired.
Returns:
Dictionary with ``coefficients``, ``std_errors``, ``t_stats``,
``p_values``, ``r_squared``, ``adj_r_squared``, ``residuals``,
and ``nobs``.
"""
import statsmodels.api as sm
y_arr = coerce_array(y, name="y")
X_arr = np.asarray(X, dtype=np.float64)
result = sm.OLS(y_arr, X_arr).fit()
return {
"coefficients": result.params,
"std_errors": result.bse,
"t_stats": result.tvalues,
"p_values": result.pvalues,
"r_squared": float(result.rsquared),
"adj_r_squared": float(result.rsquared_adj),
"residuals": result.resid,
"nobs": int(result.nobs),
}
[docs]
def fixed_effects(
y: pd.Series,
X: pd.DataFrame,
entity_col: str,
time_col: str | None = None,
) -> dict[str, Any]:
"""Entity fixed effects regression (within estimator).
Demeans all variables by entity (and optionally by time period) before
running OLS, which is algebraically equivalent to including entity
dummies. This removes time-invariant unobserved heterogeneity.
Parameters:
y: Dependent variable. Must share the same index as *X*.
X: Regressor DataFrame. Must contain *entity_col* (and optionally
*time_col*). All other columns are treated as regressors.
entity_col: Column name identifying the cross-sectional unit.
time_col: Optional column name identifying the time period for
two-way fixed effects.
Returns:
Dictionary with ``coefficients``, ``std_errors``, ``t_stats``,
``p_values``, ``r_squared`` (within), ``entity_effects``,
``residuals``, ``nobs``, and ``n_entities``.
"""
df = X.copy()
df["__y__"] = coerce_array(y, name="y")
# Identify regressor columns (exclude entity and time columns)
drop_cols = {entity_col}
if time_col is not None:
drop_cols.add(time_col)
reg_cols = [c for c in X.columns if c not in drop_cols]
# Entity demeaning
group_means = df.groupby(entity_col)[["__y__", *reg_cols]].transform("mean")
df_demeaned = df[["__y__", *reg_cols]] - group_means
# Time demeaning (two-way FE)
if time_col is not None:
time_means = df.copy()
time_means[["__y__", *reg_cols]] = df_demeaned[["__y__", *reg_cols]]
time_group_means = (
time_means.groupby(time_col)[["__y__", *reg_cols]].transform("mean")
)
overall_means = df_demeaned[["__y__", *reg_cols]].mean()
df_demeaned[["__y__", *reg_cols]] = (
df_demeaned[["__y__", *reg_cols]] - time_group_means + overall_means
)
y_dm = df_demeaned["__y__"].values
X_dm = df_demeaned[reg_cols].values
n = len(y_dm)
k = X_dm.shape[1]
n_entities = df[entity_col].nunique()
# OLS on demeaned data — canonical import
from wraquant.stats.regression import ols as _ols
_ols_result = _ols(y_dm, X_dm, add_constant=False)
beta = _ols_result["coefficients"]
residuals = y_dm - X_dm @ beta
# Degrees of freedom: n - n_entities - k (for entity FE)
df_resid = n - n_entities - k
if time_col is not None:
n_times = df[time_col].nunique()
df_resid = n - n_entities - n_times - k + 1
sigma2 = np.sum(residuals**2) / max(df_resid, 1)
XtX_inv = np.linalg.inv(X_dm.T @ X_dm)
cov_beta = sigma2 * XtX_inv
std_errors = np.sqrt(np.diag(cov_beta))
t_stats = beta / std_errors
p_values = 2.0 * (1.0 - sp_stats.t.cdf(np.abs(t_stats), df=max(df_resid, 1)))
# Within R-squared
ss_res = np.sum(residuals**2)
ss_tot = np.sum((y_dm - y_dm.mean()) ** 2)
r_squared = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0
# Entity effects (group mean of y - X @ beta)
y_arr = np.asarray(y, dtype=float)
X_reg = X[reg_cols].values
entity_effects_raw = y_arr - X_reg @ beta
entity_effects_series = pd.Series(entity_effects_raw, index=X.index)
entity_effects = entity_effects_series.groupby(df[entity_col]).mean()
coef_series = pd.Series(beta, index=reg_cols)
se_series = pd.Series(std_errors, index=reg_cols)
t_series = pd.Series(t_stats, index=reg_cols)
p_series = pd.Series(p_values, index=reg_cols)
return {
"coefficients": coef_series,
"std_errors": se_series,
"t_stats": t_series,
"p_values": p_series,
"r_squared": float(r_squared),
"entity_effects": entity_effects,
"residuals": residuals,
"nobs": n,
"n_entities": n_entities,
}
[docs]
def random_effects(
y: pd.Series,
X: pd.DataFrame,
entity_col: str,
) -> dict[str, Any]:
"""Random effects (GLS) panel regression.
Assumes that the unobserved entity effect is uncorrelated with the
regressors. Applies a partial demeaning transformation using the
estimated variance components, then runs GLS.
Parameters:
y: Dependent variable.
X: Regressor DataFrame containing *entity_col*.
entity_col: Column identifying the cross-sectional unit.
Returns:
Dictionary with ``coefficients``, ``std_errors``, ``t_stats``,
``p_values``, ``r_squared``, ``theta`` (partial-demeaning
parameter), ``residuals``, and ``nobs``.
"""
df = X.copy()
df["__y__"] = coerce_array(y, name="y")
reg_cols = [c for c in X.columns if c != entity_col]
groups = df[entity_col]
# --- Step 1: estimate variance components from FE residuals ---
group_means_y = df.groupby(entity_col)["__y__"].transform("mean")
group_means_X = df.groupby(entity_col)[reg_cols].transform("mean")
y_dm = df["__y__"].values - group_means_y.values
X_dm = df[reg_cols].values - group_means_X.values
n = len(y_dm)
n_entities = groups.nunique()
k = X_dm.shape[1]
from wraquant.stats.regression import ols as _ols
_fe_result = _ols(y_dm, X_dm, add_constant=False)
beta_fe = _fe_result["coefficients"]
resid_fe = y_dm - X_dm @ beta_fe
sigma2_e = np.sum(resid_fe**2) / max(n - n_entities - k, 1)
# Between estimator residuals for sigma2_u
T_i = groups.value_counts() # observations per entity
T_bar = T_i.mean()
# Total residual from pooled OLS for sigma2_total
import statsmodels.api as sm
X_with_const = sm.add_constant(df[reg_cols].values)
pooled = sm.OLS(df["__y__"].values, X_with_const).fit()
# Variance of group means of pooled residuals
pooled_resid = pd.Series(pooled.resid, index=df.index)
group_mean_resid = pooled_resid.groupby(groups).mean()
sigma2_between = group_mean_resid.var()
sigma2_u = max(float(sigma2_between) - sigma2_e / T_bar, 0.0)
# --- Step 2: partial demeaning ---
# theta_i = 1 - sqrt(sigma2_e / (T_i * sigma2_u + sigma2_e))
theta_dict = {}
for entity, Ti in T_i.items():
denom = Ti * sigma2_u + sigma2_e
if denom > 0:
theta_dict[entity] = 1.0 - np.sqrt(sigma2_e / denom)
else:
theta_dict[entity] = 0.0
theta_series = groups.map(theta_dict).values
# Quasi-demeaned variables
y_re = df["__y__"].values - theta_series * group_means_y.values
X_re = df[reg_cols].values - theta_series[:, None] * group_means_X.values
X_re_const = sm.add_constant(X_re)
# GLS estimation
result = sm.OLS(y_re, X_re_const).fit()
# Use all coefficients (intercept + slopes)
coefficients = result.params
coef_names = ["const", *reg_cols]
residuals = df["__y__"].values - sm.add_constant(df[reg_cols].values) @ coefficients
# R-squared
ss_res = np.sum(residuals**2)
ss_tot = np.sum((df["__y__"].values - df["__y__"].mean()) ** 2)
r_squared = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0
theta_avg = float(np.mean(list(theta_dict.values())))
return {
"coefficients": pd.Series(coefficients, index=coef_names),
"std_errors": pd.Series(result.bse, index=coef_names),
"t_stats": pd.Series(result.tvalues, index=coef_names),
"p_values": pd.Series(result.pvalues, index=coef_names),
"r_squared": float(r_squared),
"theta": theta_avg,
"residuals": residuals,
"nobs": n,
}
[docs]
def hausman_test(
fe_results: dict[str, Any],
re_results: dict[str, Any],
) -> dict[str, Any]:
"""Hausman specification test for fixed vs. random effects.
Under the null hypothesis, both FE and RE are consistent, but RE is
efficient. Rejection favours fixed effects.
Parameters:
fe_results: Output from :func:`fixed_effects`.
re_results: Output from :func:`random_effects`.
Returns:
Dictionary with ``statistic``, ``p_value``, ``df``, and ``prefer``
(``"fe"`` or ``"re"``).
"""
beta_fe = np.asarray(fe_results["coefficients"])
beta_re_full = np.asarray(re_results["coefficients"])
# RE includes a constant; FE does not. Align on the slope coefficients.
fe_names = list(fe_results["coefficients"].index)
re_names = list(re_results["coefficients"].index)
# Find common slope names
common = [n for n in fe_names if n in re_names and n != "const"]
if not common:
common = fe_names # fallback
fe_idx = [fe_names.index(n) for n in common]
re_idx = [re_names.index(n) for n in common]
b_fe = beta_fe[fe_idx]
b_re = beta_re_full[re_idx]
diff = b_fe - b_re
# Variance of the difference
var_fe = np.diag(
np.asarray(fe_results["std_errors"])[fe_idx] ** 2
) * np.eye(len(common))
var_re = np.diag(
np.asarray(re_results["std_errors"])[re_idx] ** 2
) * np.eye(len(common))
# Reconstruct covariance matrices from std errors (diagonal approx)
cov_diff = var_fe - var_re
# Ensure positive definiteness by taking absolute values on diagonal
cov_diff = np.abs(cov_diff)
try:
cov_diff_inv = np.linalg.inv(cov_diff)
except np.linalg.LinAlgError:
cov_diff_inv = np.linalg.pinv(cov_diff)
stat = float(diff @ cov_diff_inv @ diff)
df = len(common)
p_value = 1.0 - sp_stats.chi2.cdf(stat, df=df)
return {
"statistic": float(stat),
"p_value": float(p_value),
"df": df,
"prefer": "fe" if p_value < 0.05 else "re",
}
[docs]
def between_effects(
y: pd.Series,
X: pd.DataFrame,
entity_col: str,
) -> dict[str, Any]:
"""Between estimator (regression on group means).
Collapses the panel to entity-level means and runs OLS. Exploits only
the cross-sectional (between) variation and is inconsistent in the
presence of entity-level omitted variables.
Parameters:
y: Dependent variable.
X: Regressor DataFrame containing *entity_col*.
entity_col: Column identifying the cross-sectional unit.
Returns:
Dictionary with ``coefficients``, ``std_errors``, ``t_stats``,
``p_values``, ``r_squared``, ``residuals``, and ``n_entities``.
"""
import statsmodels.api as sm
df = X.copy()
df["__y__"] = coerce_array(y, name="y")
reg_cols = [c for c in X.columns if c != entity_col]
# Group means
gm = df.groupby(entity_col)[["__y__", *reg_cols]].mean()
# Drop columns that are constant across entities (e.g. time index)
varying = [c for c in reg_cols if gm[c].std() > 1e-12]
y_bar = gm["__y__"].values
X_bar = sm.add_constant(gm[varying].values)
result = sm.OLS(y_bar, X_bar).fit()
coef_names = ["const", *varying]
return {
"coefficients": pd.Series(result.params, index=coef_names),
"std_errors": pd.Series(result.bse, index=coef_names),
"t_stats": pd.Series(result.tvalues, index=coef_names),
"p_values": pd.Series(result.pvalues, index=coef_names),
"r_squared": float(result.rsquared),
"residuals": result.resid,
"n_entities": len(gm),
}
[docs]
def first_difference(
y: pd.Series,
X: pd.DataFrame,
entity_col: str,
time_col: str,
) -> dict[str, Any]:
"""First-difference estimator.
Differences all variables within each entity, eliminating the
time-invariant entity effect. Under strict exogeneity this is
consistent; it is often preferred over FE when the errors are a
random walk.
Parameters:
y: Dependent variable.
X: Regressor DataFrame containing *entity_col* and *time_col*.
entity_col: Column identifying the cross-sectional unit.
time_col: Column identifying the time period.
Returns:
Dictionary with ``coefficients``, ``std_errors``, ``t_stats``,
``p_values``, ``r_squared``, ``residuals``, and ``nobs``.
"""
import statsmodels.api as sm
df = X.copy()
df["__y__"] = coerce_array(y, name="y")
reg_cols = [c for c in X.columns if c not in {entity_col, time_col}]
# Sort and difference within entity
df = df.sort_values([entity_col, time_col])
diff_cols = ["__y__", *reg_cols]
df_diff = df.groupby(entity_col)[diff_cols].diff().dropna()
y_d = df_diff["__y__"].values
X_d = df_diff[reg_cols].values
result = sm.OLS(y_d, X_d).fit()
return {
"coefficients": pd.Series(result.params, index=reg_cols),
"std_errors": pd.Series(result.bse, index=reg_cols),
"t_stats": pd.Series(result.tvalues, index=reg_cols),
"p_values": pd.Series(result.pvalues, index=reg_cols),
"r_squared": float(result.rsquared),
"residuals": result.resid,
"nobs": len(y_d),
}