"""Historical crisis analysis and drawdown attribution.

This module provides tools for analysing portfolio behaviour during
historical crises, measuring event impacts, quantifying contagion,
and attributing drawdowns to individual assets.

These functions complement the stress testing module (``risk.stress``)
by focusing on *what actually happened* rather than hypothetical
scenarios. Use them for:

- Post-mortem analysis: understand what drove past losses.
- Regime-aware portfolio construction: identify assets that provide
  protection in crises.
- Contagion monitoring: detect when correlations spike during stress.
- Investor reporting: show drawdown history with recovery timelines.

References:
    - Forbes & Rigobon (2002), "No Contagion, Only Interdependence"
    - Bacon (2008), "Practical Portfolio Performance Measurement and
      Attribution"
"""
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
from wraquant.risk.metrics import max_drawdown as _max_drawdown
[docs]
def crisis_drawdowns(
    returns: pd.Series,
    top_n: int = 5,
) -> pd.DataFrame:
    """Rank the deepest peak-to-trough drawdown episodes of a return series.

    The returns are compounded into a wealth curve, the curve is compared
    with its running maximum, and every contiguous stretch spent below
    that maximum is treated as one drawdown episode. The ``top_n``
    deepest episodes are returned, deepest first.

    When to use:
        - Investor reporting: worst historical losses and recovery times.
        - Strategy evaluation: compare drawdown profiles across strategies.
        - Risk limit calibration: derive max-drawdown limits from history.

    Parameters:
        returns: Simple return series. NaN observations are dropped
            before compounding. Any index type works; index labels are
            reported as-is in the date columns.
        top_n: Number of largest drawdowns to return.

    Returns:
        pd.DataFrame with one row per episode and columns:

        - **start** -- Index label of the observation immediately before
          the curve first dips below its running maximum (the peak).
        - **trough** -- Index label of the deepest point of the episode.
        - **end** -- Index label of the first observation back at the
          running maximum, or the last label if never recovered.
        - **drawdown** -- Depth at the trough (negative number).
        - **days_to_trough** -- Observations from start to trough.
        - **days_to_recovery** -- Observations from trough to recovery
          (NaN if not recovered).
        - **total_days** -- Observations from start to end.

        An empty frame with these columns is returned when the series
        never falls below its running maximum.

    Note:
        The running maximum starts at the first compounded value, so a
        loss on the very first observation is not measured against the
        initial capital.

    Example:
        >>> import pandas as pd, numpy as np
        >>> np.random.seed(42)
        >>> idx = pd.bdate_range("2020-01-01", periods=500)
        >>> returns = pd.Series(np.random.normal(0.0003, 0.01, 500), index=idx)
        >>> dd = crisis_drawdowns(returns, top_n=3)
        >>> len(dd) <= 3
        True

    See Also:
        drawdown_attribution: Which assets caused the drawdowns.
        wraquant.risk.metrics.max_drawdown: Single worst drawdown.
    """
    column_order = [
        "start",
        "trough",
        "end",
        "drawdown",
        "days_to_trough",
        "days_to_recovery",
        "total_days",
    ]

    series = returns.dropna()
    wealth = (1 + series).cumprod()
    peak_wealth = wealth.cummax()
    drawdowns = (wealth - peak_wealth) / peak_wealth
    labels = drawdowns.index
    n_obs = len(drawdowns)

    # Locate contiguous underwater runs. Padding with False on both sides
    # guarantees every run has a rising edge (+1) and a falling edge (-1).
    underwater = (drawdowns < 0).to_numpy()
    padded = np.concatenate(([False], underwater, [False])).astype(np.int8)
    edges = np.diff(padded)
    run_firsts = np.flatnonzero(edges == 1).tolist()  # first underwater position
    run_stops = np.flatnonzero(edges == -1).tolist()  # first position after the run

    episodes: list[dict[str, Any]] = []
    for first, stop in zip(run_firsts, run_stops):
        # The peak is the observation just before the curve goes underwater.
        peak_pos = first - 1 if first > 0 else 0
        trough_pos = peak_pos + int(drawdowns.iloc[peak_pos:stop].argmin())
        # stop == n_obs means the series ended while still underwater.
        recovered = stop < n_obs
        last_pos = stop if recovered else n_obs - 1
        episodes.append(
            {
                "start": labels[peak_pos],
                "trough": labels[trough_pos],
                "end": labels[last_pos],
                "drawdown": float(drawdowns.iloc[trough_pos]),
                "days_to_trough": trough_pos - peak_pos,
                "days_to_recovery": (
                    stop - trough_pos if recovered else float("nan")
                ),
                "total_days": last_pos - peak_pos,
            }
        )

    if not episodes:
        return pd.DataFrame(columns=column_order)

    ranked = pd.DataFrame(episodes).sort_values("drawdown", ascending=True)
    return ranked.head(top_n).reset_index(drop=True)
