Skip to content

Commit

Permalink
feat: add coordination number analysis;
Browse files Browse the repository at this point in the history
refactor: use mda.analysisbase to calculate ldx
  • Loading branch information
Cloudac7 committed Sep 22, 2023
1 parent 3c8d8d5 commit 23ad94b
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 62 deletions.
79 changes: 17 additions & 62 deletions miko/structure/cluster.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import numpy as np
import numpy.typing as npt
import pandas as pd

from typing import Optional
from scipy.spatial import distance
from scipy.optimize import curve_fit
from MDAnalysis import Universe


from miko.structure.lindemann_index import LindemannIndex


class Cluster(object):
def __init__(self, path=None, **kwargs):
self.path = path
Expand All @@ -19,7 +24,7 @@ def convert_universe(cls, u: Universe, **kwargs):
Args:
u (Universe): Trajectory instance.
Returns:
Cluster: Cluster instance.
"""
Expand All @@ -36,7 +41,7 @@ def load_mda_trajectory(self, **kwargs) -> Universe:
if kwargs.get("topology_format", None) is None:
kwargs["topology_format"] = "XYZ"
return Universe(self.path, **kwargs)

def distance_to_com(self, selection_cluster: str):
"""Analyze cluster atoms by calculating distance to center of mass.
Expand All @@ -56,67 +61,17 @@ def distance_to_com(self, selection_cluster: str):
dis = distance.euclidean(t, cg)
distances[q, p] = dis
return distances

def lindemann_per_frames(self, selection_cluster: str) -> np.ndarray:
"""Calculate the lindemann index for each atom AND FRAME

Warning this can produce extremly large ndarrays in memory
depending on the size of the cluster and the ammount of frames.
Args:
u (Universe): MDA trajectory instance.
selection_cluster (str): Selection language to select atoms in cluster.
Returns:
np.ndarray: An ndarray of shape (len_frames, natoms)
References:
https://github.com/ybyygu/lindemann-index/blob/master/src/lindemann.pyx
http://math.stackexchange.com/a/116344
"""
def lindemann_per_frames(self,
selection_cluster: str,
box: Optional[npt.NDArray] = None) -> np.ndarray:
u = self.universe
sele_ori = u.select_atoms(selection_cluster)
natoms = len(sele_ori)
nframes = len(u.trajectory)
len_frames = len(u.trajectory)
array_mean = np.zeros((natoms, natoms))
array_var = np.zeros((natoms, natoms))
# array_distance = np.zeros((natoms, natoms))
iframe = 1
lindex_array = np.zeros((len_frames, natoms, natoms))
cluster = u.select_atoms(selection_cluster, updating=True)
for q, ts in enumerate(u.trajectory):
# print(ts)
coords = cluster.positions
n, p = coords.shape
array_distance = distance.cdist(coords, coords)

# update mean and var arrays based on Welford algorithm (1962)
for i in range(natoms):
for j in range(i + 1, natoms):
xn = array_distance[i, j]
mean = array_mean[i, j]
var = array_var[i, j]
delta = xn - mean
# update mean
array_mean[i, j] = mean + delta / iframe
# update variance
array_var[i, j] = var + delta * (xn - array_mean[i, j])
iframe += 1
if iframe > nframes + 1:
break

for i in range(natoms):
for j in range(i + 1, natoms):
array_mean[j, i] = array_mean[i, j]
array_var[j, i] = array_var[i, j]

lindemann_indices = np.divide(
np.sqrt(np.divide(array_var, iframe - 1)), array_mean
)
lindex_array[q] = lindemann_indices

return np.array([np.nanmean(i, axis=1) for i in lindex_array])
ag = u.select_atoms(selection_cluster)
li = LindemannIndex(ag, box=box)
li.run()

lindex_array = np.array(li.results.lindemann_index)
return lindex_array


