Skip to content

Commit

Permalink
Moves generic action to task code to socs.util
Browse files Browse the repository at this point in the history
  • Loading branch information
jlashner committed Sep 12, 2024
1 parent 0fd6841 commit cc1d1f6
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 219 deletions.
310 changes: 95 additions & 215 deletions socs/agents/lakeshore240/agent.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import argparse
import dataclasses
import os
import queue
import time
import traceback
import warnings
from dataclasses import dataclass, fields
from typing import (
Any, Dict, Generator, Optional, Tuple, Type, Callable, get_args, get_origin,
Union
)
from dataclasses import dataclass
from typing import ( Any, Dict, Optional, )

import txaio # type: ignore
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker

from socs.Lakeshore.Lakeshore240 import Module
from socs.util import BaseAction, register_task_from_action, OcsOpReturnType

txaio.use_twisted()

Expand All @@ -25,215 +22,91 @@
log = txaio.make_logger() # pylint: disable=E1101


ActionReturnType = Optional[Dict[str, Any]]
OcsOpReturnType = Tuple[bool, str]
OcsInlineCallbackReturnType = Generator[Any, Any, OcsOpReturnType]
class LS240Action(BaseAction):
def process(self, module: Module) -> None:
raise NotImplementedError

@dataclass
class UploadCalCurve(LS240Action):
"""upload_cal_curve(channel, filename)
class Actions:
"Namespace to hold action classes for the Lakeshore240 agent."
**Task** - Upload a calibration curve to a channel.
@dataclass
class BaseAction:
"Base class for all actions."

def __post_init__(self) -> None:
self.processed: bool = False
self.success: bool = False
self.traceback: Optional[str] = None
self.result: ActionReturnType = None

def resolve_action(
self,
success: bool,
traceback: Optional[str] = None,
result: ActionReturnType = None
) -> None:
self.success = success
self.traceback = traceback
self.result = result
self.processed = True

def process(self, module: Module) -> ActionReturnType:
raise NotImplementedError

def sleep_until_processed(self, interval=0.2) -> None:
while not self.processed:
time.sleep(interval)

@dataclass
class UploadCalCurve(BaseAction):
"""upload_cal_curve(channel, filename)
**Task** - Upload a calibration curve to a channel.
Args
------
channel (int):
Channel number, 1-8.
filename (str):
Filename for calibration curve.
"""

channel: int
filename: str

def process(self, module: Module) -> ActionReturnType:
log.info(f"Starting upload to channel {self.channel}...")
channel = module.channels[self.channel - 1]
channel.load_curve(self.filename)
time.sleep(0.1)
return None

@dataclass
class SetValues(BaseAction):
"""set_values(channel, sensor=None, auto_range=None, range=None,\
current_reversal=None, units=None, enabled=None, name=None)
**Task** - Set sensor parameters for a Lakeshore240 Channel.
Args
---------
channel (int):
Channel number to set. Valid choices are 1-8.
sensor (int, optional):
Specifies sensor type. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible types.
auto_range (int, optional):
Specifies if channel should use autorange. Must be 0 or 1.
range (int, optional):
Specifies range if auto_range is false. Only settable for NTC
RTD. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible ranges.
current_reversal (int, optional):
Specifies if input current reversal is on or off.
Always 0 if input is a diode.
units (int, optional):
Specifies preferred units parameter, and sets the units for
alarm settings. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible units.
enabled (int, optional):
Sets if channel is enabled.
name (str, optional):
Sets name of channel.
"""

channel: int
sensor: Optional[int] = None
auto_range: Optional[int] = None
range: Optional[int] = None
current_reversal: Optional[int] = None
units: Optional[int] = None
enabled: Optional[int] = None
name: Optional[str] = None

def process(self, module: Module) -> ActionReturnType:
log.info(f"Setting values for channel {self.channel}...")
module.channels[self.channel - 1].set_values(
sensor=self.sensor,
auto_range=self.auto_range,
range=self.range,
current_reversal=self.current_reversal,
unit=self.units,
enabled=self.enabled,
name=self.name,
)
time.sleep(0.1)
return None


def is_instanceable(t: Type) -> bool:
"""
Checks if its possible to run isinstance with a specified type. This is
needed because older version of python don't let you run this on subscripted
generics.
Args
------
channel (int):
Channel number, 1-8.
filename (str):
Filename for calibration curve.
"""
try:
isinstance(0, t)
return True
except Exception:
return False

def get_param_type(t: Type) -> Optional[Type]:
"""
Takes in a dataclass field type and returns a type that is accepted
by the OCS param decorator. This will return the original type if it
works with isinstance, or will attempt to unwrap an optional type. Other
types are not currently supported. If it fails, it will return None.
"""
origin_type = get_origin(t)

# Unwrap possible option type
if origin_type == Union:
sub_types = get_args(t)
if len(sub_types) != 2:
return None
if type(None) not in sub_types:
return None
for st in sub_types:
if st is not type(None):
if is_instanceable(st):
return st

elif is_instanceable(t):
return t

return None


def register_task_from_action(
agent: ocs_agent.OCSAgent,
name: str,
action_class: Type[Actions.BaseAction],
queue: "queue.Queue[Actions.BaseAction]"
) -> None:
"""
Registers an OCSTask from an Action type. This will define ocs_params based
on the dataclass fields, and set the task docstrings equal to the action
class docstrings.
channel: int
filename: str

