import json
from dataclasses import dataclass
from pathlib import Path
import numpy as np
from scipy.sparse import coo_array
from matchms.typing import ScoresType
[docs]
@dataclass(frozen=True)
class ScoresMask:
"""Boolean mask for Scores, stored either densely or as sparse coordinates.
Parameters
----------
shape
Shape of the score matrix this mask applies to.
dense_mask
Optional dense boolean array of shape `shape`. If provided, `row` and `col` must be None.
row
Optional array of row indices for sparse representation.
col
Optional array of column indices for sparse representation. If provided, `dense_mask` must be None.
"""
shape: tuple[int, int]
dense_mask: np.ndarray | None = None
row: np.ndarray | None = None
col: np.ndarray | None = None
def __post_init__(self):
has_dense = self.dense_mask is not None
has_sparse = self.row is not None or self.col is not None
if has_dense and has_sparse:
raise ValueError("ScoresMask must be either dense or sparse, not both.")
if not has_dense and not has_sparse:
raise ValueError("ScoresMask requires either dense_mask or row/col.")
if has_sparse:
if self.row is None or self.col is None:
raise ValueError("Sparse ScoresMask requires both row and col.")
if self.row.shape != self.col.shape:
raise ValueError("row and col must have the same shape.")
@property
def is_sparse(self) -> bool:
return self.dense_mask is None
def to_dense(self) -> np.ndarray:
if self.dense_mask is not None:
return self.dense_mask
mask = np.zeros(self.shape, dtype=bool)
mask[self.row, self.col] = True
return mask
def __and__(self, other: "ScoresMask") -> "ScoresMask":
self._check_shape(other)
if self.is_sparse and other.is_sparse:
return self._from_coord_set(self._coord_set() & other._coord_set())
return ScoresMask(shape=self.shape, dense_mask=self.to_dense() & other.to_dense())
def __or__(self, other: "ScoresMask") -> "ScoresMask":
self._check_shape(other)
if self.is_sparse and other.is_sparse:
return self._from_coord_set(self._coord_set() | other._coord_set())
return ScoresMask(shape=self.shape, dense_mask=self.to_dense() | other.to_dense())
def __invert__(self) -> "ScoresMask":
return ScoresMask(shape=self.shape, dense_mask=~self.to_dense())
def _check_shape(self, other: "ScoresMask") -> None:
if self.shape != other.shape:
raise ValueError(f"Incompatible mask shapes: {self.shape} and {other.shape}.")
def _coord_set(self) -> set[tuple[int, int]]:
return set(zip(self.row.tolist(), self.col.tolist(), strict=True))
def _from_coord_set(self, coords: set[tuple[int, int]]) -> "ScoresMask":
if not coords:
row = np.array([], dtype=np.int_)
col = np.array([], dtype=np.int_)
else:
coords = sorted(coords)
row = np.array([r for r, _ in coords], dtype=np.int_)
col = np.array([c for _, c in coords], dtype=np.int_)
return ScoresMask(shape=self.shape, row=row, col=col)
[docs]
class Scores:
"""Container for computed matchms scores.
The ``Scores`` class stores the output of one similarity computation and provides
a small, intuitive API that works for both dense and sparse score matrices.
A ``Scores`` instance can represent either:
- a scalar score matrix with one field, usually ``"score"``
- a multi-field score result, for example ``"score"`` and ``"matches"``
- dense data stored as NumPy arrays
- sparse data stored as SciPy COO arrays
Parameters
----------
data
Dictionary mapping score field names to score data.
Each value must be either a 2D NumPy array or a SciPy ``coo_array``.
All fields must have the same shape and must all be either dense or sparse.
Notes
-----
The class is designed to offer a consistent API independent of the underlying
storage format.
Field access
Score fields can be accessed by name, for example ``scores["score"]`` or
``scores["matches"]``. Field selection returns another ``Scores`` object
containing only the selected field.
Scalar scores
If only one field is present, direct comparisons are supported, for example
``scores > 0.5``. This is equivalent to ``scores["score"] > 0.5``.
Masking
Boolean masking returns a filtered ``Scores`` object with the same shape.
For example, ``scores[scores["score"] > 0.5]`` keeps only entries where the
condition is true.
Slicing
Basic slicing is supported, for example ``scores[3, 4]``, ``scores[3, :]``,
or ``scores[:, 2]``.
Conversion
Use :meth:`to_array` to obtain a dense NumPy representation and
:meth:`to_coo` to obtain a sparse COO representation.
Examples
--------
Scalar dense scores:
>>> scores = Scores({"score": np.array([[1.0, 0.0], [0.3, 0.8]])})
>>> scores["score"].to_array()
array([[1. , 0. ],
[0.3, 0.8]])
>>> filtered = scores[scores > 0.5]
>>> filtered.to_array()
array([[1. , 0. ],
[0. , 0.8]])
Multi-field scores:
>>> scores = Scores({
... "score": np.array([[1.0, 0.0], [0.3, 0.8]]),
... "matches": np.array([[5, 0], [1, 4]])
... })
>>> scores["score"].to_array()
array([[1. , 0. ],
[0.3, 0.8]])
>>> scores["matches"].to_array()
array([[5, 0],
[1, 4]])
>>> good = scores[(scores["score"] > 0.2) & (scores["matches"] >= 2)]
>>> good.to_array("score")
array([[1. , 0. ],
[0. , 0.8]])
"""
_FORMAT_NAME = "matchms.Scores"
_FORMAT_VERSION = 1
_METADATA_KEY = "__scores_metadata__"
[docs]
def __init__(self, data: dict[str, np.ndarray | coo_array]):
if not data:
raise ValueError("Scores requires at least one score field.")
self._data = dict(data)
self._score_fields = tuple(data.keys())
first_value = next(iter(self._data.values()))
self._is_sparse = isinstance(first_value, coo_array)
self._shape = first_value.shape
for field, value in self._data.items():
if isinstance(value, coo_array) != self._is_sparse:
raise ValueError("All score fields must be either dense or sparse.")
if value.shape != self._shape:
raise ValueError(
f"All score fields must have the same shape. "
f"Field {field!r} has shape {value.shape}, expected {self._shape}."
)
def __repr__(self) -> str:
kind = "sparse" if self.is_sparse else "dense"
return f"Scores(shape={self.shape}, score_fields={self.score_fields}, kind={kind})"
@property
def shape(self) -> tuple[int, int]:
return self._shape
@property
def score_fields(self) -> tuple[str, ...]:
return self._score_fields
@property
def is_sparse(self) -> bool:
return self._is_sparse
@property
def is_scalar(self) -> bool:
return len(self.score_fields) == 1
def to_array(self, field: str | None = None) -> np.ndarray:
field = self._resolve_field(field)
value = self._data[field]
if self.is_sparse:
return value.toarray()
return value.copy()
def to_coo(self, field: str | None = None) -> coo_array:
field = self._resolve_field(field)
value = self._data[field]
if self.is_sparse:
return value
row, col = np.nonzero(value)
return coo_array((value[row, col], (row, col)), shape=value.shape)
def filter(self, mask) -> ScoresType:
if isinstance(mask, ScoresMask):
return self._filter_with_scores_mask(mask)
mask = np.asarray(mask, dtype=bool)
if mask.shape != self.shape:
raise ValueError(f"Mask has shape {mask.shape}, expected {self.shape}.")
return self._filter_with_dense_mask(mask)
def __getitem__(self, key):
"""Access fields, apply masks, or slice score data."""
if isinstance(key, str):
return Scores({key: self._data[self._resolve_field(key)]})
if isinstance(key, ScoresMask):
return self.filter(key)
if isinstance(key, np.ndarray):
return self.filter(key)
if self.is_scalar:
field = self.score_fields[0]
return self.to_array(field)[key]
if isinstance(key, tuple):
return {field: self.to_array(field)[key] for field in self.score_fields}
sliced = {field: self.to_array(field)[key] for field in self.score_fields}
normalized = {}
for field, value in sliced.items():
arr = np.asarray(value)
if arr.ndim == 1:
normalized[field] = arr.reshape(1, -1)
else:
normalized[field] = arr
return Scores(normalized)
def __gt__(self, other):
"""Element-wise comparison for scalar Scores."""
return self._compare_scalar(other, np.greater, sparse_safe=True)
def __ge__(self, other):
"""Element-wise comparison for scalar Scores."""
return self._compare_scalar(other, np.greater_equal, sparse_safe=True)
def __lt__(self, other):
"""Element-wise comparison for scalar Scores."""
return self._compare_scalar(other, np.less, sparse_safe=False)
def __le__(self, other):
"""Element-wise comparison for scalar Scores."""
return self._compare_scalar(other, np.less_equal, sparse_safe=False)
def __eq__(self, other):
"""Element-wise comparison for scalar Scores."""
return self._compare_scalar(other, np.equal, sparse_safe=False)
def __ne__(self, other):
"""Element-wise comparison for scalar Scores."""
return self._compare_scalar(other, np.not_equal, sparse_safe=False)
def _filter_with_scores_mask(self, mask: ScoresMask) -> ScoresType:
if mask.shape != self.shape:
raise ValueError(f"Mask has shape {mask.shape}, expected {self.shape}.")
if self.is_sparse and mask.is_sparse:
return self._filter_sparse_with_sparse_mask(mask)
return self._filter_with_dense_mask(mask.to_dense())
def _filter_sparse_with_sparse_mask(self, mask: ScoresMask) -> ScoresType:
mask_coords = set(zip(mask.row.tolist(), mask.col.tolist(), strict=True))
filtered = {}
for field in self.score_fields:
coo = self.to_coo(field)
keep = np.array(
[(r, c) in mask_coords for r, c in zip(coo.row.tolist(), coo.col.tolist(), strict=True)],
dtype=bool,
)
filtered[field] = coo_array(
(coo.data[keep], (coo.row[keep], coo.col[keep])),
shape=self.shape,
)
return Scores(filtered)
def _filter_with_dense_mask(self, mask: np.ndarray) -> ScoresType:
filtered = {}
for field in self.score_fields:
arr = self.to_array(field)
arr = np.where(mask, arr, 0)
if self.is_sparse:
row, col = np.nonzero(arr)
filtered[field] = coo_array((arr[row, col], (row, col)), shape=self.shape)
else:
filtered[field] = arr
return Scores(filtered)
def _resolve_field(self, field: str | None) -> str:
if field is None:
if self.is_scalar:
return self.score_fields[0]
raise KeyError(f"Field name required. Available fields: {self.score_fields}.")
if field in self._data:
return field
if field == "score" and self.is_scalar:
return self.score_fields[0]
raise KeyError(f"Unknown field {field!r}. Available fields: {self.score_fields}.")
def _compare_scalar(self, other, op, sparse_safe: bool) -> ScoresMask:
"""Compare scalar Scores against a value and return a mask."""
if not self.is_scalar:
raise TypeError(
"Direct comparisons are only supported for scalar Scores. "
f"Available score fields: {self.score_fields}."
)
if not self.is_sparse:
return ScoresMask(shape=self.shape, dense_mask=op(self.to_array(), other))
if sparse_safe and other >= 0:
coo = self.to_coo()
keep = op(coo.data, other)
return ScoresMask(
shape=self.shape,
row=coo.row[keep],
col=coo.col[keep],
)
return ScoresMask(shape=self.shape, dense_mask=op(self.to_array(), other))
[docs]
def copy(self) -> ScoresType:
"""Return a copy of the Scores object.
Dense score fields are copied as independent NumPy arrays. Sparse score
fields are copied as independent SciPy COO arrays. The returned
``Scores`` object preserves the score fields, shape, and storage mode of
the original object.
"""
copied_data = {
field: value.copy()
for field, value in self._data.items()
}
return self.__class__(copied_data)
# File I/O methods for saving and loading Scores objects to/from .npz files
# ---------------------------------------------------------------------------------
[docs]
def save(self, path: str | Path, compressed: bool = True) -> None:
"""Save the Scores object to a single `.npz` file.
Parameters
----------
path
Output file path.
compressed
If True, use ``numpy.savez_compressed``. Default is True.
"""
path = Path(path)
metadata = {
"format": self._FORMAT_NAME,
"version": self._FORMAT_VERSION,
"is_sparse": self.is_sparse,
"score_fields": list(self.score_fields),
"shape": list(self.shape),
}
payload = {
self._METADATA_KEY: np.array(json.dumps(metadata)),
}
if self.is_sparse:
for field in self.score_fields:
coo = self.to_coo(field)
payload[f"{field}__row"] = coo.row
payload[f"{field}__col"] = coo.col
payload[f"{field}__data"] = coo.data
else:
for field in self.score_fields:
payload[field] = self._data[field]
saver = np.savez_compressed if compressed else np.savez
saver(path, **payload)
[docs]
@classmethod
def load(cls, path: str | Path) -> "Scores":
"""Load a Scores object from a `.npz` file.
Parameters
----------
path
Input file path.
Returns
-------
Scores
Reconstructed Scores object.
"""
path = Path(path)
with np.load(path, allow_pickle=False) as npz:
if cls._METADATA_KEY not in npz:
raise ValueError(
f"File {path} does not contain {cls._FORMAT_NAME} metadata."
)
metadata = json.loads(str(npz[cls._METADATA_KEY]))
cls._validate_metadata(metadata, path)
is_sparse = bool(metadata["is_sparse"])
score_fields = tuple(metadata["score_fields"])
shape = tuple(metadata["shape"])
data = {}
if is_sparse:
for field in score_fields:
row_key = f"{field}__row"
col_key = f"{field}__col"
data_key = f"{field}__data"
missing = [key for key in (row_key, col_key, data_key) if key not in npz]
if missing:
raise ValueError(
f"File {path} is missing sparse data for field {field!r}: {missing}"
)
row = npz[row_key]
col = npz[col_key]
values = npz[data_key]
data[field] = coo_array((values, (row, col)), shape=shape)
else:
for field in score_fields:
if field not in npz:
raise ValueError(
f"File {path} is missing dense data for field {field!r}."
)
data[field] = npz[field]
return cls(data)
@classmethod
def _validate_metadata(cls, metadata: dict, path: Path) -> None:
"""Validate loaded metadata."""
if metadata.get("format") != cls._FORMAT_NAME:
raise ValueError(
f"File {path} is not a {cls._FORMAT_NAME} file."
)
if metadata.get("version") != cls._FORMAT_VERSION:
raise ValueError(
f"Unsupported {cls._FORMAT_NAME} version {metadata.get('version')} in file {path}."
)
required_keys = {"format", "version", "is_sparse", "score_fields", "shape"}
missing = required_keys.difference(metadata)
if missing:
raise ValueError(
f"File {path} is missing metadata keys: {sorted(missing)}"
)