I/O (wraquant.io)

ETL, database connectivity, cloud storage, file I/O, and streaming data utilities for loading and persisting financial data.

Quick Example

from wraquant.io import database, files, cloud

# Read from a SQL database
df = database.read_sql("SELECT * FROM prices WHERE date > '2023-01-01'",
                       connection_string="sqlite:///market.db")

# Export to Parquet (columnar, compressed)
files.write_parquet(df, "prices.parquet")

# Read from cloud storage: bucket and object key are separate arguments
df = cloud.read_s3("bucket", "prices.parquet")

API Reference

I/O connectors for file formats, streaming, and report export.

This module consolidates all data persistence and export functionality for wraquant. It handles reading and writing financial data to local files in multiple formats, real-time tick streaming via WebSocket connections, and exporting analysis results as JSON, dictionaries, performance tearsheets, and formatted text tables.

Key components:

  • File I/O – Read and write CSV, Parquet, HDF5, and Excel files with read_csv / write_csv, read_parquet / write_parquet, read_hdf / write_hdf, read_excel / write_excel. Parquet is recommended for large datasets (columnar, compressed, preserves dtypes).

  • Streaming – WebSocketClient connects to real-time market data feeds, and TickBuffer accumulates ticks into OHLCV bars for downstream consumption.

  • Export – to_tearsheet computes key performance and risk metrics as a dictionary (optionally written to a JSON file), to_json / to_dict serialize results for APIs, and format_table produces publication-ready text tables.

Example

>>> from wraquant.io import read_parquet, write_parquet, to_json
>>> prices = read_parquet("prices.parquet")
>>> # ... run analysis ...
>>> write_parquet(prices, "prices_clean.parquet")
>>> json_str = to_json({"sharpe": 1.5, "max_dd": -0.12})

Use wraquant.io for persisting data to disk and exporting results. For fetching data from external market data providers (Yahoo Finance, FRED), use wraquant.data instead. For database and cloud storage connectors (SQL, S3, GCS), install the etl or warehouse extra groups.

Database

Database connectors for reading and writing financial data.

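All functions in this module require the etl optional dependency group, which provides SQLAlchemy and connectorx. connectorx is used only by read_sql_fast, which falls back to SQLAlchemy when connectorx is unavailable.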

read_sql(query, connection_string, **kwargs)[source]

Read data from a SQL database using SQLAlchemy.

Connects to any SQLAlchemy-supported database (PostgreSQL, MySQL, SQLite, etc.), executes the query, and returns the result as a DataFrame. The connection is automatically closed after the read.

Parameters:
  • query (str) – SQL query string or table name.

  • connection_string (str) – SQLAlchemy-compatible connection URI (e.g., "postgresql://user:pass@host/db").

  • **kwargs (Any) – Additional keyword arguments forwarded to pandas.read_sql().

Returns:

DataFrame with the query results.

Return type:

DataFrame

Example

>>> df = read_sql("SELECT * FROM prices", "sqlite:///data.db")

See also

write_sql: Write a DataFrame to a SQL table. read_sql_fast: Faster alternative using connectorx.

write_sql(data, table_name, connection_string, if_exists='append', **kwargs)[source]

Write a DataFrame to a SQL database table.

Inserts the DataFrame rows into the specified table, with configurable behavior when the table already exists. The transaction is committed automatically.

Parameters:
  • data (DataFrame) – DataFrame to write.

  • table_name (str) – Destination table name.

  • connection_string (str) – SQLAlchemy-compatible connection URI.

  • if_exists (str, default: 'append') – Behavior when the table already exists: 'fail' (raise), 'replace' (drop and recreate), or 'append' (insert rows, default).

  • **kwargs (Any) – Additional keyword arguments forwarded to pandas.DataFrame.to_sql().

Return type:

None

Example

>>> write_sql(df, "prices", "sqlite:///data.db")

See also

read_sql: Read data from a SQL database.
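The read/write round trip and the three if_exists modes can be sketched with plain pandas. This sketch swaps the SQLAlchemy URI for a stdlib sqlite3 in-memory connection (pandas accepts sqlite3 connections directly), so it illustrates the pandas behavior the wrappers forward to rather than wraquant's own code; the table and column names are illustrative.

```python
import sqlite3

import pandas as pd

prices = pd.DataFrame(
    {"date": ["2024-01-02", "2024-01-03"], "close": [100.5, 101.25]}
)

# In-memory SQLite database; pandas accepts a sqlite3 connection directly.
conn = sqlite3.connect(":memory:")

# First write creates the table (pandas' own default is if_exists="fail").
prices.to_sql("prices", conn, index=False)

# write_sql defaults to "append": existing rows are kept, new rows are added.
prices.to_sql("prices", conn, index=False, if_exists="append")
rows_after_append = pd.read_sql("SELECT COUNT(*) AS n FROM prices", conn)["n"].iloc[0]

# "replace" drops and recreates the table with only the new rows.
prices.to_sql("prices", conn, index=False, if_exists="replace")
rows_after_replace = pd.read_sql("SELECT COUNT(*) AS n FROM prices", conn)["n"].iloc[0]

# Parameterized query instead of string interpolation (sqlite3 uses "?" placeholders).
recent = pd.read_sql(
    "SELECT * FROM prices WHERE date > ?", conn, params=("2024-01-02",)
)
conn.close()

print(rows_after_append, rows_after_replace, len(recent))  # 4 2 1
```

Binding values through params rather than formatting them into the query string also avoids SQL injection when filters come from user input.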

create_engine(connection_string, **kwargs)[source]

Create a SQLAlchemy engine.

A thin wrapper that provides a consistent interface and keeps the SQLAlchemy import gated behind the etl extra.

Parameters:
  • connection_string (str) – SQLAlchemy-compatible connection URI.

  • **kwargs (Any) – Additional keyword arguments forwarded to sqlalchemy.create_engine().

Return type:

Any

Returns:

A SQLAlchemy Engine instance.

read_sql_fast(query, connection_string)[source]

Read from a SQL database using connectorx for speed, with pandas fallback.

connectorx can be 5–10x faster than pandas+SQLAlchemy for large result sets because it uses native database drivers and avoids Python-level row iteration. If connectorx is not installed or encounters an unsupported driver, the function transparently falls back to the standard SQLAlchemy path.

Parameters:
  • query (str) – SQL query string.

  • connection_string (str) – Database connection URI.

Returns:

DataFrame with the query results.

Return type:

DataFrame

Example

>>> df = read_sql_fast("SELECT * FROM ticks", "postgresql://user:pw@host/db")

See also

read_sql: Standard SQLAlchemy-based reader.

Files

File format I/O with financial defaults.

Provides convenience wrappers around pandas I/O functions with sensible defaults for financial time-series data (date parsing, index handling, etc.).

read_csv(path, date_column='Date', parse_dates=True, **kwargs)[source]

Read a CSV file with financial defaults.

By default, parses a Date column and sets it as the index, which matches the most common layout for financial time-series CSVs (Yahoo Finance downloads, FRED exports, etc.). Pass parse_dates=False to disable automatic date parsing.

Parameters:
  • path (str | Path) – Path to the CSV file.

  • date_column (str, default: 'Date') – Name of the date column to parse and set as index (default 'Date'). Ignored when parse_dates is False.

  • parse_dates (bool, default: True) – If True, parse date_column as datetime and use it as the DataFrame index.

  • **kwargs (Any) – Additional keyword arguments forwarded to pandas.read_csv().

Returns:

DataFrame with the CSV contents, optionally date-indexed.

Return type:

DataFrame

Example

>>> df = read_csv("prices.csv")
>>> df = read_csv("data.csv", parse_dates=False)

See also

write_csv: Write a DataFrame to CSV. read_parquet: Faster columnar format for large datasets.
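The default behavior described above corresponds roughly to the following plain-pandas calls. This is an assumed equivalent for illustration, not wraquant's source, and the CSV contents are made up.

```python
import io

import pandas as pd

# Made-up CSV in the common Yahoo Finance / FRED export layout.
csv_text = """Date,Close,Volume
2024-01-02,185.64,82488700
2024-01-03,184.25,58414500
"""

# Roughly what read_csv("prices.csv") does by default (assumed):
# parse the 'Date' column as datetimes and promote it to the index.
df = pd.read_csv(io.StringIO(csv_text), parse_dates=["Date"], index_col="Date")

# With parse_dates=False, 'Date' stays an ordinary string column and
# the frame keeps a default RangeIndex.
raw = pd.read_csv(io.StringIO(csv_text))
```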

write_csv(data, path, **kwargs)[source]

Write a DataFrame or Series to CSV.

Creates parent directories automatically if they do not exist.

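Parameters:
  • data (DataFrame | Series) – DataFrame or Series to write.

  • path (str | Path) – Destination file path. Missing parent directories are created.

  • **kwargs (Any) – Additional keyword arguments forwarded to the wrapped pandas writer (to_csv).
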
Return type:

None

Example

>>> write_csv(df, "output/prices.csv")

See also

read_csv: Read a CSV file with financial defaults.
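Automatic parent-directory creation usually comes down to one pathlib call before the pandas writer. A minimal sketch; write_csv_sketch is a hypothetical stand-in, not wraquant's code.

```python
import tempfile
from pathlib import Path

import pandas as pd


def write_csv_sketch(data, path, **kwargs):
    """Hypothetical stand-in for write_csv: create parent dirs, then write."""
    path = Path(path)
    # parents=True creates every missing level; exist_ok=True tolerates existing ones.
    path.parent.mkdir(parents=True, exist_ok=True)
    data.to_csv(path, **kwargs)


root = Path(tempfile.mkdtemp())
target = root / "output" / "daily" / "prices.csv"  # neither folder exists yet
prices = pd.Series([100.5, 101.25], name="close")
write_csv_sketch(prices, target)
```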

read_parquet(path, columns=None, **kwargs)[source]

Read a Parquet file.

Parquet is the recommended format for large financial datasets: it is columnar, compressed, and preserves dtypes (including datetime indices) without the ambiguity of CSV parsing.

Parameters:
  • path (str | Path) – Path to the Parquet file.

  • columns (list[str] | None, default: None) – Subset of columns to read. None reads all columns. Because Parquet stores data column by column, unselected columns are skipped entirely, which makes subset reads much faster for wide datasets.

  • **kwargs (Any) – Additional keyword arguments forwarded to pandas.read_parquet().

Returns:

DataFrame with the Parquet contents.

Return type:

DataFrame

Example

>>> df = read_parquet("prices.parquet")
>>> df = read_parquet("prices.parquet", columns=["close", "volume"])

See also

write_parquet: Write a DataFrame to Parquet.

write_parquet(data, path, **kwargs)[source]

Write a DataFrame to Parquet format.

Creates parent directories automatically. Parquet preserves dtypes exactly and is significantly faster to read/write than CSV for large datasets.

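Parameters:
  • data (DataFrame) – DataFrame to write.

  • path (str | Path) – Destination file path. Missing parent directories are created.

  • **kwargs (Any) – Additional keyword arguments forwarded to the wrapped pandas writer (to_parquet).
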
Return type:

None

Example

>>> write_parquet(df, "output/prices.parquet")

See also

read_parquet: Read a Parquet file.

read_hdf(path, key='data', **kwargs)[source]

Read an HDF5 file.

Requires the tables (PyTables) package to be installed.

Parameters:
  • path (str | Path) – Path to the HDF5 file.

  • key (str, default: 'data') – The group identifier in the HDF5 store.

  • **kwargs (Any) – Additional keyword arguments forwarded to pandas.read_hdf().

Return type:

DataFrame

Returns:

DataFrame with the HDF5 contents.

write_hdf(data, path, key='data', **kwargs)[source]

Write a DataFrame or Series to HDF5 format.

Requires the tables (PyTables) package to be installed.

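Parameters:
  • data (DataFrame | Series) – DataFrame or Series to write.

  • path (str | Path) – Path to the HDF5 file.

  • key (str, default: 'data') – The group identifier in the HDF5 store.

  • **kwargs (Any) – Additional keyword arguments forwarded to the wrapped pandas writer (to_hdf).
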
Return type:

None

read_excel(path, sheet_name=0, **kwargs)[source]

Read an Excel file.

Parameters:
  • path (str | Path) – Path to the Excel file (.xlsx or .xls).

  • sheet_name (str | int, default: 0) – Name or index of the sheet to read. Defaults to the first sheet.

  • **kwargs (Any) – Additional keyword arguments forwarded to pandas.read_excel().

Return type:

DataFrame

Returns:

DataFrame with the Excel sheet contents.

write_excel(data, path, sheet_name='Sheet1', **kwargs)[source]

Write a DataFrame to Excel format.

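Parameters:
  • data (DataFrame) – DataFrame to write.

  • path (str | Path) – Path to the Excel file.

  • sheet_name (str, default: 'Sheet1') – Name of the worksheet to write.

  • **kwargs (Any) – Additional keyword arguments forwarded to the wrapped pandas writer (to_excel).
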
Return type:

None

Export

Export and reporting utilities.

Functions for converting financial data into various output formats suitable for reporting, serialization, and display.

to_tearsheet(returns, benchmark=None, output_path=None)[source]

Generate a performance tearsheet from a return series.

Computes the core performance and risk metrics of a return series and returns them as a serializable dictionary. Optionally writes the result to a JSON file for reporting or downstream consumption.

Metrics computed: total return, annualized return, annualized volatility, Sharpe ratio, maximum drawdown, and Calmar ratio. When a benchmark is provided, also computes correlation and information ratio.

Parameters:
  • returns (Series) – Series of portfolio returns (simple, not cumulative), indexed by datetime.

  • benchmark (Series | None, default: None) – Optional benchmark return series for relative metrics. When provided, the two series are aligned by index.

  • output_path (str | Path | None, default: None) – If provided, write the tearsheet dict to this JSON file.

Returns:

Dictionary with keys total_return, annualized_return, annualized_volatility, sharpe_ratio, max_drawdown, calmar_ratio, n_periods, and optionally benchmark_correlation and information_ratio.

Return type:

dict[str, Any]

Example

>>> import pandas as pd, numpy as np
>>> rng = np.random.default_rng(0)
>>> idx = pd.bdate_range("2023-01-02", periods=252)
>>> returns = pd.Series(rng.normal(0.0, 0.01, 252), index=idx)
>>> sheet = to_tearsheet(returns)
>>> "sharpe_ratio" in sheet
True

See also

to_json: Serialize any data to JSON. format_table: Pretty-print a DataFrame.
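The metrics listed for to_tearsheet can be approximated with plain pandas. A sketch under stated assumptions, not wraquant's implementation: zero risk-free rate, geometric annualization, 252 periods per year, and sample standard deviation. The helper name tearsheet_sketch is hypothetical.

```python
import numpy as np
import pandas as pd


def tearsheet_sketch(returns, benchmark=None, periods_per_year=252):
    """Approximate the metrics listed for to_tearsheet.

    Assumed conventions (not confirmed by the wraquant docs): zero
    risk-free rate, geometric annualization, 252 periods per year,
    and sample standard deviation (pandas' default ddof=1).
    """
    if benchmark is not None:
        # Keep only dates present in both series.
        returns, benchmark = returns.align(benchmark, join="inner")

    n = len(returns)
    wealth = (1.0 + returns).cumprod()  # growth of 1 unit of capital
    total_return = wealth.iloc[-1] - 1.0
    ann_return = (1.0 + total_return) ** (periods_per_year / n) - 1.0
    ann_vol = returns.std() * np.sqrt(periods_per_year)
    sharpe = returns.mean() / returns.std() * np.sqrt(periods_per_year)

    # Running peak, treating the starting capital (1.0) as the first peak.
    peak = wealth.cummax().clip(lower=1.0)
    max_dd = (wealth / peak - 1.0).min()  # most negative drawdown
    calmar = ann_return / abs(max_dd) if max_dd < 0 else float("nan")

    sheet = {
        "total_return": float(total_return),
        "annualized_return": float(ann_return),
        "annualized_volatility": float(ann_vol),
        "sharpe_ratio": float(sharpe),
        "max_drawdown": float(max_dd),
        "calmar_ratio": float(calmar),
        "n_periods": n,
    }
    if benchmark is not None:
        active = returns - benchmark  # per-period excess over the benchmark
        sheet["benchmark_correlation"] = float(returns.corr(benchmark))
        sheet["information_ratio"] = float(
            active.mean() / active.std() * np.sqrt(periods_per_year)
        )
    return sheet
```

Running this next to to_tearsheet on the same series is a quick way to check which of these conventions wraquant actually uses.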

to_json(data, path=None, orient='records')[source]

Export data to JSON format.

Handles DataFrames, Series, and plain dictionaries. When a file path is provided, the JSON is written to disk; otherwise the JSON string is returned for further use (e.g., sending via an API).

Parameters:
  • data (DataFrame | Series | dict[str, Any]) – Data to serialize. DataFrames and Series use the pandas JSON serializer; plain dicts use the stdlib json module.

  • path (str | Path | None, default: None) – If provided, write the JSON string to this file and return None. Otherwise, return the JSON string.

  • orient (str, default: 'records') – Orientation for pandas.DataFrame.to_json() (e.g., 'records', 'index', 'columns').

Returns:

JSON string when path is None; otherwise None.

Return type:

str | None

Example

>>> json_str = to_json({"sharpe": 1.2, "max_dd": -0.15})
>>> isinstance(json_str, str)
True

See also

to_dict: Convert to a nested dictionary. to_tearsheet: Generate a full performance report.
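Because DataFrames go through the pandas JSON serializer, the documented default orient='records' inherits pandas' behavior: one object per row, with the index dropped. The sketch below uses pandas and the stdlib json module directly (not wraquant) to show that a DatetimeIndex is lost unless it is first moved into a column; the data is made up.

```python
import json

import pandas as pd

prices = pd.DataFrame(
    {"close": [100.5, 101.25]},
    index=pd.to_datetime(["2024-01-02", "2024-01-03"]),
)

# orient="records": one JSON object per row; the DatetimeIndex is dropped.
records = prices.to_json(orient="records")

# Move the index into a 'date' column first to keep it, as ISO-8601 strings.
with_dates = (
    prices.rename_axis("date")
    .reset_index()
    .to_json(orient="records", date_format="iso")
)

# Plain dicts go through the stdlib json module instead.
metrics = json.dumps({"sharpe": 1.2, "max_dd": -0.15})
```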

to_dict(data)[source]

Convert a DataFrame or Series to a nested dictionary.

For a DataFrame, produces {column: {index: value, ...}, ...}. For a Series, produces {index: value, ...}. Useful for serialization, API responses, or interop with non-pandas code.

Parameters:

data (DataFrame | Series) – DataFrame or Series to convert.

Returns:

Nested dictionary representation of the data.

Return type:

dict[str, Any]

Example

>>> import pandas as pd
>>> s = pd.Series([1, 2, 3], index=["a", "b", "c"])
>>> to_dict(s)
{'a': 1, 'b': 2, 'c': 3}

See also

to_json: Serialize to JSON string.
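For the DataFrame case, the {column: {index: value}} layout is what pandas' DataFrame.to_dict() produces with its default orient="dict". The sketch calls pandas directly and assumes to_dict matches that layout; the values are illustrative.

```python
import pandas as pd

df = pd.DataFrame(
    {"close": [100.5, 250.0], "volume": [1200, 900]},
    index=["AAPL", "MSFT"],
)

# Default orient="dict": outer keys are columns, inner keys are index labels.
nested = df.to_dict()
```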

format_table(data, precision=4, pct_columns=None)[source]

Format a DataFrame as a pretty-printed table string.

Produces a human-readable text table suitable for console output, log files, or email reports. Numeric columns are formatted to a fixed number of decimal places, and designated columns are displayed as percentages.

Parameters:
  • data (DataFrame) – DataFrame to format.

  • precision (int, default: 4) – Number of decimal places for numeric columns (default 4).

  • pct_columns (list[str] | None, default: None) – Column names to format as percentages (values are multiplied by 100 and suffixed with %).

Returns:

String representation of the formatted table.

Return type:

str

Example

>>> import pandas as pd
>>> df = pd.DataFrame({"return": [0.05], "vol": [0.15]})
>>> print(format_table(df, pct_columns=["return", "vol"]))
...
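The formatting rules described above (fixed decimals for numeric columns, ×100 plus a % suffix for percentage columns) can be sketched as follows. format_table_sketch is a hypothetical stand-in, and whether wraquant applies the same precision to percentage columns is an assumption.

```python
import pandas as pd


def format_table_sketch(data, precision=4, pct_columns=None):
    """Hypothetical stand-in for format_table (not wraquant's code).

    Assumption: percentage columns use the same `precision` as other
    numeric columns.
    """
    pct_columns = set(pct_columns or [])
    formatted = {}
    for col in data.columns:
        values = data[col]
        if col in pct_columns:
            # Scale fractions to percent and append the % suffix.
            formatted[col] = [f"{v * 100:.{precision}f}%" for v in values]
        elif pd.api.types.is_numeric_dtype(values):
            formatted[col] = [f"{v:.{precision}f}" for v in values]
        else:
            formatted[col] = values.astype(str).tolist()
    return pd.DataFrame(formatted, index=data.index).to_string()


df = pd.DataFrame({"return": [0.05], "vol": [0.15], "sharpe": [1.23456]})
print(format_table_sketch(df, precision=2, pct_columns=["return", "vol"]))
```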

Cloud Storage

Cloud storage connectors for S3 and Google Cloud Storage.

Functions are gated behind optional dependencies (s3fs/boto3 for AWS S3, gcsfs for Google Cloud Storage), which are part of the etl extra group.

read_s3(bucket, key, **kwargs)[source]

Read a file from Amazon S3 into a DataFrame.

Supports Parquet and CSV formats, determined by the file extension. Requires s3fs (part of the etl extra).

Parameters:
  • bucket (str) – S3 bucket name.

  • key (str) – Object key (path) within the bucket.

  • **kwargs (Any) – Additional keyword arguments forwarded to the underlying pandas reader (read_parquet or read_csv).

Return type:

DataFrame

Returns:

DataFrame with the file contents.
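Choosing the reader from the file extension, and combining bucket and key into a URL, can be sketched with pathlib and pandas. Both helpers are hypothetical illustrations, not wraquant functions; they assume the fsspec-style s3:// and gs:// URLs that pandas resolves when s3fs or gcsfs is installed.

```python
from pathlib import PurePosixPath

import pandas as pd


def object_url(scheme, bucket, key):
    """Build an fsspec-style URL such as s3://bucket/key or gs://bucket/blob."""
    return f"{scheme}://{bucket}/{key.lstrip('/')}"


def pick_reader(key):
    """Choose a pandas reader from the object's extension (case-insensitive)."""
    suffix = PurePosixPath(key).suffix.lower()
    if suffix == ".parquet":
        return pd.read_parquet
    if suffix == ".csv":
        return pd.read_csv
    raise ValueError(f"Unsupported file extension: {suffix!r}")
```

With s3fs installed, pick_reader(key)(object_url("s3", bucket, key)) would read the object directly; the same pattern with "gs" applies to GCS via gcsfs.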

write_s3(data, bucket, key, **kwargs)[source]

Write a DataFrame to Amazon S3.

Supports Parquet and CSV formats, determined by the file extension. Requires s3fs (part of the etl extra).

Parameters:
  • data (DataFrame) – DataFrame to write.

  • bucket (str) – S3 bucket name.

  • key (str) – Object key (path) within the bucket.

  • **kwargs (Any) – Additional keyword arguments forwarded to the underlying pandas writer (to_parquet or to_csv).

Return type:

None

list_s3(bucket, prefix='', **kwargs)[source]

List files in an S3 bucket under a given prefix.

Requires s3fs (part of the etl extra).

Parameters:
  • bucket (str) – S3 bucket name.

  • prefix (str, default: '') – Key prefix to filter results. Defaults to listing the entire bucket.

  • **kwargs (Any) – Additional keyword arguments forwarded to s3fs.S3FileSystem.ls.

Return type:

list[str]

Returns:

List of object keys matching the prefix.

read_gcs(bucket, blob, **kwargs)[source]

Read a file from Google Cloud Storage into a DataFrame.

Supports Parquet and CSV formats, determined by the file extension. Requires gcsfs (part of the etl extra).

Parameters:
  • bucket (str) – GCS bucket name.

  • blob (str) – Blob path within the bucket.

  • **kwargs (Any) – Additional keyword arguments forwarded to the underlying pandas reader.

Return type:

DataFrame

Returns:

DataFrame with the file contents.

write_gcs(data, bucket, blob, **kwargs)[source]

Write a DataFrame to Google Cloud Storage.

Supports Parquet and CSV formats, determined by the file extension. Requires gcsfs (part of the etl extra).

Parameters:
  • data (DataFrame) – DataFrame to write.

  • bucket (str) – GCS bucket name.

  • blob (str) – Blob path within the bucket.

  • **kwargs (Any) – Additional keyword arguments forwarded to the underlying pandas writer.

Return type:

None

Streaming

Real-time data streaming utilities.

Provides a WebSocket client for consuming streaming market data and a tick buffer for aggregating raw ticks into OHLCV bars.

class WebSocketClient[source]

Bases: object

Async WebSocket client for streaming market data.

Wraps the websockets library to provide a simple interface for subscribing to real-time data feeds.

Parameters:
  • url (str) – WebSocket server URL (e.g., "wss://stream.example.com").

  • on_message (Optional[Callable[[str], None]], default: None) – Optional callback invoked with each received message.

  • on_error (Optional[Callable[[Exception], None]], default: None) – Optional callback invoked when an error occurs.

Example

>>> client = WebSocketClient("wss://stream.example.com/v1/ws")
>>> client.on_message = lambda msg: print(msg)
>>> client.run()  # blocks until disconnected
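
__init__(url, on_message=None, on_error=None)[source]
Parameters:
  • url (str)

  • on_message (Optional[Callable[[str], None]], default: None)

  • on_error (Optional[Callable[[Exception], None]], default: None)

Return type:

None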

async connect()[source]

Open the WebSocket connection.

Requires the websockets package (part of the ingestion extra).

Return type:

None

async disconnect()[source]

Close the WebSocket connection gracefully.

Return type:

None

async subscribe(channels)[source]

Subscribe to one or more data channels.

Parameters:

channels (list[str]) – List of channel identifiers to subscribe to.

Return type:

None

async unsubscribe(channels)[source]

Unsubscribe from one or more data channels.

Parameters:

channels (list[str]) – List of channel identifiers to unsubscribe from.

Return type:

None

run()[source]

Start the WebSocket event loop (blocking).

Connects to the server and listens for messages until the connection is closed or disconnect() is called.

Return type:

None
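Because on_message receives each raw message as a string, a common pattern is to parse it and forward trades to a tick sink such as TickBuffer.add_tick. A minimal sketch: the JSON schema (type, ts in epoch milliseconds, price, size) is hypothetical and feed-specific, and whether TickBuffer expects timezone-aware timestamps is not documented.

```python
import json
from datetime import datetime, timezone


def make_tick_handler(add_tick):
    """Return an on_message callback that forwards parsed trades to add_tick.

    `add_tick` is any callable shaped like TickBuffer.add_tick(timestamp, price, volume).
    """

    def on_message(message: str) -> None:
        payload = json.loads(message)
        if payload.get("type") != "trade":  # ignore heartbeats, acks, ...
            return
        # Hypothetical schema: epoch milliseconds, price, size.
        ts = datetime.fromtimestamp(payload["ts"] / 1000, tz=timezone.utc)
        add_tick(ts, float(payload["price"]), float(payload.get("size", 0)))

    return on_message


received = []
handler = make_tick_handler(lambda ts, price, volume: received.append((ts, price, volume)))
handler('{"type": "trade", "ts": 1704187800100, "price": 150.25, "size": 100}')
handler('{"type": "heartbeat"}')
```

In real use the sink would be a buffer, for example WebSocketClient(url, on_message=make_tick_handler(buf.add_tick)) with buf = TickBuffer().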

class TickBuffer[source]

Bases: object

Buffer incoming ticks and aggregate them into OHLCV bars.

Stores raw tick data and groups it into time-based bars at the requested interval.

Parameters:

bar_interval (str, default: '1min') – Pandas-compatible frequency string for bar aggregation (e.g., '1min', '5min', '1h').

Example

>>> import pandas as pd
>>> buf = TickBuffer(bar_interval="1min")
>>> buf.add_tick(pd.Timestamp("2024-01-02 09:30:00.100"), 150.25, 100)
>>> buf.add_tick(pd.Timestamp("2024-01-02 09:30:00.500"), 150.50, 200)
>>> bars = buf.get_bars()

__init__(bar_interval='1min')[source]
Parameters:

bar_interval (str, default: '1min')

Return type:

None

add_tick(timestamp, price, volume=0)[source]

Add a single tick to the buffer.

Parameters:
  • timestamp (datetime | Timestamp) – Tick timestamp.

  • price (float) – Tick price.

  • volume (float, default: 0) – Tick volume. Defaults to 0.

Return type:

None

get_bars()[source]

Aggregate buffered ticks into OHLCV bars.

Return type:

DataFrame

Returns:

DataFrame with columns open, high, low, close, volume indexed by the bar period start time. Returns an empty DataFrame if no ticks have been added.

flush()[source]

Return completed bars and clear the internal buffer.

Return type:

DataFrame

Returns:

DataFrame with OHLCV bars from all buffered ticks.

clear()[source]

Clear all buffered ticks without returning bars.

Return type:

None

__len__()[source]

Return the number of buffered ticks.

Return type:

int