Skip to content

BamStore

quantnado.dataset.store_bam.BamStore

BamStore(
    store_path: Path | str,
    chromsizes: dict[str, int] | Path | str,
    sample_names: list[str],
    *,
    chunk_len: int | None = None,
    construction_compression: str = DEFAULT_CONSTRUCTION_COMPRESSION,
    overwrite: bool = True,
    resume: bool = False,
    read_only: bool = False,
    stranded: "str | list[str] | dict[str, str] | None" = None,
)

Bases: BaseStore

Zarr-backed BAM signal store for per-chromosome, per-sample data and metadata.

Use from_bam_files to create a new store, or open to load an existing one.

By default, open attaches in read-only mode to prevent accidental data corruption. To modify the store, pass read_only=False.

Example:

    # Read-only (default)
    store = BamStore.open("/path/to/store.zarr")
    print(store.sample_names)

    # Writable
    store = BamStore.open("/path/to/store.zarr", read_only=False)
    store.add_metadata_column(...)
Source code in quantnado/dataset/store_bam.py
def __init__(
    self,
    store_path: Path | str,
    chromsizes: dict[str, int] | Path | str,
    sample_names: list[str],
    *,
    chunk_len: int | None = None,
    construction_compression: str = DEFAULT_CONSTRUCTION_COMPRESSION,
    overwrite: bool = True,
    resume: bool = False,
    read_only: bool = False,
    stranded: "str | list[str] | dict[str, str] | None" = None,
) -> None:
    """Create, overwrite, resume, or attach to a Zarr-backed BAM store.

    Args:
        store_path: Destination path of the Zarr store.
        chromsizes: Chromosome sizes as a mapping, or a path understood by
            ``_parse_chromsizes``.
        sample_names: Ordered sample identifiers; must be non-empty.
        chunk_len: Genome-axis chunk length; when None a value is derived via
            ``_resolve_chunk_len`` (or read back from store attrs on resume).
        construction_compression: Name of the build-time compression codec.
        overwrite: If True (default), delete any existing store at the path
            and initialise a fresh one.
        resume: If True (and overwrite is False), attach to an existing store
            and continue construction.
        read_only: Attach without write access; incompatible with overwrite
            and requires the store to already exist.
        stranded: Strandedness specification (single value, list, per-sample
            dict, or None); normalised into a per-sample mapping.

    Raises:
        ValueError: If sample_names is empty, or overwrite is combined with
            read_only.
        FileExistsError: If the store exists and neither overwrite nor resume
            was requested.
        FileNotFoundError: If read_only is True but the store does not exist.
    """
    self.path = Path(store_path)
    # Canonical on-disk location; all filesystem checks below use this form.
    self.store_path = self._normalize_path(self.path)

    # Initialize BaseStore attributes
    if self.store_path.exists() and not overwrite:
        # Attach to the existing group up front so shared attributes can be
        # loaded; "r+" keeps the store writable unless read_only was requested.
        self.root = zarr.open_group(str(self.store_path), mode="r" if read_only else "r+")
        self._init_common_attributes(sample_names)
    else:
        # For new or overwritten stores, we don't have a zarr group yet
        # but we need to set up the basic attributes for processing
        self.sample_names = [str(s) for s in sample_names]
        self._setup_sample_lookup()
        self._chromsizes = _parse_chromsizes(chromsizes)
        self._chromosomes = sorted(list(self._chromsizes.keys()))
        # Nothing written yet: every sample starts as not-completed.
        self.completed_mask_raw = np.zeros(len(self.sample_names), dtype=bool)
        self._metadata_cache = None

    self.n_samples = len(self.sample_names)
    self.sample_hash = _compute_sample_hash(self.sample_names)
    self.construction_compression, self.compressors = _resolve_construction_compressors(
        construction_compression
    )
    self.read_only = read_only
    # Per-sample mapping used internally; see _normalize_strandedness.
    self._strandedness_map: dict[str, str | None] = _normalize_strandedness(
        stranded, self.sample_names
    )
    # Expose a summary attribute: the original value (for attrs storage / repr)
    self.stranded = stranded

    if self.n_samples == 0:
        raise ValueError("sample_names must not be empty")

    if self.store_path.exists():
        if overwrite:
            if read_only:
                raise ValueError("Cannot overwrite store in read-only mode.")
            logger.warning(f"Deleting existing store at: {self.store_path}")
            # The store may be a directory tree (standard zarr) or a single file.
            if self.store_path.is_dir():
                shutil.rmtree(self.store_path)
            else:
                self.store_path.unlink()
            self.chunk_len = _resolve_chunk_len(
                self.chromsizes,
                self.store_path,
                chunk_len,
            )
            self._init_store()
        elif resume:
            self._load_existing()
            # Prefer the chunk length recorded in the store so resumed writes
            # stay aligned with existing chunks; fall back to a fresh estimate.
            self.chunk_len = int(
                self.root.attrs.get(
                    "chunk_len",
                    _resolve_chunk_len(self.chromsizes, self.store_path, chunk_len),
                )
            )
            self._validate_sample_names()
        else:
            raise FileExistsError(
                f"Store already exists at {self.store_path}; set overwrite=True or resume=True"
            )
    else:
        if read_only:
            raise FileNotFoundError(f"Store does not exist at {self.store_path} (read_only=True)")
        self.chunk_len = _resolve_chunk_len(
            self.chromsizes,
            self.store_path,
            chunk_len,
        )
        self._init_store()

