# Source code for matchms.similarity.CosineHungarian

```from typing import Tuple
import numpy
from scipy.optimize import linear_sum_assignment
from matchms.similarity.spectrum_similarity_functions import collect_peak_pairs
from matchms.typing import SpectrumType
from .BaseSimilarity import BaseSimilarity

[docs]class CosineHungarian(BaseSimilarity):
"""Calculate 'cosine similarity score' between two spectra (using Hungarian algorithm).

The cosine score aims at quantifying the similarity between two mass spectra.
The score is calculated by finding best possible matches between peaks
of two spectra. Two peaks are considered a potential match if their
m/z ratios lie within the given 'tolerance'.
The underlying peak assignment problem is here solved using the Hungarian algorithm.
This can perform notably slower than the 'greedy' implementation in
:class:`~matchms.similarity.CosineGreedy`, but does represent a mathematically proper
solution to the problem.
"""
# Set key characteristics as class attributes
is_commutative = True
# Set output data type, e.g. ("score", "float") or [("score", "float"), ("matches", "int")]
score_datatype = [("score", numpy.float64), ("matches", "int")]

[docs]    def __init__(self, tolerance: float = 0.1, mz_power: float = 0.0,
intensity_power: float = 1.0):
"""
Parameters
----------
tolerance:
Peaks will be considered a match when <= tolerance apart. Default is 0.1.
mz_power:
The power to raise m/z to in the cosine function. The default is 0, in which
case the peak intensity products will not depend on the m/z ratios.
intensity_power:
The power to raise intensity to in the cosine function. The default is 1.
"""
self.tolerance = tolerance
self.mz_power = mz_power
self.intensity_power = intensity_power

[docs]    def pair(self, reference: SpectrumType, query: SpectrumType) -> Tuple[float, int]:
"""Calculate cosine score between two spectra.

Parameters
----------
reference
Single reference spectrum.
query
Single query spectrum.

Returns:
--------

Tuple with cosine score and number of matched peaks.
"""
def get_matching_pairs():
"""Get pairs of peaks that match within the given tolerance."""
matching_pairs = collect_peak_pairs(spec1, spec2, self.tolerance, shift=0.0,
mz_power=self.mz_power,
intensity_power=self.intensity_power)
if matching_pairs is None:
return None
matching_pairs = matching_pairs[numpy.argsort(matching_pairs[:, 2])[::-1], :]
return matching_pairs

def get_matching_pairs_matrix():
"""Create matrix of multiplied intensities of all matching pairs
between spectrum1 and spectrum2.
Returns
paired_peaks1:
list of paired peaks of spectrum1
paired_peaks2:
list of paired peaks of spectrum2
matching_pairs_matrix:
Array of multiplied intensities between all matching peaks.
"""
if matching_pairs is None:
return None, None, None
paired_peaks1 = list(set(matching_pairs[:, 0]))
paired_peaks2 = list(set(matching_pairs[:, 1]))
matrix_size = (len(paired_peaks1), len(paired_peaks2))
matching_pairs_matrix = numpy.ones(matrix_size)
for i in range(matching_pairs.shape[0]):
matching_pairs_matrix[paired_peaks1.index(matching_pairs[i, 0]),
paired_peaks2.index(matching_pairs[i, 1])] = 1 - matching_pairs[i, 2]
return paired_peaks1, paired_peaks2, matching_pairs_matrix

def solve_hungarian():
"""Use hungarian algorithm to solve the linear sum assignment problem."""
row_ind, col_ind = linear_sum_assignment(matching_pairs_matrix)
score = len(row_ind) - matching_pairs_matrix[row_ind, col_ind].sum()
used_matches = [(paired_peaks1[x], paired_peaks2[y]) for (x, y) in zip(row_ind, col_ind)]
return score, used_matches

def calc_score():
"""Calculate cosine similarity score."""
if matching_pairs_matrix is None:
return numpy.asarray((0.0, 0), dtype=self.score_datatype)
score, used_matches = solve_hungarian()
# Normalize score:
spec1_power = numpy.power(spec1[:, 0], self.mz_power) \
* numpy.power(spec1[:, 1], self.intensity_power)
spec2_power = numpy.power(spec2[:, 0], self.mz_power) \
* numpy.power(spec2[:, 1], self.intensity_power)
score = score/(numpy.sqrt(numpy.sum(spec1_power**2)) * numpy.sqrt(numpy.sum(spec2_power**2)))
return numpy.asarray((score, len(used_matches)), dtype=self.score_datatype)

spec1 = reference.peaks.to_numpy
spec2 = query.peaks.to_numpy
matching_pairs = get_matching_pairs()
paired_peaks1, paired_peaks2, matching_pairs_matrix = get_matching_pairs_matrix()
return calc_score()
```