From 6a56ae8eaed63e04d9a8fd8a691e851c5a993148 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Mon, 13 May 2024 16:50:51 +0100 Subject: [PATCH 01/21] Add basic test for disconnected eurotherm --- common_tests/eurotherm.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common_tests/eurotherm.py b/common_tests/eurotherm.py index cdd47350..51d7cc1c 100644 --- a/common_tests/eurotherm.py +++ b/common_tests/eurotherm.py @@ -255,6 +255,10 @@ def test_WHEN_disconnected_THEN_in_alarm(self, record): # Assert alarms clear on reconnection with self._get_temperature_setter_wrapper(): self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30) + + def test_WHEN_eurotherm_missing_THEN_updates_of_PVs_stop(self): + self._lewis.backdoor_simulate_disconnected_device() + @parameterized.expand(parameterized_list(PID_TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") From f392909b82d8bb44d62b78711fb9096f9bd4a434 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Tue, 14 May 2024 14:16:09 +0100 Subject: [PATCH 02/21] Add pv assert to ensure that RBV doesn't change when the eurotherm is disconnected --- common_tests/eurotherm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common_tests/eurotherm.py b/common_tests/eurotherm.py index 51d7cc1c..143bd6aa 100644 --- a/common_tests/eurotherm.py +++ b/common_tests/eurotherm.py @@ -257,8 +257,8 @@ def test_WHEN_disconnected_THEN_in_alarm(self, record): self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30) def test_WHEN_eurotherm_missing_THEN_updates_of_PVs_stop(self): - self._lewis.backdoor_simulate_disconnected_device() - + with self._lewis.backdoor_simulate_disconnected_device(): + self.ca.assert_that_pv_value_is_unchanged("A01:RBV", 20) @parameterized.expand(parameterized_list(PID_TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") From b8c17767115a078340f1f258b6e51c521640080f Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Wed, 30 Oct 2024 10:20:10 +0000 Subject: [PATCH 03/21] change set on device to run function --- common_tests/eurotherm.py | 44 ++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/common_tests/eurotherm.py b/common_tests/eurotherm.py index 1b511c02..e06a28d4 100644 --- a/common_tests/eurotherm.py +++ b/common_tests/eurotherm.py @@ -19,6 +19,7 @@ # PIDs cannot be floating-point PID_TEST_VALUES = [-50, 50, 3000] +sensors = ["0011", "0022", "0033", "0044", "0055", "0066"] class EurothermBaseTests(metaclass=abc.ABCMeta): """ @@ -55,16 +56,17 @@ def _setup_lewis_and_channel_access(self): self.ca.assert_that_pv_exists("A01:RBV", timeout=30) self.ca.assert_that_pv_exists("A01:CAL:SEL", timeout=10) - def _reset_device_state(self, sensor="A01"): - self._lewis.backdoor_set_on_device("connected", True) - reset_calibration_file(self.ca, prefix=f"{sensor}:") + def _reset_device_state(self): + for sensor in sensors: + self._lewis.backdoor_run_function_on_device("set_connected", [sensor , True]) + reset_calibration_file(self.ca, prefix=f"{'A01'}:") intial_temp = 0.0 self._set_setpoint_and_current_temperature(intial_temp) - self._lewis.backdoor_set_on_device("ramping_on", False) - self._lewis.backdoor_set_on_device("ramp_rate", 1.0) + self._lewis.backdoor_run_function_on_device("set_ramping_on", [sensors[0]]) + self._lewis.backdoor_run_function_on_device("set_ramp_rate", [sensors[0], 1.0]) self.ca.set_pv_value(f"{sensor}:RAMPON:SP", 0, sleep_after_set=0) self._set_setpoint_and_current_temperature(intial_temp) @@ -79,9 +81,9 @@ def _set_setpoint_and_current_temperature(self, temperature, sensor="A01"): self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP", temperature) self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP:RBV", temperature) else: - self._lewis.backdoor_set_on_device("current_temperature", temperature) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], temperature]) self.ca.assert_that_pv_is_number(f"{sensor}:TEMP", temperature, 0.1, timeout=30) - self._lewis.backdoor_set_on_device("ramp_setpoint_temperature", temperature) + self._lewis.backdoor_run_function_on_device("set_ramp_setpoint_temperature", [sensors[0], temperature]) self.ca.assert_that_pv_is_number(f"{sensor}:TEMP:SP:RBV", temperature, 0.1, timeout=30) def test_WHEN_read_rbv_temperature_THEN_rbv_value_is_same_as_backdoor(self): @@ -121,14 +123,14 @@ def test_WHEN_set_ramp_rate_in_K_per_min_THEN_current_temperature_reaches_set_po ) # Lower tolerance will be too tight given scan rate def test_WHEN_sensor_disconnected_THEN_ramp_setting_is_disabled(self): - self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], SENSOR_DISCONNECTED_VALUE]) self.ca.assert_that_pv_is_number("A01:RAMPON:SP.DISP", 1) def test_GIVEN_sensor_disconnected_WHEN_sensor_reconnected_THEN_ramp_setting_is_enabled(self): - self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], SENSOR_DISCONNECTED_VALUE]) - self._lewis.backdoor_set_on_device("current_temperature", 0) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], 0]) self.ca.assert_that_pv_is_number("A01:RAMPON:SP.DISP", 0) @@ -137,7 +139,7 @@ def test_GIVEN_ramp_was_off_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached ): self.ca.set_pv_value("A01:RAMPON:SP", 0) - self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], SENSOR_DISCONNECTED_VALUE]) self.ca.assert_that_pv_is("A01:RAMPON", "OFF") self.ca.assert_that_pv_is("A01:RAMPON:CACHE", "OFF") @@ -147,7 +149,7 @@ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached_ ): self.ca.set_pv_value("A01:RAMPON:SP", 1) - self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], SENSOR_DISCONNECTED_VALUE]) self.ca.assert_that_pv_is("A01:RAMPON", "OFF") self.ca.assert_that_pv_is("A01:RAMPON:CACHE", "ON") @@ -155,9 +157,9 @@ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached_ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_and_reconnected_THEN_ramp_is_on(self): self.ca.set_pv_value("A01:RAMPON:SP", 1) - self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], SENSOR_DISCONNECTED_VALUE]) self.ca.assert_that_pv_is("A01:RAMPON", "OFF") - self._lewis.backdoor_set_on_device("current_temperature", 0) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], 0]) self.ca.assert_that_pv_is("A01:RAMPON", "ON") @@ -311,31 +313,31 @@ def test_WHEN_eurotherm_missing_THEN_updates_of_PVs_stop(self): @parameterized.expand(parameterized_list(PID_TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_p_set_via_backdoor_THEN_p_updates(self, _, val): - self._lewis.backdoor_set_on_device("p", val) + self._lewis.backdoor_run_function_on_device("set_p", [sensors[0], val]) self.ca.assert_that_pv_is_number("A01:P", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list(PID_TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_i_set_via_backdoor_THEN_i_updates(self, _, val): - self._lewis.backdoor_set_on_device("i", val) + self._lewis.backdoor_run_function_on_device("set_i", [sensors[0], val]) self.ca.assert_that_pv_is_number("A01:I", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list(PID_TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_d_set_via_backdoor_THEN_d_updates(self, _, val): - self._lewis.backdoor_set_on_device("d", val) + self._lewis.backdoor_run_function_on_device("set_d", [sensors[0], val]) self.ca.assert_that_pv_is_number("A01:D", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list(TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_output_set_via_backdoor_THEN_output_updates(self, _, val): - self._lewis.backdoor_set_on_device("output", val) + self._lewis.backdoor_run_function_on_device("set_output", [sensors[0], val]) self.ca.assert_that_pv_is_number("A01:OUTPUT", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list(TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_output_set_via_backdoor_THEN_output_updates(self, _, val): - self._lewis.backdoor_set_on_device("max_output", val) + self._lewis.backdoor_run_function_on_device("set_max_output", [sensors[0], val]) self.ca.assert_that_pv_is_number("A01:MAX_OUTPUT", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list([0, 100, 3276])) @@ -347,11 +349,11 @@ def test_WHEN_output_rate_set_THEN_output_rate_updates(self, _, val): @parameterized.expand(parameterized_list(TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_high_limit_set_via_backdoor_THEN_high_lim_updates(self, _, val): - self._lewis.backdoor_set_on_device("high_lim", val) + self._lewis.backdoor_run_function_on_device("set_high_lim", [sensors[0], val]) self.ca.assert_that_pv_is_number("A01:HILIM", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list(TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_low_limit_set_via_backdoor_THEN_low_lim_updates(self, _, val): - self._lewis.backdoor_set_on_device("low_lim", val) + self._lewis.backdoor_run_function_on_device("set_low_lim", [sensors[0], val]) self.ca.assert_that_pv_is_number("A01:LOWLIM", val, tolerance=0.05, timeout=15) From b01c6f379ac4c3bf7337d2e22dabcf9137bbdd88 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Wed, 30 Oct 2024 13:59:18 +0000 Subject: [PATCH 04/21] refactoring --- common_tests/eurotherm.py | 13 ++++++++----- utils/emulator_launcher.py | 11 +++++++++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/common_tests/eurotherm.py b/common_tests/eurotherm.py index e06a28d4..8f07e966 100644 --- a/common_tests/eurotherm.py +++ b/common_tests/eurotherm.py @@ -1,6 +1,7 @@ import abc import os import time +import contextlib from parameterized import parameterized @@ -21,6 +22,8 @@ sensors = ["0011", "0022", "0033", "0044", "0055", "0066"] +PV_sensors = ["A01", "A02", "A03", "A04", "A05", "A06"] + class EurothermBaseTests(metaclass=abc.ABCMeta): """ Tests for the Eurotherm temperature controller. @@ -56,9 +59,9 @@ def _setup_lewis_and_channel_access(self): self.ca.assert_that_pv_exists("A01:RBV", timeout=30) self.ca.assert_that_pv_exists("A01:CAL:SEL", timeout=10) - def _reset_device_state(self): - for sensor in sensors: - self._lewis.backdoor_run_function_on_device("set_connected", [sensor , True]) + def _reset_device_state(self, sensor=PV_sensors[0]): + for item in sensors: + self._lewis.backdoor_run_function_on_device("set_connected", [item , True]) reset_calibration_file(self.ca, prefix=f"{'A01'}:") intial_temp = 0.0 @@ -74,7 +77,7 @@ def _reset_device_state(self): # Ensure the temperature isn't being changed by a ramp any more self.ca.assert_that_pv_value_is_unchanged(f"{sensor}:TEMP", 5) - def _set_setpoint_and_current_temperature(self, temperature, sensor="A01"): + def _set_setpoint_and_current_temperature(self, temperature, sensor=PV_sensors[0]): if IOCRegister.uses_rec_sim: self.ca.set_pv_value(f"{sensor}:SIM:TEMP:SP", temperature) self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP", temperature) @@ -307,7 +310,7 @@ def test_WHEN_disconnected_THEN_in_alarm(self, record): self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30) def test_WHEN_eurotherm_missing_THEN_updates_of_PVs_stop(self): - with self._lewis.backdoor_simulate_disconnected_device(): + with self._lewis.backdoor_simulate_disconnected_addr(): self.ca.assert_that_pv_value_is_unchanged("A01:RBV", 20) @parameterized.expand(parameterized_list(PID_TEST_VALUES)) diff --git a/utils/emulator_launcher.py b/utils/emulator_launcher.py index 5b3b3a46..815e94cf 100644 --- a/utils/emulator_launcher.py +++ b/utils/emulator_launcher.py @@ -480,6 +480,17 @@ def backdoor_simulate_disconnected_device( finally: self.backdoor_set_on_device(emulator_property, True) + @contextlib.contextmanager + def backdoor_simulate_disconnected_addr(self, emulator_property = "address"): + """ + Simulate device with disconnected ADDR, such as Eurotherm + """ + self.backdoor_run_function_on_device("set_connected", ['0011' , False]) + try: + yield + finally: + self.backdoor_run_function_on_device("set_connected", ['0011' , True]) + class NullEmulatorLauncher(EmulatorLauncher): """ From 2beeacdef913b9368ab1b73d6415580fbaaece04 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Fri, 1 Nov 2024 14:06:46 +0000 Subject: [PATCH 05/21] Add several changes needed to work with new eurotherm emulator design --- common_tests/eurotherm.py | 10 +++++----- tests/eurotherm_eibisynch.py | 12 ++++++------ utils/emulator_launcher.py | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/common_tests/eurotherm.py b/common_tests/eurotherm.py index 8f07e966..296eaf24 100644 --- a/common_tests/eurotherm.py +++ b/common_tests/eurotherm.py @@ -20,7 +20,7 @@ # PIDs cannot be floating-point PID_TEST_VALUES = [-50, 50, 3000] -sensors = ["0011", "0022", "0033", "0044", "0055", "0066"] +sensors = ["01", "02", "03", "04", "05", "06"] PV_sensors = ["A01", "A02", "A03", "A04", "A05", "A06"] @@ -60,15 +60,15 @@ def _setup_lewis_and_channel_access(self): self.ca.assert_that_pv_exists("A01:CAL:SEL", timeout=10) def _reset_device_state(self, sensor=PV_sensors[0]): - for item in sensors: - self._lewis.backdoor_run_function_on_device("set_connected", [item , True]) + for address in sensors: + self._lewis.backdoor_run_function_on_device("set_connected", [address , True]) reset_calibration_file(self.ca, prefix=f"{'A01'}:") intial_temp = 0.0 self._set_setpoint_and_current_temperature(intial_temp) - self._lewis.backdoor_run_function_on_device("set_ramping_on", [sensors[0]]) + self._lewis.backdoor_run_function_on_device("set_ramping_on", [sensors[0], True]) self._lewis.backdoor_run_function_on_device("set_ramp_rate", [sensors[0], 1.0]) self.ca.set_pv_value(f"{sensor}:RAMPON:SP", 0, sleep_after_set=0) @@ -303,7 +303,7 @@ def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN( ) def test_WHEN_disconnected_THEN_in_alarm(self, record): self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE) - with self._lewis.backdoor_simulate_disconnected_device(): + with self._lewis.backdoor_simulate_disconnected_addr(): self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.INVALID, timeout=60) # Assert alarms clear on reconnection with self._get_temperature_setter_wrapper(): diff --git a/tests/eurotherm_eibisynch.py b/tests/eurotherm_eibisynch.py index 8f09a37e..21faf329 100644 --- a/tests/eurotherm_eibisynch.py +++ b/tests/eurotherm_eibisynch.py @@ -45,7 +45,7 @@ class EurothermTests(EurothermBaseTests, unittest.TestCase): def setUp(self): super(EurothermTests, self).setUp() - self._lewis.backdoor_set_on_device("scaling", 1.0 / float(SCALING)) + self._lewis.backdoor_run_function_on_device("set_scaling", ["01", 1.0]) def get_device(self): return DEVICE @@ -115,20 +115,20 @@ def test_GIVEN_None_txt_calibration_file_WHEN_changed_to_C006_txt_calibration_fi def test_GIVEN_simulated_delay_WHEN_temperature_read_from_multiple_sensors_THEN_all_reads_correct( self, ): - self._lewis.backdoor_set_on_device("delay_time", 300 / 1000) + self._lewis.backdoor_run_function_on_device("set_delay_time", ["01", (300 / 1000)]) for temp in range(1, 10): - self._lewis.backdoor_set_on_device("current_temperature", float(temp)) + self._lewis.backdoor_run_function_on_device("set_current_temperature", ["01", float(temp)]) for id in range(1, 7): sensor = f"A{id:02}" self.ca.assert_that_pv_is(f"{sensor}:RBV", float(temp)) - self._lewis.backdoor_set_on_device("delay_time", None) + self._lewis.backdoor_run_function_on_device("set_delay_time", [None, None]) def test_GIVEN_simulated_delay_WHEN_temperature_set_on_multiple_sensors_THEN_all_reads_correct( self, ): - self._lewis.backdoor_set_on_device("delay_time", 300 / 1000) + self._lewis.backdoor_run_function_on_device("set_delay_time", ["01", (300 / 1000)]) for id in range(1, 7): sensor = f"A{id:02}" self._reset_device_state(sensor=sensor) @@ -138,4 +138,4 @@ def test_GIVEN_simulated_delay_WHEN_temperature_set_on_multiple_sensors_THEN_all self._set_setpoint_and_current_temperature(float(temp), sensor=sensor) self.ca.assert_that_pv_is(f"{sensor}:RBV", float(temp), timeout=30.0) - self._lewis.backdoor_set_on_device("delay_time", None) + self._lewis.backdoor_run_function_on_device("set_delay_time", [None, None]) diff --git a/utils/emulator_launcher.py b/utils/emulator_launcher.py index 815e94cf..e8006ea3 100644 --- a/utils/emulator_launcher.py +++ b/utils/emulator_launcher.py @@ -485,11 +485,11 @@ def backdoor_simulate_disconnected_addr(self, emulator_property = "address"): """ Simulate device with disconnected ADDR, such as Eurotherm """ - self.backdoor_run_function_on_device("set_connected", ['0011' , False]) + self.backdoor_run_function_on_device("set_connected", ['01' , False]) try: yield finally: - self.backdoor_run_function_on_device("set_connected", ['0011' , True]) + self.backdoor_run_function_on_device("set_connected", ['01' , True]) class NullEmulatorLauncher(EmulatorLauncher): From b7b11715f1d9e11304e995450a50b347162e8041 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Mon, 4 Nov 2024 13:58:06 +0000 Subject: [PATCH 06/21] Reformat simulated delay tests to work with new emulator structure --- tests/eurotherm_eibisynch.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/tests/eurotherm_eibisynch.py b/tests/eurotherm_eibisynch.py index 21faf329..670de415 100644 --- a/tests/eurotherm_eibisynch.py +++ b/tests/eurotherm_eibisynch.py @@ -116,14 +116,15 @@ def test_GIVEN_simulated_delay_WHEN_temperature_read_from_multiple_sensors_THEN_ self, ): self._lewis.backdoor_run_function_on_device("set_delay_time", ["01", (300 / 1000)]) + for id in range(1, 7): + PV_sensor = f"A{id:02}" + sensor = f"{id:02}" - for temp in range(1, 10): - self._lewis.backdoor_run_function_on_device("set_current_temperature", ["01", float(temp)]) - for id in range(1, 7): - sensor = f"A{id:02}" - self.ca.assert_that_pv_is(f"{sensor}:RBV", float(temp)) + for temp in range(1, 3): + self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensor, float(temp)]) + self.ca.assert_that_pv_is(f"{PV_sensor}:RBV", float(temp)) - self._lewis.backdoor_run_function_on_device("set_delay_time", [None, None]) + self._lewis.backdoor_run_function_on_device("set_delay_time", [None, 0.0]) def test_GIVEN_simulated_delay_WHEN_temperature_set_on_multiple_sensors_THEN_all_reads_correct( self, @@ -131,11 +132,8 @@ def test_GIVEN_simulated_delay_WHEN_temperature_set_on_multiple_sensors_THEN_all self._lewis.backdoor_run_function_on_device("set_delay_time", ["01", (300 / 1000)]) for id in range(1, 7): sensor = f"A{id:02}" - self._reset_device_state(sensor=sensor) - for id in range(1, 7): - sensor = f"A{id:02}" - for temp in range(1, 5): + for temp in range(1, 3): self._set_setpoint_and_current_temperature(float(temp), sensor=sensor) self.ca.assert_that_pv_is(f"{sensor}:RBV", float(temp), timeout=30.0) - self._lewis.backdoor_run_function_on_device("set_delay_time", [None, None]) + self._lewis.backdoor_run_function_on_device("set_delay_time", [None, 0.0]) From 92fa1462fe3c3b445e5390b9a423a04a9623aae0 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Mon, 4 Nov 2024 13:58:54 +0000 Subject: [PATCH 07/21] Change reset_device and set_setpoint_and current temp to only run for specified sensor, unless none is given --- common_tests/eurotherm.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/common_tests/eurotherm.py b/common_tests/eurotherm.py index 296eaf24..1bd7ed9c 100644 --- a/common_tests/eurotherm.py +++ b/common_tests/eurotherm.py @@ -60,9 +60,9 @@ def _setup_lewis_and_channel_access(self): self.ca.assert_that_pv_exists("A01:CAL:SEL", timeout=10) def _reset_device_state(self, sensor=PV_sensors[0]): - for address in sensors: - self._lewis.backdoor_run_function_on_device("set_connected", [address , True]) - reset_calibration_file(self.ca, prefix=f"{'A01'}:") + i = PV_sensors.index(sensor) + self._lewis.backdoor_run_function_on_device("set_connected", [sensors[i], True]) + reset_calibration_file(self.ca, prefix=f"{sensor}:") intial_temp = 0.0 @@ -74,8 +74,8 @@ def _reset_device_state(self, sensor=PV_sensors[0]): self._set_setpoint_and_current_temperature(intial_temp) self.ca.assert_that_pv_is(f"{sensor}:TEMP", intial_temp) - # Ensure the temperature isn't being changed by a ramp any more - self.ca.assert_that_pv_value_is_unchanged(f"{sensor}:TEMP", 5) + #Ensure the temperature isn't being changed by a ramp any more + self.ca.assert_that_pv_value_is_unchanged(f"{sensor}:TEMP", wait=3) def _set_setpoint_and_current_temperature(self, temperature, sensor=PV_sensors[0]): if IOCRegister.uses_rec_sim: @@ -84,9 +84,10 @@ def _set_setpoint_and_current_temperature(self, temperature, sensor=PV_sensors[0 self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP", temperature) self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP:RBV", temperature) else: - self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], temperature]) + i = PV_sensors.index(sensor) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[i], temperature]) self.ca.assert_that_pv_is_number(f"{sensor}:TEMP", temperature, 0.1, timeout=30) - self._lewis.backdoor_run_function_on_device("set_ramp_setpoint_temperature", [sensors[0], temperature]) + self._lewis.backdoor_run_function_on_device("set_ramp_setpoint_temperature", [sensors[i], temperature]) self.ca.assert_that_pv_is_number(f"{sensor}:TEMP:SP:RBV", temperature, 0.1, timeout=30) def test_WHEN_read_rbv_temperature_THEN_rbv_value_is_same_as_backdoor(self): @@ -289,8 +290,8 @@ def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN( self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER.A", temperature) self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER", expected_value_of_under_range_calc_pv) - @parameterized.expand( - [ + def test_WHEN_disconnected_THEN_in_alarm(self): + records = [ "A01:TEMP", "A01:TEMP:SP:RBV", "A01:P", @@ -300,14 +301,15 @@ def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN( "A01:MAX_OUTPUT", "A01:LOWLIM", ] - ) - def test_WHEN_disconnected_THEN_in_alarm(self, record): - self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE) + for record in records: + self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE) with self._lewis.backdoor_simulate_disconnected_addr(): - self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.INVALID, timeout=60) + for record in records: + self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.INVALID, timeout=60) # Assert alarms clear on reconnection with self._get_temperature_setter_wrapper(): - self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30) + for record in records: + self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30) def test_WHEN_eurotherm_missing_THEN_updates_of_PVs_stop(self): with self._lewis.backdoor_simulate_disconnected_addr(): From 2c06ed81f9df3e1fd2978ff6358116e436e8a30f Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Mon, 4 Nov 2024 15:51:14 +0000 Subject: [PATCH 08/21] add sensor list, change backdoor set to backdoor run function --- tests/eurotherm_modbus.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/eurotherm_modbus.py b/tests/eurotherm_modbus.py index 0a0b2d1f..339ebb0b 100644 --- a/tests/eurotherm_modbus.py +++ b/tests/eurotherm_modbus.py @@ -45,11 +45,13 @@ TEST_MODES = [TestModes.DEVSIM] +sensors = ["01", "02", "03", "04", "05", "06"] + class EurothermModbusTests(EurothermBaseTests, unittest.TestCase): def setUp(self): super(EurothermModbusTests, self).setUp() - self._lewis.backdoor_set_on_device("scaling", 1.0 / float(SCALING)) + self._lewis.backdoor_run_function_on_device("set_scaling", [sensors[0], 1.0 / float(SCALING)]) def get_device(self): return DEVICE From df795bcbfe0be9921ab1c001e481cc32750f1199 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Mon, 4 Nov 2024 15:52:46 +0000 Subject: [PATCH 09/21] Change backdoor set to backdoor run function, and add sensors list --- tests/eurotherm_modbus_needlevalve.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/eurotherm_modbus_needlevalve.py b/tests/eurotherm_modbus_needlevalve.py index 0497dbb8..10dd7c76 100644 --- a/tests/eurotherm_modbus_needlevalve.py +++ b/tests/eurotherm_modbus_needlevalve.py @@ -59,11 +59,12 @@ TEST_MODES = [TestModes.DEVSIM] +sensors = ["01", "02", "03", "04", "05", "06"] class EurothermModbusNeedleValveTests(EurothermBaseTests, unittest.TestCase): def setUp(self): super(EurothermModbusNeedleValveTests, self).setUp() - self._lewis.backdoor_set_on_device("scaling", 1.0 / float(SCALING)) + self._lewis.backdoor_run_function_on_device("set_scaling", [sensors[0], 1.0 / float(SCALING)]) with ManagerMode(ChannelAccess()): self.ca.set_pv_value("A01:FLOW_SP_MODE_SELECT:SP", "AUTO") @@ -82,12 +83,12 @@ def _get_temperature_setter_wrapper(self): # READ TESTS @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_using_needle_valve_THEN_flow_exists(self): - self._lewis.backdoor_set_on_device("needlevalve_flow", 5.0) + self._lewis.backdoor_run_function_on_device("set_needlevalve_flow", [sensors[0], 5.0]) self.ca.assert_that_pv_is_number("A01:FLOW", 5.0, tolerance=0, timeout=15) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_using_needle_valve_THEN_valve_dir_exists(self): - self._lewis.backdoor_set_on_device("needlevalve_direction", 1) + self._lewis.backdoor_run_function_on_device("set_needlevalve_direction", [sensors[0], 1]) self.ca.assert_that_pv_is("A01:VALVE_DIR", "OPENING", timeout=15) # WRITE TESTS From 2cb3644fd10d5a95d66bd7c27a48e63149cf1c3d Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Tue, 5 Nov 2024 17:11:13 +0000 Subject: [PATCH 10/21] Add test to check that one sensor can disconnect, and another can still read --- tests/eurotherm_eibisynch.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/eurotherm_eibisynch.py b/tests/eurotherm_eibisynch.py index 670de415..5e33244f 100644 --- a/tests/eurotherm_eibisynch.py +++ b/tests/eurotherm_eibisynch.py @@ -137,3 +137,15 @@ def test_GIVEN_simulated_delay_WHEN_temperature_set_on_multiple_sensors_THEN_all self.ca.assert_that_pv_is(f"{sensor}:RBV", float(temp), timeout=30.0) self._lewis.backdoor_run_function_on_device("set_delay_time", [None, 0.0]) + + def test_GIVEN_multiple_sensors_WHEN_one_sensor_disconnected_THEN_other_sensors_can_still_read(self, ): + # Ensure that A02 is connected + self._lewis.backdoor_run_function_on_device("set_connected", ['02', True]) + + # Disconnect A01 and check PV for A01 is in alarm, and PV for A02 is not: + with self._lewis.backdoor_simulate_disconnected_addr(): + self.ca.assert_that_pv_alarm_is("A01:TEMP", self.ca.Alarms.INVALID, timeout=30) + self.ca.assert_that_pv_alarm_is_not("A02:TEMP", self.ca.Alarms.INVALID, timeout=30) + + # Ensure that after backdoor_run_function, the PV for A01 gets reconnected: + self.ca.assert_that_pv_alarm_is("A01:TEMP", self.ca.Alarms.NONE, timeout=30) \ No newline at end of file From cb3e8581ab9ae6599b59685dfd14faefbbb871d2 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Thu, 7 Nov 2024 08:45:08 +0000 Subject: [PATCH 11/21] made sensors and pv_sensors constants, and import contextmanager, added return type hints to methods to satisfy ruff fixes --- common_tests/eurotherm.py | 61 ++++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/common_tests/eurotherm.py b/common_tests/eurotherm.py index 1bd7ed9c..c14f8580 100644 --- a/common_tests/eurotherm.py +++ b/common_tests/eurotherm.py @@ -4,6 +4,7 @@ import contextlib from parameterized import parameterized +from typing import ContextManager from utils.calibration_utils import reset_calibration_file, use_calibration_file from utils.channel_access import ChannelAccess @@ -20,9 +21,9 @@ # PIDs cannot be floating-point PID_TEST_VALUES = [-50, 50, 3000] -sensors = ["01", "02", "03", "04", "05", "06"] +SENSORS = ["01", "02", "03", "04", "05", "06"] -PV_sensors = ["A01", "A02", "A03", "A04", "A05", "A06"] +PV_SENSORS = ["A01", "A02", "A03", "A04", "A05", "A06"] class EurothermBaseTests(metaclass=abc.ABCMeta): """ @@ -30,22 +31,22 @@ class EurothermBaseTests(metaclass=abc.ABCMeta): """ @abc.abstractmethod - def get_device(self): + def get_device(self) -> str: pass @abc.abstractmethod - def get_emulator_device(self): + def get_emulator_device(self) -> str: pass @abc.abstractmethod - def _get_temperature_setter_wrapper(self): + def _get_temperature_setter_wrapper(self) -> ContextManager: pass @abc.abstractmethod - def get_scaling(self): + def get_scaling(self) -> str: pass - def get_prefix(self): + def get_prefix(self) -> str: return "{}:A01".format(self.get_device()) def setUp(self): @@ -59,17 +60,17 @@ def _setup_lewis_and_channel_access(self): self.ca.assert_that_pv_exists("A01:RBV", timeout=30) self.ca.assert_that_pv_exists("A01:CAL:SEL", timeout=10) - def _reset_device_state(self, sensor=PV_sensors[0]): - i = PV_sensors.index(sensor) - self._lewis.backdoor_run_function_on_device("set_connected", [sensors[i], True]) + def _reset_device_state(self, sensor=PV_SENSORS[0]): + i = PV_SENSORS.index(sensor) + self._lewis.backdoor_run_function_on_device("set_connected", [SENSORS[i], True]) reset_calibration_file(self.ca, prefix=f"{sensor}:") intial_temp = 0.0 self._set_setpoint_and_current_temperature(intial_temp) - self._lewis.backdoor_run_function_on_device("set_ramping_on", [sensors[0], True]) - self._lewis.backdoor_run_function_on_device("set_ramp_rate", [sensors[0], 1.0]) + self._lewis.backdoor_run_function_on_device("set_ramping_on", [SENSORS[0], True]) + self._lewis.backdoor_run_function_on_device("set_ramp_rate", [SENSORS[0], 1.0]) self.ca.set_pv_value(f"{sensor}:RAMPON:SP", 0, sleep_after_set=0) self._set_setpoint_and_current_temperature(intial_temp) @@ -77,17 +78,17 @@ def _reset_device_state(self, sensor=PV_sensors[0]): #Ensure the temperature isn't being changed by a ramp any more self.ca.assert_that_pv_value_is_unchanged(f"{sensor}:TEMP", wait=3) - def _set_setpoint_and_current_temperature(self, temperature, sensor=PV_sensors[0]): + def _set_setpoint_and_current_temperature(self, temperature, sensor=PV_SENSORS[0]): if IOCRegister.uses_rec_sim: self.ca.set_pv_value(f"{sensor}:SIM:TEMP:SP", temperature) self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP", temperature) self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP", temperature) self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP:RBV", temperature) else: - i = PV_sensors.index(sensor) - self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[i], temperature]) + i = PV_SENSORS.index(sensor) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[i], temperature]) self.ca.assert_that_pv_is_number(f"{sensor}:TEMP", temperature, 0.1, timeout=30) - self._lewis.backdoor_run_function_on_device("set_ramp_setpoint_temperature", [sensors[i], temperature]) + self._lewis.backdoor_run_function_on_device("set_ramp_setpoint_temperature", [SENSORS[i], temperature]) self.ca.assert_that_pv_is_number(f"{sensor}:TEMP:SP:RBV", temperature, 0.1, timeout=30) def test_WHEN_read_rbv_temperature_THEN_rbv_value_is_same_as_backdoor(self): @@ -127,14 +128,14 @@ def test_WHEN_set_ramp_rate_in_K_per_min_THEN_current_temperature_reaches_set_po ) # Lower tolerance will be too tight given scan rate def test_WHEN_sensor_disconnected_THEN_ramp_setting_is_disabled(self): - self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], SENSOR_DISCONNECTED_VALUE]) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]) self.ca.assert_that_pv_is_number("A01:RAMPON:SP.DISP", 1) def test_GIVEN_sensor_disconnected_WHEN_sensor_reconnected_THEN_ramp_setting_is_enabled(self): - self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], SENSOR_DISCONNECTED_VALUE]) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]) - self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], 0]) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], 0]) self.ca.assert_that_pv_is_number("A01:RAMPON:SP.DISP", 0) @@ -143,7 +144,7 @@ def test_GIVEN_ramp_was_off_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached ): self.ca.set_pv_value("A01:RAMPON:SP", 0) - self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], SENSOR_DISCONNECTED_VALUE]) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]) self.ca.assert_that_pv_is("A01:RAMPON", "OFF") self.ca.assert_that_pv_is("A01:RAMPON:CACHE", "OFF") @@ -153,7 +154,7 @@ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached_ ): self.ca.set_pv_value("A01:RAMPON:SP", 1) - self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], SENSOR_DISCONNECTED_VALUE]) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]) self.ca.assert_that_pv_is("A01:RAMPON", "OFF") self.ca.assert_that_pv_is("A01:RAMPON:CACHE", "ON") @@ -161,9 +162,9 @@ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached_ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_and_reconnected_THEN_ramp_is_on(self): self.ca.set_pv_value("A01:RAMPON:SP", 1) - self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], SENSOR_DISCONNECTED_VALUE]) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]) self.ca.assert_that_pv_is("A01:RAMPON", "OFF") - self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensors[0], 0]) + self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], 0]) self.ca.assert_that_pv_is("A01:RAMPON", "ON") @@ -318,31 +319,31 @@ def test_WHEN_eurotherm_missing_THEN_updates_of_PVs_stop(self): @parameterized.expand(parameterized_list(PID_TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_p_set_via_backdoor_THEN_p_updates(self, _, val): - self._lewis.backdoor_run_function_on_device("set_p", [sensors[0], val]) + self._lewis.backdoor_run_function_on_device("set_p", [SENSORS[0], val]) self.ca.assert_that_pv_is_number("A01:P", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list(PID_TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_i_set_via_backdoor_THEN_i_updates(self, _, val): - self._lewis.backdoor_run_function_on_device("set_i", [sensors[0], val]) + self._lewis.backdoor_run_function_on_device("set_i", [SENSORS[0], val]) self.ca.assert_that_pv_is_number("A01:I", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list(PID_TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_d_set_via_backdoor_THEN_d_updates(self, _, val): - self._lewis.backdoor_run_function_on_device("set_d", [sensors[0], val]) + self._lewis.backdoor_run_function_on_device("set_d", [SENSORS[0], val]) self.ca.assert_that_pv_is_number("A01:D", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list(TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_output_set_via_backdoor_THEN_output_updates(self, _, val): - self._lewis.backdoor_run_function_on_device("set_output", [sensors[0], val]) + self._lewis.backdoor_run_function_on_device("set_output", [SENSORS[0], val]) self.ca.assert_that_pv_is_number("A01:OUTPUT", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list(TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_output_set_via_backdoor_THEN_output_updates(self, _, val): - self._lewis.backdoor_run_function_on_device("set_max_output", [sensors[0], val]) + self._lewis.backdoor_run_function_on_device("set_max_output", [SENSORS[0], val]) self.ca.assert_that_pv_is_number("A01:MAX_OUTPUT", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list([0, 100, 3276])) @@ -354,11 +355,11 @@ def test_WHEN_output_rate_set_THEN_output_rate_updates(self, _, val): @parameterized.expand(parameterized_list(TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_high_limit_set_via_backdoor_THEN_high_lim_updates(self, _, val): - self._lewis.backdoor_run_function_on_device("set_high_lim", [sensors[0], val]) + self._lewis.backdoor_run_function_on_device("set_high_lim", [SENSORS[0], val]) self.ca.assert_that_pv_is_number("A01:HILIM", val, tolerance=0.05, timeout=15) @parameterized.expand(parameterized_list(TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_low_limit_set_via_backdoor_THEN_low_lim_updates(self, _, val): - self._lewis.backdoor_run_function_on_device("set_low_lim", [sensors[0], val]) + self._lewis.backdoor_run_function_on_device("set_low_lim", [SENSORS[0], val]) self.ca.assert_that_pv_is_number("A01:LOWLIM", val, tolerance=0.05, timeout=15) From b8f2e2d739241b8d383728d29d0f7abba39e6503 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Thu, 7 Nov 2024 08:46:11 +0000 Subject: [PATCH 12/21] various fixes to satisfy ruff and pyright --- tests/eurotherm_eibisynch.py | 48 +++++++++++++++++------------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/tests/eurotherm_eibisynch.py b/tests/eurotherm_eibisynch.py index 5e33244f..9f98cd95 100644 --- a/tests/eurotherm_eibisynch.py +++ b/tests/eurotherm_eibisynch.py @@ -2,6 +2,7 @@ import unittest from parameterized import parameterized +from utils.emulator_launcher import LewisLauncher from common_tests.eurotherm import ( NONE_TXT_CALIBRATION_MAX_TEMPERATURE, @@ -15,7 +16,7 @@ DEVICE = "EUROTHRM_01" EMULATOR = "eurotherm" SCALING = "1.0" - +TEST_MODES = [TestModes.DEVSIM] IOCS = [ { "name": DEVICE, @@ -39,13 +40,11 @@ ] -TEST_MODES = [TestModes.DEVSIM] - - class EurothermTests(EurothermBaseTests, unittest.TestCase): def setUp(self): super(EurothermTests, self).setUp() self._lewis.backdoor_run_function_on_device("set_scaling", ["01", 1.0]) + self._lewis:LewisLauncher def get_device(self): return DEVICE @@ -67,8 +66,8 @@ def _get_temperature_setter_wrapper(self): ] ) def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN( - self, _, temperature, expected_value_of_over_range_calc_pv - ): + self, _: str, temperature: float, expected_value_of_over_range_calc_pv: float + ) -> None: """ Note: this test can only run on BISYNCH eurotherms, modbus max temperature is 6553.5 but ramp file goes up to 10,000 and this test attempts to check this behaviour @@ -87,15 +86,16 @@ def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN( self.ca.assert_that_pv_is("A01:TEMP:RANGE:OVER.A", temperature) self.ca.assert_that_pv_is("A01:TEMP:RANGE:OVER", expected_value_of_over_range_calc_pv) - def test_GIVEN_None_txt_calibration_file_WHEN_changed_to_C006_txt_calibration_file_THEN_the_calibration_limits_change( + def test_GIVEN_None_txt_calib_file_WHEN_changed_to_C006_calib_file_THEN_the_calib_limits_change( self, - ): + ) -> None: """ - Note: this test can only run on BISYNCH eurotherms, modbus max temperature is 6553.5 but ramp file goes up + Note: this test can only run on BISYNCH eurotherms, + modbus max temperature is 6553.5 but ramp file goes up to 10,000 and this test attempts to check this behaviour """ - C006_CALIBRATION_FILE_MAX = 330.26135292267900000000 - C006_CALIBRATION_FILE_MIN = 1.20927230303971000000 + c006_calibration_file_max = 330.26135292267900000000 + c006_calibration_file_min = 1.20927230303971000000 # Arrange self._assert_using_mock_table_location() @@ -109,26 +109,24 @@ def test_GIVEN_None_txt_calibration_file_WHEN_changed_to_C006_txt_calibration_fi # Act: with use_calibration_file(self.ca, "C006.txt", prefix="A01:"): # Assert - self.ca.assert_that_pv_is("A01:TEMP:RANGE:OVER.B", C006_CALIBRATION_FILE_MAX) - self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER.B", C006_CALIBRATION_FILE_MIN) + self.ca.assert_that_pv_is("A01:TEMP:RANGE:OVER.B", c006_calibration_file_max) + self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER.B", c006_calibration_file_min) - def test_GIVEN_simulated_delay_WHEN_temperature_read_from_multiple_sensors_THEN_all_reads_correct( - self, - ): + def test_GIVEN_sim_delay_WHEN_temp_read_from_many_sensors_THEN_all_reads_correct(self) -> None: self._lewis.backdoor_run_function_on_device("set_delay_time", ["01", (300 / 1000)]) for id in range(1, 7): - PV_sensor = f"A{id:02}" + pv_sensor = f"A{id:02}" sensor = f"{id:02}" for temp in range(1, 3): - self._lewis.backdoor_run_function_on_device("set_current_temperature", [sensor, float(temp)]) - self.ca.assert_that_pv_is(f"{PV_sensor}:RBV", float(temp)) + self._lewis.backdoor_run_function_on_device( + "set_current_temperature", [sensor, float(temp)] + ) + self.ca.assert_that_pv_is(f"{pv_sensor}:RBV", float(temp)) self._lewis.backdoor_run_function_on_device("set_delay_time", [None, 0.0]) - def test_GIVEN_simulated_delay_WHEN_temperature_set_on_multiple_sensors_THEN_all_reads_correct( - self, - ): + def test_GIVEN_sim_delay_WHEN_temp_set_on_multiple_sensors_THEN_all_reads_correct(self) -> None: self._lewis.backdoor_run_function_on_device("set_delay_time", ["01", (300 / 1000)]) for id in range(1, 7): sensor = f"A{id:02}" @@ -138,9 +136,9 @@ def test_GIVEN_simulated_delay_WHEN_temperature_set_on_multiple_sensors_THEN_all self._lewis.backdoor_run_function_on_device("set_delay_time", [None, 0.0]) - def test_GIVEN_multiple_sensors_WHEN_one_sensor_disconnected_THEN_other_sensors_can_still_read(self, ): + def test_GIVEN_many_sensors_WHEN_one_sensor_disconnect_THEN_other_sensors_read_ok(self) -> None: # Ensure that A02 is connected - self._lewis.backdoor_run_function_on_device("set_connected", ['02', True]) + self._lewis.backdoor_run_function_on_device("set_connected", ["02", True]) # Disconnect A01 and check PV for A01 is in alarm, and PV for A02 is not: with self._lewis.backdoor_simulate_disconnected_addr(): @@ -148,4 +146,4 @@ def test_GIVEN_multiple_sensors_WHEN_one_sensor_disconnected_THEN_other_sensors_ self.ca.assert_that_pv_alarm_is_not("A02:TEMP", self.ca.Alarms.INVALID, timeout=30) # Ensure that after backdoor_run_function, the PV for A01 gets reconnected: - self.ca.assert_that_pv_alarm_is("A01:TEMP", self.ca.Alarms.NONE, timeout=30) \ No newline at end of file + self.ca.assert_that_pv_alarm_is("A01:TEMP", self.ca.Alarms.NONE, timeout=30) From 7af68e8d6e90a77d69d9e4ffb4a9caa0b3a9abd3 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Thu, 7 Nov 2024 09:00:48 +0000 Subject: [PATCH 13/21] Change name of method parameter to not have conflicts with eibisynch method of same name --- common_tests/eurotherm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common_tests/eurotherm.py b/common_tests/eurotherm.py index c14f8580..bb1162ee 100644 --- a/common_tests/eurotherm.py +++ b/common_tests/eurotherm.py @@ -272,7 +272,7 @@ def test_WHEN_config_file_and_temperature_unit_changed_THEN_then_ramp_rate_unit_ ] ) def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN( - self, _, temperature, expected_value_of_under_range_calc_pv + self, _, temperature, exp_val ): # Arrange @@ -289,7 +289,7 @@ def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN( # Assert self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER.A", temperature) - self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER", expected_value_of_under_range_calc_pv) + self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER", expected_value=exp_val) def test_WHEN_disconnected_THEN_in_alarm(self): records = [ From ec53a6525ad204c94dd3c58f926f3e44692ddca1 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Thu, 7 Nov 2024 09:01:48 +0000 Subject: [PATCH 14/21] Change name of method parameter to not conflict with method of same name in eurotherm tests --- tests/eurotherm_eibisynch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/eurotherm_eibisynch.py b/tests/eurotherm_eibisynch.py index 9f98cd95..2b21195f 100644 --- a/tests/eurotherm_eibisynch.py +++ b/tests/eurotherm_eibisynch.py @@ -66,7 +66,7 @@ def _get_temperature_setter_wrapper(self): ] ) def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN( - self, _: str, temperature: float, expected_value_of_over_range_calc_pv: float + self, _: str, temperature: float, exp_val: float ) -> None: """ Note: this test can only run on BISYNCH eurotherms, modbus max temperature is 6553.5 but ramp file goes up @@ -84,7 +84,7 @@ def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN( # Assert self.ca.assert_that_pv_is("A01:TEMP:RANGE:OVER.A", temperature) - self.ca.assert_that_pv_is("A01:TEMP:RANGE:OVER", expected_value_of_over_range_calc_pv) + self.ca.assert_that_pv_is("A01:TEMP:RANGE:OVER", expected_value=exp_val) def test_GIVEN_None_txt_calib_file_WHEN_changed_to_C006_calib_file_THEN_the_calib_limits_change( self, From 0dba4afc388ba8557ea58f3a6e89bf3f0ef88386 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Thu, 7 Nov 2024 09:02:37 +0000 Subject: [PATCH 15/21] Format fixes for ruff and pyright, add LewisLauncher to setUp to fix ruff error --- tests/eurotherm_modbus.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/eurotherm_modbus.py b/tests/eurotherm_modbus.py index 339ebb0b..385ebc9d 100644 --- a/tests/eurotherm_modbus.py +++ b/tests/eurotherm_modbus.py @@ -4,6 +4,7 @@ from parameterized import parameterized from common_tests.eurotherm import PID_TEST_VALUES, EurothermBaseTests +from utils.emulator_launcher import LewisLauncher from utils.ioc_launcher import ProcServLauncher, get_default_ioc_dir from utils.test_modes import TestModes from utils.testing import parameterized_list @@ -19,7 +20,6 @@ "ioc_launcher_class": ProcServLauncher, "macros": { "COMMS_MODE": "modbus", - "NEEDLE_VALVE": "no", "ADDR_1": "01", "ADDR_2": "", "ADDR_3": "", @@ -51,7 +51,10 @@ class EurothermModbusTests(EurothermBaseTests, unittest.TestCase): def setUp(self): super(EurothermModbusTests, self).setUp() - self._lewis.backdoor_run_function_on_device("set_scaling", [sensors[0], 1.0 / float(SCALING)]) + self._lewis.backdoor_run_function_on_device( + "set_scaling", [sensors[0], 1.0 / float(SCALING)] + ) + self._lewis:LewisLauncher def get_device(self): return DEVICE From 0530b4ba0787c292eee8ba86d288e6cb9ff50039 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Thu, 7 Nov 2024 09:08:12 +0000 Subject: [PATCH 16/21] Made sensors constant, some file reformatting for ruff, add LewisLauncher to setUp --- tests/eurotherm_modbus_needlevalve.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/eurotherm_modbus_needlevalve.py b/tests/eurotherm_modbus_needlevalve.py index 10dd7c76..61ab88f5 100644 --- a/tests/eurotherm_modbus_needlevalve.py +++ b/tests/eurotherm_modbus_needlevalve.py @@ -5,6 +5,7 @@ from common_tests.eurotherm import EurothermBaseTests from utils.channel_access import ChannelAccess +from utils.emulator_launcher import LewisLauncher from utils.ioc_launcher import ProcServLauncher, get_default_ioc_dir from utils.test_modes import TestModes from utils.testing import ManagerMode, parameterized_list, skip_if_recsim @@ -59,14 +60,18 @@ TEST_MODES = [TestModes.DEVSIM] -sensors = ["01", "02", "03", "04", "05", "06"] +SENSORS = ["01", "02", "03", "04", "05", "06"] + class EurothermModbusNeedleValveTests(EurothermBaseTests, unittest.TestCase): def setUp(self): super(EurothermModbusNeedleValveTests, self).setUp() - self._lewis.backdoor_run_function_on_device("set_scaling", [sensors[0], 1.0 / float(SCALING)]) + self._lewis.backdoor_run_function_on_device( + "set_scaling", [SENSORS[0], 1.0 / float(SCALING)] + ) with ManagerMode(ChannelAccess()): self.ca.set_pv_value("A01:FLOW_SP_MODE_SELECT:SP", "AUTO") + self._lewis:LewisLauncher def get_device(self): return DEVICE @@ -83,12 +88,12 @@ def _get_temperature_setter_wrapper(self): # READ TESTS @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_using_needle_valve_THEN_flow_exists(self): - self._lewis.backdoor_run_function_on_device("set_needlevalve_flow", [sensors[0], 5.0]) + self._lewis.backdoor_run_function_on_device("set_needlevalve_flow", [SENSORS[0], 5.0]) self.ca.assert_that_pv_is_number("A01:FLOW", 5.0, tolerance=0, timeout=15) @skip_if_recsim("Backdoor not available in recsim") def test_WHEN_using_needle_valve_THEN_valve_dir_exists(self): - self._lewis.backdoor_run_function_on_device("set_needlevalve_direction", [sensors[0], 1]) + self._lewis.backdoor_run_function_on_device("set_needlevalve_direction", [SENSORS[0], 1]) self.ca.assert_that_pv_is("A01:VALVE_DIR", "OPENING", timeout=15) # WRITE TESTS From fad7212ab53f6036faedf62be1ca61c7735792e0 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Thu, 7 Nov 2024 16:13:07 +0000 Subject: [PATCH 17/21] set_delay_time now no longer takes an address --- tests/eurotherm_eibisynch.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/eurotherm_eibisynch.py b/tests/eurotherm_eibisynch.py index 2b21195f..0dd23ebf 100644 --- a/tests/eurotherm_eibisynch.py +++ b/tests/eurotherm_eibisynch.py @@ -113,7 +113,7 @@ def test_GIVEN_None_txt_calib_file_WHEN_changed_to_C006_calib_file_THEN_the_cali self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER.B", c006_calibration_file_min) def test_GIVEN_sim_delay_WHEN_temp_read_from_many_sensors_THEN_all_reads_correct(self) -> None: - self._lewis.backdoor_run_function_on_device("set_delay_time", ["01", (300 / 1000)]) + self._lewis.backdoor_run_function_on_device("set_delay_time", [(300 / 1000)]) for id in range(1, 7): pv_sensor = f"A{id:02}" sensor = f"{id:02}" @@ -127,14 +127,14 @@ def test_GIVEN_sim_delay_WHEN_temp_read_from_many_sensors_THEN_all_reads_correct self._lewis.backdoor_run_function_on_device("set_delay_time", [None, 0.0]) def test_GIVEN_sim_delay_WHEN_temp_set_on_multiple_sensors_THEN_all_reads_correct(self) -> None: - self._lewis.backdoor_run_function_on_device("set_delay_time", ["01", (300 / 1000)]) + self._lewis.backdoor_run_function_on_device("set_delay_time", [(300 / 1000)]) for id in range(1, 7): sensor = f"A{id:02}" for temp in range(1, 3): self._set_setpoint_and_current_temperature(float(temp), sensor=sensor) self.ca.assert_that_pv_is(f"{sensor}:RBV", float(temp), timeout=30.0) - self._lewis.backdoor_run_function_on_device("set_delay_time", [None, 0.0]) + self._lewis.backdoor_run_function_on_device("set_delay_time", [0.0]) def test_GIVEN_many_sensors_WHEN_one_sensor_disconnect_THEN_other_sensors_read_ok(self) -> None: # Ensure that A02 is connected From 7e31bf26d785f3ff3a32ecc6ae64d1e5a512f773 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Thu, 7 Nov 2024 16:16:01 +0000 Subject: [PATCH 18/21] missed on address --- tests/eurotherm_eibisynch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/eurotherm_eibisynch.py b/tests/eurotherm_eibisynch.py index 0dd23ebf..07ba1abc 100644 --- a/tests/eurotherm_eibisynch.py +++ b/tests/eurotherm_eibisynch.py @@ -124,7 +124,7 @@ def test_GIVEN_sim_delay_WHEN_temp_read_from_many_sensors_THEN_all_reads_correct ) self.ca.assert_that_pv_is(f"{pv_sensor}:RBV", float(temp)) - self._lewis.backdoor_run_function_on_device("set_delay_time", [None, 0.0]) + self._lewis.backdoor_run_function_on_device("set_delay_time", [0.0]) def test_GIVEN_sim_delay_WHEN_temp_set_on_multiple_sensors_THEN_all_reads_correct(self) -> None: self._lewis.backdoor_run_function_on_device("set_delay_time", [(300 / 1000)]) From 5cb841b707983ff24c3776ba51ce1fee9df0e858 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Mon, 11 Nov 2024 10:27:17 +0000 Subject: [PATCH 19/21] assert_that_pv_is_not_number's tolerance should default to float 0.0 --- utils/channel_access.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/channel_access.py b/utils/channel_access.py index 019201c4..3d2ec84b 100644 --- a/utils/channel_access.py +++ b/utils/channel_access.py @@ -433,7 +433,7 @@ def assert_that_pv_is_number( pv_value_source=pv_value_source, ) - def assert_that_pv_is_not_number(self, pv, restricted, tolerance=0, timeout=None): + def assert_that_pv_is_not_number(self, pv, restricted, tolerance=0.0, timeout=None): """ Assert that the pv is at least tolerance from the restricted value within the timeout From 145f57cb009dcea20de0eb937f96b9377fd24d90 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Mon, 11 Nov 2024 11:44:38 +0000 Subject: [PATCH 20/21] Changes for ruff/pyright errors --- common_tests/eurotherm.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/common_tests/eurotherm.py b/common_tests/eurotherm.py index bb1162ee..a335b2a0 100644 --- a/common_tests/eurotherm.py +++ b/common_tests/eurotherm.py @@ -1,13 +1,15 @@ import abc import os import time -import contextlib +import typing +import unittest +from typing import ContextManager from parameterized import parameterized -from typing import ContextManager from utils.calibration_utils import reset_calibration_file, use_calibration_file from utils.channel_access import ChannelAccess +from utils.emulator_launcher import LewisLauncher from utils.ioc_launcher import EPICS_TOP, IOCRegister from utils.testing import get_running_lewis_and_ioc, parameterized_list, skip_if_recsim @@ -25,7 +27,10 @@ PV_SENSORS = ["A01", "A02", "A03", "A04", "A05", "A06"] -class EurothermBaseTests(metaclass=abc.ABCMeta): +# This class is only valid for classes which also derive from unittest.TestCase, +# and we can't derive from unittest.TestCase at runtime, because +# unittest would try to execute them as tests +class EurothermBaseTests(unittest.TestCase if typing.TYPE_CHECKING else object, metaclass=abc.ABCMeta): """ Tests for the Eurotherm temperature controller. """ @@ -53,9 +58,10 @@ def setUp(self): self._setup_lewis_and_channel_access() self._reset_device_state() self.ca_no_prefix = ChannelAccess() + self._lewis:LewisLauncher def _setup_lewis_and_channel_access(self): - self._lewis, self._ioc = get_running_lewis_and_ioc("eurotherm", "EUROTHRM_01") + self._lewis, self._ioc = get_running_lewis_and_ioc("eurotherm", "EUROTHRM_01") # type:ignore self.ca = ChannelAccess(device_prefix="EUROTHRM_01", default_wait_time=0) self.ca.assert_that_pv_exists("A01:RBV", timeout=30) self.ca.assert_that_pv_exists("A01:CAL:SEL", timeout=10) @@ -121,7 +127,7 @@ def test_WHEN_set_ramp_rate_in_K_per_min_THEN_current_temperature_reaches_set_po "A01:TEMP:SP:RBV", setpoint_temperature, tolerance=0.1, timeout=60 ) end = time.time() - self.assertAlmostEquals( + self.assertAlmostEqual( end - start, 60.0 * (setpoint_temperature - start_temperature) / ramp_rate, delta=0.1 * (end - start), @@ -342,7 +348,7 @@ def test_WHEN_output_set_via_backdoor_THEN_output_updates(self, _, val): @parameterized.expand(parameterized_list(TEST_VALUES)) @skip_if_recsim("Backdoor not available in recsim") - def test_WHEN_output_set_via_backdoor_THEN_output_updates(self, _, val): + def test_WHEN_max_output_set_via_backdoor_THEN_output_updates(self, _, val): self._lewis.backdoor_run_function_on_device("set_max_output", [SENSORS[0], val]) self.ca.assert_that_pv_is_number("A01:MAX_OUTPUT", val, tolerance=0.05, timeout=15) From ba363378c40f92462fc6427f2e0e35d611408cf8 Mon Sep 17 00:00:00 2001 From: esmithExperimentControls Date: Mon, 11 Nov 2024 12:11:07 +0000 Subject: [PATCH 21/21] Ruff formatting --- common_tests/eurotherm.py | 45 ++++++++++++++++++--------- tests/eurotherm_eibisynch.py | 4 +-- tests/eurotherm_modbus.py | 2 +- tests/eurotherm_modbus_needlevalve.py | 2 +- utils/emulator_launcher.py | 6 ++-- 5 files changed, 38 insertions(+), 21 deletions(-) diff --git a/common_tests/eurotherm.py b/common_tests/eurotherm.py index a335b2a0..d5037199 100644 --- a/common_tests/eurotherm.py +++ b/common_tests/eurotherm.py @@ -27,10 +27,13 @@ PV_SENSORS = ["A01", "A02", "A03", "A04", "A05", "A06"] -# This class is only valid for classes which also derive from unittest.TestCase, -# and we can't derive from unittest.TestCase at runtime, because + +# This class is only valid for classes which also derive from unittest.TestCase, +# and we can't derive from unittest.TestCase at runtime, because # unittest would try to execute them as tests -class EurothermBaseTests(unittest.TestCase if typing.TYPE_CHECKING else object, metaclass=abc.ABCMeta): +class EurothermBaseTests( + unittest.TestCase if typing.TYPE_CHECKING else object, metaclass=abc.ABCMeta +): """ Tests for the Eurotherm temperature controller. """ @@ -58,10 +61,10 @@ def setUp(self): self._setup_lewis_and_channel_access() self._reset_device_state() self.ca_no_prefix = ChannelAccess() - self._lewis:LewisLauncher + self._lewis: LewisLauncher def _setup_lewis_and_channel_access(self): - self._lewis, self._ioc = get_running_lewis_and_ioc("eurotherm", "EUROTHRM_01") # type:ignore + self._lewis, self._ioc = get_running_lewis_and_ioc("eurotherm", "EUROTHRM_01") # type:ignore self.ca = ChannelAccess(device_prefix="EUROTHRM_01", default_wait_time=0) self.ca.assert_that_pv_exists("A01:RBV", timeout=30) self.ca.assert_that_pv_exists("A01:CAL:SEL", timeout=10) @@ -81,7 +84,7 @@ def _reset_device_state(self, sensor=PV_SENSORS[0]): self._set_setpoint_and_current_temperature(intial_temp) self.ca.assert_that_pv_is(f"{sensor}:TEMP", intial_temp) - #Ensure the temperature isn't being changed by a ramp any more + # Ensure the temperature isn't being changed by a ramp any more self.ca.assert_that_pv_value_is_unchanged(f"{sensor}:TEMP", wait=3) def _set_setpoint_and_current_temperature(self, temperature, sensor=PV_SENSORS[0]): @@ -92,9 +95,13 @@ def _set_setpoint_and_current_temperature(self, temperature, sensor=PV_SENSORS[0 self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP:RBV", temperature) else: i = PV_SENSORS.index(sensor) - self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[i], temperature]) + self._lewis.backdoor_run_function_on_device( + "set_current_temperature", [SENSORS[i], temperature] + ) self.ca.assert_that_pv_is_number(f"{sensor}:TEMP", temperature, 0.1, timeout=30) - self._lewis.backdoor_run_function_on_device("set_ramp_setpoint_temperature", [SENSORS[i], temperature]) + self._lewis.backdoor_run_function_on_device( + "set_ramp_setpoint_temperature", [SENSORS[i], temperature] + ) self.ca.assert_that_pv_is_number(f"{sensor}:TEMP:SP:RBV", temperature, 0.1, timeout=30) def test_WHEN_read_rbv_temperature_THEN_rbv_value_is_same_as_backdoor(self): @@ -134,12 +141,16 @@ def test_WHEN_set_ramp_rate_in_K_per_min_THEN_current_temperature_reaches_set_po ) # Lower tolerance will be too tight given scan rate def test_WHEN_sensor_disconnected_THEN_ramp_setting_is_disabled(self): - self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]) + self._lewis.backdoor_run_function_on_device( + "set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE] + ) self.ca.assert_that_pv_is_number("A01:RAMPON:SP.DISP", 1) def test_GIVEN_sensor_disconnected_WHEN_sensor_reconnected_THEN_ramp_setting_is_enabled(self): - self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]) + self._lewis.backdoor_run_function_on_device( + "set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE] + ) self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], 0]) @@ -150,7 +161,9 @@ def test_GIVEN_ramp_was_off_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached ): self.ca.set_pv_value("A01:RAMPON:SP", 0) - self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]) + self._lewis.backdoor_run_function_on_device( + "set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE] + ) self.ca.assert_that_pv_is("A01:RAMPON", "OFF") self.ca.assert_that_pv_is("A01:RAMPON:CACHE", "OFF") @@ -160,7 +173,9 @@ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached_ ): self.ca.set_pv_value("A01:RAMPON:SP", 1) - self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]) + self._lewis.backdoor_run_function_on_device( + "set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE] + ) self.ca.assert_that_pv_is("A01:RAMPON", "OFF") self.ca.assert_that_pv_is("A01:RAMPON:CACHE", "ON") @@ -168,7 +183,9 @@ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached_ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_and_reconnected_THEN_ramp_is_on(self): self.ca.set_pv_value("A01:RAMPON:SP", 1) - self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]) + self._lewis.backdoor_run_function_on_device( + "set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE] + ) self.ca.assert_that_pv_is("A01:RAMPON", "OFF") self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], 0]) @@ -317,7 +334,7 @@ def test_WHEN_disconnected_THEN_in_alarm(self): with self._get_temperature_setter_wrapper(): for record in records: self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30) - + def test_WHEN_eurotherm_missing_THEN_updates_of_PVs_stop(self): with self._lewis.backdoor_simulate_disconnected_addr(): self.ca.assert_that_pv_value_is_unchanged("A01:RBV", 20) diff --git a/tests/eurotherm_eibisynch.py b/tests/eurotherm_eibisynch.py index 07ba1abc..ab1fb334 100644 --- a/tests/eurotherm_eibisynch.py +++ b/tests/eurotherm_eibisynch.py @@ -2,7 +2,6 @@ import unittest from parameterized import parameterized -from utils.emulator_launcher import LewisLauncher from common_tests.eurotherm import ( NONE_TXT_CALIBRATION_MAX_TEMPERATURE, @@ -10,6 +9,7 @@ EurothermBaseTests, ) from utils.calibration_utils import use_calibration_file +from utils.emulator_launcher import LewisLauncher from utils.ioc_launcher import ProcServLauncher, get_default_ioc_dir from utils.test_modes import TestModes @@ -44,7 +44,7 @@ class EurothermTests(EurothermBaseTests, unittest.TestCase): def setUp(self): super(EurothermTests, self).setUp() self._lewis.backdoor_run_function_on_device("set_scaling", ["01", 1.0]) - self._lewis:LewisLauncher + self._lewis: LewisLauncher def get_device(self): return DEVICE diff --git a/tests/eurotherm_modbus.py b/tests/eurotherm_modbus.py index 385ebc9d..0f54a031 100644 --- a/tests/eurotherm_modbus.py +++ b/tests/eurotherm_modbus.py @@ -54,7 +54,7 @@ def setUp(self): self._lewis.backdoor_run_function_on_device( "set_scaling", [sensors[0], 1.0 / float(SCALING)] ) - self._lewis:LewisLauncher + self._lewis: LewisLauncher def get_device(self): return DEVICE diff --git a/tests/eurotherm_modbus_needlevalve.py b/tests/eurotherm_modbus_needlevalve.py index 61ab88f5..a32afe4f 100644 --- a/tests/eurotherm_modbus_needlevalve.py +++ b/tests/eurotherm_modbus_needlevalve.py @@ -71,7 +71,7 @@ def setUp(self): ) with ManagerMode(ChannelAccess()): self.ca.set_pv_value("A01:FLOW_SP_MODE_SELECT:SP", "AUTO") - self._lewis:LewisLauncher + self._lewis: LewisLauncher def get_device(self): return DEVICE diff --git a/utils/emulator_launcher.py b/utils/emulator_launcher.py index e8006ea3..95eda343 100644 --- a/utils/emulator_launcher.py +++ b/utils/emulator_launcher.py @@ -481,15 +481,15 @@ def backdoor_simulate_disconnected_device( self.backdoor_set_on_device(emulator_property, True) @contextlib.contextmanager - def backdoor_simulate_disconnected_addr(self, emulator_property = "address"): + def backdoor_simulate_disconnected_addr(self, emulator_property="address"): """ Simulate device with disconnected ADDR, such as Eurotherm """ - self.backdoor_run_function_on_device("set_connected", ['01' , False]) + self.backdoor_run_function_on_device("set_connected", ["01", False]) try: yield finally: - self.backdoor_run_function_on_device("set_connected", ['01' , True]) + self.backdoor_run_function_on_device("set_connected", ["01", True]) class NullEmulatorLauncher(EmulatorLauncher):