library_sizes property

library_sizes: Series | None

Total mapped reads per sample as a pd.Series indexed by sample name.

Returns None for stores built before library-size tracking was added. Use quantnado.get_library_sizes to get a helpful error in that case.

dataset property

dataset

Expose the root Zarr group.

n_completed property

n_completed: int

Return number of completed samples.

open classmethod

open(
    store_path: str | Path, read_only: bool = True
) -> "BamStore"

Open an existing BAM Zarr store for reading (default) or writing.

Args:
    store_path: Path to the Zarr store directory.
    read_only: If True (default), disables all write operations.

Returns:
    BamStore instance attached to the on-disk store.

Raises:
    FileNotFoundError: If the store or required attributes are missing.
    ValueError: If the store is missing required metadata.

Source code in quantnado/dataset/store_bam.py
@classmethod
def open(cls, store_path: str | Path, read_only: bool = True) -> "BamStore":
    """
    Open an existing BAM Zarr store for reading (default) or writing.

    Args:
        store_path: Path to the Zarr store directory.
        read_only: If True (default), disables all write operations.

    Returns:
        BamStore instance attached to the on-disk store.

    Raises:
        FileNotFoundError: If the store or required attributes are missing.
        ValueError: If the store is missing required metadata.
    """
    store_path = cls._normalize_path(store_path)
    if not store_path.exists():
        raise FileNotFoundError(f"Store does not exist at {store_path}")
    # Open Zarr group in appropriate mode
    mode = "r" if read_only else "r+"
    group = zarr.open_group(str(store_path), mode=mode)
    # Read required root attributes
    try:
        sample_names = list(group.attrs["sample_names"])
        chromsizes = dict(group.attrs["chromsizes"])
        chunk_len = int(group.attrs["chunk_len"])
    except KeyError as e:
        # Chain the KeyError so the missing attribute name stays visible
        # in tracebacks instead of being silently replaced.
        raise ValueError(f"Missing required attribute in store: {e}") from e
    # stranded may be stored as a dict (new format) or string (old format)
    raw_strand = group.attrs.get("stranded")
    if isinstance(raw_strand, dict):
        # Convert back to a per-sample dict, dropping empty strings → None
        stranded: "str | dict[str, str] | None" = {
            s: (lt if lt else None) for s, lt in raw_strand.items()
        }
        # If all values are None, simplify to None
        if not any(stranded.values()):
            stranded = None
    else:
        stranded = raw_strand or None
    # Return BamStore instance; resume=True attaches without rebuilding.
    return cls(
        store_path=store_path,
        chromsizes=chromsizes,
        sample_names=sample_names,
        chunk_len=chunk_len,
        overwrite=False,
        resume=True,
        read_only=read_only,
        stranded=stranded,
    )

process_samples

process_samples(
    bam_files: list[str], max_workers: int = 1
) -> None

Process BAM files and write signal data to the zarr store.

Samples are processed sequentially to minimise peak memory. Each sample's chromosomes are streamed directly to disk so only a single chromosome's array is ever resident in memory at a time.

Within each sample, chromosome processing is parallelised using max_workers threads (bamnado releases the GIL).

