"""Implied volatility and volatility surface construction.
Provides Newton-Raphson implied volatility solver and tools for
building volatility smiles and surfaces from market data.
"""
from __future__ import annotations
from collections.abc import Sequence
import numpy as np
import numpy.typing as npt
from wraquant.core.exceptions import PricingError
from wraquant.core.types import OptionType
from wraquant.price.greeks import vega as bs_vega
from wraquant.price.options import black_scholes
__all__ = [
"implied_volatility",
"vol_smile",
"vol_surface",
]
def _parse_option_type(option_type: str | OptionType) -> OptionType:
"""Normalize option_type to OptionType enum."""
if isinstance(option_type, OptionType):
return option_type
return OptionType(option_type.lower())
[docs]
def implied_volatility(
market_price: float,
S: float,
K: float,
T: float,
r: float,
option_type: str | OptionType = "call",
tol: float = 1e-8,
max_iter: int = 200,
initial_guess: float = 0.3,
) -> np.float64:
r"""Compute implied volatility using Newton-Raphson iteration.
Solves for :math:`\sigma` such that
:math:`\text{BS}(S, K, T, r, \sigma) = \text{market\_price}` by
iterating :math:`\sigma_{n+1} = \sigma_n - (C(\sigma_n) - C_{\text{mkt}}) / \mathcal{V}(\sigma_n)`,
where :math:`\mathcal{V}` is the Black-Scholes vega.
Use implied volatility to translate observed option prices into a
single number that summarises the market's view of future
uncertainty. Comparing implied vols across strikes and expiries
reveals the volatility smile/skew.
When vega is too small (deep in/out of the money), the solver
automatically falls back to a robust bisection method.
Parameters:
market_price (float): Observed market price of the option.
S (float): Current underlying price.
K (float): Strike price.
T (float): Time to expiration in years.
r (float): Risk-free interest rate (annualized).
option_type (str | OptionType): ``'call'`` or ``'put'``.
tol (float): Convergence tolerance on the price difference.
max_iter (int): Maximum number of Newton iterations.
initial_guess (float): Starting volatility estimate. 0.3 (30%)
is a reasonable default for most equity options.
Returns:
np.float64: Implied volatility (annualized). For example, 0.20
corresponds to 20% annualized volatility.
Raises:
PricingError: If the solver does not converge within *max_iter*
iterations.
Example:
>>> price = black_scholes(100, 100, 1.0, 0.05, 0.2)
>>> implied_volatility(price, 100, 100, 1.0, 0.05)
0.2000...
See Also:
vol_smile: Compute implied vols across multiple strikes.
vol_surface: Build a full implied volatility surface.
"""
otype = _parse_option_type(option_type)
sigma = initial_guess
for _ in range(max_iter):
bs_price = black_scholes(S, K, T, r, sigma, otype)
v = bs_vega(S, K, T, r, sigma)
if v < 1e-12:
# Vega too small; switch to bisection fallback
return _implied_vol_bisection(market_price, S, K, T, r, otype, tol)
diff = bs_price - market_price
if abs(diff) < tol:
return np.float64(sigma)
sigma = sigma - diff / v
# Keep sigma in reasonable bounds
if sigma <= 0.0:
sigma = 1e-4
elif sigma > 10.0:
sigma = 10.0
raise PricingError(
f"Implied volatility solver did not converge after {max_iter} iterations. "
f"Last sigma={sigma:.6f}, price diff={diff:.2e}"
)
def _implied_vol_bisection(
market_price: float,
S: float,
K: float,
T: float,
r: float,
option_type: OptionType,
tol: float,
lo: float = 1e-6,
hi: float = 10.0,
max_iter: int = 200,
) -> np.float64:
"""Bisection fallback for implied volatility when Newton fails."""
for _ in range(max_iter):
mid = (lo + hi) / 2.0
price = black_scholes(S, K, T, r, mid, option_type)
diff = price - market_price
if abs(diff) < tol:
return np.float64(mid)
if diff > 0:
hi = mid
else:
lo = mid
raise PricingError(
f"Implied volatility bisection did not converge after {max_iter} iterations."
)
[docs]
def vol_smile(
strikes: Sequence[float] | npt.NDArray[np.floating],
market_prices: Sequence[float] | npt.NDArray[np.floating],
S: float,
T: float,
r: float,
option_type: str | OptionType = "call",
) -> dict[str, npt.NDArray[np.float64]]:
"""Compute a volatility smile from market prices at multiple strikes.
For a fixed expiry, the implied volatility typically varies across
strikes, forming a "smile" or "skew" pattern. This function
computes the implied vol at each strike by inverting the
Black-Scholes formula, producing the raw data for smile analysis,
model calibration, or visualization.
Parameters:
strikes (Sequence[float] | ndarray): Array of strike prices.
market_prices (Sequence[float] | ndarray): Corresponding market
option prices (same length as *strikes*).
S (float): Current underlying price.
T (float): Time to expiration in years.
r (float): Risk-free interest rate (annualized).
option_type (str | OptionType): ``'call'`` or ``'put'``.
Returns:
dict[str, ndarray]: Dictionary with keys:
- ``'strikes'`` -- 1D array of strike prices.
- ``'implied_vols'`` -- 1D array of implied volatilities
corresponding to each strike.
Raises:
ValueError: If *strikes* and *market_prices* differ in length.
PricingError: If any individual implied vol fails to converge.
Example:
>>> strikes = [90, 95, 100, 105, 110]
>>> prices = [12.5, 9.0, 6.0, 3.8, 2.1]
>>> result = vol_smile(strikes, prices, 100, 0.5, 0.05)
>>> len(result['implied_vols'])
5
See Also:
implied_volatility: Single-strike implied vol solver.
vol_surface: Extend the smile across multiple expiries.
"""
otype = _parse_option_type(option_type)
strikes_arr = np.asarray(strikes, dtype=np.float64)
prices_arr = np.asarray(market_prices, dtype=np.float64)
if len(strikes_arr) != len(prices_arr):
raise ValueError("strikes and market_prices must have the same length.")
ivols = np.empty(len(strikes_arr), dtype=np.float64)
for i, (k, p) in enumerate(zip(strikes_arr, prices_arr, strict=False)):
ivols[i] = implied_volatility(float(p), S, float(k), T, r, otype)
return {
"strikes": strikes_arr,
"implied_vols": ivols,
}
[docs]
def vol_surface(
strikes: Sequence[float] | npt.NDArray[np.floating],
expiries: Sequence[float] | npt.NDArray[np.floating],
market_prices: Sequence[Sequence[float]] | npt.NDArray[np.floating],
S: float,
r: float,
option_type: str | OptionType = "call",
) -> dict[str, npt.NDArray[np.float64]]:
"""Construct a volatility surface from market prices across strikes and expiries.
The volatility surface is the central object in options trading: it
maps every (strike, expiry) pair to an implied volatility. Use the
surface for interpolation to price off-market options, for
calibrating stochastic volatility models (Heston, SABR), or for
detecting relative-value opportunities.
Parameters:
strikes (Sequence[float] | ndarray): Array of strike prices
(columns of the surface).
expiries (Sequence[float] | ndarray): Array of times to expiry
in years (rows of the surface).
market_prices (Sequence[Sequence[float]] | ndarray): 2D array of
option prices, shape ``(len(expiries), len(strikes))``.
S (float): Current underlying price.
r (float): Risk-free interest rate (annualized).
option_type (str | OptionType): ``'call'`` or ``'put'``.
Returns:
dict[str, ndarray]: Dictionary with keys:
- ``'strikes'`` -- 1D array of strikes.
- ``'expiries'`` -- 1D array of expiries.
- ``'implied_vols'`` -- 2D array of implied vols, shape
``(len(expiries), len(strikes))``.
Raises:
ValueError: If the shape of *market_prices* does not match
``(len(expiries), len(strikes))``.
Example:
>>> strikes = [95, 100, 105]
>>> expiries = [0.25, 0.5]
>>> prices = [[6.0, 3.5, 1.5], [8.0, 5.5, 3.5]]
>>> surface = vol_surface(strikes, expiries, prices, 100, 0.05)
>>> surface['implied_vols'].shape
(2, 3)
See Also:
vol_smile: Single-expiry volatility smile.
implied_volatility: Single-point implied vol solver.
"""
otype = _parse_option_type(option_type)
strikes_arr = np.asarray(strikes, dtype=np.float64)
expiries_arr = np.asarray(expiries, dtype=np.float64)
prices_arr = np.asarray(market_prices, dtype=np.float64)
if prices_arr.shape != (len(expiries_arr), len(strikes_arr)):
raise ValueError(
f"market_prices shape {prices_arr.shape} does not match "
f"(len(expiries), len(strikes)) = ({len(expiries_arr)}, {len(strikes_arr)})"
)
ivols = np.empty_like(prices_arr, dtype=np.float64)
for i, T in enumerate(expiries_arr):
for j, k in enumerate(strikes_arr):
ivols[i, j] = implied_volatility(
float(prices_arr[i, j]), S, float(k), float(T), r, otype
)
return {
"strikes": strikes_arr,
"expiries": expiries_arr,
"implied_vols": ivols,
}