Skip to content

QuantNadoDataset

quantnado.analysis.core.QuantNadoDataset

QuantNadoDataset(
    path: Path | str, annotation: str | Path | None = None
)

Unified read-only xarray view over QuantNado zarr stores.

Parameters:

Name Type Description Default
path Path | str

Path to either: - a directory containing per-sample .zarr stores, or - a single combined .zarr store written by :meth:combine. .tar.gz/.tgz archives containing either layout are extracted to a temporary directory and opened read-only.

required

Examples:

>>> qn = QuantNadoDataset("dataset/")
>>> region = qn.sel(chrom="chr1", start=1_000_000, end=1_001_000)
>>> region["atac"].sel(sample="ATAC-SEM-1")
>>> tree = qn.to_datatree()
Source code in quantnado/analysis/core.py
def __init__(self, path: Path | str, annotation: str | Path | None = None) -> None:
    input_path = Path(path)
    self.source_path = input_path
    self.path = input_path
    self._archive_tmpdir: tempfile.TemporaryDirectory[str] | None = None
    self._stores: list[_PerSampleStore] = []
    self._combined_root: zarr.Group | None = None
    self._combined = False
    self._genes_df: pd.DataFrame | None = None
    self._exons_df: pd.DataFrame | None = None
    self._subset_samples: list[str] | None = None
    self._group_sets: dict[str, dict[str, list[str]]] = {}
    self._last_group_name: str | None = None

    if input_path.exists() and _is_tar_archive(input_path):
        self._archive_tmpdir = tempfile.TemporaryDirectory(prefix="quantnado-")
        self.path = _extract_tar_archive(input_path, Path(self._archive_tmpdir.name))

    if self.path.is_dir() and not str(self.path).endswith(".zarr"):
        # Directory of per-sample zarrs
        self._load_directory(self.path)
    elif self.path.exists():
        root = zarr.open_group(str(self.path), mode="r")
        if _is_combined_zarr(root):
            self._combined_root = root
            self._combined = True
        elif _is_per_sample_zarr(root):
            # Single per-sample zarr opened directly
            self._stores.append(_PerSampleStore(self.path, root))
        else:
            raise ValueError(f"Cannot determine store layout for {self.path}")
    else:
        raise FileNotFoundError(f"Path does not exist: {self.path}")

    if annotation is not None:
        self.set_annotation(annotation)

assays property

assays: list[str]

Distinct biological assay types present (e.g. 'ATAC', 'RNA', 'METH').

array_keys property

array_keys: list[str]

All zarr data-variable names (e.g. 'atac', 'rna_fwd', 'coverage', 'AF').

groups property

groups: GroupInfo

Assay-based sample groups for the current dataset view.

group_sets property

group_sets: dict[str, GroupInfo]

Cached named group sets for the current dataset view.

metadata property

metadata: DataFrame

Return per-sample metadata with core fields plus cached group_by labels.

available_peak_methods property

available_peak_methods: list[str]

Peak-calling methods supported by :meth:call_peaks.

coverage property

coverage: '_PlotnadoCoverageAdapter'

Sub-store adapter for plotnado stranded coverage tracks (RNA).

methylation property

methylation: '_PlotnadoMethylAdapter'

Sub-store adapter for plotnado methylation tracks.

variants property

variants: '_PlotnadoVariantsAdapter'

Sub-store adapter for plotnado variant tracks.

info property

info: DatasetInfo

Concise dataset summary with notebook-friendly representation.

group_by

group_by(
    by: str = "assay",
    *,
    groups: "dict[str, list[str] | str] | None" = None,
    match: str = "exact",
    drop_empty: bool = True,
    **named_groups: "dict[str, list[str] | str]",
) -> "GroupInfo | NamedGroupInfo"

Build sample groups from metadata or an explicit mapping.

Parameters:

Name Type Description Default
by str

Metadata field to group on. Currently supports "assay", "ip", and "stranded".

'assay'
groups 'dict[str, list[str] | str] | None'

Optional explicit label -> sample list mapping. When provided, this takes precedence over by and is validated against the current dataset view. With match="exact" (default), values are treated as explicit sample names. With match="contains", string or string-list values are treated as case-insensitive substrings to search for in sample names.

None
match str

How to interpret groups values. One of "exact" or "contains".

'exact'
drop_empty bool

If True (default), drop groups whose label is empty.

True
Source code in quantnado/analysis/core.py
def group_by(
    self,
    by: str = "assay",
    *,
    groups: "dict[str, list[str] | str] | None" = None,
    match: str = "exact",
    drop_empty: bool = True,
    **named_groups: "dict[str, list[str] | str]",
) -> "GroupInfo | NamedGroupInfo":
    """Build sample groups from metadata or an explicit mapping.

    Parameters
    ----------
    by:
        Metadata field to group on. Currently supports ``"assay"``,
        ``"ip"``, and ``"stranded"``.
    groups:
        Optional explicit label -> sample list mapping. When provided, this
        takes precedence over ``by`` and is validated against the current
        dataset view. With ``match="exact"`` (default), values are treated
        as explicit sample names. With ``match="contains"``, string or
        string-list values are treated as case-insensitive substrings to
        search for in sample names.
    match:
        How to interpret ``groups`` values. One of ``"exact"`` or
        ``"contains"``.
    drop_empty:
        If True (default), drop groups whose label is empty.
    """
    if named_groups:
        result: dict[str, GroupInfo] = {}
        for name, spec in named_groups.items():
            self._last_group_name = str(name)
            if isinstance(spec, str) and spec.lower() in {"assay", "ip", "stranded"}:
                result[str(name)] = self.group_by(
                    by=spec,
                    drop_empty=drop_empty,
                )
            else:
                result[str(name)] = self.group_by(
                    groups=spec,
                    match=match,
                    drop_empty=drop_empty,
                    by="assay",
                )
            self._last_group_name = str(name)
        return NamedGroupInfo(result)

    if groups is not None:
        if isinstance(groups, str):
            if groups.lower() in {"assay", "ip", "stranded"}:
                return self.group_by(by=groups, drop_empty=drop_empty)
            raise ValueError(
                "group_by(groups=...) expects a mapping of label -> samples/patterns, "
                "or use a metadata shorthand like ip='ip'."
            )
        valid = set(self.sample_names)
        grouped: dict[str, list[str]] = {}
        if match not in {"exact", "contains"}:
            raise ValueError("group_by(match=...) currently supports 'exact' or 'contains'")

        for label, samples in groups.items():
            if match == "exact":
                if isinstance(samples, str):
                    requested = [samples]
                else:
                    requested = [str(s) for s in samples]
                grouped[str(label)] = [str(s) for s in requested if str(s) in valid]
            else:
                patterns = [samples] if isinstance(samples, str) else [str(s) for s in samples]
                patterns_lower = [p.lower() for p in patterns]
                grouped[str(label)] = [
                    sample
                    for sample in self.sample_names
                    if any(pattern in sample.lower() for pattern in patterns_lower)
                ]
        result = GroupInfo({k: v for k, v in grouped.items() if v})
        cache_name = self._last_group_name or "group"
        self._group_sets[cache_name] = {k: list(v) for k, v in result.items()}
        return result

    key = by.lower()
    if key == "assay":
        values = self._get_assay_per_sample()
        labels = [str(v).upper() if v else "" for v in values]
    elif key == "ip":
        values = self._get_ip_per_sample()
        labels = [str(v) for v in values]
    elif key == "stranded":
        values = self._get_stranded_per_sample()
        labels = [str(v) for v in values]
    else:
        raise ValueError("group_by(by=...) currently supports 'assay', 'ip', or 'stranded'")

    grouped: dict[str, list[str]] = {}
    for sample, label in zip(self.sample_names, labels):
        if drop_empty and not label:
            continue
        grouped.setdefault(label or "UNKNOWN", []).append(sample)
    result = GroupInfo(grouped)
    self._group_sets[key] = {k: list(v) for k, v in result.items()}
    self._last_group_name = key
    return result

subset

subset(
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    ip: "str | Sequence[str] | None" = None,
    group: "str | Sequence[str] | dict[str, str | Sequence[str]] | None" = None,
) -> "QuantNadoDataset"

Return a new QuantNadoDataset restricted to the specified filters.

No data is copied — the returned object shares the same zarr handles. Use this to avoid repeating assay= or samples= on every call::

rna = qn.subset(assay="RNA")
reduced = rna.reduce(intervals_path="promoters.bed")
normalised = rna.normalise(reduced, method="cpm")

Parameters:

Name Type Description Default
assay 'str | Sequence[str] | None'

One or more assay types (e.g. "RNA", ["ATAC", "ChIP"]).

None
samples 'str | Sequence[str] | None'

Explicit sample name(s).

None
ip 'str | Sequence[str] | None'

One or more IP / target labels (e.g. "MLL", ["H3K27ac", "TET2"]).

None
group 'str | Sequence[str] | dict[str, str | Sequence[str]] | None'

Labels from cached group sets. Pass a string / list to use the most recent :meth:group_by namespace, or a mapping like {"treatment": "treated", "replicate": "rep1"}.

None

Returns:

Type Description
QuantNadoDataset

A lightweight view over the same stores, filtered to the resolved samples.

