Source code for pyteomics.mzml

"""
mzml - reader for mass spectrometry data in mzML format
=======================================================

Summary
-------

mzML is a standard rich XML format for raw mass spectrometry data storage.
Please refer to `psidev.info <http://www.psidev.info/index.php?q=node/257>`_
for the detailed specification of the format and structure of mzML files.

This module provides a minimalistic way to extract information from mzML
files. You can use the old functional interface (:py:func:`read`) or the new
object-oriented interface (:py:class:`MzML` or :py:class:`PreIndexedMzML`)
to iterate over entries in ``<spectrum>`` elements.
:py:class:`MzML` and :py:class:`PreIndexedMzML` also support direct indexing
with spectrum IDs.

Data access
-----------

  :py:class:`MzML` - a class representing a single mzML file.
  Other data access functions use this class internally.

  :py:class:`PreIndexedMzML` - a class representing a single mzML file.
  Uses byte offsets listed at the end of the file for quick access to spectrum elements.

  :py:func:`read` - iterate through spectra in an mzML file. Data from a
  single spectrum are converted to a human-readable dict. Spectra themselves are
  stored under 'm/z array' and 'intensity array' keys (see the sketch after this list).

  :py:func:`chain` - read multiple mzML files at once.

  :py:func:`chain.from_iterable` - read multiple files at once, using an
  iterable of files.
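
  A minimal usage sketch (the file name here is hypothetical)::

      from pyteomics import mzml

      with mzml.read('spectra.mzML') as reader:
          spectrum = next(reader)              # dict for one <spectrum> element
          mz = spectrum['m/z array']           # numpy array of m/z values
          inten = spectrum['intensity array']  # matching intensity values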

Controlled Vocabularies
~~~~~~~~~~~~~~~~~~~~~~~
mzML relies on controlled vocabularies to describe its contents extensibly. See
`Controlled Vocabulary Terms <../data.html#controlled-vocabulary-terms-in-structured-data>`_
for more details on how they are used.

Handling Time Units and Other Qualified Quantities
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
mzML contains information which may be described using a variety of different time units.
See `Unit Handling <../data.html#unit-handling>`_ for more information.
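
A sketch of what this looks like in practice: unit-qualified values are returned
as unit-aware numbers carrying a ``unit_info`` attribute (the key path follows this
module's output layout; the unit shown is just an example)::

    rt = spectrum['scanList']['scan'][0]['scan start time']
    rt.unit_info    # e.g. 'minute'
    float(rt)       # the plain numeric value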

Deprecated functions
--------------------

  :py:func:`version_info` - get version information about the mzML file.
  You can just read the corresponding attribute of the :py:class:`MzML` object.

  :py:func:`iterfind` - iterate over elements in an mzML file.
  You can just call the corresponding method of the :py:class:`MzML` object.
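
  For example, instead of calling :py:func:`version_info` on a file, you can
  read the attribute directly (a sketch; the file name is hypothetical)::

      with MzML('spectra.mzML') as reader:
          print(reader.version_info)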

Dependencies
------------

This module requires :py:mod:`lxml` and :py:mod:`numpy`.

-------------------------------------------------------------------------------
"""

#   Copyright 2012 Anton Goloborodko, Lev Levitsky
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import re
import warnings
import numpy as np
from . import xml, auxiliary as aux, _schema_defaults
from .xml import etree

NON_STANDARD_DATA_ARRAY = 'non-standard data array'

STANDARD_ARRAYS = set([
    'm/z array',
    'intensity array',
    'charge array',
    'signal to noise array',
    'time array',
    'wavelength array',
    'flow rate array',
    'pressure array',
    'temperature array',
    'mean charge array',
    'resolution array',
    'baseline array',
    'noise array',
    'sampled noise m/z array',
    'sampled noise intensity array',
    'sampled noise baseline array',
    'ion mobility array',
    'deconvoluted ion mobility drift time array',
    'deconvoluted inverse reduced ion mobility array',
    'deconvoluted ion mobility array',
    'raw ion mobility drift time array',
    'raw inverse reduced ion mobility array',
    'raw ion mobility array',
    'mean inverse reduced ion mobility array',
    'mean ion mobility array',
    'mean ion mobility drift time array',
    'mass array',
    'scanning quadrupole position lower bound m/z array',
    'scanning quadrupole position upper bound m/z array',
])


class MzML(aux.BinaryArrayConversionMixin, aux.TimeOrderedIndexedReaderMixin, xml.MultiProcessingXML, xml.IndexSavingXML):
    """Parser class for mzML files."""
    file_format = 'mzML'
    _root_element = 'mzML'
    _default_schema = _schema_defaults._mzml_schema_defaults
    _default_version = '1.1.0'
    _default_iter_tag = 'spectrum'
    _structures_to_flatten = {'binaryDataArrayList', 'referenceableParamGroupRef'}
    _indexed_tags = {'spectrum', 'chromatogram'}

    def __init__(self, *args, **kwargs):
        self.decode_binary = kwargs.pop('decode_binary', True)
        self._referenceable_param_groups = {}
        super(MzML, self).__init__(*args, **kwargs)

    def __getstate__(self):
        state = super(MzML, self).__getstate__()
        state['decode_binary'] = self.decode_binary
        return state

    def __setstate__(self, state):
        super(MzML, self).__setstate__(state)
        self.decode_binary = state['decode_binary']

    def _handle_referenceable_param_group(self, param_group_ref, **kwargs):
        ref_name = param_group_ref.attrib['ref']
        if ref_name not in self._referenceable_param_groups:
            params = self._referenceable_param_groups[ref_name] = self._retrieve_param_group(ref_name)
            return params
        return self._referenceable_param_groups[ref_name]

    @xml._keepstate
    def _retrieve_param_group(self, ref_name):
        group = self.get_by_id(ref_name)
        group.pop("id", None)
        return [xml._XMLParam(k, v, None) for k, v in group.items()]

    def _detect_array_name(self, info):
        """Determine what the appropriate name for this array is
        by inspecting the available param-based keys.

        Parameters
        ----------
        info : dict
            The collapsed binary tag plus associated *Param data

        Returns
        -------
        out : str
            The name for this array entry
        """
        # If this is a non-standard array, we hope the userParams
        # will conform to the same array suffix pattern.
        is_non_standard = False

        # Accumulate possible name candidates
        candidates = []
        for k in info:
            if k.endswith(' array') and not info[k]:
                if NON_STANDARD_DATA_ARRAY == k:
                    is_non_standard = True
                else:
                    candidates.append(k)

        # A non-standard data array term key might have the name for the
        # data array as the value.
        nonstandard_name = info.get(NON_STANDARD_DATA_ARRAY)
        if nonstandard_name:
            return nonstandard_name

        if isinstance(info.get('name'), list):
            for val in info['name']:
                if val.endswith(' array'):
                    if NON_STANDARD_DATA_ARRAY == val:
                        is_non_standard = True
                    else:
                        candidates.append(val)

        # Name candidate resolution
        n_candidates = len(candidates)
        # Easy case, exactly one name given
        if n_candidates == 1:
            return candidates[0]
        # We are missing information, but at least
        # if we know the array is non-standard we
        # can report it as such. Otherwise fall back
        # to "binary". This fallback signals special
        # behavior elsewhere.
        if n_candidates == 0:
            invalid = {"encodedLength", "dataProcessingRef", "arrayLength", "binary"}
            for k in info:
                if k in invalid:
                    continue
                candidates.append(k)
            if len(candidates) == 0:
                if is_non_standard:
                    return NON_STANDARD_DATA_ARRAY
                warnings.warn("No options for non-standard data array")
                return "binary"
            else:
                warnings.warn(
                    "Multiple options for naming binary array after no valid name found: %r" % candidates)
                return max(candidates, key=len)
        # Multiple choices means we need to make a decision which could
        # mask data from the user. This should never happen but stay safe.
        # There are multiple options to choose from. There is no way to
        # make a good choice here. We first prefer the standardized
        # arrays before falling back to just guessing.
        else:
            candidates = set(candidates)
            # Maybe we just have a repeated term?
            if len(candidates) == 1:
                return next(iter(candidates))
            warnings.warn(
                "Multiple options for naming binary array: %r" % candidates)
            standard_options = candidates & STANDARD_ARRAYS
            if standard_options:
                return max(standard_options, key=len)
            return max(candidates, key=len)

    def _determine_array_dtype(self, info):
        dtype = None
        types = {'32-bit float': np.float32, '64-bit float': np.float64,
                 '32-bit integer': np.int32, '64-bit integer': np.int64,
                 'null-terminated ASCII string': np.uint8}
        for t, code in types.items():
            if t in info:
                dtype = code
                del info[t]
                break
        # sometimes it's under 'name'
        else:
            if 'name' in info:
                for t, code in types.items():
                    if t in info['name']:
                        dtype = code
                        info['name'].remove(t)
                        break
        return dtype

    def _determine_compression(self, info):
        known_compression_types = set(self.compression_type_map)
        found_compression_types = known_compression_types & set(info)
        if found_compression_types:
            found_compression_types = tuple(found_compression_types)
            if len(found_compression_types) == 1:
                del info[found_compression_types[0]]
                return found_compression_types[0]
            warnings.warn("Multiple options for binary array compression: %r" % (
                found_compression_types,))
            return found_compression_types[0]
        elif "name" in info:
            found_compression_types = known_compression_types & set(info['name'])
            if found_compression_types:
                found_compression_types = tuple(found_compression_types)
                if len(found_compression_types) == 1:
                    # 'name' holds a list here, so remove the matched entry by value
                    info['name'].remove(found_compression_types[0])
                    return found_compression_types[0]
                else:
                    warnings.warn("Multiple options for binary array compression: %r" % (
                        found_compression_types,))
                    return found_compression_types[0]
        else:
            return 'no compression'

    def _handle_binary(self, info, **kwargs):
        """Special handling when processing and flattening a <binary> tag
        and its sibling *Param tags.

        Parameters
        ----------
        info : dict
            Unprocessed binary array data and metadata

        Returns
        -------
        out : dict
            The processed and flattened data array and metadata
        """
        dtype = self._determine_array_dtype(info)
        compressed = self._determine_compression(info)
        name = self._detect_array_name(info)
        binary = info.pop('binary')
        if not self.decode_binary:
            info[name] = self._make_record(binary, compressed, dtype, name)
            return info

        if binary:
            array = self.decode_data_array(binary, compressed, dtype)
        else:
            array = np.array([], dtype=dtype)

        if name == 'binary':
            info[name] = self._convert_array(None, array)
        else:
            info = {name: self._convert_array(name, array)}
        return info

    def _get_info_smart(self, element, **kw):
        name = xml._local_name(element)
        kwargs = dict(kw)
        rec = kwargs.pop('recursive', None)
        if name in {'indexedmzML', 'mzML'}:
            info = self._get_info(element, recursive=(rec if rec is not None else False), **kwargs)
        else:
            info = self._get_info(element, recursive=(rec if rec is not None else True), **kwargs)
        if 'binary' in info and isinstance(info, dict):
            info = self._handle_binary(info, **kwargs)

        if 'binaryDataArray' in info and isinstance(info, dict):
            for array in info.pop('binaryDataArray'):
                info.update(array)

        intkeys = {'ms level'}
        for k in intkeys:
            if k in info:
                try:
                    info[k] = int(info[k])
                except (ValueError, TypeError):
                    pass

        return info

    def _retrieve_refs(self, info, **kwargs):
        """Retrieves and embeds the data for each attribute in `info` that
        ends in _ref.
        Removes the id attribute from `info`.
        """
        for k, v in dict(info).items():
            if k == 'ref':
                by_id = self.get_by_id(v, retrieve_refs=True)
                if by_id is None:
                    warnings.warn('Ignoring unresolved reference: ' + v)
                else:
                    info.update(by_id)
                    del info[k]
                    info.pop('id', None)

    @staticmethod
    def _get_time(scan):
        return scan['scanList']['scan'][0]['scan start time']
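

# A minimal sketch of indexed access with the class above (the file name and
# spectrum ID are hypothetical; assumes the reader has an index, which is built
# by default when instantiating MzML directly):
#
#     with MzML('spectra.mzML') as reader:
#         spectrum = reader['controllerType=0 controllerNumber=1 scan=1']  # by native ID
#         nearest = reader.time[12.5]  # spectrum closest to retention time 12.5 minutes
#                                      # (via aux.TimeOrderedIndexedReaderMixin)
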
def read(source, read_schema=False, iterative=True, use_index=False, dtype=None, huge_tree=False, decode_binary=True):
    """Parse `source` and iterate through spectra.

    Parameters
    ----------
    source : str or file
        A path to a target mzML file or the file object itself.

    read_schema : bool, optional
        If :py:const:`True`, attempt to extract information from the XML schema
        mentioned in the mzML header. Otherwise, use default parameters.
        Not recommended without an Internet connection or if you want to avoid
        the related warnings.

    iterative : bool, optional
        Defines whether iterative parsing should be used. It helps reduce
        memory usage at almost the same parsing speed. Default is
        :py:const:`True`.

    use_index : bool, optional
        Defines whether an index of byte offsets needs to be created for
        spectrum elements. Default is :py:const:`False`.

    dtype : type or dict, optional
        dtype to convert arrays to, one for both m/z and intensity arrays or
        one for each key. If :py:class:`dict`, keys should be 'm/z array'
        and 'intensity array'.

    decode_binary : bool, optional
        Defines whether binary data should be decoded and included in the output
        (under "m/z array", "intensity array", etc.).
        Default is :py:const:`True`.

    huge_tree : bool, optional
        This option is passed to the `lxml` parser and defines whether
        security checks for XML tree depth and node size should be disabled.
        Default is :py:const:`False`.
        Enable this option for trusted files to avoid XMLSyntaxError exceptions
        (e.g. `XMLSyntaxError: xmlSAX2Characters: huge text node`).

    Returns
    -------
    out : iterator
        An iterator over the dicts with spectrum properties.
    """
    return MzML(source, read_schema=read_schema, iterative=iterative,
        use_index=use_index, dtype=dtype, huge_tree=huge_tree,
        decode_binary=decode_binary)
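
# Usage sketch for read() (the path and dtype mapping are illustrative):
#
#     for spectrum in read('spectra.mzML', dtype={'m/z array': np.float64,
#                                                 'intensity array': np.float32}):
#         print(spectrum.get('ms level'), len(spectrum['m/z array']))
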
def iterfind(source, path, **kwargs):
    """Parse `source` and yield info on elements with specified local
    name or by specified "XPath".

    .. note:: This function is provided for backward compatibility only.
        If you do multiple :py:func:`iterfind` calls on one file, you should
        create an :py:class:`MzML` object and use its
        :py:meth:`!iterfind` method.

    Parameters
    ----------
    source : str or file
        File name or file-like object.

    path : str
        Element name or XPath-like expression. Only local names separated
        with slashes are accepted. An asterisk (`*`) means any element.
        You can specify a single condition at the end, such as:
        ``"/path/to/element[some_value>1.5]"``
        Note: you can do much more powerful filtering using plain Python.
        The path can be absolute or "free". Please don't specify
        namespaces.

    recursive : bool, optional
        If :py:const:`False`, subelements will not be processed when
        extracting info from elements. Default is :py:const:`True`.

    iterative : bool, optional
        Specifies whether iterative XML parsing should be used. Iterative
        parsing significantly reduces memory usage and may be just a little
        slower. When `retrieve_refs` is :py:const:`True`, however, it is
        highly recommended to disable iterative parsing if possible.
        Default value is :py:const:`True`.

    read_schema : bool, optional
        If :py:const:`True`, attempt to extract information from the XML schema
        mentioned in the mzML header. Otherwise, use default parameters.
        Not recommended without an Internet connection or if you want to avoid
        the related warnings.

    decode_binary : bool, optional
        Defines whether binary data should be decoded and included in the output
        (under "m/z array", "intensity array", etc.).
        Default is :py:const:`True`.

    Returns
    -------
    out : iterator
    """
    return MzML(source, **kwargs).iterfind(path, **kwargs)
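
# Usage sketch for iterfind() (the path and file name are illustrative):
#
#     for software in iterfind('spectra.mzML', 'softwareList/software'):
#         print(software)
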
version_info = xml._make_version_info(MzML)

# chain = aux._make_chain(read, 'read')
chain = aux.ChainBase._make_chain(MzML)
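
# Usage sketch for chain (file names are hypothetical):
#
#     with chain('run1.mzML', 'run2.mzML') as reader:
#         for spectrum in reader:
#             pass
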
class PreIndexedMzML(MzML):
    """Parser class for mzML files, subclass of :py:class:`MzML`.
    Uses byte offsets listed at the end of the file
    for quick access to spectrum elements.
    """

    def build_byte_index(self):
        """
        Build up a :class:`HierarchicalOffsetIndex` of offsets for elements.

        Calls :meth:`_find_index_list` or falls back on regular
        :class:`MzML` indexing.

        Returns
        -------
        out : HierarchicalOffsetIndex
        """
        index = self._find_index_list()
        if index:
            return index
        else:
            warnings.warn('Could not extract the embedded offset index. Falling back to default indexing procedure.')
            return super(PreIndexedMzML, self).build_byte_index()

    @xml._keepstate
    def _iterparse_index_list(self, offset):
        index_map = xml.HierarchicalOffsetIndex()
        index = index_map._inner_type()
        self._source.seek(offset)
        try:
            for event, elem in etree.iterparse(self._source, events=('start', 'end'), remove_comments=True):
                if event == 'start':
                    if elem.tag == 'index':
                        index = {}
                        index_map[elem.attrib['name']] = index
                else:
                    if elem.tag == 'offset':
                        index[elem.attrib['idRef']] = int(elem.text)
                    elem.clear()
        except etree.XMLSyntaxError:
            # The iteration has reached the end of the indexList tag and the
            # parser encounters the later elements in the document.
            pass
        return index_map

    @xml._keepstate
    def _find_index_list_offset(self):
        """
        Search relative to the bottom of the file upwards to find the offsets
        of the index lists.

        Returns
        -------
        list of int
            A list of byte offsets for `<indexList>` elements
        """
        self._source.seek(-1024, 2)
        text = self._source.read(1024)
        index_offsets = list(map(int, re.findall(br'<indexListOffset>(\d+)</indexListOffset>', text)))
        return index_offsets

    @xml._keepstate
    def _find_index_list(self):
        """
        Extract lists of index offsets from the end of the file.

        Returns
        -------
        dict of str -> dict of str -> int
        """
        offsets = self._find_index_list_offset()
        index_list = xml.HierarchicalOffsetIndex()
        for offset in offsets:
            # Sometimes the offset is at the very beginning of the file,
            # due to a bug in an older version of ProteoWizard. If this crude
            # check fails, don't bother searching the entire file, and fall back
            # on the base class's mechanisms.
            #
            # Alternative behavior here would be to start searching for the start
            # of the index from the bottom of the file, but this version of
            # ProteoWizard also emits invalid offsets which do not improve
            # retrieval time.
            if offset < 1024:
                continue
            index_list = self._iterparse_index_list(offset)
        return index_list
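
# For reference, PreIndexedMzML relies on the trailing index of an indexedmzML
# document, which looks roughly like this (offsets and the ID are illustrative):
#
#     <indexList count="2">
#       <index name="spectrum">
#         <offset idRef="controllerType=0 controllerNumber=1 scan=1">4522</offset>
#         ...
#       </index>
#       ...
#     </indexList>
#     <indexListOffset>1234567</indexListOffset>
#   </indexedmzML>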
