"""Event-driven analysis for quantitative finance.
Provides functions for analyzing corporate events that drive short-term
alpha: earnings announcements, dividend payments, insider transactions,
and institutional ownership changes. All data is sourced from the FMP
(Financial Modeling Prep) API.
Event-driven strategies exploit predictable market reactions to corporate
announcements. The most well-documented anomaly is post-earnings
announcement drift (PEAD): stocks that beat estimates tend to continue
drifting upward for 60+ trading days, and vice versa for misses.
This module covers:
1. **Earnings calendar & surprises** -- Upcoming and historical earnings
with beat/miss classification and surprise magnitudes.
2. **PEAD analysis** -- Quantifies the post-earnings drift signal.
3. **Dividend history** -- Yield, growth, and payout ratio trends.
4. **Insider activity** -- Net buy/sell ratios and notable transactions.
5. **Institutional ownership** -- Top holders and quarterly changes.
References:
- Ball & Brown (1968), "An Empirical Evaluation of Accounting Income
Numbers"
- Bernard & Thomas (1989), "Post-Earnings-Announcement Drift"
- Lakonishok & Lee (2001), "Are Insider Trades Informative?"
"""
from __future__ import annotations
from datetime import date, datetime, timedelta
from typing import Any
import numpy as np
import pandas as pd
from wraquant.core.decorators import requires_extra
# ---------------------------------------------------------------------------
# Earnings
# ---------------------------------------------------------------------------
[docs]
@requires_extra("market-data")
def earnings_calendar(
    from_date: str | date | None = None,
    to_date: str | date | None = None,
) -> pd.DataFrame:
    """Fetch the earnings calendar for a date range.

    Returns a DataFrame of upcoming and recent earnings announcements.
    Useful for screening the market for event-driven opportunities and
    avoiding earnings risk in existing positions.

    When the provider response has a ``date`` column and both bounds can
    be parsed, rows are restricted to ``from_date <= date <= to_date``
    (inclusive, compared at day resolution). Rows whose date cannot be
    parsed are dropped by that filter. If there is no ``date`` column or
    a bound cannot be parsed, the response is returned unfiltered.

    Parameters:
        from_date: Start date as ``"YYYY-MM-DD"`` string or
            ``datetime.date``. Defaults to today.
        to_date: End date. Defaults to 7 days after *from_date*.

    Returns:
        DataFrame with columns (as supplied by the provider, with the
        camelCase estimate/actual columns renamed):

        - **symbol** (*str*) -- Ticker symbol.
        - **date** (*str*) -- Earnings date.
        - **eps_estimated** (*float*) -- Consensus EPS estimate.
        - **eps_actual** (*float*) -- Actual EPS (NaN if not yet reported).
        - **revenue_estimated** (*float*) -- Consensus revenue estimate.
        - **revenue_actual** (*float*) -- Actual revenue (NaN if not yet
          reported).
        - **time** (*str*) -- ``"bmo"`` (before market open),
          ``"amc"`` (after market close), or ``"--"``.

        An empty frame with exactly these columns is returned when the
        provider returns no data.

    Raises:
        ValueError: If *to_date* is omitted and *from_date* is a string
            that is not in ``"YYYY-MM-DD"`` format.

    Example:
        >>> from wraquant.news.events import earnings_calendar
        >>> cal = earnings_calendar("2024-01-15", "2024-01-19")
        >>> print(cal[["symbol", "date", "eps_estimated"]].head())

    See Also:
        earnings_surprises: Historical beat/miss data for a single stock.
        upcoming_earnings: Next earnings date for a specific symbol.
    """
    from wraquant.data.providers.fmp import FMPClient

    client = FMPClient()

    if from_date is None:
        from_date = date.today()
    if to_date is None:
        if isinstance(from_date, str):
            from_date_dt = datetime.strptime(from_date, "%Y-%m-%d").date()
        else:
            from_date_dt = from_date
        to_date = from_date_dt + timedelta(days=7)

    from_str = str(from_date)
    to_str = str(to_date)

    # NOTE(review): elsewhere in this module ``client.earnings`` is called
    # with a ticker symbol; here it receives the start date. Confirm that
    # FMPClient.earnings accepts a date, or switch to a dedicated calendar
    # endpoint. The call is kept unchanged; the range is enforced below.
    df = client.earnings(from_str)

    if df.empty:
        return pd.DataFrame(
            columns=[
                "symbol",
                "date",
                "eps_estimated",
                "eps_actual",
                "revenue_estimated",
                "revenue_actual",
                "time",
            ]
        )

    # Standardize column names
    col_map = {
        "epsEstimated": "eps_estimated",
        "epsActual": "eps_actual",
        "revenueEstimated": "revenue_estimated",
        "revenueActual": "revenue_actual",
    }
    df = df.rename(columns=col_map)

    # Enforce the requested window. Bounds are parsed leniently so that an
    # unparseable bound skips filtering instead of raising.
    lower = pd.to_datetime(from_str, errors="coerce")
    upper = pd.to_datetime(to_str, errors="coerce")
    if "date" in df.columns and pd.notna(lower) and pd.notna(upper):
        event_days = pd.to_datetime(df["date"], errors="coerce").dt.normalize()
        in_window = event_days.between(lower.normalize(), upper.normalize())
        df = df.loc[in_window].reset_index(drop=True)

    return df
