"""Pre-built quantitative finance workflows that chain wraquant modules.
These recipes show how the library's modules work together as a
cohesive framework rather than independent tools. Each recipe is a
complete pipeline that wires data through several wraquant subsystems
and returns a consolidated result dictionary.
Recipes are intentionally *thin* orchestration layers. The real logic
lives in the individual modules; recipes just sequence the calls,
align data, and assemble the outputs.
Example:
>>> import wraquant as wq
>>> result = wq.analyze(daily_returns)
>>> print(result["risk"]["sharpe"])
"""
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# analyze -- "just give me everything"
# ---------------------------------------------------------------------------
[docs]
def analyze(
returns: pd.Series | pd.DataFrame,
benchmark: pd.Series | None = None,
) -> dict[str, Any]:
"""Quick comprehensive analysis of a return series.
The "just give me everything" function. Runs relevant analyses
from stats, risk, vol, regimes, and ts modules and returns
a comprehensive report.
Pipeline: returns -> descriptive stats -> risk metrics ->
distribution fit -> stationarity test -> (optional) regime
detection -> (optional) GARCH volatility -> (optional)
benchmark-relative metrics.
Chains: stats -> risk -> ts -> regimes -> vol.
Parameters:
returns: Return series or multi-asset DataFrame. If a
DataFrame is provided, the first column is used as the
primary series.
benchmark: Optional benchmark return series for relative
metrics (information ratio, beta).
Returns:
Dictionary with sections:
- **descriptive** -- mean, std, skew, kurtosis, min, max, count.
- **risk** -- sharpe, sortino, max_drawdown.
- **distribution** -- fitted normal params + KS test.
- **stationarity** -- ADF test statistic, p-value, is_stationary.
- **regime** *(optional, requires >= 100 obs)* -- current regime,
probabilities, n_regimes.
- **volatility** *(optional, requires >= 200 obs)* -- GARCH
persistence, half-life, current conditional vol.
- **relative** *(only when benchmark provided)* -- information
ratio, beta.
Example:
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> rets = pd.Series(np.random.normal(0.0005, 0.01, 500))
>>> report = analyze(rets)
>>> sorted(report.keys()) # doctest: +NORMALIZE_WHITESPACE
['descriptive', 'distribution', 'risk', 'stationarity']
"""
from wraquant.risk.metrics import max_drawdown, sharpe_ratio, sortino_ratio
from wraquant.stats.descriptive import summary_stats
from wraquant.stats.distributions import fit_distribution
from wraquant.ts.stationarity import adf_test
if isinstance(returns, pd.DataFrame):
primary = returns.iloc[:, 0].dropna()
else:
primary = returns.dropna()
# Ensure we have a pd.Series for the functions that expect one
r_series = pd.Series(primary.values, index=primary.index, name="returns")
result: dict[str, Any] = {
"descriptive": summary_stats(r_series),
"risk": {
"sharpe": sharpe_ratio(r_series),
"sortino": sortino_ratio(r_series),
"max_drawdown": max_drawdown(
(1 + r_series).cumprod(),
),
},
"distribution": fit_distribution(r_series, dist="norm"),
"stationarity": adf_test(r_series),
}
# Optional: regime detection (may fail for short series or
# if hmmlearn is not installed)
try:
from wraquant.regimes.base import detect_regimes
if len(r_series) >= 100:
regime = detect_regimes(r_series.values, method="hmm", n_regimes=2)
result["regime"] = {
"current": regime.current_regime,
"probabilities": regime.current_probabilities.tolist(),
"n_regimes": regime.n_regimes,
}
except Exception: # noqa: BLE001
pass
# Optional: GARCH vol (requires arch)
try:
from wraquant.vol.models import garch_fit
if len(r_series) >= 200:
# arch expects percentage returns
garch = garch_fit(r_series * 100)
result["volatility"] = {
"persistence": garch["persistence"],
"half_life": garch["half_life"],
"current_vol": float(garch["conditional_volatility"].iloc[-1]) / 100,
}
except Exception: # noqa: BLE001
pass
# Benchmark-relative metrics
if benchmark is not None:
from wraquant.risk.metrics import information_ratio
b = benchmark.dropna()
n = min(len(r_series), len(b))
r_aligned = r_series.iloc[-n:].reset_index(drop=True)
b_aligned = b.iloc[-n:].reset_index(drop=True)
result["relative"] = {
"information_ratio": information_ratio(r_aligned, b_aligned),
"beta": float(
np.cov(r_aligned.values, b_aligned.values)[0, 1]
/ np.var(b_aligned.values)
),
}
return result
# ---------------------------------------------------------------------------
# regime_aware_backtest
# ---------------------------------------------------------------------------
[docs]
def regime_aware_backtest(
prices: pd.Series,
n_regimes: int = 2,
bull_weight: float = 1.0,
bear_weight: float = 0.0,
vol_target: float = 0.15,
) -> dict[str, Any]:
"""Full regime-aware backtest pipeline.
Pipeline: prices -> returns -> detect regimes -> compute regime
stats -> size positions by regime -> generate strategy returns ->
tearsheet + risk metrics.
Chains: data -> regimes -> backtest -> risk.
Parameters:
prices: Price series (e.g. adjusted close).
n_regimes: Number of market regimes (default 2: bull/bear).
bull_weight: Portfolio weight in the low-volatility (bull)
regime. 1.0 = fully invested.
bear_weight: Portfolio weight in the high-volatility (bear)
regime. 0.0 = flat.
vol_target: Annual volatility target for position sizing
(informational; not used for scaling in this recipe).
Returns:
Dictionary with:
- **regime_result** -- ``RegimeResult`` dataclass from
``wraquant.regimes.base``.
- **strategy_returns** -- pd.Series of regime-weighted strategy
returns.
- **tearsheet** -- comprehensive tearsheet dict from
``wraquant.backtest.tearsheet``.
- **regime_stats** -- per-regime summary DataFrame.
- **risk_metrics** -- dict with sharpe, sortino, max_drawdown.
Example:
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> prices = pd.Series(
... np.cumprod(1 + np.random.normal(0.0003, 0.01, 500)),
... index=pd.bdate_range("2020-01-01", periods=500),
... )
>>> result = regime_aware_backtest(prices)
>>> "strategy_returns" in result
True
"""
from wraquant.backtest.position import regime_conditional_sizing
from wraquant.backtest.tearsheet import comprehensive_tearsheet
from wraquant.regimes.base import detect_regimes
from wraquant.risk.metrics import max_drawdown, sharpe_ratio, sortino_ratio
returns = prices.pct_change().dropna()
# 1. Detect regimes
regime_result = detect_regimes(returns.values, method="hmm", n_regimes=n_regimes)
# 2. Size positions by regime
# regime_conditional_sizing expects {str: float} dicts for both
# probabilities and multipliers.
base_weights = np.array([1.0])
regime_multipliers = {
f"regime_{k}": (bull_weight if k == 0 else bear_weight)
for k in range(n_regimes)
}
n = min(len(returns), len(regime_result.probabilities))
positions = np.empty(n)
for t in range(n):
probs_dict = {
f"regime_{k}": float(regime_result.probabilities[t, k])
for k in range(n_regimes)
}
sized = regime_conditional_sizing(
base_weights,
probs_dict,
regime_multipliers,
)
positions[t] = float(sized[0])
# 3. Generate strategy returns
strategy_returns = pd.Series(
returns.values[-n:] * positions,
index=returns.index[-n:],
name="regime_strategy",
)
# 4. Tearsheet + risk
tearsheet = comprehensive_tearsheet(strategy_returns)
equity_curve = (1 + strategy_returns).cumprod()
return {
"regime_result": regime_result,
"strategy_returns": strategy_returns,
"tearsheet": tearsheet,
"regime_stats": regime_result.statistics,
"risk_metrics": {
"sharpe": sharpe_ratio(strategy_returns),
"sortino": sortino_ratio(strategy_returns),
"max_drawdown": max_drawdown(equity_curve),
},
}
# ---------------------------------------------------------------------------
# garch_risk_pipeline
# ---------------------------------------------------------------------------
[docs]
def garch_risk_pipeline(
returns: pd.Series,
vol_model: str = "GJR",
dist: str = "t",
var_alpha: float = 0.05,
) -> dict[str, Any]:
"""GARCH volatility -> VaR/CVaR -> stress testing pipeline.
Pipeline: returns -> fit GARCH -> conditional vol -> time-varying
VaR -> news impact curve -> stress scenarios -> risk report.
Chains: vol -> risk -> stress.
Parameters:
returns: Simple return series (daily).
vol_model: GARCH variant -- ``"GARCH"``, ``"GJR"``, or
``"EGARCH"``.
dist: Error distribution for the GARCH model. ``"normal"``,
``"t"`` (Student-t), or ``"skewt"`` (skewed Student-t).
var_alpha: Significance level for VaR (0.05 = 95% VaR).
Returns:
Dictionary with:
- **garch** -- fitted GARCH result dict (params, conditional
vol, diagnostics).
- **var** -- time-varying VaR/CVaR result dict.
- **news_impact** -- news impact curve dict.
- **diagnostics** -- summary dict with persistence, half_life,
current_vol, breach_rate.
Example:
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> rets = pd.Series(np.random.normal(0.0003, 0.01, 500))
>>> result = garch_risk_pipeline(rets)
>>> "garch" in result and "var" in result
True
"""
from wraquant.risk.var import garch_var
from wraquant.vol.models import (
egarch_fit,
garch_fit,
gjr_garch_fit,
news_impact_curve,
)
# 1. Fit GARCH
fit_fns = {"GARCH": garch_fit, "GJR": gjr_garch_fit, "EGARCH": egarch_fit}
fit_fn = fit_fns.get(vol_model.upper(), garch_fit)
garch_result = fit_fn(returns, dist=dist)
# 2. Time-varying VaR
var_result = garch_var(returns, vol_model=vol_model, dist=dist, alpha=var_alpha)
# 3. News impact curve
nic = news_impact_curve(returns.values, model_type=vol_model.lower())
return {
"garch": garch_result,
"var": var_result,
"news_impact": nic,
"diagnostics": {
"persistence": garch_result["persistence"],
"half_life": garch_result["half_life"],
"current_vol": float(garch_result["conditional_volatility"].iloc[-1]),
"breach_rate": var_result["breach_rate"],
},
}
# ---------------------------------------------------------------------------
# ml_alpha_pipeline
# ---------------------------------------------------------------------------
[docs]
def ml_alpha_pipeline(
prices_df: pd.DataFrame,
target_col: str,
model: str = "gradient_boost",
walk_forward_windows: int = 5,
) -> dict[str, Any]:
"""ML alpha research pipeline.
Pipeline: prices -> features -> walk-forward train/predict ->
evaluate -> feature importance.
Chains: ml/features -> ml/pipeline -> ml/advanced -> risk.
Parameters:
prices_df: Multi-asset price DataFrame (columns = tickers).
target_col: Column name of the target asset to predict.
model: Model type. Currently uses sklearn's
``GradientBoostingClassifier`` under the hood.
walk_forward_windows: Not used in current implementation
(walk-forward uses fixed train/test sizes).
Returns:
Dictionary with:
- **walk_forward** -- walk-forward backtest result dict
(predictions, actuals, pnl, sharpe, hit_rate, equity_curve).
- **feature_importance** -- random forest importance ranking
(or None if too few samples).
- **hit_rate** -- out-of-sample directional accuracy.
- **sharpe** -- out-of-sample Sharpe ratio.
Example:
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> prices = pd.DataFrame({
... "SPY": np.cumprod(1 + np.random.normal(0.0003, 0.01, 600)),
... "TLT": np.cumprod(1 + np.random.normal(0.0001, 0.005, 600)),
... })
>>> result = ml_alpha_pipeline(prices, target_col="SPY")
>>> "walk_forward" in result
True
"""
from sklearn.ensemble import GradientBoostingClassifier
from wraquant.ml.advanced import random_forest_importance
from wraquant.ml.features import return_features, volatility_features
from wraquant.ml.pipeline import walk_forward_backtest
returns = prices_df.pct_change().dropna()
# 1. Build features (return_features expects a price series)
features = return_features(prices_df[target_col])
vol_feats = volatility_features(returns[target_col])
all_features = pd.concat([features, vol_feats], axis=1).dropna()
# 2. Align target -- binary classification: up/down
target = returns[target_col].reindex(all_features.index)
valid = all_features.index.intersection(target.dropna().index)
X = all_features.loc[valid]
y_binary = (target.loc[valid] > 0).astype(int)
# 3. Walk-forward backtest (expects a sklearn-compatible estimator)
clf = GradientBoostingClassifier(
n_estimators=100,
max_depth=3,
random_state=42,
)
wf_result = walk_forward_backtest(
model=clf,
X=X,
y=y_binary,
train_size=min(252, len(X) // 2),
test_size=21,
step_size=21,
)
# 4. Feature importance (only if enough data)
importance = None
if len(X) > 50:
importance = random_forest_importance(X, y_binary)
return {
"walk_forward": wf_result,
"feature_importance": importance,
"hit_rate": wf_result.get("hit_rate", 0),
"sharpe": wf_result.get("sharpe", 0),
}
# ---------------------------------------------------------------------------
# portfolio_construction_pipeline
# ---------------------------------------------------------------------------
[docs]
def portfolio_construction_pipeline(
returns_df: pd.DataFrame,
method: str = "risk_parity",
regime_aware: bool = True,
n_regimes: int = 2,
) -> dict[str, Any]:
"""Full portfolio construction pipeline.
Pipeline: returns -> covariance estimation -> optimize ->
(optional) regime adjust -> risk decomposition -> betas.
Chains: stats -> opt -> regimes -> risk/portfolio_analytics.
Parameters:
returns_df: Multi-asset return DataFrame (columns = tickers,
rows = daily returns).
method: Optimization method. ``"risk_parity"`` (default) or
``"mean_variance"``.
regime_aware: If True, adjust weights by current regime
probability (scales down in high-vol regimes).
n_regimes: Number of regimes for the optional regime adjustment.
Returns:
Dictionary with:
- **weights** -- dict mapping asset name to weight.
- **optimization** -- ``OptimizationResult`` dataclass.
- **component_var** -- per-asset VaR contribution (pd.Series).
- **diversification_ratio** -- portfolio diversification ratio.
- **betas** -- dict mapping asset name to rolling beta vs
first asset.
- **regime_adjusted** -- bool indicating whether regime
scaling was applied.
Example:
>>> import pandas as pd, numpy as np
>>> np.random.seed(42)
>>> rets = pd.DataFrame(
... np.random.randn(252, 3) * np.array([0.01, 0.02, 0.005]),
... columns=["Bonds", "Equity", "Gold"],
... )
>>> result = portfolio_construction_pipeline(rets, regime_aware=False)
>>> sum(result["weights"].values()) # doctest: +ELLIPSIS
1.0...
"""
from wraquant.opt.portfolio import mean_variance, risk_parity
from wraquant.risk.beta import rolling_beta
from wraquant.risk.portfolio_analytics import component_var, diversification_ratio
# 1. Optimize
if method == "risk_parity":
opt_result = risk_parity(returns_df)
else:
opt_result = mean_variance(returns_df)
weights = opt_result.weights.copy()
# 2. Regime adjustment (optional)
if regime_aware:
try:
from wraquant.regimes.base import detect_regimes
# Use first asset as market proxy
market = returns_df.iloc[:, 0].values
regime = detect_regimes(market, method="hmm", n_regimes=n_regimes)
current_prob = regime.current_probabilities
# Scale down in high-vol regime: probability of low-vol regime
vol_scale = float(current_prob[0])
weights = weights * (0.5 + 0.5 * vol_scale)
weights = weights / weights.sum() # renormalize
except Exception: # noqa: BLE001
# If regime detection fails, proceed without adjustment
regime_aware = False
# 3. Risk decomposition
cov = np.cov(returns_df.values, rowvar=False) * 252
comp_var = component_var(weights, returns_df)
div_ratio = diversification_ratio(weights, cov)
# 4. Rolling betas vs first asset
market_returns = returns_df.iloc[:, 0]
betas: dict[str, float] = {}
for col in returns_df.columns:
b = rolling_beta(returns_df[col], market_returns, window=60)
valid = b.dropna()
betas[col] = float(valid.iloc[-1]) if len(valid) > 0 else 1.0
return {
"weights": dict(zip(returns_df.columns, weights.tolist(), strict=False)),
"optimization": opt_result,
"component_var": comp_var,
"diversification_ratio": div_ratio,
"betas": betas,
"regime_adjusted": regime_aware,
}