Source code in quantnado/analysis/core.py
def subset(
    self,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    ip: "str | Sequence[str] | None" = None,
    group: "str | Sequence[str] | dict[str, str | Sequence[str]] | None" = None,
) -> "QuantNadoDataset":
    """Return a new QuantNadoDataset restricted to the specified filters.

    No data is copied — the returned object shares the same zarr handles.
    Use this to avoid repeating ``assay=`` or ``samples=`` on every call::

        rna = qn.subset(assay="RNA")
        reduced = rna.reduce(intervals_path="promoters.bed")
        normalised = rna.normalise(reduced, method="cpm")

    Parameters
    ----------
    assay:
        One or more assay types (e.g. ``"RNA"``, ``["ATAC", "ChIP"]``).
    samples:
        Explicit sample name(s).
    ip:
        One or more IP / target labels (e.g. ``"MLL"``, ``["H3K27ac", "TET2"]``).
    group:
        Labels from cached group sets. Pass a string / list to use the most
        recent :meth:`group_by` namespace, or a mapping like
        ``{"treatment": "treated", "replicate": "rep1"}``.

    Returns
    -------
    QuantNadoDataset
        A lightweight view over the same stores, filtered to the resolved samples.
    """
    resolved = QuantNadoDataset._resolve_samples(
        self,
        assay=assay,
        samples=samples,
        ip=ip,
        group=group,
    )

    new: QuantNadoDataset = object.__new__(QuantNadoDataset)
    new.path = self.path
    new.source_path = getattr(self, "source_path", self.path)
    new._archive_tmpdir = getattr(self, "_archive_tmpdir", None)
    new._combined = self._combined
    new._combined_root = self._combined_root
    new._genes_df = self._genes_df
    new._exons_df = self._exons_df
    new._group_sets = {k: dict(v) for k, v in self._group_sets.items()}
    new._last_group_name = self._last_group_name

    if self._combined:
        new._stores = []
        new._subset_samples = resolved
    else:
        resolved_set = set(resolved)
        new._stores = [
            s
            for s in self._stores
            if s.sample in resolved_set
            or any(f"{s.sample}_{vp}" in resolved_set for vp in s.viewpoints)
        ]
        new._subset_samples = None

    return new

set_annotation

set_annotation(gtf_path: str | Path) -> None

Attach a GTF annotation file for gene-name-based queries.

Parameters:

Name Type Description Default
gtf_path str | Path

Path to a GTF or GTF.gz file (e.g. hg38.gtf.gz).

required
Source code in quantnado/analysis/core.py
def set_annotation(self, gtf_path: str | Path) -> None:
    """Attach a GTF annotation file for gene-name-based queries.

    Parameters
    ----------
    gtf_path:
        Path to a GTF or GTF.gz file (e.g. hg38.gtf.gz).
    """
    from .features import load_gtf

    gtf = load_gtf(
        str(gtf_path),
        feature_types=["gene", "exon"],
        usecols=[
            "gene_id",
            "gene_name",
            "transcript_id",
            "gene_type",
            "gene_biotype",
            "exon_number",
        ],
    )
    df = pd.DataFrame(gtf)
    self._genes_df = df[df["feature"] == "gene"].reset_index(drop=True)
    self._exons_df = df[df["feature"] == "exon"].reset_index(drop=True)
    if self._genes_df.empty:
        logger.warning("No 'gene' features found in GTF; falling back to 'transcript'")
        transcript_gtf = load_gtf(
            str(gtf_path),
            feature_types=["transcript"],
            usecols=["gene_id", "gene_name", "transcript_id", "gene_type", "gene_biotype"],
        )
        self._genes_df = pd.DataFrame(transcript_gtf).reset_index(drop=True)
    logger.info(f"Loaded annotation: {len(self._genes_df):,} genes from {gtf_path}")

gene_info

gene_info(name: str) -> dict

Look up a gene by name and return its coordinates and exon structure.

Parameters:

Name Type Description Default
name str

Gene name (e.g. "GNAQ"). Case-sensitive; falls back to case-insensitive.

required

Returns:

Type Description
dict with keys: gene_name, gene_id, chrom, start, end, strand, locus, exons.
Coordinates are 1-based inclusive.
Source code in quantnado/analysis/core.py
def gene_info(self, name: str) -> dict:
    """Look up a gene by name and return its coordinates and exon structure.

    Parameters
    ----------
    name:
        Gene name (e.g. "GNAQ"). Case-sensitive; falls back to case-insensitive.

    Returns
    -------
    dict with keys: gene_name, gene_id, chrom, start, end, strand, locus, exons.
    Coordinates are 1-based inclusive.
    """
    if self._genes_df is None:
        raise RuntimeError(
            "No annotation loaded. Call set_annotation() or pass annotation= to __init__."
        )

    if "gene_name" not in self._genes_df.columns:
        raise KeyError(f"Annotation has no 'gene_name' column — cannot look up '{name}'.")

    # Case-sensitive lookup first, then case-insensitive fallback
    hits = self._genes_df[self._genes_df["gene_name"] == name]
    if hits.empty:
        hits = self._genes_df[self._genes_df["gene_name"].str.upper() == name.upper()]
    if hits.empty:
        raise KeyError(f"Gene '{name}' not found in annotation.")

    # Multiple hits (e.g. PAR regions) → take the largest span
    if len(hits) > 1:
        hits = hits.loc[[(hits["End"] - hits["Start"]).idxmax()]]
    row = hits.iloc[0]

    # PyRanges: 0-based half-open [Start, End) → 1-based inclusive [start, end]
    chrom = str(row["Chromosome"])
    start_1 = int(row["Start"]) + 1
    end_1 = int(row["End"])
    strand = str(row.get("Strand", "+"))
    gene_id = row.get("gene_id") or None
    gene_name_out = row.get("gene_name") or name

    # Exons for this gene
    exons_out: list[dict] = []
    if self._exons_df is not None and not self._exons_df.empty:
        if gene_id and "gene_id" in self._exons_df.columns:
            ex = self._exons_df[self._exons_df["gene_id"] == gene_id]
        elif "gene_name" in self._exons_df.columns:
            ex = self._exons_df[self._exons_df["gene_name"] == gene_name_out]
        else:
            ex = self._exons_df.iloc[0:0]  # empty
        for _, erow in ex.iterrows():
            exons_out.append(
                {
                    "start": int(erow["Start"]) + 1,
                    "end": int(erow["End"]),
                    "exon_number": str(erow["exon_number"])
                    if "exon_number" in erow and pd.notna(erow.get("exon_number"))
                    else None,
                }
            )
        exons_out.sort(key=lambda e: e["start"])

    return {
        "gene_name": gene_name_out,
        "gene_id": gene_id,
        "chrom": chrom,
        "start": start_1,
        "end": end_1,
        "strand": strand,
        "locus": f"{chrom}:{start_1}-{end_1}",
        "exons": exons_out,
    }

sel_gene

sel_gene(
    name: str,
    padding: int = 0,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
) -> xr.Dataset

Select a genomic region by gene name.

Parameters:

Name Type Description Default
name str

Gene name (e.g. "GNAQ").

required
padding int

Extra bases to add on each side of the gene body (default: 0).

0
assay 'str | Sequence[str] | None'

Optional assay filter passed to :meth:sel.

None

Returns:

Type Description
xr.Dataset with gene metadata in ``.attrs``:
``gene_name``, ``gene_id``, ``gene_strand``, ``locus``, ``exons``.
Source code in quantnado/analysis/core.py
def sel_gene(
    self,
    name: str,
    padding: int = 0,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
) -> xr.Dataset:
    """Select a genomic region by gene name.

    Parameters
    ----------
    name:
        Gene name (e.g. "GNAQ").
    padding:
        Extra bases to add on each side of the gene body (default: 0).
    assay:
        Optional assay filter passed to :meth:`sel`.

    Returns
    -------
    xr.Dataset with gene metadata in ``.attrs``:
    ``gene_name``, ``gene_id``, ``gene_strand``, ``locus``, ``exons``.
    """
    info = self.gene_info(name)
    chrom = info["chrom"]
    chrom_len = self.chromsizes.get(chrom, 0)

    start = max(1, info["start"] - padding)
    end = info["end"] + padding
    if chrom_len:
        end = min(end, chrom_len)

    ds = self.sel(chrom, start, end, assay=assay, samples=samples)
    ds.attrs.update(
        {
            "gene_name": info["gene_name"],
            "gene_id": info["gene_id"],
            "gene_strand": info["strand"],
            "locus": f"{chrom}:{start}-{end}",
            "exons": info["exons"],
        }
    )
    return ds

sel

sel(
    chrom: str,
    start: int | None = None,
    end: int | None = None,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
) -> xr.Dataset

Extract a genomic region as an xr.Dataset.

Parameters:

Name Type Description Default
chrom str

Chromosome name (e.g. "chr1").

required
start int | None

1-based start position (inclusive). Defaults to 1.

None
end int | None

1-based end position (inclusive). Defaults to chromosome length.

None
assay 'str | Sequence[str] | None'

If provided, restrict to samples whose assay type matches (e.g. "atac", "rna", "meth"). The returned Dataset will only contain the array keys that belong to that assay, and only the samples that have that assay type attached as the assay coordinate.

None

Returns:

Type Description
Dataset

dims: sample × position coords: position (1-based), sample, assay (non-index on sample) data_vars: one per assay key (atac, chip_h3k27ac, rna_fwd, …)

Source code in quantnado/analysis/core.py
def sel(
    self,
    chrom: str,
    start: int | None = None,
    end: int | None = None,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
) -> xr.Dataset:
    """Extract a genomic region as an xr.Dataset.

    Parameters
    ----------
    chrom:
        Chromosome name (e.g. ``"chr1"``).
    start:
        1-based start position (inclusive). Defaults to 1.
    end:
        1-based end position (inclusive). Defaults to chromosome length.
    assay:
        If provided, restrict to samples whose assay type matches
        (e.g. ``"atac"``, ``"rna"``, ``"meth"``). The returned Dataset
        will only contain the array keys that belong to that assay, and
        only the samples that have that assay type attached as the
        ``assay`` coordinate.

    Returns
    -------
    xr.Dataset
        dims: sample × position
        coords: position (1-based), sample, assay (non-index on sample)
        data_vars: one per assay key (atac, chip_h3k27ac, rna_fwd, …)
    """
    chrom_len = self.chromsizes.get(chrom)
    if chrom_len is None:
        raise ValueError(f"Chromosome '{chrom}' not in store. Available: {self.chromosomes}")

    start_1 = start if start is not None else 1
    end_1 = end if end is not None else chrom_len

    if start_1 < 1:
        raise ValueError(f"start must be >= 1 (1-based), got {start_1}")
    if end_1 > chrom_len:
        raise ValueError(f"end {end_1} exceeds chromosome length {chrom_len}")
    if end_1 < start_1:
        raise ValueError(f"end {end_1} < start {start_1}")

    # 0-based slice for array indexing
    s0 = start_1 - 1
    e0 = end_1

    position_coords = np.arange(start_1, end_1 + 1, dtype=np.int64)

    if self._combined:
        ds = self._sel_combined(chrom, s0, e0, position_coords)
    else:
        ds = self._sel_per_sample(chrom, s0, e0, position_coords)

    if samples is not None:
        ds = ds.sel(sample=self._resolve_samples(samples=samples))
    elif assay is not None:
        assay_upper = {a.upper() for a in ([assay] if isinstance(assay, str) else assay)}
        if "assay" not in ds.coords:
            raise ValueError("Dataset has no 'assay' coordinate — cannot filter by assay.")
        assay_mask = np.array([a in assay_upper for a in ds.coords["assay"].values], dtype=bool)
        if not assay_mask.any():
            available = sorted(set(ds.coords["assay"].values))
            raise ValueError(f"Assay '{assay}' not found. Available: {available}")
        ds = ds.isel(sample=assay_mask)

    stranded_by_sample = dict(zip(self.sample_names, self._get_stranded_per_sample()))
    return _orient_rna_strands(ds, stranded_by_sample)

