# Source code for MDAnalysis.visualization.streamlines
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# MDAnalysis --- https://www.mdanalysis.org
# Copyright (c) 2006-2017 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the Lesser GNU Public Licence, v2.1 or any higher version
#
# Please cite your use of MDAnalysis in published work:
#
# R. J. Gowers, M. Linke, J. Barnoud, T. J. E. Reddy, M. N. Melo, S. L. Seyler,
# D. L. Dotson, J. Domanski, S. Buchoux, I. M. Kenney, and O. Beckstein.
# MDAnalysis: A Python package for the rapid analysis of molecular dynamics
# simulations. In S. Benthall and S. Rostrup editors, Proceedings of the 15th
# Python in Science Conference, pages 102-109, Austin, TX, 2016. SciPy.
# doi: 10.25080/majora-629e541a-00e
#
# N. Michaud-Agrawal, E. J. Denning, T. B. Woolf, and O. Beckstein.
# MDAnalysis: A Toolkit for the Analysis of Molecular Dynamics Simulations.
# J. Comput. Chem. 32 (2011), 2319--2327, doi:10.1002/jcc.21787
#
"""Streamplots (2D) --- :mod:`MDAnalysis.visualization.streamlines`
===================================================================

:Authors: Tyler Reddy and Matthieu Chavent
:Year: 2014
:Copyright: Lesser GNU Public License v2.1+

The :func:`generate_streamlines` function can generate a 2D flow field from a
MD trajectory, for instance, lipid molecules in a flat membrane. It can make
use of multiple cores to perform the analysis in parallel (using
:mod:`multiprocessing`).

See Also
--------
MDAnalysis.visualization.streamlines_3D : streamplots in 3D


.. autofunction:: generate_streamlines

"""
import multiprocessing
import numpy as np
import scipy
try:
import matplotlib
import matplotlib.path
except ImportError:
raise ImportError(
"2d streamplot module requires: matplotlib.path for its "
"path.Path.contains_points method. The installation "
"instructions for the matplotlib module can be found here: "
"http://matplotlib.org/faq/installing_faq.html?highlight=install"
) from None
import MDAnalysis
def produce_grid(tuple_of_limits, grid_spacing):
    """Build the 2D sampling grid covering the simulation system.

    Parameters
    ----------
    tuple_of_limits : tuple
        Cartesian boundaries as ``(x_min, x_max, y_min, y_max)``.
    grid_spacing : float
        Grid step in both directions, in ångström.

    Returns
    -------
    grid : numpy.ndarray
        The result of
        ``numpy.mgrid[x_min:x_max:grid_spacing, y_min:y_max:grid_spacing]``:
        a stacked pair of coordinate arrays (x values, y values).
    """
    x_min, x_max, y_min, y_max = tuple_of_limits
    return np.mgrid[x_min:x_max:grid_spacing, y_min:y_max:grid_spacing]
def split_grid(grid, num_cores):
    """Partition the grid's squares into per-core work lists.

    Walks every unit square of `grid` (each described by its four corner
    vertices) and splits the resulting square list — plus a parallel list of
    ``[row, column]`` grid positions — into roughly equal chunks, one per
    core.

    Parameters
    ----------
    grid : numpy.array
        2D array as produced by :func:`produce_grid`.
    num_cores : int
        number of partitions to generate

    Returns
    -------
    list_square_vertex_arrays_per_core : array of arrays
        the square-vertex list ``[[v1,v2,v3,v4],[v1,v2,v3,v4],...]`` split
        into roughly equally-sized sublists, one per available core
    list_parent_index_values : array of arrays
        matching ``[[row, column], ...]`` assignment indices for each core,
        used to reconstruct grid positions in the parent process
    current_row : int
        last row + 1
    current_column : int
        last column + 1

    Note
    ----
    Limited to 2D for now.
    """
    x_array, y_array = grid
    # stack into cartesian vertex coordinates; overall shape is
    # (N_columns_in_grid, N_rows_in_a_column, 2)
    vertex_coords = np.dstack((x_array, y_array))
    num_grid_columns = vertex_coords.shape[0]
    num_grid_rows = vertex_coords.shape[1]
    all_squares = []  # nested list of the four vertices of every square
    parent_indices = []  # ordered [row, column] assignment indices
    # loops (rather than a vectorized solution) keep the square extraction
    # simple; the last column/row have no right/upper neighbour and are
    # therefore excluded as square origins
    for column in range(num_grid_columns - 1):
        for row in range(num_grid_rows - 1):
            # the four corners of the square whose bottom-left vertex sits
            # at (column, row), in counter-clockwise order
            all_squares.append(
                [
                    vertex_coords[column, row],  # bottom left
                    vertex_coords[column + 1, row],  # bottom right
                    vertex_coords[column + 1, row + 1],  # top right
                    vertex_coords[column, row + 1],  # top left
                ]
            )
            parent_indices.append([row, column])
    # distribute the squares (and their reconstruction indices) over the
    # available cores in roughly equally-sized chunks:
    return [
        np.array_split(all_squares, num_cores),
        np.array_split(parent_indices, num_cores),
        num_grid_rows - 1,
        num_grid_columns - 1,
    ]
