Skip to content

Commit

Permalink
Merge pull request #1145 from kain88-de/refactor-encore-confdist
Browse files Browse the repository at this point in the history
Refactor encore `conformational_distance_matrix`
  • Loading branch information
kain88-de authored Jan 15, 2017
2 parents d88567f + c069958 commit 789a96c
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 316 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ before_install:
- conda install --yes pylint
install:
# Minimal installation!
- conda create --yes -q -n pyenv python=$PYTHON_VERSION numpy mmtf-python nose=1.3.7 mock sphinx=1.3 six biopython networkx cython
- conda create --yes -q -n pyenv python=$PYTHON_VERSION numpy mmtf-python nose=1.3.7 mock sphinx=1.3 six biopython networkx cython joblib
- source activate pyenv
# Install griddataformats from PIP so that scipy is only installed in the full build (#1147)
- pip install griddataformats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@
try:
import sklearn.cluster
except ImportError:
sklearn = None
msg = "sklearn.cluster could not be imported: some functionality will " \
"not be available in encore.fit_clusters()"
warnings.warn(msg, category=ImportWarning)
logging.warn(msg)
del msg
sklearn = None
msg = "sklearn.cluster could not be imported: some functionality will " \
"not be available in encore.fit_clusters()"
warnings.warn(msg, category=ImportWarning)
logging.warn(msg)
del msg