to_datatree

to_datatree(
    chromosomes: list[str] | None = None, lazy: bool = True
) -> xr.DataTree

Return the full dataset as an xr.DataTree.

Each chromosome is a child node containing an xr.Dataset with 1-based position coordinates and assay data variables.

Parameters:

Name Type Description Default
chromosomes list[str] | None

Subset of chromosomes. Defaults to all.

None
lazy bool

If True (default), use lazy dask arrays without materializing position coordinate arrays. This is much faster for large datasets. If False, materializes the full position coordinate for each chromosome.

True
Source code in quantnado/analysis/core.py
def to_datatree(self, chromosomes: list[str] | None = None, lazy: bool = True) -> xr.DataTree:
    """Return the full dataset as an xr.DataTree.

    Each chromosome is a child node containing an xr.Dataset with
    1-based position coordinates and assay data variables.

    Parameters
    ----------
    chromosomes:
        Subset of chromosomes. Defaults to all.
    lazy:
        If True (default), use lazy dask arrays without materializing
        position coordinate arrays. This is much faster for large datasets.
        If False, materializes the full position coordinate for each chromosome.
    """
    chroms = chromosomes if chromosomes is not None else self.chromosomes
    nodes: dict[str, xr.Dataset] = {"/": xr.Dataset(attrs={"chromsizes": self.chromsizes})}
    for chrom in chroms:
        chrom_len = self.chromsizes[chrom]
        ds = self.sel(chrom)

        if not lazy:
            # Materialize position coords (slow for large chromosomes)
            position_coords = np.arange(1, chrom_len + 1, dtype=np.int64)
            ds = ds.assign_coords(position=position_coords)

        nodes[chrom] = ds
    return xr.DataTree.from_dict(nodes)

extract

extract(
    feature_type: str = "promoter",
    GTF_FILE: str | None = None,
    anchor_feature: str = "gene",
    fixed_width: int | None = None,
    upstream: int | None = None,
    downstream: int | None = None,
    anchor: str = "start",
    flip_strand: bool = True,
    bin_size: int = 50,
    assay: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
) -> xr.DataArray

Extract signal into fixed-width bins around genomic features.

Parameters:

Name Type Description Default
feature_type str

Feature type: "promoter", "gene", "transcript", or "exon".

'promoter'
GTF_FILE str

Path to GTF file.

None
anchor_feature str

Which feature type to anchor promoters on when feature_type="promoter". Usually "gene" or "transcript". This is passed through to :func:quantnado.analysis.features.extract_promoters.

'gene'
fixed_width int

If provided, expands features to this width around anchor point. If None, uses feature length.

None
upstream int

Window around the anchor in base pairs. When provided, positions are extracted from anchor - upstream to anchor + downstream and plotted on a signed coordinate axis (for example -2000 .. 0 .. 2000). Cannot be used together with fixed_width.

None
downstream int

Window around the anchor in base pairs. When provided, positions are extracted from anchor - upstream to anchor + downstream and plotted on a signed coordinate axis (for example -2000 .. 0 .. 2000). Cannot be used together with fixed_width.

None
anchor str

Anchor point: "start", "end", or "midpoint".

'start'
flip_strand bool

If True (default), reverse minus-strand intervals after extraction so the returned windows are oriented 5'→3' relative to the anchor. This is especially useful for gene/transcript-body style plots where the gene body should lie to the right of the TSS.

True
bin_size int

Width of each bin in bp (default: 50).

50
assay str

Assay type to restrict samples to (e.g. "RNA", "ATAC", "METH"). Also accepted as the array key for backward compatibility.

None
modality str

Array key to extract (e.g. "rna_fwd", "coverage", "methyl_pct"). Required when assay is a type name rather than an array key.

None
samples list of str

Sample names to extract. If None, uses all samples.

None

Returns:

Type Description
DataArray

Shape (interval, bin, sample) with binned signal.

Source code in quantnado/analysis/core.py
def extract(
    self,
    feature_type: str = "promoter",
    GTF_FILE: str | None = None,
    anchor_feature: str = "gene",
    fixed_width: int | None = None,
    upstream: int | None = None,
    downstream: int | None = None,
    anchor: str = "start",
    flip_strand: bool = True,
    bin_size: int = 50,
    assay: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
) -> xr.DataArray:
    """Extract signal into fixed-width bins around genomic features.

    Parameters
    ----------
    feature_type : str
        Feature type: "promoter", "gene", "transcript", or "exon".
    GTF_FILE : str
        Path to GTF file.
    anchor_feature : str
        Which feature type to anchor promoters on when ``feature_type="promoter"``.
        Usually ``"gene"`` or ``"transcript"``. This is passed through to
        :func:`quantnado.analysis.features.extract_promoters`.
    fixed_width : int, optional
        If provided, expands features to this width around anchor point.
        If None, uses feature length.
    upstream, downstream : int, optional
        Window around the anchor in base pairs. When provided, positions are
        extracted from ``anchor - upstream`` to ``anchor + downstream`` and
        plotted on a signed coordinate axis (for example ``-2000 .. 0 .. 2000``).
        Cannot be used together with ``fixed_width``.
    anchor : str
        Anchor point: "start", "end", or "midpoint".
    flip_strand : bool
        If True (default), reverse minus-strand intervals after extraction
        so the returned windows are oriented 5'→3' relative to the anchor.
        This is especially useful for gene/transcript-body style plots where
        the gene body should lie to the right of the TSS.
    bin_size : int
        Width of each bin in bp (default: 50).
    assay : str, optional
        Assay type to restrict samples to (e.g. "RNA", "ATAC", "METH").
        Also accepted as the array key for backward compatibility.
    modality : str, optional
        Array key to extract (e.g. "rna_fwd", "coverage", "methyl_pct").
        Required when assay is a type name rather than an array key.
    samples : list of str, optional
        Sample names to extract. If None, uses all samples.

    Returns
    -------
    xr.DataArray
        Shape (interval, bin, sample) with binned signal.
    """
    import pandas as pd

    from .features import extract_feature_ranges, extract_promoters, load_gtf
    from .ranges import extract_signal_into_bins

    if GTF_FILE is None:
        raise ValueError("GTF_FILE is required")
    if fixed_width is not None and (upstream is not None or downstream is not None):
        raise ValueError("Cannot specify both fixed_width and upstream/downstream")

    # Resolve modality (array key) and optional assay-type sample filter.
    # Backward compat: if only assay is given and it looks like an array key,
    # treat it as modality with no sample filtering.
    array_keys = self.array_keys
    if modality is not None:
        array_key = self._resolve_modalities(modality)
    elif isinstance(assay, str) and assay.lower() in [k.lower() for k in array_keys]:
        array_key = assay
        assay = None  # not a type filter
    elif assay is not None:
        raise ValueError(
            f"modality is required when assay='{assay}' is an assay type. "
            f"Available array keys: {array_keys}"
        )
    else:
        raise ValueError("Either assay (array key) or modality must be provided.")

    if samples is None:
        if assay is not None:
            samples = self._resolve_samples(assay=assay)
        else:
            samples = self.sample_names
    else:
        samples = self._resolve_samples(samples=samples)

    # Load and extract features
    gtf = load_gtf(GTF_FILE)

    _promoter_upstream = upstream if upstream is not None else 1000
    _promoter_downstream = downstream if downstream is not None else 200

    if feature_type == "promoter":
        features_pr = extract_promoters(
            gtf,
            upstream=_promoter_upstream,
            downstream=_promoter_downstream,
            anchor_feature=anchor_feature,
        )
    else:
        features_pr = extract_feature_ranges(gtf, feature_type=feature_type)

    features_df = pd.DataFrame(features_pr)

    # Rename PyRanges columns to standard names
    if "Chromosome" in features_df.columns:
        features_df = features_df.rename(columns={"Chromosome": "chrom"})
    if "Start" in features_df.columns:
        features_df = features_df.rename(columns={"Start": "start"})
    if "End" in features_df.columns:
        features_df = features_df.rename(columns={"End": "end"})

    strand_col = next((c for c in ("Strand", "strand") if c in features_df.columns), None)
    strands = features_df[strand_col].fillna("+").astype(str).values if strand_col else None

    if feature_type == "promoter":
        # extract_promoters already sets start = TSS - upstream, end = TSS + downstream
        # for all strands, so no anchor re-computation or re-windowing is needed.
        window_upstream = _promoter_upstream
        window_downstream = _promoter_downstream
    else:
        # Apply fixed-width or upstream/downstream windowing around the chosen anchor
        if anchor == "start":
            anchor_pos = features_df["start"].values.copy()
            if strands is not None:
                minus_mask = strands == "-"
                anchor_pos[minus_mask] = features_df.loc[minus_mask, "end"].values
        elif anchor == "end":
            anchor_pos = features_df["end"].values.copy()
            if strands is not None:
                plus_mask = strands == "+"
                minus_mask = strands == "-"
                anchor_pos[plus_mask] = features_df.loc[plus_mask, "end"].values
                anchor_pos[minus_mask] = features_df.loc[minus_mask, "start"].values
        elif anchor == "midpoint":
            anchor_pos = (
                (features_df["start"].values + features_df["end"].values) // 2
            ).astype(int)
        else:
            raise ValueError(f"Unknown anchor: {anchor}")

        if upstream is not None or downstream is not None:
            left = upstream if upstream is not None else 0
            right = downstream if downstream is not None else 0
            features_df["start"] = anchor_pos - left
            features_df["end"] = anchor_pos + right
            window_upstream = left
            window_downstream = right
        elif fixed_width is not None:
            half_width = fixed_width // 2
            features_df["start"] = anchor_pos - half_width
            features_df["end"] = anchor_pos + (fixed_width - half_width)
            window_upstream = half_width
            window_downstream = fixed_width - half_width
        else:
            window_upstream = None
            window_downstream = None

    # Convert to 1-based intervals (vectorised — avoids slow iterrows)
    intervals = list(
        zip(
            features_df["chrom"].tolist(),
            (features_df["start"].values + 1).tolist(),
            features_df["end"].values.tolist(),
        )
    )

    # Extract signal into bins
    signal_array = extract_signal_into_bins(intervals, self, array_key, bin_size, samples)

    # Create DataArray
    n_intervals, n_bins, n_samples = signal_array.shape
    interval_ids = np.arange(n_intervals)
    if window_upstream is not None:
        bin_ids = np.arange(n_bins, dtype=np.int64) * bin_size - int(window_upstream)
    else:
        bin_ids = np.arange(n_bins, dtype=np.int64)

    strand_values = (
        strands if strands is not None else np.array(["+"] * n_intervals, dtype=object)
    )

    if flip_strand and strands is not None:
        minus_mask = strand_values == "-"
        if np.any(minus_mask):
            signal_array = signal_array.copy()
            signal_array[minus_mask] = signal_array[minus_mask, ::-1, :]

    da = xr.DataArray(
        signal_array,
        dims=("interval", "bin", "sample"),
        coords={
            "interval": interval_ids,
            "bin": bin_ids,
            "sample": samples,
            "strand": ("interval", strand_values),
        },
        attrs={
            "upstream": window_upstream,
            "downstream": window_downstream,
            "anchor": anchor,
            "bin_size": bin_size,
            "strand_flipped": bool(flip_strand and strands is not None),
        },
    )

    return da

