Skip to content

Commit

Permalink
Add more tests (#212)
Browse files Browse the repository at this point in the history
* Add test for BBFYield plugin

* Add test for WFSim connection plugin

* Refactor microphysics tests

* Add some tests for waveform building

* Adding first truth plugin test

* Add tests for clustering functions

* Add test for gas phase volume plugin

* Add tests for all FDC options
  • Loading branch information
HenningSE authored May 16, 2024
1 parent a9d9d89 commit e3523ca
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 2 deletions.
1 change: 1 addition & 0 deletions fuse/plugins/micro_physics/yields.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ def get_quanta(self, energy, field):
return beta_photons, beta_electrons, q_.excitons


@export
class BBFYields(FuseBasePlugin):
__version__ = "0.1.1"

Expand Down
27 changes: 26 additions & 1 deletion tests/test_FullChain.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
TIMEOUT = 240


class TestFullChain(unittest.TestCase):
class TestFullChainBase(unittest.TestCase):
__test__ = True

@classmethod
Expand Down Expand Up @@ -46,6 +46,10 @@ def tearDown(self):
shutil.rmtree(self.temp_dir.name)
os.makedirs(self.temp_dir.name)


class TestFullChain(TestFullChainBase):
__test__ = True

@timeout_decorator.timeout(TIMEOUT, exception_message="S1PhotonHits timed out")
def test_S1PhotonHits(self):
self.test_context.make(self.run_number, "s1_photon_hits")
Expand Down Expand Up @@ -88,6 +92,27 @@ def test_PulseWindow(self):
def test_PMTResponseAndDAQ(self):
self.test_context.make(self.run_number, "raw_records")

@timeout_decorator.timeout(TIMEOUT, exception_message="ElectronDrift_noFDC timed out")
def test_ElectronDrift_noFDC(self):
self.test_context.set_config({"field_distortion_model": None})
self.test_context.make(self.run_number, "drifted_electrons")

@timeout_decorator.timeout(TIMEOUT, exception_message="ElectronDrift_inverseFDC timed out")
def test_ElectronDrift_inverseFDC(self):
self.test_context.set_config(
{
"field_distortion_model": "inverse_fdc",
"fdc_map_fuse": "itp_map://resource://XnT_3D_FDC_xyt_dummy_all_zeros_v0.1.json.gz?"
"&fmt=json.gz&method=WeightedNearestNeighbors",
}
)
self.test_context.make(self.run_number, "drifted_electrons")

@timeout_decorator.timeout(TIMEOUT, exception_message="ElectronDrift_comsolFDC timed out")
def test_ElectronDrift_comsolFDC(self):
self.test_context.set_config({"field_distortion_model": "comsol"})
self.test_context.make(self.run_number, "drifted_electrons")


class TestChunkedFullChain(TestFullChain):
__test__ = True
Expand Down
29 changes: 28 additions & 1 deletion tests/test_MicroPhysics.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
TIMEOUT = 60


class TestMicroPhysics(unittest.TestCase):
class TestMicroPhysicsBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.temp_dir = tempfile.TemporaryDirectory()
Expand Down Expand Up @@ -42,6 +42,12 @@ def tearDown(self):
shutil.rmtree(self.temp_dir.name)
os.makedirs(self.temp_dir.name)


class TestMicroPhysics(TestMicroPhysicsBase):
@classmethod
def setUpClass(cls):
super().setUpClass()

@timeout_decorator.timeout(TIMEOUT, exception_message="ChunkInput timed out")
def test_ChunkInput(self):
self.test_context.make(self.run_number, "geant4_interactions")
Expand Down Expand Up @@ -71,5 +77,26 @@ def test_MicroPhysicsSummary(self):
self.test_context.make(self.run_number, "microphysics_summary")


class TestMicroPhysicsAlternativePlugins(TestMicroPhysicsBase):
@classmethod
def setUpClass(cls):
super().setUpClass()

@timeout_decorator.timeout(TIMEOUT, exception_message="BBFYields timed out")
def test_BBFYields(self):
self.test_context.register(fuse.plugins.BBFYields)
self.test_context.make(self.run_number, "quanta")

@timeout_decorator.timeout(TIMEOUT, exception_message="WFSim connection timed out")
def test_WFSimConnection(self):
self.test_context.register(fuse.plugins.output_plugin)
self.test_context.make(self.run_number, "wfsim_instructions")

@timeout_decorator.timeout(TIMEOUT, exception_message="GasPhasePlugin timed out")
def test_GasPhasePlugin(self):
self.test_context.register(fuse.plugins.XENONnT_GasPhase)
self.test_context.make(self.run_number, "gas_phase_interactions")


if __name__ == "__main__":
unittest.main()
63 changes: 63 additions & 0 deletions tests/test_clustering_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import unittest
import numpy as np
from fuse.plugins.micro_physics.find_cluster import _find_cluster, simple_1d_clustering


class TestFindCluster(unittest.TestCase):

def test__find_cluster_all_separate(self):

dtype = np.dtype([("x", np.float32), ("y", np.float32), ("z", np.float32)])
x = np.zeros(6, dtype=dtype)
x["x"] = [1, 2, 3, 4, 5, 6]
x["y"] = [1, 2, 3, 4, 5, 6]
x["z"] = [1, 2, 3, 4, 5, 6]

cluster_size_space = 0.1
expected_result = np.array([0, 1, 2, 3, 4, 5], dtype=np.int32)
result = _find_cluster(x, cluster_size_space)
np.testing.assert_array_equal(result, expected_result)

def test__find_cluster_all_merged(self):

dtype = np.dtype([("x", np.float32), ("y", np.float32), ("z", np.float32)])
x = np.zeros(6, dtype=dtype)
x["x"] = [1, 2, 3, 4, 5, 6]
x["y"] = [1, 2, 3, 4, 5, 6]
x["z"] = [1, 2, 3, 4, 5, 6]

cluster_size_space = 1.8
expected_result = np.array([0, 0, 0, 0, 0, 0], dtype=np.int32)
result = _find_cluster(x, cluster_size_space)
np.testing.assert_array_equal(result, expected_result)

def test__find_cluster_two_clusters(self):

dtype = np.dtype([("x", np.float32), ("y", np.float32), ("z", np.float32)])
x = np.zeros(7, dtype=dtype)
x["x"] = [1, 2, 3, 7, 5, 6, 2.5]
x["y"] = [1, 2, 3, 7, 5, 6, 2.5]
x["z"] = [1, 2, 3, 7, 5, 6, 2.5]

cluster_size_space = 1.8
expected_result = np.array([0, 0, 0, 1, 1, 1, 0], dtype=np.int32)
result = _find_cluster(x, cluster_size_space)
np.testing.assert_array_equal(result, expected_result)

def test_simple_1d_clustering_two_clusters_ordered_input(self):
data = np.array([0, 1, 2, 4, 5], dtype=np.float32)
scale = 1
expected_result = np.array([0, 0, 0, 1, 1], dtype=np.int32)
result = simple_1d_clustering(data, scale)
np.testing.assert_array_equal(result, expected_result)

def test_simple_1d_clustering_three_clusters_unordered_input(self):
data = np.array([4, 1, 2, 10, 5], dtype=np.float32)
scale = 1
expected_result = np.array([1, 0, 0, 2, 1], dtype=np.int32)
result = simple_1d_clustering(data, scale)
np.testing.assert_array_equal(result, expected_result)


if __name__ == "__main__":
unittest.main()
70 changes: 70 additions & 0 deletions tests/test_common_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import numpy as np
import awkward as ak
import unittest
from fuse.common import awkward_to_flat_numpy, full_array_to_numpy, dynamic_chunking


class TestFullArrayToNumpy(unittest.TestCase):
def test_full_array_to_numpy(self):
array = ak.Array(
{
"x": [[1, 2, 3], [4, 5], [6]],
"y": [[7, 8, 9], [10, 11], [12]],
"z": [[13, 14, 15], [16, 17], [18]],
}
)
dtype = np.dtype([("x", np.int64), ("y", np.int64), ("z", np.int64)])
expected_output = np.zeros(6, dtype=dtype)
expected_output["x"] = [1, 2, 3, 4, 5, 6]
expected_output["y"] = [7, 8, 9, 10, 11, 12]
expected_output["z"] = [13, 14, 15, 16, 17, 18]

result = full_array_to_numpy(array, dtype)

np.testing.assert_array_equal(result, expected_output)


class TestAwkwardToFlatNumpy(unittest.TestCase):
def test_empty_array(self):
# Test when the input array is empty
array = ak.Array([])
result = awkward_to_flat_numpy(array)
expected = np.array([])
np.testing.assert_allclose(result, expected)

def test_single_jagged_layer(self):
# Test when the input array has a single jagged layer
array = ak.Array([[1, 2, 3], [4, 5], [6]])
result = awkward_to_flat_numpy(array)
expected = np.array([1, 2, 3, 4, 5, 6])
np.testing.assert_allclose(result, expected)


class TestDynamicChunking(unittest.TestCase):
def test_two_chunks(self):
data = np.array([1, 2, 3, 4, 7, 8, 9, 10])
scale = 2
n_min = 2
expected_clusters = np.array([0, 0, 0, 0, 1, 1, 1, 1])
clusters = dynamic_chunking(data, scale, n_min)
np.testing.assert_array_equal(clusters, expected_clusters)

def test_single_chunks(self):
data = np.array([1, 2, 3, 4])
scale = 2
n_min = 2
expected_clusters = np.array([0, 0, 0, 0])
clusters = dynamic_chunking(data, scale, n_min)
np.testing.assert_array_equal(clusters, expected_clusters)

def test_chunk_extension(self):
data = np.array([1, 2, 3, 4, 7, 8, 9, 10])
scale = 2
n_min = 5
expected_clusters = np.array([0, 0, 0, 0, 0, 0, 0, 0])
clusters = dynamic_chunking(data, scale, n_min)
np.testing.assert_array_equal(clusters, expected_clusters)


if __name__ == "__main__":
unittest.main()
58 changes: 58 additions & 0 deletions tests/test_truth_plugins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import os
import shutil
import unittest
import tempfile
import timeout_decorator
import fuse
import straxen
from _utils import test_root_file_name

TIMEOUT = 240


class TestTruthPlugins(unittest.TestCase):
__test__ = True

@classmethod
def setUpClass(cls):
cls.temp_dir = tempfile.TemporaryDirectory()

cls.test_context = fuse.context.full_chain_context(
output_folder=cls.temp_dir.name, run_without_proper_corrections=True
)

cls.test_context.set_config(
{
"path": cls.temp_dir.name,
"file_name": test_root_file_name,
"entry_stop": 5,
}
)

cls.run_number = "TestRun_00000"

@classmethod
def tearDownClass(cls):
cls.temp_dir.cleanup()

def setUp(self):
downloader = straxen.MongoDownloader(store_files_at=(self.temp_dir.name,))
downloader.download_single(test_root_file_name, human_readable_file_name=True)

assert os.path.exists(os.path.join(self.temp_dir.name, test_root_file_name))

self.test_context.make(self.run_number, "photon_summary")
self.test_context.make(self.run_number, "raw_records")

def tearDown(self):
# self.temp_dir.cleanup()
shutil.rmtree(self.temp_dir.name)
os.makedirs(self.temp_dir.name)

@timeout_decorator.timeout(TIMEOUT, exception_message="RecordsTruth timed out")
def test_RecordsTruth(self):
self.test_context.make(self.run_number, "records_truth")


if __name__ == "__main__":
unittest.main()
95 changes: 95 additions & 0 deletions tests/test_waveform_building_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import numpy as np
import unittest
from fuse.plugins.pmt_and_daq.pmt_response_and_daq import (
split_photons,
add_noise,
add_baseline,
split_data,
add_current,
)


class TestWaveformBuildingFunctions(unittest.TestCase):
def test_split_photons(self):
propagated_photons = np.array(
[(1, 10), (1, 20), (2, 30), (2, 40), (2, 50), (3, 60)],
dtype=[("pulse_id", int), ("time", int)],
)

expected_result = [
np.array([(1, 10), (1, 20)], dtype=[("pulse_id", int), ("time", int)]),
np.array([(2, 30), (2, 40), (2, 50)], dtype=[("pulse_id", int), ("time", int)]),
np.array([(3, 60)], dtype=[("pulse_id", int), ("time", int)]),
]

result, unique_pulse_ids = split_photons(propagated_photons)

self.assertEqual(len(result), len(expected_result))
for i in range(len(result)):
np.testing.assert_array_equal(result[i], expected_result[i])
np.testing.assert_array_equal(unique_pulse_ids, np.array([1, 2, 3]))

def test_add_noise(self):
# Test the add_noise function
array = np.array([1, 2, 3, 4, 5, 6])
time = 30
noise_in_channel = np.array([0, 0, 1, 0, 0, 0, 0])

result = add_noise(array, time, noise_in_channel)

# Assert the result is correct
expected_result = np.array([1, 2, 3, 4, 5, 7])
np.testing.assert_array_equal(result, expected_result)

def test_add_baseline(self):
# Test the add_baseline function
data = np.array([0, 1, 2, 3, 4])
baseline = 16000

add_baseline(data, baseline)

# Assert that the data array has been modified correctly
np.testing.assert_array_equal(data, np.array([16000, 16001, 16002, 16003, 16004]))

def test_split_data(self):
# Test the split_data function
data = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
samples_per_record = 3

result = split_data(data, samples_per_record)

# Assert that the result is a tuple of two np.ndarrays
self.assertIsInstance(result, tuple)
self.assertIsInstance(result[0], np.ndarray)
self.assertIsInstance(result[1], int)

# Assert that the result has the correct shapes
self.assertEqual(result[0].shape, (4, 3))
self.assertEqual(result[1], 4)

np.testing.assert_array_equal(result[0][0], data[0:3])
np.testing.assert_array_equal(result[0][1], data[3:6])
np.testing.assert_array_equal(result[0][2], data[6:9])
np.testing.assert_array_equal(result[0][3], np.array([9, 0, 0]))

def test_add_current(self):
photon_timings = np.array([10, 50])
photon_gains = np.array([100, 200])
pulse_left = 0
dt = 10
pmt_current_templates = [np.array([1, 2, 3, 4, 5])]
pulse_current = np.zeros(11)

add_current(
photon_timings, photon_gains, pulse_left, dt, pmt_current_templates, pulse_current
)

# Assert that the pulse_current array has been modified correctly
expected_result = np.array(
[0.0, 100.0, 200.0, 300.0, 400.0, 700.0, 400.0, 600.0, 800.0, 1000.0, 0.0]
)
np.testing.assert_array_equal(pulse_current, expected_result)


if __name__ == "__main__":
unittest.main()

0 comments on commit e3523ca

Please sign in to comment.