Source code for matchms.similarity.MetadataMatch

import logging
from typing import List
import numpy as np
from sparsestack import StackedSparseArray
from matchms.similarity.spectrum_similarity_functions import number_matching, number_matching_symmetric
from matchms.typing import SpectrumType
from .BaseSimilarity import BaseSimilarity


logger = logging.getLogger("matchms")


[docs] class MetadataMatch(BaseSimilarity): """Return True if metadata entries of a specified field match between two spectra. This is supposed to be used to compare a wide range of possible metadata entries and use this to later select related or similar spectra. Example to calculate scores between 2 pairs of spectra and iterate over the scores .. testcode:: import numpy as np from matchms import calculate_scores from matchms import Spectrum from matchms.similarity import MetadataMatch spectrum_1 = Spectrum(mz=np.array([]), intensities=np.array([]), metadata={"instrument_type": "orbitrap", "id": 1}) spectrum_2 = Spectrum(mz=np.array([]), intensities=np.array([]), metadata={"instrument_type": "qtof", "id": 2}) spectrum_3 = Spectrum(mz=np.array([]), intensities=np.array([]), metadata={"instrument_type": "qtof", "id": 3}) spectrum_4 = Spectrum(mz=np.array([]), intensities=np.array([]), metadata={"instrument_type": "orbitrap", "id": 4}) references = [spectrum_1, spectrum_2] queries = [spectrum_3, spectrum_4] similarity_score = MetadataMatch(field="instrument_type") scores = calculate_scores(references, queries, similarity_score) for (reference, query, score) in scores: print(f"Metadata match between {reference.get('id')} and {query.get('id')}" + f" is {bool(score[0])}") Should output .. testoutput:: Metadata match between 1 and 4 is True Metadata match between 2 and 3 is True """ # Set key characteristics as class attributes is_commutative = True score_datatype = bool
[docs] def __init__(self, field: str, matching_type: str = "equal_match", tolerance: float = 0.1): """ Parameters ---------- field Specify field name for metadata that should be compared. matching_type Specify how field entries should be matched. Can be one of ["equal_match", "difference"]. "equal_match": Entries must be exactly equal (default). "difference": Entries are considered a match if their numerical difference is less than or equal to "tolerance". tolerance Specify tolerance below which two values are counted as match. This only applied to numerical values. """ self.field = field self.tolerance = tolerance assert matching_type in ["equal_match", "difference"], "Expected type from ['equal_match', 'difference']" self.matching_type = matching_type
[docs] def pair(self, reference: SpectrumType, query: SpectrumType) -> float: """Compare precursor m/z between reference and query spectrum. Parameters ---------- reference Single reference spectrum. query Single query spectrum. """ entry_ref = reference.get(self.field) entry_query = query.get(self.field) if entry_ref is None or entry_query is None: return np.asarray(False, dtype=self.score_datatype) if self.matching_type == "equal_match": score = entry_ref == entry_query return np.asarray(score, dtype=self.score_datatype) if isinstance(entry_ref, (int, float)) and isinstance(entry_query, (int, float)): score = abs(entry_ref - entry_query) <= self.tolerance return np.asarray(score, dtype=self.score_datatype) logger.warning("Non-numerical entry not compatible with 'difference' method") return np.asarray(False, dtype=self.score_datatype)
[docs] def matrix(self, references: List[SpectrumType], queries: List[SpectrumType], array_type: str = "numpy", is_symmetric: bool = False) -> np.ndarray: """Compare parent masses between all references and queries. Parameters ---------- references List/array of reference spectra. queries List/array of Single query spectra. array_type Specify the output array type. Can be "numpy" or "sparse". Default is "numpy" and will return a numpy array. "sparse" will return a COO-sparse array. is_symmetric Set to True when *references* and *queries* are identical (as for instance for an all-vs-all comparison). By using the fact that score[i,j] = score[j,i] the calculation will be about 2x faster. """ # pylint: disable=too-many-locals if array_type not in ["numpy", "sparse"]: raise ValueError("array_type must be 'numpy' or 'sparse'.") def collect_entries(spectra): """Collect metadata entries.""" entries = [] for spectrum in spectra: entry = spectrum.get(self.field) if entry is None: msg = f"No {self.field} entry found for spectrum." logger.warning(msg) entry = np.nan elif self.matching_type == "difference" and not isinstance(entry, (int, float)): msg = f"Non-numerical entry ({entry}) not compatible with 'difference' method." logger.warning(msg) entry = np.nan entries.append(entry) return np.asarray(entries) entries_ref = collect_entries(references) entries_query = collect_entries(queries) if self.matching_type == "equal_match": if self.tolerance != 0: msg = "Tolerance is set but will be ignored because 'equal_match' does not use tolerance." logger.warning(msg) rows, cols = [], [] for i, entry in enumerate(entries_query): idx = np.where(entries_ref == entry)[0] rows.extend(idx) cols.extend([i] * len(idx)) rows = np.array(rows) cols = np.array(cols) scores = np.ones(len(rows)) else: if is_symmetric: rows, cols, scores = number_matching_symmetric(entries_ref, self.tolerance) else: rows, cols, scores = number_matching(entries_ref, entries_query, self.tolerance) if array_type == "sparse": scores_array = StackedSparseArray(len(entries_ref), len(entries_query)) scores_array.add_sparse_data(rows, cols, scores.astype(self.score_datatype), "") else: scores_array = np.zeros((len(entries_ref), len(entries_query)), dtype=self.score_datatype) scores_array[rows, cols] = scores.astype(self.score_datatype) return scores_array