# some private utility functions for trajectory iteration
def _produce_list_indices_point_in_polygon_this_frame(
    vertex_coord_list, relevant_particle_coordinate_array_xy
):
    """
    Produce a list of indices for particles of interest in the current frame.

    Each square's four vertices are turned into a matplotlib Path and the
    particle coordinates are tested against it; the indices of the particles
    contained in that square form one entry of the result.  The list has one
    entry (a tuple of index arrays, as returned by ``np.where``) per square,
    in the same order as the squares in the grid.
    """
    return [
        np.where(
            matplotlib.path.Path(square_vertices).contains_points(
                relevant_particle_coordinate_array_xy
            )
        )
        for square_vertices in vertex_coord_list
    ]
def _produce_list_centroids_this_frame(
list_indices_in_polygon, relevant_particle_coordinate_array_xy
):
"""
Produce a list of centroids for the particles in the current frame.
The list is ordered according to the order of the squares in the grid.
Each entry in the list is a numpy array of the centroid coordinates for the particles in that square.
The list is the same length as the number of squares in the grid.
If there are no particles in a square, the entry is None.
"""
list_centroids_this_frame = []
for indices in list_indices_in_polygon:
if (
not indices[0].size > 0
): # if there are no particles of interest in this particular square
list_centroids_this_frame.append(None)
else:
current_coordinate_array_in_square = (
relevant_particle_coordinate_array_xy[indices]
)
current_square_indices_centroid = np.average(
current_coordinate_array_in_square, axis=0
)
list_centroids_this_frame.append(current_square_indices_centroid)
return list_centroids_this_frame # a list of numpy xy centroid arrays for this frame
def per_core_work(
    topology_file_path,
    trajectory_file_path,
    list_square_vertex_arrays_this_core,
    MDA_selection,
    start_frame,
    end_frame,
    reconstruction_index_list,
    maximum_delta_magnitude,
):
    """Run the analysis on one core.

    The code to perform on a given core given the list of square vertices
    assigned to it.  Iterates the trajectory, bins the selected particles
    into this core's grid squares per frame, and computes per-square centroid
    displacements between consecutive processed frames.

    Parameters
    ----------
    topology_file_path : str
        Topology file used to build this worker's own Universe.
    trajectory_file_path : str
        Trajectory file used to build this worker's own Universe.
    list_square_vertex_arrays_this_core : array
        The square-vertex entries assigned to this core by ``split_grid``.
    MDA_selection : str
        MDAnalysis selection string for the particles of interest.
    start_frame : int
        Frames with ``ts.frame < start_frame`` are skipped.
    end_frame : int
        Iteration stops once ``ts.frame > end_frame`` (see NOTE below).
    reconstruction_index_list : array
        ``[row, column]`` grid positions matching the squares above; zipped
        with the deltas so the parent can place results into the full grid.
    maximum_delta_magnitude : float
        Deltas are clipped elementwise into
        ``[-maximum_delta_magnitude, maximum_delta_magnitude]``.

    Returns
    -------
    list
        ``[(grid_index, [dx, dy]), ...]`` pairs.  NOTE(review):
        ``xy_deltas_to_write`` is rebuilt on every processed frame, so only
        the deltas of the LAST consecutive frame pair survive to the return;
        if no frame pair is ever processed (e.g. fewer than two frames in
        range) the final ``return`` raises ``NameError`` — confirm intended.
    """
    # obtain the relevant coordinates for particles of interest;
    # each worker process builds its own Universe (Universe objects are not
    # shared across processes)
    universe_object = MDAnalysis.Universe(
        topology_file_path, trajectory_file_path
    )
    # previous-frame state: centroids and particle indices per square, used
    # to pair consecutive frames into displacement vectors
    list_previous_frame_centroids = []
    list_previous_frame_indices = []
    for ts in universe_object.trajectory:
        if ts.frame < start_frame: # don't start until first specified frame
            continue
        # drop the last (z) coordinate — keep xy only
        relevant_particle_coordinate_array_xy = universe_object.select_atoms(
            MDA_selection
        ).positions[..., :-1]
        # only 2D / xy coords for now
        # I will need a list of indices for relevant particles falling within each square in THIS frame:
        list_indices_in_squares_this_frame = (
            _produce_list_indices_point_in_polygon_this_frame(
                list_square_vertex_arrays_this_core,
                relevant_particle_coordinate_array_xy,
            )
        )
        # likewise, I will need a list of centroids of particles in each square (same order as above list):
        list_centroids_in_squares_this_frame = (
            _produce_list_centroids_this_frame(
                list_indices_in_squares_this_frame,
                relevant_particle_coordinate_array_xy,
            )
        )
        if (
            list_previous_frame_indices
        ):  # if the previous frame had indices in at least one square I will need to use
            # those indices to generate the updates to the corresponding centroids in this frame:
            # (i.e. track LAST frame's particle sets at THIS frame's positions)
            list_centroids_this_frame_using_indices_from_last_frame = (
                _produce_list_centroids_this_frame(
                    list_previous_frame_indices,
                    relevant_particle_coordinate_array_xy,
                )
            )
            # I need to write a velocity of zero if there are any 'empty' squares in either frame:
            xy_deltas_to_write = []
            for square_1_centroid, square_2_centroid in zip(
                list_centroids_this_frame_using_indices_from_last_frame,
                list_previous_frame_centroids,
            ):
                if square_1_centroid is None or square_2_centroid is None:
                    xy_deltas_to_write.append([0, 0])
                else:
                    xy_deltas_to_write.append(
                        np.subtract(
                            square_1_centroid, square_2_centroid
                        ).tolist()
                    )
            # xy_deltas_to_write = np.subtract(np.array(
            # list_centroids_this_frame_using_indices_from_last_frame),np.array(list_previous_frame_centroids))
            xy_deltas_to_write = np.array(xy_deltas_to_write)
            # now filter the array to only contain distances in the range [-8,8] as a placeholder for dealing with PBC
            # issues (Matthieu seemed to use a limit of 8 as well);
            xy_deltas_to_write = np.clip(
                xy_deltas_to_write,
                -maximum_delta_magnitude,
                maximum_delta_magnitude,
            )
            # with the xy and dx,dy values calculated I need to set the values from this frame to previous frame
            # values in anticipation of the next frame:
            list_previous_frame_centroids = (
                list_centroids_in_squares_this_frame[:]
            )
            list_previous_frame_indices = list_indices_in_squares_this_frame[:]
        else:  # either no points in squares or after the first frame I'll just reset the 'previous' values so they
            # can be used when consecutive frames have proper values
            list_previous_frame_centroids = (
                list_centroids_in_squares_this_frame[:]
            )
            list_previous_frame_indices = list_indices_in_squares_this_frame[:]
        # NOTE(review): this check sits at the END of the loop body, so the
        # first frame with ts.frame > end_frame is fully processed before the
        # break — confirm that off-by-one is intended
        if ts.frame > end_frame:
            break  # stop here
    return list(zip(reconstruction_index_list, xy_deltas_to_write.tolist()))
