Source code for wraquant.io.database
"""Database connectors for reading and writing financial data.
All functions in this module require the ``etl`` optional dependency group
which provides SQLAlchemy and connectorx.
"""
from __future__ import annotations
from typing import Any
import pandas as pd
from wraquant.core.decorators import requires_extra
__all__ = [
"read_sql",
"write_sql",
"create_engine",
"read_sql_fast",
]
[docs]
@requires_extra("etl")
def read_sql(
query: str,
connection_string: str,
**kwargs: Any,
) -> pd.DataFrame:
"""Read data from a SQL database using SQLAlchemy.
Connects to any SQLAlchemy-supported database (PostgreSQL, MySQL,
SQLite, etc.), executes the query, and returns the result as a
DataFrame. The connection is automatically closed after the read.
Parameters:
query (str): SQL query string or table name.
connection_string (str): SQLAlchemy-compatible connection URI
(e.g., ``"postgresql://user:pass@host/db"``).
**kwargs: Additional keyword arguments forwarded to
:func:`pandas.read_sql`.
Returns:
pd.DataFrame: DataFrame with the query results.
Example:
>>> df = read_sql("SELECT * FROM prices", "sqlite:///data.db") # doctest: +SKIP
See Also:
write_sql: Write a DataFrame to a SQL table.
read_sql_fast: Faster alternative using connectorx.
"""
import sqlalchemy
engine = sqlalchemy.create_engine(connection_string)
with engine.connect() as conn:
return pd.read_sql(query, conn, **kwargs)
[docs]
@requires_extra("etl")
def write_sql(
data: pd.DataFrame,
table_name: str,
connection_string: str,
if_exists: str = "append",
**kwargs: Any,
) -> None:
"""Write a DataFrame to a SQL database table.
Inserts the DataFrame rows into the specified table, with
configurable behavior when the table already exists. The
transaction is committed automatically.
Parameters:
data (pd.DataFrame): DataFrame to write.
table_name (str): Destination table name.
connection_string (str): SQLAlchemy-compatible connection URI.
if_exists (str): Behavior when the table already exists:
``'fail'`` (raise), ``'replace'`` (drop and recreate),
or ``'append'`` (insert rows, default).
**kwargs: Additional keyword arguments forwarded to
:meth:`pandas.DataFrame.to_sql`.
Example:
>>> write_sql(df, "prices", "sqlite:///data.db") # doctest: +SKIP
See Also:
read_sql: Read data from a SQL database.
"""
import sqlalchemy
engine = sqlalchemy.create_engine(connection_string)
with engine.connect() as conn:
data.to_sql(table_name, conn, if_exists=if_exists, **kwargs)
conn.commit()
[docs]
@requires_extra("etl")
def create_engine(
connection_string: str,
**kwargs: Any,
) -> Any:
"""Create a SQLAlchemy engine.
A thin wrapper that provides a consistent interface and keeps the
SQLAlchemy import gated behind the ``etl`` extra.
Parameters:
connection_string: SQLAlchemy-compatible connection URI.
**kwargs: Additional keyword arguments forwarded to
:func:`sqlalchemy.create_engine`.
Returns:
A SQLAlchemy ``Engine`` instance.
"""
import sqlalchemy
return sqlalchemy.create_engine(connection_string, **kwargs)
[docs]
@requires_extra("etl")
def read_sql_fast(
query: str,
connection_string: str,
) -> pd.DataFrame:
"""Read from a SQL database using connectorx for speed, with pandas fallback.
connectorx can be 5--10x faster than pandas+SQLAlchemy for large
result sets because it uses native database drivers and avoids
Python-level row iteration. If connectorx is not installed or
encounters an unsupported driver, the function transparently falls
back to the standard SQLAlchemy path.
Parameters:
query (str): SQL query string.
connection_string (str): Database connection URI.
Returns:
pd.DataFrame: DataFrame with the query results.
Example:
>>> df = read_sql_fast("SELECT * FROM ticks", "postgresql://user:pw@host/db") # doctest: +SKIP
See Also:
read_sql: Standard SQLAlchemy-based reader.
"""
try:
import connectorx as cx
return cx.read_sql(connection_string, query)
except (ImportError, Exception):
# Fall back to pandas + SQLAlchemy when connectorx is unavailable
# or encounters an unsupported database driver.
import sqlalchemy
engine = sqlalchemy.create_engine(connection_string)
with engine.connect() as conn:
return pd.read_sql(query, conn)