import logging
import os
from collections import OrderedDict
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
import matchms.similarity as mssimilarity
from matchms import calculate_scores
from matchms.filtering.filter_order import ALL_FILTERS
from matchms.filtering.SpectrumProcessor import SpectrumProcessor
from matchms.importing.load_spectra import load_list_of_spectrum_files
from matchms.logging_functions import (add_logging_to_file,
reset_matchms_logger,
set_matchms_logger_level)
from matchms.typing import SpectrumType
from matchms.yaml_file_functions import (load_workflow_from_yaml_file,
ordered_dump)
# Names of score computation steps that mask already computed scores
# instead of computing a new score.
_masking_functions = ["filter_by_range"]

# Lookup table: lower-cased attribute name -> every callable found in the
# namespace of the matchms.similarity module.
_score_functions = {
    attribute_name.lower(): candidate
    for attribute_name, candidate in vars(mssimilarity).items()
    if callable(candidate)
}

# The shared "matchms" logger, used for warnings emitted by this module.
logger = logging.getLogger("matchms")
def create_workflow(yaml_file_name: Optional[str] = None,
                    query_filters: Iterable[Union[str, Tuple[str, Dict[str, Any]]]] = (),
                    reference_filters: Iterable[Union[str, Tuple[str, Dict[str, Any]]]] = (),
                    score_computations: Iterable[Union[str, List[dict]]] = (),
                    ) -> OrderedDict:
    """Create a workflow that specifies the filters and scores to be run by Pipeline.

    Example code can be found in the docstring of Pipeline.

    :param yaml_file_name:
        A yaml file containing the workflow settings will be saved if a file name is specified.
        If None no yaml file will be saved.
    :param query_filters:
        Filters that should be applied to the query spectra. Each entry is either a
        filter name or a tuple of filter name and a dictionary of parameters.
    :param reference_filters:
        Filters that should be applied to the reference spectra (same format as
        `query_filters`).
    :param score_computations:
        Score computations that should be performed.
    :return:
        An OrderedDict with the keys "query_filters", "reference_filters" and
        "score_computations". The two filter entries hold the `processing_steps`
        of a SpectrumProcessor built from the given filters.
    :raises AssertionError:
        If `yaml_file_name` is given and a file with that name already exists.
    """
    workflow = OrderedDict()
    workflow["query_filters"] = SpectrumProcessor(query_filters).processing_steps
    workflow["reference_filters"] = SpectrumProcessor(reference_filters).processing_steps
    workflow["score_computations"] = score_computations

    if yaml_file_name is None:
        return workflow

    # Raised explicitly (instead of using an `assert` statement) so that an existing
    # file is never silently overwritten when Python runs with -O, which strips
    # assert statements. The exception type is unchanged for existing callers.
    if os.path.exists(yaml_file_name):
        raise AssertionError(
            "This yaml file name already exists. "
            "To use the settings in the yaml file, please use the load_workflow_from_yaml_file function "
            "in yaml_file_functions.py or check the tutorial.")

    with open(yaml_file_name, 'w', encoding="utf-8") as file:
        # Short comment header, followed by the workflow itself.
        file.write("# Matchms pipeline config file \n")
        file.write("# Change and adapt fields where necessary \n")
        file.write("# " + 20 * "=" + " \n")
        ordered_dump(workflow, file)
    return workflow