def process(self, module: Module) -> None:
log.info(f"Starting upload to channel {self.channel}...")
channel = module.channels[self.channel - 1]
channel.load_curve(self.filename)
time.sleep(0.1)
raise Exception("TEST")

@dataclass
class SetValues(LS240Action):
"""set_values(channel, sensor=None, auto_range=None, range=None,\
current_reversal=None, units=None, enabled=None, name=None)
**Task** - Set sensor parameters for a Lakeshore240 Channel.
Args
---------
channel (int):
Channel number to set. Valid choices are 1-8.
sensor (int, optional):
Specifies sensor type. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible types.
auto_range (int, optional):
Specifies if channel should use autorange. Must be 0 or 1.
range (int, optional):
Specifies range if auto_range is false. Only settable for NTC
RTD. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible ranges.
current_reversal (int, optional):
Specifies if input current reversal is on or off.
Always 0 if input is a diode.
units (int, optional):
Specifies preferred units parameter, and sets the units for
alarm settings. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible units.
enabled (int, optional):
Sets if channel is enabled.
name (str, optional):
Sets name of channel.
"""

def task(
session: ocs_agent.OpSession,
params: Optional[Dict[str, Any]] = None
) -> OcsOpReturnType:
_params = {} if params is None else params
action = action_class(**_params)
queue.put(action)
action.sleep_until_processed()

if not action.success:
log.error("{name} failed to process...", name=name)
if action.traceback is not None:
log.error("traceback:\n{traceback}", traceback=action.traceback)
return False, f"{name} failed"

if action.result is not None:
session.data.update(action.result)

return True, f"{name} succeded"

task.__doc__ = action_class.__doc__

# Adds ocs parameters
for f in fields(action_class):
param_type = get_param_type(f.type)
if param_type is None:
raise ValueError(f"Unsupported param type for arg {f.name}: {f.type}")
param_kwargs: Dict[str, Any] = {
'type': param_type,
}
if f.default != dataclasses.MISSING:
param_kwargs['default'] = f.default
if isinstance(f.metadata, dict):
param_kwargs.update(f.metadata.get('ocs_param_kwargs', {}))
task = ocs_agent.param(f.name, **param_kwargs)(task)

agent.register_task(name, task)

channel: int
sensor: Optional[int] = None
auto_range: Optional[int] = None
range: Optional[int] = None
current_reversal: Optional[int] = None
units: Optional[int] = None
enabled: Optional[int] = None
name: Optional[str] = None

def process(self, module: Module) -> None:
log.info(f"Setting values for channel {self.channel}...")
module.channels[self.channel - 1].set_values(
sensor=self.sensor,
auto_range=self.auto_range,
range=self.range,
current_reversal=self.current_reversal,
unit=self.units,
enabled=self.enabled,
name=self.name,
)
time.sleep(0.1)

class LS240_Agent:
def __init__(
Expand All @@ -245,14 +118,17 @@ def __init__(
self.agent: ocs_agent.OCSAgent = agent
self.port = port
self.f_sample = f_sample
self.action_queue: "queue.Queue[Actions.BaseAction]" = queue.Queue()
self.action_queue: "queue.Queue[LS240Action]" = queue.Queue()

def queue_action(action: LS240Action):
self.action_queue.put(action)

# Register Operaionts
register_task_from_action(
agent, 'set_values', Actions.SetValues, self.action_queue
agent, "set_values", SetValues, queue_action
)
register_task_from_action(
agent, 'upload_cal_curve', Actions.UploadCalCurve, self.action_queue
agent, "upload_cal_curve", UploadCalCurve, queue_action
)
agent.register_process("main", self.main, self._stop_main, startup=True)

Expand Down Expand Up @@ -307,11 +183,15 @@ def _process_actions(self, module: Module) -> None:
action = self.action_queue.get()
try:
log.info(f"Running action {action}")
result = action.process(module)
action.resolve_action(True, result=result)
action.process(module)
action.resolve_action(True)
except Exception: # pylint: disable=broad-except
log.error(f"Error processing action: {action}")
action.resolve_action(False, traceback=traceback.format_exc())
action.resolve_action(
False,
traceback=traceback.format_exc(),
return_message="Uncaught Exception"
)

def main(
self,
Expand All @@ -328,7 +208,7 @@ def main(
# Clear pre-existing actions
while not self.action_queue.empty():
action = self.action_queue.get()
action.resolve_action(False, traceback="Aborted by main process")
action.resolve_action(False, return_message="Aborted by main process")

exceptions_to_attempt_reconnect = (ConnectionError, TimeoutError)

Expand Down
Loading

0 comments on commit cc1d1f6

Please sign in to comment.