"""Data loading and validation utilities for SyNG-BTS.
This module provides helpers for loading bundled/user data, resolving flexible
input forms (DataFrame, path, bundled name), and validating feature matrices.
"""
from pathlib import Path
import pandas as pd
# Try importlib.resources (Python 3.9+) with fallback to importlib_resources
try:
from importlib.resources import as_file, files
except ImportError:
from importlib_resources import as_file, files
def load_bundled_data(
    subdir: str,
    filename: str,
    groups_filename: str | None = None,
) -> tuple[pd.DataFrame, pd.Series | None]:
    """
    Load a bundled feature table and, optionally, its group labels.

    Parameters
    ----------
    subdir : str
        Subdirectory of the bundled data directory that holds the files
        (e.g., "examples", "transfer").
    filename : str
        Name of the feature-only Parquet file.
    groups_filename : str or None
        Name of an optional sidecar Parquet file with a ``groups`` column.
        ``None`` skips the sidecar lookup entirely.

    Returns
    -------
    tuple[pd.DataFrame, pd.Series | None]
        ``(features_df, groups)``. ``groups`` is the sidecar's ``groups``
        column, or ``None`` when no sidecar was requested, the sidecar file
        could not be found, or it has no ``groups`` column.

    Raises
    ------
    FileNotFoundError
        If the feature file itself cannot be found.
    """
    features_df = _read_bundled_parquet(subdir, filename)

    # No sidecar requested: features only.
    if groups_filename is None:
        return features_df, None

    # The sidecar is best-effort: a missing file or a missing "groups"
    # column degrades to "no groups" instead of failing the whole load.
    try:
        groups = _read_bundled_parquet(subdir, groups_filename)["groups"]
    except (FileNotFoundError, KeyError):
        return features_df, None
    return features_df, groups
def _read_bundled_parquet(subdir: str, filename: str) -> pd.DataFrame:
    """Read one Parquet file from the ``syng_bts.data`` package.

    The file is first located through ``importlib`` resources. If that lookup
    or read raises ``TypeError``, ``AttributeError`` or ``FileNotFoundError``,
    the function falls back to a path built from the installed package's
    ``__file__``.

    Raises
    ------
    FileNotFoundError
        If the fallback path does not exist either.
    """
    try:
        # Descend from the data package one component at a time:
        # every "/"-separated piece of ``subdir``, then the file name.
        resource = files("syng_bts.data")
        for component in (*subdir.split("/"), filename):
            resource = resource.joinpath(component)
        with as_file(resource) as local_path:
            return pd.read_parquet(local_path, engine="pyarrow")
    except (TypeError, AttributeError, FileNotFoundError) as err:
        # Imported lazily: only needed on the fallback path.
        import syng_bts

        fallback = Path(syng_bts.__file__).parent / "data" / subdir / filename
        if not fallback.exists():
            raise FileNotFoundError(
                f"Could not find bundled data file: {subdir}/{filename}"
            ) from err
        return pd.read_parquet(fallback, engine="pyarrow")
