diff --git a/microscope/filterwheels/aurox.py b/microscope/filterwheels/aurox.py index bb7ba672..07877686 100644 --- a/microscope/filterwheels/aurox.py +++ b/microscope/filterwheels/aurox.py @@ -18,13 +18,40 @@ """Adds support for Aurox devices -Requires package hidapi.""" +Requires package hidapi. +Config sample: + +device(microscope.filterwheels.aurox.Clarity, + {'camera': 'microscope.Cameras.cameramodule.SomeCamera', + 'camera.someSetting': value}) + +Deconvolving data requires: + * availability of clarity_process and cv2 + * successful completion of a calibration step + + set_mode(Modes.calibrate) + + trigger the camera to generate an image + + when the camera returns the image, calibration is complete +""" import hid +import logging import microscope.devices -from enum import Enum +from typing import Mapping +from enum import IntEnum + +_logger = logging.getLogger(__name__) + +try: + # Currently, clarity_process is a module that is not packaged, so needs + # to be put on the python path somewhere manually. + from clarity_process import ClarityProcessor +except Exception: + _logger.warning("Could not import clarity_process module:" + "no processing available.") -## Clarity constants. These may differ across products, so mangle names. +Mode = IntEnum("Mode", "difference, raw, calibrate") + +# Clarity constants. These may differ across products, so mangle names. # USB IDs _Clarity__VENDORID = 0x1F0A _Clarity__PRODUCTID = 0x0088 @@ -72,11 +99,53 @@ _Clarity__SETSVCMODE1 = 0xe0 #1 byte for service mode. SLEEP activates service mode. RUN returns to normal mode. -class Clarity(microscope.devices.FilterWheelBase): +class _CameraAugmentor: + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._aurox_mode = Mode.raw + self._processor = None + + def set_aurox_mode(self, mode): + self._aurox_mode = mode + + def _process_data(self, data): + """Process data depending on state of self._aurox_mode.""" + if self._aurox_mode == Mode.raw: + return data + elif self._aurox_mode == Mode.difference: + if self._processor is None: + raise Exception("Not calibrated yet - can not process image") + return self._processor.process(data) + elif self._aurox_mode == Mode.calibrate: + # This will introduce a significant delay, but returning the + # image indicates that the calibration step is complete. + self._processor = clarity_process.ClarityProcessor(data) + return data + else: + raise Exception("Unrecognised mode: %s", self._aurox_mode) + + def get_sensor_shape(self): + """Return image shape accounting for rotation and Aurox processing.""" + shape = self._get_sensor_shape() + # Does current mode combine two halves into a single image? + if self._aurox_mode in [Mode.difference]: + shape = (shape[1] // 2, shape[0]) + # Does the current transform perform a 90-degree rotation? + if self._transform[2]: + # 90 degree rotation + shape = (shape[1], shape[0]) + return shape + + +class Clarity(microscope.devices.ControllerDevice, + microscope.devices.FilterWheelBase): + """Adds support for Aurox Clarity + + Acts as a ControllerDevice providing the camera attached to the Clarity.""" _slide_to_sectioning = {__SLDPOS0: 'bypass', __SLDPOS1: 'low', __SLDPOS2: 'mid', - __SLDPOS3: 'high',} + __SLDPOS3: 'high'} _positions = 4 _resultlen = {__GETONOFF: 1, __GETDOOR: 1, @@ -86,25 +155,71 @@ class Clarity(microscope.devices.FilterWheelBase): __GETSERIAL: 4, __FULLSTAT: 10} - def __init__(self, **kwargs): + def __init__(self, camera=None, camera_kwargs={}, **kwargs) -> None: + """Create a Clarity instance controlling an optional Camera device. + + :param camera: a class to control the connected camera + :param camera_kwargs: parameters passed to camera as keyword arguments + """ super().__init__(**kwargs) from threading import Lock + # A comms lock. self._lock = Lock() + # The hid connection object self._hid = None + self._devices = {} + if camera is None: + self._cam = None + _logger.warning("No camera specified.") + self._can_process = False + else: + AugmentedCamera = type("AuroxAugmented" + camera.__name__, + (_CameraAugmentor, camera), {}) + self._cam = AugmentedCamera(**camera_kwargs) + self._can_process = 'ClarityProcessor' in globals() + # Acquisition mode + self._mode = Mode.raw + # Add device settings self.add_setting("sectioning", "enum", self.get_slide_position, lambda val: self.set_slide_position(val), self._slide_to_sectioning) + self.add_setting("mode", "enum", + lambda: self._mode.name, + self.set_mode, + Mode) + + @property + def devices(self) -> Mapping[str, microscope.devices.Device]: + """Devices property, required by ControllerDevice interface.""" + if self._cam: + return {'camera': self._cam} + else: + return {} + + def set_mode(self, mode: Mode) -> None: + """Set the operation mode""" + if mode in [Mode.calibrate, Mode.difference] and not self._can_process: + raise Exception("Processing not available") + else: + self._cam.set_aurox_mode(mode) + if mode == Mode.calibrate: + self._set_calibration(True) + else: + self._set_calibration(False) def _send_command(self, command, param=0, max_length=16, timeout_ms=100): """Send a command to the Clarity and return its response""" if not self._hid: - self.open() + try: + self.open() + except Exception: + raise Exception("Connection error") with self._lock: # The device expects a list of 16 integers - buffer = [0x00] * max_length # The 0th element must be 0. - buffer[1] = command # The 1st element is the command - buffer[2] = param # The 2nd element is any command argument. + buffer = [0x00] * max_length # The 0th element must be 0. + buffer[1] = command # The 1st element is the command + buffer[2] = param # The 2nd element is any command argument. result = self._hid.write(buffer) if result == -1: # Nothing to read back. Check hid error state. @@ -140,7 +255,7 @@ def open(self): h = hid.device() h.open(vendor_id=__VENDORID, product_id=__PRODUCTID) h.set_nonblocking(False) - except: + except Exception: raise self._hid = h @@ -153,15 +268,13 @@ def get_id(self): return self._send_command(__GETSERIAL) def _on_enable(self): - if not self.is_connected: - self.open() self._send_command(__SETONOFF, __RUN) return self._send_command(__GETONOFF) == __RUN def _on_disable(self): self._send_command(__SETONOFF, __SLEEP) - def set_calibration(self, state): + def _set_calibration(self, state): if state: result = self._send_command(__SETCAL, __CALON) else: @@ -188,12 +301,17 @@ def get_slides(self): return (self._slide_to_sectioning) def get_status(self): - # Fetch 10 bytes VERSION[3],ONOFF,SHUTTER,SLIDE,FILT,CAL,??,?? - result = self._send_command(__FULLSTAT) - if result is None: - return # A status dict to populate and return - status = {} + status = dict.fromkeys(['connected', 'on', 'door open', 'slide', + 'filter', 'calibration', 'busy', 'mode']) + status['mode'] = self._mode.name + # Fetch 10 bytes VERSION[3],ONOFF,SHUTTER,SLIDE,FILT,CAL,??,?? + try: + result = self._send_command(__FULLSTAT) + status['connected'] = True + except Exception: + status['connected'] = False + return status # A list to track states, any one of which mean the device is busy. busy = [] # Disk running @@ -228,10 +346,6 @@ def get_status(self): status['busy'] = any(busy) return status - # Implemented by FilterWheelBase - #def get_filters(self): - # pass - def moving(self): """Report whether or not the device is between positions.""" import time @@ -243,14 +357,14 @@ def moving(self): moving = False for i in range(5): moving = moving or any( (self.get_slide_position() == __SLDMID, - self.get_position() == __FLTMID) ) + self.get_position() == __FLTMID)) time.sleep(0.01) return moving def get_position(self): """Return the current filter position""" result = self._send_command(__GETFILT) - if result == __FLTERR: + if result == __FLTERR: raise Exception("Filter position error.") return result @@ -263,8 +377,5 @@ def set_position(self, pos, blocking=True): pass return result - def _on_shutdown(self): - pass - def initialize(self): - pass + super().initialize()