Skip to content

Commit

Permalink
feat: add utils and tools for atomic properties
Browse files Browse the repository at this point in the history
feat: add `GroupsDistanceIndex` analysis
  • Loading branch information
Cloudac7 committed Nov 10, 2023
1 parent 3c40ef6 commit 8ee5c31
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 0 deletions.
Empty file added miko/atomic/__init__.py
Empty file.
23 changes: 23 additions & 0 deletions miko/atomic/atomic_energy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import numpy as np

from joblib import Parallel, delayed
from .utils import load_reader

def atomic_ener_model_devi_atomic(*files, key_name, **kwargs):
"""Calculate the deviation of atomic energy from a model."""

def _atomic_ener_model_devi(f, key_name, **kwargs):
reader = load_reader(f, **kwargs)
results = reader.read_atomic_property()
return results[key_name]

results = Parallel(n_jobs=-1, verbose=5)(
delayed(_atomic_ener_model_devi)(f, key_name, **kwargs) for f in files
)
results = np.array(results)

return np.std(results, axis=0)

def atomic_ener_model_devi(*files, key_name, **kwargs):
results = atomic_ener_model_devi_atomic(*files, key_name=key_name, **kwargs)
return np.max(results, axis=1), np.min(results, axis=1), np.mean(results, axis=1)
137 changes: 137 additions & 0 deletions miko/atomic/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import numpy as np

def load_reader(path, format="lammps-dump"):
if format == "lammps-dump":
return LAMMPSDumpReader(path, format=format)
elif format == "netcdf":
return NetCDFReader(path, format=format)
else:
raise NotImplementedError(
"Format {} not implemented".format(format))


class AtomicPropertyReader:
"""
A class for reading atomic properties from a file.
Args:
path (str): The path to the file to be read.
format (str, optional): The format of the file. Defaults to "lammps-dump".
Raises:
NotImplementedError: If the specified format is not implemented.
Attributes:
path (str): The path to the file to be read.
format (str): The format of the file.
"""

def __init__(self, path, format="lammps-dump"):
self.path = path
self.format = format

def read_atomic_property(self):
pass


class LAMMPSDumpReader(AtomicPropertyReader):
"""
A class for reading atomic properties from LAMMPS dump files.
Attributes:
path (str): The path to the LAMMPS dump file.
Methods:
read_atomic_property(): Reads all frames from the LAMMPS dump file and returns the atomic properties.
Raises:
ValueError: If results keys do not match atomic properties in file.
"""
def _read_single_frame(self, f, results=None):
"""
Reads a single frame from the LAMMPS dump file.
Args:
f (file): The file object to read from.
results (dict, optional): The dictionary to store the atomic properties. Defaults to None.
Returns:
bool: True if a frame is successfully read, False otherwise.
"""
if not f.readline(): # ITEM TIMESTEP
return False
f.readline()
f.readline() # ITEM NUMBER OF ATOMS
n_atoms = int(f.readline())

# triclinic = len(f.readline().split()) == 9 # ITEM BOX BOUNDS
for _ in range(4):
f.readline()

indices = np.zeros(n_atoms, dtype=int)
atom_line = f.readline() # ITEM ATOMS etc
attrs = atom_line.split()[2:] # attributes on coordinate line
attr_to_col_ix = {x: i for i, x in enumerate(attrs)}

_has_atomic_properties = any("c_" in ix for ix in attr_to_col_ix)
atomic_cols = [(ix, attr_to_col_ix[ix])
for ix in attr_to_col_ix if "c_" in ix] if _has_atomic_properties else []
ids = "id" in attr_to_col_ix

if results is None:
results = {dim[0]: [] for dim in atomic_cols}
elif results == {}:
for dim in atomic_cols:
results[dim[0]] = []
elif results.keys() != set([dim[0] for dim in atomic_cols]):
raise ValueError(
"Results keys do not match atomic properties in file")

data = {dim[0]: np.zeros(n_atoms) for dim in atomic_cols}

for i in range(n_atoms):
fields = f.readline().split()
if ids:
indices[i] = fields[attr_to_col_ix["id"]]
if _has_atomic_properties:
for dim in atomic_cols:
data[dim[0]][i] = fields[dim[1]]
order = np.argsort(indices)
if _has_atomic_properties:
for dim in atomic_cols:
data[dim[0]] = data[dim[0]][order]
results[dim[0]].append(data[dim[0]])
return True

def read_atomic_property(self):
"""
Reads all frames from the LAMMPS dump file and returns the atomic properties.
Returns:
dict: A dictionary containing the atomic properties.
"""
with open(self.path) as f:
# Initialize results with empty lists for each atomic property
results = {}

# Read the rest of the frames
while self._read_single_frame(f, results):
continue

# Convert lists to ndarrays
for key in results:
results[key] = np.array(results[key])
return results


class NetCDFReader(AtomicPropertyReader):
"""A class for reading atomic properties from netCDF files."""

def read_atomic_property(self):
try:
from netCDF4 import Dataset # type: ignore
except ImportError:
raise ImportError("netCDF4 is not installed")