[docs]
def event_impact(
    returns: pd.Series,
    event_dates: list[str],
    window: int = 10,
) -> dict[str, Any]:
    """Measure portfolio returns around specific events.

    For each event date, the nearest observation in ``returns`` is
    located and the ``window`` observations before and after it are
    extracted. Compounded return and volatility are computed for each
    side, together with the event-day return and the compounded return
    over the whole span.

    When to use:
        - Post-mortem: "how did the portfolio react to the Fed rate hike?"
        - Event studies: systematic analysis of recurring events
          (earnings, FOMC, NFP).
        - Scenario planning: calibrate stress scenarios based on actual
          event impacts.

    Parameters:
        returns: Return series with a DatetimeIndex. NaN observations
            are dropped before any window is taken.
        event_dates: List of event date strings (ISO format, e.g.,
            "2020-03-16"). Each date is matched to the nearest available
            date in the index, so a date outside the sample snaps to the
            first or last observation.
        window: Number of observations before and after the event to
            analyse. Windows are truncated at the edges of the sample.

    Returns:
        Dictionary mapping each event date string to a dict with:

        - **pre_cumulative** (*float*) -- Compounded return over the
          pre-event window (0.0 if the window is empty).
        - **post_cumulative** (*float*) -- Compounded return over the
          post-event window (0.0 if the window is empty).
        - **event_day_return** (*float*) -- Return on the matched event
          day itself.
        - **pre_vol** (*float*) -- Sample standard deviation of returns
          in the pre-event window (0.0 with fewer than two observations).
        - **post_vol** (*float*) -- Sample standard deviation of returns
          in the post-event window (0.0 with fewer than two observations).
        - **total_impact** (*float*) -- Compounded return over the full
          span (pre + event + post).

        An empty dict is returned when ``returns`` has no non-NaN
        observations. Repeated date strings share a single entry.

    Raises:
        ValueError: If ``window`` is negative.

    Example:
        >>> import pandas as pd, numpy as np
        >>> np.random.seed(42)
        >>> idx = pd.bdate_range("2020-01-01", periods=252)
        >>> returns = pd.Series(np.random.normal(0.0005, 0.01, 252), index=idx)
        >>> result = event_impact(returns, ["2020-03-16", "2020-06-15"], window=5)
        >>> len(result) >= 1
        True

    See Also:
        wraquant.risk.stress.historical_stress_test: Replay known crises.
        crisis_drawdowns: Top drawdown periods.
    """
    if window < 0:
        raise ValueError(f"window must be non-negative, got {window}")

    def _compound(chunk: pd.Series) -> float:
        # np.prod of an empty array is 1.0, so an empty window yields 0.0.
        return float(np.prod(1 + chunk.values) - 1)

    def _volatility(chunk: pd.Series) -> float:
        # Sample std is undefined for fewer than two points; report 0.0.
        return float(chunk.std()) if len(chunk) > 1 else 0.0

    results: dict[str, dict[str, float]] = {}
    ret = returns.dropna()
    if ret.empty:
        # Nothing to match events against.
        return results

    n_obs = len(ret)
    for date_str in event_dates:
        target = pd.Timestamp(date_str)
        # Positional location of the observation closest to the event date.
        pos = int(ret.index.get_indexer([target], method="nearest")[0])
        if pos < 0 or pos >= n_obs:
            # get_indexer signals "no match" with -1.
            continue

        # Clip the window to the available sample on both sides.
        pre_start = max(0, pos - window)
        post_end = min(n_obs, pos + window + 1)

        pre_slice = ret.iloc[pre_start:pos]
        post_slice = ret.iloc[pos + 1 : post_end]
        full_slice = ret.iloc[pre_start:post_end]

        results[date_str] = {
            "pre_cumulative": _compound(pre_slice),
            "post_cumulative": _compound(post_slice),
            "event_day_return": float(ret.iloc[pos]),
            "pre_vol": _volatility(pre_slice),
            "post_vol": _volatility(post_slice),
            "total_impact": _compound(full_slice),
        }
    return results
