diff --git a/qmi/core/transport.py b/qmi/core/transport.py index ffe5e395..5f875d6f 100644 --- a/qmi/core/transport.py +++ b/qmi/core/transport.py @@ -5,10 +5,14 @@ import socket import sys import time -import vxi11 # type: ignore -from typing import Any, Mapping, Optional, List, Tuple, Type +if sys.version_info >= (3, 9): + from collections.abc import Mapping +else: + from typing import Mapping +from typing import Any, Dict, Optional, List, Tuple, Type import serial +import vxi11 # type: ignore from qmi.core.context import QMI_Context from qmi.core.exceptions import ( @@ -22,17 +26,16 @@ class QMI_Transport: - """QMI_Transport is the base class for bidirectional byte stream - transport implementations, typically used to talk to instruments. + """QMI_Transport is the base class for bidirectional byte stream transport implementations, + typically used to talk to instruments. - An instance of QMI_Transport represents a channel that admits reading - and writing of arbitrary byte sequences. Message boundaries are not - preserved. Subclasses of QMI_Transport implement the transport API - for specific types of communication channels. + An instance of QMI_Transport represents a channel that admits reading and writing of arbitrary + byte sequences. Message boundaries are not preserved. Subclasses of QMI_Transport implement the + transport API for specific types of communication channels. - Once created, a QMI_Transport needs to be opened via the open() method before reading and writing. When the - application has finished using the transport, it must call the close() method to close the underlying channel and - release system resources. + Once created, a QMI_Transport needs to be opened via the open() method before reading and writing. + When the application has finished using the transport, it must call the close() method to close + the underlying channel and release system resources. """ def __init__(self) -> None: @@ -179,20 +182,34 @@ def discard_read(self) -> None: class TransportDescriptorParser: - + """This class is for creating a transport-specific parser classes and has (static) methods that are + used for parsing transport strings. + """ def __init__(self, interface: str, positionals: List[Tuple[str, Tuple[Type, bool]]], - keywords: Mapping[str, Tuple[Type, bool]]): + keywords: Mapping[str, Tuple[Type, bool]] + ) -> None: self.interface = interface self._positionals = positionals self._keywords = keywords - def parse_parameter_strings(self, transport_descriptor: str, default_parameters=None) -> Mapping[str, Any]: + def parse_parameter_strings( + self, transport_descriptor: str, default_parameters: Optional[Mapping[str, Any]] = None + ) -> Dict[str, Any]: + """Method for parsing transport descriptor strings. + + Parameters: + transport_descriptor: The string to parse. + default_parameters: Dictionary of default parameters to be used if not present in the string. + + Returns: + parameters: A dictionary object of the parsed parameters. + """ if default_parameters is None: parameters = {} else: - parameters = default_parameters.copy() + parameters = dict(default_parameters) # Drop unexpected default parameters. # These may be intended for a different transport interface. @@ -216,13 +233,13 @@ def parse_parameter_strings(self, transport_descriptor: str, default_parameters= return parameters @staticmethod - def _parse_parts(transport_description: str) -> List[str]: + def _parse_parts(transport_descriptor: str) -> List[str]: regex = re.compile( r"((?:^([^:]+))|" # transport interface: i.e. serial:... r"(?::\[(.+)[\]$])|" # enclosed parameter (for example used in ipv6): i.e. ...:[param]:... or ...:[param] r"(?::([^:]+)))") # regular parameter: i.e. ...:param:... or ...:param parts = [] - for match in re.finditer(regex, transport_description): + for match in re.finditer(regex, transport_descriptor): if match[2]: # transport interface parts.append(match[2]) elif match[3]: # enclosed parameter @@ -231,9 +248,9 @@ def _parse_parts(transport_description: str) -> List[str]: parts.append(match[4]) else: raise QMI_TransportDescriptorException( - "Invalid transport descriptor {!r}".format(transport_description)) + "Invalid transport descriptor {!r}".format(transport_descriptor)) if len(parts) < 2: - raise QMI_TransportDescriptorException("Invalid transport descriptor {!r}".format(transport_description)) + raise QMI_TransportDescriptorException("Invalid transport descriptor {!r}".format(transport_descriptor)) return parts @staticmethod @@ -242,16 +259,17 @@ def _parse_interface(transport_descriptor: str) -> str: return parts[0] def match_interface(self, transport_descriptor: str) -> bool: + """A method to check the transport descriptor is used with the correct parser class.""" interface = self._parse_interface(transport_descriptor).lower() return self.interface == interface - def _check_missing_parameters(self, parameters: Mapping[str, Any]): + def _check_missing_parameters(self, parameters: Dict[str, Any]): req_params = self._get_required_parameters() missing_parameters = req_params.difference(parameters.keys()) if len(missing_parameters) > 0: raise QMI_TransportDescriptorException('Missing required parameter(s): {}'.format(missing_parameters)) - def _parse_positional_parameters(self, params: List[str]) -> Mapping[str, Any]: + def _parse_positional_parameters(self, params: List[str]) -> Dict[str, Any]: positional_params = [param for param in params if not self._is_keyword_param(param)] d = dict() for (name, (ty, _)), param in zip(self._positionals, positional_params): @@ -262,7 +280,7 @@ def _parse_positional_parameters(self, params: List[str]) -> Mapping[str, Any]: ty, param) return d - def _parse_keyword_parameters(self, strings: List[str]) -> Mapping[str, Any]: + def _parse_keyword_parameters(self, strings: List[str]) -> Dict[str, Any]: keyword_strings = [param for param in strings if self._is_keyword_param(param)] parameters = dict() for keyword_string in keyword_strings: @@ -273,7 +291,7 @@ def _parse_keyword_parameters(self, strings: List[str]) -> Mapping[str, Any]: if k in self._keywords.keys(): try: ty = self._keywords[k][0] - if ty == int and v.startswith('0x'): + if ty is int and v.startswith('0x'): parameters[k] = int(v, 16) else: parameters[k] = ty(v) @@ -344,15 +362,15 @@ class QMI_SerialTransport(QMI_Transport): """Byte stream transport via serial port. This class can also be used for "virtual" serial ports via USB. - """ - # Set a fixed read timeout on the serial port device. - # The actual specified timeout for read() and read_until() calls will be - # rounded up to a multiple of this fixed timeout. - # The timeout parameter of the serial port device must be fixed because - # changing the timeout causes reprogramming of the serial port parameters, - # which is a slow operation and can even cause data loss (with an FTDI - # device under Windows). + Attributes: + SERIAL_READ_TIMEOUT: Set a fixed read timeout on the serial port device. The actual specified timeout + for read() and read_until() calls will be rounded up to a multiple of this fixed + timeout. The timeout parameter of the serial port device must be fixed because + changing the timeout causes reprogramming of the serial port parameters, + which is a slow operation and can even cause data loss (with an FTDI + device under Windows). + """ SERIAL_READ_TIMEOUT = 0.040 # 40 ms def __init__(self, @@ -904,19 +922,16 @@ class QMI_UsbTmcTransport(QMI_Transport): * write() writes the specified bytes as a single USBTMC message. * read_until() reads a single USBTMC message (until the device indicates end-of-message) and returns the fetched bytes. + + Attributes: + DEFAULT_READ_TIMEOUT: Default timeout in seconds for USBTMC read transactions. + WRITE_TIMEOUT: Timeout in seconds for USBTMC write transactions. """ - # Default timeout in seconds for USBTMC read transactions. DEFAULT_READ_TIMEOUT = 60 - - # Timeout in seconds for USBTMC write transactions. WRITE_TIMEOUT = 5 - def __init__(self, - vendorid: int, - productid: int, - serialnr: str, - ) -> None: + def __init__(self, vendorid: int, productid: int, serialnr: str) -> None: """Initialize te specified USB device as USBTMC instrument. The first USBTMC-compatible interface of the USB device will be used. @@ -1115,7 +1130,7 @@ def _safe_instr(self) -> vxi11.Instrument: This aids in static typechecking, since whereas the type of _instr is Optional[T], the result of this method is guaranteed to be of type T. It is a QMI-internal bug if this property is used in case _instr is None. In that - case, we raise an AssertionError, and we hope the users will complain to us so we can fix the bug in the + case, we raise an AssertionError, and we hope the users will complain to us, so we can fix the bug in the library. Raises: AssertionError: in case the property is used when the underlying value of _instr is None. @@ -1274,55 +1289,54 @@ def list_usbtmc_transports() -> List[str]: return QMI_PyUsbTmcTransport.list_resources() -def create_transport(transport_descriptor: str, - default_attributes: Optional[Mapping[str, Any]] = None) -> QMI_Transport: +def create_transport( + transport_descriptor: str, default_attributes: Optional[Dict[str, Any]] = None +) -> QMI_Transport: """Create a bidirectional communication channel. - A transport_descriptor specifies all information that may be needed to - open a transport, including parameters such as port number, baud rate, etc. + A transport_descriptor specifies all information that may be needed to open a transport, including parameters + such as port number, baud rate, etc. Certain entries are obligatory, like giving the host IP address for UDP and + TCP transports. Other entries are optional, and are indicated with `<`, `>` characters. For those entries, if + not given, the string format below indicates the default value used in that case with the `=value` part. Do not + include the `<`, `>` characters in the strings. String format: - - UDP connection: "tcp:host[:port]" - - TCP connection: "tcp:host[:port][:connect_timeout=T]" - - Serial port: "serial:device[:baudrate=115200][:databits=8][:parity=N][:stopbits=1]" - - USBTMC device: "usbtmc[:vendorid=0xvid][:productid=0xpid]:serialnr=sn" - - GPIB device: "gpib:[board=0]:primary_addr[:secondary_addr=2][:connect_timeout=30.0]" - VXI-11 instrument: "vxi11:host" - - "host" (for UDP, TCP & VXI-11 transports) specifies the host name or IP address of - the UDP server/TCP client. Numerical IPv6 addresses must be enclosed in square brackets. - - "port" (for UDP and TCP transports) specifies the UDP/TCP port number of the server/client. - - "connect_timeout" is TCP connection timeout. Default is 10s. - - "device" (for serial port transports) is the name of the serial port, - for example "COM3" or "/dev/ttyUSB0". - - "baudrate" (for serial port transports) specifies the number of bits per second. - This attribute is only required for instruments with a configurable baud rate. - - "bytesize" (for serial port transports) specifies the number of data bits - per character (valid range 5 - 8). - This attribute is only required for instruments with a configurable character format. - - "parity" (for serial port transports) specifies the parity bit ('O' or 'E' or ''N'). - This attribute is only required for instruments with a configurable character format. - - "stopbits" (for serial port transports) specifies the number of stop bits (1 or 1.5 or 2). - This attribute is only required for instruments with a configurable character format. - - "rtscts" (for serial port transports) enables or disables RTS/CTS flow control. - Possible values are True and False; the default is False. - - "vendorid" is the USB Vendor ID as a decimal number or as hexadecimal with 0x prefix. - "productid" is the USB Product ID as a decimal number or as hexadecimal with 0x prefix. - "serialnr" is the USB serial number string. - - "primary_addr" is GPIB device number (integer). - "board" is optional GPIB interface number (GPIB[board]::...). Default is None. - "secondary_addr" is optional secondary device address number. Default is None. - "connect_timeout" is for opening resource for GPIB device, in seconds; the default is 30s. + - UDP connection: "udp:host<:port>" + - TCP connection: "tcp:host<:port><:connect_timeout=10>" + - Serial port: "serial:device<:baudrate=115200><:databits=8><:parity=N><:stopbits=1>" + - USBTMC device: "usbtmc:vendorid:productid:serialnr" + - GPIB device: "gpib:primary_addr<:secondary_addr=None><:connect_timeout=30.0>" + + UDP, TCP and VXI-11: + - "host" (for UDP, TCP & VXI-11 transports) specifies the host name or IP address of the UDP server/TCP client. + Numerical IPv6 addresses must be enclosed in square brackets, e.g. "tcp:[2620:0:2d0:200::8]:5000". + - "port" (for UDP and TCP transports) specifies the UDP/TCP port number of the server/client. + - "connect_timeout" is TCP connection timeout. + + Serial: + - "device" is the name of the serial port, for example "COM3" or "/dev/ttyUSB0". + - "baudrate" specifies the number of bits per second. + This attribute is only required for instruments with a configurable baud rate. + - "bytesize" specifies the number of data bits per character (valid range 5 - 8). + This attribute is only required for instruments with a configurable character format. + - "parity" specifies the parity bit ('O' or 'E' or ''N'). + This attribute is only required for instruments with a configurable character format. + - "stopbits" specifies the number of stop bits (1 or 1.5 or 2). + This attribute is only required for instruments with a configurable character format. + - "rtscts" enables or disables RTS/CTS flow control. + Possible values are True and False; the default is False. + + USBTMC: + - "vendorid" is the USB Vendor ID as a decimal number or as hexadecimal with 0x prefix. + - "productid" is the USB Product ID as a decimal number or as hexadecimal with 0x prefix. + - "serialnr" is the USB serial number string. + + GPIB: + - "primary_addr" is GPIB device number (integer). + - "board" is optional GPIB interface number (in VISA syntax GPIB[board]::...). + - "secondary_addr" is optional secondary device address number. + - "connect_timeout" is for opening resource for GPIB device, in seconds. """ if SerialTransportDescriptorParser.match_interface(transport_descriptor): attributes = SerialTransportDescriptorParser.parse_parameter_strings(transport_descriptor, default_attributes) diff --git a/qmi/core/transport_gpib_visa.py b/qmi/core/transport_gpib_visa.py index 83ccfe27..8940652a 100644 --- a/qmi/core/transport_gpib_visa.py +++ b/qmi/core/transport_gpib_visa.py @@ -47,7 +47,7 @@ def __init__( board: Optional[int] = None, secondary_addr: Optional[int] = None, connect_timeout: float = 30.0 - ): + ) -> None: """Initialization of the Gpib transport. Parameters: diff --git a/qmi/core/transport_usbtmc_pyusb.py b/qmi/core/transport_usbtmc_pyusb.py index a895f041..87bebfd1 100644 --- a/qmi/core/transport_usbtmc_pyusb.py +++ b/qmi/core/transport_usbtmc_pyusb.py @@ -11,7 +11,7 @@ class QMI_PyUsbTmcTransport(QMI_UsbTmcTransport): - def __init__(self, vendorid: int, productid: int, serialnr: str): + def __init__(self, vendorid: int, productid: int, serialnr: str) -> None: super().__init__(vendorid, productid, serialnr) self._device: Optional[usbtmc.Instrument] = None diff --git a/qmi/core/transport_usbtmc_visa.py b/qmi/core/transport_usbtmc_visa.py index 4442098c..6d8b0f3a 100644 --- a/qmi/core/transport_usbtmc_visa.py +++ b/qmi/core/transport_usbtmc_visa.py @@ -24,7 +24,7 @@ class QMI_VisaUsbTmcTransport(QMI_UsbTmcTransport): - def __init__(self, vendorid: int, productid: int, serialnr: str): + def __init__(self, vendorid: int, productid: int, serialnr: str) -> None: super().__init__(vendorid, productid, serialnr) self._device: Optional[pyvisa.ResourceManager] = None diff --git a/tests/core/test_transport.py b/tests/core/test_transport.py index 8a603bf9..c5f5f493 100644 --- a/tests/core/test_transport.py +++ b/tests/core/test_transport.py @@ -220,6 +220,12 @@ def test_parse_tcp_ipv6(self, mock): self.assertIs(trans, mock.return_value) mock.assert_called_once_with(host="2620:0:2d0:200::8", port=5000) + @unittest.mock.patch("qmi.core.transport.QMI_TcpTransport") + def test_parse_tcp_ipv6_range(self, mock): + trans = create_transport("tcp:[ 2001:db8:1234::/48]:5000") + self.assertIs(trans, mock.return_value) + mock.assert_called_once_with(host=" 2001:db8:1234::/48", port=5000) + @unittest.mock.patch("qmi.core.transport.QMI_TcpTransport") def test_parse_tcp_attrs(self, mock): trans = create_transport("tcp:localhost:1234:connect_timeout=1")