Source code for matchms.utils

import csv
import logging
import os
from functools import lru_cache, wraps
from typing import Callable, Iterable, List, Optional
from warnings import warn
from .typing import SpectrumType


# Module-level logger bound to the "matchms" logger name; used by the helpers
# below to emit warnings.
logger = logging.getLogger("matchms")


def get_first_common_element(first: Iterable[str], second: Iterable[str]) -> Optional[str]:
    """Find the first element of ``first`` that also occurs in ``second``.

    Args:
        first (Iterable[str]): Iterable whose order decides which common
            element is reported.
        second (Iterable[str]): Iterable to test membership against. One-shot
            iterators (e.g. generators) are supported.

    Returns:
        Optional[str]: The first item of ``first`` that is contained in
        ``second``, or None if there is no common element.
    """
    # ``item in <iterator>`` consumes the iterator, so a generator passed as
    # ``second`` would be exhausted by the very first membership test and every
    # later item would wrongly be reported as missing. Objects that implement
    # ``__contains__`` (list, set, dict, str, ...) are used as they are so their
    # own membership semantics stay untouched.
    candidates = second if hasattr(second, "__contains__") else tuple(second)
    return next((item for item in first if item in candidates), None)
def get_common_keys(first: List[str], second: List[str]) -> List[str]:
    """Collect the entries of ``first`` that can be found in ``second``.

    An entry counts as found when it appears in ``second`` either verbatim or
    after being lower-cased. Only the entries of ``first`` are lower-cased;
    ``second`` is compared as given.

    Args:
        first (List[str]): Strings to look up.
        second (List[str]): Strings to search in.

    Returns:
        List[str]: Matching entries of ``first``, in their original order and
        original spelling.
    """
    shared = []
    for candidate in first:
        if candidate in second:
            shared.append(candidate)
        elif candidate.lower() in second:
            shared.append(candidate)
    return shared
def filter_none(iterable: Iterable) -> Iterable:
    """Lazily drop every ``None`` entry from an iterable.

    Other falsy values (``0``, ``""``, ``False``, empty containers) are kept.

    Args:
        iterable (Iterable): Iterable to filter.

    Returns:
        Iterable: A ``filter`` iterator yielding the entries that are not None.
    """

    def _is_not_none(element) -> bool:
        return element is not None

    return filter(_is_not_none, iterable)
def load_known_key_conversions(key_conversions_file: Optional[str] = None) -> dict:
    """Load dictionary of known key conversions.

    The actual file reading is delegated to ``_load_key_conversions``, mapping
    the ``known_synonym`` column to the ``matchms_default`` column.

    Parameters
    ----------
    key_conversions_file : str, optional
        Path to the CSV file containing key conversions. If not provided, the
        file "known_key_conversions.csv" in the "data" directory next to this
        module is used.

    Returns
    -------
    dict
        Key conversions loaded from the specified file.

    Raises
    ------
    AssertionError
        If the specified key conversions file does not exist.
    """
    if key_conversions_file is None:
        key_conversions_file = os.path.join(os.path.dirname(__file__), "data", "known_key_conversions.csv")
    # Raised explicitly rather than via ``assert`` so the check is not stripped
    # when Python runs with -O; the exception type and message are unchanged.
    if not os.path.isfile(key_conversions_file):
        raise AssertionError(f"Could not find {key_conversions_file}")
    return _load_key_conversions(key_conversions_file, "known_synonym", "matchms_default")
def load_export_key_conversions(
    export_key_conversions_file: Optional[str] = None, export_style: Optional[str] = None
) -> dict:
    """Load dictionary of export key conversions.

    The actual file reading is delegated to ``_load_key_conversions``, mapping
    the ``matchms`` column to the column named by ``export_style``.

    Parameters
    ----------
    export_key_conversions_file : str, optional
        Path to the CSV file containing export key conversions. If None, the
        file "export_key_conversions.csv" in the "data" directory next to this
        module is used.
    export_style : str, optional
        Name of the CSV column whose values become the dictionary values.
        NOTE(review): the default ``None`` is forwarded unchanged as the column
        name -- presumably callers always pass a style; confirm.

    Returns
    -------
    dict
        Dictionary containing the export key conversions.

    Raises
    ------
    AssertionError
        If the specified export_key_conversions_file does not exist.
    """
    if export_key_conversions_file is None:
        export_key_conversions_file = os.path.join(os.path.dirname(__file__), "data", "export_key_conversions.csv")
    # Raised explicitly rather than via ``assert`` so the check is not stripped
    # when Python runs with -O; the exception type and message are unchanged.
    if not os.path.isfile(export_key_conversions_file):
        raise AssertionError(f"Could not find {export_key_conversions_file}")
    return _load_key_conversions(export_key_conversions_file, "matchms", export_style)