[docs]
def contagion_analysis(
    returns_df: pd.DataFrame,
    crisis_dates: tuple[str, str],
    threshold: float = 0.1,
) -> dict[str, Any]:
    """Compare normal vs. crisis-period correlations to detect contagion.

    The sample is split into the crisis window (inclusive of both end
    dates) and everything else. A correlation matrix is computed for
    each part and the average off-diagonal correlations are compared.

    Contagion is flagged with a simple heuristic: the average crisis
    correlation must exceed the average normal correlation by more than
    ``threshold``. No statistical significance test is performed and
    no volatility (heteroskedasticity) adjustment is applied.

    When to use:
        - Evaluating diversification reliability: do correlations spike
          when you need diversification most?
        - Stress testing: adjust portfolio correlations based on
          empirically observed crisis behaviour.
        - Regime-aware portfolio construction: allocate less to assets
          that become highly correlated during crises.

    Parameters:
        returns_df: Multi-asset return DataFrame with DatetimeIndex.
        crisis_dates: Tuple of (start_date, end_date) strings defining
            the crisis period.
        threshold: Minimum absolute increase in average off-diagonal
            correlation (crisis minus normal) required to flag
            contagion. Defaults to 0.1.

    Returns:
        Dictionary containing:

        - **normal_corr** (*pd.DataFrame*) -- Correlation matrix during
          non-crisis period.
        - **crisis_corr** (*pd.DataFrame*) -- Correlation matrix during
          the crisis period.
        - **corr_change** (*pd.DataFrame*) -- Change in correlation
          (crisis - normal).
        - **avg_normal_corr** (*float*) -- Average off-diagonal correlation
          in normal period (0.0 for a single asset).
        - **avg_crisis_corr** (*float*) -- Average off-diagonal correlation
          in crisis period (0.0 for a single asset).
        - **contagion_detected** (*bool*) -- True if the average crisis
          correlation exceeds the normal one by more than ``threshold``.
          False when either average is NaN (e.g. too few observations).
        - **n_normal** (*int*) -- Number of rows outside the crisis
          window, counted before rows containing NaN are dropped.
        - **n_crisis** (*int*) -- Number of rows inside the crisis
          window, counted before rows containing NaN are dropped.

    Example:
        >>> import pandas as pd, numpy as np
        >>> np.random.seed(42)
        >>> idx = pd.bdate_range("2019-01-01", periods=500)
        >>> returns = pd.DataFrame({
        ...     "A": np.random.normal(0.0005, 0.01, 500),
        ...     "B": np.random.normal(0.0003, 0.012, 500),
        ... }, index=idx)
        >>> result = contagion_analysis(returns, ("2020-02-01", "2020-06-01"))
        >>> "contagion_detected" in result
        True

    See Also:
        wraquant.risk.stress.joint_stress_test: Apply correlation shocks.

    References:
        - Forbes & Rigobon (2002), "No Contagion, Only Interdependence:
          Measuring Stock Market Comovements"
    """
    start = pd.Timestamp(crisis_dates[0])
    end = pd.Timestamp(crisis_dates[1])

    crisis_mask = (returns_df.index >= start) & (returns_df.index <= end)
    normal_mask = ~crisis_mask

    # Rows with any missing value are excluded from the correlations.
    crisis_returns = returns_df[crisis_mask].dropna()
    normal_returns = returns_df[normal_mask].dropna()

    crisis_corr = crisis_returns.corr()
    normal_corr = normal_returns.corr()
    corr_change = crisis_corr - normal_corr

    def _avg_offdiag(corr_df: pd.DataFrame) -> float:
        """Mean of off-diagonal elements."""
        vals = corr_df.values
        # Size the mask from the matrix itself so it always matches.
        offdiag = ~np.eye(vals.shape[0], dtype=bool)
        if offdiag.sum() == 0:
            # Single asset (or empty matrix): no pairs to average.
            return 0.0
        return float(np.mean(vals[offdiag]))

    avg_normal = _avg_offdiag(normal_corr)
    avg_crisis = _avg_offdiag(crisis_corr)

    # Heuristic flag; any comparison involving NaN evaluates to False.
    contagion = bool(avg_crisis - avg_normal > threshold)

    return {
        "normal_corr": normal_corr,
        "crisis_corr": crisis_corr,
        "corr_change": corr_change,
        "avg_normal_corr": avg_normal,
        "avg_crisis_corr": avg_crisis,
        "contagion_detected": contagion,
        "n_normal": int(normal_mask.sum()),
        "n_crisis": int(crisis_mask.sum()),
    }