call_peaks

call_peaks(
    output_dir: "str | Path",
    method: "str | Sequence[str] | None" = None,
    assay: "str | Sequence[str] | None" = None,
    **kwargs,
) -> "dict[str, Path] | dict[str, dict[str, Path]]"

Call peaks for all completed samples in the dataset.

Parameters:

Name Type Description Default
output_dir str or Path

Directory where per-sample BED files are written.

required
method ('quantile', 'seacr', 'lanceotron')

Peak-calling algorithm. Pass one method or a list of methods. When omitted, one method is auto-selected from the biological assay type:

  • ATAC or unknown → "quantile"
  • CUT&TAG / CUT&RUN → "seacr"
  • ChIP → "lanceotron"
"quantile"
assay str

Zarr array key to call peaks on (e.g. "coverage"). Defaults to "coverage" when present in the store, otherwise the first array key.

None
**kwargs

Passed through to the underlying caller. Common options:

  • blacklist_file – BED path of excluded regions (all methods)
  • quantile – threshold quantile (quantile method)
  • fdr_threshold – FDR cutoff (seacr method)
  • score_threshold – minimum classification score (lanceotron)
  • n_workers – parallel workers (seacr / lanceotron)
{}

Returns:

Type Description
dict[str, Path] or dict[str, dict[str, Path]]

For one method: sample name → path to the output BED file. For multiple methods: method name → (sample name → path).

Examples:

>>> qn.available_peak_methods
['quantile', 'seacr', 'lanceotron']
>>> beds = qn.subset(assay="ATAC").call_peaks("peaks/atac/")
>>> beds = qn.subset(assay="CUT&TAG").call_peaks("peaks/cat/", method="seacr")
>>> beds_by_method = qn.call_peaks("peaks/", method=["quantile", "lanceotron"], assay=["ATAC", "CHIP"])
Source code in quantnado/analysis/core.py
def call_peaks(
    self,
    output_dir: "str | Path",
    method: "str | Sequence[str] | None" = None,
    assay: "str | Sequence[str] | None" = None,
    **kwargs,
) -> "dict[str, Path] | dict[str, dict[str, Path]]":
    """Call peaks for all completed samples in the dataset.

    Parameters
    ----------
    output_dir : str or Path
        Directory where per-sample BED files are written.
    method : {"quantile", "seacr", "lanceotron"} or sequence, optional
        Peak-calling algorithm. Pass one method or a list of methods.
        When omitted, one method is auto-selected from the biological assay
        type:

        * ATAC or unknown → ``"quantile"``
        * CUT&TAG / CUT&RUN → ``"seacr"``
        * ChIP → ``"lanceotron"``
    assay : str, optional
        Zarr array key to call peaks on (e.g. ``"coverage"``).
        Defaults to ``"coverage"`` when present in the store, otherwise
        the first array key.
    **kwargs
        Passed through to the underlying caller.  Common options:

        * ``blacklist_file`` – BED path of excluded regions (all methods)
        * ``quantile`` – threshold quantile (quantile method)
        * ``fdr_threshold`` – FDR cutoff (seacr method)
        * ``score_threshold`` – minimum classification score (lanceotron)
        * ``n_workers`` – parallel workers (seacr / lanceotron)

    Returns
    -------
    dict[str, Path] or dict[str, dict[str, Path]]
        For one method: sample name → path to the output BED file.
        For multiple methods: method name → (sample name → path).

    Examples
    --------
    >>> qn.available_peak_methods
    ['quantile', 'seacr', 'lanceotron']
    >>> beds = qn.subset(assay="ATAC").call_peaks("peaks/atac/")
    >>> beds = qn.subset(assay="CUT&TAG").call_peaks("peaks/cat/", method="seacr")
    >>> beds_by_method = qn.call_peaks("peaks/", method=["quantile", "lanceotron"], assay=["ATAC", "CHIP"])
    """
    from pathlib import Path as _Path

    from ..peak_calling.call_lanceotron_peaks import call_lanceotron_peaks_from_zarr
    from ..peak_calling.call_quantile_peaks import call_quantile_peaks_from_zarr
    from ..peak_calling.call_seacr_peaks import call_seacr_peaks_from_zarr

    output_dir = _Path(output_dir)
    methods = (
        [method] if isinstance(method, str) or method is None else [str(m) for m in method]
    )

    # Auto-select calling method from biological assay types
    if method is None:
        bio_assays = {a.upper() for a in self.assays}
        if bio_assays & {"CUT&TAG", "CUTTAG", "CAT", "CUT&RUN", "CUTRUN"}:
            methods = ["seacr"]
        elif bio_assays & {"CHIP"}:
            methods = ["lanceotron"]
        else:
            methods = ["quantile"]
        logger.info(
            f"Auto-selected peak-calling method '{methods[0]}' for assays {sorted(bio_assays)}"
        )

    def _resolve_peak_tasks(
        assay_value: "str | Sequence[str] | None",
    ) -> "list[tuple[str, list[str] | None, str]]":
        array_keys = self.array_keys
        if assay_value is None:
            default_key = (
                "coverage"
                if "coverage" in array_keys
                else (array_keys[0] if array_keys else None)
            )
            return [(default_key, None, default_key)] if default_key is not None else []

        requested = [assay_value] if isinstance(assay_value, str) else list(assay_value)
        resolved: list[tuple[str, list[str] | None, str]] = []
        assay_by_sample = dict(zip(self.sample_names, self._get_assay_per_sample()))
        key_to_samples: dict[str, list[str]]
        if self._combined:
            key_to_samples = {
                str(k): [str(s) for s in v]
                for k, v in dict(self._combined_root.attrs.get("key_to_samples", {})).items()
            }
        else:
            key_to_samples = {}
            for store in self._stores:
                samples_for_store = (
                    [f"{store.sample}_{vp}" for vp in store.viewpoints]
                    if store.viewpoints
                    else [store.sample]
                )
                for key in store.array_keys():
                    key_to_samples.setdefault(key, []).extend(samples_for_store)

        for raw in requested:
            value = str(raw)
            matched_key = next((k for k in array_keys if k.lower() == value.lower()), None)
            if matched_key is not None:
                samples_for_key = key_to_samples.get(matched_key)
                task = (matched_key, samples_for_key, matched_key)
                if task not in resolved:
                    resolved.append(task)
                continue

            assay_upper = value.upper()
            assay_samples = self._resolve_samples(assay=assay_upper)
            explicit_keys: list[str] = []
            for key in array_keys:
                samples_for_key = key_to_samples.get(key, [])
                if any(
                    assay_by_sample.get(sample, "").upper() == assay_upper
                    for sample in samples_for_key
                ):
                    explicit_keys.append(key)

            if explicit_keys:
                for key in explicit_keys:
                    samples_for_key = [
                        sample
                        for sample in key_to_samples.get(key, [])
                        if assay_by_sample.get(sample, "").upper() == assay_upper
                    ]
                    label = assay_upper.lower() if key == "coverage" else key
                    task = (key, samples_for_key or None, label)
                    if task not in resolved:
                        resolved.append(task)
            elif assay_samples:
                coverage_key = "coverage" if "coverage" in array_keys else None
                if coverage_key is not None:
                    label = assay_upper.lower()
                    task = (coverage_key, assay_samples, label)
                    if task not in resolved:
                        resolved.append(task)

        if not resolved:
            raise ValueError(
                f"Could not resolve assay={assay_value!r} to any peak-callable array keys. "
                f"Available assays: {self.assays}; available array keys: {self.array_keys}"
            )
        return resolved

    _dispatch = {
        "quantile": call_quantile_peaks_from_zarr,
        "seacr": call_seacr_peaks_from_zarr,
        "lanceotron": call_lanceotron_peaks_from_zarr,
    }
    unknown = [m for m in methods if m not in _dispatch]
    if unknown:
        raise ValueError(f"Unknown method(s) {unknown!r}. Choose from: {sorted(_dispatch)}")

    tasks = _resolve_peak_tasks(assay)
    multi_key = len(tasks) > 1
    multi_method = len(methods) > 1
    results: dict[str, dict[str, Path]] = {}
    for selected_method in methods:
        bed_paths: list[str] = []
        for array_key, selected_samples, label in tasks:
            base_output_dir = output_dir / selected_method if multi_method else output_dir
            key_output_dir = base_output_dir / label if multi_key else base_output_dir
            bed_paths.extend(
                _dispatch[selected_method](
                    self.path,
                    key_output_dir,
                    assay=array_key,
                    samples=selected_samples,
                    **kwargs,
                )
            )
        results[selected_method] = {_Path(p).stem: _Path(p) for p in bed_paths}

    return results[methods[0]] if len(methods) == 1 else results