class Pipeline:
    """Central pipeline class.

    The matchms Pipeline class is meant to make running extensive analysis pipelines
    fast and easy. It can be used in two different ways. First, a pipeline can be defined
    using a config file (a yaml file, best to start from the template provided to define
    your own pipeline).
    Once a config file is defined, the pipeline can be executed with the following code:

    .. code-block:: python

        from matchms.Pipeline import Pipeline, load_workflow_from_yaml_file

        workflow = load_workflow_from_yaml_file("my_config_file.yaml")
        pipeline = Pipeline(workflow)

        # Optional steps
        pipeline.logging_file = "my_pipeline.log"
        pipeline.logging_level = "ERROR"

        pipeline.run("my_spectrums.mgf")

    The second way to define a pipeline is via a Python script. The following code is an
    example of how this works:

    .. code-block:: python

        from matchms.Pipeline import Pipeline, create_workflow

        workflow = create_workflow(
            yaml_file_name="my_config_file.yaml",  # The workflow will be stored in a yaml file.
            query_filters=[
                "add_parent_mass",
                "normalize_intensities",
                ("select_by_relative_intensity", {"intensity_from": 0.0, "intensity_to": 1.0}),
                ("select_by_mz", {"mz_from": 0, "mz_to": 1000}),
                ("require_minimum_number_of_peaks", {"n_required": 5}),
            ],
            reference_filters=["add_fingerprint"],
            score_computations=[["precursormzmatch", {"tolerance": 120.0}],
                                ["cosinegreedy", {"tolerance": 1.0}],
                                ["filter_by_range", {"name": "CosineGreedy_score", "low": 0.3}],
                                ["modifiedcosine", {"tolerance": 1.0}],
                                ["filter_by_range", {"name": "ModifiedCosine_score", "low": 0.3}]],
        )
        pipeline = Pipeline(workflow)
        pipeline.logging_file = "my_pipeline.log"
        pipeline.logging_level = "WARNING"
        pipeline.run("my_query_spectra.mgf", "my_reference_spectra.mgf")

    To combine this with custom made scores or available matchms-compatible scores
    such as `Spec2Vec` or `MS2DeepScore`, it is also possible to pass objects instead of
    names to create_workflow

    .. code-block:: python

        from spec2vec import Spec2Vec

        workflow = create_workflow(score_computations=[["precursormzmatch", {"tolerance": 120.0}],
                                                       [Spec2Vec, {"model": "my_spec2vec_model.model"}],
                                                       ["filter_by_range", {"name": "Spec2Vec", "low": 0.3}]])
    """

    def __init__(self, workflow: OrderedDict,
                 progress_bar=True,
                 logging_level: str = "WARNING",
                 logging_file: Optional[str] = None):
        """
        Parameters
        ----------
        workflow:
            Contains an orderedDict containing the workflow settings. Can be created using create_workflow.
        progress_bar:
            Default is True. Set to False if no progress bar should be displayed.
        logging_level:
            Level passed to the matchms logger (and the log file handler) when the
            pipeline is run. Default is "WARNING".
        logging_file:
            Name of the file to which pipeline messages are appended. If None (default),
            nothing is written to file.
        """
        self._spectrums_queries = None
        self._spectrums_references = None
        self.is_symmetric = False
        self.scores = None
        self.logging_level = logging_level
        self.logging_file = logging_file
        self.progress_bar = progress_bar
        self.__workflow = workflow
        self.check_workflow()

        self._initialize_spectrum_processor_queries()
        if not self.is_symmetric:
            self._initialize_spectrum_processor_references()

    def _initialize_spectrum_processor_queries(self):
        """Initialize spectrum processing workflow for the query spectra."""
        self.write_to_logfile("--- Processing pipeline query spectra: ---")
        self.processing_queries = SpectrumProcessor(self.__workflow["query_filters"])
        self.write_to_logfile(str(self.processing_queries))
        # Warn if the processor ended up with different steps than the workflow lists.
        if self.processing_queries.processing_steps != self.__workflow["query_filters"]:
            logger.warning("The order of the filters has been changed compared to the Yaml file.")

    def _initialize_spectrum_processor_references(self):
        """Initialize spectrum processing workflow for the reference spectra."""
        self.write_to_logfile("--- Processing pipeline reference spectra: ---")
        self.processing_references = SpectrumProcessor(self.__workflow["reference_filters"])
        self.write_to_logfile(str(self.processing_references))
        # Compare the *reference* processor with the *reference* filters of the workflow
        # (this used to check the query processor a second time).
        if self.processing_references.processing_steps != self.__workflow["reference_filters"]:
            logger.warning("The order of the filters has been changed compared to the Yaml file.")

    def check_workflow(self):
        """Validate the workflow this Pipeline was created with.

        Raises
        ------
        AssertionError
            If the workflow is not an OrderedDict or does not have exactly the keys
            "query_filters", "reference_filters" and "score_computations".
        ValueError
            Raised by check_score_computation for an unknown score computation.
        """
        # Raised explicitly rather than via `assert` statements, so the validation
        # is still performed when Python runs with -O.
        if not isinstance(self.__workflow, OrderedDict):
            raise AssertionError(
                f"Workflow is expectd to be a OrderedDict, instead it was of type {type(self.__workflow)}")
        expected_keys = {"query_filters", "reference_filters", "score_computations"}
        if set(self.__workflow.keys()) != expected_keys:
            raise AssertionError(
                f"Workflow keys should be {sorted(expected_keys)}, but were {list(self.__workflow.keys())}")
        check_score_computation(score_computations=self.score_computations)

    def run(self, query_files, reference_files=None, cleaned_query_file=None, cleaned_reference_file=None):
        """Execute the defined Pipeline workflow.

        This method will execute all steps of the workflow.
        1) Initializing the log file and importing the spectrums
        2) Spectrum processing (using matchms filters)
        3) Score Computations

        Parameters
        ----------
        query_files
            List of files, or single filename, containing the query spectra.
        reference_files
            List of files, or single filename, containing the reference spectra.
            If None (default) the query spectra are also used as reference spectra.
        cleaned_query_file
            Passed as `cleaned_spectra_file` to the processing of the query spectra.
            Must not exist yet.
        cleaned_reference_file
            Passed as `cleaned_spectra_file` to the processing of the reference spectra.
            Must not exist yet.

        Raises
        ------
        FileExistsError
            If `cleaned_query_file` or `cleaned_reference_file` already exists.
        """
        # Fail before any work is done if an output file would be overwritten.
        if cleaned_reference_file is not None and os.path.exists(cleaned_reference_file):
            raise FileExistsError("The specified save references file already exists")
        if cleaned_query_file is not None and os.path.exists(cleaned_query_file):
            raise FileExistsError("The specified save queries file already exists")

        self.set_logging()
        self.write_to_logfile("--- Start running matchms pipeline. ---")
        self.write_to_logfile(f"Start time: {datetime.now()}")
        self.import_spectrums(query_files, reference_files)

        # Processing
        self.write_to_logfile("--- Processing spectra ---")
        self.write_to_logfile(f"Time: {datetime.now()}")

        # Process query spectra
        spectrums, report = self.processing_queries.process_spectrums(
            self._spectrums_queries,
            progress_bar=self.progress_bar,
            cleaned_spectra_file=cleaned_query_file)
        self._spectrums_queries = spectrums
        self.write_to_logfile(str(report))

        # Process reference spectra (if necessary)
        if not self.is_symmetric:
            self._spectrums_references, report = self.processing_references.process_spectrums(
                self._spectrums_references,
                progress_bar=self.progress_bar,
                cleaned_spectra_file=cleaned_reference_file)
            self.write_to_logfile(str(report))
        else:
            # Symmetric case: the processed queries double as references.
            self._spectrums_references = self._spectrums_queries

        # Score computation and masking
        self.write_to_logfile("--- Computing scores ---")
        for i, computation in enumerate(self.score_computations):
            self.write_to_logfile(f"Time: {datetime.now()}")
            if not isinstance(computation, list):
                computation = [computation]
            if isinstance(computation[0], str) and computation[0] in _masking_functions:
                self.write_to_logfile(f"-- Score masking: {computation} --")
                self._apply_score_masking(computation)
            else:
                self.write_to_logfile(f"-- Score computation: {computation} --")
                self._apply_similarity_measure(computation, i)
        self.write_to_logfile(f"--- Pipeline run finised ({datetime.now()}) ---")

    def _apply_score_masking(self, computation):
        """Apply filter to remove scores which are out of the set range.

        `computation` is a list whose optional second element is a dictionary of
        keyword arguments for `self.scores.filter_by_range`. If no "name" is given,
        the last entry of `self.scores.score_names` is used.

        Raises
        ------
        ValueError
            If no scores have been computed yet.
        """
        if self.scores is None:
            raise ValueError("Scores cannot be masked before a score has been computed.")
        # Copy, so the workflow definition itself is never modified.
        filter_kwargs = dict(computation[1]) if len(computation) > 1 else {}
        if "name" not in filter_kwargs:
            filter_kwargs["name"] = self.scores.score_names[-1]
        self.scores.filter_by_range(**filter_kwargs)

    @staticmethod
    def _get_similarity_measure(computation):
        """Instantiate the similarity measure described by `computation`.

        The first element is either a key of `_score_functions` or a callable; the
        optional second element is a dictionary of keyword arguments for it.

        Raises
        ------
        TypeError
            If the first element is neither a string nor a callable.
        """
        identifier = computation[0]
        parameters = computation[1] if len(computation) > 1 else {}
        if isinstance(identifier, str):
            return _score_functions[identifier](**parameters)
        if callable(identifier):
            return identifier(**parameters)
        raise TypeError("Unknown similarity measure.")

    def _apply_similarity_measure(self, computation, i):
        """Run score computations for all listed methods and on all loaded and processed spectra.

        `i` is the position of `computation` in the workflow. The first computation
        (i == 0) creates `self.scores`; later ones are only computed for the entries
        already present in `self.scores` and are added to it.
        """
        similarity_measure = self._get_similarity_measure(computation)
        # If this is the first score computation:
        if i == 0:
            self.scores = calculate_scores(self._spectrums_references,
                                           self._spectrums_queries,
                                           similarity_measure,
                                           array_type="sparse",
                                           is_symmetric=self.is_symmetric)
        else:
            new_scores = similarity_measure.sparse_array(references=self._spectrums_references,
                                                         queries=self._spectrums_queries,
                                                         idx_row=self.scores.scores.row,
                                                         idx_col=self.scores.scores.col,
                                                         is_symmetric=self.is_symmetric)
            self.scores.scores.add_sparse_data(self.scores.scores.row,
                                               self.scores.scores.col,
                                               new_scores,
                                               similarity_measure.__class__.__name__)

    def set_logging(self):
        """Set the matchms logger to write messages to file (if defined).
        """
        reset_matchms_logger()
        set_matchms_logger_level(self.logging_level)
        if self.logging_file is not None:
            add_logging_to_file(self.logging_file,
                                loglevel=self.logging_level,
                                remove_stream_handlers=True)
        else:
            # The two sentences were previously concatenated without a space.
            logger.warning("No logging file was defined. "
                           "Logging messages will not be written to file.")

    def write_to_logfile(self, line):
        """Append `line` to the log file. Does nothing if no logging file is defined.
        """
        if self.logging_file is None:
            return
        with open(self.logging_file, "a", encoding="utf-8") as f:
            f.write(line + '\n')

    def import_spectrums(self,
                         query_files: Union[List[str], str],
                         reference_files: Optional[Union[List[str], str]] = None):
        """Import spectra from file(s).

        Parameters
        ----------
        query_files
            List of files, or single filename, containing the query spectra.
        reference_files
            List of files, or single filename, containing the reference spectra.
            If set to None (default) then all query spectra will be compared to each other.
        """
        # import query spectra
        self.write_to_logfile("--- Importing data ---")
        self._spectrums_queries = load_list_of_spectrum_files(query_files)
        self.write_to_logfile(f"Loaded query spectra from {query_files}")

        # import reference spectra
        if reference_files is None:
            self.is_symmetric = True
            self._spectrums_references = self._spectrums_queries
            self.write_to_logfile("Reference spectra are equal to the query spectra (is_symmetric = True)")
        else:
            # Reset the flag: it may still be True from an earlier symmetric import,
            # which would make run() skip the processing of the reference spectra.
            self.is_symmetric = False
            self._spectrums_references = load_list_of_spectrum_files(reference_files)
            self.write_to_logfile(f"Loaded reference spectra from {reference_files}")

    # Getter & Setters
    @property
    def score_computations(self) -> Iterable[Union[str, List[dict]]]:
        """The score computations of the workflow. Setting them re-runs check_score_computation."""
        return self.__workflow.get("score_computations")

    @score_computations.setter
    def score_computations(self, computations):
        self.__workflow["score_computations"] = computations
        check_score_computation(score_computations=self.score_computations)

    @property
    def query_filters(self) -> Iterable[Union[str, List[dict]]]:
        """The query filters of the workflow. Setting them re-creates the query processor."""
        return self.__workflow.get("query_filters")

    @query_filters.setter
    def query_filters(self, filters: Iterable[Union[str, List[dict]]]):
        self.__workflow["query_filters"] = filters
        self._initialize_spectrum_processor_queries()

    @property
    def reference_filters(self) -> Iterable[Union[str, List[dict]]]:
        """The reference filters of the workflow. Setting them re-creates the reference processor."""
        return self.__workflow.get("reference_filters")

    @reference_filters.setter
    def reference_filters(self, filters: Iterable[Union[str, List[dict]]]):
        self.__workflow["reference_filters"] = filters
        self._initialize_spectrum_processor_references()

    @property
    def spectrums_queries(self) -> List["SpectrumType"]:
        """The query spectra currently held by the pipeline (None before import)."""
        return self._spectrums_queries

    @property
    def spectrums_references(self) -> List["SpectrumType"]:
        """The reference spectra currently held by the pipeline (None before import)."""
        return self._spectrums_references
