Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QMI-094] #95

Merged
merged 12 commits into from
Aug 14, 2024
12 changes: 6 additions & 6 deletions .github/badges/pylint.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
178 changes: 96 additions & 82 deletions qmi/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
import socket
import sys
import time
import vxi11 # type: ignore
from typing import Any, Mapping, Optional, List, Tuple, Type
if sys.version_info >= (3, 9):
from collections.abc import Mapping
else:
from typing import Mapping
from typing import Any, Dict, Optional, List, Tuple, Type

import serial
import vxi11 # type: ignore

from qmi.core.context import QMI_Context
from qmi.core.exceptions import (
Expand All @@ -22,17 +26,16 @@


class QMI_Transport:
"""QMI_Transport is the base class for bidirectional byte stream
transport implementations, typically used to talk to instruments.
"""QMI_Transport is the base class for bidirectional byte stream transport implementations,
typically used to talk to instruments.

An instance of QMI_Transport represents a channel that admits reading
and writing of arbitrary byte sequences. Message boundaries are not
preserved. Subclasses of QMI_Transport implement the transport API
for specific types of communication channels.
An instance of QMI_Transport represents a channel that admits reading and writing of arbitrary
byte sequences. Message boundaries are not preserved. Subclasses of QMI_Transport implement the
transport API for specific types of communication channels.

Once created, a QMI_Transport needs to be opened via the open() method before reading and writing. When the
application has finished using the transport, it must call the close() method to close the underlying channel and
release system resources.
Once created, a QMI_Transport needs to be opened via the open() method before reading and writing.
When the application has finished using the transport, it must call the close() method to close
the underlying channel and release system resources.
"""

def __init__(self) -> None:
Expand Down Expand Up @@ -179,16 +182,30 @@ def discard_read(self) -> None:


class TransportDescriptorParser:

"""This class is for creating a transport-specific parser classes and has (static) methods that are
used for parsing transport strings.
"""
def __init__(self,
interface: str,
positionals: List[Tuple[str, Tuple[Type, bool]]],
keywords: Mapping[str, Tuple[Type, bool]]):
keywords: Mapping[str, Tuple[Type, bool]]
) -> None:
self.interface = interface
self._positionals = positionals
self._keywords = keywords

def parse_parameter_strings(self, transport_descriptor: str, default_parameters=None) -> Mapping[str, Any]:
def parse_parameter_strings(
self, transport_descriptor: str, default_parameters: Optional[Dict[str, Any]] = None
) -> Mapping[str, Any]:
heevasti marked this conversation as resolved.
Show resolved Hide resolved
"""Method for parsing transport descriptor strings.

Parameters:
transport_descriptor: The string to parse.
default_parameters: Dictionary of default parameters to be used if not present in the string.

Returns:
parameters: A generic [KT, VT] type mapped object.
"""
if default_parameters is None:
parameters = {}
else:
Expand Down Expand Up @@ -216,13 +233,13 @@ def parse_parameter_strings(self, transport_descriptor: str, default_parameters=
return parameters

@staticmethod
def _parse_parts(transport_description: str) -> List[str]:
def _parse_parts(transport_descriptor: str) -> List[str]:
regex = re.compile(
r"((?:^([^:]+))|" # transport interface: i.e. serial:...
r"(?::\[(.+)[\]$])|" # enclosed parameter (for example used in ipv6): i.e. ...:[param]:... or ...:[param]
r"(?::([^:]+)))") # regular parameter: i.e. ...:param:... or ...:param
parts = []
for match in re.finditer(regex, transport_description):
for match in re.finditer(regex, transport_descriptor):
if match[2]: # transport interface
parts.append(match[2])
elif match[3]: # enclosed parameter
Expand All @@ -231,9 +248,9 @@ def _parse_parts(transport_description: str) -> List[str]:
parts.append(match[4])
else:
raise QMI_TransportDescriptorException(
"Invalid transport descriptor {!r}".format(transport_description))
"Invalid transport descriptor {!r}".format(transport_descriptor))
if len(parts) < 2:
raise QMI_TransportDescriptorException("Invalid transport descriptor {!r}".format(transport_description))
raise QMI_TransportDescriptorException("Invalid transport descriptor {!r}".format(transport_descriptor))
return parts

@staticmethod
Expand All @@ -242,16 +259,17 @@ def _parse_interface(transport_descriptor: str) -> str:
return parts[0]

def match_interface(self, transport_descriptor: str) -> bool:
"""A method to check the transport descriptor is used with the correct parser class."""
interface = self._parse_interface(transport_descriptor).lower()
return self.interface == interface

def _check_missing_parameters(self, parameters: Mapping[str, Any]):
def _check_missing_parameters(self, parameters: Dict[str, Any]):
req_params = self._get_required_parameters()
missing_parameters = req_params.difference(parameters.keys())
if len(missing_parameters) > 0:
raise QMI_TransportDescriptorException('Missing required parameter(s): {}'.format(missing_parameters))

def _parse_positional_parameters(self, params: List[str]) -> Mapping[str, Any]:
def _parse_positional_parameters(self, params: List[str]) -> Dict[str, Any]:
positional_params = [param for param in params if not self._is_keyword_param(param)]
d = dict()
for (name, (ty, _)), param in zip(self._positionals, positional_params):
Expand All @@ -262,7 +280,7 @@ def _parse_positional_parameters(self, params: List[str]) -> Mapping[str, Any]:
ty, param)
return d

def _parse_keyword_parameters(self, strings: List[str]) -> Mapping[str, Any]:
def _parse_keyword_parameters(self, strings: List[str]) -> Dict[str, Any]:
rbudhrani marked this conversation as resolved.
Show resolved Hide resolved
keyword_strings = [param for param in strings if self._is_keyword_param(param)]
parameters = dict()
for keyword_string in keyword_strings:
Expand Down Expand Up @@ -344,15 +362,15 @@ class QMI_SerialTransport(QMI_Transport):
"""Byte stream transport via serial port.

This class can also be used for "virtual" serial ports via USB.
"""

# Set a fixed read timeout on the serial port device.
# The actual specified timeout for read() and read_until() calls will be
# rounded up to a multiple of this fixed timeout.
# The timeout parameter of the serial port device must be fixed because
# changing the timeout causes reprogramming of the serial port parameters,
# which is a slow operation and can even cause data loss (with an FTDI
# device under Windows).
Attributes:
SERIAL_READ_TIMEOUT: Set a fixed read timeout on the serial port device. The actual specified timeout
for read() and read_until() calls will be rounded up to a multiple of this fixed
timeout. The timeout parameter of the serial port device must be fixed because
changing the timeout causes reprogramming of the serial port parameters,
which is a slow operation and can even cause data loss (with an FTDI
device under Windows).
"""
SERIAL_READ_TIMEOUT = 0.040 # 40 ms

def __init__(self,
Expand Down Expand Up @@ -904,19 +922,16 @@ class QMI_UsbTmcTransport(QMI_Transport):
* write() writes the specified bytes as a single USBTMC message.
* read_until() reads a single USBTMC message (until the device indicates
end-of-message) and returns the fetched bytes.

Attributes:
DEFAULT_READ_TIMEOUT: Default timeout in seconds for USBTMC read transactions.
WRITE_TIMEOUT: Timeout in seconds for USBTMC write transactions.
"""

# Default timeout in seconds for USBTMC read transactions.
DEFAULT_READ_TIMEOUT = 60

# Timeout in seconds for USBTMC write transactions.
WRITE_TIMEOUT = 5

def __init__(self,
vendorid: int,
productid: int,
serialnr: str,
) -> None:
def __init__(self, vendorid: int, productid: int, serialnr: str) -> None:
"""Initialize te specified USB device as USBTMC instrument.

The first USBTMC-compatible interface of the USB device will be used.
Expand Down Expand Up @@ -1115,7 +1130,7 @@ def _safe_instr(self) -> vxi11.Instrument:

This aids in static typechecking, since whereas the type of _instr is Optional[T], the result of this method is
guaranteed to be of type T. It is a QMI-internal bug if this property is used in case _instr is None. In that
case, we raise an AssertionError, and we hope the users will complain to us so we can fix the bug in the
case, we raise an AssertionError, and we hope the users will complain to us, so we can fix the bug in the
library.

Raises: AssertionError: in case the property is used when the underlying value of _instr is None.
Expand Down Expand Up @@ -1274,55 +1289,54 @@ def list_usbtmc_transports() -> List[str]:
return QMI_PyUsbTmcTransport.list_resources()


def create_transport(transport_descriptor: str,
default_attributes: Optional[Mapping[str, Any]] = None) -> QMI_Transport:
def create_transport(
transport_descriptor: str, default_attributes: Optional[Dict[str, Any]] = None
) -> QMI_Transport:
"""Create a bidirectional communication channel.

A transport_descriptor specifies all information that may be needed to
open a transport, including parameters such as port number, baud rate, etc.
A transport_descriptor specifies all information that may be needed to open a transport, including parameters
such as port number, baud rate, etc. Certain entries are obligatory, like giving the host IP address for UDP and
TCP transports. Other entries are optional, and are indicated with `<`, `>` characters. For those entries, if
not given, the string format below indicates the default value used in that case with the `=value` part. Do not
include the `<`, `>` characters in the strings.

String format:
- UDP connection: "tcp:host[:port]"
- TCP connection: "tcp:host[:port][:connect_timeout=T]"
- Serial port: "serial:device[:baudrate=115200][:databits=8][:parity=N][:stopbits=1]"
- USBTMC device: "usbtmc[:vendorid=0xvid][:productid=0xpid]:serialnr=sn"
- GPIB device: "gpib:[board=0]:primary_addr[:secondary_addr=2][:connect_timeout=30.0]"
- VXI-11 instrument: "vxi11:host"

"host" (for UDP, TCP & VXI-11 transports) specifies the host name or IP address of
the UDP server/TCP client. Numerical IPv6 addresses must be enclosed in square brackets.

"port" (for UDP and TCP transports) specifies the UDP/TCP port number of the server/client.

"connect_timeout" is TCP connection timeout. Default is 10s.

"device" (for serial port transports) is the name of the serial port,
for example "COM3" or "/dev/ttyUSB0".

"baudrate" (for serial port transports) specifies the number of bits per second.
This attribute is only required for instruments with a configurable baud rate.

"bytesize" (for serial port transports) specifies the number of data bits
per character (valid range 5 - 8).
This attribute is only required for instruments with a configurable character format.

"parity" (for serial port transports) specifies the parity bit ('O' or 'E' or ''N').
This attribute is only required for instruments with a configurable character format.

"stopbits" (for serial port transports) specifies the number of stop bits (1 or 1.5 or 2).
This attribute is only required for instruments with a configurable character format.

"rtscts" (for serial port transports) enables or disables RTS/CTS flow control.
Possible values are True and False; the default is False.

"vendorid" is the USB Vendor ID as a decimal number or as hexadecimal with 0x prefix.
"productid" is the USB Product ID as a decimal number or as hexadecimal with 0x prefix.
"serialnr" is the USB serial number string.

"primary_addr" is GPIB device number (integer).
"board" is optional GPIB interface number (GPIB[board]::...). Default is None.
"secondary_addr" is optional secondary device address number. Default is None.
"connect_timeout" is for opening resource for GPIB device, in seconds; the default is 30s.
- UDP connection: "udp:host<:port>"
- TCP connection: "tcp:host<:port><:connect_timeout=10>"
- Serial port: "serial:device<:baudrate=115200><:databits=8><:parity=N><:stopbits=1>"
- USBTMC device: "usbtmc:vendorid:productid:serialnr"
- GPIB device: "gpib:<board=None:>primary_addr<:secondary_addr=None><:connect_timeout=30.0>"

UDP, TCP and VXI-11:
- "host" (for UDP, TCP & VXI-11 transports) specifies the host name or IP address of the UDP server/TCP client.
Numerical IPv6 addresses must be enclosed in square brackets, e.g. "tcp:[2620:0:2d0:200::8]:5000".
- "port" (for UDP and TCP transports) specifies the UDP/TCP port number of the server/client.
- "connect_timeout" is TCP connection timeout.

Serial:
- "device" is the name of the serial port, for example "COM3" or "/dev/ttyUSB0".
- "baudrate" specifies the number of bits per second.
This attribute is only required for instruments with a configurable baud rate.
- "bytesize" specifies the number of data bits per character (valid range 5 - 8).
This attribute is only required for instruments with a configurable character format.
- "parity" specifies the parity bit ('O' or 'E' or ''N').
This attribute is only required for instruments with a configurable character format.
- "stopbits" specifies the number of stop bits (1 or 1.5 or 2).
This attribute is only required for instruments with a configurable character format.
- "rtscts" enables or disables RTS/CTS flow control.
Possible values are True and False; the default is False.

USBTMC:
- "vendorid" is the USB Vendor ID as a decimal number or as hexadecimal with 0x prefix.
- "productid" is the USB Product ID as a decimal number or as hexadecimal with 0x prefix.
- "serialnr" is the USB serial number string.

GPIB:
- "primary_addr" is GPIB device number (integer).
- "board" is optional GPIB interface number (in VISA syntax GPIB[board]::...).
- "secondary_addr" is optional secondary device address number.
- "connect_timeout" is for opening resource for GPIB device, in seconds.
"""
if SerialTransportDescriptorParser.match_interface(transport_descriptor):
attributes = SerialTransportDescriptorParser.parse_parameter_strings(transport_descriptor, default_attributes)
Expand Down
2 changes: 1 addition & 1 deletion qmi/core/transport_gpib_visa.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(
board: Optional[int] = None,
secondary_addr: Optional[int] = None,
connect_timeout: float = 30.0
):
) -> None:
"""Initialization of the Gpib transport.

Parameters:
Expand Down
2 changes: 1 addition & 1 deletion qmi/core/transport_usbtmc_pyusb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class QMI_PyUsbTmcTransport(QMI_UsbTmcTransport):

def __init__(self, vendorid: int, productid: int, serialnr: str):
def __init__(self, vendorid: int, productid: int, serialnr: str) -> None:
super().__init__(vendorid, productid, serialnr)
self._device: Optional[usbtmc.Instrument] = None

Expand Down
2 changes: 1 addition & 1 deletion qmi/core/transport_usbtmc_visa.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

class QMI_VisaUsbTmcTransport(QMI_UsbTmcTransport):

def __init__(self, vendorid: int, productid: int, serialnr: str):
def __init__(self, vendorid: int, productid: int, serialnr: str) -> None:
super().__init__(vendorid, productid, serialnr)
self._device: Optional[pyvisa.ResourceManager] = None

Expand Down
6 changes: 6 additions & 0 deletions tests/core/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ def test_parse_tcp_ipv6(self, mock):
self.assertIs(trans, mock.return_value)
mock.assert_called_once_with(host="2620:0:2d0:200::8", port=5000)

@unittest.mock.patch("qmi.core.transport.QMI_TcpTransport")
def test_parse_tcp_ipv6_range(self, mock):
trans = create_transport("tcp:[ 2001:db8:1234::/48]:5000")
self.assertIs(trans, mock.return_value)
mock.assert_called_once_with(host=" 2001:db8:1234::/48", port=5000)

@unittest.mock.patch("qmi.core.transport.QMI_TcpTransport")
def test_parse_tcp_attrs(self, mock):
trans = create_transport("tcp:localhost:1234:connect_timeout=1")
Expand Down
Loading