peak_overlap

peak_overlap(
    peak_sets: "dict[str, str | Path]",
) -> "pd.DataFrame"

Compute overlap statistics across 2–4 peak sets.

Parameters:

Name Type Description Default
peak_sets dict[str, str or Path]

Label → BED file path.

required

Returns:

Type Description
DataFrame

Rows are exclusive Venn regions; columns are combination, n_peaks, and pct_of_first.

Examples:

>>> counts = qn.peak_overlap({
...     "ATAC":    "peaks/atac/ATAC-SEM-1.bed",
...     "CUT&TAG": "peaks/cat/CAT-HSC_H3K27ac.bed",
... })
Source code in quantnado/analysis/core.py
def peak_overlap(
    self,
    peak_sets: "dict[str, str | Path]",
) -> "pd.DataFrame":
    """Compute overlap statistics across 2–4 peak sets.

    Parameters
    ----------
    peak_sets : dict[str, str or Path]
        Label → BED file path.

    Returns
    -------
    pd.DataFrame
        Rows are exclusive Venn regions; columns are ``combination``,
        ``n_peaks``, and ``pct_of_first``.

    Examples
    --------
    >>> counts = qn.peak_overlap({
    ...     "ATAC":    "peaks/atac/ATAC-SEM-1.bed",
    ...     "CUT&TAG": "peaks/cat/CAT-HSC_H3K27ac.bed",
    ... })
    """
    from .peaks import overlap_peaks

    return overlap_peaks(peak_sets)

venn_peaks

venn_peaks(
    peak_sets: "dict[str, str | Path]",
    ax: "plt.axes.Axes | None" = None,
    title: "str | None" = None,
    colors: "list[str] | None" = None,
    alpha: float = 0.5,
    figsize: "tuple[float, float]" = (5.5, 5.5),
) -> "plt.axes.Axes"

Venn diagram for 2 or 3 peak sets.

Parameters:

Name Type Description Default
peak_sets dict[str, str or Path]

Label → BED file path. Must have exactly 2 or 3 entries.

required
ax Axes
None
title str
None
colors list of str
None
alpha float
0.5
figsize tuple
(5.5, 5.5)

Returns:

Type Description
Axes

Examples:

>>> ax = qn.venn_peaks(
...     {"ATAC": "peaks/atac/SEM-1.bed", "ChIP": "peaks/chip/SEM-1.bed"},
...     title="Peak overlap",
... )
Source code in quantnado/analysis/core.py
def venn_peaks(
    self,
    peak_sets: "dict[str, str | Path]",
    ax: "plt.axes.Axes | None" = None,
    title: "str | None" = None,
    colors: "list[str] | None" = None,
    alpha: float = 0.50,
    figsize: "tuple[float, float]" = (5.5, 5.5),
) -> "plt.axes.Axes":
    """Venn diagram for 2 or 3 peak sets.

    Parameters
    ----------
    peak_sets : dict[str, str or Path]
        Label → BED file path.  Must have exactly 2 or 3 entries.
    ax : matplotlib.axes.Axes, optional
    title : str, optional
    colors : list of str, optional
    alpha : float
    figsize : tuple

    Returns
    -------
    matplotlib.axes.Axes

    Examples
    --------
    >>> ax = qn.venn_peaks(
    ...     {"ATAC": "peaks/atac/SEM-1.bed", "ChIP": "peaks/chip/SEM-1.bed"},
    ...     title="Peak overlap",
    ... )
    """
    from .peaks import venn_plot

    return venn_plot(
        peak_sets,
        ax=ax,
        title=title,
        colors=colors,
        alpha=alpha,
        figsize=figsize,
    )

combine classmethod

combine(
    src: Path | str,
    output: Path | str,
    overwrite: bool = True,
    n_workers: int = 1,
) -> "QuantNadoDataset"

Combine a directory of per-sample zarrs into a single multi-sample zarr.

Only completed stores are included. Same-assay arrays are stacked along axis 0: (1, chrom_len) × N → (N, chrom_len).

Parameters:

Name Type Description Default
src Path | str

Directory containing per-sample .zarr stores.

required
output Path | str

Path for the combined .zarr output.

required
overwrite bool

Delete output if it already exists.

True
n_workers int

Number of thread workers for row-copy tasks. 1 preserves the previous serial behaviour.

