from __future__ import annotations
import copy
import json
import pickle
import numpy as np
from numpy.lib.recfunctions import unstructured_to_structured
from scipy.sparse import coo_matrix
from sparsestack import StackedSparseArray
from matchms.importing.load_from_json import scores_json_decoder
from matchms.similarity import get_similarity_function_by_name
from matchms.similarity.BaseSimilarity import BaseSimilarity
from matchms.typing import QueriesType, ReferencesType
class Scores:
    """Contains reference and query spectrums and the scores between them.

    The scores can be retrieved as a matrix with the :py:attr:`Scores.scores` attribute.
    The reference spectrum, query spectrum, score pairs can also be iterated over in query then reference order.

    Example to calculate scores between 2 spectrums and iterate over the scores

    .. testcode::

        import numpy as np
        from matchms import calculate_scores
        from matchms import Spectrum
        from matchms.similarity import CosineGreedy

        spectrum_1 = Spectrum(mz=np.array([100, 150, 200.]),
                              intensities=np.array([0.7, 0.2, 0.1]),
                              metadata={'id': 'spectrum1'})
        spectrum_2 = Spectrum(mz=np.array([100, 140, 190.]),
                              intensities=np.array([0.4, 0.2, 0.1]),
                              metadata={'id': 'spectrum2'})
        spectrum_3 = Spectrum(mz=np.array([110, 140, 195.]),
                              intensities=np.array([0.6, 0.2, 0.1]),
                              metadata={'id': 'spectrum3'})
        spectrum_4 = Spectrum(mz=np.array([100, 150, 200.]),
                              intensities=np.array([0.6, 0.1, 0.6]),
                              metadata={'id': 'spectrum4'})
        references = [spectrum_1, spectrum_2]
        queries = [spectrum_3, spectrum_4]

        similarity_measure = CosineGreedy()
        scores = calculate_scores(references, queries, similarity_measure)

        for (reference, query, score) in scores:
            print(f"Cosine score between {reference.get('id')} and {query.get('id')}" +
                  f" is {score[0]:.2f} with {score[1]} matched peaks")

    Should output

    .. testoutput::

        Cosine score between spectrum1 and spectrum4 is 0.80 with 3 matched peaks
        Cosine score between spectrum2 and spectrum3 is 0.14 with 1 matched peaks
        Cosine score between spectrum2 and spectrum4 is 0.61 with 1 matched peaks
    """

    def __init__(self, references: ReferencesType, queries: QueriesType,
                 is_symmetric: bool = False):
        """
        Parameters
        ----------
        references
            List of reference objects
        queries
            List of query objects
        is_symmetric
            Set to True when *references* and *queries* are identical (as for instance for an all-vs-all
            comparison). By using the fact that score[i,j] = score[j,i] the calculation will be about
            2x faster. Default is False.
        """
        Scores._validate_input_arguments(references, queries)

        self.n_rows = len(references)
        self.n_cols = len(queries)
        self.references = np.asarray(references)
        self.queries = np.asarray(queries)
        self.is_symmetric = is_symmetric
        self._scores = StackedSparseArray(self.n_rows, self.n_cols)
        # Cursor used only by direct ``next(scores)`` calls (see __next__).
        self._index = 0

    def __eq__(self, other):
        """Compare dimensions, references, queries and stored scores.

        Returns ``NotImplemented`` when `other` is not a :class:`Scores` object.
        Note that `is_symmetric` is not part of the comparison.
        """
        if not isinstance(other, Scores):
            return NotImplemented
        if self.n_rows != other.n_rows or self.n_cols != other.n_cols:
            return False
        if not np.array_equal(self.references, other.references):
            return False
        if not np.array_equal(self.queries, other.queries):
            return False
        if self._scores != other._scores:
            return False
        return True

    def __iter__(self):
        """Yield ``(reference, query, scores)`` for every stored score entry.

        Every call returns an independent generator, so a loop that was left
        early (``break``) or a nested loop over the same object always starts
        at the first entry and does not disturb any other iteration.
        """
        for position in range(len(self._scores.col)):
            yield self._entry_at(position)

    def __next__(self):
        """Return the next ``(reference, query, scores)`` entry.

        Kept so that calling ``next(scores)`` directly on the object keeps
        working. The cursor is reset once the last entry has been returned.
        """
        if self._index < len(self._scores.col):
            entry = self._entry_at(self._index)
            self._index += 1
            return entry
        self._index = 0
        raise StopIteration

    def _entry_at(self, position: int) -> tuple:
        """Build the ``(reference, query, [score values])`` tuple for one stored entry."""
        values = [self._scores.data[name][position] for name in self._scores.score_names]
        return (self.references[self._scores.row[position]],
                self.queries[self._scores.col[position]],
                values)

    def __repr__(self):
        return self._scores.__repr__()

    def __str__(self):
        return self._scores.__str__()

    @staticmethod
    def _validate_input_arguments(references, queries):
        """Check that `references` and `queries` are list, tuple or np.ndarray.

        Raises
        ------
        AssertionError
            If one of the arguments has an unsupported type. The exception is
            raised explicitly (instead of using ``assert``) so that the check
            is not stripped when Python runs with ``-O``.
        """
        if not isinstance(references, (list, tuple, np.ndarray)):
            raise AssertionError("Expected input argument 'references' to be list or tuple or np.ndarray.")
        if not isinstance(queries, (list, tuple, np.ndarray)):
            raise AssertionError("Expected input argument 'queries' to be list or tuple or np.ndarray.")

    @staticmethod
    def _locate(items: np.ndarray, item, error_message: str) -> int:
        """Return the index of the first element of `items` that equals `item`.

        Raises
        ------
        AssertionError
            With `error_message` if `item` does not occur in `items`.
        """
        matches = np.flatnonzero(items == item)
        if matches.size == 0:
            raise AssertionError(error_message)
        # Take the first hit explicitly: converting a multi-element array with
        # int() fails, and NumPy deprecated it even for one-element arrays.
        return int(matches[0])

    def _descending_order(self, values: np.ndarray, name) -> np.ndarray:
        """Return the positions that sort `values` from highest to lowest score."""
        if name is None:
            name = self._scores.guess_score_name()
        if values.dtype.type == np.void:
            # Structured array holding several scores: sort on the selected field.
            return np.argsort(values[name])[::-1]
        return np.argsort(values)[::-1]

    def calculate(self, similarity_function: BaseSimilarity,
                  name: str = None,
                  array_type: str = "numpy",
                  join_type="left") -> Scores:
        """
        Calculate the similarity between all reference objects vs all query objects using
        the most suitable available implementation of the given similarity_function.
        If Scores object already contains similarity scores, the newly computed measures
        will be added to a new layer (name --> layer name).
        Additional scores will be added as specified with join_type, the default being 'left'.

        Parameters
        ----------
        similarity_function
            Function which accepts a reference + query object and returns a score or tuple of scores
        name
            Label of the new scores layer. If None, the name of the similarity_function class will be used.
        array_type
            Specify the type of array to store and compute the scores. Choose from "numpy" or "sparse".
        join_type
            Choose from left, right, outer, inner to specify the merge type.

        Returns
        -------
        Scores
            This object itself, so that calls can be chained.

        Raises
        ------
        ValueError
            If there are no references or no queries.
        """
        def is_sparse_advisable():
            return (
                (len(self._scores.score_names) > 0)  # already scores in Scores
                and (join_type in ["inner", "left"])  # inner/left join
                and (len(self._scores.row) < (self.n_rows * self.n_cols)/2)  # fewer than half of scores have entries
            )

        if name is None:
            name = similarity_function.__class__.__name__
        if (self.n_rows == 0) or (self.n_cols == 0):
            raise ValueError("Number of elements must be >= 1")
        if self.n_rows == self.n_cols == 1:
            # Single pair: no need for a matrix implementation.
            score = similarity_function.pair(self.references[0],
                                             self.queries[0])
            self._scores.add_dense_matrix(np.array([score]), name)
        elif is_sparse_advisable():
            # Only compute the new score for the entries that are already stored.
            new_scores = similarity_function.sparse_array(references=self.references,
                                                          queries=self.queries,
                                                          idx_row=self._scores.row,
                                                          idx_col=self._scores.col,
                                                          is_symmetric=self.is_symmetric)
            self._scores.add_sparse_data(self._scores.row,
                                         self._scores.col,
                                         new_scores,
                                         name)
        else:
            new_scores = similarity_function.matrix(self.references,
                                                    self.queries,
                                                    array_type=array_type,
                                                    is_symmetric=self.is_symmetric)
            if isinstance(new_scores, np.ndarray):
                self._scores.add_dense_matrix(new_scores, name, join_type=join_type)
            elif len(new_scores.score_names) == 1:
                # Rename the single field to the layer name and add it without extra prefix.
                new_scores.data.dtype.names = [name]
                self._scores.add_sparse_data(new_scores.row,
                                             new_scores.col,
                                             new_scores.data, "", join_type=join_type)
            else:
                self._scores.add_sparse_data(new_scores.row,
                                             new_scores.col,
                                             new_scores.data, name, join_type=join_type)
        return self

    def scores_by_reference(self, reference: ReferencesType,
                            name: str = None, sort: bool = False) -> list:
        """Return all scores for the given reference spectrum.

        Parameters
        ----------
        reference
            Single reference Spectrum. If it occurs several times in the references,
            the first occurrence is used.
        name
            Name of the score that is used for sorting (if multiple scores are stored).
            All stored scores are returned regardless of this name.
        sort
            Set to True to obtain the scores sorted from highest to lowest value
            of the score `name`.

        Returns
        -------
        list
            List of ``(query, scores)`` tuples.

        Raises
        ------
        IndexError
            If sorting is requested without `name` while several scores are stored.
        AssertionError
            If `reference` is not part of the references.
        """
        if name is None and len(self.score_names) > 1 and sort is True:
            raise IndexError("For sorting, score must be specified")
        selected_idx = Scores._locate(self.references, reference,
                                      "Given input not found in references.")
        _, query_idx, scores_for_ref = self._scores[selected_idx, :]
        if sort:
            order = self._descending_order(scores_for_ref, name)
            return list(zip(self.queries[query_idx[order]],
                            scores_for_ref[order].copy()))
        return list(zip(self.queries[query_idx], scores_for_ref.copy()))

    def scores_by_query(self, query: QueriesType,
                        name: str = None, sort: bool = False) -> list:
        """Return all scores for the given query spectrum.

        For example

        .. testcode::

            import numpy as np
            from matchms import calculate_scores, Scores, Spectrum
            from matchms.similarity import CosineGreedy

            spectrum_1 = Spectrum(mz=np.array([100, 150, 200.]),
                                  intensities=np.array([0.7, 0.2, 0.1]),
                                  metadata={'id': 'spectrum1'})
            spectrum_2 = Spectrum(mz=np.array([100, 140, 190.]),
                                  intensities=np.array([0.4, 0.2, 0.1]),
                                  metadata={'id': 'spectrum2'})
            spectrum_3 = Spectrum(mz=np.array([110, 140, 195.]),
                                  intensities=np.array([0.6, 0.2, 0.1]),
                                  metadata={'id': 'spectrum3'})
            spectrum_4 = Spectrum(mz=np.array([100, 150, 200.]),
                                  intensities=np.array([0.6, 0.1, 0.6]),
                                  metadata={'id': 'spectrum4'})
            references = [spectrum_1, spectrum_2, spectrum_3]
            queries = [spectrum_2, spectrum_3, spectrum_4]

            scores = calculate_scores(references, queries, CosineGreedy())
            selected_scores = scores.scores_by_query(spectrum_4, 'CosineGreedy_score', sort=True)
            print([x[1][0].round(3) for x in selected_scores])

        Should output

        .. testoutput::

            [0.796, 0.613]

        Parameters
        ----------
        query
            Single query Spectrum. If it occurs several times in the queries,
            the first occurrence is used.
        name
            Name of the score that is used for sorting (if multiple scores are stored).
            All stored scores are returned regardless of this name.
        sort
            Set to True to obtain the scores sorted from highest to lowest value
            of the score `name`.

        Returns
        -------
        list
            List of ``(reference, scores)`` tuples.

        Raises
        ------
        IndexError
            If sorting is requested without `name` while several scores are stored.
        AssertionError
            If `query` is not part of the queries.
        """
        if name is None and len(self.score_names) > 1 and sort is True:
            raise IndexError("For sorting, score must be specified")
        selected_idx = Scores._locate(self.queries, query,
                                      "Given input not found in queries.")
        reference_idx, _, scores_for_query = self._scores[:, selected_idx]
        if sort:
            # TODO: add option to use other sorting algorithm
            order = self._descending_order(scores_for_query, name)
            return list(zip(self.references[reference_idx[order]],
                            scores_for_query[order].copy()))
        return list(zip(self.references[reference_idx], scores_for_query.copy()))

    def to_json(self, filename: str):
        """Export :py:class:`~matchms.Scores.Scores` to a JSON file.

        Parameters
        ----------
        filename
            Path to file to write to
        """
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(self, f, cls=ScoresJSONEncoder)

    def to_pickle(self, filename: str):
        """Export :py:class:`~matchms.Scores.Scores` to a Pickle file.

        Parameters
        ----------
        filename
            Path to file to write to
        """
        with open(filename, "wb") as f:
            pickle.dump(self, f)

    def to_dict(self) -> dict:
        """Return a dictionary representation of scores.

        For symmetric scores the queries are stored as ``None`` because they
        are identical to the references.
        """
        scores_dict = {"__Scores__": True,
                       "is_symmetric": self.is_symmetric,
                       "references": [reference.to_dict() for reference in self.references],
                       "queries": [query.to_dict() for query in self.queries] if not self.is_symmetric else None}
        scores_dict.update(self.scores.to_dict())
        return scores_dict

    @property
    def shape(self):
        """Shape of the underlying score array."""
        return self._scores.shape

    @property
    def score_names(self):
        """Names of all scores that are currently stored."""
        return self._scores.score_names

    @property
    def scores(self):
        """The underlying stacked sparse array holding all scores."""
        return self._scores

    def filter_by_range(self, **kwargs):
        """Remove all scores for which the score `name` is outside the given range.

        Parameters
        ----------
        kwargs
            See "Keyword arguments" section below.

        Keyword arguments
        -----------------
        name
            Name of the score which is used for filtering. Run `.score_names` to
            see all scores stored in the sparse array.
        low
            Lower threshold below which all scores will be removed.
        high
            Upper threshold above of which all scores will be removed.
        above_operator
            Define operator to be used to compare against `low`. Default is '>'.
            Possible choices are '>', '<', '>=', '<='.
        below_operator
            Define operator to be used to compare against `high`. Default is '<'.
            Possible choices are '>', '<', '>=', '<='.
        """
        self._scores = self._scores.filter_by_range(**kwargs)

    def to_array(self, name=None) -> np.ndarray:
        """Scores as numpy array

        For example

        .. testcode::

            import numpy as np
            from matchms import calculate_scores, Scores, Spectrum
            from matchms.similarity import IntersectMz

            spectrum_1 = Spectrum(mz=np.array([100, 150, 200.]),
                                  intensities=np.array([0.7, 0.2, 0.1]))
            spectrum_2 = Spectrum(mz=np.array([100, 140, 190.]),
                                  intensities=np.array([0.4, 0.2, 0.1]))
            spectrums = [spectrum_1, spectrum_2]

            scores = calculate_scores(spectrums, spectrums, IntersectMz()).to_array()

            print(scores.shape)
            print(scores)

        Should output

        .. testoutput::

            (2, 2)
            [[1.  0.2]
             [0.2 1. ]]

        Parameters
        ----------
        name
            Name of the score that should be returned (if multiple scores are stored).
        """
        return self._scores.to_array(name)

    def to_coo(self, name=None) -> coo_matrix:
        """Scores as scipy sparse COO matrix

        Parameters
        ----------
        name
            Name of the score that should be returned (if multiple scores are stored).
        """
        return self._scores.to_coo(name)