[docs]
@requires_extra("market-data")
def earnings_surprises(
    symbol: str,
    limit: int = 20,
) -> pd.DataFrame:
    """Fetch historical earnings surprises for a stock.

    Returns actual vs. estimated EPS for each earnings report, with the
    standardized surprise computed as ``(actual - estimate) / |estimate|``.
    This is the raw data behind the PEAD (post-earnings announcement
    drift) anomaly.

    Mathematical formulation:
        SUE_t = (EPS_actual - EPS_estimate) / |EPS_estimate|

    Special cases:
        - ``|estimate| <= 1e-12``: the ratio is undefined, so the surprise
          is reported as ``0.0``.
        - Missing (NaN / non-numeric) actual or estimate: the surprise is
          NaN, so missing data is not mistaken for an exactly-met estimate.
          ``beat`` is ``False`` for such rows.

    Parameters:
        symbol: Ticker symbol (e.g., ``"AAPL"``).
        limit: Maximum number of historical quarters to return (the first
            *limit* rows of the provider response are used).

    Returns:
        DataFrame with columns:

        - **date** (*str*) -- Earnings announcement date.
        - **actual** (*float*) -- Actual reported EPS.
        - **estimate** (*float*) -- Consensus analyst estimate.
        - **surprise** (*float*) -- Standardized surprise.
        - **surprise_pct** (*float*) -- Surprise as a percentage.
        - **beat** (*bool*) -- True if actual exceeded estimate.

        ``date``, ``actual`` and ``estimate`` are present only when a
        matching column is found in the provider response. If either
        ``actual`` or ``estimate`` is unavailable, ``surprise`` and
        ``surprise_pct`` are ``0.0`` and ``beat`` is ``False``.

    Example:
        >>> from wraquant.news.events import earnings_surprises
        >>> df = earnings_surprises("MSFT", limit=8)
        >>> print(df[["date", "actual", "estimate", "surprise_pct", "beat"]])

    Notes:
        Reference: Bernard & Thomas (1989). "Post-Earnings-Announcement
        Drift: Delayed Price Response or Risk Premium?" *Journal of
        Accounting Research*, 27, 1-36.

    See Also:
        earnings_history: Extended analysis including PEAD metrics.
        upcoming_earnings: Next expected earnings date.
    """
    from wraquant.data.providers.fmp import FMPClient

    client = FMPClient()
    df = client.earnings_surprises(symbol)

    if df.empty:
        return pd.DataFrame(
            columns=[
                "date",
                "actual",
                "estimate",
                "surprise",
                "surprise_pct",
                "beat",
            ]
        )

    # Limit rows
    df = df.head(limit).copy()

    # The provider's column names vary; take the first candidate present.
    actual_col = _resolve_col(df, ["actualEarningResult", "actual", "actualEPS"])
    estimate_col = _resolve_col(df, ["estimatedEarning", "estimate", "estimatedEPS"])
    date_col = _resolve_col(df, ["date", "fiscalDateEnding"])

    result = pd.DataFrame()
    if date_col:
        result["date"] = df[date_col]
    if actual_col:
        result["actual"] = pd.to_numeric(df[actual_col], errors="coerce")
    if estimate_col:
        result["estimate"] = pd.to_numeric(df[estimate_col], errors="coerce")

    if "actual" in result.columns and "estimate" in result.columns:
        estimate = result["estimate"]
        abs_est = estimate.abs()
        ratio = (result["actual"] - estimate) / abs_est
        # A (near-)zero estimate makes the ratio meaningless -> report 0.0.
        surprise = ratio.where(abs_est > 1e-12, 0.0)
        # A *missing* estimate also fails the comparison above; restore NaN
        # there so absent data is not reported as a zero surprise.
        result["surprise"] = surprise.where(estimate.notna())
        result["surprise_pct"] = result["surprise"] * 100.0
        result["beat"] = result["actual"] > estimate
    else:
        result["surprise"] = 0.0
        result["surprise_pct"] = 0.0
        result["beat"] = False

    return result.reset_index(drop=True)
