-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
89 lines (75 loc) · 4.07 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import platform
import threading
import time
def convert_com_port(com_port):
"""Convert between Windows and Linux style serial ports.
Args:
com_port (str): Windows-style COM port (e.g. 'COM1')
Returns:
str: Appropriate port name for current platform
"""
if platform.system() != "Windows":
# Convert Windows-style COM port to Linux-style
return f"/dev/ttyS{int(com_port[-1]) - 1}"
return com_port
def pressure_alarm(low_threshold=10, high_threshold=30):
"""
Decorator function that keeps track of pressure for safe operation. It will trigger
an alarm if low or high pressure thresholds are exceeded. In the event of a trigger
all shutoff valves will be closed and the process temperature will be taken down to 20C
Args:
low_threshold (int): lower pressure limit trigger value in case of working under vacuum
high_theshold (int): higher pressure limit trigger value in case of a serious flow restriction event
"""
def decorator(func):
def wrapper(self, *args, **kwargs):
# Flag to signal when the monitored method has finished
finished = threading.Event()
# Define a background function to monitor the pressure
def monitor_pressure():
while not finished.is_set():
# Read the pressure values
p_a, p_b = self.flowSMS.pressure_report()
# Check if either pressure exceeds the threshold
if p_a > high_threshold or p_b > high_threshold:
self.flowSMS.setpoints() # Trigger adjustment if above threshold
try:
print(
"!!!!!!!!!!!!!!HIGH PRESSURE ALARM!!!!!!!!!!!!!!\n!!!!!!!!!!!!!!HIGH PRESSURE ALARM!!!!!!!!!!!!!!\n",
"!!!!!!!!!!!!!!HIGH PRESSURE ALARM!!!!!!!!!!!!!!\n!!!!!!!!!!!!!!HIGH PRESSURE ALARM!!!!!!!!!!!!!!\n",
f"PRESSURE IN LINE A = {p_a} psia, PRESSURE IN LINE B = {p_b} psia.\n",
"CLOSING ALL SHUTOFF VALVES AND TAKING SYSTEM TO ROOM TEMPERATURE",
)
finished.set() # Stop monitoring if alarm is triggered
self.setpoint_finish_experiment()
return
except (ValueError, TypeError):
continue
elif p_a < low_threshold or p_b < low_threshold:
self.flowSMS.setpoints() # Trigger adjustment if above threshold
try:
print(
"!!!!!!!!!!!!!!LOW PRESSURE ALARM!!!!!!!!!!!!!!\n!!!!!!!!!!!!!!LOW PRESSURE ALARM!!!!!!!!!!!!!!\n",
"!!!!!!!!!!!!!!LOW PRESSURE ALARM!!!!!!!!!!!!!!\n!!!!!!!!!!!!!!LOW PRESSURE ALARM!!!!!!!!!!!!!!\n",
f"PRESSURE IN LINE A = {p_a} psia, PRESSURE IN LINE B = {p_b} psia.\n",
"CLOSING ALL SHUTOFF VALVES AND TAKING SYSTEM TO ROOM TEMPERATURE",
)
finished.set() # Stop monitoring if alarm is triggered
self.setpoint_finish_experiment()
return
except (ValueError, TypeError):
continue
time.sleep(1) # Check every second
# Start monitoring in a separate thread
monitor_thread = threading.Thread(target=monitor_pressure)
monitor_thread.start()
try:
# Execute the main function while monitoring continues in the background
result = func(self, *args, **kwargs)
finally:
# Signal the monitor thread to stop after the function completes
finished.set()
monitor_thread.join() # Ensure the monitor thread completes
return result
return wrapper
return decorator