Source code for wraquant.io.cloud
"""Cloud storage connectors for S3 and Google Cloud Storage.
Functions are gated behind optional dependencies (``s3fs``/``boto3`` for
AWS S3, ``gcsfs`` for Google Cloud Storage) which are part of the ``etl``
extra group.
"""
from __future__ import annotations
from typing import Any
import pandas as pd
from wraquant.core.decorators import requires_extra
__all__ = [
"read_s3",
"write_s3",
"list_s3",
"read_gcs",
"write_gcs",
]
[docs]
@requires_extra("etl")
def read_s3(
bucket: str,
key: str,
**kwargs: Any,
) -> pd.DataFrame:
"""Read a file from Amazon S3 into a DataFrame.
Supports Parquet and CSV formats, determined by the file extension.
Requires ``s3fs`` (part of the ``etl`` extra).
Parameters:
bucket: S3 bucket name.
key: Object key (path) within the bucket.
**kwargs: Additional keyword arguments forwarded to the
underlying pandas reader (``read_parquet`` or ``read_csv``).
Returns:
DataFrame with the file contents.
"""
s3_path = f"s3://{bucket}/{key}"
if key.endswith(".parquet") or key.endswith(".pq"):
return pd.read_parquet(s3_path, **kwargs)
else:
return pd.read_csv(s3_path, **kwargs)
[docs]
@requires_extra("etl")
def write_s3(
data: pd.DataFrame,
bucket: str,
key: str,
**kwargs: Any,
) -> None:
"""Write a DataFrame to Amazon S3.
Supports Parquet and CSV formats, determined by the file extension.
Requires ``s3fs`` (part of the ``etl`` extra).
Parameters:
data: DataFrame to write.
bucket: S3 bucket name.
key: Object key (path) within the bucket.
**kwargs: Additional keyword arguments forwarded to the
underlying pandas writer (``to_parquet`` or ``to_csv``).
"""
s3_path = f"s3://{bucket}/{key}"
if key.endswith(".parquet") or key.endswith(".pq"):
data.to_parquet(s3_path, **kwargs)
else:
data.to_csv(s3_path, **kwargs)
[docs]
@requires_extra("etl")
def list_s3(
bucket: str,
prefix: str = "",
**kwargs: Any,
) -> list[str]:
"""List files in an S3 bucket under a given prefix.
Requires ``s3fs`` (part of the ``etl`` extra).
Parameters:
bucket: S3 bucket name.
prefix: Key prefix to filter results. Defaults to listing the
entire bucket.
**kwargs: Additional keyword arguments forwarded to
``s3fs.S3FileSystem.ls``.
Returns:
List of object keys matching the prefix.
"""
import s3fs
fs = s3fs.S3FileSystem(**kwargs)
path = f"{bucket}/{prefix}" if prefix else bucket
return fs.ls(path)
[docs]
@requires_extra("etl")
def read_gcs(
bucket: str,
blob: str,
**kwargs: Any,
) -> pd.DataFrame:
"""Read a file from Google Cloud Storage into a DataFrame.
Supports Parquet and CSV formats, determined by the file extension.
Requires ``gcsfs`` (part of the ``etl`` extra).
Parameters:
bucket: GCS bucket name.
blob: Blob path within the bucket.
**kwargs: Additional keyword arguments forwarded to the
underlying pandas reader.
Returns:
DataFrame with the file contents.
"""
gcs_path = f"gs://{bucket}/{blob}"
if blob.endswith(".parquet") or blob.endswith(".pq"):
return pd.read_parquet(gcs_path, **kwargs)
else:
return pd.read_csv(gcs_path, **kwargs)
[docs]
@requires_extra("etl")
def write_gcs(
data: pd.DataFrame,
bucket: str,
blob: str,
**kwargs: Any,
) -> None:
"""Write a DataFrame to Google Cloud Storage.
Supports Parquet and CSV formats, determined by the file extension.
Requires ``gcsfs`` (part of the ``etl`` extra).
Parameters:
data: DataFrame to write.
bucket: GCS bucket name.
blob: Blob path within the bucket.
**kwargs: Additional keyword arguments forwarded to the
underlying pandas writer.
"""
gcs_path = f"gs://{bucket}/{blob}"
if blob.endswith(".parquet") or blob.endswith(".pq"):
data.to_parquet(gcs_path, **kwargs)
else:
data.to_csv(gcs_path, **kwargs)