API Reference

Squiggy provides both a simple functional API and an object-oriented API for working with Oxford Nanopore sequencing data.

Quick Start

from squiggy import load_pod5, load_bam, plot_read, get_read_ids

# Load POD5 file (populates global kernel state)
load_pod5("data.pod5")
read_ids = get_read_ids()

# Optionally load BAM for base annotations
load_bam("alignments.bam")

# Generate plot (routes to Positron Plots pane automatically)
html = plot_read(read_ids[0])

File I/O

load_pod5

load_pod5(
    file_path: str,
    build_index: bool = True,
    use_cache: bool = True,
) -> None

Load a POD5 file into the global kernel session (OPTIMIZED)

This function mutates the global squiggy_kernel object, making POD5 data available for subsequent plotting and analysis calls.

Performance optimizations:

- Lazy read ID loading (O(1) memory vs. O(n))
- Optional index building for O(1) lookups
- Persistent caching for instant subsequent loads

Parameters:

- file_path (str, required): Path to POD5 file
- build_index (bool, default True): Whether to build the read ID index
- use_cache (bool, default True): Whether to use the persistent cache

Returns:

- None (mutates the global squiggy_kernel)

Examples:

>>> from squiggy import load_pod5
>>> from squiggy.io import squiggy_kernel
>>> load_pod5('data.pod5')
>>> print(f"Loaded {len(squiggy_kernel._read_ids)} reads")
>>> # Session is available as squiggy_kernel in kernel
>>> first_read = next(squiggy_kernel._reader.reads())
Source code in squiggy/io.py
def load_pod5(file_path: str, build_index: bool = True, use_cache: bool = True) -> None:
    """
    Load a POD5 file into the global kernel session (OPTIMIZED)

    This function mutates the global squiggy_kernel object, making
    POD5 data available for subsequent plotting and analysis calls.

    Performance optimizations:
    - Lazy read ID loading (O(1) memory vs. O(n))
    - Optional index building for O(1) lookups
    - Persistent caching for instant subsequent loads

    Args:
        file_path: Path to POD5 file
        build_index: Whether to build read ID index (default: True)
        use_cache: Whether to use persistent cache (default: True)

    Returns:
        None (mutates global squiggy_kernel)

    Examples:
        >>> from squiggy import load_pod5
        >>> from squiggy.io import squiggy_kernel
        >>> load_pod5('data.pod5')
        >>> print(f"Loaded {len(squiggy_kernel._read_ids)} reads")
        >>> # Session is available as squiggy_kernel in kernel
        >>> first_read = next(squiggy_kernel._reader.reads())
    """
    # Convert to absolute path
    abs_path = Path(file_path).resolve()

    if not abs_path.exists():
        raise FileNotFoundError(f"Failed to open pod5 file at: {abs_path}")

    # Close previous reader if exists
    squiggy_kernel.close_pod5()

    # Open new reader
    reader = pod5.Reader(str(abs_path))

    # Create lazy read list (O(1) memory overhead)
    lazy_read_list = LazyReadList(reader)

    # Try to load index from cache
    cached_index = None
    if use_cache and squiggy_kernel.cache:
        cached_index = squiggy_kernel.cache.load_pod5_index(abs_path)

    # Build or restore index
    if build_index:
        if cached_index:
            # Restore from cache
            pod5_index = Pod5Index()
            pod5_index._index = cached_index
        else:
            # Build fresh
            pod5_index = Pod5Index()
            pod5_index.build(reader)

            # Save to cache for next time
            if use_cache and squiggy_kernel.cache:
                squiggy_kernel.cache.save_pod5_index(abs_path, pod5_index._index)

        squiggy_kernel._pod5_index = pod5_index
    else:
        squiggy_kernel._pod5_index = None

    # Store state in session
    squiggy_kernel._reader = reader
    squiggy_kernel._pod5_path = str(abs_path)
    squiggy_kernel._read_ids = lazy_read_list
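
As a hedged sketch of the two loading modes described above (file names are placeholders):

from squiggy import load_pod5, get_read_ids

# Default load: builds the read ID index and caches it to disk, so the
# next load_pod5() call on the same file can restore it near-instantly
load_pod5("data.pod5")
read_ids = get_read_ids()  # materializes the lazy read list
print(f"Loaded {len(read_ids)} reads")

# One-off inspection: skip indexing and caching to minimize setup cost
load_pod5("data.pod5", build_index=False, use_cache=False)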

load_bam

load_bam(
    file_path: str,
    build_ref_mapping: bool = True,
    use_cache: bool = True,
) -> None

Load a BAM file into the global kernel session (OPTIMIZED)

This function mutates the global squiggy_kernel object, making BAM alignment data available for subsequent plotting and analysis calls.

Performance optimizations:

- Single-pass metadata collection (3-4x faster than the old 4-scan approach)
- Eager reference mapping (transparent cost, eliminates UI freezes)
- Persistent caching for instant subsequent loads

Parameters:

- file_path (str, required): Path to BAM file
- build_ref_mapping (bool, default True): Whether to build the reference→reads mapping
- use_cache (bool, default True): Whether to use the persistent cache

Returns:

- None (mutates the global squiggy_kernel)

Examples:

>>> from squiggy import load_bam
>>> from squiggy.io import squiggy_kernel
>>> load_bam('alignments.bam')
>>> print(squiggy_kernel._bam_info['references'])
>>> if squiggy_kernel._bam_info['has_modifications']:
...     print(f"Modifications: {squiggy_kernel._bam_info['modification_types']}")
>>> if squiggy_kernel._bam_info['has_event_alignment']:
...     print("Event alignment data available")
Source code in squiggy/io.py
def load_bam(
    file_path: str, build_ref_mapping: bool = True, use_cache: bool = True
) -> None:
    """
    Load a BAM file into the global kernel session (OPTIMIZED)

    This function mutates the global squiggy_kernel object, making
    BAM alignment data available for subsequent plotting and analysis calls.

    Performance optimizations:
    - Single-pass metadata collection (3-4x faster than old 4-scan approach)
    - Eager reference mapping (transparent cost, eliminates UI freezes)
    - Persistent caching for instant subsequent loads

    Args:
        file_path: Path to BAM file
        build_ref_mapping: Whether to build reference→reads mapping (default: True)
        use_cache: Whether to use persistent cache (default: True)

    Returns:
        None (mutates global squiggy_kernel)

    Examples:
        >>> from squiggy import load_bam
        >>> from squiggy.io import squiggy_kernel
        >>> load_bam('alignments.bam')
        >>> print(squiggy_kernel._bam_info['references'])
        >>> if squiggy_kernel._bam_info['has_modifications']:
        ...     print(f"Modifications: {squiggy_kernel._bam_info['modification_types']}")
        >>> if squiggy_kernel._bam_info['has_event_alignment']:
        ...     print("Event alignment data available")
    """
    # Convert to absolute path
    abs_path = Path(file_path).resolve()

    if not abs_path.exists():
        raise FileNotFoundError(f"BAM file not found: {abs_path}")

    # Try cache first for complete metadata (Phase 2 optimization)
    metadata = None
    if use_cache and squiggy_kernel.cache:
        metadata = squiggy_kernel.cache.load_bam_metadata(abs_path)

    # If cache miss or disabled, collect fresh metadata (Phase 1 single-pass scan)
    if metadata is None:
        metadata = _collect_bam_metadata_single_pass(abs_path, build_ref_mapping)

        # Save to cache for instant future loads (Phase 2)
        if use_cache and squiggy_kernel.cache:
            squiggy_kernel.cache.save_bam_metadata(abs_path, metadata)

    # Build metadata dict for session
    bam_info = {
        "file_path": str(abs_path),
        "num_reads": metadata["num_reads"],
        "references": metadata["references"],
        "has_modifications": metadata["has_modifications"],
        "modification_types": metadata["modification_types"],
        "has_probabilities": metadata["has_probabilities"],
        "has_event_alignment": metadata["has_event_alignment"],
    }

    # Get ref_mapping from metadata
    ref_mapping = metadata.get("ref_mapping")

    # Store state in session
    squiggy_kernel._bam_path = str(abs_path)
    squiggy_kernel._bam_info = bam_info
    squiggy_kernel._ref_mapping = ref_mapping
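
A short sketch of reading back the metadata that load_bam() stores on the session; it uses only the _bam_info fields shown in the example above:

from squiggy import load_bam
from squiggy.io import squiggy_kernel

load_bam("alignments.bam")
info = squiggy_kernel._bam_info
print(f"{info['num_reads']} reads across {len(info['references'])} references")
if info["has_modifications"]:
    print("Modification codes:", info["modification_types"])
if info["has_event_alignment"]:
    print("mv tags present: event-aligned plotting is available")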

load_fasta

load_fasta(file_path: str) -> None

Load a FASTA file into the global kernel session

This function mutates the global squiggy_kernel object, making FASTA reference sequences available for subsequent motif search and analysis calls.

If a FASTA index (.fai) doesn't exist, it will be automatically created using pysam.faidx().

Parameters:

- file_path (str, required): Path to FASTA file (index will be created if missing)

Returns:

- None (mutates the global squiggy_kernel)

Examples:

>>> from squiggy import load_fasta
>>> from squiggy.io import squiggy_kernel
>>> load_fasta('genome.fa')  # Creates .fai index if needed
>>> print(squiggy_kernel._fasta_info['references'])
>>> # Use with motif search
>>> from squiggy.motif import search_motif
>>> matches = list(search_motif(squiggy_kernel._fasta_path, "DRACH"))
Source code in squiggy/io.py
def load_fasta(file_path: str) -> None:
    """
    Load a FASTA file into the global kernel session

    This function mutates the global squiggy_kernel object, making
    FASTA reference sequences available for subsequent motif search and
    analysis calls.

    If a FASTA index (.fai) doesn't exist, it will be automatically created
    using pysam.faidx().

    Args:
        file_path: Path to FASTA file (index will be created if missing)

    Returns:
        None (mutates global squiggy_kernel)

    Examples:
        >>> from squiggy import load_fasta
        >>> from squiggy.io import squiggy_kernel
        >>> load_fasta('genome.fa')  # Creates .fai index if needed
        >>> print(squiggy_kernel._fasta_info['references'])
        >>> # Use with motif search
        >>> from squiggy.motif import search_motif
        >>> matches = list(search_motif(squiggy_kernel._fasta_path, "DRACH"))
    """
    # Convert to absolute path
    abs_path = os.path.abspath(file_path)

    if not os.path.exists(abs_path):
        raise FileNotFoundError(f"FASTA file not found: {abs_path}")

    # Check for index, create if missing
    fai_path = abs_path + ".fai"
    if not os.path.exists(fai_path):
        try:
            pysam.faidx(abs_path)
        except Exception as e:
            raise RuntimeError(
                f"Failed to create FASTA index for {abs_path}. "
                f"Error: {e}. You can also create it manually with: samtools faidx {abs_path}"
            ) from e

    # Open FASTA file to get metadata
    fasta = pysam.FastaFile(abs_path)

    try:
        # Extract reference information
        references = list(fasta.references)
        lengths = list(fasta.lengths)

        # Build metadata dict
        fasta_info = {
            "file_path": abs_path,
            "references": references,
            "num_references": len(references),
            "reference_lengths": dict(zip(references, lengths, strict=True)),
        }

    finally:
        fasta.close()

    # Store state in session (no global keyword needed - just mutating object!)
    squiggy_kernel._fasta_path = abs_path
    squiggy_kernel._fasta_info = fasta_info
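
A small sketch of inspecting the per-reference metadata collected by load_fasta() (genome.fa is a placeholder path):

from squiggy import load_fasta
from squiggy.io import squiggy_kernel

load_fasta("genome.fa")  # creates genome.fa.fai if missing
info = squiggy_kernel._fasta_info
for name in info["references"]:
    # reference_lengths maps reference name -> sequence length
    print(name, info["reference_lengths"][name])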

close_pod5

close_pod5()

Close the currently open POD5 reader

Call this to free resources when done.

Examples:

>>> from squiggy import load_pod5, close_pod5
>>> load_pod5('data.pod5')
>>> # ... work with data ...
>>> close_pod5()
Source code in squiggy/io.py
def close_pod5():
    """
    Close the currently open POD5 reader

    Call this to free resources when done.

    Examples:
        >>> from squiggy import load_pod5, close_pod5
        >>> load_pod5('data.pod5')
        >>> # ... work with data ...
        >>> close_pod5()
    """
    # Clear session (no global keyword needed!)
    squiggy_kernel.close_pod5()

close_bam

close_bam()

Clear the currently loaded BAM file state

Call this to free BAM-related resources when done. Unlike close_pod5(), this doesn't need to close a file handle since BAM files are opened and closed per-operation.

Examples:

>>> from squiggy import load_bam, close_bam
>>> load_bam('alignments.bam')
>>> # ... work with alignments ...
>>> close_bam()
Source code in squiggy/io.py
def close_bam():
    """
    Clear the currently loaded BAM file state

    Call this to free BAM-related resources when done. Unlike close_pod5(),
    this doesn't need to close a file handle since BAM files are opened
    and closed per-operation.

    Examples:
        >>> from squiggy import load_bam, close_bam
        >>> load_bam('alignments.bam')
        >>> # ... work with alignments ...
        >>> close_bam()
    """
    # Clear session (no global keyword needed!)
    squiggy_kernel.close_bam()

close_fasta

close_fasta()

Clear the currently loaded FASTA file state

Call this to free FASTA-related resources when done. Unlike close_pod5(), this doesn't need to close a file handle since FASTA files are opened and closed per-operation.

Examples:

>>> from squiggy import load_fasta, close_fasta
>>> load_fasta('genome.fa')
>>> # ... work with sequences ...
>>> close_fasta()
Source code in squiggy/io.py
def close_fasta():
    """
    Clear the currently loaded FASTA file state

    Call this to free FASTA-related resources when done. Unlike close_pod5(),
    this doesn't need to close a file handle since FASTA files are opened
    and closed per-operation.

    Examples:
        >>> from squiggy import load_fasta, close_fasta
        >>> load_fasta('genome.fa')
        >>> # ... work with sequences ...
        >>> close_fasta()
    """
    # Clear session (no global keyword needed!)
    squiggy_kernel.close_fasta()
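
Taken together, the three close functions support a simple teardown pattern; a sketch, assuming the placeholder files exist:

from squiggy import (
    load_pod5, load_bam, load_fasta,
    close_pod5, close_bam, close_fasta,
    plot_read, get_read_ids,
)

load_pod5("data.pod5")
load_bam("alignments.bam")
load_fasta("genome.fa")
try:
    html = plot_read(get_read_ids()[0], mode="EVENTALIGN")
finally:
    # close_pod5() releases the open reader; the other two just clear state
    close_fasta()
    close_bam()
    close_pod5()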

get_current_files

get_current_files() -> dict[str, str | None]

Get paths of currently loaded files

Returns:

- dict[str, str | None]: Dict with pod5_path and bam_path (either may be None)

Source code in squiggy/io.py
def get_current_files() -> dict[str, str | None]:
    """
    Get paths of currently loaded files

    Returns:
        Dict with pod5_path and bam_path (may be None)
    """
    return {
        "pod5_path": squiggy_kernel._pod5_path,
        "bam_path": squiggy_kernel._bam_path,
    }
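
A quick usage sketch (data.pod5 is a placeholder):

from squiggy import load_pod5, get_current_files

load_pod5("data.pod5")
files = get_current_files()
print(files["pod5_path"])  # absolute path to the loaded POD5
print(files["bam_path"])   # None until load_bam() is called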

get_read_ids

get_read_ids() -> list[str]

Get list of read IDs from currently loaded POD5 file

Returns:

- list[str]: List of read ID strings (materialized from the lazy list if needed)

Source code in squiggy/io.py
def get_read_ids() -> list[str]:
    """
    Get list of read IDs from currently loaded POD5 file

    Returns:
        List of read ID strings (materialized from lazy list if needed)
    """
    if not squiggy_kernel._read_ids:
        raise ValueError("No POD5 file is currently loaded")

    # Convert LazyReadList to list if needed
    if isinstance(squiggy_kernel._read_ids, LazyReadList):
        return list(squiggy_kernel._read_ids)
    return squiggy_kernel._read_ids
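
A minimal sketch showing the materialization step:

from squiggy import load_pod5, get_read_ids

load_pod5("data.pod5")
ids = get_read_ids()  # LazyReadList is converted to a plain list here
print(ids[:5])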

get_bam_modification_info

get_bam_modification_info(file_path: str) -> dict

Check if BAM file contains base modification tags (MM/ML)

Parameters:

- file_path (str, required): Path to BAM file

Returns:

- dict with:
  - has_modifications: bool
  - modification_types: list of modification codes (e.g., ['m', 'h'])
  - sample_count: number of reads checked
  - has_probabilities: bool, True if ML (probability) tags were found

Examples:

>>> from squiggy import get_bam_modification_info
>>> mod_info = get_bam_modification_info('alignments.bam')
>>> if mod_info['has_modifications']:
...     print(f"Found modifications: {mod_info['modification_types']}")
Source code in squiggy/io.py
def get_bam_modification_info(file_path: str) -> dict:
    """
    Check if BAM file contains base modification tags (MM/ML)

    Args:
        file_path: Path to BAM file

    Returns:
        Dict with:
            - has_modifications: bool
            - modification_types: list of modification codes (e.g., ['m', 'h'])
            - sample_count: number of reads checked

    Examples:
        >>> from squiggy import get_bam_modification_info
        >>> mod_info = get_bam_modification_info('alignments.bam')
        >>> if mod_info['has_modifications']:
        ...     print(f"Found modifications: {mod_info['modification_types']}")
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"BAM file not found: {file_path}")

    from .constants import BAM_SAMPLE_SIZE

    modification_types = set()
    has_modifications = False
    has_ml = False
    reads_checked = 0
    max_reads_to_check = BAM_SAMPLE_SIZE  # Sample first N reads

    try:
        bam = pysam.AlignmentFile(file_path, "rb", check_sq=False)

        for read in bam.fetch(until_eof=True):
            if reads_checked >= max_reads_to_check:
                break

            reads_checked += 1

            # Check for modifications using pysam's modified_bases property
            # This is cleaner and more reliable than parsing MM tags manually
            if hasattr(read, "modified_bases") and read.modified_bases:
                has_modifications = True

                # modified_bases returns dict with format:
                # {(canonical_base, strand, mod_code): [(position, quality), ...]}
                for (
                    _canonical_base,
                    _strand,
                    mod_code,
                ), _mod_list in read.modified_bases.items():
                    # mod_code can be str (e.g., 'm') or int (e.g., 17596)
                    # Store as-is to preserve type
                    modification_types.add(mod_code)

            # Check for ML tag (modification probabilities)
            if read.has_tag("ML"):
                has_ml = True

        bam.close()

    except Exception as e:
        logger.warning(f"Error checking BAM modifications: {e}")
        return {
            "has_modifications": False,
            "modification_types": [],
            "sample_count": 0,
            "has_probabilities": False,
        }

    # Convert modification_types to list, handling mixed str/int types
    # Sort with custom key that converts to string for comparison
    mod_types_list = sorted(modification_types, key=str)

    return {
        "has_modifications": has_modifications,
        "modification_types": mod_types_list,
        "sample_count": reads_checked,
        "has_probabilities": has_ml,
    }
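
A sketch that also checks the has_probabilities flag returned above:

from squiggy import get_bam_modification_info

info = get_bam_modification_info("alignments.bam")
print(f"Sampled {info['sample_count']} reads")
if info["has_modifications"]:
    print("Modification codes:", info["modification_types"])
    if info["has_probabilities"]:
        print("ML probabilities are present")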

get_bam_event_alignment_status

get_bam_event_alignment_status(file_path: str) -> bool

Check if BAM file contains event alignment data (mv tag)

The mv tag contains the move table from basecalling, which maps nanopore signal events to basecalled nucleotides. This is required for event-aligned plotting mode.

Parameters:

- file_path (str, required): Path to BAM file

Returns:

- bool: True if the mv tag is found in sampled reads

Examples:

>>> from squiggy import get_bam_event_alignment_status
>>> has_events = get_bam_event_alignment_status('alignments.bam')
>>> if has_events:
...     print("BAM contains event alignment data")
Source code in squiggy/io.py
def get_bam_event_alignment_status(file_path: str) -> bool:
    """
    Check if BAM file contains event alignment data (mv tag)

    The mv tag contains the move table from basecalling, which maps
    nanopore signal events to basecalled nucleotides. This is required
    for event-aligned plotting mode.

    Args:
        file_path: Path to BAM file

    Returns:
        True if mv tag is found in sampled reads

    Examples:
        >>> from squiggy import get_bam_event_alignment_status
        >>> has_events = get_bam_event_alignment_status('alignments.bam')
        >>> if has_events:
        ...     print("BAM contains event alignment data")
    """
    from .constants import BAM_SAMPLE_SIZE

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"BAM file not found: {file_path}")

    max_reads_to_check = BAM_SAMPLE_SIZE  # Sample first N reads

    try:
        bam = pysam.AlignmentFile(file_path, "rb", check_sq=False)

        for i, read in enumerate(bam.fetch(until_eof=True)):
            if i >= max_reads_to_check:
                break

            # Check for mv tag (move table)
            if read.has_tag("mv"):
                bam.close()
                return True

        bam.close()

    except Exception as e:
        logger.warning(f"Error checking BAM event alignment: {e}")
        return False

    return False
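
One way to use this check is to pick a plot mode before calling plot_read(); a sketch with placeholder paths:

from squiggy import (
    load_pod5, load_bam, get_bam_event_alignment_status,
    get_read_ids, plot_read,
)

load_pod5("data.pod5")
load_bam("alignments.bam")

# Fall back to SINGLE mode when the BAM lacks mv (move table) tags
mode = "EVENTALIGN" if get_bam_event_alignment_status("alignments.bam") else "SINGLE"
html = plot_read(get_read_ids()[0], mode=mode)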

get_read_to_reference_mapping

get_read_to_reference_mapping() -> dict[str, list[str]]

Get mapping of reference names to read IDs from currently loaded BAM

Returns:

- dict[str, list[str]]: Dict mapping reference name to list of read IDs

Raises:

- RuntimeError: If no BAM file is loaded

Examples:

>>> from squiggy import load_bam, get_read_to_reference_mapping
>>> load_bam('alignments.bam')
>>> mapping = get_read_to_reference_mapping()
>>> print(f"References: {list(mapping.keys())}")
Source code in squiggy/io.py
def get_read_to_reference_mapping() -> dict[str, list[str]]:
    """
    Get mapping of reference names to read IDs from currently loaded BAM

    Returns:
        Dict mapping reference name to list of read IDs

    Raises:
        RuntimeError: If no BAM file is loaded

    Examples:
        >>> from squiggy import load_bam, get_read_to_reference_mapping
        >>> load_bam('alignments.bam')
        >>> mapping = get_read_to_reference_mapping()
        >>> print(f"References: {list(mapping.keys())}")
    """
    if squiggy_kernel._bam_path is None:
        raise RuntimeError("No BAM file is currently loaded")

    if not os.path.exists(squiggy_kernel._bam_path):
        raise FileNotFoundError(f"BAM file not found: {squiggy_kernel._bam_path}")

    # Open BAM file
    bam = pysam.AlignmentFile(squiggy_kernel._bam_path, "rb", check_sq=False)

    # Map reference to read IDs
    ref_to_reads: dict[str, list[str]] = {}

    try:
        for read in bam.fetch(until_eof=True):
            if read.is_unmapped:
                continue

            ref_name = bam.get_reference_name(read.reference_id)
            read_id = read.query_name

            if ref_name not in ref_to_reads:
                ref_to_reads[ref_name] = []

            ref_to_reads[ref_name].append(read_id)

    finally:
        bam.close()

    # Store in session (no global keyword needed!)
    squiggy_kernel._ref_mapping = ref_to_reads

    return ref_to_reads
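
A sketch that summarizes the mapping per reference:

from squiggy import load_bam, get_read_to_reference_mapping

load_bam("alignments.bam")
mapping = get_read_to_reference_mapping()
for ref, reads in mapping.items():
    print(f"{ref}: {len(reads)} mapped reads")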

Plotting Functions

Single File Plotting

plot_read

plot_read(
    read_id: str,
    mode: str = "SINGLE",
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    downsample: int = None,
    show_dwell_time: bool = False,
    show_labels: bool = True,
    position_label_interval: int = None,
    scale_dwell_time: bool = False,
    min_mod_probability: float = 0.5,
    enabled_mod_types: list = None,
    show_signal_points: bool = False,
    clip_x_to_alignment: bool = True,
    sample_name: str | None = None,
    coordinate_space: str = "signal",
) -> str

Generate a Bokeh HTML plot for a single read

Parameters:

- read_id (str, required): Read ID to plot
- mode (str, default 'SINGLE'): Plot mode (SINGLE, EVENTALIGN)
- normalization (str, default 'ZNORM'): Normalization method (NONE, ZNORM, MEDIAN, MAD)
- theme (str, default 'LIGHT'): Color theme (LIGHT, DARK)
- downsample (int, default None): Downsampling factor (1 = no downsampling, 10 = every 10th point)
- show_dwell_time (bool, default False): Color bases by dwell time (requires event-aligned mode)
- show_labels (bool, default True): Show base labels on plot (event-aligned mode)
- position_label_interval (int, default None): Interval for position labels
- scale_dwell_time (bool, default False): Scale x-axis by cumulative dwell time instead of regular time
- min_mod_probability (float, default 0.5): Minimum probability threshold for displaying modifications (0-1)
- enabled_mod_types (list, default None): List of modification type codes to display (None = all)
- show_signal_points (bool, default False): Show individual signal points as circles
- clip_x_to_alignment (bool, default True): If True, the x-axis shows only the aligned region; if False, it extends to include soft-clipped regions
- sample_name (str | None, default None): (Multi-sample mode) Name of the sample to plot from; if provided, plots from that sample instead of the global session
- coordinate_space (str, default 'signal'): X-axis coordinate system ('signal' or 'sequence'); 'signal' uses sample indices, 'sequence' uses genomic positions (requires BAM)

Returns:

- str: Bokeh HTML string

Examples:

>>> html = plot_read('read_001', mode='EVENTALIGN')
>>> # Extension displays this automatically
>>> # Or save to file:
>>> with open('plot.html', 'w') as f:
...     f.write(html)
Source code in squiggy/plotting.py
def plot_read(
    read_id: str,
    mode: str = "SINGLE",
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    downsample: int = None,
    show_dwell_time: bool = False,
    show_labels: bool = True,
    position_label_interval: int = None,
    scale_dwell_time: bool = False,
    min_mod_probability: float = 0.5,
    enabled_mod_types: list = None,
    show_signal_points: bool = False,
    clip_x_to_alignment: bool = True,
    sample_name: str | None = None,
    coordinate_space: str = "signal",
) -> str:
    """
    Generate a Bokeh HTML plot for a single read

    Args:
        read_id: Read ID to plot
        mode: Plot mode (SINGLE, EVENTALIGN)
        normalization: Normalization method (NONE, ZNORM, MEDIAN, MAD)
        theme: Color theme (LIGHT, DARK)
        downsample: Downsampling factor (1 = no downsampling, 10 = every 10th point)
        show_dwell_time: Color bases by dwell time (requires event-aligned mode)
        show_labels: Show base labels on plot (event-aligned mode)
        position_label_interval: Interval for position labels
        scale_dwell_time: Scale x-axis by cumulative dwell time instead of regular time
        min_mod_probability: Minimum probability threshold for displaying modifications (0-1)
        enabled_mod_types: List of modification type codes to display (None = all)
        show_signal_points: Show individual signal points as circles
        clip_x_to_alignment: If True, x-axis shows only aligned region (default True).
                             If False, x-axis extends to include soft-clipped regions.
        sample_name: (Multi-sample mode) Name of the sample to plot from. If provided,
                     plots from that specific sample instead of the global session.
        coordinate_space: X-axis coordinate system ('signal' or 'sequence').
                         'signal' uses sample indices, 'sequence' uses genomic positions (requires BAM).

    Returns:
        Bokeh HTML string

    Examples:
        >>> html = plot_read('read_001', mode='EVENTALIGN')
        >>> # Extension displays this automatically
        >>> # Or save to file:
        >>> with open('plot.html', 'w') as f:
        ...     f.write(html)
    """

    # Determine which POD5 reader to use
    if sample_name:
        # Multi-sample mode: get reader from specific sample
        sample = squiggy_kernel.get_sample(sample_name)
        if not sample or sample._pod5_reader is None:
            raise ValueError(f"Sample '{sample_name}' not loaded or has no POD5 file.")
        reader = sample._pod5_reader
    else:
        # Single-file mode: use global reader
        reader = squiggy_kernel._reader
        if reader is None:
            raise ValueError("No POD5 file loaded. Call load_pod5() first.")

    # Apply defaults if not specified
    if downsample is None:
        downsample = DEFAULT_DOWNSAMPLE
    if position_label_interval is None:
        position_label_interval = DEFAULT_POSITION_LABEL_INTERVAL

    # Get read data (optimized with index if available)
    read_obj = get_read_by_id(read_id, sample_name=sample_name)

    if read_obj is None:
        raise ValueError(f"Read not found: {read_id}")

    # Parse parameters
    params = parse_plot_parameters(mode=mode, normalization=normalization, theme=theme)
    plot_mode = params["mode"]
    norm_method = params["normalization"]
    theme_enum = params["theme"]

    # Prepare data based on plot mode
    if plot_mode == PlotMode.SINGLE:
        # Single read mode
        data = {
            "signal": read_obj.signal,
            "read_id": read_id,
            "sample_rate": read_obj.run_info.sample_rate,
        }

        # If sequence coordinate space requested, get alignment data
        if coordinate_space == "sequence":
            if sample_name:
                sample = squiggy_kernel.get_sample(sample_name)
                bam_path = sample._bam_path if sample else None
            else:
                bam_path = squiggy_kernel._bam_path

            if bam_path is None:
                raise ValueError(
                    "Sequence coordinate space requires a BAM file. Call load_bam() first or use coordinate_space='signal'."
                )

            from .alignment import extract_alignment_from_bam

            aligned_read = extract_alignment_from_bam(bam_path, read_id)
            if aligned_read is None:
                raise ValueError(f"No alignment found for read {read_id} in BAM file.")

            data["aligned_read"] = aligned_read

        options = {
            "normalization": norm_method,
            "downsample": downsample,
            "show_signal_points": show_signal_points,
            "x_axis_mode": "dwell_time" if scale_dwell_time else "regular_time",
            "coordinate_space": coordinate_space,
        }

    elif plot_mode == PlotMode.EVENTALIGN:
        # Event-aligned mode: requires alignment
        if sample_name:
            sample = squiggy_kernel.get_sample(sample_name)
            bam_path = sample._bam_path if sample else None
            fasta_path = sample._fasta_path if sample else None
        else:
            bam_path = squiggy_kernel._bam_path
            fasta_path = squiggy_kernel._fasta_path

        if bam_path is None:
            raise ValueError(
                "EVENTALIGN mode requires a BAM file. Call load_bam() first."
            )

        from .alignment import extract_alignment_from_bam
        from .utils import get_reference_sequence_from_fasta

        aligned_read = extract_alignment_from_bam(bam_path, read_id)
        if aligned_read is None:
            raise ValueError(f"No alignment found for read {read_id} in BAM file.")

        # Fetch reference sequence (FASTA-first pattern)
        reference_sequence = ""
        if (
            hasattr(aligned_read, "reference_name")
            and hasattr(aligned_read, "reference_start")
            and hasattr(aligned_read, "reference_end")
        ):
            reference_sequence = get_reference_sequence_from_fasta(
                fasta_file=fasta_path,
                reference_name=aligned_read.reference_name,
                start=aligned_read.reference_start,
                end=aligned_read.reference_end,
                bam_file=bam_path,
                read_id=read_id,
            )

        data = {
            "reads": [(read_id, read_obj.signal, read_obj.run_info.sample_rate)],
            "aligned_reads": [aligned_read],
            "reference_sequence": reference_sequence,  # Add reference sequence
        }

        options = {
            "normalization": norm_method,
            "downsample": downsample,
            "show_dwell_time": scale_dwell_time,
            "show_labels": show_labels,
            "show_signal_points": show_signal_points,
            "position_label_interval": position_label_interval,
            "clip_x_to_alignment": clip_x_to_alignment,
        }

    else:
        raise ValueError(
            f"Plot mode {plot_mode} not supported for single read. Use SINGLE or EVENTALIGN."
        )

    # Create strategy and generate plot
    strategy = create_plot_strategy(plot_mode, theme_enum)
    html, fig = strategy.create_plot(data, options)

    # Route to Positron Plots pane if running in Positron
    _route_to_plots_pane(fig)

    return html
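
Beyond the doctest above, a fuller sketch combining several documented options (paths are placeholders):

from squiggy import load_pod5, load_bam, get_read_ids, plot_read

load_pod5("data.pod5")
load_bam("alignments.bam")  # required for EVENTALIGN and sequence space

read_id = get_read_ids()[0]
html = plot_read(
    read_id,
    mode="EVENTALIGN",
    normalization="MEDIAN",
    theme="DARK",
    scale_dwell_time=True,    # x-axis scaled by cumulative dwell time
    show_signal_points=True,  # draw individual samples as circles
)
with open("plot.html", "w") as f:
    f.write(html)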

plot_reads

plot_reads(
    read_ids: list,
    mode: str = "OVERLAY",
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    downsample: int = None,
    show_dwell_time: bool = False,
    show_labels: bool = True,
    scale_dwell_time: bool = False,
    min_mod_probability: float = 0.5,
    enabled_mod_types: list = None,
    show_signal_points: bool = False,
    sample_name: str | None = None,
    read_sample_map: dict[str, str] | None = None,
    read_colors: dict[str, str] | None = None,
    coordinate_space: str = "signal",
) -> str

Generate a Bokeh HTML plot for multiple reads

Parameters:

- read_ids (list, required): List of read IDs to plot
- mode (str, default 'OVERLAY'): Plot mode (OVERLAY, STACKED, EVENTALIGN)
- normalization (str, default 'ZNORM'): Normalization method (NONE, ZNORM, MEDIAN, MAD)
- theme (str, default 'LIGHT'): Color theme (LIGHT, DARK)
- downsample (int, default None): Downsampling factor (1 = no downsampling, 10 = every 10th point)
- show_dwell_time (bool, default False): Color bases by dwell time (EVENTALIGN mode only)
- show_labels (bool, default True): Show base labels on plot (EVENTALIGN mode only)
- scale_dwell_time (bool, default False): Scale x-axis by cumulative dwell time (EVENTALIGN mode only)
- min_mod_probability (float, default 0.5): Minimum probability threshold for displaying modifications
- enabled_mod_types (list, default None): List of modification type codes to display
- show_signal_points (bool, default False): Show individual signal points as circles
- sample_name (str | None, default None): (Single-sample mode) Name of the sample to plot from; if provided, plots from that sample instead of the global session
- read_sample_map (dict[str, str] | None, default None): (Multi-sample mode) Dict mapping read_id → sample_name; if provided, reads are loaded from their respective samples, taking precedence over sample_name
- read_colors (dict[str, str] | None, default None): (Multi-sample mode) Dict mapping read_id → color hex string; if provided, each read uses its specified color instead of the default color cycling (useful for sample-based coloring)
- coordinate_space (str, default 'signal'): Coordinate system for the x-axis ('signal' or 'sequence'); 'signal' uses raw sample points, 'sequence' uses BAM alignment positions

Returns:

- str: Bokeh HTML string

Examples:

>>> # Single sample
>>> html = plot_reads(['read_001', 'read_002'], mode='OVERLAY')
>>>
>>> # Multi-sample with custom colors
>>> read_map = {'read_001': 'sample_A', 'read_002': 'sample_B'}
>>> colors = {'read_001': '#E69F00', 'read_002': '#56B4E9'}
>>> html = plot_reads(['read_001', 'read_002'], mode='OVERLAY',
...                   read_sample_map=read_map, read_colors=colors)
Source code in squiggy/plotting.py
def plot_reads(
    read_ids: list,
    mode: str = "OVERLAY",
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    downsample: int = None,
    show_dwell_time: bool = False,
    show_labels: bool = True,
    scale_dwell_time: bool = False,
    min_mod_probability: float = 0.5,
    enabled_mod_types: list = None,
    show_signal_points: bool = False,
    sample_name: str | None = None,
    read_sample_map: dict[str, str] | None = None,
    read_colors: dict[str, str] | None = None,
    coordinate_space: str = "signal",
) -> str:
    """
    Generate a Bokeh HTML plot for multiple reads

    Args:
        read_ids: List of read IDs to plot
        mode: Plot mode (OVERLAY, STACKED, EVENTALIGN)
        normalization: Normalization method (NONE, ZNORM, MEDIAN, MAD)
        theme: Color theme (LIGHT, DARK)
        downsample: Downsampling factor (1 = no downsampling, 10 = every 10th point)
        show_dwell_time: Color bases by dwell time (EVENTALIGN mode only)
        show_labels: Show base labels on plot (EVENTALIGN mode only)
        scale_dwell_time: Scale x-axis by cumulative dwell time (EVENTALIGN mode only)
        min_mod_probability: Minimum probability threshold for displaying modifications
        enabled_mod_types: List of modification type codes to display
        show_signal_points: Show individual signal points as circles
        sample_name: (Single-sample mode) Name of the sample to plot from. If provided,
                     plots from that specific sample instead of the global session.
        read_sample_map: (Multi-sample mode) Dict mapping read_id → sample_name.
                         If provided, reads are loaded from their respective samples.
                         Takes precedence over sample_name parameter.
        read_colors: (Multi-sample mode) Dict mapping read_id → color hex string.
                     If provided, each read uses its specified color instead of
                     the default color cycling. Useful for sample-based coloring.
        coordinate_space: Coordinate system for x-axis ('signal' or 'sequence').
                          'signal' uses raw sample points, 'sequence' uses BAM alignment positions.

    Returns:
        Bokeh HTML string

    Examples:
        >>> # Single sample
        >>> html = plot_reads(['read_001', 'read_002'], mode='OVERLAY')
        >>>
        >>> # Multi-sample with custom colors
        >>> read_map = {'read_001': 'sample_A', 'read_002': 'sample_B'}
        >>> colors = {'read_001': '#E69F00', 'read_002': '#56B4E9'}
        >>> html = plot_reads(['read_001', 'read_002'], mode='OVERLAY',
        ...                   read_sample_map=read_map, read_colors=colors)
    """

    if not read_ids:
        raise ValueError("No read IDs provided.")

    # Apply defaults if not specified
    if downsample is None:
        downsample = DEFAULT_DOWNSAMPLE

    # Parse parameters
    params = parse_plot_parameters(mode=mode, normalization=normalization, theme=theme)
    plot_mode = params["mode"]
    norm_method = params["normalization"]
    theme_enum = params["theme"]

    # Collect read data - use multi-sample fetching if read_sample_map provided
    if read_sample_map:
        # Multi-sample mode: fetch reads from different samples
        from .io import get_reads_batch_multi_sample

        read_objs = get_reads_batch_multi_sample(read_sample_map)
    elif sample_name:
        # Single-sample mode: fetch from specified sample
        from .io import get_reads_batch

        read_objs = get_reads_batch(read_ids, sample_name=sample_name)
    else:
        # Legacy mode: fetch from global session
        from .io import get_reads_batch

        if squiggy_kernel._reader is None:
            raise ValueError("No POD5 file loaded. Call load_pod5() first.")
        read_objs = get_reads_batch(read_ids, sample_name=None)

    # Verify all reads were found
    missing = set(read_ids) - set(read_objs.keys())
    if missing:
        if len(missing) == 1:
            raise ValueError(f"Read not found: {list(missing)[0]}")
        else:
            raise ValueError(f"Reads not found: {list(missing)}")

    # Build reads_data list in original order
    reads_data = [
        (read_id, read_objs[read_id].signal, read_objs[read_id].run_info.sample_rate)
        for read_id in read_ids
    ]

    # Prepare data and options based on mode
    if plot_mode in (PlotMode.OVERLAY, PlotMode.STACKED):
        data = {"reads": reads_data}

        # If using sequence space, we need BAM alignments
        if coordinate_space == "sequence":
            # Determine which BAM file(s) to use
            if read_sample_map:
                # Multi-sample mode: each read may come from a different BAM
                from .alignment import extract_alignment_from_bam

                aligned_reads = []
                for read_id in read_ids:
                    sample_name_for_read = read_sample_map[read_id]
                    sample = squiggy_kernel.get_sample(sample_name_for_read)
                    if not sample or not sample._bam_path:
                        raise ValueError(
                            f"Sequence space requires BAM files. Sample '{sample_name_for_read}' has no BAM file loaded."
                        )
                    aligned_read = extract_alignment_from_bam(sample._bam_path, read_id)
                    if aligned_read is None:
                        raise ValueError(
                            f"No alignment found for read {read_id} in sample '{sample_name_for_read}'."
                        )
                    aligned_reads.append(aligned_read)
            elif sample_name:
                # Single-sample mode: use sample's BAM file
                from .alignment import extract_alignment_from_bam

                sample = squiggy_kernel.get_sample(sample_name)
                if not sample or not sample._bam_path:
                    raise ValueError(
                        f"Sequence space requires a BAM file. Sample '{sample_name}' has no BAM file loaded."
                    )
                aligned_reads = []
                for read_id in read_ids:
                    aligned_read = extract_alignment_from_bam(sample._bam_path, read_id)
                    if aligned_read is None:
                        raise ValueError(
                            f"No alignment found for read {read_id} in BAM file."
                        )
                    aligned_reads.append(aligned_read)
            else:
                # Legacy mode: use global BAM file
                from .alignment import extract_alignment_from_bam

                if squiggy_kernel._bam_path is None:
                    raise ValueError(
                        "Sequence space requires a BAM file. Call load_bam() first."
                    )
                aligned_reads = []
                for read_id in read_ids:
                    aligned_read = extract_alignment_from_bam(
                        squiggy_kernel._bam_path, read_id
                    )
                    if aligned_read is None:
                        raise ValueError(
                            f"No alignment found for read {read_id} in BAM file."
                        )
                    aligned_reads.append(aligned_read)

            # Add aligned reads to data
            data["aligned_reads"] = aligned_reads

        options = {
            "normalization": norm_method,
            "downsample": downsample,
            "show_signal_points": show_signal_points,
            "coordinate_space": coordinate_space,
        }
        # Add read colors if provided (for multi-sample coloring)
        if read_colors:
            options["read_colors"] = read_colors

    elif plot_mode == PlotMode.EVENTALIGN:
        # Event-aligned mode for multiple reads
        # Determine which BAM file to use
        if sample_name:
            # Multi-sample mode: use sample's BAM file
            sample = squiggy_kernel.get_sample(sample_name)
            if not sample or not sample._bam_path:
                raise ValueError(
                    f"EVENTALIGN mode requires a BAM file. Sample '{sample_name}' has no BAM file loaded."
                )
            bam_path = sample._bam_path
        else:
            # Single-file mode: use global BAM file
            if squiggy_kernel._bam_path is None:
                raise ValueError(
                    "EVENTALIGN mode requires a BAM file. Call load_bam() first."
                )
            bam_path = squiggy_kernel._bam_path

        from .alignment import extract_alignment_from_bam

        aligned_reads = []
        for read_id in read_ids:
            aligned_read = extract_alignment_from_bam(bam_path, read_id)
            if aligned_read is None:
                raise ValueError(f"No alignment found for read {read_id} in BAM file.")
            aligned_reads.append(aligned_read)

        data = {
            "reads": reads_data,
            "aligned_reads": aligned_reads,
        }

        options = {
            "normalization": norm_method,
            "downsample": downsample,
            "show_dwell_time": scale_dwell_time,
            "show_labels": show_labels,
            "show_signal_points": show_signal_points,
        }

    else:
        raise ValueError(
            f"Plot mode {plot_mode} not supported for multiple reads. "
            f"Use OVERLAY, STACKED, or EVENTALIGN."
        )

    # Create strategy and generate plot
    strategy = create_plot_strategy(plot_mode, theme_enum)
    html, fig = strategy.create_plot(data, options)

    # Route to Positron Plots pane if running in Positron
    _route_to_plots_pane(fig)

    return html
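
A sketch of STACKED mode on genomic coordinates, assuming every selected read has an alignment in the loaded BAM:

from squiggy import load_pod5, load_bam, get_read_ids, plot_reads

load_pod5("data.pod5")
load_bam("alignments.bam")

ids = get_read_ids()[:5]
# STACKED + sequence space lines reads up on shared genomic coordinates
html = plot_reads(ids, mode="STACKED", coordinate_space="sequence")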

plot_aggregate

plot_aggregate(
    reference_name: str,
    max_reads: int = 100,
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    show_modifications: bool = True,
    mod_filter: dict | None = None,
    min_mod_frequency: float = 0.0,
    min_modified_reads: int = 1,
    show_pileup: bool = True,
    show_dwell_time: bool = True,
    show_signal: bool = True,
    show_quality: bool = True,
    clip_x_to_alignment: bool = True,
    transform_coordinates: bool = True,
    sample_name: str | None = None,
) -> str

Generate aggregate multi-read visualization for a reference sequence

Creates up to five synchronized tracks:

1. Modifications heatmap (optional, if BAM has MM/ML tags)
2. Base pileup (IGV-style stacked bar chart)
3. Dwell time per base (mean ± std dev)
4. Aggregate signal (mean ± std dev across reads)
5. Quality scores by position

Parameters:

- reference_name (str, required): Name of reference sequence from BAM file
- max_reads (int, default 100): Maximum number of reads to sample for aggregation
- normalization (str, default 'ZNORM'): Normalization method (NONE, ZNORM, MEDIAN, MAD)
- theme (str, default 'LIGHT'): Color theme (LIGHT, DARK)
- show_modifications (bool, default True): Show modifications heatmap panel
- mod_filter (dict | None, default None): Dictionary mapping modification codes to minimum probability thresholds (e.g., {'m': 0.8, 'a': 0.7}); if None, all modifications are shown
- min_mod_frequency (float, default 0.0): Minimum fraction of reads that must be modified at a position (0.0-1.0); positions with lower modification frequency are excluded
- min_modified_reads (int, default 1): Minimum number of reads that must have the modification at a position; positions with fewer modified reads are excluded
- show_pileup (bool, default True): Show base pileup panel
- show_dwell_time (bool, default True): Show dwell time panel
- show_signal (bool, default True): Show signal panel
- show_quality (bool, default True): Show quality panel
- clip_x_to_alignment (bool, default True): If True, the x-axis shows only the aligned region; if False, it extends to include soft-clipped regions
- transform_coordinates (bool, default True): If True, transform to 1-based coordinates anchored to the first reference base; if False, use raw genomic coordinates
- sample_name (str | None, default None): (Multi-sample mode) Name of the sample to plot from; if provided, plots from that sample instead of the global session

Returns:

- str: Bokeh HTML string with synchronized tracks

Examples:

>>> import squiggy
>>> squiggy.load_pod5('data.pod5')
>>> squiggy.load_bam('alignments.bam')
>>> html = squiggy.plot_aggregate('chr1', max_reads=50)
>>> # Filter modifications by type and probability
>>> html = squiggy.plot_aggregate('chr1', mod_filter={'m': 0.8, 'a': 0.7})
>>> # Extension displays this automatically

Raises:

- ValueError: If POD5 or BAM files are not loaded

Source code in squiggy/plotting.py
def plot_aggregate(
    reference_name: str,
    max_reads: int = 100,
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    show_modifications: bool = True,
    mod_filter: dict | None = None,
    min_mod_frequency: float = 0.0,
    min_modified_reads: int = 1,
    show_pileup: bool = True,
    show_dwell_time: bool = True,
    show_signal: bool = True,
    show_quality: bool = True,
    clip_x_to_alignment: bool = True,
    transform_coordinates: bool = True,
    sample_name: str | None = None,
) -> str:
    """
    Generate aggregate multi-read visualization for a reference sequence

    Creates up to five synchronized tracks:
    1. Modifications heatmap (optional, if BAM has MM/ML tags)
    2. Base pileup (IGV-style stacked bar chart)
    3. Dwell time per base (mean ± std dev)
    4. Aggregate signal (mean ± std dev across reads)
    5. Quality scores by position

    Args:
        reference_name: Name of reference sequence from BAM file
        max_reads: Maximum number of reads to sample for aggregation (default 100)
        normalization: Normalization method (NONE, ZNORM, MEDIAN, MAD)
        theme: Color theme (LIGHT, DARK)
        show_modifications: Show modifications heatmap panel (default True)
        mod_filter: Dictionary mapping modification codes to minimum probability thresholds
                   (e.g., {'m': 0.8, 'a': 0.7}). If None, all modifications shown.
        min_mod_frequency: Minimum fraction of reads that must be modified at a position (0.0-1.0).
                          Positions with lower modification frequency are excluded (default 0.0).
        min_modified_reads: Minimum number of reads that must have the modification at a position.
                           Positions with fewer modified reads are excluded (default 1).
        show_pileup: Show base pileup panel (default True)
        show_dwell_time: Show dwell time panel (default True)
        show_signal: Show signal panel (default True)
        show_quality: Show quality panel (default True)
        clip_x_to_alignment: If True, x-axis shows only aligned region (default True).
                             If False, x-axis extends to include soft-clipped regions.
        transform_coordinates: If True, transform to 1-based coordinates anchored to first
                               reference base (default True). If False, use raw genomic coordinates.
        sample_name: (Multi-sample mode) Name of the sample to plot from. If provided,
                     plots from that specific sample instead of the global session.

    Returns:
        Bokeh HTML string with synchronized tracks

    Examples:
        >>> import squiggy
        >>> squiggy.load_pod5('data.pod5')
        >>> squiggy.load_bam('alignments.bam')
        >>> html = squiggy.plot_aggregate('chr1', max_reads=50)
        >>> # Filter modifications by type and probability
        >>> html = squiggy.plot_aggregate('chr1', mod_filter={'m': 0.8, 'a': 0.7})
        >>> # Extension displays this automatically

    Raises:
        ValueError: If POD5 or BAM files not loaded
    """

    # Determine which sample to use
    if sample_name:
        # Multi-sample mode: use sample-specific paths
        sample = squiggy_kernel.get_sample(sample_name)
        if not sample or sample._pod5_path is None:
            raise ValueError(f"Sample '{sample_name}' not loaded or has no POD5 file.")
        if not sample._bam_path:
            raise ValueError(
                f"Sample '{sample_name}' has no BAM file loaded. Aggregate plots require alignments."
            )
        pod5_path = sample._pod5_path
        bam_path = sample._bam_path
        fasta_path = sample._fasta_path
    else:
        # Single-file mode: use global session paths
        if squiggy_kernel._reader is None:
            raise ValueError("No POD5 file loaded. Call load_pod5() first.")
        if squiggy_kernel._bam_path is None:
            raise ValueError(
                "No BAM file loaded. Aggregate plots require alignments. Call load_bam() first."
            )
        pod5_path = squiggy_kernel._pod5_path
        bam_path = squiggy_kernel._bam_path
        fasta_path = squiggy_kernel._fasta_path

    # Parse parameters
    params = parse_plot_parameters(normalization=normalization, theme=theme)
    norm_method = params["normalization"]
    theme_enum = params["theme"]

    # Extract reads for this reference (expects file paths, not reader objects)
    reads_data = extract_reads_for_reference(
        pod5_file=pod5_path,
        bam_file=bam_path,
        reference_name=reference_name,
        max_reads=max_reads,
    )

    if not reads_data:
        raise ValueError(
            f"No reads found for reference '{reference_name}'. Check BAM file and reference name."
        )

    num_reads = len(reads_data)

    # Calculate aggregate statistics
    aggregate_stats = calculate_aggregate_signal(reads_data, norm_method)
    pileup_stats = calculate_base_pileup(
        reads_data,
        bam_file=bam_path,
        reference_name=reference_name,
        fasta_file=fasta_path,
    )
    quality_stats = calculate_quality_by_position(reads_data)
    modification_stats = calculate_modification_statistics(
        reads_data,
        mod_filter=mod_filter,
        min_frequency=min_mod_frequency,
        min_modified_reads=min_modified_reads,
    )
    dwell_stats = calculate_dwell_time_statistics(reads_data)

    # Convert to relative coordinates (1-based) for intuitive visualization

    # Store diagnostic info for plot title
    transformation_info = ""

    if transform_coordinates:
        # Anchor to first base of reference sequence (from pileup reference_bases)
        # This ensures x-axis position 1 = first base of the reference sequence
        reference_bases = pileup_stats.get("reference_bases", {})

        if reference_bases:
            # Use first position from reference_bases as anchor
            min_pos = min(reference_bases.keys())
        else:
            # Fallback: find minimum position across all tracks
            all_positions = []
            if "positions" in aggregate_stats and len(aggregate_stats["positions"]) > 0:
                all_positions.extend(list(aggregate_stats["positions"]))
            if "positions" in pileup_stats and len(pileup_stats["positions"]) > 0:
                all_positions.extend(list(pileup_stats["positions"]))
            if "positions" in quality_stats and len(quality_stats["positions"]) > 0:
                all_positions.extend(list(quality_stats["positions"]))

            if not all_positions:
                min_pos = None
            else:
                min_pos = int(np.min(all_positions))

        if min_pos is not None:
            offset = min_pos - 1  # Offset to make positions 1-based

            transformation_info = f"Ref-anchored (genomic pos {min_pos}→1)"

            # Transform aggregate_stats
            if "positions" in aggregate_stats and len(aggregate_stats["positions"]) > 0:
                old = list(aggregate_stats["positions"])
                aggregate_stats["positions"] = np.array([int(p) - offset for p in old])

            # Transform pileup_stats
            if "positions" in pileup_stats and len(pileup_stats["positions"]) > 0:
                old_positions = list(pileup_stats["positions"])
                new_positions = np.array([int(p) - offset for p in old_positions])
                pileup_stats["positions"] = new_positions

                # Remap counts dict - ensure consistent int types
                old_counts = pileup_stats["counts"]
                new_counts = {}
                for p in old_positions:
                    new_p = int(p) - offset
                    # Use int() to ensure Python int type for dictionary keys
                    new_counts[int(new_p)] = old_counts[int(p)]
                pileup_stats["counts"] = new_counts

                # Remap reference_bases dict
                if "reference_bases" in pileup_stats:
                    old_ref = pileup_stats["reference_bases"]
                    new_ref = {}
                    for p in old_positions:
                        old_p = int(p)
                        if old_p in old_ref:
                            new_p = int(old_p - offset)
                            new_ref[new_p] = old_ref[old_p]
                    pileup_stats["reference_bases"] = new_ref

            # Transform quality_stats
            if "positions" in quality_stats and len(quality_stats["positions"]) > 0:
                old = list(quality_stats["positions"])
                quality_stats["positions"] = np.array([int(p) - offset for p in old])

            # Transform modification_stats
            if modification_stats and modification_stats.get("mod_stats"):
                mod_stats = modification_stats["mod_stats"]
                new_mod_stats = {}
                for mod_code, pos_dict in mod_stats.items():
                    new_mod_stats[mod_code] = {}
                    for p, stats in pos_dict.items():
                        new_p = int(int(p) - offset)
                        new_mod_stats[mod_code][new_p] = stats
                modification_stats["mod_stats"] = new_mod_stats

                if "positions" in modification_stats:
                    old = modification_stats["positions"]
                    modification_stats["positions"] = [int(p) - offset for p in old]

            # Transform dwell_stats
            if (
                dwell_stats
                and "positions" in dwell_stats
                and len(dwell_stats["positions"]) > 0
            ):
                old = list(dwell_stats["positions"])
                dwell_stats["positions"] = np.array([int(p) - offset for p in old])

    # Prepare data for AggregatePlotStrategy
    data = {
        "aggregate_stats": aggregate_stats,
        "pileup_stats": pileup_stats,
        "quality_stats": quality_stats,
        "modification_stats": modification_stats,
        "dwell_stats": dwell_stats,
        "reference_name": reference_name,
        "num_reads": num_reads,
        "transformation_info": transformation_info,  # Diagnostic info
    }

    options = {
        "normalization": norm_method,
        "show_modifications": show_modifications,
        "show_pileup": show_pileup,
        "show_dwell_time": show_dwell_time,
        "show_signal": show_signal,
        "show_quality": show_quality,
        "clip_x_to_alignment": clip_x_to_alignment,
    }

    # Create strategy and generate plot
    strategy = create_plot_strategy(PlotMode.AGGREGATE, theme_enum)
    html, grid = strategy.create_plot(data, options)

    # Route to Positron Plots pane if running in Positron
    _route_to_plots_pane(grid)

    return html
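
The rebasing above subtracts offset = min_pos - 1 from every positions array and from every position-keyed dict (counts, reference_bases, mod_stats), so the leftmost aligned genomic position becomes 1 across all tracks. A minimal sketch of the same remapping on toy values (not part of the Squiggy API):

>>> import numpy as np
>>> positions = np.array([1052, 1053, 1055])
>>> counts = {1052: 10, 1053: 12, 1055: 9}
>>> offset = int(positions.min()) - 1
>>> (positions - offset).tolist()
[1, 2, 4]
>>> {p - offset: c for p, c in counts.items()}
{1: 10, 2: 12, 4: 9}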

Multi-Sample Plotting

plot_motif_aggregate_all

plot_motif_aggregate_all(
    fasta_file: str,
    motif: str,
    upstream: int | None = None,
    downstream: int | None = None,
    max_reads_per_motif: int = 100,
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    strand: str = "both",
) -> str

Generate aggregate multi-read visualization across ALL motif matches

Creates a three-track plot showing aggregate statistics from reads aligned to ALL instances of the motif in the genome. The x-axis is centered on the motif position with configurable upstream/downstream windows.

This function combines reads from all motif matches into one aggregate view, providing a genome-wide perspective on signal patterns around the motif.

Parameters:

Name Type Description Default
fasta_file str

Path to indexed FASTA file (.fai required)

required
motif str

IUPAC nucleotide pattern (e.g., "DRACH", "YGCY")

required
upstream int | None

Number of bases upstream (5') of motif center (default=10)

None
downstream int | None

Number of bases downstream (3') of motif center (default=10)

None
max_reads_per_motif int

Maximum reads per motif match (default=100)

100
normalization str

Normalization method (NONE, ZNORM, MEDIAN, MAD)

'ZNORM'
theme str

Color theme (LIGHT, DARK)

'LIGHT'
strand str

Which strand to search ('+', '-', or 'both')

'both'

Returns:

Type Description
str

Bokeh HTML string with three synchronized tracks showing aggregate statistics across all motif instances

Examples:

>>> import squiggy
>>> squiggy.load_pod5('data.pod5')
>>> squiggy.load_bam('alignments.bam')
>>> html = squiggy.plot_motif_aggregate_all(
...     fasta_file='genome.fa',
...     motif='DRACH',
...     upstream=20,
...     downstream=50
... )
>>> # Extension displays this automatically

Raises:

Type Description
ValueError

If POD5/BAM not loaded or no motif matches found

Source code in squiggy/plotting.py
def plot_motif_aggregate_all(
    fasta_file: str,
    motif: str,
    upstream: int | None = None,
    downstream: int | None = None,
    max_reads_per_motif: int = 100,
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    strand: str = "both",
) -> str:
    """
    Generate aggregate multi-read visualization across ALL motif matches

    Creates a three-track plot showing aggregate statistics from reads aligned
    to ALL instances of the motif in the genome. The x-axis is centered on the
    motif position with configurable upstream/downstream windows.

    This function combines reads from all motif matches into one aggregate view,
    providing a genome-wide perspective on signal patterns around the motif.

    Args:
        fasta_file: Path to indexed FASTA file (.fai required)
        motif: IUPAC nucleotide pattern (e.g., "DRACH", "YGCY")
        upstream: Number of bases upstream (5') of motif center (default=10)
        downstream: Number of bases downstream (3') of motif center (default=10)
        max_reads_per_motif: Maximum reads per motif match (default=100)
        normalization: Normalization method (NONE, ZNORM, MEDIAN, MAD)
        theme: Color theme (LIGHT, DARK)
        strand: Which strand to search ('+', '-', or 'both')

    Returns:
        Bokeh HTML string with three synchronized tracks showing aggregate
        statistics across all motif instances

    Examples:
        >>> import squiggy
        >>> squiggy.load_pod5('data.pod5')
        >>> squiggy.load_bam('alignments.bam')
        >>> html = squiggy.plot_motif_aggregate_all(
        ...     fasta_file='genome.fa',
        ...     motif='DRACH',
        ...     upstream=20,
        ...     downstream=50
        ... )
        >>> # Extension displays this automatically

    Raises:
        ValueError: If POD5/BAM not loaded or no motif matches found
    """

    # Apply defaults if not specified
    if upstream is None:
        upstream = DEFAULT_MOTIF_WINDOW_UPSTREAM
    if downstream is None:
        downstream = DEFAULT_MOTIF_WINDOW_DOWNSTREAM

    # Validate state
    if squiggy_kernel._reader is None:
        raise ValueError("No POD5 file loaded. Call load_pod5() first.")
    if squiggy_kernel._bam_path is None:
        raise ValueError(
            "No BAM file loaded. Motif aggregate plots require alignments. "
            "Call load_bam() first."
        )

    # Parse parameters
    params = parse_plot_parameters(normalization=normalization, theme=theme)
    norm_method = params["normalization"]
    theme_enum = params["theme"]

    # Search for all motif matches
    matches = list(search_motif(fasta_file, motif, strand=strand))

    if not matches:
        raise ValueError(f"No matches found for motif '{motif}' in FASTA file")

    # Extract and align reads from all motif matches
    all_aligned_reads = []
    num_matches_with_reads = 0

    for match_index, _motif_match in enumerate(matches):
        try:
            # Extract reads for this motif match
            reads_data, _ = extract_reads_for_motif(
                pod5_file=squiggy_kernel._pod5_path,
                bam_file=squiggy_kernel._bam_path,
                fasta_file=fasta_file,
                motif=motif,
                match_index=match_index,
                upstream=upstream,
                downstream=downstream,
                max_reads=max_reads_per_motif,
            )

            if reads_data:
                # Reads are already clipped and in motif-relative coordinates from extract_reads_for_motif()
                all_aligned_reads.extend(reads_data)
                num_matches_with_reads += 1

        except Exception:
            # Skip motif matches that fail (e.g., no reads, edge of chromosome)
            continue

    if not all_aligned_reads:
        raise ValueError(
            f"No reads found overlapping any of {len(matches)} motif matches. "
            "Try a different motif or increase window size."
        )

    num_reads = len(all_aligned_reads)

    # Reads are already clipped to the window in extract_reads_for_motif()
    # Calculate aggregate statistics across all reads
    aggregate_stats = calculate_aggregate_signal(all_aligned_reads, norm_method)

    # Calculate base pileup across all reads
    pileup_stats = calculate_base_pileup(
        all_aligned_reads,
        bam_file=None,  # Reads are in motif-relative coordinates
        reference_name=None,
        fasta_file=fasta_file,  # Use FASTA for accurate reference sequence
    )

    # Add motif sequence as reference bases for display
    # Center the motif in the coordinate system
    motif_length = len(motif)
    motif_center = 0  # Motif is centered at position 0 in motif-relative coordinates
    motif_start = motif_center - motif_length // 2

    # Create reference_bases dict mapping positions to motif letters
    reference_bases = {}
    for i, base in enumerate(motif.upper()):
        pos = motif_start + i
        reference_bases[pos] = base

    # Add to pileup_stats
    pileup_stats["reference_bases"] = reference_bases

    quality_stats = calculate_quality_by_position(all_aligned_reads)

    # Generate plot with informative title
    plot_title = (
        f"{motif} motif aggregate ({num_matches_with_reads}/{len(matches)} matches, "
        f"{num_reads} reads, -{upstream}bp to +{downstream}bp)"
    )

    # Prepare data for AggregatePlotStrategy
    data = {
        "aggregate_stats": aggregate_stats,
        "pileup_stats": pileup_stats,
        "quality_stats": quality_stats,
        "reference_name": plot_title,
        "num_reads": num_reads,
    }

    # Highlight motif positions (make them bold)
    motif_positions_set = set(range(motif_start, motif_start + motif_length))

    options = {"normalization": norm_method, "motif_positions": motif_positions_set}

    # Create strategy and generate plot
    strategy = create_plot_strategy(PlotMode.AGGREGATE, theme_enum)
    html, grid = strategy.create_plot(data, options)

    # Update x-axis title to reflect motif-relative coordinates
    # The aggregate plot strategy already handles the coordinate system,
    # but we can add a note about the window in the title

    # Route to Positron Plots pane if running in Positron
    _route_to_plots_pane(grid)

    return html
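
As a further illustration, the same call restricted to the minus strand with an asymmetric window (file names are placeholders):

>>> html = squiggy.plot_motif_aggregate_all(
...     fasta_file='genome.fa',
...     motif='YGCY',
...     upstream=10,
...     downstream=30,
...     strand='-',
...     normalization='MEDIAN',
... )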

plot_signal_overlay_comparison

plot_signal_overlay_comparison(
    sample_names: list[str],
    reference_name: str | None = None,
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    max_reads: int | None = None,
) -> str

Generate signal overlay comparison plot for multiple samples

Creates a visualization overlaying raw signals from 2+ samples, each with a distinct color from the Okabe-Ito palette. Includes:

1. Signal Overlay Track: All sample signals overlaid with one color per sample
2. Coverage Track: Read count per position for each sample
3. Reference Display: Nucleotide sequence annotations below the signal track

Parameters:

Name Type Description Default
sample_names list[str]

List of sample names to compare (minimum 2 required)

required
reference_name str | None

Optional reference name (auto-detected from first sample's BAM)

None
normalization str

Normalization method (NONE, ZNORM, MEDIAN, MAD) - default ZNORM

'ZNORM'
theme str

Color theme (LIGHT, DARK)

'LIGHT'
max_reads int | None

Maximum reads per sample to load (default: min of available reads, capped at 100)

None

Returns:

Type Description
str

Bokeh HTML string with signal overlay comparison visualization

Examples:

>>> import squiggy
>>> squiggy.load_sample('alanine', 'ala_subset.pod5', 'ala_subset.aln.bam')
>>> squiggy.load_sample('arginine', 'arg_subset.pod5', 'arg_subset.aln.bam')
>>> html = squiggy.plot_signal_overlay_comparison(
...     ['alanine', 'arginine'],
...     normalization='ZNORM'
... )
>>> # Extension displays this automatically

Raises:

Type Description
ValueError

If fewer than 2 samples provided, samples not found, or missing BAM files

Source code in squiggy/plotting.py
def plot_signal_overlay_comparison(
    sample_names: list[str],
    reference_name: str | None = None,
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    max_reads: int | None = None,
) -> str:
    """
    Generate signal overlay comparison plot for multiple samples

    Creates a visualization overlaying raw signals from 2+ samples, each with
    a distinct color from the Okabe-Ito palette. Includes:
    1. Signal Overlay Track: All sample signals overlaid with color per sample
    2. Coverage Track: Read count per position for each sample
    3. Reference Display: Nucleotide sequence annotations below signal track

    Args:
        sample_names: List of sample names to compare (minimum 2 required)
        reference_name: Optional reference name (auto-detected from first sample's BAM)
        normalization: Normalization method (NONE, ZNORM, MEDIAN, MAD) - default ZNORM
        theme: Color theme (LIGHT, DARK)
        max_reads: Maximum reads per sample to load (default: min of available reads, capped at 100)

    Returns:
        Bokeh HTML string with signal overlay comparison visualization

    Examples:
        >>> import squiggy
        >>> squiggy.load_sample('alanine', 'ala_subset.pod5', 'ala_subset.aln.bam')
        >>> squiggy.load_sample('arginine', 'arg_subset.pod5', 'arg_subset.aln.bam')
        >>> html = squiggy.plot_signal_overlay_comparison(
        ...     ['alanine', 'arginine'],
        ...     normalization='ZNORM'
        ... )
        >>> # Extension displays this automatically

    Raises:
        ValueError: If fewer than 2 samples provided, samples not found, or missing BAM files
    """

    # Validate input
    if len(sample_names) < 2:
        raise ValueError(
            f"Signal overlay comparison requires at least 2 samples, got {len(sample_names)}"
        )

    # Get samples
    samples = []
    for name in sample_names:
        sample = squiggy_kernel.get_sample(name)
        if sample is None:
            raise ValueError(f"Sample '{name}' not found")
        samples.append(sample)

    # Validate all samples have POD5 and BAM loaded
    for sample in samples:
        if sample._pod5_reader is None:
            raise ValueError(f"Sample '{sample.name}' must have POD5 file loaded")
        if sample._bam_path is None:
            raise ValueError(
                f"Sample '{sample.name}' must have BAM file loaded. "
                "BAM files are required to align signals to reference positions."
            )

    # Parse parameters
    params = parse_plot_parameters(normalization=normalization, theme=theme)
    norm_method = params["normalization"]
    theme_enum = params["theme"]

    # Determine reference name
    if reference_name is None:
        # Get first reference from first sample's BAM
        if not samples[0]._bam_info or "references" not in samples[0]._bam_info:
            raise ValueError("BAM file must contain reference information")

        references = samples[0]._bam_info["references"]
        if not references:
            raise ValueError("No references found in BAM file")

        reference_name = references[0]["name"]

    # Validate all samples have same reference (Issue #121)
    samples_missing_reference = []
    for sample in samples:
        if not sample._bam_info or "references" not in sample._bam_info:
            samples_missing_reference.append((sample.name, "no BAM metadata"))
            continue

        refs = sample._bam_info["references"]
        ref_names = [r["name"] for r in refs]

        if reference_name not in ref_names:
            available_refs = ", ".join(ref_names[:3])
            if len(ref_names) > 3:
                available_refs += f", ... ({len(ref_names)} total)"
            samples_missing_reference.append(
                (sample.name, available_refs or "no references")
            )

    # If any samples are missing the reference, raise a clear error
    if samples_missing_reference:
        error_lines = [
            f"Cannot plot signal overlay comparison: Some samples do not have reads aligned to reference '{reference_name}':",
            "",
        ]
        for sample_name, available in samples_missing_reference:
            error_lines.append(f"  • Sample '{sample_name}': has {available}")

        error_lines.extend(
            [
                "",
                "Suggestion: Check which references each sample has in the Samples panel,",
                "or ensure all samples are aligned to the same reference genome before loading.",
            ]
        )
        raise ValueError("\n".join(error_lines))

    # Determine max_reads if not provided
    if max_reads is None:
        # Calculate available reads per sample and use minimum

        available_reads_per_sample = []
        for sample in samples:
            try:
                available = get_available_reads_for_reference(
                    bam_file=sample._bam_path,
                    reference_name=reference_name,
                )
                available_reads_per_sample.append(available)
            except Exception:
                available_reads_per_sample.append(100)  # Fallback

        # Use minimum available, capped at 100
        max_reads = (
            min(available_reads_per_sample) if available_reads_per_sample else 100
        )
        max_reads = min(max_reads, 100)  # Cap at 100 for performance

    # Extract aligned reads for each sample
    plot_data = []
    coverage_data = {}

    for sample in samples:
        reads = extract_reads_for_reference(
            pod5_file=sample._pod5_path,
            bam_file=sample._bam_path,
            reference_name=reference_name,
            max_reads=max_reads,
            random_sample=True,
        )

        if not reads:
            raise ValueError(
                f"No reads found for sample '{sample.name}' on reference '{reference_name}'"
            )

        # Calculate aggregate signal and coverage for this sample

        agg_stats = calculate_aggregate_signal(reads, norm_method)

        plot_data.append(
            {
                "name": sample.name,
                "positions": agg_stats.get(
                    "positions", np.arange(len(agg_stats.get("mean_signal", [])))
                ),
                "signal": agg_stats.get("mean_signal", np.array([])),
            }
        )

        coverage_data[sample.name] = agg_stats.get(
            "coverage", [1] * len(agg_stats.get("mean_signal", []))
        )

    # Get reference sequence (FASTA-first pattern)
    reference_sequence = ""
    if plot_data:
        # Determine genomic region from plot data
        all_positions = []
        for data in plot_data:
            positions = data.get("positions", [])
            if len(positions) > 0:
                all_positions.extend(positions)

        if all_positions:
            min_pos = int(min(all_positions))
            max_pos = int(max(all_positions))

            # Try FASTA first (most accurate and complete)
            first_sample = samples[0]
            if first_sample._fasta_path:
                try:
                    import pysam

                    fasta = pysam.FastaFile(first_sample._fasta_path)
                    reference_sequence = fasta.fetch(
                        reference_name, min_pos, max_pos + 1
                    )
                    fasta.close()
                except Exception:
                    # FASTA fetch failed, will fall back to BAM
                    pass

            # Fallback to BAM reconstruction if FASTA unavailable
            if not reference_sequence:
                try:
                    reads = extract_reads_for_reference(
                        pod5_file=first_sample._pod5_path,
                        bam_file=first_sample._bam_path,
                        reference_name=reference_name,
                        max_reads=1,
                    )
                    if reads:
                        reference_sequence = (
                            reads[0].get("reference_sequence", "") or ""
                        )
                except Exception:
                    # If we can't get reference sequence, continue without it
                    pass

    # Prepare data for plot strategy
    data = {
        "samples": plot_data,
        "reference_sequence": reference_sequence,
        "coverage": coverage_data,
    }

    options = {"normalization": norm_method}

    # Create strategy and generate plot
    strategy = create_plot_strategy(PlotMode.SIGNAL_OVERLAY_COMPARISON, theme_enum)
    html, grid = strategy.create_plot(data, options)

    # Route to Positron Plots pane if running in Positron
    _route_to_plots_pane(grid)

    return html
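
For example, pinning the comparison to a specific reference and limiting reads per sample (sample and reference names are placeholders):

>>> html = squiggy.plot_signal_overlay_comparison(
...     ['alanine', 'arginine', 'glycine'],
...     reference_name='chr1',
...     max_reads=50,
... )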

plot_delta_comparison

plot_delta_comparison(
    sample_names: list[str],
    reference_name: str = "Default",
    normalization: str = "NONE",
    theme: str = "LIGHT",
    max_reads: int | None = None,
) -> str

Generate delta comparison plot between two or more samples

Creates a visualization showing differences in aggregate statistics between samples. Shows:

1. Delta Signal Track: Mean signal differences (B - A)
2. Delta Stats Track: Coverage comparisons

Parameters:

Name Type Description Default
sample_names list[str]

List of sample names to compare (minimum 2 required)

required
reference_name str

Reference name to compare on; "Default" auto-detects the first reference in sample A's BAM

'Default'
normalization str

Normalization method (NONE, ZNORM, MEDIAN, MAD)

'NONE'
theme str

Color theme (LIGHT, DARK)

'LIGHT'
max_reads int | None

Maximum reads per sample to load (default: min of available reads, capped at 100)

None

Returns:

Type Description
str

Bokeh HTML string with delta comparison visualization

Examples:

>>> import squiggy
>>> squiggy.load_sample('v4.2', 'data_v4.2.pod5', 'align_v4.2.bam')
>>> squiggy.load_sample('v5.0', 'data_v5.0.pod5', 'align_v5.0.bam')
>>> html = squiggy.plot_delta_comparison(['v4.2', 'v5.0'])
>>> # Extension displays this automatically

Raises:

Type Description
ValueError

If fewer than 2 samples provided or samples not found

Source code in squiggy/plotting.py
def plot_delta_comparison(
    sample_names: list[str],
    reference_name: str = "Default",
    normalization: str = "NONE",
    theme: str = "LIGHT",
    max_reads: int | None = None,
) -> str:
    """
    Generate delta comparison plot between two or more samples

    Creates a visualization showing differences in aggregate statistics
    between samples. Shows:
    1. Delta Signal Track: Mean signal differences (B - A)
    2. Delta Stats Track: Coverage comparisons

    Args:
        sample_names: List of sample names to compare (minimum 2 required)
        reference_name: Reference name to compare on; "Default" (the default) auto-detects the first reference in sample A's BAM
        normalization: Normalization method (NONE, ZNORM, MEDIAN, MAD)
        theme: Color theme (LIGHT, DARK)
        max_reads: Maximum reads per sample to load (default: min of available reads, capped at 100)

    Returns:
        Bokeh HTML string with delta comparison visualization

    Examples:
        >>> import squiggy
        >>> squiggy.load_sample('v4.2', 'data_v4.2.pod5', 'align_v4.2.bam')
        >>> squiggy.load_sample('v5.0', 'data_v5.0.pod5', 'align_v5.0.bam')
        >>> html = squiggy.plot_delta_comparison(['v4.2', 'v5.0'])
        >>> # Extension displays this automatically

    Raises:
        ValueError: If fewer than 2 samples provided or samples not found
    """

    # Validate input
    if len(sample_names) < 2:
        raise ValueError("Delta comparison requires at least 2 samples")

    # Get samples
    samples = []
    for name in sample_names:
        sample = squiggy_kernel.get_sample(name)
        if sample is None:
            raise ValueError(f"Sample '{name}' not found")
        samples.append(sample)

    # For now, use first two samples for comparison (A vs B)
    sample_a = samples[0]
    sample_b = samples[1]

    # Validate both samples have POD5 and BAM loaded
    if sample_a._pod5_reader is None or sample_b._pod5_reader is None:
        raise ValueError("Both samples must have POD5 files loaded")

    if sample_a._bam_path is None or sample_b._bam_path is None:
        raise ValueError(
            "Both samples must have BAM files loaded for delta comparison. "
            "BAM files are required to align signals to reference positions."
        )

    # Parse parameters
    params = parse_plot_parameters(normalization=normalization, theme=theme)
    norm_method = params["normalization"]
    theme_enum = params["theme"]

    # Auto-detect the reference from sample A's BAM when the caller kept the
    # default (assumes both samples have the same reference genome)
    if reference_name == "Default":
        if not sample_a._bam_info or "references" not in sample_a._bam_info:
            raise ValueError("BAM file must be loaded with reference information")

        references = sample_a._bam_info["references"]
        if not references:
            raise ValueError("No references found in BAM file")

        reference_name = references[0]["name"]

    # Determine max_reads if not provided
    if max_reads is None:
        # Calculate available reads per sample and use minimum

        available_reads_per_sample = []
        for sample in [sample_a, sample_b]:
            try:
                available = get_available_reads_for_reference(
                    bam_file=sample._bam_path,
                    reference_name=reference_name,
                )
                available_reads_per_sample.append(available)
            except Exception:
                available_reads_per_sample.append(100)  # Fallback

        # Use minimum available, capped at 100
        max_reads = (
            min(available_reads_per_sample) if available_reads_per_sample else 100
        )
        max_reads = min(max_reads, 100)  # Cap at 100 for performance

    # Extract aligned reads from both samples using the proper utility function

    reads_a = extract_reads_for_reference(
        pod5_file=sample_a._pod5_path,
        bam_file=sample_a._bam_path,
        reference_name=reference_name,
        max_reads=max_reads,
        random_sample=True,
    )

    reads_b = extract_reads_for_reference(
        pod5_file=sample_b._pod5_path,
        bam_file=sample_b._bam_path,
        reference_name=reference_name,
        max_reads=max_reads,
        random_sample=True,
    )

    if not reads_a:
        raise ValueError(f"No reads found for sample A on reference '{reference_name}'")
    if not reads_b:
        raise ValueError(f"No reads found for sample B on reference '{reference_name}'")

    # Calculate aggregate statistics for both samples
    stats_a = calculate_aggregate_signal(reads_a, norm_method)
    stats_b = calculate_aggregate_signal(reads_b, norm_method)

    # Calculate deltas
    delta_stats = calculate_delta_stats(stats_a, stats_b)

    # Prepare data for DeltaPlotStrategy
    # Use positions from delta_stats (already truncated to match delta arrays)
    positions = delta_stats.get("positions")
    if positions is None:
        # Fallback: create position array matching delta length
        delta_signal = delta_stats.get("delta_mean_signal", [])
        positions = np.arange(len(delta_signal))

    data = {
        "positions": positions,
        "delta_mean_signal": delta_stats.get("delta_mean_signal", np.array([])),
        "delta_std_signal": delta_stats.get("delta_std_signal", np.array([])),
        "sample_a_name": sample_a.name,
        "sample_b_name": sample_b.name,
        "sample_a_coverage": stats_a.get("coverage", [1] * len(positions)),
        "sample_b_coverage": stats_b.get("coverage", [1] * len(positions)),
    }

    options = {"normalization": norm_method}

    # Create strategy and generate plot
    strategy = create_plot_strategy(PlotMode.DELTA, theme_enum)
    html, grid = strategy.create_plot(data, options)

    # Route to Positron Plots pane if running in Positron
    _route_to_plots_pane(grid)

    return html
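
Because raw current levels differ between runs, a normalized delta is often easier to interpret; for example (sample names are placeholders):

>>> html = squiggy.plot_delta_comparison(
...     ['v4.2', 'v5.0'],
...     normalization='ZNORM',
...     max_reads=50,
... )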

Multi-Sample Management

load_sample

load_sample(
    name: str,
    pod5_path: str,
    bam_path: str | None = None,
    fasta_path: str | None = None,
) -> Sample

Load a POD5/BAM/FASTA sample set into the global session

Convenience function that loads a named sample into the global squiggy_kernel.

Parameters:

Name Type Description Default
name str

Unique identifier for this sample (e.g., 'model_v4.2')

required
pod5_path str

Path to POD5 file

required
bam_path str | None

Path to BAM file (optional)

None
fasta_path str | None

Path to FASTA file (optional)

None

Returns:

Type Description
Sample

The created Sample object

Examples:

>>> from squiggy import load_sample
>>> sample = load_sample('v4.2', 'data_v4.2.pod5', 'align_v4.2.bam')
>>> print(f"Loaded {len(sample._read_ids)} reads")
Source code in squiggy/io.py
def load_sample(
    name: str,
    pod5_path: str,
    bam_path: str | None = None,
    fasta_path: str | None = None,
) -> Sample:
    """
    Load a POD5/BAM/FASTA sample set into the global session

    Convenience function that loads a named sample into the global squiggy_kernel.

    Args:
        name: Unique identifier for this sample (e.g., 'model_v4.2')
        pod5_path: Path to POD5 file
        bam_path: Path to BAM file (optional)
        fasta_path: Path to FASTA file (optional)

    Returns:
        The created Sample object

    Examples:
        >>> from squiggy import load_sample
        >>> sample = load_sample('v4.2', 'data_v4.2.pod5', 'align_v4.2.bam')
        >>> print(f"Loaded {len(sample._read_ids)} reads")
    """
    return squiggy_kernel.load_sample(name, pod5_path, bam_path, fasta_path)
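
A FASTA reference can be attached at load time as well; the paths below are placeholders:

>>> sample = load_sample(
...     'ctrl',
...     'ctrl.pod5',
...     bam_path='ctrl.aln.bam',
...     fasta_path='genome.fa',  # requires genome.fa.fai alongside
... )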

get_sample

get_sample(name: str) -> Sample | None

Get a loaded sample by name from the global session

Parameters:

Name Type Description Default
name str

Sample name

required

Returns:

Type Description
Sample | None

Sample object or None if not found

Examples:

>>> from squiggy import get_sample
>>> sample = get_sample('model_v4.2')
Source code in squiggy/io.py
def get_sample(name: str) -> Sample | None:
    """
    Get a loaded sample by name from the global session

    Args:
        name: Sample name

    Returns:
        Sample object or None if not found

    Examples:
        >>> from squiggy import get_sample
        >>> sample = get_sample('model_v4.2')
    """
    return squiggy_kernel.get_sample(name)

list_samples

list_samples() -> list[str]

List all loaded sample names in the global session

Returns:

Type Description
list[str]

List of sample names

Examples:

>>> from squiggy import list_samples
>>> names = list_samples()
>>> print(f"Loaded samples: {names}")
Source code in squiggy/io.py
def list_samples() -> list[str]:
    """
    List all loaded sample names in the global session

    Returns:
        List of sample names

    Examples:
        >>> from squiggy import list_samples
        >>> names = list_samples()
        >>> print(f"Loaded samples: {names}")
    """
    return squiggy_kernel.list_samples()

remove_sample

remove_sample(name: str) -> None

Unload a sample from the global session and free its resources

Parameters:

Name Type Description Default
name str

Sample name to remove

required

Examples:

>>> from squiggy import remove_sample
>>> remove_sample('model_v4.2')
Source code in squiggy/io.py
def remove_sample(name: str) -> None:
    """
    Unload a sample from the global session and free its resources

    Args:
        name: Sample name to remove

    Examples:
        >>> from squiggy import remove_sample
        >>> remove_sample('model_v4.2')
    """
    squiggy_kernel.remove_sample(name)

close_all_samples

close_all_samples() -> None

Close all samples and clear the global session

Examples:

>>> from squiggy import close_all_samples
>>> close_all_samples()
Source code in squiggy/io.py
def close_all_samples() -> None:
    """
    Close all samples and clear the global session

    Examples:
        >>> from squiggy import close_all_samples
        >>> close_all_samples()
    """
    squiggy_kernel.close_all()

get_common_reads

get_common_reads(sample_names: list[str]) -> set[str]

Get reads that are present in all specified samples

Finds the intersection of read IDs across multiple samples.

Parameters:

Name Type Description Default
sample_names list[str]

List of sample names to compare

required

Returns:

Type Description
set[str]

Set of read IDs present in all samples

Raises:

Type Description
ValueError

If any sample name not found

Examples:

>>> from squiggy import get_common_reads
>>> common = get_common_reads(['model_v4.2', 'model_v5.0'])
>>> print(f"Common reads: {len(common)}")
Source code in squiggy/io.py
def get_common_reads(sample_names: list[str]) -> set[str]:
    """
    Get reads that are present in all specified samples

    Finds the intersection of read IDs across multiple samples.

    Args:
        sample_names: List of sample names to compare

    Returns:
        Set of read IDs present in all samples

    Raises:
        ValueError: If any sample name not found

    Examples:
        >>> from squiggy import get_common_reads
        >>> common = get_common_reads(['model_v4.2', 'model_v5.0'])
        >>> print(f"Common reads: {len(common)}")
    """
    if not sample_names:
        return set()

    # Get first sample
    first_sample = squiggy_kernel.get_sample(sample_names[0])
    if first_sample is None:
        raise ValueError(f"Sample '{sample_names[0]}' not found")

    # Start with reads from first sample
    common = set(first_sample._read_ids)

    # Intersect with remaining samples
    for name in sample_names[1:]:
        sample = squiggy_kernel.get_sample(name)
        if sample is None:
            raise ValueError(f"Sample '{name}' not found")
        common &= set(sample._read_ids)

    return common
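
One common follow-up is to express the overlap as a fraction of each sample's reads (sample names are placeholders):

>>> common = get_common_reads(['model_v4.2', 'model_v5.0'])
>>> sample = get_sample('model_v4.2')
>>> frac = len(common) / len(sample._read_ids)
>>> print(f"{frac:.1%} of model_v4.2 reads are shared")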

get_unique_reads

get_unique_reads(
    sample_name: str,
    exclude_samples: list[str] | None = None,
) -> set[str]

Get reads unique to a sample (not in other samples)

Finds reads that are only in the specified sample.

Parameters:

Name Type Description Default
sample_name str

Sample to find unique reads for

required
exclude_samples list[str] | None

Samples whose reads should be excluded (default: all other loaded samples)

None

Returns:

Type Description
set[str]

Set of read IDs unique to the sample

Raises:

Type Description
ValueError

If sample not found

Examples:

>>> from squiggy import get_unique_reads
>>> unique_a = get_unique_reads('model_v4.2')
>>> unique_b = get_unique_reads('model_v5.0')
Source code in squiggy/io.py
def get_unique_reads(
    sample_name: str, exclude_samples: list[str] | None = None
) -> set[str]:
    """
    Get reads unique to a sample (not in other samples)

    Finds reads that are only in the specified sample.

    Args:
        sample_name: Sample to find unique reads for
        exclude_samples: Samples whose reads should be excluded (default: all other loaded samples)

    Returns:
        Set of read IDs unique to the sample

    Raises:
        ValueError: If sample not found

    Examples:
        >>> from squiggy import get_unique_reads
        >>> unique_a = get_unique_reads('model_v4.2')
        >>> unique_b = get_unique_reads('model_v5.0')
    """
    sample = squiggy_kernel.get_sample(sample_name)
    if sample is None:
        raise ValueError(f"Sample '{sample_name}' not found")

    sample_reads = set(sample._read_ids)

    # Determine which samples to exclude
    if exclude_samples is None:
        # Exclude all other samples
        exclude_samples = [
            name for name in squiggy_kernel.list_samples() if name != sample_name
        ]

    # Remove reads that appear in any excluded sample
    for exclude_name in exclude_samples:
        exclude_sample = squiggy_kernel.get_sample(exclude_name)
        if exclude_sample is None:
            raise ValueError(f"Sample '{exclude_name}' not found")
        sample_reads -= set(exclude_sample._read_ids)

    return sample_reads
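
For a pairwise comparison, exclude_samples can be given explicitly instead of defaulting to all other loaded samples:

>>> unique = get_unique_reads('model_v4.2', exclude_samples=['model_v5.0'])
>>> print(f"{len(unique)} reads only in model_v4.2")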

compare_samples

compare_samples(sample_names: list[str]) -> dict

Compare multiple samples and return analysis

Generates a comprehensive comparison of samples including read overlap, reference validation, and model provenance information.

Parameters:

Name Type Description Default
sample_names list[str]

List of sample names to compare

required

Returns:

Type Description
dict

Dict with comparison results:
- samples: List of sample names
- read_overlap: Read ID overlap analysis
- reference_validation: Reference compatibility (if BAM files loaded)
- sample_info: Basic info about each sample

Examples:

>>> from squiggy import compare_samples
>>> result = compare_samples(['model_v4.2', 'model_v5.0'])
>>> print(result['read_overlap'])
Source code in squiggy/io.py
def compare_samples(sample_names: list[str]) -> dict:
    """
    Compare multiple samples and return analysis

    Generates a comprehensive comparison of samples including read overlap,
    reference validation, and model provenance information.

    Args:
        sample_names: List of sample names to compare

    Returns:
        Dict with comparison results:
            - samples: List of sample names
            - read_overlap: Read ID overlap analysis
            - reference_validation: Reference compatibility (if BAM files loaded)
            - sample_info: Basic info about each sample

    Examples:
        >>> from squiggy import compare_samples
        >>> result = compare_samples(['model_v4.2', 'model_v5.0'])
        >>> print(result['read_overlap'])
    """
    from .utils import compare_read_sets, validate_sq_headers

    # Validate samples exist
    for name in sample_names:
        if squiggy_kernel.get_sample(name) is None:
            raise ValueError(f"Sample '{name}' not found")

    result = {
        "samples": sample_names,
        "sample_info": {},
        "read_overlap": {},
    }

    # Add basic info about each sample
    for name in sample_names:
        sample = squiggy_kernel.get_sample(name)
        result["sample_info"][name] = {
            "num_reads": len(sample._read_ids),
            "pod5_path": sample._pod5_path,
            "bam_path": sample._bam_path,
        }

    # Compare read sets for all pairs
    if len(sample_names) >= 2:
        for i, name_a in enumerate(sample_names):
            for name_b in sample_names[i + 1 :]:
                sample_a = squiggy_kernel.get_sample(name_a)
                sample_b = squiggy_kernel.get_sample(name_b)
                pair_key = f"{name_a}_vs_{name_b}"
                result["read_overlap"][pair_key] = compare_read_sets(
                    sample_a._read_ids, sample_b._read_ids
                )

    # Validate references if BAM files are loaded
    if len(sample_names) >= 2:
        bam_pairs = []
        for i, name_a in enumerate(sample_names):
            for name_b in sample_names[i + 1 :]:
                sample_a = squiggy_kernel.get_sample(name_a)
                sample_b = squiggy_kernel.get_sample(name_b)
                if sample_a._bam_path and sample_b._bam_path:
                    bam_pairs.append(
                        (name_a, name_b, sample_a._bam_path, sample_b._bam_path)
                    )

        if bam_pairs:
            result["reference_validation"] = {}
            for name_a, name_b, bam_a, bam_b in bam_pairs:
                pair_key = f"{name_a}_vs_{name_b}"
                try:
                    validation = validate_sq_headers(bam_a, bam_b)
                    result["reference_validation"][pair_key] = validation
                except Exception as e:
                    result["reference_validation"][pair_key] = {"error": str(e)}

    return result
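
The pairwise keys in the result follow the "{name_a}_vs_{name_b}" pattern used in the source above, so a two-sample comparison can be unpacked directly:

>>> result = compare_samples(['model_v4.2', 'model_v5.0'])
>>> overlap = result['read_overlap']['model_v4.2_vs_model_v5.0']
>>> result['sample_info']['model_v4.2']['num_reads']  # value depends on your data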

Session Management

SquiggyKernel

SquiggyKernel(
    cache_dir: str | None = None, use_cache: bool = True
)

Manages kernel state for loaded POD5 and BAM files, supporting multiple samples

This kernel state manager handles multiple POD5/BAM pairs (samples) simultaneously, enabling comparison workflows. It maintains backward compatibility with the single-sample API by delegating to the first loaded sample.

Attributes:

Name Type Description
samples dict[str, Sample]

Dict of Sample objects, keyed by sample name

reader Reader | None

POD5 file reader (first sample, for backward compat)

pod5_path str | None

Path to loaded POD5 file (first sample, for backward compat)

read_ids list[str] | LazyReadList

List of read IDs (first sample, for backward compat)

bam_path str | None

Path to loaded BAM file (first sample, for backward compat)

bam_info dict | None

Metadata about loaded BAM file (first sample, for backward compat)

ref_mapping dict[str, list[str]] | None

Mapping of reference names to read IDs

fasta_path str | None

Path to loaded FASTA file (first sample, for backward compat)

fasta_info dict | None

Metadata about loaded FASTA file (first sample, for backward compat)

Examples:

>>> from squiggy import load_pod5, load_bam, load_sample
>>> # Single sample (backward compatible)
>>> load_pod5('data.pod5')
>>> # Multiple samples
>>> load_sample('model_v4.2', 'data_v4.2.pod5', 'align_v4.2.bam')
>>> load_sample('model_v5.0', 'data_v5.0.pod5', 'align_v5.0.bam')
>>> # Access
>>> sample = get_sample('model_v5.0')
>>> print(sample)
Source code in squiggy/io.py
def __init__(self, cache_dir: str | None = None, use_cache: bool = True):
    # Multi-sample support (NEW) - PUBLIC
    self.samples: dict[str, Sample] = {}

    # Single-sample properties (for backward compatibility) - INTERNAL
    self._reader: pod5.Reader | None = None
    self._pod5_path: str | None = None
    self._read_ids: list[str] | LazyReadList = []
    self._bam_path: str | None = None
    self._bam_info: dict | None = None
    self._ref_mapping: dict[str, list[str]] | None = None
    self._fasta_path: str | None = None
    self._fasta_info: dict | None = None

    # Performance optimization attributes (NEW) - INTERNAL
    self._pod5_index: Pod5Index | None = None

    # Cache integration (NEW) - PUBLIC
    from .cache import SquiggyCache

    cache_path = Path(cache_dir) if cache_dir else None
    self.cache = SquiggyCache(cache_path, enabled=use_cache) if use_cache else None

__dir__

__dir__()

Control what appears in Variables pane - only show public API

Source code in squiggy/io.py
def __dir__(self):
    """Control what appears in Variables pane - only show public API"""
    return [
        "samples",
        "cache",
        "load_sample",
        "get_sample",
        "list_samples",
        "remove_sample",
        "close_all",
        "close_bam",
        "close_fasta",
        "close_pod5",
    ]

__repr__

__repr__() -> str

Return informative summary of loaded files

Source code in squiggy/io.py
def __repr__(self) -> str:
    """Return informative summary of loaded files"""
    if self.samples:
        # Multi-sample mode
        parts = [f"SquiggyKernel: {len(self.samples)} sample(s)"]
        for name in sorted(self.samples.keys()):
            sample = self.samples[name]
            if sample._pod5_path:
                num_reads = len(sample._read_ids)
                parts.append(f"  {name}: {num_reads:,} reads")
        return "<" + "\n".join(parts) + ">"
    else:
        # Single-sample backward compat mode
        parts = []

        if self._pod5_path:
            filename = os.path.basename(self._pod5_path)
            parts.append(f"POD5: {filename} ({len(self._read_ids):,} reads)")

        if self._bam_path:
            filename = os.path.basename(self._bam_path)
            num_reads = self._bam_info.get("num_reads", 0) if self._bam_info else 0
            parts.append(f"BAM: {filename} ({num_reads:,} reads)")

            if self._bam_info:
                if self._bam_info.get("has_modifications"):
                    mod_types = ", ".join(
                        str(m) for m in self._bam_info["modification_types"]
                    )
                    parts.append(f"Modifications: {mod_types}")
                if self._bam_info.get("has_event_alignment"):
                    parts.append("Event alignment: yes")

        if self._fasta_path:
            filename = os.path.basename(self._fasta_path)
            num_refs = (
                len(self._fasta_info.get("references", []))
                if self._fasta_info
                else 0
            )
            parts.append(f"FASTA: {filename} ({num_refs:,} references)")

        if not parts:
            return "<SquiggyKernel: No files loaded>"

        return f"<SquiggyKernel: {' | '.join(parts)}>"

load_sample

load_sample(
    name: str,
    pod5_path: str,
    bam_path: str | None = None,
    fasta_path: str | None = None,
) -> Sample

Load a POD5/BAM/FASTA sample set into this session

Parameters:

Name Type Description Default
name str

Unique identifier for this sample (e.g., 'model_v4.2')

required
pod5_path str

Path to POD5 file

required
bam_path str | None

Path to BAM file (optional)

None
fasta_path str | None

Path to FASTA file (optional)

None

Returns:

Type Description
Sample

The created Sample object

Examples:

>>> sk = SquiggyKernel()
>>> sample = sk.load_sample('v4.2', 'data_v4.2.pod5', 'align_v4.2.bam')
>>> print(f"Loaded {len(sample._read_ids)} reads")
Source code in squiggy/io.py
def load_sample(
    self,
    name: str,
    pod5_path: str,
    bam_path: str | None = None,
    fasta_path: str | None = None,
) -> Sample:
    """
    Load a POD5/BAM/FASTA sample set into this session

    Args:
        name: Unique identifier for this sample (e.g., 'model_v4.2')
        pod5_path: Path to POD5 file
        bam_path: Path to BAM file (optional)
        fasta_path: Path to FASTA file (optional)

    Returns:
        The created Sample object

    Examples:
        >>> sk = SquiggyKernel()
        >>> sample = sk.load_sample('v4.2', 'data_v4.2.pod5', 'align_v4.2.bam')
        >>> print(f"Loaded {len(sample._read_ids)} reads")
    """
    # Close existing sample with this name if any
    if name in self.samples:
        self.samples[name].close()

    # Create new sample
    sample = Sample(name)

    # Load POD5
    abs_pod5_path = os.path.abspath(pod5_path)
    if not os.path.exists(abs_pod5_path):
        raise FileNotFoundError(f"POD5 file not found: {abs_pod5_path}")

    reader = pod5.Reader(abs_pod5_path)

    read_ids = [str(read.read_id) for read in reader.reads()]

    sample._pod5_path = abs_pod5_path
    sample._pod5_reader = reader
    sample._read_ids = read_ids

    # Load BAM if provided
    if bam_path:
        abs_bam_path = os.path.abspath(bam_path)
        if not os.path.exists(abs_bam_path):
            raise FileNotFoundError(f"BAM file not found: {abs_bam_path}")

        # Collect all metadata in a single pass (includes both ref_counts and ref_mapping)
        # ref_mapping is needed for expanding references in Read Explorer
        metadata = _collect_bam_metadata_single_pass(
            Path(abs_bam_path), build_ref_mapping=True
        )

        # Build metadata dict - both ref_counts and ref_mapping are computed during single BAM scan
        bam_info = {
            "file_path": abs_bam_path,
            "num_reads": metadata["num_reads"],
            "references": metadata["references"],
            "has_modifications": metadata["has_modifications"],
            "modification_types": metadata["modification_types"],
            "has_probabilities": metadata["has_probabilities"],
            "has_event_alignment": metadata["has_event_alignment"],
            "ref_counts": metadata["ref_counts"],  # Reference name → read count
            "ref_mapping": metadata[
                "ref_mapping"
            ],  # Reference name → read IDs (needed for expanding)
        }

        sample._bam_path = abs_bam_path
        sample._bam_info = bam_info

        # Validate that POD5 and BAM have overlapping read IDs
        bam_read_ids = set()
        for ref_read_ids in metadata["ref_mapping"].values():
            bam_read_ids.update(ref_read_ids)

        pod5_read_ids = set(sample._read_ids)
        overlap = pod5_read_ids & bam_read_ids

        if len(overlap) == 0:
            raise ValueError(
                f"No overlapping read IDs found between POD5 ({len(pod5_read_ids)} reads) "
                f"and BAM ({len(bam_read_ids)} reads). These files appear to be mismatched. "
                f"Please verify that the BAM file contains alignments for reads in the POD5 file."
            )

    # Load FASTA if provided
    if fasta_path:
        abs_fasta_path = os.path.abspath(fasta_path)
        if not os.path.exists(abs_fasta_path):
            raise FileNotFoundError(f"FASTA file not found: {abs_fasta_path}")

        # Check for index
        fai_path = abs_fasta_path + ".fai"
        if not os.path.exists(fai_path):
            raise FileNotFoundError(
                f"FASTA index not found: {fai_path}. "
                f"Create index with: samtools faidx {abs_fasta_path}"
            )

        # Open FASTA file to get metadata
        fasta = pysam.FastaFile(abs_fasta_path)

        try:
            # Extract reference information
            references = list(fasta.references)
            lengths = list(fasta.lengths)

            # Build metadata dict
            fasta_info = {
                "file_path": abs_fasta_path,
                "references": references,
                "num_references": len(references),
                "reference_lengths": dict(zip(references, lengths, strict=True)),
            }

        finally:
            fasta.close()

        sample._fasta_path = abs_fasta_path
        sample._fasta_info = fasta_info

        # Validate that FASTA and BAM have matching references (if BAM is loaded)
        if sample._bam_info:
            # Extract reference names from BAM metadata (stored as list of dicts)
            bam_refs = {ref["name"] for ref in sample._bam_info["references"]}
            fasta_refs = set(references)
            overlap_refs = bam_refs & fasta_refs

            if len(overlap_refs) == 0:
                raise ValueError(
                    f"No overlapping reference names found between BAM ({len(bam_refs)} refs) "
                    f"and FASTA ({len(fasta_refs)} refs). These files appear to be mismatched. "
                    f"BAM references: {sorted(bam_refs)[:3]}, "
                    f"FASTA references: {sorted(fasta_refs)[:3]}."
                )

    # Store sample
    self.samples[name] = sample

    return sample

get_sample

get_sample(name: str) -> Sample | None

Get a loaded sample by name

Parameters:

Name Type Description Default
name str

Sample name

required

Returns:

Type Description
Sample | None

Sample object or None if not found

Examples:

>>> sk = SquiggyKernel()
>>> sample = sk.get_sample('model_v4.2')
Source code in squiggy/io.py
def get_sample(self, name: str) -> Sample | None:
    """
    Get a loaded sample by name

    Args:
        name: Sample name

    Returns:
        Sample object or None if not found

    Examples:
        >>> sk = SquiggyKernel()
        >>> sample = sk.get_sample('model_v4.2')
    """
    sample = self.samples.get(name)
    return sample

list_samples

list_samples() -> list[str]

List all loaded sample names

Returns:

Type Description
list[str]

List of sample names in order they were loaded

Examples:

>>> sk = SquiggyKernel()
>>> names = sk.list_samples()
>>> print(f"Loaded samples: {names}")
Source code in squiggy/io.py
def list_samples(self) -> list[str]:
    """
    List all loaded sample names

    Returns:
        List of sample names in order they were loaded

    Examples:
        >>> sk = SquiggyKernel()
        >>> names = sk.list_samples()
        >>> print(f"Loaded samples: {names}")
    """
    return list(self.samples.keys())

remove_sample

remove_sample(name: str) -> None

Unload a sample and free its resources

Parameters:

Name Type Description Default
name str

Sample name to remove

required

Examples:

>>> sk = SquiggyKernel()
>>> sk.remove_sample('model_v4.2')
Source code in squiggy/io.py
def remove_sample(self, name: str) -> None:
    """
    Unload a sample and free its resources

    Args:
        name: Sample name to remove

    Examples:
        >>> sk = SquiggyKernel()
        >>> sk.remove_sample('model_v4.2')
    """
    if name in self.samples:
        self.samples[name].close()
        del self.samples[name]

close_pod5

close_pod5()

Close POD5 reader and clear POD5 state (backward compat mode)

Source code in squiggy/io.py
def close_pod5(self):
    """Close POD5 reader and clear POD5 state (backward compat mode)"""
    if self._reader is not None:
        self._reader.close()
        self._reader = None
    self._pod5_path = None
    self._read_ids = []
    self._pod5_index = None  # Clear index

close_bam

close_bam()

Clear BAM state (backward compat mode)

Source code in squiggy/io.py
def close_bam(self):
    """Clear BAM state (backward compat mode)"""
    self._bam_path = None
    self._bam_info = None
    self._ref_mapping = None

close_fasta

close_fasta()

Clear FASTA state (backward compat mode)

Source code in squiggy/io.py
def close_fasta(self):
    """Clear FASTA state (backward compat mode)"""
    self._fasta_path = None
    self._fasta_info = None

close_all

close_all()

Close all resources and clear all state

Source code in squiggy/io.py
def close_all(self):
    """Close all resources and clear all state"""
    # Close all samples
    for sample in list(self.samples.values()):
        sample.close()
    self.samples.clear()

    # Clear backward compat properties
    self.close_pod5()
    self.close_bam()
    self.close_fasta()

Sample

Sample(name: str)

Represents a single POD5/BAM file pair (sample/experiment)

This class encapsulates all data for one sequencing run or basecalling model, allowing multiple samples to be loaded and compared simultaneously.

Attributes:

Name Type Description
name

Unique identifier for this sample (e.g., 'basecaller_v4.2')

pod5_path

Path to POD5 file

pod5_reader

Open POD5 file reader

read_ids

List of read IDs in this sample

bam_path

Path to BAM file (optional)

bam_info

Metadata about BAM file

model_provenance

Model/basecaller information extracted from BAM header

fasta_path

Path to FASTA reference file (optional)

fasta_info

Metadata about FASTA file

Examples:

>>> sample = Sample('model_v4.2')
>>> sample.load_pod5('data_v4.2.pod5')
>>> sample.load_bam('align_v4.2.bam')
>>> print(f"{sample.name}: {len(sample._read_ids)} reads")

Initialize a new sample with the given name

Source code in squiggy/io.py
def __init__(self, name: str):
    """Initialize a new sample with the given name"""
    self.name = name
    self._pod5_path: str | None = None
    self._pod5_reader: pod5.Reader | None = None
    self._read_ids: list[str] = []
    self._bam_path: str | None = None
    self._bam_info: dict | None = None
    self._model_provenance: dict | None = None
    self._fasta_path: str | None = None
    self._fasta_info: dict | None = None

__repr__

__repr__() -> str

Return informative summary of sample state

Source code in squiggy/io.py
def __repr__(self) -> str:
    """Return informative summary of sample state"""
    parts = [f"Sample({self.name})"]

    if self._pod5_path:
        filename = os.path.basename(self._pod5_path)
        parts.append(f"POD5: {filename} ({len(self._read_ids):,} reads)")

    if self._bam_path:
        filename = os.path.basename(self._bam_path)
        num_reads = self._bam_info.get("num_reads", 0) if self._bam_info else 0
        parts.append(f"BAM: {filename} ({num_reads:,} reads)")

    if self._fasta_path:
        filename = os.path.basename(self._fasta_path)
        num_refs = (
            len(self._fasta_info.get("references", [])) if self._fasta_info else 0
        )
        parts.append(f"FASTA: {filename} ({num_refs:,} references)")

    if len(parts) == 1:
        return f"<Sample({self.name}): No files loaded>"

    return f"<{' | '.join(parts)}>"

close

close()

Close all resources and clear sample state

Source code in squiggy/io.py
def close(self):
    """Close all resources and clear sample state"""
    if self._pod5_reader is not None:
        self._pod5_reader.close()
        self._pod5_reader = None
    self._pod5_path = None
    self._read_ids = []
    self._bam_path = None
    self._bam_info = None
    self._model_provenance = None
    self._fasta_path = None
    self._fasta_info = None

LazyReadList

LazyReadList(reader: Reader)

Virtual list of read IDs - only materializes requested slices

Provides O(1) memory overhead instead of O(n) by lazily loading read IDs from the POD5 file on demand. Works seamlessly with the TypeScript extension's pagination pattern (offset/limit slicing).

Attributes:

Name Type Description
_reader

POD5 Reader instance

_cached_length int | None

Cached total read count (computed once)

_materialized_ids list[str] | None

Optional fully materialized list (for caching)

Examples:

>>> reader = pod5.Reader('file.pod5')
>>> lazy_list = LazyReadList(reader)
>>> len(lazy_list)  # Computes length once
1000000
>>> lazy_list[0:100]  # Only loads first 100 IDs
['read1', 'read2', ...]
>>> lazy_list[500000]  # Loads single ID at position 500000
'read500001'
Source code in squiggy/io.py
def __init__(self, reader: pod5.Reader):
    self._reader = reader
    self._cached_length: int | None = None
    self._materialized_ids: list[str] | None = None

__len__

__len__() -> int

Compute total read count (cached after first call)

Source code in squiggy/io.py
def __len__(self) -> int:
    """Compute total read count (cached after first call)"""
    if self._cached_length is None:
        if self._materialized_ids is not None:
            self._cached_length = len(self._materialized_ids)
        else:
            self._cached_length = sum(1 for _ in self._reader.reads())
    return self._cached_length

__getitem__

__getitem__(key: int | slice) -> str | list[str]

Get read ID(s) at index/slice - lazy loading

Source code in squiggy/io.py
def __getitem__(self, key: int | slice) -> str | list[str]:
    """Get read ID(s) at index/slice - lazy loading"""
    # If fully materialized, use it
    if self._materialized_ids is not None:
        return self._materialized_ids[key]

    if isinstance(key, slice):
        # Handle slice - only load requested range
        start, stop, step = key.indices(len(self))
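        # NOTE: assumes a non-negative step; for negative steps,
        # key.indices() yields start >= stop and this scan returns [].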
        result = []
        for i, read in enumerate(self._reader.reads()):
            if i >= stop:
                break
            if start <= i < stop:
                result.append(str(read.read_id))
        return result[::step] if step != 1 else result
    else:
        # Single index lookup
        if key < 0:
            key = len(self) + key
        for i, read in enumerate(self._reader.reads()):
            if i == key:
                return str(read.read_id)
        raise IndexError(f"Read index out of range: {key}")

__iter__

__iter__() -> Iterator[str]

Iterate over all read IDs

Source code in squiggy/io.py
def __iter__(self) -> Iterator[str]:
    """Iterate over all read IDs"""
    if self._materialized_ids is not None:
        yield from self._materialized_ids
    else:
        for read in self._reader.reads():
            yield str(read.read_id)

materialize

materialize() -> list[str]

Fully materialize the list (for caching)

Returns:

Type Description
list[str]

Complete list of all read IDs

Source code in squiggy/io.py
def materialize(self) -> list[str]:
    """
    Fully materialize the list (for caching)

    Returns:
        Complete list of all read IDs
    """
    if self._materialized_ids is None:
        self._materialized_ids = [
            str(read.read_id) for read in self._reader.reads()
        ]
        self._cached_length = len(self._materialized_ids)
    return self._materialized_ids
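
The offset/limit pagination this class is built for reduces to plain slicing, as a sketch (lazy_list as in the class example; the page size and handle() consumer are illustrative):

>>> PAGE = 1000
>>> for offset in range(0, len(lazy_list), PAGE):
...     page = lazy_list[offset:offset + PAGE]  # only this window is loaded
...     handle(page)

Note that each slice re-scans the file from the beginning, so materialize() pays off once many pages or repeated random accesses are needed.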

__repr__

__repr__() -> str

Return informative summary of lazy read list

Source code in squiggy/io.py
def __repr__(self) -> str:
    """Return informative summary of lazy read list"""
    count = len(self)
    materialized = " (materialized)" if self._materialized_ids is not None else ""
    return f"<LazyReadList: {count:,} reads{materialized}>"

Pod5Index

Pod5Index()

Fast O(1) read lookup via read_id → file position mapping

Builds an index mapping read IDs to their position in the POD5 file, enabling constant-time lookups instead of O(n) linear scans.

Attributes:

Name Type Description
_index dict[str, int]

Dict mapping read_id (str) to file position (int)

Examples:

>>> reader = pod5.Reader('file.pod5')
>>> index = Pod5Index()
>>> index.build(reader)
>>> position = index.get_position('read_abc123')
>>> if position is not None:
...     # Use position for fast retrieval
...     pass
Source code in squiggy/io.py
def __init__(self):
    self._index: dict[str, int] = {}

build

build(reader: Reader) -> None

Build index by scanning file once

Parameters:

Name Type Description Default
reader Reader

POD5 Reader to index

required
Source code in squiggy/io.py
def build(self, reader: pod5.Reader) -> None:
    """
    Build index by scanning file once

    Args:
        reader: POD5 Reader to index
    """
    for idx, read in enumerate(reader.reads()):
        self._index[str(read.read_id)] = idx

get_position

get_position(read_id: str) -> int | None

Get file position for read_id (O(1) lookup)

Parameters:

Name Type Description Default
read_id str

Read ID to look up

required

Returns:

Type Description
int | None

File position or None if not found

Source code in squiggy/io.py
def get_position(self, read_id: str) -> int | None:
    """
    Get file position for read_id (O(1) lookup)

    Args:
        read_id: Read ID to look up

    Returns:
        File position or None if not found
    """
    return self._index.get(read_id)

has_read

has_read(read_id: str) -> bool

Check if read exists (O(1))

Parameters:

Name Type Description Default
read_id str

Read ID to check

required

Returns:

Type Description
bool

True if read exists in index

Source code in squiggy/io.py
def has_read(self, read_id: str) -> bool:
    """
    Check if read exists (O(1))

    Args:
        read_id: Read ID to check

    Returns:
        True if read exists in index
    """
    return read_id in self._index

__len__

__len__() -> int

Number of indexed reads

Source code in squiggy/io.py
def __len__(self) -> int:
    """Number of indexed reads"""
    return len(self._index)

__repr__

__repr__() -> str

Return informative summary of index

Source code in squiggy/io.py
def __repr__(self) -> str:
    """Return informative summary of index"""
    count = len(self._index)
    return f"<Pod5Index: {count:,} reads indexed>"

get_reads_batch

get_reads_batch(
    read_ids: list[str], sample_name: str | None = None
) -> dict[str, pod5.ReadRecord]

Fetch multiple reads in a single pass (O(n) instead of O(m×n))

This replaces the nested loop pattern where each read_id triggers a full file scan. Instead, we scan the file once and collect all requested reads.

Parameters:

Name Type Description Default
read_ids list[str]

List of read IDs to fetch

required
sample_name str | None

(Multi-sample mode) Name of sample to get reads from. If None, uses global session reader.

None

Returns:

Type Description
dict[str, ReadRecord]

Dict mapping read_id to ReadRecord for found reads

Raises:

Type Description
RuntimeError

If no POD5 file is loaded

Examples:

>>> from squiggy import load_pod5
>>> from squiggy.io import get_reads_batch
>>> load_pod5('file.pod5')
>>> reads = get_reads_batch(['read1', 'read2', 'read3'])
>>> for read_id, read_obj in reads.items():
...     print(f"{read_id}: {len(read_obj.signal)} samples")
Source code in squiggy/io.py
def get_reads_batch(
    read_ids: list[str], sample_name: str | None = None
) -> dict[str, pod5.ReadRecord]:
    """
    Fetch multiple reads in a single pass (O(n) instead of O(m×n))

    This replaces the nested loop pattern where each read_id triggers a full
    file scan. Instead, we scan the file once and collect all requested reads.

    Args:
        read_ids: List of read IDs to fetch
        sample_name: (Multi-sample mode) Name of sample to get reads from.
                     If None, uses global session reader.

    Returns:
        Dict mapping read_id to ReadRecord for found reads

    Raises:
        RuntimeError: If no POD5 file is loaded

    Examples:
        >>> from squiggy import load_pod5
        >>> from squiggy.io import get_reads_batch
        >>> load_pod5('file.pod5')
        >>> reads = get_reads_batch(['read1', 'read2', 'read3'])
        >>> for read_id, read_obj in reads.items():
        ...     print(f"{read_id}: {len(read_obj.signal)} samples")
    """
    # Determine which reader to use
    if sample_name:
        sample = squiggy_kernel.get_sample(sample_name)
        if not sample or sample._pod5_reader is None:
            raise RuntimeError(f"Sample '{sample_name}' not loaded or has no POD5 file")
        reader = sample._pod5_reader
    else:
        if squiggy_kernel._reader is None:
            raise RuntimeError("No POD5 file is currently loaded")
        reader = squiggy_kernel._reader

    needed = set(read_ids)
    found = {}

    for read in reader.reads():
        read_id = str(read.read_id)
        if read_id in needed:
            found[read_id] = read
            if len(found) == len(needed):
                break  # Early exit once all found

    return found
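
A typical consumer of the returned dict, as a sketch (read IDs illustrative; assumes numpy is installed):

>>> import numpy as np
>>> reads = get_reads_batch(['read1', 'read2', 'read3'])
>>> means = {rid: float(np.mean(rec.signal)) for rid, rec in reads.items()}

Read IDs that were not found are simply absent from the result, so check membership rather than assuming every requested read came back.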

get_read_by_id

get_read_by_id(
    read_id: str, sample_name: str | None = None
) -> pod5.ReadRecord | None

Get a single read by ID using index if available

Uses Pod5Index for O(1) lookup if index is built, otherwise falls back to linear scan.

Parameters:

Name Type Description Default
read_id str

Read ID to fetch

required
sample_name str | None

(Multi-sample mode) Name of sample to get read from. If None, uses global session reader.

None

Returns:

Type Description
ReadRecord | None

ReadRecord or None if not found

Raises:

Type Description
RuntimeError

If no POD5 file is loaded

Examples:

>>> from squiggy import load_pod5
>>> from squiggy.io import get_read_by_id
>>> load_pod5('file.pod5')
>>> read = get_read_by_id('read_abc123')
>>> if read:
...     print(f"Signal length: {len(read.signal)}")
Source code in squiggy/io.py
def get_read_by_id(
    read_id: str, sample_name: str | None = None
) -> pod5.ReadRecord | None:
    """
    Get a single read by ID using index if available

    Uses Pod5Index for O(1) lookup if index is built, otherwise falls back
    to linear scan.

    Args:
        read_id: Read ID to fetch
        sample_name: (Multi-sample mode) Name of sample to get read from.
                     If None, uses global session reader.

    Returns:
        ReadRecord or None if not found

    Raises:
        RuntimeError: If no POD5 file is loaded

    Examples:
        >>> from squiggy import load_pod5
        >>> from squiggy.io import get_read_by_id
        >>> load_pod5('file.pod5')
        >>> read = get_read_by_id('read_abc123')
        >>> if read:
        ...     print(f"Signal length: {len(read.signal)}")
    """
    # Determine which reader to use
    if sample_name:
        sample = squiggy_kernel.get_sample(sample_name)
        if not sample or sample._pod5_reader is None:
            raise RuntimeError(f"Sample '{sample_name}' not loaded or has no POD5 file")
        reader = sample._pod5_reader
        pod5_index = sample.pod5_index if hasattr(sample, "pod5_index") else None
    else:
        if squiggy_kernel._reader is None:
            raise RuntimeError("No POD5 file is currently loaded")
        reader = squiggy_kernel._reader
        pod5_index = (
            squiggy_kernel._pod5_index
            if hasattr(squiggy_kernel, "_pod5_index")
            else None
        )

    # Use index if available
    if pod5_index is not None:
        position = pod5_index.get_position(read_id)
        if position is None:
            return None

        # Use indexed access
        for idx, read in enumerate(reader.reads()):
            if idx == position:
                return read
        return None

    # Fallback to linear scan
    for read in reader.reads():
        if str(read.read_id) == read_id:
            return read
    return None

get_reads_for_reference_paginated

get_reads_for_reference_paginated(
    reference_name: str,
    offset: int = 0,
    limit: int | None = None,
) -> list[str]

Get reads for a specific reference with pagination support

This function enables lazy loading of reads by reference for the UI. Returns a slice of read IDs for the specified reference, supporting incremental data fetching.

Parameters:

Name Type Description Default
reference_name str

Name of reference sequence (e.g., 'chr1', 'contig_42')

required
offset int

Starting index in the read list (default: 0)

0
limit int | None

Maximum number of reads to return (default: None = all remaining)

None

Returns:

Type Description
list[str]

List of read IDs for the specified reference, sliced by offset/limit

Raises:

Type Description
RuntimeError

If no BAM file is loaded in the session

KeyError

If reference_name is not found in the BAM file

Examples:

>>> from squiggy import load_bam, load_pod5
>>> from squiggy.io import get_reads_for_reference_paginated
>>> load_pod5('reads.pod5')
>>> load_bam('alignments.bam')
>>> # Get first 500 reads for chr1
>>> reads = get_reads_for_reference_paginated('chr1', offset=0, limit=500)
>>> len(reads)
500
>>> # Get next 500 reads
>>> more_reads = get_reads_for_reference_paginated('chr1', offset=500, limit=500)
Source code in squiggy/io.py
def get_reads_for_reference_paginated(
    reference_name: str, offset: int = 0, limit: int | None = None
) -> list[str]:
    """
    Get reads for a specific reference with pagination support

    This function enables lazy loading of reads by reference for the UI.
    Returns a slice of read IDs for the specified reference, supporting
    incremental data fetching.

    Args:
        reference_name: Name of reference sequence (e.g., 'chr1', 'contig_42')
        offset: Starting index in the read list (default: 0)
        limit: Maximum number of reads to return (default: None = all remaining)

    Returns:
        List of read IDs for the specified reference, sliced by offset/limit

    Raises:
        RuntimeError: If no BAM file is loaded in the session
        KeyError: If reference_name is not found in the BAM file

    Examples:
        >>> from squiggy import load_bam, load_pod5
        >>> from squiggy.io import get_reads_for_reference_paginated
        >>> load_pod5('reads.pod5')
        >>> load_bam('alignments.bam')
        >>> # Get first 500 reads for chr1
        >>> reads = get_reads_for_reference_paginated('chr1', offset=0, limit=500)
        >>> len(reads)
        500
        >>> # Get next 500 reads
        >>> more_reads = get_reads_for_reference_paginated('chr1', offset=500, limit=500)
    """
    if squiggy_kernel._ref_mapping is None:
        raise RuntimeError(
            "No BAM file loaded. Call load_bam() before accessing reference reads."
        )

    if reference_name not in squiggy_kernel._ref_mapping:
        available_refs = list(squiggy_kernel._ref_mapping.keys())
        raise KeyError(
            f"Reference '{reference_name}' not found. "
            f"Available references: {available_refs[:5]}..."
        )

    all_reads = squiggy_kernel._ref_mapping[reference_name]

    if limit is None:
        return all_reads[offset:]
    return all_reads[offset : offset + limit]
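
To walk an entire reference incrementally, loop until an empty page comes back, as a sketch (the chunk size and process() consumer are illustrative; files loaded as in the example above):

>>> CHUNK = 500
>>> offset = 0
>>> while True:
...     page = get_reads_for_reference_paginated('chr1', offset=offset, limit=CHUNK)
...     if not page:
...         break
...     process(page)
...     offset += len(page)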

Object-Oriented API

Pod5File

Pod5File(path: str | Path)

POD5 file reader with lazy loading

Provides object-oriented interface to POD5 files without global state. Supports context manager protocol for automatic cleanup.

Parameters:

Name Type Description Default
path str | Path

Path to POD5 file

required

Examples:

>>> with Pod5File('data.pod5') as pod5:
...     for read in pod5.iter_reads(limit=5):
...         print(read.read_id)

Open POD5 file for reading

Source code in squiggy/api.py
def __init__(self, path: str | Path):
    """Open POD5 file for reading"""
    resolved_path = Path(path).resolve()

    if not resolved_path.exists():
        raise FileNotFoundError(f"POD5 file not found: {resolved_path}")

    # Store as string to avoid Path object in variables pane
    self.path = str(resolved_path)

    # Open reader
    self._reader = pod5.Reader(self.path)

    # Cache read IDs (lazy loaded on first access)
    self._read_ids: list[str] | None = None

read_ids property

read_ids: list[str]

Get list of all read IDs in the file

__len__

__len__() -> int

Return number of reads in file

Source code in squiggy/api.py
def __len__(self) -> int:
    """Return number of reads in file"""
    return len(self.read_ids)

get_read

get_read(read_id: str) -> Read

Get a single read by ID

Parameters:

Name Type Description Default
read_id str

Read identifier

required

Returns:

Type Description
Read

Read object

Raises:

Type Description
ValueError

If read ID not found

Source code in squiggy/api.py
def get_read(self, read_id: str) -> "Read":
    """
    Get a single read by ID

    Args:
        read_id: Read identifier

    Returns:
        Read object

    Raises:
        ValueError: If read ID not found
    """
    for read_obj in self._reader.reads():
        if str(read_obj.read_id) == read_id:
            return Read(read_obj, self)

    raise ValueError(f"Read not found: {read_id}")

iter_reads

iter_reads(limit: int | None = None) -> Iterator[Read]

Iterate over reads (lazy loading)

Parameters:

Name Type Description Default
limit int | None

Maximum number of reads to return (None = all)

None

Yields:

Type Description
Read

Read objects

Examples:

>>> for read in pod5.iter_reads(limit=100):
...     print(f"{read.read_id}: {len(read.signal)} samples")
Source code in squiggy/api.py
def iter_reads(self, limit: int | None = None) -> Iterator["Read"]:
    """
    Iterate over reads (lazy loading)

    Args:
        limit: Maximum number of reads to return (None = all)

    Yields:
        Read objects

    Examples:
        >>> for read in pod5.iter_reads(limit=100):
        ...     print(f"{read.read_id}: {len(read.signal)} samples")
    """
    count = 0
    for read_obj in self._reader.reads():
        if limit is not None and count >= limit:
            break
        yield Read(read_obj, self)
        count += 1

close

close()

Close the POD5 file

Source code in squiggy/api.py
def close(self):
    """Close the POD5 file"""
    if self._reader is not None:
        self._reader.close()
        self._reader = None

__enter__

__enter__()

Context manager entry

Source code in squiggy/api.py
def __enter__(self):
    """Context manager entry"""
    return self

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager exit

Source code in squiggy/api.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit"""
    self.close()
    return False

BamFile

BamFile(path: str | Path)

BAM alignment file reader

Provides access to alignments, references, and base modifications. Supports context manager protocol for automatic cleanup.

Parameters:

Name Type Description Default
path str | Path

Path to BAM file (must be indexed with .bai)

required

Examples:

>>> with BamFile('alignments.bam') as bam:
...     alignment = bam.get_alignment('read_001')
...     print(alignment.sequence)

Open BAM file for reading

Source code in squiggy/api.py
def __init__(self, path: str | Path):
    """Open BAM file for reading"""
    resolved_path = Path(path).resolve()

    if not resolved_path.exists():
        raise FileNotFoundError(f"BAM file not found: {resolved_path}")

    # Store as string to avoid Path object in variables pane
    self.path = str(resolved_path)

    # Check for index (silently - user will get error if region queries fail)
    bai_path = Path(self.path + ".bai")
    if not bai_path.exists():
        # Try alternate index location
        alt_bai = Path(self.path).with_suffix(".bam.bai")
        if not alt_bai.exists():
            pass  # Index not found - queries may not work

    # Open BAM file
    self._bam = pysam.AlignmentFile(self.path, "rb", check_sq=False)

    # Cache references
    self._references: list[str] | None = None
    self._mod_info: dict | None = None

references property

references: list[str]

Get list of reference sequence names

get_alignment

get_alignment(read_id: str) -> AlignedRead | None

Get alignment for a specific read

Parameters:

Name Type Description Default
read_id str

Read identifier

required

Returns:

Type Description
AlignedRead | None

AlignedRead object or None if not found or no move table

Examples:

>>> alignment = bam.get_alignment('read_001')
>>> if alignment:
...     for base in alignment.bases:
...         print(f"{base.base} at signal {base.signal_start}-{base.signal_end}")
Source code in squiggy/api.py
def get_alignment(self, read_id: str) -> AlignedRead | None:
    """
    Get alignment for a specific read

    Args:
        read_id: Read identifier

    Returns:
        AlignedRead object or None if not found or no move table

    Examples:
        >>> alignment = bam.get_alignment('read_001')
        >>> if alignment:
        ...     for base in alignment.bases:
        ...         print(f"{base.base} at signal {base.signal_start}-{base.signal_end}")
    """
    return extract_alignment_from_bam(self.path, read_id)

iter_region

iter_region(
    chrom: str,
    start: int | None = None,
    end: int | None = None,
) -> Iterator[AlignedRead]

Iterate over alignments in a genomic region

Parameters:

Name Type Description Default
chrom str

Chromosome/reference name

required
start int | None

Start position (0-based, inclusive)

None
end int | None

End position (0-based, exclusive)

None

Yields:

Type Description
AlignedRead

AlignedRead objects that have move tables

Examples:

>>> for alignment in bam.iter_region('chr1', 1000, 2000):
...     print(f"{alignment.read_id} at {alignment.genomic_start}")
Source code in squiggy/api.py
def iter_region(
    self, chrom: str, start: int | None = None, end: int | None = None
) -> Iterator[AlignedRead]:
    """
    Iterate over alignments in a genomic region

    Args:
        chrom: Chromosome/reference name
        start: Start position (0-based, inclusive)
        end: End position (0-based, exclusive)

    Yields:
        AlignedRead objects that have move tables

    Examples:
        >>> for alignment in bam.iter_region('chr1', 1000, 2000):
        ...     print(f"{alignment.read_id} at {alignment.genomic_start}")
    """
    from .alignment import _parse_alignment

    for pysam_read in self._bam.fetch(chrom, start, end):
        aligned_read = _parse_alignment(pysam_read)
        if aligned_read is not None:  # Only yield if move table present
            yield aligned_read

get_modifications_info

get_modifications_info() -> dict

Check if BAM contains base modification tags

Returns:

Type Description
dict

Dict with:
- has_modifications: bool
- modification_types: list of modification codes
- sample_count: number of reads checked
- has_probabilities: bool (ML tag present)

Examples:

>>> mod_info = bam.get_modifications_info()
>>> if mod_info['has_modifications']:
...     print(f"Found modifications: {mod_info['modification_types']}")
Source code in squiggy/api.py
def get_modifications_info(self) -> dict:
    """
    Check if BAM contains base modification tags

    Returns:
        Dict with:
            - has_modifications: bool
            - modification_types: list of modification codes
            - sample_count: number of reads checked
            - has_probabilities: bool (ML tag present)

    Examples:
        >>> mod_info = bam.get_modifications_info()
        >>> if mod_info['has_modifications']:
        ...     print(f"Found modifications: {mod_info['modification_types']}")
    """
    if self._mod_info is not None:
        return self._mod_info

    # Import here to avoid circular dependency
    from .io import get_bam_modification_info

    self._mod_info = get_bam_modification_info(self.path)
    return self._mod_info

get_reads_overlapping_motif

get_reads_overlapping_motif(
    fasta_file: FastaFile | str | Path,
    motif: str,
    region: str | None = None,
    strand: str = "both",
) -> dict[str, list[AlignedRead]]

Find reads overlapping motif positions

Parameters:

Name Type Description Default
fasta_file FastaFile | str | Path

FastaFile object or path to indexed FASTA file

required
motif str

IUPAC nucleotide pattern (e.g., "DRACH")

required
region str | None

Optional region filter ("chrom:start-end")

None
strand str

Motif search strand ('+', '-', or 'both')

'both'

Returns:

Type Description
dict[str, list[AlignedRead]]

Dict mapping motif position keys to lists of AlignedRead objects.
Position key format: "chrom:position:strand"

Examples:

>>> bam = BamFile('alignments.bam')
>>> fasta = FastaFile('genome.fa')
>>> overlaps = bam.get_reads_overlapping_motif(fasta, 'DRACH', region='chr1:1000-2000')
>>> for position, reads in overlaps.items():
...     print(f"{position}: {len(reads)} reads")
...     for read in reads:
...         print(f"  {read.read_id}")
Source code in squiggy/api.py
def get_reads_overlapping_motif(
    self,
    fasta_file: "FastaFile | str | Path",
    motif: str,
    region: str | None = None,
    strand: str = "both",
) -> dict[str, list[AlignedRead]]:
    """
    Find reads overlapping motif positions

    Args:
        fasta_file: FastaFile object or path to indexed FASTA file
        motif: IUPAC nucleotide pattern (e.g., "DRACH")
        region: Optional region filter ("chrom:start-end")
        strand: Motif search strand ('+', '-', or 'both')

    Returns:
        Dict mapping motif position keys to lists of AlignedRead objects
        Position key format: "chrom:position:strand"

    Examples:
        >>> bam = BamFile('alignments.bam')
        >>> fasta = FastaFile('genome.fa')
        >>> overlaps = bam.get_reads_overlapping_motif(fasta, 'DRACH', region='chr1:1000-2000')
        >>> for position, reads in overlaps.items():
        ...     print(f"{position}: {len(reads)} reads")
        ...     for read in reads:
        ...         print(f"  {read.read_id}")
    """
    from .alignment import _parse_alignment

    # Handle fasta_file parameter
    if isinstance(fasta_file, FastaFile):
        fasta_path = fasta_file.path
    else:
        fasta_path = Path(fasta_file).resolve()

    # Search for motif matches
    matches = list(search_motif(fasta_path, motif, region, strand))

    # Build dict of motif position -> overlapping reads
    overlaps: dict[str, list[AlignedRead]] = {}

    for match in matches:
        # Create position key
        position_key = f"{match.chrom}:{match.position}:{match.strand}"

        # Find reads overlapping this position
        reads_at_position = []

        # Query BAM for reads overlapping motif region
        for pysam_read in self._bam.fetch(match.chrom, match.position, match.end):
            aligned_read = _parse_alignment(pysam_read)
            if aligned_read is not None:  # Only include if has move table
                reads_at_position.append(aligned_read)

        if reads_at_position:
            overlaps[position_key] = reads_at_position

    return overlaps

close

close()

Close the BAM file

Source code in squiggy/api.py
def close(self):
    """Close the BAM file"""
    if self._bam is not None:
        self._bam.close()
        self._bam = None

__enter__

__enter__()

Context manager entry

Source code in squiggy/api.py
def __enter__(self):
    """Context manager entry"""
    return self

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager exit

Source code in squiggy/api.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit"""
    self.close()
    return False

FastaFile

FastaFile(path: str | Path)

FASTA reference file reader with motif search capabilities

Provides access to reference sequences and motif searching. Requires indexed FASTA file (.fai).

Parameters:

Name Type Description Default
path str | Path

Path to FASTA file (must be indexed with .fai)

required

Examples:

>>> with FastaFile('genome.fa') as fasta:
...     # Search for DRACH motif
...     for match in fasta.search_motif('DRACH', region='chr1:1000-2000'):
...         print(f"{match.chrom}:{match.position} {match.sequence}")

Open FASTA file for reading

Source code in squiggy/api.py
def __init__(self, path: str | Path):
    """Open FASTA file for reading"""
    resolved_path = Path(path).resolve()

    if not resolved_path.exists():
        raise FileNotFoundError(f"FASTA file not found: {resolved_path}")

    # Store as string to avoid Path object in variables pane
    self.path = str(resolved_path)

    # Check for index
    fai_path = Path(self.path + ".fai")
    if not fai_path.exists():
        raise FileNotFoundError(
            f"FASTA index not found: {fai_path}. "
            f"Create with: samtools faidx {self.path}"
        )

    # Open FASTA file
    self._fasta = pysam.FastaFile(self.path)

    # Cache references
    self._references: list[str] | None = None

references property

references: list[str]

Get list of reference sequence names

fetch

fetch(
    chrom: str,
    start: int | None = None,
    end: int | None = None,
) -> str

Fetch sequence from reference

Parameters:

Name Type Description Default
chrom str

Chromosome/reference name

required
start int | None

Start position (0-based, inclusive)

None
end int | None

End position (0-based, exclusive)

None

Returns:

Type Description
str

DNA sequence string

Examples:

>>> fasta = FastaFile('genome.fa')
>>> seq = fasta.fetch('chr1', 1000, 1100)
>>> print(seq)  # 100 bp sequence
Source code in squiggy/api.py
def fetch(
    self, chrom: str, start: int | None = None, end: int | None = None
) -> str:
    """
    Fetch sequence from reference

    Args:
        chrom: Chromosome/reference name
        start: Start position (0-based, inclusive)
        end: End position (0-based, exclusive)

    Returns:
        DNA sequence string

    Examples:
        >>> fasta = FastaFile('genome.fa')
        >>> seq = fasta.fetch('chr1', 1000, 1100)
        >>> print(seq)  # 100 bp sequence
    """
    return self._fasta.fetch(chrom, start, end)

search_motif

search_motif(
    motif: str,
    region: str | None = None,
    strand: str = "both",
) -> Iterator[MotifMatch]

Search for motif matches in FASTA file

Parameters:

Name Type Description Default
motif str

IUPAC nucleotide pattern (e.g., "DRACH", "YGCY")

required
region str | None

Optional region filter ("chrom", "chrom:start", "chrom:start-end") Positions are 1-based in input

None
strand str

Search strand ('+', '-', or 'both')

'both'

Yields:

Type Description
MotifMatch

MotifMatch objects for each match found

Examples:

>>> fasta = FastaFile('genome.fa')
>>> matches = list(fasta.search_motif('DRACH', region='chr1:1000-2000'))
>>> for match in matches:
...     print(f"{match.chrom}:{match.position+1} {match.sequence} ({match.strand})")
Source code in squiggy/api.py
def search_motif(
    self,
    motif: str,
    region: str | None = None,
    strand: str = "both",
) -> Iterator[MotifMatch]:
    """
    Search for motif matches in FASTA file

    Args:
        motif: IUPAC nucleotide pattern (e.g., "DRACH", "YGCY")
        region: Optional region filter ("chrom", "chrom:start", "chrom:start-end")
                Positions are 1-based in input
        strand: Search strand ('+', '-', or 'both')

    Yields:
        MotifMatch objects for each match found

    Examples:
        >>> fasta = FastaFile('genome.fa')
        >>> matches = list(fasta.search_motif('DRACH', region='chr1:1000-2000'))
        >>> for match in matches:
        ...     print(f"{match.chrom}:{match.position+1} {match.sequence} ({match.strand})")
    """
    return search_motif(self.path, motif, region, strand)

count_motifs

count_motifs(
    motif: str,
    region: str | None = None,
    strand: str = "both",
) -> int

Count total motif matches

Parameters:

Name Type Description Default
motif str

IUPAC nucleotide pattern

required
region str | None

Optional region filter

None
strand str

Search strand ('+', '-', or 'both')

'both'

Returns:

Type Description
int

Total number of matches

Examples:

>>> fasta = FastaFile('genome.fa')
>>> count = fasta.count_motifs('DRACH', region='chr1')
>>> print(f"Found {count} DRACH motifs")
Source code in squiggy/api.py
def count_motifs(
    self,
    motif: str,
    region: str | None = None,
    strand: str = "both",
) -> int:
    """
    Count total motif matches

    Args:
        motif: IUPAC nucleotide pattern
        region: Optional region filter
        strand: Search strand ('+', '-', or 'both')

    Returns:
        Total number of matches

    Examples:
        >>> fasta = FastaFile('genome.fa')
        >>> count = fasta.count_motifs('DRACH', region='chr1')
        >>> print(f"Found {count} DRACH motifs")
    """
    return sum(1 for _ in self.search_motif(motif, region, strand))

close

close()

Close the FASTA file

Source code in squiggy/api.py
def close(self):
    """Close the FASTA file"""
    if self._fasta is not None:
        self._fasta.close()
        self._fasta = None

__enter__

__enter__()

Context manager entry

Source code in squiggy/api.py
def __enter__(self):
    """Context manager entry"""
    return self

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager exit

Source code in squiggy/api.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit"""
    self.close()
    return False

Read

Read(pod5_read: ReadRecord, parent_file: Pod5File)

A single POD5 read with signal data

Provides access to raw signal, normalization, alignment, and plotting.

Attributes:

Name Type Description
read_id str

Read identifier

signal ndarray

Raw signal data (numpy array)

sample_rate int

Sampling rate in Hz

Initialize Read from pod5.Read object

Parameters:

Name Type Description Default
pod5_read ReadRecord

pod5.ReadRecord object

required
parent_file Pod5File

Parent Pod5File object

required
Source code in squiggy/api.py
def __init__(self, pod5_read: pod5.ReadRecord, parent_file: Pod5File):
    """
    Initialize Read from pod5.Read object

    Args:
        pod5_read: pod5.ReadRecord object
        parent_file: Parent Pod5File object
    """
    self._parent = parent_file

    # Cache all properties immediately (pod5 read handle is temporary)
    self._read_id = str(pod5_read.read_id)
    self._signal = pod5_read.signal  # Must cache now, handle closes later
    self._sample_rate = pod5_read.run_info.sample_rate

read_id property

read_id: str

Read identifier

signal property

signal: ndarray

Raw signal data as numpy array

sample_rate property

sample_rate: int

Sampling rate in Hz
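
All three properties are cached at construction time (the underlying pod5 read handle is temporary). A minimal access sketch (assuming pod5 is an open Pod5File, as in the class example above):

>>> read = next(pod5.iter_reads(limit=1))
>>> read.read_id                          # str identifier
>>> read.signal.shape, read.signal.dtype  # raw samples as a numpy array
>>> read.sample_rate                      # sampling rate in Hz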

get_normalized

get_normalized(
    method: str | NormalizationMethod = "ZNORM",
) -> np.ndarray

Get normalized signal

Parameters:

Name Type Description Default
method str | NormalizationMethod

Normalization method ('NONE', 'ZNORM', 'MEDIAN', 'MAD')

'ZNORM'

Returns:

Type Description
ndarray

Normalized signal as numpy array

Examples:

>>> read = pod5.get_read('read_001')
>>> znorm_signal = read.get_normalized('ZNORM')
>>> mad_signal = read.get_normalized('MAD')
Source code in squiggy/api.py
def get_normalized(self, method: str | NormalizationMethod = "ZNORM") -> np.ndarray:
    """
    Get normalized signal

    Args:
        method: Normalization method ('NONE', 'ZNORM', 'MEDIAN', 'MAD')

    Returns:
        Normalized signal as numpy array

    Examples:
        >>> read = pod5.get_read('read_001')
        >>> znorm_signal = read.get_normalized('ZNORM')
        >>> mad_signal = read.get_normalized('MAD')
    """
    if isinstance(method, str):
        method = NormalizationMethod[method.upper()]

    return normalize_signal(self.signal, method)

get_alignment

get_alignment(
    bam_file: BamFile | None = None,
    bam_path: str | Path | None = None,
) -> AlignedRead | None

Get alignment information from BAM file

Parameters:

Name Type Description Default
bam_file BamFile | None

BamFile object (recommended)

None
bam_path str | Path | None

Path to BAM file (alternative to bam_file)

None

Returns:

Type Description
AlignedRead | None

AlignedRead object or None if not found or no move table

Examples:

>>> bam = BamFile('alignments.bam')
>>> alignment = read.get_alignment(bam)
>>> if alignment:
...     print(f"Aligned to {alignment.chromosome}:{alignment.genomic_start}")
Source code in squiggy/api.py
def get_alignment(
    self, bam_file: "BamFile | None" = None, bam_path: str | Path | None = None
) -> AlignedRead | None:
    """
    Get alignment information from BAM file

    Args:
        bam_file: BamFile object (recommended)
        bam_path: Path to BAM file (alternative to bam_file)

    Returns:
        AlignedRead object or None if not found or no move table

    Examples:
        >>> bam = BamFile('alignments.bam')
        >>> alignment = read.get_alignment(bam)
        >>> if alignment:
        ...     print(f"Aligned to {alignment.chromosome}:{alignment.genomic_start}")
    """
    if bam_file is None and bam_path is None:
        raise ValueError("Must provide either bam_file or bam_path")

    path = bam_path if bam_path is not None else bam_file.path
    return extract_alignment_from_bam(Path(path), self.read_id)

plot

plot(
    mode: str = "SINGLE",
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    downsample: int | None = None,
    show_dwell_time: bool = False,
    show_labels: bool = True,
    position_label_interval: int | None = None,
    scale_dwell_time: bool = False,
    min_mod_probability: float = 0.5,
    enabled_mod_types: list | None = None,
    show_signal_points: bool = False,
    bam_file: BamFile | None = None,
) -> BokehFigure

Generate Bokeh plot for this read

Parameters:

Name Type Description Default
mode str

Plot mode ('SINGLE', 'EVENTALIGN')

'SINGLE'
normalization str

Normalization method ('NONE', 'ZNORM', 'MEDIAN', 'MAD')

'ZNORM'
theme str

Color theme ('LIGHT', 'DARK')

'LIGHT'
downsample int | None

Downsampling factor (1 = no downsampling)

None
show_dwell_time bool

Color bases by dwell time (EVENTALIGN mode)

False
show_labels bool

Show base labels (EVENTALIGN mode)

True
position_label_interval int | None

Interval for position labels

None
scale_dwell_time bool

Scale x-axis by cumulative dwell time

False
min_mod_probability float

Minimum probability for showing modifications

0.5
enabled_mod_types list | None

List of modification types to show (None = all)

None
show_signal_points bool

Show individual signal points

False
bam_file BamFile | None

BamFile object (required for EVENTALIGN mode)

None

Returns:

Type Description
figure

Bokeh Figure object (can be customized before display)

Examples:

>>> fig = read.plot(mode='EVENTALIGN', bam_file=bam)
>>> fig.title.text = "My Custom Title"
>>> from bokeh.plotting import show
>>> show(fig)
Source code in squiggy/api.py
def plot(
    self,
    mode: str = "SINGLE",
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
    downsample: int | None = None,
    show_dwell_time: bool = False,
    show_labels: bool = True,
    position_label_interval: int | None = None,
    scale_dwell_time: bool = False,
    min_mod_probability: float = 0.5,
    enabled_mod_types: list | None = None,
    show_signal_points: bool = False,
    bam_file: "BamFile | None" = None,
) -> BokehFigure:
    """
    Generate Bokeh plot for this read

    Args:
        mode: Plot mode ('SINGLE', 'EVENTALIGN')
        normalization: Normalization method ('NONE', 'ZNORM', 'MEDIAN', 'MAD')
        theme: Color theme ('LIGHT', 'DARK')
        downsample: Downsampling factor (1 = no downsampling)
        show_dwell_time: Color bases by dwell time (EVENTALIGN mode)
        show_labels: Show base labels (EVENTALIGN mode)
        position_label_interval: Interval for position labels
        scale_dwell_time: Scale x-axis by cumulative dwell time
        min_mod_probability: Minimum probability for showing modifications
        enabled_mod_types: List of modification types to show (None = all)
        show_signal_points: Show individual signal points
        bam_file: BamFile object (required for EVENTALIGN mode)

    Returns:
        Bokeh Figure object (can be customized before display)

    Examples:
        >>> fig = read.plot(mode='EVENTALIGN', bam_file=bam)
        >>> fig.title.text = "My Custom Title"
        >>> from bokeh.plotting import show
        >>> show(fig)
    """
    from .constants import DEFAULT_DOWNSAMPLE, DEFAULT_POSITION_LABEL_INTERVAL

    # Apply defaults if not specified
    if downsample is None:
        downsample = DEFAULT_DOWNSAMPLE
    if position_label_interval is None:
        position_label_interval = DEFAULT_POSITION_LABEL_INTERVAL

    # Parse parameters
    plot_mode = PlotMode[mode.upper()]
    norm_method = NormalizationMethod[normalization.upper()]
    theme_enum = Theme[theme.upper()]

    # Get alignment if needed
    aligned_read = None

    if plot_mode == PlotMode.EVENTALIGN:
        if bam_file is None:
            raise ValueError("EVENTALIGN mode requires bam_file parameter")

        aligned_read = self.get_alignment(bam_file)
        if aligned_read is None:
            raise ValueError(
                f"Read {self.read_id} not found in BAM or has no move table. "
                f"Read may be unmapped or BAM may not contain event alignment data."
            )

    # Generate plot using plot strategy
    if plot_mode == PlotMode.SINGLE:
        data = {
            "signal": self.signal,
            "read_id": self.read_id,
            "sample_rate": self.sample_rate,
        }
        options = {
            "normalization": norm_method,
            "downsample": downsample,
            "show_signal_points": show_signal_points,
            "x_axis_mode": "dwell_time" if scale_dwell_time else "regular_time",
        }
    elif plot_mode == PlotMode.EVENTALIGN:
        data = {
            "reads": [(self.read_id, self.signal, self.sample_rate)],
            "aligned_reads": [aligned_read],
        }
        options = {
            "normalization": norm_method,
            "downsample": downsample,
            "show_dwell_time": scale_dwell_time,
            "show_labels": show_labels,
            "show_signal_points": show_signal_points,
        }
    else:
        raise ValueError(
            f"Unsupported plot mode for single read: {plot_mode}. "
            f"Supported modes: SINGLE, EVENTALIGN"
        )

    strategy = create_plot_strategy(plot_mode, theme_enum)
    _, fig = strategy.create_plot(data, options)

    return fig

figure_to_html

figure_to_html(
    fig: figure, title: str = "Squiggy Plot"
) -> str

Convert Bokeh Figure to HTML string

Utility function for converting Bokeh figures to standalone HTML. Useful when you need HTML output instead of interactive display.

Parameters:

Name Type Description Default
fig figure

Bokeh Figure object

required
title str

HTML document title

'Squiggy Plot'

Returns:

Type Description
str

HTML string with embedded Bokeh plot

Examples:

>>> fig = read.plot()
>>> html = figure_to_html(fig)
>>> with open('plot.html', 'w') as f:
...     f.write(html)
Source code in squiggy/api.py
def figure_to_html(fig: BokehFigure, title: str = "Squiggy Plot") -> str:
    """
    Convert Bokeh Figure to HTML string

    Utility function for converting Bokeh figures to standalone HTML.
    Useful when you need HTML output instead of interactive display.

    Args:
        fig: Bokeh Figure object
        title: HTML document title

    Returns:
        HTML string with embedded Bokeh plot

    Examples:
        >>> fig = read.plot()
        >>> html = figure_to_html(fig)
        >>> with open('plot.html', 'w') as f:
        ...     f.write(html)
    """
    from bokeh.embed import file_html
    from bokeh.resources import CDN

    return file_html(fig, CDN, title)

Alignment

BaseAnnotation dataclass

BaseAnnotation(
    base: str,
    position: int,
    signal_start: int,
    signal_end: int,
    genomic_pos: int | None = None,
    quality: int | None = None,
)

Single base annotation with signal alignment information

AlignedRead dataclass

AlignedRead(
    read_id: str,
    sequence: str,
    bases: list[BaseAnnotation],
    chromosome: str | None = None,
    genomic_start: int | None = None,
    genomic_end: int | None = None,
    strand: str | None = None,
    is_reverse: bool = False,
    modifications: list = list(),
)

POD5 read with base call annotations
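
Both dataclasses are plain containers and can be constructed directly, as a sketch (import path taken from the squiggy/alignment.py source notes below; field values illustrative):

>>> from squiggy.alignment import AlignedRead, BaseAnnotation
>>> b = BaseAnnotation(base='A', position=0, signal_start=0, signal_end=12)
>>> ar = AlignedRead(read_id='read_001', sequence='A', bases=[b])
>>> ar.bases[0].signal_end - ar.bases[0].signal_start  # dwell length in samples
12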

extract_alignment_from_bam

extract_alignment_from_bam(
    bam_path: Path, read_id: str
) -> AlignedRead | None

Extract alignment information for a read from BAM file

Parameters:

Name Type Description Default
bam_path Path

Path to BAM file

required
read_id str

Read identifier to search for

required

Returns:

Type Description
AlignedRead | None

AlignedRead object or None if not found

Source code in squiggy/alignment.py
def extract_alignment_from_bam(bam_path: Path, read_id: str) -> AlignedRead | None:
    """Extract alignment information for a read from BAM file

    Args:
        bam_path: Path to BAM file
        read_id: Read identifier to search for

    Returns:
        AlignedRead object or None if not found
    """
    try:
        with pysam.AlignmentFile(str(bam_path), "rb", check_sq=False) as bam:
            for alignment in bam.fetch(until_eof=True):
                if alignment.query_name == read_id:
                    return _parse_alignment(alignment)
    except Exception:
        # Error reading BAM file - return None
        pass

    return None
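
A minimal usage sketch (path and read ID illustrative):

>>> from pathlib import Path
>>> from squiggy.alignment import extract_alignment_from_bam
>>> aligned = extract_alignment_from_bam(Path('alignments.bam'), 'read_001')
>>> if aligned is not None:
...     print(aligned.chromosome, aligned.strand, len(aligned.bases))

Note that the function scans alignments with fetch(until_eof=True), so lookup cost grows with BAM size.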

get_base_to_signal_mapping

get_base_to_signal_mapping(
    aligned_read: AlignedRead,
) -> tuple[str, np.ndarray]

Extract sequence and signal mapping from AlignedRead

Parameters:

Name Type Description Default
aligned_read AlignedRead

AlignedRead object with base annotations

required

Returns:

Name Type Description
tuple tuple[str, ndarray]

(sequence, seq_to_sig_map) compatible with existing plotter

Source code in squiggy/alignment.py
def get_base_to_signal_mapping(aligned_read: AlignedRead) -> tuple[str, np.ndarray]:
    """Extract sequence and signal mapping from AlignedRead

    Args:
        aligned_read: AlignedRead object with base annotations

    Returns:
        tuple: (sequence, seq_to_sig_map) compatible with existing plotter
    """
    sequence = aligned_read.sequence
    seq_to_sig_map = np.array([base.signal_start for base in aligned_read.bases])

    return sequence, seq_to_sig_map
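
The tuple drops straight into plotting code that expects a sequence plus per-base signal offsets, as a sketch (aligned as in the previous example):

>>> sequence, seq_to_sig = get_base_to_signal_mapping(aligned)
>>> sequence[:5], seq_to_sig[:5]  # first five bases and their signal start indices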

Modifications

ModificationAnnotation dataclass

ModificationAnnotation(
    position: int,
    genomic_pos: int | None,
    mod_code: str | int,
    canonical_base: str,
    probability: float,
    signal_start: int,
    signal_end: int,
)

Single base modification annotation with probability and signal alignment

extract_modifications_from_alignment

extract_modifications_from_alignment(
    alignment: AlignedSegment, bases: list
) -> list[ModificationAnnotation]

Extract modification annotations from a BAM alignment

Parses MM/ML tags via pysam's modified_bases property and maps modifications to signal positions using base annotations.

Parameters:

Name Type Description Default
alignment AlignedSegment

pysam AlignedSegment object

required
bases list

List of BaseAnnotation objects with signal mappings

required

Returns:

Type Description
list[ModificationAnnotation]

List of ModificationAnnotation objects (empty if no modifications)

Note

The modified_bases property returns a dict with format:
{(canonical_base, strand, mod_code): [(position, quality), ...]}
where:
- canonical_base: str (e.g., 'C', 'A')
- strand: int (0=forward, 1=reverse)
- mod_code: str or int (e.g., 'm' for 5mC, 17596 for inosine)
- position: int (base position in read, 0-indexed)
- quality: int (encoded as 256*probability, or -1 if unknown)

Examples:

>>> from squiggy.alignment import extract_alignment_from_bam
>>> aligned_read = extract_alignment_from_bam(bam_path, read_id)
>>> # 'alignment' is the pysam AlignedSegment for the same read
>>> mods = extract_modifications_from_alignment(alignment, aligned_read.bases)
>>> for mod in mods:
...     print(f"{mod.mod_code} at pos {mod.position}: p={mod.probability}")
Source code in squiggy/modifications.py
def extract_modifications_from_alignment(
    alignment: pysam.AlignedSegment, bases: list
) -> list[ModificationAnnotation]:
    """Extract modification annotations from a BAM alignment

    Parses MM/ML tags via pysam's modified_bases property and maps modifications
    to signal positions using base annotations.

    Args:
        alignment: pysam AlignedSegment object
        bases: List of BaseAnnotation objects with signal mappings

    Returns:
        List of ModificationAnnotation objects (empty if no modifications)

    Note:
        The modified_bases property returns a dict with format:
        {(canonical_base, strand, mod_code): [(position, quality), ...]}
        where:
        - canonical_base: str (e.g., 'C', 'A')
        - strand: int (0=forward, 1=reverse)
        - mod_code: str or int (e.g., 'm' for 5mC, 17596 for inosine)
        - position: int (base position in read, 0-indexed)
        - quality: int (encoded as 256*probability, or -1 if unknown)

    Examples:
        >>> from squiggy.alignment import extract_alignment_from_bam
        >>> aligned_read = extract_alignment_from_bam(bam_path, read_id)
        >>> # 'alignment' is the pysam AlignedSegment for the same read
        >>> mods = extract_modifications_from_alignment(alignment, aligned_read.bases)
        >>> for mod in mods:
        ...     print(f"{mod.mod_code} at pos {mod.position}: p={mod.probability}")
    """
    modifications = []

    # Check if modifications are present
    if not hasattr(alignment, "modified_bases") or not alignment.modified_bases:
        return modifications

    # Get modified bases dict from pysam
    # Format: {(canonical, strand, mod_type): [(pos, qual), ...]}
    modified_bases_dict = alignment.modified_bases

    # Create a quick lookup for bases by position
    base_lookup = {base.position: base for base in bases}

    # Iterate over all modification types in the alignment
    for (canonical_base, _strand, mod_code), mod_list in modified_bases_dict.items():
        # Process each modified position
        for position, quality in mod_list:
            # Convert quality to probability
            # Quality is encoded as (256 * probability), or -1 if unknown
            if quality < 0:
                # Unknown probability, skip
                continue

            probability = quality / 256.0

            # Cap probability at 1.0 (allow small floating point overflow)
            probability = min(probability, 1.0)

            # Get signal positions from the base annotation
            if position not in base_lookup:
                # Position not in base annotations (shouldn't happen, but be defensive)
                continue

            base = base_lookup[position]

            # Create modification annotation
            mod_annotation = ModificationAnnotation(
                position=position,
                genomic_pos=base.genomic_pos,
                mod_code=mod_code,
                canonical_base=canonical_base,
                probability=probability,
                signal_start=base.signal_start,
                signal_end=base.signal_end,
            )
            modifications.append(mod_annotation)

    return modifications
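
Downstream consumers typically threshold on probability, as a sketch (mods as returned above; the 0.8 cutoff is arbitrary):

>>> confident = [m for m in mods if m.probability >= 0.8]
>>> {m.mod_code for m in confident}  # distinct high-confidence modification codes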

detect_modification_provenance

detect_modification_provenance(
    bam_file: Path,
) -> dict[str, Any]

Detect modification calling provenance from BAM header

Parses @PG (program) header lines to extract basecaller information, version, and model details. Useful for displaying metadata and understanding modification calling parameters.

Parameters:

Name Type Description Default
bam_file Path

Path to BAM file

required

Returns:

Type Description
dict[str, Any]

Dict with keys:
- basecaller: str (e.g., "dorado", "remora", "guppy", or "Unknown")
- version: str (e.g., "0.8.0" or "Unknown")
- model: str (model name or "Unknown")
- full_info: str (complete @PG command line for reference)
- unknown: bool (True if provenance could not be determined)

Examples:

>>> provenance = detect_modification_provenance(bam_path)
>>> if not provenance["unknown"]:
...     print(f"Basecaller: {provenance['basecaller']} v{provenance['version']}")
...     print(f"Model: {provenance['model']}")
Source code in squiggy/modifications.py
def detect_modification_provenance(bam_file: Path) -> dict[str, Any]:
    """Detect modification calling provenance from BAM header

    Parses @PG (program) header lines to extract basecaller information, version,
    and model details. Useful for displaying metadata and understanding modification
    calling parameters.

    Args:
        bam_file: Path to BAM file

    Returns:
        Dict with keys:
            - basecaller: str (e.g., "dorado", "remora", "guppy", or "Unknown")
            - version: str (e.g., "0.8.0" or "Unknown")
            - model: str (model name or "Unknown")
            - full_info: str (complete @PG command line for reference)
            - unknown: bool (True if provenance could not be determined)

    Examples:
        >>> provenance = detect_modification_provenance(bam_path)
        >>> if not provenance["unknown"]:
        ...     print(f"Basecaller: {provenance['basecaller']} v{provenance['version']}")
        ...     print(f"Model: {provenance['model']}")
    """
    result = {
        "basecaller": "Unknown",
        "version": "Unknown",
        "model": "Unknown",
        "full_info": "",
        "unknown": True,
    }

    try:
        with pysam.AlignmentFile(str(bam_file), "rb", check_sq=False) as bam:
            header = bam.header.to_dict()

            # Look for @PG lines
            if "PG" not in header:
                return result

            for pg_entry in header["PG"]:
                # Check for dorado
                if "PN" in pg_entry and "dorado" in pg_entry["PN"].lower():
                    result["basecaller"] = "dorado"
                    result["version"] = pg_entry.get("VN", "Unknown")

                    # Try to extract model from command line (CL field)
                    if "CL" in pg_entry:
                        cl = pg_entry["CL"]
                        result["full_info"] = cl

                        # Look for model argument
                        if "--model" in cl or "-m" in cl:
                            parts = cl.split()
                            for i, part in enumerate(parts):
                                if part in ["--model", "-m"] and i + 1 < len(parts):
                                    result["model"] = parts[i + 1]
                                    break

                    result["unknown"] = False
                    break

                # Check for remora
                elif "PN" in pg_entry and "remora" in pg_entry["PN"].lower():
                    result["basecaller"] = "remora"
                    result["version"] = pg_entry.get("VN", "Unknown")

                    if "CL" in pg_entry:
                        result["full_info"] = pg_entry["CL"]

                    result["unknown"] = False
                    break

                # Check for guppy
                elif "PN" in pg_entry and "guppy" in pg_entry["PN"].lower():
                    result["basecaller"] = "guppy"
                    result["version"] = pg_entry.get("VN", "Unknown")

                    if "CL" in pg_entry:
                        result["full_info"] = pg_entry["CL"]

                    result["unknown"] = False
                    break

    except Exception:
        pass

    return result

MotifMatch dataclass

MotifMatch(
    chrom: str,
    position: int,
    sequence: str,
    strand: Literal["+", "-"],
)

Represents a single motif match in a genomic sequence

Attributes:

Name Type Description
chrom str

Chromosome/reference name

position int

0-based genomic position of match start

sequence str

Matched sequence

strand Literal['+', '-']

Strand ('+' or '-')

length int

Length of matched sequence

length property

length: int

Length of matched sequence

end property

end: int

End position (exclusive, 0-based)
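
The derived properties follow directly from the fields, as a sketch (field values illustrative, and assuming end is position + length per the exclusive-end convention above):

>>> m = MotifMatch(chrom='chr1', position=999, sequence='GGACT', strand='+')
>>> m.length, m.end
(5, 1004)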

iupac_to_regex

iupac_to_regex(pattern: str) -> str

Convert IUPAC nucleotide pattern to regular expression

Parameters:

Name Type Description Default
pattern str

IUPAC nucleotide pattern (e.g., "DRACH", "YGCY")

required

Returns:

Type Description
str

Regular expression pattern string

Examples:

>>> iupac_to_regex("DRACH")
'[AGT][AG]AC[ACT]'
>>> iupac_to_regex("YGCY")
'[CT]GC[CT]'

Raises:

Type Description
ValueError

If pattern contains invalid IUPAC codes

Source code in squiggy/motif.py
def iupac_to_regex(pattern: str) -> str:
    """
    Convert IUPAC nucleotide pattern to regular expression

    Args:
        pattern: IUPAC nucleotide pattern (e.g., "DRACH", "YGCY")

    Returns:
        Regular expression pattern string

    Examples:
        >>> iupac_to_regex("DRACH")
        '[AGT][AG]AC[ACT]'
        >>> iupac_to_regex("YGCY")
        '[CT]GC[CT]'

    Raises:
        ValueError: If pattern contains invalid IUPAC codes
    """
    regex_parts = []

    for char in pattern.upper():
        if char not in IUPAC_CODES:
            valid_codes = ", ".join(sorted(IUPAC_CODES.keys()))
            raise ValueError(
                f"Invalid IUPAC code '{char}' in pattern '{pattern}'. "
                f"Valid codes: {valid_codes}"
            )
        regex_parts.append(IUPAC_CODES[char])

    return "".join(regex_parts)

search_motif

search_motif(
    fasta_file: str | Path,
    motif: str,
    region: str | None = None,
    strand: Literal["+", "-", "both"] = "both",
) -> Iterator[MotifMatch]

Search for motif matches in FASTA file

Lazy iteration over matches for memory efficiency.

Parameters:

Name Type Description Default
fasta_file str | Path

Path to indexed FASTA file (.fai required)

required
motif str

IUPAC nucleotide pattern (e.g., "DRACH", "YGCY")

required
region str | None

Optional region filter ("chrom", "chrom:start", "chrom:start-end") Positions are 1-based in input, converted to 0-based internally

None
strand Literal['+', '-', 'both']

Search strand ('+', '-', or 'both')

'both'

Yields:

Type Description
MotifMatch

MotifMatch objects for each match found

Examples:

>>> matches = list(search_motif("genome.fa", "DRACH", region="chr1:1000-2000"))
>>> for match in matches:
...     print(f"{match.chrom}:{match.position+1} {match.sequence} ({match.strand})")

Raises:

Type Description
FileNotFoundError

If FASTA file or index not found

ValueError

If motif contains invalid IUPAC codes or region format is invalid

Source code in squiggy/motif.py
def search_motif(
    fasta_file: str | Path,
    motif: str,
    region: str | None = None,
    strand: Literal["+", "-", "both"] = "both",
) -> Iterator[MotifMatch]:
    """
    Search for motif matches in FASTA file

    Lazy iteration over matches for memory efficiency.

    Args:
        fasta_file: Path to indexed FASTA file (.fai required)
        motif: IUPAC nucleotide pattern (e.g., "DRACH", "YGCY")
        region: Optional region filter ("chrom", "chrom:start", "chrom:start-end")
                Positions are 1-based in input, converted to 0-based internally
        strand: Search strand ('+', '-', or 'both')

    Yields:
        MotifMatch objects for each match found

    Examples:
        >>> matches = list(search_motif("genome.fa", "DRACH", region="chr1:1000-2000"))
        >>> for match in matches:
        ...     print(f"{match.chrom}:{match.position+1} {match.sequence} ({match.strand})")

    Raises:
        FileNotFoundError: If FASTA file or index not found
        ValueError: If motif contains invalid IUPAC codes or region format is invalid
    """
    fasta_path = Path(fasta_file).resolve()

    if not fasta_path.exists():
        raise FileNotFoundError(f"FASTA file not found: {fasta_path}")

    # Check for index
    fai_path = Path(str(fasta_path) + ".fai")
    if not fai_path.exists():
        raise FileNotFoundError(
            f"FASTA index not found: {fai_path}. "
            f"Create index with: samtools faidx {fasta_path}"
        )

    # Convert motif to regex
    regex_pattern = iupac_to_regex(motif)
    regex = re.compile(regex_pattern, re.IGNORECASE)

    # Parse region if provided
    target_chrom = None
    target_start = None
    target_end = None

    if region:
        target_chrom, target_start, target_end = parse_region(region)

    # Open FASTA file
    with pysam.FastaFile(str(fasta_path)) as fasta:
        # Determine which chromosomes to search
        if target_chrom:
            if target_chrom not in fasta.references:
                raise ValueError(
                    f"Chromosome '{target_chrom}' not found in FASTA file. "
                    f"Available: {', '.join(fasta.references)}"
                )
            chroms = [target_chrom]
        else:
            chroms = list(fasta.references)

        # Search each chromosome
        for chrom in chroms:
            # Get sequence for region
            if target_chrom == chrom:
                seq = fasta.fetch(chrom, target_start, target_end)
                offset = target_start if target_start is not None else 0
            else:
                seq = fasta.fetch(chrom)
                offset = 0

            # Search forward strand
            if strand in ("+", "both"):
                for match in regex.finditer(seq):
                    yield MotifMatch(
                        chrom=chrom,
                        position=match.start() + offset,
                        sequence=match.group().upper(),
                        strand="+",
                    )

            # Search reverse strand
            if strand in ("-", "both"):
                rc_seq = reverse_complement(seq)
                rc_motif = reverse_complement(motif)
                rc_regex_pattern = iupac_to_regex(rc_motif)
                rc_regex = re.compile(rc_regex_pattern, re.IGNORECASE)

                for match in rc_regex.finditer(rc_seq):
                    # Convert position back to forward strand coordinates
                    rc_position = match.start()
                    match_len = len(match.group())
                    # Position on forward strand
                    fw_position = len(seq) - rc_position - match_len

                    # Get the actual sequence from the forward strand
                    fw_sequence = seq[fw_position : fw_position + match_len].upper()

                    yield MotifMatch(
                        chrom=chrom,
                        position=fw_position + offset,
                        sequence=fw_sequence,
                        strand="-",
                    )
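
To make the reverse-strand arithmetic above concrete: with a fetched sequence of length 100, a match starting at position 40 on the reverse complement with length 5 maps back to forward-strand position 100 - 40 - 5 = 55, and the region offset is then added. The same formula, checked directly:

>>> seq_len, rc_position, match_len = 100, 40, 5
>>> seq_len - rc_position - match_len
55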

count_motifs

count_motifs(
    fasta_file: str | Path,
    motif: str,
    region: str | None = None,
    strand: Literal["+", "-", "both"] = "both",
) -> int

Count total motif matches in FASTA file

Parameters:

Name Type Description Default
fasta_file str | Path

Path to indexed FASTA file (.fai required)

required
motif str

IUPAC nucleotide pattern (e.g., "DRACH", "YGCY")

required
region str | None

Optional region filter ("chrom:start-end")

None
strand Literal['+', '-', 'both']

Search strand ('+', '-', or 'both')

'both'

Returns:

Type Description
int

Total number of matches

Examples:

>>> count = count_motifs("genome.fa", "DRACH", region="chr1")
>>> print(f"Found {count} DRACH motifs on chr1")
Source code in squiggy/motif.py
def count_motifs(
    fasta_file: str | Path,
    motif: str,
    region: str | None = None,
    strand: Literal["+", "-", "both"] = "both",
) -> int:
    """
    Count total motif matches in FASTA file

    Args:
        fasta_file: Path to indexed FASTA file (.fai required)
        motif: IUPAC nucleotide pattern (e.g., "DRACH", "YGCY")
        region: Optional region filter ("chrom:start-end")
        strand: Search strand ('+', '-', or 'both')

    Returns:
        Total number of matches

    Examples:
        >>> count = count_motifs("genome.fa", "DRACH", region="chr1")
        >>> print(f"Found {count} DRACH motifs on chr1")
    """
    return sum(1 for _ in search_motif(fasta_file, motif, region, strand))

IUPAC_CODES module-attribute

IUPAC_CODES = {
    "A": "A",
    "C": "C",
    "G": "G",
    "T": "T",
    "U": "T",
    "R": "[AG]",
    "Y": "[CT]",
    "S": "[GC]",
    "W": "[AT]",
    "K": "[GT]",
    "M": "[AC]",
    "B": "[CGT]",
    "D": "[AGT]",
    "H": "[ACT]",
    "V": "[ACG]",
    "N": "[ACGT]",
}
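
Each entry maps an IUPAC code to the regex character class used by iupac_to_regex, so lookups are plain dict access:

>>> IUPAC_CODES["D"]
'[AGT]'
>>> IUPAC_CODES["N"]
'[ACGT]'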

Normalization

normalize_signal

normalize_signal(
    signal: ndarray, method: NormalizationMethod
) -> np.ndarray

Normalize signal data using specified method

Parameters:

Name Type Description Default
signal ndarray

Raw signal array (numpy array)

required
method NormalizationMethod

Normalization method to apply

required

Returns:

Type Description
ndarray

Normalized signal array

Source code in squiggy/normalization.py
def normalize_signal(signal: np.ndarray, method: NormalizationMethod) -> np.ndarray:
    """Normalize signal data using specified method

    Args:
        signal: Raw signal array (numpy array)
        method: Normalization method to apply

    Returns:
        Normalized signal array
    """
    if method == NormalizationMethod.NONE:
        return signal
    elif method == NormalizationMethod.ZNORM:
        return z_normalize(signal)
    elif method == NormalizationMethod.MEDIAN:
        return median_normalize(signal)
    elif method == NormalizationMethod.MAD:
        return mad_normalize(signal)
    else:
        raise ValueError(f"Unknown normalization method: {method}")

NormalizationMethod

Bases: Enum

Signal normalization methods

Constants

PlotMode

Bases: Enum

Available plotting modes for signal visualization

Theme

Bases: Enum

Application theme modes

BASE_COLORS module-attribute

BASE_COLORS = {
    "A": "#009E73",
    "C": "#F0E442",
    "G": "#0072B2",
    "T": "#D55E00",
    "U": "#D55E00",
    "N": "#808080",
}

BASE_COLORS_DARK module-attribute

BASE_COLORS_DARK = {
    "A": "#00d9a3",
    "C": "#fff34d",
    "G": "#4da6ff",
    "T": "#ff8c42",
    "U": "#ff8c42",
    "N": "#999999",
}

Plot Strategies

create_plot_strategy

create_plot_strategy(
    plot_mode: PlotMode, theme: Theme
) -> PlotStrategy

Factory function to create the appropriate plot strategy

Parameters:

Name Type Description Default
plot_mode PlotMode

PlotMode enum specifying which plot type to create

required
theme Theme

Theme enum (LIGHT or DARK)

required

Returns:

Type Description
PlotStrategy

PlotStrategy instance for the specified mode

Raises:

Type Description
ValueError

If plot_mode is not recognized

Examples:

>>> from squiggy.plot_factory import create_plot_strategy
>>> from squiggy.constants import PlotMode, Theme
>>>
>>> strategy = create_plot_strategy(PlotMode.SINGLE, Theme.LIGHT)
>>> html, fig = strategy.create_plot(data, options)
Source code in squiggy/plot_factory.py
def create_plot_strategy(plot_mode: PlotMode, theme: Theme) -> PlotStrategy:
    """
    Factory function to create the appropriate plot strategy

    Args:
        plot_mode: PlotMode enum specifying which plot type to create
        theme: Theme enum (LIGHT or DARK)

    Returns:
        PlotStrategy instance for the specified mode

    Raises:
        ValueError: If plot_mode is not recognized

    Examples:
        >>> from squiggy.plot_factory import create_plot_strategy
        >>> from squiggy.constants import PlotMode, Theme
        >>>
        >>> strategy = create_plot_strategy(PlotMode.SINGLE, Theme.LIGHT)
        >>> html, fig = strategy.create_plot(data, options)
    """
    strategy_map = {
        PlotMode.SINGLE: SingleReadPlotStrategy,
        PlotMode.OVERLAY: OverlayPlotStrategy,
        PlotMode.STACKED: StackedPlotStrategy,
        PlotMode.EVENTALIGN: EventAlignPlotStrategy,
        PlotMode.AGGREGATE: AggregatePlotStrategy,
        PlotMode.DELTA: DeltaPlotStrategy,
        PlotMode.SIGNAL_OVERLAY_COMPARISON: SignalOverlayComparisonStrategy,
        PlotMode.AGGREGATE_COMPARISON: AggregateComparisonStrategy,
    }

    strategy_class = strategy_map.get(plot_mode)
    if strategy_class is None:
        valid_modes = ", ".join(m.value for m in PlotMode)
        raise ValueError(f"Unknown plot mode: {plot_mode}. Valid modes: {valid_modes}")

    return strategy_class(theme)

Utilities

Signal Processing

get_basecall_data

get_basecall_data(
    bam_file: Path, read_id: str
) -> tuple[str | None, np.ndarray | None]

Extract basecall sequence and signal mapping from BAM file

Parameters:

Name Type Description Default
bam_file Path

Path to BAM file

required
read_id str

Read identifier to search for

required

Returns:

Type Description
tuple[str | None, ndarray | None]

(sequence, seq_to_sig_map) or (None, None) if not available

Source code in squiggy/utils.py
def get_basecall_data(
    bam_file: Path, read_id: str
) -> tuple[str | None, np.ndarray | None]:
    """Extract basecall sequence and signal mapping from BAM file

    Args:
        bam_file: Path to BAM file
        read_id: Read identifier to search for

    Returns:
        (sequence, seq_to_sig_map) or (None, None) if not available
    """
    if not bam_file:
        return None, None

    try:
        bam = pysam.AlignmentFile(str(bam_file), "rb", check_sq=False)

        # Find the read in BAM
        for read in bam.fetch(until_eof=True):
            if read.query_name == read_id:
                # Get sequence
                sequence = read.query_sequence

                # Get move table from BAM tags
                if read.has_tag("mv"):
                    move_table = np.array(read.get_tag("mv"), dtype=np.uint8)

                    # Extract stride (first element) and moves (remaining elements)
                    # Stride represents the neural network downsampling factor
                    # Typical values: 5 for DNA models, 10-12 for RNA models
                    stride = int(move_table[0])
                    moves = move_table[1:]

                    # Convert move table to signal-to-sequence mapping
                    seq_to_sig_map = []
                    sig_pos = 0
                    for move in moves:
                        if move == 1:
                            seq_to_sig_map.append(sig_pos)
                        sig_pos += stride

                    bam.close()
                    return sequence, np.array(seq_to_sig_map)

        bam.close()

    except Exception:
        pass

    return None, None
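
The move-table decoding above is easy to check by hand. A self-contained sketch with a toy table (stride 5, four moves; the values are made up):

>>> move_table = [5, 1, 0, 1, 1]
>>> stride, moves = move_table[0], move_table[1:]
>>> seq_to_sig_map, sig_pos = [], 0
>>> for move in moves:
...     if move == 1:
...         seq_to_sig_map.append(sig_pos)
...     sig_pos += stride
>>> seq_to_sig_map
[0, 10, 15]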

downsample_signal

downsample_signal(
    signal: ndarray, downsample_factor: int | None = None
) -> np.ndarray

Downsample signal array by taking every Nth point

Reduces the number of data points for faster plotting while preserving the overall shape of the signal.

Parameters:

Name Type Description Default
signal ndarray

Raw signal array (numpy array)

required
downsample_factor int | None

Factor by which to downsample (None = use default, 1 = no downsampling)

None

Returns:

Type Description
ndarray

Downsampled signal array

Source code in squiggy/utils.py
def downsample_signal(signal: np.ndarray, downsample_factor: int | None = None) -> np.ndarray:
    """Downsample signal array by taking every Nth point

    Reduces the number of data points for faster plotting while preserving
    the overall shape of the signal.

    Args:
        signal: Raw signal array (numpy array)
        downsample_factor: Factor by which to downsample (None = use default, 1 = no downsampling)

    Returns:
        Downsampled signal array
    """
    from .constants import DEFAULT_DOWNSAMPLE

    if downsample_factor is None:
        downsample_factor = DEFAULT_DOWNSAMPLE

    if downsample_factor <= 1:
        return signal

    # Take every Nth point
    return signal[::downsample_factor]
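
For example (assumes the import path from the source listing):

>>> import numpy as np
>>> from squiggy.utils import downsample_signal
>>> downsample_signal(np.arange(10), downsample_factor=3)
array([0, 3, 6, 9])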

BAM/Alignment Utilities

get_bam_references

get_bam_references(bam_file: Path) -> list[dict]

Get list of reference sequences from BAM file with read counts

Parameters:

Name Type Description Default
bam_file Path

Path to BAM file

required

Returns:

Type Description
list[dict]

List of dicts with keys:
- name: Reference name
- length: Reference sequence length
- read_count: Number of aligned reads (requires index)

Raises:

Type Description
FileNotFoundError

If BAM file doesn't exist

Source code in squiggy/utils.py
def get_bam_references(bam_file: Path) -> list[dict]:
    """Get list of reference sequences from BAM file with read counts

    Args:
        bam_file: Path to BAM file

    Returns:
        List of dicts with keys:
            - name: Reference name
            - length: Reference sequence length
            - read_count: Number of aligned reads (requires index)

    Raises:
        FileNotFoundError: If BAM file doesn't exist
    """
    if not bam_file or not Path(bam_file).exists():
        raise FileNotFoundError(f"BAM file not found: {bam_file}")

    references = []

    try:
        with pysam.AlignmentFile(str(bam_file), "rb", check_sq=False) as bam:
            # Get reference names and lengths from header
            for ref_name, ref_length in zip(bam.references, bam.lengths, strict=False):
                ref_info = {
                    "name": ref_name,
                    "length": ref_length,
                    "read_count": None,
                }

                # Try to count reads if BAM is indexed
                bai_path = Path(str(bam_file) + ".bai")
                if bai_path.exists():
                    try:
                        # Count mapped reads for this reference
                        count = bam.count(ref_name)
                        ref_info["read_count"] = count
                    except Exception:
                        # If counting fails, leave as None
                        pass

                # Only include references that have reads
                # If read_count is None (no index), include it
                # If read_count is 0, skip it
                if ref_info["read_count"] is None or ref_info["read_count"] > 0:
                    references.append(ref_info)

    except Exception as e:
        raise ValueError(f"Error reading BAM file: {str(e)}") from e

    return references
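
Typical usage (illustrative; "alignments.bam" is a placeholder path):

>>> from pathlib import Path
>>> from squiggy.utils import get_bam_references
>>> for ref in get_bam_references(Path("alignments.bam")):
...     print(ref["name"], ref["length"], ref["read_count"])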

get_reads_in_region

get_reads_in_region(
    bam_file: Path,
    chromosome: str,
    start: int | None = None,
    end: int | None = None,
) -> dict

Query BAM file for reads aligning to a specific region

Requires BAM file to be indexed (.bai file must exist).

Parameters:

Name Type Description Default
bam_file Path

Path to BAM file

required
chromosome str

Chromosome/reference name

required
start int | None

Start position (0-based, inclusive) or None for entire chromosome

None
end int | None

End position (0-based, exclusive) or None for entire chromosome

None

Returns:

Type Description
dict

Dictionary mapping read_id -> alignment_info with keys:
- read_id: Read identifier
- chromosome: Reference name
- start: Alignment start position
- end: Alignment end position
- strand: '+' or '-'
- is_reverse: Boolean

Raises:

Type Description
ValueError

If BAM file is not indexed or region is invalid

FileNotFoundError

If BAM file doesn't exist

Source code in squiggy/utils.py
def get_reads_in_region(
    bam_file: Path, chromosome: str, start: int | None = None, end: int | None = None
) -> dict:
    """Query BAM file for reads aligning to a specific region

    Requires BAM file to be indexed (.bai file must exist).

    Args:
        bam_file: Path to BAM file
        chromosome: Chromosome/reference name
        start: Start position (0-based, inclusive) or None for entire chromosome
        end: End position (0-based, exclusive) or None for entire chromosome

    Returns:
        Dictionary mapping read_id -> alignment_info with keys:
            - read_id: Read identifier
            - chromosome: Reference name
            - start: Alignment start position
            - end: Alignment end position
            - strand: '+' or '-'
            - is_reverse: Boolean

    Raises:
        ValueError: If BAM file is not indexed or region is invalid
        FileNotFoundError: If BAM file doesn't exist
    """
    if not bam_file or not Path(bam_file).exists():
        raise FileNotFoundError(f"BAM file not found: {bam_file}")

    # Check for BAM index - will be created by caller if needed
    bai_path = Path(str(bam_file) + ".bai")
    if not bai_path.exists():
        raise ValueError(
            f"BAM index file not found: {bai_path}\n"
            "The BAM file must be indexed before querying regions."
        )

    reads_dict = {}

    try:
        with pysam.AlignmentFile(str(bam_file), "rb") as bam:
            # Verify chromosome exists in BAM header
            if chromosome not in bam.references:
                available = ", ".join(bam.references[:5])
                raise ValueError(
                    f"Chromosome '{chromosome}' not found in BAM file.\n"
                    f"Available references: {available}..."
                )

            # Query region
            if start is not None and end is not None:
                # Specific region
                alignments = bam.fetch(chromosome, start, end)
            else:
                # Entire chromosome
                alignments = bam.fetch(chromosome)

            # Collect alignment info
            for aln in alignments:
                if aln.is_unmapped:
                    continue

                reads_dict[aln.query_name] = {
                    "read_id": aln.query_name,
                    "chromosome": aln.reference_name,
                    "start": aln.reference_start,
                    "end": aln.reference_end,
                    "strand": "-" if aln.is_reverse else "+",
                    "is_reverse": aln.is_reverse,
                }

    except Exception as e:
        raise ValueError(f"Error querying BAM file: {str(e)}") from e

    return reads_dict
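
Typical usage (illustrative; the path and region are placeholders, and the BAM must be indexed):

>>> reads = get_reads_in_region(Path("alignments.bam"), "chr1", 1000, 2000)
>>> for read_id, info in reads.items():
...     print(read_id, info["start"], info["end"], info["strand"])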

get_reference_sequence_for_read

get_reference_sequence_for_read(bam_file, read_id)

Extract the reference sequence for a given aligned read.

Parameters:

Name Type Description Default
bam_file

Path to BAM file

required
read_id

Read identifier

required

Returns:

Type Description

Tuple of (reference_sequence, reference_start, aligned_read)

Returns (None, None, None) if read not found or not aligned

Source code in squiggy/utils.py
def get_reference_sequence_for_read(bam_file, read_id):
    """
    Extract the reference sequence for a given aligned read.

    Args:
        bam_file: Path to BAM file
        read_id: Read identifier

    Returns:
        Tuple of (reference_sequence, reference_start, aligned_read)
        Returns (None, None, None) if read not found or not aligned
    """
    try:
        with pysam.AlignmentFile(str(bam_file), "rb", check_sq=False) as bam:
            # Find the alignment for this read
            aligned_read = None
            for aln in bam.fetch(until_eof=True):
                if aln.query_name == read_id:
                    aligned_read = aln
                    break

            if not aligned_read or aligned_read.is_unmapped:
                return None, None, None

            # Get reference sequence from the alignment
            ref_name = aligned_read.reference_name
            ref_start = aligned_read.reference_start

            # Check if reference sequence is available in BAM header
            if bam.header.get("SQ"):
                # Try to get reference sequence from header (if embedded)
                for sq in bam.header["SQ"]:
                    if sq["SN"] == ref_name:
                        # Some BAMs have embedded reference sequences
                        if "M5" in sq or "UR" in sq:
                            # Reference not embedded, need to reconstruct from alignment
                            break

            # Reconstruct reference sequence from aligned read
            # Use the aligned pairs to build the reference sequence
            ref_seq_list = []
            ref_positions = []

            for query_pos, ref_pos in aligned_read.get_aligned_pairs():
                if ref_pos is not None:  # Skip insertions in read
                    ref_positions.append(ref_pos)
                    if query_pos is not None:
                        # Match or mismatch
                        base = aligned_read.query_sequence[query_pos]
                        ref_seq_list.append(base)
                    else:
                        # Deletion in read (gap in query)
                        ref_seq_list.append("N")  # Use N for deletions

            ref_seq = "".join(ref_seq_list)

            return ref_seq, ref_start, aligned_read

    except Exception as e:
        raise ValueError(f"Error extracting reference sequence: {str(e)}") from e

open_bam_safe

open_bam_safe(bam_path: str | Path)

Context manager for safely opening and closing BAM files

This utility eliminates duplicate BAM file handling code by providing a consistent pattern for opening BAM files with proper resource cleanup.

Parameters:

Name Type Description Default
bam_path str | Path

Path to BAM file (string or Path object)

required

Yields:

Type Description

pysam.AlignmentFile: Opened BAM file handle

Raises:

Type Description
FileNotFoundError

If BAM file doesn't exist

IOError

If BAM file cannot be opened

Examples:

>>> from squiggy.utils import open_bam_safe
>>> with open_bam_safe("alignments.bam") as bam:
...     for read in bam:
...         print(read.query_name)
>>> # Automatically closes even on error
>>> with open_bam_safe("alignments.bam") as bam:
...     references = list(bam.references)
Source code in squiggy/utils.py
@contextmanager
def open_bam_safe(bam_path: str | Path):
    """
    Context manager for safely opening and closing BAM files

    This utility eliminates duplicate BAM file handling code by providing
    a consistent pattern for opening BAM files with proper resource cleanup.

    Args:
        bam_path: Path to BAM file (string or Path object)

    Yields:
        pysam.AlignmentFile: Opened BAM file handle

    Raises:
        FileNotFoundError: If BAM file doesn't exist
        IOError: If BAM file cannot be opened

    Examples:
        >>> from squiggy.utils import open_bam_safe
        >>> with open_bam_safe("alignments.bam") as bam:
        ...     for read in bam:
        ...         print(read.query_name)

        >>> # Automatically closes even on error
        >>> with open_bam_safe("alignments.bam") as bam:
        ...     references = list(bam.references)
    """
    bam_path = Path(bam_path)

    if not bam_path.exists():
        raise FileNotFoundError(f"BAM file not found: {bam_path}")

    bam = None
    try:
        # Open with check_sq=False to avoid issues with missing SQ headers
        bam = pysam.AlignmentFile(str(bam_path), "rb", check_sq=False)
        yield bam
    finally:
        if bam is not None:
            bam.close()

validate_sq_headers

validate_sq_headers(
    bam_file_a: str, bam_file_b: str
) -> dict

Validate that two BAM files have matching reference sequences

Compares the SQ (sequence) headers from two BAM files to confirm they share the same references, which is required for meaningful read-level comparison between the two alignments.

Parameters:

Name Type Description Default
bam_file_a str

Path to first BAM file

required
bam_file_b str

Path to second BAM file

required

Returns:

Type Description
dict

Dict with validation results:
- is_valid (bool): True if references match
- references_a (list): References in file A
- references_b (list): References in file B
- missing_in_b (list): References in A but not B
- missing_in_a (list): References in B but not A
- matching_count (int): Number of matching references

Examples:

>>> from squiggy.utils import validate_sq_headers
>>> result = validate_sq_headers('align_a.bam', 'align_b.bam')
>>> if result['is_valid']:
...     print("References match!")
... else:
...     print(f"Missing in B: {result['missing_in_b']}")
Source code in squiggy/utils.py
def validate_sq_headers(bam_file_a: str, bam_file_b: str) -> dict:
    """
    Validate that two BAM files have matching reference sequences

    Compares the SQ (sequence) headers from two BAM files to confirm they
    share the same references, which is required for meaningful read-level
    comparison between the two alignments.

    Args:
        bam_file_a: Path to first BAM file
        bam_file_b: Path to second BAM file

    Returns:
        Dict with validation results:
            - is_valid (bool): True if references match
            - references_a (list): References in file A
            - references_b (list): References in file B
            - missing_in_b (list): References in A but not B
            - missing_in_a (list): References in B but not A
            - matching_count (int): Number of matching references

    Examples:
        >>> from squiggy.utils import validate_sq_headers
        >>> result = validate_sq_headers('align_a.bam', 'align_b.bam')
        >>> if result['is_valid']:
        ...     print("References match!")
        ... else:
        ...     print(f"Missing in B: {result['missing_in_b']}")
    """
    if not os.path.exists(bam_file_a):
        raise FileNotFoundError(f"BAM file A not found: {bam_file_a}")

    if not os.path.exists(bam_file_b):
        raise FileNotFoundError(f"BAM file B not found: {bam_file_b}")

    refs_a = []
    refs_b = []

    try:
        # Get references from file A
        with pysam.AlignmentFile(str(bam_file_a), "rb", check_sq=False) as bam:
            refs_a = list(bam.references)

        # Get references from file B
        with pysam.AlignmentFile(str(bam_file_b), "rb", check_sq=False) as bam:
            refs_b = list(bam.references)

    except Exception as e:
        raise ValueError(f"Error reading BAM files: {str(e)}") from e

    # Find differences
    refs_set_a = set(refs_a)
    refs_set_b = set(refs_b)

    missing_in_b = list(refs_set_a - refs_set_b)
    missing_in_a = list(refs_set_b - refs_set_a)
    matching = list(refs_set_a & refs_set_b)

    is_valid = len(missing_in_a) == 0 and len(missing_in_b) == 0

    return {
        "is_valid": is_valid,
        "references_a": refs_a,
        "references_b": refs_b,
        "missing_in_b": missing_in_b,
        "missing_in_a": missing_in_a,
        "matching_count": len(matching),
    }

index_bam_file

index_bam_file(bam_file)

Generate BAM index file (.bai)

Creates a .bai index file for the given BAM file using pysam.

Parameters:

Name Type Description Default
bam_file

Path to BAM file

required

Raises:

Type Description
FileNotFoundError

If BAM file doesn't exist

Exception

If indexing fails

Source code in squiggy/utils.py
def index_bam_file(bam_file):
    """Generate BAM index file (.bai)

    Creates a .bai index file for the given BAM file using pysam.

    Args:
        bam_file: Path to BAM file

    Raises:
        FileNotFoundError: If BAM file doesn't exist
        Exception: If indexing fails
    """
    if not bam_file or not Path(bam_file).exists():
        raise FileNotFoundError(f"BAM file not found: {bam_file}")

    try:
        pysam.index(str(bam_file))
    except Exception as e:
        raise Exception(f"Failed to index BAM file: {str(e)}") from e

General Utilities

parse_region

parse_region(
    region_str: str,
) -> tuple[str | None, int | None, int | None]

Parse a genomic region string into components

Supports formats:
- "chr1" (entire chromosome)
- "chr1:1000" (single position)
- "chr1:1000-2000" (range)
- "chr1:1,000-2,000" (with commas)

Parameters:

Name Type Description Default
region_str str

Region string to parse

required

Returns:

Type Description
tuple[str | None, int | None, int | None]

(chromosome, start, end) where start/end are None if not specified. Returns (None, None, None) if parsing fails.

Examples:

>>> parse_region("chr1")
("chr1", None, None)
>>> parse_region("chr1:1000-2000")
("chr1", 1000, 2000)
Source code in squiggy/utils.py
def parse_region(region_str: str) -> tuple[str | None, int | None, int | None]:
    """Parse a genomic region string into components

    Supports formats:
    - "chr1" (entire chromosome)
    - "chr1:1000" (single position)
    - "chr1:1000-2000" (range)
    - "chr1:1,000-2,000" (with commas)

    Args:
        region_str: Region string to parse

    Returns:
        (chromosome, start, end) where start/end are None if not specified.
        Returns (None, None, None) if parsing fails

    Examples:
        >>> parse_region("chr1")
        ("chr1", None, None)
        >>> parse_region("chr1:1000-2000")
        ("chr1", 1000, 2000)
    """
    if not region_str or not region_str.strip():
        return None, None, None

    region_str = region_str.strip().replace(",", "")  # Remove commas

    # Check for colon (indicates coordinates)
    if ":" in region_str:
        parts = region_str.split(":")
        if len(parts) != 2:
            return None, None, None

        chrom = parts[0].strip()
        coords = parts[1].strip()

        # Check for range (start-end)
        if "-" in coords:
            coord_parts = coords.split("-")
            if len(coord_parts) != 2:
                return None, None, None
            try:
                start = int(coord_parts[0].strip())
                end = int(coord_parts[1].strip())
                return chrom, start, end
            except ValueError:
                return None, None, None
        else:
            # Single position
            try:
                pos = int(coords)
                return chrom, pos, pos
            except ValueError:
                return None, None, None
    else:
        # Just chromosome name
        return region_str.strip(), None, None

reverse_complement

reverse_complement(seq: str) -> str

Return the reverse complement of a DNA sequence.

Parameters:

Name Type Description Default
seq str

DNA sequence string (A, C, G, T, N)

required

Returns:

Type Description
str

Reverse complement sequence

Source code in squiggy/utils.py
def reverse_complement(seq: str) -> str:
    """
    Return the reverse complement of a DNA sequence.

    Args:
        seq: DNA sequence string (A, C, G, T, N)

    Returns:
        Reverse complement sequence
    """
    complement = {"A": "T", "T": "A", "C": "G", "G": "C", "N": "N"}
    return "".join(complement.get(base, base) for base in reversed(seq))

get_test_data_path

get_test_data_path()

Get the path to the bundled test data directory

Returns the path to the squiggy/data directory, which contains test/demo files:
- yeast_trna_reads.pod5
- yeast_trna_mappings.bam
- yeast_trna_mappings.bam.bai
- yeast_trna.fa
- yeast_trna.fa.fai

Returns:

Name Type Description
str

Path to the squiggy/data directory

Raises:

Type Description
FileNotFoundError

If data directory cannot be found

Examples:

>>> from pathlib import Path
>>> data_dir = Path(get_test_data_path())
>>> pod5_file = data_dir / 'yeast_trna_reads.pod5'
Source code in squiggy/utils.py
def get_test_data_path():
    """Get the path to the bundled test data directory

    Returns the path to the squiggy/data directory which contains test/demo files:
    - yeast_trna_reads.pod5
    - yeast_trna_mappings.bam
    - yeast_trna_mappings.bam.bai
    - yeast_trna.fa
    - yeast_trna.fa.fai

    Returns:
        str: Path to the squiggy/data directory

    Raises:
        FileNotFoundError: If data directory cannot be found

    Examples:
        >>> from pathlib import Path
        >>> data_dir = Path(get_test_data_path())
        >>> pod5_file = data_dir / 'yeast_trna_reads.pod5'
    """
    import importlib.util

    try:
        # Find the squiggy package location
        spec = importlib.util.find_spec("squiggy")
        if spec is None or spec.origin is None:
            raise FileNotFoundError("Could not locate squiggy package")

        package_dir = os.path.dirname(spec.origin)
        data_dir = os.path.join(package_dir, "data")

        # Verify the directory exists
        if not os.path.isdir(data_dir):
            raise FileNotFoundError(f"Data directory not found at {data_dir}")

        return data_dir

    except Exception as e:
        raise FileNotFoundError(f"Test data directory not found. Error: {e}") from None

extract_model_provenance

extract_model_provenance(bam_file: str) -> ModelProvenance

Extract basecalling model information from BAM file @PG headers

The @PG header record contains information about the program used to generate the alignments. For ONT sequencing, this typically includes basecalling information from guppy or dorado.

Parameters:

Name Type Description Default
bam_file str

Path to BAM file

required

Returns:

Type Description
ModelProvenance

ModelProvenance object with extracted metadata

Examples:

>>> from squiggy.utils import extract_model_provenance
>>> provenance = extract_model_provenance('alignments.bam')
>>> print(provenance.model_name)
>>> print(provenance.basecalling_model)
Source code in squiggy/utils.py
def extract_model_provenance(bam_file: str) -> ModelProvenance:
    """
    Extract basecalling model information from BAM file @PG headers

    The @PG header record contains information about the program used to
    generate the alignments. For ONT sequencing, this typically includes
    basecalling information from guppy or dorado.

    Args:
        bam_file: Path to BAM file

    Returns:
        ModelProvenance object with extracted metadata

    Examples:
        >>> from squiggy.utils import extract_model_provenance
        >>> provenance = extract_model_provenance('alignments.bam')
        >>> print(provenance.model_name)
        >>> print(provenance.basecalling_model)
    """
    if not os.path.exists(bam_file):
        raise FileNotFoundError(f"BAM file not found: {bam_file}")

    provenance = ModelProvenance()

    try:
        with pysam.AlignmentFile(str(bam_file), "rb", check_sq=False) as bam:
            # Get @PG header records
            if bam.header and "PG" in bam.header:
                pg_records = bam.header["PG"]

                # PG records are a list of dicts
                if not isinstance(pg_records, list):
                    pg_records = [pg_records]

                # Process each @PG record (usually the last one is the most relevant)
                for pg in pg_records:
                    # Extract program name
                    if "PN" in pg:
                        provenance.model_name = pg["PN"]

                    # Extract version
                    if "VN" in pg:
                        provenance.model_version = pg["VN"]

                    # Extract command line
                    if "CL" in pg:
                        provenance.command_line = pg["CL"]
                        # Try to extract model info from command line
                        # Common patterns: --model, -m, or model name in path
                        cl = pg["CL"]
                        if "--model" in cl or "-m " in cl:
                            # Parse model identifier from command line
                            parts = cl.split()
                            for i, part in enumerate(parts):
                                if part in ("--model", "-m") and i + 1 < len(parts):
                                    provenance.basecalling_model = parts[i + 1]
                                    break

                    # Try to extract kit info from description
                    if "DS" in pg:
                        provenance.flow_cell_kit = pg["DS"]

    except Exception:
        # Return partial provenance if there's an error
        pass

    return provenance

ModelProvenance dataclass

ModelProvenance(
    model_name: str | None = None,
    model_version: str | None = None,
    flow_cell_kit: str | None = None,
    basecalling_model: str | None = None,
    command_line: str | None = None,
)

Metadata about the basecalling model used to generate a dataset

Extracted from BAM file @PG headers, which contain information about the basecalling process and model version.

Attributes:

Name Type Description
model_name str | None

Name of the basecalling model (e.g., "guppy", "dorado")

model_version str | None

Version of the basecalling model

flow_cell_kit str | None

Sequencing kit used

basecalling_model str | None

Specific basecalling model identifier

command_line str | None

Full command line used for basecalling

__repr__

__repr__() -> str

Return informative summary of model provenance

Source code in squiggy/utils.py
def __repr__(self) -> str:
    """Return informative summary of model provenance"""
    parts = []

    if self.model_name:
        parts.append(f"Model: {self.model_name}")

    if self.model_version:
        parts.append(f"v{self.model_version}")

    if self.basecalling_model:
        parts.append(f"({self.basecalling_model})")

    if not parts:
        return "<ModelProvenance: Unknown>"

    return f"<ModelProvenance: {' '.join(parts)}>"

matches

matches(other: ModelProvenance) -> bool

Check if two ModelProvenance instances describe the same model

Source code in squiggy/utils.py
def matches(self, other: "ModelProvenance") -> bool:
    """Check if two ModelProvenance instances describe the same model"""
    if other is None:
        return False

    # Consider a match if both have same model_name and basecalling_model
    # (version differences might be acceptable)
    return (
        self.model_name == other.model_name
        and self.basecalling_model == other.basecalling_model
    )
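
For example (the model identifier is a placeholder):

>>> a = ModelProvenance(model_name="dorado", basecalling_model="hac@v4.1")
>>> b = ModelProvenance(model_name="dorado", basecalling_model="hac@v4.1", model_version="0.6.0")
>>> a.matches(b)  # version differences are tolerated
True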

parse_plot_parameters

parse_plot_parameters(
    mode: str | None = None,
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
)

Parse and validate plot parameters (mode, normalization, theme)

This utility function eliminates duplicate parameter parsing code across plotting functions by centralizing the conversion from string parameters to enum values.

Parameters:

Name Type Description Default
mode str | None

Plot mode string (e.g., "SINGLE", "OVERLAY", "EVENTALIGN", "AGGREGATE"). If None, only normalization and theme are parsed.

None
normalization str

Normalization method string (default: "ZNORM"). Valid values: "NONE", "ZNORM", "MEDIAN", "MAD"

'ZNORM'
theme str

Color theme string (default: "LIGHT"). Valid values: "LIGHT", "DARK"

'LIGHT'

Returns:

Type Description

Dictionary with parsed enum values:

  • "mode": PlotMode enum (if mode parameter provided, else None)
  • "normalization": NormalizationMethod enum
  • "theme": Theme enum

Raises:

Type Description
KeyError

If invalid mode, normalization, or theme string provided

Examples:

>>> params = parse_plot_parameters(mode="SINGLE", normalization="ZNORM", theme="LIGHT")
>>> params["mode"]
<PlotMode.SINGLE: 'SINGLE'>
>>> params = parse_plot_parameters(normalization="MEDIAN", theme="DARK")
>>> params["normalization"]
<NormalizationMethod.MEDIAN: 'MEDIAN'>
Source code in squiggy/utils.py
def parse_plot_parameters(
    mode: str | None = None,
    normalization: str = "ZNORM",
    theme: str = "LIGHT",
):
    """
    Parse and validate plot parameters (mode, normalization, theme)

    This utility function eliminates duplicate parameter parsing code across
    plotting functions by centralizing the conversion from string parameters
    to enum values.

    Args:
        mode: Plot mode string (e.g., "SINGLE", "OVERLAY", "EVENTALIGN", "AGGREGATE").
              If None, only normalization and theme are parsed.
        normalization: Normalization method string (default: "ZNORM").
                       Valid values: "NONE", "ZNORM", "MEDIAN", "MAD"
        theme: Color theme string (default: "LIGHT").
               Valid values: "LIGHT", "DARK"

    Returns:
        Dictionary with parsed enum values:
        - "mode": PlotMode enum (if mode parameter provided, else None)
        - "normalization": NormalizationMethod enum
        - "theme": Theme enum

    Raises:
        KeyError: If invalid mode, normalization, or theme string provided

    Examples:
        >>> params = parse_plot_parameters(mode="SINGLE", normalization="ZNORM", theme="LIGHT")
        >>> params["mode"]
        <PlotMode.SINGLE: 'SINGLE'>

        >>> params = parse_plot_parameters(normalization="MEDIAN", theme="DARK")
        >>> params["normalization"]
        <NormalizationMethod.MEDIAN: 'MEDIAN'>
    """
    from .constants import NormalizationMethod, PlotMode, Theme

    result = {
        "normalization": NormalizationMethod[normalization.upper()],
        "theme": Theme[theme.upper()],
    }

    if mode is not None:
        result["mode"] = PlotMode[mode.upper()]

    return result