Source code for MDAnalysis.visualization.streamlines_3D

# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# MDAnalysis --- https://www.mdanalysis.org
# Copyright (c) 2006-2017 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the Lesser GNU Public Licence, v2.1 or any higher version
#
# Please cite your use of MDAnalysis in published work:
#
# R. J. Gowers, M. Linke, J. Barnoud, T. J. E. Reddy, M. N. Melo, S. L. Seyler,
# D. L. Dotson, J. Domanski, S. Buchoux, I. M. Kenney, and O. Beckstein.
# MDAnalysis: A Python package for the rapid analysis of molecular dynamics
# simulations. In S. Benthall and S. Rostrup editors, Proceedings of the 15th
# Python in Science Conference, pages 102-109, Austin, TX, 2016. SciPy.
# doi: 10.25080/majora-629e541a-00e
#
# N. Michaud-Agrawal, E. J. Denning, T. B. Woolf, and O. Beckstein.
# MDAnalysis: A Toolkit for the Analysis of Molecular Dynamics Simulations.
# J. Comput. Chem. 32 (2011), 2319--2327, doi:10.1002/jcc.21787
#

"""Streamplots (3D) --- :mod:`MDAnalysis.visualization.streamlines_3D`
======================================================================

:Authors: Tyler Reddy and Matthieu Chavent
:Year: 2014
:Copyright: Lesser GNU Public License v2.1+


The :func:`generate_streamlines_3d` function can generate a 3D flow field from
an MD trajectory, for instance, lipid molecules in a virus capsid. It can make
use of multiple cores to perform the analysis in parallel (using
:mod:`multiprocessing`).

.. rubric:: References

.. footbibliography::

See Also
--------
MDAnalysis.visualization.streamlines : streamplots in 2D


.. autofunction:: generate_streamlines_3d

"""
import multiprocessing

import numpy as np
import numpy.testing
import scipy
import scipy.spatial.distance

import MDAnalysis


def determine_container_limits(
    topology_file_path, trajectory_file_path, buffer_value
):
    """Calculate the extent of the atom coordinates + buffer.

    A function for the parent process which takes the input trajectory,
    calculates the coordinate limits of the container for the system, and
    returns these limits.

    Parameters
    ----------
    topology_file_path : str
        topology file name
    trajectory_file_path : str
        trajectory file name
    buffer_value : float
        buffer value (padding) in +/- {x, y, z}
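
    Returns
    -------
    tuple_of_limits : tuple
        ``(x_min, x_max, y_min, y_max, z_min, z_max)`` of the first-frame
        coordinates, with `buffer_value` subtracted from each minimum and
        added to each maximum

    Examples
    --------
    A minimal sketch of calling this helper directly (file names are
    placeholders)::

        limits = determine_container_limits("topol.gro", "traj.xtc",
                                             buffer_value=10.0)
        x_min, x_max, y_min, y_max, z_min, z_max = limits
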
    """
    universe_object = MDAnalysis.Universe(
        topology_file_path, trajectory_file_path
    )
    all_atom_selection = universe_object.select_atoms(
        "all"
    )  # select all particles
    all_atom_coordinate_array = all_atom_selection.positions
    x_min, x_max, y_min, y_max, z_min, z_max = [
        all_atom_coordinate_array[..., 0].min(),
        all_atom_coordinate_array[..., 0].max(),
        all_atom_coordinate_array[..., 1].min(),
        all_atom_coordinate_array[..., 1].max(),
        all_atom_coordinate_array[..., 2].min(),
        all_atom_coordinate_array[..., 2].max(),
    ]
    tuple_of_limits = (
        x_min - buffer_value,
        x_max + buffer_value,
        y_min - buffer_value,
        y_max + buffer_value,
        z_min - buffer_value,
        z_max + buffer_value,
    )  # using buffer_value to catch particles near edges
    return tuple_of_limits


def produce_grid(tuple_of_limits, grid_spacing):
    """Produce a 3D grid for the simulation system.

    The partitioning is based on the tuple of Cartesian Coordinate limits
    calculated in an earlier step.

    Parameters
    ----------
    tuple_of_limits : tuple
        ``x_min, x_max, y_min, y_max, z_min, z_max``
    grid_spacing : float
        grid size in all directions in ångström

    Returns
    -------
    grid : array
        ``numpy.mgrid[x_min:x_max:grid_spacing, y_min:y_max:grid_spacing, z_min:z_max:grid_spacing]``
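
    Examples
    --------
    A small illustrative grid (limits and spacing chosen arbitrarily)::

        grid = produce_grid((0.0, 100.0, 0.0, 100.0, 0.0, 50.0), 20.0)
        grid.shape  # (3, 5, 5, 3): x, y, z coordinates on a 5 x 5 x 3 lattice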

    """
    x_min, x_max, y_min, y_max, z_min, z_max = tuple_of_limits
    grid = np.mgrid[
        x_min:x_max:grid_spacing,
        y_min:y_max:grid_spacing,
        z_min:z_max:grid_spacing,
    ]
    return grid


def split_grid(grid, num_cores):
    """Split the grid into blocks of vertices.

    Take the overall `grid` for the system and split it into lists of cube
    vertices that can be distributed to each core.

    Parameters
    ----------
    grid : numpy.ndarray
        the ``numpy.mgrid`` output for the system: the x, y and z coordinate
        arrays stacked along the first axis
    num_cores : int
        number of partitions to generate

    Returns
    -------
    list_dictionaries_for_cores : list of dict
        one dictionary per core, mapping cube numbers to their centroid and
        vertex list
    total_cubes : int
        total number of cubes in the grid
    num_sheets : int
        number of x-sheets in the grid
    delta_array_shape : tuple
        target shape of the dx/dy/dz output arrays (one less than the grid
        shape in each dimension)

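    Examples
    --------
    A minimal sketch, continuing from the :func:`produce_grid` example above
    (limits and spacing are arbitrary)::

        grid = produce_grid((0.0, 100.0, 0.0, 100.0, 0.0, 50.0), 20.0)
        core_dicts, total_cubes, num_sheets, delta_shape = split_grid(
            grid, num_cores=4
        )
        # total_cubes == 32 and delta_shape == (4, 4, 2) for the
        # 5 x 5 x 3 vertex lattice above
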
    """
    # unpack the x,y,z mgrid arrays
    x, y, z = grid
    num_z_values = z.shape[-1]
    num_sheets = z.shape[0]
    delta_array_shape = tuple(
        [n - 1 for n in x.shape]
    )  # the final target shape for return delta arrays is n-1 in each dimension

    ordered_list_per_sheet_x_values = []
    for x_sheet in x:
        # each x_sheet holds the same x value in every element
        array_all_x_values_current_sheet = x_sheet.flatten()
        ordered_list_per_sheet_x_values.append(
            array_all_x_values_current_sheet
        )
    ordered_list_per_sheet_y_values = []
    for y_columns in y:
        array_all_y_values_current_sheet = y_columns.flatten()
        ordered_list_per_sheet_y_values.append(
            array_all_y_values_current_sheet
        )
    ordered_list_per_sheet_z_values = []
    for z_slices in z:
        array_all_z_values_current_sheet = z_slices.flatten()
        ordered_list_per_sheet_z_values.append(
            array_all_z_values_current_sheet
        )

    ordered_list_cartesian_coordinates_per_sheet = []
    for x_sheet_coords, y_sheet_coords, z_sheet_coords in zip(
        ordered_list_per_sheet_x_values,
        ordered_list_per_sheet_y_values,
        ordered_list_per_sheet_z_values,
    ):
        ordered_list_cartesian_coordinates_per_sheet.append(
            list(zip(x_sheet_coords, y_sheet_coords, z_sheet_coords))
        )
    array_ordered_cartesian_coords_per_sheet = np.array(
        ordered_list_cartesian_coordinates_per_sheet
    )
    # now I'm going to want to build cubes in an ordered fashion, and in such a way that I can track the index /
    # centroid of each cube for domain decomposition / reconstruction and mayavi mlab.flow() input
    # cubes will be formed from N - 1 base sheets combined with subsequent sheets
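    # note: each flattened sheet is ordered with z varying fastest
    # (index = y * num_z_values + z), so two consecutive indices form a
    # z-adjacent pair in one y-column and an offset of num_z_values gives the
    # matching pair in the next y-column; combining these pairs on the base
    # sheet and on the next x-sheet yields the 8 vertices of one cube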
    current_base_sheet = 0
    dictionary_cubes_centroids_indices = {}
    cube_counter = 0
    while current_base_sheet < num_sheets - 1:
        current_base_sheet_array = array_ordered_cartesian_coords_per_sheet[
            current_base_sheet
        ]
        current_top_sheet_array = array_ordered_cartesian_coords_per_sheet[
            current_base_sheet + 1
        ]  # the points of the sheet 'to the right' in the grid
        current_index = 0
        while current_index < current_base_sheet_array.shape[0] - num_z_values:
            # iterate through all the indices in each of the sheet arrays (careful to avoid extra
            # points not needed for cubes)
            # start at the bottom of a given 4-point column and work up
            column_z_level = 0
            while column_z_level < num_z_values - 1:
                current_list_cube_vertices = []
                first_two_vertices_base_sheet = current_base_sheet_array[
                    current_index : current_index + 2, ...
                ].tolist()
                first_two_vertices_top_sheet = current_top_sheet_array[
                    current_index : current_index + 2, ...
                ].tolist()
                next_two_vertices_base_sheet = current_base_sheet_array[
                    current_index
                    + num_z_values : 2
                    + num_z_values
                    + current_index,
                    ...,
                ].tolist()
                next_two_vertices_top_sheet = current_top_sheet_array[
                    current_index
                    + num_z_values : 2
                    + num_z_values
                    + current_index,
                    ...,
                ].tolist()
                for vertex_set in [
                    first_two_vertices_base_sheet,
                    first_two_vertices_top_sheet,
                    next_two_vertices_base_sheet,
                    next_two_vertices_top_sheet,
                ]:
                    current_list_cube_vertices.extend(vertex_set)
                vertex_array = np.array(current_list_cube_vertices)
                assert vertex_array.shape == (
                    8,
                    3,
                ), "vertex_array has incorrect shape"
                cube_centroid = np.average(
                    np.array(current_list_cube_vertices), axis=0
                )
                dictionary_cubes_centroids_indices[cube_counter] = {
                    "centroid": cube_centroid,
                    "vertex_list": current_list_cube_vertices,
                }
                cube_counter += 1
                current_index += 1
                column_z_level += 1
                if column_z_level == num_z_values - 1:
                    # the inner loop will break, but current_index must also
                    # be incremented
                    current_index += 1
        current_base_sheet += 1
    total_cubes = len(dictionary_cubes_centroids_indices)

    # produce an array of pseudo cube indices (the dictionary keys, which are integer cube numbers):
    pseudo_cube_indices = np.arange(0, total_cubes)
    sublist_of_cube_indices_per_core = np.array_split(
        pseudo_cube_indices, num_cores
    )
    # sublist_of_cube_indices_per_core is a list of arrays of cube numbers /
    # keys in the original dictionary; now produce a list of dictionaries,
    # one per core, each containing its assigned cubes
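    # note: dict.popitem() removes entries in LIFO order (Python 3.7+ dicts),
    # so the cubes handed to each core are not necessarily those in its index
    # sub-array; this is harmless because every cube keeps its integer key and
    # the parent process later reassembles the dx/dy/dz arrays by cube number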
    list_dictionaries_for_cores = []
    subdictionary_counter = 0
    for array_cube_indices in sublist_of_cube_indices_per_core:
        current_core_dictionary = {}
        items_to_pop = len(array_cube_indices)
        items_popped = 0
        while items_popped < items_to_pop:
            key, value = dictionary_cubes_centroids_indices.popitem()
            current_core_dictionary.update({key: value})
            items_popped += 1
        list_dictionaries_for_cores.append(current_core_dictionary)
        subdictionary_counter += 1
    return (
        list_dictionaries_for_cores,
        total_cubes,
        num_sheets,
        delta_array_shape,
    )


def per_core_work(
    start_frame_coord_array,
    end_frame_coord_array,
    dictionary_cube_data_this_core,
    MDA_selection,
    start_frame,
    end_frame,
):
    """Run the analysis on one core.

    The code to perform on a given core given the dictionary of cube data.
    """
    list_previous_frame_centroids = []
    list_previous_frame_indices = []
    # define some utility functions for trajectory iteration:

    def point_in_cube(
        array_point_coordinates, list_cube_vertices, cube_centroid
    ):
        """Determine if an array of coordinates are within a cube."""
        # the simulation particle point can't be more than half the cube side length away from the cube centroid in
        # any given dimension:
        array_cube_vertices = np.array(list_cube_vertices)
        cube_half_side_length = (
            scipy.spatial.distance.pdist(
                array_cube_vertices, "euclidean"
            ).min()
            / 2.0
        )
        array_cube_vertex_distances_from_centroid = (
            scipy.spatial.distance.cdist(
                array_cube_vertices, cube_centroid[np.newaxis, :]
            )
        )
        np.testing.assert_allclose(
            array_cube_vertex_distances_from_centroid.min(),
            array_cube_vertex_distances_from_centroid.max(),
            rtol=0,
            atol=1.5e-4,
            err_msg="not all cube vertex to centroid distances are the same, "
            "so not a true cube",
        )
        absolute_delta_coords = np.absolute(
            np.subtract(array_point_coordinates, cube_centroid)
        )
        absolute_delta_x_coords = absolute_delta_coords[..., 0]
        indices_delta_x_acceptable = np.where(
            absolute_delta_x_coords <= cube_half_side_length
        )
        absolute_delta_y_coords = absolute_delta_coords[..., 1]
        indices_delta_y_acceptable = np.where(
            absolute_delta_y_coords <= cube_half_side_length
        )
        absolute_delta_z_coords = absolute_delta_coords[..., 2]
        indices_delta_z_acceptable = np.where(
            absolute_delta_z_coords <= cube_half_side_length
        )
        intersection_xy_acceptable_arrays = np.intersect1d(
            indices_delta_x_acceptable[0], indices_delta_y_acceptable[0]
        )
        overall_indices_points_in_current_cube = np.intersect1d(
            intersection_xy_acceptable_arrays, indices_delta_z_acceptable[0]
        )
        return overall_indices_points_in_current_cube

    def update_dictionary_point_in_cube_start_frame(
        array_simulation_particle_coordinates, dictionary_cube_data_this_core
    ):
        """Basically update the cube dictionary objects assigned to this core to contain a new key/value pair
        corresponding to the indices of the relevant particles that fall within a given cube. Also, for a given cube,
        store a key/value pair for the centroid of the particles that fall within the cube.
        """
        cube_counter = 0
        for key, cube in dictionary_cube_data_this_core.items():
            index_list_in_cube = point_in_cube(
                array_simulation_particle_coordinates,
                cube["vertex_list"],
                cube["centroid"],
            )
            cube["start_frame_index_list_in_cube"] = index_list_in_cube
            if (
                len(index_list_in_cube) > 0
            ):  # if there's at least one particle in this cube
                centroid_particles_in_cube = np.average(
                    array_simulation_particle_coordinates[index_list_in_cube],
                    axis=0,
                )
                cube["centroid_of_particles_first_frame"] = (
                    centroid_particles_in_cube
                )
            else:  # empty cube
                cube["centroid_of_particles_first_frame"] = None
            cube_counter += 1

    def update_dictionary_end_frame(
        array_simulation_particle_coordinates, dictionary_cube_data_this_core
    ):
        """Update the cube dictionary objects again as appropriate for the second and final frame."""
        cube_counter = 0
        for key, cube in dictionary_cube_data_this_core.items():
            # if there were no particles in the cube in the first frame, then set dx,dy,dz each to 0
            if cube["centroid_of_particles_first_frame"] is None:
                cube["dx"] = 0
                cube["dy"] = 0
                cube["dz"] = 0
            else:  # there was at least one particle in the starting cube so we can get dx,dy,dz centroid values
                new_coordinate_array_for_particles_starting_in_this_cube = (
                    array_simulation_particle_coordinates[
                        cube["start_frame_index_list_in_cube"]
                    ]
                )
                new_centroid_for_particles_starting_in_this_cube = np.average(
                    new_coordinate_array_for_particles_starting_in_this_cube,
                    axis=0,
                )
                cube["centroid_of_paticles_final_frame"] = (
                    new_centroid_for_particles_starting_in_this_cube
                )
                delta_centroid_array_this_cube = (
                    new_centroid_for_particles_starting_in_this_cube
                    - cube["centroid_of_particles_first_frame"]
                )
                cube["dx"] = delta_centroid_array_this_cube[0]
                cube["dy"] = delta_centroid_array_this_cube[1]
                cube["dz"] = delta_centroid_array_this_cube[2]
            cube_counter += 1

    # now that the parent process is dealing with the universe object & grabbing required coordinates, each child
    # process only needs to take the coordinate arrays & perform the operations with its assigned cubes (no more file
    #  opening and trajectory iteration on each core--which I'm hoping will substantially reduce the physical memory
    # footprint of my 3D streamplot code)
    update_dictionary_point_in_cube_start_frame(
        start_frame_coord_array, dictionary_cube_data_this_core
    )
    update_dictionary_end_frame(
        end_frame_coord_array, dictionary_cube_data_this_core
    )
    return dictionary_cube_data_this_core


def produce_coordinate_arrays_single_process(
    topology_file_path,
    trajectory_file_path,
    MDA_selection,
    start_frame,
    end_frame,
):
    """Generate coordinate arrays.

    To reduce the memory footprint, make a single MDAnalysis selection in the
    parent process and extract the desired coordinate arrays; these arrays can
    then be sent to all child processes rather than having each child process
    open its own trajectory and waste memory.

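    Examples
    --------
    A minimal sketch (file names and selection string are placeholders)::

        start_coords, end_coords = produce_coordinate_arrays_single_process(
            "topol.gro", "traj.xtc", "name PO4", start_frame=2, end_frame=3
        )
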
    """
    universe_object = MDAnalysis.Universe(
        topology_file_path, trajectory_file_path
    )
    relevant_particles = universe_object.select_atoms(MDA_selection)
    # pull out coordinate arrays from desired frames:
    for ts in universe_object.trajectory:
        if ts.frame > end_frame:
            break  # stop here
        if ts.frame == start_frame:
            start_frame_relevant_particle_coordinate_array_xyz = (
                relevant_particles.positions
            )
        elif ts.frame == end_frame:
            end_frame_relevant_particle_coordinate_array_xyz = (
                relevant_particles.positions
            )
        else:
            continue
    return (
        start_frame_relevant_particle_coordinate_array_xyz,
        end_frame_relevant_particle_coordinate_array_xyz,
    )


def generate_streamlines_3d(
    topology_file_path,
    trajectory_file_path,
    grid_spacing,
    MDA_selection,
    start_frame,
    end_frame,
    xmin,
    xmax,
    ymin,
    ymax,
    zmin,
    zmax,
    maximum_delta_magnitude=2.0,
    num_cores="maximum",
):
    r"""Produce the x, y and z components of a 3D streamplot data set.

    Parameters
    ----------
    topology_file_path : str
        Absolute path to the topology file
    trajectory_file_path : str
        Absolute path to the trajectory file. It will normally be desirable
        to filter the trajectory with a tool such as GROMACS
        :program:`g_filter` (see :footcite:p:`Chavent2014`)
    grid_spacing : float
        The spacing between grid lines (angstroms)
    MDA_selection : str
        MDAnalysis selection string
    start_frame : int
        First frame number to parse
    end_frame : int
        Last frame number to parse
    xmin : float
        Minimum coordinate boundary for x-axis (angstroms)
    xmax : float
        Maximum coordinate boundary for x-axis (angstroms)
    ymin : float
        Minimum coordinate boundary for y-axis (angstroms)
    ymax : float
        Maximum coordinate boundary for y-axis (angstroms)
    zmin : float
        Minimum coordinate boundary for z-axis (angstroms)
    zmax : float
        Maximum coordinate boundary for z-axis (angstroms)
    maximum_delta_magnitude : float
        Absolute value of the largest displacement tolerated for the centroid
        of a group of particles (angstroms). Values above this displacement
        will not count in the streamplot (treated as excessively large
        displacements crossing the periodic boundary)
    num_cores : int or 'maximum' (optional)
        The number of cores to use. (Default 'maximum' uses all available
        cores)

    Returns
    -------
    dx_array : array of floats
        An array object containing the displacements in the x direction
    dy_array : array of floats
        An array object containing the displacements in the y direction
    dz_array : array of floats
        An array object containing the displacements in the z direction

    Examples
    --------
    Generate 3D streamlines and visualize in `mayavi`_::

        import numpy as np

        import MDAnalysis
        import MDAnalysis.visualization.streamlines_3D

        import mayavi, mayavi.mlab

        # assign coordinate system limits and grid spacing:
        x_lower, x_upper = -8.73, 1225.96
        y_lower, y_upper = -12.58, 1224.34
        z_lower, z_upper = -300, 300
        grid_spacing_value = 20

        x1, y1, z1 = MDAnalysis.visualization.streamlines_3D.generate_streamlines_3d(
            'testing.gro', 'testing_filtered.xtc',
            xmin=x_lower, xmax=x_upper,
            ymin=y_lower, ymax=y_upper,
            zmin=z_lower, zmax=z_upper,
            grid_spacing=grid_spacing_value, MDA_selection='name PO4',
            start_frame=2, end_frame=3, num_cores='maximum')

        x, y, z = np.mgrid[x_lower:x_upper:x1.shape[0]*1j,
                           y_lower:y_upper:y1.shape[1]*1j,
                           z_lower:z_upper:z1.shape[2]*1j]

        # plot with mayavi:
        fig = mayavi.mlab.figure(bgcolor=(1.0, 1.0, 1.0), size=(800, 800),
                                 fgcolor=(0, 0, 0))
        for z_value in np.arange(z_lower, z_upper, grid_spacing_value):
            st = mayavi.mlab.flow(x, y, z, x1, y1, z1, line_width=1,
                                  seedtype='plane',
                                  integration_direction='both')
            st.streamline_type = 'tube'
            st.tube_filter.radius = 2
            st.seed.widget.origin = np.array([x_lower, y_upper, z_value])
            st.seed.widget.point1 = np.array([x_upper, y_upper, z_value])
            st.seed.widget.point2 = np.array([x_lower, y_lower, z_value])
            st.seed.widget.resolution = int(x1.shape[0])
            st.seed.widget.enabled = False
        mayavi.mlab.axes(extent=[0, 1200, 0, 1200, -300, 300])
        fig.scene.z_plus_view()
        mayavi.mlab.savefig('test_streamplot_3D.png')
        # more compelling examples can be produced for vesicles and other
        # spherical systems

    .. image:: test_streamplot_3D.png

    See Also
    --------
    MDAnalysis.visualization.streamlines.generate_streamlines

    .. _mayavi: http://docs.enthought.com/mayavi/mayavi/
    """
    # work out the number of cores to use:
    if num_cores == "maximum":
        num_cores = multiprocessing.cpu_count()  # use all available cores
    else:
        num_cores = num_cores  # use the value specified by the user
    # assert isinstance(num_cores, (int, long)), "The number of specified cores must (of course) be an integer."
    np.seterr(all="warn", over="raise")
    parent_cube_dictionary = {}  # collect all data from child processes here

    def log_result_to_parent(process_dict):
        parent_cube_dictionary.update(process_dict)

    # step 1: produce tuple of cartesian coordinate limits for the first frame
    # tuple_of_limits = determine_container_limits(
    #     topology_file_path=topology_file_path,
    #     trajectory_file_path=trajectory_file_path,
    #     buffer_value=buffer_value)
    tuple_of_limits = (xmin, xmax, ymin, ymax, zmin, zmax)
    # step 2: produce a suitable grid (will assume that grid size / container
    # size does not vary during simulation--or at least not beyond the buffer
    # limit, such that this grid can be used for all subsequent frames)
    grid = produce_grid(
        tuple_of_limits=tuple_of_limits, grid_spacing=grid_spacing
    )
    # step 3: split the grid into a dictionary of cube information that can
    # be sent to each core for processing:
    list_dictionaries_for_cores, total_cubes, num_sheets, delta_array_shape = (
        split_grid(grid=grid, num_cores=num_cores)
    )
    # step 3b: produce required coordinate arrays on a single core to avoid
    # making a universe object on each core:
    start_frame_coord_array, end_frame_coord_array = (
        produce_coordinate_arrays_single_process(
            topology_file_path,
            trajectory_file_path,
            MDA_selection,
            start_frame,
            end_frame,
        )
    )
    # step 4: per process work using the above grid data split
    pool = multiprocessing.Pool(num_cores)
    for sub_dictionary_of_cube_data in list_dictionaries_for_cores:
        pool.apply_async(
            per_core_work,
            args=(
                start_frame_coord_array,
                end_frame_coord_array,
                sub_dictionary_of_cube_data,
                MDA_selection,
                start_frame,
                end_frame,
            ),
            callback=log_result_to_parent,
        )
    pool.close()
    pool.join()
    # at this stage the parent process has a single dictionary with all the
    # cube objects updated from all available cores
    # the 3D streamplot (i.e., the mayavi flow() function) will require
    # separate 3D numpy arrays for dx, dy, dz
    # the shape of each 3D array will unfortunately have to match the mgrid
    # data structure (bit of a pain):
    # (num_sheets - 1, num_sheets - 1, cubes_per_column)
    cubes_per_sheet = int(float(total_cubes) / float(num_sheets - 1))
    # produce dummy zero arrays for dx, dy, dz of the appropriate shape:
    dx_array = np.zeros(delta_array_shape)
    dy_array = np.zeros(delta_array_shape)
    dz_array = np.zeros(delta_array_shape)
    # now use the parent cube dictionary to correctly substitute in
    # dx, dy, dz values
    current_sheet = 0  # which is also the current row
    y_index_current_sheet = 0  # sub row
    z_index_current_column = 0  # column
    total_cubes_current_sheet = 0
    for cube_number in range(0, total_cubes):
        dx_array[
            current_sheet, y_index_current_sheet, z_index_current_column
        ] = parent_cube_dictionary[cube_number]["dx"]
        dy_array[
            current_sheet, y_index_current_sheet, z_index_current_column
        ] = parent_cube_dictionary[cube_number]["dy"]
        dz_array[
            current_sheet, y_index_current_sheet, z_index_current_column
        ] = parent_cube_dictionary[cube_number]["dz"]
        z_index_current_column += 1
        total_cubes_current_sheet += 1
        if z_index_current_column == delta_array_shape[2]:
            # done building current y-column so iterate y value and reset z
            z_index_current_column = 0
            y_index_current_sheet += 1
            if (
                y_index_current_sheet == delta_array_shape[1]
            ):  # current sheet is complete
                current_sheet += 1
                y_index_current_sheet = 0  # restart for new sheet
                z_index_current_column = 0
                total_cubes_current_sheet = 0
    # now reset velocity component values at or above the cutoff to 1.0,
    # because such large displacements tend to be spurious (i.e., PBC jumping)
    dx_array[abs(dx_array) >= maximum_delta_magnitude] = 1.0
    dy_array[abs(dy_array) >= maximum_delta_magnitude] = 1.0
    dz_array[abs(dz_array) >= maximum_delta_magnitude] = 1.0
    return (dx_array, dy_array, dz_array)