Source code for matchms.similarity.FlashSimilarity

import logging
import multiprocessing as mp
import platform
from collections.abc import Sequence
import numpy as np
from numba import njit
from tqdm import tqdm
from matchms.Scores import Scores
from matchms.typing import SpectrumType
from .BaseSimilarity import BaseSimilarity
from .flash_utils import _build_library_index, _clean_and_weight


logger = logging.getLogger("matchms")


class _BaseFlashSimilarity(BaseSimilarity):
    """Shared base class for Flash-based similarities."""

    is_commutative = True

    def __init__(
        self,
        matching_mode: str = "fragment",           # 'fragment' | 'neutral_loss' | 'hybrid'
        tolerance: float = 0.02,
        use_ppm: bool = False,
        intensity_power: float = 1.0,
        remove_precursor: bool = False,
        precursor_window: float = 1.6,
        noise_cutoff: float = 0.01,
        normalize_to_half: bool = False,
        merge_within: float = 0,
        identity_precursor_tolerance: float | None = None,
        identity_use_ppm: bool = False,
        dtype: np.dtype = np.float64,
    ):
        if matching_mode not in ("fragment", "neutral_loss", "hybrid"):
            raise ValueError("matching_mode must be 'fragment', 'neutral_loss', or 'hybrid'")

        self.matching_mode = matching_mode
        self.tolerance = tolerance
        self.use_ppm = use_ppm
        self.intensity_power = intensity_power
        self.remove_precursor = remove_precursor
        self.precursor_window = precursor_window
        self.noise_cutoff = noise_cutoff
        self.normalize_to_half = normalize_to_half
        self.merge_within = merge_within
        self.identity_precursor_tolerance = identity_precursor_tolerance
        self.identity_use_ppm = identity_use_ppm
        self.dtype = np.dtype(dtype)

    @property
    def _weighing_type(self) -> str:
        raise NotImplementedError

    @property
    def _compute_l2(self) -> bool:
        raise NotImplementedError

    @property
    def _worker(self):
        raise NotImplementedError

    @property
    def _descriptor_name(self) -> str:
        raise NotImplementedError

    def _prepare(self, spectrum: SpectrumType) -> tuple[np.ndarray, float | None]:
        peaks = spectrum.peaks.to_numpy
        pmz = spectrum.metadata.get("precursor_mz", None)
        cleaned = _clean_and_weight(
            peaks,
            pmz,
            intensity_power=self.intensity_power,
            remove_precursor=self.remove_precursor,
            precursor_window=self.precursor_window,
            noise_cutoff=self.noise_cutoff,
            normalize_to_half=self.normalize_to_half,
            merge_within_da=self.merge_within,
            weighing_type=self._weighing_type,
            dtype=self.dtype,
        )
        return cleaned, (None if pmz is None else float(pmz))

    def _build_library(self, spectra_2: Sequence[SpectrumType]):
        lib_proc = []
        lib_pmz = []

        for spectrum_2 in spectra_2:
            peaks = spectrum_2.peaks.to_numpy
            pmz = spectrum_2.metadata.get("precursor_mz", None)
            cleaned = _clean_and_weight(
                peaks,
                pmz,
                intensity_power=self.intensity_power,
                remove_precursor=self.remove_precursor,
                precursor_window=self.precursor_window,
                noise_cutoff=self.noise_cutoff,
                normalize_to_half=self.normalize_to_half,
                merge_within_da=self.merge_within,
                weighing_type=self._weighing_type,
                dtype=self.dtype,
            )
            lib_proc.append(cleaned)
            lib_pmz.append(None if pmz is None else float(pmz))

        compute_nl = self.matching_mode in ("neutral_loss", "hybrid")

        return _build_library_index(
            lib_proc,
            lib_pmz,
            compute_neutral_loss=compute_nl,
            compute_l2_norm=self._compute_l2,
            dtype=self.dtype,
        )

    def _make_worker_cfg(self) -> dict:
        return {
            "tol": float(self.tolerance),
            "use_ppm": bool(self.use_ppm),
            "matching_mode": self.matching_mode,
            "compute_nl": (self.matching_mode in ("neutral_loss", "hybrid")),
            "iden_tol": (
                None
                if self.identity_precursor_tolerance is None
                else float(self.identity_precursor_tolerance)
            ),
            "iden_use_ppm": bool(self.identity_use_ppm),
        }

    def _prepare_row_inputs(self, spectra_1: Sequence[SpectrumType]):
        row_inputs = []
        for i, spectrum_1 in enumerate(spectra_1):
            peaks_1, pmz_1 = self._prepare(spectrum_1)
            row_inputs.append((i, peaks_1, pmz_1))
        return row_inputs

    def _run_row_workers(
        self,
        row_inputs,
        lib,
        cfg,
        progress_bar: bool,
        n_jobs: int,
        descriptor: str,
    ):
        _set_globals(lib, cfg)
        worker = self._worker
        n_rows = len(row_inputs)

        results = []

        if n_jobs in (None, 1, 0):
            for item in tqdm(
                row_inputs,
                total=n_rows,
                desc=descriptor + " (matrix)",
                disable=not progress_bar,
            ):
                results.append(worker(item))
            return results

        # Windows fallback
        if platform.system() == "Windows":
            print(
                f"{self.__class__.__name__}.matrix: n_jobs != 1 is not yet implemented on Windows; "
                "falling back to n_jobs=1."
            )
            for item in tqdm(
                row_inputs,
                total=n_rows,
                desc=descriptor + " (matrix)",
                disable=not progress_bar,
            ):
                results.append(worker(item))
            return results

        n_cpus = mp.cpu_count()
        if n_jobs < 0:
            n_jobs = max(1, n_cpus + 1 + n_jobs)
        n_jobs = max(1, min(n_jobs, n_cpus))

        start_methods = mp.get_all_start_methods()
        use_fork = ("fork" in start_methods) and (platform.system() != "Windows")

        if not use_fork:
            print(
                f"{self.__class__.__name__}.matrix: parallel execution requires 'fork'; "
                "falling back to n_jobs=1."
            )
            for item in tqdm(
                row_inputs,
                total=n_rows,
                desc=descriptor + " (matrix)",
                disable=not progress_bar,
            ):
                results.append(worker(item))
            return results

        ctx = mp.get_context("fork")
        with ctx.Pool(processes=n_jobs) as pool:
            for result in tqdm(
                pool.imap(worker, row_inputs, chunksize=8),
                total=n_rows,
                desc=descriptor + f" (parallel x{n_jobs})",
                disable=not progress_bar,
            ):
                results.append(result)

        return results


