import logging
from typing import Dict, Tuple
import numpy as np
from scipy.optimize import linear_sum_assignment
from matchms.similarity.spectrum_similarity_functions import collect_peak_pairs
from matchms.typing import SpectrumType
from ._precursor_validation import get_valid_precursor_mz
from .BaseSimilarity import BaseSimilarity
from .CosineHungarian import CosineHungarian
logger = logging.getLogger("matchms")
[docs]
class ModifiedCosineHungarian(BaseSimilarity):
"""Calculate exact modified cosine score between mass spectra.
The modified cosine score quantifies similarity between two mass spectra with
optional precursor-based mass shift. Potential matches are all peak pairs that
are within ``tolerance`` either unshifted or shifted by
``precursor_mz(reference) - precursor_mz(query)``.
Peak assignment is solved globally via Hungarian assignment
(linear sum assignment), which yields an exact one-to-one maximum-weight
matching.
See Watrous et al. [PNAS, 2012, https://www.pnas.org/content/109/26/E1743]
for the modified cosine concept.
"""
is_commutative = True
score_datatype = [("score", np.float64), ("matches", "int")]
[docs]
def __init__(self, tolerance: float = 0.1, mz_power: float = 0.0, intensity_power: float = 1.0):
"""Initialize exact modified cosine.
Parameters
----------
tolerance:
Peaks will be considered a match when <= tolerance apart. Default is 0.1.
mz_power:
The power to raise mz to in the cosine function. The default is 0, in which
case the peak intensity products will not depend on the m/z ratios.
intensity_power:
The power to raise intensity to in the cosine function. The default is 1.
"""
self.tolerance = tolerance
self.mz_power = mz_power
self.intensity_power = intensity_power
[docs]
def pair(self, reference: SpectrumType, query: SpectrumType) -> Tuple[float, int]:
"""Calculate exact modified cosine score between two spectra."""
precursor_mz_ref = get_valid_precursor_mz(reference, logger)
precursor_mz_query = get_valid_precursor_mz(query, logger)
mass_shift = precursor_mz_ref - precursor_mz_query
if abs(mass_shift) <= self.tolerance:
return CosineHungarian(
tolerance=self.tolerance,
mz_power=self.mz_power,
intensity_power=self.intensity_power,
).pair(reference, query)
def get_matching_pairs():
"""Find all candidate matching peak pairs for unshifted and shifted views."""
zero_pairs = collect_peak_pairs(
spec1, spec2, self.tolerance, shift=0.0,
mz_power=self.mz_power, intensity_power=self.intensity_power
)
nonzero_pairs = collect_peak_pairs(
spec1, spec2, self.tolerance, shift=mass_shift,
mz_power=self.mz_power, intensity_power=self.intensity_power
)
if zero_pairs is None:
zero_pairs = np.zeros((0, 3))
if nonzero_pairs is None:
nonzero_pairs = np.zeros((0, 3))
if zero_pairs.shape[0] == 0 and nonzero_pairs.shape[0] == 0:
return np.zeros((0, 3))
return np.concatenate((zero_pairs, nonzero_pairs), axis=0)
def build_weight_matrix(matching_pairs: np.ndarray):
"""Build dense weight matrix from matching pairs.
Duplicate (i, j) edges can occur when both shift=0 and shift!=0 generate the
same pair; keep the maximum weight for the edge.
"""
if matching_pairs.shape[0] == 0:
return None, None, None
deduplicated_edges: Dict[Tuple[int, int], float] = {}
for peak_i, peak_j, weight in matching_pairs:
edge = (int(peak_i), int(peak_j))
current_weight = deduplicated_edges.get(edge)
if current_weight is None or weight > current_weight:
deduplicated_edges[edge] = float(weight)
if len(deduplicated_edges) == 0:
return None, None, None
paired_peaks1 = sorted({edge[0] for edge in deduplicated_edges})
paired_peaks2 = sorted({edge[1] for edge in deduplicated_edges})
idx_map1 = {peak_idx: i for i, peak_idx in enumerate(paired_peaks1)}
idx_map2 = {peak_idx: j for j, peak_idx in enumerate(paired_peaks2)}
weights = np.zeros((len(paired_peaks1), len(paired_peaks2)), dtype=np.float64)
for (peak_i, peak_j), weight in deduplicated_edges.items():
weights[idx_map1[peak_i], idx_map2[peak_j]] = weight
return paired_peaks1, paired_peaks2, weights
def solve_hungarian(weights: np.ndarray):
"""Solve maximum weight matching with Hungarian assignment."""
n_rows, n_cols = weights.shape
size = max(n_rows, n_cols)
padded_weights = np.zeros((size, size), dtype=np.float64)
padded_weights[:n_rows, :n_cols] = weights
max_weight = padded_weights.max()
costs = max_weight - padded_weights
row_ind, col_ind = linear_sum_assignment(costs)
score = 0.0
used_matches = []
for i, j in zip(row_ind, col_ind):
if i < n_rows and j < n_cols and weights[i, j] > 0:
score += weights[i, j]
used_matches.append((i, j))
return score, used_matches
spec1 = reference.peaks.to_numpy
spec2 = query.peaks.to_numpy
matching_pairs = get_matching_pairs()
paired_peaks1, paired_peaks2, weights = build_weight_matrix(matching_pairs)
if weights is None:
return np.asarray((0.0, 0), dtype=self.score_datatype)
score, used_matches = solve_hungarian(weights)
spec1_power = np.power(spec1[:, 0], self.mz_power) * np.power(spec1[:, 1], self.intensity_power)
spec2_power = np.power(spec2[:, 0], self.mz_power) * np.power(spec2[:, 1], self.intensity_power)
denominator = np.sqrt(np.sum(spec1_power ** 2)) * np.sqrt(np.sum(spec2_power ** 2))
if denominator > 0:
score = score / denominator
else:
score = 0.0
used_matches = [(paired_peaks1[i], paired_peaks2[j]) for i, j in used_matches]
return np.asarray((score, len(used_matches)), dtype=self.score_datatype)