Source code for MDAnalysis.coordinates.chemfiles
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# MDAnalysis --- https://www.mdanalysis.org
# Copyright (c) 2006-2017 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the GNU Public Licence, v2 or any higher version
#
# Please cite your use of MDAnalysis in published work:
#
# R. J. Gowers, M. Linke, J. Barnoud, T. J. E. Reddy, M. N. Melo, S. L. Seyler,
# D. L. Dotson, J. Domanski, S. Buchoux, I. M. Kenney, and O. Beckstein.
# MDAnalysis: A Python package for the rapid analysis of molecular dynamics
# simulations. In S. Benthall and S. Rostrup editors, Proceedings of the 15th
# Python in Science Conference, pages 102-109, Austin, TX, 2016. SciPy.
#
# N. Michaud-Agrawal, E. J. Denning, T. B. Woolf, and O. Beckstein.
# MDAnalysis: A Toolkit for the Analysis of Molecular Dynamics Simulations.
# J. Comput. Chem. 32 (2011), 2319--2327, doi:10.1002/jcc.21787
#
"""
Reading trajectory with Chemfiles --- :mod:`MDAnalysis.coordinates.chemfiles`
=============================================================================
Classes to read and write files using the `chemfiles`_ library. This library
provides C++ implementation of multiple formats readers and writers, the full
list if available `here <formats>`_.
.. _chemfiles: https://chemfiles.org
.. _formats: http://chemfiles.org/chemfiles/latest/formats.html
Classes
-------
.. autoclass:: ChemfilesReader
.. autoclass:: ChemfilesWriter
"""
from __future__ import absolute_import, division
from __future__ import print_function, unicode_literals
import numpy as np
from six import string_types
from distutils.version import LooseVersion
import warnings
from . import base, core
try:
    import chemfiles
except ImportError:
    HAS_CHEMFILES = False
else:
    HAS_CHEMFILES = True
MIN_CHEMFILES_VERSION = LooseVersion("0.9")
MAX_CHEMFILES_VERSION = LooseVersion("0.10")
def check_chemfiles_version():
    """Check an appropriate Chemfiles is available
    Returns True if a usable chemfiles version is available
    .. versionadded:: 1.0.0
    """
    if not HAS_CHEMFILES:
        warnings.warn(
            "No Chemfiles package found.  "
            "Try installing with 'pip install chemfiles'"
        )
        return False
    version = LooseVersion(chemfiles.__version__)
    wrong = version < MIN_CHEMFILES_VERSION or version >= MAX_CHEMFILES_VERSION
    if wrong:
        warnings.warn(
            "unsupported Chemfiles version {}, we need a version >{} and <{}"
            .format(version, MIN_CHEMFILES_VERSION, MAX_CHEMFILES_VERSION)
        )
    return not wrong