Source code in quantnado/dataset/store_bam.py
def process_samples(
    self,
    bam_files: list[str],
    max_workers: int = 1,
) -> None:
    """Stream per-sample BAM signal into the zarr store.

    Samples are handled strictly one after another so that at most a single
    chromosome array is resident in memory at any time; each sample's
    chromosomes are streamed straight to disk.

    Inside a sample, up to ``max_workers`` threads process chromosomes in
    parallel (bamnado releases the GIL).
    """
    if len(bam_files) != self.n_samples:
        raise ValueError("bam_files length must match number of sample_names")

    chrom_sizes = self.chromsizes
    done = self.completed_mask

    for idx, bam_path in enumerate(bam_files):
        name = self.sample_names[idx]

        # Already-written samples are skipped so a resumed run is idempotent.
        if done[idx]:
            logger.info(
                f"Skipping completed sample '{name}' (index {idx})"
            )
            continue

        self._process_and_write_single_sample(
            idx,
            bam_path,
            name,
            chrom_sizes,
            max_workers=max_workers,
        )

    # Record the mean sparsity across samples, ignoring NaN entries, but only
    # when at least one finite value exists.
    sparsity = self.meta["sparsity"][:]
    if np.isfinite(sparsity).any():
        self.root.attrs["average_sparsity"] = float(np.nanmean(sparsity))

from_bam_files classmethod

from_bam_files(
    bam_files: list[str],
    chromsizes: str | Path | dict[str, int] | None = None,
    store_path: Path | str | None = None,
    metadata: DataFrame
    | Path
    | str
    | list[Path | str]
    | None = None,
    bam_sample_names: list[str] | None = None,
    *,
    filter_chromosomes: bool = True,
    overwrite: bool = True,
    resume: bool = False,
    sample_column: str = "sample_id",
    chunk_len: int | None = None,
    construction_compression: str = DEFAULT_CONSTRUCTION_COMPRESSION,
    local_staging: bool = False,
    staging_dir: Path | str | None = None,
    max_workers: int = 1,
    log_file: Path | None = None,
    test: bool = False,
    stranded: "str | list[str] | dict[str, str] | None" = None,
) -> "BamStore"

Create BamStore from list of BAM files and optionally attach metadata.

If chunk_len is omitted, a filesystem-aware value is derived from quantnado.utils.estimate_chunk_len using the destination store path.

If local_staging is enabled or staging_dir is provided, construction is performed under scratch storage and then published to the final store path.

construction_compression controls build-time compression only and does not affect reader compatibility.

max_workers controls chromosome-level parallelism within each sample. Samples are processed sequentially to keep memory usage low.

