Pyteomics documentation v4.7.1



Source code for pyteomics.mzxml

mzxml - reader for mass spectrometry data in mzXML format


**mzXML** is a (formerly) standard XML-format for raw mass spectrometry data storage,
intended to be replaced with **mzML**.

This module provides a minimalistic way to extract information from mzXML
files. You can use the old functional interface (:py:func:`read`) or the new
object-oriented interface (:py:class:`MzXML`)
to iterate over entries in ``<scan>`` elements.
:py:class:`MzXML` also supports direct indexing with scan IDs.

Data access

  :py:class:`MzXML` - a class representing a single mzXML file.
  Other data access functions use this class internally.

  :py:func:`read` - iterate through spectra in mzXML file. Data from a
  single scan are converted to a human-readable dict. Spectra themselves are
  stored under 'm/z array' and 'intensity array' keys.

  :py:func:`chain` - read multiple mzXML files at once.

  :py:func:`chain.from_iterable` - read multiple files at once, using an
  iterable of files.

Deprecated functions

  :py:func:`version_info` - get version information about the mzXML file.
  You can just read the corresponding attribute of the :py:class:`MzXML` object.

  :py:func:`iterfind` - iterate over elements in an mzXML file.
  You can just call the corresponding method of the :py:class:`MzXML` object.


This module requires :py:mod:`lxml` and :py:mod:`numpy`.


#   Copyright 2016 Joshua Klein, Lev Levitsky
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import heapq

from . import xml, auxiliary as aux, _schema_defaults
import numpy as np