1
Source code in quantnado/analysis/core.py
@classmethod
def combine(
    cls,
    src: Path | str,
    output: Path | str,
    overwrite: bool = True,
    n_workers: int = 1,
) -> "QuantNadoDataset":
    """Combine a directory of per-sample zarrs into a single multi-sample zarr.

    Only ``completed`` stores are included.  Same-assay arrays are stacked
    along axis 0: ``(1, chrom_len) × N → (N, chrom_len)``.

    Parameters
    ----------
    src:
        Directory containing per-sample ``.zarr`` stores.
    output:
        Path for the combined ``.zarr`` output.
    overwrite:
        Delete ``output`` if it already exists.
    n_workers:
        Number of thread workers for row-copy tasks. ``1`` preserves the
        previous serial behaviour.
    """
    from zarr.core.array_spec import ArrayConfig
    from zarr.storage import LocalStore

    n_workers = max(1, int(n_workers))
    combine_start = time.perf_counter()
    src_ds = cls(src)
    if src_ds._combined:
        raise ValueError("src is already a combined store")

    output_path = Path(output)
    if overwrite and output_path.exists():
        import shutil

        shutil.rmtree(output_path) if output_path.is_dir() else output_path.unlink()

    output_path.parent.mkdir(parents=True, exist_ok=True)
    out_root = zarr.group(store=LocalStore(str(output_path)), overwrite=True, zarr_format=3)
    write_config = ArrayConfig(order="C", write_empty_chunks=True)

    # Collect metadata across all stores
    all_samples: list[str] = src_ds.sample_names
    all_array_keys: list[str] = [
        key for key in src_ds.array_keys if key not in _COVERAGE_COLLAPSE_KEYS
    ]
    all_assay_types: list[str] = src_ds.assays  # biological types e.g. ['atac', 'meth', 'rna']

    # Group stores by assay key; also build key→sample-names mapping for sel()
    key_to_stores: dict[str, list[_PerSampleStore]] = {}
    keys_by_store = {id(store): store.array_keys() for store in src_ds._stores}
    for store in src_ds._stores:
        for key in keys_by_store[id(store)]:
            key_to_stores.setdefault(key, []).append(store)

    key_to_samples: dict[str, list[str]] = {}
    for key, stores in key_to_stores.items():
        names: list[str] = []
        for store in stores:
            if store.viewpoints and key.startswith("viewpoint_"):
                vp = key[len("viewpoint_") :]
                names.append(f"{store.sample}_{vp}")
            else:
                names.append(store.sample)
        key_to_samples[key] = names
    # Unified coverage spans all samples regardless of assay
    key_to_samples["coverage"] = all_samples
    if "coverage" not in all_array_keys:
        all_array_keys = sorted(set(all_array_keys) | {"coverage"})

    chromsizes = src_ds.chromsizes
    chunk_len = max(1, src_ds._stores[0].chunk_len if src_ds._stores else 65536)
    logger.info(
        f"Combining {len(src_ds._stores)} store(s), {len(all_samples)} sample row(s), "
        f"{len(chromsizes)} chromosome(s) -> {output_path} "
        f"(workers={n_workers}, chunk_len={chunk_len:,})"
    )

    for chrom, chrom_len in chromsizes.items():
        chrom_start = time.perf_counter()
        logger.info(f"Combining chromosome {chrom} ({chrom_len:,} bp)")
        grp = out_root.require_group(chrom)
        for key, stores in key_to_stores.items():
            if key == "coverage" or key in _COVERAGE_COLLAPSE_KEYS:
                continue  # written below as unified across all samples
            present_stores = [store for store in stores if chrom in store.chromosomes]
            if not present_stores:
                continue
            n = len(present_stores)
            first_arr = present_stores[0].get_array(chrom, key)
            first_dtype = first_arr.dtype
            fill = np.nan if np.issubdtype(first_dtype, np.floating) else 0
            out_arr = grp.create_array(
                key,
                shape=(n, chrom_len),
                chunks=(1, chunk_len),
                dtype=first_dtype,
                fill_value=fill,
                config=write_config,
                overwrite=True,
            )
            key_start = time.perf_counter()
            tasks = [
                partial(
                    _copy_array_row_chunks,
                    store.get_array(chrom, key),
                    out_arr,
                    row_idx,
                    chrom_len,
                    chunk_len,
                )
                for row_idx, store in enumerate(present_stores)
            ]
            bytes_written = _run_copy_tasks(tasks, n_workers)
            elapsed = time.perf_counter() - key_start
            logger.info(
                f"Combined {chrom}:{key} ({n} row(s)) in {elapsed:.1f}s "
                f"({_format_rate(bytes_written, elapsed)})"
            )

        # Unified coverage: one row per sample, using primary signal per assay.
        # METH → coverage, RNA → rna_fwd + rna_rev, SNP → DP,
        # ATAC/ChIP/CUT&TAG → first key, MCC → viewpoint_{vp} per viewpoint.
        cov_arr = grp.create_array(
            "coverage",
            shape=(len(all_samples), chrom_len),
            chunks=(1, chunk_len),
            dtype=np.float32,
            fill_value=0.0,
            config=write_config,
            overwrite=True,
        )
        cov_start = time.perf_counter()
        cov_tasks = []
        cov_row_idx = 0
        for store in src_ds._stores:
            keys = keys_by_store[id(store)]
            keys_set = set(keys)
            missing_chrom = chrom not in store.chromosomes
            if store.viewpoints:
                for vp in store.viewpoints:
                    vp_key = f"viewpoint_{vp}"
                    if not missing_chrom and vp_key in keys_set:
                        cov_tasks.append(
                            partial(
                                _copy_sum_array_row_chunks,
                                (store.get_array(chrom, vp_key),),
                                cov_arr,
                                cov_row_idx,
                                chrom_len,
                                chunk_len,
                            )
                        )
                    cov_row_idx += 1
            elif missing_chrom:
                cov_row_idx += 1
            elif "coverage" in keys_set:
                cov_tasks.append(
                    partial(
                        _copy_sum_array_row_chunks,
                        (store.get_array(chrom, "coverage"),),
                        cov_arr,
                        cov_row_idx,
                        chrom_len,
                        chunk_len,
                    )
                )
                cov_row_idx += 1
            elif "rna_fwd" in keys_set:
                src_arrays = [store.get_array(chrom, "rna_fwd")]
                if "rna_rev" in keys_set:
                    src_arrays.append(store.get_array(chrom, "rna_rev"))
                cov_tasks.append(
                    partial(
                        _copy_sum_array_row_chunks,
                        tuple(src_arrays),
                        cov_arr,
                        cov_row_idx,
                        chrom_len,
                        chunk_len,
                    )
                )
                cov_row_idx += 1
            elif "DP" in keys_set:
                cov_tasks.append(
                    partial(
                        _copy_sum_array_row_chunks,
                        (store.get_array(chrom, "DP"),),
                        cov_arr,
                        cov_row_idx,
                        chrom_len,
                        chunk_len,
                    )
                )
                cov_row_idx += 1
            else:
                first_key = keys[0]
                cov_tasks.append(
                    partial(
                        _copy_sum_array_row_chunks,
                        (store.get_array(chrom, first_key),),
                        cov_arr,
                        cov_row_idx,
                        chrom_len,
                        chunk_len,
                    )
                )
                cov_row_idx += 1
        bytes_written = _run_copy_tasks(cov_tasks, n_workers)
        elapsed = time.perf_counter() - cov_start
        logger.info(
            f"Combined {chrom}:coverage ({len(all_samples)} row(s)) in {elapsed:.1f}s "
            f"({_format_rate(bytes_written, elapsed)})"
        )
        logger.info(f"Finished chromosome {chrom} in {time.perf_counter() - chrom_start:.1f}s")

    # Write combined metadata
    meta_grp = out_root.require_group("metadata")
    sn_arr = meta_grp.require_array(
        "sample_names", shape=(len(all_samples),), dtype="str", overwrite=True
    )
    sn_arr[:] = all_samples
    # Expand per-store metadata to per-sample (MCC stores contribute N viewpoints each)
    completed_list: list[bool] = []
    total_reads_list: list[int] = []
    assay_list: list[str] = []
    ip_list: list[str] = []
    mean_rl_list: list[float] = []
    sparsity_list: list[float] = []
    stranded_list: list[str] = []
    for s in src_ds._stores:
        n = len(s.viewpoints) if s.viewpoints else 1
        completed_list.extend([s.completed] * n)
        total_reads_list.extend([s.total_reads] * n)
        assay_list.extend([s.assay.upper()] * n)
        ip_list.extend([str(s.ip or "")] * n)
        mean_rl_list.extend([s.mean_read_length] * n)
        sparsity_list.extend([s.sparsity] * n)
        stranded_list.extend([str(s.stranded or "")] * n)
    completed = np.array(completed_list, dtype=bool)
    meta_grp.require_array("completed", shape=(len(all_samples),), dtype=bool, overwrite=True)
    meta_grp["completed"][:] = completed
    meta_grp.require_array(
        "total_reads", shape=(len(all_samples),), dtype=np.int64, overwrite=True
    )
    meta_grp["total_reads"][:] = np.array(total_reads_list, dtype=np.int64)
    assay_arr = meta_grp.require_array(
        "assay", shape=(len(all_samples),), dtype="str", overwrite=True
    )
    assay_arr[:] = assay_list
    ip_arr = meta_grp.require_array(
        "ip", shape=(len(all_samples),), dtype="str", overwrite=True
    )
    ip_arr[:] = ip_list
    meta_grp.require_array(
        "mean_read_length", shape=(len(all_samples),), dtype=np.float32, overwrite=True
    )
    meta_grp["mean_read_length"][:] = np.array(mean_rl_list, dtype=np.float32)
    meta_grp.require_array(
        "sparsity", shape=(len(all_samples),), dtype=np.float32, overwrite=True
    )
    meta_grp["sparsity"][:] = np.array(sparsity_list, dtype=np.float32)
    stranded_arr = meta_grp.require_array(
        "stranded", shape=(len(all_samples),), dtype="str", overwrite=True
    )
    stranded_arr[:] = stranded_list

    out_root.attrs.update(
        {
            "assay_types": all_assay_types,
            "array_keys": all_array_keys,
            "sample_names": all_samples,
            "key_to_samples": key_to_samples,
            "chromsizes": chromsizes,
            "chunk_len": chunk_len,
        }
    )

    zarr.consolidate_metadata(str(output_path))
    logger.info(
        f"Combined {len(src_ds._stores)} stores -> {output_path} "
        f"in {time.perf_counter() - combine_start:.1f}s"
    )
    return cls(output_path)

reduce

reduce(
    intervals_path: "str | None" = None,
    ranges_df=None,
    gtf_path: "str | None" = None,
    feature_type: "str | None" = None,
    reduction: str = "mean",
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    **kwargs,
)

Reduce signal over genomic intervals.

Parameters:

Name Type Description Default
intervals_path 'str | None'

Path to a BED or GTF file.

None
ranges_df

Pre-parsed ranges DataFrame / PyRanges.

None
gtf_path 'str | None'

GTF file path (used with feature_type).

None
feature_type 'str | None'

Feature type (e.g. "gene", "promoter").

None
reduction str

One of "mean", "sum", "max", "min", "median".

'mean'
assay 'str | Sequence[str] | None'

Restrict to samples of this assay type.

None
samples 'str | Sequence[str] | None'

Explicit sample names (overrides assay).

None
modality 'str | Sequence[str] | None'

Zarr array key (e.g. "atac", "rna_fwd").

None

Returns:

Type Description
Dataset
Source code in quantnado/analysis/core.py
def reduce(
    self,
    intervals_path: "str | None" = None,
    ranges_df=None,
    gtf_path: "str | None" = None,
    feature_type: "str | None" = None,
    reduction: str = "mean",
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    **kwargs,
):
    """Reduce signal over genomic intervals.

    Parameters
    ----------
    intervals_path:
        Path to a BED or GTF file.
    ranges_df:
        Pre-parsed ranges DataFrame / PyRanges.
    gtf_path:
        GTF file path (used with *feature_type*).
    feature_type:
        Feature type (e.g. ``"gene"``, ``"promoter"``).
    reduction:
        One of ``"mean"``, ``"sum"``, ``"max"``, ``"min"``, ``"median"``.
    assay:
        Restrict to samples of this assay type.
    samples:
        Explicit sample names (overrides *assay*).
    modality:
        Zarr array key (e.g. ``"atac"``, ``"rna_fwd"``).

    Returns
    -------
    xr.Dataset
    """
    from .reduce import reduce_byranges_signal

    resolved = self._resolve_samples(assay=assay, samples=samples)
    indices = self._sample_indices(resolved)

    return reduce_byranges_signal(
        self,
        ranges_df=ranges_df,
        intervals_path=intervals_path,
        feature_type=feature_type,
        gtf_path=gtf_path,
        reduction=reduction,
        sample_indices=indices if len(indices) < len(self.sample_names) else None,
        array_key=self._resolve_modalities(modality) if modality is not None else None,
        **kwargs,
    )

count_features

count_features(
    gtf_file: "str | None" = None,
    bed_file: "str | None" = None,
    ranges_df=None,
    feature_type: str = "gene",
    engine: str = "signal",
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    **kwargs,
)

Count or quantify genomic features into a feature-by-sample matrix.

Parameters:

Name Type Description Default
gtf_file 'str | None'

Path to GTF file.

None
bed_file 'str | None'

Path to BED file.

None
ranges_df

Pre-parsed ranges DataFrame.

None
feature_type str

GTF feature level (default "gene"). Combine with feature_id_attr in kwargs for featureCounts-like -t/-g semantics, for example feature_type="exon", feature_id_attr="gene_name".

'gene'
engine str

Counting backend.

  • "signal" (default): quantify stored QuantNado signal over features. This is coverage/signal-derived summarisation and is useful for exploratory matrices, clustering, and general signal analysis.
  • "bam": reserved for future BAM-backed, featureCounts-style read/fragement assignment. Not implemented yet.
'signal'
assay 'str | Sequence[str] | None'

Restrict to samples of this assay type.

None
samples 'str | Sequence[str] | None'

Explicit sample names (overrides assay).

None
modality 'str | Sequence[str] | None'

Zarr array key hint (e.g. "rna_fwd").

None

Returns:

Type Description
tuple[DataFrame, DataFrame]

(counts_df, feature_metadata)