# Map of known bundled datasets to their package locations and subdirectories.
# Format: "dataset_name": ("subdir_path", "features.parquet", "groups.parquet" | None)
BUNDLED_DATASETS: dict[str, tuple[str, str, str | None]] = {
# Example datasets
"SKCMPositive_4": ("examples", "SKCMPositive_4.parquet", None),
# Transfer learning datasets
"BRCA": ("transfer", "BRCA.parquet", None),
"PRAD": ("transfer", "PRAD.parquet", None),
# BRCA subtype case study
"BRCASubtypeSel": (
"case/brca_subtype",
"BRCASubtypeSel.parquet",
"BRCASubtypeSel_groups.parquet",
),
"BRCASubtypeSel_test": (
"case/brca_subtype",
"BRCASubtypeSel_test.parquet",
"BRCASubtypeSel_test_groups.parquet",
),
"BRCASubtypeSel_train": (
"case/brca_subtype",
"BRCASubtypeSel_train.parquet",
"BRCASubtypeSel_train_groups.parquet",
),
"BRCASubtypeSel_train_epoch285_CVAE1-20_generated": (
"case/brca_subtype",
"BRCASubtypeSel_train_epoch285_CVAE1-20_generated.parquet",
"BRCASubtypeSel_train_epoch285_CVAE1-20_generated_groups.parquet",
),
# LIHC subtype case study
"LIHCSubtypeFamInd": (
"case/lihc_subtype",
"LIHCSubtypeFamInd.parquet",
"LIHCSubtypeFamInd_groups.parquet",
),
"LIHCSubtypeFamInd_DESeq": (
"case/lihc_subtype",
"LIHCSubtypeFamInd_DESeq.parquet",
"LIHCSubtypeFamInd_DESeq_groups.parquet",
),
"LIHCSubtypeFamInd_test74": (
"case/lihc_subtype",
"LIHCSubtypeFamInd_test74.parquet",
"LIHCSubtypeFamInd_test74_groups.parquet",
),
"LIHCSubtypeFamInd_test74_DESeq": (
"case/lihc_subtype",
"LIHCSubtypeFamInd_test74_DESeq.parquet",
"LIHCSubtypeFamInd_test74_DESeq_groups.parquet",
),
"LIHCSubtypeFamInd_train294": (
"case/lihc_subtype",
"LIHCSubtypeFamInd_train294.parquet",
"LIHCSubtypeFamInd_train294_groups.parquet",
),
"LIHCSubtypeFamInd_train294_DESeq": (
"case/lihc_subtype",
"LIHCSubtypeFamInd_train294_DESeq.parquet",
"LIHCSubtypeFamInd_train294_DESeq_groups.parquet",
),
}
# Public helper: discovery of bundled datasets.
def list_bundled_datasets() -> list:
    """
    Return the names of every dataset in the bundled registry.

    Returns
    -------
    list
        Dataset names, in registry order, each of which can be passed to
        :func:`resolve_data`. A new list is built on every call.
    """
    return [*BUNDLED_DATASETS]
def _read_user_file(path: Path) -> pd.DataFrame:
    """Load a user-supplied table, choosing the reader by file extension.

    The extension is compared case-insensitively. CSV files are read with
    the first row as the header; Parquet files are read with the ``pyarrow``
    engine.

    Raises
    ------
    ValueError
        If the file extension is not ``.csv`` or ``.parquet``.
    """
    suffix = path.suffix.lower()
    if suffix == ".csv":
        return pd.read_csv(path, header=0)
    if suffix == ".parquet":
        return pd.read_parquet(path, engine="pyarrow")
    raise ValueError(
        f"Unsupported file type '{suffix}' for '{path.name}'. "
        "Only .csv and .parquet are supported."
    )
# ---------------------------------------------------------------------------
# Strict data-contract validator
# ---------------------------------------------------------------------------
_METADATA_COLUMNS = {"groups", "samples"}


def _validate_feature_data(df: pd.DataFrame) -> None:
    """Validate that a DataFrame conforms to the feature-only contract.

    Experiment entry points should call this **after** resolving the data
    (i.e. after ``resolve_data()``). The checks run in this order and the
    first violation raises:

    1. No metadata-like columns (``groups``, ``samples``, compared
       case-insensitively) are present.
    2. All columns are numeric.
    3. The index contains unique values (used as sample identifiers).

    Parameters
    ----------
    df : pd.DataFrame
        The feature DataFrame to validate. Column labels may be of any type;
        only string labels are compared against the metadata names.

    Raises
    ------
    ValueError
        If any of the above constraints are violated.
    """
    # 1. Reject metadata-like column names.
    # Column labels are not guaranteed to be strings (a DataFrame built from
    # an array has integer labels), so only string labels are lower-cased;
    # a non-string label can never be a metadata name.
    offending = [
        c
        for c in df.columns
        if isinstance(c, str) and c.lower() in _METADATA_COLUMNS
    ]
    if offending:
        raise ValueError(
            f"DataFrame contains metadata column(s) {offending!r} which must "
            "not be included in the feature matrix. Pass group labels via the "
            "'groups' parameter instead and ensure sample IDs are in the "
            "DataFrame index, not a column."
        )

    # 2. All columns must be numeric
    non_numeric = df.select_dtypes(exclude="number").columns.tolist()
    if non_numeric:
        raise ValueError(
            f"DataFrame contains non-numeric column(s): {non_numeric!r}. "
            "Only numeric feature columns are allowed. Remove or convert "
            "non-numeric columns before passing to experiment functions."
        )

    # 3. Index must be unique
    if not df.index.is_unique:
        n_dup = df.index.duplicated().sum()
        raise ValueError(
            f"DataFrame index contains {n_dup} duplicate value(s). "
            "Each row must have a unique identifier (sample ID) as its index."
        )