[docs]
@requires_extra("market-data")
def upcoming_earnings(symbol: str) -> dict[str, Any]:
    """Get the next expected earnings date and consensus estimate.

    Looks up the symbol's earnings records and reports the earliest one
    dated today or later, together with any estimates attached to that
    record.

    The DataFrame returned by the client is not modified.

    Parameters:
        symbol: Ticker symbol (e.g., ``"AMZN"``).

    Returns:
        Dictionary containing:

        - **symbol** (*str*) -- Ticker symbol.
        - **next_date** (*str | None*) -- Next expected earnings date,
          or None if not scheduled.
        - **eps_estimate** (*float | None*) -- Consensus EPS estimate.
        - **revenue_estimate** (*float | None*) -- Consensus revenue
          estimate.
        - **days_until** (*int | None*) -- Calendar days until earnings.
        - **time** (*str | None*) -- ``"bmo"`` or ``"amc"`` if known.

        Estimates that are missing or cannot be converted to ``float``
        are reported as ``None``.

    Example:
        >>> from wraquant.news.events import upcoming_earnings
        >>> info = upcoming_earnings("GOOG")
        >>> if info["next_date"]:
        ...     print(f"Earnings on {info['next_date']} "
        ...           f"({info['days_until']} days away)")

    See Also:
        earnings_calendar: Full market-wide calendar.
        earnings_surprises: Historical beat/miss data.
    """
    from wraquant.data.providers.fmp import FMPClient

    client = FMPClient()

    # Fetch earnings data for the symbol
    df = client.earnings(symbol)

    result: dict[str, Any] = {
        "symbol": symbol,
        "next_date": None,
        "eps_estimate": None,
        "revenue_estimate": None,
        "days_until": None,
        "time": None,
    }

    if df.empty:
        return result

    date_col = _resolve_col(df, ["date", "fiscalDateEnding", "earningsDate"])
    if not date_col:
        return result

    today = pd.Timestamp.now().normalize()

    # Parse into a standalone, positionally indexed Series rather than
    # adding a scratch column to ``df``: the frame belongs to the client
    # and may be shared or cached.
    parsed = pd.to_datetime(df[date_col], errors="coerce").reset_index(drop=True)
    upcoming = parsed[parsed >= today]
    if upcoming.empty:
        return result

    # idxmin gives the position of the earliest future date (first one on
    # ties); unparseable dates are NaT and were excluded by the comparison.
    position = upcoming.idxmin()
    next_row = df.iloc[position]

    result["next_date"] = str(next_row[date_col])
    result["days_until"] = int((parsed.iloc[position] - today).days)

    def _optional_float(column: str | None) -> float | None:
        """Return the next row's value in *column* as float, else None."""
        if not column:
            return None
        raw = next_row.get(column)
        if pd.isna(raw):
            return None
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    # Extract estimates
    eps_est_col = _resolve_col(df, ["epsEstimated", "estimatedEPS", "estimate"])
    rev_est_col = _resolve_col(df, ["revenueEstimated", "estimatedRevenue"])
    time_col = _resolve_col(df, ["time", "period"])

    result["eps_estimate"] = _optional_float(eps_est_col)
    result["revenue_estimate"] = _optional_float(rev_est_col)
    if time_col and pd.notna(next_row.get(time_col)):
        result["time"] = str(next_row[time_col])

    return result