def encode_centroid_info(clusters, cluster_centers_indices):
Expand Down
213 changes: 54 additions & 159 deletions package/MDAnalysis/analysis/encore/confdistmatrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class to compute an RMSD matrix in such a way is also available.
"""

import numpy as np
from multiprocessing import Process, Array, RawValue
from ctypes import c_float
from getpass import getuser
from socket import gethostname
from datetime import datetime
Expand All @@ -49,61 +47,51 @@ class to compute an RMSD matrix in such a way is also available.
from ..align import rotation_matrix

from .cutils import PureRMSD
from .utils import TriangularMatrix, trm_indeces, \
AnimatedProgressBar
from .utils import TriangularMatrix, trm_indices

try:
from joblib import Parallel, delayed
except ImportError:
import warnings
warnings.warn( "Couldn't import joblib. Can't use conformational_distance_matrix", category=ImportWarning)


def conformational_distance_matrix(ensemble,
conf_dist_function, selection="",
superimposition_selection="", ncores=1, pairwise_align=True,
mass_weighted=True, metadata=True, *args, **kwargs):
conf_dist_function, selection="",
superimposition_selection="", n_jobs=1, pairwise_align=True,
mass_weighted=True, metadata=True, verbose=False):
"""
Run the conformational distance matrix calculation.
args and kwargs are passed to conf_dist_function.
Parameters
----------
ensemble : Universe object
Universe object for which the conformational distance matrix will
be computed.
conf_dist_function : function object
Function that fills the matrix with conformational distance
values. See set_rmsd_matrix_elements for an example.
pairwise_align : bool
Whether to perform pairwise alignment between conformations.
Default is True (do the superimposition)
mass_weighted : bool
Whether to perform mass-weighted superimposition and metric
calculation. Default is True.
metadata : bool
Whether to build a metadata dataset for the calculated matrix.
Default is True.
ncores : int
n_jobs : int
Number of cores to be used for parallel calculation
Default is 1.
Default is 1. -1 uses all available cores
Returns
-------
conf_dist_matrix : encore.utils.TriangularMatrix object
Conformational distance matrix in triangular representation.
"""

# Decide how many cores have to be used. Since the main process is
# stopped while the workers do their job, ncores workers will be
# spawned.

if ncores < 1:
ncores = 1

# framesn: number of frames
framesn = len(ensemble.trajectory.timeseries(
ensemble.select_atoms(selection), format='fac'))
Expand Down Expand Up @@ -160,73 +148,30 @@ def conformational_distance_matrix(ensemble,
else:
subset_masses = None

# matsize: number of elements of the triangular matrix, diagonal
# elements included.
matsize = framesn * (framesn + 1) / 2

# Calculate the number of matrix elements that each core has to
# calculate as equally as possible.
if ncores > matsize:
ncores = matsize
runs_per_worker = [matsize / int(ncores) for x in range(ncores)]
unfair_work = matsize % ncores
for i in range(unfair_work):
runs_per_worker[i] += 1

# Splice the matrix in ncores segments. Calculate the first and the
# last (i,j) matrix elements of the slices that will be assigned to
# each worker. Each of them will proceed in a column-then-row order
# (e.g. 0,0 1,0 1,1 2,0 2,1 2,2 ... )
i = 0
a = [0, 0]
b = [0, 0]
tasks_per_worker = []
for n,r in enumerate(runs_per_worker):
while i * (i - 1) / 2 < np.sum(runs_per_worker[:n + 1]):
i += 1
b = [i - 2,
np.sum(runs_per_worker[0:n + 1]) - (i - 2) * (i - 1) / 2 - 1]
tasks_per_worker.append((tuple(a), tuple(b)))
if b[0] == b[1]:
a[0] = b[0] + 1
a[1] = 0
else:
a[0] = b[0]
a[1] = b[1] + 1

# Allocate for output matrix
distmat = Array(c_float, matsize)
matsize = framesn * (framesn + 1) / 2
distmat = np.empty(matsize, np.float64)

# Prepare progress bar stuff and run it
pbar = AnimatedProgressBar(end=matsize, width=80)
partial_counters = [RawValue('i', 0) for i in range(ncores)]

# Initialize workers. Simple worker doesn't perform fitting,
# fitter worker does.

workers = [Process(target=conf_dist_function, args=(
tasks_per_worker[i],
indices = trm_indices((0, 0), (framesn - 1, framesn - 1))
Parallel(n_jobs=n_jobs, verbose=verbose)(delayed(conf_dist_function)(
element,
rmsd_coordinates,
distmat,
masses,
fitting_coordinates,
subset_masses,
partial_counters[i],
args,
kwargs)) for i in range(ncores)]
masses) for element in indices)

# Start & join the workers
for w in workers:
w.start()
for w in workers:
w.join()

# When the workers have finished, return a TriangularMatrix object
return TriangularMatrix(distmat, metadata=metadata)


def set_rmsd_matrix_elements(tasks, coords, rmsdmat, masses, fit_coords=None,
fit_masses=None, pbar_counter=None, *args, **kwargs):
fit_masses=None, pbar_counter=None, *args, **kwargs):

'''
RMSD Matrix calculator
Expand All @@ -237,7 +182,7 @@ def set_rmsd_matrix_elements(tasks, coords, rmsdmat, masses, fit_coords=None,
tasks : iterator of int of length 2
Given a triangular matrix, this function will calculate RMSD
values from element tasks[0] to tasks[1]. Since the matrix
is triangular, the trm_indeces matrix automatically
is triangular, the trm_indices matrix automatically
calculates the corrisponding i,j matrix indices.
The matrix is written as an array in a row-major
order (see the TriangularMatrix class for details).
Expand All @@ -264,82 +209,38 @@ def set_rmsd_matrix_elements(tasks, coords, rmsdmat, masses, fit_coords=None,
fit_masses : numpy.array
Array of atomic masses, having the same order as the
fit_coords array
pbar_counter : multiprocessing.RawValue
Thread-safe shared value. This counter is updated at
every cycle and used to evaluate the progress of
each worker in a parallel calculation.
'''

i, j = tasks

if fit_coords is None and fit_masses is None:
for i, j in trm_indeces(tasks[0], tasks[1]):
summasses = np.sum(masses)
rmsdmat[(i + 1) * i / 2 + j] = PureRMSD(coords[i].astype(np.float64),
coords[j].astype(np.float64),
coords[j].shape[0],
masses,
summasses)
summasses = np.sum(masses)
rmsdmat[(i + 1) * i / 2 + j] = PureRMSD(coords[i],
coords[j],
coords[j].shape[0],
masses,
summasses)

elif fit_coords is not None and fit_coords is not None:
for i, j in trm_indeces(tasks[0], tasks[1]):
summasses = np.sum(masses)
subset_weights = np.asarray(fit_masses) / np.mean(fit_masses)
com_i = np.average(fit_coords[i], axis=0,
weights=fit_masses)
translated_i = coords[i] - com_i
subset1_coords = fit_coords[i] - com_i
com_j = np.average(fit_coords[j], axis=0,
weights=fit_masses)
translated_j = coords[j] - com_j
subset2_coords = fit_coords[j] - com_j
rotamat = rotation_matrix(subset1_coords, subset2_coords,
subset_weights)[0]
rotated_i = np.transpose(np.dot(rotamat, np.transpose(translated_i)))
rmsdmat[(i + 1) * i / 2 + j] = PureRMSD(
rotated_i.astype(np.float64), translated_j.astype(np.float64),
coords[j].shape[0], masses, summasses)

summasses = np.sum(masses)
subset_weights = np.asarray(fit_masses) / np.mean(fit_masses)
com_i = np.average(fit_coords[i], axis=0,
weights=fit_masses)
translated_i = coords[i] - com_i
subset1_coords = fit_coords[i] - com_i
com_j = np.average(fit_coords[j], axis=0,
weights=fit_masses)
translated_j = coords[j] - com_j
subset2_coords = fit_coords[j] - com_j
rotamat = rotation_matrix(subset1_coords, subset2_coords,
subset_weights)[0]
rotated_i = np.transpose(np.dot(rotamat, np.transpose(translated_i)))
rmsdmat[(i + 1) * i / 2 + j] = PureRMSD(
rotated_i.astype(np.float64), translated_j.astype(np.float64),
coords[j].shape[0], masses, summasses)
else:
raise TypeError("Both fit_coords and fit_masses must be specified \
if one of them is given")

if pbar_counter is not None:
pbar_counter.value += 1

def pbar_updater(pbar, pbar_counters, max_val, update_interval=0.2):
'''Method that updates and prints the progress bar, upon polling
progress status from workers.
Parameters
----------
pbar : encore.utils.AnimatedProgressBar object
Progress bar object
pbar_counters : list of multiprocessing.RawValue
List of counters. Each worker is given a counter, which is updated
at every cycle. In this way the _pbar_updater process can
asynchronously fetch progress reports.
max_val : int
Total number of matrix elements to be calculated
update_interval : float
Number of seconds between progress bar updates
'''

val = 0
while val < max_val:
val = 0
for c in pbar_counters:
val += c.value
pbar.update(val)
pbar.show_progress()
sleep(update_interval)



def get_distance_matrix(ensemble,
selection="name CA",
Expand All @@ -348,7 +249,8 @@ def get_distance_matrix(ensemble,
superimpose=True,
superimposition_subset="name CA",
mass_weighted=True,
ncores=1,
n_jobs=1,
verbose=False,
*conf_dist_args,
**conf_dist_kwargs):
"""
Expand All @@ -369,34 +271,28 @@ def get_distance_matrix(ensemble,
Parameters
----------
ensemble : Universe
selection : str
Atom selection string in the MDAnalysis format. Default is "name CA"
load_matrix : str, optional
Load similarity/dissimilarity matrix from numpy binary file instead
of calculating it (default is None). A filename is required.
save_matrix : bool, optional
Save calculated matrix as numpy binary file (default is None). A
filename is required.
superimpose : bool, optional
Whether to superimpose structures before calculating distance
(default is True).
superimposition_subset : str, optional
Group for superimposition using MDAnalysis selection syntax
(default is CA atoms: "name CA")
mass_weighted : bool, optional
calculate a mass-weighted RMSD (default is True). If set to False
the superimposition will also not be mass-weighted.
ncores : int, optional
Maximum number of cores to be used (default is 1)
n_jobs : int, optional
Maximum number of cores to be used (default is 1). If -1 use all cores.
verbose : bool, optional
print progress
Returns
-------
Expand Down Expand Up @@ -446,13 +342,12 @@ def get_distance_matrix(ensemble,
# Use superimposition subset, if necessary. If the pairwise alignment
# is not required, it will not be performed anyway.
confdistmatrix = conformational_distance_matrix(ensemble,
conf_dist_function=set_rmsd_matrix_elements,
selection=selection,
pairwise_align=superimpose,
mass_weighted=mass_weighted,
ncores=ncores,
*conf_dist_args,
kwargs=conf_dist_kwargs)
conf_dist_function=set_rmsd_matrix_elements,
selection=selection,
pairwise_align=superimpose,
mass_weighted=mass_weighted,
n_jobs=n_jobs,
verbose=verbose)

logging.info(" Done!")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@
try:
import sklearn.decomposition
except ImportError:
sklearn = None
msg = "sklearn.decomposition could not be imported: some functionality will"\
"not be available in encore.dimensionality_reduction()"
warnings.warn(msg, category=ImportWarning)
logging.warn(msg)
del msg
sklearn = None
import warnings
warnings.warn("sklearn.decomposition could not be imported: some "
"functionality will not be available in "
"encore.dimensionality_reduction()", category=ImportWarning)


class DimensionalityReductionMethod (object):
Expand Down
Loading

0 comments on commit 789a96c

Please sign in to comment.