Skip to content

121 augmentor implementation #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 140 additions & 29 deletions microscope/filterwheels/aurox.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,40 @@

"""Adds support for Aurox devices

Requires package hidapi."""
Requires package hidapi.

Config sample:

device(microscope.filterwheels.aurox.Clarity,
{'camera': 'microscope.Cameras.cameramodule.SomeCamera',
'camera.someSetting': value})

Deconvolving data requires:
* availability of clarity_process and cv2
* successful completion of a calibration step
+ set_mode(Modes.calibrate)
+ trigger the camera to generate an image
+ when the camera returns the image, calibration is complete
"""
import hid
import logging
import microscope.devices
from enum import Enum
from typing import Mapping
from enum import IntEnum

_logger = logging.getLogger(__name__)

try:
# Currently, clarity_process is a module that is not packaged, so needs
# to be put on the python path somewhere manually.
from clarity_process import ClarityProcessor
except Exception:
_logger.warning("Could not import clarity_process module:"
"no processing available.")

## Clarity constants. These may differ across products, so mangle names.
Mode = IntEnum("Mode", "difference, raw, calibrate")

# Clarity constants. These may differ across products, so mangle names.
# USB IDs
_Clarity__VENDORID = 0x1F0A
_Clarity__PRODUCTID = 0x0088
Expand Down Expand Up @@ -72,11 +99,53 @@
_Clarity__SETSVCMODE1 = 0xe0 #1 byte for service mode. SLEEP activates service mode. RUN returns to normal mode.


class Clarity(microscope.devices.FilterWheelBase):
class _CameraAugmentor:
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._aurox_mode = Mode.raw
self._processor = None

def set_aurox_mode(self, mode):
self._aurox_mode = mode

def _process_data(self, data):
"""Process data depending on state of self._aurox_mode."""
if self._aurox_mode == Mode.raw:
return data
elif self._aurox_mode == Mode.difference:
if self._processor is None:
raise Exception("Not calibrated yet - can not process image")
return self._processor.process(data)
elif self._aurox_mode == Mode.calibrate:
# This will introduce a significant delay, but returning the
# image indicates that the calibration step is complete.
self._processor = clarity_process.ClarityProcessor(data)
return data
else:
raise Exception("Unrecognised mode: %s", self._aurox_mode)

def get_sensor_shape(self):
"""Return image shape accounting for rotation and Aurox processing."""
shape = self._get_sensor_shape()
# Does current mode combine two halves into a single image?
if self._aurox_mode in [Mode.difference]:
shape = (shape[1] // 2, shape[0])
# Does the current transform perform a 90-degree rotation?
if self._transform[2]:
# 90 degree rotation
shape = (shape[1], shape[0])
return shape


class Clarity(microscope.devices.ControllerDevice,
microscope.devices.FilterWheelBase):
"""Adds support for Aurox Clarity

Acts as a ControllerDevice providing the camera attached to the Clarity."""
_slide_to_sectioning = {__SLDPOS0: 'bypass',
__SLDPOS1: 'low',
__SLDPOS2: 'mid',
__SLDPOS3: 'high',}
__SLDPOS3: 'high'}
_positions = 4
_resultlen = {__GETONOFF: 1,
__GETDOOR: 1,
Expand All @@ -86,25 +155,71 @@ class Clarity(microscope.devices.FilterWheelBase):
__GETSERIAL: 4,
__FULLSTAT: 10}

def __init__(self, **kwargs):
def __init__(self, camera=None, camera_kwargs={}, **kwargs) -> None:
"""Create a Clarity instance controlling an optional Camera device.

