Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring eurotherm tests to work with multiple sensors #625

Merged
merged 22 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6a56ae8
Add basic test for disconnected eurotherm
esmith1729 May 13, 2024
f392909
Add pv assert to ensure that RBV doesn't change when the eurotherm is…
esmith1729 May 14, 2024
8325e9a
Merge remote-tracking branch 'origin/master' into ticket8310_euro_mis…
esmith1729 Oct 21, 2024
b8c1776
change set on device to run function
esmith1729 Oct 30, 2024
b01c6f3
refactoring
esmith1729 Oct 30, 2024
2beeacd
Add several changes needed to work with new eurotherm emulator design
esmith1729 Nov 1, 2024
b7b1171
Reformat simulated delay tests to work with new emulator structure
esmith1729 Nov 4, 2024
92fa146
Change reset_device and set_setpoint_and current temp to only run for…
esmith1729 Nov 4, 2024
2c06ed8
add sensor list, change backdoor set to backdoor run function
esmith1729 Nov 4, 2024
df795bc
Change backdoor set to backdoor run function, and add sensors list
esmith1729 Nov 4, 2024
2cb3644
Add test to check that one sensor can disconnect, and another can sti…
esmith1729 Nov 5, 2024
cb3e858
made sensors and pv_sensors constants, and import contextmanager, add…
esmith1729 Nov 7, 2024
b8f2e2d
various fixes to satisfy ruff and pyright
esmith1729 Nov 7, 2024
7af68e8
Change name of method parameter to not have conflicts with eibisynch …
esmith1729 Nov 7, 2024
ec53a65
Change name of method parameter to not conflict with method of same n…
esmith1729 Nov 7, 2024
0dba4af
Format fixes for ruff and pyright, add LewisLauncher to setUp to fix …
esmith1729 Nov 7, 2024
0530b4b
Made sensors constant, some file reformatting for ruff, add LewisLaun…
esmith1729 Nov 7, 2024
fad7212
set_delay_time now no longer takes an address
esmith1729 Nov 7, 2024
7e31bf2
missed on address
esmith1729 Nov 7, 2024
5cb841b
assert_that_pv_is_not_number's tolerance should default to float 0.0
esmith1729 Nov 11, 2024
145f57c
Changes for ruff/pyright errors
esmith1729 Nov 11, 2024
ba36337
Ruff formatting
esmith1729 Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 76 additions & 41 deletions common_tests/eurotherm.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import abc
import os
import time
import typing
import unittest
from typing import ContextManager

from parameterized import parameterized

from utils.calibration_utils import reset_calibration_file, use_calibration_file
from utils.channel_access import ChannelAccess
from utils.emulator_launcher import LewisLauncher
from utils.ioc_launcher import EPICS_TOP, IOCRegister
from utils.testing import get_running_lewis_and_ioc, parameterized_list, skip_if_recsim

Expand All @@ -19,69 +23,85 @@
# PIDs cannot be floating-point
PID_TEST_VALUES = [-50, 50, 3000]

SENSORS = ["01", "02", "03", "04", "05", "06"]

class EurothermBaseTests(metaclass=abc.ABCMeta):
PV_SENSORS = ["A01", "A02", "A03", "A04", "A05", "A06"]


# This class is only valid for classes which also derive from unittest.TestCase,
# and we can't derive from unittest.TestCase at runtime, because
# unittest would try to execute them as tests
class EurothermBaseTests(
unittest.TestCase if typing.TYPE_CHECKING else object, metaclass=abc.ABCMeta
):
"""
Tests for the Eurotherm temperature controller.
"""

@abc.abstractmethod
def get_device(self):
def get_device(self) -> str:
pass

@abc.abstractmethod
def get_emulator_device(self):
def get_emulator_device(self) -> str:
pass

@abc.abstractmethod
def _get_temperature_setter_wrapper(self):
def _get_temperature_setter_wrapper(self) -> ContextManager:
pass

@abc.abstractmethod
def get_scaling(self):
def get_scaling(self) -> str:
pass

def get_prefix(self):
def get_prefix(self) -> str:
return "{}:A01".format(self.get_device())

def setUp(self):
self._setup_lewis_and_channel_access()
self._reset_device_state()
self.ca_no_prefix = ChannelAccess()
self._lewis: LewisLauncher

