# -*- coding: utf8 -*-
"""
mzmlb - reader for mass spectrometry data in mzMLb format
=========================================================
.. warning::
This is a **Provisional Implementation**. The mzMLb format has been published
but is not yet broadly available.
Summary
-------
mzMLb is an HDF5 container format wrapping around the standard rich XML-format
for raw mass spectrometry data storage. Please refer to [1]_ for more information
about mzMLb and its features. Please refer to
`psidev.info <https://www.psidev.info/mzML>`_ for the detailed
specification of the format and structure of mzML files.
This module provides a minimalistic way to extract information from mzMLb
files. You can use the old functional interface (:py:func:`read`) or the new
object-oriented interface (:py:class:`MzMLb` to iterate over entries in ``<spectrum>`` elements.
:py:class:`MzMLb` also support direct indexing with spectrum IDs or indices.
Data access
-----------
:py:class:`MzMLb` - a class representing a single mzMLb file.
Other data access functions use this class internally.
:py:func:`read` - iterate through spectra in mzMLb file. Data from a
single spectrum are converted to a human-readable dict. Spectra themselves are
stored under 'm/z array' and 'intensity array' keys.
:py:func:`chain` - read multiple mzMLb files at once.
:py:func:`chain.from_iterable` - read multiple files at once, using an
iterable of files.
Controlled Vocabularies
~~~~~~~~~~~~~~~~~~~~~~~
mzMLb relies on controlled vocabularies to describe its contents extensibly. See
`Controlled Vocabulary Terms <../data.html#controlled-vocabulary-terms-in-structured-data>`_
for more details on how they are used.
Handling Time Units and Other Qualified Quantities
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
mzMLb contains information which may be described as using a variety of different time units.
See `Unit Handling <../data.html#unit-handling>`_ for more information.
References
----------
.. [1] Bhamber, R. S., Jankevics, A., Deutsch, E. W., Jones, A. R., & Dowsey, A. W. (2021).
MzMLb: A Future-Proof Raw Mass Spectrometry Data Format Based on Standards-Compliant
mzML and Optimized for Speed and Storage Requirements. Journal of Proteome Research,
20(1), 172–183. https://doi.org/10.1021/acs.jproteome.0c00192
"""
import io
import warnings
import logging
from collections import namedtuple
import h5py
try:
logging.getLogger("hdf5plugin").addHandler(logging.NullHandler())
import hdf5plugin
except ImportError:
hdf5plugin = None
import numpy as np
from pyteomics.mzml import MzML as _MzML
from pyteomics.auxiliary.file_helpers import HierarchicalOffsetIndex, TaskMappingMixin, TimeOrderedIndexedReaderMixin, FileReader
from pyteomics import auxiliary as aux, xml
[docs]
def delta_predict(data, copy=True):
'''Reverse the lossy transformation of the delta compression
helper.
Parameters
----------
data : :class:`numpy.ndarray`
The data to transform
copy : bool
Whether to make a copy of the data array or transform it in-place.
Returns
-------
:class:`numpy.ndarray`
The transformed data array
'''
if copy:
out = data.copy()
else:
out = data
for i in range(2, len(data)):
out[i] = out[i] + out[i - 1] - out[0]
return out
[docs]
def linear_predict(data, copy=True):
'''Reverse the lossy transformation of the linear interpolation compression
helper.
Parameters
----------
data : :class:`numpy.ndarray`
The data to transform
copy : bool
Whether to make a copy of the data array or transform it in-place.
Returns
-------
:class:`numpy.ndarray`
The transformed data array
'''
if copy:
out = data.copy()
else:
out = data
for i in range(2, len(data)):
out[i] = out[i] + 2 * out[i - 1] - out[i - 2] - out[1]
return out
[docs]
class HDF5ByteBuffer(io.RawIOBase):
'''Helper class that looks file-like so that we can pass a HDF5 byte dataset to
an arbitrary XML parser.
Implements :class:`~io.RawIOBase` for reading.
'''
[docs]
def __init__(self, buffer, offset=None):
if offset is None:
offset = 0
self.buffer = buffer
self.offset = offset
self.size = self.buffer.size
self.mode = 'rb'
[docs]
def readable(self):
return True
[docs]
def seekable(self):
return True
[docs]
def isatty(self):
return False
[docs]
def seek(self, offset, whence=0):
if whence == io.SEEK_SET:
self.offset = offset
elif whence == io.SEEK_CUR:
self.offset += offset
elif whence == io.SEEK_END:
self.offset = self.size - offset
else:
raise ValueError("Bad whence %r" % whence)
return self.offset
[docs]
def tell(self):
return self.offset
[docs]
def close(self):
return
@property
def closed(self):
return False
def readinto(self, b):
n = len(b)
temp = self._read(n)
m = len(temp)
b[:m] = temp[:]
return m
[docs]
def readall(self):
return bytes(self._read(-1))
def read(self, n=-1):
return bytes(self._read(n))
def write(self, b):
raise ValueError("Read-only stream")
def _read(self, n=-1):
if n == -1:
n = self.size + 1
dat = bytearray(np.array(self.buffer[self.offset:self.offset + n]))
self.offset += n
return dat
[docs]
class external_array_slice(namedtuple('external_array_slice',
['array_name', 'offset', 'length', 'source', 'transform', 'key', 'dtype'])):
[docs]
def decode(self):
"""Decode :attr:`data` into a numerical array
Returns
-------
np.ndarray
"""
return self.source._decode_record(self)
[docs]
class ExternalDataMzML(_MzML):
'''An MzML parser that reads data arrays from an external provider.
This is an implementation detail of :class:`MzMLb`.
'''
[docs]
def __init__(self, *args, **kwargs):
self._external_data_registry = kwargs.pop("external_data_registry", None)
super(ExternalDataMzML, self).__init__(*args, **kwargs)
def _make_record(self, array_name, offset, length, transform, name, dtype):
return external_array_slice(array_name, offset, length, self, transform, name, dtype)
def _transform_array(self, array, transform):
if transform is None:
return array
elif "linear prediction" == transform:
return linear_predict(array, copy=False)
elif "delta prediction" == transform:
return delta_predict(array, copy=False)
else:
raise ValueError("Transformation not recognized")
def _retrieve_external_array(self, array_name, length, offset):
array = self._external_data_registry.get(array_name, length, offset)
return array
[docs]
def decode_data_array(self, array_name, offset, length, transform=None, dtype=np.float64):
array = self._retrieve_external_array(array_name, length, offset)
array = self._transform_array(array, transform)
return array
def _decode_record(self, record):
array = self.decode_data_array(
record.array_name, record.offset, record.length, record.transform, record.dtype)
return self._finalize_record_conversion(array, record)
def _handle_binary(self, info, **kwargs):
if not self.decode_binary:
self.decode_binary = True
# Binary decoding works totally differently here, not supporting the previous signatures
# that the parent method will use. Pretend we are decoding because it is a no-op in the
# parent method.
result = super(ExternalDataMzML, self)._handle_binary(info, **kwargs)
self.decode_binary = False
else:
result = super(ExternalDataMzML, self)._handle_binary(info, **kwargs)
try:
array_name = info['external HDF5 dataset']
except KeyError:
array_name = info['external dataset']
offset = int(info['external offset'])
length = int(info['external array length'])
transform = None
# The zlib compression in these two terms happens automatically during HDF5 encoding and
# the reader needn't even know about it. Need an example of how Numpress will be signaled.
if "linear prediction" in info or "truncation, linear prediction and zlib compression" in info:
transform = 'linear prediction'
elif "delta prediction" in info or "truncation, delta prediction and zlib compression" in info:
transform = 'delta prediction'
if not self.decode_binary:
name = self._detect_array_name(info)
result[name] = self._make_record(
array_name, offset, length, transform, name,
self._external_data_registry.dtype_of(array_name))
return result
array = self._retrieve_external_array(array_name, length, offset)
if len(result) == 1:
name = next(iter(result))
else:
name = self._detect_array_name(info)
result[name] = self._convert_array(name, array)
return result
[docs]
def reset(self):
super(ExternalDataMzML, self).reset()
self._external_data_registry.clear()
[docs]
class chunk_interval_cache_record(namedtuple("chunk_interval_cache_record", ("start", "end", "array"))):
def contains(self, start, end):
if self.start <= start:
if end < self.end:
return True
return False
def get(self, start, end):
return self.array[start - self.start:end - self.start]
def __eq__(self, other):
return self.start == other.start and self.end == other.end
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash(self.start)
[docs]
class ExternalArrayRegistry(object):
'''Read chunks out of a single long array
This is an implementation detail of :class:`MzMLb`
Attributes
----------
registry : Mapping
A mapping from array name to the out-of-core array object.
chunk_size : int
The number of entries to chunk together and keep in memory.
chunk_cache : dict
A mapping from array name to cached array blocks.
'''
[docs]
def __init__(self, registry, chunk_size=None):
if chunk_size is None:
chunk_size = 2 ** 20
else:
chunk_size = int(chunk_size)
self.registry = registry
self.chunk_cache = {}
self.chunk_size = chunk_size
def clear(self):
self.chunk_cache.clear()
def _get_raw(self, array_name, start, end):
return self.registry[array_name][start:end]
def _make_cache_record(self, array_name, start, end):
return chunk_interval_cache_record(start, end, self._get_raw(array_name, start, end))
def get(self, array_name, length, offset=0):
start = offset
end = start + length
try:
cache_record = self.chunk_cache[array_name]
if cache_record.contains(start, end):
return cache_record.get(start, end)
else:
cache_record = self._make_cache_record(
array_name, start, start + max(length, self.chunk_size))
self.chunk_cache[array_name] = cache_record
return cache_record.get(start, end)
except KeyError:
cache_record = self._make_cache_record(
array_name, start, start + max(length, self.chunk_size))
self.chunk_cache[array_name] = cache_record
return cache_record.get(start, end)
return self.registry[array_name][offset:offset + length]
def dtype_of(self, array_name):
return self.registry[array_name].dtype
def __call__(self, array_name, length, offset=0):
return self.get(array_name, length, offset)
[docs]
class MzMLb(TimeOrderedIndexedReaderMixin, TaskMappingMixin):
'''A parser for mzMLb [1]_.
Provides an identical interface to :class:`~pyteomics.mzml.MzML`.
Attributes
----------
path : str, Path-like, or file-like object
The mzMLb file path or a file-like object providing it.
handle : :class:`h5py.File`
The raw HDF5 file container.
mzml_parser : :class:`~.ExternalDataMzML`
The mzML parser for the XML stream inside the HDF5 file with
special behavior for retrieving the out-of-band data arrays
from their respective storage locations.
schema_version : str
The mzMLb HDF5 schema version, distinct from the mzML schema inside it.
References
----------
[1] Bhamber, R. S., Jankevics, A., Deutsch, E. W., Jones, A. R., & Dowsey, A. W. (2021).
MzMLb: A Future-Proof Raw Mass Spectrometry Data Format Based on Standards-Compliant
mzML and Optimized for Speed and Storage Requirements. Journal of Proteome Research,
20(1), 172–183. https://doi.org/10.1021/acs.jproteome.0c00192
'''
_default_iter_tag = ExternalDataMzML._default_iter_tag
file_format = "mzMLb"
[docs]
def __init__(self, path, hdfargs=None, mzmlargs=None, allow_updates=False,
use_index=True, **kwargs):
if hdfargs is None:
hdfargs = {}
if mzmlargs is None:
mzmlargs = {}
mzmlargs.update(kwargs)
self.path = path
self._hdfargs = hdfargs
self._mzmlargs = mzmlargs
self._allow_updates = allow_updates
self.handle = h5py.File(self.path, 'r+' if self._allow_updates else 'r', **hdfargs)
self.schema_version = self.handle['mzML'].attrs.get('version')
self._check_compressor()
self._xml_buffer = io.BufferedReader(HDF5ByteBuffer(self.handle['mzML']))
self._array_registry = ExternalArrayRegistry(self.handle)
self._make_mzml_parser(mzmlargs)
super(MzMLb, self).__init__(**kwargs)
def _check_compressor(self):
for key in self.handle.keys():
if "spectrum_MS_" in key or "chromatogram_MS_":
data = self.handle[key]
try:
filts = data._filters
except AttributeError:
continue
if '32001' in filts:
if hdf5plugin is None:
warnings.warn(
("Blosc meta-compressor detected, but hdf5plugin is "
"not installed, may not be able to access %r") % (key))
def _make_mzml_parser(self, kwargs):
self._mzml_parser = ExternalDataMzML(
self._xml_buffer, external_data_registry=self._array_registry,
use_index=False, **kwargs)
self._mzml_parser._offset_index = self.build_byte_index()
self._mzml_parser._use_index = True
@property
def name(self):
if hasattr(self.path, 'name'):
return self.path.name
return self.path
def build_byte_index(self):
index = HierarchicalOffsetIndex()
for label in [u'spectrum', u'chromatogram']:
sub = index[label]
ids = bytearray(np.array(self.handle['mzML_{}Index_idRef'.format(label)])).split(b"\x00")
offsets = self.handle["mzML_{}Index".format(label)][:-1]
for i, o in enumerate(offsets):
sub[ids[i].decode('utf8')] = o
return index
[docs]
def get_by_id(self, id):
"""Parse the file and return the element with `id` attribute equal
to `elem_id`. Returns :py:const:`None` if no such element is found.
Parameters
----------
elem_id : str
The value of the `id` attribute to match.
Returns
-------
out : :py:class:`dict` or :py:const:`None`
"""
return self._mzml_parser.get_by_id(id)
def get_by_ids(self, ids):
return self._mzml_parser.get_by_ids(ids)
def get_by_index(self, i):
return self._mzml_parser.get_by_index(i)
def get_by_indexes(self, indexes):
return self._mzml_parser.get_by_indexes(indexes)
def get_by_index_slice(self, s):
return self._mzml_parser.get_by_index_slice(s)
def get_by_key_slice(self, s):
return self._mzml_parser.get_by_key_slice(s)
def __contains__(self, key):
return key in self.index
def __getitem__(self, i):
return self._mzml_parser[i]
def __len__(self):
return len(self._mzml_parser)
def __iter__(self):
return iter(self._mzml_parser)
def __next__(self):
return next(self._mzml_parser)
def next(self):
return self.__next__()
def __reduce__(self):
return self.__class__, (self.path, self._hdfargs, self._mzmlargs, self._allow_updates)
def close(self):
self.handle.close()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def iterfind(self, *args, **kwargs):
iterf = self._mzml_parser.iterfind(*args, **kwargs)
iterf.parser = self
return iterf
def _iterfind_impl(self, path, *args, **kwargs):
return self._mzml_parser._iterfind_impl(path, *args, **kwargs)
@property
def index(self):
return self._mzml_parser.index
@property
def _offset_index(self):
return self._mzml_parser._offset_index
@property
def default_index(self):
return self._mzml_parser.default_index
def _get_time(self, scan):
return self._mzml_parser._get_time(scan)
@property
def mzml_parser(self):
return self._mzml_parser
def _task_map_iterator(self):
"""Returns the :class:`Iteratable` to use when dealing work items onto the input IPC
queue used by :meth:`map`
Returns
-------
:class:`Iteratable`
"""
return iter(self.index[self._default_iter_tag])
def read(self, n=-1):
return self._mzml_parser.read(n)
def reset(self):
self._mzml_parser.reset()
def seek(self, offset, whence=0):
self._mzml_parser.seek(offset, whence)
def tell(self):
return self._mzml_parser.tell()
[docs]
def get_dataset(self, name):
'''Get an HDF5 dataset by its name or path relative to
the root node.
.. warning::
Because this accesses HDF5 data directly, it may be possible to mutate
the underlying file if :attr:`allow_updates` is :const:`True`.
Parameters
----------
name : :class:`str`
The dataset name or path.
Returns
-------
:class:`h5py.Dataset` or :class:`h5py.Group`
Raises
------
KeyError :
The name is not found.
'''
return self.handle[name]
[docs]
def read(source, dtype=None):
"""Parse `source` and iterate through spectra.
Parameters
----------
source : str or file
A path to a target mzMLb file or the file object itself.
dtype : type or dict, optional
dtype to convert arrays to, one for both m/z and intensity arrays or one for each key.
If :py:class:`dict`, keys should be 'm/z array' and 'intensity array'.
Returns
-------
out : iterator
An iterator over the dicts with spectrum properties.
"""
reader = MzMLb(source, dtype=dtype)
return reader
# The MzMLb class is detatched from the normal :class:`FileReader`-based inheritance tree,
# this grafts it back on for :func:`isinstance` and :func:`issubclass` tests at least.
FileReader.register(MzMLb)
version_info = xml._make_version_info(MzMLb)
# chain = aux._make_chain(read, 'read')
chain = aux.ChainBase._make_chain(MzMLb)