:param camera: a class to control the connected camera
:param camera_kwargs: parameters passed to camera as keyword arguments
"""
super().__init__(**kwargs)
from threading import Lock
# A comms lock.
self._lock = Lock()
# The hid connection object
self._hid = None
self._devices = {}
if camera is None:
self._cam = None
_logger.warning("No camera specified.")
self._can_process = False
else:
AugmentedCamera = type("AuroxAugmented" + camera.__name__,
(_CameraAugmentor, camera), {})
self._cam = AugmentedCamera(**camera_kwargs)
self._can_process = 'ClarityProcessor' in globals()
# Acquisition mode
self._mode = Mode.raw
# Add device settings
self.add_setting("sectioning", "enum",
self.get_slide_position,
lambda val: self.set_slide_position(val),
self._slide_to_sectioning)
self.add_setting("mode", "enum",
lambda: self._mode.name,
self.set_mode,
Mode)

@property
def devices(self) -> Mapping[str, microscope.devices.Device]:
"""Devices property, required by ControllerDevice interface."""
if self._cam:
return {'camera': self._cam}
else:
return {}

def set_mode(self, mode: Mode) -> None:
"""Set the operation mode"""
if mode in [Mode.calibrate, Mode.difference] and not self._can_process:
raise Exception("Processing not available")
else:
self._cam.set_aurox_mode(mode)
if mode == Mode.calibrate:
self._set_calibration(True)
else:
self._set_calibration(False)

def _send_command(self, command, param=0, max_length=16, timeout_ms=100):
"""Send a command to the Clarity and return its response"""
if not self._hid:
self.open()
try:
self.open()
except Exception:
raise Exception("Connection error")
with self._lock:
# The device expects a list of 16 integers
buffer = [0x00] * max_length # The 0th element must be 0.
buffer[1] = command # The 1st element is the command
buffer[2] = param # The 2nd element is any command argument.
buffer = [0x00] * max_length # The 0th element must be 0.
buffer[1] = command # The 1st element is the command
buffer[2] = param # The 2nd element is any command argument.
result = self._hid.write(buffer)
if result == -1:
# Nothing to read back. Check hid error state.
Expand Down Expand Up @@ -140,7 +255,7 @@ def open(self):
h = hid.device()
h.open(vendor_id=__VENDORID, product_id=__PRODUCTID)
h.set_nonblocking(False)
except:
except Exception:
raise
self._hid = h

Expand All @@ -153,15 +268,13 @@ def get_id(self):
return self._send_command(__GETSERIAL)

def _on_enable(self):
if not self.is_connected:
self.open()
self._send_command(__SETONOFF, __RUN)
return self._send_command(__GETONOFF) == __RUN

def _on_disable(self):
self._send_command(__SETONOFF, __SLEEP)

def set_calibration(self, state):
def _set_calibration(self, state):
if state:
result = self._send_command(__SETCAL, __CALON)
else:
Expand All @@ -188,12 +301,17 @@ def get_slides(self):
return (self._slide_to_sectioning)

def get_status(self):
# Fetch 10 bytes VERSION[3],ONOFF,SHUTTER,SLIDE,FILT,CAL,??,??
result = self._send_command(__FULLSTAT)
if result is None:
return
# A status dict to populate and return
status = {}
status = dict.fromkeys(['connected', 'on', 'door open', 'slide',
'filter', 'calibration', 'busy', 'mode'])
status['mode'] = self._mode.name
# Fetch 10 bytes VERSION[3],ONOFF,SHUTTER,SLIDE,FILT,CAL,??,??
try:
result = self._send_command(__FULLSTAT)
status['connected'] = True
except Exception:
status['connected'] = False
return status
# A list to track states, any one of which mean the device is busy.
busy = []
# Disk running
Expand Down Expand Up @@ -228,10 +346,6 @@ def get_status(self):
status['busy'] = any(busy)
return status

# Implemented by FilterWheelBase
#def get_filters(self):
# pass

def moving(self):
"""Report whether or not the device is between positions."""
import time
Expand All @@ -243,14 +357,14 @@ def moving(self):
moving = False
for i in range(5):
moving = moving or any( (self.get_slide_position() == __SLDMID,
self.get_position() == __FLTMID) )
self.get_position() == __FLTMID))
time.sleep(0.01)
return moving

def get_position(self):
"""Return the current filter position"""
result = self._send_command(__GETFILT)
if result == __FLTERR:
if result == __FLTERR:
raise Exception("Filter position error.")
return result

Expand All @@ -263,8 +377,5 @@ def set_position(self, pos, blocking=True):
pass
return result

def _on_shutdown(self):
pass

def initialize(self):
pass
super().initialize()