[docs]class ChemfilesReader(base.ReaderBase):
    """Coordinate reader using chemfiles.
    The file format to used is guessed based on the file extension. If no
    matching format is found, a ``ChemfilesError`` is raised. It is also
    possible to manually specify the format to use for a given file.
    .. versionadded:: 1.0.0
    """
    format = 'chemfiles'
    units = {'time': 'fs', 'length': 'Angstrom'}
    def __init__(self, filename, chemfiles_format="", **kwargs):
        """
        Parameters
        ----------
        filename : chemfiles.Trajectory or str
            the chemfiles object to read or filename to read
        chemfiles_format : str (optional)
            if *filename* was a string, use the given format name instead of
            guessing from the extension. The `list of supported formats
            <formats>`_ and the associated names is available in chemfiles
            documentation.
        **kwargs : dict
            General reader arguments.
        .. _formats: http://chemfiles.org/chemfiles/latest/formats.html
        """
        if not check_chemfiles_version():
            raise RuntimeError("Please install Chemfiles > {}"
                               "".format(MIN_CHEMFILES_VERSION))
        super(ChemfilesReader, self).__init__(filename, **kwargs)
        self._format = chemfiles_format
        self._cached_n_atoms = None
        self._open()
        self.ts = self._Timestep(self.n_atoms, **self._ts_kwargs)
        self.next()
    @staticmethod
    def _format_hint(thing):
        """Can this Reader read *thing*"""
        # nb, filename strings can still get passed through if
        # format='CHEMFILES' is used
        return HAS_CHEMFILES and isinstance(thing, chemfiles.Trajectory)
    def _open(self):
        self._closed = False
        self._step = 0
        self._frame = -1
        if isinstance(self.filename, chemfiles.Trajectory):
            self._file = self.filename
        else:
            self._file = chemfiles.Trajectory(self.filename, 'r', self._format)
    def close(self):
        """close reader"""
        if not self._closed:
            self._file.close()
            self._closed = True
    @property
    def n_frames(self):
        """number of frames in trajectory"""
        return self._file.nsteps
    @property
    def n_atoms(self):
        """number of atoms in the first frame of the trajectory"""
        if self._cached_n_atoms is None:
            self._cached_n_atoms = len(self._file.read_step(0).atoms)
        return self._cached_n_atoms
    def _reopen(self):
        """reopen trajectory"""
        self.close()
        self._open()
    def _read_frame(self, i):
        """read frame i"""
        self._step = i
        return self._read_next_timestep()
    def _read_next_timestep(self, ts=None):
        """copy next frame into timestep"""
        if self._step >= self.n_frames:
            raise IOError('trying to go over trajectory limit')
        if ts is None:
            ts = self.ts
        self.ts = ts
        frame = self._file.read_step(self._step)
        self._frame_to_ts(frame, ts)
        ts.frame = frame.step
        self._step += 1
        return ts
    def _frame_to_ts(self, frame, ts):
        """convert a chemfiles frame to a :class:`TimeStep`"""
        if len(frame.atoms) != self.n_atoms:
            raise IOError(
                "The number of atom changed in the trajectory. "
                "This is not supported by MDAnalysis."
            )
        ts.dimensions[:] = frame.cell.lengths + frame.cell.angles
        ts.positions[:] = frame.positions[:]
        if frame.has_velocities():
            ts.has_velocities = True
            ts.velocities[:] = frame.velocities[:]
    def Writer(self, filename, n_atoms=None, **kwargs):
        """Return writer for trajectory format"""
        if n_atoms is None:
            n_atoms = self.n_atoms
        return ChemfilesWriter(filename, n_atoms, **kwargs)