[docs]
@requires_extra("market-data")
def earnings_history(
    symbol: str,
    limit: int = 20,
) -> dict[str, Any]:
    """Comprehensive earnings history with beat/miss analysis and PEAD.

    Builds on ``earnings_surprises`` to provide aggregate statistics
    about a company's earnings track record, including beat rate,
    average surprise magnitude, consistency, and a post-earnings
    announcement drift (PEAD) classification of the latest surprise.

    The first row of the surprise data is treated as the most recent
    quarter for the streak and PEAD signal.

    Parameters:
        symbol: Ticker symbol (e.g., ``"NFLX"``).
        limit: Number of historical quarters to analyze.

    Returns:
        Dictionary containing:

        - **symbol** (*str*) -- Ticker symbol.
        - **quarters_analyzed** (*int*) -- Number of quarters in the
          analysis.
        - **surprises** (*pd.DataFrame*) -- Raw earnings surprise data
          (same format as ``earnings_surprises``).
        - **beat_rate** (*float*) -- Fraction of quarters where actual
          exceeded estimate.
        - **miss_rate** (*float*) -- Fraction of quarters where actual
          was below estimate. Exact meets and quarters with missing
          data count as neither beat nor miss, so ``beat_rate +
          miss_rate`` may be less than 1.
        - **avg_surprise** (*float*) -- Mean standardized surprise.
        - **avg_beat_magnitude** (*float*) -- Mean surprise when beating.
        - **avg_miss_magnitude** (*float*) -- Mean surprise when missing.
        - **surprise_std** (*float*) -- Sample standard deviation of
          surprises (measures consistency); 0.0 with fewer than two
          quarters.
        - **streak** (*dict*) -- Current streak info:

          - **type** (*str*) -- ``"beat"`` or ``"miss"`` (``"none"``
            when there is no data). Any non-beat quarter is labelled
            ``"miss"`` here.
          - **length** (*int*) -- Number of consecutive beats/misses.

        - **pead_signal** (*str*) -- ``"strong_beat"`` (> 10%),
          ``"moderate_beat"`` (> 2%), ``"neutral"``, ``"moderate_miss"``
          (< -2%), or ``"strong_miss"`` (< -10%) based on the most
          recent surprise.

    Example:
        >>> from wraquant.news.events import earnings_history
        >>> hist = earnings_history("AAPL", limit=12)
        >>> print(f"Beat rate: {hist['beat_rate']:.0%}")
        >>> print(f"Avg surprise: {hist['avg_surprise']:.2%}")
        >>> print(f"Current streak: {hist['streak']}")

    Notes:
        Reference: Ball & Brown (1968). "An Empirical Evaluation of
        Accounting Income Numbers." *Journal of Accounting Research*,
        6(2), 159-178.

    See Also:
        earnings_surprises: Raw surprise data.
        upcoming_earnings: Next earnings date.
    """
    surprises_df = earnings_surprises(symbol, limit=limit)

    result: dict[str, Any] = {
        "symbol": symbol,
        "quarters_analyzed": len(surprises_df),
        "surprises": surprises_df,
        "beat_rate": 0.0,
        "miss_rate": 0.0,
        "avg_surprise": 0.0,
        "avg_beat_magnitude": 0.0,
        "avg_miss_magnitude": 0.0,
        "surprise_std": 0.0,
        "streak": {"type": "none", "length": 0},
        "pead_signal": "neutral",
    }

    if surprises_df.empty or "beat" not in surprises_df.columns:
        return result

    beats = surprises_df["beat"].astype(bool)
    surprises = surprises_df["surprise"]

    # A miss is "actual strictly below estimate". Using ``~beats`` would
    # also count exact meets and rows with missing data as misses.
    if {"actual", "estimate"}.issubset(surprises_df.columns):
        misses = surprises_df["actual"] < surprises_df["estimate"]
    else:
        misses = surprises < 0

    result["beat_rate"] = float(beats.mean())
    result["miss_rate"] = float(misses.mean())
    result["avg_surprise"] = float(surprises.mean())
    if len(surprises) > 1:
        result["surprise_std"] = float(surprises.std(ddof=1))

    if beats.any():
        result["avg_beat_magnitude"] = float(surprises[beats].mean())
    if misses.any():
        result["avg_miss_magnitude"] = float(surprises[misses].mean())

    # Current streak: count leading rows sharing the first row's outcome.
    leading_outcome = bool(beats.iloc[0])
    run_length = 0
    for outcome in beats:
        if bool(outcome) != leading_outcome:
            break
        run_length += 1
    result["streak"] = {
        "type": "beat" if leading_outcome else "miss",
        "length": run_length,
    }

    # PEAD signal from the most recent quarter. A NaN surprise fails every
    # comparison and therefore stays "neutral".
    latest = float(surprises.iloc[0])
    if latest > 0.10:
        result["pead_signal"] = "strong_beat"
    elif latest > 0.02:
        result["pead_signal"] = "moderate_beat"
    elif latest < -0.10:
        result["pead_signal"] = "strong_miss"
    elif latest < -0.02:
        result["pead_signal"] = "moderate_miss"
    else:
        result["pead_signal"] = "neutral"

    return result
