Source code for matchms.Pipeline

import logging
import operator
import os
from collections import OrderedDict
from collections.abc import Callable, Iterable, Sequence
from datetime import datetime
import numpy as np
import matchms.similarity as mssimilarity
from matchms.filtering.filter_order import ALL_FILTERS
from matchms.filtering.SpectrumProcessor import (
    FunctionWithParametersType,
    SpectrumProcessor,
)
from matchms.importing.load_spectra import load_list_of_spectrum_files
from matchms.logging_functions import (
    add_logging_to_file,
    reset_matchms_logger,
    set_matchms_logger_level,
)
from matchms.Scores import Scores, ScoresMask
from matchms.similarity.BaseSimilarity import BaseSimilarity
from matchms.typing import SpectrumType
from matchms.yaml_file_functions import load_workflow_from_yaml_file, ordered_dump


logger = logging.getLogger("matchms")

_MASKING_FUNCTIONS = {"mask"}

_SCORE_FUNCTIONS = {}
for key, value in mssimilarity.__dict__.items():
    if isinstance(value, type) and issubclass(value, BaseSimilarity) and value is not BaseSimilarity:
        _SCORE_FUNCTIONS[key.lower()] = value


[docs] def create_workflow( yaml_file_name: str | None = None, spectra_1_filters: Iterable[str | Callable | FunctionWithParametersType] = (), spectra_2_filters: Iterable[str | Callable | FunctionWithParametersType] = (), score_computations: Iterable[str | list[str | dict]] = (), ) -> OrderedDict: """Create a workflow specification for Pipeline.""" workflow = OrderedDict() processor_1 = SpectrumProcessor(spectra_1_filters) processor_2 = SpectrumProcessor(spectra_2_filters) workflow["spectra_1_filters"] = processor_1.processing_steps workflow["spectra_2_filters"] = processor_2.processing_steps workflow["score_computations"] = list(score_computations) if yaml_file_name is not None: if os.path.exists(yaml_file_name): raise FileExistsError( "This yaml file already exists. " "Use load_workflow_from_yaml_file(...) to load an existing workflow." ) with open(yaml_file_name, "w", encoding="utf-8") as file: file.write("# Matchms pipeline config file\n") file.write("# Change and adapt fields where necessary\n") file.write("# " + 20 * "=" + "\n") ordered_dump(workflow, file) return workflow
[docs] class Pipeline: """Central pipeline class. The pipeline applies filters to one or two collections of spectra and then executes a sequence of similarity computations and mask steps. Notes ----- - If only ``spectra_1`` is provided during :meth:`run`, the pipeline assumes a symmetric all-vs-all computation and sets ``is_symmetric=True``. - If ``spectra_2`` is also provided, the pipeline computes ``spectra_1`` vs ``spectra_2`` and sets ``is_symmetric=False``. """
[docs] def __init__( self, workflow: OrderedDict, progress_bar: bool = True, logging_level: str = "WARNING", logging_file: str | None = None, ): self._spectra_1: list[SpectrumType] = [] self._spectra_2: list[SpectrumType] | None = None self.scores: Scores | None = None self.mask: ScoresMask | None = None self.progress_bar = progress_bar self.logging_level = logging_level self.logging_file = logging_file self.__workflow = workflow self.check_workflow() self.processing_spectra_1: SpectrumProcessor | None = None self.processing_spectra_2: SpectrumProcessor | None = None self._initialize_spectrum_processor_1() self._initialize_spectrum_processor_2()
@classmethod def from_yaml( cls, yaml_file_name: str, progress_bar: bool = True, logging_level: str = "WARNING", logging_file: str | None = None, ) -> "Pipeline": workflow = load_workflow_from_yaml_file(yaml_file_name) return cls( workflow=workflow, progress_bar=progress_bar, logging_level=logging_level, logging_file=logging_file, ) def _initialize_spectrum_processor_1(self) -> None: self.write_to_logfile("--- Processing pipeline spectra_1: ---") self.processing_spectra_1 = SpectrumProcessor(self.__workflow["spectra_1_filters"]) self.write_to_logfile(str(self.processing_spectra_1)) if self.processing_spectra_1.processing_steps != self.__workflow["spectra_1_filters"]: logger.warning("The order of spectra_1 filters has been changed compared to the yaml file.") def _initialize_spectrum_processor_2(self) -> None: self.write_to_logfile("--- Processing pipeline spectra_2: ---") self.processing_spectra_2 = SpectrumProcessor(self.__workflow["spectra_2_filters"]) self.write_to_logfile(str(self.processing_spectra_2)) if self.processing_spectra_2.processing_steps != self.__workflow["spectra_2_filters"]: logger.warning("The order of spectra_2 filters has been changed compared to the yaml file.") def check_workflow(self) -> None: if not isinstance(self.__workflow, OrderedDict): raise TypeError( "Workflow is expected to be an OrderedDict, " f"but got type {type(self.__workflow)}." ) expected_keys = {"spectra_1_filters", "spectra_2_filters", "score_computations"} if set(self.__workflow.keys()) != expected_keys: raise ValueError( f"Workflow must contain exactly keys {expected_keys}, " f"but got {set(self.__workflow.keys())}." ) check_score_computation(self.__workflow["score_computations"])
[docs] def run( self, spectra_1, spectra_2=None, cleaned_spectra_1_file=None, cleaned_spectra_2_file=None, create_report: bool = True, ): """Execute the pipeline workflow.""" if cleaned_spectra_1_file is not None and os.path.exists(cleaned_spectra_1_file): raise FileExistsError("The specified cleaned spectra_1 file already exists.") if cleaned_spectra_2_file is not None and os.path.exists(cleaned_spectra_2_file): raise FileExistsError("The specified cleaned spectra_2 file already exists.") self.set_logging() self.write_to_logfile("--- Start running matchms pipeline. ---") self.write_to_logfile(f"Start time: {datetime.now()}") self.import_spectra(spectra_1, spectra_2) self.write_to_logfile("--- Processing spectra ---") self.write_to_logfile(f"Time: {datetime.now()}") report = None if self.processing_spectra_1 is not None: self._spectra_1, report = self.processing_spectra_1.process_spectra( self._spectra_1, progress_bar=self.progress_bar, cleaned_spectra_file=cleaned_spectra_1_file, create_report=create_report, ) self.write_to_logfile(str(report)) if cleaned_spectra_1_file is not None: self.write_to_logfile(f"--- Spectra_1 written to {cleaned_spectra_1_file} ---") if self.processing_spectra_2 is not None and self._spectra_2 is not None: self._spectra_2, report = self.processing_spectra_2.process_spectra( self._spectra_2, progress_bar=self.progress_bar, cleaned_spectra_file=cleaned_spectra_2_file, create_report=create_report, ) self.write_to_logfile(str(report)) if cleaned_spectra_2_file is not None: self.write_to_logfile(f"--- Spectra_2 written to {cleaned_spectra_2_file} ---") self.scores = None self.mask = None self.write_to_logfile("--- Computing scores ---") for computation in self.score_computations: self.write_to_logfile(f"Time: {datetime.now()}") if not isinstance(computation, list): computation = [computation] step_name = computation[0] if isinstance(step_name, str) and step_name.lower() in _MASKING_FUNCTIONS: self.write_to_logfile(f"-- Score masking: {computation} --") self._apply_score_masking(computation) else: self.write_to_logfile(f"-- Score computation: {computation} --") self._apply_similarity_measure(computation) self.write_to_logfile(f"--- Pipeline run finished ({datetime.now()}) ---") return report
def _apply_score_masking(self, computation) -> None: if self.scores is None: raise ValueError("No scores have been computed yet, so masking cannot be applied.") params = computation[1] if len(computation) > 1 else {} operation_name = params.get("operation") value = params.get("value") field = params.get("field") if operation_name is None or "value" not in params: raise ValueError( "Mask computation requires parameters {'operation': ..., 'value': ...}." ) self.mask = _build_mask_from_scores( scores=self.scores, operation_name=operation_name, value=value, field=field, ) def _apply_similarity_measure(self, computation) -> None: similarity_measure = _instantiate_similarity(computation) if self.mask is None: self.scores = similarity_measure.matrix( spectra_1=self._spectra_1, spectra_2=self._spectra_2, progress_bar=self.progress_bar, ) return idx_row, idx_col = _mask_to_index_arrays(self.mask) self.scores = similarity_measure.sparse_matrix( spectra_1=self._spectra_1, spectra_2=self._spectra_2, idx_row=idx_row, idx_col=idx_col, progress_bar=self.progress_bar, ) def set_logging(self) -> None: reset_matchms_logger() set_matchms_logger_level(self.logging_level) if self.logging_file is not None: add_logging_to_file( self.logging_file, loglevel=self.logging_level, remove_stream_handlers=True, ) else: logger.warning("No logging file was defined. Logging messages will not be written to file.") def write_to_logfile(self, line: str) -> None: if self.logging_file is not None: with open(self.logging_file, "a", encoding="utf-8") as file: file.write(line + "\n")
[docs] def import_spectra( self, spectra_1: list[str] | str, spectra_2: list[str] | str | None = None, ) -> None: """Import one or two spectra collections from file(s).""" self.write_to_logfile("--- Importing data ---") self._spectra_1 = load_list_of_spectrum_files(spectra_1) self.write_to_logfile(f"Loaded spectra_1 from {spectra_1}") if spectra_2 is None: self.is_symmetric = True self._spectra_2 = None self.write_to_logfile("No spectra_2 given, using symmetric computation (is_symmetric = True)") else: self.is_symmetric = False self._spectra_2 = load_list_of_spectrum_files(spectra_2) self.write_to_logfile(f"Loaded spectra_2 from {spectra_2}")
def save_as_yaml(self, yaml_file_name: str) -> None: if os.path.exists(yaml_file_name): raise FileExistsError("The specified yaml file already exists.") workflow = OrderedDict() workflow["spectra_1_filters"] = self.processing_spectra_1.processing_steps workflow["spectra_2_filters"] = self.processing_spectra_2.processing_steps workflow["score_computations"] = list(self.score_computations) with open(yaml_file_name, "w", encoding="utf-8") as file: file.write("# Matchms pipeline config file\n") file.write("# Change and adapt fields where necessary\n") file.write("# " + 20 * "=" + "\n") ordered_dump(workflow, file) @property def score_computations(self) -> Sequence[str | list[dict]]: return self.__workflow["score_computations"] @score_computations.setter def score_computations(self, computations): check_score_computation(computations) self.__workflow["score_computations"] = computations @property def spectra_1_filters(self): return self.__workflow["spectra_1_filters"] @spectra_1_filters.setter def spectra_1_filters(self, filters): self.__workflow["spectra_1_filters"] = filters self._initialize_spectrum_processor_1() @property def spectra_2_filters(self): return self.__workflow["spectra_2_filters"] @spectra_2_filters.setter def spectra_2_filters(self, filters): self.__workflow["spectra_2_filters"] = filters self._initialize_spectrum_processor_2() @property def spectra_1(self) -> list[SpectrumType]: return self._spectra_1 @property def spectra_2(self) -> list[SpectrumType] | None: return self._spectra_2
def _instantiate_similarity(computation) -> BaseSimilarity: """Instantiate a similarity measure from a score computation specification.""" name_or_callable = computation[0] params = computation[1] if len(computation) > 1 else {} if isinstance(name_or_callable, str): similarity_cls = _SCORE_FUNCTIONS.get(name_or_callable.lower()) if similarity_cls is None: raise ValueError(f"Unknown score computation: {name_or_callable!r}.") return similarity_cls(**params) if callable(name_or_callable): return name_or_callable(**params) raise TypeError(f"Unknown similarity specification: {name_or_callable!r}.") def _mask_to_index_arrays(mask: ScoresMask) -> tuple[np.ndarray, np.ndarray]: """Convert a ScoresMask to index arrays for rows and columns.""" if mask.is_sparse: return mask.row, mask.col row, col = np.nonzero(mask.dense_mask) return row.astype(np.int_), col.astype(np.int_) def _build_mask_from_scores( scores: Scores, operation_name: str, value, field: str | None = None, ) -> ScoresMask: operations = { ">": operator.gt, ">=": operator.ge, "<": operator.lt, "<=": operator.le, "==": operator.eq, "!=": operator.ne, } if operation_name not in operations: raise ValueError( f"Unknown mask operation {operation_name!r}. Supported operations are: {tuple(operations)}." ) target = scores if field is None else scores[field] try: mask = operations[operation_name](target, value) except TypeError as exc: if field is None and not scores.is_scalar: raise TypeError( "Masking a multi-field Scores object requires specifying a field, " "for example ['mask', {'field': 'score', 'operation': '>=', 'value': 0.3}]." ) from exc raise if not isinstance(mask, ScoresMask): raise TypeError("Score comparison did not produce a ScoresMask as expected.") return mask
[docs] def get_unused_filters(yaml_file): """Checks which filters from matchms are not used in the yaml file.""" workflow = load_workflow_from_yaml_file(yaml_file) processor = SpectrumProcessor(workflow["spectra_1_filters"]) filters_used = [filter_function.__name__ for filter_function in processor.filters] for filter_function in ALL_FILTERS: if filter_function.__name__ not in filters_used: print(filter_function.__name__)
[docs] def check_score_computation(score_computations: Sequence[str | list[dict]]) -> None: """Check if score computations are valid.""" if score_computations is None: return n_steps = len(score_computations) for i, computation in enumerate(score_computations): if not isinstance(computation, list): computation = [computation] if len(computation) == 0: raise ValueError("Empty score computation step is not allowed.") step = computation[0] params = computation[1] if len(computation) > 1 else {} if isinstance(step, str) and step.lower() in _MASKING_FUNCTIONS: if "operation" not in params or "value" not in params: raise ValueError( "Mask computation requires parameters {'operation': ..., 'value': ...}." ) if i == 0 or i == n_steps - 1: raise ValueError("Masking at the start or end of score computations is not allowed.") continue if isinstance(step, str): if step.lower() not in _SCORE_FUNCTIONS: raise ValueError(f"Unknown score computation: {step!r}.") continue if callable(step): continue raise ValueError(f"Unknown score computation: {step!r}.")