def _setup_lewis_and_channel_access(self):
self._lewis, self._ioc = get_running_lewis_and_ioc("eurotherm", "EUROTHRM_01")
self._lewis, self._ioc = get_running_lewis_and_ioc("eurotherm", "EUROTHRM_01") # type:ignore
self.ca = ChannelAccess(device_prefix="EUROTHRM_01", default_wait_time=0)
self.ca.assert_that_pv_exists("A01:RBV", timeout=30)
self.ca.assert_that_pv_exists("A01:CAL:SEL", timeout=10)

def _reset_device_state(self, sensor="A01"):
self._lewis.backdoor_set_on_device("connected", True)
def _reset_device_state(self, sensor=PV_SENSORS[0]):
i = PV_SENSORS.index(sensor)
self._lewis.backdoor_run_function_on_device("set_connected", [SENSORS[i], True])
reset_calibration_file(self.ca, prefix=f"{sensor}:")

intial_temp = 0.0

self._set_setpoint_and_current_temperature(intial_temp)

self._lewis.backdoor_set_on_device("ramping_on", False)
self._lewis.backdoor_set_on_device("ramp_rate", 1.0)
self._lewis.backdoor_run_function_on_device("set_ramping_on", [SENSORS[0], True])
self._lewis.backdoor_run_function_on_device("set_ramp_rate", [SENSORS[0], 1.0])
self.ca.set_pv_value(f"{sensor}:RAMPON:SP", 0, sleep_after_set=0)

self._set_setpoint_and_current_temperature(intial_temp)
self.ca.assert_that_pv_is(f"{sensor}:TEMP", intial_temp)
# Ensure the temperature isn't being changed by a ramp any more
self.ca.assert_that_pv_value_is_unchanged(f"{sensor}:TEMP", 5)
self.ca.assert_that_pv_value_is_unchanged(f"{sensor}:TEMP", wait=3)

def _set_setpoint_and_current_temperature(self, temperature, sensor="A01"):
def _set_setpoint_and_current_temperature(self, temperature, sensor=PV_SENSORS[0]):
if IOCRegister.uses_rec_sim:
self.ca.set_pv_value(f"{sensor}:SIM:TEMP:SP", temperature)
self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP", temperature)
self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP", temperature)
self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP:RBV", temperature)
else:
self._lewis.backdoor_set_on_device("current_temperature", temperature)
i = PV_SENSORS.index(sensor)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[i], temperature]
)
self.ca.assert_that_pv_is_number(f"{sensor}:TEMP", temperature, 0.1, timeout=30)
self._lewis.backdoor_set_on_device("ramp_setpoint_temperature", temperature)
self._lewis.backdoor_run_function_on_device(
"set_ramp_setpoint_temperature", [SENSORS[i], temperature]
)
self.ca.assert_that_pv_is_number(f"{sensor}:TEMP:SP:RBV", temperature, 0.1, timeout=30)

def test_WHEN_read_rbv_temperature_THEN_rbv_value_is_same_as_backdoor(self):
Expand Down Expand Up @@ -114,21 +134,25 @@ def test_WHEN_set_ramp_rate_in_K_per_min_THEN_current_temperature_reaches_set_po
"A01:TEMP:SP:RBV", setpoint_temperature, tolerance=0.1, timeout=60
)
end = time.time()
self.assertAlmostEquals(
self.assertAlmostEqual(
end - start,
60.0 * (setpoint_temperature - start_temperature) / ramp_rate,
delta=0.1 * (end - start),
) # Lower tolerance will be too tight given scan rate

def test_WHEN_sensor_disconnected_THEN_ramp_setting_is_disabled(self):
self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]
)

self.ca.assert_that_pv_is_number("A01:RAMPON:SP.DISP", 1)

def test_GIVEN_sensor_disconnected_WHEN_sensor_reconnected_THEN_ramp_setting_is_enabled(self):
self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]
)

self._lewis.backdoor_set_on_device("current_temperature", 0)
self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], 0])

self.ca.assert_that_pv_is_number("A01:RAMPON:SP.DISP", 0)

Expand All @@ -137,7 +161,9 @@ def test_GIVEN_ramp_was_off_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached
):
self.ca.set_pv_value("A01:RAMPON:SP", 0)

self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]
)