def distance_to_cnt(u: Universe, selection_cluster: str, cnt_direction: str):
Expand All @@ -129,7 +84,7 @@ def distance_to_cnt(u: Universe, selection_cluster: str, cnt_direction: str):
Returns:
_type_: _description_
"""
cnt = u.select_atoms('name C', updating=True) # C for carbon
cnt = u.select_atoms('name C', updating=True) # C for carbon
cluster = u.select_atoms(selection_cluster, updating=True)
distances = np.zeros((len(u.trajectory), len(cluster)))
for q, ts in enumerate(u.trajectory):
Expand Down
72 changes: 72 additions & 0 deletions miko/structure/coordination_number.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import numpy as np
from numpy.typing import NDArray
from typing import Optional, Callable

from MDAnalysis import AtomGroup
from MDAnalysis.analysis.base import AnalysisBase
from MDAnalysis.analysis.distances import distance_array


def coordination_number_calculation(
ag1: AtomGroup,
ag2: AtomGroup,
r0: float,
d0: float = 0.0,
n: int = 6,
m: int = 12,
box: Optional[NDArray] = None,
switch_function: Optional[Callable] = None
):
d = distance_array(ag1, ag2, box)
d = d[d != 0.].reshape((len(ag1), -1))
if switch_function:
cn = switch_function(d, r0, d0, n, m) # ignore: type
else:
cn = (1 - ((d - d0) / r0) ** n) / (1 - ((d - d0) / r0) ** m)
return np.mean(np.sum(cn, axis=1))


class CoordinationNumber(AnalysisBase):
def __init__(self,
g1: AtomGroup,
g2: AtomGroup,
r0: float,
d0: float = 0.0,
n: int = 6,
m: int = 12,
box: Optional[NDArray] = None,
switch_function: Optional[Callable] = None,
**kwargs):
super(CoordinationNumber, self).__init__(
g1.universe.trajectory,
**kwargs
)
self.g1 = g1
self.g2 = g2
self.r0 = r0
self.d0 = d0
self.n = n
self.m = m
self.box = box
self.switch_function = switch_function


def _prepare(self):
self.results.coordination_number = []

def _single_frame(self):
# REQUIRED
# Called after the trajectory is moved onto each new frame.
# store an example_result of `some_function` for a single frame
self.results.coordination_number.append(
coordination_number_calculation(
ag1=self.g1,
ag2=self.g2,
r0=self.r0,
d0=self.d0,
n=self.n,
m=self.m,
box=self.box,
switch_function=self.switch_function
)
)
47 changes: 47 additions & 0 deletions miko/structure/lindemann_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import numpy as np
import MDAnalysis as mda

from MDAnalysis.analysis.base import AnalysisBase

from MDAnalysis.lib.distances import distance_array
from MDAnalysis.lib.log import ProgressBar


class LindemannIndex(AnalysisBase):

def __init__(self,
atomgroup: mda.AtomGroup,
box=None,
**kwargs):
self.ag = atomgroup
self.ag_natoms = len(self.ag)
self.box = box
super(LindemannIndex, self).__init__(
atomgroup.universe.trajectory,
**kwargs
)

def _prepare(self):

natoms = self.ag_natoms

self.array_mean = np.zeros((natoms, natoms))
self.array_var = np.zeros((natoms, natoms))

self.results.lindemann_index = []

def _single_frame(self):

n_atoms = len(self.ag)

array_distance = distance_array(self.ag, self.ag, box=self.box)
delta = array_distance - self.array_mean
self.array_mean += delta / (self._frame_index + 1)
self.array_var += delta * (array_distance - self.array_mean)

if self._frame_index == 0:
lindemann_indices = np.zeros((n_atoms,))
else:
lindemann_indices = np.nanmean(
np.sqrt(self.array_var / self._frame_index) / self.array_mean, axis=1)
self.results.lindemann_index.append(lindemann_indices)
14 changes: 14 additions & 0 deletions tests/structure/test_coordination_number.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import numpy as np
from pathlib import Path

import pytest
from MDAnalysis import Universe
from miko.structure.coordination_number import CoordinationNumber

def test_coordination(shared_datadir):
u = Universe(shared_datadir / 'test_case_O2Pt36.xyz')
ag1 = u.select_atoms("name O")
ag2 = u.select_atoms("name Pt")
cn = CoordinationNumber(ag1, ag2, 3.2, box=[18.0, 18.0, 18.0, 90.0, 90.0, 90.0]) # type: ignore
cn.run()
assert np.array(cn.results.coordination_number).shape == (3,)

0 comments on commit 23ad94b

Please sign in to comment.