Source code in quantnado/analysis/core.py
def count_features(
    self,
    gtf_file: "str | None" = None,
    bed_file: "str | None" = None,
    ranges_df=None,
    feature_type: str = "gene",
    engine: str = "signal",
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    **kwargs,
):
    """Count or quantify genomic features into a feature-by-sample matrix.

    Parameters
    ----------
    gtf_file:
        Path to GTF file.
    bed_file:
        Path to BED file.
    ranges_df:
        Pre-parsed ranges DataFrame.
    feature_type:
        GTF feature level (default ``"gene"``).
        Combine with ``feature_id_attr`` in ``kwargs`` for featureCounts-like
        ``-t/-g`` semantics, for example ``feature_type="exon"``,
        ``feature_id_attr="gene_name"``.
    engine:
        Counting backend.

        - ``"signal"`` (default): quantify stored QuantNado signal over features.
          This is coverage/signal-derived summarisation and is useful for
          exploratory matrices, clustering, and general signal analysis.
        - ``"bam"``: reserved for future BAM-backed, featureCounts-style
          read/fragement assignment. Not implemented yet.
    assay:
        Restrict to samples of this assay type.
    samples:
        Explicit sample names (overrides *assay*).
    modality:
        Zarr array key hint (e.g. ``"rna_fwd"``).

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame]
        (counts_df, feature_metadata)
    """
    engine = engine.lower()
    if engine != "signal":
        if engine == "bam":
            raise NotImplementedError(
                "count_features(engine='bam') is not implemented yet. "
                "Use quantify_signal(...) or count_features(engine='signal') "
                "for stored-signal quantification today."
            )
        raise ValueError("engine must be either 'signal' or 'bam'")

    from .counts import count_features as _count_features

    resolved = self._resolve_samples(assay=assay, samples=samples)
    return _count_features(
        self,
        ranges_df=ranges_df,
        bed_file=bed_file,
        gtf_file=gtf_file,
        feature_type=feature_type,
        samples=resolved if len(resolved) < len(self.sample_names) else None,
        modality=self._resolve_modalities(modality) if modality is not None else None,
        **kwargs,
    )

quantify_signal

quantify_signal(
    gtf_file: "str | None" = None,
    bed_file: "str | None" = None,
    ranges_df=None,
    feature_type: str = "gene",
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    return_metadata: bool = True,
    **kwargs,
)

Quantify stored signal over genomic features.

This is the explicit signal-based alternative to BAM-backed counting. Internally it reuses the current count_features(engine="signal") implementation and is intended for exploratory analysis, clustering, PCA, and assay-agnostic feature summarisation.

Parameters:

Name Type Description Default
gtf_file 'str | None'

Same feature selection inputs accepted by :meth:count_features.

None
bed_file 'str | None'

Same feature selection inputs accepted by :meth:count_features.

None
ranges_df 'str | None'

Same feature selection inputs accepted by :meth:count_features.

None
feature_type 'str | None'

Same feature selection inputs accepted by :meth:count_features.

None
assay 'str | Sequence[str] | None'

Restrict to samples of this assay type.

None
samples 'str | Sequence[str] | None'

Explicit sample names (overrides assay).

None
modality 'str | Sequence[str] | None'

Concrete zarr array key to quantify, for example "coverage" or "rna_fwd".

None
return_metadata bool

If True (default), return (matrix, feature_metadata). If False, return only the quantified matrix.

True

Returns:

Type Description
DataFrame | tuple[DataFrame, DataFrame]

Feature-by-sample quantified matrix, optionally with aligned feature metadata.

Source code in quantnado/analysis/core.py
def quantify_signal(
    self,
    gtf_file: "str | None" = None,
    bed_file: "str | None" = None,
    ranges_df=None,
    feature_type: str = "gene",
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    return_metadata: bool = True,
    **kwargs,
):
    """Quantify stored signal over genomic features.

    This is the explicit signal-based alternative to BAM-backed counting.
    Internally it reuses the current ``count_features(engine="signal")``
    implementation and is intended for exploratory analysis, clustering,
    PCA, and assay-agnostic feature summarisation.

    Parameters
    ----------
    gtf_file, bed_file, ranges_df, feature_type:
        Same feature selection inputs accepted by :meth:`count_features`.
    assay:
        Restrict to samples of this assay type.
    samples:
        Explicit sample names (overrides *assay*).
    modality:
        Concrete zarr array key to quantify, for example ``"coverage"``
        or ``"rna_fwd"``.
    return_metadata:
        If True (default), return ``(matrix, feature_metadata)``.
        If False, return only the quantified matrix.

    Returns
    -------
    pd.DataFrame | tuple[pd.DataFrame, pd.DataFrame]
        Feature-by-sample quantified matrix, optionally with aligned
        feature metadata.
    """
    kwargs.setdefault("integerize", False)
    matrix, feature_metadata = self.count_features(
        gtf_file=gtf_file,
        bed_file=bed_file,
        ranges_df=ranges_df,
        feature_type=feature_type,
        engine="signal",
        assay=assay,
        samples=samples,
        modality=modality,
        **kwargs,
    )
    return (matrix, feature_metadata) if return_metadata else matrix

normalise

normalise(
    data=None,
    method: str = "cpm",
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    library_sizes=None,
    feature_lengths=None,
)

Normalise coverage signal or feature counts.

Parameters:

Name Type Description Default
data

xr.Dataset, xr.DataArray, or pd.DataFrame.

None
method str

"cpm", "rpkm", or "tpm".

'cpm'
assay 'str | Sequence[str] | None'

Pre-filter data to samples of this assay type.

None
samples 'str | Sequence[str] | None'

Explicit sample names (overrides assay).

None
library_sizes

Total mapped reads per sample; auto-read from store if omitted.

None
feature_lengths

Required for "rpkm" / "tpm" on DataFrames.

None
Source code in quantnado/analysis/core.py
def normalise(
    self,
    data=None,
    method: str = "cpm",
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    library_sizes=None,
    feature_lengths=None,
):
    """Normalise coverage signal or feature counts.

    Parameters
    ----------
    data:
        xr.Dataset, xr.DataArray, or pd.DataFrame.
    method:
        ``"cpm"``, ``"rpkm"``, or ``"tpm"``.
    assay:
        Pre-filter data to samples of this assay type.
    samples:
        Explicit sample names (overrides *assay*).
    library_sizes:
        Total mapped reads per sample; auto-read from store if omitted.
    feature_lengths:
        Required for ``"rpkm"`` / ``"tpm"`` on DataFrames.
    """
    from .normalise import normalise as _normalise

    if data is None:
        return NormalisedQuantNadoDataset(
            self,
            method=method,
            library_sizes=library_sizes,
            feature_lengths=feature_lengths,
        )

    if assay is not None or samples is not None:
        resolved = self._resolve_samples(assay=assay, samples=samples)
        data = self._filter_sample_data(data, resolved)

    return _normalise(
        data,
        self,
        method=method,
        library_sizes=library_sizes,
        feature_lengths=feature_lengths,
    )

library_sizes

library_sizes(
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
)

Return total mapped reads per sample as a pd.Series.

Parameters:

Name Type Description Default
assay 'str | Sequence[str] | None'

Restrict to samples of this assay type.

None
samples 'str | Sequence[str] | None'

Explicit sample names (overrides assay).

None
Source code in quantnado/analysis/core.py
def library_sizes(
    self,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
):
    """Return total mapped reads per sample as a pd.Series.

    Parameters
    ----------
    assay:
        Restrict to samples of this assay type.
    samples:
        Explicit sample names (overrides *assay*).
    """
    from .normalise import get_library_sizes

    sizes = get_library_sizes(self)
    if assay is not None or samples is not None:
        resolved = self._resolve_samples(assay=assay, samples=samples)
        sizes = sizes.reindex(resolved)
    return sizes

pca

pca(
    data_or_query=None,
    n_components: int = 5,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    chromosome: "str | None" = None,
    nan_handling_strategy: str = "drop",
    standardize: bool = False,
    random_state: "int | None" = None,
    subset_size: "int | None" = None,
    subset_strategy: str = "random",
)

Run PCA on reduced genomic signal.

Parameters:

Name Type Description Default
data_or_query

Either a 2D DataArray, a chromosome name string, or None. When a chromosome or None is provided, data are auto-extracted from :meth:sel using modality (default "coverage").

None
n_components int

Number of principal components.

5
assay 'str | Sequence[str] | None'

Pre-filter samples before PCA.

None
samples 'str | Sequence[str] | None'

Explicit sample names (overrides assay).

None

Returns:

Type Description
tuple[PCA, DataArray]
Source code in quantnado/analysis/core.py
def pca(
    self,
    data_or_query=None,
    n_components: int = 5,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    chromosome: "str | None" = None,
    nan_handling_strategy: str = "drop",
    standardize: bool = False,
    random_state: "int | None" = None,
    subset_size: "int | None" = None,
    subset_strategy: str = "random",
):
    """Run PCA on reduced genomic signal.

    Parameters
    ----------
    data_or_query:
        Either a 2D DataArray, a chromosome name string, or ``None``.
        When a chromosome or ``None`` is provided, data are auto-extracted
        from :meth:`sel` using ``modality`` (default ``"coverage"``).
    n_components:
        Number of principal components.
    assay:
        Pre-filter samples before PCA.
    samples:
        Explicit sample names (overrides *assay*).

    Returns
    -------
    tuple[PCA, xr.DataArray]
    """
    from .pca import run_pca as _run_pca

    pca_chromosome = chromosome

    if isinstance(data_or_query, xr.DataArray):
        data = data_or_query
        if assay is not None or samples is not None:
            data = self._filter_sample_data(
                data, self._resolve_samples(assay=assay, samples=samples)
            )
        if not any(coord in data.coords for coord in ("contig", "chrom")):
            pca_chromosome = None
    elif isinstance(data_or_query, xr.Dataset):
        data = data_or_query
        if assay is not None or samples is not None:
            data = self._filter_sample_data(
                data, self._resolve_samples(assay=assay, samples=samples)
            )
        resolved_modality = self._resolve_modalities(modality, default="coverage")
        if resolved_modality not in data.data_vars:
            raise ValueError(
                f"Modality '{resolved_modality}' not found in dataset input. "
                f"Available: {list(data.data_vars)}"
            )
        data = data[resolved_modality]
        if not any(coord in data.coords for coord in ("contig", "chrom")):
            pca_chromosome = None
    else:
        query_chrom = data_or_query if isinstance(data_or_query, str) else chromosome
        if query_chrom is None:
            raise ValueError("Provide a DataArray, a chromosome name, or set chromosome=...")
        selected = self.sel(query_chrom, assay=assay, samples=samples)
        resolved_modality = self._resolve_modalities(modality, default="coverage")
        if resolved_modality not in selected.data_vars:
            raise ValueError(
                f"Modality '{resolved_modality}' not found for chromosome '{query_chrom}'. "
                f"Available: {list(selected.data_vars)}"
            )
        data = selected[resolved_modality]

    return _run_pca(
        data,
        n_components=n_components,
        chromosome=pca_chromosome,
        nan_handling_strategy=nan_handling_strategy,
        standardize=standardize,
        random_state=random_state,
        subset_size=subset_size,
        subset_strategy=subset_strategy,
    )