@lru_cache(maxsize=4)
def _load_key_conversions(file: str, source: str, target: str) -> dict:
    """Build a lookup table from two columns of a CSV file.

    Results are memoized per ``(file, source, target)`` combination, so
    repeated calls with the same arguments return the same dictionary object.

    Parameters
    ----------
    file : str
        Path to the CSV file. It is read as UTF-8; a leading byte order mark
        is skipped.
    source : str
        Header of the column providing the dictionary keys.
    target : str
        Header of the column providing the dictionary values.

    Returns
    -------
    dict
        Mapping from each ``source`` cell to the ``target`` cell of the same
        row. When a key occurs in several rows, the last row wins.
    """
    with open(file, newline="", encoding="utf-8-sig") as handle:
        return {record[source]: record[target] for record in csv.DictReader(handle)}
def fingerprint_export_warning(spectra: List[SpectrumType]):
    """Warn once if any spectrum carries a "fingerprint" entry.

    Parameters
    ----------
    spectra : List[SpectrumType]
        Spectrum objects to inspect; each is queried with ``.get("fingerprint")``.

    Notes
    -----
    A single warning is logged as soon as the first spectrum with a
    "fingerprint" value other than None is encountered; the remaining spectra
    are not inspected. Nothing is logged otherwise.
    """
    for spectrum in spectra:
        if spectrum.get("fingerprint") is None:
            continue
        logger.warning("fingerprint found but will not be written to file.")
        return
def filter_empty_spectra(spectra: List[SpectrumType]) -> List[SpectrumType]:
    """Return a new list holding every entry of ``spectra`` that is not None.

    Parameters
    ----------
    spectra
        List of spectra to filter.

    Returns
    -------
    List[SpectrumType]
        The entries that are not None, in their original order.
    """
    kept = []
    for spectrum in spectra:
        if spectrum is not None:
            kept.append(spectrum)
    return kept
def rename_deprecated_params(param_mapping: dict, version: Optional[str] = None) -> Callable:
    """Decorator for renaming old, deprecated keyword parameters.

    Keyword arguments passed under a deprecated name are forwarded to the
    decorated function under their new name, and a ``DeprecationWarning`` is
    emitted for each of them. Positional arguments are passed through
    unchanged.

    Usage example:

    .. code-block:: python

        @rename_deprecated_params({"spectrums": "spectra"}, version="0.1.0")
        def example_func(spectra, another_param):
            ...

        # Still works, but warns that "spectrums" is deprecated:
        example_func(spectrums=[...], another_param="some string")

    Parameters
    ----------
    param_mapping
        Dict of mapping from old to new parameter names e.g., {"spectrums": "spectra"}
    version
        Version in which the parameters are marked as deprecated. When given,
        it is appended to the warning message.

    Returns
    -------
    Callable
        Decorator that wraps a function with the renaming logic.
    """

    def decorator(func: Callable) -> Callable:
        # Keep the name, docstring and signature metadata of the wrapped function.
        @wraps(func)
        def wrapper(*args, **kwargs):
            for old_param, new_param in param_mapping.items():
                if old_param not in kwargs:
                    continue
                warning_msg = (
                    f"Parameter '{old_param}' is deprecated and will be removed in the future. "
                    f"Use '{new_param}' instead."
                )
                if version is not None:
                    warning_msg += f" -- Deprecated since version {version}."
                # stacklevel=2 attributes the warning to the caller of the decorated function.
                warn(warning_msg, DeprecationWarning, stacklevel=2)
                # A value given under the deprecated name replaces one given
                # under the new keyword, as before.
                kwargs[new_param] = kwargs.pop(old_param)
            # Positional arguments already sit in their correct slots; pairing
            # them with mapping entries by index would assign wrong values
            # whenever the renamed parameter is not the first parameter.
            return func(*args, **kwargs)

        return wrapper

    return decorator
def to_camel_case(snake_str: str) -> str:
    """Convert a snake_case string to camelCase.

    Used for conversion between snake_case and camelCase to match third party
    libraries (e.g., RDKit). The part before the first underscore is kept as
    is; every following part is passed through ``str.capitalize`` (first
    character upper-cased, the rest lower-cased) and appended.

    Parameters
    ----------
    snake_str
        The input string in snake_case

    Returns
    -------
    camelCased str
    """
    head, *tail = snake_str.split("_")
    capitalized_tail = [word.capitalize() for word in tail]
    return head + "".join(capitalized_tail)