def _decode_peaks(info, peaks_data):
    """Decode the interleaved base 64 encoded, potentially
    compressed, raw data points.

    info : dict
        The current context
    peaks_data : str
        The textually encoded peak data

    tuple of np.array
        A pair of NumPy arrays containing
        m/z and intensity values.
    compressed = (info.get('compressionType') == 'zlib')
    dt = np.float32 if info['precision'] == '32' else np.float64
    dtype = np.dtype([('m/z array', dt), ('intensity array', dt)]).newbyteorder('>')
    data = aux._decode_base64_data_array(peaks_data, dtype, compressed)
    return data

class IteratorQueue(object):
    def __init__(self, iterator):
        q = list()
        self.queue = q
        self.iterator = iterator
        self.last_index = -1
        self.producer = self.consume(iterator)

    def insert_item(self, scan):
        heapq.heappush(self.queue, (int(scan['num']), scan))

    def __iter__(self):
        return self.producer

    def consume(self, iterator):
        for scan in iterator:
            scan.pop("scan", None)
            if scan['msLevel'] != 1:
                barrier = int(scan['num'])
                while True:
                    idx, item = heapq.heappop(self.queue)
                    if idx >= barrier:
                    yield item
        while self.queue:
            idx, item = heapq.heappop(self.queue)
            yield item

[docs] class MzXML(aux.BinaryArrayConversionMixin, aux.TimeOrderedIndexedReaderMixin, xml.MultiProcessingXML, xml.IndexSavingXML): """Parser class for mzXML files.""" _root_element = 'mzXML' _default_iter_tag = 'scan' _indexed_tags = {'scan'} _indexed_tag_keys = {'scan': 'num'} _default_version = None _default_schema = _schema_defaults._mzxml_schema_defaults _default_id_attr = 'num'
[docs] def __init__(self, *args, **kwargs): self.decode_binary = kwargs.pop('decode_binary', True) super(MzXML, self).__init__(*args, **kwargs)
def __getstate__(self): state = super(MzXML, self).__getstate__() state['decode_binary'] = self.decode_binary return state def __setstate__(self, state): super(MzXML, self).__setstate__(state) self.decode_binary = state['decode_binary'] def _get_info_smart(self, element, **kw): name = xml._local_name(element) kwargs = dict(kw) rec = kwargs.pop('recursive', None) if name in {'mzXML'}: info = self._get_info(element, recursive=( rec if rec is not None else False), **kwargs) else: info = self._get_info(element, recursive=(rec if rec is not None else True), **kwargs) if 'num' in info and isinstance(info, dict): info['id'] = info['num'] if 'peaks' in info and isinstance(info, dict): self._decode_peaks(info) return info def _determine_compression(self, info): if info.get('compressionType') == 'zlib': return 'zlib compression' return "no compression" def _determine_dtype(self, info): dt = np.float32 if info['precision'] == '32' else np.float64 endianess = ">" if info['byteOrder'] in ('network', "big") else "<" dtype = np.dtype( [('m/z array', dt), ('intensity array', dt)]).newbyteorder(endianess) return dtype def _finalize_record_conversion(self, array, record): key = record.key return self._convert_array(key, array[key]) def _decode_peaks(self, info): # handle cases where peaks is the encoded binary data which must be # unpacked if not isinstance(info['peaks'], (dict, list)): compression_type = self._determine_compression(info) dtype = self._determine_dtype(info) binary = info.pop('peaks') if not self.decode_binary: for k in self._array_keys: record = self._make_record(binary, compression_type, dtype, k) info[k] = record else: peak_data = self.decode_data_array(binary, compression_type, dtype) for k in self._array_keys: info[k] = self._convert_array(k, peak_data[k]) # otherwise we've already decoded the arrays and we're just passing # them up the hierarchy else: if not self.decode_binary: arrays = info.pop('peaks')[0] for k in self._array_keys: info[k] = arrays[k] else: peak_data = info.pop('peaks')[0] for k in self._array_keys: info[k] = self._convert_array(k, peak_data.get(k, np.array([])))
[docs] def iterfind(self, path, **kwargs): if path == 'scan': generator = super(MzXML, self).iterfind(path, **kwargs) for item in IteratorQueue(generator): yield item else: for item in super(MzXML, self).iterfind(path, **kwargs): yield item
def _get_time(self, scan): return scan['retentionTime']
[docs] def read(source, read_schema=False, iterative=True, use_index=False, dtype=None, huge_tree=False, decode_binary=True): """Parse `source` and iterate through spectra. Parameters ---------- source : str or file A path to a target mzML file or the file object itself. read_schema : bool, optional If :py:const:`True`, attempt to extract information from the XML schema mentioned in the mzML header. Otherwise, use default parameters. Not recommended without Internet connection or if you don't like to get the related warnings. iterative : bool, optional Defines whether iterative parsing should be used. It helps reduce memory usage at almost the same parsing speed. Default is :py:const:`True`. use_index : bool, optional Defines whether an index of byte offsets needs to be created for spectrum elements. Default is :py:const:`False`. decode_binary : bool, optional Defines whether binary data should be decoded and included in the output (under "m/z array", "intensity array", etc.). Default is :py:const:`True`. huge_tree : bool, optional This option is passed to the `lxml` parser and defines whether security checks for XML tree depth and node size should be disabled. Default is :py:const:`False`. Enable this option for trusted files to avoid XMLSyntaxError exceptions (e.g. `XMLSyntaxError: xmlSAX2Characters: huge text node`). Returns ------- out : iterator An iterator over the dicts with spectrum properties. """ return MzXML(source, read_schema=read_schema, iterative=iterative, use_index=use_index, dtype=dtype, huge_tree=huge_tree, decode_binary=decode_binary)
[docs] def iterfind(source, path, **kwargs): """Parse `source` and yield info on elements with specified local name or by specified XPath. .. note:: This function is provided for backward compatibility only. If you do multiple :py:func:`iterfind` calls on one file, you should create an :py:class:`MzXML` object and use its :py:meth:`!iterfind` method. Parameters ---------- source : str or file File name or file-like object. path : str Element name or XPath-like expression. Only local names separated with slashes are accepted. An asterisk (`*`) means any element. You can specify a single condition in the end, such as: ``"/path/to/element[some_value>1.5]"`` Note: you can do much more powerful filtering using plain Python. The path can be absolute or "free". Please don't specify namespaces. recursive : bool, optional If :py:const:`False`, subelements will not be processed when extracting info from elements. Default is :py:const:`True`. iterative : bool, optional Specifies whether iterative XML parsing should be used. Iterative parsing significantly reduces memory usage and may be just a little slower. When `retrieve_refs` is :py:const:`True`, however, it is highly recommended to disable iterative parsing if possible. Default value is :py:const:`True`. read_schema : bool, optional If :py:const:`True`, attempt to extract information from the XML schema mentioned in the mzIdentML header (default). Otherwise, use default parameters. Disable this to avoid waiting on slow network connections or if you don't like to get the related warnings. decode_binary : bool, optional Defines whether binary data should be decoded and included in the output (under "m/z array", "intensity array", etc.). Default is :py:const:`True`. Returns ------- out : iterator """ return MzXML(source, **kwargs).iterfind(path, **kwargs)
version_info = xml._make_version_info(MzXML) # chain = aux._make_chain(read, 'read') chain = aux.ChainBase._make_chain(MzXML)