Source code for wraquant.risk.dcc

"""Dynamic Conditional Correlation (DCC-GARCH) models.

Provides univariate GARCH(1,1) fitting, DCC parameter estimation via MLE,
rolling DCC-based correlations, correlation forecasting, and time-varying
covariance matrix computation.  All implementations use pure numpy/scipy.
"""

from __future__ import annotations

from typing import Any

import numpy as np
from scipy import optimize

# ---------------------------------------------------------------------------
# Univariate GARCH(1,1)
# ---------------------------------------------------------------------------


def _garch11_loglik(
    params: np.ndarray,
    returns: np.ndarray,
) -> float:
    """Negative log-likelihood for a GARCH(1,1) model.

    sigma_t^2 = omega + alpha * r_{t-1}^2 + beta * sigma_{t-1}^2
    """
    omega, alpha, beta = params
    if omega <= 0 or alpha < 0 or beta < 0 or alpha + beta >= 1:
        return 1e12

    T = len(returns)
    sigma2 = np.empty(T)
    sigma2[0] = np.var(returns)

    for t in range(1, T):
        sigma2[t] = omega + alpha * returns[t - 1] ** 2 + beta * sigma2[t - 1]
        if sigma2[t] <= 0:
            return 1e12

    # Gaussian log-likelihood (ignoring constant)
    ll = -0.5 * np.sum(np.log(sigma2) + returns**2 / sigma2)
    return -ll


def _fit_garch11(returns: np.ndarray) -> dict[str, Any]:
    """Fit univariate GARCH(1,1) via MLE.

    Returns:
        Dict with ``"omega"``, ``"alpha"``, ``"beta"``,
        ``"conditional_vol"`` (array of sigma_t).
    """
    var0 = np.var(returns)
    x0 = np.array([var0 * 0.05, 0.05, 0.90])

    res = optimize.minimize(
        _garch11_loglik,
        x0,
        args=(returns,),
        method="Nelder-Mead",
        options={"maxiter": 10000, "xatol": 1e-8, "fatol": 1e-8},
    )
    omega, alpha, beta = res.x
    omega = max(omega, 1e-10)
    alpha = max(alpha, 1e-6)
    beta = max(beta, 1e-6)
    # Re-normalise if needed
    if alpha + beta >= 1.0:
        s = alpha + beta
        alpha = alpha / s * 0.999
        beta = beta / s * 0.999

    T = len(returns)
    sigma2 = np.empty(T)
    sigma2[0] = np.var(returns)
    for t in range(1, T):
        sigma2[t] = omega + alpha * returns[t - 1] ** 2 + beta * sigma2[t - 1]

    return {
        "omega": float(omega),
        "alpha": float(alpha),
        "beta": float(beta),
        "conditional_vol": np.sqrt(sigma2),
    }


# ---------------------------------------------------------------------------
# DCC estimation
# ---------------------------------------------------------------------------


def _dcc_loglik(
    params: np.ndarray,
    std_residuals: np.ndarray,
    qbar: np.ndarray,
) -> float:
    """Negative log-likelihood for DCC parameters (a, b).

    Q_t = (1 - a - b) * Qbar + a * e_{t-1} e_{t-1}' + b * Q_{t-1}
    R_t = diag(Q_t)^{-1/2} Q_t diag(Q_t)^{-1/2}
    """
    a, b = params
    if a < 0 or b < 0 or a + b >= 1:
        return 1e12

    T, k = std_residuals.shape
    Qt = qbar.copy()
    ll = 0.0
    c = 1 - a - b

    for t in range(1, T):
        et_prev = std_residuals[t - 1].reshape(-1, 1)
        Qt = c * qbar + a * (et_prev @ et_prev.T) + b * Qt

        # Correlation matrix from Qt
        d = np.sqrt(np.diag(Qt))
        if np.any(d <= 0):
            return 1e12
        D_inv = np.diag(1.0 / d)
        Rt = D_inv @ Qt @ D_inv

        # Ensure positive definite
        det_Rt = np.linalg.det(Rt)
        if det_Rt <= 0:
            return 1e12

        et = std_residuals[t]
        quad_form = et @ np.linalg.solve(Rt, et) - et @ et
        ll += -0.5 * (np.log(det_Rt) + quad_form)

    return -ll