pca_scree

pca_scree(pca_obj, **kwargs)

Plot PCA scree (explained variance).

Source code in quantnado/analysis/core.py
def pca_scree(self, pca_obj, **kwargs):
    """Plot PCA scree (explained variance)."""
    from .pca import plot_pca_scree

    return plot_pca_scree(pca_obj, **kwargs)

pca_scatter

pca_scatter(
    pca_obj,
    pca_result,
    colour_by=None,
    shape_by=None,
    **kwargs,
)

Scatter plot of PCA-transformed samples.

Source code in quantnado/analysis/core.py
def pca_scatter(self, pca_obj, pca_result, colour_by=None, shape_by=None, **kwargs):
    """Scatter plot of PCA-transformed samples."""
    from .pca import plot_pca_scatter

    return plot_pca_scatter(
        pca_obj, pca_result, colour_by=colour_by, shape_by=shape_by, **kwargs
    )

metaplot

metaplot(
    data,
    data_rev=None,
    *,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    **kwargs,
)

Plot a metagene profile. See :func:quantnado.analysis.plot.metaplot.

Source code in quantnado/analysis/core.py
def metaplot(
    self,
    data,
    data_rev=None,
    *,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    **kwargs,
):
    """Plot a metagene profile. See :func:`quantnado.analysis.plot.metaplot`."""
    from .plot import metaplot as _metaplot

    if assay is not None or samples is not None:
        resolved = self._resolve_samples(assay=assay, samples=samples)
        data = self._filter_sample_data(data, resolved)
        if data_rev is not None:
            data_rev = self._filter_sample_data(data_rev, resolved)

    resolved_modality = self._resolve_modalities(modality) if modality is not None else None
    return _metaplot(data, data_rev, modality=resolved_modality, **kwargs)

tornadoplot

tornadoplot(
    data,
    data_rev=None,
    *,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    **kwargs,
)

Tornado / heatmap plot. See :func:quantnado.analysis.plot.tornadoplot.

Source code in quantnado/analysis/core.py
def tornadoplot(
    self,
    data,
    data_rev=None,
    *,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    modality: "str | Sequence[str] | None" = None,
    **kwargs,
):
    """Tornado / heatmap plot. See :func:`quantnado.analysis.plot.tornadoplot`."""
    from .plot import tornadoplot as _tornadoplot

    if assay is not None or samples is not None:
        data = self._filter_sample_data(
            data, self._resolve_samples(assay=assay, samples=samples)
        )

    resolved_modality = self._resolve_modalities(modality) if modality is not None else None
    return _tornadoplot(data, data_rev, modality=resolved_modality, **kwargs)

heatmap

heatmap(
    data,
    *,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    exclude_zeros: bool = False,
    zscore: "int | None" = None,
    **kwargs,
)

Heatmap of reduced signal. See :func:quantnado.analysis.plot.heatmap.

Source code in quantnado/analysis/core.py
def heatmap(
    self,
    data,
    *,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    exclude_zeros: bool = False,
    zscore: "int | None" = None,
    **kwargs,
):
    """Heatmap of reduced signal. See :func:`quantnado.analysis.plot.heatmap`."""
    from .plot import heatmap as _heatmap

    if assay is not None or samples is not None:
        data = self._filter_sample_data(
            data, self._resolve_samples(assay=assay, samples=samples)
        )

    return _heatmap(data, exclude_zeros=exclude_zeros, zscore=zscore, **kwargs)

correlate

correlate(
    data,
    *,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    **kwargs,
)

Compute and plot sample correlation. See :func:quantnado.analysis.plot.correlate.

Source code in quantnado/analysis/core.py
def correlate(
    self,
    data,
    *,
    assay: "str | Sequence[str] | None" = None,
    samples: "str | Sequence[str] | None" = None,
    **kwargs,
):
    """Compute and plot sample correlation. See :func:`quantnado.analysis.plot.correlate`."""
    from .plot import correlate as _correlate

    if assay is not None or samples is not None:
        data = self._filter_sample_data(
            data, self._resolve_samples(assay=assay, samples=samples)
        )

    return _correlate(data, **kwargs)

locus_plot

locus_plot(
    locus,
    sample_names,
    modality=None,
    assay: "str | Sequence[str] | None" = None,
    **kwargs,
)

Plot a genomic locus. See :func:quantnado.analysis.plot.locus_plot.

Source code in quantnado/analysis/core.py
def locus_plot(
    self,
    locus,
    sample_names,
    modality=None,
    assay: "str | Sequence[str] | None" = None,
    **kwargs,
):
    """Plot a genomic locus. See :func:`quantnado.analysis.plot.locus_plot`."""
    from .plot import locus_plot

    if assay is not None:
        allowed = set(self._resolve_samples(assay=assay))
        sample_names = [s for s in sample_names if s in allowed]

    if isinstance(sample_names, str):
        sample_names = [sample_names]
    else:
        sample_names = list(sample_names)

    if modality is None:
        assay_by_sample = dict(zip(self.sample_names, self._get_assay_per_sample()))
        modalities = [
            "stranded_coverage"
            if assay_by_sample.get(sample_name, "").upper() == "RNA"
            else "methylation"
            if assay_by_sample.get(sample_name, "").upper() == "METH"
            else "variant"
            if assay_by_sample.get(sample_name, "").upper() == "SNP"
            else "coverage"
            for sample_name in sample_names
        ]
    else:
        modalities = self._resolve_modalities(modality, allow_multiple=True)
    return locus_plot(locus, sample_names=sample_names, modality=modalities, **kwargs)

extract_region

extract_region(
    region: str,
    samples=None,
    array_key: "str | None" = None,
) -> "xr.DataArray"

Extract a genomic region as an xr.DataArray for plotnado coverage tracks.

Parameters:

Name Type Description Default
region str

Genomic region string, e.g. "chr1:1000000-1001000".

required
samples

Sample name(s) to include. None returns all samples.

None
array_key 'str | None'

Explicit zarr array key (e.g. "atac", "chip_h3k27ac"). When omitted, the key that owns the requested sample is used.

None
Source code in quantnado/analysis/core.py
def extract_region(
    self,
    region: str,
    samples=None,
    array_key: "str | None" = None,
) -> "xr.DataArray":
    """Extract a genomic region as an ``xr.DataArray`` for plotnado coverage tracks.

    Parameters
    ----------
    region:
        Genomic region string, e.g. ``"chr1:1000000-1001000"``.
    samples:
        Sample name(s) to include. ``None`` returns all samples.
    array_key:
        Explicit zarr array key (e.g. ``"atac"``, ``"chip_h3k27ac"``).
        When omitted, the key that owns the requested sample is used.
    """
    chrom, start, end = _parse_plotnado_region(region)
    ds = self.sel(chrom, start, end, samples=samples)
    if array_key is not None:
        return ds[array_key]

    # When a single sample is requested, look up its owning key directly
    # so ChIP / CUT&TAG samples don't fall back to the first alphabetical key.
    if samples is not None:
        requested = [samples] if isinstance(samples, str) else list(samples)
        if len(requested) == 1:
            key = self._primary_array_key_for_sample(requested[0])
            if key is not None and key in ds:
                return ds[key]

    for key in ds.data_vars:
        if key not in _PLOTNADO_COVERAGE_SKIP:
            return ds[key]
    keys = list(ds.data_vars)
    if keys:
        return ds[keys[0]]
    raise KeyError("No array data found in this region")

normalised

normalised(
    method: str = "cpm",
    library_sizes: "pd.Series | dict | None" = None,
) -> "NormalisedQuantNadoDataset"

Compatibility alias for :meth:normalise with data=None.

Source code in quantnado/analysis/core.py
def normalised(
    self,
    method: str = "cpm",
    library_sizes: "pd.Series | dict | None" = None,
) -> "NormalisedQuantNadoDataset":
    """Compatibility alias for :meth:`normalise` with ``data=None``."""
    return self.normalise(method=method, library_sizes=library_sizes)

info_of

info_of(obj) -> ObjectInfo

Return a compact summary for xarray / pandas objects.

Source code in quantnado/analysis/core.py
def info_of(self, obj) -> ObjectInfo:
    """Return a compact summary for xarray / pandas objects."""
    if isinstance(obj, xr.DataArray):
        name = obj.name or "<unnamed>"
        return ObjectInfo(
            {
                "type": "DataArray",
                "name": name,
                "dims": list(obj.dims),
                "sizes": {k: int(v) for k, v in obj.sizes.items()},
                "dtype": str(obj.dtype),
                "coords": list(obj.coords),
            }
        )
    if isinstance(obj, xr.Dataset):
        return ObjectInfo(
            {
                "type": "Dataset",
                "dims": list(obj.dims),
                "sizes": {k: int(v) for k, v in obj.sizes.items()},
                "data_vars": list(obj.data_vars),
                "coords": list(obj.coords),
            }
        )
    if isinstance(obj, pd.DataFrame):
        return ObjectInfo(
            {
                "type": "DataFrame",
                "shape": list(obj.shape),
                "columns": list(obj.columns),
                "index_name": obj.index.name,
            }
        )
    raise TypeError(
        "info_of(...) expects an xarray DataArray, xarray Dataset, or pandas DataFrame"
    )