# ---------------------------------------------------------------------------
# Dividends
# ---------------------------------------------------------------------------
[docs]
@requires_extra("market-data")
def dividend_history(
    symbol: str,
    limit: int = 40,
) -> dict[str, Any]:
    """Analyze dividend history including annualized amount and growth.

    Extracts positive dividend amounts from the provider response and
    computes metrics relevant to dividend-focused strategies. The first
    retained record is treated as the most recent payment.

    Parameters:
        symbol: Ticker symbol (e.g., ``"JNJ"``).
        limit: Number of historical dividend records to analyze.

    Returns:
        Dictionary containing:

        - **symbol** (*str*) -- Ticker symbol.
        - **dividends** (*pd.DataFrame*) -- Historical dividend data with
          columns ``date`` (when the provider supplies one) and
          ``dividend``.
        - **total_dividends** (*int*) -- Number of dividend payments.
        - **current_annual_dividend** (*float*) -- Most recent payment
          multiplied by the estimated number of payments per year. The
          frequency is inferred from the median gap between payment
          dates (clamped to 1-12 per year); quarterly is assumed when
          dates are unavailable.
        - **dividend_growth_rate** (*float*) -- Compound annual growth
          rate between the oldest and newest retained payments. The
          time span comes from the payment dates when available,
          otherwise from the payment count and estimated frequency.
        - **consecutive_payments** (*int*) -- Count of retained
          positive payments.
        - **is_grower** (*bool*) -- True if the last three payments
          never decreased and the newest is strictly higher than the
          third newest.

        All numeric fields keep their zero defaults when the provider
        call fails or yields no usable dividend column.

    Example:
        >>> from wraquant.news.events import dividend_history
        >>> div = dividend_history("KO", limit=20)
        >>> print(f"Annual dividend: ${div['current_annual_dividend']:.2f}")
        >>> print(f"Growth rate: {div['dividend_growth_rate']:.1%}")

    See Also:
        earnings_history: Earnings-based fundamental analysis.
    """
    from wraquant.data.providers.fmp import FMPClient

    client = FMPClient()

    result: dict[str, Any] = {
        "symbol": symbol,
        "dividends": pd.DataFrame(),
        "total_dividends": 0,
        "current_annual_dividend": 0.0,
        "dividend_growth_rate": 0.0,
        "consecutive_payments": 0,
        "is_grower": False,
    }

    # NOTE(review): the data comes from ``client.earnings``; confirm whether
    # FMPClient exposes a dedicated dividend endpoint that should be used
    # instead. Failure is deliberately best-effort: return the defaults.
    try:
        df = client.earnings(symbol)
    except Exception:  # noqa: BLE001
        return result

    if df.empty:
        return result

    # Look for dividend columns
    div_col = _resolve_col(df, ["dividend", "adjDividend", "dividendYield"])
    date_col = _resolve_col(df, ["date", "paymentDate", "declarationDate"])

    if not div_col:
        return result

    div_data = pd.DataFrame()
    if date_col:
        div_data["date"] = df[date_col]
    div_data["dividend"] = pd.to_numeric(df[div_col], errors="coerce")
    div_data = div_data.dropna(subset=["dividend"])
    div_data = div_data[div_data["dividend"] > 0]

    if div_data.empty:
        return result

    div_data = div_data.head(limit).reset_index(drop=True)
    result["dividends"] = div_data
    result["total_dividends"] = len(div_data)
    result["consecutive_payments"] = len(div_data)

    payments = div_data["dividend"]
    dates = (
        pd.to_datetime(div_data["date"], errors="coerce")
        if "date" in div_data.columns
        else None
    )

    # Payments per year from the median spacing of payment dates;
    # fall back to quarterly when that cannot be determined.
    payments_per_year = 4
    if dates is not None:
        gap_days = dates.diff().abs().dt.days.dropna()
        gap_days = gap_days[gap_days > 0]
        if not gap_days.empty:
            estimate = round(365.25 / float(gap_days.median()))
            payments_per_year = int(min(12, max(1, estimate)))

    newest = float(payments.iloc[0])
    result["current_annual_dividend"] = newest * payments_per_year

    # Dividend growth rate (CAGR), annualised over the elapsed years.
    if len(payments) >= 2:
        oldest = float(payments.iloc[-1])
        years = (len(payments) - 1) / payments_per_year
        if dates is not None:
            first_date, last_date = dates.iloc[0], dates.iloc[-1]
            if pd.notna(first_date) and pd.notna(last_date):
                span_days = abs((first_date - last_date).days)
                if span_days > 0:
                    years = span_days / 365.25
        if oldest > 0 and newest > 0 and years > 0:
            try:
                cagr = (newest / oldest) ** (1.0 / years) - 1.0
            except OverflowError:
                # Extreme ratio over a tiny span; leave the default.
                pass
            else:
                result["dividend_growth_rate"] = float(cagr)

    # Grower: non-decreasing over the last three payments with net growth.
    if len(payments) >= 3:
        latest, previous, earlier = payments.iloc[:3].values
        result["is_grower"] = bool(latest >= previous >= earlier and latest > earlier)

    return result