[docs]class ChemfilesWriter(base.WriterBase):
    """
    Coordinate writer using chemfiles.
    The file format to used is guessed based on the file extension. If no
    matching format is found, a ``ChemfilesError`` is raised. It is also
    possible to manually specify the format to use for a given file.
    Chemfiles support writting to files with varying number of atoms if the
    underlying format support it. This is the case of most of text-based
    formats.
    .. versionadded:: 1.0.0
    """
    format = 'chemfiles'
    multiframe = True
    # chemfiles mostly[1] uses these units for the in-memory representation,
    # and convert into the file format units when writing.
    #
    # [1] mostly since some format don't have a specified unit
    # (XYZ for example), so then chemfiles just assume they are in A and fs.
    units = {'time': 'fs', 'length': 'Angstrom'}
    def __init__(self, filename, n_atoms=0, mode="w", chemfiles_format="", topology=None, **kwargs):
        """
        Parameters
        ----------
        filename: str
            filename of trajectory file.
        n_atoms: int
            number of atoms in the trajectory to write. This value is not
            used and can vary during trajectory, if the underlying format
            support it
        mode : str (optional)
            file opening mode: use 'a' to append to an existing file or 'w' to
            create a new file
        chemfiles_format : str (optional)
            use the given format name instead of guessing from the extension.
            The `list of supported formats <formats>`_ and the associated names
            is available in chemfiles documentation.
        topology : Universe or AtomGroup (optional)
            use the topology from this :class:`~MDAnalysis.core.groups.AtomGroup`
            or :class:`~MDAnalysis.core.universe.Universe` to write all the
            timesteps to the file
        **kwargs : dict
            General writer arguments.
        .. _formats: http://chemfiles.org/chemfiles/latest/formats.html
        """
        if not check_chemfiles_version():
            raise RuntimeError("Please install Chemfiles > {}"
                               "".format(MIN_CHEMFILES_VERSION))
        self.filename = filename
        self.n_atoms = n_atoms
        if mode != "a" and mode != "w":
            raise IOError("Expected 'a' or 'w' as mode in ChemfilesWriter")
        self._file = chemfiles.Trajectory(self.filename, mode, chemfiles_format)
        self._closed = False
        if topology is not None:
            if isinstance(topology, string_types):
                self._file.set_topology(topology)
            else:
                topology = self._topology_to_chemfiles(topology, n_atoms)
                self._file.set_topology(topology)
    def close(self):
        """Close the trajectory file and finalize the writing"""
        if hasattr(self, "_closed") and not self._closed:
            self._file.close()
            self._closed = True
    def _write_next_frame(self, obj):
        """Write information associated with ``obj`` at current frame into trajectory.
        Topology for the output is taken from the `obj` or default to the value
        of the `topology` keyword supplied to the :class:`ChemfilesWriter`
        constructor.
        If `obj` contains velocities, and the underlying format supports it, the
        velocities are writen to the file. Writing forces is unsupported at the
        moment.
        Parameters
        ----------
        obj : AtomGroup or Universe
            The :class:`~MDAnalysis.core.groups.AtomGroup` or
            :class:`~MDAnalysis.core.universe.Universe` to write.
        """
        # TODO 2.0: Remove timestep logic
        if hasattr(obj, "atoms"):
            if hasattr(obj, 'universe'):
                # For AtomGroup and children (Residue, ResidueGroup, Segment)
                ts_full = obj.universe.trajectory.ts
                if ts_full.n_atoms == obj.atoms.n_atoms:
                    ts = ts_full
                else:
                    # Only populate a time step with the selected atoms.
                    ts = ts_full.copy_slice(atoms.indices)
            elif hasattr(obj, 'trajectory'):
                # For Universe only --- get everything
                ts = obj.trajectory.ts
        else:
            if isinstance(obj, base.Timestep):
                ts = obj
                topology = None
            else:
                raise TypeError("No Timestep found in obj argument")
        frame = self._timestep_to_chemfiles(ts)
        frame.topology = self._topology_to_chemfiles(obj, len(frame.atoms))
        self._file.write(frame)
    def _timestep_to_chemfiles(self, ts):
        """
        Convert a Timestep to a chemfiles Frame
        """
        # TODO: CONVERTERS?
        frame = chemfiles.Frame()
        frame.resize(ts.n_atoms)
        if ts.has_positions:
            frame.positions[:] = ts.positions[:]
        if ts.has_velocities:
            frame.add_velocities()
            frame.velocities[:] = ts.velocities[:]
        frame.cell = chemfiles.UnitCell(*ts.dimensions)
        return frame
    def _topology_to_chemfiles(self, obj, n_atoms):
        """
        Convert an AtomGroup or an Universe to a chemfiles Topology
        """
        topology = chemfiles.Topology()
        if not hasattr(obj, "atoms"):
            # use an empty topology
            topology.resize(n_atoms)
            return topology
        # (1) add all atoms to the topology
        residues = {}
        for atom in obj.atoms:
            name = getattr(atom, 'name', "")
            type = getattr(atom, 'type', name)
            chemfiles_atom = chemfiles.Atom(name, type)
            if hasattr(atom, 'altLoc'):
                chemfiles_atom["altloc"] = str(atom.altLoc)
            if hasattr(atom, 'segid'):
                chemfiles_atom["segid"] = str(atom.segid)
            if hasattr(atom, 'segindex'):
                chemfiles_atom["segindex"] = int(atom.segindex)
            if hasattr(atom, 'resid'):
                resname = getattr(atom, 'resname', "")
                if atom.resid not in residues.keys():
                    residues[atom.resid] = chemfiles.Residue(resname, atom.resid)
                residue = residues[atom.resid]
                atom_idx = len(topology.atoms)
                residue.atoms.append(atom_idx)
                if hasattr(atom, "record_type"):
                    # set corresponding chemfiles residue property
                    if atom.record_type == "ATOM":
                        residue["is_standard_pdb"] = True
                    else:
                        residue["is_standard_pdb"] = False
            topology.atoms.append(chemfiles_atom)
        # (2) add residues to the topology
        for residue in residues.values():
            topology.residues.append(residue)
        # (3) add bonds to the topology
        for bond in getattr(obj, "bonds", []):
            topology.add_bond(bond.atoms[0].ix, bond.atoms[1].ix)
        return topology