[docs] def dcc_garch( returns: np.ndarray, p: int = 1, q: int = 1, ) -> dict[str, Any]: """Fit a DCC-GARCH(p, q) model. Currently supports p=1, q=1 (DCC(1,1)). Procedure: 1. Fit univariate GARCH(1,1) to each return series. 2. Compute standardized residuals. 3. Estimate DCC parameters (a, b) via MLE. Parameters: returns: Array of shape ``(T, k)`` with k asset return series. p: DCC lag order for innovation (currently must be 1). q: DCC lag order for conditional (currently must be 1). Returns: Dict with keys: * ``"a"`` -- DCC innovation parameter. * ``"b"`` -- DCC persistence parameter. * ``"garch_params"`` -- list of per-asset GARCH(1,1) parameter dicts. * ``"qbar"`` -- unconditional correlation matrix of standardized residuals. * ``"conditional_vols"`` -- array of per-asset conditional volatilities ``(T, k)``. * ``"std_residuals"`` -- standardized residuals ``(T, k)``. """ from wraquant.core._coerce import coerce_array if p != 1 or q != 1: msg = "Only DCC(1,1) is currently supported" raise ValueError(msg) returns = np.asarray(returns, dtype=np.float64) T, k = returns.shape # Step 1: fit univariate GARCH to each series garch_results = [] cond_vols = np.empty((T, k)) std_resids = np.empty((T, k)) for j in range(k): g = _fit_garch11(returns[:, j]) garch_results.append(g) cond_vols[:, j] = g["conditional_vol"] std_resids[:, j] = returns[:, j] / g["conditional_vol"] # Step 2: unconditional correlation of standardized residuals qbar = np.corrcoef(std_resids, rowvar=False) # Step 3: estimate DCC parameters res = optimize.minimize( _dcc_loglik, x0=np.array([0.01, 0.95]), args=(std_resids, qbar), method="Nelder-Mead", options={"maxiter": 10000, "xatol": 1e-8, "fatol": 1e-8}, ) a_hat, b_hat = res.x a_hat = max(float(a_hat), 1e-6) b_hat = max(float(b_hat), 1e-6) if a_hat + b_hat >= 1.0: s = a_hat + b_hat a_hat = a_hat / s * 0.999 b_hat = b_hat / s * 0.999 return { "a": a_hat, "b": b_hat, "garch_params": [ {"omega": g["omega"], "alpha": g["alpha"], "beta": g["beta"]} for g in garch_results ], "qbar": qbar, "conditional_vols": cond_vols, "std_residuals": std_resids, }
# --------------------------------------------------------------------------- # Rolling correlation # ---------------------------------------------------------------------------
[docs] def rolling_correlation_dcc( returns: np.ndarray, window: int | None = None, ) -> dict[str, Any]: """Compute DCC-based time-varying correlation matrices. Fits DCC-GARCH to the full sample, then extracts the time-varying correlation matrix at each time step. Parameters: returns: Array of shape ``(T, k)``. window: Ignored (present for API compatibility with simple rolling-window methods). DCC uses the full sample. Returns: Dict with keys: * ``"correlations"`` -- array of shape ``(T, k, k)`` with the time-varying correlation matrices. * ``"dcc_model"`` -- the fitted DCC model dict. """ from wraquant.core._coerce import coerce_array returns = np.asarray(returns, dtype=np.float64) model = dcc_garch(returns) corrs = _compute_dcc_correlations(model) return { "correlations": corrs, "dcc_model": model, }
def _compute_dcc_correlations(model: dict[str, Any]) -> np.ndarray: """Extract time-varying correlation matrices from a fitted DCC model.""" a = model["a"] b = model["b"] qbar = model["qbar"] std_resids = model["std_residuals"] T, k = std_resids.shape c = 1 - a - b correlations = np.empty((T, k, k)) Qt = qbar.copy() correlations[0] = qbar.copy() for t in range(1, T): et_prev = std_resids[t - 1].reshape(-1, 1) Qt = c * qbar + a * (et_prev @ et_prev.T) + b * Qt d = np.sqrt(np.diag(Qt)) D_inv = np.diag(1.0 / np.maximum(d, 1e-10)) Rt = D_inv @ Qt @ D_inv # Clip to valid correlation range np.clip(Rt, -1, 1, out=Rt) np.fill_diagonal(Rt, 1.0) correlations[t] = Rt return correlations # --------------------------------------------------------------------------- # Forecasting # ---------------------------------------------------------------------------
[docs] def forecast_correlation( dcc_model: dict[str, Any], horizon: int = 1, ) -> dict[str, Any]: """Forecast future correlation matrices from a fitted DCC model. Uses the mean-reverting property of DCC: ``E[Q_{T+h}] -> Qbar`` as ``h -> inf``. For finite horizons the forecasted Q is computed recursively assuming that future innovations have zero outer product (their expectation). Parameters: dcc_model: Output of :func:`dcc_garch`. horizon: Number of steps ahead to forecast. Returns: Dict with keys: * ``"forecasted_correlations"`` -- array of shape ``(horizon, k, k)``. * ``"forecasted_covariances"`` -- array of shape ``(horizon, k, k)``. """ a = dcc_model["a"] b = dcc_model["b"] qbar = dcc_model["qbar"] std_resids = dcc_model["std_residuals"] cond_vols = dcc_model["conditional_vols"] garch_params = dcc_model["garch_params"] T, k = std_resids.shape c = 1 - a - b # Last Qt Qt = qbar.copy() for t in range(1, T): et_prev = std_resids[t - 1].reshape(-1, 1) Qt = c * qbar + a * (et_prev @ et_prev.T) + b * Qt # Forecast GARCH volatilities last_sigma2 = cond_vols[-1] ** 2 last_r2 = (std_resids[-1] * cond_vols[-1]) ** 2 # original returns squared forecasted_corrs = np.empty((horizon, k, k)) forecasted_covs = np.empty((horizon, k, k)) sigma2_forecast = last_sigma2.copy() for h in range(horizon): # GARCH vol forecast: sigma2_{T+h} = omega + (alpha+beta)*sigma2_{T+h-1} for j in range(k): gp = garch_params[j] if h == 0: sigma2_forecast[j] = ( gp["omega"] + gp["alpha"] * last_r2[j] + gp["beta"] * last_sigma2[j] ) else: sigma2_forecast[j] = ( gp["omega"] + (gp["alpha"] + gp["beta"]) * sigma2_forecast[j] ) # DCC correlation forecast: Qt -> Qbar # E[e_{t} e_{t}'] = Qbar (under stationarity) Qt = c * qbar + a * qbar + b * Qt # = (c+a)*qbar + b*Qt = (1-b)*qbar + b*Qt d = np.sqrt(np.diag(Qt)) D_inv = np.diag(1.0 / np.maximum(d, 1e-10)) Rt = D_inv @ Qt @ D_inv np.clip(Rt, -1, 1, out=Rt) np.fill_diagonal(Rt, 1.0) forecasted_corrs[h] = Rt # Covariance = D_sigma * R * D_sigma D_sigma = np.diag(np.sqrt(np.maximum(sigma2_forecast, 1e-15))) forecasted_covs[h] = D_sigma @ Rt @ D_sigma return { "forecasted_correlations": forecasted_corrs, "forecasted_covariances": forecasted_covs, }
# --------------------------------------------------------------------------- # Conditional covariance # ---------------------------------------------------------------------------
[docs] def conditional_covariance( returns: np.ndarray, dcc_params: dict[str, Any] | None = None, ) -> dict[str, Any]: """Compute time-varying covariance matrices from DCC-GARCH. If *dcc_params* is not supplied, the model is fitted to *returns* automatically. Parameters: returns: Array of shape ``(T, k)``. dcc_params: Pre-fitted DCC model dict (optional). Returns: Dict with keys: * ``"covariances"`` -- array of shape ``(T, k, k)`` with conditional covariance matrices. * ``"correlations"`` -- array of shape ``(T, k, k)`` with conditional correlation matrices. * ``"volatilities"`` -- array of shape ``(T, k)`` with conditional volatilities. """ from wraquant.core._coerce import coerce_array returns = np.asarray(returns, dtype=np.float64) if dcc_params is None: dcc_params = dcc_garch(returns) cond_vols = dcc_params["conditional_vols"] corrs = _compute_dcc_correlations(dcc_params) T, k = returns.shape covs = np.empty((T, k, k)) for t in range(T): D = np.diag(cond_vols[t]) covs[t] = D @ corrs[t] @ D return { "covariances": covs, "correlations": corrs, "volatilities": cond_vols, }