class ScoresBuilder:
    """
    Builder class for :class:`~matchms.Scores`.
    """

    # Exact set of keys a serialized Scores dictionary must contain.
    _EXPECTED_JSON_KEYS = ("__Scores__", "is_symmetric", "references", "queries",
                           "row", "col", "data", "dtype", "n_row", "n_col")

    def __init__(self):
        self.references = None
        self.queries = None
        self.is_symmetric = None
        self.scores = None

    def build(self) -> Scores:
        """
        Build scores object

        Uses the references, queries, symmetry flag and scores that were set on
        this builder (for instance by :meth:`from_json`).
        """
        scores = Scores(references=self.references,
                        queries=self.queries,
                        is_symmetric=self.is_symmetric)
        scores._scores = self.scores  # pylint: disable=protected-access
        return scores

    def from_json(self, file_path: str) -> ScoresBuilder:
        """
        Import scores data from a JSON file.

        Parameters
        ----------
        file_path
            Path to the scores file.

        Returns
        -------
        ScoresBuilder
            This builder itself, so that ``.build()`` can be chained.

        Raises
        ------
        ValueError
            If the file content does not have the expected set of keys.
        """
        with open(file_path, "rb") as f:
            scores_dict = json.load(f, object_hook=scores_json_decoder)

        self._validate_json_input(scores_dict)

        self.is_symmetric = scores_dict["is_symmetric"]
        self.references = scores_dict["references"]
        # Symmetric scores are stored without queries; reuse the references.
        self.queries = scores_dict["queries"] if not self.is_symmetric else self.references
        self.scores = self._restructure_scores(scores_dict)

        return self

    @staticmethod
    def _restructure_scores(scores_dict: dict) -> StackedSparseArray:
        """
        Restructure scores from a nested list to a numpy array. If scores were stored as an array of tuples, restores
        their original form.
        """
        sparsestack = StackedSparseArray(scores_dict.get("n_row"), scores_dict.get("n_col"))
        sparsestack.row = np.array(scores_dict.get("row"))
        sparsestack.col = np.array(scores_dict.get("col"))

        dtype = scores_dict.get("dtype")
        # Field descriptions come back from JSON as lists; np.dtype needs
        # (name, format) tuples. A plain format string is left untouched.
        if len(dtype[0]) > 1:
            dtype = [(x[0], x[1]) for x in dtype]
        sparsestack.data = unstructured_to_structured(np.array(scores_dict.get("data")),
                                                      dtype=np.dtype(dtype))
        return sparsestack

    @staticmethod
    def _construct_similarity_functions(similarity_function_dict: dict) -> BaseSimilarity:
        """
        Construct similarity function from its serialized form.

        The entry ``"__Similarity__"`` names the similarity class, all other
        entries are passed to its constructor as keyword arguments.
        """
        # Work on a copy so that the caller's dictionary is not modified.
        parameters = dict(similarity_function_dict)
        similarity_function_class = get_similarity_function_by_name(parameters.pop("__Similarity__"))
        return similarity_function_class(**parameters)

    @staticmethod
    def _validate_json_input(scores_dict: dict):
        """Check that `scores_dict` has exactly the expected keys.

        Raises
        ------
        ValueError
            If keys are missing or unexpected keys are present.
        """
        if set(ScoresBuilder._EXPECTED_JSON_KEYS) != scores_dict.keys():
            raise ValueError("Scores JSON file does not match the expected schema.\n"
                             "Make sure the file contains the following keys:\n"
                             f"{list(ScoresBuilder._EXPECTED_JSON_KEYS)}")
class ScoresJSONEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``Scores`` objects via their ``to_dict`` method."""

    def default(self, o):
        """JSON Encoder for a matchms.Scores.Scores object

        Objects whose class is named ``Scores`` are converted with ``to_dict``
        (applied to a deep copy of the object). Everything else is handed to
        the base encoder, which raises ``TypeError`` for unsupported types.
        """
        # do isinstance(o, Scores) without importing matchms.Scores
        if o.__class__.__name__ == "Scores":
            return copy.deepcopy(o).to_dict()
        return super().default(o)