# Public helper: flexible data-input resolution.
def resolve_data(
    data: "pd.DataFrame | str | Path",
) -> "tuple[pd.DataFrame, pd.Series | None]":
    """
    Turn a flexible data argument into ``(features, groups)``.

    Three input forms are understood: an in-memory DataFrame, a path to a
    CSV/Parquet file, or the name of a dataset bundled with the package.

    Parameters
    ----------
    data : pd.DataFrame, str, or Path
        One of:

        - A ``pd.DataFrame`` — handed back unchanged, with groups ``None``.
        - A ``str`` or ``Path`` containing a path separator (``/`` or
          ``\\``) — treated strictly as a file path and loaded by extension
          (``.csv`` or ``.parquet``).
        - A bare name without separators — looked up in the bundled-dataset
          registry (a trailing ``.csv`` / ``.parquet`` is ignored for the
          lookup), e.g. ``"SKCMPositive_4"``. If the name is not registered
          but has an extension and exists as a local file, that file is
          loaded instead.

    Returns
    -------
    tuple[pd.DataFrame, pd.Series | None]
        ``(features_df, groups_or_none)``. Groups can only be a
        :class:`pd.Series` for bundled datasets that register a groups
        sidecar; DataFrames and user files always yield ``None``.

    Raises
    ------
    TypeError
        If *data* is not a DataFrame, str, or Path.
    FileNotFoundError
        If *data* contains a path separator but no file exists there.
    ValueError
        If a bare name is neither a registered bundled dataset nor an
        existing local file with an extension (the message lists the
        available bundled datasets), or if a user file has an unsupported
        extension.

    Examples
    --------
    >>> from syng_bts.data_utils import resolve_data
    >>> df, groups = resolve_data("SKCMPositive_4")  # bundled
    >>> df, groups = resolve_data("./my_data/custom.csv")  # file path
    >>> df, groups = resolve_data(existing_dataframe)  # pass-through
    """
    # DataFrames need no resolution.
    if isinstance(data, pd.DataFrame):
        return data, None

    if not isinstance(data, (str, Path)):
        raise TypeError(
            f"'data' must be a pd.DataFrame, str, or Path, got {type(data).__name__}"
        )

    # Work on the textual form so separators and extensions can be inspected.
    raw = data if isinstance(data, str) else str(data)
    candidate = Path(raw)

    # Explicit paths: anything with a separator must exist on disk.
    if any(sep in raw for sep in ("/", "\\")):
        if not candidate.exists():
            raise FileNotFoundError(f"Data file not found: {candidate}")
        return _read_user_file(candidate), None

    # Bare names: drop a user-added .csv/.parquet before the registry lookup.
    name = raw
    lowered = raw.lower()
    for ext in (".csv", ".parquet"):
        if lowered.endswith(ext):
            name = raw[: -len(ext)]
            break

    # Registered bundled datasets win over same-named local files.
    if name in BUNDLED_DATASETS:
        subdir, filename, groups_filename = BUNDLED_DATASETS[name]
        return load_bundled_data(subdir, filename, groups_filename)

    # Last resort: a file with an extension sitting in the working directory.
    if candidate.suffix and candidate.exists():
        return _read_user_file(candidate), None

    available = ", ".join(sorted(BUNDLED_DATASETS.keys()))
    raise ValueError(
        f"Unknown dataset name '{name}'. Available bundled datasets: {available}"
    )
def _derive_dataname(
    data: "pd.DataFrame | str | Path",
    name: "str | None" = None,
) -> str:
    """
    Pick a short, human-readable label for a dataset.

    The label is used in output filenames and metadata.

    Parameters
    ----------
    data : pd.DataFrame, str, or Path
        The original ``data`` argument the user passed.
    name : str or None
        Explicit override; when not ``None`` it is returned unchanged and
        *data* is ignored.

    Returns
    -------
    str
        In order of precedence: *name*; for a DataFrame, its
        ``attrs["name"]`` entry or ``"data"`` when absent; for a str/Path
        with an extension, the file stem; for a str/Path without an
        extension, the path text itself; otherwise ``"data"``.
    """
    if name is not None:
        return name

    if isinstance(data, pd.DataFrame):
        return data.attrs.get("name", "data")

    if not isinstance(data, (str, Path)):
        return "data"

    as_path = Path(data)
    if as_path.suffix:
        # "dir/custom.csv" -> "custom": drop directories and the extension.
        return as_path.stem
    # No extension (e.g. a bundled name such as "SKCMPositive_4"): the text
    # of the path is used as-is.
    return str(as_path)