"""Helper functions to build and handle spectral networks."""
from collections.abc import Sequence
import numpy as np
from matchms import Scores
[docs]
def get_top_hits(
    scores: Scores,
    top_n: int = 25,
    axis: int = 1,
    score_name: str | None = None,
    identifiers: Sequence | None = None,
    ignore_diagonal: bool = False,
) -> tuple[dict, dict]:
    """Collect the ``top_n`` highest scores (and their indices) per row or column.

    Parameters
    ----------
    scores
        Matchms Scores object containing similarity values.
    top_n
        Maximum number of hits to return for each row or column.
    axis
        Direction of the search:

        - ``axis=1``: one set of top hits per row
        - ``axis=0``: one set of top hits per column
    score_name
        Score field to rank by when ``scores`` holds several fields.
        If None:

        - scalar Scores: the only field is used
        - multi-field Scores: ``"score"`` is used if it exists, otherwise a
          ``KeyError`` is raised
    identifiers
        Optional keys for the result dictionaries, one per entry of the
        selected axis:

        - for ``axis=1`` the length must equal ``scores.shape[0]``
        - for ``axis=0`` the length must equal ``scores.shape[1]``

        If None, integer positions ``0 .. n-1`` are used as keys.
    ignore_diagonal
        If True, the self-hit on the diagonal is left out of every result.
        Only accepted for square score matrices.

    Returns
    -------
    similars_idx, similars_scores
        Two dictionaries sharing the same keys (identifiers or integer
        positions). ``similars_idx`` maps each key to a NumPy array of hit
        indices, ``similars_scores`` maps it to the matching hit scores.

    Raises
    ------
    ValueError
        If ``axis`` is not 0 or 1, if ``identifiers`` has the wrong length, or
        if ``ignore_diagonal`` is True for a non-square matrix.
    KeyError
        If no score field can be selected (see ``score_name``).
    """
    if axis not in (0, 1):
        raise ValueError("axis must be 0 or 1.")

    matrix = _get_score_array(scores, score_name)
    n_rows, n_cols = matrix.shape
    by_row = axis == 1
    expected_len = n_rows if by_row else n_cols

    if identifiers is None:
        # Fall back to plain positions along the selected axis.
        identifiers = list(range(expected_len))
    elif len(identifiers) != expected_len:
        raise ValueError(
            f"identifiers must have length {expected_len} for axis={axis}, "
            f"but got length {len(identifiers)}."
        )

    if ignore_diagonal and n_rows != n_cols:
        raise ValueError("ignore_diagonal=True requires a square score matrix.")

    collect_hits = _get_top_hits_along_rows if by_row else _get_top_hits_along_columns
    return collect_hits(matrix, identifiers, top_n, ignore_diagonal)
[docs]
def get_top_hits_by_row(
    scores: Scores,
    top_n: int = 25,
    score_name: str | None = None,
    identifiers: Sequence | None = None,
    ignore_diagonal: bool = False,
) -> tuple[dict, dict]:
    """Get the ``top_n`` best hits for each row.

    Convenience wrapper that calls :func:`get_top_hits` with ``axis=1`` and
    forwards every other argument unchanged.

    Parameters
    ----------
    scores
        Matchms Scores object containing similarity values.
    top_n
        Maximum number of hits to return per row.
    score_name
        Name of the score field to use; passed through to ``get_top_hits``.
    identifiers
        Optional dictionary keys, one per row; passed through to
        ``get_top_hits``.
    ignore_diagonal
        If True, diagonal self-hits are excluded.

    Returns
    -------
    similars_idx, similars_scores
        The two dictionaries returned by ``get_top_hits``.
    """
    return get_top_hits(
        scores,
        top_n,
        axis=1,
        score_name=score_name,
        identifiers=identifiers,
        ignore_diagonal=ignore_diagonal,
    )
