Skip to content

BamStore

quantnado.dataset.store_bam.BamStore

BamStore(
    store_path: Path | str,
    assay: str,
    sample: str,
    chromsizes: dict[str, int],
    *,
    ip: str | None = None,
    stranded: str | None = None,
    viewpoints: list[str] | None = None,
    chunk_len: int,
    compressors: list,
    overwrite: bool = True,
)

Per-sample Zarr store for BAM-derived genomic coverage.

One Zarr store is created per BAM file. Each chromosome is a Zarr group containing arrays of shape (1, chrom_len): one row per sample, which is always a single row for per-sample stores.

Use `from_bam_files` to create a new store or `open` to read an existing one.

Source code in quantnado/dataset/store_bam.py
def __init__(
    self,
    store_path: Path | str,
    assay: str,
    sample: str,
    chromsizes: dict[str, int],
    *,
    ip: str | None = None,
    stranded: str | None = None,
    viewpoints: list[str] | None = None,
    chunk_len: int,
    compressors: list,
    overwrite: bool = True,
) -> None:
    """Create a fresh on-disk store and initialise its arrays and metadata.

    Parameters
    ----------
    store_path:
        Location of the store; normalised with ``_normalize_path``.
    assay:
        Assay type string. Combined with ``ip`` via ``array_key``.
    sample:
        Sample name, passed to ``create_metadata_group`` and saved in the
        root attributes.
    chromsizes:
        Mapping of chromosome name to length. Its sorted keys become
        ``self.chromosomes``.
    ip:
        Optional IP target. Saved in the root attributes as ``""`` when
        falsy.
    stranded:
        ``"R"`` or ``"F"``; any other value leaves ``self.stranded`` as
        ``None``. The root attributes record the value as passed (or ``""``
        when falsy), not the normalised one.
    viewpoints:
        Optional list of viewpoint names; a falsy value becomes ``[]``.
    chunk_len:
        Chunk length, kept on the instance and saved in the root attributes.
    compressors:
        Compressor list, kept on the instance (presumably consumed by
        ``_init_arrays`` -- confirm).
    overwrite:
        When true, anything at ``store_path`` is removed with
        ``_delete_path`` before the store is created.
    """
    has_valid_strand = isinstance(stranded, str) and stranded in ("R", "F")

    # Plain instance state first; nothing here touches the filesystem.
    self.store_path = _normalize_path(store_path)
    self.assay = assay
    self.sample = sample
    self.ip = ip
    self.chromsizes = chromsizes
    self.chromosomes = sorted(chromsizes)
    self.stranded = stranded if has_valid_strand else None
    self.viewpoints = viewpoints if viewpoints else []
    self.chunk_len = chunk_len
    self.compressors = compressors
    self._array_key = array_key(assay, ip)

    if overwrite:
        _delete_path(self.store_path)

    self.store_path.parent.mkdir(parents=True, exist_ok=True)

    # NOTE(review): the root group is created with overwrite=True whatever
    # the ``overwrite`` argument says -- confirm this is intended.
    self.root = zarr.group(
        store=LocalStore(str(self.store_path)),
        overwrite=True,
        zarr_format=3,
    )
    self.meta = create_metadata_group(self.root, sample)
    self._init_arrays()

    # Save the construction parameters in the root attributes.
    root_attrs = {
        "assay": assay,
        "sample": sample,
        "ip": ip or "",
        "chromsizes": chromsizes,
        "chunk_len": chunk_len,
        "stranded": stranded or "",
        "viewpoints": self.viewpoints,
    }
    self.root.attrs.update(root_attrs)

open classmethod

open(
    store_path: str | Path, read_only: bool = True
) -> "BamStore"

Open an existing per-sample BamStore zarr (read-only by default).