with Dataset(self.path, 'r') as data:
return {k: np.asarray(v) for k, v in data.variables.items() if "c_" in k}
53 changes: 53 additions & 0 deletions miko/structure/dynamic_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,56 @@ def _single_frame(self):
self_distances.flatten())[:self.size]]
self.results.atom_groups.append(new_ag)
self.results.indices.append(new_ag.indices)


class GroupsDistanceIndex(AnalysisBase):

def __init__(self,
ag,
sliced_ags,
limitation: int = 1,
box=None,
zone=None,
**kwargs):
self.ag = ag
self.sliced_ags = sliced_ags
self.limitation = limitation
self.universe = ag.universe
self.box = box
self.zone = zone
if box and self.universe.dimensions is None:
self.universe.dimensions = box
super(GroupsDistanceIndex, self).__init__(
ag.universe.trajectory,
**kwargs
)

def _prepare(self):
self.results.atom_groups = []
self.results.indices = []

def _single_frame(self):
import heapq as hq
new_ag = self.ag

reference = self.ag.center_of_geometry()
reference[0] = 0.
reference[1] = 0.

min_queue = []

for i, sl in enumerate(self.sliced_ags):
temp_distances = sl.positions[:]
temp_distances[:, 0] = 0.
temp_distances[:, 1] = 0.
self_distance = np.mean(distance_array(
temp_distances, reference, box=self.box
))
min_queue.append(self_distance)

for i, j in hq.nsmallest(
self.limitation, enumerate(min_queue), key=lambda x: x[1]
):
new_ag = new_ag | self.sliced_ags[i]
self.results.atom_groups.append(new_ag)
self.results.indices.append(new_ag.indices)
Empty file added tests/atomic/__init__.py
Empty file.
35 changes: 35 additions & 0 deletions tests/atomic/test_atomic_energy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import numpy as np
from miko.atomic.atomic_energy import atomic_ener_model_devi_atomic
from miko.atomic.atomic_energy import atomic_ener_model_devi


def test_atomic_ener_model_devi_atomic(tmpdir):
# Create a test dump file
dummy_pe = []
for i in range(4):
dump_file = tmpdir / f'test_dump_file_{i}.dump'
np.random.seed(114514 + i)
dummy_pe_frame = np.random.rand(2)
dummy_pe.append([dummy_pe_frame])
with open(dump_file, 'w') as f:
f.write(
"ITEM: TIMESTEP\n0\n"
"ITEM: NUMBER OF ATOMS\n2\n"
"ITEM: BOX BOUNDS pp pp pp\n"
"0.0 10.0\n0.0 10.0\n0.0 10.0\n"
"ITEM: ATOMS id type x y z c_pe\n"
f"1 1 1.0 2.0 3.0 {dummy_pe_frame[0]}\n"
f"2 2 4.0 5.0 6.0 {dummy_pe_frame[1]}\n"
)

# Test atomic_ener_model_devi_atomic
std_dev = atomic_ener_model_devi_atomic(
*[tmpdir / f'test_dump_file_{i}.dump' for i in range(4)], key_name='c_pe'
)
assert np.allclose(std_dev, np.std(dummy_pe, axis=0))
max_dev, min_dev, mean_dev = atomic_ener_model_devi(
*[tmpdir / f'test_dump_file_{i}.dump' for i in range(4)], key_name='c_pe'
)
assert np.allclose(max_dev, np.max(np.std(dummy_pe, axis=0), axis=1))
assert np.allclose(min_dev, np.min(np.std(dummy_pe, axis=0), axis=1))
assert np.allclose(mean_dev, np.mean(np.std(dummy_pe, axis=0), axis=1))
38 changes: 38 additions & 0 deletions tests/atomic/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import numpy as np
from miko.atomic.utils import load_reader


def test_read_atomic_property(tmpdir):
# Create a test dump file
dump_file = tmpdir / 'test_dump_file.dump'
with open(dump_file, 'w') as f:
f.write(
"ITEM: TIMESTEP\n0\n"
"ITEM: NUMBER OF ATOMS\n2\n"
"ITEM: BOX BOUNDS pp pp pp\n"
"0.0 10.0\n0.0 10.0\n0.0 10.0\n"
"ITEM: ATOMS id type x y z c_1[1] c_1[2]\n"
"1 1 1.0 2.0 3.0 6.0 7.0\n"
"2 2 4.0 5.0 6.0 8.0 9.0\n"
"ITEM: TIMESTEP\n1\n"
"ITEM: NUMBER OF ATOMS\n2\n"
"ITEM: BOX BOUNDS pp pp pp\n"
"0.0 10.0\n0.0 10.0\n0.0 10.0\n"
"ITEM: ATOMS id type x y z c_1[1] c_1[2]\n"
"2 2 2.0 7.0 6.0 11.0 10.0\n"
"1 1 1.0 3.0 3.0 12.0 9.0\n"
)

# Initialize DecompUtils and call read_atomic_property
utils = load_reader(dump_file)
results = utils.read_atomic_property()

# Check the results
assert isinstance(results, dict)
assert len(results) == 2
assert np.array_equal(
results['c_1[1]'], np.array([[6., 8.], [12., 11.]])
)
assert np.array_equal(
results['c_1[2]'], np.array([[7., 9.], [9., 10.]])
)

0 comments on commit 8ee5c31

Please sign in to comment.