[docs] class FlashEntropy(_BaseFlashSimilarity): """ Flash entropy similarity (Li & Fiehn, 2023) with a fast .matrix() that builds a library-wide index over 'queries' and streams all 'references' through it. Key options: - matching_mode: 'fragment', 'neutral_loss', or 'hybrid' (fragment-priority). - tolerance in Da or symmetric ppm (use_ppm=True). - cleanup: remove precursor & > (precursor_mz - 1.6), 1% noise removal, entropy weighting, normalize ∑I' = 0.5, optional within-peak merge. Notes: - .pair() works but is not the fast path. Use .matrix(). - For identity-search behavior, pass identity_precursor_tolerance (Da or ppm). Parameters ---------- matching_mode: Matching mode: 'fragment', 'neutral_loss', or 'hybrid' (default is 'fragment'). tolerance: Matching tolerance in Da or ppm (use_ppm=True). Default is 0.02. use_ppm: If True, interpret `tolerance` as parts-per-million. Default is False. remove_precursor: If True, remove precursor peak and peaks within precursor_window. Default is False. precursor_window: If remove_precursor is True, remove peaks within this window around the precursor m/z. Default is 1.6 Da (as suggested by Li & Fiehn(2023)). noise_cutoff: If > 0, remove peaks with intensities below this fraction of the maximum intensity. Default is 0.01 (1%). normalize_to_half: If True, normalize intensities such that the sum of intensities is 0.5. Default is True. merge_within: If > 0, merge peaks within this distance (in Da) to a single peak. Default is 0. identity_precursor_tolerance: If not None, enforce identity search behavior by requiring the precursor m/z of the query to be within this tolerance of the reference precursor m/z. identity_use_ppm: If True, interpret `identity_precursor_tolerance` as ppm. Default is False. dtype: Data type for the output scores. Default is np.float64 which properly accounts for highest resolution MS/MS data (even far beyond current MS/MS possibilties!). To save memory, np.float32 can be used instead, which is sufficient for peak resolutions up to about 8,000,000. """ is_commutative = True score_datatype = np.float32 score_fields = ("score",)
[docs] def __init__(self, *args, normalize_to_half: bool = True, **kwargs): super().__init__( *args, normalize_to_half=normalize_to_half, **kwargs, )
@property def _weighing_type(self) -> str: return "entropy" @property def _compute_l2(self) -> bool: return False @property def _worker(self): return _row_task_entropy @property def _descriptor_name(self) -> str: return f"Flash entropy ({self.matching_mode})"
[docs] def pair(self, spectrum_1: SpectrumType, spectrum_2: SpectrumType) -> np.ndarray: """ Compute Flash Entropy for a single (reference, query) pair. Uses the same preprocessing and scoring logic as the matrix path, but builds a tiny 1-spectrum library from the query. Careful: This is not the fast intended use; better .matrix() instead. """ logger.warning("This is not the fast intended use; better use .matrix() instead.") # Preprocess both spectra peaks_1, pmz_1 = self._prepare(spectrum_1) peaks_2, pmz_2 = self._prepare(spectrum_2) if peaks_1.size == 0 or peaks_2.size == 0: return np.asarray(0.0, dtype=self.dtype) compute_nl = self.matching_mode in ("neutral_loss", "hybrid") lib = _build_library_index( [peaks_2], [pmz_2], compute_neutral_loss=compute_nl, compute_l2_norm=False, dtype=self.dtype, ) _set_globals(lib, self._make_worker_cfg()) _, row = _row_task_entropy((0, peaks_1, pmz_1)) return np.asarray(row[0], dtype=self.dtype)
# FAST + PARALLEL score matrix computation
[docs] def matrix( self, spectra_1: Sequence[SpectrumType], spectra_2: Sequence[SpectrumType] | None = None, score_fields: Sequence[str] | None = None, progress_bar: bool = True, n_jobs: int = -1, ): """ Calculate matrix of Flash Entropy scores. Parameters ---------- spectra_1 First collection of input spectra. spectra_2 Second collection of input spectra. If None, compare `spectra_1` against itself. score_fields Requested score fields. Only ``("score",)`` is supported. progress_bar When True, show a progress bar. n_jobs Number of parallel jobs to run. Default is -1, which means that all available CPUs minus one will be used. Returns ------- Scores Dense score matrix as a ``Scores`` object. """ selected_fields = self._resolve_score_fields(score_fields) if selected_fields != ("score",): raise NotImplementedError( "FlashEntropy.matrix() supports only score_fields=('score',)." ) spectra_2, is_symmetric = self._prepare_inputs(spectra_1, spectra_2) n_rows = len(spectra_1) n_cols = len(spectra_2) if is_symmetric and n_rows != n_cols: raise ValueError("Self-comparison requires same number of rows and columns.") lib = self._build_library(spectra_2) row_inputs = self._prepare_row_inputs(spectra_1) results = self._run_row_workers( row_inputs=row_inputs, lib=lib, cfg=self._make_worker_cfg(), progress_bar=progress_bar, n_jobs=n_jobs, descriptor=self._descriptor_name, ) out_score = np.zeros((n_rows, n_cols), dtype=self.dtype) for row_idx, row_score in results: out_score[row_idx, :] = row_score return Scores({"score": out_score.astype(self.score_datatype, copy=False)})
[docs] class CosineFlash(_BaseFlashSimilarity): """ Flash Cosine similarity following the original Flash Entropy (Li & Fiehn, 2023) with a fast .matrix() that builds a library-wide index over 'queries' and streams all 'references' through it. This corresponds to the "CosineGreedy" scoring logic but with the same fast Flash path as Flash Entropy. Key options: - matching_mode: 'fragment', 'neutral_loss', or 'hybrid' (fragment-priority). - tolerance in Da or symmetric ppm (use_ppm=True). - cleanup: remove precursor & > (precursor_mz - 1.6), 1% noise removal, entropy weighting, normalize ∑I' = 0.5, optional within-peak merge. Notes: - .pair() works but is not the fast path. Use .matrix(). - For identity-search behavior, pass identity_precursor_tolerance (Da or ppm). Parameters ---------- matching_mode: Matching mode: 'fragment', 'neutral_loss', or 'hybrid' (default is 'fragment'). tolerance: Matching tolerance in Da or ppm (use_ppm=True). Default is 0.02. use_ppm: If True, interpret `tolerance` as parts-per-million. Default is False. intensity_power: The power to raise intensity to in the cosine function. The default is 1 (no weighting). remove_precursor: If True, remove precursor peak and peaks within precursor_window. Default is False. precursor_window: If remove_precursor is True, remove peaks within this window around the precursor m/z. Default is 1.6 Da (as suggested by Li & Fiehn(2023)). noise_cutoff: If > 0, remove peaks with intensities below this fraction of the maximum intensity. Default is 0.01 (1%). normalize_to_half: If True, normalize intensities such that the sum of intensities is 0.5. Default is False. merge_within: If > 0, merge peaks within this distance (in Da) to a single peak. Default is 0. identity_precursor_tolerance: If not None, enforce identity search behavior by requiring the precursor m/z of the query to be within this tolerance of the reference precursor m/z. identity_use_ppm: If True, interpret `identity_precursor_tolerance` as ppm. Default is False. dtype: Data type for the output scores. Default is np.float64 which properly accounts for highest resolution MS/MS data (even far beyond current MS/MS possibilties!). To save memory, np.float32 can be used instead, which is sufficient for peak resolutions up to about 8,000,000. """ score_fields = ("score", "matches")
[docs] def __init__(self, *args, dtype: np.dtype = np.float64, **kwargs): super().__init__(*args, dtype=dtype, **kwargs) self.score_datatype = np.dtype( [("score", self.dtype), ("matches", np.int32)] )
@property def _weighing_type(self) -> str: return "cosine" @property def _compute_l2(self) -> bool: return True @property def _worker(self): return _row_task_cosine @property def _descriptor_name(self) -> str: return f"Flash cosine ({self.matching_mode})"
[docs] def pair(self, spectrum_1: SpectrumType, spectrum_2: SpectrumType) -> np.ndarray: logger.warning("This is not the fast intended use; better use .matrix() instead.") peaks_1, pmz_1 = self._prepare(spectrum_1) peaks_2, pmz_2 = self._prepare(spectrum_2) if peaks_1.size == 0 or peaks_2.size == 0: return np.asarray((0.0, 0), dtype=self.score_datatype) compute_nl = self.matching_mode in ("neutral_loss", "hybrid") lib = _build_library_index( [peaks_2], [pmz_2], compute_neutral_loss=compute_nl, compute_l2_norm=True, dtype=self.dtype, ) _set_globals(lib, self._make_worker_cfg()) _, row_score, row_matches = _row_task_cosine((0, peaks_1, pmz_1)) return np.asarray((row_score[0], row_matches[0]), dtype=self.score_datatype)
[docs] def matrix( self, spectra_1: Sequence[SpectrumType], spectra_2: Sequence[SpectrumType] | None = None, score_fields: Sequence[str] | None = None, progress_bar: bool = True, n_jobs: int = -1, ): """ Calculate matrix of Flash Cosine scores. Parameters ---------- spectra_1 First collection of input spectra. spectra_2 Second collection of input spectra. If None, compare `spectra_1` against itself. score_fields Requested score fields. Only ``("score",)`` is supported. progress_bar When True, show a progress bar. n_jobs Number of parallel jobs to run. Default is -1, which means that all available CPUs minus one will be used. Returns ------- Scores Dense score matrix as a ``Scores`` object. """ selected_fields = self._resolve_score_fields(score_fields) spectra_2, is_symmetric = self._prepare_inputs(spectra_1, spectra_2) n_rows = len(spectra_1) n_cols = len(spectra_2) if is_symmetric and n_rows != n_cols: raise ValueError("Self-comparison requires same number of rows and columns.") lib = self._build_library(spectra_2) row_inputs = self._prepare_row_inputs(spectra_1) results = self._run_row_workers( row_inputs=row_inputs, lib=lib, cfg=self._make_worker_cfg(), progress_bar=progress_bar, n_jobs=n_jobs, descriptor=self._descriptor_name, ) out_score = np.zeros((n_rows, n_cols), dtype=self.dtype) out_matches = np.zeros((n_rows, n_cols), dtype=np.int32) for row_idx, row_score, row_matches in results: out_score[row_idx, :] = row_score out_matches[row_idx, :] = row_matches result = {} if "score" in selected_fields: result["score"] = out_score.astype(self.dtype, copy=False) if "matches" in selected_fields: result["matches"] = out_matches return Scores(result)
# ===================== worker plumbing ===================== # Globals visible to workers _G_LIB = None _G_CFG = None def _set_globals(lib_obj, cfg): """ Install the shared library index and constant config in module-level globals. Worker functions read from these globals to avoid re-pickling large arrays for every work item (especially efficient with fork on Unix). """ global _G_LIB, _G_CFG _G_LIB = lib_obj _G_CFG = cfg # ====================== Numba-accelerated helpers ==================== @njit(cache=True, nogil=True) def _search_spec_in_fragment_window(lib_mz: np.ndarray, mz: float, tol: float, use_ppm: bool): """ Find the range of indices in lib_mz that fall within the tolerance window of mz. Returns (a, b) such that lib_mz[a:b] are the candidates. """ mz_tolerance = _search_window_halfwidth_nb(mz, tol, use_ppm) lo_mz = mz - mz_tolerance hi_mz = mz + mz_tolerance a = np.searchsorted(lib_mz, lo_mz, side="left") b = np.searchsorted(lib_mz, hi_mz, side="right") return a, b @njit(cache=True, nogil=True) def _search_window_halfwidth_nb(m: float, tol: float, use_ppm: bool) -> float: """ Compute half-width of a symmetric search window around mass ``m``. If ``use_ppm`` is False: returns ``tol`` (Da half-width). If ``use_ppm`` is True: uses symmetric-ppm definition: |m2 - m1| <= tol[ppm] * 1e-6 * 0.5 * (m1 + m2) which corresponds to a half-window of approximately (tol * 1e-6 * mass), corrected for symmetry. Notes ----- This helper is only for search-window bounds. Candidate matches are still filtered by exact checks in :func:`_within_tol_nb`. """ if not use_ppm: return tol c = tol * 1e-6 denom = 1.0 - 0.5 * c return (c * m) / denom if denom > 0.0 else (c * m * 2.0) @njit(cache=True, nogil=True) def _xlog2_scalar_nb(x: float) -> float: """ Numerically stable x * log2(x) for scalar x with x <= 0 mapped to 0. """ if x <= 0.0: return 0.0 return x * np.log2(x) @njit(cache=True, nogil=True) def _within_tol_nb(m1: float, m2: float, tol: float, use_ppm: bool) -> bool: """ Numba-fied symmetric tolerance check. Symmetric ppm definition: |m2 - m1| <= tol[ppm] * 1e-6 * 0.5 * (m1 + m2) Otherwise absolute Da tolerance: |m2 - m1| <= tol This is the exact tolerance predicate used to trim search windows produced by :func:`_search_window_halfwidth_nb`. """ if use_ppm: return abs(m2 - m1) <= (tol * 1e-6) * 0.5 * (m1 + m2) return abs(m2 - m1) <= tol @njit(cache=True, nogil=True) def _gather_fragment_candidate_cols_numba( query_mz: np.ndarray, query_intensity: np.ndarray, lib_mz: np.ndarray, lib_spec_index: np.ndarray, tol: float, use_ppm: bool, n_cols: int, ) -> np.ndarray: """ Collect candidate library spectrum ids for fragment matching. Parameters ---------- query_mz, query_intensity : np.ndarray Reference spectrum peaks for one matrix row. ``query_mz`` must be sorted ascending. Non-positive intensities are ignored. lib_mz : np.ndarray Global library product m/z values sorted ascending. lib_spec_index : np.ndarray Spectrum id for each entry in ``lib_mz``. tol : float Fragment tolerance in Da or ppm. use_ppm : bool Whether ``tol`` is interpreted as ppm. n_cols : int Total number of library spectra. Returns ------- np.ndarray[int64] Unique candidate spectrum ids with at least one fragment inside tolerance. Order is first-seen scan order (not sorted). Notes ----- Candidate windows use ``_search_spec_in_fragment_window`` and are trimmed with exact ``_within_tol_nb`` checks. """ if n_cols == 0: return np.empty(0, dtype=np.int64) seen = np.zeros(n_cols, dtype=np.uint8) candidate_cols = np.empty(n_cols, dtype=np.int64) n_candidates = 0 for q_idx in range(query_mz.shape[0]): Iq = float(query_intensity[q_idx]) if Iq <= 0.0: continue mz_q = float(query_mz[q_idx]) a, b = _search_spec_in_fragment_window(lib_mz, mz_q, tol, use_ppm) if a >= b: continue for j in range(a, b): if not _within_tol_nb(mz_q, float(lib_mz[j]), tol, use_ppm): continue col = int(lib_spec_index[j]) if seen[col] == 0: seen[col] = 1 candidate_cols[n_candidates] = col n_candidates += 1 return candidate_cols[:n_candidates] @njit(cache=True, nogil=True) def _accumulate_fragment_row_pairwise_numba( scores: np.ndarray, query_mz: np.ndarray, query_intensity: np.ndarray, candidate_cols: np.ndarray, spec_offsets: np.ndarray, spec_mz: np.ndarray, spec_intensity: np.ndarray, tol: float, use_ppm: bool, ) -> None: """ One-to-one entropy accumulation against selected library spectra. This follows the same two-pointer matching style as ms_entropy pairwise: peaks are sorted by m/z and each peak can be used at most once. Parameters ---------- scores : np.ndarray Dense row buffer of length ``n_library_spectra``. Updated in place. query_mz, query_intensity : np.ndarray Reference spectrum peaks for this row. ``query_mz`` must be sorted. candidate_cols : np.ndarray[int64] Spectrum ids that should be scored for this row. spec_offsets : np.ndarray[int64] Prefix offsets into ``spec_mz/spec_int`` (one slice per spectrum). spec_mz, spec_intensity : np.ndarray Spectrum-major product arrays from the library index. tol : float Fragment tolerance in Da or ppm. use_ppm : bool Whether ``tol`` is interpreted as ppm. Notes ----- Non-positive intensities are skipped before attempting a match, so they do not consume one-to-one pointer advancement. """ for candidate_idx in range(candidate_cols.shape[0]): col = int(candidate_cols[candidate_idx]) a = 0 b = int(spec_offsets[col]) b_end = int(spec_offsets[col + 1]) if b >= b_end: continue score = 0.0 while a < query_mz.shape[0] and b < b_end: Iq = float(query_intensity[a]) if Iq <= 0.0: a += 1 continue Ilib = float(spec_intensity[b]) if Ilib <= 0.0: b += 1 continue mz_q = float(query_mz[a]) mz_lib = float(spec_mz[b]) diff = mz_q - mz_lib max_allowed_mass_difference = _search_window_halfwidth_nb(mz_q, tol, use_ppm) if diff < -max_allowed_mass_difference: a += 1 continue if diff > max_allowed_mass_difference: b += 1 continue # Keep exact tolerance trimming consistent with the rest of this module. if _within_tol_nb(mz_q, mz_lib, tol, use_ppm): score += _xlog2_scalar_nb(Ilib + Iq) - _xlog2_scalar_nb(Iq) - _xlog2_scalar_nb(Ilib) a += 1 b += 1 elif diff < 0.0: a += 1 else: b += 1 scores[col] = score @njit(cache=True, nogil=True) def _accumulate_fragment_row_numba( scores: np.ndarray, query_mz: np.ndarray, query_intensity: np.ndarray, lib_mz: np.ndarray, lib_intensity: np.ndarray, lib_spec_index: np.ndarray, tol: float, use_ppm: bool ) -> None: """ Accumulate fragment-based contributions for a single row (one reference spectrum). For each query peak (m/z, Iq), find all library peaks whose m/z lies within the symmetric-ppm/Da tolerance window around the query m/z. For each match, add the incremental entropy term to `scores[col]`, where `col` is the index of the library spectrum that the matched library peak belongs to. Parameters ---------- scores : float[dtype], shape (n_library_spectra,) Output buffer, accumulated in-place. query_mz, query_intensity : float[dtype] Peaks of the reference spectrum being compared (this row). lib_mz, lib_intensity : float[dtype] Global concatenated *query* (library) product peaks, sorted by m/z. lib_spec_index : int32 For each entry of lib_mz, the source spectrum index (column) in the library. tol : float Mass tolerance (Da or ppm). use_ppm : bool Whether to treat `tol` as ppm. """ n_q = query_mz.shape[0] for q_idx in range(n_q): mz_q = float(query_mz[q_idx]) Iq = float(query_intensity[q_idx]) if Iq <= 0.0: continue a, b = _search_spec_in_fragment_window(lib_mz, mz_q, tol, use_ppm) if a >= b: continue for j in range(a, b): mz_lib = float(lib_mz[j]) if use_ppm: if abs(mz_lib - mz_q) > (tol * 1e-6) * 0.5 * (mz_lib + mz_q): continue else: if abs(mz_lib - mz_q) > tol: continue Ilib = float(lib_intensity[j]) col = int(lib_spec_index[j]) incr = _xlog2_scalar_nb(Ilib + Iq) - _xlog2_scalar_nb(Iq) - _xlog2_scalar_nb(Ilib) scores[col] += incr @njit(cache=True, nogil=True) def _in_any_fragment_window(prod_idx: int, prod_min: np.ndarray, prod_max: np.ndarray) -> bool: # true if prod_idx falls into ANY [prod_min[k], prod_max[k]) interval # (arrays come precomputed; when prefer_fragments=False, caller passes size 0 arrays) for k in range(prod_min.shape[0]): if prod_idx >= prod_min[k] and prod_idx < prod_max[k]: return True return False @njit(cache=True, nogil=True) def _spec_in_fragment_window(cols_target: int, peaks_spec: np.ndarray, ap: int, bp: int) -> bool: # true if cols_target appears in peaks_spec[ap:bp] for t in range(ap, bp): if int(peaks_spec[t]) == cols_target: return True return False @njit(cache=True, nogil=True) def _accumulate_nl_row_numba(scores: np.ndarray, ref_mz: np.ndarray, ref_int: np.ndarray, ref_pmz_val: float, nl_mz: np.ndarray, nl_int: np.ndarray, nl_spec: np.ndarray, nl_prod_idx: np.ndarray, peaks_mz: np.ndarray, peaks_spec: np.ndarray, tol: float, use_ppm: bool, prefer_fragments: bool, prod_min: np.ndarray, prod_max: np.ndarray) -> None: """ Accumulate neutral-loss (NL) contributions for a single row (one reference spectrum). For each reference peak of m/z = m_ref with intensity Iq: - Compute the loss L = precursor_ref - m_ref. - Find all library neutral-loss peaks within tolerance of L using nl_mz (sorted). - For each candidate: * If hybrid mode (prefer_fragments=True), apply two pruning rules: RULE 1: Drop if the candidate's corresponding product-peak index lies inside ANY fragment window of the current reference spectrum. RULE 2: Drop if the current reference *fragment* also matches the same library spectrum (i.e., we prefer fragment matches over NL matches). * Add the entropy increment to scores[col]. Arrays ------ nl_* arrays are built from the library (query) spectra with known precursors. nl_product_pos maps each NL entry back to a position in the global product-peak arrays; this is used by hybrid rules. """ if not (nl_mz.size > 0 and ref_mz.size > 0): return if np.isnan(ref_pmz_val): return for i in range(ref_mz.shape[0]): Iq = float(ref_int[i]) if Iq <= 0.0: continue loss = float(ref_pmz_val) - float(ref_mz[i]) a, b = _search_spec_in_fragment_window(nl_mz, loss, tol, use_ppm) if a >= b: continue # (Hybrid) fragment window for this reference peak ap = 0 bp = 0 if prefer_fragments: mz1 = float(ref_mz[i]) ap, bp = _search_spec_in_fragment_window(peaks_mz, mz1, tol, use_ppm) for j in range(a, b): nl2 = float(nl_mz[j]) # symmetric ppm / Da check if use_ppm: if abs(nl2 - loss) > (tol * 1e-6) * 0.5 * (nl2 + loss): continue else: if abs(nl2 - loss) > tol: continue lib = float(nl_int[j]) col = int(nl_spec[j]) if prefer_fragments: # RULE 1: drop if product peak falls in ANY fragment window of the query spectrum if _in_any_fragment_window(int(nl_prod_idx[j]), prod_min, prod_max): continue # RULE 2 (per-peak): if this query peak also fragment-matches the same library spectrum, drop if ap < bp and _spec_in_fragment_window(col, peaks_spec, ap, bp): continue incr= _xlog2_scalar_nb(lib + Iq) - _xlog2_scalar_nb(Iq) - _xlog2_scalar_nb(lib) scores[col] += incr def _row_task_entropy(args): """ Compute one matrix row for a given reference spectrum. Parameters ---------- args : tuple (row_index, peaks_array, precursor_mz_or_None) Returns ------- (row_index, row_scores) `row_scores` is a dense vector of length n_library_spectra in the same float dtype as the index. This function is safe to call from a process pool since it only reads globals set by `_set_globals`. """ row_idx, ref_peaks, ref_pmz = args cfg = _G_CFG lib = _G_LIB dtype = lib.dtype scores = np.zeros(lib.n_specs, dtype=dtype) if cfg["matching_mode"] == "fragment": # Fragment mode uses a two-step algorithm: # 1) gather candidate columns from the global m/z-sorted index, # 2) run one-to-one pairwise scoring only for those spectra using the # spectrum-major index layout (spec_offsets/spec_mz/spec_int). candidate_cols = _gather_fragment_candidate_cols_numba( ref_peaks[:, 0], ref_peaks[:, 1], lib.peaks_mz, lib.peaks_spec_idx, float(cfg["tol"]), bool(cfg["use_ppm"]), lib.n_specs, ) _accumulate_fragment_row_pairwise_numba( scores, ref_peaks[:, 0], ref_peaks[:, 1], candidate_cols, lib.spec_offsets, lib.spec_mz, lib.spec_int, float(cfg["tol"]), bool(cfg["use_ppm"]), ) else: _accumulate_fragment_row_numba(scores, ref_peaks[:, 0], ref_peaks[:, 1], lib.peaks_mz, lib.peaks_int, lib.peaks_spec_idx, float(cfg["tol"]), bool(cfg["use_ppm"])) # neutral-loss / hybrid ONLY if library has NL view if cfg["compute_nl"]: prefer_frag = (cfg["matching_mode"] == "hybrid") if prefer_frag: prod_min = np.empty(ref_peaks.shape[0], dtype=np.int64) prod_max = np.empty(ref_peaks.shape[0], dtype=np.int64) for k in range(ref_peaks.shape[0]): # <-- use k mz1 = float(ref_peaks[k, 0]) a, b = _search_spec_in_fragment_window(lib.peaks_mz, mz1, float(cfg["tol"]), bool(cfg["use_ppm"])) prod_min[k] = a prod_max[k] = b else: prod_min = np.empty(0, dtype=np.int64) prod_max = np.empty(0, dtype=np.int64) ref_pmz_val = float(ref_pmz) if (ref_pmz is not None) else np.nan _accumulate_nl_row_numba(scores, ref_peaks[:, 0], ref_peaks[:, 1], ref_pmz_val, lib.nl_mz, lib.nl_int, lib.nl_spec_idx, lib.nl_product_idx, lib.peaks_mz, lib.peaks_spec_idx, float(cfg["tol"]), bool(cfg["use_ppm"]), prefer_frag, prod_min, prod_max) # identity mask if cfg["iden_tol"] is not None and ref_pmz is not None: if cfg["iden_use_ppm"]: allow = np.abs(lib.precursor_mz - ref_pmz) <= (cfg["iden_tol"] * 1e-6 * 0.5 * (lib.precursor_mz + ref_pmz)) else: allow = np.abs(lib.precursor_mz - ref_pmz) <= cfg["iden_tol"] allow &= np.isfinite(lib.precursor_mz) scores[~allow] = 0.0 return (row_idx, scores) # ===================== Cosine related helpers ===================== @njit(cache=True, nogil=True) def _precursor_shift_is_effectively_zero_nb( ref_pmz: float, lib_pmz: float, tol: float, use_ppm: bool, ) -> bool: if np.isnan(ref_pmz) or np.isnan(lib_pmz): return False return _within_tol_nb(ref_pmz, lib_pmz, tol, use_ppm) @njit(cache=True, nogil=True) def _count_candidates_per_col_nb( query_mz: np.ndarray, query_int: np.ndarray, has_pmz: bool, query_pmz: float, peaks_mz: np.ndarray, peaks_spec_idx: np.ndarray, nl_mz: np.ndarray, nl_spec_idx: np.ndarray, nl_prod_idx: np.ndarray, lib_precursor_mz: np.ndarray, tol: float, use_ppm: bool, do_frag: bool, do_nl: bool, n_cols: int ) -> np.ndarray: """ Count (fragment + neutral loss) candidate pairs per column (library spectrum). Returns ------- counts_by_col : int32[n_cols] Number of candidate (i,j) pairs that belong to each library spectrum (column). """ counts = np.zeros(n_cols, dtype=np.int32) # Fragment matches if do_frag: for i in range(query_mz.shape[0]): Iq = float(query_int[i]) if Iq <= 0.0: continue mz = float(query_mz[i]) a, b = _search_spec_in_fragment_window(peaks_mz, mz, tol, use_ppm) for j in range(a, b): # exact symmetric check to trim the searchsorted window if _within_tol_nb(mz, float(peaks_mz[j]), tol, use_ppm): col = int(peaks_spec_idx[j]) counts[col] += 1 # Neutral-loss matches if do_nl and has_pmz and (nl_mz.size > 0): for i in range(query_mz.shape[0]): Iq = float(query_int[i]) if Iq <= 0.0: continue loss = query_pmz - float(query_mz[i]) a, b = _search_spec_in_fragment_window(nl_mz, loss, tol, use_ppm) for k in range(a, b): if not _within_tol_nb(loss, float(nl_mz[k]), tol, use_ppm): continue col = int(nl_spec_idx[k]) # Hybrid ModifiedCosine compatibility: # if precursor masses are within tolerance, the shifted/NL branch # would duplicate fragment matching and should be ignored. if do_frag: lib_pmz = float(lib_precursor_mz[col]) if _precursor_shift_is_effectively_zero_nb(query_pmz, lib_pmz, tol, use_ppm): continue counts[col] += 1 return counts @njit(cache=True, nogil=True) def _fill_candidates_per_col_nb( query_mz: np.ndarray, query_int: np.ndarray, has_pmz: bool, qpmz: float, peaks_mz: np.ndarray, peaks_int: np.ndarray, peaks_spec_idx: np.ndarray, nl_mz: np.ndarray, nl_spec_idx: np.ndarray, nl_prod_idx: np.ndarray, lib_precursor_mz: np.ndarray, tol: float, use_ppm: bool, do_frag: bool, do_nl: bool, col_offsets: np.ndarray, # int64, length n_cols+1 counts_by_col: np.ndarray # int64, length n_cols ) -> tuple: """ Materialize all candidates in CSR-like form grouped by column. Parameters ---------- col_offsets : int64[n_cols+1] Prefix sums of counts (0, c0, c0+c1, ...). counts_by_col : int64[n_cols] As returned by _count_candidates_per_col_nb. Returns ------- ref_idx : int32[n_total] lib_idx : int32[n_total] score : float64[n_total] is_frag : uint8[n_total] (1 if fragment, 0 if NL) """ n_cols = counts_by_col.shape[0] n_total = int(col_offsets[n_cols]) ref_idx = np.empty(n_total, dtype=np.int32) lib_idx = np.empty(n_total, dtype=np.int32) score = np.empty(n_total, dtype=np.float64) is_frag = np.empty(n_total, dtype=np.uint8) pos = col_offsets[:-1].copy() if do_frag: for i in range(query_mz.shape[0]): Iq = float(query_int[i]) if Iq <= 0.0: continue mz = float(query_mz[i]) a, b = _search_spec_in_fragment_window(peaks_mz, mz, tol, use_ppm) for j in range(a, b): mj = float(peaks_mz[j]) if not _within_tol_nb(mz, mj, tol, use_ppm): continue spec_idx = int(peaks_spec_idx[j]) col_idx = int(pos[spec_idx]) ref_idx[col_idx] = i lib_idx[col_idx] = j score[col_idx] = Iq * float(peaks_int[j]) is_frag[col_idx] = 1 pos[spec_idx] = col_idx + 1 if do_nl and has_pmz and (nl_mz.size > 0): for i in range(query_mz.shape[0]): Iq = float(query_int[i]) if Iq <= 0.0: continue loss = qpmz - float(query_mz[i]) a, b = _search_spec_in_fragment_window(nl_mz, loss, tol, use_ppm) for k in range(a, b): mk = float(nl_mz[k]) if not _within_tol_nb(loss, mk, tol, use_ppm): continue spec_idx = int(nl_spec_idx[k]) # Same skip as in the count pass. if do_frag: lib_pmz = float(lib_precursor_mz[spec_idx]) if _precursor_shift_is_effectively_zero_nb(qpmz, lib_pmz, tol, use_ppm): continue j = int(nl_prod_idx[k]) col_idx = int(pos[spec_idx]) ref_idx[col_idx] = i lib_idx[col_idx] = j score[col_idx] = Iq * float(peaks_int[j]) is_frag[col_idx] = 0 pos[spec_idx] = col_idx + 1 return ref_idx, lib_idx, score, is_frag @njit(cache=True, nogil=True) def _greedy_scores_and_matches_all_cols_nb( n_cols: int, n_q: int, col_offsets: np.ndarray, ref_idx: np.ndarray, lib_idx: np.ndarray, score: np.ndarray, is_frag: np.ndarray, lib_spec_l2: np.ndarray, q_l2: float, ): """ Greedy, non-overlapping selection *per column* with score tie-break by fragment flag. For each column c: 1) Build its candidate view: indices [start:end) 2) Sort by key = score + (is_frag * eps), descending 3) Walk candidates; accept if neither query-peak i nor lib-peak j used yet 4) Accumulate dot and divide by norms Returns ------- out : float64[n_cols] Cosine or modified-cosine per library spectrum. """ out_score = np.zeros(n_cols, dtype=np.float64) out_matches = np.zeros(n_cols, dtype=np.int32) used_q = np.empty(n_q, dtype=np.uint8) eps = 1e-12 for col in range(n_cols): start = int(col_offsets[col]) end = int(col_offsets[col + 1]) size = end - start if size <= 0: continue s_ref = ref_idx[start:end] s_lib = lib_idx[start:end] s_score = score[start:end] s_frag = is_frag[start:end] key = np.empty(size, dtype=np.float64) for t in range(size): key[t] = s_score[t] + (eps if s_frag[t] == 1 else 0.0) order = np.argsort(-key) for qi in range(n_q): used_q[qi] = 0 used_lib_j = np.empty(size, dtype=np.int32) used_lib_n = 0 dot = 0.0 n_match = 0 for r in range(size): idx = int(order[r]) i = int(s_ref[idx]) j = int(s_lib[idx]) if used_q[i] == 1: continue seen = False for u in range(used_lib_n): if used_lib_j[u] == j: seen = True break if seen: continue used_q[i] = 1 used_lib_j[used_lib_n] = j used_lib_n += 1 dot += float(s_score[idx]) n_match += 1 denom = q_l2 * float(lib_spec_l2[col]) if denom > 0.0 and dot > 0.0: out_score[col] = dot / denom out_matches[col] = n_match return out_score, out_matches @njit(cache=True, nogil=True) def _greedy_scores_and_matches_all_cols_nb( n_cols: int, n_q: int, col_offsets: np.ndarray, ref_idx: np.ndarray, lib_idx: np.ndarray, score: np.ndarray, is_frag: np.ndarray, lib_spec_l2: np.ndarray, q_l2: float, ): out_score = np.zeros(n_cols, dtype=np.float64) out_matches = np.zeros(n_cols, dtype=np.int32) used_q = np.empty(n_q, dtype=np.uint8) eps = 1e-12 for col in range(n_cols): start = int(col_offsets[col]) end = int(col_offsets[col + 1]) size = end - start if size <= 0: continue s_ref = ref_idx[start:end] s_lib = lib_idx[start:end] s_score = score[start:end] s_frag = is_frag[start:end] key = np.empty(size, dtype=np.float64) for t in range(size): key[t] = s_score[t] + (eps if s_frag[t] == 1 else 0.0) order = np.argsort(-key) for qi in range(n_q): used_q[qi] = 0 used_lib_j = np.empty(size, dtype=np.int32) used_lib_n = 0 dot = 0.0 n_match = 0 for r in range(size): idx = int(order[r]) i = int(s_ref[idx]) j = int(s_lib[idx]) if used_q[i] == 1: continue seen = False for u in range(used_lib_n): if used_lib_j[u] == j: seen = True break if seen: continue used_q[i] = 1 used_lib_j[used_lib_n] = j used_lib_n += 1 dot += float(s_score[idx]) n_match += 1 denom = q_l2 * float(lib_spec_l2[col]) if denom > 0.0 and dot > 0.0: out_score[col] = dot / denom out_matches[col] = n_match return out_score, out_matches # ==================== Row worker (cosine) ==================== def _row_task_cosine(args): """ Compute one row (reference spectrum vs all library spectra) of (modified) cosine scores using a fully Numba-accelerated pipeline. Steps (per row / reference spectrum) ------------------------------------ 1. Build candidate (i,j) pairs against the global, sorted library index: - fragment matches around each reference m/z - neutral-loss matches around (precursor_ref - m/z), if enabled 2. Group candidates by target library spectrum (column) in CSR-like form using prefix sums (`col_offsets`). 3. For each column, greedily select a maximum-weight, non-overlapping subset of pairs (no query or library peak may be used twice), sorting by score (descending) with a tie-break to prefer fragments. 4. Normalize by L2 norms to produce cosine / modified cosine. Parameters ---------- args : tuple (row_index, peaks_array, precursor_mz_or_None) """ row_idx, ref_peaks, ref_pmz = args lib = _G_LIB cfg = _G_CFG if ref_peaks.size == 0: return ( row_idx, np.zeros(lib.n_specs, dtype=lib.dtype), np.zeros(lib.n_specs, dtype=np.int32), ) ref_mz = ref_peaks[:, 0] ref_int = ref_peaks[:, 1] q_l2 = float(np.sqrt(np.sum(ref_int * ref_int, dtype=np.float64))) if q_l2 == 0.0: return ( row_idx, np.zeros(lib.n_specs, dtype=lib.dtype), np.zeros(lib.n_specs, dtype=np.int32), ) match_mode = cfg["matching_mode"] do_frag = (match_mode == "fragment") or (match_mode == "hybrid") do_nl = (match_mode == "neutral_loss") or (match_mode == "hybrid") has_pmz = ref_pmz is not None ref_pmz = float(ref_pmz) if has_pmz else 0.0 tol = float(cfg["tol"]) use_ppm = bool(cfg["use_ppm"]) n_cols = int(lib.n_specs) counts_by_col = _count_candidates_per_col_nb( ref_mz, ref_int, has_pmz, ref_pmz, lib.peaks_mz, lib.peaks_spec_idx.astype(np.int32, copy=False), (lib.nl_mz if (cfg["compute_nl"] and lib.nl_mz is not None) else np.empty(0, np.float64)), (lib.nl_spec_idx if (cfg["compute_nl"] and lib.nl_spec_idx is not None) else np.empty(0, np.int32)), (lib.nl_product_idx if (cfg["compute_nl"] and lib.nl_product_idx is not None) else np.empty(0, np.int32)), lib.precursor_mz.astype(np.float64, copy=False), tol, use_ppm, do_frag, (do_nl and cfg["compute_nl"]), n_cols, ) col_offsets = np.empty(n_cols + 1, dtype=np.int64) col_offsets[0] = 0 for c in range(n_cols): col_offsets[c + 1] = col_offsets[c] + counts_by_col[c] n_total = int(col_offsets[-1]) if n_total == 0: out_score = np.zeros(n_cols, dtype=lib.dtype) out_matches = np.zeros(n_cols, dtype=np.int32) else: ref_idx, lib_idx, score, is_frag = _fill_candidates_per_col_nb( ref_mz, ref_int, has_pmz, ref_pmz, lib.peaks_mz, lib.peaks_int, lib.peaks_spec_idx.astype(np.int32, copy=False), (lib.nl_mz if (cfg["compute_nl"] and lib.nl_mz is not None) else np.empty(0, np.float64)), (lib.nl_spec_idx if (cfg["compute_nl"] and lib.nl_spec_idx is not None) else np.empty(0, np.int32)), (lib.nl_product_idx if (cfg["compute_nl"] and lib.nl_product_idx is not None) else np.empty(0, np.int32)), lib.precursor_mz.astype(np.float64, copy=False), tol, use_ppm, do_frag, (do_nl and cfg["compute_nl"]), col_offsets, counts_by_col, ) out_score64, out_matches = _greedy_scores_and_matches_all_cols_nb( n_cols, ref_mz.shape[0], col_offsets, ref_idx, lib_idx, score, is_frag, lib.spec_l2.astype(np.float64, copy=False), q_l2, ) out_score = out_score64.astype(lib.dtype, copy=False) iden_tol = cfg["iden_tol"] if (iden_tol is not None) and has_pmz: if cfg["iden_use_ppm"]: allow = np.abs(lib.precursor_mz - ref_pmz) <= ( iden_tol * 1e-6 * 0.5 * (lib.precursor_mz + ref_pmz) ) else: allow = np.abs(lib.precursor_mz - ref_pmz) <= iden_tol allow &= np.isfinite(lib.precursor_mz) out_score[~allow] = 0.0 out_matches[~allow] = 0 return row_idx, out_score, out_matches