# ---------------------------------------------------------------------------
# Insider activity
# ---------------------------------------------------------------------------
[docs]
@requires_extra("market-data")
def insider_activity(
    symbol: str,
    limit: int = 100,
) -> dict[str, Any]:
    """Analyze insider buying and selling activity.

    Insider transactions are among the most informative signals in
    equity markets. Insiders (officers, directors, 10%+ owners) must
    file SEC Form 4 within two business days of a transaction.
    Aggregate insider buying is a stronger signal than selling, because
    insiders may sell for many reasons (diversification, liquidity) but
    typically buy only when they expect appreciation.

    Transactions are classified by splitting the lower-cased transaction
    type into alphanumeric tokens and looking for a whole-token match
    (e.g. ``"p-purchase"`` -> ``{"p", "purchase"}`` -> buy). Matching on
    whole tokens matters because the single-letter codes would otherwise
    match inside unrelated words ("sale" contains "a", "purchase"
    contains "s").

    Parameters:
        symbol: Ticker symbol (e.g., ``"META"``).
        limit: Maximum number of transactions to fetch.

    Returns:
        Dictionary containing:

        - **symbol** (*str*) -- Ticker symbol.
        - **transactions** (*pd.DataFrame*) -- Transaction data with
          columns (each present only if found in the provider
          response): ``date``, ``insider``, ``transaction_type``,
          ``shares``, ``price``; plus ``value`` (``shares * price``,
          or 0.0 when either is unavailable).
        - **total_transactions** (*int*) -- Number of transactions.
        - **buy_count** (*int*) -- Number of purchase transactions.
        - **sell_count** (*int*) -- Number of sale transactions.
        - **buy_sell_ratio** (*float*) -- Ratio of buys to sells (>1
          is bullish). Returns ``inf`` if there are buys but no sells,
          and 0.0 if there are neither.
        - **net_shares** (*int*) -- Net shares bought minus sold.
        - **net_value** (*float*) -- Net dollar value of insider trades.
        - **notable_trades** (*list[dict]*) -- Transactions above $1M.
        - **signal** (*str*) -- ``"bullish"`` if the buy/sell ratio
          exceeds 2.0 and net value exceeds $100k, ``"bearish"`` if the
          ratio is below 0.3 and net value is below -$500k,
          ``"neutral"`` otherwise.

        The zero/empty defaults are returned when the provider call
        fails or returns no rows.

    Example:
        >>> from wraquant.news.events import insider_activity
        >>> insiders = insider_activity("AAPL")
        >>> print(f"Buy/sell ratio: {insiders['buy_sell_ratio']:.2f}")
        >>> print(f"Net value: ${insiders['net_value']:,.0f}")
        >>> print(f"Signal: {insiders['signal']}")

    Notes:
        Reference: Lakonishok & Lee (2001). "Are Insider Trades
        Informative?" *The Review of Financial Studies*, 14(1), 79-111.

    See Also:
        institutional_ownership: Institutional holder analysis.
        earnings_history: Fundamental event analysis.
    """
    from wraquant.data.providers.fmp import FMPClient

    client = FMPClient()

    result: dict[str, Any] = {
        "symbol": symbol,
        "transactions": pd.DataFrame(),
        "total_transactions": 0,
        "buy_count": 0,
        "sell_count": 0,
        "buy_sell_ratio": 0.0,
        "net_shares": 0,
        "net_value": 0.0,
        "notable_trades": [],
        "signal": "neutral",
    }

    # Best-effort fetch: any provider failure yields the neutral defaults.
    try:
        df = client.sec_filings(symbol, type="4", limit=limit)
    except Exception:  # noqa: BLE001
        return result

    if df.empty:
        return result

    # The provider's column names vary (including its own misspellings);
    # take the first candidate present for each field.
    date_col = _resolve_col(
        df, ["date", "fillingDate", "filingDate", "transactionDate"]
    )
    type_col = _resolve_col(
        df,
        ["transactionType", "type", "acquistionOrDisposition", "transaction_type"],
    )
    shares_col = _resolve_col(df, ["securitiesTransacted", "shares", "sharesTraded"])
    price_col = _resolve_col(df, ["price", "pricePerShare"])
    name_col = _resolve_col(df, ["reportingName", "insider", "reportingCik", "name"])

    # Build transactions DataFrame
    txns = pd.DataFrame()
    if date_col:
        txns["date"] = df[date_col]
    if name_col:
        txns["insider"] = df[name_col].astype(str)
    if type_col:
        txns["transaction_type"] = df[type_col].astype(str).str.lower()
    if shares_col:
        txns["shares"] = (
            pd.to_numeric(df[shares_col], errors="coerce").fillna(0).astype(int)
        )
    if price_col:
        txns["price"] = pd.to_numeric(df[price_col], errors="coerce").fillna(0.0)

    if "shares" in txns.columns and "price" in txns.columns:
        txns["value"] = txns["shares"] * txns["price"]
    else:
        txns["value"] = 0.0

    result["transactions"] = txns
    result["total_transactions"] = len(txns)

    if "transaction_type" not in txns.columns or txns.empty:
        return result

    # Classify buys and sells on whole tokens, never on substrings.
    buy_keywords = {"purchase", "buy", "acquisition", "a", "p"}
    sell_keywords = {"sale", "sell", "disposition", "d", "s"}

    type_tokens = txns["transaction_type"].str.findall(r"[a-z0-9]+")
    is_buy = type_tokens.apply(lambda tokens: not buy_keywords.isdisjoint(tokens))
    is_sell = type_tokens.apply(lambda tokens: not sell_keywords.isdisjoint(tokens))

    result["buy_count"] = int(is_buy.sum())
    result["sell_count"] = int(is_sell.sum())

    if result["sell_count"] > 0:
        result["buy_sell_ratio"] = result["buy_count"] / result["sell_count"]
    elif result["buy_count"] > 0:
        result["buy_sell_ratio"] = float("inf")

    # Net shares and value
    if "shares" in txns.columns:
        buy_shares = int(txns.loc[is_buy, "shares"].sum())
        sell_shares = int(txns.loc[is_sell, "shares"].sum())
        result["net_shares"] = buy_shares - sell_shares

    buy_value = float(txns.loc[is_buy, "value"].sum())
    sell_value = float(txns.loc[is_sell, "value"].sum())
    result["net_value"] = buy_value - sell_value

    # Notable trades (> $1M)
    notable_mask = txns["value"].abs() > 1_000_000
    if notable_mask.any():
        result["notable_trades"] = txns.loc[notable_mask].to_dict("records")

    # Signal determination: selling needs a larger dollar amount than
    # buying before it is treated as significant.
    if result["buy_sell_ratio"] > 2.0 and result["net_value"] > 100_000:
        result["signal"] = "bullish"
    elif result["buy_sell_ratio"] < 0.3 and result["net_value"] < -500_000:
        result["signal"] = "bearish"
    else:
        result["signal"] = "neutral"

    return result
