diff --git a/fuse/plugins/micro_physics/yields.py b/fuse/plugins/micro_physics/yields.py index aa6ff3a9..16fad945 100644 --- a/fuse/plugins/micro_physics/yields.py +++ b/fuse/plugins/micro_physics/yields.py @@ -298,6 +298,7 @@ def get_quanta(self, energy, field): return beta_photons, beta_electrons, q_.excitons +@export class BBFYields(FuseBasePlugin): __version__ = "0.1.1" diff --git a/tests/test_FullChain.py b/tests/test_FullChain.py index c2b87259..0f279f2d 100644 --- a/tests/test_FullChain.py +++ b/tests/test_FullChain.py @@ -10,7 +10,7 @@ TIMEOUT = 240 -class TestFullChain(unittest.TestCase): +class TestFullChainBase(unittest.TestCase): __test__ = True @classmethod @@ -46,6 +46,10 @@ def tearDown(self): shutil.rmtree(self.temp_dir.name) os.makedirs(self.temp_dir.name) + +class TestFullChain(TestFullChainBase): + __test__ = True + @timeout_decorator.timeout(TIMEOUT, exception_message="S1PhotonHits timed out") def test_S1PhotonHits(self): self.test_context.make(self.run_number, "s1_photon_hits") @@ -88,6 +92,27 @@ def test_PulseWindow(self): def test_PMTResponseAndDAQ(self): self.test_context.make(self.run_number, "raw_records") + @timeout_decorator.timeout(TIMEOUT, exception_message="ElectronDrift_noFDC timed out") + def test_ElectronDrift_noFDC(self): + self.test_context.set_config({"field_distortion_model": None}) + self.test_context.make(self.run_number, "drifted_electrons") + + @timeout_decorator.timeout(TIMEOUT, exception_message="ElectronDrift_inverseFDC timed out") + def test_ElectronDrift_inverseFDC(self): + self.test_context.set_config( + { + "field_distortion_model": "inverse_fdc", + "fdc_map_fuse": "itp_map://resource://XnT_3D_FDC_xyt_dummy_all_zeros_v0.1.json.gz?" + "&fmt=json.gz&method=WeightedNearestNeighbors", + } + ) + self.test_context.make(self.run_number, "drifted_electrons") + + @timeout_decorator.timeout(TIMEOUT, exception_message="ElectronDrift_comsolFDC timed out") + def test_ElectronDrift_comsolFDC(self): + self.test_context.set_config({"field_distortion_model": "comsol"}) + self.test_context.make(self.run_number, "drifted_electrons") + class TestChunkedFullChain(TestFullChain): __test__ = True diff --git a/tests/test_MicroPhysics.py b/tests/test_MicroPhysics.py index 6a7c812f..046933dc 100644 --- a/tests/test_MicroPhysics.py +++ b/tests/test_MicroPhysics.py @@ -10,7 +10,7 @@ TIMEOUT = 60 -class TestMicroPhysics(unittest.TestCase): +class TestMicroPhysicsBase(unittest.TestCase): @classmethod def setUpClass(cls): cls.temp_dir = tempfile.TemporaryDirectory() @@ -42,6 +42,12 @@ def tearDown(self): shutil.rmtree(self.temp_dir.name) os.makedirs(self.temp_dir.name) + +class TestMicroPhysics(TestMicroPhysicsBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + @timeout_decorator.timeout(TIMEOUT, exception_message="ChunkInput timed out") def test_ChunkInput(self): self.test_context.make(self.run_number, "geant4_interactions") @@ -71,5 +77,26 @@ def test_MicroPhysicsSummary(self): self.test_context.make(self.run_number, "microphysics_summary") +class TestMicroPhysicsAlternativePlugins(TestMicroPhysicsBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + + @timeout_decorator.timeout(TIMEOUT, exception_message="BBFYields timed out") + def test_BBFYields(self): + self.test_context.register(fuse.plugins.BBFYields) + self.test_context.make(self.run_number, "quanta") + + @timeout_decorator.timeout(TIMEOUT, exception_message="WFSim connection timed out") + def test_WFSimConnection(self): + self.test_context.register(fuse.plugins.output_plugin) + self.test_context.make(self.run_number, "wfsim_instructions") + + @timeout_decorator.timeout(TIMEOUT, exception_message="GasPhasePlugin timed out") + def test_GasPhasePlugin(self): + self.test_context.register(fuse.plugins.XENONnT_GasPhase) + self.test_context.make(self.run_number, "gas_phase_interactions") + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_clustering_functions.py b/tests/test_clustering_functions.py new file mode 100644 index 00000000..1839b2ea --- /dev/null +++ b/tests/test_clustering_functions.py @@ -0,0 +1,63 @@ +import unittest +import numpy as np +from fuse.plugins.micro_physics.find_cluster import _find_cluster, simple_1d_clustering + + +class TestFindCluster(unittest.TestCase): + + def test__find_cluster_all_separate(self): + + dtype = np.dtype([("x", np.float32), ("y", np.float32), ("z", np.float32)]) + x = np.zeros(6, dtype=dtype) + x["x"] = [1, 2, 3, 4, 5, 6] + x["y"] = [1, 2, 3, 4, 5, 6] + x["z"] = [1, 2, 3, 4, 5, 6] + + cluster_size_space = 0.1 + expected_result = np.array([0, 1, 2, 3, 4, 5], dtype=np.int32) + result = _find_cluster(x, cluster_size_space) + np.testing.assert_array_equal(result, expected_result) + + def test__find_cluster_all_merged(self): + + dtype = np.dtype([("x", np.float32), ("y", np.float32), ("z", np.float32)]) + x = np.zeros(6, dtype=dtype) + x["x"] = [1, 2, 3, 4, 5, 6] + x["y"] = [1, 2, 3, 4, 5, 6] + x["z"] = [1, 2, 3, 4, 5, 6] + + cluster_size_space = 1.8 + expected_result = np.array([0, 0, 0, 0, 0, 0], dtype=np.int32) + result = _find_cluster(x, cluster_size_space) + np.testing.assert_array_equal(result, expected_result) + + def test__find_cluster_two_clusters(self): + + dtype = np.dtype([("x", np.float32), ("y", np.float32), ("z", np.float32)]) + x = np.zeros(7, dtype=dtype) + x["x"] = [1, 2, 3, 7, 5, 6, 2.5] + x["y"] = [1, 2, 3, 7, 5, 6, 2.5] + x["z"] = [1, 2, 3, 7, 5, 6, 2.5] + + cluster_size_space = 1.8 + expected_result = np.array([0, 0, 0, 1, 1, 1, 0], dtype=np.int32) + result = _find_cluster(x, cluster_size_space) + np.testing.assert_array_equal(result, expected_result) + + def test_simple_1d_clustering_two_clusters_ordered_input(self): + data = np.array([0, 1, 2, 4, 5], dtype=np.float32) + scale = 1 + expected_result = np.array([0, 0, 0, 1, 1], dtype=np.int32) + result = simple_1d_clustering(data, scale) + np.testing.assert_array_equal(result, expected_result) + + def test_simple_1d_clustering_three_clusters_unordered_input(self): + data = np.array([4, 1, 2, 10, 5], dtype=np.float32) + scale = 1 + expected_result = np.array([1, 0, 0, 2, 1], dtype=np.int32) + result = simple_1d_clustering(data, scale) + np.testing.assert_array_equal(result, expected_result) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_common_functions.py b/tests/test_common_functions.py new file mode 100644 index 00000000..90f1c4aa --- /dev/null +++ b/tests/test_common_functions.py @@ -0,0 +1,70 @@ +import numpy as np +import awkward as ak +import unittest +from fuse.common import awkward_to_flat_numpy, full_array_to_numpy, dynamic_chunking + + +class TestFullArrayToNumpy(unittest.TestCase): + def test_full_array_to_numpy(self): + array = ak.Array( + { + "x": [[1, 2, 3], [4, 5], [6]], + "y": [[7, 8, 9], [10, 11], [12]], + "z": [[13, 14, 15], [16, 17], [18]], + } + ) + dtype = np.dtype([("x", np.int64), ("y", np.int64), ("z", np.int64)]) + expected_output = np.zeros(6, dtype=dtype) + expected_output["x"] = [1, 2, 3, 4, 5, 6] + expected_output["y"] = [7, 8, 9, 10, 11, 12] + expected_output["z"] = [13, 14, 15, 16, 17, 18] + + result = full_array_to_numpy(array, dtype) + + np.testing.assert_array_equal(result, expected_output) + + +class TestAwkwardToFlatNumpy(unittest.TestCase): + def test_empty_array(self): + # Test when the input array is empty + array = ak.Array([]) + result = awkward_to_flat_numpy(array) + expected = np.array([]) + np.testing.assert_allclose(result, expected) + + def test_single_jagged_layer(self): + # Test when the input array has a single jagged layer + array = ak.Array([[1, 2, 3], [4, 5], [6]]) + result = awkward_to_flat_numpy(array) + expected = np.array([1, 2, 3, 4, 5, 6]) + np.testing.assert_allclose(result, expected) + + +class TestDynamicChunking(unittest.TestCase): + def test_two_chunks(self): + data = np.array([1, 2, 3, 4, 7, 8, 9, 10]) + scale = 2 + n_min = 2 + expected_clusters = np.array([0, 0, 0, 0, 1, 1, 1, 1]) + clusters = dynamic_chunking(data, scale, n_min) + np.testing.assert_array_equal(clusters, expected_clusters) + + def test_single_chunks(self): + data = np.array([1, 2, 3, 4]) + scale = 2 + n_min = 2 + expected_clusters = np.array([0, 0, 0, 0]) + clusters = dynamic_chunking(data, scale, n_min) + np.testing.assert_array_equal(clusters, expected_clusters) + + def test_chunk_extension(self): + data = np.array([1, 2, 3, 4, 7, 8, 9, 10]) + scale = 2 + n_min = 5 + expected_clusters = np.array([0, 0, 0, 0, 0, 0, 0, 0]) + clusters = dynamic_chunking(data, scale, n_min) + np.testing.assert_array_equal(clusters, expected_clusters) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_truth_plugins.py b/tests/test_truth_plugins.py new file mode 100644 index 00000000..4780adfc --- /dev/null +++ b/tests/test_truth_plugins.py @@ -0,0 +1,58 @@ +import os +import shutil +import unittest +import tempfile +import timeout_decorator +import fuse +import straxen +from _utils import test_root_file_name + +TIMEOUT = 240 + + +class TestTruthPlugins(unittest.TestCase): + __test__ = True + + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.TemporaryDirectory() + + cls.test_context = fuse.context.full_chain_context( + output_folder=cls.temp_dir.name, run_without_proper_corrections=True + ) + + cls.test_context.set_config( + { + "path": cls.temp_dir.name, + "file_name": test_root_file_name, + "entry_stop": 5, + } + ) + + cls.run_number = "TestRun_00000" + + @classmethod + def tearDownClass(cls): + cls.temp_dir.cleanup() + + def setUp(self): + downloader = straxen.MongoDownloader(store_files_at=(self.temp_dir.name,)) + downloader.download_single(test_root_file_name, human_readable_file_name=True) + + assert os.path.exists(os.path.join(self.temp_dir.name, test_root_file_name)) + + self.test_context.make(self.run_number, "photon_summary") + self.test_context.make(self.run_number, "raw_records") + + def tearDown(self): + # self.temp_dir.cleanup() + shutil.rmtree(self.temp_dir.name) + os.makedirs(self.temp_dir.name) + + @timeout_decorator.timeout(TIMEOUT, exception_message="RecordsTruth timed out") + def test_RecordsTruth(self): + self.test_context.make(self.run_number, "records_truth") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_waveform_building_functions.py b/tests/test_waveform_building_functions.py new file mode 100644 index 00000000..4ff742d5 --- /dev/null +++ b/tests/test_waveform_building_functions.py @@ -0,0 +1,95 @@ +import numpy as np +import unittest +from fuse.plugins.pmt_and_daq.pmt_response_and_daq import ( + split_photons, + add_noise, + add_baseline, + split_data, + add_current, +) + + +class TestWaveformBuildingFunctions(unittest.TestCase): + def test_split_photons(self): + propagated_photons = np.array( + [(1, 10), (1, 20), (2, 30), (2, 40), (2, 50), (3, 60)], + dtype=[("pulse_id", int), ("time", int)], + ) + + expected_result = [ + np.array([(1, 10), (1, 20)], dtype=[("pulse_id", int), ("time", int)]), + np.array([(2, 30), (2, 40), (2, 50)], dtype=[("pulse_id", int), ("time", int)]), + np.array([(3, 60)], dtype=[("pulse_id", int), ("time", int)]), + ] + + result, unique_pulse_ids = split_photons(propagated_photons) + + self.assertEqual(len(result), len(expected_result)) + for i in range(len(result)): + np.testing.assert_array_equal(result[i], expected_result[i]) + np.testing.assert_array_equal(unique_pulse_ids, np.array([1, 2, 3])) + + def test_add_noise(self): + # Test the add_noise function + array = np.array([1, 2, 3, 4, 5, 6]) + time = 30 + noise_in_channel = np.array([0, 0, 1, 0, 0, 0, 0]) + + result = add_noise(array, time, noise_in_channel) + + # Assert the result is correct + expected_result = np.array([1, 2, 3, 4, 5, 7]) + np.testing.assert_array_equal(result, expected_result) + + def test_add_baseline(self): + # Test the add_baseline function + data = np.array([0, 1, 2, 3, 4]) + baseline = 16000 + + add_baseline(data, baseline) + + # Assert that the data array has been modified correctly + np.testing.assert_array_equal(data, np.array([16000, 16001, 16002, 16003, 16004])) + + def test_split_data(self): + # Test the split_data function + data = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + samples_per_record = 3 + + result = split_data(data, samples_per_record) + + # Assert that the result is a tuple of two np.ndarrays + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], np.ndarray) + self.assertIsInstance(result[1], int) + + # Assert that the result has the correct shapes + self.assertEqual(result[0].shape, (4, 3)) + self.assertEqual(result[1], 4) + + np.testing.assert_array_equal(result[0][0], data[0:3]) + np.testing.assert_array_equal(result[0][1], data[3:6]) + np.testing.assert_array_equal(result[0][2], data[6:9]) + np.testing.assert_array_equal(result[0][3], np.array([9, 0, 0])) + + def test_add_current(self): + photon_timings = np.array([10, 50]) + photon_gains = np.array([100, 200]) + pulse_left = 0 + dt = 10 + pmt_current_templates = [np.array([1, 2, 3, 4, 5])] + pulse_current = np.zeros(11) + + add_current( + photon_timings, photon_gains, pulse_left, dt, pmt_current_templates, pulse_current + ) + + # Assert that the pulse_current array has been modified correctly + expected_result = np.array( + [0.0, 100.0, 200.0, 300.0, 400.0, 700.0, 400.0, 600.0, 800.0, 1000.0, 0.0] + ) + np.testing.assert_array_equal(pulse_current, expected_result) + + +if __name__ == "__main__": + unittest.main()