import numpy as np
from numba import njit # type: ignore[attr-defined, import-untyped]
@njit(cache=True)
def _sorted_peak_order(spec):
"""Return intensity-sorted indices with deterministic tie-breaking.
Peaks are primarily sorted by ascending intensity. Equal-intensity peaks are
reordered by descending original index, so reverse iteration visits the
lower-m/z peak first because input spectra are already sorted by m/z.
"""
order = np.argsort(spec[:, 1])
start = 0
n = order.shape[0]
while start < n:
end = start + 1
intensity = spec[order[start], 1]
while end < n and spec[order[end], 1] == intensity:
end += 1
for left in range(start, end):
max_pos = left
for right in range(left + 1, end):
if order[right] > order[max_pos]:
max_pos = right
if max_pos != left:
tmp = order[left]
order[left] = order[max_pos]
order[max_pos] = tmp
start = end
return order
[docs]
@njit(cache=True)
def sirius_merge_close_peaks(spec, mz_tolerance):
"""Merge close peaks following the Sirius/BOECKER lab algorithm.
Peaks are merged greedily in descending intensity order. Each unconsumed peak
adopts its own m/z and sums the intensities of all unconsumed neighbors within
a merge window of 2 * mz_tolerance. The result is guaranteed to have consecutive
m/z gaps > 2 * mz_tolerance. When multiple peaks share the same intensity,
the lower-m/z peak is processed first so the representative peak stays
deterministic across NumPy and Numba sort implementations.
Parameters
----------
spec
2D array (N, 2) with columns [mz, intensity], sorted by ascending m/z.
mz_tolerance
Tolerance for scoring. Merge window is 2 * mz_tolerance.
Returns
-------
numpy.ndarray
(M, 2) array of merged peaks sorted by ascending m/z.
"""
n = spec.shape[0]
if n == 0:
return spec.copy()
merge_window = 2.0 * mz_tolerance
# Sort indices by ascending intensity and iterate in reverse (descending).
order = _sorted_peak_order(spec)
consumed = np.zeros(n, dtype=np.bool_)
merged_mz = np.empty(n, dtype=np.float64)
merged_int = np.empty(n, dtype=np.float64)
count = 0
for k in range(n - 1, -1, -1):
i = order[k]
if consumed[i]:
continue
# This peak becomes the representative; start with its intensity
mz_i = spec[i, 0]
total_intensity = spec[i, 1]
consumed[i] = True
# Scan left in m/z order
j = i - 1
while j >= 0:
if consumed[j]:
j -= 1
continue
if mz_i - spec[j, 0] > merge_window:
break
total_intensity += spec[j, 1]
consumed[j] = True
j -= 1
# Scan right in m/z order
j = i + 1
while j < n:
if consumed[j]:
j += 1
continue
if spec[j, 0] - mz_i > merge_window:
break
total_intensity += spec[j, 1]
consumed[j] = True
j += 1
merged_mz[count] = mz_i
merged_int[count] = total_intensity
count += 1
# Build result and sort by ascending m/z
result = np.empty((count, 2), dtype=np.float64)
result[:, 0] = merged_mz[:count]
result[:, 1] = merged_int[:count]
sort_idx = np.argsort(result[:, 0])
return result[sort_idx]
[docs]
@njit(cache=True)
def linear_cosine_score(spec1, spec2, tolerance, mz_power, intensity_power):
"""Compute the CosineLinear similarity between two well-separated spectra.
Both spectra must have consecutive m/z gaps > 2 * tolerance (as ensured by
sirius_merge_close_peaks). Uses an O(n+m) two-pointer sweep.
Parameters
----------
spec1
2D array (N, 2) with columns [mz, intensity], sorted ascending m/z.
spec2
2D array (M, 2) with columns [mz, intensity], sorted ascending m/z.
tolerance
Maximum allowed difference between m/z values for a match.
mz_power
Power to raise m/z values to.
intensity_power
Power to raise intensity values to.
Returns
-------
score : float
Cosine similarity score.
matches : int
Number of matched peak pairs.
"""
n1 = spec1.shape[0]
n2 = spec2.shape[0]
if n1 == 0 or n2 == 0:
return 0.0, 0
# Compute weighted products for each spectrum
products1 = np.empty(n1, dtype=np.float64)
for i in range(n1):
products1[i] = (spec1[i, 0] ** mz_power) * (spec1[i, 1] ** intensity_power)
products2 = np.empty(n2, dtype=np.float64)
for i in range(n2):
products2[i] = (spec2[i, 0] ** mz_power) * (spec2[i, 1] ** intensity_power)
# Compute norms
norm1 = 0.0
for i in range(n1):
norm1 += products1[i] * products1[i]
norm1 = np.sqrt(norm1)
norm2 = 0.0
for i in range(n2):
norm2 += products2[i] * products2[i]
norm2 = np.sqrt(norm2)
if norm1 == 0.0 or norm2 == 0.0:
return 0.0, 0
# Two-pointer sweep
matched_sum = 0.0
matches = 0
i = 0
j = 0
while i < n1 and j < n2:
diff = spec1[i, 0] - spec2[j, 0]
if abs(diff) <= tolerance:
matched_sum += products1[i] * products2[j]
matches += 1
i += 1
j += 1
elif diff < 0:
i += 1
else:
j += 1
score = matched_sum / (norm1 * norm2)
return score, matches