Source code for matchms.similarity.CosineHungarian

from typing import List, Optional, Tuple
import numpy as np
from scipy.optimize import linear_sum_assignment
from matchms.similarity.spectrum_similarity_functions import collect_peak_pairs
from matchms.typing import SpectrumType
from .BaseSimilarity import BaseSimilarity


[docs] class CosineHungarian(BaseSimilarity): """Calculate 'cosine similarity score' between two spectra using the Hungarian algorithm. The cosine score quantifies the similarity between two mass spectra by finding the optimal one-to-one matching between their peaks. Two peaks are considered a potential match if their m/z ratios lie within the given *tolerance*. The peak assignment is solved using the Hungarian algorithm (``scipy.optimize.linear_sum_assignment``), which finds the assignment that maximises the sum of intensity products. This is mathematically optimal but can be notably slower than the greedy heuristic in :class:`~matchms.similarity.CosineGreedy`. """ is_commutative = True score_datatype = [("score", np.float64), ("matches", "int")]
[docs] def __init__(self, tolerance: float = 0.1, mz_power: float = 0.0, intensity_power: float = 1.0): """ Parameters ---------- tolerance: Peaks will be considered a match when <= tolerance apart. Default is 0.1. mz_power: The power to raise m/z to in the cosine function. The default is 0, in which case the peak intensity products will not depend on the m/z ratios. intensity_power: The power to raise intensity to in the cosine function. The default is 1. """ self.tolerance = tolerance self.mz_power = mz_power self.intensity_power = intensity_power
[docs] def pair(self, reference: SpectrumType, query: SpectrumType) -> Tuple[float, int]: """Calculate cosine score between two spectra. Parameters ---------- reference Single reference spectrum. query Single query spectrum. Returns ------- Tuple with cosine score and number of matched peaks. """ def get_matching_pairs() -> Optional[np.ndarray]: """Find all within-tolerance peak pairs, sorted by descending intensity product. Returns ``None`` when no peaks fall within *tolerance*. """ matching_pairs = collect_peak_pairs(spec1, spec2, self.tolerance, shift=0.0, mz_power=self.mz_power, intensity_power=self.intensity_power) if matching_pairs is None: return None matching_pairs = matching_pairs[np.argsort(matching_pairs[:, 2], kind='mergesort')[::-1], :] return matching_pairs def get_matching_pairs_matrix() -> Tuple[ Optional[List[float]], Optional[List[float]], Optional[np.ndarray] ]: """Build a cost matrix for the Hungarian algorithm. Rows correspond to the unique reference peaks that participate in at least one within-tolerance pair; columns correspond to the unique query peaks. The matrix is initialised to ``1.0`` (maximum cost) and actual within-tolerance pairs are set to ``1 - intensity_product``. Returns ------- paired_peaks1 Unique reference-peak indices involved in at least one match. paired_peaks2 Unique query-peak indices involved in at least one match. cost_matrix Cost matrix of shape ``(len(paired_peaks1), len(paired_peaks2))``. Cells that remain ``1.0`` have **no** real within-tolerance pair. """ if matching_pairs is None: return None, None, None paired_peaks1 = list(set(matching_pairs[:, 0])) paired_peaks2 = list(set(matching_pairs[:, 1])) matrix_size = (len(paired_peaks1), len(paired_peaks2)) cost_matrix = np.ones(matrix_size) for i in range(matching_pairs.shape[0]): cost_matrix[paired_peaks1.index(matching_pairs[i, 0]), paired_peaks2.index(matching_pairs[i, 1])] = 1 - matching_pairs[i, 2] return paired_peaks1, paired_peaks2, cost_matrix def solve_hungarian() -> Tuple[float, List[Tuple[int, int]]]: """Solve the optimal peak assignment via the Hungarian algorithm. The algorithm assigns ``min(rows, cols)`` pairs. Some of those assignments may land on cells that were never overwritten from the initial ``1.0`` — i.e. peak pairs that are **not** within tolerance ("phantom pairs"). These contribute ``1 - 1.0 = 0`` to the score, so the score is unaffected, but they must be excluded from the match count. The score formula ``len(row_ind) - sum(costs)`` relies on phantoms contributing exactly ``1.0`` to the sum, so the score line must use **all** assignments (including phantoms). Returns ------- score Un-normalised cosine score (sum of intensity products for matched peaks). used_matches List of ``(reference_peak_idx, query_peak_idx)`` tuples for the real (non-phantom) matched pairs only. """ row_ind, col_ind = linear_sum_assignment(matching_pairs_matrix) # Score uses ALL assignments: phantoms add 1.0 to the sum, cancelling # with the len(row_ind) term and contributing 0 to the score. score = len(row_ind) - matching_pairs_matrix[row_ind, col_ind].sum() # Match count excludes phantoms (cells still at 1.0). used_matches = [ (int(paired_peaks1[x]), int(paired_peaks2[y])) for (x, y) in zip(row_ind, col_ind) if matching_pairs_matrix[x, y] < 1.0 ] return score, used_matches def calc_score() -> np.ndarray: """Compute the normalised cosine score and match count.""" if matching_pairs_matrix is None: return np.asarray((0.0, 0), dtype=self.score_datatype) score, used_matches = solve_hungarian() spec1_power = np.power(spec1[:, 0], self.mz_power) \ * np.power(spec1[:, 1], self.intensity_power) spec2_power = np.power(spec2[:, 0], self.mz_power) \ * np.power(spec2[:, 1], self.intensity_power) score = score/(np.sqrt(np.sum(spec1_power**2)) * np.sqrt(np.sum(spec2_power**2))) return np.asarray((score, len(used_matches)), dtype=self.score_datatype) spec1 = reference.peaks.to_numpy spec2 = query.peaks.to_numpy matching_pairs = get_matching_pairs() paired_peaks1, paired_peaks2, matching_pairs_matrix = get_matching_pairs_matrix() return calc_score()