Source code in quantnado/dataset/store_bam.py
@classmethod
def open(cls, store_path: str | Path, read_only: bool = True) -> "BamStore":
    """Open an existing per-sample BamStore zarr (read-only by default).

    The instance is created without running ``__init__``; its attributes are
    filled in from the root group's attributes. Only state held in those
    attributes is restored, so attributes that ``__init__`` sets but does not
    write there (for example ``compressors``) are not set on the result.

    Parameters
    ----------
    store_path:
        Path to a ``.zarr`` directory or a ``.zarr.zip`` archive.
    read_only:
        Open a directory store with mode ``"r"`` when true, ``"r+"``
        otherwise. ``.zarr.zip`` archives are always opened read-only.

    Returns
    -------
    BamStore
        A thin wrapper exposing ``root`` and the stored metadata.

    Raises
    ------
    FileNotFoundError
        If ``store_path`` does not exist.
    """
    store_path = _normalize_path(store_path)
    if not store_path.exists():
        raise FileNotFoundError(f"Store not found: {store_path}")
    mode = "r" if read_only else "r+"
    if str(store_path).endswith(".zarr.zip"):
        # Zip archives are opened read-only regardless of ``read_only``.
        root = zarr.open_group(store=ZipStore(str(store_path), mode="r"), mode="r")
    else:
        root = zarr.open_group(str(store_path), mode=mode)
    # Return a thin wrapper — just expose the root for QuantNadoDataset
    obj = object.__new__(cls)
    obj.store_path = store_path
    obj.root = root
    attrs = dict(root.attrs)
    obj.assay = attrs.get("assay", "")
    obj.sample = attrs.get("sample", "")
    # The constructor writes a missing IP as "", so map "" (or an absent key)
    # back to None. This keeps ``ip`` and the derived ``_array_key`` identical
    # to what the constructor produced.
    obj.ip = attrs.get("ip") or None
    obj.chromsizes = {str(k): int(v) for k, v in attrs.get("chromsizes", {}).items()}
    obj.chromosomes = sorted(obj.chromsizes.keys())
    obj.chunk_len = int(attrs.get("chunk_len", 65536))
    raw_stranded = attrs.get("stranded", "")
    obj.stranded = raw_stranded if raw_stranded in ("R", "F") else None
    obj.viewpoints = attrs.get("viewpoints", [])
    obj.meta = root.get("metadata")
    obj._array_key = array_key(obj.assay, obj.ip)
    return obj

from_bam_files classmethod

from_bam_files(
    bam_path: str | Path,
    store_path: Path | str,
    assay: str,
    sample: str,
    chromsizes: str | Path | dict[str, int] | None = None,
    *,
    ip: str | None = None,
    stranded: str | None = None,
    bam_filter: ReadFilter | None = None,
    count_fragments: bool | None = None,
    viewpoint_tag: str = "VP",
    chunk_len: int | None = None,
    construction_compression: str = DEFAULT_CONSTRUCTION_COMPRESSION,
    overwrite: bool = True,
    filter_chromosomes: bool = True,
    test: bool = False,
    test_chromosomes: list[str]
    | tuple[str, ...]
    | None = None,
    staging_dir: Path | str | None = None,
    log_file: Path | None = None,
) -> "BamStore"

Create a per-sample BamStore zarr from a single BAM file.

Parameters:

Name Type Description Default
bam_path str | Path

Path to the aligned BAM file.

required
store_path Path | str

Output .zarr directory.

required
assay str

Assay type string (e.g. "ATAC", "ChIP", "CUT&TAG", "RNA", "MCC").

required
sample str

Sample name (used in metadata and as sample coordinate).

required
chromsizes str | Path | dict[str, int] | None

Path to .chrom.sizes file, dict, or None to infer from BAM header.

None
ip str | None

IP target (ChIP/CUT&TAG only). Combined with assay to form array key.

None
stranded str | None

Strand orientation: "R" (reverse), "F" (forward), or None (unstranded). Required for RNA assays.

