Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

102 import new bristol fos driver from diamondos #103

Merged
merged 4 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ when necessary for the use of a specific QMI driver:
- [RPi.GPIO](https://pypi.org/project/RPi.GPIO/)
- [Silicon Labs CP210x USB to UART Bridge](https://www.silabs.com/developers/usb-to-uart-bridge-vcp-drivers)
- [uldaq.py](https://pypi.org/project/uldaq/)
- [mcculw.py](https://pypi.org/project/mcculw/)
- [usbdll.dll](https://www.newport.com/software-and-drivers)
- [VCP driver](https://ftdichip.com/Drivers/vcp-drivers/)
- [stmcdc.inf](https://www.wieserlabs.com/products/radio-frequency/flexdds-ng-dual/FlexDDS-NG-ad9910_standalone.zip)
Expand Down
64 changes: 64 additions & 0 deletions bin/instruments/qmi_bristol_fos
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#! /usr/bin/env python3

"""Command line client for the Bristol FOS Fiber-optic switch."""

import argparse
from contextlib import nullcontext, AbstractContextManager
import sys

import qmi
from qmi.instruments.bristol import Bristol_Fos
from qmi.utils.context_managers import start_stop, open_close


def main() -> None:

parser = argparse.ArgumentParser()
parser.description = "Command line client for the Bristol FOS Fiber-optic switch."

source = parser.add_mutually_exclusive_group(required=True)
source.add_argument("--rpc", help="QMI RPC address of device.")
source.add_argument("--unique_id", help="Unique ID of the device")

parser.add_argument("--board_id", type=int, help="Board number for the device. Windows only!")
parser.add_argument("--channel", type=int, help="Channel number to set")

args = parser.parse_args()

instr: Bristol_Fos
with start_stop(qmi, "bristol_fos_client", console_loglevel="WARNING"), parse_source(args) as instr:
rbudhrani marked this conversation as resolved.
Show resolved Hide resolved
if args.channel is None:
print(f"Bristol FOS with unique ID {args.unique_id} found. No channel selected.")

else:
print(f"Selecting channel {args.channel} for Bristol FOS with unique ID {args.unique_id}.")
instr.select_channel(args.channel)


def parse_source(args) -> AbstractContextManager:
# make the instrument
if args.unique_id is not None and args.board_id is not None:
return open_close(qmi.make_instrument(
instrument_name="FOS",
instrument_class=Bristol_Fos,
unique_id=args.unique_id,
board_id=int(args.board_id)
))

elif args.unique_id is not None:
return open_close(qmi.make_instrument(
instrument_name="FOS",
instrument_class=Bristol_Fos,
unique_id=args.unique_id
))

elif args.rpc is not None:
# get the instrument
qmi.context().connect_to_peer(args.rpc.split('.')[0])
return nullcontext(qmi.get_instrument(args.rpc))

raise ValueError("Expected an unique ID or RPC address for the instrument!")


if __name__ == "__main__":
sys.exit(main())
2 changes: 1 addition & 1 deletion qmi/instruments/bristol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
"""

from qmi.instruments.bristol.bristol_871a import Bristol_871A
from qmi.instruments.bristol.fos import Bristol_FOS
from qmi.instruments.bristol.fos import Bristol_Fos as Bristol_Fos
221 changes: 183 additions & 38 deletions qmi/instruments/bristol/fos.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,78 @@
"""
Instrument driver for the four-channel Bristol Fiber-Optic Switch (FOS)

.. autoclass:: Bristol_FOS
:members:
:undoc-members:
"""

import logging
import sys
import typing
from typing import Optional

import qmi.core.exceptions
from qmi.core.context import QMI_Context
from qmi.core.instrument import QMI_Instrument
from qmi.core.rpc import rpc_method

# Lazy import of the "uldaq" module. See the function _import_modules() below.
# Lazy import of the "uldaq" or "ul" and "enums" modules. See the function _import_modules() below.
uldaq = None
ul, enums = None, None
if typing.TYPE_CHECKING:
import uldaq
else:
uldaq = None
if sys.platform.startswith("linux") or sys.platform == "darwin":
import uldaq # type: ignore

if sys.platform.startswith("win"):
import ul # type: ignore
import enums # type: ignore


# Global variable holding the logger for this module.
_logger = logging.getLogger(__name__)


def _import_modules() -> None:
"""Import the "uldaq" library.
"""Import the "uldaq" library or "mcculw" modules.

This import is done in a function, instead of at the top-level,
to avoid an unnecessary dependency for programs that do not access
the instrument directly.
This import is done in a function, instead of at the top-level, to avoid an unnecessary
dependencies for programs that do not access the instrument directly.
"""
global uldaq
if uldaq is None:
import uldaq # pylint: disable=W0621
global uldaq, ul, enums
_logger.debug("Importing %s modules", sys.platform)
if (sys.platform.startswith("linux") or sys.platform == "darwin") and uldaq is None:
import uldaq # type: ignore

elif sys.platform.startswith("win") and (ul is None or enums is None):
from mcculw import ul, enums # type: ignore
rbudhrani marked this conversation as resolved.
Show resolved Hide resolved

class Bristol_FOS(QMI_Instrument):

def __init__(self, context: QMI_Context, name: str, unique_id: str) -> None:
super().__init__(context, name)
class _Bristol_FosUnix:
"""Unix version for the Bristol FOS instrument driver."""
def __init__(self, unique_id: str) -> None:
self._unique_id = unique_id
self._device = None
self._dio_device = None

# Import the "uldaq" module.
_import_modules()
@property
def device(self) -> "uldaq.DaqDevice": # type: ignore
"""Property for holding the DAQ device object."""
assert self._device is not None
return self._device

@property
def dio_device(self) -> "uldaq.DioDevice": # type: ignore
"""Property for holding the DIO device object."""
assert self._dio_device is not None
return self._dio_device

@staticmethod
def _find_device_descriptor(unique_id: str) -> "Optional[uldaq.DaqDeviceDescriptor]":
_import_modules()
def _find_device_descriptor(unique_id: str) -> "Optional[uldaq.DaqDeviceDescriptor]": # type: ignore
"""A method to retrieve a specific instrument's 'device descriptor' object based on unique ID of the instrument.

Parameters:
unique_id: A unique ID string.

Returns:
device_descriptor: The device descriptor with matching unique ID or None.
"""
assert uldaq is not None
device_descriptors = uldaq.get_daq_device_inventory(uldaq.InterfaceType.ANY)
for device_descriptor in device_descriptors:
if device_descriptor.unique_id == unique_id:
Expand All @@ -54,45 +81,163 @@ def _find_device_descriptor(unique_id: str) -> "Optional[uldaq.DaqDeviceDescript

@rpc_method
def open(self) -> None:
self._check_is_closed()

device_descriptor = self._find_device_descriptor(self._unique_id)
if device_descriptor is None:
raise ValueError("Bristol FOS with unique_id {!r} not found.".format(self._unique_id))
_logger.error("Bristol FOS with unique_id '%s' not found.", self._unique_id)
raise ValueError(f"Bristol FOS with unique_id '{self._unique_id!r}' not found.")

assert uldaq is not None
device = uldaq.DaqDevice(device_descriptor)
try:
device.connect()
try:
dio_device = device.get_dio_device()
dio_device.d_config_port(uldaq.DigitalPortType.FIRSTPORTA, uldaq.DigitalDirection.OUTPUT)
except:
except Exception as exc:
device.disconnect()
raise
except:
raise Exception from exc
except Exception as exc:
device.release()
raise
_logger.error("Bristol FOS device connection failed with: %s", str(exc))
raise qmi.core.exceptions.QMI_InstrumentException("Bristol FOS device connection failed.") from exc

self._device = device
self._dio_device = dio_device

@rpc_method
def close(self) -> None:
self.device.disconnect()
self.device.release()
self._device = None
self._dio_device = None

@rpc_method
def select_channel(self, channel: int) -> None:
assert uldaq is not None
# Note that the 'channel parameter has values 1..4 ; these are mapped to value 0..3 here.
self.dio_device.d_out(uldaq.DigitalPortType.FIRSTPORTA, channel - 1)


class _Bristol_FosWindows:
"""Windows version for the Bristol FOS instrument driver"""

def __init__(self, unique_id: str, board_id: int) -> None:
self._unique_id = unique_id
self._board_id = board_id

@property
def board_id(self) -> int:
"""A property for instrument board number. It must be within [0, 99]."""
assert 0 <= self._board_id <= 99
return self._board_id

@board_id.setter
def board_id(self, new_id: int) -> None:
"""A property for setting a new instrument board number. It must be within [0, 99]."""
assert isinstance(new_id, int)
assert 0 <= new_id <= 99
self._board_id = new_id

@staticmethod
def _find_device_descriptor(unique_id: str) -> "Optional[structs.DaqDeviceDescriptor]": # type: ignore
"""A method to retrieve a specific instrument's 'device descriptor' object based on unique ID of the instrument.

Parameters:
unique_id: A unique ID string.

Returns:
device_descriptor: The device descriptor with matching unique ID or None.
"""
assert ul is not None and enums is not None
device_descriptors = ul.get_daq_device_inventory(enums.InterfaceType.ANY)
for device_descriptor in device_descriptors:
if device_descriptor.unique_id == unique_id:
return device_descriptor
return None # Device not found.

@rpc_method
def open(self) -> None:
assert ul is not None and enums is not None
ul.ignore_instacal() # With this we ignore 'cg.cfg' file and enable runtime configuring.
device_descriptor = self._find_device_descriptor(self._unique_id)
if device_descriptor is None:
_logger.error("Bristol FOS with unique_id '%s' not found.", self._unique_id)
raise ValueError(f"Bristol FOS with unique_id '{self._unique_id!r}' not found.")

try:
ul.create_daq_device(self.board_id, device_descriptor)
assert self.board_id == ul.get_board_number(
device_descriptor
), f"{self.board_id} != {ul.get_board_number(device_descriptor)}"
ul.d_config_port(self.board_id, enums.DigitalPortType.FIRSTPORTA, enums.DigitalIODirection.OUT)
except Exception as exc:
_logger.error("Bristol FOS device configuration failed with: %s", str(exc))
ul.release_daq_device(self.board_id)
raise qmi.core.exceptions.QMI_InstrumentException("Bristol FOS device port configuration failed.") from exc

@rpc_method
def close(self) -> None:
assert ul is not None
ul.release_daq_device(self.board_id)
self._board_id = None

@rpc_method
def select_channel(self, channel: int) -> None:
assert ul is not None
# Note that the 'channel parameter has values 1..4 ; these are mapped to value 0..3 here.
ul.d_out(self._board_id, enums.DigitalPortType.FIRSTPORTA, channel - 1)


class Bristol_Fos(QMI_Instrument):
"""Base class for the Bristol FOS instrument driver"""

def __init__(self, context: QMI_Context, name: str, unique_id: str, board_id: int = 0) -> None:
"""Base class initialization. Depending on the system platform, a Windows-compatible or
Unix-compatible driver version is instantiated.

Attributes:
fos: The instantiated FOS device on the driver.

Parameters:
context: A QMI_Context instance.
name: Name for the instrument in the context.
unique_id: An unique identification number, a.k.a. serial number of the device.
board_id: Board number for Windows driver. Not used for Unix driver.
rbudhrani marked this conversation as resolved.
Show resolved Hide resolved
"""
super().__init__(context, name)
if sys.platform.startswith("win"):
self.fos = _Bristol_FosWindows(unique_id, board_id)

else:
self.fos = _Bristol_FosUnix(unique_id)

# Import the support module.
_import_modules()

@rpc_method
def open(self) -> None:
self._check_is_closed()
self.fos.open()
super().open()

@rpc_method
def close(self) -> None:
self._check_is_open()
assert self._device is not None
self._device.disconnect()
self._device.release()
self._device = None
self._dio_device = None
self.fos.close()
super().close()

@rpc_method
def select_channel(self, channel: int) -> None:
"""Method for selecting an output channel for DIO device.

Parameters:
channel: The output channel number. Must be in range [1, 4].

Raises:
ValueError: At invalid channel number.
"""
self._check_is_open()
if not channel in [1, 2, 3, 4]:
if channel not in range(1, 5):
raise ValueError("Bad channel: {}".format(channel))
assert self._dio_device is not None
# Note that the 'channel parameter has values 1..4 ; these are mapped to value 0..3 here.
self._dio_device.d_out(uldaq.DigitalPortType.FIRSTPORTA, channel - 1)

self.fos.select_channel(channel)
Loading