"""Lévy processes for fat-tailed asset models.
Provides simulation, density evaluation, and calibration for:
- Variance Gamma (VG)
- Normal Inverse Gaussian (NIG)
- CGMY
- Stable Lévy processes
All implementations are pure numpy/scipy.
"""
from __future__ import annotations
import numpy as np
from numpy.typing import ArrayLike
from scipy.optimize import minimize
from scipy.special import kv as bessel_kv # modified Bessel K_v
from scipy.stats import norm
from wraquant.core._coerce import coerce_array
__all__ = [
"variance_gamma_pdf",
"variance_gamma_simulate",
"nig_pdf",
"nig_simulate",
"cgmy_simulate",
"fit_variance_gamma",
"fit_nig",
"levy_stable_simulate",
"characteristic_function_vg",
]
# ---------------------------------------------------------------------------
# Variance Gamma
# ---------------------------------------------------------------------------
[docs]
def variance_gamma_pdf(
x: ArrayLike,
sigma: float,
nu: float,
theta: float,
) -> np.ndarray:
r"""Evaluate the Variance Gamma probability density function.
.. math::
f(x) = \frac{2\,e^{\theta\,x / \sigma^2}}
{\nu^{1/\nu}\,\sqrt{2\pi}\,\sigma\,\Gamma(1/\nu)}
\left(\frac{x^2}{2\sigma^2/\nu + \theta^2}\right)^{1/(2\nu) - 1/4}
K_{1/\nu - 1/2}\!\left(\frac{1}{\sigma^2}
\sqrt{x^2(2\sigma^2/\nu + \theta^2)}\right)
Parameters
----------
x : array_like
Points at which to evaluate the PDF.
sigma : float
Volatility of the Brownian motion component (> 0).
nu : float
Variance rate of the Gamma subordinator (> 0).
theta : float
Drift of the Brownian motion component (controls skewness).
Returns
-------
np.ndarray
PDF values.
Example
-------
>>> import numpy as np
>>> from wraquant.math.levy import variance_gamma_pdf
>>> x = np.linspace(-0.05, 0.05, 5)
>>> pdf = variance_gamma_pdf(x, sigma=0.01, nu=0.5, theta=-0.001)
>>> pdf.shape
(5,)
>>> pdf[2] > pdf[0] # peak near center
True
See Also
--------
variance_gamma_simulate : Simulate paths from a VG process.
fit_variance_gamma : Calibrate VG parameters to return data.
nig_pdf : Normal Inverse Gaussian PDF (alternative fat-tailed model).
"""
from scipy.special import gamma as gamma_fn
x = coerce_array(x, name="x")
sigma2 = sigma ** 2
# Parameters for the Bessel representation
lam = 1.0 / nu
alpha_param = theta / sigma2
beta_param = np.sqrt(theta ** 2 + 2.0 * sigma2 / nu) / sigma2
order = lam - 0.5
abs_x = np.abs(x)
# Avoid division by zero at x=0
safe_x = np.where(abs_x > 0, abs_x, 1e-300)
log_pdf = (
np.log(2.0)
+ alpha_param * x
+ np.log(safe_x) * (lam - 0.5)
- (lam * np.log(nu) + 0.5 * np.log(2.0 * np.pi) + np.log(sigma)
+ np.log(gamma_fn(lam)))
+ np.log(np.maximum(bessel_kv(order, beta_param * safe_x), 1e-300))
- (lam - 0.5) * np.log(sigma2 * beta_param)
)
pdf = np.exp(log_pdf)
# Handle x = 0 gracefully
pdf = np.where(np.isfinite(pdf), pdf, 0.0)
return pdf
[docs]
def variance_gamma_simulate(
sigma: float,
nu: float,
theta: float,
n_steps: int,
dt: float = 1.0 / 252,
seed: int | None = None,
) -> np.ndarray:
"""Simulate a Variance Gamma process via time-changed Brownian motion.
X(t) = theta * G(t) + sigma * W(G(t))
where G is a Gamma subordinator with shape = dt/nu and scale = nu.
Parameters
----------
sigma : float
Volatility parameter (> 0).
nu : float
Variance rate of the Gamma time change (> 0).
theta : float
Drift parameter (skewness).
n_steps : int
Number of time steps.
dt : float, optional
Time increment (default 1/252 for daily).
seed : int or None, optional
Random seed.
Returns
-------
np.ndarray
Cumulative VG process values of length *n_steps + 1* (starts at 0).
Example
-------
>>> from wraquant.math.levy import variance_gamma_simulate
>>> path = variance_gamma_simulate(sigma=0.01, nu=0.5, theta=-0.001,
... n_steps=252, seed=42)
>>> path.shape
(253,)
>>> path[0]
0.0
See Also
--------
variance_gamma_pdf : Evaluate the VG density.
nig_simulate : Simulate a Normal Inverse Gaussian process.
cgmy_simulate : Simulate a CGMY process (generalises VG).
"""
rng = np.random.default_rng(seed)
shape = dt / nu
scale = nu
# Gamma increments
dg = rng.gamma(shape, scale, size=n_steps)
# Brownian increments subordinated by Gamma
dw = rng.standard_normal(n_steps)
increments = theta * dg + sigma * np.sqrt(dg) * dw
return np.concatenate([[0.0], np.cumsum(increments)])
[docs]
def characteristic_function_vg(
u: ArrayLike,
sigma: float,
nu: float,
theta: float,
) -> np.ndarray:
r"""Characteristic function of the Variance Gamma distribution at time 1.
.. math::
\phi(u) = \left(1 - i\,\theta\,\nu\,u
+ \tfrac{1}{2}\sigma^2\nu\,u^2\right)^{-1/\nu}
Parameters
----------
u : array_like
Fourier variable(s).
sigma : float
Volatility parameter.
nu : float
Variance rate parameter.
theta : float
Drift parameter.
Returns
-------
np.ndarray
Complex-valued characteristic function values.
Example
-------
>>> import numpy as np
>>> from wraquant.math.levy import characteristic_function_vg
>>> phi = characteristic_function_vg(np.array([0.0, 1.0]), sigma=0.01,
... nu=0.5, theta=-0.001)
>>> abs(phi[0]) # phi(0) = 1 for any characteristic function
1.0
See Also
--------
variance_gamma_pdf : Density evaluation (Fourier-inversion counterpart).
"""
u = np.asarray(u, dtype=complex)
base = 1.0 - 1j * theta * nu * u + 0.5 * sigma ** 2 * nu * u ** 2
return base ** (-1.0 / nu)
# ---------------------------------------------------------------------------
# Normal Inverse Gaussian
# ---------------------------------------------------------------------------
[docs]
def nig_pdf(
x: ArrayLike,
alpha: float,
beta: float,
mu: float,
delta: float,
) -> np.ndarray:
r"""Evaluate the Normal Inverse Gaussian probability density function.
.. math::
f(x) = \frac{\alpha\,\delta}{\pi}
\exp\!\left(\delta\sqrt{\alpha^2 - \beta^2}
+ \beta(x - \mu)\right)
\frac{K_1\!\left(\alpha\sqrt{\delta^2 + (x-\mu)^2}\right)}
{\sqrt{\delta^2 + (x - \mu)^2}}
Parameters
----------
x : array_like
Points at which to evaluate the PDF.
alpha : float
Tail heaviness parameter (> 0, alpha > |beta|).
beta : float
Skewness parameter (-alpha < beta < alpha).
mu : float
Location parameter.
delta : float
Scale parameter (> 0).
Returns
-------
np.ndarray
PDF values.
Example
-------
>>> import numpy as np
>>> from wraquant.math.levy import nig_pdf
>>> x = np.linspace(-0.05, 0.05, 5)
>>> pdf = nig_pdf(x, alpha=50.0, beta=-5.0, mu=0.0, delta=0.01)
>>> pdf.shape
(5,)
>>> (pdf >= 0).all()
True
See Also
--------
nig_simulate : Simulate paths from an NIG process.
fit_nig : Calibrate NIG parameters to return data.
variance_gamma_pdf : VG density (alternative fat-tailed model).
"""
x = coerce_array(x, name="x")
gamma_param = np.sqrt(alpha ** 2 - beta ** 2)
q = np.sqrt(delta ** 2 + (x - mu) ** 2)
log_pdf = (
np.log(alpha * delta / np.pi)
+ delta * gamma_param
+ beta * (x - mu)
+ np.log(np.maximum(bessel_kv(1.0, alpha * q), 1e-300))
- np.log(q)
)
pdf = np.exp(log_pdf)
pdf = np.where(np.isfinite(pdf), pdf, 0.0)
return pdf
[docs]
def nig_simulate(
alpha: float,
beta: float,
mu: float,
delta: float,
n_steps: int,
dt: float = 1.0 / 252,
seed: int | None = None,
) -> np.ndarray:
"""Simulate a Normal Inverse Gaussian process.
NIG is obtained as a normal variance-mean mixture with an Inverse
Gaussian subordinator.
Parameters
----------
alpha : float
Tail heaviness (> 0, alpha > |beta|).
beta : float
Skewness.
mu : float
Location (drift).
delta : float
Scale (> 0).
n_steps : int
Number of time steps.
dt : float, optional
Time increment (default 1/252).
seed : int or None, optional
Random seed.
Returns
-------
np.ndarray
Cumulative NIG process of length *n_steps + 1* (starts at 0).
Example
-------
>>> from wraquant.math.levy import nig_simulate
>>> path = nig_simulate(alpha=50.0, beta=-5.0, mu=0.0, delta=0.01,
... n_steps=252, seed=42)
>>> path.shape
(253,)
>>> path[0]
0.0
See Also
--------
nig_pdf : Evaluate the NIG density.
variance_gamma_simulate : Simulate a VG process.
"""
rng = np.random.default_rng(seed)
gamma_param = np.sqrt(alpha ** 2 - beta ** 2)
# Inverse Gaussian increments IG(delta*dt, delta*dt*gamma)
ig_mu_param = delta * dt / gamma_param if gamma_param > 0 else delta * dt
ig_lambda = (delta * dt) ** 2
# Simulate IG via the standard method
ig = _inverse_gaussian_sample(rng, ig_mu_param, ig_lambda, n_steps)
# Normal variance-mean mixture
z = rng.standard_normal(n_steps)
increments = mu * dt + beta * ig + np.sqrt(ig) * z
return np.concatenate([[0.0], np.cumsum(increments)])
def _inverse_gaussian_sample(
rng: np.random.Generator,
mu_ig: float,
lam: float,
n: int,
) -> np.ndarray:
"""Sample from Inverse Gaussian(mu, lambda) using the standard algorithm."""
v = rng.standard_normal(n)
y = v ** 2
x = mu_ig + (mu_ig ** 2 * y) / (2.0 * lam) - (mu_ig / (2.0 * lam)) * np.sqrt(
4.0 * mu_ig * lam * y + mu_ig ** 2 * y ** 2
)
u = rng.uniform(size=n)
result = np.where(u <= mu_ig / (mu_ig + x), x, mu_ig ** 2 / x)
return result
# ---------------------------------------------------------------------------
# CGMY
# ---------------------------------------------------------------------------
[docs]
def cgmy_simulate(
C: float,
G: float,
M: float,
Y: float,
n_steps: int,
dt: float = 1.0 / 252,
seed: int | None = None,
) -> np.ndarray:
"""Simulate a CGMY process via series representation (shot-noise).
For Y < 0 the process has finite activity and can be approximated
by compound Poisson. For 0 < Y < 2 the process has infinite
activity and is approximated by truncating the small-jump component
at epsilon and adding a Brownian correction.
Parameters
----------
C : float
Overall activity level (> 0).
G : float
Rate of exponential decay for negative jumps (> 0).
M : float
Rate of exponential decay for positive jumps (> 0).
Y : float
Fine structure parameter (Y < 2).
n_steps : int
Number of time steps.
dt : float, optional
Time increment (default 1/252).
seed : int or None, optional
Random seed.
Returns
-------
np.ndarray
Cumulative CGMY process of length *n_steps + 1* (starts at 0).
Example
-------
>>> from wraquant.math.levy import cgmy_simulate
>>> path = cgmy_simulate(C=1.0, G=5.0, M=10.0, Y=0.5,
... n_steps=252, seed=42)
>>> path.shape
(253,)
>>> path[0]
0.0
Notes
-----
The CGMY process generalises the Variance Gamma process (VG corresponds
to Y = 0). Higher Y values produce more small jumps; Y < 0 yields
finite activity (compound Poisson).
See Also
--------
variance_gamma_simulate : Simulate VG (special case Y = 0).
levy_stable_simulate : Simulate a stable Levy process.
"""
rng = np.random.default_rng(seed)
if Y < 0:
# Finite activity: compound Poisson approximation
return _cgmy_finite_activity(C, G, M, Y, n_steps, dt, rng)
else:
# Infinite activity: truncated small jumps + Gaussian correction
return _cgmy_infinite_activity(C, G, M, Y, n_steps, dt, rng)
def _cgmy_finite_activity(
C: float,
G: float,
M: float,
Y: float,
n_steps: int,
dt: float,
rng: np.random.Generator,
) -> np.ndarray:
"""CGMY with finite activity via compound Poisson."""
from scipy.special import gamma as gamma_fn
# Total intensity (for finite activity Y < 0)
intensity = C * (gamma_fn(-Y) * (G ** Y + M ** Y))
increments = np.zeros(n_steps)
for i in range(n_steps):
n_jumps = rng.poisson(intensity * dt)
if n_jumps > 0:
# Each jump: choose positive with prob M^Y/(G^Y+M^Y)
p_pos = M ** Y / (G ** Y + M ** Y)
signs = rng.binomial(1, p_pos, size=n_jumps)
sizes = np.empty(n_jumps)
for k in range(n_jumps):
if signs[k]:
sizes[k] = rng.exponential(1.0 / M)
else:
sizes[k] = -rng.exponential(1.0 / G)
increments[i] = sizes.sum()
return np.concatenate([[0.0], np.cumsum(increments)])
def _cgmy_infinite_activity(
C: float,
G: float,
M: float,
Y: float,
n_steps: int,
dt: float,
rng: np.random.Generator,
) -> np.ndarray:
"""CGMY with infinite activity via truncation + Gaussian correction."""
epsilon = 1e-4 # truncation level
# Approximate: large jumps as compound Poisson, small jumps as Gaussian
# Tail mass of CGMY Lévy measure: integral_{epsilon}^{inf} C*x^{-1-Y}*e^{-Mx} dx
# For Y > 0, use power-law tail approximation: C * epsilon^{-Y} * exp(-M*eps) / Y
if Y > 0:
intensity_pos = C * epsilon ** (-Y) * np.exp(-M * epsilon) / Y
intensity_neg = C * epsilon ** (-Y) * np.exp(-G * epsilon) / Y
else:
# Y = 0: finite activity, integral = C * E_1(M*epsilon) ~ -C*log(M*eps)
intensity_pos = C * max(-np.log(M * epsilon), 1.0) if M * epsilon > 0 else C
intensity_neg = C * max(-np.log(G * epsilon), 1.0) if G * epsilon > 0 else C
total_intensity = intensity_pos + intensity_neg
if not np.isfinite(total_intensity) or total_intensity <= 0:
total_intensity = max(intensity_pos, intensity_neg, 1.0)
# Ensure finite intensities
intensity_pos = min(intensity_pos, total_intensity) if np.isfinite(intensity_pos) else total_intensity / 2
intensity_neg = min(intensity_neg, total_intensity) if np.isfinite(intensity_neg) else total_intensity / 2
# Variance of small jumps (< epsilon): sigma^2 ~ 2*C*epsilon^(2-Y)/(2-Y)
small_var = 2.0 * C * epsilon ** (2.0 - Y) / max(2.0 - Y, 0.01) * dt
increments = np.zeros(n_steps)
for i in range(n_steps):
# Large jumps
n_jumps = rng.poisson(total_intensity * dt)
if n_jumps > 0:
p_pos = intensity_pos / total_intensity if total_intensity > 0 else 0.5
signs = rng.binomial(1, p_pos, size=n_jumps)
sizes = np.empty(n_jumps)
for k in range(n_jumps):
if signs[k]:
sizes[k] = epsilon + rng.exponential(1.0 / M)
else:
sizes[k] = -(epsilon + rng.exponential(1.0 / G))
increments[i] = sizes.sum()
# Gaussian correction for small jumps
if small_var > 0:
increments[i] += rng.normal(0, np.sqrt(small_var))
return np.concatenate([[0.0], np.cumsum(increments)])
# ---------------------------------------------------------------------------
# Stable Lévy
# ---------------------------------------------------------------------------
[docs]
def levy_stable_simulate(
alpha: float,
beta: float,
n_steps: int,
seed: int | None = None,
) -> np.ndarray:
r"""Simulate a stable Lévy process using the Chambers-Mallows-Stuck method.
Parameters
----------
alpha : float
Stability index, 0 < alpha <= 2.
beta : float
Skewness parameter, -1 <= beta <= 1.
n_steps : int
Number of increments.
seed : int or None, optional
Random seed.
Returns
-------
np.ndarray
Cumulative process values of length *n_steps + 1* (starts at 0).
Example
-------
>>> from wraquant.math.levy import levy_stable_simulate
>>> path = levy_stable_simulate(alpha=1.7, beta=0.0, n_steps=500, seed=42)
>>> path.shape
(501,)
>>> path[0]
0.0
Notes
-----
alpha = 2 recovers Brownian motion; alpha = 1 with beta = 0 yields a
Cauchy process. Smaller alpha produces heavier tails.
See Also
--------
variance_gamma_simulate : VG process (finite variance, unlike stable).
cgmy_simulate : CGMY process (tempered stable, finite moments).
"""
rng = np.random.default_rng(seed)
increments = _stable_random(rng, alpha, beta, n_steps)
return np.concatenate([[0.0], np.cumsum(increments)])
def _stable_random(
rng: np.random.Generator,
alpha: float,
beta: float,
n: int,
) -> np.ndarray:
"""Chambers-Mallows-Stuck algorithm for stable random variates."""
V = rng.uniform(-np.pi / 2, np.pi / 2, size=n)
W = rng.exponential(1.0, size=n)
if alpha == 1.0:
X = (
(np.pi / 2 + beta * V) * np.tan(V)
- beta * np.log(np.pi / 2 * W * np.cos(V) / (np.pi / 2 + beta * V))
)
else:
zeta = -beta * np.tan(np.pi * alpha / 2)
xi = np.arctan(-zeta) / alpha
s = (1.0 + zeta ** 2) ** (1.0 / (2.0 * alpha))
X = s * (
np.sin(alpha * (V + xi))
/ np.cos(V) ** (1.0 / alpha)
* (np.cos(V - alpha * (V + xi)) / W) ** ((1.0 - alpha) / alpha)
)
return X
# ---------------------------------------------------------------------------
# Fitting / calibration
# ---------------------------------------------------------------------------
[docs]
def fit_variance_gamma(
returns: ArrayLike,
) -> dict[str, float]:
"""Fit a Variance Gamma distribution to return data via MLE.
Parameters
----------
returns : array_like
Observed returns.
Returns
-------
dict
``sigma`` – fitted volatility.
``nu`` – fitted variance rate.
``theta`` – fitted drift.
``log_likelihood`` – maximised log-likelihood.
Example
-------
>>> import numpy as np
>>> from wraquant.math.levy import fit_variance_gamma, variance_gamma_simulate
>>> # Generate synthetic VG returns and re-fit
>>> path = variance_gamma_simulate(sigma=0.01, nu=0.5, theta=-0.001,
... n_steps=2000, seed=42)
>>> returns = np.diff(path)
>>> result = fit_variance_gamma(returns)
>>> result['sigma'] > 0
True
See Also
--------
variance_gamma_pdf : Evaluate the fitted density.
fit_nig : Fit an NIG distribution instead.
"""
returns = coerce_array(returns, name="returns")
# Initial guesses from moments
mu_hat = np.mean(returns)
sigma_hat = np.std(returns)
skew_hat = float(np.mean(((returns - mu_hat) / sigma_hat) ** 3))
kurt_hat = float(np.mean(((returns - mu_hat) / sigma_hat) ** 4))
sigma0 = max(sigma_hat, 1e-6)
nu0 = max((kurt_hat / 3.0 - 1.0), 0.01)
theta0 = skew_hat * sigma_hat / max(3.0 * nu0, 0.01)
def neg_ll(params: np.ndarray) -> float:
s, v, th = params
if s <= 1e-10 or v <= 1e-10:
return 1e12
try:
pdf_vals = variance_gamma_pdf(returns, s, v, th)
pdf_vals = np.maximum(pdf_vals, 1e-300)
ll = np.sum(np.log(pdf_vals))
if not np.isfinite(ll):
return 1e12
return -ll
except Exception:
return 1e12
result = minimize(
neg_ll,
x0=np.array([sigma0, nu0, theta0]),
method="Nelder-Mead",
options={"maxiter": 5000, "xatol": 1e-8, "fatol": 1e-8},
)
sigma_fit, nu_fit, theta_fit = result.x
return {
"sigma": float(abs(sigma_fit)),
"nu": float(abs(nu_fit)),
"theta": float(theta_fit),
"log_likelihood": float(-result.fun),
}
[docs]
def fit_nig(
returns: ArrayLike,
) -> dict[str, float]:
"""Fit a Normal Inverse Gaussian distribution to return data via MLE.
Parameters
----------
returns : array_like
Observed returns.
Returns
-------
dict
``alpha`` – fitted tail parameter.
``beta`` – fitted skewness parameter.
``mu`` – fitted location.
``delta`` – fitted scale.
``log_likelihood`` – maximised log-likelihood.
Example
-------
>>> import numpy as np
>>> from wraquant.math.levy import fit_nig
>>> rng = np.random.default_rng(42)
>>> returns = rng.standard_t(df=5, size=1000) * 0.01
>>> result = fit_nig(returns)
>>> result['alpha'] > 0 and result['delta'] > 0
True
See Also
--------
nig_pdf : Evaluate the fitted density.
fit_variance_gamma : Fit a VG distribution instead.
"""
returns = coerce_array(returns, name="returns")
mu_hat = np.mean(returns)
sigma_hat = np.std(returns)
# Reasonable initial guesses
alpha0 = 1.0 / max(sigma_hat, 1e-6)
beta0 = 0.0
mu0 = mu_hat
delta0 = max(sigma_hat, 1e-6)
def neg_ll(params: np.ndarray) -> float:
a, b, m, d = params
if a <= abs(b) + 1e-6 or d <= 1e-10 or a <= 0:
return 1e12
try:
pdf_vals = nig_pdf(returns, a, b, m, d)
pdf_vals = np.maximum(pdf_vals, 1e-300)
ll = np.sum(np.log(pdf_vals))
if not np.isfinite(ll):
return 1e12
return -ll
except Exception:
return 1e12
result = minimize(
neg_ll,
x0=np.array([alpha0, beta0, mu0, delta0]),
method="Nelder-Mead",
options={"maxiter": 5000, "xatol": 1e-8, "fatol": 1e-8},
)
a_fit, b_fit, m_fit, d_fit = result.x
return {
"alpha": float(abs(a_fit)),
"beta": float(b_fit),
"mu": float(m_fit),
"delta": float(abs(d_fit)),
"log_likelihood": float(-result.fun),
}