Source code in quantnado/dataset/store_bam.py
@classmethod
def from_bam_files(
    cls,
    bam_files: list[str],
    chromsizes: str | Path | dict[str, int] | None = None,
    store_path: Path | str | None = None,
    metadata: pd.DataFrame | Path | str | list[Path | str] | None = None,
    bam_sample_names: list[str] | None = None,
    *,
    filter_chromosomes: bool = True,
    overwrite: bool = True,
    resume: bool = False,
    sample_column: str = "sample_id",
    chunk_len: int | None = None,
    construction_compression: str = DEFAULT_CONSTRUCTION_COMPRESSION,
    local_staging: bool = False,
    staging_dir: Path | str | None = None,
    max_workers: int = 1,
    log_file: Path | None = None,
    test: bool = False,
    stranded: "str | list[str] | dict[str, str] | None" = None,
) -> "BamStore":
    """
    Create BamStore from list of BAM files and optionally attach metadata.

    If chunk_len is omitted, a filesystem-aware value is derived from
    quantnado.utils.estimate_chunk_len using the destination store path.

    If local_staging is enabled or staging_dir is provided, construction is
    performed under scratch storage and then published to the final store path.

    construction_compression controls build-time compression only and does
    not affect reader compatibility.

    max_workers controls chromosome-level parallelism within each sample.
    Samples are processed sequentially to keep memory usage low.

    Args:
        bam_files: BAM file paths, one per sample.
        chromsizes: Chromosome sizes (mapping or path); when None they are
            extracted from the first BAM file.
        store_path: Destination Zarr store path; required.
        metadata: Metadata to attach: a DataFrame, a CSV path, or a list of
            paths combined via ``_combine_metadata_files``.
        bam_sample_names: Explicit sample names; defaults to BAM file stems.
        filter_chromosomes: Forwarded to ``_parse_chromsizes``.
        overwrite: Replace an existing store at the destination.
        resume: Continue a partially built store (unsupported with staging).
        sample_column: Metadata column holding sample identifiers.
        chunk_len: Explicit chunk length; otherwise resolved automatically.
        construction_compression: Build-time compression codec name.
        local_staging: Build under scratch storage, then publish.
        staging_dir: Staging directory; providing one enables staging.
        max_workers: Chromosome-level thread count within each sample.
        log_file: When given, logging is routed to this file.
        test: Forwarded to ``_parse_chromsizes`` (test mode).
        stranded: Strandedness specification forwarded to the constructor.

    Returns:
        BamStore attached to the final store path (re-opened writable when
        staging was used).

    Raises:
        ValueError: If chromsizes cannot be determined from an empty
            bam_files list, bam_sample_names length mismatches bam_files,
            store_path is missing, or resume is combined with staging.
        FileExistsError: If staging is enabled, the final store exists, and
            overwrite is False.
    """

    if log_file is not None:
        from quantnado.utils import setup_logging

        setup_logging(Path(log_file), verbose=False)

    if chromsizes is None:
        if not bam_files:
            raise ValueError(
                "bam_files list is empty; cannot extract chromsizes from BAM."
            )
        logger.info(f"Extracting chromsizes from {bam_files[0]}")
        chromsizes_raw = _get_chromsizes_from_bam(bam_files[0])
    else:
        chromsizes_raw = chromsizes

    chromsizes_dict = _parse_chromsizes(
        chromsizes_raw, filter_chromosomes=filter_chromosomes, test=test
    )

    if bam_sample_names is not None:
        if len(bam_sample_names) != len(bam_files):
            raise ValueError(
                f"bam_sample_names length ({len(bam_sample_names)}) != bam_files length ({len(bam_files)})"
            )
        sample_names = bam_sample_names
    else:
        # Default sample names: BAM filename without its extension.
        sample_names = [Path(f).stem for f in bam_files]

    if store_path is None:
        raise ValueError("store_path must be provided.")

    final_store_path = cls._normalize_path(store_path)
    staging_enabled = local_staging or staging_dir is not None

    if resume and staging_enabled:
        raise ValueError(
            "resume=True is not supported with local staging; resume the final store directly"
        )

    # Fail early rather than building a staged store that can't be published.
    if staging_enabled and final_store_path.exists() and not overwrite:
        raise FileExistsError(
            f"Store already exists at {final_store_path}; set overwrite=True to publish staged output"
        )

    build_store_path = (
        _build_staging_store_path(final_store_path, staging_dir)
        if staging_enabled
        else final_store_path
    )
    # Chunk length is resolved against the FINAL path, not the scratch path,
    # so the published store matches the destination filesystem.
    resolved_chunk_len = _resolve_chunk_len(
        chromsizes_dict,
        final_store_path,
        chunk_len,
    )

    if staging_enabled:
        logger.info(
            f"Building dataset under staging path {build_store_path} before publishing to {final_store_path}"
        )

    try:
        store = cls(
            store_path=build_store_path,
            chromsizes=chromsizes_dict,
            sample_names=sample_names,
            chunk_len=resolved_chunk_len,
            construction_compression=construction_compression,
            # A staged build always starts fresh; resume only applies when
            # writing directly to the final path.
            overwrite=True if staging_enabled else overwrite,
            resume=False if staging_enabled else resume,
            stranded=stranded,
        )
        store.process_samples(bam_files, max_workers=max_workers)

        if metadata is not None:
            if isinstance(metadata, list):
                metadata_df = cls._combine_metadata_files(metadata)
            elif isinstance(metadata, (str, Path)):
                metadata_df = pd.read_csv(metadata)
            else:
                metadata_df = metadata

            store.set_metadata(metadata_df, sample_column=sample_column)

        if staging_enabled:
            _publish_staged_store(build_store_path, final_store_path)
            logger.info(f"Published staged dataset to {final_store_path}")
            # Re-open at the final location: the staged handle points at scratch.
            return cls.open(final_store_path, read_only=False)

        return store
    except Exception:
        # On failure, only staged scratch output is deleted; a direct build
        # is left on disk untouched.
        if staging_enabled:
            _delete_store_path(build_store_path)
        raise

metadata_from_csv classmethod

metadata_from_csv(
    path: Path | str, **kwargs
) -> pd.DataFrame

Helper to load metadata from CSV.

Source code in quantnado/dataset/store_bam.py
@classmethod
def metadata_from_csv(cls, path: Path | str, **kwargs) -> pd.DataFrame:
    """Helper to load metadata from CSV."""
    return pd.read_csv(path, **kwargs)

metadata_from_json classmethod

metadata_from_json(path: Path | str) -> pd.DataFrame

Helper to load metadata from JSON.

Source code in quantnado/dataset/store_bam.py
@classmethod
def metadata_from_json(cls, path: Path | str) -> pd.DataFrame:
    """Helper to load metadata from JSON."""
    with open(path) as f:
        data = json.load(f)
    return pd.DataFrame(data)