def get_unused_filters(yaml_file):
    """Print the name of every filter in ALL_FILTERS that is not used by the yaml file.

    Only the "query_filters" entry of the workflow loaded from `yaml_file` is inspected.
    """
    query_processor = SpectrumProcessor(load_workflow_from_yaml_file(yaml_file)["query_filters"])
    # Names of the filter functions the processor built from the yaml file.
    used_names = {used_filter.__name__ for used_filter in query_processor.filters}
    unused_names = (candidate.__name__ for candidate in ALL_FILTERS
                    if candidate.__name__ not in used_names)
    for unused_name in unused_names:
        print(unused_name)
def check_score_computation(score_computations: Iterable[Union[str, List[dict]]]):
    """Check if the score computations seem OK before running.

    Aim is to avoid pipeline crashing after long computation.

    Each entry is either a single item or a list whose first element identifies the
    step. A step is accepted if that first element is a callable, a name listed in
    `_masking_functions`, or a key of `_score_functions`.

    :param score_computations:
        The score computation steps to check.
    :raises ValueError:
        If a step is an empty list or its first element is not a known score computation.
    """
    # Check if all score computation steps exist
    for computation in score_computations:
        step = computation if isinstance(computation, list) else [computation]
        if not step:
            # An empty step would otherwise fail with an IndexError on step[0].
            raise ValueError("Empty score computation step: expected a name or a callable.")
        identifier = step[0]
        if callable(identifier):
            continue
        if isinstance(identifier, str) and (identifier in _masking_functions
                                            or identifier in _score_functions):
            continue
        raise ValueError(f"Unknown score computation: {identifier}.")