[docs]
def drawdown_attribution(
    returns_df: pd.DataFrame,
    weights: np.ndarray,
) -> pd.DataFrame:
    """Attribute portfolio drawdowns to individual asset contributions.

    The portfolio drawdown is measured on the compounded wealth curve of
    the weighted portfolio. For every observation, each asset's
    contribution is the sum of its weighted returns since the most
    recent portfolio peak, so the first losing day after a peak is
    included. Contributions are exactly zero on rows where the portfolio
    is at a peak.

    Because contributions are arithmetic sums while ``portfolio_dd`` is
    compounded, the contributions add up to the drawdown only
    approximately over multi-period drawdowns.

    When to use:
        - Post-mortem analysis: "which position caused the 2020 drawdown?"
        - Risk monitoring: track per-asset drawdown contributions in
          real time.
        - Portfolio construction: identify assets that consistently
          contribute to drawdowns and consider hedging or removing them.

    Parameters:
        returns_df: Multi-asset return DataFrame (columns = assets).
            Rows containing any NaN are dropped.
        weights: Portfolio weight vector aligned with columns. Any
            array-like of length ``len(returns_df.columns)`` is accepted.

    Returns:
        pd.DataFrame indexed like the NaN-free rows of ``returns_df`` with:

        - **portfolio_dd** -- Total portfolio drawdown at each point
          (0 at a peak, negative otherwise).
        - **<asset>_contribution** -- One column per asset holding that
          asset's cumulative weighted return since the latest peak.

    Raises:
        ValueError: If the number of weights differs from the number of
            asset columns.

    Example:
        >>> import pandas as pd, numpy as np
        >>> np.random.seed(42)
        >>> idx = pd.bdate_range("2020-01-01", periods=252)
        >>> returns = pd.DataFrame({
        ...     "A": np.random.normal(0.0005, 0.01, 252),
        ...     "B": np.random.normal(0.0003, 0.015, 252),
        ... }, index=idx)
        >>> weights = np.array([0.6, 0.4])
        >>> attr = drawdown_attribution(returns, weights)
        >>> "portfolio_dd" in attr.columns
        True

    See Also:
        crisis_drawdowns: Identify top drawdown periods.
        wraquant.risk.stress.marginal_stress_contribution: Stress-based
            attribution.
    """
    clean = returns_df.dropna()
    assets = clean.columns.tolist()

    # Accept lists / Series as well as ndarrays.
    w = np.asarray(weights, dtype=float).ravel()
    if w.shape[0] != len(assets):
        raise ValueError(
            f"weights has {w.shape[0]} entries but returns_df has "
            f"{len(assets)} asset columns"
        )

    asset_returns = clean.values
    port_returns = asset_returns @ w

    # Compounded portfolio wealth and its drawdown from the running peak.
    port_cum = np.cumprod(1 + port_returns)
    port_running_max = np.maximum.accumulate(port_cum)
    port_dd = (port_cum - port_running_max) / port_running_max

    # Cumulative weighted return of each asset from the start of the sample.
    weighted_returns = asset_returns * w[np.newaxis, :]
    asset_cum = np.cumsum(weighted_returns, axis=0)

    # Position of the most recent peak at or before each observation.
    # The first observation always equals its own running maximum, so the
    # forward-filled peak position is defined from row 0 onward.
    n = len(clean)
    at_peak = port_cum >= port_running_max
    peak_pos = np.maximum.accumulate(np.where(at_peak, np.arange(n), 0))

    # Contribution = cumulative weighted return since that peak; rows that
    # are themselves peaks subtract their own value and come out as 0.
    contributions = asset_cum - asset_cum[peak_pos]

    result = pd.DataFrame(
        contributions,
        index=clean.index,
        columns=[f"{a}_contribution" for a in assets],
    )
    result.insert(0, "portfolio_dd", port_dd)
    return result