Skip to content

Commit

Permalink
Merge pull request #474 from MDAnalysis/single-base-reader-test
Browse files Browse the repository at this point in the history
add base reader testing class
  • Loading branch information
richardjgowers committed Oct 25, 2015
2 parents cfdf38b + 63aa6e9 commit 96e3ce9
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 202 deletions.
44 changes: 30 additions & 14 deletions package/MDAnalysis/coordinates/XYZ.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,12 @@ def _get_atomnames(self, atoms):
pass
# list or string (can be a single atom name... deal with this in
# write_next_timestep() once we know n_atoms)
return np.asarray(util.asiterable(atoms))
if self.n_atoms is None:
return np.asarray(util.asiterable(atoms))
if isinstance(atoms, list):
return atoms
else:
return np.asarray([atoms for _ in range(self.n_atoms)])

def close(self):
"""Close the trajectory file and finalize the writing"""
Expand Down Expand Up @@ -215,6 +220,7 @@ def write_next_timestep(self, ts=None):
else:
ts = self.ts


if self.n_atoms is not None:
if self.n_atoms != ts.n_atoms:
raise ValueError('n_atoms keyword was specified indicating '
Expand Down Expand Up @@ -379,21 +385,31 @@ def open_trajectory(self):

return self.xyzfile

def Writer(self, filename, **kwargs):
"""Returns a XYZWriter for *filename* with the same parameters as this XYZ.
All values can be changed through keyword arguments.
:Arguments:
*filename*
filename of the output DCD trajectory
:Keywords:
*atoms*
names of the atoms (if not taken from atom groups)
def Writer(self, filename, n_atoms=None, **kwargs):
"""Returns a XYZWriter for *filename* with the same parameters as this
XYZ.
:Returns: :class:`XYZWriter` (see there for more details)
Parameters
----------
filename: str
filename of the output trajectory
n_atoms: int (optional)
number of atoms. If none is given use the same number of atoms from
the reader instance is used
**kwargs:
See :class:`XYZWriter` for additional kwargs
Returns
-------
:class:`XYZWriter` (see there for more details)
See Also
--------
:class: `XYZWriter`
"""
return XYZWriter(filename, **kwargs)
if n_atoms is None:
n_atoms = self.n_atoms
return XYZWriter(filename, n_atoms=n_atoms, **kwargs)

def close(self):
"""Close xyz trajectory file if it was open."""
Expand Down
237 changes: 236 additions & 1 deletion testsuite/MDAnalysisTests/coordinates/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import MDAnalysis as mda
import numpy as np
from six.moves import zip
from MDAnalysis.coordinates.base import Timestep

from numpy.testing import assert_equal, assert_raises, assert_almost_equal
from numpy.testing import (assert_equal, assert_raises, assert_almost_equal,
assert_array_almost_equal, raises)
from unittest import TestCase
import tempdir

from MDAnalysisTests.coordinates.reference import RefAdKSmall

Expand Down Expand Up @@ -92,3 +96,234 @@ def test_last_slice(self):
trj_iter = self.universe.trajectory[-1:]
frames = [ts.frame for ts in trj_iter]
assert_equal(frames, np.arange(self.universe.trajectory.n_frames))


class BaseReference(object):
def __init__(self):
self.trajectory = None
self.n_atoms = 5
self.n_frames = 5
# default for the numpy test functions
self.prec = 6

self.first_frame = Timestep(self.n_atoms)
self.first_frame.positions = np.arange(
3 * self.n_atoms).reshape(self.n_atoms, 3)
self.first_frame.frame = 0

self.second_frame = self.first_frame.copy()
self.second_frame.positions = 2 ** 1 * self.first_frame.positions
self.second_frame.frame = 1

self.last_frame = self.first_frame.copy()
self.last_frame.positions = 2 ** 4 * self.first_frame.positions
self.last_frame.frame = self.n_frames - 1

# remember frames are 0 indexed
self.jump_to_frame = self.first_frame.copy()
self.jump_to_frame.positions = 2 ** 3 * self.first_frame.positions
self.jump_to_frame.frame = 3

self.dimensions = np.array([80, 80, 80, 60, 60, 90], dtype=np.float32)
self.volume = mda.lib.mdamath.box_volume(self.dimensions)
self.time = 0
self.dt = 1
self.totaltime = 5

def iter_ts(self, i):
ts = self.first_frame.copy()
ts.positions = 2**i * self.first_frame.positions
ts.time = i
ts.frame = i
return ts


class BaseReaderTest(object):
def __init__(self, reference):
self.ref = reference
self.reader = self.ref.reader(self.ref.trajectory)

def test_n_atoms(self):
assert_equal(self.reader.n_atoms, self.ref.n_atoms)

def test_n_frames(self):
assert_equal(len(self.reader), self.ref.n_frames)

def test_first_frame(self):
self.reader.rewind()
assert_timestep_almost_equal(self.reader.ts, self.ref.first_frame,
decimal=self.ref.prec)

def test_double_close(self):
self.reader.close()
self.reader.close()
self.reader._reopen()

def test_reopen(self):
self.reader.close()
self.reader._reopen()
ts = self.reader.next()
assert_timestep_almost_equal(ts, self.ref.first_frame,
decimal=self.ref.prec)

def test_last_frame(self):
ts = self.reader[-1]
assert_timestep_almost_equal(ts, self.ref.last_frame,
decimal=self.ref.prec)

def test_next_gives_second_frame(self):
reader = self.ref.reader(self.ref.trajectory)
ts = reader.next()
assert_timestep_almost_equal(ts, self.ref.second_frame,
decimal=self.ref.prec)

@raises(IndexError)
def test_go_over_last_frame(self):
self.reader[self.ref.n_frames + 1]

def test_frame_jump(self):
ts = self.reader[self.ref.jump_to_frame.frame]
assert_timestep_almost_equal(ts, self.ref.jump_to_frame,
decimal=self.ref.prec)

def test_get_writer(self):
with tempdir.in_tempdir():
self.outfile = 'test-writer' + self.ref.ext
with self.reader.Writer(self.outfile) as W:
assert_equal(isinstance(W, self.ref.writer), True)
assert_equal(W.n_atoms, self.reader.n_atoms)

def test_get_writer_2(self):
with tempdir.in_tempdir():
self.outfile = 'test-writer' + self.ref.ext
with self.reader.Writer(self.outfile, n_atoms=100) as W:
assert_equal(isinstance(W, self.ref.writer), True)
assert_equal(W.n_atoms, 100)

def test_dt(self):
assert_equal(self.reader.dt, self.ref.dt)

def test_total_time(self):
assert_equal(self.reader.totaltime, self.ref.totaltime)

def test_dimensions(self):
assert_array_almost_equal(self.reader.ts.dimensions,
self.ref.dimensions,
decimal=self.ref.prec)

def test_volume(self):
self.reader.rewind()
vol = self.reader.ts.volume
assert_array_almost_equal(vol, self.ref.volume)

def test_iter(self):
for i, ts in enumerate(self.reader):
assert_timestep_almost_equal(ts, self.ref.iter_ts(i),
decimal=self.ref.prec)



class BaseWriterTest(object):
def __init__(self, reference):
self.ref = reference
self.tmpdir = tempdir.TempDir()
self.reader = self.ref.reader(self.ref.trajectory)

def tmp_file(self, name):
return self.tmpdir.name + name + '.' + self.ref.ext

def test_write_trajectory_timestep(self):
outfile = self.tmp_file('write-timestep-test')
with self.ref.writer(outfile) as W:
for ts in self.reader:
W.write(ts)
self._check_copy(outfile)

def test_write_trajectory_atomgroup(self):
uni = mda.Universe(self.ref.topology, self.ref.trajectory)
outfile = self.tmp_file('write-atoms-test')
with self.ref.writer(outfile) as w:
for ts in uni.trajectory:
w.write(uni.atoms)
self._check_copy(outfile)

def test_write_trajectory_universe(self):
uni = mda.Universe(self.ref.topology, self.ref.trajectory)
outfile = self.tmp_file('write-uni-test')
with self.ref.writer(outfile) as w:
for ts in uni.trajectory:
w.write(uni)
self._check_copy(outfile)

def test_write_selection(self):
uni = mda.Universe(self.ref.topology, self.ref.trajectory)
sel_str = 'resid 1'
sel = uni.select_atoms(sel_str)
outfile = self.tmp_file('write-selection-test')

with self.ref.writer(outfile) as W:
for ts in uni.trajectory:
W.write(sel.atoms)

copy = self.ref.reader(outfile)
for orig_ts, copy_ts in zip(uni.trajectory, copy):
assert_array_almost_equal(
copy_ts._pos, sel.atoms.positions, self.ref.prec,
err_msg="coordinate mismatch between original and written "
"trajectory at frame {} (orig) vs {} (copy)".format(
orig_ts.frame, copy_ts.frame))

def _check_copy(self, fname):
copy = self.ref.reader(fname)
assert_equal(self.reader.n_frames, copy.n_frames)
for orig_ts, copy_ts in zip(self.reader, copy):
assert_timestep_almost_equal(
copy_ts, orig_ts, decimal=self.ref.prec)

@raises(mda.NoDataError)
def test_write_none(self):
outfile = self.tmp_file('write-none')
with self.ref.writer(outfile) as w:
w.write(None)


def assert_timestep_almost_equal(A, B, decimal=6, verbose=True):
if not isinstance(A, Timestep):
raise AssertionError('A is not of type Timestep')
if not isinstance(B, Timestep):
raise AssertionError('B is not of type Timestep')

if A.frame != B.frame:
raise AssertionError('A and B refer to different frames: '
'A.frame = {}, B.frame={}'.format(
A.frame, B.frame))

if A.time != B.time:
raise AssertionError('A and B refer to different times: '
'A.time = {}, B.time={}'.format(
A.time, B.time))

if A.n_atoms != B.n_atoms:
raise AssertionError('A and B have a differnent number of atoms: '
'A.n_atoms = {}, B.n_atoms = {}'.format(
A.n_atoms, B.n_atoms))

assert_array_almost_equal(A.positions, B.positions, decimal=decimal,
err_msg='Timestep positions', verbose=verbose)

if A.has_velocities != B.has_velocities:
raise AssertionError('Only one Timestep has velocities:'
'A.has_velocities = {}, B.has_velocities'.format(
A.has_velocities, B.has_velocities))
if A.has_velocities:
assert_array_almost_equal(A.velocities, B.velocities, decimal=decimal,
err_msg='Timestep velocities',
verbose=verbose)

if A.has_forces != B.has_forces:
raise AssertionError('Only one Timestep has forces:'
'A.has_forces = {}, B.has_forces'.format(
A.has_forces, B.has_forces))
if A.has_forces:
assert_array_almost_equal(A.forces, B.forces, decimal=decimal,
err_msg='Timestep forces', verbose=verbose)
Loading

0 comments on commit 96e3ce9

Please sign in to comment.