self.ca.assert_that_pv_is("A01:RAMPON", "OFF")
self.ca.assert_that_pv_is("A01:RAMPON:CACHE", "OFF")
Expand All @@ -147,17 +173,21 @@ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached_
):
self.ca.set_pv_value("A01:RAMPON:SP", 1)

self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]
)

self.ca.assert_that_pv_is("A01:RAMPON", "OFF")
self.ca.assert_that_pv_is("A01:RAMPON:CACHE", "ON")

def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_and_reconnected_THEN_ramp_is_on(self):
self.ca.set_pv_value("A01:RAMPON:SP", 1)

self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]
)
self.ca.assert_that_pv_is("A01:RAMPON", "OFF")
self._lewis.backdoor_set_on_device("current_temperature", 0)
self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], 0])

self.ca.assert_that_pv_is("A01:RAMPON", "ON")

Expand Down Expand Up @@ -265,7 +295,7 @@ def test_WHEN_config_file_and_temperature_unit_changed_THEN_then_ramp_rate_unit_
]
)
def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN(
self, _, temperature, expected_value_of_under_range_calc_pv
self, _, temperature, exp_val
):
# Arrange

Expand All @@ -282,10 +312,10 @@ def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN(
# Assert

self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER.A", temperature)
self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER", expected_value_of_under_range_calc_pv)
self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER", expected_value=exp_val)

@parameterized.expand(
[
def test_WHEN_disconnected_THEN_in_alarm(self):
records = [
"A01:TEMP",
"A01:TEMP:SP:RBV",
"A01:P",
Expand All @@ -295,43 +325,48 @@ def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN(
"A01:MAX_OUTPUT",
"A01:LOWLIM",
]
)
def test_WHEN_disconnected_THEN_in_alarm(self, record):
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE)
with self._lewis.backdoor_simulate_disconnected_device():
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.INVALID, timeout=60)
for record in records:
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE)
with self._lewis.backdoor_simulate_disconnected_addr():
for record in records:
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.INVALID, timeout=60)
# Assert alarms clear on reconnection
with self._get_temperature_setter_wrapper():
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30)
for record in records:
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30)

def test_WHEN_eurotherm_missing_THEN_updates_of_PVs_stop(self):
with self._lewis.backdoor_simulate_disconnected_addr():
self.ca.assert_that_pv_value_is_unchanged("A01:RBV", 20)

@parameterized.expand(parameterized_list(PID_TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_p_set_via_backdoor_THEN_p_updates(self, _, val):
self._lewis.backdoor_set_on_device("p", val)
self._lewis.backdoor_run_function_on_device("set_p", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:P", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list(PID_TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_i_set_via_backdoor_THEN_i_updates(self, _, val):
self._lewis.backdoor_set_on_device("i", val)
self._lewis.backdoor_run_function_on_device("set_i", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:I", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list(PID_TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_d_set_via_backdoor_THEN_d_updates(self, _, val):
self._lewis.backdoor_set_on_device("d", val)
self._lewis.backdoor_run_function_on_device("set_d", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:D", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list(TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_output_set_via_backdoor_THEN_output_updates(self, _, val):
self._lewis.backdoor_set_on_device("output", val)
self._lewis.backdoor_run_function_on_device("set_output", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:OUTPUT", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list(TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_output_set_via_backdoor_THEN_output_updates(self, _, val):
self._lewis.backdoor_set_on_device("max_output", val)
def test_WHEN_max_output_set_via_backdoor_THEN_output_updates(self, _, val):
self._lewis.backdoor_run_function_on_device("set_max_output", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:MAX_OUTPUT", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list([0, 100, 3276]))
Expand All @@ -343,11 +378,11 @@ def test_WHEN_output_rate_set_THEN_output_rate_updates(self, _, val):
@parameterized.expand(parameterized_list(TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_high_limit_set_via_backdoor_THEN_high_lim_updates(self, _, val):
self._lewis.backdoor_set_on_device("high_lim", val)
self._lewis.backdoor_run_function_on_device("set_high_lim", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:HILIM", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list(TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_low_limit_set_via_backdoor_THEN_low_lim_updates(self, _, val):
self._lewis.backdoor_set_on_device("low_lim", val)
self._lewis.backdoor_run_function_on_device("set_low_lim", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:LOWLIM", val, tolerance=0.05, timeout=15)
Loading