[docs]
def get_top_hits_by_column(
    scores: Scores,
    top_n: int = 25,
    score_name: str | None = None,
    identifiers: Sequence | None = None,
    ignore_diagonal: bool = False,
) -> tuple[dict, dict]:
    """Get the ``top_n`` best hits for each column.

    Convenience wrapper that calls :func:`get_top_hits` with ``axis=0`` and
    forwards every other argument unchanged.

    Parameters
    ----------
    scores
        Matchms Scores object containing similarity values.
    top_n
        Maximum number of hits to return per column.
    score_name
        Name of the score field to use; passed through to ``get_top_hits``.
    identifiers
        Optional dictionary keys, one per column; passed through to
        ``get_top_hits``.
    ignore_diagonal
        If True, diagonal self-hits are excluded.

    Returns
    -------
    similars_idx, similars_scores
        The two dictionaries returned by ``get_top_hits``.
    """
    return get_top_hits(
        scores,
        top_n,
        axis=0,
        score_name=score_name,
        identifiers=identifiers,
        ignore_diagonal=ignore_diagonal,
    )
def _get_score_array(scores: Scores, score_name: str | None) -> np.ndarray:
    """Return the selected score field as a dense NumPy array.

    Parameters
    ----------
    scores
        Matchms Scores object to read from.
    score_name
        Field to extract. If None, a scalar ``scores`` is converted directly
        and a multi-field ``scores`` falls back to its ``"score"`` field.

    Returns
    -------
    numpy.ndarray
        Result of ``to_array()`` on the chosen field.

    Raises
    ------
    KeyError
        If ``score_name`` is None, ``scores`` is not scalar and it has no
        ``"score"`` field.
    """
    # An explicit field name always wins.
    if score_name is not None:
        return scores[score_name].to_array()

    if scores.is_scalar:
        return scores.to_array()

    if "score" not in scores.score_fields:
        raise KeyError(
            "score_name must be provided for multi-field Scores when no 'score' field exists. "
            f"Available fields: {scores.score_fields}."
        )
    return scores["score"].to_array()
def _sorted_top_indices(
values: np.ndarray,
top_n: int,
exclude_index: int | None = None,
) -> np.ndarray:
"""Return top indices sorted by descending score, ties by ascending index."""
if top_n <= 0 or len(values) == 0:
return np.array([], dtype=int)
extra = 1 if exclude_index is not None else 0
n_select = min(top_n + extra, len(values))
candidate_idx = np.argpartition(values, -n_select)[-n_select:]
if exclude_index is not None:
candidate_idx = candidate_idx[candidate_idx != exclude_index]
# Sort by descending score, then ascending index
candidate_scores = values[candidate_idx]
order = np.lexsort((candidate_idx, -candidate_scores))
return candidate_idx[order][:top_n]
def _get_top_hits_along_rows(
    matrix: np.ndarray,
    identifiers: Sequence,
    top_n: int,
    ignore_diagonal: bool,
) -> tuple[dict, dict]:
    """Get top hits for each row.

    Parameters
    ----------
    matrix
        Two-dimensional score array.
    identifiers
        Dictionary keys, indexed by row position.
    top_n
        Maximum number of hits to keep per row.
    ignore_diagonal
        If True, column ``i`` is excluded from the hits of row ``i``.

    Returns
    -------
    similars_idx, similars_scores
        Dictionaries mapping each row key to the selected column indices and
        to the scores found at those columns.
    """
    hit_indices = {}
    hit_scores = {}
    for row_pos, row in enumerate(matrix):
        skip = row_pos if ignore_diagonal else None
        best = _sorted_top_indices(row, top_n=top_n, exclude_index=skip)
        key = identifiers[row_pos]
        hit_indices[key] = best
        hit_scores[key] = row[best]
    return hit_indices, hit_scores
def _get_top_hits_along_columns(
    matrix: np.ndarray,
    identifiers: Sequence,
    top_n: int,
    ignore_diagonal: bool,
) -> tuple[dict, dict]:
    """Get top hits for each column.

    Parameters
    ----------
    matrix
        Two-dimensional score array.
    identifiers
        Dictionary keys, indexed by column position.
    top_n
        Maximum number of hits to keep per column.
    ignore_diagonal
        If True, row ``j`` is excluded from the hits of column ``j``.

    Returns
    -------
    similars_idx, similars_scores
        Dictionaries mapping each column key to the selected row indices and
        to the scores found at those rows.
    """
    hit_indices = {}
    hit_scores = {}
    # Iterating the transpose yields one column of `matrix` per step.
    for col_pos, column in enumerate(matrix.T):
        skip = col_pos if ignore_diagonal else None
        best = _sorted_top_indices(column, top_n=top_n, exclude_index=skip)
        key = identifiers[col_pos]
        hit_indices[key] = best
        hit_scores[key] = column[best]
    return hit_indices, hit_scores