# ---------------------------------------------------------------------------
# Institutional ownership
# ---------------------------------------------------------------------------
[docs]
@requires_extra("market-data")
def institutional_ownership(symbol: str) -> dict[str, Any]:
    """Summarize institutional holders of a stock and their position changes.

    Institutional investors (mutual funds, hedge funds, pension funds)
    hold the majority of US equity market capitalization. Changes in
    institutional ownership can signal informed conviction: increasing
    ownership by smart-money managers is a moderately bullish signal.

    Parameters:
        symbol: Ticker symbol (e.g., ``"TSLA"``).

    Returns:
        Dictionary containing:

        - **symbol** (*str*) -- Ticker symbol.
        - **holders** (*pd.DataFrame*) -- Holder rows with columns
          (each present only if found in the provider response):
          ``holder``, ``shares``, ``date_reported``, ``change``,
          ``change_pct``.
        - **total_institutional_holders** (*int*) -- Number of rows in
          ``holders``.
        - **total_shares_held** (*int*) -- Sum of the ``shares`` column.
        - **top_holder** (*str | None*) -- Name on the row with the
          largest share count.
        - **net_change** (*str*) -- ``"increasing"``, ``"decreasing"``,
          or ``"stable"`` according to the sign of the summed
          ``change`` column.
        - **concentration** (*float*) -- Herfindahl index (sum of
          squared share fractions) over the listed holders; higher
          means more concentrated.

        The defaults (empty frame, zeros, ``None``, ``"stable"``) are
        returned when the provider call fails or returns no rows.

    Example:
        >>> from wraquant.news.events import institutional_ownership
        >>> inst = institutional_ownership("AAPL")
        >>> print(f"Top holder: {inst['top_holder']}")
        >>> print(f"Net change: {inst['net_change']}")
        >>> print(inst["holders"].head())

    See Also:
        insider_activity: Corporate insider transaction analysis.
    """
    from wraquant.data.providers.fmp import FMPClient

    client = FMPClient()

    result: dict[str, Any] = {
        "symbol": symbol,
        "holders": pd.DataFrame(),
        "total_institutional_holders": 0,
        "total_shares_held": 0,
        "top_holder": None,
        "net_change": "stable",
        "concentration": 0.0,
    }

    # 13F filings are the source for institutional positions; any provider
    # failure falls back to the defaults above.
    try:
        df = client.sec_filings(symbol, type="13F", limit=100)
    except Exception:  # noqa: BLE001
        return result

    if df.empty:
        return result

    def _whole_numbers(column: str) -> pd.Series:
        """Coerce a raw column to integers, treating bad values as 0."""
        return pd.to_numeric(df[column], errors="coerce").fillna(0).astype(int)

    # Resolve the provider's column names to the standardized ones.
    name_src = _resolve_col(df, ["holder", "investorName", "name", "reportingName"])
    shares_src = _resolve_col(df, ["shares", "sharesNumber", "securitiesTransacted"])
    date_src = _resolve_col(df, ["date", "fillingDate", "filingDate", "dateReported"])
    delta_src = _resolve_col(df, ["change", "changeInShares", "sharesChange"])
    delta_pct_src = _resolve_col(
        df, ["changeInSharesPercentage", "changePct", "change_pct"]
    )

    holders = pd.DataFrame()
    if name_src:
        holders["holder"] = df[name_src].astype(str)
    if shares_src:
        holders["shares"] = _whole_numbers(shares_src)
    if date_src:
        holders["date_reported"] = df[date_src]
    if delta_src:
        holders["change"] = _whole_numbers(delta_src)
    if delta_pct_src:
        raw_pct = pd.to_numeric(df[delta_pct_src], errors="coerce")
        holders["change_pct"] = raw_pct.fillna(0.0)

    result["holders"] = holders
    result["total_institutional_holders"] = len(holders)

    has_rows = not holders.empty

    if has_rows and "shares" in holders.columns:
        share_counts = holders["shares"]
        result["total_shares_held"] = int(share_counts.sum())

        # Largest position identifies the top holder.
        if "holder" in holders.columns:
            largest = share_counts.idxmax()
            result["top_holder"] = str(holders.loc[largest, "holder"])

        # Herfindahl index: sum of squared ownership fractions.
        grand_total = share_counts.sum()
        if grand_total > 0:
            fractions = share_counts / grand_total
            result["concentration"] = float((fractions**2).sum())

    if has_rows and "change" in holders.columns:
        aggregate_change = holders["change"].sum()
        if aggregate_change > 0:
            direction = "increasing"
        elif aggregate_change < 0:
            direction = "decreasing"
        else:
            direction = "stable"
        result["net_change"] = direction

    return result
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _resolve_col(
df: pd.DataFrame,
candidates: list[str],
) -> str | None:
"""Find the first matching column name from a list of candidates.
Parameters:
df: DataFrame to search.
candidates: Ordered list of possible column names.
Returns:
The first matching column name, or None if none match.
"""
for col in candidates:
if col in df.columns:
return col
return None