"""
pycmplot.io
===========
Functions for loading, validating, and pre-processing GWAS summary statistics
files. Handles delimiter auto-detection (whitespace, tab, comma), gzip
decompression, and resolution of column-name variants to the canonical set
used throughout the package.
The primary entry point for the plotting pipeline is
:func:`get_sumstats_and_merged_sector_list`, which loads all tracks, runs
coordinate liftover when needed, extracts lead SNPs, generates the hits
summary table, and computes the merged Circos sector-size dictionary — all
in a single call.
Notes
-----
This module is called automatically by the command-line entry point and by
:func:`pycmplot._core.main`; most users will not need to import it directly.
It is documented here for users who wish to load and pre-process summary
statistics programmatically before passing them to the plotting functions.
"""
from __future__ import annotations
import csv
import gzip
import sys
import re
import logging
from collections import defaultdict
from pathlib import Path
from typing import Optional
import natsort
import numpy as np
import pandas as pd
from pycmplot.constants import CHROM_ORDER
from pycmplot.stats import get_lead_snps, get_highlight_snps
from pycmplot.annotation import get_hits_summary_table
from pycmplot.resources import ResourceConfig, default_resources
logging.getLogger("matplotlib.font_manager").setLevel(logging.ERROR)
logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s", stream=sys.stdout)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# File utilities
# ---------------------------------------------------------------------------
[docs]
def smart_open(file_path: str):
"""Open a plain-text or gzip-compressed file transparently.
Detects gzip compression from the ``.gz`` file suffix; all other paths are
opened as plain text.
Parameters
----------
file_path : str or pathlib.Path
Path to the file to open.
Returns
-------
io.TextIOWrapper or gzip.GzipFile
An open, readable text-mode file object. Must be used as a context
manager (``with smart_open(...) as f: ...``).
Examples
--------
>>> from pycmplot.io import smart_open
>>> with smart_open("HbF.tsv.gz") as f:
... header = f.readline()
"""
path = Path(file_path)
if path.suffix == ".gz":
return gzip.open(file_path, "rt")
return open(file_path, "r")
[docs]
def resolve_delimiter(delim: str) -> str:
"""Map a human-readable delimiter name to its single-character representation.
Parameters
----------
delim : str
A delimiter name — one of ``'space'``, ``'tab'``, ``'comma'``,
``'colon'``, ``'semi-colon'``, ``'semicolon'`` — or a single bare
character (e.g. ``'|'``). Matching is case-insensitive.
Returns
-------
str
The corresponding single-character separator string.
Raises
------
TypeError
If *delim* is not a string.
ValueError
If *delim* is neither a recognised name nor a single character.
Examples
--------
>>> from pycmplot.io import resolve_delimiter
>>> resolve_delimiter("tab")
'\\t'
>>> resolve_delimiter(",")
','
"""
if not isinstance(delim, str):
raise TypeError("Delimiter must be a string.")
mapping = {
"space": " ",
"tab": "\t",
"comma": ",",
"colon": ":",
"semi-colon": ";",
"semicolon": ";",
}
key = delim.strip().lower()
if key in mapping:
return mapping[key]
if len(key) == 1:
return key # allow bare characters like '\t'
raise ValueError(
f"Invalid delimiter '{delim}'. "
"Choose from: space, tab, comma, colon, semi-colon."
)
[docs]
def detect_delimiter(file_path: str, sample_size: int = 5_000):
"""Infer the field delimiter of a summary statistics file automatically.
Reads the first *sample_size* bytes of *file_path* and passes the content
to :class:`csv.Sniffer`. Falls back to a character-frequency heuristic
(testing ``','``, ``'\\t'``, ``' '``, ``';'``, ``'|'``) if
:class:`csv.Sniffer` raises :class:`csv.Error`.
Parameters
----------
file_path : str or pathlib.Path
Path to the summary statistics file. Gzip-compressed files (``.gz``)
are supported transparently via :func:`smart_open`.
sample_size : int, optional
Number of bytes to read for delimiter detection. Default is ``5000``.
Returns
-------
delimiter : str
The inferred single-character field separator (e.g. ``'\\t'``,
``','``, ``' '``).
dialect : csv.Dialect or None
The :class:`csv.Dialect` object returned by :class:`csv.Sniffer`, or
``None`` when the fallback heuristic was used.
Examples
--------
>>> from pycmplot.io import detect_delimiter
>>> delim, dialect = detect_delimiter("HbF.tsv.gz")
>>> delim
'\\t'
"""
with smart_open(file_path) as f:
sample = f.read(sample_size)
try:
dialect = csv.Sniffer().sniff(sample)
return dialect.delimiter, dialect
except csv.Error:
return _fallback_delimiter(sample), None
def _fallback_delimiter(sample: str) -> str:
candidates = [",", "\t", " ", ";", "|"]
counts = {d: sample.count(d) for d in candidates}
best = max(counts, key=counts.get)
if counts[best] == 0:
raise ValueError("Unable to detect delimiter automatically.")
return best
# ------------------------------------------------------------------
# Random string for output paths
# ------------------------------------------------------------------
[docs]
def generate_random_string(length):
"""Generate a random alphanumeric string.
Used internally to create a unique output file-name component when no
``--plot_title`` is provided.
Parameters
----------
length : int
Number of characters in the output string.
Returns
-------
str
Random string drawn from ASCII letters (upper- and lower-case) and
digits (``[A-Za-z0-9]``).
Examples
--------
>>> from pycmplot.io import generate_random_string
>>> s = generate_random_string(10)
>>> len(s)
10
"""
import random
import string
# Combine uppercase, lowercase, and digits
characters = string.ascii_letters + string.digits
# random.choices picks multiple characters with replacement
return ''.join(random.choices(characters, k=length))
# ------------------------------------------------------------------
# Output paths
# ------------------------------------------------------------------
[docs]
def get_output_paths(
labels,
mode: Optional[str] = 'lm',
logp: bool = False,
output_dir: Optional[str] = None,
plot_title: Optional[str] = None,
output_format: Optional[str] = 'png'
):
"""Construct output file paths for the plot image and locus summary table.
Creates *output_dir* (including any missing parent directories) and derives
deterministic, human-readable file names from the plot title, track labels,
plot mode, and y-axis scale.
Parameters
----------
labels : list of str
Track labels joined with underscores in the output file name.
mode : {'lm', 'cm'}, optional
Plot mode: ``'lm'`` for linear Manhattan, ``'cm'`` for circular.
Default is ``'lm'``.
logp : bool, optional
When ``True`` the string ``'_logp'`` is appended to the base name;
otherwise ``'_pval'`` is appended. Default is ``False``.
output_dir : str or pathlib.Path, optional
Directory in which output files will be written. Created with
``mkdir(parents=True, exist_ok=True)`` if it does not already exist.
Default is ``'.'``.
plot_title : str, optional
Human-readable plot title. Non-alphanumeric characters are stripped and
spaces replaced with underscores for safe use in file names. When
``None`` a 10-character random alphanumeric string is used instead.
output_format : str, optional
Image file extension without the leading dot (e.g. ``'png'``, ``'pdf'``,
``'svg'``). Default is ``'png'``.
Returns
-------
plt_name : str
Absolute path to the output plot image file.
table_out : str
Absolute path to the output locus summary table TSV file.
plt_base : str
Absolute path base (no extension) used to derive the QQ-plot output
stems.
Examples
--------
>>> from pycmplot.io import get_output_paths
>>> plt_name, table_out, plt_base = get_output_paths(
... labels=["HbF", "MCV"],
... mode="lm",
... logp=True,
... output_dir="./results",
... plot_title="RBC Traits",
... )
>>> plt_name
'.../results/RBC_Traits_HbF_MCV_lm_logp.png'
"""
if output_dir is None:
output_dir = '.'
out_path = Path(output_dir).resolve()
out_path.mkdir(parents=True, exist_ok=True)
if plot_title:
#pltitle = re.sub(r"[^a-zA-Z0-9\s]", "", plot_title).replace(" ", "_")
pltitle = [ re.sub(r"[^a-zA-Z0-9\s]", "", p) for p in plot_title.split('_') ]
pltitle = '_'.join(pltitle).replace(" ", "_")
else:
pltitle = generate_random_string(10)
#labels = [re.sub(r"[^a-zA-Z0-9\s]", "", x).replace(" ", "_") for x in labels]
labels = [ x.replace(" ", "_") for x in labels ]
suffix = "_logp" if logp else "_pval"
plt_base = str(out_path / f"{pltitle}_{'_'.join(labels)}_{mode.lower()}{suffix}")
plt_name = f"{plt_base}.{output_format.lower()}"
table_out = f"{plt_base}_locus_summary_table.tsv"
return plt_name, table_out, plt_base
# ---------------------------------------------------------------------------
# input formatter
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Sector-size helpers
# ---------------------------------------------------------------------------
def _merge_min_max_lists(dicts: list[dict]) -> dict:
"""Merge per-chromosome [min, max] lists across multiple sumstats."""
temp: dict = defaultdict(list)
for d in dicts:
for key, values in d.items():
temp[key].extend(values)
return {k: [min(v), max(v)] for k, v in temp.items()}
# ---------------------------------------------------------------------------
# Memory usage
# ---------------------------------------------------------------------------
def _get_memory_usage(mem_df: int):
if mem_df > 1e6:
df_mem = mem_df / 1e9
unit = 'GB'
else:
df_mem = mem_df / 1e6
unit = 'MB'
if df_mem >= 0.001 and df_mem < 1:
df_mem = df_mem * 1000
unit = 'MB'
if df_mem < 0.001:
df_mem = df_mem * 1000
unit = 'KB'
return f"{df_mem:.3g} {unit}"
# ---------------------------------------------------------------------------
# Density-aware "auto" thinning for Manhattan / circular plotting
# ---------------------------------------------------------------------------
[docs]
def auto_thin_for_manhattan(
df: "pd.DataFrame",
keep_threshold: float = 2.0,
max_below: int = 200_000,
logp: bool = True,
logp_col: str = "logP",
p_col: str = "P",
seed: int = 42,
) -> "pd.DataFrame":
"""Density-aware sub-sampling for Manhattan-style scatter plots.
Inspired by ``gwaslab``'s default behaviour, this helper preserves *every*
variant whose "interestingness" signal is at or above ``keep_threshold``
(so peaks, suggestive hits, genome-wide-significant hits, and extreme
selection-scan values are kept verbatim) and uniformly sub-samples the
dense bulk below the threshold down to at most ``max_below`` rows in
total. For a 10 M-variant scan with the defaults below, this typically
cuts the plotted point count from 10 M to ~200 K + a few hundred
peaks — visually indistinguishable above the suggestive band, but two
orders of magnitude faster to render.
Two modes, switched by *logp*:
* **P-value mode** (*logp=True*, the default). ``signal = -log10(P)``.
``keep_threshold`` is in ``-log10(P)`` units (default ``2.0``,
i.e. ``P <= 0.01``). Variants with ``-log10(P) >= keep_threshold``
are all retained.
* **Raw-statistic mode** (*logp=False*). ``signal = |value|`` of
*p_col* — the column carrying the test statistic. This is the
right mode for non-p-value scans such as iHS, XP-EHH, F_ST,
Fay & Wu's H, Tajima's D, etc., where "interesting" means large
magnitude (positive or negative). ``keep_threshold`` is then in
the units of the underlying statistic (default still ``2.0``,
which is a sensible cutoff for standardised selection scans;
override with e.g. ``0.05`` for F_ST).
Parameters
----------
df : pandas.DataFrame
Input DataFrame. In p-value mode, must contain either *logp_col*
(preferred) or *p_col*. In raw-statistic mode, must contain
*p_col*. When the relevant column is absent, *df* is returned
unchanged.
keep_threshold : float, optional
Threshold above which all variants are retained. Interpreted in
``-log10(P)`` units when *logp=True* (default ``2.0``), or in the
natural units of the underlying statistic when *logp=False*.
max_below : int, optional
Maximum number of below-threshold rows to retain, sampled
uniformly at random. Default ``200_000``.
logp : bool, optional
When ``True`` (default), interpret the data as p-values and use
``-log10(P)`` as the signal. When ``False``, treat *p_col* as a
raw statistic and use ``|value|`` as the signal.
logp_col : str, optional
Name of the precomputed ``-log10(P)`` column for p-value mode.
Default ``'logP'``.
p_col : str, optional
Name of the raw p-value column (p-value mode) or test-statistic
column (raw-statistic mode). Default ``'P'``.
seed : int, optional
Seed for the RNG used to sub-sample the bulk. Default ``42``.
Returns
-------
pandas.DataFrame
Sub-sampled view of *df* preserving the original index ordering.
When the below-threshold count is already <= *max_below*, the
input is returned unchanged.
Examples
--------
GWAS p-values (default):
>>> thinned = auto_thin_for_manhattan(df, keep_threshold=2.0)
iHS / XP-EHH (signed selection statistics, ``|value|`` >= 2):
>>> thinned = auto_thin_for_manhattan(
... df, logp=False, keep_threshold=2.0, p_col="iHS",
... )
F_ST (unsigned, 0–1, outlier cutoff e.g. 0.05):
>>> thinned = auto_thin_for_manhattan(
... df, logp=False, keep_threshold=0.05, p_col="FST",
... )
"""
if df is None or len(df.index) == 0:
return df
if logp:
# p-value mode: use precomputed logP if present, else derive it.
if logp_col in df.columns:
signal = df[logp_col].to_numpy()
elif p_col in df.columns:
with np.errstate(divide="ignore", invalid="ignore"):
signal = -np.log10(df[p_col].to_numpy())
else:
return df
else:
# Raw-statistic mode: |value| of the test-statistic column. Works
# for signed statistics (iHS, XP-EHH, Fay & Wu's H, Tajima's D) as
# well as unsigned ones (F_ST).
if p_col not in df.columns:
return df
signal = np.abs(df[p_col].to_numpy(dtype=float))
above = signal >= keep_threshold
below_idx = np.flatnonzero(~above & np.isfinite(signal))
if below_idx.size <= max_below:
return df
rng = np.random.default_rng(seed)
keep_below = rng.choice(below_idx, size=max_below, replace=False)
keep_mask = above.copy()
keep_mask[keep_below] = True
# Preserve the input DataFrame's positional ordering so chromosomes
# remain sorted as the caller left them.
return df.iloc[np.flatnonzero(keep_mask)].copy()
# ---------------------------------------------------------------------------
# Main loader
# ---------------------------------------------------------------------------
[docs]
def get_sumstats_and_merged_sector_list(
sum_stats: list[str],
labels: list[str],
logp: bool = False,
trim_pval: Optional[float] = None,
file_info: Optional[dict] = None,
sort_tracks: Optional[str] = None,
table_out: Optional[str] = None,
signif_threshold: Optional[float] = None,
signif_line: Optional[float] = None,
suggest_threshold: Optional[float] = None,
highlight: Optional[bool] = False,
highlight_thresh: Optional[float] = 5e-08,
resources: Optional[ResourceConfig] = None,
compute_pvals: bool = True,
auto_thin: bool = True,
auto_thin_threshold: float = 2.0,
auto_thin_max_below: int = 200_000,
):
"""Load summary statistics, run liftover, extract lead SNPs, and compute merged Circos sector sizes.
This is the primary data-loading function for the plotting pipeline.
For each track it reads the file using the column mapping from
``file_info``, optionally filters by ``trim_pval``, normalises
chromosome names (``chr`` prefix stripped; ``23`` to ``X``, ``24`` to
``Y``, ``M`` / ``MTDNA`` to ``MT``), lifts over hg19 coordinates when a
build column is present, and extracts lead SNPs. After all tracks are
loaded it builds the hits summary table, derives significance
thresholds, optionally sorts tracks, and computes the merged
sector-size dict consumed by both plotters.
Parameters
----------
sum_stats : list of str
Paths to summary statistics files (gzip supported).
labels : list of str
Track labels in the same order as *sum_stats*.
logp : bool, optional
If ``True``, a ``logP`` column (–log₁₀(P)) is added to every loaded
DataFrame and used for lead-SNP ranking and threshold-line computation.
Default is ``False``.
trim_pval : float, optional
Drop variants with ``P > trim_pval`` before any further processing.
Strongly recommended for large files (e.g. ``0.01``). Default is
``None`` (no trimming; variants with ``P > 1`` are still removed).
file_info : dict, optional
Column-resolution mapping as returned by
:func:`prep_pycmplot_input_info`. Must be supplied for data to be
loaded.
sort_tracks : {'label', 'chrom_len', None}, optional
Track ordering after loading. ``'label'`` sorts alphabetically;
``'chrom_len'`` sorts by the number of distinct chromosomes (most
chromosomes first). ``None`` preserves input order.
Default is ``'chrom_len'``.
table_out : str, optional
File path at which to write the locus summary table TSV. Passed
through to :func:`~pycmplot.annotation.get_hits_summary_table`.
signif_threshold : float, optional
Genome-wide significance threshold for lead-SNP extraction and the
significance line. When ``None``, computed as
``max(0.05 / N, 5e-8)`` where *N* is the variant count in the last
loaded track; falls back to ``5e-8`` when *trim_pval* is set.
signif_line : float, optional
Explicit significance-line value drawn on the plot. When ``None``,
*signif_threshold* is used. If *logp* is ``True`` and the value is
< 1, it is converted to –log₁₀ scale automatically.
suggest_threshold : float, optional
Suggestive significance threshold for a second dashed line. Defaults
to ``1e-5``.
resources : ResourceConfig, optional
:class:`~pycmplot.resources.ResourceConfig` instance supplying paths to
the liftover chain file and gene-info reference files. Falls back to
:data:`~pycmplot.resources.default_resources`.
Returns
-------
dict
A dictionary with the following keys:
* ``'sectors'`` — ``dict`` mapping ``chromosome → [min_pos, max_pos]``
across all tracks, in natural chromosome order (``'1'``, ``'2'``,
…, ``'X'``, ``'Y'``), with a ``'Spacer1'`` entry appended for
y-axis labelling.
* ``'dfs'`` — ``dict`` mapping ``label → [DataFrame, n_chroms]``.
Each DataFrame contains canonical columns ``CHR``, ``POS``,
``SNP``, ``P``, ``LABEL`` and optionally ``logP``, ``BUILD``,
``OLD_POS``, ``OLD_BUILD`` (when a build column and liftover
were applied).
* ``'annot'`` — :class:`pandas.DataFrame` containing the clumped
locus summary with nearest-gene annotations. Empty when no
variants pass the significance threshold.
* ``'lines'`` — ``list`` of ``{'genome': float, 'suggestive': float}``
dicts, one per track, in the final sorted order.
* ``'pvals'`` — ``dict`` mapping ``label → numpy.ndarray`` of raw
(un-trimmed) p-values for QQ plotting.
See Also
--------
prep_pycmplot_input_info :
Resolves column names and delimiters; its output is passed as
*file_info*.
pycmplot.annotation.get_hits_summary_table :
Gene annotation and distance-based clumping of the locus table.
pycmplot.liftover.liftover_position :
hg19 → hg38 coordinate conversion applied row-wise.
Examples
--------
>>> from pycmplot.io import prep_pycmplot_input_info
>>> from pycmplot.io import get_sumstats_and_merged_sector_list
>>> files = ["HbF.tsv.gz", "MCV.txt.gz"]
>>> labels = ["HbF", "MCV"]
>>> file_info = prep_pycmplot_input_info(files, labels)
>>> result = get_sumstats_and_merged_sector_list(
... sum_stats=files,
... labels=labels,
... logp=True,
... trim_pval=0.01,
... file_info=file_info,
... signif_threshold=5e-8,
... )
>>> sorted(result.keys())
['annot', 'dfs', 'lines', 'pvals', 'sectors']
>>> list(result["sectors"].keys())[:4]
['1', '2', '3', '4']
"""
if resources is None:
resources = default_resources
from pycmplot.liftover import liftover_position
# Build a label → file path mapping
sumstats: dict[str, list] = {
name: [path] for name, path in zip(labels, sum_stats)
}
sumstats_loaded: dict[str, list] = {}
pval_dict: dict[str, np.ndarray | pd.Series] = {}
all_lead_snps: list[pd.DataFrame] = []
for label in sumstats.keys() & (file_info or {}).keys():
sumstat_cols = file_info[label][0]
sumstat_dtypes = file_info[label][1]
sumstat_newcols= file_info[label][2]
sep = file_info[label][3]
build = None
try:
build = file_info[label][4]
except Exception:
pass
logger.info("Loading %s [%s] ...", label, sumstats[label][0])
# Prefer the ``pyarrow`` CSV engine when available — it's typically
# 1.5–3× faster than the default C engine on large GWAS summary
# statistics, especially for files with many numeric columns. The
# ``pyarrow`` engine ignores the ``dtype=`` argument for category
# casts, so we cast the chromosome column to ``Categorical`` after
# the read. Any pyarrow-side failure (missing package, unsupported
# option, etc.) falls back to the default C engine.
read_kwargs = dict(
filepath_or_buffer=sumstats[label][0],
sep=sep,
header=0,
usecols=sumstat_cols,
)
try:
df = pd.read_csv(
**read_kwargs,
engine="pyarrow",
dtype_backend="numpy_nullable",
).rename(columns=sumstat_newcols)
except (ImportError, ValueError, TypeError):
df = pd.read_csv(
**read_kwargs,
dtype=sumstat_dtypes,
).rename(columns=sumstat_newcols)
# Coerce POS to numeric, drop rows that fail to parse, then store
# as plain int64 (not nullable ``Int64``) so downstream arithmetic
# / ``max()`` / categorical groupby reductions cannot leak
# ``pd.NA`` into mixed-type expressions.
df["POS"] = pd.to_numeric(df["POS"], errors="coerce")
df = df.dropna(subset=["POS"]).copy()
df["POS"] = df["POS"].astype("int64")
pre_trim_mem = _get_memory_usage(df.memory_usage(deep=True).sum())
pre_trim_vars = len(df.index)
logger.info("Loaded %s variants from summary stat file, using %s of memory", pre_trim_vars, pre_trim_mem)
# Get dict of p-values for QQ-plotting before applying trim_pval.
# Computing this is only meaningful when a QQ plot will actually be
# rendered downstream; for Manhattan-only or circular-only runs we
# skip the ~80 MB copy at 10 M variants entirely.
if compute_pvals:
logger.info("Extracting raw p-values for QQ-plotting ...")
pval_dict[label] = df["P"].dropna().astype(float).values
else:
pval_dict[label] = None
# Density-aware auto-thinning for Manhattan / circular rendering.
# Applied after lead-SNP extraction so the leads come from the full
# dataset, and after liftover so coordinates are final. Variants
# at or above ``auto_thin_threshold`` (default ``-log10(P) >= 2``)
# are kept verbatim, so all suggestive / significant hits and their
# surrounding LD bumps survive untouched — only the dense null
# background is sub-sampled. Skipped automatically when the
# below-threshold count is already small.
if auto_thin:
n_before = len(df.index)
df = auto_thin_for_manhattan(
df,
keep_threshold=auto_thin_threshold,
max_below=auto_thin_max_below,
logp=logp,
)
n_after = len(df.index)
if n_after < n_before:
signal_desc = (
"-log10(P)" if logp else "|value| of test statistic"
)
logger.info(
"Auto-thinning: %s -> %s variants (kept all %s >= %s; "
"down-sampled below-threshold background to <=%s).",
n_before, n_after, signal_desc,
auto_thin_threshold, auto_thin_max_below,
)
# Add build column if not exist and build supplied
if build:
df['BUILD'] = build
df['BUILD'] = df['BUILD'].astype('category')
# Trim insignificant variants for faster plotting
if trim_pval:
logger.info("Excluding variants with p-value less than %s to speed up Manhattan plotting ...", trim_pval)
df = df[df["P"].astype(float) <= float(trim_pval)]
post_trim_mem = _get_memory_usage(df.memory_usage(deep=True).sum())
post_trim_vars = len(df.index)
logger.info("%s variants remain after trimming, using %s of memory", post_trim_vars, post_trim_mem)
# this breaks with statistics that have both negative and positive values
# such as iHS
#else:
# df = df[df["P"].astype(float) <= 1]
if logp:
logger.info("Adding a 'logP' column ...")
df["logP"] = -np.log10(df["P"])
df["LABEL"] = label
# Normalise chromosome names — done once here and stored as a
# ``Categorical`` with ``CHROM_ORDER`` as the canonical category
# set. Downstream plotting code can recognise this dtype and skip
# repeating the (string-heavy) normalisation, and any aliasing /
# filtering on chromosome name becomes integer-code work rather
# than per-element Python string ops.
#
# Critically, when CHR comes in as a ``Categorical`` (the dtype we
# request in ``prep_pycmplot_input_info`` for any non-build file)
# the actual normalisation is applied to the **categories**, not
# to the underlying N-row code array. That turns a 500K (or 10M)
# per-element ``str.replace + str.upper + replace`` chain into the
# equivalent work on ~25 distinct chromosome labels.
logger.info('Normalizing chromosome names {"23": "X", "24": "Y", "M": "MT", "MTDNA": "MT"} ...')
chr_col_data = df["CHR"]
alias = {"23": "X", "24": "Y", "M": "MT", "MTDNA": "MT"}
if isinstance(chr_col_data.dtype, pd.CategoricalDtype):
# Fast path: rewrite categories, then re-cast to the canonical
# CHROM_ORDER ordered Categorical.
cats = pd.Series(chr_col_data.cat.categories.astype(str))
new_cats = (
cats.str.replace("chr", "", regex=False)
.str.upper()
.replace(alias)
.tolist()
)
chr_col_data = chr_col_data.cat.rename_categories(new_cats)
else:
# Defensive slow path for callers that bypass our dtype hints.
chr_col_data = (
chr_col_data
.astype(str)
.str.replace("chr", "", regex=False)
.str.upper()
.replace(alias)
)
df["CHR"] = pd.Categorical(
chr_col_data, categories=list(CHROM_ORDER), ordered=True
)
# Drop rows whose chromosome label is not in CHROM_ORDER (they
# become NaN under the categorical cast).
before = len(df.index)
df = df[df["CHR"].notna()].copy()
dropped = before - len(df.index)
if dropped:
logger.warning(
"Dropped %s row(s) with chromosome label outside CHROM_ORDER",
dropped,
)
# Liftover hg18/hg19 data if needed.
#
# ``sumstats_loaded[label]`` is not populated until the very end of
# this iteration (line ``sumstats_loaded[label] = [df, n_chroms]``),
# so the result must be assigned to the *local* ``df`` — writing
# into ``sumstats_loaded[label][0]`` here raised ``KeyError`` (e.g.
# ``KeyError: 'MCV'``) the first time the liftover branch fired on
# a given track.
if "BUILD" in df.columns and (
"hg19" in df["BUILD"].unique() or "hg18" in df["BUILD"].unique()
):
builds_present = sorted(
b for b in df["BUILD"].unique() if b in {"hg18", "hg19"}
)
logger.info(
"Converting %s coordinates to hg38 ...", "/".join(builds_present)
)
df = liftover_position(df, resources=resources)
# get highlight SNPs
if highlight:
logger.info("Extracting lead variants and variants to highlight ...")
else:
logger.info("Extracting lead variants ...")
df, leads = get_highlight_snps(
df=df,
window=500_000,
highlight=highlight,
highlight_thresh=highlight_thresh,
logp=logp,
)
## Lead SNPs
#logger.info("Extracting lead variants ...")
#leads = get_lead_snps(
# df=sumstats_loaded[label][0],
# signif_threshold=signif_threshold or 5e-8,
# logp=logp,
#)
all_lead_snps.append(leads)
# Number of distinct chromosomes (for track sorting)
n_chroms = len(df["CHR"].unique()) - 1
sumstats_loaded[label] = [df, n_chroms]
# Combine lead SNPs and filter to significance threshold
all_lead_snps_df = (
pd.concat(all_lead_snps, ignore_index=True).drop_duplicates()
if all_lead_snps
else pd.DataFrame()
)
if not all_lead_snps_df.empty and signif_threshold:
all_lead_snps_df = all_lead_snps_df[
all_lead_snps_df["P"] <= signif_threshold
]
hits_table = (
get_hits_summary_table(
leads_df=all_lead_snps_df,
table_out=table_out,
window_kb=2_000,
resources=resources,
)
if not all_lead_snps_df.empty
else pd.DataFrame()
)
# Derive significance/suggestive thresholds
if not signif_threshold:
if trim_pval:
signif_threshold = 5e-8
elif sumstats_loaded:
last_label = list(sumstats_loaded)[-1]
n = len(sumstats_loaded[last_label][0]["P"])
signif_threshold = max(0.05 / n, 5e-8)
else:
signif_threshold = 5e-8
if not suggest_threshold:
suggest_threshold = 1e-5
suggest_line = suggest_threshold
if logp:
suggest_line = -np.log10(suggest_threshold)
if signif_line is None:
signif_line = signif_threshold
if logp:
signif_line = -np.log10(signif_threshold)
else:
if logp and signif_line < 1:
signif_line = -np.log10(signif_line)
signif_lines = [
{"genome": signif_line, "suggestive": suggest_line}
for _ in sumstats
]
# sort dicts by user-supplied order
sumstats_loaded = {key: sumstats_loaded[key] for key in labels if key in sumstats_loaded}
pval_dict = {key: pval_dict[key] for key in labels if key in pval_dict}
# or sort by user option
if sort_tracks is not None:
if sort_tracks.lower() == "label":
sumstats_loaded = dict(sorted(sumstats_loaded.items()))
else: # chrom_len: most chromosomes first (descending n_chroms)
sumstats_loaded = dict(
sorted(
sumstats_loaded.items(),
key=lambda item: -int(item[1][1]),
)
)
# Compute per-sumstat sector sizes (chrom → [min_pos, max_pos])
assoc_sector_sizes_list: list[dict] = []
min_dic_val = None
logger.info("Computing per-sumstat sector sizes (chrom → [min_pos, max_pos])")
for df, _n in sumstats_loaded.values():
assoc = df[~(df["CHR"].str.len() > 2)].copy()
assoc["POS"] = assoc["POS"].fillna(0).astype(int)
assoc_dic: dict[str, list] = {}
for chrom in assoc["CHR"].unique():
sub = assoc[assoc["CHR"] == chrom]
lo_val = max(sub["POS"].min() - 1_000_000, 0)
hi_val = sub["POS"].max()
# Ensure sector sizes are within chrom ranges if liftover
#chrom_max = hi_val
#if liftover:
# hg38_chr_lengths = {k.replace("chr",""): v for k, v in hg38_chr_lengths.items()}
# chrom_max = hg38_chr_lengths[chrom]
#hi_val = min(hi_val, chrom_max)
assoc_dic[str(chrom)] = [lo_val, hi_val]
min_dic_val = min(assoc_dic.values())
assoc_sector_sizes_list.append(assoc_dic)
merged = _merge_min_max_lists(assoc_sector_sizes_list)
merged = dict(natsort.natsorted(merged.items(), key=lambda item: item[0]))
if "23" in merged:
merged["X"] = merged.pop("23")
# Add spacer sector for y-axis labelling
if min_dic_val is not None:
#if len(labels) <= 5:
# merged["Spacer1"] = [x + x / 2 for x in min_dic_val]
#else:
merged["Spacer1"] = [x * 2 for x in min_dic_val]
return {"sectors": merged, "dfs": sumstats_loaded, "annot": hits_table, "lines": signif_lines, "pvals": pval_dict}