# Source code for MDAnalysis.topology.base
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# MDAnalysis --- https://www.mdanalysis.org
# Copyright (c) 2006-2017 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the GNU Public Licence, v2 or any higher version
#
# Please cite your use of MDAnalysis in published work:
#
# R. J. Gowers, M. Linke, J. Barnoud, T. J. E. Reddy, M. N. Melo, S. L. Seyler,
# D. L. Dotson, J. Domanski, S. Buchoux, I. M. Kenney, and O. Beckstein.
# MDAnalysis: A Python package for the rapid analysis of molecular dynamics
# simulations. In S. Benthall and S. Rostrup editors, Proceedings of the 15th
# Python in Science Conference, pages 102-109, Austin, TX, 2016. SciPy.
# doi: 10.25080/majora-629e541a-00e
#
# N. Michaud-Agrawal, E. J. Denning, T. B. Woolf, and O. Beckstein.
# MDAnalysis: A Toolkit for the Analysis of Molecular Dynamics Simulations.
# J. Comput. Chem. 32 (2011), 2319--2327, doi:10.1002/jcc.21787
#
"""
Base topology reader classes --- :mod:`MDAnalysis.topology.base`
================================================================

Derive topology reader classes from the base class in this module. All
topology readers raise :exc:`IOError` upon failing to read a topology
file and :exc:`ValueError` upon failing to make sense of the read data.

Classes
-------

.. autoclass:: TopologyReaderBase
   :members:
   :inherited-members:

"""
from functools import reduce
import itertools
import numpy as np
import warnings
from .. import _PARSERS, _PARSER_HINTS
from ..coordinates.base import IOBase
from ..lib import util
class _Topologymeta(type):
    """Internal: Topology Parser registration voodoo

    When classes which inherit from TopologyReaderBase are *defined*
    this metaclass makes it known to MDAnalysis.  The optional `format`
    attribute and `_format_hint` staticmethod are read:

     - `format` defines the file extension this Parser targets.
     - `_format_hint` defines a function which returns a boolean if the
       Parser can process a particular object

    Eg::

      class ThingParser(TopologyReaderBase):
          format = ['foo', 'bar']

          @staticmethod
          def _format_hint(thing):
              try:
                  import WeirdPackage
              except ImportError:
                  return False
              return isinstance(thing, WeirdPackage.Thing)

    This way there is no strict dependency on "WeirdPackage", but if
    a user supplies a WeirdPackage.Thing the "ThingParser" will be able
    to step up and read it.

    .. versionchanged:: 1.0.0
       Added format_hint functionality
    """
    def __init__(cls, name, bases, classdict):
        """Register the newly defined class under each declared format.

        Parameters
        ----------
        name : str
            name of the class being created
        bases : tuple
            base classes of the class being created
        classdict : dict
            namespace of the class body; only the keys ``format`` and
            ``_format_hint`` are inspected here
        """
        # Initialise the class being created (``cls``) cooperatively,
        # rather than calling ``type.__init__`` on ``type`` itself.
        super().__init__(name, bases, classdict)

        # Classes that do not declare ``format`` in their own body are
        # not registered.
        if 'format' not in classdict:
            return

        # NOTE(review): ``util.asiterable`` presumably turns a lone string
        # into an iterable of one format name -- confirm against lib.util.
        formats = util.asiterable(classdict['format'])

        for fmt_name in formats:
            # Registry keys are stored upper-cased.
            fmt_name = fmt_name.upper()
            _PARSERS[fmt_name] = cls
            if '_format_hint' in classdict:
                # ``__func__`` unwraps the staticmethod object found in the
                # class namespace to get the plain function.
                _PARSER_HINTS[fmt_name] = classdict['_format_hint'].__func__
class TopologyReaderBase(IOBase, metaclass=_Topologymeta):
    """Base class for topology readers

    Parameters
    ----------
    filename : str
        name of the topology file

    All topology readers must define a `parse` method which
    returns a Topology object

    Raises
    ------
    * :exc:`IOError` upon failing to read a topology file
    * :exc:`ValueError` upon failing to make sense of the read data

    Note
    ----
    Earlier revisions of this docstring listed an optional `universe`
    parameter; the constructor defined here accepts only `filename`.

    .. versionadded:: 0.9.0
    .. versionchanged:: 0.9.2
       Added keyword 'universe' to pass to Atom creation.
    """
    def __init__(self, filename):
        # Only the file name is stored here; reading is left to ``parse``.
        self.filename = filename

    def parse(self, **kwargs):  # pragma: no cover
        """Parse the topology file; must be overridden by every subclass.

        Raises
        ------
        NotImplementedError
            always, in this base class
        """
        raise NotImplementedError("Override this in each subclass")