None
Source code in quantnado/dataset/store_bam.py
@classmethod
def from_bam_files(
    cls,
    bam_path: str | Path,
    store_path: Path | str,
    assay: str,
    sample: str,
    chromsizes: str | Path | dict[str, int] | None = None,
    *,
    ip: str | None = None,
    stranded: str | None = None,
    bam_filter: bamnado.ReadFilter | None = None,
    count_fragments: bool | None = None,
    viewpoint_tag: str = "VP",
    chunk_len: int | None = None,
    construction_compression: str = DEFAULT_CONSTRUCTION_COMPRESSION,
    overwrite: bool = True,
    filter_chromosomes: bool = True,
    test: bool = False,
    test_chromosomes: list[str] | tuple[str, ...] | None = None,
    staging_dir: Path | str | None = None,
    log_file: Path | None = None,
) -> "BamStore":
    """Build a per-sample BamStore zarr from one BAM file.

    Parameters
    ----------
    bam_path:
        Path to the aligned BAM file.
    store_path:
        Output .zarr directory.
    assay:
        Assay type string (e.g. "ATAC", "ChIP", "CUT&TAG", "RNA", "MCC").
        Compared case-insensitively against "MCC" and "RNA".
    sample:
        Sample name (used in metadata and as sample coordinate).
    chromsizes:
        Path to .chrom.sizes file, dict, or None to infer from BAM header.
    ip:
        IP target (ChIP/CUT&TAG only). Combined with assay to form array key.
    stranded:
        Strand orientation: "R" (reverse), "F" (forward), or None
        (unstranded). Expected for RNA assays (not validated here).
    bam_filter:
        Read filter to apply. When falsy, a ``bamnado.ReadFilter`` is built
        with ``proper_pair`` disabled for MCC and enabled otherwise.
    count_fragments:
        Explicit fragment-counting switch. When None it is enabled only
        for the RNA assay.
    viewpoint_tag:
        BAM tag used when scanning MCC viewpoints; also stored on the
        built store as ``_viewpoint_tag``.
    chunk_len:
        Requested chunk length, resolved through ``_resolve_chunk_len``.
    construction_compression:
        Compression name, resolved through ``_resolve_compressors``.
    overwrite:
        Forwarded to the constructor for the build path.
    filter_chromosomes, test, test_chromosomes:
        Forwarded to ``_parse_chromsizes``.
    staging_dir:
        When given, the store is built in a uniquely named hidden directory
        under ``staging_dir`` and then passed to ``_publish_staged``
        together with the final path.
    log_file:
        When given, logging is configured via
        ``quantnado.utils.setup_logging`` before any work starts.

    Returns
    -------
    BamStore
        The store that was built. When ``staging_dir`` is given, a store
        re-opened from the final path with ``read_only=False``.
    """
    if log_file is not None:
        from quantnado.utils import setup_logging

        setup_logging(Path(log_file), verbose=False)

    bam_path = str(bam_path)
    assay_upper = assay.upper()
    is_mcc = assay_upper == "MCC"

    if chromsizes is None:
        logger.info(f"Extracting chromsizes from {bam_path}")
        chromsizes = _get_chromsizes_from_bam(bam_path)

    chromsizes_dict = _parse_chromsizes(
        chromsizes,
        filter_chromosomes=filter_chromosomes,
        test=test,
        test_chromosomes=test_chromosomes,
    )

    # MCC reads are single-end after ligation; disable proper_pair filter
    read_filter = bam_filter
    if not read_filter:
        read_filter = bamnado.ReadFilter(proper_pair=not is_mcc)

    # An explicit count_fragments wins; otherwise it depends on the assay.
    if count_fragments is None:
        use_fragment = assay_upper == "RNA"
    else:
        use_fragment = bool(count_fragments)

    viewpoints: list[str] = []
    if is_mcc:
        logger.info(f"Scanning MCC viewpoints from {bam_path}")
        viewpoints = _get_viewpoints_from_mcc_bam(bam_path, viewpoint_tag=viewpoint_tag)
        logger.info(f"Found {len(viewpoints)} viewpoints: {viewpoints}")

    resolved_chunk_len = _resolve_chunk_len(chromsizes_dict, Path(store_path), chunk_len)
    compressors = _resolve_compressors(construction_compression)

    # Build at the final location unless a staging directory was requested.
    final_path = _normalize_path(store_path)
    use_staging = staging_dir is not None
    build_path = final_path
    if use_staging:
        staging_name = f".{final_path.stem}.staging-{uuid.uuid4().hex}.zarr"
        build_path = Path(staging_dir) / staging_name

    store = cls(
        store_path=build_path,
        assay=assay,
        sample=sample,
        chromsizes=chromsizes_dict,
        ip=ip,
        stranded=stranded,
        viewpoints=viewpoints,
        chunk_len=resolved_chunk_len,
        compressors=compressors,
        overwrite=overwrite,
    )
    store._viewpoint_tag = viewpoint_tag

    # MCC data goes through the per-viewpoint writer, everything else through
    # the plain coverage writer.
    write_coverage = store._write_viewpoint_coverage if is_mcc else store._write_coverage
    sparsity = write_coverage(bam_path, read_filter, use_fragment)

    store._finalise(bam_path, sparsity)

    if not use_staging:
        return store

    _publish_staged(build_path, final_path)
    return cls.open(final_path, read_only=False)