# [docs]  (Sphinx cross-reference marker left over from documentation scraping)
def generate_streamlines(
    topology_file_path,
    trajectory_file_path,
    grid_spacing,
    MDA_selection,
    start_frame,
    end_frame,
    xmin,
    xmax,
    ymin,
    ymax,
    maximum_delta_magnitude,
    num_cores="maximum",
):
    r"""Produce the x and y components of a 2D streamplot data set.

    Parameters
    ----------
    topology_file_path : str
        Absolute path to the topology file
    trajectory_file_path : str
        Absolute path to the trajectory file. It will normally be desirable
        to filter the trajectory with a tool such as GROMACS
        :program:`g_filter` (see :footcite:p:`Chavent2014`)
    grid_spacing : float
        The spacing between grid lines (angstroms)
    MDA_selection : str
        MDAnalysis selection string
    start_frame : int
        First frame number to parse
    end_frame : int
        Last frame number to parse
    xmin : float
        Minimum coordinate boundary for x-axis (angstroms)
    xmax : float
        Maximum coordinate boundary for x-axis (angstroms)
    ymin : float
        Minimum coordinate boundary for y-axis (angstroms)
    ymax : float
        Maximum coordinate boundary for y-axis (angstroms)
    maximum_delta_magnitude : float
        Absolute value of the largest displacement tolerated for the
        centroid of a group of particles (angstroms). Values above this
        displacement will not count in the streamplot (treated as
        excessively large displacements crossing the periodic boundary)
    num_cores : int or 'maximum' (optional)
        The number of cores to use. (Default 'maximum' uses all available
        cores)

    Returns
    -------
    dx_array : array of floats
        An array object containing the displacements in the x direction
    dy_array : array of floats
        An array object containing the displacements in the y direction
    average_displacement : float
        :math:`\frac{\sum\sqrt[]{dx^2 + dy^2}}{N}`
    standard_deviation_of_displacement : float
        standard deviation of :math:`\sqrt[]{dx^2 + dy^2}`

    Examples
    --------
    Generate 2D streamlines and plot::

        import matplotlib, matplotlib.pyplot
        import numpy as np
        import MDAnalysis, MDAnalysis.visualization.streamlines

        u1, v1, average_displacement, standard_deviation_of_displacement = \
            MDAnalysis.visualization.streamlines.generate_streamlines(
                'testing.gro', 'testing_filtered.xtc',
                grid_spacing=20, MDA_selection='name PO4',
                start_frame=2, end_frame=3,
                xmin=-8.73000049591, xmax=1225.96008301,
                ymin=-12.5799999237, ymax=1224.34008789,
                maximum_delta_magnitude=1.0, num_cores=16)
        x = np.linspace(0, 1200, 61)
        y = np.linspace(0, 1200, 61)
        speed = np.sqrt(u1*u1 + v1*v1)
        fig = matplotlib.pyplot.figure()
        ax = fig.add_subplot(111, aspect='equal')
        ax.set_xlabel('x ($\AA$)')
        ax.set_ylabel('y ($\AA$)')
        ax.streamplot(x, y, u1, v1, density=(10,10), color=speed,
                      linewidth=3*speed/speed.max())
        fig.savefig('testing_streamline.png', dpi=300)

    .. image:: testing_streamline.png

    References
    ----------
    .. footbibliography::

    See Also
    --------
    MDAnalysis.visualization.streamlines_3D.generate_streamlines_3d
    """
    # work out the number of cores to use (a user-supplied int is used as-is)
    if num_cores == "maximum":
        num_cores = multiprocessing.cpu_count()  # use all available cores
    # raise on overflow so bad displacement arithmetic fails loudly
    np.seterr(all="warn", over="raise")
    parent_list_deltas = []  # collect all data from child processes here

    def log_result_to_parent(delta_array):
        # apply_async callback, executed in the parent process: accumulate
        # the (grid_index, [dx, dy]) pairs returned by each worker
        parent_list_deltas.extend(delta_array)

    tuple_of_limits = (xmin, xmax, ymin, ymax)
    grid = produce_grid(
        tuple_of_limits=tuple_of_limits, grid_spacing=grid_spacing
    )
    (
        list_square_vertex_arrays_per_core,
        list_parent_index_values,
        total_rows,
        total_columns,
    ) = split_grid(grid=grid, num_cores=num_cores)
    # fan the per-core square lists out to worker processes
    pool = multiprocessing.Pool(num_cores)
    for vertex_sublist, index_sublist in zip(
        list_square_vertex_arrays_per_core, list_parent_index_values
    ):
        pool.apply_async(
            per_core_work,
            args=(
                topology_file_path,
                trajectory_file_path,
                vertex_sublist,
                MDA_selection,
                start_frame,
                end_frame,
                index_sublist,
                maximum_delta_magnitude,
            ),
            callback=log_result_to_parent,
        )
    pool.close()
    pool.join()
    dx_array = np.zeros((total_rows, total_columns))
    dy_array = np.zeros((total_rows, total_columns))
    # the parent_list_deltas is shaped like this:
    # [ ([row_index, column_index], [dx, dy]), ... ]
    # go through the list in the parent process and assign to the appropriate
    # positions in the dx and dy matrices; values that sit exactly at the
    # clip cap are treated as PBC-crossing artifacts and replaced with 0 to
    # match Matthieu's code (the cap may eventually become a narrower
    # boundary)
    for index_array, delta_array in parent_list_deltas:
        row_index, column_index = index_array.tolist()  # hoisted: one tolist() call
        dx, dy = delta_array[0], delta_array[1]
        dx_array[row_index, column_index] = (
            0 if abs(dx) == maximum_delta_magnitude else dx
        )
        dy_array[row_index, column_index] = (
            0 if abs(dy) == maximum_delta_magnitude else dy
        )
    # at Matthieu's request, we now want to calculate the average and
    # standard deviation of the displacement values:
    displacement_array = np.sqrt(dx_array**2 + dy_array**2)
    average_displacement = np.average(displacement_array)
    standard_deviation_of_displacement = np.std(displacement_array)
    return (
        dx_array,
        dy_array,
        average_displacement,
        standard_deviation_of_displacement,
    )