"""Financial network analysis.
Build and analyse networks from correlation structures, Granger causality,
and other dependence measures. Includes systemic risk scoring and
contagion simulation.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from numpy.typing import ArrayLike
from scipy import linalg as sp_linalg
from wraquant.core._coerce import coerce_array, coerce_dataframe
__all__ = [
"correlation_network",
"minimum_spanning_tree",
"centrality_measures",
"community_detection",
"systemic_risk_score",
"contagion_simulation",
"granger_network",
]
# ---------------------------------------------------------------------------
# Network construction
# ---------------------------------------------------------------------------
[docs]
def correlation_network(
returns_df: pd.DataFrame,
threshold: float = 0.3,
) -> dict[str, np.ndarray | list[str]]:
"""Build an adjacency matrix from pairwise correlations.
Edges with absolute correlation below *threshold* are set to zero.
Parameters
----------
returns_df : pd.DataFrame
Asset returns, columns are assets.
threshold : float, optional
Minimum absolute correlation to retain an edge (default 0.3).
Returns
-------
dict
``adjacency`` – (n, n) adjacency matrix (0/1).
``correlation`` – (n, n) full correlation matrix.
``asset_names`` – list of column names.
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> returns = pd.DataFrame(np.random.randn(252, 4),
... columns=['AAPL', 'MSFT', 'GOOG', 'AMZN'])
>>> net = correlation_network(returns, threshold=0.1)
>>> net['adjacency'].shape
(4, 4)
>>> net['asset_names']
['AAPL', 'MSFT', 'GOOG', 'AMZN']
See Also
--------
minimum_spanning_tree : Build an MST from the correlation matrix.
granger_network : Build a directed network from Granger causality.
centrality_measures : Rank nodes by importance within the network.
"""
returns_df = coerce_dataframe(returns_df, name="returns_df")
corr = returns_df.corr().values
n = corr.shape[0]
adj = (np.abs(corr) >= threshold).astype(float)
# Remove self-loops
np.fill_diagonal(adj, 0.0)
return {
"adjacency": adj,
"correlation": corr,
"asset_names": list(returns_df.columns),
}
[docs]
def minimum_spanning_tree(
correlation_matrix: ArrayLike,
) -> np.ndarray:
"""Compute the minimum spanning tree of a correlation matrix.
Uses the distance metric ``d = sqrt(2 * (1 - rho))`` and Prim's algorithm.
Parameters
----------
correlation_matrix : array_like
(n, n) symmetric correlation matrix.
Returns
-------
np.ndarray
(n, n) adjacency matrix of the MST (symmetric, 0/1 entries).
Example
-------
>>> import numpy as np
>>> from wraquant.math.network import minimum_spanning_tree
>>> corr = np.array([[1.0, 0.8, 0.3],
... [0.8, 1.0, 0.5],
... [0.3, 0.5, 1.0]])
>>> mst = minimum_spanning_tree(corr)
>>> mst.shape
(3, 3)
>>> int(mst.sum() / 2) # MST of 3 nodes has 2 edges
2
See Also
--------
correlation_network : Build a thresholded correlation network.
centrality_measures : Compute centrality on the resulting MST.
"""
corr = np.asarray(correlation_matrix, dtype=np.float64)
n = corr.shape[0]
# Distance from correlation
dist = np.sqrt(np.clip(2.0 * (1.0 - corr), 0.0, None))
np.fill_diagonal(dist, 0.0)
# Prim's algorithm
in_tree = np.zeros(n, dtype=bool)
mst_adj = np.zeros((n, n), dtype=float)
min_cost = np.full(n, np.inf)
min_edge = np.full(n, -1, dtype=int)
# Start from node 0
min_cost[0] = 0.0
for _ in range(n):
# Pick the cheapest node not yet in the tree
candidates = np.where(~in_tree)[0]
u = candidates[np.argmin(min_cost[candidates])]
in_tree[u] = True
if min_edge[u] >= 0:
mst_adj[u, min_edge[u]] = 1.0
mst_adj[min_edge[u], u] = 1.0
# Update costs
for v in range(n):
if not in_tree[v] and dist[u, v] < min_cost[v]:
min_cost[v] = dist[u, v]
min_edge[v] = u
return mst_adj
# ---------------------------------------------------------------------------
# Centrality
# ---------------------------------------------------------------------------
[docs]
def centrality_measures(
adjacency_matrix: ArrayLike,
asset_names: list[str] | None = None,
) -> dict[str, np.ndarray | list[str]]:
"""Compute degree, betweenness, and eigenvector centrality.
Parameters
----------
adjacency_matrix : array_like
(n, n) adjacency matrix (can be weighted; zeros = no edge).
asset_names : list of str or None, optional
Labels for the nodes; defaults to ``["0", "1", ...]``.
Returns
-------
dict
``degree`` – degree centrality (normalised). Higher values
indicate nodes connected to more of the network.
``betweenness`` – betweenness centrality. Higher values indicate
nodes that bridge different clusters.
``eigenvector`` – eigenvector centrality. Higher values indicate
nodes connected to other well-connected nodes.
``asset_names`` – node labels.
Example
-------
>>> import numpy as np
>>> from wraquant.math.network import centrality_measures
>>> adj = np.array([[0, 1, 1], [1, 0, 0], [1, 0, 0]])
>>> result = centrality_measures(adj, asset_names=['A', 'B', 'C'])
>>> result['degree'][0] # node A connected to both B and C
1.0
See Also
--------
correlation_network : Build the adjacency matrix from returns.
systemic_risk_score : Systemic importance via MES or CoVaR.
"""
adj = np.asarray(adjacency_matrix, dtype=np.float64)
n = adj.shape[0]
if asset_names is None:
asset_names = [str(i) for i in range(n)]
# --- Degree centrality (normalised) ---
degree = adj.sum(axis=1) / max(n - 1, 1)
# --- Betweenness centrality (Brandes-style BFS on unweighted) ---
binary = (adj != 0).astype(float)
betweenness = np.zeros(n)
for s in range(n):
# BFS from s
stack: list[int] = []
pred: list[list[int]] = [[] for _ in range(n)]
sigma = np.zeros(n)
sigma[s] = 1.0
dist_bfs = np.full(n, -1, dtype=int)
dist_bfs[s] = 0
queue = [s]
head = 0
while head < len(queue):
v = queue[head]
head += 1
stack.append(v)
neighbours = np.where(binary[v] > 0)[0]
for w in neighbours:
if dist_bfs[w] < 0:
dist_bfs[w] = dist_bfs[v] + 1
queue.append(w)
if dist_bfs[w] == dist_bfs[v] + 1:
sigma[w] += sigma[v]
pred[w].append(v)
delta = np.zeros(n)
while stack:
w = stack.pop()
for v in pred[w]:
c = (sigma[v] / sigma[w]) * (1.0 + delta[w])
delta[v] += c
if w != s:
betweenness[w] += delta[w]
# Normalise betweenness
norm_factor = max((n - 1) * (n - 2), 1)
betweenness /= norm_factor
# --- Eigenvector centrality (power iteration) ---
eig_cent = _eigenvector_centrality(adj)
return {
"degree": degree,
"betweenness": betweenness,
"eigenvector": eig_cent,
"asset_names": asset_names,
}
def _eigenvector_centrality(
adj: np.ndarray,
max_iter: int = 300,
tol: float = 1e-8,
) -> np.ndarray:
"""Power-iteration eigenvector centrality."""
n = adj.shape[0]
x = np.ones(n) / n
for _ in range(max_iter):
x_new = adj @ x
norm = np.linalg.norm(x_new)
if norm == 0:
return np.zeros(n)
x_new /= norm
if np.linalg.norm(x_new - x) < tol:
break
x = x_new
return x_new
# ---------------------------------------------------------------------------
# Community detection
# ---------------------------------------------------------------------------
def _kmeans(
X: np.ndarray,
k: int,
max_iter: int = 100,
seed: int = 0,
) -> np.ndarray:
"""Minimal k-means++ for spectral clustering."""
rng = np.random.default_rng(seed)
n = X.shape[0]
# k-means++ initialization
centroids = np.empty((k, X.shape[1]))
centroids[0] = X[rng.integers(n)]
for c in range(1, k):
dists = np.min(
np.linalg.norm(X[:, None, :] - centroids[None, :c, :], axis=2), axis=1
)
probs = dists**2
total = probs.sum()
probs = probs / total if total > 0 else np.ones(n) / n
centroids[c] = X[rng.choice(n, p=probs)]
labels = np.zeros(n, dtype=int)
for _ in range(max_iter):
# Assign
dists = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)
new_labels = np.argmin(dists, axis=1)
if np.array_equal(new_labels, labels):
break
labels = new_labels
# Update centroids
for c in range(k):
members = X[labels == c]
if len(members) > 0:
centroids[c] = members.mean(axis=0)
return labels
# ---------------------------------------------------------------------------
# Systemic risk
# ---------------------------------------------------------------------------
[docs]
def systemic_risk_score(
returns_df: pd.DataFrame,
method: str = "mes",
quantile: float = 0.05,
) -> pd.Series:
"""Compute a systemic importance score for each asset.
Parameters
----------
returns_df : pd.DataFrame
Asset returns, columns are assets.
method : {'mes', 'covar', 'connectedness'}, optional
Scoring method:
- ``'mes'`` – Marginal Expected Shortfall: expected loss of asset
conditional on market being in its lower *quantile* tail.
- ``'covar'`` – Delta CoVaR: change in system VaR conditional on
asset being at its VaR vs median.
- ``'connectedness'`` – Diebold-Yilmaz–style total connectedness
from variance decomposition (simplified).
quantile : float, optional
Tail quantile for MES / CoVaR (default 0.05).
Returns
-------
pd.Series
Systemic risk score for each asset (higher = more systemic).
Raises
------
ValueError
If *method* is not recognised.
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> returns = pd.DataFrame(np.random.randn(252, 3) * 0.01,
... columns=['SPY', 'QQQ', 'IWM'])
>>> scores = systemic_risk_score(returns, method='mes', quantile=0.05)
>>> len(scores) == 3
True
See Also
--------
contagion_simulation : Simulate shock propagation through a network.
centrality_measures : Network centrality as an alternative importance measure.
wraquant.risk.metrics : VaR and CVaR risk metrics.
"""
returns_df = coerce_dataframe(returns_df, name="returns_df")
if method == "mes":
return _marginal_expected_shortfall(returns_df, quantile)
elif method == "covar":
return _delta_covar(returns_df, quantile)
elif method == "connectedness":
return _connectedness_score(returns_df)
else:
raise ValueError(
f"Unknown method {method!r}; use 'mes', 'covar', or 'connectedness'."
)
def _marginal_expected_shortfall(
returns_df: pd.DataFrame,
quantile: float,
) -> pd.Series:
"""MES: expected asset return when the market is in the tail."""
market = returns_df.mean(axis=1)
threshold = np.quantile(market.values, quantile)
tail_mask = market <= threshold
mes = returns_df[tail_mask].mean()
# More negative MES = higher systemic risk, so negate for scoring
return -mes
def _delta_covar(
returns_df: pd.DataFrame,
quantile: float,
) -> pd.Series:
"""Simplified Delta CoVaR."""
market = returns_df.mean(axis=1).values
scores = {}
for col in returns_df.columns:
asset = returns_df[col].values
# VaR of market conditional on asset being at its VaR
asset_var = np.quantile(asset, quantile)
stress_mask = asset <= asset_var
median_mask = (asset >= np.quantile(asset, 0.45)) & (
asset <= np.quantile(asset, 0.55)
)
if stress_mask.sum() > 0 and median_mask.sum() > 0:
covar_stress = np.quantile(market[stress_mask], quantile)
covar_median = np.quantile(market[median_mask], quantile)
scores[col] = abs(covar_stress - covar_median)
else:
scores[col] = 0.0
return pd.Series(scores)
def _connectedness_score(returns_df: pd.DataFrame) -> pd.Series:
"""Simplified Diebold-Yilmaz connectedness based on correlation."""
corr = returns_df.corr().values
n = corr.shape[0]
# Total connectedness: sum of off-diagonal squared correlations per asset
sq = corr ** 2
np.fill_diagonal(sq, 0.0)
scores = sq.sum(axis=1) / max(n - 1, 1)
return pd.Series(scores, index=returns_df.columns)
# ---------------------------------------------------------------------------
# Contagion simulation
# ---------------------------------------------------------------------------
[docs]
def contagion_simulation(
adjacency_matrix: ArrayLike,
shock_node: int,
shock_magnitude: float = 1.0,
threshold: float = 0.5,
max_rounds: int = 100,
) -> dict[str, np.ndarray | int]:
"""Simulate shock propagation through a financial network.
At each round, a node that receives total stress above *threshold*
"defaults" and propagates the shock to its neighbours weighted by
the adjacency matrix.
Parameters
----------
adjacency_matrix : array_like
(n, n) weighted adjacency matrix.
shock_node : int
Index of the initially shocked node.
shock_magnitude : float, optional
Size of the initial shock (default 1.0).
threshold : float, optional
Stress threshold for contagion (default 0.5).
max_rounds : int, optional
Maximum propagation rounds (default 100).
Returns
-------
dict
``defaulted`` – boolean array of which nodes defaulted.
``stress`` – cumulative stress on each node.
``rounds`` – number of propagation rounds.
``cascade_size`` – total number of defaults. A cascade_size
close to *n* indicates high systemic fragility.
Example
-------
>>> import numpy as np
>>> from wraquant.math.network import contagion_simulation
>>> adj = np.array([[0, 0.6, 0.3],
... [0.4, 0, 0.5],
... [0.2, 0.3, 0]], dtype=float)
>>> result = contagion_simulation(adj, shock_node=0, shock_magnitude=1.0,
... threshold=0.5)
>>> result['defaulted'][0]
True
>>> result['cascade_size'] >= 1
True
See Also
--------
systemic_risk_score : Identify which nodes are most systemically important.
correlation_network : Build the adjacency matrix from return data.
"""
adj = np.asarray(adjacency_matrix, dtype=np.float64)
n = adj.shape[0]
stress = np.zeros(n)
defaulted = np.zeros(n, dtype=bool)
# Initial shock
stress[shock_node] = shock_magnitude
defaulted[shock_node] = True
rounds = 0
for _ in range(max_rounds):
# Find newly defaulted nodes (stress >= threshold, not already defaulted)
new_defaults = (~defaulted) & (stress >= threshold)
if not new_defaults.any():
break
rounds += 1
defaulted |= new_defaults
# Propagate shock from newly defaulted nodes
for node in np.where(new_defaults)[0]:
stress += adj[node] * stress[node]
return {
"defaulted": defaulted,
"stress": stress,
"rounds": rounds,
"cascade_size": int(defaulted.sum()),
}
# ---------------------------------------------------------------------------
# Granger causality network
# ---------------------------------------------------------------------------
[docs]
def granger_network(
returns_df: pd.DataFrame,
max_lag: int = 5,
significance: float = 0.05,
) -> dict[str, np.ndarray | list[str]]:
"""Build a directed network from pairwise Granger causality tests.
For each pair (i, j) test whether lagged values of *j* help predict *i*
beyond *i*'s own lags, using an F-test.
Parameters
----------
returns_df : pd.DataFrame
Asset returns, columns are assets.
max_lag : int, optional
Maximum lag order (default 5).
significance : float, optional
p-value threshold for a significant Granger-causal link (default 0.05).
Returns
-------
dict
``adjacency`` – (n, n) directed adjacency matrix; entry (i, j) = 1
means *j* Granger-causes *i*.
``pvalues`` – (n, n) matrix of p-values.
``asset_names`` – column names.
Example
-------
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> x = np.random.randn(200)
>>> y = np.concatenate([[0], x[:-1]]) + np.random.randn(200) * 0.1
>>> df = pd.DataFrame({'X': x, 'Y': y})
>>> result = granger_network(df, max_lag=3, significance=0.05)
>>> result['adjacency'].shape
(2, 2)
See Also
--------
correlation_network : Undirected network from correlations.
wraquant.math.information.transfer_entropy : Information-theoretic causality.
"""
from scipy.stats import f as f_dist
returns_df = coerce_dataframe(returns_df, name="returns_df")
cols = list(returns_df.columns)
n = len(cols)
data = returns_df.values
T = data.shape[0]
pvalues = np.ones((n, n))
adjacency = np.zeros((n, n))
for i in range(n):
for j in range(n):
if i == j:
continue
pval = _granger_f_test(data[:, i], data[:, j], max_lag, T)
pvalues[i, j] = pval
if pval < significance:
adjacency[i, j] = 1.0
return {
"adjacency": adjacency,
"pvalues": pvalues,
"asset_names": cols,
}
def _granger_f_test(
y: np.ndarray,
x: np.ndarray,
max_lag: int,
T: int,
) -> float:
"""F-test for Granger causality: does *x* Granger-cause *y*?"""
from scipy.stats import f as f_dist
# Build lagged matrices
n_obs = T - max_lag
if n_obs <= 2 * max_lag + 1:
return 1.0 # Not enough data
Y = y[max_lag:]
# Restricted model: y ~ y_lags
X_r = np.column_stack(
[y[max_lag - lag: T - lag] for lag in range(1, max_lag + 1)]
)
X_r = np.column_stack([np.ones(n_obs), X_r])
# Unrestricted model: y ~ y_lags + x_lags
X_u = np.column_stack(
[y[max_lag - lag: T - lag] for lag in range(1, max_lag + 1)]
+ [x[max_lag - lag: T - lag] for lag in range(1, max_lag + 1)]
)
X_u = np.column_stack([np.ones(n_obs), X_u])
# OLS residuals
try:
from wraquant.stats.regression import ols
result_r = ols(Y, X_r, add_constant=False)
resid_r = result_r["residuals"]
ssr_r = resid_r @ resid_r
result_u = ols(Y, X_u, add_constant=False)
resid_u = result_u["residuals"]
ssr_u = resid_u @ resid_u
except (np.linalg.LinAlgError, Exception):
return 1.0
df1 = max_lag
df2 = n_obs - X_u.shape[1]
if df2 <= 0 or ssr_u <= 0:
return 1.0
f_stat = ((ssr_r - ssr_u) / df1) / (ssr_u / df2)
pval = 1.0 - f_dist.cdf(f_stat, df1, df2)
return float(pval)