def squash_by(child_parent_ids, *attributes):
    """Collapse per-child arrays into per-parent arrays

    Parameters
    ----------
    child_parent_ids : array-like
        for every child, the id of the parent it belongs to
    *attributes : numpy arrays
        further per-child arrays that are reduced alongside the ids

    Returns
    -------
    child_parents_idx : numpy array
        for every child, the index of its parent within `parent_ids`
    parent_ids : numpy array
        the sorted unique parent ids
    parent_attrs : list of numpy arrays
        one array per entry of `attributes`, holding the value found at
        the first occurrence of each parent id
    """
    parent_ids, first_seen, child_to_parent = np.unique(
        child_parent_ids, return_index=True, return_inverse=True)

    # Pick, for each attribute, the value at the first child of every parent
    parent_attrs = []
    for per_child in attributes:
        parent_attrs.append(per_child[first_seen])

    return child_to_parent, parent_ids, parent_attrs
def change_squash(criteria, to_squash):
    """Squash per atom data to per residue according to changes in resid

    A new residue starts at every position where at least one of the
    `criteria` arrays differs from its previous element.

    Parameters
    ----------
    criteria : list of numpy ndarray
      Arrays which when changing indicate a new residue
    to_squash : list of numpy arrays
      Arrays which get squashed according to the criteria arrays

    Returns
    -------
    residx : numpy array
      The Residue *index* that each Atom gets assigned to. [len(resids)]
    squashed : list of numpy arrays
      The to_squash arrays reduced down to per Residue values; the value
      of the first atom of each residue is used

    Raises
    ------
    ValueError
      If the arrays in `criteria` and `to_squash` differ in length

    Example
    -------
    resids = np.array([2, 2, 3, 3, 2, 2])
    resnames = np.array(['RsA', 'RsA', 'RsB', 'RsB', 'RsC', 'RsC'])
    segids = np.array(['A', 'A', 'A', 'A', 'B', 'B'])
    residx, (new_resids, new_resnames, new_segids) = change_squash(
                                                      (resids,), (resids, resnames, segids))
    # Per atom res index
    residx: [0, 0, 1, 1, 2, 2]
    # Per residue record of each attribute
    new_resids: [2, 3, 2]
    new_resnames: ['RsA', 'RsB', 'RsC']
    new_segids: ['A', 'A', 'B']
    """
    n_atoms = len(criteria[0])
    if not all(len(other) == n_atoms
               for other in itertools.chain(criteria[1:], to_squash)):
        raise ValueError("All arrays must be equally sized")

    # Plain sequences are accepted too; ndarrays pass through unchanged
    criteria = [np.asarray(c) for c in criteria]
    to_squash = [np.asarray(o) for o in to_squash]

    # No atoms means no residues; there is no "first atom" to read from
    if n_atoms == 0:
        return np.zeros(0, dtype=int), [o[:0].copy() for o in to_squash]

    # 1) Detect where any criterion changes between neighbouring atoms;
    #    changed[i] is True when atom i + 1 opens a new residue
    changed = reduce(np.logical_or, (c[:-1] != c[1:] for c in criteria))

    # Index of the first atom of every residue (atom 0 always opens one)
    first_atoms = np.concatenate(([0], np.nonzero(changed)[0] + 1))

    # 2) Per atom record of what residue they belong to: the running count
    #    of changes seen before each atom
    residx = np.zeros(n_atoms, dtype=int)
    residx[1:] = np.cumsum(changed, dtype=int)

    # 3) Per residue record of various attributes, taken from the first
    #    atom of each residue
    # TODO: Check that all atoms of a residue carry the same value
    # Should be the same for self consistency...
    squashed = [o[first_atoms] for o in to_squash]

    return residx, squashed
def reduce_singular(values):
    """Unwrap a one-element array, otherwise convert it to a tuple.

    Parameters
    ----------
    values: array-like
        Array to squash

    Returns
    -------
    values: tuple or single value
        The sole element if `values` has length 1, else all elements
        of `values` as a tuple
    """
    is_single = len(values) == 1
    return values[0] if is_single else tuple(values)